Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions apps/dashboard/src/components/run-resume-action.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import {
AlertDialog,
AlertDialogAction,
AlertDialogCancel,
AlertDialogContent,
AlertDialogDescription,
AlertDialogFooter,
AlertDialogHeader,
AlertDialogTitle,
} from "@/components/ui/alert-dialog";
import { Button } from "@/components/ui/button";
import { resumeWorkflowRunServerFn } from "@/lib/api";
import { isRunResumableStatus } from "@/lib/status";
import type { WorkflowRunStatus } from "openworkflow/internal";
import { useState } from "react";

/** Props accepted by the RunResumeAction component. */
interface RunResumeActionProps {
/** ID of the workflow run; sent to the server function as `workflowRunId`. */
runId: string;
/** Current run status; the component renders nothing unless it is resumable. */
status: WorkflowRunStatus;
/**
 * Optional callback awaited after the resume request succeeds and before the
 * dialog is closed. May be synchronous or return a promise.
 */
onResumed?: (() => Promise<void>) | (() => void);
}

/**
 * Turn an arbitrary thrown value into text that can be shown to the user.
 * @param error - The value caught from a failed resume attempt
 * @returns The `Error`'s message when it has a non-empty one, otherwise a
 * generic fallback string
 */
function getErrorMessage(error: unknown): string {
  const fallback = "Unable to resume workflow run";

  // Anything that is not an Error instance carries no trustworthy message.
  if (!(error instanceof Error)) {
    return fallback;
  }

  return error.message ? error.message : fallback;
}

/**
 * "Resume Run" button with a confirmation dialog for a workflow run.
 *
 * Renders nothing unless `status` is resumable. Confirming calls
 * `resumeWorkflowRunServerFn` for `runId`, awaits `onResumed` (if provided),
 * and then closes the dialog. If either step throws, the dialog stays open and
 * shows the error message so the user can retry or cancel.
 * @param props - See RunResumeActionProps
 * @returns The button and dialog, or `null` when the run cannot be resumed
 */
export function RunResumeAction({
  runId,
  status,
  onResumed,
}: RunResumeActionProps) {
  const [isOpen, setIsOpen] = useState(false);
  const [isResuming, setIsResuming] = useState(false);
  const [error, setError] = useState<string | null>(null);

  // The hooks above must run on every render, so the early exit comes after.
  if (!isRunResumableStatus(status)) {
    return null;
  }

  async function resumeRun() {
    setIsResuming(true);
    setError(null);

    try {
      await resumeWorkflowRunServerFn({
        data: {
          workflowRunId: runId,
        },
      });
      await onResumed?.();
      setIsOpen(false);
    } catch (caughtError) {
      setError(getErrorMessage(caughtError));
    } finally {
      setIsResuming(false);
    }
  }

  return (
    <AlertDialog
      open={isOpen}
      onOpenChange={(nextOpen) => {
        // Cancel is disabled while a resume is in flight; apply the same rule
        // to every other dismissal path (e.g. Escape) so the pending state and
        // any resulting error are not hidden from the user.
        if (!nextOpen && isResuming) {
          return;
        }

        setIsOpen(nextOpen);
        if (!nextOpen) {
          setError(null);
        }
      }}
    >
      <Button
        type="button"
        variant="default"
        onClick={() => {
          setIsOpen(true);
        }}
        disabled={isResuming}
      >
        Resume Run
      </Button>

      <AlertDialogContent>
        <AlertDialogHeader>
          <AlertDialogTitle>Resume this failed run?</AlertDialogTitle>
          <AlertDialogDescription>
            Completed steps stay cached and won't re-run. The failing step will
            be retried with a fresh retry budget. Previous failed attempts will
            be discarded.
          </AlertDialogDescription>
        </AlertDialogHeader>

        {error && <p className="text-destructive text-xs">{error}</p>}

        <AlertDialogFooter>
          <AlertDialogCancel disabled={isResuming}>Cancel</AlertDialogCancel>
          <AlertDialogAction
            onClick={(event) => {
              // Keep the dialog open until the request settles; resumeRun()
              // closes it on success and leaves it open to show the error on
              // failure.
              // NOTE(review): this matters when AlertDialogAction wraps a
              // primitive that closes the dialog on click unless the event is
              // default-prevented (as Radix's AlertDialog.Action does) —
              // confirm against @/components/ui/alert-dialog.
              event.preventDefault();
              void resumeRun();
            }}
            disabled={isResuming}
          >
            {isResuming ? "Resuming..." : "Resume Run"}
          </AlertDialogAction>
        </AlertDialogFooter>
      </AlertDialogContent>
    </AlertDialog>
  );
}
12 changes: 12 additions & 0 deletions apps/dashboard/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ export const cancelWorkflowRunServerFn = createServerFn({ method: "POST" })
return backend.cancelWorkflowRun({ workflowRunId: data.workflowRunId });
});

