diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 84eb8fc8..9f2c9b62 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -7,10 +7,8 @@ import { } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; import type { StandardSchemaV1 } from "../core/standard-schema.js"; -import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; +import type { StepAttempt } from "../core/step-attempt.js"; import { - getCachedStepAttempt, - addToStepAttemptCache, normalizeStepOutput, calculateDateFromDuration, createSleepContext, @@ -36,6 +34,20 @@ import { validateInput, type WorkflowRun, } from "../core/workflow-run.js"; +import { + defaultWaitTimeoutAt, + getContextTimeoutAt, + StepHistory, + StepLimitExceededError, + WORKFLOW_STEP_LIMIT, +} from "./step-history.js"; + +export { + WORKFLOW_STEP_LIMIT, + STEP_LIMIT_EXCEEDED_ERROR_CODE, + createStepExecutionStateFromAttempts, + type StepExecutionState, +} from "./step-history.js"; /** * Signal thrown when a workflow needs to sleep. Contains the time when the @@ -135,30 +147,6 @@ const TERMINAL_STEP_RETRY_POLICY: RetryPolicy = { maximumAttempts: 1, }; -/** Maximum number of step attempts allowed for a single workflow run. */ -export const WORKFLOW_STEP_LIMIT = 1000; - -/** Error code used when a workflow run exceeds the step-attempt limit. */ -export const STEP_LIMIT_EXCEEDED_ERROR_CODE = "STEP_LIMIT_EXCEEDED"; - -/** - * Error thrown when a workflow run reaches the maximum allowed step attempts. - */ -class StepLimitExceededError extends Error { - readonly code = STEP_LIMIT_EXCEEDED_ERROR_CODE; - readonly limit: number; - readonly stepCount: number; - - constructor(limit: number, stepCount: number) { - super( - `Exceeded the step limit of ${String(limit)} attempts (current count: ${String(stepCount)})`, - ); - this.name = "StepLimitExceededError"; - this.limit = limit; - this.stepCount = stepCount; - } -} - /** * Convert a step-limit error to a persisted serialized error payload. * @param error - Step-limit error @@ -192,53 +180,6 @@ function resolveStepRetryPolicy(partial?: Partial): RetryPolicy { return { ...DEFAULT_STEP_RETRY_POLICY, ...partial }; } -/** - * Derived in-memory step state for a single workflow execution pass. - */ -export interface StepExecutionState { - cache: StepAttemptCache; - failedCountsByStepName: ReadonlyMap; - failedByStepName: ReadonlyMap; - runningByStepName: ReadonlyMap; -} - -/** - * Build step execution state from loaded attempts in one pass. - * @param attempts - Loaded step attempts for the workflow run - * @returns Successful cache plus failed-attempt counts by step name - */ -export function createStepExecutionStateFromAttempts( - attempts: readonly StepAttempt[], -): StepExecutionState { - const cache = new Map(); - const failedCountsByStepName = new Map(); - const failedByStepName = new Map(); - const runningByStepName = new Map(); - - for (const attempt of attempts) { - if (attempt.status === "completed" || attempt.status === "succeeded") { - cache.set(attempt.stepName, attempt); - continue; - } - - if (attempt.status === "failed") { - const previousCount = failedCountsByStepName.get(attempt.stepName) ?? 0; - failedCountsByStepName.set(attempt.stepName, previousCount + 1); - failedByStepName.set(attempt.stepName, attempt); - continue; - } - - runningByStepName.set(attempt.stepName, attempt); - } - - return { - cache, - failedCountsByStepName, - failedByStepName, - runningByStepName, - }; -} - /** * Resolve wait timeout input to an absolute deadline. * @param timeout - Relative/absolute timeout input @@ -270,40 +211,6 @@ function resolveWaitTimeoutAt( return result.value; } -/** - * Default wait timeout: 1 year from a base time. - * @param base - Base timestamp (defaults to now) - * @returns Timeout deadline - */ -function defaultWaitTimeoutAt(base: Readonly = new Date()): Date { - const timeoutAt = new Date(base); - timeoutAt.setFullYear(timeoutAt.getFullYear() + 1); - return timeoutAt; -} - -/** - * Extract the timeout from a persisted step attempt's context. - * Works for both workflow and signal-wait step types. - * @param attempt - Running step attempt - * @returns Timeout deadline, or null when context has no timeout - */ -function getContextTimeoutAt(attempt: Readonly): Date | null { - if ( - attempt.context?.kind !== "workflow" && - attempt.context?.kind !== "signal-wait" - ) { - return null; - } - - const { timeoutAt } = attempt.context; - if (timeoutAt === null) { - // backward compatibility for previously persisted workflow contexts - // (signal-wait timeoutAt is never null per SignalWaitStepAttemptContext). - return defaultWaitTimeoutAt(attempt.createdAt); - } - return new Date(timeoutAt); -} - /** * Determine whether the workflow timeout has elapsed before the child completed. * @param attempt - Running workflow step attempt @@ -328,58 +235,6 @@ function hasWorkflowTimedOut( return true; } -/** - * Resolve the next wake-up timestamp for a running wait step attempt. - * @param attempt - Running step attempt - * @returns Wake-up timestamp, or null when the attempt is not a wait step - */ -function getRunningWaitAttemptResumeAt( - attempt: Readonly, -): Date | null { - if (attempt.status !== "running") { - return null; - } - - if (attempt.kind === "sleep" && attempt.context?.kind === "sleep") { - const resumeAt = new Date(attempt.context.resumeAt); - return Number.isFinite(resumeAt.getTime()) ? resumeAt : null; - } - - if (attempt.kind !== "signal-wait" && attempt.kind !== "workflow") { - return null; - } - - const timeoutAt = - getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); - return Number.isFinite(timeoutAt.getTime()) - ? timeoutAt - : defaultWaitTimeoutAt(attempt.createdAt); -} - -/** - * Compute the earliest wake-up timestamp across running wait step attempts. - * @param attempts - Persisted step attempts for the workflow run - * @returns Earliest wake-up timestamp, or null when no running wait exists - */ -function getEarliestRunningWaitResumeAt( - attempts: readonly StepAttempt[], -): Date | null { - let earliest: Date | null = null; - - for (const attempt of attempts) { - const resumeAt = getRunningWaitAttemptResumeAt(attempt); - if (!resumeAt) { - continue; - } - - if (!earliest || resumeAt.getTime() < earliest.getTime()) { - earliest = resumeAt; - } - } - - return earliest; -} - /** * Complete running sleep step attempts whose resume timestamp has elapsed. * Malformed historical resume timestamps are treated as elapsed for backward @@ -392,20 +247,16 @@ async function completeElapsedRunningSleepAttempts( backend: Backend; workflowRunId: string; workerId: string; - attempts: StepAttempt[]; + history: StepHistory; }>, ): Promise { let hasPendingRunningSleep = false; - for (let i = 0; i < options.attempts.length; i += 1) { - const attempt = options.attempts[i]; - if (!attempt) continue; + // Snapshot running attempts since we mutate history during iteration. + const running = [...options.history.runningAttempts()]; - if ( - attempt.status !== "running" || - attempt.kind !== "sleep" || - attempt.context?.kind !== "sleep" - ) { + for (const attempt of running) { + if (attempt.kind !== "sleep" || attempt.context?.kind !== "sleep") { continue; } @@ -423,7 +274,7 @@ async function completeElapsedRunningSleepAttempts( output: null, }); - options.attempts[i] = completed; + options.history.recordCompletion(completed); } return hasPendingRunningSleep; @@ -487,8 +338,7 @@ export interface StepExecutorOptions { backend: Backend; workflowRunId: string; workerId: string; - attempts: StepAttempt[]; - stepLimit?: number; + history: StepHistory; executionFence: ExecutionFenceController; } @@ -510,30 +360,14 @@ class StepExecutor implements StepApi { private readonly backend: Backend; private readonly workflowRunId: string; private readonly workerId: string; - private readonly stepLimit: number; - private stepCount: number; - private cache: StepAttemptCache; - private readonly failedCountsByStepName: Map; - private readonly failedByStepName: Map; - private readonly runningByStepName: Map; - private readonly expectedNextStepIndexByName: Map; - private readonly resolvedStepNames: Set; + private readonly history: StepHistory; private readonly executionFence: ExecutionFenceController; constructor(options: Readonly) { this.backend = options.backend; this.workflowRunId = options.workflowRunId; this.workerId = options.workerId; - this.stepLimit = Math.max(1, options.stepLimit ?? WORKFLOW_STEP_LIMIT); - this.stepCount = options.attempts.length; - - const state = createStepExecutionStateFromAttempts(options.attempts); - this.cache = state.cache; - this.failedCountsByStepName = new Map(state.failedCountsByStepName); - this.failedByStepName = new Map(state.failedByStepName); - this.runningByStepName = new Map(state.runningByStepName); - this.expectedNextStepIndexByName = new Map(); - this.resolvedStepNames = new Set(); + this.history = options.history; this.executionFence = options.executionFence; } @@ -541,62 +375,6 @@ class StepExecutor implements StepApi { this.executionFence.assertActive(); } - /** - * Resolve the earliest known wake-up timestamp for running wait attempts in - * this execution pass. - * @param fallbackResumeAt - Candidate wake-up timestamp for the current wait - * @returns Earliest known wake-up timestamp - */ - private resolveEarliestRunningWaitResumeAt( - fallbackResumeAt: Readonly, - ): Date { - const earliestRunningWaitResumeAt = getEarliestRunningWaitResumeAt([ - ...this.runningByStepName.values(), - ]); - if (!earliestRunningWaitResumeAt) { - return new Date(fallbackResumeAt); - } - - const fallbackMs = fallbackResumeAt.getTime(); - if (!Number.isFinite(fallbackMs)) { - return earliestRunningWaitResumeAt; - } - - if (earliestRunningWaitResumeAt.getTime() < fallbackMs) { - return earliestRunningWaitResumeAt; - } - - return new Date(fallbackResumeAt); - } - - /** - * Resolve a step name to a deterministic, unique key for this workflow - * execution pass. When a name collides, suffixes are appended as - * `name:1`, `name:2`, etc. If those suffixes already exist (including - * user-provided names), indexing continues until an unused name is found. - * @param stepName - User-provided step name - * @returns Resolved step name used for durable step state - */ - private resolveStepName(stepName: string): string { - if (!this.resolvedStepNames.has(stepName)) { - this.resolvedStepNames.add(stepName); - return stepName; - } - - const expectedNextIndex = - this.expectedNextStepIndexByName.get(stepName) ?? 1; - for (let index = expectedNextIndex; ; index += 1) { - const resolvedName = `${stepName}:${String(index)}`; - if (this.resolvedStepNames.has(resolvedName)) { - continue; - } - - this.expectedNextStepIndexByName.set(stepName, index + 1); - this.resolvedStepNames.add(resolvedName); - return resolvedName; - } - } - // ---- step.run ----------------------------------------------------------- async run( @@ -604,17 +382,15 @@ class StepExecutor implements StepApi { fn: StepFunction, ): Promise { const { name: baseStepName, retryPolicy: retryPolicyOverride } = config; - const stepName = this.resolveStepName(baseStepName); + const stepName = this.history.resolveStepName(baseStepName); - // return cached result if available - const existingAttempt = getCachedStepAttempt(this.cache, stepName); + const existingAttempt = this.history.findCached(stepName); if (existingAttempt) { return existingAttempt.output as Output; } - // not in cache, create new step attempt this.assertExecutionActive(); - this.ensureStepLimitNotReached(); + this.history.ensureCanRecordNewAttempt(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -623,27 +399,18 @@ class StepExecutor implements StepApi { config: {}, context: null, }); - - this.stepCount += 1; - this.runningByStepName.set(stepName, attempt); + this.history.recordNewAttempt(attempt); try { - // execute step function const result = await fn(); const output = normalizeStepOutput(result); - - // mark success const savedAttempt = await this.backend.completeStepAttempt({ workflowRunId: this.workflowRunId, stepAttemptId: attempt.id, workerId: this.workerId, output, }); - - // cache result - this.cache = addToStepAttemptCache(this.cache, savedAttempt); - this.runningByStepName.delete(stepName); - + this.history.recordCompletion(savedAttempt); return savedAttempt.output as Output; } catch (error) { return this.failStepWithError( @@ -658,13 +425,10 @@ class StepExecutor implements StepApi { // ---- step.sleep --------------------------------------------------------- async sleep(baseStepName: string, duration: DurationString): Promise { - const stepName = this.resolveStepName(baseStepName); + const stepName = this.history.resolveStepName(baseStepName); - // return cached result if this sleep already completed - const existingAttempt = getCachedStepAttempt(this.cache, stepName); - if (existingAttempt) return; + if (this.history.findCached(stepName)) return; - // create new step attempt for the sleep const result = calculateDateFromDuration(duration); if (!result.ok) { throw result.error; @@ -673,7 +437,7 @@ class StepExecutor implements StepApi { const context = createSleepContext(resumeAt); this.assertExecutionActive(); - this.ensureStepLimitNotReached(); + this.history.ensureCanRecordNewAttempt(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -682,44 +446,41 @@ class StepExecutor implements StepApi { config: {}, context, }); - this.stepCount += 1; - this.runningByStepName.set(stepName, attempt); + this.history.recordNewAttempt(attempt); - // throw sleep signal to trigger postponement - // we do not mark the step as completed here; it will be updated - // when the workflow resumes - throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt)); + // Sleep attempts are not marked completed here — that happens when the + // workflow resumes. + throw new SleepSignal( + this.history.resolveEarliestRunningWaitResumeAt(resumeAt), + ); } - // ---- step.runWorkflow ----------------------------------------------- + // ---- step.runWorkflow -------------------------------------------------- async runWorkflow( spec: WorkflowSpec, input?: RunInput, options?: Readonly, ): Promise { - const stepName = this.resolveStepName(options?.name ?? spec.name); + const stepName = this.history.resolveStepName(options?.name ?? spec.name); const request: RunWorkflowStepRequest = { workflowSpec: spec, input, timeout: options?.timeout, }; - const existingAttempt = getCachedStepAttempt(this.cache, stepName); + const existingAttempt = this.history.findCached(stepName); if (existingAttempt) { return existingAttempt.output as Output; } - // Workflow steps are terminal once a failure is persisted. This prevents + // Workflow steps are terminal once a failure is persisted. Prevents // replay from spawning duplicate children when Promise.all short-circuits // on a sibling SleepSignal in the same pass. - const failedAttempt = this.failedByStepName.get(stepName); - if ( - failedAttempt?.kind === "workflow" && - failedAttempt.childWorkflowRunNamespaceId && - failedAttempt.childWorkflowRunId - ) { - const serializedFailedError = failedAttempt.error; + const terminallyFailedAttempt = + this.history.findTerminallyFailedWorkflow(stepName); + if (terminallyFailedAttempt) { + const serializedFailedError = terminallyFailedAttempt.error; const failedError = serializedFailedError && typeof serializedFailedError === "object" && @@ -729,14 +490,14 @@ class StepExecutor implements StepApi { : new Error(`Workflow step "${stepName}" previously failed`); throw new StepError({ stepName, - stepFailedAttempts: this.failedCountsByStepName.get(stepName) ?? 1, + stepFailedAttempts: this.history.failedAttemptCount(stepName), retryPolicy: TERMINAL_STEP_RETRY_POLICY, error: failedError, }); } // Resume a running workflow attempt (replay path) - const runningAttempt = this.runningByStepName.get(stepName); + const runningAttempt = this.history.findRunning(stepName); if (runningAttempt?.kind === "workflow") { return await this.resolveRunningWorkflow( stepName, @@ -748,7 +509,7 @@ class StepExecutor implements StepApi { // First encounter — create the workflow step and child workflow run const timeoutAt = resolveWaitTimeoutAt(request.timeout); this.assertExecutionActive(); - this.ensureStepLimitNotReached(); + this.history.ensureCanRecordNewAttempt(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -757,11 +518,9 @@ class StepExecutor implements StepApi { config: {}, context: createWorkflowContext(timeoutAt), }); - this.stepCount += 1; - this.runningByStepName.set(stepName, attempt); + this.history.recordNewAttempt(attempt); const linkedAttempt = await this.linkChildWorkflowRun( - stepName, attempt, request, ).catch( @@ -791,7 +550,7 @@ class StepExecutor implements StepApi { runningAttempt.childWorkflowRunId && runningAttempt.childWorkflowRunNamespaceId ? runningAttempt - : await this.linkChildWorkflowRun(stepName, runningAttempt, request); + : await this.linkChildWorkflowRun(runningAttempt, request); const failWorkflowStep = (error: Error): Promise => this.failStepWithError( @@ -836,8 +595,7 @@ class StepExecutor implements StepApi { workerId: this.workerId, output: childRun.output, }); - this.runningByStepName.delete(stepName); - this.cache = addToStepAttemptCache(this.cache, completed); + this.history.recordCompletion(completed); return completed.output as Output; } @@ -865,19 +623,19 @@ class StepExecutor implements StepApi { timeoutAt && Number.isFinite(timeoutAt.getTime()) ? timeoutAt : defaultWaitTimeoutAt(workflowAttempt.createdAt); - throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt)); + throw new SleepSignal( + this.history.resolveEarliestRunningWaitResumeAt(resumeAt), + ); } /** * Create (or dedupe) the child workflow run and persist the linkage on the * parent workflow step attempt. - * @param stepName - Parent workflow step name * @param attempt - Parent workflow step attempt * @param request - Workflow step request * @returns Updated step attempt with child linkage */ private async linkChildWorkflowRun( - stepName: string, attempt: Readonly, request: Readonly>, ): Promise { @@ -912,14 +670,15 @@ class StepExecutor implements StepApi { childWorkflowRunNamespaceId: childRun.namespaceId, childWorkflowRunId: childRun.id, }); - this.runningByStepName.set(stepName, linked); + this.history.replaceRunningAttempt(linked); return linked; } /** - * Record a step failure, update the failed-attempt counter, and throw a - * StepError. Shared by both `step.run` failures and workflow failures. + * Record a step failure and throw a StepError. Shared by `step.run` + * failures, workflow failures, signal-send failures, and signal-wait + * validation failures. * @param stepName - Step name * @param stepAttemptId - Step attempt id * @param error - Error that caused the failure @@ -935,7 +694,6 @@ class StepExecutor implements StepApi { throw new StaleExecutionBranchError(); } - this.runningByStepName.delete(stepName); let failedAttempt: StepAttempt; try { failedAttempt = await this.backend.failStepAttempt({ @@ -951,10 +709,7 @@ class StepExecutor implements StepApi { throw stepFailError; } - const stepFailedAttempts = - (this.failedCountsByStepName.get(stepName) ?? 0) + 1; - this.failedCountsByStepName.set(stepName, stepFailedAttempts); - this.failedByStepName.set(stepName, failedAttempt); + const stepFailedAttempts = this.history.recordFailedAttempt(failedAttempt); throw new StepError({ stepName, @@ -994,20 +749,22 @@ class StepExecutor implements StepApi { data?: JsonValue; }>, ): Promise<{ workflowRunIds: string[] }> { - const stepName = this.resolveStepName(options.name ?? options.signal); + const stepName = this.history.resolveStepName( + options.name ?? options.signal, + ); - const existingAttempt = getCachedStepAttempt(this.cache, stepName); + const existingAttempt = this.history.findCached(stepName); if (existingAttempt) { return existingAttempt.output as { workflowRunIds: string[] }; } - const runningAttempt = this.runningByStepName.get(stepName); + const runningAttempt = this.history.findRunning(stepName); if (runningAttempt?.kind === "signal-send") { return await this.resolveSignalSend(stepName, runningAttempt, options); } this.assertExecutionActive(); - this.ensureStepLimitNotReached(); + this.history.ensureCanRecordNewAttempt(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -1016,8 +773,7 @@ class StepExecutor implements StepApi { config: {}, context: null, }); - this.stepCount += 1; - this.runningByStepName.set(stepName, attempt); + this.history.recordNewAttempt(attempt); return await this.resolveSignalSend(stepName, attempt, options); } @@ -1040,8 +796,7 @@ class StepExecutor implements StepApi { workerId: this.workerId, output: { ...result }, }); - this.cache = addToStepAttemptCache(this.cache, completed); - this.runningByStepName.delete(stepName); + this.history.recordCompletion(completed); return completed.output as { workflowRunIds: string[] }; } catch (error) { return await this.failStepWithError( @@ -1063,14 +818,16 @@ class StepExecutor implements StepApi { schema?: StandardSchemaV1; }>, ): Promise<{ data: Output } | null> { - const stepName = this.resolveStepName(options.name ?? options.signal); + const stepName = this.history.resolveStepName( + options.name ?? options.signal, + ); - const existingAttempt = getCachedStepAttempt(this.cache, stepName); + const existingAttempt = this.history.findCached(stepName); if (existingAttempt) { return existingAttempt.output as { data: Output } | null; } - const runningAttempt = this.runningByStepName.get(stepName); + const runningAttempt = this.history.findRunning(stepName); if (runningAttempt?.kind === "signal-wait") { return await this.resolveSignalWait( stepName, @@ -1079,22 +836,19 @@ class StepExecutor implements StepApi { ); } - for (const [name, a] of this.runningByStepName) { - if ( - name !== stepName && - a.kind === "signal-wait" && - a.context?.kind === "signal-wait" && - a.context.signal === options.signal - ) { - throw new Error( - `Signal "${options.signal}" is already being waited on by step "${name}"`, - ); - } + const conflict = this.history.findConflictingSignalWait( + options.signal, + stepName, + ); + if (conflict) { + throw new Error( + `Signal "${options.signal}" is already being waited on by step "${conflict.stepName}"`, + ); } const timeoutAt = resolveWaitTimeoutAt(options.timeout); this.assertExecutionActive(); - this.ensureStepLimitNotReached(); + this.history.ensureCanRecordNewAttempt(); const attempt = await this.backend.createStepAttempt({ workflowRunId: this.workflowRunId, workerId: this.workerId, @@ -1103,8 +857,7 @@ class StepExecutor implements StepApi { config: {}, context: createSignalWaitContext(options.signal, timeoutAt), }); - this.stepCount += 1; - this.runningByStepName.set(stepName, attempt); + this.history.recordNewAttempt(attempt); return await this.resolveSignalWait(stepName, attempt, options); } @@ -1152,7 +905,9 @@ class StepExecutor implements StepApi { return await this.completeSignalWaitStep(attempt, null); } - throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(timeoutAt)); + throw new SleepSignal( + this.history.resolveEarliestRunningWaitResumeAt(timeoutAt), + ); } /** @@ -1171,16 +926,9 @@ class StepExecutor implements StepApi { workerId: this.workerId, output: output as JsonValue | null, }); - this.cache = addToStepAttemptCache(this.cache, completed); - this.runningByStepName.delete(attempt.stepName); + this.history.recordCompletion(completed); return completed.output as { data: Output } | null; } - - private ensureStepLimitNotReached(): void { - if (this.stepCount >= this.stepLimit) { - throw new StepLimitExceededError(this.stepLimit, this.stepCount); - } - } } /** @@ -1270,24 +1018,21 @@ export async function executeWorkflow( backend, workflowRun.id, ); + const history = new StepHistory({ attempts }); - // complete any elapsed sleep waits first, then park on the earliest + // Complete any elapsed sleep waits first, then park on the earliest // remaining running wait (sleep or runWorkflow timeout). const hasPendingRunningSleep = await completeElapsedRunningSleepAttempts({ backend, workflowRunId: workflowRun.id, workerId, - attempts, + history, }); if (hasPendingRunningSleep) { - const earliestRunningWaitResumeAt = - getEarliestRunningWaitResumeAt(attempts); - if ( - earliestRunningWaitResumeAt && - Date.now() < earliestRunningWaitResumeAt.getTime() - ) { - throw new SleepSignal(earliestRunningWaitResumeAt); + const earliestResumeAt = history.earliestRunningWaitResumeAt(); + if (earliestResumeAt && Date.now() < earliestResumeAt.getTime()) { + throw new SleepSignal(earliestResumeAt); } } @@ -1295,7 +1040,7 @@ export async function executeWorkflow( backend, workflowRunId: workflowRun.id, workerId, - attempts, + history, executionFence, }); diff --git a/packages/openworkflow/worker/step-history.test.ts b/packages/openworkflow/worker/step-history.test.ts new file mode 100644 index 00000000..a218c6f3 --- /dev/null +++ b/packages/openworkflow/worker/step-history.test.ts @@ -0,0 +1,262 @@ +import type { StepAttempt } from "../core/step-attempt.js"; +import { StepHistory, StepLimitExceededError } from "./step-history.js"; +import { describe, test, expect } from "vitest"; + +describe("StepHistory", () => { + describe("resolveStepName", () => { + test("returns the base name on first use", () => { + const history = new StepHistory({ attempts: [] }); + expect(history.resolveStepName("step")).toBe("step"); + }); + + test("appends incrementing suffixes for collisions", () => { + const history = new StepHistory({ attempts: [] }); + expect(history.resolveStepName("step")).toBe("step"); + expect(history.resolveStepName("step")).toBe("step:1"); + expect(history.resolveStepName("step")).toBe("step:2"); + }); + + test("skips suffixes that were user-supplied as base names", () => { + const history = new StepHistory({ attempts: [] }); + history.resolveStepName("step"); + history.resolveStepName("step:1"); // user-supplied collision + expect(history.resolveStepName("step")).toBe("step:2"); + }); + }); + + describe("find*", () => { + test("findCached returns successful attempts only", () => { + const completed = createMockStepAttempt({ + stepName: "a", + status: "completed", + output: "done", + }); + const failed = createMockStepAttempt({ + stepName: "b", + status: "failed", + }); + const history = new StepHistory({ attempts: [completed, failed] }); + + expect(history.findCached("a")).toBe(completed); + expect(history.findCached("b")).toBeUndefined(); + }); + + test("findTerminallyFailedWorkflow requires linked child ids", () => { + const unlinked = createMockStepAttempt({ + stepName: "a", + kind: "workflow", + status: "failed", + }); + const linked = createMockStepAttempt({ + stepName: "b", + kind: "workflow", + status: "failed", + childWorkflowRunNamespaceId: "default", + childWorkflowRunId: "child-run", + }); + const nonWorkflow = createMockStepAttempt({ + stepName: "c", + kind: "function", + status: "failed", + }); + + const history = new StepHistory({ + attempts: [unlinked, linked, nonWorkflow], + }); + + expect(history.findTerminallyFailedWorkflow("a")).toBeUndefined(); + expect(history.findTerminallyFailedWorkflow("b")).toBe(linked); + expect(history.findTerminallyFailedWorkflow("c")).toBeUndefined(); + }); + + test("findConflictingSignalWait matches signal name, excluding caller", () => { + const waitingA = createMockStepAttempt({ + stepName: "wait-a", + kind: "signal-wait", + status: "running", + context: { + kind: "signal-wait", + signal: "approve", + timeoutAt: "2026-05-01T00:00:00.000Z", + }, + }); + const waitingB = createMockStepAttempt({ + stepName: "wait-b", + kind: "signal-wait", + status: "running", + context: { + kind: "signal-wait", + signal: "cancel", + timeoutAt: "2026-05-01T00:00:00.000Z", + }, + }); + + const history = new StepHistory({ attempts: [waitingA, waitingB] }); + + expect(history.findConflictingSignalWait("approve", "other")).toEqual({ + stepName: "wait-a", + attempt: waitingA, + }); + expect(history.findConflictingSignalWait("approve", "wait-a")).toBeNull(); + expect(history.findConflictingSignalWait("unknown", "other")).toBeNull(); + }); + }); + + describe("mutations", () => { + test("recordNewAttempt enforces the step limit", () => { + const history = new StepHistory({ attempts: [], stepLimit: 1 }); + history.ensureCanRecordNewAttempt(); + history.recordNewAttempt( + createMockStepAttempt({ id: "a", stepName: "a", status: "running" }), + ); + expect(() => { + history.ensureCanRecordNewAttempt(); + }).toThrow(StepLimitExceededError); + }); + + test("recordCompletion moves a running attempt into the cache", () => { + const running = createMockStepAttempt({ + stepName: "a", + status: "running", + }); + const history = new StepHistory({ attempts: [running] }); + expect(history.findRunning("a")).toBe(running); + expect(history.findCached("a")).toBeUndefined(); + + const completed = createMockStepAttempt({ + stepName: "a", + status: "completed", + output: "value", + }); + history.recordCompletion(completed); + + expect(history.findRunning("a")).toBeUndefined(); + expect(history.findCached("a")).toBe(completed); + }); + + test("recordFailedAttempt increments the failure count", () => { + const running = createMockStepAttempt({ + stepName: "a", + status: "running", + }); + const history = new StepHistory({ attempts: [running] }); + + const failed = createMockStepAttempt({ + stepName: "a", + status: "failed", + }); + expect(history.recordFailedAttempt(failed)).toBe(1); + expect(history.recordFailedAttempt(failed)).toBe(2); + expect(history.failedAttemptCount("a")).toBe(2); + expect(history.findRunning("a")).toBeUndefined(); + }); + + test("replaceRunningAttempt updates the running entry in place", () => { + const initial = createMockStepAttempt({ + id: "attempt-1", + stepName: "wf", + kind: "workflow", + status: "running", + }); + const history = new StepHistory({ + attempts: [initial], + stepLimit: 2, + }); + + const linked = { + ...initial, + childWorkflowRunId: "child-run", + childWorkflowRunNamespaceId: "default", + }; + history.replaceRunningAttempt(linked); + + expect(history.findRunning("wf")).toBe(linked); + + // Counter should not have moved, so we can still record a second attempt. + history.ensureCanRecordNewAttempt(); + }); + }); + + describe("wait-time helpers", () => { + test("earliestRunningWaitResumeAt returns null with no running waits", () => { + const history = new StepHistory({ attempts: [] }); + expect(history.earliestRunningWaitResumeAt()).toBeNull(); + }); + + test("earliestRunningWaitResumeAt picks the earliest running wait", () => { + const sleepLate = createMockStepAttempt({ + stepName: "sleep-late", + kind: "sleep", + status: "running", + context: { kind: "sleep", resumeAt: "2026-06-01T00:00:00.000Z" }, + }); + const sleepEarly = createMockStepAttempt({ + stepName: "sleep-early", + kind: "sleep", + status: "running", + context: { kind: "sleep", resumeAt: "2026-05-01T00:00:00.000Z" }, + }); + const history = new StepHistory({ attempts: [sleepLate, sleepEarly] }); + + expect(history.earliestRunningWaitResumeAt()?.toISOString()).toBe( + "2026-05-01T00:00:00.000Z", + ); + }); + + test("resolveEarliestRunningWaitResumeAt picks the earlier of fallback or running", () => { + const sleep = createMockStepAttempt({ + stepName: "sleep", + kind: "sleep", + status: "running", + context: { kind: "sleep", resumeAt: "2026-06-01T00:00:00.000Z" }, + }); + const history = new StepHistory({ attempts: [sleep] }); + + const earlierFallback = new Date("2026-05-01T00:00:00.000Z"); + expect( + history + .resolveEarliestRunningWaitResumeAt(earlierFallback) + .toISOString(), + ).toBe("2026-05-01T00:00:00.000Z"); + + const laterFallback = new Date("2026-07-01T00:00:00.000Z"); + expect( + history.resolveEarliestRunningWaitResumeAt(laterFallback).toISOString(), + ).toBe("2026-06-01T00:00:00.000Z"); + }); + + test("resolveEarliestRunningWaitResumeAt falls back when no running waits", () => { + const history = new StepHistory({ attempts: [] }); + const fallback = new Date("2026-05-01T00:00:00.000Z"); + expect( + history.resolveEarliestRunningWaitResumeAt(fallback).toISOString(), + ).toBe("2026-05-01T00:00:00.000Z"); + }); + }); +}); + +function createMockStepAttempt( + overrides: Partial = {}, +): StepAttempt { + const status = overrides.status ?? "completed"; + return { + namespaceId: "default", + id: "step-attempt-id", + workflowRunId: "workflow-run-id", + stepName: "step", + kind: "function", + status, + config: {}, + context: null, + output: null, + error: null, + childWorkflowRunNamespaceId: null, + childWorkflowRunId: null, + startedAt: new Date("2026-01-01T00:00:00.000Z"), + finishedAt: + status === "running" ? null : new Date("2026-01-01T00:00:01.000Z"), + createdAt: new Date("2026-01-01T00:00:00.000Z"), + updatedAt: new Date("2026-01-01T00:00:01.000Z"), + ...overrides, + }; +} diff --git a/packages/openworkflow/worker/step-history.ts b/packages/openworkflow/worker/step-history.ts new file mode 100644 index 00000000..39532890 --- /dev/null +++ b/packages/openworkflow/worker/step-history.ts @@ -0,0 +1,373 @@ +import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; +import { + addToStepAttemptCache, + getCachedStepAttempt, +} from "../core/step-attempt.js"; + +/** Maximum number of step attempts allowed for a single workflow run. */ +export const WORKFLOW_STEP_LIMIT = 1000; + +/** Error code used when a workflow run exceeds the step-attempt limit. */ +export const STEP_LIMIT_EXCEEDED_ERROR_CODE = "STEP_LIMIT_EXCEEDED"; + +/** + * Error thrown when a workflow run reaches the maximum allowed step attempts. + */ +export class StepLimitExceededError extends Error { + readonly code = STEP_LIMIT_EXCEEDED_ERROR_CODE; + readonly limit: number; + readonly stepCount: number; + + constructor(limit: number, stepCount: number) { + super( + `Exceeded the step limit of ${String(limit)} attempts (current count: ${String(stepCount)})`, + ); + this.name = "StepLimitExceededError"; + this.limit = limit; + this.stepCount = stepCount; + } +} + +/** + * Derived in-memory step state for a single workflow execution pass. + */ +export interface StepExecutionState { + cache: StepAttemptCache; + failedCountsByStepName: ReadonlyMap; + failedByStepName: ReadonlyMap; + runningByStepName: ReadonlyMap; +} + +/** + * Build step execution state from loaded attempts in one pass. + * @param attempts - Loaded step attempts for the workflow run + * @returns Successful cache plus failed-attempt counts by step name + */ +export function createStepExecutionStateFromAttempts( + attempts: readonly StepAttempt[], +): StepExecutionState { + const cache = new Map(); + const failedCountsByStepName = new Map(); + const failedByStepName = new Map(); + const runningByStepName = new Map(); + + for (const attempt of attempts) { + if (attempt.status === "completed" || attempt.status === "succeeded") { + cache.set(attempt.stepName, attempt); + continue; + } + + if (attempt.status === "failed") { + const previousCount = failedCountsByStepName.get(attempt.stepName) ?? 0; + failedCountsByStepName.set(attempt.stepName, previousCount + 1); + failedByStepName.set(attempt.stepName, attempt); + continue; + } + + runningByStepName.set(attempt.stepName, attempt); + } + + return { + cache, + failedCountsByStepName, + failedByStepName, + runningByStepName, + }; +} + +/** + * Default wait timeout: 1 year from a base time. + * @param base - Base timestamp (defaults to now) + * @returns Timeout deadline + */ +export function defaultWaitTimeoutAt(base: Readonly = new Date()): Date { + const timeoutAt = new Date(base); + timeoutAt.setFullYear(timeoutAt.getFullYear() + 1); + return timeoutAt; +} + +/** + * Extract the timeout from a persisted step attempt's context. + * Works for both workflow and signal-wait step types. + * @param attempt - Running step attempt + * @returns Timeout deadline, or null when context has no timeout + */ +export function getContextTimeoutAt( + attempt: Readonly, +): Date | null { + if ( + attempt.context?.kind !== "workflow" && + attempt.context?.kind !== "signal-wait" + ) { + return null; + } + + const { timeoutAt } = attempt.context; + if (timeoutAt === null) { + // backward compatibility for previously persisted workflow contexts + // (signal-wait timeoutAt is never null per SignalWaitStepAttemptContext). + return defaultWaitTimeoutAt(attempt.createdAt); + } + return new Date(timeoutAt); +} + +/** + * Resolve the next wake-up timestamp for a running wait step attempt. + * @param attempt - Running step attempt + * @returns Wake-up timestamp, or null when the attempt is not a wait step + */ +function getRunningWaitAttemptResumeAt( + attempt: Readonly, +): Date | null { + if (attempt.status !== "running") { + return null; + } + + if (attempt.kind === "sleep" && attempt.context?.kind === "sleep") { + const resumeAt = new Date(attempt.context.resumeAt); + return Number.isFinite(resumeAt.getTime()) ? resumeAt : null; + } + + if (attempt.kind !== "signal-wait" && attempt.kind !== "workflow") { + return null; + } + + const timeoutAt = + getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); + return Number.isFinite(timeoutAt.getTime()) + ? timeoutAt + : defaultWaitTimeoutAt(attempt.createdAt); +} + +/** + * Compute the earliest wake-up timestamp across running wait step attempts. + * @param attempts - Persisted step attempts for the workflow run + * @returns Earliest wake-up timestamp, or null when no running wait exists + */ +function getEarliestRunningWaitResumeAt( + attempts: readonly StepAttempt[], +): Date | null { + let earliest: Date | null = null; + + for (const attempt of attempts) { + const resumeAt = getRunningWaitAttemptResumeAt(attempt); + if (!resumeAt) { + continue; + } + + if (!earliest || resumeAt.getTime() < earliest.getTime()) { + earliest = resumeAt; + } + } + + return earliest; +} + +/** + * Options for constructing a {@link StepHistory}. + */ +export interface StepHistoryOptions { + attempts: readonly StepAttempt[]; + stepLimit?: number; +} + +/** + * Encapsulates the in-memory step-attempt ledger for a single workflow + * execution pass: the successful-result cache, running/failed maps, failure + * counts, resolved step names, and the step-attempt limit. Exposes a narrow + * API so step-kind logic in {@link StepExecutor} doesn't touch these maps + * directly. + */ +export class StepHistory { + private cache: StepAttemptCache; + private readonly failedCountsByStepName: Map; + private readonly failedByStepName: Map; + private readonly runningByStepName: Map; + private readonly resolvedStepNames = new Set(); + private readonly expectedNextStepIndexByName = new Map(); + private readonly stepLimit: number; + private stepCount: number; + + constructor(options: Readonly) { + this.stepLimit = Math.max(1, options.stepLimit ?? WORKFLOW_STEP_LIMIT); + this.stepCount = options.attempts.length; + + const state = createStepExecutionStateFromAttempts(options.attempts); + this.cache = state.cache; + this.failedCountsByStepName = new Map(state.failedCountsByStepName); + this.failedByStepName = new Map(state.failedByStepName); + this.runningByStepName = new Map(state.runningByStepName); + } + + /** + * Resolve a step name to a deterministic, unique key for this workflow + * execution pass. When a name collides, suffixes are appended as + * `name:1`, `name:2`, etc. If those suffixes already exist (including + * user-provided names), indexing continues until an unused name is found. + * @param baseStepName - User-provided step name + * @returns Resolved step name used for durable step state + */ + resolveStepName(baseStepName: string): string { + if (!this.resolvedStepNames.has(baseStepName)) { + this.resolvedStepNames.add(baseStepName); + return baseStepName; + } + + const expectedNextIndex = + this.expectedNextStepIndexByName.get(baseStepName) ?? 1; + for (let index = expectedNextIndex; ; index += 1) { + const resolvedName = `${baseStepName}:${String(index)}`; + if (this.resolvedStepNames.has(resolvedName)) { + continue; + } + + this.expectedNextStepIndexByName.set(baseStepName, index + 1); + this.resolvedStepNames.add(resolvedName); + return resolvedName; + } + } + + findCached(stepName: string): StepAttempt | undefined { + return getCachedStepAttempt(this.cache, stepName); + } + + findRunning(stepName: string): StepAttempt | undefined { + return this.runningByStepName.get(stepName); + } + + /** + * Find a previously-failed workflow step attempt that already created a + * child workflow run. Workflow steps are terminal once a failure is + * persisted with linkage: the caller should surface the failure instead of + * spawning another child. + * @param stepName - Resolved step name + * @returns Terminally-failed workflow attempt, or undefined + */ + findTerminallyFailedWorkflow(stepName: string): StepAttempt | undefined { + const attempt = this.failedByStepName.get(stepName); + if ( + attempt?.kind === "workflow" && + attempt.childWorkflowRunNamespaceId && + attempt.childWorkflowRunId + ) { + return attempt; + } + return undefined; + } + + /** + * Find a running signal-wait step that is already waiting on the given + * signal, excluding a specific step name. + * @param signal - Signal address + * @param excludeStepName - Step name to skip (usually the caller) + * @returns Conflict descriptor, or null when none exists + */ + findConflictingSignalWait( + signal: string, + excludeStepName: string, + ): { stepName: string; attempt: StepAttempt } | null { + for (const [stepName, attempt] of this.runningByStepName) { + if ( + stepName !== excludeStepName && + attempt.kind === "signal-wait" && + attempt.context?.kind === "signal-wait" && + attempt.context.signal === signal + ) { + return { stepName, attempt }; + } + } + return null; + } + + failedAttemptCount(stepName: string): number { + return this.failedCountsByStepName.get(stepName) ?? 0; + } + + /** + * Iterate over currently-running step attempts. + * @returns Iterator over running step attempts + */ + runningAttempts(): IterableIterator { + return this.runningByStepName.values(); + } + + /** + * Earliest wake-up timestamp across running wait attempts. + * @returns Earliest wake-up timestamp, or null when no running wait exists + */ + earliestRunningWaitResumeAt(): Date | null { + return getEarliestRunningWaitResumeAt([...this.runningByStepName.values()]); + } + + /** + * Earliest wake-up timestamp considering running waits and a fallback (from + * the in-progress wait the caller is about to park on). + * @param fallback - Candidate timestamp for the in-progress wait + * @returns The earlier of the fallback or any known running wait. If no + * running wait exists, returns a clone of `fallback`, which will also be + * invalid when `fallback` is invalid. + */ + resolveEarliestRunningWaitResumeAt(fallback: Readonly): Date { + const earliest = this.earliestRunningWaitResumeAt(); + if (!earliest) return new Date(fallback); + + const fallbackMs = fallback.getTime(); + if (!Number.isFinite(fallbackMs)) return earliest; + + return earliest.getTime() < fallbackMs ? earliest : new Date(fallback); + } + + /** + * Assert that recording another step attempt would not exceed the step + * limit. + * @throws {StepLimitExceededError} When the step-attempt limit is reached + */ + ensureCanRecordNewAttempt(): void { + if (this.stepCount >= this.stepLimit) { + throw new StepLimitExceededError(this.stepLimit, this.stepCount); + } + } + + /** + * Record a newly-created step attempt as running and increment the attempt + * counter. Callers must invoke {@link ensureCanRecordNewAttempt} beforehand. + * @param attempt - Step attempt just created in the backend + */ + recordNewAttempt(attempt: Readonly): void { + this.runningByStepName.set(attempt.stepName, attempt); + this.stepCount += 1; + } + + /** + * Replace the running entry for a step (e.g. after linking a child workflow + * run onto an existing attempt). Does not change the attempt counter. + * @param attempt - Updated step attempt + */ + replaceRunningAttempt(attempt: Readonly): void { + this.runningByStepName.set(attempt.stepName, attempt); + } + + /** + * Mark a step attempt as completed: remove from running, add to the cache. + * @param attempt - Completed step attempt + */ + recordCompletion(attempt: Readonly): void { + this.runningByStepName.delete(attempt.stepName); + this.cache = addToStepAttemptCache(this.cache, attempt); + } + + /** + * Mark a step attempt as failed: remove from running, record failure, and + * return the new failed-attempt count for that step name. + * @param attempt - Failed step attempt + * @returns The new cumulative failed-attempt count for this step name + */ + recordFailedAttempt(attempt: Readonly): number { + this.runningByStepName.delete(attempt.stepName); + const nextCount = + (this.failedCountsByStepName.get(attempt.stepName) ?? 0) + 1; + this.failedCountsByStepName.set(attempt.stepName, nextCount); + this.failedByStepName.set(attempt.stepName, attempt); + return nextCount; + } +}