diff --git a/crates/tui/src/acp_server.rs b/crates/tui/src/acp_server.rs index 90b2d22e5..d239e28f8 100644 --- a/crates/tui/src/acp_server.rs +++ b/crates/tui/src/acp_server.rs @@ -4,13 +4,19 @@ //! prompt, and cancel. It keeps stdout protocol-clean for editor clients and //! routes prompts through the same configured DeepSeek client as one-shot CLI //! mode. +//! +//! `session/prompt` runs concurrently with the input reader so that a +//! `session/cancel` for the same session can interrupt an in-flight provider +//! call mid-turn (returning `stopReason: "cancelled"`) instead of being queued +//! behind it. A single writer task is preserved so stdout stays protocol-clean. use std::collections::HashMap; +use std::future::Future; use std::path::PathBuf; use anyhow::{Result, anyhow}; use serde_json::{Value, json}; -use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, Lines}; use crate::client::DeepSeekClient; use crate::config::{ApiProvider, Config}; @@ -61,7 +67,68 @@ pub async fn run_acp_server(config: Config, model: String, default_cwd: PathBuf) }; let params = message.get("params").cloned().unwrap_or_else(|| json!({})); - match server.handle_request(method, params, &mut writer).await { + // `session/prompt` is driven concurrently with the reader so a + // `session/cancel` can interrupt the in-flight provider call. Every + // other method is request/response and handled synchronously below. + if method == "session/prompt" { + match server.begin_prompt(params) { + Ok(prepared) => { + let PreparedPrompt { + session_id, + messages, + cwd, + } = prepared; + // `run_prompt` borrows `&server` immutably and never touches + // the writer, so we can race it against the reader while the + // main task keeps exclusive ownership of stdout. + let outcome = { + let prompt_future = server.run_prompt(&messages, &cwd); + drive_prompt_with_cancellation( + prompt_future, + &session_id, + &mut reader, + &mut writer, + ) + .await + }; + match outcome { + Ok(PromptOutcome::Completed(output)) => { + server.finish_prompt(&session_id, &output); + if !output.is_empty() { + write_session_update(&mut writer, &session_id, output).await?; + } + if let Some(id) = id { + write_jsonrpc_result( + &mut writer, + id, + json!({ "stopReason": "end_turn" }), + ) + .await?; + } + } + Ok(PromptOutcome::Cancelled) => { + if let Some(id) = id { + write_jsonrpc_result( + &mut writer, + id, + json!({ "stopReason": "cancelled" }), + ) + .await?; + } + } + Err(err) => { + write_jsonrpc_error(&mut writer, id, -32603, err.to_string()).await?; + } + } + } + Err(err) => { + write_jsonrpc_error(&mut writer, id, err.code, err.message).await?; + } + } + continue; + } + + match server.handle_request(method, params).await { Ok(AcpDispatch::Response(result)) => { if let Some(id) = id { write_jsonrpc_result(&mut writer, id, result).await?; @@ -82,6 +149,99 @@ pub async fn run_acp_server(config: Config, model: String, default_cwd: PathBuf) Ok(()) } +/// Outcome of a `session/prompt` turn driven against the input stream. +#[derive(Debug, PartialEq, Eq)] +enum PromptOutcome { + /// The provider call finished first; carries the assistant text. + Completed(String), + /// A matching `session/cancel` arrived before the call finished. + Cancelled, +} + +/// Run a prompt future while concurrently watching `reader` for a +/// `session/cancel` targeting `session_id`. +/// +/// This is the cancellation control point. It is generic over the future and +/// the reader/writer so the logic can be unit-tested with a delayed future and +/// an in-memory input stream — no real provider call required. The caller keeps +/// the only writer, so any acknowledgements emitted here stay on the single +/// protocol-clean stdout stream. +/// +/// While the turn is in flight the prompt is single-flight: a cancel for this +/// session (request or notification form) ends it with [`PromptOutcome::Cancelled`]; +/// a cancel for a different session is acknowledged and ignored; any other +/// concurrent *request* is rejected with a clear error so the client is not left +/// waiting; notifications without an id are ignored. +async fn drive_prompt_with_cancellation( + prompt_future: F, + session_id: &str, + reader: &mut Lines, + writer: &mut W, +) -> Result +where + F: Future>, + R: AsyncBufRead + Unpin, + W: AsyncWrite + Unpin, +{ + tokio::pin!(prompt_future); + loop { + tokio::select! { + result = &mut prompt_future => { + return Ok(PromptOutcome::Completed(result?)); + } + line = reader.next_line() => { + let line = match line? { + Some(line) => line, + // Input closed mid-turn: let the provider call finish. + None => return Ok(PromptOutcome::Completed((&mut prompt_future).await?)), + }; + if line.trim().is_empty() { + continue; + } + let message: Value = match serde_json::from_str(&line) { + Ok(value) => value, + Err(err) => { + write_jsonrpc_error(writer, None, -32700, format!("invalid json: {err}")) + .await?; + continue; + } + }; + let id = message.get("id").cloned(); + match message.get("method").and_then(Value::as_str) { + Some("session/cancel") => { + let target = message.pointer("/params/sessionId").and_then(Value::as_str); + // A cancel with no sessionId is treated as targeting the + // single in-flight turn. + if target.is_none() || target == Some(session_id) { + if let Some(id) = id { + write_jsonrpc_result(writer, id, json!(null)).await?; + } + return Ok(PromptOutcome::Cancelled); + } + // Cancel for some other session: acknowledge, keep going. + if let Some(id) = id { + write_jsonrpc_result(writer, id, json!(null)).await?; + } + } + _ => { + // The turn is single-flight; do not silently drop a + // request the client expects a response to. + if let Some(id) = id { + write_jsonrpc_error( + writer, + Some(id), + -32603, + "a session/prompt turn is already in progress", + ) + .await?; + } + } + } + } + } + } +} + struct AcpServer { config: Config, model: String, @@ -94,6 +254,15 @@ struct AcpSession { messages: Vec, } +/// The `&mut self` result of validating a `session/prompt`: the user turn is +/// already recorded, and the cloned conversation + cwd are ready for the +/// borrow-free provider call that the prompt driver races against cancellation. +struct PreparedPrompt { + session_id: String, + messages: Vec, + cwd: PathBuf, +} + enum AcpDispatch { Response(Value), Shutdown, @@ -115,15 +284,13 @@ impl AcpServer { } } - async fn handle_request( + // `session/prompt` is handled in the main loop (it needs to run concurrently + // with the reader for cancellation); every other method is request/response. + async fn handle_request( &mut self, method: &str, params: Value, - writer: &mut W, - ) -> std::result::Result - where - W: AsyncWrite + Unpin, - { + ) -> std::result::Result { match method { "initialize" => Ok(AcpDispatch::Response(initialize_result( params.get("protocolVersion").and_then(Value::as_u64), @@ -133,10 +300,8 @@ impl AcpServer { "session/listProviders" => Ok(AcpDispatch::Response(self.list_providers())), "session/currentModel" => Ok(AcpDispatch::Response(self.current_model())), "session/selectModel" => Ok(AcpDispatch::Response(self.select_model(params)?)), - "session/prompt" => { - self.prompt(params, writer).await?; - Ok(AcpDispatch::Response(json!({ "stopReason": "end_turn" }))) - } + // A cancel that arrives with no prompt in flight is an idempotent + // no-op (the in-flight case is handled by the prompt driver). "session/cancel" => Ok(AcpDispatch::Response(json!(null))), "shutdown" => Ok(AcpDispatch::Shutdown), _ => Err(AcpError::method_not_found(method)), @@ -237,14 +402,13 @@ impl AcpServer { Ok(self.current_model()) } - async fn prompt( - &mut self, - params: Value, - writer: &mut W, - ) -> std::result::Result<(), AcpError> - where - W: AsyncWrite + Unpin, - { + /// Validate a `session/prompt` request and append the user turn to history, + /// returning the cloned conversation for the (borrow-free) provider call. + /// + /// This is the `&mut self` half of a prompt turn; the long-running provider + /// call lives in [`AcpServer::run_prompt`] (which borrows `&self` only) so it + /// can be raced against the reader for cancellation. + fn begin_prompt(&mut self, params: Value) -> std::result::Result { let session_id = params .get("sessionId") .and_then(Value::as_str) @@ -254,7 +418,6 @@ impl AcpServer { .filter(|text| !text.trim().is_empty()) .ok_or_else(|| AcpError::invalid_params("prompt must include text content"))?; - // Append user message to session history and clone for the LLM call (avoids borrowing self across await) let (messages, cwd) = { let session = self .sessions @@ -270,33 +433,28 @@ impl AcpServer { (session.messages.clone(), session.cwd.clone()) }; - let output = self - .run_prompt(&messages, &cwd) - .await - .map_err(|err| AcpError::internal(err.to_string()))?; - - // Append assistant response to session history - if !output.is_empty() { - { - let session = self - .sessions - .get_mut(&session_id) - .ok_or_else(|| AcpError::invalid_params("unknown sessionId"))?; - session.messages.push(Message { - role: "assistant".to_string(), - content: vec![ContentBlock::Text { - text: output.clone(), - cache_control: None, - }], - }); - } + Ok(PreparedPrompt { + session_id, + messages, + cwd, + }) + } - write_session_update(writer, &session_id, output) - .await - .map_err(|err| AcpError::internal(err.to_string()))?; + /// Append a completed assistant turn to session history. A cancelled turn + /// never calls this, so cancelled output does not pollute the transcript. + fn finish_prompt(&mut self, session_id: &str, output: &str) { + if output.is_empty() { + return; + } + if let Some(session) = self.sessions.get_mut(session_id) { + session.messages.push(Message { + role: "assistant".to_string(), + content: vec![ContentBlock::Text { + text: output.to_string(), + cache_control: None, + }], + }); } - - Ok(()) } async fn run_prompt(&self, messages: &[Message], cwd: &PathBuf) -> Result { @@ -388,13 +546,6 @@ impl AcpError { message: format!("method not found: {method}"), } } - - fn internal(message: impl Into) -> Self { - Self { - code: -32603, - message: message.into(), - } - } } fn initialize_result(client_protocol_version: Option, config: &Config) -> Value { @@ -829,6 +980,96 @@ mod tests { ); } + fn lines_from(input: &'static str) -> Lines> { + BufReader::new(input.as_bytes()).lines() + } + + #[tokio::test] + async fn drive_prompt_completes_when_no_cancel_arrives() { + let mut reader = lines_from(""); + let mut out = Vec::new(); + let prompt_future = async { Ok("the answer".to_string()) }; + + let outcome = + drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); + + assert_eq!(outcome, PromptOutcome::Completed("the answer".to_string())); + assert!(out.is_empty(), "no protocol bytes written for clean turn"); + } + + #[tokio::test] + async fn drive_prompt_cancels_when_matching_cancel_arrives() { + // A provider call that effectively never finishes within the test. + let prompt_future = async { + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + Ok("late answer".to_string()) + }; + let mut reader = lines_from( + r#"{"jsonrpc":"2.0","method":"session/cancel","params":{"sessionId":"sess_1"}}"#, + ); + let mut out = Vec::new(); + + let outcome = + drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); + + assert_eq!(outcome, PromptOutcome::Cancelled); + // Notification-form cancel (no id) is acknowledged by acting, not writing. + assert!(out.is_empty()); + } + + #[tokio::test] + async fn drive_prompt_ignores_cancel_for_a_different_session() { + // Small delay so the unrelated cancel line is processed first, proving it + // does not abort the turn before the provider call resolves. + let prompt_future = async { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + Ok("kept going".to_string()) + }; + let mut reader = lines_from( + r#"{"jsonrpc":"2.0","id":7,"method":"session/cancel","params":{"sessionId":"other"}}"#, + ); + let mut out = Vec::new(); + + let outcome = + drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); + + assert_eq!(outcome, PromptOutcome::Completed("kept going".to_string())); + // The other-session cancel carried an id, so it was acknowledged with null. + let line = String::from_utf8(out).expect("utf8"); + let value: Value = serde_json::from_str(line.trim()).expect("json"); + assert_eq!(value["id"], "7"); + assert_eq!(value["result"], Value::Null); + } + + #[tokio::test] + async fn drive_prompt_rejects_a_concurrent_request_but_keeps_running() { + let prompt_future = async { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + Ok("done".to_string()) + }; + // A non-cancel request arrives mid-turn, then EOF. + let mut reader = + lines_from(r#"{"jsonrpc":"2.0","id":9,"method":"session/new","params":{}}"#); + let mut out = Vec::new(); + + let outcome = + drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); + + assert_eq!(outcome, PromptOutcome::Completed("done".to_string())); + let line = String::from_utf8(out).expect("utf8"); + let value: Value = serde_json::from_str(line.trim()).expect("json"); + assert_eq!(value["id"], "9"); + assert_eq!(value["error"]["code"], -32603); + } + #[test] fn different_sessions_have_independent_history() { let mut server = AcpServer::new( diff --git a/docs/ACP_REGISTRY_SUBMISSION.md b/docs/ACP_REGISTRY_SUBMISSION.md index 4532672b5..ed2befd7d 100644 --- a/docs/ACP_REGISTRY_SUBMISSION.md +++ b/docs/ACP_REGISTRY_SUBMISSION.md @@ -51,15 +51,19 @@ Implemented locally: - `session/prompt` accepts string prompts plus text/resource/resource_link blocks, routes through the configured CodeWhale client, emits one `session/update` agent message chunk, then returns `stopReason: "end_turn"`. -- `session/cancel` currently returns `null`. +- `session/prompt` now runs concurrently with the input reader, so a + `session/cancel` for the same session interrupts the in-flight provider call + mid-turn and the prompt returns `stopReason: "cancelled"`. A no-prompt + `session/cancel` stays an idempotent `null` no-op. The turn is single-flight: + another request arriving mid-turn gets a clear "prompt in progress" error + instead of being silently dropped. Known limitations to state clearly: - The adapter is baseline ACP, not the full interactive TUI/runtime surface. -- `session/cancel` is accepted but does not cancel an in-flight provider call - yet. - The response is emitted after the provider completes; it is not token - streaming. + streaming. Cancellation aborts the awaited call but cannot interrupt within a + single non-streaming provider response. - ACP does not expose shell tools, file-write tools, checkpoint replay, session loading, or the HTTP/SSE runtime API. - Registry submission should be gated on a local run of the upstream registry