Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ watch_all_from(cursor, tx)
| `KvUpdate` | One watch event: `Put`, `Delete`, or `Purge` | Not a read result; carries deletes too |
| `Snapshot` | Deduplicated KV state + cursor persisted to disk | Not the source of truth; a cache of NATS |
| `SnapshotWriter` | Append-only log of `KvUpdate`s; no in-memory state beyond a counter | Not the in-memory cache itself |
| `watch_applied` | Combinator: batch → apply → *then* advance cursor/checkpoint | Not a raw watch; the cursor follows `apply`, not receipt |
| `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, …) | Not enforced; purely advisory |

## Layer Architecture
Expand All @@ -93,6 +94,11 @@ watch_all_from(cursor, tx)
│ SnapshotWriter │ load() │ compact_to_file() │
│ (append-only CRC log, tempfile+rename compact) │
└─────────────────────────────────────────────────────────────┘
applied.rs (combinator over KvWatcher + snapshot)
┌──────────────────────────────────────────────────────────────┐
│ watch_applied(): batch → apply → advance cursor/checkpoint │
│ (cursor-after-apply; the safe default for resumable watch) │
└──────────────────────────────────────────────────────────────┘
```

## Core Mechanism
Expand All @@ -117,6 +123,28 @@ match watcher.watch_all_from(&snap.cursor, tx).await {
}
```

### Applied-Cursor Watch (`watch_applied`)

The resumable watch above hands the caller raw machinery — a channel of `KvUpdate`s, a cursor, a snapshot writer — and trusts each one to hand-roll the loop that batches updates, applies them, and advances the cursor. Every hand-rolled instance got the same step wrong: it advanced the cursor on **receipt** of an update (`high_water = rev` at `rx.recv()`) and applied the batch afterward. The combinator `watch_applied` exists to encode the correct discipline once.

**The resume guarantee.** This library's contract is "resume from a sequence number after any restart." `watch_applied` sharpens that into a single invariant:

> A persisted/reported cursor `C` ⟹ every update with revision ≤ `C` has been **applied** — the caller's `apply()` has returned for it.

The cursor is written from `apply()`'s completion, never from the channel's delivery. Concretely, on each flush the combinator runs `apply(batch)` to completion, *then* sets `cursor = batch_high`, *then* checkpoints the snapshot at that cursor, *then* fires `on_applied`. Nothing advances the cursor before `apply` returns.

**Why receipt is the wrong signal.** Bumping the cursor at `rx.recv()` and applying later opens a crash window: the persisted cursor claims "caught up to rev N" while rev N still sits in an unapplied batch buffer (or in flight to a separate apply task). On crash+resume the watch re-arms at `cursor+1`, *past* the unapplied rev N, and silently skips it. The data is gone with no error — a hole in the exact guarantee the crate advertises.

This is the lesson of Saltzer, Reed & Clark, *End-to-End Arguments in System Design* (1984): a checkpoint placed below the endpoint — here, at the transport's delivery rather than at the application of the update — can only ever be a performance hint, never a correctness guarantee. The "it happened" property can only be established at the endpoint that actually performs the work, so the cursor is sourced from `apply`, not from `recv`. The cursor-as-monotonic-index shape itself is the HashiCorp Consul anti-entropy / blocking-query lineage: a client re-arms its watch from the last index it *reconciled*, never from the index it merely *saw*.

**Cursor authority covers rejected entries.** `batch_high` tracks the highest revision *received* since the last flush, including updates that `parse` rejected (corrupt bytes, irrelevant keys). A rejected entry is still "nothing to apply," so it is covered by the cursor — and because NATS delivers in revision order, advancing to the max revision after one atomic `apply` is sound: having seen the max means every revision below it has been seen too. Without this, a run of irrelevant keys would pin the cursor in place and force redundant replay on every restart.

**Snapshot consistency.** Raw `KvUpdate`s stream to the snapshot log as they arrive, but the *checkpoint* cursor is the post-apply cursor. A crash after a raw record is written but before its `apply`/checkpoint leaves the log holding data *ahead* of its cursor — which is safe: the cursor never names a revision whose `apply` had not returned, so resume re-delivers and re-applies that tail rather than skipping it. Compaction runs off the hot path via `spawn_blocking`, as everywhere else in the snapshot subsystem.

**Flush triggers.** A batch flushes when any of these fires: the `window` elapses, `batch.len()` reaches `config.max`, a shutdown is signalled, or the channel closes with a pending batch (the remainder is flushed before returning). On `CursorExpired` from the resume path the combinator logs and falls back to the full-scope watch (`watch_all` / `watch_prefix`); v1 replays the full re-list as a stream of puts (a deeper "resync" signal that diffs against prior state is a documented TODO).

This is the layer the tunnel router (swap route table) and edge origin watcher (rebuild hashrings) both collapse onto: `parse` extracts the domain registration, `apply` swaps the live state, `on_applied` persists the cursor.

### scan() and keys() via Ephemeral Push Consumer

Both use `DeliverPolicy::LastPerSubject` — one ephemeral push consumer delivers the latest value per key in a single streaming operation, rather than N sequential `get()` calls. `keys()` adds `headers_only: true` so no value bytes cross the wire.
Expand Down Expand Up @@ -220,6 +248,10 @@ The double-check pattern in `connect()` guards a concurrent connect race: a seco

## Design Decisions

### Why a `watch_applied` combinator instead of leaving the loop to callers?

The raw `KvWatcher` + `WatchCursor` + `SnapshotWriter` pieces let callers hand-roll the batch/apply/advance loop — and every known caller advanced the cursor on *receipt* rather than after *apply*, silently skipping un-applied updates on crash+resume. That is a footgun in the library's core guarantee, not a caller bug to be fixed N times. Encoding cursor-after-apply once, behind a combinator that callers can't get wrong, is cheaper and safer than documentation. `apply` stays the only domain logic; the cursor/snapshot/`on_applied` bookkeeping is the library's. See [Applied-Cursor Watch](#applied-cursor-watch-watch_applied).

### Why KvError: Clone instead of Box<dyn Error>?

A failed connect future may be observed by multiple concurrent callers waiting on a shared result. `Clone` lets the error fan out to N waiters without `Arc`. The cost: `std::io::Error` and `async-nats` error types are not `Clone`, so their structured cause chain is flattened into a pre-rendered `String` at the boundary. The trade-off is explicit: no `#[source]` chain, but the message carries context instead.
Expand Down Expand Up @@ -308,8 +340,9 @@ Checkpoints are frequent (every N watch events). An fsync per checkpoint would a
| `src/stores.rs` | `Connection`, `KvStore`, `StoreConfig`, `StorageType`, `ConnectionCapabilities` |
| `src/nats.rs` | NATS JetStream implementation; bucket creation, scan consumer lifecycle, timeout wrapping, Synadia Cloud workarounds |
| `src/snapshot.rs` | Append-only snapshot log: `SnapshotWriter`, `load()`, `replay_log()`, `compact_to_file()` |
| `src/applied.rs` | `watch_applied` cursor-after-apply combinator: `WatchScope`, `BatchConfig` |
| `src/lib.rs` | Re-exports all public types; no logic |
| `benches/` | Criterion benchmarks for snapshot write/checkpoint/load throughput |
| `benches/` | Criterion benchmarks for snapshot write/checkpoint/load throughput and batch throughput |
| `tests/` | Integration tests (require live NATS) |