/**
 * Server function (POST) that resumes a failed workflow run by ID.
 *
 * The run is flipped back to `pending` and its failed step attempts are
 * dropped, so the failing step starts over with a fresh retry budget while
 * completed steps stay cached.
 */
export const resumeWorkflowRunServerFn = createServerFn({ method: "POST" })
  .inputValidator(z.object({ workflowRunId: z.string() }))
  .handler(async ({ data }): Promise<WorkflowRun> => {
    const { workflowRunId } = data;
    const backend = await getBackend();
    return backend.resumeWorkflowRun({ workflowRunId });
  });

/**
* List step attempts for a workflow run.
*/
Expand Down
9 changes: 9 additions & 0 deletions apps/dashboard/src/lib/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ const CANCELABLE_RUN_STATUSES: ReadonlySet<WorkflowRunStatus> = new Set([
"sleeping",
]);

/** Statuses for which the dashboard offers the resume action. */
const RESUMABLE_RUN_STATUSES: ReadonlySet<WorkflowRunStatus> =
  new Set<WorkflowRunStatus>(["failed"]);

// Alias for the `pending` entry of STATUS_CONFIG.
// NOTE(review): presumably what status lookups fall back to for unrecognized
// statuses — confirm against getRunStatusConfig.
const fallbackStatusConfig = STATUS_CONFIG.pending;

export function getRunStatusConfig(status: string): StatusConfig {
Expand Down Expand Up @@ -198,3 +203,7 @@ export function getStatusStatIconClass(status: string): string {
/**
 * Report whether a status string is a member of CANCELABLE_RUN_STATUSES.
 * @param status - Status string to test
 * @returns `true` when the status is in the cancelable set, otherwise `false`
 */
export function isRunCancelableStatus(status: string): boolean {
  // The cast only satisfies the set's element type; unknown strings are
  // simply not found.
  const candidate = status as WorkflowRunStatus;
  return CANCELABLE_RUN_STATUSES.has(candidate);
}

/**
 * Report whether a status string is a member of RESUMABLE_RUN_STATUSES.
 * @param status - Status string to test
 * @returns `true` when the status is in the resumable set, otherwise `false`
 */
export function isRunResumableStatus(status: string): boolean {
  // The cast only satisfies the set's element type; unknown strings are
  // simply not found.
  const candidate = status as WorkflowRunStatus;
  return RESUMABLE_RUN_STATUSES.has(candidate);
}
10 changes: 9 additions & 1 deletion apps/dashboard/src/routes/runs/$runId.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AppLayout } from "@/components/app-layout";
import { RunCancelAction } from "@/components/run-cancel-action";
import { RunResumeAction } from "@/components/run-resume-action";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
import { Card } from "@/components/ui/card";
Expand Down Expand Up @@ -303,7 +304,14 @@ function RunDetailsPage() {
/>
)}
</div>
<div className="sm:shrink-0">
<div className="flex gap-2 sm:shrink-0">
<RunResumeAction
runId={run.id}
status={run.status}
onResumed={async () => {
await router.invalidate();
}}
/>
Comment thread
vudc marked this conversation as resolved.
<RunCancelAction
runId={run.id}
status={run.status}
Expand Down
32 changes: 32 additions & 0 deletions openworkflow/flaky-payment.run.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { backend, ow } from "./client.js";
import { flakyPayment } from "./flaky-payment.js";

