Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion openworkflow/hello-world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import { defineWorkflow } from "openworkflow";
*/
export const helloWorld = defineWorkflow(
{ name: "hello-world" },
async ({ step }) => {
async ({ step, run }) => {
console.log(`[run ${run.id}]`);

const greeting = await step.run({ name: "greet" }, () => {
return "Hello, World!";
});
Expand Down
16 changes: 9 additions & 7 deletions packages/docs/docs/workflows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,20 @@ create a separate run.

## Workflow Function Parameters

The workflow function receives an object with three properties:
The workflow function receives an object with four properties:

| Parameter | Type | Description |
| --------- | ---------------- | ------------------------------------------------- |
| `input` | Generic | The input data passed when starting the workflow |
| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`) |
| `version` | `string \| null` | The workflow version, if specified |
| Parameter | Type | Description |
| --------- | --------------------- | ------------------------------------------------- |
| `input` | Generic | The input data passed when starting the workflow |
| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`) |
| `version` | `string \| null` | The workflow version, if specified |
| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) |
Comment thread
jamescmartinez marked this conversation as resolved.

```ts
defineWorkflow({ name: "example" }, async ({ input, step, version }) => {
defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => {
console.log("Input:", input);
console.log("Version:", version);
console.log("Run ID:", run.id);
Comment thread
jamescmartinez marked this conversation as resolved.

await step.run({ name: "my-step" }, async () => {
// step logic
Expand Down
110 changes: 110 additions & 0 deletions packages/openworkflow/execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,116 @@ describe("executeWorkflow", () => {
expect(result).toEqual({ receivedVersion: null });
});
});

describe("run", () => {
test("exposes run metadata from workflow run", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });
const deadlineAt = new Date(Date.now() + 60_000);
const idempotencyKey = "run-metadata-idempotency";

const workflow = client.defineWorkflow(
{ name: "run-metadata", version: "1.2.3" },
({ run }) => {
return {
id: run.id,
workflowName: run.workflowName,
createdAtIsDate: run.createdAt instanceof Date,
startedAtIsDate: run.startedAt instanceof Date,
createdAtMs: run.createdAt.getTime(),
startedAtMs: run.startedAt?.getTime() ?? null,
};
},
);

const worker = client.newWorker();
const handle = await workflow.run(
{},
{
deadlineAt,
idempotencyKey,
},
);
await worker.tick();
const result = await handle.result();

expect(result.id).toBe(handle.workflowRun.id);
expect(result.workflowName).toBe("run-metadata");
expect(result.createdAtIsDate).toBe(true);
expect(result.startedAtIsDate).toBe(true);
expect(result.startedAtMs).not.toBeNull();
if (result.startedAtMs === null) {
throw new Error("expected startedAtMs");
}
expect(result.startedAtMs).toBeGreaterThanOrEqual(result.createdAtMs);
});

test("keeps run metadata frozen at runtime", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });
let mutationError: unknown = null;

const workflow = client.defineWorkflow(
{ name: "run-frozen" },
async ({ run, step }) => {
await step.run({ name: "mutate-run" }, () => {
try {
Object.assign(run as unknown as Record<string, unknown>, {
id: "mutated",
});
} catch (error) {
mutationError = error;
}
return null;
});
return run.id;
},
);

const worker = client.newWorker();
const handle = await workflow.run();
await worker.tick();

const result = await handle.result();
expect(result).toBe(handle.workflowRun.id);
if (mutationError !== null) {
expect(mutationError).toBeInstanceOf(TypeError);
}
});

test("keeps id and timestamps stable across replay", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });
const snapshots: {
id: string;
createdAt: number;
startedAt: number | null;
}[] = [];

const workflow = client.defineWorkflow(
{ name: "run-replay-stable" },
async ({ run, step }) => {
snapshots.push({
id: run.id,
createdAt: run.createdAt.getTime(),
startedAt: run.startedAt?.getTime() ?? null,
});
await step.sleep("pause", "10ms");
return null;
},
);

const worker = client.newWorker();
const handle = await workflow.run();
await worker.tick();
await sleep(50);
await worker.tick();
await handle.result();

expect(snapshots.length).toBe(2);
expect(snapshots[0]).toEqual(snapshots[1]);
});
});
});

describe("createStepExecutionStateFromAttempts", () => {
Expand Down
18 changes: 17 additions & 1 deletion packages/openworkflow/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,22 @@ export type StepFunction<Output> = () =>
| Output
| undefined;

/**
* Read-only workflow run metadata exposed to workflow functions.
*/
export type WorkflowRunMetadata = Pick<
WorkflowRun,
"id" | "workflowName" | "createdAt" | "startedAt"
>;
Comment thread
jamescmartinez marked this conversation as resolved.

/**
* Params passed to a workflow function for the user to use when defining steps.
*/
export interface WorkflowFunctionParams<Input> {
input: Input;
step: StepApi;
version: string | null;
run: WorkflowRunMetadata;
}

/**
Expand Down Expand Up @@ -364,19 +373,26 @@ export async function executeWorkflow(
}
}

// create step executor
const executor = new StepExecutor({
backend,
workflowRunId: workflowRun.id,
workerId,
attempts,
});

const run = Object.freeze<WorkflowRunMetadata>({
id: workflowRun.id,
workflowName: workflowRun.workflowName,
createdAt: workflowRun.createdAt,
startedAt: workflowRun.startedAt,
});

// execute workflow
const output = await workflowFn({
input: workflowRun.input as unknown,
step: executor,
version: workflowVersion,
run,
});

// mark success
Expand Down
3 changes: 3 additions & 0 deletions packages/openworkflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
export type { OpenWorkflowOptions } from "./client.js";
export { OpenWorkflow } from "./client.js";

// execution
export type { WorkflowRunMetadata } from "./execution.js";

// worker
export type { WorkerOptions } from "./worker.js";
export { Worker } from "./worker.js";
Expand Down
Loading