Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ea229ea
feat(runtime): server-authoritative resumable chat turns (RunBus)
Ovaculos May 25, 2026
83d88af
fix(conversations): rework auto-title prompt to stop response echo (#…
Ovaculos May 25, 2026
549cbd6
feat(web): per-conversation chat viewer over the server turn stream
Ovaculos May 25, 2026
fd7fcea
feat(conversations): live streaming dots + flicker-free list
Ovaculos May 25, 2026
00f4244
refactor(web): remove old streaming-chat client orphaned by the rewrite
Ovaculos May 25, 2026
a86bafd
test(conversations): fix auto-title detection after #253 prompt change
Ovaculos May 25, 2026
9ddfd2e
fix(adapters): log sinks must not throw into the event-emit path
Ovaculos May 25, 2026
46d775b
fix(runtime): deliver `cancelled` frame to live viewers on Stop
Ovaculos May 25, 2026
f2c0c33
fix(api): map ConversationCorruptedError to 422 in handleChatStart
Ovaculos May 25, 2026
66210e1
fix(runtime): serialize startTurn create to prevent a double-create race
Ovaculos May 25, 2026
75e01e3
fix(web): clear stuck streaming on reconnect after the turn ended
Ovaculos May 25, 2026
2ce8d73
fix(conversations): complete a partial disk snapshot on resume (no fl…
Ovaculos May 25, 2026
f4e4a62
docs(conversations): note legacy seq-less broadcast vs RunBus viewer
Ovaculos May 25, 2026
1c2d75f
fix(adapters): warn once per failure episode in best-effort log sinks
Ovaculos May 26, 2026
014492b
docs+chore: surface RunBus single-process limit when replicas > 1
Ovaculos May 26, 2026
96d5e65
fix(runtime): cap per-run RunBus event buffer to prevent unbounded gr…
Ovaculos May 27, 2026
5818db5
fix(tests): hard-error on missing workDir under bun test, fix 8 leakers
Ovaculos May 27, 2026
0556d83
perf(conversations): patch list row title in-place instead of refetching
Ovaculos May 27, 2026
d37e54e
chore: drop stale streamChat refs + scope code-style check to non-bun…
Ovaculos May 27, 2026
296dd62
Merge remote-tracking branch 'upstream/main' into feat/conversations-…
Ovaculos May 27, 2026
9f56a1d
Merge remote-tracking branch 'upstream/main' into feat/conversations-…
Ovaculos May 27, 2026
117378f
Merge remote-tracking branch 'upstream/main' into feat/conversations-…
Ovaculos Jun 1, 2026
8d0a612
refactor(web): consolidate onto the seq-based conversation stream
Ovaculos Jun 1, 2026
92f67b6
fix(web): live-update the conversation list title via the correct ifr…
Ovaculos Jun 2, 2026
6b82c93
fix(web): recover from an unwatchable turn instead of hanging the spi…
Ovaculos Jun 2, 2026
c3a98d8
fix(runtime): allow identity-level chat-start instead of 500 on missi…
Ovaculos Jun 2, 2026
bac91a2
fix(api): reject a malformed conversationId with 400 instead of 500
Ovaculos Jun 2, 2026
c856b0a
fix(runtime): abort in-flight detached turns on shutdown
Ovaculos Jun 2, 2026
7decc8e
docs+chore: clarify conversation.title wsId scoping; trim bug-archaeo…
Ovaculos Jun 2, 2026
ec3d900
Merge upstream/main into feat/conversations-tab-rework
Ovaculos Jun 2, 2026
2852d11
Merge upstream/main into feat/conversations-tab-rework
mgoldsborough Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ Routing requests to the process owning a session's transport is the **load balan
3. `sessionStore.type: "redis"`. Each tenant gets its own Redis instance in its own namespace (see `infra/CLAUDE.md` per-tenant Redis pattern). Default `nb:mcp:session:` keyPrefix is correct under that model.
4. `platform.strategy.type: RollingUpdate`. Only after (1).

**Known limitation under `replicas > 1`: RunBus is single-process.** Chat turn replay/resume (the SSE-stream-backed viewer attaches to a per-conversation event log) lives in-memory on the pod that started the turn. A viewer landing on a different pod sees `isActive:false` for an in-flight turn elsewhere and the live frames don't fan out cross-pod. Sticky routing on `Mcp-Session-Id` (prereq #2) mitigates for the active tab; a pod restart or any cross-pod viewer (other tab/device) still drops resume mid-turn. The clustered Redis-backed RunBus is deferred work, tracked in `src/runtime/run-bus.ts` — `serve` warns at boot when `sessionStore.type === "redis"` so the gap is visible.

**TTL units: seconds at the surface, ms internally.** Operator-facing: `MCP_SESSION_TTL_SECONDS` env (highest priority) > `sessionStore.ttlSeconds` config > 8h default. Conversion to ms happens in `Runtime.getSessionStoreTtlMs()` only — registry constructors and the host's idle sweep both take ms from there. Don't add mixed-unit code elsewhere.

## MCP App Bridge Rules
Expand Down
7 changes: 7 additions & 0 deletions scripts/check-code-style.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ function checkNoInlineTypeImports(): CheckResult {
// this skip the check passes in CI but fails on a developer's machine.
if (file.split(/[\\/]/).includes("node_modules")) continue;
const rel = relative(ROOT, file);
// Skip bundle subtrees (their UIs have their own conventions, per the
// doc comment) and vendored deps. `bun run build:bundles` installs
// node_modules under each bundle's UI, so an unfiltered walk picks
// up thousands of vendored `.d.ts` violations that have nothing to
// do with our source.
if (rel.includes("/node_modules/")) continue;
if (rel.startsWith("src/bundles/")) continue;
const content = readFileSync(file, "utf-8");
const source = ts.createSourceFile(file, content, ts.ScriptTarget.Latest, true);

Expand Down
22 changes: 21 additions & 1 deletion src/adapters/structured-log-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export class StructuredLogSink implements EventSink {
private conversationId: string | undefined;
private userId: string | undefined;
private workspaceId: string | undefined;
/** True after a write failure surfaced a console.warn; reset on the next
* successful write so a recurring failure (after intermittent recovery)
* warns again. Avoids spamming during a sustained outage. */
private writeWarned = false;

constructor(config: StructuredLogConfig) {
this.dir = config.dir;
Expand Down Expand Up @@ -92,7 +96,23 @@ export class StructuredLogSink implements EventSink {
private writeLine(record: Record<string, unknown>): void {
const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD
const filename = `nimblebrain-${today}.jsonl`;
appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`);
try {
appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`);
this.writeWarned = false;
} catch (err) {
// Best-effort logging: a write failure (disk full, perms, or a detached
// turn emitting after the workdir was torn down) must never throw into
// the event-emit path and crash the caller. Surface the first failure of
// an episode so operators see disk/perms incidents; suppress until a
// subsequent success re-arms.
if (!this.writeWarned) {
this.writeWarned = true;
console.warn(
`[structured-log-sink] write to ${this.dir} failed (further failures suppressed until recovery):`,
err instanceof Error ? err.message : err,
);
}
}
}

/** Remove log files older than the retention threshold. */
Expand Down
20 changes: 19 additions & 1 deletion src/adapters/workspace-log-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export interface WorkspaceLogConfig {
*/
export class WorkspaceLogSink implements EventSink {
private dir: string;
/** First-failure-of-an-episode flag — see structured-log-sink. */
private writeWarned = false;

constructor(config: WorkspaceLogConfig) {
this.dir = join(config.dir, "workspace");
Expand All @@ -65,7 +67,23 @@ export class WorkspaceLogSink implements EventSink {

const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD
const filename = `${today}.jsonl`;
appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`);
try {
appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`);
this.writeWarned = false;
} catch (err) {
// Best-effort logging: a write failure (disk full, perms, or a detached
// turn emitting after the workdir was torn down) must never throw into
// the event-emit path and crash the caller. Surface the first failure of
// an episode so operators see disk/perms incidents; suppress until a
// subsequent success re-arms.
if (!this.writeWarned) {
this.writeWarned = true;
console.warn(
`[workspace-log-sink] write to ${this.dir} failed (further failures suppressed until recovery):`,
err instanceof Error ? err.message : err,
);
}
}
}

/** No-op — kept for API compatibility. Writes are synchronous. */
Expand Down
83 changes: 64 additions & 19 deletions src/api/conversation-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
* Separate from SseEventManager which handles workspace-level events.
*/

import type { BufferedRunEvent } from "../runtime/run-bus.ts";

/** A subscriber watching a specific conversation's events. */
interface ConversationSubscriber {
id: string;
Expand All @@ -19,6 +21,13 @@ interface ConversationSubscriber {

const encoder = new TextEncoder();

/** Format an SSE frame. `seq`, when present, is sent as the `id:` line so a
* reconnecting viewer can resume from its last-seen sequence number. */
function frame(eventType: string, data: unknown, seq?: number): Uint8Array {
const idLine = seq != null ? `id: ${seq}\n` : "";
return encoder.encode(`event: ${eventType}\n${idLine}data: ${JSON.stringify(data)}\n\n`);
}

export class ConversationEventManager {
private subscribers = new Map<string, ConversationSubscriber>();
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
Expand Down Expand Up @@ -73,16 +82,33 @@ export class ConversationEventManager {
addSubscriber(
conversationId: string,
userId: string,
replay?: BufferedRunEvent[],
meta?: { isActive: boolean; activeSeq: number },
): { stream: ReadableStream<Uint8Array>; subscriberId: string } {
const id = crypto.randomUUID();
let sub: ConversationSubscriber;

const stream = new ReadableStream<Uint8Array>({
start: (controller) => {
sub = { id, userId, conversationId, controller, closed: false };
// The subscribed frame tells the client whether a turn is in flight
// (so it can trim a stale in-flight turn from disk history before the
// RunBus replay rebuilds it) and its current seq.
controller.enqueue(
frame("subscribed", {
subscriberId: id,
isActive: meta?.isActive ?? false,
activeSeq: meta?.activeSeq ?? 0,
}),
);
// Replay the in-flight turn (if any) BEFORE registering for live
// fan-out. start() runs synchronously and we add to the subscribers
// map only after replaying, so no live event can interleave ahead of
// the replay — viewers never see out-of-order deltas.
if (replay) {
for (const e of replay) controller.enqueue(frame(e.type, e.data, e.seq));
}
this.subscribers.set(id, sub);
const subscribedMsg = `event: subscribed\ndata: ${JSON.stringify({ subscriberId: id })}\n\n`;
controller.enqueue(encoder.encode(subscribedMsg));
},
cancel: () => {
this.removeSubscriber(id);
Expand All @@ -92,6 +118,29 @@ export class ConversationEventManager {
return { stream, subscriberId: id };
}

/**
* Fan out a live run event (with its sequence number) to every subscriber
* of the conversation. The seq lets viewers de-duplicate against replay and
* resume after a reconnect.
*/
publishEvent(conversationId: string, event: BufferedRunEvent): void {
const encoded = frame(event.type, event.data, event.seq);
for (const [id, sub] of this.subscribers) {
if (sub.closed) {
this.subscribers.delete(id);
continue;
}
if (sub.conversationId !== conversationId) continue;
try {
sub.controller.enqueue(encoded);
} catch (err) {
console.warn("[conversation-events] SSE write failed:", err);
this.closeSub(sub);
this.subscribers.delete(id);
}
}
}

/** Remove a specific subscriber. */
removeSubscriber(subscriberId: string): void {
const sub = this.subscribers.get(subscriberId);
Expand All @@ -104,29 +153,25 @@ export class ConversationEventManager {
/**
* Broadcast an event to all subscribers of a specific conversation.
*
* Stage 1 single-owner: every legitimate subscriber to a given
* conversation is the same user (the owner) connected from another
* tab/device. Filtering on `userId` would skip every subscriber
* (round-3 had this bug); not filtering at all double-delivers to
* the sender's own tab (round-4 had this bug — the sender's tab
* receives via both `/v1/chat/stream` and its own
* `/v1/conversations/:id/events` subscription).
*
* The correct filter key is the **subscriber id**: the sender
* passes its current conv-events subscriber id as
* `excludeSubscriberId`, so its own subscription is skipped while
* peer tabs (different subscriber ids, same userId) still receive.
* Conversations are single-owner (Stage 1): every subscriber is the same
* user on another tab/device. The exclusion key is the **subscriber id**,
* not `userId` — filtering by `userId` would skip every tab; not filtering
* double-delivers to the sender (it receives via both `/v1/chat/stream` and
* its own `/v1/conversations/:id/events` subscription). The sender passes
* its conv-events subscriber id as `excludeSubscriberId` so its own
* subscription is skipped while peer tabs still receive. (Stage 4
* multi-participant semantics will need explicit policy gates here.)
*
* Stage 4 will reintroduce multi-participant semantics with
* explicit policy gates; until then, this is the only exclusion
* shape needed.
* Seq-less: unlike {@link publishEvent} (the RunBus path), these frames carry
* no `id:` sequence. A seq-tracking `conversation-stream` viewer applies them
* live but can't replay/resume them. Only `/v1/chat` + `/v1/chat/stream` use
* this; the web shell is RunBus-only.
*
* @param conversationId - Target conversation
* @param eventType - SSE event type (e.g. "text.delta", "user.message")
* @param data - Event data payload
* @param excludeSubscriberId - Optional subscriber id to skip
* (typically the sender's own subscriber id, to prevent
* self-echo on chat-stream-originated broadcasts).
* (typically the sender's own, to prevent self-echo).
*/
broadcastToConversation(
conversationId: string,
Expand Down
5 changes: 5 additions & 0 deletions src/api/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ const SSE_ROUTES: Partial<Record<EngineEventType, SseRoute>> = {
// existing "broadcast to all clients in this process" behavior to avoid
// silently breaking iframe refresh. Revisit when payload grows wsId.
"data.changed": { scope: "global" },
// Live conversation-title update (auto-title generation completes after the
// turn). Workspace-scoped via the conversation's workspaceId breadcrumb so
// it doesn't leak across tenants. The shell routes it to the matching
// conversation slice by `conversationId`.
"conversation.title": { scope: "workspace", wsIdField: "wsId" },
// Org-level config (model preferences, feature flags). Affects every
// workspace; broadcast to all.
"config.changed": { scope: "global" },
Expand Down
110 changes: 101 additions & 9 deletions src/api/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import { resolve } from "node:path";
import { CallbackEventSink } from "../adapters/callback-events.ts";
import { log } from "../cli/log.ts";
import { isToolEnabled, isToolVisibleToRole, type ResolvedFeatures } from "../config/features.ts";
import { CONVERSATION_ID_RE } from "../conversation/types.ts";
import type { EngineEvent, EventSink } from "../engine/types.ts";
import { ingestFiles, isAllowedMime, type UploadedFile } from "../files/ingest.ts";
import { resolveMimeType } from "../files/mime.ts";
import type { FileEntry } from "../files/types.ts";
import type { IdentityProvider, UserIdentity } from "../identity/provider.ts";
import { DEV_IDENTITY } from "../identity/providers/dev.ts";
import {
ConversationAccessDeniedError,
ConversationCorruptedError,
Expand Down Expand Up @@ -79,10 +81,8 @@ export async function handleChat(
// backgrounded tab, network blip) must not cancel the in-flight
// engine loop; the run completes server-side, persists, and is
// replayed to any reconnecting /v1/conversations/:id/events
// subscriber. See the detached `.chat(parsed, sink)` in
// handleChatStream for the full rationale (PR #251 regression). The
// automations executor's deadline cancellation is unaffected — that
// path supplies its own AbortController.
// subscriber. (The automations executor's deadline cancellation is
// unaffected — that path supplies its own AbortController.)
const result = await runtime.chat(parsed);
// Cost is derived at the boundary, never stored. Same wire shape as
// the streaming `done` event so clients see one consistent contract.
Expand Down Expand Up @@ -157,6 +157,88 @@ function runInProgressResponse(conversationId: string): Response {
);
}

/**
* Handle POST /v1/chat/start — kick off a detached, server-authoritative turn
* and return the conversation id immediately. The turn runs to completion on
* the server regardless of this request's lifecycle (closing the tab does NOT
* cancel it). Clients watch the turn via GET /v1/conversations/:id/events,
* which replays the in-flight turn then tails live.
*/
export async function handleChatStart(
request: Request,
runtime: Runtime,
features: ResolvedFeatures,
identity?: UserIdentity,
workspaceId?: string,
): Promise<Response> {
const parsed = await parseChatBody(request, runtime, features, identity, workspaceId);
if (parsed instanceof Response) return parsed;
try {
const { conversationId } = await runtime.startTurn(parsed);
return Response.json({ conversationId });
} catch (err) {
if (err instanceof RunInProgressError) {
return runInProgressResponse(parsed.conversationId ?? "");
}
if (err instanceof ConversationAccessDeniedError) {
return apiError(
403,
"conversation_access_denied",
"You do not have access to this conversation.",
{ conversationId: parsed.conversationId },
);
}
// startTurn → store.load can throw on a pre-migration (ownerless)
// conversation. Map to 422 — parity with handleChat / handleChatCancel —
// instead of leaking a raw 500.
if (err instanceof ConversationCorruptedError) {
return conversationCorruptedResponse(err);
}
throw err;
}
}

/**
* Handle POST /v1/conversations/:id/cancel — the explicit Stop button. The
* ONLY thing that aborts generation; client disconnect does not. Ownership is
* enforced (same posture as the events route).
*/
export async function handleChatCancel(
conversationId: string,
runtime: Runtime,
identity?: UserIdentity,
): Promise<Response> {
const callerId = identity?.id ?? (runtime.getIdentityProvider() ? null : DEV_IDENTITY.id);
if (!callerId) {
return apiError(401, "authentication_required", "Authentication required.");
}
const conversation = await runtime.findConversation(conversationId).catch((err) => {
if (err instanceof ConversationCorruptedError) return err;
throw err;
});
if (conversation instanceof ConversationCorruptedError) {
return apiError(422, "conversation_corrupted", conversation.message, {
conversationId: conversation.conversationId,
reason: conversation.reason,
});
}
if (!conversation) {
return apiError(404, "not_found", "Conversation not found");
}
if (conversation.ownerId !== callerId) {
return apiError(
403,
"conversation_access_denied",
"You do not have access to this conversation.",
{
conversationId,
},
);
}
const cancelled = runtime.cancelTurn(conversationId);
return Response.json({ cancelled });
}

function conversationAccessDeniedResponse(conversationId: string): Response {
return apiError(
403,
Expand Down Expand Up @@ -358,11 +440,10 @@ export async function handleChatStream(
// engine loop. The run completes server-side, persists to the
// conversation store, and replays to any reconnecting
// /v1/conversations/:id/events subscriber — the "leave and come
// back" contract. PR #251 bound the run to the connection and
// silently abandoned prompts the moment a mobile client dropped
// (run.error "The connection was closed"). The one caller that
// must cancel on a deadline — the automations executor — owns its
// own AbortController in bundles/automations/src/executor.ts.
// back" contract. Binding the run to the connection would silently
// abandon a prompt the moment a mobile client dropped. The one
// caller that must cancel on a deadline — the automations executor —
// owns its own AbortController in bundles/automations/src/executor.ts.
.chat(parsed, sink)
.then((result) => {
// Cost is computed at the API boundary — never stored. The
Expand Down Expand Up @@ -1642,6 +1723,17 @@ async function parseMultipartChatBody(
const message = typeof messageRaw === "string" ? messageRaw : "";

const conversationId = formData.get("conversationId");
// Reject a malformed conversationId before it reaches store path-building
// / ingestFiles (convId feeds the file-store path). The JSON surface gets
// this from the ChatRequestBody schema pattern; multipart parses raw, so
// validate the same shape here. Mirrors the canonical conv_<16 hex> regex.
if (
typeof conversationId === "string" &&
conversationId &&
!CONVERSATION_ID_RE.test(conversationId)
) {
return apiError(400, "bad_request", "Invalid conversationId format");
}
const model = formData.get("model");

let appContext: { appName: string; serverName: string } | undefined;
Expand Down
Loading