Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions backend/__tests__/db.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
retentionSweep,
resetDbCache,
} from '../src/db/index.js';
import { insertMessageEvents } from '../src/db/messageEvents.js';
import { createUser } from '../src/db/users.js';
import { config } from '../src/env.js';
import { safeTestSetup } from '../test_support/databaseSafety.js';
Expand Down Expand Up @@ -129,6 +130,32 @@ describe('DB helpers', () => {
const partial = getMessagesPage({ conversationId: 'conv', afterSeq: 2, limit: 5 });
assert.equal(partial.next_after_seq, null);
});

// Verifies that getMessagesPage attaches stored message_events rows to the
// message they belong to, with payloads parsed back into objects.
test('includes message_events when present', () => {
  const conversationId = 'conv';
  createConversation({ id: conversationId, sessionId, userId: testUser.id });

  // Seed one assistant message directly so its row id can be used below.
  const insertMessage = getDb().prepare(
    `INSERT INTO messages (conversation_id, role, status, content, seq)
VALUES (@cid, 'assistant', 'final', @content, @seq)`
  );
  const { lastInsertRowid } = insertMessage.run({
    cid: conversationId,
    content: 'Hello world',
    seq: 1,
  });

  // Two content events whose seq matches their position in the list.
  const events = ['Hello ', 'world'].map((text, seq) => ({
    seq,
    type: 'content',
    payload: { text },
  }));
  insertMessageEvents({ messageId: lastInsertRowid, conversationId, events });

  const { messages } = getMessagesPage({ conversationId, afterSeq: 0, limit: 5 });
  assert.equal(messages.length, 1);

  const storedEvents = messages[0].message_events;
  assert.ok(Array.isArray(storedEvents));
  assert.equal(storedEvents.length, 2);
  assert.equal(storedEvents[0].payload.text, 'Hello ');
  assert.equal(storedEvents[1].payload.text, 'world');
});
});

