Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions docs/adr/0012-wire-input-hardening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# ADR-0012: Wire-input hardening — frame-shape guards, inbound limits, sanitized execute errors

**Status**: Accepted
**Date**: 2026-06-11
**Plan**: [005-wire-input-hardening.md](../../plans/005-wire-input-hardening.md)

## Context

The DO trusted every *decoded* frame's shape. MessagePack decoding was already
guarded (`webSocketMessage` ignored undecodable bytes), but a frame that decoded
to the wrong *shape* was dispatched as-is: a `mut` whose `txId` was an object
could reach `lookupTx` and `sql.exec` bindings with an arbitrary value.

Additionally, nothing capped `ops` length, per-socket subscriptions, or inbound
frame size, so one authenticated client could force unbounded allocation. And a
thrown `execute` error's raw message (SQLite constraint text, column names, etc.)
was forwarded verbatim to the client — leaking internal schema detail.

None of these are unauthenticated holes — the attacker is already an authed socket
on the same DO — but the library's ethos is reject-don't-degrade, and these are
the cheap mechanical layers of that.

## Decisions

### D1: Shape guard — drop and log, no reply

A `wellFormed(v: unknown): v is ClientFrame` check runs after decode, before any
SQL binding. A frame that fails the check is dropped with a server-side
`console.error` (fail loud in logs) and **no client reply** — mirroring the
existing "ignore undecodable frames" stance and extending its comment.

The guard validates per variant of `ClientFrame` (enumerated from
`src/wire/frames.ts`): required fields are checked for correct type and
non-emptiness; optional fields treat `null` as absent (the client transport
serialises absent fields as `null` in MessagePack rather than omitting them).
`where`/`orderBy` fields are left `unknown` — the sql-compiler is their
validator (it already `fail-loud`s via `UnsupportedPredicateError`).

### D2: Three overrideable limits as `protected readonly` tunables

Following the `tickMs`/`compactionEvery` field pattern (doc comment, `protected
readonly`, override-able by subclasses at construction time):

- **`maxOpsPerMutation = 128`**: checked at the top of `handleMut`. **Reject,
don't truncate** — a partial apply would silently drop client writes. Sends
`rejected` with `code: "LIMIT_EXCEEDED"`.
- **`maxSubsPerSocket = 256`**: checked in `handleSub` before `subs.add`. A
re-sub on an existing `subId` replaces the old entry
(`SubscriptionRegistry.add` semantics) and does NOT count against the cap —
only genuinely new sub IDs are counted. Over-limit → `reset` for the refused
`subId` + `console.error`. (Reset is the existing "sub refused" signal.)
- **`maxFrameBytes = 1_048_576`**: checked in `webSocketMessage` before decode
(`typeof message === "string" ? message.length : message.byteLength`).
Oversize → drop + `console.error`. Cloudflare caps WS messages at ~1 MiB
anyway; this makes the bound explicit, testable, and overrideable.

`SubscriptionRegistry.countFor(ws)` was added to expose the per-socket count
without exposing the internal Map.

### D3: Execute-error sanitization; authorize errors stay user-facing

In `handleMut`, there are two catch sites:

- **Authorize catch**: unchanged. Authorize errors are user-facing API
(`README: "throw to deny"`). The error message passes through verbatim.
- **Execute catch (transaction)**: the full error is logged server-side
(`console.error`) and a **generic** `"mutation failed"` message with code
`"EXECUTE_FAILED"` is sent to the client. SQLite constraint strings, column
names, and programming-error text are internal detail — not client API surface.

In `handleCall`, authorize and execute share one try/catch. Command authorize
errors are not currently user-facing API in the same way mutation authorize is, so
the entire catch is sanitized: log full detail server-side, send `"command failed"`
with `"EXECUTE_FAILED"` to the client.

**Compatibility note**: the client-visible error text for execute failures changed.
Callers who matched on specific SQLite error strings or programming-error messages
must update to the generic messages/codes. Authorize-path messages are unchanged.

### D4: Dedup identity binding deferred

`_sync_seen_tx` is keyed by `txId` alone; any authed socket presenting a
guessed/leaked txId receives the stored receipt. Risk is low (txIds are
client-random UUIDs) but the fix needs an identity-keying decision (`TUser` is
author-defined and unserializable in general — likely a `protected
dedupScope(user: TUser): string` hook). This is **explicitly deferred** pending
a maintainer design decision.

