diff --git a/package.json b/package.json index 07fece80..09225f3d 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,8 @@ "test:runtime-episode-contracts": "tsx tests/runtime-episode-contracts.test.ts", "test:runtime-neutral-contracts": "tsx tests/runtime-neutral-contracts.test.ts", "test:run-registry": "tsx tests/run-registry.test.ts", + "test:run-plan-executor": "tsx tests/run-plan-executor.test.ts", + "test:agent-fanout-executor": "tsx tests/agent-fanout-executor.test.ts", "test:runtime-php-snippets": "tsx tests/runtime-php-snippets.test.ts", "test:browser-runner-template": "tsx tests/browser-runner-template.test.ts", "test:editor-actions": "tsx tests/editor-actions.test.ts", diff --git a/packages/cli/src/agent-fanout.ts b/packages/cli/src/agent-fanout.ts index 9cc5e547..7e8fcbfb 100644 --- a/packages/cli/src/agent-fanout.ts +++ b/packages/cli/src/agent-fanout.ts @@ -1,6 +1,6 @@ import { appendFile, mkdir, readFile, writeFile } from "node:fs/promises" import { join } from "node:path" -import { commandArgValue, countRunPlanChildResults, createRunPlanEvent, FANOUT_EVENT_SCHEMA, FANOUT_PLAN_SCHEMA, FANOUT_REQUEST_SCHEMA, FANOUT_RESULT_SCHEMA, normalizeRunPlanConcurrency, normalizeRunPlanWorkerDescriptors, parseCommandJsonObject, runBoundedConcurrent, type FanoutLifecycleEvent, type FanoutRequestContract, type RunPlanWorkerDescriptor } from "@automattic/wp-codebox-core" +import { commandArgValue, createRunPlanEvent, executeRunPlan, FANOUT_EVENT_SCHEMA, FANOUT_PLAN_SCHEMA, FANOUT_REQUEST_SCHEMA, FANOUT_RESULT_SCHEMA, normalizeRunPlanConcurrency, normalizeRunPlanWorkerDescriptors, parseCommandJsonObject, type FanoutLifecycleEvent, type FanoutRequestContract, type RunPlanWorkerAdapter, type RunPlanWorkerDescriptor } from "@automattic/wp-codebox-core" import { agentTaskStatusSucceeded, aggregateFanoutOutputs, normalizeAgentTaskStatus, stripUndefined, type FanoutAggregationOutput } from "@automattic/wp-codebox-core/internals" import { runAgentTask, type AgentTaskRunInput, type AgentTaskRunOptions } from "./commands/agent-task-run.js" @@ -45,6 +45,8 @@ interface AgentFanoutWorkerResult { type AgentFanoutWorkerOutput = Record & { success?: boolean; evidence_refs?: unknown[]; error?: unknown } type AgentFanoutWorkerDescriptor = RunPlanWorkerDescriptor +type AgentFanoutWorkerExecutionResult = AgentFanoutWorkerResult & { workerId: string; success: boolean } +type AgentFanoutWorkerAdapter = RunPlanWorkerAdapter export async function executeAgentFanoutRequest(request: FanoutRequestContract, options: AgentFanoutExecutionOptions): Promise { if (request.schema && request.schema !== FANOUT_REQUEST_SCHEMA) { @@ -83,51 +85,15 @@ export async function executeAgentFanoutRequest(request: FanoutRequestContract, await writeJson(planPath, plan) await emitEvent(eventsPath, { event: "fanout.started", total: workers.length, active: 0, completed: 0, failed: 0, cancelled: 0 }) - const runWorker = options.runWorker ?? (async (input: AgentTaskRunInput, workerOptions: AgentTaskRunOptions): Promise => runAgentTask(input, workerOptions) as unknown as AgentFanoutWorkerOutput) - const workerResults = await runBoundedConcurrent(workers, concurrency, async (worker): Promise => { - const workerArtifacts = join(workersRoot, worker.id, "artifacts") - const childSessionId = `${sessionId}:${worker.id}` - await mkdir(workerArtifacts, { recursive: true }) - await emitEvent(eventsPath, { event: "worker.started", worker_id: worker.id }) - try { - const output = await runWorker(workerInput(request, worker, childSessionId, workerArtifacts), { - inputPath: "", - json: true, - previewHoldSeconds: options.previewHoldSeconds ?? "", - previewPublicUrl: options.previewPublicUrl ?? "", - }) - const status = normalizeAgentTaskStatus({ status: output.status, success: output.success }) - const success = agentTaskStatusSucceeded(status) - const resultRef = `fanout/workers/${worker.id}/result.json` - const workerResult = stripUndefined({ - worker_id: worker.id, - status, - required: worker.required, - session_id: childSessionId, - result_ref: resultRef, - artifact_refs: workerArtifactRefs(worker.id, output), - output, - ...(!success ? { error: { code: "worker-failed", message: stringValue(objectValue(output.error)?.message) || `Fanout worker ${worker.id} failed.` } } : {}), - }) as AgentFanoutWorkerResult - await writeJson(join(workersRoot, worker.id, "result.json"), workerResult) - await emitEvent(eventsPath, { event: success ? "worker.completed" : "worker.failed", worker_id: worker.id, status: workerResult.status }) - return workerResult - } catch (error) { - const resultRef = `fanout/workers/${worker.id}/result.json` - const workerResult = { - worker_id: worker.id, - status: "failed" as const, - required: worker.required, - session_id: childSessionId, - result_ref: resultRef, - artifact_refs: [], - error: { code: "worker-exception", message: error instanceof Error ? error.message : String(error) }, - } - await writeJson(join(workersRoot, worker.id, "result.json"), workerResult) - await emitEvent(eventsPath, { event: "worker.failed", worker_id: worker.id, status: "failed" }) - return workerResult - } + const execution = await executeRunPlan({ workers: request.workers, concurrency }, { + adapter: agentTaskFanoutWorkerAdapter(request, { ...options, workersRoot, sessionId }), + defaultAgent: stringValue(request.agent), + requireGoal: true, + onWorkerStarted: (worker) => emitEvent(eventsPath, { event: "worker.started", worker_id: worker.id }), + onWorkerCompleted: (worker, result) => emitEvent(eventsPath, { event: "worker.completed", worker_id: worker.id, status: result.status }), + onWorkerFailed: (worker, result) => emitEvent(eventsPath, { event: "worker.failed", worker_id: worker.id, status: result.status }), }) + const workerResults: AgentFanoutWorkerResult[] = execution.workers.map(({ success: _success, workerId: _workerId, ...result }) => result as unknown as AgentFanoutWorkerResult) await emitEvent(eventsPath, { event: "aggregation.started", total: workers.length, completed: workerResults.filter((worker) => agentTaskStatusSucceeded(worker.status)).length, failed: workerResults.filter((worker) => !agentTaskStatusSucceeded(worker.status)).length }) const aggregate = aggregateFanoutOutputs({ @@ -149,7 +115,7 @@ export async function executeAgentFanoutRequest(request: FanoutRequestContract, await writeJson(join(aggregateFinalRoot, "result.json"), aggregate) await emitEvent(eventsPath, { event: "aggregation.completed", status: aggregate.status }) - const counts = countRunPlanChildResults(workerResults.map((worker) => ({ ...worker, success: agentTaskStatusSucceeded(worker.status) }))) + const counts = execution.counts const success = aggregate.status === "succeeded" const result: AgentFanoutExecutionResult = { schema: FANOUT_RESULT_SCHEMA, @@ -195,7 +161,59 @@ async function fanoutRequestFromArgs(args: string[], recipeDirectory: string): P return parseCommandJsonObject(text, "wp-codebox.agent-fanout fanout-json") as FanoutRequestContract } -function workerInput(request: FanoutRequestContract, descriptor: AgentFanoutWorkerDescriptor, childSessionId: string, artifactsPath: string): AgentTaskRunInput { +function agentTaskFanoutWorkerAdapter(request: FanoutRequestContract, options: AgentFanoutExecutionOptions & { workersRoot: string; sessionId: string }): AgentFanoutWorkerAdapter { + const runWorker = options.runWorker ?? (async (input: AgentTaskRunInput, workerOptions: AgentTaskRunOptions): Promise => runAgentTask(input, workerOptions) as unknown as AgentFanoutWorkerOutput) + + return { + async run({ descriptor }) { + const workerArtifacts = join(options.workersRoot, descriptor.id, "artifacts") + const childSessionId = `${options.sessionId}:${descriptor.id}` + await mkdir(workerArtifacts, { recursive: true }) + try { + const output = await runWorker(agentTaskWorkerInput(request, descriptor, childSessionId, workerArtifacts), { + inputPath: "", + json: true, + previewHoldSeconds: options.previewHoldSeconds ?? "", + previewPublicUrl: options.previewPublicUrl ?? "", + }) + const status = normalizeAgentTaskStatus({ status: output.status, success: output.success }) + const success = agentTaskStatusSucceeded(status) + const resultRef = `fanout/workers/${descriptor.id}/result.json` + const workerResult = stripUndefined({ + workerId: descriptor.id, + success, + worker_id: descriptor.id, + status, + required: descriptor.required, + session_id: childSessionId, + result_ref: resultRef, + artifact_refs: workerArtifactRefs(descriptor.id, output), + output, + ...(!success ? { error: { code: "worker-failed", message: stringValue(objectValue(output.error)?.message) || `Fanout worker ${descriptor.id} failed.` } } : {}), + }) as AgentFanoutWorkerExecutionResult + await writeJson(join(options.workersRoot, descriptor.id, "result.json"), workerResult) + return workerResult + } catch (error) { + const resultRef = `fanout/workers/${descriptor.id}/result.json` + const workerResult = { + workerId: descriptor.id, + success: false, + worker_id: descriptor.id, + status: "failed" as const, + required: descriptor.required, + session_id: childSessionId, + result_ref: resultRef, + artifact_refs: [], + error: { code: "worker-exception", message: error instanceof Error ? error.message : String(error) }, + } + await writeJson(join(options.workersRoot, descriptor.id, "result.json"), workerResult) + return workerResult + } + }, + } +} + +function agentTaskWorkerInput(request: FanoutRequestContract, descriptor: AgentFanoutWorkerDescriptor, childSessionId: string, artifactsPath: string): AgentTaskRunInput { const worker = descriptor.worker const parentInput = objectValue(request.task_input) || objectValue(request.taskInput) || {} const workerTaskInput = objectValue(worker.task_input) || objectValue(worker.taskInput) || {} diff --git a/packages/runtime-core/src/run-plan.ts b/packages/runtime-core/src/run-plan.ts index 0e2eb7e8..99b45e29 100644 --- a/packages/runtime-core/src/run-plan.ts +++ b/packages/runtime-core/src/run-plan.ts @@ -77,6 +77,38 @@ export interface RunPlanResultCounts { cancelled: number } +export interface RunPlanWorkerExecution { + descriptor: RunPlanWorkerDescriptor + index: number +} + +export interface RunPlanWorkerResult extends RunPlanChildResult { + workerId: string + output?: TOutput + error?: { code: string; message: string } + [key: string]: unknown +} + +export type RunPlanWorkerResultLike = { workerId: string; success?: boolean; status?: string } + +export interface RunPlanWorkerAdapter { + run(execution: RunPlanWorkerExecution): Promise +} + +export interface RunPlanExecutorOptions extends RunPlanNormalizationOptions { + adapter: RunPlanWorkerAdapter + onWorkerStarted?: (descriptor: RunPlanWorkerDescriptor, index: number) => Promise | void + onWorkerCompleted?: (descriptor: RunPlanWorkerDescriptor, result: TResult, index: number) => Promise | void + onWorkerFailed?: (descriptor: RunPlanWorkerDescriptor, result: TResult, index: number) => Promise | void +} + +export interface RunPlanExecutorResult { + success: boolean + concurrency: number + counts: RunPlanResultCounts + workers: TResult[] +} + export function countRunPlanChildResults(results: RunPlanChildResult[]): RunPlanResultCounts { const completed = results.filter((result) => result.success === true).length const cancelled = results.filter((result) => result.status === "cancelled").length @@ -93,6 +125,29 @@ export function runPlanSucceeded(counts: Pick(plan: Pick, options: RunPlanExecutorOptions): Promise> { + const workers = normalizeRunPlanWorkerDescriptors(plan.workers as TWorker[], options) + const concurrency = normalizeRunPlanConcurrency(plan.concurrency, options) + const results = await runBoundedConcurrent(workers, concurrency, async (descriptor, index) => { + await options.onWorkerStarted?.(descriptor, index) + const result = await options.adapter.run({ descriptor, index }) + if (result.success === true) { + await options.onWorkerCompleted?.(descriptor, result, index) + } else { + await options.onWorkerFailed?.(descriptor, result, index) + } + return result + }) + const counts = countRunPlanChildResults(results) + + return { + success: runPlanSucceeded(counts), + concurrency, + counts, + workers: results, + } +} + export function normalizeRunPlanConcurrency(value: unknown, options: Pick = {}): number { const defaultConcurrency = Math.max(1, Math.floor(Number(options.defaultConcurrency) || 1)) const maxConcurrency = Math.max(1, Math.floor(Number(options.maxConcurrency) || Number.MAX_SAFE_INTEGER)) diff --git a/tests/agent-fanout-executor.test.ts b/tests/agent-fanout-executor.test.ts new file mode 100644 index 00000000..52b16fad --- /dev/null +++ b/tests/agent-fanout-executor.test.ts @@ -0,0 +1,51 @@ +import assert from "node:assert/strict" +import { readFile } from "node:fs/promises" +import { join } from "node:path" + +import { FANOUT_REQUEST_SCHEMA } from "../packages/runtime-core/src/index.js" +import { executeAgentFanoutRequest } from "../packages/cli/src/agent-fanout.js" +import { withTempDir } from "../scripts/test-kit.js" + +await withTempDir("wp-codebox-agent-fanout-executor-", async (root) => { + const result = await executeAgentFanoutRequest({ + schema: FANOUT_REQUEST_SCHEMA, + concurrency: 3, + agent: "sandbox-agent", + orchestrator: { session_id: "fanout-test" }, + workers: [ + { id: "one", goal: "Collect first result" }, + { id: "two", goal: "Collect second result" }, + ], + }, { + artifactRoot: root, + recipeDirectory: root, + runWorker: async (input) => ({ + success: true, + status: "succeeded", + evidence_refs: [{ path: `${input.artifacts_path}/result.json`, kind: "worker-result" }], + }), + previewHoldSeconds: "", + previewPublicUrl: "", + }) + + assert.equal(result.success, true) + assert.equal(result.concurrency, 3) + assert.deepEqual(result.counts, { total: 2, completed: 2, failed: 0, cancelled: 0 }) + assert.deepEqual(result.session.children.map((child) => child.id), ["fanout-test:one", "fanout-test:two"]) + assert.deepEqual(result.workers.map((worker) => worker.status), ["succeeded", "succeeded"]) + assert.equal(result.workers[0].artifact_refs[0].namespace, "workers/one") + + const events = (await readFile(join(root, "fanout", "events.jsonl"), "utf8")).trim().split("\n").map((line) => JSON.parse(line)) + assert.deepEqual(events.map((event) => event.event), [ + "fanout.started", + "worker.started", + "worker.started", + "worker.completed", + "worker.completed", + "aggregation.started", + "aggregation.completed", + "fanout.completed", + ]) +}) + +console.log("agent fanout executor ok") diff --git a/tests/run-plan-executor.test.ts b/tests/run-plan-executor.test.ts new file mode 100644 index 00000000..47f515d2 --- /dev/null +++ b/tests/run-plan-executor.test.ts @@ -0,0 +1,46 @@ +import assert from "node:assert/strict" + +import { executeRunPlan, type RunPlanWorkerAdapter } from "../packages/runtime-core/src/index.js" + +const events: string[] = [] +const adapter: RunPlanWorkerAdapter = { + async run({ descriptor }) { + return { + workerId: descriptor.id, + status: descriptor.id === "failed" ? "failed" : "succeeded", + success: descriptor.id !== "failed", + output: { + goal: descriptor.goal, + artifactNamespace: descriptor.artifactNamespace, + }, + } + }, +} + +const result = await executeRunPlan({ + concurrency: 10, + workers: [ + { id: "one", goal: "Collect first result" }, + { id: "failed", goal: "Collect failed result", artifact_namespace: "custom/failed" }, + { id: "two", goal: "Collect second result" }, + ], +}, { + adapter, + maxConcurrency: 2, + requireGoal: true, + onWorkerStarted: (worker) => events.push(`started:${worker.id}`), + onWorkerCompleted: (worker) => events.push(`completed:${worker.id}`), + onWorkerFailed: (worker) => events.push(`failed:${worker.id}`), +}) + +assert.equal(result.success, false) +assert.equal(result.concurrency, 2) +assert.deepEqual(result.counts, { total: 3, completed: 2, failed: 1, cancelled: 0 }) +assert.deepEqual(result.workers.map((worker) => worker.workerId), ["one", "failed", "two"]) +assert.equal(result.workers[1].output?.artifactNamespace, "custom/failed") +assert.deepEqual(events, ["started:one", "started:failed", "completed:one", "failed:failed", "started:two", "completed:two"]) + +await assert.rejects(executeRunPlan({ concurrency: 1, workers: [{ id: "missing-goal" }] }, { adapter, requireGoal: true }), /requires goal/) +await assert.rejects(executeRunPlan({ concurrency: 1, workers: [{ id: "duplicate", goal: "one" }, { id: "duplicate", goal: "two" }] }, { adapter }), /must be unique/) + +console.log("run plan executor ok")