Skip to content

Commit 7ef2843

Browse files
dimakisclaude
andcommitted
fix: flush pending message_end on new turn, finalize orphaned current
In multi-turn streaming-input sessions, message_start for turn N+1 could arrive before the deferred message_end for turn N flushed. The message_start handler reset pendingMessageEnd to null, silently dropping turn N's message_end. On the frontend, MESSAGE_START overwrote current without finalizing it into messages[]. Server: extract forceFlushPendingMessage() helper that force-closes open blocks and emits the pending message_end. Called from both the message_start handler (before resetting turn state) and the result handler (before session_end). DRYs the duplicated flush logic. Frontend: MESSAGE_START now finalizes any orphaned current into messages[] before creating the new streaming message. Belt-and- suspenders against any remaining server ordering edge cases. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Made-with: Cursor
1 parent 5b30415 commit 7ef2843

4 files changed

Lines changed: 158 additions & 57 deletions

File tree

frontend/src/hooks/__tests__/chatMessagesReducer.test.ts

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,35 @@ describe('MESSAGE_START', () => {
2323
expect(state.current!.blocks.size).toBe(0);
2424
});
2525

26-
it('replaces an existing current when a new message starts', () => {
27-
const withCurrent = chatMessagesReducer(INITIAL, { type: 'MESSAGE_START', messageId: 'msg-1' });
28-
const replaced = chatMessagesReducer(withCurrent, {
29-
type: 'MESSAGE_START',
30-
messageId: 'msg-2',
31-
});
32-
expect(replaced.current!.messageId).toBe('msg-2');
33-
expect(replaced.current!.blockOrder).toHaveLength(0);
26+
it('finalizes orphaned current into messages when a new message starts', () => {
27+
let state = chatMessagesReducer(INITIAL, { type: 'MESSAGE_START', messageId: 'msg-1' });
28+
state = chatMessagesReducer(state, {
29+
type: 'BLOCK_START',
30+
messageId: 'msg-1',
31+
blockId: 'b1',
32+
blockType: 'text',
33+
});
34+
state = chatMessagesReducer(state, {
35+
type: 'BLOCK_DELTA',
36+
messageId: 'msg-1',
37+
blockId: 'b1',
38+
blockType: 'text',
39+
delta: 'orphaned content',
40+
});
41+
// New message starts before old one got MESSAGE_END
42+
state = chatMessagesReducer(state, { type: 'MESSAGE_START', messageId: 'msg-2' });
43+
expect(state.current!.messageId).toBe('msg-2');
44+
expect(state.current!.blockOrder).toHaveLength(0);
45+
expect(state.messages).toHaveLength(1);
46+
expect(state.messages[0].messageId).toBe('msg-1');
47+
expect(state.messages[0].blocks[0].content).toBe('orphaned content');
48+
});
49+
50+
it('creates new current cleanly when no prior current exists', () => {
51+
const state = chatMessagesReducer(INITIAL, { type: 'MESSAGE_START', messageId: 'msg-2' });
52+
expect(state.current!.messageId).toBe('msg-2');
53+
expect(state.current!.blockOrder).toHaveLength(0);
54+
expect(state.messages).toHaveLength(0);
3455
});
3556
});
3657

