From 0e84af75c16e309322a89ed570fa3246e857af39 Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Thu, 25 Dec 2025 11:58:07 +0700 Subject: [PATCH 1/2] refactor: db structure to preserve tool calls order in thinking process --- backend/__tests__/db.test.js | 27 +++ backend/package.json | 3 +- backend/scripts/backfill-message-events.js | 184 ++++++++++++++++++ backend/src/db/messageEvents.js | 98 ++++++++++ backend/src/db/messages.js | 10 + .../src/db/migrations/022-message-events.js | 25 +++ backend/src/env.js | 1 + backend/src/lib/openaiProxy.js | 35 ++++ .../src/lib/persistence/PersistenceConfig.js | 9 + backend/src/lib/simplifiedPersistence.js | 76 ++++++++ backend/src/lib/streamingHandler.js | 6 + backend/src/lib/toolsJson.js | 8 + backend/src/lib/toolsStreaming.js | 11 ++ frontend/components/MessageList.tsx | 52 ++++- frontend/hooks/useChat.ts | 7 +- frontend/lib/index.ts | 1 + frontend/lib/types.ts | 12 ++ 17 files changed, 558 insertions(+), 7 deletions(-) create mode 100644 backend/scripts/backfill-message-events.js create mode 100644 backend/src/db/messageEvents.js create mode 100644 backend/src/db/migrations/022-message-events.js diff --git a/backend/__tests__/db.test.js b/backend/__tests__/db.test.js index 96fe1070..4cc45a19 100644 --- a/backend/__tests__/db.test.js +++ b/backend/__tests__/db.test.js @@ -10,6 +10,7 @@ import { retentionSweep, resetDbCache, } from '../src/db/index.js'; +import { insertMessageEvents } from '../src/db/messageEvents.js'; import { createUser } from '../src/db/users.js'; import { config } from '../src/env.js'; import { safeTestSetup } from '../test_support/databaseSafety.js'; @@ -129,6 +130,32 @@ describe('DB helpers', () => { const partial = getMessagesPage({ conversationId: 'conv', afterSeq: 2, limit: 5 }); assert.equal(partial.next_after_seq, null); }); + + test('includes message_events when present', () => { + createConversation({ id: 'conv', sessionId, userId: testUser.id }); + const db = getDb(); + const info = db.prepare( + `INSERT INTO messages (conversation_id, role, status, content, seq) + VALUES (@cid, 'assistant', 'final', @content, @seq)` + ).run({ cid: 'conv', content: 'Hello world', seq: 1 }); + + insertMessageEvents({ + messageId: info.lastInsertRowid, + conversationId: 'conv', + events: [ + { seq: 0, type: 'content', payload: { text: 'Hello ' } }, + { seq: 1, type: 'content', payload: { text: 'world' } }, + ], + }); + + const page = getMessagesPage({ conversationId: 'conv', afterSeq: 0, limit: 5 }); + assert.equal(page.messages.length, 1); + const [message] = page.messages; + assert.ok(Array.isArray(message.message_events)); + assert.equal(message.message_events.length, 2); + assert.equal(message.message_events[0].payload.text, 'Hello '); + assert.equal(message.message_events[1].payload.text, 'world'); + }); }); describe('retentionSweep', () => { diff --git a/backend/package.json b/backend/package.json index f41c5fcb..ca0242ba 100644 --- a/backend/package.json +++ b/backend/package.json @@ -12,7 +12,8 @@ "test": "NODE_OPTIONS=--experimental-vm-modules jest", "lint": "eslint .", "format": "prettier --write .", - "migrate": "node scripts/migrate.js" + "migrate": "node scripts/migrate.js", + "backfill:message-events": "node scripts/backfill-message-events.js" }, "dependencies": { "@blackglory/better-sqlite3-migrations": "^0.1.20", diff --git a/backend/scripts/backfill-message-events.js b/backend/scripts/backfill-message-events.js new file mode 100644 index 00000000..035482ef --- /dev/null +++ b/backend/scripts/backfill-message-events.js @@ -0,0 +1,184 @@ +import { getDb } from '../src/db/client.js'; +import { insertMessageEvents } from '../src/db/messageEvents.js'; +import { logger } from '../src/logger.js'; + +function extractTextFromMixedContent(content) { + if (!Array.isArray(content)) return ''; + const segments = []; + for (const part of content) { + if (!part) continue; + if (typeof part === 'string') { + segments.push(part); + continue; + } + if (typeof part === 'object') { + if (typeof part.text === 'string') { + segments.push(part.text); + continue; + } + if (typeof part.value === 'string') { + segments.push(part.value); + continue; + } + if (typeof part.content === 'string') { + segments.push(part.content); + } + } + } + return segments.join(''); +} + +function extractReasoningText(raw) { + if (!raw) return ''; + try { + const parsed = JSON.parse(raw); + if (Array.isArray(parsed)) { + return parsed + .map((detail) => (typeof detail?.text === 'string' ? detail.text.trim() : '')) + .filter(Boolean) + .join('\n\n'); + } + if (parsed && typeof parsed === 'object' && typeof parsed.text === 'string') { + return parsed.text.trim(); + } + } catch { + return ''; + } + return ''; +} + +function buildEvents({ contentText, reasoningText, toolCalls }) { + const events = []; + const hasThinkingInContent = contentText.includes(''); + + if (reasoningText && !hasThinkingInContent) { + events.push({ type: 'reasoning', payload: { text: reasoningText } }); + } + + const callsWithOffsets = toolCalls + .map((call) => ({ + id: call.id, + index: call.call_index, + offset: Number.isFinite(call.text_offset) ? call.text_offset : null, + })) + .sort((a, b) => { + if (a.offset == null && b.offset == null) return (a.index ?? 0) - (b.index ?? 0); + if (a.offset == null) return 1; + if (b.offset == null) return -1; + if (a.offset !== b.offset) return a.offset - b.offset; + return (a.index ?? 0) - (b.index ?? 0); + }); + + let cursor = 0; + for (const call of callsWithOffsets) { + if (call.offset == null) continue; + const normalized = Math.max(0, Math.min(call.offset, contentText.length)); + if (normalized > cursor) { + const chunk = contentText.slice(cursor, normalized); + if (chunk) events.push({ type: 'content', payload: { text: chunk } }); + cursor = normalized; + } + events.push({ + type: 'tool_call', + payload: { tool_call_id: call.id, tool_call_index: call.index ?? 0 }, + }); + } + + if (cursor < contentText.length) { + const remaining = contentText.slice(cursor); + if (remaining) events.push({ type: 'content', payload: { text: remaining } }); + } + + for (const call of callsWithOffsets) { + if (call.offset != null) continue; + events.push({ + type: 'tool_call', + payload: { tool_call_id: call.id, tool_call_index: call.index ?? 0 }, + }); + } + + if (events.length === 0 && contentText) { + events.push({ type: 'content', payload: { text: contentText } }); + } + + return events; +} + +function parseArgs() { + const args = new Set(process.argv.slice(2)); + return { + dryRun: args.has('--dry-run'), + limit: (() => { + const match = process.argv.find((arg) => arg.startsWith('--limit=')); + if (!match) return null; + const value = Number(match.split('=')[1]); + return Number.isFinite(value) ? value : null; + })(), + }; +} + +function main() { + const { dryRun, limit } = parseArgs(); + const db = getDb(); + + const rows = db + .prepare( + `SELECT m.id, m.conversation_id, m.content, m.content_json, m.reasoning_details + FROM messages m + WHERE m.role = 'assistant' + AND m.status IN ('final', 'error') + AND NOT EXISTS ( + SELECT 1 FROM message_events e WHERE e.message_id = m.id + ) + ORDER BY m.id ASC + ${limit ? 'LIMIT @limit' : ''}` + ) + .all(limit ? { limit } : {}); + + let processed = 0; + let inserted = 0; + + const toolCallsStmt = db.prepare( + `SELECT id, call_index, text_offset + FROM tool_calls + WHERE message_id = @messageId + ORDER BY call_index ASC` + ); + + for (const row of rows) { + processed += 1; + let contentText = typeof row.content === 'string' ? row.content : ''; + + if (row.content_json) { + try { + const parsed = JSON.parse(row.content_json); + contentText = extractTextFromMixedContent(parsed); + } catch { + contentText = typeof row.content === 'string' ? row.content : ''; + } + } + + const reasoningText = extractReasoningText(row.reasoning_details); + const toolCalls = toolCallsStmt.all({ messageId: row.id }); + const events = buildEvents({ contentText, reasoningText, toolCalls }); + + if (events.length === 0) continue; + inserted += 1; + + if (!dryRun) { + insertMessageEvents({ + messageId: row.id, + conversationId: row.conversation_id, + events: events.map((event, index) => ({ ...event, seq: index })), + }); + } + } + + logger.info('[backfill-message-events] completed', { + processed, + inserted, + dryRun, + }); +} + +main(); diff --git a/backend/src/db/messageEvents.js b/backend/src/db/messageEvents.js new file mode 100644 index 00000000..427ea6ac --- /dev/null +++ b/backend/src/db/messageEvents.js @@ -0,0 +1,98 @@ +import { getDb } from './client.js'; +import { logger } from '../logger.js'; + +let messageEventsTableAvailable = null; + +function isMessageEventsTableAvailable(db) { + if (messageEventsTableAvailable !== null) return messageEventsTableAvailable; + try { + const row = db.prepare( + `SELECT name FROM sqlite_master WHERE type='table' AND name='message_events'` + ).get(); + messageEventsTableAvailable = Boolean(row?.name); + } catch (error) { + logger.warn('[messageEvents] Failed to check table availability', error); + messageEventsTableAvailable = false; + } + return messageEventsTableAvailable; +} + +function serializePayload(payload) { + if (payload === undefined) return null; + if (payload === null) return null; + try { + return JSON.stringify(payload); + } catch (error) { + logger.warn('[messageEvents] Failed to serialize payload', error); + return null; + } +} + +function parsePayload(raw, eventId) { + if (raw == null) return null; + try { + return JSON.parse(raw); + } catch (error) { + logger.warn(`[messageEvents] Failed to parse payload for event ${eventId}`, error); + return null; + } +} + +export function insertMessageEvents({ messageId, conversationId, events }) { + if (!messageId || !conversationId || !Array.isArray(events) || events.length === 0) return; + const db = getDb(); + if (!isMessageEventsTableAvailable(db)) return; + const now = new Date().toISOString(); + const stmt = db.prepare( + `INSERT INTO message_events (message_id, conversation_id, seq, type, payload, created_at) + VALUES (@messageId, @conversationId, @seq, @type, @payload, @now)` + ); + + const insertMany = db.transaction((rows) => { + for (const row of rows) { + stmt.run(row); + } + }); + + const rows = events.map((event) => ({ + messageId, + conversationId, + seq: typeof event.seq === 'number' ? event.seq : 0, + type: event.type, + payload: serializePayload(event.payload), + now, + })); + + insertMany(rows); +} + +export function getMessageEventsByMessageIds(messageIds) { + if (!Array.isArray(messageIds) || messageIds.length === 0) return {}; + const db = getDb(); + if (!isMessageEventsTableAvailable(db)) return {}; + const placeholders = messageIds.map(() => '?').join(','); + const rows = db + .prepare( + `SELECT id, message_id, conversation_id, seq, type, payload, created_at + FROM message_events + WHERE message_id IN (${placeholders}) + ORDER BY message_id ASC, seq ASC` + ) + .all(...messageIds); + + const eventsByMessage = {}; + for (const row of rows) { + if (!eventsByMessage[row.message_id]) { + eventsByMessage[row.message_id] = []; + } + eventsByMessage[row.message_id].push({ + id: row.id, + seq: row.seq, + type: row.type, + payload: parsePayload(row.payload, row.id), + created_at: row.created_at, + }); + } + + return eventsByMessage; +} diff --git a/backend/src/db/messages.js b/backend/src/db/messages.js index 8a89630e..b1770bc2 100644 --- a/backend/src/db/messages.js +++ b/backend/src/db/messages.js @@ -1,5 +1,6 @@ import { getDb } from './client.js'; import { logger } from '../logger.js'; +import { getMessageEventsByMessageIds } from './messageEvents.js'; function extractTextFromMixedContent(content) { if (!Array.isArray(content)) return ''; @@ -306,6 +307,7 @@ export function getMessagesPage({ conversationId, afterSeq = 0, limit = 50 }) { if (messages.length > 0) { const messageIds = messages.map(m => m.id); const placeholders = messageIds.map(() => '?').join(','); + const messageEventsByMessage = getMessageEventsByMessageIds(messageIds); // Get all tool calls for these messages const toolCalls = db @@ -362,6 +364,9 @@ export function getMessagesPage({ conversationId, afterSeq = 0, limit = 50 }) { // Attach tool calls and outputs to messages (still using integer IDs) for (const message of messages) { + if (messageEventsByMessage[message.id]) { + message.message_events = messageEventsByMessage[message.id]; + } if (toolCallsByMessage[message.id]) { message.tool_calls = toolCallsByMessage[message.id]; } @@ -455,6 +460,11 @@ export function getLastMessage({ conversationId }) { ) .all({ messageId: integerMessageId }); + const messageEventsByMessage = getMessageEventsByMessageIds([integerMessageId]); + if (messageEventsByMessage[integerMessageId]) { + message.message_events = messageEventsByMessage[integerMessageId]; + } + // Transform tool calls to OpenAI format if (toolCalls.length > 0) { message.tool_calls = toolCalls.map(tc => ({ diff --git a/backend/src/db/migrations/022-message-events.js b/backend/src/db/migrations/022-message-events.js new file mode 100644 index 00000000..fa8913ec --- /dev/null +++ b/backend/src/db/migrations/022-message-events.js @@ -0,0 +1,25 @@ +export default { + version: 22, + up: ` + -- Create message_events table to capture ordered assistant events + CREATE TABLE IF NOT EXISTS message_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + message_id INTEGER NOT NULL, + conversation_id TEXT NOT NULL, + seq INTEGER NOT NULL, + type TEXT NOT NULL, + payload TEXT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(message_id) REFERENCES messages(id) ON DELETE CASCADE, + FOREIGN KEY(conversation_id) REFERENCES conversations(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_message_events_message_id ON message_events(message_id, seq); + CREATE INDEX IF NOT EXISTS idx_message_events_conversation_id ON message_events(conversation_id, seq); + `, + down: ` + DROP INDEX IF EXISTS idx_message_events_conversation_id; + DROP INDEX IF EXISTS idx_message_events_message_id; + DROP TABLE IF EXISTS message_events; + ` +}; diff --git a/backend/src/env.js b/backend/src/env.js index 730f0f46..5573908e 100644 --- a/backend/src/env.js +++ b/backend/src/env.js @@ -97,6 +97,7 @@ export const config = { intervalMs: parsePositiveNumber(process.env.CHECKPOINT_INTERVAL_MS, 3000), minCharacters: parsePositiveNumber(process.env.CHECKPOINT_MIN_CHARACTERS, 500), }, + messageEventsEnabled: bool(process.env.MESSAGE_EVENTS_ENABLED, true), }, auth: { jwtSecret: process.env.JWT_SECRET || 'development-secret-key-change-in-production', diff --git a/backend/src/lib/openaiProxy.js b/backend/src/lib/openaiProxy.js index 82b27b8e..deaa2dc6 100644 --- a/backend/src/lib/openaiProxy.js +++ b/backend/src/lib/openaiProxy.js @@ -440,9 +440,36 @@ async function handleRequest(context, req, res) { if (message.content !== undefined) { const safeContent = sanitizeContent(message.content); persistence.setAssistantContent(safeContent); + if (typeof persistence.addMessageEvent === 'function') { + const contentText = Array.isArray(safeContent) + ? safeContent + .map((part) => { + if (typeof part === 'string') return part; + if (part && typeof part === 'object') { + if (typeof part.text === 'string') return part.text; + if (typeof part.value === 'string') return part.value; + if (typeof part.content === 'string') return part.content; + } + return ''; + }) + .join('') + : (typeof safeContent === 'string' ? safeContent : ''); + if (contentText) { + persistence.addMessageEvent('content', { text: contentText }); + } + } } if (Array.isArray(message.reasoning_details)) { persistence.setReasoningDetails(message.reasoning_details); + if (typeof persistence.addMessageEvent === 'function') { + const reasoningText = message.reasoning_details + .map((detail) => (typeof detail?.text === 'string' ? detail.text.trim() : '')) + .filter(Boolean) + .join('\n\n'); + if (reasoningText) { + persistence.addMessageEvent('reasoning', { text: reasoningText }); + } + } } if (Array.isArray(message.tool_calls) && message.tool_calls.length > 0) { const safeContent = sanitizeContent(message.content); @@ -454,6 +481,14 @@ async function handleRequest(context, req, res) { textOffset: contentLength })); persistence.addToolCalls(toolCallsWithOffset); + if (typeof persistence.addMessageEvent === 'function') { + for (const tc of toolCallsWithOffset) { + persistence.addMessageEvent('tool_call', { + tool_call_id: tc.id ?? null, + tool_call_index: tc.index ?? null, + }); + } + } } const finishReason = upstreamJson.choices?.[0]?.finish_reason || null; diff --git a/backend/src/lib/persistence/PersistenceConfig.js b/backend/src/lib/persistence/PersistenceConfig.js index 966e2081..b99547aa 100644 --- a/backend/src/lib/persistence/PersistenceConfig.js +++ b/backend/src/lib/persistence/PersistenceConfig.js @@ -34,6 +34,15 @@ export class PersistenceConfig { return this.config?.persistence?.maxMessagesPerConversation || 1000; } + /** + * Check if message event storage is enabled + * @returns {boolean} True if message event storage is enabled + */ + isMessageEventsEnabled() { + if (this.config?.persistence?.messageEventsEnabled === undefined) return true; + return Boolean(this.config.persistence.messageEventsEnabled); + } + /** * Get default model * @returns {string|null} Default model name diff --git a/backend/src/lib/simplifiedPersistence.js b/backend/src/lib/simplifiedPersistence.js index 25954e27..12002337 100644 --- a/backend/src/lib/simplifiedPersistence.js +++ b/backend/src/lib/simplifiedPersistence.js @@ -9,6 +9,7 @@ import { insertToolCalls, insertToolOutputs, } from '../db/toolCalls.js'; +import { insertMessageEvents } from '../db/messageEvents.js'; import { insertToolMessage, getNextSeq, @@ -47,6 +48,9 @@ export class SimplifiedPersistence { this.reasoningTokens = null; // Reasoning token usage metadata this.userMessageId = null; // Persisted user message ID from latest sync this.assistantMessageId = null; // Persisted assistant message ID for the current turn + this.messageEventsEnabled = this.persistenceConfig?.isMessageEventsEnabled?.() ?? true; + this.messageEvents = []; // Ordered assistant events for rendering + this.nextEventSeq = 0; this._latestSyncMappings = []; // Checkpoint state this.lastCheckpoint = 0; // timestamp @@ -238,6 +242,9 @@ export class SimplifiedPersistence { this.reasoningTextBuffer = ''; this.reasoningTokens = null; this.assistantMessageId = null; + this.messageEventsEnabled = this.persistenceConfig?.isMessageEventsEnabled?.() ?? true; + this.messageEvents = []; + this.nextEventSeq = 0; // Create a draft row immediately so we can checkpoint during streaming try { this.createDraftMessage(); @@ -405,6 +412,7 @@ export class SimplifiedPersistence { appendReasoningText(delta) { if (!this.persist || !delta) return; this.reasoningTextBuffer += delta; + this.addMessageEvent('reasoning', { text: String(delta) }); } setReasoningDetails(details) { @@ -512,6 +520,7 @@ export class SimplifiedPersistence { const text = this._extractTextFromMixedContent(cloned); if (text) { this.assistantBuffer += text; + this.addMessageEvent('content', { text }); } try { if (this.shouldCheckpoint()) this.performCheckpoint(); @@ -523,6 +532,7 @@ export class SimplifiedPersistence { if (typeof delta === 'string') { this.assistantBuffer += delta; + this.addMessageEvent('content', { text: delta }); try { if (this.shouldCheckpoint()) this.performCheckpoint(); } catch (err) { @@ -534,10 +544,13 @@ export class SimplifiedPersistence { if (typeof delta === 'object') { if (typeof delta.text === 'string') { this.assistantBuffer += delta.text; + this.addMessageEvent('content', { text: delta.text }); } else if (typeof delta.value === 'string') { this.assistantBuffer += delta.value; + this.addMessageEvent('content', { text: delta.value }); } else if (typeof delta.content === 'string') { this.assistantBuffer += delta.content; + this.addMessageEvent('content', { text: delta.content }); } } // After updating in-memory buffer, consider checkpointing to DB @@ -640,8 +653,23 @@ export class SimplifiedPersistence { } } + if (this.messageEventsEnabled) { + const finalizedReasoning = this._finalizeReasoningDetails(); + const hasReasoningEvent = this.messageEvents.some((event) => event?.type === 'reasoning'); + if (!hasReasoningEvent && Array.isArray(finalizedReasoning)) { + const reasoningText = finalizedReasoning + .map((detail) => (typeof detail?.text === 'string' ? detail.text.trim() : '')) + .filter(Boolean) + .join('\n\n'); + if (reasoningText) { + this.messageEvents.unshift({ seq: -1, type: 'reasoning', payload: { text: reasoningText } }); + } + } + } + // Automatically persist any buffered tool calls and outputs now that we have a messageId this.persistToolCallsAndOutputs(); + this.persistMessageEvents(); } /** @@ -659,6 +687,29 @@ export class SimplifiedPersistence { this.toolCalls.push(...toolCalls); } + addMessageEvent(type, payload) { + if (!this.persist || !type || !this.messageEventsEnabled) return; + const normalizedPayload = payload ?? null; + const last = this.messageEvents[this.messageEvents.length - 1]; + const isMergeable = + last && + last.type === type && + (type === 'content' || type === 'reasoning') && + last.payload && + normalizedPayload && + typeof last.payload.text === 'string' && + typeof normalizedPayload.text === 'string'; + + if (isMergeable) { + last.payload.text += normalizedPayload.text; + return; + } + + const seq = this.nextEventSeq; + this.nextEventSeq += 1; + this.messageEvents.push({ seq, type, payload: normalizedPayload }); + } + /** * Add tool outputs to buffer * @param {Array} toolOutputs - Array of tool outputs @@ -751,6 +802,31 @@ export class SimplifiedPersistence { } } + persistMessageEvents() { + if (!this.persist || !this.conversationId || !this.currentMessageId || !this.messageEventsEnabled) { + this.messageEvents = []; + return; + } + + if (this.messageEvents.length === 0) return; + + try { + const orderedEvents = this.messageEvents.map((event, index) => ({ + ...event, + seq: index, + })); + insertMessageEvents({ + messageId: this.currentMessageId, + conversationId: this.conversationId, + events: orderedEvents, + }); + } catch (error) { + logger.error('[SimplifiedPersistence] Failed to persist message events:', error); + } finally { + this.messageEvents = []; + } + } + /** * Set the response ID for the current response * @param {string} responseId - OpenAI response ID diff --git a/backend/src/lib/streamingHandler.js b/backend/src/lib/streamingHandler.js index d43c5e59..3c353e8c 100644 --- a/backend/src/lib/streamingHandler.js +++ b/backend/src/lib/streamingHandler.js @@ -132,6 +132,12 @@ function processPersistenceChunk(obj, persistence, toolCallMap, lastFinishReason // Capture textOffset when tool call first appears if (isNewToolCall && persistence) { existing.textOffset = persistence.getContentLength(); + if (typeof persistence.addMessageEvent === 'function') { + persistence.addMessageEvent('tool_call', { + tool_call_id: tcDelta.id ?? null, + tool_call_index: idx, + }); + } } if (tcDelta.id) existing.id = tcDelta.id; diff --git a/backend/src/lib/toolsJson.js b/backend/src/lib/toolsJson.js index 0b73b19c..4af12aca 100644 --- a/backend/src/lib/toolsJson.js +++ b/backend/src/lib/toolsJson.js @@ -717,6 +717,14 @@ export async function handleToolsJson({ textOffset: contentLength })); persistence.addToolCalls(toolCallsWithOffset); + if (typeof persistence.addMessageEvent === 'function') { + for (const tc of toolCallsWithOffset) { + persistence.addMessageEvent('tool_call', { + tool_call_id: tc.id ?? null, + tool_call_index: tc.index ?? null, + }); + } + } } // Execute all tools (support optional parallel execution) diff --git a/backend/src/lib/toolsStreaming.js b/backend/src/lib/toolsStreaming.js index 0820cbd6..a846fd16 100644 --- a/backend/src/lib/toolsStreaming.js +++ b/backend/src/lib/toolsStreaming.js @@ -399,6 +399,11 @@ export async function handleToolsStreaming({ persistence.setReasoningDetails(delta.reasoning_details); } } + + const reasoningText = delta.reasoning_content ?? delta.reasoning; + if (reasoningText && persistence && typeof persistence.appendReasoningText === 'function') { + persistence.appendReasoningText(reasoningText); + } if (message?.reasoning_details && Array.isArray(message.reasoning_details)) { // For non-streaming message format, replace accumulated with full array accumulatedReasoningDetails = message.reasoning_details; @@ -434,6 +439,12 @@ export async function handleToolsStreaming({ // Capture textOffset when tool call first appears if (isNewToolCall && persistence && typeof persistence.getContentLength === 'function') { existing.textOffset = persistence.getContentLength(); + if (typeof persistence.addMessageEvent === 'function') { + persistence.addMessageEvent('tool_call', { + tool_call_id: tcDelta.id ?? null, + tool_call_index: idx, + }); + } } if (tcDelta.id && !existing.id) existing.id = tcDelta.id; diff --git a/frontend/components/MessageList.tsx b/frontend/components/MessageList.tsx index a9cdfef4..e5e2b824 100644 --- a/frontend/components/MessageList.tsx +++ b/frontend/components/MessageList.tsx @@ -71,10 +71,7 @@ function buildAssistantSegments(message: ChatMessage): AssistantSegment[] { const content = extractTextFromContent(message.content); const toolCalls = Array.isArray(message.tool_calls) ? message.tool_calls : []; const toolOutputs = Array.isArray(message.tool_outputs) ? message.tool_outputs : []; - - if (toolCalls.length === 0) { - return content ? [{ kind: 'text', text: content }] : []; - } + const messageEvents = Array.isArray(message.message_events) ? message.message_events : []; // Helper to resolve outputs for a tool call const resolveOutputs = (call: any): ToolOutput[] => { @@ -94,6 +91,53 @@ function buildAssistantSegments(message: ChatMessage): AssistantSegment[] { call.textOffset > 0 ); + if (messageEvents.length > 0) { + const sortedEvents = [...messageEvents].sort((a, b) => (a.seq ?? 0) - (b.seq ?? 0)); + const segments: AssistantSegment[] = []; + + for (const event of sortedEvents) { + if (event.type === 'content') { + const text = typeof event.payload?.text === 'string' ? event.payload.text : ''; + if (text) { + segments.push({ kind: 'text', text }); + } + continue; + } + + if (event.type === 'reasoning') { + const text = typeof event.payload?.text === 'string' ? event.payload.text : ''; + if (text) { + segments.push({ kind: 'text', text: `${text}` }); + } + continue; + } + + if (event.type === 'tool_call') { + const toolCallId = event.payload?.tool_call_id; + const toolCallIndex = event.payload?.tool_call_index; + const toolCall = + (toolCallId + ? toolCalls.find((call: any) => call?.id === toolCallId) + : undefined) || + (typeof toolCallIndex === 'number' + ? toolCalls.find((call: any) => (call?.index ?? 0) === toolCallIndex) + : undefined); + + if (toolCall) { + segments.push({ kind: 'tool_call', toolCall, outputs: resolveOutputs(toolCall) }); + } + } + } + + if (segments.length > 0) { + return segments; + } + } + + if (toolCalls.length === 0) { + return content ? [{ kind: 'text', text: content }] : []; + } + // For loaded conversations (no valid textOffset), show tools first, then content if (!hasValidTextOffset) { const segments: AssistantSegment[] = []; diff --git a/frontend/hooks/useChat.ts b/frontend/hooks/useChat.ts index 5ad73308..067838ef 100644 --- a/frontend/hooks/useChat.ts +++ b/frontend/hooks/useChat.ts @@ -1,6 +1,6 @@ import { useState, useCallback, useRef, useEffect } from 'react'; import { useSystemPrompts } from './useSystemPrompts'; -import type { MessageContent, TextContent } from '../lib'; +import type { MessageContent, TextContent, MessageEvent } from '../lib'; import { conversations as conversationsApi, chat, auth } from '../lib/api'; import { httpClient } from '../lib/http'; import { APIError, StreamingNotSupportedError } from '../lib/streaming'; @@ -26,6 +26,7 @@ export interface Message { content: MessageContent; timestamp?: number; tool_calls?: any[]; + message_events?: MessageEvent[]; tool_call_id?: string; tool_outputs?: Array<{ tool_call_id?: string; @@ -382,9 +383,10 @@ export function useChat() { .filter(Boolean) .join('\n\n') : ''; + const hasMessageEvents = Array.isArray(msg.message_events) && msg.message_events.length > 0; const content = - msg.role === 'assistant' && reasoningText + msg.role === 'assistant' && reasoningText && !hasMessageEvents ? prependReasoningToContent(baseContent, reasoningText) : baseContent; @@ -394,6 +396,7 @@ export function useChat() { content, timestamp: new Date(msg.created_at).getTime(), tool_calls: msg.tool_calls, + message_events: msg.message_events, tool_outputs: msg.tool_outputs, reasoning_details: msg.reasoning_details ?? undefined, reasoning_tokens: msg.reasoning_tokens ?? undefined, diff --git a/frontend/lib/index.ts b/frontend/lib/index.ts index 65bf1859..7230851f 100644 --- a/frontend/lib/index.ts +++ b/frontend/lib/index.ts @@ -103,6 +103,7 @@ export type { FileProcessingState, FileUploadProgress, ChatMessage, + MessageEvent, ChatEvent, ChatResponse, ConversationMeta, diff --git a/frontend/lib/types.ts b/frontend/lib/types.ts index e1da8367..437398c0 100644 --- a/frontend/lib/types.ts +++ b/frontend/lib/types.ts @@ -203,6 +203,7 @@ export interface ChatMessage { // Local image attachments (used during composition, converted to content format for API) images?: ImageAttachment[]; tool_calls?: any[]; + message_events?: MessageEvent[]; tool_call_id?: string; tool_outputs?: Array<{ tool_call_id?: string; @@ -222,6 +223,16 @@ export interface ChatMessage { reasoning_tokens?: number | null; } +export interface MessageEvent { + seq: number; + type: 'content' | 'reasoning' | 'tool_call'; + payload: { + text?: string; + tool_call_id?: string | null; + tool_call_index?: number | null; + } | null; +} + export interface ChatEvent { type: 'text' | 'reasoning' | 'tool_call' | 'tool_output' | 'usage' | 'final'; value: any; @@ -306,6 +317,7 @@ export interface ConversationWithMessages { }; textOffset?: number; }>; + message_events?: MessageEvent[]; tool_outputs?: Array<{ tool_call_id: string; output: unknown; From c40a3e6fefcccaae8be76291a43010ac749bca8f Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Thu, 25 Dec 2025 12:15:43 +0700 Subject: [PATCH 2/2] fix lint --- frontend/components/MessageList.tsx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/frontend/components/MessageList.tsx b/frontend/components/MessageList.tsx index e5e2b824..1048b7db 100644 --- a/frontend/components/MessageList.tsx +++ b/frontend/components/MessageList.tsx @@ -116,9 +116,7 @@ function buildAssistantSegments(message: ChatMessage): AssistantSegment[] { const toolCallId = event.payload?.tool_call_id; const toolCallIndex = event.payload?.tool_call_index; const toolCall = - (toolCallId - ? toolCalls.find((call: any) => call?.id === toolCallId) - : undefined) || + (toolCallId ? toolCalls.find((call: any) => call?.id === toolCallId) : undefined) || (typeof toolCallIndex === 'number' ? toolCalls.find((call: any) => (call?.index ?? 0) === toolCallIndex) : undefined);