Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 193 additions & 1 deletion crates/harness-core/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ impl Agent {
);

if !parallel {
for call in tool_calls {
for (call_idx, call) in tool_calls.iter().enumerate() {
// Decide whether this call goes through the
// approver. We check the trait flag inline
// so that the `ApprovalRequest` event lands
Expand Down Expand Up @@ -908,6 +908,38 @@ impl Agent {
.map(|t| t.is_terminal())
.unwrap_or(false);
if is_terminal {
// The model may have emitted more tool
// calls *after* the terminal one in the
// same batch. We skip executing them, but
// every `tool_call` in the assistant
// message still needs a matching
// `tool_result` or the persisted
// conversation is unresumable (OpenAI /
// Anthropic 400 on orphaned tool calls).
// Synthesize a cancellation result for
// each unprocessed sibling, mirroring the
// parallel path, and keep the wire's
// ToolStart/ToolEnd pairing intact.
for sibling in &tool_calls[call_idx + 1..] {
let cancelled = format!(
"tool cancelled: terminal tool '{}' ended the turn",
call.name
);
yield AgentEvent::ToolStart {
id: sibling.id.clone(),
name: sibling.name.clone(),
arguments: sibling.arguments.clone(),
};
conversation.messages.push(Message::tool_result(
&sibling.id,
cancelled.clone(),
));
yield AgentEvent::ToolEnd {
id: sibling.id.clone(),
name: sibling.name.clone(),
content: cancelled,
};
}
yield AgentEvent::PlanProposed { plan: output };
yield AgentEvent::Done {
conversation: conversation.clone(),
Expand Down Expand Up @@ -1159,11 +1191,28 @@ impl Agent {
// Phase 3: pump events + collect outputs.
let mut outputs: Vec<Option<String>> =
std::iter::repeat_with(|| None).take(n).collect();
// Track which calls have had a genuine ToolEnd
// emitted by their future, so a terminal sibling
// that cancels in-flight calls doesn't leave a
// ToolStart dangling on the wire (and so we never
// double-emit ToolEnd for a call whose future
// completed before we tore the dispatch down).
let mut tool_ended: Vec<bool> = vec![false; n];
let mark_ended = |ev: &AgentEvent, ended: &mut [bool]| {
if let AgentEvent::ToolEnd { id, .. } = ev {
if let Some(i) =
tool_calls.iter().position(|c| c.id == *id)
{
ended[i] = true;
}
}
};
let mut terminal_idx: Option<usize> = None;
loop {
tokio::select! {
biased;
Some(ev) = event_rx.recv() => {
mark_ended(&ev, &mut tool_ended);
yield ev;
}
Some((idx, output)) = invokes.next() => {
Expand Down Expand Up @@ -1197,6 +1246,7 @@ impl Agent {
// client sees them before the appended
// tool_result rows.
while let Ok(ev) = event_rx.try_recv() {
mark_ended(&ev, &mut tool_ended);
yield ev;
}

Expand All @@ -1215,6 +1265,20 @@ impl Agent {
})
.collect();

// Any call whose future was cancelled before it
// emitted ToolEnd still has an open ToolStart on
// the wire — close it so a UI keyed on the pair
// doesn't show a card stuck "running" forever.
for (idx, call) in tool_calls.iter().enumerate() {
if !tool_ended[idx] {
yield AgentEvent::ToolEnd {
id: call.id.clone(),
name: call.name.clone(),
content: final_contents[idx].clone(),
};
}
}

for (call, content) in
tool_calls.iter().zip(final_contents.iter())
{
Expand Down Expand Up @@ -2119,4 +2183,132 @@ mod tests {
Agent::ensure_system_prompt(&mut conv, None, true);
assert!(matches!(conv.messages[0], Message::System { ref content, .. } if content == "KEEP"));
}

/// LLM that emits a fixed batch of tool calls on the first turn,
/// then stops. Used to exercise the terminal-tool batch path.
struct BatchLlm {
iter: AtomicUsize,
calls: Vec<ToolCall>,
}
#[async_trait::async_trait]
impl LlmProvider for BatchLlm {
async fn complete(&self, _req: ChatRequest) -> Result<ChatResponse> {
let i = self.iter.fetch_add(1, Ordering::SeqCst);
if i == 0 {
Ok(ChatResponse {
message: Message::Assistant {
content: None,
tool_calls: self.calls.clone(),
reasoning_content: None,
cache: None,
},
finish_reason: FinishReason::ToolCalls,
response_id: None,
usage: None,
})
} else {
Ok(ChatResponse {
message: Message::assistant_text("done"),
finish_reason: FinishReason::Stop,
response_id: None,
usage: None,
})
}
}
}

/// A terminal tool (mirrors `exit_plan`): ends the turn even when
/// the model emitted sibling calls in the same batch.
struct TerminalTool;
#[async_trait::async_trait]
impl Tool for TerminalTool {
fn name(&self) -> &str {
"exit_plan"
}
fn description(&self) -> &str {
"terminal"
}
fn parameters(&self) -> Value {
json!({"type": "object"})
}
fn is_terminal(&self) -> bool {
true
}
async fn invoke(&self, _args: Value) -> std::result::Result<String, BoxError> {
Ok("PLAN".into())
}
}

#[tokio::test]
async fn streaming_terminal_tool_fills_sibling_results_and_pairs_events() {
// Regression: when a terminal tool is emitted alongside sibling
// calls in the same batch, the sequential streaming path used to
// return immediately, leaving the siblings without tool_result
// rows (orphaned tool_calls → provider 400 on resume) and any
// ToolStart unpaired on the wire.
use futures::StreamExt;

let calls = vec![
ToolCall {
id: "t".into(),
name: "exit_plan".into(),
arguments: json!({}),
},
ToolCall {
id: "s".into(),
name: "safe".into(),
arguments: json!({}),
},
];
let mut registry = ToolRegistry::new();
registry.register_arc(Arc::new(TerminalTool) as Arc<dyn Tool>);
registry.register_arc(CountingTool::new("safe", false) as Arc<dyn Tool>);
let cfg = AgentConfig::new("test-model").with_tools(registry);
let llm = Arc::new(BatchLlm {
iter: AtomicUsize::new(0),
calls,
});
let agent = Arc::new(Agent::new(llm as _, cfg));

let mut stream = agent.run_stream(Conversation::new());
let mut starts: Vec<String> = Vec::new();
let mut ends: Vec<String> = Vec::new();
let mut final_conv: Option<Conversation> = None;
while let Some(ev) = stream.next().await {
match ev {
AgentEvent::ToolStart { id, .. } => starts.push(id),
AgentEvent::ToolEnd { id, .. } => ends.push(id),
AgentEvent::Done { conversation, .. } => final_conv = Some(conversation),
_ => {}
}
}

// Every ToolStart has a matching ToolEnd (wire pairing).
starts.sort();
ends.sort();
assert_eq!(starts, ends, "ToolStart/ToolEnd not paired");

// Both tool calls have a tool_result in the persisted conversation.
let conv = final_conv.expect("Done event with conversation");
let result_ids: Vec<&str> = conv
.messages
.iter()
.filter_map(|m| match m {
Message::Tool { tool_call_id, .. } => Some(tool_call_id.as_str()),
_ => None,
})
.collect();
assert!(result_ids.contains(&"t"), "terminal result missing");
assert!(
result_ids.contains(&"s"),
"sibling tool_call orphaned (no tool_result): {result_ids:?}"
);
// The sibling reply is the synthetic cancellation sentinel.
let sibling_cancelled = conv.messages.iter().any(|m| matches!(
m,
Message::Tool { tool_call_id, content, .. }
if tool_call_id == "s" && content.starts_with("tool cancelled:")
));
assert!(sibling_cancelled, "sibling not marked cancelled");
}
}
110 changes: 94 additions & 16 deletions crates/harness-memory/src/summarizing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,26 +387,56 @@ impl Memory for SummarizingMemory {
for &i in &system_idxs {
out.push(messages[i].clone());
}
let summary_chars: Option<usize> = if let Some(s) = summary {
let chars = s.chars().count();
out.push(Message::system(format!(

// The summary stands in for the dropped turns and must occupy
// *their* chronological position. On the cache-breakpoint path
// the kept set can be non-contiguous (cached prefix + recent
// tail with a hole in the middle); inserting the summary right
// after the leading systems would invert chronology, presenting
// the summarised middle as if it preceded the cached prefix.
//
// Instead we walk turns in original order: kept turns are
// emitted in place, and the summary/placeholder is emitted once,
// at the position of the first dropped turn — i.e. after any
// cached-prefix turns and before the recent tail.
let summary_msg: Option<Message> = if let Some(s) = &summary {
Some(Message::system(format!(
"Earlier conversation summary ({dropped_count} turn(s) compressed):\n{s}"
)));
Some(chars)
)))
} else if dropped_count > 0 {
// Surfacing the gap explicitly is better than silent
// truncation; keeps the model from getting confused
// about why the conversation seems to start mid-thought.
Some(Message::system(format!(
"[{dropped_count} earlier turn(s) omitted — summary unavailable]"
)))
} else {
if dropped_count > 0 {
// Surfacing the gap explicitly is better than silent
// truncation; keeps the model from getting confused
// about why the conversation seems to start mid-thought.
out.push(Message::system(format!(
"[{dropped_count} earlier turn(s) omitted — summary unavailable]"
)));
}
None
};
for turn in kept {
for &i in turn {
out.push(messages[i].clone());
let summary_chars: Option<usize> = summary.as_ref().map(|s| s.chars().count());

let mut summary_emitted = false;
for turn in &turns {
let is_kept = kept.iter().any(|k| std::ptr::eq(*k, turn));
if is_kept {
for &i in turn {
out.push(messages[i].clone());
}
} else if !summary_emitted {
// First dropped turn — drop the summary in here.
if let Some(msg) = &summary_msg {
out.push(msg.clone());
}
summary_emitted = true;
}
// Subsequent dropped turns are simply skipped — they're
// already represented by the single summary message.
}
// Edge case: every turn was dropped (no kept turns), so the
// loop never emitted the summary. Append it now.
if !summary_emitted {
if let Some(msg) = &summary_msg {
out.push(msg.clone());
}
}
// Append the agent's working-context snapshot. Same helper
Expand Down Expand Up @@ -1010,6 +1040,54 @@ mod tests {
assert_eq!(llm.calls.load(Ordering::SeqCst), 0);
}

#[tokio::test]
async fn breakpoint_summary_sits_between_cached_prefix_and_recent_tail() {
// Regression for the chronology-inversion bug: on the cache
// breakpoint path the kept set is non-contiguous (cached prefix
// + recent tail with a dropped hole). The summary of the hole
// must land *between* them, not before the cached prefix.
let llm = FakeLlm::new("MIDDLE_SUMMARY");
let mem = SummarizingMemory::new(llm.clone(), "test-model", 400);

let big = "lorem ipsum dolor sit amet ".repeat(80); // ~2160 chars
let msgs = vec![
system("sys"),
user("cached q1"),
// Cache breakpoint on the assistant reply of the first turn.
assistant("cached a1").with_cache(harness_core::CacheHint::Ephemeral),
user(&format!("middle q2 {big}")),
assistant(&format!("middle a2 {big}")),
user(&format!("middle q3 {big}")),
assistant(&format!("middle a3 {big}")),
user("recent q4"),
assistant("recent a4"),
];
let out = mem.compact(&msgs).await.unwrap();

let pos = |needle: &str| {
out.iter().position(|m| match m {
Message::System { content, .. } | Message::User { content, .. } => {
content.contains(needle)
}
Message::Assistant { content, .. } => {
content.as_deref().is_some_and(|c| c.contains(needle))
}
_ => false,
})
};

let cached = pos("cached a1").expect("cached prefix kept");
let summary = pos("MIDDLE_SUMMARY").expect("summary inserted");
let recent = pos("recent q4").expect("recent tail kept");
assert!(
cached < summary && summary < recent,
"summary must sit between cached prefix and recent tail; \
got cached={cached} summary={summary} recent={recent}: {out:#?}"
);
// The dropped middle turns must be gone.
assert!(pos("middle q2").is_none(), "middle turn should be dropped");
}

#[tokio::test]
async fn over_budget_inserts_summary() {
let llm = FakeLlm::new("ALPHA AND BETA HAPPENED");
Expand Down
Loading
Loading