From 38d216f2f79743dfb4341b384af065b9d79f2296 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Wed, 25 Feb 2026 16:27:33 -0600 Subject: [PATCH] feat(openworkflow): enforce a hard cap of 1000 step attempts per workflow run --- ARCHITECTURE.md | 4 + packages/docs/docs/retries.mdx | 7 +- packages/docs/docs/steps.mdx | 3 + packages/docs/docs/workflows.mdx | 2 +- .../openworkflow/worker/execution.test.ts | 213 ++++++++++++++++++ packages/openworkflow/worker/execution.ts | 79 ++++++- 6 files changed, 305 insertions(+), 3 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index b1f6eed2..adacc416 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -235,6 +235,10 @@ specific `stepName` in the workflow run. If retryable, the workflow run is rescheduled by setting `availableAt` to the computed backoff time. On the next execution, replay reaches the failed step and re-executes its function. +To prevent runaway workflows from accumulating unbounded step history, execution +enforces a default hard cap of 1000 step attempts per workflow run. When that +limit is reached, the run fails immediately and is not retried. + ### 4.2. Workflow Failures & Retries If an error is unhandled by the workflow code, the entire workflow run fails. diff --git a/packages/docs/docs/retries.mdx b/packages/docs/docs/retries.mdx index 23513ef6..e92ddd84 100644 --- a/packages/docs/docs/retries.mdx +++ b/packages/docs/docs/retries.mdx @@ -24,6 +24,10 @@ Steps are attempted up to 10 times by default (one initial attempt plus up to nine retries). If the step still fails after all attempts, the workflow is permanently marked as `failed`. +To prevent runaway executions, each workflow run also has a hard cap of 1000 +total step attempts. If the run reaches that cap, it fails immediately and is +not retried. 
+ ## Step Retries Steps that throw are retried automatically: @@ -167,7 +171,8 @@ defineWorkflow({ name: "with-error-handling" }, async ({ input, step }) => { ## Terminal Failures A workflow is permanently marked `failed` when step retries are exhausted -(`maximumAttempts` reached) or `deadlineAt` expires. +(`maximumAttempts` reached), `deadlineAt` expires, or the run exceeds the step +attempt cap. Once terminal, no more automatic retries occur. You can inspect and manually retry failed workflows from the [dashboard](/docs/dashboard). diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 348ef25b..8761c056 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -195,6 +195,9 @@ await step.run({ name: "risky-operation" }, async () => { Steps that throw are marked as failed. The workflow can be retried, and failed steps will re-execute (not return cached results). +Each workflow run also has a hard cap of 1000 total step attempts. If that +limit is reached, the run fails and does not retry. + ## What Makes a Good Step Each step should be a meaningful unit of work. 
Good steps: diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 12a913a6..73db03d4 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -242,7 +242,7 @@ A workflow run progresses through these states: | `running` | Actively being executed by a worker | | `sleeping` | Paused while waiting for `step.sleep` or `step.invokeWorkflow` | | `completed` | Finished successfully | -| `failed` | Failed after exhausting retries or deadline reached | +| `failed` | Failed after exhausting retries, hitting deadline, or step cap | | `canceled` | Explicitly canceled and will not continue | ## Determinism diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 357e28bc..4ff68744 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -1,10 +1,15 @@ import { OpenWorkflow } from "../client/client.js"; +import type { Backend } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; import type { StepAttempt } from "../core/step-attempt.js"; import { DEFAULT_WORKFLOW_RETRY_POLICY } from "../core/workflow-definition.js"; +import type { WorkflowFunctionParams } from "../core/workflow-function.js"; +import type { WorkflowRun } from "../core/workflow-run.js"; import { BackendPostgres } from "../postgres.js"; import { DEFAULT_POSTGRES_URL } from "../postgres/postgres.js"; import { + WORKFLOW_STEP_LIMIT, + STEP_LIMIT_EXCEEDED_ERROR_CODE, createStepExecutionStateFromAttempts, executeWorkflow, } from "./execution.js"; @@ -1494,6 +1499,185 @@ describe("executeWorkflow", () => { }); describe("error handling", () => { + test("fails terminally when replay step history reaches the step limit", async () => { + const attempts = Array.from({ length: WORKFLOW_STEP_LIMIT }, (_, index) => + createMockStepAttempt({ + id: `history-step-${String(index)}`, + stepName: 
`history-step-${String(index)}`, + status: "completed", + }), + ); + + const listStepAttempts = vi.fn(() => + Promise.resolve({ + data: attempts, + pagination: { next: null, prev: null }, + }), + ); + const failWorkflowRun = vi.fn( + (params: Parameters<Backend["failWorkflowRun"]>[0]) => { + if (params.workflowRunId.length === 0) { + throw new TypeError("Expected workflowRunId"); + } + return Promise.resolve( + createMockWorkflowRun({ + status: "failed", + error: { + code: STEP_LIMIT_EXCEEDED_ERROR_CODE, + message: "step limit exceeded", + }, + }), + ); + }, + ); + const workflowFn = vi.fn(() => "unreachable"); + const workflowRun = createMockWorkflowRun({ + id: "replay-step-limit-run", + workerId: "worker-step-limit", + }); + + await executeWorkflow({ + backend: { + listStepAttempts, + failWorkflowRun, + } as unknown as Backend, + workflowRun, + workflowFn, + workflowVersion: null, + workerId: "worker-step-limit", + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 5, + }, + }); + + expect(workflowFn).not.toHaveBeenCalled(); + expect(listStepAttempts).toHaveBeenCalledTimes(1); + + const failCall = failWorkflowRun.mock.calls[0]?.[0]; + if (!failCall) throw new Error("Expected failWorkflowRun call"); + + expect(failCall.workflowRunId).toBe(workflowRun.id); + expect(failCall.workerId).toBe("worker-step-limit"); + expect(failCall.retryPolicy).toEqual(DEFAULT_WORKFLOW_RETRY_POLICY); + expect(failCall.error["code"]).toBe(STEP_LIMIT_EXCEEDED_ERROR_CODE); + expect(failCall.error["limit"]).toBe(WORKFLOW_STEP_LIMIT); + expect(failCall.error["stepCount"]).toBe(WORKFLOW_STEP_LIMIT); + if (typeof failCall.error.message !== "string") { + throw new TypeError("Expected step-limit message to be a string"); + } + expect(failCall.error.message).toMatch(/exceeded the step limit/i); + }); + + test("fails terminally when new steps would exceed the step limit", async () => { + const stepNamesByAttemptId = new Map<string, string>(); + const listStepAttempts = vi.fn(() => + Promise.resolve({ + data: Array.from({ 
length: WORKFLOW_STEP_LIMIT - 1 }, (_, index) => + createMockStepAttempt({ + id: `existing-step-${String(index)}`, + stepName: `existing-step-${String(index)}`, + status: "completed", + }), + ), + pagination: { next: null, prev: null }, + }), + ); + const createStepAttempt = vi.fn( + (params: Parameters<Backend["createStepAttempt"]>[0]) => { + const createdId = `created-${params.stepName}`; + stepNamesByAttemptId.set(createdId, params.stepName); + return Promise.resolve( + createMockStepAttempt({ + id: createdId, + stepName: params.stepName, + kind: params.kind, + status: "running", + output: null, + finishedAt: null, + }), + ); + }, + ); + const completeStepAttempt = vi.fn( + (params: Parameters<Backend["completeStepAttempt"]>[0]) => { + const stepName = stepNamesByAttemptId.get(params.stepAttemptId); + if (!stepName) { + throw new Error(`Missing step name for ${params.stepAttemptId}`); + } + return Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + stepName, + status: "completed", + output: params.output ?? null, + }), + ); + }, + ); + const failWorkflowRun = vi.fn( + (params: Parameters<Backend["failWorkflowRun"]>[0]) => { + if (params.workflowRunId.length === 0) { + throw new TypeError("Expected workflowRunId"); + } + return Promise.resolve( + createMockWorkflowRun({ + status: "failed", + error: { + code: STEP_LIMIT_EXCEEDED_ERROR_CODE, + message: "step limit exceeded", + }, + }), + ); + }, + ); + const workflowRun = createMockWorkflowRun({ + id: "runtime-step-limit-run", + workerId: "worker-step-runtime", + }); + const workflowFn = vi.fn( + async ({ step }: WorkflowFunctionParams) => { + await step.run({ name: "new-step-1" }, () => "first"); + await step.run({ name: "new-step-2" }, () => "second"); + return "unreachable"; + }, + ); + + await executeWorkflow({ + backend: { + listStepAttempts, + createStepAttempt, + completeStepAttempt, + failWorkflowRun, + } as unknown as Backend, + workflowRun, + workflowFn, + workflowVersion: null, + workerId: "worker-step-runtime", + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + 
maximumAttempts: 5, + }, + }); + + expect(createStepAttempt).toHaveBeenCalledTimes(1); + expect(completeStepAttempt).toHaveBeenCalledTimes(1); + + const failCall = failWorkflowRun.mock.calls[0]?.[0]; + if (!failCall) throw new Error("Expected failWorkflowRun call"); + + expect(failCall.workflowRunId).toBe(workflowRun.id); + expect(failCall.workerId).toBe("worker-step-runtime"); + expect(failCall.retryPolicy).toEqual(DEFAULT_WORKFLOW_RETRY_POLICY); + expect(failCall.error["code"]).toBe(STEP_LIMIT_EXCEEDED_ERROR_CODE); + expect(failCall.error["limit"]).toBe(WORKFLOW_STEP_LIMIT); + expect(failCall.error["stepCount"]).toBe(WORKFLOW_STEP_LIMIT); + if (typeof failCall.error.message !== "string") { + throw new TypeError("Expected step-limit message to be a string"); + } + expect(failCall.error.message).toMatch(/exceeded the step limit/i); + }); + test("handles workflow errors with deadline exceeded", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -2039,3 +2223,32 @@ function createMockStepAttempt( ...overrides, }; } + +function createMockWorkflowRun( + overrides: Partial<WorkflowRun> = {}, +): WorkflowRun { + return { + namespaceId: "default", + id: "workflow-run-id", + workflowName: "workflow-name", + version: null, + status: "running", + idempotencyKey: null, + config: {}, + context: null, + input: null, + output: null, + error: null, + attempts: 1, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + workerId: "worker-id", + availableAt: null, + deadlineAt: null, + startedAt: new Date("2026-01-01T00:00:00.000Z"), + finishedAt: null, + createdAt: new Date("2026-01-01T00:00:00.000Z"), + updatedAt: new Date("2026-01-01T00:00:00.000Z"), + ...overrides, + }; +} diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 9ff55d2b..4dc4ea55 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -86,6 +86,53 @@ const 
INVOKE_FAILURE_RETRY_POLICY: RetryPolicy = { maximumAttempts: 1, }; +/** Maximum number of step attempts allowed for a single workflow run. */ +export const WORKFLOW_STEP_LIMIT = 1000; + +/** Error code used when a workflow run exceeds the step-attempt limit. */ +export const STEP_LIMIT_EXCEEDED_ERROR_CODE = "STEP_LIMIT_EXCEEDED"; + +/** + * Error thrown when a workflow run reaches the maximum allowed step attempts. + */ +class StepLimitExceededError extends Error { + readonly code = STEP_LIMIT_EXCEEDED_ERROR_CODE; + readonly limit: number; + readonly stepCount: number; + + constructor(limit: number, stepCount: number) { + super( + `Exceeded the step limit of ${String(limit)} attempts (current count: ${String(stepCount)})`, + ); + this.name = "StepLimitExceededError"; + this.limit = limit; + this.stepCount = stepCount; + } +} + +/** + * Convert a step-limit error to a persisted serialized error payload. + * @param error - Step-limit error + * @returns Serialized error payload with limit metadata + */ +function serializeStepLimitExceededError( + error: Readonly<StepLimitExceededError>, +): { + name: string; + message: string; + code: string; + limit: number; + stepCount: number; +} { + return { + name: error.name, + message: error.message, + code: error.code, + limit: error.limit, + stepCount: error.stepCount, + }; +} + /** * Resolve a partial step retry policy by merging it with step defaults. 
* @param partial - Optional partial retry policy @@ -258,6 +305,7 @@ export interface StepExecutorOptions { workflowRunId: string; workerId: string; attempts: StepAttempt[]; + stepLimit?: number; } /** @@ -268,6 +316,8 @@ class StepExecutor implements StepApi { private readonly backend: Backend; private readonly workflowRunId: string; private readonly workerId: string; + private readonly stepLimit: number; + private stepCount: number; private cache: StepAttemptCache; private readonly failedCountsByStepName: Map<string, number>; private readonly runningByStepName: Map<string, StepAttempt>; @@ -276,6 +326,8 @@ class StepExecutor implements StepApi { this.backend = options.backend; this.workflowRunId = options.workflowRunId; this.workerId = options.workerId; + this.stepLimit = Math.max(1, options.stepLimit ?? WORKFLOW_STEP_LIMIT); + this.stepCount = options.attempts.length; const state = createStepExecutionStateFromAttempts(options.attempts); this.cache = state.cache; @@ -298,6 +350,7 @@ class StepExecutor implements StepApi { } // not in cache, create new step attempt + this.ensureStepLimitNotReached(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -306,6 +359,7 @@ class StepExecutor implements StepApi { config: {}, context: null, }); + this.stepCount += 1; this.runningByStepName.set(name, attempt); try { @@ -364,6 +418,7 @@ class StepExecutor implements StepApi { const resumeAt = result.value; const context = createSleepContext(resumeAt); + this.ensureStepLimitNotReached(); await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -372,6 +427,7 @@ class StepExecutor implements StepApi { config: {}, context, }); + this.stepCount += 1; // throw sleep signal to trigger postponement // we do not mark the step as completed here; it will be updated @@ -398,6 +454,7 @@ class StepExecutor implements StepApi { // First encounter — create the invoke step and child workflow run const timeoutAt = 
resolveInvokeTimeoutAt(opts.timeout); + this.ensureStepLimitNotReached(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -406,6 +463,7 @@ class StepExecutor implements StepApi { config: {}, context: createInvokeContext(timeoutAt), }); + this.stepCount += 1; this.runningByStepName.set(stepName, attempt); const linkedAttempt = await this.linkChildWorkflowRun( @@ -605,6 +663,12 @@ class StepExecutor implements StepApi { error, }); } + + private ensureStepLimitNotReached(): void { + if (this.stepCount >= this.stepLimit) { + throw new StepLimitExceededError(this.stepLimit, this.stepCount); + } + } } /** @@ -643,9 +707,12 @@ export async function executeWorkflow( const response = await backend.listStepAttempts({ workflowRunId: workflowRun.id, ...(cursor ? { after: cursor } : {}), - limit: 1000, + limit: WORKFLOW_STEP_LIMIT, }); attempts.push(...response.data); + if (attempts.length >= WORKFLOW_STEP_LIMIT) { + throw new StepLimitExceededError(WORKFLOW_STEP_LIMIT, attempts.length); + } cursor = response.pagination.next ?? undefined; } while (cursor); @@ -723,6 +790,16 @@ export async function executeWorkflow( return; } + if (error instanceof StepLimitExceededError) { + await backend.failWorkflowRun({ + workflowRunId: workflowRun.id, + workerId, + error: serializeStepLimitExceededError(error), + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + return; + } + // handle step error if (error instanceof StepError) { const serializedError = serializeError(error.originalError);