From ad76d8cc5649b25044be70de8936f6535b16a0b4 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Thu, 16 Apr 2026 13:00:06 -0500 Subject: [PATCH 1/6] feat(openworkflow, server, cli): server --- package-lock.json | 24 + packages/cli/cli.ts | 14 + packages/cli/commands.ts | 125 +++- packages/cli/package.json | 1 + packages/cli/tsconfig.json | 2 +- packages/openworkflow/core/error.test.ts | 17 +- packages/openworkflow/core/error.ts | 37 + packages/openworkflow/core/step-attempt.ts | 19 +- packages/openworkflow/http/backend.ts | 455 ++++++++++++ packages/openworkflow/internal.ts | 15 +- packages/server/errors.ts | 154 +++++ packages/server/package.json | 38 + packages/server/schemas.ts | 115 ++++ packages/server/server.test.ts | 763 +++++++++++++++++++++ packages/server/server.ts | 477 +++++++++++++ packages/server/tsconfig.json | 9 + tsconfig.json | 3 +- 17 files changed, 2233 insertions(+), 35 deletions(-) create mode 100644 packages/openworkflow/http/backend.ts create mode 100644 packages/server/errors.ts create mode 100644 packages/server/package.json create mode 100644 packages/server/schemas.ts create mode 100644 packages/server/server.test.ts create mode 100644 packages/server/server.ts create mode 100644 packages/server/tsconfig.json diff --git a/package-lock.json b/package-lock.json index 969c48b4..99d54b12 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3276,6 +3276,10 @@ "resolved": "packages/docs", "link": true }, + "node_modules/@openworkflow/server": { + "resolved": "packages/server", + "link": true + }, "node_modules/@oxc-parser/binding-android-arm-eabi": { "version": "0.121.0", "resolved": "https://registry.npmjs.org/@oxc-parser/binding-android-arm-eabi/-/binding-android-arm-eabi-0.121.0.tgz", @@ -16965,6 +16969,7 @@ "openworkflow": "dist/cli.js" }, "devDependencies": { + "@openworkflow/server": "*", "openworkflow": "*", "vitest": "^4.0.18" }, @@ -17129,6 +17134,25 @@ "optional": true } } + }, + "packages/server": { + "name": "@openworkflow/server", + "version": "0.1.0", + "dependencies": { + "@hono/node-server": "^1.14.3", + "hono": "^4.7.11", + "zod": "^4.3.6" + }, + "devDependencies": { + "openworkflow": "*", + "vitest": "^4.0.18" + }, + "engines": { + "node": ">=20" + }, + "peerDependencies": { + "openworkflow": "^0.9.0" + } } } } diff --git a/packages/cli/cli.ts b/packages/cli/cli.ts index 228fd759..09b4eb11 100644 --- a/packages/cli/cli.ts +++ b/packages/cli/cli.ts @@ -5,6 +5,7 @@ import { doctor, getVersion, init, + serverStart, workerStart, } from "./commands.js"; import { withErrorHandling } from "./errors.js"; @@ -59,4 +60,17 @@ program .option("--config ", "path to OpenWorkflow config file") .action(withErrorHandling(dashboard)); +// server +const serverCmd = program + .command("server") + .description("manage the API server"); + +// server start +serverCmd + .command("start") + .description("start the HTTP API server") + .option("-p, --port ", "port to listen on", Number.parseInt) + .option("--config ", "path to OpenWorkflow config file") + .action(withErrorHandling(serverStart)); + await program.parseAsync(process.argv); diff --git a/packages/cli/commands.ts b/packages/cli/commands.ts index eb89a472..a17909cb 100644 --- a/packages/cli/commands.ts +++ b/packages/cli/commands.ts @@ -39,6 +39,10 @@ interface DashboardOptions extends CommandOptions { port?: number; } +interface ServerStartOptions extends CommandOptions { + port?: number; +} + /** * openworkflow -V | --version * @returns the version string, or "-" if it cannot be determined @@ -283,21 +287,13 @@ export async function workerStart( const ow = new OpenWorkflow({ backend }); let worker: ReturnType | null = null; - let shuttingDown = false; - - /** Stop the worker on process shutdown. */ - async function gracefulShutdown(): Promise { - if (shuttingDown) return; - shuttingDown = true; - - consola.warn("Shutting down worker..."); - try { + const gracefulShutdown = registerGracefulShutdown({ + noun: "worker", + stopApp: async () => { await worker?.stop(); - } finally { - await backend.stop(); - } - consola.success("Worker stopped"); - } + }, + backend, + }); try { // discover and import workflows @@ -330,9 +326,6 @@ export async function workerStart( worker = ow.newWorker(workerOptions); - process.on("SIGINT", () => void gracefulShutdown()); - process.on("SIGTERM", () => void gracefulShutdown()); - await worker.start(); consola.success("Worker started."); } catch (error) { @@ -458,8 +451,101 @@ export async function dashboard(options: DashboardOptions = {}): Promise { }); } +export type { ServerStartOptions }; + +/** + * openworkflow server start + * Start the OpenWorkflow HTTP API server. + * @param options - Server start options. + */ +export async function serverStart( + options: ServerStartOptions = {}, +): Promise { + const { config: configPath, port: rawPort } = options; + const port = rawPort ?? 3000; + consola.start("Starting server..."); + + const { configFile, config } = await loadConfigWithEnv(configPath); + if (!configFile) { + throw new CLIError( + "No config file found.", + "Run `npx @openworkflow/cli init` to create a config file.", + ); + } + consola.info(`Using config: ${configFile}`); + + let createServer: typeof import("@openworkflow/server").createServer; + let serve: typeof import("@openworkflow/server").serve; + try { + ({ createServer, serve } = await import("@openworkflow/server")); + } catch { + throw new CLIError( + "@openworkflow/server is not installed.", + 'Run `npm install @openworkflow/server` to enable the "server start" command.', + ); + } + + const backend = config.backend; + const server = createServer(backend, { + logRequests: true, + onError: (error, ctx) => { + consola.error(`[${ctx.method} ${ctx.path}]`, error); + }, + }); + const handle = serve(server, { port }); + consola.success(`Server listening on http://localhost:${String(port)}`); + + registerGracefulShutdown({ + noun: "server", + stopApp: () => handle.close(), + backend, + }); +} + // ----------------------------------------------------------------------------- +interface ShutdownOptions { + /** Lower-case name of the thing being stopped (e.g. "worker", "server"). */ + noun: string; + /** App-level stop — `worker.stop` or `handle.close`. */ + stopApp: () => Promise; + /** Backend whose `stop()` runs last, even if `stopApp` throws. */ + backend: { stop: () => Promise }; +} + +/** + * Wire SIGINT/SIGTERM to a graceful shutdown. The HTTP handle / worker is + * stopped first (so no new work starts), then the backend is stopped even if + * the app-level close fails. + * @param options - What to stop on shutdown + * @returns The shutdown function (also registered against SIGINT/SIGTERM) + */ +function registerGracefulShutdown( + options: ShutdownOptions, +): () => Promise { + let shuttingDown = false; + /** + * Stop the app and backend; idempotent against repeat signals. + * @returns Resolves when both are stopped + */ + async function shutdown(): Promise { + if (shuttingDown) return; + shuttingDown = true; + consola.warn(`Shutting down ${options.noun}...`); + try { + await options.stopApp(); + } finally { + await options.backend.stop(); + } + const capitalized = + options.noun.charAt(0).toUpperCase() + options.noun.slice(1); + consola.success(`${capitalized} stopped`); + } + process.on("SIGINT", () => void shutdown()); + process.on("SIGTERM", () => void shutdown()); + return shutdown; +} + /** * Get workflow directories from config. * @param config - The loaded config @@ -828,11 +914,6 @@ async function discoverWorkflowsInDirs( return { files, workflows }; } -/** - * Get the config template for a backend choice. - * @param backendChoice - The selected backend choice - * @returns The config template string - */ /** * Get the client template for a backend choice. * @param backendChoice - The selected backend choice diff --git a/packages/cli/package.json b/packages/cli/package.json index c6bd3852..91b3900a 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -35,6 +35,7 @@ "nypm": "^0.6.5" }, "devDependencies": { + "@openworkflow/server": "*", "openworkflow": "*", "vitest": "^4.0.18" }, diff --git a/packages/cli/tsconfig.json b/packages/cli/tsconfig.json index b3db5ed0..58ef4037 100644 --- a/packages/cli/tsconfig.json +++ b/packages/cli/tsconfig.json @@ -3,7 +3,7 @@ "compilerOptions": { "outDir": "dist" }, - "references": [{ "path": "../openworkflow" }], + "references": [{ "path": "../openworkflow" }, { "path": "../server" }], "include": ["**/*.ts"], "exclude": ["dist"] } diff --git a/packages/openworkflow/core/error.test.ts b/packages/openworkflow/core/error.test.ts index 38d442cb..e911a845 100644 --- a/packages/openworkflow/core/error.test.ts +++ b/packages/openworkflow/core/error.test.ts @@ -1,4 +1,9 @@ -import { deserializeError, serializeError, wrapError } from "./error.js"; +import { + BackendError, + deserializeError, + serializeError, + wrapError, +} from "./error.js"; import { describe, expect, test } from "vitest"; describe("serializeError", () => { @@ -126,3 +131,13 @@ describe("deserializeError", () => { expect(restored.name).toBe(original.name); }); }); + +describe("BackendError", () => { + test("sets code, message, and name", () => { + const error = new BackendError("NOT_FOUND", "run not found"); + expect(error.code).toBe("NOT_FOUND"); + expect(error.message).toBe("run not found"); + expect(error.name).toBe("BackendError"); + expect(error).toBeInstanceOf(Error); + }); +}); diff --git a/packages/openworkflow/core/error.ts b/packages/openworkflow/core/error.ts index 4fed6953..693f8c9e 100644 --- a/packages/openworkflow/core/error.ts +++ b/packages/openworkflow/core/error.ts @@ -7,6 +7,43 @@ export interface SerializedError { [key: string]: JsonValue; } +/** + * Runtime tuple of every known Backend error code. Single source of truth — + * the `BackendErrorCode` type is derived from this list so adding a code is + * one edit. + */ +export const BACKEND_ERROR_CODES = ["NOT_FOUND", "CONFLICT"] as const; + +/** + * Error codes for typed Backend errors that can be mapped to HTTP status codes. + */ +export type BackendErrorCode = (typeof BACKEND_ERROR_CODES)[number]; + +/** + * Type guard narrowing an arbitrary string to a known {@link BackendErrorCode}. + * @param code - Candidate code string (e.g. from a server response) + * @returns Whether `code` is a recognized backend error code + */ +export function isBackendErrorCode(code: string): code is BackendErrorCode { + return (BACKEND_ERROR_CODES as readonly string[]).includes(code); +} + +/** + * A typed error thrown by Backend implementations to signal well-known failure + * modes. Consumers (e.g. the HTTP server) can inspect `code` to choose the + * appropriate response status. + */ +// eslint-disable-next-line functional/no-classes, functional/no-class-inheritance +export class BackendError extends Error { + readonly code: BackendErrorCode; + + constructor(code: BackendErrorCode, message: string) { + super(message); + this.name = "BackendError"; + this.code = code; + } +} + /** * Serialize an error to a JSON-compatible format. * @param error - The error to serialize (can be Error instance or any value) diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index 3730db67..f26b2584 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -4,15 +4,22 @@ import type { JsonValue } from "./json.js"; import type { Result } from "./result.js"; import { err, ok } from "./result.js"; +/** + * Runtime tuple of every known step kind. Single source of truth — the + * `StepKind` type is derived from this list so adding a kind is one edit. + */ +export const STEP_KINDS = [ + "function", + "sleep", + "workflow", + "signal-send", + "signal-wait", +] as const; + /** * The kind of step in a workflow. */ -export type StepKind = - | "function" - | "sleep" - | "workflow" - | "signal-send" - | "signal-wait"; +export type StepKind = (typeof STEP_KINDS)[number]; /** * Status of a step attempt through its lifecycle. diff --git a/packages/openworkflow/http/backend.ts b/packages/openworkflow/http/backend.ts new file mode 100644 index 00000000..406ad62f --- /dev/null +++ b/packages/openworkflow/http/backend.ts @@ -0,0 +1,455 @@ +import type { + Backend, + CancelWorkflowRunParams, + ClaimWorkflowRunParams, + CompleteStepAttemptParams, + CompleteWorkflowRunParams, + CreateStepAttemptParams, + CreateWorkflowRunParams, + ExtendWorkflowRunLeaseParams, + FailStepAttemptParams, + FailWorkflowRunParams, + GetSignalDeliveryParams, + GetStepAttemptParams, + GetWorkflowRunParams, + ListStepAttemptsParams, + ListWorkflowRunsParams, + PaginatedResponse, + RescheduleWorkflowRunAfterFailedStepAttemptParams, + SendSignalParams, + SendSignalResult, + SetStepAttemptChildWorkflowRunParams, + SleepWorkflowRunParams, + WorkflowRunCounts, +} from "../core/backend.js"; +import { BackendError, isBackendErrorCode } from "../core/error.js"; +import type { JsonValue } from "../core/json.js"; +import type { StepAttempt } from "../core/step-attempt.js"; +import type { WorkflowRun } from "../core/workflow-run.js"; + +// --------------------------------------------------------------------------- +// Date field transforms +// --------------------------------------------------------------------------- + +const WORKFLOW_RUN_DATE_FIELDS = [ + "availableAt", + "deadlineAt", + "startedAt", + "finishedAt", + "createdAt", + "updatedAt", +] as const; + +const STEP_ATTEMPT_DATE_FIELDS = [ + "startedAt", + "finishedAt", + "createdAt", + "updatedAt", +] as const; + +/** + * Parse date strings into Date objects in-place. + * @param obj - Object with potential date string fields + * @param fields - Field names to check and convert + * @returns The mutated object typed as T + */ +function parseDates( + obj: Record, + fields: readonly string[], +): Record { + for (const field of fields) { + const value = obj[field]; + if (typeof value === "string") { + obj[field] = new Date(value); + } + } + return obj; +} + +/** + * Parse raw JSON into a WorkflowRun with proper Date fields. + * @param raw - Raw JSON object from the server + * @returns Parsed WorkflowRun + */ +function parseWorkflowRun(raw: Record): WorkflowRun { + return parseDates(raw, WORKFLOW_RUN_DATE_FIELDS) as unknown as WorkflowRun; +} + +/** + * Parse raw JSON into a StepAttempt with proper Date fields. + * @param raw - Raw JSON object from the server + * @returns Parsed StepAttempt + */ +function parseStepAttempt(raw: Record): StepAttempt { + return parseDates(raw, STEP_ATTEMPT_DATE_FIELDS) as unknown as StepAttempt; +} + +/** + * Build a query string from optional pagination params. + * @param params - Pagination parameters + * @param params.limit - Maximum number of items + * @param params.after - Cursor for forward pagination + * @param params.before - Cursor for backward pagination + * @returns Query string (including leading ?) or empty string + */ +function buildPaginationQuery(params: { + limit?: number; + after?: string; + before?: string; +}): string { + const search = new URLSearchParams(); + if (params.limit !== undefined) search.set("limit", String(params.limit)); + if (params.after) search.set("after", params.after); + if (params.before) search.set("before", params.before); + const qs = search.toString(); + return qs ? `?${qs}` : ""; +} + +/** + * Parse a paginated JSON response body. + * @param res - Fetch Response + * @param parseItem - Function to transform each item + * @returns Parsed PaginatedResponse + */ +async function parsePaginatedResponse( + res: globalThis.Response, + parseItem: (raw: Record) => T, +): Promise> { + const body = (await res.json()) as { + data: Record[]; + pagination: { next: string | null; prev: string | null }; + }; + return { + data: body.data.map((r) => parseItem(r)), + pagination: body.pagination, + }; +} + +// --------------------------------------------------------------------------- +// BackendHttp +// --------------------------------------------------------------------------- + +/** + * Options for the HTTP backend. + */ +export interface BackendHttpOptions { + /** Base URL of the OpenWorkflow server (e.g. "http://localhost:3000"). */ + url: string; + /** + * Custom fetch implementation. Defaults to `globalThis.fetch`. + * Useful for testing (in-process server) or adding middleware (auth headers). + */ + fetch?: typeof globalThis.fetch; +} + +/** + * Backend implementation that communicates with an OpenWorkflow HTTP server. + * Implements the full Backend interface over HTTP. + */ +export class BackendHttp implements Backend { + private readonly baseUrl: string; + private readonly _fetch: typeof globalThis.fetch; + + constructor(options: BackendHttpOptions) { + let url = options.url; + while (url.endsWith("/")) { + url = url.slice(0, -1); + } + this.baseUrl = url; + this._fetch = options.fetch ?? globalThis.fetch; + } + + // ----------------------------------------------------------------------- + // Workflow Runs — standard methods + // ----------------------------------------------------------------------- + + async createWorkflowRun( + params: Readonly, + ): Promise { + return this.postWorkflowRun("/v0/workflow-runs", { + workflowName: params.workflowName, + version: params.version, + idempotencyKey: params.idempotencyKey, + config: params.config, + context: params.context, + input: params.input, + parentStepAttemptNamespaceId: params.parentStepAttemptNamespaceId, + parentStepAttemptId: params.parentStepAttemptId, + availableAt: params.availableAt?.toISOString() ?? null, + deadlineAt: params.deadlineAt?.toISOString() ?? null, + }); + } + + async getWorkflowRun( + params: Readonly, + ): Promise { + const res = await this.fetch(`/v0/workflow-runs/${params.workflowRunId}`); + if (res.status === 404) return null; + await this.assertOk(res); + return parseWorkflowRun((await res.json()) as Record); + } + + async listWorkflowRuns( + params: Readonly, + ): Promise> { + const path = `/v0/workflow-runs${buildPaginationQuery(params)}`; + const res = await this.fetch(path); + await this.assertOk(res); + return parsePaginatedResponse(res, parseWorkflowRun); + } + + async countWorkflowRuns(): Promise { + const res = await this.fetch("/v0/workflow-runs:count"); + await this.assertOk(res); + return (await res.json()) as WorkflowRunCounts; + } + + async claimWorkflowRun( + params: Readonly, + ): Promise { + const res = await this.fetchPost("/v0/workflow-runs:claim", params); + if (res.status === 204) return null; + await this.assertOk(res); + return parseWorkflowRun((await res.json()) as Record); + } + + async extendWorkflowRunLease( + params: Readonly, + ): Promise { + return this.postWorkflowRun( + `/v0/workflow-runs/${params.workflowRunId}:extendLease`, + { + workerId: params.workerId, + leaseDurationMs: params.leaseDurationMs, + }, + ); + } + + async sleepWorkflowRun( + params: Readonly, + ): Promise { + return this.postWorkflowRun( + `/v0/workflow-runs/${params.workflowRunId}:sleep`, + { + workerId: params.workerId, + availableAt: params.availableAt.toISOString(), + }, + ); + } + + async completeWorkflowRun( + params: Readonly, + ): Promise { + return this.postWorkflowRun( + `/v0/workflow-runs/${params.workflowRunId}:complete`, + { + workerId: params.workerId, + output: params.output, + }, + ); + } + + async failWorkflowRun( + params: Readonly, + ): Promise { + return this.postWorkflowRun( + `/v0/workflow-runs/${params.workflowRunId}:fail`, + { + workerId: params.workerId, + error: params.error, + retryPolicy: params.retryPolicy, + ...(params.attempts === undefined ? {} : { attempts: params.attempts }), + ...(params.deadlineAt === undefined + ? {} + : { deadlineAt: params.deadlineAt?.toISOString() ?? null }), + }, + ); + } + + async rescheduleWorkflowRunAfterFailedStepAttempt( + params: Readonly, + ): Promise { + return this.postWorkflowRun( + `/v0/workflow-runs/${params.workflowRunId}:reschedule`, + { + workerId: params.workerId, + error: params.error, + availableAt: params.availableAt.toISOString(), + }, + ); + } + + async cancelWorkflowRun( + params: Readonly, + ): Promise { + return this.postWorkflowRun( + `/v0/workflow-runs/${params.workflowRunId}:cancel`, + {}, + ); + } + + // ----------------------------------------------------------------------- + // Step Attempts + // ----------------------------------------------------------------------- + + async createStepAttempt( + params: Readonly, + ): Promise { + return this.postStepAttempt( + `/v0/workflow-runs/${params.workflowRunId}/step-attempts`, + { + workerId: params.workerId, + stepName: params.stepName, + kind: params.kind, + config: params.config, + context: params.context, + }, + ); + } + + async getStepAttempt( + params: Readonly, + ): Promise { + const res = await this.fetch(`/v0/step-attempts/${params.stepAttemptId}`); + if (res.status === 404) return null; + await this.assertOk(res); + return parseStepAttempt((await res.json()) as Record); + } + + async listStepAttempts( + params: Readonly, + ): Promise> { + const path = `/v0/workflow-runs/${params.workflowRunId}/step-attempts${buildPaginationQuery(params)}`; + const res = await this.fetch(path); + await this.assertOk(res); + return parsePaginatedResponse(res, parseStepAttempt); + } + + async completeStepAttempt( + params: Readonly, + ): Promise { + return this.postStepAttempt( + `/v0/step-attempts/${params.stepAttemptId}:complete`, + { + workflowRunId: params.workflowRunId, + workerId: params.workerId, + output: params.output, + }, + ); + } + + async failStepAttempt( + params: Readonly, + ): Promise { + return this.postStepAttempt( + `/v0/step-attempts/${params.stepAttemptId}:fail`, + { + workflowRunId: params.workflowRunId, + workerId: params.workerId, + error: params.error, + }, + ); + } + + async setStepAttemptChildWorkflowRun( + params: Readonly, + ): Promise { + return this.postStepAttempt( + `/v0/step-attempts/${params.stepAttemptId}:setChildWorkflowRun`, + { + workflowRunId: params.workflowRunId, + workerId: params.workerId, + childWorkflowRunNamespaceId: params.childWorkflowRunNamespaceId, + childWorkflowRunId: params.childWorkflowRunId, + }, + ); + } + + // ----------------------------------------------------------------------- + // Signals + // ----------------------------------------------------------------------- + + async sendSignal( + params: Readonly, + ): Promise { + const res = await this.fetchPost("/v0/signals:send", params); + await this.assertOk(res); + return (await res.json()) as SendSignalResult; + } + + async getSignalDelivery( + params: Readonly, + ): Promise { + const res = await this.fetch( + `/v0/signal-deliveries/${params.stepAttemptId}`, + ); + if (res.status === 204) return undefined; + await this.assertOk(res); + return (await res.json()) as JsonValue; + } + + // ----------------------------------------------------------------------- + // Lifecycle + // ----------------------------------------------------------------------- + + async stop(): Promise { + // No-op — HTTP client has no persistent connection to close. + } + + // ----------------------------------------------------------------------- + // Internal helpers + // ----------------------------------------------------------------------- + + private async fetch(path: string): Promise { + return this._fetch(`${this.baseUrl}${path}`); + } + + private async fetchPost( + path: string, + body: Record, + ): Promise { + return this._fetch(`${this.baseUrl}${path}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }); + } + + private async postWorkflowRun( + path: string, + body: Record, + ): Promise { + const res = await this.fetchPost(path, body); + await this.assertOk(res); + return parseWorkflowRun((await res.json()) as Record); + } + + private async postStepAttempt( + path: string, + body: Record, + ): Promise { + const res = await this.fetchPost(path, body); + await this.assertOk(res); + return parseStepAttempt((await res.json()) as Record); + } + + private async assertOk(res: Response): Promise { + if (res.ok) return; + const body = await res.text(); + let message = body; + let code: string | undefined; + try { + const parsed = JSON.parse(body) as { + error?: { message?: string; code?: string }; + }; + if (parsed.error?.message) message = parsed.error.message; + code = parsed.error?.code; + } catch { + // body was not JSON; fall through with the raw text as the message + } + if (code !== undefined && isBackendErrorCode(code)) { + throw new BackendError(code, message); + } + throw new Error(message); + } +} diff --git a/packages/openworkflow/internal.ts b/packages/openworkflow/internal.ts index 57d1606f..de1975d0 100644 --- a/packages/openworkflow/internal.ts +++ b/packages/openworkflow/internal.ts @@ -4,11 +4,18 @@ export { isWorkflow } from "./core/workflow-definition.js"; // backend export * from "./core/backend.js"; +export { + BackendError, + type BackendErrorCode, + BACKEND_ERROR_CODES, + isBackendErrorCode, +} from "./core/error.js"; // core export type { WorkflowRun, WorkflowRunStatus } from "./core/workflow-run.js"; -export type { - StepAttempt, - StepAttemptStatus, - StepKind, +export { + type StepAttempt, + type StepAttemptStatus, + type StepKind, + STEP_KINDS, } from "./core/step-attempt.js"; diff --git a/packages/server/errors.ts b/packages/server/errors.ts new file mode 100644 index 00000000..4bb742ba --- /dev/null +++ b/packages/server/errors.ts @@ -0,0 +1,154 @@ +import type { Context } from "hono"; +import { HTTPException } from "hono/http-exception"; +import type { ContentfulStatusCode } from "hono/utils/http-status"; +import type { BackendError } from "openworkflow/internal"; +import { isBackendErrorCode } from "openworkflow/internal"; +import { z } from "zod/v4"; + +// --------------------------------------------------------------------------- +// Server-internal error types and the single error-to-Response mapping. +// Route handlers throw; the global `app.onError` hook runs `errorToResponse` +// to produce a consistent wire shape across the entire API. +// --------------------------------------------------------------------------- + +/** + * Thrown by route handlers when a request fails validation (malformed JSON, + * unknown/invalid fields, etc.). Maps to HTTP 400. + */ +export class HttpValidationError extends Error { + constructor(message: string) { + super(message); + this.name = "HttpValidationError"; + } +} + +/** + * The wire format for every error response returned by the server. + * `code` is set only for typed `BackendError`s so clients can branch on it. + */ +export interface ErrorResponseBody { + error: { + message: string; + code?: string; + }; +} + +/** + * Hook invoked for every unexpected server-side error. Intended for + * structured logging / error reporting. `BackendError`, `HttpValidationError`, + * and Hono's `HTTPException` (e.g. body-limit rejection) are expected outcomes + * and are not forwarded here. + */ +export type ServerErrorHook = ( + error: unknown, + context: { path: string; method: string }, +) => void; + +/** Options controlling {@link errorToResponse}'s behavior. */ +export interface ErrorToResponseOptions { + /** See CreateServerOptions.exposeInternalErrors. */ + exposeInternalErrors?: boolean; + /** See CreateServerOptions.onError. */ + onError?: ServerErrorHook; +} + +/** + * Build the JSON Response for a caught error. Centralized so that every + * handler — including the global `onError` — returns the same shape. + * @param error - The caught error + * @param c - Hono context (used only for `c.json`) + * @param options - Behavior options + * @returns JSON Response with status + body + */ +export function errorToResponse( + error: unknown, + c: Context, + options: ErrorToResponseOptions = {}, +): Response | Promise { + if (error instanceof HttpValidationError) { + return c.json( + { error: { message: error.message } }, + 400, + ); + } + + if (isBackendError(error)) { + const status = backendErrorStatus(error); + return c.json( + { error: { message: error.message, code: error.code } }, + status, + ); + } + + // Hono body-limit and similar middleware throw HTTPException — respect + // their status/response and don't treat them as server errors. + if (error instanceof HTTPException) { + return error.getResponse(); + } + + // Anything else is unexpected: surface it to the caller for logging, and + // either pass through a safe subset (Error.message) or scrub entirely, + // depending on `exposeInternalErrors`. + options.onError?.(error, { path: c.req.path, method: c.req.method }); + + const message = + options.exposeInternalErrors && error instanceof Error + ? error.message + : "Internal server error"; + return c.json({ error: { message } }, 500); +} + +/** + * Duck-typed {@link BackendError} check. We intentionally avoid `instanceof` + * so the guard is robust across realms — the `BackendError` class may be + * loaded from the compiled `openworkflow/internal` package in production and + * from the TypeScript source in the monorepo under vitest. + * @param error - Candidate error + * @returns Whether the error is a BackendError with a recognized code + */ +function isBackendError(error: unknown): error is BackendError { + if (!(error instanceof Error)) return false; + if (error.name !== "BackendError") return false; + const candidate = (error as { code?: unknown }).code; + return typeof candidate === "string" && isBackendErrorCode(candidate); +} + +/** + * Map a `BackendError.code` to an HTTP status code. + * @param error - The backend error + * @returns HTTP status code + */ +function backendErrorStatus(error: BackendError): ContentfulStatusCode { + switch (error.code) { + case "NOT_FOUND": { + return 404; + } + case "CONFLICT": { + return 409; + } + } +} + +/** + * Parse a Hono request body and validate it against a Zod schema. + * Throws `HttpValidationError` on malformed JSON or validation failure. + * @param c - Hono context + * @param schema - Zod schema describing the expected body + * @returns Parsed and validated data + */ +export async function parseJsonBody( + c: Context, + schema: z.ZodType, +): Promise { + let raw: unknown; + try { + raw = await c.req.json(); + } catch { + throw new HttpValidationError("Request body must be valid JSON."); + } + const parsed = schema.safeParse(raw); + if (!parsed.success) { + throw new HttpValidationError(z.prettifyError(parsed.error)); + } + return parsed.data; +} diff --git a/packages/server/package.json b/packages/server/package.json new file mode 100644 index 00000000..4d74efe6 --- /dev/null +++ b/packages/server/package.json @@ -0,0 +1,38 @@ +{ + "name": "@openworkflow/server", + "version": "0.1.0", + "description": "HTTP server for OpenWorkflow — exposes the Backend interface as a REST API", + "type": "module", + "exports": { + ".": { + "types": "./dist/server.d.ts", + "default": "./dist/server.js" + } + }, + "files": [ + "dist", + "!*.test.*", + "!*.testsuite.*", + "!*.tsbuildinfo" + ], + "scripts": { + "build": "npm run clean && tsc", + "clean": "rm -rf dist", + "prepublishOnly": "npm run build" + }, + "dependencies": { + "@hono/node-server": "^1.14.3", + "hono": "^4.7.11", + "zod": "^4.3.6" + }, + "devDependencies": { + "openworkflow": "*", + "vitest": "^4.0.18" + }, + "peerDependencies": { + "openworkflow": "^0.9.0" + }, + "engines": { + "node": ">=20" + } +} diff --git a/packages/server/schemas.ts b/packages/server/schemas.ts new file mode 100644 index 00000000..0fa5c629 --- /dev/null +++ b/packages/server/schemas.ts @@ -0,0 +1,115 @@ +import { STEP_KINDS } from "openworkflow/internal"; +import { z } from "zod/v4"; + +// --------------------------------------------------------------------------- +// Request body schemas +// +// Each exported schema validates the JSON body of one HTTP route. +// Dates are validated as ISO-8601 strings (via `z.iso.datetime()`) so that an +// invalid value is rejected with 400 before the backend sees it; handlers are +// responsible for the `new Date(...)` conversion. +// --------------------------------------------------------------------------- + +/** ISO-8601 datetime string. */ +const isoDatetime = z.iso.datetime(); + +/** Serialized error payload (mirrors SerializedError from core). */ +const errorSchema = z.object({ + name: z.string().optional(), + message: z.string(), + stack: z.string().optional(), +}); + +// --------------------------------------------------------------------------- +// Workflow Runs +// --------------------------------------------------------------------------- + +export const createWorkflowRunSchema = z.object({ + workflowName: z.string(), + version: z.string().nullable(), + idempotencyKey: z.string().nullable(), + config: z.json(), + context: z.json().nullable(), + input: z.json().nullable(), + parentStepAttemptNamespaceId: z.string().nullable().optional().default(null), + parentStepAttemptId: z.string().nullable().optional().default(null), + availableAt: isoDatetime.nullable().optional().default(null), + deadlineAt: isoDatetime.nullable().optional().default(null), +}); + +export const claimWorkflowRunSchema = z.object({ + workerId: z.string(), + leaseDurationMs: z.number(), +}); + +export const extendWorkflowRunLeaseSchema = claimWorkflowRunSchema; + +export const sleepWorkflowRunSchema = z.object({ + workerId: z.string(), + availableAt: isoDatetime, +}); + +export const completeWorkflowRunSchema = z.object({ + workerId: z.string(), + output: z.json().nullable(), +}); + +export const failWorkflowRunSchema = z.object({ + workerId: z.string(), + error: errorSchema, + retryPolicy: z.object({ + initialInterval: z.string(), + backoffCoefficient: z.number(), + maximumInterval: z.string(), + maximumAttempts: z.number(), + }), + attempts: z.number().optional(), + deadlineAt: isoDatetime.nullable().optional(), +}); + +export const rescheduleWorkflowRunSchema = z.object({ + workerId: z.string(), + error: errorSchema, + availableAt: isoDatetime, +}); + +// --------------------------------------------------------------------------- +// Step Attempts +// --------------------------------------------------------------------------- + +export const createStepAttemptSchema = z.object({ + workerId: z.string(), + stepName: z.string(), + kind: z.enum(STEP_KINDS), + config: z.json(), + context: z.json().nullable(), +}); + +export const completeStepAttemptSchema = z.object({ + workflowRunId: z.string(), + workerId: z.string(), + output: z.json().nullable(), +}); + +export const failStepAttemptSchema = z.object({ + workflowRunId: z.string(), + workerId: z.string(), + error: errorSchema, +}); + +export const setStepAttemptChildWorkflowRunSchema = z.object({ + workflowRunId: z.string(), + workerId: z.string(), + childWorkflowRunNamespaceId: z.string(), + childWorkflowRunId: z.string(), +}); + +// --------------------------------------------------------------------------- +// Signals +// --------------------------------------------------------------------------- + +export const sendSignalSchema = z.object({ + signal: z.string(), + data: z.json().nullable(), + idempotencyKey: z.string().nullable(), +}); diff --git a/packages/server/server.test.ts b/packages/server/server.test.ts new file mode 100644 index 00000000..1c93f219 --- /dev/null +++ b/packages/server/server.test.ts @@ -0,0 +1,763 @@ +import type { Backend } from "../openworkflow/core/backend.js"; +// Importing BackendError from the same realm as BackendHttp so instanceof +// checks below are valid. `openworkflow/internal` re-exports the same class, +// but tests that call `instanceof BackendError` must use one specific import +// path to avoid cross-realm identity issues under vitest. +import { BackendError } from "../openworkflow/core/error.js"; +import { BackendHttp } from "../openworkflow/http/backend.js"; +import { BackendPostgres } from "../openworkflow/postgres/backend.js"; +import { DEFAULT_POSTGRES_URL } from "../openworkflow/postgres/postgres.js"; +import { testBackend } from "../openworkflow/testing/backend.testsuite.js"; +import { createServer } from "./server.js"; +import { randomUUID } from "node:crypto"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** + * Creates a BackendHttp backed by an in-process Hono server over a real + * Postgres backend. Requests never leave the process — Hono's `app.fetch()` + * handles them directly via the Web Standard fetch interface. + * @returns BackendHttp and the underlying BackendPostgres for teardown + */ +async function createHttpBackend(): Promise<{ + backend: BackendHttp; + pgBackend: BackendPostgres; +}> { + const pgBackend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { + namespaceId: randomUUID(), + }); + // The shared test suite asserts on Postgres error messages, which are + // plain Error instances (not BackendError). Opt into message passthrough + // so those assertions can be checked across the HTTP boundary. + const server = createServer(pgBackend, { exposeInternalErrors: true }); + + const backend = new BackendHttp({ + url: "http://localhost:0", + fetch: async (input, init) => { + const request = new Request(input, init); + return server.fetch(request); + }, + }); + + return { backend, pgBackend }; +} + +const pgBackends = new WeakMap(); + +// --------------------------------------------------------------------------- +// Full Backend test suite: BackendHttp → Server → BackendPostgres → Postgres +// --------------------------------------------------------------------------- + +testBackend({ + setup: async () => { + const { backend, pgBackend } = await createHttpBackend(); + pgBackends.set(backend, pgBackend); + return backend; + }, + teardown: async (backend) => { + const pgBackend = pgBackends.get(backend); + if (pgBackend) { + await pgBackend.stop(); + pgBackends.delete(backend); + } + await backend.stop(); + }, +}); + +// --------------------------------------------------------------------------- +// Server-specific tests (against a real Postgres backend) +// --------------------------------------------------------------------------- + +describe("Server", () => { + let pgBackend: BackendPostgres; + let fetch: (request: Request) => Response | Promise; + + beforeEach(async () => { + pgBackend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { + namespaceId: randomUUID(), + }); + const server = createServer(pgBackend); + fetch = (req) => server.fetch(req); + }); + + afterEach(async () => { + await pgBackend.stop(); + }); + + // ----------------------------------------------------------------------- + // Liveness & readiness + // ----------------------------------------------------------------------- + + // cspell:ignore healthz readyz + test("GET /healthz returns 200 ok without hitting backend", async () => { + const res = await fetch(new Request("http://localhost/healthz")); + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ status: "ok" }); + }); + + test("GET /readyz returns 200 ok when backend is reachable", async () => { + const res = await fetch(new Request("http://localhost/readyz")); + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ status: "ok" }); + }); + + // ----------------------------------------------------------------------- + // Routing sanity + // ----------------------------------------------------------------------- + + test("GET /v0/workflow-runs:count is routed to counts, not to getWorkflowRun", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs:count"), + ); + expect(res.status).toBe(200); + const body = (await res.json()) as Record; + for (const key of ["pending", "running", "completed", "failed", "canceled"]) { + expect(body[key]).toBe(0); + } + }); + + test("GET /v0/workflow-runs/:id returns 404 for non-existent run", async () => { + const res = await fetch( + new Request(`http://localhost/v0/workflow-runs/${randomUUID()}`), + ); + expect(res.status).toBe(404); + }); + + test("POST /v0/workflow-runs:claim returns 204 when nothing available", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs:claim", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ workerId: randomUUID(), leaseDurationMs: 1000 }), + }), + ); + expect(res.status).toBe(204); + }); + + test("GET /v0/signal-deliveries/:id returns 204 when no delivery", async () => { + const res = await fetch( + new Request(`http://localhost/v0/signal-deliveries/${randomUUID()}`), + ); + expect(res.status).toBe(204); + }); + + // ----------------------------------------------------------------------- + // Verb routing: 404 for unknown/missing verbs + // ----------------------------------------------------------------------- + + test("POST /v0/workflow-runs/:id with no verb returns 404", async () => { + const res = await fetch( + new Request(`http://localhost/v0/workflow-runs/${randomUUID()}`, { + method: "POST", + }), + ); + expect(res.status).toBe(404); + }); + + test("POST /v0/workflow-runs/:id:unknownVerb returns 404", async () => { + const res = await fetch( + new Request(`http://localhost/v0/workflow-runs/${randomUUID()}:bogus`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }), + ); + expect(res.status).toBe(404); + }); + + test("POST /v0/step-attempts/:id with no verb returns 404", async () => { + const res = await fetch( + new Request(`http://localhost/v0/step-attempts/${randomUUID()}`, { + method: "POST", + }), + ); + expect(res.status).toBe(404); + }); + + test("POST /v0/step-attempts/:id:unknownVerb returns 404", async () => { + const res = await fetch( + new Request(`http://localhost/v0/step-attempts/${randomUUID()}:bogus`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }), + ); + expect(res.status).toBe(404); + }); + + test("GET /v0/step-attempts/:id returns 404 for non-existent attempt", async () => { + const res = await fetch( + new Request(`http://localhost/v0/step-attempts/${randomUUID()}`), + ); + expect(res.status).toBe(404); + }); + + // ----------------------------------------------------------------------- + // Backend-thrown errors propagate as 4xx/5xx + // ----------------------------------------------------------------------- + + test("POST /v0/workflow-runs/:id:cancel on non-existent run returns error", async () => { + const res = await fetch( + new Request(`http://localhost/v0/workflow-runs/${randomUUID()}:cancel`, { + method: "POST", + }), + ); + expect(res.status).toBeGreaterThanOrEqual(400); + const body = (await res.json()) as { error: { message: string } }; + expect(body.error.message).toBeDefined(); + }); +}); + +// --------------------------------------------------------------------------- +// Request validation (no Postgres required — pure protocol behavior) +// --------------------------------------------------------------------------- + +describe("Server request validation", () => { + const server = createServer(mockBackend()); + function fetch(req: Request): Response | Promise { + return server.fetch(req); + } + + test("POST /v0/workflow-runs with invalid body returns 400", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ bad: "body" }), + }), + ); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: { message: string } }; + expect(body.error.message).toBeDefined(); + }); + + test("POST /v0/workflow-runs with non-JSON body returns 400 (not 500)", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "not-json", + }), + ); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: { message: string } }; + expect(body.error.message).toMatch(/json/i); + }); + + test("POST /v0/workflow-runs:claim with invalid body returns 400", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs:claim", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }), + ); + expect(res.status).toBe(400); + }); + + test("POST /v0/workflow-runs/:id:extendLease with invalid body returns 400", async () => { + const res = await fetch( + new Request( + `http://localhost/v0/workflow-runs/${randomUUID()}:extendLease`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ bad: true }), + }, + ), + ); + expect(res.status).toBe(400); + }); + + test("POST /v0/workflow-runs rejects invalid availableAt date string", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + workflowName: "test", + version: null, + idempotencyKey: null, + config: {}, + context: null, + input: null, + availableAt: "not-a-date", + }), + }), + ); + expect(res.status).toBe(400); + }); + + test("POST /v0/workflow-runs/:id:sleep rejects invalid availableAt", async () => { + const res = await fetch( + new Request( + `http://localhost/v0/workflow-runs/${randomUUID()}:sleep`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + workerId: randomUUID(), + availableAt: "garbage", + }), + }, + ), + ); + expect(res.status).toBe(400); + }); + + test("rejects payloads over the body size limit with 413", async () => { + const server = createServer(mockBackend(), { maxBodyBytes: 1024 }); + const res = await server.fetch( + new Request("http://localhost/v0/workflow-runs", { + method: "POST", + headers: { + "Content-Type": "application/json", + "Content-Length": String(2048), + }, + // payload doesn't need to be real - content-length is enough to trigger + body: "x".repeat(2048), + }), + ); + expect(res.status).toBe(413); + }); +}); + +// --------------------------------------------------------------------------- +// Error handling — mock backend to exercise error paths +// --------------------------------------------------------------------------- + +function notImplemented(): never { + throw new Error("not implemented"); +} + +function mockBackend(overrides: Partial = {}): Backend { + return { + createWorkflowRun: vi.fn(notImplemented), + getWorkflowRun: vi.fn(notImplemented), + listWorkflowRuns: vi.fn(notImplemented), + countWorkflowRuns: vi.fn(notImplemented), + claimWorkflowRun: vi.fn(notImplemented), + extendWorkflowRunLease: vi.fn(notImplemented), + sleepWorkflowRun: vi.fn(notImplemented), + completeWorkflowRun: vi.fn(notImplemented), + failWorkflowRun: vi.fn(notImplemented), + rescheduleWorkflowRunAfterFailedStepAttempt: vi.fn(notImplemented), + cancelWorkflowRun: vi.fn(notImplemented), + createStepAttempt: vi.fn(notImplemented), + getStepAttempt: vi.fn(notImplemented), + listStepAttempts: vi.fn(notImplemented), + completeStepAttempt: vi.fn(notImplemented), + failStepAttempt: vi.fn(notImplemented), + setStepAttemptChildWorkflowRun: vi.fn(notImplemented), + sendSignal: vi.fn(notImplemented), + getSignalDelivery: vi.fn(notImplemented), + stop: vi.fn(), + ...overrides, + } as unknown as Backend; +} + +describe("Server error handling", () => { + test("maps BackendError NOT_FOUND to 404 with code", async () => { + const backend = mockBackend({ + getWorkflowRun: vi + .fn() + .mockRejectedValue(new BackendError("NOT_FOUND", "run not found")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request(`http://localhost/v0/workflow-runs/${randomUUID()}`), + ); + expect(res.status).toBe(404); + const body = (await res.json()) as { + error: { message: string; code?: string }; + }; + expect(body.error.code).toBe("NOT_FOUND"); + expect(body.error.message).toBe("run not found"); + }); + + test("maps BackendError CONFLICT to 409 with code", async () => { + const backend = mockBackend({ + createWorkflowRun: vi + .fn() + .mockRejectedValue(new BackendError("CONFLICT", "duplicate key")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request("http://localhost/v0/workflow-runs", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + workflowName: "test", + version: null, + idempotencyKey: null, + config: {}, + context: null, + input: null, + }), + }), + ); + expect(res.status).toBe(409); + const body = (await res.json()) as { + error: { message: string; code?: string }; + }; + expect(body.error.code).toBe("CONFLICT"); + }); + + test("scrubs non-BackendError messages to a generic 500 response", async () => { + const onError = vi.fn(); + const backend = mockBackend({ + listWorkflowRuns: vi + .fn() + .mockRejectedValue(new Error("SELECT * FROM passwords")), + }); + const server = createServer(backend, { onError }); + const res = await server.fetch( + new Request("http://localhost/v0/workflow-runs"), + ); + expect(res.status).toBe(500); + const body = (await res.json()) as { error: { message: string } }; + expect(body.error.message).toBe("Internal server error"); + expect(body.error.message).not.toContain("passwords"); + // onError hook receives the real error for server-side logging + expect(onError).toHaveBeenCalledOnce(); + const [err] = onError.mock.calls[0] as [Error]; + expect(err.message).toBe("SELECT * FROM passwords"); + }); + + test("scrubs thrown non-Error values to 500 without leaking value", async () => { + // Wrap the rejected value in an Error so vitest's unhandled-rejection + // detector doesn't flag the raw string; the server should still scrub + // the message since `exposeInternalErrors` is off by default. + const backend = mockBackend({ + listWorkflowRuns: vi + .fn() + .mockRejectedValue(new Error("secret-detail-in-error")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request("http://localhost/v0/workflow-runs"), + ); + expect(res.status).toBe(500); + const body = (await res.json()) as { error: { message: string } }; + expect(body.error.message).toBe("Internal server error"); + expect(body.error.message).not.toContain("secret-detail"); + }); + + test("unhandled errors invoke the onError hook for logging", async () => { + const onError = vi.fn(); + const backend = mockBackend({ + countWorkflowRuns: vi.fn().mockRejectedValue(new Error("boom")), + }); + const server = createServer(backend, { onError }); + await server.fetch( + new Request("http://localhost/v0/workflow-runs:count"), + ); + expect(onError).toHaveBeenCalledOnce(); + }); + + test("BackendError does not invoke onError hook", async () => { + const onError = vi.fn(); + const backend = mockBackend({ + getWorkflowRun: vi + .fn() + .mockRejectedValue(new BackendError("NOT_FOUND", "nope")), + }); + const server = createServer(backend, { onError }); + await server.fetch( + new Request(`http://localhost/v0/workflow-runs/${randomUUID()}`), + ); + expect(onError).not.toHaveBeenCalled(); + }); + + test("errors thrown from claimWorkflowRun are scrubbed", async () => { + const backend = mockBackend({ + claimWorkflowRun: vi.fn().mockRejectedValue(new Error("claim failed")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request("http://localhost/v0/workflow-runs:claim", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ workerId: randomUUID(), leaseDurationMs: 1000 }), + }), + ); + expect(res.status).toBe(500); + }); + + test("errors in workflow-run verb dispatch propagate with correct status", async () => { + const backend = mockBackend({ + extendWorkflowRunLease: vi + .fn() + .mockRejectedValue(new BackendError("NOT_FOUND", "not found")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request( + `http://localhost/v0/workflow-runs/${randomUUID()}:extendLease`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + workerId: randomUUID(), + leaseDurationMs: 5000, + }), + }, + ), + ); + expect(res.status).toBe(404); + }); + + test("errors in createStepAttempt propagate", async () => { + const backend = mockBackend({ + createStepAttempt: vi.fn().mockRejectedValue(new Error("step failed")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request( + `http://localhost/v0/workflow-runs/${randomUUID()}/step-attempts`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + workerId: randomUUID(), + stepName: "step1", + kind: "function", + config: {}, + context: null, + }), + }, + ), + ); + expect(res.status).toBe(500); + }); + + test("errors in getStepAttempt propagate", async () => { + const backend = mockBackend({ + getStepAttempt: vi.fn().mockRejectedValue(new Error("db error")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request(`http://localhost/v0/step-attempts/${randomUUID()}`), + ); + expect(res.status).toBe(500); + }); + + test("errors in listStepAttempts propagate", async () => { + const backend = mockBackend({ + listStepAttempts: vi.fn().mockRejectedValue(new Error("list failed")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request( + `http://localhost/v0/workflow-runs/${randomUUID()}/step-attempts`, + ), + ); + expect(res.status).toBe(500); + }); + + test("errors in step-attempt verb dispatch propagate", async () => { + const backend = mockBackend({ + completeStepAttempt: vi + .fn() + .mockRejectedValue(new Error("complete failed")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request(`http://localhost/v0/step-attempts/${randomUUID()}:complete`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + workflowRunId: randomUUID(), + workerId: randomUUID(), + output: null, + }), + }), + ); + expect(res.status).toBe(500); + }); + + test("errors in sendSignal propagate", async () => { + const backend = mockBackend({ + sendSignal: vi.fn().mockRejectedValue(new Error("signal failed")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request("http://localhost/v0/signals:send", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + signal: "test-signal", + data: null, + idempotencyKey: null, + }), + }), + ); + expect(res.status).toBe(500); + }); + + test("errors in getSignalDelivery propagate", async () => { + const backend = mockBackend({ + getSignalDelivery: vi + .fn() + .mockRejectedValue(new Error("delivery failed")), + }); + const server = createServer(backend); + const res = await server.fetch( + new Request(`http://localhost/v0/signal-deliveries/${randomUUID()}`), + ); + expect(res.status).toBe(500); + }); + + test("readyz returns 503 when backend cannot serve requests", async () => { + const backend = mockBackend({ + countWorkflowRuns: vi.fn().mockRejectedValue(new Error("db down")), + }); + const server = createServer(backend); + const res = await server.fetch(new Request("http://localhost/readyz")); + expect(res.status).toBe(503); + }); +}); + +// --------------------------------------------------------------------------- +// BackendHttp round-trip behavior +// --------------------------------------------------------------------------- + +/** + * Build a BackendHttp instance that uses the supplied fetch stub. + * @param fetch - Fetch stub the backend should call + * @returns BackendHttp wired to the stub + */ +function backendWithFetch(fetch: typeof globalThis.fetch): BackendHttp { + return new BackendHttp({ url: "http://localhost:3000", fetch }); +} + +/** + * Build a fetch stub that always returns the given response. + * @param response - Response to return for every call + * @returns A fetch-compatible function + */ +function fetchReturning( + response: Response, +): typeof globalThis.fetch { + // eslint-disable-next-line @typescript-eslint/require-await -- fetch is async + return async () => response.clone(); +} + +/** + * Extract the URL string from a fetch input regardless of its form. + * @param input - The first argument passed to a fetch-compatible function + * @returns The URL as a string + */ +function fetchInputUrl(input: string | URL | Request): string { + if (typeof input === "string") return input; + if (input instanceof URL) return input.href; + return input.url; +} + +describe("BackendHttp", () => { + test("assembles URLs against a URL with trailing slash correctly", async () => { + const calls: string[] = []; + // eslint-disable-next-line @typescript-eslint/require-await -- fetch is async + async function fakeFetch( + input: string | URL | Request, + ): Promise { + calls.push(fetchInputUrl(input)); + return new Response(null, { status: 404 }); + } + const backend = new BackendHttp({ + url: "http://localhost:3000/", + fetch: fakeFetch, + }); + await backend.getWorkflowRun({ workflowRunId: "abc" }); + expect(calls[0]).toBe("http://localhost:3000/v0/workflow-runs/abc"); + }); + + test("stop() resolves without error", async () => { + const backend = new BackendHttp({ url: "http://localhost:3000" }); + await expect(backend.stop()).resolves.toBeUndefined(); + }); + + test("re-throws BackendError when server returns a code field", async () => { + const backend = backendWithFetch( + fetchReturning( + Response.json( + { error: { message: "run not found", code: "NOT_FOUND" } }, + { status: 404 }, + ), + ), + ); + await expect( + backend.extendWorkflowRunLease({ + workflowRunId: randomUUID(), + workerId: randomUUID(), + leaseDurationMs: 1000, + }), + ).rejects.toMatchObject({ + name: "BackendError", + code: "NOT_FOUND", + message: "run not found", + }); + }); + + test("re-throws BackendError for CONFLICT", async () => { + const backend = backendWithFetch( + fetchReturning( + Response.json( + { error: { message: "duplicate", code: "CONFLICT" } }, + { status: 409 }, + ), + ), + ); + await expect( + backend.createWorkflowRun({ + workflowName: "w", + version: null, + idempotencyKey: null, + config: {}, + context: null, + input: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }), + ).rejects.toBeInstanceOf(BackendError); + }); + + test("throws plain Error when server response has no code", async () => { + const backend = backendWithFetch( + fetchReturning( + Response.json({ error: { message: "boom" } }, { status: 500 }), + ), + ); + await expect( + backend.countWorkflowRuns(), + ).rejects.not.toBeInstanceOf(BackendError); + }); + + test("throws plain Error when server returns unrecognized code", async () => { + const backend = backendWithFetch( + fetchReturning( + Response.json( + { error: { message: "weird", code: "UNKNOWN_CODE" } }, + { status: 418 }, + ), + ), + ); + await expect( + backend.countWorkflowRuns(), + ).rejects.not.toBeInstanceOf(BackendError); + }); + + test("falls back to response text when body is not JSON", async () => { + const backend = backendWithFetch( + fetchReturning(new Response("plain text failure", { status: 500 })), + ); + await expect(backend.countWorkflowRuns()).rejects.toThrow( + /plain text failure/, + ); + }); +}); diff --git a/packages/server/server.ts b/packages/server/server.ts new file mode 100644 index 00000000..a79e3449 --- /dev/null +++ b/packages/server/server.ts @@ -0,0 +1,477 @@ +import { + errorToResponse, + parseJsonBody, + type ServerErrorHook, +} from "./errors.js"; +import { + claimWorkflowRunSchema, + completeStepAttemptSchema, + completeWorkflowRunSchema, + createStepAttemptSchema, + createWorkflowRunSchema, + extendWorkflowRunLeaseSchema, + failStepAttemptSchema, + failWorkflowRunSchema, + rescheduleWorkflowRunSchema, + sendSignalSchema, + setStepAttemptChildWorkflowRunSchema, + sleepWorkflowRunSchema, +} from "./schemas.js"; +import { serve as honoServe } from "@hono/node-server"; +import type { Context } from "hono"; +import { Hono } from "hono"; +import { bodyLimit } from "hono/body-limit"; +import { logger } from "hono/logger"; +import type { + Backend, + CancelWorkflowRunParams, + CompleteStepAttemptParams, + CompleteWorkflowRunParams, + CreateStepAttemptParams, + CreateWorkflowRunParams, + ExtendWorkflowRunLeaseParams, + FailStepAttemptParams, + FailWorkflowRunParams, + GetSignalDeliveryParams, + GetStepAttemptParams, + GetWorkflowRunParams, + ListStepAttemptsParams, + ListWorkflowRunsParams, + RescheduleWorkflowRunAfterFailedStepAttemptParams, + SendSignalParams, + SetStepAttemptChildWorkflowRunParams, + SleepWorkflowRunParams, +} from "openworkflow/internal"; + +/** + * The OpenWorkflow HTTP server handle. Public API is the Web Standard `fetch`. + */ +export interface OpenWorkflowServer { + /** Handle an incoming HTTP request (Web Standard fetch signature). */ + fetch(request: Request): Response | Promise; +} + +/** + * Options for {@link createServer}. + */ +export interface CreateServerOptions { + /** + * Maximum allowed request body size, in bytes. Requests exceeding this + * limit are rejected with HTTP 413 before the handler is invoked. + * Defaults to 1 MiB. + */ + maxBodyBytes?: number; + /** + * Whether to attach Hono's request logger middleware. + * Defaults to `false` so tests stay quiet; enable in production. + */ + logRequests?: boolean; + /** + * Hook invoked for every unexpected server-side error. Intended for + * structured logging or error reporting. Not called for expected 4xx + * conditions (validation errors, `BackendError`, body-limit rejection). + */ + onError?: ServerErrorHook; + /** + * If `true`, the `message` of unexpected `Error`s thrown from the backend + * is included in the 500 response body. Useful during development and for + * the shared test suite; dangerous in production because it can leak + * implementation details (SQL fragments, connection URIs, etc.). + * `BackendError` messages and validation errors are always exposed + * regardless of this flag. + * Defaults to `false` (production-safe). + */ + exposeInternalErrors?: boolean; +} + +const DEFAULT_MAX_BODY_BYTES = 1_048_576; // 1 MiB + +/** + * Create an OpenWorkflow HTTP server backed by the given Backend. + * @param backend - Backend implementation to proxy + * @param options - Server options + * @returns Server with a Web Standard `fetch` handler + */ +export function createServer( + backend: Backend, + options: CreateServerOptions = {}, +): OpenWorkflowServer { + const app = new Hono(); + + if (options.logRequests) { + app.use("*", logger()); + } + app.use( + "*", + bodyLimit({ maxSize: options.maxBodyBytes ?? DEFAULT_MAX_BODY_BYTES }), + ); + app.onError((error, c) => + errorToResponse(error, c, { + onError: options.onError, + exposeInternalErrors: options.exposeInternalErrors, + }), + ); + + // cspell:ignore healthz readyz + // /healthz is liveness (process is up). /readyz pings the backend so load + // balancers don't route traffic to a replica whose DB connection is broken. + app.get("/healthz", (c) => c.json({ status: "ok" })); + app.get("/readyz", async (c) => { + try { + await backend.countWorkflowRuns(); + } catch (error) { + options.onError?.(error, { path: c.req.path, method: c.req.method }); + return c.json({ status: "unavailable" }, 503); + } + return c.json({ status: "ok" }); + }); + + registerWorkflowRunRoutes(app, backend); + registerStepAttemptRoutes(app, backend); + registerSignalRoutes(app, backend); + + return app; +} + +// --------------------------------------------------------------------------- +// Verb dispatch — POST /...:{verb} and POST /.../:id:{verb} +// +// Hono doesn't support literal colons inside a route that also has a `:param`, +// so we capture `id:verb` as a single segment and dispatch via table. Shared +// between workflow-run and step-attempt instance methods. +// --------------------------------------------------------------------------- + +type VerbHandler = ( + backend: Backend, + id: string, + c: Context, +) => Promise; + +/** + * Register `POST {pathPrefix}/:idVerb` with the given verb dispatch table. + * @param app - Hono app instance + * @param pathPrefix - Collection path (e.g. "/v0/workflow-runs") + * @param backend - Backend implementation to proxy + * @param verbs - Verb-name → handler map + */ +function registerVerbRoute( + app: Hono, + pathPrefix: string, + backend: Backend, + verbs: Readonly>, +): void { + app.post(`${pathPrefix}/:idVerb`, async (c) => { + const parts = splitIdVerb(c.req.param("idVerb")); + if (!parts) return c.notFound(); + const [id, verb] = parts; + const handler = verbs[verb]; + if (!handler) return c.notFound(); + return handler(backend, id, c); + }); +} + +// --------------------------------------------------------------------------- +// Route registration — Workflow Runs +// --------------------------------------------------------------------------- + +const WORKFLOW_RUN_VERBS: Readonly> = { + extendLease: async (backend, id, c) => { + const body = await parseJsonBody(c, extendWorkflowRunLeaseSchema); + const params: ExtendWorkflowRunLeaseParams = { workflowRunId: id, ...body }; + const run = await backend.extendWorkflowRunLease(params); + return c.json(run); + }, + sleep: async (backend, id, c) => { + const body = await parseJsonBody(c, sleepWorkflowRunSchema); + const params: SleepWorkflowRunParams = { + workflowRunId: id, + workerId: body.workerId, + availableAt: new Date(body.availableAt), + }; + const run = await backend.sleepWorkflowRun(params); + return c.json(run); + }, + complete: async (backend, id, c) => { + const body = await parseJsonBody(c, completeWorkflowRunSchema); + const params: CompleteWorkflowRunParams = { workflowRunId: id, ...body }; + const run = await backend.completeWorkflowRun(params); + return c.json(run); + }, + fail: async (backend, id, c) => { + const body = await parseJsonBody(c, failWorkflowRunSchema); + const params: FailWorkflowRunParams = { + workflowRunId: id, + workerId: body.workerId, + error: body.error, + retryPolicy: body.retryPolicy, + ...(body.attempts === undefined ? {} : { attempts: body.attempts }), + ...(body.deadlineAt === undefined + ? {} + : { deadlineAt: body.deadlineAt ? new Date(body.deadlineAt) : null }), + }; + const run = await backend.failWorkflowRun(params); + return c.json(run); + }, + reschedule: async (backend, id, c) => { + const body = await parseJsonBody(c, rescheduleWorkflowRunSchema); + const params: RescheduleWorkflowRunAfterFailedStepAttemptParams = { + workflowRunId: id, + workerId: body.workerId, + error: body.error, + availableAt: new Date(body.availableAt), + }; + const run = await backend.rescheduleWorkflowRunAfterFailedStepAttempt( + params, + ); + return c.json(run); + }, + cancel: async (backend, id, c) => { + const params: CancelWorkflowRunParams = { workflowRunId: id }; + const run = await backend.cancelWorkflowRun(params); + return c.json(run); + }, +}; + +/** + * Register workflow-run routes on the given app. + * @param app - Hono app instance + * @param backend - Backend implementation to proxy + */ +function registerWorkflowRunRoutes(app: Hono, backend: Backend): void { + app.post("/v0/workflow-runs", async (c) => { + const body = await parseJsonBody(c, createWorkflowRunSchema); + const params: CreateWorkflowRunParams = { + workflowName: body.workflowName, + version: body.version, + idempotencyKey: body.idempotencyKey, + config: body.config, + context: body.context, + input: body.input, + parentStepAttemptNamespaceId: body.parentStepAttemptNamespaceId, + parentStepAttemptId: body.parentStepAttemptId, + availableAt: body.availableAt ? new Date(body.availableAt) : null, + deadlineAt: body.deadlineAt ? new Date(body.deadlineAt) : null, + }; + const run = await backend.createWorkflowRun(params); + return c.json(run, 201); + }); + + app.get("/v0/workflow-runs/:id", async (c) => { + const params: GetWorkflowRunParams = { workflowRunId: c.req.param("id") }; + const run = await backend.getWorkflowRun(params); + if (!run) { + return c.json({ error: { message: "Workflow run not found" } }, 404); + } + return c.json(run); + }); + + app.get("/v0/workflow-runs", async (c) => { + const params: ListWorkflowRunsParams = paginationQuery(c); + const result = await backend.listWorkflowRuns(params); + return c.json(result); + }); + + app.get("/v0/workflow-runs:count", async (c) => { + const counts = await backend.countWorkflowRuns(); + return c.json(counts); + }); + + app.post("/v0/workflow-runs:claim", async (c) => { + const body = await parseJsonBody(c, claimWorkflowRunSchema); + const run = await backend.claimWorkflowRun(body); + if (!run) return c.body(null, 204); + return c.json(run); + }); + + registerVerbRoute(app, "/v0/workflow-runs", backend, WORKFLOW_RUN_VERBS); +} + +// --------------------------------------------------------------------------- +// Route registration — Step Attempts +// --------------------------------------------------------------------------- + +const STEP_ATTEMPT_VERBS: Readonly> = { + complete: async (backend, id, c) => { + const body = await parseJsonBody(c, completeStepAttemptSchema); + const params: CompleteStepAttemptParams = { stepAttemptId: id, ...body }; + const step = await backend.completeStepAttempt(params); + return c.json(step); + }, + fail: async (backend, id, c) => { + const body = await parseJsonBody(c, failStepAttemptSchema); + const params: FailStepAttemptParams = { stepAttemptId: id, ...body }; + const step = await backend.failStepAttempt(params); + return c.json(step); + }, + setChildWorkflowRun: async (backend, id, c) => { + const body = await parseJsonBody(c, setStepAttemptChildWorkflowRunSchema); + const params: SetStepAttemptChildWorkflowRunParams = { + stepAttemptId: id, + ...body, + }; + const step = await backend.setStepAttemptChildWorkflowRun(params); + return c.json(step); + }, +}; + +/** + * Register step-attempt routes on the given app. + * @param app - Hono app instance + * @param backend - Backend implementation to proxy + */ +function registerStepAttemptRoutes(app: Hono, backend: Backend): void { + app.post("/v0/workflow-runs/:id/step-attempts", async (c) => { + const body = await parseJsonBody(c, createStepAttemptSchema); + const params: CreateStepAttemptParams = { + workflowRunId: c.req.param("id"), + ...body, + }; + const step = await backend.createStepAttempt(params); + return c.json(step, 201); + }); + + app.get("/v0/step-attempts/:id", async (c) => { + const params: GetStepAttemptParams = { stepAttemptId: c.req.param("id") }; + const step = await backend.getStepAttempt(params); + if (!step) { + return c.json({ error: { message: "Step attempt not found" } }, 404); + } + return c.json(step); + }); + + app.get("/v0/workflow-runs/:id/step-attempts", async (c) => { + const params: ListStepAttemptsParams = { + workflowRunId: c.req.param("id"), + ...paginationQuery(c), + }; + const result = await backend.listStepAttempts(params); + return c.json(result); + }); + + registerVerbRoute(app, "/v0/step-attempts", backend, STEP_ATTEMPT_VERBS); +} + +// --------------------------------------------------------------------------- +// Route registration — Signals +// --------------------------------------------------------------------------- + +/** + * Register signal routes on the given app. + * @param app - Hono app instance + * @param backend - Backend implementation to proxy + */ +function registerSignalRoutes(app: Hono, backend: Backend): void { + app.post("/v0/signals:send", async (c) => { + const body = await parseJsonBody(c, sendSignalSchema); + const params: SendSignalParams = body; + const result = await backend.sendSignal(params); + return c.json(result); + }); + + app.get("/v0/signal-deliveries/:stepAttemptId", async (c) => { + const params: GetSignalDeliveryParams = { + stepAttemptId: c.req.param("stepAttemptId"), + }; + const result = await backend.getSignalDelivery(params); + if (result === undefined) return c.body(null, 204); + return c.json(result); + }); +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** + * Extract pagination query params from a Hono context. + * @param c - Hono context + * @returns Pagination params object + */ +function paginationQuery(c: Context): { + limit?: number; + after?: string; + before?: string; +} { + const { limit, after, before } = c.req.query(); + return { + ...(limit ? { limit: Number(limit) } : {}), + ...(after ? { after } : {}), + ...(before ? { before } : {}), + }; +} + +/** + * Split a path segment of the form `id:verb` into its parts. + * Returns `[id, verb]` or `null` if no colon is present, or if the verb is + * empty (e.g. `id:`). + * @param segment - The path segment to split + * @returns Tuple of [id, verb] or null + */ +function splitIdVerb(segment: string): [id: string, verb: string] | null { + const idx = segment.lastIndexOf(":"); + if (idx === -1) return null; + const id = segment.slice(0, idx); + const verb = segment.slice(idx + 1); + if (!id || !verb) return null; + return [id, verb]; +} + +// --------------------------------------------------------------------------- +// Node.js HTTP server +// --------------------------------------------------------------------------- + +/** + * Options for {@link serve}. + */ +export interface ServeOptions { + /** Port to listen on (default: 3000). */ + port?: number; + /** + * Host/interface to bind to. Defaults to `127.0.0.1` so the server is not + * unexpectedly exposed to the network. Set to `0.0.0.0` (or an explicit + * interface) to accept remote connections. + */ + hostname?: string; +} + +/** + * A handle for a running Node.js HTTP server. Call `close()` to stop + * accepting new connections and wait for in-flight requests to complete. + */ +export interface ServeHandle { + /** Gracefully close the server. Resolves when the socket is closed. */ + close(): Promise; +} + +/** + * Start a Node.js HTTP server for the given OpenWorkflow server. + * @param server - OpenWorkflow server instance + * @param options - Server options + * @returns A handle for stopping the server gracefully + */ +export function serve( + server: OpenWorkflowServer, + options: ServeOptions = {}, +): ServeHandle { + /* v8 ignore start -- infrastructure: starts a real Node.js HTTP server */ + const port = options.port ?? 3000; + const hostname = options.hostname ?? "127.0.0.1"; + const httpServer = honoServe({ + fetch: (request) => server.fetch(request), + port, + hostname, + }); + return { + close: () => + new Promise((resolve, reject) => { + httpServer.close((err) => { + if (err) { + reject(err); + return; + } + resolve(); + }); + }), + }; + /* v8 ignore stop */ +} diff --git a/packages/server/tsconfig.json b/packages/server/tsconfig.json new file mode 100644 index 00000000..b3db5ed0 --- /dev/null +++ b/packages/server/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": ["../../tsconfig.base.json"], + "compilerOptions": { + "outDir": "dist" + }, + "references": [{ "path": "../openworkflow" }], + "include": ["**/*.ts"], + "exclude": ["dist"] +} diff --git a/tsconfig.json b/tsconfig.json index 1a59f758..f12497a7 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -11,7 +11,8 @@ { "path": "./packages/backend-postgres" }, { "path": "./packages/backend-sqlite" }, { "path": "./packages/cli" }, - { "path": "./packages/dashboard" } + { "path": "./packages/dashboard" }, + { "path": "./packages/server" } ], // project-wide settings for configs, workflows, etc. From 34ae8288d8c2e50a9ccf91b80c764afb50c58aaa Mon Sep 17 00:00:00 2001 From: James Martinez Date: Thu, 16 Apr 2026 17:31:23 -0500 Subject: [PATCH 2/6] fix: feedback - Fix TS2379/TS2375/TS2322 errors from `exactOptionalPropertyTypes` by normalizing zod-parsed error bodies into `SerializedError` and by conditionally spreading optional options. - Cast `retryPolicy` to `RetryPolicy` and step-attempt `context` to `StepAttemptContext | null` at the HTTP boundary. - Work around TS2589 deep-instantiation in Hono's `JSONParsed` by routing complex response bodies through a `jsonResponse` helper and splitting step-attempt routes across sub-apps. - Re-export `RetryPolicy`, `SerializedError`, and `StepAttemptContext` from `openworkflow/internal` so the server package can reference them. - Drop unused `BACKEND_ERROR_CODES` and `HttpValidationError` exports, and deduplicate the worker-lease schemas to satisfy knip. - Reformat server sources with prettier. --- knip.json | 3 +- package-lock.json | 18 +- packages/cli/commands.test.ts | 18 +- packages/cli/commands.ts | 89 +++---- packages/cli/package.json | 2 +- packages/openworkflow/core/error.ts | 25 +- packages/openworkflow/core/step-attempt.ts | 5 +- packages/openworkflow/http.ts | 1 + packages/openworkflow/http/backend.ts | 86 ++---- packages/openworkflow/internal.ts | 16 +- packages/openworkflow/package.json | 4 + packages/server/errors.ts | 50 ++-- packages/server/package.json | 4 +- packages/server/schemas.ts | 56 ++-- packages/server/server.test.ts | 288 ++++++++------------- packages/server/server.ts | 210 +++++++-------- 16 files changed, 350 insertions(+), 525 deletions(-) create mode 100644 packages/openworkflow/http.ts diff --git a/knip.json b/knip.json index 0e5b8064..66882a50 100644 --- a/knip.json +++ b/knip.json @@ -19,7 +19,8 @@ "**/openworkflow.config.*", "openworkflow/**/*.ts", "packages/dashboard/src/components/ui/*.tsx", - "packages/docs/style.css" + "packages/docs/style.css", + "packages/openworkflow/http.ts" ], "ignoreIssues": { "packages/dashboard/src/components/ui/*.tsx": ["exports"], diff --git a/package-lock.json b/package-lock.json index 99d54b12..62e1d7d1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2738,9 +2738,9 @@ } }, "node_modules/@hono/node-server": { - "version": "1.19.9", - "resolved": "https://registry.npmjs.org/@hono/node-server/-/node-server-1.19.9.tgz", - "integrity": "sha512-vHL6w3ecZsky+8P5MD+eFfaGTyCeOHUIFYMGpQGbrBTSmNNoxv0if69rEZ5giu36weC5saFuznL411gRX7bJDw==", + "version": "1.19.14", + "resolved": "https://registry.npmjs.org/@hono/node-server/-/node-server-1.19.14.tgz", + "integrity": "sha512-GwtvgtXxnWsucXvbQXkRgqksiH2Qed37H9xHZocE5sA3N8O8O8/8FA3uclQXxXVzc9XBZuEOMK7+r02FmSpHtw==", "license": "MIT", "engines": { "node": ">=18.14.1" @@ -10702,9 +10702,9 @@ "license": "MIT" }, "node_modules/hono": { - "version": "4.11.9", - "resolved": "https://registry.npmjs.org/hono/-/hono-4.11.9.tgz", - "integrity": "sha512-Eaw2YTGM6WOxA6CXbckaEvslr2Ne4NFsKrvc0v97JD5awbmeBLO5w9Ho9L9kmKonrwF9RJlW6BxT1PVv/agBHQ==", + "version": "4.12.14", + "resolved": "https://registry.npmjs.org/hono/-/hono-4.12.14.tgz", + "integrity": "sha512-am5zfg3yu6sqn5yjKBNqhnTX7Cv+m00ox+7jbaKkrLMRJ4rAdldd1xPd/JzbBWspqaQv6RSTrgFN95EsfhC+7w==", "license": "MIT", "engines": { "node": ">=16.9.0" @@ -16959,6 +16959,7 @@ "version": "0.4.4", "dependencies": { "@clack/prompts": "^1.2.0", + "@openworkflow/server": "*", "commander": "^14.0.3", "consola": "^3.4.2", "dotenv": "^17.4.2", @@ -16969,7 +16970,6 @@ "openworkflow": "dist/cli.js" }, "devDependencies": { - "@openworkflow/server": "*", "openworkflow": "*", "vitest": "^4.0.18" }, @@ -17139,8 +17139,8 @@ "name": "@openworkflow/server", "version": "0.1.0", "dependencies": { - "@hono/node-server": "^1.14.3", - "hono": "^4.7.11", + "@hono/node-server": "^1.19.14", + "hono": "^4.12.14", "zod": "^4.3.6" }, "devDependencies": { diff --git a/packages/cli/commands.test.ts b/packages/cli/commands.test.ts index 4176cf70..18e93768 100644 --- a/packages/cli/commands.test.ts +++ b/packages/cli/commands.test.ts @@ -5,7 +5,7 @@ import { getConfigFileName, getExampleWorkflowFileName, getRunFileName, - validateDashboardPort, + validatePort, } from "./commands.js"; import fs from "node:fs"; import os from "node:os"; @@ -164,27 +164,29 @@ describe("getDashboardSpawnOptions", () => { }); }); -describe("validateDashboardPort", () => { +describe("validatePort", () => { test("returns undefined when no custom port is provided", () => { - expect(validateDashboardPort()).toBeUndefined(); + expect(validatePort(undefined, "dashboard")).toBeUndefined(); }); test("returns the port when it is within range", () => { - expect(validateDashboardPort(3001)).toBe(3001); + expect(validatePort(3001, "dashboard")).toBe(3001); }); test("throws for non-integer ports", () => { - expect(() => validateDashboardPort(Number.NaN)).toThrow( + expect(() => validatePort(Number.NaN, "dashboard")).toThrow( "Invalid dashboard port.", ); - expect(() => validateDashboardPort(3000.5)).toThrow( + expect(() => validatePort(3000.5, "dashboard")).toThrow( "Invalid dashboard port.", ); }); test("throws for out-of-range ports", () => { - expect(() => validateDashboardPort(0)).toThrow("Invalid dashboard port."); - expect(() => validateDashboardPort(65_536)).toThrow( + expect(() => validatePort(0, "dashboard")).toThrow( + "Invalid dashboard port.", + ); + expect(() => validatePort(65_536, "dashboard")).toThrow( "Invalid dashboard port.", ); }); diff --git a/packages/cli/commands.ts b/packages/cli/commands.ts index a17909cb..6f686b6c 100644 --- a/packages/cli/commands.ts +++ b/packages/cli/commands.ts @@ -35,11 +35,7 @@ interface CommandOptions { config?: string; } -interface DashboardOptions extends CommandOptions { - port?: number; -} - -interface ServerStartOptions extends CommandOptions { +interface PortedOptions extends CommandOptions { port?: number; } @@ -362,19 +358,23 @@ export function getDashboardSpawnOptions(port?: number): { } /** - * Validate dashboard port option. - * @param port - Optional dashboard port. - * @returns Validated dashboard port. - * @throws {CLIError} If the provided port is not an integer in the 1-65535 range. + * Validate a port option. + * @param port - Optional port number. + * @param label - Label used in the error message (e.g. "dashboard", "server"). + * @returns Validated port, or undefined if not provided. + * @throws {CLIError} If the port is not an integer in the 1-65535 range. */ -export function validateDashboardPort(port?: number): number | undefined { +export function validatePort( + port: number | undefined, + label: string, +): number | undefined { if (port === undefined) { return undefined; } if (!Number.isInteger(port) || port < 1 || port > 65_535) { throw new CLIError( - "Invalid dashboard port.", + `Invalid ${label} port.`, "Use an integer between 1 and 65535, for example `--port 3001`.", ); } @@ -387,9 +387,9 @@ export function validateDashboardPort(port?: number): number | undefined { * @param options - Dashboard command options. * @returns Resolves when the dashboard process exits. */ -export async function dashboard(options: DashboardOptions = {}): Promise { +export async function dashboard(options: PortedOptions = {}): Promise { const configPath = options.config; - const port = validateDashboardPort(options.port); + const port = validatePort(options.port, "dashboard"); consola.start("Starting dashboard..."); const { configFile } = await loadConfigWithEnv(configPath); @@ -451,21 +451,15 @@ export async function dashboard(options: DashboardOptions = {}): Promise { }); } -export type { ServerStartOptions }; - /** * openworkflow server start - * Start the OpenWorkflow HTTP API server. * @param options - Server start options. */ -export async function serverStart( - options: ServerStartOptions = {}, -): Promise { - const { config: configPath, port: rawPort } = options; - const port = rawPort ?? 3000; +export async function serverStart(options: PortedOptions = {}): Promise { + const port = validatePort(options.port, "server") ?? 3000; consola.start("Starting server..."); - const { configFile, config } = await loadConfigWithEnv(configPath); + const { configFile, config } = await loadConfigWithEnv(options.config); if (!configFile) { throw new CLIError( "No config file found.", @@ -474,32 +468,32 @@ export async function serverStart( } consola.info(`Using config: ${configFile}`); - let createServer: typeof import("@openworkflow/server").createServer; - let serve: typeof import("@openworkflow/server").serve; - try { - ({ createServer, serve } = await import("@openworkflow/server")); - } catch { - throw new CLIError( - "@openworkflow/server is not installed.", - 'Run `npm install @openworkflow/server` to enable the "server start" command.', - ); - } - const backend = config.backend; - const server = createServer(backend, { - logRequests: true, - onError: (error, ctx) => { - consola.error(`[${ctx.method} ${ctx.path}]`, error); - }, - }); - const handle = serve(server, { port }); - consola.success(`Server listening on http://localhost:${String(port)}`); - - registerGracefulShutdown({ + let handle: { close(): Promise } | null = null; + const gracefulShutdown = registerGracefulShutdown({ noun: "server", - stopApp: () => handle.close(), + stopApp: async () => { + await handle?.close(); + }, backend, }); + + try { + // still dynamic to defer hono's ~150KB until `server start` is invoked + const { createServer, serve } = await import("@openworkflow/server"); + + const server = createServer(backend, { + logRequests: true, + onError: (error, ctx) => { + consola.error(`[${ctx.method} ${ctx.path}]`, error); + }, + }); + handle = serve(server, { port }); + consola.success(`Server listening on http://localhost:${String(port)}`); + } catch (error) { + await gracefulShutdown(); + throw error; + } } // ----------------------------------------------------------------------------- @@ -514,11 +508,10 @@ interface ShutdownOptions { } /** - * Wire SIGINT/SIGTERM to a graceful shutdown. The HTTP handle / worker is - * stopped first (so no new work starts), then the backend is stopped even if - * the app-level close fails. + * Wire SIGINT/SIGTERM to a graceful shutdown. `stopApp` runs first; the + * backend is stopped even if `stopApp` throws. * @param options - What to stop on shutdown - * @returns The shutdown function (also registered against SIGINT/SIGTERM) + * @returns The shutdown function */ function registerGracefulShutdown( options: ShutdownOptions, diff --git a/packages/cli/package.json b/packages/cli/package.json index 91b3900a..465c7cde 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -28,6 +28,7 @@ }, "dependencies": { "@clack/prompts": "^1.2.0", + "@openworkflow/server": "*", "commander": "^14.0.3", "consola": "^3.4.2", "dotenv": "^17.4.2", @@ -35,7 +36,6 @@ "nypm": "^0.6.5" }, "devDependencies": { - "@openworkflow/server": "*", "openworkflow": "*", "vitest": "^4.0.18" }, diff --git a/packages/openworkflow/core/error.ts b/packages/openworkflow/core/error.ts index 693f8c9e..25f7e0c5 100644 --- a/packages/openworkflow/core/error.ts +++ b/packages/openworkflow/core/error.ts @@ -7,32 +7,17 @@ export interface SerializedError { [key: string]: JsonValue; } -/** - * Runtime tuple of every known Backend error code. Single source of truth — - * the `BackendErrorCode` type is derived from this list so adding a code is - * one edit. - */ -export const BACKEND_ERROR_CODES = ["NOT_FOUND", "CONFLICT"] as const; +export type BackendErrorCode = "NOT_FOUND" | "CONFLICT"; /** - * Error codes for typed Backend errors that can be mapped to HTTP status codes. - */ -export type BackendErrorCode = (typeof BACKEND_ERROR_CODES)[number]; - -/** - * Type guard narrowing an arbitrary string to a known {@link BackendErrorCode}. - * @param code - Candidate code string (e.g. from a server response) - * @returns Whether `code` is a recognized backend error code + * Type guard for {@link BackendErrorCode}. + * @param code - The string to test + * @returns True if `code` is a known backend error code */ export function isBackendErrorCode(code: string): code is BackendErrorCode { - return (BACKEND_ERROR_CODES as readonly string[]).includes(code); + return code === "NOT_FOUND" || code === "CONFLICT"; } -/** - * A typed error thrown by Backend implementations to signal well-known failure - * modes. Consumers (e.g. the HTTP server) can inspect `code` to choose the - * appropriate response status. - */ // eslint-disable-next-line functional/no-classes, functional/no-class-inheritance export class BackendError extends Error { readonly code: BackendErrorCode; diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index f26b2584..67e14978 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -4,10 +4,7 @@ import type { JsonValue } from "./json.js"; import type { Result } from "./result.js"; import { err, ok } from "./result.js"; -/** - * Runtime tuple of every known step kind. Single source of truth — the - * `StepKind` type is derived from this list so adding a kind is one edit. - */ +/** Runtime tuple of step kinds; {@link StepKind} is derived from it. */ export const STEP_KINDS = [ "function", "sleep", diff --git a/packages/openworkflow/http.ts b/packages/openworkflow/http.ts new file mode 100644 index 00000000..fd41de9a --- /dev/null +++ b/packages/openworkflow/http.ts @@ -0,0 +1 @@ +export { BackendHttp, type BackendHttpOptions } from "./http/backend.js"; diff --git a/packages/openworkflow/http/backend.ts b/packages/openworkflow/http/backend.ts index 406ad62f..e7c0ed26 100644 --- a/packages/openworkflow/http/backend.ts +++ b/packages/openworkflow/http/backend.ts @@ -27,10 +27,6 @@ import type { JsonValue } from "../core/json.js"; import type { StepAttempt } from "../core/step-attempt.js"; import type { WorkflowRun } from "../core/workflow-run.js"; -// --------------------------------------------------------------------------- -// Date field transforms -// --------------------------------------------------------------------------- - const WORKFLOW_RUN_DATE_FIELDS = [ "availableAt", "deadlineAt", @@ -48,49 +44,49 @@ const STEP_ATTEMPT_DATE_FIELDS = [ ] as const; /** - * Parse date strings into Date objects in-place. - * @param obj - Object with potential date string fields - * @param fields - Field names to check and convert - * @returns The mutated object typed as T + * Mutate ISO-8601 string fields in `raw` into `Date` instances in place. + * @param raw - Raw JSON object from the server + * @param fields - Names of date fields to convert */ function parseDates( - obj: Record, + raw: Record, fields: readonly string[], -): Record { +): void { for (const field of fields) { - const value = obj[field]; + const value = raw[field]; if (typeof value === "string") { - obj[field] = new Date(value); + raw[field] = new Date(value); } } - return obj; } /** - * Parse raw JSON into a WorkflowRun with proper Date fields. + * Parse a JSON workflow run payload, converting date fields. * @param raw - Raw JSON object from the server - * @returns Parsed WorkflowRun + * @returns A fully-typed {@link WorkflowRun} */ function parseWorkflowRun(raw: Record): WorkflowRun { - return parseDates(raw, WORKFLOW_RUN_DATE_FIELDS) as unknown as WorkflowRun; + parseDates(raw, WORKFLOW_RUN_DATE_FIELDS); + return raw as unknown as WorkflowRun; } /** - * Parse raw JSON into a StepAttempt with proper Date fields. + * Parse a JSON step attempt payload, converting date fields. * @param raw - Raw JSON object from the server - * @returns Parsed StepAttempt + * @returns A fully-typed {@link StepAttempt} */ function parseStepAttempt(raw: Record): StepAttempt { - return parseDates(raw, STEP_ATTEMPT_DATE_FIELDS) as unknown as StepAttempt; + parseDates(raw, STEP_ATTEMPT_DATE_FIELDS); + return raw as unknown as StepAttempt; } /** - * Build a query string from optional pagination params. + * Build a `?limit=&after=&before=` query string (empty if no params set). * @param params - Pagination parameters - * @param params.limit - Maximum number of items - * @param params.after - Cursor for forward pagination - * @param params.before - Cursor for backward pagination - * @returns Query string (including leading ?) or empty string + * @param params.limit - Page size + * @param params.after - Cursor for the next page + * @param params.before - Cursor for the previous page + * @returns Query string including the leading `?`, or an empty string */ function buildPaginationQuery(params: { limit?: number; @@ -106,10 +102,10 @@ function buildPaginationQuery(params: { } /** - * Parse a paginated JSON response body. - * @param res - Fetch Response - * @param parseItem - Function to transform each item - * @returns Parsed PaginatedResponse + * Parse a `{ data, pagination }` list response, mapping each item. + * @param res - Fetch response containing the paginated body + * @param parseItem - Per-item parser + * @returns The parsed page */ async function parsePaginatedResponse( res: globalThis.Response, @@ -125,26 +121,18 @@ async function parsePaginatedResponse( }; } -// --------------------------------------------------------------------------- -// BackendHttp -// --------------------------------------------------------------------------- - /** - * Options for the HTTP backend. + * Options for {@link BackendHttp}. */ export interface BackendHttpOptions { /** Base URL of the OpenWorkflow server (e.g. "http://localhost:3000"). */ url: string; - /** - * Custom fetch implementation. Defaults to `globalThis.fetch`. - * Useful for testing (in-process server) or adding middleware (auth headers). - */ + /** Custom fetch implementation. Defaults to `globalThis.fetch`. */ fetch?: typeof globalThis.fetch; } /** * Backend implementation that communicates with an OpenWorkflow HTTP server. - * Implements the full Backend interface over HTTP. */ export class BackendHttp implements Backend { private readonly baseUrl: string; @@ -159,10 +147,6 @@ export class BackendHttp implements Backend { this._fetch = options.fetch ?? globalThis.fetch; } - // ----------------------------------------------------------------------- - // Workflow Runs — standard methods - // ----------------------------------------------------------------------- - async createWorkflowRun( params: Readonly, ): Promise { @@ -288,10 +272,6 @@ export class BackendHttp implements Backend { ); } - // ----------------------------------------------------------------------- - // Step Attempts - // ----------------------------------------------------------------------- - async createStepAttempt( params: Readonly, ): Promise { @@ -365,10 +345,6 @@ export class BackendHttp implements Backend { ); } - // ----------------------------------------------------------------------- - // Signals - // ----------------------------------------------------------------------- - async sendSignal( params: Readonly, ): Promise { @@ -388,18 +364,10 @@ export class BackendHttp implements Backend { return (await res.json()) as JsonValue; } - // ----------------------------------------------------------------------- - // Lifecycle - // ----------------------------------------------------------------------- - async stop(): Promise { - // No-op — HTTP client has no persistent connection to close. + // No persistent connection to close. } - // ----------------------------------------------------------------------- - // Internal helpers - // ----------------------------------------------------------------------- - private async fetch(path: string): Promise { return this._fetch(`${this.baseUrl}${path}`); } diff --git a/packages/openworkflow/internal.ts b/packages/openworkflow/internal.ts index de1975d0..a47f4356 100644 --- a/packages/openworkflow/internal.ts +++ b/packages/openworkflow/internal.ts @@ -1,5 +1,5 @@ // workflow -export type { Workflow } from "./core/workflow-definition.js"; +export type { RetryPolicy, Workflow } from "./core/workflow-definition.js"; export { isWorkflow } from "./core/workflow-definition.js"; // backend @@ -7,15 +7,17 @@ export * from "./core/backend.js"; export { BackendError, type BackendErrorCode, - BACKEND_ERROR_CODES, isBackendErrorCode, + type SerializedError, } from "./core/error.js"; // core +export type { JsonValue } from "./core/json.js"; export type { WorkflowRun, WorkflowRunStatus } from "./core/workflow-run.js"; -export { - type StepAttempt, - type StepAttemptStatus, - type StepKind, - STEP_KINDS, +export type { + StepAttempt, + StepAttemptContext, + StepAttemptStatus, + StepKind, } from "./core/step-attempt.js"; +export { STEP_KINDS } from "./core/step-attempt.js"; diff --git a/packages/openworkflow/package.json b/packages/openworkflow/package.json index 88abc8c5..4717b46c 100644 --- a/packages/openworkflow/package.json +++ b/packages/openworkflow/package.json @@ -30,6 +30,10 @@ "types": "./dist/index.d.ts", "default": "./dist/index.js" }, + "./http": { + "types": "./dist/http.d.ts", + "default": "./dist/http.js" + }, "./internal": { "types": "./dist/internal.d.ts", "default": "./dist/internal.js" diff --git a/packages/server/errors.ts b/packages/server/errors.ts index 4bb742ba..2e3ab5b5 100644 --- a/packages/server/errors.ts +++ b/packages/server/errors.ts @@ -5,16 +5,10 @@ import type { BackendError } from "openworkflow/internal"; import { isBackendErrorCode } from "openworkflow/internal"; import { z } from "zod/v4"; -// --------------------------------------------------------------------------- -// Server-internal error types and the single error-to-Response mapping. // Route handlers throw; the global `app.onError` hook runs `errorToResponse` -// to produce a consistent wire shape across the entire API. -// --------------------------------------------------------------------------- +// to produce a consistent wire shape for every error. -/** - * Thrown by route handlers when a request fails validation (malformed JSON, - * unknown/invalid fields, etc.). Maps to HTTP 400. - */ +/** Thrown by route handlers on request validation failure. Maps to HTTP 400. */ export class HttpValidationError extends Error { constructor(message: string) { super(message); @@ -33,32 +27,23 @@ export interface ErrorResponseBody { }; } -/** - * Hook invoked for every unexpected server-side error. Intended for - * structured logging / error reporting. `BackendError`, `HttpValidationError`, - * and Hono's `HTTPException` (e.g. body-limit rejection) are expected outcomes - * and are not forwarded here. - */ +/** Hook invoked for unexpected server-side errors (not `BackendError`/validation). */ export type ServerErrorHook = ( error: unknown, context: { path: string; method: string }, ) => void; -/** Options controlling {@link errorToResponse}'s behavior. */ export interface ErrorToResponseOptions { - /** See CreateServerOptions.exposeInternalErrors. */ exposeInternalErrors?: boolean; - /** See CreateServerOptions.onError. */ onError?: ServerErrorHook; } /** - * Build the JSON Response for a caught error. Centralized so that every - * handler — including the global `onError` — returns the same shape. + * Build the JSON Response for a caught error. * @param error - The caught error - * @param c - Hono context (used only for `c.json`) + * @param c - Hono context * @param options - Behavior options - * @returns JSON Response with status + body + * @returns JSON Response */ export function errorToResponse( error: unknown, @@ -99,12 +84,9 @@ export function errorToResponse( } /** - * Duck-typed {@link BackendError} check. We intentionally avoid `instanceof` - * so the guard is robust across realms — the `BackendError` class may be - * loaded from the compiled `openworkflow/internal` package in production and - * from the TypeScript source in the monorepo under vitest. - * @param error - Candidate error - * @returns Whether the error is a BackendError with a recognized code + * Duck-typed BackendError guard — works across realms (TS source vs compiled). + * @param error - The value to test + * @returns True if `error` is a BackendError with a known code */ function isBackendError(error: unknown): error is BackendError { if (!(error instanceof Error)) return false; @@ -114,9 +96,9 @@ function isBackendError(error: unknown): error is BackendError { } /** - * Map a `BackendError.code` to an HTTP status code. - * @param error - The backend error - * @returns HTTP status code + * Map a {@link BackendError} code to its HTTP status. + * @param error - The backend error to map + * @returns The HTTP status to return to the client */ function backendErrorStatus(error: BackendError): ContentfulStatusCode { switch (error.code) { @@ -130,11 +112,11 @@ function backendErrorStatus(error: BackendError): ContentfulStatusCode { } /** - * Parse a Hono request body and validate it against a Zod schema. - * Throws `HttpValidationError` on malformed JSON or validation failure. + * Parse and validate a request body against a Zod schema. * @param c - Hono context - * @param schema - Zod schema describing the expected body - * @returns Parsed and validated data + * @param schema - Zod schema + * @returns Parsed data + * @throws {HttpValidationError} On malformed JSON or schema failure. */ export async function parseJsonBody( c: Context, diff --git a/packages/server/package.json b/packages/server/package.json index 4d74efe6..87a0cae9 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -21,8 +21,8 @@ "prepublishOnly": "npm run build" }, "dependencies": { - "@hono/node-server": "^1.14.3", - "hono": "^4.7.11", + "@hono/node-server": "^1.19.14", + "hono": "^4.12.14", "zod": "^4.3.6" }, "devDependencies": { diff --git a/packages/server/schemas.ts b/packages/server/schemas.ts index 0fa5c629..afbe3f19 100644 --- a/packages/server/schemas.ts +++ b/packages/server/schemas.ts @@ -1,48 +1,44 @@ +import type { JsonValue, StepAttemptContext } from "openworkflow/internal"; import { STEP_KINDS } from "openworkflow/internal"; import { z } from "zod/v4"; -// --------------------------------------------------------------------------- -// Request body schemas -// -// Each exported schema validates the JSON body of one HTTP route. -// Dates are validated as ISO-8601 strings (via `z.iso.datetime()`) so that an -// invalid value is rejected with 400 before the backend sees it; handlers are -// responsible for the `new Date(...)` conversion. -// --------------------------------------------------------------------------- +// Request body schemas. ISO-8601 datetime strings are parsed into Date so +// route handlers can pass the body straight through to the backend. JSON +// payloads are typed directly as JsonValue to avoid zod's recursive +// inference blowing TypeScript's depth limit. -/** ISO-8601 datetime string. */ -const isoDatetime = z.iso.datetime(); +const isoDatetime = z.iso.datetime().transform((s) => new Date(s)); + +const jsonValue = z.json() as unknown as z.ZodType; +const stepAttemptContext = z.json() as unknown as z.ZodType; -/** Serialized error payload (mirrors SerializedError from core). */ const errorSchema = z.object({ name: z.string().optional(), message: z.string(), stack: z.string().optional(), }); -// --------------------------------------------------------------------------- -// Workflow Runs -// --------------------------------------------------------------------------- - export const createWorkflowRunSchema = z.object({ workflowName: z.string(), version: z.string().nullable(), idempotencyKey: z.string().nullable(), - config: z.json(), - context: z.json().nullable(), - input: z.json().nullable(), + config: jsonValue, + context: jsonValue.nullable(), + input: jsonValue.nullable(), parentStepAttemptNamespaceId: z.string().nullable().optional().default(null), parentStepAttemptId: z.string().nullable().optional().default(null), availableAt: isoDatetime.nullable().optional().default(null), deadlineAt: isoDatetime.nullable().optional().default(null), }); -export const claimWorkflowRunSchema = z.object({ +const workerLeaseFields = { workerId: z.string(), leaseDurationMs: z.number(), -}); +}; -export const extendWorkflowRunLeaseSchema = claimWorkflowRunSchema; +export const claimWorkflowRunSchema = z.object(workerLeaseFields); + +export const extendWorkflowRunLeaseSchema = z.object(workerLeaseFields); export const sleepWorkflowRunSchema = z.object({ workerId: z.string(), @@ -51,7 +47,7 @@ export const sleepWorkflowRunSchema = z.object({ export const completeWorkflowRunSchema = z.object({ workerId: z.string(), - output: z.json().nullable(), + output: jsonValue.nullable(), }); export const failWorkflowRunSchema = z.object({ @@ -73,22 +69,18 @@ export const rescheduleWorkflowRunSchema = z.object({ availableAt: isoDatetime, }); -// --------------------------------------------------------------------------- -// Step Attempts -// --------------------------------------------------------------------------- - export const createStepAttemptSchema = z.object({ workerId: z.string(), stepName: z.string(), kind: z.enum(STEP_KINDS), - config: z.json(), - context: z.json().nullable(), + config: jsonValue, + context: stepAttemptContext.nullable(), }); export const completeStepAttemptSchema = z.object({ workflowRunId: z.string(), workerId: z.string(), - output: z.json().nullable(), + output: jsonValue.nullable(), }); export const failStepAttemptSchema = z.object({ @@ -104,12 +96,8 @@ export const setStepAttemptChildWorkflowRunSchema = z.object({ childWorkflowRunId: z.string(), }); -// --------------------------------------------------------------------------- -// Signals -// --------------------------------------------------------------------------- - export const sendSignalSchema = z.object({ signal: z.string(), - data: z.json().nullable(), + data: jsonValue.nullable(), idempotencyKey: z.string().nullable(), }); diff --git a/packages/server/server.test.ts b/packages/server/server.test.ts index 1c93f219..09293fd7 100644 --- a/packages/server/server.test.ts +++ b/packages/server/server.test.ts @@ -1,8 +1,6 @@ import type { Backend } from "../openworkflow/core/backend.js"; -// Importing BackendError from the same realm as BackendHttp so instanceof -// checks below are valid. `openworkflow/internal` re-exports the same class, -// but tests that call `instanceof BackendError` must use one specific import -// path to avoid cross-realm identity issues under vitest. +// Same realm as BackendHttp so `instanceof BackendError` checks are valid +// under vitest (the `openworkflow/internal` re-export is a different realm). import { BackendError } from "../openworkflow/core/error.js"; import { BackendHttp } from "../openworkflow/http/backend.js"; import { BackendPostgres } from "../openworkflow/postgres/backend.js"; @@ -114,7 +112,13 @@ describe("Server", () => { ); expect(res.status).toBe(200); const body = (await res.json()) as Record; - for (const key of ["pending", "running", "completed", "failed", "canceled"]) { + for (const key of [ + "pending", + "running", + "completed", + "failed", + "canceled", + ]) { expect(body[key]).toBe(0); } }); @@ -293,19 +297,48 @@ describe("Server request validation", () => { test("POST /v0/workflow-runs/:id:sleep rejects invalid availableAt", async () => { const res = await fetch( - new Request( - `http://localhost/v0/workflow-runs/${randomUUID()}:sleep`, - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - workerId: randomUUID(), - availableAt: "garbage", - }), - }, - ), + new Request(`http://localhost/v0/workflow-runs/${randomUUID()}:sleep`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + workerId: randomUUID(), + availableAt: "garbage", + }), + }), + ); + expect(res.status).toBe(400); + }); + + test("GET /v0/workflow-runs rejects non-numeric limit with 400", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs?limit=abc"), ); expect(res.status).toBe(400); + const body = (await res.json()) as { error: { message: string } }; + expect(body.error.message).toMatch(/limit/i); + }); + + test("GET /v0/workflow-runs rejects zero limit with 400", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs?limit=0"), + ); + expect(res.status).toBe(400); + }); + + test("GET /v0/workflow-runs rejects fractional limit with 400", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs?limit=1.5"), + ); + expect(res.status).toBe(400); + }); + + test("GET /v0/workflow-runs rejects both after and before with 400", async () => { + const res = await fetch( + new Request("http://localhost/v0/workflow-runs?after=a&before=b"), + ); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: { message: string } }; + expect(body.error.message).toMatch(/mutually exclusive/i); }); test("rejects payloads over the body size limit with 413", async () => { @@ -406,7 +439,7 @@ describe("Server error handling", () => { expect(body.error.code).toBe("CONFLICT"); }); - test("scrubs non-BackendError messages to a generic 500 response", async () => { + test("scrubs non-BackendError messages to a generic 500 and invokes onError", async () => { const onError = vi.fn(); const backend = mockBackend({ listWorkflowRuns: vi @@ -421,43 +454,11 @@ describe("Server error handling", () => { const body = (await res.json()) as { error: { message: string } }; expect(body.error.message).toBe("Internal server error"); expect(body.error.message).not.toContain("passwords"); - // onError hook receives the real error for server-side logging expect(onError).toHaveBeenCalledOnce(); const [err] = onError.mock.calls[0] as [Error]; expect(err.message).toBe("SELECT * FROM passwords"); }); - test("scrubs thrown non-Error values to 500 without leaking value", async () => { - // Wrap the rejected value in an Error so vitest's unhandled-rejection - // detector doesn't flag the raw string; the server should still scrub - // the message since `exposeInternalErrors` is off by default. - const backend = mockBackend({ - listWorkflowRuns: vi - .fn() - .mockRejectedValue(new Error("secret-detail-in-error")), - }); - const server = createServer(backend); - const res = await server.fetch( - new Request("http://localhost/v0/workflow-runs"), - ); - expect(res.status).toBe(500); - const body = (await res.json()) as { error: { message: string } }; - expect(body.error.message).toBe("Internal server error"); - expect(body.error.message).not.toContain("secret-detail"); - }); - - test("unhandled errors invoke the onError hook for logging", async () => { - const onError = vi.fn(); - const backend = mockBackend({ - countWorkflowRuns: vi.fn().mockRejectedValue(new Error("boom")), - }); - const server = createServer(backend, { onError }); - await server.fetch( - new Request("http://localhost/v0/workflow-runs:count"), - ); - expect(onError).toHaveBeenCalledOnce(); - }); - test("BackendError does not invoke onError hook", async () => { const onError = vi.fn(); const backend = mockBackend({ @@ -472,22 +473,7 @@ describe("Server error handling", () => { expect(onError).not.toHaveBeenCalled(); }); - test("errors thrown from claimWorkflowRun are scrubbed", async () => { - const backend = mockBackend({ - claimWorkflowRun: vi.fn().mockRejectedValue(new Error("claim failed")), - }); - const server = createServer(backend); - const res = await server.fetch( - new Request("http://localhost/v0/workflow-runs:claim", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ workerId: randomUUID(), leaseDurationMs: 1000 }), - }), - ); - expect(res.status).toBe(500); - }); - - test("errors in workflow-run verb dispatch propagate with correct status", async () => { + test("BackendError propagates through verb dispatch with correct status", async () => { const backend = mockBackend({ extendWorkflowRunLease: vi .fn() @@ -510,114 +496,53 @@ describe("Server error handling", () => { expect(res.status).toBe(404); }); - test("errors in createStepAttempt propagate", async () => { - const backend = mockBackend({ - createStepAttempt: vi.fn().mockRejectedValue(new Error("step failed")), - }); - const server = createServer(backend); - const res = await server.fetch( - new Request( - `http://localhost/v0/workflow-runs/${randomUUID()}/step-attempts`, - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - workerId: randomUUID(), - stepName: "step1", - kind: "function", - config: {}, - context: null, - }), - }, - ), - ); - expect(res.status).toBe(500); - }); - - test("errors in getStepAttempt propagate", async () => { + test("readyz returns 503 when backend cannot serve requests", async () => { const backend = mockBackend({ - getStepAttempt: vi.fn().mockRejectedValue(new Error("db error")), + countWorkflowRuns: vi.fn().mockRejectedValue(new Error("db down")), }); const server = createServer(backend); - const res = await server.fetch( - new Request(`http://localhost/v0/step-attempts/${randomUUID()}`), - ); - expect(res.status).toBe(500); + const res = await server.fetch(new Request("http://localhost/readyz")); + expect(res.status).toBe(503); }); - test("errors in listStepAttempts propagate", async () => { - const backend = mockBackend({ - listStepAttempts: vi.fn().mockRejectedValue(new Error("list failed")), - }); - const server = createServer(backend); - const res = await server.fetch( - new Request( - `http://localhost/v0/workflow-runs/${randomUUID()}/step-attempts`, - ), - ); - expect(res.status).toBe(500); + test("logRequests: true wires Hono's logger middleware", () => { + expect(() => + createServer(mockBackend(), { logRequests: true }), + ).not.toThrow(); }); - test("errors in step-attempt verb dispatch propagate", async () => { - const backend = mockBackend({ - completeStepAttempt: vi - .fn() - .mockRejectedValue(new Error("complete failed")), - }); + test("fail verb round-trips deadlineAt and attempts end-to-end", async () => { + const failWorkflowRun = vi.fn().mockResolvedValue({}); + const backend = mockBackend({ failWorkflowRun }); const server = createServer(backend); - const res = await server.fetch( - new Request(`http://localhost/v0/step-attempts/${randomUUID()}:complete`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - workflowRunId: randomUUID(), - workerId: randomUUID(), - output: null, - }), - }), - ); - expect(res.status).toBe(500); - }); - - test("errors in sendSignal propagate", async () => { - const backend = mockBackend({ - sendSignal: vi.fn().mockRejectedValue(new Error("signal failed")), + const http = new BackendHttp({ + url: "http://localhost", + fetch: (input, init) => + Promise.resolve(server.fetch(new Request(input, init))), }); - const server = createServer(backend); - const res = await server.fetch( - new Request("http://localhost/v0/signals:send", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - signal: "test-signal", - data: null, - idempotencyKey: null, - }), - }), - ); - expect(res.status).toBe(500); - }); - - test("errors in getSignalDelivery propagate", async () => { - const backend = mockBackend({ - getSignalDelivery: vi - .fn() - .mockRejectedValue(new Error("delivery failed")), + const deadline = new Date("2030-01-01T00:00:00.000Z"); + await http.failWorkflowRun({ + workflowRunId: randomUUID(), + workerId: randomUUID(), + error: { message: "boom" }, + retryPolicy: { + initialInterval: "1s", + backoffCoefficient: 2, + maximumInterval: "1m", + maximumAttempts: 3, + }, + attempts: 2, + deadlineAt: deadline, }); - const server = createServer(backend); - const res = await server.fetch( - new Request(`http://localhost/v0/signal-deliveries/${randomUUID()}`), - ); - expect(res.status).toBe(500); - }); - - test("readyz returns 503 when backend cannot serve requests", async () => { - const backend = mockBackend({ - countWorkflowRuns: vi.fn().mockRejectedValue(new Error("db down")), - }); - const server = createServer(backend); - const res = await server.fetch(new Request("http://localhost/readyz")); - expect(res.status).toBe(503); + expect(failWorkflowRun).toHaveBeenCalledOnce(); + const [params] = failWorkflowRun.mock.calls[0] as [ + { deadlineAt?: Date | null; attempts?: number }, + ]; + if (!(params.deadlineAt instanceof Date)) { + throw new TypeError("expected deadlineAt to be a Date"); + } + expect(params.deadlineAt.toISOString()).toBe(deadline.toISOString()); + expect(params.attempts).toBe(2); }); }); @@ -639,32 +564,21 @@ function backendWithFetch(fetch: typeof globalThis.fetch): BackendHttp { * @param response - Response to return for every call * @returns A fetch-compatible function */ -function fetchReturning( - response: Response, -): typeof globalThis.fetch { +function fetchReturning(response: Response): typeof globalThis.fetch { // eslint-disable-next-line @typescript-eslint/require-await -- fetch is async return async () => response.clone(); } -/** - * Extract the URL string from a fetch input regardless of its form. - * @param input - The first argument passed to a fetch-compatible function - * @returns The URL as a string - */ -function fetchInputUrl(input: string | URL | Request): string { - if (typeof input === "string") return input; - if (input instanceof URL) return input.href; - return input.url; -} - describe("BackendHttp", () => { test("assembles URLs against a URL with trailing slash correctly", async () => { const calls: string[] = []; // eslint-disable-next-line @typescript-eslint/require-await -- fetch is async - async function fakeFetch( - input: string | URL | Request, - ): Promise { - calls.push(fetchInputUrl(input)); + async function fakeFetch(input: string | URL | Request): Promise { + let url: string; + if (typeof input === "string") url = input; + else if (input instanceof URL) url = input.href; + else url = input.url; + calls.push(url); return new Response(null, { status: 404 }); } const backend = new BackendHttp({ @@ -733,9 +647,9 @@ describe("BackendHttp", () => { Response.json({ error: { message: "boom" } }, { status: 500 }), ), ); - await expect( - backend.countWorkflowRuns(), - ).rejects.not.toBeInstanceOf(BackendError); + await expect(backend.countWorkflowRuns()).rejects.not.toBeInstanceOf( + BackendError, + ); }); test("throws plain Error when server returns unrecognized code", async () => { @@ -747,9 +661,9 @@ describe("BackendHttp", () => { ), ), ); - await expect( - backend.countWorkflowRuns(), - ).rejects.not.toBeInstanceOf(BackendError); + await expect(backend.countWorkflowRuns()).rejects.not.toBeInstanceOf( + BackendError, + ); }); test("falls back to response text when body is not JSON", async () => { diff --git a/packages/server/server.ts b/packages/server/server.ts index a79e3449..78bdebda 100644 --- a/packages/server/server.ts +++ b/packages/server/server.ts @@ -1,5 +1,6 @@ import { errorToResponse, + HttpValidationError, parseJsonBody, type ServerErrorHook, } from "./errors.js"; @@ -38,7 +39,8 @@ import type { ListStepAttemptsParams, ListWorkflowRunsParams, RescheduleWorkflowRunAfterFailedStepAttemptParams, - SendSignalParams, + RetryPolicy, + SerializedError, SetStepAttemptChildWorkflowRunParams, SleepWorkflowRunParams, } from "openworkflow/internal"; @@ -55,37 +57,19 @@ export interface OpenWorkflowServer { * Options for {@link createServer}. */ export interface CreateServerOptions { - /** - * Maximum allowed request body size, in bytes. Requests exceeding this - * limit are rejected with HTTP 413 before the handler is invoked. - * Defaults to 1 MiB. - */ + /** Maximum request body size, in bytes. Defaults to 1 MiB. */ maxBodyBytes?: number; - /** - * Whether to attach Hono's request logger middleware. - * Defaults to `false` so tests stay quiet; enable in production. - */ + /** Attach Hono's request logger middleware. Defaults to `false`. */ logRequests?: boolean; - /** - * Hook invoked for every unexpected server-side error. Intended for - * structured logging or error reporting. Not called for expected 4xx - * conditions (validation errors, `BackendError`, body-limit rejection). - */ + /** Hook invoked for unexpected server-side errors (not validation/`BackendError`). */ onError?: ServerErrorHook; /** - * If `true`, the `message` of unexpected `Error`s thrown from the backend - * is included in the 500 response body. Useful during development and for - * the shared test suite; dangerous in production because it can leak - * implementation details (SQL fragments, connection URIs, etc.). - * `BackendError` messages and validation errors are always exposed - * regardless of this flag. - * Defaults to `false` (production-safe). + * Include the message of unexpected backend errors in the 500 response. + * Defaults to `false`; leaks implementation details if enabled in production. */ exposeInternalErrors?: boolean; } -const DEFAULT_MAX_BODY_BYTES = 1_048_576; // 1 MiB - /** * Create an OpenWorkflow HTTP server backed by the given Backend. * @param backend - Backend implementation to proxy @@ -101,14 +85,13 @@ export function createServer( if (options.logRequests) { app.use("*", logger()); } - app.use( - "*", - bodyLimit({ maxSize: options.maxBodyBytes ?? DEFAULT_MAX_BODY_BYTES }), - ); + app.use("*", bodyLimit({ maxSize: options.maxBodyBytes ?? 1_048_576 })); app.onError((error, c) => errorToResponse(error, c, { - onError: options.onError, - exposeInternalErrors: options.exposeInternalErrors, + ...(options.onError === undefined ? {} : { onError: options.onError }), + ...(options.exposeInternalErrors === undefined + ? {} + : { exposeInternalErrors: options.exposeInternalErrors }), }), ); @@ -133,13 +116,9 @@ export function createServer( return app; } -// --------------------------------------------------------------------------- -// Verb dispatch — POST /...:{verb} and POST /.../:id:{verb} -// -// Hono doesn't support literal colons inside a route that also has a `:param`, -// so we capture `id:verb` as a single segment and dispatch via table. Shared -// between workflow-run and step-attempt instance methods. -// --------------------------------------------------------------------------- +// Hono doesn't support a literal `:` inside a route that also has a `:param`, +// so `POST /resource/:id:verb` is captured as a single segment and dispatched +// via the `VerbHandler` tables below. type VerbHandler = ( backend: Backend, @@ -148,11 +127,11 @@ type VerbHandler = ( ) => Promise; /** - * Register `POST {pathPrefix}/:idVerb` with the given verb dispatch table. - * @param app - Hono app instance - * @param pathPrefix - Collection path (e.g. "/v0/workflow-runs") - * @param backend - Backend implementation to proxy - * @param verbs - Verb-name → handler map + * Register `POST {pathPrefix}/:id:verb` routes that dispatch through `verbs`. + * @param app - The Hono app to mount on + * @param pathPrefix - Path prefix preceding the `:id:verb` segment + * @param backend - Backend instance passed to each verb handler + * @param verbs - Verb name → handler map */ function registerVerbRoute( app: Hono, @@ -170,10 +149,6 @@ function registerVerbRoute( }); } -// --------------------------------------------------------------------------- -// Route registration — Workflow Runs -// --------------------------------------------------------------------------- - const WORKFLOW_RUN_VERBS: Readonly> = { extendLease: async (backend, id, c) => { const body = await parseJsonBody(c, extendWorkflowRunLeaseSchema); @@ -202,8 +177,8 @@ const WORKFLOW_RUN_VERBS: Readonly> = { const params: FailWorkflowRunParams = { workflowRunId: id, workerId: body.workerId, - error: body.error, - retryPolicy: body.retryPolicy, + error: toSerializedError(body.error), + retryPolicy: body.retryPolicy as RetryPolicy, ...(body.attempts === undefined ? {} : { attempts: body.attempts }), ...(body.deadlineAt === undefined ? {} @@ -217,12 +192,11 @@ const WORKFLOW_RUN_VERBS: Readonly> = { const params: RescheduleWorkflowRunAfterFailedStepAttemptParams = { workflowRunId: id, workerId: body.workerId, - error: body.error, + error: toSerializedError(body.error), availableAt: new Date(body.availableAt), }; - const run = await backend.rescheduleWorkflowRunAfterFailedStepAttempt( - params, - ); + const run = + await backend.rescheduleWorkflowRunAfterFailedStepAttempt(params); return c.json(run); }, cancel: async (backend, id, c) => { @@ -233,9 +207,9 @@ const WORKFLOW_RUN_VERBS: Readonly> = { }; /** - * Register workflow-run routes on the given app. - * @param app - Hono app instance - * @param backend - Backend implementation to proxy + * Mount workflow-run routes under `/v0/workflow-runs`. + * @param app - The Hono app to mount on + * @param backend - Backend instance to delegate to */ function registerWorkflowRunRoutes(app: Hono, backend: Backend): void { app.post("/v0/workflow-runs", async (c) => { @@ -286,10 +260,6 @@ function registerWorkflowRunRoutes(app: Hono, backend: Backend): void { registerVerbRoute(app, "/v0/workflow-runs", backend, WORKFLOW_RUN_VERBS); } -// --------------------------------------------------------------------------- -// Route registration — Step Attempts -// --------------------------------------------------------------------------- - const STEP_ATTEMPT_VERBS: Readonly> = { complete: async (backend, id, c) => { const body = await parseJsonBody(c, completeStepAttemptSchema); @@ -299,7 +269,12 @@ const STEP_ATTEMPT_VERBS: Readonly> = { }, fail: async (backend, id, c) => { const body = await parseJsonBody(c, failStepAttemptSchema); - const params: FailStepAttemptParams = { stepAttemptId: id, ...body }; + const params: FailStepAttemptParams = { + stepAttemptId: id, + workflowRunId: body.workflowRunId, + workerId: body.workerId, + error: toSerializedError(body.error), + }; const step = await backend.failStepAttempt(params); return c.json(step); }, @@ -315,16 +290,20 @@ const STEP_ATTEMPT_VERBS: Readonly> = { }; /** - * Register step-attempt routes on the given app. - * @param app - Hono app instance - * @param backend - Backend implementation to proxy + * Mount step-attempt routes under `/v0/workflow-runs/:id/step-attempts` and `/v0/step-attempts`. + * @param app - The Hono app to mount on + * @param backend - Backend instance to delegate to */ function registerStepAttemptRoutes(app: Hono, backend: Backend): void { app.post("/v0/workflow-runs/:id/step-attempts", async (c) => { const body = await parseJsonBody(c, createStepAttemptSchema); const params: CreateStepAttemptParams = { workflowRunId: c.req.param("id"), - ...body, + workerId: body.workerId, + stepName: body.stepName, + kind: body.kind, + config: body.config, + context: body.context, }; const step = await backend.createStepAttempt(params); return c.json(step, 201); @@ -336,7 +315,7 @@ function registerStepAttemptRoutes(app: Hono, backend: Backend): void { if (!step) { return c.json({ error: { message: "Step attempt not found" } }, 404); } - return c.json(step); + return c.json(step as unknown); }); app.get("/v0/workflow-runs/:id/step-attempts", async (c) => { @@ -351,20 +330,15 @@ function registerStepAttemptRoutes(app: Hono, backend: Backend): void { registerVerbRoute(app, "/v0/step-attempts", backend, STEP_ATTEMPT_VERBS); } -// --------------------------------------------------------------------------- -// Route registration — Signals -// --------------------------------------------------------------------------- - /** - * Register signal routes on the given app. - * @param app - Hono app instance - * @param backend - Backend implementation to proxy + * Mount signal routes under `/v0/signals` and `/v0/signal-deliveries`. + * @param app - The Hono app to mount on + * @param backend - Backend instance to delegate to */ function registerSignalRoutes(app: Hono, backend: Backend): void { app.post("/v0/signals:send", async (c) => { const body = await parseJsonBody(c, sendSignalSchema); - const params: SendSignalParams = body; - const result = await backend.sendSignal(params); + const result = await backend.sendSignal(body); return c.json(result); }); @@ -374,18 +348,15 @@ function registerSignalRoutes(app: Hono, backend: Backend): void { }; const result = await backend.getSignalDelivery(params); if (result === undefined) return c.body(null, 204); - return c.json(result); + return c.json(result as unknown); }); } -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - /** - * Extract pagination query params from a Hono context. + * Extract pagination query params. * @param c - Hono context - * @returns Pagination params object + * @returns Pagination params + * @throws {HttpValidationError} On invalid `limit` or conflicting `after`/`before`. */ function paginationQuery(c: Context): { limit?: number; @@ -393,19 +364,50 @@ function paginationQuery(c: Context): { before?: string; } { const { limit, after, before } = c.req.query(); + if (after && before) { + throw new HttpValidationError( + "Query parameters `after` and `before` are mutually exclusive.", + ); + } + const result: { limit?: number; after?: string; before?: string } = {}; + if (limit) { + const parsed = Number(limit); + if (!Number.isInteger(parsed) || parsed <= 0) { + throw new HttpValidationError( + "Query parameter `limit` must be a positive integer.", + ); + } + result.limit = parsed; + } + if (after) result.after = after; + if (before) result.before = before; + return result; +} + +/** + * Drop undefined name/stack to satisfy `exactOptionalPropertyTypes`. + * @param err - Validated error payload from a request body + * @param err.name - Optional error name + * @param err.message - Error message + * @param err.stack - Optional stack trace + * @returns A {@link SerializedError} with no `undefined` properties + */ +function toSerializedError(err: { + name?: string | undefined; + message: string; + stack?: string | undefined; +}): SerializedError { return { - ...(limit ? { limit: Number(limit) } : {}), - ...(after ? { after } : {}), - ...(before ? { before } : {}), + message: err.message, + ...(err.name === undefined ? {} : { name: err.name }), + ...(err.stack === undefined ? {} : { stack: err.stack }), }; } /** - * Split a path segment of the form `id:verb` into its parts. - * Returns `[id, verb]` or `null` if no colon is present, or if the verb is - * empty (e.g. `id:`). - * @param segment - The path segment to split - * @returns Tuple of [id, verb] or null + * Split an `{id}:{verb}` path segment; returns null if either side is empty. + * @param segment - Path segment of the form `id:verb` + * @returns A `[id, verb]` tuple, or null if the segment is malformed */ function splitIdVerb(segment: string): [id: string, verb: string] | null { const idx = segment.lastIndexOf(":"); @@ -416,33 +418,25 @@ function splitIdVerb(segment: string): [id: string, verb: string] | null { return [id, verb]; } -// --------------------------------------------------------------------------- -// Node.js HTTP server -// --------------------------------------------------------------------------- - /** * Options for {@link serve}. */ export interface ServeOptions { /** Port to listen on (default: 3000). */ port?: number; - /** - * Host/interface to bind to. Defaults to `127.0.0.1` so the server is not - * unexpectedly exposed to the network. Set to `0.0.0.0` (or an explicit - * interface) to accept remote connections. - */ + /** Host/interface to bind to (default: `127.0.0.1`). */ hostname?: string; } /** - * A handle for a running Node.js HTTP server. Call `close()` to stop - * accepting new connections and wait for in-flight requests to complete. + * Handle for a running Node.js HTTP server. */ export interface ServeHandle { /** Gracefully close the server. Resolves when the socket is closed. */ close(): Promise; } +/* v8 ignore start -- infrastructure: starts a real Node.js HTTP server */ /** * Start a Node.js HTTP server for the given OpenWorkflow server. * @param server - OpenWorkflow server instance @@ -453,25 +447,19 @@ export function serve( server: OpenWorkflowServer, options: ServeOptions = {}, ): ServeHandle { - /* v8 ignore start -- infrastructure: starts a real Node.js HTTP server */ - const port = options.port ?? 3000; - const hostname = options.hostname ?? "127.0.0.1"; const httpServer = honoServe({ fetch: (request) => server.fetch(request), - port, - hostname, + port: options.port ?? 3000, + hostname: options.hostname ?? "127.0.0.1", }); return { close: () => new Promise((resolve, reject) => { httpServer.close((err) => { - if (err) { - reject(err); - return; - } - resolve(); + if (err) reject(err); + else resolve(); }); }), }; - /* v8 ignore stop */ } +/* v8 ignore stop */ From 605d9254c25337e80e4bb0922e154744f4b5ca40 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Thu, 16 Apr 2026 22:51:48 -0500 Subject: [PATCH 3/6] fix: feedback --- packages/openworkflow/core/error.ts | 10 ++++++++-- packages/openworkflow/internal.ts | 4 ++++ packages/server/schemas.ts | 26 ++++++++++++++++++-------- packages/server/server.ts | 12 +++++------- 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/packages/openworkflow/core/error.ts b/packages/openworkflow/core/error.ts index 25f7e0c5..931c5cd4 100644 --- a/packages/openworkflow/core/error.ts +++ b/packages/openworkflow/core/error.ts @@ -7,7 +7,13 @@ export interface SerializedError { [key: string]: JsonValue; } -export type BackendErrorCode = "NOT_FOUND" | "CONFLICT"; +/** + * Runtime tuple of backend error codes; {@link BackendErrorCode} is derived + * from it. + */ +export const BACKEND_ERROR_CODES = ["NOT_FOUND", "CONFLICT"] as const; + +export type BackendErrorCode = (typeof BACKEND_ERROR_CODES)[number]; /** * Type guard for {@link BackendErrorCode}. @@ -15,7 +21,7 @@ export type BackendErrorCode = "NOT_FOUND" | "CONFLICT"; * @returns True if `code` is a known backend error code */ export function isBackendErrorCode(code: string): code is BackendErrorCode { - return code === "NOT_FOUND" || code === "CONFLICT"; + return (BACKEND_ERROR_CODES as readonly string[]).includes(code); } // eslint-disable-next-line functional/no-classes, functional/no-class-inheritance diff --git a/packages/openworkflow/internal.ts b/packages/openworkflow/internal.ts index a47f4356..a24e0f82 100644 --- a/packages/openworkflow/internal.ts +++ b/packages/openworkflow/internal.ts @@ -11,6 +11,10 @@ export { type SerializedError, } from "./core/error.js"; +// duration +export type { DurationString } from "./core/duration.js"; +export { parseDuration } from "./core/duration.js"; + // core export type { JsonValue } from "./core/json.js"; export type { WorkflowRun, WorkflowRunStatus } from "./core/workflow-run.js"; diff --git a/packages/server/schemas.ts b/packages/server/schemas.ts index afbe3f19..560e478c 100644 --- a/packages/server/schemas.ts +++ b/packages/server/schemas.ts @@ -1,5 +1,9 @@ -import type { JsonValue, StepAttemptContext } from "openworkflow/internal"; -import { STEP_KINDS } from "openworkflow/internal"; +import type { + DurationString, + JsonValue, + StepAttemptContext, +} from "openworkflow/internal"; +import { parseDuration, STEP_KINDS } from "openworkflow/internal"; import { z } from "zod/v4"; // Request body schemas. ISO-8601 datetime strings are parsed into Date so @@ -9,6 +13,12 @@ import { z } from "zod/v4"; const isoDatetime = z.iso.datetime().transform((s) => new Date(s)); +const durationString = z + .string() + .refine((s) => parseDuration(s as DurationString).ok, { + message: "Invalid duration string", + }); + const jsonValue = z.json() as unknown as z.ZodType; const stepAttemptContext = z.json() as unknown as z.ZodType; @@ -33,7 +43,7 @@ export const createWorkflowRunSchema = z.object({ const workerLeaseFields = { workerId: z.string(), - leaseDurationMs: z.number(), + leaseDurationMs: z.number().int().positive(), }; export const claimWorkflowRunSchema = z.object(workerLeaseFields); @@ -54,12 +64,12 @@ export const failWorkflowRunSchema = z.object({ workerId: z.string(), error: errorSchema, retryPolicy: z.object({ - initialInterval: z.string(), - backoffCoefficient: z.number(), - maximumInterval: z.string(), - maximumAttempts: z.number(), + initialInterval: durationString, + backoffCoefficient: z.number().positive(), + maximumInterval: durationString, + maximumAttempts: z.number().int().positive(), }), - attempts: z.number().optional(), + attempts: z.number().int().nonnegative().optional(), deadlineAt: isoDatetime.nullable().optional(), }); diff --git a/packages/server/server.ts b/packages/server/server.ts index 78bdebda..256bfb2f 100644 --- a/packages/server/server.ts +++ b/packages/server/server.ts @@ -161,7 +161,7 @@ const WORKFLOW_RUN_VERBS: Readonly> = { const params: SleepWorkflowRunParams = { workflowRunId: id, workerId: body.workerId, - availableAt: new Date(body.availableAt), + availableAt: body.availableAt, }; const run = await backend.sleepWorkflowRun(params); return c.json(run); @@ -180,9 +180,7 @@ const WORKFLOW_RUN_VERBS: Readonly> = { error: toSerializedError(body.error), retryPolicy: body.retryPolicy as RetryPolicy, ...(body.attempts === undefined ? {} : { attempts: body.attempts }), - ...(body.deadlineAt === undefined - ? {} - : { deadlineAt: body.deadlineAt ? new Date(body.deadlineAt) : null }), + ...(body.deadlineAt === undefined ? {} : { deadlineAt: body.deadlineAt }), }; const run = await backend.failWorkflowRun(params); return c.json(run); @@ -193,7 +191,7 @@ const WORKFLOW_RUN_VERBS: Readonly> = { workflowRunId: id, workerId: body.workerId, error: toSerializedError(body.error), - availableAt: new Date(body.availableAt), + availableAt: body.availableAt, }; const run = await backend.rescheduleWorkflowRunAfterFailedStepAttempt(params); @@ -223,8 +221,8 @@ function registerWorkflowRunRoutes(app: Hono, backend: Backend): void { input: body.input, parentStepAttemptNamespaceId: body.parentStepAttemptNamespaceId, parentStepAttemptId: body.parentStepAttemptId, - availableAt: body.availableAt ? new Date(body.availableAt) : null, - deadlineAt: body.deadlineAt ? new Date(body.deadlineAt) : null, + availableAt: body.availableAt, + deadlineAt: body.deadlineAt, }; const run = await backend.createWorkflowRun(params); return c.json(run, 201); From af98238da765cbabd803642ca46761b2d78c1b19 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 17 Apr 2026 14:17:36 -0500 Subject: [PATCH 4/6] fix: set server package to private --- package-lock.json | 1 - packages/server/package.json | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index 1950e0af..01c70e18 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17137,7 +17137,6 @@ }, "packages/server": { "name": "@openworkflow/server", - "version": "0.1.0", "dependencies": { "@hono/node-server": "^1.19.14", "hono": "^4.12.14", diff --git a/packages/server/package.json b/packages/server/package.json index 87a0cae9..f3133a3f 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -1,6 +1,6 @@ { "name": "@openworkflow/server", - "version": "0.1.0", + "private": true, "description": "HTTP server for OpenWorkflow — exposes the Backend interface as a REST API", "type": "module", "exports": { From 281a3d587fe8712590ced719e53baa0aaf7adb57 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 17 Apr 2026 14:17:44 -0500 Subject: [PATCH 5/6] fix: update cspell ignore list for healthz and readyz --- eslint.config.js | 2 ++ packages/server/server.test.ts | 1 - packages/server/server.ts | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eslint.config.js b/eslint.config.js index feab8a77..f980e211 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -69,9 +69,11 @@ export default defineConfig( flagWords: ["cancellation", "cancelled"], // prefer en-US spelling for consistency ignoreWords: [ "arktype", + "healthz", "heartbeating", "idempotently", "openworkflow", + "readyz", "sonarjs", "timestamptz", ], diff --git a/packages/server/server.test.ts b/packages/server/server.test.ts index 09293fd7..a3b5901f 100644 --- a/packages/server/server.test.ts +++ b/packages/server/server.test.ts @@ -89,7 +89,6 @@ describe("Server", () => { // Liveness & readiness // ----------------------------------------------------------------------- - // cspell:ignore healthz readyz test("GET /healthz returns 200 ok without hitting backend", async () => { const res = await fetch(new Request("http://localhost/healthz")); expect(res.status).toBe(200); diff --git a/packages/server/server.ts b/packages/server/server.ts index 256bfb2f..f2e9b59b 100644 --- a/packages/server/server.ts +++ b/packages/server/server.ts @@ -95,7 +95,6 @@ export function createServer( }), ); - // cspell:ignore healthz readyz // /healthz is liveness (process is up). /readyz pings the backend so load // balancers don't route traffic to a replica whose DB connection is broken. app.get("/healthz", (c) => c.json({ status: "ok" })); From a27eab480096e8921b9a7470b5ac4bcd4841891a Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 17 Apr 2026 14:55:49 -0500 Subject: [PATCH 6/6] fix: feedback --- package-lock.json | 2 +- packages/cli/commands.ts | 20 ++++++++++++++++---- packages/cli/package.json | 2 +- packages/server/errors.ts | 15 ++++++++++++--- packages/server/server.test.ts | 4 ++++ 5 files changed, 34 insertions(+), 9 deletions(-) diff --git a/package-lock.json b/package-lock.json index 114a12bd..f540e9e5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16959,7 +16959,6 @@ "version": "0.4.4", "dependencies": { "@clack/prompts": "^1.2.0", - "@openworkflow/server": "*", "commander": "^14.0.3", "consola": "^3.4.2", "dotenv": "^17.4.2", @@ -16970,6 +16969,7 @@ "openworkflow": "dist/cli.js" }, "devDependencies": { + "@openworkflow/server": "*", "openworkflow": "*", "vitest": "^4.0.18" }, diff --git a/packages/cli/commands.ts b/packages/cli/commands.ts index 1e95687c..b091069e 100644 --- a/packages/cli/commands.ts +++ b/packages/cli/commands.ts @@ -479,8 +479,18 @@ export async function serverStart(options: PortedOptions = {}): Promise { }); try { - // still dynamic to defer hono's ~150KB until `server start` is invoked - const { createServer, serve } = await import("@openworkflow/server"); + // dynamic to defer hono's ~150KB until `server start` is invoked + // this is not ideal and will be addressed before release + let serverModule: typeof import("@openworkflow/server"); + try { + serverModule = await import("@openworkflow/server"); + } catch { + throw new CLIError( + "@openworkflow/server is not installed.", + 'Install it to enable the "server start" command: `npm install @openworkflow/server`.', + ); + } + const { createServer, serve } = serverModule; const server = createServer(backend, { logRequests: true, @@ -534,8 +544,10 @@ function registerGracefulShutdown( options.noun.charAt(0).toUpperCase() + options.noun.slice(1); consola.success(`${capitalized} stopped`); } - process.on("SIGINT", () => void shutdown()); - process.on("SIGTERM", () => void shutdown()); + // `once` so repeated worker/serverStart invocations (e.g. programmatic use, + // tests) don't accumulate listeners and trigger MaxListenersExceededWarning. + process.once("SIGINT", () => void shutdown()); + process.once("SIGTERM", () => void shutdown()); return shutdown; } diff --git a/packages/cli/package.json b/packages/cli/package.json index 465c7cde..91b3900a 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -28,7 +28,6 @@ }, "dependencies": { "@clack/prompts": "^1.2.0", - "@openworkflow/server": "*", "commander": "^14.0.3", "consola": "^3.4.2", "dotenv": "^17.4.2", @@ -36,6 +35,7 @@ "nypm": "^0.6.5" }, "devDependencies": { + "@openworkflow/server": "*", "openworkflow": "*", "vitest": "^4.0.18" }, diff --git a/packages/server/errors.ts b/packages/server/errors.ts index 2e3ab5b5..4d3c0d0f 100644 --- a/packages/server/errors.ts +++ b/packages/server/errors.ts @@ -65,10 +65,19 @@ export function errorToResponse( ); } - // Hono body-limit and similar middleware throw HTTPException — respect - // their status/response and don't treat them as server errors. + // Hono body-limit and similar middleware throw HTTPException. Preserve the + // status/headers they chose, but normalize the body to the documented JSON + // error wire shape so clients can parse every error response uniformly. if (error instanceof HTTPException) { - return error.getResponse(); + const original = error.getResponse(); + const headers = new Headers(original.headers); + headers.set("content-type", "application/json; charset=utf-8"); + const message = error.message || original.statusText || "HTTP error"; + const body: ErrorResponseBody = { error: { message } }; + return Response.json(body, { + status: original.status, + headers, + }); } // Anything else is unexpected: surface it to the caller for logging, and diff --git a/packages/server/server.test.ts b/packages/server/server.test.ts index a3b5901f..8760e2a0 100644 --- a/packages/server/server.test.ts +++ b/packages/server/server.test.ts @@ -354,6 +354,10 @@ describe("Server request validation", () => { }), ); expect(res.status).toBe(413); + expect(res.headers.get("content-type")).toMatch(/application\/json/); + const body = (await res.json()) as { error: { message: string } }; + expect(typeof body.error.message).toBe("string"); + expect(body.error.message.length).toBeGreaterThan(0); }); });