diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs index 80d2364d5..b89cb58cf 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs @@ -10,6 +10,10 @@ //! //! The trained model is serialised as JSON and hot-loaded at runtime so that //! the classification thresholds adapt to the specific room and ESP32 placement. +//! +//! Classes are discovered dynamically from training data filenames instead of +//! being hardcoded, so new activity classes can be added just by recording data +//! with the appropriate filename convention. use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -20,9 +24,8 @@ use std::path::{Path, PathBuf}; /// Extended feature vector: 7 server features + 8 subcarrier-derived features = 15. const N_FEATURES: usize = 15; -/// Activity classes we recognise. -pub const CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"]; -const N_CLASSES: usize = 4; +/// Default class names for backward compatibility with old saved models. +const DEFAULT_CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"]; /// Extract extended feature vector from a JSONL frame (features + raw amplitudes). pub fn features_from_frame(frame: &serde_json::Value) -> [f64; N_FEATURES] { @@ -124,8 +127,9 @@ pub struct ClassStats { pub struct AdaptiveModel { /// Per-class feature statistics (centroid + spread). pub class_stats: Vec, - /// Logistic regression weights: [N_CLASSES x (N_FEATURES + 1)] (last = bias). - pub weights: Vec<[f64; N_FEATURES + 1]>, + /// Logistic regression weights: [n_classes x (N_FEATURES + 1)] (last = bias). + /// Dynamic: the outer Vec length equals the number of discovered classes. + pub weights: Vec>, /// Global feature normalisation: mean and stddev across all training data. pub global_mean: [f64; N_FEATURES], pub global_std: [f64; N_FEATURES], @@ -133,27 +137,38 @@ pub struct AdaptiveModel { pub trained_frames: usize, pub training_accuracy: f64, pub version: u32, + /// Dynamically discovered class names (in index order). + #[serde(default = "default_class_names")] + pub class_names: Vec, +} + +/// Backward-compatible fallback for models saved without class_names. +fn default_class_names() -> Vec { + DEFAULT_CLASSES.iter().map(|s| s.to_string()).collect() } impl Default for AdaptiveModel { fn default() -> Self { + let n_classes = DEFAULT_CLASSES.len(); Self { class_stats: Vec::new(), - weights: vec![[0.0; N_FEATURES + 1]; N_CLASSES], + weights: vec![vec![0.0; N_FEATURES + 1]; n_classes], global_mean: [0.0; N_FEATURES], global_std: [1.0; N_FEATURES], trained_frames: 0, training_accuracy: 0.0, version: 1, + class_names: default_class_names(), } } } impl AdaptiveModel { /// Classify a raw feature vector. Returns (class_label, confidence). - pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (&'static str, f64) { - if self.weights.is_empty() || self.class_stats.is_empty() { - return ("present_still", 0.5); + pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (String, f64) { + let n_classes = self.weights.len(); + if n_classes == 0 || self.class_stats.is_empty() { + return ("present_still".to_string(), 0.5); } // Normalise features. @@ -163,8 +178,8 @@ impl AdaptiveModel { } // Compute logits: w·x + b for each class. - let mut logits = [0.0f64; N_CLASSES]; - for c in 0..N_CLASSES.min(self.weights.len()) { + let mut logits: Vec = vec![0.0; n_classes]; + for c in 0..n_classes { let w = &self.weights[c]; let mut z = w[N_FEATURES]; // bias for i in 0..N_FEATURES { @@ -176,8 +191,8 @@ impl AdaptiveModel { // Softmax. let max_logit = logits.iter().cloned().fold(f64::NEG_INFINITY, f64::max); let exp_sum: f64 = logits.iter().map(|z| (z - max_logit).exp()).sum(); - let mut probs = [0.0f64; N_CLASSES]; - for c in 0..N_CLASSES { + let mut probs: Vec = vec![0.0; n_classes]; + for c in 0..n_classes { probs[c] = ((logits[c] - max_logit).exp()) / exp_sum; } @@ -185,7 +200,11 @@ impl AdaptiveModel { let (best_c, best_p) = probs.iter().enumerate() .max_by(|a, b| a.1.partial_cmp(b.1).unwrap()) .unwrap(); - let label = if best_c < CLASSES.len() { CLASSES[best_c] } else { "present_still" }; + let label = if best_c < self.class_names.len() { + self.class_names[best_c].clone() + } else { + "present_still".to_string() + }; (label, *best_p) } @@ -228,48 +247,88 @@ fn load_recording(path: &Path, class_idx: usize) -> Vec { }).collect() } -/// Map a recording filename to a class index. -fn classify_recording_name(name: &str) -> Option { +/// Map a recording filename to a class name (String). +/// Returns the discovered class name for the file, or None if it cannot be determined. +fn classify_recording_name(name: &str) -> Option { let lower = name.to_lowercase(); - if lower.contains("empty") || lower.contains("absent") { Some(0) } - else if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { Some(1) } - else if lower.contains("walking") || lower.contains("moving") { Some(2) } - else if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { Some(3) } - else { None } + // Strip "train_" prefix and ".jsonl" suffix, then extract the class label. + // Convention: train__.jsonl + // The class is the first segment after "train_" that matches a known pattern, + // or the entire middle portion if no pattern matches. + + // Check common patterns first for backward compat + if lower.contains("empty") || lower.contains("absent") { return Some("absent".into()); } + if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { return Some("present_still".into()); } + if lower.contains("walking") || lower.contains("moving") { return Some("present_moving".into()); } + if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { return Some("active".into()); } + + // Fallback: extract class from filename structure train__*.jsonl + let stem = lower.trim_start_matches("train_").trim_end_matches(".jsonl"); + let class_name = stem.split('_').next().unwrap_or(stem); + if !class_name.is_empty() { + Some(class_name.to_string()) + } else { + None + } } /// Train a model from labeled JSONL recordings in a directory. /// -/// Recordings are matched to classes by filename pattern: -/// - `*empty*` / `*absent*` → absent (0) -/// - `*still*` / `*sitting*` → present_still (1) -/// - `*walking*` / `*moving*` → present_moving (2) -/// - `*active*` / `*exercise*`→ active (3) +/// Recordings are matched to classes by filename pattern. Classes are discovered +/// dynamically from the training data filenames: +/// - `*empty*` / `*absent*` → absent +/// - `*still*` / `*sitting*` → present_still +/// - `*walking*` / `*moving*` → present_moving +/// - `*active*` / `*exercise*`→ active +/// - Any other `train__*.jsonl` → pub fn train_from_recordings(recordings_dir: &Path) -> Result { - // Scan for train_* files. - let mut samples: Vec = Vec::new(); - let entries = std::fs::read_dir(recordings_dir) - .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?; - - for entry in entries.flatten() { + // First pass: scan filenames to discover all unique class names. + let entries: Vec<_> = std::fs::read_dir(recordings_dir) + .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))? + .flatten() + .collect(); + + let mut class_map: HashMap = HashMap::new(); + let mut class_names: Vec = Vec::new(); + + // Collect (entry, class_name) pairs for files that match. + let mut file_classes: Vec<(PathBuf, String, String)> = Vec::new(); // (path, fname, class_name) + for entry in &entries { let fname = entry.file_name().to_string_lossy().to_string(); if !fname.starts_with("train_") || !fname.ends_with(".jsonl") { continue; } - if let Some(class_idx) = classify_recording_name(&fname) { - let loaded = load_recording(&entry.path(), class_idx); - eprintln!(" Loaded {}: {} frames → class '{}'", - fname, loaded.len(), CLASSES[class_idx]); - samples.extend(loaded); + if let Some(class_name) = classify_recording_name(&fname) { + if !class_map.contains_key(&class_name) { + let idx = class_names.len(); + class_map.insert(class_name.clone(), idx); + class_names.push(class_name.clone()); + } + file_classes.push((entry.path(), fname, class_name)); } } + let n_classes = class_names.len(); + if n_classes == 0 { + return Err("No training samples found. Record data with train_* prefix.".into()); + } + + // Second pass: load recordings with the discovered class indices. + let mut samples: Vec = Vec::new(); + for (path, fname, class_name) in &file_classes { + let class_idx = class_map[class_name]; + let loaded = load_recording(path, class_idx); + eprintln!(" Loaded {}: {} frames → class '{}'", + fname, loaded.len(), class_name); + samples.extend(loaded); + } + if samples.is_empty() { return Err("No training samples found. Record data with train_* prefix.".into()); } let n = samples.len(); - eprintln!("Total training samples: {n}"); + eprintln!("Total training samples: {n} across {n_classes} classes: {:?}", class_names); // ── Compute global normalisation stats ── let mut global_mean = [0.0f64; N_FEATURES]; @@ -289,9 +348,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes]; let lr = 0.1; let epochs = 200; let batch_size = 32; @@ -348,19 +407,19 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes]; for (x, target) in batch { // Forward: softmax. - let mut logits = [0.0f64; N_CLASSES]; - for c in 0..N_CLASSES { + let mut logits: Vec = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; // bias for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -368,8 +427,8 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes]; + for c in 0..n_classes { probs[c] = ((logits[c] - max_l).exp()) / exp_sum; } @@ -377,7 +436,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -422,12 +481,12 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -438,9 +497,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result, + /// Per-node feature breakdown (per-node CSI separation). + #[serde(skip_serializing_if = "Option::is_none")] + node_features: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -275,6 +278,68 @@ struct BoundingBox { height: f64, } +/// Per-node CSI tracking state (per-node CSI separation). +struct NodeState { + node_id: u8, + frame_history: VecDeque>, + rssi_history: VecDeque, + latest_features: Option, + latest_classification: Option, + latest_amplitudes: Vec, + last_seen: std::time::Instant, + frame_count: u64, + smoothed_motion: f64, + current_motion_level: String, + debounce_counter: u32, + debounce_candidate: String, + baseline_motion: f64, + baseline_frames: u64, +} + +impl NodeState { + fn new(node_id: u8) -> Self { + Self { + node_id, + frame_history: VecDeque::new(), + rssi_history: VecDeque::new(), + latest_features: None, + latest_classification: None, + latest_amplitudes: Vec::new(), + last_seen: std::time::Instant::now(), + frame_count: 0, + smoothed_motion: 0.0, + current_motion_level: "absent".to_string(), + debounce_counter: 0, + debounce_candidate: "absent".to_string(), + baseline_motion: 0.0, + baseline_frames: 0, + } + } +} + +/// Per-node feature info for WebSocket broadcasts (multi-node support). +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PerNodeFeatureInfo { + node_id: u8, + features: FeatureInfo, + classification: ClassificationInfo, + rssi_dbm: f64, + last_seen_ms: u64, + frame_rate_hz: f64, + stale: bool, +} + +/// Map node_id to a default 3D position in the room coordinate system. +fn node_position(node_id: u8) -> [f64; 3] { + match node_id { + 0 => [2.0, 0.0, 1.5], + 1 => [-2.0, 0.0, 1.5], + 2 => [2.0, 0.0, -1.5], + 3 => [-2.0, 0.0, -1.5], + _ => [0.0, 0.0, 0.0], + } +} + /// Shared application state struct AppStateInner { latest_update: Option, @@ -283,6 +348,8 @@ struct AppStateInner { /// Each entry is the full subcarrier amplitude vector for one frame. /// Capacity: FRAME_HISTORY_CAPACITY frames. frame_history: VecDeque>, + /// Per-node CSI tracking state (per-node CSI separation). + node_states: HashMap, tick: u64, source: String, tx: broadcast::Sender, @@ -490,7 +557,9 @@ fn parse_esp32_frame(buf: &[u8]) -> Option { let n_subcarriers = buf[6]; let freq_mhz = u16::from_le_bytes([buf[8], buf[9]]); let sequence = u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]); - let rssi = buf[14] as i8; + let rssi_raw = buf[14] as i8; + // Fix RSSI sign: ensure it's always negative (dBm convention). + let rssi = if rssi_raw > 0 { rssi_raw.saturating_neg() } else { rssi_raw }; let noise_floor = buf[15] as i8; let iq_start = 20; @@ -941,6 +1010,190 @@ fn smooth_and_classify(state: &mut AppStateInner, raw: &mut ClassificationInfo, raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0); } +/// Apply EMA smoothing, adaptive baseline subtraction, and hysteresis debounce +/// to raw classification — operates on a per-node `NodeState` instead of the +/// global `AppStateInner`. Same logic as `smooth_and_classify`. +fn smooth_and_classify_node(node: &mut NodeState, raw: &mut ClassificationInfo, raw_motion: f64) { + node.baseline_frames += 1; + if node.baseline_frames < BASELINE_WARMUP { + node.baseline_motion = node.baseline_motion * 0.9 + raw_motion * 0.1; + } else if raw_motion < node.smoothed_motion + 0.05 { + node.baseline_motion = node.baseline_motion * (1.0 - BASELINE_EMA_ALPHA) + + raw_motion * BASELINE_EMA_ALPHA; + } + + let adjusted = (raw_motion - node.baseline_motion * 0.7).max(0.0); + + node.smoothed_motion = node.smoothed_motion * (1.0 - MOTION_EMA_ALPHA) + + adjusted * MOTION_EMA_ALPHA; + let sm = node.smoothed_motion; + + let candidate = raw_classify(sm); + + if candidate == node.current_motion_level { + node.debounce_counter = 0; + node.debounce_candidate = candidate; + } else if candidate == node.debounce_candidate { + node.debounce_counter += 1; + if node.debounce_counter >= DEBOUNCE_FRAMES { + node.current_motion_level = candidate; + node.debounce_counter = 0; + } + } else { + node.debounce_candidate = candidate; + node.debounce_counter = 1; + } + + raw.motion_level = node.current_motion_level.clone(); + raw.presence = sm > 0.03; + raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0); +} + +/// Compute fused (aggregate) features and classification from all active nodes. +/// +/// Uses weighted mean for most features, with max-boosted presence-sensitive +/// fields (variance, motion_band_power) so that a single strongly-activated +/// node is not diluted by quiet nodes. +fn compute_fused_features(node_states: &HashMap) -> (FeatureInfo, ClassificationInfo) { + let now = std::time::Instant::now(); + let active: Vec<&NodeState> = node_states.values() + .filter(|ns| now.duration_since(ns.last_seen).as_secs() < 5 && ns.latest_features.is_some()) + .collect(); + + if active.is_empty() { + return ( + FeatureInfo { + mean_rssi: -90.0, + variance: 0.0, + motion_band_power: 0.0, + breathing_band_power: 0.0, + dominant_freq_hz: 0.0, + change_points: 0, + spectral_power: 0.0, + }, + ClassificationInfo { + motion_level: "absent".to_string(), + presence: false, + confidence: 0.5, + }, + ); + } + + let n = active.len() as f64; + + // Accumulate sums of features across active nodes. + let mut sum_rssi = 0.0_f64; + let mut sum_variance = 0.0_f64; + let mut max_variance = 0.0_f64; + let mut sum_mbp = 0.0_f64; + let mut max_mbp = 0.0_f64; + let mut sum_bbp = 0.0_f64; + let mut sum_freq = 0.0_f64; + let mut sum_cp = 0_usize; + let mut sum_sp = 0.0_f64; + let mut any_presence = false; + let mut max_confidence = 0.0_f64; + let mut highest_motion = "absent".to_string(); + + let motion_rank = |level: &str| -> u8 { + match level { + "active" => 3, + "present_moving" => 2, + "present_still" => 1, + _ => 0, + } + }; + + for ns in &active { + let f = ns.latest_features.as_ref().unwrap(); + sum_rssi += f.mean_rssi; + sum_variance += f.variance; + if f.variance > max_variance { max_variance = f.variance; } + sum_mbp += f.motion_band_power; + if f.motion_band_power > max_mbp { max_mbp = f.motion_band_power; } + sum_bbp += f.breathing_band_power; + sum_freq += f.dominant_freq_hz; + sum_cp += f.change_points; + sum_sp += f.spectral_power; + + if let Some(ref cls) = ns.latest_classification { + if cls.presence { any_presence = true; } + if cls.confidence > max_confidence { max_confidence = cls.confidence; } + if motion_rank(&cls.motion_level) > motion_rank(&highest_motion) { + highest_motion = cls.motion_level.clone(); + } + } + } + + // For variance and motion_band_power: use max(mean, 0.7 * max) so that + // single-node strong signals are not diluted by quiet nodes. + let mean_variance = sum_variance / n; + let fused_variance = mean_variance.max(0.7 * max_variance); + let mean_mbp = sum_mbp / n; + let fused_mbp = mean_mbp.max(0.7 * max_mbp); + + let fused_features = FeatureInfo { + mean_rssi: sum_rssi / n, + variance: fused_variance, + motion_band_power: fused_mbp, + breathing_band_power: sum_bbp / n, + dominant_freq_hz: sum_freq / n, + change_points: ((sum_cp as f64) / n) as usize, + spectral_power: sum_sp / n, + }; + + let fused_classification = ClassificationInfo { + motion_level: highest_motion, + presence: any_presence, + confidence: max_confidence, + }; + + (fused_features, fused_classification) +} + +/// Build a `Vec` from all currently tracked node states. +fn build_per_node_features(node_states: &HashMap) -> Vec { + let now = std::time::Instant::now(); + let mut nodes: Vec = node_states.values() + .filter_map(|ns| { + let features = ns.latest_features.clone()?; + let classification = ns.latest_classification.clone().unwrap_or(ClassificationInfo { + motion_level: "absent".to_string(), + presence: false, + confidence: 0.5, + }); + let elapsed_ms = now.duration_since(ns.last_seen).as_millis() as u64; + let stale = elapsed_ms > 5000; + // Estimate frame rate from frame_count and last_seen. + // Use a rough estimate: frames / elapsed time (cap at 100 Hz). + let elapsed_secs = now.duration_since(ns.last_seen).as_secs_f64(); + let frame_rate_hz = if ns.frame_count > 1 && elapsed_secs < 60.0 { + // Simple: use 10 Hz default unless we have better info. + // A proper rate estimator would track inter-frame intervals. + 10.0 + } else { + 0.0 + }; + let avg_rssi = if ns.rssi_history.is_empty() { + features.mean_rssi + } else { + ns.rssi_history.iter().sum::() / ns.rssi_history.len() as f64 + }; + Some(PerNodeFeatureInfo { + node_id: ns.node_id, + features, + classification, + rssi_dbm: avg_rssi, + last_seen_ms: elapsed_ms, + frame_rate_hz, + stale, + }) + }) + .collect(); + nodes.sort_by_key(|n| n.node_id); + nodes +} + /// If an adaptive model is loaded, override the classification with the /// model's prediction. Uses the full 15-feature vector for higher accuracy. fn adaptive_override(state: &AppStateInner, features: &FeatureInfo, classification: &mut ClassificationInfo) { @@ -1285,6 +1538,7 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; // Populate persons from the sensing update. @@ -1415,6 +1669,7 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; let persons = derive_pose_from_sensing(&update); @@ -2541,7 +2796,7 @@ async fn adaptive_status(State(state): State) -> Json Json(serde_json::json!({ @@ -2704,6 +2959,32 @@ async fn sona_activate( } } +/// GET /api/v1/nodes — per-node health info. +async fn nodes_endpoint(State(state): State) -> Json { + let s = state.read().await; + let now = std::time::Instant::now(); + let nodes: Vec = s.node_states.values() + .map(|ns| { + let elapsed_ms = now.duration_since(ns.last_seen).as_millis() as u64; + let stale = elapsed_ms > 5000; + let status = if stale { "stale" } else { "active" }; + serde_json::json!({ + "node_id": ns.node_id, + "status": status, + "last_seen_ms": elapsed_ms, + "frame_count": ns.frame_count, + "frame_rate_hz": if ns.frame_count > 0 && !stale { 10.0 } else { 0.0 }, + "features": ns.latest_features, + "classification": ns.latest_classification, + }) + }) + .collect(); + Json(serde_json::json!({ + "nodes": nodes, + "total": nodes.len(), + })) +} + async fn info_page() -> Html { Html(format!( "\ @@ -2791,20 +3072,83 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { let mut s = state.write().await; s.source = "esp32".to_string(); - // Append current amplitudes to history before extracting features so - // that temporal analysis includes the most recent frame. + // ── Per-node CSI separation ───────────────────────────── + // 1. Update the per-node state for this frame's node_id. + let node = s.node_states + .entry(frame.node_id) + .or_insert_with(|| NodeState::new(frame.node_id)); + + // Push amplitudes to the NODE's frame_history. + node.frame_history.push_back(frame.amplitudes.clone()); + if node.frame_history.len() > FRAME_HISTORY_CAPACITY { + node.frame_history.pop_front(); + } + + // Extract features using the NODE's frame_history. + let sample_rate_hz = 1000.0 / 500.0_f64; + let (node_features, mut node_classification, _node_breathing_hz, _node_sub_vars, node_raw_motion) = + extract_features_from_frame(&frame, &node.frame_history, sample_rate_hz); + + // Per-node smoothing and classification. + smooth_and_classify_node(node, &mut node_classification, node_raw_motion); + + // Store latest features and classification on the node. + node.latest_features = Some(node_features.clone()); + node.latest_classification = Some(node_classification.clone()); + node.latest_amplitudes = frame.amplitudes.clone(); + node.last_seen = std::time::Instant::now(); + node.frame_count += 1; + + // Update node's RSSI history. + node.rssi_history.push_back(frame.rssi as f64); + if node.rssi_history.len() > 60 { + node.rssi_history.pop_front(); + } + + // 2. ALSO push to global frame_history for backward compat. s.frame_history.push_back(frame.amplitudes.clone()); if s.frame_history.len() > FRAME_HISTORY_CAPACITY { s.frame_history.pop_front(); } - let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come + // 3. Extract features from global history (backward compat). let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) = extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz); smooth_and_classify(&mut s, &mut classification, raw_motion); - adaptive_override(&s, &features, &mut classification); + adaptive_override(&s, &features, &mut classification); + + // 4. Compute fused features from all active nodes. + let (fused_features, fused_classification) = compute_fused_features(&s.node_states); + + // 5. Build per-node feature list from ALL active nodes. + let per_node_features = build_per_node_features(&s.node_states); + + // 6. Build nodes list from ALL active NodeStates. + let now_instant = std::time::Instant::now(); + let mut active_nodes: Vec = s.node_states.values() + .filter(|ns| now_instant.duration_since(ns.last_seen).as_secs() < 5) + .map(|ns| NodeInfo { + node_id: ns.node_id, + rssi_dbm: ns.rssi_history.back().copied().unwrap_or(-90.0), + position: node_position(ns.node_id), + amplitude: ns.latest_amplitudes.iter().take(56).cloned().collect(), + subcarrier_count: ns.latest_amplitudes.len(), + }) + .collect(); + active_nodes.sort_by_key(|n| n.node_id); + + // If no active nodes (shouldn't happen), fall back to current frame. + if active_nodes.is_empty() { + active_nodes.push(NodeInfo { + node_id: frame.node_id, + rssi_dbm: features.mean_rssi, + position: node_position(frame.node_id), + amplitude: frame.amplitudes.iter().take(56).cloned().collect(), + subcarrier_count: frame.n_subcarriers as usize, + }); + } - // Update RSSI history + // Update global RSSI history. s.rssi_history.push_back(features.mean_rssi); if s.rssi_history.len() > 60 { s.rssi_history.pop_front(); @@ -2813,8 +3157,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { s.tick += 1; let tick = s.tick; - let motion_score = if classification.motion_level == "active" { 0.8 } - else if classification.motion_level == "present_still" { 0.3 } + // Use fused classification for motion score. + let motion_score = if fused_classification.motion_level == "active" { 0.8 } + else if fused_classification.motion_level == "present_still" { 0.3 } else { 0.05 }; let raw_vitals = s.vital_detector.process_frame( @@ -2825,32 +3170,28 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { s.latest_vitals = vitals.clone(); // Multi-person estimation with temporal smoothing. - let raw_score = compute_person_score(&features); + let raw_score = compute_person_score(&fused_features); s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15; - let est_persons = if classification.presence { + let est_persons = if fused_classification.presence { score_to_person_count(s.smoothed_person_score) } else { 0 }; + let signal_field = generate_signal_field( + fused_features.mean_rssi, motion_score, breathing_rate_hz, + fused_features.variance.min(1.0), &sub_variances, + ); + let mut update = SensingUpdate { msg_type: "sensing_update".to_string(), timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0, source: "esp32".to_string(), tick, - nodes: vec![NodeInfo { - node_id: frame.node_id, - rssi_dbm: features.mean_rssi, - position: [2.0, 0.0, 1.5], - amplitude: frame.amplitudes.iter().take(56).cloned().collect(), - subcarrier_count: frame.n_subcarriers as usize, - }], - features: features.clone(), - classification, - signal_field: generate_signal_field( - features.mean_rssi, motion_score, breathing_rate_hz, - features.variance.min(1.0), &sub_variances, - ), + nodes: active_nodes, + features: fused_features, + classification: fused_classification, + signal_field, vital_signs: Some(vitals), enhanced_motion: None, enhanced_breathing: None, @@ -2862,6 +3203,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: if per_node_features.is_empty() { None } else { Some(per_node_features) }, }; let persons = derive_pose_from_sensing(&update); @@ -2977,6 +3319,7 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { }, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; // Populate persons from the sensing update. @@ -2999,16 +3342,36 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { async fn broadcast_tick_task(state: SharedState, tick_ms: u64) { let mut interval = tokio::time::interval(Duration::from_millis(tick_ms)); + let mut cleanup_counter: u64 = 0; + // Cleanup every ~10 seconds (number of ticks depends on tick_ms). + let cleanup_interval = (10_000 / tick_ms).max(1); loop { interval.tick().await; - let s = state.read().await; - if let Some(ref update) = s.latest_update { - if s.tx.receiver_count() > 0 { - // Re-broadcast the latest sensing_update so pose WS clients - // always get data even when ESP32 pauses between frames. - if let Ok(json) = serde_json::to_string(update) { - let _ = s.tx.send(json); + cleanup_counter += 1; + + // Node timeout cleanup — remove nodes with last_seen > 30s. + if cleanup_counter % cleanup_interval == 0 { + let mut s = state.write().await; + let now = std::time::Instant::now(); + s.node_states.retain(|_id, ns| { + now.duration_since(ns.last_seen).as_secs() < 30 + }); + // Re-broadcast under write lock to avoid double-acquire. + if let Some(ref update) = s.latest_update { + if s.tx.receiver_count() > 0 { + if let Ok(json) = serde_json::to_string(update) { + let _ = s.tx.send(json); + } + } + } + } else { + let s = state.read().await; + if let Some(ref update) = s.latest_update { + if s.tx.receiver_count() > 0 { + if let Ok(json) = serde_json::to_string(update) { + let _ = s.tx.send(json); + } } } } @@ -3564,6 +3927,7 @@ async fn main() { latest_update: None, rssi_history: VecDeque::new(), frame_history: VecDeque::new(), + node_states: HashMap::new(), tick: 0, source: source.into(), tx, @@ -3661,6 +4025,8 @@ async fn main() { .route("/api/v1/metrics", get(health_metrics)) // Sensing endpoints .route("/api/v1/sensing/latest", get(latest)) + // Per-node health endpoint + .route("/api/v1/nodes", get(nodes_endpoint)) // Vital sign endpoints .route("/api/v1/vital-signs", get(vital_signs_endpoint)) .route("/api/v1/edge-vitals", get(edge_vitals_endpoint)) diff --git a/ui/components/SensingTab.js b/ui/components/SensingTab.js index 6c3115c12..33387eefe 100644 --- a/ui/components/SensingTab.js +++ b/ui/components/SensingTab.js @@ -110,12 +110,18 @@ export class SensingTab {
About This Data

