Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 80 additions & 106 deletions packages/openworkflow/worker/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -793,39 +793,38 @@ class StepExecutor implements StepApi {
? runningAttempt
: await this.linkChildWorkflowRun(stepName, runningAttempt, request);

const childId = workflowAttempt.childWorkflowRunId;
if (!childId) {
return await this.failStepWithError(
const failWorkflowStep = (error: Error): Promise<never> =>
this.failStepWithError(
stepName,
workflowAttempt.id,
error,
TERMINAL_STEP_RETRY_POLICY,
);

const childId = workflowAttempt.childWorkflowRunId;
if (!childId) {
return await failWorkflowStep(
new Error(
`Workflow step "${stepName}" could not find linked child workflow run`,
),
TERMINAL_STEP_RETRY_POLICY,
);
}

const childRun = await this.backend.getWorkflowRun({
workflowRunId: childId,
});
if (!childRun) {
return await this.failStepWithError(
stepName,
workflowAttempt.id,
return await failWorkflowStep(
new Error(
`Workflow step "${stepName}" could not find linked child workflow run "${childId}"`,
),
TERMINAL_STEP_RETRY_POLICY,
);
}

// Check timeout before checking child result
if (hasWorkflowTimedOut(workflowAttempt, childRun)) {
return await this.failStepWithError(
stepName,
workflowAttempt.id,
return await failWorkflowStep(
new Error("Timed out waiting for child workflow to complete"),
TERMINAL_STEP_RETRY_POLICY,
);
}

Expand All @@ -844,27 +843,19 @@ class StepExecutor implements StepApi {

// Child failed — propagate its error
if (childRun.status === "failed") {
const childError =
return await failWorkflowStep(
childRun.error === null
? new Error(`Child workflow run "${childRun.id}" failed`)
: deserializeError(childRun.error);
return await this.failStepWithError(
stepName,
workflowAttempt.id,
childError,
TERMINAL_STEP_RETRY_POLICY,
: deserializeError(childRun.error),
);
}

// Child canceled — propagate as error
if (childRun.status === "canceled") {
return await this.failStepWithError(
stepName,
workflowAttempt.id,
return await failWorkflowStep(
new Error(
`Workflow step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`,
),
TERMINAL_STEP_RETRY_POLICY,
);
}

Expand Down Expand Up @@ -1259,6 +1250,20 @@ export async function executeWorkflow(
params;
const executionFence = new ExecutionFence();

/**
* Run a backend transition for this workflow run, handling stale-write races.
* @param fn - Backend transition to execute
* @returns Promise resolved when the transition completes
*/
function runTransition(fn: () => Promise<unknown>): Promise<void> {
return executeWorkflowRunTransition({
backend,
workflowRunId: workflowRun.id,
workerId,
transition: fn,
});
}

try {
// load all pages of step history
const attempts = await listAllStepAttemptsForWorkflowRun(
Expand Down Expand Up @@ -1311,55 +1316,39 @@ export async function executeWorkflow(

// mark success
executionFence.deactivate();
await executeWorkflowRunTransition({
backend,
workflowRunId: workflowRun.id,
workerId,
transition: async () => {
await backend.completeWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
output: (output ?? null) as JsonValue,
});
},
});
await runTransition(() =>
backend.completeWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
output: (output ?? null) as JsonValue,
}),
);
} catch (error) {
executionFence.deactivate();

// handle sleep signal by parking the workflow in running status
if (error instanceof SleepSignal) {
await executeWorkflowRunTransition({
backend,
workflowRunId: workflowRun.id,
workerId,
transition: async () => {
await backend.sleepWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
availableAt: error.resumeAt,
});
},
});

await runTransition(() =>
backend.sleepWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
availableAt: error.resumeAt,
}),
);
return;
}

if (error instanceof StepLimitExceededError) {
await executeWorkflowRunTransition({
backend,
workflowRunId: workflowRun.id,
workerId,
transition: async () => {
await backend.failWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
error: serializeStepLimitExceededError(error),
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
attempts: workflowRun.attempts,
deadlineAt: workflowRun.deadlineAt,
});
},
});
await runTransition(() =>
backend.failWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
error: serializeStepLimitExceededError(error),
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
attempts: workflowRun.attempts,
deadlineAt: workflowRun.deadlineAt,
}),
);
return;
}

Expand All @@ -1375,21 +1364,16 @@ export async function executeWorkflow(
);

if (retryDecision.status === "failed") {
await executeWorkflowRunTransition({
backend,
workflowRunId: workflowRun.id,
workerId,
transition: async () => {
await backend.failWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
error: serializedError,
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
attempts: workflowRun.attempts,
deadlineAt: workflowRun.deadlineAt,
});
},
});
await runTransition(() =>
backend.failWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
error: serializedError,
retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
attempts: workflowRun.attempts,
deadlineAt: workflowRun.deadlineAt,
}),
);
return;
}

Expand All @@ -1404,19 +1388,14 @@ export async function executeWorkflow(

const availableAt = retryDecision.availableAt;

await executeWorkflowRunTransition({
backend,
workflowRunId: workflowRun.id,
workerId,
transition: async () => {
await backend.rescheduleWorkflowRunAfterFailedStepAttempt({
workflowRunId: workflowRun.id,
workerId,
error: serializedError,
availableAt,
});
},
});
await runTransition(() =>
backend.rescheduleWorkflowRunAfterFailedStepAttempt({
workflowRunId: workflowRun.id,
workerId,
error: serializedError,
availableAt,
}),
);
return;
}

Expand All @@ -1425,20 +1404,15 @@ export async function executeWorkflow(
}

// mark failure
await executeWorkflowRunTransition({
backend,
workflowRunId: workflowRun.id,
workerId,
transition: async () => {
await backend.failWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
error: serializeError(error),
retryPolicy: params.retryPolicy,
attempts: workflowRun.attempts,
deadlineAt: workflowRun.deadlineAt,
});
},
});
await runTransition(() =>
backend.failWorkflowRun({
workflowRunId: workflowRun.id,
workerId,
error: serializeError(error),
retryPolicy: params.retryPolicy,
attempts: workflowRun.attempts,
deadlineAt: workflowRun.deadlineAt,
}),
);
}
}
Loading