diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 7805a0af..d3e7372b 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -421,33 +421,37 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); - test("collapses concurrent creates with same key to one run id", async () => { - const backend = await setup(); - const workflowName = randomUUID(); - const version = "v1"; - const idempotencyKey = randomUUID(); - - const runs = await Promise.all( - Array.from({ length: 10 }, (_, i) => - backend.createWorkflowRun({ - workflowName, - version, - idempotencyKey, - input: { i }, - config: {}, - context: null, - parentStepAttemptNamespaceId: null, - parentStepAttemptId: null, - availableAt: null, - deadlineAt: null, - }), - ), - ); + test( + "collapses concurrent creates with same key to one run id", + { timeout: 15_000 }, + async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const idempotencyKey = randomUUID(); + + const runs = await Promise.all( + Array.from({ length: 10 }, (_, i) => + backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey, + input: { i }, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }), + ), + ); - const uniqueRunIds = new Set(runs.map((run) => run.id)); - expect(uniqueRunIds.size).toBe(1); - await teardown(backend); - }); + const uniqueRunIds = new Set(runs.map((run) => run.id)); + expect(uniqueRunIds.size).toBe(1); + await teardown(backend); + }, + ); test("returns existing completed run for matching key", async () => { const backend = await setup(); diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 9b6b7792..ef8dce19 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -296,7 +296,7 @@ describe("StepExecutor", () => { // First tick - hits sleep await worker.tick(); - await sleep(50); // Wait for tick to complete + await sleep(200); // Wait for tick to complete const parked = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); @@ -1803,356 +1803,373 @@ describe("StepExecutor", () => { await expect(handle.result()).resolves.toBe(10); }); - test("fans out 300 isolated child workflows in parallel and notifies once all complete", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); + test( + "fans out 300 isolated child workflows in parallel and notifies once all complete", + { timeout: 30_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - let activeChildren = 0; - let maxParallelChildren = 0; - let notifyStepCalls = 0; + let activeChildren = 0; + let maxParallelChildren = 0; + let notifyStepCalls = 0; - const child = client.defineWorkflow< - { childId: number }, - { childId: number; childRunId: string } - >( - { name: `workflow-child-fan-out-${randomUUID()}` }, - async ({ input, run, step }) => { - return await step.run({ name: "work" }, async () => { - activeChildren += 1; - maxParallelChildren = Math.max(maxParallelChildren, activeChildren); - - try { - await sleep(5); - return { - childId: input.childId, - childRunId: run.id, - }; - } finally { - activeChildren -= 1; - } - }); - }, - ); + const child = client.defineWorkflow< + { childId: number }, + { childId: number; childRunId: string } + >( + { name: `workflow-child-fan-out-${randomUUID()}` }, + async ({ input, run, step }) => { + return await step.run({ name: "work" }, async () => { + activeChildren += 1; + maxParallelChildren = Math.max(maxParallelChildren, activeChildren); - const parent = client.defineWorkflow< - undefined, - { - childResults: { childId: number; childRunId: string }[]; - notifyMessage: string; - } - >({ name: `workflow-parent-fan-out-${randomUUID()}` }, async ({ step }) => { - const childResults = await Promise.all( - Array.from({ length: 300 }, (_, index) => - step.runWorkflow( - child.workflow.spec, - { childId: index + 1 }, - { name: `fan-out-child-${String(index + 1)}` }, - ), - ), - ); - - const notifyMessage = await step.run( - { name: "notify-parent-complete" }, - () => { - notifyStepCalls += 1; - return `Completed ${String(childResults.length)} child workflows`; + try { + await sleep(5); + return { + childId: input.childId, + childRunId: run.id, + }; + } finally { + activeChildren -= 1; + } + }); }, ); - return { childResults, notifyMessage }; - }); + const parent = client.defineWorkflow< + undefined, + { + childResults: { childId: number; childRunId: string }[]; + notifyMessage: string; + } + >( + { name: `workflow-parent-fan-out-${randomUUID()}` }, + async ({ step }) => { + const childResults = await Promise.all( + Array.from({ length: 300 }, (_, index) => + step.runWorkflow( + child.workflow.spec, + { childId: index + 1 }, + { name: `fan-out-child-${String(index + 1)}` }, + ), + ), + ); - const worker = client.newWorker({ concurrency: 350 }); - const handle = await parent.run(); - const status = await tickUntilTerminal( - backend, - worker, - handle.workflowRun.id, - 800, - 10, - { maxWaitMs: 10_000 }, - ); + const notifyMessage = await step.run( + { name: "notify-parent-complete" }, + () => { + notifyStepCalls += 1; + return `Completed ${String(childResults.length)} child workflows`; + }, + ); - expect(status).toBe("completed"); + return { childResults, notifyMessage }; + }, + ); - const result = await handle.result(); - expect(result.childResults).toHaveLength(300); - expect(result.notifyMessage).toBe("Completed 300 child workflows"); - expect( - new Set(result.childResults.map((childResult) => childResult.childRunId)) - .size, - ).toBe(300); - expect(maxParallelChildren).toBeGreaterThan(1); - expect(notifyStepCalls).toBe(1); + const worker = client.newWorker({ concurrency: 350 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 800, + 10, + { maxWaitMs: 20_000 }, + ); - const parentAttempts = await backend.listStepAttempts({ - workflowRunId: handle.workflowRun.id, - limit: 400, - }); - const workflowAttempts = parentAttempts.data.filter( - (stepAttempt) => stepAttempt.kind === "workflow", - ); - expect(workflowAttempts).toHaveLength(300); - const maxWorkflowFinishedAt = Math.max( - ...workflowAttempts.map( - (workflowAttempt) => workflowAttempt.finishedAt?.getTime() ?? 0, - ), - ); - const notifyAttempt = parentAttempts.data.find( - (stepAttempt) => stepAttempt.stepName === "notify-parent-complete", - ); - if (!notifyAttempt) { - throw new Error("Expected notify-parent-complete step attempt"); - } - if (notifyAttempt.startedAt === null) { - throw new Error("Expected notify-parent-complete startedAt timestamp"); - } - expect(notifyAttempt.startedAt.getTime()).toBeGreaterThanOrEqual( - maxWorkflowFinishedAt, - ); + expect(status).toBe("completed"); - const linkedChildRunIds = workflowAttempts.map( - (workflowAttempt) => workflowAttempt.childWorkflowRunId, - ); - expect(linkedChildRunIds).not.toContain(null); - expect(new Set(linkedChildRunIds).size).toBe(300); + const result = await handle.result(); + expect(result.childResults).toHaveLength(300); + expect(result.notifyMessage).toBe("Completed 300 child workflows"); + expect( + new Set( + result.childResults.map((childResult) => childResult.childRunId), + ).size, + ).toBe(300); + expect(maxParallelChildren).toBeGreaterThan(1); + expect(notifyStepCalls).toBe(1); + + const parentAttempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 400, + }); + const workflowAttempts = parentAttempts.data.filter( + (stepAttempt) => stepAttempt.kind === "workflow", + ); + expect(workflowAttempts).toHaveLength(300); + const maxWorkflowFinishedAt = Math.max( + ...workflowAttempts.map( + (workflowAttempt) => workflowAttempt.finishedAt?.getTime() ?? 0, + ), + ); + const notifyAttempt = parentAttempts.data.find( + (stepAttempt) => stepAttempt.stepName === "notify-parent-complete", + ); + if (!notifyAttempt) { + throw new Error("Expected notify-parent-complete step attempt"); + } + if (notifyAttempt.startedAt === null) { + throw new Error("Expected notify-parent-complete startedAt timestamp"); + } + expect(notifyAttempt.startedAt.getTime()).toBeGreaterThanOrEqual( + maxWorkflowFinishedAt, + ); - const runs = await backend.listWorkflowRuns({ limit: 500 }); - const linkedChildRunIdSet = new Set(linkedChildRunIds); - const linkedChildRuns = runs.data.filter((run) => - linkedChildRunIdSet.has(run.id), - ); - expect(linkedChildRuns).toHaveLength(300); + const linkedChildRunIds = workflowAttempts.map( + (workflowAttempt) => workflowAttempt.childWorkflowRunId, + ); + expect(linkedChildRunIds).not.toContain(null); + expect(new Set(linkedChildRunIds).size).toBe(300); - expect(notifyAttempt.status).toBe("completed"); - }); + const runs = await backend.listWorkflowRuns({ limit: 500 }); + const linkedChildRunIdSet = new Set(linkedChildRunIds); + const linkedChildRuns = runs.data.filter((run) => + linkedChildRunIdSet.has(run.id), + ); + expect(linkedChildRuns).toHaveLength(300); - test("fails parent when child 150 fails and does not auto-replay the failed child", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); + expect(notifyAttempt.status).toBe("completed"); + }, + ); - let injectedFailures = 0; - let notifyStepCalls = 0; - const failedChildIds = new Set(); + test( + "fails parent when child 150 fails and does not auto-replay the failed child", + { timeout: 30_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - const child = client.defineWorkflow< - { childId: number }, - { childId: number; childRunId: string } - >( - { - name: `workflow-child-fan-out-replay-${randomUUID()}`, - retryPolicy: { - initialInterval: "1ms", - maximumInterval: "1ms", - backoffCoefficient: 1, - maximumAttempts: 1, - }, - }, - async ({ input, run, step }) => { - return await step.run( - { name: "work", retryPolicy: { maximumAttempts: 1 } }, - async () => { - if (input.childId === 150 && !failedChildIds.has(input.childId)) { - failedChildIds.add(input.childId); - injectedFailures += 1; - throw new Error("Injected failure at child 150"); - } + let injectedFailures = 0; + let notifyStepCalls = 0; + const failedChildIds = new Set(); - await sleep(5); - return { childId: input.childId, childRunId: run.id }; + const child = client.defineWorkflow< + { childId: number }, + { childId: number; childRunId: string } + >( + { + name: `workflow-child-fan-out-replay-${randomUUID()}`, + retryPolicy: { + initialInterval: "1ms", + maximumInterval: "1ms", + backoffCoefficient: 1, + maximumAttempts: 1, }, - ); - }, - ); - - const parent = client.defineWorkflow< - undefined, - { - childResults: { childId: number; childRunId: string }[]; - notifyMessage: string; - } - >( - { - name: `workflow-parent-fan-out-replay-${randomUUID()}`, - retryPolicy: { - initialInterval: "1ms", - maximumInterval: "1ms", - backoffCoefficient: 1, - maximumAttempts: 1, }, - }, - async ({ step }) => { - const childResults = await Promise.all( - Array.from({ length: 300 }, (_, index) => - step.runWorkflow( - child.workflow.spec, - { childId: index + 1 }, - { name: `fan-out-child-${String(index + 1)}` }, - ), - ), - ); + async ({ input, run, step }) => { + return await step.run( + { name: "work", retryPolicy: { maximumAttempts: 1 } }, + async () => { + if (input.childId === 150 && !failedChildIds.has(input.childId)) { + failedChildIds.add(input.childId); + injectedFailures += 1; + throw new Error("Injected failure at child 150"); + } + + await sleep(5); + return { childId: input.childId, childRunId: run.id }; + }, + ); + }, + ); - const notifyMessage = await step.run( - { name: "notify-parent-complete" }, - () => { - notifyStepCalls += 1; - return `Completed ${String(childResults.length)} child workflows`; + const parent = client.defineWorkflow< + undefined, + { + childResults: { childId: number; childRunId: string }[]; + notifyMessage: string; + } + >( + { + name: `workflow-parent-fan-out-replay-${randomUUID()}`, + retryPolicy: { + initialInterval: "1ms", + maximumInterval: "1ms", + backoffCoefficient: 1, + maximumAttempts: 1, }, - ); - - return { childResults, notifyMessage }; - }, - ); + }, + async ({ step }) => { + const childResults = await Promise.all( + Array.from({ length: 300 }, (_, index) => + step.runWorkflow( + child.workflow.spec, + { childId: index + 1 }, + { name: `fan-out-child-${String(index + 1)}` }, + ), + ), + ); - const worker = client.newWorker({ concurrency: 350 }); - const handle = await parent.run(); - const status = await tickUntilTerminal( - backend, - worker, - handle.workflowRun.id, - 1200, - 10, - { maxWaitMs: 20_000 }, - ); + const notifyMessage = await step.run( + { name: "notify-parent-complete" }, + () => { + notifyStepCalls += 1; + return `Completed ${String(childResults.length)} child workflows`; + }, + ); - expect(status).toBe("failed"); - await expect(handle.result()).rejects.toThrow( - /Injected failure at child 150/, - ); - expect(injectedFailures).toBe(1); - expect(notifyStepCalls).toBe(0); + return { childResults, notifyMessage }; + }, + ); - const parentAttempts = await backend.listStepAttempts({ - workflowRunId: handle.workflowRun.id, - limit: 1200, - }); - const workflowAttempts = parentAttempts.data.filter( - (stepAttempt) => stepAttempt.kind === "workflow", - ); - const failedWorkflowAttempts = workflowAttempts.filter( - (stepAttempt) => stepAttempt.status === "failed", - ); - const completedWorkflowAttempts = workflowAttempts.filter( - (stepAttempt) => stepAttempt.status === "completed", - ); + const worker = client.newWorker({ concurrency: 350 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 1200, + 10, + { maxWaitMs: 20_000 }, + ); - expect(failedWorkflowAttempts).toHaveLength(1); - expect(completedWorkflowAttempts.length).toBeGreaterThan(0); - expect(completedWorkflowAttempts.length).toBeLessThanOrEqual(299); - expect(workflowAttempts).toHaveLength(300); + expect(status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /Injected failure at child 150/, + ); + expect(injectedFailures).toBe(1); + expect(notifyStepCalls).toBe(0); - const child150Attempts = workflowAttempts.filter((stepAttempt) => - stepAttempt.stepName.startsWith("fan-out-child-150"), - ); - expect(child150Attempts).toHaveLength(1); - expect(child150Attempts[0]?.status).toBe("failed"); + const parentAttempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 1200, + }); + const workflowAttempts = parentAttempts.data.filter( + (stepAttempt) => stepAttempt.kind === "workflow", + ); + const failedWorkflowAttempts = workflowAttempts.filter( + (stepAttempt) => stepAttempt.status === "failed", + ); + const completedWorkflowAttempts = workflowAttempts.filter( + (stepAttempt) => stepAttempt.status === "completed", + ); - expect( - parentAttempts.data.some( - (stepAttempt) => stepAttempt.stepName === "notify-parent-complete", - ), - ).toBe(false); + expect(failedWorkflowAttempts).toHaveLength(1); + expect(completedWorkflowAttempts.length).toBeGreaterThan(0); + expect(completedWorkflowAttempts.length).toBeLessThanOrEqual(299); + expect(workflowAttempts).toHaveLength(300); - const runs = await backend.listWorkflowRuns({ limit: 1200 }); - const childRuns = runs.data.filter( - (run) => - run.workflowName === child.workflow.spec.name && - run.parentStepAttemptId !== null, - ); - expect(childRuns).toHaveLength(300); - }); + const child150Attempts = workflowAttempts.filter((stepAttempt) => + stepAttempt.stepName.startsWith("fan-out-child-150"), + ); + expect(child150Attempts).toHaveLength(1); + expect(child150Attempts[0]?.status).toBe("failed"); - test("completes when child 150 has transient failure handled by child-level retries", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); + expect( + parentAttempts.data.some( + (stepAttempt) => stepAttempt.stepName === "notify-parent-complete", + ), + ).toBe(false); - let injectedFailures = 0; - let notifyStepCalls = 0; - const failedChildIds = new Set(); + const runs = await backend.listWorkflowRuns({ limit: 1200 }); + const childRuns = runs.data.filter( + (run) => + run.workflowName === child.workflow.spec.name && + run.parentStepAttemptId !== null, + ); + expect(childRuns).toHaveLength(300); + }, + ); - const child = client.defineWorkflow< - { childId: number }, - { childId: number; childRunId: string } - >( - { name: `workflow-child-fan-out-child-retry-${randomUUID()}` }, - async ({ input, run, step }) => { - return await step.run( - { name: "work", retryPolicy: { maximumAttempts: 2 } }, - async () => { - if (input.childId === 150 && !failedChildIds.has(input.childId)) { - failedChildIds.add(input.childId); - injectedFailures += 1; - throw new Error("Injected transient failure at child 150"); - } + test( + "completes when child 150 has transient failure handled by child-level retries", + { timeout: 30_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - await sleep(5); - return { childId: input.childId, childRunId: run.id }; - }, - ); - }, - ); + let injectedFailures = 0; + let notifyStepCalls = 0; + const failedChildIds = new Set(); + + const child = client.defineWorkflow< + { childId: number }, + { childId: number; childRunId: string } + >( + { name: `workflow-child-fan-out-child-retry-${randomUUID()}` }, + async ({ input, run, step }) => { + return await step.run( + { name: "work", retryPolicy: { maximumAttempts: 2 } }, + async () => { + if (input.childId === 150 && !failedChildIds.has(input.childId)) { + failedChildIds.add(input.childId); + injectedFailures += 1; + throw new Error("Injected transient failure at child 150"); + } + + await sleep(5); + return { childId: input.childId, childRunId: run.id }; + }, + ); + }, + ); - const parent = client.defineWorkflow< - undefined, - { - childResults: { childId: number; childRunId: string }[]; - notifyMessage: string; - } - >( - { name: `workflow-parent-fan-out-child-retry-${randomUUID()}` }, - async ({ step }) => { - const childResults = await Promise.all( - Array.from({ length: 300 }, (_, index) => - step.runWorkflow( - child.workflow.spec, - { childId: index + 1 }, - { name: `fan-out-child-${String(index + 1)}` }, + const parent = client.defineWorkflow< + undefined, + { + childResults: { childId: number; childRunId: string }[]; + notifyMessage: string; + } + >( + { name: `workflow-parent-fan-out-child-retry-${randomUUID()}` }, + async ({ step }) => { + const childResults = await Promise.all( + Array.from({ length: 300 }, (_, index) => + step.runWorkflow( + child.workflow.spec, + { childId: index + 1 }, + { name: `fan-out-child-${String(index + 1)}` }, + ), ), - ), - ); - - const notifyMessage = await step.run( - { name: "notify-parent-complete" }, - () => { - notifyStepCalls += 1; - return `Completed ${String(childResults.length)} child workflows`; - }, - ); + ); - return { childResults, notifyMessage }; - }, - ); + const notifyMessage = await step.run( + { name: "notify-parent-complete" }, + () => { + notifyStepCalls += 1; + return `Completed ${String(childResults.length)} child workflows`; + }, + ); - const worker = client.newWorker({ concurrency: 350 }); - const handle = await parent.run(); - const status = await tickUntilTerminal( - backend, - worker, - handle.workflowRun.id, - 1200, - 10, - { maxWaitMs: 20_000 }, - ); + return { childResults, notifyMessage }; + }, + ); - expect(status).toBe("completed"); - const result = await handle.result(); - expect(result.childResults).toHaveLength(300); - expect(result.notifyMessage).toBe("Completed 300 child workflows"); - expect( - new Set(result.childResults.map((childResult) => childResult.childRunId)) - .size, - ).toBe(300); - expect(injectedFailures).toBe(1); - expect(notifyStepCalls).toBe(1); + const worker = client.newWorker({ concurrency: 350 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 1200, + 10, + { maxWaitMs: 20_000 }, + ); - const runs = await backend.listWorkflowRuns({ limit: 1200 }); - const childRuns = runs.data.filter( - (run) => - run.workflowName === child.workflow.spec.name && - run.parentStepAttemptId !== null, - ); - expect(childRuns).toHaveLength(300); - }); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result.childResults).toHaveLength(300); + expect(result.notifyMessage).toBe("Completed 300 child workflows"); + expect( + new Set( + result.childResults.map((childResult) => childResult.childRunId), + ).size, + ).toBe(300); + expect(injectedFailures).toBe(1); + expect(notifyStepCalls).toBe(1); + + const runs = await backend.listWorkflowRuns({ limit: 1200 }); + const childRuns = runs.data.filter( + (run) => + run.workflowName === child.workflow.spec.name && + run.parentStepAttemptId !== null, + ); + expect(childRuns).toHaveLength(300); + }, + ); test("auto-indexes duplicate workflow names in parallel Promise.all", async () => { const backend = await createTestBackend(); @@ -2197,14 +2214,17 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - const workflowStepNames = steps.data - .filter( - (stepAttempt) => - stepAttempt.kind === "workflow" && - stepAttempt.stepName.startsWith(child.workflow.spec.name), - ) - .map((stepAttempt) => stepAttempt.stepName) - .toSorted((a, b) => a.localeCompare(b)); + const workflowStepNames = [ + ...new Set( + steps.data + .filter( + (stepAttempt) => + stepAttempt.kind === "workflow" && + stepAttempt.stepName.startsWith(child.workflow.spec.name), + ) + .map((stepAttempt) => stepAttempt.stepName), + ), + ].toSorted((a, b) => a.localeCompare(b)); expect(workflowStepNames).toEqual([ child.workflow.spec.name, `${child.workflow.spec.name}:1`, @@ -3428,7 +3448,7 @@ describe("executeWorkflow", () => { const handle = await workflow.run(); const worker = client.newWorker(); await worker.tick(); - await sleep(50); + await sleep(200); const workflowRun = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, @@ -3455,7 +3475,7 @@ describe("executeWorkflow", () => { // first tick - hits sleep await worker.tick(); - await sleep(50); + await sleep(200); const parked = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, @@ -3744,7 +3764,7 @@ describe("executeWorkflow", () => { const worker = client.newWorker(); const handle = await workflow.run(); await worker.tick(); - await sleep(50); + await sleep(200); await worker.tick(); await handle.result(); diff --git a/packages/openworkflow/worker/worker.test.ts b/packages/openworkflow/worker/worker.test.ts index 843633ec..a8470638 100644 --- a/packages/openworkflow/worker/worker.test.ts +++ b/packages/openworkflow/worker/worker.test.ts @@ -292,7 +292,7 @@ describe("Worker", () => { expect(maxTime - minTime).toBeLessThan(100); }); - test("respects worker concurrency limit", async () => { + test("respects worker concurrency limit", { timeout: 15_000 }, async () => { const backend = await createTestBackend(); const client = new OpenWorkflow({ backend }); @@ -313,7 +313,7 @@ describe("Worker", () => { ]); await worker.tick(); - await sleep(100); + await sleep(200); let completed = 0; for (const handle of handles) { @@ -508,182 +508,194 @@ describe("Worker", () => { }); }); - test("worker only sleeps between claims when no work is available", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); + test( + "worker only sleeps between claims when no work is available", + { timeout: 15_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - const workflow = client.defineWorkflow( - { name: "adaptive-test" }, - async ({ step }) => { - await step.run({ name: "step-1" }, () => "done"); - return "complete"; - }, - ); + const workflow = client.defineWorkflow( + { name: "adaptive-test" }, + async ({ step }) => { + await step.run({ name: "step-1" }, () => "done"); + return "complete"; + }, + ); - // enqueue many workflows - const handles = []; - for (let i = 0; i < 20; i++) { - handles.push(await workflow.run()); - } + // enqueue many workflows + const handles = []; + for (let i = 0; i < 20; i++) { + handles.push(await workflow.run()); + } - const worker = client.newWorker({ concurrency: 5 }); + const worker = client.newWorker({ concurrency: 5 }); - const startTime = Date.now(); - await worker.start(); + const startTime = Date.now(); + await worker.start(); - // wait for all workflows to complete - await Promise.all(handles.map((h) => h.result())); - await worker.stop(); + // wait for all workflows to complete + await Promise.all(handles.map((h) => h.result())); + await worker.stop(); - const duration = Date.now() - startTime; + const duration = Date.now() - startTime; - // with this conditional sleep, all workflows should complete quickly - // without it (with 100ms sleep between ticks), it would take much longer - expect(duration).toBeLessThan(3000); // should complete in under 3 seconds - }); + // with this conditional sleep, all workflows should complete quickly + // without it (with 100ms sleep between ticks), it would take much longer + expect(duration).toBeLessThan(10_000); + }, + ); - test("only failed steps re-execute on retry", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); - - const executionCounts = { - stepA: 0, - stepB: 0, - stepC: 0, - }; + test( + "only failed steps re-execute on retry", + { timeout: 15_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - const workflow = client.defineWorkflow( - { name: "mixed-retry" }, - async ({ step }) => { - const a = await step.run({ name: "step-a" }, () => { - executionCounts.stepA++; - return "a-result"; - }); + const executionCounts = { + stepA: 0, + stepB: 0, + stepC: 0, + }; - const b = await step.run({ name: "step-b" }, () => { - executionCounts.stepB++; - if (executionCounts.stepB === 1) { - throw new Error("Step B fails on first attempt"); - } - return "b-result"; - }); + const workflow = client.defineWorkflow( + { name: "mixed-retry" }, + async ({ step }) => { + const a = await step.run({ name: "step-a" }, () => { + executionCounts.stepA++; + return "a-result"; + }); - const c = await step.run({ name: "step-c" }, () => { - executionCounts.stepC++; - return "c-result"; - }); + const b = await step.run({ name: "step-b" }, () => { + executionCounts.stepB++; + if (executionCounts.stepB === 1) { + throw new Error("Step B fails on first attempt"); + } + return "b-result"; + }); - return { a, b, c }; - }, - ); + const c = await step.run({ name: "step-c" }, () => { + executionCounts.stepC++; + return "c-result"; + }); - const worker = client.newWorker(); - const handle = await workflow.run(); - - // first workflow attempt - // - step-a succeeds - // - step-b fails - // - step-c never runs (workflow fails at step-b) - await worker.tick(); - await sleep(100); - expect(executionCounts.stepA).toBe(1); - expect(executionCounts.stepB).toBe(1); - expect(executionCounts.stepC).toBe(0); + return { a, b, c }; + }, + ); - // wait for backoff - await sleep(1100); + const worker = client.newWorker(); + const handle = await workflow.run(); - // second workflow attempt - // - step-a should be cached (not re-executed) - // - step-b should be re-executed (failed previously) - // - step-c should execute for first time - await worker.tick(); - await sleep(100); - expect(executionCounts.stepA).toBe(1); // still 1, was cached - expect(executionCounts.stepB).toBe(2); // incremented, was retried - expect(executionCounts.stepC).toBe(1); // incremented, first execution + // first workflow attempt + // - step-a succeeds + // - step-b fails + // - step-c never runs (workflow fails at step-b) + await worker.tick(); + await sleep(200); + expect(executionCounts.stepA).toBe(1); + expect(executionCounts.stepB).toBe(1); + expect(executionCounts.stepC).toBe(0); + + // wait for backoff + await sleep(1100); + + // second workflow attempt + // - step-a should be cached (not re-executed) + // - step-b should be re-executed (failed previously) + // - step-c should execute for first time + await worker.tick(); + await sleep(200); + expect(executionCounts.stepA).toBe(1); // still 1, was cached + expect(executionCounts.stepB).toBe(2); // incremented, was retried + expect(executionCounts.stepC).toBe(1); // incremented, first execution - const result = await handle.result(); - expect(result).toEqual({ - a: "a-result", - b: "b-result", - c: "c-result", - }); - }); + const result = await handle.result(); + expect(result).toEqual({ + a: "a-result", + b: "b-result", + c: "c-result", + }); + }, + ); - test("step.sleep postpones workflow execution", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); + test( + "step.sleep postpones workflow execution", + { timeout: 15_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - let stepCount = 0; - const workflow = client.defineWorkflow( - { name: "sleep-test" }, - async ({ step }) => { - const before = await step.run({ name: "before-sleep" }, () => { - stepCount++; - return "before"; - }); + let stepCount = 0; + const workflow = client.defineWorkflow( + { name: "sleep-test" }, + async ({ step }) => { + const before = await step.run({ name: "before-sleep" }, () => { + stepCount++; + return "before"; + }); - await step.sleep("pause", "100ms"); + await step.sleep("pause", "500ms"); - const after = await step.run({ name: "after-sleep" }, () => { - stepCount++; - return "after"; - }); + const after = await step.run({ name: "after-sleep" }, () => { + stepCount++; + return "after"; + }); - return { before, after }; - }, - ); + return { before, after }; + }, + ); - const worker = client.newWorker(); - const handle = await workflow.run(); + const worker = client.newWorker(); + const handle = await workflow.run(); - // first execution - runs before-sleep, then sleeps - await worker.tick(); - await sleep(50); // wait for processing - expect(stepCount).toBe(1); + // first execution - runs before-sleep, then sleeps + await worker.tick(); + await sleep(200); // wait for processing + expect(stepCount).toBe(1); - // verify workflow was postponed while remaining in running status - const slept = await backend.getWorkflowRun({ - workflowRunId: handle.workflowRun.id, - }); - expect(slept?.status).toBe("running"); - expect(slept?.workerId).toBeNull(); // released during sleep - expect(slept?.availableAt).not.toBeNull(); - if (!slept?.availableAt) throw new Error("availableAt should be set"); - const delayMs = slept.availableAt.getTime() - Date.now(); - expect(delayMs).toBeGreaterThan(0); - expect(delayMs).toBeLessThan(150); // should be ~100ms - - // verify sleep step is in "running" state during sleep - const attempts = await backend.listStepAttempts({ - workflowRunId: handle.workflowRun.id, - }); - const sleepStep = attempts.data.find((a) => a.stepName === "pause"); - expect(sleepStep?.status).toBe("running"); + // verify workflow was postponed while remaining in running status + const slept = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(slept?.status).toBe("running"); + expect(slept?.workerId).toBeNull(); // released during sleep + expect(slept?.availableAt).not.toBeNull(); + if (!slept?.availableAt) throw new Error("availableAt should be set"); + const delayMs = slept.availableAt.getTime() - Date.now(); + expect(delayMs).toBeGreaterThan(0); + expect(delayMs).toBeLessThan(550); // should be ~500ms + + // verify sleep step is in "running" state during sleep + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + }); + const sleepStep = attempts.data.find((a) => a.stepName === "pause"); + expect(sleepStep?.status).toBe("running"); - // wait for sleep duration - await sleep(150); + // wait for sleep duration + await sleep(400); - // second execution (after sleep) - await worker.tick(); - await sleep(50); // wait for processing - expect(stepCount).toBe(2); + // second execution (after sleep) + await worker.tick(); + await sleep(200); // wait for processing + expect(stepCount).toBe(2); - // verify sleep step is now "completed" - const refreshedAttempts = await backend.listStepAttempts({ - workflowRunId: handle.workflowRun.id, - }); - const completedSleepStep = refreshedAttempts.data.find( - (a) => a.stepName === "pause", - ); - expect(completedSleepStep?.status).toBe("completed"); + // verify sleep step is now "completed" + const refreshedAttempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + }); + const completedSleepStep = refreshedAttempts.data.find( + (a) => a.stepName === "pause", + ); + expect(completedSleepStep?.status).toBe("completed"); - const result = await handle.result(); - expect(result).toEqual({ before: "before", after: "after" }); - }); + const result = await handle.result(); + expect(result).toEqual({ before: "before", after: "after" }); + }, + ); - test("step.sleep is cached on replay", async () => { + test("step.sleep is cached on replay", { timeout: 15_000 }, async () => { const backend = await createTestBackend(); const client = new OpenWorkflow({ backend }); @@ -698,7 +710,7 @@ describe("Worker", () => { }); // this should only postpone once - await step.sleep("wait", "50ms"); + await step.sleep("wait", "200ms"); await step.run({ name: "step-2" }, () => { step2Count++; @@ -714,15 +726,15 @@ describe("Worker", () => { // first attempt: execute step-1, then sleep (step-2 not executed) await worker.tick(); - await sleep(50); + await sleep(200); expect(step1Count).toBe(1); expect(step2Count).toBe(0); - await sleep(100); // wait for sleep to complete + await sleep(200); // wait for sleep to complete // second attempt: step-1 is cached (not re-executed), sleep is cached, step-2 executes await worker.tick(); - await sleep(50); + await sleep(200); expect(step1Count).toBe(1); // still 1, was cached expect(step2Count).toBe(1); // now 1, executed after sleep @@ -770,9 +782,9 @@ describe("Worker", () => { executionCount++; await step.run({ name: "step-1" }, () => "one"); - await step.sleep("sleep-1", "50ms"); + await step.sleep("sleep-1", "200ms"); await step.run({ name: "step-2" }, () => "two"); - await step.sleep("sleep-2", "50ms"); + await step.sleep("sleep-2", "200ms"); await step.run({ name: "step-3" }, () => "three"); return "done"; @@ -784,7 +796,7 @@ describe("Worker", () => { // first execution: step-1, then sleep-1 await worker.tick(); - await sleep(50); + await sleep(200); expect(executionCount).toBe(1); // verify first sleep is running @@ -796,11 +808,11 @@ describe("Worker", () => { ); // wait for first sleep - await sleep(100); + await sleep(200); // second execution: sleep-1 completed, step-2, then sleep-2 await worker.tick(); - await sleep(50); + await sleep(200); expect(executionCount).toBe(2); // verify second sleep is running @@ -815,11 +827,11 @@ describe("Worker", () => { ); // wait for second sleep - await sleep(100); + await sleep(200); // third execution: sleep-2 completed, step-3, complete await worker.tick(); - await sleep(50); + await sleep(200); expect(executionCount).toBe(3); const result = await handle.result(); @@ -835,119 +847,127 @@ describe("Worker", () => { ); }); - test("parked workflows can be claimed after availableAt", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); - - const workflow = client.defineWorkflow( - { name: "parked-claim-test" }, - async ({ step }) => { - await step.run({ name: "before" }, () => "before"); - await step.sleep("wait", "100ms"); - await step.run({ name: "after" }, () => "after"); - return "done"; - }, - ); + test( + "parked workflows can be claimed after availableAt", + { timeout: 15_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - const worker = client.newWorker(); - const handle = await workflow.run(); + const workflow = client.defineWorkflow( + { name: "parked-claim-test" }, + async ({ step }) => { + await step.run({ name: "before" }, () => "before"); + await step.sleep("wait", "100ms"); + await step.run({ name: "after" }, () => "after"); + return "done"; + }, + ); - // first execution - sleep - await worker.tick(); - await sleep(50); + const worker = client.newWorker(); + const handle = await workflow.run(); - // verify workflow is parked in running state - const parked = await backend.getWorkflowRun({ - workflowRunId: handle.workflowRun.id, - }); - expect(parked?.status).toBe("running"); - expect(parked?.workerId).toBeNull(); + // first execution - sleep + await worker.tick(); + await sleep(200); - // wait for sleep duration - await sleep(100); + // verify workflow is parked in running state + const parked = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parked?.status).toBe("running"); + expect(parked?.workerId).toBeNull(); - // verify workflow can be claimed again - const claimed = await backend.claimWorkflowRun({ - workerId: "test-worker", - leaseDurationMs: 30_000, - }); - expect(claimed?.id).toBe(handle.workflowRun.id); - expect(claimed?.status).toBe("running"); - expect(claimed?.workerId).toBe("test-worker"); - }); + // wait for sleep duration + await sleep(200); - test("sleep is not skipped when worker crashes after creating sleep step but before parking workflow", async () => { - const backend = await createTestBackend(); - const client = new OpenWorkflow({ backend }); + // verify workflow can be claimed again + const claimed = await backend.claimWorkflowRun({ + workerId: "test-worker", + leaseDurationMs: 30_000, + }); + expect(claimed?.id).toBe(handle.workflowRun.id); + expect(claimed?.status).toBe("running"); + expect(claimed?.workerId).toBe("test-worker"); + }, + ); + + test( + "sleep is not skipped when worker crashes after creating sleep step but before parking workflow", + { timeout: 15_000 }, + async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); - let executionCount = 0; - let beforeSleepCount = 0; - let afterSleepCount = 0; + let executionCount = 0; + let beforeSleepCount = 0; + let afterSleepCount = 0; - const workflow = client.defineWorkflow( - { name: "crash-during-sleep" }, - async ({ step }) => { - executionCount++; + const workflow = client.defineWorkflow( + { name: "crash-during-sleep" }, + async ({ step }) => { + executionCount++; - await step.run({ name: "before-sleep" }, () => { - beforeSleepCount++; - return "before"; - }); + await step.run({ name: "before-sleep" }, () => { + beforeSleepCount++; + return "before"; + }); - // this sleep should NOT be skipped even if crash happens - await step.sleep("critical-pause", "200ms"); + // this sleep should NOT be skipped even if crash happens + await step.sleep("critical-pause", "200ms"); - await step.run({ name: "after-sleep" }, () => { - afterSleepCount++; - return "after"; - }); + await step.run({ name: "after-sleep" }, () => { + afterSleepCount++; + return "after"; + }); - return { executionCount, beforeSleepCount, afterSleepCount }; - }, - ); + return { executionCount, beforeSleepCount, afterSleepCount }; + }, + ); - const handle = await workflow.run(); + const handle = await workflow.run(); - // first worker processes the workflow until sleep - const worker1 = client.newWorker(); - await worker1.tick(); - await sleep(100); + // first worker processes the workflow until sleep + const worker1 = client.newWorker(); + await worker1.tick(); + await sleep(200); - const workflowAfterFirst = await backend.getWorkflowRun({ - workflowRunId: handle.workflowRun.id, - }); + const workflowAfterFirst = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); - expect(workflowAfterFirst?.status).toBe("running"); - expect(workflowAfterFirst?.workerId).toBeNull(); + expect(workflowAfterFirst?.status).toBe("running"); + expect(workflowAfterFirst?.workerId).toBeNull(); - const attemptsAfterFirst = await backend.listStepAttempts({ - workflowRunId: handle.workflowRun.id, - }); - const sleepStep = attemptsAfterFirst.data.find( - (a) => a.stepName === "critical-pause", - ); - expect(sleepStep).toBeDefined(); - expect(sleepStep?.kind).toBe("sleep"); - expect(sleepStep?.status).toBe("running"); + const attemptsAfterFirst = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + }); + const sleepStep = attemptsAfterFirst.data.find( + (a) => a.stepName === "critical-pause", + ); + expect(sleepStep).toBeDefined(); + expect(sleepStep?.kind).toBe("sleep"); + expect(sleepStep?.status).toBe("running"); - await sleep(50); // only 50ms of the 200ms sleep + await sleep(50); // only 50ms of the 200ms sleep - // if there's a running sleep step, the workflow should be properly parked - const worker2 = client.newWorker(); - await worker2.tick(); + // if there's a running sleep step, the workflow should be properly parked + const worker2 = client.newWorker(); + await worker2.tick(); - // after-sleep step should NOT have executed yet - expect(afterSleepCount).toBe(0); + // after-sleep step should NOT have executed yet + expect(afterSleepCount).toBe(0); - // wait for the full sleep duration to elapse then check to make sure - // workflow is claimable and resume - await sleep(200); - await worker2.tick(); - await sleep(100); - expect(afterSleepCount).toBe(1); - const result = await handle.result(); - expect(result.afterSleepCount).toBe(1); - }); + // wait for the full sleep duration to elapse then check to make sure + // workflow is claimable and resume + await sleep(300); + await worker2.tick(); + await sleep(200); + expect(afterSleepCount).toBe(1); + const result = await handle.result(); + expect(result.afterSleepCount).toBe(1); + }, + ); test("version enables conditional code paths", async () => { const backend = await createTestBackend();