diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index b287f946134..f9a7496ac0d 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -236,6 +237,9 @@ public class GobblinYarnAppLauncher { // This flag tells if the Yarn application has already completed. This is used to // tell if it is necessary to send a shutdown message to the ApplicationMaster. private volatile boolean applicationCompleted = false; + // Released by the status monitor once the application reaches a terminal state (or the launcher is stopped); + // in attached mode launch()/run() blocks on this so it represents the full application lifecycle. + private final CountDownLatch applicationTerminalLatch = new CountDownLatch(1); private volatile boolean stopped = false; @@ -408,6 +412,28 @@ public void launch() throws IOException, YarnException, InterruptedException { handleApplicationLaunchFailure(e); throw e; } + + // In attached mode, block the calling (launch()/run()) thread until the status monitor signals that the + // application has reached a terminal state (or the launcher was otherwise stopped), then tear down. Running + // stop() here -- on the calling thread rather than the monitor thread -- shuts the (non-daemon) status + // monitor and the launcher's services down before launch() returns, so no launcher-owned thread is left to + // keep the JVM alive. Detached launches return right after submission, leaving the application running. + if (!this.detachOnExitEnabled) { + LOGGER.info("Waiting for Gobblin Yarn application {} to reach a terminal state before exiting the launcher", + this.applicationId.get()); + try { + this.applicationTerminalLatch.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOGGER.info("Interrupted while waiting for Gobblin Yarn application {} to complete", this.applicationId.get()); + } + LOGGER.info("Gobblin Yarn application monitoring complete; stopping the launcher"); + try { + stop(); + } catch (IOException | TimeoutException e) { + LOGGER.error("Failed to stop " + GobblinYarnAppLauncher.class.getSimpleName() + " after application monitoring", e); + } + } } public boolean isApplicationRunning() { @@ -460,6 +486,10 @@ public synchronized void stop() throws IOException, TimeoutException { return; } + // Release the launch()/run() thread if it is blocked waiting for the application to finish (e.g. on an + // external cancel/shutdown) so it can return. + this.applicationTerminalLatch.countDown(); + LOGGER.info("Stopping the " + GobblinYarnAppLauncher.class.getSimpleName()); try { @@ -522,17 +552,22 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap handleTerminalAppStatus(applicationReport); - try { - GobblinYarnAppLauncher.this.stop(); - } catch (IOException ioe) { - LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe); - } catch (TimeoutException te) { - LOGGER.error("Timeout in stopping the service manager", te); - } finally { - if (this.emailNotificationOnShutdown) { - sendEmailOnShutdown(Optional.of(applicationReport)); + // Detached launches have no caller blocked on the latch, so stop here as before. Attached launches are + // torn down by the launch()/run() thread once it is released by the latch below -- running stop() off this + // monitor thread avoids a self-shutdown of this executor. + if (this.detachOnExitEnabled) { + try { + GobblinYarnAppLauncher.this.stop(); + } catch (IOException ioe) { + LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe); + } catch (TimeoutException te) { + LOGGER.error("Timeout in stopping the service manager", te); } } + if (this.emailNotificationOnShutdown) { + sendEmailOnShutdown(Optional.of(applicationReport)); + } + this.applicationTerminalLatch.countDown(); } } @@ -611,17 +646,21 @@ public void handleGetApplicationReportFailureEvent( handleLostAmVisibility(); - try { - stop(); - } catch (IOException ioe) { - LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe); - } catch (TimeoutException te) { - LOGGER.error("Timeout in stopping the service manager", te); - } finally { - if (this.emailNotificationOnShutdown) { - sendEmailOnShutdown(Optional.absent()); + // See handleApplicationReportArrivalEvent: detached launches stop here; attached launches are torn down by + // the launch()/run() thread once the latch below releases it (off this monitor thread). + if (this.detachOnExitEnabled) { + try { + stop(); + } catch (IOException ioe) { + LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe); + } catch (TimeoutException te) { + LOGGER.error("Timeout in stopping the service manager", te); } } + if (this.emailNotificationOnShutdown) { + sendEmailOnShutdown(Optional.absent()); + } + this.applicationTerminalLatch.countDown(); } } diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java index 67537934a04..bba44a6fb60 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java @@ -18,16 +18,23 @@ package org.apache.gobblin.yarn; import java.lang.reflect.Field; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Optional; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.mockito.Mockito; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent; +import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent; import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.mock; @@ -166,4 +173,61 @@ public void testTerminalHandledGuardDispatchesOnlyOnce() throws Exception { Mockito.verify(this.launcher, Mockito.times(1)) .onTerminalApplicationStatus(Mockito.any(), Mockito.any()); } + + // ---------- applicationTerminalLatch: monitor releases the launch()/run() thread, off the monitor thread ---------- + + private ApplicationReport terminalReport(FinalApplicationStatus status) { + ApplicationReport report = mockReport(status); + when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED); + when(report.getDiagnostics()).thenReturn(""); + return report; + } + + @Test + public void testTerminalReportReleasesLatchAndDoesNotStopWhenAttached() throws Exception { + // Attached (detachOnExitEnabled defaults to false): the monitor signals the latch and does NOT call stop() + // itself -- the blocked launch()/run() thread performs teardown, avoiding a self-shutdown of the monitor. + CountDownLatch latch = new CountDownLatch(1); + setField("applicationTerminalLatch", latch); + setField("getApplicationReportFailureCount", new AtomicInteger(0)); + setField("helixClusterLifecycleManager", Optional.absent()); + + this.launcher.handleApplicationReportArrivalEvent( + new ApplicationReportArrivalEvent(terminalReport(FinalApplicationStatus.SUCCEEDED))); + + assertEquals(latch.getCount(), 0); + Mockito.verify(this.launcher, Mockito.never()).stop(); + } + + @Test + public void testLostAmReleasesLatchAndDoesNotStopWhenAttached() throws Exception { + // Regression guard: the lost-AM-visibility path stops/releases without ever setting applicationCompleted, so + // the latch must still be released (otherwise launch()/run() would block forever). + CountDownLatch latch = new CountDownLatch(1); + setField("applicationTerminalLatch", latch); + setField("getApplicationReportFailureCount", new AtomicInteger(0)); // maxGetApplicationReportFailures defaults to 0 + + this.launcher.handleGetApplicationReportFailureEvent( + new GetApplicationReportFailureEvent(new RuntimeException("rm unreachable"))); + + assertEquals(latch.getCount(), 0); + Mockito.verify(this.launcher, Mockito.never()).stop(); + } + + @Test + public void testTerminalReportStopsDirectlyWhenDetached() throws Exception { + // Detached: no caller is blocked on the latch, so the monitor tears down here as before. + CountDownLatch latch = new CountDownLatch(1); + setField("applicationTerminalLatch", latch); + setField("getApplicationReportFailureCount", new AtomicInteger(0)); + setField("helixClusterLifecycleManager", Optional.absent()); + setField("detachOnExitEnabled", true); + Mockito.doNothing().when(this.launcher).stop(); + + this.launcher.handleApplicationReportArrivalEvent( + new ApplicationReportArrivalEvent(terminalReport(FinalApplicationStatus.SUCCEEDED))); + + Mockito.verify(this.launcher, Mockito.times(1)).stop(); + assertEquals(latch.getCount(), 0); + } }