Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
"test:runtime-episode-contracts": "tsx tests/runtime-episode-contracts.test.ts",
"test:runtime-neutral-contracts": "tsx tests/runtime-neutral-contracts.test.ts",
"test:run-registry": "tsx tests/run-registry.test.ts",
"test:run-plan-executor": "tsx tests/run-plan-executor.test.ts",
"test:agent-fanout-executor": "tsx tests/agent-fanout-executor.test.ts",
"test:runtime-php-snippets": "tsx tests/runtime-php-snippets.test.ts",
"test:browser-runner-template": "tsx tests/browser-runner-template.test.ts",
"test:editor-actions": "tsx tests/editor-actions.test.ts",
Expand Down
112 changes: 65 additions & 47 deletions packages/cli/src/agent-fanout.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { appendFile, mkdir, readFile, writeFile } from "node:fs/promises"
import { join } from "node:path"
import { commandArgValue, countRunPlanChildResults, createRunPlanEvent, FANOUT_EVENT_SCHEMA, FANOUT_PLAN_SCHEMA, FANOUT_REQUEST_SCHEMA, FANOUT_RESULT_SCHEMA, normalizeRunPlanConcurrency, normalizeRunPlanWorkerDescriptors, parseCommandJsonObject, runBoundedConcurrent, type FanoutLifecycleEvent, type FanoutRequestContract, type RunPlanWorkerDescriptor } from "@automattic/wp-codebox-core"
import { commandArgValue, createRunPlanEvent, executeRunPlan, FANOUT_EVENT_SCHEMA, FANOUT_PLAN_SCHEMA, FANOUT_REQUEST_SCHEMA, FANOUT_RESULT_SCHEMA, normalizeRunPlanConcurrency, normalizeRunPlanWorkerDescriptors, parseCommandJsonObject, type FanoutLifecycleEvent, type FanoutRequestContract, type RunPlanWorkerAdapter, type RunPlanWorkerDescriptor } from "@automattic/wp-codebox-core"
import { agentTaskStatusSucceeded, aggregateFanoutOutputs, normalizeAgentTaskStatus, stripUndefined, type FanoutAggregationOutput } from "@automattic/wp-codebox-core/internals"
import { runAgentTask, type AgentTaskRunInput, type AgentTaskRunOptions } from "./commands/agent-task-run.js"

