Skip to content
147 changes: 147 additions & 0 deletions src/services/socket-manager/message-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,153 @@ describe('createMessageEventQueue', () => {
});
});

describe('multi-message assistant turns', () => {
beforeEach(() => {
mockItems.messages.push({
id: 'user-1',
role: 'user',
content: 'please book a meeting',
parts: [],
created_at: new Date().toISOString(),
transcribed: true,
});
});

it('should append a new assistant message when answer arrives with a different id', () => {
const { onMessage } = createMessageEventQueue(
mockAnalytics,
mockItems,
mockOptions,
mockAgent,
mockOnStreamDone
);

onMessage(ChatProgress.Answer, { id: 'assistant-1', content: "ok, i'll book a meeting" });
onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'i booked a meeting for you' });

const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant');
expect(assistantMessages).toHaveLength(2);
expect(assistantMessages[0].id).toBe('assistant-1');
expect(assistantMessages[0].content).toBe("ok, i'll book a meeting");
expect(assistantMessages[1].id).toBe('assistant-2');
expect(assistantMessages[1].content).toBe('i booked a meeting for you');
});

it('should preserve the first assistant message when a second arrives after a tool call', () => {
const { onMessage } = createMessageEventQueue(
mockAnalytics,
mockItems,
mockOptions,
mockAgent,
mockOnStreamDone
);

onMessage(ChatProgress.Answer, { id: 'assistant-1', content: "ok, i'll book a meeting" });
// Tool-call events are dispatched via a separate path and do not touch messages;
// simulating that gap here means the next answer event arrives with a fresh id.
onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'i booked a meeting for you' });

expect(mockItems.messages.map(m => m.content)).toEqual([
'please book a meeting',
"ok, i'll book a meeting",
'i booked a meeting for you',
]);
});

it('should overwrite the last assistant message when answer has the same id', () => {
const { onMessage } = createMessageEventQueue(
mockAnalytics,
mockItems,
mockOptions,
mockAgent,
mockOnStreamDone
);

onMessage(ChatProgress.Answer, { id: 'assistant-1', content: 'first draft' });
onMessage(ChatProgress.Answer, { id: 'assistant-1', content: 'final answer' });

const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant');
expect(assistantMessages).toHaveLength(1);
expect(assistantMessages[0].content).toBe('final answer');
});

it('should overwrite the last assistant message when answer has no id (legacy backends)', () => {
const { onMessage } = createMessageEventQueue(
mockAnalytics,
mockItems,
mockOptions,
mockAgent,
mockOnStreamDone
);

onMessage(ChatProgress.Answer, { content: 'first' });
onMessage(ChatProgress.Answer, { content: 'second' });

const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant');
expect(assistantMessages).toHaveLength(1);
expect(assistantMessages[0].content).toBe('second');
});

it('should not leak content from the previous assistant message into the new one', () => {
const { onMessage } = createMessageEventQueue(
mockAnalytics,
mockItems,
mockOptions,
mockAgent,
mockOnStreamDone
);

onMessage(ChatProgress.Answer, { id: 'assistant-1', content: "ok, i'll book a meeting" });
onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'done' });

const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant');
expect(assistantMessages).toHaveLength(2);
expect(assistantMessages[1].content).toBe('done');
expect(assistantMessages[1].content).not.toContain("ok, i'll book");
});

it('should keep streaming partials into the same assistant message', () => {
const { onMessage } = createMessageEventQueue(
mockAnalytics,
mockItems,
mockOptions,
mockAgent,
mockOnStreamDone
);

onMessage(ChatProgress.Partial, { content: 'Hello', sequence: 0 });
onMessage(ChatProgress.Partial, { content: ' World', sequence: 1 });

const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant');
expect(assistantMessages).toHaveLength(1);
expect(assistantMessages[0].content).toBe('Hello World');
});

it('should append a new assistant message when partials arrive with a different id', () => {
const { onMessage } = createMessageEventQueue(
mockAnalytics,
mockItems,
mockOptions,
mockAgent,
mockOnStreamDone
);

onMessage(ChatProgress.Partial, { id: 'assistant-1', content: 'first ', sequence: 0 });
onMessage(ChatProgress.Partial, { id: 'assistant-1', content: 'message', sequence: 1 });
onMessage(ChatProgress.Answer, { id: 'assistant-1', content: 'first message' });

onMessage(ChatProgress.Partial, { id: 'assistant-2', content: 'second ', sequence: 0 });
onMessage(ChatProgress.Partial, { id: 'assistant-2', content: 'message', sequence: 1 });
onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'second message' });

const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant');
expect(assistantMessages).toHaveLength(2);
expect(assistantMessages[0].content).toBe('first message');
expect(assistantMessages[1].content).toBe('second message');
expect(assistantMessages[1].content).not.toContain('first');
});
});

describe('message parts population', () => {
it('should populate parts on partial messages', () => {
const { onMessage } = createMessageEventQueue(
Expand Down
20 changes: 16 additions & 4 deletions src/services/socket-manager/message-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ function processChatEvent(
data: any,
chatEventQueue: ChatEventQueue,
items: AgentManagerItems,
onNewMessage: AgentManagerOptions['callbacks']['onNewMessage']
onNewMessage: AgentManagerOptions['callbacks']['onNewMessage'],
clearQueue: () => void
) {
if (event === ChatProgress.Transcribe && data.content) {
handleAudioTranscribedMessage(data, items, onNewMessage);
Expand All @@ -70,10 +71,21 @@ function processChatEvent(

const lastMessage = items.messages[items.messages.length - 1];

// A new assistant message within the same turn (e.g. after a client tool call, or several
// assistant messages in a row) is signalled by a chat event whose `id` differs from the
// last assistant message. The new message typically starts with `Partial` events and ends
// with `Answer`, so both branches must detect the id change — otherwise the SDK overwrites
// the previous message on the first partial of the new one.
const isNewAssistantMessage = data.id && lastMessage?.role === 'assistant' && lastMessage.id !== data.id;

let currentMessage: Message;
if (lastMessage?.role === 'assistant') {
if (lastMessage?.role === 'assistant' && !isNewAssistantMessage) {
currentMessage = lastMessage;
} else if (!lastMessage || (lastMessage.transcribed && lastMessage.role === 'user')) {
} else if (!lastMessage || (lastMessage.transcribed && lastMessage.role === 'user') || isNewAssistantMessage) {
if (isNewAssistantMessage) {
// Reset the streaming buffer so the next message does not inherit the previous one's content.
clearQueue();
}
currentMessage = {
id: data.id || `assistant-${Date.now()}`,
role: data.role || 'assistant',
Expand Down Expand Up @@ -132,7 +144,7 @@ export function createMessageEventQueue(
: event === StreamEvents.ChatAudioTranscribed
? ChatProgress.Transcribe
: (event as ChatProgress);
processChatEvent(chatEvent, data, chatEventQueue, items, onNewMessage);
processChatEvent(chatEvent, data, chatEventQueue, items, onNewMessage, clearQueue);

if (chatEvent === ChatProgress.Answer) {
analytics.track('agent-message-received', {
Expand Down
Loading
Loading