## Consequences

- **Security**: arbitrary decoded values no longer reach SQL bindings; inbound
resource exhaustion is bounded; internal schema detail does not leak to clients.
- **Behavior change** (observable by consumers): execute-path `rejected` frames
now carry `"mutation failed"`/`"command failed"` + `"EXECUTE_FAILED"` instead of
the raw error message. Authorize-path messages are unchanged.
- **Hibernation**: no idle timers were introduced. All new checks are synchronous
and run on the existing `webSocketMessage` path.
- **Extensibility**: all three limits are `protected readonly` — subclasses can
override at construction time. `LimitsTestDO` in the test worker exercises this.
- **Test coverage**: `tests/wire-hardening.test.ts` (5 tests) pins all four
invariants; `tests/error-paths.test.ts` was updated to assert the new generic
error text for execute failures.
1 change: 1 addition & 0 deletions docs/adr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ explains the displacement.
| [0009](./0009-changelog-time-retention.md) | Changelog time-based retention; reset stale reconnects | Accepted |
| [0010](./0010-typed-mutations-collection-manifest.md) | Typed mutations via a collection-row manifest on `SyncRegistry` | Accepted |
| [0011](./0011-ssr-dehydrate-hydrate.md) | SSR: dehydrate on the worker, hydrate to the cursor | Accepted (experimental; generalizes 0002 C1 → C1′) |
| [0012](./0012-wire-input-hardening.md) | Wire-input hardening: frame-shape guards, inbound limits, sanitized execute errors | Accepted |
5 changes: 5 additions & 0 deletions src/server/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export class SubscriptionRegistry {
return Array.from(this.subsByWs.get(ws)?.values() ?? [])
}

/** Number of active subscriptions on a single socket — for per-socket cap enforcement. */
countFor(ws: WebSocket): number {
return this.subsByWs.get(ws)?.size ?? 0
}

/** All (ws, sub) pairs subscribed to a collection — for delta fan-out. */
forCollection(collection: string): Array<{ ws: WebSocket; sub: Sub }> {
const out: Array<{ ws: WebSocket; sub: Sub }> = []
Expand Down
130 changes: 125 additions & 5 deletions src/server/sync-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
protected readonly changelogRetentionMs: number | null = 172_800_000
/** Dedup retention window (ms), independent of changelog retention (C5). */
protected readonly dedupRetentionMs: number = 3_600_000
/** Maximum ops in a single `mut` frame (ADR-0012). Reject-don't-truncate:
* a partial apply would silently drop client writes. Override in subclasses
* to tune for your workload. */
protected readonly maxOpsPerMutation: number = 128
/** Maximum concurrent subscriptions per socket (ADR-0012). Over-limit subs
* are refused with a `reset` frame so legitimate earlier subs keep flowing.
* Override to tune for your data model. */
protected readonly maxSubsPerSocket: number = 256
/** Maximum inbound frame size in bytes (ADR-0012). Cloudflare's own cap is
* ~1 MiB; this makes the bound explicit, testable, and overrideable.
* Oversize frames are dropped + logged without closing the socket (mirrors
* the undecodable-frame stance). */
protected readonly maxFrameBytes: number = 1_048_576
private writesSinceCompaction = 0
protected readonly broadcaster: Broadcaster
private readonly liveWs = new Set<WebSocket>()
Expand Down Expand Up @@ -171,13 +184,40 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends

override async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
// "ping"/"pong" are handled by the auto-response and never arrive here.
let frame: ClientFrame

// Reject oversize frames before decode (ADR-0012): mirrors the
// undecodable-frame stance — drop + log, no reply, no crash.
const byteLen = typeof message === "string" ? message.length : message.byteLength
if (byteLen > this.maxFrameBytes) {
console.error(`oversize frame dropped (${byteLen} bytes > maxFrameBytes ${this.maxFrameBytes})`)
return
}

let decoded: unknown
try {
frame = this.codec.decode(message) as ClientFrame
decoded = this.codec.decode(message)
} catch {
return // ignore undecodable frames
}
await this.dispatch(ws, frame)

// Shape-guard after decode (ADR-0012): a frame that decodes but has the
// wrong structure is dropped + logged. The guard runs BEFORE any SQL
// binding so no arbitrary decoded value reaches lookupTx or sql.exec.
if (!this.wellFormed(decoded)) {
// Safe stringify: decoded may contain bigints (MessagePack useBigInt64);
// JSON.stringify throws on bigint — use a replacer to avoid crashing the
// logging itself.
let summary: string
try {
summary = JSON.stringify(decoded, (_k, v) => (typeof v === "bigint" ? String(v) : v))
} catch {
summary = String(decoded)
}
console.error("malformed frame dropped", summary)
return
}

await this.dispatch(ws, decoded)
}