Expand Down Expand Up @@ -45,6 +45,8 @@ interface AgentFanoutWorkerResult {

type AgentFanoutWorkerOutput = Record<string, unknown> & { success?: boolean; evidence_refs?: unknown[]; error?: unknown }
type AgentFanoutWorkerDescriptor = RunPlanWorkerDescriptor<FanoutRequestContract["workers"][number]>
type AgentFanoutWorkerExecutionResult = AgentFanoutWorkerResult & { workerId: string; success: boolean }
type AgentFanoutWorkerAdapter = RunPlanWorkerAdapter<FanoutRequestContract["workers"][number], AgentFanoutWorkerExecutionResult>

export async function executeAgentFanoutRequest(request: FanoutRequestContract, options: AgentFanoutExecutionOptions): Promise<AgentFanoutExecutionResult> {
if (request.schema && request.schema !== FANOUT_REQUEST_SCHEMA) {
Expand Down Expand Up @@ -83,51 +85,15 @@ export async function executeAgentFanoutRequest(request: FanoutRequestContract,
await writeJson(planPath, plan)
await emitEvent(eventsPath, { event: "fanout.started", total: workers.length, active: 0, completed: 0, failed: 0, cancelled: 0 })

const runWorker = options.runWorker ?? (async (input: AgentTaskRunInput, workerOptions: AgentTaskRunOptions): Promise<AgentFanoutWorkerOutput> => runAgentTask(input, workerOptions) as unknown as AgentFanoutWorkerOutput)
const workerResults = await runBoundedConcurrent(workers, concurrency, async (worker): Promise<AgentFanoutWorkerResult> => {
const workerArtifacts = join(workersRoot, worker.id, "artifacts")
const childSessionId = `${sessionId}:${worker.id}`
await mkdir(workerArtifacts, { recursive: true })
await emitEvent(eventsPath, { event: "worker.started", worker_id: worker.id })
try {
const output = await runWorker(workerInput(request, worker, childSessionId, workerArtifacts), {
inputPath: "",
json: true,
previewHoldSeconds: options.previewHoldSeconds ?? "",
previewPublicUrl: options.previewPublicUrl ?? "",
})
const status = normalizeAgentTaskStatus({ status: output.status, success: output.success })
const success = agentTaskStatusSucceeded(status)
const resultRef = `fanout/workers/${worker.id}/result.json`
const workerResult = stripUndefined({
worker_id: worker.id,
status,
required: worker.required,
session_id: childSessionId,
result_ref: resultRef,
artifact_refs: workerArtifactRefs(worker.id, output),
output,
...(!success ? { error: { code: "worker-failed", message: stringValue(objectValue(output.error)?.message) || `Fanout worker ${worker.id} failed.` } } : {}),
}) as AgentFanoutWorkerResult
await writeJson(join(workersRoot, worker.id, "result.json"), workerResult)
await emitEvent(eventsPath, { event: success ? "worker.completed" : "worker.failed", worker_id: worker.id, status: workerResult.status })
return workerResult
} catch (error) {
const resultRef = `fanout/workers/${worker.id}/result.json`
const workerResult = {
worker_id: worker.id,
status: "failed" as const,
required: worker.required,
session_id: childSessionId,
result_ref: resultRef,
artifact_refs: [],
error: { code: "worker-exception", message: error instanceof Error ? error.message : String(error) },
}
await writeJson(join(workersRoot, worker.id, "result.json"), workerResult)
await emitEvent(eventsPath, { event: "worker.failed", worker_id: worker.id, status: "failed" })
return workerResult
}
const execution = await executeRunPlan({ workers: request.workers, concurrency }, {
adapter: agentTaskFanoutWorkerAdapter(request, { ...options, workersRoot, sessionId }),
defaultAgent: stringValue(request.agent),
requireGoal: true,
onWorkerStarted: (worker) => emitEvent(eventsPath, { event: "worker.started", worker_id: worker.id }),
onWorkerCompleted: (worker, result) => emitEvent(eventsPath, { event: "worker.completed", worker_id: worker.id, status: result.status }),
onWorkerFailed: (worker, result) => emitEvent(eventsPath, { event: "worker.failed", worker_id: worker.id, status: result.status }),
})
const workerResults: AgentFanoutWorkerResult[] = execution.workers.map(({ success: _success, workerId: _workerId, ...result }) => result as unknown as AgentFanoutWorkerResult)

await emitEvent(eventsPath, { event: "aggregation.started", total: workers.length, completed: workerResults.filter((worker) => agentTaskStatusSucceeded(worker.status)).length, failed: workerResults.filter((worker) => !agentTaskStatusSucceeded(worker.status)).length })
const aggregate = aggregateFanoutOutputs({
Expand All @@ -149,7 +115,7 @@ export async function executeAgentFanoutRequest(request: FanoutRequestContract,
await writeJson(join(aggregateFinalRoot, "result.json"), aggregate)
await emitEvent(eventsPath, { event: "aggregation.completed", status: aggregate.status })

const counts = countRunPlanChildResults(workerResults.map((worker) => ({ ...worker, success: agentTaskStatusSucceeded(worker.status) })))
const counts = execution.counts
const success = aggregate.status === "succeeded"
const result: AgentFanoutExecutionResult = {
schema: FANOUT_RESULT_SCHEMA,
Expand Down Expand Up @@ -195,7 +161,59 @@ async function fanoutRequestFromArgs(args: string[], recipeDirectory: string): P
return parseCommandJsonObject(text, "wp-codebox.agent-fanout fanout-json") as FanoutRequestContract
}

function workerInput(request: FanoutRequestContract, descriptor: AgentFanoutWorkerDescriptor, childSessionId: string, artifactsPath: string): AgentTaskRunInput {
function agentTaskFanoutWorkerAdapter(request: FanoutRequestContract, options: AgentFanoutExecutionOptions & { workersRoot: string; sessionId: string }): AgentFanoutWorkerAdapter {
const runWorker = options.runWorker ?? (async (input: AgentTaskRunInput, workerOptions: AgentTaskRunOptions): Promise<AgentFanoutWorkerOutput> => runAgentTask(input, workerOptions) as unknown as AgentFanoutWorkerOutput)

return {
async run({ descriptor }) {
const workerArtifacts = join(options.workersRoot, descriptor.id, "artifacts")
const childSessionId = `${options.sessionId}:${descriptor.id}`
await mkdir(workerArtifacts, { recursive: true })
try {
const output = await runWorker(agentTaskWorkerInput(request, descriptor, childSessionId, workerArtifacts), {
inputPath: "",
json: true,
previewHoldSeconds: options.previewHoldSeconds ?? "",
previewPublicUrl: options.previewPublicUrl ?? "",
})
const status = normalizeAgentTaskStatus({ status: output.status, success: output.success })
const success = agentTaskStatusSucceeded(status)
const resultRef = `fanout/workers/${descriptor.id}/result.json`
const workerResult = stripUndefined({
workerId: descriptor.id,
success,
worker_id: descriptor.id,
status,
required: descriptor.required,
session_id: childSessionId,
result_ref: resultRef,
artifact_refs: workerArtifactRefs(descriptor.id, output),
output,
...(!success ? { error: { code: "worker-failed", message: stringValue(objectValue(output.error)?.message) || `Fanout worker ${descriptor.id} failed.` } } : {}),
}) as AgentFanoutWorkerExecutionResult
await writeJson(join(options.workersRoot, descriptor.id, "result.json"), workerResult)
return workerResult
} catch (error) {
const resultRef = `fanout/workers/${descriptor.id}/result.json`
const workerResult = {
workerId: descriptor.id,
success: false,
worker_id: descriptor.id,
status: "failed" as const,
required: descriptor.required,
session_id: childSessionId,
result_ref: resultRef,
artifact_refs: [],
error: { code: "worker-exception", message: error instanceof Error ? error.message : String(error) },
}
await writeJson(join(options.workersRoot, descriptor.id, "result.json"), workerResult)
return workerResult
}
},
}
}

function agentTaskWorkerInput(request: FanoutRequestContract, descriptor: AgentFanoutWorkerDescriptor, childSessionId: string, artifactsPath: string): AgentTaskRunInput {
const worker = descriptor.worker
const parentInput = objectValue(request.task_input) || objectValue(request.taskInput) || {}
const workerTaskInput = objectValue(worker.task_input) || objectValue(worker.taskInput) || {}
Expand Down
55 changes: 55 additions & 0 deletions packages/runtime-core/src/run-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,38 @@ export interface RunPlanResultCounts {
cancelled: number
}

export interface RunPlanWorkerExecution<TWorker extends RunPlanWorkerContract = RunPlanWorkerContract> {
descriptor: RunPlanWorkerDescriptor<TWorker>
index: number
}

export interface RunPlanWorkerResult<TOutput = unknown> extends RunPlanChildResult {
workerId: string
output?: TOutput
error?: { code: string; message: string }
[key: string]: unknown
}

export type RunPlanWorkerResultLike = { workerId: string; success?: boolean; status?: string }

export interface RunPlanWorkerAdapter<TWorker extends RunPlanWorkerContract = RunPlanWorkerContract, TResult extends RunPlanWorkerResultLike = RunPlanWorkerResult> {
run(execution: RunPlanWorkerExecution<TWorker>): Promise<TResult>
}

export interface RunPlanExecutorOptions<TWorker extends RunPlanWorkerContract = RunPlanWorkerContract, TResult extends RunPlanWorkerResultLike = RunPlanWorkerResult> extends RunPlanNormalizationOptions {
adapter: RunPlanWorkerAdapter<TWorker, TResult>
onWorkerStarted?: (descriptor: RunPlanWorkerDescriptor<TWorker>, index: number) => Promise<void> | void
onWorkerCompleted?: (descriptor: RunPlanWorkerDescriptor<TWorker>, result: TResult, index: number) => Promise<void> | void
onWorkerFailed?: (descriptor: RunPlanWorkerDescriptor<TWorker>, result: TResult, index: number) => Promise<void> | void
}

export interface RunPlanExecutorResult<TResult extends RunPlanWorkerResultLike = RunPlanWorkerResult> {
success: boolean
concurrency: number
counts: RunPlanResultCounts
workers: TResult[]
}

export function countRunPlanChildResults(results: RunPlanChildResult[]): RunPlanResultCounts {
const completed = results.filter((result) => result.success === true).length
const cancelled = results.filter((result) => result.status === "cancelled").length
Expand All @@ -93,6 +125,29 @@ export function runPlanSucceeded(counts: Pick<RunPlanResultCounts, "failed" | "c
return counts.failed === 0 && counts.cancelled === 0
}

export async function executeRunPlan<TWorker extends RunPlanWorkerContract, TResult extends RunPlanWorkerResultLike>(plan: Pick<RunPlanContract, "workers" | "concurrency">, options: RunPlanExecutorOptions<TWorker, TResult>): Promise<RunPlanExecutorResult<TResult>> {
const workers = normalizeRunPlanWorkerDescriptors(plan.workers as TWorker[], options)
const concurrency = normalizeRunPlanConcurrency(plan.concurrency, options)
const results = await runBoundedConcurrent(workers, concurrency, async (descriptor, index) => {
await options.onWorkerStarted?.(descriptor, index)
const result = await options.adapter.run({ descriptor, index })
if (result.success === true) {
await options.onWorkerCompleted?.(descriptor, result, index)
} else {
await options.onWorkerFailed?.(descriptor, result, index)
}
return result
})
const counts = countRunPlanChildResults(results)

return {
success: runPlanSucceeded(counts),
concurrency,
counts,
workers: results,
}
}

export function normalizeRunPlanConcurrency(value: unknown, options: Pick<RunPlanNormalizationOptions, "defaultConcurrency" | "maxConcurrency" | "concurrencyMode"> = {}): number {
const defaultConcurrency = Math.max(1, Math.floor(Number(options.defaultConcurrency) || 1))
const maxConcurrency = Math.max(1, Math.floor(Number(options.maxConcurrency) || Number.MAX_SAFE_INTEGER))
Expand Down
51 changes: 51 additions & 0 deletions tests/agent-fanout-executor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import assert from "node:assert/strict"
import { readFile } from "node:fs/promises"
import { join } from "node:path"

import { FANOUT_REQUEST_SCHEMA } from "../packages/runtime-core/src/index.js"
import { executeAgentFanoutRequest } from "../packages/cli/src/agent-fanout.js"
import { withTempDir } from "../scripts/test-kit.js"

await withTempDir("wp-codebox-agent-fanout-executor-", async (root) => {
const result = await executeAgentFanoutRequest({
schema: FANOUT_REQUEST_SCHEMA,
concurrency: 3,
agent: "sandbox-agent",
orchestrator: { session_id: "fanout-test" },
workers: [
{ id: "one", goal: "Collect first result" },
{ id: "two", goal: "Collect second result" },
],
}, {
artifactRoot: root,
recipeDirectory: root,
runWorker: async (input) => ({
success: true,
status: "succeeded",
evidence_refs: [{ path: `${input.artifacts_path}/result.json`, kind: "worker-result" }],
}),
previewHoldSeconds: "",
previewPublicUrl: "",
})

assert.equal(result.success, true)
assert.equal(result.concurrency, 3)
assert.deepEqual(result.counts, { total: 2, completed: 2, failed: 0, cancelled: 0 })
assert.deepEqual(result.session.children.map((child) => child.id), ["fanout-test:one", "fanout-test:two"])
assert.deepEqual(result.workers.map((worker) => worker.status), ["succeeded", "succeeded"])
assert.equal(result.workers[0].artifact_refs[0].namespace, "workers/one")

const events = (await readFile(join(root, "fanout", "events.jsonl"), "utf8")).trim().split("\n").map((line) => JSON.parse(line))
assert.deepEqual(events.map((event) => event.event), [
"fanout.started",
"worker.started",
"worker.started",
"worker.completed",
"worker.completed",
"aggregation.started",
"aggregation.completed",
"fanout.completed",
])
})

console.log("agent fanout executor ok")
46 changes: 46 additions & 0 deletions tests/run-plan-executor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import assert from "node:assert/strict"

import { executeRunPlan, type RunPlanWorkerAdapter } from "../packages/runtime-core/src/index.js"

const events: string[] = []
const adapter: RunPlanWorkerAdapter = {
async run({ descriptor }) {
return {
workerId: descriptor.id,
status: descriptor.id === "failed" ? "failed" : "succeeded",
success: descriptor.id !== "failed",
output: {
goal: descriptor.goal,
artifactNamespace: descriptor.artifactNamespace,
},
}
},
}

const result = await executeRunPlan({
concurrency: 10,
workers: [
{ id: "one", goal: "Collect first result" },
{ id: "failed", goal: "Collect failed result", artifact_namespace: "custom/failed" },
{ id: "two", goal: "Collect second result" },
],
}, {
adapter,
maxConcurrency: 2,
requireGoal: true,
onWorkerStarted: (worker) => events.push(`started:${worker.id}`),
onWorkerCompleted: (worker) => events.push(`completed:${worker.id}`),
onWorkerFailed: (worker) => events.push(`failed:${worker.id}`),
})

assert.equal(result.success, false)
assert.equal(result.concurrency, 2)
assert.deepEqual(result.counts, { total: 3, completed: 2, failed: 1, cancelled: 0 })
assert.deepEqual(result.workers.map((worker) => worker.workerId), ["one", "failed", "two"])
assert.equal(result.workers[1].output?.artifactNamespace, "custom/failed")
assert.deepEqual(events, ["started:one", "started:failed", "completed:one", "failed:failed", "started:two", "completed:two"])

await assert.rejects(executeRunPlan({ concurrency: 1, workers: [{ id: "missing-goal" }] }, { adapter, requireGoal: true }), /requires goal/)
await assert.rejects(executeRunPlan({ concurrency: 1, workers: [{ id: "duplicate", goal: "one" }, { id: "duplicate", goal: "two" }] }, { adapter }), /must be unique/)

console.log("run plan executor ok")