describe('retentionSweep', () => {
Expand Down
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"test": "NODE_OPTIONS=--experimental-vm-modules jest",
"lint": "eslint .",
"format": "prettier --write .",
"migrate": "node scripts/migrate.js"
"migrate": "node scripts/migrate.js",
"backfill:message-events": "node scripts/backfill-message-events.js"
},
"dependencies": {
"@blackglory/better-sqlite3-migrations": "^0.1.20",
Expand Down
184 changes: 184 additions & 0 deletions backend/scripts/backfill-message-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import { getDb } from '../src/db/client.js';
import { insertMessageEvents } from '../src/db/messageEvents.js';
import { logger } from '../src/logger.js';

/**
 * Concatenates the textual pieces of a mixed-content array.
 *
 * String parts are used as-is. Object parts contribute the first of their
 * `text`, `value` or `content` properties that is a string. Falsy parts and
 * anything else are ignored.
 *
 * @param {unknown} content - Candidate content; anything but an array yields ''.
 * @returns {string} The pieces joined without a separator.
 */
function extractTextFromMixedContent(content) {
  if (!Array.isArray(content)) return '';

  // Returns the text carried by one part, or null when it carries none.
  const textOf = (part) => {
    if (!part) return null;
    if (typeof part === 'string') return part;
    if (typeof part !== 'object') return null;
    for (const key of ['text', 'value', 'content']) {
      if (typeof part[key] === 'string') return part[key];
    }
    return null;
  };

  return content
    .map(textOf)
    .filter((piece) => piece !== null)
    .join('');
}

/**
 * Pulls human-readable reasoning text out of a JSON-encoded value.
 *
 * - A JSON array yields the trimmed, non-empty `text` fields of its entries,
 *   joined by a blank line.
 * - A JSON object with a string `text` yields that text, trimmed.
 * - Anything else (empty input, invalid JSON, other shapes) yields ''.
 *
 * @param {string|null|undefined} raw - JSON text to decode.
 * @returns {string} Extracted reasoning text, possibly empty.
 */
function extractReasoningText(raw) {
  if (!raw) return '';

  let decoded;
  try {
    decoded = JSON.parse(raw);
  } catch {
    // Unparseable input is treated the same as having no reasoning.
    return '';
  }

  if (Array.isArray(decoded)) {
    const paragraphs = [];
    for (const detail of decoded) {
      const text = typeof detail?.text === 'string' ? detail.text.trim() : '';
      if (text) paragraphs.push(text);
    }
    return paragraphs.join('\n\n');
  }

  const hasText = decoded && typeof decoded === 'object' && typeof decoded.text === 'string';
  return hasText ? decoded.text.trim() : '';
}

/**
 * Reconstructs an ordered event list for one assistant message.
 *
 * Order of the result:
 *   1. one `reasoning` event, if reasoning text exists and the content does
 *      not already contain a `<thinking>` tag;
 *   2. `content` chunks interleaved with `tool_call` events for every tool
 *      call that has a finite `text_offset` (ordered by offset, then index);
 *   3. any content remaining after the last offset;
 *   4. `tool_call` events for calls without an offset (ordered by index).
 *
 * @param {object} args
 * @param {string} args.contentText - Full message text.
 * @param {string} args.reasoningText - Reasoning text, '' when absent.
 * @param {Array<{id: *, call_index: *, text_offset: *}>} args.toolCalls
 * @returns {Array<{type: string, payload: object}>} Events without `seq`.
 */
function buildEvents({ contentText, reasoningText, toolCalls }) {
  const timeline = [];
  const mentionsThinking = contentText.includes('<thinking>');

  const emitContent = (text) => {
    if (text) timeline.push({ type: 'content', payload: { text } });
  };
  const emitToolCall = (call) => {
    timeline.push({
      type: 'tool_call',
      payload: { tool_call_id: call.id, tool_call_index: call.index ?? 0 },
    });
  };

  if (reasoningText && !mentionsThinking) {
    timeline.push({ type: 'reasoning', payload: { text: reasoningText } });
  }

  // Split calls by whether they carry a usable position in the text.
  const positioned = [];
  const unpositioned = [];
  for (const call of toolCalls) {
    const entry = {
      id: call.id,
      index: call.call_index,
      offset: Number.isFinite(call.text_offset) ? call.text_offset : null,
    };
    (entry.offset == null ? unpositioned : positioned).push(entry);
  }

  const compareIndex = (a, b) => (a.index ?? 0) - (b.index ?? 0);
  positioned.sort((a, b) => (a.offset !== b.offset ? a.offset - b.offset : compareIndex(a, b)));
  unpositioned.sort(compareIndex);

  // Walk the text, cutting a content chunk before each positioned call.
  let consumed = 0;
  for (const call of positioned) {
    // Clamp so out-of-range offsets cannot slice outside the text.
    const boundary = Math.max(0, Math.min(call.offset, contentText.length));
    if (boundary > consumed) {
      emitContent(contentText.slice(consumed, boundary));
      consumed = boundary;
    }
    emitToolCall(call);
  }

  if (consumed < contentText.length) {
    emitContent(contentText.slice(consumed));
  }

  for (const call of unpositioned) {
    emitToolCall(call);
  }

  if (timeline.length === 0 && contentText) {
    timeline.push({ type: 'content', payload: { text: contentText } });
  }

  return timeline;
}

/**
 * Reads the script's command-line options from `process.argv`.
 *
 * Recognised flags:
 *   --dry-run     build events but do not write them
 *   --limit=<n>   numeric cap on the number of messages to process
 *
 * @returns {{dryRun: boolean, limit: number|null}} `limit` is null when the
 *   flag is absent or its value is not a finite number.
 */
function parseArgs() {
  const cliArgs = process.argv.slice(2);
  const limitFlag = process.argv.find((arg) => arg.startsWith('--limit='));

  let limit = null;
  if (limitFlag) {
    // Only the text between the first and second '=' is considered.
    const parsed = Number(limitFlag.split('=')[1]);
    if (Number.isFinite(parsed)) limit = parsed;
  }

  return { dryRun: cliArgs.includes('--dry-run'), limit };
}

/**
 * Backfills `message_events` for assistant messages that have none yet.
 *
 * Selects assistant messages with status 'final' or 'error' lacking any
 * event row (oldest id first, optionally capped by --limit), rebuilds an
 * event list for each from its content, reasoning details and tool calls,
 * and stores it unless --dry-run was given. A summary is logged at the end.
 */
function main() {
  const { dryRun, limit } = parseArgs();
  const db = getDb();

  // A falsy limit (null or 0) means the query runs without a LIMIT clause.
  const limitClause = limit ? 'LIMIT @limit' : '';
  const pendingMessages = db
    .prepare(
      `SELECT m.id, m.conversation_id, m.content, m.content_json, m.reasoning_details
FROM messages m
WHERE m.role = 'assistant'
AND m.status IN ('final', 'error')
AND NOT EXISTS (
SELECT 1 FROM message_events e WHERE e.message_id = m.id
)
ORDER BY m.id ASC
${limitClause}`
    )
    .all(limit ? { limit } : {});

  const toolCallsStmt = db.prepare(
    `SELECT id, call_index, text_offset
FROM tool_calls
WHERE message_id = @messageId
ORDER BY call_index ASC`
  );

  // Prefers text extracted from content_json; falls back to the plain
  // content column when content_json is absent or not valid JSON.
  const resolveContentText = (row) => {
    const plainText = typeof row.content === 'string' ? row.content : '';
    if (!row.content_json) return plainText;
    try {
      return extractTextFromMixedContent(JSON.parse(row.content_json));
    } catch {
      return plainText;
    }
  };

  // Counts messages for which events were built (also counted in dry runs).
  let inserted = 0;

  for (const row of pendingMessages) {
    const events = buildEvents({
      contentText: resolveContentText(row),
      reasoningText: extractReasoningText(row.reasoning_details),
      toolCalls: toolCallsStmt.all({ messageId: row.id }),
    });

    if (events.length === 0) continue;
    inserted += 1;

    if (dryRun) continue;

    // seq is the event's position in the rebuilt list.
    const numberedEvents = events.map((event, index) => ({ ...event, seq: index }));
    insertMessageEvents({
      messageId: row.id,
      conversationId: row.conversation_id,
      events: numberedEvents,
    });
  }

  logger.info('[backfill-message-events] completed', {
    processed: pendingMessages.length,
    inserted,
    dryRun,
  });
}

// Run the backfill as soon as the script is loaded.
main();
98 changes: 98 additions & 0 deletions backend/src/db/messageEvents.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { getDb } from './client.js';
import { logger } from '../logger.js';

// Connections on which the message_events table has been confirmed to exist.
// Keyed weakly by connection object so that a replaced or reset database
// handle is probed again instead of inheriting another connection's answer.
const connectionsWithMessageEvents = new WeakSet();

/**
 * Reports whether the `message_events` table exists on the given connection.
 *
 * Only a positive answer is remembered. A missing table or a failed probe is
 * re-checked on the next call, so a table created later (e.g. by a migration
 * that runs after the first call) is picked up without restarting the
 * process.
 *
 * @param {object} db - Database connection exposing `prepare(sql).get()`.
 * @returns {boolean} true when the table exists; false when it is missing or
 *   the lookup failed (the failure is logged as a warning).
 */
function isMessageEventsTableAvailable(db) {
  if (connectionsWithMessageEvents.has(db)) return true;
  try {
    const row = db.prepare(
      `SELECT name FROM sqlite_master WHERE type='table' AND name='message_events'`
    ).get();
    if (!row?.name) return false;
    connectionsWithMessageEvents.add(db);
    return true;
  } catch (error) {
    logger.warn('[messageEvents] Failed to check table availability', error);
    return false;
  }
}

/**
 * Converts an event payload to the JSON text stored in the `payload` column.
 *
 * @param {*} payload - Any value; `null`/`undefined` mean "no payload".
 * @returns {string|null} JSON text, or null when there is no payload, the
 *   value has no JSON representation, or serialization throws (the failure
 *   is logged as a warning).
 */
function serializePayload(payload) {
  if (payload == null) return null;
  try {
    // JSON.stringify returns undefined (not a string) for functions and
    // symbols; normalise that to null so callers always get string-or-null.
    return JSON.stringify(payload) ?? null;
  } catch (error) {
    logger.warn('[messageEvents] Failed to serialize payload', error);
    return null;
  }
}

/**
 * Decodes the JSON text stored in an event's `payload` column.
 *
 * @param {string|null|undefined} raw - Stored payload text.
 * @param {*} eventId - Event identifier, used only in the warning message.
 * @returns {*} The decoded value, or null when there is no payload or the
 *   text is not valid JSON (a warning is logged in that case).
 */
function parsePayload(raw, eventId) {
  if (raw === null || raw === undefined) return null;

  let decoded = null;
  try {
    decoded = JSON.parse(raw);
  } catch (error) {
    logger.warn(`[messageEvents] Failed to parse payload for event ${eventId}`, error);
  }
  return decoded;
}

/**
 * Stores the ordered events of one message in `message_events`.
 *
 * All rows are written inside a single transaction and share one
 * `created_at` timestamp. The call is a no-op when any identifier is
 * missing, `events` is not a non-empty array, or the table is unavailable.
 *
 * @param {object} args
 * @param {number|bigint} args.messageId - Id of the owning message row.
 * @param {string} args.conversationId - Id of the owning conversation.
 * @param {Array<{seq?: number, type: string, payload?: *}>} args.events -
 *   Events in display order. An event without a numeric `seq` is given its
 *   position in this array, so ordering survives when callers omit `seq`.
 * @returns {void}
 */
export function insertMessageEvents({ messageId, conversationId, events }) {
  if (!messageId || !conversationId || !Array.isArray(events) || events.length === 0) return;
  const db = getDb();
  if (!isMessageEventsTableAvailable(db)) return;

  const now = new Date().toISOString();
  const rows = events.map((event, index) => ({
    messageId,
    conversationId,
    // Falling back to a constant would give every unnumbered event the same
    // seq and lose their order on read (rows are sorted by seq).
    seq: typeof event.seq === 'number' ? event.seq : index,
    type: event.type,
    payload: serializePayload(event.payload),
    now,
  }));

  const stmt = db.prepare(
    `INSERT INTO message_events (message_id, conversation_id, seq, type, payload, created_at)
VALUES (@messageId, @conversationId, @seq, @type, @payload, @now)`
  );

  // One transaction: either every event of the message is stored or none.
  const insertMany = db.transaction((batch) => {
    for (const row of batch) {
      stmt.run(row);
    }
  });

  insertMany(rows);
}

/**
 * Loads the stored events for a set of messages.
 *
 * @param {Array<number|bigint>} messageIds - Message row ids to look up.
 * @returns {Object<string, Array<{id: *, seq: *, type: *, payload: *, created_at: *}>>}
 *   Events grouped by message id, each group ordered by `seq`. Messages
 *   without events have no key. An empty object is returned when no ids are
 *   given or the `message_events` table is unavailable.
 */
export function getMessageEventsByMessageIds(messageIds) {
  const hasIds = Array.isArray(messageIds) && messageIds.length > 0;
  if (!hasIds) return {};

  const db = getDb();
  if (!isMessageEventsTableAvailable(db)) return {};

  // One positional placeholder per id keeps the values parameterised.
  const placeholders = messageIds.map(() => '?').join(',');
  const query = db.prepare(
    `SELECT id, message_id, conversation_id, seq, type, payload, created_at
FROM message_events
WHERE message_id IN (${placeholders})
ORDER BY message_id ASC, seq ASC`
  );
  const rows = query.all(...messageIds);

  const grouped = {};
  for (const { id, message_id: messageId, seq, type, payload, created_at: createdAt } of rows) {
    const event = {
      id,
      seq,
      type,
      payload: parsePayload(payload, id),
      created_at: createdAt,
    };
    if (grouped[messageId]) {
      grouped[messageId].push(event);
    } else {
      grouped[messageId] = [event];
    }
  }

  return grouped;
}
10 changes: 10 additions & 0 deletions backend/src/db/messages.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { getDb } from './client.js';
import { logger } from '../logger.js';
import { getMessageEventsByMessageIds } from './messageEvents.js';

function extractTextFromMixedContent(content) {
if (!Array.isArray(content)) return '';
Expand Down Expand Up @@ -306,6 +307,7 @@ export function getMessagesPage({ conversationId, afterSeq = 0, limit = 50 }) {
if (messages.length > 0) {
const messageIds = messages.map(m => m.id);
const placeholders = messageIds.map(() => '?').join(',');
const messageEventsByMessage = getMessageEventsByMessageIds(messageIds);

// Get all tool calls for these messages
const toolCalls = db
Expand Down Expand Up @@ -362,6 +364,9 @@ export function getMessagesPage({ conversationId, afterSeq = 0, limit = 50 }) {

// Attach tool calls and outputs to messages (still using integer IDs)
for (const message of messages) {
if (messageEventsByMessage[message.id]) {
message.message_events = messageEventsByMessage[message.id];
}
if (toolCallsByMessage[message.id]) {
message.tool_calls = toolCallsByMessage[message.id];
}
Expand Down Expand Up @@ -455,6 +460,11 @@ export function getLastMessage({ conversationId }) {
)
.all({ messageId: integerMessageId });

const messageEventsByMessage = getMessageEventsByMessageIds([integerMessageId]);
if (messageEventsByMessage[integerMessageId]) {
message.message_events = messageEventsByMessage[integerMessageId];
}

// Transform tool calls to OpenAI format
if (toolCalls.length > 0) {
message.tool_calls = toolCalls.map(tc => ({
Expand Down
25 changes: 25 additions & 0 deletions backend/src/db/migrations/022-message-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
 * Migration 22: adds the `message_events` table.
 *
 * Each row is one event of a message, positioned by `seq`, with a `type` and
 * an optional text `payload`. Both foreign keys (to `messages` and to
 * `conversations`) are declared with ON DELETE CASCADE. Two indexes cover
 * lookups by (message_id, seq) and by (conversation_id, seq).
 */
export default {
// Schema version this migration brings the database to.
version: 22,
// Forward step: create the table and its indexes (idempotent via IF NOT EXISTS).
up: `
-- Create message_events table to capture ordered assistant events
CREATE TABLE IF NOT EXISTS message_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER NOT NULL,
conversation_id TEXT NOT NULL,
seq INTEGER NOT NULL,
type TEXT NOT NULL,
payload TEXT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(message_id) REFERENCES messages(id) ON DELETE CASCADE,
FOREIGN KEY(conversation_id) REFERENCES conversations(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_message_events_message_id ON message_events(message_id, seq);
CREATE INDEX IF NOT EXISTS idx_message_events_conversation_id ON message_events(conversation_id, seq);
`,
// Rollback step: drop both indexes, then the table itself.
down: `
DROP INDEX IF EXISTS idx_message_events_conversation_id;
DROP INDEX IF EXISTS idx_message_events_message_id;
DROP TABLE IF EXISTS message_events;
`
};
Loading
Loading