diff --git a/.changeset/hold-connection-cap.md b/.changeset/hold-connection-cap.md new file mode 100644 index 0000000..6194cde --- /dev/null +++ b/.changeset/hold-connection-cap.md @@ -0,0 +1,24 @@ +--- +"sideshow": patch +--- + +Held SSE (`/api/events`) and long-poll (`/api/comments?wait=N`) connections +are now bounded per workspace. Both are GETs that pin a socket open and, on a +`publicRead` board, are reachable unauthenticated — without a ceiling a flood +could exhaust connections (cleanup was already correct, there was just no +limit). Once over `maxHoldConnections` (default 32, configurable via +`AppOptions`), new held connections return `503`; an instant `?wait=0` read +still succeeds since it doesn't hold a slot. Slots release exactly once on +stream/request abort or normal return. The default is sized for the real +concurrency of a single-user workspace — a few viewer tabs (one SSE each) plus +active agent long-polls, including a multi-agent session with several agents +connected at once — since one workspace is one user; a real flood is orders of +magnitude bigger, so the cap rejects it regardless of the exact default. + +Also indexes the referenced-asset set used by `/a/:id`'s optimistic-read wait +and asset eviction: it was re-parsing every post's `surfaces` + `history` JSON +on each call (a full-table scan on every `/a/:id` miss), and is now built +lazily and maintained incrementally on post create/update and invalidated on +remove. History is append-only, so an asset id once referenced stays +referenced until its whole post is deleted — the cache stays correct without +re-scanning. diff --git a/server/app.ts b/server/app.ts index c6c9adf..e93a85d 100644 --- a/server/app.ts +++ b/server/app.ts @@ -52,6 +52,20 @@ const MAX_STEP_LABEL = 500; // edge to keep one oversize value from bloating the agent's context forever. const MAX_COMMENT_TEXT = 8000; const MAX_TITLE = 500; +// Ceiling on concurrently-held SSE + long-poll connections. Both are GETs that +// pin a connection open (the event stream indefinitely, /api/comments?wait up +// to MAX_WAIT_SECONDS); on a publicRead board they're reachable unauthenticated, +// so without a cap a flood exhausts sockets. One app instance is one workspace +// (a single Durable Object), and one workspace is one user — so legitimate +// concurrent holds are small but not tiny: each open viewer tab holds one SSE, +// and each active agent holds a long-poll (and possibly its own SSE). A +// multi-agent session with 5 agents plus a few viewer tabs can legitimately +// reach ~15. 32 clears that with headroom while still bounding a flood on a +// no-token local board — and a real flood is orders of magnitude bigger, so +// rejecting at 32 vs 16 makes no difference to flood protection, only to +// legitimate use. Configurable so deployments can tune it and tests can +// exercise the cap cheaply. +const DEFAULT_MAX_HOLD_CONNECTIONS = 32; // Asset serving policy: only raster images are served inline; everything else // (incl. svg, json, text, the octet-stream catch-all) is an attachment, so a @@ -141,6 +155,10 @@ export interface AppOptions { upgradeCommand?: string; // Test seam: replaces the npm-registry/GitHub lookup for the latest release. fetchLatestRelease?: () => Promise; + // Max concurrently-held SSE (`/api/events`) + long-poll (`/api/comments?wait`) + // connections before new ones are rejected with 503. Bounds a connection flood + // on publicRead boards; defaults to DEFAULT_MAX_HOLD_CONNECTIONS. + maxHoldConnections?: number; } export interface LatestRelease { @@ -270,10 +288,29 @@ export function createApp({ version, upgradeCommand, fetchLatestRelease, + maxHoldConnections = DEFAULT_MAX_HOLD_CONNECTIONS, }: AppOptions) { const app = new Hono(); const bus = new EventBus(); + // Live count of held SSE + long-poll connections, gated by maxHoldConnections. + // Each holder increments on entry and releases exactly once via a guarded + // release() wired to every exit (stream abort, request abort, normal return). + let holdConnections = 0; + const acquireHold = (): boolean => { + if (holdConnections >= maxHoldConnections) return false; + holdConnections++; + return true; + }; + const makeRelease = () => { + let released = false; + return () => { + if (released) return; + released = true; + holdConnections--; + }; + }; + // Rendered-document cache for /s/:id rich surfaces. Rendering a markdown/code/ // diff surface runs shiki / @pierre-diffs SSR, which is non-trivial (a big diff // is tens of ms + tens of KB), so memoize the finished document string. The @@ -929,6 +966,8 @@ export function createApp({ // Long-poll friendly: ?wait=N holds the request open up to N seconds until // a matching comment arrives. This is how terminal agents block on feedback. + // A wait counts against the connection cap (it pins a socket just like SSE); + // an instant ?wait=0 read does not. app.get("/api/comments", async (c) => { const sessionId = c.req.query("session"); const surfaceId = c.req.query("surface") ?? c.req.query("snippet"); @@ -944,12 +983,31 @@ export function createApp({ } } } + const waitSeconds = Number(c.req.query("wait") ?? 0) || 0; + if (waitSeconds > 0) { + if (!acquireHold()) return c.json({ error: "too many concurrent connections" }, 503); + const release = makeRelease(); + // If the client disconnects mid-wait, release the slot promptly. + c.req.raw.signal.addEventListener("abort", release, { once: true }); + try { + const result = await waitForComments({ + sessionId, + surfaceId, + author: c.req.query("author"), + afterSeq: c.req.query("after") ? Number(c.req.query("after")) : undefined, + waitSeconds, + }); + return c.json(result); + } finally { + release(); + } + } const result = await waitForComments({ sessionId, surfaceId, author: c.req.query("author"), afterSeq: c.req.query("after") ? Number(c.req.query("after")) : undefined, - waitSeconds: Number(c.req.query("wait") ?? 0) || 0, + waitSeconds: 0, }); return c.json(result); }); @@ -1134,6 +1192,12 @@ export function createApp({ if (!sessionId) return c.json({ error: "session required" }, 401); if (!(await store.getSession(sessionId))) return c.json({ error: "session not found" }, 404); } + if (!acquireHold()) return c.json({ error: "too many concurrent connections" }, 503); + const release = makeRelease(); + // Safety net: if the client disconnects before the stream callback opens, + // the request abort still releases the slot. close() below is guarded so a + // later abort firing release again is a no-op. + c.req.raw.signal.addEventListener("abort", release, { once: true }); const eventSessionId = (event: Parameters[0]>[0]) => { if ("sessionId" in event) return event.sessionId; if (event.type.startsWith("session-")) return event.id; @@ -1148,32 +1212,40 @@ export function createApp({ wake?.(); }); let open = true; + let closed = false; const close = () => { + if (closed) return; + closed = true; open = false; unsubscribe(); wake?.(); + release(); }; stream.onAbort(close); c.req.raw.signal.addEventListener("abort", close, { once: true }); - await stream.writeSSE({ event: "hello", data: "{}" }); - while (open) { - while (queue.length > 0) { - await stream.writeSSE({ data: JSON.stringify(queue.shift()) }); - } - let pingTimer: ReturnType | null = null; - await Promise.race([ - new Promise((resolve) => { - wake = resolve; - }), - new Promise((resolve) => { - pingTimer = setTimeout(resolve, 15000); - }), - ]); - wake = null; - if (pingTimer) clearTimeout(pingTimer); - if (open && queue.length === 0) { - await stream.writeSSE({ event: "ping", data: "{}" }); + try { + await stream.writeSSE({ event: "hello", data: "{}" }); + while (open) { + while (queue.length > 0) { + await stream.writeSSE({ data: JSON.stringify(queue.shift()) }); + } + let pingTimer: ReturnType | null = null; + await Promise.race([ + new Promise((resolve) => { + wake = resolve; + }), + new Promise((resolve) => { + pingTimer = setTimeout(resolve, 15000); + }), + ]); + wake = null; + if (pingTimer) clearTimeout(pingTimer); + if (open && queue.length === 0) { + await stream.writeSSE({ event: "ping", data: "{}" }); + } } + } finally { + close(); } }); }); diff --git a/server/sqlStore.ts b/server/sqlStore.ts index d632ac4..10065ed 100644 --- a/server/sqlStore.ts +++ b/server/sqlStore.ts @@ -32,6 +32,15 @@ import { // One workspace = one database, so plain SQL with no tenant columns. export class SqlStore implements Store { private sql: SqlStorage; + // Cached set of asset ids referenced by any live surface (current or a + // historical version of any post). Built lazily and maintained incrementally + // — createPost/updatePost add to it, removes invalidate it — so isAssetReferenced + // (hit on every /a/:id miss) and putAsset's eviction scan no longer re-parse + // every post's surfaces+history JSON on each call. Stays correct because post + // history is append-only: a surface only ever moves INTO history, so an asset + // id once referenced stays referenced until the whole post (and its history) + // is deleted — at which point we invalidate and recompute from scratch. + private assetRefCache: Set | undefined; constructor(sql: SqlStorage) { this.sql = sql; @@ -273,6 +282,7 @@ export class SqlStore implements Store { // Posts are gone, so referencedAssetIds now reflects survivors only: // drop this session's own assets except any a surviving surface still // points at (assets are content-addressed and may be shared across sessions). + this.invalidateAssetRefs(); const referenced = this.referencedAssetIds(); for (const r of this.sql.exec("SELECT id FROM assets WHERE sessionId = ?", id).toArray()) { const aid = r.id as string; @@ -356,6 +366,7 @@ export class SqlStore implements Store { "[]", ); this.touch(input.sessionId); + this.addAssetRefs(input.surfaces); return surface; } @@ -396,6 +407,7 @@ export class SqlStore implements Store { const affected = this.sql.exec("SELECT changes() AS n").one().n as number; if (affected > 0) { this.touch(surface.sessionId); + if (patch.surfaces !== undefined) this.addAssetRefs(patch.surfaces); return { ...surface, title, surfaces, version, updatedAt, history }; } // Lost the race — retry with the now-current version. @@ -407,6 +419,7 @@ export class SqlStore implements Store { if (!(await this.getPost(id))) return false; this.sql.exec("DELETE FROM comments WHERE postId = ?", id); this.sql.exec("DELETE FROM posts WHERE id = ?", id); + this.invalidateAssetRefs(); return true; } @@ -505,6 +518,7 @@ export class SqlStore implements Store { // --- assets --- private referencedAssetIds(): Set { + if (this.assetRefCache) return this.assetRefCache; const out = new Set(); for (const r of this.sql.exec("SELECT surfaces, history FROM posts").toArray()) { collectAssetIds(JSON.parse(r.surfaces as string) as Surface[], out); @@ -512,9 +526,21 @@ export class SqlStore implements Store { collectAssetIds(h.surfaces, out); } } + this.assetRefCache = out; return out; } + // Fold a freshly-written surfaces list into the cache. If the cache hasn't + // been built yet, skip — the next referencedAssetIds() reads the post from + // disk and picks it up. Only mutates a populated cache. + private addAssetRefs(surfaces: Surface[]): void { + if (this.assetRefCache) collectAssetIds(surfaces, this.assetRefCache); + } + + private invalidateAssetRefs(): void { + this.assetRefCache = undefined; + } + async putAsset(input: CreateAssetInput) { if (!(await this.getSession(input.sessionId))) return null; // Content-addressed: identical bytes dedupe to the existing blob (idempotent @@ -694,6 +720,7 @@ export class SqlStore implements Store { ); } this.sql.exec("COMMIT"); + this.invalidateAssetRefs(); } catch (e) { this.sql.exec("ROLLBACK"); throw e; diff --git a/server/storage.ts b/server/storage.ts index 0920041..ba39cfb 100644 --- a/server/storage.ts +++ b/server/storage.ts @@ -145,6 +145,15 @@ export class JsonFileStore implements Store { private lastSeq = 0; private settings = new Map(); private loaded = false; + // Cached set of asset ids referenced by any live surface (current or a + // historical version of any post). Built lazily and maintained incrementally + // — createPost/updatePost add to it, removes invalidate it — so isAssetReferenced + // (hit on every /a/:id miss) and putAsset's eviction scan don't re-walk every + // post on each call. Correct because post history is append-only: a surface + // only ever moves INTO history, so an asset id once referenced stays + // referenced until the whole post (and its history) is deleted, at which point + // we invalidate and recompute from scratch. + private assetRefCache: Set | undefined; private loadPromise: Promise | null = null; private writeQueue: Promise = Promise.resolve(); private filePath: string; @@ -286,6 +295,7 @@ export class JsonFileStore implements Store { // session only takes its OWN assets down with it, and only those no live // surface still points at (referencedAssetIds is computed after the above // deletes, so it reflects survivors only). + this.invalidateAssetRefs(); const referenced = this.referencedAssetIds(); for (const [aid, asset] of this.assets) { if (asset.sessionId === id && !referenced.has(aid)) this.assets.delete(aid); @@ -351,6 +361,7 @@ export class JsonFileStore implements Store { }; this.surfaces.set(surface.id, surface); this.touch(input.sessionId); + this.addAssetRefs(input.surfaces); await this.persist(); return clone(surface); } @@ -371,6 +382,7 @@ export class JsonFileStore implements Store { surface.version += 1; surface.updatedAt = new Date().toISOString(); this.touch(surface.sessionId); + if (patch.surfaces !== undefined) this.addAssetRefs(patch.surfaces); await this.persist(); return clone(surface); } @@ -381,6 +393,7 @@ export class JsonFileStore implements Store { if (!surface) return false; this.surfaces.delete(id); this.comments = this.comments.filter((c) => c.postId !== id); + this.invalidateAssetRefs(); await this.persist(); return true; } @@ -436,14 +449,27 @@ export class JsonFileStore implements Store { // --- assets --- private referencedAssetIds(): Set { + if (this.assetRefCache) return this.assetRefCache; const out = new Set(); for (const s of this.surfaces.values()) { collectAssetIds(s.surfaces, out); for (const h of s.history) collectAssetIds(h.surfaces, out); } + this.assetRefCache = out; return out; } + // Fold a freshly-written surfaces list into the cache. If the cache hasn't + // been built yet, skip — the next referencedAssetIds() walks the in-memory + // posts and picks it up. Only mutates a populated cache. + private addAssetRefs(surfaces: Surface[]): void { + if (this.assetRefCache) collectAssetIds(surfaces, this.assetRefCache); + } + + private invalidateAssetRefs(): void { + this.assetRefCache = undefined; + } + async putAsset(input: CreateAssetInput) { await this.load(); if (!this.sessions.has(input.sessionId)) return null; diff --git a/test/api.test.ts b/test/api.test.ts index 4a6deb9..f474074 100644 --- a/test/api.test.ts +++ b/test/api.test.ts @@ -13,6 +13,7 @@ function makeApp( basePath?: string; viewerHtml?: string; screenshots?: boolean; + maxHoldConnections?: number; }, ) { const dir = mkdtempSync(join(tmpdir(), "sideshow-test-")); @@ -813,6 +814,71 @@ test("long-poll resolves when a comment arrives", async () => { assert.ok(Date.now() - start < 4000, "should resolve well before the timeout"); }); +// --- connection caps (SSE + long-poll share one per-instance bound) --- + +test("SSE connections are capped; a released slot lets a new one in", async () => { + const app = makeApp(undefined, { maxHoldConnections: 2 }); + const controllers = [new AbortController(), new AbortController()]; + // Two streams fill the cap. The slot is acquired before streamSSE opens, so + // merely having the Response back means it's held — no body read needed. + const streams = await Promise.all( + controllers.map((ac) => app.request("/api/events", { signal: ac.signal })), + ); + assert.ok(streams.every((s) => s.status === 200)); + // A third is rejected. + assert.equal((await app.request("/api/events")).status, 503); + // Releasing one frees a slot for a fresh stream. + controllers[0].abort(); + await streams[0].body!.cancel().catch(() => undefined); + await new Promise((resolve) => setTimeout(resolve, 30)); + const again = await app.request("/api/events", { signal: new AbortController().signal }); + assert.equal(again.status, 200); + // cleanup + controllers[1].abort(); + await streams[1].body!.cancel().catch(() => undefined); + await again.body!.cancel().catch(() => undefined); +}); + +test("long-poll waits count against the hold cap; instant reads do not", async () => { + const app = makeApp(undefined, { maxHoldConnections: 2 }); + const s = (await (await app.request("/api/snippets", json({ html: "