// Demo driver: enqueue one flaky-payment run, wait for its first terminal
// state, and print instructions for resuming it from the dashboard.
console.log("Running flaky-payment workflow...");
console.log("Expected: 3 failed attempts -> workflow marked failed.");
console.log(
  "Then open http://localhost:3000, click 'Resume Run', and watch it complete.\n",
);

// try/finally so the backend is stopped even when enqueueing the run throws.
try {
  const handle = await ow.runWorkflow(flakyPayment.spec, {
    cartId: "cart_demo_1",
    amountCents: 4200,
  });

  console.log(`Run id: ${handle.workflowRun.id}`);
  console.log("Waiting for first terminal state...");

  try {
    const result = await handle.result();
    console.log(`Workflow completed: ${JSON.stringify(result, null, 2)}`);
  } catch (error) {
    // A rejection here is the expected outcome of the first pass.
    console.log("\nWorkflow failed (as expected on first pass):");
    console.log(error instanceof Error ? error.message : String(error));
    console.log(
      "\nNow click 'Resume Run' on the run detail page in the dashboard.",
    );
    console.log(
      "Leave the worker running so the in-memory attempt counter persists.",
    );
  }
} finally {
  await backend.stop();
}
95 changes: 95 additions & 0 deletions openworkflow/flaky-payment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { defineWorkflow } from "openworkflow";

/** Input for the flaky-payment demo workflow. */
interface FlakyPaymentInput {
/** Cart identifier; embedded in the generated authorization and receipt IDs. */
cartId: string;
/** Amount to charge; the "validate-cart" step rejects values that are not positive. */
amountCents: number;
}

/** Result returned by the flaky-payment demo workflow once it completes. */
interface FlakyPaymentOutput {
/** Echo of the input cart ID. */
cartId: string;
/** Authorization ID produced by the "reserve-funds" step. */
authorizationId: string;
/** Receipt ID produced by the "confirm-payment" step. */
receiptId: string;
/** Value of the reserve-funds attempt counter when the step finally succeeded. */
attemptsForReserve: number;
}

/** Retry budget for "reserve-funds"; also the number of simulated failures per run. */
const RESERVE_MAX_ATTEMPTS = 3;

// In-memory "reserve-funds" attempt counters, keyed by workflow run id. Each
// run gets its own counter so that every run (not just the first one in this
// worker process) fails RESERVE_MAX_ATTEMPTS times before succeeding, and
// concurrent runs do not affect one another. A counter survives the
// failure-then-resume cycle only while the worker process stays alive.
const reserveAttemptsByRun = new Map<string, number>();

/**
 * Demo workflow for the Resume feature.
 *
 * Flow:
 * 1. First run: "reserve-funds" throws on every attempt; the step's retry
 *    budget (RESERVE_MAX_ATTEMPTS) is exhausted, so the workflow run ends in
 *    `failed`. The downstream steps never run.
 * 2. Click "Resume Run" in the dashboard (or call `ow.resumeWorkflowRun(id)`).
 *    The failed step_attempt rows are dropped and the run is requeued.
 * 3. On the next worker tick, "validate-cart" is served from the cache (not
 *    re-executed); "reserve-funds" runs once more, this run's counter is now
 *    past RESERVE_MAX_ATTEMPTS so it returns successfully, and
 *    "confirm-payment" plus "send-receipt" proceed to completion.
 *
 * Note: this relies on the run's attempt counter persisting in the worker
 * process. If you restart the worker between the failure and the resume, the
 * counter resets, so resume will fail again and you'll need to resume once
 * more.
 */
