diff --git a/ares-cli/src/ops/replay.rs b/ares-cli/src/ops/replay.rs index 11f9382e..121b80ed 100644 --- a/ares-cli/src/ops/replay.rs +++ b/ares-cli/src/ops/replay.rs @@ -1,5 +1,5 @@ //! `ares ops replay` — rebuild a point-in-time state snapshot from the -//! JetStream `ARES_OPSTATE` event log. Phase 5 forensics tooling. +//! JetStream `ARES_OPSTATE` event log. use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; diff --git a/ares-cli/src/orchestrator/automation/adcs_exploitation.rs b/ares-cli/src/orchestrator/automation/adcs_exploitation.rs index 96a16b9d..ef9f48f6 100644 --- a/ares-cli/src/orchestrator/automation/adcs_exploitation.rs +++ b/ares-cli/src/orchestrator/automation/adcs_exploitation.rs @@ -129,6 +129,107 @@ pub(crate) fn build_relay_coerce_args( v } +/// Build the `certipy_auth` arguments JSON for the second phase of the +/// relay chain (after a PFX has been captured). Pure so the keying stays +/// pinned by a unit test — the tool requires `pfx_path` (not `pfx`), +/// `dc_ip`, and `domain`; a one-letter drift here silently fails the whole +/// chain because `required_str` returns an error before certipy spawns. +pub(crate) fn build_certipy_auth_args( + pfx_path: &str, + relayed_user: Option<&str>, + domain: &str, + ca_host: &str, +) -> serde_json::Value { + let mut v = serde_json::json!({ "pfx_path": pfx_path }); + if let Some(u) = relayed_user { + // Strip trailing `$` (impacket's relay output ends machine account + // names with `$`); certipy_auth wants the bare SAM name. + v["username"] = serde_json::Value::String(u.trim_end_matches('$').to_string()); + } + if !domain.is_empty() { + v["domain"] = serde_json::Value::String(domain.to_string()); + } + if !ca_host.is_empty() { + // certipy_auth uses dc-ip for the KDC lookup; the CA host is also + // a viable target since the coerced victim is a DC and its KDC + // sits on the same host. Same for ESC11 — the RPC ICPR victim is + // the CA, and its KDC is co-located. 
+ v["dc_ip"] = serde_json::Value::String(ca_host.to_string()); + } + v +} + +/// Resolve the realm + KDC IP that `certipy_auth` should target for a +/// captured cert. In cross-forest ESC8 relays the relayed account lives in +/// a *different* forest than the CA's domain — the discovered vuln record +/// carries the CA's domain, not the relayed account's. Authenticating +/// with the cert against the wrong realm/KDC produces +/// `KDC_ERR_S_PRINCIPAL_UNKNOWN`. +/// +/// The relayed machine is whatever IP we coerced from, so its host record +/// (looked up by IP, then by hostname stem as a fallback) carries the +/// correct home domain. The KDC for that home domain comes from +/// `state.domain_controllers`. Caller passes the vuln-record domain/CA +/// host as fallbacks; those win only when the host lookup turns up empty, +/// preserving the same-forest behavior the chain shipped with. +pub(crate) fn resolve_relayed_account_realm( + state: &StateInner, + coerce_target_ip: &str, + relayed_user: Option<&str>, + fallback_domain: &str, + fallback_dc_ip: &str, +) -> (String, String) { + // `Host` carries the domain in its FQDN hostname (e.g. + // `dc01.contoso.local`), not in a dedicated field. Pull the suffix. + let domain_from_fqdn = |hostname: &str| -> Option { + let (_, tail) = hostname.split_once('.')?; + if tail.is_empty() { + return None; + } + Some(tail.to_lowercase()) + }; + let dc_for = |domain: &str| -> Option { + state + .domain_controllers + .get(&domain.to_lowercase()) + .cloned() + }; + + // Match by coerce IP first — the relayed machine account is the host + // that authenticated to our listener, which is the IP we coerced. + if let Some(host) = state.hosts.iter().find(|h| h.ip == coerce_target_ip) { + if let Some(domain) = domain_from_fqdn(&host.hostname) { + let dc_ip = dc_for(&domain).unwrap_or_else(|| fallback_dc_ip.to_string()); + return (domain, dc_ip); + } + } + + // Fallback: match by hostname stem (strip `$` from the relayed user). 
+ // Useful when state.hosts has the machine indexed by hostname but the + // IP doesn't match (e.g. NAT, second NIC, or a host entry created + // from LDAP without an IP yet). + if let Some(u) = relayed_user { + let stem = u.trim_end_matches('$').to_lowercase(); + if !stem.is_empty() { + let hit = state.hosts.iter().find(|h| { + h.hostname + .to_lowercase() + .split('.') + .next() + .is_some_and(|s| s == stem) + }); + if let Some(host) = hit { + if let Some(domain) = domain_from_fqdn(&host.hostname) { + let dc_ip = dc_for(&domain).unwrap_or_else(|| fallback_dc_ip.to_string()); + return (domain, dc_ip); + } + } + } + } + + (fallback_domain.to_string(), fallback_dc_ip.to_string()) +} + /// Distinguishes the two coercion-based ADCS exploitation paths that share /// the `relay_and_coerce` composite tool. Differences are isolated to: /// (a) the ntlmrelayx target URL — HTTP web enrollment vs RPC ICPR; @@ -1225,6 +1326,7 @@ async fn dispatch_relay_coerce_chain( // the next tick retry. let mut pfx_path: Option = None; let mut relayed_user: Option = None; + let mut successful_coerce_target: Option = None; let mut last_summary = String::new(); let mut bind_busy = false; let mut last_task_id = String::new(); @@ -1300,6 +1402,7 @@ async fn dispatch_relay_coerce_chain( if let Some(p) = parsed.pfx_path { pfx_path = Some(p); relayed_user = parsed.relayed_user; + successful_coerce_target = Some(coerce_target.clone()); info!( vuln_id = %vuln_id_bg, esc_type = esc_label, @@ -1363,23 +1466,30 @@ async fn dispatch_relay_coerce_chain( // Phase 2: certipy auth -pfx -> NT hash for the relayed user. // The auth produces a Hash discovery via the certipy_auth parser. - let mut auth_args = serde_json::json!({ "pfx": pfx_path }); - if let Some(ref u) = relayed_user { - // Strip trailing `$` (impacket's relay output ends machine - // account names with `$`); certipy_auth wants the bare SAM name. 
- auth_args["username"] = serde_json::Value::String(u.trim_end_matches('$').to_string()); - } - if !domain_bg.is_empty() { - auth_args["domain"] = serde_json::Value::String(domain_bg.clone()); - } - if !ca_host_bg.is_empty() { - // certipy_auth uses dc-ip for the KDC lookup; the CA host is - // also a viable target since the coerced victim is a DC and - // its KDC sits on the same host. Caller can override via - // payload if a separate dc_ip is known. Same for ESC11 — the - // RPC ICPR victim is the CA, and its KDC is co-located. - auth_args["dc_ip"] = serde_json::Value::String(ca_host_bg.clone()); - } + // + // The relayed account's realm can differ from the vuln record's + // domain (cross-forest ESC8 — a machine in forest A relayed to a + // CA in forest B via trust). Resolve the home realm + KDC from + // state.hosts/domain_controllers so certipy_auth doesn't PKINIT + // against the wrong KDC and bail with + // KDC_ERR_S_PRINCIPAL_UNKNOWN. + let coerce_ip_for_lookup = successful_coerce_target.as_deref().unwrap_or(""); + let (auth_domain, auth_dc_ip) = { + let state = dispatcher_bg.state.read().await; + resolve_relayed_account_realm( + &state, + coerce_ip_for_lookup, + relayed_user.as_deref(), + &domain_bg, + &ca_host_bg, + ) + }; + let auth_args = build_certipy_auth_args( + &pfx_path, + relayed_user.as_deref(), + &auth_domain, + &auth_dc_ip, + ); let auth_call = ares_llm::ToolCall { id: format!("certipy_auth_{}", uuid::Uuid::new_v4().simple()), @@ -3267,6 +3377,208 @@ RELAYED_USER=DC01$ assert_eq!(args["relay_target_url"], "rpc://192.168.58.50"); } + // --- build_certipy_auth_args ---------------------------------------- + + #[test] + fn build_certipy_auth_args_uses_pfx_path_not_pfx() { + // Regression: the relay chain previously emitted `"pfx"` while the + // tool (`ares_tools::privesc::certipy_auth`) reads `"pfx_path"` via + // `required_str`. Captured machine-account certs were silently + // dropped before certipy ever spawned. 
#[test]
fn build_certipy_auth_args_omits_optional_fields_when_empty() {
    // With no relayed user and empty domain / CA host, only `pfx_path` may
    // appear in the JSON. An empty string must never be emitted in place of
    // an absent optional field.
    let args = super::build_certipy_auth_args("/tmp/x.pfx", None, "", "");

    assert_eq!(args["pfx_path"], "/tmp/x.pfx");
    for optional in ["username", "domain", "dc_ip"] {
        assert!(args.get(optional).is_none());
    }
}
#[test]
fn resolve_realm_prefers_coerce_ip_match() {
    // Cross-forest ESC8: the fallback domain and CA host passed in belong to
    // the CA's forest, while the coerced IP belongs to a host in the other
    // forest. The result must be that host's realm and its KDC.
    let relayed_host = host("192.168.58.58", "dc02.fabrikam.local");
    let mut state = StateInner::new("op".into());
    state.hosts.push(relayed_host);
    state
        .domain_controllers
        .insert("fabrikam.local".into(), "192.168.58.58".into());

    let (domain, dc_ip) = super::resolve_relayed_account_realm(
        &state,
        "192.168.58.58",
        Some("DC02$"),
        "contoso.local", // CA-side domain: wrong realm for the relayed account
        "192.168.58.50", // CA host: wrong KDC for the relayed account
    );

    assert_eq!(domain, "fabrikam.local");
    assert_eq!(dc_ip, "192.168.58.58");
}
#[test]
fn resolve_realm_returns_fallbacks_when_no_host_data() {
    // Empty state and no relayed user: neither the IP lookup nor the
    // hostname-stem lookup can hit, so both fallbacks come back unchanged.
    let empty_state = StateInner::new("op".into());

    let (domain, dc_ip) = super::resolve_relayed_account_realm(
        &empty_state,
        "192.168.58.58",
        None,
        "contoso.local",
        "192.168.58.50",
    );

    assert_eq!(domain, "contoso.local");
    assert_eq!(dc_ip, "192.168.58.50");
}
#[test]
fn resolve_realm_relayed_user_is_case_insensitive() {
    // A lowercase `ws01$` must resolve through the hostname-stem path to the
    // `ws01.contoso.local` record. The coerce IP deliberately matches no
    // host, so only the stem lookup can succeed.
    let mut state = StateInner::new("op".into());
    state
        .hosts
        .push(host("192.168.58.10", "ws01.contoso.local"));
    state
        .domain_controllers
        .insert("contoso.local".into(), "192.168.58.240".into());

    let (domain, dc_ip) = super::resolve_relayed_account_realm(
        &state,
        "192.168.58.99",
        Some("ws01$"),
        "fabrikam.local",
        "192.168.58.50",
    );

    assert_eq!(domain, "contoso.local");
    // The KDC must follow the matched host's domain, not the fallback; the
    // previous version of this test discarded it.
    assert_eq!(dc_ip, "192.168.58.240");
}
match dispatcher - .throttled_submit("credential_access", "credential_access", payload, priority) + .throttled_submit("credential_access", "privesc", payload, priority) .await { Ok(Some(task_id)) => { diff --git a/ares-cli/src/orchestrator/dispatcher/submission.rs b/ares-cli/src/orchestrator/dispatcher/submission.rs index e27ef383..32e63081 100644 --- a/ares-cli/src/orchestrator/dispatcher/submission.rs +++ b/ares-cli/src/orchestrator/dispatcher/submission.rs @@ -69,25 +69,30 @@ impl Dispatcher { priority: i32, span: tracing::Span, ) -> Result { - // Hard rate cap: if this (task_type, target, principal) pattern - // already ended with `RequestAssistance` once this op, refuse to - // redispatch. The pattern is doomed — usually a missing tool - // primitive, a wrong-realm cred pairing, or a stale automation - // entry — and each re-attempt burns ~30k input tokens loading the - // LLM context only for the agent to bail with the same complaint. - // Re-enabling requires the operator to manually clear the dedup - // (or starts a new op with a wiped Redis). + // Rate cap: if this (task_type, target, principal) pattern ended + // with `RequestAssistance` inside the assist-abandoned TTL, refuse + // to redispatch. The pattern is usually doomed — missing tool + // primitive, wrong-realm cred pairing, stale automation entry — + // and each re-attempt burns ~30k input tokens loading the LLM + // context only for the agent to bail with the same complaint. + // + // After `ASSIST_ABANDONED_TTL_SECS` (10 min by default) elapses, + // one re-dispatch is allowed: parallel cred-harvest may have + // landed the missing material since the abandonment, and the + // earlier no-TTL behavior locked out patterns that became viable. + // If the re-dispatch hits the same failure, the new + // `mark_assist_abandoned` extends the window. 
let assist_key = assist_pattern_key(task_type, &payload); if let Some(ref key) = assist_key { let state = self.state.read().await; - if state.is_processed(crate::orchestrator::state::DEDUP_ASSIST_ABANDONED, key) { + if state.is_assist_abandoned(key) { drop(state); span.record("automation.decision", "drop_assist_abandoned"); debug!( task_type, target_role, pattern = %key, - "Refusing dispatch — task pattern previously ended with RequestAssistance", + "Refusing dispatch — task pattern within assist-abandoned TTL", ); return Ok(SubmissionOutcome::Dropped); } @@ -441,27 +446,24 @@ impl Dispatcher { tool_outputs_json.clone(), ); // Record this pattern as abandoned so future - // dispatches of (task_type, target, user, domain) - // get refused at throttled_submit. One failure is - // enough — re-running an LLM round on a doomed - // task costs ~30k input tokens for a guaranteed - // repeat of the same "Assistance requested". + // dispatches of (task_type, target, user, + // domain) get refused at throttled_submit + // for the next ASSIST_ABANDONED_TTL_SECS. One + // failure is enough — re-running an LLM round + // on a doomed task costs ~30k input tokens for + // a guaranteed repeat of the same complaint. + // The TTL gives the pattern one re-look later + // in case state has shifted (cred materialized, + // vuln became reachable). 
if let Some(ref key) = assist_key_for_spawn { - state_for_assist.write().await.mark_processed( - crate::orchestrator::state::DEDUP_ASSIST_ABANDONED, - key.clone(), - ); - let _ = state_for_assist - .persist_dedup( - &queue, - crate::orchestrator::state::DEDUP_ASSIST_ABANDONED, - key, - ) - .await; + state_for_assist + .write() + .await + .mark_assist_abandoned(key.clone()); warn!( task_id = %tid, pattern = %key, - "Marked task pattern as assist-abandoned — future dispatches will be dropped", + "Marked task pattern as assist-abandoned — future dispatches dropped until TTL expires", ); } TaskResult { diff --git a/ares-cli/src/orchestrator/mod.rs b/ares-cli/src/orchestrator/mod.rs index 27ec6ba7..78396707 100644 --- a/ares-cli/src/orchestrator/mod.rs +++ b/ares-cli/src/orchestrator/mod.rs @@ -133,7 +133,7 @@ async fn run_inner() -> Result<()> { let mut shared_state = SharedState::new(config.operation_id.clone()); - // Phase 2 dual-write: install a Nats-backed op-state recorder when NATS is + // install a Nats-backed op-state recorder when NATS is // available. Redis remains authoritative until Phase 4; emit failures are // logged (see `emit_op_state`) but never abort the op. let nats_broker = queue.nats_broker(); @@ -146,7 +146,7 @@ async fn run_inner() -> Result<()> { info!("Op-state event log disabled — no NATS broker on TaskQueue"); } - // Phase 3: spawn the Postgres projector consumer when both NATS and a + // spawn the Postgres projector consumer when both NATS and a // database URL are available. The projector tails ARES_OPSTATE and // upserts each event into PG, replacing the manual `ares ops offload` // path with an always-current archive. @@ -187,7 +187,7 @@ async fn run_inner() -> Result<()> { shared_state.initialize_self_ips().await; - // Phase 4 (opt-in): replay state from the JetStream event log instead of + // replay state from the JetStream event log instead of // loading from Redis. 
Falls through to Redis on failure or when the env // var is unset, so the default startup path is unchanged. let replay_enabled = std::env::var("ARES_USE_EVENT_LOG_REPLAY").as_deref() == Ok("1"); diff --git a/ares-cli/src/orchestrator/state/dedup.rs b/ares-cli/src/orchestrator/state/dedup.rs index 79864a8a..6999f5c0 100644 --- a/ares-cli/src/orchestrator/state/dedup.rs +++ b/ares-cli/src/orchestrator/state/dedup.rs @@ -59,7 +59,6 @@ impl SharedState { } let _: () = conn.expire(&key, 86400).await?; - // Phase 2 dual-write: append vuln.exploited event. emit_op_state( self.recorder(), &operation_id, diff --git a/ares-cli/src/orchestrator/state/inner.rs b/ares-cli/src/orchestrator/state/inner.rs index 08a1715a..5cb67a83 100644 --- a/ares-cli/src/orchestrator/state/inner.rs +++ b/ares-cli/src/orchestrator/state/inner.rs @@ -15,6 +15,22 @@ const QUARANTINE_DURATION_SECS: i64 = 300; const CAPTURE_IN_FLIGHT_TTL_SECS: i64 = 180; +/// How long an LLM-marked "assist-abandoned" task pattern stays +/// dispatch-blocked before the orchestrator allows a single re-try. +/// +/// The previous behavior (an entry in the generic dedup set with no TTL) +/// turned every `RequestAssistance` into a permanent op-wide drop. That is +/// the right call when the agent's complaint is structural — wrong +/// toolset, missing primitive — but it also fires when the complaint is +/// "no credentials in state yet": minutes later a parallel cred-harvest +/// can land the missing material and the pattern is still locked out. +/// +/// 10 minutes is enough that a doomed pattern won't burn a re-dispatch +/// every 30s tick, and short enough that legitimately fixable patterns +/// get a second look within one LLM-budget worth of latency. Per-op, +/// in-memory only — operator-restart starts everyone fresh, by design. 
+pub(crate) const ASSIST_ABANDONED_TTL_SECS: i64 = 600; + #[derive(Debug)] pub struct StateInner { pub operation_id: String, @@ -64,6 +80,15 @@ pub struct StateInner { // `dominated_domains`, and the TTL is the safety valve for silent fails. pub credential_capture_in_flight: HashMap>, + /// Patterns the LLM ended a task on with `RequestAssistance`, with the + /// timestamp the abandonment was recorded. Read by + /// `throttled_submit_outcome` to drop re-dispatches of doomed patterns + /// until `ASSIST_ABANDONED_TTL_SECS` elapses, at which point a single + /// re-try is allowed in case state has shifted (new cred, new vuln). + /// In-memory only — see the const comment for why this isn't + /// persisted. + pub assist_abandoned_at: HashMap>, + // Flags pub has_domain_admin: bool, pub has_golden_ticket: bool, @@ -171,6 +196,7 @@ impl StateInner { trusted_domains: HashMap::new(), dominated_domains: HashSet::new(), credential_capture_in_flight: HashMap::new(), + assist_abandoned_at: HashMap::new(), has_domain_admin: false, has_golden_ticket: false, domain_admin_path: None, @@ -584,6 +610,38 @@ impl StateInner { } } + /// Record an LLM-marked "assist-abandoned" pattern at `now`. + /// Time is injectable so the TTL behavior is unit-testable without + /// real-time clocks. + pub fn mark_assist_abandoned_at(&mut self, key: String, now: DateTime) { + self.assist_abandoned_at.insert(key, now); + } + + /// Convenience wrapper around `mark_assist_abandoned_at` that uses + /// the current UTC time. Call sites in production code use this. + pub fn mark_assist_abandoned(&mut self, key: String) { + self.mark_assist_abandoned_at(key, Utc::now()); + } + + /// Return true when `key` is currently within the assist-abandoned + /// window (i.e. `now - abandoned_at < ASSIST_ABANDONED_TTL_SECS`). 
+ /// An expired entry returns false without being cleaned up — the + /// bounded per-op pattern space makes lazy GC fine; the next + /// `mark_assist_abandoned` for the same key overwrites the stale + /// entry. + pub fn is_assist_abandoned_at(&self, key: &str, now: DateTime) -> bool { + let Some(at) = self.assist_abandoned_at.get(key) else { + return false; + }; + now.signed_duration_since(*at).num_seconds() < ASSIST_ABANDONED_TTL_SECS + } + + /// Convenience wrapper around `is_assist_abandoned_at` for production + /// call sites. + pub fn is_assist_abandoned(&self, key: &str) -> bool { + self.is_assist_abandoned_at(key, Utc::now()) + } + /// Remove every key in `set_name` that starts with `prefix`. Returns the /// removed keys so the caller can also drop them from the persisted store. /// Used by trust automation to wake cross-forest fallback automations @@ -711,6 +769,60 @@ mod tests { assert!(removed.is_empty()); } + // --- assist-abandoned TTL ---------------------------------------- + + #[test] + fn assist_abandoned_starts_false() { + let state = StateInner::new("op-1".into()); + assert!(!state.is_assist_abandoned("any:key")); + } + + #[test] + fn assist_abandoned_marked_now_is_blocked() { + let mut state = StateInner::new("op-1".into()); + state.mark_assist_abandoned("credential_access|192.168.58.10|alice|contoso.local".into()); + assert!(state.is_assist_abandoned("credential_access|192.168.58.10|alice|contoso.local")); + } + + #[test] + fn assist_abandoned_expires_after_ttl() { + let mut state = StateInner::new("op-1".into()); + let key = "credential_access|192.168.58.10|alice|contoso.local".to_string(); + let old = Utc::now() - chrono::Duration::seconds(ASSIST_ABANDONED_TTL_SECS + 1); + state.mark_assist_abandoned_at(key.clone(), old); + // Within window: still blocked relative to `old + 1s`. + assert!(state.is_assist_abandoned_at(&key, old + chrono::Duration::seconds(1))); + // Past the TTL: re-dispatch allowed. 
#[test]
fn assist_abandoned_remark_extends_window() {
    // Marking a key again after its TTL has lapsed must block it again.
    let key = String::from("k");
    let expired_stamp = Utc::now() - chrono::Duration::seconds(ASSIST_ABANDONED_TTL_SECS + 100);
    let mut state = StateInner::new("op-1".into());

    // Stamp is older than the TTL, so the key reads as not blocked.
    state.mark_assist_abandoned_at(key.clone(), expired_stamp);
    assert!(!state.is_assist_abandoned(&key));

    // A fresh mark overwrites the stale stamp and re-arms the block.
    state.mark_assist_abandoned(key.clone());
    assert!(state.is_assist_abandoned(&key));
}
Each entry is a canonical key of -/// `(task_type, target_ip, username, domain)` — enough to identify a -/// repeating doomed dispatch without false-positiving unrelated work -/// against the same target. Caps token burn from automations that keep -/// firing the same impossible task (e.g. missing tool primitive, no auth -/// resolvable for the principal, wrong-realm pairing). -pub const DEDUP_ASSIST_ABANDONED: &str = "assist_abandoned"; +// Assist-abandoned tracking moved off the generic dedup set into a +// timestamped HashMap on `StateInner` (`assist_abandoned_at`) so the +// abandonment can expire. See `ASSIST_ABANDONED_TTL_SECS` in +// `state/inner.rs` for the TTL and the rationale. /// Dedup for `auto_sid_history_enum` — one LDAP `(sIDHistory=*)` probe per /// (domain, DC) pair. The probe is a read-only LDAP query and the result @@ -181,7 +177,6 @@ const ALL_DEDUP_SETS: &[&str] = &[ DEDUP_MSSQL_RETRY, DEDUP_MSSQL_LINK_PIVOT, DEDUP_MSSQL_IMPERSONATION, - DEDUP_ASSIST_ABANDONED, DEDUP_SID_HISTORY, ]; diff --git a/ares-cli/src/orchestrator/state/publishing/credentials.rs b/ares-cli/src/orchestrator/state/publishing/credentials.rs index e5201e09..ec80b230 100644 --- a/ares-cli/src/orchestrator/state/publishing/credentials.rs +++ b/ares-cli/src/orchestrator/state/publishing/credentials.rs @@ -85,7 +85,7 @@ impl SharedState { let mut conn = queue.connection(); let added = reader.add_credential(&mut conn, &cred).await?; if added { - // Phase 2 dual-write: append to the op-state log after Redis confirms + // Append to the op-state log after Redis confirms // the credential is new (Redis is the dedup oracle). emit_op_state( self.recorder(), @@ -177,7 +177,7 @@ impl SharedState { } return Ok(false); } - // Phase 2 dual-write: emit before consuming `hash` into state. + // Emit before consuming `hash` into state. 
emit_op_state( self.recorder(), &operation_id, diff --git a/ares-cli/src/orchestrator/state/publishing/hosts.rs b/ares-cli/src/orchestrator/state/publishing/hosts.rs index 5fa07923..1104bbe8 100644 --- a/ares-cli/src/orchestrator/state/publishing/hosts.rs +++ b/ares-cli/src/orchestrator/state/publishing/hosts.rs @@ -273,7 +273,7 @@ impl SharedState { let mut conn = queue.connection(); reader.add_host(&mut conn, &host).await?; - // Phase 2 dual-write: emit host.discovered for net-new hosts only. + // Emit host.discovered for net-new hosts only. // Merges return earlier; HostUpdated is intentionally not yet a variant. emit_op_state( self.recorder(), diff --git a/ares-cli/src/orchestrator/state/publishing/mod.rs b/ares-cli/src/orchestrator/state/publishing/mod.rs index 692a7dab..b747168f 100644 --- a/ares-cli/src/orchestrator/state/publishing/mod.rs +++ b/ares-cli/src/orchestrator/state/publishing/mod.rs @@ -17,8 +17,7 @@ use std::sync::LazyLock; /// Emit a single op-state event through the recorder. No-op when the recorder /// is disabled; otherwise builds an [`OpStateEvent`] and forwards to the -/// recorder. Publish failures are logged at WARN — Phase 2 keeps Redis -/// authoritative so a transient broker outage must not abort the call. +/// recorder. Publish failures are logged at WARN and never abort the call. pub(super) async fn emit_op_state( recorder: &OpStateRecorder, op_id: &str, diff --git a/ares-cli/src/orchestrator/state/replay.rs b/ares-cli/src/orchestrator/state/replay.rs index cc90fedf..6dd11c16 100644 --- a/ares-cli/src/orchestrator/state/replay.rs +++ b/ares-cli/src/orchestrator/state/replay.rs @@ -1,12 +1,12 @@ //! Replay operation state from the JetStream `ARES_OPSTATE` event log. //! -//! Phase 4 cutover primitive. The pure +//! The pure //! [`apply_event_to_state`] function knows how to mutate [`StateInner`] for //! every [`OpStateEventPayload`] variant. The async //! [`SharedState::load_from_event_log`] driver reads the stream up to the //!
current sequence and applies events in order. //! -//! Scope limitations (see Phase 4 design doc): +//! Scope limitations: //! - The current event types cover entities only (credentials, hashes, //! hosts, users, vulns, timeline). They do NOT carry derived state like //! `has_domain_admin`, `dominated_domains`, `domain_controllers`, or the @@ -35,7 +35,7 @@ use super::inner::StateInner; use super::SharedState; /// Lightweight, serialisable snapshot of operation state reconstructed from -/// the event log. Used by `ares ops replay` and other Phase 5 tooling. +/// the event log. Used by `ares ops replay` /// /// Holds only the entity collections that the event log carries today /// (no derived state — see Phase 4 limitations). @@ -243,7 +243,7 @@ impl SharedState { /// no new messages arrive within [`REPLAY_IDLE_TIMEOUT`] — the stream is /// considered drained. /// - /// Phase 4 entry point. Opt-in: orchestrator checks + /// Opt-in: orchestrator checks /// `ARES_USE_EVENT_LOG_REPLAY=1` before calling. pub async fn load_from_event_log(&self, nats: &NatsBroker) -> Result { let op_id = self.operation_id().await; diff --git a/ares-core/src/nats.rs b/ares-core/src/nats.rs index ace68b7f..b1ed69b7 100644 --- a/ares-core/src/nats.rs +++ b/ares-core/src/nats.rs @@ -229,8 +229,7 @@ impl NatsBroker { /// concurrency; a mismatch surfaces as [`OpStatePublishError::Conflict`]. /// /// Awaits the JetStream ack. Callers in hot agent paths should treat a - /// transient failure as non-fatal and log — Phase 2 dual-write keeps Redis - /// authoritative until Phase 4 cutover. + /// transient failure as non-fatal and log pub async fn publish_op_state_event( &self, event: &OpStateEvent, diff --git a/ares-core/src/op_state_log.rs b/ares-core/src/op_state_log.rs index 952ac2e4..0b2a37ef 100644 --- a/ares-core/src/op_state_log.rs +++ b/ares-core/src/op_state_log.rs @@ -7,9 +7,9 @@ //! up a NATS server. Components that have not been wired into the event log //! 
yet hold [`OpStateRecorder::Disabled`] and the recorder becomes a no-op. //! -//! Phase 2 of the JetStream-as-source-of-truth rollout: publish failures are +//! Publish failures are //! logged at the call site but never abort the operation, because Redis is -//! still authoritative until the Phase 4 cutover. +//! still authoritative until the JetStream event-log cutover. use std::sync::Arc; @@ -41,7 +41,7 @@ impl OpStateRecorder { Self::Disabled } - /// Construct a Nats-backed recorder. Phase-2 dual-write entry point. + /// Construct a Nats-backed recorder. pub fn nats(broker: Arc) -> Self { Self::Nats(broker) }