@@ -23,21 +23,16 @@ Licensed to the Apache Software Foundation (ASF) under one
2323package org .trafodion .dcs .master ;
2424
2525import java .io .IOException ;
26- import java .io .InputStream ;
27- import java .net .InetAddress ;
28- import java .net .InetSocketAddress ;
29- import java .net .ServerSocket ;
30- import java .net .NetworkInterface ;
31- import java .nio .charset .Charset ;
32- import java .util .Enumeration ;
33- import java .util .*;
26+ import java .util .concurrent .Callable ;
27+ import java .util .concurrent .CompletionService ;
3428import java .util .concurrent .CountDownLatch ;
35- import java .util .concurrent .Executors ;
29+ import java .util .concurrent .ExecutionException ;
30+ import java .util .concurrent .ExecutorCompletionService ;
3631import java .util .concurrent .ExecutorService ;
32+ import java .util .concurrent .Executors ;
3733import java .util .concurrent .Future ;
38- import java .util .concurrent .ExecutionException ;
34+ import java .util .concurrent .TimeUnit ;
3935
40- import org .apache .commons .io .IOUtils ;
4136import org .apache .commons .cli .CommandLine ;
4237import org .apache .commons .cli .GnuParser ;
4338import org .apache .commons .cli .Options ;
@@ -46,23 +41,22 @@ Licensed to the Apache Software Foundation (ASF) under one
4641import org .apache .commons .logging .LogFactory ;
4742import org .apache .hadoop .conf .Configuration ;
4843import org .apache .zookeeper .CreateMode ;
44+ import org .apache .zookeeper .KeeperException ;
4945import org .apache .zookeeper .ZooDefs ;
5046import org .apache .zookeeper .data .Stat ;
51- import org .apache .zookeeper .KeeperException ;
52- import org .apache .zookeeper .ZooKeeper .States ;
53-
54- import org .apache .hadoop .util .StringUtils ;
55-
5647import org .trafodion .dcs .Constants ;
48+ import org .trafodion .dcs .master .listener .ListenerService ;
49+ import org .trafodion .dcs .master .listener .ListenerWorker ;
5750import org .trafodion .dcs .util .DcsConfiguration ;
5851import org .trafodion .dcs .util .DcsNetworkConfiguration ;
5952import org .trafodion .dcs .util .InfoServer ;
53+ import org .trafodion .dcs .util .RetryCounter ;
54+ import org .trafodion .dcs .util .RetryCounterFactory ;
6055import org .trafodion .dcs .util .VersionInfo ;
61- import org .trafodion .dcs .zookeeper .ZkClient ;
6256import org .trafodion .dcs .zookeeper .ZKConfig ;
63- import org .trafodion .dcs .master . listener . ListenerService ;
57+ import org .trafodion .dcs .zookeeper . ZkClient ;
6458
65- public class DcsMaster implements Runnable {
59+ public class DcsMaster implements Callable < Integer > {
6660 private static final Log LOG = LogFactory .getLog (DcsMaster .class );
6761 private Thread thrd ;
6862 private ZkClient zkc = null ;
@@ -111,11 +105,50 @@ public DcsMaster(String[] args) {
111105 trafodionHome = System .getProperty (Constants .DCS_TRAFODION_HOME );
112106 jvmShutdownHook = new JVMShutdownHook ();
113107 Runtime .getRuntime ().addShutdownHook (jvmShutdownHook );
114- thrd = new Thread (this );
115- thrd .start ();
108+
109+ // 35000 * 15 minutes ~= 1 year
110+ RetryCounter retryCounter = RetryCounterFactory .create (35000 , 15 , TimeUnit .MINUTES );
111+ ExecutorService executorService = Executors .newFixedThreadPool (1 );
112+ CompletionService <Integer > completionService = new ExecutorCompletionService <Integer >(executorService );
113+
114+ while (true ) {
115+ completionService .submit (this );
116+ Future <Integer > f = null ;
117+ try {
118+ f = completionService .take ();
119+ if (f != null ) {
120+ Integer status = f .get ();
121+ if (status <= 0 ) {
122+ System .exit (status );
123+ } else if (status == 1 ) {
124+ if (retryCounter .shouldRetry ()) {
125+ retryCounter .sleepUntilNextRetry ();
126+ retryCounter .useRetry ();
127+ } else {
128+ System .exit (-2 );
129+ }
130+ // reset lock
131+ isLeader = new CountDownLatch (1 );
132+ break ;
133+ } else {
134+ //TODO for other unknown status
135+ }
136+ }
137+ } catch (InterruptedException | ExecutionException e ) {
138+ LOG .error (e .getMessage (), e );
139+ }
140+ }
141+
116142 }
117143
118- public void run () {
144+ // A return value less than 0 means an unrecoverable error; the process exits.
145+ // -1: configuration error
146+ // -2: retries exhausted
147+ // A return value greater than 0 means the error is recoverable.
148+ // 1: network error; retry until the network recovers.
149+ // A return value equal to 0 means an unknown exception; exit immediately.
150+ // Replace 0 with a more specific value once the real cause of the exception is confirmed.
151+ public Integer call () {
119152 VersionInfo .logVersion ();
120153
121154 Options opt = new Options ();
@@ -129,19 +162,19 @@ public void run() {
129162 instance = "1" ;
130163 } catch (NullPointerException e ) {
131164 LOG .error ("No args found: " , e );
132- System . exit ( 1 ) ;
165+ return - 1 ;
133166 } catch (ParseException e ) {
134167 LOG .error ("Could not parse: " , e );
135- System . exit ( 1 ) ;
168+ return - 1 ;
136169 }
137170
138171 try {
139172 zkc = new ZkClient ();
140173 zkc .connect ();
141174 LOG .info ("Connected to ZooKeeper" );
142- } catch (Exception e ) {
143- LOG .error (e );
144- System . exit ( 1 ) ;
175+ } catch (IOException | InterruptedException e ) {
176+ LOG .error (e . getMessage (), e );
177+ return 1 ;
145178 }
146179
147180 try {
@@ -202,9 +235,10 @@ public void run() {
202235 }
203236 } catch (KeeperException .NodeExistsException e ) {
204237 // do nothing...some other server has created znodes
238+ LOG .warn (e .getMessage (), e );
205239 } catch (Exception e ) {
206- LOG .error (e );
207- System . exit ( 0 ) ;
240+ LOG .error (e . getMessage (), e );
241+ return 0 ;
208242 }
209243
210244 metrics = new Metrics ();
@@ -213,10 +247,10 @@ public void run() {
213247 try {
214248 netConf = new DcsNetworkConfiguration (conf );
215249 serverName = netConf .getHostName ();
216- if (serverName == null ) {
250+ if (serverName == null ) {
217251 LOG .error ("DNS Interface [" + conf .get (Constants .DCS_DNS_INTERFACE , Constants .DEFAULT_DCS_DNS_INTERFACE )
218- + "] configured in dcs.site.xml is not found!" );
219- System . exit ( 1 ) ;
252+ + "] configured in dcs.site.xml is not found!" );
253+ return - 1 ;
220254 }
221255
222256 // Wait to become the leader of all DcsMasters
@@ -229,6 +263,11 @@ public void run() {
229263 + ":" + startTime ;
230264 zkc .create (path , new byte [0 ], ZooDefs .Ids .OPEN_ACL_UNSAFE ,
231265 CreateMode .EPHEMERAL );
266+ // Register a check path here to handle the session-expired situation:
267+ // if the session expires, this mark is compared with the existing znode,
268+ // and a mismatch means a backup master has taken over the master role.
269+ zkc .setCheckPath (path );
270+
232271 LOG .info ("Created znode [" + path + "]" );
233272
234273 int requestTimeout = conf .getInt (
@@ -262,12 +301,50 @@ public void run() {
262301 future .get ();// block
263302
264303 } catch (Exception e ) {
265- LOG .error (e );
266- e .printStackTrace ();
267- if (pool != null )
268- pool .shutdown ();
269- System .exit (0 );
304+ LOG .error (e .getMessage (), e );
305+ try {
306+ FloatingIp floatingIp = FloatingIp .getInstance (this );
307+ floatingIp .unbindScript ();
308+ } catch (Exception e1 ) {
309+ if (LOG .isErrorEnabled ()) {
310+ LOG .error ("Error creating class FloatingIp [" + e1 .getMessage () + "]" , e1 );
311+ }
312+ }
313+
314+ if (pool != null ) {
315+ try {
316+ pool .shutdown ();
317+ LOG .info ("Interrupt listenerService." );
318+ } catch (Exception e2 ) {
319+ LOG .error ("Error while shutdown ServerManager thread [" + e2 .getMessage () + "]" , e2 );
320+ }
321+ }
322+
323+ if (ls != null ) {
324+ try {
325+ ListenerWorker lw = ls .getWorker ();
326+ if (lw != null ) {
327+ lw .interrupt ();
328+ LOG .info ("Interrupt listenerWorker." );
329+ }
330+ ls .interrupt ();
331+ LOG .info ("Interrupt listenerService." );
332+ } catch (Exception e2 ) {
333+ LOG .error ("Error while shutdown ListenerService thread [" + e2 .getMessage () + "]" , e2 );
334+ }
335+ }
336+ if (infoServer != null ) {
337+ try {
338+ infoServer .stop ();
339+ LOG .info ("Stop infoServer." );
340+ } catch (Exception e2 ) {
341+ LOG .error ("Error while shutdown InfoServer thread [" + e2 .getMessage (), e2 );
342+ }
343+ }
344+ return 1 ;
345+
270346 }
347+ return 0 ;
271348 }
272349
273350 public String getServerName () {
0 commit comments