From 803364e3d6b9e130412899de7cd578f206d25202 Mon Sep 17 00:00:00 2001
From: pratapaditya04
Date: Wed, 24 Jun 2026 14:55:20 +0530
Subject: [PATCH] [GOBBLIN-2267] Block GobblinYarnAppLauncher launch() until
 the YARN application is terminal

GobblinYarnAppLauncher.launch() returned as soon as the YARN application
was submitted while the status monitor kept polling on a background
non-daemon thread. For the Azkaban submitter path, run() therefore
returned at submission time, and the launcher-owned non-daemon threads
(the status monitor, the ServiceManager, the YARN client) kept the JVM
alive for the whole job, so the process hung after run() finished. The
status monitor also called stop() from its own worker thread on a
terminal report, which then awaited termination of its own executor and
stalled for the full timeout.

Add a CountDownLatch that the status monitor releases once the
application reaches a terminal state (or the launcher is otherwise
stopped, e.g. lost AM visibility). In attached mode launch()/run() blocks
on the latch and then runs stop() on the calling thread, so teardown
shuts the (non-daemon) monitor and the launcher services down before
launch() returns: no launcher-owned thread is left to keep the JVM alive,
and stop() no longer runs on the monitor thread, removing the
self-shutdown stall. Detached launches return right after submission and
are torn down by the monitor as before.