## Configuration
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ rust-version = "1.92"

[package]
name = "beyond-slipstream"
version = "0.1.0"
version = "0.2.0"
edition.workspace = true
license.workspace = true
rust-version.workspace = true
Expand All @@ -27,7 +27,7 @@ futures = "0.3"
serde_json = "1"
tempfile = "3"
thiserror = "2"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1", features = ["macros", "rt", "sync", "time"] }
tracing = "0.1"
url = "2"

Expand All @@ -42,3 +42,7 @@ harness = false
[[bench]]
name = "ack"
harness = false

[[bench]]
name = "applied"
harness = false
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Trait-based KV abstraction over NATS JetStream. Read, write, and watch distribut

```toml
[dependencies]
beyond-slipstream = "0.1"
beyond-slipstream = "0.2"
```

## Concepts
Expand All @@ -22,6 +22,7 @@ beyond-slipstream = "0.1"
| `KvUpdate` | One watch event: `Put`, `Delete`, or `Purge` |
| `Snapshot` | Deduplicated KV state + cursor at a point in time. Disk cache, not source of truth |
| `SnapshotWriter` | Append-only log of `KvUpdate`s; survives restarts without a full NATS scan |
| `watch_applied` | Watch loop that advances the cursor only after your `apply` returns |
| `ConnectionCapabilities` | Feature flags for runtime branching (CAS, streaming watch, global ordering) |

## Usage
Expand Down Expand Up @@ -185,6 +186,8 @@ while let Some(update) = rx.recv().await {
}
```

This loop has a trap: `current_cursor` must track what `cache.apply()` has consumed, not what `rx.recv()` delivered. Get it wrong and a crash skips updates on resume. [`watch_applied`](#applied-watch) runs this loop for you with that invariant enforced.

The snapshot is a cache. Delete it and the service falls back to full replay on next start.

### File format
Expand All @@ -204,6 +207,36 @@ Record: crc32:u32le ++ type:u8 ++ payload

A truncated final record (crash mid-write) is discarded; earlier records are intact. A CRC failure mid-file returns `SnapshotError::Corrupted`.

## Applied watch

`watch_applied` drives the watch-batch-apply-checkpoint loop and enforces one rule the hand-rolled version can't: the cursor advances only after your `apply` returns, never on receipt.

```rust
use slipstream::{watch_applied, BatchConfig, KvUpdate, WatchCursor, WatchScope};

