Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/api/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ export async function handleChatStream(
event.type === "tool.preparing.done" ||
event.type === "tool.start" ||
event.type === "tool.done" ||
event.type === "llm.done" ||
event.type === "data.changed"
event.type === "llm.done"
) {
if (event.type === "chat.start") {
broadcastUserMessageOnce();
Expand Down
4 changes: 4 additions & 0 deletions src/conversation/auto-title.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import type { LanguageModelV3 } from "@ai-sdk/provider";

/** Hard cap so a hung fast-model call can't leave the caller's promise pending. */
const TITLE_TIMEOUT_MS = 10_000;

/**
* Generate a short conversation title using the provided model.
* Non-blocking — call fire-and-forget after first turn.
Expand Down Expand Up @@ -30,6 +33,7 @@ export async function generateTitle(
},
],
maxOutputTokens: 30,
abortSignal: AbortSignal.timeout(TITLE_TIMEOUT_MS),
});
const textBlock = result.content.find((b) => b.type === "text");
if (textBlock?.type === "text") {
Expand Down
64 changes: 52 additions & 12 deletions src/runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,24 @@ class MultiEventSink implements EventSink {
}
}

/**
* Wraps a sink so a throw in `emit` is logged instead of propagating. Used
* only at the `defaultEvents` fan-out (`buildEventSink`) — server.ts wraps
* defaultEvents.emit to chain the SSE broadcast after the log writer; an
* unhandled throw in the log writer would otherwise abort the broadcast.
* Engine-time sink chains stay loud (a throwing engine sink is a bug).
*/
class FaultIsolatedSink implements EventSink {
constructor(private inner: EventSink) {}
emit(event: EngineEvent): void {
try {
this.inner.emit(event);
} catch (err) {
console.error("[runtime] event sink threw on emit:", err);
}
}
}

/**
* Tracks parent engine run state for delegate context.
* Listens to engine events to maintain current runId and iteration count.
Expand Down Expand Up @@ -1467,22 +1485,21 @@ export class Runtime {
};
engineConfig.toolPromotion = this.buildToolPromotionFactory();

// Emit chat.start so the client knows the conversation ID immediately
// and conversation list UIs can refresh
// Emit chat.start so the client knows the conversation ID immediately.
if (requestSink) {
requestSink.emit({
type: "chat.start",
data: { conversationId: conversation.id },
});
// Notify conversation browser UIs that a new conversation exists
if (!request.conversationId) {
requestSink.emit({
type: "data.changed",
data: { server: "conversations", tool: "list" },
});
}
}

// Surface a new conversation the moment its file exists — the user
// message is already on disk by this point, so the list shows the
// conversation with the message preview as its label. Once title
// generation settles, the title-block `.finally` fires a second
// broadcast that flips the label to the generated title (#155).
if (!request.conversationId) this.notifyConversationsChanged();

const result = await runWithRequestContext(reqCtx, () =>
engine.run(engineConfig, systemPrompt, messages, tools),
);
Expand Down Expand Up @@ -1523,6 +1540,10 @@ export class Runtime {
// chaining, a `void store.update(...)` orphan rejection (e.g. ENOENT when
// the conversation was deleted between chat() returning and the title
// landing) surfaces as an unhandled rejection and fails the whole run.
//
// The post-turn broadcast above already surfaced this conversation in the
// list (labelled with its message preview); the `.finally` below
// broadcasts again so the label flips to the generated title (#155).
if (conversation.title === null) {
const titleModel = this.resolveModelFn(this.getModelSlot("fast"));
const titleInput =
Expand All @@ -1534,8 +1555,9 @@ export class Runtime {
// Title generation is best-effort; a failed write must not crash
// the chat. Common causes: model latency timeout (generateTitle),
// or ENOENT on the conversation file (deleted concurrently).
console.error("[runtime] title generation failed:", err);
});
console.error("[runtime] title generation or persist failed:", err);
})
.finally(() => this.notifyConversationsChanged());
}

return {
Expand Down Expand Up @@ -2307,6 +2329,20 @@ export class Runtime {
return this._bundleMcpDepsFactory?.(wsId);
}

/**
* Broadcast a conversations-list refresh on the runtime's default sink —
* the one api/server.ts wraps to drive the `/v1/events` SSE broadcast that
* `useDataSync` consumes. The per-request chat sink never reaches that
* channel, so a conversation created mid-chat would otherwise stay absent
* from the list until a manual refresh (#155).
*/
private notifyConversationsChanged(): void {
this.defaultEvents.emit({
type: "data.changed",
data: { server: "conversations", tool: "list" },
});
}

/**
* Get a per-workdir `InstructionsStore` for the org / workspace overlays.
* Per-bundle instructions are NOT stored here — bundles own their storage
Expand Down Expand Up @@ -3396,7 +3432,11 @@ function buildEventSink(config: RuntimeConfig): {
logLevel: config.logging?.level ?? "normal",
});
}
const events: EventSink = sinks.length > 0 ? new MultiEventSink(sinks) : new NoopEventSink();
// Isolate each sink in the default fan-out: a logging throw must not abort
// later sinks in the chain (notably the SSE broadcast wrap in api/server.ts).
const isolated = sinks.map((s) => new FaultIsolatedSink(s));
const events: EventSink =
isolated.length > 0 ? new MultiEventSink(isolated) : new NoopEventSink();
return { events, eventStore };
}

Expand Down
146 changes: 101 additions & 45 deletions test/integration/runtime/chat-start-event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe("chat.start event", () => {
await runtime.shutdown();
});

it("emits data.changed when a new conversation is created", async () => {
it("emits data.changed on the default sink when a new conversation is created", async () => {
const workDir = join(testDir, "chat-start-new-conv");
mkdirSync(workDir, { recursive: true });

Expand All @@ -51,24 +51,50 @@ describe("chat.start event", () => {
});
await provisionTestWorkspace(runtime);

const events: EngineEvent[] = [];
const sink: EventSink = { emit: (e) => events.push(e) };

// No conversationId provided — triggers new conversation creation
await runtime.chat({ message: "Hello", workspaceId: TEST_WORKSPACE_ID }, sink);

const dataChangedEvents = events.filter((e) => e.type === "data.changed");
expect(dataChangedEvents.length).toBeGreaterThanOrEqual(1);

const convListChange = dataChangedEvents.find(
(e) => e.data.server === "conversations" && e.data.tool === "list",
);
expect(convListChange).toBeDefined();

await runtime.shutdown();
// Capture events on the runtime's default sink — the one api/server.ts
// wraps to drive the `/v1/events` SSE broadcast (`useDataSync`). A
// data.changed that only reaches the per-request chat sink never gets
// there, so conversation-list iframes stay stale (issue #155).
const sseEvents: EngineEvent[] = [];
const defaultSink = runtime.getEventSink();
const origEmit = defaultSink.emit.bind(defaultSink);
defaultSink.emit = (e) => {
sseEvents.push(e);
origEmit(e);
};

try {
// No conversationId provided — triggers new conversation creation.
await runtime.chat({ message: "Hello", workspaceId: TEST_WORKSPACE_ID });

const isConvListChange = (e: EngineEvent) =>
e.type === "data.changed" &&
e.data.server === "conversations" &&
e.data.tool === "list";

// Two broadcasts per new conversation: one in the post-turn `finally`
// (surfaces it immediately, labelled with the message preview) and
// one after fire-and-forget title generation settles (updates the
// label to the generated title).
const deadline = Date.now() + 5000;
while (Date.now() < deadline && sseEvents.filter(isConvListChange).length < 2) {
await new Promise((resolve) => setTimeout(resolve, 50));
}
// Let any further (regression) broadcasts land before asserting the count.
await new Promise((resolve) => setTimeout(resolve, 400));

// >=2 not ===2: the assertion guards against the conversation-list
// signal regressing to nothing, but stays robust to unrelated
// data.changed emits (e.g. non-nb tool calls fanning out via
// api/server.ts) that may legitimately arrive during a real turn.
expect(sseEvents.filter(isConvListChange).length).toBeGreaterThanOrEqual(2);
} finally {
defaultSink.emit = origEmit;
await runtime.shutdown();
}
});

it("does NOT emit data.changed when resuming an existing conversation", async () => {
it("does NOT emit data.changed on the default sink when resuming an existing conversation", async () => {
const workDir = join(testDir, "chat-start-resume");
mkdirSync(workDir, { recursive: true });

Expand All @@ -79,39 +105,69 @@ describe("chat.start event", () => {
});
await provisionTestWorkspace(runtime);

// First chat — creates a new conversation
const first = await runtime.chat({
message: "First message",
workspaceId: TEST_WORKSPACE_ID,
});

// Second chat — resume existing conversation, capture events
const events: EngineEvent[] = [];
const sink: EventSink = { emit: (e) => events.push(e) };
// Wrap before the first chat so the title-gen broadcast is observable.
// A fixed pre-second-chat sleep would race the first turn's async title
// generation under CI load — poll for both first-turn broadcasts instead.
const sseEvents: EngineEvent[] = [];
const defaultSink = runtime.getEventSink();
const origEmit = defaultSink.emit.bind(defaultSink);
defaultSink.emit = (e) => {
sseEvents.push(e);
origEmit(e);
};

try {
const isConvListChange = (e: EngineEvent) =>
e.type === "data.changed" &&
e.data.server === "conversations" &&
e.data.tool === "list";

await runtime.chat(
{
// First chat — creates a new conversation; emits twice (post-turn + post-title).
const first = await runtime.chat({
message: "First message",
workspaceId: TEST_WORKSPACE_ID,
});

const deadline = Date.now() + 5000;
while (Date.now() < deadline && sseEvents.filter(isConvListChange).length < 2) {
await new Promise((resolve) => setTimeout(resolve, 50));
}
expect(sseEvents.filter(isConvListChange).length).toBeGreaterThanOrEqual(2);

// Quiesce: the first turn's title-gen `.finally` broadcast is
// fire-and-forget, so it can still land just after the count hits 2.
// Wait for the conversations-list count to stop moving before the
// resume, otherwise a straggler from chat #1 races chat #2 and makes
// this assertion flaky.
let prevCount = -1;
let stableCount = sseEvents.filter(isConvListChange).length;
const settleDeadline = Date.now() + 5000;
while (Date.now() < settleDeadline && stableCount !== prevCount) {
prevCount = stableCount;
await new Promise((resolve) => setTimeout(resolve, 300));
stableCount = sseEvents.filter(isConvListChange).length;
}

// Reset the capture window so the resume's emissions are measured in
// isolation from chat #1's (now-settled) broadcasts.
sseEvents.length = 0;

// Second chat — resume existing conversation. Not new, title set →
// neither the post-turn nor the title-gen path should fire, so no
// conversations-list change should reach the default sink.
await runtime.chat({
message: "Second message",
conversationId: first.conversationId,
workspaceId: TEST_WORKSPACE_ID,
},
sink,
);
});

// chat.start should still be emitted
const chatStartEvents = events.filter((e) => e.type === "chat.start");
expect(chatStartEvents).toHaveLength(1);
expect(chatStartEvents[0]!.data.conversationId).toBe(first.conversationId);

// data.changed with server=conversations should NOT be emitted
const convListChange = events.filter(
(e) =>
e.type === "data.changed" &&
e.data.server === "conversations" &&
e.data.tool === "list",
);
expect(convListChange).toHaveLength(0);
// Allow any straggler broadcast to land before asserting none fired.
await new Promise((resolve) => setTimeout(resolve, 600));

await runtime.shutdown();
expect(sseEvents.filter(isConvListChange).length).toBe(0);
} finally {
defaultSink.emit = origEmit;
await runtime.shutdown();
}
});
});