Add unit tests covering that a terminal or lost-AM report releases the
latch without calling stop() on the monitor thread (attached), and that a
detached launch still stops on the monitor thread.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 .../gobblin/yarn/GobblinYarnAppLauncher.java  | 83 +++++++++++++++----
 ...GobblinYarnAppLauncherTerminalGteTest.java | 64 ++++++++++++++
 2 files changed, 129 insertions(+), 18 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index b287f946134..f9a7496ac0d 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -236,6 +237,9 @@ public class GobblinYarnAppLauncher {
   // This flag tells if the Yarn application has already completed. This is used to
   // tell if it is necessary to send a shutdown message to the ApplicationMaster.
   private volatile boolean applicationCompleted = false;
+  // Released by the status monitor once the application reaches a terminal state (or the launcher is stopped);
+  // in attached mode launch()/run() blocks on this so it represents the full application lifecycle.
+  private final CountDownLatch applicationTerminalLatch = new CountDownLatch(1);
 
   private volatile boolean stopped = false;
 
@@ -408,6 +412,28 @@ public void launch() throws IOException, YarnException, InterruptedException {
       handleApplicationLaunchFailure(e);
       throw e;
     }
+
+    // In attached mode, block the calling (launch()/run()) thread until the status monitor signals that the
+    // application has reached a terminal state (or the launcher was otherwise stopped), then tear down. Running
+    // stop() here -- on the calling thread rather than the monitor thread -- shuts the (non-daemon) status
+    // monitor and the launcher's services down before launch() returns, so no launcher-owned thread is left to
+    // keep the JVM alive. Detached launches return right after submission, leaving the application running.
+    if (!this.detachOnExitEnabled) {
+      LOGGER.info("Waiting for Gobblin Yarn application {} to reach a terminal state before exiting the launcher",
+          this.applicationId.get());
+      try {
+        this.applicationTerminalLatch.await();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        LOGGER.info("Interrupted while waiting for Gobblin Yarn application {} to complete", this.applicationId.get());
+      }
+      LOGGER.info("Gobblin Yarn application monitoring complete; stopping the launcher");
+      try {
+        stop();
+      } catch (IOException | TimeoutException e) {
+        LOGGER.error("Failed to stop " + GobblinYarnAppLauncher.class.getSimpleName() + " after application monitoring", e);
+      }
+    }
   }
 
   public boolean isApplicationRunning() {
@@ -460,6 +486,10 @@ public synchronized void stop() throws IOException, TimeoutException {
       return;
     }
 
+    // Release the launch()/run() thread if it is blocked waiting for the application to finish (e.g. on an
+    // external cancel/shutdown) so it can return.
+    this.applicationTerminalLatch.countDown();
+
     LOGGER.info("Stopping the " + GobblinYarnAppLauncher.class.getSimpleName());
 
     try {
@@ -522,17 +552,26 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap
 
       handleTerminalAppStatus(applicationReport);
 
-      try {
-        GobblinYarnAppLauncher.this.stop();
-      } catch (IOException ioe) {
-        LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
-      } catch (TimeoutException te) {
-        LOGGER.error("Timeout in stopping the service manager", te);
-      } finally {
-        if (this.emailNotificationOnShutdown) {
-          sendEmailOnShutdown(Optional.of(applicationReport));
+      // Detached launches have no caller blocked on the latch, so stop here as before. Attached launches are
+      // torn down by the launch()/run() thread once it is released by the latch below -- running stop() off this
+      // monitor thread avoids a self-shutdown of this executor. The latch is released in a finally block so that
+      // an unexpected failure while stopping or sending the email cannot leave launch()/run() blocked forever.
+      try {
+        if (this.detachOnExitEnabled) {
+          try {
+            GobblinYarnAppLauncher.this.stop();
+          } catch (IOException ioe) {
+            LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
+          } catch (TimeoutException te) {
+            LOGGER.error("Timeout in stopping the service manager", te);
+          }
+        }
+        if (this.emailNotificationOnShutdown) {
+          sendEmailOnShutdown(Optional.of(applicationReport));
         }
+      } finally {
+        this.applicationTerminalLatch.countDown();
       }
     }
   }
 
@@ -611,17 +650,25 @@ public void handleGetApplicationReportFailureEvent(
 
       handleLostAmVisibility();
 
-      try {
-        stop();
-      } catch (IOException ioe) {
-        LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
-      } catch (TimeoutException te) {
-        LOGGER.error("Timeout in stopping the service manager", te);
-      } finally {
-        if (this.emailNotificationOnShutdown) {
-          sendEmailOnShutdown(Optional.absent());
+      // See handleApplicationReportArrivalEvent: detached launches stop here; attached launches are torn down by
+      // the launch()/run() thread once the latch below releases it (off this monitor thread). The finally block
+      // guarantees the release even if stopping or sending the email fails unexpectedly.
+      try {
+        if (this.detachOnExitEnabled) {
+          try {
+            stop();
+          } catch (IOException ioe) {
+            LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
+          } catch (TimeoutException te) {
+            LOGGER.error("Timeout in stopping the service manager", te);
+          }
+        }
+        if (this.emailNotificationOnShutdown) {
+          sendEmailOnShutdown(Optional.absent());
         }
+      } finally {
+        this.applicationTerminalLatch.countDown();
       }
     }
   }
 
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java
index 67537934a04..bba44a6fb60 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java
@@ -18,16 +18,23 @@
 package org.apache.gobblin.yarn;
 
 import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Optional;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.mockito.Mockito;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
+import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
 
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.mock;
@@ -166,4 +173,61 @@ public void testTerminalHandledGuardDispatchesOnlyOnce() throws Exception {
     Mockito.verify(this.launcher, Mockito.times(1))
         .onTerminalApplicationStatus(Mockito.any(), Mockito.any());
   }
+
+  // ---------- applicationTerminalLatch: monitor releases the launch()/run() thread, off the monitor thread ----------
+
+  private ApplicationReport terminalReport(FinalApplicationStatus status) {
+    ApplicationReport report = mockReport(status);
+    when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+    when(report.getDiagnostics()).thenReturn("");
+    return report;
+  }
+
+  @Test
+  public void testTerminalReportReleasesLatchAndDoesNotStopWhenAttached() throws Exception {
+    // Attached (detachOnExitEnabled defaults to false): the monitor signals the latch and does NOT call stop()
+    // itself -- the blocked launch()/run() thread performs teardown, avoiding a self-shutdown of the monitor.
+    CountDownLatch latch = new CountDownLatch(1);
+    setField("applicationTerminalLatch", latch);
+    setField("getApplicationReportFailureCount", new AtomicInteger(0));
+    setField("helixClusterLifecycleManager", Optional.absent());
+
+    this.launcher.handleApplicationReportArrivalEvent(
+        new ApplicationReportArrivalEvent(terminalReport(FinalApplicationStatus.SUCCEEDED)));
+
+    assertEquals(latch.getCount(), 0);
+    Mockito.verify(this.launcher, Mockito.never()).stop();
+  }
+
+  @Test
+  public void testLostAmReleasesLatchAndDoesNotStopWhenAttached() throws Exception {
+    // Regression guard: the lost-AM-visibility path stops/releases without ever setting applicationCompleted, so
+    // the latch must still be released (otherwise launch()/run() would block forever).
+    CountDownLatch latch = new CountDownLatch(1);
+    setField("applicationTerminalLatch", latch);
+    setField("getApplicationReportFailureCount", new AtomicInteger(0)); // maxGetApplicationReportFailures defaults to 0
+
+    this.launcher.handleGetApplicationReportFailureEvent(
+        new GetApplicationReportFailureEvent(new RuntimeException("rm unreachable")));
+
+    assertEquals(latch.getCount(), 0);
+    Mockito.verify(this.launcher, Mockito.never()).stop();
+  }
+
+  @Test
+  public void testTerminalReportStopsDirectlyWhenDetached() throws Exception {
+    // Detached: no caller is blocked on the latch, so the monitor tears down here as before.
+    CountDownLatch latch = new CountDownLatch(1);
+    setField("applicationTerminalLatch", latch);
+    setField("getApplicationReportFailureCount", new AtomicInteger(0));
+    setField("helixClusterLifecycleManager", Optional.absent());
+    setField("detachOnExitEnabled", true);
+    Mockito.doNothing().when(this.launcher).stop();
+
+    this.launcher.handleApplicationReportArrivalEvent(
+        new ApplicationReportArrivalEvent(terminalReport(FinalApplicationStatus.SUCCEEDED)));
+
+    Mockito.verify(this.launcher, Mockito.times(1)).stop();
+    assertEquals(latch.getCount(), 0);
+  }
 }