From 15897a2a743d9fbf627e17683275e5304d60775f Mon Sep 17 00:00:00 2001 From: Jared Lunde Date: Tue, 2 Jun 2026 19:25:54 -0700 Subject: [PATCH] fix(ci): serialize rust unit/integration tests (io_uring ENOMEM) + arch docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `cargo test --lib` failed on `main` in CI: each test builds its own monoio io_uring runtime, and running them in parallel exhausts the runner's lockable memory — `io_uring_setup` returns ENOMEM and the runtime build panics. The homelab kernel doesn't account io_uring against that limit, so it never reproduced locally. Run the unit + integration suites with `--test-threads=1` so only one ring exists at a time. Also: architecture-doc updates. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/engine/ARCHITECTURE.md | 281 ++++++++++++++++++++++++++ crates/engine/src/log/ARCHITECTURE.md | 277 +++++++++++++++++++------ mise.toml | 7 +- 3 files changed, 499 insertions(+), 66 deletions(-) create mode 100644 crates/engine/ARCHITECTURE.md diff --git a/crates/engine/ARCHITECTURE.md b/crates/engine/ARCHITECTURE.md new file mode 100644 index 0000000..1c3800f --- /dev/null +++ b/crates/engine/ARCHITECTURE.md @@ -0,0 +1,281 @@ +# Engine Architecture + +Takes key-value operations (GET/SET/DEL/SCAN/WATCH/CAS/INCR and their bulk variants) against named namespaces, routes them through an S3-FIFO in-memory cache and a per-namespace append-only log on disk (via io_uring), and pushes change events to registered watch subscribers — all on a single `monoio` thread per shard. + +## Data Flow + +### Read (GET / MGET) + +``` +get(ns, key) + │ + ├─► L1 cache lookup (ns\x00key) + │ hit ──► bump freq → return Entry (no I/O) + │ miss ──► cache_miss_count++ + │ + ├─► NsIndex::get(key) [in-RAM BTreeMap] + │ None ──► return None + │ expired ──► tombstone(key) [async, io_uring] → remove from cache → None + │ + ├─► NamespaceLog::read_value(entry) [io_uring positioned read] + │ VALUE_SEP flag? → ValueStore::get(hash) [blob read] + │ inline? → slice record bytes + │ + └─► cache.insert(ns\x00key, value, …) → return Entry + +mget: same path; cold reads dispatched concurrently via join_all → parallel io_uring + expired keys tombstoned in one join_all batch +``` + +### Write (SET / MSET) + +``` +set(ns, key, value, opts) + │ + ├─► ensure_ns(ns) [open NamespaceLog lazily if first access] + │ + ├─► KEEPTTL? → index.ttl(key) else opts.ttl → absolute ms + │ + ├─► value >= value_sep_threshold? + │ yes → ValueStore::put(value) [BLAKE3-128, write-once, fsync blob + dir] + │ returns content hash (16 bytes) → stored as val in record + │ no → value bytes stored inline + │ + ├─► NamespaceLog::put_full(key, val, meta, expires_at_ms) + │ RecordHeader::encode() → LogFile::append() → io_uring write_at + │ NsIndex::insert() ← new offset + tstamp_ms (revision) + │ + ├─► cache.try_update(ns\x00key) || cache.insert(…) + │ + └─► WatchRegistry::notify(ns, key, WatchEvent::Set{…}) + +mset: put_many() batches N records; single fsync for the whole batch +``` + +### Delete (DEL) + +``` +del(ns, keys[]) + │ + ├─► snapshot is_expired per key [index borrow, sync] + │ + ├─► join_all(tombstone(k) for k in keys) [parallel io_uring] + │ + ├─► cache.remove(ns\x00k) for each key + │ + └─► for each key where tombstone returned Some(revision) && !was_expired: + count++; WatchRegistry::notify(Del{…}) + return count +``` + +### CAS Write (SETREV / SETNX / SETXX) + +``` +setrev(ns, key, value, expected_rev) + │ + ├─► put_full_cond(key, value, meta, ttl, WriteCondition::Revision(expected_rev)) + │ index borrow → check tstamp_ms == expected_rev + │ mismatch → return None (no write, no append) + │ match → append record → update index → return Some(new_revision) + │ + ├─► hit: cache.try_update + WatchRegistry::notify + └─► miss: return None (caller sees CAS failure) + +setnx → WriteCondition::KeyAbsent +setxx → WriteCondition::KeyPresent +delrev → tombstone_cond(key, expected_rev) +``` + +### INCR (optimistic CAS loop) + +``` +incr(ns, key, delta) — up to 64 retries + │ + ├─► try { + │ read current value from cache or disk (+ revision + ttl) + │ parse as i64; add delta; check overflow + │ put_full_cond(WriteCondition::Revision(rev) or KeyAbsent) + │ None → CAS lost → retry + │ Some(t) → update cache; notify watchers; return new_val + │ } + └─► 64 failures → EngineError::Conflict +``` + +### Watch Subscribe + +``` +watch_subscribe(ns, filter, since) + │ + ├─► WatchRegistry::subscribe_key/prefix → mpsc::Receiver (cap 512) + │ (subscribe FIRST — live events start queuing immediately) + │ + ├─► since == 0 → current_entries(&filter) [index snapshot of live keys] + │ since > 0 → scan_since(&filter, since) [replay log records with tstamp > since] + │ + └─► return (initial_events, receiver) + caller deduplicates by revision (a write between subscribe + scan appears in both) +``` + +### Background / Periodic Paths + +``` +sync_logs() — fsync all namespaces; appendfsync-everysec timer +sweep_cache() — bulk-evict expired L1 entries; background timer +reclaim_if_needed() — compaction: seal + merge when sealed_count > threshold +seal_all_for_shutdown() — freeze all namespaces, drain in-flight, write footers +``` + +## Concepts & Terminology + +| Term | What It Controls | NOT | +| ---------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------ | +| `ShardStore` | The entire shard: L1 cache + all namespaces + watch registry; public KV API | Not thread-safe (`!Sync`); lives behind `Rc` on one monoio worker | +| Namespace | An independent key-space with its own log, index, and value store | Not a security/tenant boundary — any call can name any namespace | +| Cache key | `{ns}\x00{key}` — the `\x00` byte separates namespace from key | Not the on-disk key; the separator prevents `ns="a", key="bc"` colliding with `ns="ab", key="c"` | +| `MemCache` | S3-FIFO L1 cache; sized in bytes; evicts cold entries; not persisted | Not a write-through buffer — it caches reads and is updated on writes | +| Ghost set | Remembers recently evicted keys; a re-insert goes to Main instead of Small | Not a negative cache — does not cause misses; only affects queue placement | +| `NamespaceLog` | All reads/writes for one namespace; owns the `NsIndex` and file set | See `src/log/ARCHITECTURE.md` for full log internals | +| `ValueStore` | Content-addressed blob store at `{ns_dir}/values/`; dedup + refcounted | Not for small values — only keys with values >= `value_sep_threshold` | +| `WatchRegistry` | In-process pub/sub; holds senders per (ns, key) or (ns, prefix) | Not persistent — subscriptions are lost on restart | +| Revision | `tstamp_ms` of the write record; monotonically increasing; used for CAS and WATCH | Not a sequence number — two writes on different shards may have the same ms | +| `WriteCondition` | Guards appends: `KeyAbsent`, `KeyPresent`, or `Revision(u64)` CAS check | The check happens against the in-memory index; serialized by the per-key stripe lock | +| Frozen | A per-namespace flag set by `freeze_and_drain()`; new writes return `EngineError::Frozen` | Cleared by `unfreeze()` on handoff abort; not set during normal operation | +| `flush()` | Destroys all namespace files and recreates them (FLUSHDB) | Not fsync — this erases all data in the namespace | + +## Core Mechanisms + +### L1 Cache (S3-FIFO) — `cache.rs` + +New entries enter the **Small** queue (10% of capacity). On eviction from Small: + +- `freq == 1` (accessed since insertion) → promoted to **Main** (90%). +- `freq == 0` → evicted and recorded in the **ghost set** (bounded ~10% of est. capacity). + +A new insert that is in the ghost set skips Small and goes directly to Main — it has demonstrated re-use and survives the next eviction pass. This prevents one-hit items from polluting Main while protecting hot entries that get evicted under burst pressure. + +Cache key is `{ns}\x00{key}`. Keys ≤ 128 bytes total are built on the stack (`with_cache_key` uses a `[u8; 128]` buf) — no heap allocation on the hot read path. + +`try_update` updates an existing entry in-place using a borrowed key slice (no alloc). `insert` falls back to an owned `Bytes` key only on cache misses. + +### Write Path — no locks, no mutex + +`ShardStore` lives on a single `monoio` thread. There are no cross-thread locks on the write path. Concurrency within one thread is cooperative (`.await` yield points only), so index reads and writes between yields are safe. + +Per-key CAS serialization (`WriteCondition`) is handled inside `NamespaceLog::put_full_cond`: it takes a short per-key stripe lock (16 stripes), re-checks the condition against the current index entry, and only appends if the condition holds. This makes INCR's optimistic CAS loop correct: a lost race writes nothing (the append is suppressed) and signals a retry via `None`. + +### Watch Registry — `watch.rs` + +`WatchRegistry` holds: + +- `keys: FxHashMap<(ns, key), Vec>` for exact-key subscriptions (multiple subscribers per key). +- `prefixes: Vec<((ns, prefix), Sender)>` for prefix subscriptions (scanned linearly on every notify). + +`notify` tries to send on all matching senders; `try_send` failure (closed or full channel) prunes the sender lazily. Channel capacity is 512; a slow subscriber that fills its buffer is dropped rather than back-pressuring writes. + +Hard cap: 65,536 total live subscriptions. Dead senders are pruned before enforcing the cap, so subscriber churn never produces false capacity errors. + +`watch_subscribe` subscribes **before** scanning initial state to prevent a race where a write arrives between scan and subscribe and is missed. Writes that arrive between subscribe and scan appear in both initial events and the live channel; callers deduplicate by revision. + +### Namespace Isolation — `store.rs:ensure_ns` + +Namespaces are lazily opened on first access. Up to 1,024 namespaces per shard. A post-await dedup prevents concurrent first-opens of the same name from inserting twice. The cap is re-checked after the await for the same reason. + +Namespace names: 1–64 bytes, ASCII alphanumeric, `_`, or `-`. Invalid names return `EngineError::InvalidNamespace` immediately without touching the filesystem. + +DB index mapping: `db == 0` → `"default"`, `db == n` → `"dn"` (e.g. `db3` → `"db3"`). See `store.rs:ns_name`. + +### Glob Matching — `store.rs:glob_match` + +SCAN pattern matching supports `*`, `?`, `[abc]`, `[^abc]`, `[a-z]`. A fast path locates the first metacharacter with `memchr3` (SIMD-accelerated); bytes before it are compared as a literal prefix using `memcmp`. The common `"prefix*"` pattern short-circuits after the literal check. + +## State Machine + +``` + open() + │ + ▼ + NORMAL ◄─────────────── unfreeze() + (writable) │ + │ │ +freeze_and_drain() resume_after_abort() + │ │ + ▼ │ + FROZEN ─── seal_all() ──► SEALED + (writes return (footer written, + Frozen error) ready for handoff) +``` + +| State | Writes | `seal_all_for_shutdown` result | How to Exit | +| ------ | ------------- | ------------------------------ | -------------------------------------------------------- | +| NORMAL | accepted | n/a | `freeze_and_drain()` | +| FROZEN | `Err(Frozen)` | blocks until in-flight drain | `resume_after_abort()` | +| SEALED | `Err(Frozen)` | footers written | `resume_after_abort()` reopens active log + `unfreeze()` | + +Freeze/seal is a shard-handoff mechanism, not a normal shutdown path. SIGTERM calls `seal_all_for_shutdown` directly (freeze + drain + seal in one call). + +## Why It Behaves This Way + +### Why the cache uses S3-FIFO instead of LRU + +S3-FIFO evicts one-hit items (frequency 0 in Small) immediately without giving them space in Main. This matches the KV workload where a significant fraction of reads are one-shot (bulk imports, range scans, ephemeral keys). A ghost set promotes re-inserts directly to Main so keys that were evicted under burst pressure regain their slot without burning another Small→Main promotion cycle. + +LRU would require touching a per-item node on every access (pointer chasing). S3-FIFO sets a single `freq` byte per access and does queue manipulation only on eviction. The `Cell` freq field is cache-line-friendly. + +### Why subscribe happens before scan in `watch_subscribe` + +A write arriving between "read current state" and "register receiver" would be invisible: the scan misses it (wrote after snapshot) and the receiver misses it (registered after write). Subscribing first queues all live writes immediately; the initial scan then covers everything written before the subscribe. Duplicates (writes between subscribe and scan) are filtered by revision at the call site. + +### Why INCR uses a CAS loop instead of a dedicated lock + +INCR needs read-modify-write atomicity. A dedicated per-key mutex would block the event loop thread during the `.await` between read and write. Instead, INCR reads the current revision, computes the new value, then calls `put_full_cond(WriteCondition::Revision(rev))`. If a concurrent INCR wins the stripe and bumps the revision, `put_full_cond` returns `None` and INCR retries with the new value. No lock held across the I/O. 64 retries caps pathological livelock. + +### Why cache keys use `\x00` as the namespace separator + +Namespaces are ASCII alphanumeric plus `_` and `-`. None of these include `\x00`. Using `\x00` as the separator makes `ns="a", key="bc"` and `ns="ab", key="c"` produce distinct cache keys (`a\x00bc` vs `ab\x00c`) without a separate length prefix. The validation in `is_valid_ns_name` enforces this invariant. + +### Why `flush()` unlinks and recreates instead of truncating + +A truncate leaves the file descriptor open with length 0; any in-flight read at the old offset would return zeros rather than an I/O error, silently corrupting a response. Unlinking the old file and creating a new one ensures that: (a) in-flight reads on the old fd complete correctly against the old data (the inode stays alive until the last fd closes), and (b) new reads see a clean empty file. It also gives a new inode number that CoW snapshot invalidation can observe. + +## Package Structure + +| File | What It Does | +| -------------------- | ---------------------------------------------------------------------------------------- | +| `src/store.rs` | `ShardStore`: public KV API; routes operations through cache → index → log | +| `src/cache.rs` | `MemCache`: S3-FIFO in-memory L1 cache; eviction, ghost set, prefix removal | +| `src/watch.rs` | `WatchRegistry`: exact-key and prefix pub/sub; live change events | +| `src/value_store.rs` | `ValueStore`: content-addressed blob store for large (value-separated) values | +| `src/types.rs` | Shared types: `Entry`, `SetOptions`, `TtlResult`, `GetExOp`, `ScanPage` | +| `src/error.rs` | `EngineError` enum; all error variants used across the crate | +| `src/log/` | Append-only log, index, reclaim, recovery, record format — see `src/log/ARCHITECTURE.md` | +| `benches/engine.rs` | Divan benchmarks: cache hits/misses, set (append/overwrite/sync), mget warm/cold | +| `tests/emfile.rs` | Smoke test: EMFILE (too many open files) handling | +| `tests/writeamp.rs` | Write amplification test: EXPIRE on large value must not rewrite the value | + +## Configuration + +`ShardStore::open(data_dir, memory_bytes)` reads env vars at startup: + +| Variable | Default | What It Controls | +| ------------------------ | ------- | ------------------------------------------------------------------------------------------------------ | +| `memory_bytes` (arg) | caller | Hard byte cap on L1 cache (`MemCache::max_bytes`); excess entries evicted immediately on insert | +| `KV_COMPACTION_FANOUT` | 8 | Size-tiered fanout: a log level merges once it holds this many runs (clamped `>= 2`) | +| `KV_VALUE_SEP_THRESHOLD` | 131072 | Values `>=` this byte count go to `ValueStore` instead of inline; one GlideFS block = 128 KiB | +| `KV_TEST_FAIL_ONCE_FILE` | unset | If set to a path that exists at seal time, injects `TestSealFailure` once (unlinks file after trigger) | + +`LogConfig` fields `rotate_threshold` (1 GiB) and `fanout` (8) are set from env vars at `ShardStore::open`; `value_sep_threshold` (128 KiB) likewise. See `src/log/config.rs`. + +## Failure Modes + +| Failure | What Actually Happens | Recovery | +| ----------------------------------------- | ------------------------------------------------------------------------- | -------------------------------------------------------------------------------- | +| L1 cache eviction under memory pressure | Cold entries dropped from Small/Main; next read goes to disk via io_uring | Transparent; cache reloads on next access | +| Watch channel full (512 items) | Sender pruned; subscriber receives no further events on that channel | Subscriber reconnects; new subscription with `since=last_seen_revision` | +| Namespace limit reached (1,024) | `ensure_ns` returns `EngineError::CapacityExceeded`; op fails | Drop unused namespaces or reduce shard count | +| Watch subscription limit reached (65,536) | New subscribe returns `CapacityExceeded` after pruning dead senders | Subscribers must reconnect after dead senders are reclaimed | +| INCR CAS loop exhausted (64 tries) | Returns `EngineError::Conflict` | Caller retries; only fires under pathological same-key contention | +| `seal_all_for_shutdown` I/O error | First error returned; other namespace seals may or may not have succeeded | Next startup falls back to full record replay for any namespace missing a footer | +| `freeze_and_drain` + crash before footer | No footer written; active file treated as crashed on next open | Recovery replays records up to first bad CRC; no data loss beyond last fsync | +| FLUSHDB called accidentally | All namespace files unlinked + recreated; data is gone | No recovery — FLUSHDB is destructive; `ValueStore::clear()` also runs | +| Process crash mid-write | Partial record at tail of active file | Recovery truncates at first bad CRC; see `src/log/ARCHITECTURE.md` | +| io_uring not available | `monoio::FusionDriver` falls back to legacy epoll driver | Performance degrades; correctness preserved | diff --git a/crates/engine/src/log/ARCHITECTURE.md b/crates/engine/src/log/ARCHITECTURE.md index bda1f45..7497ef2 100644 --- a/crates/engine/src/log/ARCHITECTURE.md +++ b/crates/engine/src/log/ARCHITECTURE.md @@ -10,18 +10,63 @@ Accepts key-value writes as append-only records on disk, maintains an in-memory Caller │ ▼ -NamespaceLog::put_full() / put_many() +await_reclaim() ← stalls at 500µs intervals while reclaim/flush holds the flag │ - ├─► RecordHeader::encode() — serialize header + body into buf - │ [crc64 | tstamp_ms | flags | expires_at_ms | key_sz | val_sz | meta_sz | key | val | meta] + ▼ +begin_write() → WriteGuard ← increments in_flight_writes; returns Err(Frozen) if frozen + │ + ▼ +wlock(key).lock() ← per-key write stripe (64 stripes; FxHash & 63) + │ same key → same stripe → serialized + │ different keys → (usually) different stripes → concurrent + │ + ├─► (put_full_cond only) cond.check(live_rev(index, key, now)) + │ None → return Ok(None) (no write, no blob, no append) + │ + ├─► next_tstamp() ← wall clock; nudges +1 if clock didn't advance + │ + ├─► value >= value_sep_threshold? + │ yes → ValueStore::put(value) [blake3-hash, write-once, fsync blob + dir] + │ → content_hash (16 bytes) stored as val; VALUE_SEP flag set + │ on append failure → values.unref(hash) [rollback] + │ no → value bytes stored inline + │ + ├─► pool_acquire_write(capacity) + │ RecordHeader::encode_into(buf, tstamp, flags, exp, key, val, meta) + │ + ├─► LogFile::append(buf) + │ poisoned? → Err("log file poisoned") + │ reserve write_offset (Cell read+set, no lock — single-threaded) + │ monoio write_all_at(buf, offset) [io_uring positioned write] + │ I/O error? → poisoned = true; Err(io) + │ + ├─► pool_release_write(buf) + │ + ├─► NsIndex::insert(key, IndexEntry{file_id, offset, size, tstamp}, expires_at_ms) + │ old valsep hash? → values.unref(old_hash) + │ + ├─► write_offset >= rotate_threshold? + │ yes → rotate_active() [seal footer + open new active file] + │ + └─► return Ok(tstamp) +``` + +### `put_many` (MSET) — single write + single fsync + +``` +put_many(pairs) + │ + ├─► collect distinct stripe indices for all keys + │ sort + dedup → acquire all stripes in sorted order [deadlock prevention] │ - ├─► LogFile::append() - │ reserve write_offset with Cell (atomic increment, no lock needed) - │ └─► monoio write_at() — io_uring positioned write + ├─► for each pair: + │ next_tstamp(); maybe_separate(value); encode_into(buf) │ - ├─► (if put_many) LogFile::sync() — single fsync for the whole batch + ├─► LogFile::append(whole_buf) — one io_uring write for all N records │ - └─► NsIndex::insert() — update in-memory index to point at new offset + ├─► NsIndex bulk insert (one index.borrow_mut(), N inserts) + │ + └─► return Vec (one per pair, in input order) ``` ### Read (read_value / bulk_read) @@ -38,22 +83,30 @@ NsIndex::get() — look up file_id + record_offset + record_size │ ▼ LogFile::read_exact(record_offset, record_size) - └─► monoio read_at() — io_uring positioned read + pool_acquire(size) — exact-capacity match (monoio passes capacity to io_uring) + └─► monoio read_exact_at() [io_uring positioned read] + │ + ▼ + extract_value_meta() — parse header, verify CRC64, slice value + meta │ ▼ - RecordHeader::decode() — verify CRC64, parse flags, extract value slice + VALUE_SEP flag? + inline → return value bytes + sep → values.get(hash) + re-hash to verify BLAKE3-128 content ``` ### Batch read ``` -bulk_read(entries) +bulk_read([(slot, IndexEntry), ...]) │ - ▼ -futures::join_all([read_exact(), read_exact(), ...]) — concurrent io_uring ops + ├─► join_all([read_exact(), ...]) — concurrent io_uring SQEs │ - ▼ -[Option, ...] + ├─► extract_value_meta() for each result + │ + ├─► join_all([deref(value, flags), ...]) — concurrent blob fetches (VALUE_SEP) + │ + └─► [(slot, value, metadata), ...] ``` ### Reclaim (size-tiered compaction) @@ -65,19 +118,24 @@ on GlideFS a reclaim re-uploads one level rather than the whole namespace. ``` NamespaceLog::reclaim() │ - ├─ 1. Seal active file — write footer — and insert it as a fresh level-0 run + ├─ 0. reclaim_in_progress.replace(true) — atomic gate; second caller gets ReclamationBusy + │ drain in_flight_writes to 0 [existing writes finish; new ones stall in await_reclaim] + │ + ├─ 1. Seal active file — write footer — insert it as a fresh level-0 run │ ├─ 2. Cascade: while some level L holds >= `fanout` runs: │ │ │ ├─ collect that level's live records (index entries with those file_ids) - │ ├─ reclaim_namespace(): read them concurrently, write one merged file + │ ├─ reclaim::reclaim_namespace(): read them concurrently, write one merged file │ │ to data-{next_id}.log.tmp, footer + fsync, rename .tmp → .log, │ │ fsync dir, unlink the input files (leak-logged, never errors) │ ├─ open_ro the merged file FIRST (only fallible step), THEN swap index │ │ + sealed map atomically — a failed open leaves state consistent │ └─ tag the merged run at level L+1 │ - └─ 3. Open a fresh active LogFile → return ReclaimReport + ├─ 3. Open a fresh active LogFile → sync_dir + │ + └─ 4. reclaim_in_progress.set(false) → return ReclaimReport ``` `fanout` (default 8) is the per-level run count that triggers a merge. Levels @@ -111,36 +169,74 @@ open_namespace(dir, config) seed the revision clock from the highest recovered tstamp_ms ``` +### Watch Replay (scan_since / current_entries) + +``` +watch_subscribe(filter, since=0) + → current_entries(filter, now) + index.iter() → filter live matching keys + bulk_read(all matches) [concurrent io_uring] + return Vec + +watch_subscribe(filter, since>0) + → scan_since(filter, since_revision) + for each file (sealed asc, then active): + end = data_end_offset() ← reads magic at EOF to exclude footer bytes + scan_file_records(file, end, filter, since_revision, values, &mut events) + header read → parse_header → verify_crc (stop on mismatch) + tstamp_ms > since_revision && !TTL_UPDATE? → WatchEvent + VALUE_SEP? → values.get(hash) + re-hash verify + events.sort_by_key(revision) + return events +``` + ## Concepts & Terminology -| Term | What It Controls | NOT | -| -------------- | --------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------- | -| `NamespaceLog` | All reads/writes for one key-space; owns the index and file set | Not a shard — multiple namespaces can live in one shard | -| `LogFile` | One `data-{id}.log` file; tracks write offset, exposes positioned I/O | Not a WAL segment; the log IS the store | -| `active` file | The only writable file at any time; receives all new appends | Not memory-mapped; accessed via io_uring | -| `sealed` files | Immutable; readable only; eligible for reclaim | Not deleted until reclaim completes the rename | -| Footer | Per-key metadata block at the end of a file; enables fast recovery | Written to the active file on clean shutdown; absence means crash or in-progress | -| Tombstone | A record with the `TOMBSTONE` flag; marks a key as deleted in the log | Not a physical delete — the old record remains until reclaim | -| TTL-update | A tiny record with the `TTL_UPDATE` flag; updates expiry with no value copy | Not authoritative until replayed against the index | -| `NsIndex` | In-memory key → `IndexEntry` map (`BTreeMap`, ordered for SCAN) + TTL sidecar + value-sep sidecar | Not persisted — rebuilt from log on every open | -| `IndexEntry` | 24-byte struct: record_offset (u64) + record_size (u32) + file_id (u32) + tstamp_ms (u64) | Does not hold the value, the key, or flags; `tstamp_ms` doubles as the CAS revision | -| `ValueStore` | Content-addressed blob store for large (value-separated) values; refcounted, deduped, deferred-GC | Not in the log — compaction moves only the 16-byte pointer, never the blob | -| Reclaim | GC: rewrites live keys into one new file; auto-triggered by sealed-file count threshold or `BGREWRITEAOF` | Caller must serialize with writes; cannot run concurrently with appends | -| `flush()` | Unlinks and recreates all files (CoW snapshot invalidation) | Not fsync — this destroys all data in the namespace | +| Term | What It Controls | NOT | +| -------------- | --------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------- | +| `NamespaceLog` | All reads/writes for one key-space; owns the index, file set, write stripes, and value store | Not a shard — multiple namespaces can live in one shard | +| `LogFile` | One `data-{id}.log` file; tracks write offset, exposes positioned I/O; poisons on write error | Not a WAL segment; the log IS the store | +| `active` file | The only writable file at any time; receives all new appends | Not memory-mapped; accessed via io_uring | +| `sealed` files | Immutable; readable only; eligible for reclaim | Not deleted until reclaim completes the rename | +| Footer | Per-key metadata block at the end of a file; enables fast recovery | Written to the active file on clean shutdown or rotation; absence means crash or active-in-progress | +| Tombstone | A record with the `TOMBSTONE` flag; marks a key as deleted in the log | Not a physical delete — the old record remains until reclaim | +| TTL-update | A tiny record with the `TTL_UPDATE` flag; updates expiry with no value copy | Not authoritative until replayed against the index; skipped by `scan_since` | +| `NsIndex` | In-memory key → `IndexEntry` map (`BTreeMap`, ordered for SCAN) + TTL sidecar + value-sep sidecar | Not persisted — rebuilt from log on every open | +| `IndexEntry` | 24-byte struct: record_offset (u64) + record_size (u32) + file_id (u32) + tstamp_ms (u64) | Does not hold the value, the key, or flags; `tstamp_ms` doubles as the CAS revision | +| `ValueStore` | Content-addressed blob store for large (value-separated) values; refcounted, deduped, deferred-GC | Not in the log — compaction moves only the 16-byte pointer, never the blob | +| Write stripe | One of 64 `Mutex<()>` buckets keyed by FxHash of the key; serializes same-key writes | Not a per-file or per-namespace lock; different keys usually hit different stripes | +| Reclaim | GC: rewrites live keys into one new file; auto-triggered by sealed-file count threshold or `BGREWRITEAOF` | Writes stall (await_reclaim); they do not error while reclaim runs | +| `flush()` | Unlinks and recreates all files (CoW snapshot invalidation) | Not fsync — this destroys all data in the namespace | +| `poisoned` | `Cell` on `LogFile`; set on any append I/O error; all subsequent appends return `Err` immediately | Not set on read errors; only write failures trigger poisoning | +| `frozen` | `Cell` on `NamespaceLog`; set by `freeze_and_drain`; `begin_write` returns `Err(Frozen)` | Cleared by `unfreeze()` on handoff abort; not set during normal operation | ## Core Mechanisms -### Append and offset reservation (`file.rs:append`) +### Append and offset reservation (`file.rs:LogFile::append`) `LogFile` serializes concurrent appends without a mutex by using a `Cell` as a reservation counter. Each caller atomically reads-and-increments the offset before issuing its `write_at`. Because io_uring writes are positioned, two concurrent appends to non-overlapping offsets are safe. This means `put_many()` can issue all its record writes before calling a single `sync()`, collapsing N fsyncs into 1. +If an append I/O fails, `append` sets `poisoned = true` and all subsequent append calls return an error immediately — without trying to write. This prevents a later write from landing past the torn slot, which would survive recovery while records between it and the truncation point are silently dropped. `file.rs:enospc_tests` verifies this property. + +### Write stripe locking (`mod.rs:NamespaceLog::wlock`) + +Every mutating method (`put_full`, `put_full_cond`, `tombstone`, `tombstone_cond`, `ttl_update`) holds the per-key write stripe across the full check-and-append cycle. With 64 stripes, distinct keys hash to distinct buckets in the common case — writes to different keys stay concurrent. Same-key writes serialize: `put_full_cond` checks the condition **after** acquiring the stripe, so no concurrent write to the same key can interleave between the check and the append. A failed condition produces no record — there is no optimistic-orphan that a crash could resurrect. + +`put_many` acquires all stripes it touches in sorted-distinct order to prevent deadlocks: two batches that share a subset of stripes always acquire them in the same order. + +### Freeze and drain (`mod.rs:freeze_and_drain` / `WriteGuard`) + +Every write method calls `begin_write()` before appending. `begin_write` checks `frozen` and increments `in_flight_writes`, returning a `WriteGuard` that decrements on drop. The check-and-increment is synchronous (no `.await`), so it is serialized under monoio's single-threaded scheduler with `freeze_and_drain`'s flag-set + counter-poll. + +`freeze_and_drain` sets `frozen = true` (new writes fail immediately with `Frozen`) then polls `in_flight_writes` to zero at 1ms intervals. Once drained, `seal_active_for_shutdown` builds a footer from a consistent snapshot of on-disk state. + ### Record format (`record.rs`) Every record on disk is self-describing: ``` Byte range Field Notes -0..8 crc64-nvme covers bytes 8..end of record +0..8 crc64-nvme covers bytes 8..end of record (not the CRC field itself) 8..16 tstamp_ms monotonic; used for tie-breaking on recovery 16 flags TOMBSTONE=0x01 | NO_EXPIRY=0x02 | TTL_UPDATE=0x04 | VALUE_SEP=0x08 17..25 expires_at_ms 0 when NO_EXPIRY flag set @@ -154,11 +250,7 @@ Byte range Field Notes a 16-byte BLAKE3-128 content hash pointing into the blob store (see Value Separation below); `val_size == 16` in that case. -The CRC covers the entire record body. Any byte-level corruption causes the -record to be skipped: on recovery the active file is truncated to the last clean -record, and on the watch catch-up path (`scan_since` / `scan_file_records`) the -scan of that file stops at the first bad CRC rather than streaming a corrupt -event. +The CRC covers bytes 8..end (tstamp_ms through the end of meta). It does NOT cover the CRC field itself (bytes 0–7). Any byte-level corruption causes the record to be skipped: on recovery the active file is truncated to the last clean record, and on the watch catch-up path (`scan_since` / `scan_file_records`) the scan of that file stops at the first bad CRC rather than streaming a corrupt event. ### Sealed file footer (`file.rs`) @@ -168,27 +260,45 @@ When a file is sealed (by reclaim, rotation, or clean-shutdown seal), a footer i [ FooterEntry × N ][ footer_body_len: u64 ][ crc64: u64 ][ magic: u64 = 0x4259_4F4E_445F_4B58 ] ``` -Each `FooterEntry` carries `key`, `record_offset`, `record_size`, -`expires_at_ms` (optional), `tstamp_ms`, and the optional 16-byte value-sep hash -— enough to rebuild the index, the TTL sidecar, and the blob refcounts without -reading record bodies. The 24-byte trailer is `footer_body_len`, the body CRC, -and the magic. +Each `FooterEntry` wire format: + +``` +key_size(u32) + record_offset(u64) + record_size(u32) + +expires_at_ms(u64) + has_expiry(u8) + tstamp_ms(u64) + +has_valsep(u8) [+ value_hash(16 bytes if has_valsep)] + key_bytes +``` + +This carries enough to rebuild the index, the TTL sidecar, and the blob refcounts without reading record bodies. The 24-byte trailer is `footer_body_len`, the body CRC, and the magic. The magic value (`BYOND_KX` in ASCII — the `X` marks the v3 format that added the per-entry tstamp and value-sep hash) lets recovery distinguish a cleanly sealed file from a crashed active file. If the footer is present and CRC-valid, recovery uses it to populate the index without scanning the full file body. +`data_end_offset()` reads the magic from the last 8 bytes: if present, it returns `total_size - FOOTER_TRAILER_LEN - footer_body_len`, bounding `scan_since` so footer bytes are never misread as records. + ### In-memory index and TTL sidecar (`index.rs`) -`NsIndex` stores all live keys with 16-byte `IndexEntry` values. Keys with a TTL also appear in a secondary `FxHashMap` (expires_at_ms). Only keys that carry a TTL pay the extra allocation. Reads check expiry inline and return `None` for stale entries; the caller is responsible for appending the tombstone (lazy deletion). +`NsIndex` stores all live keys with 24-byte `IndexEntry` values in a `BTreeMap` (ordered for SCAN cursor semantics). Keys with a TTL also appear in a secondary `FxHashMap` (expires_at_ms). Keys with a value-separated large value appear in a third `FxHashMap` (valsep sidecar). Only keys that carry a TTL or large value pay the extra allocation. -`scan()` implements Redis SCAN semantics: a cursor encodes the current position in the key set; each call returns up to `count` live, non-expired keys matching an optional glob pattern. +Reads check expiry inline and return `None` for stale entries; the caller is responsible for appending the tombstone (lazy deletion). + +`live_count` tracks live key count with saturating arithmetic — incremented on insert, decremented on remove. An overwrite does not increment it. Matches Redis `DBSIZE` semantics (may overcount by lazy-expired-but-not-yet-tombstoned keys). + +`scan()` implements Redis SCAN semantics: a cursor encodes the current position in the key set as the last yielded key (exclusive lower bound via `BTreeMap::range`); each call returns up to `count` live, non-expired keys matching an optional filter. Cursor stability: a key-based cursor is stable across concurrent map mutations. ### Reclaim atomicity (`reclaim.rs`) The compaction rename (`data-{id}.log.tmp` → `data-{id}.log`) is the only atomic step. If the process crashes before the rename, the `.tmp` file is abandoned and recovery ignores it. If the crash happens after the rename but before old files are unlinked, the old sealed files remain; the next reclaim will skip them because the index no longer references their entries. Dead files produce a log warning, not an error. +Opening the merged file happens before mutating in-memory state. If `open_ro` fails (EMFILE, hardware error), the function returns with the index and sealed map untouched — old file descriptors remain open and keep serving reads even though `reclaim_namespace` already unlinked the paths. Keys stay accessible until restart. + +Writes stall during reclaim (not error): every write calls `await_reclaim()` before `begin_write()`, polling `reclaim_in_progress` at 500µs intervals. This keeps the write error surface clean while reclaim runs. + +### File rotation (`mod.rs:rotate_active`) + +After every write, if `active.write_offset() >= config.rotate_threshold`, the active file is rotated: footer written, active inserted into the sealed map as level 0, new `data-{next_id}.log` opened, `sync_dir` called. Rotation is guarded by `rotate_in_progress: Cell` to prevent two concurrent writers from both entering `rotate_active` after each observes the threshold crossed. + ### Value separation (`value_store.rs`) Values `>= config.value_sep_threshold` (default 128 KiB = one GlideFS block) are @@ -213,6 +323,15 @@ Blobs are: On read, the blob is fetched by hash and re-hashed to verify integrity — parity with the CRC the inline path pays on every read. +### Buffer pools (`file.rs`) + +Two thread-local pools recycle `Vec` buffers to avoid per-I/O heap allocation: + +- **Read pool** (`BUF_POOL`): exact-capacity match only. monoio passes `capacity` (not `len`) to io_uring as the read size via `bytes_total()`; a buffer with `cap > size` would let the kernel read past the requested bytes and corrupt the `len` field, breaking CRC checks. `pool_acquire` finds an exact-capacity match or allocates a fresh buffer. `BufGuard` returns the buffer to the pool on drop. +- **Write pool** (`WRITE_BUF_POOL`): at-least-capacity match (write size is known upfront). Buffers up to 64 KiB are pooled; larger ones (e.g. value-sep compaction) are discarded. + +Both pools hold at most 32 buffers. The pools are thread-local because `NamespaceLog` is `!Sync` and lives on a single monoio worker. + ## State Machine ``` @@ -230,12 +349,14 @@ APPENDED SEALING ──► COMPACTING ────┘ in index) sync) drop old files) ``` -| Phase | Files on disk | Index state | Writable? | -| ------------------- | --------------------------- | --------------- | -------------------------- | -| OPEN | active + 0..N sealed | fully populated | yes | -| SEALING | active being sealed | unchanged | no — caller must serialize | -| COMPACTING | sealed + .tmp | unchanged | no — caller must serialize | -| OPEN (post-reclaim) | 1 new sealed + fresh active | unchanged | yes | +| Phase | Files on disk | Index state | Writable? | +| ------------------- | --------------------------- | --------------- | ------------------------------------------------ | +| OPEN | active + 0..N sealed | fully populated | yes | +| SEALING | active being sealed | unchanged | stalled (await_reclaim) — not errored | +| COMPACTING | sealed + .tmp | unchanged | stalled (await_reclaim) | +| OPEN (post-reclaim) | 1 new sealed + fresh active | unchanged | yes | +| FROZEN | active (unmodified) | fully populated | no — Err(Frozen); freeze_and_drain drains writes | +| SEALED (shutdown) | footer appended to active | fully populated | no — Err(Frozen) | ## Why It Behaves This Way @@ -247,6 +368,8 @@ Reclaim cannot run concurrently with writes — it seals the active file, which Updating a TTL naively requires rewriting the full value (key + value + metadata). For large values this is expensive. A `TTL_UPDATE` record is a fixed-size append that contains only the key and the new expiry. On recovery, TTL-update records are replayed as index patches: they update only the expiry in `NsIndex` and leave the value record untouched. This makes `EXPIRE` O(record-header + key) on the write path. +`scan_since` skips `TTL_UPDATE` records — TTL changes are not watch events; only value mutations and deletions are. + ### Why the index is always in RAM The index is not persisted separately — it is rebuilt from the log on startup. This eliminates index/log consistency bugs (there is only one source of truth) and avoids write amplification (index updates are free on the write path). The tradeoff is that startup time scales with the number of records if footer loading fails; the footer format was added to make the common case fast (load footer → populate index in one pass, no record parsing). @@ -255,20 +378,34 @@ The index is not persisted separately — it is rebuilt from the log on startup. The engine runs on a single-threaded `monoio` runtime per shard. There is no cross-thread contention on `write_offset`. `Cell` is cheaper than an atomic (no memory barriers) and makes the single-threaded contract explicit. If the concurrency model changes, `Cell` will produce a compile error at the call sites. +### Why `put_many` acquires stripes in sorted order + +`put_many` touches potentially many per-key stripes at once. If two concurrent `put_many` batches each acquire their stripes in arbitrary order, they can deadlock: batch A holds stripe 3 and waits for stripe 7; batch B holds stripe 7 and waits for stripe 3. Sorting and deduplicating the stripe indices before acquiring ensures a total order across all lock operations, breaking the cycle. + +### Why a failed `open_ro` in reclaim does not corrupt in-memory state + +The index is mutated only after the merged file is open and ready to serve reads. If `open_ro` fails (e.g., EMFILE), the function returns an error and the index still references the old file IDs — whose `Rc` handles remain open. On Linux, unlinked files stay alive as long as there is an open descriptor; reads against those keys continue to succeed until restart. Mutating the index before the `open_ro` would leave entries pointing at a `file_id` absent from `sealed`, causing "file_id not found" on every affected read. + +### Why `append` poisons the file on write failure + +When an I/O error occurs, the write offset has already been incremented. A later write at the now-advanced offset would succeed, landing its record past the torn slot. On recovery, the scan would reach the later record (valid CRC), trust it, and discard everything between the torn slot and it — silently losing committed records. Poisoning prevents any subsequent write from advancing past the known-bad gap. + ## Failure Modes -| Failure | What Actually Happens | Recovery | -| --------------------------------------------- | ------------------------------------------- | --------------------------------------------------------------------------------------------------------------- | -| Crash mid-append (active file) | Partial record at tail of active file | Recovery replays records; stops and truncates at first bad CRC | -| Crash mid-reclaim before rename | `.tmp` file left on disk | Ignored on next open (no `.log` suffix); old sealed files intact | -| Crash mid-reclaim after rename | Old sealed files not unlinked | Next reclaim drops them; logged as warnings | -| Sealed file footer corrupt | Footer CRC check fails | Falls back to full sequential record scan | -| Read from expired key | Returns `None`; tombstone appended lazily | Tombstone write is best-effort; a crash before it completes means the key re-expires on next read | -| `flush()` called accidentally | All namespace files unlinked and recreated | Data is gone; no recovery — `flush()` is a destructive reset | -| Clean shutdown (SIGTERM/SIGINT) | Footer written to active file before exit | Next startup treats it as sealed; no record replay needed | -| Crash after blob write, before pointer record | Orphan blob on disk, no referencing key | `sweep_orphans` unlinks it on next open (refcounts rebuilt from the live index first) | -| Corrupt record on watch replay | CRC mismatch in `scan_file_records` | Scan of that file stops at the bad record; no bogus event is streamed | -| `open_ro` of merged file fails mid-reclaim | Merged file on disk, in-memory swap aborted | Index/sealed left untouched; old (unlinked-but-open) fds keep serving reads until restart finds the merged file | +| Failure | What Actually Happens | Recovery | +| --------------------------------------------- | --------------------------------------------- | --------------------------------------------------------------------------------------------------------------- | +| Crash mid-append (active file) | Partial record at tail of active file | Recovery replays records; stops and truncates at first bad CRC | +| Crash mid-reclaim before rename | `.tmp` file left on disk | Ignored on next open (no `.log` suffix); old sealed files intact | +| Crash mid-reclaim after rename | Old sealed files not unlinked | Next reclaim drops them; logged as warnings | +| Sealed file footer corrupt | Footer CRC check fails | Falls back to full sequential record scan | +| Read from expired key | Returns `None`; tombstone appended lazily | Tombstone write is best-effort; a crash before it completes means the key re-expires on next read | +| `flush()` called accidentally | All namespace files unlinked and recreated | Data is gone; no recovery — `flush()` is a destructive reset | +| Clean shutdown (SIGTERM/SIGINT) | Footer written to active file before exit | Next startup treats it as sealed; no record replay needed | +| Crash after blob write, before pointer record | Orphan blob on disk, no referencing key | `sweep_orphans` unlinks it on next open (refcounts rebuilt from the live index first) | +| Corrupt record on watch replay | CRC mismatch in `scan_file_records` | Scan of that file stops at the bad record; no bogus event is streamed | +| `open_ro` of merged file fails mid-reclaim | Merged file on disk, in-memory swap aborted | Index/sealed left untouched; old (unlinked-but-open) fds keep serving reads until restart finds the merged file | +| Append I/O error (disk full, hardware fault) | LogFile poisoned; all subsequent appends fail | Process should restart; recovery truncates at last clean record on next open | +| `freeze_and_drain` + crash before footer | No footer; active file treated as crashed | Recovery replays records up to first bad CRC | ## Configuration @@ -282,3 +419,15 @@ The engine runs on a single-threaded `monoio` runtime per shard. There is no cro `KV_COMPACTION_FANOUT` and `KV_VALUE_SEP_THRESHOLD` env vars override `fanout` and `value_sep_threshold` at `ShardStore::open` (fanout is clamped to `>= 2`). + +## Package Structure + +| File | What It Does | +| ------------ | --------------------------------------------------------------------------------------------------------------------------------- | +| `mod.rs` | `NamespaceLog`: write/read/tombstone/TTL-update API; write stripe locking; freeze/drain; rotation; reclaim dispatch; watch replay | +| `index.rs` | `NsIndex`: BTreeMap + TTL sidecar + valsep sidecar; cursor-based SCAN; `IndexEntry` (24 bytes) | +| `file.rs` | `LogFile`: positioned I/O via io_uring; offset reservation; poisoning; footer read/write; buffer pools | +| `record.rs` | On-disk record format: CRC64-NVME encode/decode; flag constants; header parsing | +| `reclaim.rs` | `reclaim_namespace`: reads live records concurrently, writes merged file, renames atomically | +| `recover.rs` | `open_namespace`: footer-fast or record-scan recovery; determines active vs sealed | +| `config.rs` | `LogConfig`: `rotate_threshold`, `fanout`, `value_sep_threshold` | diff --git a/mise.toml b/mise.toml index 198809e..eb42b38 100644 --- a/mise.toml +++ b/mise.toml @@ -42,10 +42,13 @@ run = "npm ci" dir = "sdk/ts" [tasks."test:unit:rs"] -run = "cargo test --lib" +# Serialized (--test-threads=1): each test builds its own monoio io_uring +# runtime; running many in parallel exhausts the CI runner's lockable memory and +# io_uring_setup fails with ENOMEM. One ring at a time keeps memory bounded. +run = "cargo test --lib -- --test-threads=1" [tasks."test:integration:rs"] -run = "cargo test --test cross_shard --test http --test resp --test tls" +run = "cargo test --test cross_shard --test http --test resp --test tls -- --test-threads=1" [tasks."test:handoff:rs"] description = "End-to-end handoff tests (real beyond-kv subprocesses, listener-FD inheritance, flock dance, data persistence across binary swap). Serial because each test spawns 2+ KV processes with their own ephemeral ports."