Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .changeset/hold-connection-cap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"sideshow": patch
---

Held SSE (`/api/events`) and long-poll (`/api/comments?wait=N`) connections
are now bounded per workspace. Both are GETs that pin a socket open and, on a
`publicRead` board, are reachable unauthenticated — without a ceiling a flood
could exhaust connections (cleanup was already correct, there was just no
limit). Once over `maxHoldConnections` (default 32, configurable via
`AppOptions`), new held connections return `503`; an instant `?wait=0` read
still succeeds since it doesn't hold a slot. Slots release exactly once on
stream/request abort or normal return. The default is sized for the real
concurrency of a single-user workspace — a few viewer tabs (one SSE each) plus
active agent long-polls, including a multi-agent session with several agents
connected at once — since one workspace is one user; a real flood is orders of
magnitude bigger, so the cap rejects it regardless of the exact default.

Also indexes the referenced-asset set used by `/a/:id`'s optimistic-read wait
and asset eviction: it was re-parsing every post's `surfaces` + `history` JSON
on each call (a full-table scan on every `/a/:id` miss), and is now built
lazily and maintained incrementally on post create/update and invalidated on
remove. History is append-only, so an asset id once referenced stays
referenced until its whole post is deleted — the cache stays correct without
re-scanning.
110 changes: 91 additions & 19 deletions server/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ const MAX_STEP_LABEL = 500;
// edge to keep one oversize value from bloating the agent's context forever.
const MAX_COMMENT_TEXT = 8000;
const MAX_TITLE = 500;
// Ceiling on concurrently-held SSE + long-poll connections. Both are GETs that
// pin a connection open (the event stream indefinitely, /api/comments?wait up
// to MAX_WAIT_SECONDS); on a publicRead board they're reachable unauthenticated,
// so without a cap a flood exhausts sockets. One app instance is one workspace
// (a single Durable Object), and one workspace is one user — so legitimate
// concurrent holds are small but not tiny: each open viewer tab holds one SSE,
// and each active agent holds a long-poll (and possibly its own SSE). A
// multi-agent session with 5 agents plus a few viewer tabs can legitimately
// reach ~15. 32 clears that with headroom while still bounding a flood on a
// no-token local board — and a real flood is orders of magnitude bigger, so
// rejecting at 32 vs 16 makes no difference to flood protection, only to
// legitimate use. Configurable so deployments can tune it and tests can
// exercise the cap cheaply.
const DEFAULT_MAX_HOLD_CONNECTIONS = 32;

// Asset serving policy: only raster images are served inline; everything else
// (incl. svg, json, text, the octet-stream catch-all) is an attachment, so a
Expand Down Expand Up @@ -141,6 +155,10 @@ export interface AppOptions {
upgradeCommand?: string;
// Test seam: replaces the npm-registry/GitHub lookup for the latest release.
fetchLatestRelease?: () => Promise<LatestRelease | null>;
// Max concurrently-held SSE (`/api/events`) + long-poll (`/api/comments?wait`)
// connections before new ones are rejected with 503. Bounds a connection flood
// on publicRead boards; defaults to DEFAULT_MAX_HOLD_CONNECTIONS.
maxHoldConnections?: number;
}