override webSocketClose(ws: WebSocket): void {
Expand All @@ -190,6 +230,59 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
this.liveWs.delete(ws)
}

/** Shape-guard: returns true iff `v` is a structurally valid ClientFrame.
* (ADR-0012) Runs after decode, before any SQL binding — ensures no
* arbitrary decoded value reaches lookupTx or sql.exec.
*
* Optional fields treat null == absent (the client transport serialises
* absent fields as null in MessagePack rather than omitting them). */
private wellFormed(v: unknown): v is ClientFrame {
if (v === null || typeof v !== "object") return false
const f = v as Record<string, unknown>
const t = f["t"]
if (typeof t !== "string") return false

const isNonEmptyString = (x: unknown): x is string => typeof x === "string" && x.length > 0
/** null is treated as absent for optional fields */
const absent = (x: unknown): boolean => x === undefined || x === null

switch (t) {
case "sub":
return (
isNonEmptyString(f["subId"]) &&
isNonEmptyString(f["collection"]) &&
(absent(f["since"]) || typeof f["since"] === "string") &&
(absent(f["limit"]) || typeof f["limit"] === "number") &&
(absent(f["offset"]) || typeof f["offset"] === "number")
)
case "unsub":
return typeof f["subId"] === "string"
case "mut": {
if (!isNonEmptyString(f["txId"]) || !isNonEmptyString(f["collection"])) return false
if (!Array.isArray(f["ops"]) || f["ops"].length === 0) return false
const validOpTypes = new Set(["insert", "update", "delete"])
for (const op of f["ops"] as Array<unknown>) {
if (op === null || typeof op !== "object") return false
const o = op as Record<string, unknown>
if (!validOpTypes.has(o["type"] as string)) return false
if (typeof o["key"] !== "string") return false
if (!absent(o["cols"]) && (typeof o["cols"] !== "object" || Array.isArray(o["cols"]))) return false
}
return true
}
case "call":
return isNonEmptyString(f["txId"]) && isNonEmptyString(f["name"])
case "fetch":
return (
isNonEmptyString(f["fetchId"]) &&
isNonEmptyString(f["collection"]) &&
(absent(f["cursor"]) || typeof f["cursor"] === "object")
)
default:
return false
}
}

private async dispatch(ws: WebSocket, frame: ClientFrame): Promise<void> {
switch (frame.t) {
case "sub":
Expand Down Expand Up @@ -277,6 +370,12 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
* socket before `committed`.
*/
private async handleMut(ws: WebSocket, f: Extract<ClientFrame, { t: "mut" }>): Promise<void> {
// Inbound limit: reject over-length batches without applying anything
// (ADR-0012). Reject-don't-truncate: a partial apply silently drops writes.
if (f.ops.length > this.maxOpsPerMutation) {
return this.rejectTx(ws, f.txId, `mutation exceeds maxOpsPerMutation (${this.maxOpsPerMutation})`, "LIMIT_EXCEEDED")
}

const seen = lookupTx(this.sql, f.txId)
if (seen) return this.replayReceipt(ws, f.txId, seen)

Expand Down Expand Up @@ -308,7 +407,12 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
})
commitSeq = String(currentSeq(this.sql))
} catch (e) {
return this.rejectTx(ws, f.txId, errorMessage(e))
// Log full detail server-side; send only a generic message to the client
// (ADR-0012). SQLite constraint strings, column names, and programming-
// error text are internal detail — not client API surface. The authorize
// catch above is intentionally kept user-facing (README: "throw to deny").
console.error(`mutation '${f.collection}' execute failed: ${errorMessage(e)}`)
return this.rejectTx(ws, f.txId, "mutation failed", "EXECUTE_FAILED")
}

