From ae7832a065309278fba83be2be81ca5708c71e9b Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 02:44:12 +0000 Subject: [PATCH] =?UTF-8?q?Fix=20five=20code-review=20bugs=20(#37=E2=80=93?= =?UTF-8?q?#41)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #37 security: validate git-include URLs before cloning. Reject git's remote-helper transport syntax (`ext::`, `fd::`, any `::`) and leading-`-`/control chars in `validate_git_url`, applied at `parse_target` (so dangerous directives never parse) and in `git_clone` (defence in depth, plus `--` separator). `run_git` now exports `GIT_ALLOW_PROTOCOL=https:ssh:git:file`. Same validator is applied to `memory_sync`'s remote_url. #38 panic: truncate subagent task labels in GET /v1/tasks by char count, not raw byte slice — multibyte (CJK/emoji) names >48 bytes no longer panic (and no longer crash the WS socket). #39 streaming agent loop: when a terminal tool is emitted alongside sibling calls, synthesize a cancelled tool_result for every unprocessed sibling on the sequential path (avoids orphaned tool_calls → provider 400 on resume), and emit a closing ToolEnd for cancelled in-flight calls on the parallel path (no dangling ToolStart on the wire). #40 fs.patch atomicity: make the disk-commit phase atomic. Snapshot each target's prior content, and on any mid-batch I/O failure roll back every already-committed write/delete so the working tree is left unchanged. #41 SummarizingMemory: place the summary at the chronological position of the dropped turns. On the cache-breakpoint path the kept set is non-contiguous (cached prefix + recent tail); the summary now sits between them instead of before the cached prefix. Adds regression tests for all five. https://claude.ai/code/session_013gxHmLU1sk8WabMxPvMRf7 --- crates/harness-core/src/agent.rs | 194 ++++++++++++++++++++- crates/harness-memory/src/summarizing.rs | 110 ++++++++++-- crates/harness-server/src/tasks_routes.rs | 42 ++++- crates/harness-tools/src/memory_include.rs | 101 +++++++++++ crates/harness-tools/src/memory_sync.rs | 25 ++- crates/harness-tools/src/patch.rs | 137 +++++++++++++-- 6 files changed, 567 insertions(+), 42 deletions(-) diff --git a/crates/harness-core/src/agent.rs b/crates/harness-core/src/agent.rs index 72aa8dc..3eeddb3 100644 --- a/crates/harness-core/src/agent.rs +++ b/crates/harness-core/src/agent.rs @@ -726,7 +726,7 @@ impl Agent { ); if !parallel { - for call in tool_calls { + for (call_idx, call) in tool_calls.iter().enumerate() { // Decide whether this call goes through the // approver. We check the trait flag inline // so that the `ApprovalRequest` event lands @@ -908,6 +908,38 @@ impl Agent { .map(|t| t.is_terminal()) .unwrap_or(false); if is_terminal { + // The model may have emitted more tool + // calls *after* the terminal one in the + // same batch. We skip executing them, but + // every `tool_call` in the assistant + // message still needs a matching + // `tool_result` or the persisted + // conversation is unresumable (OpenAI / + // Anthropic 400 on orphaned tool calls). + // Synthesize a cancellation result for + // each unprocessed sibling, mirroring the + // parallel path, and keep the wire's + // ToolStart/ToolEnd pairing intact. + for sibling in &tool_calls[call_idx + 1..] { + let cancelled = format!( + "tool cancelled: terminal tool '{}' ended the turn", + call.name + ); + yield AgentEvent::ToolStart { + id: sibling.id.clone(), + name: sibling.name.clone(), + arguments: sibling.arguments.clone(), + }; + conversation.messages.push(Message::tool_result( + &sibling.id, + cancelled.clone(), + )); + yield AgentEvent::ToolEnd { + id: sibling.id.clone(), + name: sibling.name.clone(), + content: cancelled, + }; + } yield AgentEvent::PlanProposed { plan: output }; yield AgentEvent::Done { conversation: conversation.clone(), @@ -1159,11 +1191,28 @@ impl Agent { // Phase 3: pump events + collect outputs. let mut outputs: Vec> = std::iter::repeat_with(|| None).take(n).collect(); + // Track which calls have had a genuine ToolEnd + // emitted by their future, so a terminal sibling + // that cancels in-flight calls doesn't leave a + // ToolStart dangling on the wire (and so we never + // double-emit ToolEnd for a call whose future + // completed before we tore the dispatch down). + let mut tool_ended: Vec = vec![false; n]; + let mark_ended = |ev: &AgentEvent, ended: &mut [bool]| { + if let AgentEvent::ToolEnd { id, .. } = ev { + if let Some(i) = + tool_calls.iter().position(|c| c.id == *id) + { + ended[i] = true; + } + } + }; let mut terminal_idx: Option = None; loop { tokio::select! { biased; Some(ev) = event_rx.recv() => { + mark_ended(&ev, &mut tool_ended); yield ev; } Some((idx, output)) = invokes.next() => { @@ -1197,6 +1246,7 @@ impl Agent { // client sees them before the appended // tool_result rows. while let Ok(ev) = event_rx.try_recv() { + mark_ended(&ev, &mut tool_ended); yield ev; } @@ -1215,6 +1265,20 @@ impl Agent { }) .collect(); + // Any call whose future was cancelled before it + // emitted ToolEnd still has an open ToolStart on + // the wire — close it so a UI keyed on the pair + // doesn't show a card stuck "running" forever. + for (idx, call) in tool_calls.iter().enumerate() { + if !tool_ended[idx] { + yield AgentEvent::ToolEnd { + id: call.id.clone(), + name: call.name.clone(), + content: final_contents[idx].clone(), + }; + } + } + for (call, content) in tool_calls.iter().zip(final_contents.iter()) { @@ -2119,4 +2183,132 @@ mod tests { Agent::ensure_system_prompt(&mut conv, None, true); assert!(matches!(conv.messages[0], Message::System { ref content, .. } if content == "KEEP")); } + + /// LLM that emits a fixed batch of tool calls on the first turn, + /// then stops. Used to exercise the terminal-tool batch path. + struct BatchLlm { + iter: AtomicUsize, + calls: Vec, + } + #[async_trait::async_trait] + impl LlmProvider for BatchLlm { + async fn complete(&self, _req: ChatRequest) -> Result { + let i = self.iter.fetch_add(1, Ordering::SeqCst); + if i == 0 { + Ok(ChatResponse { + message: Message::Assistant { + content: None, + tool_calls: self.calls.clone(), + reasoning_content: None, + cache: None, + }, + finish_reason: FinishReason::ToolCalls, + response_id: None, + usage: None, + }) + } else { + Ok(ChatResponse { + message: Message::assistant_text("done"), + finish_reason: FinishReason::Stop, + response_id: None, + usage: None, + }) + } + } + } + + /// A terminal tool (mirrors `exit_plan`): ends the turn even when + /// the model emitted sibling calls in the same batch. + struct TerminalTool; + #[async_trait::async_trait] + impl Tool for TerminalTool { + fn name(&self) -> &str { + "exit_plan" + } + fn description(&self) -> &str { + "terminal" + } + fn parameters(&self) -> Value { + json!({"type": "object"}) + } + fn is_terminal(&self) -> bool { + true + } + async fn invoke(&self, _args: Value) -> std::result::Result { + Ok("PLAN".into()) + } + } + + #[tokio::test] + async fn streaming_terminal_tool_fills_sibling_results_and_pairs_events() { + // Regression: when a terminal tool is emitted alongside sibling + // calls in the same batch, the sequential streaming path used to + // return immediately, leaving the siblings without tool_result + // rows (orphaned tool_calls → provider 400 on resume) and any + // ToolStart unpaired on the wire. + use futures::StreamExt; + + let calls = vec![ + ToolCall { + id: "t".into(), + name: "exit_plan".into(), + arguments: json!({}), + }, + ToolCall { + id: "s".into(), + name: "safe".into(), + arguments: json!({}), + }, + ]; + let mut registry = ToolRegistry::new(); + registry.register_arc(Arc::new(TerminalTool) as Arc); + registry.register_arc(CountingTool::new("safe", false) as Arc); + let cfg = AgentConfig::new("test-model").with_tools(registry); + let llm = Arc::new(BatchLlm { + iter: AtomicUsize::new(0), + calls, + }); + let agent = Arc::new(Agent::new(llm as _, cfg)); + + let mut stream = agent.run_stream(Conversation::new()); + let mut starts: Vec = Vec::new(); + let mut ends: Vec = Vec::new(); + let mut final_conv: Option = None; + while let Some(ev) = stream.next().await { + match ev { + AgentEvent::ToolStart { id, .. } => starts.push(id), + AgentEvent::ToolEnd { id, .. } => ends.push(id), + AgentEvent::Done { conversation, .. } => final_conv = Some(conversation), + _ => {} + } + } + + // Every ToolStart has a matching ToolEnd (wire pairing). + starts.sort(); + ends.sort(); + assert_eq!(starts, ends, "ToolStart/ToolEnd not paired"); + + // Both tool calls have a tool_result in the persisted conversation. + let conv = final_conv.expect("Done event with conversation"); + let result_ids: Vec<&str> = conv + .messages + .iter() + .filter_map(|m| match m { + Message::Tool { tool_call_id, .. } => Some(tool_call_id.as_str()), + _ => None, + }) + .collect(); + assert!(result_ids.contains(&"t"), "terminal result missing"); + assert!( + result_ids.contains(&"s"), + "sibling tool_call orphaned (no tool_result): {result_ids:?}" + ); + // The sibling reply is the synthetic cancellation sentinel. + let sibling_cancelled = conv.messages.iter().any(|m| matches!( + m, + Message::Tool { tool_call_id, content, .. } + if tool_call_id == "s" && content.starts_with("tool cancelled:") + )); + assert!(sibling_cancelled, "sibling not marked cancelled"); + } } diff --git a/crates/harness-memory/src/summarizing.rs b/crates/harness-memory/src/summarizing.rs index 4a8426a..eb32906 100644 --- a/crates/harness-memory/src/summarizing.rs +++ b/crates/harness-memory/src/summarizing.rs @@ -387,26 +387,56 @@ impl Memory for SummarizingMemory { for &i in &system_idxs { out.push(messages[i].clone()); } - let summary_chars: Option = if let Some(s) = summary { - let chars = s.chars().count(); - out.push(Message::system(format!( + + // The summary stands in for the dropped turns and must occupy + // *their* chronological position. On the cache-breakpoint path + // the kept set can be non-contiguous (cached prefix + recent + // tail with a hole in the middle); inserting the summary right + // after the leading systems would invert chronology, presenting + // the summarised middle as if it preceded the cached prefix. + // + // Instead we walk turns in original order: kept turns are + // emitted in place, and the summary/placeholder is emitted once, + // at the position of the first dropped turn — i.e. after any + // cached-prefix turns and before the recent tail. + let summary_msg: Option = if let Some(s) = &summary { + Some(Message::system(format!( "Earlier conversation summary ({dropped_count} turn(s) compressed):\n{s}" - ))); - Some(chars) + ))) + } else if dropped_count > 0 { + // Surfacing the gap explicitly is better than silent + // truncation; keeps the model from getting confused + // about why the conversation seems to start mid-thought. + Some(Message::system(format!( + "[{dropped_count} earlier turn(s) omitted — summary unavailable]" + ))) } else { - if dropped_count > 0 { - // Surfacing the gap explicitly is better than silent - // truncation; keeps the model from getting confused - // about why the conversation seems to start mid-thought. - out.push(Message::system(format!( - "[{dropped_count} earlier turn(s) omitted — summary unavailable]" - ))); - } None }; - for turn in kept { - for &i in turn { - out.push(messages[i].clone()); + let summary_chars: Option = summary.as_ref().map(|s| s.chars().count()); + + let mut summary_emitted = false; + for turn in &turns { + let is_kept = kept.iter().any(|k| std::ptr::eq(*k, turn)); + if is_kept { + for &i in turn { + out.push(messages[i].clone()); + } + } else if !summary_emitted { + // First dropped turn — drop the summary in here. + if let Some(msg) = &summary_msg { + out.push(msg.clone()); + } + summary_emitted = true; + } + // Subsequent dropped turns are simply skipped — they're + // already represented by the single summary message. + } + // Edge case: every turn was dropped (no kept turns), so the + // loop never emitted the summary. Append it now. + if !summary_emitted { + if let Some(msg) = &summary_msg { + out.push(msg.clone()); } } // Append the agent's working-context snapshot. Same helper @@ -1010,6 +1040,54 @@ mod tests { assert_eq!(llm.calls.load(Ordering::SeqCst), 0); } + #[tokio::test] + async fn breakpoint_summary_sits_between_cached_prefix_and_recent_tail() { + // Regression for the chronology-inversion bug: on the cache + // breakpoint path the kept set is non-contiguous (cached prefix + // + recent tail with a dropped hole). The summary of the hole + // must land *between* them, not before the cached prefix. + let llm = FakeLlm::new("MIDDLE_SUMMARY"); + let mem = SummarizingMemory::new(llm.clone(), "test-model", 400); + + let big = "lorem ipsum dolor sit amet ".repeat(80); // ~2160 chars + let msgs = vec![ + system("sys"), + user("cached q1"), + // Cache breakpoint on the assistant reply of the first turn. + assistant("cached a1").with_cache(harness_core::CacheHint::Ephemeral), + user(&format!("middle q2 {big}")), + assistant(&format!("middle a2 {big}")), + user(&format!("middle q3 {big}")), + assistant(&format!("middle a3 {big}")), + user("recent q4"), + assistant("recent a4"), + ]; + let out = mem.compact(&msgs).await.unwrap(); + + let pos = |needle: &str| { + out.iter().position(|m| match m { + Message::System { content, .. } | Message::User { content, .. } => { + content.contains(needle) + } + Message::Assistant { content, .. } => { + content.as_deref().is_some_and(|c| c.contains(needle)) + } + _ => false, + }) + }; + + let cached = pos("cached a1").expect("cached prefix kept"); + let summary = pos("MIDDLE_SUMMARY").expect("summary inserted"); + let recent = pos("recent q4").expect("recent tail kept"); + assert!( + cached < summary && summary < recent, + "summary must sit between cached prefix and recent tail; \ + got cached={cached} summary={summary} recent={recent}: {out:#?}" + ); + // The dropped middle turns must be gone. + assert!(pos("middle q2").is_none(), "middle turn should be dropped"); + } + #[tokio::test] async fn over_budget_inserts_summary() { let llm = FakeLlm::new("ALPHA AND BETA HAPPENED"); diff --git a/crates/harness-server/src/tasks_routes.rs b/crates/harness-server/src/tasks_routes.rs index da640cd..dd32d93 100644 --- a/crates/harness-server/src/tasks_routes.rs +++ b/crates/harness-server/src/tasks_routes.rs @@ -122,8 +122,11 @@ pub(crate) async fn collect_tasks(state: &AppState) -> Vec { // Compose a concise label: ": " with // the task truncated so the panel row stays readable. let task_head = r.task.as_deref().unwrap_or(""); - let task_short = if task_head.len() > 48 { - format!("{}…", &task_head[..47]) + // Truncate by char count, not byte index — a raw byte slice + // panics when byte 47 splits a multibyte char (CJK, emoji), + // and this runs inside the WS turn-terminal handler. + let task_short = if task_head.chars().count() > 48 { + format!("{}…", task_head.chars().take(47).collect::()) } else { task_head.to_string() }; @@ -305,6 +308,41 @@ mod tests { assert_eq!(items[0]["status"], "running"); } + #[tokio::test] + async fn multibyte_task_name_does_not_panic() { + // Regression: the task label was truncated with a raw byte + // slice, which panics when byte 47 splits a multibyte char. + // The task text is LLM/agent-supplied, so a long CJK task name + // must not crash GET /v1/tasks (and, transitively, the WS). + use crate::subagent_runs::SubAgentRunRegistry; + use harness_core::subagent::{SubAgentEvent, SubAgentFrame}; + + let runs = SubAgentRunRegistry::new(); + // 60 CJK chars = 180 bytes; byte index 47 lands mid-character. + let task = "验证代码变更并运行测试套件确保一切正常工作完成质量检查任务".repeat(2); + assert!(task.len() > 48 && !task.is_char_boundary(47)); + runs.record_frame( + Some("conv-cjk"), + &SubAgentFrame { + subagent_id: "sub-1".into(), + subagent_name: "review".into(), + event: SubAgentEvent::Started { + task: task.clone(), + model: None, + }, + }, + ); + let state = mk_state().with_subagent_runs(runs); + let resp = list_tasks(State(state)).await.into_response(); + let body = read_json(resp).await; + let items = body["items"].as_array().unwrap(); + assert_eq!(items.len(), 1, "expected the subagent run, got: {items:?}"); + assert_eq!(items[0]["kind"], "subagent_run"); + // Label is truncated to 47 chars + ellipsis, on a char boundary. + let label = items[0]["label"].as_str().unwrap(); + assert!(label.ends_with('…'), "expected ellipsis, got: {label}"); + } + #[tokio::test] async fn completed_chat_run_does_not_surface() { let state = mk_state(); diff --git a/crates/harness-tools/src/memory_include.rs b/crates/harness-tools/src/memory_include.rs index a860a23..6e45888 100644 --- a/crates/harness-tools/src/memory_include.rs +++ b/crates/harness-tools/src/memory_include.rs @@ -40,6 +40,62 @@ use tokio::process::Command; const GIT_TIMEOUT_MS: u64 = 60_000; +/// Protocols git is permitted to use when cloning an include URL. +/// Exported into the child process as `GIT_ALLOW_PROTOCOL` so git's +/// `ext::` / `fd::` transports (which execute arbitrary commands and +/// are honoured by default for a directly-invoked clone) can never +/// fire from attacker-controlled memory content. +const GIT_ALLOW_PROTOCOL: &str = "https:ssh:git:file"; + +/// Reject git URLs that could lead to command execution or option +/// smuggling before they ever reach `git clone`. +/// +/// Three distinct dangers: +/// - git's *remote-helper transports* (`ext::`, `fd::`, and any +/// `::
` form) run arbitrary commands. These are +/// distinguished by a `name::` prefix, which we reject outright. +/// - a leading `-` would be parsed by git as an option, not a URL. +/// - embedded NUL / newline bytes. +/// +/// Everything else is allowed: ordinary `https://` / `ssh://` / +/// `git://` / `file://` URLs, the scp-like `git@host:path` shorthand, +/// and plain local filesystem paths (which git clones over the safe +/// local transport). Defence in depth: `git_clone` also passes `--` +/// and `run_git` exports `GIT_ALLOW_PROTOCOL`. +pub fn validate_git_url(url: &str) -> Result<(), BoxError> { + let u = url.trim(); + if u.is_empty() { + return Err("git include URL is empty".into()); + } + if u.starts_with('-') { + return Err(format!("git include URL must not start with '-': {u}").into()); + } + if u.bytes().any(|b| b == 0 || b == b'\n' || b == b'\r') { + return Err("git include URL contains control characters".into()); + } + // Detect git's remote-helper transport syntax `::` + // (e.g. `ext::sh -c id`, `fd::17/foo`). The transport name sits at + // the very start, contains no '/', and is followed by `::`. A real + // scheme uses `://` (single colon + slashes), so `https://…` and an + // IPv6 literal like `ssh://[::1]/r` never match (their text before + // the first `::` contains a '/'). + if let Some(p) = u.find("::") { + let prefix = &u[..p]; + let looks_like_transport = !prefix.is_empty() + && !prefix.contains('/') + && prefix + .chars() + .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '.' | '-')); + if looks_like_transport { + return Err(format!( + "git include URL uses a remote-helper transport (command execution risk): {u}" + ) + .into()); + } + } + Ok(()) +} + /// One parsed `` line. Stays /// closer to the wire format than to the on-disk path so the /// caller can dedup / display the original directive verbatim. @@ -84,6 +140,12 @@ impl IncludeDirective { } _ => (rest.to_string(), None), }; + // Reject command-executing transports (ext::/fd::) and + // option-smuggling URLs at parse time so a malicious + // directive never even reaches the cache/clone path. + if validate_git_url(&url).is_err() { + return None; + } return Some(IncludeDirective::GitUrl { url, branch }); } Some(IncludeDirective::LocalPath(trimmed.to_string())) @@ -287,11 +349,17 @@ fn directive_slug(url: &str, branch: Option<&str>) -> String { } async fn git_clone(url: &str, branch: Option<&str>, dest: &Path) -> Result<(), BoxError> { + // Defence in depth: parse_target already rejects dangerous URLs, + // but re-validate here so any future caller of git_clone is + // covered too. + validate_git_url(url)?; let mut args: Vec<&str> = vec!["clone", "--depth", "1"]; if let Some(b) = branch { args.push("--branch"); args.push(b); } + // `--` ensures the URL is never parsed as an option. + args.push("--"); args.push(url); let dest_str = dest.to_string_lossy().to_string(); args.push(&dest_str); @@ -414,6 +482,10 @@ pub async fn refresh_git_cache( async fn run_git(args: &[&str]) -> Result<(bool, String, String), BoxError> { let mut cmd = Command::new("git"); cmd.args(args) + // Belt-and-suspenders against git's command-executing + // transports (ext::/fd::): even if a bad URL slipped through + // validation, git refuses any protocol not listed here. + .env("GIT_ALLOW_PROTOCOL", GIT_ALLOW_PROTOCOL) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -485,6 +557,35 @@ mod tests { assert!(IncludeDirective::parse_target(" ").is_none()); } + #[test] + fn validate_git_url_allows_normal_forms() { + for ok in [ + "https://github.com/me/repo.git", + "ssh://git@github.com/me/repo.git", + "git://host/r.git", + "file:///srv/git/r.git", + "git@github.com:me/repo.git", + "/tmp/local/bare/repo", + "./relative/repo", + "ssh://git@[::1]:22/r.git", // IPv6 literal must not trip the `::` check + ] { + assert!(validate_git_url(ok).is_ok(), "expected ok: {ok}"); + } + } + + #[test] + fn validate_git_url_rejects_command_executing_transports() { + for bad in ["ext::sh -c id", "fd::17/foo", "-upload-pack=evil", "--foo"] { + assert!(validate_git_url(bad).is_err(), "expected err: {bad}"); + } + } + + #[test] + fn parse_target_drops_command_executing_git_transport() { + // The dangerous directive must not even parse into a GitUrl. + assert!(IncludeDirective::parse_target("git+ext::sh -c id").is_none()); + } + #[test] fn parse_directives_from_memory_md_skips_non_directives() { let text = "\n\n\n# Real content\n- [x](x.md)\n\n"; diff --git a/crates/harness-tools/src/memory_sync.rs b/crates/harness-tools/src/memory_sync.rs index ea7ff7f..2f4ec48 100644 --- a/crates/harness-tools/src/memory_sync.rs +++ b/crates/harness-tools/src/memory_sync.rs @@ -607,11 +607,10 @@ impl Tool for MemorySyncSetupTool { if remote_url.is_empty() { return Err("`remote_url` must not be empty".into()); } - // Refuse anything that looks like a flag — the URL goes - // verbatim into `git remote add`. - if remote_url.starts_with('-') { - return Err(format!("invalid remote_url `{remote_url}`").into()); - } + // Refuse anything that looks like a flag or uses a + // command-executing transport (ext::/fd::) — the URL goes + // verbatim into `git remote add` and is later pushed to. + crate::memory_include::validate_git_url(remote_url)?; let scope = args .get("scope") .and_then(Value::as_str) @@ -1126,7 +1125,21 @@ mod tests { .invoke(json!({"remote_url": "--upload-pack=evil"})) .await .unwrap_err(); - assert!(err.to_string().contains("invalid remote_url")); + assert!(err.to_string().contains("start with '-'")); + } + + #[tokio::test] + async fn sync_setup_rejects_command_executing_transport() { + // git's `ext::` remote-helper transport runs arbitrary + // commands; the validator must refuse it before `git remote + // add` ever sees it. + let user = tempdir().unwrap(); + let tool = MemorySyncSetupTool::new(user_only(user.path())); + let err = tool + .invoke(json!({"remote_url": "ext::sh -c id"})) + .await + .unwrap_err(); + assert!(err.to_string().contains("remote-helper transport")); } #[tokio::test] diff --git a/crates/harness-tools/src/patch.rs b/crates/harness-tools/src/patch.rs index 84bd2e0..f5078fc 100644 --- a/crates/harness-tools/src/patch.rs +++ b/crates/harness-tools/src/patch.rs @@ -188,29 +188,95 @@ impl Tool for FsPatchTool { } } - // Phase 2: commit to disk. Order doesn't matter functionally - // — each path is independent — but we keep input order for - // a predictable summary. + // Phase 2: commit to disk. The contract is "atomic per call", + // so before touching anything we snapshot each target's prior + // state, then roll every committed entry back if any write or + // delete fails mid-batch (permission, ENOSPC, read-only path, + // …). Order doesn't matter functionally — each path is + // independent — but we keep input order for a predictable + // summary. + let mut snapshots: Vec> = Vec::with_capacity(planned.len()); for w in &planned { - match w.kind { - ChangeKind::Created | ChangeKind::Modified => { - if let Some(parent) = w.abs.parent() { - fs::create_dir_all(parent).await.map_err(|e| -> BoxError { - format!("mkdir for `{}`: {e}", w.rel).into() + // `None` means the file did not exist before this call; + // `Some(content)` is its prior content to restore on rollback. + let prior = fs::read_to_string(&w.abs).await.ok(); + snapshots.push(prior); + } + + let mut committed = 0usize; + let mut commit_err: Option = None; + for w in &planned { + let step: Result<(), BoxError> = async { + match w.kind { + ChangeKind::Created | ChangeKind::Modified => { + if let Some(parent) = w.abs.parent() { + fs::create_dir_all(parent).await.map_err(|e| -> BoxError { + format!("mkdir for `{}`: {e}", w.rel).into() + })?; + } + fs::write(&w.abs, w.new_text.as_deref().unwrap_or("")) + .await + .map_err(|e| -> BoxError { + format!("write `{}`: {e}", w.rel).into() + })?; + } + ChangeKind::Deleted => { + fs::remove_file(&w.abs).await.map_err(|e| -> BoxError { + format!("delete `{}`: {e}", w.rel).into() })?; } - fs::write(&w.abs, w.new_text.as_deref().unwrap_or("")) - .await - .map_err(|e| -> BoxError { format!("write `{}`: {e}", w.rel).into() })?; } - ChangeKind::Deleted => { - fs::remove_file(&w.abs) - .await - .map_err(|e| -> BoxError { format!("delete `{}`: {e}", w.rel).into() })?; + Ok(()) + } + .await; + + if let Err(e) = step { + commit_err = Some(e); + break; + } + committed += 1; + } + + if let Some(e) = commit_err { + // Roll back every entry we already committed, restoring its + // snapshot. Best-effort: collect restore failures so the + // returned error names both the original fault and any + // file we couldn't fully revert. + let mut restore_errs: Vec = Vec::new(); + for (w, prior) in planned[..committed].iter().zip(snapshots.iter()) { + let restore: Result<(), std::io::Error> = match prior { + Some(content) => { + if let Some(parent) = w.abs.parent() { + let _ = fs::create_dir_all(parent).await; + } + fs::write(&w.abs, content).await + } + None => match fs::remove_file(&w.abs).await { + Ok(()) => Ok(()), + // Already gone is fine. + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + }, + }; + if let Err(re) = restore { + restore_errs.push(format!("{}: {re}", w.rel)); } } - // Note every touched path so post-compaction reinjection - // can remind the agent which files it just changed. + return Err(if restore_errs.is_empty() { + format!("{e} (rolled back all changes; working tree unchanged)").into() + } else { + format!( + "{e} (rollback incomplete — could not restore: {})", + restore_errs.join(", ") + ) + .into() + }); + } + + // All writes succeeded — note every touched path so + // post-compaction reinjection can remind the agent which files + // it just changed. + for w in &planned { harness_core::note_working_file_relative_to(&w.abs, Some(&root)); } @@ -534,6 +600,43 @@ diff --git a/b.txt b/b.txt ); } + #[tokio::test] + async fn phase_two_io_failure_rolls_back_committed_writes() { + // Phase 1 (parse/apply) succeeds for both files; the disk-commit + // phase writes `a.txt` successfully, then fails creating + // `blocker/x.txt` because `blocker` is an existing regular file + // (so `create_dir_all` errors). The earlier write to `a.txt` + // must be rolled back to honour the "atomic per call" contract. + let dir = tempdir().unwrap(); + std::fs::write(dir.path().join("a.txt"), "alpha\n").unwrap(); + std::fs::write(dir.path().join("blocker"), "i am a file\n").unwrap(); + let tool = FsPatchTool::new(dir.path()); + let diff = "\ +diff --git a/a.txt b/a.txt +--- a/a.txt ++++ b/a.txt +@@ -1 +1 @@ +-alpha ++ALPHA +diff --git a/blocker/x.txt b/blocker/x.txt +--- /dev/null ++++ b/blocker/x.txt +@@ -0,0 +1 @@ ++nope +"; + let err = tool.invoke(json!({ "diff": diff })).await.unwrap_err(); + assert!(err.to_string().contains("rolled back"), "got: {err}"); + // a.txt must be restored to its pre-call content. + assert_eq!( + std::fs::read_to_string(dir.path().join("a.txt")).unwrap(), + "alpha\n", + "phase-2 atomicity violated: committed write not rolled back" + ); + // The blocker file is untouched and the nested write didn't land. + assert!(dir.path().join("blocker").is_file()); + assert!(!dir.path().join("blocker/x.txt").exists()); + } + #[tokio::test] async fn rejects_create_over_existing_file() { let dir = tempdir().unwrap();