From 53246e33dbba77598c4d500767a4cd91ed9645fa Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 4 May 2026 10:24:34 +0100 Subject: [PATCH 1/4] fix(protocol,client): resolve subagent type mismatches breaking CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit StreamingBlock.subagent was typed as StreamingSubagentState only, but the SUBAGENT_END reducer replaces it with a FinishedSubagentState (subagent finishes while parent message is still streaming). Also, finishCurrent() copied the streaming subagent without conversion, and SubagentCard imported types not exported from @mitzo/protocol. - Widen StreamingBlock.subagent to accept both streaming and finished - Export FinishedSubagentState, StreamingSubagentState, SubagentState, SubagentUsage from @mitzo/protocol - Add finishSubagent() helper that handles both states (Map→array for streaming, passthrough for already-finished) - Cast to StreamingSubagentState in streaming reducer cases where the subagent is known to still be running Co-Authored-By: Claude Opus 4.6 --- packages/client/src/slices/messages.ts | 112 +++++++++++++++---------- packages/protocol/src/index.ts | 4 + packages/protocol/src/types.ts | 2 +- 3 files changed, 72 insertions(+), 46 deletions(-) diff --git a/packages/client/src/slices/messages.ts b/packages/client/src/slices/messages.ts index 5bb378bf..362ea104 100644 --- a/packages/client/src/slices/messages.ts +++ b/packages/client/src/slices/messages.ts @@ -13,6 +13,8 @@ import type { PermissionRequest, RawToolInput, BlockType, + StreamingSubagentState, + FinishedSubagentState, } from '@mitzo/protocol'; // ─── State ─────────────────────────────────────────────────────────────────── @@ -139,6 +141,33 @@ export type MessagesAction = // ─── Helpers ───────────────────────────────────────────────────────────────── +function finishSubagent( + sub: StreamingSubagentState | FinishedSubagentState, +): FinishedSubagentState { + // Already finished (SUBAGENT_END already fired) + if (Array.isArray(sub.blocks)) return sub as FinishedSubagentState; + + // Still streaming — convert Map to FinishedBlock[] + const streaming = sub as StreamingSubagentState; + return { + messageId: streaming.messageId, + blocks: streaming.blockOrder.map((blockId) => { + const b = streaming.blocks.get(blockId)!; + return { + blockId: b.blockId, + blockType: b.blockType, + content: b.content, + toolName: b.toolName, + toolId: b.toolId, + toolInput: b.toolInput, + rawInput: b.rawInput, + toolResult: b.toolResult, + toolError: b.toolError, + }; + }), + }; +} + export function finishCurrent(current: StreamingMessage): FinishedMessage { const blocks: FinishedBlock[] = current.blockOrder.map((blockId) => { const b = current.blocks.get(blockId)!; @@ -152,7 +181,7 @@ export function finishCurrent(current: StreamingMessage): FinishedMessage { rawInput: b.rawInput, toolResult: b.toolResult, toolError: b.toolError, - subagent: b.subagent, + subagent: b.subagent ? finishSubagent(b.subagent) : undefined, }; }); return { messageId: current.messageId, role: 'assistant', blocks, timestamp: Date.now() }; @@ -528,6 +557,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); if (!parentBlock?.subagent) return state; + const sub = parentBlock.subagent as StreamingSubagentState; const newBlock: StreamingBlock = { blockId: action.blockId, @@ -537,16 +567,16 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M ...(action.toolName ? { toolName: action.toolName } : {}), }; - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(action.blockId, newBlock); const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, subagent: { - ...parentBlock.subagent, + ...sub, blocks: newSubBlocks, - blockOrder: [...parentBlock.subagent.blockOrder, action.blockId], + blockOrder: [...sub.blockOrder, action.blockId], }, }); @@ -557,11 +587,12 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); if (!parentBlock?.subagent) return state; + const sub = parentBlock.subagent as StreamingSubagentState; - const subBlock = parentBlock.subagent.blocks.get(action.blockId); + const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(action.blockId, { ...subBlock, content: subBlock.content + action.delta, @@ -570,10 +601,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, - subagent: { - ...parentBlock.subagent, - blocks: newSubBlocks, - }, + subagent: { ...sub, blocks: newSubBlocks }, }); return { ...state, current: { ...state.current, blocks: newBlocks } }; @@ -583,11 +611,12 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); if (!parentBlock?.subagent) return state; + const sub = parentBlock.subagent as StreamingSubagentState; - const subBlock = parentBlock.subagent.blocks.get(action.blockId); + const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(action.blockId, { ...subBlock, done: true, @@ -600,10 +629,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, - subagent: { - ...parentBlock.subagent, - blocks: newSubBlocks, - }, + subagent: { ...sub, blocks: newSubBlocks }, }); return { ...state, current: { ...state.current, blocks: newBlocks } }; @@ -613,11 +639,12 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); if (!parentBlock?.subagent) return state; + const sub = parentBlock.subagent as StreamingSubagentState; // Find the tool block with matching toolId - for (const [blockId, subBlock] of parentBlock.subagent.blocks) { + for (const [blockId, subBlock] of sub.blocks) { if (subBlock.toolId === action.toolId) { - const newSubBlocks = new Map(parentBlock.subagent.blocks); + const newSubBlocks = new Map(sub.blocks); newSubBlocks.set(blockId, { ...subBlock, toolResult: action.result, @@ -627,10 +654,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M const newBlocks = new Map(state.current.blocks); newBlocks.set(action.parentBlockId, { ...parentBlock, - subagent: { - ...parentBlock.subagent, - blocks: newSubBlocks, - }, + subagent: { ...sub, blocks: newSubBlocks }, }); return { ...state, current: { ...state.current, blocks: newBlocks } }; @@ -644,33 +668,31 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); if (!parentBlock?.subagent) return state; + const sub = parentBlock.subagent as StreamingSubagentState; // Convert streaming subagent state to finished state - const finishedBlocks: FinishedBlock[] = parentBlock.subagent.blockOrder.map((blockId) => { - const b = parentBlock.subagent!.blocks.get(blockId)!; - return { - blockId: b.blockId, - blockType: b.blockType, - content: b.content, - toolName: b.toolName, - toolId: b.toolId, - toolInput: b.toolInput, - rawInput: b.rawInput, - toolResult: b.toolResult, - toolError: b.toolError, - }; - }); + const finished: FinishedSubagentState = { + messageId: sub.messageId, + blocks: sub.blockOrder.map((blockId) => { + const b = sub.blocks.get(blockId)!; + return { + blockId: b.blockId, + blockType: b.blockType, + content: b.content, + toolName: b.toolName, + toolId: b.toolId, + toolInput: b.toolInput, + rawInput: b.rawInput, + toolResult: b.toolResult, + toolError: b.toolError, + }; + }), + summary: action.summary, + usage: action.usage, + }; const newBlocks = new Map(state.current.blocks); - newBlocks.set(action.parentBlockId, { - ...parentBlock, - subagent: { - messageId: parentBlock.subagent.messageId, - blocks: finishedBlocks, - summary: action.summary, - usage: action.usage, - }, - }); + newBlocks.set(action.parentBlockId, { ...parentBlock, subagent: finished }); return { ...state, current: { ...state.current, blocks: newBlocks } }; } diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index 174b0ecc..f8fcc1c5 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -27,6 +27,10 @@ export type { SessionActivity, ServiceHealthStatus, ServiceHealthPayload, + StreamingSubagentState, + FinishedSubagentState, + SubagentState, + SubagentUsage, } from './types.js'; // Constants diff --git a/packages/protocol/src/types.ts b/packages/protocol/src/types.ts index 324a3cde..d784d656 100644 --- a/packages/protocol/src/types.ts +++ b/packages/protocol/src/types.ts @@ -50,7 +50,7 @@ export interface StreamingBlock { rawInput?: RawToolInput; toolResult?: string; toolError?: boolean; - subagent?: StreamingSubagentState; + subagent?: StreamingSubagentState | FinishedSubagentState; } export interface StreamingMessage { From 915af906be6d370ce0fac0fbb3945b29f4823d1a Mon Sep 17 00:00:00 2001 From: dimakis Date: Tue, 5 May 2026 21:38:42 +0100 Subject: [PATCH 2/4] fix(client): replace unsafe casts with narrowing guards, fix test types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Centaur review findings on PR #312: - Replace 5 `as StreamingSubagentState` casts with `'blockOrder' in` narrowing guards that safely bail if the subagent is already finished - Fix test type errors: cast to StreamingSubagentState/FinishedSubagentState in assertions so tests pass tsc --noEmit (not just esbuild) - Add two tests for finishSubagent: passthrough path (already-finished) and Map→array conversion path (still-streaming at MESSAGE_END) Co-Authored-By: Claude Opus 4.6 --- .../client/__tests__/subagent-reducer.test.ts | 158 +++++++++++++++--- packages/client/src/slices/messages.ts | 20 +-- 2 files changed, 142 insertions(+), 36 deletions(-) diff --git a/packages/client/__tests__/subagent-reducer.test.ts b/packages/client/__tests__/subagent-reducer.test.ts index cc96a8ae..fb3349d4 100644 --- a/packages/client/__tests__/subagent-reducer.test.ts +++ b/packages/client/__tests__/subagent-reducer.test.ts @@ -1,6 +1,10 @@ import { describe, it, expect } from 'vitest'; -import { messagesReducer, INITIAL_MESSAGES_STATE } from '../src/slices/messages.js'; -import type { StreamingBlock } from '@mitzo/protocol'; +import { messagesReducer, finishCurrent, INITIAL_MESSAGES_STATE } from '../src/slices/messages.js'; +import type { + StreamingBlock, + StreamingSubagentState, + FinishedSubagentState, +} from '@mitzo/protocol'; describe('Subagent Reducer Actions', () => { it('SUBAGENT_START initializes subagent state on parent tool block', () => { @@ -33,10 +37,11 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent).toBeDefined(); - expect(newState.current?.blocks.get('b1')?.subagent?.messageId).toBe('msg-sub-1'); - expect(newState.current?.blocks.get('b1')?.subagent?.running).toBe(true); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.size).toBe(0); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub).toBeDefined(); + expect(sub.messageId).toBe('msg-sub-1'); + expect(sub.running).toBe(true); + expect(sub.blocks.size).toBe(0); }); it('SUBAGENT_BLOCK_START adds block to subagent state', () => { @@ -76,11 +81,10 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.size).toBe(1); - expect(newState.current?.blocks.get('b1')?.subagent?.blockOrder).toEqual(['b-sub-1']); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-1')?.blockType).toBe( - 'thinking', - ); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.size).toBe(1); + expect(sub.blockOrder).toEqual(['b-sub-1']); + expect(sub.blocks.get('b-sub-1')?.blockType).toBe('thinking'); }); it('SUBAGENT_BLOCK_DELTA appends to subagent block content', () => { @@ -130,9 +134,8 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-1')?.content).toBe( - 'Hello world', - ); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.get('b-sub-1')?.content).toBe('Hello world'); }); it('SUBAGENT_BLOCK_END marks subagent block as done', () => { @@ -181,7 +184,8 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-1')?.done).toBe(true); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.get('b-sub-1')?.done).toBe(true); }); it('SUBAGENT_TOOL_RESULT patches tool result in subagent block', () => { @@ -234,12 +238,9 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-2')?.toolResult).toBe( - 'file contents', - ); - expect(newState.current?.blocks.get('b1')?.subagent?.blocks.get('b-sub-2')?.toolError).toBe( - false, - ); + const sub = newState.current?.blocks.get('b1')?.subagent as StreamingSubagentState; + expect(sub.blocks.get('b-sub-2')?.toolResult).toBe('file contents'); + expect(sub.blocks.get('b-sub-2')?.toolError).toBe(false); }); it('SUBAGENT_END finalizes subagent state with summary and usage', () => { @@ -294,10 +295,115 @@ describe('Subagent Reducer Actions', () => { const newState = messagesReducer(state, action); - const subagent = newState.current?.blocks.get('b1')?.subagent; - expect(subagent?.running).toBeUndefined(); - expect(Array.isArray(subagent?.blocks)).toBe(true); - expect(subagent?.summary).toBe('Search complete'); - expect(subagent?.usage?.inputTokens).toBe(100); + const sub = newState.current?.blocks.get('b1')?.subagent as FinishedSubagentState; + expect(sub.running).toBeUndefined(); + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.summary).toBe('Search complete'); + expect(sub.usage?.inputTokens).toBe(100); + }); + + it('finishCurrent converts already-finished subagent via passthrough', () => { + // Simulate full lifecycle: SUBAGENT_START → blocks → SUBAGENT_END → MESSAGE_END + let state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Done', + done: true, + }, + ], + ]), + blockOrder: ['b-sub-1'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // End the subagent first + state = messagesReducer(state, { + type: 'SUBAGENT_END', + parentBlockId: 'b1', + summary: 'All done', + }); + + // Now finish the message — finishSubagent should passthrough the already-finished state + const finished = finishCurrent(state.current!); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub).toBeDefined(); + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].content).toBe('Done'); + expect(sub.summary).toBe('All done'); + }); + + it('finishCurrent converts still-streaming subagent to finished', () => { + // Edge case: MESSAGE_END arrives without SUBAGENT_END + const state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Partial', + done: false, + }, + ], + ]), + blockOrder: ['b-sub-1'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // Finish message without SUBAGENT_END — finishSubagent must convert Map→array + const finished = finishCurrent(state.current!); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub).toBeDefined(); + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].content).toBe('Partial'); + expect(sub.summary).toBeUndefined(); }); }); diff --git a/packages/client/src/slices/messages.ts b/packages/client/src/slices/messages.ts index 362ea104..b18120c8 100644 --- a/packages/client/src/slices/messages.ts +++ b/packages/client/src/slices/messages.ts @@ -556,8 +556,8 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_START': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; - const sub = parentBlock.subagent as StreamingSubagentState; + if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; + const sub = parentBlock.subagent; const newBlock: StreamingBlock = { blockId: action.blockId, @@ -586,8 +586,8 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_DELTA': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; - const sub = parentBlock.subagent as StreamingSubagentState; + if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; + const sub = parentBlock.subagent; const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; @@ -610,8 +610,8 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_END': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; - const sub = parentBlock.subagent as StreamingSubagentState; + if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; + const sub = parentBlock.subagent; const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; @@ -638,8 +638,8 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_TOOL_RESULT': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; - const sub = parentBlock.subagent as StreamingSubagentState; + if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; + const sub = parentBlock.subagent; // Find the tool block with matching toolId for (const [blockId, subBlock] of sub.blocks) { @@ -667,8 +667,8 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_END': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent) return state; - const sub = parentBlock.subagent as StreamingSubagentState; + if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; + const sub = parentBlock.subagent; // Convert streaming subagent state to finished state const finished: FinishedSubagentState = { From d682400f441ee0b9eff62d158ece7ee70a4544d7 Mon Sep 17 00:00:00 2001 From: dimakis Date: Wed, 6 May 2026 06:45:59 +0100 Subject: [PATCH 3/4] style: fix prettier formatting in connection-registry test and query-loop Co-Authored-By: Claude Opus 4.6 --- packages/harness/__tests__/connection-registry.test.ts | 4 +--- server/query-loop.ts | 5 ++++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/harness/__tests__/connection-registry.test.ts b/packages/harness/__tests__/connection-registry.test.ts index fb2ab27d..6c4cd851 100644 --- a/packages/harness/__tests__/connection-registry.test.ts +++ b/packages/harness/__tests__/connection-registry.test.ts @@ -485,9 +485,7 @@ describe('ConnectionRegistry', () => { it('still syncs all sessions when isSessionActive is not provided', async () => { vi.useFakeTimers(); const t = mockTransport(true); - const store = mockEventStore([ - { seq: 5, payload: { type: 'msg1' } }, - ]); + const store = mockEventStore([{ seq: 5, payload: { type: 'msg1' } }]); // No isSessionActive — backwards compatible registry.setEventStore(store); diff --git a/server/query-loop.ts b/server/query-loop.ts index d26ed244..3d1b6e76 100644 --- a/server/query-loop.ts +++ b/server/query-loop.ts @@ -675,7 +675,10 @@ async function _runQueryLoopInner( const index = evt.index as number; const blockId = nextBlockId(); const blockType = contentBlock?.type as string | undefined; - subagent.subagentBlockIdByIndex.set(index, { blockId, blockType: blockType ?? 'text' }); + subagent.subagentBlockIdByIndex.set(index, { + blockId, + blockType: blockType ?? 'text', + }); if (blockType === 'thinking' || blockType === 'redacted_thinking') { emit( From b83443d8d5fdbf62c5c8b37eda0e3a12a6e0843e Mon Sep 17 00:00:00 2001 From: dimakis Date: Wed, 6 May 2026 06:50:29 +0100 Subject: [PATCH 4/4] refactor(client): extract getStreamingSubagent helper, defensive filters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Centaur review round 2: - Extract getStreamingSubagent() helper to DRY the 5 repeated 'blockOrder' in narrowing guards into a single function - Replace non-null assertions (!) with .filter(Boolean) in both finishSubagent and SUBAGENT_END — handles stale blockOrder entries - Add test: stale blockId in blockOrder is safely skipped - Add test: streaming events after SUBAGENT_END are silently dropped - Update existing finishCurrent test to verify Map→array conversion Co-Authored-By: Claude Opus 4.6 --- .../client/__tests__/messages-slice.test.ts | 11 +- .../client/__tests__/subagent-reducer.test.ts | 103 ++++++++++++++++++ packages/client/src/slices/messages.ts | 51 +++++---- 3 files changed, 142 insertions(+), 23 deletions(-) diff --git a/packages/client/__tests__/messages-slice.test.ts b/packages/client/__tests__/messages-slice.test.ts index a80e25af..43503f12 100644 --- a/packages/client/__tests__/messages-slice.test.ts +++ b/packages/client/__tests__/messages-slice.test.ts @@ -1,7 +1,7 @@ import { describe, it, expect } from 'vitest'; import { messagesReducer, finishCurrent, INITIAL_MESSAGES_STATE } from '../src/slices/messages.js'; import type { MessagesState } from '../src/slices/messages.js'; -import type { FinishedBlock, StreamingMessage } from '@mitzo/protocol'; +import type { FinishedBlock, FinishedSubagentState, StreamingMessage } from '@mitzo/protocol'; const INITIAL = INITIAL_MESSAGES_STATE; @@ -1180,8 +1180,13 @@ describe('finishCurrent', () => { }; const finished = finishCurrent(current); - expect(finished.blocks[0].subagent).toBeDefined(); - expect(finished.blocks[0].subagent!.messageId).toBe('sub-msg-1'); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub).toBeDefined(); + expect(sub.messageId).toBe('sub-msg-1'); + // Verify streaming Map was converted to finished array + expect(Array.isArray(sub.blocks)).toBe(true); + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].content).toBe('subagent output'); }); it('works normally when subagent field is absent', () => { diff --git a/packages/client/__tests__/subagent-reducer.test.ts b/packages/client/__tests__/subagent-reducer.test.ts index fb3349d4..2910bc45 100644 --- a/packages/client/__tests__/subagent-reducer.test.ts +++ b/packages/client/__tests__/subagent-reducer.test.ts @@ -406,4 +406,107 @@ describe('Subagent Reducer Actions', () => { expect(sub.blocks[0].content).toBe('Partial'); expect(sub.summary).toBeUndefined(); }); + + it('finishSubagent skips stale blockOrder entries not in the Map', () => { + const state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + // blockOrder references 'ghost' which is NOT in the Map + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Real', + done: true, + }, + ], + ]), + blockOrder: ['b-sub-1', 'ghost'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // Should not throw — stale entry is filtered out + const finished = finishCurrent(state.current!); + const sub = finished.blocks[0].subagent as FinishedSubagentState; + expect(sub.blocks).toHaveLength(1); + expect(sub.blocks[0].blockId).toBe('b-sub-1'); + }); + + it('drops streaming events after SUBAGENT_END (out-of-order delivery)', () => { + let state = { + ...INITIAL_MESSAGES_STATE, + current: { + messageId: 'msg-1', + blocks: new Map([ + [ + 'b1', + { + blockId: 'b1', + blockType: 'tool_use', + content: '', + done: true, + toolName: 'Agent', + toolId: 't1', + subagent: { + messageId: 'msg-sub-1', + blocks: new Map([ + [ + 'b-sub-1', + { + blockId: 'b-sub-1', + blockType: 'text', + content: 'Done', + done: true, + }, + ], + ]), + blockOrder: ['b-sub-1'], + running: true as const, + }, + }, + ], + ]), + blockOrder: ['b1'], + }, + }; + + // End the subagent + state = messagesReducer(state, { + type: 'SUBAGENT_END', + parentBlockId: 'b1', + summary: 'Complete', + }); + + // Late-arriving streaming event should be silently dropped + const afterDelta = messagesReducer(state, { + type: 'SUBAGENT_BLOCK_DELTA', + parentBlockId: 'b1', + blockId: 'b-sub-1', + delta: ' extra', + }); + + // State unchanged — event was dropped by getStreamingSubagent guard + expect(afterDelta).toBe(state); + }); }); diff --git a/packages/client/src/slices/messages.ts b/packages/client/src/slices/messages.ts index b18120c8..ee17be3e 100644 --- a/packages/client/src/slices/messages.ts +++ b/packages/client/src/slices/messages.ts @@ -141,6 +141,12 @@ export type MessagesAction = // ─── Helpers ───────────────────────────────────────────────────────────────── +/** Narrow a block's subagent to StreamingSubagentState, or null if already finished. */ +function getStreamingSubagent(block: StreamingBlock): StreamingSubagentState | null { + if (!block.subagent || !('blockOrder' in block.subagent)) return null; + return block.subagent; +} + function finishSubagent( sub: StreamingSubagentState | FinishedSubagentState, ): FinishedSubagentState { @@ -151,9 +157,10 @@ function finishSubagent( const streaming = sub as StreamingSubagentState; return { messageId: streaming.messageId, - blocks: streaming.blockOrder.map((blockId) => { - const b = streaming.blocks.get(blockId)!; - return { + blocks: streaming.blockOrder + .map((blockId) => streaming.blocks.get(blockId)) + .filter((b): b is StreamingBlock => b != null) + .map((b) => ({ blockId: b.blockId, blockType: b.blockType, content: b.content, @@ -163,8 +170,7 @@ function finishSubagent( rawInput: b.rawInput, toolResult: b.toolResult, toolError: b.toolError, - }; - }), + })), }; } @@ -556,8 +562,9 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_START': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; - const sub = parentBlock.subagent; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; const newBlock: StreamingBlock = { blockId: action.blockId, @@ -586,8 +593,9 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_DELTA': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; - const sub = parentBlock.subagent; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; @@ -610,8 +618,9 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_BLOCK_END': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; - const sub = parentBlock.subagent; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; const subBlock = sub.blocks.get(action.blockId); if (!subBlock) return state; @@ -638,8 +647,9 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_TOOL_RESULT': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; - const sub = parentBlock.subagent; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; // Find the tool block with matching toolId for (const [blockId, subBlock] of sub.blocks) { @@ -667,15 +677,17 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M case 'SUBAGENT_END': { if (!state.current) return state; const parentBlock = state.current.blocks.get(action.parentBlockId); - if (!parentBlock?.subagent || !('blockOrder' in parentBlock.subagent)) return state; - const sub = parentBlock.subagent; + if (!parentBlock) return state; + const sub = getStreamingSubagent(parentBlock); + if (!sub) return state; // Convert streaming subagent state to finished state const finished: FinishedSubagentState = { messageId: sub.messageId, - blocks: sub.blockOrder.map((blockId) => { - const b = sub.blocks.get(blockId)!; - return { + blocks: sub.blockOrder + .map((blockId) => sub.blocks.get(blockId)) + .filter((b): b is StreamingBlock => b != null) + .map((b) => ({ blockId: b.blockId, blockType: b.blockType, content: b.content, @@ -685,8 +697,7 @@ export function messagesReducer(state: MessagesState, action: MessagesAction): M rawInput: b.rawInput, toolResult: b.toolResult, toolError: b.toolError, - }; - }), + })), summary: action.summary, usage: action.usage, };