From e14033959474be1324c775479b77ade679dd1f63 Mon Sep 17 00:00:00 2001 From: Tom McKenzie Date: Thu, 11 Jun 2026 20:27:41 +1000 Subject: [PATCH] perf(server): batch hydrateRows; per-table catch-up reads use the (tbl,seq) index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminates the N+1 SELECT in hydrateRows by issuing chunked IN queries (64 keys per chunk), cutting SQLite rows-read on reconnect storms. Adds readChangesSinceFor for emitCatchUp's per-table read, activating the _sync_changes_tbl_seq composite index that previously went unused — the JS .filter() after readChangesSince is gone. drainAndBroadcast's all-tables readChangesSince call is intentionally unchanged (one scan, N tables). New tests in cdc.test.ts pin: chunking over 150 rows (no drops/dups, TEXT pk keying), and readChangesSinceFor table isolation + mid-stream cursor windowing. Co-Authored-By: Claude Fable 5 --- src/server/changes.ts | 37 ++++++++++++++++++---- src/server/sync-do.ts | 3 +- tests/cdc.test.ts | 72 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 7 deletions(-) diff --git a/src/server/changes.ts b/src/server/changes.ts index 11cc88b..85bcdcc 100644 --- a/src/server/changes.ts +++ b/src/server/changes.ts @@ -240,13 +240,33 @@ export function readChangesSince(sql: SqlStorage, cursor: number): Array cursor, in seq order. Uses the + * `_sync_changes_tbl_seq` composite index — this is the per-table catch-up + * read that index exists for. */ +export function readChangesSinceFor( + sql: SqlStorage, + tbl: string, + cursor: number, +): Array { + return Array.from( + sql.exec( + "SELECT seq, tbl, key, op, ts FROM _sync_changes WHERE tbl = ? AND seq > ? ORDER BY seq", + tbl, + cursor, + ), + ) +} + /** Every current row of a collection table — the initial-subscribe snapshot. */ export function snapshotAll(sql: SqlStorage, tbl: string): Array> { return Array.from(sql.exec>(`SELECT * FROM ${tbl}`)) } /** Current rows for a set of keys, for hydrating deltas. `tbl`/`pk` are - * validated identifiers (the SyncRegistry enforces this). */ + * validated identifiers (the SyncRegistry enforces this). Queries in chunks + * of 64 to avoid SQLite bound-parameter limits and eliminate the N+1 pattern: + * a reconnect catch-up over 500 keys now issues ⌈500/64⌉ = 8 queries instead + * of 500. Identifiers are quoted; values are bound parameters. */ export function hydrateRows( sql: SqlStorage, tbl: string, @@ -254,11 +274,16 @@ export function hydrateRows( keys: Array, ): Map> { const out = new Map>() - for (const k of keys) { - const rows = Array.from( - sql.exec>(`SELECT * FROM ${tbl} WHERE ${pk} = ? LIMIT 1`, k), - ) - if (rows.length > 0) out.set(k, rows[0]!) + const CHUNK = 64 // stay far below any SQLite bound-parameter limit + for (let i = 0; i < keys.length; i += CHUNK) { + const chunk = keys.slice(i, i + CHUNK) + const placeholders = chunk.map(() => "?").join(", ") + for (const row of sql.exec>( + `SELECT * FROM "${tbl}" WHERE "${pk}" IN (${placeholders})`, + ...chunk, + )) { + out.set(String(row[pk]), row) + } } return out } diff --git a/src/server/sync-do.ts b/src/server/sync-do.ts index 0716bb9..b68214d 100644 --- a/src/server/sync-do.ts +++ b/src/server/sync-do.ts @@ -27,6 +27,7 @@ import { minChangeSeq, pruneChanges, readChangesSince, + readChangesSinceFor, setDrainCursor, } from "./changes.ts" import { Broadcaster } from "./broadcast.ts" @@ -566,7 +567,7 @@ export abstract class SyncDurableObject extends since: number, seq: string, ): void { - const changes = readChangesSince(this.sql, since).filter((c) => c.tbl === coll.table) + const changes = readChangesSinceFor(this.sql, coll.table, since) const latest = new Map() for (const c of changes) latest.set(c.key, c) const liveKeys = [...latest.values()].filter((c) => c.op !== "delete").map((c) => c.key) diff --git a/tests/cdc.test.ts b/tests/cdc.test.ts index 6072812..71b0975 100644 --- a/tests/cdc.test.ts +++ b/tests/cdc.test.ts @@ -4,9 +4,11 @@ import { describe, expect, it } from "vitest" import { currentSeq, getDrainCursor, + hydrateRows, initSchema, installTriggers, readChangesSince, + readChangesSinceFor, setDrainCursor, } from "../src/server/changes.ts" @@ -113,4 +115,74 @@ describe("CDC: AFTER triggers -> _sync_changes (D12)", () => { expect(new Set(seqs).size).toBe(10) }) }) + + // WHY: chunking must not drop or duplicate keys — the batched IN query must + // return exactly the rows that exist, regardless of how many chunks are needed + // (ADR-0007 D9: TEXT pk, String(row[pk]) keying must match the changelog key). + it("hydrateRows returns exactly the existing keys across multiple chunks (no drops, no dups)", async () => { + await runInDurableObject(freshStub(), (_i, state) => { + const sql = state.storage.sql + setup(sql) + // Seed 150 rows — spans 3 chunks of 64 (64+64+22). + const ids: string[] = [] + for (let i = 0; i < 150; i++) { + const id = `row-${String(i).padStart(3, "0")}` + ids.push(id) + sql.exec("INSERT INTO items(id,name,n) VALUES(?,?,?)", id, `name-${i}`, i) + } + // Request all existing keys plus a handful of nonexistent ones. + const nonexistent = ["missing-1", "missing-2", "missing-3"] + const result = hydrateRows(sql, "items", "id", [...ids, ...nonexistent]) + // Exactly the 150 seeded rows — nonexistent keys absent, no dups. + expect(result.size).toBe(150) + for (const id of ids) { + const row = result.get(id) + expect(row).toBeDefined() + expect(row!["id"]).toBe(id) + } + for (const id of nonexistent) { + expect(result.has(id)).toBe(false) + } + }) + }) + + // WHY: readChangesSinceFor must use the (tbl,seq) index path and return only + // the requested table's rows — interleaved writes to other tables must not + // appear. A mid-stream cursor must window correctly, matching the per-table + // contract that emitCatchUp relies on for exactly-once catch-up delivery. + it("readChangesSinceFor isolates by table and windows by cursor", async () => { + await runInDurableObject(freshStub(), (_i, state) => { + const sql = state.storage.sql + initSchema(sql) + // Two tables with interleaved writes. + sql.exec(`CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY, body TEXT)`) + sql.exec(`CREATE TABLE IF NOT EXISTS files (id TEXT PRIMARY KEY, name TEXT)`) + installTriggers(sql, "messages", "id") + installTriggers(sql, "files", "id") + + sql.exec("INSERT INTO messages(id,body) VALUES('m1','hello')") + sql.exec("INSERT INTO files(id,name) VALUES('f1','doc.pdf')") + sql.exec("INSERT INTO messages(id,body) VALUES('m2','world')") + sql.exec("INSERT INTO files(id,name) VALUES('f2','img.png')") + sql.exec("INSERT INTO messages(id,body) VALUES('m3','!')") + + // readChangesSinceFor("messages", 0) returns only messages rows, ascending. + const msgRows = readChangesSinceFor(sql, "messages", 0) + expect(msgRows.map((r) => r.key)).toEqual(["m1", "m2", "m3"]) + expect(msgRows.every((r) => r.tbl === "messages")).toBe(true) + // seq values are ascending. + const seqs = msgRows.map((r) => r.seq) + expect(seqs).toEqual([...seqs].sort((a, b) => a - b)) + + // readChangesSinceFor("files", 0) returns only files rows. + const fileRows = readChangesSinceFor(sql, "files", 0) + expect(fileRows.map((r) => r.key)).toEqual(["f1", "f2"]) + expect(fileRows.every((r) => r.tbl === "files")).toBe(true) + + // Mid-stream cursor: only rows after the seq of m2 (the 3rd change overall). + // seq of m2 is msgRows[1].seq; we want only m3 after that. + const afterM2 = readChangesSinceFor(sql, "messages", msgRows[1]!.seq) + expect(afterM2.map((r) => r.key)).toEqual(["m3"]) + }) + }) })