diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 2352b518..84eb8fc8 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -793,15 +793,20 @@ class StepExecutor implements StepApi { ? runningAttempt : await this.linkChildWorkflowRun(stepName, runningAttempt, request); - const childId = workflowAttempt.childWorkflowRunId; - if (!childId) { - return await this.failStepWithError( + const failWorkflowStep = (error: Error): Promise => + this.failStepWithError( stepName, workflowAttempt.id, + error, + TERMINAL_STEP_RETRY_POLICY, + ); + + const childId = workflowAttempt.childWorkflowRunId; + if (!childId) { + return await failWorkflowStep( new Error( `Workflow step "${stepName}" could not find linked child workflow run`, ), - TERMINAL_STEP_RETRY_POLICY, ); } @@ -809,23 +814,17 @@ class StepExecutor implements StepApi { workflowRunId: childId, }); if (!childRun) { - return await this.failStepWithError( - stepName, - workflowAttempt.id, + return await failWorkflowStep( new Error( `Workflow step "${stepName}" could not find linked child workflow run "${childId}"`, ), - TERMINAL_STEP_RETRY_POLICY, ); } // Check timeout before checking child result if (hasWorkflowTimedOut(workflowAttempt, childRun)) { - return await this.failStepWithError( - stepName, - workflowAttempt.id, + return await failWorkflowStep( new Error("Timed out waiting for child workflow to complete"), - TERMINAL_STEP_RETRY_POLICY, ); } @@ -844,27 +843,19 @@ class StepExecutor implements StepApi { // Child failed — propagate its error if (childRun.status === "failed") { - const childError = + return await failWorkflowStep( childRun.error === null ? new Error(`Child workflow run "${childRun.id}" failed`) - : deserializeError(childRun.error); - return await this.failStepWithError( - stepName, - workflowAttempt.id, - childError, - TERMINAL_STEP_RETRY_POLICY, + : deserializeError(childRun.error), ); } // Child canceled — propagate as error if (childRun.status === "canceled") { - return await this.failStepWithError( - stepName, - workflowAttempt.id, + return await failWorkflowStep( new Error( `Workflow step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`, ), - TERMINAL_STEP_RETRY_POLICY, ); } @@ -1259,6 +1250,20 @@ export async function executeWorkflow( params; const executionFence = new ExecutionFence(); + /** + * Run a backend transition for this workflow run, handling stale-write races. + * @param fn - Backend transition to execute + * @returns Promise resolved when the transition completes + */ + function runTransition(fn: () => Promise): Promise { + return executeWorkflowRunTransition({ + backend, + workflowRunId: workflowRun.id, + workerId, + transition: fn, + }); + } + try { // load all pages of step history const attempts = await listAllStepAttemptsForWorkflowRun( @@ -1311,55 +1316,39 @@ export async function executeWorkflow( // mark success executionFence.deactivate(); - await executeWorkflowRunTransition({ - backend, - workflowRunId: workflowRun.id, - workerId, - transition: async () => { - await backend.completeWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - output: (output ?? null) as JsonValue, - }); - }, - }); + await runTransition(() => + backend.completeWorkflowRun({ + workflowRunId: workflowRun.id, + workerId, + output: (output ?? null) as JsonValue, + }), + ); } catch (error) { executionFence.deactivate(); // handle sleep signal by parking the workflow in running status if (error instanceof SleepSignal) { - await executeWorkflowRunTransition({ - backend, - workflowRunId: workflowRun.id, - workerId, - transition: async () => { - await backend.sleepWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - availableAt: error.resumeAt, - }); - }, - }); - + await runTransition(() => + backend.sleepWorkflowRun({ + workflowRunId: workflowRun.id, + workerId, + availableAt: error.resumeAt, + }), + ); return; } if (error instanceof StepLimitExceededError) { - await executeWorkflowRunTransition({ - backend, - workflowRunId: workflowRun.id, - workerId, - transition: async () => { - await backend.failWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - error: serializeStepLimitExceededError(error), - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, - attempts: workflowRun.attempts, - deadlineAt: workflowRun.deadlineAt, - }); - }, - }); + await runTransition(() => + backend.failWorkflowRun({ + workflowRunId: workflowRun.id, + workerId, + error: serializeStepLimitExceededError(error), + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + attempts: workflowRun.attempts, + deadlineAt: workflowRun.deadlineAt, + }), + ); return; } @@ -1375,21 +1364,16 @@ export async function executeWorkflow( ); if (retryDecision.status === "failed") { - await executeWorkflowRunTransition({ - backend, - workflowRunId: workflowRun.id, - workerId, - transition: async () => { - await backend.failWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - error: serializedError, - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, - attempts: workflowRun.attempts, - deadlineAt: workflowRun.deadlineAt, - }); - }, - }); + await runTransition(() => + backend.failWorkflowRun({ + workflowRunId: workflowRun.id, + workerId, + error: serializedError, + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + attempts: workflowRun.attempts, + deadlineAt: workflowRun.deadlineAt, + }), + ); return; } @@ -1404,19 +1388,14 @@ export async function executeWorkflow( const availableAt = retryDecision.availableAt; - await executeWorkflowRunTransition({ - backend, - workflowRunId: workflowRun.id, - workerId, - transition: async () => { - await backend.rescheduleWorkflowRunAfterFailedStepAttempt({ - workflowRunId: workflowRun.id, - workerId, - error: serializedError, - availableAt, - }); - }, - }); + await runTransition(() => + backend.rescheduleWorkflowRunAfterFailedStepAttempt({ + workflowRunId: workflowRun.id, + workerId, + error: serializedError, + availableAt, + }), + ); return; } @@ -1425,20 +1404,15 @@ export async function executeWorkflow( } // mark failure - await executeWorkflowRunTransition({ - backend, - workflowRunId: workflowRun.id, - workerId, - transition: async () => { - await backend.failWorkflowRun({ - workflowRunId: workflowRun.id, - workerId, - error: serializeError(error), - retryPolicy: params.retryPolicy, - attempts: workflowRun.attempts, - deadlineAt: workflowRun.deadlineAt, - }); - }, - }); + await runTransition(() => + backend.failWorkflowRun({ + workflowRunId: workflowRun.id, + workerId, + error: serializeError(error), + retryPolicy: params.retryPolicy, + attempts: workflowRun.attempts, + deadlineAt: workflowRun.deadlineAt, + }), + ); } }