diff --git a/.gitignore b/.gitignore index 59dbdc4..603c88d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ coverage/ .idea/ .vscode/ *.tsbuildinfo +.zed/ diff --git a/CHANGELOG.md b/CHANGELOG.md index c213d91..6e487d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,30 @@ While pre-1.0, the public API may change between 0.x releases. _Nothing yet._ +## [0.3.1] — 2026-06-11 + +### Fixed + +- **Catch-up reinsert no longer wedges the client.** A key deleted-and- + reinserted while a client was away arrives in the catch-up as op=`insert` + for a key the client still holds; TanStack's sync write throws + `DuplicateKeySyncError` on that, aborting the whole catch-up transaction. + The adapter now applies a held-key insert as the upsert it semantically is + (the move-in update-upsert contract, ADR-0002 C4). +- **Cursor barrier (C1′).** A snapshot or catch-up served on a socket + with still-buffered coalesced deltas could advance the client's cursor past + an undelivered write (multi-collection reconnect; drop before the tick lost + the write). The server now flushes the socket's pending deltas before any + synchronous cursor-advancing emission — ADR-0002 C1 generalized from + `committed` to all cursor boundaries. +- **Reconnect window no longer kills subscriptions.** The `reconnecting` flag + was set inside the reconnect timer, so a mutation fired within the reconnect + delay of a drop established the fresh socket before the timer ran — with no + resubscribe, leaving every subscription silently dead on the new socket (and + the late timer wedged the flag). The flag is now set when the reconnect is + scheduled, so whichever connect wins — timer- or demand-driven — resubscribes + from the cursor. + ## [0.3.0] — 2026-06-09 ### Added diff --git a/package.json b/package.json index c744e4f..d687775 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "tanstack-do-db-collection", - "version": "0.3.0", + "version": "0.3.1", "description": "Sync a TanStack DB collection to a Cloudflare Durable Object over WebSockets — optimistic mutations, live queries, and single-ordered-stream write confirmation.", "type": "module", "license": "MIT", diff --git a/src/client/do-collection.ts b/src/client/do-collection.ts index a859ed1..80b0a2d 100644 --- a/src/client/do-collection.ts +++ b/src/client/do-collection.ts @@ -123,6 +123,12 @@ export function doCollectionOptions( onDelta: (op, key, cols) => { ensureBegin() if (op === "delete") write({ type: "delete", key: key as string }) + // A catch-up emits the LATEST op per changed key, so a key deleted-and- + // reinserted while we were away arrives as "insert" for a key we still + // HOLD — TanStack's sync write throws DuplicateKeySyncError on that + // unless values deep-equal. Apply a held-key insert as the upsert it + // semantically is (update upserts; the move-in contract, ADR-0002 C4). + else if (op === "insert" && collection.get(key as string) !== undefined) write({ type: "update", value: cols }) else write({ type: op, value: cols }) }, onUptodate: () => flush(), diff --git a/src/client/transport.ts b/src/client/transport.ts index 182efef..ac8a056 100644 --- a/src/client/transport.ts +++ b/src/client/transport.ts @@ -133,8 +133,14 @@ export class WebSocketTransport { this.connectPromise = null // Auto-reconnect on an unexpected drop while subscriptions are active. if (!this.intentionallyClosed && this.handlers.size > 0) { + // The flag is set at SCHEDULING time, not in the timer: a demand- + // driven connect() (a mutation inside the reconnect window) may + // establish the fresh socket first, and it must run the resubscribe + // path too — or every subscription is silently dead on the new + // socket and the late timer wedges the flag (pre-existing bug, found + // in the ADR-0011 grill). + this.reconnecting = true setTimeout(() => { - this.reconnecting = true void this.connect().catch(() => { /* next attempt retries on the following close */ }) diff --git a/src/server/sync-do.ts b/src/server/sync-do.ts index 6739839..667049e 100644 --- a/src/server/sync-do.ts +++ b/src/server/sync-do.ts @@ -480,6 +480,14 @@ export abstract class SyncDurableObject extends throw e } + // C1′ (ADR-0011, generalizing ADR-0002 C1): what follows is a synchronous + // cursor-advancing emission — a snapshot's `snap-end` or a catch-up's + // `uptodate` carries the CURRENT seq, which may include changes whose + // deltas are still buffered in the coalescer for this socket. Flush them + // first, or the client's cursor claims a seq it never applied and a drop + // before the tick loses the write (reconnect resumes past it). + this.broadcaster.flushOne(ws) + const sub = this.subs.add(ws, frame.subId, frame.collection, frame.where) const seq = String(currentSeq(this.sql)) diff --git a/tests/cursor-barrier.test.ts b/tests/cursor-barrier.test.ts new file mode 100644 index 0000000..afcf952 --- /dev/null +++ b/tests/cursor-barrier.test.ts @@ -0,0 +1,121 @@ +import { SELF } from "cloudflare:test" +import { describe, expect, it } from "vitest" +import { createFrameCodec } from "../src/wire/frame-codec.ts" +import type { ClientFrame, ServerFrame } from "../src/wire/frames.ts" + +// WHY (ADR-0011 C1′, generalizing ADR-0002 C1): the client's single cursor may +// only ever advance over a CONTIGUOUS applied prefix. Any server emission that +// advances the cursor (`snap-end`, catch-up `uptodate`, `committed`) must +// therefore be preceded by a flush of that socket's pending coalesced deltas — +// otherwise the cursor claims a seq whose delta is still sitting in the +// broadcaster buffer, and a drop before the tick loses that write forever +// (reconnect resumes from the claimed seq and skips it). +// +// This drives the failing interleaving found in adversarial review: two +// collections multiplexed on one socket; a write to `files` is enqueued but +// unflushed (the SlowTickDO never ticks); the same socket then opens a +// `messages` catch-up sub. The catch-up's `uptodate` carries the current seq — +// which includes the files write — so the files delta MUST be on the wire +// before it. + +const codec = createFrameCodec() + +async function openWs(path: string): Promise { + const res = await SELF.fetch(`https://example.com${path}`, { headers: { Upgrade: "websocket" } }) + expect(res.status).toBe(101) + const ws = res.webSocket + if (!ws) throw new Error("no webSocket on 101 response") + ws.accept() + return ws +} + +/** Persistent recorder — attached once so no frame between steps is missed. */ +function record(ws: WebSocket): Array { + const out: Array = [] + ws.addEventListener("message", (e) => { + out.push(codec.decode((e as MessageEvent).data as ArrayBuffer) as ServerFrame) + }) + return out +} + +async function waitFor(pred: () => boolean, timeoutMs = 3000): Promise { + const start = Date.now() + while (!pred()) { + if (Date.now() - start > timeoutMs) throw new Error("waitFor timeout") + await new Promise((r) => setTimeout(r, 5)) + } +} + +const send = (ws: WebSocket, f: ClientFrame): void => ws.send(codec.encode(f)) + +describe("cursor barrier: pending deltas flush before any cursor-advancing frame (C1′)", () => { + it("a catch-up sub on one collection cannot advance the cursor past another collection's buffered delta", async () => { + const room = `barrier-${crypto.randomUUID()}` + const ws1 = await openWs(`/slow/${room}`) + const ws2 = await openWs(`/slow/${room}`) + const f1 = record(ws1) + const f2 = record(ws2) + + // ws1 watches BOTH collections on one socket. + send(ws1, { t: "sub", subId: "m1", collection: "messages" }) + send(ws1, { t: "sub", subId: "f1", collection: "files" }) + await waitFor(() => f1.filter((f) => f.t === "snap-end").length === 2) + + // A confirmed write gives ws1 a real cursor S1 > 0 (catch-up needs one). + send(ws1, { t: "mut", txId: "tx-m", collection: "messages", ops: [{ type: "insert", key: "a", cols: { id: "a", body: "hi" } }] }) + await waitFor(() => f1.some((f) => f.t === "committed")) + const s1 = (f1.find((f) => f.t === "committed") as Extract).seq + + // ws2 writes to `files`: ws1's delta for it is enqueued but NOT flushed + // (SlowTickDO's coalescer never ticks inside this test). + send(ws2, { t: "mut", txId: "tx-f", collection: "files", ops: [{ type: "insert", key: "x", cols: { id: "x", name: "doc" } }] }) + await waitFor(() => f2.some((f) => f.t === "committed")) + const s2 = (f2.find((f) => f.t === "committed") as Extract).seq + expect(BigInt(s2)).toBeGreaterThan(BigInt(s1)) + + // Same socket now opens a messages catch-up sub from S1. Its `uptodate` + // carries the current seq (>= S2) and the client advances on it — so the + // buffered files delta must arrive FIRST. + const before = f1.length + send(ws1, { t: "sub", subId: "m2", collection: "messages", since: s1 }) + await waitFor(() => f1.slice(before).some((f) => f.t === "uptodate" && BigInt(f.seq) >= BigInt(s2))) + + const since = f1.slice(before) + const deltaIdx = since.findIndex((f) => f.t === "d" && f.sub === "f1" && f.key === "x") + const boundaryIdx = since.findIndex((f) => f.t === "uptodate" && BigInt(f.seq) >= BigInt(s2)) + expect(deltaIdx).toBeGreaterThanOrEqual(0) // the buffered delta was flushed at all + expect(deltaIdx).toBeLessThan(boundaryIdx) // ...and BEFORE the cursor boundary + ws1.close() + ws2.close() + }) + + it("a fresh snapshot on one collection cannot advance the cursor past another collection's buffered delta", async () => { + const room = `barrier-snap-${crypto.randomUUID()}` + const ws1 = await openWs(`/slow/${room}`) + const ws2 = await openWs(`/slow/${room}`) + const f1 = record(ws1) + const f2 = record(ws2) + + send(ws1, { t: "sub", subId: "f1", collection: "files" }) + await waitFor(() => f1.some((f) => f.t === "snap-end")) + + // Buffered, unflushed files delta on ws1 (originating socket is ws2). + send(ws2, { t: "mut", txId: "tx-f2", collection: "files", ops: [{ type: "insert", key: "y", cols: { id: "y", name: "img" } }] }) + await waitFor(() => f2.some((f) => f.t === "committed")) + const s2 = (f2.find((f) => f.t === "committed") as Extract).seq + + // A FRESH sub (no since) on messages snapshots at the current seq: its + // snap-end advances the cursor to >= S2, so the files delta must precede it. + const before = f1.length + send(ws1, { t: "sub", subId: "m1", collection: "messages" }) + await waitFor(() => f1.slice(before).some((f) => f.t === "snap-end" && f.sub === "m1")) + + const since = f1.slice(before) + const deltaIdx = since.findIndex((f) => f.t === "d" && f.sub === "f1" && f.key === "y") + const endIdx = since.findIndex((f) => f.t === "snap-end" && f.sub === "m1") + expect(deltaIdx).toBeGreaterThanOrEqual(0) + expect(deltaIdx).toBeLessThan(endIdx) + ws1.close() + ws2.close() + }) +}) diff --git a/tests/do-collection.test.ts b/tests/do-collection.test.ts index 76abdba..462ca41 100644 --- a/tests/do-collection.test.ts +++ b/tests/do-collection.test.ts @@ -48,7 +48,7 @@ function startSync(transport: WebSocketTransport): { calls: Array } { // sync lives on opts.sync.sync; invoke with spy controls (cast: type-only dep). const syncConfig = (opts as unknown as { sync: { sync: (p: unknown) => void } }).sync syncConfig.sync({ - collection: {}, + collection: { get: () => undefined }, // adapter consults held keys (held-insert upsert) begin: () => calls.push(["begin"]), write: (m: unknown) => calls.push(["write", m]), commit: () => calls.push(["commit"]), @@ -93,7 +93,7 @@ describe("doCollectionOptions (M3 adapter)", () => { const adapter = doCollectionOptions({ transport: t, table: "messages", getKey: (r) => r.id }) const calls: Array = [] ;(adapter as unknown as { sync: { sync: (p: unknown) => void } }).sync.sync({ - collection: {}, + collection: { get: () => undefined }, begin: () => calls.push(["begin"]), write: (m: unknown) => calls.push(["write", m]), commit: () => calls.push(["commit"]), diff --git a/tests/env.d.ts b/tests/env.d.ts index 4361efe..b03f640 100644 --- a/tests/env.d.ts +++ b/tests/env.d.ts @@ -6,5 +6,6 @@ declare module "cloudflare:test" { SYNC_DO: DurableObjectNamespace UNREG_DO: DurableObjectNamespace MAINT_DO: DurableObjectNamespace + SLOW_DO: DurableObjectNamespace } } diff --git a/tests/filtered-client.test.ts b/tests/filtered-client.test.ts index a456ff9..cc1810e 100644 --- a/tests/filtered-client.test.ts +++ b/tests/filtered-client.test.ts @@ -51,7 +51,7 @@ function startFiltered(transport: WebSocketTransport, where: unknown): { calls: const calls: Array = [] const adapter = doCollectionOptions({ transport, table: "messages", getKey: (r) => r.id, where }) ;(adapter as unknown as { sync: { sync: (p: unknown) => void } }).sync.sync({ - collection: {}, + collection: { get: () => undefined }, // adapter consults held keys (held-insert upsert) begin: () => calls.push(["begin"]), write: (m: unknown) => calls.push(["write", m]), commit: () => calls.push(["commit"]), diff --git a/tests/reconnect-window.test.ts b/tests/reconnect-window.test.ts new file mode 100644 index 0000000..70d8892 --- /dev/null +++ b/tests/reconnect-window.test.ts @@ -0,0 +1,92 @@ +import { describe, expect, it } from "vitest" +import { WebSocketTransport, type WebSocketLike } from "../src/client/transport.ts" +import { createFrameCodec } from "../src/wire/frame-codec.ts" +import type { ClientFrame, ServerFrame } from "../src/wire/frames.ts" + +// WHY: PRE-EXISTING bug found while grilling ADR-0011's forced-reconnect +// design (the bug itself is in the plain reconnect path, present on this +// branch; the forced-reconnect machinery is not). The `reconnecting` flag was +// set inside the reconnect TIMER, so a connect() triggered on demand — a +// mutation fired within reconnectDelayMs of a drop — established the fresh +// socket with the flag still false: NO resubscribeAll, every subscription +// silently dead (the server has no subs for the new socket), and the late +// timer's connect() early-returned, wedging the flag. The flag must be set +// when the reconnect is SCHEDULED, so whichever connect() establishes — +// timer-driven or demand-driven — runs the resubscribe path. + +const codec = createFrameCodec() + +interface Fake { + ws: WebSocketLike + sent: Array + emit: (type: string, ev: { data?: unknown }) => void +} + +function makeFake(): Fake { + const listeners = new Map void>>() + const fake: Fake = { + sent: [], + emit: (type, ev) => { + for (const l of listeners.get(type) ?? []) l(ev) + }, + ws: { + send: (data) => fake.sent.push(codec.decode(data as ArrayBuffer | string) as ClientFrame), + close: () => {}, + addEventListener: (type, l) => { + const arr = listeners.get(type) ?? [] + arr.push(l) + listeners.set(type, arr) + }, + removeEventListener: () => {}, + }, + } + return fake +} + +async function waitFor(pred: () => boolean, timeoutMs = 3000): Promise { + const start = Date.now() + while (!pred()) { + if (Date.now() - start > timeoutMs) throw new Error("waitFor timeout") + await new Promise((r) => setTimeout(r, 5)) + } +} + +describe("reconnect window (drop → demand-driven connect before the timer)", () => { + it("a mutation-driven connect still resubscribes from the cursor", async () => { + const fakes: Array = [] + const t = new WebSocketTransport({ + url: "wss://fake", + reconnectDelayMs: 60_000, // the timer must NOT be what saves us + open: () => { + const f = makeFake() + fakes.push(f) + return f.ws + }, + }) + await t.subscribe( + "s1", + "messages", + { onSnap: () => {}, onSnapEnd: () => {}, onDelta: () => {}, onUptodate: () => {}, onReset: () => {} }, + ) + fakes[0]!.emit("message", { data: codec.encode({ t: "snap-end", sub: "s1", seq: "5" } satisfies ServerFrame) }) + expect(t.appliedCursor).toBe("5") + + // Unexpected drop... + fakes[0]!.emit("close", {}) + // ...and a mutation fires immediately, long before the reconnect timer. + const mut = t.sendMut({ t: "mut", txId: "tx1", collection: "messages", ops: [] }) + // Wait for the mut FRAME (not just the socket): the send lands a microtask + // after open(), and the committed reply below needs its waiter registered. + await waitFor(() => fakes.length === 2 && fakes[1]!.sent.some((f) => f.t === "mut")) + + // The demand-driven socket must carry the resubscription, from the cursor. + const resub = fakes[1]!.sent.find((f) => f.t === "sub") as Extract | undefined + expect(resub).toBeDefined() + expect(resub!.since).toBe("5") + + // Settle the in-flight mutation cleanly, then shut down. + fakes[1]!.emit("message", { data: codec.encode({ t: "committed", txId: "tx1", seq: "6" } satisfies ServerFrame) }) + await mut + t.close() + }) +}) diff --git a/tests/reinsert-catchup.test.ts b/tests/reinsert-catchup.test.ts new file mode 100644 index 0000000..487044f --- /dev/null +++ b/tests/reinsert-catchup.test.ts @@ -0,0 +1,76 @@ +import { createCollection } from "@tanstack/db" +import { env, runInDurableObject, SELF } from "cloudflare:test" +import { describe, expect, it } from "vitest" +import { doCollectionOptions } from "../src/client/do-collection.ts" +import { WebSocketTransport, type WebSocketLike } from "../src/client/transport.ts" + +// WHY: a catch-up (reconnect or SSR hydration) emits the LATEST CDC op per +// changed key. A key that was deleted-and-reinserted while the client was away +// therefore arrives as op="insert" — for a key the client still HOLDS (its +// delete was compacted away with the rest of the window). TanStack's sync +// write throws DuplicateKeySyncError on insert-over-existing unless values +// deep-equal, so without normalization the reinsert aborts the whole catch-up +// transaction and the client wedges on stale state. The adapter must apply a +// held-key "insert" as the upsert it semantically is (ADR-0011 D4) — the same +// update-upsert contract move-in already relies on (ADR-0002 C4). + +interface Msg { + id: string + body: string +} + +function makeTransport(room: string): WebSocketTransport { + return new WebSocketTransport({ + url: `https://example.com/sync/${room}`, + reconnectDelayMs: 20, + open: async () => { + const res = await SELF.fetch(`https://example.com/sync/${room}`, { headers: { Upgrade: "websocket" } }) + const ws = res.webSocket + if (!ws) throw new Error("no webSocket") + ws.accept() + return ws as unknown as WebSocketLike + }, + }) +} + +async function waitFor(pred: () => boolean, timeoutMs = 3000): Promise { + const start = Date.now() + while (!pred()) { + if (Date.now() - start > timeoutMs) throw new Error("waitFor timeout") + await new Promise((r) => setTimeout(r, 5)) + } +} + +describe("catch-up reinsert lands as an upsert, not a DuplicateKeySyncError", () => { + it("converges a key deleted-and-reinserted while the client was away", async () => { + const room = `reinsert-${crypto.randomUUID()}` + const t = makeTransport(room) + await t.connect() + const stub = env.SYNC_DO.get(env.SYNC_DO.idFromName(room)) + + // Seed before subscribing so the snapshot carries the row and a cursor > 0. + await runInDurableObject(stub, (_i, s) => { + s.storage.sql.exec("INSERT INTO messages(id,body) VALUES('k','v1')") + }) + + const messages = createCollection(doCollectionOptions({ transport: t, table: "messages", getKey: (r) => r.id })) + await messages.preload() + expect(messages.get("k")).toMatchObject({ id: "k", body: "v1" }) + + // Drop the socket; while away, 'k' is deleted then reinserted with a new + // value — the catch-up's latest-op-per-key for 'k' is op="insert". + await runInDurableObject(stub, (_i, state) => { + for (const sock of state.getWebSockets()) sock.close(1000, "drop") + }) + await runInDurableObject(stub, (_i, s) => { + s.storage.sql.exec("DELETE FROM messages WHERE id='k'") + s.storage.sql.exec("INSERT INTO messages(id,body) VALUES('k','v2')") + }) + + // After auto-reconnect + catch-up, the held key must converge to v2 — + // not wedge on a DuplicateKeySyncError-aborted transaction. + await waitFor(() => messages.get("k")?.body === "v2") + expect(messages.get("k")).toMatchObject({ id: "k", body: "v2" }) + t.close() + }) +}) diff --git a/tests/test-worker.ts b/tests/test-worker.ts index 5195bb8..74d874c 100644 --- a/tests/test-worker.ts +++ b/tests/test-worker.ts @@ -111,10 +111,19 @@ export class MaintTestDO extends SyncTestDO { protected override readonly compactionEvery = 3 } +/** Same collections as SyncTestDO with an effectively-infinite coalescer tick: + * enqueued deltas stay buffered until something calls flushOne explicitly. + * Lets the cursor-barrier tests (ADR-0011 C1′) hold "pending egress" still + * while a snapshot/catch-up is served on the same socket. */ +export class SlowTickDO extends SyncTestDO { + protected override readonly tickMs = 30_000 +} + interface Env { TEST_DO: DurableObjectNamespace SYNC_DO: DurableObjectNamespace MAINT_DO: DurableObjectNamespace + SLOW_DO: DurableObjectNamespace } export default { @@ -128,6 +137,10 @@ export default { const name = url.pathname.slice("/maint/".length) || "default" return env.MAINT_DO.get(env.MAINT_DO.idFromName(name)).fetch(req) } + if (url.pathname.startsWith("/slow/")) { + const name = url.pathname.slice("/slow/".length) || "default" + return env.SLOW_DO.get(env.SLOW_DO.idFromName(name)).fetch(req) + } return new Response("test-worker") }, } diff --git a/vitest.config.ts b/vitest.config.ts index 91659b5..2bc96c6 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -23,6 +23,7 @@ export default defineWorkersProject({ SYNC_DO: { className: "SyncTestDO", useSQLite: true }, UNREG_DO: { className: "UnregisteredDO", useSQLite: true }, MAINT_DO: { className: "MaintTestDO", useSQLite: true }, + SLOW_DO: { className: "SlowTickDO", useSQLite: true }, }, }, },