Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -236,6 +237,9 @@ public class GobblinYarnAppLauncher {
// This flag tells if the Yarn application has already completed. This is used to
// tell if it is necessary to send a shutdown message to the ApplicationMaster.
private volatile boolean applicationCompleted = false;
// Released by the status monitor once the application reaches a terminal state (or the launcher is stopped);
// in attached mode launch()/run() blocks on this so it represents the full application lifecycle.
private final CountDownLatch applicationTerminalLatch = new CountDownLatch(1);

private volatile boolean stopped = false;

Expand Down Expand Up @@ -408,6 +412,28 @@ public void launch() throws IOException, YarnException, InterruptedException {
handleApplicationLaunchFailure(e);
throw e;
}

// In attached mode, block the calling (launch()/run()) thread until the status monitor signals that the
// application has reached a terminal state (or the launcher was otherwise stopped), then tear down. Running
// stop() here -- on the calling thread rather than the monitor thread -- shuts the (non-daemon) status
// monitor and the launcher's services down before launch() returns, so no launcher-owned thread is left to
// keep the JVM alive. Detached launches return right after submission, leaving the application running.
if (!this.detachOnExitEnabled) {
LOGGER.info("Waiting for Gobblin Yarn application {} to reach a terminal state before exiting the launcher",
this.applicationId.get());
try {
this.applicationTerminalLatch.await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOGGER.info("Interrupted while waiting for Gobblin Yarn application {} to complete", this.applicationId.get());
}
LOGGER.info("Gobblin Yarn application monitoring complete; stopping the launcher");
try {
stop();
} catch (IOException | TimeoutException e) {
LOGGER.error("Failed to stop " + GobblinYarnAppLauncher.class.getSimpleName() + " after application monitoring", e);
}
}
}

public boolean isApplicationRunning() {
Expand Down Expand Up @@ -460,6 +486,10 @@ public synchronized void stop() throws IOException, TimeoutException {
return;
}

// Release the launch()/run() thread if it is blocked waiting for the application to finish (e.g. on an
// external cancel/shutdown) so it can return.
this.applicationTerminalLatch.countDown();

LOGGER.info("Stopping the " + GobblinYarnAppLauncher.class.getSimpleName());

try {
Expand Down Expand Up @@ -522,17 +552,22 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap

handleTerminalAppStatus(applicationReport);

try {
GobblinYarnAppLauncher.this.stop();
} catch (IOException ioe) {
LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
} catch (TimeoutException te) {
LOGGER.error("Timeout in stopping the service manager", te);
} finally {
if (this.emailNotificationOnShutdown) {
sendEmailOnShutdown(Optional.of(applicationReport));
// Detached launches have no caller blocked on the latch, so stop here as before. Attached launches are
// torn down by the launch()/run() thread once it is released by the latch below -- running stop() off this
// monitor thread avoids a self-shutdown of this executor.
if (this.detachOnExitEnabled) {
try {
GobblinYarnAppLauncher.this.stop();
} catch (IOException ioe) {
LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
} catch (TimeoutException te) {
LOGGER.error("Timeout in stopping the service manager", te);
}
}
if (this.emailNotificationOnShutdown) {
sendEmailOnShutdown(Optional.of(applicationReport));
}
this.applicationTerminalLatch.countDown();
}
}

Expand Down Expand Up @@ -611,17 +646,21 @@ public void handleGetApplicationReportFailureEvent(

handleLostAmVisibility();

try {
stop();
} catch (IOException ioe) {
LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
} catch (TimeoutException te) {
LOGGER.error("Timeout in stopping the service manager", te);
} finally {
if (this.emailNotificationOnShutdown) {
sendEmailOnShutdown(Optional.<ApplicationReport>absent());
// See handleApplicationReportArrivalEvent: detached launches stop here; attached launches are torn down by
// the launch()/run() thread once the latch below releases it (off this monitor thread).
if (this.detachOnExitEnabled) {
try {
stop();
} catch (IOException ioe) {
LOGGER.error("Failed to close the " + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
} catch (TimeoutException te) {
LOGGER.error("Timeout in stopping the service manager", te);
}
}
if (this.emailNotificationOnShutdown) {
sendEmailOnShutdown(Optional.<ApplicationReport>absent());
}
this.applicationTerminalLatch.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@
package org.apache.gobblin.yarn;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Optional;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;

import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -166,4 +173,61 @@ public void testTerminalHandledGuardDispatchesOnlyOnce() throws Exception {
Mockito.verify(this.launcher, Mockito.times(1))
.onTerminalApplicationStatus(Mockito.any(), Mockito.any());
}

// ---------- applicationTerminalLatch: monitor releases the launch()/run() thread, off the monitor thread ----------

private ApplicationReport terminalReport(FinalApplicationStatus status) {
ApplicationReport report = mockReport(status);
when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
when(report.getDiagnostics()).thenReturn("");
return report;
}

@Test
public void testTerminalReportReleasesLatchAndDoesNotStopWhenAttached() throws Exception {
// Attached (detachOnExitEnabled defaults to false): the monitor signals the latch and does NOT call stop()
// itself -- the blocked launch()/run() thread performs teardown, avoiding a self-shutdown of the monitor.
CountDownLatch latch = new CountDownLatch(1);
setField("applicationTerminalLatch", latch);
setField("getApplicationReportFailureCount", new AtomicInteger(0));
setField("helixClusterLifecycleManager", Optional.absent());

this.launcher.handleApplicationReportArrivalEvent(
new ApplicationReportArrivalEvent(terminalReport(FinalApplicationStatus.SUCCEEDED)));

assertEquals(latch.getCount(), 0);
Mockito.verify(this.launcher, Mockito.never()).stop();
}

@Test
public void testLostAmReleasesLatchAndDoesNotStopWhenAttached() throws Exception {
// Regression guard: the lost-AM-visibility path stops/releases without ever setting applicationCompleted, so
// the latch must still be released (otherwise launch()/run() would block forever).
CountDownLatch latch = new CountDownLatch(1);
setField("applicationTerminalLatch", latch);
setField("getApplicationReportFailureCount", new AtomicInteger(0)); // maxGetApplicationReportFailures defaults to 0

this.launcher.handleGetApplicationReportFailureEvent(
new GetApplicationReportFailureEvent(new RuntimeException("rm unreachable")));

assertEquals(latch.getCount(), 0);
Mockito.verify(this.launcher, Mockito.never()).stop();
}

@Test
public void testTerminalReportStopsDirectlyWhenDetached() throws Exception {
// Detached: no caller is blocked on the latch, so the monitor tears down here as before.
CountDownLatch latch = new CountDownLatch(1);
setField("applicationTerminalLatch", latch);
setField("getApplicationReportFailureCount", new AtomicInteger(0));
setField("helixClusterLifecycleManager", Optional.absent());
setField("detachOnExitEnabled", true);
Mockito.doNothing().when(this.launcher).stop();

this.launcher.handleApplicationReportArrivalEvent(
new ApplicationReportArrivalEvent(terminalReport(FinalApplicationStatus.SUCCEEDED)));

Mockito.verify(this.launcher, Mockito.times(1)).stop();
assertEquals(latch.getCount(), 0);
}
}
Loading