diff --git a/apps/server/src/runs/event-bridge.ts b/apps/server/src/runs/event-bridge.ts index bfe0b4c..f62cdab 100644 --- a/apps/server/src/runs/event-bridge.ts +++ b/apps/server/src/runs/event-bridge.ts @@ -1,4 +1,6 @@ import { + COORDINATOR_AGENT, + COORDINATOR_TASK_ID, deserializeTeamRunResult, type ForgePlanTask, type ForgeWorkflowEvent, @@ -54,7 +56,16 @@ export function applyForgeWorkflowEvent( return false } case 'plan': { - session.setPlanRecords(planTasksToRecords(event.data.tasks)) + const records = planTasksToRecords(event.data.tasks) + session.setPlanRecords(records) + publishTraceLine(session, { + runId: session.id, + at: Date.now(), + level: 'info', + agent: COORDINATOR_AGENT, + taskId: COORDINATOR_TASK_ID, + message: `Plan published: ${records.length} tasks`, + }) publishSnapshot(session) return false } diff --git a/apps/server/src/runs/session.ts b/apps/server/src/runs/session.ts index c7fa857..bb886ae 100644 --- a/apps/server/src/runs/session.ts +++ b/apps/server/src/runs/session.ts @@ -66,7 +66,7 @@ export class RunSession { } setPlan(tasks: readonly Task[]): void { - this.tasks = tasks.map(taskToRecord) + this.setPlanRecords(tasks.map(taskToRecord)) } setPlanRecords(tasks: TaskExecutionRecord[]): void { diff --git a/apps/server/tests/coordinator-trace.test.ts b/apps/server/tests/coordinator-trace.test.ts new file mode 100644 index 0000000..7e7d104 --- /dev/null +++ b/apps/server/tests/coordinator-trace.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from 'vitest' +import { filterTraceLinesForCoordinator, filterTraceLinesForTask } from '@oma-forge/shared' + +describe('filterTraceLinesForCoordinator', () => { + it('keeps coordinator agent and plan messages', () => { + const lines = [ + { + runId: 'r1', + at: 1, + level: 'stream' as const, + agent: 'researcher', + message: 'Finding sources…', + }, + { + runId: 'r1', + at: 2, + level: 'info' as const, + agent: 'coordinator', + message: 'Plan ready: 2 tasks (approved)', + }, + ] + expect(filterTraceLinesForCoordinator(lines).map((l) => l.message)).toEqual([ + 'Plan ready: 2 tasks (approved)', + ]) + }) +}) + +describe('filterTraceLinesForTask', () => { + it('excludes coordinator traces from worker nodes', () => { + const worker = { + id: 't1', + title: 'Research', + assignee: 'researcher', + status: 'in_progress' as const, + dependsOn: [], + } + const lines = [ + { + runId: 'r1', + at: 1, + level: 'info' as const, + agent: 'coordinator', + message: 'Agent started: coordinator', + }, + { + runId: 'r1', + at: 2, + level: 'stream' as const, + agent: 'researcher', + taskId: 't1', + message: 'Hello', + }, + ] + expect(filterTraceLinesForTask(lines, worker).map((l) => l.message)).toEqual(['Hello']) + }) +}) \ No newline at end of file diff --git a/apps/server/tests/runs-state.test.ts b/apps/server/tests/runs-state.test.ts index f9e023d..b89cf20 100644 --- a/apps/server/tests/runs-state.test.ts +++ b/apps/server/tests/runs-state.test.ts @@ -47,10 +47,10 @@ describe('RunSession', () => { ]) run.applyProgress({ type: 'task_start', task: 't1' }) - expect(run.toSnapshot().tasks[0]?.status).toBe('in_progress') + expect(run.toSnapshot().tasks.find((t) => t.id === 't1')?.status).toBe('in_progress') run.applyProgress({ type: 'task_complete', task: 't1' }) - expect(run.toSnapshot().tasks[0]?.status).toBe('completed') + expect(run.toSnapshot().tasks.find((t) => t.id === 't1')?.status).toBe('completed') }) it('creates tasks on task_start when no plan was received', () => { @@ -81,6 +81,27 @@ describe('RunSession', () => { ]) }) + it('does not add a coordinator task to the DAG snapshot', () => { + const run = new RunSession('run-1', 'Ship the feature') + run.applyProgress({ type: 'agent_start', agent: 'coordinator' }) + expect(run.toSnapshot().tasks).toEqual([]) + + run.setPlan([ + { + id: 't1', + title: 'Research', + status: 'pending', + description: 'Research', + dependsOn: [], + createdAt: new Date(), + updatedAt: new Date(), + }, + ]) + expect(run.toSnapshot().tasks).toEqual([ + { id: 't1', title: 'Research', status: 'pending', dependsOn: [] }, + ]) + }) + it('tracks short-circuit runs before the final result arrives', () => { const run = new RunSession('run-1', 'Quick ask') run.applyProgress({ diff --git a/apps/web/package.json b/apps/web/package.json index a929353..0ff2fe7 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -5,7 +5,7 @@ "license": "Apache-2.0", "type": "module", "scripts": { - "dev": "vite", + "dev": "npm run build -w @oma-forge/shared && vite", "build": "npm run build -w @oma-forge/shared && tsc --noEmit && vite build", "preview": "vite preview", "lint": "npm run build -w @oma-forge/shared && tsc --noEmit" diff --git a/apps/web/src/components/dashboard/CoordinatorPanel.tsx b/apps/web/src/components/dashboard/CoordinatorPanel.tsx new file mode 100644 index 0000000..80f1354 --- /dev/null +++ b/apps/web/src/components/dashboard/CoordinatorPanel.tsx @@ -0,0 +1,79 @@ +import { useEffect, useMemo, useRef, useState } from 'react' +import { + filterTraceLinesForCoordinator, + type ForgeTraceLine, + type RunStatus, +} from '@oma-forge/shared' +import { LiveOutput } from './LiveOutput.tsx' + +type CoordinatorPanelProps = { + readonly traceLines: readonly ForgeTraceLine[] + readonly runStatus?: RunStatus + readonly hasPlanTasks: boolean +} + +export function CoordinatorPanel({ + traceLines, + runStatus, + hasPlanTasks, +}: CoordinatorPanelProps) { + const [expanded, setExpanded] = useState(false) + const autoExpanded = useRef(false) + const coordinatorLines = useMemo( + () => filterTraceLinesForCoordinator(traceLines), + [traceLines], + ) + const hasActivity = coordinatorLines.length > 0 + const isRunning = runStatus === 'running' + const planningDone = hasPlanTasks || !isRunning + + useEffect(() => { + if (!hasActivity || autoExpanded.current) return + autoExpanded.current = true + setExpanded(true) + }, [hasActivity]) + + if (!isRunning && !hasActivity) { + return null + } + + return ( +
+ + {expanded ? ( +
+ +
+ ) : null} +
+ ) +} diff --git a/apps/web/src/components/dashboard/DetailsPanel.tsx b/apps/web/src/components/dashboard/DetailsPanel.tsx index 2fb705e..bd286b8 100644 --- a/apps/web/src/components/dashboard/DetailsPanel.tsx +++ b/apps/web/src/components/dashboard/DetailsPanel.tsx @@ -31,7 +31,8 @@ export function DetailsPanel({ return ( ) diff --git a/apps/web/src/components/dashboard/LiveOutput.tsx b/apps/web/src/components/dashboard/LiveOutput.tsx index 0556be2..d03063b 100644 --- a/apps/web/src/components/dashboard/LiveOutput.tsx +++ b/apps/web/src/components/dashboard/LiveOutput.tsx @@ -1,8 +1,13 @@ import type { ForgeTraceLine, TaskExecutionRecord, TraceLineLevel } from '@oma-forge/shared' +import { filterTraceLinesForTask } from '@oma-forge/shared' type LiveOutputProps = { readonly tasks: readonly TaskExecutionRecord[] readonly traceLines: readonly ForgeTraceLine[] + /** When set, only trace and status lines for this DAG node are shown. */ + readonly scopeTask?: TaskExecutionRecord | null + readonly variant?: 'default' | 'coordinator' + readonly planningDone?: boolean } const terminalStatuses = new Set(['completed', 'failed', 'skipped', 'blocked']) @@ -21,17 +26,46 @@ function formatTracePrefix(line: ForgeTraceLine): string { return parts.length > 0 ? `[${parts.join(':')}] ` : '' } -export function LiveOutput({ tasks, traceLines }: LiveOutputProps) { - const finished = tasks.every((task) => terminalStatuses.has(task.status)) - const visibleTrace = traceLines.slice(-80) +function systemMessage( + variant: 'default' | 'coordinator', + finished: boolean, + planningDone?: boolean, +): string { + if (variant === 'coordinator') { + if (finished || planningDone) { + return '[SYSTEM] Coordinator planning finished.' + } + return '[SYSTEM] Coordinator planning in progress.' + } + return finished + ? '[SYSTEM] Task graph execution finished.' + : '[SYSTEM] Task graph execution in progress.' +} + +export function LiveOutput({ + tasks, + traceLines, + scopeTask, + variant = 'default', + planningDone, +}: LiveOutputProps) { + const scopedTasks = scopeTask ? [scopeTask] : tasks + const finished = + variant === 'coordinator' + ? Boolean(planningDone) + : scopedTasks.every((task) => terminalStatuses.has(task.status)) + const filteredTrace = scopeTask + ? filterTraceLinesForTask(traceLines, scopeTask) + : traceLines + const visibleTrace = filteredTrace.slice(-80) return ( -
-

- {finished - ? '[SYSTEM] Task graph execution finished.' - : '[SYSTEM] Task graph execution in progress.'} -

+
+

{systemMessage(variant, finished, planningDone)}

{visibleTrace.map((line, index) => (

))} - {visibleTrace.length === 0 - ? tasks.map((task) => ( + {visibleTrace.length === 0 && variant === 'coordinator' ? ( +

Waiting for coordinator output…

+ ) : null} + {visibleTrace.length === 0 && variant !== 'coordinator' + ? scopedTasks.map((task) => (

[{task.assignee?.toUpperCase() ?? 'UNASSIGNED'}] {task.title} {'->'}{' '} diff --git a/apps/web/src/components/dashboard/TeamRunDashboard.tsx b/apps/web/src/components/dashboard/TeamRunDashboard.tsx index 460a800..2cb28b2 100644 --- a/apps/web/src/components/dashboard/TeamRunDashboard.tsx +++ b/apps/web/src/components/dashboard/TeamRunDashboard.tsx @@ -6,11 +6,11 @@ import type { RunStatus, TaskExecutionRecord, } from '@oma-forge/shared' +import { CoordinatorPanel } from './CoordinatorPanel.tsx' import { DagEdges } from './DagEdges.tsx' import { DagNode } from './DagNode.tsx' import { DagViewport } from './DagViewport.tsx' import { DetailsPanel } from './DetailsPanel.tsx' -import { LiveOutput } from './LiveOutput.tsx' type TeamRunDashboardProps = { readonly result: ForgeDashboardRun @@ -24,7 +24,6 @@ export function TeamRunDashboard({ result, traceLines, runStatus }: TeamRunDashb const layout = useMemo(() => layoutTasks(tasks), [tasks]) const [panelOpen, setPanelOpen] = useState(false) const [selected, setSelected] = useState(null) - const isRunning = runStatus === 'running' const handleSelect = useCallback((task: TaskExecutionRecord) => { setSelected(task) @@ -38,7 +37,13 @@ export function TeamRunDashboard({ result, traceLines, runStatus }: TeamRunDashb const handleBackdropClick = useCallback( (e: React.MouseEvent) => { const target = e.target as HTMLElement - if (target.closest('.node') || target.closest('aside')) return + if ( + target.closest('.node') || + target.closest('[data-collateral]') || + target.closest('[data-node-details]') + ) { + return + } setPanelOpen(false) }, [], @@ -47,14 +52,14 @@ export function TeamRunDashboard({ result, traceLines, runStatus }: TeamRunDashb return ( <>

-
+
- {tasks.length === 0 && isRunning ? ( + {tasks.length === 0 && runStatus === 'running' ? (
Waiting for task plan…
@@ -75,23 +80,23 @@ export function TeamRunDashboard({ result, traceLines, runStatus }: TeamRunDashb })}
- {isRunning ? ( -
-

- Live output -

- -
- ) : null}
- + +
+ 0} + /> + +
traceLineMatchesTask(line, task)) +}