diff --git a/apps/dashboard/src/components/run-resume-action.tsx b/apps/dashboard/src/components/run-resume-action.tsx new file mode 100644 index 00000000..452d4c20 --- /dev/null +++ b/apps/dashboard/src/components/run-resume-action.tsx @@ -0,0 +1,110 @@ +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, +} from "@/components/ui/alert-dialog"; +import { Button } from "@/components/ui/button"; +import { resumeWorkflowRunServerFn } from "@/lib/api"; +import { isRunResumableStatus } from "@/lib/status"; +import type { WorkflowRunStatus } from "openworkflow/internal"; +import { useState } from "react"; + +interface RunResumeActionProps { + runId: string; + status: WorkflowRunStatus; + onResumed?: (() => Promise) | (() => void); +} + +function getErrorMessage(error: unknown): string { + if (error instanceof Error && error.message) { + return error.message; + } + + return "Unable to resume workflow run"; +} + +export function RunResumeAction({ + runId, + status, + onResumed, +}: RunResumeActionProps) { + const [isOpen, setIsOpen] = useState(false); + const [isResuming, setIsResuming] = useState(false); + const [error, setError] = useState(null); + + if (!isRunResumableStatus(status)) { + return null; + } + + async function resumeRun() { + setIsResuming(true); + setError(null); + + try { + await resumeWorkflowRunServerFn({ + data: { + workflowRunId: runId, + }, + }); + await onResumed?.(); + setIsOpen(false); + } catch (caughtError) { + setError(getErrorMessage(caughtError)); + } finally { + setIsResuming(false); + } + } + + return ( + { + setIsOpen(nextOpen); + if (!nextOpen) { + setError(null); + } + }} + > + + + + + Resume this failed run? + + Completed steps stay cached and won't re-run. The failing step will + be retried with a fresh retry budget. Previous failed attempts will + be discarded. + + + + {error &&

{error}

} + + + Cancel + { + void resumeRun(); + }} + disabled={isResuming} + > + {isResuming ? "Resuming..." : "Resume Run"} + + +
+
+ ); +} diff --git a/apps/dashboard/src/lib/api.ts b/apps/dashboard/src/lib/api.ts index 41ff0e86..ab783c79 100644 --- a/apps/dashboard/src/lib/api.ts +++ b/apps/dashboard/src/lib/api.ts @@ -90,6 +90,18 @@ export const cancelWorkflowRunServerFn = createServerFn({ method: "POST" }) return backend.cancelWorkflowRun({ workflowRunId: data.workflowRunId }); }); +/** + * Resume a failed workflow run by ID. Flips the run back to `pending` and + * drops failed step attempts so the failing step starts with a fresh retry + * budget; completed steps stay cached. + */ +export const resumeWorkflowRunServerFn = createServerFn({ method: "POST" }) + .inputValidator(z.object({ workflowRunId: z.string() })) + .handler(async ({ data }): Promise => { + const backend = await getBackend(); + return backend.resumeWorkflowRun({ workflowRunId: data.workflowRunId }); + }); + /** * List step attempts for a workflow run. */ diff --git a/apps/dashboard/src/lib/status.ts b/apps/dashboard/src/lib/status.ts index af762394..be95d797 100644 --- a/apps/dashboard/src/lib/status.ts +++ b/apps/dashboard/src/lib/status.ts @@ -169,6 +169,11 @@ const CANCELABLE_RUN_STATUSES: ReadonlySet = new Set([ "sleeping", ]); +/** Run statuses that can be resumed from the dashboard. */ +const RESUMABLE_RUN_STATUSES: ReadonlySet = new Set([ + "failed", +]); + const fallbackStatusConfig = STATUS_CONFIG.pending; export function getRunStatusConfig(status: string): StatusConfig { @@ -198,3 +203,7 @@ export function getStatusStatIconClass(status: string): string { export function isRunCancelableStatus(status: string): boolean { return CANCELABLE_RUN_STATUSES.has(status as WorkflowRunStatus); } + +export function isRunResumableStatus(status: string): boolean { + return RESUMABLE_RUN_STATUSES.has(status as WorkflowRunStatus); +} diff --git a/apps/dashboard/src/routes/runs/$runId.tsx b/apps/dashboard/src/routes/runs/$runId.tsx index 10beee0f..43d8a558 100644 --- a/apps/dashboard/src/routes/runs/$runId.tsx +++ b/apps/dashboard/src/routes/runs/$runId.tsx @@ -1,5 +1,6 @@ import { AppLayout } from "@/components/app-layout"; import { RunCancelAction } from "@/components/run-cancel-action"; +import { RunResumeAction } from "@/components/run-resume-action"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { Card } from "@/components/ui/card"; @@ -303,7 +304,14 @@ function RunDetailsPage() { /> )} -
+
+ { + await router.invalidate(); + }} + /> workflow marked failed."); +console.log( + "Then open http://localhost:3000, click 'Resume Run', and watch it complete.\n", +); + +const handle = await ow.runWorkflow(flakyPayment.spec, { + cartId: "cart_demo_1", + amountCents: 4200, +}); + +console.log(`Run id: ${handle.workflowRun.id}`); +console.log("Waiting for first terminal state..."); + +try { + const result = await handle.result(); + console.log(`Workflow completed: ${JSON.stringify(result, null, 2)}`); +} catch (error) { + console.log("\nWorkflow failed (as expected on first pass):"); + console.log(error instanceof Error ? error.message : String(error)); + console.log( + "\nNow click 'Resume Run' on the run detail page in the dashboard.", + ); + console.log( + "Leave the worker running so the in-memory attempt counter persists.", + ); +} + +await backend.stop(); diff --git a/openworkflow/flaky-payment.ts b/openworkflow/flaky-payment.ts new file mode 100644 index 00000000..d4d719ec --- /dev/null +++ b/openworkflow/flaky-payment.ts @@ -0,0 +1,95 @@ +import { defineWorkflow } from "openworkflow"; + +interface FlakyPaymentInput { + cartId: string; + amountCents: number; +} + +interface FlakyPaymentOutput { + cartId: string; + authorizationId: string; + receiptId: string; + attemptsForReserve: number; +} + +const RESERVE_MAX_ATTEMPTS = 3; + +// Module-scoped counter. Persists across the failure-then-resume cycle as long +// as the worker process is alive, so the step succeeds on the first attempt +// after `ow.resumeWorkflowRun` is called. +let reserveAttempt = 0; + +/** + * Demo workflow for the Resume feature. + * + * Flow: + * 1. First run: "reserve-funds" throws on every attempt; the step's retry + * budget (RESERVE_MAX_ATTEMPTS) is exhausted, so the workflow run ends in + * `failed`. The downstream steps never run. + * 2. Click "Resume Run" in the dashboard (or call `ow.resumeWorkflowRun(id)`). + * The failed step_attempt rows are dropped and the run is requeued. + * 3. On the next worker tick, "validate-cart" is served from the cache (not + * re-executed); "reserve-funds" runs once more, the counter is now past + * RESERVE_MAX_ATTEMPTS so it returns successfully, and "confirm-payment" + * plus "send-receipt" proceed to completion. + * + * Note: this relies on `reserveAttempt` persisting in the worker process. If + * you restart the worker between the failure and the resume, the counter + * resets, so resume will fail again and you'll need to resume once more. + */ +export const flakyPayment = defineWorkflow( + { name: "flaky-payment" }, + async ({ input, step, run }) => { + console.log(`[run ${run.id}] flaky-payment for cart ${input.cartId}`); + + await step.run({ name: "validate-cart" }, () => { + if (input.amountCents <= 0) { + throw new Error("amountCents must be positive"); + } + }); + + const { authorizationId, attempts } = await step.run( + { + name: "reserve-funds", + retryPolicy: { + maximumAttempts: RESERVE_MAX_ATTEMPTS, + initialInterval: "500ms", + }, + }, + () => { + reserveAttempt++; + console.log(`reserve-funds attempt ${String(reserveAttempt)}`); + + if (reserveAttempt <= RESERVE_MAX_ATTEMPTS) { + throw new Error( + `simulated upstream 503 (attempt ${String(reserveAttempt)})`, + ); + } + + console.log( + `reserve-funds recovered on attempt ${String(reserveAttempt)} (after resume)`, + ); + return { + authorizationId: `auth_${input.cartId}_${String(Date.now())}`, + attempts: reserveAttempt, + }; + }, + ); + + const receiptId = await step.run({ name: "confirm-payment" }, () => { + console.log(`confirming with ${authorizationId}`); + return `rcpt_${input.cartId}`; + }); + + await step.run({ name: "send-receipt" }, () => { + console.log(`receipt ${receiptId} mailed`); + }); + + return { + cartId: input.cartId, + authorizationId, + receiptId, + attemptsForReserve: attempts, + }; + }, +); diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index e5f1f9fd..411f4814 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -175,6 +175,23 @@ export class OpenWorkflow { await this.backend.cancelWorkflowRun({ workflowRunId }); } + /** + * Resume a failed workflow run. The run's status flips back to `pending` + * so the next worker tick picks it up. Already-completed steps are served + * from history without re-executing; failed step attempts are discarded so + * the failing step starts with a fresh retry budget. + * @param workflowRunId - The ID of the failed workflow run to resume + * @returns The updated workflow run + * @throws {Error} If the run does not exist or is not in `failed` status + * @example + * ```ts + * await ow.resumeWorkflowRun("123"); + * ``` + */ + async resumeWorkflowRun(workflowRunId: string): Promise { + return await this.backend.resumeWorkflowRun({ workflowRunId }); + } + /** * Send a signal to all workflows currently waiting on the given signal * string. If no workflow is waiting, the signal is silently dropped. diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index 9edf79be..e2f93be4 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -47,6 +47,9 @@ export interface Backend { cancelWorkflowRun( params: Readonly, ): Promise; + resumeWorkflowRun( + params: Readonly, + ): Promise; // Step Attempts createStepAttempt( @@ -140,6 +143,10 @@ export interface CancelWorkflowRunParams { workflowRunId: string; } +export interface ResumeWorkflowRunParams { + workflowRunId: string; +} + export interface CreateStepAttemptParams { workflowRunId: string; workerId: string; diff --git a/packages/openworkflow/core/workflow-run.ts b/packages/openworkflow/core/workflow-run.ts index a3ffeb8f..5f45ef08 100644 --- a/packages/openworkflow/core/workflow-run.ts +++ b/packages/openworkflow/core/workflow-run.ts @@ -63,6 +63,30 @@ export function resolveCancelWorkflowRunConflict( throw new Error("Failed to cancel workflow run"); } +/** + * Resolve the outcome when a resumeWorkflowRun UPDATE affected no rows. The + * UPDATE is gated on `status = 'failed'`, so a zero-row outcome means either + * the run doesn't exist or it isn't in a resumable state. + * @param workflowRunId - ID of the workflow run (used in error messages) + * @param existing - Current workflow run, or null if not found + * @returns Never; always throws describing why the resume is impossible + * @throws {Error} If the run does not exist or is not in `failed` status + */ +export function resolveResumeWorkflowRunConflict( + workflowRunId: string, + existing: Readonly | null, +): never { + if (!existing) { + // eslint-disable-next-line functional/no-throw-statements + throw new Error(`Workflow run ${workflowRunId} does not exist`); + } + + // eslint-disable-next-line functional/no-throw-statements + throw new Error( + `Cannot resume workflow run ${workflowRunId} with status ${existing.status}; only failed runs can be resumed`, + ); +} + /** * WorkflowRun represents a single execution instance of a workflow. */ diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index cf8188f0..1188bc63 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -5,6 +5,7 @@ import { Backend, WorkflowRunCounts, CancelWorkflowRunParams, + ResumeWorkflowRunParams, ClaimWorkflowRunParams, CreateStepAttemptParams, CreateWorkflowRunParams, @@ -37,6 +38,7 @@ import { StepAttempt } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate } from "../core/workflow-definition.js"; import { resolveCancelWorkflowRunConflict, + resolveResumeWorkflowRunConflict, WorkflowRun, } from "../core/workflow-run.js"; import { @@ -721,6 +723,50 @@ export class BackendPostgres implements Backend { return updated; } + async resumeWorkflowRun( + params: ResumeWorkflowRunParams, + ): Promise { + return await this.pg.begin(async (sql): Promise => { + const tx = sql as unknown as Postgres; + const workflowRunsTable = this.workflowRunsTable(tx); + const stepAttemptsTable = this.stepAttemptsTable(tx); + + const [updated] = await tx` + UPDATE ${workflowRunsTable} + SET + "status" = 'pending', + "worker_id" = NULL, + "error" = NULL, + "finished_at" = NULL, + "available_at" = NOW(), + "updated_at" = NOW() + WHERE "namespace_id" = ${this.namespaceId} + AND "id" = ${params.workflowRunId} + AND "status" = 'failed' + RETURNING * + `; + + if (!updated) { + const existing = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + resolveResumeWorkflowRunConflict(params.workflowRunId, existing); + } + + // Drop the prior failed attempts so the next worker pass starts the + // failed step with a fresh retry budget and the existing completed + // attempts remain in cache (replay skips re-execution). + await tx` + DELETE FROM ${stepAttemptsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "workflow_run_id" = ${params.workflowRunId} + AND "status" = 'failed' + `; + + return updated; + }); + } + private async wakeParentWorkflowRun( childWorkflowRun: Readonly, ): Promise { diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 182d42e5..e727d75d 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -4,6 +4,7 @@ import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS, Backend, CancelWorkflowRunParams, + ResumeWorkflowRunParams, ClaimWorkflowRunParams, CreateStepAttemptParams, CreateWorkflowRunParams, @@ -37,6 +38,7 @@ import { StepAttempt } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate } from "../core/workflow-definition.js"; import { resolveCancelWorkflowRunConflict, + resolveResumeWorkflowRunConflict, WorkflowRun, } from "../core/workflow-run.js"; import { @@ -746,6 +748,71 @@ export class BackendSqlite implements Backend { return updated; } + async resumeWorkflowRun( + params: ResumeWorkflowRunParams, + ): Promise { + const currentTime = now(); + + try { + this.db.exec("BEGIN IMMEDIATE"); + + const updateStmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "status" = 'pending', + "worker_id" = NULL, + "error" = NULL, + "finished_at" = NULL, + "available_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" = 'failed' + `); + + const updateResult = updateStmt.run( + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + ); + + if (updateResult.changes === 0) { + this.db.exec("ROLLBACK"); + const existing = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + resolveResumeWorkflowRunConflict(params.workflowRunId, existing); + } + + this.db + .prepare( + ` + DELETE FROM "step_attempts" + WHERE "namespace_id" = ? + AND "workflow_run_id" = ? + AND "status" = 'failed' + `, + ) + .run(this.namespaceId, params.workflowRunId); + + this.db.exec("COMMIT"); + } catch (error) { + try { + this.db.exec("ROLLBACK"); + } catch { + // ignore + } + throw error; + } + + const updated = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + requireRow(updated, "resume workflow run"); + return updated; + } + /** * Return positional placeholders for {@link RUNNING_WORKFLOW_RUN_OWNED_WHERE} * in the order the fragment expects: namespace, run id, worker id. diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index ef8dce19..3299be90 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -2993,6 +2993,119 @@ describe("StepExecutor", () => { expect(status).toBe("failed"); sendSignalSpy.mockRestore(); }); + + test("resumeWorkflowRun re-runs the failed step without re-executing completed steps", async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); + + let validateRuns = 0; + let flakyRuns = 0; + let shouldFail = true; + + const workflow = client.defineWorkflow( + { name: `resume-from-failure-${randomUUID()}` }, + async ({ step }) => { + const validated = await step.run({ name: "validate" }, () => { + validateRuns++; + return "ok"; + }); + + const flaky = await step.run( + { name: "flaky", retryPolicy: { maximumAttempts: 2 } }, + () => { + flakyRuns++; + if (shouldFail) { + throw new Error("simulated upstream failure"); + } + return "recovered"; + }, + ); + + return { validated, flaky }; + }, + ); + + const worker = client.newWorker({ concurrency: 1 }); + const handle = await workflow.run(); + + const failedStatus = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 40, + 25, + ); + expect(failedStatus).toBe("failed"); + expect(validateRuns).toBe(1); + expect(flakyRuns).toBe(2); + + const stepsBeforeResume = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const failedBefore = stepsBeforeResume.data.filter( + (s) => s.status === "failed", + ); + expect(failedBefore.length).toBe(2); + + shouldFail = false; + await backend.resumeWorkflowRun({ workflowRunId: handle.workflowRun.id }); + + const resumedRun = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(resumedRun?.status).toBe("pending"); + expect(resumedRun?.error).toBeNull(); + expect(resumedRun?.finishedAt).toBeNull(); + expect(resumedRun?.workerId).toBeNull(); + + const stepsAfterResume = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + expect( + stepsAfterResume.data.some((s) => s.status === "failed"), + ).toBe(false); + expect( + stepsAfterResume.data.some( + (s) => s.stepName === "validate" && s.status === "completed", + ), + ).toBe(true); + + const finalStatus = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 40, + 25, + ); + expect(finalStatus).toBe("completed"); + // "validate" was cached from the original run, not re-executed + expect(validateRuns).toBe(1); + // "flaky" ran 2 times on the original run + 1 on resume + expect(flakyRuns).toBe(3); + + const result = await handle.result(); + expect(result).toEqual({ validated: "ok", flaky: "recovered" }); + }); + + test("resumeWorkflowRun throws when the run is not in failed status", async () => { + const backend = await createTestBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `resume-invalid-${randomUUID()}` }, + async ({ step }) => { + return await step.run({ name: "noop" }, () => "ok"); + }, + ); + + const handle = await workflow.run(); + + await expect( + backend.resumeWorkflowRun({ workflowRunId: handle.workflowRun.id }), + ).rejects.toThrow(/Cannot resume workflow run.*pending/); + }); }); describe("executeWorkflow", () => {