Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@ jobs:
- name: Run tests
run: cargo test --workspace --exclude wasm-edge

integration-load-chaos:
name: Load & Chaos Tests
runs-on: ubuntu-latest

steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable

- name: Rust cache
uses: Swatinem/rust-cache@v2
with:
shared-key: "recached-ci-cache"

- name: Run load & chaos tests
run: cargo test -p server-native -- --include-ignored
timeout-minutes: 5

typecheck-js:
name: Typecheck @recached/react and @recached/vue
runs-on: ubuntu-latest
Expand Down
49 changes: 48 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,54 @@ All notable changes to Recached are documented here.

---

## [0.1.4] — 2026-05-10
## [0.1.5] — 2026-05-10

### Added

**`recached-react` package** (new)
- `<RecachedProvider>` — React context provider that initialises the cache and makes it available to the component tree. Accepts `options` (passed to `createCache`) or a pre-built `cache` instance.
- `useRecached()` — returns the `Cache` instance from context; throws if used outside a provider.
- `useKey(key)` — returns `string | null`, re-renders on any mutation from any source (local write, WebSocket fan-out, or BroadcastChannel). Implemented with React 18 `useSyncExternalStore` — concurrent-safe, no tearing.
- `useKeyJSON<T>(key)` — like `useKey` but deserialises the value via `cache.getJSON<T>`.
- `usePubSub(channel, handler)` — subscribes to a pub/sub channel on mount, invokes `handler(msg)` on each message, and cleans up on unmount.

**`recached-vue` package** (new)
- `RecachedPlugin` — Vue 3 plugin (`app.use(RecachedPlugin, options)`) that creates the cache and provides it via `inject`/`provide`.
- `useRecached()` — injects the `Cache` instance; throws if the plugin was not installed.
- `useKey(key)` — returns a `Ref<string | null>` that updates reactively on any mutation.
- `useKeyJSON<T>(key)` — like `useKey` but deserialised via `cache.getJSON<T>`.
- `usePubSub(channel, handler)` — subscribes on call, unsubscribes via `onUnmounted`.

**RESP3 Push frame** (`core-engine`, `server-native`, `wasm-edge`)
- New `Value::Push(Vec<Value>)` variant in the RESP parser/serialiser (`>N\r\n…`). Push frames are unambiguously out-of-band — no heuristic needed to distinguish mutation fan-out from command responses.
- All server-to-WebSocket mutation fan-out and pub/sub messages now use Push frames instead of Array frames.
- `wasm-edge` `connect()` handler pattern-matches on `Value::Push` first; Array frames are ignored (they are command acknowledgements, not mutations).

**Mutation notification bus** (`wasm-edge`, SDK)
- `RecachedCache::set_mutation_callback(cb)` — WASM method that fires `cb()` after every write from any source (local call, WebSocket Push, or BroadcastChannel).
- `RecachedCache::set_message_callback(cb)` — WASM method that fires `cb(channel, message)` on each pub/sub Push frame.
- `Cache.onMutation(cb): () => void` — public SDK method; returns an unsubscribe function.
- `Cache.onMessage(channel, cb): () => void` — pub/sub listener registration; returns an unsubscribe function.

**Auto-failover** (`server-native`)
- New `RECACHED_FAILOVER_TIMEOUT` env var. When set on a replica, it starts a timer the first time the primary becomes unreachable. If the primary is still unreachable after the configured number of seconds, the replica promotes itself to primary and begins accepting writes. The timer resets on successful reconnect, so brief primary restarts do not trigger spurious promotion.

**Replication backpressure** (`server-native`)
- Replica channels switched from `mpsc::UnboundedSender` to bounded `mpsc::Sender`. Channel capacity is controlled by `RECACHED_REPL_BUFFER` (default: `4096` frames). A replica that falls this many writes behind is disconnected and must reconnect from a fresh snapshot — the primary write path is never blocked by a slow replica.

