Skip to content

beyondoss/slipstream

Repository files navigation

Slipstream

Trait-based KV abstraction over NATS JetStream. Read, write, and watch distributed config. A cursor tracks position in the watch stream so services resume from the delta after any restart.

[dependencies]
beyond-slipstream = "0.2"

The crate core is pure-Rust. On-disk snapshot backends are opt-in cargo features (no C toolchain is pulled unless you enable one):

beyond-slipstream = { version = "0.2", features = ["fjall"] } # on-disk SnapshotStore for large folds

Concepts

Term What It Is
Connection NATS connection lifecycle + store factory
KvStore Named bucket. Vends reader, watcher, writer
KvReader Point-in-time reads: get, entry, keys, scan
KvWatcher Live update stream via channel
KvWriter Write, soft-delete, CAS (create, update, delete_with_version)
WatchCursor Opaque position in a watch stream. Save it; pass it back on reconnect
VersionToken Opaque version — NATS: u64 revision; FDB: 10-byte versionstamp
KvEntry One key + value + version from a read
KvUpdate One watch event: Put, Delete, or Purge
Snapshot Deduplicated KV state + cursor at a point in time. Disk cache, not source of truth
SnapshotWriter Append-only log of KvUpdates; survives restarts without a full NATS scan
SnapshotStore Trait: the durable-fold contract — apply (data + cursor, atomically), load, get, range
AppendLogSnapshot Default SnapshotStore: the append-only log + an in-RAM fold (pure-Rust, small state)
FjallSnapshot On-disk SnapshotStore for folds too large for RAM; queryable (feature = "fjall")
watch_applied Watch loop that advances the cursor only after your apply returns, folding into any SnapshotStore
ConnectionCapabilities Feature flags for runtime branching (CAS, streaming watch, global ordering)

Usage

Connect

use slipstream::{Connection, NatsConnection, NatsConnectionConfig};

let conn = NatsConnection::new(NatsConnectionConfig {
    url: "nats://localhost:4222".into(),
    creds: None,
    creds_file: None,
});
conn.connect().await?;

Open a store

use slipstream::{StoreConfig, StorageType};
use std::time::Duration;

let store = conn.store_with_config(StoreConfig {
    name: "nodes".into(),
    storage: StorageType::Persistent,
    max_bytes: Some(512 * 1024 * 1024), // required by Synadia Cloud
    max_history: Some(1),
    max_age: Some(Duration::from_secs(30 * 24 * 3600)),
    num_replicas: Some(3), // HA clusters
    ..Default::default()
}).await?;

max_bytes is required on Synadia Cloud. Omit only for self-hosted NATS.

Read

use slipstream::KvReader;

let reader = store.reader();

// Single key — filters tombstones; use entry() to include them for CAS
if let Some(entry) = reader.get("node.us-east-1").await? {
    println!("{}: {:?}", entry.key, entry.version);
}

// All entries under prefix
// Uses DeliverPolicy::LastPerSubject: one NATS consumer, not N round-trips.
let entries = reader.scan("node.").await?;

// Key names only (no value transfer)
let keys = reader.keys("node.").await?;

Write

use slipstream::KvWriter;

let writer = store.writer().expect("store is writable");

// Unconditional write
let version = writer.put("node.us-east-1", &payload).await?;

// Create only. Returns AlreadyExists if key has a live value.
let version = writer.create("lock.migration", &payload).await?;

// CAS update. Returns RevisionMismatch if version doesn't match.
let new_version = writer.update("node.us-east-1", &payload, &version).await?;

// CAS delete. Returns RevisionMismatch on conflict.
writer.delete_with_version("node.us-east-1", &version).await?;

// Best-effort delete — returns Ok(true) even if key didn't exist.
writer.delete("node.us-east-1").await?;

Watch

use slipstream::{KvUpdate, KvWatcher};

let watcher = store.watcher().expect("store supports streaming");
let (tx, mut rx) = tokio::sync::mpsc::channel(128);

// watch_all blocks until the stream ends — run it in a separate task
tokio::spawn(async move {
    watcher.watch_all(tx).await.unwrap();
});

