Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 24 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,27 @@ impl App {

/// Cancel the current agent request and reset input mode
async fn cancel(&mut self) -> Result<()> {
// Cancel the primary agent's streaming
if let Some(agent_mutex) = self.agents.primary() {
agent_mutex.lock().await.cancel();
}

// Cancel the tool executor - stops active pipelines and clears pending
self.tool_executor.cancel();

// Clear the effect queue - drops pending effects and their responder channels.
// This signals cancellation to any tool executor pipelines waiting for responses.
self.effects.clear();

// Clear orphaned blocks from the stage (blocks that were awaiting approval)
let orphaned = self.chat.transcript.stage.drain_all();
if !orphaned.is_empty() {
tracing::debug!("Cleared {} orphaned blocks from stage on cancel", orphaned.len());
}

// Finish the current turn
self.chat.finish_turn(&mut self.terminal);

if let Err(e) = self.chat.transcript.save() {
tracing::error!("Failed to save transcript on cancel: {}", e);
}
Expand Down Expand Up @@ -551,7 +568,13 @@ impl App {
ToolDecision::Deny => Status::Denied,
_ => Status::Pending,
});
self.chat.transcript.start_block(block);
// Use try_start_block to avoid panic if turn was already finished (e.g., by cancel)
if !self.chat.transcript.try_start_block(block) {
tracing::warn!(
"decide_pending_tool: no active turn to add block for {}",
pending.call_id
);
}
}

// Convert decision to EffectResult and send to executor
Expand Down
119 changes: 119 additions & 0 deletions src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,15 @@ impl EffectQueue {
self.pending.push_back(effect);
}

/// Clear all pending effects (for cancellation).
///
/// Drops all pending effects, which closes their responder channels.
/// The tool executor will receive a channel closed error for any
/// effects that were waiting for a response.
pub fn clear(&mut self) {
self.pending.clear();
}

/// Check if there are any effects that can be polled.
///
/// An effect is pollable if:
Expand Down Expand Up @@ -612,4 +621,114 @@ mod tests {
// But approval call_2 should NOT be pollable
assert!(!queue.has_pollable());
}

// ========================================================================
// Cancellation Bug Tests
// ========================================================================

/// Test that EffectQueue::clear() properly clears all pending effects.
///
/// When the user cancels, pending effects should be cleared to avoid
/// orphaned effects trying to operate on a finished turn.
#[test]
fn test_effect_queue_clear_method() {
let mut queue = EffectQueue::new();

// Queue several effects
queue.push(make_approval_effect("call_1", "shell"));
queue.push(make_approval_effect("call_2", "read_file"));
queue.push(make_ide_effect("ide_1"));

// Acknowledge one
if let Some(effect) = queue.find_by_call_id_mut("call_1") {
effect.acknowledge();
}

// Verify state before clear
assert!(queue.has_pending_approvals());
assert!(queue.has_active_approval());

// Clear all effects (simulating cancel)
queue.clear();

// All effects should be gone
assert!(!queue.has_pending_approvals(), "All approvals should be cleared");
assert!(!queue.has_active_approval(), "Active approval should be cleared");
assert!(!queue.has_pollable(), "Nothing should be pollable");
assert!(queue.poll_next().is_none(), "poll_next should return None");
}

/// Test that effects remain pollable when clear() is not called.
///
/// This documents the behavior before clear() is called - effects
/// remain in the queue and can still be polled. The fix ensures
/// cancel() calls clear() to prevent this.
#[test]
fn test_effects_pollable_without_clear() {
let mut queue = EffectQueue::new();

// Queue an approval that hasn't been acknowledged yet
queue.push(make_approval_effect("pending_call", "shell"));

// Without calling clear(), the effect is still pollable
assert!(queue.has_pollable(), "Effect should be pollable when clear() not called");

let effect = queue.poll_next();
assert!(effect.is_some(), "Effect can still be polled when clear() not called");
}

/// Test that acknowledged approvals are also cleared by clear().
#[test]
fn test_acknowledged_approval_cleared_by_clear() {
let mut queue = EffectQueue::new();

queue.push(make_approval_effect("acknowledged_call", "shell"));

// Acknowledge it (simulating the approval UI being shown)
if let Some(effect) = queue.find_by_call_id_mut("acknowledged_call") {
effect.acknowledge();
}

// Verify it's acknowledged
assert!(queue.has_active_approval());

// Clear the queue
queue.clear();

// The acknowledged approval should be gone
assert!(!queue.has_active_approval(), "Acknowledged approval should be cleared");
}

