diff --git a/cspell.config.jsonc b/cspell.config.jsonc index 8a177f2a..104141b8 100644 --- a/cspell.config.jsonc +++ b/cspell.config.jsonc @@ -40,6 +40,7 @@ "heartbeating", "idempotently", "llms", + "unaliased", "wakeups", // usernames & contributor names diff --git a/packages/cli/commands.ts b/packages/cli/commands.ts index db1439f1..00540ef7 100644 --- a/packages/cli/commands.ts +++ b/packages/cli/commands.ts @@ -82,11 +82,7 @@ export async function init(options: CommandOptions = {}): Promise { initialValue: false, }); - if (!shouldOverride || p.isCancel(shouldOverride)) { - p.cancel("Setup canceled."); - // eslint-disable-next-line unicorn/no-process-exit - process.exit(0); - } + if (!shouldOverride || p.isCancel(shouldOverride)) cancelSetup(); configFileToDelete = configFile; } @@ -113,11 +109,7 @@ export async function init(options: CommandOptions = {}): Promise { initialValue: "sqlite", }); - if (p.isCancel(backendChoice)) { - p.cancel("Setup canceled."); - // eslint-disable-next-line unicorn/no-process-exit - process.exit(0); - } + if (p.isCancel(backendChoice)) cancelSetup(); const spinner = p.spinner(); @@ -148,11 +140,7 @@ export async function init(options: CommandOptions = {}): Promise { initialValue: true, }); - if (p.isCancel(shouldSetup)) { - p.cancel("Setup canceled."); - // eslint-disable-next-line unicorn/no-process-exit - process.exit(0); - } + if (p.isCancel(shouldSetup)) cancelSetup(); if (!shouldSetup) { p.outro("Setup skipped."); @@ -460,6 +448,15 @@ export async function dashboard(options: DashboardOptions = {}): Promise { // ----------------------------------------------------------------------------- +/** + * Show a canceled-setup message and exit the process with status 0. + */ +function cancelSetup(): never { + p.cancel("Setup canceled."); + // eslint-disable-next-line unicorn/no-process-exit + process.exit(0); +} + /** * Get workflow directories from config. * @param config - The loaded config @@ -554,11 +551,8 @@ function warnAboutDuplicateWorkflows( ): void { const duplicates = findDuplicateWorkflows(workflows); for (const duplicate of duplicates) { - const versionStr = duplicate.version - ? ` (version: ${duplicate.version})` - : ""; consola.warn( - `Duplicate workflow detected: "${duplicate.name}"${versionStr}`, + `Duplicate workflow detected: ${formatWorkflowIdentity(duplicate.name, duplicate.version)}`, ); consola.warn( "Multiple files export a workflow with the same name and version.", @@ -891,25 +885,46 @@ function createConfigFile(configFileName: string): void { } /** - * Create hello-world runner file. - * @param runFileName - The runner filename to write + * Write a file under the `openworkflow/` project directory, skipping when a + * file with the same name already exists. Progress is surfaced via the clack + * spinner using the provided label (e.g. "client file"). + * @param label - Lowercase label describing the file (used in spinner text) + * @param fileName - Filename to write inside `openworkflow/` + * @param content - File contents to write when the file does not exist */ -function createRunFile(runFileName: string): void { +function writeWorkflowFileIfMissing( + label: string, + fileName: string, + content: string, +): void { const spinner = p.spinner(); const workflowsDir = path.join(process.cwd(), "openworkflow"); if (!existsSync(workflowsDir)) { mkdirSync(workflowsDir, { recursive: true }); } - const runDestPath = path.join(workflowsDir, runFileName); - if (existsSync(runDestPath)) { - spinner.start("Checking hello-world runner..."); - spinner.stop(`Hello-world runner already exists at ${runDestPath}`); + const destPath = path.join(workflowsDir, fileName); + if (existsSync(destPath)) { + spinner.start(`Checking ${label}...`); + const capitalized = label.charAt(0).toUpperCase() + label.slice(1); + spinner.stop(`${capitalized} already exists at ${destPath}`); return; } - spinner.start("Creating hello-world runner..."); - writeFileSync(runDestPath, HELLO_WORLD_RUNNER, "utf8"); - spinner.stop(`Created hello-world runner at ${runDestPath}`); + spinner.start(`Creating ${label}...`); + writeFileSync(destPath, content, "utf8"); + spinner.stop(`Created ${label} at ${destPath}`); +} + +/** + * Create hello-world runner file. + * @param runFileName - The runner filename to write + */ +function createRunFile(runFileName: string): void { + writeWorkflowFileIfMissing( + "hello-world runner", + runFileName, + HELLO_WORLD_RUNNER, + ); } /** @@ -921,22 +936,11 @@ function createClientFile( backendChoice: BackendChoice, clientFileName: string, ): void { - const spinner = p.spinner(); - const workflowsDir = path.join(process.cwd(), "openworkflow"); - if (!existsSync(workflowsDir)) { - mkdirSync(workflowsDir, { recursive: true }); - } - const clientDestPath = path.join(workflowsDir, clientFileName); - if (existsSync(clientDestPath)) { - spinner.start("Checking client file..."); - spinner.stop(`Client file already exists at ${clientDestPath}`); - return; - } - - spinner.start("Creating client file..."); - const clientTemplate = getClientTemplate(backendChoice); - writeFileSync(clientDestPath, clientTemplate, "utf8"); - spinner.stop(`Created client file at ${clientDestPath}`); + writeWorkflowFileIfMissing( + "client file", + clientFileName, + getClientTemplate(backendChoice), + ); } /** @@ -944,24 +948,10 @@ function createClientFile( * @param exampleWorkflowFileName - The example workflow filename to write */ function createExampleWorkflow(exampleWorkflowFileName: string): void { - const spinner = p.spinner(); - const workflowsDir = path.join(process.cwd(), "openworkflow"); - if (!existsSync(workflowsDir)) { - mkdirSync(workflowsDir, { recursive: true }); - } - const helloWorldDestPath = path.join(workflowsDir, exampleWorkflowFileName); - if (existsSync(helloWorldDestPath)) { - spinner.start("Checking example (hello-world) workflow..."); - spinner.stop( - `Example (hello-world) workflow already exists at ${helloWorldDestPath}`, - ); - return; - } - - spinner.start("Creating example (hello-world) workflow..."); - writeFileSync(helloWorldDestPath, HELLO_WORLD_WORKFLOW, "utf8"); - spinner.stop( - `Created example (hello-world) workflow at ${helloWorldDestPath}`, + writeWorkflowFileIfMissing( + "example (hello-world) workflow", + exampleWorkflowFileName, + HELLO_WORLD_WORKFLOW, ); } @@ -977,12 +967,9 @@ function updateGitignoreForSqlite(): void { const gitignorePath = path.join(process.cwd(), ".gitignore"); const spinner = p.spinner(); spinner.start("Updating .gitignore..."); - const result = ensureGitignoreEntry( - gitignorePath, - "openworkflow/backend.db*", - ); + const added = ensureGitignoreEntry(gitignorePath, "openworkflow/backend.db*"); spinner.stop( - result.added + added ? "Added openworkflow/backend.db* to .gitignore" : "openworkflow/backend.db* already in .gitignore", ); @@ -1020,44 +1007,43 @@ function addWorkerScriptToPackageJson(): void { } /** - * Ensure a specific entry exists in a .gitignore file. Creates the file if it - * doesn't exist, appends the entry if not present. - * @param gitignorePath - Path to the .gitignore file - * @param entry - The entry to add (e.g. "openworkflow/backend.db*") - * @returns Object indicating whether the entry was added or already existed + * Append a line to a file if no existing line matches. Creates the file if it + * doesn't exist. + * @param filePath - Path to the file + * @param line - Line to append (without a trailing newline) + * @param matchesExisting - Predicate that returns true when an existing line + * should be treated as already representing `line` + * @returns Whether the line was appended */ -function ensureGitignoreEntry( - gitignorePath: string, - entry: string, -): { added: boolean; created: boolean } { - const fileExists = existsSync(gitignorePath); - let content = ""; - - if (fileExists) { - content = readFileSync(gitignorePath, "utf8"); - } - - // check if entry already exists - const lines = content.split("\n"); - const hasEntry = lines.some((line) => line.trim() === entry); - - if (hasEntry) { - return { added: false, created: false }; - } +function appendLineIfMissing( + filePath: string, + line: string, + matchesExisting: (existing: string) => boolean, +): boolean { + const content = existsSync(filePath) ? readFileSync(filePath, "utf8") : ""; - // add entry to .gitignore - let newContent: string; - if (content === "") { - newContent = `${entry}\n`; - } else if (content.endsWith("\n")) { - newContent = `${content}${entry}\n`; - } else { - newContent = `${content}\n${entry}\n`; + if (content.split("\n").some((existing) => matchesExisting(existing))) { + return false; } - writeFileSync(gitignorePath, newContent, "utf8"); + const separator = content === "" || content.endsWith("\n") ? "" : "\n"; + writeFileSync(filePath, `${content}${separator}${line}\n`, "utf8"); + return true; +} - return { added: true, created: !fileExists }; +/** + * Ensure a specific entry exists in a .gitignore file. Creates the file if it + * doesn't exist, appends the entry if not present. + * @param gitignorePath - Path to the .gitignore file + * @param entry - The entry to add (e.g. "openworkflow/backend.db*") + * @returns Whether the entry was appended + */ +function ensureGitignoreEntry(gitignorePath: string, entry: string): boolean { + return appendLineIfMissing( + gitignorePath, + entry, + (line) => line.trim() === entry, + ); } /** @@ -1067,13 +1053,13 @@ function updateEnvForPostgres(): void { const envPath = path.join(process.cwd(), ".env"); const spinner = p.spinner(); spinner.start("Updating .env..."); - const result = ensureEnvEntry( + const added = ensureEnvEntry( envPath, "OPENWORKFLOW_POSTGRES_URL", "postgresql://user:password@localhost:5432/openworkflow", ); spinner.stop( - result.added + added ? "Added OPENWORKFLOW_POSTGRES_URL to .env" : "OPENWORKFLOW_POSTGRES_URL already in .env", ); @@ -1121,6 +1107,20 @@ function readPackageJsonForDoctor(): PackageJsonForDoctor | null { } } +/** + * Pick the script file extension for generated files based on whether the + * project uses TypeScript. + * @param packageJson - Parsed package.json (or null if missing) + * @returns ".ts" when TypeScript is a dependency, otherwise ".js" + */ +function getScriptExtension( + packageJson: Readonly | null, +): ".ts" | ".js" { + return packageJson && hasDependency(packageJson, "typescript") + ? ".ts" + : ".js"; +} + /** * Determine the config filename to write during init. * @param packageJson - Parsed package.json (or null if missing) @@ -1129,11 +1129,7 @@ function readPackageJsonForDoctor(): PackageJsonForDoctor | null { export function getConfigFileName( packageJson: Readonly | null, ): string { - if (packageJson && hasDependency(packageJson, "typescript")) { - return "openworkflow.config.ts"; - } - - return "openworkflow.config.js"; + return `openworkflow.config${getScriptExtension(packageJson)}`; } /** @@ -1144,10 +1140,7 @@ export function getConfigFileName( export function getExampleWorkflowFileName( packageJson: Readonly | null, ): string { - const configFileName = getConfigFileName(packageJson); - const extension = path.extname(configFileName) || ".js"; - - return `hello-world${extension}`; + return `hello-world${getScriptExtension(packageJson)}`; } /** @@ -1158,10 +1151,7 @@ export function getExampleWorkflowFileName( export function getRunFileName( packageJson: Readonly | null, ): string { - const configFileName = getConfigFileName(packageJson); - const extension = path.extname(configFileName) || ".js"; - - return `hello-world.run${extension}`; + return `hello-world.run${getScriptExtension(packageJson)}`; } /** @@ -1172,10 +1162,7 @@ export function getRunFileName( export function getClientFileName( packageJson: Readonly | null, ): string { - const configFileName = getConfigFileName(packageJson); - const extension = path.extname(configFileName) || ".js"; - - return `client${extension}`; + return `client${getScriptExtension(packageJson)}`; } /** @@ -1238,50 +1225,18 @@ function warnIfMissingTsconfig( } /** - * Ensure a specific environment variable exists in a .env file. Creates the file if it - * doesn't exist, appends the variable if not present. + * Ensure a specific environment variable exists in a .env file. Creates the + * file if it doesn't exist, appends the variable if not present. * @param envPath - Path to the .env file * @param key - The environment variable key (e.g. "OPENWORKFLOW_POSTGRES_URL") * @param value - The default value for the environment variable - * @returns Object indicating whether the entry was added or already existed + * @returns Whether the entry was appended */ -function ensureEnvEntry( - envPath: string, - key: string, - value: string, -): { added: boolean; created: boolean } { - const fileExists = existsSync(envPath); - let content = ""; - - if (fileExists) { - content = readFileSync(envPath, "utf8"); - } - - // check if key already exists (looking for KEY= at start of line) - const lines = content.split("\n"); - const hasKey = lines.some((line) => { +function ensureEnvEntry(envPath: string, key: string, value: string): boolean { + return appendLineIfMissing(envPath, `${key}=${value}`, (line) => { const trimmed = line.trim(); return trimmed.startsWith(`${key}=`) || trimmed.startsWith(`${key} =`); }); - - if (hasKey) { - return { added: false, created: false }; - } - - // add entry to .env - let newContent: string; - const envEntry = `${key}=${value}`; - if (content === "") { - newContent = `${envEntry}\n`; - } else if (content.endsWith("\n")) { - newContent = `${content}${envEntry}\n`; - } else { - newContent = `${content}\n${envEntry}\n`; - } - - writeFileSync(envPath, newContent, "utf8"); - - return { added: true, created: !fileExists }; } /** diff --git a/packages/openworkflow/core/cursor.test.ts b/packages/openworkflow/core/cursor.test.ts index 93b62dfb..d2a5b5cc 100644 --- a/packages/openworkflow/core/cursor.test.ts +++ b/packages/openworkflow/core/cursor.test.ts @@ -1,4 +1,10 @@ -import { decodeCursor, encodeCursor, type Cursor } from "./cursor.js"; +import { + buildPaginatedResponse, + decodeCursor, + decodeListCursor, + encodeCursor, + type Cursor, +} from "./cursor.js"; import { describe, expect, test } from "vitest"; describe("encodeCursor", () => { @@ -166,3 +172,128 @@ describe("encodeCursor / decodeCursor round-trip", () => { } }); }); + +describe("decodeListCursor", () => { + const cursor: Cursor = { + createdAt: new Date("2026-01-15T12:34:56.789Z"), + id: "abc123", + }; + + test("returns null when neither after nor before is set", () => { + expect(decodeListCursor({})).toBeNull(); + }); + + test("decodes the after cursor when only after is set", () => { + const decoded = decodeListCursor({ after: encodeCursor(cursor) }); + expect(decoded?.id).toBe(cursor.id); + expect(decoded?.createdAt.getTime()).toBe(cursor.createdAt.getTime()); + }); + + test("decodes the before cursor when only before is set", () => { + const decoded = decodeListCursor({ before: encodeCursor(cursor) }); + expect(decoded?.id).toBe(cursor.id); + expect(decoded?.createdAt.getTime()).toBe(cursor.createdAt.getTime()); + }); + + test("throws when both after and before are set", () => { + const afterCursor: Cursor = { + createdAt: new Date("2026-02-01T00:00:00.000Z"), + id: "after", + }; + const beforeCursor: Cursor = { + createdAt: new Date("2026-01-01T00:00:00.000Z"), + id: "before", + }; + expect(() => + decodeListCursor({ + after: encodeCursor(afterCursor), + before: encodeCursor(beforeCursor), + }), + ).toThrow("Cannot specify both 'after' and 'before' cursors"); + }); + + test("ignores empty-string after and before", () => { + expect(decodeListCursor({ after: "", before: "" })).toBeNull(); + }); +}); + +/** + * Build a fixture row satisfying Cursor with a distinguishing `value` so + * pagination tests can make strict structural assertions. + * @param i - Zero-based row index + * @returns Fixture row + */ +function makeRow(i: number): Cursor & { value: number } { + return { + createdAt: new Date( + `2026-01-${String(i + 1).padStart(2, "0")}T00:00:00.000Z`, + ), + id: `row-${String(i)}`, + value: i, + }; +} + +describe("buildPaginatedResponse", () => { + test("returns all rows when under the limit with no cursors", () => { + const rows = [makeRow(0), makeRow(1)]; + const response = buildPaginatedResponse(rows, 10, false, false); + expect(response.data).toEqual(rows); + expect(response.pagination).toEqual({ next: null, prev: null }); + }); + + test("trims the trailing overflow row and exposes a next cursor", () => { + const rows = [makeRow(0), makeRow(1), makeRow(2)]; + const response = buildPaginatedResponse(rows, 2, false, false); + expect(response.data).toEqual([makeRow(0), makeRow(1)]); + expect(response.pagination.next).toBe(encodeCursor(makeRow(1))); + expect(response.pagination.prev).toBeNull(); + }); + + test("exposes a prev cursor when hasAfter but no overflow", () => { + const rows = [makeRow(0), makeRow(1)]; + const response = buildPaginatedResponse(rows, 5, true, false); + expect(response.data).toEqual(rows); + expect(response.pagination.next).toBeNull(); + expect(response.pagination.prev).toBe(encodeCursor(makeRow(0))); + }); + + test("exposes both cursors on a middle page with overflow and hasAfter", () => { + const rows = [makeRow(0), makeRow(1), makeRow(2)]; + const response = buildPaginatedResponse(rows, 2, true, false); + expect(response.data).toEqual([makeRow(0), makeRow(1)]); + expect(response.pagination.next).toBe(encodeCursor(makeRow(1))); + expect(response.pagination.prev).toBe(encodeCursor(makeRow(0))); + }); + + test("reverses rows when hasBefore is true and always exposes next", () => { + const rows = [makeRow(2), makeRow(1), makeRow(0)]; + const response = buildPaginatedResponse(rows, 5, false, true); + expect(response.data).toEqual([makeRow(0), makeRow(1), makeRow(2)]); + expect(response.pagination.next).toBe(encodeCursor(makeRow(2))); + expect(response.pagination.prev).toBeNull(); + }); + + test("drops leading overflow row and exposes prev when hasBefore overflows", () => { + // Backends over-fetch `limit + 1` rows in reverse order so the extra row + // lands at index 0 after reversing. Dropping it yields the page-sized + // window, and the next-row's cursor becomes `prev` for a further jump back. + const rows = [makeRow(3), makeRow(2), makeRow(1), makeRow(0)]; + const response = buildPaginatedResponse(rows, 3, false, true); + expect(response.data).toEqual([makeRow(1), makeRow(2), makeRow(3)]); + expect(response.pagination.next).toBe(encodeCursor(makeRow(3))); + expect(response.pagination.prev).toBe(encodeCursor(makeRow(1))); + }); + + test("returns empty pagination cursors for an empty page", () => { + const response = buildPaginatedResponse([], 10, false, false); + expect(response.data).toEqual([]); + expect(response.pagination).toEqual({ next: null, prev: null }); + }); + + test("does not mutate the input rows array", () => { + const rows = [makeRow(0), makeRow(1), makeRow(2)]; + const snapshot = [...rows]; + buildPaginatedResponse(rows, 5, false, true); + expect(rows).toEqual(snapshot); + }); +}); diff --git a/packages/openworkflow/core/cursor.ts b/packages/openworkflow/core/cursor.ts index c5c20edc..11866a53 100644 --- a/packages/openworkflow/core/cursor.ts +++ b/packages/openworkflow/core/cursor.ts @@ -1,3 +1,10 @@ +import type { PaginatedResponse } from "./backend.js"; + +/** + * Default page size applied when a pagination request omits `limit`. + */ +export const DEFAULT_PAGINATION_PAGE_SIZE = 100; + /** * Cursor used for pagination. Requires created_at and id fields. Because JS * Date does not natively support microsecond precision dates, created_at should @@ -33,3 +40,80 @@ export function decodeCursor(cursor: string): Cursor { id: parsed.id, }; } + +/** + * Decode the active cursor from list pagination params. Backends derive sort + * direction from the raw `before`/`after` flags, so passing both would mix the + * decoded cursor with an inverted ordering — reject that combination instead. + * @param params - Pagination params + * @returns Decoded cursor, or null when neither side is set + * @throws {Error} When both `after` and `before` are set + */ +export function decodeListCursor( + params: Readonly<{ after?: string; before?: string }>, +): Cursor | null { + if (params.after && params.before) { + // eslint-disable-next-line functional/no-throw-statements + throw new Error("Cannot specify both 'after' and 'before' cursors"); + } + if (params.after) return decodeCursor(params.after); + if (params.before) return decodeCursor(params.before); + return null; +} + +/** + * Assemble a {@link PaginatedResponse} from an over-fetched row batch. Backends + * query `limit + 1` rows to determine whether a next/previous page exists, and + * this helper encapsulates the trim-and-reverse logic that decision requires. + * + * When `hasBefore` is true, the rows arrive in reverse order (ASC instead of + * the forward query's DESC), so the response is reversed and the leading extra + * row (if any) signals a previous page. Otherwise the trailing extra row (if + * any) signals a next page. + * @param rows - Rows fetched from the database (may exceed `limit` by one) + * @param limit - The caller-facing page size + * @param hasAfter - Whether the caller supplied an `after` cursor + * @param hasBefore - Whether the caller supplied a `before` cursor + * @returns Paginated response with next/prev cursors + */ +export function buildPaginatedResponse( + rows: readonly T[], + limit: number, + hasAfter: boolean, + hasBefore: boolean, +): PaginatedResponse { + const overflow = rows.length > limit; + const ordered = hasBefore ? rows.toReversed() : [...rows]; + const data = trimOverflow(ordered, overflow, hasBefore); + + const hasNext = hasBefore || overflow; + const hasPrev = hasBefore ? overflow : hasAfter; + + const lastItem = data.at(-1); + const firstItem = data[0]; + + return { + data, + pagination: { + next: hasNext && lastItem ? encodeCursor(lastItem) : null, + prev: hasPrev && firstItem ? encodeCursor(firstItem) : null, + }, + }; +} + +/** + * Drop the extra over-fetched row from the leading or trailing edge, leaving + * the page-sized window that callers should see. + * @param rows - Oriented rows (reversed when `hasBefore` is true) + * @param overflow - Whether the caller received more rows than the page size + * @param hasBefore - Whether the caller supplied a `before` cursor + * @returns Rows trimmed to the page size + */ +function trimOverflow( + rows: readonly T[], + overflow: boolean, + hasBefore: boolean, +): T[] { + if (!overflow) return [...rows]; + return hasBefore ? rows.slice(1) : rows.slice(0, -1); +} diff --git a/packages/openworkflow/core/error.ts b/packages/openworkflow/core/error.ts index 4fed6953..b667a87b 100644 --- a/packages/openworkflow/core/error.ts +++ b/packages/openworkflow/core/error.ts @@ -55,3 +55,18 @@ export function wrapError(message: string, error: unknown): Error { const { message: wrappedMessage } = serializeError(error); return new Error(`${message}: ${wrappedMessage}`, { cause: error }); } + +/** + * Assert a backend mutation returned a row, throwing `Failed to ${operation}` + * otherwise. + * @param row - The row returned by the backend (or undefined/null if none matched) + * @param operation - Suffix describing the attempted mutation + * @throws {Error} When the row is null or undefined + */ +export function requireRow( + row: T, + operation: string, +): asserts row is NonNullable { + // eslint-disable-next-line functional/no-throw-statements + if (!row) throw new Error(`Failed to ${operation}`); +} diff --git a/packages/openworkflow/core/workflow-definition.ts b/packages/openworkflow/core/workflow-definition.ts index 5818bab1..a9e6705a 100644 --- a/packages/openworkflow/core/workflow-definition.ts +++ b/packages/openworkflow/core/workflow-definition.ts @@ -166,37 +166,38 @@ export function computeFailedWorkflowRunUpdate( error: Readonly, now: Readonly, ): FailedWorkflowRunUpdate { - if (deadlineAt && now >= deadlineAt) { + /** + * Build the terminal "failed" update payload using the captured `now`. + * @param finalError - Error to record on the run + * @returns Failed run update payload + */ + function failed( + finalError: Readonly, + ): FailedWorkflowRunUpdate { return { status: "failed", availableAt: null, finishedAt: now, - error: { message: "Workflow run deadline exceeded" }, + error: finalError, }; } + if (deadlineAt && now >= deadlineAt) { + return failed({ message: "Workflow run deadline exceeded" }); + } + if ( retryPolicy.maximumAttempts > 0 && // 0 = unlimited attempts attempts >= retryPolicy.maximumAttempts ) { - return { - status: "failed", - availableAt: null, - finishedAt: now, - error, - }; + return failed(error); } const retryDelayMs = computeBackoffDelayMs(retryPolicy, attempts); const nextRetryAt = new Date(now.getTime() + retryDelayMs); if (deadlineAt && nextRetryAt >= deadlineAt) { - return { - status: "failed", - availableAt: null, - finishedAt: now, - error, - }; + return failed(error); } return { diff --git a/packages/openworkflow/core/workflow-run.test.ts b/packages/openworkflow/core/workflow-run.test.ts index 8b6765b3..3fa5da9a 100644 --- a/packages/openworkflow/core/workflow-run.test.ts +++ b/packages/openworkflow/core/workflow-run.test.ts @@ -1,6 +1,10 @@ import type { StandardSchemaV1 } from "./standard-schema.js"; -import { isTerminalStatus, validateInput } from "./workflow-run.js"; -import type { WorkflowRunStatus } from "./workflow-run.js"; +import { + isTerminalStatus, + resolveCancelWorkflowRunConflict, + validateInput, +} from "./workflow-run.js"; +import type { WorkflowRun, WorkflowRunStatus } from "./workflow-run.js"; import { describe, expect, test } from "vitest"; describe("isTerminalStatus", () => { @@ -143,6 +147,63 @@ describe("validateInput", () => { }); }); +describe("resolveCancelWorkflowRunConflict", () => { + function makeWorkflowRun(status: WorkflowRunStatus): WorkflowRun { + return { + namespaceId: "ns", + id: "wr-1", + workflowName: "wf", + version: null, + status, + idempotencyKey: null, + config: {}, + context: null, + input: null, + output: null, + error: null, + attempts: 0, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + workerId: null, + availableAt: null, + deadlineAt: null, + startedAt: null, + finishedAt: null, + createdAt: new Date(0), + updatedAt: new Date(0), + }; + } + + test("throws when existing run is null", () => { + expect(() => resolveCancelWorkflowRunConflict("wr-1", null)).toThrow( + "Workflow run wr-1 does not exist", + ); + }); + + test("returns the existing run when already canceled", () => { + const run = makeWorkflowRun("canceled"); + expect(resolveCancelWorkflowRunConflict("wr-1", run)).toBe(run); + }); + + test.each(["succeeded", "completed", "failed"])( + "throws when existing status is terminal non-canceled (%s)", + (status) => { + expect(() => + resolveCancelWorkflowRunConflict("wr-1", makeWorkflowRun(status)), + ).toThrow(`Cannot cancel workflow run wr-1 with status ${status}`); + }, + ); + + test.each(["pending", "running", "sleeping"])( + "throws generic failure when existing status is non-terminal (%s)", + (status) => { + expect(() => + resolveCancelWorkflowRunConflict("wr-1", makeWorkflowRun(status)), + ).toThrow("Failed to cancel workflow run"); + }, + ); +}); + function createMockSchema(options: { validate: ( input: unknown, diff --git a/packages/openworkflow/core/workflow-run.ts b/packages/openworkflow/core/workflow-run.ts index 8c91f7e2..a3ffeb8f 100644 --- a/packages/openworkflow/core/workflow-run.ts +++ b/packages/openworkflow/core/workflow-run.ts @@ -28,6 +28,41 @@ export function isTerminalStatus(status: WorkflowRunStatus): boolean { ); } +/** + * Resolve the outcome when a cancelWorkflowRun UPDATE affected no rows. + * Returns the existing run if already canceled (idempotent), otherwise throws + * an error describing why cancellation is impossible. + * @param workflowRunId - ID of the workflow run (used in error messages) + * @param existing - Current workflow run, or null if not found + * @returns The existing workflow run when already canceled + * @throws {Error} If the run does not exist, is in a non-canceled terminal + * state, or cannot be canceled for an unknown reason + */ +export function resolveCancelWorkflowRunConflict( + workflowRunId: string, + existing: Readonly | null, +): WorkflowRun { + if (!existing) { + // eslint-disable-next-line functional/no-throw-statements + throw new Error(`Workflow run ${workflowRunId} does not exist`); + } + + if (existing.status === "canceled") { + return existing; + } + + // 'succeeded' status is deprecated + if (["succeeded", "completed", "failed"].includes(existing.status)) { + // eslint-disable-next-line functional/no-throw-statements + throw new Error( + `Cannot cancel workflow run ${workflowRunId} with status ${existing.status}`, + ); + } + + // eslint-disable-next-line functional/no-throw-statements + throw new Error("Failed to cancel workflow run"); +} + /** * WorkflowRun represents a single execution instance of a workflow. */ diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index dacec07e..cf8188f0 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -25,23 +25,30 @@ import { SendSignalResult, GetSignalDeliveryParams, } from "../core/backend.js"; -import { decodeCursor, encodeCursor, type Cursor } from "../core/cursor.js"; -import { wrapError } from "../core/error.js"; +import { + buildPaginatedResponse, + DEFAULT_PAGINATION_PAGE_SIZE, + decodeListCursor, + type Cursor, +} from "../core/cursor.js"; +import { requireRow, wrapError } from "../core/error.js"; import { JsonValue } from "../core/json.js"; import { StepAttempt } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate } from "../core/workflow-definition.js"; -import { WorkflowRun } from "../core/workflow-run.js"; +import { + resolveCancelWorkflowRunConflict, + WorkflowRun, +} from "../core/workflow-run.js"; import { newPostgres, newPostgresMaxOne, Postgres, + PostgresFragment, migrate, DEFAULT_SCHEMA, assertValidSchemaName, } from "./postgres.js"; -const DEFAULT_PAGINATION_PAGE_SIZE = 100; - interface BackendPostgresOptions { namespaceId?: string; runMigrations?: boolean; @@ -226,7 +233,7 @@ export class BackendPostgres implements Backend { RETURNING * `; - if (!workflowRun) throw new Error("Failed to create workflow run"); + requireRow(workflowRun, "create workflow run"); return workflowRun; } @@ -375,13 +382,7 @@ export class BackendPostgres implements Backend { ): Promise> { const limit = params.limit ?? DEFAULT_PAGINATION_PAGE_SIZE; const { after, before } = params; - - let cursor: Cursor | null = null; - if (after) { - cursor = decodeCursor(after); - } else if (before) { - cursor = decodeCursor(before); - } + const cursor = decodeListCursor(params); const whereClause = this.buildListWorkflowRunsWhere(params, cursor); const order = before @@ -397,7 +398,7 @@ export class BackendPostgres implements Backend { LIMIT ${limit + 1} `; - return this.processPaginationResults(rows, limit, !!after, !!before); + return buildPaginatedResponse(rows, limit, !!after, !!before); } private buildListWorkflowRunsWhere( @@ -414,16 +415,7 @@ export class BackendPostgres implements Backend { ); } - let whereClause = conditions[0]; - if (!whereClause) throw new Error("No conditions"); - - for (let i = 1; i < conditions.length; i++) { - const condition = conditions[i]; - if (condition) { - whereClause = this.pg`${whereClause} AND ${condition}`; - } - } - return whereClause; + return this.joinConditionsWithAnd(conditions); } async countWorkflowRuns(): Promise { @@ -504,14 +496,11 @@ export class BackendPostgres implements Backend { SET "available_at" = ${this.pg`NOW() + ${params.leaseDurationMs} * INTERVAL '1 millisecond'`}, "updated_at" = NOW() - WHERE "namespace_id" = ${this.namespaceId} - AND "id" = ${params.workflowRunId} - AND "status" = 'running' - AND "worker_id" = ${params.workerId} + WHERE ${this.runningWorkflowRunOwnedByWorkerWhere(params)} RETURNING * `; - if (!updated) throw new Error("Failed to extend lease for workflow run"); + requireRow(updated, "extend lease for workflow run"); return updated; } @@ -537,7 +526,7 @@ export class BackendPostgres implements Backend { RETURNING * `; - if (!updated) throw new Error("Failed to sleep workflow run"); + requireRow(updated, "sleep workflow run"); const reconciled = await this.reconcileWorkflowSleepWakeUp( params.workflowRunId, @@ -618,14 +607,11 @@ export class BackendPostgres implements Backend { "available_at" = NULL, "finished_at" = NOW(), "updated_at" = NOW() - WHERE "namespace_id" = ${this.namespaceId} - AND "id" = ${params.workflowRunId} - AND "status" = 'running' - AND "worker_id" = ${params.workerId} + WHERE ${this.runningWorkflowRunOwnedByWorkerWhere(params)} RETURNING * `; - if (!updated) throw new Error("Failed to mark workflow run completed"); + requireRow(updated, "mark workflow run completed"); await this.wakeParentWorkflowRun(updated); @@ -666,14 +652,11 @@ export class BackendPostgres implements Backend { "worker_id" = NULL, "started_at" = NULL, "updated_at" = NOW() - WHERE "namespace_id" = ${this.namespaceId} - AND "id" = ${workflowRunId} - AND "status" = 'running' - AND "worker_id" = ${params.workerId} + WHERE ${this.runningWorkflowRunOwnedByWorkerWhere(params)} RETURNING * `; - if (!updated) throw new Error("Failed to mark workflow run failed"); + requireRow(updated, "mark workflow run failed"); if (updated.status === "failed") { await this.wakeParentWorkflowRun(updated); @@ -697,18 +680,11 @@ export class BackendPostgres implements Backend { "worker_id" = NULL, "started_at" = NULL, "updated_at" = NOW() - WHERE "namespace_id" = ${this.namespaceId} - AND "id" = ${params.workflowRunId} - AND "status" = 'running' - AND "worker_id" = ${params.workerId} + WHERE ${this.runningWorkflowRunOwnedByWorkerWhere(params)} RETURNING * `; - if (!updated) { - throw new Error( - "Failed to reschedule workflow run after failed step attempt", - ); - } + requireRow(updated, "reschedule workflow run after failed step attempt"); return updated; } @@ -737,24 +713,7 @@ export class BackendPostgres implements Backend { const existing = await this.getWorkflowRun({ workflowRunId: params.workflowRunId, }); - if (!existing) { - throw new Error(`Workflow run ${params.workflowRunId} does not exist`); - } - - // if already canceled, just return it - if (existing.status === "canceled") { - return existing; - } - - // throw error for completed/failed workflows - // 'succeeded' status is deprecated - if (["succeeded", "completed", "failed"].includes(existing.status)) { - throw new Error( - `Cannot cancel workflow run ${params.workflowRunId} with status ${existing.status}`, - ); - } - - throw new Error("Failed to cancel workflow run"); + return resolveCancelWorkflowRunConflict(params.workflowRunId, existing); } await this.wakeParentWorkflowRun(updated); @@ -835,7 +794,7 @@ export class BackendPostgres implements Backend { RETURNING * `; - if (!stepAttempt) throw new Error("Failed to create step attempt"); + requireRow(stepAttempt, "create step attempt"); return stepAttempt; } @@ -853,20 +812,11 @@ export class BackendPostgres implements Backend { "child_workflow_run_id" = ${params.childWorkflowRunId}, "updated_at" = NOW() FROM ${workflowRunsTable} wr - WHERE sa."namespace_id" = ${this.namespaceId} - AND sa."workflow_run_id" = ${params.workflowRunId} - AND sa."id" = ${params.stepAttemptId} - AND sa."status" = 'running' - AND wr."namespace_id" = sa."namespace_id" - AND wr."id" = sa."workflow_run_id" - AND wr."status" = 'running' - AND wr."worker_id" = ${params.workerId} + WHERE ${this.runningStepAttemptOwnedByWorkerWhere(params)} RETURNING sa.* `; - if (!updated) { - throw new Error("Failed to set step attempt child workflow run"); - } + requireRow(updated, "set step attempt child workflow run"); return updated; } @@ -891,13 +841,7 @@ export class BackendPostgres implements Backend { ): Promise> { const limit = params.limit ?? DEFAULT_PAGINATION_PAGE_SIZE; const { after, before } = params; - - let cursor: Cursor | null = null; - if (after) { - cursor = decodeCursor(after); - } else if (before) { - cursor = decodeCursor(before); - } + const cursor = decodeListCursor(params); const whereClause = this.buildListStepAttemptsWhere(params, cursor); const order = before @@ -913,7 +857,7 @@ export class BackendPostgres implements Backend { LIMIT ${limit + 1} `; - return this.processPaginationResults(rows, limit, !!after, !!before); + return buildPaginatedResponse(rows, limit, !!after, !!before); } private buildListStepAttemptsWhere( @@ -933,6 +877,65 @@ export class BackendPostgres implements Backend { ); } + return this.joinConditionsWithAnd(conditions); + } + + /** + * Match a running workflow run currently owned by the given worker. Shared + * by the workflow-run mutation queries which all fence on this same + * condition. + * @param params - Identifiers for the workflow run and owning worker + * @returns Combined WHERE fragment for the unaliased workflow_runs table + */ + private runningWorkflowRunOwnedByWorkerWhere( + params: Readonly<{ workflowRunId: string; workerId: string }>, + ): PostgresFragment { + return this.pg` + "namespace_id" = ${this.namespaceId} + AND "id" = ${params.workflowRunId} + AND "status" = 'running' + AND "worker_id" = ${params.workerId} + `; + } + + /** + * Match a running step attempt currently owned by the given worker, joined + * against its parent workflow run (also running and held by the same + * worker). Shared by the step-attempt mutation queries which all fence on + * this same condition. + * @param params - Identifiers for the step attempt and owning worker + * @returns Combined WHERE fragment using `sa`/`wr` aliases + */ + private runningStepAttemptOwnedByWorkerWhere( + params: Readonly<{ + workflowRunId: string; + stepAttemptId: string; + workerId: string; + }>, + ): PostgresFragment { + return this.pg` + sa."namespace_id" = ${this.namespaceId} + AND sa."workflow_run_id" = ${params.workflowRunId} + AND sa."id" = ${params.stepAttemptId} + AND sa."status" = 'running' + AND wr."namespace_id" = sa."namespace_id" + AND wr."id" = sa."workflow_run_id" + AND wr."status" = 'running' + AND wr."worker_id" = ${params.workerId} + `; + } + + /** + * AND-compose a non-empty array of query fragments into a single WHERE + * expression. Empty arrays are rejected because the caller's conditions + * list is always seeded with at least one predicate. + * @param conditions - Query fragments to AND together + * @returns Combined WHERE fragment + * @throws {Error} When the array is empty + */ + private joinConditionsWithAnd( + conditions: PostgresFragment[], + ): PostgresFragment { let whereClause = conditions[0]; if (!whereClause) throw new Error("No conditions"); @@ -945,47 +948,6 @@ export class BackendPostgres implements Backend { return whereClause; } - private processPaginationResults( - rows: T[], - limit: number, - hasAfter: boolean, - hasBefore: boolean, - ): PaginatedResponse { - const data = rows; - let hasNext = false; - let hasPrev = false; - - if (hasBefore) { - data.reverse(); - if (data.length > limit) { - hasPrev = true; - data.shift(); - } - hasNext = true; - } else { - if (data.length > limit) { - hasNext = true; - data.pop(); - } - if (hasAfter) { - hasPrev = true; - } - } - - const lastItem = data.at(-1); - const nextCursor = hasNext && lastItem ? encodeCursor(lastItem) : null; - const firstItem = data[0]; - const prevCursor = hasPrev && firstItem ? encodeCursor(firstItem) : null; - - return { - data, - pagination: { - next: nextCursor, - prev: prevCursor, - }, - }; - } - async completeStepAttempt( params: CompleteStepAttemptParams, ): Promise { @@ -1001,18 +963,11 @@ export class BackendPostgres implements Backend { "finished_at" = NOW(), "updated_at" = NOW() FROM ${workflowRunsTable} wr - WHERE sa."namespace_id" = ${this.namespaceId} - AND sa."workflow_run_id" = ${params.workflowRunId} - AND sa."id" = ${params.stepAttemptId} - AND sa."status" = 'running' - AND wr."namespace_id" = sa."namespace_id" - AND wr."id" = sa."workflow_run_id" - AND wr."status" = 'running' - AND wr."worker_id" = ${params.workerId} + WHERE ${this.runningStepAttemptOwnedByWorkerWhere(params)} RETURNING sa.* `; - if (!updated) throw new Error("Failed to mark step attempt completed"); + requireRow(updated, "mark step attempt completed"); return updated; } @@ -1030,18 +985,11 @@ export class BackendPostgres implements Backend { "finished_at" = NOW(), "updated_at" = NOW() FROM ${workflowRunsTable} wr - WHERE sa."namespace_id" = ${this.namespaceId} - AND sa."workflow_run_id" = ${params.workflowRunId} - AND sa."id" = ${params.stepAttemptId} - AND sa."status" = 'running' - AND wr."namespace_id" = sa."namespace_id" - AND wr."id" = sa."workflow_run_id" - AND wr."status" = 'running' - AND wr."worker_id" = ${params.workerId} + WHERE ${this.runningStepAttemptOwnedByWorkerWhere(params)} RETURNING sa.* `; - if (!updated) throw new Error("Failed to mark step attempt failed"); + requireRow(updated, "mark step attempt failed"); return updated; } diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index b4620fea..f52f6523 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -8,6 +8,7 @@ export const DEFAULT_SCHEMA = "openworkflow"; export type Postgres = ReturnType; export type PostgresOptions = Parameters[1]; +export type PostgresFragment = postgres.Fragment; const SCHEMA_NAME_PATTERN = /^[a-zA-Z_]\w*$/; const MAX_POSTGRES_IDENTIFIER_BYTES = 63; diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 10dcd84c..182d42e5 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -25,12 +25,20 @@ import { GetSignalDeliveryParams, toWorkflowRunCounts, } from "../core/backend.js"; -import { decodeCursor, encodeCursor, type Cursor } from "../core/cursor.js"; -import { wrapError } from "../core/error.js"; +import { + buildPaginatedResponse, + type Cursor, + DEFAULT_PAGINATION_PAGE_SIZE, + decodeListCursor, +} from "../core/cursor.js"; +import { requireRow, wrapError } from "../core/error.js"; import { JsonValue } from "../core/json.js"; import { StepAttempt } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate } from "../core/workflow-definition.js"; -import { WorkflowRun } from "../core/workflow-run.js"; +import { + resolveCancelWorkflowRunConflict, + WorkflowRun, +} from "../core/workflow-run.js"; import { newDatabase, Database, @@ -44,13 +52,45 @@ import { fromISO, } from "./sqlite.js"; -const DEFAULT_PAGINATION_PAGE_SIZE = 100; - interface BackendSqliteOptions { namespaceId?: string; runMigrations?: boolean; } +/** + * WHERE fragment matching a running workflow run currently owned by the given + * worker. Consumes 3 positional placeholders: `namespace_id`, `id`, + * `worker_id`. Pair with {@link BackendSqlite#runningWorkflowRunOwnedParams} + * to keep the placeholders aligned. + */ +const RUNNING_WORKFLOW_RUN_OWNED_WHERE = ` + "namespace_id" = ? + AND "id" = ? + AND "status" = 'running' + AND "worker_id" = ?`; + +/** + * WHERE fragment matching a running step attempt whose parent workflow run is + * also running and held by the given worker. Consumes 6 positional + * placeholders: step attempt's namespace, workflow_run_id, id, then the + * parent workflow run's namespace, id, worker_id. Pair with + * {@link BackendSqlite#runningStepAttemptOwnedParams} to keep the + * placeholders aligned. + */ +const RUNNING_STEP_ATTEMPT_OWNED_WHERE = ` + "namespace_id" = ? + AND "workflow_run_id" = ? + AND "id" = ? + AND "status" = 'running' + AND EXISTS ( + SELECT 1 + FROM "workflow_runs" wr + WHERE wr."namespace_id" = ? + AND wr."id" = ? + AND wr."status" = 'running' + AND wr."worker_id" = ? + )`; + /** * Manages a connection to a SQLite database for workflow operations. */ @@ -195,7 +235,7 @@ export class BackendSqlite implements Backend { `, ) .get(this.namespaceId, id) as WorkflowRunRow | undefined; - if (!row) throw new Error("Failed to create workflow run"); + requireRow(row, "create workflow run"); return rowToWorkflowRun(row); } @@ -467,21 +507,16 @@ export class BackendSqlite implements Backend { SET "available_at" = ?, "updated_at" = ? - WHERE "namespace_id" = ? - AND "id" = ? - AND "status" = 'running' - AND "worker_id" = ? + WHERE ${RUNNING_WORKFLOW_RUN_OWNED_WHERE} RETURNING * `); const row = stmt.get( newAvailableAt, currentTime, - this.namespaceId, - params.workflowRunId, - params.workerId, + ...this.runningWorkflowRunOwnedParams(params), ) as WorkflowRunRow | undefined; - if (!row) throw new Error("Failed to extend lease for workflow run"); + requireRow(row, "extend lease for workflow run"); return await Promise.resolve(rowToWorkflowRun(row)); } @@ -541,7 +576,7 @@ export class BackendSqlite implements Backend { params.workflowRunId, params.workerId, ) as WorkflowRunRow | undefined; - if (!row) throw new Error("Failed to sleep workflow run"); + requireRow(row, "sleep workflow run"); return await Promise.resolve(rowToWorkflowRun(row)); } @@ -561,10 +596,7 @@ export class BackendSqlite implements Backend { "available_at" = NULL, "finished_at" = ?, "updated_at" = ? - WHERE "namespace_id" = ? - AND "id" = ? - AND "status" = 'running' - AND "worker_id" = ? + WHERE ${RUNNING_WORKFLOW_RUN_OWNED_WHERE} RETURNING * `); @@ -573,11 +605,9 @@ export class BackendSqlite implements Backend { params.workerId, currentTime, currentTime, - this.namespaceId, - params.workflowRunId, - params.workerId, + ...this.runningWorkflowRunOwnedParams(params), ) as WorkflowRunRow | undefined; - if (!row) throw new Error("Failed to mark workflow run completed"); + requireRow(row, "mark workflow run completed"); const updated = rowToWorkflowRun(row); this.wakeParentWorkflowRun(updated); @@ -617,10 +647,7 @@ export class BackendSqlite implements Backend { "worker_id" = NULL, "started_at" = NULL, "updated_at" = ? - WHERE "namespace_id" = ? - AND "id" = ? - AND "status" = 'running' - AND "worker_id" = ? + WHERE ${RUNNING_WORKFLOW_RUN_OWNED_WHERE} RETURNING * `); @@ -630,11 +657,9 @@ export class BackendSqlite implements Backend { failureUpdate.finishedAt?.toISOString() ?? null, toJSON(failureUpdate.error), currentTimeIso, - this.namespaceId, - workflowRunId, - params.workerId, + ...this.runningWorkflowRunOwnedParams(params), ) as WorkflowRunRow | undefined; - if (!row) throw new Error("Failed to mark workflow run failed"); + requireRow(row, "mark workflow run failed"); const updated = rowToWorkflowRun(row); if (updated.status === "failed") { this.wakeParentWorkflowRun(updated); @@ -657,10 +682,7 @@ export class BackendSqlite implements Backend { "worker_id" = NULL, "started_at" = NULL, "updated_at" = ? - WHERE "namespace_id" = ? - AND "id" = ? - AND "status" = 'running' - AND "worker_id" = ? + WHERE ${RUNNING_WORKFLOW_RUN_OWNED_WHERE} RETURNING * `); @@ -668,9 +690,7 @@ export class BackendSqlite implements Backend { toISO(params.availableAt), toJSON(params.error), currentTime, - this.namespaceId, - params.workflowRunId, - params.workerId, + ...this.runningWorkflowRunOwnedParams(params), ) as WorkflowRunRow | undefined; if (!row) { return Promise.reject( @@ -713,35 +733,54 @@ export class BackendSqlite implements Backend { const existing = await this.getWorkflowRun({ workflowRunId: params.workflowRunId, }); - if (!existing) { - throw new Error(`Workflow run ${params.workflowRunId} does not exist`); - } - - // if already canceled, just return it - if (existing.status === "canceled") { - return existing; - } - - // 'succeeded' status is deprecated - if (["succeeded", "completed", "failed"].includes(existing.status)) { - throw new Error( - `Cannot cancel workflow run ${params.workflowRunId} with status ${existing.status}`, - ); - } - - throw new Error("Failed to cancel workflow run"); + return resolveCancelWorkflowRunConflict(params.workflowRunId, existing); } const updated = await this.getWorkflowRun({ workflowRunId: params.workflowRunId, }); - if (!updated) throw new Error("Failed to cancel workflow run"); + requireRow(updated, "cancel workflow run"); this.wakeParentWorkflowRun(updated); return updated; } + /** + * Return positional placeholders for {@link RUNNING_WORKFLOW_RUN_OWNED_WHERE} + * in the order the fragment expects: namespace, run id, worker id. + * @param params - Workflow run identity params + * @returns Tuple of placeholder values + */ + private runningWorkflowRunOwnedParams( + params: Readonly<{ workflowRunId: string; workerId: string }>, + ): [string, string, string] { + return [this.namespaceId, params.workflowRunId, params.workerId]; + } + + /** + * Return positional placeholders for {@link RUNNING_STEP_ATTEMPT_OWNED_WHERE} + * in the order the fragment expects. + * @param params - Step attempt identity params + * @returns Tuple of placeholder values + */ + private runningStepAttemptOwnedParams( + params: Readonly<{ + workflowRunId: string; + stepAttemptId: string; + workerId: string; + }>, + ): [string, string, string, string, string, string] { + return [ + this.namespaceId, + params.workflowRunId, + params.stepAttemptId, + this.namespaceId, + params.workflowRunId, + params.workerId, + ]; + } + private wakeParentWorkflowRun(childWorkflowRun: Readonly): void { if ( !childWorkflowRun.parentStepAttemptNamespaceId || @@ -808,175 +847,84 @@ export class BackendSqlite implements Backend { listWorkflowRuns( params: ListWorkflowRunsParams, ): Promise> { - const limit = params.limit ?? DEFAULT_PAGINATION_PAGE_SIZE; - const { after, before } = params; - - let cursor: Cursor | null = null; - if (after) { - cursor = decodeCursor(after); - } else if (before) { - cursor = decodeCursor(before); - } - - const order = before - ? `ORDER BY "created_at" ASC, "id" ASC` - : `ORDER BY "created_at" DESC, "id" DESC`; - - let query: string; - let queryParams: (string | number)[]; - - if (cursor) { - const op = after ? "<" : ">"; - query = ` - SELECT * - FROM "workflow_runs" - WHERE "namespace_id" = ? - AND ("created_at", "id") ${op} (?, ?) - ${order} - LIMIT ? - `; - queryParams = [ - this.namespaceId, - cursor.createdAt.toISOString(), - cursor.id, - limit + 1, - ]; - } else { - query = ` - SELECT * - FROM "workflow_runs" - WHERE "namespace_id" = ? - ${order} - LIMIT ? - `; - queryParams = [this.namespaceId, limit + 1]; - } - - const stmt = this.db.prepare(query); - const rawRows = stmt.all(...queryParams); - - if (!Array.isArray(rawRows)) { - return Promise.resolve({ - data: [], - pagination: { next: null, prev: null }, - }); - } - - const rows = rawRows.map((row) => rowToWorkflowRun(row as WorkflowRunRow)); - - return Promise.resolve( - this.processPaginationResults(rows, limit, !!after, !!before), - ); + return this.listPaginated(params, { + table: "workflow_runs", + naturalOrder: "DESC", + baseWhere: `"namespace_id" = ?`, + baseParams: [this.namespaceId], + mapRow: (row) => rowToWorkflowRun(row as WorkflowRunRow), + }); } listStepAttempts( params: ListStepAttemptsParams, ): Promise> { + return this.listPaginated(params, { + table: "step_attempts", + naturalOrder: "ASC", + baseWhere: `"namespace_id" = ? AND "workflow_run_id" = ?`, + baseParams: [this.namespaceId, params.workflowRunId], + mapRow: (row) => rowToStepAttempt(row as StepAttemptRow), + }); + } + + /** + * Execute a cursor-paginated SELECT against a namespace-scoped table. + * `before` reverses the table's natural order, and the cursor comparison + * operator follows the effective direction so over-fetched rows line up + * with {@link buildPaginatedResponse}'s expectations. + * @param params - Pagination params (limit/after/before) + * @param options - Query shape + * @param options.table - Table name to select from + * @param options.naturalOrder - Default sort direction for this table + * @param options.baseWhere - WHERE fragment with `?` placeholders + * @param options.baseParams - Values for the placeholders in `baseWhere` + * @param options.mapRow - Convert a raw row to the domain type + * @returns Paginated response + */ + private listPaginated( + params: Readonly<{ after?: string; before?: string; limit?: number }>, + options: { + readonly table: string; + readonly naturalOrder: "ASC" | "DESC"; + readonly baseWhere: string; + readonly baseParams: readonly unknown[]; + readonly mapRow: (row: unknown) => T; + }, + ): Promise> { const limit = params.limit ?? DEFAULT_PAGINATION_PAGE_SIZE; const { after, before } = params; + const cursor = decodeListCursor(params); + + const reversedOrder = options.naturalOrder === "ASC" ? "DESC" : "ASC"; + const effectiveOrder = before ? reversedOrder : options.naturalOrder; + const cursorOp = effectiveOrder === "ASC" ? ">" : "<"; + + const whereClause = cursor + ? `${options.baseWhere} AND ("created_at", "id") ${cursorOp} (?, ?)` + : options.baseWhere; + const queryParams: unknown[] = [ + ...options.baseParams, + ...(cursor ? [cursor.createdAt.toISOString(), cursor.id] : []), + limit + 1, + ]; + + const query = ` + SELECT * + FROM "${options.table}" + WHERE ${whereClause} + ORDER BY "created_at" ${effectiveOrder}, "id" ${effectiveOrder} + LIMIT ? + `; - let cursor: Cursor | null = null; - if (after) { - cursor = decodeCursor(after); - } else if (before) { - cursor = decodeCursor(before); - } - - const order = before - ? `ORDER BY "created_at" DESC, "id" DESC` - : `ORDER BY "created_at" ASC, "id" ASC`; - - let query: string; - let queryParams: (string | number)[]; - - if (cursor) { - const op = after ? ">" : "<"; - query = ` - SELECT * - FROM "step_attempts" - WHERE "namespace_id" = ? - AND "workflow_run_id" = ? - AND ("created_at", "id") ${op} (?, ?) - ${order} - LIMIT ? - `; - queryParams = [ - this.namespaceId, - params.workflowRunId, - cursor.createdAt.toISOString(), - cursor.id, - limit + 1, - ]; - } else { - query = ` - SELECT * - FROM "step_attempts" - WHERE "namespace_id" = ? - AND "workflow_run_id" = ? - ${order} - LIMIT ? - `; - queryParams = [this.namespaceId, params.workflowRunId, limit + 1]; - } - - const stmt = this.db.prepare(query); - const rawRows = stmt.all(...queryParams); - - if (!Array.isArray(rawRows)) { - return Promise.resolve({ - data: [], - pagination: { next: null, prev: null }, - }); - } - - const rows = rawRows.map((row) => rowToStepAttempt(row as StepAttemptRow)); + const rawRows = this.db.prepare(query).all(...queryParams); + const rows = rawRows.map((row) => options.mapRow(row)); return Promise.resolve( - this.processPaginationResults(rows, limit, !!after, !!before), + buildPaginatedResponse(rows, limit, !!after, !!before), ); } - private processPaginationResults( - rows: T[], - limit: number, - hasAfter: boolean, - hasBefore: boolean, - ): PaginatedResponse { - const data = rows; - let hasNext = false; - let hasPrev = false; - - if (hasBefore) { - data.reverse(); - if (data.length > limit) { - hasPrev = true; - data.shift(); - } - hasNext = true; - } else { - if (data.length > limit) { - hasNext = true; - data.pop(); - } - if (hasAfter) { - hasPrev = true; - } - } - - const lastItem = data.at(-1); - const nextCursor = hasNext && lastItem ? encodeCursor(lastItem) : null; - const firstItem = data[0]; - const prevCursor = hasPrev && firstItem ? encodeCursor(firstItem) : null; - - return { - data, - pagination: { - next: nextCursor, - prev: prevCursor, - }, - }; - } - async createStepAttempt( params: CreateStepAttemptParams, ): Promise { @@ -1013,7 +961,7 @@ export class BackendSqlite implements Backend { currentTime, currentTime, ) as StepAttemptRow | undefined; - if (!row) throw new Error("Failed to create step attempt"); + requireRow(row, "create step attempt"); return await Promise.resolve(rowToStepAttempt(row)); } @@ -1029,18 +977,7 @@ export class BackendSqlite implements Backend { "child_workflow_run_namespace_id" = ?, "child_workflow_run_id" = ?, "updated_at" = ? - WHERE "namespace_id" = ? - AND "workflow_run_id" = ? - AND "id" = ? - AND "status" = 'running' - AND EXISTS ( - SELECT 1 - FROM "workflow_runs" wr - WHERE wr."namespace_id" = ? - AND wr."id" = ? - AND wr."status" = 'running' - AND wr."worker_id" = ? - ) + WHERE ${RUNNING_STEP_ATTEMPT_OWNED_WHERE} RETURNING * `); @@ -1048,14 +985,9 @@ export class BackendSqlite implements Backend { params.childWorkflowRunNamespaceId, params.childWorkflowRunId, currentTime, - this.namespaceId, - params.workflowRunId, - params.stepAttemptId, - this.namespaceId, - params.workflowRunId, - params.workerId, + ...this.runningStepAttemptOwnedParams(params), ) as StepAttemptRow | undefined; - if (!row) throw new Error("Failed to set step attempt child workflow run"); + requireRow(row, "set step attempt child workflow run"); return await Promise.resolve(rowToStepAttempt(row)); } @@ -1088,18 +1020,7 @@ export class BackendSqlite implements Backend { "error" = NULL, "finished_at" = ?, "updated_at" = ? - WHERE "namespace_id" = ? - AND "workflow_run_id" = ? - AND "id" = ? - AND "status" = 'running' - AND EXISTS ( - SELECT 1 - FROM "workflow_runs" wr - WHERE wr."namespace_id" = ? - AND wr."id" = ? - AND wr."status" = 'running' - AND wr."worker_id" = ? - ) + WHERE ${RUNNING_STEP_ATTEMPT_OWNED_WHERE} RETURNING * `); @@ -1107,14 +1028,9 @@ export class BackendSqlite implements Backend { toJSON(params.output), currentTime, currentTime, - this.namespaceId, - params.workflowRunId, - params.stepAttemptId, - this.namespaceId, - params.workflowRunId, - params.workerId, + ...this.runningStepAttemptOwnedParams(params), ) as StepAttemptRow | undefined; - if (!row) throw new Error("Failed to mark step attempt completed"); + requireRow(row, "mark step attempt completed"); return await Promise.resolve(rowToStepAttempt(row)); } @@ -1130,18 +1046,7 @@ export class BackendSqlite implements Backend { "error" = ?, "finished_at" = ?, "updated_at" = ? - WHERE "namespace_id" = ? - AND "workflow_run_id" = ? - AND "id" = ? - AND "status" = 'running' - AND EXISTS ( - SELECT 1 - FROM "workflow_runs" wr - WHERE wr."namespace_id" = ? - AND wr."id" = ? - AND wr."status" = 'running' - AND wr."worker_id" = ? - ) + WHERE ${RUNNING_STEP_ATTEMPT_OWNED_WHERE} RETURNING * `); @@ -1149,14 +1054,9 @@ export class BackendSqlite implements Backend { toJSON(params.error), currentTime, currentTime, - this.namespaceId, - params.workflowRunId, - params.stepAttemptId, - this.namespaceId, - params.workflowRunId, - params.workerId, + ...this.runningStepAttemptOwnedParams(params), ) as StepAttemptRow | undefined; - if (!row) throw new Error("Failed to mark step attempt failed"); + requireRow(row, "mark step attempt failed"); return await Promise.resolve(rowToStepAttempt(row)); } @@ -1208,12 +1108,15 @@ interface StepAttemptRow { // Conversion functions /** - * Convert a database row to a WorkflowRun. - * @param row - Workflow run row - * @returns Workflow run - * @throws {Error} If required fields are missing + * Parse and validate the `created_at`, `updated_at`, and `config` fields + * shared by workflow run and step attempt rows. + * @param row - Row with timestamp and config columns + * @returns Parsed createdAt/updatedAt dates and decoded config + * @throws {Error} If any required field is missing */ -function rowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { +function parseRequiredRowFields( + row: Readonly<{ created_at: string; updated_at: string; config: string }>, +): { createdAt: Date; updatedAt: Date; config: unknown } { const createdAt = fromISO(row.created_at); const updatedAt = fromISO(row.updated_at); const config = fromJSON(row.config); @@ -1222,6 +1125,18 @@ function rowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { if (!updatedAt) throw new Error("updatedAt is required"); if (config === null) throw new Error("config is required"); + return { createdAt, updatedAt, config }; +} + +/** + * Convert a database row to a WorkflowRun. + * @param row - Workflow run row + * @returns Workflow run + * @throws {Error} If required fields are missing + */ +function rowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { + const { createdAt, updatedAt, config } = parseRequiredRowFields(row); + return { namespaceId: row.namespace_id, id: row.id, @@ -1254,13 +1169,7 @@ function rowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { * @throws {Error} If required fields are missing */ function rowToStepAttempt(row: StepAttemptRow): StepAttempt { - const createdAt = fromISO(row.created_at); - const updatedAt = fromISO(row.updated_at); - const config = fromJSON(row.config); - - if (!createdAt) throw new Error("createdAt is required"); - if (!updatedAt) throw new Error("updatedAt is required"); - if (config === null) throw new Error("config is required"); + const { createdAt, updatedAt, config } = parseRequiredRowFields(row); return { namespaceId: row.namespace_id, diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 82f67d08..b3031b02 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -7,7 +7,11 @@ import { } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; import type { StandardSchemaV1 } from "../core/standard-schema.js"; -import type { StepAttempt } from "../core/step-attempt.js"; +import type { + StepAttempt, + StepAttemptContext, + StepKind, +} from "../core/step-attempt.js"; import { normalizeStepOutput, calculateDateFromDuration, @@ -375,6 +379,53 @@ class StepExecutor implements StepApi { this.executionFence.assertActive(); } + /** + * Persist a new step attempt after asserting the execution fence and step + * budget, and record it on the history. + * @param stepName - Resolved step name + * @param kind - Step kind + * @param context - Step attempt context, or null when not applicable + * @returns The newly created step attempt + */ + private async createNewStepAttempt( + stepName: string, + kind: StepKind, + context: StepAttemptContext | null, + ): Promise { + this.assertExecutionActive(); + this.history.ensureCanRecordNewAttempt(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind, + config: {}, + context, + }); + this.history.recordNewAttempt(attempt); + return attempt; + } + + /** + * Persist completion for a step attempt and record it on the history. + * @param stepAttemptId - Step attempt id to complete + * @param output - Step output payload (or null) + * @returns The completed step attempt + */ + private async completeStepAttemptAndRecord( + stepAttemptId: string, + output: JsonValue | null, + ): Promise { + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId, + workerId: this.workerId, + output, + }); + this.history.recordCompletion(completed); + return completed; + } + // ---- step.run ----------------------------------------------------------- async run( @@ -389,28 +440,14 @@ class StepExecutor implements StepApi { return existingAttempt.output as Output; } - this.assertExecutionActive(); - this.history.ensureCanRecordNewAttempt(); - const attempt = await this.backend.createStepAttempt({ - workflowRunId: this.workflowRunId, - workerId: this.workerId, - stepName, - kind: "function", - config: {}, - context: null, - }); - this.history.recordNewAttempt(attempt); + const attempt = await this.createNewStepAttempt(stepName, "function", null); try { const result = await fn(); - const output = normalizeStepOutput(result); - const savedAttempt = await this.backend.completeStepAttempt({ - workflowRunId: this.workflowRunId, - stepAttemptId: attempt.id, - workerId: this.workerId, - output, - }); - this.history.recordCompletion(savedAttempt); + const savedAttempt = await this.completeStepAttemptAndRecord( + attempt.id, + normalizeStepOutput(result), + ); return savedAttempt.output as Output; } catch (error) { return this.failStepWithError( @@ -434,19 +471,12 @@ class StepExecutor implements StepApi { throw result.error; } const resumeAt = result.value; - const context = createSleepContext(resumeAt); - this.assertExecutionActive(); - this.history.ensureCanRecordNewAttempt(); - const attempt = await this.backend.createStepAttempt({ - workflowRunId: this.workflowRunId, - workerId: this.workerId, + await this.createNewStepAttempt( stepName, - kind: "sleep", - config: {}, - context, - }); - this.history.recordNewAttempt(attempt); + "sleep", + createSleepContext(resumeAt), + ); // Sleep attempts are not marked completed here — that happens when the // workflow resumes. @@ -508,17 +538,11 @@ class StepExecutor implements StepApi { // First encounter — create the workflow step and child workflow run const timeoutAt = resolveWaitTimeoutAt(request.timeout); - this.assertExecutionActive(); - this.history.ensureCanRecordNewAttempt(); - const attempt = await this.backend.createStepAttempt({ - workflowRunId: this.workflowRunId, - workerId: this.workerId, + const attempt = await this.createNewStepAttempt( stepName, - kind: "workflow", - config: {}, - context: createWorkflowContext(timeoutAt), - }); - this.history.recordNewAttempt(attempt); + "workflow", + createWorkflowContext(timeoutAt), + ); const linkedAttempt = await this.linkChildWorkflowRun( attempt, @@ -589,13 +613,10 @@ class StepExecutor implements StepApi { // Child completed successfully — propagate result if (childRun.status === "completed" || childRun.status === "succeeded") { - const completed = await this.backend.completeStepAttempt({ - workflowRunId: this.workflowRunId, - stepAttemptId: workflowAttempt.id, - workerId: this.workerId, - output: childRun.output, - }); - this.history.recordCompletion(completed); + const completed = await this.completeStepAttemptAndRecord( + workflowAttempt.id, + childRun.output, + ); return completed.output as Output; } @@ -690,9 +711,7 @@ class StepExecutor implements StepApi { error: unknown, retryPolicy: RetryPolicy, ): Promise { - if (!this.executionFence.isActive()) { - throw new StaleExecutionBranchError(); - } + this.assertExecutionActive(); let failedAttempt: StepAttempt; try { @@ -703,9 +722,7 @@ class StepExecutor implements StepApi { error: serializeError(error), }); } catch (stepFailError) { - if (!this.executionFence.isActive()) { - throw new StaleExecutionBranchError(); - } + this.assertExecutionActive(); throw stepFailError; } @@ -728,9 +745,7 @@ class StepExecutor implements StepApi { throw error; } - if (!this.executionFence.isActive()) { - throw new StaleExecutionBranchError(); - } + this.assertExecutionActive(); return await this.failStepWithError( stepName, @@ -763,17 +778,11 @@ class StepExecutor implements StepApi { return await this.resolveSignalSend(stepName, runningAttempt, options); } - this.assertExecutionActive(); - this.history.ensureCanRecordNewAttempt(); - const attempt = await this.backend.createStepAttempt({ - workflowRunId: this.workflowRunId, - workerId: this.workerId, + const attempt = await this.createNewStepAttempt( stepName, - kind: "signal-send", - config: {}, - context: null, - }); - this.history.recordNewAttempt(attempt); + "signal-send", + null, + ); return await this.resolveSignalSend(stepName, attempt, options); } @@ -790,13 +799,9 @@ class StepExecutor implements StepApi { idempotencyKey: buildSignalIdempotencyKey(this.workflowRunId, stepName), }); - const completed = await this.backend.completeStepAttempt({ - workflowRunId: this.workflowRunId, - stepAttemptId: attempt.id, - workerId: this.workerId, - output: { ...result }, + const completed = await this.completeStepAttemptAndRecord(attempt.id, { + ...result, }); - this.history.recordCompletion(completed); return completed.output as { workflowRunIds: string[] }; } catch (error) { return await this.failStepWithError( @@ -847,17 +852,11 @@ class StepExecutor implements StepApi { } const timeoutAt = resolveWaitTimeoutAt(options.timeout); - this.assertExecutionActive(); - this.history.ensureCanRecordNewAttempt(); - const attempt = await this.backend.createStepAttempt({ - workflowRunId: this.workflowRunId, - workerId: this.workerId, + const attempt = await this.createNewStepAttempt( stepName, - kind: "signal-wait", - config: {}, - context: createSignalWaitContext(options.signal, timeoutAt), - }); - this.history.recordNewAttempt(attempt); + "signal-wait", + createSignalWaitContext(options.signal, timeoutAt), + ); return await this.resolveSignalWait(stepName, attempt, options); } @@ -920,13 +919,10 @@ class StepExecutor implements StepApi { attempt: Readonly, output: { data: Output } | null, ): Promise<{ data: Output } | null> { - const completed = await this.backend.completeStepAttempt({ - workflowRunId: this.workflowRunId, - stepAttemptId: attempt.id, - workerId: this.workerId, - output: output as JsonValue | null, - }); - this.history.recordCompletion(completed); + const completed = await this.completeStepAttemptAndRecord( + attempt.id, + output as JsonValue | null, + ); return completed.output as { data: Output } | null; } } @@ -1012,6 +1008,30 @@ export async function executeWorkflow( }); } + /** + * Fail the workflow run with the given error and retry policy. Shared by the + * step-limit, step-error, and catch-all failure branches, which only differ + * in the error payload and retry policy. + * @param error - Serialized error payload + * @param retryPolicy - Retry policy to evaluate + * @returns Promise resolved when the transition completes + */ + function failRun( + error: SerializedError, + retryPolicy: RetryPolicy, + ): Promise { + return runTransition(() => + backend.failWorkflowRun({ + workflowRunId: workflowRun.id, + workerId, + error, + retryPolicy, + attempts: workflowRun.attempts, + deadlineAt: workflowRun.deadlineAt, + }), + ); + } + try { // load all pages of step history const attempts = await listAllStepAttemptsForWorkflowRun( @@ -1084,15 +1104,9 @@ export async function executeWorkflow( } if (error instanceof StepLimitExceededError) { - await runTransition(() => - backend.failWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - error: serializeStepLimitExceededError(error), - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, - attempts: workflowRun.attempts, - deadlineAt: workflowRun.deadlineAt, - }), + await failRun( + serializeStepLimitExceededError(error), + DEFAULT_WORKFLOW_RETRY_POLICY, ); return; } @@ -1109,16 +1123,7 @@ export async function executeWorkflow( ); if (retryDecision.status === "failed") { - await runTransition(() => - backend.failWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - error: serializedError, - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, - attempts: workflowRun.attempts, - deadlineAt: workflowRun.deadlineAt, - }), - ); + await failRun(serializedError, DEFAULT_WORKFLOW_RETRY_POLICY); return; } @@ -1149,15 +1154,6 @@ export async function executeWorkflow( } // mark failure - await runTransition(() => - backend.failWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - error: serializeError(error), - retryPolicy: params.retryPolicy, - attempts: workflowRun.attempts, - deadlineAt: workflowRun.deadlineAt, - }), - ); + await failRun(serializeError(error), params.retryPolicy); } }