export interface LatestRelease {
Expand Down Expand Up @@ -270,10 +288,29 @@ export function createApp({
version,
upgradeCommand,
fetchLatestRelease,
maxHoldConnections = DEFAULT_MAX_HOLD_CONNECTIONS,
}: AppOptions) {
const app = new Hono();
const bus = new EventBus();

// Live count of held SSE + long-poll connections, gated by maxHoldConnections.
// Each holder increments on entry and releases exactly once via a guarded
// release() wired to every exit (stream abort, request abort, normal return).
let holdConnections = 0;
const acquireHold = (): boolean => {
if (holdConnections >= maxHoldConnections) return false;
holdConnections++;
return true;
};
const makeRelease = () => {
let released = false;
return () => {
if (released) return;
released = true;
holdConnections--;
};
};

// Rendered-document cache for /s/:id rich surfaces. Rendering a markdown/code/
// diff surface runs shiki / @pierre-diffs SSR, which is non-trivial (a big diff
// is tens of ms + tens of KB), so memoize the finished document string. The
Expand Down Expand Up @@ -929,6 +966,8 @@ export function createApp({

// Long-poll friendly: ?wait=N holds the request open up to N seconds until
// a matching comment arrives. This is how terminal agents block on feedback.
// A wait counts against the connection cap (it pins a socket just like SSE);
// an instant ?wait=0 read does not.
app.get("/api/comments", async (c) => {
const sessionId = c.req.query("session");
const surfaceId = c.req.query("surface") ?? c.req.query("snippet");
Expand All @@ -944,12 +983,31 @@ export function createApp({
}
}
}
const waitSeconds = Number(c.req.query("wait") ?? 0) || 0;
if (waitSeconds > 0) {
if (!acquireHold()) return c.json({ error: "too many concurrent connections" }, 503);
const release = makeRelease();
// If the client disconnects mid-wait, release the slot promptly.
c.req.raw.signal.addEventListener("abort", release, { once: true });
try {
const result = await waitForComments({
sessionId,
surfaceId,
author: c.req.query("author"),
afterSeq: c.req.query("after") ? Number(c.req.query("after")) : undefined,
waitSeconds,
});
return c.json(result);
} finally {
release();
}
}
const result = await waitForComments({
sessionId,
surfaceId,
author: c.req.query("author"),
afterSeq: c.req.query("after") ? Number(c.req.query("after")) : undefined,
waitSeconds: Number(c.req.query("wait") ?? 0) || 0,
waitSeconds: 0,
});
return c.json(result);
});
Expand Down Expand Up @@ -1134,6 +1192,12 @@ export function createApp({
if (!sessionId) return c.json({ error: "session required" }, 401);
if (!(await store.getSession(sessionId))) return c.json({ error: "session not found" }, 404);
}
if (!acquireHold()) return c.json({ error: "too many concurrent connections" }, 503);
const release = makeRelease();
// Safety net: if the client disconnects before the stream callback opens,
// the request abort still releases the slot. close() below is guarded so a
// later abort firing release again is a no-op.
c.req.raw.signal.addEventListener("abort", release, { once: true });
const eventSessionId = (event: Parameters<Parameters<EventBus["subscribe"]>[0]>[0]) => {
if ("sessionId" in event) return event.sessionId;
if (event.type.startsWith("session-")) return event.id;
Expand All @@ -1148,32 +1212,40 @@ export function createApp({
wake?.();
});
let open = true;
let closed = false;
const close = () => {
if (closed) return;
closed = true;
open = false;
unsubscribe();
wake?.();
release();
};
stream.onAbort(close);
c.req.raw.signal.addEventListener("abort", close, { once: true });
await stream.writeSSE({ event: "hello", data: "{}" });
while (open) {
while (queue.length > 0) {
await stream.writeSSE({ data: JSON.stringify(queue.shift()) });
}
let pingTimer: ReturnType<typeof setTimeout> | null = null;
await Promise.race([
new Promise<void>((resolve) => {
wake = resolve;
}),
new Promise<void>((resolve) => {
pingTimer = setTimeout(resolve, 15000);
}),
]);
wake = null;
if (pingTimer) clearTimeout(pingTimer);
if (open && queue.length === 0) {
await stream.writeSSE({ event: "ping", data: "{}" });
try {
await stream.writeSSE({ event: "hello", data: "{}" });
while (open) {
while (queue.length > 0) {
await stream.writeSSE({ data: JSON.stringify(queue.shift()) });
}
let pingTimer: ReturnType<typeof setTimeout> | null = null;
await Promise.race([
new Promise<void>((resolve) => {
wake = resolve;
}),
new Promise<void>((resolve) => {
pingTimer = setTimeout(resolve, 15000);
}),
]);
wake = null;
if (pingTimer) clearTimeout(pingTimer);
if (open && queue.length === 0) {
await stream.writeSSE({ event: "ping", data: "{}" });
}
}
} finally {
close();
}
});
});
Expand Down
27 changes: 27 additions & 0 deletions server/sqlStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ import {
// One workspace = one database, so plain SQL with no tenant columns.
export class SqlStore implements Store {
private sql: SqlStorage;
// Cached set of asset ids referenced by any live surface (current or a
// historical version of any post). Built lazily and maintained incrementally
// — createPost/updatePost add to it, removes invalidate it — so isAssetReferenced
// (hit on every /a/:id miss) and putAsset's eviction scan no longer re-parse
// every post's surfaces+history JSON on each call. Stays correct because post
// history is append-only: a surface only ever moves INTO history, so an asset
// id once referenced stays referenced until the whole post (and its history)
// is deleted — at which point we invalidate and recompute from scratch.
private assetRefCache: Set<string> | undefined;

constructor(sql: SqlStorage) {
this.sql = sql;
Expand Down Expand Up @@ -273,6 +282,7 @@ export class SqlStore implements Store {
// Posts are gone, so referencedAssetIds now reflects survivors only:
// drop this session's own assets except any a surviving surface still
// points at (assets are content-addressed and may be shared across sessions).
this.invalidateAssetRefs();
const referenced = this.referencedAssetIds();
for (const r of this.sql.exec("SELECT id FROM assets WHERE sessionId = ?", id).toArray()) {
const aid = r.id as string;
Expand Down Expand Up @@ -356,6 +366,7 @@ export class SqlStore implements Store {
"[]",
);
this.touch(input.sessionId);
this.addAssetRefs(input.surfaces);
return surface;
}

Expand Down Expand Up @@ -396,6 +407,7 @@ export class SqlStore implements Store {
const affected = this.sql.exec("SELECT changes() AS n").one().n as number;
if (affected > 0) {
this.touch(surface.sessionId);
if (patch.surfaces !== undefined) this.addAssetRefs(patch.surfaces);
return { ...surface, title, surfaces, version, updatedAt, history };
}
// Lost the race — retry with the now-current version.
Expand All @@ -407,6 +419,7 @@ export class SqlStore implements Store {
if (!(await this.getPost(id))) return false;
this.sql.exec("DELETE FROM comments WHERE postId = ?", id);
this.sql.exec("DELETE FROM posts WHERE id = ?", id);
this.invalidateAssetRefs();
return true;
}

Expand Down Expand Up @@ -505,16 +518,29 @@ export class SqlStore implements Store {
// --- assets ---

private referencedAssetIds(): Set<string> {
if (this.assetRefCache) return this.assetRefCache;
const out = new Set<string>();
for (const r of this.sql.exec("SELECT surfaces, history FROM posts").toArray()) {
collectAssetIds(JSON.parse(r.surfaces as string) as Surface[], out);
for (const h of JSON.parse(r.history as string) as PostVersion[]) {
collectAssetIds(h.surfaces, out);
}
}
this.assetRefCache = out;
return out;
}

// Fold a freshly-written surfaces list into the cache. If the cache hasn't
// been built yet, skip — the next referencedAssetIds() reads the post from
// disk and picks it up. Only mutates a populated cache.
private addAssetRefs(surfaces: Surface[]): void {
if (this.assetRefCache) collectAssetIds(surfaces, this.assetRefCache);
}

private invalidateAssetRefs(): void {
this.assetRefCache = undefined;
}

async putAsset(input: CreateAssetInput) {
if (!(await this.getSession(input.sessionId))) return null;
// Content-addressed: identical bytes dedupe to the existing blob (idempotent
Expand Down Expand Up @@ -694,6 +720,7 @@ export class SqlStore implements Store {
);
}
this.sql.exec("COMMIT");
this.invalidateAssetRefs();
} catch (e) {
this.sql.exec("ROLLBACK");
throw e;
Expand Down
26 changes: 26 additions & 0 deletions server/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ export class JsonFileStore implements Store {
private lastSeq = 0;
private settings = new Map<string, string>();
private loaded = false;
// Cached set of asset ids referenced by any live surface (current or a
// historical version of any post). Built lazily and maintained incrementally
// — createPost/updatePost add to it, removes invalidate it — so isAssetReferenced
// (hit on every /a/:id miss) and putAsset's eviction scan don't re-walk every
// post on each call. Correct because post history is append-only: a surface
// only ever moves INTO history, so an asset id once referenced stays
// referenced until the whole post (and its history) is deleted, at which point
// we invalidate and recompute from scratch.
private assetRefCache: Set<string> | undefined;
private loadPromise: Promise<void> | null = null;
private writeQueue: Promise<void> = Promise.resolve();
private filePath: string;
Expand Down Expand Up @@ -286,6 +295,7 @@ export class JsonFileStore implements Store {
// session only takes its OWN assets down with it, and only those no live
// surface still points at (referencedAssetIds is computed after the above
// deletes, so it reflects survivors only).
this.invalidateAssetRefs();
const referenced = this.referencedAssetIds();
for (const [aid, asset] of this.assets) {
if (asset.sessionId === id && !referenced.has(aid)) this.assets.delete(aid);
Expand Down Expand Up @@ -351,6 +361,7 @@ export class JsonFileStore implements Store {
};
this.surfaces.set(surface.id, surface);
this.touch(input.sessionId);
this.addAssetRefs(input.surfaces);
await this.persist();
return clone(surface);
}
Expand All @@ -371,6 +382,7 @@ export class JsonFileStore implements Store {
surface.version += 1;
surface.updatedAt = new Date().toISOString();
this.touch(surface.sessionId);
if (patch.surfaces !== undefined) this.addAssetRefs(patch.surfaces);
await this.persist();
return clone(surface);
}
Expand All @@ -381,6 +393,7 @@ export class JsonFileStore implements Store {
if (!surface) return false;
this.surfaces.delete(id);
this.comments = this.comments.filter((c) => c.postId !== id);
this.invalidateAssetRefs();
await this.persist();
return true;
}
Expand Down Expand Up @@ -436,14 +449,27 @@ export class JsonFileStore implements Store {
// --- assets ---

private referencedAssetIds(): Set<string> {
if (this.assetRefCache) return this.assetRefCache;
const out = new Set<string>();
for (const s of this.surfaces.values()) {
collectAssetIds(s.surfaces, out);
for (const h of s.history) collectAssetIds(h.surfaces, out);
}
this.assetRefCache = out;
return out;
}

// Fold a freshly-written surfaces list into the cache. If the cache hasn't
// been built yet, skip — the next referencedAssetIds() walks the in-memory
// posts and picks it up. Only mutates a populated cache.
private addAssetRefs(surfaces: Surface[]): void {
if (this.assetRefCache) collectAssetIds(surfaces, this.assetRefCache);
}

private invalidateAssetRefs(): void {
this.assetRefCache = undefined;
}

async putAsset(input: CreateAssetInput) {
await this.load();
if (!this.sessions.has(input.sessionId)) return null;
Expand Down
Loading
Loading