while let Some(update) = rx.recv().await {
    match update {
        KvUpdate::Put(entry) => { /* ... */ }
        KvUpdate::Delete { key, version } => { /* ... */ }
        KvUpdate::Purge { key, version } => { /* ... */ }
    }
}

Dropping rx cancels the watch. The watcher task exits and unsubscribes automatically.

Resumable watch

The cursor is a sequence number. Persist it; pass it back on reconnect. NATS delivers only the delta since that position.

let cursor = load_cursor().unwrap_or(WatchCursor::none());

match watcher.watch_all_from(&cursor, tx.clone()).await {
    Ok(()) => {}
    Err(KvError::CursorExpired) => {
        // NATS compacted past the cursor. Full replay required.
        watcher.watch_all(tx).await?;
    }
    Err(e) => return Err(e.into()),
}

watch_prefix_from() works the same way for prefix-filtered streams.

Snapshot

For services that cache KV state locally, the snapshot persists both state and cursor to disk. On restart, load the snapshot and resume the watch from its cursor — only the delta since the last checkpoint arrives from NATS.

Startup

use slipstream::snapshot;

if let Some(snap) = snapshot::load(Path::new("/var/lib/svc/state.snap"))? {
    for (key, entry) in snap.entries {
        cache.insert(key, entry.value);
    }
    watcher.watch_all_from(&snap.cursor, tx).await?;
} else {
    watcher.watch_all(tx).await?;
}

Runtime

use slipstream::snapshot::SnapshotWriter;

let mut snap = SnapshotWriter::open(
    Path::new("/var/lib/svc/state.snap"),
    10 * 1024 * 1024, // compact after 10MB of appended records
)?;

while let Some(update) = rx.recv().await {
    cache.apply(&update);
    snap.write_update(&update); // buffered, no I/O

    // checkpoint() flushes + syncs to disk; returns true when compaction is due
    if snap.checkpoint(&current_cursor)? {
        // compact() is blocking I/O; run via spawn_blocking in async contexts
        tokio::task::spawn_blocking(move || snap.compact()).await??;
    }
}

This loop has a trap: current_cursor must track what cache.apply() has consumed, not what rx.recv() delivered. Get it wrong and a crash skips updates on resume. watch_applied runs this loop for you with that invariant enforced.

The snapshot is a cache. Delete it and the service falls back to full replay on next start.

File format

Header:  b"PGSS" ++ version:u16le
Record:  crc32:u32le ++ type:u8 ++ payload
Record type Byte Payload
REC_PUT 0x01 key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version_bytes
REC_DELETE 0x02 key_len:u16le ++ key ++ ver_len:u8 ++ version_bytes
REC_CURSOR 0x03 cursor_len:u8 ++ cursor bytes

version_bytes is the raw [VersionToken] bytes (≤10), not a fixed u64, so NATS revisions (8 bytes) and FDB versionstamps (10 bytes) both round-trip intact.

A truncated final record (crash mid-write) is discarded; earlier records are intact. A CRC failure mid-file returns SnapshotError::Corrupted.

Pluggable backends

The durable fold is a trait, [SnapshotStore], so a consumer picks where its fold lives. The contract is small — apply a batch and advance the cursor atomically, resume from the cursor on restart, and query the result:

pub trait SnapshotStore: Sized + Send {
    fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>;
    fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>;
    fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>;
    fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>;
}

Every backend keeps the same invariants: the fold is a pure function of the log (delete the store, replay from the cursor, get identical state), the cursor never names a revision whose data isn't durable (cursor-after-apply), and the store is a cache — a tail lost to power loss is rebuilt by resuming the watch.

Backend When Notes
AppendLogSnapshot Default. Fold fits in RAM (edge/tunnel-style services) Pure-Rust, the append-only log above plus an in-RAM map serving get/range. No extra dependencies.
FjallSnapshot Fold too large for RAM (e.g. routing at ~1B keys) On-disk fjall LSM, feature = "fjall". Each apply is one atomic batch (data and cursor); durability (NO_SYNC vs fsync) is configurable.

Pick a backend, then hand it to watch_appliedload returns the resume cursor alongside the store:

use slipstream::{AppendLogSnapshot, SnapshotStore};

