From 21b150548b7ab922e231acee377739178188972c Mon Sep 17 00:00:00 2001 From: "cosmo.li" Date: Thu, 30 Apr 2026 23:23:09 +0800 Subject: [PATCH 1/2] feat(engine): add per-key TTL with lazy and active expiration Introduce an internal ValueEntry carrying an optional monotonic deadline, and expose expire_at_ms / persist / ttl_ms / sweep_expired on KvEngine. All read and write paths honour the deadline: lazy expiration drops stale entries on access, and sweep_expired supports an adaptive background sampler. The AOF writer learns two new records, PEXPIREAT (absolute epoch ms) and PERSIST, and the replay loader applies them. PEXPIREAT entries whose deadline is already past cause the key to be dropped during replay, keeping the log idempotent across restarts. SET overwrites clear the existing TTL to match Redis semantics; INCR and APPEND preserve it. --- src/persistence/replay.rs | 92 ++++++ src/persistence/writer.rs | 16 + src/storage/engine.rs | 629 +++++++++++++++++++++++++++++++++++--- 3 files changed, 688 insertions(+), 49 deletions(-) diff --git a/src/persistence/replay.rs b/src/persistence/replay.rs index 37cc936..2362959 100644 --- a/src/persistence/replay.rs +++ b/src/persistence/replay.rs @@ -363,6 +363,29 @@ fn apply_record(engine: &KvEngine, parts: &[Vec]) -> Result<(), ApplyError> } engine.flushdb().map_err(ApplyError::Engine) } + b"PEXPIREAT" => { + if parts.len() != 3 { + return Err(ApplyError::Arity(cmd_name(cmd))); + } + let ts_text = + std::str::from_utf8(&parts[2]).map_err(|_| ApplyError::Arity(cmd_name(cmd)))?; + let abs_ms: i64 = ts_text + .parse() + .map_err(|_| ApplyError::Arity(cmd_name(cmd)))?; + engine + .expire_at_ms(&parts[1], abs_ms) + .map(|_| ()) + .map_err(ApplyError::Engine) + } + b"PERSIST" => { + if parts.len() != 2 { + return Err(ApplyError::Arity(cmd_name(cmd))); + } + engine + .persist(&parts[1]) + .map(|_| ()) + .map_err(ApplyError::Engine) + } _ => Err(ApplyError::Unknown(cmd_name(cmd))), } } @@ -499,4 +522,73 @@ mod tests { assert_eq!(stats, ReplayStats::default()); let _ = fs::remove_file(&path); } + + #[test] + fn replays_pexpireat_and_expires_key_after_deadline() { + use crate::storage::engine::TtlStatus; + + let path = tmp_path("pexpireat"); + let now_ms = crate::storage::engine::current_epoch_ms(); + let abs_ms = now_ms + 60_000; + let abs_text = abs_ms.to_string(); + let mut content = String::new(); + content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"); + content.push_str(&format!( + "*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n", + abs_text.len(), + abs_text, + )); + fs::write(&path, &content).unwrap(); + + let engine = KvEngine::new(); + let stats = replay(&path, &engine).unwrap(); + assert_eq!(stats.applied, 2); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::Millis(_))); + let _ = fs::remove_file(&path); + } + + #[test] + fn replay_drops_keys_whose_pexpireat_is_already_past() { + let path = tmp_path("pexpireat-past"); + let now_ms = crate::storage::engine::current_epoch_ms(); + let abs_ms = now_ms - 1_000; // already expired + let abs_text = abs_ms.to_string(); + let mut content = String::new(); + content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"); + content.push_str(&format!( + "*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n", + abs_text.len(), + abs_text, + )); + fs::write(&path, &content).unwrap(); + + let engine = KvEngine::new(); + replay(&path, &engine).unwrap(); + assert_eq!(engine.get(b"k").unwrap(), None); + let _ = fs::remove_file(&path); + } + + #[test] + fn replays_persist_and_clears_ttl() { + use crate::storage::engine::TtlStatus; + + let path = tmp_path("persist"); + let now_ms = crate::storage::engine::current_epoch_ms(); + let abs_ms = now_ms + 60_000; + let abs_text = abs_ms.to_string(); + let mut content = String::new(); + content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"); + content.push_str(&format!( + "*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n", + abs_text.len(), + abs_text, + )); + content.push_str("*2\r\n$7\r\nPERSIST\r\n$1\r\nk\r\n"); + fs::write(&path, &content).unwrap(); + + let engine = KvEngine::new(); + replay(&path, &engine).unwrap(); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + let _ = fs::remove_file(&path); + } } diff --git a/src/persistence/writer.rs b/src/persistence/writer.rs index 8f7f2d6..d83733a 100644 --- a/src/persistence/writer.rs +++ b/src/persistence/writer.rs @@ -92,6 +92,22 @@ impl AofWriter { self.append(&[b"FLUSHDB"]) } + /// Appends a `PEXPIREAT key abs_epoch_ms` entry to the log. + /// + /// The absolute millisecond timestamp is recorded rather than a relative + /// offset so replay stays correct regardless of how long the log has been + /// sitting on disk. Any already-past timestamp encountered during replay + /// makes the key be dropped immediately. + pub fn append_pexpireat(&self, key: &[u8], abs_epoch_ms: i64) -> Result<(), FerrumError> { + let ts = abs_epoch_ms.to_string(); + self.append(&[b"PEXPIREAT", key, ts.as_bytes()]) + } + + /// Appends a `PERSIST key` entry to the log. + pub fn append_persist(&self, key: &[u8]) -> Result<(), FerrumError> { + self.append(&[b"PERSIST", key]) + } + fn append(&self, parts: &[&[u8]]) -> Result<(), FerrumError> { let bytes = encode_command(parts); self.write_bytes(&bytes) diff --git a/src/storage/engine.rs b/src/storage/engine.rs index 7692bc4..42f9cee 100644 --- a/src/storage/engine.rs +++ b/src/storage/engine.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use log::warn; @@ -12,6 +13,30 @@ pub const KEY_MAX_BYTES: usize = 64 * 1024; /// Maximum allowed value size in bytes (16 MiB). pub const VALUE_MAX_BYTES: usize = 16 * 1024 * 1024; +/// A value stored in the engine, together with its optional expiration. +/// +/// `expire_at` is a monotonic deadline derived from [`Instant::now`] at the +/// time the TTL was installed. A value whose deadline is `<= Instant::now()` +/// is considered expired and must be treated as absent by every read path. +#[derive(Clone)] +struct ValueEntry { + data: Vec, + expire_at: Option, +} + +impl ValueEntry { + fn new(data: Vec) -> Self { + Self { + data, + expire_at: None, + } + } + + fn is_expired(&self, now: Instant) -> bool { + matches!(self.expire_at, Some(deadline) if deadline <= now) + } +} + /// A thread-safe key-value storage engine backed by a [`HashMap`]. /// /// Keys and values are stored as `Vec`, making the engine fully @@ -19,17 +44,22 @@ pub const VALUE_MAX_BYTES: usize = 16 * 1024 * 1024; /// preserved verbatim. This matches the contract of the RESP2 bulk string, /// which is already byte-oriented on the wire. /// -/// Mutating commands (`SET`, `DEL`, `FLUSHDB`) are optionally forwarded to an -/// [`AofWriter`] so changes survive a restart. The log is appended while the -/// write lock is still held, which preserves the ordering invariant described -/// in the whitepaper (§8.7): the in-memory state and the on-disk log always -/// agree on the relative order of successful writes. +/// Each value carries an optional expiration deadline. Expired entries are +/// removed lazily on access (Redis-style) and proactively by a background +/// sweeper that calls [`KvEngine::sweep_expired`] periodically. +/// +/// Mutating commands (`SET`, `DEL`, `FLUSHDB`, `EXPIRE`, `PERSIST`) are +/// optionally forwarded to an [`AofWriter`] so changes survive a restart. +/// The log is appended while the write lock is still held, which preserves +/// the ordering invariant described in the whitepaper (§8.7): the in-memory +/// state and the on-disk log always agree on the relative order of +/// successful writes. /// /// Public methods return [`Result`] so lock poisoning can be reported instead /// of causing a panic. #[derive(Clone)] pub struct KvEngine { - store: Arc, Vec>>>, + store: Arc, ValueEntry>>>, aof: Option>, } @@ -59,6 +89,10 @@ impl KvEngine { /// Sets a key-value pair and returns the previous value, if any. /// + /// Matches Redis' default semantics: any existing TTL on the key is + /// cleared by the write. Callers that need the Redis `KEEPTTL` option + /// will have to go through a dedicated future API. + /// /// Returns [`FerrumError::KeyTooLong`] or [`FerrumError::ValueTooLarge`] if /// the configured size limits are exceeded. pub fn set(&self, key: Vec, value: Vec) -> Result>, FerrumError> { @@ -69,8 +103,8 @@ impl KvEngine { if let Some(aof) = &self.aof { log_aof_result("SET", aof.append_set(&key, &value)); } - let previous = store.insert(key, value); - Ok(previous) + let previous = store.insert(key, ValueEntry::new(value)); + Ok(previous.and_then(live_payload)) } /// Sets `key` to `value` only if the key is not already present. @@ -83,13 +117,21 @@ impl KvEngine { validate_value(&value)?; let mut store = self.store.write()?; - if store.contains_key(key.as_slice()) { - return Ok(false); + let now = Instant::now(); + if let Some(entry) = store.get(key.as_slice()) { + if !entry.is_expired(now) { + return Ok(false); + } + // The old value has already expired: remove it and proceed as if + // the key were absent. We intentionally do not log a DEL because + // the subsequent SET, once replayed, overwrites the stale entry + // anyway and skipping the DEL keeps the log shorter. + store.remove(key.as_slice()); } if let Some(aof) = &self.aof { log_aof_result("SETNX", aof.append_set(&key, &value)); } - store.insert(key, value); + store.insert(key, ValueEntry::new(value)); Ok(true) } @@ -110,7 +152,7 @@ impl KvEngine { log_aof_result("MSET", aof.append_set_many(&pairs)); } for (k, v) in pairs { - store.insert(k, v); + store.insert(k, ValueEntry::new(v)); } Ok(()) } @@ -118,13 +160,24 @@ impl KvEngine { /// Returns the stored value for every key in `keys`, preserving order. /// /// Missing keys map to `None` so the caller can serialise them as - /// null bulk strings without ambiguity. + /// null bulk strings without ambiguity. Expired entries are dropped in + /// the same pass so the result reflects live state only. pub fn mget(&self, keys: &[Vec]) -> Result>>, FerrumError> { - let store = self.store.read()?; - Ok(keys - .iter() - .map(|k| store.get(k.as_slice()).cloned()) - .collect()) + let mut store = self.store.write()?; + let now = Instant::now(); + let mut out = Vec::with_capacity(keys.len()); + for key in keys { + match store.get(key.as_slice()) { + Some(entry) if entry.is_expired(now) => { + store.remove(key.as_slice()); + self.log_expire_drop(key); + out.push(None); + } + Some(entry) => out.push(Some(entry.data.clone())), + None => out.push(None), + } + } + Ok(out) } /// Atomically adds `delta` to the integer value at `key` and returns @@ -133,23 +186,32 @@ impl KvEngine { /// A missing key is treated as starting from zero, matching Redis' /// `INCR` semantics. The existing value, if any, must be a decimal /// ASCII integer that fits into an [`i64`]; values outside that range - /// or that fail to parse produce the Redis-standard - /// `-ERR value is not an integer or out of range` reply. Overflow of - /// the addition itself is treated the same way. + /// or that fail to parse produce the Redis-standard reply + /// `-ERR value is not an integer or out of range`. Overflow of the + /// addition itself is treated the same way. + /// + /// The key's existing TTL, if any, is preserved. pub fn incr_by(&self, key: Vec, delta: i64) -> Result { validate_key(&key)?; let mut store = self.store.write()?; - let current = match store.get(key.as_slice()) { - Some(bytes) => { - let text = std::str::from_utf8(bytes).map_err(|_| { + let now = Instant::now(); + let (current, existing_deadline) = match store.get(key.as_slice()) { + Some(entry) if entry.is_expired(now) => { + store.remove(key.as_slice()); + self.log_expire_drop(&key); + (0i64, None) + } + Some(entry) => { + let text = std::str::from_utf8(&entry.data).map_err(|_| { FerrumError::ParseError("value is not an integer or out of range".into()) })?; - text.parse::().map_err(|_| { + let n = text.parse::().map_err(|_| { FerrumError::ParseError("value is not an integer or out of range".into()) - })? + })?; + (n, entry.expire_at) } - None => 0, + None => (0i64, None), }; let new_value = current.checked_add(delta).ok_or_else(|| { @@ -159,21 +221,47 @@ impl KvEngine { if let Some(aof) = &self.aof { log_aof_result("INCRBY", aof.append_set(&key, &serialised)); + // INCR preserves TTL: re-emit the existing deadline so replay + // converges to the same state regardless of record ordering. + if let Some(deadline) = existing_deadline + && let Some(abs_ms) = deadline_to_epoch_ms(deadline, now) + { + log_aof_result("PEXPIREAT", aof.append_pexpireat(&key, abs_ms)); + } } - store.insert(key, serialised); + store.insert( + key, + ValueEntry { + data: serialised, + expire_at: existing_deadline, + }, + ); Ok(new_value) } /// Returns the value for `key`, or `None` if the key does not exist. pub fn get(&self, key: &[u8]) -> Result>, FerrumError> { - let store = self.store.read()?; - Ok(store.get(key).cloned()) + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(None) + } + Some(entry) => Ok(Some(entry.data.clone())), + None => Ok(None), + } } /// Deletes `key` and returns `true` if it existed. pub fn del(&self, key: &[u8]) -> Result { let mut store = self.store.write()?; - let existed = store.remove(key).is_some(); + let now = Instant::now(); + let existed = match store.remove(key) { + Some(entry) => !entry.is_expired(now), + None => false, + }; if existed && let Some(aof) = &self.aof { log_aof_result("DEL", aof.append_del(key)); } @@ -187,15 +275,18 @@ impl KvEngine { /// atomic from an observer's point of view: concurrent readers see /// either all deletions or none of them. Persisted log records are /// appended only for keys that were actually removed, mirroring - /// Redis' behaviour. + /// Redis' behaviour. Already-expired keys do not count as removed. pub fn del_many(&self, keys: &[Vec]) -> Result { if keys.is_empty() { return Ok(0); } let mut store = self.store.write()?; + let now = Instant::now(); let mut removed: Vec<&[u8]> = Vec::with_capacity(keys.len()); for key in keys { - if store.remove(key.as_slice()).is_some() { + if let Some(entry) = store.remove(key.as_slice()) + && !entry.is_expired(now) + { removed.push(key.as_slice()); } } @@ -207,52 +298,91 @@ impl KvEngine { Ok(removed.len()) } - /// Returns `true` if `key` exists. + /// Returns `true` if `key` exists and has not expired. pub fn exists(&self, key: &[u8]) -> Result { - let store = self.store.read()?; - Ok(store.contains_key(key)) + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(false) + } + Some(_) => Ok(true), + None => Ok(false), + } } /// Returns the number of keys currently stored. + /// + /// Already-expired keys still pending lazy cleanup are excluded so + /// callers see a value consistent with `EXISTS`. pub fn dbsize(&self) -> Result { let store = self.store.read()?; - Ok(store.len()) + let now = Instant::now(); + Ok(store.values().filter(|v| !v.is_expired(now)).count()) } /// Appends `suffix` to the value at `key` and returns the new length. /// /// If `key` is absent, the command behaves like `SET` with an empty /// initial value (the same contract as Redis). The resulting value is - /// subject to the usual size validation, and the AOF records the new - /// full value with a `SET` entry so that replay is guaranteed to - /// converge to the same state regardless of history. + /// subject to the usual size validation. The existing TTL, if any, is + /// preserved and re-emitted to the AOF so replay converges regardless + /// of record ordering. pub fn append(&self, key: Vec, suffix: Vec) -> Result { validate_key(&key)?; let mut store = self.store.write()?; - let new_value = match store.get(key.as_slice()) { - Some(existing) => { - let mut buf = Vec::with_capacity(existing.len() + suffix.len()); - buf.extend_from_slice(existing); + let now = Instant::now(); + let (new_value, existing_deadline) = match store.get(key.as_slice()) { + Some(entry) if entry.is_expired(now) => { + store.remove(key.as_slice()); + self.log_expire_drop(&key); + (suffix, None) + } + Some(entry) => { + let mut buf = Vec::with_capacity(entry.data.len() + suffix.len()); + buf.extend_from_slice(&entry.data); buf.extend_from_slice(&suffix); - buf + (buf, entry.expire_at) } - None => suffix, + None => (suffix, None), }; validate_value(&new_value)?; if let Some(aof) = &self.aof { log_aof_result("APPEND", aof.append_set(&key, &new_value)); + if let Some(deadline) = existing_deadline + && let Some(abs_ms) = deadline_to_epoch_ms(deadline, now) + { + log_aof_result("PEXPIREAT", aof.append_pexpireat(&key, abs_ms)); + } } let new_len = new_value.len(); - store.insert(key, new_value); + store.insert( + key, + ValueEntry { + data: new_value, + expire_at: existing_deadline, + }, + ); Ok(new_len) } /// Returns the byte length of the value at `key`, or `0` if absent. pub fn strlen(&self, key: &[u8]) -> Result { - let store = self.store.read()?; - Ok(store.get(key).map(|v| v.len()).unwrap_or(0)) + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(0) + } + Some(entry) => Ok(entry.data.len()), + None => Ok(0), + } } /// Removes all keys from the store. @@ -264,6 +394,224 @@ impl KvEngine { } Ok(()) } + + /// Installs an absolute expiration time on `key`, measured in Unix + /// epoch milliseconds. + /// + /// Returns `true` if the key exists and the deadline was recorded. + /// Returns `false` when the key is absent or has already expired, + /// matching the Redis semantics of `EXPIRE`/`PEXPIREAT` returning `0`. + /// + /// A deadline in the past (`abs_epoch_ms <= now`) causes the key to + /// be removed immediately and an accompanying `DEL` to be logged, + /// keeping the AOF idempotent across replays. + pub fn expire_at_ms(&self, key: &[u8], abs_epoch_ms: i64) -> Result { + let mut store = self.store.write()?; + let now_instant = Instant::now(); + let now_ms = current_epoch_ms(); + + // Drop the entry if it is already expired under its current TTL. + if let Some(entry) = store.get(key) + && entry.is_expired(now_instant) + { + store.remove(key); + self.log_expire_drop(key); + } + + if !store.contains_key(key) { + return Ok(false); + } + + if abs_epoch_ms <= now_ms { + store.remove(key); + if let Some(aof) = &self.aof { + log_aof_result("DEL", aof.append_del(key)); + } + return Ok(true); + } + + let delta_ms = (abs_epoch_ms - now_ms) as u64; + let deadline = now_instant + Duration::from_millis(delta_ms); + if let Some(entry) = store.get_mut(key) { + entry.expire_at = Some(deadline); + } + if let Some(aof) = &self.aof { + log_aof_result("PEXPIREAT", aof.append_pexpireat(key, abs_epoch_ms)); + } + Ok(true) + } + + /// Removes any TTL from `key`. + /// + /// Returns `true` only when the key existed **and** had a TTL before + /// the call — matching Redis' `PERSIST` return semantics. + pub fn persist(&self, key: &[u8]) -> Result { + let mut store = self.store.write()?; + let now = Instant::now(); + + if let Some(entry) = store.get(key) + && entry.is_expired(now) + { + store.remove(key); + self.log_expire_drop(key); + return Ok(false); + } + + let Some(entry) = store.get_mut(key) else { + return Ok(false); + }; + if entry.expire_at.is_none() { + return Ok(false); + } + entry.expire_at = None; + if let Some(aof) = &self.aof { + log_aof_result("PERSIST", aof.append_persist(key)); + } + Ok(true) + } + + /// Returns the remaining TTL for `key` in milliseconds. + /// + /// * `Ok(TtlStatus::Missing)` — key does not exist (Redis reports `-2`). + /// * `Ok(TtlStatus::NoExpire)` — key exists without a TTL (Redis `-1`). + /// * `Ok(TtlStatus::Millis(n))` — `n` milliseconds remaining. + pub fn ttl_ms(&self, key: &[u8]) -> Result { + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + None => Ok(TtlStatus::Missing), + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(TtlStatus::Missing) + } + Some(entry) => match entry.expire_at { + None => Ok(TtlStatus::NoExpire), + Some(deadline) => { + let remaining = deadline.saturating_duration_since(now); + Ok(TtlStatus::Millis(remaining.as_millis() as i64)) + } + }, + } + } + + /// Proactively removes up to `sample` expired entries. + /// + /// Sampling is random — the first `sample` keys yielded by the map's + /// iteration order are checked. This mirrors Redis' active expiration + /// loop and is the primary caller of the `ferrum-expire` background + /// thread. Returns the number of entries actually evicted. + /// + /// When more than 25% of the sample was expired the caller is expected + /// to invoke this method again immediately; the signal is surfaced via + /// the returned fraction so the scheduler can make that decision. + pub fn sweep_expired(&self, sample: usize) -> Result { + if sample == 0 { + return Ok(SweepStats { + examined: 0, + evicted: 0, + }); + } + let mut store = self.store.write()?; + let now = Instant::now(); + + let mut victims: Vec> = Vec::new(); + let mut examined = 0usize; + for (key, entry) in store.iter() { + if examined >= sample { + break; + } + examined += 1; + if entry.is_expired(now) { + victims.push(key.clone()); + } + } + + let evicted = victims.len(); + for key in &victims { + store.remove(key.as_slice()); + if let Some(aof) = &self.aof { + log_aof_result("DEL", aof.append_del(key)); + } + } + + Ok(SweepStats { examined, evicted }) + } + + /// Logs a DEL record caused by lazy expiration. + /// + /// Kept separate from `del()` so call sites stay terse; callers must + /// already hold the write lock. + fn log_expire_drop(&self, key: &[u8]) { + if let Some(aof) = &self.aof { + log_aof_result("DEL", aof.append_del(key)); + } + } +} + +/// Outcome of a single call to [`KvEngine::sweep_expired`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SweepStats { + /// Number of entries inspected during the sweep. + pub examined: usize, + /// Number of entries that were actually expired and removed. + pub evicted: usize, +} + +impl SweepStats { + /// Returns `true` when the expired ratio warrants a follow-up sweep. + /// + /// Matches Redis' active expiration heuristic: keep running while at + /// least 25% of the sample is expired. + pub fn should_continue(&self) -> bool { + self.examined > 0 && self.evicted * 4 > self.examined + } +} + +/// Tri-state return for [`KvEngine::ttl_ms`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TtlStatus { + /// The key does not exist; RESP reply is `:-2`. + Missing, + /// The key exists with no TTL; RESP reply is `:-1`. + NoExpire, + /// Remaining TTL in milliseconds (`>= 0`). + Millis(i64), +} + +/// Returns the payload of `entry` if it has not already expired. +fn live_payload(entry: ValueEntry) -> Option> { + let now = Instant::now(); + if entry.is_expired(now) { + None + } else { + Some(entry.data) + } +} + +/// Current wall-clock time expressed as Unix epoch milliseconds. +/// +/// A system clock earlier than the Unix epoch (extremely unusual in +/// practice) falls back to zero so the engine never panics. +pub(crate) fn current_epoch_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + +/// Converts a monotonic `deadline` to a Unix epoch millisecond timestamp. +/// +/// Returns `None` when the deadline is already in the past (the caller will +/// drop the key instead of writing an expiration record). +fn deadline_to_epoch_ms(deadline: Instant, now: Instant) -> Option { + if deadline <= now { + return None; + } + let remaining = deadline - now; + let now_ms = current_epoch_ms(); + let abs_ms = now_ms.saturating_add(remaining.as_millis() as i64); + Some(abs_ms) } fn validate_key(key: &[u8]) -> Result<(), FerrumError> { @@ -790,4 +1138,187 @@ mod tests { assert!(bytes.is_empty()); let _ = fs::remove_file(&path); } + + #[test] + fn expire_at_ms_sets_and_ttl_reports_remaining() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + + let now = current_epoch_ms(); + assert!(engine.expire_at_ms(b"k", now + 60_000).unwrap()); + + match engine.ttl_ms(b"k").unwrap() { + TtlStatus::Millis(ms) => assert!(ms > 0 && ms <= 60_000), + other => panic!("expected Millis(..), got {other:?}"), + } + } + + #[test] + fn expire_at_ms_returns_false_for_missing_key() { + let engine = KvEngine::new(); + let now = current_epoch_ms(); + assert!(!engine.expire_at_ms(b"missing", now + 1000).unwrap()); + } + + #[test] + fn expire_in_the_past_deletes_key_immediately() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + + let now = current_epoch_ms(); + assert!(engine.expire_at_ms(b"k", now - 1).unwrap()); + assert_eq!(engine.get(b"k").unwrap(), None); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::Missing)); + } + + #[test] + fn persist_strips_ttl_only_when_present() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + assert!(!engine.persist(b"k").unwrap()); + + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 10_000).unwrap(); + assert!(engine.persist(b"k").unwrap()); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + assert!(!engine.persist(b"k").unwrap()); + } + + #[test] + fn ttl_status_for_missing_and_persistent_keys() { + let engine = KvEngine::new(); + assert!(matches!( + engine.ttl_ms(b"missing").unwrap(), + TtlStatus::Missing + )); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + } + + #[test] + fn lazy_expiration_drops_key_on_read() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let deadline = Instant::now() + Duration::from_millis(20); + { + let mut store = engine.store.write().unwrap(); + if let Some(entry) = store.get_mut(b"k".as_slice()) { + entry.expire_at = Some(deadline); + } + } + std::thread::sleep(Duration::from_millis(40)); + assert_eq!(engine.get(b"k").unwrap(), None); + assert!(!engine.exists(b"k").unwrap()); + assert_eq!(engine.dbsize().unwrap(), 0); + } + + #[test] + fn set_overwrite_clears_existing_ttl() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 60_000).unwrap(); + engine.set(b"k".to_vec(), b"v2".to_vec()).unwrap(); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + } + + #[test] + fn incr_preserves_ttl() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"1".to_vec()).unwrap(); + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 60_000).unwrap(); + engine.incr_by(b"k".to_vec(), 5).unwrap(); + match engine.ttl_ms(b"k").unwrap() { + TtlStatus::Millis(ms) => assert!(ms > 0), + other => panic!("expected Millis(..), got {other:?}"), + } + } + + #[test] + fn append_preserves_ttl() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"hi".to_vec()).unwrap(); + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 60_000).unwrap(); + engine.append(b"k".to_vec(), b"!".to_vec()).unwrap(); + match engine.ttl_ms(b"k").unwrap() { + TtlStatus::Millis(ms) => assert!(ms > 0), + other => panic!("expected Millis(..), got {other:?}"), + } + } + + #[test] + fn expire_logs_pexpireat_record_to_aof() { + let path = tmp_aof_path("expire"); + let (engine, writer) = engine_with_aof(&path); + + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let abs_ms = current_epoch_ms() + 60_000; + engine.expire_at_ms(b"k", abs_ms).unwrap(); + drop(engine); + drop(writer); + + let bytes = fs::read(&path).unwrap(); + let text = String::from_utf8_lossy(&bytes); + assert!( + text.contains("PEXPIREAT"), + "missing PEXPIREAT record in {text:?}" + ); + assert!(text.contains(&abs_ms.to_string())); + let _ = fs::remove_file(&path); + } + + #[test] + fn persist_logs_persist_record_to_aof() { + let path = tmp_aof_path("persist"); + let (engine, writer) = engine_with_aof(&path); + + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let abs_ms = current_epoch_ms() + 60_000; + engine.expire_at_ms(b"k", abs_ms).unwrap(); + assert!(engine.persist(b"k").unwrap()); + drop(engine); + drop(writer); + + let bytes = fs::read(&path).unwrap(); + let text = String::from_utf8_lossy(&bytes); + assert!( + text.contains("PERSIST"), + "missing PERSIST record in {text:?}" + ); + let _ = fs::remove_file(&path); + } + + #[test] + fn sweep_removes_expired_entries_and_logs_del() { + let path = tmp_aof_path("sweep"); + let (engine, writer) = engine_with_aof(&path); + + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + { + let mut store = engine.store.write().unwrap(); + if let Some(entry) = store.get_mut(b"k".as_slice()) { + entry.expire_at = Some(Instant::now() - Duration::from_millis(1)); + } + } + + let stats = engine.sweep_expired(16).unwrap(); + assert_eq!(stats.evicted, 1); + assert_eq!(engine.dbsize().unwrap(), 0); + + drop(engine); + drop(writer); + let bytes = fs::read(&path).unwrap(); + assert!(String::from_utf8_lossy(&bytes).contains("DEL")); + let _ = fs::remove_file(&path); + } + + #[test] + fn sweep_respects_zero_sample() { + let engine = KvEngine::new(); + let stats = engine.sweep_expired(0).unwrap(); + assert_eq!(stats.examined, 0); + assert_eq!(stats.evicted, 0); + } } From caf25d0a5cb847db46885cbcf14fd331f25cdbcf Mon Sep 17 00:00:00 2001 From: "cosmo.li" Date: Thu, 30 Apr 2026 23:23:19 +0800 Subject: [PATCH 2/2] feat(server): wire EXPIRE/PEXPIRE/PEXPIREAT/PERSIST/TTL/PTTL and an active expiration sweeper Extend the RESP2 parser and the command executor with the full TTL command family. EXPIRE/PEXPIRE translate relative offsets into absolute epoch milliseconds before calling the engine, so replay always sees a stable deadline. TTL returns whole seconds (rounded up from the remaining millis) and PTTL returns the raw millisecond count; both surface the Redis -2 / -1 sentinels. Add a dedicated background sweeper (storage::expire) that samples twenty keys every hundred milliseconds and follows up immediately when more than a quarter of the sample was expired. The main binary spawns the sweeper on startup and joins it during graceful shutdown. End-to-end tests cover the new wire contract as well as the sweeper actually evicting keys without a foreground read. --- src/main.rs | 11 +- src/network/server.rs | 67 ++++++++++++- src/protocol/parser.rs | 120 ++++++++++++++++++++++ src/storage/expire.rs | 194 +++++++++++++++++++++++++++++++++++ src/storage/mod.rs | 1 + tests/expire_test.rs | 222 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 611 insertions(+), 4 deletions(-) create mode 100644 src/storage/expire.rs create mode 100644 tests/expire_test.rs diff --git a/src/main.rs b/src/main.rs index 9958e04..4a602b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use ferrum_kv::network::server::ServerConfig; use ferrum_kv::network::shutdown::Shutdown; use ferrum_kv::persistence::AofWriter; use ferrum_kv::storage::engine::KvEngine; +use ferrum_kv::storage::expire; use crate::cli::{CliArgs, Invocation, USAGE}; @@ -83,9 +84,13 @@ fn main() -> ExitCode { info!("maxclients: {}", server_config.max_clients); } - if let Err(e) = - ferrum_kv::network::server::run_listener(listener, engine, shutdown, server_config) - { + let expire_handle = expire::spawn(engine.clone(), shutdown.clone()); + + let server_result = + ferrum_kv::network::server::run_listener(listener, engine, shutdown, server_config); + expire_handle.shutdown(); + + if let Err(e) = server_result { error!("server error: {e}"); return ExitCode::FAILURE; } diff --git a/src/network/server.rs b/src/network/server.rs index c13ad93..4e4cc7a 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -11,7 +11,7 @@ use crate::error::FerrumError; use crate::network::shutdown::Shutdown; use crate::protocol::encoder; use crate::protocol::parser::{self, Command, FrameParse}; -use crate::storage::engine::KvEngine; +use crate::storage::engine::{KvEngine, TtlStatus, current_epoch_ms}; /// Initial capacity of each per-connection read buffer. /// @@ -306,6 +306,71 @@ pub fn execute_command(cmd: Command, engine: &KvEngine, out: &mut Vec) { Ok(()) => encoder::encode_simple_string(out, "OK"), Err(e) => write_ferrum_error(out, &e), }, + Command::Expire { key, seconds } => { + let reply = expire_reply(engine, &key, checked_seconds_to_ms(seconds)); + write_bool_integer(out, reply); + } + Command::PExpire { key, millis } => { + let reply = expire_reply(engine, &key, Some(millis)); + write_bool_integer(out, reply); + } + Command::PExpireAt { key, abs_epoch_ms } => match engine.expire_at_ms(&key, abs_epoch_ms) { + Ok(true) => encoder::encode_integer(out, 1), + Ok(false) => encoder::encode_integer(out, 0), + Err(e) => write_ferrum_error(out, &e), + }, + Command::Persist { key } => match engine.persist(&key) { + Ok(true) => encoder::encode_integer(out, 1), + Ok(false) => encoder::encode_integer(out, 0), + Err(e) => write_ferrum_error(out, &e), + }, + Command::Ttl { key } => match engine.ttl_ms(&key) { + Ok(TtlStatus::Missing) => encoder::encode_integer(out, -2), + Ok(TtlStatus::NoExpire) => encoder::encode_integer(out, -1), + Ok(TtlStatus::Millis(ms)) => encoder::encode_integer(out, (ms + 999) / 1000), + Err(e) => write_ferrum_error(out, &e), + }, + Command::PTtl { key } => match engine.ttl_ms(&key) { + Ok(TtlStatus::Missing) => encoder::encode_integer(out, -2), + Ok(TtlStatus::NoExpire) => encoder::encode_integer(out, -1), + Ok(TtlStatus::Millis(ms)) => encoder::encode_integer(out, ms), + Err(e) => write_ferrum_error(out, &e), + }, + } +} + +/// Converts an `EXPIRE` second delta to milliseconds, saturating on overflow. +/// +/// A `None` return means the caller should treat the request as "delete this +/// key right now" — which is how [`KvEngine::expire_at_ms`] interprets an +/// already-past absolute timestamp. +fn checked_seconds_to_ms(seconds: i64) -> Option { + seconds.checked_mul(1_000) +} + +/// Computes the absolute epoch-millisecond deadline for `EXPIRE`/`PEXPIRE` +/// and forwards it to the engine. +/// +/// A delta that overflows `i64` when expressed in milliseconds (only possible +/// with `EXPIRE` and astronomically large second counts) is treated the same +/// way as an in-the-past deadline: the key is dropped on the spot. That keeps +/// the wire contract simple — either the key existed and was updated +/// (`:1`), or it did not (`:0`). +fn expire_reply(engine: &KvEngine, key: &[u8], delta_ms: Option) -> Result { + let now_ms = current_epoch_ms(); + let abs_ms = match delta_ms { + Some(d) => now_ms.saturating_add(d), + None => now_ms, // treat overflow as "expire immediately" + }; + engine.expire_at_ms(key, abs_ms) +} + +/// Writes a `Result` as a RESP integer (`0`/`1`) or as an error. +fn write_bool_integer(out: &mut Vec, reply: Result) { + match reply { + Ok(true) => encoder::encode_integer(out, 1), + Ok(false) => encoder::encode_integer(out, 0), + Err(e) => write_ferrum_error(out, &e), } } diff --git a/src/protocol/parser.rs b/src/protocol/parser.rs index 6d5a637..2388bb4 100644 --- a/src/protocol/parser.rs +++ b/src/protocol/parser.rs @@ -33,6 +33,19 @@ pub enum Command { DbSize, /// `FLUSHDB`, which removes all keys. FlushDb, + /// `EXPIRE key seconds` — set TTL in seconds. + Expire { key: Vec, seconds: i64 }, + /// `PEXPIRE key milliseconds` — set TTL in milliseconds. + PExpire { key: Vec, millis: i64 }, + /// `PEXPIREAT key ms-timestamp` — set TTL as an absolute + /// Unix epoch millisecond timestamp. + PExpireAt { key: Vec, abs_epoch_ms: i64 }, + /// `PERSIST key` — remove any TTL. + Persist { key: Vec }, + /// `TTL key` — remaining TTL in whole seconds. + Ttl { key: Vec }, + /// `PTTL key` — remaining TTL in milliseconds. + PTtl { key: Vec }, } /// Outcome of attempting to parse a single RESP2 frame from a byte buffer. @@ -361,6 +374,57 @@ fn build_command(parts: Vec>) -> Result { } Ok(Command::FlushDb) } + b"EXPIRE" => { + if args.len() != 2 { + return Err(FerrumError::WrongArity { cmd: "EXPIRE" }); + } + let mut it = args.into_iter(); + let key = it.next().unwrap(); + let seconds = parse_integer_argument(&it.next().unwrap(), "EXPIRE")?; + Ok(Command::Expire { key, seconds }) + } + b"PEXPIRE" => { + if args.len() != 2 { + return Err(FerrumError::WrongArity { cmd: "PEXPIRE" }); + } + let mut it = args.into_iter(); + let key = it.next().unwrap(); + let millis = parse_integer_argument(&it.next().unwrap(), "PEXPIRE")?; + Ok(Command::PExpire { key, millis }) + } + b"PEXPIREAT" => { + if args.len() != 2 { + return Err(FerrumError::WrongArity { cmd: "PEXPIREAT" }); + } + let mut it = args.into_iter(); + let key = it.next().unwrap(); + let abs_epoch_ms = parse_integer_argument(&it.next().unwrap(), "PEXPIREAT")?; + Ok(Command::PExpireAt { key, abs_epoch_ms }) + } + b"PERSIST" => { + if args.len() != 1 { + return Err(FerrumError::WrongArity { cmd: "PERSIST" }); + } + Ok(Command::Persist { + key: args.into_iter().next().unwrap(), + }) + } + b"TTL" => { + if args.len() != 1 { + return Err(FerrumError::WrongArity { cmd: "TTL" }); + } + Ok(Command::Ttl { + key: args.into_iter().next().unwrap(), + }) + } + b"PTTL" => { + if args.len() != 1 { + return Err(FerrumError::WrongArity { cmd: "PTTL" }); + } + Ok(Command::PTtl { + key: args.into_iter().next().unwrap(), + }) + } _ => Err(FerrumError::UnknownCommand( String::from_utf8_lossy(&name).into_owned(), )), @@ -779,4 +843,60 @@ mod frame_tests { let err = parse_frame(b"*1000000\r\n").unwrap_err(); assert!(matches!(err, FerrumError::ParseError(_))); } + + #[test] + fn parses_expire_and_ttl_family() { + assert_eq!( + parse_exact(b"*3\r\n$6\r\nEXPIRE\r\n$1\r\nk\r\n$2\r\n60\r\n"), + Command::Expire { + key: b"k".to_vec(), + seconds: 60, + } + ); + assert_eq!( + parse_exact(b"*3\r\n$7\r\nPEXPIRE\r\n$1\r\nk\r\n$4\r\n1500\r\n"), + Command::PExpire { + key: b"k".to_vec(), + millis: 1500, + } + ); + assert_eq!( + parse_exact(b"*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n$13\r\n1700000000000\r\n"), + Command::PExpireAt { + key: b"k".to_vec(), + abs_epoch_ms: 1_700_000_000_000, + } + ); + assert_eq!( + parse_exact(b"*2\r\n$7\r\nPERSIST\r\n$1\r\nk\r\n"), + Command::Persist { key: b"k".to_vec() } + ); + assert_eq!( + parse_exact(b"*2\r\n$3\r\nTTL\r\n$1\r\nk\r\n"), + Command::Ttl { key: b"k".to_vec() } + ); + assert_eq!( + parse_exact(b"*2\r\n$4\r\nPTTL\r\n$1\r\nk\r\n"), + Command::PTtl { key: b"k".to_vec() } + ); + } + + #[test] + fn expire_with_non_integer_is_invalid() { + let (err, _) = match parse_frame(b"*3\r\n$6\r\nEXPIRE\r\n$1\r\nk\r\n$3\r\nabc\r\n").unwrap() + { + FrameParse::Invalid { error, consumed } => (error, consumed), + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(matches!(err, FerrumError::ParseError(_))); + } + + #[test] + fn ttl_without_key_is_wrong_arity() { + let err = match parse_frame(b"*1\r\n$3\r\nTTL\r\n").unwrap() { + FrameParse::Invalid { error, .. } => error, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(matches!(err, FerrumError::WrongArity { cmd: "TTL" })); + } } diff --git a/src/storage/expire.rs b/src/storage/expire.rs new file mode 100644 index 0000000..74748e6 --- /dev/null +++ b/src/storage/expire.rs @@ -0,0 +1,194 @@ +//! Active expiration sweeper. +//! +//! Runs on its own thread, periodically sampling keys from the engine and +//! removing those whose TTL has elapsed. The sampler mirrors Redis' adaptive +//! strategy: sample 20 keys every 100ms; when more than 25% of the sample +//! was already expired, run another round immediately instead of sleeping. +//! +//! The thread is cooperatively shut down via [`Shutdown`], matching the rest +//! of the server. + +use std::sync::Arc; +use std::sync::{Condvar, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use log::{debug, warn}; + +use crate::network::shutdown::Shutdown; +use crate::storage::engine::KvEngine; + +/// Default sample size per sweep tick. +pub const DEFAULT_SAMPLE: usize = 20; + +/// Default interval between sweep ticks. +pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(100); + +/// Spawns the background expiration sweeper. +/// +/// Returns a [`ExpireSweeperHandle`] whose [`ExpireSweeperHandle::shutdown`] +/// waits for the worker thread to exit. The worker also self-terminates when +/// the shared [`Shutdown`] flag flips, so callers that forward the server's +/// shutdown signal do not need to call the handle explicitly — dropping it +/// will still join the thread. +pub fn spawn(engine: KvEngine, shutdown: Shutdown) -> ExpireSweeperHandle { + spawn_with(engine, shutdown, DEFAULT_SAMPLE, DEFAULT_INTERVAL) +} + +/// Testing-oriented variant that accepts custom sampling parameters. +pub fn spawn_with( + engine: KvEngine, + shutdown: Shutdown, + sample: usize, + interval: Duration, +) -> ExpireSweeperHandle { + let wait = Arc::new(SleepWaker::default()); + let wait_clone = Arc::clone(&wait); + let shutdown_clone = shutdown.clone(); + + let handle = thread::Builder::new() + .name("ferrum-expire".into()) + .spawn(move || run(engine, shutdown_clone, sample, interval, wait_clone)) + .expect("failed to spawn ferrum-expire thread"); + + ExpireSweeperHandle { + handle: Some(handle), + wait, + } +} + +/// Joins the sweeper thread on drop, surfacing any thread panic as a warning. +pub struct ExpireSweeperHandle { + handle: Option>, + wait: Arc, +} + +impl ExpireSweeperHandle { + /// Wakes the sweeper and blocks until it exits. + pub fn shutdown(mut self) { + self.wait.wake(); + if let Some(handle) = self.handle.take() + && let Err(e) = handle.join() + { + warn!("ferrum-expire thread panicked: {e:?}"); + } + } +} + +impl Drop for ExpireSweeperHandle { + fn drop(&mut self) { + self.wait.wake(); + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} + +/// Interruptible sleep primitive shared between the sweeper and its owner. +#[derive(Default)] +struct SleepWaker { + lock: Mutex, + cvar: Condvar, +} + +impl SleepWaker { + fn wake(&self) { + if let Ok(mut guard) = self.lock.lock() { + *guard = true; + self.cvar.notify_all(); + } + } + + fn sleep(&self, timeout: Duration) { + let guard = match self.lock.lock() { + Ok(g) => g, + Err(poisoned) => poisoned.into_inner(), + }; + let (mut guard, _) = match self.cvar.wait_timeout(guard, timeout) { + Ok(pair) => pair, + Err(poisoned) => { + let pair = poisoned.into_inner(); + (pair.0, pair.1) + } + }; + // Reset so the next tick sleeps again unless explicitly woken. + *guard = false; + } +} + +fn run( + engine: KvEngine, + shutdown: Shutdown, + sample: usize, + interval: Duration, + wait: Arc, +) { + debug!("ferrum-expire: started (sample={sample}, interval={interval:?})"); + while !shutdown.is_triggered() { + // Iterate the adaptive loop a small bounded number of times so one + // busy sweep cannot starve other readers holding the write lock. + for _ in 0..16 { + match engine.sweep_expired(sample) { + Ok(stats) => { + if !stats.should_continue() { + break; + } + } + Err(e) => { + warn!("ferrum-expire: sweep failed: {e}"); + break; + } + } + } + wait.sleep(interval); + } + debug!("ferrum-expire: stopped"); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Instant; + + #[test] + fn sweeper_removes_expired_keys_in_the_background() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + // Install a 10 ms TTL. + let now = crate::storage::engine::current_epoch_ms(); + engine.expire_at_ms(b"k", now + 10).unwrap(); + + let shutdown = Shutdown::new(); + let handle = spawn_with( + engine.clone(), + shutdown.clone(), + 8, + Duration::from_millis(5), + ); + + let deadline = Instant::now() + Duration::from_millis(500); + while Instant::now() < deadline { + if !engine.exists(b"k").unwrap() { + break; + } + thread::sleep(Duration::from_millis(5)); + } + + shutdown.trigger(); + handle.shutdown(); + assert!( + !engine.exists(b"k").unwrap(), + "sweeper failed to remove expired key" + ); + } + + #[test] + fn sweeper_honours_shutdown_flag() { + let engine = KvEngine::new(); + let shutdown = Shutdown::new(); + let handle = spawn_with(engine, shutdown.clone(), 8, Duration::from_millis(50)); + shutdown.trigger(); + handle.shutdown(); + // Absence of a hang here is the assertion. + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 702e611..da81298 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1 +1,2 @@ pub mod engine; +pub mod expire; diff --git a/tests/expire_test.rs b/tests/expire_test.rs new file mode 100644 index 0000000..f1b703a --- /dev/null +++ b/tests/expire_test.rs @@ -0,0 +1,222 @@ +//! End-to-end tests for the TTL / expiration commands. +//! +//! Drives the full server path (RESP2 parser, engine, active sweeper) through +//! a real TCP connection so regressions in wire format, command dispatch, or +//! background expiration all surface here. + +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::thread; +use std::time::{Duration, Instant}; + +use ferrum_kv::network::server::{self, ServerConfig}; +use ferrum_kv::network::shutdown::Shutdown; +use ferrum_kv::protocol::encoder; +use ferrum_kv::storage::engine::KvEngine; +use ferrum_kv::storage::expire; + +struct ServerGuard { + addr: String, + shutdown: Shutdown, + _server: thread::JoinHandle<()>, +} + +impl Drop for ServerGuard { + fn drop(&mut self) { + self.shutdown.trigger(); + // Self-connect to unblock accept() so the thread can wind down. + let _ = TcpStream::connect_timeout(&self.addr.parse().unwrap(), Duration::from_millis(200)); + } +} + +/// Spawns a server with an active expiration sweeper that ticks frequently +/// so tests do not need to wait a full 100 ms per iteration. +fn spawn_server_with_sweeper() -> ServerGuard { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("local_addr").to_string(); + let engine = KvEngine::new(); + let shutdown = Shutdown::new(); + + // Keep the sweeper handle alive for the server thread's lifetime. + let sweeper_engine = engine.clone(); + let sweeper_shutdown = shutdown.clone(); + let handle = thread::spawn(move || { + let sweeper = expire::spawn_with( + sweeper_engine, + sweeper_shutdown.clone(), + 16, + Duration::from_millis(10), + ); + let _ = server::run_listener(listener, engine, sweeper_shutdown, ServerConfig::default()); + sweeper.shutdown(); + }); + + ServerGuard { + addr, + shutdown, + _server: handle, + } +} + +fn connect(addr: &str) -> TcpStream { + let stream = TcpStream::connect(addr).expect("connect"); + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("set_read_timeout"); + stream + .set_write_timeout(Some(Duration::from_secs(2))) + .expect("set_write_timeout"); + stream +} + +fn build_request(args: &[&[u8]]) -> Vec { + let mut out = Vec::new(); + out.extend_from_slice(format!("*{}\r\n", args.len()).as_bytes()); + for arg in args { + encoder::encode_bulk_string(&mut out, arg); + } + out +} + +fn send(stream: &mut TcpStream, request: &[u8]) -> Vec { + stream.write_all(request).expect("write request"); + read_one_reply(stream) +} + +/// Reads exactly one RESP2 reply from `stream`. +/// +/// Supports simple strings, errors, integers, and bulk strings, which is +/// enough for the TTL command family. +fn read_one_reply(stream: &mut TcpStream) -> Vec { + let mut out = Vec::new(); + let mut byte = [0u8; 1]; + stream.read_exact(&mut byte).expect("read type byte"); + out.push(byte[0]); + match byte[0] { + b'+' | b'-' | b':' => { + read_until_crlf(stream, &mut out); + } + b'$' => { + let mut header = Vec::new(); + read_until_crlf(stream, &mut header); + // header is "\r\n"; parse it to know how many body bytes follow. + let header_str = + std::str::from_utf8(&header[..header.len() - 2]).expect("ascii length prefix"); + let len: i64 = header_str.parse().expect("integer length"); + out.extend_from_slice(&header); + if len >= 0 { + let mut body = vec![0u8; len as usize + 2]; + stream.read_exact(&mut body).expect("read bulk body"); + out.extend_from_slice(&body); + } + } + other => panic!("unexpected RESP type byte {other:#x}"), + } + out +} + +fn read_until_crlf(stream: &mut TcpStream, out: &mut Vec) { + let mut byte = [0u8; 1]; + loop { + stream.read_exact(&mut byte).expect("read byte"); + out.push(byte[0]); + if out.len() >= 2 && out[out.len() - 2] == b'\r' && out[out.len() - 1] == b'\n' { + return; + } + } +} + +#[test] +fn ttl_returns_minus_two_for_missing_key() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + let reply = send(&mut stream, &build_request(&[b"TTL", b"missing"])); + assert_eq!(reply, b":-2\r\n"); +} + +#[test] +fn ttl_returns_minus_one_for_persistent_key() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + let reply = send(&mut stream, &build_request(&[b"TTL", b"k"])); + assert_eq!(reply, b":-1\r\n"); +} + +#[test] +fn expire_and_ttl_roundtrip() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + let r = send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"100"])); + assert_eq!(r, b":1\r\n"); + + let ttl = send(&mut stream, &build_request(&[b"TTL", b"k"])); + let text = std::str::from_utf8(&ttl).unwrap(); + assert!(text.starts_with(':') && text.ends_with("\r\n")); + let n: i64 = text[1..text.len() - 2].parse().unwrap(); + assert!((1..=100).contains(&n), "TTL out of range: {n}"); +} + +#[test] +fn persist_removes_ttl() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"60"])); + let r = send(&mut stream, &build_request(&[b"PERSIST", b"k"])); + assert_eq!(r, b":1\r\n"); + let ttl = send(&mut stream, &build_request(&[b"TTL", b"k"])); + assert_eq!(ttl, b":-1\r\n"); +} + +#[test] +fn pexpire_triggers_background_eviction() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + send(&mut stream, &build_request(&[b"PEXPIRE", b"k", b"50"])); + + // Poll EXISTS until the sweeper removes the key, with a generous ceiling + // so CI jitter does not flake the test. + let deadline = Instant::now() + Duration::from_secs(2); + loop { + let reply = send(&mut stream, &build_request(&[b"EXISTS", b"k"])); + if reply == b":0\r\n" { + break; + } + if Instant::now() >= deadline { + panic!("key was not expired within 2s"); + } + thread::sleep(Duration::from_millis(20)); + } +} + +#[test] +fn expire_with_negative_seconds_deletes_key() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + let r = send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"-1"])); + assert_eq!(r, b":1\r\n"); + let reply = send(&mut stream, &build_request(&[b"GET", b"k"])); + assert_eq!(reply, b"$-1\r\n"); +} + +#[test] +fn set_overwrite_clears_ttl_on_wire() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"60"])); + send(&mut stream, &build_request(&[b"SET", b"k", b"v2"])); + let ttl = send(&mut stream, &build_request(&[b"TTL", b"k"])); + assert_eq!(ttl, b":-1\r\n"); +}