Skip to content

Commit 0c298e9

Browse files
fix(openworkflow): handle stale-write races in workflow run transitions
1 parent 510a87d commit 0c298e9

3 files changed

Lines changed: 174 additions & 31 deletions

File tree

packages/openworkflow/worker/execution.test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2726,6 +2726,56 @@ describe("executeWorkflow", () => {
27262726
expect(failCall.error.message).toMatch(/exceeded the step limit/i);
27272727
});
27282728

2729+
test("ignores fail-transition races after run cancel", async () => {
2730+
const listStepAttempts = vi.fn(() =>
2731+
Promise.resolve({
2732+
data: [],
2733+
pagination: { next: null, prev: null },
2734+
}),
2735+
);
2736+
const failWorkflowRun = vi.fn(() =>
2737+
Promise.reject(new Error("Failed to mark workflow run failed")),
2738+
);
2739+
const getWorkflowRun = vi.fn(() =>
2740+
Promise.resolve(
2741+
createMockWorkflowRun({
2742+
id: "workflow-cancel-race-run",
2743+
status: "canceled",
2744+
workerId: null,
2745+
finishedAt: new Date("2026-01-01T00:00:02.000Z"),
2746+
}),
2747+
),
2748+
);
2749+
const workflowFn = vi.fn(() => {
2750+
throw new Error("workflow failed");
2751+
});
2752+
const workflowRun = createMockWorkflowRun({
2753+
id: "workflow-cancel-race-run",
2754+
workerId: "worker-cancel-race",
2755+
});
2756+
2757+
await expect(
2758+
executeWorkflow({
2759+
backend: {
2760+
listStepAttempts,
2761+
failWorkflowRun,
2762+
getWorkflowRun,
2763+
} as unknown as Backend,
2764+
workflowRun,
2765+
workflowFn,
2766+
workflowVersion: null,
2767+
workerId: "worker-cancel-race",
2768+
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
2769+
}),
2770+
).resolves.toBeUndefined();
2771+
2772+
expect(workflowFn).toHaveBeenCalledTimes(1);
2773+
expect(failWorkflowRun).toHaveBeenCalledTimes(1);
2774+
expect(getWorkflowRun).toHaveBeenCalledWith({
2775+
workflowRunId: workflowRun.id,
2776+
});
2777+
});
2778+
27292779
test("handles workflow errors with deadline exceeded", async () => {
27302780
const backend = await createBackend();
27312781
const client = new OpenWorkflow({ backend });

packages/openworkflow/worker/execution.ts

Lines changed: 98 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -935,6 +935,44 @@ class StepExecutor implements StepApi {
935935
}
936936
}
937937

938+
/**
939+
* Execute a workflow-run transition and swallow expected stale-write races when
940+
* this worker no longer owns an actively running execution.
941+
* @param options - Transition execution options
942+
*/
943+
async function executeWorkflowRunTransition(
944+
options: Readonly<{
945+
backend: Backend;
946+
workflowRunId: string;
947+
workerId: string;
948+
transition: () => Promise<unknown>;
949+
}>,
950+
): Promise<void> {
951+
try {
952+
await options.transition();
953+
} catch (error) {
954+
let currentRun: WorkflowRun | null = null;
955+
956+
try {
957+
currentRun = await options.backend.getWorkflowRun({
958+
workflowRunId: options.workflowRunId,
959+
});
960+
} catch {
961+
throw error;
962+
}
963+
964+
if (
965+
currentRun &&
966+
(currentRun.status !== "running" ||
967+
currentRun.workerId !== options.workerId)
968+
) {
969+
return;
970+
}
971+
972+
throw error;
973+
}
974+
}
975+
938976
/**
939977
* Parameters for the workflow execution use case.
940978
*/
@@ -1030,31 +1068,52 @@ export async function executeWorkflow(
10301068

10311069
// mark success
10321070
executionFence.deactivate();
1033-
await backend.completeWorkflowRun({
1071+
await executeWorkflowRunTransition({
1072+
backend,
10341073
workflowRunId: workflowRun.id,
10351074
workerId,
1036-
output: (output ?? null) as JsonValue,
1075+
transition: async () => {
1076+
await backend.completeWorkflowRun({
1077+
workflowRunId: workflowRun.id,
1078+
workerId,
1079+
output: (output ?? null) as JsonValue,
1080+
});
1081+
},
10371082
});
10381083
} catch (error) {
10391084
executionFence.deactivate();
10401085

10411086
// handle sleep signal by parking the workflow in running status
10421087
if (error instanceof SleepSignal) {
1043-
await backend.sleepWorkflowRun({
1088+
await executeWorkflowRunTransition({
1089+
backend,
10441090
workflowRunId: workflowRun.id,
10451091
workerId,
1046-
availableAt: error.resumeAt,
1092+
transition: async () => {
1093+
await backend.sleepWorkflowRun({
1094+
workflowRunId: workflowRun.id,
1095+
workerId,
1096+
availableAt: error.resumeAt,
1097+
});
1098+
},
10471099
});
10481100

10491101
return;
10501102
}
10511103

10521104
if (error instanceof StepLimitExceededError) {
1053-
await backend.failWorkflowRun({
1105+
await executeWorkflowRunTransition({
1106+
backend,
10541107
workflowRunId: workflowRun.id,
10551108
workerId,
1056-
error: serializeStepLimitExceededError(error),
1057-
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
1109+
transition: async () => {
1110+
await backend.failWorkflowRun({
1111+
workflowRunId: workflowRun.id,
1112+
workerId,
1113+
error: serializeStepLimitExceededError(error),
1114+
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
1115+
});
1116+
},
10581117
});
10591118
return;
10601119
}
@@ -1071,11 +1130,18 @@ export async function executeWorkflow(
10711130
);
10721131

10731132
if (retryDecision.status === "failed") {
1074-
await backend.failWorkflowRun({
1133+
await executeWorkflowRunTransition({
1134+
backend,
10751135
workflowRunId: workflowRun.id,
10761136
workerId,
1077-
error: serializedError,
1078-
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
1137+
transition: async () => {
1138+
await backend.failWorkflowRun({
1139+
workflowRunId: workflowRun.id,
1140+
workerId,
1141+
error: serializedError,
1142+
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
1143+
});
1144+
},
10791145
});
10801146
return;
10811147
}
@@ -1089,11 +1155,20 @@ export async function executeWorkflow(
10891155
}
10901156
/* v8 ignore stop */
10911157

1092-
await backend.rescheduleWorkflowRunAfterFailedStepAttempt({
1158+
const availableAt = retryDecision.availableAt;
1159+
1160+
await executeWorkflowRunTransition({
1161+
backend,
10931162
workflowRunId: workflowRun.id,
10941163
workerId,
1095-
error: serializedError,
1096-
availableAt: retryDecision.availableAt,
1164+
transition: async () => {
1165+
await backend.rescheduleWorkflowRunAfterFailedStepAttempt({
1166+
workflowRunId: workflowRun.id,
1167+
workerId,
1168+
error: serializedError,
1169+
availableAt,
1170+
});
1171+
},
10971172
});
10981173
return;
10991174
}
@@ -1103,11 +1178,18 @@ export async function executeWorkflow(
11031178
}
11041179

11051180
// mark failure
1106-
await backend.failWorkflowRun({
1181+
await executeWorkflowRunTransition({
1182+
backend,
11071183
workflowRunId: workflowRun.id,
11081184
workerId,
1109-
error: serializeError(error),
1110-
retryPolicy: params.retryPolicy,
1185+
transition: async () => {
1186+
await backend.failWorkflowRun({
1187+
workflowRunId: workflowRun.id,
1188+
workerId,
1189+
error: serializeError(error),
1190+
retryPolicy: params.retryPolicy,
1191+
});
1192+
},
11111193
});
11121194
}
11131195
}

packages/openworkflow/worker/worker.test.ts

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,6 +1078,9 @@ describe("Worker", () => {
10781078
test("worker handles when canceled workflow during execution", async () => {
10791079
const backend = await createBackend();
10801080
const client = new OpenWorkflow({ backend });
1081+
const consoleErrorSpy = vi
1082+
.spyOn(console, "error")
1083+
.mockImplementation(() => 0);
10811084

10821085
let stepExecuted = false;
10831086
const workflow = client.defineWorkflow(
@@ -1094,25 +1097,33 @@ describe("Worker", () => {
10941097
);
10951098
const worker = client.newWorker();
10961099

1097-
const handle = await workflow.run();
1100+
try {
1101+
const handle = await workflow.run();
10981102

1099-
// start processing in the background
1100-
const tickPromise = worker.tick();
1101-
await sleep(25);
1103+
// start processing in the background
1104+
const tickPromise = worker.tick();
1105+
await sleep(25);
11021106

1103-
// cancel while step is executing
1104-
await handle.cancel();
1107+
// cancel while step is executing
1108+
await handle.cancel();
11051109

1106-
// wait for tick to complete
1107-
await tickPromise;
1108-
await worker.stop();
1110+
// wait for tick to complete
1111+
await tickPromise;
1112+
await worker.stop();
11091113

1110-
// step should have been executed but workflow should be canceled
1111-
expect(stepExecuted).toBe(true);
1112-
const canceled = await backend.getWorkflowRun({
1113-
workflowRunId: handle.workflowRun.id,
1114-
});
1115-
expect(canceled?.status).toBe("canceled");
1114+
// step should have been executed but workflow should be canceled
1115+
expect(stepExecuted).toBe(true);
1116+
const canceled = await backend.getWorkflowRun({
1117+
workflowRunId: handle.workflowRun.id,
1118+
});
1119+
expect(canceled?.status).toBe("canceled");
1120+
expect(consoleErrorSpy).not.toHaveBeenCalledWith(
1121+
expect.stringContaining("Critical error during workflow execution"),
1122+
expect.anything(),
1123+
);
1124+
} finally {
1125+
consoleErrorSpy.mockRestore();
1126+
}
11161127
});
11171128

11181129
test("result() rejects for canceled workflows", async () => {

0 commit comments

Comments
 (0)