Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ A workflow run can be in one of the following states:

- **`pending`**: The workflow run has been created and is waiting for a worker
to claim it.
- **`running`**: The workflow run is actively being executed by a worker.
- **`sleeping`**: The workflow run is waiting for a duration to elapse
(`step.sleep`) or waiting for a child workflow result (`step.invokeWorkflow`).
The `availableAt` timestamp controls when it becomes available again.
- **`running`**: The workflow run is either actively being executed by a worker
or durably parked with `workerId = null` until `availableAt`.
- **`sleeping`** (deprecated): Legacy parked state kept for backward
compatibility. New runs are parked in `running` instead.
- **`succeeded`** (deprecated): Legacy success state kept for backward
compatibility. New successful runs use `completed`.
- **`completed`**: The workflow run has completed successfully.
- **`failed`**: The workflow run has failed after exhausting retries or deadline
reached.
Expand All @@ -64,6 +66,8 @@ A workflow run can be in one of the following states:
A step attempt can be in one of the following states:

- **`running`**: The step attempt is currently being executed.
- **`succeeded`** (deprecated): Legacy success state kept for backward
compatibility. New successful attempts use `completed`.
- **`completed`**: The step attempt completed successfully and its result is
stored.
- **`failed`**: The step attempt failed. The workflow may create a new attempt
Expand Down Expand Up @@ -133,10 +137,10 @@ of coordination. There is no separate orchestrator server.
table with a `pending` status.
3. **Job Polling**: A **Worker** process polls the `workflow_runs` table,
looking for runs whose `availableAt` timestamp is in the past and whose
status is either `pending` (new work), `sleeping` (but done), or `running`
with an expired lease. It uses an atomic `FOR UPDATE SKIP LOCKED` query to
claim a single workflow run, setting its status to `running` and extending
the lease.
status is either `pending` (new work), `running` (parked or with an expired
lease), or legacy `sleeping`. It uses an atomic `FOR UPDATE SKIP LOCKED`
query to claim a single workflow run, setting its status to `running` and
extending the lease.
4. **Code Execution (Replay Loop)**: The Worker loads the history of completed
`step_attempts` for the claimed workflow. It then executes the workflow code
from the beginning, using the history to memoize results of
Expand All @@ -148,7 +152,7 @@ of coordination. There is no separate orchestrator server.
sleep.
6. **State Update**: The Worker updates the Backend with each `step_attempt` as
it is created and completed, and updates the status of the `workflow_run`
(e.g., `completed`, `sleeping`).
(e.g., `completed`, `running` for parked waits).

## 3. The Execution Model: State Machine Replication

