Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ members = [
"crates/ruvllm_retrieval_diffusion",
# RAIRS IVF: Redundant Assignment + Amplified Inverse Residual (ADR-193)
"crates/ruvector-rairs",
# Streaming semantic drift detection for agent vector memory (ADR-194)
"crates/ruvector-drift",
]
resolver = "2"

Expand Down
20 changes: 20 additions & 0 deletions crates/ruvector-drift/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "ruvector-drift"
version = "0.1.0"
edition = "2021"
description = "Streaming semantic drift detection for agent vector memory — online distribution shift monitoring for RuVector"
authors = ["ruvnet", "claude-flow"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/ruvnet/ruvector"
keywords = ["ann", "drift-detection", "agent-memory", "vector-search", "ruvector"]
categories = ["algorithms", "data-structures", "science"]

[[bin]]
name = "drift-bench"
path = "src/main.rs"

[dependencies]
rand = "0.8"
serde = { version = "1", features = ["derive"] }

[dev-dependencies]
129 changes: 129 additions & 0 deletions crates/ruvector-drift/src/cusum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//! Alternative A: CUSUM-based drift detector.
//!
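//! Runs a standard CUSUM (Page 1954) control chart on the squared L2 norm of
//! each incoming vector. For vectors from N(μ, I), E[||v||²] = D + ||μ||², so a
//! shift away from a zero-mean reference always increases the expected squared
//! norm — giving CUSUM a scalar channel that is sign-agnostic and
//! dimension-agnostic. More generally the expected squared norm moves only
//! when ||μ|| changes (in either direction), which is why a lower arm is
//! tracked alongside the upper one.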
//!
//! The reference phase fits a running mean and unbiased variance of ||v||² via
//! Welford's algorithm. After warm-up, each new vector contributes one
//! z-scored observation to both an upper CUSUM (increase) and a lower CUSUM
//! (decrease) statistic. Either arm triggering indicates distribution shift.
//!
//! **Complexity:** O(D) insert (norm computation), O(1) score, O(1) memory.

use crate::DriftDetector;

/// CUSUM drift detector operating on per-vector squared L2 norms.
///
/// The detector works in two phases. Until `warmed_up` is set, every inserted
/// vector updates the Welford accumulators (`ref_mean`, `ref_m2`, `ref_n`).
/// Once `warm_up` vectors have been seen, `ref_std` is computed and frozen, and
/// each later vector is z-scored against the reference and folded into
/// `cusum_up` and `cusum_down`.
#[derive(Debug, Clone)]
pub struct CusumDetector {
/// Expected vector length; compared against each input by a debug assertion.
dim: usize,
/// Number of insertions that make up the reference (warm-up) phase.
warm_up: usize,
/// Allowance (slack) for normal variability; typically 0.5 σ.
/// Subtracted from every z-scored observation in both CUSUM arms.
slack: f64,
/// Welford running mean of ||v||².
ref_mean: f64,
/// Welford M2 accumulator (sum of squared deviations) for ||v||².
ref_m2: f64,
/// Welford running count during reference phase.
ref_n: usize,
/// Reference std of ||v||² (frozen after warm-up, floored at 1.0).
/// Holds the placeholder value 1.0 until warm-up completes.
ref_std: f64,
/// Upper CUSUM statistic (detects upward shifts).
cusum_up: f64,
/// Lower CUSUM statistic (detects downward shifts).
cusum_down: f64,
/// Number of vectors inserted since construction or the last reset.
count: usize,
/// `true` once the reference phase has finished and scoring is active.
warmed_up: bool,
}

impl CusumDetector {
    /// Builds a detector that monitors the squared L2 norm of inserted vectors.
    ///
    /// # Arguments
    ///
    /// - `dim`: expected length of every inserted vector. It is only consulted
    ///   by the debug-build length check performed on insert.
    /// - `warm_up`: number of insertions used to fit the reference statistics.
    /// - `slack`: CUSUM allowance in units of reference σ (try 0.5–1.0).
    ///
    /// # Panics
    ///
    /// Panics if `dim` is zero, if `warm_up` is smaller than 2, or if `slack`
    /// is negative or NaN.
    pub fn new(dim: usize, warm_up: usize, slack: f64) -> Self {
        assert!(dim > 0);
        assert!(warm_up >= 2, "need at least 2 samples for variance");
        assert!(slack >= 0.0);

        Self {
            // Caller-supplied configuration.
            dim,
            warm_up,
            slack,
            // Progress counters: nothing has been observed yet.
            count: 0,
            ref_n: 0,
            warmed_up: false,
            // Empty Welford accumulators; the std stays at a neutral 1.0
            // until the warm-up phase replaces it.
            ref_mean: 0.0,
            ref_m2: 0.0,
            ref_std: 1.0,
            // Both CUSUM arms start with no accumulated evidence.
            cusum_up: 0.0,
            cusum_down: 0.0,
        }
    }

    /// Squared L2 norm of `vec`, accumulated in `f64`.
    fn sq_norm(vec: &[f32]) -> f64 {
        vec.iter()
            .copied()
            .map(f64::from)
            .map(|component| component.powi(2))
            .sum()
    }
}

impl DriftDetector for CusumDetector {
    /// Ingests one vector.
    ///
    /// While warming up, the vector's squared norm updates the Welford
    /// mean/M2 accumulators; the insertion that brings the number of accepted
    /// samples up to `warm_up` freezes the reference standard deviation.
    /// After warm-up the squared norm is z-scored against the frozen
    /// reference and folded into both CUSUM arms.
    ///
    /// Vectors whose squared norm is not finite (any NaN or ±∞ component) are
    /// counted by [`count`](DriftDetector::count) but otherwise ignored, so a
    /// single corrupt vector can neither poison the reference statistics nor
    /// wipe out accumulated drift evidence.
    fn insert(&mut self, vec: &[f32]) {
        debug_assert_eq!(vec.len(), self.dim);
        self.count += 1;

        let norm_sq = Self::sq_norm(vec);

        // A NaN here would turn `ref_mean` into NaN for good during warm-up.
        // After warm-up it would produce a NaN z-score, and because
        // `f64::max` returns the non-NaN operand both arms would silently
        // snap back to 0.0. Skip such observations instead.
        if !norm_sq.is_finite() {
            return;
        }

        if !self.warmed_up {
            // Welford online update for mean and M2 of ||v||².
            self.ref_n += 1;
            let delta = norm_sq - self.ref_mean;
            self.ref_mean += delta / self.ref_n as f64;
            let delta2 = norm_sq - self.ref_mean;
            self.ref_m2 += delta * delta2;

            // Gate on accepted samples rather than on `count`, so skipped
            // vectors do not shrink the reference set. With all-finite input
            // the two counters are equal and the behaviour is unchanged.
            if self.ref_n >= self.warm_up {
                self.warmed_up = true;
                // Sample std; floor at 1.0 to avoid division by near-zero.
                self.ref_std = if self.ref_n >= 2 {
                    (self.ref_m2 / (self.ref_n - 1) as f64).sqrt().max(1.0)
                } else {
                    1.0
                };
            }
        } else {
            // Z-score the squared norm relative to reference statistics.
            let z = (norm_sq - self.ref_mean) / self.ref_std;
            // Each arm accumulates evidence in one direction, less the slack,
            // and is clamped at zero so in-control data cannot build credit.
            self.cusum_up = (self.cusum_up + z - self.slack).max(0.0);
            self.cusum_down = (self.cusum_down - z - self.slack).max(0.0);
        }
    }

    /// Returns the larger of the two CUSUM arms, or `0.0` during warm-up.
    fn drift_score(&self) -> f32 {
        if !self.warmed_up {
            return 0.0;
        }
        self.cusum_up.max(self.cusum_down) as f32
    }

    /// Discards the reference statistics, both CUSUM arms and the insertion
    /// count, and puts the detector back into its warm-up phase.
    ///
    /// The configuration (`dim`, `warm_up`, `slack`) is kept.
    fn reset_reference(&mut self) {
        self.ref_mean = 0.0;
        self.ref_m2 = 0.0;
        self.ref_n = 0;
        self.ref_std = 1.0;
        self.cusum_up = 0.0;
        self.cusum_down = 0.0;
        self.count = 0;
        self.warmed_up = false;
    }

    /// Number of vectors passed to `insert` since construction or the last
    /// reset, including any that were skipped for being non-finite.
    fn count(&self) -> usize {
        self.count
    }

    /// Size in bytes of the detector's state.
    ///
    /// The detector holds only inline scalar fields, so this is the size of
    /// the struct itself — every field, not just the `f64` ones.
    fn memory_bytes(&self) -> usize {
        std::mem::size_of::<Self>()
    }
}
215 changes: 215 additions & 0 deletions crates/ruvector-drift/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
//! # ruvector-drift — Streaming Semantic Drift Detection for Agent Vector Memory
//!
//! Detects when the semantic distribution of an agent's vector memory has shifted —
//! enabling self-healing indexes, staleness eviction, and RAG safety guards.
//!
//! ## Variants
//!
//! | Variant | Algorithm | Memory | Latency |
//! |------------------|-------------------------|-------------|-----------|
//! | `MeanShiftDetector` | EMA mean distance | O(D) | O(D) |
//! | `CusumDetector` | CUSUM on squared L2 norm | O(1) | O(D) |
//! | `MmdRffDetector` | MMD via RFF features | O(D × R) | O(D + R) |
//!
//! All three implement [`DriftDetector`].

#![forbid(unsafe_code)]
#![warn(missing_docs)]

pub mod cusum;
pub mod mean_shift;
pub mod mmd_rff;
pub mod stats;

pub use cusum::CusumDetector;
pub use mean_shift::MeanShiftDetector;
pub use mmd_rff::MmdRffDetector;
pub use stats::OnlineStats;

/// Common interface shared by every drift detector in this crate.
///
/// Vectors are fed one at a time through [`insert`](Self::insert). A detector
/// first builds up a reference distribution during its warm-up phase and from
/// then on scores how far the stream has diverged from it. Callers typically
/// gate on [`is_drifted`](Self::is_drifted) and call
/// [`reset_reference`](Self::reset_reference) when a controlled concept update
/// occurs.
pub trait DriftDetector {
    /// Feeds a single vector to the detector.
    fn insert(&mut self, vec: &[f32]);

    /// Current divergence from the reference distribution as a scalar;
    /// `0.0` means no drift.
    fn drift_score(&self) -> f32;

    /// Reports whether [`drift_score`](Self::drift_score) is strictly greater
    /// than `threshold`.
    ///
    /// A NaN score or NaN threshold never counts as drift, because every
    /// comparison against NaN is false.
    fn is_drifted(&self, threshold: f32) -> bool {
        let score = self.drift_score();
        threshold < score
    }

    /// Re-baselines the detector so that later scores are measured against a
    /// new reference.
    ///
    /// NOTE(review): this was originally described as freezing the current
    /// distribution as the new baseline, but `CusumDetector` implements it by
    /// discarding all state and re-entering warm-up. Confirm which contract
    /// is intended.
    fn reset_reference(&mut self);

    /// Number of vectors seen since the last reset.
    fn count(&self) -> usize;

    /// Approximate number of bytes of memory consumed by this detector.
    ///
    /// NOTE(review): originally documented as heap bytes, yet
    /// `CusumDetector` owns no heap data and reports its inline scalar state.
    /// Confirm whether inline state is meant to be included.
    fn memory_bytes(&self) -> usize;
}

// Unit tests for the three detector variants. Every test uses a fixed RNG
// seed, so the sample streams (and therefore the outcomes) are reproducible.
#[cfg(test)]
mod tests {
use super::*;
use rand::{rngs::StdRng, Rng, SeedableRng};

/// Draws `dim` normally distributed `f32` values with the given per-component
/// `mean` and `std`, using the Box–Muller transform.
///
/// Each loop iteration consumes two uniforms from `rng` (first `u1`, then
/// `u2`) and yields up to two samples, so the number of RNG draws depends only
/// on `dim`.
fn gaussian(rng: &mut StdRng, dim: usize, mean: f64, std: f64) -> Vec<f32> {
use std::f64::consts::PI;
let mut out = Vec::with_capacity(dim);
while out.len() < dim {
// Clamp away from zero so that ln(u1) stays finite.
let u1 = rng.gen::<f64>().max(1e-14);
let u2 = rng.gen::<f64>();
let r = (-2.0 * u1.ln()).sqrt() * std;
let theta = 2.0 * PI * u2;
out.push((mean + r * theta.cos()) as f32);
// The second sample of the pair is dropped when `dim` is odd.
if out.len() < dim {
out.push((mean + r * theta.sin()) as f32);
}
}
// Defensive only: the guarded push above already keeps len <= dim.
out.truncate(dim);
out
}

/// Feeds `warm` vectors with per-component mean 0.0 into `det`, then up to
/// `n_drift` vectors with per-component mean `drift`, checking
/// `is_drifted(threshold)` after every drifted insertion.
///
/// Returns the 1-based number of drifted insertions at which the detector
/// first reported drift, or `None` if it never did.
fn run_detect<D: DriftDetector>(
det: &mut D,
rng: &mut StdRng,
dim: usize,
warm: usize,
n_drift: usize,
drift: f64,
threshold: f32,
) -> Option<usize> {
for _ in 0..warm {
det.insert(&gaussian(rng, dim, 0.0, 1.0));
}
for i in 0..n_drift {
det.insert(&gaussian(rng, dim, drift, 1.0));
if det.is_drifted(threshold) {
return Some(i + 1);
}
}
None
}

// A per-component mean shift of 3.0 must be flagged within 200 drifted
// insertions at threshold 0.4.
#[test]
fn mean_shift_detects_large_drift() {
let mut rng = StdRng::seed_from_u64(1);
let mut det = MeanShiftDetector::new(64, 200, 0.05);
let lag = run_detect(&mut det, &mut rng, 64, 200, 500, 3.0, 0.4);
assert!(
lag.is_some(),
"MeanShift must detect drift=3.0 within 500 insertions"
);
assert!(
lag.unwrap() <= 200,
"detection lag must be ≤200; got {:?}",
lag
);
}

// A per-component mean shift of 2.0 must push the CUSUM score above 3.0
// within 300 drifted insertions (slack = 1.0, warm-up = 200).
#[test]
fn cusum_detects_moderate_drift() {
let mut rng = StdRng::seed_from_u64(2);
let mut det = CusumDetector::new(64, 200, 1.0);
let lag = run_detect(&mut det, &mut rng, 64, 200, 500, 2.0, 3.0);
assert!(lag.is_some(), "CUSUM must detect drift=2.0");
assert!(lag.unwrap() <= 300, "CUSUM lag must be ≤300; got {:?}", lag);
}

// Only checks that detection happens at all within 500 drifted insertions;
// no bound is placed on the lag.
#[test]
fn mmd_rff_detects_shift() {
let mut rng = StdRng::seed_from_u64(3);
let mut det = MmdRffDetector::new(64, 128, 200, 1.0, 0.05, 42);
let lag = run_detect(&mut det, &mut rng, 64, 200, 500, 2.5, 0.04);
assert!(lag.is_some(), "MMD-RFF must detect drift=2.5");
}

// Compares two identically seeded detectors: one keeps receiving mean-0 data
// after warm-up, the other receives mean-4 data. Both see the same warm-up
// stream because both RNGs start from seed 4.
#[test]
fn mean_shift_drift_exceeds_nodrift_score() {
// Verify drift signal >> natural noise floor.
// EMA effective window n_eff = 1/alpha = 20. For D=64 iid N(0,1):
// expected no-drift L2 ≈ sqrt(D/n_eff) = sqrt(64/20) ≈ 1.79
// drift=4.0 per-dim pushes L2 far above that noise floor.
let mut rng_nodrift = StdRng::seed_from_u64(4);
let mut det_nodrift = MeanShiftDetector::new(64, 200, 0.05);
for _ in 0..200 {
det_nodrift.insert(&gaussian(&mut rng_nodrift, 64, 0.0, 1.0));
}
for _ in 0..100 {
det_nodrift.insert(&gaussian(&mut rng_nodrift, 64, 0.0, 1.0));
}
let nodrift_score = det_nodrift.drift_score();

let mut rng_drift = StdRng::seed_from_u64(4);
let mut det_drift = MeanShiftDetector::new(64, 200, 0.05);
for _ in 0..200 {
det_drift.insert(&gaussian(&mut rng_drift, 64, 0.0, 1.0));
}
for _ in 0..100 {
det_drift.insert(&gaussian(&mut rng_drift, 64, 4.0, 1.0));
}
let drift_score = det_drift.drift_score();

// Drift score must be at least 3× larger than no-drift score.
assert!(
drift_score > nodrift_score * 3.0,
"signal-to-noise too low: drift={drift_score:.2} nodrift={nodrift_score:.2}"
);
}

// After drift has been observed, `reset_reference` must bring both the score
// and the insertion count back to zero.
#[test]
fn reset_clears_state() {
let mut rng = StdRng::seed_from_u64(5);
let mut det = MeanShiftDetector::new(32, 100, 0.05);
for _ in 0..100 {
det.insert(&gaussian(&mut rng, 32, 0.0, 1.0));
}
for _ in 0..100 {
det.insert(&gaussian(&mut rng, 32, 5.0, 1.0));
}
assert!(det.drift_score() > 0.1);
det.reset_reference();
assert_eq!(det.drift_score(), 0.0);
assert_eq!(det.count(), 0);
}

// Sanity-checks the memory accounting of freshly built detectors.
#[test]
fn memory_bytes_nonzero() {
let det_ms = MeanShiftDetector::new(128, 100, 0.05);
let det_cs = CusumDetector::new(128, 100, 1.0);
let det_mmd = MmdRffDetector::new(128, 256, 100, 1.0, 0.05, 0);
assert!(det_ms.memory_bytes() > 0);
assert!(det_cs.memory_bytes() > 0);
// NOTE(review): presumably 256 features × 128 dims × 4 bytes per f32 —
// confirm against MmdRffDetector's constructor and storage type.
assert!(
det_mmd.memory_bytes() >= 256 * 128 * 4,
"RFF matrix should dominate"
);
}
}

/// Result of a single drift evaluation run.
///
/// A plain record of one detector's behaviour over a reference phase followed
/// by a drift-observation phase. Nothing in this file constructs it.
/// NOTE(review): presumably it is filled in by the `drift-bench` binary —
/// confirm against `src/main.rs`.
#[derive(Debug, Clone)]
pub struct DriftReport {
/// Name of the detector variant.
pub variant: String,
/// Number of insertions in the reference phase.
pub reference_count: usize,
/// Number of insertions in the drift-observation phase.
pub observation_count: usize,
/// Drift score at end of reference phase (should be near 0).
pub baseline_score: f32,
/// Drift score when detection first triggered (0 if not triggered).
/// Use `detection_lag` to tell "not triggered" apart from a genuine 0 score.
pub trigger_score: f32,
/// Insertions after drift injection until detection (None = not detected).
pub detection_lag: Option<usize>,
/// Final drift score after all observations.
pub final_score: f32,
/// Approximate memory used by the detector (bytes).
pub memory_bytes: usize,
}
Loading
Loading