diff --git a/crates/harness-server/src/chat_runs.rs b/crates/harness-server/src/chat_runs.rs
index c2bda3a..969f5e3 100644
--- a/crates/harness-server/src/chat_runs.rs
+++ b/crates/harness-server/src/chat_runs.rs
@@ -15,6 +15,19 @@ const MAX_EVENTS_PER_RUN: usize = 1_000;
 /// be larger than typical token-delta bursts during a single turn so
 /// transient backpressure doesn't kick a healthy client off.
 const BROADCAST_CAPACITY: usize = 256;
+/// How long a terminal (completed/failed/cancelled) run lingers in the
+/// registry after it finishes. Long enough that a late WS reconnect can
+/// still replay the final frames and observe channel closure, short
+/// enough that memory doesn't grow with the lifetime number of
+/// conversations served. Eviction is lazy (swept on the next
+/// `try_start` / terminal transition) so there's no background task.
+const TERMINAL_RETENTION_MS: u64 = 5 * 60 * 1_000;
+/// Hard backstop on retained terminal runs, independent of the TTL.
+/// Guards against a burst of conversations all completing inside the
+/// retention window: once we exceed this, the oldest terminal runs are
+/// dropped LRU-style by terminal timestamp. Active runs are never
+/// counted against this cap or evicted.
+const MAX_RETAINED_TERMINAL: usize = 256;
 
 /// In-process status ledger for Web chat turns.
 ///
@@ -41,6 +54,11 @@ struct ChatRunState {
     /// terminal event) replace the whole state, including this Sender,
     /// so old subscribers naturally see channel closure.
     broadcast: broadcast::Sender<ChatRunEventRecord>,
+    /// `now_ms()` at which this run first reached a terminal status, or
+    /// `None` while it's still active. Drives TTL + LRU eviction so the
+    /// `inner` map doesn't grow with the lifetime number of
+    /// conversations.
+    terminated_at: Option<u64>,
 }
 
 #[derive(Debug, Clone, Serialize)]
@@ -96,6 +114,7 @@ impl ChatRunRegistry {
             last_error: None,
         };
         if let Ok(mut guard) = self.inner.write() {
+            prune_terminal_locked(&mut guard, now);
             if guard
                 .get(conversation_id)
                 .is_some_and(|state| state.record.status.is_active())
@@ -110,6 +129,7 @@ impl ChatRunRegistry {
                     events: Vec::new(),
                     next_seq: 1,
                     broadcast: tx,
+                    terminated_at: None,
                 },
             );
             true