**Persistence hardening** (`core-engine`, `server-native`, `wasm-edge`)
- **Dirty counter** — `KeyValueStore` now tracks a `dirty: Arc<AtomicU64>` write counter. Every successful write command increments it; `save()` resets it to zero. Autosave skips the snapshot entirely when `dirty == 0`, so idle servers produce no disk I/O.
- **Multi-condition save policy (`RECACHED_SAVE`)** — replaces the single-interval `RECACHED_SAVE_INTERVAL` with a Redis-compatible multi-condition format: `"seconds:changes[,seconds:changes...]"`. A snapshot fires when any condition is satisfied (`elapsed >= secs && dirty >= changes`). `RECACHED_SAVE_INTERVAL` still works as a single-condition fallback for backward compatibility. Example: `RECACHED_SAVE="900:1,300:10,60:10000"`.
- **WAL compaction on load** (`wasm-edge`) — when `enable_persistence()` replays more than 1 000 WAL entries, it compacts in-place: clears IndexedDB, then writes the current in-memory state as minimal RESP commands (`SET PX` for string TTLs, `HSET`, `RPUSH`, `SADD`, `ZADD`, plus `PEXPIREAT` for collection TTLs). Next startup replays only N snapshot entries instead of the full write history.

### Changed

- Autosave loop now polls every second against save conditions rather than sleeping for the full interval. This allows multi-condition triggers (e.g. "10 000 writes in 60 s") to fire promptly.
- `ServerState::save()` calls `store.reset_dirty()` after a successful snapshot, ensuring the dirty counter accurately reflects only writes since the last save.

---

## [0.1.4] — 2026-05-09

### Added

Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ resolver = "2"
# ── Single source of truth for all crate versions ────────────────────────────
# Members inherit with: version.workspace = true / edition.workspace = true
[workspace.package]
version = "0.1.4"
version = "0.1.5"
edition = "2024"
license = "MIT"
authors = ["ThinkGrid Labs"]
Expand Down
62 changes: 62 additions & 0 deletions core-engine/src/resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub enum Value {
Integer(i64),
BulkString(Option<Vec<u8>>),
Array(Option<Vec<Value>>),
/// RESP3 Push frame (`>N\r\n...`). Used for server-initiated out-of-band messages
/// (mutation fan-out, pub/sub) on the WebSocket channel. Never sent as a command response.
Push(Vec<Value>),
}

impl Value {
Expand Down Expand Up @@ -46,6 +49,12 @@ impl Value {
buf.extend_from_slice(&v.serialize());
}
}
Value::Push(arr) => {
buf.extend_from_slice(format!(">{}\r\n", arr.len()).as_bytes());
for v in arr {
buf.extend_from_slice(&v.serialize());
}
}
}
buf
}
Expand All @@ -65,6 +74,7 @@ impl Value {
b':' => Self::parse_integer(buffer),
b'$' => Self::parse_bulk_string(buffer),
b'*' => Self::parse_array(buffer, depth),
b'>' => Self::parse_push(buffer, depth),
_ => Err("Invalid RESP type".to_string()),
}
}
Expand Down Expand Up @@ -145,6 +155,34 @@ impl Value {
}
}

fn parse_push(buffer: &[u8], depth: usize) -> Result<(Value, usize), String> {
if depth >= MAX_ARRAY_DEPTH {
return Err("ERR max nesting depth exceeded".to_string());
}
match Self::read_until_crlf(buffer) {
Some((data, mut offset)) => {
let s = String::from_utf8_lossy(data);
let count: u64 = s
.parse()
.map_err(|_| "Invalid push frame length".to_string())?;
if count as usize > MAX_ARRAY_ELEMENTS {
return Err(format!(
"ERR push frame too large ({} > {} elements)",
count, MAX_ARRAY_ELEMENTS
));
}
let mut arr = Vec::with_capacity(count as usize);
for _ in 0..count {
let (val, len) = Self::parse_inner(&buffer[offset..], depth + 1)?;
arr.push(val);
offset += len;
}
Ok((Value::Push(arr), offset))
}
None => Err("Incomplete".to_string()),
}
}