x

" }))).json()) as any; + // Two held long-polls fill the cap; they resolve when a comment lands. + const pending = [ + app.request(`/api/comments?session=${s.sessionId}&wait=5`), + app.request(`/api/comments?session=${s.sessionId}&wait=5`), + ]; + await new Promise((resolve) => setTimeout(resolve, 30)); + // A third held wait is rejected. + assert.equal((await app.request(`/api/comments?session=${s.sessionId}&wait=5`)).status, 503); + // An instant (?wait=0) read is not a held connection — still served at cap. + assert.equal((await app.request(`/api/comments?session=${s.sessionId}`)).status, 200); + // Post a comment so both held waits resolve and release their slots. + await app.request("/api/comments", json({ snippet: s.id, text: "release" })); + const resolved = await Promise.all(pending); + assert.ok(resolved.every((r) => r.status === 200)); +}); + +test("SSE and long-poll share the same connection budget", async () => { + const app = makeApp(undefined, { maxHoldConnections: 2 }); + const s = (await (await app.request("/api/snippets", json({ html: "

x

" }))).json()) as any; + // One SSE + one long-poll fills the shared cap. + const sse = await app.request("/api/events", { signal: new AbortController().signal }); + assert.equal(sse.status, 200); + const poll = app.request(`/api/comments?session=${s.sessionId}&wait=5`); + await new Promise((resolve) => setTimeout(resolve, 30)); + // Neither a second SSE nor a second long-poll fits. + assert.equal((await app.request("/api/events")).status, 503); + assert.equal((await app.request(`/api/comments?session=${s.sessionId}&wait=5`)).status, 503); + // Releasing the long-poll (comment lands) frees a slot for SSE again. + await app.request("/api/comments", json({ snippet: s.id, text: "release" })); + await poll; + const sseAgain = await app.request("/api/events", { signal: new AbortController().signal }); + assert.equal(sseAgain.status, 200); + // cleanup + await sse.body!.cancel().catch(() => undefined); + await sseAgain.body!.cancel().catch(() => undefined); +}); + test("deleting a session cascades to snippets and comments", async () => { const app = makeApp(); const s = (await (await app.request("/api/snippets", json({ html: "

