diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx index 60eafc0b..e9bec9f9 100644 --- a/packages/docs/docs/roadmap.mdx +++ b/packages/docs/docs/roadmap.mdx @@ -16,10 +16,10 @@ description: What's coming next for OpenWorkflow - ✅ Workflow versioning - ✅ Workflow cancelation - ✅ Configurable retry policies +- ✅ Idempotency keys ## Coming Soon -- Idempotency keys - Rollback / compensation functions - Signals for external events - Native OpenTelemetry integration diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index f98bc087..d62ce105 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -177,6 +177,26 @@ defineWorkflow( Any `retryPolicy` fields you omit fall back to defaults. See [Retries](/docs/retries) for the full behavior and defaults. +### Idempotency Key (Optional) + +You can prevent duplicate run creation by providing an idempotency key, though +there is a performance cost to checking for duplicates, so use this only when +necessary: + +```ts +const handle = await ow.runWorkflow( + sendWelcomeEmail.spec, + { userId: "user_123" }, + { idempotencyKey: "welcome-email:user_123" }, +); +``` + +Within a given namespace, when an existing run matches the same +`workflowName` + `idempotencyKey`, OpenWorkflow returns that existing run +immediately. This dedupe window is built-in and lasts 24 hours from the original +run creation time. The same `idempotencyKey` used in a different namespace will +create a separate run. + ## Workflow Function Parameters The workflow function receives an object with three properties: diff --git a/packages/openworkflow/CHANGELOG.md b/packages/openworkflow/CHANGELOG.md index 394302b1..b1730451 100644 --- a/packages/openworkflow/CHANGELOG.md +++ b/packages/openworkflow/CHANGELOG.md @@ -1,5 +1,11 @@ # openworkflow +## Unreleased + +- Add workflow-scoped idempotency keys via + `ow.runWorkflow(spec, input, { idempotencyKey })` +- Built-in run idempotency dedupe period is 24 hours from run creation time + ## 0.6.7 - Add support for Bun as an alternative to Node diff --git a/packages/openworkflow/README.md b/packages/openworkflow/README.md index dc782e63..b8d9a93b 100644 --- a/packages/openworkflow/README.md +++ b/packages/openworkflow/README.md @@ -67,6 +67,7 @@ For more details, check out our [docs](https://openworkflow.dev/docs). - ✅ **Long pauses** - Sleep for seconds or months - ✅ **Scheduled runs** - Start workflows at a specific time - ✅ **Parallel execution** - Run steps concurrently +- ✅ **Idempotency keys** - Deduplicate repeated run requests (24h window) - ✅ **No extra servers** - Uses your existing database - ✅ **Dashboard included** - Monitor and debug workflows - ✅ **Production ready** - PostgreSQL and SQLite support diff --git a/packages/openworkflow/backend.testsuite.ts b/packages/openworkflow/backend.testsuite.ts index e707eafd..7109d2db 100644 --- a/packages/openworkflow/backend.testsuite.ts +++ b/packages/openworkflow/backend.testsuite.ts @@ -1,9 +1,10 @@ +import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js"; import type { Backend } from "./backend.js"; import type { StepAttempt } from "./core/step.js"; import type { WorkflowRun } from "./core/workflow.js"; import { DEFAULT_WORKFLOW_RETRY_POLICY } from "./workflow.js"; import { randomUUID } from "node:crypto"; -import { afterAll, beforeAll, describe, expect, test } from "vitest"; +import { afterAll, beforeAll, describe, expect, test, vi } from "vitest"; /** * Options for the Backend test suite. @@ -104,6 +105,380 @@ export function testBackend(options: TestBackendOptions): void { expect(deltaSeconds(createdMin.availableAt)).toBeLessThan(1); // defaults to NOW() expect(createdMin.deadlineAt).toBeNull(); }); + + test("reuses the same run for matching idempotency key and workflow identity", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const idempotencyKey = randomUUID(); + + const first = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey, + input: { val: 1 }, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const second = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey, + input: { val: 2 }, + config: { changed: true }, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(second.id).toBe(first.id); + await teardown(backend); + }); + + test("allows the same idempotency key across different workflow names", async () => { + const backend = await setup(); + const idempotencyKey = randomUUID(); + + const first = await backend.createWorkflowRun({ + workflowName: "workflow-a", + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const second = await backend.createWorkflowRun({ + workflowName: "workflow-b", + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(second.id).not.toBe(first.id); + await teardown(backend); + }); + + test("allows the same idempotency key across different namespaces", async () => { + const backendA = await setup(); + const backendB = await setup(); + const workflowName = randomUUID(); + const idempotencyKey = randomUUID(); + + try { + const firstA = await backendA.createWorkflowRun({ + workflowName, + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const firstB = await backendB.createWorkflowRun({ + workflowName, + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(firstA.id).not.toBe(firstB.id); + expect(firstA.namespaceId).not.toBe(firstB.namespaceId); + + const secondA = await backendA.createWorkflowRun({ + workflowName, + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const secondB = await backendB.createWorkflowRun({ + workflowName, + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(secondA.id).toBe(firstA.id); + expect(secondB.id).toBe(firstB.id); + } finally { + await teardown(backendA); + await teardown(backendB); + } + }); + + test("returns existing run when reusing key with same workflow and different version", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const idempotencyKey = randomUUID(); + + const first = await backend.createWorkflowRun({ + workflowName, + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const second = await backend.createWorkflowRun({ + workflowName, + version: "v2", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(second.id).toBe(first.id); + expect(second.version).toBe("v1"); + + await teardown(backend); + }); + + test("creates a new run when matching key is older than the idempotency period", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const idempotencyKey = randomUUID(); + const now = Date.now(); + const nowSpy = vi.spyOn(Date, "now"); + + try { + nowSpy.mockReturnValue(now); + const first = await backend.createWorkflowRun({ + workflowName, + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + nowSpy.mockReturnValue( + now + DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS + 60_000, + ); + + const second = await backend.createWorkflowRun({ + workflowName, + version: "v1", + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(second.id).not.toBe(first.id); + } finally { + nowSpy.mockRestore(); + await teardown(backend); + } + }); + + test("creates distinct runs when idempotency key is null", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + + const first = await backend.createWorkflowRun({ + workflowName, + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const second = await backend.createWorkflowRun({ + workflowName, + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(second.id).not.toBe(first.id); + await teardown(backend); + }); + + test("collapses concurrent creates with same key to one run id", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const idempotencyKey = randomUUID(); + + const runs = await Promise.all( + Array.from({ length: 10 }, (_, i) => + backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey, + input: { i }, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }), + ), + ); + + const uniqueRunIds = new Set(runs.map((run) => run.id)); + expect(uniqueRunIds.size).toBe(1); + await teardown(backend); + }); + + test("returns existing completed run for matching key", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const idempotencyKey = randomUUID(); + + const created = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const workerId = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + expect(claimed?.id).toBe(created.id); + + await backend.completeWorkflowRun({ + workflowRunId: created.id, + workerId, + output: { ok: true }, + }); + + const deduped = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(deduped.id).toBe(created.id); + expect(deduped.status).toBe("completed"); + await teardown(backend); + }); + + test("returns existing failed and canceled runs for matching key", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + + const failedKey = randomUUID(); + const failedRun = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: failedKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const failedWorkerId = randomUUID(); + const failedClaimed = await backend.claimWorkflowRun({ + workerId: failedWorkerId, + leaseDurationMs: 100, + }); + expect(failedClaimed?.id).toBe(failedRun.id); + + await backend.failWorkflowRun({ + workflowRunId: failedRun.id, + workerId: failedWorkerId, + error: { message: "terminal failure" }, + retryPolicy: { ...DEFAULT_WORKFLOW_RETRY_POLICY, maximumAttempts: 1 }, + }); + + const failedDeduped = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: failedKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(failedDeduped.id).toBe(failedRun.id); + expect(failedDeduped.status).toBe("failed"); + + const canceledKey = randomUUID(); + const canceledRun = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: canceledKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + await backend.cancelWorkflowRun({ workflowRunId: canceledRun.id }); + + const canceledDeduped = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: canceledKey, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(canceledDeduped.id).toBe(canceledRun.id); + expect(canceledDeduped.status).toBe("canceled"); + + await teardown(backend); + }); }); describe("listWorkflowRuns()", () => { diff --git a/packages/openworkflow/backend.ts b/packages/openworkflow/backend.ts index 8ee875e1..be4f96fa 100644 --- a/packages/openworkflow/backend.ts +++ b/packages/openworkflow/backend.ts @@ -5,6 +5,7 @@ import type { WorkflowRun } from "./core/workflow.js"; import type { RetryPolicy } from "./workflow.js"; export const DEFAULT_NAMESPACE_ID = "default"; +export const DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS = 24 * 60 * 60 * 1000; /** * Backend is the interface for backend providers to implement. diff --git a/packages/openworkflow/client.test.ts b/packages/openworkflow/client.test.ts index 2920196a..a7289d9c 100644 --- a/packages/openworkflow/client.test.ts +++ b/packages/openworkflow/client.test.ts @@ -1,6 +1,7 @@ +import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js"; import { OpenWorkflow } from "./client.js"; import { BackendPostgres } from "./postgres.js"; -import { DEFAULT_POSTGRES_URL } from "./postgres/postgres.js"; +import { DEFAULT_POSTGRES_URL, Postgres } from "./postgres/postgres.js"; import { DEFAULT_WORKFLOW_RETRY_POLICY, defineWorkflowSpec, @@ -329,6 +330,67 @@ describe("OpenWorkflow", () => { expect(handle.workflowRun.version).toBeNull(); }); + test("creates workflow run with idempotency key", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: "idempotency-test" }, + noopFn, + ); + const key = randomUUID(); + const handle = await workflow.run({ value: 1 }, { idempotencyKey: key }); + + expect(handle.workflowRun.idempotencyKey).toBe(key); + }); + + test("reuses existing workflow run for same idempotency key", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: "idempotency-dedupe-test" }, + noopFn, + ); + const key = randomUUID(); + + const first = await workflow.run({ value: 1 }, { idempotencyKey: key }); + const second = await workflow.run({ value: 2 }, { idempotencyKey: key }); + + expect(second.workflowRun.id).toBe(first.workflowRun.id); + expect(second.workflowRun.input).toEqual(first.workflowRun.input); + }); + + test("creates a new workflow run after the 24h idempotency window", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const workflow = client.defineWorkflow( + { name: "idempotency-expiration-test" }, + noopFn, + ); + const key = randomUUID(); + + const first = await workflow.run({ value: 1 }, { idempotencyKey: key }); + + const internalBackend = backend as unknown as { + pg: Postgres; + namespaceId: string; + }; + const staleCreatedAt = new Date( + Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS - 60_000, + ); + + await internalBackend.pg` + UPDATE "openworkflow"."workflow_runs" + SET "created_at" = ${staleCreatedAt} + WHERE "namespace_id" = ${internalBackend.namespaceId} + AND "id" = ${first.workflowRun.id} + `; + + const second = await workflow.run({ value: 2 }, { idempotencyKey: key }); + expect(second.workflowRun.id).not.toBe(first.workflowRun.id); + }); + test("cancels workflow run via handle", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); diff --git a/packages/openworkflow/client.ts b/packages/openworkflow/client.ts index 6a80e902..9521721e 100644 --- a/packages/openworkflow/client.ts +++ b/packages/openworkflow/client.ts @@ -104,7 +104,7 @@ export class OpenWorkflow { const workflowRun = await this.backend.createWorkflowRun({ workflowName: spec.name, version: spec.version ?? null, - idempotencyKey: null, + idempotencyKey: options?.idempotencyKey ?? null, config: {}, context: null, input: parsedInput ?? null, @@ -207,6 +207,11 @@ export interface WorkflowRunOptions { * it will be marked as failed. */ deadlineAt?: Date; + /** + * Prevent duplicate workflow run creation for the same workflow and key. + * Reusing the same key returns the existing run for up to 24 hours. + */ + idempotencyKey?: string; } /** diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 71d4a329..cb597e30 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -1,5 +1,6 @@ import { DEFAULT_NAMESPACE_ID, + DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS, Backend, CancelWorkflowRunParams, ClaimWorkflowRunParams, @@ -92,7 +93,46 @@ export class BackendPostgres implements Backend { async createWorkflowRun( params: CreateWorkflowRunParams, ): Promise { - const [workflowRun] = await this.pg` + if (params.idempotencyKey === null) { + return await this.insertWorkflowRun(this.pg, params); + } + + const { workflowName, idempotencyKey } = params; + const lockScope = JSON.stringify({ + namespaceId: this.namespaceId, + workflowName, + idempotencyKey, + }); + + return await this.pg.begin(async (_tx) => { + const pgTx = _tx as unknown as Postgres; + + /* eslint-disable @cspell/spellchecker */ + await pgTx.unsafe( + "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::bigint))", + [lockScope], + ); + /* eslint-enable @cspell/spellchecker */ + + const existing = await this.getWorkflowRunByIdempotencyKey( + pgTx, + workflowName, + idempotencyKey, + new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS), + ); + if (existing) { + return existing; + } + + return await this.insertWorkflowRun(pgTx, params); + }); + } + + private async insertWorkflowRun( + pg: Postgres, + params: CreateWorkflowRunParams, + ): Promise { + const [workflowRun] = await pg` INSERT INTO "openworkflow"."workflow_runs" ( "namespace_id", "id", @@ -116,11 +156,11 @@ export class BackendPostgres implements Backend { ${params.version}, 'pending', ${params.idempotencyKey}, - ${this.pg.json(params.config)}, - ${this.pg.json(params.context)}, - ${this.pg.json(params.input)}, + ${pg.json(params.config)}, + ${pg.json(params.context)}, + ${pg.json(params.input)}, 0, - ${sqlDateDefaultNow(this.pg, params.availableAt)}, + ${sqlDateDefaultNow(pg, params.availableAt)}, ${params.deadlineAt}, date_trunc('milliseconds', NOW()), NOW() @@ -133,6 +173,26 @@ export class BackendPostgres implements Backend { return workflowRun; } + private async getWorkflowRunByIdempotencyKey( + pg: Postgres, + workflowName: string, + idempotencyKey: string, + createdAt: Date, + ): Promise { + const [workflowRun] = await pg` + SELECT * + FROM "openworkflow"."workflow_runs" + WHERE "namespace_id" = ${this.namespaceId} + AND "workflow_name" = ${workflowName} + AND "idempotency_key" = ${idempotencyKey} + AND "created_at" >= ${createdAt} + ORDER BY "created_at" ASC, "id" ASC + LIMIT 1 + `; + + return workflowRun ?? null; + } + async getWorkflowRun( params: GetWorkflowRunParams, ): Promise { diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts index 33c478c8..c0138658 100644 --- a/packages/openworkflow/sqlite/backend.test.ts +++ b/packages/openworkflow/sqlite/backend.test.ts @@ -1,5 +1,6 @@ import { testBackend } from "../backend.testsuite.js"; import { BackendSqlite } from "./backend.js"; +import { Database } from "./sqlite.js"; import assert from "node:assert"; import { randomUUID } from "node:crypto"; import { unlinkSync, existsSync } from "node:fs"; @@ -74,3 +75,83 @@ describe("BackendSqlite.connect errors", () => { ); }); }); + +describe("BackendSqlite.createWorkflowRun error handling", () => { + test("rolls back and rejects with the original error when keyed insert fails", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + const internalBackend = backend as unknown as { + insertWorkflowRun: (params: unknown) => unknown; + }; + const originalInsertWorkflowRun = internalBackend.insertWorkflowRun; + + internalBackend.insertWorkflowRun = () => { + throw new Error("insert failed"); + }; + + try { + await expect( + backend.createWorkflowRun({ + workflowName: "failing-workflow", + version: "v1", + idempotencyKey: randomUUID(), + config: {}, + context: null, + input: null, + availableAt: null, + deadlineAt: null, + }), + ).rejects.toThrow("insert failed"); + } finally { + internalBackend.insertWorkflowRun = originalInsertWorkflowRun; + await backend.stop(); + } + }); + + test("swallows rollback failures and wraps non-Error thrown values", async () => { + type BackendSqliteCtor = new ( + db: Database, + namespaceId: string, + ) => BackendSqlite; + + const calls: string[] = []; + const fakeDb: Database = { + exec(sql: string) { + calls.push(sql); + if (sql === "BEGIN IMMEDIATE") { + // eslint-disable-next-line @typescript-eslint/only-throw-error + throw "busy"; + } + if (sql === "ROLLBACK") throw new Error("cannot rollback"); + }, + prepare() { + throw new Error("prepare should not be called when BEGIN fails"); + }, + close() { + // no-op + }, + }; + + const backend = new (BackendSqlite as unknown as BackendSqliteCtor)( + fakeDb, + randomUUID(), + ); + + await expect( + backend.createWorkflowRun({ + workflowName: "failing-workflow", + version: "v1", + idempotencyKey: randomUUID(), + config: {}, + context: null, + input: null, + availableAt: null, + deadlineAt: null, + }), + ).rejects.toThrow("busy"); + + expect(calls).toEqual(["BEGIN IMMEDIATE", "ROLLBACK"]); + await backend.stop(); + }); +}); diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index bcb845dd..e6a333fb 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -1,5 +1,6 @@ import { DEFAULT_NAMESPACE_ID, + DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS, Backend, CancelWorkflowRunParams, ClaimWorkflowRunParams, @@ -91,9 +92,43 @@ export class BackendSqlite implements Backend { this.db.close(); } - async createWorkflowRun( - params: CreateWorkflowRunParams, - ): Promise { + createWorkflowRun(params: CreateWorkflowRunParams): Promise { + const { workflowName, idempotencyKey } = params; + + if (idempotencyKey === null) { + return Promise.resolve(this.insertWorkflowRun(params)); + } + + try { + this.db.exec("BEGIN IMMEDIATE"); + + const existing = this.getWorkflowRunByIdempotencyKey( + workflowName, + idempotencyKey, + new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS), + ); + if (existing) { + this.db.exec("COMMIT"); + return Promise.resolve(existing); + } + + const workflowRun = this.insertWorkflowRun(params); + this.db.exec("COMMIT"); + return Promise.resolve(workflowRun); + } catch (error) { + try { + this.db.exec("ROLLBACK"); + } catch { + // ignore + } + + return Promise.reject( + error instanceof Error ? error : new Error(String(error)), + ); + } + } + + private insertWorkflowRun(params: CreateWorkflowRunParams): WorkflowRun { const id = generateUUID(); const currentTime = now(); const availableAt = params.availableAt @@ -135,10 +170,44 @@ export class BackendSqlite implements Backend { currentTime, ); - const workflowRun = await this.getWorkflowRun({ workflowRunId: id }); - if (!workflowRun) throw new Error("Failed to create workflow run"); + const row = this.db + .prepare( + ` + SELECT * + FROM "workflow_runs" + WHERE "namespace_id" = ? AND "id" = ? + LIMIT 1 + `, + ) + .get(this.namespaceId, id) as WorkflowRunRow | undefined; + if (!row) throw new Error("Failed to create workflow run"); + + return rowToWorkflowRun(row); + } + + private getWorkflowRunByIdempotencyKey( + workflowName: string, + idempotencyKey: string, + createdAt: Date, + ): WorkflowRun | null { + const stmt = this.db.prepare(` + SELECT * + FROM "workflow_runs" + WHERE "namespace_id" = ? + AND "workflow_name" = ? + AND "idempotency_key" = ? + AND "created_at" >= ? + ORDER BY "created_at" ASC, "id" ASC + LIMIT 1 + `); - return workflowRun; + const row = stmt.get( + this.namespaceId, + workflowName, + idempotencyKey, + toISO(createdAt), + ) as WorkflowRunRow | undefined; + return row ? rowToWorkflowRun(row) : null; } getWorkflowRun(params: GetWorkflowRunParams): Promise {