diff --git a/docs/decisions/045-hub-storage-scaling.md b/docs/decisions/045-hub-storage-scaling.md index 4a74863c..c21d4941 100644 --- a/docs/decisions/045-hub-storage-scaling.md +++ b/docs/decisions/045-hub-storage-scaling.md @@ -8,9 +8,10 @@ > [`discussions/hub-scaling-storage-and-concurrency.md`](../discussions/hub-scaling-storage-and-concurrency.md) > and [`discussions/hub-store-separation-and-fold-policy.md`](../discussions/hub-store-separation-and-fold-policy.md). > **Amends [ADR-038](038-per-run-event-digest.md) §2** (synchronous -> digest fold → bounded-staleness deferred fold). D1 + D2 are shipped (D2 -> via the P1 class split and the P2 per-team shard — see the -> [plan](../plans/hub-storage-scaling.md)); D3 is decided, not yet built. +> digest fold → bounded-staleness deferred fold). D1 + D2 + D4 are +> shipped (D2 via the P1 class split and the P2 per-team shard — see the +> [plan](../plans/hub-storage-scaling.md); D4 the storage-maintenance +> loop); D3 is decided, not yet built. > **Audience:** contributors > **Last verified vs code:** v1.0.807-alpha @@ -144,6 +145,61 @@ independent of the DB choice. The D2 split is what makes D3 affordable — the backend choice is per-store, so a deployment can move one store (e.g. control → Postgres) without touching the others. +### D4 — automated WAL-checkpoint + incremental reclamation (amends D2) + +The D2 split leaves two storage-hygiene gaps that bit an operator (#79): +**WAL files grow unbounded** and **freed pages are never returned to the +OS**. They are *different* mechanisms and are addressed separately — a +common confusion that conflates checkpointing with `VACUUM`. + +- **WAL growth is a reader-pinning problem.** SQLite's auto-checkpoint + (default 1000 pages) can only reset the WAL up to the *oldest live + reader's snapshot*. The hub holds long-lived **SSE readers** + (Activity / transcript streams), so a continuous reader + continuous + firehose write keeps the checkpoint from ever reaching the WAL head — + it grows without bound. The fix is a periodic + `PRAGMA wal_checkpoint(TRUNCATE)` run by the hub, not a vacuum. + +- **Reclamation uses `auto_vacuum=INCREMENTAL`, not full `VACUUM`.** A + full `VACUUM` rewrites the whole file: it needs ~2× the DB size in + free disk (an ENOSPC risk on a 2 GB VPS), takes a global write lock + for an O(DB-size) duration (stop-the-world, *per shard*), and fights + the very SSE readers above. Incremental auto-vacuum instead returns + free pages to the OS in **bounded chunks**, each a short transaction + that interleaves with readers — the same pattern always-on embedded + SQLite uses (Chromium, Firefox, Android). Its two costs are immaterial + here: the per-commit pointer-map overhead is negligible against the + measured fold cost, and its no-defragment limitation barely bites an + **append-mostly** firehose (near-sequential inserts ⇒ low + fragmentation). Blob-ref externalization (the D2 prerequisite) already + removes the large transient payloads that would otherwise strand free + pages, so the residual reclamation need is modest — incremental is + cheap insurance, not a hot path. + +**The policy:** + +1. **New event/digest shards are created `auto_vacuum=INCREMENTAL`** (set + on the schema-creating writer connection at first open — it must + precede the first table). `hub.db` (control) keeps freelist reuse: low + delete volume, and converting an existing file needs a full VACUUM. +2. **A background maintenance loop** (same ctx lifetime as the other + sweeps) runs every `HUB_STORE_MAINTENANCE_INTERVAL` (default 5 m). Per + currently-open shard writer (`hub.db` + each open team's events/digest + writer) it: (a) `wal_checkpoint(TRUNCATE)`; (b) a bounded + `incremental_vacuum` **with hysteresis** — only when the freelist is + ≥25 % of the file *and* above an absolute floor, reclaiming down to a + watermark (not to zero) and capped per pass, so a still-active + firehose can't thrash returning pages it immediately re-allocates. + `incremental_vacuum` is a no-op where `auto_vacuum≠INCREMENTAL`, so it + is safe on `hub.db` and pre-D4 shards. Disable with + `HUB_STORE_MAINTENANCE_DISABLE`. Evicted teams are checkpointed on + pool close (SQLite checkpoints when the last connection closes), so a + cold team needs no loop coverage. +3. **Full `VACUUM` stays operator-only and offline** — the existing + `hub-server db vacuum` (now also sets `auto_vacuum=INCREMENTAL` before + the rebuild, so it doubles as the one-time pre-D4-shard converter). + It is never run automatically. + ## Consequences - **Digest freshness becomes visible by design** (D1): a long open turn diff --git a/hub/cmd/hub-server/db_cmd.go b/hub/cmd/hub-server/db_cmd.go index c9b42e67..6e38ce21 100644 --- a/hub/cmd/hub-server/db_cmd.go +++ b/hub/cmd/hub-server/db_cmd.go @@ -213,14 +213,23 @@ func vacuumTeamStores(dataRoot string) (reclaimed int64, files int, err error) { return reclaimed, files, nil } -// vacuumFile runs a whole-database VACUUM on a single sqlite store file. +// vacuumFile runs a whole-database VACUUM on a single per-team store file. It +// first sets auto_vacuum=INCREMENTAL: the following VACUUM rewrites the file in +// that mode, so this offline command doubles as the one-time converter for +// pre-D4 shards (created before ADR-045 D4; new shards are born INCREMENTAL). +// Changing auto_vacuum only takes effect via the subsequent VACUUM; on an +// already-INCREMENTAL shard it is a no-op. func vacuumFile(path string) error { db, err := sql.Open("sqlite", path+"?_pragma=busy_timeout(10000)") if err != nil { return err } defer db.Close() - _, err = db.ExecContext(context.Background(), "VACUUM") + ctx := context.Background() + if _, err = db.ExecContext(ctx, "PRAGMA auto_vacuum=INCREMENTAL"); err != nil { + return err + } + _, err = db.ExecContext(ctx, "VACUUM") return err } diff --git a/hub/internal/server/db.go b/hub/internal/server/db.go index cb0d3ef0..c1096c14 100644 --- a/hub/internal/server/db.go +++ b/hub/internal/server/db.go @@ -30,6 +30,16 @@ import ( // (≥800 concurrent agents, internal/server/load_test.go), a wash below it. const pragmaCommon = "_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=busy_timeout(5000)&_pragma=temp_store(2)&_pragma=mmap_size(268435456)" +// pragmaStoreAutoVacuum requests INCREMENTAL auto-vacuum (mode 2) for the +// per-team event/digest shards (ADR-045 D4). auto_vacuum is a database-level +// property fixed when the first table is created, so it MUST ride on the +// schema-creating writer connection at first open (openStorePool); on an +// already-populated file the pragma is a harmless no-op (the mode is set, and +// changing it would require a full VACUUM — that path is the operator-only +// `hub-server db vacuum`). It is deliberately kept OFF hub.db (control), which +// has low delete volume and keeps plain freelist reuse. +const pragmaStoreAutoVacuum = "&_pragma=auto_vacuum(2)" + // pragmaWriterCache is resolved once at startup. Default 64 MiB; the size (in // KiB) is operator-tunable per VPS via HUB_SQLITE_WRITER_CACHE_KB (also used to // sweep the value in load tests). Applied to the SINGLE global control writer diff --git a/hub/internal/server/server.go b/hub/internal/server/server.go index f606c44a..c3460af4 100644 --- a/hub/internal/server/server.go +++ b/hub/internal/server/server.go @@ -298,6 +298,13 @@ func (s *Server) Serve(ctx context.Context) error { if s.otlp != nil { go s.runOTLPExport(ctx) } + // Storage maintenance (ADR-045 D4): periodic wal_checkpoint(TRUNCATE) + + // bounded incremental_vacuum across open shards — bounds -wal growth (the + // SSE-reader-pinning hazard) and returns freed pages to the OS. Same ctx + // lifetime. Opt out with HUB_STORE_MAINTENANCE_DISABLE. + if os.Getenv("HUB_STORE_MAINTENANCE_DISABLE") == "" { + go s.runStoreMaintenance(ctx) + } // SIGHUP → hot-reload policy.yaml. Lets an operator edit the file and // signal the daemon without restarting and losing in-flight connections. diff --git a/hub/internal/server/store_maintenance.go b/hub/internal/server/store_maintenance.go new file mode 100644 index 00000000..39342093 --- /dev/null +++ b/hub/internal/server/store_maintenance.go @@ -0,0 +1,149 @@ +package server + +import ( + "context" + "database/sql" + "fmt" + "os" + "time" +) + +// store_maintenance.go — ADR-045 D4: automated WAL checkpointing + bounded +// incremental reclamation across the per-team shards. +// +// Two distinct storage-hygiene problems, two distinct mechanisms (do not +// conflate them): +// +// - WAL growth. SQLite's auto-checkpoint can only reset the WAL up to the +// oldest LIVE reader's snapshot. The hub holds long-lived SSE readers, so a +// continuous reader + continuous firehose write keeps the checkpoint from +// ever reaching the WAL head and it grows without bound. Fixed by a periodic +// wal_checkpoint(TRUNCATE) — NOT by VACUUM. +// - Free pages never returned to the OS. After a retention/refold delete, freed +// pages sit on the freelist (reused by future inserts, but the file stays at +// its high-water mark). Returned to the OS by incremental auto-vacuum in +// bounded chunks — NOT by a full VACUUM, whose ~2× disk + global write lock +// for an O(DB-size) duration is hostile to a small always-on VPS. +// +// New shards are created auto_vacuum=INCREMENTAL (store_split.go / openStorePool), +// so incremental_vacuum here actually returns pages; on hub.db and pre-D4 shards +// (auto_vacuum=NONE) incremental_vacuum is a documented no-op, so the same pass +// is safe everywhere. Full VACUUM stays the operator-only `hub-server db vacuum`. + +const ( + // storeVacuumWatermarkPages is the freelist slack kept after a reclamation + // pass (~512 KiB at the 4 KiB default page size). Reclaiming to zero on a + // still-active firehose would just hand back pages the next insert + // re-allocates — this watermark is the hysteresis floor. + storeVacuumWatermarkPages = 128 + // storeVacuumFreeFracPct gates a pass on the freelist being a meaningful + // fraction of the file, so a near-full file with a few free pages is left + // alone (its high-water mark IS its working set). + storeVacuumFreeFracPct = 25 + // storeVacuumMaxPagesPerPass bounds how many pages one pass returns (~8 MiB), + // so the short incremental_vacuum transaction never holds the write lock long + // even on a store with a huge freelist; the next tick continues draining. + storeVacuumMaxPagesPerPass = 2048 +) + +// maintTarget is one store-file writer pool the maintenance loop operates on, +// with a human label for logs. +type maintTarget struct { + label string + db *sql.DB +} + +// storeMaintenanceInterval is the loop cadence, operator-tunable via +// HUB_STORE_MAINTENANCE_INTERVAL (a Go duration, e.g. "2m", "10m"). +func storeMaintenanceInterval() time.Duration { + d := 5 * time.Minute + if v := os.Getenv("HUB_STORE_MAINTENANCE_INTERVAL"); v != "" { + if p, err := time.ParseDuration(v); err == nil && p > 0 { + d = p + } + } + return d +} + +// runStoreMaintenance is the maintenance loop (ADR-045 D4). It runs until ctx is +// cancelled. Started from Start() unless HUB_STORE_MAINTENANCE_DISABLE is set. +func (s *Server) runStoreMaintenance(ctx context.Context) { + ticker := time.NewTicker(storeMaintenanceInterval()) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.maintainStores(ctx) + } + } +} + +// maintainStores runs one maintenance pass over hub.db and every currently-open +// team shard writer. Cold (unopened/evicted) teams are skipped on purpose: they +// take no writes, and SQLite checkpoints a file when its last connection closes, +// so an evicted team's WAL is already truncated. +func (s *Server) maintainStores(ctx context.Context) { + if s.writeDB != nil { + s.maintainStore(ctx, s.writeDB, "hub.db") + } + if s.stores != nil { + for _, t := range s.stores.maintenanceTargets() { + select { + case <-ctx.Done(): + return + default: + } + s.maintainStore(ctx, t.db, t.label) + } + } +} + +// maintainStore checkpoints one store's WAL, then conditionally returns free +// pages to the OS. All operations run on the store's single-connection WRITER +// pool, so they serialize with the store's other writes rather than racing them. +// Every failure is best-effort and logged at debug — a missed pass is retried on +// the next tick, never an error to a caller. +func (s *Server) maintainStore(ctx context.Context, db *sql.DB, label string) { + // 1. Truncate the WAL back into the main file. TRUNCATE blocks for + // busy_timeout if a reader pins an old snapshot, then returns busy=1 + // (not an error) — we just try again next tick. + var busy, walFrames, checkpointed int + if err := db.QueryRowContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`). + Scan(&busy, &walFrames, &checkpointed); err != nil { + s.log.Debug("store maintenance: checkpoint failed", "store", label, "err", err) + } else if busy != 0 { + s.log.Debug("store maintenance: checkpoint busy (reader pinned)", "store", label) + } + + // 2. Reclaim free pages with hysteresis. incremental_vacuum is a no-op when + // the store is not auto_vacuum=INCREMENTAL, so this is safe on hub.db and + // pre-D4 shards (it simply returns nothing there). + var freelist, pageCount int + if err := db.QueryRowContext(ctx, `PRAGMA freelist_count`).Scan(&freelist); err != nil { + s.log.Debug("store maintenance: freelist_count failed", "store", label, "err", err) + return + } + if err := db.QueryRowContext(ctx, `PRAGMA page_count`).Scan(&pageCount); err != nil { + s.log.Debug("store maintenance: page_count failed", "store", label, "err", err) + return + } + if pageCount <= 0 || freelist <= storeVacuumWatermarkPages { + return + } + if freelist*100 < pageCount*storeVacuumFreeFracPct { + return // free space below the fraction gate — leave the high-water mark + } + n := freelist - storeVacuumWatermarkPages + if n > storeVacuumMaxPagesPerPass { + n = storeVacuumMaxPagesPerPass + } + if _, err := db.ExecContext(ctx, fmt.Sprintf(`PRAGMA incremental_vacuum(%d)`, n)); err != nil { + s.log.Debug("store maintenance: incremental_vacuum failed", + "store", label, "pages", n, "err", err) + return + } + s.log.Debug("store maintenance: reclaimed pages", + "store", label, "pages", n, "freelist_before", freelist, "page_count", pageCount) +} diff --git a/hub/internal/server/store_maintenance_test.go b/hub/internal/server/store_maintenance_test.go new file mode 100644 index 00000000..71329ae8 --- /dev/null +++ b/hub/internal/server/store_maintenance_test.go @@ -0,0 +1,178 @@ +package server + +import ( + "context" + "database/sql" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "strings" + "testing" +) + +// maintTestServer is a Server with only the fields maintainStore touches (the +// logger) — the maintenance pass needs no DB wiring beyond the handle passed in. +func maintTestServer() *Server { + return &Server{log: slog.New(slog.NewTextHandler(io.Discard, nil))} +} + +// freshEventsWriter opens a brand-new per-team events shard through the runtime +// path (openStorePool writer + ensureEventsSchema), so it is born +// auto_vacuum=INCREMENTAL exactly as a real shard is. +func freshEventsWriter(t *testing.T) (*sql.DB, string) { + t.Helper() + p := filepath.Join(t.TempDir(), "events.db") + w, err := openStorePool(p, true, "") + if err != nil { + t.Fatalf("openStorePool: %v", err) + } + t.Cleanup(func() { _ = w.Close() }) + if err := ensureEventsSchema(w); err != nil { + t.Fatalf("ensureEventsSchema: %v", err) + } + return w, p +} + +// insertEvents bulk-inserts n agent_events rows with a payload of ~payloadBytes, +// in one transaction. +func insertEvents(t *testing.T, db *sql.DB, n, payloadBytes int) { + t.Helper() + payload := `{"t":"` + strings.Repeat("x", payloadBytes) + `"}` + tx, err := db.Begin() + if err != nil { + t.Fatalf("begin: %v", err) + } + stmt, err := tx.Prepare(`INSERT INTO agent_events + (id, agent_id, seq, ts, kind, producer, payload_json) + VALUES (?, 'a1', ?, ?, 'text', 'agent', ?)`) + if err != nil { + t.Fatalf("prepare: %v", err) + } + for i := 0; i < n; i++ { + if _, err := stmt.Exec(fmt.Sprintf("e%d", i), i, fmt.Sprintf("ts%d", i), payload); err != nil { + t.Fatalf("insert %d: %v", i, err) + } + } + if err := tx.Commit(); err != nil { + t.Fatalf("commit: %v", err) + } +} + +func pragmaInt(t *testing.T, db *sql.DB, p string) int { + t.Helper() + var v int + if err := db.QueryRow("PRAGMA " + p).Scan(&v); err != nil { + t.Fatalf("PRAGMA %s: %v", p, err) + } + return v +} + +// TestNewShardIsIncrementalAutoVacuum pins ADR-045 D4: a shard created through +// the runtime writer path is auto_vacuum=INCREMENTAL (mode 2). +func TestNewShardIsIncrementalAutoVacuum(t *testing.T) { + w, _ := freshEventsWriter(t) + if av := pragmaInt(t, w, "auto_vacuum"); av != 2 { + t.Fatalf("auto_vacuum = %d, want 2 (INCREMENTAL)", av) + } +} + +// TestMaintainStoreReclaimsFreePages: after a bulk delete leaves a large +// freelist, one maintenance pass returns pages to the OS (page_count shrinks, +// freelist shrinks), bounded by the per-pass cap. +func TestMaintainStoreReclaimsFreePages(t *testing.T) { + w, _ := freshEventsWriter(t) + insertEvents(t, w, 3000, 1200) + if _, err := w.Exec(`DELETE FROM agent_events WHERE seq < 2950`); err != nil { + t.Fatalf("delete: %v", err) + } + flBefore := pragmaInt(t, w, "freelist_count") + pcBefore := pragmaInt(t, w, "page_count") + if flBefore <= storeVacuumWatermarkPages { + t.Fatalf("setup: freelist %d not above watermark %d — test can't exercise reclaim", + flBefore, storeVacuumWatermarkPages) + } + + maintTestServer().maintainStore(context.Background(), w, "test/events.db") + + flAfter := pragmaInt(t, w, "freelist_count") + pcAfter := pragmaInt(t, w, "page_count") + if pcAfter >= pcBefore { + t.Errorf("page_count did not shrink: before=%d after=%d", pcBefore, pcAfter) + } + if flAfter >= flBefore { + t.Errorf("freelist did not shrink: before=%d after=%d", flBefore, flAfter) + } + // Per-pass cap honored: no single pass reclaims more than the bound. + if reclaimed := flBefore - flAfter; reclaimed > storeVacuumMaxPagesPerPass { + t.Errorf("reclaimed %d pages in one pass, exceeds cap %d", reclaimed, storeVacuumMaxPagesPerPass) + } +} + +// TestMaintainStoreNoReclaimBelowThreshold: a store with little free space keeps +// its high-water mark (the hysteresis gate refuses to thrash). +func TestMaintainStoreNoReclaimBelowThreshold(t *testing.T) { + w, _ := freshEventsWriter(t) + insertEvents(t, w, 200, 400) // no deletes → freelist ~0 + pcBefore := pragmaInt(t, w, "page_count") + + maintTestServer().maintainStore(context.Background(), w, "test/events.db") + + if pcAfter := pragmaInt(t, w, "page_count"); pcAfter != pcBefore { + t.Errorf("page_count changed despite tiny freelist: before=%d after=%d", pcBefore, pcAfter) + } +} + +// TestMaintainStoreSafeOnNonIncremental: on an auto_vacuum=NONE store (hub.db / +// legacy shards), the pass runs without error and incremental_vacuum is a no-op +// (the file is not rewritten). +func TestMaintainStoreSafeOnNonIncremental(t *testing.T) { + p := filepath.Join(t.TempDir(), "events.db") + db, err := ensureEventsStore(p) // dsnFKOff → no auto_vacuum → mode NONE + if err != nil { + t.Fatalf("ensureEventsStore: %v", err) + } + defer db.Close() + if av := pragmaInt(t, db, "auto_vacuum"); av != 0 { + t.Fatalf("precondition: auto_vacuum = %d, want 0 (NONE)", av) + } + insertEvents(t, db, 2000, 1200) + if _, err := db.Exec(`DELETE FROM agent_events WHERE seq < 1950`); err != nil { + t.Fatalf("delete: %v", err) + } + pcBefore := pragmaInt(t, db, "page_count") + + maintTestServer().maintainStore(context.Background(), db, "legacy/events.db") + + if pcAfter := pragmaInt(t, db, "page_count"); pcAfter != pcBefore { + t.Errorf("page_count changed on NONE store (incremental_vacuum should no-op): before=%d after=%d", + pcBefore, pcAfter) + } +} + +// TestMaintainStoreTruncatesWAL: the checkpoint(TRUNCATE) step shrinks the -wal +// sidecar back toward zero. +func TestMaintainStoreTruncatesWAL(t *testing.T) { + w, p := freshEventsWriter(t) + insertEvents(t, w, 3000, 1200) // grow the WAL + walPath := p + "-wal" + fi, err := os.Stat(walPath) + if err != nil { + t.Fatalf("stat -wal: %v", err) + } + walBefore := fi.Size() + if walBefore == 0 { + t.Skip("WAL already empty before checkpoint (auto-checkpoint raced) — nothing to assert") + } + + maintTestServer().maintainStore(context.Background(), w, "test/events.db") + + fi, err = os.Stat(walPath) + if err != nil { + t.Fatalf("stat -wal after: %v", err) + } + if fi.Size() >= walBefore { + t.Errorf("WAL not truncated: before=%d after=%d", walBefore, fi.Size()) + } +} diff --git a/hub/internal/server/store_registry.go b/hub/internal/server/store_registry.go index 89ce1a4c..48296f9e 100644 --- a/hub/internal/server/store_registry.go +++ b/hub/internal/server/store_registry.go @@ -198,6 +198,29 @@ func (r *teamStores) openCount() int { return r.lru.Len() } +// maintenanceTargets snapshots the writer pool of every currently-open team's +// events + digest shard (ADR-045 D4). It returns the *sql.DB handles under the +// lock but performs no I/O — the caller (the maintenance loop) runs checkpoint / +// incremental_vacuum on them outside the lock. A handle in the snapshot can be +// evicted+closed before the caller uses it; (*sql.DB) operations on a closed +// pool return an error the loop already treats as best-effort, so the race is +// benign (the evicted file was checkpointed on close anyway). +func (r *teamStores) maintenanceTargets() []maintTarget { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]maintTarget, 0, r.lru.Len()*2) + for el := r.lru.Front(); el != nil; el = el.Next() { + h := el.Value.(*teamHandles) + if h.eventsW != nil { + out = append(out, maintTarget{label: h.team + "/events.db", db: h.eventsW}) + } + if h.digestW != nil { + out = append(out, maintTarget{label: h.team + "/digest.db", db: h.digestW}) + } + } + return out +} + // ensurePerTeamLayout resolves the per-team store layout at startup (ADR-045 // P2). Unlike P1's ensureStoreSplit it opens NO global store — the registry // opens each team's shard lazily — so its only job is to validate the boot state diff --git a/hub/internal/server/store_split.go b/hub/internal/server/store_split.go index 0d751288..14182318 100644 --- a/hub/internal/server/store_split.go +++ b/hub/internal/server/store_split.go @@ -164,7 +164,11 @@ func storePathsFor(controlPath string) (eventsPath, digestPath string) { func openStorePool(path string, writer bool, cachePragma string) (*sql.DB, error) { dsn := path + "?_pragma=foreign_keys(1)&" + pragmaCommon if writer { - dsn += cachePragma + // The writer is the schema-creating connection (ensureEventsSchema / + // ensureDigestSchema run on it), so auto_vacuum=INCREMENTAL must ride + // here to take effect on a fresh shard before its first table (ADR-045 + // D4); no-op on an already-populated shard. + dsn += cachePragma + pragmaStoreAutoVacuum } db, err := sql.Open("sqlite", dsn) if err != nil {