diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87b228c..6b69eef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,6 +39,26 @@ jobs: - name: Run tests run: cargo test --workspace --exclude wasm-edge + integration-load-chaos: + name: Load & Chaos Tests + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Rust cache + uses: Swatinem/rust-cache@v2 + with: + shared-key: "recached-ci-cache" + + - name: Run load & chaos tests + run: cargo test -p server-native -- --include-ignored + timeout-minutes: 5 + typecheck-js: name: Typecheck @recached/react and @recached/vue runs-on: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index bd41436..d5d5a43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,54 @@ All notable changes to Recached are documented here. --- -## [0.1.4] — 2026-05-10 +## [0.1.5] — 2026-05-10 + +### Added + +**`recached-react` package** (new) +- `` — React context provider that initialises the cache and makes it available to the component tree. Accepts `options` (passed to `createCache`) or a pre-built `cache` instance. +- `useRecached()` — returns the `Cache` instance from context; throws if used outside a provider. +- `useKey(key)` — returns `string | null`, re-renders on any mutation from any source (local write, WebSocket fan-out, or BroadcastChannel). Implemented with React 18 `useSyncExternalStore` — concurrent-safe, no tearing. +- `useKeyJSON(key)` — like `useKey` but deserialises the value via `cache.getJSON`. +- `usePubSub(channel, handler)` — subscribes to a pub/sub channel on mount, invokes `handler(msg)` on each message, and cleans up on unmount. + +**`recached-vue` package** (new) +- `RecachedPlugin` — Vue 3 plugin (`app.use(RecachedPlugin, options)`) that creates the cache and provides it via `inject`/`provide`. +- `useRecached()` — injects the `Cache` instance; throws if the plugin was not installed. +- `useKey(key)` — returns a `Ref` that updates reactively on any mutation. +- `useKeyJSON(key)` — like `useKey` but deserialised via `cache.getJSON`. +- `usePubSub(channel, handler)` — subscribes on call, unsubscribes via `onUnmounted`. + +**RESP3 Push frame** (`core-engine`, `server-native`, `wasm-edge`) +- New `Value::Push(Vec)` variant in the RESP parser/serialiser (`>N\r\n…`). Push frames are unambiguously out-of-band — no heuristic needed to distinguish mutation fan-out from command responses. +- All server-to-WebSocket mutation fan-out and pub/sub messages now use Push frames instead of Array frames. +- `wasm-edge` `connect()` handler pattern-matches on `Value::Push` first; Array frames are ignored (they are command acknowledgements, not mutations). + +**Mutation notification bus** (`wasm-edge`, SDK) +- `RecachedCache::set_mutation_callback(cb)` — WASM method that fires `cb()` after every write from any source (local call, WebSocket Push, or BroadcastChannel). +- `RecachedCache::set_message_callback(cb)` — WASM method that fires `cb(channel, message)` on each pub/sub Push frame. +- `Cache.onMutation(cb): () => void` — public SDK method; returns an unsubscribe function. +- `Cache.onMessage(channel, cb): () => void` — pub/sub listener registration; returns an unsubscribe function. + +**Auto-failover** (`server-native`) +- New `RECACHED_FAILOVER_TIMEOUT` env var. When set on a replica, it starts a timer the first time the primary becomes unreachable. If the primary is still unreachable after the configured number of seconds, the replica promotes itself to primary and begins accepting writes. The timer resets on successful reconnect, so brief primary restarts do not trigger spurious promotion. + +**Replication backpressure** (`server-native`) +- Replica channels switched from `mpsc::UnboundedSender` to bounded `mpsc::Sender`. Channel capacity is controlled by `RECACHED_REPL_BUFFER` (default: `4096` frames). A replica that falls this many writes behind is disconnected and must reconnect from a fresh snapshot — the primary write path is never blocked by a slow replica. + +**Persistence hardening** (`core-engine`, `server-native`, `wasm-edge`) +- **Dirty counter** — `KeyValueStore` now tracks a `dirty: Arc` write counter. Every successful write command increments it; `save()` resets it to zero. Autosave skips the snapshot entirely when `dirty == 0`, so idle servers produce no disk I/O. +- **Multi-condition save policy (`RECACHED_SAVE`)** — replaces the single-interval `RECACHED_SAVE_INTERVAL` with a Redis-compatible multi-condition format: `"seconds:changes[,seconds:changes...]"`. A snapshot fires when any condition is satisfied (`elapsed >= secs && dirty >= changes`). `RECACHED_SAVE_INTERVAL` still works as a single-condition fallback for backward compatibility. Example: `RECACHED_SAVE="900:1,300:10,60:10000"`. +- **WAL compaction on load** (`wasm-edge`) — when `enable_persistence()` replays more than 1 000 WAL entries, it compacts in-place: clears IndexedDB, then writes the current in-memory state as minimal RESP commands (`SET PX` for string TTLs, `HSET`, `RPUSH`, `SADD`, `ZADD`, plus `PEXPIREAT` for collection TTLs). Next startup replays only N snapshot entries instead of the full write history. + +### Changed + +- Autosave loop now polls every second against save conditions rather than sleeping for the full interval. This allows multi-condition triggers (e.g. "10 000 writes in 60 s") to fire promptly. +- `ServerState::save()` calls `store.reset_dirty()` after a successful snapshot, ensuring the dirty counter accurately reflects only writes since the last save. + +--- + +## [0.1.4] — 2026-05-09 ### Added diff --git a/Cargo.lock b/Cargo.lock index 84760a0..d4e5ec8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,7 +107,7 @@ dependencies = [ [[package]] name = "core-engine" -version = "0.1.4" +version = "0.1.5" dependencies = [ "dashmap", "rand", @@ -975,7 +975,7 @@ dependencies = [ [[package]] name = "server-native" -version = "0.1.4" +version = "0.1.5" dependencies = [ "core-engine", "futures-util", @@ -1384,7 +1384,7 @@ dependencies = [ [[package]] name = "wasm-edge" -version = "0.1.4" +version = "0.1.5" dependencies = [ "core-engine", "getrandom 0.3.4", diff --git a/Cargo.toml b/Cargo.toml index 99c5d79..1283898 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ resolver = "2" # ── Single source of truth for all crate versions ──────────────────────────── # Members inherit with: version.workspace = true / edition.workspace = true [workspace.package] -version = "0.1.4" +version = "0.1.5" edition = "2024" license = "MIT" authors = ["ThinkGrid Labs"] diff --git a/core-engine/src/resp.rs b/core-engine/src/resp.rs index 4613be0..8cdd2c3 100644 --- a/core-engine/src/resp.rs +++ b/core-engine/src/resp.rs @@ -9,6 +9,9 @@ pub enum Value { Integer(i64), BulkString(Option>), Array(Option>), + /// RESP3 Push frame (`>N\r\n...`). Used for server-initiated out-of-band messages + /// (mutation fan-out, pub/sub) on the WebSocket channel. Never sent as a command response. + Push(Vec), } impl Value { @@ -46,6 +49,12 @@ impl Value { buf.extend_from_slice(&v.serialize()); } } + Value::Push(arr) => { + buf.extend_from_slice(format!(">{}\r\n", arr.len()).as_bytes()); + for v in arr { + buf.extend_from_slice(&v.serialize()); + } + } } buf } @@ -65,6 +74,7 @@ impl Value { b':' => Self::parse_integer(buffer), b'$' => Self::parse_bulk_string(buffer), b'*' => Self::parse_array(buffer, depth), + b'>' => Self::parse_push(buffer, depth), _ => Err("Invalid RESP type".to_string()), } } @@ -145,6 +155,34 @@ impl Value { } } + fn parse_push(buffer: &[u8], depth: usize) -> Result<(Value, usize), String> { + if depth >= MAX_ARRAY_DEPTH { + return Err("ERR max nesting depth exceeded".to_string()); + } + match Self::read_until_crlf(buffer) { + Some((data, mut offset)) => { + let s = String::from_utf8_lossy(data); + let count: u64 = s + .parse() + .map_err(|_| "Invalid push frame length".to_string())?; + if count as usize > MAX_ARRAY_ELEMENTS { + return Err(format!( + "ERR push frame too large ({} > {} elements)", + count, MAX_ARRAY_ELEMENTS + )); + } + let mut arr = Vec::with_capacity(count as usize); + for _ in 0..count { + let (val, len) = Self::parse_inner(&buffer[offset..], depth + 1)?; + arr.push(val); + offset += len; + } + Ok((Value::Push(arr), offset)) + } + None => Err("Incomplete".to_string()), + } + } + fn parse_array(buffer: &[u8], depth: usize) -> Result<(Value, usize), String> { if depth >= MAX_ARRAY_DEPTH { return Err("ERR max nesting depth exceeded".to_string()); @@ -286,6 +324,30 @@ mod tests { assert_eq!(consumed, 5); // "+OK\r\n" = 5 bytes } + #[test] + fn push_round_trip() { + round_trip(&Value::Push(vec![])); + round_trip(&Value::Push(vec![ + Value::BulkString(Some(b"SET".to_vec())), + Value::BulkString(Some(b"theme".to_vec())), + Value::BulkString(Some(b"dark".to_vec())), + ])); + round_trip(&Value::Push(vec![ + Value::BulkString(Some(b"message".to_vec())), + Value::BulkString(Some(b"alerts".to_vec())), + Value::BulkString(Some(b"hello".to_vec())), + ])); + } + + #[test] + fn push_prefix_distinct_from_array() { + let push = Value::Push(vec![Value::BulkString(Some(b"x".to_vec()))]).serialize(); + let arr = Value::Array(Some(vec![Value::BulkString(Some(b"x".to_vec()))])).serialize(); + assert_ne!(push, arr); + assert!(push.starts_with(b">")); + assert!(arr.starts_with(b"*")); + } + #[test] fn null_array_vs_empty_array() { let null = Value::Array(None).serialize(); diff --git a/core-engine/src/store.rs b/core-engine/src/store.rs index 31c17e5..4307009 100644 --- a/core-engine/src/store.rs +++ b/core-engine/src/store.rs @@ -5,6 +5,7 @@ use rand::seq::IteratorRandom; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; const WRONGTYPE: &str = "WRONGTYPE Operation against a key holding the wrong kind of value"; @@ -283,6 +284,7 @@ pub struct KeyValueStore { max_keys: Option, max_memory_bytes: Option, eviction_policy: EvictionPolicy, + dirty: Arc, } impl Default for KeyValueStore { @@ -298,6 +300,7 @@ impl KeyValueStore { max_keys: None, max_memory_bytes: None, eviction_policy: EvictionPolicy::NoEviction, + dirty: Arc::new(AtomicU64::new(0)), } } @@ -307,6 +310,7 @@ impl KeyValueStore { max_keys: Some(max), max_memory_bytes: None, eviction_policy: EvictionPolicy::NoEviction, + dirty: Arc::new(AtomicU64::new(0)), } } @@ -320,9 +324,27 @@ impl KeyValueStore { max_keys, max_memory_bytes, eviction_policy, + dirty: Arc::new(AtomicU64::new(0)), } } + /// Number of write commands applied since the last `reset_dirty()`. + pub fn dirty_count(&self) -> u64 { + self.dirty.load(Ordering::Relaxed) + } + + /// Reset the dirty counter to zero (call after a successful snapshot save). + pub fn reset_dirty(&self) { + self.dirty.store(0, Ordering::Relaxed); + } + + /// Increment the dirty counter by one. Called by the server after every + /// successful write command so the autosave loop can skip saves when + /// nothing has changed. + pub fn mark_dirty(&self) { + self.dirty.fetch_add(1, Ordering::Relaxed); + } + /// Approximate heap usage in bytes — key+value sizes plus a fixed overhead per entry. pub fn approximate_memory_bytes(&self) -> usize { self.data diff --git a/docs/guide/introduction.md b/docs/guide/introduction.md index 893c6d4..1b0d520 100644 --- a/docs/guide/introduction.md +++ b/docs/guide/introduction.md @@ -56,7 +56,7 @@ Recached is a good fit when: ## When Recached is not the right fit - **You need very high-durability persistence.** Recached supports snapshots (RDB-style) and AOF, but it is still primarily an in-memory cache. If you cannot tolerate any data loss between fsync intervals, a purpose-built database is the right tool. -- **You need automatic failover.** Recached supports leader–follower replication (`RECACHED_REPLICAOF`) and manual promotion (`REPLICAOF NO ONE`), but does not include sentinel-style automatic leader election. +- **You need multi-replica consensus failover.** Recached supports leader–follower replication with automatic single-replica failover (`RECACHED_FAILOVER_TIMEOUT`). If the primary is unreachable for the configured duration, the designated replica promotes itself. What it does not include is multi-replica quorum election: in a setup with several replicas, split-brain prevention requires you to designate one replica for auto-failover and keep the others as passive standbys. - **You depend on uncommon Redis commands.** Recached implements the commands most applications use, not all 250+. Server introspection (`INFO`, `SLOWLOG`, `COMMAND`), Lua scripting, RESP3, and cluster mode are out of scope. - **You need very large datasets.** Recached is an in-memory cache — it is not a database. If your working set does not fit in RAM, Redis with RDB persistence or a proper database is the right tool. @@ -68,7 +68,7 @@ Recached is a good fit when: | Browser-side cache | Yes — WASM | No | | WebSocket sync | Built-in | Not built-in | | Persistence | Snapshot + AOF | RDB + AOF | -| Replication | Primary/replica | Yes (+ Sentinel/Cluster) | +| Replication | Primary/replica + auto-failover | Yes (+ Sentinel/Cluster) | | Lua scripting | No (WASM scripting on roadmap) | Yes | | Cluster mode | No | Yes | | Command coverage | ~80 commands | 250+ | diff --git a/docs/server/configuration.md b/docs/server/configuration.md index 758f8fe..b3b0a76 100644 --- a/docs/server/configuration.md +++ b/docs/server/configuration.md @@ -12,13 +12,16 @@ Recached is configured entirely through environment variables. There is no confi | `RECACHED_EVICTION` | `noeviction` | Eviction policy when `RECACHED_MAX_KEYS` is reached. See eviction policies below. | | `RECACHED_METRICS_PORT` | `9091` | Port for the Prometheus metrics HTTP server. Metrics are available at `/metrics`. Set to `0` to disable. | | `RECACHED_SAVE_PATH` | `recached.rdb` | Path to the snapshot file. The server loads this file on startup and writes to it on `SAVE`, `BGSAVE`, autosave, and clean shutdown. | -| `RECACHED_SAVE_INTERVAL` | `900` | Autosave interval in seconds. The server automatically saves a snapshot in the background at this interval. Set to `0` to disable autosave (manual `SAVE`/`BGSAVE` still work). | +| `RECACHED_SAVE` | _(none)_ | Multi-condition autosave policy as comma-separated `seconds:changes` pairs. A snapshot is triggered when **any** condition is satisfied: `elapsed_since_last_save >= seconds` **and** `dirty_writes >= changes`. Example: `"900:1,300:10,60:10000"` — save after 1 write in 15 min, 10 writes in 5 min, or 10 000 writes in 1 min. When set, `RECACHED_SAVE_INTERVAL` is ignored. Skips saves when no writes have occurred since the last snapshot. | +| `RECACHED_SAVE_INTERVAL` | `900` | Autosave interval in seconds (single-condition fallback when `RECACHED_SAVE` is not set). The server saves automatically at this interval if at least one write has occurred since the last save. Set to `0` to disable autosave entirely (manual `SAVE`/`BGSAVE` still work). | | `RECACHED_AOF_PATH` | _(disabled)_ | Path to the append-only file. When set, every write command is appended to this file in addition to snapshot saves. On startup the snapshot is loaded first, then AOF commands are replayed for the delta. The AOF is truncated after each successful snapshot save. | | `RECACHED_AOF_SYNC` | `everysec` | AOF fsync policy. `always`: fsync after every write (safest, slowest). `everysec`: fsync once per second (default, good balance). `no`: let the OS decide (fastest, least safe). | | `RECACHED_MAX_CONNECTIONS` | `1024` | Maximum number of concurrent client connections (TCP + WebSocket combined). New connections are dropped when the limit is reached. | | `RECACHED_REPL_PORT` | `6381` | TCP port the primary listens on for incoming replica connections. Only active when `RECACHED_REPLICAOF` is not set (i.e. this server is a primary). | | `RECACHED_REPLICAOF` | _(none)_ | Set to `host:port` to run this server as a read-only replica. On startup it connects to the primary, receives a full snapshot, and then streams all subsequent writes. Reconnects automatically with exponential backoff on disconnect. | | `RECACHED_REPL_PASSWORD` | _(none)_ | Shared secret for the replication channel. When set, replicas must send this password during the handshake before receiving any data. Must match on both primary and replica. Strongly recommended for any network-exposed replication port. | +| `RECACHED_FAILOVER_TIMEOUT` | _(disabled)_ | Seconds a replica waits with the primary unreachable before automatically promoting itself to primary. Set on replicas only. `0` or unset disables auto-failover — the replica reconnects indefinitely. See [auto-failover](#auto-failover) below. | +| `RECACHED_REPL_BUFFER` | `4096` | Per-replica channel capacity (number of pending write frames buffered on the primary before a lagging replica is disconnected). A replica that falls this many writes behind is dropped and must reconnect from a fresh snapshot. Increase if replicas are on a consistently slow or high-latency link; decrease to reduce memory use per replica. | | `RECACHED_MAX_MEMORY` | _(unlimited)_ | Maximum approximate heap usage for the key-value store. Accepts a byte count or a human-readable suffix: `512mb`, `2gb`, `1073741824`. When the limit is exceeded, the background eviction loop runs the configured eviction policy. Has no effect when `RECACHED_EVICTION` is `noeviction`. | | `RECACHED_TLS_CERT` | _(none)_ | Path to a PEM-encoded TLS certificate file. TLS is enabled on both ports when this and `RECACHED_TLS_KEY` are set. | | `RECACHED_TLS_KEY` | _(none)_ | Path to a PEM-encoded TLS private key file. If either `RECACHED_TLS_CERT` or `RECACHED_TLS_KEY` is missing, the server falls back to plain TCP/WS. | @@ -157,8 +160,48 @@ recached-server Replicas reconnect automatically with exponential backoff (2s → 4s → … → 30s cap) if the primary is temporarily unavailable. Write commands sent to a replica return `-READONLY`. +Each replica has an internal write buffer (default 4096 frames, set by `RECACHED_REPL_BUFFER`). If a replica falls that many writes behind — due to a slow network or an overloaded replica host — it is disconnected and must reconnect from a fresh snapshot. This prevents a lagging replica from consuming unbounded memory on the primary or blocking the primary's write path. + To promote a replica to primary at runtime (manual failover), send `REPLICAOF NO ONE` over any RESP connection to the replica. It immediately starts accepting writes. +### With auto-failover {#auto-failover} + +Set `RECACHED_FAILOVER_TIMEOUT` on the replica. If the primary is unreachable for that many seconds, the replica promotes itself automatically without any manual intervention. + +```bash +# Primary +RECACHED_SAVE_PATH="/data/recached.rdb" \ +RECACHED_REPL_PORT="6381" \ +RECACHED_REPL_PASSWORD="repl-secret" \ +recached-server + +# Replica — promotes itself after 30s of primary being unreachable +RECACHED_REPLICAOF="primary-host:6381" \ +RECACHED_REPL_PASSWORD="repl-secret" \ +RECACHED_FAILOVER_TIMEOUT="30" \ +recached-server +``` + +**How the timer works.** The `RECACHED_FAILOVER_TIMEOUT` clock starts the first time a connect attempt fails or the live sync stream drops. It resets to zero as soon as the replica successfully reconnects to the primary. This means a brief primary restart (e.g. a rolling deploy) that completes before the timeout elapses does not trigger promotion. + +**Split-brain risk.** Auto-failover is safe in a single-replica setup. With multiple replicas, two replicas could both time out simultaneously and both promote — creating two independent primaries accepting diverging writes. To avoid this: + +- Use `RECACHED_FAILOVER_TIMEOUT` only on a designated standby replica, not all replicas. +- Keep the timeout long enough (≥ 2× your typical primary restart time) to avoid spurious promotion on routine restarts. +- After a failover event, update clients and other replicas to point at the new primary before bringing the old primary back online. + +### With multi-condition autosave (RECACHED_SAVE) + +Fine-grained save policy that triggers on the first matching condition: + +```bash +RECACHED_SAVE_PATH="/data/recached.rdb" \ +RECACHED_SAVE="900:1,300:10,60:10000" \ +recached-server +``` + +This matches Redis's default save policy: save after 1 write in 15 min, 10 writes in 5 min, or 10 000 writes in 1 min. Saves are skipped automatically when no writes have occurred since the last snapshot, so idle servers incur zero I/O. + ### With snapshot persistence By default the server saves a snapshot every 15 minutes to `recached.rdb` in the working directory. On startup it restores from that file automatically. diff --git a/recached-react/package.json b/recached-react/package.json index 3241662..2f04362 100644 --- a/recached-react/package.json +++ b/recached-react/package.json @@ -1,6 +1,6 @@ { "name": "@recached/react", - "version": "0.1.4", + "version": "0.1.5", "description": "Official React hooks for Recached \u2014 zero-latency reactive cache", "type": "module", "main": "./dist/index.js", diff --git a/recached-vue/package.json b/recached-vue/package.json index 651f314..af06503 100644 --- a/recached-vue/package.json +++ b/recached-vue/package.json @@ -1,7 +1,7 @@ { "name": "@recached/vue", - "version": "0.1.4", - "description": "Official Vue 3 composables for Recached — zero-latency reactive cache", + "version": "0.1.5", + "description": "Official Vue 3 composables for Recached \u2014 zero-latency reactive cache", "type": "module", "main": "./dist/index.js", "module": "./dist/index.js", diff --git a/server-native/src/main.rs b/server-native/src/main.rs index e1409aa..5bb915c 100644 --- a/server-native/src/main.rs +++ b/server-native/src/main.rs @@ -158,6 +158,8 @@ fn execute_and_record(store: &KeyValueStore, cmd: &Command) -> Value { counter!("recached_commands_total", "command" => name).increment(1); if matches!(response, Value::Error(_)) { counter!("recached_command_errors_total", "command" => name).increment(1); + } else if is_write_command(cmd) { + store.mark_dirty(); } if matches!(cmd, Command::Get(_)) { match &response { @@ -376,9 +378,15 @@ async fn replay_aof(store: &KeyValueStore, path: &std::path::Path) -> usize { // ── Replication ─────────────────────────────────────────────────────────────── -type ReplSender = mpsc::UnboundedSender>; +type ReplSender = mpsc::Sender>; type ReplRegistry = Arc>>; +/// Default per-replica channel capacity (number of pending write frames). +/// When a replica falls this many writes behind the primary it is disconnected +/// so it can reconnect and receive a fresh snapshot — the primary write path +/// is never blocked. +const DEFAULT_REPL_CHANNEL_CAPACITY: usize = 4096; + // ── Server state ────────────────────────────────────────────────────────────── struct ServerState { @@ -406,12 +414,22 @@ impl ServerState { } let bytes = resp.as_bytes().to_vec(); let mut reg = self.replicas.lock().await; - reg.retain(|tx| tx.send(bytes.clone()).is_ok()); + reg.retain(|tx| match tx.try_send(bytes.clone()) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Full(_)) => { + warn!( + "Replica fell too far behind (channel full) — disconnecting so it can resync" + ); + false + } + Err(mpsc::error::TrySendError::Closed(_)) => false, + }); } - /// Save snapshot then truncate AOF (snapshot subsumes the log). + /// Save snapshot, reset the dirty counter, then truncate AOF (snapshot subsumes the log). async fn save(&self, store: &KeyValueStore) { save_snapshot(store, &self.snap).await; + store.reset_dirty(); if let Some(aof) = &self.aof { aof.truncate().await; } @@ -468,6 +486,29 @@ fn is_write_command(cmd: &Command) -> bool { ) } +// ── Save conditions ─────────────────────────────────────────────────────────── + +/// A single autosave condition: save if `changes` or more writes have +/// accumulated within `secs` seconds of the last save. +struct SaveCondition { + secs: u64, + changes: u64, +} + +/// Parse `RECACHED_SAVE` value: comma-separated `seconds:changes` pairs. +/// Example: `"900:1,300:10,60:10000"` → save after 1 change in 15 min, +/// 10 changes in 5 min, or 10 000 changes in 1 min — whichever comes first. +fn parse_save_conditions(s: &str) -> Vec { + s.split(',') + .filter_map(|pair| { + let mut parts = pair.trim().splitn(2, ':'); + let secs: u64 = parts.next()?.trim().parse().ok()?; + let changes: u64 = parts.next()?.trim().parse().ok()?; + Some(SaveCondition { secs, changes }) + }) + .collect() +} + // ── Replication server (primary side) ──────────────────────────────────────── async fn run_repl_server( @@ -476,6 +517,7 @@ async fn run_repl_server( snap_cfg: Arc, replicas: ReplRegistry, repl_password: Option>, + repl_channel_capacity: usize, ) { let listener = match TcpListener::bind(format!("0.0.0.0:{}", port)).await { Ok(l) => l, @@ -494,7 +536,16 @@ async fn run_repl_server( let replicas = Arc::clone(&replicas); let pwd = repl_password.clone(); tokio::spawn(async move { - if let Err(e) = handle_replica(socket, store, snap_cfg, replicas, pwd).await { + if let Err(e) = handle_replica( + socket, + store, + snap_cfg, + replicas, + pwd, + repl_channel_capacity, + ) + .await + { info!("Replica {} disconnected: {}", addr, e); } }); @@ -510,6 +561,7 @@ async fn handle_replica( _snap_cfg: Arc, replicas: ReplRegistry, repl_password: Option>, + repl_channel_capacity: usize, ) -> std::io::Result<()> { // 0. Auth handshake — replica must send "\n" before anything else if let Some(pwd) = &repl_password { @@ -531,7 +583,7 @@ async fn handle_replica( } // 1. Register channel first so subsequent writes are buffered - let (tx, mut rx) = mpsc::unbounded_channel::>(); + let (tx, mut rx) = mpsc::channel::>(repl_channel_capacity); replicas.lock().await.push(tx); // 2. Take snapshot and send (writes since snapshot are in channel) @@ -557,22 +609,56 @@ async fn handle_replica( async fn run_repl_client( primary_addr: String, store: Arc, + state: Arc, repl_password: Option, + failover_timeout_secs: Option, ) { let mut backoff_secs = 2u64; + let mut unreachable_since: Option = None; + loop { + // Stop if already promoted (manual REPLICAOF NO ONE or earlier auto-promotion). + if !state.is_replica() { + return; + } + info!("Replica: connecting to primary at {}", primary_addr); match TcpStream::connect(&primary_addr).await { - Err(e) => warn!("Replica: connect failed: {}", e), + Err(e) => { + warn!("Replica: connect failed: {}", e); + unreachable_since.get_or_insert_with(std::time::Instant::now); + } Ok(mut socket) => { + // Primary is reachable — reset the unreachable timer. + unreachable_since = None; backoff_secs = 2; if let Err(e) = sync_from_primary(&mut socket, &store, repl_password.as_deref()).await { warn!("Replica: sync ended: {}", e); + // Sync dropped — primary may be gone; start tracking if not already. + unreachable_since.get_or_insert_with(std::time::Instant::now); } } } + + // Auto-failover: promote if primary has been unreachable long enough. + if let (Some(timeout), Some(since)) = (failover_timeout_secs, unreachable_since) { + let elapsed = since.elapsed().as_secs(); + if elapsed >= timeout { + warn!( + "Replica: primary unreachable for {}s (timeout {}s) — auto-promoting to primary", + elapsed, timeout + ); + state.promote_to_primary(); + return; + } + info!( + "Replica: primary unreachable for {}s / {}s before auto-failover", + elapsed, timeout + ); + } + tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await; backoff_secs = (backoff_secs * 2).min(30); } @@ -627,7 +713,13 @@ async fn sync_from_primary( match Value::parse(&cmd_bytes) { Ok((value, _)) => { - if let Ok(cmd) = Command::from_value(value) { + // Replication frames are broadcast as RESP3 Push (>N\r\n); normalise to + // Array so Command::from_value can parse them. + let normalised = match value { + Value::Push(inner) => Value::Array(Some(inner)), + other => other, + }; + if let Ok(cmd) = Command::from_value(normalised) { store.execute(cmd); } } @@ -847,22 +939,22 @@ fn notify_watchers( fn encode_pubsub_msg(msg: PubSubMsg) -> Vec { match msg { - PubSubMsg::Message { channel, message } => Value::Array(Some(vec![ + PubSubMsg::Message { channel, message } => Value::Push(vec![ Value::BulkString(Some(b"message".to_vec())), Value::BulkString(Some(channel.into_bytes())), Value::BulkString(Some(message.into_bytes())), - ])) + ]) .serialize(), PubSubMsg::PMessage { pattern, channel, message, - } => Value::Array(Some(vec![ + } => Value::Push(vec![ Value::BulkString(Some(b"pmessage".to_vec())), Value::BulkString(Some(pattern.into_bytes())), Value::BulkString(Some(channel.into_bytes())), Value::BulkString(Some(message.into_bytes())), - ])) + ]) .serialize(), } } @@ -894,9 +986,10 @@ fn glob_helper(pat: &[u8], s: &[u8]) -> bool { } } -/// Encodes a list of string parts as a RESP bulk-string array. -fn resp_command(parts: &[&str]) -> String { - let mut s = format!("*{}\r\n", parts.len()); +/// Encodes a list of string parts as a RESP3 Push frame for WebSocket fan-out. +/// Uses `>` prefix so clients can distinguish server-initiated pushes from command responses. +fn resp_push(parts: &[&str]) -> String { + let mut s = format!(">{}\r\n", parts.len()); for part in parts { s.push_str(&format!("${}\r\n{}\r\n", part.len(), part)); } @@ -915,31 +1008,31 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { return None; } match &opts.expiry { - None => Some(resp_command(&["SET", k, v])), + None => Some(resp_push(&["SET", k, v])), Some(SetExpiry::Ex(s)) => { let px = s.saturating_mul(1000).to_string(); - Some(resp_command(&["SET", k, v, "PX", &px])) + Some(resp_push(&["SET", k, v, "PX", &px])) } Some(SetExpiry::Px(ms)) => { let ms_s = ms.to_string(); - Some(resp_command(&["SET", k, v, "PX", &ms_s])) + Some(resp_push(&["SET", k, v, "PX", &ms_s])) } Some(SetExpiry::Exat(ts)) => { let pxat = ts.saturating_mul(1000).to_string(); - Some(resp_command(&["SET", k, v, "PXAT", &pxat])) + Some(resp_push(&["SET", k, v, "PXAT", &pxat])) } Some(SetExpiry::Pxat(ts)) => { let ts_s = ts.to_string(); - Some(resp_command(&["SET", k, v, "PXAT", &ts_s])) + Some(resp_push(&["SET", k, v, "PXAT", &ts_s])) } - Some(SetExpiry::KeepTtl) => Some(resp_command(&["SET", k, v, "KEEPTTL"])), + Some(SetExpiry::KeepTtl) => Some(resp_push(&["SET", k, v, "KEEPTTL"])), } } Command::Del(keys) | Command::Unlink(keys) => { let mut parts: Vec<&str> = vec!["DEL"]; let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&key_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } Command::MSet(pairs) => { let mut parts: Vec<&str> = vec!["MSET"]; @@ -949,75 +1042,75 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { .collect(); let flat_refs: Vec<&str> = flat.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&flat_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } Command::SetNx(k, v) => match response { - Value::Integer(1) => Some(resp_command(&["SET", k, v])), + Value::Integer(1) => Some(resp_push(&["SET", k, v])), _ => None, }, Command::SetEx(k, secs, v) => { let px = secs.saturating_mul(1000).to_string(); - Some(resp_command(&["SET", k, v, "PX", &px])) + Some(resp_push(&["SET", k, v, "PX", &px])) } Command::PSetEx(k, ms, v) => { let ms_s = ms.to_string(); - Some(resp_command(&["SET", k, v, "PX", &ms_s])) + Some(resp_push(&["SET", k, v, "PX", &ms_s])) } Command::Append(k, v) => match response { - Value::Integer(_) => Some(resp_command(&["APPEND", k, v])), + Value::Integer(_) => Some(resp_push(&["APPEND", k, v])), _ => None, }, - Command::GetSet(k, v) => Some(resp_command(&["SET", k, v])), + Command::GetSet(k, v) => Some(resp_push(&["SET", k, v])), Command::Incr(k) | Command::Decr(k) => match response { Value::Integer(n) => { let s = n.to_string(); - Some(resp_command(&["SET", k, &s])) + Some(resp_push(&["SET", k, &s])) } _ => None, }, Command::IncrBy(k, _) | Command::DecrBy(k, _) => match response { Value::Integer(n) => { let s = n.to_string(); - Some(resp_command(&["SET", k, &s])) + Some(resp_push(&["SET", k, &s])) } _ => None, }, Command::Expire(k, secs) => match response { Value::Integer(1) => { let ms = secs.saturating_mul(1000).to_string(); - Some(resp_command(&["PEXPIRE", k, &ms])) + Some(resp_push(&["PEXPIRE", k, &ms])) } _ => None, }, Command::PExpire(k, ms) => match response { Value::Integer(1) => { let ms_s = ms.to_string(); - Some(resp_command(&["PEXPIRE", k, &ms_s])) + Some(resp_push(&["PEXPIRE", k, &ms_s])) } _ => None, }, Command::ExpireAt(k, ts) => match response { Value::Integer(1) => { let ts_ms = ts.saturating_mul(1000).to_string(); - Some(resp_command(&["PEXPIREAT", k, &ts_ms])) + Some(resp_push(&["PEXPIREAT", k, &ts_ms])) } _ => None, }, Command::PExpireAt(k, ts) => match response { Value::Integer(1) => { let ts_s = ts.to_string(); - Some(resp_command(&["PEXPIREAT", k, &ts_s])) + Some(resp_push(&["PEXPIREAT", k, &ts_s])) } _ => None, }, Command::Persist(k) => match response { - Value::Integer(1) => Some(resp_command(&["PERSIST", k])), + Value::Integer(1) => Some(resp_push(&["PERSIST", k])), _ => None, }, - Command::FlushDb => Some(resp_command(&["FLUSHDB"])), + Command::FlushDb => Some(resp_push(&["FLUSHDB"])), Command::Rename(src, dst) => match response { Value::Error(_) => None, - _ => Some(resp_command(&["RENAME", src, dst])), + _ => Some(resp_push(&["RENAME", src, dst])), }, // ── Hash ───────────────────────────────────────────────────────────── @@ -1028,33 +1121,33 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { parts.push(v.clone()); } let refs: Vec<&str> = parts.iter().map(|s| s.as_str()).collect(); - Some(resp_command(&refs)) + Some(resp_push(&refs)) } Command::HDel(k, fields) => match response { Value::Integer(n) if *n > 0 => { let mut parts: Vec<&str> = vec!["HDEL", k]; let field_refs: Vec<&str> = fields.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&field_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } _ => None, }, Command::HIncrBy(k, f, _) => match response { Value::Integer(n) => { let s = n.to_string(); - Some(resp_command(&["HSET", k, f, &s])) + Some(resp_push(&["HSET", k, f, &s])) } _ => None, }, Command::HIncrByFloat(k, f, _) => match response { Value::BulkString(Some(data)) => { let s = String::from_utf8_lossy(data); - Some(resp_command(&["HSET", k, f, &s])) + Some(resp_push(&["HSET", k, f, &s])) } _ => None, }, Command::HSetNx(k, f, v) => match response { - Value::Integer(1) => Some(resp_command(&["HSET", k, f, v])), + Value::Integer(1) => Some(resp_push(&["HSET", k, f, v])), _ => None, }, @@ -1068,7 +1161,7 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { let mut parts: Vec<&str> = vec![cmd_name, k]; let val_refs: Vec<&str> = vals.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&val_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } Command::LPushX(k, vals) | Command::RPushX(k, vals) => match response { Value::Integer(n) if *n > 0 => { @@ -1080,7 +1173,7 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { let mut parts: Vec<&str> = vec![cmd_name, k]; let val_refs: Vec<&str> = vals.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&val_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } _ => None, }, @@ -1090,8 +1183,8 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { _ => { let n = count.map(|c| c.to_string()); match &n { - Some(ns) => Some(resp_command(&["LPOP", k, ns])), - None => Some(resp_command(&["LPOP", k])), + Some(ns) => Some(resp_push(&["LPOP", k, ns])), + None => Some(resp_push(&["LPOP", k])), } } }, @@ -1101,29 +1194,29 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { _ => { let n = count.map(|c| c.to_string()); match &n { - Some(ns) => Some(resp_command(&["RPOP", k, ns])), - None => Some(resp_command(&["RPOP", k])), + Some(ns) => Some(resp_push(&["RPOP", k, ns])), + None => Some(resp_push(&["RPOP", k])), } } }, Command::LSet(k, idx, v) => match response { Value::SimpleString(_) => { let idx_s = idx.to_string(); - Some(resp_command(&["LSET", k, &idx_s, v])) + Some(resp_push(&["LSET", k, &idx_s, v])) } _ => None, }, Command::LRem(k, count, elem) => match response { Value::Integer(n) if *n > 0 => { let count_s = count.to_string(); - Some(resp_command(&["LREM", k, &count_s, elem])) + Some(resp_push(&["LREM", k, &count_s, elem])) } _ => None, }, Command::LTrim(k, start, stop) => { let start_s = start.to_string(); let stop_s = stop.to_string(); - Some(resp_command(&["LTRIM", k, &start_s, &stop_s])) + Some(resp_push(&["LTRIM", k, &start_s, &stop_s])) } // ── Set ─────────────────────────────────────────────────────────────── @@ -1132,7 +1225,7 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { let mut parts: Vec<&str> = vec!["SADD", k]; let m_refs: Vec<&str> = members.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&m_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } _ => None, }, @@ -1141,7 +1234,7 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { let mut parts: Vec<&str> = vec!["SREM", k]; let m_refs: Vec<&str> = members.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&m_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } _ => None, }, @@ -1169,30 +1262,30 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { let mut parts: Vec<&str> = vec!["SREM", k]; let m_refs: Vec<&str> = popped.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&m_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } } Command::SMove(src, dst, member) => match response { - Value::Integer(1) => Some(resp_command(&["SMOVE", src, dst, member])), + Value::Integer(1) => Some(resp_push(&["SMOVE", src, dst, member])), _ => None, }, Command::SInterStore(dst, keys) => { let mut parts: Vec<&str> = vec!["SINTERSTORE", dst]; let k_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&k_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } Command::SUnionStore(dst, keys) => { let mut parts: Vec<&str> = vec!["SUNIONSTORE", dst]; let k_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&k_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } Command::SDiffStore(dst, keys) => { let mut parts: Vec<&str> = vec!["SDIFFSTORE", dst]; let k_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&k_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } // ── Sorted Set ──────────────────────────────────────────────────────── @@ -1215,20 +1308,20 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { parts.push(member.clone()); } let refs: Vec<&str> = parts.iter().map(|s| s.as_str()).collect(); - Some(resp_command(&refs)) + Some(resp_push(&refs)) } Command::ZRem(k, members) => match response { Value::Integer(n) if *n > 0 => { let mut parts: Vec<&str> = vec!["ZREM", k]; let m_refs: Vec<&str> = members.iter().map(|s| s.as_str()).collect(); parts.extend_from_slice(&m_refs); - Some(resp_command(&parts)) + Some(resp_push(&parts)) } _ => None, }, Command::ZIncrBy(k, delta, member) => { let delta_s = format_f64_score(*delta); - Some(resp_command(&["ZINCRBY", k, &delta_s, member])) + Some(resp_push(&["ZINCRBY", k, &delta_s, member])) } // Pub/Sub and transactions carry no store state — no broadcast needed. @@ -1373,10 +1466,25 @@ async fn main() -> Result<(), Box> { let save_path = PathBuf::from( std::env::var("RECACHED_SAVE_PATH").unwrap_or_else(|_| "recached.rdb".to_string()), ); - let save_interval_secs: u64 = std::env::var("RECACHED_SAVE_INTERVAL") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(900); + + // RECACHED_SAVE takes priority: "900:1,300:10,60:10000" (secs:changes pairs). + // Falls back to RECACHED_SAVE_INTERVAL (single-condition, 1 change required). + let save_conditions: Vec = if let Ok(s) = std::env::var("RECACHED_SAVE") { + parse_save_conditions(&s) + } else { + let interval: u64 = std::env::var("RECACHED_SAVE_INTERVAL") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(900); + if interval > 0 { + vec![SaveCondition { + secs: interval, + changes: 1, + }] + } else { + vec![] + } + }; load_snapshot(&store, &save_path).await; @@ -1440,6 +1548,15 @@ async fn main() -> Result<(), Box> { .and_then(|v| v.parse().ok()) .unwrap_or(6381); let repl_password: Option = std::env::var("RECACHED_REPL_PASSWORD").ok(); + let repl_channel_capacity: usize = std::env::var("RECACHED_REPL_BUFFER") + .ok() + .and_then(|v| v.parse().ok()) + .filter(|&n: &usize| n > 0) + .unwrap_or(DEFAULT_REPL_CHANNEL_CAPACITY); + let failover_timeout_secs: Option = std::env::var("RECACHED_FAILOVER_TIMEOUT") + .ok() + .and_then(|v| v.parse().ok()) + .filter(|&n| n > 0); if repl_password.is_some() { info!("Replication auth ENABLED (RECACHED_REPL_PASSWORD is set)."); @@ -1461,24 +1578,33 @@ async fn main() -> Result<(), Box> { }); // ── autosave ────────────────────────────────────────────────────────── - if save_interval_secs > 0 { + if !save_conditions.is_empty() { let store_snap = Arc::clone(&store); let state_snap = Arc::clone(&state); + let conditions = save_conditions; tokio::spawn(async move { - let mut interval = - tokio::time::interval(tokio::time::Duration::from_secs(save_interval_secs)); - interval.tick().await; // skip immediate first tick + let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(1)); + ticker.tick().await; // skip immediate first tick loop { - interval.tick().await; - state_snap.save(&store_snap).await; + ticker.tick().await; + let now = now_unix_secs(); + let last = state_snap.snap.last_save.load(Ordering::Relaxed); + let elapsed = now.saturating_sub(last).max(0) as u64; + let dirty = store_snap.dirty_count(); + if dirty > 0 + && conditions + .iter() + .any(|c| elapsed >= c.secs && dirty >= c.changes) + { + state_snap.save(&store_snap).await; + } } }); + info!("Autosave active → {:?}", snap_cfg.path); + } else { info!( - "Autosave every {}s → {:?}", - save_interval_secs, snap_cfg.path + "Autosave disabled (RECACHED_SAVE=0 or RECACHED_SAVE_INTERVAL=0). Use SAVE or BGSAVE manually." ); - } else { - info!("Autosave disabled (RECACHED_SAVE_INTERVAL=0). Use SAVE or BGSAVE manually."); } // ── start replication ───────────────────────────────────────────────── @@ -1487,16 +1613,28 @@ async fn main() -> Result<(), Box> { let snap_r = Arc::clone(&snap_cfg); let reg_r = Arc::clone(&replicas); let pwd_r = repl_password.clone().map(Arc::new); + let cap_r = repl_channel_capacity; tokio::spawn(async move { - run_repl_server(repl_port, store_r, snap_r, reg_r, pwd_r).await; + run_repl_server(repl_port, store_r, snap_r, reg_r, pwd_r, cap_r).await; }); } else if let Some(primary_addr) = replicaof { let store_r = Arc::clone(&store); + let state_r = Arc::clone(&state); let pwd_r = repl_password.clone(); + let fo_r = failover_timeout_secs; tokio::spawn(async move { - run_repl_client(primary_addr, store_r, pwd_r).await; + run_repl_client(primary_addr, store_r, state_r, pwd_r, fo_r).await; }); - info!("Running as replica — write commands will be rejected"); + if let Some(t) = failover_timeout_secs { + info!( + "Running as replica — auto-failover enabled (promotes after {}s of primary being unreachable)", + t + ); + } else { + info!( + "Running as replica — write commands will be rejected (auto-failover disabled; set RECACHED_FAILOVER_TIMEOUT to enable)" + ); + } } // ── background eviction ─────────────────────────────────────────────── @@ -2326,12 +2464,154 @@ mod tests { use core_engine::cmd::{SetOptions, ZAddOptions}; use core_engine::resp::Value; use core_engine::store::KeyValueStore; - use std::sync::atomic::AtomicI64; + use std::sync::atomic::{AtomicBool, AtomicI64}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; fn tmp_path(name: &str) -> PathBuf { std::env::temp_dir().join(format!("recached_test_{name}_{}", std::process::id())) } + // ── TestServer harness ──────────────────────────────────────────────────── + + struct TestServer { + pub tcp_addr: std::net::SocketAddr, + pub store: Arc, + pub state: Arc, + _task: tokio::task::JoinHandle<()>, + } + + impl Drop for TestServer { + fn drop(&mut self) { + self._task.abort(); + } + } + + async fn spawn_server() -> TestServer { + spawn_server_cfg(None, None, false).await + } + + async fn spawn_server_cfg( + password: Option<&str>, + snap_path: Option, + start_as_replica: bool, + ) -> TestServer { + let store = Arc::new(KeyValueStore::new()); + let (tx, _rx) = broadcast::channel::<(u64, String)>(256); + let pubsub: SharedPubSub = Arc::new(Mutex::new(PubSubHub::new())); + let watch_registry: WatchRegistry = Arc::new(Mutex::new(HashMap::new())); + let semaphore = Arc::new(Semaphore::new(64)); + let snap_cfg = Arc::new(SnapshotConfig { + path: snap_path.unwrap_or_else(|| tmp_path("test.rdb")), + last_save: AtomicI64::new(now_unix_secs()), + }); + let state = Arc::new(ServerState { + snap: snap_cfg, + aof: None, + replicas: Arc::new(tokio::sync::Mutex::new(vec![])), + is_replica: AtomicBool::new(start_as_replica), + }); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let store2 = Arc::clone(&store); + let state2 = Arc::clone(&state); + let pass = Arc::new(password.map(|s| s.to_string())); + + let task = tokio::spawn(async move { + loop { + let Ok((socket, _)) = listener.accept().await else { + return; + }; + let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else { + continue; + }; + let (s, t, p, ps, wr, st) = ( + Arc::clone(&store2), + tx.clone(), + Arc::clone(&pass), + Arc::clone(&pubsub), + Arc::clone(&watch_registry), + Arc::clone(&state2), + ); + tokio::spawn(async move { + handle_tcp(socket, s, t, p, ps, wr, st).await; + drop(permit); + }); + } + }); + + TestServer { + tcp_addr: addr, + store, + state, + _task: task, + } + } + + // ── RespClient ──────────────────────────────────────────────────────────── + + struct RespClient { + stream: TcpStream, + buf: Vec, + filled: usize, + } + + impl RespClient { + async fn connect(addr: std::net::SocketAddr) -> Self { + Self { + stream: TcpStream::connect(addr).await.unwrap(), + buf: vec![0u8; 65536], + filled: 0, + } + } + + async fn cmd(&mut self, args: &[&str]) -> Value { + let mut req = format!("*{}\r\n", args.len()); + for a in args { + req.push_str(&format!("${}\r\n{}\r\n", a.len(), a)); + } + self.stream.write_all(req.as_bytes()).await.unwrap(); + loop { + match Value::parse(&self.buf[..self.filled]) { + Ok((val, n)) => { + self.buf.copy_within(n..self.filled, 0); + self.filled -= n; + return val; + } + Err(ref e) if e == "Incomplete" => { + let n = self + .stream + .read(&mut self.buf[self.filled..]) + .await + .unwrap(); + assert!(n > 0, "server closed connection unexpectedly"); + self.filled += n; + } + Err(e) => panic!("RESP parse error: {e}"), + } + } + } + + async fn read_until_closed(&mut self) { + let mut buf = [0u8; 64]; + while self.stream.read(&mut buf).await.unwrap_or(0) > 0 {} + } + } + + fn ok() -> Value { + Value::SimpleString("OK".to_string()) + } + fn nil() -> Value { + Value::BulkString(None) + } + fn bulk(s: &str) -> Value { + Value::BulkString(Some(s.as_bytes().to_vec())) + } + fn int(n: i64) -> Value { + Value::Integer(n) + } + // ── is_write_command ────────────────────────────────────────────────────── #[test] @@ -2440,4 +2720,648 @@ mod tests { assert_eq!(len_after, 0); let _ = tokio::fs::remove_file(&path).await; } + + // ── Integration: 3a basic commands ─────────────────────────────────────── + + #[tokio::test] + async fn integration_set_get_del() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["SET", "k", "v"]).await, ok()); + assert_eq!(c.cmd(&["GET", "k"]).await, bulk("v")); + assert_eq!(c.cmd(&["GET", "missing"]).await, nil()); + assert_eq!(c.cmd(&["DEL", "k"]).await, int(1)); + assert_eq!(c.cmd(&["GET", "k"]).await, nil()); + assert_eq!(c.cmd(&["DEL", "k"]).await, int(0)); // already gone + } + + #[tokio::test] + async fn integration_incr_and_expiry() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["SET", "n", "10"]).await, ok()); + assert_eq!(c.cmd(&["INCR", "n"]).await, int(11)); + assert_eq!(c.cmd(&["INCRBY", "n", "4"]).await, int(15)); + assert_eq!(c.cmd(&["DECR", "n"]).await, int(14)); + + // TTL: set a key with 1-second expiry and verify TTL and eventual expiry + assert_eq!(c.cmd(&["SET", "ex", "val", "EX", "1"]).await, ok()); + let ttl = c.cmd(&["TTL", "ex"]).await; + assert!(matches!(ttl, Value::Integer(1) | Value::Integer(0))); + tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await; + assert_eq!(c.cmd(&["GET", "ex"]).await, nil()); + } + + #[tokio::test] + async fn integration_string_commands() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + // APPEND + STRLEN + assert_eq!(c.cmd(&["APPEND", "s", "hello"]).await, int(5)); + assert_eq!(c.cmd(&["APPEND", "s", " world"]).await, int(11)); + assert_eq!(c.cmd(&["STRLEN", "s"]).await, int(11)); + + // GETSET + assert_eq!(c.cmd(&["GETSET", "s", "new"]).await, bulk("hello world")); + assert_eq!(c.cmd(&["GET", "s"]).await, bulk("new")); + + // SETNX + assert_eq!(c.cmd(&["SETNX", "nx", "first"]).await, int(1)); + assert_eq!(c.cmd(&["SETNX", "nx", "second"]).await, int(0)); + assert_eq!(c.cmd(&["GET", "nx"]).await, bulk("first")); + + // SETEX + assert_eq!(c.cmd(&["SETEX", "ex", "60", "val"]).await, ok()); + let ttl = c.cmd(&["TTL", "ex"]).await; + assert!(matches!(ttl, Value::Integer(t) if t > 0 && t <= 60)); + + // MSET / MGET + assert_eq!(c.cmd(&["MSET", "a", "1", "b", "2", "c", "3"]).await, ok()); + let got = c.cmd(&["MGET", "a", "b", "c", "missing"]).await; + assert_eq!( + got, + Value::Array(Some(vec![bulk("1"), bulk("2"), bulk("3"), nil()])) + ); + } + + #[tokio::test] + async fn integration_hash_commands() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["HSET", "h", "f1", "v1", "f2", "v2"]).await, int(2)); + assert_eq!(c.cmd(&["HGET", "h", "f1"]).await, bulk("v1")); + assert_eq!(c.cmd(&["HGET", "h", "missing"]).await, nil()); + assert_eq!(c.cmd(&["HLEN", "h"]).await, int(2)); + assert_eq!(c.cmd(&["HDEL", "h", "f1"]).await, int(1)); + assert_eq!(c.cmd(&["HLEN", "h"]).await, int(1)); + // HGETALL returns field-value pairs + let all = c.cmd(&["HGETALL", "h"]).await; + assert_eq!(all, Value::Array(Some(vec![bulk("f2"), bulk("v2")]))); + } + + #[tokio::test] + async fn integration_list_commands() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["RPUSH", "l", "a", "b", "c"]).await, int(3)); + assert_eq!(c.cmd(&["LPUSH", "l", "z"]).await, int(4)); + assert_eq!(c.cmd(&["LLEN", "l"]).await, int(4)); + assert_eq!( + c.cmd(&["LRANGE", "l", "0", "-1"]).await, + Value::Array(Some(vec![bulk("z"), bulk("a"), bulk("b"), bulk("c")])) + ); + assert_eq!(c.cmd(&["LPOP", "l"]).await, bulk("z")); + assert_eq!(c.cmd(&["RPOP", "l"]).await, bulk("c")); + assert_eq!(c.cmd(&["LLEN", "l"]).await, int(2)); + } + + #[tokio::test] + async fn integration_set_commands() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["SADD", "s", "a", "b", "c"]).await, int(3)); + assert_eq!(c.cmd(&["SADD", "s", "a"]).await, int(0)); // duplicate + assert_eq!(c.cmd(&["SCARD", "s"]).await, int(3)); + assert_eq!(c.cmd(&["SISMEMBER", "s", "b"]).await, int(1)); + assert_eq!(c.cmd(&["SISMEMBER", "s", "x"]).await, int(0)); + assert_eq!(c.cmd(&["SREM", "s", "a"]).await, int(1)); + assert_eq!(c.cmd(&["SCARD", "s"]).await, int(2)); + } + + #[tokio::test] + async fn integration_zset_commands() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!( + c.cmd(&["ZADD", "z", "1.5", "a", "2.5", "b", "3.0", "c"]) + .await, + int(3) + ); + assert_eq!(c.cmd(&["ZCARD", "z"]).await, int(3)); + assert_eq!(c.cmd(&["ZSCORE", "z", "b"]).await, bulk("2.5")); + assert_eq!(c.cmd(&["ZRANK", "z", "a"]).await, int(0)); + assert_eq!(c.cmd(&["ZRANK", "z", "c"]).await, int(2)); + assert_eq!( + c.cmd(&["ZRANGE", "z", "0", "-1", "WITHSCORES"]).await, + Value::Array(Some(vec![ + bulk("a"), + bulk("1.5"), + bulk("b"), + bulk("2.5"), + bulk("c"), + bulk("3"), + ])) + ); + assert_eq!(c.cmd(&["ZREM", "z", "b"]).await, int(1)); + assert_eq!(c.cmd(&["ZCARD", "z"]).await, int(2)); + } + + #[tokio::test] + async fn integration_transactions_exec() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["SET", "counter", "10"]).await, ok()); + assert_eq!(c.cmd(&["MULTI"]).await, ok()); + assert_eq!( + c.cmd(&["SET", "counter", "20"]).await, + Value::SimpleString("QUEUED".to_string()) + ); + assert_eq!( + c.cmd(&["INCR", "counter"]).await, + Value::SimpleString("QUEUED".to_string()) + ); + let res = c.cmd(&["EXEC"]).await; + assert_eq!(res, Value::Array(Some(vec![ok(), int(21)]))); + assert_eq!(c.cmd(&["GET", "counter"]).await, bulk("21")); + } + + #[tokio::test] + async fn integration_transactions_discard() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["SET", "key", "original"]).await, ok()); + assert_eq!(c.cmd(&["MULTI"]).await, ok()); + assert_eq!( + c.cmd(&["DEL", "key"]).await, + Value::SimpleString("QUEUED".to_string()) + ); + assert_eq!(c.cmd(&["DISCARD"]).await, ok()); + assert_eq!(c.cmd(&["GET", "key"]).await, bulk("original")); // DEL was discarded + } + + #[tokio::test] + async fn integration_unknown_command() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + let r = c.cmd(&["NOTACOMMAND", "arg"]).await; + assert!(matches!(r, Value::Error(_))); + } + + // ── Integration: 3b auth ────────────────────────────────────────────────── + + #[tokio::test] + async fn integration_auth_blocks_unauthenticated() { + let srv = spawn_server_cfg(Some("secret"), None, false).await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + let r = c.cmd(&["SET", "k", "v"]).await; + assert!(matches!(&r, Value::Error(e) if e.contains("NOAUTH"))); + } + + #[tokio::test] + async fn integration_auth_correct() { + let srv = spawn_server_cfg(Some("secret"), None, false).await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["AUTH", "secret"]).await, ok()); + assert_eq!(c.cmd(&["SET", "k", "v"]).await, ok()); + assert_eq!(c.cmd(&["GET", "k"]).await, bulk("v")); + } + + #[tokio::test] + async fn integration_auth_wrong_password_lockout() { + let srv = spawn_server_cfg(Some("secret"), None, false).await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + // First 4 wrong attempts → "ERR invalid password" + for _ in 0..4 { + let r = c.cmd(&["AUTH", "wrong"]).await; + assert!(matches!(&r, Value::Error(e) if e.contains("invalid"))); + } + // 5th attempt hits MAX_AUTH_FAILURES → "too many" + server disconnects + let r = c.cmd(&["AUTH", "wrong"]).await; + assert!(matches!(&r, Value::Error(e) if e.contains("too many"))); + c.read_until_closed().await; + } + + // ── Integration: 3c persistence ─────────────────────────────────────────── + + #[tokio::test] + async fn integration_save_and_reload() { + let snap = tmp_path("integ_snap.rdb"); + let srv = spawn_server_cfg(None, Some(snap.clone()), false).await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(c.cmd(&["SET", "hello", "world"]).await, ok()); + assert_eq!(c.cmd(&["SET", "foo", "bar"]).await, ok()); + assert_eq!(c.cmd(&["SAVE"]).await, ok()); + + // Load into a fresh store + let store2 = KeyValueStore::new(); + let loaded = load_snapshot(&store2, &snap).await; + assert!(loaded); + assert_eq!( + store2.execute(Command::Get("hello".into())), + Value::BulkString(Some(b"world".to_vec())) + ); + assert_eq!( + store2.execute(Command::Get("foo".into())), + Value::BulkString(Some(b"bar".to_vec())) + ); + let _ = tokio::fs::remove_file(&snap).await; + } + + #[tokio::test] + async fn integration_aof_replay() { + let path = tmp_path("integ_aof.aof"); + let aof = AofWriter::open(path.clone(), AofSync::No).await.unwrap(); + let store = KeyValueStore::new(); + let snap_cfg = Arc::new(SnapshotConfig { + path: tmp_path("integ_aof.rdb"), + last_save: AtomicI64::new(0), + }); + let state = Arc::new(ServerState { + snap: snap_cfg, + aof: Some(Arc::new(aof)), + replicas: Arc::new(tokio::sync::Mutex::new(vec![])), + is_replica: AtomicBool::new(false), + }); + + // Simulate writes captured by AOF + state + .on_write("*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n") + .await; + state + .on_write("*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n") + .await; + if let Some(ref a) = state.aof { + a.flush().await; + } + + // Replay into fresh store + let store2 = KeyValueStore::new(); + let count = replay_aof(&store2, &path).await; + assert_eq!(count, 2); + assert_eq!( + store2.execute(Command::Get("hello".into())), + Value::BulkString(Some(b"world".to_vec())) + ); + drop(store); // suppress unused warning + let _ = tokio::fs::remove_file(&path).await; + } + + #[tokio::test] + async fn integration_dirty_counter() { + let srv = spawn_server().await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + assert_eq!(srv.store.dirty_count(), 0); + + assert_eq!(c.cmd(&["SET", "a", "1"]).await, ok()); + assert_eq!(c.cmd(&["SET", "b", "2"]).await, ok()); + assert_eq!(srv.store.dirty_count(), 2); + + let last_save_before = srv.state.snap.last_save.load(Ordering::Relaxed); + + // Trigger a save — dirty resets to 0 + assert_eq!(c.cmd(&["SAVE"]).await, ok()); + assert_eq!(srv.store.dirty_count(), 0); + + // No new writes → save condition not met → last_save unchanged after 1s + tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await; + let last_save_after = srv.state.snap.last_save.load(Ordering::Relaxed); + assert_eq!(last_save_before, last_save_after); // no autosave fired (no conditions configured) + } + + // ── Integration: 3d replication ─────────────────────────────────────────── + + #[tokio::test] + async fn integration_replica_rejects_writes() { + let srv = spawn_server_cfg(None, None, true).await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + let r = c.cmd(&["SET", "k", "v"]).await; + assert!(matches!(&r, Value::Error(e) if e.contains("READONLY"))); + // Reads still work + assert_eq!(c.cmd(&["GET", "k"]).await, nil()); + } + + #[tokio::test] + async fn integration_replicaof_no_one_promotes() { + let srv = spawn_server_cfg(None, None, true).await; + let mut c = RespClient::connect(srv.tcp_addr).await; + + // Promote + assert_eq!(c.cmd(&["REPLICAOF", "NO", "ONE"]).await, ok()); + // Now writes are accepted + assert_eq!(c.cmd(&["SET", "k", "v"]).await, ok()); + assert_eq!(c.cmd(&["GET", "k"]).await, bulk("v")); + assert!(!srv.state.is_replica()); + } + + #[tokio::test] + async fn integration_replica_receives_write() { + // Spawn primary with a separate replication listener on a random port + let primary = spawn_server().await; + let repl_registry: ReplRegistry = Arc::new(tokio::sync::Mutex::new(vec![])); + let snap_cfg = Arc::clone(&primary.state.snap); + let primary_store = Arc::clone(&primary.store); + let reg = Arc::clone(&repl_registry); + + // Replication listener — binds on port 0 + let repl_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let repl_port = repl_listener.local_addr().unwrap().port(); + tokio::spawn(async move { + while let Ok((socket, _)) = repl_listener.accept().await { + let s = Arc::clone(&primary_store); + let sc = Arc::clone(&snap_cfg); + let r = Arc::clone(®); + tokio::spawn(handle_replica( + socket, + s, + sc, + r, + None, + DEFAULT_REPL_CHANNEL_CAPACITY, + )); + } + }); + + // Also wire the repl_registry into the primary state so on_write fans out + // We can't replace state.replicas (it's private), but handle_replica adds + // itself to the registry it receives. We pass the same repl_registry to + // on_write via a workaround: patch primary state's replicas after the fact + // by passing the same Arc. Since ServerState.replicas is private in our + // TestServer, we re-use the one we created. + // ── Simpler approach: replace on_write path by sharing registry ── + // Instead, wire it through the primary ServerState directly. + // (In practice the TestServer shares state.replicas which starts empty; + // handle_replica will push its sender into it when it connects.) + // The trick: we need primary.state.replicas to point to our repl_registry. + // Since TestServer.state is Arc, we can't replace it. + // Use a fresh primary state that shares our registry. + let primary2 = { + let store = Arc::clone(&primary.store); + let (tx, _rx) = broadcast::channel::<(u64, String)>(256); + let pubsub: SharedPubSub = Arc::new(Mutex::new(PubSubHub::new())); + let wr: WatchRegistry = Arc::new(Mutex::new(HashMap::new())); + let sem = Arc::new(Semaphore::new(64)); + let snap = Arc::clone(&primary.state.snap); + let state = Arc::new(ServerState { + snap, + aof: None, + replicas: Arc::clone(&repl_registry), + is_replica: AtomicBool::new(false), + }); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let store2 = Arc::clone(&store); + let state2 = Arc::clone(&state); + let pass = Arc::new(None::); + let task = tokio::spawn(async move { + loop { + let Ok((socket, _)) = listener.accept().await else { + return; + }; + let Ok(permit) = Arc::clone(&sem).try_acquire_owned() else { + continue; + }; + let (s, t, p, ps, wrr, st) = ( + Arc::clone(&store2), + tx.clone(), + Arc::clone(&pass), + Arc::clone(&pubsub), + Arc::clone(&wr), + Arc::clone(&state2), + ); + tokio::spawn(async move { + handle_tcp(socket, s, t, p, ps, wrr, st).await; + drop(permit); + }); + } + }); + TestServer { + tcp_addr: addr, + store, + state, + _task: task, + } + }; + + // Start replica + let replica_store = Arc::new(KeyValueStore::new()); + let replica_state = Arc::new(ServerState { + snap: Arc::new(SnapshotConfig { + path: tmp_path("repl_snap.rdb"), + last_save: AtomicI64::new(0), + }), + aof: None, + replicas: Arc::new(tokio::sync::Mutex::new(vec![])), + is_replica: AtomicBool::new(true), + }); + let rs = Arc::clone(&replica_store); + let rst = Arc::clone(&replica_state); + let repl_addr = format!("127.0.0.1:{repl_port}"); + tokio::spawn(async move { + run_repl_client(repl_addr, rs, rst, None, None).await; + }); + + // Give replica time to connect and receive initial snapshot + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + + // Write to primary2 (which uses the shared repl_registry) + let mut c = RespClient::connect(primary2.tcp_addr).await; + assert_eq!(c.cmd(&["SET", "replkey", "replval"]).await, ok()); + + // Give replication fan-out time to arrive + tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; + + assert_eq!( + replica_store.execute(Command::Get("replkey".into())), + Value::BulkString(Some(b"replval".to_vec())) + ); + } + + // ── Integration: 3e load (ignored in normal CI) ─────────────────────────── + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[ignore] + async fn integration_concurrent_writers() { + let srv = Arc::new(spawn_server().await); + let addr = srv.tcp_addr; + + let tasks: Vec<_> = (0..50) + .map(|task_id| { + tokio::spawn(async move { + let mut c = RespClient::connect(addr).await; + for i in 0..100u32 { + let key = format!("t{task_id}_{i}"); + let val = format!("v{i}"); + assert_eq!(c.cmd(&["SET", &key, &val]).await, ok()); + assert_eq!(c.cmd(&["GET", &key]).await, bulk(&val)); + } + }) + }) + .collect(); + + for t in tasks { + t.await.unwrap(); + } + // All 50 × 100 keys should be present + assert_eq!(srv.store.execute(Command::DbSize), Value::Integer(5000)); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[ignore] + async fn integration_connection_limit() { + // Small semaphore: only 3 concurrent connections + let store = Arc::new(KeyValueStore::new()); + let (tx, _rx) = broadcast::channel::<(u64, String)>(16); + let pubsub: SharedPubSub = Arc::new(Mutex::new(PubSubHub::new())); + let watch_registry: WatchRegistry = Arc::new(Mutex::new(HashMap::new())); + let semaphore = Arc::new(Semaphore::new(3)); + let state = Arc::new(ServerState { + snap: Arc::new(SnapshotConfig { + path: tmp_path("conn_limit.rdb"), + last_save: AtomicI64::new(0), + }), + aof: None, + replicas: Arc::new(tokio::sync::Mutex::new(vec![])), + is_replica: AtomicBool::new(false), + }); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let store2 = Arc::clone(&store); + let state2 = Arc::clone(&state); + let pass = Arc::new(None::); + + tokio::spawn(async move { + loop { + let Ok((socket, _)) = listener.accept().await else { + return; + }; + let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else { + // Drop socket immediately — connection limit reached + drop(socket); + continue; + }; + let (s, t, p, ps, wr, st) = ( + Arc::clone(&store2), + tx.clone(), + Arc::clone(&pass), + Arc::clone(&pubsub), + Arc::clone(&watch_registry), + Arc::clone(&state2), + ); + tokio::spawn(async move { + handle_tcp(socket, s, t, p, ps, wr, st).await; + drop(permit); + }); + } + }); + + // Open 3 connections and hold them (just send PING and keep the socket open) + let mut holders = Vec::new(); + for _ in 0..3 { + let mut c = RespClient::connect(addr).await; + assert_eq!( + c.cmd(&["PING"]).await, + Value::SimpleString("PONG".to_string()) + ); + holders.push(c); + } + + // 4th connection: server drops it immediately, so read returns 0 + let mut overflow = TcpStream::connect(addr).await.unwrap(); + let mut buf = [0u8; 64]; + let n = overflow.read(&mut buf).await.unwrap_or(0); + assert_eq!(n, 0, "4th connection should have been closed by server"); + + drop(holders); + } + + // ── Integration: 3f chaos (ignored in normal CI) ────────────────────────── + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[ignore] + async fn integration_kill_primary_mid_write() { + let srv = Arc::new(spawn_server().await); + let addr = srv.tcp_addr; + + // Start 20 concurrent writers + let tasks: Vec<_> = (0..20) + .map(|i| { + tokio::spawn(async move { + // Connect; tolerate connection errors (server may die mid-flight) + let stream = TcpStream::connect(addr).await; + if stream.is_err() { + return; + } + let mut c = RespClient { + stream: stream.unwrap(), + buf: vec![0u8; 65536], + filled: 0, + }; + for j in 0..50u32 { + let key = format!("chaos_{i}_{j}"); + // Ignore errors — server may die during this + let _ = tokio::time::timeout( + tokio::time::Duration::from_millis(200), + c.cmd(&["SET", &key, "v"]), + ) + .await; + } + }) + }) + .collect(); + + // Kill the server after 10ms + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + srv._task.abort(); + + // Join all writers — none should panic + for t in tasks { + let _ = t.await; + } + + // Store is still intact in memory — no panic is the meaningful assertion here; + // zero keys is valid if the server was killed before any write landed. + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[ignore] + async fn integration_failover_promotes() { + // Point replica at a port that refuses connections immediately so the + // unreachable timer starts on the first loop iteration without any + // real primary required. Promotion happens after: + // connect fail (fast) → backoff 2s → connect fail → elapsed ≥ 1s → promote + // so we wait 3s to be safe. + let replica_state = Arc::new(ServerState { + snap: Arc::new(SnapshotConfig { + path: tmp_path("failover_snap.rdb"), + last_save: AtomicI64::new(0), + }), + aof: None, + replicas: Arc::new(tokio::sync::Mutex::new(vec![])), + is_replica: AtomicBool::new(true), + }); + let replica_store = Arc::new(KeyValueStore::new()); + let rs = Arc::clone(&replica_store); + let rst = Arc::clone(&replica_state); + // Bind a listener then immediately drop it so the port is known-refused + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let dead_addr = format!("127.0.0.1:{}", listener.local_addr().unwrap().port()); + drop(listener); + tokio::spawn(async move { + run_repl_client(dead_addr, rs, rst, None, Some(1)).await; + }); + + // Wait for 2 backoff cycles (initial fail + 2s sleep + retry fail → promote) + tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; + + assert!( + !replica_state.is_replica(), + "replica should have promoted after primary was unreachable for >1s" + ); + } } diff --git a/wasm-edge/package.json b/wasm-edge/package.json index 0ce8da7..299115f 100644 --- a/wasm-edge/package.json +++ b/wasm-edge/package.json @@ -1,7 +1,7 @@ { "name": "recached-edge", "description": "Browser and edge WebAssembly client for Recached \u2014 zero-latency local cache with automatic server sync", - "version": "0.1.4", + "version": "0.1.5", "type": "module", "main": "sdk.js", "module": "sdk.js", diff --git a/wasm-edge/src/lib.rs b/wasm-edge/src/lib.rs index c16b028..6590f51 100644 --- a/wasm-edge/src/lib.rs +++ b/wasm-edge/src/lib.rs @@ -1,6 +1,6 @@ use core_engine::cmd::{Command, SetOptions}; use core_engine::resp::Value; -use core_engine::store::KeyValueStore; +use core_engine::store::{KeyValueStore, SnapshotEntry, SnapshotValue}; use js_sys::Promise; use std::cell::{Cell, RefCell}; use std::rc::Rc; @@ -73,6 +73,113 @@ fn to_resp(parts: &[&str]) -> String { s } +fn to_resp_owned(parts: &[String]) -> String { + let refs: Vec<&str> = parts.iter().map(|s| s.as_str()).collect(); + to_resp(&refs) +} + +// ── WAL compaction ──────────────────────────────────────────────────────────── + +/// Compact after this many replayed WAL entries on load — rewrite as minimal +/// snapshot commands so the next replay is fast regardless of write history. +const WAL_COMPACT_THRESHOLD: u32 = 1000; + +fn format_zset_score(s: f64) -> String { + if s == f64::INFINITY { + "inf".into() + } else if s == f64::NEG_INFINITY { + "-inf".into() + } else if s.fract() == 0.0 && s.abs() < 1e15 { + format!("{}", s as i64) + } else { + format!("{}", s) + } +} + +/// Convert snapshot entries into minimal RESP command strings suitable for +/// storing in the WAL. Each entry produces one command; entries with a TTL on +/// collection types produce an extra PEXPIREAT command. +fn snapshot_to_resp_cmds(entries: &[SnapshotEntry]) -> Vec { + use std::time::{SystemTime, UNIX_EPOCH}; + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + let mut out = Vec::new(); + for e in entries { + if e.expires_at_ms.is_some_and(|exp| now_ms >= exp) { + continue; + } + let data_parts: Vec = match &e.value { + SnapshotValue::Str(s) => { + if let Some(exp) = e.expires_at_ms { + let rem_ms = exp.saturating_sub(now_ms); + vec![ + "SET".into(), + e.key.clone(), + s.clone(), + "PX".into(), + rem_ms.to_string(), + ] + } else { + vec!["SET".into(), e.key.clone(), s.clone()] + } + } + SnapshotValue::Hash(map) => { + if map.is_empty() { + continue; + } + let mut parts = vec!["HSET".to_string(), e.key.clone()]; + for (f, v) in map { + parts.push(f.clone()); + parts.push(v.clone()); + } + parts + } + SnapshotValue::List(list) => { + if list.is_empty() { + continue; + } + let mut parts = vec!["RPUSH".to_string(), e.key.clone()]; + parts.extend(list.iter().cloned()); + parts + } + SnapshotValue::Set(set) => { + if set.is_empty() { + continue; + } + let mut parts = vec!["SADD".to_string(), e.key.clone()]; + parts.extend(set.iter().cloned()); + parts + } + SnapshotValue::ZSet(pairs) => { + if pairs.is_empty() { + continue; + } + let mut parts = vec!["ZADD".to_string(), e.key.clone()]; + for (member, score) in pairs { + parts.push(format_zset_score(*score)); + parts.push(member.clone()); + } + parts + } + }; + out.push(to_resp_owned(&data_parts)); + // Non-string types with a TTL need a separate PEXPIREAT command. + if !matches!(&e.value, SnapshotValue::Str(_)) { + if let Some(exp) = e.expires_at_ms { + out.push(to_resp_owned(&[ + "PEXPIREAT".to_string(), + e.key.clone(), + exp.to_string(), + ])); + } + } + } + out +} + // ── RecachedCache ───────────────────────────────────────────────────────────── #[wasm_bindgen] @@ -173,8 +280,9 @@ impl RecachedCache { let keys = js_sys::Array::from(&pair.get(0)); let vals = js_sys::Array::from(&pair.get(1)); + let entry_count = keys.length(); let mut max_seq: u64 = 0; - for i in 0..keys.length() { + for i in 0..entry_count { let s = keys.get(i).as_f64().unwrap_or(0.0) as u64; let cmd_str = vals.get(i).as_string().unwrap_or_default(); if let Ok((value, _)) = Value::parse(cmd_str.as_bytes()) @@ -187,8 +295,25 @@ impl RecachedCache { } } - // If WAL was empty, start seq at 0; otherwise continue from max_seq + 1. - seq_cell.set(if keys.length() == 0 { 0 } else { max_seq + 1 }); + // If the WAL grew large, compact: rewrite it as minimal snapshot + // commands. This keeps startup replay fast regardless of how many + // writes accumulated between refreshes. + let next_seq = if entry_count > WAL_COMPACT_THRESHOLD { + JsFuture::from(idb_clear_js(&db)).await?; + let cmds = snapshot_to_resp_cmds(&store.snapshot()); + let mut seq: u64 = 0; + for cmd_str in &cmds { + let _ = JsFuture::from(idb_append_js(&db, seq as f64, cmd_str)).await; + seq += 1; + } + seq + } else if entry_count == 0 { + 0 + } else { + max_seq + 1 + }; + + seq_cell.set(next_seq); *idb_cell.borrow_mut() = Some(db); Ok(JsValue::UNDEFINED) @@ -286,32 +411,29 @@ impl RecachedCache { let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| { if let Ok(text) = e.data().dyn_into::() { let s = String::from(text); - if let Ok((value, _)) = Value::parse(s.as_bytes()) { - // Detect pub/sub push: ["message", channel, payload] - if let Value::Array(Some(arr)) = &value { - if arr.len() == 3 { - if let ( - Value::BulkString(Some(kind)), - Value::BulkString(Some(channel)), - Value::BulkString(Some(payload)), - ) = (&arr[0], &arr[1], &arr[2]) - { - if kind.eq_ignore_ascii_case(b"message") { - if let Some(f) = on_msg.borrow().as_ref() { - let ch = String::from_utf8_lossy(channel); - let pl = String::from_utf8_lossy(payload); - let _ = f.call2( - &JsValue::NULL, - &JsValue::from_str(&ch), - &JsValue::from_str(&pl), - ); - } - return; - } - } + if let Ok((Value::Push(arr), _)) = Value::parse(s.as_bytes()) { + // Pub/sub message: >3 ["message", channel, payload] + if arr.len() == 3 + && let ( + Value::BulkString(Some(kind)), + Value::BulkString(Some(channel)), + Value::BulkString(Some(payload)), + ) = (&arr[0], &arr[1], &arr[2]) + && kind.eq_ignore_ascii_case(b"message") + { + if let Some(f) = on_msg.borrow().as_ref() { + let ch = String::from_utf8_lossy(channel); + let pl = String::from_utf8_lossy(payload); + let _ = f.call2( + &JsValue::NULL, + &JsValue::from_str(&ch), + &JsValue::from_str(&pl), + ); } + return; } - if let Ok(cmd) = Command::from_value(value) { + // Mutation push: convert to Array for command dispatch + if let Ok(cmd) = Command::from_value(Value::Array(Some(arr))) { match cmd { Command::Set(_, _, _) | Command::Del(_)