diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index add1e83b..93607de3 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -49,10 +49,12 @@ A workflow run can be in one of the following states: - **`pending`**: The workflow run has been created and is waiting for a worker to claim it. -- **`running`**: The workflow run is actively being executed by a worker. -- **`sleeping`**: The workflow run is waiting for a duration to elapse - (`step.sleep`) or waiting for a child workflow result (`step.invokeWorkflow`). - The `availableAt` timestamp controls when it becomes available again. +- **`running`**: The workflow run is either actively being executed by a worker + or durably parked with `workerId = null` until `availableAt`. +- **`sleeping`** (deprecated): Legacy parked state kept for backward + compatibility. New runs are parked in `running` instead. +- **`succeeded`** (deprecated): Legacy success state kept for backward + compatibility. New successful runs use `completed`. - **`completed`**: The workflow run has completed successfully. - **`failed`**: The workflow run has failed after exhausting retries or deadline reached. @@ -64,6 +66,8 @@ A workflow run can be in one of the following states: A step attempt can be in one of the following states: - **`running`**: The step attempt is currently being executed. +- **`succeeded`** (deprecated): Legacy success state kept for backward + compatibility. New successful attempts use `completed`. - **`completed`**: The step attempt completed successfully and its result is stored. - **`failed`**: The step attempt failed. The workflow may create a new attempt @@ -133,10 +137,10 @@ of coordination. There is no separate orchestrator server. table with a `pending` status. 3. **Job Polling**: A **Worker** process polls the `workflow_runs` table, looking for runs whose `availableAt` timestamp is in the past and whose - status is either `pending` (new work), `sleeping` (but done), or `running` - with an expired lease. It uses an atomic `FOR UPDATE SKIP LOCKED` query to - claim a single workflow run, setting its status to `running` and extending - the lease. + status is either `pending` (new work), `running` (parked or with an expired + lease), or legacy `sleeping`. It uses an atomic `FOR UPDATE SKIP LOCKED` + query to claim a single workflow run, setting its status to `running` and + extending the lease. 4. **Code Execution (Replay Loop)**: The Worker loads the history of completed `step_attempts` for the claimed workflow. It then executes the workflow code from the beginning, using the history to memoize results of @@ -148,7 +152,7 @@ of coordination. There is no separate orchestrator server. sleep. 6. **State Update**: The Worker updates the Backend with each `step_attempt` as it is created and completed, and updates the status of the `workflow_run` - (e.g., `completed`, `sleeping`). + (e.g., `completed`, `running` for parked waits). ## 3. The Execution Model: State Machine Replication @@ -215,9 +219,10 @@ const user = await step.run({ name: "fetch-user" }, async () => { ``` **`step.sleep(name, duration)`**: Pauses the workflow until a specified time. -When encountered, the worker sets the workflow run's `status` to `sleeping` and -`availableAt` to the resume time, then releases the workflow. This frees up the -worker slot for other work - it's not a blocking sleep but a durable pause. +When encountered, the worker keeps the workflow run's `status` as `running`, +sets `availableAt` to the resume time, clears `workerId`, and releases the +workflow. This frees up the worker slot for other work - it's not a blocking +sleep but a durable pause. ```ts await step.sleep("wait-one-hour", "1h"); diff --git a/packages/dashboard/src/lib/status.ts b/packages/dashboard/src/lib/status.ts index b9cefedd..ac7c5269 100644 --- a/packages/dashboard/src/lib/status.ts +++ b/packages/dashboard/src/lib/status.ts @@ -52,6 +52,7 @@ export const STATUS_CONFIG: Record< badgeClass: "bg-warning/10 border-warning/20 text-warning", }, sleeping: { + // legacy status kept for backward compatibility icon: HourglassIcon, color: "text-sleeping", label: "Sleeping", @@ -90,6 +91,7 @@ export const STEP_STATUS_CONFIG: Record< /** Run statuses that represent a finished workflow (no further updates expected). */ export const TERMINAL_RUN_STATUSES: ReadonlySet = new Set([ "completed", + // legacy status kept for backward compatibility "succeeded", "failed", "canceled", @@ -99,6 +101,7 @@ export const TERMINAL_RUN_STATUSES: ReadonlySet = new Set([ const CANCELABLE_RUN_STATUSES: ReadonlySet = new Set([ "pending", "running", + // legacy status kept for backward compatibility "sleeping", ]); diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index c6c59a56..24844d6a 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -161,8 +161,8 @@ export class OpenWorkflow { } /** - * Cancels the workflow run with the given ID. Only workflow runs in pending, running, or sleeping - * status can be canceled. + * Cancels the workflow run with the given ID. Workflow runs in `pending`, + * `running`, or legacy `sleeping` status can be canceled. * @param workflowRunId - The ID of the workflow run to cancel * @returns Promise * @example @@ -338,8 +338,8 @@ class WorkflowRunHandle { } /** - * Cancels the workflow run. Only workflows in pending, running, or sleeping - * status can be canceled. + * Cancels the workflow run. Workflows in `pending`, `running`, or legacy + * `sleeping` status can be canceled. */ async cancel(): Promise { await this.backend.cancelWorkflowRun({ diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index 54696dbb..7110fa29 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -202,7 +202,7 @@ export function toWorkflowRunCounts( const counts: WorkflowRunCounts = { pending: 0, running: 0, - sleeping: 0, + sleeping: 0, // deprecated, retained for backward compatibility completed: 0, failed: 0, canceled: 0, diff --git a/packages/openworkflow/core/workflow-run.ts b/packages/openworkflow/core/workflow-run.ts index 5f85421f..8c91f7e2 100644 --- a/packages/openworkflow/core/workflow-run.ts +++ b/packages/openworkflow/core/workflow-run.ts @@ -8,7 +8,7 @@ import type { StandardSchemaV1 } from "./standard-schema.js"; export type WorkflowRunStatus = | "pending" | "running" - | "sleeping" + | "sleeping" // deprecated in favor of staying 'running' | "succeeded" // deprecated in favor of 'completed' | "completed" | "failed" diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts index 63a6c1fc..abec0ccb 100644 --- a/packages/openworkflow/postgres/backend.test.ts +++ b/packages/openworkflow/postgres/backend.test.ts @@ -216,3 +216,57 @@ describe("BackendPostgres cancel fallback", () => { } }); }); + +describe("BackendPostgres legacy sleeping compatibility", () => { + test("claims workflow runs persisted with legacy sleeping status", async () => { + const namespaceId = randomUUID(); + const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { + namespaceId, + }); + + try { + const run = await backend.createWorkflowRun({ + workflowName: "legacy-sleeping-claim", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL); + try { + const workflowRunsTable = pg`${pg(DEFAULT_SCHEMA)}.${pg("workflow_runs")}`; + + await pg` + UPDATE ${workflowRunsTable} + SET + "status" = 'sleeping', + "worker_id" = NULL, + "available_at" = NOW() - INTERVAL '1 second', + "updated_at" = NOW() + WHERE "namespace_id" = ${namespaceId} + AND "id" = ${run.id} + `; + } finally { + await pg.end(); + } + + const workerId = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 60_000, + }); + + expect(claimed?.id).toBe(run.id); + expect(claimed?.status).toBe("running"); + expect(claimed?.workerId).toBe(workerId); + } finally { + await backend.stop(); + } + }); +}); diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 162527eb..02ba67fc 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -377,13 +377,13 @@ export class BackendPostgres implements Backend { } async sleepWorkflowRun(params: SleepWorkflowRunParams): Promise { - // 'succeeded' status is deprecated + // 'sleeping' and 'succeeded' statuses are deprecated const workflowRunsTable = this.workflowRunsTable(); const [updated] = await this.pg` UPDATE ${workflowRunsTable} SET - "status" = 'sleeping', + "status" = 'running', "available_at" = CASE WHEN "available_at" IS NOT NULL AND "available_at" <= NOW() THEN "available_at" @@ -585,7 +585,10 @@ export class BackendPostgres implements Backend { AND sa."id" = ${childWorkflowRun.parentStepAttemptId} AND wr."namespace_id" = sa."namespace_id" AND wr."id" = sa."workflow_run_id" - AND wr."status" = 'sleeping' + AND ( + wr."status" = 'sleeping' + OR (wr."status" = 'running' AND wr."worker_id" IS NULL) + ) `; } diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts index 85889868..87457909 100644 --- a/packages/openworkflow/sqlite/backend.test.ts +++ b/packages/openworkflow/sqlite/backend.test.ts @@ -238,3 +238,58 @@ describe("BackendSqlite.setStepAttemptChildWorkflowRun error handling", () => { } }); }); + +describe("BackendSqlite legacy sleeping compatibility", () => { + test("claims workflow runs persisted with legacy sleeping status", async () => { + const namespaceId = randomUUID(); + const backend = BackendSqlite.connect(":memory:", { + namespaceId, + }); + + try { + const run = await backend.createWorkflowRun({ + workflowName: "legacy-sleeping-claim", + version: null, + idempotencyKey: null, + config: {}, + context: null, + input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + const internalBackend = backend as unknown as { + db: Database; + }; + const past = new Date(Date.now() - 1000).toISOString(); + internalBackend.db + .prepare( + ` + UPDATE "workflow_runs" + SET + "status" = 'sleeping', + "worker_id" = NULL, + "available_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + `, + ) + .run(past, past, namespaceId, run.id); + + const workerId = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 60_000, + }); + + expect(claimed?.id).toBe(run.id); + expect(claimed?.status).toBe("running"); + expect(claimed?.workerId).toBe(workerId); + } finally { + await backend.stop(); + } + }); +}); diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 3e870176..5c293aff 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -371,7 +371,7 @@ export class BackendSqlite implements Backend { const stmt = this.db.prepare(` UPDATE "workflow_runs" SET - "status" = 'sleeping', + "status" = 'running', "available_at" = CASE WHEN "available_at" IS NOT NULL AND "available_at" <= ? THEN "available_at" ELSE ? @@ -380,7 +380,7 @@ export class BackendSqlite implements Backend { "updated_at" = ? WHERE "namespace_id" = ? AND "id" = ? - AND "status" NOT IN ('completed', 'failed', 'canceled') + AND "status" NOT IN ('succeeded', 'completed', 'failed', 'canceled') AND "worker_id" = ? `); @@ -587,8 +587,8 @@ export class BackendSqlite implements Backend { return existing; } - // throw error for completed/failed workflows - if (["completed", "failed"].includes(existing.status)) { + // 'succeeded' status is deprecated + if (["succeeded", "completed", "failed"].includes(existing.status)) { throw new Error( `Cannot cancel workflow run ${params.workflowRunId} with status ${existing.status}`, ); @@ -632,7 +632,10 @@ export class BackendSqlite implements Backend { AND "id" = ? LIMIT 1 ) - AND "status" = 'sleeping' + AND ( + "status" = 'sleeping' + OR ("status" = 'running' AND "worker_id" IS NULL) + ) `); stmt.run( diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 7f352dd9..ede9d8ce 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -708,12 +708,12 @@ export function testBackend(options: TestBackendOptions): void { const runningWorkerId = runningRun.workerId; if (!runningWorkerId) throw new Error("Expected workerId"); - const sleepingRun = await createClaimedWorkflowRun(backend); - const sleepingWorkerId = sleepingRun.workerId; - if (!sleepingWorkerId) throw new Error("Expected workerId"); + const parkedRun = await createClaimedWorkflowRun(backend); + const parkedWorkerId = parkedRun.workerId; + if (!parkedWorkerId) throw new Error("Expected workerId"); await backend.sleepWorkflowRun({ - workflowRunId: sleepingRun.id, - workerId: sleepingWorkerId, + workflowRunId: parkedRun.id, + workerId: parkedWorkerId, availableAt: new Date(Date.now() + 60_000), }); @@ -746,8 +746,8 @@ export function testBackend(options: TestBackendOptions): void { expect(await backend.countWorkflowRuns()).toEqual({ pending: 1, - running: 1, - sleeping: 1, + running: 2, + sleeping: 0, completed: 1, failed: 1, canceled: 1, @@ -904,7 +904,7 @@ export function testBackend(options: TestBackendOptions): void { }); describe("sleepWorkflowRun()", () => { - test("sets a running workflow to sleeping status until a future time", async () => { + test("parks a running workflow with a future availableAt", async () => { const workerId = randomUUID(); await createPendingWorkflowRun(backend); @@ -929,7 +929,7 @@ export function testBackend(options: TestBackendOptions): void { expect(fetched).not.toBeNull(); expect(fetched?.availableAt?.getTime()).toBe(sleepUntil.getTime()); expect(fetched?.workerId).toBeNull(); - expect(fetched?.status).toBe("sleeping"); + expect(fetched?.status).toBe("running"); }); test("fails when trying to sleep a canceled workflow", async () => { @@ -1008,7 +1008,7 @@ export function testBackend(options: TestBackendOptions): void { expect(completed.availableAt).toBeNull(); }); - test("wakes a sleeping parent run when a child run completes", async () => { + test("wakes a parked parent run when a child run completes", async () => { const backend = await setup(); const parentRun = await createClaimedWorkflowRun(backend); @@ -1057,7 +1057,8 @@ export function testBackend(options: TestBackendOptions): void { const parentAfter = await backend.getWorkflowRun({ workflowRunId: parentRun.id, }); - expect(parentAfter?.status).toBe("sleeping"); + expect(parentAfter?.status).toBe("running"); + expect(parentAfter?.workerId).toBeNull(); expect(parentAfter?.availableAt).not.toBeNull(); if (!parentAfter?.availableAt) { throw new Error("Expected parent availableAt after child completion"); @@ -1285,7 +1286,8 @@ export function testBackend(options: TestBackendOptions): void { const parentAfter = await backend.getWorkflowRun({ workflowRunId: parentRun.id, }); - expect(parentAfter?.status).toBe("sleeping"); + expect(parentAfter?.status).toBe("running"); + expect(parentAfter?.workerId).toBeNull(); expect(parentAfter?.availableAt).not.toBeNull(); if (!parentAfter?.availableAt) { throw new Error("Expected parent availableAt after child retry"); @@ -2039,22 +2041,23 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); - test("cancels a sleeping workflow run", async () => { + test("cancels a parked workflow run", async () => { const backend = await setup(); const claimed = await createClaimedWorkflowRun(backend); // put workflow to sleep const sleepUntil = new Date(Date.now() + 60_000); // 1 minute from now - const sleeping = await backend.sleepWorkflowRun({ + const parked = await backend.sleepWorkflowRun({ workflowRunId: claimed.id, workerId: claimed.workerId ?? "", availableAt: sleepUntil, }); - expect(sleeping.status).toBe("sleeping"); + expect(parked.status).toBe("running"); + expect(parked.workerId).toBeNull(); const canceled = await backend.cancelWorkflowRun({ - workflowRunId: sleeping.id, + workflowRunId: parked.id, }); expect(canceled.status).toBe("canceled"); diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 591813aa..aecdd688 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -258,7 +258,7 @@ describe("StepExecutor", () => { await expect(handle.result()).rejects.toThrow(/deadline exceeded/); }); - test("sleep puts workflow in sleeping status", async () => { + test("sleep parks workflow in running status", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -272,21 +272,17 @@ describe("StepExecutor", () => { const handle = await workflow.run(); const worker = client.newWorker(); - const status = await tickUntilStatus( + const parkedRun = await tickUntilParked( backend, worker, handle.workflowRun.id, - "sleeping", 200, 20, ); - const workflowRun = await backend.getWorkflowRun({ - workflowRunId: handle.workflowRun.id, - }); - expect(status).toBe("sleeping"); - expect(workflowRun?.status).toBe("sleeping"); - expect(workflowRun?.availableAt).not.toBeNull(); + expect(parkedRun.status).toBe("running"); + expect(parkedRun.workerId).toBeNull(); + expect(parkedRun.availableAt).not.toBeNull(); }); test("workflow resumes after sleep duration", async () => { @@ -308,10 +304,11 @@ describe("StepExecutor", () => { // First tick - hits sleep await worker.tick(); await sleep(50); // Wait for tick to complete - const sleeping = await backend.getWorkflowRun({ + const parked = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(sleeping?.status).toBe("sleeping"); + expect(parked?.status).toBe("running"); + expect(parked?.workerId).toBeNull(); // Wait for sleep to elapse await sleep(50); @@ -1145,7 +1142,8 @@ describe("StepExecutor", () => { const parentAfterFirstPass = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(parentAfterFirstPass?.status).toBe("sleeping"); + expect(parentAfterFirstPass?.status).toBe("running"); + expect(parentAfterFirstPass?.workerId).toBeNull(); const attempts = await backend.listStepAttempts({ workflowRunId: handle.workflowRun.id, @@ -1385,7 +1383,8 @@ describe("StepExecutor", () => { const parentAfterFirstPass = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(parentAfterFirstPass?.status).toBe("sleeping"); + expect(parentAfterFirstPass?.status).toBe("running"); + expect(parentAfterFirstPass?.workerId).toBeNull(); await sleep(150); @@ -1455,25 +1454,16 @@ describe("StepExecutor", () => { const worker = client.newWorker({ concurrency: 2 }); const handle = await parent.run(); - const parentSleepingStatus = await tickUntilStatus( + const parkedParent = await tickUntilParked( backend, worker, handle.workflowRun.id, - "sleeping", 200, 20, ); - expect(parentSleepingStatus).toBe("sleeping"); - - const sleepingParent = await backend.getWorkflowRun({ - workflowRunId: handle.workflowRun.id, - }); - if (!sleepingParent?.availableAt) { - throw new Error("Expected parent invoke wait availableAt"); - } const millisecondsUntilWake = - sleepingParent.availableAt.getTime() - Date.now(); + parkedParent.availableAt.getTime() - Date.now(); expect(millisecondsUntilWake).toBeGreaterThan(6 * 24 * 60 * 60 * 1000); expect(millisecondsUntilWake).toBeLessThan(8 * 24 * 60 * 60 * 1000); @@ -1661,15 +1651,7 @@ describe("StepExecutor", () => { const worker = client.newWorker({ concurrency: 2 }); const handle = await parent.run(); - const parentSleepingStatus = await tickUntilStatus( - backend, - worker, - handle.workflowRun.id, - "sleeping", - 300, - 20, - ); - expect(parentSleepingStatus).toBe("sleeping"); + await tickUntilParked(backend, worker, handle.workflowRun.id, 300, 20); const steps = await backend.listStepAttempts({ workflowRunId: handle.workflowRun.id, @@ -2031,7 +2013,7 @@ describe("executeWorkflow", () => { }); describe("sleep handling", () => { - test("workflow enters sleeping status", async () => { + test("workflow parks in running status", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -2050,7 +2032,8 @@ describe("executeWorkflow", () => { const workflowRun = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(workflowRun?.status).toBe("sleeping"); + expect(workflowRun?.status).toBe("running"); + expect(workflowRun?.workerId).toBeNull(); }); test("resumes workflow after sleep duration", async () => { @@ -2073,10 +2056,11 @@ describe("executeWorkflow", () => { await worker.tick(); await sleep(50); - const sleeping = await backend.getWorkflowRun({ + const parked = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(sleeping?.status).toBe("sleeping"); + expect(parked?.status).toBe("running"); + expect(parked?.workerId).toBeNull(); // wait for sleep await sleep(50); @@ -2441,6 +2425,11 @@ function sleep(ms: number): Promise { } const TERMINAL_RUN_STATUSES = new Set(["completed", "failed", "canceled"]); +type ParkedWorkflowRun = WorkflowRun & { + status: "running"; + workerId: null; + availableAt: Date; +}; async function tickUntilTerminal( backend: BackendPostgres, @@ -2483,6 +2472,31 @@ async function tickUntilStatus( ); } +async function tickUntilParked( + backend: BackendPostgres, + worker: ReturnType, + workflowRunId: string, + maxTicks: number, + sleepMs: number, +): Promise { + for (let i = 0; i < maxTicks; i++) { + await worker.tick(); + const run = await backend.getWorkflowRun({ workflowRunId }); + if ( + run?.status === "running" && + run.workerId === null && + run.availableAt !== null + ) { + return run as ParkedWorkflowRun; + } + await sleep(sleepMs); + } + + throw new Error( + `Timed out waiting for workflow run ${workflowRunId} to park`, + ); +} + function createMockStepAttempt( overrides: Partial = {}, ): StepAttempt { diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 9d959adf..34f14047 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -710,10 +710,10 @@ export interface ExecuteWorkflowParams { /** * Execute a workflow run. This is the core application use case that handles: * - Loading step history - * - Handling sleeping steps + * - Handling paused (sleep/invoke wait) steps * - Creating the step executor * - Executing the workflow function - * - Completing, failing, or sleeping the workflow run based on the outcome + * - Completing, failing, or parking the workflow run based on the outcome * @param params - The execution parameters */ // eslint-disable-next-line sonarjs/cognitive-complexity @@ -803,7 +803,7 @@ export async function executeWorkflow( output: (output ?? null) as JsonValue, }); } catch (error) { - // handle sleep signal by setting workflow to sleeping status + // handle sleep signal by parking the workflow in running status if (error instanceof SleepSignal) { await backend.sleepWorkflowRun({ workflowRunId: workflowRun.id, diff --git a/packages/openworkflow/worker/worker.test.ts b/packages/openworkflow/worker/worker.test.ts index edecd8c6..e9c0c9f7 100644 --- a/packages/openworkflow/worker/worker.test.ts +++ b/packages/openworkflow/worker/worker.test.ts @@ -614,11 +614,11 @@ describe("Worker", () => { await sleep(50); // wait for processing expect(stepCount).toBe(1); - // verify workflow was postponed with sleeping status + // verify workflow was postponed while remaining in running status const slept = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(slept?.status).toBe("sleeping"); + expect(slept?.status).toBe("running"); expect(slept?.workerId).toBeNull(); // released during sleep expect(slept?.availableAt).not.toBeNull(); if (!slept?.availableAt) throw new Error("availableAt should be set"); @@ -806,12 +806,12 @@ describe("Worker", () => { ); }); - test("sleeping workflows can be claimed after availableAt", async () => { + test("parked workflows can be claimed after availableAt", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const workflow = client.defineWorkflow( - { name: "sleeping-claim-test" }, + { name: "parked-claim-test" }, async ({ step }) => { await step.run({ name: "before" }, () => "before"); await step.sleep("wait", "100ms"); @@ -827,12 +827,12 @@ describe("Worker", () => { await worker.tick(); await sleep(50); - // verify workflow is in sleeping state - const sleeping = await backend.getWorkflowRun({ + // verify workflow is parked in running state + const parked = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(sleeping?.status).toBe("sleeping"); - expect(sleeping?.workerId).toBeNull(); + expect(parked?.status).toBe("running"); + expect(parked?.workerId).toBeNull(); // wait for sleep duration await sleep(100); @@ -847,7 +847,7 @@ describe("Worker", () => { expect(claimed?.workerId).toBe("test-worker"); }); - test("sleep is not skipped when worker crashes after creating sleep step but before marking workflow as sleeping", async () => { + test("sleep is not skipped when worker crashes after creating sleep step but before parking workflow", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -888,7 +888,8 @@ describe("Worker", () => { workflowRunId: handle.workflowRun.id, }); - expect(workflowAfterFirst?.status).toBe("sleeping"); + expect(workflowAfterFirst?.status).toBe("running"); + expect(workflowAfterFirst?.workerId).toBeNull(); const attemptsAfterFirst = await backend.listStepAttempts({ workflowRunId: handle.workflowRun.id, @@ -902,8 +903,7 @@ describe("Worker", () => { await sleep(50); // only 50ms of the 200ms sleep - // if there's a running sleep step, the workflow should be properly - // transitioned to sleeping + // if there's a running sleep step, the workflow should be properly parked const worker2 = client.newWorker(); await worker2.tick(); @@ -989,12 +989,12 @@ describe("Worker", () => { expect(workflowRun?.workerId).toBeNull(); }); - test("cancels a sleeping workflow", async () => { + test("cancels a parked workflow", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const workflow = client.defineWorkflow( - { name: "cancel-sleeping" }, + { name: "cancel-parked" }, async ({ step }) => { await step.sleep("sleep-1", "1h"); return { completed: true }; @@ -1005,7 +1005,7 @@ describe("Worker", () => { const handle = await workflow.run(); await worker.tick(); - // cancel while sleeping + // cancel while parked await handle.cancel(); const canceled = await backend.getWorkflowRun({ @@ -1761,7 +1761,8 @@ describe("Worker", () => { let run = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); - expect(run?.status).toBe("sleeping"); + expect(run?.status).toBe("running"); + expect(run?.workerId).toBeNull(); await sleep(80); // wait for sleep step to elapse