// Default in-RAM backend:
let (resume, store) = AppendLogSnapshot::load(Path::new("/var/lib/svc/state.snap"))?;

// Or, behind `feature = "fjall"`, an on-disk fold for a large consumer:
// let (resume, store) = FjallSnapshot::open(dir, FjallConfig { sync: false })?;

let final_cursor = watch_applied(
    watcher, WatchScope::All, Some(resume), Some(store), BatchConfig::default(),
    parse, apply, on_applied, shutdown,
).await?;

The trait stops at durable fold + cursor + query. Serving structures built from the fold (routing rings, hashrings, indexes) live in the consumer — query them out of the store with get/range. A consumer with a different engine can implement SnapshotStore itself; the rest of slipstream is unchanged.

Applied watch

watch_applied drives the watch-batch-apply-checkpoint loop and enforces one rule the hand-rolled version can't: the cursor advances only after your apply returns, never on receipt. It is generic over the SnapshotStore backend, so the consumer chooses where the durable fold lives (or None to run without persistence).

use slipstream::{watch_applied, AppendLogSnapshot, BatchConfig, KvUpdate, WatchCursor, WatchScope};

let final_cursor = watch_applied(
    watcher,
    WatchScope::All,                  // or WatchScope::Prefix("node.".into())
    Some(resume),                     // Option<WatchCursor> — resume here, or None
    Some(store),                      // any SnapshotStore (e.g. AppendLogSnapshot), or None
    BatchConfig::default(),           // 10ms window, 100 updates per batch
    |update: &KvUpdate| parse(update),        // KvUpdate -> Option<U>; None just drops it
    |batch: Vec<U>| cache.apply_batch(batch), // your only domain logic
    |cursor: WatchCursor| persist(cursor),    // fires after apply returns
    shutdown,                                 // tokio::sync::watch::Receiver<bool>
).await?;

A batch closes when window elapses or it hits max updates, whichever comes first. Then, in order: apply(batch) runs to completion, the cursor advances to the batch's highest revision, the batch + cursor are folded into the store atomically (on a blocking task), and on_applied fires.

Persist the cursor on receipt instead and a crash between receive and apply loses data: the cursor reads "caught up to rev N" while rev N sits in an unapplied buffer, and the next resume starts past it. watch_applied checkpoints at the applied cursor, so a persisted cursor always means every update up to it has been applied.

  • parse returning None (corrupt bytes, irrelevant key) still advances the cursor — nothing to apply means nothing to skip.
  • On CursorExpired, it falls back to a full watch automatically.
  • It returns the final applied cursor on shutdown or stream close.

apply runs inline. If it panics, the panic aborts the watch.

NATS mapping

Concept NATS primitive
Store JetStream KV bucket (KV_{name} stream)
VersionToken Per-key revision (u64, big-endian)
WatchCursor NATS revision at last checkpoint
delete() Writes empty value (soft delete). Always returns Ok(true)
KvUpdate::Purge Hard delete: all history removed from stream
scan() DeliverPolicy::LastPerSubject: one entry per key, one consumer
watch_prefix() Native NATS subject filter ({prefix}> wildcard)

Feature detection

let caps = conn.capabilities();

if caps.cas             { /* safe to call create/update/delete_with_version */ }
if caps.streaming_watch { /* watcher() is Some */ }
if caps.prefix_watch    { /* watch_prefix() uses a server-side filter */ }
if caps.global_ordering { /* VersionToken is globally ordered across keys */ }

Errors

Error Cause Recovery
NotConnected Operation before connect() Call connect()
AlreadyExists create() on a live key Read current state, decide
RevisionMismatch CAS conflict on update() / delete_with_version() Re-read, retry
CursorExpired watch_*_from() cursor compacted by NATS Fall back to watch_all()
WatchError NATS stream dropped Re-subscribe

Credentials

Priority order, first match wins:

  1. creds: base64-encoded .creds content (containers, ECS)
  2. creds_file: path to .creds on disk (bare-metal, local dev)
  3. URL-embedded user:pass@host
  4. No auth

About

Live distributed config. A sequence number is all you need to catch up.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Contributors

Languages