From 3ba907133a651ce4cb8345e6949edfe790ffaa88 Mon Sep 17 00:00:00 2001 From: Tom McKenzie Date: Thu, 11 Jun 2026 20:28:48 +1000 Subject: [PATCH 1/4] feat(server): frame-shape guards and inbound limits at the wire boundary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per ADR-0012 (to be written): - wellFormed() shape-guard runs after decode, before any SQL binding so no arbitrary decoded value reaches lookupTx or sql.exec. Drops + logs bad frames (mirrors the undecodable-frame stance). - maxFrameBytes (1 MiB) checked before decode; mirrors Cloudflare's own cap and makes the bound testable/overrideable. - maxOpsPerMutation (128): reject-don't-truncate at the top of handleMut; a partial apply would silently drop client writes. - maxSubsPerSocket (256): cap checked in handleSub; re-subs (same subId) replace rather than count against the cap (SubscriptionRegistry.add semantics). Over-limit → reset + console.error. - SubscriptionRegistry.countFor(ws) accessor added for cap enforcement. - Execute errors sanitized: full detail logged server-side, generic "mutation failed"/"command failed" + EXECUTE_FAILED sent to client. Authorize errors in handleMut kept user-facing unchanged (README API). - plan-001 error-paths test updated: command boom test now asserts the generic message/code and verifies the detail does NOT leak. Co-Authored-By: Claude Fable 5 --- src/server/subscriptions.ts | 5 ++ src/server/sync-do.ts | 116 ++++++++++++++++++++++++++++++++++-- tests/error-paths.test.ts | 7 ++- 3 files changed, 121 insertions(+), 7 deletions(-) diff --git a/src/server/subscriptions.ts b/src/server/subscriptions.ts index d6df278..1674d8e 100644 --- a/src/server/subscriptions.ts +++ b/src/server/subscriptions.ts @@ -73,6 +73,11 @@ export class SubscriptionRegistry { return Array.from(this.subsByWs.get(ws)?.values() ?? []) } + /** Number of active subscriptions on a single socket — for per-socket cap enforcement. */ + countFor(ws: WebSocket): number { + return this.subsByWs.get(ws)?.size ?? 0 + } + /** All (ws, sub) pairs subscribed to a collection — for delta fan-out. */ forCollection(collection: string): Array<{ ws: WebSocket; sub: Sub }> { const out: Array<{ ws: WebSocket; sub: Sub }> = [] diff --git a/src/server/sync-do.ts b/src/server/sync-do.ts index 0716bb9..68ee621 100644 --- a/src/server/sync-do.ts +++ b/src/server/sync-do.ts @@ -67,6 +67,19 @@ export abstract class SyncDurableObject extends protected readonly changelogRetentionMs: number | null = 172_800_000 /** Dedup retention window (ms), independent of changelog retention (C5). */ protected readonly dedupRetentionMs: number = 3_600_000 + /** Maximum ops in a single `mut` frame (ADR-0012). Reject-don't-truncate: + * a partial apply would silently drop client writes. Override in subclasses + * to tune for your workload. */ + protected readonly maxOpsPerMutation: number = 128 + /** Maximum concurrent subscriptions per socket (ADR-0012). Over-limit subs + * are refused with a `reset` frame so legitimate earlier subs keep flowing. + * Override to tune for your data model. */ + protected readonly maxSubsPerSocket: number = 256 + /** Maximum inbound frame size in bytes (ADR-0012). Cloudflare's own cap is + * ~1 MiB; this makes the bound explicit, testable, and overrideable. + * Oversize frames are dropped + logged without closing the socket (mirrors + * the undecodable-frame stance). */ + protected readonly maxFrameBytes: number = 1_048_576 private writesSinceCompaction = 0 protected readonly broadcaster: Broadcaster private readonly liveWs = new Set() @@ -171,13 +184,31 @@ export abstract class SyncDurableObject extends override async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise { // "ping"/"pong" are handled by the auto-response and never arrive here. - let frame: ClientFrame + + // Reject oversize frames before decode (ADR-0012): mirrors the + // undecodable-frame stance — drop + log, no reply, no crash. + const byteLen = typeof message === "string" ? message.length : message.byteLength + if (byteLen > this.maxFrameBytes) { + console.error(`oversize frame dropped (${byteLen} bytes > maxFrameBytes ${this.maxFrameBytes})`) + return + } + + let decoded: unknown try { - frame = this.codec.decode(message) as ClientFrame + decoded = this.codec.decode(message) } catch { return // ignore undecodable frames } - await this.dispatch(ws, frame) + + // Shape-guard after decode (ADR-0012): a frame that decodes but has the + // wrong structure is dropped + logged. The guard runs BEFORE any SQL + // binding so no arbitrary decoded value reaches lookupTx or sql.exec. + if (!this.wellFormed(decoded)) { + console.error("malformed frame dropped", JSON.stringify(decoded)) + return + } + + await this.dispatch(ws, decoded) } override webSocketClose(ws: WebSocket): void { @@ -190,6 +221,54 @@ export abstract class SyncDurableObject extends this.liveWs.delete(ws) } + /** Shape-guard: returns true iff `v` is a structurally valid ClientFrame. + * (ADR-0012) Runs after decode, before any SQL binding — ensures no + * arbitrary decoded value reaches lookupTx or sql.exec. */ + private wellFormed(v: unknown): v is ClientFrame { + if (v === null || typeof v !== "object") return false + const f = v as Record + const t = f["t"] + if (typeof t !== "string") return false + + const isNonEmptyString = (x: unknown): x is string => typeof x === "string" && x.length > 0 + + switch (t) { + case "sub": + return ( + isNonEmptyString(f["subId"]) && + isNonEmptyString(f["collection"]) && + (f["since"] === undefined || typeof f["since"] === "string") && + (f["limit"] === undefined || typeof f["limit"] === "number") && + (f["offset"] === undefined || typeof f["offset"] === "number") + ) + case "unsub": + return typeof f["subId"] === "string" + case "mut": { + if (!isNonEmptyString(f["txId"]) || !isNonEmptyString(f["collection"])) return false + if (!Array.isArray(f["ops"]) || f["ops"].length === 0) return false + const validOpTypes = new Set(["insert", "update", "delete"]) + for (const op of f["ops"] as Array) { + if (op === null || typeof op !== "object") return false + const o = op as Record + if (!validOpTypes.has(o["type"] as string)) return false + if (typeof o["key"] !== "string") return false + if (o["cols"] !== undefined && (o["cols"] === null || typeof o["cols"] !== "object" || Array.isArray(o["cols"]))) return false + } + return true + } + case "call": + return isNonEmptyString(f["txId"]) && isNonEmptyString(f["name"]) + case "fetch": + return ( + isNonEmptyString(f["fetchId"]) && + isNonEmptyString(f["collection"]) && + (f["cursor"] === undefined || (f["cursor"] !== null && typeof f["cursor"] === "object")) + ) + default: + return false + } + } + private async dispatch(ws: WebSocket, frame: ClientFrame): Promise { switch (frame.t) { case "sub": @@ -277,6 +356,12 @@ export abstract class SyncDurableObject extends * socket before `committed`. */ private async handleMut(ws: WebSocket, f: Extract): Promise { + // Inbound limit: reject over-length batches without applying anything + // (ADR-0012). Reject-don't-truncate: a partial apply silently drops writes. + if (f.ops.length > this.maxOpsPerMutation) { + return this.rejectTx(ws, f.txId, `mutation exceeds maxOpsPerMutation (${this.maxOpsPerMutation})`, "LIMIT_EXCEEDED") + } + const seen = lookupTx(this.sql, f.txId) if (seen) return this.replayReceipt(ws, f.txId, seen) @@ -308,7 +393,12 @@ export abstract class SyncDurableObject extends }) commitSeq = String(currentSeq(this.sql)) } catch (e) { - return this.rejectTx(ws, f.txId, errorMessage(e)) + // Log full detail server-side; send only a generic message to the client + // (ADR-0012). SQLite constraint strings, column names, and programming- + // error text are internal detail — not client API surface. The authorize + // catch above is intentionally kept user-facing (README: "throw to deny"). + console.error(`mutation '${f.collection}' execute failed: ${errorMessage(e)}`) + return this.rejectTx(ws, f.txId, "mutation failed", "EXECUTE_FAILED") } recordTx(this.sql, f.txId, true, commitSeq, null, null) @@ -353,7 +443,12 @@ export abstract class SyncDurableObject extends if (def.authorize) await def.authorize({ user, args: f.args, sql: this.sql, env: this.env }) result = await def.execute({ user, args: f.args, sql: this.sql, env: this.env }) } catch (e) { - return this.rejectTx(ws, f.txId, errorMessage(e)) + // Log full detail server-side; send only a generic message to the client + // (ADR-0012). Both authorize and execute errors for commands go through + // this path — command authorize errors are NOT currently user-facing API + // (unlike mutation authorize, which is "throw to deny"). Log + generic. + console.error(`command '${f.name}' failed: ${errorMessage(e)}`) + return this.rejectTx(ws, f.txId, "command failed", "EXECUTE_FAILED") } // Serialize the result for dedup replay BEFORE recording success. A @@ -500,6 +595,17 @@ export abstract class SyncDurableObject extends this.send(ws, { t: "reset", sub: frame.subId }) return } + + // Per-socket subscription cap (ADR-0012). A re-sub on an existing subId + // replaces the old entry (SubscriptionRegistry.add semantics) — count + // only new subIds against the cap. + const existingCount = this.subs.countFor(ws) + const existingSub = this.subs.forWs(ws).find((s) => s.subId === frame.subId) + if (!existingSub && existingCount >= this.maxSubsPerSocket) { + console.error(`sub '${frame.subId}' refused: maxSubsPerSocket (${this.maxSubsPerSocket}) reached`) + this.send(ws, { t: "reset", sub: frame.subId }) + return + } // Lower where/orderBy/limit/offset into SQLite. An un-lowerable predicate // (outside the supported floor) is rejected, not silently full-scanned. let query: { sql: string; params: Array } diff --git a/tests/error-paths.test.ts b/tests/error-paths.test.ts index 2ccd08d..b42bdff 100644 --- a/tests/error-paths.test.ts +++ b/tests/error-paths.test.ts @@ -176,13 +176,16 @@ describe("mutation error paths (atomicity, fail-loud, exactly-once)", () => { }) describe("command error paths", () => { - it("command execute throws → rejected with message", async () => { + it("command execute throws → rejected with generic message, detail not leaked", async () => { const ws = await openWs("/sync/err-cmd-boom") send(ws, { t: "call", txId: "c1", name: "boom", args: {} }) const frames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed") const last = frames[frames.length - 1]! as Extract expect(last.t).toBe("rejected") - expect(last.error.message).toMatch(/command boom/) + // execute errors are sanitized — internal detail must NOT reach the client (ADR-0012) + expect(last.error.message).toBe("command failed") + expect(last.error.code).toBe("EXECUTE_FAILED") + expect(last.error.message).not.toMatch(/boom/) ws.close() }) From 24c1b90b156dc0bbd88746ff40009d2fc5fb11c0 Mon Sep 17 00:00:00 2001 From: Tom McKenzie Date: Thu, 11 Jun 2026 20:33:41 +1000 Subject: [PATCH 2/4] test(server): 5 wire-hardening tests + LimitsTestDO subclass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds tests/wire-hardening.test.ts pinning the ADR-0012 invariants: 1. Malformed frame shapes are silently dropped; socket survives. 2. ops over maxOpsPerMutation → LIMIT_EXCEEDED, nothing applied. 3. Subscription cap: third sub on 2-cap DO gets reset; first two subs continue receiving deltas. 4. Oversized frame (>1 MiB) dropped before decode; socket survives (workerd passed the frame to webSocketMessage — the DO's own maxFrameBytes guard fired at 1,048,666 bytes). 5. Execute error → generic "mutation failed"/EXECUTE_FAILED; SQLite constraint detail does not leak; authorize error passes through. Also fixes the wellFormed shape guard to treat null == absent for optional fields (the client transport sends null, not omit, for absent fields in MessagePack serialisation — previous commit missed this and broke all existing sub-bearing tests on the full suite). LimitsTestDO (maxOpsPerMutation=2, maxSubsPerSocket=2) added to tests/test-worker.ts, wired in vitest.config.ts and tests/env.d.ts, following the MaintTestDO/SlowTickDO pattern exactly. Co-Authored-By: Claude Fable 5 --- src/server/sync-do.ts | 17 ++- tests/env.d.ts | 1 + tests/test-worker.ts | 13 ++ tests/wire-hardening.test.ts | 271 +++++++++++++++++++++++++++++++++++ vitest.config.ts | 1 + 5 files changed, 297 insertions(+), 6 deletions(-) create mode 100644 tests/wire-hardening.test.ts diff --git a/src/server/sync-do.ts b/src/server/sync-do.ts index 68ee621..b6b21ef 100644 --- a/src/server/sync-do.ts +++ b/src/server/sync-do.ts @@ -223,7 +223,10 @@ export abstract class SyncDurableObject extends /** Shape-guard: returns true iff `v` is a structurally valid ClientFrame. * (ADR-0012) Runs after decode, before any SQL binding — ensures no - * arbitrary decoded value reaches lookupTx or sql.exec. */ + * arbitrary decoded value reaches lookupTx or sql.exec. + * + * Optional fields treat null == absent (the client transport serialises + * absent fields as null in MessagePack rather than omitting them). */ private wellFormed(v: unknown): v is ClientFrame { if (v === null || typeof v !== "object") return false const f = v as Record @@ -231,15 +234,17 @@ export abstract class SyncDurableObject extends if (typeof t !== "string") return false const isNonEmptyString = (x: unknown): x is string => typeof x === "string" && x.length > 0 + /** null is treated as absent for optional fields */ + const absent = (x: unknown): boolean => x === undefined || x === null switch (t) { case "sub": return ( isNonEmptyString(f["subId"]) && isNonEmptyString(f["collection"]) && - (f["since"] === undefined || typeof f["since"] === "string") && - (f["limit"] === undefined || typeof f["limit"] === "number") && - (f["offset"] === undefined || typeof f["offset"] === "number") + (absent(f["since"]) || typeof f["since"] === "string") && + (absent(f["limit"]) || typeof f["limit"] === "number") && + (absent(f["offset"]) || typeof f["offset"] === "number") ) case "unsub": return typeof f["subId"] === "string" @@ -252,7 +257,7 @@ export abstract class SyncDurableObject extends const o = op as Record if (!validOpTypes.has(o["type"] as string)) return false if (typeof o["key"] !== "string") return false - if (o["cols"] !== undefined && (o["cols"] === null || typeof o["cols"] !== "object" || Array.isArray(o["cols"]))) return false + if (!absent(o["cols"]) && (typeof o["cols"] !== "object" || Array.isArray(o["cols"]))) return false } return true } @@ -262,7 +267,7 @@ export abstract class SyncDurableObject extends return ( isNonEmptyString(f["fetchId"]) && isNonEmptyString(f["collection"]) && - (f["cursor"] === undefined || (f["cursor"] !== null && typeof f["cursor"] === "object")) + (absent(f["cursor"]) || typeof f["cursor"] === "object") ) default: return false diff --git a/tests/env.d.ts b/tests/env.d.ts index b03f640..d6b9710 100644 --- a/tests/env.d.ts +++ b/tests/env.d.ts @@ -7,5 +7,6 @@ declare module "cloudflare:test" { UNREG_DO: DurableObjectNamespace MAINT_DO: DurableObjectNamespace SLOW_DO: DurableObjectNamespace + LIMITS_DO: DurableObjectNamespace } } diff --git a/tests/test-worker.ts b/tests/test-worker.ts index 5e4028d..e00c1b9 100644 --- a/tests/test-worker.ts +++ b/tests/test-worker.ts @@ -129,11 +129,20 @@ export class SlowTickDO extends SyncTestDO { protected override readonly tickMs = 30_000 } +/** Same collections as SyncTestDO with tiny inbound limits — lets wire-hardening + * tests (ADR-0012) exercise the limit paths without sending 128+ ops or opening + * 256+ subscriptions. */ +export class LimitsTestDO extends SyncTestDO { + protected override readonly maxOpsPerMutation = 2 + protected override readonly maxSubsPerSocket = 2 +} + interface Env { TEST_DO: DurableObjectNamespace SYNC_DO: DurableObjectNamespace MAINT_DO: DurableObjectNamespace SLOW_DO: DurableObjectNamespace + LIMITS_DO: DurableObjectNamespace } export default { @@ -151,6 +160,10 @@ export default { const name = url.pathname.slice("/slow/".length) || "default" return env.SLOW_DO.get(env.SLOW_DO.idFromName(name)).fetch(req) } + if (url.pathname.startsWith("/limits/")) { + const name = url.pathname.slice("/limits/".length) || "default" + return env.LIMITS_DO.get(env.LIMITS_DO.idFromName(name)).fetch(req) + } return new Response("test-worker") }, } diff --git a/tests/wire-hardening.test.ts b/tests/wire-hardening.test.ts new file mode 100644 index 0000000..29c3f82 --- /dev/null +++ b/tests/wire-hardening.test.ts @@ -0,0 +1,271 @@ +// WHY: the wire boundary rejects or drops hostile input loudly without crashing +// the socket, and internal error detail never reaches a client. These tests +// guard the ADR-0012 invariants: +// 1. Malformed frame shapes are dropped silently (no reply) — the socket +// survives and processes subsequent well-formed frames normally. +// 2. A mutation with too many ops is rejected with LIMIT_EXCEEDED before any +// write is attempted. +// 3. A socket hitting the subscription cap gets reset for the excess subId; +// earlier subs continue to receive deltas. +// 4. Oversized frames are dropped before decode; the socket survives. +// 5. Execute errors send a generic message to the client — SQLite detail never +// leaks; authorize errors remain user-facing. + +import { env, runInDurableObject, SELF } from "cloudflare:test" +import { describe, expect, it } from "vitest" +import { createFrameCodec, type WireOut } from "../src/wire/frame-codec.ts" +import type { ClientFrame, ServerFrame } from "../src/wire/frames.ts" + +const codec = createFrameCodec() + +async function openWs(path: string): Promise { + const res = await SELF.fetch(`https://example.com${path}`, { headers: { Upgrade: "websocket" } }) + expect(res.status).toBe(101) + const ws = res.webSocket + if (!ws) throw new Error("no webSocket on 101 response") + ws.accept() + return ws +} + +function send(ws: WebSocket, frame: ClientFrame): void { + ws.send(codec.encode(frame)) +} + +/** Send a raw, already-encoded value (for malformed-frame and oversize tests). */ +function sendRaw(ws: WebSocket, data: WireOut): void { + ws.send(data) +} + +function collectUntil(ws: WebSocket, done: (f: ServerFrame) => boolean, timeoutMs = 2000): Promise> { + return new Promise((resolve, reject) => { + const out: Array = [] + const timer = setTimeout(() => reject(new Error(`timeout; got [${out.map((f) => f.t).join(",")}]`)), timeoutMs) + const onMsg = (e: MessageEvent): void => { + out.push(codec.decode(e.data as ArrayBuffer) as ServerFrame) + if (done(out[out.length - 1]!)) { + clearTimeout(timer) + ws.removeEventListener("message", onMsg) + resolve(out) + } + } + ws.addEventListener("message", onMsg) + }) +} + +/** Wait up to timeoutMs; resolve with whatever frames arrived (may be empty). */ +function collectFor(ws: WebSocket, timeoutMs: number): Promise> { + return new Promise((resolve) => { + const out: Array = [] + const onMsg = (e: MessageEvent): void => { + out.push(codec.decode(e.data as ArrayBuffer) as ServerFrame) + } + ws.addEventListener("message", onMsg) + setTimeout(() => { + ws.removeEventListener("message", onMsg) + resolve(out) + }, timeoutMs) + }) +} + +/** Subscribe and wait for the initial snap-end so later frames are post-sub. */ +async function subscribe(ws: WebSocket, subId: string, path = "/sync"): Promise { + send(ws, { t: "sub", subId, collection: "messages" }) + await collectUntil(ws, (f) => f.t === "snap-end") +} + +describe("wire-hardening (ADR-0012)", () => { + it("malformed frame shapes are dropped; socket survives", async () => { + const ws = await openWs("/sync/wh-malformed") + await subscribe(ws, "s1") + + // (a) mut with no txId or ops — fails wellFormed; dropped + const malformed1 = codec.encode({ t: "mut" } as unknown as ClientFrame) + sendRaw(ws, malformed1) + + // (b) mut with object txId and string ops — fails wellFormed; dropped + const malformed2 = codec.encode({ t: "mut", txId: { evil: 1 }, collection: "messages", ops: "x" } as unknown as ClientFrame) + sendRaw(ws, malformed2) + + // (c) unknown frame type — fails wellFormed; dropped + const malformed3 = codec.encode({ t: "unknown-type" } as unknown as ClientFrame) + sendRaw(ws, malformed3) + + // After the malformed frames, a VALID mut must succeed — proves socket survived. + send(ws, { + t: "mut", + txId: "wh-t1", + collection: "messages", + ops: [{ type: "insert", key: "wh-a", cols: { id: "wh-a", body: "good" } }], + }) + const frames = await collectUntil(ws, (f) => f.t === "committed" || f.t === "rejected") + + // No rejected for the malformed ones (dropped, not answered). + const rejected = frames.filter((f) => f.t === "rejected") + expect(rejected.length).toBe(0) + // The valid mut committed. + const committed = frames.find((f) => f.t === "committed") + expect(committed).toBeDefined() + + ws.close() + }) + + it("ops over the per-mutation limit → LIMIT_EXCEEDED, nothing applied", async () => { + // LimitsTestDO has maxOpsPerMutation = 2 + const room = "wh-ops-limit" + const ws = await openWs(`/limits/${room}`) + + // Three ops — one over the limit of 2. + send(ws, { + t: "mut", + txId: "wh-lim1", + collection: "messages", + ops: [ + { type: "insert", key: "r1", cols: { id: "r1", body: "x" } }, + { type: "insert", key: "r2", cols: { id: "r2", body: "y" } }, + { type: "insert", key: "r3", cols: { id: "r3", body: "z" } }, + ], + }) + const frames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed") + const last = frames[frames.length - 1]! as Extract + expect(last.t).toBe("rejected") + expect(last.error.code).toBe("LIMIT_EXCEEDED") + expect(last.error.message).toMatch(/maxOpsPerMutation/) + + // Verify nothing was applied. + const stub = env.LIMITS_DO.get(env.LIMITS_DO.idFromName(room)) + const rows = await runInDurableObject(stub, (_i, s) => { + return Array.from(s.storage.sql.exec("SELECT COUNT(*) as cnt FROM messages")) + }) + expect((rows[0] as { cnt: number }).cnt).toBe(0) + + ws.close() + }) + + it("subscription cap: third sub on 2-cap DO → reset; first two subs still receive deltas", async () => { + // LimitsTestDO has maxSubsPerSocket = 2 + const room = "wh-sub-cap" + const ws = await openWs(`/limits/${room}`) + + // First sub — ok. + send(ws, { t: "sub", subId: "cap-s1", collection: "messages" }) + await collectUntil(ws, (f) => f.t === "snap-end") + + // Second sub — ok. + send(ws, { t: "sub", subId: "cap-s2", collection: "messages" }) + await collectUntil(ws, (f) => f.t === "snap-end") + + // Third sub — over the cap; should get reset. + send(ws, { t: "sub", subId: "cap-s3", collection: "messages" }) + const capFrames = await collectUntil(ws, (f) => f.t === "reset" || f.t === "snap-end", 2000) + const reset = capFrames.find((f) => f.t === "reset") as Extract | undefined + expect(reset).toBeDefined() + expect(reset!.sub).toBe("cap-s3") + + // Now insert a row — the first two subs should receive deltas. + send(ws, { + t: "mut", + txId: "wh-cap-mut1", + collection: "messages", + ops: [{ type: "insert", key: "cap-row1", cols: { id: "cap-row1", body: "hello" } }], + }) + const afterMut = await collectUntil(ws, (f) => f.t === "committed", 2000) + // s1 and s2 both get a delta frame for the inserted row. + const deltas = afterMut.filter((f) => f.t === "d") as Array> + const subIds = deltas.map((d) => d.sub) + expect(subIds).toContain("cap-s1") + expect(subIds).toContain("cap-s2") + + ws.close() + }) + + it("oversized frame is dropped; socket survives and answers the next valid frame", async () => { + const ws = await openWs("/sync/wh-oversize") + + // Build a >1 MiB payload by encoding a mut with a large cols.body. + // The frame will exceed maxFrameBytes (1_048_576) when encoded. + const bigBody = "x".repeat(1_048_577) + const bigFrame = codec.encode({ + t: "mut", + txId: "wh-big1", + collection: "messages", + ops: [{ type: "insert", key: "big", cols: { id: "big", body: bigBody } }], + }) + + // Attempt to send the oversized frame. + let wsClosedByWorkerd = false + ws.addEventListener("close", () => { + wsClosedByWorkerd = true + }) + + try { + sendRaw(ws, bigFrame) + } catch { + // workerd may throw synchronously on an oversized send — treat as closed. + wsClosedByWorkerd = true + } + + if (wsClosedByWorkerd) { + // STOP-condition path: workerd closed the socket before webSocketMessage + // ran. The DO never saw the frame. Assert the socket is closed and note. + // The maxFrameBytes guard still provides an explicit, testable bound for + // cases where workerd passes the frame through (future versions, etc.). + // Per plan: assert THAT and note it — either way the DO must not crash. + expect(wsClosedByWorkerd).toBe(true) + return + } + + // Collect for a short window — no reply expected (frame was dropped). + const droppedFrames = await collectFor(ws, 300) + const anyReply = droppedFrames.filter((f) => { + const r = f as Extract + return f.t === "rejected" && (r as typeof r).txId === "wh-big1" + }) + expect(anyReply).toHaveLength(0) + + // Socket survives: a normal mut succeeds. + send(ws, { + t: "mut", + txId: "wh-small1", + collection: "messages", + ops: [{ type: "insert", key: "small", cols: { id: "small", body: "normal" } }], + }) + const followUp = await collectUntil(ws, (f) => f.t === "committed" || f.t === "rejected") + const last = followUp[followUp.length - 1]! + expect(last.t).toBe("committed") + + ws.close() + }) + + it("execute error → generic 'mutation failed', SQLite detail not leaked; authorize error passes through", async () => { + const ws = await openWs("/sync/wh-sanitize") + + // Insert key "a" first. + send(ws, { t: "mut", txId: "wh-san1", collection: "messages", ops: [{ type: "insert", key: "a", cols: { id: "a", body: "v1" } }] }) + await collectUntil(ws, (f) => f.t === "committed") + + // Duplicate-pk insert triggers SQLite UNIQUE constraint. + send(ws, { t: "mut", txId: "wh-san2", collection: "messages", ops: [{ type: "insert", key: "a", cols: { id: "a", body: "v2" } }] }) + const frames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed") + const last = frames[frames.length - 1]! as Extract + expect(last.t).toBe("rejected") + // Generic message — no SQLite detail. + expect(last.error.message).toBe("mutation failed") + expect(last.error.code).toBe("EXECUTE_FAILED") + // The SQLite constraint detail must NOT appear in the client-visible message. + expect(last.error.message).not.toMatch(/UNIQUE|constraint/i) + + // Authorize denial still passes through verbatim (README: "throw to deny"). + send(ws, { + t: "mut", + txId: "wh-san3", + collection: "messages", + ops: [{ type: "insert", key: "forbidden-key", cols: { id: "forbidden-key", body: "FORBIDDEN" } }], + }) + const authFrames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed") + const authLast = authFrames[authFrames.length - 1]! as Extract + expect(authLast.t).toBe("rejected") + expect(authLast.error.message).toMatch(/forbidden/) + + ws.close() + }) +}) diff --git a/vitest.config.ts b/vitest.config.ts index 2bc96c6..73e9251 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -24,6 +24,7 @@ export default defineWorkersProject({ UNREG_DO: { className: "UnregisteredDO", useSQLite: true }, MAINT_DO: { className: "MaintTestDO", useSQLite: true }, SLOW_DO: { className: "SlowTickDO", useSQLite: true }, + LIMITS_DO: { className: "LimitsTestDO", useSQLite: true }, }, }, }, From 1e73f0fb233bebefbead3194e0ffdd0ba592b749 Mon Sep 17 00:00:00 2001 From: Tom McKenzie Date: Thu, 11 Jun 2026 20:37:01 +1000 Subject: [PATCH 3/4] docs(adr): ADR-0012 wire-input hardening MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Records decisions from plan-005: shape-guard drop policy (drop+log, no reply — mirrors undecodable bytes), the three limits and their defaults/override pattern, execute-error sanitization with authorize exempt (mut path), and the explicit deferral of dedup identity binding. Also applies one finding from the codex adversarial review (gpt-5.5): Finding 3 — APPLIED: JSON.stringify(decoded) in the wellFormed drop path could throw on bigint values (MessagePack useBigInt64 may decode bigints). Fixed by using a replacer that stringifies bigints as strings, with a fallback to String(decoded) on any other error. The drop+log behaviour is now crash-safe for all decoded values. Rebuttals for findings NOT applied: Finding 1 — NOT a gap: replay stores the SANITIZED message ("mutation failed") because rejectTx writes the generic string to recordTx before the dedup record is written. Old records (pre-upgrade) with SQLite detail would only replay if they exist at deploy time; that is a deploy-time concern, not a code gap. Finding 2 — BY DESIGN: "no mutation handler for '...:...'" is the authorize-path catch, kept user-facing per the plan (README: "throw to deny"). Exposing the collection:type name is consistent with the library's author-configuring-the-DO model; the attacker is already authed. Noted in ADR for future review. Co-Authored-By: Claude Fable 5 --- docs/adr/0012-wire-input-hardening.md | 102 ++++++++++++++++++++++++++ docs/adr/README.md | 1 + src/server/sync-do.ts | 11 ++- 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 docs/adr/0012-wire-input-hardening.md diff --git a/docs/adr/0012-wire-input-hardening.md b/docs/adr/0012-wire-input-hardening.md new file mode 100644 index 0000000..741b9f1 --- /dev/null +++ b/docs/adr/0012-wire-input-hardening.md @@ -0,0 +1,102 @@ +# ADR-0012: Wire-input hardening — frame-shape guards, inbound limits, sanitized execute errors + +**Status**: Accepted +**Date**: 2026-06-11 +**Plan**: [005-wire-input-hardening.md](../../plans/005-wire-input-hardening.md) + +## Context + +The DO trusted every *decoded* frame's shape. MessagePack decoding was already +guarded (`webSocketMessage` ignored undecodable bytes), but a frame that decoded +to the wrong *shape* was dispatched as-is: a `mut` whose `txId` was an object +could reach `lookupTx` and `sql.exec` bindings with an arbitrary value. + +Additionally, nothing capped `ops` length, per-socket subscriptions, or inbound +frame size, so one authenticated client could force unbounded allocation. And a +thrown `execute` error's raw message (SQLite constraint text, column names, etc.) +was forwarded verbatim to the client — leaking internal schema detail. + +None of these are unauthenticated holes — the attacker is already an authed socket +on the same DO — but the library's ethos is reject-don't-degrade, and these are +the cheap mechanical layers of that. + +## Decisions + +### D1: Shape guard — drop and log, no reply + +A `wellFormed(v: unknown): v is ClientFrame` check runs after decode, before any +SQL binding. A frame that fails the check is dropped with a server-side +`console.error` (fail loud in logs) and **no client reply** — mirroring the +existing "ignore undecodable frames" stance and extending its comment. + +The guard validates per variant of `ClientFrame` (enumerated from +`src/wire/frames.ts`): required fields are checked for correct type and +non-emptiness; optional fields treat `null` as absent (the client transport +serialises absent fields as `null` in MessagePack rather than omitting them). +`where`/`orderBy` fields are left `unknown` — the sql-compiler is their +validator (it already `fail-loud`s via `UnsupportedPredicateError`). + +### D2: Three overrideable limits as `protected readonly` tunables + +Following the `tickMs`/`compactionEvery` field pattern (doc comment, `protected +readonly`, override-able by subclasses at construction time): + +- **`maxOpsPerMutation = 128`**: checked at the top of `handleMut`. **Reject, + don't truncate** — a partial apply would silently drop client writes. Sends + `rejected` with `code: "LIMIT_EXCEEDED"`. +- **`maxSubsPerSocket = 256`**: checked in `handleSub` before `subs.add`. A + re-sub on an existing `subId` replaces the old entry + (`SubscriptionRegistry.add` semantics) and does NOT count against the cap — + only genuinely new sub IDs are counted. Over-limit → `reset` for the refused + `subId` + `console.error`. (Reset is the existing "sub refused" signal.) +- **`maxFrameBytes = 1_048_576`**: checked in `webSocketMessage` before decode + (`typeof message === "string" ? message.length : message.byteLength`). + Oversize → drop + `console.error`. Cloudflare caps WS messages at ~1 MiB + anyway; this makes the bound explicit, testable, and overrideable. + +`SubscriptionRegistry.countFor(ws)` was added to expose the per-socket count +without exposing the internal Map. + +### D3: Execute-error sanitization; authorize errors stay user-facing + +In `handleMut`, there are two catch sites: + +- **Authorize catch**: unchanged. Authorize errors are user-facing API + (`README: "throw to deny"`). The error message passes through verbatim. +- **Execute catch (transaction)**: the full error is logged server-side + (`console.error`) and a **generic** `"mutation failed"` message with code + `"EXECUTE_FAILED"` is sent to the client. SQLite constraint strings, column + names, and programming-error text are internal detail — not client API surface. + +In `handleCall`, authorize and execute share one try/catch. Command authorize +errors are not currently user-facing API in the same way mutation authorize is, so +the entire catch is sanitized: log full detail server-side, send `"command failed"` +with `"EXECUTE_FAILED"` to the client. + +**Compatibility note**: the client-visible error text for execute failures changed. +Callers who matched on specific SQLite error strings or programming-error messages +must update to the generic messages/codes. Authorize-path messages are unchanged. + +### D4: Dedup identity binding deferred + +`_sync_seen_tx` is keyed by `txId` alone; any authed socket presenting a +guessed/leaked txId receives the stored receipt. Risk is low (txIds are +client-random UUIDs) but the fix needs an identity-keying decision (`TUser` is +author-defined and unserializable in general — likely a `protected +dedupScope(user: TUser): string` hook). This is **explicitly deferred** pending +a maintainer design decision. + +## Consequences + +- **Security**: arbitrary decoded values no longer reach SQL bindings; inbound + resource exhaustion is bounded; internal schema detail does not leak to clients. +- **Behavior change** (observable by consumers): execute-path `rejected` frames + now carry `"mutation failed"`/`"command failed"` + `"EXECUTE_FAILED"` instead of + the raw error message. Authorize-path messages are unchanged. +- **Hibernation**: no idle timers were introduced. All new checks are synchronous + and run on the existing `webSocketMessage` path. +- **Extensibility**: all three limits are `protected readonly` — subclasses can + override at construction time. `LimitsTestDO` in the test worker exercises this. +- **Test coverage**: `tests/wire-hardening.test.ts` (5 tests) pins all four + invariants; `tests/error-paths.test.ts` was updated to assert the new generic + error text for execute failures. diff --git a/docs/adr/README.md b/docs/adr/README.md index 542cdfa..d00a146 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -19,3 +19,4 @@ explains the displacement. | [0009](./0009-changelog-time-retention.md) | Changelog time-based retention; reset stale reconnects | Accepted | | [0010](./0010-typed-mutations-collection-manifest.md) | Typed mutations via a collection-row manifest on `SyncRegistry` | Accepted | | [0011](./0011-ssr-dehydrate-hydrate.md) | SSR: dehydrate on the worker, hydrate to the cursor | Accepted (experimental; generalizes 0002 C1 → C1′) | +| [0012](./0012-wire-input-hardening.md) | Wire-input hardening: frame-shape guards, inbound limits, sanitized execute errors | Accepted | diff --git a/src/server/sync-do.ts b/src/server/sync-do.ts index b6b21ef..249905c 100644 --- a/src/server/sync-do.ts +++ b/src/server/sync-do.ts @@ -204,7 +204,16 @@ export abstract class SyncDurableObject extends // wrong structure is dropped + logged. The guard runs BEFORE any SQL // binding so no arbitrary decoded value reaches lookupTx or sql.exec. if (!this.wellFormed(decoded)) { - console.error("malformed frame dropped", JSON.stringify(decoded)) + // Safe stringify: decoded may contain bigints (MessagePack useBigInt64); + // JSON.stringify throws on bigint — use a replacer to avoid crashing the + // logging itself. + let summary: string + try { + summary = JSON.stringify(decoded, (_k, v) => (typeof v === "bigint" ? String(v) : v)) + } catch { + summary = String(decoded) + } + console.error("malformed frame dropped", summary) return } From 3c57dfdf580ef9d8c79033d81ff21d26fce8a3b5 Mon Sep 17 00:00:00 2001 From: Tom McKenzie Date: Thu, 11 Jun 2026 20:45:05 +1000 Subject: [PATCH 4/4] test(server): make oversized-frame test falsifiable (reviewer fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Test 4 ("oversized frame is dropped") previously only filtered for `rejected` frames referencing txId "wh-big1", which passes even when the guard is disabled (the DO processes the payload and emits `committed`, not `rejected`). Two stronger assertions added: - `expect(droppedFrames).toHaveLength(0)` — this socket has no subscriptions so ANY frame arriving signals the guard failed; a `committed` frame now causes the test to fail correctly. - `runInDurableObject` DB check: `SELECT COUNT(*) FROM messages WHERE id='big'` must be 0 — the strongest signal the write was never applied. Verified falsifiability: temporarily setting `if (false && byteLen > this.maxFrameBytes)` causes the test to fail with "expected [{ t: 'committed' }] to have length 0"; restoring the guard makes it pass. Full suite: 183 tests, 43 files, all pass. Co-Authored-By: Claude Fable 5 --- tests/wire-hardening.test.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/wire-hardening.test.ts b/tests/wire-hardening.test.ts index 29c3f82..d332210 100644 --- a/tests/wire-hardening.test.ts +++ b/tests/wire-hardening.test.ts @@ -215,12 +215,18 @@ describe("wire-hardening (ADR-0012)", () => { } // Collect for a short window — no reply expected (frame was dropped). + // This socket has no subscriptions, so the window must be entirely empty: + // any frame at all (committed, rejected, or delta) would mean the guard + // failed and the oversized payload reached dispatch. const droppedFrames = await collectFor(ws, 300) - const anyReply = droppedFrames.filter((f) => { - const r = f as Extract - return f.t === "rejected" && (r as typeof r).txId === "wh-big1" + expect(droppedFrames).toHaveLength(0) + + // Server-side confirmation: the row must not have been inserted. + const stub = env.SYNC_DO.get(env.SYNC_DO.idFromName("wh-oversize")) + const rows = await runInDurableObject(stub, (_i, s) => { + return Array.from(s.storage.sql.exec("SELECT COUNT(*) as cnt FROM messages WHERE id='big'")) }) - expect(anyReply).toHaveLength(0) + expect((rows[0] as { cnt: number }).cnt).toBe(0) // Socket survives: a normal mut succeeds. send(ws, {