diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 98b0dfa..3dddaea 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -69,7 +69,10 @@ watch_all_from(cursor, tx) | `KvUpdate` | One watch event: `Put`, `Delete`, or `Purge` | Not a read result; carries deletes too | | `Snapshot` | Deduplicated KV state + cursor persisted to disk | Not the source of truth; a cache of NATS | | `SnapshotWriter` | Append-only log of `KvUpdate`s; no in-memory state beyond a counter | Not the in-memory cache itself | -| `watch_applied` | Combinator: batch → apply → *then* advance cursor/checkpoint | Not a raw watch; the cursor follows `apply`, not receipt | +| `SnapshotStore` | Trait: the durable-fold contract — atomic `apply(batch, cursor)`, `load`, `get`, `range` | Not a serving index; stops at fold + cursor + query | +| `AppendLogSnapshot` | Default `SnapshotStore`: append-only log + in-RAM fold (pure-Rust) | Not for folds larger than RAM | +| `FjallSnapshot` | On-disk `SnapshotStore` (fjall LSM, `feature = "fjall"`) for large folds | Not in the pure-Rust core; opt-in feature | +| `watch_applied` | Combinator: batch → apply → *then* advance cursor / fold into `SnapshotStore` | Not a raw watch; the cursor follows `apply`, not receipt | | `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, …) | Not enforced; purely advisory | ## Layer Architecture @@ -91,7 +94,8 @@ watch_all_from(cursor, tx) └─────────────────────────────────────────────────────────────┘ snapshot.rs (orthogonal, optional) ┌─────────────────────────────────────────────────────────────┐ -│ SnapshotWriter │ load() │ compact_to_file() │ +│ SnapshotStore trait: apply(batch, cursor) │ load │ get │ range +│ AppendLogSnapshot (default, in-RAM) FjallSnapshot (feat) │ │ (append-only CRC log, tempfile+rename compact) │ └─────────────────────────────────────────────────────────────┘ applied.rs (combinator over KvWatcher + snapshot) @@ -182,6 +186,32 @@ Every NATS operation is wrapped in 
`timed()` (30 s). Without it, a CLOSE_WAIT co ## Snapshot Subsystem +### The durable-fold trait + +The durable fold is a trait, `SnapshotStore`, so the backend is pluggable while the contract is fixed: + +```rust +fn load(path) -> (WatchCursor, Self); // resume position + store +fn apply(&mut self, batch, cursor); // fold data AND advance cursor, atomically +fn get(key) -> Option; // point query +fn range(prefix) -> Vec; // ordered prefix scan +``` + +Three invariants bind every implementation: + +- **Pure function of the log.** Delete the store, replay every update with revision `> cursor`, and the state is identical. The store caches the fold; NATS is the source of truth. +- **Cursor-after-apply.** `apply` makes data and cursor durable together, so the cursor never names a revision whose data is absent — one transaction on a transactional backend, data-then-cursor on the append log (a torn write leaves data *ahead* of the cursor, which replay re-folds, never skips). +- **Snapshot is a cache.** A tail lost to power loss (under a no-sync durability mode) is rebuilt by resuming the watch from the recovered cursor. + +`watch_applied` is generic over `SnapshotStore`: on each flush, after `apply` returns, it hands the raw batch + post-apply cursor to `store.apply(...)` on a blocking task. The trait stops at fold + cursor + query; serving structures built from the fold (routing rings, hashrings) live in the consumer, which reads them out via `get`/`range`. 
+ +| Backend | Module | State | Durability | +| ------- | ------ | ----- | ---------- | +| `AppendLogSnapshot` (default) | `snapshot.rs` | Append-only CRC log + in-RAM `HashMap` fold | `checkpoint` flush (page cache); `fsync` only at `compact` | +| `FjallSnapshot` (`feature = "fjall"`) | `snapshot_fjall.rs` | On-disk fjall LSM (`data` + `meta` partitions) | One atomic batch per `apply` (data + cursor); per-commit `fsync` configurable (NO_SYNC default) | + +`FjallSnapshot` keeps the cursor in the same fjall `Batch` as the data it names, so under NO_SYNC a crash can lose the un-synced tail but never desynchronize cursor from data — on reopen the recovered cursor is consistent and the watch re-folds the tail. The rest of this section describes the **append-log backend** (the default), whose on-disk format is below. + ### File Format ``` @@ -339,8 +369,9 @@ Checkpoints are frequent (every N watch events). An fsync per checkpoint would a | `src/kv.rs` | Core traits (`KvReader`, `KvWriter`, `KvWatcher`, `KvTtl`) and types (`KvEntry`, `KvUpdate`, `VersionToken`, `WatchCursor`, `KvError`) | | `src/stores.rs` | `Connection`, `KvStore`, `StoreConfig`, `StorageType`, `ConnectionCapabilities` | | `src/nats.rs` | NATS JetStream implementation; bucket creation, scan consumer lifecycle, timeout wrapping, Synadia Cloud workarounds | -| `src/snapshot.rs` | Append-only snapshot log: `SnapshotWriter`, `load()`, `replay_log()`, `compact_to_file()` | -| `src/applied.rs` | `watch_applied` cursor-after-apply combinator: `WatchScope`, `BatchConfig` | +| `src/snapshot.rs` | `SnapshotStore` trait; append-only log + `AppendLogSnapshot` (default backend): `SnapshotWriter`, `load()`, `replay_log()`, `compact_to_file()` | +| `src/snapshot_fjall.rs` | `FjallSnapshot`: on-disk `SnapshotStore` backed by fjall (`feature = "fjall"`) | +| `src/applied.rs` | `watch_applied` cursor-after-apply combinator, generic over `SnapshotStore`: `WatchScope`, `BatchConfig` | | `src/lib.rs` | Re-exports all public 
types; no logic | | `benches/` | Criterion benchmarks for snapshot write/checkpoint/load throughput and batch throughput | | `tests/` | Integration tests (require live NATS) | diff --git a/Cargo.lock b/Cargo.lock index 9fcd897..c3efa8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,13 +97,14 @@ checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" [[package]] name = "beyond-slipstream" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-nats", "async-trait", "base64", "crc32fast", "criterion", + "fjall", "futures", "serde_json", "tempfile", @@ -134,6 +135,12 @@ version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" +[[package]] +name = "byteorder-lite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" + [[package]] name = "bytes" version = "1.11.1" @@ -143,6 +150,12 @@ dependencies = [ "serde", ] +[[package]] +name = "byteview" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c53ba0f290bfc610084c05582d9c5d421662128fc69f4bf236707af6fd321b9" + [[package]] name = "cast" version = "0.3.0" @@ -217,6 +230,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" +[[package]] +name = "compare" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" + [[package]] name = "const-oid" version = "0.9.6" @@ -312,6 +331,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 
+dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -360,6 +389,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.11.0" @@ -436,6 +479,18 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -449,7 +504,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -470,6 +525,32 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "fjall" +version = "3.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62b25b4d815ae178d7d9e4aa32ee59f072efd5431c736abede1e6ee13c8c453" +dependencies = [ + "byteorder-lite", + "byteview", + "dashmap", + "flume", + "log", + "lsm-tree", + "lz4_flex", + "tempfile", + "xxhash-rust", +] + +[[package]] +name = "flume" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" +dependencies = [ + "spin", +] + [[package]] name = "foldhash" version = "0.1.5" @@ -618,6 +699,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -627,6 +714,12 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + [[package]] name = "hashbrown" version = "0.17.1" @@ -781,6 +874,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "interval-heap" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" +dependencies = [ + "compare", +] + [[package]] name = "is-terminal" version = "0.4.17" @@ -789,7 +891,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -843,12 +945,52 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "616ec5685824bcc94416c6d4a7a446eea774a31efd7062c8480ba6fd06d7a6e5" +[[package]] +name = "lsm-tree" +version = "3.1.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "e447ac67ff6aef4ec07fc19e507b219336cbba90a697c0dbeb1bf51b91536b67" +dependencies = [ + "byteorder-lite", + "byteview", + "crossbeam-skiplist", + "enum_dispatch", + "interval-heap", + "log", + "lz4_flex", + "quick_cache", + "rustc-hash", + "self_cell", + "sfa", + "tempfile", + "varint-rs", + "xxhash-rust", +] + +[[package]] +name = "lz4_flex" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef0d4ed8669f8f8826eb00dc878084aa8f253506c4fd5e8f58f5bce72ddb97e" +dependencies = [ + "twox-hash", +] + [[package]] name = "memchr" version = "2.8.1" @@ -923,6 +1065,19 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -1051,6 +1206,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quick_cache" +version = "0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a3db184a8b66cfe87f0263a1de147a6b554c864d1767c6f7fa4eb0e5497b565" +dependencies = [ + "equivalent", + "hashbrown 0.16.1", +] + [[package]] name = "quote" version = "1.0.45" @@ -1116,6 +1281,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.12.3" @@ -1159,6 +1333,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = 
"rustc-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" + [[package]] name = "rustc_version" version = "0.4.1" @@ -1178,7 +1358,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1271,6 +1451,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "security-framework" version = "2.11.1" @@ -1294,6 +1480,12 @@ dependencies = [ "libc", ] +[[package]] +name = "self_cell" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" + [[package]] name = "semver" version = "1.0.28" @@ -1363,6 +1555,17 @@ dependencies = [ "syn", ] +[[package]] +name = "sfa" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" +dependencies = [ + "byteorder-lite", + "log", + "xxhash-rust", +] + [[package]] name = "sha2" version = "0.10.9" @@ -1424,6 +1627,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" @@ -1478,7 +1690,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1694,6 +1906,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" + [[package]] name = "typenum" version = "1.20.1" @@ -1736,6 +1954,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "varint-rs" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa6c38708f6257f1ec2ca7e5a11f9bbf58a27d7060078b6b333624968183d96" + [[package]] name = "version_check" version = "0.9.5" @@ -1889,7 +2113,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2080,6 +2304,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yoke" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 0f631c9..cf40dde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ rust-version = "1.92" [package] name = "beyond-slipstream" -version = "0.2.0" +version = "0.3.0" edition.workspace = true license.workspace = true rust-version.workspace = true @@ -23,6 +23,7 @@ async-nats = "0.46" async-trait = "0.1" base64 = "0.22" crc32fast = "1" +fjall = { version = "3", optional = true } futures = "0.3" serde_json = "1" tempfile = "3" @@ -46,3 +47,6 @@ harness = false [[bench]] name = "applied" harness = false + +[features] +fjall = ["dep:fjall"] diff --git a/README.md b/README.md index cc6d95c..b042437 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,12 @@ Trait-based KV abstraction over NATS JetStream. 
Read, write, and watch distribut beyond-slipstream = "0.2" ``` +The crate core is pure-Rust. On-disk snapshot backends are opt-in cargo features (no C toolchain is pulled unless you enable one): + +```toml +beyond-slipstream = { version = "0.3", features = ["fjall"] } # on-disk SnapshotStore for large folds +``` + ## Concepts | Term | What It Is | @@ -22,7 +28,10 @@ beyond-slipstream = "0.2" | `KvUpdate` | One watch event: `Put`, `Delete`, or `Purge` | | `Snapshot` | Deduplicated KV state + cursor at a point in time. Disk cache, not source of truth | | `SnapshotWriter` | Append-only log of `KvUpdate`s; survives restarts without a full NATS scan | -| `watch_applied` | Watch loop that advances the cursor only after your `apply` returns | +| `SnapshotStore` | Trait: the durable-fold contract — `apply` (data + cursor, atomically), `load`, `get`, `range` | +| `AppendLogSnapshot` | Default `SnapshotStore`: the append-only log + an in-RAM fold (pure-Rust, small state) | +| `FjallSnapshot` | On-disk `SnapshotStore` for folds too large for RAM; queryable (`feature = "fjall"`) | +| `watch_applied` | Watch loop that advances the cursor only after your `apply` returns, folding into any `SnapshotStore` | | `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, global ordering) | ## Usage @@ -207,18 +216,57 @@ Record: crc32:u32le ++ type:u8 ++ payload A truncated final record (crash mid-write) is discarded; earlier records are intact. A CRC failure mid-file returns `SnapshotError::Corrupted`. +### Pluggable backends + +The durable fold is a trait, [`SnapshotStore`], so a consumer picks where its fold lives.
The contract is small — apply a batch and advance the cursor *atomically*, resume from the cursor on restart, and query the result: + +```rust +pub trait SnapshotStore: Sized + Send { + fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>; + fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>; + fn get(&self, key: &str) -> Result, SnapshotError>; + fn range(&self, prefix: &str) -> Result, SnapshotError>; +} +``` + +Every backend keeps the same invariants: the fold is a pure function of the log (delete the store, replay from the cursor, get identical state), the cursor never names a revision whose data isn't durable (cursor-after-apply), and the store is a cache — a tail lost to power loss is rebuilt by resuming the watch. + +| Backend | When | Notes | +| ------- | ---- | ----- | +| `AppendLogSnapshot` | **Default.** Fold fits in RAM (edge/tunnel-style services) | Pure-Rust, the append-only log above plus an in-RAM map serving `get`/`range`. No extra dependencies. | +| `FjallSnapshot` | Fold too large for RAM (e.g. routing at ~1B keys) | On-disk [fjall](https://docs.rs/fjall) LSM, `feature = "fjall"`. Each `apply` is one atomic batch (data **and** cursor); durability (NO_SYNC vs fsync) is configurable. | + +Pick a backend, then hand it to [`watch_applied`](#applied-watch) — `load` returns the resume cursor alongside the store: + +```rust +use slipstream::{AppendLogSnapshot, SnapshotStore}; + +// Default in-RAM backend: +let (resume, store) = AppendLogSnapshot::load(Path::new("/var/lib/svc/state.snap"))?; + +// Or, behind `feature = "fjall"`, an on-disk fold for a large consumer: +// let (resume, store) = FjallSnapshot::open(dir, FjallConfig { sync: false })?; + +let final_cursor = watch_applied( + watcher, WatchScope::All, Some(resume), Some(store), BatchConfig::default(), + parse, apply, on_applied, shutdown, +).await?; +``` + +The trait stops at *durable fold + cursor + query*. 
Serving structures built from the fold (routing rings, hashrings, indexes) live in the consumer — query them out of the store with `get`/`range`. A consumer with a different engine can implement `SnapshotStore` itself; the rest of slipstream is unchanged. + ## Applied watch -`watch_applied` drives the watch-batch-apply-checkpoint loop and enforces one rule the hand-rolled version can't: the cursor advances only after your `apply` returns, never on receipt. +`watch_applied` drives the watch-batch-apply-checkpoint loop and enforces one rule the hand-rolled version can't: the cursor advances only after your `apply` returns, never on receipt. It is generic over the [`SnapshotStore`](#pluggable-backends) backend, so the consumer chooses where the durable fold lives (or `None` to run without persistence). ```rust -use slipstream::{watch_applied, BatchConfig, KvUpdate, WatchCursor, WatchScope}; +use slipstream::{watch_applied, AppendLogSnapshot, BatchConfig, KvUpdate, WatchCursor, WatchScope}; let final_cursor = watch_applied( watcher, WatchScope::All, // or WatchScope::Prefix("node.".into()) - load_cursor(), // Option — resume here, or None - Some(snapshot_writer), // checkpoints at the applied cursor, or None + Some(resume), // Option — resume here, or None + Some(store), // any SnapshotStore (e.g. AppendLogSnapshot), or None BatchConfig::default(), // 10ms window, 100 updates per batch |update: &KvUpdate| parse(update), // KvUpdate -> Option; None just drops it |batch: Vec| cache.apply_batch(batch), // your only domain logic @@ -227,7 +275,7 @@ let final_cursor = watch_applied( ).await?; ``` -A batch closes when `window` elapses or it hits `max` updates, whichever comes first. Then, in order: `apply(batch)` runs to completion, the cursor advances to the batch's highest revision, the snapshot checkpoints at that cursor, and `on_applied` fires. +A batch closes when `window` elapses or it hits `max` updates, whichever comes first. 
Then, in order: `apply(batch)` runs to completion, the cursor advances to the batch's highest revision, the batch + cursor are folded into the `store` atomically (on a blocking task), and `on_applied` fires. Persist the cursor on receipt instead and a crash between receive and apply loses data: the cursor reads "caught up to rev N" while rev N sits in an unapplied buffer, and the next resume starts past it. `watch_applied` checkpoints at the applied cursor, so a persisted cursor always means every update up to it has been applied. diff --git a/benches/applied.rs b/benches/applied.rs index 7aadd62..63e26c7 100644 --- a/benches/applied.rs +++ b/benches/applied.rs @@ -81,7 +81,7 @@ fn bench_batch_throughput(c: &mut Criterion) { watcher as Arc, WatchScope::All, None, - None, + None::, BatchConfig { window: Duration::from_millis(10), max: 100, diff --git a/src/applied.rs b/src/applied.rs index 007828a..026d3c8 100644 --- a/src/applied.rs +++ b/src/applied.rs @@ -58,7 +58,7 @@ use tokio::sync::watch; use tracing::warn; use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor}; -use crate::snapshot::SnapshotWriter; +use crate::snapshot::SnapshotStore; /// What to watch: every key, or every key under a prefix. /// @@ -102,15 +102,18 @@ impl Default for BatchConfig { /// /// Subscribes per `scope` (resuming from `resume` when it carries a position), /// batches updates per `config`, applies each batch via `apply`, and only then -/// advances the cursor / checkpoints `snapshot` / calls `on_applied`. Returns -/// the final applied cursor when the watch ends (shutdown signalled, or the -/// underlying stream closed). +/// advances the cursor / folds the batch into `store` / calls `on_applied`. +/// Returns the final applied cursor when the watch ends (shutdown signalled, or +/// the underlying stream closed). 
/// -/// Raw [`KvUpdate`]s are streamed to `snapshot` as they arrive, but the -/// *checkpoint* cursor written on each flush is the post-apply cursor — so a -/// loaded snapshot's cursor is always consistent with the state it carries -/// (the cursor never names a revision whose `apply` had not returned before the -/// checkpoint). +/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM +/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or +/// its own impl) — or `None` to run without persistence. On each flush, *after* +/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to +/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's +/// persisted cursor is always the post-apply cursor and never names a revision +/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a +/// crash leaves the store consistent and resume re-folds only the tail. /// /// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and /// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see @@ -127,15 +130,15 @@ impl Default for BatchConfig { // type and is monomorphized at the call site. Folding them into a builder struct // would either box the closures or force a single generic bundle, losing that. #[allow(clippy::too_many_arguments)] -// The flush macro resets `batch_high`/`batch_deadline`/`snapshot` for the next -// loop iteration. At the two flush sites that return immediately afterward -// (shutdown, channel-close) those resets are dead stores — correct, but flagged. +// The flush macro resets `batch_high`/`batch_deadline` for the next loop +// iteration. At the two flush sites that return immediately afterward (shutdown, +// channel-close) those resets are dead stores — correct, but flagged. 
#[allow(unused_assignments)] -pub async fn watch_applied( +pub async fn watch_applied( watcher: Arc, scope: WatchScope, resume: Option, - mut snapshot: Option, + mut store: Option, config: BatchConfig, mut parse: P, mut apply: A, @@ -144,6 +147,10 @@ pub async fn watch_applied( ) -> Result where U: Send, + // `Send + 'static`: each flush moves `store` onto a blocking task to run its + // (potentially blocking) `apply`, then takes it back — the same offload the + // append log's compaction always used. + S: SnapshotStore + Send + 'static, P: FnMut(&KvUpdate) -> Option + Send, A: FnMut(Vec) + Send, O: FnMut(WatchCursor) + Send, @@ -175,21 +182,27 @@ where // "nothing to apply", hence covered. Reset to `none()` after every flush. let batch_cap = config.max.clamp(1, 64); let mut batch: Vec = Vec::with_capacity(batch_cap); + // Raw received updates for the durable `store`, in revision order. Only + // populated when a `store` is present; the store folds the *raw* updates + // (including ones `parse` rejected — they are still part of the bucket's + // state), whereas the parsed `batch` above is the consumer's domain view. + let mut raw_batch: Vec = Vec::new(); let mut batch_high = WatchCursor::none(); // `Some` once a batch has opened and the window timer is armed; `None` // between flushes. Only the armed/idle distinction is read in the loop — // the absolute instant lives in the pinned `sleep` future below. let mut batch_deadline: Option = None; - // Flush the current batch: apply (if non-empty), then advance the cursor, - // checkpoint the snapshot at that cursor, and fire `on_applied` — in that - // order. Returns whether the snapshot grew past its compaction threshold. - // Defined as a macro so it can mutate the locals above and `.await` nothing - // itself; compaction (which does block) is handled by the caller via - // `flush_and_compact!`. - macro_rules! 
flush_inner { + // Flush the current batch, in order: run the domain `apply` (if non-empty) to + // completion, advance the cursor, fold the raw batch + cursor durably into + // `store`, then fire `on_applied`. The store fold runs on a blocking task + // (its `apply` may block on I/O), moving the store in and taking it back — the + // same offload the append log's compaction always used. A store error is + // logged and the watch continues (the snapshot is a cache); a panicked + // blocking task drops the store irrecoverably, which breaks the + // resume-after-restart guarantee, so it is surfaced as fatal. + macro_rules! flush { () => {{ - let mut needs_compact = false; // Nothing received since the last flush → nothing to do at all. if !batch.is_empty() || !batch_high.is_none() { if !batch.is_empty() { @@ -197,67 +210,44 @@ where // advance below. Move the batch out so a panicking apply // can't leave half-consumed state behind. // - // `replace` (not `take`) leaves a pre-sized Vec behind: - // `take` swaps in a zero-capacity `Vec::new()`, so every - // batch after the first re-climbs the reallocation ladder - // (4→8→…→cap). Handing back a `with_capacity` Vec keeps the - // amortized allocation to one per batch. + // `replace` (not `take`) leaves a pre-sized Vec behind so each + // batch after the first doesn't re-climb the reallocation + // ladder (4→8→…→cap). apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap))); } if !batch_high.is_none() { applied = batch_high.clone(); - if let Some(sw) = snapshot.as_mut() { - match sw.checkpoint(&applied) { - Ok(true) => needs_compact = true, - Ok(false) => {} - Err(e) => warn!(error = %e, "snapshot checkpoint failed"), + if let Some(mut st) = store.take() { + let raw = std::mem::take(&mut raw_batch); + let cur = applied.clone(); + // Hand the store back unconditionally on a clean return so + // a *failed* apply (Ok(Err)) keeps the watch running; only + // a *panicked* task (Err) loses the store and is fatal. 
+ match tokio::task::spawn_blocking(move || { + let res = st.apply(&raw, &cur); + (st, res) + }) + .await + { + Ok((st, Ok(()))) => store = Some(st), + Ok((st, Err(e))) => { + warn!(error = %e, "snapshot store apply failed; continuing"); + store = Some(st); + } + Err(e) => { + warn!(error = %e, "snapshot store task panicked; aborting watch"); + handle.abort(); + return Err(KvError::WatchError(format!( + "snapshot store task panicked: {e}" + ))); + } } } on_applied(applied.clone()); + batch_high = WatchCursor::none(); } - batch_high = WatchCursor::none(); } batch_deadline = None; - needs_compact - }}; - } - - // Flush, then run compaction off the hot path if the log grew too large. - // Compaction reads and rewrites the whole snapshot file, so it must not run - // on the async reactor thread — `spawn_blocking` moves it to the blocking - // pool and we reclaim the writer afterward. - macro_rules! flush_and_compact { - () => {{ - if flush_inner!() - && let Some(mut sw) = snapshot.take() - { - // Return the writer to the closure unconditionally so a *failed* - // compaction (Ok(Err)) still hands the writer back — checkpoints - // continue, only this compaction was skipped. A *panicked* - // blocking task (Err) drops the writer on the blocking thread and - // we can't recover it; rather than silently run the rest of the - // watch without persistence — which breaks the resume-after-restart - // guarantee — we surface it as a fatal error. 
- match tokio::task::spawn_blocking(move || { - let res = sw.compact(); - (sw, res) - }) - .await - { - Ok((sw, Ok(()))) => snapshot = Some(sw), - Ok((sw, Err(e))) => { - warn!(error = %e, "snapshot compaction failed; continuing without compacting"); - snapshot = Some(sw); - } - Err(e) => { - warn!(error = %e, "snapshot compaction task panicked; aborting watch"); - handle.abort(); - return Err(KvError::WatchError(format!( - "snapshot compaction task panicked: {e}" - ))); - } - } - } }}; } @@ -281,7 +271,7 @@ where // re-delivered on the next resume — and return the applied cursor. res = shutdown.changed() => { if res.is_err() || *shutdown.borrow() { - flush_and_compact!(); + flush!(); handle.abort(); // Observe the task's terminal state. An abort surfaces as a // cancelled JoinError, which we ignore; a genuine panic that @@ -297,7 +287,7 @@ where // Batch window elapsed. () = &mut sleep, if batch_deadline.is_some() => { - flush_and_compact!(); + flush!(); } update = rx.recv() => { @@ -308,15 +298,13 @@ where // keeps it. batch_high = WatchCursor::from_version(u.version().clone()); - // Stream the raw update to the snapshot log as it - // arrives. The durable checkpoint is written later at - // the applied cursor, so a crash here just means the log - // holds data ahead of its cursor — re-applied on resume, - // never skipped. - if let Some(sw) = snapshot.as_mut() - && let Err(e) = sw.write_update(&u) - { - warn!(error = %e, "snapshot write failed"); + // Buffer the raw update for the durable store fold (which + // commits the whole batch + cursor atomically on flush). + // Done before `parse` consumes `u` by reference, and only + // when a store is present so the no-persistence path keeps + // its zero-copy cost. 
+ if store.is_some() { + raw_batch.push(u.clone()); } if let Some(parsed) = parse(&u) { @@ -334,15 +322,19 @@ where batch_deadline = Some(deadline); } - if batch.len() >= config.max { - flush_and_compact!(); + // Flush on a full parsed batch, or — when persisting — a + // full raw batch, so a window packed with parse-rejected + // updates can't grow `raw_batch` without bound before the + // window elapses. + if batch.len() >= config.max || raw_batch.len() >= config.max { + flush!(); } } None => { // Stream closed. Flush the remainder, then surface the // watch task's terminal result: a clean end returns the // applied cursor, an error propagates. - flush_and_compact!(); + flush!(); return match handle.await { Ok(Ok(())) => Ok(applied), Ok(Err(e)) => Err(e), @@ -412,6 +404,7 @@ async fn run_watch( mod tests { use super::*; use crate::kv::{KvEntry, VersionToken}; + use crate::snapshot::AppendLogSnapshot; use async_trait::async_trait; use std::sync::Mutex; use std::sync::atomic::{AtomicU64, Ordering}; @@ -545,7 +538,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig::default(), parse_put, move |batch| ab.lock().unwrap().push(batch), @@ -579,7 +572,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig::default(), parse_put, move |batch: Vec>| { @@ -621,7 +614,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig { window: Duration::from_secs(3600), // effectively never max, @@ -660,7 +653,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig { window: Duration::from_secs(3600), // window won't fire max: 100, @@ -710,7 +703,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig { window: Duration::from_secs(3600), max, @@ -756,7 +749,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig::default(), // Reject everything — simulates corrupt/irrelevant entries. 
|_u: &KvUpdate| -> Option> { None }, @@ -799,7 +792,7 @@ mod tests { watcher, WatchScope::All, Some(WatchCursor::from_u64(5)), // resume position that "expired" - None, + None::, BatchConfig::default(), parse_put, move |batch: Vec>| ab.lock().unwrap().extend(batch), @@ -825,7 +818,7 @@ mod tests { async fn snapshot_checkpoint_matches_applied_cursor() { let dir = tempfile::TempDir::new().unwrap(); let path = dir.path().join("applied.snap"); - let writer = SnapshotWriter::open(&path, u64::MAX).unwrap(); + let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap(); let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)]; let watcher = Arc::new(MockWatcher::new(updates, false)); // close after @@ -835,7 +828,7 @@ mod tests { watcher, WatchScope::All, None, - Some(writer), + Some(store), BatchConfig::default(), parse_put, move |_batch: Vec>| {}, @@ -882,7 +875,7 @@ mod tests { watcher, WatchScope::All, Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired - None, + None::, BatchConfig::default(), parse_put, move |batch: Vec>| ab.lock().unwrap().extend(batch), @@ -920,7 +913,7 @@ mod tests { watcher, WatchScope::Prefix("node.".to_string()), None, - None, + None::, BatchConfig::default(), parse_put, move |batch: Vec>| ab.lock().unwrap().extend(batch), @@ -958,7 +951,7 @@ mod tests { watcher, WatchScope::Prefix("node.".to_string()), Some(WatchCursor::from_u64(5)), // resume position that "expired" - None, + None::, BatchConfig::default(), parse_put, move |batch: Vec>| ab.lock().unwrap().extend(batch), @@ -987,7 +980,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig::default(), parse_put, move |_batch: Vec>| {}, @@ -1026,7 +1019,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig::default(), // Keep only keys under "keep."; reject everything else. 
|u: &KvUpdate| -> Option> { @@ -1072,7 +1065,7 @@ mod tests { watcher, WatchScope::All, None, - None, + None::, BatchConfig::default(), parse_put, move |_batch: Vec>| { @@ -1111,8 +1104,9 @@ mod tests { let dir = tempfile::TempDir::new().unwrap(); let path = dir.path().join("applied.snap"); // threshold 0 → every checkpoint reports "needs compact", forcing the - // spawn_blocking compaction branch on each flush. - let writer = SnapshotWriter::open(&path, 0).unwrap(); + // store's inline-compaction branch on each flush (run off the hot path via + // spawn_blocking inside watch_applied). + let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap(); // Re-put the same key across flushes so compaction has duplicates to // dedup; small max forces multiple flushes (hence multiple compactions). @@ -1129,7 +1123,7 @@ mod tests { watcher, WatchScope::All, None, - Some(writer), + Some(store), BatchConfig { window: Duration::from_secs(3600), max: 1, // one update per flush → a compaction per update diff --git a/src/lib.rs b/src/lib.rs index fcdc236..222317f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,8 @@ mod applied; mod kv; mod nats; pub mod snapshot; +#[cfg(feature = "fjall")] +mod snapshot_fjall; mod stores; pub use applied::{BatchConfig, WatchScope, watch_applied}; @@ -34,4 +36,7 @@ pub use kv::{ KvEntry, KvError, KvReader, KvTtl, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor, }; pub use nats::{NatsConnection, NatsConnectionConfig, nats_connect}; +pub use snapshot::{AppendLogSnapshot, SnapshotStore}; +#[cfg(feature = "fjall")] +pub use snapshot_fjall::{FjallConfig, FjallSnapshot}; pub use stores::{Connection, ConnectionCapabilities, KvStore, StorageType, StoreConfig}; diff --git a/src/snapshot.rs b/src/snapshot.rs index 1e8b1a3..2fa8634 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -73,6 +73,12 @@ pub enum SnapshotError { InvalidFormat(String), #[error("snapshot corrupted (CRC mismatch)")] Corrupted, + /// A pluggable [`SnapshotStore`] 
backend (e.g. the `fjall` on-disk store) + /// reported an error. Kept backend-agnostic — no backend type leaks into this + /// enum's signature — so enabling a backend feature does not change the public + /// error surface. + #[error("snapshot backend error: {0}")] + Backend(String), } /// Result of loading a snapshot from disk. @@ -282,6 +288,193 @@ pub fn load(path: &Path) -> Result, SnapshotError> { Ok(Some(Snapshot { cursor, entries })) } +// --------------------------------------------------------------------------- +// The durable-fold contract +// --------------------------------------------------------------------------- + +/// A durable, resumable, queryable fold of a KV watch stream. +/// +/// This is the primitive slipstream owns: a place to apply a stream of +/// [`KvUpdate`]s such that, after any restart, the fold resumes from exactly +/// where it left off and stays a faithful function of the log. [`watch_applied`] +/// drives it; consumers pick the backend. +/// +/// [`watch_applied`]: crate::watch_applied +/// +/// ## Invariants every implementation must hold +/// +/// - **Pure function of the log.** Delete the store, replay every update with +/// revision `>` the persisted cursor, and you get byte-identical state. The +/// store caches the fold; it is never the source of truth (that is NATS). +/// - **Cursor-after-apply.** A persisted cursor `C` implies every update with +/// revision `≤ C` is durably folded in. [`apply`](Self::apply) writes data and +/// cursor together so the cursor never names a revision whose data is absent — +/// on a transactional backend in one txn, on the append log data-then-cursor +/// (a torn write leaves data *ahead* of the cursor, which replay re-folds, never +/// skips). +/// - **Snapshot is a cache.** Any tail lost to power loss (under a no-sync +/// durability mode) is rebuilt by resuming the watch from the recovered cursor; +/// never a correctness failure. 
+/// +/// ## Threading +/// +/// Methods are **synchronous** and may block on I/O — the same runtime-agnostic +/// discipline as the rest of this module. [`watch_applied`] offloads +/// [`apply`](Self::apply) to a blocking task; a consumer calling +/// [`get`](Self::get)/[`range`](Self::range) from an async context should do the +/// same. +pub trait SnapshotStore: Sized + Send { + /// Open (or resume) the store at `path`. + /// + /// Returns the persisted resume cursor — [`WatchCursor::none`] when the store + /// is fresh — and the store ready to [`apply`](Self::apply)/query. Pass the + /// returned cursor to [`watch_applied`](crate::watch_applied) as the resume + /// position. + /// + /// Backends with tuning knobs (compaction threshold, sync mode) expose them + /// on their own constructors; this uses safe defaults. + fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>; + + /// Atomically fold `batch` into the store and advance the resume cursor. + /// + /// Data and cursor become durable together (see the cursor-after-apply + /// invariant). `batch` is the raw, revision-ordered updates received since the + /// last flush — including ones a consumer's `parse` rejected, since they are + /// still part of the bucket's state. `cursor` is the highest revision received + /// in the batch. + fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>; + + /// Look up the live entry for `key`. `None` if absent or deleted. + fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>; + + /// All live entries whose key starts with `prefix`, in ascending key order. + /// + /// Buffers the whole match set into a `Vec`. Convenient for the bounded + /// prefixes an in-RAM consumer scans, but a broad prefix against an on-disk + /// fold (the 1B-route case an on-disk backend exists for) materializes every + /// match at once — use [`for_each_in_range`](Self::for_each_in_range) there. 
+ fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>; + + /// Stream every live entry whose key starts with `prefix`, in ascending key + /// order, invoking `f` once per entry — without buffering the whole match + /// set in memory. + /// + /// This is the memory-bounded counterpart to [`range`](Self::range). The + /// reason to pick an on-disk backend is a fold too large for RAM; for such a + /// consumer a broad [`range`](Self::range) defeats the purpose by collecting + /// every match into one `Vec`. `f` returning `Err` stops the scan early and + /// propagates that error. + /// + /// The provided implementation delegates to [`range`](Self::range) — correct + /// for in-RAM backends, where the fold already fits in memory. On-disk + /// backends override it to stream straight from storage. + fn for_each_in_range( + &self, + prefix: &str, + mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>, + ) -> Result<(), SnapshotError> { + for entry in self.range(prefix)? { + f(entry)?; + } + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// AppendLogSnapshot: the default, pure-Rust backend +// --------------------------------------------------------------------------- + +/// Default compaction threshold for [`AppendLogSnapshot::load`]: 10 MiB of +/// appended records before a compaction is triggered. Matches the value the +/// hand-rolled callers used; tune via [`AppendLogSnapshot::open`]. +pub const DEFAULT_COMPACT_THRESHOLD: u64 = 10 * 1024 * 1024; + +/// The append-only CRC log as a [`SnapshotStore`] (the default backend). +/// +/// Wraps the existing [`SnapshotWriter`] + [`load`] machinery — same v2 on-disk +/// format, same CRC framing, same FDB-versionstamp-safe cursor encoding, so files +/// written by either path are mutually loadable. 
Keeps the whole fold in a +/// [`HashMap`] in RAM to serve [`get`](SnapshotStore::get)/[`range`](SnapshotStore::range), +/// which is exactly what edge/tunnel-style consumers already do — small state, +/// pure Rust. A consumer that cannot hold its fold in RAM wants an on-disk +/// backend instead (e.g. the `fjall` feature). +pub struct AppendLogSnapshot { + writer: SnapshotWriter, + entries: HashMap, + cursor: WatchCursor, +} + +impl AppendLogSnapshot { + /// Open or resume the log at `path` with an explicit compaction threshold. + /// + /// Replays any existing log into the in-RAM fold (and compacts it, exactly as + /// [`load`] does), then opens the writer for append. Returns the resume cursor + /// alongside the store. + pub fn open(path: &Path, compact_threshold: u64) -> Result<(WatchCursor, Self), SnapshotError> { + let (cursor, entries) = match load(path)? { + Some(snap) => (snap.cursor, snap.entries), + None => (WatchCursor::none(), HashMap::new()), + }; + let writer = SnapshotWriter::open(path, compact_threshold)?; + Ok(( + cursor.clone(), + Self { + writer, + entries, + cursor, + }, + )) + } +} + +impl SnapshotStore for AppendLogSnapshot { + fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> { + Self::open(path, DEFAULT_COMPACT_THRESHOLD) + } + + fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> { + // Stream the raw records, then the cursor checkpoint — data-then-cursor, + // so a torn write leaves data ahead of the cursor (re-folded on resume), + // never a cursor ahead of its data. The in-RAM fold mirrors the same + // mutations so get/range stay consistent with what was just durably written. + for update in batch { + self.writer.write_update(update)?; + match update { + KvUpdate::Put(entry) => { + self.entries.insert(entry.key.clone(), entry.clone()); + } + KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. 
} => { + self.entries.remove(key); + } + } + } + // checkpoint() flushes the buffered records + the cursor record to the OS; + // it returns true when the log has grown past the compaction threshold, in + // which case we compact inline (this whole call already runs off the hot + // path via spawn_blocking in watch_applied). + if self.writer.checkpoint(cursor)? { + self.writer.compact()?; + } + self.cursor = cursor.clone(); + Ok(()) + } + + fn get(&self, key: &str) -> Result, SnapshotError> { + Ok(self.entries.get(key).cloned()) + } + + fn range(&self, prefix: &str) -> Result, SnapshotError> { + let mut out: Vec = self + .entries + .values() + .filter(|e| e.key.starts_with(prefix)) + .cloned() + .collect(); + out.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + Ok(out) + } +} + // --------------------------------------------------------------------------- // Internal: log replay // --------------------------------------------------------------------------- diff --git a/src/snapshot_fjall.rs b/src/snapshot_fjall.rs new file mode 100644 index 0000000..346b08e --- /dev/null +++ b/src/snapshot_fjall.rs @@ -0,0 +1,383 @@ +//! On-disk [`SnapshotStore`] backed by [fjall](https://docs.rs/fjall) — for +//! consumers whose fold is too large to hold in RAM (e.g. routing at ~1B routes). +//! +//! fjall is a pure-Rust LSM-tree (no C toolchain), so the crate core stays +//! pure-Rust and this backend is opt-in behind `feature = "fjall"`. The same +//! engine is already used by `../objects`. +//! +//! ## How it honors the [`SnapshotStore`] invariants +//! +//! - **Atomic data + cursor.** Each [`apply`](SnapshotStore::apply) is a single +//! fjall write batch: every put/delete *and* the resume cursor land under one +//! sequence number and commit together. There is no window where the cursor +//! names a revision whose data is missing. +//! - **Self-sufficient under NO_SYNC.** The durability mode is configurable. With +//! 
sync off (the default — same cache philosophy as the append log's +//! no-fsync-per-checkpoint path), a commit is not fsync'd; a power-loss crash can +//! lose the un-synced *tail*. That is safe precisely because data and cursor are +//! one atomic batch: whatever survived has its matching cursor, so on reopen the +//! consumer resumes the watch from the recovered cursor and re-folds the tail +//! from NATS. Set `sync = true` to fsync every commit. +//! - **Queryable.** [`get`](SnapshotStore::get) and [`range`](SnapshotStore::range) +//! read straight from fjall's block-cached, `Slice`-backed storage — no full-DB +//! deserialization — so a 1B-route consumer can build its serving index from a +//! prefix scan. +//! +//! ## Threading +//! +//! fjall is synchronous; [`watch_applied`](crate::watch_applied) already offloads +//! [`apply`](SnapshotStore::apply) to a blocking task, and async callers querying +//! [`get`](SnapshotStore::get)/[`range`](SnapshotStore::range) should use +//! `spawn_blocking` likewise. + +use std::path::Path; + +use fjall::{Config, Database, Keyspace, KeyspaceCreateOptions, PersistMode}; + +use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor}; +use crate::snapshot::{SnapshotError, SnapshotStore}; + +/// Partition holding the folded KV state: `key` → encoded `(version, value)`. +const DATA_PARTITION: &str = "data"; +/// Partition holding fold metadata (just the resume cursor today). +const META_PARTITION: &str = "meta"; +/// Key under [`META_PARTITION`] storing the resume cursor's raw version bytes. +const CURSOR_KEY: &[u8] = b"cursor"; + +/// Durability configuration for [`FjallSnapshot`]. +/// +/// Defaults to NO_SYNC (`sync: false`) — same cache philosophy as the append +/// log's no-fsync-per-checkpoint path. +#[derive(Debug, Clone, Copy, Default)] +pub struct FjallConfig { + /// `fsync` every [`apply`](SnapshotStore::apply) commit when `true`. 
When + /// `false` (the default), commits are not fsync'd (NO_SYNC): faster, and a + /// tail lost to power loss is rebuilt by resuming the watch from the recovered + /// cursor — the snapshot is a cache. + pub sync: bool, +} + +/// On-disk durable fold backed by fjall. See the [module docs](self). +pub struct FjallSnapshot { + // fjall 3 renamed its types: the database root is `Database` (was `Keyspace`) + // and each named partition is a `Keyspace` (was `PartitionHandle`). + db: Database, + data: Keyspace, + meta: Keyspace, + config: FjallConfig, + cursor: WatchCursor, +} + +impl FjallSnapshot { + /// Open or resume the store at `path` with explicit durability config. + /// + /// `path` is a directory (fjall keyspace), created if absent. Returns the + /// persisted resume cursor — [`WatchCursor::none`] when fresh — and the store. + pub fn open(path: &Path, config: FjallConfig) -> Result<(WatchCursor, Self), SnapshotError> { + std::fs::create_dir_all(path)?; + let db = Database::open(Config::new(path)).map_err(map_fjall)?; + let data = db + .keyspace(DATA_PARTITION, KeyspaceCreateOptions::default) + .map_err(map_fjall)?; + let meta = db + .keyspace(META_PARTITION, KeyspaceCreateOptions::default) + .map_err(map_fjall)?; + + let cursor = match meta.get(CURSOR_KEY).map_err(map_fjall)? { + Some(raw) => VersionToken::from_raw(&raw) + .map(WatchCursor::from_version) + .ok_or_else(|| { + SnapshotError::InvalidFormat(format!( + "stored cursor is {} bytes, exceeds version token capacity", + raw.len() + )) + })?, + None => WatchCursor::none(), + }; + + Ok(( + cursor.clone(), + Self { + db, + data, + meta, + config, + cursor, + }, + )) + } + + /// The most recently applied resume cursor. 
+ pub fn cursor(&self) -> &WatchCursor { + &self.cursor + } +} + +impl SnapshotStore for FjallSnapshot { + fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> { + Self::open(path, FjallConfig::default()) + } + + fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> { + // One atomic batch: every data mutation AND the cursor commit under a + // single sequence number. Either the whole fold step is durable or none of + // it is — the cursor never outraces its data. + let mut wb = self.db.batch().durability(self.durability()); + // One scratch buffer reused across the whole batch. `insert` converts its + // value into fjall's owned `Slice` eagerly — it copies the bytes before + // returning — so the buffer is free to be refilled for the next entry. That + // turns N per-`Put` assembly allocations into one amortized allocation. + let mut scratch = Vec::new(); + for update in batch { + match update { + KvUpdate::Put(entry) => { + encode_value_into(&mut scratch, &entry.value, &entry.version)?; + wb.insert(&self.data, entry.key.as_bytes(), scratch.as_slice()); + } + KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => { + wb.remove(&self.data, key.as_bytes()); + } + } + } + // Cursor in the SAME batch as the data it names. + wb.insert(&self.meta, CURSOR_KEY, cursor.version().as_bytes()); + wb.commit().map_err(map_fjall)?; + + self.cursor = cursor.clone(); + Ok(()) + } + + fn get(&self, key: &str) -> Result, SnapshotError> { + match self.data.get(key.as_bytes()).map_err(map_fjall)? { + Some(raw) => Ok(Some(decode_entry(key, &raw)?)), + None => Ok(None), + } + } + + fn range(&self, prefix: &str) -> Result, SnapshotError> { + // Collect the streaming scan — same decode path as `for_each_in_range`, + // just buffered. fjall yields keys in ascending byte order, so the result + // is already sorted (unlike the HashMap-backed append log). 
+ let mut out = Vec::new(); + self.for_each_in_range(prefix, |entry| { + out.push(entry); + Ok(()) + })?; + Ok(out) + } + + fn for_each_in_range( + &self, + prefix: &str, + mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>, + ) -> Result<(), SnapshotError> { + // fjall's prefix iterator is lazy — entries are decoded and handed to `f` + // one at a time, so a 1B-route consumer building a serving index never + // holds more than a single `KvEntry` in memory at once. + for guard in self.data.prefix(prefix.as_bytes()) { + // fjall 3 yields a lazy `Guard` per entry; `into_inner` resolves it to + // the `(key, value)` pair (loading the value, which keeps the scan lazy + // for key-only iterations elsewhere). + let (raw_key, raw_val) = guard.into_inner().map_err(map_fjall)?; + let key = std::str::from_utf8(&raw_key).map_err(|e| { + SnapshotError::InvalidFormat(format!("non-UTF-8 key in fjall store: {e}")) + })?; + f(decode_entry(key, &raw_val)?)?; + } + Ok(()) + } +} + +impl FjallSnapshot { + /// Per-commit durability: `fsync` when configured, otherwise NO_SYNC. + fn durability(&self) -> Option<PersistMode> { + if self.config.sync { + Some(PersistMode::SyncAll) + } else { + // Explicit NO_SYNC: flush to OS buffers only — survives a process crash, + // not a power loss, which is exactly the cache semantics the module docs + // promise. Stating `Buffer` rather than `None` keeps that guarantee + // independent of whatever default durability the keyspace was opened + // with, so a future change to fjall's default can't silently make + // `sync: false` durable (or weaker). + Some(PersistMode::Buffer) + } + } +} + +/// Encode a stored value as `[ver_len:u8][version bytes][value bytes]` into `buf`. +/// +/// `buf` is cleared and refilled (its capacity is reused across a batch). The +/// version is length-prefixed raw bytes for the same reason the append-log format +/// uses it: a backend's token (NATS u64, FDB 10-byte versionstamp) must round-trip +/// intact. 
+/// +/// `VersionToken` caps inline storage at 10 bytes, so the `u8` length prefix never +/// truncates today. Checking with `try_from` rather than casting surfaces a format +/// error instead of silently writing a wrong length — which would frame a record +/// `decode_entry` then mis-parses — if a future token ever widens past 255 bytes. +/// This mirrors `write_put_record` in `snapshot.rs`. +fn encode_value_into( + buf: &mut Vec<u8>, + value: &[u8], + version: &VersionToken, +) -> Result<(), SnapshotError> { + let vb = version.as_bytes(); + let ver_len = u8::try_from(vb.len()).map_err(|_| { + SnapshotError::InvalidFormat(format!( + "version too long: {} bytes (max {})", + vb.len(), + u8::MAX + )) + })?; + buf.clear(); + buf.reserve(1 + vb.len() + value.len()); + buf.push(ver_len); + buf.extend_from_slice(vb); + buf.extend_from_slice(value); + Ok(()) +} + +/// Decode a `[ver_len:u8][version][value]` record back into a [`KvEntry`]. +fn decode_entry(key: &str, raw: &[u8]) -> Result<KvEntry, SnapshotError> { + let ver_len = *raw.first().ok_or_else(|| { + SnapshotError::InvalidFormat("fjall value record is empty (no version length)".into()) + })? as usize; + let value_off = 1 + ver_len; + if raw.len() < value_off { + return Err(SnapshotError::InvalidFormat(format!( + "fjall value record truncated: need {value_off} bytes for version, have {}", + raw.len() + ))); + } + let version = VersionToken::from_raw(&raw[1..value_off]).ok_or_else(|| { + SnapshotError::InvalidFormat(format!( + "version length {ver_len} exceeds version token capacity" + )) + })?; + Ok(KvEntry { + key: key.to_string(), + value: raw[value_off..].to_vec(), + version, + }) +} + +/// Map a [`fjall::Error`] into the backend-agnostic [`SnapshotError`]. +fn map_fjall(e: fjall::Error) -> SnapshotError { + match e { + // Surface I/O failures (disk full, permission denied, …) as a real + // `io::Error` so the OS errno and the `#[source]` chain survive for + // operators, instead of being flattened into an opaque backend string. 
+ fjall::Error::Io(io) => SnapshotError::Io(io), + // Everything else keeps fjall's own variant name — its `Display` renders + // as `FjallError: {variant:?}`, so `Poisoned` (a flush/commit failure + // that should crash the app), journal recovery, decode, etc. stay legible + // in logs without leaking the `fjall` type into this error enum. + other => SnapshotError::Backend(other.to_string()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + /// A 10-byte FDB versionstamp has no `u64` form; the length-prefixed value + /// format must carry it intact. A `u64`-only field would flatten it to 0 and + /// silently break every later CAS — so this is the load-bearing reason the + /// record stores a length-prefixed token rather than a fixed 8 bytes. + #[test] + fn encode_decode_round_trips_fdb_versionstamp() { + let vs = VersionToken::from_fdb_versionstamp(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let mut enc = Vec::new(); + encode_value_into(&mut enc, b"payload", &vs).expect("encode"); + let entry = decode_entry("k", &enc).expect("decode"); + + assert_eq!(entry.version.as_bytes(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + assert!( + entry.version.as_u64().is_none(), + "a 10-byte token has no u64 form — it must not be flattened" + ); + assert_eq!(entry.value, b"payload"); + } + + /// An empty value (the CAS-tombstone shape) encodes to just the version prefix + /// and decodes back to a present, empty-valued entry with its version intact. + #[test] + fn encode_decode_round_trips_empty_value() { + let mut enc = Vec::new(); + encode_value_into(&mut enc, b"", &VersionToken::from_u64(7)).expect("encode"); + let entry = decode_entry("k", &enc).expect("decode"); + + assert!(entry.value.is_empty()); + assert_eq!(entry.version.as_u64(), Some(7)); + } + + /// A zero-byte record has no version-length byte — corruption, not a valid + /// record. It must surface as a recoverable `InvalidFormat`, never a panic. 
+ #[test] + fn decode_entry_rejects_empty_record() { + let err = decode_entry("k", &[]).unwrap_err(); + assert!( + matches!(err, SnapshotError::InvalidFormat(_)), + "empty record must be a format error, got {err:?}" + ); + } + + /// A record that claims a longer version than its bytes provide is truncated + /// on-disk corruption — reject it instead of reading past the buffer. + #[test] + fn decode_entry_rejects_truncated_version() { + // Claims a 5-byte version, but only 2 bytes follow the length prefix. + let raw = [5u8, 0xAA, 0xBB]; + let err = decode_entry("k", &raw).unwrap_err(); + assert!( + matches!(err, SnapshotError::InvalidFormat(_)), + "truncated version must be a format error, got {err:?}" + ); + } + + /// A version length beyond `VersionToken`'s 10-byte capacity can't round-trip; + /// `from_raw` rejects it and `decode_entry` maps that to `InvalidFormat` rather + /// than silently truncating to a wrong (CAS-breaking) version. + #[test] + fn decode_entry_rejects_oversized_version() { + // ver_len = 11 with 11 trailing bytes: passes the truncation check, then + // trips the capacity check inside `VersionToken::from_raw`. + let mut raw = vec![11u8]; + raw.extend_from_slice(&[0u8; 11]); + let err = decode_entry("k", &raw).unwrap_err(); + assert!( + matches!(err, SnapshotError::InvalidFormat(_)), + "oversized version must be a format error, got {err:?}" + ); + } + + /// A persisted cursor blob larger than the version-token capacity must surface + /// as a recoverable `InvalidFormat` at `open`, not a panic or a silently + /// truncated cursor that would resume the watch from the wrong position. + #[test] + fn open_rejects_corrupted_cursor() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("store"); + + { + let (_c, store) = + FjallSnapshot::open(&path, FjallConfig::default()).expect("initial open"); + // Write an 11-byte blob straight into the meta partition under the + // cursor key, bypassing the apply path's bounded encoding. 
+ store + .meta + .insert(CURSOR_KEY, [0u8; 11]) + .expect("insert oversized cursor"); + store.db.persist(PersistMode::SyncAll).expect("persist"); + } + + // `FjallSnapshot` isn't `Debug`, so match the result rather than `unwrap_err`. + match FjallSnapshot::open(&path, FjallConfig::default()) { + Err(SnapshotError::InvalidFormat(_)) => {} + Err(other) => panic!("expected InvalidFormat, got {other:?}"), + Ok(_) => panic!("expected open to reject the oversized cursor"), + } + } +} diff --git a/tests/integration.rs b/tests/integration.rs index c0150dc..74e05c5 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1131,7 +1131,7 @@ async fn health_reflects_server_death() { // (see `seed_then_watch_prefix_loses_writes_in_the_gap`) without needing the // sentinel handshake, which the combinator's owned watcher can't expose. -use slipstream::snapshot::{self, SnapshotWriter}; +use slipstream::snapshot::{self, AppendLogSnapshot}; use slipstream::{BatchConfig, WatchScope, watch_applied}; use tokio::sync::watch as tokio_watch; @@ -1159,7 +1159,7 @@ fn node_put_key(u: &KvUpdate) -> Option { fn spawn_applied( watcher: Arc, baseline: Option, - snapshot: Option, + snapshot: Option, parse: fn(&KvUpdate) -> Option, ) -> ( tokio::task::JoinHandle>, @@ -1269,9 +1269,9 @@ async fn applied_resumes_from_snapshot_cursor_without_skipping() { .expect("u64 rev"); // --- Run 1: apply {a, b}, checkpoint the snapshot, shut down. 
--- - let writer1 = SnapshotWriter::open(&snap_path, u64::MAX).expect("open snapshot"); + let (_resume1, store1) = AppendLogSnapshot::open(&snap_path, u64::MAX).expect("open snapshot"); let (task, mut applied_rx, _cur_rx, sd_tx) = - spawn_applied(watcher.clone(), Some(baseline), Some(writer1), put_key); + spawn_applied(watcher.clone(), Some(baseline), Some(store1), put_key); let first = collect_applied(&mut applied_rx, 2).await; assert_eq!(first, vec!["node.a", "node.b"]); diff --git a/tests/snapshot_store.rs b/tests/snapshot_store.rs new file mode 100644 index 0000000..8c7621b --- /dev/null +++ b/tests/snapshot_store.rs @@ -0,0 +1,549 @@ +//! Backend-agnostic conformance suite for [`SnapshotStore`]. +//! +//! Every check is written generically over a backend and an `open` closure, then +//! instantiated for each shipped backend: [`AppendLogSnapshot`] (always) and +//! `FjallSnapshot` (behind `--features fjall`). New backends get the whole suite +//! by adding two wrapper lines. +//! +//! Run the full matrix with: +//! ```text +//! cargo test --test snapshot_store +//! cargo test --test snapshot_store --features fjall +//! 
``` + +use std::path::Path; + +use slipstream::snapshot::SnapshotStore; +use slipstream::{KvEntry, KvUpdate, VersionToken, WatchCursor}; +use tempfile::TempDir; + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate { + KvUpdate::Put(KvEntry { + key: key.to_string(), + value: value.to_vec(), + version: VersionToken::from_u64(rev), + }) +} + +fn del(key: &str, rev: u64) -> KvUpdate { + KvUpdate::Delete { + key: key.to_string(), + version: VersionToken::from_u64(rev), + } +} + +fn purge(key: &str, rev: u64) -> KvUpdate { + KvUpdate::Purge { + key: key.to_string(), + version: VersionToken::from_u64(rev), + } +} + +/// A deterministic stream exercising puts, an overwrite, and a delete. Final live +/// state: `node.a=v1b`, `node.c=v3`, `svc.x=sx`, `svc.y=sy` (`node.b` deleted), +/// resume cursor `7`. +fn stream() -> Vec<KvUpdate> { + vec![ + put("node.a", b"v1", 1), + put("node.b", b"v2", 2), + put("svc.x", b"sx", 3), + put("node.a", b"v1b", 4), // overwrite + del("node.b", 5), // delete + put("node.c", b"v3", 6), + put("svc.y", b"sy", 7), + ] +} + +/// Live state of [`stream`] as `(key, value)` pairs in key order — what every +/// backend must converge to. +fn expected_state() -> Vec<(String, Vec<u8>)> { + vec![ + ("node.a".into(), b"v1b".to_vec()), + ("node.c".into(), b"v3".to_vec()), + ("svc.x".into(), b"sx".to_vec()), + ("svc.y".into(), b"sy".to_vec()), + ] +} + +/// Fold `updates` into `store` as a series of batches (3 updates each), advancing +/// the cursor to each batch's last revision — mirroring how `watch_applied` flushes. 
+fn fold<S: SnapshotStore>(store: &mut S, updates: &[KvUpdate]) {
+    for chunk in updates.chunks(3) {
+        let cursor = WatchCursor::from_version(chunk.last().unwrap().version().clone());
+        store.apply(chunk, &cursor).expect("apply batch");
+    }
+}
+
+/// The full live state as `(key, value)` pairs in key order, via `range("")`.
+fn dump<S: SnapshotStore>(store: &S) -> Vec<(String, Vec<u8>)> {
+    store
+        .range("")
+        .expect("range")
+        .into_iter()
+        .map(|e| (e.key, e.value))
+        .collect()
+}
+
+// ---------------------------------------------------------------------------
+// Generic checks — each runs against any backend via its `open` closure
+// ---------------------------------------------------------------------------
+
+/// Round-trip: fold, drop, reopen — state and cursor survive the restart.
+fn check_round_trip<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+
+    {
+        let (_resume, mut s) = open(&path);
+        fold(&mut s, &stream());
+    } // drop closes the store
+
+    let (cursor, s) = open(&path);
+    assert_eq!(cursor.as_u64(), Some(7), "resume cursor survives reopen");
+    assert_eq!(dump(&s), expected_state(), "state survives reopen");
+}
+
+/// get/range correctness: point lookups, deleted keys, prefix scans, ordering.
+fn check_get_range<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+    let (_resume, mut s) = open(&path);
+    fold(&mut s, &stream());
+
+    // Point lookups: live key, overwritten key (latest value), deleted key, absent key.
+    assert_eq!(s.get("svc.x").unwrap().unwrap().value, b"sx");
+    assert_eq!(s.get("node.a").unwrap().unwrap().value, b"v1b");
+    assert!(
+        s.get("node.b").unwrap().is_none(),
+        "deleted key reads as None"
+    );
+    assert!(s.get("absent").unwrap().is_none());
+
+    // The matched entry carries its version.
+    assert_eq!(s.get("svc.x").unwrap().unwrap().version.as_u64(), Some(3));
+
+    // Prefix scan: only matching, deleted excluded, ascending key order.
+    let nodes: Vec<String> = s
+        .range("node.")
+        .unwrap()
+        .into_iter()
+        .map(|e| e.key)
+        .collect();
+    assert_eq!(nodes, vec!["node.a".to_string(), "node.c".to_string()]);
+
+    let svcs: Vec<String> = s
+        .range("svc.")
+        .unwrap()
+        .into_iter()
+        .map(|e| e.key)
+        .collect();
+    assert_eq!(svcs, vec!["svc.x".to_string(), "svc.y".to_string()]);
+
+    // Empty prefix returns everything; a non-matching prefix returns nothing.
+    assert_eq!(s.range("").unwrap().len(), 4);
+    assert!(s.range("zzz").unwrap().is_empty());
+}
+
+/// `for_each_in_range` streams the same entries `range` buffers — same matches,
+/// same ascending order, deletes excluded — and an early `Err` from the callback
+/// stops the scan and propagates.
+fn check_for_each_in_range<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+    let (_resume, mut s) = open(&path);
+    fold(&mut s, &stream());
+
+    // Streamed scan equals the buffered scan, for a prefix and for everything.
+    for prefix in ["", "node.", "svc.", "zzz"] {
+        let mut streamed = Vec::new();
+        s.for_each_in_range(prefix, |e| {
+            streamed.push((e.key, e.value));
+            Ok(())
+        })
+        .expect("for_each_in_range");
+        let buffered: Vec<_> = s
+            .range(prefix)
+            .unwrap()
+            .into_iter()
+            .map(|e| (e.key, e.value))
+            .collect();
+        assert_eq!(
+            streamed, buffered,
+            "streamed scan matches range for {prefix:?}"
+        );
+    }
+
+    // An `Err` from the callback halts the scan early and propagates.
+    let mut seen = 0;
+    let result = s.for_each_in_range("", |_| {
+        seen += 1;
+        if seen == 2 {
+            return Err(slipstream::snapshot::SnapshotError::Backend("stop".into()));
+        }
+        Ok(())
+    });
+    assert!(result.is_err(), "callback error propagates");
+    assert_eq!(seen, 2, "scan stops at the first callback error");
+}
+
+/// Cursor-resume after a reconnect: fold a first segment, reopen (cursor reflects
+/// it), fold the post-cursor delta, reopen again (cursor advanced). Models a
+/// service restarting and resuming the watch from the persisted position.
+fn check_cursor_resume<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+    let all = stream();
+    let (first, rest) = all.split_at(3); // revs 1..=3, then 4..=7
+
+    {
+        let (_r, mut s) = open(&path);
+        fold(&mut s, first);
+    }
+
+    // Scope the handle: a "restart" means the prior store is gone before the next
+    // open. An on-disk backend may hold a single-writer lock on the path (fjall
+    // does), so leaving this handle alive would block the reopen below.
+    {
+        let (resume, _s) = open(&path);
+        assert_eq!(
+            resume.as_u64(),
+            Some(3),
+            "cursor reflects the first segment"
+        );
+    }
+
+    {
+        let (r, mut s) = open(&path);
+        assert_eq!(r.as_u64(), Some(3));
+        fold(&mut s, rest); // only the post-cursor delta
+    }
+
+    let (resume2, s) = open(&path);
+    assert_eq!(resume2.as_u64(), Some(7), "cursor advanced over the delta");
+    assert_eq!(dump(&s), expected_state());
+}
+
+/// PROPERTY — pure function of the log. Lose the store mid-stream, replay from the
+/// cursor, and the fold is byte-identical to one that never lost the store.
+///
+/// "rm the store" wipes its files (and therefore its cursor), so reopen resumes
+/// from `none()` and the entire stream is replayed — exactly what a consumer does
+/// when its cache is gone: full re-fold from NATS. The result must match a
+/// continuous, never-interrupted fold.
+fn check_property_pure_fold<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let updates = stream();
+
+    // Reference: one continuous store, never lost.
+    let ref_dir = TempDir::new().unwrap();
+    let ref_path = ref_dir.path().join("store");
+    let (_r, mut reference) = open(&ref_path);
+    fold(&mut reference, &updates);
+    let reference_dump = dump(&reference);
+    drop(reference);
+
+    // Victim: fold the first half, then rm the store mid-stream, reopen fresh,
+    // and replay the WHOLE stream from the start (cursor is gone with the files).
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+    {
+        let (_r, mut s) = open(&path);
+        fold(&mut s, &updates[..4]);
+    }
+    wipe(&path);
+    let (resume, mut s) = open(&path);
+    assert!(resume.is_none(), "a wiped store reopens with no cursor");
+    fold(&mut s, &updates);
+
+    assert_eq!(
+        dump(&s),
+        reference_dump,
+        "replay after losing the store is byte-identical to the continuous fold"
+    );
+    // And identical to the independently-computed expected state.
+    assert_eq!(dump(&s), expected_state());
+}
+
+/// `Purge` is folded the same as `Delete` — the key disappears, untouched keys
+/// survive, and the removal persists across a reopen. The shared `stream()` only
+/// exercises `Put`/`Delete`, so without this the `Purge` match arm is dead in tests
+/// and a refactor that diverges purge from delete would ship unnoticed.
+fn check_purge<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+
+    {
+        let (_r, mut s) = open(&path);
+        s.apply(
+            &[put("a", b"1", 1), put("b", b"2", 2)],
+            &WatchCursor::from_u64(2),
+        )
+        .expect("seed");
+        s.apply(&[purge("a", 3)], &WatchCursor::from_u64(3))
+            .expect("purge");
+
+        assert!(s.get("a").unwrap().is_none(), "purged key is gone");
+        assert_eq!(
+            s.get("b").unwrap().unwrap().value,
+            b"2",
+            "untouched key survives a purge of its neighbor"
+        );
+    }
+
+    // The purge persists across a restart, just like a delete.
+    let (cursor, s) = open(&path);
+    assert_eq!(cursor.as_u64(), Some(3), "cursor advanced over the purge");
+    assert!(
+        s.get("a").unwrap().is_none(),
+        "purge persists across reopen"
+    );
+    assert_eq!(s.get("b").unwrap().unwrap().value, b"2");
+}
+
+/// An empty batch carries no data but still advances (and persists) the cursor.
+/// `watch_applied` can flush a batch with zero updates (e.g. a heartbeat that only
+/// moves the resume position); the cursor must follow it, or a restart re-folds
+/// already-seen revisions.
+fn check_empty_batch_advances_cursor<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+
+    {
+        let (_r, mut s) = open(&path);
+        fold(&mut s, &stream()[..3]); // real data up to rev 3
+        s.apply(&[], &WatchCursor::from_u64(9))
+            .expect("empty batch applies");
+    }
+
+    let (cursor, s) = open(&path);
+    assert_eq!(
+        cursor.as_u64(),
+        Some(9),
+        "empty batch still advances and persists the cursor"
+    );
+    assert_eq!(dump(&s).len(), 3, "an empty batch mutates no data");
+}
+
+/// A stored empty value round-trips as a *present* entry, not as a deletion. This
+/// is the CAS-tombstone shape (`delete_with_version` writes an empty-value `Put` so
+/// concurrent writers still conflict) — the snapshot layer must preserve it
+/// verbatim, including across a reopen, so the version stays available for CAS.
+fn check_empty_value_round_trip<S: SnapshotStore>(open: impl Fn(&Path) -> (WatchCursor, S)) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("store");
+
+    {
+        let (_r, mut s) = open(&path);
+        s.apply(&[put("lock", b"", 1)], &WatchCursor::from_u64(1))
+            .expect("apply empty-value put");
+        let e = s
+            .get("lock")
+            .unwrap()
+            .expect("empty-value entry is present, not absent");
+        assert!(e.value.is_empty());
+        assert_eq!(
+            e.version.as_u64(),
+            Some(1),
+            "version survives an empty value"
+        );
+    }
+
+    let (_cursor, s) = open(&path);
+    let e = s
+        .get("lock")
+        .unwrap()
+        .expect("empty-value entry survives reopen, not confused with a delete");
+    assert!(e.value.is_empty());
+}
+
+/// Remove a store at `path`, whether it is a single file (append log) or a
+/// directory (fjall keyspace).
+fn wipe(path: &Path) {
+    if path.is_dir() {
+        std::fs::remove_dir_all(path).expect("rm store dir");
+    } else if path.exists() {
+        std::fs::remove_file(path).expect("rm store file");
+    }
+}
+
+// ---------------------------------------------------------------------------
+// AppendLogSnapshot — the default backend
+// ---------------------------------------------------------------------------
+
+use slipstream::AppendLogSnapshot;
+
+fn open_append_log(path: &Path) -> (WatchCursor, AppendLogSnapshot) {
+    AppendLogSnapshot::open(path, u64::MAX).expect("open append log")
+}
+
+#[test]
+fn append_log_round_trip() {
+    check_round_trip(open_append_log);
+}
+
+#[test]
+fn append_log_get_range() {
+    check_get_range(open_append_log);
+}
+
+#[test]
+fn append_log_for_each_in_range() {
+    check_for_each_in_range(open_append_log);
+}
+
+#[test]
+fn append_log_cursor_resume() {
+    check_cursor_resume(open_append_log);
+}
+
+#[test]
+fn append_log_pure_fold_property() {
+    check_property_pure_fold(open_append_log);
+}
+
+#[test]
+fn append_log_purge() {
+    check_purge(open_append_log);
+}
+
+#[test]
+fn append_log_empty_batch_advances_cursor() {
+    check_empty_batch_advances_cursor(open_append_log);
+}
+
+#[test]
+fn append_log_empty_value_round_trip() {
+    check_empty_value_round_trip(open_append_log);
+}
+
+/// Backwards-compat: a file written by the existing [`SnapshotWriter`] API loads
+/// through the new [`AppendLogSnapshot`] store (the on-disk v2 format is unchanged).
+#[test]
+fn append_log_loads_legacy_writer_file() {
+    use slipstream::snapshot::SnapshotWriter;
+
+    let dir = TempDir::new().unwrap();
+    let path = dir.path().join("legacy.snap");
+
+    let mut w = SnapshotWriter::open(&path, u64::MAX).unwrap();
+    w.write_update(&put("node.a", b"v1", 1)).unwrap();
+    w.write_update(&put("node.b", b"v2", 2)).unwrap();
+    w.checkpoint(&WatchCursor::from_u64(2)).unwrap();
+    drop(w);
+
+    let (cursor, s) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
+    assert_eq!(cursor.as_u64(), Some(2), "legacy cursor loads");
+    assert_eq!(s.get("node.a").unwrap().unwrap().value, b"v1");
+    assert_eq!(s.get("node.b").unwrap().unwrap().value, b"v2");
+}
+
+// ---------------------------------------------------------------------------
+// FjallSnapshot — on-disk backend (feature-gated)
+// ---------------------------------------------------------------------------
+
+#[cfg(feature = "fjall")]
+mod fjall_backend {
+    use super::*;
+    use slipstream::{FjallConfig, FjallSnapshot};
+
+    fn open_no_sync(path: &Path) -> (WatchCursor, FjallSnapshot) {
+        FjallSnapshot::open(path, FjallConfig { sync: false }).expect("open fjall")
+    }
+
+    #[test]
+    fn fjall_round_trip() {
+        check_round_trip(open_no_sync);
+    }
+
+    #[test]
+    fn fjall_get_range() {
+        check_get_range(open_no_sync);
+    }
+
+    #[test]
+    fn fjall_for_each_in_range() {
+        check_for_each_in_range(open_no_sync);
+    }
+
+    #[test]
+    fn fjall_cursor_resume() {
+        check_cursor_resume(open_no_sync);
+    }
+
+    #[test]
+    fn fjall_pure_fold_property() {
+        check_property_pure_fold(open_no_sync);
+    }
+
+    #[test]
+    fn fjall_purge() {
+        check_purge(open_no_sync);
+    }
+
+    #[test]
+    fn fjall_empty_batch_advances_cursor() {
+        check_empty_batch_advances_cursor(open_no_sync);
+    }
+
+    #[test]
+    fn fjall_empty_value_round_trip() {
+        check_empty_value_round_trip(open_no_sync);
+    }
+
+    /// NO_SYNC crash-tail recovery. With sync off, commits are not fsync'd, but
+    /// data and cursor share one atomic batch, so whatever survives is mutually
+    /// consistent. We can't deterministically simulate a power-loss (a clean drop
+    /// flushes fjall's journal), so this asserts the load-bearing invariants:
+    /// after reopen the recovered cursor matches the recovered data, and re-folding
+    /// the tail from that cursor is idempotent (safe to replay).
+    #[test]
+    fn fjall_no_sync_tail_is_consistent_and_idempotent() {
+        let dir = TempDir::new().unwrap();
+        let path = dir.path().join("store");
+        let updates = stream();
+
+        {
+            let (_r, mut s) = open_no_sync(&path);
+            fold(&mut s, &updates);
+        } // drop flushes the journal
+
+        // Recovered cursor names rev 7, and the data it names is all present.
+        let (cursor, s) = open_no_sync(&path);
+        assert_eq!(
+            cursor.as_u64(),
+            Some(7),
+            "cursor recovered, never ahead of data"
+        );
+        assert_eq!(dump(&s), expected_state());
+        drop(s);
+
+        // Re-folding the tail (the last batch, revs 6..=7) from the recovered
+        // cursor is idempotent — replaying the un-synced tail never corrupts state.
+        let (_r, mut s) = open_no_sync(&path);
+        fold(&mut s, &updates[5..]);
+        assert_eq!(
+            dump(&s),
+            expected_state(),
+            "re-folding the tail is idempotent"
+        );
+    }
+
+    /// With sync on, every commit fsyncs — round-trip must still hold.
+    #[test]
+    fn fjall_sync_mode_round_trips() {
+        let dir = TempDir::new().unwrap();
+        let path = dir.path().join("store");
+        {
+            let (_r, mut s) =
+                FjallSnapshot::open(&path, FjallConfig { sync: true }).expect("open fjall sync");
+            fold(&mut s, &stream());
+        }
+        let (cursor, s) =
+            FjallSnapshot::open(&path, FjallConfig { sync: true }).expect("reopen fjall sync");
+        assert_eq!(cursor.as_u64(), Some(7));
+        assert_eq!(dump(&s), expected_state());
+    }
+}