@@ -121,7 +141,9 @@ impl ChatRunRegistry {
     /// Atomically: subscribe to the live broadcast for `conversation_id`,
     /// snapshot all buffered events with `seq > after`, return both.
     /// Returns `None` if the conversation has no run state (never
-    /// started, or evicted — which we don't do today).
+    /// started, or already evicted — terminal runs are dropped after
+    /// `TERMINAL_RETENTION_MS`, so a reconnect that arrives long after
+    /// completion sees `None` and falls back to the persisted history).
     ///
     /// The snapshot + subscription pair is consistent: any event
     /// pushed after the lock is released arrives via the receiver,
@@ -278,16 +300,23 @@ impl ChatRunRegistry {
     ) {
         let now = now_ms();
         if let Ok(mut guard) = self.inner.write() {
-            let state = guard
-                .entry(conversation_id.to_string())
-                .or_insert_with(|| make_state(conversation_id, status, now));
-            state.record.status = status;
-            state.record.updated_at = now;
-            if let Some(tool) = current_tool {
-                state.record.current_tool = tool;
+            let became_terminal;
+            {
+                let state = guard
+                    .entry(conversation_id.to_string())
+                    .or_insert_with(|| make_state(conversation_id, status, now));
+                state.record.status = status;
+                state.record.updated_at = now;
+                if let Some(tool) = current_tool {
+                    state.record.current_tool = tool;
+                }
+                if let Some(err) = last_error {
+                    state.record.last_error = err;
+                }
+                became_terminal = status.is_terminal() && state.mark_terminal(now);
             }
-            if let Some(err) = last_error {
-                state.record.last_error = err;
+            if became_terminal {
+                prune_terminal_locked(&mut guard, now);
             }
         }
     }
@@ -305,58 +334,86 @@ impl ChatRunRegistry {
     ) {
         let now = now_ms();
         if let Ok(mut guard) = self.inner.write() {
-            let state = guard.entry(conversation_id.to_string()).or_insert_with(|| {
-                make_state(
-                    conversation_id,
-                    status
-                        .as_ref()
-                        .map(|(s, _, _)| *s)
-                        .unwrap_or(ChatRunStatus::Running),
-                    now,
-                )
-            });
-
-            if let Some((next_status, current_tool, last_error)) = status {
-                state.record.status = next_status;
-                if let Some(tool) = current_tool {
-                    state.record.current_tool = tool;
-                }
-                if let Some(err) = last_error {
-                    state.record.last_error = err;
+            let is_terminal;
+            let became_terminal;
+            {
+                let state = guard.entry(conversation_id.to_string()).or_insert_with(|| {
+                    make_state(
+                        conversation_id,
+                        status
+                            .as_ref()
+                            .map(|(s, _, _)| *s)
+                            .unwrap_or(ChatRunStatus::Running),
+                        now,
+                    )
+                });
+
+                if let Some((next_status, current_tool, last_error)) = status {
+                    state.record.status = next_status;
+                    if let Some(tool) = current_tool {
+                        state.record.current_tool = tool;
+                    }
+                    if let Some(err) = last_error {
+                        state.record.last_error = err;
+                    }
                 }
-            }
 
-            let seq = state.next_seq;
-            state.next_seq += 1;
-            state.record.updated_at = now;
-            state.record.latest_seq = seq;
-            let record = ChatRunEventRecord {
-                conversation_id: conversation_id.to_string(),
-                seq,
-                timestamp: now,
-                frame,
-            };
-            state.events.push(record.clone());
-            if state.events.len() > MAX_EVENTS_PER_RUN {
-                let excess = state.events.len() - MAX_EVENTS_PER_RUN;
-                state.events.drain(0..excess);
+                let seq = state.next_seq;
+                state.next_seq += 1;
+                state.record.updated_at = now;
+                state.record.latest_seq = seq;
+                let record = ChatRunEventRecord {
+                    conversation_id: conversation_id.to_string(),
+                    seq,
+                    timestamp: now,
+                    frame,
+                };
+                state.events.push(record.clone());
+                if state.events.len() > MAX_EVENTS_PER_RUN {
+                    let excess = state.events.len() - MAX_EVENTS_PER_RUN;
+                    state.events.drain(0..excess);
+                }
+                // Fan out to live subscribers. `send` errors when there
+                // are zero receivers — expected (no one is tailing) and
+                // not worth logging. Lagged subscribers see `Lagged` on
+                // their next recv() and we leave that handling to the
+                // caller (the WS handler treats it as "drop the tail and
+                // ask the client to refetch").
+                let _ = state.broadcast.send(record);
+                is_terminal = state.record.status.is_terminal();
+                became_terminal = is_terminal && state.mark_terminal(now);
             }
-            // Fan out to live subscribers. `send` errors when there
-            // are zero receivers — expected (no one is tailing) and
-            // not worth logging. Lagged subscribers see `Lagged` on
-            // their next recv() and we leave that handling to the
-            // caller (the WS handler treats it as "drop the tail and
-            // ask the client to refetch").
-            let _ = state.broadcast.send(record);
-            if state.record.status.is_terminal() {
+            // Keyed on the status itself, not the first-transition flag:
+            // the status-only update path can stamp `terminated_at` first,
+            // in which case `mark_terminal` returns `false` here and the
+            // abort handle would otherwise never be removed.
+            if is_terminal {
                 if let Ok(mut aborts) = self.aborts.write() {
                     aborts.remove(conversation_id);
                 }
+                if became_terminal {
+                    prune_terminal_locked(&mut guard, now);
+                }
             }
         }
     }
 }
 
+impl ChatRunState {
+    /// Stamp the terminal timestamp the first time the run reaches a
+    /// terminal status. Returns `true` only on that first transition so
+    /// callers can run the one-shot eviction sweep exactly once per run;
+    /// abort-handle cleanup is idempotent and is not gated on this.
+    fn mark_terminal(&mut self, now: u64) -> bool {
+        if self.terminated_at.is_none() {
+            self.terminated_at = Some(now);
+            true
+        } else {
+            false
+        }
+    }
+}
+
 fn make_state(conversation_id: &str, status: ChatRunStatus, now: u64) -> ChatRunState {
     let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
     ChatRunState {
@@ -372,6 +429,41 @@ fn make_state(conversation_id: &str, status: ChatRunStatus, now: u64) -> ChatRun
         events: Vec::new(),
         next_seq: 1,
         broadcast: tx,
+        // Always starts `None`; the caller stamps it via `mark_terminal`
+        // after setting the status so the first-transition bookkeeping
+        // (the eviction sweep) fires exactly once.
+        terminated_at: None,
+    }
+}
+
+/// Evict terminal runs that have outlived their retention window, then
+/// cap the number of retained terminal runs as a backstop against a
+/// burst completing inside the TTL. Active runs are never evicted.
+/// Caller must hold the `inner` write lock.
+fn prune_terminal_locked(guard: &mut HashMap<String, ChatRunState>, now: u64) {
+    // TTL eviction. Active runs (`terminated_at == None`) are kept
+    // unconditionally; `saturating_sub` keeps a clock that jumped
+    // backwards from evicting everything.
+    guard.retain(|_, state| match state.terminated_at {
+        Some(t) => now.saturating_sub(t) < TERMINAL_RETENTION_MS,
+        None => true,
+    });
+
+    // LRU backstop: if too many terminal runs survived the TTL window,
+    // drop the oldest by terminal timestamp.
+    let terminal_count = guard.values().filter(|s| s.terminated_at.is_some()).count();
+    if terminal_count > MAX_RETAINED_TERMINAL {
+        let mut terminals: Vec<(String, u64)> = guard
+            .iter()
+            .filter_map(|(id, s)| s.terminated_at.map(|t| (id.clone(), t)))
+            .collect();
+        terminals.sort_by_key(|(_, t)| *t);
+        for (id, _) in terminals
+            .into_iter()
+            .take(terminal_count - MAX_RETAINED_TERMINAL)
+        {
+            guard.remove(&id);
+        }
     }
 }
 
@@ -398,6 +490,17 @@ fn now_ms() -> u64 {
         .unwrap_or(0)
 }
 
+#[cfg(test)]
+impl ChatRunRegistry {
+    /// Test seam: run the eviction sweep with an injected clock so TTL
+    /// behaviour is testable without sleeping a real `TERMINAL_RETENTION_MS`.
+    fn force_prune(&self, now: u64) {
+        if let Ok(mut guard) = self.inner.write() {
+            prune_terminal_locked(&mut guard, now);
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -531,4 +634,90 @@ mod tests {
         let registry = ChatRunRegistry::default();
         assert!(registry.subscribe("never-started", 0).is_none());
     }
+
+    #[test]
+    fn terminal_run_is_evicted_after_ttl() {
+        let registry = ChatRunRegistry::default();
+        registry.start("c1");
+        registry.event(
+            Some("c1"),
+            &AgentEvent::Error {
+                message: "boom".into(),
+            },
+        );
+        // Within the retention window it's still queryable for late
+        // reconnects.
+        assert_eq!(registry.list(false).len(), 1);
+        registry.force_prune(now_ms());
+        assert_eq!(registry.list(false).len(), 1);
+
+        // Far enough past the terminal timestamp, it's swept.
+        registry.force_prune(u64::MAX);
+        assert!(registry.list(false).is_empty());
+        assert!(registry.subscribe("c1", 0).is_none());
+    }
+
+    #[test]
+    fn active_run_is_never_evicted() {
+        let registry = ChatRunRegistry::default();
+        registry.start("c1");
+        registry.event(
+            Some("c1"),
+            &AgentEvent::Delta {
+                content: "still going".into(),
+            },
+        );
+        // Even with an absurd clock, an active (non-terminal) run stays.
+        registry.force_prune(u64::MAX);
+        assert!(registry.is_active("c1"));
+        assert_eq!(registry.list(false).len(), 1);
+    }
+
+    #[test]
+    fn cancelled_run_is_evicted_after_ttl() {
+        let registry = ChatRunRegistry::default();
+        registry.start("c1");
+        registry.cancelled(Some("c1"));
+        assert_eq!(registry.list(false).len(), 1);
+        registry.force_prune(u64::MAX);
+        assert!(registry.list(false).is_empty());
+    }
+
+    #[test]
+    fn terminal_runs_are_capped_by_lru_backstop() {
+        let registry = ChatRunRegistry::default();
+        let total = MAX_RETAINED_TERMINAL + 5;
+        for i in 0..total {
+            let id = format!("c{i}");
+            registry.start(&id);
+            registry.event(
+                Some(&id),
+                &AgentEvent::Done {
+                    outcome: harness_core::RunOutcome::Stopped { iterations: 1 },
+                    conversation: Default::default(),
+                },
+            );
+        }
+        // The LRU backstop kicks in on each terminal transition, so the
+        // retained set never exceeds the cap regardless of TTL.
+        assert!(registry.list(false).len() <= MAX_RETAINED_TERMINAL);
+    }
+
+    #[test]
+    fn restarting_an_evicted_conversation_succeeds() {
+        let registry = ChatRunRegistry::default();
+        registry.start("c1");
+        registry.event(
+            Some("c1"),
+            &AgentEvent::Done {
+                outcome: harness_core::RunOutcome::Stopped { iterations: 1 },
+                conversation: Default::default(),
+            },
+        );
+        registry.force_prune(u64::MAX);
+        assert!(registry.list(false).is_empty());
+        // A fresh run on the same id after eviction starts cleanly.
+        assert!(registry.try_start("c1"));
+        assert!(registry.is_active("c1"));
+    }
 }