diff --git a/examples/signals/index.ts b/examples/signals/index.ts new file mode 100644 index 00000000..ae353f51 --- /dev/null +++ b/examples/signals/index.ts @@ -0,0 +1,157 @@ +import { BackendPostgres } from "@openworkflow/backend-postgres"; +import { randomUUID } from "node:crypto"; +import { OpenWorkflow, SignalTimeoutError } from "openworkflow"; + +const databaseUrl = "postgresql://postgres:postgres@localhost:5432/postgres"; +const backend = await BackendPostgres.connect(databaseUrl, { + namespaceId: randomUUID(), +}); +const ow = new OpenWorkflow({ backend }); + +interface ApprovalRequest { + documentId: string; + requestedBy: string; +} + +interface ApprovalSignal { + approved: boolean; + reviewedBy: string; + comment?: string; +} + +interface ApprovalResult { + documentId: string; + status: "approved" | "rejected" | "timed-out"; + reviewedBy?: string | undefined; + comment?: string | undefined; +} + +/** + * An approval workflow that pauses and waits for an external signal before + * continuing. Demonstrates step.waitForSignal() with an optional timeout. 
+ */ +const approvalWorkflow = ow.defineWorkflow( + { name: "approval-workflow" }, + async ({ input, step }) => { + // Simulate sending a notification to a reviewer + await step.run({ name: "send-notification" }, () => { + console.log( + `Notification sent to reviewer for document "${input.documentId}" (requested by ${input.requestedBy})`, + ); + }); + + // Pause and wait for an external approval signal (timeout after 10 seconds + // for demo purposes; in production this would be hours or days) + let approval: ApprovalSignal; + try { + approval = await step.waitForSignal("approval-decision", { + timeout: "10s", + }); + } catch (error) { + if (error instanceof SignalTimeoutError) { + console.log("No approval received within timeout — auto-rejecting."); + return { + documentId: input.documentId, + status: "timed-out", + }; + } + throw error; + } + + // Continue processing based on the signal payload + const result = await step.run({ name: "process-decision" }, () => { + const status = approval.approved ? "approved" : "rejected"; + console.log( + `Document "${input.documentId}" ${status} by ${approval.reviewedBy}` + + (approval.comment ? 
`: "${approval.comment}"` : ""), + ); + return { + documentId: input.documentId, + status: status as ApprovalResult["status"], + reviewedBy: approval.reviewedBy, + comment: approval.comment, + }; + }); + + return result; + }, +); + +async function main() { + const worker = ow.newWorker({ concurrency: 2 }); + await worker.start(); + console.log("Worker started.\n"); + + // --- Demo 1: signal arrives in time --- + console.log("=== Demo 1: Signal arrives in time ==="); + const handle1 = await approvalWorkflow.run({ + documentId: "doc-001", + requestedBy: "alice", + }); + console.log(`Workflow started: ${handle1.workflowRun.id}`); + + // Wait a moment for the workflow to reach the waitForSignal step + await sleep(500); + + // Send the approval signal from outside the workflow + const signalResult = await handle1.sendSignal("approval-decision", { + approved: true, + reviewedBy: "bob", + comment: "Looks good!", + } satisfies ApprovalSignal); + console.log(`Signal delivered: ${JSON.stringify(signalResult)}`); + + const result1 = await handle1.result(); + console.log(`Result: ${JSON.stringify(result1)}\n`); + + // --- Demo 2: timeout elapses before signal arrives --- + console.log("=== Demo 2: Timeout (no signal sent) ==="); + const handle2 = await approvalWorkflow.run({ + documentId: "doc-002", + requestedBy: "charlie", + }); + console.log(`Workflow started: ${handle2.workflowRun.id}`); + console.log("Waiting for timeout (10s)..."); + + const result2 = await handle2.result({ timeoutMs: 30_000 }); + console.log(`Result: ${JSON.stringify(result2)}\n`); + + // --- Demo 3: sendSignal via ow.sendSignal() with a run ID --- + console.log("=== Demo 3: sendSignal via ow.sendSignal() ==="); + const handle3 = await approvalWorkflow.run({ + documentId: "doc-003", + requestedBy: "diana", + }); + console.log(`Workflow started: ${handle3.workflowRun.id}`); + + await sleep(500); + + // Use the top-level ow.sendSignal() instead of handle.sendSignal() + const signalResult3 = await 
ow.sendSignal( + handle3.workflowRun.id, + "approval-decision", + { + approved: false, + reviewedBy: "eve", + comment: "Needs revision.", + } satisfies ApprovalSignal, + ); + console.log(`Signal delivered: ${JSON.stringify(signalResult3)}`); + + const result3 = await handle3.result(); + console.log(`Result: ${JSON.stringify(result3)}\n`); + + console.log("Stopping worker..."); + await worker.stop(); + await backend.stop(); + console.log("Done."); +} + +await main().catch((error: unknown) => { + console.error(error); + process.exitCode = 1; +}); + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/examples/signals/package.json b/examples/signals/package.json new file mode 100644 index 00000000..d296678e --- /dev/null +++ b/examples/signals/package.json @@ -0,0 +1,15 @@ +{ + "name": "example-signals", + "private": true, + "type": "module", + "scripts": { + "start": "tsx index.ts" + }, + "dependencies": { + "@openworkflow/backend-postgres": "*", + "openworkflow": "*" + }, + "devDependencies": { + "tsx": "^4.21.0" + } +} diff --git a/examples/signals/tsconfig.json b/examples/signals/tsconfig.json new file mode 100644 index 00000000..1958d50c --- /dev/null +++ b/examples/signals/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": ["../../tsconfig.base.json"], + "compilerOptions": { + "outDir": "dist" + }, + "include": ["**/*.ts"], + "exclude": ["dist"] +} diff --git a/package-lock.json b/package-lock.json index e671598a..099b4a82 100644 --- a/package-lock.json +++ b/package-lock.json @@ -81,6 +81,16 @@ "tsx": "^4.21.0" } }, + "examples/signals": { + "name": "example-signals", + "dependencies": { + "@openworkflow/backend-postgres": "*", + "openworkflow": "*" + }, + "devDependencies": { + "tsx": "^4.21.0" + } + }, "examples/with-schema-validation": { "name": "example-with-zod-schema", "dependencies": { @@ -9832,6 +9842,10 @@ "resolved": "examples/declare-workflow", "link": true }, + "node_modules/example-signals": { + 
"resolved": "examples/signals", + "link": true + }, "node_modules/example-with-zod-schema": { "resolved": "examples/with-schema-validation", "link": true diff --git a/packages/docs/docs.json b/packages/docs/docs.json index 13eb3f90..843a9733 100644 --- a/packages/docs/docs.json +++ b/packages/docs/docs.json @@ -39,6 +39,7 @@ "group": "Guides", "pages": [ "docs/sleeping", + "docs/signals", "docs/parallel-steps", "docs/dynamic-steps", "docs/child-workflows", diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx new file mode 100644 index 00000000..2178e103 --- /dev/null +++ b/packages/docs/docs/signals.mdx @@ -0,0 +1,308 @@ +--- +title: Signals +description: Pause workflows and resume them with external data +--- + +Sometimes a workflow needs to stop and wait for something that happens outside +the process — a human approving a request, a webhook arriving, or another +service completing work. `step.waitForSignal()` pauses a workflow durably until +an external caller delivers a named signal. + +While the workflow is waiting, no worker slot is held. The run sits in `running` +state with `workerId = null` until a signal arrives, at which point it's woken +up and resumed with the signal's payload as the return value. + +## Basic Usage + +```ts +import { OpenWorkflow, SignalTimeoutError } from "openworkflow"; + +const ow = new OpenWorkflow({ backend }); + +interface ApprovalSignal { + approved: boolean; + comment?: string; +} + +const approvalWorkflow = ow.defineWorkflow( + { name: "approval-workflow" }, + async ({ input, step }) => { + await step.run({ name: "notify-reviewer" }, async () => { + await email.send({ to: input.reviewerEmail, subject: "Approval needed" }); + }); + + // Pause until someone sends the "approval-decision" signal + const decision = + await step.waitForSignal("approval-decision"); + + return decision.approved ? 
"approved" : "rejected"; + }, +); +``` + +## Sending a Signal + +Send a signal using the workflow run handle or the top-level client: + +```ts +// From the run handle +const handle = await approvalWorkflow.run({ reviewerEmail: "bob@example.com" }); +await handle.sendSignal("approval-decision", { approved: true }); + +// From the top-level client, using the run ID +await ow.sendSignal(handle.workflowRun.id, "approval-decision", { + approved: true, +}); +``` + +`sendSignal` returns a result indicating whether the signal was delivered: + +```ts +const result = await handle.sendSignal("approval-decision", { approved: true }); + +if (result.delivered) { + console.log("Signal delivered"); +} else { + console.log("Not delivered:", result.reason); + // reason: "workflow_not_found" | "signal_not_waiting" +} +``` + +## How Signals Work + +When a workflow encounters `step.waitForSignal()`: + +1. A signal step attempt is created with `status = running` +2. The workflow is durably parked with `workerId = null` +3. The worker slot is freed for other work + +When `sendSignal()` is called: + +1. The signal payload is written to the step attempt +2. The workflow run is woken up (`availableAt` set to now) +3. A worker claims the run and replays the workflow +4. `waitForSignal()` returns the payload and execution continues + +## Timeout + +By default, a workflow waits indefinitely for a signal (bounded by the +workflow's own deadline). 
Pass `timeout` to fail the step if no signal +arrives in time: + +```ts +const decision = await step.waitForSignal("approval-decision", { + timeout: "7d", +}); +``` + +When the timeout elapses, `waitForSignal()` throws a `SignalTimeoutError` that +you can catch inside the workflow: + +```ts +import { SignalTimeoutError } from "openworkflow"; + +const approvalWorkflow = ow.defineWorkflow( + { name: "approval-workflow" }, + async ({ input, step }) => { + try { + const decision = await step.waitForSignal( + "approval-decision", + { timeout: "7d" }, + ); + return decision.approved ? "approved" : "rejected"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + return "timed-out"; + } + throw error; + } + }, +); +``` + +`timeout` accepts a [duration string](/docs/sleeping#duration-formats), a +number of milliseconds, or a `Date`: + +```ts +// Duration string +await step.waitForSignal("sig", { timeout: "24h" }); + +// Milliseconds +await step.waitForSignal("sig", { timeout: 60_000 }); + +// Absolute deadline +await step.waitForSignal("sig", { timeout: new Date("2026-12-31") }); +``` + +## Signal Names + +Each signal name must match exactly between `waitForSignal()` and +`sendSignal()`. Names follow the same rules as step names — they're deduplicated +automatically if you use the same name more than once in a workflow: + +```ts +// First call uses "approval" +const first = await step.waitForSignal("approval"); + +// Second call becomes "approval:1" +const second = await step.waitForSignal("approval"); +``` + + + Signal names are case-sensitive. `"Approval"` and `"approval"` are different + signals. + + +## Payload + +Signals carry a typed JSON payload. 
Use a type parameter on `waitForSignal` to +type the resolved value: + +```ts +interface PaymentConfirmed { + transactionId: string; + amount: number; +} + +const payment = await step.waitForSignal<PaymentConfirmed>("payment-confirmed"); +console.log(payment.transactionId); +``` + +Signals with no payload send `null` and are still treated as delivered: + +```ts +// Sender +await handle.sendSignal("proceed"); + +// Workflow +const payload = await step.waitForSignal("proceed"); // null +``` + +## Common Patterns + +### Human-in-the-Loop Approval + +```ts +const reviewWorkflow = ow.defineWorkflow( + { name: "content-review" }, + async ({ input, step }) => { + await step.run({ name: "submit-for-review" }, async () => { + await db.reviews.create({ + contentId: input.contentId, + status: "pending", + }); + await email.send({ to: "editor@example.com", subject: "Review needed" }); + }); + + try { + const decision = await step.waitForSignal<{ approved: boolean }>( + "review-decision", + { timeout: "3d" }, + ); + + await step.run({ name: "apply-decision" }, async () => { + await db.content.update(input.contentId, { + status: decision.approved ? "published" : "rejected", + }); + }); + + return decision.approved ? 
"published" : "rejected"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + await step.run({ name: "escalate" }, async () => { + await email.send({ + to: "manager@example.com", + subject: "Review overdue", + }); + }); + return "escalated"; + } + throw error; + } + }, +); +``` + +### Webhook Confirmation + +```ts +const orderWorkflow = ow.defineWorkflow( + { name: "order-workflow" }, + async ({ input, step }) => { + const charge = await step.run({ name: "create-charge" }, async () => { + return await payments.createCharge({ amount: input.amount }); + }); + + // Wait for the payment provider's webhook to confirm the charge + const confirmation = await step.waitForSignal<{ status: string }>( + "payment-webhook", + { timeout: "10m" }, + ); + + if (confirmation.status !== "succeeded") { + throw new Error(`Payment failed: ${confirmation.status}`); + } + + await step.run({ name: "fulfill-order" }, async () => { + await orders.fulfill(input.orderId); + }); + }, +); + +// In your webhook handler: +app.post("/webhooks/payment", async (req, res) => { + const { workflowRunId, status } = req.body; + await ow.sendSignal(workflowRunId, "payment-webhook", { status }); + res.sendStatus(200); +}); +``` + +### Gate on External Completion + +```ts +const deployWorkflow = ow.defineWorkflow( + { name: "deploy-workflow" }, + async ({ input, step }) => { + await step.run({ name: "trigger-ci" }, async () => { + await ci.triggerPipeline({ + repo: input.repo, + sha: input.sha, + callbackRunId: input.workflowRunId, + }); + }); + + // Wait for CI to report back (up to 1 hour) + const result = await step.waitForSignal<{ passed: boolean }>("ci-result", { + timeout: "1h", + }); + + if (!result.passed) { + throw new Error("CI pipeline failed"); + } + + await step.run({ name: "deploy" }, async () => { + await deploy.release(input.repo, input.sha); + }); + }, +); +``` + +## Memoization + +Once a signal is received, the step is memoized. 
If the workflow replays after +the signal has been delivered, `waitForSignal()` returns the stored payload +immediately without parking again: + +```ts +// On first execution: parks and waits for "approval-decision" +// On replay after delivery: returns immediately with the stored payload +const decision = await step.waitForSignal("approval-decision"); +``` + + + Sending a signal to a workflow that is not currently waiting for it (e.g. + it's still running steps before `waitForSignal`) returns + `{ delivered: false, reason: "signal_not_waiting" }`. You can either retry + delivery or store the signal externally and deliver it once the workflow + parks. + diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index 61b09a0a..afd6c259 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -1,5 +1,11 @@ -import type { Backend } from "../core/backend.js"; +import type { Backend, DeliverSignalResult } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; +import type { JsonValue } from "../core/json.js"; +import { + defineSignalSpec as defineSignalSpecFn, + resolveSignalName, + type SignalSpec, +} from "../core/signal-spec.js"; import type { StandardSchemaV1 } from "../core/standard-schema.js"; import { calculateDateFromDuration } from "../core/step-attempt.js"; import { @@ -173,6 +179,70 @@ export class OpenWorkflow { async cancelWorkflowRun(workflowRunId: string): Promise { await this.backend.cancelWorkflowRun({ workflowRunId }); } + + /** + * Send a typed signal to a waiting workflow run using a {@link SignalSpec} + * descriptor. The payload type is enforced by the spec. 
+ * @param workflowRunId - The ID of the workflow run to signal + * @param spec - A `SignalSpec` created with {@link defineSignalSpec} + * @param payload - Signal payload (typed to the spec's `Payload` generic) + * @returns Result indicating whether the signal was delivered + */ + async sendSignal( + workflowRunId: string, + spec: SignalSpec, + payload?: Payload, + ): Promise; + + /** + * Send a signal to a waiting workflow run by name. + * @param workflowRunId - The ID of the workflow run to signal + * @param signalName - The signal name (must match the name used in `step.waitForSignal`) + * @param payload - Optional data to pass to the waiting workflow step + * @returns Result indicating whether the signal was delivered + * @example + * ```ts + * await ow.sendSignal("run-id", "approval-received", { approved: true }); + * ``` + */ + async sendSignal( + workflowRunId: string, + signalName: string, + payload?: JsonValue, + ): Promise; + + async sendSignal( + workflowRunId: string, + nameOrSpec: string | SignalSpec, + payload?: JsonValue, + ): Promise { + return this.backend.deliverSignal({ + workflowRunId, + signalName: resolveSignalName(nameOrSpec), + payload: payload ?? null, + }); + } + + /** + * Create a typed signal descriptor for use with `step.waitForSignal` and + * `sendSignal`. Using a spec on both sides ensures the signal name and + * payload type are consistent. 
+ * @param name - Signal name + * @returns A `SignalSpec` descriptor + * @example + * ```ts + * const approvalSignal = ow.defineSignalSpec<{ approved: boolean }>("approval"); + * + * // In workflow: + * const decision = await step.waitForSignal(approvalSignal); + * + * // Sender: + * await handle.sendSignal(approvalSignal, { approved: true }); + * ``` + */ + defineSignalSpec(name: string): SignalSpec { + return defineSignalSpecFn(name); + } } /** @@ -346,4 +416,38 @@ class WorkflowRunHandle { workflowRunId: this.workflowRun.id, }); } + + /** + * Send a typed signal to this workflow run using a {@link SignalSpec} + * descriptor. The payload type is enforced by the spec. + * @param spec - A `SignalSpec` created with {@link defineSignalSpec} + * @param payload - Signal payload (typed to the spec's `Payload` generic) + * @returns Result indicating whether the signal was delivered + */ + async sendSignal( + spec: SignalSpec, + payload?: Payload, + ): Promise; + + /** + * Send a signal to this workflow run by name. + * @param signalName - The signal name (must match the name used in `step.waitForSignal`) + * @param payload - Optional data to pass to the waiting workflow step + * @returns Result indicating whether the signal was delivered + */ + async sendSignal( + signalName: string, + payload?: JsonValue, + ): Promise; + + async sendSignal( + nameOrSpec: string | SignalSpec, + payload?: JsonValue, + ): Promise { + return this.backend.deliverSignal({ + workflowRunId: this.workflowRun.id, + signalName: resolveSignalName(nameOrSpec), + payload: payload ?? 
null, + }); + } } diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index 7110fa29..45c37bfe 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -68,6 +68,11 @@ export interface Backend { params: Readonly, ): Promise; + // Signals + deliverSignal( + params: Readonly, + ): Promise; + // Lifecycle stop(): Promise; } @@ -163,6 +168,19 @@ export interface FailStepAttemptParams { error: SerializedError; } +export interface DeliverSignalParams { + workflowRunId: string; + signalName: string; + payload: JsonValue | null; +} + +export type DeliverSignalResult = + | { delivered: true } + | { + delivered: false; + reason: "workflow_not_found" | "signal_not_waiting"; + }; + export interface SetStepAttemptChildWorkflowRunParams { workflowRunId: string; stepAttemptId: string; diff --git a/packages/openworkflow/core/signal-spec.ts b/packages/openworkflow/core/signal-spec.ts new file mode 100644 index 00000000..190b97b0 --- /dev/null +++ b/packages/openworkflow/core/signal-spec.ts @@ -0,0 +1,56 @@ +/** + * A typed descriptor for a named signal. The `Payload` generic is compile-time + * only — only `name` exists at runtime. + * + * Create one with {@link defineSignalSpec} and use it on both sides of the + * signal API to get end-to-end type safety: + * + * ```ts + * const approvalSignal = defineSignalSpec<{ approved: boolean }>("approval"); + * + * // Workflow: + * const decision = await step.waitForSignal(approvalSignal); + * + * // Sender: + * await handle.sendSignal(approvalSignal, { approved: true }); + * ``` + */ +export interface SignalSpec { + /** The signal name matched between `step.waitForSignal` and `sendSignal`. */ + readonly name: string; + /** + * Phantom type carrier — does NOT exist at runtime. + * Prevents structural collapse between different `SignalSpec` instantiations. 
+ * @internal + */ + readonly __types?: { payload: Payload }; +} + +/** + * Create a typed signal descriptor. + * @param name - Signal name. Must match the name passed to both + * `step.waitForSignal` and `sendSignal`. + * @returns A `SignalSpec` descriptor. + * @example + * ```ts + * const approvalSignal = defineSignalSpec<{ approved: boolean; comment?: string }>( + * "approval-decision", + * ); + * ``` + */ +export function defineSignalSpec( + name: string, +): SignalSpec { + return { name }; +} + +/** + * Extract the signal name from a `string` or `SignalSpec`. + * @param nameOrSpec - Signal name string or `SignalSpec` descriptor. + * @returns The signal name string. + */ +export function resolveSignalName( + nameOrSpec: string | SignalSpec, +): string { + return typeof nameOrSpec === "string" ? nameOrSpec : nameOrSpec.name; +} diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index 888120ac..868b5774 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -7,7 +7,7 @@ import { err, ok } from "./result.js"; /** * The kind of step in a workflow. */ -export type StepKind = "function" | "sleep" | "workflow"; +export type StepKind = "function" | "sleep" | "workflow" | "signal"; /** * Status of a step attempt through its lifecycle. @@ -34,12 +34,23 @@ export interface WorkflowStepAttemptContext { timeoutAt: string | null; } +/** + * Context for a signal step attempt. + */ +export interface SignalStepAttemptContext { + kind: "signal"; + timeoutAt: string | null; + /** Set to true by deliverSignal once the signal payload has been written. */ + delivered?: boolean; +} + /** * Context for a step attempt. */ export type StepAttemptContext = | SleepStepAttemptContext - | WorkflowStepAttemptContext; + | WorkflowStepAttemptContext + | SignalStepAttemptContext; /** * StepAttempt represents a single attempt of a step within a workflow. 
@@ -171,3 +182,17 @@ export function createWorkflowContext( timeoutAt: timeoutAt?.toISOString() ?? null, }; } + +/** + * Create the context object for a signal step attempt. + * @param timeoutAt - Signal timeout deadline, or null for no timeout + * @returns The context object for a signal step + */ +export function createSignalContext( + timeoutAt: Readonly | null, +): SignalStepAttemptContext { + return { + kind: "signal" as const, + timeoutAt: timeoutAt?.toISOString() ?? null, + }; +} diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index 01745679..74b38fc3 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -1,4 +1,5 @@ import type { DurationString } from "./duration.js"; +import type { SignalSpec } from "./signal-spec.js"; import type { RetryPolicy, WorkflowSpec } from "./workflow-definition.js"; import type { WorkflowRun } from "./workflow-run.js"; @@ -40,6 +41,17 @@ export interface StepRunWorkflowOptions { timeout?: number | string | Date; } +/** + * Options for an individual step defined with `step.waitForSignal()`. + */ +export interface StepWaitForSignalOptions { + /** + * Maximum time to wait for the signal to arrive. If the signal is not + * received before the timeout, the step throws a SignalTimeoutError. + */ + timeout?: number | string | Date; +} + /** * Represents the API for defining steps within a workflow. 
Used within a * workflow handler to define steps by calling `step.run()`, `step.sleep()`, @@ -56,6 +68,10 @@ export interface StepApi { options?: Readonly, ) => Promise; sleep: (name: string, duration: DurationString) => Promise; + waitForSignal: ( + nameOrSpec: string | SignalSpec, + options?: Readonly, + ) => Promise; } /** diff --git a/packages/openworkflow/index.ts b/packages/openworkflow/index.ts index d59653d9..db25a85d 100644 --- a/packages/openworkflow/index.ts +++ b/packages/openworkflow/index.ts @@ -4,7 +4,17 @@ export { OpenWorkflow } from "./client/client.js"; // core export type { RetryPolicy, Workflow } from "./core/workflow-definition.js"; -export type { WorkflowRunMetadata } from "./core/workflow-function.js"; +export type { + WorkflowRunMetadata, + StepWaitForSignalOptions, +} from "./core/workflow-function.js"; +export type { + DeliverSignalParams, + DeliverSignalResult, +} from "./core/backend.js"; +export type { SignalSpec } from "./core/signal-spec.js"; +export { defineSignalSpec } from "./core/signal-spec.js"; +export { SignalTimeoutError } from "./worker/execution.js"; export { defineWorkflowSpec, defineWorkflow, diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 8475ffd4..d34fba76 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -21,6 +21,8 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + DeliverSignalParams, + DeliverSignalResult, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; import { JsonValue } from "../core/json.js"; @@ -406,11 +408,11 @@ export class BackendPostgres implements Backend { } /** - * Reconcile a just-parked parent run that is waiting on workflow replay. If the - * child already reached a terminal state before the parent cleared workerId, - * the normal child-completion wake-up can be missed. 
This forces an immediate - * wake-up for that case. - * @param workflowRunId - Parent workflow run id + * Reconcile a just-parked run that may have missed a wake-up while the + * worker still held its lease. Covers two cases: + * 1. A child workflow finished before the parent cleared workerId. + * 2. A signal was delivered before the parent cleared workerId. + * @param workflowRunId - Workflow run id to reconcile * @returns Updated run when reconciliation changed availability, otherwise null */ private async reconcileWorkflowSleepWakeUp( @@ -432,17 +434,28 @@ export class BackendPostgres implements Backend { AND wr."id" = ${workflowRunId} AND wr."status" = 'running' AND wr."worker_id" IS NULL - AND EXISTS ( - SELECT 1 - FROM ${stepAttemptsTable} sa - JOIN ${workflowRunsTable} child - ON child."namespace_id" = sa."child_workflow_run_namespace_id" - AND child."id" = sa."child_workflow_run_id" - WHERE sa."namespace_id" = wr."namespace_id" - AND sa."workflow_run_id" = wr."id" - AND sa."kind" = 'workflow' - AND sa."status" = 'running' - AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + AND ( + EXISTS ( + SELECT 1 + FROM ${stepAttemptsTable} sa + JOIN ${workflowRunsTable} child + ON child."namespace_id" = sa."child_workflow_run_namespace_id" + AND child."id" = sa."child_workflow_run_id" + WHERE sa."namespace_id" = wr."namespace_id" + AND sa."workflow_run_id" = wr."id" + AND sa."kind" = 'workflow' + AND sa."status" = 'running' + AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM ${stepAttemptsTable} sa + WHERE sa."namespace_id" = wr."namespace_id" + AND sa."workflow_run_id" = wr."id" + AND sa."kind" = 'signal' + AND sa."status" = 'running' + AND sa."context" @> '{"delivered":true}' + ) ) RETURNING wr.* `; @@ -602,6 +615,62 @@ export class BackendPostgres implements Backend { return updated; } + async deliverSignal( + params: DeliverSignalParams, + ): Promise { + const workflowRunsTable = 
this.workflowRunsTable(); + const stepAttemptsTable = this.stepAttemptsTable(); + + // Check the workflow run exists (its status is not inspected here) + const workflowRun = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + if (!workflowRun) { + return { delivered: false, reason: "workflow_not_found" }; + } + + // Write the signal payload into the step attempt and mark it delivered. + // Use context.delivered as the delivery flag so null payloads work correctly. + const [updated] = await this.pg<{ id: string }[]>` + UPDATE ${stepAttemptsTable} + SET + "output" = ${this.pg.json(params.payload)}, + "context" = "context" || '{"delivered":true}'::jsonb, + "updated_at" = NOW() + WHERE "namespace_id" = ${this.namespaceId} + AND "workflow_run_id" = ${params.workflowRunId} + AND "step_name" = ${params.signalName} + AND "kind" = 'signal' + AND "status" = 'running' + AND NOT ("context" @> '{"delivered":true}') + RETURNING "id" + `; + + if (!updated) { + return { delivered: false, reason: "signal_not_waiting" }; + } + + // Wake the workflow run so it picks up the signal on next execution. + // No worker_id guard: signal can arrive while the worker still holds the + // lease, and sleepWorkflowRun's reconcile step will correct available_at + // after the worker parks. 
+ await this.pg` + UPDATE ${workflowRunsTable} + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > NOW() + THEN NOW() + ELSE "available_at" + END, + "updated_at" = NOW() + WHERE "namespace_id" = ${this.namespaceId} + AND "id" = ${params.workflowRunId} + AND "status" IN ('pending', 'running') + `; + + return { delivered: true }; + } + private async wakeParentWorkflowRun( childWorkflowRun: Readonly, ): Promise { diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 443d7f07..f7e9b8f5 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -20,6 +20,8 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + DeliverSignalParams, + DeliverSignalResult, toWorkflowRunCounts, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; @@ -406,11 +408,12 @@ export class BackendSqlite implements Backend { } /** - * Reconcile a parked parent run that is waiting on workflow replay. This closes - * the race where a child finished before the parent cleared workerId. - * @param workflowRunId - Parent workflow run id - * @returns Updated run when reconciliation changed availability, otherwise - * null + * Reconcile a just-parked run that may have missed a wake-up while the + * worker still held its lease. Covers two cases: + * 1. A child workflow finished before the parent cleared workerId. + * 2. A signal was delivered before the parent cleared workerId. + * @param workflowRunId - Workflow run id to reconcile + * @returns Updated run when reconciliation changed availability, otherwise null */ private async reconcileWorkflowSleepWakeUp( workflowRunId: string, @@ -428,17 +431,28 @@ export class BackendSqlite implements Backend { AND "id" = ? 
AND "status" = 'running' AND "worker_id" IS NULL - AND EXISTS ( - SELECT 1 - FROM "step_attempts" sa - JOIN "workflow_runs" child - ON child."namespace_id" = sa."child_workflow_run_namespace_id" - AND child."id" = sa."child_workflow_run_id" - WHERE sa."namespace_id" = "workflow_runs"."namespace_id" - AND sa."workflow_run_id" = "workflow_runs"."id" - AND sa."kind" = 'workflow' - AND sa."status" = 'running' - AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + AND ( + EXISTS ( + SELECT 1 + FROM "step_attempts" sa + JOIN "workflow_runs" child + ON child."namespace_id" = sa."child_workflow_run_namespace_id" + AND child."id" = sa."child_workflow_run_id" + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'workflow' + AND sa."status" = 'running' + AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM "step_attempts" sa + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'signal' + AND sa."status" = 'running' + AND json_extract(sa."context", '$.delivered') IS TRUE + ) ) `); @@ -658,6 +672,90 @@ export class BackendSqlite implements Backend { return updated; } + // eslint-disable-next-line @typescript-eslint/require-await + async deliverSignal( + params: DeliverSignalParams, + ): Promise { + const currentTime = now(); + + // Check the workflow run exists (its status is not inspected here) + const workflowRow = this.db + .prepare( + ` + SELECT "id", "status" + FROM "workflow_runs" + WHERE "namespace_id" = ? AND "id" = ? + LIMIT 1 + `, + ) + .get(this.namespaceId, params.workflowRunId) as + | { id: string; status: string } + | undefined; + + if (!workflowRow) { + return { delivered: false, reason: "workflow_not_found" }; + } + + // Write the signal payload and mark it delivered via context.delivered. 
+ // Use context.delivered as the delivery flag so null payloads work correctly. + const updateResult = this.db + .prepare( + ` + UPDATE "step_attempts" + SET + "output" = ?, + "context" = json_patch("context", '{"delivered":true}'), + "updated_at" = ? + WHERE "namespace_id" = ? + AND "workflow_run_id" = ? + AND "step_name" = ? + AND "kind" = 'signal' + AND "status" = 'running' + AND json_extract("context", '$.delivered') IS NOT TRUE + `, + ) + .run( + toJSON(params.payload), + currentTime, + this.namespaceId, + params.workflowRunId, + params.signalName, + ); + + if (updateResult.changes === 0) { + return { delivered: false, reason: "signal_not_waiting" }; + } + + // Wake the workflow run so it picks up the signal on next execution. + // No worker_id guard: signal can arrive while the worker still holds the + // lease, and sleepWorkflowRun's reconcile step will correct available_at + // after the worker parks. + this.db + .prepare( + ` + UPDATE "workflow_runs" + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > ? THEN ? + ELSE "available_at" + END, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? 
+ AND "status" IN ('pending', 'running') + `, + ) + .run( + currentTime, + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + ); + + return { delivered: true }; + } + private wakeParentWorkflowRun(childWorkflowRun: Readonly): void { if ( !childWorkflowRun.parentStepAttemptNamespaceId || diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index f63c678a..3e0e9f05 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -1,7 +1,11 @@ import { OpenWorkflow } from "../client/client.js"; import type { Backend } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; -import type { StepAttempt } from "../core/step-attempt.js"; +import { defineSignalSpec } from "../core/signal-spec.js"; +import type { + SignalStepAttemptContext, + StepAttempt, +} from "../core/step-attempt.js"; import { DEFAULT_WORKFLOW_RETRY_POLICY } from "../core/workflow-definition.js"; import type { WorkflowFunctionParams } from "../core/workflow-function.js"; import type { WorkflowRun } from "../core/workflow-run.js"; @@ -10,6 +14,7 @@ import { DEFAULT_POSTGRES_URL } from "../postgres/postgres.js"; import { WORKFLOW_STEP_LIMIT, STEP_LIMIT_EXCEEDED_ERROR_CODE, + SignalTimeoutError, createStepExecutionStateFromAttempts, executeWorkflow, } from "./execution.js"; @@ -1737,6 +1742,633 @@ describe("StepExecutor", () => { ); expect(childStatus).toBe("completed"); }); + // ---- step.waitForSignal tests ----------------------------------------- + + describe("step.waitForSignal", () => { + test("parks workflow waiting for signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-parks-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ value: number }>( + "my-signal", + ); + return 
payload.value; + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + const parkedRun = await tickUntilParked( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + + expect(parkedRun.status).toBe("running"); + expect(parkedRun.workerId).toBeNull(); + expect(parkedRun.availableAt).not.toBeNull(); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalAttempt = attempts.data.find((a) => a.kind === "signal"); + expect(signalAttempt).toBeDefined(); + expect(signalAttempt?.stepName).toBe("my-signal"); + expect(signalAttempt?.status).toBe("running"); + }); + + test("resumes workflow with signal payload after sendSignal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-resume-${randomUUID()}` }, + async ({ step }) => { + const before = await step.run({ name: "before" }, () => 10); + const payload = await step.waitForSignal<{ multiplier: number }>( + "multiply-signal", + ); + return before * payload.multiplier; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + // Tick until the workflow parks waiting for the signal + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Send the signal + await handle.sendSignal("multiply-signal", { multiplier: 3 }); + + // Tick until completed + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe(30); + }); + + test("throws SignalTimeoutError when timeout elapses", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-timeout-${randomUUID()}` }, + async ({ step }) => { + try { + 
await step.waitForSignal("timed-signal", { timeout: "20ms" }); + return "signal-received"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + return "timed-out"; + } + throw error; + } + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + // Tick until parked (waiting for signal) + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Wait for timeout to elapse + await sleep(50); + + // Tick again — timeout should be detected, workflow completes + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("timed-out"); + }); + + test("steps after waitForSignal are skipped on replay", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + let sideEffectCount = 0; + + const workflow = client.defineWorkflow( + { name: `signal-replay-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ value: string }>( + "my-signal", + ); + // This step should only run once (not on the replay pass that parks) + await step.run({ name: "after-signal" }, () => { + sideEffectCount++; + }); + return payload.value; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + expect(sideEffectCount).toBe(0); + + await handle.sendSignal("my-signal", { value: "hello" }); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + expect(sideEffectCount).toBe(1); + await expect(handle.result()).resolves.toBe("hello"); + }); + + test("ow.sendSignal() with run ID delivers signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: 
`signal-ow-send-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ approved: boolean }>( + "approval", + ); + return payload.approved ? "approved" : "rejected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Use top-level ow.sendSignal() instead of handle.sendSignal() + const result = await client.sendSignal( + handle.workflowRun.id, + "approval", + { approved: true }, + ); + expect(result.delivered).toBe(true); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe("approved"); + }); + + test("sendSignal returns signal_not_waiting when no signal step exists", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-not-waiting-${randomUUID()}` }, + async ({ step }) => { + await step.run({ name: "only-step" }, () => "done"); + return "complete"; + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + // Workflow is already complete — signal delivery should fail + const result = await client.sendSignal( + handle.workflowRun.id, + "some-signal", + {}, + ); + expect(result.delivered).toBe(false); + }); + + test("signal step stores timeoutAt in context", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-context-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("my-signal", { timeout: "1h" }); + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + const attempts = 
await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalAttempt = attempts.data.find((a) => a.kind === "signal"); + const signalContext = signalAttempt?.context as SignalStepAttemptContext; + expect(signalContext.kind).toBe("signal"); + // timeoutAt should be approximately 1 hour from now + expect(signalContext.timeoutAt).toBeTypeOf("string"); + const timeoutAt = new Date(signalContext.timeoutAt ?? ""); + const diffMs = timeoutAt.getTime() - Date.now(); + expect(diffMs).toBeGreaterThan(59 * 60 * 1000); + expect(diffMs).toBeLessThan(61 * 60 * 1000); + }); + + test("signal step without timeout stores null timeoutAt", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-no-timeout-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("my-signal"); + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalAttempt = attempts.data.find((a) => a.kind === "signal"); + expect(signalAttempt?.context).toMatchObject({ + kind: "signal", + timeoutAt: null, + }); + }); + + test("duplicate waitForSignal names are auto-indexed", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-duplicate-${randomUUID()}` }, + async ({ step }) => { + const a = await step.waitForSignal<{ v: number }>("sig"); + const b = await step.waitForSignal<{ v: number }>("sig"); + return a.v + b.v; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + // Parks on first signal + await tickUntilParked(backend, worker, handle.workflowRun.id, 
200, 10); + await handle.sendSignal("sig", { v: 10 }); + + // Parks on second signal + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await handle.sendSignal("sig:1", { v: 20 }); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe(30); + }); + + test("timeout: 0 times out immediately", async () => { + // Regression: timeout: 0 is falsy, so the truthiness check + // `options?.timeout ? ...` skipped resolveWorkflowTimeoutAt and + // treated it as no timeout, causing the workflow to park until + // deadline/default instead of timing out immediately. + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-timeout-zero-${randomUUID()}` }, + async ({ step }) => { + try { + await step.waitForSignal("never", { timeout: 0 }); + return "received"; + } catch (error) { + if (error instanceof SignalTimeoutError) return "timed-out"; + throw error; + } + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("timed-out"); + }); + + test("signal received before timeout does not trigger timeout on replay", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-before-timeout-${randomUUID()}` }, + async ({ step }) => { + try { + const payload = await step.waitForSignal<{ value: string }>( + "quick-signal", + { timeout: "500ms" }, + ); + return `received:${payload.value}`; + } catch (error) { + if (error instanceof SignalTimeoutError) return "timed-out"; + throw error; + } + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await 
workflow.run(); + + // Send signal before timeout elapses + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await handle.sendSignal("quick-signal", { value: "fast" }); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe("received:fast"); + }); + + test("signal timeout carries correct signalName on the error", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + let caughtError: SignalTimeoutError | undefined; + + const workflow = client.defineWorkflow( + { name: `signal-error-name-${randomUUID()}` }, + async ({ step }) => { + try { + await step.waitForSignal("named-signal", { timeout: "20ms" }); + return "received"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + caughtError = error; + return "caught"; + } + throw error; + } + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await sleep(50); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + await expect(handle.result()).resolves.toBe("caught"); + expect(caughtError).toBeInstanceOf(SignalTimeoutError); + expect(caughtError?.signalName).toBe("named-signal"); + expect(caughtError?.code).toBe("SIGNAL_TIMEOUT"); + }); + + test("null payload signal is treated as delivered", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-null-payload-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal("ping"); + return payload === null ? 
"got-null" : "unexpected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Send signal with no payload — defaults to null + const result = await handle.sendSignal("ping"); + expect(result.delivered).toBe(true); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("got-null"); + }); + + test("explicit null payload signal is treated as delivered", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-explicit-null-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal("ping"); + return payload === null ? "got-null" : "unexpected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Send signal with explicit null payload + const result = await client.sendSignal( + handle.workflowRun.id, + "ping", + null, + ); + expect(result.delivered).toBe(true); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("got-null"); + }); + + test("waitForSignal step is not created again on replay after signal received", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-no-duplicate-attempt-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal<{ ok: boolean }>("once"); + return "done"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await 
workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await handle.sendSignal("once", { ok: true }); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + // There should be exactly one signal step attempt + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalAttempts = attempts.data.filter((a) => a.kind === "signal"); + expect(signalAttempts).toHaveLength(1); + const signalAttempt = signalAttempts.at(0); + expect(signalAttempt?.status).toBe("completed"); + expect(signalAttempt?.output).toEqual({ ok: true }); + }); + + test("signal delivered while worker holds lease still wakes the run", async () => { + // Regression: deliverSignal previously guarded the wake-up query with + // "worker_id IS NULL". If the signal arrived while the worker still held + // its lease, no wake-up was recorded and the run slept until + // timeout/deadline. sleepWorkflowRun's reconcile step must correct this. + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-race-lease-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ v: number }>("race"); + return payload.v * 2; + }, + ); + + // Long deadline so the bug would cause a hang, not a spurious pass. + const handle = await workflow.run( + {}, + { deadlineAt: new Date(Date.now() + 60_000) }, + ); + const worker = client.newWorker({ concurrency: 1 }); + + // Start the tick (worker claims run, creates signal step, then parks). + // Deliver the signal concurrently so it may arrive while worker_id is set. 
+ const tickPromise = worker.tick(); + await sleep(20); // give the worker time to create the signal step + const deliverResult = await handle.sendSignal("race", { v: 7 }); + expect(deliverResult.delivered).toBe(true); + await tickPromise; + + // reconcileWorkflowSleepWakeUp should have reset available_at to NOW(), + // so the next tick picks it up immediately. + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe(14); + }); + + test("workflow deadline is used as park time when no signal timeout is set", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + // Deadline 2 seconds from now — well in the future but measurable + const deadlineAt = new Date(Date.now() + 2000); + + const workflow = client.defineWorkflow( + { name: `signal-deadline-park-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("any-signal"); + return "received"; + }, + ); + + const handle = await workflow.run({}, { deadlineAt }); + const worker = client.newWorker(); + const parkedRun = await tickUntilParked( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + + // availableAt should be at (or very close to) deadlineAt since no + // signal timeout was set — the workflow parks until its own deadline + expect(parkedRun.availableAt).not.toBeNull(); + const diff = Math.abs( + parkedRun.availableAt.getTime() - deadlineAt.getTime(), + ); + expect(diff).toBeLessThan(2000); + }); + + test("workflow with expired deadline fails while waiting for signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-deadline-fail-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("any-signal"); + return "received"; + }, + ); + + // deadline already in the past + const 
handle = await workflow.run( + {}, + { deadlineAt: new Date(Date.now() - 1) }, + ); + const worker = client.newWorker(); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("failed"); + }); + + test("waitForSignal accepts a SignalSpec descriptor", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const approvalSignal = defineSignalSpec<{ approved: boolean }>( + "approval", + ); + + const workflow = client.defineWorkflow( + { name: `signal-spec-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal(approvalSignal); + return payload.approved ? "approved" : "rejected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + const result = await handle.sendSignal(approvalSignal, { + approved: true, + }); + expect(result.delivered).toBe(true); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe("approved"); + }); + }); }); describe("executeWorkflow", () => { diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index d3f60474..26ccfab8 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -2,6 +2,8 @@ import type { Backend } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; import { deserializeError, serializeError } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; +import type { SignalSpec } from "../core/signal-spec.js"; +import { resolveSignalName } from "../core/signal-spec.js"; import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; import { getCachedStepAttempt, @@ -10,6 +12,7 @@ import { calculateDateFromDuration, 
createSleepContext, createWorkflowContext, + createSignalContext, } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate, @@ -19,6 +22,7 @@ import { } from "../core/workflow-definition.js"; import type { StepRunWorkflowOptions, + StepWaitForSignalOptions, StepApi, StepFunction, StepFunctionConfig, @@ -72,6 +76,20 @@ class StepError extends Error { } } +/** + * Error thrown when a signal step times out before receiving a signal. + */ +export class SignalTimeoutError extends Error { + readonly code = "SIGNAL_TIMEOUT"; + readonly signalName: string; + + constructor(signalName: string) { + super(`Timed out waiting for signal "${signalName}"`); + this.name = "SignalTimeoutError"; + this.signalName = signalName; + } +} + /** Default retry policy for step failures. */ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { initialInterval: "1s", @@ -289,6 +307,7 @@ export interface StepExecutorOptions { backend: Backend; workflowRunId: string; workerId: string; + deadlineAt: Date | null; attempts: StepAttempt[]; stepLimit?: number; } @@ -311,6 +330,7 @@ class StepExecutor implements StepApi { private readonly backend: Backend; private readonly workflowRunId: string; private readonly workerId: string; + private readonly deadlineAt: Date | null; private readonly stepLimit: number; private stepCount: number; private cache: StepAttemptCache; @@ -323,6 +343,7 @@ class StepExecutor implements StepApi { this.backend = options.backend; this.workflowRunId = options.workflowRunId; this.workerId = options.workerId; + this.deadlineAt = options.deadlineAt; this.stepLimit = Math.max(1, options.stepLimit ?? 
WORKFLOW_STEP_LIMIT); this.stepCount = options.attempts.length; @@ -453,6 +474,49 @@ class StepExecutor implements StepApi { throw new SleepSignal(resumeAt); } + // ---- step.waitForSignal ------------------------------------------------- + + async waitForSignal( + nameOrSpec: string | SignalSpec, + options?: Readonly, + ): Promise { + const stepName = this.resolveStepName(resolveSignalName(nameOrSpec)); + + // return cached result if signal already completed on a prior replay + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) return existingAttempt.output as Payload; + + // if signal previously timed out (failed), surface the error so the + // workflow function can catch it with a try/catch around waitForSignal() + const failedCount = this.failedCountsByStepName.get(stepName); + if (failedCount !== undefined && failedCount > 0) { + throw new SignalTimeoutError(stepName); + } + + // create new step attempt for the signal + const timeoutAt = + options?.timeout === undefined + ? null + : resolveWorkflowTimeoutAt(options.timeout); + const context = createSignalContext(timeoutAt); + + this.ensureStepLimitNotReached(); + await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal", + config: {}, + context, + }); + this.stepCount += 1; + + // park workflow waiting for signal (or until timeout) + throw new SleepSignal( + timeoutAt ?? this.deadlineAt ?? 
defaultWorkflowTimeoutAt(), + ); + } + // ---- step.runWorkflow ----------------------------------------------- async runWorkflow( @@ -780,6 +844,48 @@ export async function executeWorkflow( // update cache w/ completed attempt attempts[i] = completed; + continue; + } + + if ( + attempt.status === "running" && + attempt.kind === "signal" && + attempt.context?.kind === "signal" + ) { + const { timeoutAt } = attempt.context; + + // Signal has been delivered (context.delivered set by deliverSignal) + if (attempt.context.delivered) { + const completed = await backend.completeStepAttempt({ + workflowRunId: workflowRun.id, + stepAttemptId: attempt.id, + workerId, + output: attempt.output, + }); + attempts[i] = completed; + continue; + } + + // Signal timeout has elapsed without receiving the signal + if (timeoutAt && Date.now() >= new Date(timeoutAt).getTime()) { + const stepName = attempt.stepName; + await backend.failStepAttempt({ + workflowRunId: workflowRun.id, + stepAttemptId: attempt.id, + workerId, + error: serializeError(new SignalTimeoutError(stepName)), + }); + // Mark the attempt as failed so StepExecutor treats it as a failed step + attempts[i] = { ...attempt, status: "failed" }; + continue; + } + + // Signal not yet received — park workflow until timeout (or until workflow deadline) + throw new SleepSignal( + timeoutAt + ? new Date(timeoutAt) + : (workflowRun.deadlineAt ?? defaultWorkflowTimeoutAt()), + ); } } @@ -787,6 +893,7 @@ export async function executeWorkflow( backend, workflowRunId: workflowRun.id, workerId, + deadlineAt: workflowRun.deadlineAt, attempts, });