fn parse_array(buffer: &[u8], depth: usize) -> Result<(Value, usize), String> {
if depth >= MAX_ARRAY_DEPTH {
return Err("ERR max nesting depth exceeded".to_string());
Expand Down Expand Up @@ -286,6 +324,30 @@ mod tests {
assert_eq!(consumed, 5); // "+OK\r\n" = 5 bytes
}

#[test]
fn push_round_trip() {
round_trip(&Value::Push(vec![]));
round_trip(&Value::Push(vec![
Value::BulkString(Some(b"SET".to_vec())),
Value::BulkString(Some(b"theme".to_vec())),
Value::BulkString(Some(b"dark".to_vec())),
]));
round_trip(&Value::Push(vec![
Value::BulkString(Some(b"message".to_vec())),
Value::BulkString(Some(b"alerts".to_vec())),
Value::BulkString(Some(b"hello".to_vec())),
]));
}

#[test]
fn push_prefix_distinct_from_array() {
let push = Value::Push(vec![Value::BulkString(Some(b"x".to_vec()))]).serialize();
let arr = Value::Array(Some(vec![Value::BulkString(Some(b"x".to_vec()))])).serialize();
assert_ne!(push, arr);
assert!(push.starts_with(b">"));
assert!(arr.starts_with(b"*"));
}

#[test]
fn null_array_vs_empty_array() {
let null = Value::Array(None).serialize();
Expand Down
22 changes: 22 additions & 0 deletions core-engine/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use rand::seq::IteratorRandom;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

const WRONGTYPE: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
Expand Down Expand Up @@ -283,6 +284,7 @@ pub struct KeyValueStore {
max_keys: Option<usize>,
max_memory_bytes: Option<usize>,
eviction_policy: EvictionPolicy,
dirty: Arc<AtomicU64>,
}

impl Default for KeyValueStore {
Expand All @@ -298,6 +300,7 @@ impl KeyValueStore {
max_keys: None,
max_memory_bytes: None,
eviction_policy: EvictionPolicy::NoEviction,
dirty: Arc::new(AtomicU64::new(0)),
}
}

Expand All @@ -307,6 +310,7 @@ impl KeyValueStore {
max_keys: Some(max),
max_memory_bytes: None,
eviction_policy: EvictionPolicy::NoEviction,
dirty: Arc::new(AtomicU64::new(0)),
}
}

Expand All @@ -320,9 +324,27 @@ impl KeyValueStore {
max_keys,
max_memory_bytes,
eviction_policy,
dirty: Arc::new(AtomicU64::new(0)),
}
}

/// Number of write commands applied since the last `reset_dirty()`.
pub fn dirty_count(&self) -> u64 {
self.dirty.load(Ordering::Relaxed)
}

/// Reset the dirty counter to zero (call after a successful snapshot save).
pub fn reset_dirty(&self) {
self.dirty.store(0, Ordering::Relaxed);
}

/// Increment the dirty counter by one. Called by the server after every
/// successful write command so the autosave loop can skip saves when
/// nothing has changed.
pub fn mark_dirty(&self) {
self.dirty.fetch_add(1, Ordering::Relaxed);
}

/// Approximate heap usage in bytes — key+value sizes plus a fixed overhead per entry.
pub fn approximate_memory_bytes(&self) -> usize {
self.data
Expand Down
4 changes: 2 additions & 2 deletions docs/guide/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Recached is a good fit when:
## When Recached is not the right fit

- **You need very high-durability persistence.** Recached supports snapshots (RDB-style) and AOF, but it is still primarily an in-memory cache. If you cannot tolerate any data loss between fsync intervals, a purpose-built database is the right tool.
- **You need automatic failover.** Recached supports leader–follower replication (`RECACHED_REPLICAOF`) and manual promotion (`REPLICAOF NO ONE`), but does not include sentinel-style automatic leader election.
- **You need multi-replica consensus failover.** Recached supports leader–follower replication with automatic single-replica failover (`RECACHED_FAILOVER_TIMEOUT`). If the primary is unreachable for the configured duration, the designated replica promotes itself. What it does not include is multi-replica quorum election: in a setup with several replicas, split-brain prevention requires you to designate one replica for auto-failover and keep the others as passive standbys.
- **You depend on uncommon Redis commands.** Recached implements the commands most applications use, not all 250+. Server introspection (`INFO`, `SLOWLOG`, `COMMAND`), Lua scripting, RESP3, and cluster mode are out of scope.
- **You need very large datasets.** Recached is an in-memory cache — it is not a database. If your working set does not fit in RAM, Redis with RDB persistence or a proper database is the right tool.

Expand All @@ -68,7 +68,7 @@ Recached is a good fit when:
| Browser-side cache | Yes — WASM | No |
| WebSocket sync | Built-in | Not built-in |
| Persistence | Snapshot + AOF | RDB + AOF |
| Replication | Primary/replica | Yes (+ Sentinel/Cluster) |
| Replication | Primary/replica + auto-failover | Yes (+ Sentinel/Cluster) |
| Lua scripting | No (WASM scripting on roadmap) | Yes |
| Cluster mode | No | Yes |
| Command coverage | ~80 commands | 250+ |
Expand Down
Loading
Loading