/// Test that dropping PendingEffect drops the responder channel.
///
/// When effects are dropped (e.g., via a clear method), the responder
/// channel should be dropped, signaling cancellation to the executor.
#[test]
fn test_pending_effect_drop_behavior() {
let (tx, mut rx) = oneshot::channel::<EffectResult>();

// Create effect
let effect = PendingEffect::new(
"test_call".to_string(),
0,
Effect::AwaitApproval {
name: "test".to_string(),
params: serde_json::json!({}),
background: false,
},
tx,
);

// Drop the effect without completing it
drop(effect);

// The receiver should now get an error (channel closed)
// This is the correct behavior - the executor knows the effect was cancelled
match rx.try_recv() {
Err(oneshot::error::TryRecvError::Closed) => {
// Good - channel was properly closed
}
_ => panic!("Expected channel to be closed when effect is dropped"),
}
}
}
4 changes: 3 additions & 1 deletion src/tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,10 @@ pub struct YourHandler {

#[async_trait::async_trait]
impl EffectHandler for YourHandler {
async fn call(self: Box<Self>) -> Step {
async fn call(self: Box<Self>, _cancel: CancellationToken) -> Step {
// Do the actual work
// For long-running operations, check cancel.is_cancelled() periodically
// or use tokio::select! with cancel.cancelled() to abort gracefully
match do_something(&self.param).await {
Ok(result) => Step::Output(result),
Err(e) => Step::Error(e.to_string()),
Expand Down
37 changes: 27 additions & 10 deletions src/tools/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::oneshot;
use crate::effect::EffectResult;
use crate::llm::AgentId;
use crate::transcript::Status;
use crate::tools::pipeline::{Effect, Step, ToolPipeline};
use crate::tools::pipeline::{CancellationSource, Effect, Step, ToolPipeline};
use crate::tools::ToolRegistry;

// =============================================================================
Expand Down Expand Up @@ -180,10 +180,13 @@ struct ActivePipeline {
background: bool,
/// Execution status
status: Status,
/// Cancellation source for this pipeline - call cancel() to stop running handlers
cancel_source: CancellationSource,
}

impl ActivePipeline {
fn new(tool_call: ToolCall, pipeline: ToolPipeline) -> Self {
let (cancel_source, _token) = CancellationSource::new();
Self {
agent_id: tool_call.agent_id,
call_id: tool_call.call_id,
Expand All @@ -195,6 +198,7 @@ impl ActivePipeline {
output: String::new(),
waiting: WaitingFor::Nothing,
status: Status::Running,
cancel_source,
}
}

Expand Down Expand Up @@ -268,12 +272,24 @@ impl ToolExecutor {
&mut self.tools
}

/// Cancel any active or pending tool execution
/// Cancel any active or pending tool execution.
/// This triggers cancellation tokens for running handlers, which will
/// cause them to abort gracefully (e.g., killing shell processes).
pub fn cancel(&mut self) {
self.cancelled = true;
self.pending.clear();
// Only clear non-background tasks
self.active.retain(|_, p| p.background && p.status != Status::Running);

// Trigger cancellation for all active non-background pipelines
// This signals running handlers to abort (e.g., kill child processes)
for (_, pipeline) in self.active.iter() {
if !pipeline.background || pipeline.status == Status::Running {
pipeline.cancel_source.cancel();
}
}

// Remove non-background tasks from tracking
self.active
.retain(|_, p| p.background && p.status != Status::Running);
}

pub fn enqueue(&mut self, tool_calls: Vec<ToolCall>) {
Expand Down Expand Up @@ -578,19 +594,19 @@ impl ToolExecutor {
/// Execute next handler in a specific pipeline.
/// Spawns the handler in a separate task to avoid losing state if dropped.
fn execute_step(&mut self, call_id: &str) -> Option<ToolEvent> {
// Get the handler to execute
let handler = {
// Get the handler and cancellation token to execute
let (handler, cancel_token) = {
let active = self.active.get_mut(call_id)?;
match active.pipeline.pop() {
Some(h) => h,
Some(h) => (h, active.cancel_source.token()),
None => {
// Pipeline complete - finally handlers have run
// For denied/errored pipelines, we already emitted the event, just cleanup
if active.status == Status::Denied || active.status == Status::Error {
self.active.remove(call_id);
return None;
}

if active.background {
active.set_complete();
return Some(ToolEvent::BackgroundCompleted {
Expand All @@ -608,12 +624,13 @@ impl ToolExecutor {

// Spawn handler in separate task so it won't be lost if our future is dropped.
// The result will be polled via WaitingFor::Handler in poll_waiting().
// Pass the cancellation token so handlers can abort gracefully on cancel.
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let step = handler.call().await;
let step = handler.call(cancel_token).await;
let _ = tx.send(step);
});

let active = self.active.get_mut(call_id)?;
active.waiting = WaitingFor::Handler(rx);
None
Expand Down
Loading