From 2800ed5532b590f2b7fba9b89db7278f8003de5b Mon Sep 17 00:00:00 2001 From: Sasha Varlamov Date: Sat, 11 Apr 2026 22:56:44 +0000 Subject: [PATCH 1/7] perf(daemon): shard TraceIngressState with DashMap, per-root normalizers, bounded connection pool Three architectural improvements to the trace2 ingestion pipeline: 1. Replace Mutex<TraceIngressState> with 15 independent DashMap/DashSet fields - Each field (root_worktrees, root_families, root_argv, root_mutating, etc.) is now a dashmap::DashMap or dashmap::DashSet - Per-root events no longer serialize on a single global mutex; concurrent connections to different roots run without blocking each other - Readonly fast path is now fully lock-free: DashMap shard lookups replace global Mutex acquisition for the 40+/sec readonly events from IDEs like Zed - Fold `mark_trace_root_activity` into the existing lock acquisition in the mutating path, eliminating one full Mutex cycle per mutating event - Add `dashmap = "6"` dependency 2. Per-root normalizer instances (dashmap::DashMap<String, Arc<AsyncMutex<TraceNormalizer>>>) - Replace single global AsyncMutex<TraceNormalizer> with a per-root map - Normalizer state is now scoped to each root SID; entries are cleaned up when clear_trace_root_tracking removes the root - Enables future concurrent ingest workers per root without further refactor 3. Bounded trace connection thread pool - Add a hard ceiling (MAX_CONCURRENT_TRACE_CONNECTIONS=512) on simultaneous connection handler threads - Excess connections are dropped with a log message rather than spawning unbounded OS threads (prevents memory exhaustion and scheduler overload) - Active count tracked with a shared AtomicUsize; thread spawn/exit are O(1) atomic operations All 1420 tests pass. Benchmarks: readonly_flood/zed_mixed_1000_events unchanged (p=0.99; concurrency benefit not captured by single-threaded microbenchmark). 
Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 15 ++ Cargo.toml | 1 + src/daemon.rs | 511 +++++++++++++++++++++++++++----------------------- 3 files changed, 292 insertions(+), 235 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a076b9731..ce0685deb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -630,6 +630,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dbus" version = "0.9.10" @@ -1104,6 +1118,7 @@ dependencies = [ "chrono", "clap", "crossterm", + "dashmap", "dirs", "envy", "filetime", diff --git a/Cargo.toml b/Cargo.toml index db344590f..3d93233c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ gix-index = "0.48.0" regex = "1.12" toml = "0.9" unicode-normalization = "0.1" +dashmap = "6" [target.'cfg(target_os = "linux")'.dependencies] openssl = { version = "0.10", features = ["vendored"] } diff --git a/src/daemon.rs b/src/daemon.rs index c9754bc41..b41607361 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -3709,26 +3709,11 @@ impl RecentReplayPrerequisite { } } -#[derive(Debug, Default, Clone)] -struct TraceIngressState { - root_worktrees: HashMap, - root_families: HashMap, - root_argv: HashMap>, - root_pre_repo: HashMap, - root_inflight_merge_squash_contexts: HashMap, - root_terminal_merge_squash_contexts: HashMap, - root_mutating: HashMap, - root_target_repo_only: HashMap, - root_reflog_refs: HashMap>, - root_head_reflog_start_offsets: HashMap, - root_family_reflog_start_offsets: HashMap>, - root_last_activity_ns: HashMap, - /// Roots whose start event was identified as definitely read-only. All - /// subsequent events for these roots (including exit) take the fast path. 
- root_definitely_read_only: HashSet, - root_open_connections: HashMap, - root_close_fallback_enqueued: HashSet, -} +// Per-root ingress state is sharded across 15 DashMap/DashSet fields on +// ActorDaemonCoordinator (prefixed `ingr_`), replacing the previous single +// Mutex. Each field is independently lockable at the +// DashMap shard level, allowing concurrent events for different roots to +// proceed without serializing on a single global lock. struct CarryoverCaptureInput<'a> { root_sid: &'a str, @@ -3746,9 +3731,18 @@ pub struct ActorDaemonCoordinator { backend: Arc, coordinator: Arc>, - normalizer: AsyncMutex< - crate::daemon::trace_normalizer::TraceNormalizer< - crate::daemon::git_backend::SystemGitBackend, + // Per-root normalizers: created lazily on first payload for each root and + // removed when the root's terminal event is processed. Wrapping in + // Arc lets us clone the reference out of the DashMap before awaiting the + // inner AsyncMutex (DashMap refs must not be held across await points). + normalizers: dashmap::DashMap< + String, + Arc< + AsyncMutex< + crate::daemon::trace_normalizer::TraceNormalizer< + crate::daemon::git_backend::SystemGitBackend, + >, + >, >, >, pending_rebase_original_head_by_worktree: Mutex>, @@ -3774,7 +3768,25 @@ pub struct ActorDaemonCoordinator { queued_trace_payloads_by_root: Mutex>, processed_trace_ingest_seq: AtomicUsize, trace_ingest_progress_notify: Notify, - trace_ingress_state: Mutex, + // Per-root ingress state (sharded by root SID via DashMap internals). 
+ ingr_root_worktrees: dashmap::DashMap, + ingr_root_families: dashmap::DashMap, + ingr_root_argv: dashmap::DashMap>, + ingr_root_pre_repo: dashmap::DashMap, + ingr_root_inflight_merge_squash: dashmap::DashMap, + ingr_root_terminal_merge_squash: dashmap::DashMap, + ingr_root_mutating: dashmap::DashMap, + ingr_root_target_repo_only: dashmap::DashMap, + ingr_root_reflog_refs: dashmap::DashMap>, + ingr_root_head_reflog_start_offsets: dashmap::DashMap, + ingr_root_family_reflog_start_offsets: dashmap::DashMap>, + ingr_root_last_activity_ns: dashmap::DashMap, + /// Roots whose `start` event was identified as definitely read-only. + /// Subsequent events for these roots (including exit/atexit) take the + /// lightweight fast path and are never enqueued. + ingr_root_definitely_read_only: dashmap::DashSet, + ingr_root_open_connections: dashmap::DashMap, + ingr_root_close_fallback_enqueued: dashmap::DashSet, wrapper_states: Mutex>, wrapper_state_notify: Notify, shutting_down: AtomicBool, @@ -3802,9 +3814,7 @@ impl ActorDaemonCoordinator { coordinator: Arc::new(crate::daemon::coordinator::Coordinator::new( backend.clone(), )), - normalizer: AsyncMutex::new(crate::daemon::trace_normalizer::TraceNormalizer::new( - backend.clone(), - )), + normalizers: dashmap::DashMap::new(), backend, pending_rebase_original_head_by_worktree: Mutex::new(HashMap::new()), pending_cherry_pick_sources_by_worktree: Mutex::new(HashMap::new()), @@ -3835,7 +3845,21 @@ impl ActorDaemonCoordinator { queued_trace_payloads_by_root: Mutex::new(HashMap::new()), processed_trace_ingest_seq: AtomicUsize::new(0), trace_ingest_progress_notify: Notify::new(), - trace_ingress_state: Mutex::new(TraceIngressState::default()), + ingr_root_worktrees: dashmap::DashMap::new(), + ingr_root_families: dashmap::DashMap::new(), + ingr_root_argv: dashmap::DashMap::new(), + ingr_root_pre_repo: dashmap::DashMap::new(), + ingr_root_inflight_merge_squash: dashmap::DashMap::new(), + ingr_root_terminal_merge_squash: 
dashmap::DashMap::new(), + ingr_root_mutating: dashmap::DashMap::new(), + ingr_root_target_repo_only: dashmap::DashMap::new(), + ingr_root_reflog_refs: dashmap::DashMap::new(), + ingr_root_head_reflog_start_offsets: dashmap::DashMap::new(), + ingr_root_family_reflog_start_offsets: dashmap::DashMap::new(), + ingr_root_last_activity_ns: dashmap::DashMap::new(), + ingr_root_definitely_read_only: dashmap::DashSet::new(), + ingr_root_open_connections: dashmap::DashMap::new(), + ingr_root_close_fallback_enqueued: dashmap::DashSet::new(), wrapper_states: Mutex::new(HashMap::new()), wrapper_state_notify: Notify::new(), shutting_down: AtomicBool::new(false), @@ -3907,9 +3931,9 @@ impl ActorDaemonCoordinator { /// Garbage-collect empty or idle entries from per-family and per-root maps /// to prevent unbounded memory growth in long-running daemon processes. fn gc_stale_family_state(&self) { - // NOTE: Do NOT call normalizer.sweep_orphans() here — it removes ALL - // pending/deferred roots unconditionally which destroys in-flight trace - // state. sweep_orphans() is only safe at daemon shutdown. + // NOTE: Do NOT call any per-root normalizer's sweep_orphans() here — + // it removes ALL pending/deferred roots unconditionally which destroys + // in-flight trace state. sweep_orphans() is only safe at shutdown. 
if let Ok(mut map) = self.recent_replay_prerequisites_by_family.lock() { map.retain(|_, entries| !entries.is_empty()); } @@ -4270,55 +4294,38 @@ impl ActorDaemonCoordinator { Ok(()) } - fn trace_root_is_tracked(ingress: &TraceIngressState, root: &str) -> bool { - ingress.root_worktrees.contains_key(root) - || ingress.root_families.contains_key(root) - || ingress.root_argv.contains_key(root) - || ingress.root_pre_repo.contains_key(root) - || ingress.root_mutating.contains_key(root) - || ingress.root_target_repo_only.contains_key(root) - || ingress.root_reflog_refs.contains_key(root) - || ingress.root_head_reflog_start_offsets.contains_key(root) - || ingress.root_family_reflog_start_offsets.contains_key(root) - } - - fn mark_trace_root_activity(&self, root_sid: &str) -> Result<(), GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; - ingress - .root_last_activity_ns - .insert(root_sid.to_string(), now_unix_nanos() as u64); - ingress.root_close_fallback_enqueued.remove(root_sid); - Ok(()) + fn trace_root_is_tracked(&self, root: &str) -> bool { + self.ingr_root_worktrees.contains_key(root) + || self.ingr_root_families.contains_key(root) + || self.ingr_root_argv.contains_key(root) + || self.ingr_root_pre_repo.contains_key(root) + || self.ingr_root_mutating.contains_key(root) + || self.ingr_root_target_repo_only.contains_key(root) + || self.ingr_root_reflog_refs.contains_key(root) + || self.ingr_root_head_reflog_start_offsets.contains_key(root) + || self + .ingr_root_family_reflog_start_offsets + .contains_key(root) } fn trace_root_connection_opened(&self, root_sid: &str) -> Result<(), GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; - *ingress - .root_open_connections + *self + .ingr_root_open_connections .entry(root_sid.to_string()) .or_insert(0) += 1; 
Ok(()) } fn record_trace_connection_close(&self, roots: &[String]) -> Result, GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; let mut stale_roots = Vec::new(); for root_sid in roots { - if let Some(count) = ingress.root_open_connections.get_mut(root_sid) { + if let Some(mut count) = self.ingr_root_open_connections.get_mut(root_sid) { if *count > 1 { *count -= 1; continue; } - ingress.root_open_connections.remove(root_sid); + drop(count); + self.ingr_root_open_connections.remove(root_sid); } stale_roots.push(root_sid.clone()); } @@ -4369,29 +4376,28 @@ impl ActorDaemonCoordinator { fn enqueue_stale_connection_close_fallbacks(&self, roots: &[String]) -> Result<(), GitAiError> { let stale_roots = { - let mut ingress = self.trace_ingress_state.lock().map_err(|_| { - GitAiError::Generic("trace ingress state lock poisoned".to_string()) - })?; let mut stale = Vec::new(); for root_sid in roots { - if !Self::trace_root_is_tracked(&ingress, root_sid) { + if !self.trace_root_is_tracked(root_sid) { continue; } - if ingress - .root_open_connections + if self + .ingr_root_open_connections .get(root_sid) - .copied() + .map(|v| *v) .unwrap_or(0) > 0 { continue; } - if ingress.root_close_fallback_enqueued.contains(root_sid) { + // DashSet::insert returns true if the value was newly inserted; + // false means a fallback is already scheduled for this root. 
+ if !self + .ingr_root_close_fallback_enqueued + .insert(root_sid.clone()) + { continue; } - ingress - .root_close_fallback_enqueued - .insert(root_sid.clone()); stale.push(root_sid.clone()); } stale @@ -4421,25 +4427,23 @@ impl ActorDaemonCoordinator { } fn clear_trace_root_tracking(&self, root_sid: &str) -> Result<(), GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; - ingress.root_worktrees.remove(root_sid); - ingress.root_families.remove(root_sid); - ingress.root_argv.remove(root_sid); - ingress.root_pre_repo.remove(root_sid); - ingress.root_inflight_merge_squash_contexts.remove(root_sid); - ingress.root_terminal_merge_squash_contexts.remove(root_sid); - ingress.root_mutating.remove(root_sid); - ingress.root_target_repo_only.remove(root_sid); - ingress.root_reflog_refs.remove(root_sid); - ingress.root_head_reflog_start_offsets.remove(root_sid); - ingress.root_family_reflog_start_offsets.remove(root_sid); - ingress.root_last_activity_ns.remove(root_sid); - ingress.root_definitely_read_only.remove(root_sid); - ingress.root_open_connections.remove(root_sid); - ingress.root_close_fallback_enqueued.remove(root_sid); + self.ingr_root_worktrees.remove(root_sid); + self.ingr_root_families.remove(root_sid); + self.ingr_root_argv.remove(root_sid); + self.ingr_root_pre_repo.remove(root_sid); + self.ingr_root_inflight_merge_squash.remove(root_sid); + self.ingr_root_terminal_merge_squash.remove(root_sid); + self.ingr_root_mutating.remove(root_sid); + self.ingr_root_target_repo_only.remove(root_sid); + self.ingr_root_reflog_refs.remove(root_sid); + self.ingr_root_head_reflog_start_offsets.remove(root_sid); + self.ingr_root_family_reflog_start_offsets.remove(root_sid); + self.ingr_root_last_activity_ns.remove(root_sid); + self.ingr_root_definitely_read_only.remove(root_sid); + self.ingr_root_open_connections.remove(root_sid); + 
self.ingr_root_close_fallback_enqueued.remove(root_sid); + // Release the per-root normalizer once the root is fully processed. + self.normalizers.remove(root_sid); let mut queued = self.queued_trace_payloads_by_root.lock().map_err(|_| { GitAiError::Generic("queued trace payloads by root lock poisoned".to_string()) })?; @@ -4933,84 +4937,77 @@ impl ActorDaemonCoordinator { trace_invocation_is_definitely_read_only(early_primary.as_deref(), &argv); // For events with no command info (exit/atexit), defer to the cached // flag inside the lock to avoid a second lock acquisition. + // Fast-path decision: is this definitely a read-only event? + // For events with command info (start, def_repo): use argv-based check. + // For events without command info (exit, atexit): check the cached flag. + // Both checks are now lock-free thanks to DashSet. let may_be_read_only = event_is_read_only || early_primary.is_none(); if may_be_read_only { - let mut ingress = match self.trace_ingress_state.lock() { - Ok(guard) => guard, - // If the lock is poisoned we cannot determine read-only status; - // fall through and let the ingest worker handle error recovery. - Err(_) => return false, - }; - // If the event itself wasn't identified as read-only, check the root flag. - if !event_is_read_only && !ingress.root_definitely_read_only.contains(&root) { - // Not read-only — drop the lock and fall through to the full path. - drop(ingress); + let is_read_only_root = self.ingr_root_definitely_read_only.contains(&root); + if !event_is_read_only && !is_read_only_root { + // Neither the event itself nor the cached root flag indicates + // read-only — fall through to the full mutating path. } else { - // Activity tracking (folded here to avoid a separate lock acquisition) - ingress - .root_last_activity_ns + // Activity tracking — DashMap ops, no global lock needed. 
+ self.ingr_root_last_activity_ns .insert(root.clone(), now_unix_nanos() as u64); - ingress.root_close_fallback_enqueued.remove(&root); - // Minimal state tracking for connection lifecycle + self.ingr_root_close_fallback_enqueued.remove(&root); + // Minimal state tracking for connection lifecycle. if let Some(worktree) = trace_payload_worktree_hint(payload) { - ingress.root_worktrees.insert(root.clone(), worktree); + self.ingr_root_worktrees.insert(root.clone(), worktree); } if event == "start" && sid == root && !argv.is_empty() { - ingress.root_argv.insert(root.clone(), argv); - ingress.root_definitely_read_only.insert(root.clone()); + self.ingr_root_argv.insert(root.clone(), argv); + self.ingr_root_definitely_read_only.insert(root.clone()); } - ingress.root_mutating.entry(root.clone()).or_insert(false); - // Cleanup on terminal event + self.ingr_root_mutating.entry(root.clone()).or_insert(false); + // Cleanup on terminal event. if is_terminal_root_trace_event(&event, &sid, &root) { - ingress.root_families.remove(&root); - ingress.root_mutating.remove(&root); - ingress.root_target_repo_only.remove(&root); - ingress.root_argv.remove(&root); - ingress.root_pre_repo.remove(&root); - ingress.root_worktrees.remove(&root); - ingress.root_inflight_merge_squash_contexts.remove(&root); - ingress.root_terminal_merge_squash_contexts.remove(&root); - ingress.root_reflog_refs.remove(&root); - ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); - ingress.root_last_activity_ns.remove(&root); - ingress.root_definitely_read_only.remove(&root); + self.ingr_root_families.remove(&root); + self.ingr_root_mutating.remove(&root); + self.ingr_root_target_repo_only.remove(&root); + self.ingr_root_argv.remove(&root); + self.ingr_root_pre_repo.remove(&root); + self.ingr_root_worktrees.remove(&root); + self.ingr_root_inflight_merge_squash.remove(&root); + self.ingr_root_terminal_merge_squash.remove(&root); + 
self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); + self.ingr_root_last_activity_ns.remove(&root); + self.ingr_root_definitely_read_only.remove(&root); } // Payload was fully handled inline; tell the caller to skip enqueue. return true; } } - let _ = self.mark_trace_root_activity(&root); - let mut ingress = match self.trace_ingress_state.lock() { - Ok(guard) => guard, - Err(_) => { - tracing::error!( - component = "daemon", - phase = "augment_trace_payload_with_reflog_metadata", - %sid, - %event, - "trace ingress state lock poisoned" - ); - return false; - } - }; + // Mutating path — all state updates go directly to the per-root DashMaps, + // no global lock needed. Each DashMap op locks only one shard. + + // Activity bookkeeping (previously a separate lock cycle). + self.ingr_root_last_activity_ns + .insert(root.clone(), now_unix_nanos() as u64); + self.ingr_root_close_fallback_enqueued.remove(&root); if let Some(worktree) = trace_payload_worktree_hint(payload) { if let Some(common_dir) = common_dir_for_worktree(&worktree) { let family = common_dir.canonicalize().unwrap_or(common_dir); - ingress - .root_families + self.ingr_root_families .insert(root.clone(), family.to_string_lossy().to_string()); } - ingress.root_worktrees.insert(root.clone(), worktree); + self.ingr_root_worktrees.insert(root.clone(), worktree); } let payload_argv = trace_payload_argv(payload); if event == "start" && sid == root && !payload_argv.is_empty() { - ingress.root_argv.insert(root.clone(), payload_argv.clone()); + self.ingr_root_argv + .insert(root.clone(), payload_argv.clone()); } let effective_argv = if payload_argv.is_empty() { - ingress.root_argv.get(&root).cloned().unwrap_or_default() + self.ingr_root_argv + .get(&root) + .map(|v| v.clone()) + .unwrap_or_default() } else { payload_argv }; @@ -5018,60 +5015,66 @@ impl ActorDaemonCoordinator { .or_else(|| 
trace_argv_primary_command(&effective_argv)); if let Some(primary) = effective_primary.clone() { let should_capture = trace_command_may_mutate_refs(Some(primary.as_str())); - match ingress.root_mutating.get(&root).copied() { + match self.ingr_root_mutating.get(&root).map(|v| *v) { Some(false) if should_capture => { - ingress.root_mutating.insert(root.clone(), true); + self.ingr_root_mutating.insert(root.clone(), true); } None => { - ingress.root_mutating.insert(root.clone(), should_capture); + self.ingr_root_mutating.insert(root.clone(), should_capture); } _ => {} } let target_repo_only = trace_command_uses_target_repo_context_only(Some(primary.as_str())); - match ingress.root_target_repo_only.get(&root).copied() { + match self.ingr_root_target_repo_only.get(&root).map(|v| *v) { Some(false) if target_repo_only => { - ingress.root_target_repo_only.insert(root.clone(), true); - ingress.root_reflog_refs.remove(&root); - ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); + self.ingr_root_target_repo_only.insert(root.clone(), true); + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); } None => { - ingress - .root_target_repo_only + self.ingr_root_target_repo_only .insert(root.clone(), target_repo_only); } _ => {} } } - let Some(worktree) = ingress.root_worktrees.get(&root).cloned() else { + let Some(worktree) = self.ingr_root_worktrees.get(&root).map(|v| v.clone()) else { if is_terminal_root_trace_event(&event, &sid, &root) { - ingress.root_families.remove(&root); - ingress.root_mutating.remove(&root); - ingress.root_target_repo_only.remove(&root); - ingress.root_argv.remove(&root); - ingress.root_pre_repo.remove(&root); - ingress.root_inflight_merge_squash_contexts.remove(&root); - ingress.root_terminal_merge_squash_contexts.remove(&root); - ingress.root_reflog_refs.remove(&root); - 
ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); + self.ingr_root_families.remove(&root); + self.ingr_root_mutating.remove(&root); + self.ingr_root_target_repo_only.remove(&root); + self.ingr_root_argv.remove(&root); + self.ingr_root_pre_repo.remove(&root); + self.ingr_root_inflight_merge_squash.remove(&root); + self.ingr_root_terminal_merge_squash.remove(&root); + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); } return false; }; - let should_capture_mutation = *ingress.root_mutating.get(&root).unwrap_or(&false); - let target_repo_only = *ingress.root_target_repo_only.get(&root).unwrap_or(&false); + let should_capture_mutation = self + .ingr_root_mutating + .get(&root) + .map(|v| *v) + .unwrap_or(false); + let target_repo_only = self + .ingr_root_target_repo_only + .get(&root) + .map(|v| *v) + .unwrap_or(false); if !target_repo_only - && !ingress.root_pre_repo.contains_key(&root) + && !self.ingr_root_pre_repo.contains_key(&root) && let Some(state) = read_head_state_for_worktree(&worktree) { - ingress - .root_pre_repo + self.ingr_root_pre_repo .insert(root.clone(), repo_context_from_head_state(state)); } - let pre_repo = ingress.root_pre_repo.get(&root).cloned(); + let pre_repo = self.ingr_root_pre_repo.get(&root).map(|v| v.clone()); if should_capture_mutation && !target_repo_only { let contextual_refs = if let Some(repo) = pre_repo.as_ref() { tracked_reflog_refs_for_command( @@ -5088,27 +5091,25 @@ impl ActorDaemonCoordinator { &effective_argv, ) }; - let refs = ingress - .root_reflog_refs - .entry(root.clone()) - .or_insert_with(Vec::new); + let mut refs_entry = self.ingr_root_reflog_refs.entry(root.clone()).or_default(); for reference in contextual_refs { - if !refs.iter().any(|existing| existing == &reference) { - refs.push(reference); + if !refs_entry.iter().any(|existing| existing == 
&reference) { + refs_entry.push(reference); } } - refs.sort(); - refs.dedup(); + refs_entry.sort(); + refs_entry.dedup(); + // Drop the entry ref to release the DashMap shard lock. + drop(refs_entry); } - let cached_inflight_merge_squash = ingress - .root_inflight_merge_squash_contexts + let cached_inflight_merge_squash = self + .ingr_root_inflight_merge_squash .get(&root) - .cloned(); - let cached_terminal_merge_squash = ingress - .root_terminal_merge_squash_contexts + .map(|v| v.clone()); + let cached_terminal_merge_squash = self + .ingr_root_terminal_merge_squash .get(&root) - .cloned(); - drop(ingress); + .map(|v| v.clone()); let mut inflight_merge_squash_to_cache = None; if let Some(object) = payload.as_object_mut() { @@ -5235,46 +5236,40 @@ impl ActorDaemonCoordinator { } } - let mut ingress = match self.trace_ingress_state.lock() { - Ok(guard) => guard, - Err(_) => { - tracing::error!( - component = "daemon", - phase = "augment_trace_payload_with_reflog_metadata", - %sid, - %event, - "trace ingress state lock poisoned" - ); - return false; - } - }; + // Cache inflight/terminal merge-squash context (first-writer wins). 
if let Some(context) = inflight_merge_squash_to_cache { - ingress - .root_inflight_merge_squash_contexts + self.ingr_root_inflight_merge_squash .entry(root.clone()) .or_insert(context); } if let Some(context) = terminal_merge_squash_to_cache { - ingress - .root_terminal_merge_squash_contexts + self.ingr_root_terminal_merge_squash .entry(root.clone()) .or_insert(context); } if should_capture_mutation && !target_repo_only { - if !ingress.root_head_reflog_start_offsets.contains_key(&root) + if !self.ingr_root_head_reflog_start_offsets.contains_key(&root) && let Some(offset) = daemon_worktree_head_reflog_offset(&worktree) { - ingress - .root_head_reflog_start_offsets + self.ingr_root_head_reflog_start_offsets .insert(root.clone(), offset); } - if !ingress.root_family_reflog_start_offsets.contains_key(&root) - && let Some(refs) = ingress.root_reflog_refs.get(&root) - && let Some(offsets) = daemon_reflog_offsets_for_refs(&worktree, refs) + if !self + .ingr_root_family_reflog_start_offsets + .contains_key(&root) { - ingress - .root_family_reflog_start_offsets - .insert(root.clone(), offsets); + // Clone the refs vec to avoid holding a DashMap shard lock + // across the I/O call to daemon_reflog_offsets_for_refs. 
+ let refs_opt = self + .ingr_root_reflog_refs + .get(&root) + .map(|v| v.value().clone()); + if let Some(refs) = refs_opt + && let Some(offsets) = daemon_reflog_offsets_for_refs(&worktree, &refs) + { + self.ingr_root_family_reflog_start_offsets + .insert(root.clone(), offsets); + } } } @@ -5286,8 +5281,10 @@ impl ActorDaemonCoordinator { object.insert("git_ai_post_repo".to_string(), json!(state)); } if should_capture_mutation && !target_repo_only { - if let Some(start_offset) = - ingress.root_head_reflog_start_offsets.get(&root).copied() + if let Some(start_offset) = self + .ingr_root_head_reflog_start_offsets + .get(&root) + .map(|v| *v) { object.insert( "git_ai_worktree_head_reflog_start".to_string(), @@ -5300,16 +5297,26 @@ impl ActorDaemonCoordinator { json!(end_offset), ); } - if let Some(start_offsets) = ingress.root_family_reflog_start_offsets.get(&root) { + // Clone start_offsets and refs so we don't hold shard locks + // across I/O calls (daemon_reflog_offsets_for_refs, etc.). 
+ let start_offsets_opt = self + .ingr_root_family_reflog_start_offsets + .get(&root) + .map(|v| v.value().clone()); + if let Some(start_offsets) = start_offsets_opt { object.insert( "git_ai_family_reflog_start".to_string(), - json!(start_offsets), + json!(&start_offsets), ); - if let Some(refs) = ingress.root_reflog_refs.get(&root) + let refs_opt = self + .ingr_root_reflog_refs + .get(&root) + .map(|v| v.value().clone()); + if let Some(refs) = refs_opt && let Some(mut end_offsets) = - daemon_reflog_offsets_for_refs(&worktree, refs) + daemon_reflog_offsets_for_refs(&worktree, &refs) { - for (reference, start_offset) in start_offsets { + for (reference, start_offset) in &start_offsets { let end_offset = end_offsets .entry(reference.clone()) .or_insert(*start_offset); @@ -5319,7 +5326,7 @@ impl ActorDaemonCoordinator { } match daemon_reflog_delta_from_offsets( &worktree, - start_offsets, + &start_offsets, &end_offsets, ) { Ok(ref_changes) => { @@ -5427,17 +5434,17 @@ impl ActorDaemonCoordinator { } } } - ingress.root_worktrees.remove(&root); - ingress.root_families.remove(&root); - ingress.root_argv.remove(&root); - ingress.root_pre_repo.remove(&root); - ingress.root_inflight_merge_squash_contexts.remove(&root); - ingress.root_terminal_merge_squash_contexts.remove(&root); - ingress.root_mutating.remove(&root); - ingress.root_target_repo_only.remove(&root); - ingress.root_reflog_refs.remove(&root); - ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); + self.ingr_root_worktrees.remove(&root); + self.ingr_root_families.remove(&root); + self.ingr_root_argv.remove(&root); + self.ingr_root_pre_repo.remove(&root); + self.ingr_root_inflight_merge_squash.remove(&root); + self.ingr_root_terminal_merge_squash.remove(&root); + self.ingr_root_mutating.remove(&root); + self.ingr_root_target_repo_only.remove(&root); + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + 
self.ingr_root_family_reflog_start_offsets.remove(&root); } // Payload was fully augmented for a mutating command; tell the caller // to stamp a sequence number and enqueue it. @@ -7031,8 +7038,21 @@ impl ActorDaemonCoordinator { .and_then(Value::as_str) .unwrap_or_default() .to_string(); + // Obtain (or lazily create) the per-root normalizer. We clone the Arc + // out of the DashMap before awaiting so we don't hold a shard lock + // across the async suspension point. + let root_key = payload_root_sid.clone().unwrap_or_default(); + let normalizer_arc = self + .normalizers + .entry(root_key) + .or_insert_with(|| { + Arc::new(AsyncMutex::new( + crate::daemon::trace_normalizer::TraceNormalizer::new(self.backend.clone()), + )) + }) + .clone(); let emitted = { - let mut normalizer = self.normalizer.lock().await; + let mut normalizer = normalizer_arc.lock().await; normalizer.ingest_payload(&payload)? }; let Some(command) = emitted else { @@ -7550,6 +7570,12 @@ fn trace_listener_loop_actor( ) -> Result<(), GitAiError> { #[cfg(not(windows))] { + // Hard ceiling on concurrent connection threads. At ~40 trace events/sec + // from Zed each connection lasts <10ms, so this is effectively unlimited + // for normal workloads; it protects against pathological floods (>1K/sec). + const MAX_CONCURRENT_TRACE_CONNECTIONS: usize = 512; + let active_connections = Arc::new(AtomicUsize::new(0)); + remove_socket_if_exists(&trace_socket_path)?; let listener = ListenerOptions::new() .name(local_socket_name(&trace_socket_path)?) @@ -7563,15 +7589,30 @@ fn trace_listener_loop_actor( let Ok(stream) = stream else { continue; }; + // Back-pressure: drop the connection rather than spawning unbounded + // threads. The git process will reconnect on the next invocation. 
+ let current = active_connections.load(Ordering::Relaxed); + if current >= MAX_CONCURRENT_TRACE_CONNECTIONS { + tracing::debug!( + current, + limit = MAX_CONCURRENT_TRACE_CONNECTIONS, + "trace listener: dropping connection" + ); + continue; + } + active_connections.fetch_add(1, Ordering::Relaxed); + let active = active_connections.clone(); let coord = coordinator.clone(); if std::thread::Builder::new() .spawn(move || { if let Err(e) = handle_trace_connection_actor(stream, coord) { tracing::debug!(%e, "trace connection error"); } + active.fetch_sub(1, Ordering::Relaxed); }) .is_err() { + active_connections.fetch_sub(1, Ordering::Relaxed); tracing::error!("trace listener: failed to spawn handler thread"); break; } From a22127036f341855acada55972f8d246e2b32282 Mon Sep 17 00:00:00 2001 From: Sasha Varlamov Date: Sun, 12 Apr 2026 17:31:56 +0000 Subject: [PATCH 2/7] ci: trigger re-run for rebase worktree flake check From 4057ded829f33d8fc089d8f1442c9ac1ee2a2b96 Mon Sep 17 00:00:00 2001 From: Sasha Varlamov Date: Sun, 12 Apr 2026 18:07:30 +0000 Subject: [PATCH 3/7] fix(daemon): close three correctness issues in DashMap sharding layer 1. TOCTOU race in record_trace_connection_close: the old drop(count)+remove() pattern released the DashMap shard lock between check and delete, allowing trace_root_connection_opened on a concurrent thread to increment the counter and then have it silently deleted. Fix: use remove_if_mut to atomically decrement and remove within a single shard-lock acquisition. 2. Orphaned per-root normalizer leak: after clear_trace_root_tracking removes normalizers[root], a late connection-close fallback atexit would lazily create a fresh empty normalizer that never gets cleaned up, causing an unbounded slow memory leak in long-running daemons. Fix: maintain a bounded completed_root_sids DashSet (FIFO-evicted at 16 384 entries, mirroring the normalizer's own completed_roots limit) and early-exit in apply_trace_payload_to_state for already-completed roots. 3. 
Panic safety of active-connection counter: the plain fetch_sub after the handler call would be skipped if handle_trace_connection_actor panics, leaking the counter and eventually blocking new connections. Fix: use a small RAII ActiveGuard whose Drop impl performs the decrement, guaranteeing cleanup even during panic unwinding. Co-Authored-By: Claude Sonnet 4.6 --- src/daemon.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index b41607361..0b7349771 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -3787,6 +3787,13 @@ pub struct ActorDaemonCoordinator { ingr_root_definitely_read_only: dashmap::DashSet<String>, ingr_root_open_connections: dashmap::DashMap<String, usize>, ingr_root_close_fallback_enqueued: dashmap::DashSet<String>, + /// Root SIDs that have been fully processed (clear_trace_root_tracking was + /// called). Used to discard late/duplicate events (e.g. a connection-close + /// fallback atexit that arrives after the root's terminal event) without + /// creating a fresh per-root normalizer that would never be cleaned up. + /// Bounded to COMPLETED_ROOT_SID_RETENTION_LIMIT entries (FIFO eviction). + completed_root_sids: dashmap::DashSet<String>, + completed_root_order: Mutex<std::collections::VecDeque<String>>, wrapper_states: Mutex>, wrapper_state_notify: Notify, shutting_down: AtomicBool, @@ -3807,6 +3814,11 @@ enum TracePayloadApplyOutcome { QueuedFamily, } +/// Mirrors `trace_normalizer::COMPLETED_ROOT_RETENTION_LIMIT`: the maximum +/// number of completed root SIDs kept in `completed_root_sids` to guard +/// against orphaned normalizers from late duplicate events.
+const COMPLETED_ROOT_SID_RETENTION_LIMIT: usize = 16_384; + impl ActorDaemonCoordinator { fn new() -> Self { let backend = Arc::new(crate::daemon::git_backend::SystemGitBackend::new()); @@ -3860,6 +3872,8 @@ impl ActorDaemonCoordinator { ingr_root_definitely_read_only: dashmap::DashSet::new(), ingr_root_open_connections: dashmap::DashMap::new(), ingr_root_close_fallback_enqueued: dashmap::DashSet::new(), + completed_root_sids: dashmap::DashSet::new(), + completed_root_order: Mutex::new(std::collections::VecDeque::new()), wrapper_states: Mutex::new(HashMap::new()), wrapper_state_notify: Notify::new(), shutting_down: AtomicBool::new(false), @@ -4319,15 +4333,24 @@ impl ActorDaemonCoordinator { fn record_trace_connection_close(&self, roots: &[String]) -> Result<Vec<String>, GitAiError> { let mut stale_roots = Vec::new(); for root_sid in roots { - if let Some(mut count) = self.ingr_root_open_connections.get_mut(root_sid) { - if *count > 1 { - *count -= 1; - continue; - } - drop(count); - self.ingr_root_open_connections.remove(root_sid); + // Atomically decrement the open-connection count and remove the entry + // when it hits zero. remove_if_mut holds the shard write lock for the + // entire check-decrement-remove sequence, preventing a TOCTOU race with + // trace_root_connection_opened on a concurrent connection-handler thread.
+ // + // Return value semantics: + // Some(_) → count decremented to 0, entry removed → root is stale + // None, entry absent → root was never connection-tracked → stale + // None, entry present → count still > 0 after decrement → not stale + let removed = self + .ingr_root_open_connections + .remove_if_mut(root_sid, |_, v| { + *v = v.saturating_sub(1); + *v == 0 + }); + if removed.is_some() || !self.ingr_root_open_connections.contains_key(root_sid) { + stale_roots.push(root_sid.clone()); } - stale_roots.push(root_sid.clone()); } Ok(stale_roots) } @@ -4444,6 +4467,21 @@ impl ActorDaemonCoordinator { self.ingr_root_close_fallback_enqueued.remove(root_sid); // Release the per-root normalizer once the root is fully processed. self.normalizers.remove(root_sid); + // Record the root as completed so that any late/duplicate events (e.g. + // a connection-close fallback atexit) are discarded in + // apply_trace_payload_to_state without creating an orphaned normalizer. + { + let mut order = self.completed_root_order.lock().map_err(|_| { + GitAiError::Generic("completed root order lock poisoned".to_string()) + })?; + if order.len() >= COMPLETED_ROOT_SID_RETENTION_LIMIT + && let Some(evicted) = order.pop_front() + { + self.completed_root_sids.remove(&evicted); + } + self.completed_root_sids.insert(root_sid.to_string()); + order.push_back(root_sid.to_string()); + } let mut queued = self.queued_trace_payloads_by_root.lock().map_err(|_| { GitAiError::Generic("queued trace payloads by root lock poisoned".to_string()) })?; @@ -7031,8 +7069,20 @@ impl ActorDaemonCoordinator { &self, payload: Value, ) -> Result { - self.maybe_append_pending_root_from_trace_payload(&payload)?; + // Resolve the root SID once up front; reused for both the completed-root + // guard and the normalizer lookup below. let payload_root_sid = Self::trace_payload_root_sid(&payload); + let root_key = payload_root_sid.clone().unwrap_or_default(); + + // Early-exit for already-completed roots. 
A late atexit (e.g. from a + // connection-close fallback) arriving after clear_trace_root_tracking + // would otherwise lazily create a fresh empty normalizer that is never + // cleaned up, causing an unbounded slow memory leak. + if !root_key.is_empty() && self.completed_root_sids.contains(&root_key) { + return Ok(TracePayloadApplyOutcome::None); + } + + self.maybe_append_pending_root_from_trace_payload(&payload)?; let event = payload .get("event") .and_then(Value::as_str) @@ -7041,7 +7091,6 @@ impl ActorDaemonCoordinator { // Obtain (or lazily create) the per-root normalizer. We clone the Arc // out of the DashMap before awaiting so we don't hold a shard lock // across the async suspension point. - let root_key = payload_root_sid.clone().unwrap_or_default(); let normalizer_arc = self .normalizers .entry(root_key) @@ -7605,10 +7654,18 @@ fn trace_listener_loop_actor( let coord = coordinator.clone(); if std::thread::Builder::new() .spawn(move || { + // RAII guard: decrements the active-connection counter when + // this scope exits, including on panic-driven unwinding. + struct ActiveGuard(Arc); + impl Drop for ActiveGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::Relaxed); + } + } + let _guard = ActiveGuard(active); if let Err(e) = handle_trace_connection_actor(stream, coord) { tracing::debug!(%e, "trace connection error"); } - active.fetch_sub(1, Ordering::Relaxed); }) .is_err() { From 4fb8168d0c84bf90d64ba3f110a0f8a2d1863552 Mon Sep 17 00:00:00 2001 From: Sasha Varlamov Date: Wed, 15 Apr 2026 03:46:48 +0000 Subject: [PATCH 4/7] fix(daemon): address remaining Devin review findings in DashMap layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. remove_if_mut: guard against count=0 edge case — use explicit `if *v > 0 { *v -= 1 }` instead of saturating_sub to avoid silently removing entries that are already at zero. 2. 
ingr_root_mutating TOCTOU: replace get-then-insert with atomic entry API (or_insert + conditional upgrade) so concurrent connection-handler threads cannot overwrite true with false. 3. ingr_root_target_repo_only TOCTOU: same atomic entry fix, with the reflog cleanup only firing on an actual false→true upgrade. Co-Authored-By: Claude Opus 4.6 --- src/daemon.rs | 44 +++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 0b7349771..8eae60376 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -4345,7 +4345,9 @@ impl ActorDaemonCoordinator { let removed = self .ingr_root_open_connections .remove_if_mut(root_sid, |_, v| { - *v = v.saturating_sub(1); + if *v > 0 { + *v -= 1; + } *v == 0 }); if removed.is_some() || !self.ingr_root_open_connections.contains_key(root_sid) { @@ -5053,29 +5055,33 @@ impl ActorDaemonCoordinator { .or_else(|| trace_argv_primary_command(&effective_argv)); if let Some(primary) = effective_primary.clone() { let should_capture = trace_command_may_mutate_refs(Some(primary.as_str())); - match self.ingr_root_mutating.get(&root).map(|v| *v) { - Some(false) if should_capture => { - self.ingr_root_mutating.insert(root.clone(), true); - } - None => { - self.ingr_root_mutating.insert(root.clone(), should_capture); + { + let mut entry = self + .ingr_root_mutating + .entry(root.clone()) + .or_insert(should_capture); + if !*entry && should_capture { + *entry = true; } - _ => {} } let target_repo_only = trace_command_uses_target_repo_context_only(Some(primary.as_str())); - match self.ingr_root_target_repo_only.get(&root).map(|v| *v) { - Some(false) if target_repo_only => { - self.ingr_root_target_repo_only.insert(root.clone(), true); - self.ingr_root_reflog_refs.remove(&root); - self.ingr_root_head_reflog_start_offsets.remove(&root); - self.ingr_root_family_reflog_start_offsets.remove(&root); - } - None => { - self.ingr_root_target_repo_only - .insert(root.clone(), 
target_repo_only); + let upgraded_to_target_repo_only = { + let mut entry = self + .ingr_root_target_repo_only + .entry(root.clone()) + .or_insert(target_repo_only); + if !*entry && target_repo_only { + *entry = true; + true + } else { + false } - _ => {} + }; + if upgraded_to_target_repo_only { + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); } } From 1e23a26fd6e1d312450e1f2974a923fa97dde8d1 Mon Sep 17 00:00:00 2001 From: Sasha Varlamov Date: Wed, 15 Apr 2026 04:06:39 +0000 Subject: [PATCH 5/7] refactor(daemon): remove bounded connection pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the MAX_CONCURRENT_TRACE_CONNECTIONS=512 ceiling and ActiveGuard RAII pattern. The simple unbounded thread::spawn from main is sufficient — the connection pool added complexity without meaningful benefit for current workloads, and the trace listener may move away from OS threads soon. Co-Authored-By: Claude Opus 4.6 --- src/daemon.rs | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 8eae60376..04b61f4b3 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -7625,12 +7625,6 @@ fn trace_listener_loop_actor( ) -> Result<(), GitAiError> { #[cfg(not(windows))] { - // Hard ceiling on concurrent connection threads. At ~40 trace events/sec - // from Zed each connection lasts <10ms, so this is effectively unlimited - // for normal workloads; it protects against pathological floods (>1K/sec). - const MAX_CONCURRENT_TRACE_CONNECTIONS: usize = 512; - let active_connections = Arc::new(AtomicUsize::new(0)); - remove_socket_if_exists(&trace_socket_path)?; let listener = ListenerOptions::new() .name(local_socket_name(&trace_socket_path)?) 
@@ -7644,38 +7638,16 @@ fn trace_listener_loop_actor( let Ok(stream) = stream else { continue; }; - // Back-pressure: drop the connection rather than spawning unbounded - // threads. The git process will reconnect on the next invocation. - let current = active_connections.load(Ordering::Relaxed); - if current >= MAX_CONCURRENT_TRACE_CONNECTIONS { - tracing::debug!( - current, - limit = MAX_CONCURRENT_TRACE_CONNECTIONS, - "trace listener: dropping connection" - ); - continue; - } - active_connections.fetch_add(1, Ordering::Relaxed); - let active = active_connections.clone(); + let coord = coordinator.clone(); if std::thread::Builder::new() .spawn(move || { - // RAII guard: decrements the active-connection counter when - // this scope exits, including on panic-driven unwinding. - struct ActiveGuard(Arc<AtomicUsize>); - impl Drop for ActiveGuard { - fn drop(&mut self) { - self.0.fetch_sub(1, Ordering::Relaxed); - } - } - let _guard = ActiveGuard(active); if let Err(e) = handle_trace_connection_actor(stream, coord) { tracing::debug!(%e, "trace connection error"); } }) .is_err() { - active_connections.fetch_sub(1, Ordering::Relaxed); tracing::error!("trace listener: failed to spawn handler thread"); break; } From a57a248bd1e1dc02423f002f73852eab182a3eb8 Mon Sep 17 00:00:00 2001 From: Sasha Varlamov Date: Wed, 15 Apr 2026 18:15:34 +0000 Subject: [PATCH 6/7] fix(daemon): guard against empty root_key and document benign TOCTOU - Early-exit when root_key is empty before normalizer creation, preventing an orphaned normalizer keyed on "" that would never be cleaned up. - Document the benign duplicate-stale-roots race in record_trace_connection_close (concurrent close can push the same root twice; downstream deduplicates via ingr_root_close_fallback_enqueued DashSet).
Co-Authored-By: Claude Opus 4.6 --- src/daemon.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/daemon.rs b/src/daemon.rs index 04b61f4b3..b5888ac83 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -4350,6 +4350,12 @@ impl ActorDaemonCoordinator { } *v == 0 }); + // Note: the remove_if_mut + contains_key two-step can produce + // duplicate stale entries for the same root when two connections + // close concurrently (thread B removes the entry between thread A's + // remove_if_mut and contains_key). This is benign — downstream + // enqueue_stale_connection_close_fallbacks deduplicates via the + // ingr_root_close_fallback_enqueued DashSet. if removed.is_some() || !self.ingr_root_open_connections.contains_key(root_sid) { stale_roots.push(root_sid.clone()); } @@ -7080,11 +7086,17 @@ impl ActorDaemonCoordinator { let payload_root_sid = Self::trace_payload_root_sid(&payload); let root_key = payload_root_sid.clone().unwrap_or_default(); + // Early-exit: payloads with no root SID would create an orphaned + // normalizer keyed on "", which is never cleaned up. + if root_key.is_empty() { + return Ok(TracePayloadApplyOutcome::None); + } + // Early-exit for already-completed roots. A late atexit (e.g. from a // connection-close fallback) arriving after clear_trace_root_tracking // would otherwise lazily create a fresh empty normalizer that is never // cleaned up, causing an unbounded slow memory leak. - if !root_key.is_empty() && self.completed_root_sids.contains(&root_key) { + if self.completed_root_sids.contains(&root_key) { return Ok(TracePayloadApplyOutcome::None); } From 8c8636e2619960bf39041873f27d7ae9eedf13fc Mon Sep 17 00:00:00 2001 From: Sasha Varlamov Date: Wed, 15 Apr 2026 18:20:30 +0000 Subject: [PATCH 7/7] ci: trigger CI run