Skip to content

Commit 042ab00

Browse files
Add support for idempotency keys
1 parent d828d64 commit 042ab00

10 files changed

Lines changed: 553 additions & 15 deletions

File tree

packages/docs/docs/roadmap.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ description: What's coming next for OpenWorkflow
1616
- ✅ Workflow versioning
1717
- ✅ Workflow cancelation
1818
- ✅ Configurable retry policies
19+
- ✅ Idempotency keys
1920

2021
## Coming Soon
2122

22-
- Idempotency keys
2323
- Rollback / compensation functions
2424
- Signals for external events
2525
- Native OpenTelemetry integration

packages/docs/docs/workflows.mdx

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,26 @@ defineWorkflow(
177177
Any `retryPolicy` fields you omit fall back to defaults. See
178178
[Retries](/docs/retries) for the full behavior and defaults.
179179

180+
### Idempotency Key (Optional)
181+
182+
You can prevent duplicate run creation by providing an idempotency key, though
183+
there is a performance cost to checking for duplicates, so use this only when
184+
necessary:
185+
186+
```ts
187+
const handle = await ow.runWorkflow(
188+
sendWelcomeEmail.spec,
189+
{ userId: "user_123" },
190+
{ idempotencyKey: "welcome-email:user_123" },
191+
);
192+
```
193+
194+
Within a given namespace, when an existing run matches the same
195+
`workflowName` + `idempotencyKey`, OpenWorkflow returns that existing run
196+
immediately. This dedupe window is built-in and lasts 24 hours from the original
197+
run creation time. The same `idempotencyKey` used in a different namespace will
198+
create a separate run.
199+
180200
## Workflow Function Parameters
181201

182202
The workflow function receives an object with three properties:

packages/openworkflow/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# openworkflow
22

3+
## Unreleased
4+
5+
- Add workflow-scoped idempotency keys via
6+
`ow.runWorkflow(spec, input, { idempotencyKey })`
7+
- Built-in run idempotency dedupe period is 24 hours from run creation time
8+
39
## 0.6.7
410

511
- Add support for Bun as an alternative to Node

packages/openworkflow/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ For more details, check out our [docs](https://openworkflow.dev/docs).
6767
-**Long pauses** - Sleep for seconds or months
6868
-**Scheduled runs** - Start workflows at a specific time
6969
-**Parallel execution** - Run steps concurrently
70+
-**Idempotency keys** - Deduplicate repeated run requests safely (24h
71+
window)
7072
-**No extra servers** - Uses your existing database
7173
-**Dashboard included** - Monitor and debug workflows
7274
-**Production ready** - PostgreSQL and SQLite support

packages/openworkflow/backend.testsuite.ts

Lines changed: 314 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js";
12
import type { Backend } from "./backend.js";
23
import type { StepAttempt } from "./core/step.js";
34
import type { WorkflowRun } from "./core/workflow.js";
45
import { DEFAULT_WORKFLOW_RETRY_POLICY } from "./workflow.js";
56
import { randomUUID } from "node:crypto";
6-
import { afterAll, beforeAll, describe, expect, test } from "vitest";
7+
import { afterAll, beforeAll, describe, expect, test, vi } from "vitest";
78

89
/**
910
* Options for the Backend test suite.
@@ -104,6 +105,318 @@ export function testBackend(options: TestBackendOptions): void {
104105
expect(deltaSeconds(createdMin.availableAt)).toBeLessThan(1); // defaults to NOW()
105106
expect(createdMin.deadlineAt).toBeNull();
106107
});
108+
109+
test("reuses the same run for matching idempotency key and workflow identity", async () => {
110+
const backend = await setup();
111+
const workflowName = randomUUID();
112+
const version = "v1";
113+
const idempotencyKey = randomUUID();
114+
115+
const first = await backend.createWorkflowRun({
116+
workflowName,
117+
version,
118+
idempotencyKey,
119+
input: { val: 1 },
120+
config: {},
121+
context: null,
122+
availableAt: null,
123+
deadlineAt: null,
124+
});
125+
126+
const second = await backend.createWorkflowRun({
127+
workflowName,
128+
version,
129+
idempotencyKey,
130+
input: { val: 2 },
131+
config: { changed: true },
132+
context: null,
133+
availableAt: null,
134+
deadlineAt: null,
135+
});
136+
137+
expect(second.id).toBe(first.id);
138+
await teardown(backend);
139+
});
140+
141+
test("allows the same idempotency key across different workflow names", async () => {
142+
const backend = await setup();
143+
const idempotencyKey = randomUUID();
144+
145+
const first = await backend.createWorkflowRun({
146+
workflowName: "workflow-a",
147+
version: "v1",
148+
idempotencyKey,
149+
input: null,
150+
config: {},
151+
context: null,
152+
availableAt: null,
153+
deadlineAt: null,
154+
});
155+
156+
const second = await backend.createWorkflowRun({
157+
workflowName: "workflow-b",
158+
version: "v1",
159+
idempotencyKey,
160+
input: null,
161+
config: {},
162+
context: null,
163+
availableAt: null,
164+
deadlineAt: null,
165+
});
166+
167+
expect(second.id).not.toBe(first.id);
168+
await teardown(backend);
169+
});
170+
171+
test("returns existing run when reusing key with same workflow and different version", async () => {
172+
const backend = await setup();
173+
const workflowName = randomUUID();
174+
const idempotencyKey = randomUUID();
175+
176+
const first = await backend.createWorkflowRun({
177+
workflowName,
178+
version: "v1",
179+
idempotencyKey,
180+
input: null,
181+
config: {},
182+
context: null,
183+
availableAt: null,
184+
deadlineAt: null,
185+
});
186+
187+
const second = await backend.createWorkflowRun({
188+
workflowName,
189+
version: "v2",
190+
idempotencyKey,
191+
input: null,
192+
config: {},
193+
context: null,
194+
availableAt: null,
195+
deadlineAt: null,
196+
});
197+
expect(second.id).toBe(first.id);
198+
expect(second.version).toBe("v1");
199+
200+
await teardown(backend);
201+
});
202+
203+
test("creates a new run when matching key is older than the idempotency period", async () => {
204+
const backend = await setup();
205+
const workflowName = randomUUID();
206+
const idempotencyKey = randomUUID();
207+
const now = Date.now();
208+
const nowSpy = vi.spyOn(Date, "now");
209+
210+
try {
211+
nowSpy.mockReturnValue(now);
212+
const first = await backend.createWorkflowRun({
213+
workflowName,
214+
version: "v1",
215+
idempotencyKey,
216+
input: null,
217+
config: {},
218+
context: null,
219+
availableAt: null,
220+
deadlineAt: null,
221+
});
222+
223+
nowSpy.mockReturnValue(
224+
now + DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS + 60_000,
225+
);
226+
227+
const second = await backend.createWorkflowRun({
228+
workflowName,
229+
version: "v1",
230+
idempotencyKey,
231+
input: null,
232+
config: {},
233+
context: null,
234+
availableAt: null,
235+
deadlineAt: null,
236+
});
237+
238+
expect(second.id).not.toBe(first.id);
239+
} finally {
240+
nowSpy.mockRestore();
241+
await teardown(backend);
242+
}
243+
});
244+
245+
test("creates distinct runs when idempotency key is null", async () => {
246+
const backend = await setup();
247+
const workflowName = randomUUID();
248+
249+
const first = await backend.createWorkflowRun({
250+
workflowName,
251+
version: null,
252+
idempotencyKey: null,
253+
input: null,
254+
config: {},
255+
context: null,
256+
availableAt: null,
257+
deadlineAt: null,
258+
});
259+
260+
const second = await backend.createWorkflowRun({
261+
workflowName,
262+
version: null,
263+
idempotencyKey: null,
264+
input: null,
265+
config: {},
266+
context: null,
267+
availableAt: null,
268+
deadlineAt: null,
269+
});
270+
271+
expect(second.id).not.toBe(first.id);
272+
await teardown(backend);
273+
});
274+
275+
test("collapses concurrent creates with same key to one run id", async () => {
276+
const backend = await setup();
277+
const workflowName = randomUUID();
278+
const version = "v1";
279+
const idempotencyKey = randomUUID();
280+
281+
const runs = await Promise.all(
282+
Array.from({ length: 10 }, (_, i) =>
283+
backend.createWorkflowRun({
284+
workflowName,
285+
version,
286+
idempotencyKey,
287+
input: { i },
288+
config: {},
289+
context: null,
290+
availableAt: null,
291+
deadlineAt: null,
292+
}),
293+
),
294+
);
295+
296+
const uniqueRunIds = new Set(runs.map((run) => run.id));
297+
expect(uniqueRunIds.size).toBe(1);
298+
await teardown(backend);
299+
});
300+
301+
test("returns existing completed run for matching key", async () => {
302+
const backend = await setup();
303+
const workflowName = randomUUID();
304+
const version = "v1";
305+
const idempotencyKey = randomUUID();
306+
307+
const created = await backend.createWorkflowRun({
308+
workflowName,
309+
version,
310+
idempotencyKey,
311+
input: null,
312+
config: {},
313+
context: null,
314+
availableAt: null,
315+
deadlineAt: null,
316+
});
317+
318+
const workerId = randomUUID();
319+
const claimed = await backend.claimWorkflowRun({
320+
workerId,
321+
leaseDurationMs: 100,
322+
});
323+
expect(claimed?.id).toBe(created.id);
324+
325+
await backend.completeWorkflowRun({
326+
workflowRunId: created.id,
327+
workerId,
328+
output: { ok: true },
329+
});
330+
331+
const deduped = await backend.createWorkflowRun({
332+
workflowName,
333+
version,
334+
idempotencyKey,
335+
input: null,
336+
config: {},
337+
context: null,
338+
availableAt: null,
339+
deadlineAt: null,
340+
});
341+
342+
expect(deduped.id).toBe(created.id);
343+
expect(deduped.status).toBe("completed");
344+
await teardown(backend);
345+
});
346+
347+
test("returns existing failed and canceled runs for matching key", async () => {
348+
const backend = await setup();
349+
const workflowName = randomUUID();
350+
const version = "v1";
351+
352+
const failedKey = randomUUID();
353+
const failedRun = await backend.createWorkflowRun({
354+
workflowName,
355+
version,
356+
idempotencyKey: failedKey,
357+
input: null,
358+
config: {},
359+
context: null,
360+
availableAt: null,
361+
deadlineAt: null,
362+
});
363+
364+
const failedWorkerId = randomUUID();
365+
const failedClaimed = await backend.claimWorkflowRun({
366+
workerId: failedWorkerId,
367+
leaseDurationMs: 100,
368+
});
369+
expect(failedClaimed?.id).toBe(failedRun.id);
370+
371+
await backend.failWorkflowRun({
372+
workflowRunId: failedRun.id,
373+
workerId: failedWorkerId,
374+
error: { message: "terminal failure" },
375+
retryPolicy: { ...DEFAULT_WORKFLOW_RETRY_POLICY, maximumAttempts: 1 },
376+
});
377+
378+
const failedDeduped = await backend.createWorkflowRun({
379+
workflowName,
380+
version,
381+
idempotencyKey: failedKey,
382+
input: null,
383+
config: {},
384+
context: null,
385+
availableAt: null,
386+
deadlineAt: null,
387+
});
388+
expect(failedDeduped.id).toBe(failedRun.id);
389+
expect(failedDeduped.status).toBe("failed");
390+
391+
const canceledKey = randomUUID();
392+
const canceledRun = await backend.createWorkflowRun({
393+
workflowName,
394+
version,
395+
idempotencyKey: canceledKey,
396+
input: null,
397+
config: {},
398+
context: null,
399+
availableAt: null,
400+
deadlineAt: null,
401+
});
402+
403+
await backend.cancelWorkflowRun({ workflowRunId: canceledRun.id });
404+
405+
const canceledDeduped = await backend.createWorkflowRun({
406+
workflowName,
407+
version,
408+
idempotencyKey: canceledKey,
409+
input: null,
410+
config: {},
411+
context: null,
412+
availableAt: null,
413+
deadlineAt: null,
414+
});
415+
expect(canceledDeduped.id).toBe(canceledRun.id);
416+
expect(canceledDeduped.status).toBe("canceled");
417+
418+
await teardown(backend);
419+
});
107420
});
108421

109422
describe("listWorkflowRuns()", () => {

packages/openworkflow/backend.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { WorkflowRun } from "./core/workflow.js";
55
import type { RetryPolicy } from "./workflow.js";
66

77
export const DEFAULT_NAMESPACE_ID = "default";
8+
export const DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS = 24 * 60 * 60 * 1000;
89

910
/**
1011
* Backend is the interface for backend providers to implement.

0 commit comments

Comments
 (0)