diff --git a/openworkflow/hello-world.ts b/openworkflow/hello-world.ts index 8093fe33..e407e4f1 100644 --- a/openworkflow/hello-world.ts +++ b/openworkflow/hello-world.ts @@ -14,7 +14,9 @@ import { defineWorkflow } from "openworkflow"; */ export const helloWorld = defineWorkflow( { name: "hello-world" }, - async ({ step }) => { + async ({ step, run }) => { + console.log(`[run ${run.id}]`); + const greeting = await step.run({ name: "greet" }, () => { return "Hello, World!"; }); diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 198639cf..343f6954 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -211,18 +211,20 @@ create a separate run. ## Workflow Function Parameters -The workflow function receives an object with three properties: +The workflow function receives an object with four properties: -| Parameter | Type | Description | -| --------- | ---------------- | ------------------------------------------------- | -| `input` | Generic | The input data passed when starting the workflow | -| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`) | -| `version` | `string \| null` | The workflow version, if specified | +| Parameter | Type | Description | +| --------- | --------------------- | ------------------------------------------------- | +| `input` | Generic | The input data passed when starting the workflow | +| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`) | +| `version` | `string \| null` | The workflow version, if specified | +| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | ```ts -defineWorkflow({ name: "example" }, async ({ input, step, version }) => { +defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { console.log("Input:", input); console.log("Version:", version); + console.log("Run ID:", run.id); await step.run({ name: "my-step" }, async () => { // step logic diff --git a/packages/openworkflow/execution.test.ts b/packages/openworkflow/execution.test.ts index 7871ae49..6823f355 100644 --- a/packages/openworkflow/execution.test.ts +++ b/packages/openworkflow/execution.test.ts @@ -540,6 +540,116 @@ describe("executeWorkflow", () => { expect(result).toEqual({ receivedVersion: null }); }); }); + + describe("run", () => { + test("exposes run metadata from workflow run", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const deadlineAt = new Date(Date.now() + 60_000); + const idempotencyKey = "run-metadata-idempotency"; + + const workflow = client.defineWorkflow( + { name: "run-metadata", version: "1.2.3" }, + ({ run }) => { + return { + id: run.id, + workflowName: run.workflowName, + createdAtIsDate: run.createdAt instanceof Date, + startedAtIsDate: run.startedAt instanceof Date, + createdAtMs: run.createdAt.getTime(), + startedAtMs: run.startedAt?.getTime() ?? null, + }; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run( + {}, + { + deadlineAt, + idempotencyKey, + }, + ); + await worker.tick(); + const result = await handle.result(); + + expect(result.id).toBe(handle.workflowRun.id); + expect(result.workflowName).toBe("run-metadata"); + expect(result.createdAtIsDate).toBe(true); + expect(result.startedAtIsDate).toBe(true); + expect(result.startedAtMs).not.toBeNull(); + if (result.startedAtMs === null) { + throw new Error("expected startedAtMs"); + } + expect(result.startedAtMs).toBeGreaterThanOrEqual(result.createdAtMs); + }); + + test("keeps run metadata frozen at runtime", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + let mutationError: unknown = null; + + const workflow = client.defineWorkflow( + { name: "run-frozen" }, + async ({ run, step }) => { + await step.run({ name: "mutate-run" }, () => { + try { + Object.assign(run as unknown as Record, { + id: "mutated", + }); + } catch (error) { + mutationError = error; + } + return null; + }); + return run.id; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + await worker.tick(); + + const result = await handle.result(); + expect(result).toBe(handle.workflowRun.id); + if (mutationError !== null) { + expect(mutationError).toBeInstanceOf(TypeError); + } + }); + + test("keeps id and timestamps stable across replay", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const snapshots: { + id: string; + createdAt: number; + startedAt: number | null; + }[] = []; + + const workflow = client.defineWorkflow( + { name: "run-replay-stable" }, + async ({ run, step }) => { + snapshots.push({ + id: run.id, + createdAt: run.createdAt.getTime(), + startedAt: run.startedAt?.getTime() ?? null, + }); + await step.sleep("pause", "10ms"); + return null; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + await worker.tick(); + await sleep(50); + await worker.tick(); + await handle.result(); + + expect(snapshots.length).toBe(2); + expect(snapshots[0]).toEqual(snapshots[1]); + }); + }); }); describe("createStepExecutionStateFromAttempts", () => { diff --git a/packages/openworkflow/execution.ts b/packages/openworkflow/execution.ts index 18bac5ac..dbf5a520 100644 --- a/packages/openworkflow/execution.ts +++ b/packages/openworkflow/execution.ts @@ -52,6 +52,14 @@ export type StepFunction = () => | Output | undefined; +/** + * Read-only workflow run metadata exposed to workflow functions. + */ +export type WorkflowRunMetadata = Pick< + WorkflowRun, + "id" | "workflowName" | "createdAt" | "startedAt" +>; + /** * Params passed to a workflow function for the user to use when defining steps. */ @@ -59,6 +67,7 @@ export interface WorkflowFunctionParams { input: Input; step: StepApi; version: string | null; + run: WorkflowRunMetadata; } /** @@ -364,7 +373,6 @@ export async function executeWorkflow( } } - // create step executor const executor = new StepExecutor({ backend, workflowRunId: workflowRun.id, @@ -372,11 +380,19 @@ export async function executeWorkflow( attempts, }); + const run = Object.freeze({ + id: workflowRun.id, + workflowName: workflowRun.workflowName, + createdAt: workflowRun.createdAt, + startedAt: workflowRun.startedAt, + }); + // execute workflow const output = await workflowFn({ input: workflowRun.input as unknown, step: executor, version: workflowVersion, + run, }); // mark success diff --git a/packages/openworkflow/index.ts b/packages/openworkflow/index.ts index fa0d6304..ec7636d6 100644 --- a/packages/openworkflow/index.ts +++ b/packages/openworkflow/index.ts @@ -2,6 +2,9 @@ export type { OpenWorkflowOptions } from "./client.js"; export { OpenWorkflow } from "./client.js"; +// execution +export type { WorkflowRunMetadata } from "./execution.js"; + // worker export type { WorkerOptions } from "./worker.js"; export { Worker } from "./worker.js";