recordTx(this.sql, f.txId, true, commitSeq, null, null)
Expand Down Expand Up @@ -353,7 +457,12 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
if (def.authorize) await def.authorize({ user, args: f.args, sql: this.sql, env: this.env })
result = await def.execute({ user, args: f.args, sql: this.sql, env: this.env })
} catch (e) {
return this.rejectTx(ws, f.txId, errorMessage(e))
// Log full detail server-side; send only a generic message to the client
// (ADR-0012). Both authorize and execute errors for commands go through
// this path — command authorize errors are NOT currently user-facing API
// (unlike mutation authorize, which is "throw to deny"). Log + generic.
console.error(`command '${f.name}' failed: ${errorMessage(e)}`)
return this.rejectTx(ws, f.txId, "command failed", "EXECUTE_FAILED")
}

// Serialize the result for dedup replay BEFORE recording success. A
Expand Down Expand Up @@ -500,6 +609,17 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
this.send(ws, { t: "reset", sub: frame.subId })
return
}

// Per-socket subscription cap (ADR-0012). A re-sub on an existing subId
// replaces the old entry (SubscriptionRegistry.add semantics) — count
// only new subIds against the cap.
const existingCount = this.subs.countFor(ws)
const existingSub = this.subs.forWs(ws).find((s) => s.subId === frame.subId)
if (!existingSub && existingCount >= this.maxSubsPerSocket) {
console.error(`sub '${frame.subId}' refused: maxSubsPerSocket (${this.maxSubsPerSocket}) reached`)
this.send(ws, { t: "reset", sub: frame.subId })
return
}
// Lower where/orderBy/limit/offset into SQLite. An un-lowerable predicate
// (outside the supported floor) is rejected, not silently full-scanned.
let query: { sql: string; params: Array<unknown> }
Expand Down
1 change: 1 addition & 0 deletions tests/env.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ declare module "cloudflare:test" {
UNREG_DO: DurableObjectNamespace
MAINT_DO: DurableObjectNamespace
SLOW_DO: DurableObjectNamespace
LIMITS_DO: DurableObjectNamespace
}
}
7 changes: 5 additions & 2 deletions tests/error-paths.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,16 @@ describe("mutation error paths (atomicity, fail-loud, exactly-once)", () => {
})

describe("command error paths", () => {
it("command execute throws → rejected with message", async () => {
it("command execute throws → rejected with generic message, detail not leaked", async () => {
const ws = await openWs("/sync/err-cmd-boom")
send(ws, { t: "call", txId: "c1", name: "boom", args: {} })
const frames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed")
const last = frames[frames.length - 1]! as Extract<ServerFrame, { t: "rejected" }>
expect(last.t).toBe("rejected")
expect(last.error.message).toMatch(/command boom/)
// execute errors are sanitized — internal detail must NOT reach the client (ADR-0012)
expect(last.error.message).toBe("command failed")
expect(last.error.code).toBe("EXECUTE_FAILED")
expect(last.error.message).not.toMatch(/boom/)
ws.close()
})

Expand Down
13 changes: 13 additions & 0 deletions tests/test-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,20 @@ export class SlowTickDO extends SyncTestDO {
protected override readonly tickMs = 30_000
}

/** Same collections as SyncTestDO with tiny inbound limits — lets wire-hardening
* tests (ADR-0012) exercise the limit paths without sending 128+ ops or opening
* 256+ subscriptions. */
export class LimitsTestDO extends SyncTestDO {
protected override readonly maxOpsPerMutation = 2
protected override readonly maxSubsPerSocket = 2
}

interface Env {
TEST_DO: DurableObjectNamespace
SYNC_DO: DurableObjectNamespace
MAINT_DO: DurableObjectNamespace
SLOW_DO: DurableObjectNamespace
LIMITS_DO: DurableObjectNamespace
}

export default {
Expand All @@ -151,6 +160,10 @@ export default {
const name = url.pathname.slice("/slow/".length) || "default"
return env.SLOW_DO.get(env.SLOW_DO.idFromName(name)).fetch(req)
}
if (url.pathname.startsWith("/limits/")) {
const name = url.pathname.slice("/limits/".length) || "default"
return env.LIMITS_DO.get(env.LIMITS_DO.idFromName(name)).fetch(req)
}
return new Response("test-worker")
},
}
Loading