Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ coverage/
.idea/
.vscode/
*.tsbuildinfo
.zed/
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,30 @@ While pre-1.0, the public API may change between 0.x releases.

_Nothing yet._

## [0.3.1] — 2026-06-11

### Fixed

- **Catch-up reinsert no longer wedges the client.** A key deleted-and-reinserted
while a client was away arrives in the catch-up as op=`insert`
for a key the client still holds; TanStack's sync write throws
`DuplicateKeySyncError` on that, aborting the whole catch-up transaction.
The adapter now applies a held-key insert as the upsert it semantically is
(the move-in update-upsert contract, ADR-0002 C4).
- **Cursor barrier (C1′).** A snapshot or catch-up served on a socket
with still-buffered coalesced deltas could advance the client's cursor past
an undelivered write (multi-collection reconnect; drop before the tick lost
the write). The server now flushes the socket's pending deltas before any
synchronous cursor-advancing emission — ADR-0002 C1 generalized from
`committed` to all cursor boundaries.
- **Reconnect window no longer kills subscriptions.** The `reconnecting` flag
was set inside the reconnect timer, so a mutation fired within the reconnect
delay of a drop established the fresh socket before the timer ran — with no
resubscribe, leaving every subscription silently dead on the new socket (and
the late timer wedged the flag). The flag is now set when the reconnect is
scheduled, so whichever connect wins — timer- or demand-driven — resubscribes
from the cursor.

## [0.3.0] — 2026-06-09

