Skip to content

Commit bc18f2f

Browse files
fix(openworkflow): park replay pre-pass on earliest running wait
1 parent d02ba09 commit bc18f2f

2 files changed

Lines changed: 138 additions & 28 deletions

File tree

packages/openworkflow/worker/execution.test.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1636,6 +1636,81 @@ describe("StepExecutor", () => {
16361636
expect(runningWorkflowStepNames).toEqual(["wait-long", "wait-short"]);
16371637
});
16381638

1639+
test("replay pre-pass parks on earliest running wait across sleep and runWorkflow history", async () => {
  // Builds an ISO-8601 timestamp the given number of milliseconds from now.
  const isoFromNow = (offsetMs: number): string =>
    new Date(Date.now() + offsetMs).toISOString();

  const backend = await createBackend();

  // Arrange: create a run and claim it so step attempts can be recorded
  // against a worker.
  const createdRun = await backend.createWorkflowRun({
    workflowName: `workflow-replay-earliest-wait-${randomUUID()}`,
    version: null,
    idempotencyKey: null,
    config: {},
    context: null,
    input: null,
    parentStepAttemptNamespaceId: null,
    parentStepAttemptId: null,
    availableAt: null,
    deadlineAt: null,
  });

  const claimedRun = await backend.claimWorkflowRun({
    workerId: randomUUID(),
    leaseDurationMs: 5000,
  });
  if (!claimedRun) {
    throw new Error("Expected workflow run to be claimed");
  }
  expect(claimedRun.id).toBe(createdRun.id);

  const claimingWorkerId = claimedRun.workerId;
  if (!claimingWorkerId) {
    throw new Error("Expected claimed workflow run worker id");
  }

  // History holds two running waits: a sleep resuming in two minutes and a
  // child-workflow wait timing out in five seconds. The sleep is recorded
  // first, so parking on the first running sleep would pick the later wake-up.
  await backend.createStepAttempt({
    workflowRunId: claimedRun.id,
    workerId: claimingWorkerId,
    stepName: "sleep-late",
    kind: "sleep",
    config: {},
    context: {
      kind: "sleep",
      resumeAt: isoFromNow(120_000),
    },
  });
  await backend.createStepAttempt({
    workflowRunId: claimedRun.id,
    workerId: claimingWorkerId,
    stepName: "wait-early",
    kind: "workflow",
    config: {},
    context: {
      kind: "workflow",
      timeoutAt: isoFromNow(5000),
    },
  });

  // Act: replay. The workflow function is not expected to run because the
  // pre-pass should park the run first.
  await executeWorkflow({
    backend,
    workflowRun: claimedRun,
    workflowFn: () => "unreachable",
    workflowVersion: null,
    workerId: claimingWorkerId,
    retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
  });

  // Assert: the run is released (no worker) and scheduled to wake near the
  // five-second timeout rather than the two-minute sleep.
  const runAfterReplay = await backend.getWorkflowRun({
    workflowRunId: createdRun.id,
  });
  expect(runAfterReplay?.status).toBe("running");
  expect(runAfterReplay?.workerId).toBeNull();
  expect(runAfterReplay?.availableAt).not.toBeNull();
  if (!runAfterReplay?.availableAt) {
    throw new Error("Expected parked workflow availableAt");
  }

  const wakeDelayMs = runAfterReplay.availableAt.getTime() - Date.now();
  expect(wakeDelayMs).toBeGreaterThan(1000);
  expect(wakeDelayMs).toBeLessThan(20_000);
});
1713+
16391714
test("best-effort fences late parallel branches after parent parks", async () => {
16401715
const backend = await createBackend();
16411716
const client = new OpenWorkflow({ backend });

packages/openworkflow/worker/execution.ts

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,55 @@ function getEarliestRunningWaitResumeAt(
376376
return earliest;
377377
}
378378

379+
/**
 * Sweep the cached step attempts and complete every running sleep whose
 * resume time is no longer in the future.
 *
 * Only attempts that are `running`, of kind `sleep`, and carry a `sleep`
 * context are considered; everything else is left untouched. A resume
 * timestamp that does not parse to a finite time is handled as already
 * elapsed, so such historical attempts are completed rather than waited on.
 * Completed attempts replace their stale entries in `options.attempts` in
 * place, so the caller's cache reflects the backend result.
 * @param options - Backend, run/worker identifiers and the attempt cache
 * @returns `true` when at least one running sleep is still waiting for its
 * resume time, `false` otherwise
 */
async function completeElapsedRunningSleepAttempts(
  options: Readonly<{
    backend: Backend;
    workflowRunId: string;
    workerId: string;
    attempts: StepAttempt[];
  }>,
): Promise<boolean> {
  const { backend, workflowRunId, workerId, attempts } = options;
  let sleepStillPending = false;

  for (const [index, candidate] of attempts.entries()) {
    // tolerate empty slots in the cache
    if (!candidate) continue;

    const isRunningSleep =
      candidate.status === "running" && candidate.kind === "sleep";
    if (!isRunningSleep || candidate.context?.kind !== "sleep") continue;

    // an unparseable timestamp yields NaN, which fails the finite check and
    // therefore falls through to completion below
    const wakeAtMs = new Date(candidate.context.resumeAt).getTime();
    const stillSleeping = Number.isFinite(wakeAtMs) && Date.now() < wakeAtMs;
    if (stillSleeping) {
      sleepStillPending = true;
      continue;
    }

    // completions are awaited one at a time, in cache order
    const finished = await backend.completeStepAttempt({
      workflowRunId,
      stepAttemptId: candidate.id,
      workerId,
      output: null,
    });

    // swap the stale cache entry for the completed attempt
    attempts[index] = finished;
  }

  return sleepStillPending;
}
427+
379428
/**
380429
* Load all step attempts for a workflow run.
381430
* @param backend - Backend instance
@@ -1009,37 +1058,23 @@ export async function executeWorkflow(
10091058
workflowRun.id,
10101059
);
10111060

1012-
// mark any sleep steps as completed if their sleep duration has elapsed,
1013-
// or rethrow SleepSignal if still sleeping
1014-
for (let i = 0; i < attempts.length; i++) {
1015-
const attempt = attempts[i];
1016-
if (!attempt) continue;
1061+
// complete any elapsed sleep waits first, then park on the earliest
1062+
// remaining running wait (sleep or runWorkflow timeout).
1063+
const hasPendingRunningSleep = await completeElapsedRunningSleepAttempts({
1064+
backend,
1065+
workflowRunId: workflowRun.id,
1066+
workerId,
1067+
attempts,
1068+
});
10171069

1070+
if (hasPendingRunningSleep) {
1071+
const earliestRunningWaitResumeAt =
1072+
getEarliestRunningWaitResumeAt(attempts);
10181073
if (
1019-
attempt.status === "running" &&
1020-
attempt.kind === "sleep" &&
1021-
attempt.context?.kind === "sleep"
1074+
earliestRunningWaitResumeAt &&
1075+
Date.now() < earliestRunningWaitResumeAt.getTime()
10221076
) {
1023-
const now = Date.now();
1024-
const resumeAt = new Date(attempt.context.resumeAt);
1025-
const resumeAtMs = resumeAt.getTime();
1026-
1027-
if (now < resumeAtMs) {
1028-
// sleep duration HAS NOT elapsed yet, throw signal to put workflow
1029-
// back to sleep
1030-
throw new SleepSignal(resumeAt);
1031-
}
1032-
1033-
// sleep duration HAS elapsed, mark the step as completed and continue
1034-
const completed = await backend.completeStepAttempt({
1035-
workflowRunId: workflowRun.id,
1036-
stepAttemptId: attempt.id,
1037-
workerId,
1038-
output: null,
1039-
});
1040-
1041-
// update cache w/ completed attempt
1042-
attempts[i] = completed;
1077+
throw new SleepSignal(earliestRunningWaitResumeAt);
10431078
}
10441079
}
10451080

0 commit comments

Comments
 (0)