diff --git a/src/api/handlers.ts b/src/api/handlers.ts index 532a5466..f4f2351f 100644 --- a/src/api/handlers.ts +++ b/src/api/handlers.ts @@ -319,8 +319,7 @@ export async function handleChatStream( event.type === "tool.preparing.done" || event.type === "tool.start" || event.type === "tool.done" || - event.type === "llm.done" || - event.type === "data.changed" + event.type === "llm.done" ) { if (event.type === "chat.start") { broadcastUserMessageOnce(); diff --git a/src/conversation/auto-title.ts b/src/conversation/auto-title.ts index cdc110e0..c11950d5 100644 --- a/src/conversation/auto-title.ts +++ b/src/conversation/auto-title.ts @@ -1,5 +1,8 @@ import type { LanguageModelV3 } from "@ai-sdk/provider"; +/** Hard cap so a hung fast-model call can't leave the caller's promise pending. */ +const TITLE_TIMEOUT_MS = 10_000; + /** * Generate a short conversation title using the provided model. * Non-blocking — call fire-and-forget after first turn. @@ -30,6 +33,7 @@ export async function generateTitle( }, ], maxOutputTokens: 30, + abortSignal: AbortSignal.timeout(TITLE_TIMEOUT_MS), }); const textBlock = result.content.find((b) => b.type === "text"); if (textBlock?.type === "text") { diff --git a/src/runtime/runtime.ts b/src/runtime/runtime.ts index 76d3c749..f6819122 100644 --- a/src/runtime/runtime.ts +++ b/src/runtime/runtime.ts @@ -165,6 +165,24 @@ class MultiEventSink implements EventSink { } } +/** + * Wraps a sink so a throw in `emit` is logged instead of propagating. Used + * only at the `defaultEvents` fan-out (`buildEventSink`) — server.ts wraps + * defaultEvents.emit to chain the SSE broadcast after the log writer; an + * unhandled throw in the log writer would otherwise abort the broadcast. + * Engine-time sink chains stay loud (a throwing engine sink is a bug). + */ +class FaultIsolatedSink implements EventSink { + constructor(private inner: EventSink) {} + emit(event: EngineEvent): void { + try { + this.inner.emit(event); + } catch (err) { + console.error("[runtime] event sink threw on emit:", err); + } + } +} + /** * Tracks parent engine run state for delegate context. * Listens to engine events to maintain current runId and iteration count. @@ -1467,22 +1485,21 @@ export class Runtime { }; engineConfig.toolPromotion = this.buildToolPromotionFactory(); - // Emit chat.start so the client knows the conversation ID immediately - // and conversation list UIs can refresh + // Emit chat.start so the client knows the conversation ID immediately. if (requestSink) { requestSink.emit({ type: "chat.start", data: { conversationId: conversation.id }, }); - // Notify conversation browser UIs that a new conversation exists - if (!request.conversationId) { - requestSink.emit({ - type: "data.changed", - data: { server: "conversations", tool: "list" }, - }); - } } + // Surface a new conversation the moment its file exists — the user + // message is already on disk by this point, so the list shows the + // conversation with the message preview as its label. Once title + // generation settles, the title-block `.finally` fires a second + // broadcast that flips the label to the generated title (#155). + if (!request.conversationId) this.notifyConversationsChanged(); + const result = await runWithRequestContext(reqCtx, () => engine.run(engineConfig, systemPrompt, messages, tools), ); @@ -1523,6 +1540,10 @@ export class Runtime { // chaining, a `void store.update(...)` orphan rejection (e.g. ENOENT when // the conversation was deleted between chat() returning and the title // landing) surfaces as an unhandled rejection and fails the whole run. + // + // The post-turn broadcast above already surfaced this conversation in the + // list (labelled with its message preview); the `.finally` below + // broadcasts again so the label flips to the generated title (#155). if (conversation.title === null) { const titleModel = this.resolveModelFn(this.getModelSlot("fast")); const titleInput = @@ -1534,8 +1555,9 @@ export class Runtime { // Title generation is best-effort; a failed write must not crash // the chat. Common causes: model latency timeout (generateTitle), // or ENOENT on the conversation file (deleted concurrently). - console.error("[runtime] title generation failed:", err); - }); + console.error("[runtime] title generation or persist failed:", err); + }) + .finally(() => this.notifyConversationsChanged()); } return { @@ -2307,6 +2329,20 @@ export class Runtime { return this._bundleMcpDepsFactory?.(wsId); } + /** + * Broadcast a conversations-list refresh on the runtime's default sink — + * the one api/server.ts wraps to drive the `/v1/events` SSE broadcast that + * `useDataSync` consumes. The per-request chat sink never reaches that + * channel, so a conversation created mid-chat would otherwise stay absent + * from the list until a manual refresh (#155). + */ + private notifyConversationsChanged(): void { + this.defaultEvents.emit({ + type: "data.changed", + data: { server: "conversations", tool: "list" }, + }); + } + /** * Get a per-workdir `InstructionsStore` for the org / workspace overlays. * Per-bundle instructions are NOT stored here — bundles own their storage @@ -3396,7 +3432,11 @@ function buildEventSink(config: RuntimeConfig): { logLevel: config.logging?.level ?? "normal", }); } - const events: EventSink = sinks.length > 0 ? new MultiEventSink(sinks) : new NoopEventSink(); + // Isolate each sink in the default fan-out: a logging throw must not abort + // later sinks in the chain (notably the SSE broadcast wrap in api/server.ts). + const isolated = sinks.map((s) => new FaultIsolatedSink(s)); + const events: EventSink = + isolated.length > 0 ? new MultiEventSink(isolated) : new NoopEventSink(); return { events, eventStore }; } diff --git a/test/integration/runtime/chat-start-event.test.ts b/test/integration/runtime/chat-start-event.test.ts index c46fa7b1..e7462b17 100644 --- a/test/integration/runtime/chat-start-event.test.ts +++ b/test/integration/runtime/chat-start-event.test.ts @@ -40,7 +40,7 @@ describe("chat.start event", () => { await runtime.shutdown(); }); - it("emits data.changed when a new conversation is created", async () => { + it("emits data.changed on the default sink when a new conversation is created", async () => { const workDir = join(testDir, "chat-start-new-conv"); mkdirSync(workDir, { recursive: true }); @@ -51,24 +51,50 @@ describe("chat.start event", () => { }); await provisionTestWorkspace(runtime); - const events: EngineEvent[] = []; - const sink: EventSink = { emit: (e) => events.push(e) }; - - // No conversationId provided — triggers new conversation creation - await runtime.chat({ message: "Hello", workspaceId: TEST_WORKSPACE_ID }, sink); - - const dataChangedEvents = events.filter((e) => e.type === "data.changed"); - expect(dataChangedEvents.length).toBeGreaterThanOrEqual(1); - - const convListChange = dataChangedEvents.find( - (e) => e.data.server === "conversations" && e.data.tool === "list", - ); - expect(convListChange).toBeDefined(); - - await runtime.shutdown(); + // Capture events on the runtime's default sink — the one api/server.ts + // wraps to drive the `/v1/events` SSE broadcast (`useDataSync`). A + // data.changed that only reaches the per-request chat sink never gets + // there, so conversation-list iframes stay stale (issue #155). + const sseEvents: EngineEvent[] = []; + const defaultSink = runtime.getEventSink(); + const origEmit = defaultSink.emit.bind(defaultSink); + defaultSink.emit = (e) => { + sseEvents.push(e); + origEmit(e); + }; + + try { + // No conversationId provided — triggers new conversation creation. + await runtime.chat({ message: "Hello", workspaceId: TEST_WORKSPACE_ID }); + + const isConvListChange = (e: EngineEvent) => + e.type === "data.changed" && + e.data.server === "conversations" && + e.data.tool === "list"; + + // Two broadcasts per new conversation: one in the post-turn `finally` + // (surfaces it immediately, labelled with the message preview) and + // one after fire-and-forget title generation settles (updates the + // label to the generated title). + const deadline = Date.now() + 5000; + while (Date.now() < deadline && sseEvents.filter(isConvListChange).length < 2) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } + // Let any further (regression) broadcasts land before asserting the count. + await new Promise((resolve) => setTimeout(resolve, 400)); + + // >=2 not ===2: the assertion guards against the conversation-list + // signal regressing to nothing, but stays robust to unrelated + // data.changed emits (e.g. non-nb tool calls fanning out via + // api/server.ts) that may legitimately arrive during a real turn. + expect(sseEvents.filter(isConvListChange).length).toBeGreaterThanOrEqual(2); + } finally { + defaultSink.emit = origEmit; + await runtime.shutdown(); + } }); - it("does NOT emit data.changed when resuming an existing conversation", async () => { + it("does NOT emit data.changed on the default sink when resuming an existing conversation", async () => { const workDir = join(testDir, "chat-start-resume"); mkdirSync(workDir, { recursive: true }); @@ -79,39 +105,69 @@ describe("chat.start event", () => { }); await provisionTestWorkspace(runtime); - // First chat — creates a new conversation - const first = await runtime.chat({ - message: "First message", - workspaceId: TEST_WORKSPACE_ID, - }); - - // Second chat — resume existing conversation, capture events - const events: EngineEvent[] = []; - const sink: EventSink = { emit: (e) => events.push(e) }; + // Wrap before the first chat so the title-gen broadcast is observable. + // A fixed pre-second-chat sleep would race the first turn's async title + // generation under CI load — poll for both first-turn broadcasts instead. + const sseEvents: EngineEvent[] = []; + const defaultSink = runtime.getEventSink(); + const origEmit = defaultSink.emit.bind(defaultSink); + defaultSink.emit = (e) => { + sseEvents.push(e); + origEmit(e); + }; + + try { + const isConvListChange = (e: EngineEvent) => + e.type === "data.changed" && + e.data.server === "conversations" && + e.data.tool === "list"; - await runtime.chat( - { + // First chat — creates a new conversation; emits twice (post-turn + post-title). + const first = await runtime.chat({ + message: "First message", + workspaceId: TEST_WORKSPACE_ID, + }); + + const deadline = Date.now() + 5000; + while (Date.now() < deadline && sseEvents.filter(isConvListChange).length < 2) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } + expect(sseEvents.filter(isConvListChange).length).toBeGreaterThanOrEqual(2); + + // Quiesce: the first turn's title-gen `.finally` broadcast is + // fire-and-forget, so it can still land just after the count hits 2. + // Wait for the conversations-list count to stop moving before the + // resume, otherwise a straggler from chat #1 races chat #2 and makes + // this assertion flaky. + let prevCount = -1; + let stableCount = sseEvents.filter(isConvListChange).length; + const settleDeadline = Date.now() + 5000; + while (Date.now() < settleDeadline && stableCount !== prevCount) { + prevCount = stableCount; + await new Promise((resolve) => setTimeout(resolve, 300)); + stableCount = sseEvents.filter(isConvListChange).length; + } + + // Reset the capture window so the resume's emissions are measured in + // isolation from chat #1's (now-settled) broadcasts. + sseEvents.length = 0; + + // Second chat — resume existing conversation. Not new, title set → + // neither the post-turn nor the title-gen path should fire, so no + // conversations-list change should reach the default sink. + await runtime.chat({ message: "Second message", conversationId: first.conversationId, workspaceId: TEST_WORKSPACE_ID, - }, - sink, - ); + }); - // chat.start should still be emitted - const chatStartEvents = events.filter((e) => e.type === "chat.start"); - expect(chatStartEvents).toHaveLength(1); - expect(chatStartEvents[0]!.data.conversationId).toBe(first.conversationId); - - // data.changed with server=conversations should NOT be emitted - const convListChange = events.filter( - (e) => - e.type === "data.changed" && - e.data.server === "conversations" && - e.data.tool === "list", - ); - expect(convListChange).toHaveLength(0); + // Allow any straggler broadcast to land before asserting none fired. + await new Promise((resolve) => setTimeout(resolve, 600)); - await runtime.shutdown(); + expect(sseEvents.filter(isConvListChange).length).toBe(0); + } finally { + defaultSink.emit = origEmit; + await runtime.shutdown(); + } }); });