### Added
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tanstack-do-db-collection",
"version": "0.3.0",
"version": "0.3.1",
"description": "Sync a TanStack DB collection to a Cloudflare Durable Object over WebSockets — optimistic mutations, live queries, and single-ordered-stream write confirmation.",
"type": "module",
"license": "MIT",
Expand Down
6 changes: 6 additions & 0 deletions src/client/do-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ export function doCollectionOptions<T extends object>(
onDelta: (op, key, cols) => {
  ensureBegin()
  // Deletes carry only the key.
  if (op === "delete") {
    write({ type: "delete", key: key as string })
    return
  }
  // Catch-up reports only the latest op per changed key, so a row that was
  // deleted and re-created while this client was away shows up as "insert"
  // for a key the collection still holds. TanStack's sync write rejects that
  // with DuplicateKeySyncError unless the values deep-equal, so a held-key
  // insert is written as an update (update upserts; the move-in contract,
  // ADR-0002 C4).
  if (op === "insert" && collection.get(key as string) !== undefined) {
    write({ type: "update", value: cols })
    return
  }
  write({ type: op, value: cols })
},
onUptodate: () => flush(),
Expand Down
8 changes: 7 additions & 1 deletion src/client/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ export class WebSocketTransport {
this.connectPromise = null
// Auto-reconnect on an unexpected drop while subscriptions are active.
if (!this.intentionallyClosed && this.handlers.size > 0) {
// The flag is set at SCHEDULING time, not in the timer: a demand-
// driven connect() (a mutation inside the reconnect window) may
// establish the fresh socket first, and it must run the resubscribe
// path too — or every subscription is silently dead on the new
// socket and the late timer wedges the flag (pre-existing bug, found
// in the ADR-0011 grill).
this.reconnecting = true
setTimeout(() => {
this.reconnecting = true
void this.connect().catch(() => {
/* next attempt retries on the following close */
})
Expand Down
8 changes: 8 additions & 0 deletions src/server/sync-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,14 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
throw e
}

// C1′ (ADR-0011, generalizing ADR-0002 C1): what follows is a synchronous
// cursor-advancing emission — a snapshot's `snap-end` or a catch-up's
// `uptodate` carries the CURRENT seq, which may include changes whose
// deltas are still buffered in the coalescer for this socket. Flush them
// first, or the client's cursor claims a seq it never applied and a drop
// before the tick loses the write (reconnect resumes past it).
this.broadcaster.flushOne(ws)

const sub = this.subs.add(ws, frame.subId, frame.collection, frame.where)
const seq = String(currentSeq(this.sql))

Expand Down
121 changes: 121 additions & 0 deletions tests/cursor-barrier.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { SELF } from "cloudflare:test"
import { describe, expect, it } from "vitest"
import { createFrameCodec } from "../src/wire/frame-codec.ts"
import type { ClientFrame, ServerFrame } from "../src/wire/frames.ts"

// WHY (ADR-0011 C1′, generalizing ADR-0002 C1): the client's single cursor may
// only ever advance over a CONTIGUOUS applied prefix. Any server emission that
// advances the cursor (`snap-end`, catch-up `uptodate`, `committed`) must
// therefore be preceded by a flush of that socket's pending coalesced deltas —
// otherwise the cursor claims a seq whose delta is still sitting in the
// broadcaster buffer, and a drop before the tick loses that write forever
// (reconnect resumes from the claimed seq and skips it).
//
// This drives the failing interleaving found in adversarial review: two
// collections multiplexed on one socket; a write to `files` is enqueued but
// unflushed (the SlowTickDO never ticks); the same socket then opens a
// `messages` catch-up sub. The catch-up's `uptodate` carries the current seq —
// which includes the files write — so the files delta MUST be on the wire
// before it.

const codec = createFrameCodec()

/**
 * Performs a WebSocket upgrade request against the worker under test and
 * returns the accepted client socket.
 *
 * @param path - URL path appended to `https://example.com`.
 * @returns The accepted WebSocket from the 101 response.
 * @throws Error when the response carries no `webSocket`.
 */
async function openWs(path: string): Promise<WebSocket> {
  const response = await SELF.fetch(`https://example.com${path}`, { headers: { Upgrade: "websocket" } })
  expect(response.status).toBe(101)
  const socket = response.webSocket
  if (socket) {
    socket.accept()
    return socket
  }
  throw new Error("no webSocket on 101 response")
}

/** Persistent recorder — attached once so no frame between steps is missed. */
/**
 * Attaches a single "message" listener that decodes every incoming frame and
 * appends it to the returned array. The array keeps growing for as long as the
 * socket delivers messages, so callers can slice it between test steps without
 * missing frames.
 */
function record(ws: WebSocket): Array<ServerFrame> {
  const frames: Array<ServerFrame> = []
  ws.addEventListener("message", (event) => {
    const payload = (event as MessageEvent).data as ArrayBuffer
    const frame = codec.decode(payload) as ServerFrame
    frames.push(frame)
  })
  return frames
}

/**
 * Polls `pred` every 5 ms until it returns true.
 *
 * @param pred - Condition to poll; checked immediately, before any delay.
 * @param timeoutMs - Budget in milliseconds (default 3000).
 * @throws Error("waitFor timeout") once more than `timeoutMs` has elapsed
 *   and `pred` is still false.
 */
async function waitFor(pred: () => boolean, timeoutMs = 3000): Promise<void> {
  const deadline = Date.now() + timeoutMs
  for (;;) {
    if (pred()) return
    if (Date.now() > deadline) throw new Error("waitFor timeout")
    await new Promise((resolve) => setTimeout(resolve, 5))
  }
}

/** Encodes a client frame with the shared codec and writes it to the socket. */
const send = (ws: WebSocket, f: ClientFrame): void => {
  const encoded = codec.encode(f)
  ws.send(encoded)
}

// Regression suite for the C1′ cursor barrier: on a socket that multiplexes
// two collections, a delta buffered for one collection must be on the wire
// before any frame that advances the shared cursor (`uptodate` / `snap-end`).
//
// Both sockets are closed in a `finally` so that a failed expectation or a
// waitFor timeout does not leave WebSockets open against the Durable Object.
describe("cursor barrier: pending deltas flush before any cursor-advancing frame (C1′)", () => {
  it("a catch-up sub on one collection cannot advance the cursor past another collection's buffered delta", async () => {
    const room = `barrier-${crypto.randomUUID()}`
    const ws1 = await openWs(`/slow/${room}`)
    const ws2 = await openWs(`/slow/${room}`)
    try {
      const f1 = record(ws1)
      const f2 = record(ws2)

      // ws1 watches BOTH collections on one socket.
      send(ws1, { t: "sub", subId: "m1", collection: "messages" })
      send(ws1, { t: "sub", subId: "f1", collection: "files" })
      await waitFor(() => f1.filter((f) => f.t === "snap-end").length === 2)

      // A confirmed write gives ws1 a real cursor S1 > 0 (catch-up needs one).
      send(ws1, { t: "mut", txId: "tx-m", collection: "messages", ops: [{ type: "insert", key: "a", cols: { id: "a", body: "hi" } }] })
      await waitFor(() => f1.some((f) => f.t === "committed"))
      const s1 = (f1.find((f) => f.t === "committed") as Extract<ServerFrame, { t: "committed" }>).seq

      // ws2 writes to `files`: ws1's delta for it is enqueued but NOT flushed
      // (SlowTickDO's coalescer never ticks inside this test).
      send(ws2, { t: "mut", txId: "tx-f", collection: "files", ops: [{ type: "insert", key: "x", cols: { id: "x", name: "doc" } }] })
      await waitFor(() => f2.some((f) => f.t === "committed"))
      const s2 = (f2.find((f) => f.t === "committed") as Extract<ServerFrame, { t: "committed" }>).seq
      expect(BigInt(s2)).toBeGreaterThan(BigInt(s1))

      // Same socket now opens a messages catch-up sub from S1. Its `uptodate`
      // carries the current seq (>= S2) and the client advances on it — so the
      // buffered files delta must arrive FIRST.
      const before = f1.length
      send(ws1, { t: "sub", subId: "m2", collection: "messages", since: s1 })
      await waitFor(() => f1.slice(before).some((f) => f.t === "uptodate" && BigInt(f.seq) >= BigInt(s2)))

      // Only frames received after the catch-up sub was sent are inspected.
      const since = f1.slice(before)
      const deltaIdx = since.findIndex((f) => f.t === "d" && f.sub === "f1" && f.key === "x")
      const boundaryIdx = since.findIndex((f) => f.t === "uptodate" && BigInt(f.seq) >= BigInt(s2))
      expect(deltaIdx).toBeGreaterThanOrEqual(0) // the buffered delta was flushed at all
      expect(deltaIdx).toBeLessThan(boundaryIdx) // ...and BEFORE the cursor boundary
    } finally {
      ws1.close()
      ws2.close()
    }
  })

  it("a fresh snapshot on one collection cannot advance the cursor past another collection's buffered delta", async () => {
    const room = `barrier-snap-${crypto.randomUUID()}`
    const ws1 = await openWs(`/slow/${room}`)
    const ws2 = await openWs(`/slow/${room}`)
    try {
      const f1 = record(ws1)
      const f2 = record(ws2)

      send(ws1, { t: "sub", subId: "f1", collection: "files" })
      await waitFor(() => f1.some((f) => f.t === "snap-end"))

      // Buffered, unflushed files delta on ws1 (originating socket is ws2).
      send(ws2, { t: "mut", txId: "tx-f2", collection: "files", ops: [{ type: "insert", key: "y", cols: { id: "y", name: "img" } }] })
      await waitFor(() => f2.some((f) => f.t === "committed"))
      const s2 = (f2.find((f) => f.t === "committed") as Extract<ServerFrame, { t: "committed" }>).seq

      // A FRESH sub (no since) on messages snapshots at the current seq: its
      // snap-end advances the cursor to >= S2, so the files delta must precede it.
      const before = f1.length
      send(ws1, { t: "sub", subId: "m1", collection: "messages" })
      await waitFor(() => f1.slice(before).some((f) => f.t === "snap-end" && f.sub === "m1"))

      // Only frames received after the fresh sub was sent are inspected.
      const since = f1.slice(before)
      const deltaIdx = since.findIndex((f) => f.t === "d" && f.sub === "f1" && f.key === "y")
      const endIdx = since.findIndex((f) => f.t === "snap-end" && f.sub === "m1")
      expect(deltaIdx).toBeGreaterThanOrEqual(0)
      expect(deltaIdx).toBeLessThan(endIdx)
    } finally {
      ws1.close()
      ws2.close()
    }
  })
})
4 changes: 2 additions & 2 deletions tests/do-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function startSync(transport: WebSocketTransport): { calls: Array<Call> } {
// sync lives on opts.sync.sync; invoke with spy controls (cast: type-only dep).
const syncConfig = (opts as unknown as { sync: { sync: (p: unknown) => void } }).sync
syncConfig.sync({
collection: {},
collection: { get: () => undefined }, // adapter consults held keys (held-insert upsert)
begin: () => calls.push(["begin"]),
write: (m: unknown) => calls.push(["write", m]),
commit: () => calls.push(["commit"]),
Expand Down Expand Up @@ -93,7 +93,7 @@ describe("doCollectionOptions (M3 adapter)", () => {
const adapter = doCollectionOptions<Msg>({ transport: t, table: "messages", getKey: (r) => r.id })
const calls: Array<Call> = []
;(adapter as unknown as { sync: { sync: (p: unknown) => void } }).sync.sync({
collection: {},
collection: { get: () => undefined },
begin: () => calls.push(["begin"]),
write: (m: unknown) => calls.push(["write", m]),
commit: () => calls.push(["commit"]),
Expand Down
1 change: 1 addition & 0 deletions tests/env.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ declare module "cloudflare:test" {
SYNC_DO: DurableObjectNamespace
UNREG_DO: DurableObjectNamespace
MAINT_DO: DurableObjectNamespace
SLOW_DO: DurableObjectNamespace
}
}
2 changes: 1 addition & 1 deletion tests/filtered-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function startFiltered(transport: WebSocketTransport, where: unknown): { calls:
const calls: Array<Call> = []
const adapter = doCollectionOptions<Msg>({ transport, table: "messages", getKey: (r) => r.id, where })
;(adapter as unknown as { sync: { sync: (p: unknown) => void } }).sync.sync({
collection: {},
collection: { get: () => undefined }, // adapter consults held keys (held-insert upsert)
begin: () => calls.push(["begin"]),
write: (m: unknown) => calls.push(["write", m]),
commit: () => calls.push(["commit"]),
Expand Down
92 changes: 92 additions & 0 deletions tests/reconnect-window.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { describe, expect, it } from "vitest"
import { WebSocketTransport, type WebSocketLike } from "../src/client/transport.ts"
import { createFrameCodec } from "../src/wire/frame-codec.ts"
import type { ClientFrame, ServerFrame } from "../src/wire/frames.ts"

// WHY: PRE-EXISTING bug found while grilling ADR-0011's forced-reconnect
// design (the bug itself is in the plain reconnect path, present on this
// branch; the forced-reconnect machinery is not). The `reconnecting` flag was
// set inside the reconnect TIMER, so a connect() triggered on demand — a
// mutation fired within reconnectDelayMs of a drop — established the fresh
// socket with the flag still false: NO resubscribeAll, every subscription
// silently dead (the server has no subs for the new socket), and the late
// timer's connect() early-returned, wedging the flag. The flag must be set
// when the reconnect is SCHEDULED, so whichever connect() establishes —
// timer-driven or demand-driven — runs the resubscribe path.

const codec = createFrameCodec()

/** Test double pairing a fake socket with hooks to inspect and drive it. */
interface Fake {
// The WebSocketLike object returned from the transport's `open` callback.
ws: WebSocketLike
// Client frames decoded from each `ws.send` call, in call order.
sent: Array<ClientFrame>
// Invokes every listener registered on `ws` for `type`, passing `ev`.
emit: (type: string, ev: { data?: unknown }) => void
}

/**
 * Builds an in-memory stand-in for a WebSocket.
 *
 * Outgoing data is decoded and appended to `sent`; listeners registered via
 * `addEventListener` are kept per event type and fired by `emit`. `close` and
 * `removeEventListener` are deliberate no-ops.
 */
function makeFake(): Fake {
  const registry = new Map<string, Array<(ev: { data?: unknown }) => void>>()
  const sent: Array<ClientFrame> = []

  const ws: WebSocketLike = {
    send: (data) => sent.push(codec.decode(data as ArrayBuffer | string) as ClientFrame),
    close: () => {},
    addEventListener: (type, l) => {
      const existing = registry.get(type)
      if (existing) existing.push(l)
      else registry.set(type, [l])
    },
    removeEventListener: () => {},
  }

  const emit: Fake["emit"] = (type, ev) => {
    const targets = registry.get(type) ?? []
    for (const listener of targets) listener(ev)
  }

  return { ws, sent, emit }
}

/**
 * Resolves as soon as `pred` returns true, re-checking every 5 ms.
 *
 * @param pred - Condition to poll; evaluated once before the first delay.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds (default 3000).
 * @throws Error("waitFor timeout") when the elapsed time exceeds `timeoutMs`
 *   while `pred` is still false.
 */
async function waitFor(pred: () => boolean, timeoutMs = 3000): Promise<void> {
  const startedAt = Date.now()
  const pause = () => new Promise((wake) => setTimeout(wake, 5))
  while (true) {
    if (pred()) return
    const elapsed = Date.now() - startedAt
    if (elapsed > timeoutMs) throw new Error("waitFor timeout")
    await pause()
  }
}

// Regression test for the reconnect window: after a simulated drop, the socket
// opened on demand by sendMut (not by the reconnect timer) must carry a `sub`
// frame whose `since` equals the last applied cursor.
describe("reconnect window (drop → demand-driven connect before the timer)", () => {
it("a mutation-driven connect still resubscribes from the cursor", async () => {
// Every socket the transport opens is a fresh fake, collected in open order:
// fakes[0] is the initial connection, fakes[1] the post-drop one.
const fakes: Array<Fake> = []
const t = new WebSocketTransport({
url: "wss://fake",
reconnectDelayMs: 60_000, // the timer must NOT be what saves us
open: () => {
const f = makeFake()
fakes.push(f)
return f.ws
},
})
await t.subscribe(
"s1",
"messages",
{ onSnap: () => {}, onSnapEnd: () => {}, onDelta: () => {}, onUptodate: () => {}, onReset: () => {} },
)
// Feed a snap-end at seq "5" so the transport has a cursor to resume from.
fakes[0]!.emit("message", { data: codec.encode({ t: "snap-end", sub: "s1", seq: "5" } satisfies ServerFrame) })
expect(t.appliedCursor).toBe("5")

// Unexpected drop...
fakes[0]!.emit("close", {})
// ...and a mutation fires immediately, long before the reconnect timer.
const mut = t.sendMut({ t: "mut", txId: "tx1", collection: "messages", ops: [] })
// Wait for the mut FRAME (not just the socket): the send lands a microtask
// after open(), and the committed reply below needs its waiter registered.
await waitFor(() => fakes.length === 2 && fakes[1]!.sent.some((f) => f.t === "mut"))

// The demand-driven socket must carry the resubscription, from the cursor.
const resub = fakes[1]!.sent.find((f) => f.t === "sub") as Extract<ClientFrame, { t: "sub" }> | undefined
expect(resub).toBeDefined()
expect(resub!.since).toBe("5")

// Settle the in-flight mutation cleanly, then shut down.
fakes[1]!.emit("message", { data: codec.encode({ t: "committed", txId: "tx1", seq: "6" } satisfies ServerFrame) })
await mut
t.close()
})
})
Loading