diff --git a/Cargo.lock b/Cargo.lock index 6f643fd56..33282c5ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -616,6 +616,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dbus" version = "0.9.10" @@ -1074,6 +1088,7 @@ dependencies = [ "chrono", "clap", "crossterm", + "dashmap", "dirs", "envy", "filetime", diff --git a/Cargo.toml b/Cargo.toml index 6d4687a89..c907ba588 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ gix-index = "0.48.0" regex = "1.12" toml = "0.9" unicode-normalization = "0.1" +dashmap = "6" [target.'cfg(windows)'.dependencies] named_pipe = "0.4.1" diff --git a/src/daemon.rs b/src/daemon.rs index df5d59207..3fa33810f 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -3686,26 +3686,11 @@ impl RecentReplayPrerequisite { } } -#[derive(Debug, Default, Clone)] -struct TraceIngressState { - root_worktrees: HashMap, - root_families: HashMap, - root_argv: HashMap>, - root_pre_repo: HashMap, - root_inflight_merge_squash_contexts: HashMap, - root_terminal_merge_squash_contexts: HashMap, - root_mutating: HashMap, - root_target_repo_only: HashMap, - root_reflog_refs: HashMap>, - root_head_reflog_start_offsets: HashMap, - root_family_reflog_start_offsets: HashMap>, - root_last_activity_ns: HashMap, - /// Roots whose start event was identified as definitely read-only. All - /// subsequent events for these roots (including exit) take the fast path. - root_definitely_read_only: HashSet, - root_open_connections: HashMap, - root_close_fallback_enqueued: HashSet, -} +// Per-root ingress state is sharded across 15 DashMap/DashSet fields on +// ActorDaemonCoordinator (prefixed `ingr_`), replacing the previous single +// Mutex. Each field is independently lockable at the +// DashMap shard level, allowing concurrent events for different roots to +// proceed without serializing on a single global lock. struct CarryoverCaptureInput<'a> { root_sid: &'a str, @@ -3723,9 +3708,18 @@ pub struct ActorDaemonCoordinator { backend: Arc, coordinator: Arc>, - normalizer: AsyncMutex< - crate::daemon::trace_normalizer::TraceNormalizer< - crate::daemon::git_backend::SystemGitBackend, + // Per-root normalizers: created lazily on first payload for each root and + // removed when the root's terminal event is processed. Wrapping in + // Arc lets us clone the reference out of the DashMap before awaiting the + // inner AsyncMutex (DashMap refs must not be held across await points). + normalizers: dashmap::DashMap< + String, + Arc< + AsyncMutex< + crate::daemon::trace_normalizer::TraceNormalizer< + crate::daemon::git_backend::SystemGitBackend, + >, + >, >, >, pending_rebase_original_head_by_worktree: Mutex>, @@ -3751,7 +3745,32 @@ pub struct ActorDaemonCoordinator { queued_trace_payloads_by_root: Mutex>, processed_trace_ingest_seq: AtomicUsize, trace_ingest_progress_notify: Notify, - trace_ingress_state: Mutex, + // Per-root ingress state (sharded by root SID via DashMap internals). + ingr_root_worktrees: dashmap::DashMap, + ingr_root_families: dashmap::DashMap, + ingr_root_argv: dashmap::DashMap>, + ingr_root_pre_repo: dashmap::DashMap, + ingr_root_inflight_merge_squash: dashmap::DashMap, + ingr_root_terminal_merge_squash: dashmap::DashMap, + ingr_root_mutating: dashmap::DashMap, + ingr_root_target_repo_only: dashmap::DashMap, + ingr_root_reflog_refs: dashmap::DashMap>, + ingr_root_head_reflog_start_offsets: dashmap::DashMap, + ingr_root_family_reflog_start_offsets: dashmap::DashMap>, + ingr_root_last_activity_ns: dashmap::DashMap, + /// Roots whose `start` event was identified as definitely read-only. + /// Subsequent events for these roots (including exit/atexit) take the + /// lightweight fast path and are never enqueued. + ingr_root_definitely_read_only: dashmap::DashSet, + ingr_root_open_connections: dashmap::DashMap, + ingr_root_close_fallback_enqueued: dashmap::DashSet, + /// Root SIDs that have been fully processed (clear_trace_root_tracking was + /// called). Used to discard late/duplicate events (e.g. a connection-close + /// fallback atexit that arrives after the root's terminal event) without + /// creating a fresh per-root normalizer that would never be cleaned up. + /// Bounded to COMPLETED_ROOT_SID_RETENTION_LIMIT entries (FIFO eviction). + completed_root_sids: dashmap::DashSet, + completed_root_order: Mutex>, wrapper_states: Mutex>, wrapper_state_notify: Notify, shutting_down: AtomicBool, @@ -3772,6 +3791,11 @@ enum TracePayloadApplyOutcome { QueuedFamily, } +/// Mirrors `trace_normalizer::COMPLETED_ROOT_RETENTION_LIMIT`: the maximum +/// number of completed root SIDs kept in `completed_root_sids` to guard +/// against orphaned normalizers from late duplicate events. +const COMPLETED_ROOT_SID_RETENTION_LIMIT: usize = 16_384; + impl ActorDaemonCoordinator { fn new() -> Self { let backend = Arc::new(crate::daemon::git_backend::SystemGitBackend::new()); @@ -3779,9 +3803,7 @@ impl ActorDaemonCoordinator { coordinator: Arc::new(crate::daemon::coordinator::Coordinator::new( backend.clone(), )), - normalizer: AsyncMutex::new(crate::daemon::trace_normalizer::TraceNormalizer::new( - backend.clone(), - )), + normalizers: dashmap::DashMap::new(), backend, pending_rebase_original_head_by_worktree: Mutex::new(HashMap::new()), pending_cherry_pick_sources_by_worktree: Mutex::new(HashMap::new()), @@ -3812,7 +3834,23 @@ impl ActorDaemonCoordinator { queued_trace_payloads_by_root: Mutex::new(HashMap::new()), processed_trace_ingest_seq: AtomicUsize::new(0), trace_ingest_progress_notify: Notify::new(), - trace_ingress_state: Mutex::new(TraceIngressState::default()), + ingr_root_worktrees: dashmap::DashMap::new(), + ingr_root_families: dashmap::DashMap::new(), + ingr_root_argv: dashmap::DashMap::new(), + ingr_root_pre_repo: dashmap::DashMap::new(), + ingr_root_inflight_merge_squash: dashmap::DashMap::new(), + ingr_root_terminal_merge_squash: dashmap::DashMap::new(), + ingr_root_mutating: dashmap::DashMap::new(), + ingr_root_target_repo_only: dashmap::DashMap::new(), + ingr_root_reflog_refs: dashmap::DashMap::new(), + ingr_root_head_reflog_start_offsets: dashmap::DashMap::new(), + ingr_root_family_reflog_start_offsets: dashmap::DashMap::new(), + ingr_root_last_activity_ns: dashmap::DashMap::new(), + ingr_root_definitely_read_only: dashmap::DashSet::new(), + ingr_root_open_connections: dashmap::DashMap::new(), + ingr_root_close_fallback_enqueued: dashmap::DashSet::new(), + completed_root_sids: dashmap::DashSet::new(), + completed_root_order: Mutex::new(std::collections::VecDeque::new()), wrapper_states: Mutex::new(HashMap::new()), wrapper_state_notify: Notify::new(), shutting_down: AtomicBool::new(false), @@ -3884,9 +3922,9 @@ impl ActorDaemonCoordinator { /// Garbage-collect empty or idle entries from per-family and per-root maps /// to prevent unbounded memory growth in long-running daemon processes. fn gc_stale_family_state(&self) { - // NOTE: Do NOT call normalizer.sweep_orphans() here — it removes ALL - // pending/deferred roots unconditionally which destroys in-flight trace - // state. sweep_orphans() is only safe at daemon shutdown. + // NOTE: Do NOT call any per-root normalizer's sweep_orphans() here — + // it removes ALL pending/deferred roots unconditionally which destroys + // in-flight trace state. sweep_orphans() is only safe at shutdown. if let Ok(mut map) = self.recent_replay_prerequisites_by_family.lock() { map.retain(|_, entries| !entries.is_empty()); } @@ -4247,57 +4285,49 @@ impl ActorDaemonCoordinator { Ok(()) } - fn trace_root_is_tracked(ingress: &TraceIngressState, root: &str) -> bool { - ingress.root_worktrees.contains_key(root) - || ingress.root_families.contains_key(root) - || ingress.root_argv.contains_key(root) - || ingress.root_pre_repo.contains_key(root) - || ingress.root_mutating.contains_key(root) - || ingress.root_target_repo_only.contains_key(root) - || ingress.root_reflog_refs.contains_key(root) - || ingress.root_head_reflog_start_offsets.contains_key(root) - || ingress.root_family_reflog_start_offsets.contains_key(root) - } - - fn mark_trace_root_activity(&self, root_sid: &str) -> Result<(), GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; - ingress - .root_last_activity_ns - .insert(root_sid.to_string(), now_unix_nanos() as u64); - ingress.root_close_fallback_enqueued.remove(root_sid); - Ok(()) + fn trace_root_is_tracked(&self, root: &str) -> bool { + self.ingr_root_worktrees.contains_key(root) + || self.ingr_root_families.contains_key(root) + || self.ingr_root_argv.contains_key(root) + || self.ingr_root_pre_repo.contains_key(root) + || self.ingr_root_mutating.contains_key(root) + || self.ingr_root_target_repo_only.contains_key(root) + || self.ingr_root_reflog_refs.contains_key(root) + || self.ingr_root_head_reflog_start_offsets.contains_key(root) + || self + .ingr_root_family_reflog_start_offsets + .contains_key(root) } fn trace_root_connection_opened(&self, root_sid: &str) -> Result<(), GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; - *ingress - .root_open_connections + *self + .ingr_root_open_connections .entry(root_sid.to_string()) .or_insert(0) += 1; Ok(()) } fn record_trace_connection_close(&self, roots: &[String]) -> Result, GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; let mut stale_roots = Vec::new(); for root_sid in roots { - if let Some(count) = ingress.root_open_connections.get_mut(root_sid) { - if *count > 1 { - *count -= 1; - continue; - } - ingress.root_open_connections.remove(root_sid); + // Atomically decrement the open-connection count and remove the entry + // when it hits zero. remove_if_mut holds the shard write lock for the + // entire check-decrement-remove sequence, preventing a TOCTOU race with + // trace_root_connection_opened on a concurrent connection-handler thread. + // + // Return value semantics: + // Some(_) → count decremented to 0, entry removed → root is stale + // None, entry absent → root was never connection-tracked → stale + // None, entry present → count still > 0 after decrement → not stale + let removed = self + .ingr_root_open_connections + .remove_if_mut(root_sid, |_, v| { + *v = v.saturating_sub(1); + *v == 0 + }); + if removed.is_some() || !self.ingr_root_open_connections.contains_key(root_sid) { + stale_roots.push(root_sid.clone()); } - stale_roots.push(root_sid.clone()); } Ok(stale_roots) } @@ -4346,29 +4376,28 @@ impl ActorDaemonCoordinator { fn enqueue_stale_connection_close_fallbacks(&self, roots: &[String]) -> Result<(), GitAiError> { let stale_roots = { - let mut ingress = self.trace_ingress_state.lock().map_err(|_| { - GitAiError::Generic("trace ingress state lock poisoned".to_string()) - })?; let mut stale = Vec::new(); for root_sid in roots { - if !Self::trace_root_is_tracked(&ingress, root_sid) { + if !self.trace_root_is_tracked(root_sid) { continue; } - if ingress - .root_open_connections + if self + .ingr_root_open_connections .get(root_sid) - .copied() + .map(|v| *v) .unwrap_or(0) > 0 { continue; } - if ingress.root_close_fallback_enqueued.contains(root_sid) { + // DashSet::insert returns true if the value was newly inserted; + // false means a fallback is already scheduled for this root. + if !self + .ingr_root_close_fallback_enqueued + .insert(root_sid.clone()) + { continue; } - ingress - .root_close_fallback_enqueued - .insert(root_sid.clone()); stale.push(root_sid.clone()); } stale @@ -4398,25 +4427,38 @@ impl ActorDaemonCoordinator { } fn clear_trace_root_tracking(&self, root_sid: &str) -> Result<(), GitAiError> { - let mut ingress = self - .trace_ingress_state - .lock() - .map_err(|_| GitAiError::Generic("trace ingress state lock poisoned".to_string()))?; - ingress.root_worktrees.remove(root_sid); - ingress.root_families.remove(root_sid); - ingress.root_argv.remove(root_sid); - ingress.root_pre_repo.remove(root_sid); - ingress.root_inflight_merge_squash_contexts.remove(root_sid); - ingress.root_terminal_merge_squash_contexts.remove(root_sid); - ingress.root_mutating.remove(root_sid); - ingress.root_target_repo_only.remove(root_sid); - ingress.root_reflog_refs.remove(root_sid); - ingress.root_head_reflog_start_offsets.remove(root_sid); - ingress.root_family_reflog_start_offsets.remove(root_sid); - ingress.root_last_activity_ns.remove(root_sid); - ingress.root_definitely_read_only.remove(root_sid); - ingress.root_open_connections.remove(root_sid); - ingress.root_close_fallback_enqueued.remove(root_sid); + self.ingr_root_worktrees.remove(root_sid); + self.ingr_root_families.remove(root_sid); + self.ingr_root_argv.remove(root_sid); + self.ingr_root_pre_repo.remove(root_sid); + self.ingr_root_inflight_merge_squash.remove(root_sid); + self.ingr_root_terminal_merge_squash.remove(root_sid); + self.ingr_root_mutating.remove(root_sid); + self.ingr_root_target_repo_only.remove(root_sid); + self.ingr_root_reflog_refs.remove(root_sid); + self.ingr_root_head_reflog_start_offsets.remove(root_sid); + self.ingr_root_family_reflog_start_offsets.remove(root_sid); + self.ingr_root_last_activity_ns.remove(root_sid); + self.ingr_root_definitely_read_only.remove(root_sid); + self.ingr_root_open_connections.remove(root_sid); + self.ingr_root_close_fallback_enqueued.remove(root_sid); + // Release the per-root normalizer once the root is fully processed. + self.normalizers.remove(root_sid); + // Record the root as completed so that any late/duplicate events (e.g. + // a connection-close fallback atexit) are discarded in + // apply_trace_payload_to_state without creating an orphaned normalizer. + { + let mut order = self.completed_root_order.lock().map_err(|_| { + GitAiError::Generic("completed root order lock poisoned".to_string()) + })?; + if order.len() >= COMPLETED_ROOT_SID_RETENTION_LIMIT + && let Some(evicted) = order.pop_front() + { + self.completed_root_sids.remove(&evicted); + } + self.completed_root_sids.insert(root_sid.to_string()); + order.push_back(root_sid.to_string()); + } let mut queued = self.queued_trace_payloads_by_root.lock().map_err(|_| { GitAiError::Generic("queued trace payloads by root lock poisoned".to_string()) })?; @@ -4946,86 +4988,77 @@ impl ActorDaemonCoordinator { trace_invocation_is_definitely_read_only(early_primary.as_deref(), &argv); // For events with no command info (exit/atexit), defer to the cached // flag inside the lock to avoid a second lock acquisition. + // Fast-path decision: is this definitely a read-only event? + // For events with command info (start, def_repo): use argv-based check. + // For events without command info (exit, atexit): check the cached flag. + // Both checks are now lock-free thanks to DashSet. let may_be_read_only = event_is_read_only || early_primary.is_none(); if may_be_read_only { - let mut ingress = match self.trace_ingress_state.lock() { - Ok(guard) => guard, - // If the lock is poisoned we cannot determine read-only status; - // fall through and let the ingest worker handle error recovery. - Err(_) => return false, - }; - // If the event itself wasn't identified as read-only, check the root flag. - if !event_is_read_only && !ingress.root_definitely_read_only.contains(&root) { - // Not read-only — drop the lock and fall through to the full path. - drop(ingress); + let is_read_only_root = self.ingr_root_definitely_read_only.contains(&root); + if !event_is_read_only && !is_read_only_root { + // Neither the event itself nor the cached root flag indicates + // read-only — fall through to the full mutating path. } else { - // Activity tracking (folded here to avoid a separate lock acquisition) - ingress - .root_last_activity_ns + // Activity tracking — DashMap ops, no global lock needed. + self.ingr_root_last_activity_ns .insert(root.clone(), now_unix_nanos() as u64); - ingress.root_close_fallback_enqueued.remove(&root); - // Minimal state tracking for connection lifecycle + self.ingr_root_close_fallback_enqueued.remove(&root); + // Minimal state tracking for connection lifecycle. if let Some(worktree) = trace_payload_worktree_hint(payload) { - ingress.root_worktrees.insert(root.clone(), worktree); + self.ingr_root_worktrees.insert(root.clone(), worktree); } if event == "start" && sid == root && !argv.is_empty() { - ingress.root_argv.insert(root.clone(), argv); - ingress.root_definitely_read_only.insert(root.clone()); + self.ingr_root_argv.insert(root.clone(), argv); + self.ingr_root_definitely_read_only.insert(root.clone()); } - ingress.root_mutating.entry(root.clone()).or_insert(false); - // Cleanup on terminal event + self.ingr_root_mutating.entry(root.clone()).or_insert(false); + // Cleanup on terminal event. if is_terminal_root_trace_event(&event, &sid, &root) { - ingress.root_families.remove(&root); - ingress.root_mutating.remove(&root); - ingress.root_target_repo_only.remove(&root); - ingress.root_argv.remove(&root); - ingress.root_pre_repo.remove(&root); - ingress.root_worktrees.remove(&root); - ingress.root_inflight_merge_squash_contexts.remove(&root); - ingress.root_terminal_merge_squash_contexts.remove(&root); - ingress.root_reflog_refs.remove(&root); - ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); - ingress.root_last_activity_ns.remove(&root); - ingress.root_definitely_read_only.remove(&root); + self.ingr_root_families.remove(&root); + self.ingr_root_mutating.remove(&root); + self.ingr_root_target_repo_only.remove(&root); + self.ingr_root_argv.remove(&root); + self.ingr_root_pre_repo.remove(&root); + self.ingr_root_worktrees.remove(&root); + self.ingr_root_inflight_merge_squash.remove(&root); + self.ingr_root_terminal_merge_squash.remove(&root); + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); + self.ingr_root_last_activity_ns.remove(&root); + self.ingr_root_definitely_read_only.remove(&root); } // Payload was fully handled inline; tell the caller to skip enqueue. return true; } } - let _ = self.mark_trace_root_activity(&root); - let mut ingress = match self.trace_ingress_state.lock() { - Ok(guard) => guard, - Err(_) => { - observability::log_error( - &GitAiError::Generic("trace ingress state lock poisoned".to_string()), - Some(serde_json::json!({ - "component": "daemon", - "phase": "augment_trace_payload_with_reflog_metadata", - "sid": sid, - "event": event, - })), - ); - return false; - } - }; + // Mutating path — all state updates go directly to the per-root DashMaps, + // no global lock needed. Each DashMap op locks only one shard. + + // Activity bookkeeping (previously a separate lock cycle). + self.ingr_root_last_activity_ns + .insert(root.clone(), now_unix_nanos() as u64); + self.ingr_root_close_fallback_enqueued.remove(&root); if let Some(worktree) = trace_payload_worktree_hint(payload) { if let Some(common_dir) = common_dir_for_worktree(&worktree) { let family = common_dir.canonicalize().unwrap_or(common_dir); - ingress - .root_families + self.ingr_root_families .insert(root.clone(), family.to_string_lossy().to_string()); } - ingress.root_worktrees.insert(root.clone(), worktree); + self.ingr_root_worktrees.insert(root.clone(), worktree); } let payload_argv = trace_payload_argv(payload); if event == "start" && sid == root && !payload_argv.is_empty() { - ingress.root_argv.insert(root.clone(), payload_argv.clone()); + self.ingr_root_argv + .insert(root.clone(), payload_argv.clone()); } let effective_argv = if payload_argv.is_empty() { - ingress.root_argv.get(&root).cloned().unwrap_or_default() + self.ingr_root_argv + .get(&root) + .map(|v| v.clone()) + .unwrap_or_default() } else { payload_argv }; @@ -5033,60 +5066,66 @@ impl ActorDaemonCoordinator { .or_else(|| trace_argv_primary_command(&effective_argv)); if let Some(primary) = effective_primary.clone() { let should_capture = trace_command_may_mutate_refs(Some(primary.as_str())); - match ingress.root_mutating.get(&root).copied() { + match self.ingr_root_mutating.get(&root).map(|v| *v) { Some(false) if should_capture => { - ingress.root_mutating.insert(root.clone(), true); + self.ingr_root_mutating.insert(root.clone(), true); } None => { - ingress.root_mutating.insert(root.clone(), should_capture); + self.ingr_root_mutating.insert(root.clone(), should_capture); } _ => {} } let target_repo_only = trace_command_uses_target_repo_context_only(Some(primary.as_str())); - match ingress.root_target_repo_only.get(&root).copied() { + match self.ingr_root_target_repo_only.get(&root).map(|v| *v) { Some(false) if target_repo_only => { - ingress.root_target_repo_only.insert(root.clone(), true); - ingress.root_reflog_refs.remove(&root); - ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); + self.ingr_root_target_repo_only.insert(root.clone(), true); + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); } None => { - ingress - .root_target_repo_only + self.ingr_root_target_repo_only .insert(root.clone(), target_repo_only); } _ => {} } } - let Some(worktree) = ingress.root_worktrees.get(&root).cloned() else { + let Some(worktree) = self.ingr_root_worktrees.get(&root).map(|v| v.clone()) else { if is_terminal_root_trace_event(&event, &sid, &root) { - ingress.root_families.remove(&root); - ingress.root_mutating.remove(&root); - ingress.root_target_repo_only.remove(&root); - ingress.root_argv.remove(&root); - ingress.root_pre_repo.remove(&root); - ingress.root_inflight_merge_squash_contexts.remove(&root); - ingress.root_terminal_merge_squash_contexts.remove(&root); - ingress.root_reflog_refs.remove(&root); - ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); + self.ingr_root_families.remove(&root); + self.ingr_root_mutating.remove(&root); + self.ingr_root_target_repo_only.remove(&root); + self.ingr_root_argv.remove(&root); + self.ingr_root_pre_repo.remove(&root); + self.ingr_root_inflight_merge_squash.remove(&root); + self.ingr_root_terminal_merge_squash.remove(&root); + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); } return false; }; - let should_capture_mutation = *ingress.root_mutating.get(&root).unwrap_or(&false); - let target_repo_only = *ingress.root_target_repo_only.get(&root).unwrap_or(&false); + let should_capture_mutation = self + .ingr_root_mutating + .get(&root) + .map(|v| *v) + .unwrap_or(false); + let target_repo_only = self + .ingr_root_target_repo_only + .get(&root) + .map(|v| *v) + .unwrap_or(false); if !target_repo_only - && !ingress.root_pre_repo.contains_key(&root) + && !self.ingr_root_pre_repo.contains_key(&root) && let Some(state) = read_head_state_for_worktree(&worktree) { - ingress - .root_pre_repo + self.ingr_root_pre_repo .insert(root.clone(), repo_context_from_head_state(state)); } - let pre_repo = ingress.root_pre_repo.get(&root).cloned(); + let pre_repo = self.ingr_root_pre_repo.get(&root).map(|v| v.clone()); if should_capture_mutation && !target_repo_only { let contextual_refs = if let Some(repo) = pre_repo.as_ref() { tracked_reflog_refs_for_command( @@ -5103,27 +5142,25 @@ impl ActorDaemonCoordinator { &effective_argv, ) }; - let refs = ingress - .root_reflog_refs - .entry(root.clone()) - .or_insert_with(Vec::new); + let mut refs_entry = self.ingr_root_reflog_refs.entry(root.clone()).or_default(); for reference in contextual_refs { - if !refs.iter().any(|existing| existing == &reference) { - refs.push(reference); + if !refs_entry.iter().any(|existing| existing == &reference) { + refs_entry.push(reference); } } - refs.sort(); - refs.dedup(); + refs_entry.sort(); + refs_entry.dedup(); + // Drop the entry ref to release the DashMap shard lock. + drop(refs_entry); } - let cached_inflight_merge_squash = ingress - .root_inflight_merge_squash_contexts + let cached_inflight_merge_squash = self + .ingr_root_inflight_merge_squash .get(&root) - .cloned(); - let cached_terminal_merge_squash = ingress - .root_terminal_merge_squash_contexts + .map(|v| v.clone()); + let cached_terminal_merge_squash = self + .ingr_root_terminal_merge_squash .get(&root) - .cloned(); - drop(ingress); + .map(|v| v.clone()); let mut inflight_merge_squash_to_cache = None; if let Some(object) = payload.as_object_mut() { @@ -5265,48 +5302,40 @@ impl ActorDaemonCoordinator { } } - let mut ingress = match self.trace_ingress_state.lock() { - Ok(guard) => guard, - Err(_) => { - observability::log_error( - &GitAiError::Generic("trace ingress state lock poisoned".to_string()), - Some(serde_json::json!({ - "component": "daemon", - "phase": "augment_trace_payload_with_reflog_metadata", - "sid": sid, - "event": event, - })), - ); - return false; - } - }; + // Cache inflight/terminal merge-squash context (first-writer wins). if let Some(context) = inflight_merge_squash_to_cache { - ingress - .root_inflight_merge_squash_contexts + self.ingr_root_inflight_merge_squash .entry(root.clone()) .or_insert(context); } if let Some(context) = terminal_merge_squash_to_cache { - ingress - .root_terminal_merge_squash_contexts + self.ingr_root_terminal_merge_squash .entry(root.clone()) .or_insert(context); } if should_capture_mutation && !target_repo_only { - if !ingress.root_head_reflog_start_offsets.contains_key(&root) + if !self.ingr_root_head_reflog_start_offsets.contains_key(&root) && let Some(offset) = daemon_worktree_head_reflog_offset(&worktree) { - ingress - .root_head_reflog_start_offsets + self.ingr_root_head_reflog_start_offsets .insert(root.clone(), offset); } - if !ingress.root_family_reflog_start_offsets.contains_key(&root) - && let Some(refs) = ingress.root_reflog_refs.get(&root) - && let Some(offsets) = daemon_reflog_offsets_for_refs(&worktree, refs) + if !self + .ingr_root_family_reflog_start_offsets + .contains_key(&root) { - ingress - .root_family_reflog_start_offsets - .insert(root.clone(), offsets); + // Clone the refs vec to avoid holding a DashMap shard lock + // across the I/O call to daemon_reflog_offsets_for_refs. + let refs_opt = self + .ingr_root_reflog_refs + .get(&root) + .map(|v| v.value().clone()); + if let Some(refs) = refs_opt + && let Some(offsets) = daemon_reflog_offsets_for_refs(&worktree, &refs) + { + self.ingr_root_family_reflog_start_offsets + .insert(root.clone(), offsets); + } } } @@ -5318,8 +5347,10 @@ impl ActorDaemonCoordinator { object.insert("git_ai_post_repo".to_string(), json!(state)); } if should_capture_mutation && !target_repo_only { - if let Some(start_offset) = - ingress.root_head_reflog_start_offsets.get(&root).copied() + if let Some(start_offset) = self + .ingr_root_head_reflog_start_offsets + .get(&root) + .map(|v| *v) { object.insert( "git_ai_worktree_head_reflog_start".to_string(), @@ -5332,16 +5363,26 @@ impl ActorDaemonCoordinator { json!(end_offset), ); } - if let Some(start_offsets) = ingress.root_family_reflog_start_offsets.get(&root) { + // Clone start_offsets and refs so we don't hold shard locks + // across I/O calls (daemon_reflog_offsets_for_refs, etc.). + let start_offsets_opt = self + .ingr_root_family_reflog_start_offsets + .get(&root) + .map(|v| v.value().clone()); + if let Some(start_offsets) = start_offsets_opt { object.insert( "git_ai_family_reflog_start".to_string(), - json!(start_offsets), + json!(&start_offsets), ); - if let Some(refs) = ingress.root_reflog_refs.get(&root) + let refs_opt = self + .ingr_root_reflog_refs + .get(&root) + .map(|v| v.value().clone()); + if let Some(refs) = refs_opt && let Some(mut end_offsets) = - daemon_reflog_offsets_for_refs(&worktree, refs) + daemon_reflog_offsets_for_refs(&worktree, &refs) { - for (reference, start_offset) in start_offsets { + for (reference, start_offset) in &start_offsets { let end_offset = end_offsets .entry(reference.clone()) .or_insert(*start_offset); @@ -5351,7 +5392,7 @@ impl ActorDaemonCoordinator { } match daemon_reflog_delta_from_offsets( &worktree, - start_offsets, + &start_offsets, &end_offsets, ) { Ok(ref_changes) => { @@ -5468,17 +5509,17 @@ impl ActorDaemonCoordinator { } } } - ingress.root_worktrees.remove(&root); - ingress.root_families.remove(&root); - ingress.root_argv.remove(&root); - ingress.root_pre_repo.remove(&root); - ingress.root_inflight_merge_squash_contexts.remove(&root); - ingress.root_terminal_merge_squash_contexts.remove(&root); - ingress.root_mutating.remove(&root); - ingress.root_target_repo_only.remove(&root); - ingress.root_reflog_refs.remove(&root); - ingress.root_head_reflog_start_offsets.remove(&root); - ingress.root_family_reflog_start_offsets.remove(&root); + self.ingr_root_worktrees.remove(&root); + self.ingr_root_families.remove(&root); + self.ingr_root_argv.remove(&root); + self.ingr_root_pre_repo.remove(&root); + self.ingr_root_inflight_merge_squash.remove(&root); + self.ingr_root_terminal_merge_squash.remove(&root); + self.ingr_root_mutating.remove(&root); + self.ingr_root_target_repo_only.remove(&root); + self.ingr_root_reflog_refs.remove(&root); + self.ingr_root_head_reflog_start_offsets.remove(&root); + self.ingr_root_family_reflog_start_offsets.remove(&root); } // Payload was fully augmented for a mutating command; tell the caller // to stamp a sequence number and enqueue it. @@ -6989,15 +7030,39 @@ impl ActorDaemonCoordinator { &self, payload: Value, ) -> Result { - self.maybe_append_pending_root_from_trace_payload(&payload)?; + // Resolve the root SID once up front; reused for both the completed-root + // guard and the normalizer lookup below. let payload_root_sid = Self::trace_payload_root_sid(&payload); + let root_key = payload_root_sid.clone().unwrap_or_default(); + + // Early-exit for already-completed roots. A late atexit (e.g. from a + // connection-close fallback) arriving after clear_trace_root_tracking + // would otherwise lazily create a fresh empty normalizer that is never + // cleaned up, causing an unbounded slow memory leak. + if !root_key.is_empty() && self.completed_root_sids.contains(&root_key) { + return Ok(TracePayloadApplyOutcome::None); + } + + self.maybe_append_pending_root_from_trace_payload(&payload)?; let event = payload .get("event") .and_then(Value::as_str) .unwrap_or_default() .to_string(); + // Obtain (or lazily create) the per-root normalizer. We clone the Arc + // out of the DashMap before awaiting so we don't hold a shard lock + // across the async suspension point. + let normalizer_arc = self + .normalizers + .entry(root_key) + .or_insert_with(|| { + Arc::new(AsyncMutex::new( + crate::daemon::trace_normalizer::TraceNormalizer::new(self.backend.clone()), + )) + }) + .clone(); let emitted = { - let mut normalizer = self.normalizer.lock().await; + let mut normalizer = normalizer_arc.lock().await; normalizer.ingest_payload(&payload)? }; let Some(command) = emitted else { @@ -7511,6 +7576,12 @@ fn trace_listener_loop_actor( ) -> Result<(), GitAiError> { #[cfg(not(windows))] { + // Hard ceiling on concurrent connection threads. At ~40 trace events/sec + // from Zed each connection lasts <10ms, so this is effectively unlimited + // for normal workloads; it protects against pathological floods (>1K/sec). + const MAX_CONCURRENT_TRACE_CONNECTIONS: usize = 512; + let active_connections = Arc::new(AtomicUsize::new(0)); + remove_socket_if_exists(&trace_socket_path)?; let listener = ListenerOptions::new() .name(local_socket_name(&trace_socket_path)?) @@ -7524,15 +7595,37 @@ fn trace_listener_loop_actor( let Ok(stream) = stream else { continue; }; + // Back-pressure: drop the connection rather than spawning unbounded + // threads. The git process will reconnect on the next invocation. + let current = active_connections.load(Ordering::Relaxed); + if current >= MAX_CONCURRENT_TRACE_CONNECTIONS { + debug_log(&format!( + "daemon trace listener: dropping connection, {} active (limit {})", + current, MAX_CONCURRENT_TRACE_CONNECTIONS + )); + continue; + } + active_connections.fetch_add(1, Ordering::Relaxed); + let active = active_connections.clone(); let coord = coordinator.clone(); if std::thread::Builder::new() .spawn(move || { + // RAII guard: decrements the active-connection counter when + // this scope exits, including on panic-driven unwinding. + struct ActiveGuard(Arc); + impl Drop for ActiveGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::Relaxed); + } + } + let _guard = ActiveGuard(active); if let Err(e) = handle_trace_connection_actor(stream, coord) { debug_log(&format!("daemon trace connection error: {}", e)); } }) .is_err() { + active_connections.fetch_sub(1, Ordering::Relaxed); debug_log("daemon trace listener: failed to spawn handler thread"); break; }