let final_cursor = watch_applied(
watcher,
WatchScope::All, // or WatchScope::Prefix("node.".into())
load_cursor(), // Option<WatchCursor> — resume here, or None
Some(snapshot_writer), // checkpoints at the applied cursor, or None
BatchConfig::default(), // 10ms window, 100 updates per batch
|update: &KvUpdate| parse(update), // KvUpdate -> Option<U>; None just drops it
|batch: Vec<U>| cache.apply_batch(batch), // your only domain logic
|cursor: WatchCursor| persist(cursor), // fires after apply returns
shutdown, // tokio::sync::watch::Receiver<bool>
).await?;
```

A batch closes when `window` elapses or it hits `max` updates, whichever comes first. Then, in order: `apply(batch)` runs to completion, the cursor advances to the batch's highest revision, the snapshot checkpoints at that cursor, and `on_applied` fires.

Persist the cursor on receipt instead and a crash between receive and apply loses data: the cursor reads "caught up to rev N" while rev N sits in an unapplied buffer, and the next resume starts past it. `watch_applied` checkpoints at the applied cursor, so a persisted cursor always means every update up to it has been applied.

- `parse` returning `None` (corrupt bytes, irrelevant key) still advances the cursor — nothing to apply means nothing to skip.
- On `CursorExpired`, it falls back to a full watch automatically.
- It returns the final applied cursor on shutdown or stream close.

`apply` runs inline. If it panics, the panic aborts the watch.

## NATS mapping

| Concept | NATS primitive |
Expand Down
110 changes: 110 additions & 0 deletions benches/applied.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//! Batch throughput for the [`watch_applied`] combinator.
//!
//! Measures the per-update cost of the cursor-after-apply loop — receive off the
//! channel, track the high-water cursor, parse, batch, flush (apply +
//! checkpoint) — with a no-op `apply` and no snapshot, so what's left is the
//! combinator's own batching overhead. A scripted in-process watcher feeds N
//! updates and closes; there is no NATS server in the loop.

use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use async_trait::async_trait;
use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main};
use slipstream::{
BatchConfig, KvEntry, KvError, KvUpdate, KvWatcher, VersionToken, WatchCursor, WatchScope,
watch_applied,
};
use tokio::sync::mpsc::Sender;
use tokio::sync::watch;

/// Delivers a fixed batch of updates, then closes the channel.
struct ScriptedWatcher {
updates: Mutex<Option<Vec<KvUpdate>>>,
}

#[async_trait]
impl KvWatcher for ScriptedWatcher {
async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
let updates = self.updates.lock().unwrap().take().unwrap_or_default();
for u in updates {
if tx.send(u).await.is_err() {
break;
}
}
Ok(())
}

async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
self.watch_all(tx).await
}
}

fn put(i: u64) -> KvUpdate {
KvUpdate::Put(KvEntry {
key: format!("node.region-{i}"),
value: vec![0x42; 256],
version: VersionToken::from_u64(i),
})
}

fn bench_batch_throughput(c: &mut Criterion) {
const N: u64 = 1000;

// Multi-thread runtime so the spawned watch task and the combinator loop run
// on separate threads — the realistic deployment shape.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap();

let template: Vec<KvUpdate> = (0..N).map(put).collect();

let mut group = c.benchmark_group("watch_applied");
group.throughput(Throughput::Elements(N));
group.bench_function("batch_1000_updates", |b| {
b.iter_batched(
// Setup (not measured): fresh scripted watcher with its own copy of
// the updates, since each run drains the channel.
|| {
Arc::new(ScriptedWatcher {
updates: Mutex::new(Some(template.clone())),
})
},
// Measured: run the combinator to completion over all N updates.
|watcher| {
rt.block_on(async move {
let (_sd_tx, sd_rx) = watch::channel(false);
watch_applied(
watcher as Arc<dyn KvWatcher>,
WatchScope::All,
None,
None,
BatchConfig {
window: Duration::from_millis(10),
max: 100,
},
// parse: keep every put.
|u: &KvUpdate| match u {
KvUpdate::Put(_) => Some(()),
_ => None,
},
// apply: no-op — isolate the batching overhead.
|_batch: Vec<()>| {},
|_cursor: WatchCursor| {},
sd_rx,
)
.await
.unwrap()
})
},
BatchSize::SmallInput,
);
});
group.finish();
}

criterion_group!(benches, bench_batch_throughput);
criterion_main!(benches);
5 changes: 4 additions & 1 deletion mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ experimental = true
activate_aggressive = true

[tools]
rust = "1.92"
# `components` pulls clippy + rustfmt into the toolchain; mise installs rust with
# the minimal rustup profile otherwise, and CI's `cargo clippy` step can't find
# the component.
rust = { version = "1.92", components = "clippy,rustfmt" }
yamlfmt = "0.12.1"
dprint = "0.50.2"
"ubi:nats-io/nats-server" = "2.14.1"
Expand Down
Loading
Loading