export const flakyPayment = defineWorkflow<FlakyPaymentInput, FlakyPaymentOutput>(
  { name: "flaky-payment" },
  async ({ input, step, run }) => {
    console.log(`[run ${run.id}] flaky-payment for cart ${input.cartId}`);

    await step.run({ name: "validate-cart" }, () => {
      if (input.amountCents <= 0) {
        throw new Error("amountCents must be positive");
      }
    });

    const { authorizationId, attempts } = await step.run(
      {
        name: "reserve-funds",
        retryPolicy: {
          maximumAttempts: RESERVE_MAX_ATTEMPTS,
          initialInterval: "500ms",
        },
      },
      () => {
        const attempt = (reserveAttemptsByRun.get(run.id) ?? 0) + 1;
        reserveAttemptsByRun.set(run.id, attempt);
        console.log(`reserve-funds attempt ${String(attempt)}`);

        // Fail for the whole initial retry budget; only an attempt made after
        // a resume can get past this check.
        if (attempt <= RESERVE_MAX_ATTEMPTS) {
          throw new Error(
            `simulated upstream 503 (attempt ${String(attempt)})`,
          );
        }

        // The step has succeeded for this run, so its counter is no longer
        // needed; drop it to keep the map from growing.
        reserveAttemptsByRun.delete(run.id);

        console.log(
          `reserve-funds recovered on attempt ${String(attempt)} (after resume)`,
        );
        return {
          authorizationId: `auth_${input.cartId}_${String(Date.now())}`,
          attempts: attempt,
        };
      },
    );

    const receiptId = await step.run({ name: "confirm-payment" }, () => {
      console.log(`confirming with ${authorizationId}`);
      return `rcpt_${input.cartId}`;
    });

    await step.run({ name: "send-receipt" }, () => {
      console.log(`receipt ${receiptId} mailed`);
    });

    return {
      cartId: input.cartId,
      authorizationId,
      receiptId,
      attemptsForReserve: attempts,
    };
  },
);
17 changes: 17 additions & 0 deletions packages/openworkflow/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,23 @@ export class OpenWorkflow {
await this.backend.cancelWorkflowRun({ workflowRunId });
}

/**
 * Resume a workflow run that ended in `failed`.
 *
 * The run is put back into `pending` so the next worker tick picks it up.
 * Steps that already completed are served from history instead of being
 * re-executed, and failed step attempts are discarded so the failing step
 * begins with a fresh retry budget.
 * @param workflowRunId - The ID of the failed workflow run to resume
 * @returns The updated workflow run
 * @throws {Error} If the run does not exist or is not in `failed` status
 * @example
 * ```ts
 * await ow.resumeWorkflowRun("123");
 * ```
 */
async resumeWorkflowRun(workflowRunId: string): Promise<WorkflowRun> {
  const resumedRun = await this.backend.resumeWorkflowRun({ workflowRunId });
  return resumedRun;
}

/**
* Send a signal to all workflows currently waiting on the given signal
* string. If no workflow is waiting, the signal is silently dropped.
Expand Down
7 changes: 7 additions & 0 deletions packages/openworkflow/core/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ export interface Backend {
cancelWorkflowRun(
params: Readonly<CancelWorkflowRunParams>,
): Promise<WorkflowRun>;
resumeWorkflowRun(
params: Readonly<ResumeWorkflowRunParams>,
): Promise<WorkflowRun>;

// Step Attempts
createStepAttempt(
Expand Down Expand Up @@ -140,6 +143,10 @@ export interface CancelWorkflowRunParams {
workflowRunId: string;
}

/** Parameters for Backend.resumeWorkflowRun. */
export interface ResumeWorkflowRunParams {
/** ID of the workflow run to resume. */
workflowRunId: string;
}

export interface CreateStepAttemptParams {
workflowRunId: string;
workerId: string;
Expand Down
24 changes: 24 additions & 0 deletions packages/openworkflow/core/workflow-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,30 @@ export function resolveCancelWorkflowRunConflict(
throw new Error("Failed to cancel workflow run");
}

/**
 * Explain why a resumeWorkflowRun UPDATE touched zero rows. Because that
 * UPDATE only matches rows with `status = 'failed'`, no affected rows means
 * the run is either missing or not in a resumable state.
 * @param workflowRunId - ID of the workflow run (used in error messages)
 * @param existing - Current workflow run, or null if not found
 * @returns Never; always throws describing why the resume is impossible
 * @throws {Error} If the run does not exist or is not in `failed` status
 */
export function resolveResumeWorkflowRunConflict(
  workflowRunId: string,
  existing: Readonly<WorkflowRun> | null,
): never {
  const reason = existing
    ? `Cannot resume workflow run ${workflowRunId} with status ${existing.status}; only failed runs can be resumed`
    : `Workflow run ${workflowRunId} does not exist`;

  // eslint-disable-next-line functional/no-throw-statements
  throw new Error(reason);
}

/**
* WorkflowRun represents a single execution instance of a workflow.
*/
Expand Down
Loading