frontend/src/hooks/useChatMessages.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,16 @@ export function chatMessagesReducer(
124124
): ChatMessagesState {
125125
switch (action.type) {
126126
case 'MESSAGE_START': {
127-
const block = new Map<string, StreamingBlock>();
127+
const base = state.current
128+
? { ...state, messages: [...state.messages, finishCurrent(state.current)] }
129+
: state;
128130
return {
129-
...state,
130-
current: { messageId: action.messageId, blocks: block, blockOrder: [] },
131+
...base,
132+
current: {
133+
messageId: action.messageId,
134+
blocks: new Map<string, StreamingBlock>(),
135+
blockOrder: [],
136+
},
131137
};
132138
}
133139

server/__tests__/query-loop.test.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,75 @@ describe('runQueryLoop', () => {
292292
expect(blockEndIdx).toBeLessThan(messageEndIdx);
293293
});
294294

295+
it('flushes pending message_end when new message_start arrives (multi-turn race)', async () => {
296+
const events: Record<string, unknown>[] = [
297+
// Turn 1: thinking block, assistant fires before block_stop
298+
{ type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-t1' } } },
299+
{
300+
type: 'stream_event',
301+
event: { type: 'content_block_start', index: 0, content_block: { type: 'thinking' } },
302+
},
303+
{
304+
type: 'stream_event',
305+
event: {
306+
type: 'content_block_delta',
307+
index: 0,
308+
delta: { type: 'thinking_delta', thinking: 'hmm' },
309+
},
310+
},
311+
{ type: 'stream_event', event: { type: 'content_block_stop', index: 0 } },
312+
{
313+
type: 'stream_event',
314+
event: { type: 'content_block_start', index: 1, content_block: { type: 'text' } },
315+
},
316+
{
317+
type: 'stream_event',
318+
event: {
319+
type: 'content_block_delta',
320+
index: 1,
321+
delta: { type: 'text_delta', text: 'Turn 1 response' },
322+
},
323+
},
324+
// assistant fires BEFORE content_block_stop for the text block
325+
{ type: 'assistant', message: { content: [] }, session_id: 'sess-mt' },
326+
// Turn 2 starts before Turn 1's text block_stop arrives
327+
{ type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-t2' } } },
328+
// Turn 1's block_stop is now orphaned — server should have force-flushed
329+
{
330+
type: 'stream_event',
331+
event: { type: 'content_block_start', index: 0, content_block: { type: 'text' } },
332+
},
333+
{
334+
type: 'stream_event',
335+
event: {
336+
type: 'content_block_delta',
337+
index: 0,
338+
delta: { type: 'text_delta', text: 'Turn 2 response' },
339+
},
340+
},
341+
{ type: 'stream_event', event: { type: 'content_block_stop', index: 0 } },
342+
{ type: 'assistant', message: { content: [] }, session_id: 'sess-mt' },
343+
{ type: 'result', session_id: 'sess-mt' },
344+
];
345+
346+
await runQueryLoop(eventStream(events), clientId, registry, abortController, ws);
347+
348+
const sent = ws.sent;
349+
350+
// Turn 1 message_end must exist and come before Turn 2 message_start
351+
const t1MsgEnd = sent.findIndex((m) => m.type === 'message_end' && m.messageId === 'msg-t1');
352+
const t2MsgStart = sent.findIndex(
353+
(m) => m.type === 'message_start' && m.messageId === 'msg-t2',
354+
);
355+
expect(t1MsgEnd).toBeGreaterThan(-1);
356+
expect(t2MsgStart).toBeGreaterThan(-1);
357+
expect(t1MsgEnd).toBeLessThan(t2MsgStart);
358+
359+
// Turn 2 message_end must also exist
360+
const t2MsgEnd = sent.findIndex((m) => m.type === 'message_end' && m.messageId === 'msg-t2');
361+
expect(t2MsgEnd).toBeGreaterThan(t2MsgStart);
362+
});
363+
295364
it('does not emit old-style text or text_delta events', async () => {
296365
const events: Record<string, unknown>[] = [
297366
{ type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-nodupe' } } },

server/query-loop.ts

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,55 @@ export async function runQueryLoop(
7373
}
7474
}
7575

76+
function forceFlushPendingMessage(
77+
ws: WebSocket,
78+
session: { currentSnapshot: { blocks: SnapshotBlock[] } | null },
79+
) {
80+
if (!pendingMessageEnd) return;
81+
for (const [index, bid] of blockIdByIndex) {
82+
if (openBlockCount <= 0) break;
83+
const snap = session.currentSnapshot?.blocks.find((b) => b.blockId === bid);
84+
if (snap && !snap.done) {
85+
snap.done = true;
86+
const toolEntry = toolInputBuffers.get(index);
87+
if (toolEntry) {
88+
toolInputBuffers.delete(index);
89+
let toolInput: Record<string, unknown> = {};
90+
try {
91+
toolInput = JSON.parse(toolEntry.inputBuf || '{}');
92+
} catch {
93+
/* empty */
94+
}
95+
send(
96+
ws,
97+
v2('block_end', {
98+
messageId: currentMessageId,
99+
blockId: bid,
100+
blockType: 'tool_use',
101+
toolName: toolEntry.name,
102+
toolId: toolEntry.id,
103+
input: summarizeToolInput(toolEntry.name, toolInput),
104+
}),
105+
);
106+
} else {
107+
send(
108+
ws,
109+
v2('block_end', {
110+
messageId: currentMessageId,
111+
blockId: bid,
112+
blockType: snap.blockType,
113+
}),
114+
);
115+
}
116+
openBlockCount = Math.max(0, openBlockCount - 1);
117+
}
118+
}
119+
send(ws, pendingMessageEnd);
120+
pendingMessageEnd = null;
121+
currentMessageId = null;
122+
session.currentSnapshot = null;
123+
}
124+
76125
log.info('query loop started', { clientId });
77126

78127
try {
@@ -100,63 +149,19 @@ export async function runQueryLoop(
100149
} else if (msg.type === 'result') {
101150
log.info('result received', { clientId, sessionId: msg.session_id });
102151
if (msg.session_id) send(currentWs, { type: 'session_id', sessionId: msg.session_id });
103-
// Force-close any open blocks and flush pending message_end before session_end.
104-
if (pendingMessageEnd) {
105-
for (const [index, bid] of blockIdByIndex) {
106-
if (openBlockCount <= 0) break;
107-
const snap = currentSession.currentSnapshot?.blocks.find((b) => b.blockId === bid);
108-
if (snap && !snap.done) {
109-
snap.done = true;
110-
const toolEntry = toolInputBuffers.get(index);
111-
if (toolEntry) {
112-
toolInputBuffers.delete(index);
113-
let toolInput: Record<string, unknown> = {};
114-
try {
115-
toolInput = JSON.parse(toolEntry.inputBuf || '{}');
116-
} catch {
117-
/* empty */
118-
}
119-
send(
120-
currentWs,
121-
v2('block_end', {
122-
messageId: currentMessageId,
123-
blockId: bid,
124-
blockType: 'tool_use',
125-
toolName: toolEntry.name,
126-
toolId: toolEntry.id,
127-
input: summarizeToolInput(toolEntry.name, toolInput),
128-
}),
129-
);
130-
} else {
131-
send(
132-
currentWs,
133-
v2('block_end', {
134-
messageId: currentMessageId,
135-
blockId: bid,
136-
blockType: snap.blockType,
137-
}),
138-
);
139-
}
140-
openBlockCount = Math.max(0, openBlockCount - 1);
141-
}
142-
}
143-
send(currentWs, pendingMessageEnd);
144-
pendingMessageEnd = null;
145-
currentMessageId = null;
146-
currentSession.currentSnapshot = null;
147-
}
152+
forceFlushPendingMessage(currentWs, currentSession);
148153
doneSent = true;
149154
send(currentWs, v2('session_end', { sessionId: msg.session_id }));
150155
} else if (msg.type === 'stream_event') {
151156
const evt = msg.event as Record<string, unknown> | undefined;
152157
log.debug('stream event', { clientId, evtType: evt?.type });
153158

154159
if (evt?.type === 'message_start') {
160+
forceFlushPendingMessage(currentWs, currentSession);
155161
toolInputBuffers.clear();
156162
blockIdByIndex.clear();
157163
blockCounter = 0;
158164
openBlockCount = 0;
159-
pendingMessageEnd = null;
160165
// Use API message ID if available, otherwise generate one.
161166
const apiMsg = evt.message as Record<string, unknown> | undefined;
162167
currentMessageId = (apiMsg?.id as string | undefined) ?? `msg-${Date.now()}`;

0 commit comments

Comments
 (0)