Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ specific `stepName` in the workflow run. If retryable, the workflow run is
rescheduled by setting `availableAt` to the computed backoff time. On the next
execution, replay reaches the failed step and re-executes its function.

To prevent runaway workflows from accumulating unbounded step history, execution
enforces a default hard cap of 1000 step attempts per workflow run. When that
limit is reached, the run fails immediately and is not retried.

### 4.2. Workflow Failures & Retries

If an error is unhandled by the workflow code, the entire workflow run fails.
Expand Down
7 changes: 6 additions & 1 deletion packages/docs/docs/retries.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ Steps are attempted up to 10 times by default (one initial attempt plus up to
nine retries). If the step still fails after all attempts, the workflow is
permanently marked as `failed`.

To prevent runaway executions, each workflow run also has a hard cap of 1000
total step attempts. If the run reaches that cap, it fails immediately and is
not retried.

## Step Retries

Steps that throw are retried automatically:
Expand Down Expand Up @@ -167,7 +171,8 @@ defineWorkflow({ name: "with-error-handling" }, async ({ input, step }) => {
## Terminal Failures

A workflow is permanently marked `failed` when step retries are exhausted
(`maximumAttempts` reached) or `deadlineAt` expires.
(`maximumAttempts` reached), `deadlineAt` expires, or the run reaches the step
attempt cap.

Once terminal, no more automatic retries occur. You can inspect and manually
retry failed workflows from the [dashboard](/docs/dashboard).
Expand Down
3 changes: 3 additions & 0 deletions packages/docs/docs/steps.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ await step.run({ name: "risky-operation" }, async () => {
Steps that throw are marked as failed. The workflow can be retried, and failed
steps will re-execute (not return cached results).

Each workflow run also has a hard cap of 1000 total step attempts. If that
limit is reached, the run fails and does not retry.

## What Makes a Good Step

Each step should be a meaningful unit of work. Good steps:
Expand Down
2 changes: 1 addition & 1 deletion packages/docs/docs/workflows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ A workflow run progresses through these states:
| `running` | Actively being executed by a worker |
| `sleeping` | Paused while waiting for `step.sleep` or `step.invokeWorkflow` |
| `completed` | Finished successfully |
| `failed` | Failed after exhausting retries or deadline reached |
| `failed` | Failed after exhausting retries, reaching the deadline, or hitting the step cap |
| `canceled` | Explicitly canceled and will not continue |

## Determinism
Expand Down
213 changes: 213 additions & 0 deletions packages/openworkflow/worker/execution.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { OpenWorkflow } from "../client/client.js";
import type { Backend } from "../core/backend.js";
import type { DurationString } from "../core/duration.js";
import type { StepAttempt } from "../core/step-attempt.js";
import { DEFAULT_WORKFLOW_RETRY_POLICY } from "../core/workflow-definition.js";
import type { WorkflowFunctionParams } from "../core/workflow-function.js";
import type { WorkflowRun } from "../core/workflow-run.js";
import { BackendPostgres } from "../postgres.js";
import { DEFAULT_POSTGRES_URL } from "../postgres/postgres.js";
import {
WORKFLOW_STEP_LIMIT,
STEP_LIMIT_EXCEEDED_ERROR_CODE,
createStepExecutionStateFromAttempts,
executeWorkflow,
} from "./execution.js";
Expand Down Expand Up @@ -1494,6 +1499,185 @@ describe("executeWorkflow", () => {
});

describe("error handling", () => {
// Replay scenario: the history returned by listStepAttempts already contains
// WORKFLOW_STEP_LIMIT completed attempts, and the run is expected to be failed
// without the workflow function ever being invoked.
test("fails terminally when replay step history reaches the step limit", async () => {
  const workerId = "worker-step-limit";
  const run = createMockWorkflowRun({
    id: "replay-step-limit-run",
    workerId,
  });
  const workflowFn = vi.fn(() => "unreachable");

  // One completed attempt for every slot of the step budget.
  const fullHistory = Array.from({ length: WORKFLOW_STEP_LIMIT }, (_, slot) => {
    const label = `history-step-${String(slot)}`;
    return createMockStepAttempt({
      id: label,
      stepName: label,
      status: "completed",
    });
  });

  // Single page of results: no further pagination cursors.
  const listStepAttempts = vi.fn(() => {
    const page = {
      data: fullHistory,
      pagination: { next: null, prev: null },
    };
    return Promise.resolve(page);
  });

  // Records the failure request and answers with a failed run fixture.
  const failWorkflowRun = vi.fn(
    (request: Parameters<Backend["failWorkflowRun"]>[0]) => {
      if (request.workflowRunId.length === 0) {
        throw new TypeError("Expected workflowRunId");
      }
      const failedRun = createMockWorkflowRun({
        status: "failed",
        error: {
          code: STEP_LIMIT_EXCEEDED_ERROR_CODE,
          message: "step limit exceeded",
        },
      });
      return Promise.resolve(failedRun);
    },
  );

  await executeWorkflow({
    // Only the two backend methods this scenario is expected to touch.
    backend: {
      listStepAttempts,
      failWorkflowRun,
    } as unknown as Backend,
    workflowRun: run,
    workflowFn,
    workflowVersion: null,
    workerId,
    retryPolicy: {
      ...DEFAULT_WORKFLOW_RETRY_POLICY,
      maximumAttempts: 5,
    },
  });

  expect(workflowFn).not.toHaveBeenCalled();
  expect(listStepAttempts).toHaveBeenCalledTimes(1);

  const firstFailArgs = failWorkflowRun.mock.calls[0];
  const failCall = firstFailArgs?.[0];
  if (!failCall) throw new Error("Expected failWorkflowRun call");

  expect(failCall.workflowRunId).toBe(run.id);
  expect(failCall.workerId).toBe(workerId);
  // The failure is reported with the default policy rather than the
  // customized one (maximumAttempts: 5) passed to executeWorkflow above.
  expect(failCall.retryPolicy).toEqual(DEFAULT_WORKFLOW_RETRY_POLICY);

  const reported = failCall.error;
  expect(reported["code"]).toBe(STEP_LIMIT_EXCEEDED_ERROR_CODE);
  expect(reported["limit"]).toBe(WORKFLOW_STEP_LIMIT);
  expect(reported["stepCount"]).toBe(WORKFLOW_STEP_LIMIT);

  const reportedMessage = reported.message;
  if (typeof reportedMessage !== "string") {
    throw new TypeError("Expected step-limit message to be a string");
  }
  expect(reportedMessage).toMatch(/exceeded the step limit/i);
});

// Runtime scenario: history starts one attempt below WORKFLOW_STEP_LIMIT, the
// workflow then asks for two new steps. Exactly one new attempt is expected to
// be created and completed before the run is failed with the step-limit error.
test("fails terminally when new steps would exceed the step limit", async () => {
  const workerId = "worker-step-runtime";
  const run = createMockWorkflowRun({
    id: "runtime-step-limit-run",
    workerId,
  });

  // Lets completeStepAttempt recover the step name for an attempt that
  // createStepAttempt handed out earlier.
  const nameByAttemptId = new Map<string, string>();

  const listStepAttempts = vi.fn(() => {
    const priorAttempts = Array.from(
      { length: WORKFLOW_STEP_LIMIT - 1 },
      (_, slot) => {
        const label = `existing-step-${String(slot)}`;
        return createMockStepAttempt({
          id: label,
          stepName: label,
          status: "completed",
        });
      },
    );
    return Promise.resolve({
      data: priorAttempts,
      pagination: { next: null, prev: null },
    });
  });

  const createStepAttempt = vi.fn(
    (request: Parameters<Backend["createStepAttempt"]>[0]) => {
      const attemptId = `created-${request.stepName}`;
      nameByAttemptId.set(attemptId, request.stepName);
      const runningAttempt = createMockStepAttempt({
        id: attemptId,
        stepName: request.stepName,
        kind: request.kind,
        status: "running",
        output: null,
        finishedAt: null,
      });
      return Promise.resolve(runningAttempt);
    },
  );

  const completeStepAttempt = vi.fn(
    (request: Parameters<Backend["completeStepAttempt"]>[0]) => {
      const knownName = nameByAttemptId.get(request.stepAttemptId);
      if (!knownName) {
        throw new Error(`Missing step name for ${request.stepAttemptId}`);
      }
      const completedAttempt = createMockStepAttempt({
        id: request.stepAttemptId,
        stepName: knownName,
        status: "completed",
        output: request.output ?? null,
      });
      return Promise.resolve(completedAttempt);
    },
  );

  // Records the failure request and answers with a failed run fixture.
  const failWorkflowRun = vi.fn(
    (request: Parameters<Backend["failWorkflowRun"]>[0]) => {
      if (request.workflowRunId.length === 0) {
        throw new TypeError("Expected workflowRunId");
      }
      const failedRun = createMockWorkflowRun({
        status: "failed",
        error: {
          code: STEP_LIMIT_EXCEEDED_ERROR_CODE,
          message: "step limit exceeded",
        },
      });
      return Promise.resolve(failedRun);
    },
  );

  // Requests two brand-new steps back to back.
  const workflowFn = vi.fn(
    async (params: WorkflowFunctionParams<unknown>) => {
      const { step } = params;
      await step.run({ name: "new-step-1" }, () => "first");
      await step.run({ name: "new-step-2" }, () => "second");
      return "unreachable";
    },
  );

  await executeWorkflow({
    backend: {
      listStepAttempts,
      createStepAttempt,
      completeStepAttempt,
      failWorkflowRun,
    } as unknown as Backend,
    workflowRun: run,
    workflowFn,
    workflowVersion: null,
    workerId,
    retryPolicy: {
      ...DEFAULT_WORKFLOW_RETRY_POLICY,
      maximumAttempts: 5,
    },
  });

  // Only one of the two requested steps is created and completed.
  expect(createStepAttempt).toHaveBeenCalledTimes(1);
  expect(completeStepAttempt).toHaveBeenCalledTimes(1);

  const firstFailArgs = failWorkflowRun.mock.calls[0];
  const failCall = firstFailArgs?.[0];
  if (!failCall) throw new Error("Expected failWorkflowRun call");

  expect(failCall.workflowRunId).toBe(run.id);
  expect(failCall.workerId).toBe(workerId);
  // The failure is reported with the default policy rather than the
  // customized one (maximumAttempts: 5) passed to executeWorkflow above.
  expect(failCall.retryPolicy).toEqual(DEFAULT_WORKFLOW_RETRY_POLICY);

  const reported = failCall.error;
  expect(reported["code"]).toBe(STEP_LIMIT_EXCEEDED_ERROR_CODE);
  expect(reported["limit"]).toBe(WORKFLOW_STEP_LIMIT);
  expect(reported["stepCount"]).toBe(WORKFLOW_STEP_LIMIT);

  const reportedMessage = reported.message;
  if (typeof reportedMessage !== "string") {
    throw new TypeError("Expected step-limit message to be a string");
  }
  expect(reportedMessage).toMatch(/exceeded the step limit/i);
});

test("handles workflow errors with deadline exceeded", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });
Expand Down Expand Up @@ -2039,3 +2223,32 @@ function createMockStepAttempt(
...overrides,
};
}

/**
 * Builds a `WorkflowRun` fixture for tests.
 *
 * Starts from a fixed set of defaults (a `running` run in the `default`
 * namespace, first attempt, all timestamps pinned to 2026-01-01T00:00:00Z)
 * and lets the caller replace any subset of fields.
 *
 * @param overrides - Fields that take precedence over the defaults.
 * @returns A new `WorkflowRun` object on every call.
 */
function createMockWorkflowRun(
  overrides: Partial<WorkflowRun> = {},
): WorkflowRun {
  // A separate Date instance per field, so the timestamp fields never alias
  // one another.
  const pinnedTimestamp = (): Date => new Date("2026-01-01T00:00:00.000Z");

  const defaults: WorkflowRun = {
    namespaceId: "default",
    id: "workflow-run-id",
    workflowName: "workflow-name",
    version: null,
    status: "running",
    idempotencyKey: null,
    config: {},
    context: null,
    input: null,
    output: null,
    error: null,
    attempts: 1,
    parentStepAttemptNamespaceId: null,
    parentStepAttemptId: null,
    workerId: "worker-id",
    availableAt: null,
    deadlineAt: null,
    startedAt: pinnedTimestamp(),
    finishedAt: null,
    createdAt: pinnedTimestamp(),
    updatedAt: pinnedTimestamp(),
  };

  // Later spread wins: caller-supplied fields replace the defaults.
  return { ...defaults, ...overrides };
}
Loading
Loading