Metrics are computed from WiFi Channel State Information (CSI). - With 1 ESP32 you get presence detection, breathing + With 0 ESP32 node(s) you get presence detection, breathing estimation, and gross motion. Add 3-4+ ESP32 nodes around the room for spatial resolution and limb-level tracking.

+ +
+
NODE STATUS
+
+
+
Details
@@ -193,6 +199,9 @@ export class SensingTab { // Update HUD this._updateHUD(data); + + // Update per-node panels + this._updateNodePanels(data); } _onStateChange(state) { @@ -233,6 +242,11 @@ export class SensingTab { const f = data.features || {}; const c = data.classification || {}; + // Node count + const nodeCount = (data.nodes || []).length; + const countEl = this.container.querySelector('#sensingNodeCount'); + if (countEl) countEl.textContent = String(nodeCount); + // RSSI this._setText('sensingRssi', `${(f.mean_rssi || -80).toFixed(1)} dBm`); this._setText('sensingSource', data.source || ''); @@ -309,6 +323,57 @@ export class SensingTab { ctx.stroke(); } + // ---- Per-node panels --------------------------------------------------- + + _updateNodePanels(data) { + const container = this.container.querySelector('#nodeStatusContainer'); + if (!container) return; + const nodeFeatures = data.node_features || []; + if (nodeFeatures.length === 0) { + container.textContent = ''; + const msg = document.createElement('div'); + msg.style.cssText = 'color:#888;font-size:12px;padding:8px;'; + msg.textContent = 'No nodes detected'; + container.appendChild(msg); + return; + } + const NODE_COLORS = ['#00ccff', '#ff6600', '#00ff88', '#ff00cc', '#ffcc00', '#8800ff', '#00ffcc', '#ff0044']; + container.textContent = ''; + for (const nf of nodeFeatures) { + const color = NODE_COLORS[nf.node_id % NODE_COLORS.length]; + const statusColor = nf.stale ? '#888' : '#0f0'; + + const row = document.createElement('div'); + row.style.cssText = `display:flex;align-items:center;gap:8px;padding:6px 8px;margin-bottom:4px;background:rgba(255,255,255,0.03);border-radius:6px;border-left:3px solid ${color};`; + + const idCol = document.createElement('div'); + idCol.style.minWidth = '50px'; + const nameEl = document.createElement('div'); + nameEl.style.cssText = `font-size:11px;font-weight:600;color:${color};`; + nameEl.textContent = 'Node ' + nf.node_id; + const statusEl = document.createElement('div'); + statusEl.style.cssText = `font-size:9px;color:${statusColor};`; + statusEl.textContent = nf.stale ? 'STALE' : 'ACTIVE'; + idCol.appendChild(nameEl); + idCol.appendChild(statusEl); + + const metricsCol = document.createElement('div'); + metricsCol.style.cssText = 'flex:1;font-size:10px;color:#aaa;'; + metricsCol.textContent = (nf.rssi_dbm || -80).toFixed(0) + ' dBm · var ' + (nf.features?.variance || 0).toFixed(1); + + const classCol = document.createElement('div'); + classCol.style.cssText = 'font-size:10px;font-weight:600;color:#ccc;'; + const motion = (nf.classification?.motion_level || 'absent').toUpperCase(); + const conf = ((nf.classification?.confidence || 0) * 100).toFixed(0); + classCol.textContent = motion + ' ' + conf + '%'; + + row.appendChild(idCol); + row.appendChild(metricsCol); + row.appendChild(classCol); + container.appendChild(row); + } + } + // ---- Resize ------------------------------------------------------------ _setupResize() { diff --git a/ui/components/gaussian-splats.js b/ui/components/gaussian-splats.js index ecab6e481..5f7227fa3 100644 --- a/ui/components/gaussian-splats.js +++ b/ui/components/gaussian-splats.js @@ -66,6 +66,10 @@ function valueToColor(v) { return [r, g, b]; } +// ---- Node marker color palette ------------------------------------------- + +const NODE_MARKER_COLORS = [0x00ccff, 0xff6600, 0x00ff88, 0xff00cc, 0xffcc00, 0x8800ff, 0x00ffcc, 0xff0044]; + // ---- GaussianSplatRenderer ----------------------------------------------- export class GaussianSplatRenderer { @@ -108,6 +112,10 @@ export class GaussianSplatRenderer { // Node markers (ESP32 / router positions) this._createNodeMarkers(THREE); + // Dynamic per-node markers (multi-node support) + this.nodeMarkers = new Map(); // nodeId -> THREE.Mesh + this._THREE = THREE; + // Body disruption blob this._createBodyBlob(THREE); @@ -369,11 +377,43 @@ export class GaussianSplatRenderer { bGeo.attributes.splatSize.needsUpdate = true; } - // -- Update node positions --------------------------------------------- + // -- Update node positions (legacy single-node) ------------------------ if (nodes.length > 0 && nodes[0].position) { const pos = nodes[0].position; this.nodeMarker.position.set(pos[0], 0.5, pos[2]); } + + // -- Update dynamic per-node markers (multi-node support) -------------- + if (nodes && nodes.length > 0 && this.scene) { + const THREE = this._THREE || window.THREE; + if (THREE) { + const activeIds = new Set(); + for (const node of nodes) { + activeIds.add(node.node_id); + if (!this.nodeMarkers.has(node.node_id)) { + const geo = new THREE.SphereGeometry(0.25, 16, 16); + const mat = new THREE.MeshBasicMaterial({ + color: NODE_MARKER_COLORS[node.node_id % NODE_MARKER_COLORS.length], + transparent: true, + opacity: 0.8, + }); + const marker = new THREE.Mesh(geo, mat); + this.scene.add(marker); + this.nodeMarkers.set(node.node_id, marker); + } + const marker = this.nodeMarkers.get(node.node_id); + const pos = node.position || [0, 0, 0]; + marker.position.set(pos[0], 0.5, pos[2]); + } + // Remove stale markers + for (const [id, marker] of this.nodeMarkers) { + if (!activeIds.has(id)) { + this.scene.remove(marker); + this.nodeMarkers.delete(id); + } + } + } + } } // ---- Render loop ------------------------------------------------------- diff --git a/ui/services/sensing.service.js b/ui/services/sensing.service.js index 4931e86e2..0992483bc 100644 --- a/ui/services/sensing.service.js +++ b/ui/services/sensing.service.js @@ -84,6 +84,11 @@ class SensingService { return [...this._rssiHistory]; } + /** Get per-node RSSI history (object keyed by node_id). */ + getPerNodeRssiHistory() { + return { ...(this._perNodeRssiHistory || {}) }; + } + /** Current connection state. */ get state() { return this._state; @@ -327,6 +332,20 @@ class SensingService { } } + // Per-node RSSI tracking + if (!this._perNodeRssiHistory) this._perNodeRssiHistory = {}; + if (data.node_features) { + for (const nf of data.node_features) { + if (!this._perNodeRssiHistory[nf.node_id]) { + this._perNodeRssiHistory[nf.node_id] = []; + } + this._perNodeRssiHistory[nf.node_id].push(nf.rssi_dbm); + if (this._perNodeRssiHistory[nf.node_id].length > this._maxHistory) { + this._perNodeRssiHistory[nf.node_id].shift(); + } + } + } + // Notify all listeners for (const cb of this._listeners) { try {