diff --git a/packages/client/__tests__/messages-slice.test.ts b/packages/client/__tests__/messages-slice.test.ts index a80e25af..43503f12 100644 --- a/packages/client/__tests__/messages-slice.test.ts +++ b/packages/client/__tests__/messages-slice.test.ts @@ -1,7 +1,7 @@ import { describe, it, expect } from 'vitest'; import { messagesReducer, finishCurrent, INITIAL_MESSAGES_STATE } from '../src/slices/messages.js'; import type { MessagesState } from '../src/slices/messages.js'; -import type { FinishedBlock, StreamingMessage } from '@mitzo/protocol'; +import type { FinishedBlock, FinishedSubagentState, StreamingMessage } from '@mitzo/protocol'; const INITIAL = INITIAL_MESSAGES_STATE; @@ -1180,8 +1180,13 @@ describe('finishCurrent', () => { }; const finished = finishCurrent(current); - expect(finished.blocks[0].subagent).toBeDefined(); - expect(finished.blocks[0].subagent!.messageId).toBe('sub-msg-1'); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub).toBeDefined(); + expect(sub.messageId).toBe('sub-msg-1'); + // Verify streaming Map was converted to finished array + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].content).toBe('subagent output'); }); it('works normally when subagent field is absent', () => { diff --git a/packages/client/__tests__/subagent-reducer.test.ts b/packages/client/__tests__/subagent-reducer.test.ts index cc96a8ae..2910bc45 100644 --- a/packages/client/__tests__/subagent-reducer.test.ts +++ b/packages/client/__tests__/subagent-reducer.test.ts @@ -1,6 +1,10 @@ import { describe, it, expect } from 'vitest'; -import { messagesReducer, INITIAL_MESSAGES_STATE } from '../src/slices/messages.js'; -import type { StreamingBlock } from '@mitzo/protocol'; +import { messagesReducer, finishCurrent, INITIAL_MESSAGES_STATE } from '../src/slices/messages.js'; +import type { + StreamingBlock, + StreamingSubagentState, + FinishedSubagentState, +} from '@mitzo/protocol'; describe('Subagent Reducer Actions', () => { it('SUBAGENT_START initializes subagent state on parent tool block', () => { @@ -33,10 +37,11 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent).toBeDefined(); - expect(newState.current?.blocks.get('b1')?.subagent?.messageId).toBe('msg-sub-1'); - expect(newState.current?.blocks.get('b1')?.subagent?.running).toBe(true); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.size).toBe(0); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub).toBeDefined(); + expect(sub.messageId).toBe('msg-sub-1'); + expect(sub.running).toBe(true); + expect(sub.blocks.size).toBe(0); }); it('SUBAGENT_BLOCK_START adds block to subagent state', () => { @@ -76,11 +81,10 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.size).toBe(1); - expect(newState.current?.blocks.get('b1')?.subagent?.blockOrder).toEqual(['b-sub-1']); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-1')?.blockType).toBe( - 'thinking', - ); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.size).toBe(1); + expect(sub.blockOrder).toEqual(['b-sub-1']); + expect(sub.blocks.get('b-sub-1')?.blockType).toBe('thinking'); }); it('SUBAGENT_BLOCK_DELTA appends to subagent block content', () => { @@ -130,9 +134,8 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-1')?.content).toBe( - 'Hello world', - ); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.get('b-sub-1')?.content).toBe('Hello world'); }); it('SUBAGENT_BLOCK_END marks subagent block as done', () => { @@ -181,7 +184,8 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-1')?.done).toBe(true); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.get('b-sub-1')?.done).toBe(true); }); it('SUBAGENT_TOOL_RESULT patches tool result in subagent block', () => { @@ -234,12 +238,9 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-2')?.toolResult).toBe( - 'file contents', - ); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-2')?.toolError).toBe( - false, - ); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.get('b-sub-2')?.toolResult).toBe('file contents'); + expect(sub.blocks.get('b-sub-2')?.toolError).toBe(false); }); it('SUBAGENT_END finalizes subagent state with summary and usage', () => { @@ -294,10 +295,218 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - const subagent = newState.current?.blocks.get('b1')?.subagent; - expect(subagent?.running).toBeUndefined(); - expect(Array.isArray(subagent?.blocks)).toBe(true); - expect(subagent?.summary).toBe('Search complete'); - expect(subagent?.usage?.inputTokens).toBe(100); + const sub = newState.current?.blocks.get('b1')?.subagent as FinishedSubagentState; + expect(sub.running).toBeUndefined(); + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.summary).toBe('Search complete'); + expect(sub.usage?.inputTokens).toBe(100); + }); + + it('finishCurrent converts already-finished subagent via passthrough', () => { + // Simulate full lifecycle: SUBAGENT_START → blocks → SUBAGENT_END → MESSAGE_END + let state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Done', + done: true, + }, + ], + ]), + blockOrder: ['b-sub-1'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // End the subagent first + state = messagesReducer(state, { + type: 'SUBAGENT_END', + parentBlockId: 'b1', + summary: 'All done', + }); + + // Now finish the message — finishSubagent should passthrough the already-finished state + const finished = finishCurrent(state.current!); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub).toBeDefined(); + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].content).toBe('Done'); + expect(sub.summary).toBe('All done'); + }); + + it('finishCurrent converts still-streaming subagent to finished', () => { + // Edge case: MESSAGE_END arrives without SUBAGENT_END + const state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Partial', + done: false, + }, + ], + ]), + blockOrder: ['b-sub-1'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // Finish message without SUBAGENT_END — finishSubagent must convert Map→array + const finished = finishCurrent(state.current!); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub).toBeDefined(); + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].content).toBe('Partial'); + expect(sub.summary).toBeUndefined(); + }); + + it('finishSubagent skips stale blockOrder entries not in the Map', () => { + const state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + // blockOrder references 'ghost' which is NOT in the Map + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Real', + done: true, + }, + ], + ]), + blockOrder: ['b-sub-1', 'ghost'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // Should not throw — stale entry is filtered out + const finished = finishCurrent(state.current!); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].blockId).toBe('b-sub-1'); + }); + + it('drops streaming events after SUBAGENT_END (out-of-order delivery)', () => { + let state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Done', + done: true, + }, + ], + ]), + blockOrder: ['b-sub-1'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // End the subagent + state = messagesReducer(state, { + type: 'SUBAGENT_END', + parentBlockId: 'b1', + summary: 'Complete', + }); + + // Late-arriving streaming event should be silently dropped + const afterDelta = messagesReducer(state, { + type: 'SUBAGENT_BLOCK_DELTA', + parentBlockId: 'b1', + blockId: 'b-sub-1', + delta: ' extra', + }); + + // State unchanged — event was dropped by getStreamingSubagent guard + expect(afterDelta).toBe(state); }); }); diff --git a/packages/client/src/slices/messages.ts b/packages/client/src/slices/messages.ts index 5bb378bf..ee17be3e 100644 --- a/packages/client/src/slices/messages.ts +++ b/packages/client/src/slices/messages.ts @@ -13,6 +13,8 @@ import type { PermissionRequest, RawToolInput, BlockType, + StreamingSubagentState, + FinishedSubagentState, } from '@mitzo/protocol'; // ─── State ─────────────────────────────────────────────────────────────────── @@ -139,6 +141,39 @@ export type MessagesAction = // ─── Helpers ───────────────────────────────────────────────────────────────── +/** Narrow a block's subagent to StreamingSubagentState, or null if already finished. */ +function getStreamingSubagent(block: StreamingBlock): StreamingSubagentState | null { + if (!block.subagent || !('blockOrder' in block.subagent)) return null; + return block.subagent; +} + +function finishSubagent( + sub: StreamingSubagentState | FinishedSubagentState, +): FinishedSubagentState { + // Already finished (SUBAGENT_END already fired) + if (Array.isArray(sub.blocks)) return sub as FinishedSubagentState; + + // Still streaming — convert Map to FinishedBlock[] + const streaming = sub as StreamingSubagentState; + return { + messageId: streaming.messageId, + blocks: streaming.blockOrder + .map((blockId) => streaming.blocks.get(blockId)) + .filter((b): b is StreamingBlock => b != null) + .map((b) => ({ + blockId: b.blockId, + blockType: b.blockType, + content: b.content, + toolName: b.toolName, + toolId: b.toolId, + toolInput: b.toolInput, + rawInput: b.rawInput, + toolResult: b.toolResult, + toolError: b.toolError, + })), + }; +} + export function finishCurrent(current: StreamingMessage): FinishedMessage { const blocks: FinishedBlock[] = current.blockOrder.map((blockId) => { const b = current.blocks.get(blockId)!; @@ -152,7 +187,7 @@ export function finishCurrent(current: StreamingMessage): FinishedMessage { rawInput: b.rawInput, toolResult: b.toolResult, toolError: b.toolError, - subagent: b.subagent, + subagent: b.subagent ? finishSubagent(b.subagent) : undefined, }; }); return { messageId: current.messageId, role: 'assistant', blocks, timestamp: Date.now() }; @@ -527,7 +562,9 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_START': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; const newBlock: StreamingBlock = { blockId: action.blockId, @@ -537,16 +574,16 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M ...(action.toolName ? { toolName: action.toolName } : {}), }; - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(action.blockId, newBlock); const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, subagent: { - ...parentBlock.subagent, + ...sub, blocks: newSubBlocks, - blockOrder: [...parentBlock.subagent.blockOrder, action.blockId], + blockOrder: [...sub.blockOrder, action.blockId], }, }); @@ -556,12 +593,14 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_DELTA': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; - const subBlock = parentBlock.subagent.blocks.get(action.blockId); + const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(action.blockId, { ...subBlock, content: subBlock.content + action.delta, @@ -570,10 +609,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, - subagent: { - ...parentBlock.subagent, - blocks: newSubBlocks, - }, + subagent: { ...sub, blocks: newSubBlocks }, }); return { ...state, current: { ...state.current, blocks: newBlocks } }; @@ -582,12 +618,14 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_END': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; - const subBlock = parentBlock.subagent.blocks.get(action.blockId); + const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(action.blockId, { ...subBlock, done: true, @@ -600,10 +638,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, - subagent: { - ...parentBlock.subagent, - blocks: newSubBlocks, - }, + subagent: { ...sub, blocks: newSubBlocks }, }); return { ...state, current: { ...state.current, blocks: newBlocks } }; @@ -612,12 +647,14 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_TOOL_RESULT': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; // Find the tool block with matching toolId - for (const [blockId, subBlock] of parentBlock.subagent.blocks) { + for (const [blockId, subBlock] of sub.blocks) { if (subBlock.toolId === action.toolId) { - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(blockId, { ...subBlock, toolResult: action.result, @@ -627,10 +664,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, - subagent: { - ...parentBlock.subagent, - blocks: newSubBlocks, - }, + subagent: { ...sub, blocks: newSubBlocks }, }); return { ...state, current: { ...state.current, blocks: newBlocks } }; @@ -643,34 +677,33 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_END': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; // Convert streaming subagent state to finished state - const finishedBlocks: FinishedBlock[] = parentBlock.subagent.blockOrder.map((blockId) => { - const b = parentBlock.subagent!.blocks.get(blockId)!; - return { - blockId: b.blockId, - blockType: b.blockType, - content: b.content, - toolName: b.toolName, - toolId: b.toolId, - toolInput: b.toolInput, - rawInput: b.rawInput, - toolResult: b.toolResult, - toolError: b.toolError, - }; - }); + const finished: FinishedSubagentState = { + messageId: sub.messageId, + blocks: sub.blockOrder + .map((blockId) => sub.blocks.get(blockId)) + .filter((b): b is StreamingBlock => b != null) + .map((b) => ({ + blockId: b.blockId, + blockType: b.blockType, + content: b.content, + toolName: b.toolName, + toolId: b.toolId, + toolInput: b.toolInput, + rawInput: b.rawInput, + toolResult: b.toolResult, + toolError: b.toolError, + })), + summary: action.summary, + usage: action.usage, + }; const newBlocks = new Map(state.current.blocks); - newBlocks.set(action.parentBlockId, { - ...parentBlock, - subagent: { - messageId: parentBlock.subagent.messageId, - blocks: finishedBlocks, - summary: action.summary, - usage: action.usage, - }, - }); + newBlocks.set(action.parentBlockId, { ...parentBlock, subagent: finished }); return { ...state, current: { ...state.current, blocks: newBlocks } }; } diff --git a/packages/harness/__tests__/connection-registry.test.ts b/packages/harness/__tests__/connection-registry.test.ts index fb2ab27d..6c4cd851 100644 --- a/packages/harness/__tests__/connection-registry.test.ts +++ b/packages/harness/__tests__/connection-registry.test.ts @@ -485,9 +485,7 @@ describe('ConnectionRegistry', () => { it('still syncs all sessions when isSessionActive is not provided', async () => { vi.useFakeTimers(); const t = mockTransport(true); - const store = mockEventStore([ - { seq: 5, payload: { type: 'msg1' } }, - ]); + const store = mockEventStore([{ seq: 5, payload: { type: 'msg1' } }]); // No isSessionActive — backwards compatible registry.setEventStore(store); diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index 174b0ecc..f8fcc1c5 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -27,6 +27,10 @@ export type { SessionActivity, ServiceHealthStatus, ServiceHealthPayload, + StreamingSubagentState, + FinishedSubagentState, + SubagentState, + SubagentUsage, } from './types.js'; // Constants diff --git a/packages/protocol/src/types.ts b/packages/protocol/src/types.ts index 324a3cde..d784d656 100644 --- a/packages/protocol/src/types.ts +++ b/packages/protocol/src/types.ts @@ -50,7 +50,7 @@ export interface StreamingBlock { rawInput?: RawToolInput; toolResult?: string; toolError?: boolean; - subagent?: StreamingSubagentState; + subagent?: StreamingSubagentState | FinishedSubagentState; } export interface StreamingMessage { diff --git a/server/query-loop.ts b/server/query-loop.ts index d26ed244..3d1b6e76 100644 --- a/server/query-loop.ts +++ b/server/query-loop.ts @@ -675,7 +675,10 @@ async function _runQueryLoopInner( const index = evt.index as number; const blockId = nextBlockId(); const blockType = contentBlock?.type as string | undefined; - subagent.subagentBlockIdByIndex.set(index, { blockId, blockType: blockType ?? 'text' }); + subagent.subagentBlockIdByIndex.set(index, { + blockId, + blockType: blockType ?? 'text', + }); if (blockType === 'thinking' || blockType === 'redacted_thinking') { emit(