diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index adacc416..add1e83b 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -187,9 +187,12 @@ const welcomeEmail = await step.run({ name: "welcome-email" }, async () => { All steps are executed synchronously by the worker. When a worker encounters a new step: -1. It creates a `step_attempt` record with status `running`. -2. It executes the step function inline. -3. Upon completion, it updates the `step_attempt` to status `completed` with +1. It resolves the step's durable key for this execution pass. The first + occurrence keeps its base name; later collisions are auto-indexed as + `name:1`, `name:2`, and so on. +2. It creates a `step_attempt` record with status `running`. +3. It executes the step function inline. +4. Upon completion, it updates the `step_attempt` to status `completed` with the result. Workers can be configured with a high concurrency limit (e.g., 100 or more) to @@ -224,6 +227,11 @@ await step.sleep("wait-one-hour", "1h"); it durably. When the timeout is reached (default 7d), the parent step fails but the child workflow continues running independently. +All step APIs (`step.run`, `step.sleep`, and `step.invokeWorkflow`) share the +same collision logic for durable keys. If duplicate base names are encountered +in one execution pass, OpenWorkflow auto-indexes them as `name`, `name:1`, +`name:2`, and so on so each step call maps to a distinct step attempt. + ## 4. Error Handling & Retries ### 4.1. Step Failures & Retries diff --git a/packages/docs/docs/dynamic-steps.mdx b/packages/docs/docs/dynamic-steps.mdx index 0165e462..22dddd2c 100644 --- a/packages/docs/docs/dynamic-steps.mdx +++ b/packages/docs/docs/dynamic-steps.mdx @@ -5,9 +5,11 @@ description: Run a variable number of steps based on runtime data Sometimes you don't know how many steps a workflow needs until it runs. 
You might need to fetch data for each item in a list, process rows from a query, -or fan out across a set of IDs from an API response. OpenWorkflow handles -this — you can create steps inside loops and maps, as long as each step has a -deterministic name. +or fan out across a set of IDs from an API response. + +OpenWorkflow handles this automatically. When multiple steps share the same +name, they're disambiguated in order (`fetch-data`, `fetch-data:1`, +`fetch-data:2`, ...). You don't need to generate unique names yourself. ## Basic Pattern @@ -16,49 +18,33 @@ Map over your data and create a step per item using `Promise.all`: ```ts const results = await Promise.all( input.items.map((item) => - step.run({ name: `fetch-data:${item.id}` }, async () => { + step.run({ name: "fetch-data" }, async () => { return await thirdPartyApi.fetch(item.id); }), ), ); ``` -Each step is individually memoized. If the workflow restarts, completed steps -return their cached results and only the remaining steps re-execute. - -The most important rule: **step names must be deterministic across replays**. -Use a stable identifier from the data itself — like a database ID, a slug, or -a unique key: - -```ts -// Good — stable ID from the data -step.run({ name: `process-order:${order.id}` }, ...) -step.run({ name: `send-email:${user.email}` }, ...) - -// Bad — non-deterministic, different on every run -step.run({ name: `task-${Date.now()}` }, ...) -step.run({ name: `task-${crypto.randomUUID()}` }, ...) -``` - - - Non-deterministic names (timestamps, random values, request IDs) break replay. - Completed steps won't be found in history, causing them to re-execute. - +Every step uses the same name — OpenWorkflow appends `:1`, `:2`, etc. +automatically. Each step is individually memoized, so if the workflow restarts, +completed steps return their cached results and only the remaining steps +re-execute. 
-### Falling Back to Array Indexes +## Stable IDs for Mutable Collections -When no stable ID exists, you can use the array index: +If items can be added, removed, or reordered between retries, include a stable +ID from the data in the step name instead of relying on auto-indexing: ```ts const results = await Promise.all( - input.items.map((item, index) => - step.run({ name: `fetch-data:${index}` }, async () => { - return await thirdPartyApi.fetch(item.lookupKey); + input.orders.map((order) => + step.run({ name: `process-order:${order.id}` }, async () => { + return await processOrder(order); }), ), ); ``` -This is safe only if the array order is identical between the original run and -any replay. If the order changes, cached results get returned for the wrong -items. +This way, each step is tied to a specific item regardless of its position in +the array. Use any stable identifier — a database ID, a slug, or a unique key +from the data itself. diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 8761c056..05843a5d 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -65,22 +65,20 @@ The customer is charged exactly once. ## Step Names -Step names must be unique within a workflow. They identify steps during replay -and should be stable across code changes. +Step names identify checkpoints during replay and should be stable across code +changes. Use descriptive names that reflect what the step does: ```ts // Good - descriptive, stable names await step.run({ name: "fetch-user" }, ...); await step.run({ name: "send-welcome-email" }, ...); await step.run({ name: "update-user-status" }, ...); - -// Bad - generic names that could conflict -await step.run({ name: "step-1" }, ...); -await step.run({ name: "step-2" }, ...); ``` -If you need to create a dynamic number of steps from runtime data (like -mapping over an array), see [Dynamic Steps](/docs/dynamic-steps). 
+If two steps share the same name in a single execution, OpenWorkflow +automatically disambiguates them by appending `:1`, `:2`, and so on in +encounter order. This is most useful for [dynamic steps](/docs/dynamic-steps) +where the number of steps isn't known ahead of time. Changing step names after workflows are in-flight can cause replay errors. @@ -150,9 +148,6 @@ const childOutput = await step.invokeWorkflow("generate-report", { }); ``` -If `timeout` is reached, the parent step fails, but the child workflow keeps -running independently. - ## Retry Policy (Optional) Control backoff and retry limits for an individual step: diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 4ff68744..591813aa 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -45,7 +45,7 @@ describe("StepExecutor", () => { expect(result).toBe(8); }); - test("caches step results for same step name", async () => { + test("auto-indexes duplicate step.run names", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -72,9 +72,138 @@ describe("StepExecutor", () => { const result = await handle.result(); expect(result).toEqual({ first: "first-execution", - second: "first-execution", + second: "second-execution", + }); + expect(executionCount).toBe(2); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const stepNames = steps.data + .map((stepAttempt) => stepAttempt.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(stepNames).toEqual(["cached-step", "cached-step:1"]); + }); + + test("avoids step-name collisions with explicit numeric suffixes", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + let thirdStepExecutions = 0; + const workflow = client.defineWorkflow( + { name: 
`executor-collision-suffix-${randomUUID()}` }, + async ({ step }) => { + const first = await step.run({ name: "foo" }, () => "A"); + const second = await step.run({ name: "foo:1" }, () => "B"); + const third = await step.run({ name: "foo" }, () => { + thirdStepExecutions += 1; + return "C"; + }); + return { first, second, third }; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + await worker.tick(); + + await expect(handle.result()).resolves.toEqual({ + first: "A", + second: "B", + third: "C", + }); + expect(thirdStepExecutions).toBe(1); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const stepNames = steps.data + .map((stepAttempt) => stepAttempt.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(stepNames).toEqual(["foo", "foo:1", "foo:2"]); + }); + + test("handles chaotic explicit numeric suffix naming without collisions", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + let executionCount = 0; + const workflow = client.defineWorkflow( + { name: `executor-collision-chaos-${randomUUID()}` }, + async ({ step }) => { + async function runStep(name: string, value: string) { + return await step.run({ name }, () => { + executionCount += 1; + return value; + }); + } + + return [ + await runStep("foo", "a"), + await runStep("foo:2", "b"), + await runStep("foo", "c"), + await runStep("foo:1", "d"), + await runStep("foo", "e"), + await runStep("foo:2", "f"), + await runStep("foo", "g"), + await runStep("foo:1", "h"), + await runStep("foo:3", "i"), + await runStep("foo", "j"), + ]; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + await worker.tick(); + + await expect(handle.result()).resolves.toEqual([ + "a", + "b", + "c", + "d", + "e", + "f", + "g", + "h", + "i", + "j", + ]); + expect(executionCount).toBe(10); + + const steps = await backend.listStepAttempts({ 
+ workflowRunId: handle.workflowRun.id, + limit: 100, + }); + + expect(steps.data).toHaveLength(10); + expect( + new Set(steps.data.map((stepAttempt) => stepAttempt.stepName)).size, + ).toBe(10); + + const stepNameByOutput = Object.fromEntries( + steps.data.map((stepAttempt): readonly [string, string] => { + if (typeof stepAttempt.output !== "string") { + throw new TypeError("Expected string output for chaos naming test"); + } + return [stepAttempt.output, stepAttempt.stepName]; + }), + ); + + expect(stepNameByOutput).toEqual({ + a: "foo", + b: "foo:2", + c: "foo:1", + d: "foo:1:1", + e: "foo:3", + f: "foo:2:1", + g: "foo:4", + h: "foo:1:2", + i: "foo:3:1", + j: "foo:5", }); - expect(executionCount).toBe(1); }); test("different step names execute independently", async () => { @@ -194,6 +323,43 @@ describe("StepExecutor", () => { expect(result).toBe(15); }); + test("auto-indexes duplicate sleep names", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `sleep-duplicate-names-${randomUUID()}` }, + async ({ step }) => { + await step.sleep("pause", "10ms"); + await step.sleep("pause", "10ms"); + return "done"; + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("done"); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const sleepStepNames = steps.data + .filter((stepAttempt) => stepAttempt.kind === "sleep") + .map((stepAttempt) => stepAttempt.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(sleepStepNames).toEqual(["pause", "pause:1"]); + }); + test("invokes a child workflow and returns child output", async () => { const backend = await createBackend(); const client = 
new OpenWorkflow({ backend }); @@ -345,7 +511,64 @@ describe("StepExecutor", () => { await expect(handle.result()).resolves.toBe(10); }); - test("supports workflow-name targets, date/number timeouts, and cached invoke replay", async () => { + test("applies collision indexing across step.run and step.invokeWorkflow", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-cross-type-${randomUUID()}` }, + ({ input }: { input: { value: number } }) => { + return input.value + 1; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-cross-type-${randomUUID()}` }, + async ({ step }) => { + const local = await step.run({ name: "shared-name" }, () => 41); + const invoked = await step.invokeWorkflow("shared-name", { + workflow: child.workflow, + input: { value: 1 }, + }); + return { local, invoked }; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ local: 41, invoked: 2 }); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const sharedSteps = steps.data.filter((stepAttempt) => + stepAttempt.stepName.startsWith("shared-name"), + ); + expect(sharedSteps).toHaveLength(2); + const kindByStepName = new Map( + sharedSteps.map((stepAttempt): readonly [string, string] => [ + stepAttempt.stepName, + stepAttempt.kind, + ]), + ); + expect( + [...kindByStepName.keys()].toSorted((a, b) => a.localeCompare(b)), + ).toEqual(["shared-name", "shared-name:1"]); + expect(kindByStepName.get("shared-name")).toBe("function"); + expect(kindByStepName.get("shared-name:1")).toBe("invoke"); + }); + + test("supports workflow-name targets, date/number timeouts, 
and auto-indexed duplicate invoke names", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -396,7 +619,7 @@ describe("StepExecutor", () => { expect(status).toBe("completed"); await expect(handle.result()).resolves.toEqual({ first: 5, - second: 5, + second: 100, numeric: 9, spec: 2, }); @@ -405,10 +628,17 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - expect( - steps.data.filter((stepAttempt) => stepAttempt.kind === "invoke"), - ).toHaveLength(3); - }); + const invokeStepNames = steps.data + .filter((stepAttempt) => stepAttempt.kind === "invoke") + .map((stepAttempt) => stepAttempt.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(invokeStepNames).toEqual([ + "invoke-cached", + "invoke-cached:1", + "invoke-number-timeout", + "invoke-spec-target", + ]); + }, 15_000); test("fails invoke when timeout number is invalid", async () => { const backend = await createBackend(); @@ -1300,6 +1530,62 @@ describe("StepExecutor", () => { await expect(handle.result()).resolves.toBe(10); }); + test("auto-indexes duplicate invoke names in parallel Promise.all", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-parallel-duplicate-${randomUUID()}` }, + ({ input }: { input: { value: number } }) => { + return input.value * 3; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-parallel-duplicate-${randomUUID()}` }, + async ({ step }) => { + const [first, second] = await Promise.all([ + step.invokeWorkflow("invoke-same", { + workflow: child.workflow, + input: { value: 2 }, + }), + step.invokeWorkflow("invoke-same", { + workflow: child.workflow, + input: { value: 3 }, + }), + ]); + return { first, second }; + }, + ); + + const worker = client.newWorker({ concurrency: 3 }); + const handle = await parent.run(); + const status = await 
tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 300, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ first: 6, second: 9 }); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const invokeStepNames = steps.data + .filter( + (stepAttempt) => + stepAttempt.kind === "invoke" && + stepAttempt.stepName.startsWith("invoke-same"), + ) + .map((stepAttempt) => stepAttempt.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(invokeStepNames).toEqual(["invoke-same", "invoke-same:1"]); + }); + test("does not create duplicate child runs while waiting across replays", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 4dc4ea55..9d959adf 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -321,6 +321,8 @@ class StepExecutor implements StepApi { private cache: StepAttemptCache; private readonly failedCountsByStepName: Map; private readonly runningByStepName: Map; + private readonly expectedNextStepIndexByName: Map; + private readonly resolvedStepNames: Set; constructor(options: Readonly) { this.backend = options.backend; @@ -333,6 +335,36 @@ class StepExecutor implements StepApi { this.cache = state.cache; this.failedCountsByStepName = new Map(state.failedCountsByStepName); this.runningByStepName = new Map(state.runningByStepName); + this.expectedNextStepIndexByName = new Map(); + this.resolvedStepNames = new Set(); + } + + /** + * Resolve a step name to a deterministic, unique key for this workflow + * execution pass. When a name collides, suffixes are appended as + * `name:1`, `name:2`, etc. If those suffixes already exist (including + * user-provided names), indexing continues until an unused name is found. 
+ * @param stepName - User-provided step name + * @returns Resolved step name used for durable step state + */ + private resolveStepName(stepName: string): string { + if (!this.resolvedStepNames.has(stepName)) { + this.resolvedStepNames.add(stepName); + return stepName; + } + + const expectedNextIndex = + this.expectedNextStepIndexByName.get(stepName) ?? 1; + for (let index = expectedNextIndex; ; index += 1) { + const resolvedName = `${stepName}:${String(index)}`; + if (this.resolvedStepNames.has(resolvedName)) { + continue; + } + + this.expectedNextStepIndexByName.set(stepName, index + 1); + this.resolvedStepNames.add(resolvedName); + return resolvedName; + } } // ---- step.run ----------------------------------------------------------- @@ -341,10 +373,11 @@ class StepExecutor implements StepApi { config: Readonly, fn: StepFunction, ): Promise { - const { name, retryPolicy: retryPolicyOverride } = config; + const { name: baseStepName, retryPolicy: retryPolicyOverride } = config; + const stepName = this.resolveStepName(baseStepName); // return cached result if available - const existingAttempt = getCachedStepAttempt(this.cache, name); + const existingAttempt = getCachedStepAttempt(this.cache, stepName); if (existingAttempt) { return existingAttempt.output as Output; } @@ -354,13 +387,14 @@ class StepExecutor implements StepApi { const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, - stepName: name, + stepName, kind: "function", config: {}, context: null, }); + this.stepCount += 1; - this.runningByStepName.set(name, attempt); + this.runningByStepName.set(stepName, attempt); try { // execute step function @@ -377,37 +411,26 @@ class StepExecutor implements StepApi { // cache result this.cache = addToStepAttemptCache(this.cache, savedAttempt); - this.runningByStepName.delete(name); + this.runningByStepName.delete(stepName); return savedAttempt.output as Output; } catch (error) { - // mark failure - 
this.runningByStepName.delete(name); - await this.backend.failStepAttempt({ - workflowRunId: this.workflowRunId, - stepAttemptId: attempt.id, - workerId: this.workerId, - error: serializeError(error), - }); - - const stepFailedAttempts = - (this.failedCountsByStepName.get(name) ?? 0) + 1; - this.failedCountsByStepName.set(name, stepFailedAttempts); - - throw new StepError({ - stepName: name, - stepFailedAttempts, - retryPolicy: resolveStepRetryPolicy(retryPolicyOverride), + return this.failStepWithError( + stepName, + attempt.id, error, - }); + resolveStepRetryPolicy(retryPolicyOverride), + ); } } // ---- step.sleep --------------------------------------------------------- - async sleep(name: string, duration: DurationString): Promise { + async sleep(baseStepName: string, duration: DurationString): Promise { + const stepName = this.resolveStepName(baseStepName); + // return cached result if this sleep already completed - const existingAttempt = getCachedStepAttempt(this.cache, name); + const existingAttempt = getCachedStepAttempt(this.cache, stepName); if (existingAttempt) return; // create new step attempt for the sleep @@ -422,7 +445,7 @@ class StepExecutor implements StepApi { await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, - stepName: name, + stepName, kind: "sleep", config: {}, context, @@ -438,9 +461,10 @@ class StepExecutor implements StepApi { // ---- step.invokeWorkflow ----------------------------------------------- async invokeWorkflow( - stepName: string, + baseStepName: string, opts: Readonly>, ): Promise { + const stepName = this.resolveStepName(baseStepName); const existingAttempt = getCachedStepAttempt(this.cache, stepName); if (existingAttempt) { return existingAttempt.output as Output; diff --git a/packages/openworkflow/worker/worker.test.ts b/packages/openworkflow/worker/worker.test.ts index 5b3de723..edecd8c6 100644 --- a/packages/openworkflow/worker/worker.test.ts +++ 
b/packages/openworkflow/worker/worker.test.ts @@ -46,7 +46,7 @@ describe("Worker", () => { expect(result).toBe(42); }); - test("step.run reuses cached results", async () => { + test("step.run auto-indexes duplicate names", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -60,7 +60,7 @@ describe("Worker", () => { }); const second = await step.run({ name: "once" }, () => { executionCount++; - return "should-not-run"; + return "second-value"; }); return { first, second }; }, @@ -72,8 +72,17 @@ describe("Worker", () => { await worker.tick(); const result = await handle.result(); - expect(result).toEqual({ first: "value", second: "value" }); - expect(executionCount).toBe(1); + expect(result).toEqual({ first: "value", second: "second-value" }); + expect(executionCount).toBe(2); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const stepNames = steps.data + .map((stepAttempt) => stepAttempt.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(stepNames).toEqual(["once", "once:1"]); }); test("reschedules workflow when definition is missing", async () => {
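
For readers following the naming rules outside the diff, the collision indexing that `resolveStepName` performs can be sketched as a small standalone class. This is a minimal illustration, not the project's code: `StepNameResolver`, `resolve`, and the field names are hypothetical, and the sketch assumes one resolver instance per execution pass, as the `StepExecutor` constructor in the patch suggests.

```ts
// Hypothetical standalone sketch of the collision-indexing idea shown in the
// patch. One instance is assumed to live for a single execution pass.
class StepNameResolver {
  // Next suffix to try for each base name that has already collided.
  private readonly nextIndexByName = new Map<string, number>();
  // Every name handed out so far, including explicit user-provided ones.
  private readonly resolved = new Set<string>();

  resolve(stepName: string): string {
    // First occurrence keeps its name unchanged.
    if (!this.resolved.has(stepName)) {
      this.resolved.add(stepName);
      return stepName;
    }

    // On a collision, skip suffixes that are already taken
    // (for example a user-provided "foo:1").
    let index = this.nextIndexByName.get(stepName) ?? 1;
    while (this.resolved.has(`${stepName}:${index}`)) {
      index += 1;
    }

    const resolvedName = `${stepName}:${index}`;
    this.nextIndexByName.set(stepName, index + 1);
    this.resolved.add(resolvedName);
    return resolvedName;
  }
}

const resolver = new StepNameResolver();
const resolvedNames = ["foo", "foo:1", "foo"].map((name) =>
  resolver.resolve(name),
);
// resolvedNames is ["foo", "foo:1", "foo:2"], matching the
// "explicit numeric suffixes" test case in the patch.
```

Because the resolved name depends only on the order in which names are requested, a replay that issues the same calls in the same order maps each call to the same durable key. That is also why the docs in this patch recommend a stable ID in the name when the underlying collection can change between retries.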