From 2a0d036c979caf99c499493513019a9cd595dbec Mon Sep 17 00:00:00 2001 From: James Martinez Date: Wed, 25 Feb 2026 14:48:37 -0600 Subject: [PATCH 1/2] feat(openworkflow,dashboard): add child workflows --- ARCHITECTURE.md | 8 +- openworkflow/hello-world-parent.run.ts | 12 + openworkflow/hello-world-parent.ts | 18 + .../dashboard/src/components/run-list.tsx | 32 +- .../dashboard/src/components/ui/badge.tsx | 1 - packages/dashboard/src/lib/api.ts | 15 + packages/dashboard/src/routes/index.tsx | 61 +- packages/dashboard/src/routes/runs/$runId.tsx | 143 +- packages/docs/docs/roadmap.mdx | 2 +- packages/docs/docs/steps.mdx | 17 +- packages/docs/docs/workflows.mdx | 28 +- packages/openworkflow/README.md | 1 + packages/openworkflow/client/client.ts | 2 + packages/openworkflow/core/backend.ts | 13 + packages/openworkflow/core/error.test.ts | 34 +- packages/openworkflow/core/error.ts | 17 + .../openworkflow/core/step-attempt.test.ts | 22 + packages/openworkflow/core/step-attempt.ts | 42 +- .../openworkflow/core/workflow-function.ts | 34 +- .../openworkflow/core/workflow-run.test.ts | 17 +- packages/openworkflow/core/workflow-run.ts | 14 + .../openworkflow/postgres/backend.test.ts | 6 + packages/openworkflow/postgres/backend.ts | 84 +- packages/openworkflow/sqlite/backend.test.ts | 85 +- packages/openworkflow/sqlite/backend.ts | 123 +- .../openworkflow/testing/backend.testsuite.ts | 406 +++++ .../openworkflow/worker/execution.test.ts | 1304 ++++++++++++++++- packages/openworkflow/worker/execution.ts | 378 ++++- packages/openworkflow/worker/worker.test.ts | 8 + 29 files changed, 2840 insertions(+), 87 deletions(-) create mode 100644 openworkflow/hello-world-parent.run.ts create mode 100644 openworkflow/hello-world-parent.ts diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index af2631bd..17c2a82c 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -51,8 +51,8 @@ A workflow run can be in one of the following states: to claim it. - **`running`**: The workflow run is actively being executed by a worker. - **`sleeping`**: The workflow run is waiting for a duration to elapse - (`step.sleep`). The `availableAt` timestamp controls when it becomes available - again. + (`step.sleep`) or waiting for a child workflow result (`step.invoke`). The + `availableAt` timestamp controls when it becomes available again. - **`completed`**: The workflow run has completed successfully. - **`failed`**: The workflow run has failed after exhausting retries or deadline reached. @@ -220,6 +220,10 @@ worker slot for other work - it's not a blocking sleep but a durable pause. await step.sleep("wait-one-hour", "1h"); ``` +**`step.invoke(name, options)`**: Starts a child workflow and waits for it +durably. When the timeout is reached (default 7d), the parent step fails but the +child workflow continues running independently. + ## 4. Error Handling & Retries ### 4.1. Step Failures & Retries diff --git a/openworkflow/hello-world-parent.run.ts b/openworkflow/hello-world-parent.run.ts new file mode 100644 index 00000000..b703a672 --- /dev/null +++ b/openworkflow/hello-world-parent.run.ts @@ -0,0 +1,12 @@ +import { backend, ow } from "./client.js"; +import { helloWorldParent } from "./hello-world-parent.js"; + +console.log("Running hello-world-parent workflow..."); +const handle = await ow.runWorkflow(helloWorldParent.spec, {}); + +console.log("Waiting for result..."); +const result = await handle.result(); + +console.log(`Workflow result: ${JSON.stringify(result, null, 2)}`); + +await backend.stop(); diff --git a/openworkflow/hello-world-parent.ts b/openworkflow/hello-world-parent.ts new file mode 100644 index 00000000..d6910cf7 --- /dev/null +++ b/openworkflow/hello-world-parent.ts @@ -0,0 +1,18 @@ +import { helloWorld } from "./hello-world.js"; +import { defineWorkflow } from "openworkflow"; + +/** + * Example workflow that invokes hello-world as a child workflow. + */ +export const helloWorldParent = defineWorkflow( + { name: "hello-world-parent" }, + async ({ step, run }) => { + console.log(`[run ${run.id}]`); + + const childResult = await step.invoke("hello-world-child", { + workflow: helloWorld, + }); + + return { childResult, parentMessage: "Hello from the parent workflow!" }; + }, +); diff --git a/packages/dashboard/src/components/run-list.tsx b/packages/dashboard/src/components/run-list.tsx index e0776232..dd71f90e 100644 --- a/packages/dashboard/src/components/run-list.tsx +++ b/packages/dashboard/src/components/run-list.tsx @@ -7,8 +7,14 @@ import { CaretRightIcon } from "@phosphor-icons/react"; import { Link } from "@tanstack/react-router"; import type { WorkflowRun } from "openworkflow/internal"; +export interface ChildRunRelation { + parentRunId: string; + parentWorkflowName?: string | undefined; +} + export interface RunListProps { runs: WorkflowRun[]; + childRunRelationsByRunId?: Record; title?: string; showHeader?: boolean; showCount?: boolean; @@ -16,6 +22,7 @@ export interface RunListProps { export function RunList({ runs, + childRunRelationsByRunId, title = "Workflow Runs", showHeader = true, showCount = true, @@ -60,6 +67,7 @@ export function RunList({ const StatusIcon = config.icon; const duration = computeDuration(run.startedAt, run.finishedAt); const startedAt = formatRelativeTime(run.startedAt); + const childRunRelation = childRunRelationsByRunId?.[run.id]; return ( {run.workflowName} {run.version && ( - - {run.version} - + {run.version} )} {run.id} -
+
{config.label} + {childRunRelation && ( + + {childRunRelation.parentWorkflowName && ( + + [{childRunRelation.parentWorkflowName}] + + )} + {childRunRelation.parentRunId} + + )}
diff --git a/packages/dashboard/src/components/ui/badge.tsx b/packages/dashboard/src/components/ui/badge.tsx index 8926f556..456528d5 100644 --- a/packages/dashboard/src/components/ui/badge.tsx +++ b/packages/dashboard/src/components/ui/badge.tsx @@ -32,7 +32,6 @@ function Badge({ render, ...props }: useRender.ComponentProps<"span"> & VariantProps) { - // @ts-expect-error - render is not typed properly return useRender({ defaultTagName: "span", props: mergeProps<"span">( diff --git a/packages/dashboard/src/lib/api.ts b/packages/dashboard/src/lib/api.ts index 17b7b6e2..41ff0e86 100644 --- a/packages/dashboard/src/lib/api.ts +++ b/packages/dashboard/src/lib/api.ts @@ -111,6 +111,19 @@ export const listStepAttemptsServerFn = createServerFn({ method: "GET" }) return result; }); +/** + * Get a single step attempt by ID. + */ +export const getStepAttemptServerFn = createServerFn({ method: "GET" }) + .inputValidator(z.object({ stepAttemptId: z.string() })) + .handler(async ({ data }): Promise => { + const backend = await getBackend(); + const stepAttempt = await backend.getStepAttempt({ + stepAttemptId: data.stepAttemptId, + }); + return stepAttempt; + }); + /** * Create a new workflow run. */ @@ -154,6 +167,8 @@ export const createWorkflowRunServerFn = createServerFn({ method: "POST" }) config: {}, context: null, input: parsedInput, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt, deadlineAt, }); diff --git a/packages/dashboard/src/routes/index.tsx b/packages/dashboard/src/routes/index.tsx index 7bc275c0..428b7e52 100644 --- a/packages/dashboard/src/routes/index.tsx +++ b/packages/dashboard/src/routes/index.tsx @@ -1,6 +1,6 @@ import { AppLayout } from "@/components/app-layout"; import { CreateRunForm } from "@/components/create-run-form"; -import { RunList } from "@/components/run-list"; +import { RunList, type ChildRunRelation } from "@/components/run-list"; import { Button } from "@/components/ui/button"; import { Dialog, @@ -11,12 +11,15 @@ import { } from "@/components/ui/dialog"; import { WorkflowStats } from "@/components/workflow-stats"; import { + getStepAttemptServerFn, getWorkflowRunCountsServerFn, + getWorkflowRunServerFn, listWorkflowRunsServerFn, } from "@/lib/api"; import { usePolling } from "@/lib/use-polling"; import { PlusIcon } from "@phosphor-icons/react"; import { createFileRoute } from "@tanstack/react-router"; +import type { StepAttempt, WorkflowRun } from "openworkflow/internal"; import { useState } from "react"; export const Route = createFileRoute("/")({ @@ -26,16 +29,64 @@ export const Route = createFileRoute("/")({ listWorkflowRunsServerFn({ data: { limit: 100 } }), getWorkflowRunCountsServerFn(), ]); + const runs = runsResponse.data; + const childRuns = runs.filter( + (run): run is WorkflowRun & { parentStepAttemptId: string } => + run.parentStepAttemptId !== null && run.parentStepAttemptId !== "", + ); + const parentStepAttemptIds = [ + ...new Set(childRuns.map((childRun) => childRun.parentStepAttemptId)), + ]; + const parentStepAttemptsById: Record = {}; + await Promise.all( + parentStepAttemptIds.map(async (parentStepAttemptId) => { + parentStepAttemptsById[parentStepAttemptId] = + await getStepAttemptServerFn({ + data: { stepAttemptId: parentStepAttemptId }, + }); + }), + ); + const parentRunIds = [ + ...new Set( + Object.values(parentStepAttemptsById) + .map((parentStepAttempt) => parentStepAttempt?.workflowRunId) + .filter((parentRunId): parentRunId is string => !!parentRunId), + ), + ]; + const parentRunsById: Record = {}; + await Promise.all( + parentRunIds.map(async (parentRunId) => { + parentRunsById[parentRunId] = await getWorkflowRunServerFn({ + data: { workflowRunId: parentRunId }, + }); + }), + ); + const childRunRelationsByRunId: Record = {}; + for (const childRun of childRuns) { + const parentStepAttempt = + parentStepAttemptsById[childRun.parentStepAttemptId]; + if (!parentStepAttempt) { + continue; + } + + const parentRun = parentRunsById[parentStepAttempt.workflowRunId]; + childRunRelationsByRunId[childRun.id] = { + parentRunId: parentStepAttempt.workflowRunId, + parentWorkflowName: parentRun?.workflowName ?? undefined, + }; + } return { runsResponse, workflowRunCounts, + childRunRelationsByRunId, }; }, }); function HomePage() { - const { runsResponse, workflowRunCounts } = Route.useLoaderData(); + const { runsResponse, workflowRunCounts, childRunRelationsByRunId } = + Route.useLoaderData(); const { data: runs } = runsResponse; const [isCreateRunOpen, setIsCreateRunOpen] = useState(false); usePolling(); @@ -63,7 +114,11 @@ function HomePage() { - + diff --git a/packages/dashboard/src/routes/runs/$runId.tsx b/packages/dashboard/src/routes/runs/$runId.tsx index cb2e5325..ed48f14b 100644 --- a/packages/dashboard/src/routes/runs/$runId.tsx +++ b/packages/dashboard/src/routes/runs/$runId.tsx @@ -3,7 +3,11 @@ import { RunCancelAction } from "@/components/run-cancel-action"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { Card } from "@/components/ui/card"; -import { getWorkflowRunServerFn, listStepAttemptsServerFn } from "@/lib/api"; +import { + getStepAttemptServerFn, + getWorkflowRunServerFn, + listStepAttemptsServerFn, +} from "@/lib/api"; import { STEP_STATUS_CONFIG, TERMINAL_RUN_STATUSES, @@ -19,7 +23,7 @@ import { ListDashesIcon, } from "@phosphor-icons/react"; import { createFileRoute, Link, useRouter } from "@tanstack/react-router"; -import type { StepAttempt } from "openworkflow/internal"; +import type { StepAttempt, WorkflowRun } from "openworkflow/internal"; import { useState } from "react"; export const Route = createFileRoute("/runs/$runId")({ @@ -28,13 +32,57 @@ export const Route = createFileRoute("/runs/$runId")({ getWorkflowRunServerFn({ data: { workflowRunId: params.runId } }), listStepAttemptsServerFn({ data: { workflowRunId: params.runId } }), ]); - return { run, steps: stepsResult.data }; + const steps = stepsResult.data; + + let parentStepAttempt: StepAttempt | null = null; + let parentRun: WorkflowRun | null = null; + + if (run?.parentStepAttemptId) { + parentStepAttempt = await getStepAttemptServerFn({ + data: { stepAttemptId: run.parentStepAttemptId }, + }); + + if (parentStepAttempt) { + parentRun = await getWorkflowRunServerFn({ + data: { workflowRunId: parentStepAttempt.workflowRunId }, + }); + } + } + + const childRunIds = [ + ...new Set( + steps + .map((step) => + step.kind === "invoke" ? step.childWorkflowRunId : null, + ) + .filter((childRunId): childRunId is string => childRunId !== null), + ), + ]; + + const childRunsById = Object.fromEntries( + await Promise.all( + childRunIds.map(async (childRunId) => [ + childRunId, + await getWorkflowRunServerFn({ + data: { workflowRunId: childRunId }, + }), + ]), + ), + ) as Record; + + return { + run, + steps, + parentStepAttempt, + parentRun, + childRunsById, + }; }, component: RunDetailsPage, }); function RunDetailsPage() { - const { run, steps } = Route.useLoaderData(); + const { run, steps, parentRun, childRunsById } = Route.useLoaderData(); const router = useRouter(); const [expandedSteps, setExpandedSteps] = useState>(new Set()); usePolling({ @@ -87,17 +135,10 @@ function RunDetailsPage() {

{run.workflowName}

- {run.version && ( - - {run.version} - - )} + {run.version && {run.version}} {run.status} @@ -105,6 +146,14 @@ function RunDetailsPage() {

Run ID: {run.id}

+ {parentRun && ( + + )}
- {/* Left side - Steps list */}
{steps.length === 0 ? ( @@ -135,6 +183,13 @@ function RunDetailsPage() { const config = STEP_STATUS_CONFIG[step.status]; const StatusIcon = config.icon; const iconColor = config.color; + const stepTypeLabel = + step.kind === "function" ? "run" : step.kind; + const childRunId = + step.kind === "invoke" ? step.childWorkflowRunId : null; + const childRun = childRunId + ? (childRunsById[childRunId] ?? null) + : null; const stepDuration = computeDuration( step.startedAt, step.finishedAt, @@ -142,12 +197,15 @@ function RunDetailsPage() { const stepStartedAt = formatRelativeTime(step.startedAt); return ( -
+
+ {childRunId && ( +
+ +
+ )} + {isExpanded && ( -
+

{step.error ? "Error" : "Output"} @@ -225,7 +286,6 @@ function RunDetailsPage() {

- {/* Right side - Sidebar */}
@@ -260,3 +320,32 @@ function RunDetailsPage() { ); } + +interface RunRelationRowProps { + label: string; + runId: string; + workflowName?: string | undefined; + className?: string | undefined; +} + +function RunRelationRow({ + label, + runId, + workflowName, + className, +}: RunRelationRowProps) { + return ( +
+ {label}: + } + > + {workflowName && ( + [{workflowName}] + )} + {runId} + +
+ ); +} diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx index 418ebe78..dcf1ed4e 100644 --- a/packages/docs/docs/roadmap.mdx +++ b/packages/docs/docs/roadmap.mdx @@ -19,10 +19,10 @@ description: What's coming next for OpenWorkflow - ✅ Configurable retry policies - ✅ Idempotency keys - ✅ Prometheus `/metrics` endpoint +- ✅ Child workflows (`step.invoke`) ## Coming Soon -- Child workflows - Signals - Cron / scheduling - Rollback / compensation functions diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index aab0eae7..65453380 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -117,7 +117,7 @@ await step.run({ name: "log-event" }, async () => { ## Step Types -OpenWorkflow provides two step types: +OpenWorkflow provides three step types: ### `step.run()` @@ -138,6 +138,21 @@ Pauses the workflow until a specified duration has elapsed. See await step.sleep("wait-one-hour", "1h"); ``` +### `step.invoke()` + +Starts a child workflow and waits for its result durably: + +```ts +const childOutput = await step.invoke("generate-report", { + workflow: generateReportWorkflow, + input: { reportId: input.reportId }, + timeout: "5m", // optional, defaults to 7 days +}); +``` + +If `timeout` is reached, the parent step fails, but the child workflow keeps +running independently. + ## Retry Policy (Optional) Control backoff and retry limits for an individual step: diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 343f6954..d866db2e 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -213,12 +213,12 @@ create a separate run. The workflow function receives an object with four properties: -| Parameter | Type | Description | -| --------- | --------------------- | ------------------------------------------------- | -| `input` | Generic | The input data passed when starting the workflow | -| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`) | -| `version` | `string \| null` | The workflow version, if specified | -| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | +| Parameter | Type | Description | +| --------- | --------------------- | ---------------------------------------------------------------- | +| `input` | Generic | The input data passed when starting the workflow | +| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.invoke`) | +| `version` | `string \| null` | The workflow version, if specified | +| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | ```ts defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { @@ -236,14 +236,14 @@ defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { A workflow run progresses through these states: -| Status | Description | -| ----------- | --------------------------------------------------- | -| `pending` | Created and waiting for a worker to claim it | -| `running` | Actively being executed by a worker | -| `sleeping` | Paused and waiting for a `step.sleep` to complete | -| `completed` | Finished successfully | -| `failed` | Failed after exhausting retries or deadline reached | -| `canceled` | Explicitly canceled and will not continue | +| Status | Description | +| ----------- | ------------------------------------------------------ | +| `pending` | Created and waiting for a worker to claim it | +| `running` | Actively being executed by a worker | +| `sleeping` | Paused while waiting for `step.sleep` or `step.invoke` | +| `completed` | Finished successfully | +| `failed` | Failed after exhausting retries or deadline reached | +| `canceled` | Explicitly canceled and will not continue | ## Determinism diff --git a/packages/openworkflow/README.md b/packages/openworkflow/README.md index 4611be8e..6cabe585 100644 --- a/packages/openworkflow/README.md +++ b/packages/openworkflow/README.md @@ -66,6 +66,7 @@ For more details, check out our [docs](https://openworkflow.dev/docs). - ✅ **Step memoization** - Never repeat completed work - ✅ **Automatic retries** - Built-in exponential backoff - ✅ **Long pauses** - Sleep for seconds or months +- ✅ **Child workflows** - Invoke and await child workflow runs - ✅ **Scheduled runs** - Start workflows at a specific time - ✅ **Parallel execution** - Run steps concurrently - ✅ **Idempotency keys** - Deduplicate repeated run requests (24h window) diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index 11893342..c6c59a56 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -108,6 +108,8 @@ export class OpenWorkflow { config: {}, context: null, input: parsedInput ?? null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: resolveAvailableAt(options?.availableAt), deadlineAt: options?.deadlineAt ?? null, }); diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index 4fa90f15..54696dbb 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -64,6 +64,9 @@ export interface Backend { failStepAttempt( params: Readonly, ): Promise; + setStepAttemptChildWorkflowRun( + params: Readonly, + ): Promise; // Lifecycle stop(): Promise; @@ -76,6 +79,8 @@ export interface CreateWorkflowRunParams { config: JsonValue; context: JsonValue | null; input: JsonValue | null; + parentStepAttemptNamespaceId: string | null; + parentStepAttemptId: string | null; availableAt: Date | null; // null = immediately deadlineAt: Date | null; // null = no deadline } @@ -158,6 +163,14 @@ export interface FailStepAttemptParams { error: SerializedError; } +export interface SetStepAttemptChildWorkflowRunParams { + workflowRunId: string; + stepAttemptId: string; + workerId: string; + childWorkflowRunNamespaceId: string; + childWorkflowRunId: string; +} + export interface PaginationOptions { limit?: number; after?: string; diff --git a/packages/openworkflow/core/error.test.ts b/packages/openworkflow/core/error.test.ts index 85dd19ea..38d442cb 100644 --- a/packages/openworkflow/core/error.test.ts +++ b/packages/openworkflow/core/error.test.ts @@ -1,4 +1,4 @@ -import { serializeError, wrapError } from "./error.js"; +import { deserializeError, serializeError, wrapError } from "./error.js"; import { describe, expect, test } from "vitest"; describe("serializeError", () => { @@ -94,3 +94,35 @@ describe("wrapError", () => { expect(wrapped.cause).toBe("boom"); }); }); + +describe("deserializeError", () => { + test("reconstructs Error with message", () => { + const error = deserializeError({ message: "boom" }); + + expect(error).toBeInstanceOf(Error); + expect(error.message).toBe("boom"); + }); + + test("preserves name from serialized payload", () => { + const error = deserializeError({ message: "fail", name: "TypeError" }); + + expect(error.name).toBe("TypeError"); + expect(error.message).toBe("fail"); + }); + + test("preserves stack from serialized payload", () => { + const stack = "Error: fail\n at test.ts:1:1"; + const error = deserializeError({ message: "fail", stack }); + + expect(error.stack).toBe(stack); + }); + + test("roundtrips through serializeError", () => { + const original = new TypeError("type mismatch"); + const serialized = serializeError(original); + const restored = deserializeError(serialized); + + expect(restored.message).toBe(original.message); + expect(restored.name).toBe(original.name); + }); +}); diff --git a/packages/openworkflow/core/error.ts b/packages/openworkflow/core/error.ts index adfe1ca8..4fed6953 100644 --- a/packages/openworkflow/core/error.ts +++ b/packages/openworkflow/core/error.ts @@ -28,6 +28,23 @@ export function serializeError(error: unknown): SerializedError { }; } +/** + * Convert a serialized error payload back into an Error instance so messages + * survive re-serialization without becoming "[object Object]". + * @param serialized - Serialized error payload from persisted workflow state + * @returns Rehydrated Error preserving message/name/stack when available + */ +export function deserializeError(serialized: Readonly): Error { + const error = new Error(serialized.message); + if (serialized.name) { + error.name = serialized.name; + } + if (serialized.stack) { + error.stack = serialized.stack; + } + return error; +} + /** * Wrap an error with a clearer message while preserving the original cause. * @param message - The message to use for the new error diff --git a/packages/openworkflow/core/step-attempt.test.ts b/packages/openworkflow/core/step-attempt.test.ts index 99ee6c57..62d4f38a 100644 --- a/packages/openworkflow/core/step-attempt.test.ts +++ b/packages/openworkflow/core/step-attempt.test.ts @@ -6,6 +6,7 @@ import { normalizeStepOutput, calculateDateFromDuration, createSleepContext, + createInvokeContext, } from "./step-attempt.js"; import type { StepAttempt, StepAttemptCache } from "./step-attempt.js"; import { describe, expect, test } from "vitest"; @@ -317,6 +318,27 @@ describe("createSleepContext", () => { }); }); +describe("createInvokeContext", () => { + test("creates invoke context with timeout", () => { + const timeoutAt = new Date("2025-06-15T10:30:00.000Z"); + const context = createInvokeContext(timeoutAt); + + expect(context).toEqual({ + kind: "invoke", + timeoutAt: "2025-06-15T10:30:00.000Z", + }); + }); + + test("creates invoke context with null timeout", () => { + const context = createInvokeContext(null); + + expect(context).toEqual({ + kind: "invoke", + timeoutAt: null, + }); + }); +}); + function createMockStepAttempt( overrides: Partial = {}, ): StepAttempt { diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index 86329c6e..dd3aae6f 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -7,7 +7,7 @@ import { err, ok } from "./result.js"; /** * The kind of step in a workflow. */ -export type StepKind = "function" | "sleep"; +export type StepKind = "function" | "sleep" | "invoke"; /** * Status of a step attempt through its lifecycle. @@ -19,13 +19,28 @@ export type StepAttemptStatus = | "failed"; /** - * Context for a step attempt (currently only used for sleep steps). + * Context for a sleep step attempt. */ -export interface StepAttemptContext { +export interface SleepStepAttemptContext { kind: "sleep"; resumeAt: string; } +/** + * Context for an invoke step attempt. + */ +export interface InvokeStepAttemptContext { + kind: "invoke"; + timeoutAt: string | null; +} + +/** + * Context for a step attempt. + */ +export type StepAttemptContext = + | SleepStepAttemptContext + | InvokeStepAttemptContext; + /** * StepAttempt represents a single attempt of a step within a workflow. */ @@ -134,12 +149,25 @@ export function calculateDateFromDuration( * @param resumeAt - The time when the sleep should resume * @returns The context object for the sleep step */ -export function createSleepContext(resumeAt: Readonly): { - kind: "sleep"; - resumeAt: string; -} { +export function createSleepContext( + resumeAt: Readonly, +): SleepStepAttemptContext { return { kind: "sleep" as const, resumeAt: resumeAt.toISOString(), }; } + +/** + * Create the context object for an invoke step attempt. + * @param timeoutAt - Parent wait timeout deadline, or null for no timeout + * @returns The context object for an invoke step + */ +export function createInvokeContext( + timeoutAt: Readonly | null, +): InvokeStepAttemptContext { + return { + kind: "invoke" as const, + timeoutAt: timeoutAt?.toISOString() ?? null, + }; +} diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index 71a52bbe..744e1b23 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -1,5 +1,9 @@ import type { DurationString } from "./duration.js"; -import type { RetryPolicy } from "./workflow-definition.js"; +import type { + RetryPolicy, + Workflow, + WorkflowSpec, +} from "./workflow-definition.js"; import type { WorkflowRun } from "./workflow-run.js"; /** @@ -26,9 +30,31 @@ export type StepFunction = () => | Output | undefined; +/** + * Target workflow reference for `step.invoke`. + */ +type InvokeWorkflowTarget = + | WorkflowSpec + | Workflow + | string; + +/** + * Config for invoking a child workflow from `step.invoke()`. + */ +export interface InvokeStepConfig< + Input = unknown, + Output = unknown, + RunInput = Input, +> { + workflow: InvokeWorkflowTarget; + input?: RunInput; + timeout?: number | string | Date; +} + /** * Represents the API for defining steps within a workflow. Used within a - * workflow handler to define steps by calling `step.run()`. + * workflow handler to define steps by calling `step.run()`, `step.sleep()`, + * and `step.invoke()`. */ export interface StepApi { run: ( @@ -36,6 +62,10 @@ export interface StepApi { fn: StepFunction, ) => Promise; sleep: (name: string, duration: DurationString) => Promise; + invoke: ( + name: string, + opts: Readonly>, + ) => Promise; } /** diff --git a/packages/openworkflow/core/workflow-run.test.ts b/packages/openworkflow/core/workflow-run.test.ts index 2db7c964..8b6765b3 100644 --- a/packages/openworkflow/core/workflow-run.test.ts +++ b/packages/openworkflow/core/workflow-run.test.ts @@ -1,7 +1,22 @@ import type { StandardSchemaV1 } from "./standard-schema.js"; -import { validateInput } from "./workflow-run.js"; +import { isTerminalStatus, validateInput } from "./workflow-run.js"; +import type { WorkflowRunStatus } from "./workflow-run.js"; import { describe, expect, test } from "vitest"; +describe("isTerminalStatus", () => { + test.each<[WorkflowRunStatus, boolean]>([ + ["pending", false], + ["running", false], + ["sleeping", false], + ["completed", true], + ["succeeded", true], + ["failed", true], + ["canceled", true], + ])("returns %s for status '%s'", (status, expected) => { + expect(isTerminalStatus(status)).toBe(expected); + }); +}); + describe("validateInput", () => { test("returns success with input when no schema provided (null)", async () => { const input = { name: "test", value: 42 }; diff --git a/packages/openworkflow/core/workflow-run.ts b/packages/openworkflow/core/workflow-run.ts index e525aa9b..5f85421f 100644 --- a/packages/openworkflow/core/workflow-run.ts +++ b/packages/openworkflow/core/workflow-run.ts @@ -14,6 +14,20 @@ export type WorkflowRunStatus = | "failed" | "canceled"; +/** + * Determine whether a workflow run status is terminal (no further transitions). + * @param status - Workflow run status + * @returns True when status is terminal + */ +export function isTerminalStatus(status: WorkflowRunStatus): boolean { + return ( + status === "completed" || + status === "succeeded" || + status === "failed" || + status === "canceled" + ); +} + /** * WorkflowRun represents a single execution instance of a workflow. */ diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts index 3c6e6014..63a6c1fc 100644 --- a/packages/openworkflow/postgres/backend.test.ts +++ b/packages/openworkflow/postgres/backend.test.ts @@ -66,6 +66,8 @@ describe("BackendPostgres schema option", () => { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -112,6 +114,8 @@ describe("BackendPostgres schema option", () => { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -182,6 +186,8 @@ describe("BackendPostgres cancel fallback", () => { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 45ad21d6..162527eb 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -16,6 +16,7 @@ import { PaginatedResponse, FailStepAttemptParams, CompleteStepAttemptParams, + SetStepAttemptChildWorkflowRunParams, FailWorkflowRunParams, RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, @@ -142,6 +143,9 @@ export class BackendPostgres implements Backend { params: CreateWorkflowRunParams, ): Promise { const workflowRunsTable = this.workflowRunsTable(pg); + const parentStepAttemptNamespaceId = + params.parentStepAttemptNamespaceId ?? null; + const parentStepAttemptId = params.parentStepAttemptId ?? null; const [workflowRun] = await pg` INSERT INTO ${workflowRunsTable} ( @@ -155,6 +159,8 @@ export class BackendPostgres implements Backend { "context", "input", "attempts", + "parent_step_attempt_namespace_id", + "parent_step_attempt_id", "available_at", "deadline_at", "created_at", @@ -171,6 +177,8 @@ export class BackendPostgres implements Backend { ${pg.json(params.context)}, ${pg.json(params.input)}, 0, + ${parentStepAttemptNamespaceId}, + ${parentStepAttemptId}, ${sqlDateDefaultNow(pg, params.availableAt)}, ${params.deadlineAt}, date_trunc('milliseconds', NOW()), @@ -376,7 +384,11 @@ export class BackendPostgres implements Backend { UPDATE ${workflowRunsTable} SET "status" = 'sleeping', - "available_at" = ${params.availableAt}, + "available_at" = CASE + WHEN "available_at" IS NOT NULL AND "available_at" <= NOW() + THEN "available_at" + ELSE ${params.availableAt} + END, "worker_id" = NULL, "updated_at" = NOW() WHERE "namespace_id" = ${this.namespaceId} @@ -418,6 +430,8 @@ export class BackendPostgres implements Backend { if (!updated) throw new Error("Failed to mark workflow run completed"); + await this.wakeParentWorkflowRun(updated); + return updated; } @@ -457,6 +471,10 @@ export class BackendPostgres implements Backend { if (!updated) throw new Error("Failed to mark workflow run failed"); + if (updated.status === "failed") { + await this.wakeParentWorkflowRun(updated); + } + return updated; } @@ -535,9 +553,42 @@ export class BackendPostgres implements Backend { throw new Error("Failed to cancel workflow run"); } + await this.wakeParentWorkflowRun(updated); + return updated; } + private async wakeParentWorkflowRun( + childWorkflowRun: Readonly, + ): Promise { + if ( + !childWorkflowRun.parentStepAttemptNamespaceId || + !childWorkflowRun.parentStepAttemptId + ) { + return; + } + + const workflowRunsTable = this.workflowRunsTable(); + const stepAttemptsTable = this.stepAttemptsTable(); + + await this.pg` + UPDATE ${workflowRunsTable} wr + SET + "available_at" = CASE + WHEN wr."available_at" IS NULL OR wr."available_at" > NOW() + THEN NOW() + ELSE wr."available_at" + END, + "updated_at" = NOW() + FROM ${stepAttemptsTable} sa + WHERE sa."namespace_id" = ${childWorkflowRun.parentStepAttemptNamespaceId} + AND sa."id" = ${childWorkflowRun.parentStepAttemptId} + AND wr."namespace_id" = sa."namespace_id" + AND wr."id" = sa."workflow_run_id" + AND wr."status" = 'sleeping' + `; + } + async createStepAttempt( params: CreateStepAttemptParams, ): Promise { @@ -578,6 +629,37 @@ export class BackendPostgres implements Backend { return stepAttempt; } + async setStepAttemptChildWorkflowRun( + params: SetStepAttemptChildWorkflowRunParams, + ): Promise { + const stepAttemptsTable = this.stepAttemptsTable(); + const workflowRunsTable = this.workflowRunsTable(); + + const [updated] = await this.pg` + UPDATE ${stepAttemptsTable} sa + SET + "child_workflow_run_namespace_id" = ${params.childWorkflowRunNamespaceId}, + "child_workflow_run_id" = ${params.childWorkflowRunId}, + "updated_at" = NOW() + FROM ${workflowRunsTable} wr + WHERE sa."namespace_id" = ${this.namespaceId} + AND sa."workflow_run_id" = ${params.workflowRunId} + AND sa."id" = ${params.stepAttemptId} + AND sa."status" = 'running' + AND wr."namespace_id" = sa."namespace_id" + AND wr."id" = sa."workflow_run_id" + AND wr."status" = 'running' + AND wr."worker_id" = ${params.workerId} + RETURNING sa.* + `; + + if (!updated) { + throw new Error("Failed to set step attempt child workflow run"); + } + + return updated; + } + async getStepAttempt( params: GetStepAttemptParams, ): Promise { diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts index 6f3dc3ab..85889868 100644 --- a/packages/openworkflow/sqlite/backend.test.ts +++ b/packages/openworkflow/sqlite/backend.test.ts @@ -6,7 +6,7 @@ import { randomUUID } from "node:crypto"; import { unlinkSync, existsSync } from "node:fs"; import { tmpdir } from "node:os"; import path from "node:path"; -import { test, describe, afterAll, expect } from "vitest"; +import { test, describe, afterAll, expect, vi } from "vitest"; test("it is a test file (workaround for sonarjs/no-empty-test-file linter)", () => { assert.ok(true); @@ -99,6 +99,8 @@ describe("BackendSqlite.createWorkflowRun error handling", () => { config: {}, context: null, input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }), @@ -146,6 +148,8 @@ describe("BackendSqlite.createWorkflowRun error handling", () => { config: {}, context: null, input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }), @@ -155,3 +159,82 @@ describe("BackendSqlite.createWorkflowRun error handling", () => { await backend.stop(); }); }); + +describe("BackendSqlite.setStepAttemptChildWorkflowRun error handling", () => { + test("throws when linked step attempt cannot be reloaded", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + try { + const parent = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + config: {}, + context: null, + input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + const workerId = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + if (!claimed) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimed.id).toBe(parent.id); + + const stepAttempt = await backend.createStepAttempt({ + workflowRunId: claimed.id, + workerId, + stepName: randomUUID(), + kind: "invoke", + config: {}, + context: null, + }); + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + config: {}, + context: null, + input: null, + parentStepAttemptNamespaceId: stepAttempt.namespaceId, + parentStepAttemptId: stepAttempt.id, + availableAt: null, + deadlineAt: null, + }); + + const originalGetStepAttempt = backend.getStepAttempt.bind(backend); + const getStepAttemptSpy = vi + .spyOn(backend, "getStepAttempt") + .mockImplementation(async (params) => { + if (params.stepAttemptId === stepAttempt.id) { + return null; + } + return await originalGetStepAttempt(params); + }); + + try { + await expect( + backend.setStepAttemptChildWorkflowRun({ + workflowRunId: claimed.id, + stepAttemptId: stepAttempt.id, + workerId, + childWorkflowRunNamespaceId: childRun.namespaceId, + childWorkflowRunId: childRun.id, + }), + ).rejects.toThrow("Failed to set step attempt child workflow run"); + } finally { + getStepAttemptSpy.mockRestore(); + } + } finally { + await backend.stop(); + } + }); +}); diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 19411e8f..3e870176 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -15,6 +15,7 @@ import { PaginatedResponse, FailStepAttemptParams, CompleteStepAttemptParams, + SetStepAttemptChildWorkflowRunParams, FailWorkflowRunParams, RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, @@ -137,6 +138,8 @@ export class BackendSqlite implements Backend { const availableAt = params.availableAt ? toISO(params.availableAt) : currentTime; + const parentStepAttemptNamespaceId = params.parentStepAttemptNamespaceId; + const parentStepAttemptId = params.parentStepAttemptId ?? null; const stmt = this.db.prepare(` INSERT INTO "workflow_runs" ( @@ -150,12 +153,14 @@ export class BackendSqlite implements Backend { "context", "input", "attempts", + "parent_step_attempt_namespace_id", + "parent_step_attempt_id", "available_at", "deadline_at", "created_at", "updated_at" ) - VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?) `); stmt.run( @@ -167,6 +172,8 @@ export class BackendSqlite implements Backend { toJSON(params.config), toJSON(params.context), toJSON(params.input), + parentStepAttemptNamespaceId, + parentStepAttemptId, availableAt, toISO(params.deadlineAt), currentTime, @@ -359,12 +366,16 @@ export class BackendSqlite implements Backend { async sleepWorkflowRun(params: SleepWorkflowRunParams): Promise { const currentTime = now(); + const resumeAt = toISO(params.availableAt); const stmt = this.db.prepare(` UPDATE "workflow_runs" SET "status" = 'sleeping', - "available_at" = ?, + "available_at" = CASE + WHEN "available_at" IS NOT NULL AND "available_at" <= ? THEN "available_at" + ELSE ? + END, "worker_id" = NULL, "updated_at" = ? WHERE "namespace_id" = ? @@ -374,7 +385,8 @@ export class BackendSqlite implements Backend { `); const result = stmt.run( - toISO(params.availableAt), + currentTime, + resumeAt, currentTime, this.namespaceId, params.workflowRunId, @@ -433,6 +445,8 @@ export class BackendSqlite implements Backend { }); if (!updated) throw new Error("Failed to mark workflow run completed"); + this.wakeParentWorkflowRun(updated); + return updated; } @@ -486,6 +500,10 @@ export class BackendSqlite implements Backend { const updated = await this.getWorkflowRun({ workflowRunId }); if (!updated) throw new Error("Failed to mark workflow run failed"); + if (updated.status === "failed") { + this.wakeParentWorkflowRun(updated); + } + return updated; } @@ -584,9 +602,49 @@ export class BackendSqlite implements Backend { }); if (!updated) throw new Error("Failed to cancel workflow run"); + this.wakeParentWorkflowRun(updated); + return updated; } + private wakeParentWorkflowRun(childWorkflowRun: Readonly): void { + if ( + !childWorkflowRun.parentStepAttemptNamespaceId || + !childWorkflowRun.parentStepAttemptId + ) { + return; + } + + const currentTime = now(); + const stmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > ? THEN ? + ELSE "available_at" + END, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ( + SELECT "workflow_run_id" + FROM "step_attempts" + WHERE "namespace_id" = ? + AND "id" = ? + LIMIT 1 + ) + AND "status" = 'sleeping' + `); + + stmt.run( + currentTime, + currentTime, + currentTime, + childWorkflowRun.parentStepAttemptNamespaceId, + childWorkflowRun.parentStepAttemptNamespaceId, + childWorkflowRun.parentStepAttemptId, + ); + } + // eslint-disable-next-line @typescript-eslint/require-await async countWorkflowRuns(): Promise { const stmt = this.db.prepare(` @@ -817,6 +875,65 @@ export class BackendSqlite implements Backend { return stepAttempt; } + async setStepAttemptChildWorkflowRun( + params: SetStepAttemptChildWorkflowRunParams, + ): Promise { + const currentTime = now(); + + const workflowStmt = this.db.prepare(` + SELECT "id" + FROM "workflow_runs" + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" = 'running' + AND "worker_id" = ? + `); + + const workflowRow = workflowStmt.get( + this.namespaceId, + params.workflowRunId, + params.workerId, + ) as { id: string } | undefined; + + if (!workflowRow) { + throw new Error("Failed to set step attempt child workflow run"); + } + + const stmt = this.db.prepare(` + UPDATE "step_attempts" + SET + "child_workflow_run_namespace_id" = ?, + "child_workflow_run_id" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "workflow_run_id" = ? + AND "id" = ? + AND "status" = 'running' + `); + + const result = stmt.run( + params.childWorkflowRunNamespaceId, + params.childWorkflowRunId, + currentTime, + this.namespaceId, + params.workflowRunId, + params.stepAttemptId, + ); + + if (result.changes === 0) { + throw new Error("Failed to set step attempt child workflow run"); + } + + const updated = await this.getStepAttempt({ + stepAttemptId: params.stepAttemptId, + }); + if (!updated) { + throw new Error("Failed to set step attempt child workflow run"); + } + + return updated; + } + getStepAttempt(params: GetStepAttemptParams): Promise { const stmt = this.db.prepare(` SELECT * diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 9ebb790e..7f352dd9 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -78,6 +78,8 @@ export function testBackend(options: TestBackendOptions): void { input: expected.input, config: expected.config, context: expected.context, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: expected.availableAt, deadlineAt: expected.deadlineAt, }); @@ -102,6 +104,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -113,6 +117,36 @@ export function testBackend(options: TestBackendOptions): void { expect(createdMin.deadlineAt).toBeNull(); }); + test("persists parent step attempt linkage when provided", async () => { + const parentRun = await createClaimedWorkflowRun(backend); + const parentStepAttempt = await backend.createStepAttempt({ + workflowRunId: parentRun.id, + workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "function", + config: {}, + context: null, + }); + + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: { key: "val" }, + config: {}, + context: null, + parentStepAttemptNamespaceId: parentStepAttempt.namespaceId, + parentStepAttemptId: parentStepAttempt.id, + availableAt: null, + deadlineAt: null, + }); + + expect(childRun.parentStepAttemptNamespaceId).toBe( + parentStepAttempt.namespaceId, + ); + expect(childRun.parentStepAttemptId).toBe(parentStepAttempt.id); + }); + test("reuses the same run for matching idempotency key and workflow identity", async () => { const backend = await setup(); const workflowName = randomUUID(); @@ -126,6 +160,8 @@ export function testBackend(options: TestBackendOptions): void { input: { val: 1 }, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -137,6 +173,8 @@ export function testBackend(options: TestBackendOptions): void { input: { val: 2 }, config: { changed: true }, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -156,6 +194,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -167,6 +207,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -189,6 +231,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -200,6 +244,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -214,6 +260,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -225,6 +273,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -249,6 +299,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -260,6 +312,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -285,6 +339,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -300,6 +356,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -322,6 +380,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -333,6 +393,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -356,6 +418,8 @@ export function testBackend(options: TestBackendOptions): void { input: { i }, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }), @@ -380,6 +444,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -404,6 +470,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -426,6 +494,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -451,6 +521,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -465,6 +537,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -478,6 +552,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -931,6 +1007,142 @@ export function testBackend(options: TestBackendOptions): void { expect(completed.finishedAt).not.toBeNull(); expect(completed.availableAt).toBeNull(); }); + + test("wakes a sleeping parent run when a child run completes", async () => { + const backend = await setup(); + + const parentRun = await createClaimedWorkflowRun(backend); + const parentWorkerId = parentRun.workerId ?? ""; + const invokeAttempt = await backend.createStepAttempt({ + workflowRunId: parentRun.id, + workerId: parentWorkerId, + stepName: randomUUID(), + kind: "invoke", + config: {}, + context: { kind: "invoke", timeoutAt: null }, + }); + const farFuture = new Date(Date.now() + 5 * 60 * 1000); + await backend.sleepWorkflowRun({ + workflowRunId: parentRun.id, + workerId: parentWorkerId, + availableAt: farFuture, + }); + + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: invokeAttempt.namespaceId, + parentStepAttemptId: invokeAttempt.id, + availableAt: null, + deadlineAt: null, + }); + + const claimedChild = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 20, + }); + if (!claimedChild) throw new Error("Expected child run to be claimed"); + expect(claimedChild.id).toBe(childRun.id); + + await backend.completeWorkflowRun({ + workflowRunId: childRun.id, + workerId: claimedChild.workerId ?? "", + output: { ok: true }, + }); + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: parentRun.id, + }); + expect(parentAfter?.status).toBe("sleeping"); + expect(parentAfter?.availableAt).not.toBeNull(); + if (!parentAfter?.availableAt) { + throw new Error("Expected parent availableAt after child completion"); + } + + expect(parentAfter.availableAt.getTime()).toBeLessThan( + farFuture.getTime(), + ); + expect(parentAfter.availableAt.getTime()).toBeLessThanOrEqual( + Date.now() + 1000, + ); + + await teardown(backend); + }); + + test("does not wake a running parent run when a child run completes", async () => { + const backend = await setup(); + + const parentWorkerId = randomUUID(); + const pendingParent = await createPendingWorkflowRun(backend); + const parentRun = await backend.claimWorkflowRun({ + workerId: parentWorkerId, + leaseDurationMs: 5 * 60 * 1000, + }); + if (!parentRun) throw new Error("Expected parent run to be claimed"); + expect(parentRun.id).toBe(pendingParent.id); + if (!parentRun.availableAt) { + throw new Error("Expected parent availableAt while running"); + } + const parentLeaseBeforeChild = parentRun.availableAt.getTime(); + + const invokeAttempt = await backend.createStepAttempt({ + workflowRunId: parentRun.id, + workerId: parentWorkerId, + stepName: randomUUID(), + kind: "invoke", + config: {}, + context: { kind: "invoke", timeoutAt: null }, + }); + + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: invokeAttempt.namespaceId, + parentStepAttemptId: invokeAttempt.id, + availableAt: null, + deadlineAt: null, + }); + + const claimedChild = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 20, + }); + if (!claimedChild) throw new Error("Expected child run to be claimed"); + expect(claimedChild.id).toBe(childRun.id); + + await backend.completeWorkflowRun({ + workflowRunId: childRun.id, + workerId: claimedChild.workerId ?? "", + output: { ok: true }, + }); + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: parentRun.id, + }); + expect(parentAfter?.status).toBe("running"); + expect(parentAfter?.workerId).toBe(parentWorkerId); + expect(parentAfter?.availableAt).not.toBeNull(); + if (!parentAfter?.availableAt) { + throw new Error("Expected parent availableAt after child completion"); + } + + expect(parentAfter.availableAt.getTime()).toBeGreaterThanOrEqual( + parentLeaseBeforeChild - 1000, + ); + expect(parentAfter.availableAt.getTime()).toBeGreaterThan( + Date.now() + 1000, + ); + + await teardown(backend); + }); }); describe("failWorkflowRun()", () => { @@ -1022,6 +1234,69 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + + test("does not wake parent when child failure is retryable", async () => { + const backend = await setup(); + + const parentRun = await createClaimedWorkflowRun(backend); + const parentWorkerId = parentRun.workerId ?? ""; + const invokeAttempt = await backend.createStepAttempt({ + workflowRunId: parentRun.id, + workerId: parentWorkerId, + stepName: randomUUID(), + kind: "invoke", + config: {}, + context: { kind: "invoke", timeoutAt: null }, + }); + const farFuture = new Date(Date.now() + 5 * 60 * 1000); + await backend.sleepWorkflowRun({ + workflowRunId: parentRun.id, + workerId: parentWorkerId, + availableAt: farFuture, + }); + + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: invokeAttempt.namespaceId, + parentStepAttemptId: invokeAttempt.id, + availableAt: null, + deadlineAt: null, + }); + const claimedChild = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 20, + }); + if (!claimedChild) throw new Error("Expected child run to be claimed"); + expect(claimedChild.id).toBe(childRun.id); + + const failedChild = await backend.failWorkflowRun({ + workflowRunId: childRun.id, + workerId: claimedChild.workerId ?? "", + error: { message: "transient child error" }, + retryPolicy: RESCHEDULING_RETRY_POLICY, + }); + expect(failedChild.status).toBe("pending"); + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: parentRun.id, + }); + expect(parentAfter?.status).toBe("sleeping"); + expect(parentAfter?.availableAt).not.toBeNull(); + if (!parentAfter?.availableAt) { + throw new Error("Expected parent availableAt after child retry"); + } + + expect(parentAfter.availableAt.getTime()).toBeGreaterThan( + Date.now() + 4 * 60 * 1000, + ); + + await teardown(backend); + }); }); describe("rescheduleWorkflowRunAfterFailedStepAttempt()", () => { @@ -1126,6 +1401,123 @@ export function testBackend(options: TestBackendOptions): void { }); }); + describe("setStepAttemptChildWorkflowRun()", () => { + test("sets child workflow linkage on a running step attempt", async () => { + const parentRun = await createClaimedWorkflowRun(backend); + const stepAttempt = await backend.createStepAttempt({ + workflowRunId: parentRun.id, + workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "invoke", + config: {}, + context: null, + }); + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: stepAttempt.namespaceId, + parentStepAttemptId: stepAttempt.id, + availableAt: null, + deadlineAt: null, + }); + + const updated = await backend.setStepAttemptChildWorkflowRun({ + workflowRunId: parentRun.id, + stepAttemptId: stepAttempt.id, + workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + childWorkflowRunNamespaceId: childRun.namespaceId, + childWorkflowRunId: childRun.id, + }); + + expect(updated.childWorkflowRunNamespaceId).toBe(childRun.namespaceId); + expect(updated.childWorkflowRunId).toBe(childRun.id); + + const fetched = await backend.getStepAttempt({ + stepAttemptId: stepAttempt.id, + }); + expect(fetched?.childWorkflowRunNamespaceId).toBe(childRun.namespaceId); + expect(fetched?.childWorkflowRunId).toBe(childRun.id); + }); + + test("throws when parent workflow run is not owned by worker", async () => { + const parentRun = await createClaimedWorkflowRun(backend); + const stepAttempt = await backend.createStepAttempt({ + workflowRunId: parentRun.id, + workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "invoke", + config: {}, + context: null, + }); + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: stepAttempt.namespaceId, + parentStepAttemptId: stepAttempt.id, + availableAt: null, + deadlineAt: null, + }); + + await expect( + backend.setStepAttemptChildWorkflowRun({ + workflowRunId: parentRun.id, + stepAttemptId: stepAttempt.id, + workerId: randomUUID(), + childWorkflowRunNamespaceId: childRun.namespaceId, + childWorkflowRunId: childRun.id, + }), + ).rejects.toThrow("Failed to set step attempt child workflow run"); + }); + + test("throws when step attempt is not running", async () => { + const parentRun = await createClaimedWorkflowRun(backend); + const stepAttempt = await backend.createStepAttempt({ + workflowRunId: parentRun.id, + workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "invoke", + config: {}, + context: null, + }); + await backend.completeStepAttempt({ + workflowRunId: parentRun.id, + stepAttemptId: stepAttempt.id, + workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + output: { ok: true }, + }); + const childRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: stepAttempt.namespaceId, + parentStepAttemptId: stepAttempt.id, + availableAt: null, + deadlineAt: null, + }); + + await expect( + backend.setStepAttemptChildWorkflowRun({ + workflowRunId: parentRun.id, + stepAttemptId: stepAttempt.id, + workerId: parentRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + childWorkflowRunNamespaceId: childRun.namespaceId, + childWorkflowRunId: childRun.id, + }), + ).rejects.toThrow("Failed to set step attempt child workflow run"); + }); + }); + describe("listStepAttempts()", () => { test("lists step attempts ordered by creation time", async () => { const claimed = await createClaimedWorkflowRun(backend); @@ -1453,6 +1845,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: deadline, }); @@ -1472,6 +1866,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: pastDeadline, }); @@ -1497,6 +1893,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: pastDeadline, }); @@ -1533,6 +1931,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: deadline, }); @@ -1571,6 +1971,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: deadline, }); @@ -1697,6 +2099,8 @@ export function testBackend(options: TestBackendOptions): void { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: new Date(Date.now() - 1000), // deadline in the past }); @@ -1804,6 +2208,8 @@ async function createPendingWorkflowRun(b: Backend) { input: null, config: {}, context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index fa4aef37..85948a18 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -1,10 +1,23 @@ import { OpenWorkflow } from "../client/client.js"; +import type { DurationString } from "../core/duration.js"; import type { StepAttempt } from "../core/step-attempt.js"; +import { DEFAULT_WORKFLOW_RETRY_POLICY } from "../core/workflow-definition.js"; import { BackendPostgres } from "../postgres.js"; import { DEFAULT_POSTGRES_URL } from "../postgres/postgres.js"; -import { createStepExecutionStateFromAttempts } from "./execution.js"; +import { + createStepExecutionStateFromAttempts, + executeWorkflow, +} from "./execution.js"; import { randomUUID } from "node:crypto"; -import { describe, test, expect } from "vitest"; +import { afterEach, describe, test, expect, vi } from "vitest"; + +const backendsToStop: BackendPostgres[] = []; + +afterEach(async () => { + await Promise.all( + backendsToStop.splice(0).map(async (backend) => backend.stop()), + ); +}); describe("StepExecutor", () => { test("executes step and returns result", async () => { @@ -125,11 +138,19 @@ describe("StepExecutor", () => { const handle = await workflow.run(); const worker = client.newWorker(); - await worker.tick(); + const status = await tickUntilStatus( + backend, + worker, + handle.workflowRun.id, + "sleeping", + 200, + 20, + ); const workflowRun = await backend.getWorkflowRun({ workflowRunId: handle.workflowRun.id, }); + expect(status).toBe("sleeping"); expect(workflowRun?.status).toBe("sleeping"); expect(workflowRun?.availableAt).not.toBeNull(); }); @@ -167,6 +188,1232 @@ describe("StepExecutor", () => { const result = await handle.result(); expect(result).toBe(15); }); + + test("invokes a child workflow and returns child output", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-success-${randomUUID()}` }, + ({ input }: { input: { value: number } }) => { + return input.value + 1; + }, + ); + + const parent = client.defineWorkflow<{ value: number }, number>( + { name: `invoke-parent-success-${randomUUID()}` }, + async ({ input, step }) => { + const childResult = await step.invoke("invoke-child", { + workflow: child.workflow, + input: { value: input.value }, + }); + return childResult * 2; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run({ value: 5 }); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe(12); + }); + + test("completes parent immediately when invoked child already finished", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-early-finish-${randomUUID()}` }, + () => { + return { ignored: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-early-finish-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const handle = await parent.run(); + const claimedParent = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParent) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimedParent.id).toBe(handle.workflowRun.id); + + const originalCreateWorkflowRun = backend.createWorkflowRun.bind(backend); + const createWorkflowRunSpy = vi + .spyOn(backend, "createWorkflowRun") + .mockImplementation(async (params) => { + const created = await originalCreateWorkflowRun(params); + + if ( + params.parentStepAttemptNamespaceId !== null && + params.parentStepAttemptId !== null + ) { + const claimedChild = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedChild) { + throw new Error("Expected child workflow run to be claimed"); + } + expect(claimedChild.id).toBe(created.id); + + await backend.completeWorkflowRun({ + workflowRunId: claimedChild.id, + workerId: claimedChild.workerId ?? "", + output: { fast: true }, + }); + } + + return created; + }); + + try { + await executeWorkflow({ + backend, + workflowRun: claimedParent, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParent.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + } finally { + createWorkflowRunSpy.mockRestore(); + } + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfter?.status).toBe("completed"); + expect(parentAfter?.availableAt).toBeNull(); + await expect(handle.result()).resolves.toEqual({ fast: true }); + }); + + test("supports invoke step string name options", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-step-shape-${randomUUID()}` }, + ({ input }: { input: { value: number } }) => { + return input.value + 1; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-step-shape-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + input: { value: 9 }, + }); + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe(10); + }); + + test("supports workflow-name targets, date/number timeouts, and cached invoke replay", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-timeout-shapes-${randomUUID()}` }, + ({ input }: { input: { value: number } }) => { + return input.value + 1; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-timeout-shapes-${randomUUID()}` }, + async ({ step }) => { + const first = await step.invoke("invoke-cached", { + workflow: child.workflow.spec.name, + input: { value: 4 }, + timeout: new Date(Date.now() + 60_000), + }); + const second = await step.invoke("invoke-cached", { + workflow: child.workflow.spec.name, + input: { value: 99 }, + timeout: 60_000, + }); + const numeric = await step.invoke("invoke-number-timeout", { + workflow: child.workflow.spec.name, + input: { value: 8 }, + timeout: 60_000, + }); + const spec = await step.invoke("invoke-spec-target", { + workflow: { name: child.workflow.spec.name }, + input: { value: 1 }, + timeout: 60_000, + }); + return { first, second, numeric, spec }; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ + first: 5, + second: 5, + numeric: 9, + spec: 2, + }); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + expect( + steps.data.filter((stepAttempt) => stepAttempt.kind === "invoke"), + ).toHaveLength(3); + }); + + test("fails invoke when timeout number is invalid", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const parent = client.defineWorkflow( + { name: `invoke-parent-invalid-timeout-number-${randomUUID()}` }, + async ({ step }) => { + await step.invoke("invoke-child", { + workflow: `invoke-child-invalid-timeout-number-${randomUUID()}`, + timeout: -1, + }); + return "never"; + }, + ); + + const worker = client.newWorker(); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 20, + ); + + expect(status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /Invoke timeout must be a non-negative number/, + ); + }); + + test("fails invoke when timeout duration string is invalid", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const parent = client.defineWorkflow( + { name: `invoke-parent-invalid-timeout-duration-${randomUUID()}` }, + async ({ step }) => { + await step.invoke("invoke-child", { + workflow: `invoke-child-invalid-timeout-duration-${randomUUID()}`, + timeout: "not-a-duration" as DurationString, + }); + return "never"; + }, + ); + + const worker = client.newWorker(); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 20, + ); + + expect(status).toBe("failed"); + await expect(handle.result()).rejects.toThrow(/not-a-duration/); + }); + + test("handles invoke replay with non-invoke context shape", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-context-null-${randomUUID()}` }, + () => { + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-context-null-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const originalSetStepAttemptChildWorkflowRun = + backend.setStepAttemptChildWorkflowRun.bind(backend); + const setStepAttemptChildWorkflowRunSpy = vi + .spyOn(backend, "setStepAttemptChildWorkflowRun") + .mockImplementation(async (params) => { + const linked = await originalSetStepAttemptChildWorkflowRun(params); + return { + ...linked, + context: null, + }; + }); + + try { + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ ok: true }); + } finally { + setStepAttemptChildWorkflowRunSpy.mockRestore(); + } + }); + + test("handles invoke replay with legacy null timeout context", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-legacy-timeout-${randomUUID()}` }, + () => { + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-legacy-timeout-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const originalSetStepAttemptChildWorkflowRun = + backend.setStepAttemptChildWorkflowRun.bind(backend); + const setStepAttemptChildWorkflowRunSpy = vi + .spyOn(backend, "setStepAttemptChildWorkflowRun") + .mockImplementation(async (params) => { + const linked = await originalSetStepAttemptChildWorkflowRun(params); + return { + ...linked, + context: { kind: "invoke", timeoutAt: null }, + }; + }); + + try { + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ ok: true }); + } finally { + setStepAttemptChildWorkflowRunSpy.mockRestore(); + } + }); + + test("handles invoke replay with invalid timeout timestamp context", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-invalid-timeout-context-${randomUUID()}` }, + () => { + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-invalid-timeout-context-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + let childRunId: string | null = null; + const originalSetStepAttemptChildWorkflowRun = + backend.setStepAttemptChildWorkflowRun.bind(backend); + const setStepAttemptChildWorkflowRunSpy = vi + .spyOn(backend, "setStepAttemptChildWorkflowRun") + .mockImplementation(async (params) => { + childRunId = params.childWorkflowRunId; + const linked = await originalSetStepAttemptChildWorkflowRun(params); + return { + ...linked, + context: { kind: "invoke", timeoutAt: "not-a-date" }, + }; + }); + + const originalGetWorkflowRun = backend.getWorkflowRun.bind(backend); + const getWorkflowRunSpy = vi + .spyOn(backend, "getWorkflowRun") + .mockImplementation(async (params) => { + const run = await originalGetWorkflowRun(params); + if (!run || !childRunId || params.workflowRunId !== childRunId) { + return run; + } + + return { + ...run, + status: "completed", + output: { ok: true }, + finishedAt: new Date(), + workerId: null, + availableAt: null, + }; + }); + + try { + const handle = await parent.run(); + const claimedParent = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParent) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimedParent.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParent, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParent.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfter?.status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ ok: true }); + } finally { + getWorkflowRunSpy.mockRestore(); + setStepAttemptChildWorkflowRunSpy.mockRestore(); + } + }); + + test("fails invoke when child linkage is missing run id", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-link-missing-id-${randomUUID()}` }, + () => { + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-link-missing-id-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const originalSetStepAttemptChildWorkflowRun = + backend.setStepAttemptChildWorkflowRun.bind(backend); + const setStepAttemptChildWorkflowRunSpy = vi + .spyOn(backend, "setStepAttemptChildWorkflowRun") + .mockImplementation(async (params) => { + const linked = await originalSetStepAttemptChildWorkflowRun(params); + return { + ...linked, + childWorkflowRunNamespaceId: null, + childWorkflowRunId: null, + }; + }); + + try { + const handle = await parent.run(); + const claimedParent = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParent) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimedParent.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParent, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParent.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfter?.status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /could not find linked child workflow run/, + ); + } finally { + setStepAttemptChildWorkflowRunSpy.mockRestore(); + } + }); + + test("fails invoke when linked child run cannot be loaded", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-not-found-${randomUUID()}` }, + () => { + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-child-not-found-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + let childRunId: string | null = null; + const originalSetStepAttemptChildWorkflowRun = + backend.setStepAttemptChildWorkflowRun.bind(backend); + const setStepAttemptChildWorkflowRunSpy = vi + .spyOn(backend, "setStepAttemptChildWorkflowRun") + .mockImplementation(async (params) => { + childRunId = params.childWorkflowRunId; + return await originalSetStepAttemptChildWorkflowRun(params); + }); + + const originalGetWorkflowRun = backend.getWorkflowRun.bind(backend); + const getWorkflowRunSpy = vi + .spyOn(backend, "getWorkflowRun") + .mockImplementation(async (params) => { + if (childRunId && params.workflowRunId === childRunId) { + return null; + } + return await originalGetWorkflowRun(params); + }); + + try { + const handle = await parent.run(); + const claimedParent = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParent) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimedParent.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParent, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParent.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfter?.status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /could not find linked child workflow run/, + ); + } finally { + getWorkflowRunSpy.mockRestore(); + setStepAttemptChildWorkflowRunSpy.mockRestore(); + } + }); + + test("uses fallback child error when failed child run has no error payload", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-failed-null-error-${randomUUID()}` }, + () => { + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-failed-null-error-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + let childRunId: string | null = null; + const originalSetStepAttemptChildWorkflowRun = + backend.setStepAttemptChildWorkflowRun.bind(backend); + const setStepAttemptChildWorkflowRunSpy = vi + .spyOn(backend, "setStepAttemptChildWorkflowRun") + .mockImplementation(async (params) => { + childRunId = params.childWorkflowRunId; + return await originalSetStepAttemptChildWorkflowRun(params); + }); + + const originalGetWorkflowRun = backend.getWorkflowRun.bind(backend); + const getWorkflowRunSpy = vi + .spyOn(backend, "getWorkflowRun") + .mockImplementation(async (params) => { + const run = await originalGetWorkflowRun(params); + if (!run || !childRunId || params.workflowRunId !== childRunId) { + return run; + } + + return { + ...run, + status: "failed", + error: null, + finishedAt: new Date(), + }; + }); + + try { + const handle = await parent.run(); + const claimedParent = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParent) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimedParent.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParent, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParent.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const parentAfter = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfter?.status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /Child workflow run .* failed/, + ); + } finally { + getWorkflowRunSpy.mockRestore(); + setStepAttemptChildWorkflowRunSpy.mockRestore(); + } + }); + + test("surfaces canceled child workflow through parent invoke step", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-canceled-${randomUUID()}` }, + async ({ step }) => { + await step.sleep("wait", "5s"); + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-canceled-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const handle = await parent.run(); + const claimedParentFirstPass = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParentFirstPass) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimedParentFirstPass.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParentFirstPass, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParentFirstPass.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const parentAfterFirstPass = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfterFirstPass?.status).toBe("sleeping"); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const invokeAttempt = attempts.data.find( + (stepAttempt) => stepAttempt.stepName === "invoke-child", + ); + const childRunId = invokeAttempt?.childWorkflowRunId; + if (!childRunId) { + throw new Error("Expected invoke attempt child workflow run id"); + } + + await backend.cancelWorkflowRun({ + workflowRunId: childRunId, + }); + + const claimedParentReplay = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParentReplay) { + throw new Error("Expected parent replay workflow run to be claimed"); + } + expect(claimedParentReplay.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParentReplay, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParentReplay.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const parentAfterReplay = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfterReplay?.status).toBe("failed"); + await expect(handle.result()).rejects.toThrow(/was canceled/); + }, 20_000); + + test("fails invoke when workflow target string is empty", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const parent = client.defineWorkflow( + { name: `invoke-parent-empty-target-${randomUUID()}` }, + async ({ step }) => { + await step.invoke("invoke-child", { + workflow: "", + }); + return "never"; + }, + ); + + const worker = client.newWorker(); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 20, + ); + + expect(status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /Invoke workflow target must be a non-empty string/, + ); + }); + + test("surfaces child failure through parent invoke step", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-failure-${randomUUID()}` }, + async ({ step }) => { + await step.run( + { name: "fail", retryPolicy: { maximumAttempts: 1 } }, + () => { + throw new Error("child boom"); + }, + ); + return "never"; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-failure-${randomUUID()}` }, + async ({ step }) => { + await step.invoke("invoke-child", { + workflow: child.workflow, + input: null, + }); + return "never"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 20, + ); + + expect(status).toBe("failed"); + await expect(handle.result()).rejects.toThrow(/child boom/); + }, 15_000); + + test("invoke timeout fails parent wait but child continues and completes", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-timeout-${randomUUID()}` }, + async ({ step }) => { + await step.sleep("wait", "1600ms"); + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-timeout-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + timeout: "100ms", + }); + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const parentStatus = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + expect(parentStatus).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /Timed out waiting for invoked workflow to complete/, + ); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const invokeAttempt = steps.data.find( + (step) => step.stepName === "invoke-child", + ); + expect(invokeAttempt?.childWorkflowRunId).not.toBeNull(); + expect(invokeAttempt?.childWorkflowRunId).toHaveLength(36); + + const childRunId = invokeAttempt?.childWorkflowRunId; + if (!childRunId) { + throw new Error("Expected invoke attempt child workflow run id"); + } + + const runs = await backend.listWorkflowRuns({ limit: 100 }); + const childrenForInvokeAttempt = runs.data.filter( + (run) => run.parentStepAttemptId === invokeAttempt.id, + ); + expect(childrenForInvokeAttempt).toHaveLength(1); + + const childStatus = await tickUntilStatus( + backend, + worker, + childRunId, + "completed", + 500, + 20, + ); + expect(childStatus).toBe("completed"); + }, 15_000); + + test("invoke timeout still fails when child finishes after timeout before parent replay", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-timeout-order-${randomUUID()}` }, + async ({ step }) => { + await step.sleep("wait", "5s"); + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-timeout-order-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + timeout: "100ms", + }); + }, + ); + + const handle = await parent.run(); + const initialWorkerId = randomUUID(); + const claimedParentFirstPass = await backend.claimWorkflowRun({ + workerId: initialWorkerId, + leaseDurationMs: 5000, + }); + if (!claimedParentFirstPass) { + throw new Error("Expected parent workflow run to be claimed"); + } + expect(claimedParentFirstPass.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParentFirstPass, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParentFirstPass.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const invokeAttempt = steps.data.find( + (step) => step.stepName === "invoke-child", + ); + const childRunId = invokeAttempt?.childWorkflowRunId; + if (!childRunId) { + throw new Error("Expected invoke attempt child workflow run id"); + } + + const parentAfterFirstPass = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfterFirstPass?.status).toBe("sleeping"); + + await sleep(150); + + const claimedChild = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedChild) { + throw new Error("Expected child workflow run to be claimed"); + } + expect(claimedChild.id).toBe(childRunId); + + await backend.completeWorkflowRun({ + workflowRunId: childRunId, + workerId: claimedChild.workerId ?? "", + output: { ok: true }, + }); + + const claimedParentReplay = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }); + if (!claimedParentReplay) { + throw new Error("Expected parent workflow run replay to be claimed"); + } + expect(claimedParentReplay.id).toBe(handle.workflowRun.id); + + await executeWorkflow({ + backend, + workflowRun: claimedParentReplay, + workflowFn: parent.workflow.fn, + workflowVersion: parent.workflow.spec.version ?? null, + workerId: claimedParentReplay.workerId ?? "", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const parentAfterReplay = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(parentAfterReplay?.status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /Timed out waiting for invoked workflow to complete/, + ); + }, 20_000); + + test("invoke wait parks until timeout and does not use poll-loop wake-up events", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-parked-${randomUUID()}` }, + async ({ step }) => { + await step.sleep("wait", "500ms"); + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-parked-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + + const parentSleepingStatus = await tickUntilStatus( + backend, + worker, + handle.workflowRun.id, + "sleeping", + 200, + 20, + ); + expect(parentSleepingStatus).toBe("sleeping"); + + const sleepingParent = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + if (!sleepingParent?.availableAt) { + throw new Error("Expected parent invoke wait availableAt"); + } + + const millisecondsUntilWake = + sleepingParent.availableAt.getTime() - Date.now(); + expect(millisecondsUntilWake).toBeGreaterThan(6 * 24 * 60 * 60 * 1000); + expect(millisecondsUntilWake).toBeLessThan(8 * 24 * 60 * 60 * 1000); + + const parentTerminalStatus = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 500, + 20, + ); + expect(parentTerminalStatus).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ ok: true }); + }, 20_000); + + test("supports parallel invokes via Promise.all", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-parallel-${randomUUID()}` }, + ({ input }: { input: { value: number } }) => { + return input.value * 2; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-parallel-${randomUUID()}` }, + async ({ step }) => { + const [a, b] = await Promise.all([ + step.invoke("invoke-a", { + workflow: child.workflow, + input: { value: 2 }, + }), + step.invoke("invoke-b", { + workflow: child.workflow, + input: { value: 3 }, + }), + ]); + return a + b; + }, + ); + + const worker = client.newWorker({ concurrency: 3 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 300, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe(10); + }); + + test("does not create duplicate child runs while waiting across replays", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-replay-${randomUUID()}` }, + async ({ step }) => { + await step.sleep("wait", "2500ms"); + return { ok: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-replay-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 800, + 20, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ ok: true }); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const invokeAttempt = steps.data.find( + (step) => step.stepName === "invoke-child", + ); + if (!invokeAttempt) { + throw new Error("Expected invoke attempt for step invoke-child"); + } + + const runs = await backend.listWorkflowRuns({ limit: 100 }); + const childrenForInvokeAttempt = runs.data.filter( + (run) => run.parentStepAttemptId === invokeAttempt.id, + ); + expect(childrenForInvokeAttempt).toHaveLength(1); + }, 20_000); + + test("canceling parent while waiting does not cancel child workflow", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const child = client.defineWorkflow( + { name: `invoke-child-cancel-${randomUUID()}` }, + async ({ step }) => { + await step.sleep("wait", "2000ms"); + return { childDone: true }; + }, + ); + + const parent = client.defineWorkflow( + { name: `invoke-parent-cancel-${randomUUID()}` }, + async ({ step }) => { + return await step.invoke("invoke-child", { + workflow: child.workflow, + }); + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await parent.run(); + const parentSleepingStatus = await tickUntilStatus( + backend, + worker, + handle.workflowRun.id, + "sleeping", + 300, + 20, + ); + expect(parentSleepingStatus).toBe("sleeping"); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const invokeAttempt = steps.data.find( + (step) => step.stepName === "invoke-child", + ); + const childRunId = invokeAttempt?.childWorkflowRunId; + if (!childRunId) { + throw new Error("Expected invoke attempt child workflow run id"); + } + + await handle.cancel(); + + const parentStatus = await tickUntilStatus( + backend, + worker, + handle.workflowRun.id, + "canceled", + 300, + 20, + ); + expect(parentStatus).toBe("canceled"); + + const childStatus = await tickUntilStatus( + backend, + worker, + childRunId, + "completed", + 800, + 20, + ); + expect(childStatus).toBe("completed"); + }, 20_000); }); describe("executeWorkflow", () => { @@ -697,6 +1944,8 @@ describe("createStepExecutionStateFromAttempts", () => { expect(state.failedCountsByStepName.get("step-a")).toBe(2); expect(state.failedCountsByStepName.get("step-b")).toBe(1); expect(state.failedCountsByStepName.has("step-c")).toBe(false); + expect(state.runningByStepName.get("step-c")).toBe(running); + expect(state.runningByStepName.has("step-b")).toBe(false); }); test("returns empty cache and counts for empty history", () => { @@ -704,19 +1953,66 @@ describe("createStepExecutionStateFromAttempts", () => { expect(state.cache.size).toBe(0); expect(state.failedCountsByStepName.size).toBe(0); + expect(state.runningByStepName.size).toBe(0); }); }); async function createBackend(): Promise { - return await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { + const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { namespaceId: randomUUID(), }); + backendsToStop.push(backend); + + return backend; } function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } +const TERMINAL_RUN_STATUSES = new Set(["completed", "failed", "canceled"]); + +async function tickUntilTerminal( + backend: BackendPostgres, + worker: ReturnType, + workflowRunId: string, + maxTicks: number, + sleepMs: number, +): Promise { + for (let i = 0; i < maxTicks; i++) { + await worker.tick(); + const run = await backend.getWorkflowRun({ workflowRunId }); + if (run && TERMINAL_RUN_STATUSES.has(run.status)) { + return run.status; + } + await sleep(sleepMs); + } + + throw new Error(`Timed out waiting for workflow run ${workflowRunId}`); +} + +async function tickUntilStatus( + backend: BackendPostgres, + worker: ReturnType, + workflowRunId: string, + expectedStatus: string, + maxTicks: number, + sleepMs: number, +): Promise { + for (let i = 0; i < maxTicks; i++) { + await worker.tick(); + const run = await backend.getWorkflowRun({ workflowRunId }); + if (run?.status === expectedStatus) { + return run.status; + } + await sleep(sleepMs); + } + + throw new Error( + `Timed out waiting for workflow run ${workflowRunId} to reach ${expectedStatus}`, + ); +} + function createMockStepAttempt( overrides: Partial = {}, ): StepAttempt { diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 378f5f15..6a2599e7 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -1,6 +1,6 @@ import type { Backend } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; -import { serializeError } from "../core/error.js"; +import { deserializeError, serializeError } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; import { @@ -9,20 +9,24 @@ import { normalizeStepOutput, calculateDateFromDuration, createSleepContext, + createInvokeContext, } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate, DEFAULT_WORKFLOW_RETRY_POLICY, type RetryPolicy, + type Workflow, + type WorkflowSpec, } from "../core/workflow-definition.js"; import type { + InvokeStepConfig, StepApi, StepFunction, StepFunctionConfig, WorkflowFunction, WorkflowRunMetadata, } from "../core/workflow-function.js"; -import type { WorkflowRun } from "../core/workflow-run.js"; +import { isTerminalStatus, type WorkflowRun } from "../core/workflow-run.js"; /** * Signal thrown when a workflow needs to sleep. Contains the time when the @@ -65,9 +69,7 @@ class StepError extends Error { } } -/** - * Retry defaults for step failures. - */ +/** Default retry policy for step failures. */ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { initialInterval: "1s", backoffCoefficient: 2, @@ -75,6 +77,15 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { maximumAttempts: 10, }; +/** + * Retry policy for invoke step failures (no retries - the child workflow + * is responsible for retries). + */ +const INVOKE_FAILURE_RETRY_POLICY: RetryPolicy = { + ...DEFAULT_STEP_RETRY_POLICY, + maximumAttempts: 1, +}; + /** * Resolve a partial step retry policy by merging it with step defaults. * @param partial - Optional partial retry policy @@ -91,6 +102,7 @@ function resolveStepRetryPolicy(partial?: Partial): RetryPolicy { export interface StepExecutionState { cache: StepAttemptCache; failedCountsByStepName: ReadonlyMap; + runningByStepName: ReadonlyMap; } /** @@ -103,6 +115,7 @@ export function createStepExecutionStateFromAttempts( ): StepExecutionState { const cache = new Map(); const failedCountsByStepName = new Map(); + const runningByStepName = new Map(); for (const attempt of attempts) { if (attempt.status === "completed" || attempt.status === "succeeded") { @@ -113,15 +126,130 @@ export function createStepExecutionStateFromAttempts( if (attempt.status === "failed") { const previousCount = failedCountsByStepName.get(attempt.stepName) ?? 0; failedCountsByStepName.set(attempt.stepName, previousCount + 1); + continue; } + + runningByStepName.set(attempt.stepName, attempt); } return { cache, failedCountsByStepName, + runningByStepName, }; } +/** + * Resolve invoke timeout input to an absolute deadline. + * @param timeout - Relative/absolute timeout input + * @returns Absolute timeout deadline + * @throws {Error} When timeout is invalid + */ +function resolveInvokeTimeoutAt( + timeout: number | string | Date | undefined, +): Date { + if (timeout === undefined) { + return defaultInvokeTimeoutAt(); + } + + if (timeout instanceof Date) { + return timeout; + } + + if (typeof timeout === "number") { + if (!Number.isFinite(timeout) || timeout < 0) { + throw new Error("Invoke timeout must be a non-negative number"); + } + return new Date(Date.now() + timeout); + } + + const result = calculateDateFromDuration(timeout as DurationString); + if (!result.ok) { + throw result.error; + } + return result.value; +} + +/** + * Default invoke timeout: 7 days from a base time. + * @param base - Base timestamp (defaults to now) + * @returns Timeout deadline + */ +function defaultInvokeTimeoutAt(base: Readonly = new Date()): Date { + const timeoutAt = new Date(base); + timeoutAt.setDate(timeoutAt.getDate() + 7); + return timeoutAt; +} + +/** + * Extract the invoke timeout from a persisted step attempt's context. + * @param attempt - Running invoke step attempt + * @returns Timeout deadline, or null when context is not invoke + */ +function getInvokeTimeoutAt(attempt: Readonly): Date | null { + if (attempt.context?.kind !== "invoke") { + return null; + } + + if (attempt.context.timeoutAt === null) { + // Backward compatibility for previously persisted invoke contexts. + return defaultInvokeTimeoutAt(attempt.createdAt); + } + + return new Date(attempt.context.timeoutAt); +} + +/** + * Determine whether the invoke timeout has elapsed before the child completed. + * @param attempt - Running invoke step attempt + * @param childRun - Linked child workflow run + * @returns True when timeout elapsed before child terminal completion + */ +function hasInvokeTimedOut( + attempt: Readonly, + childRun: Readonly, +): boolean { + const timeoutAt = getInvokeTimeoutAt(attempt); + if (!timeoutAt) return false; + + const timeoutMs = timeoutAt.getTime(); + if (!Number.isFinite(timeoutMs)) return false; + if (Date.now() < timeoutMs) return false; + + if (isTerminalStatus(childRun.status) && childRun.finishedAt) { + return childRun.finishedAt.getTime() > timeoutMs; + } + + return true; +} + +/** + * Normalize a workflow target (string | WorkflowSpec | Workflow) to a + * WorkflowSpec. + * @param workflow - Workflow target reference + * @returns WorkflowSpec for child run creation + */ +function toWorkflowSpec( + workflow: + | WorkflowSpec + | Workflow + | string, +): WorkflowSpec { + if (typeof workflow === "string") { + return { name: workflow }; + } + return "spec" in workflow ? workflow.spec : workflow; +} + +/** + * Build deterministic idempotency key for child workflow invocation. + * @param attempt - Parent invoke step attempt + * @returns Stable idempotency key + */ +function buildInvokeIdempotencyKey(attempt: Readonly): string { + return `__invoke:${attempt.namespaceId}:${attempt.id}`; +} + /** * Configures the options for a StepExecutor. */ @@ -142,6 +270,7 @@ class StepExecutor implements StepApi { private readonly workerId: string; private cache: StepAttemptCache; private readonly failedCountsByStepName: Map; + private readonly runningByStepName: Map; constructor(options: Readonly) { this.backend = options.backend; @@ -151,8 +280,11 @@ class StepExecutor implements StepApi { const state = createStepExecutionStateFromAttempts(options.attempts); this.cache = state.cache; this.failedCountsByStepName = new Map(state.failedCountsByStepName); + this.runningByStepName = new Map(state.runningByStepName); } + // ---- step.run ----------------------------------------------------------- + async run( config: Readonly, fn: StepFunction, @@ -174,6 +306,7 @@ class StepExecutor implements StepApi { config: {}, context: null, }); + this.runningByStepName.set(name, attempt); try { // execute step function @@ -190,10 +323,12 @@ class StepExecutor implements StepApi { // cache result this.cache = addToStepAttemptCache(this.cache, savedAttempt); + this.runningByStepName.delete(name); return savedAttempt.output as Output; } catch (error) { // mark failure + this.runningByStepName.delete(name); await this.backend.failStepAttempt({ workflowRunId: this.workflowRunId, stepAttemptId: attempt.id, @@ -201,8 +336,8 @@ class StepExecutor implements StepApi { error: serializeError(error), }); - const previousFailedAttempts = this.failedCountsByStepName.get(name) ?? 0; - const stepFailedAttempts = previousFailedAttempts + 1; + const stepFailedAttempts = + (this.failedCountsByStepName.get(name) ?? 0) + 1; this.failedCountsByStepName.set(name, stepFailedAttempts); throw new StepError({ @@ -214,6 +349,8 @@ class StepExecutor implements StepApi { } } + // ---- step.sleep --------------------------------------------------------- + async sleep(name: string, duration: DurationString): Promise { // return cached result if this sleep already completed const existingAttempt = getCachedStepAttempt(this.cache, name); @@ -241,6 +378,233 @@ class StepExecutor implements StepApi { // when the workflow resumes throw new SleepSignal(resumeAt); } + + // ---- step.invoke -------------------------------------------------------- + + async invoke( + stepName: string, + opts: Readonly>, + ): Promise { + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) { + return existingAttempt.output as Output; + } + + // Resume a running invoke attempt (replay path) + const runningAttempt = this.runningByStepName.get(stepName); + if (runningAttempt?.kind === "invoke") { + return await this.resolveRunningInvoke(stepName, runningAttempt, opts); + } + + // First encounter — create the invoke step and child workflow run + const timeoutAt = resolveInvokeTimeoutAt(opts.timeout); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "invoke", + config: {}, + context: createInvokeContext(timeoutAt), + }); + this.runningByStepName.set(stepName, attempt); + + const linkedAttempt = await this.linkChildWorkflowRun( + stepName, + attempt, + opts, + ).catch( + async (error: unknown) => + await this.failStepWithError( + stepName, + attempt.id, + error, + INVOKE_FAILURE_RETRY_POLICY, + ), + ); + + return await this.resolveRunningInvoke(stepName, linkedAttempt, opts); + } + + /** + * Resolve a running invoke attempt — check child status and either complete, + * fail, or go back to sleep. + * @param stepName - Invoke step name + * @param runningAttempt - Previously created invoke step attempt + * @param opts - Invoke step configuration + * @returns The child workflow output when available + */ + private async resolveRunningInvoke( + stepName: string, + runningAttempt: Readonly, + opts: Readonly>, + ): Promise { + // Ensure the invoke attempt has a linked child (may need to create one if + // a previous attempt crashed before linking) + const invokeAttempt = + runningAttempt.childWorkflowRunId && + runningAttempt.childWorkflowRunNamespaceId + ? runningAttempt + : await this.linkChildWorkflowRun(stepName, runningAttempt, opts); + + const childId = invokeAttempt.childWorkflowRunId; + if (!childId) { + return await this.failStepWithError( + stepName, + invokeAttempt.id, + new Error( + `Invoke step "${stepName}" could not find linked child workflow run`, + ), + INVOKE_FAILURE_RETRY_POLICY, + ); + } + + const childRun = await this.backend.getWorkflowRun({ + workflowRunId: childId, + }); + if (!childRun) { + return await this.failStepWithError( + stepName, + invokeAttempt.id, + new Error( + `Invoke step "${stepName}" could not find linked child workflow run "${childId}"`, + ), + INVOKE_FAILURE_RETRY_POLICY, + ); + } + + // Check timeout before checking child result + if (hasInvokeTimedOut(invokeAttempt, childRun)) { + return await this.failStepWithError( + stepName, + invokeAttempt.id, + new Error("Timed out waiting for invoked workflow to complete"), + INVOKE_FAILURE_RETRY_POLICY, + ); + } + + // Child completed successfully — propagate result + if (childRun.status === "completed" || childRun.status === "succeeded") { + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: invokeAttempt.id, + workerId: this.workerId, + output: childRun.output, + }); + this.runningByStepName.delete(stepName); + this.cache = addToStepAttemptCache(this.cache, completed); + return completed.output as Output; + } + + // Child failed — propagate its error + if (childRun.status === "failed") { + const childError = + childRun.error === null + ? new Error(`Child workflow run "${childRun.id}" failed`) + : deserializeError(childRun.error); + return await this.failStepWithError( + stepName, + invokeAttempt.id, + childError, + INVOKE_FAILURE_RETRY_POLICY, + ); + } + + // Child canceled — propagate as error + if (childRun.status === "canceled") { + return await this.failStepWithError( + stepName, + invokeAttempt.id, + new Error( + `Invoke step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`, + ), + INVOKE_FAILURE_RETRY_POLICY, + ); + } + + // Child still running — sleep until timeout + const timeoutAt = getInvokeTimeoutAt(invokeAttempt); + throw new SleepSignal( + timeoutAt ?? defaultInvokeTimeoutAt(invokeAttempt.createdAt), + ); + } + + /** + * Create (or dedupe) the child workflow run and persist the linkage on the + * parent invoke step attempt. + * @param stepName - Parent invoke step name + * @param attempt - Parent invoke step attempt + * @param opts - Invoke step configuration + * @returns Updated step attempt with child linkage + */ + private async linkChildWorkflowRun( + stepName: string, + attempt: Readonly, + opts: Readonly>, + ): Promise { + const workflow = opts.workflow; + if (typeof workflow === "string" && workflow.length === 0) { + throw new Error("Invoke workflow target must be a non-empty string"); + } + + const spec = toWorkflowSpec(workflow); + const childRun = await this.backend.createWorkflowRun({ + workflowName: spec.name, + version: spec.version ?? null, + idempotencyKey: buildInvokeIdempotencyKey(attempt), + config: {}, + context: null, + input: normalizeStepOutput(opts.input), + parentStepAttemptNamespaceId: attempt.namespaceId, + parentStepAttemptId: attempt.id, + availableAt: null, + deadlineAt: null, + }); + + const linked = await this.backend.setStepAttemptChildWorkflowRun({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + childWorkflowRunNamespaceId: childRun.namespaceId, + childWorkflowRunId: childRun.id, + }); + this.runningByStepName.set(stepName, linked); + + return linked; + } + + /** + * Record a step failure, update the failed-attempt counter, and throw a + * StepError. Shared by both `step.run` failures and invoke failures. + * @param stepName - Step name + * @param stepAttemptId - Step attempt id + * @param error - Error that caused the failure + * @param retryPolicy - Retry policy for this failure + */ + private async failStepWithError( + stepName: string, + stepAttemptId: string, + error: unknown, + retryPolicy: RetryPolicy, + ): Promise { + this.runningByStepName.delete(stepName); + await this.backend.failStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId, + workerId: this.workerId, + error: serializeError(error), + }); + + const stepFailedAttempts = + (this.failedCountsByStepName.get(stepName) ?? 0) + 1; + this.failedCountsByStepName.set(stepName, stepFailedAttempts); + + throw new StepError({ + stepName, + stepFailedAttempts, + retryPolicy, + error, + }); + } } /** diff --git a/packages/openworkflow/worker/worker.test.ts b/packages/openworkflow/worker/worker.test.ts index 6db07c4f..5b3de723 100644 --- a/packages/openworkflow/worker/worker.test.ts +++ b/packages/openworkflow/worker/worker.test.ts @@ -87,6 +87,8 @@ describe("Worker", () => { config: {}, context: null, input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -1183,6 +1185,8 @@ describe("Worker", () => { config: {}, context: null, input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -1218,6 +1222,8 @@ describe("Worker", () => { config: {}, context: null, input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); @@ -1253,6 +1259,8 @@ describe("Worker", () => { config: {}, context: null, input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, availableAt: null, deadlineAt: null, }); From d85c439c23ea5526afa62e47806b0b595bb18b57 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Wed, 25 Feb 2026 14:54:58 -0600 Subject: [PATCH 2/2] refactor: rename step.invoke to step.invokeWorkflow --- ARCHITECTURE.md | 10 ++-- openworkflow/hello-world-parent.ts | 2 +- packages/docs/docs/roadmap.mdx | 2 +- packages/docs/docs/steps.mdx | 4 +- packages/docs/docs/workflows.mdx | 28 +++++------ .../openworkflow/core/workflow-function.ts | 8 +-- .../openworkflow/worker/execution.test.ts | 50 +++++++++---------- packages/openworkflow/worker/execution.ts | 4 +- 8 files changed, 54 insertions(+), 54 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 17c2a82c..b1f6eed2 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -51,8 +51,8 @@ A workflow run can be in one of the following states: to claim it. - **`running`**: The workflow run is actively being executed by a worker. - **`sleeping`**: The workflow run is waiting for a duration to elapse - (`step.sleep`) or waiting for a child workflow result (`step.invoke`). The - `availableAt` timestamp controls when it becomes available again. + (`step.sleep`) or waiting for a child workflow result (`step.invokeWorkflow`). + The `availableAt` timestamp controls when it becomes available again. - **`completed`**: The workflow run has completed successfully. - **`failed`**: The workflow run has failed after exhausting retries or deadline reached. @@ -220,9 +220,9 @@ worker slot for other work - it's not a blocking sleep but a durable pause. await step.sleep("wait-one-hour", "1h"); ``` -**`step.invoke(name, options)`**: Starts a child workflow and waits for it -durably. When the timeout is reached (default 7d), the parent step fails but the -child workflow continues running independently. +**`step.invokeWorkflow(name, options)`**: Starts a child workflow and waits for +it durably. When the timeout is reached (default 7d), the parent step fails but +the child workflow continues running independently. ## 4. Error Handling & Retries diff --git a/openworkflow/hello-world-parent.ts b/openworkflow/hello-world-parent.ts index d6910cf7..25f66087 100644 --- a/openworkflow/hello-world-parent.ts +++ b/openworkflow/hello-world-parent.ts @@ -9,7 +9,7 @@ export const helloWorldParent = defineWorkflow( async ({ step, run }) => { console.log(`[run ${run.id}]`); - const childResult = await step.invoke("hello-world-child", { + const childResult = await step.invokeWorkflow("hello-world-child", { workflow: helloWorld, }); diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx index dcf1ed4e..354822fa 100644 --- a/packages/docs/docs/roadmap.mdx +++ b/packages/docs/docs/roadmap.mdx @@ -19,7 +19,7 @@ description: What's coming next for OpenWorkflow - ✅ Configurable retry policies - ✅ Idempotency keys - ✅ Prometheus `/metrics` endpoint -- ✅ Child workflows (`step.invoke`) +- ✅ Child workflows (`step.invokeWorkflow`) ## Coming Soon diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 65453380..348ef25b 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -138,12 +138,12 @@ Pauses the workflow until a specified duration has elapsed. See await step.sleep("wait-one-hour", "1h"); ``` -### `step.invoke()` +### `step.invokeWorkflow()` Starts a child workflow and waits for its result durably: ```ts -const childOutput = await step.invoke("generate-report", { +const childOutput = await step.invokeWorkflow("generate-report", { workflow: generateReportWorkflow, input: { reportId: input.reportId }, timeout: "5m", // optional, defaults to 7 days diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index d866db2e..12a913a6 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -213,12 +213,12 @@ create a separate run. The workflow function receives an object with four properties: -| Parameter | Type | Description | -| --------- | --------------------- | ---------------------------------------------------------------- | -| `input` | Generic | The input data passed when starting the workflow | -| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.invoke`) | -| `version` | `string \| null` | The workflow version, if specified | -| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | +| Parameter | Type | Description | +| --------- | --------------------- | ------------------------------------------------------------------------ | +| `input` | Generic | The input data passed when starting the workflow | +| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.invokeWorkflow`) | +| `version` | `string \| null` | The workflow version, if specified | +| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | ```ts defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { @@ -236,14 +236,14 @@ defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { A workflow run progresses through these states: -| Status | Description | -| ----------- | ------------------------------------------------------ | -| `pending` | Created and waiting for a worker to claim it | -| `running` | Actively being executed by a worker | -| `sleeping` | Paused while waiting for `step.sleep` or `step.invoke` | -| `completed` | Finished successfully | -| `failed` | Failed after exhausting retries or deadline reached | -| `canceled` | Explicitly canceled and will not continue | +| Status | Description | +| ----------- | -------------------------------------------------------------- | +| `pending` | Created and waiting for a worker to claim it | +| `running` | Actively being executed by a worker | +| `sleeping` | Paused while waiting for `step.sleep` or `step.invokeWorkflow` | +| `completed` | Finished successfully | +| `failed` | Failed after exhausting retries or deadline reached | +| `canceled` | Explicitly canceled and will not continue | ## Determinism diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index 744e1b23..79be0a65 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -31,7 +31,7 @@ export type StepFunction = () => | undefined; /** - * Target workflow reference for `step.invoke`. + * Target workflow reference for `step.invokeWorkflow`. */ type InvokeWorkflowTarget = | WorkflowSpec @@ -39,7 +39,7 @@ type InvokeWorkflowTarget = | string; /** - * Config for invoking a child workflow from `step.invoke()`. + * Config for invoking a child workflow from `step.invokeWorkflow()`. */ export interface InvokeStepConfig< Input = unknown, @@ -54,7 +54,7 @@ export interface InvokeStepConfig< /** * Represents the API for defining steps within a workflow. Used within a * workflow handler to define steps by calling `step.run()`, `step.sleep()`, - * and `step.invoke()`. + * and `step.invokeWorkflow()`. */ export interface StepApi { run: ( @@ -62,7 +62,7 @@ export interface StepApi { fn: StepFunction, ) => Promise; sleep: (name: string, duration: DurationString) => Promise; - invoke: ( + invokeWorkflow: ( name: string, opts: Readonly>, ) => Promise; diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 85948a18..357e28bc 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -203,7 +203,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow<{ value: number }, number>( { name: `invoke-parent-success-${randomUUID()}` }, async ({ input, step }) => { - const childResult = await step.invoke("invoke-child", { + const childResult = await step.invokeWorkflow("invoke-child", { workflow: child.workflow, input: { value: input.value }, }); @@ -239,7 +239,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-early-finish-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -319,7 +319,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-step-shape-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, input: { value: 9 }, }); @@ -354,22 +354,22 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-timeout-shapes-${randomUUID()}` }, async ({ step }) => { - const first = await step.invoke("invoke-cached", { + const first = await step.invokeWorkflow("invoke-cached", { workflow: child.workflow.spec.name, input: { value: 4 }, timeout: new Date(Date.now() + 60_000), }); - const second = await step.invoke("invoke-cached", { + const second = await step.invokeWorkflow("invoke-cached", { workflow: child.workflow.spec.name, input: { value: 99 }, timeout: 60_000, }); - const numeric = await step.invoke("invoke-number-timeout", { + const numeric = await step.invokeWorkflow("invoke-number-timeout", { workflow: child.workflow.spec.name, input: { value: 8 }, timeout: 60_000, }); - const spec = await step.invoke("invoke-spec-target", { + const spec = await step.invokeWorkflow("invoke-spec-target", { workflow: { name: child.workflow.spec.name }, input: { value: 1 }, timeout: 60_000, @@ -412,7 +412,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-invalid-timeout-number-${randomUUID()}` }, async ({ step }) => { - await step.invoke("invoke-child", { + await step.invokeWorkflow("invoke-child", { workflow: `invoke-child-invalid-timeout-number-${randomUUID()}`, timeout: -1, }); @@ -443,7 +443,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-invalid-timeout-duration-${randomUUID()}` }, async ({ step }) => { - await step.invoke("invoke-child", { + await step.invokeWorkflow("invoke-child", { workflow: `invoke-child-invalid-timeout-duration-${randomUUID()}`, timeout: "not-a-duration" as DurationString, }); @@ -479,7 +479,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-context-null-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -529,7 +529,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-legacy-timeout-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -579,7 +579,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-invalid-timeout-context-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -663,7 +663,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-link-missing-id-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -728,7 +728,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-child-not-found-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -801,7 +801,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-failed-null-error-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -882,7 +882,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-canceled-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -960,7 +960,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-empty-target-${randomUUID()}` }, async ({ step }) => { - await step.invoke("invoke-child", { + await step.invokeWorkflow("invoke-child", { workflow: "", }); return "never"; @@ -1003,7 +1003,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-failure-${randomUUID()}` }, async ({ step }) => { - await step.invoke("invoke-child", { + await step.invokeWorkflow("invoke-child", { workflow: child.workflow, input: null, }); @@ -1040,7 +1040,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-timeout-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, timeout: "100ms", }); @@ -1108,7 +1108,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-timeout-order-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, timeout: "100ms", }); @@ -1211,7 +1211,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-parked-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -1268,11 +1268,11 @@ describe("StepExecutor", () => { { name: `invoke-parent-parallel-${randomUUID()}` }, async ({ step }) => { const [a, b] = await Promise.all([ - step.invoke("invoke-a", { + step.invokeWorkflow("invoke-a", { workflow: child.workflow, input: { value: 2 }, }), - step.invoke("invoke-b", { + step.invokeWorkflow("invoke-b", { workflow: child.workflow, input: { value: 3 }, }), @@ -1310,7 +1310,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-replay-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, @@ -1362,7 +1362,7 @@ describe("StepExecutor", () => { const parent = client.defineWorkflow( { name: `invoke-parent-cancel-${randomUUID()}` }, async ({ step }) => { - return await step.invoke("invoke-child", { + return await step.invokeWorkflow("invoke-child", { workflow: child.workflow, }); }, diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 6a2599e7..9ff55d2b 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -379,9 +379,9 @@ class StepExecutor implements StepApi { throw new SleepSignal(resumeAt); } - // ---- step.invoke -------------------------------------------------------- + // ---- step.invokeWorkflow ----------------------------------------------- - async invoke( + async invokeWorkflow( stepName: string, opts: Readonly>, ): Promise {