Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions src/server/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,45 @@ export function readChangesSince(sql: SqlStorage, cursor: number): Array<ChangeR
)
}

/** One table's change rows with seq > cursor, in seq order. Uses the
* `_sync_changes_tbl_seq` composite index — this is the per-table catch-up
* read that index exists for. */
export function readChangesSinceFor(
sql: SqlStorage,
tbl: string,
cursor: number,
): Array<ChangeRow> {
return Array.from(
sql.exec<ChangeRow>(
"SELECT seq, tbl, key, op, ts FROM _sync_changes WHERE tbl = ? AND seq > ? ORDER BY seq",
tbl,
cursor,
),
)
}

/** Current rows for a set of keys, for hydrating deltas. `tbl`/`pk` are
* validated identifiers (the SyncRegistry enforces this). */
* validated identifiers (the SyncRegistry enforces this). Queries in chunks
* of 64 to avoid SQLite bound-parameter limits and eliminate the N+1 pattern:
* a reconnect catch-up over 500 keys now issues ⌈500/64⌉ = 8 queries instead
* of 500. Identifiers are quoted; values are bound parameters. */
export function hydrateRows(
sql: SqlStorage,
tbl: string,
pk: string,
keys: Array<string>,
): Map<string, Record<string, SqlStorageValue>> {
const out = new Map<string, Record<string, SqlStorageValue>>()
for (const k of keys) {
const rows = Array.from(
sql.exec<Record<string, SqlStorageValue>>(`SELECT * FROM ${tbl} WHERE ${pk} = ? LIMIT 1`, k),
)
if (rows.length > 0) out.set(k, rows[0]!)
const CHUNK = 64 // stay far below any SQLite bound-parameter limit
for (let i = 0; i < keys.length; i += CHUNK) {
const chunk = keys.slice(i, i + CHUNK)
const placeholders = chunk.map(() => "?").join(", ")
for (const row of sql.exec<Record<string, SqlStorageValue>>(
`SELECT * FROM "${tbl}" WHERE "${pk}" IN (${placeholders})`,
...chunk,
)) {
out.set(String(row[pk]), row)
}
}
return out
}
3 changes: 2 additions & 1 deletion src/server/sync-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
minChangeSeq,
pruneChanges,
readChangesSince,
readChangesSinceFor,
setDrainCursor,
} from "./changes.ts"
import { Broadcaster } from "./broadcast.ts"
Expand Down Expand Up @@ -527,7 +528,7 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
since: number,
seq: string,
): void {
const changes = readChangesSince(this.sql, since).filter((c) => c.tbl === coll.table)
const changes = readChangesSinceFor(this.sql, coll.table, since)
const latest = new Map<string, (typeof changes)[number]>()
for (const c of changes) latest.set(c.key, c)
const liveKeys = [...latest.values()].filter((c) => c.op !== "delete").map((c) => c.key)
Expand Down
72 changes: 72 additions & 0 deletions tests/cdc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import { describe, expect, it } from "vitest"
import {
currentSeq,
getDrainCursor,
hydrateRows,
initSchema,
installTriggers,
readChangesSince,
readChangesSinceFor,
setDrainCursor,
} from "../src/server/changes.ts"

Expand Down Expand Up @@ -113,4 +115,74 @@ describe("CDC: AFTER triggers -> _sync_changes (D12)", () => {
expect(new Set(seqs).size).toBe(10)
})
})

// WHY: chunking must not drop or duplicate keys — the batched IN query must
// return exactly the rows that exist, regardless of how many chunks are needed
// (ADR-0007 D9: TEXT pk, String(row[pk]) keying must match the changelog key).
it("hydrateRows returns exactly the existing keys across multiple chunks (no drops, no dups)", async () => {
await runInDurableObject(freshStub(), (_i, state) => {
const sql = state.storage.sql
setup(sql)
// Seed 150 rows — spans 3 chunks of 64 (64+64+22).
const ids: string[] = []
for (let i = 0; i < 150; i++) {
const id = `row-${String(i).padStart(3, "0")}`
ids.push(id)
sql.exec("INSERT INTO items(id,name,n) VALUES(?,?,?)", id, `name-${i}`, i)
}
// Request all existing keys plus a handful of nonexistent ones.
const nonexistent = ["missing-1", "missing-2", "missing-3"]
const result = hydrateRows(sql, "items", "id", [...ids, ...nonexistent])
// Exactly the 150 seeded rows — nonexistent keys absent, no dups.
expect(result.size).toBe(150)
for (const id of ids) {
const row = result.get(id)
expect(row).toBeDefined()
expect(row!["id"]).toBe(id)
}
for (const id of nonexistent) {
expect(result.has(id)).toBe(false)
}
})
})

// WHY: readChangesSinceFor must use the (tbl,seq) index path and return only
// the requested table's rows — interleaved writes to other tables must not
// appear. A mid-stream cursor must window correctly, matching the per-table
// contract that emitCatchUp relies on for exactly-once catch-up delivery.
it("readChangesSinceFor isolates by table and windows by cursor", async () => {
await runInDurableObject(freshStub(), (_i, state) => {
const sql = state.storage.sql
initSchema(sql)
// Two tables with interleaved writes.
sql.exec(`CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY, body TEXT)`)
sql.exec(`CREATE TABLE IF NOT EXISTS files (id TEXT PRIMARY KEY, name TEXT)`)
installTriggers(sql, "messages", "id")
installTriggers(sql, "files", "id")

sql.exec("INSERT INTO messages(id,body) VALUES('m1','hello')")
sql.exec("INSERT INTO files(id,name) VALUES('f1','doc.pdf')")
sql.exec("INSERT INTO messages(id,body) VALUES('m2','world')")
sql.exec("INSERT INTO files(id,name) VALUES('f2','img.png')")
sql.exec("INSERT INTO messages(id,body) VALUES('m3','!')")

// readChangesSinceFor("messages", 0) returns only messages rows, ascending.
const msgRows = readChangesSinceFor(sql, "messages", 0)
expect(msgRows.map((r) => r.key)).toEqual(["m1", "m2", "m3"])
expect(msgRows.every((r) => r.tbl === "messages")).toBe(true)
// seq values are ascending.
const seqs = msgRows.map((r) => r.seq)
expect(seqs).toEqual([...seqs].sort((a, b) => a - b))

// readChangesSinceFor("files", 0) returns only files rows.
const fileRows = readChangesSinceFor(sql, "files", 0)
expect(fileRows.map((r) => r.key)).toEqual(["f1", "f2"])
expect(fileRows.every((r) => r.tbl === "files")).toBe(true)

// Mid-stream cursor: only rows after the seq of m2 (the 3rd change overall).
// seq of m2 is msgRows[1].seq; we want only m3 after that.
const afterM2 = readChangesSinceFor(sql, "messages", msgRows[1]!.seq)
expect(afterM2.map((r) => r.key)).toEqual(["m3"])
})
})
})