From 6ef884511e5d59d905ace4d4099aa5c76f621681 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Feb 2026 22:59:22 +0000 Subject: [PATCH 1/3] Fix task cancellation bugs with orphaned blocks and panics This commit fixes several bugs that could occur when cancelling a task: 1. **Effect queue not cleared on cancel**: Pending effects remained in the queue after cancel, which could lead to effects being processed after the turn was already finished. 2. **Stage not cleared on cancel**: Blocks that were in the Stage awaiting approval remained orphaned after cancel. 3. **Tool executor not cancelled**: The cancel() function only cancelled the agent's streaming, not the tool executor's active pipelines. 4. **Panic in start_block after finish_turn**: If decide_pending_tool was called after the turn was finished (e.g., due to race conditions), start_block would panic with "No active turn". Changes: - Add EffectQueue::clear() method to drop all pending effects - Add Transcript::has_active_turn() method to check turn state - Add Transcript::try_start_block() method that returns false instead of panicking when no turn is active - Update App::cancel() to: - Cancel the tool executor - Clear the effect queue - Drain orphaned blocks from the stage - Update decide_pending_tool() to use try_start_block() instead of start_block() for defensive safety - Add comprehensive tests for cancellation scenarios https://claude.ai/code/session_0198d69SpWL6JAZo8nu3fpMN --- src/app.rs | 25 +++++- src/effect.rs | 119 ++++++++++++++++++++++++++++ src/transcript.rs | 193 ++++++++++++++++++++++++++++++++++++++++++++++ src/ui/chat.rs | 1 + 4 files changed, 337 insertions(+), 1 deletion(-) diff --git a/src/app.rs b/src/app.rs index 273f24d..b7db9e0 100644 --- a/src/app.rs +++ b/src/app.rs @@ -407,10 +407,27 @@ impl App { /// Cancel the current agent request and reset input mode async fn cancel(&mut self) -> Result<()> { + // Cancel the primary agent's streaming if let Some(agent_mutex) = self.agents.primary() { agent_mutex.lock().await.cancel(); } + + // Cancel the tool executor - stops active pipelines and clears pending + self.tool_executor.cancel(); + + // Clear the effect queue - drops pending effects and their responder channels. + // This signals cancellation to any tool executor pipelines waiting for responses. + self.effects.clear(); + + // Clear orphaned blocks from the stage (blocks that were awaiting approval) + let orphaned = self.chat.transcript.stage.drain_all(); + if !orphaned.is_empty() { + tracing::debug!("Cleared {} orphaned blocks from stage on cancel", orphaned.len()); + } + + // Finish the current turn self.chat.finish_turn(&mut self.terminal); + if let Err(e) = self.chat.transcript.save() { tracing::error!("Failed to save transcript on cancel: {}", e); } @@ -551,7 +568,13 @@ impl App { ToolDecision::Deny => Status::Denied, _ => Status::Pending, }); - self.chat.transcript.start_block(block); + // Use try_start_block to avoid panic if turn was already finished (e.g., by cancel) + if !self.chat.transcript.try_start_block(block) { + tracing::warn!( + "decide_pending_tool: no active turn to add block for {}", + pending.call_id + ); + } } // Convert decision to EffectResult and send to executor diff --git a/src/effect.rs b/src/effect.rs index 5b1c5ff..5120487 100644 --- a/src/effect.rs +++ b/src/effect.rs @@ -240,6 +240,15 @@ impl EffectQueue { self.pending.push_back(effect); } + /// Clear all pending effects (for cancellation). + /// + /// Drops all pending effects, which closes their responder channels. + /// The tool executor will receive a channel closed error for any + /// effects that were waiting for a response. + pub fn clear(&mut self) { + self.pending.clear(); + } + /// Check if there are any effects that can be polled. /// /// An effect is pollable if: @@ -612,4 +621,114 @@ mod tests { // But approval call_2 should NOT be pollable assert!(!queue.has_pollable()); } + + // ======================================================================== + // Cancellation Bug Tests + // ======================================================================== + + /// Test that EffectQueue::clear() properly clears all pending effects. + /// + /// When the user cancels, pending effects should be cleared to avoid + /// orphaned effects trying to operate on a finished turn. + #[test] + fn test_effect_queue_clear_method() { + let mut queue = EffectQueue::new(); + + // Queue several effects + queue.push(make_approval_effect("call_1", "shell")); + queue.push(make_approval_effect("call_2", "read_file")); + queue.push(make_ide_effect("ide_1")); + + // Acknowledge one + if let Some(effect) = queue.find_by_call_id_mut("call_1") { + effect.acknowledge(); + } + + // Verify state before clear + assert!(queue.has_pending_approvals()); + assert!(queue.has_active_approval()); + + // Clear all effects (simulating cancel) + queue.clear(); + + // All effects should be gone + assert!(!queue.has_pending_approvals(), "All approvals should be cleared"); + assert!(!queue.has_active_approval(), "Active approval should be cleared"); + assert!(!queue.has_pollable(), "Nothing should be pollable"); + assert!(queue.poll_next().is_none(), "poll_next should return None"); + } + + /// Test that effects remain pollable when clear() is not called. + /// + /// This documents the behavior before clear() is called - effects + /// remain in the queue and can still be polled. The fix ensures + /// cancel() calls clear() to prevent this. + #[test] + fn test_effects_pollable_without_clear() { + let mut queue = EffectQueue::new(); + + // Queue an approval that hasn't been acknowledged yet + queue.push(make_approval_effect("pending_call", "shell")); + + // Without calling clear(), the effect is still pollable + assert!(queue.has_pollable(), "Effect should be pollable when clear() not called"); + + let effect = queue.poll_next(); + assert!(effect.is_some(), "Effect can still be polled when clear() not called"); + } + + /// Test that acknowledged approvals are also cleared by clear(). + #[test] + fn test_acknowledged_approval_cleared_by_clear() { + let mut queue = EffectQueue::new(); + + queue.push(make_approval_effect("acknowledged_call", "shell")); + + // Acknowledge it (simulating the approval UI being shown) + if let Some(effect) = queue.find_by_call_id_mut("acknowledged_call") { + effect.acknowledge(); + } + + // Verify it's acknowledged + assert!(queue.has_active_approval()); + + // Clear the queue + queue.clear(); + + // The acknowledged approval should be gone + assert!(!queue.has_active_approval(), "Acknowledged approval should be cleared"); + } + + /// Test that dropping PendingEffect drops the responder channel. + /// + /// When effects are dropped (e.g., via a clear method), the responder + /// channel should be dropped, signaling cancellation to the executor. + #[test] + fn test_pending_effect_drop_behavior() { + let (tx, mut rx) = oneshot::channel::(); + + // Create effect + let effect = PendingEffect::new( + "test_call".to_string(), + 0, + Effect::AwaitApproval { + name: "test".to_string(), + params: serde_json::json!({}), + background: false, + }, + tx, + ); + + // Drop the effect without completing it + drop(effect); + + // The receiver should now get an error (channel closed) + // This is the correct behavior - the executor knows the effect was cancelled + match rx.try_recv() { + Err(oneshot::error::TryRecvError::Closed) => { + // Good - channel was properly closed + } + _ => panic!("Expected channel to be closed when effect is dropped"), + } + } } diff --git a/src/transcript.rs b/src/transcript.rs index f63ba47..56a0421 100644 --- a/src/transcript.rs +++ b/src/transcript.rs @@ -815,6 +815,11 @@ impl Transcript { self.current_turn_id = None; } + /// Check if there's an active turn. + pub fn has_active_turn(&self) -> bool { + self.current_turn_id.is_some() + } + /// Get mutable reference to the current turn. Panics if no turn is active. fn current_turn_mut(&mut self) -> &mut Turn { let turn_id = self @@ -849,6 +854,20 @@ impl Transcript { self.current_turn_mut().start_block(block); } + /// Try to start a new block on the current turn. + /// Returns false if no turn is active (block is dropped). + /// + /// Use this when the turn may have been cancelled before the block was ready. + pub fn try_start_block(&mut self, block: Box) -> bool { + if let Some(turn_id) = self.current_turn_id { + if let Some(turn) = self.get_mut(turn_id) { + turn.start_block(block); + return true; + } + } + false + } + /// Get mutable reference to the active block. pub fn active_block_mut(&mut self) -> Option<&mut (dyn Block + 'static)> { let turn_id = self.current_turn_id?; @@ -1306,4 +1325,178 @@ mod tests { let drained = stage.drain_all(); assert!(drained.is_empty()); } + + // ======================================================================== + // Cancellation Bug Tests + // ======================================================================== + + /// BUG TEST: Calling start_block after finish_turn should not panic. + /// + /// This reproduces a bug where cancelling during tool approval leaves + /// orphaned blocks in the Stage. When those blocks are later processed + /// (e.g., by decide_pending_tool), start_block is called but panics + /// because there's no active turn. + /// + /// Expected behavior: start_block should either: + /// 1. Create a new turn automatically, or + /// 2. Return an error/Option instead of panicking + #[test] + #[should_panic(expected = "No active turn")] + fn test_start_block_after_finish_turn_panics() { + let mut transcript = Transcript::with_path(std::path::PathBuf::from("/tmp/test_panic.md")); + + // Simulate normal flow: begin turn, add some content + transcript.begin_turn(Role::Assistant); + transcript.stream_delta(BlockType::Text, "Hello"); + + // Now finish the turn (simulating cancel()) + transcript.finish_turn(); + + // This simulates what happens when decide_pending_tool tries to + // promote a block from Stage to Transcript after cancel + let block = ToolBlock::new("orphan_call", "shell", serde_json::json!({}), false); + + // BUG: This panics because current_turn_id is None + transcript.start_block(Box::new(block)); + } + + /// BUG TEST: Simulates the full cancellation scenario with Stage. + /// + /// Flow: + /// 1. Begin turn for assistant + /// 2. Tool block is added to Stage (awaiting approval) + /// 3. Turn is finished (cancel was triggered) + /// 4. Block remains in Stage (orphaned) + /// 5. Later, trying to promote block to transcript panics + #[test] + #[should_panic(expected = "No active turn")] + fn test_cancel_leaves_orphaned_blocks_in_stage() { + let mut transcript = Transcript::with_path(std::path::PathBuf::from("/tmp/test_orphan.md")); + + // Step 1: Begin turn + transcript.begin_turn(Role::Assistant); + transcript.stream_delta(BlockType::Text, "Let me run a command"); + + // Step 2: Tool block added to Stage (simulating acknowledge_approval) + let block = ToolBlock::new("call_1", "shell", serde_json::json!({"command": "ls"}), false); + transcript.stage.push(Box::new(block)); + + // Step 3: Turn is finished (cancel triggered before approval decision) + transcript.finish_turn(); + + // Verify block is still in stage (orphaned) + assert!(!transcript.stage.is_empty(), "Block should remain in stage after cancel"); + assert!(transcript.stage.find_by_call_id("call_1").is_some()); + + // Step 4: Later, something tries to promote the block (e.g., decide_pending_tool) + // This is what would happen if the effect is processed after cancel + if let Some(mut block) = transcript.stage.remove_by_call_id("call_1") { + block.set_status(Status::Running); + // BUG: This panics because current_turn_id is None + transcript.start_block(block); + } + } + + /// Test has_active_turn() method. + #[test] + fn test_has_active_turn() { + let mut transcript = Transcript::with_path(std::path::PathBuf::from("/tmp/test_has_active.md")); + + // No turn started + assert!(!transcript.has_active_turn()); + + // Begin turn + transcript.begin_turn(Role::Assistant); + assert!(transcript.has_active_turn()); + + // Finish turn + transcript.finish_turn(); + assert!(!transcript.has_active_turn()); + } + + /// Test try_start_block() returns false when no turn is active. + /// + /// This is the safe alternative to start_block() that doesn't panic. + #[test] + fn test_try_start_block_no_active_turn() { + let mut transcript = Transcript::with_path(std::path::PathBuf::from("/tmp/test_try.md")); + + // No turn started + let block = ToolBlock::new("call_1", "shell", serde_json::json!({}), false); + let result = transcript.try_start_block(Box::new(block)); + assert!(!result, "try_start_block should return false when no active turn"); + assert_eq!(transcript.turns().len(), 0, "No turn should be created"); + } + + /// Test try_start_block() returns true and adds block when turn is active. + #[test] + fn test_try_start_block_with_active_turn() { + let mut transcript = Transcript::with_path(std::path::PathBuf::from("/tmp/test_try_ok.md")); + + transcript.begin_turn(Role::Assistant); + + let block = ToolBlock::new("call_1", "shell", serde_json::json!({}), false); + let result = transcript.try_start_block(Box::new(block)); + assert!(result, "try_start_block should return true when turn is active"); + assert_eq!(transcript.turns().len(), 1); + assert_eq!(transcript.turns()[0].content.len(), 1); + } + + /// Test safe cancellation flow using try_start_block. + /// + /// This demonstrates the correct way to handle blocks after cancel. + #[test] + fn test_safe_cancel_with_try_start_block() { + let mut transcript = Transcript::with_path(std::path::PathBuf::from("/tmp/test_safe.md")); + + // Begin turn + transcript.begin_turn(Role::Assistant); + transcript.stream_delta(BlockType::Text, "Let me run a command"); + + // Tool block added to Stage + let block = ToolBlock::new("call_1", "shell", serde_json::json!({}), false); + transcript.stage.push(Box::new(block)); + + // Cancel - finish turn + transcript.finish_turn(); + + // Try to promote block safely + if let Some(block) = transcript.stage.remove_by_call_id("call_1") { + // Use try_start_block instead of start_block - no panic! + let added = transcript.try_start_block(block); + assert!(!added, "Block should not be added when no active turn"); + } + } + + /// Test stage drain_all during cancel flow. + /// + /// The proper fix is to drain the stage during cancel to avoid orphaned blocks. + #[test] + fn test_stage_drain_during_cancel() { + let mut transcript = Transcript::with_path(std::path::PathBuf::from("/tmp/test_drain.md")); + + transcript.begin_turn(Role::Assistant); + + // Multiple blocks added to Stage + for i in 0..3 { + let block = ToolBlock::new( + &format!("call_{}", i), + "shell", + serde_json::json!({"command": format!("echo {}", i)}), + false, + ); + transcript.stage.push(Box::new(block)); + } + + // Proper cancel: drain stage before finishing turn + let orphaned = transcript.stage.drain_all(); + assert_eq!(orphaned.len(), 3); + assert!(transcript.stage.is_empty()); + + // Now finish turn safely + transcript.finish_turn(); + + // No orphaned blocks remain + assert!(transcript.stage.is_empty()); + } } diff --git a/src/ui/chat.rs b/src/ui/chat.rs index fea57f6..9525161 100644 --- a/src/ui/chat.rs +++ b/src/ui/chat.rs @@ -94,6 +94,7 @@ impl ChatView { } /// Start a new block in the current turn and render + #[allow(dead_code)] pub fn start_block( &mut self, block: Box, From d24b36b3494a3aa7be6bbe31a7f6c8d38f85df24 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Feb 2026 22:59:59 +0000 Subject: [PATCH 2/3] Update Cargo.lock https://claude.ai/code/session_0198d69SpWL6JAZo8nu3fpMN --- Cargo.lock | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2db172..e60ca8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5239,7 +5239,3 @@ name = "zmij" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" - -[[patch.unused]] -name = "ratatui-core" -version = "0.1.0-beta.0" From d8a10ddcdf7c1c2b6d3cb2ce62d793447b138d59 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Feb 2026 23:54:00 +0000 Subject: [PATCH 3/3] Add proper shell process killing on cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user presses Esc to cancel a running tool, the shell process is now properly killed instead of being left running in the background. Changes: - Add CancellationToken and CancellationSource types to pipeline.rs for cooperative cancellation between the executor and handlers - Update EffectHandler trait to accept CancellationToken parameter - Update all handler implementations to accept the token (most ignore it) - Update Shell handler to pass the token to execute_shell - Update execute_shell in io.rs to use tokio::select! to monitor both the child process and the cancellation token, killing the process when cancellation is requested - Update ToolExecutor to create cancellation sources for each pipeline and trigger them in cancel() - Update README.md with new handler signature The cancellation flow: 1. User presses Esc → App::cancel() is called 2. App calls tool_executor.cancel() 3. Executor triggers cancel_source.cancel() for all active pipelines 4. Running handlers receive the signal via their CancellationToken 5. Shell handler's execute_shell uses select! to detect cancellation 6. Child process is killed with child.kill() https://claude.ai/code/session_0198d69SpWL6JAZo8nu3fpMN --- src/tools/README.md | 4 +- src/tools/exec.rs | 37 ++++++++++---- src/tools/handlers.rs | 58 ++++++++++++---------- src/tools/impls/edit_file.rs | 4 +- src/tools/impls/spawn_agent.rs | 4 +- src/tools/io.rs | 90 ++++++++++++++++++++++------------ src/tools/pipeline.rs | 63 +++++++++++++++++++++++- 7 files changed, 187 insertions(+), 73 deletions(-) diff --git a/src/tools/README.md b/src/tools/README.md index 0a2769f..503b771 100644 --- a/src/tools/README.md +++ b/src/tools/README.md @@ -227,8 +227,10 @@ pub struct YourHandler { #[async_trait::async_trait] impl EffectHandler for YourHandler { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { // Do the actual work + // For long-running operations, check cancel.is_cancelled() periodically + // or use tokio::select! with cancel.cancelled() to abort gracefully match do_something(&self.param).await { Ok(result) => Step::Output(result), Err(e) => Step::Error(e.to_string()), diff --git a/src/tools/exec.rs b/src/tools/exec.rs index bae0e29..61eb155 100644 --- a/src/tools/exec.rs +++ b/src/tools/exec.rs @@ -14,7 +14,7 @@ use tokio::sync::oneshot; use crate::effect::EffectResult; use crate::llm::AgentId; use crate::transcript::Status; -use crate::tools::pipeline::{Effect, Step, ToolPipeline}; +use crate::tools::pipeline::{CancellationSource, Effect, Step, ToolPipeline}; use crate::tools::ToolRegistry; // ============================================================================= @@ -180,10 +180,13 @@ struct ActivePipeline { background: bool, /// Execution status status: Status, + /// Cancellation source for this pipeline - call cancel() to stop running handlers + cancel_source: CancellationSource, } impl ActivePipeline { fn new(tool_call: ToolCall, pipeline: ToolPipeline) -> Self { + let (cancel_source, _token) = CancellationSource::new(); Self { agent_id: tool_call.agent_id, call_id: tool_call.call_id, @@ -195,6 +198,7 @@ impl ActivePipeline { output: String::new(), waiting: WaitingFor::Nothing, status: Status::Running, + cancel_source, } } @@ -268,12 +272,24 @@ impl ToolExecutor { &mut self.tools } - /// Cancel any active or pending tool execution + /// Cancel any active or pending tool execution. + /// This triggers cancellation tokens for running handlers, which will + /// cause them to abort gracefully (e.g., killing shell processes). pub fn cancel(&mut self) { self.cancelled = true; self.pending.clear(); - // Only clear non-background tasks - self.active.retain(|_, p| p.background && p.status != Status::Running); + + // Trigger cancellation for all active non-background pipelines + // This signals running handlers to abort (e.g., kill child processes) + for (_, pipeline) in self.active.iter() { + if !pipeline.background || pipeline.status == Status::Running { + pipeline.cancel_source.cancel(); + } + } + + // Remove non-background tasks from tracking + self.active + .retain(|_, p| p.background && p.status != Status::Running); } pub fn enqueue(&mut self, tool_calls: Vec) { @@ -578,11 +594,11 @@ impl ToolExecutor { /// Execute next handler in a specific pipeline. /// Spawns the handler in a separate task to avoid losing state if dropped. fn execute_step(&mut self, call_id: &str) -> Option { - // Get the handler to execute - let handler = { + // Get the handler and cancellation token to execute + let (handler, cancel_token) = { let active = self.active.get_mut(call_id)?; match active.pipeline.pop() { - Some(h) => h, + Some(h) => (h, active.cancel_source.token()), None => { // Pipeline complete - finally handlers have run // For denied/errored pipelines, we already emitted the event, just cleanup @@ -590,7 +606,7 @@ impl ToolExecutor { self.active.remove(call_id); return None; } - + if active.background { active.set_complete(); return Some(ToolEvent::BackgroundCompleted { @@ -608,12 +624,13 @@ impl ToolExecutor { // Spawn handler in separate task so it won't be lost if our future is dropped. // The result will be polled via WaitingFor::Handler in poll_waiting(). + // Pass the cancellation token so handlers can abort gracefully on cancel. let (tx, rx) = oneshot::channel(); tokio::spawn(async move { - let step = handler.call().await; + let step = handler.call(cancel_token).await; let _ = tx.send(step); }); - + let active = self.active.get_mut(call_id)?; active.waiting = WaitingFor::Handler(rx); None diff --git a/src/tools/handlers.rs b/src/tools/handlers.rs index 8e613f4..4335d6d 100644 --- a/src/tools/handlers.rs +++ b/src/tools/handlers.rs @@ -6,7 +6,7 @@ use super::browser; use crate::ide::{Edit, ToolPreview}; use crate::tools::io; -use crate::tools::pipeline::{Effect, EffectHandler, Step}; +use crate::tools::pipeline::{CancellationToken, Effect, EffectHandler, Step}; use std::fs; use std::path::PathBuf; @@ -21,7 +21,7 @@ pub struct ValidateFile { #[async_trait::async_trait] impl EffectHandler for ValidateFile { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { match fs::metadata(&self.path) { Ok(m) if m.is_file() => Step::Continue, Ok(_) => Step::Error(format!("Not a file: {}", self.path.display())), @@ -40,7 +40,7 @@ pub struct ValidateFileWritable { #[async_trait::async_trait] impl EffectHandler for ValidateFileWritable { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { // Check if we can open the file for writing match fs::OpenOptions::new().write(true).open(&self.path) { Ok(_) => Step::Continue, @@ -59,7 +59,7 @@ pub struct ValidateNoUnsavedEdits { #[async_trait::async_trait] impl EffectHandler for ValidateNoUnsavedEdits { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::IdeCheckUnsavedEdits { path: self.path }) } } @@ -72,7 +72,7 @@ pub struct ValidateFileNotExists { #[async_trait::async_trait] impl EffectHandler for ValidateFileNotExists { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { if self.path.exists() { Step::Error(self.message) } else { @@ -89,7 +89,7 @@ pub struct ApplyEdits { #[async_trait::async_trait] impl EffectHandler for ApplyEdits { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { let mut content = match fs::read_to_string(&self.path) { Ok(c) => c, Err(e) => return Step::Error(format!("Failed to read file: {}", e)), @@ -119,7 +119,7 @@ pub struct AwaitApproval; #[async_trait::async_trait] impl EffectHandler for AwaitApproval { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::AwaitApproval } } @@ -131,7 +131,7 @@ pub struct Output { #[async_trait::async_trait] impl EffectHandler for Output { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Output(self.content) } } @@ -143,7 +143,7 @@ pub struct Delta { #[async_trait::async_trait] impl EffectHandler for Delta { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delta(self.content) } } @@ -155,7 +155,7 @@ pub struct Error { #[async_trait::async_trait] impl EffectHandler for Error { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Error(self.message) } } @@ -173,7 +173,7 @@ pub struct ReadFile { #[async_trait::async_trait] impl EffectHandler for ReadFile { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { match io::read_file(&self.path, self.start_line, self.end_line) { Ok(content) => Step::Output(content), Err(e) => Step::Error(e), @@ -189,7 +189,7 @@ pub struct WriteFile { #[async_trait::async_trait] impl EffectHandler for WriteFile { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { // Create parent directories if needed if let Some(parent) = self.path.parent() { if !parent.exists() { @@ -222,8 +222,14 @@ pub struct Shell { #[async_trait::async_trait] impl EffectHandler for Shell { - async fn call(self: Box) -> Step { - match io::execute_shell(&self.command, self.working_dir.as_deref(), self.timeout_secs).await + async fn call(self: Box, cancel: CancellationToken) -> Step { + match io::execute_shell( + &self.command, + self.working_dir.as_deref(), + self.timeout_secs, + cancel, + ) + .await { Ok(result) if result.success => Step::Output(result.output), Ok(result) => Step::Output(result.output), // Still output, but includes exit code @@ -244,7 +250,7 @@ pub struct FetchUrl { #[async_trait::async_trait] impl EffectHandler for FetchUrl { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { match io::fetch_url(&self.url, self.max_length).await { Ok(result) => { let header = format!( @@ -266,7 +272,7 @@ pub struct WebSearch { #[async_trait::async_trait] impl EffectHandler for WebSearch { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { match io::web_search(&self.query, self.count).await { Ok(results) => { if results.is_empty() { @@ -299,7 +305,7 @@ pub struct IdeOpen { #[async_trait::async_trait] impl EffectHandler for IdeOpen { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::IdeOpen { path: self.path, line: self.line, @@ -315,7 +321,7 @@ pub struct IdeShowPreview { #[async_trait::async_trait] impl EffectHandler for IdeShowPreview { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::IdeShowPreview { preview: self.preview, }) @@ -330,7 +336,7 @@ pub struct IdeShowDiffPreview { #[async_trait::async_trait] impl EffectHandler for IdeShowDiffPreview { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::IdeShowDiffPreview { path: self.path, edits: self.edits, @@ -345,7 +351,7 @@ pub struct IdeReloadBuffer { #[async_trait::async_trait] impl EffectHandler for IdeReloadBuffer { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::IdeReloadBuffer { path: self.path }) } } @@ -355,7 +361,7 @@ pub struct IdeClosePreview; #[async_trait::async_trait] impl EffectHandler for IdeClosePreview { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::IdeClosePreview) } } @@ -369,7 +375,7 @@ pub struct ListBackgroundTasks; #[async_trait::async_trait] impl EffectHandler for ListBackgroundTasks { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::ListBackgroundTasks) } } @@ -381,7 +387,7 @@ pub struct GetBackgroundTask { #[async_trait::async_trait] impl EffectHandler for GetBackgroundTask { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::GetBackgroundTask { task_id: self.task_id, }) @@ -397,7 +403,7 @@ pub struct ListAgents; #[async_trait::async_trait] impl EffectHandler for ListAgents { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::ListAgents) } } @@ -409,7 +415,7 @@ pub struct GetAgent { #[async_trait::async_trait] impl EffectHandler for GetAgent { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { Step::Delegate(Effect::GetAgent { label: self.label, }) @@ -428,7 +434,7 @@ pub struct FetchHtml { #[async_trait::async_trait] impl EffectHandler for FetchHtml { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { match browser::fetch_html(&self.url, self.max_length).await { Ok(result) => { let title_info = result diff --git a/src/tools/impls/edit_file.rs b/src/tools/impls/edit_file.rs index 80a6124..e3f6424 100644 --- a/src/tools/impls/edit_file.rs +++ b/src/tools/impls/edit_file.rs @@ -27,7 +27,7 @@ use serde_json::json; use super::{handlers, Tool, ToolPipeline}; use crate::ide::Edit; use crate::impl_tool_block; -use crate::tools::pipeline::{EffectHandler, Step}; +use crate::tools::pipeline::{CancellationToken, EffectHandler, Step}; use crate::transcript::{ render_agent_label, render_approval_prompt, render_prefix, render_result, Block, BlockType, Status, ToolBlock, }; @@ -44,7 +44,7 @@ pub struct ValidateEdits { #[async_trait::async_trait] impl EffectHandler for ValidateEdits { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { let content = match std::fs::read_to_string(&self.path) { Ok(c) => c, Err(e) => return Step::Error(format!("Failed to read file: {}", e)), diff --git a/src/tools/impls/spawn_agent.rs b/src/tools/impls/spawn_agent.rs index ad859f6..ef8b8f6 100644 --- a/src/tools/impls/spawn_agent.rs +++ b/src/tools/impls/spawn_agent.rs @@ -8,7 +8,7 @@ use crate::config::AgentRuntimeConfig; use crate::impl_tool_block; use crate::llm::{Agent, RequestMode}; use crate::prompts::SUB_AGENT_PROMPT; -use crate::tools::pipeline::{Effect, EffectHandler, Step}; +use crate::tools::pipeline::{CancellationToken, Effect, EffectHandler, Step}; use crate::tools::ToolRegistry; use crate::transcript::{render_approval_prompt, render_prefix, Block, BlockType, Status, ToolBlock}; use ratatui::{ @@ -238,7 +238,7 @@ struct SpawnAgentHandler { #[async_trait::async_trait] impl EffectHandler for SpawnAgentHandler { - async fn call(self: Box) -> Step { + async fn call(self: Box, _cancel: CancellationToken) -> Step { // Get the agent context (initialized at app startup) let ctx = match agent_context() { Some(c) => c, diff --git a/src/tools/io.rs b/src/tools/io.rs index d469ad4..13618a6 100644 --- a/src/tools/io.rs +++ b/src/tools/io.rs @@ -10,6 +10,8 @@ use std::process::Stdio; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; +use crate::tools::pipeline::CancellationToken; + /// Result of a shell command #[derive(Debug)] pub struct ShellResult { @@ -112,11 +114,15 @@ pub fn read_file( Ok(output) } -/// Execute a shell command +/// Execute a shell command with cancellation support. +/// +/// When the cancellation token is triggered (e.g., user pressed Esc), +/// the child process is killed and an error is returned. pub async fn execute_shell( command: &str, working_dir: Option<&str>, timeout_secs: u64, + mut cancel: CancellationToken, ) -> Result { let mut cmd = Command::new("bash"); cmd.arg("-c").arg(command); @@ -139,42 +145,64 @@ pub async fn execute_shell( let stdout = child.stdout.take(); let stderr = child.stderr.take(); - let mut collected = String::new(); - - if let Some(stdout) = stdout { - let mut reader = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = reader.next_line().await { - collected.push_str(&line); - collected.push('\n'); + // Spawn tasks to read stdout and stderr concurrently + let stdout_handle = tokio::spawn(async move { + let mut collected = String::new(); + if let Some(stdout) = stdout { + let mut reader = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = reader.next_line().await { + collected.push_str(&line); + collected.push('\n'); + } } - } - - let mut stderr_output = String::new(); - if let Some(stderr) = stderr { - let mut reader = BufReader::new(stderr).lines(); - while let Ok(Some(line)) = reader.next_line().await { - stderr_output.push_str(&line); - stderr_output.push('\n'); + collected + }); + + let stderr_handle = tokio::spawn(async move { + let mut collected = String::new(); + if let Some(stderr) = stderr { + let mut reader = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = reader.next_line().await { + collected.push_str(&line); + collected.push('\n'); + } } - } - - let status = match tokio::time::timeout( - std::time::Duration::from_secs(timeout_secs), - child.wait(), - ) - .await - { - Ok(Ok(status)) => status, - Ok(Err(e)) => return Err(format!("Wait failed: {}", e)), - Err(_) => { + collected + }); + + // Wait for process with timeout and cancellation + let timeout = std::time::Duration::from_secs(timeout_secs); + let status = tokio::select! { + // Cancellation requested + _ = cancel.cancelled() => { + tracing::debug!("Shell command cancelled, killing process"); + let _ = child.kill().await; + // Abort the reader tasks + stdout_handle.abort(); + stderr_handle.abort(); + return Err("Command cancelled by user".to_string()); + } + // Timeout + _ = tokio::time::sleep(timeout) => { + tracing::debug!("Shell command timed out, killing process"); let _ = child.kill().await; - return Err(format!( - "Command timed out after {} seconds", - timeout_secs - )); + stdout_handle.abort(); + stderr_handle.abort(); + return Err(format!("Command timed out after {} seconds", timeout_secs)); + } + // Process completed + result = child.wait() => { + match result { + Ok(status) => status, + Err(e) => return Err(format!("Wait failed: {}", e)), + } } }; + // Collect output from reader tasks + let collected = stdout_handle.await.unwrap_or_default(); + let stderr_output = stderr_handle.await.unwrap_or_default(); + let exit_code = status.code().unwrap_or(-1); let mut output = collected; diff --git a/src/tools/pipeline.rs b/src/tools/pipeline.rs index 661a098..6261309 100644 --- a/src/tools/pipeline.rs +++ b/src/tools/pipeline.rs @@ -15,9 +15,66 @@ #[cfg(feature = "cli")] use crate::transcript::Block; use std::collections::VecDeque; +use tokio::sync::watch; pub use crate::effect::Effect; +/// A token that can be used to signal cancellation to running handlers. +/// Clone this to pass to handlers; call `cancel()` on the source to trigger. +#[derive(Clone)] +pub struct CancellationToken { + receiver: watch::Receiver, +} + +impl CancellationToken { + /// Check if cancellation has been requested. + pub fn is_cancelled(&self) -> bool { + *self.receiver.borrow() + } + + /// Wait until cancellation is requested. + pub async fn cancelled(&mut self) { + // Wait for the value to change to true + while !*self.receiver.borrow() { + if self.receiver.changed().await.is_err() { + // Sender dropped - treat as cancelled + return; + } + } + } +} + +/// Source for creating cancellation tokens. Call `cancel()` to trigger. +pub struct CancellationSource { + sender: watch::Sender, +} + +impl CancellationSource { + /// Create a new cancellation source and initial token. + pub fn new() -> (Self, CancellationToken) { + let (sender, receiver) = watch::channel(false); + (Self { sender }, CancellationToken { receiver }) + } + + /// Signal cancellation to all tokens created from this source. + pub fn cancel(&self) { + let _ = self.sender.send(true); + } + + /// Create a new token from this source. + pub fn token(&self) -> CancellationToken { + CancellationToken { + receiver: self.sender.subscribe(), + } + } +} + +impl Default for CancellationSource { + fn default() -> Self { + Self::new().0 + } +} + /// Result of calling an effect handler pub enum Step { /// Continue to next effect @@ -37,7 +94,11 @@ pub enum Step { /// Trait for effect handlers - each effect knows how to execute itself #[async_trait::async_trait] pub trait EffectHandler: Send { - async fn call(self: Box) -> Step; + /// Execute the handler. The cancellation token can be used to detect + /// when the user has requested cancellation (e.g., pressed Esc). + /// Handlers should check `token.is_cancelled()` periodically or use + /// `token.cancelled()` in a `select!` to abort gracefully. + async fn call(self: Box, cancel: CancellationToken) -> Step; } /// When an effect should run