Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@
<version>1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>mock-slave</artifactId>
<version>153.v9768799a_2294</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ static final class Execution extends AbstractStepExecutionImpl implements Runnab
} else {
LOGGER.fine(() -> "rediscovering that " + node + " has been removed and timeout has expired");
listener().getLogger().println(node + " has been removed for " + Util.getTimeSpanString(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS) + ", assuming it is not coming back");
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeCause());
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeTimeoutCause());
}
}
removedNodeDiscovered = 0; // something else; reset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected Executor tryResolve() throws Exception {
Queue.getInstance().cancel(item);
owner.getListener().getLogger().printf("Killed %s after waiting for %s because we assume unknown agent %s is never going to appear%n",
item.task.getDisplayName(), Util.getTimeSpanString(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS), placeholder.getAssignedLabel());
throw new FlowInterruptedException(Result.ABORTED, new ExecutorStepExecution.RemovedNodeCause());
throw new FlowInterruptedException(Result.ABORTED, new ExecutorStepExecution.RemovedNodeTimeoutCause());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.stream.Stream;
import jenkins.model.CauseOfInterruption;
import org.jenkinsci.Symbol;
import org.jenkinsci.plugins.workflow.flow.ErrorCondition;
import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException;
Expand All @@ -57,8 +58,7 @@ public final class AgentErrorCondition extends ErrorCondition {
if (t instanceof AgentOfflineException) {
return true;
}
if (t instanceof FlowInterruptedException && ((FlowInterruptedException) t).getCauses().stream().anyMatch(
c -> c instanceof ExecutorStepExecution.RemovedNodeCause || c instanceof ExecutorStepExecution.QueueTaskCancelled)) {
if (t instanceof FlowInterruptedException && ((FlowInterruptedException) t).getCauses().stream().anyMatch(Retryable.class::isInstance)) {
return true;
}
if (isClosedChannelException(t)) {
Expand Down Expand Up @@ -90,6 +90,11 @@ private static boolean isClosedChannelException(Throwable t) {
}
}

/**
* A marker interface for {@link CauseOfInterruption} instances that can be retried through {@link AgentErrorCondition}.
*/
public interface Retryable {}

@Symbol("agent")
@Extension public static final class DescriptorImpl extends ErrorConditionDescriptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void resume(StepContext context) throws Exception {
exec = item.getFuture().getStartCondition().get(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS, TimeUnit.MILLISECONDS);
} catch (TimeoutException x) {
listener.getLogger().println(node + " has been removed for " + Util.getTimeSpanString(ExecutorStepExecution.TIMEOUT_WAITING_FOR_NODE_MILLIS) + ", assuming it is not coming back");
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeCause());
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.RemovedNodeTimeoutCause());
} catch (CancellationException x) {
LOGGER.log(Level.FINE, "ceased to wait for " + node, x);
throw new FlowInterruptedException(Result.ABORTED, /* TODO false probably more appropriate */true, new ExecutorStepExecution.QueueTaskCancelled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import hudson.model.ResourceList;
import hudson.model.Result;
import hudson.model.Run;
import hudson.model.Slave;
import hudson.model.TaskListener;
import hudson.model.TopLevelItem;
import hudson.model.User;
Expand All @@ -38,6 +39,7 @@
import hudson.security.ACLContext;
import hudson.security.AccessControlled;
import hudson.security.Permission;
import hudson.slaves.AbstractCloudSlave;
import hudson.slaves.OfflineCause;
import hudson.slaves.WorkspaceList;
import java.io.IOException;
Expand Down Expand Up @@ -74,6 +76,7 @@
import org.acegisecurity.Authentication;
import org.jenkinsci.plugins.durabletask.executors.ContinuableExecutable;
import org.jenkinsci.plugins.durabletask.executors.ContinuedTask;
import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy;
import org.jenkinsci.plugins.workflow.actions.LabelAction;
import org.jenkinsci.plugins.workflow.actions.QueueItemAction;
import org.jenkinsci.plugins.workflow.actions.ThreadNameAction;
Expand Down Expand Up @@ -334,7 +337,7 @@ public void stop(@NonNull Throwable cause) throws Exception {

}

public static final class QueueTaskCancelled extends CauseOfInterruption {
public static final class QueueTaskCancelled extends RetryableCauseOfInterruption {
@Override public String getShortDescription() {
return Messages.ExecutorStepExecution_queue_task_cancelled();
}
Expand All @@ -346,51 +349,75 @@ public static final class QueueTaskCancelled extends CauseOfInterruption {
return;
}
LOGGER.fine(() -> "received node deletion event on " + node.getNodeName());
Timer.get().schedule(() -> {
Computer c = node.toComputer();
if (c == null || c.isOnline()) {
LOGGER.fine(() -> "computer for " + node.getNodeName() + " was missing or online, skipping");
return;
}
LOGGER.fine(() -> "processing node deletion event on " + node.getNodeName());
for (Executor e : c.getExecutors()) {
Queue.Executable exec = e.getCurrentExecutable();
if (exec instanceof PlaceholderTask.PlaceholderExecutable) {
PlaceholderTask task = ((PlaceholderTask.PlaceholderExecutable) exec).getParent();
TaskListener listener;
try {
listener = task.context.get(TaskListener.class);
} catch (Exception x) {
LOGGER.log(Level.WARNING, null, x);
continue;
}
task.withExecution(execution -> {
BodyExecution body = execution.body;
if (body == null) {
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted, but do not have a node body to cancel");
return;
}
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted; cancelling node body");
if (Util.isOverridden(BodyExecution.class, body.getClass(), "cancel", Throwable.class)) {
body.cancel(new FlowInterruptedException(Result.ABORTED, false, new RemovedNodeCause()));
} else { // TODO remove once https://github.com/jenkinsci/workflow-cps-plugin/pull/570 is widely deployed
body.cancel(new RemovedNodeCause());
}
});
if (isOneShotAgent(node)) {
LOGGER.fine(() -> "Cancelling owner run for one-shot agent " + node.getNodeName() + " immediately");
cancelOwnerExecution(node, new RemovedNodeCause());
Comment on lines +352 to +354
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(The crucial part FTR.)

} else {
LOGGER.fine(() -> "Will cancel owner run for agent " + node.getNodeName() + " after waiting for " + TIMEOUT_WAITING_FOR_NODE_MILLIS + "ms");
Timer.get().schedule(() -> cancelOwnerExecution(node, new RemovedNodeCause()), TIMEOUT_WAITING_FOR_NODE_MILLIS, TimeUnit.MILLISECONDS);
}
}

private static boolean isOneShotAgent(Node node) {
return node instanceof AbstractCloudSlave ||
(node instanceof Slave && ((Slave) node).getRetentionStrategy() instanceof OnceRetentionStrategy);
Comment on lines +362 to +363
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this heuristic does not match EC2AbstractSlave extends Slave nor EC2RetentionStrategy extends RetentionStrategy. I guess we need to hard-code support for those nonstandard implementations. CC @car-roll

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we start introducing a marker interface for this usage?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, though I think it would suffice for EC2AbstractSlave to extend AbstractCloudSlave and EC2Computer to extend AbstractCloudComputer, with some minor refactoring to delete then-redundant logic. CloudBees-internal reference

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

private static void cancelOwnerExecution(Node node, CauseOfInterruption... causes) {
Computer c = node.toComputer();
if (c == null || c.isOnline()) {
LOGGER.fine(() -> "computer for " + node.getNodeName() + " was missing or online, skipping");
return;
}
LOGGER.fine(() -> "processing node deletion event on " + node.getNodeName());
for (Executor e : c.getExecutors()) {
Queue.Executable exec = e.getCurrentExecutable();
if (exec instanceof PlaceholderTask.PlaceholderExecutable) {
PlaceholderTask task = ((PlaceholderTask.PlaceholderExecutable) exec).getParent();
TaskListener listener;
try {
listener = task.context.get(TaskListener.class);
} catch (Exception x) {
LOGGER.log(Level.WARNING, null, x);
continue;
}
task.withExecution(execution -> {
BodyExecution body = execution.body;
if (body == null) {
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted, but do not have a node body to cancel");
return;
}
listener.getLogger().println("Agent " + node.getNodeName() + " was deleted; cancelling node body");
if (Util.isOverridden(BodyExecution.class, body.getClass(), "cancel", Throwable.class)) {
body.cancel(new FlowInterruptedException(Result.ABORTED, false, causes));
} else { // TODO remove once https://github.com/jenkinsci/workflow-cps-plugin/pull/570 is widely deployed
body.cancel(causes);
}
});
}
}, TIMEOUT_WAITING_FOR_NODE_MILLIS, TimeUnit.MILLISECONDS);
}
}
}

public static final class RemovedNodeCause extends CauseOfInterruption {
public static final class RemovedNodeCause extends RetryableCauseOfInterruption {
@SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", justification = "deliberately mutable")
public static boolean ENABLED = Boolean.parseBoolean(System.getProperty(ExecutorStepExecution.class.getName() + ".REMOVED_NODE_DETECTION", "true"));
@Override public String getShortDescription() {
return "Agent was removed";
}
}

public static final class RemovedNodeTimeoutCause extends RetryableCauseOfInterruption {
@Override public String getShortDescription() {
return "Timeout waiting for agent to come back";
}
}

/**
* Base class for a cause of interruption that can be retried via {@link AgentErrorCondition}.
*/
private abstract static class RetryableCauseOfInterruption extends CauseOfInterruption implements AgentErrorCondition.Retryable {}

/** Transient handle of a running executor task. */
private static final class RunningTask {
/** null until placeholder executable runs */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,38 @@

package org.jenkinsci.plugins.workflow.support.steps;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import hudson.model.Label;
import hudson.model.Queue;
import hudson.model.Result;
import hudson.slaves.DumbSlave;
import hudson.slaves.RetentionStrategy;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import jenkins.model.InterruptedBuildAction;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.anyOf;
import org.jenkinci.plugins.mock_slave.MockCloud;
import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.flow.FlowExecutionList;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.JenkinsSessionRule;
Expand All @@ -60,9 +65,12 @@ public class ExecutorStepDynamicContextTest {

@ClassRule public static BuildWatcher buildWatcher = new BuildWatcher();
@Rule public JenkinsSessionRule sessions = new JenkinsSessionRule();
@Rule public TemporaryFolder tmp = new TemporaryFolder();
@Rule public LoggerRule logging = new LoggerRule();

private void commonSetup() {
logging.recordPackage(ExecutorStepExecution.class, Level.FINE).record(FlowExecutionList.class, Level.FINE);
}

@Test public void canceledQueueItem() throws Throwable {
sessions.then(j -> {
DumbSlave s = j.createSlave(Label.get("remote"));
Expand All @@ -75,11 +83,7 @@ public class ExecutorStepDynamicContextTest {
sessions.then(j -> {
SemaphoreStep.success("wait/1", null);
WorkflowRun b = j.jenkins.getItemByFullName("p", WorkflowJob.class).getBuildByNumber(1);
while (Queue.getInstance().getItems().length == 0) {
Thread.sleep(100);
}
Queue.Item[] items = Queue.getInstance().getItems();
assertEquals(1, items.length);
var items = await().timeout(Duration.ofMinutes(1)).until(() -> j.jenkins.getQueue().getItems(), arrayWithSize(1));
Queue.getInstance().cancel(items[0]);
j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(b));
InterruptedBuildAction iba = b.getAction(InterruptedBuildAction.class);
Expand All @@ -98,7 +102,7 @@ public class ExecutorStepDynamicContextTest {
*/
@Issue("JENKINS-36013")
@Test public void normalNodeDisappearance() throws Throwable {
logging.recordPackage(ExecutorStepExecution.class, Level.FINE).record(FlowExecutionList.class, Level.FINE);
commonSetup();
sessions.then(j -> {
// Start up a build that needs executor and then reboot and take the node offline
// Starting job first ensures we don't immediately fail if Node comes from a Cloud
Expand All @@ -114,20 +118,19 @@ public class ExecutorStepDynamicContextTest {
sessions.then(j -> {
// Start up a build and then reboot and take the node offline
assertEquals(0, j.jenkins.getLabel("ghost").getNodes().size()); // Make sure test impl is correctly deleted
assertNull(j.jenkins.getNode("ghost")); // Make sure test impl is correctly deleted
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useless assertion, since the node would just have the label ghost but usually named slave0 based on generation.

WorkflowRun run = j.jenkins.getItemByFullName("p", WorkflowJob.class).getLastBuild();
j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(run));
j.assertLogContains("slave0 has been removed for ", run);
assertThat(j.jenkins.getQueue().getItems(), emptyArray());
InterruptedBuildAction iba = run.getAction(InterruptedBuildAction.class);
assertNotNull(iba);
assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class)));
assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeTimeoutCause.class)));
});
}

@Issue("JENKINS-36013")
@Test public void parallelNodeDisappearance() throws Throwable {
logging.recordPackage(ExecutorStepExecution.class, Level.FINE).record(FlowExecutionList.class, Level.FINE);
commonSetup();
sessions.then(j -> {
WorkflowJob p = j.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("def bs = [:]; for (int _i = 0; _i < 5; _i++) {def i = _i; bs[/b$i/] = {node('remote') {semaphore(/s$i/)}}}; parallel bs", true));
Expand Down Expand Up @@ -207,4 +210,43 @@ public class ExecutorStepDynamicContextTest {
});
}

@Test public void onceRetentionStrategyNodeDisappearance() throws Throwable {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially a copy of normalNodeDisappearance checking some behavioural differences.

commonSetup();
sessions.then(j -> {
DumbSlave s = j.createSlave(Label.get("ghost"));
s.setRetentionStrategy(new OnceRetentionStrategy(0));
Comment thread
Vlatombe marked this conversation as resolved.
WorkflowJob p = j.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node('ghost') {if (isUnix()) {sh 'sleep infinity'} else {bat 'echo + sleep infinity && ping -n 999999 localhost'}}", true));
var run = p.scheduleBuild2(0).waitForStart();
j.waitForMessage("+ sleep infinity", run);
j.jenkins.removeNode(s);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC this is what Reaper in kubernetes would do as soon as an agent pod is deleted.

j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(run));
assertThat(j.jenkins.getQueue().getItems(), emptyArray());
InterruptedBuildAction iba = run.getAction(InterruptedBuildAction.class);
assertNotNull(iba);
assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class)));
});
}

@Test public void cloudNodeDisappearance() throws Throwable {
commonSetup();
sessions.then(j -> {
var mockCloud = new MockCloud("mock");
mockCloud.setLabels("mock");
j.jenkins.clouds.add(mockCloud);
WorkflowJob p = j.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("node('mock') {if (isUnix()) {sh 'sleep infinity'} else {bat 'echo + sleep infinity && ping -n 999999 localhost'}}", true));
WorkflowRun run = p.scheduleBuild2(0).waitForStart();
j.waitForMessage("+ sleep infinity", run);
var mockNodes = j.jenkins.getLabel("mock").getNodes();
assertThat(mockNodes, hasSize(1));
var mockNode = mockNodes.iterator().next();
j.jenkins.removeNode(mockNode);
j.assertBuildStatus(Result.ABORTED, j.waitForCompletion(run));
assertThat(j.jenkins.getQueue().getItems(), emptyArray());
InterruptedBuildAction iba = run.getAction(InterruptedBuildAction.class);
assertNotNull(iba);
assertThat(iba.getCauses(), contains(isA(ExecutorStepExecution.RemovedNodeCause.class)));
});
}
}