Expand Down Expand Up @@ -215,9 +219,10 @@ const user = await step.run({ name: "fetch-user" }, async () => {
```

**`step.sleep(name, duration)`**: Pauses the workflow until a specified time.
When encountered, the worker sets the workflow run's `status` to `sleeping` and
`availableAt` to the resume time, then releases the workflow. This frees up the
worker slot for other work - it's not a blocking sleep but a durable pause.
When encountered, the worker keeps the workflow run's `status` as `running`,
sets `availableAt` to the resume time, clears `workerId`, and releases the
workflow. This frees up the worker slot for other work - it's not a blocking
sleep but a durable pause.

```ts
await step.sleep("wait-one-hour", "1h");
Expand Down
3 changes: 3 additions & 0 deletions packages/dashboard/src/lib/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export const STATUS_CONFIG: Record<
badgeClass: "bg-warning/10 border-warning/20 text-warning",
},
sleeping: {
// legacy status kept for backward compatibility
icon: HourglassIcon,
color: "text-sleeping",
label: "Sleeping",
Expand Down Expand Up @@ -90,6 +91,7 @@ export const STEP_STATUS_CONFIG: Record<
/** Run statuses that represent a finished workflow (no further updates expected). */
export const TERMINAL_RUN_STATUSES: ReadonlySet<WorkflowRunStatus> = new Set([
"completed",
// legacy status kept for backward compatibility
"succeeded",
"failed",
"canceled",
Expand All @@ -99,6 +101,7 @@ export const TERMINAL_RUN_STATUSES: ReadonlySet<WorkflowRunStatus> = new Set([
const CANCELABLE_RUN_STATUSES: ReadonlySet<WorkflowRunStatus> = new Set([
"pending",
"running",
// legacy status kept for backward compatibility
"sleeping",
]);

Expand Down
8 changes: 4 additions & 4 deletions packages/openworkflow/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ export class OpenWorkflow {
}

/**
* Cancels the workflow run with the given ID. Only workflow runs in pending, running, or sleeping
* status can be canceled.
* Cancels the workflow run with the given ID. Workflow runs in `pending`,
* `running`, or legacy `sleeping` status can be canceled.
* @param workflowRunId - The ID of the workflow run to cancel
* @returns Promise<void>
* @example
Expand Down Expand Up @@ -338,8 +338,8 @@ class WorkflowRunHandle<Output> {
}

/**
* Cancels the workflow run. Only workflows in pending, running, or sleeping
* status can be canceled.
* Cancels the workflow run. Workflows in `pending`, `running`, or legacy
* `sleeping` status can be canceled.
*/
async cancel(): Promise<void> {
await this.backend.cancelWorkflowRun({
Expand Down
2 changes: 1 addition & 1 deletion packages/openworkflow/core/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export function toWorkflowRunCounts(
const counts: WorkflowRunCounts = {
pending: 0,
running: 0,
sleeping: 0,
sleeping: 0, // deprecated, retained for backward compatibility
completed: 0,
failed: 0,
canceled: 0,
Expand Down
2 changes: 1 addition & 1 deletion packages/openworkflow/core/workflow-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { StandardSchemaV1 } from "./standard-schema.js";
export type WorkflowRunStatus =
| "pending"
| "running"
| "sleeping"
| "sleeping" // deprecated in favor of staying 'running'
| "succeeded" // deprecated in favor of 'completed'
| "completed"
| "failed"
Expand Down
54 changes: 54 additions & 0 deletions packages/openworkflow/postgres/backend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,57 @@ describe("BackendPostgres cancel fallback", () => {
}
});
});

describe("BackendPostgres legacy sleeping compatibility", () => {
test("claims workflow runs persisted with legacy sleeping status", async () => {
const namespaceId = randomUUID();
const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
namespaceId,
});

try {
const run = await backend.createWorkflowRun({
workflowName: "legacy-sleeping-claim",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
parentStepAttemptNamespaceId: null,
parentStepAttemptId: null,
availableAt: null,
deadlineAt: null,
});

const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
try {
const workflowRunsTable = pg`${pg(DEFAULT_SCHEMA)}.${pg("workflow_runs")}`;

await pg`
UPDATE ${workflowRunsTable}
SET
"status" = 'sleeping',
"worker_id" = NULL,
"available_at" = NOW() - INTERVAL '1 second',
"updated_at" = NOW()
WHERE "namespace_id" = ${namespaceId}
AND "id" = ${run.id}
`;
} finally {
await pg.end();
}

const workerId = randomUUID();
const claimed = await backend.claimWorkflowRun({
workerId,
leaseDurationMs: 60_000,
});

expect(claimed?.id).toBe(run.id);
expect(claimed?.status).toBe("running");
expect(claimed?.workerId).toBe(workerId);
} finally {
await backend.stop();
}
});
});
9 changes: 6 additions & 3 deletions packages/openworkflow/postgres/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,13 @@ export class BackendPostgres implements Backend {
}

async sleepWorkflowRun(params: SleepWorkflowRunParams): Promise<WorkflowRun> {
// 'succeeded' status is deprecated
// 'sleeping' and 'succeeded' statuses are deprecated
const workflowRunsTable = this.workflowRunsTable();

const [updated] = await this.pg<WorkflowRun[]>`
UPDATE ${workflowRunsTable}
SET
"status" = 'sleeping',
"status" = 'running',
"available_at" = CASE
WHEN "available_at" IS NOT NULL AND "available_at" <= NOW()
THEN "available_at"
Expand Down Expand Up @@ -585,7 +585,10 @@ export class BackendPostgres implements Backend {
AND sa."id" = ${childWorkflowRun.parentStepAttemptId}
AND wr."namespace_id" = sa."namespace_id"
AND wr."id" = sa."workflow_run_id"
AND wr."status" = 'sleeping'
AND (
wr."status" = 'sleeping'
OR (wr."status" = 'running' AND wr."worker_id" IS NULL)
)
`;
}

Expand Down
55 changes: 55 additions & 0 deletions packages/openworkflow/sqlite/backend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,58 @@ describe("BackendSqlite.setStepAttemptChildWorkflowRun error handling", () => {
}
});
});

describe("BackendSqlite legacy sleeping compatibility", () => {
test("claims workflow runs persisted with legacy sleeping status", async () => {
const namespaceId = randomUUID();
const backend = BackendSqlite.connect(":memory:", {
namespaceId,
});

try {
const run = await backend.createWorkflowRun({
workflowName: "legacy-sleeping-claim",
version: null,
idempotencyKey: null,
config: {},
context: null,
input: null,
parentStepAttemptNamespaceId: null,
parentStepAttemptId: null,
availableAt: null,
deadlineAt: null,
});

const internalBackend = backend as unknown as {
db: Database;
};
const past = new Date(Date.now() - 1000).toISOString();
internalBackend.db
.prepare(
`
UPDATE "workflow_runs"
SET
"status" = 'sleeping',
"worker_id" = NULL,
"available_at" = ?,
"updated_at" = ?
WHERE "namespace_id" = ?
AND "id" = ?
`,
)
.run(past, past, namespaceId, run.id);

const workerId = randomUUID();
const claimed = await backend.claimWorkflowRun({
workerId,
leaseDurationMs: 60_000,
});

expect(claimed?.id).toBe(run.id);
expect(claimed?.status).toBe("running");
expect(claimed?.workerId).toBe(workerId);
} finally {
await backend.stop();
}
});
});
13 changes: 8 additions & 5 deletions packages/openworkflow/sqlite/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ export class BackendSqlite implements Backend {
const stmt = this.db.prepare(`
UPDATE "workflow_runs"
SET
"status" = 'sleeping',
"status" = 'running',
"available_at" = CASE
WHEN "available_at" IS NOT NULL AND "available_at" <= ? THEN "available_at"
ELSE ?
Expand All @@ -380,7 +380,7 @@ export class BackendSqlite implements Backend {
"updated_at" = ?
WHERE "namespace_id" = ?
AND "id" = ?
AND "status" NOT IN ('completed', 'failed', 'canceled')
AND "status" NOT IN ('succeeded', 'completed', 'failed', 'canceled')
AND "worker_id" = ?
`);

Expand Down Expand Up @@ -587,8 +587,8 @@ export class BackendSqlite implements Backend {
return existing;
}

// throw error for completed/failed workflows
if (["completed", "failed"].includes(existing.status)) {
// 'succeeded' status is deprecated
if (["succeeded", "completed", "failed"].includes(existing.status)) {
throw new Error(
`Cannot cancel workflow run ${params.workflowRunId} with status ${existing.status}`,
);
Expand Down Expand Up @@ -632,7 +632,10 @@ export class BackendSqlite implements Backend {
AND "id" = ?
LIMIT 1
)
AND "status" = 'sleeping'
AND (
"status" = 'sleeping'
OR ("status" = 'running' AND "worker_id" IS NULL)
)
`);

stmt.run(
Expand Down
Loading
Loading