From 80956e5722b43916662aa6ee4cc523bab7ee284a Mon Sep 17 00:00:00 2001 From: James Martinez Date: Thu, 26 Feb 2026 21:34:42 -0600 Subject: [PATCH] feat(openworkflow): `step.runWorkflow` to run child workflows --- ARCHITECTURE.md | 18 +- openworkflow/hello-world-parent.ts | 6 +- packages/dashboard/src/routes/runs/$runId.tsx | 4 +- packages/docs/docs/child-workflows.mdx | 87 ++-- packages/docs/docs/roadmap.mdx | 2 +- packages/docs/docs/steps.mdx | 12 +- packages/docs/docs/workflows.mdx | 14 +- packages/openworkflow/client/client.ts | 2 +- .../openworkflow/core/step-attempt.test.ts | 16 +- packages/openworkflow/core/step-attempt.ts | 20 +- .../openworkflow/core/workflow-function.ts | 42 +- .../openworkflow/postgres/backend.test.ts | 40 +- packages/openworkflow/postgres/backend.ts | 10 +- packages/openworkflow/sqlite/backend.test.ts | 42 +- packages/openworkflow/sqlite/backend.ts | 10 +- .../openworkflow/testing/backend.testsuite.ts | 40 +- .../openworkflow/worker/execution.test.ts | 475 +++++++++--------- packages/openworkflow/worker/execution.ts | 215 ++++---- 18 files changed, 522 insertions(+), 533 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 93607de3..3b7bfa6f 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -228,14 +228,16 @@ sleep but a durable pause. await step.sleep("wait-one-hour", "1h"); ``` -**`step.invokeWorkflow(name, options)`**: Starts a child workflow and waits for -it durably. When the timeout is reached (default 7d), the parent step fails but -the child workflow continues running independently. - -All step APIs (`step.run`, `step.sleep`, and `step.invokeWorkflow`) share the -same collision logic for durable keys. If duplicate base names are encountered -in one execution pass, OpenWorkflow auto-indexes them as `name`, `name:1`, -`name:2`, and so on so each step call maps to a distinct step attempt. +**`step.runWorkflow(spec, input?, options?)`**: Starts a child workflow and +waits for it durably. `options.name` sets the durable step name (defaults to the +target workflow name in `spec`) and `options.timeout` controls the wait timeout +(default 7d). When the timeout is reached, the parent step fails but the child +workflow continues running independently. + +All step APIs (`step.run`, `step.sleep`, and `step.runWorkflow`) share the same +collision logic for durable keys. If duplicate base names are encountered in one +execution pass, OpenWorkflow auto-indexes them as `name`, `name:1`, `name:2`, +and so on so each step call maps to a distinct step attempt. ## 4. Error Handling & Retries diff --git a/openworkflow/hello-world-parent.ts b/openworkflow/hello-world-parent.ts index 25f66087..0d3bbdf9 100644 --- a/openworkflow/hello-world-parent.ts +++ b/openworkflow/hello-world-parent.ts @@ -2,16 +2,14 @@ import { helloWorld } from "./hello-world.js"; import { defineWorkflow } from "openworkflow"; /** - * Example workflow that invokes hello-world as a child workflow. + * Example workflow that runs hello-world as a child workflow. */ export const helloWorldParent = defineWorkflow( { name: "hello-world-parent" }, async ({ step, run }) => { console.log(`[run ${run.id}]`); - const childResult = await step.invokeWorkflow("hello-world-child", { - workflow: helloWorld, - }); + const childResult = await step.runWorkflow(helloWorld.spec); return { childResult, parentMessage: "Hello from the parent workflow!" }; }, diff --git a/packages/dashboard/src/routes/runs/$runId.tsx b/packages/dashboard/src/routes/runs/$runId.tsx index ed48f14b..67488ba8 100644 --- a/packages/dashboard/src/routes/runs/$runId.tsx +++ b/packages/dashboard/src/routes/runs/$runId.tsx @@ -53,7 +53,7 @@ export const Route = createFileRoute("/runs/$runId")({ ...new Set( steps .map((step) => - step.kind === "invoke" ? step.childWorkflowRunId : null, + step.kind === "workflow" ? step.childWorkflowRunId : null, ) .filter((childRunId): childRunId is string => childRunId !== null), ), @@ -186,7 +186,7 @@ function RunDetailsPage() { const stepTypeLabel = step.kind === "function" ? "run" : step.kind; const childRunId = - step.kind === "invoke" ? step.childWorkflowRunId : null; + step.kind === "workflow" ? step.childWorkflowRunId : null; const childRun = childRunId ? (childRunsById[childRunId] ?? null) : null; diff --git a/packages/docs/docs/child-workflows.mdx b/packages/docs/docs/child-workflows.mdx index 45cdd159..e2a09578 100644 --- a/packages/docs/docs/child-workflows.mdx +++ b/packages/docs/docs/child-workflows.mdx @@ -1,10 +1,10 @@ --- title: Child Workflows -description: Invoke workflows from other workflows and wait for their results +description: Run workflows from other workflows and wait for their results --- A parent workflow can start a child workflow and durably wait for its result -using `step.invokeWorkflow()`. This lets you compose complex processes from +using `step.runWorkflow()`. This lets you compose complex processes from smaller, reusable workflows — like splitting an order pipeline into separate payment and shipping workflows. @@ -39,9 +39,8 @@ const processOrder = defineWorkflow( }); // Start the report workflow and wait for it to finish - const report = await step.invokeWorkflow("generate-report", { - workflow: generateReport, - input: { reportId: input.orderId }, + const report = await step.runWorkflow(generateReport.spec, { + reportId: input.orderId, }); await step.run({ name: "send-confirmation" }, async () => { @@ -58,29 +57,28 @@ const processOrder = defineWorkflow( ## Timeout By default, the parent waits up to **7 days** for the child to finish. You can -customize this with the `timeout` option: +customize this with `options.timeout`: ```ts -const result = await step.invokeWorkflow("quick-task", { - workflow: quickTaskWorkflow, - input: { taskId: "abc" }, - timeout: "5m", // wait at most 5 minutes -}); +const result = await step.runWorkflow( + quickTaskWorkflow.spec, + { taskId: "abc" }, + { timeout: "5m" }, // wait at most 5 minutes +); ``` -`timeout` accepts a [duration string](/docs/sleeping#duration-formats), a -number of milliseconds, or a `Date`: +`timeout` accepts a [duration string](/docs/sleeping#duration-formats), a number +of milliseconds, or a `Date`: ```ts // Duration string -await step.invokeWorkflow("task", { workflow: w, timeout: "1h" }); +await step.runWorkflow(w.spec, undefined, { timeout: "1h" }); // Milliseconds -await step.invokeWorkflow("task", { workflow: w, timeout: 60_000 }); +await step.runWorkflow(w.spec, undefined, { timeout: 60_000 }); // Absolute deadline -await step.invokeWorkflow("task", { - workflow: w, +await step.runWorkflow(w.spec, undefined, { timeout: new Date("2026-03-01"), }); ``` @@ -91,43 +89,34 @@ await step.invokeWorkflow("task", { canceled. -## Workflow Target +## Workflow Spec -The `workflow` option accepts a workflow definition, a workflow spec, or a plain -string name: +The first argument accepts a workflow spec: ```ts -// Workflow definition (recommended — type-safe input/output) -await step.invokeWorkflow("run-child", { - workflow: myWorkflow, - input: { key: "value" }, -}); - -// Workflow spec -await step.invokeWorkflow("run-child", { - workflow: myWorkflow.spec, - input: { key: "value" }, -}); +// From a defined workflow +await step.runWorkflow(myWorkflow.spec, { key: "value" }); -// String name (useful when the child is defined in another package) -await step.invokeWorkflow("run-child", { - workflow: "my-workflow", - input: { key: "value" }, -}); +// Or any WorkflowSpec-compatible object +await step.runWorkflow({ name: "my-workflow" }, { key: "value" }); ``` +## Step Name + +Set `options.name` to control the durable step name. If omitted, OpenWorkflow +uses the target workflow name. + ## Error Handling -If the child workflow **fails**, the parent invoke step also fails: +If the child workflow **fails**, the parent workflow step also fails: ```ts const orderPipeline = defineWorkflow( { name: "order-pipeline" }, async ({ input, step }) => { try { - const result = await step.invokeWorkflow("charge", { - workflow: chargeWorkflow, - input: { orderId: input.orderId }, + const result = await step.runWorkflow(chargeWorkflow.spec, { + orderId: input.orderId, }); return result; } catch (error) { @@ -140,13 +129,13 @@ const orderPipeline = defineWorkflow( ); ``` -If the child workflow is **canceled**, the parent invoke step fails with an +If the child workflow is **canceled**, the parent workflow step fails with an error indicating the child was canceled. - Invoke steps do not retry automatically. The child workflow is responsible for - its own retries. If the child fails permanently, the error propagates to the - parent. + Workflow steps do not retry automatically. The child workflow is responsible + for its own retries. If the child fails permanently, the error propagates to + the parent. ## Parallel Child Workflows @@ -155,13 +144,11 @@ Start multiple child workflows concurrently with `Promise.all`: ```ts const [payment, shipping] = await Promise.all([ - step.invokeWorkflow("process-payment", { - workflow: paymentWorkflow, - input: { orderId: input.orderId }, + step.runWorkflow(paymentWorkflow.spec, { + orderId: input.orderId, }), - step.invokeWorkflow("prepare-shipping", { - workflow: shippingWorkflow, - input: { orderId: input.orderId }, + step.runWorkflow(shippingWorkflow.spec, { + orderId: input.orderId, }), ]); ``` diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx index 354822fa..f97bc187 100644 --- a/packages/docs/docs/roadmap.mdx +++ b/packages/docs/docs/roadmap.mdx @@ -19,7 +19,7 @@ description: What's coming next for OpenWorkflow - ✅ Configurable retry policies - ✅ Idempotency keys - ✅ Prometheus `/metrics` endpoint -- ✅ Child workflows (`step.invokeWorkflow`) +- ✅ Child workflows (`step.runWorkflow`) ## Coming Soon diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 05843a5d..3163bac3 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -136,16 +136,16 @@ Pauses the workflow until a specified duration has elapsed. See await step.sleep("wait-one-hour", "1h"); ``` -### `step.invokeWorkflow()` +### `step.runWorkflow()` Starts a child workflow and waits for its result durably: ```ts -const childOutput = await step.invokeWorkflow("generate-report", { - workflow: generateReportWorkflow, - input: { reportId: input.reportId }, - timeout: "5m", // optional, defaults to 7 days -}); +const childOutput = await step.runWorkflow( + generateReportWorkflow.spec, + { reportId: input.reportId }, + { timeout: "5m" }, // optional, defaults to 7 days +); ``` ## Retry Policy (Optional) diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 73db03d4..833e55c4 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -213,12 +213,12 @@ create a separate run. The workflow function receives an object with four properties: -| Parameter | Type | Description | -| --------- | --------------------- | ------------------------------------------------------------------------ | -| `input` | Generic | The input data passed when starting the workflow | -| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.invokeWorkflow`) | -| `version` | `string \| null` | The workflow version, if specified | -| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | +| Parameter | Type | Description | +| --------- | --------------------- | --------------------------------------------------------------------- | +| `input` | Generic | The input data passed when starting the workflow | +| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`) | +| `version` | `string \| null` | The workflow version, if specified | +| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | ```ts defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { @@ -240,7 +240,7 @@ A workflow run progresses through these states: | ----------- | -------------------------------------------------------------- | | `pending` | Created and waiting for a worker to claim it | | `running` | Actively being executed by a worker | -| `sleeping` | Paused while waiting for `step.sleep` or `step.invokeWorkflow` | +| `sleeping` | Paused while waiting for `step.sleep` or `step.runWorkflow` | | `completed` | Finished successfully | | `failed` | Failed after exhausting retries, hitting deadline, or step cap | | `canceled` | Explicitly canceled and will not continue | diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index 24844d6a..61b09a0a 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -86,7 +86,7 @@ export class OpenWorkflow { * @returns Handle for awaiting the result * @example * ```ts - * const handle = await ow.runWorkflow(emailWorkflow, { to: 'user@example.com' }); + * const handle = await ow.runWorkflow(emailWorkflow.spec, { to: 'user@example.com' }); * const result = await handle.result(); * ``` */ diff --git a/packages/openworkflow/core/step-attempt.test.ts b/packages/openworkflow/core/step-attempt.test.ts index 62d4f38a..ce5f207f 100644 --- a/packages/openworkflow/core/step-attempt.test.ts +++ b/packages/openworkflow/core/step-attempt.test.ts @@ -6,7 +6,7 @@ import { normalizeStepOutput, calculateDateFromDuration, createSleepContext, - createInvokeContext, + createWorkflowContext, } from "./step-attempt.js"; import type { StepAttempt, StepAttemptCache } from "./step-attempt.js"; import { describe, expect, test } from "vitest"; @@ -318,22 +318,22 @@ describe("createSleepContext", () => { }); }); -describe("createInvokeContext", () => { - test("creates invoke context with timeout", () => { +describe("createWorkflowContext", () => { + test("creates workflow context with timeout", () => { const timeoutAt = new Date("2025-06-15T10:30:00.000Z"); - const context = createInvokeContext(timeoutAt); + const context = createWorkflowContext(timeoutAt); expect(context).toEqual({ - kind: "invoke", + kind: "workflow", timeoutAt: "2025-06-15T10:30:00.000Z", }); }); - test("creates invoke context with null timeout", () => { - const context = createInvokeContext(null); + test("creates workflow context with null timeout", () => { + const context = createWorkflowContext(null); expect(context).toEqual({ - kind: "invoke", + kind: "workflow", timeoutAt: null, }); }); diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index dd3aae6f..888120ac 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -7,7 +7,7 @@ import { err, ok } from "./result.js"; /** * The kind of step in a workflow. */ -export type StepKind = "function" | "sleep" | "invoke"; +export type StepKind = "function" | "sleep" | "workflow"; /** * Status of a step attempt through its lifecycle. @@ -27,10 +27,10 @@ export interface SleepStepAttemptContext { } /** - * Context for an invoke step attempt. + * Context for a workflow step attempt. */ -export interface InvokeStepAttemptContext { - kind: "invoke"; +export interface WorkflowStepAttemptContext { + kind: "workflow"; timeoutAt: string | null; } @@ -39,7 +39,7 @@ export interface InvokeStepAttemptContext { */ export type StepAttemptContext = | SleepStepAttemptContext - | InvokeStepAttemptContext; + | WorkflowStepAttemptContext; /** * StepAttempt represents a single attempt of a step within a workflow. @@ -159,15 +159,15 @@ export function createSleepContext( } /** - * Create the context object for an invoke step attempt. + * Create the context object for a workflow step attempt. * @param timeoutAt - Parent wait timeout deadline, or null for no timeout - * @returns The context object for an invoke step + * @returns The context object for a workflow step */ -export function createInvokeContext( +export function createWorkflowContext( timeoutAt: Readonly | null, -): InvokeStepAttemptContext { +): WorkflowStepAttemptContext { return { - kind: "invoke" as const, + kind: "workflow" as const, timeoutAt: timeoutAt?.toISOString() ?? null, }; } diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index 79be0a65..01745679 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -1,9 +1,5 @@ import type { DurationString } from "./duration.js"; -import type { - RetryPolicy, - Workflow, - WorkflowSpec, -} from "./workflow-definition.js"; +import type { RetryPolicy, WorkflowSpec } from "./workflow-definition.js"; import type { WorkflowRun } from "./workflow-run.js"; /** @@ -31,41 +27,35 @@ export type StepFunction = () => | undefined; /** - * Target workflow reference for `step.invokeWorkflow`. + * Options for an individual step defined with `step.runWorkflow()`. */ -type InvokeWorkflowTarget = - | WorkflowSpec - | Workflow - | string; - -/** - * Config for invoking a child workflow from `step.invokeWorkflow()`. - */ -export interface InvokeStepConfig< - Input = unknown, - Output = unknown, - RunInput = Input, -> { - workflow: InvokeWorkflowTarget; - input?: RunInput; +export interface StepRunWorkflowOptions { + /** + * Optional durable step name. Defaults to the target workflow name. + */ + name?: string; + /** + * Maximum time to wait for the child workflow to complete. + */ timeout?: number | string | Date; } /** * Represents the API for defining steps within a workflow. Used within a * workflow handler to define steps by calling `step.run()`, `step.sleep()`, - * and `step.invokeWorkflow()`. + * and `step.runWorkflow()`. */ export interface StepApi { run: ( config: Readonly, fn: StepFunction, ) => Promise; - sleep: (name: string, duration: DurationString) => Promise; - invokeWorkflow: ( - name: string, - opts: Readonly>, + runWorkflow: ( + spec: WorkflowSpec, + input?: RunInput, + options?: Readonly, ) => Promise; + sleep: (name: string, duration: DurationString) => Promise; } /** diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts index c47f5dc2..49f880f2 100644 --- a/packages/openworkflow/postgres/backend.test.ts +++ b/packages/openworkflow/postgres/backend.test.ts @@ -271,7 +271,7 @@ describe("BackendPostgres legacy sleeping compatibility", () => { }); }); -describe("BackendPostgres invoke wake-up reconciliation", () => { +describe("BackendPostgres workflow wake-up reconciliation", () => { test("wakes parked parent immediately when child already finished", async () => { const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { namespaceId: randomUUID(), @@ -279,7 +279,7 @@ describe("BackendPostgres invoke wake-up reconciliation", () => { try { const parent = await backend.createWorkflowRun({ - workflowName: "invoke-parent-reconcile", + workflowName: "workflow-parent-reconcile", version: null, idempotencyKey: null, input: null, @@ -301,31 +301,31 @@ describe("BackendPostgres invoke wake-up reconciliation", () => { throw new Error("Expected parent workflow run to be claimed"); } - const invokeAttempt = await backend.createStepAttempt({ + const workflowAttempt = await backend.createStepAttempt({ workflowRunId: parent.id, workerId: parentWorkerId, - stepName: "invoke-child", - kind: "invoke", + stepName: "workflow-child", + kind: "workflow", config: {}, context: null, }); const child = await backend.createWorkflowRun({ - workflowName: "invoke-child-reconcile", + workflowName: "workflow-child-reconcile", version: null, idempotencyKey: null, input: null, config: {}, context: null, - parentStepAttemptNamespaceId: invokeAttempt.namespaceId, - parentStepAttemptId: invokeAttempt.id, + parentStepAttemptNamespaceId: workflowAttempt.namespaceId, + parentStepAttemptId: workflowAttempt.id, availableAt: null, deadlineAt: null, }); await backend.setStepAttemptChildWorkflowRun({ workflowRunId: parent.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, childWorkflowRunNamespaceId: child.namespaceId, childWorkflowRunId: child.id, @@ -367,14 +367,14 @@ describe("BackendPostgres invoke wake-up reconciliation", () => { } }); - test("does not wake parked parent when invoke step is no longer running", async () => { + test("does not wake parked parent when workflow step is no longer running", async () => { const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { namespaceId: randomUUID(), }); try { const parent = await backend.createWorkflowRun({ - workflowName: "invoke-parent-no-wake-after-failed-invoke", + workflowName: "workflow-parent-no-wake-after-failed-workflow", version: null, idempotencyKey: null, input: null, @@ -396,31 +396,31 @@ describe("BackendPostgres invoke wake-up reconciliation", () => { throw new Error("Expected parent workflow run to be claimed"); } - const invokeAttempt = await backend.createStepAttempt({ + const workflowAttempt = await backend.createStepAttempt({ workflowRunId: parent.id, workerId: parentWorkerId, - stepName: "invoke-child", - kind: "invoke", + stepName: "workflow-child", + kind: "workflow", config: {}, context: null, }); const child = await backend.createWorkflowRun({ - workflowName: "invoke-child-no-wake-after-failed-invoke", + workflowName: "workflow-child-no-wake-after-failed-workflow", version: null, idempotencyKey: null, input: null, config: {}, context: null, - parentStepAttemptNamespaceId: invokeAttempt.namespaceId, - parentStepAttemptId: invokeAttempt.id, + parentStepAttemptNamespaceId: workflowAttempt.namespaceId, + parentStepAttemptId: workflowAttempt.id, availableAt: null, deadlineAt: null, }); await backend.setStepAttemptChildWorkflowRun({ workflowRunId: parent.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, childWorkflowRunNamespaceId: child.namespaceId, childWorkflowRunId: child.id, @@ -428,9 +428,9 @@ describe("BackendPostgres invoke wake-up reconciliation", () => { await backend.failStepAttempt({ workflowRunId: parent.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, - error: { message: "invoke failed in parent" }, + error: { message: "workflow failed in parent" }, }); const sleepTarget = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000); diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 6ad220a8..8475ffd4 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -399,21 +399,21 @@ export class BackendPostgres implements Backend { if (!updated) throw new Error("Failed to sleep workflow run"); - const reconciled = await this.reconcileInvokeSleepWakeUp( + const reconciled = await this.reconcileWorkflowSleepWakeUp( params.workflowRunId, ); return reconciled ?? updated; } /** - * Reconcile a just-parked parent run that is waiting on invoke replay. If the + * Reconcile a just-parked parent run that is waiting on workflow replay. If the * child already reached a terminal state before the parent cleared workerId, * the normal child-completion wake-up can be missed. This forces an immediate * wake-up for that case. * @param workflowRunId - Parent workflow run id * @returns Updated run when reconciliation changed availability, otherwise null */ - private async reconcileInvokeSleepWakeUp( + private async reconcileWorkflowSleepWakeUp( workflowRunId: string, ): Promise { const workflowRunsTable = this.workflowRunsTable(); @@ -440,7 +440,7 @@ export class BackendPostgres implements Backend { AND child."id" = sa."child_workflow_run_id" WHERE sa."namespace_id" = wr."namespace_id" AND sa."workflow_run_id" = wr."id" - AND sa."kind" = 'invoke' + AND sa."kind" = 'workflow' AND sa."status" = 'running' AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') ) @@ -627,7 +627,7 @@ export class BackendPostgres implements Backend { FROM ${stepAttemptsTable} sa WHERE sa."namespace_id" = ${childWorkflowRun.parentStepAttemptNamespaceId} AND sa."id" = ${childWorkflowRun.parentStepAttemptId} - AND sa."kind" = 'invoke' + AND sa."kind" = 'workflow' AND sa."status" = 'running' AND sa."child_workflow_run_namespace_id" = ${childWorkflowRun.namespaceId} AND sa."child_workflow_run_id" = ${childWorkflowRun.id} diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts index d998fd93..7826d305 100644 --- a/packages/openworkflow/sqlite/backend.test.ts +++ b/packages/openworkflow/sqlite/backend.test.ts @@ -193,7 +193,7 @@ describe("BackendSqlite.setStepAttemptChildWorkflowRun error handling", () => { workflowRunId: claimed.id, workerId, stepName: randomUUID(), - kind: "invoke", + kind: "workflow", config: {}, context: null, }); @@ -294,7 +294,7 @@ describe("BackendSqlite legacy sleeping compatibility", () => { }); }); -describe("BackendSqlite invoke wake-up reconciliation", () => { +describe("BackendSqlite workflow wake-up reconciliation", () => { test("wakes parked parent immediately when child already finished", async () => { const backend = BackendSqlite.connect(":memory:", { namespaceId: randomUUID(), @@ -302,7 +302,7 @@ describe("BackendSqlite invoke wake-up reconciliation", () => { try { const parent = await backend.createWorkflowRun({ - workflowName: "invoke-parent-reconcile", + workflowName: "workflow-parent-reconcile", version: null, idempotencyKey: null, input: null, @@ -324,31 +324,31 @@ describe("BackendSqlite invoke wake-up reconciliation", () => { throw new Error("Expected parent workflow run to be claimed"); } - const invokeAttempt = await backend.createStepAttempt({ + const workflowAttempt = await backend.createStepAttempt({ workflowRunId: parent.id, workerId: parentWorkerId, - stepName: "invoke-child", - kind: "invoke", + stepName: "workflow-child", + kind: "workflow", config: {}, context: null, }); const child = await backend.createWorkflowRun({ - workflowName: "invoke-child-reconcile", + workflowName: "workflow-child-reconcile", version: null, idempotencyKey: null, input: null, config: {}, context: null, - parentStepAttemptNamespaceId: invokeAttempt.namespaceId, - parentStepAttemptId: invokeAttempt.id, + parentStepAttemptNamespaceId: workflowAttempt.namespaceId, + parentStepAttemptId: workflowAttempt.id, availableAt: null, deadlineAt: null, }); await backend.setStepAttemptChildWorkflowRun({ workflowRunId: parent.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, childWorkflowRunNamespaceId: child.namespaceId, childWorkflowRunId: child.id, @@ -390,14 +390,14 @@ describe("BackendSqlite invoke wake-up reconciliation", () => { } }); - test("does not wake parked parent when invoke step is no longer running", async () => { + test("does not wake parked parent when workflow step is no longer running", async () => { const backend = BackendSqlite.connect(":memory:", { namespaceId: randomUUID(), }); try { const parent = await backend.createWorkflowRun({ - workflowName: "invoke-parent-no-wake-after-failed-invoke", + workflowName: "workflow-parent-no-wake-after-failed-workflow", version: null, idempotencyKey: null, input: null, @@ -419,31 +419,31 @@ describe("BackendSqlite invoke wake-up reconciliation", () => { throw new Error("Expected parent workflow run to be claimed"); } - const invokeAttempt = await backend.createStepAttempt({ + const workflowAttempt = await backend.createStepAttempt({ workflowRunId: parent.id, workerId: parentWorkerId, - stepName: "invoke-child", - kind: "invoke", + stepName: "workflow-child", + kind: "workflow", config: {}, context: null, }); const child = await backend.createWorkflowRun({ - workflowName: "invoke-child-no-wake-after-failed-invoke", + workflowName: "workflow-child-no-wake-after-failed-workflow", version: null, idempotencyKey: null, input: null, config: {}, context: null, - parentStepAttemptNamespaceId: invokeAttempt.namespaceId, - parentStepAttemptId: invokeAttempt.id, + parentStepAttemptNamespaceId: workflowAttempt.namespaceId, + parentStepAttemptId: workflowAttempt.id, availableAt: null, deadlineAt: null, }); await backend.setStepAttemptChildWorkflowRun({ workflowRunId: parent.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, childWorkflowRunNamespaceId: child.namespaceId, childWorkflowRunId: child.id, @@ -451,9 +451,9 @@ describe("BackendSqlite invoke wake-up reconciliation", () => { await backend.failStepAttempt({ workflowRunId: parent.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, - error: { message: "invoke failed in parent" }, + error: { message: "workflow failed in parent" }, }); const sleepTarget = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000); diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 83a566f3..6c2744ed 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -398,20 +398,20 @@ export class BackendSqlite implements Backend { }); if (!updated) throw new Error("Failed to sleep workflow run"); - const reconciled = await this.reconcileInvokeSleepWakeUp( + const reconciled = await this.reconcileWorkflowSleepWakeUp( params.workflowRunId, ); return reconciled ?? updated; } /** - * Reconcile a parked parent run that is waiting on invoke replay. This closes + * Reconcile a parked parent run that is waiting on workflow replay. This closes * the race where a child finished before the parent cleared workerId. * @param workflowRunId - Parent workflow run id * @returns Updated run when reconciliation changed availability, otherwise * null */ - private async reconcileInvokeSleepWakeUp( + private async reconcileWorkflowSleepWakeUp( workflowRunId: string, ): Promise { const currentTime = now(); @@ -435,7 +435,7 @@ export class BackendSqlite implements Backend { AND child."id" = sa."child_workflow_run_id" WHERE sa."namespace_id" = "workflow_runs"."namespace_id" AND sa."workflow_run_id" = "workflow_runs"."id" - AND sa."kind" = 'invoke' + AND sa."kind" = 'workflow' AND sa."status" = 'running' AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') ) @@ -680,7 +680,7 @@ export class BackendSqlite implements Backend { FROM "step_attempts" WHERE "namespace_id" = ? AND "id" = ? - AND "kind" = 'invoke' + AND "kind" = 'workflow' AND "status" = 'running' AND "child_workflow_run_namespace_id" = ? AND "child_workflow_run_id" = ? diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index f24ed84f..68d12798 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -1013,13 +1013,13 @@ export function testBackend(options: TestBackendOptions): void { const parentRun = await createClaimedWorkflowRun(backend); const parentWorkerId = parentRun.workerId ?? ""; - const invokeAttempt = await backend.createStepAttempt({ + const workflowAttempt = await backend.createStepAttempt({ workflowRunId: parentRun.id, workerId: parentWorkerId, stepName: randomUUID(), - kind: "invoke", + kind: "workflow", config: {}, - context: { kind: "invoke", timeoutAt: null }, + context: { kind: "workflow", timeoutAt: null }, }); const childRun = await backend.createWorkflowRun({ @@ -1029,15 +1029,15 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, - parentStepAttemptNamespaceId: invokeAttempt.namespaceId, - parentStepAttemptId: invokeAttempt.id, + parentStepAttemptNamespaceId: workflowAttempt.namespaceId, + parentStepAttemptId: workflowAttempt.id, availableAt: null, deadlineAt: null, }); await backend.setStepAttemptChildWorkflowRun({ workflowRunId: parentRun.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, childWorkflowRunNamespaceId: childRun.namespaceId, childWorkflowRunId: childRun.id, @@ -1099,13 +1099,13 @@ export function testBackend(options: TestBackendOptions): void { } const parentLeaseBeforeChild = parentRun.availableAt.getTime(); - const invokeAttempt = await backend.createStepAttempt({ + const workflowAttempt = await backend.createStepAttempt({ workflowRunId: parentRun.id, workerId: parentWorkerId, stepName: randomUUID(), - kind: "invoke", + kind: "workflow", config: {}, - context: { kind: "invoke", timeoutAt: null }, + context: { kind: "workflow", timeoutAt: null }, }); const childRun = await backend.createWorkflowRun({ @@ -1115,15 +1115,15 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, - parentStepAttemptNamespaceId: invokeAttempt.namespaceId, - parentStepAttemptId: invokeAttempt.id, + parentStepAttemptNamespaceId: workflowAttempt.namespaceId, + parentStepAttemptId: workflowAttempt.id, availableAt: null, deadlineAt: null, }); await backend.setStepAttemptChildWorkflowRun({ workflowRunId: parentRun.id, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: parentWorkerId, childWorkflowRunNamespaceId: childRun.namespaceId, childWorkflowRunId: childRun.id, @@ -1258,13 +1258,13 @@ export function testBackend(options: TestBackendOptions): void { const parentRun = await createClaimedWorkflowRun(backend); const parentWorkerId = parentRun.workerId ?? ""; - const invokeAttempt = await backend.createStepAttempt({ + const workflowAttempt = await backend.createStepAttempt({ workflowRunId: parentRun.id, workerId: parentWorkerId, stepName: randomUUID(), - kind: "invoke", + kind: "workflow", config: {}, - context: { kind: "invoke", timeoutAt: null }, + context: { kind: "workflow", timeoutAt: null }, }); const farFuture = new Date(Date.now() + 5 * 60 * 1000); await backend.sleepWorkflowRun({ @@ -1280,8 +1280,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, - parentStepAttemptNamespaceId: invokeAttempt.namespaceId, - parentStepAttemptId: invokeAttempt.id, + parentStepAttemptNamespaceId: workflowAttempt.namespaceId, + parentStepAttemptId: workflowAttempt.id, availableAt: null, deadlineAt: null, }); @@ -1427,7 +1427,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: parentRun.id, workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion stepName: randomUUID(), - kind: "invoke", + kind: "workflow", config: {}, context: null, }); @@ -1468,7 +1468,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: parentRun.id, workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion stepName: randomUUID(), - kind: "invoke", + kind: "workflow", config: {}, context: null, }); @@ -1502,7 +1502,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: parentRun.id, workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion stepName: randomUUID(), - kind: "invoke", + kind: "workflow", config: {}, context: null, }); diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index a640ccca..f63c678a 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -15,6 +15,7 @@ import { } from "./execution.js"; import { randomUUID } from "node:crypto"; import { afterEach, describe, test, expect, vi } from "vitest"; +import { z } from "zod"; const backendsToStop: BackendPostgres[] = []; @@ -357,23 +358,22 @@ describe("StepExecutor", () => { expect(sleepStepNames).toEqual(["pause", "pause:1"]); }); - test("invokes a child workflow and returns child output", async () => { + test("runs a child workflow and returns child output", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-success-${randomUUID()}` }, + { name: `workflow-child-success-${randomUUID()}` }, ({ input }: { input: { value: number } }) => { return input.value + 1; }, ); const parent = client.defineWorkflow<{ value: number }, number>( - { name: `invoke-parent-success-${randomUUID()}` }, + { name: `workflow-parent-success-${randomUUID()}` }, async ({ input, step }) => { - const childResult = await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - input: { value: input.value }, + const childResult = await step.runWorkflow(child.workflow.spec, { + value: input.value, }); return childResult * 2; }, @@ -391,25 +391,31 @@ describe("StepExecutor", () => { expect(status).toBe("completed"); await expect(handle.result()).resolves.toBe(12); + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const workflowAttempt = attempts.data.find( + (stepAttempt) => stepAttempt.kind === "workflow", + ); + expect(workflowAttempt?.stepName).toBe(child.workflow.spec.name); }); - test("wakes parent invoke wait when child completes before parent parks", async () => { + test("wakes parent runWorkflow wait when child completes before parent parks", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-race-${randomUUID()}` }, + { name: `workflow-child-race-${randomUUID()}` }, () => { return { ok: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-race-${randomUUID()}` }, + { name: `workflow-parent-race-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -440,23 +446,21 @@ describe("StepExecutor", () => { } }); - test("completes parent immediately when invoked child already finished", async () => { + test("completes parent immediately when child workflow already finished", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-early-finish-${randomUUID()}` }, + { name: `workflow-child-early-finish-${randomUUID()}` }, () => { return { ignored: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-early-finish-${randomUUID()}` }, + { name: `workflow-parent-early-finish-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -520,24 +524,25 @@ describe("StepExecutor", () => { await expect(handle.result()).resolves.toEqual({ fast: true }); }); - test("supports invoke step string name options", async () => { + test("supports explicit runWorkflow options.name override", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-step-shape-${randomUUID()}` }, + { name: `workflow-child-step-shape-${randomUUID()}` }, ({ input }: { input: { value: number } }) => { return input.value + 1; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-step-shape-${randomUUID()}` }, + { name: `workflow-parent-step-shape-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - input: { value: 9 }, - }); + return await step.runWorkflow( + child.workflow.spec, + { value: 9 }, + { name: "workflow-child" }, + ); }, ); @@ -555,26 +560,27 @@ describe("StepExecutor", () => { await expect(handle.result()).resolves.toBe(10); }); - test("applies collision indexing across step.run and step.invokeWorkflow", async () => { + test("applies collision indexing across step.run and step.runWorkflow", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-cross-type-${randomUUID()}` }, + { name: `workflow-child-cross-type-${randomUUID()}` }, ({ input }: { input: { value: number } }) => { return input.value + 1; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-cross-type-${randomUUID()}` }, + { name: `workflow-parent-cross-type-${randomUUID()}` }, async ({ step }) => { const local = await step.run({ name: "shared-name" }, () => 41); - const invoked = await step.invokeWorkflow("shared-name", { - workflow: child.workflow, - input: { value: 1 }, - }); - return { local, invoked }; + const childWorkflowResult = await step.runWorkflow( + child.workflow.spec, + { value: 1 }, + { name: "shared-name" }, + ); + return { local, childWorkflowResult }; }, ); @@ -589,7 +595,10 @@ describe("StepExecutor", () => { ); expect(status).toBe("completed"); - await expect(handle.result()).resolves.toEqual({ local: 41, invoked: 2 }); + await expect(handle.result()).resolves.toEqual({ + local: 41, + childWorkflowResult: 2, + }); const steps = await backend.listStepAttempts({ workflowRunId: handle.workflowRun.id, @@ -609,43 +618,43 @@ describe("StepExecutor", () => { [...kindByStepName.keys()].toSorted((a, b) => a.localeCompare(b)), ).toEqual(["shared-name", "shared-name:1"]); expect(kindByStepName.get("shared-name")).toBe("function"); - expect(kindByStepName.get("shared-name:1")).toBe("invoke"); + expect(kindByStepName.get("shared-name:1")).toBe("workflow"); }); - test("supports workflow-name targets, date/number timeouts, and auto-indexed duplicate invoke names", async () => { + test("supports workflow spec targets, date/number timeouts, and auto-indexed duplicate workflow names", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-timeout-shapes-${randomUUID()}` }, + { name: `workflow-child-timeout-shapes-${randomUUID()}` }, ({ input }: { input: { value: number } }) => { return input.value + 1; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-timeout-shapes-${randomUUID()}` }, + { name: `workflow-parent-timeout-shapes-${randomUUID()}` }, async ({ step }) => { - const first = await step.invokeWorkflow("invoke-cached", { - workflow: child.workflow.spec.name, - input: { value: 4 }, - timeout: new Date(Date.now() + 60_000), - }); - const second = await step.invokeWorkflow("invoke-cached", { - workflow: child.workflow.spec.name, - input: { value: 99 }, - timeout: 60_000, - }); - const numeric = await step.invokeWorkflow("invoke-number-timeout", { - workflow: child.workflow.spec.name, - input: { value: 8 }, - timeout: 60_000, - }); - const spec = await step.invokeWorkflow("invoke-spec-target", { - workflow: { name: child.workflow.spec.name }, - input: { value: 1 }, - timeout: 60_000, - }); + const first = await step.runWorkflow( + child.workflow.spec, + { value: 4 }, + { timeout: new Date(Date.now() + 60_000) }, + ); + const second = await step.runWorkflow( + child.workflow.spec, + { value: 99 }, + { timeout: 60_000 }, + ); + const numeric = await step.runWorkflow( + child.workflow.spec, + { value: 8 }, + { timeout: 60_000 }, + ); + const spec = await step.runWorkflow( + { name: child.workflow.spec.name }, + { value: 1 }, + { timeout: 60_000 }, + ); return { first, second, numeric, spec }; }, ); @@ -672,29 +681,30 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - const invokeStepNames = steps.data - .filter((stepAttempt) => stepAttempt.kind === "invoke") + const workflowStepNames = steps.data + .filter((stepAttempt) => stepAttempt.kind === "workflow") .map((stepAttempt) => stepAttempt.stepName) .toSorted((a, b) => a.localeCompare(b)); - expect(invokeStepNames).toEqual([ - "invoke-cached", - "invoke-cached:1", - "invoke-number-timeout", - "invoke-spec-target", + expect(workflowStepNames).toEqual([ + child.workflow.spec.name, + `${child.workflow.spec.name}:1`, + `${child.workflow.spec.name}:2`, + `${child.workflow.spec.name}:3`, ]); }); - test("fails invoke when timeout number is invalid", async () => { + test("fails workflow when timeout number is invalid", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const parent = client.defineWorkflow( - { name: `invoke-parent-invalid-timeout-number-${randomUUID()}` }, + { name: `workflow-parent-invalid-timeout-number-${randomUUID()}` }, async ({ step }) => { - await step.invokeWorkflow("invoke-child", { - workflow: `invoke-child-invalid-timeout-number-${randomUUID()}`, - timeout: -1, - }); + await step.runWorkflow( + { name: `workflow-child-invalid-timeout-number-${randomUUID()}` }, + undefined, + { timeout: -1 }, + ); return "never"; }, ); @@ -711,21 +721,24 @@ describe("StepExecutor", () => { expect(status).toBe("failed"); await expect(handle.result()).rejects.toThrow( - /Invoke timeout must be a non-negative number/, + /Workflow timeout must be a non-negative number/, ); }); - test("fails invoke when timeout duration string is invalid", async () => { + test("fails workflow when timeout duration string is invalid", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const parent = client.defineWorkflow( - { name: `invoke-parent-invalid-timeout-duration-${randomUUID()}` }, + { name: `workflow-parent-invalid-timeout-duration-${randomUUID()}` }, async ({ step }) => { - await step.invokeWorkflow("invoke-child", { - workflow: `invoke-child-invalid-timeout-duration-${randomUUID()}`, - timeout: "not-a-duration" as DurationString, - }); + await step.runWorkflow( + { name: `workflow-child-invalid-timeout-duration-${randomUUID()}` }, + undefined, + { + timeout: "not-a-duration" as DurationString, + }, + ); return "never"; }, ); @@ -744,23 +757,65 @@ describe("StepExecutor", () => { await expect(handle.result()).rejects.toThrow(/not-a-duration/); }); - test("handles invoke replay with non-invoke context shape", async () => { + test("validates child input before creating child run", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-context-null-${randomUUID()}` }, + { + name: `workflow-child-schema-${randomUUID()}`, + schema: z.object({ email: z.email() }), + }, + ({ input }: { input: { email: string } }) => { + return input.email; + }, + ); + + const parent = client.defineWorkflow( + { name: `workflow-parent-schema-${randomUUID()}` }, + async ({ step }) => { + await step.runWorkflow(child.workflow.spec, { + email: "not-an-email", + }); + return "never"; + }, + ); + + const worker = client.newWorker(); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 150, + 10, + ); + + expect(status).toBe("failed"); + await expect(handle.result()).rejects.toThrow(); + + const runs = await backend.listWorkflowRuns({ limit: 100 }); + const childRuns = runs.data.filter( + (run) => run.parentStepAttemptId !== null, + ); + expect(childRuns).toHaveLength(0); + }); + + test("handles runWorkflow replay with non-workflow context shape", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `workflow-child-context-null-${randomUUID()}` }, () => { return { ok: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-context-null-${randomUUID()}` }, + { name: `workflow-parent-context-null-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -794,23 +849,21 @@ describe("StepExecutor", () => { } }); - test("handles invoke replay with legacy null timeout context", async () => { + test("handles runWorkflow replay with legacy null timeout context", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-legacy-timeout-${randomUUID()}` }, + { name: `workflow-child-legacy-timeout-${randomUUID()}` }, () => { return { ok: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-legacy-timeout-${randomUUID()}` }, + { name: `workflow-parent-legacy-timeout-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -822,7 +875,7 @@ describe("StepExecutor", () => { const linked = await originalSetStepAttemptChildWorkflowRun(params); return { ...linked, - context: { kind: "invoke", timeoutAt: null }, + context: { kind: "workflow", timeoutAt: null }, }; }); @@ -844,23 +897,21 @@ describe("StepExecutor", () => { } }); - test("handles invoke replay with invalid timeout timestamp context", async () => { + test("handles runWorkflow replay with invalid timeout timestamp context", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-invalid-timeout-context-${randomUUID()}` }, + { name: `workflow-child-invalid-timeout-context-${randomUUID()}` }, () => { return { ok: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-invalid-timeout-context-${randomUUID()}` }, + { name: `workflow-parent-invalid-timeout-context-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -874,7 +925,7 @@ describe("StepExecutor", () => { const linked = await originalSetStepAttemptChildWorkflowRun(params); return { ...linked, - context: { kind: "invoke", timeoutAt: "not-a-date" }, + context: { kind: "workflow", timeoutAt: "not-a-date" }, }; }); @@ -928,23 +979,21 @@ describe("StepExecutor", () => { } }); - test("fails invoke when child linkage is missing run id", async () => { + test("fails workflow when child linkage is missing run id", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-link-missing-id-${randomUUID()}` }, + { name: `workflow-child-link-missing-id-${randomUUID()}` }, () => { return { ok: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-link-missing-id-${randomUUID()}` }, + { name: `workflow-parent-link-missing-id-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -993,23 +1042,21 @@ describe("StepExecutor", () => { } }); - test("fails invoke when linked child run cannot be loaded", async () => { + test("fails workflow when linked child run cannot be loaded", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-not-found-${randomUUID()}` }, + { name: `workflow-child-not-found-${randomUUID()}` }, () => { return { ok: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-child-not-found-${randomUUID()}` }, + { name: `workflow-parent-child-not-found-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -1071,18 +1118,16 @@ describe("StepExecutor", () => { const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-failed-null-error-${randomUUID()}` }, + { name: `workflow-child-failed-null-error-${randomUUID()}` }, () => { return { ok: true }; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-failed-null-error-${randomUUID()}` }, + { name: `workflow-parent-failed-null-error-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -1146,12 +1191,12 @@ describe("StepExecutor", () => { } }); - test("surfaces canceled child workflow through parent invoke step", async () => { + test("surfaces canceled child workflow through parent workflow step", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-canceled-${randomUUID()}` }, + { name: `workflow-child-canceled-${randomUUID()}` }, async ({ step }) => { await step.sleep("wait", "500ms"); return { ok: true }; @@ -1159,11 +1204,9 @@ describe("StepExecutor", () => { ); const parent = client.defineWorkflow( - { name: `invoke-parent-canceled-${randomUUID()}` }, + { name: `workflow-parent-canceled-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -1196,12 +1239,12 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - const invokeAttempt = attempts.data.find( - (stepAttempt) => stepAttempt.stepName === "invoke-child", + const workflowAttempt = attempts.data.find( + (stepAttempt) => stepAttempt.stepName === child.workflow.spec.name, ); - const childRunId = invokeAttempt?.childWorkflowRunId; + const childRunId = workflowAttempt?.childWorkflowRunId; if (!childRunId) { - throw new Error("Expected invoke attempt child workflow run id"); + throw new Error("Expected workflow attempt child workflow run id"); } await backend.cancelWorkflowRun({ @@ -1233,42 +1276,12 @@ describe("StepExecutor", () => { await expect(handle.result()).rejects.toThrow(/was canceled/); }); - test("fails invoke when workflow target string is empty", async () => { - const backend = await createBackend(); - const client = new OpenWorkflow({ backend }); - - const parent = client.defineWorkflow( - { name: `invoke-parent-empty-target-${randomUUID()}` }, - async ({ step }) => { - await step.invokeWorkflow("invoke-child", { - workflow: "", - }); - return "never"; - }, - ); - - const worker = client.newWorker(); - const handle = await parent.run(); - const status = await tickUntilTerminal( - backend, - worker, - handle.workflowRun.id, - 150, - 10, - ); - - expect(status).toBe("failed"); - await expect(handle.result()).rejects.toThrow( - /Invoke workflow target must be a non-empty string/, - ); - }); - - test("surfaces child failure through parent invoke step", async () => { + test("surfaces child failure through parent workflow step", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-failure-${randomUUID()}` }, + { name: `workflow-child-failure-${randomUUID()}` }, async ({ step }) => { await step.run( { name: "fail", retryPolicy: { maximumAttempts: 1 } }, @@ -1281,12 +1294,9 @@ describe("StepExecutor", () => { ); const parent = client.defineWorkflow( - { name: `invoke-parent-failure-${randomUUID()}` }, + { name: `workflow-parent-failure-${randomUUID()}` }, async ({ step }) => { - await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - input: null, - }); + await step.runWorkflow(child.workflow.spec, null); return "never"; }, ); @@ -1305,12 +1315,12 @@ describe("StepExecutor", () => { await expect(handle.result()).rejects.toThrow(/child boom/); }); - test("invoke timeout fails parent wait but child continues and completes", async () => { + test("workflow timeout fails parent wait but child continues and completes", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-timeout-${randomUUID()}` }, + { name: `workflow-child-timeout-${randomUUID()}` }, async ({ step }) => { await step.sleep("wait", "600ms"); return { ok: true }; @@ -1318,10 +1328,9 @@ describe("StepExecutor", () => { ); const parent = client.defineWorkflow( - { name: `invoke-parent-timeout-${randomUUID()}` }, + { name: `workflow-parent-timeout-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, + return await step.runWorkflow(child.workflow.spec, undefined, { timeout: "100ms", }); }, @@ -1338,29 +1347,29 @@ describe("StepExecutor", () => { ); expect(parentStatus).toBe("failed"); await expect(handle.result()).rejects.toThrow( - /Timed out waiting for invoked workflow to complete/, + /Timed out waiting for child workflow to complete/, ); const steps = await backend.listStepAttempts({ workflowRunId: handle.workflowRun.id, limit: 100, }); - const invokeAttempt = steps.data.find( - (step) => step.stepName === "invoke-child", + const workflowAttempt = steps.data.find( + (step) => step.stepName === child.workflow.spec.name, ); - expect(invokeAttempt?.childWorkflowRunId).not.toBeNull(); - expect(invokeAttempt?.childWorkflowRunId).toHaveLength(36); + expect(workflowAttempt?.childWorkflowRunId).not.toBeNull(); + expect(workflowAttempt?.childWorkflowRunId).toHaveLength(36); - const childRunId = invokeAttempt?.childWorkflowRunId; + const childRunId = workflowAttempt?.childWorkflowRunId; if (!childRunId) { - throw new Error("Expected invoke attempt child workflow run id"); + throw new Error("Expected workflow attempt child workflow run id"); } const runs = await backend.listWorkflowRuns({ limit: 100 }); - const childrenForInvokeAttempt = runs.data.filter( - (run) => run.parentStepAttemptId === invokeAttempt.id, + const childrenForWorkflowAttempt = runs.data.filter( + (run) => run.parentStepAttemptId === workflowAttempt.id, ); - expect(childrenForInvokeAttempt).toHaveLength(1); + expect(childrenForWorkflowAttempt).toHaveLength(1); const childStatus = await tickUntilStatus( backend, @@ -1373,12 +1382,12 @@ describe("StepExecutor", () => { expect(childStatus).toBe("completed"); }); - test("invoke timeout still fails when child finishes after timeout before parent replay", async () => { + test("workflow timeout still fails when child finishes after timeout before parent replay", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-timeout-order-${randomUUID()}` }, + { name: `workflow-child-timeout-order-${randomUUID()}` }, async ({ step }) => { await step.sleep("wait", "600ms"); return { ok: true }; @@ -1386,10 +1395,9 @@ describe("StepExecutor", () => { ); const parent = client.defineWorkflow( - { name: `invoke-parent-timeout-order-${randomUUID()}` }, + { name: `workflow-parent-timeout-order-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, + return await step.runWorkflow(child.workflow.spec, undefined, { timeout: "100ms", }); }, @@ -1419,12 +1427,12 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - const invokeAttempt = steps.data.find( - (step) => step.stepName === "invoke-child", + const workflowAttempt = steps.data.find( + (step) => step.stepName === child.workflow.spec.name, ); - const childRunId = invokeAttempt?.childWorkflowRunId; + const childRunId = workflowAttempt?.childWorkflowRunId; if (!childRunId) { - throw new Error("Expected invoke attempt child workflow run id"); + throw new Error("Expected workflow attempt child workflow run id"); } const parentAfterFirstPass = await backend.getWorkflowRun({ @@ -1473,16 +1481,16 @@ describe("StepExecutor", () => { }); expect(parentAfterReplay?.status).toBe("failed"); await expect(handle.result()).rejects.toThrow( - /Timed out waiting for invoked workflow to complete/, + /Timed out waiting for child workflow to complete/, ); }); - test("invoke wait parks until timeout and does not use poll-loop wake-up events", async () => { + test("runWorkflow wait parks until timeout and does not use poll-loop wake-up events", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-parked-${randomUUID()}` }, + { name: `workflow-child-parked-${randomUUID()}` }, async ({ step }) => { await step.sleep("wait", "200ms"); return { ok: true }; @@ -1490,11 +1498,9 @@ describe("StepExecutor", () => { ); const parent = client.defineWorkflow( - { name: `invoke-parent-parked-${randomUUID()}` }, + { name: `workflow-parent-parked-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -1525,28 +1531,26 @@ describe("StepExecutor", () => { await expect(handle.result()).resolves.toEqual({ ok: true }); }); - test("supports parallel invokes via Promise.all", async () => { + test("supports parallel workflows via Promise.all", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-parallel-${randomUUID()}` }, + { name: `workflow-child-parallel-${randomUUID()}` }, ({ input }: { input: { value: number } }) => { return input.value * 2; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-parallel-${randomUUID()}` }, + { name: `workflow-parent-parallel-${randomUUID()}` }, async ({ step }) => { const [a, b] = await Promise.all([ - step.invokeWorkflow("invoke-a", { - workflow: child.workflow, - input: { value: 2 }, + step.runWorkflow(child.workflow.spec, { + value: 2, }), - step.invokeWorkflow("invoke-b", { - workflow: child.workflow, - input: { value: 3 }, + step.runWorkflow(child.workflow.spec, { + value: 3, }), ]); return a + b; @@ -1567,28 +1571,26 @@ describe("StepExecutor", () => { await expect(handle.result()).resolves.toBe(10); }); - test("auto-indexes duplicate invoke names in parallel Promise.all", async () => { + test("auto-indexes duplicate workflow names in parallel Promise.all", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-parallel-duplicate-${randomUUID()}` }, + { name: `workflow-child-parallel-duplicate-${randomUUID()}` }, ({ input }: { input: { value: number } }) => { return input.value * 3; }, ); const parent = client.defineWorkflow( - { name: `invoke-parent-parallel-duplicate-${randomUUID()}` }, + { name: `workflow-parent-parallel-duplicate-${randomUUID()}` }, async ({ step }) => { const [first, second] = await Promise.all([ - step.invokeWorkflow("invoke-same", { - workflow: child.workflow, - input: { value: 2 }, + step.runWorkflow(child.workflow.spec, { + value: 2, }), - step.invokeWorkflow("invoke-same", { - workflow: child.workflow, - input: { value: 3 }, + step.runWorkflow(child.workflow.spec, { + value: 3, }), ]); return { first, second }; @@ -1612,15 +1614,18 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - const invokeStepNames = steps.data + const workflowStepNames = steps.data .filter( (stepAttempt) => - stepAttempt.kind === "invoke" && - stepAttempt.stepName.startsWith("invoke-same"), + stepAttempt.kind === "workflow" && + stepAttempt.stepName.startsWith(child.workflow.spec.name), ) .map((stepAttempt) => stepAttempt.stepName) .toSorted((a, b) => a.localeCompare(b)); - expect(invokeStepNames).toEqual(["invoke-same", "invoke-same:1"]); + expect(workflowStepNames).toEqual([ + child.workflow.spec.name, + `${child.workflow.spec.name}:1`, + ]); }); test("does not create duplicate child runs while waiting across replays", async () => { @@ -1628,7 +1633,7 @@ describe("StepExecutor", () => { const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-replay-${randomUUID()}` }, + { name: `workflow-child-replay-${randomUUID()}` }, async ({ step }) => { await step.sleep("wait", "600ms"); return { ok: true }; @@ -1636,11 +1641,9 @@ describe("StepExecutor", () => { ); const parent = client.defineWorkflow( - { name: `invoke-parent-replay-${randomUUID()}` }, + { name: `workflow-parent-replay-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -1661,18 +1664,20 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - const invokeAttempt = steps.data.find( - (step) => step.stepName === "invoke-child", + const workflowAttempt = steps.data.find( + (step) => step.stepName === child.workflow.spec.name, ); - if (!invokeAttempt) { - throw new Error("Expected invoke attempt for step invoke-child"); + if (!workflowAttempt) { + throw new Error( + `Expected workflow attempt for step ${child.workflow.spec.name}`, + ); } const runs = await backend.listWorkflowRuns({ limit: 100 }); - const childrenForInvokeAttempt = runs.data.filter( - (run) => run.parentStepAttemptId === invokeAttempt.id, + const childrenForWorkflowAttempt = runs.data.filter( + (run) => run.parentStepAttemptId === workflowAttempt.id, ); - expect(childrenForInvokeAttempt).toHaveLength(1); + expect(childrenForWorkflowAttempt).toHaveLength(1); }); test("canceling parent while waiting does not cancel child workflow", async () => { @@ -1680,7 +1685,7 @@ describe("StepExecutor", () => { const client = new OpenWorkflow({ backend }); const child = client.defineWorkflow( - { name: `invoke-child-cancel-${randomUUID()}` }, + { name: `workflow-child-cancel-${randomUUID()}` }, async ({ step }) => { await step.sleep("wait", "600ms"); return { childDone: true }; @@ -1688,11 +1693,9 @@ describe("StepExecutor", () => { ); const parent = client.defineWorkflow( - { name: `invoke-parent-cancel-${randomUUID()}` }, + { name: `workflow-parent-cancel-${randomUUID()}` }, async ({ step }) => { - return await step.invokeWorkflow("invoke-child", { - workflow: child.workflow, - }); + return await step.runWorkflow(child.workflow.spec); }, ); @@ -1704,12 +1707,12 @@ describe("StepExecutor", () => { workflowRunId: handle.workflowRun.id, limit: 100, }); - const invokeAttempt = steps.data.find( - (step) => step.stepName === "invoke-child", + const workflowAttempt = steps.data.find( + (step) => step.stepName === child.workflow.spec.name, ); - const childRunId = invokeAttempt?.childWorkflowRunId; + const childRunId = workflowAttempt?.childWorkflowRunId; if (!childRunId) { - throw new Error("Expected invoke attempt child workflow run id"); + throw new Error("Expected workflow attempt child workflow run id"); } await handle.cancel(); diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index b9c2ebac..d3f60474 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -9,24 +9,27 @@ import { normalizeStepOutput, calculateDateFromDuration, createSleepContext, - createInvokeContext, + createWorkflowContext, } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate, DEFAULT_WORKFLOW_RETRY_POLICY, type RetryPolicy, - type Workflow, type WorkflowSpec, } from "../core/workflow-definition.js"; import type { - InvokeStepConfig, + StepRunWorkflowOptions, StepApi, StepFunction, StepFunctionConfig, WorkflowFunction, WorkflowRunMetadata, } from "../core/workflow-function.js"; -import { isTerminalStatus, type WorkflowRun } from "../core/workflow-run.js"; +import { + isTerminalStatus, + validateInput, + type WorkflowRun, +} from "../core/workflow-run.js"; /** * Signal thrown when a workflow needs to sleep. Contains the time when the @@ -78,10 +81,10 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { }; /** - * Retry policy for invoke step failures (no retries - the child workflow + * Retry policy for workflow step failures (no retries - the child workflow * is responsible for retries). */ -const INVOKE_FAILURE_RETRY_POLICY: RetryPolicy = { +const WORKFLOW_STEP_FAILURE_RETRY_POLICY: RetryPolicy = { ...DEFAULT_STEP_RETRY_POLICY, maximumAttempts: 1, }; @@ -187,16 +190,16 @@ export function createStepExecutionStateFromAttempts( } /** - * Resolve invoke timeout input to an absolute deadline. + * Resolve workflow timeout input to an absolute deadline. * @param timeout - Relative/absolute timeout input * @returns Absolute timeout deadline * @throws {Error} When timeout is invalid */ -function resolveInvokeTimeoutAt( +function resolveWorkflowTimeoutAt( timeout: number | string | Date | undefined, ): Date { if (timeout === undefined) { - return defaultInvokeTimeoutAt(); + return defaultWorkflowTimeoutAt(); } if (timeout instanceof Date) { @@ -205,7 +208,7 @@ function resolveInvokeTimeoutAt( if (typeof timeout === "number") { if (!Number.isFinite(timeout) || timeout < 0) { - throw new Error("Invoke timeout must be a non-negative number"); + throw new Error("Workflow timeout must be a non-negative number"); } return new Date(Date.now() + timeout); } @@ -218,45 +221,45 @@ function resolveInvokeTimeoutAt( } /** - * Default invoke timeout: 7 days from a base time. + * Default workflow timeout: 7 days from a base time. * @param base - Base timestamp (defaults to now) * @returns Timeout deadline */ -function defaultInvokeTimeoutAt(base: Readonly = new Date()): Date { +function defaultWorkflowTimeoutAt(base: Readonly = new Date()): Date { const timeoutAt = new Date(base); timeoutAt.setDate(timeoutAt.getDate() + 7); return timeoutAt; } /** - * Extract the invoke timeout from a persisted step attempt's context. - * @param attempt - Running invoke step attempt - * @returns Timeout deadline, or null when context is not invoke + * Extract the workflow timeout from a persisted step attempt's context. + * @param attempt - Running workflow step attempt + * @returns Timeout deadline, or null when context is not workflow */ -function getInvokeTimeoutAt(attempt: Readonly): Date | null { - if (attempt.context?.kind !== "invoke") { +function getWorkflowTimeoutAt(attempt: Readonly): Date | null { + if (attempt.context?.kind !== "workflow") { return null; } if (attempt.context.timeoutAt === null) { - // Backward compatibility for previously persisted invoke contexts. - return defaultInvokeTimeoutAt(attempt.createdAt); + // Backward compatibility for previously persisted workflow contexts. + return defaultWorkflowTimeoutAt(attempt.createdAt); } return new Date(attempt.context.timeoutAt); } /** - * Determine whether the invoke timeout has elapsed before the child completed. - * @param attempt - Running invoke step attempt + * Determine whether the workflow timeout has elapsed before the child completed. + * @param attempt - Running workflow step attempt * @param childRun - Linked child workflow run * @returns True when timeout elapsed before child terminal completion */ -function hasInvokeTimedOut( +function hasWorkflowTimedOut( attempt: Readonly, childRun: Readonly, ): boolean { - const timeoutAt = getInvokeTimeoutAt(attempt); + const timeoutAt = getWorkflowTimeoutAt(attempt); if (!timeoutAt) return false; const timeoutMs = timeoutAt.getTime(); @@ -270,31 +273,13 @@ function hasInvokeTimedOut( return true; } -/** - * Normalize a workflow target (string | WorkflowSpec | Workflow) to a - * WorkflowSpec. - * @param workflow - Workflow target reference - * @returns WorkflowSpec for child run creation - */ -function toWorkflowSpec( - workflow: - | WorkflowSpec - | Workflow - | string, -): WorkflowSpec { - if (typeof workflow === "string") { - return { name: workflow }; - } - return "spec" in workflow ? workflow.spec : workflow; -} - /** * Build deterministic idempotency key for child workflow invocation. - * @param attempt - Parent invoke step attempt + * @param attempt - Parent workflow step attempt * @returns Stable idempotency key */ -function buildInvokeIdempotencyKey(attempt: Readonly): string { - return `__invoke:${attempt.namespaceId}:${attempt.id}`; +function buildWorkflowIdempotencyKey(attempt: Readonly): string { + return `__workflow:${attempt.namespaceId}:${attempt.id}`; } /** @@ -308,6 +293,16 @@ export interface StepExecutorOptions { stepLimit?: number; } +interface RunWorkflowStepRequest< + Input = unknown, + Output = unknown, + RunInput = Input, +> { + workflowSpec: WorkflowSpec; + input: RunInput | undefined; + timeout: number | string | Date | undefined; +} + /** * Replays prior step attempts and persists new ones while memoizing * deterministic step outputs. @@ -458,34 +453,45 @@ class StepExecutor implements StepApi { throw new SleepSignal(resumeAt); } - // ---- step.invokeWorkflow ----------------------------------------------- + // ---- step.runWorkflow ----------------------------------------------- - async invokeWorkflow( - baseStepName: string, - opts: Readonly>, + async runWorkflow( + spec: WorkflowSpec, + input?: RunInput, + options?: Readonly, ): Promise { - const stepName = this.resolveStepName(baseStepName); + const stepName = this.resolveStepName(options?.name ?? spec.name); + const request: RunWorkflowStepRequest = { + workflowSpec: spec, + input, + timeout: options?.timeout, + }; + const existingAttempt = getCachedStepAttempt(this.cache, stepName); if (existingAttempt) { return existingAttempt.output as Output; } - // Resume a running invoke attempt (replay path) + // Resume a running workflow attempt (replay path) const runningAttempt = this.runningByStepName.get(stepName); - if (runningAttempt?.kind === "invoke") { - return await this.resolveRunningInvoke(stepName, runningAttempt, opts); + if (runningAttempt?.kind === "workflow") { + return await this.resolveRunningWorkflow( + stepName, + runningAttempt, + request, + ); } - // First encounter — create the invoke step and child workflow run - const timeoutAt = resolveInvokeTimeoutAt(opts.timeout); + // First encounter — create the workflow step and child workflow run + const timeoutAt = resolveWorkflowTimeoutAt(request.timeout); this.ensureStepLimitNotReached(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, stepName, - kind: "invoke", + kind: "workflow", config: {}, - context: createInvokeContext(timeoutAt), + context: createWorkflowContext(timeoutAt), }); this.stepCount += 1; this.runningByStepName.set(stepName, attempt); @@ -493,50 +499,50 @@ class StepExecutor implements StepApi { const linkedAttempt = await this.linkChildWorkflowRun( stepName, attempt, - opts, + request, ).catch( async (error: unknown) => await this.failStepWithError( stepName, attempt.id, error, - INVOKE_FAILURE_RETRY_POLICY, + WORKFLOW_STEP_FAILURE_RETRY_POLICY, ), ); - return await this.resolveRunningInvoke(stepName, linkedAttempt, opts); + return await this.resolveRunningWorkflow(stepName, linkedAttempt, request); } /** - * Resolve a running invoke attempt — check child status and either complete, + * Resolve a running workflow attempt — check child status and either complete, * fail, or go back to sleep. - * @param stepName - Invoke step name - * @param runningAttempt - Previously created invoke step attempt - * @param opts - Invoke step configuration + * @param stepName - Workflow step name + * @param runningAttempt - Previously created workflow step attempt + * @param request - Workflow step request * @returns The child workflow output when available */ - private async resolveRunningInvoke( + private async resolveRunningWorkflow( stepName: string, runningAttempt: Readonly, - opts: Readonly>, + request: Readonly>, ): Promise { - // Ensure the invoke attempt has a linked child (may need to create one if + // Ensure the workflow attempt has a linked child (may need to create one if // a previous attempt crashed before linking) - const invokeAttempt = + const workflowAttempt = runningAttempt.childWorkflowRunId && runningAttempt.childWorkflowRunNamespaceId ? runningAttempt - : await this.linkChildWorkflowRun(stepName, runningAttempt, opts); + : await this.linkChildWorkflowRun(stepName, runningAttempt, request); - const childId = invokeAttempt.childWorkflowRunId; + const childId = workflowAttempt.childWorkflowRunId; if (!childId) { return await this.failStepWithError( stepName, - invokeAttempt.id, + workflowAttempt.id, new Error( - `Invoke step "${stepName}" could not find linked child workflow run`, + `Workflow step "${stepName}" could not find linked child workflow run`, ), - INVOKE_FAILURE_RETRY_POLICY, + WORKFLOW_STEP_FAILURE_RETRY_POLICY, ); } @@ -546,21 +552,21 @@ class StepExecutor implements StepApi { if (!childRun) { return await this.failStepWithError( stepName, - invokeAttempt.id, + workflowAttempt.id, new Error( - `Invoke step "${stepName}" could not find linked child workflow run "${childId}"`, + `Workflow step "${stepName}" could not find linked child workflow run "${childId}"`, ), - INVOKE_FAILURE_RETRY_POLICY, + WORKFLOW_STEP_FAILURE_RETRY_POLICY, ); } // Check timeout before checking child result - if (hasInvokeTimedOut(invokeAttempt, childRun)) { + if (hasWorkflowTimedOut(workflowAttempt, childRun)) { return await this.failStepWithError( stepName, - invokeAttempt.id, - new Error("Timed out waiting for invoked workflow to complete"), - INVOKE_FAILURE_RETRY_POLICY, + workflowAttempt.id, + new Error("Timed out waiting for child workflow to complete"), + WORKFLOW_STEP_FAILURE_RETRY_POLICY, ); } @@ -568,7 +574,7 @@ class StepExecutor implements StepApi { if (childRun.status === "completed" || childRun.status === "succeeded") { const completed = await this.backend.completeStepAttempt({ workflowRunId: this.workflowRunId, - stepAttemptId: invokeAttempt.id, + stepAttemptId: workflowAttempt.id, workerId: this.workerId, output: childRun.output, }); @@ -585,9 +591,9 @@ class StepExecutor implements StepApi { : deserializeError(childRun.error); return await this.failStepWithError( stepName, - invokeAttempt.id, + workflowAttempt.id, childError, - INVOKE_FAILURE_RETRY_POLICY, + WORKFLOW_STEP_FAILURE_RETRY_POLICY, ); } @@ -595,47 +601,50 @@ class StepExecutor implements StepApi { if (childRun.status === "canceled") { return await this.failStepWithError( stepName, - invokeAttempt.id, + workflowAttempt.id, new Error( - `Invoke step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`, + `Workflow step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`, ), - INVOKE_FAILURE_RETRY_POLICY, + WORKFLOW_STEP_FAILURE_RETRY_POLICY, ); } // Child still running — sleep until timeout - const timeoutAt = getInvokeTimeoutAt(invokeAttempt); + const timeoutAt = getWorkflowTimeoutAt(workflowAttempt); throw new SleepSignal( - timeoutAt ?? defaultInvokeTimeoutAt(invokeAttempt.createdAt), + timeoutAt ?? defaultWorkflowTimeoutAt(workflowAttempt.createdAt), ); } /** * Create (or dedupe) the child workflow run and persist the linkage on the - * parent invoke step attempt. - * @param stepName - Parent invoke step name - * @param attempt - Parent invoke step attempt - * @param opts - Invoke step configuration + * parent workflow step attempt. + * @param stepName - Parent workflow step name + * @param attempt - Parent workflow step attempt + * @param request - Workflow step request * @returns Updated step attempt with child linkage */ - private async linkChildWorkflowRun( + private async linkChildWorkflowRun( stepName: string, attempt: Readonly, - opts: Readonly>, + request: Readonly>, ): Promise { - const workflow = opts.workflow; - if (typeof workflow === "string" && workflow.length === 0) { - throw new Error("Invoke workflow target must be a non-empty string"); + const validationResult = await validateInput( + request.workflowSpec.schema, + request.input, + ); + if (!validationResult.success) { + throw new Error(validationResult.error); } + const parsedInput = validationResult.value; - const spec = toWorkflowSpec(workflow); const childRun = await this.backend.createWorkflowRun({ - workflowName: spec.name, - version: spec.version ?? null, - idempotencyKey: buildInvokeIdempotencyKey(attempt), + workflowName: request.workflowSpec.name, + version: request.workflowSpec.version ?? null, + idempotencyKey: buildWorkflowIdempotencyKey(attempt), config: {}, context: null, - input: normalizeStepOutput(opts.input), + input: normalizeStepOutput(parsedInput), parentStepAttemptNamespaceId: attempt.namespaceId, parentStepAttemptId: attempt.id, availableAt: null, @@ -656,7 +665,7 @@ class StepExecutor implements StepApi { /** * Record a step failure, update the failed-attempt counter, and throw a - * StepError. Shared by both `step.run` failures and invoke failures. + * StepError. Shared by both `step.run` failures and workflow failures. * @param stepName - Step name * @param stepAttemptId - Step attempt id * @param error - Error that caused the failure @@ -710,7 +719,7 @@ export interface ExecuteWorkflowParams { /** * Execute a workflow run. This is the core application use case that handles: * - Loading step history - * - Handling paused (sleep/invoke wait) steps + * - Handling paused (sleep/runWorkflow wait) steps * - Creating the step executor * - Executing the workflow function * - Completing, failing, or parking the workflow run based on the outcome