diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 113d25b..98b0dfa 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -69,6 +69,7 @@ watch_all_from(cursor, tx) | `KvUpdate` | One watch event: `Put`, `Delete`, or `Purge` | Not a read result; carries deletes too | | `Snapshot` | Deduplicated KV state + cursor persisted to disk | Not the source of truth; a cache of NATS | | `SnapshotWriter` | Append-only log of `KvUpdate`s; no in-memory state beyond a counter | Not the in-memory cache itself | +| `watch_applied` | Combinator: batch → apply → *then* advance cursor/checkpoint | Not a raw watch; the cursor follows `apply`, not receipt | | `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, …) | Not enforced; purely advisory | ## Layer Architecture @@ -93,6 +94,11 @@ watch_all_from(cursor, tx) │ SnapshotWriter │ load() │ compact_to_file() │ │ (append-only CRC log, tempfile+rename compact) │ └─────────────────────────────────────────────────────────────┘ + applied.rs (combinator over KvWatcher + snapshot) +┌──────────────────────────────────────────────────────────────┐ +│ watch_applied(): batch → apply → advance cursor/checkpoint │ +│ (cursor-after-apply; the safe default for resumable watch) │ +└──────────────────────────────────────────────────────────────┘ ``` ## Core Mechanism @@ -117,6 +123,28 @@ match watcher.watch_all_from(&snap.cursor, tx).await { } ``` +### Applied-Cursor Watch (`watch_applied`) + +The resumable watch above hands the caller raw machinery — a channel of `KvUpdate`s, a cursor, a snapshot writer — and trusts each one to hand-roll the loop that batches updates, applies them, and advances the cursor. Every hand-rolled instance got the same step wrong: it advanced the cursor on **receipt** of an update (`high_water = rev` at `rx.recv()`) and applied the batch afterward. The combinator `watch_applied` exists to encode the correct discipline once. 
+ +**The resume guarantee.** This library's contract is "resume from a sequence number after any restart." `watch_applied` sharpens that into a single invariant: + +> A persisted/reported cursor `C` ⟹ every update with revision ≤ `C` has been **applied** — the caller's `apply()` has returned for it. + +The cursor is written from `apply()`'s completion, never from the channel's delivery. Concretely, on each flush the combinator runs `apply(batch)` to completion, *then* sets `cursor = batch_high`, *then* checkpoints the snapshot at that cursor, *then* fires `on_applied`. Nothing advances the cursor before `apply` returns. + +**Why receipt is the wrong signal.** Bumping the cursor at `rx.recv()` and applying later opens a crash window: the persisted cursor claims "caught up to rev N" while rev N still sits in an unapplied batch buffer (or in flight to a separate apply task). On crash+resume the watch re-arms at `cursor+1`, *past* the unapplied rev N, and silently skips it. The data is gone with no error — a hole in the exact guarantee the crate advertises. + +This is the lesson of Saltzer, Reed & Clark, *End-to-End Arguments in System Design* (1984): a checkpoint placed below the endpoint — here, at the transport's delivery rather than at the application of the update — can only ever be a performance hint, never a correctness guarantee. The "it happened" property can only be established at the endpoint that actually performs the work, so the cursor is sourced from `apply`, not from `recv`. The cursor-as-monotonic-index shape itself is the HashiCorp Consul anti-entropy / blocking-query lineage: a client re-arms its watch from the last index it *reconciled*, never from the index it merely *saw*. + +**Cursor authority covers rejected entries.** `batch_high` tracks the highest revision *received* since the last flush, including updates that `parse` rejected (corrupt bytes, irrelevant keys). 
A rejected entry is still "nothing to apply," so it is covered by the cursor — and because NATS delivers in revision order, advancing to the max revision after one atomic `apply` is sound: having seen the max means every revision below it has been seen too. Without this, a run of irrelevant keys would pin the cursor in place and force redundant replay on every restart. + +**Snapshot consistency.** Raw `KvUpdate`s stream to the snapshot log as they arrive, but the *checkpoint* cursor is the post-apply cursor. A crash after a raw record is written but before its `apply`/checkpoint leaves the log holding data *ahead* of its cursor — which is safe: the cursor never names a revision whose `apply` had not returned, so resume re-delivers and re-applies that tail rather than skipping it. Compaction runs off the hot path via `spawn_blocking`, as everywhere else in the snapshot subsystem. + +**Flush triggers.** A batch flushes when any of these fires: the `window` elapses, `batch.len()` reaches `config.max`, a shutdown is signalled, or the channel closes with a pending batch (the remainder is flushed before returning). On `CursorExpired` from the resume path the combinator logs and falls back to the full-scope watch (`watch_all` / `watch_prefix`); v1 replays the full re-list as a stream of puts (a deeper "resync" signal that diffs against prior state is a documented TODO). + +This is the layer the tunnel router (swap route table) and edge origin watcher (rebuild hashrings) both collapse onto: `parse` extracts the domain registration, `apply` swaps the live state, `on_applied` persists the cursor. + ### scan() and keys() via Ephemeral Push Consumer Both use `DeliverPolicy::LastPerSubject` — one ephemeral push consumer delivers the latest value per key in a single streaming operation, rather than N sequential `get()` calls. `keys()` adds `headers_only: true` so no value bytes cross the wire. 
@@ -220,6 +248,10 @@ The double-check pattern in `connect()` guards a concurrent connect race: a seco ## Design Decisions +### Why a `watch_applied` combinator instead of leaving the loop to callers? + +The raw `KvWatcher` + `WatchCursor` + `SnapshotWriter` pieces let callers hand-roll the batch/apply/advance loop — and every known caller advanced the cursor on *receipt* rather than after *apply*, silently skipping un-applied updates on crash+resume. That is a footgun in the library's core guarantee, not a caller bug to be fixed N times. Encoding cursor-after-apply once, behind a combinator that callers can't get wrong, is cheaper and safer than documentation. `apply` stays the only domain logic; the cursor/snapshot/`on_applied` bookkeeping is the library's. See [Applied-Cursor Watch](#applied-cursor-watch-watch_applied). + ### Why KvError: Clone instead of Box? A failed connect future may be observed by multiple concurrent callers waiting on a shared result. `Clone` lets the error fan out to N waiters without `Arc`. The cost: `std::io::Error` and `async-nats` error types are not `Clone`, so their structured cause chain is flattened into a pre-rendered `String` at the boundary. The trade-off is explicit: no `#[source]` chain, but the message carries context instead. @@ -308,8 +340,9 @@ Checkpoints are frequent (every N watch events). 
An fsync per checkpoint would a | `src/stores.rs` | `Connection`, `KvStore`, `StoreConfig`, `StorageType`, `ConnectionCapabilities` | | `src/nats.rs` | NATS JetStream implementation; bucket creation, scan consumer lifecycle, timeout wrapping, Synadia Cloud workarounds | | `src/snapshot.rs` | Append-only snapshot log: `SnapshotWriter`, `load()`, `replay_log()`, `compact_to_file()` | +| `src/applied.rs` | `watch_applied` cursor-after-apply combinator: `WatchScope`, `BatchConfig` | | `src/lib.rs` | Re-exports all public types; no logic | -| `benches/` | Criterion benchmarks for snapshot write/checkpoint/load throughput | +| `benches/` | Criterion benchmarks for snapshot write/checkpoint/load throughput and batch throughput | | `tests/` | Integration tests (require live NATS) | ## Configuration diff --git a/Cargo.lock b/Cargo.lock index 1d5beef..9fcd897 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,7 +97,7 @@ checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" [[package]] name = "beyond-slipstream" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-nats", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 7a46721..0f631c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ rust-version = "1.92" [package] name = "beyond-slipstream" -version = "0.1.0" +version = "0.2.0" edition.workspace = true license.workspace = true rust-version.workspace = true @@ -27,7 +27,7 @@ futures = "0.3" serde_json = "1" tempfile = "3" thiserror = "2" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["macros", "rt", "sync", "time"] } tracing = "0.1" url = "2" @@ -42,3 +42,7 @@ harness = false [[bench]] name = "ack" harness = false + +[[bench]] +name = "applied" +harness = false diff --git a/README.md b/README.md index 39277d9..cc6d95c 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Trait-based KV abstraction over NATS JetStream. 
Read, write, and watch distribut ```toml [dependencies] -beyond-slipstream = "0.1" +beyond-slipstream = "0.2" ``` ## Concepts @@ -22,6 +22,7 @@ beyond-slipstream = "0.1" | `KvUpdate` | One watch event: `Put`, `Delete`, or `Purge` | | `Snapshot` | Deduplicated KV state + cursor at a point in time. Disk cache, not source of truth | | `SnapshotWriter` | Append-only log of `KvUpdate`s; survives restarts without a full NATS scan | +| `watch_applied` | Watch loop that advances the cursor only after your `apply` returns | | `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, global ordering) | ## Usage @@ -185,6 +186,8 @@ while let Some(update) = rx.recv().await { } ``` +This loop has a trap: `current_cursor` must track what `cache.apply()` has consumed, not what `rx.recv()` delivered. Get it wrong and a crash skips updates on resume. [`watch_applied`](#applied-watch) runs this loop for you with that invariant enforced. + The snapshot is a cache. Delete it and the service falls back to full replay on next start. ### File format @@ -204,6 +207,36 @@ Record: crc32:u32le ++ type:u8 ++ payload A truncated final record (crash mid-write) is discarded; earlier records are intact. A CRC failure mid-file returns `SnapshotError::Corrupted`. +## Applied watch + +`watch_applied` drives the watch-batch-apply-checkpoint loop and enforces one rule the hand-rolled version can't: the cursor advances only after your `apply` returns, never on receipt. 
+ +```rust +use slipstream::{watch_applied, BatchConfig, KvUpdate, WatchCursor, WatchScope}; + +let final_cursor = watch_applied( + watcher, + WatchScope::All, // or WatchScope::Prefix("node.".into()) + load_cursor(), // Option<WatchCursor> — resume here, or None + Some(snapshot_writer), // checkpoints at the applied cursor, or None + BatchConfig::default(), // 10ms window, 100 updates per batch + |update: &KvUpdate| parse(update), // KvUpdate -> Option<U>; None just drops it + |batch: Vec<U>| cache.apply_batch(batch), // your only domain logic + |cursor: WatchCursor| persist(cursor), // fires after apply returns + shutdown, // tokio::sync::watch::Receiver<bool> +).await?; +``` + +A batch closes when `window` elapses or it hits `max` updates, whichever comes first. Then, in order: `apply(batch)` runs to completion, the cursor advances to the batch's highest revision, the snapshot checkpoints at that cursor, and `on_applied` fires. + +Persist the cursor on receipt instead and a crash between receive and apply loses data: the cursor reads "caught up to rev N" while rev N sits in an unapplied buffer, and the next resume starts past it. `watch_applied` checkpoints at the applied cursor, so a persisted cursor always means every update up to it has been applied. + +- `parse` returning `None` (corrupt bytes, irrelevant key) still advances the cursor — nothing to apply means nothing to skip. +- On `CursorExpired`, it falls back to a full watch automatically. +- It returns the final applied cursor on shutdown or stream close. + +`apply` runs inline. If it panics, the panic aborts the watch. + ## NATS mapping | Concept | NATS primitive | diff --git a/benches/applied.rs b/benches/applied.rs new file mode 100644 index 0000000..7aadd62 --- /dev/null +++ b/benches/applied.rs @@ -0,0 +1,110 @@ +//! Batch throughput for the [`watch_applied`] combinator. +//! +//! Measures the per-update cost of the cursor-after-apply loop — receive off the +//! 
channel, track the high-water cursor, parse, batch, flush (apply + +//! checkpoint) — with a no-op `apply` and no snapshot, so what's left is the +//! combinator's own batching overhead. A scripted in-process watcher feeds N +//! updates and closes; there is no NATS server in the loop. + +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use async_trait::async_trait; +use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main}; +use slipstream::{ + BatchConfig, KvEntry, KvError, KvUpdate, KvWatcher, VersionToken, WatchCursor, WatchScope, + watch_applied, +}; +use tokio::sync::mpsc::Sender; +use tokio::sync::watch; + +/// Delivers a fixed batch of updates, then closes the channel. +struct ScriptedWatcher { + updates: Mutex<Option<Vec<KvUpdate>>>, +} + +#[async_trait] +impl KvWatcher for ScriptedWatcher { + async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> { + let updates = self.updates.lock().unwrap().take().unwrap_or_default(); + for u in updates { + if tx.send(u).await.is_err() { + break; + } + } + Ok(()) + } + + async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> { + self.watch_all(tx).await + } +} + +fn put(i: u64) -> KvUpdate { + KvUpdate::Put(KvEntry { + key: format!("node.region-{i}"), + value: vec![0x42; 256], + version: VersionToken::from_u64(i), + }) +} + +fn bench_batch_throughput(c: &mut Criterion) { + const N: u64 = 1000; + + // Multi-thread runtime so the spawned watch task and the combinator loop run + // on separate threads — the realistic deployment shape. 
+ let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + + let template: Vec<KvUpdate> = (0..N).map(put).collect(); + + let mut group = c.benchmark_group("watch_applied"); + group.throughput(Throughput::Elements(N)); + group.bench_function("batch_1000_updates", |b| { + b.iter_batched( + // Setup (not measured): fresh scripted watcher with its own copy of + // the updates, since each run drains the channel. + || { + Arc::new(ScriptedWatcher { + updates: Mutex::new(Some(template.clone())), + }) + }, + // Measured: run the combinator to completion over all N updates. + |watcher| { + rt.block_on(async move { + let (_sd_tx, sd_rx) = watch::channel(false); + watch_applied( + watcher as Arc<dyn KvWatcher>, + WatchScope::All, + None, + None, + BatchConfig { + window: Duration::from_millis(10), + max: 100, + }, + // parse: keep every put. + |u: &KvUpdate| match u { + KvUpdate::Put(_) => Some(()), + _ => None, + }, + // apply: no-op — isolate the batching overhead. + |_batch: Vec<()>| {}, + |_cursor: WatchCursor| {}, + sd_rx, + ) + .await + .unwrap() + }) + }, + BatchSize::SmallInput, + ); + }); + group.finish(); +} + +criterion_group!(benches, bench_batch_throughput); +criterion_main!(benches); diff --git a/mise.toml b/mise.toml index 88f5c8e..9dbe040 100644 --- a/mise.toml +++ b/mise.toml @@ -3,7 +3,10 @@ experimental = true activate_aggressive = true [tools] -rust = "1.92" +# `components` pulls clippy + rustfmt into the toolchain; mise installs rust with +# the minimal rustup profile otherwise, and CI's `cargo clippy` step can't find +# the component. +rust = { version = "1.92", components = "clippy,rustfmt" } yamlfmt = "0.12.1" dprint = "0.50.2" "ubi:nats-io/nats-server" = "2.14.1" diff --git a/src/applied.rs b/src/applied.rs new file mode 100644 index 0000000..007828a --- /dev/null +++ b/src/applied.rs @@ -0,0 +1,1160 @@ +//! Cursor-after-apply watch combinator. +//! +//! 
[`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over +//! a short window (or a max count), hands each batch to a caller-supplied +//! `apply` closure, and **only then** advances the resume cursor, checkpoints +//! the snapshot, and fires `on_applied`. It encodes one discipline that every +//! hand-rolled watch loop in the wider system gets subtly wrong: +//! +//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with +//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for +//! > it. The cursor never advances on *receipt* of an update, only after it has +//! > durably taken effect. +//! +//! ## Why receipt is the wrong signal +//! +//! The tempting shortcut is to bump the cursor as each update arrives off the +//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a +//! crash between those two steps the persisted cursor claims "caught up to rev +//! N" while rev N is still sitting in an unapplied buffer. On resume the watch +//! starts *past* rev N and silently skips it — a correctness hole in the exact +//! "resume after any restart" guarantee this crate advertises. +//! +//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names +//! the fix: a function placed below the endpoints (here, the channel receive) +//! can only be a performance hint; the *endpoint* — the application of the +//! update — is the only place the "it happened" guarantee can actually be +//! established. So the cursor is written from `apply()`'s completion, not from +//! the transport's delivery. +//! +//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp +//! Consul's anti-entropy / blocking-query lineage: a client holds the last index +//! it has *reconciled* and re-arms the watch from there, never from the index it +//! merely *saw*. +//! +//! ## What the caller supplies +//! +//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. 
Returning +//! `None` (corrupt bytes, irrelevant key) is fine — the update is still +//! *received*, so it still counts toward the cursor; there is simply nothing to +//! apply for it. +//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain +//! logic; for the tunnel router it swaps the route table, for the edge origin +//! watcher it rebuilds the hashrings. +//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new +//! applied cursor. Callers use it to persist the cursor for the next restart. +//! +//! ## Panics +//! +//! `apply` runs inline on the watch task. If it panics, the panic propagates out +//! of [`watch_applied`] and aborts the watch — that is the caller's contract, +//! the same as a panic in any other supplied closure. + +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::mpsc; +use tokio::sync::watch; +use tracing::warn; + +use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor}; +use crate::snapshot::SnapshotWriter; + +/// What to watch: every key, or every key under a prefix. +/// +/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` / +/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`. +#[derive(Debug, Clone)] +pub enum WatchScope { + /// Watch all keys in the bucket. + All, + /// Watch only keys beginning with this prefix. + Prefix(String), +} + +/// Batching policy for [`watch_applied`]. +/// +/// A flush fires when **either** bound is hit, whichever comes first: `window` +/// time has elapsed since the batch opened, or `max` updates have accumulated. +/// The window amortizes the cost of `apply` (e.g. one route-table clone per +/// flush instead of one per update); `max` caps memory and latency when updates +/// arrive faster than the window. +#[derive(Debug, Clone, Copy)] +pub struct BatchConfig { + /// Maximum time a batch stays open before being flushed. 
+ pub window: Duration, + /// Maximum number of parsed updates in a batch before forcing a flush. + pub max: usize, +} + +impl Default for BatchConfig { + /// 10 ms / 100 updates — the de-facto default every hand-rolled caller + /// already used, lifted into one place. + fn default() -> Self { + Self { + window: Duration::from_millis(10), + max: 100, + } + } +} + +/// Drive a watch with cursor-after-apply semantics. +/// +/// Subscribes per `scope` (resuming from `resume` when it carries a position), +/// batches updates per `config`, applies each batch via `apply`, and only then +/// advances the cursor / checkpoints `snapshot` / calls `on_applied`. Returns +/// the final applied cursor when the watch ends (shutdown signalled, or the +/// underlying stream closed). +/// +/// Raw [`KvUpdate`]s are streamed to `snapshot` as they arrive, but the +/// *checkpoint* cursor written on each flush is the post-apply cursor — so a +/// loaded snapshot's cursor is always consistent with the state it carries +/// (the cursor never names a revision whose `apply` had not returned before the +/// checkpoint). +/// +/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and +/// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see +/// the full re-list as a stream of puts, exactly as the hand-rolled loops did. +/// +/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its +/// rationale. +/// +/// # Type parameters +/// - `U`: the caller's domain update type, produced by `parse` and consumed by +/// `apply`. +// This combinator takes each of its dependencies as a parameter so every +// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct +// type and is monomorphized at the call site. Folding them into a builder struct +// would either box the closures or force a single generic bundle, losing that. 
+#[allow(clippy::too_many_arguments)] +// The flush macro resets `batch_high`/`batch_deadline`/`snapshot` for the next +// loop iteration. At the two flush sites that return immediately afterward +// (shutdown, channel-close) those resets are dead stores — correct, but flagged. +#[allow(unused_assignments)] +pub async fn watch_applied<U, P, A, O>( + watcher: Arc<dyn KvWatcher>, + scope: WatchScope, + resume: Option<WatchCursor>, + mut snapshot: Option<SnapshotWriter>, + config: BatchConfig, + mut parse: P, + mut apply: A, + mut on_applied: O, + mut shutdown: watch::Receiver<bool>, +) -> Result<WatchCursor, KvError> +where + U: Send, + P: FnMut(&KvUpdate) -> Option<U> + Send, + A: FnMut(Vec<U>) + Send, + O: FnMut(WatchCursor) + Send, +{ + // The cursor we'll return. Initialized from the resume position so that a + // watch which receives nothing new still reports the position it resumed + // from as "applied" (it is — everything up to it was applied before the last + // run persisted it). + let mut applied = match &resume { + Some(c) => c.clone(), + None => WatchCursor::none(), + }; + + // Spawn the watch task. It owns the cursor-expired fallback so the main loop + // only ever sees a clean ordered stream of updates on `rx`. + let (tx, mut rx) = mpsc::channel::<KvUpdate>(256); + let handle = { + let watcher = Arc::clone(&watcher); + tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await }) + }; + + // Batch state. + // + // `batch_high` tracks the version of the most recently *received* update + // since the last flush — including updates `parse` rejected. NATS delivers + // in revision order, so the last received is the highest, and advancing the + // cursor to it after a single atomic `apply` is correct: having seen the max + // means we've seen everything below it, and a rejected entry is still + // "nothing to apply", hence covered. Reset to `none()` after every flush. 
+ let batch_cap = config.max.clamp(1, 64); + let mut batch: Vec<U> = Vec::with_capacity(batch_cap); + let mut batch_high = WatchCursor::none(); + // `Some` once a batch has opened and the window timer is armed; `None` + // between flushes. Only the armed/idle distinction is read in the loop — + // the absolute instant lives in the pinned `sleep` future below. + let mut batch_deadline: Option<tokio::time::Instant> = None; + + // Flush the current batch: apply (if non-empty), then advance the cursor, + // checkpoint the snapshot at that cursor, and fire `on_applied` — in that + // order. Returns whether the snapshot grew past its compaction threshold. + // Defined as a macro so it can mutate the locals above and `.await` nothing + // itself; compaction (which does block) is handled by the caller via + // `flush_and_compact!`. + macro_rules! flush_inner { + () => {{ + let mut needs_compact = false; + // Nothing received since the last flush → nothing to do at all. + if !batch.is_empty() || !batch_high.is_none() { + if !batch.is_empty() { + // INVARIANT: apply() runs and RETURNS before any cursor + // advance below. Move the batch out so a panicking apply + // can't leave half-consumed state behind. + // + // `replace` (not `take`) leaves a pre-sized Vec behind: + // `take` swaps in a zero-capacity `Vec::new()`, so every + // batch after the first re-climbs the reallocation ladder + // (4→8→…→cap). Handing back a `with_capacity` Vec keeps the + // amortized allocation to one per batch. 
+ apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap))); + } + if !batch_high.is_none() { + applied = batch_high.clone(); + if let Some(sw) = snapshot.as_mut() { + match sw.checkpoint(&applied) { + Ok(true) => needs_compact = true, + Ok(false) => {} + Err(e) => warn!(error = %e, "snapshot checkpoint failed"), + } + } + on_applied(applied.clone()); + } + batch_high = WatchCursor::none(); + } + batch_deadline = None; + needs_compact + }}; + } + + // Flush, then run compaction off the hot path if the log grew too large. + // Compaction reads and rewrites the whole snapshot file, so it must not run + // on the async reactor thread — `spawn_blocking` moves it to the blocking + // pool and we reclaim the writer afterward. + macro_rules! flush_and_compact { + () => {{ + if flush_inner!() + && let Some(mut sw) = snapshot.take() + { + // Return the writer to the closure unconditionally so a *failed* + // compaction (Ok(Err)) still hands the writer back — checkpoints + // continue, only this compaction was skipped. A *panicked* + // blocking task (Err) drops the writer on the blocking thread and + // we can't recover it; rather than silently run the rest of the + // watch without persistence — which breaks the resume-after-restart + // guarantee — we surface it as a fatal error. + match tokio::task::spawn_blocking(move || { + let res = sw.compact(); + (sw, res) + }) + .await + { + Ok((sw, Ok(()))) => snapshot = Some(sw), + Ok((sw, Err(e))) => { + warn!(error = %e, "snapshot compaction failed; continuing without compacting"); + snapshot = Some(sw); + } + Err(e) => { + warn!(error = %e, "snapshot compaction task panicked; aborting watch"); + handle.abort(); + return Err(KvError::WatchError(format!( + "snapshot compaction task panicked: {e}" + ))); + } + } + } + }}; + } + + // A single timer future, reset in place each time a batch opens. 
The old + // `tokio::time::sleep(timeout)` lived inside the select arm, so it was + // re-created on every loop iteration — one Arc-backed timer-wheel entry + // allocated, registered, and immediately dropped per received update. + // Pinning one future and `reset`-ing it reuses that single allocation; the + // `if batch_deadline.is_some()` guard keeps it from firing while idle, so + // its initial already-elapsed deadline is never observed. + let sleep = tokio::time::sleep(Duration::ZERO); + tokio::pin!(sleep); + + loop { + tokio::select! { + biased; + + // Shutdown wins: flush whatever is batched (so the cursor reflects + // it), abandon any updates still in flight on the channel — they + // weren't applied, the cursor doesn't claim them, and they'll be + // re-delivered on the next resume — and return the applied cursor. + res = shutdown.changed() => { + if res.is_err() || *shutdown.borrow() { + flush_and_compact!(); + handle.abort(); + // Observe the task's terminal state. An abort surfaces as a + // cancelled JoinError, which we ignore; a genuine panic that + // raced ahead of the abort is logged rather than silently lost. + if let Err(join) = handle.await + && !join.is_cancelled() + { + warn!(error = %join, "watch task panicked at shutdown"); + } + return Ok(applied); + } + } + + // Batch window elapsed. + () = &mut sleep, if batch_deadline.is_some() => { + flush_and_compact!(); + } + + update = rx.recv() => { + match update { + Some(u) => { + // Cursor authority: every received update bumps the + // pending high-water, regardless of whether `parse` + // keeps it. + batch_high = WatchCursor::from_version(u.version().clone()); + + // Stream the raw update to the snapshot log as it + // arrives. The durable checkpoint is written later at + // the applied cursor, so a crash here just means the log + // holds data ahead of its cursor — re-applied on resume, + // never skipped. 
+ if let Some(sw) = snapshot.as_mut() + && let Err(e) = sw.write_update(&u) + { + warn!(error = %e, "snapshot write failed"); + } + + if let Some(parsed) = parse(&u) { + batch.push(parsed); + } + + // Arm the window on the first received update of a batch + // — even a parse-rejected one, so the cursor advances + // within `window` even through a run of irrelevant keys. + // Reset the pinned timer to the new deadline rather than + // allocating a fresh `Sleep`. + if batch_deadline.is_none() { + let deadline = tokio::time::Instant::now() + config.window; + sleep.as_mut().reset(deadline); + batch_deadline = Some(deadline); + } + + if batch.len() >= config.max { + flush_and_compact!(); + } + } + None => { + // Stream closed. Flush the remainder, then surface the + // watch task's terminal result: a clean end returns the + // applied cursor, an error propagates. + flush_and_compact!(); + return match handle.await { + Ok(Ok(())) => Ok(applied), + Ok(Err(e)) => Err(e), + Err(join) => Err(KvError::WatchError(format!( + "watch task panicked: {join}" + ))), + }; + } + } + } + } + } +} + +/// Run the underlying watch for `scope`, resuming from `resume` when it carries +/// a position, with the [`KvError::CursorExpired`] → full-watch fallback. +async fn run_watch( + watcher: &dyn KvWatcher, + scope: &WatchScope, + resume: Option<WatchCursor>, + tx: mpsc::Sender<KvUpdate>, +) -> Result<(), KvError> { + // Resume only when the cursor carries a real position; an absent or `none()` + // cursor falls through to a full watch. Binding `cursor` here makes "we have a + // resume position" structural — there is no separate bool whose truth a later + // edit could let drift from the `Some`. 
+ let resume_cursor = resume.filter(|c| !c.is_none()); + + match scope { + WatchScope::All => { + if let Some(cursor) = resume_cursor { + match watcher.watch_all_from(&cursor, tx.clone()).await { + Err(KvError::CursorExpired) => { + // TODO(v2): signal a "resync" to the caller so it can + // diff the full re-list against prior state and emit + // synthetic deletes for keys that vanished during the + // gap (see Snapshot::stale_keys). For v1 the full + // re-list is replayed as a stream of puts, matching the + // hand-rolled loops this combinator replaces. + warn!("watch cursor expired, falling back to full watch_all"); + watcher.watch_all(tx).await + } + other => other, + } + } else { + watcher.watch_all(tx).await + } + } + WatchScope::Prefix(prefix) => { + if let Some(cursor) = resume_cursor { + match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await { + Err(KvError::CursorExpired) => { + // TODO(v2): see the watch_all arm above. + warn!("watch cursor expired, falling back to full watch_prefix"); + watcher.watch_prefix(prefix, tx).await + } + other => other, + } + } else { + watcher.watch_prefix(prefix, tx).await + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::kv::{KvEntry, VersionToken}; + use async_trait::async_trait; + use std::sync::Mutex; + use std::sync::atomic::{AtomicU64, Ordering}; + use tokio::sync::mpsc::Sender; + + fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate { + KvUpdate::Put(KvEntry { + key: key.to_string(), + value: value.to_vec(), + version: VersionToken::from_u64(rev), + }) + } + + /// A scripted watcher. Delivers a pre-set list of updates through the + /// channel, then either holds the channel open (so window/max/shutdown + /// flushes can be exercised without the stream ending) or returns cleanly + /// (so channel-close flushing can be exercised). 
+    struct MockWatcher {
+        full: Mutex<Option<Vec<KvUpdate>>>,
+        from: Mutex<Option<Vec<KvUpdate>>>,
+        from_expires: bool,
+        hold: bool,
+    }
+
+    impl MockWatcher {
+        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
+            Self {
+                full: Mutex::new(Some(updates)),
+                from: Mutex::new(None),
+                from_expires: false,
+                hold,
+            }
+        }
+
+        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
+            let updates = which.lock().unwrap().take().unwrap_or_default();
+            for u in updates {
+                if tx.send(u).await.is_err() {
+                    return;
+                }
+            }
+            if self.hold {
+                // Keep `tx` alive (channel open) until this task is aborted.
+                std::future::pending::<()>().await;
+            }
+        }
+    }
+
+    #[async_trait]
+    impl KvWatcher for MockWatcher {
+        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
+            self.deliver(&self.full, tx).await;
+            Ok(())
+        }
+
+        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
+            self.deliver(&self.full, tx).await;
+            Ok(())
+        }
+
+        async fn watch_all_from(
+            &self,
+            _cursor: &WatchCursor,
+            tx: Sender<KvUpdate>,
+        ) -> Result<(), KvError> {
+            if self.from_expires {
+                return Err(KvError::CursorExpired);
+            }
+            self.deliver(&self.from, tx).await;
+            Ok(())
+        }
+
+        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
+        // are exercised against the same `from` script. Without this the trait's
+        // default impl would delegate to watch_prefix and silently deliver the
+        // full set instead of the delta.
+        async fn watch_prefix_from(
+            &self,
+            _prefix: &str,
+            _cursor: &WatchCursor,
+            tx: Sender<KvUpdate>,
+        ) -> Result<(), KvError> {
+            if self.from_expires {
+                return Err(KvError::CursorExpired);
+            }
+            self.deliver(&self.from, tx).await;
+            Ok(())
+        }
+    }
+
+    /// A watcher whose entry points all fail. Used to prove the watch task's
+    /// terminal error is surfaced out of `watch_applied` rather than swallowed
+    /// as a clean `Ok(applied)` when the channel closes.
+    struct ErrorWatcher;
+
+    #[async_trait]
+    impl KvWatcher for ErrorWatcher {
+        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
+            Err(KvError::WatchError("injected watch failure".into()))
+        }
+
+        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
+            Err(KvError::WatchError("injected watch failure".into()))
+        }
+    }
+
+    // A no-op parse that keeps every Put as the value bytes; drops deletes.
+    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
+        match u {
+            KvUpdate::Put(e) => Some(e.value.clone()),
+            _ => None,
+        }
+    }
+
+    /// The stream closes (hold = false) with a pending batch; the remainder is
+    /// flushed before returning, the returned cursor is the last revision, and
+    /// `on_applied` ran exactly once after `apply`.
+    #[tokio::test]
+    async fn flush_on_channel_close() {
+        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
+        let watcher = Arc::new(MockWatcher::new(updates, false));
+
+        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
+        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
+
+        let ab = Arc::clone(&applied_batches);
+        let oc = Arc::clone(&on_applied_cursors);
+        let (_sd_tx, sd_rx) = watch::channel(false);
+
+        let cursor = watch_applied(
+            watcher,
+            WatchScope::All,
+            None,
+            None,
+            BatchConfig::default(),
+            parse_put,
+            move |batch| ab.lock().unwrap().push(batch),
+            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
+            sd_rx,
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(cursor.as_u64(), Some(3));
+        let batches = applied_batches.lock().unwrap();
+        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
+        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
+        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
+    }
+
+    /// Fewer than `max` updates, then the channel idles: the window timer must
+    /// flush them and advance the cursor.
+ #[tokio::test(start_paused = true)] + async fn flush_on_window() { + let updates = vec![put("a", b"1", 1), put("b", b"2", 2)]; + let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open + + let applied = Arc::new(AtomicU64::new(0)); + let count = Arc::new(AtomicU64::new(0)); + let a = Arc::clone(&applied); + let c = Arc::clone(&count); + let (sd_tx, sd_rx) = watch::channel(false); + + let task = tokio::spawn(watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig::default(), + parse_put, + move |batch: Vec>| { + c.fetch_add(batch.len() as u64, Ordering::SeqCst); + }, + move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst), + sd_rx, + )); + + // Let the window (10ms) elapse under virtual time. + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!( + count.load(Ordering::SeqCst), + 2, + "window should have flushed" + ); + assert_eq!(applied.load(Ordering::SeqCst), 2); + + sd_tx.send(true).unwrap(); + let cursor = task.await.unwrap().unwrap(); + assert_eq!(cursor.as_u64(), Some(2)); + } + + /// Exactly `max` updates fills a batch and flushes immediately — before the + /// window would have elapsed. + #[tokio::test(start_paused = true)] + async fn flush_on_max() { + let max = 4; + let updates: Vec<_> = (1..=max as u64) + .map(|i| put(&format!("k{i}"), b"v", i)) + .collect(); + let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open + + let flushes = Arc::new(Mutex::new(Vec::::new())); + let f = Arc::clone(&flushes); + let (sd_tx, sd_rx) = watch::channel(false); + + let task = tokio::spawn(watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig { + window: Duration::from_secs(3600), // effectively never + max, + }, + parse_put, + move |batch: Vec>| f.lock().unwrap().push(batch.len()), + move |_| {}, + sd_rx, + )); + + // Yield enough for the mock to push all `max` updates; the window is an + // hour, so any flush is purely the max trigger. 
+ tokio::time::sleep(Duration::from_millis(1)).await; + assert_eq!( + *flushes.lock().unwrap(), + vec![max], + "a full batch should flush on max, not wait for the window" + ); + + sd_tx.send(true).unwrap(); + task.await.unwrap().unwrap(); + } + + /// A pending batch plus a shutdown signal: the batch is flushed and the + /// applied cursor returned. + #[tokio::test(start_paused = true)] + async fn flush_on_shutdown() { + let updates = vec![put("a", b"1", 1), put("b", b"2", 2)]; + let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open + + let applied = Arc::new(AtomicU64::new(0)); + let a = Arc::clone(&applied); + let (sd_tx, sd_rx) = watch::channel(false); + + let task = tokio::spawn(watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig { + window: Duration::from_secs(3600), // window won't fire + max: 100, + }, + parse_put, + move |_batch: Vec>| {}, + move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst), + sd_rx, + )); + + // Give the mock time to deliver both updates into the pending batch. + tokio::time::sleep(Duration::from_millis(1)).await; + sd_tx.send(true).unwrap(); + + let cursor = task.await.unwrap().unwrap(); + assert_eq!( + cursor.as_u64(), + Some(2), + "shutdown flushes the pending batch" + ); + assert_eq!(applied.load(Ordering::SeqCst), 2); + } + + /// The cursor must not advance until `apply` has returned. We prove it by + /// having `apply` read the cursor that `on_applied` last published: when the + /// second batch is applied, the visible cursor must still be the *first* + /// batch's — never the second's, which only becomes visible after this + /// `apply` returns. + #[tokio::test(start_paused = true)] + async fn cursor_advances_only_after_apply() { + // Two batches of `max` updates each. 
+ let max = 2usize; + let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect(); + let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open + + // Cursor as last published by on_applied; starts at 0 (nothing applied). + let published = Arc::new(AtomicU64::new(0)); + // What `apply` observed as the published cursor at the moment it ran. + let seen_at_apply = Arc::new(Mutex::new(Vec::::new())); + + let pub_for_apply = Arc::clone(&published); + let seen = Arc::clone(&seen_at_apply); + let pub_for_on = Arc::clone(&published); + let (sd_tx, sd_rx) = watch::channel(false); + + let task = tokio::spawn(watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig { + window: Duration::from_secs(3600), + max, + }, + parse_put, + move |_batch: Vec>| { + // The cursor visible here is whatever the PREVIOUS flush + // published — never this batch's, because we haven't returned. + seen.lock() + .unwrap() + .push(pub_for_apply.load(Ordering::SeqCst)); + }, + move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst), + sd_rx, + )); + + tokio::time::sleep(Duration::from_millis(1)).await; + sd_tx.send(true).unwrap(); + task.await.unwrap().unwrap(); + + // First apply saw 0 (nothing applied yet); second apply saw 2 (first + // batch's cursor), NOT 4. The cursor only reached 4 after the second + // apply returned. + assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]); + assert_eq!(published.load(Ordering::SeqCst), 4); + } + + /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no + /// domain work, but they were still received — so the cursor must advance + /// over them. 
+ #[tokio::test] + async fn corrupt_parse_entries_advance_cursor() { + let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)]; + let watcher = Arc::new(MockWatcher::new(updates, false)); // close after + + let apply_calls = Arc::new(AtomicU64::new(0)); + let on_applied_max = Arc::new(AtomicU64::new(0)); + let ac = Arc::clone(&apply_calls); + let om = Arc::clone(&on_applied_max); + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig::default(), + // Reject everything — simulates corrupt/irrelevant entries. + |_u: &KvUpdate| -> Option> { None }, + move |batch: Vec>| { + ac.fetch_add(1, Ordering::SeqCst); + assert!(batch.is_empty()); + }, + move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst), + sd_rx, + ) + .await + .unwrap(); + + assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates"); + assert_eq!( + apply_calls.load(Ordering::SeqCst), + 0, + "an all-rejected batch applies nothing" + ); + assert_eq!(on_applied_max.load(Ordering::SeqCst), 7); + } + + /// A resume whose cursor has expired falls back to the full watch and still + /// applies the delivered updates. 
+ #[tokio::test] + async fn cursor_expired_falls_back_to_full_watch() { + let mock = MockWatcher { + full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])), + from: Mutex::new(Some(vec![])), + from_expires: true, + hold: false, + }; + let watcher = Arc::new(mock); + + let applied_batches = Arc::new(Mutex::new(Vec::>::new())); + let ab = Arc::clone(&applied_batches); + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::All, + Some(WatchCursor::from_u64(5)), // resume position that "expired" + None, + BatchConfig::default(), + parse_put, + move |batch: Vec>| ab.lock().unwrap().extend(batch), + move |_| {}, + sd_rx, + ) + .await + .unwrap(); + + assert_eq!(cursor.as_u64(), Some(11)); + assert_eq!( + *applied_batches.lock().unwrap(), + vec![b"1".to_vec(), b"2".to_vec()], + "fallback full watch's updates were applied" + ); + } + + /// End-to-end with a real snapshot file: after the run, the persisted + /// snapshot's cursor equals the applied cursor and its entries match the + /// applied state — proving the checkpoint is written at the post-apply + /// cursor, never ahead of it. 
+ #[tokio::test] + async fn snapshot_checkpoint_matches_applied_cursor() { + let dir = tempfile::TempDir::new().unwrap(); + let path = dir.path().join("applied.snap"); + let writer = SnapshotWriter::open(&path, u64::MAX).unwrap(); + + let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)]; + let watcher = Arc::new(MockWatcher::new(updates, false)); // close after + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::All, + None, + Some(writer), + BatchConfig::default(), + parse_put, + move |_batch: Vec>| {}, + move |_| {}, + sd_rx, + ) + .await + .unwrap(); + + assert_eq!(cursor.as_u64(), Some(2)); + + let snap = crate::snapshot::load(&path).unwrap().unwrap(); + assert_eq!( + snap.cursor.as_u64(), + cursor.as_u64(), + "snapshot checkpoint cursor must equal the applied cursor" + ); + assert_eq!(snap.entries.len(), 2); + assert_eq!(snap.entries["node.a"].value, b"1"); + assert_eq!(snap.entries["node.b"].value, b"2"); + } + + /// Happy-path resume: a non-expired cursor takes the `*_from` path and the + /// delta (the `from` script, NOT the full set) is applied. Proves the + /// resume branch delivers only post-cursor updates and advances to their + /// max revision. + #[tokio::test] + async fn resume_from_cursor_delivers_only_delta() { + let mock = MockWatcher { + // `full` would be delivered only if the resume path were (wrongly) + // bypassed; a non-empty distinguishing value makes that visible. 
+ full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])), + from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])), + from_expires: false, + hold: false, + }; + let watcher = Arc::new(mock); + + let applied_batches = Arc::new(Mutex::new(Vec::>::new())); + let ab = Arc::clone(&applied_batches); + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::All, + Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired + None, + BatchConfig::default(), + parse_put, + move |batch: Vec>| ab.lock().unwrap().extend(batch), + move |_| {}, + sd_rx, + ) + .await + .unwrap(); + + assert_eq!( + cursor.as_u64(), + Some(11), + "cursor advances to the delta max" + ); + assert_eq!( + *applied_batches.lock().unwrap(), + vec![b"3".to_vec(), b"4".to_vec()], + "only the post-cursor delta is applied, never the full set" + ); + } + + /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and + /// applies the delivered updates. Every other test uses `WatchScope::All`; + /// this covers the prefix dispatch arm. 
+ #[tokio::test] + async fn prefix_scope_applies_delivered_updates() { + let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)]; + let watcher = Arc::new(MockWatcher::new(updates, false)); // close after + + let applied_batches = Arc::new(Mutex::new(Vec::>::new())); + let ab = Arc::clone(&applied_batches); + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::Prefix("node.".to_string()), + None, + None, + BatchConfig::default(), + parse_put, + move |batch: Vec>| ab.lock().unwrap().extend(batch), + move |_| {}, + sd_rx, + ) + .await + .unwrap(); + + assert_eq!(cursor.as_u64(), Some(2)); + assert_eq!( + *applied_batches.lock().unwrap(), + vec![b"1".to_vec(), b"2".to_vec()] + ); + } + + /// `WatchScope::Prefix` resume whose cursor has expired falls back to the + /// full `watch_prefix` and still applies the delivered updates — the prefix + /// twin of `cursor_expired_falls_back_to_full_watch`. + #[tokio::test] + async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() { + let mock = MockWatcher { + full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])), + from: Mutex::new(Some(vec![])), + from_expires: true, + hold: false, + }; + let watcher = Arc::new(mock); + + let applied_batches = Arc::new(Mutex::new(Vec::>::new())); + let ab = Arc::clone(&applied_batches); + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::Prefix("node.".to_string()), + Some(WatchCursor::from_u64(5)), // resume position that "expired" + None, + BatchConfig::default(), + parse_put, + move |batch: Vec>| ab.lock().unwrap().extend(batch), + move |_| {}, + sd_rx, + ) + .await + .unwrap(); + + assert_eq!(cursor.as_u64(), Some(11)); + assert_eq!( + *applied_batches.lock().unwrap(), + vec![b"1".to_vec(), b"2".to_vec()], + "prefix fallback full watch's updates were applied" + ); + } + + /// The watch task's terminal error must propagate out of `watch_applied` + 
/// rather than being swallowed as `Ok(applied)` when the channel closes. + #[tokio::test] + async fn watch_task_error_propagates() { + let watcher = Arc::new(ErrorWatcher); + let (_sd_tx, sd_rx) = watch::channel(false); + + let result = watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig::default(), + parse_put, + move |_batch: Vec>| {}, + move |_| {}, + sd_rx, + ) + .await; + + match result { + Err(KvError::WatchError(msg)) => { + assert!(msg.contains("injected"), "error carries the cause: {msg}"); + } + other => panic!("expected WatchError, got {other:?}"), + } + } + + /// A batch where `parse` accepts some updates and rejects others: the cursor + /// must still advance to the highest *received* revision (covering the + /// rejected entry in the middle), while `apply` sees only the accepted ones. + #[tokio::test] + async fn mixed_parse_advances_cursor_over_rejected_entries() { + let updates = vec![ + put("keep.a", b"1", 5), + put("skip.b", b"2", 6), // rejected by parse + put("keep.c", b"3", 7), + ]; + let watcher = Arc::new(MockWatcher::new(updates, false)); // close after + + let applied_batches = Arc::new(Mutex::new(Vec::>::new())); + let on_applied_max = Arc::new(AtomicU64::new(0)); + let ab = Arc::clone(&applied_batches); + let om = Arc::clone(&on_applied_max); + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig::default(), + // Keep only keys under "keep."; reject everything else. 
+ |u: &KvUpdate| -> Option> { + match u { + KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()), + _ => None, + } + }, + move |batch: Vec>| ab.lock().unwrap().extend(batch), + move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst), + sd_rx, + ) + .await + .unwrap(); + + assert_eq!( + cursor.as_u64(), + Some(7), + "cursor covers the rejected middle entry (rev 6)" + ); + assert_eq!( + *applied_batches.lock().unwrap(), + vec![b"1".to_vec(), b"3".to_vec()], + "apply sees only the accepted entries" + ); + assert_eq!(on_applied_max.load(Ordering::SeqCst), 7); + } + + /// Shutdown before any update arrives: nothing was received, so the cursor + /// stays at the resume position (here `none()`), `apply` never runs, and + /// `on_applied` never fires. + #[tokio::test(start_paused = true)] + async fn shutdown_with_no_pending_batch() { + let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open + + let apply_calls = Arc::new(AtomicU64::new(0)); + let on_applied_calls = Arc::new(AtomicU64::new(0)); + let ac = Arc::clone(&apply_calls); + let oc = Arc::clone(&on_applied_calls); + let (sd_tx, sd_rx) = watch::channel(false); + + let task = tokio::spawn(watch_applied( + watcher, + WatchScope::All, + None, + None, + BatchConfig::default(), + parse_put, + move |_batch: Vec>| { + ac.fetch_add(1, Ordering::SeqCst); + }, + move |_| { + oc.fetch_add(1, Ordering::SeqCst); + }, + sd_rx, + )); + + // Let the watcher attach and idle (it has nothing to deliver), then shut down. 
+ tokio::time::sleep(Duration::from_millis(1)).await; + sd_tx.send(true).unwrap(); + + let cursor = task.await.unwrap().unwrap(); + assert_eq!( + cursor.as_u64(), + None, + "no updates received → cursor unmoved" + ); + assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs"); + assert_eq!( + on_applied_calls.load(Ordering::SeqCst), + 0, + "on_applied never fires" + ); + } + + /// With a low `compact_threshold`, the flush path's `spawn_blocking` + /// compaction actually fires (every other snapshot test pins the threshold + /// at `u64::MAX`, leaving that branch dead). After a compacting run the + /// snapshot must still load cleanly with the right cursor and entries. + #[tokio::test] + async fn snapshot_compaction_fires_and_stays_consistent() { + let dir = tempfile::TempDir::new().unwrap(); + let path = dir.path().join("applied.snap"); + // threshold 0 → every checkpoint reports "needs compact", forcing the + // spawn_blocking compaction branch on each flush. + let writer = SnapshotWriter::open(&path, 0).unwrap(); + + // Re-put the same key across flushes so compaction has duplicates to + // dedup; small max forces multiple flushes (hence multiple compactions). 
+ let updates = vec![ + put("node.a", b"1", 1), + put("node.a", b"2", 2), + put("node.b", b"3", 3), + put("node.a", b"4", 4), + ]; + let watcher = Arc::new(MockWatcher::new(updates, false)); // close after + let (_sd_tx, sd_rx) = watch::channel(false); + + let cursor = watch_applied( + watcher, + WatchScope::All, + None, + Some(writer), + BatchConfig { + window: Duration::from_secs(3600), + max: 1, // one update per flush → a compaction per update + }, + parse_put, + move |_batch: Vec>| {}, + move |_| {}, + sd_rx, + ) + .await + .unwrap(); + + assert_eq!(cursor.as_u64(), Some(4)); + + let snap = crate::snapshot::load(&path).unwrap().unwrap(); + assert_eq!( + snap.cursor.as_u64(), + cursor.as_u64(), + "compacted snapshot's cursor still equals the applied cursor" + ); + assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped"); + assert_eq!( + snap.entries["node.a"].value, b"4", + "last write per key survives compaction" + ); + assert_eq!(snap.entries["node.b"].value, b"3"); + } +} diff --git a/src/lib.rs b/src/lib.rs index bac7d09..fcdc236 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,11 +23,13 @@ #![deny(unsafe_code)] +mod applied; mod kv; mod nats; pub mod snapshot; mod stores; +pub use applied::{BatchConfig, WatchScope, watch_applied}; pub use kv::{ KvEntry, KvError, KvReader, KvTtl, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor, }; diff --git a/tests/integration.rs b/tests/integration.rs index 2f8d33a..c0150dc 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -624,6 +624,78 @@ async fn watch_prefix_from_replays_only_the_delta() { assert!(matches!(&updates[0], KvUpdate::Put(e) if e.key == "node.b")); } +/// Demonstrates the seed-then-watch race a snapshot consumer must avoid, and that resuming from the +/// snapshot revision closes it. +/// +/// A consumer that seeds via `scan()` and then subscribes via `watch_prefix()` has a window between +/// the two calls. 
`watch_prefix` uses NATS `DeliverPolicy::New` (no initial-state replay), so a +/// write that lands in that window is in neither the snapshot nor the watch stream — it is silently +/// lost until the next full reseed. `watch_prefix_from(snapshot_revision)` instead replays +/// everything after the snapshot, so the gap write is delivered. +#[tokio::test] +async fn seed_then_watch_prefix_loses_writes_in_the_gap() { + let nats = TestNats::start().await; + let (_conn, store) = nats.store("seed-race").await; + let writer = store.writer().expect("writer"); + let watcher = store.watcher().expect("watcher"); + + // A pre-existing deny entry, present when the snapshot is taken. + writer.put("blackhole.1", b"spend").await.expect("put 1"); + + // 1) Snapshot — what a seeding consumer loads as its baseline — plus the revision it reflects. + let snapshot = store.reader().scan("blackhole.").await.expect("scan"); + assert_eq!(snapshot.len(), 1); + let baseline_rev = snapshot + .iter() + .filter_map(|e| e.version.as_u64()) + .max() + .unwrap_or(0); + + // 2) A write lands in the race window: after the scan, before any watch attaches. + writer.put("blackhole.2", b"fraud").await.expect("put 2"); + + // --- The bug: watch_prefix (DeliverPolicy::New) --- + { + let (tx, mut rx) = mpsc::channel(64); + let w = watcher.clone(); + tokio::spawn(async move { + let _ = w.watch_prefix("blackhole.", tx).await; + }); + // Handshake until the watch is provably attached and live (drains the sentinel echoes). + establish_watch(writer.as_ref(), &mut rx, "blackhole.sentinel").await; + + // The watch is live now, so a *fresh* write is delivered... + writer.put("blackhole.3", b"spend").await.expect("put 3"); + let got = collect_updates(&mut rx, 1).await; + // ...but the first thing we see is blackhole.3, not blackhole.2: the gap write was dropped. + // (blackhole.2 has a lower revision; had the watch carried it, it would arrive first.) 
+ assert!( + matches!(&got[0], KvUpdate::Put(e) if e.key == "blackhole.3"), + "watch_prefix delivered a post-subscribe write but silently dropped the gap write \ + blackhole.2; got {:?}", + got[0].key() + ); + } + + // --- The fix: watch_prefix_from(snapshot revision) --- + { + let (tx, mut rx) = mpsc::channel(64); + let cursor = WatchCursor::from_u64(baseline_rev); + let w = watcher.clone(); + tokio::spawn(async move { + let _ = w.watch_prefix_from("blackhole.", &cursor, tx).await; + }); + // Resuming from the snapshot revision replays everything after it; the first such entry is + // exactly the gap write that watch_prefix lost. + let got = collect_updates(&mut rx, 1).await; + assert!( + matches!(&got[0], KvUpdate::Put(e) if e.key == "blackhole.2"), + "watch_prefix_from(snapshot revision) must deliver the gap write; got {:?}", + got[0].key() + ); + } +} + #[tokio::test] async fn watch_from_compacted_cursor_does_not_spuriously_fail() { let nats = TestNats::start().await; @@ -1044,3 +1116,326 @@ async fn health_reflects_server_death() { "is_healthy() must report false after the NATS server dies" ); } + +// --- watch_applied combinator (against real NATS) --------------------------- +// +// These exercise the cursor-after-apply combinator end to end against a live +// `nats-server`, not a stub watcher. The stub-based unit tests in +// `src/applied.rs` cover deterministic timing (paused clock) and fault +// injection; these cover the thing only the real backend can prove — that the +// resume guarantee holds against real JetStream revision ordering. +// +// All of these resume from a captured baseline cursor via `watch_all_from`, +// which replays everything strictly after the cursor regardless of when the +// subscription attaches. That sidesteps the `DeliverPolicy::New` attach race +// (see `seed_then_watch_prefix_loses_writes_in_the_gap`) without needing the +// sentinel handshake, which the combinator's owned watcher can't expose. 
+
+use slipstream::snapshot::{self, SnapshotWriter};
+use slipstream::{BatchConfig, WatchScope, watch_applied};
+use tokio::sync::watch as tokio_watch;
+
+/// parse: keep every Put as its key string; drop deletes/purges.
+fn put_key(u: &KvUpdate) -> Option<String> {
+    match u {
+        KvUpdate::Put(e) => Some(e.key.clone()),
+        _ => None,
+    }
+}
+
+/// parse: keep only Puts under the `node.` prefix; everything else is
+/// "received but nothing to apply".
+fn node_put_key(u: &KvUpdate) -> Option<String> {
+    match u {
+        KvUpdate::Put(e) if e.key.starts_with("node.") => Some(e.key.clone()),
+        _ => None,
+    }
+}
+
+/// Drive `watch_applied` over all keys, resuming from `baseline`, collecting
+/// each applied key and each post-apply cursor onto unbounded channels. Returns
+/// the spawned task handle plus the two receivers and the shutdown sender.
+#[allow(clippy::type_complexity)]
+fn spawn_applied(
+    watcher: Arc<dyn KvWatcher>,
+    baseline: Option<WatchCursor>,
+    snapshot: Option<SnapshotWriter>,
+    parse: fn(&KvUpdate) -> Option<String>,
+) -> (
+    tokio::task::JoinHandle<Result<WatchCursor, KvError>>,
+    mpsc::UnboundedReceiver<String>,
+    mpsc::UnboundedReceiver<u64>,
+    tokio_watch::Sender<bool>,
+) {
+    let (applied_tx, applied_rx) = mpsc::unbounded_channel::<String>();
+    let (cursor_tx, cursor_rx) = mpsc::unbounded_channel::<u64>();
+    let (sd_tx, sd_rx) = tokio_watch::channel(false);
+
+    let task = tokio::spawn(watch_applied(
+        watcher,
+        WatchScope::All,
+        baseline,
+        snapshot,
+        BatchConfig::default(),
+        parse,
+        move |batch: Vec<String>| {
+            for k in batch {
+                let _ = applied_tx.send(k);
+            }
+        },
+        move |c: WatchCursor| {
+            let _ = cursor_tx.send(c.as_u64().unwrap_or(0));
+        },
+        sd_rx,
+    ));
+
+    (task, applied_rx, cursor_rx, sd_tx)
+}
+
+/// Receive exactly `n` applied keys, failing on timeout so a missing apply
+/// can't hang the suite.
+async fn collect_applied(rx: &mut mpsc::UnboundedReceiver<String>, n: usize) -> Vec<String> {
+    let mut out = Vec::with_capacity(n);
+    for _ in 0..n {
+        let key = timeout(Duration::from_secs(5), rx.recv())
+            .await
+            .expect("timed out waiting for an applied update")
+            .expect("applied channel closed early");
+        out.push(key);
+    }
+    out
+}
+
+/// End to end: updates after the resume cursor are applied in revision order,
+/// and the returned cursor is the highest applied revision.
+#[tokio::test]
+async fn applied_streams_and_advances_cursor() {
+    let nats = TestNats::start().await;
+    let (_conn, store) = nats.store("applied").await;
+    let writer = store.writer().expect("writer");
+    let watcher = store.watcher().expect("watcher");
+
+    // Baseline: capture a cursor, then write the real updates after it.
+    let base = writer.put("baseline", b"x").await.expect("put baseline");
+    let baseline = WatchCursor::from_u64(base.as_u64().expect("u64 rev"));
+
+    writer.put("node.a", b"1").await.expect("put a");
+    writer.put("node.b", b"2").await.expect("put b");
+    let last = writer.put("node.c", b"3").await.expect("put c");
+    let last_rev = last.as_u64().expect("u64 rev");
+
+    let (task, mut applied_rx, _cursor_rx, sd_tx) =
+        spawn_applied(watcher, Some(baseline), None, put_key);
+
+    let got = collect_applied(&mut applied_rx, 3).await;
+    assert_eq!(
+        got,
+        vec!["node.a", "node.b", "node.c"],
+        "updates applied in revision order"
+    );
+
+    sd_tx.send(true).expect("signal shutdown");
+    let cursor = task.await.expect("task join").expect("watch_applied ok");
+    assert_eq!(
+        cursor.as_u64(),
+        Some(last_rev),
+        "returned cursor is the highest applied revision"
+    );
+}
+
+/// The headline guarantee. Run the combinator with a real snapshot, shut it
+/// down, then start a SECOND run from the persisted snapshot cursor and prove
+/// the delta replays with nothing skipped and nothing re-applied, i.e. a
+/// crash+restart resumes exactly where apply left off.
+#[tokio::test] +async fn applied_resumes_from_snapshot_cursor_without_skipping() { + let nats = TestNats::start().await; + let (_conn, store) = nats.store("applied-resume").await; + let writer = store.writer().expect("writer"); + let watcher = store.watcher().expect("watcher"); + + let snap_dir = tempfile::tempdir().expect("snap dir"); + let snap_path = snap_dir.path().join("state.snap"); + + // Baseline cursor, then the first wave of writes. + let base = writer.put("baseline", b"x").await.expect("put baseline"); + let baseline = WatchCursor::from_u64(base.as_u64().expect("u64 rev")); + writer.put("node.a", b"1").await.expect("put a"); + let b_rev = writer + .put("node.b", b"2") + .await + .expect("put b") + .as_u64() + .expect("u64 rev"); + + // --- Run 1: apply {a, b}, checkpoint the snapshot, shut down. --- + let writer1 = SnapshotWriter::open(&snap_path, u64::MAX).expect("open snapshot"); + let (task, mut applied_rx, _cur_rx, sd_tx) = + spawn_applied(watcher.clone(), Some(baseline), Some(writer1), put_key); + + let first = collect_applied(&mut applied_rx, 2).await; + assert_eq!(first, vec!["node.a", "node.b"]); + sd_tx.send(true).expect("shutdown run 1"); + let cursor1 = task.await.expect("join 1").expect("run 1 ok"); + assert_eq!( + cursor1.as_u64(), + Some(b_rev), + "run 1 applied through node.b" + ); + + // The on-disk snapshot must carry the applied cursor and applied state — the + // checkpoint is written at the post-apply cursor, never ahead of it. + let loaded = snapshot::load(&snap_path) + .expect("load snapshot") + .expect("snapshot present"); + assert_eq!( + loaded.cursor.as_u64(), + Some(b_rev), + "snapshot cursor equals the applied cursor" + ); + // The snapshot holds exactly what the combinator received — the post-cursor + // delta {node.a, node.b}. `baseline` is the resume cursor itself and is + // excluded (resume is exclusive of the cursor revision), so it never reaches + // the snapshot log. 
+ let mut snap_keys: Vec<&str> = loaded.entries.keys().map(String::as_str).collect(); + snap_keys.sort_unstable(); + assert_eq!(snap_keys, vec!["node.a", "node.b"]); + + // --- Between runs: more writes land while nothing is watching. --- + writer.put("node.c", b"3").await.expect("put c"); + let d_rev = writer + .put("node.d", b"4") + .await + .expect("put d") + .as_u64() + .expect("u64 rev"); + + // --- Run 2: resume from the snapshot cursor. Only the delta {c, d} may + // appear — a and b must NOT be re-applied (cursor1 is exclusive), and + // nothing in the gap may be skipped. --- + let (task2, mut applied_rx2, _cur_rx2, sd_tx2) = + spawn_applied(watcher, Some(loaded.cursor), None, put_key); + + let delta = collect_applied(&mut applied_rx2, 2).await; + assert_eq!( + delta, + vec!["node.c", "node.d"], + "resume replays exactly the post-cursor delta, in order" + ); + sd_tx2.send(true).expect("shutdown run 2"); + let cursor2 = task2.await.expect("join 2").expect("run 2 ok"); + assert_eq!( + cursor2.as_u64(), + Some(d_rev), + "run 2 applied through node.d" + ); +} + +/// An update that `parse` rejects (here: a non-`node.` key) is still received, +/// so the cursor must advance past it even though nothing is applied for it. +#[tokio::test] +async fn applied_advances_cursor_over_rejected_updates() { + let nats = TestNats::start().await; + let (_conn, store) = nats.store("applied-reject").await; + let writer = store.writer().expect("writer"); + let watcher = store.watcher().expect("watcher"); + + let base = writer.put("baseline", b"x").await.expect("put baseline"); + let baseline = WatchCursor::from_u64(base.as_u64().expect("u64 rev")); + + // node.a is applied; other.x is rejected by `node_put_key` but lands at a + // higher revision — the cursor must still reach it. 
+    writer.put("node.a", b"1").await.expect("put a");
+    let rejected_rev = writer
+        .put("other.x", b"junk")
+        .await
+        .expect("put other")
+        .as_u64()
+        .expect("u64 rev");
+
+    let (task, mut applied_rx, mut cursor_rx, sd_tx) =
+        spawn_applied(watcher, Some(baseline), None, node_put_key);
+
+    // Exactly one update is applied: node.a.
+    let got = collect_applied(&mut applied_rx, 1).await;
+    assert_eq!(got, vec!["node.a"]);
+
+    // Wait until a checkpoint reports the cursor at-or-past the rejected entry.
+    // It arrives because every *received* update bumps the high-water, applied
+    // or not.
+    let reached = timeout(Duration::from_secs(5), async {
+        while let Some(rev) = cursor_rx.recv().await {
+            if rev >= rejected_rev {
+                return true;
+            }
+        }
+        false
+    })
+    .await
+    .expect("timed out waiting for cursor to pass the rejected update");
+    assert!(reached, "cursor advanced past the rejected update");
+
+    sd_tx.send(true).expect("shutdown");
+    let cursor = task.await.expect("join").expect("ok");
+    assert_eq!(
+        cursor.as_u64(),
+        Some(rejected_rev),
+        "final cursor covers the rejected (un-applied) update"
+    );
+}
+
+/// A resume cursor compacted out of the stream must not strand the combinator:
+/// it keeps working and applies subsequent updates (via the CursorExpired →
+/// full-watch fallback when the backend reports expiry, or by transparently
+/// resuming from the earliest retained revision when it doesn't). Either way,
+/// the contract is "no error, updates still flow" — same as
+/// `watch_from_compacted_cursor_does_not_spuriously_fail`.
+#[tokio::test]
+async fn applied_survives_compacted_resume_cursor() {
+    let nats = TestNats::start().await;
+    let conn = nats.connect().await;
+    // history=1: each re-put of a key purges the prior revision, pushing the
+    // stream's first sequence past old cursors.
+    let store = conn
+        .store_with_config(StoreConfig {
+            name: "applied-compacted".into(),
+            max_history: Some(1),
+            max_bytes: Some(1024 * 1024),
+            ..Default::default()
+        })
+        .await
+        .expect("open store");
+    let writer = store.writer().expect("writer");
+    let watcher = store.watcher().expect("watcher");
+
+    for i in 0..6u8 {
+        writer.put("node.k", &[i]).await.expect("put k");
+    }
+
+    // Cursor at revision 1 — long since compacted away.
+    let stale = WatchCursor::from_u64(1);
+    let (task, mut applied_rx, _cur_rx, sd_tx) =
+        spawn_applied(watcher, Some(stale), None, node_put_key);
+
+    // The fallback watch is DeliverPolicy::New, so it only sees *fresh* writes.
+    // Re-put node.k on a retry loop until one is applied — that proves the
+    // combinator recovered from the stale cursor and is streaming live.
+    let applied = timeout(Duration::from_secs(10), async {
+        loop {
+            writer.put("node.k", b"live").await.expect("put live");
+            if let Ok(Some(key)) = timeout(Duration::from_millis(250), applied_rx.recv()).await {
+                return key;
+            }
+        }
+    })
+    .await
+    .expect("combinator never applied an update after a compacted resume cursor");
+    assert_eq!(applied, "node.k");
+
+    sd_tx.send(true).expect("shutdown");
+    let cursor = task.await.expect("join").expect("watch_applied ok");
+    assert!(
+        cursor.as_u64().is_some(),
+        "combinator returned a live cursor after recovering from compaction"
+    );
+}