x

" }))).json()) as any; diff --git a/test/storeContract.ts b/test/storeContract.ts index 3860fa2..51af7ae 100644 --- a/test/storeContract.ts +++ b/test/storeContract.ts @@ -685,4 +685,74 @@ export function runStoreContract(name: string, makeStore: () => Store | Promise< assert.ok(got, "referenced asset should survive its owning session's deletion"); assert.deepEqual([...got.data], [7, 7, 7]); }); + + // The referenced-asset index is maintained incrementally (cached, then + // updated on post mutations) instead of recomputed on every read. These + // contracts pin its correctness across each kind of mutation: a stale cache + // after an update or remove would make isAssetReferenced lie. + + contract("an asset referenced only by a removed post is no longer referenced", async (store) => { + const session = await store.createSession({ agent: "pi" }); + const asset = await store.putAsset({ + sessionId: session.id, + kind: "image", + contentType: "image/png", + data: bytes(1, 2, 3), + }); + assert.ok(asset); + const post = await store.createPost({ + sessionId: session.id, + surfaces: [{ kind: "image", assetId: asset.id }], + }); + assert.ok(post); + // Warm the cache (the /a/:id path reads this before any mutation). + assert.equal(await store.isAssetReferenced(asset.id), true); + await store.removePost(post.id); + assert.equal(await store.isAssetReferenced(asset.id), false); + }); + + contract("an update keeps history-referenced assets and adds new ones", async (store) => { + const session = await store.createSession({ agent: "pi" }); + const oldAsset = await store.putAsset({ + sessionId: session.id, + kind: "image", + contentType: "image/png", + data: bytes(1), + }); + const newAsset = await store.putAsset({ + sessionId: session.id, + kind: "image", + contentType: "image/png", + data: bytes(2), + }); + assert.ok(oldAsset && newAsset); + const post = await store.createPost({ + sessionId: session.id, + surfaces: [{ kind: "image", assetId: oldAsset.id }], + }); + assert.ok(post); + // Warm the cache, then update the surface to point at a different asset. + assert.equal(await store.isAssetReferenced(oldAsset.id), true); + const updated = await store.updatePost(post.id, { + surfaces: [{ kind: "image", assetId: newAsset.id }], + }); + assert.ok(updated); + // The old asset is still referenced by the post's history (append-only), + // and the new asset is referenced by the current surface. + assert.equal(await store.isAssetReferenced(oldAsset.id), true); + assert.equal(await store.isAssetReferenced(newAsset.id), true); + }); + + contract("an unreferenced asset is reported unreferenced from a cold cache", async (store) => { + const session = await store.createSession({ agent: "pi" }); + const asset = await store.putAsset({ + sessionId: session.id, + kind: "image", + contentType: "image/png", + data: bytes(9), + }); + assert.ok(asset); + // No post references it; a fresh read (no prior warm-up) must say so. + assert.equal(await store.isAssetReferenced(asset.id), false); + }); }