Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 232 additions & 51 deletions crates/harness-server/src/chat_runs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ const MAX_EVENTS_PER_RUN: usize = 1_000;
/// be larger than typical token-delta bursts during a single turn so
/// transient backpressure doesn't kick a healthy client off.
const BROADCAST_CAPACITY: usize = 256;
/// How long, in milliseconds (5 minutes), a terminal
/// (completed/failed/cancelled) run lingers in the registry after it
/// finishes. Long enough that a late WS reconnect can still replay the
/// final frames and observe channel closure, short enough that memory
/// doesn't grow with the lifetime number of conversations served.
/// A run is dropped once `now - terminated_at` reaches this value.
/// Eviction is lazy (swept on the next `try_start` / terminal
/// transition) so there's no background task.
const TERMINAL_RETENTION_MS: u64 = 5 * 60 * 1_000;
/// Hard backstop on retained terminal runs, independent of the TTL.
/// Guards against a burst of conversations all completing inside the
/// retention window: once we exceed this, the terminal runs with the
/// oldest terminal timestamps are dropped first until exactly this many
/// remain. Active runs are never counted against this cap or evicted.
const MAX_RETAINED_TERMINAL: usize = 256;

/// In-process status ledger for Web chat turns.
///
Expand All @@ -41,6 +54,11 @@ struct ChatRunState {
/// terminal event) replace the whole state, including this Sender,
/// so old subscribers naturally see channel closure.
broadcast: broadcast::Sender<ChatRunEventRecord>,
/// `now_ms()` at which this run first reached a terminal status, or
/// `None` while it's still active. Drives TTL + LRU eviction so the
/// `inner` map doesn't grow with the lifetime number of
/// conversations.
terminated_at: Option<u64>,
}

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -96,6 +114,7 @@ impl ChatRunRegistry {
last_error: None,
};
if let Ok(mut guard) = self.inner.write() {
prune_terminal_locked(&mut guard, now);
if guard
.get(conversation_id)
.is_some_and(|state| state.record.status.is_active())
Expand All @@ -110,6 +129,7 @@ impl ChatRunRegistry {
events: Vec::new(),
next_seq: 1,
broadcast: tx,
terminated_at: None,
},
);
true
Expand All @@ -121,7 +141,9 @@ impl ChatRunRegistry {
/// Atomically: subscribe to the live broadcast for `conversation_id`,
/// snapshot all buffered events with `seq > after`, return both.
/// Returns `None` if the conversation has no run state (never
/// started, or evicted — which we don't do today).
/// started, or already evicted — terminal runs are dropped after
/// `TERMINAL_RETENTION_MS`, so a reconnect that arrives long after
/// completion sees `None` and falls back to the persisted history).
///
/// The snapshot + subscription pair is consistent: any event
/// pushed after the lock is released arrives via the receiver,
Expand Down Expand Up @@ -278,16 +300,23 @@ impl ChatRunRegistry {
) {
let now = now_ms();
if let Ok(mut guard) = self.inner.write() {
let state = guard
.entry(conversation_id.to_string())
.or_insert_with(|| make_state(conversation_id, status, now));
state.record.status = status;
state.record.updated_at = now;
if let Some(tool) = current_tool {
state.record.current_tool = tool;
let became_terminal;
{
let state = guard
.entry(conversation_id.to_string())
.or_insert_with(|| make_state(conversation_id, status, now));
state.record.status = status;
state.record.updated_at = now;
if let Some(tool) = current_tool {
state.record.current_tool = tool;
}
if let Some(err) = last_error {
state.record.last_error = err;
}
became_terminal = status.is_terminal() && state.mark_terminal(now);
}
if let Some(err) = last_error {
state.record.last_error = err;
if became_terminal {
prune_terminal_locked(&mut guard, now);
}
}
}
Expand All @@ -305,58 +334,78 @@ impl ChatRunRegistry {
) {
let now = now_ms();
if let Ok(mut guard) = self.inner.write() {
let state = guard.entry(conversation_id.to_string()).or_insert_with(|| {
make_state(
conversation_id,
status
.as_ref()
.map(|(s, _, _)| *s)
.unwrap_or(ChatRunStatus::Running),
now,
)
});

if let Some((next_status, current_tool, last_error)) = status {
state.record.status = next_status;
if let Some(tool) = current_tool {
state.record.current_tool = tool;
}
if let Some(err) = last_error {
state.record.last_error = err;
let became_terminal;
{
let state = guard.entry(conversation_id.to_string()).or_insert_with(|| {
make_state(
conversation_id,
status
.as_ref()
.map(|(s, _, _)| *s)
.unwrap_or(ChatRunStatus::Running),
now,
)
});

if let Some((next_status, current_tool, last_error)) = status {
state.record.status = next_status;
if let Some(tool) = current_tool {
state.record.current_tool = tool;
}
if let Some(err) = last_error {
state.record.last_error = err;
}
}
}

let seq = state.next_seq;
state.next_seq += 1;
state.record.updated_at = now;
state.record.latest_seq = seq;
let record = ChatRunEventRecord {
conversation_id: conversation_id.to_string(),
seq,
timestamp: now,
frame,
};
state.events.push(record.clone());
if state.events.len() > MAX_EVENTS_PER_RUN {
let excess = state.events.len() - MAX_EVENTS_PER_RUN;
state.events.drain(0..excess);
let seq = state.next_seq;
state.next_seq += 1;
state.record.updated_at = now;
state.record.latest_seq = seq;
let record = ChatRunEventRecord {
conversation_id: conversation_id.to_string(),
seq,
timestamp: now,
frame,
};
state.events.push(record.clone());
if state.events.len() > MAX_EVENTS_PER_RUN {
let excess = state.events.len() - MAX_EVENTS_PER_RUN;
state.events.drain(0..excess);
}
// Fan out to live subscribers. `send` errors when there
// are zero receivers — expected (no one is tailing) and
// not worth logging. Lagged subscribers see `Lagged` on
// their next recv() and we leave that handling to the
// caller (the WS handler treats it as "drop the tail and
// ask the client to refetch").
let _ = state.broadcast.send(record);
became_terminal = state.record.status.is_terminal() && state.mark_terminal(now);
}
// Fan out to live subscribers. `send` errors when there
// are zero receivers — expected (no one is tailing) and
// not worth logging. Lagged subscribers see `Lagged` on
// their next recv() and we leave that handling to the
// caller (the WS handler treats it as "drop the tail and
// ask the client to refetch").
let _ = state.broadcast.send(record);
if state.record.status.is_terminal() {
if became_terminal {
if let Ok(mut aborts) = self.aborts.write() {
aborts.remove(conversation_id);
}
prune_terminal_locked(&mut guard, now);
}
}
}
}

impl ChatRunState {
    /// Records `now` as the moment this run became terminal, unless a
    /// terminal timestamp has already been recorded.
    ///
    /// Returns `true` when this call wrote the timestamp and `false`
    /// when one was already present (in which case it is left
    /// untouched). Callers use the `true` result to gate work that must
    /// happen only once per run, such as abort-handle cleanup and the
    /// eviction sweep.
    fn mark_terminal(&mut self, now: u64) -> bool {
        match self.terminated_at {
            // Already stamped by an earlier transition: keep the
            // original timestamp.
            Some(_) => false,
            None => {
                self.terminated_at = Some(now);
                true
            }
        }
    }
}

fn make_state(conversation_id: &str, status: ChatRunStatus, now: u64) -> ChatRunState {
let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
ChatRunState {
Expand All @@ -372,6 +421,41 @@ fn make_state(conversation_id: &str, status: ChatRunStatus, now: u64) -> ChatRun
events: Vec::new(),
next_seq: 1,
broadcast: tx,
// Always starts `None`; the caller stamps it via `mark_terminal`
// after setting the status so the first-transition bookkeeping
// (abort cleanup, eviction sweep) fires exactly once.
terminated_at: None,
}
}

/// Sweeps finished runs out of the registry map in two passes.
///
/// 1. **TTL** — any run whose `terminated_at` stamp is at least
///    `TERMINAL_RETENTION_MS` older than `now` is removed.
/// 2. **Cap** — if more than `MAX_RETAINED_TERMINAL` terminal runs are
///    still present afterwards, the ones with the smallest
///    `terminated_at` are removed until exactly that many remain.
///
/// Runs with `terminated_at == None` (still active) are untouched by
/// both passes. Caller must hold the `inner` write lock.
fn prune_terminal_locked(guard: &mut HashMap<String, ChatRunState>, now: u64) {
    // Pass 1: TTL. `saturating_sub` clamps the age to zero when `now`
    // is earlier than the stamp, so a clock that stepped backwards
    // makes runs look fresh instead of ancient.
    guard.retain(|_, state| {
        let expired = state
            .terminated_at
            .is_some_and(|stamp| now.saturating_sub(stamp) >= TERMINAL_RETENTION_MS);
        !expired
    });

    // Pass 2: size cap over whatever terminal runs survived the TTL.
    let retained = guard
        .values()
        .filter(|state| state.terminated_at.is_some())
        .count();
    if retained <= MAX_RETAINED_TERMINAL {
        return;
    }
    let excess = retained - MAX_RETAINED_TERMINAL;

    // Order terminal runs oldest-first. Ids are only borrowed while
    // sorting; just the evicted ones are cloned so the map can be
    // mutated once the borrow ends.
    let mut by_age: Vec<(u64, &String)> = guard
        .iter()
        .filter_map(|(id, state)| state.terminated_at.map(|stamp| (stamp, id)))
        .collect();
    by_age.sort_by_key(|&(stamp, _)| stamp);
    let victims: Vec<String> = by_age
        .into_iter()
        .take(excess)
        .map(|(_, id)| id.clone())
        .collect();
    for id in &victims {
        guard.remove(id);
    }
}

Expand All @@ -398,6 +482,17 @@ fn now_ms() -> u64 {
.unwrap_or(0)
}

#[cfg(test)]
impl ChatRunRegistry {
    /// Test-only hook: runs the eviction sweep against a caller-chosen
    /// `now`, so TTL behaviour can be exercised without waiting out a
    /// real `TERMINAL_RETENTION_MS`. Does nothing if the `inner` write
    /// lock cannot be acquired.
    fn force_prune(&self, now: u64) {
        let Ok(mut runs) = self.inner.write() else {
            return;
        };
        prune_terminal_locked(&mut runs, now);
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -531,4 +626,90 @@ mod tests {
let registry = ChatRunRegistry::default();
assert!(registry.subscribe("never-started", 0).is_none());
}

#[test]
fn terminal_run_is_evicted_after_ttl() {
    let conversation = "c1";
    let registry = ChatRunRegistry::default();
    registry.start(conversation);
    let failure = AgentEvent::Error {
        message: "boom".into(),
    };
    registry.event(Some(conversation), &failure);

    // A sweep at the current time leaves the freshly failed run in
    // place, so a late reconnect can still find it.
    assert_eq!(registry.list(false).len(), 1);
    registry.force_prune(now_ms());
    assert_eq!(registry.list(false).len(), 1);

    // A sweep with the clock pushed to the maximum removes it, and a
    // subsequent subscribe no longer finds any run state.
    registry.force_prune(u64::MAX);
    assert!(registry.list(false).is_empty());
    assert!(registry.subscribe(conversation, 0).is_none());
}

#[test]
fn active_run_is_never_evicted() {
    let conversation = "c1";
    let registry = ChatRunRegistry::default();
    registry.start(conversation);
    let delta = AgentEvent::Delta {
        content: "still going".into(),
    };
    registry.event(Some(conversation), &delta);

    // The run has not reached a terminal status, so even a sweep with
    // the clock at its maximum must leave it in place.
    registry.force_prune(u64::MAX);
    assert!(registry.is_active(conversation));
    assert_eq!(registry.list(false).len(), 1);
}

#[test]
fn cancelled_run_is_evicted_after_ttl() {
    let conversation = "c1";
    let registry = ChatRunRegistry::default();
    registry.start(conversation);
    registry.cancelled(Some(conversation));

    // Still listed right after cancellation...
    assert_eq!(registry.list(false).len(), 1);

    // ...and gone once a sweep runs with the clock at its maximum.
    registry.force_prune(u64::MAX);
    assert!(registry.list(false).is_empty());
}

#[test]
fn terminal_runs_are_capped_by_lru_backstop() {
    let registry = ChatRunRegistry::default();
    let total = MAX_RETAINED_TERMINAL + 5;
    let ids: Vec<String> = (0..total).map(|i| format!("c{i}")).collect();
    for id in &ids {
        registry.start(id);
        registry.event(
            Some(id.as_str()),
            &AgentEvent::Done {
                outcome: harness_core::RunOutcome::Stopped { iterations: 1 },
                conversation: Default::default(),
            },
        );
    }
    // The backstop kicks in on each terminal transition, so the
    // retained set never exceeds the cap regardless of TTL.
    assert!(registry.list(false).len() <= MAX_RETAINED_TERMINAL);

    // The upper bound alone would also pass if the sweep threw away
    // every terminal run. All runs here finished well inside the
    // retention window, so only the cap may evict: exactly
    // `MAX_RETAINED_TERMINAL` of them must still have run state.
    // `subscribe` returns `None` for a conversation with no run state.
    let retained = ids
        .iter()
        .filter(|id| registry.subscribe(id.as_str(), 0).is_some())
        .count();
    assert_eq!(retained, MAX_RETAINED_TERMINAL);
}

#[test]
fn restarting_an_evicted_conversation_succeeds() {
    let conversation = "c1";
    let registry = ChatRunRegistry::default();
    registry.start(conversation);
    let done = AgentEvent::Done {
        outcome: harness_core::RunOutcome::Stopped { iterations: 1 },
        conversation: Default::default(),
    };
    registry.event(Some(conversation), &done);

    // Sweep the finished run out of the registry.
    registry.force_prune(u64::MAX);
    assert!(registry.list(false).is_empty());

    // The same id can then be started again and shows up as active.
    assert!(registry.try_start(conversation));
    assert!(registry.is_active(conversation));
}
}
Loading