From c5fda76b41964a05ed4e02999c7a544f25699852 Mon Sep 17 00:00:00 2001
From: findshan <224246733+findshan@users.noreply.github.com>
Date: Sun, 28 Jun 2026 00:20:51 +0800
Subject: [PATCH] feat(acp): stream session/prompt deltas as session/update chunks (#3192)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Follow-up to #3698. The ACP adapter buffered the whole turn: it called
the non-streaming `create_message` and emitted a single `session/update`
chunk only after the provider finished. Editors like Zed show agent
output incrementally, so users waited for the entire response with no
feedback, and the readiness audit listed non-streaming as a known
limitation.

Stream the turn instead, reusing the existing
`LlmClient::create_message_stream` path that the TUI engine already
uses:

- `open_prompt_stream` builds the request with `stream: true` and
  returns the `'static` `StreamEventBox`. It borrows `&self` only to
  read config/model and open the stream, so the caller can race it
  against the reader without holding a borrow on the server.
- `drive_prompt_stream` consumes the stream concurrently with the input
  reader: each text delta (`content_block_delta`/`content_block_start`
  text) is emitted as its own `session/update` agent_message_chunk as
  it arrives; `message_stop` or stream end completes the turn with the
  accumulated text (recorded in history); a provider stream error
  surfaces as a JSON-RPC error.
- Cancellation is preserved and now interrupts mid-stream: a matching
  `session/cancel` returns `stopReason: "cancelled"` and dropping the
  stream aborts the underlying provider connection. Single-flight
  semantics are unchanged (other-session cancel ignored; concurrent
  request rejected with a clear error).

The single-writer invariant holds, so streamed chunks and
acknowledgements stay on one protocol-clean stdout stream.

The streaming + cancellation control logic stays a free function over the reader/writer + boxed stream, so it is unit-tested with canned in-memory streams (delta-by-delta chunking, cancel mid-stream, other-session cancel ignored, concurrent request rejected) — no real provider call. Full bin suite 5521 green; fmt + clippy clean. Updates docs/ACP_REGISTRY_SUBMISSION.md. Refs #3192. Signed-off-by: findshan <224246733+findshan@users.noreply.github.com> --- crates/tui/src/acp_server.rs | 346 +++++++++++++++++++++----------- docs/ACP_REGISTRY_SUBMISSION.md | 25 ++- 2 files changed, 238 insertions(+), 133 deletions(-) diff --git a/crates/tui/src/acp_server.rs b/crates/tui/src/acp_server.rs index d239e28f8..b0dd0c7f5 100644 --- a/crates/tui/src/acp_server.rs +++ b/crates/tui/src/acp_server.rs @@ -5,23 +5,28 @@ //! routes prompts through the same configured DeepSeek client as one-shot CLI //! mode. //! -//! `session/prompt` runs concurrently with the input reader so that a -//! `session/cancel` for the same session can interrupt an in-flight provider -//! call mid-turn (returning `stopReason: "cancelled"`) instead of being queued -//! behind it. A single writer task is preserved so stdout stays protocol-clean. +//! `session/prompt` streams the provider response: each text delta is emitted +//! as a `session/update` agent_message_chunk as it arrives, instead of buffering +//! the whole turn and sending one chunk at the end. The stream is consumed +//! concurrently with the input reader so that a `session/cancel` for the same +//! session can interrupt the turn mid-stream (returning `stopReason: "cancelled"`) +//! instead of being queued behind it. A single writer task is preserved so +//! stdout stays protocol-clean. 
use std::collections::HashMap; -use std::future::Future; use std::path::PathBuf; use anyhow::{Result, anyhow}; +use futures_util::StreamExt; use serde_json::{Value, json}; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, Lines}; use crate::client::DeepSeekClient; use crate::config::{ApiProvider, Config}; -use crate::llm_client::LlmClient; -use crate::models::{ContentBlock, Message, MessageRequest, SystemPrompt}; +use crate::llm_client::{LlmClient, StreamEventBox}; +use crate::models::{ + ContentBlock, ContentBlockStart, Delta, Message, MessageRequest, StreamEvent, SystemPrompt, +}; const ACP_PROTOCOL_VERSION: u64 = 1; @@ -78,42 +83,43 @@ pub async fn run_acp_server(config: Config, model: String, default_cwd: PathBuf) messages, cwd, } = prepared; - // `run_prompt` borrows `&server` immutably and never touches - // the writer, so we can race it against the reader while the - // main task keeps exclusive ownership of stdout. - let outcome = { - let prompt_future = server.run_prompt(&messages, &cwd); - drive_prompt_with_cancellation( - prompt_future, - &session_id, - &mut reader, - &mut writer, - ) - .await - }; - match outcome { - Ok(PromptOutcome::Completed(output)) => { - server.finish_prompt(&session_id, &output); - if !output.is_empty() { - write_session_update(&mut writer, &session_id, output).await?; - } - if let Some(id) = id { - write_jsonrpc_result( - &mut writer, - id, - json!({ "stopReason": "end_turn" }), - ) - .await?; - } - } - Ok(PromptOutcome::Cancelled) => { - if let Some(id) = id { - write_jsonrpc_result( - &mut writer, - id, - json!({ "stopReason": "cancelled" }), - ) - .await?; + // Opening the stream borrows `&server` only briefly; the + // returned `StreamEventBox` is `'static`, so it can be raced + // against the reader without holding a borrow on the server, + // and the main task keeps exclusive ownership of stdout. 
+ match server.open_prompt_stream(&messages, &cwd).await { + Ok(stream) => { + let outcome = + drive_prompt_stream(stream, &session_id, &mut reader, &mut writer) + .await; + match outcome { + Ok(PromptOutcome::Completed(output)) => { + // Chunks were already streamed; record the full + // assistant turn in history for the next prompt. + server.finish_prompt(&session_id, &output); + if let Some(id) = id { + write_jsonrpc_result( + &mut writer, + id, + json!({ "stopReason": "end_turn" }), + ) + .await?; + } + } + Ok(PromptOutcome::Cancelled) => { + if let Some(id) = id { + write_jsonrpc_result( + &mut writer, + id, + json!({ "stopReason": "cancelled" }), + ) + .await?; + } + } + Err(err) => { + write_jsonrpc_error(&mut writer, id, -32603, err.to_string()) + .await?; + } } } Err(err) => { @@ -158,42 +164,88 @@ enum PromptOutcome { Cancelled, } -/// Run a prompt future while concurrently watching `reader` for a +/// The text payload an ACP client should see for a given stream event, if any. +/// ACP baseline is text-only, so thinking/tool/control events carry no chunk. +fn stream_text_chunk(event: &StreamEvent) -> Option<&str> { + match event { + StreamEvent::ContentBlockDelta { + delta: Delta::TextDelta { text }, + .. + } => Some(text), + StreamEvent::ContentBlockStart { + content_block: ContentBlockStart::Text { text }, + .. + } => Some(text), + _ => None, + } +} + +/// Consume a provider response `stream`, emitting each text delta as a +/// `session/update` chunk, while concurrently watching `reader` for a /// `session/cancel` targeting `session_id`. /// -/// This is the cancellation control point. It is generic over the future and -/// the reader/writer so the logic can be unit-tested with a delayed future and -/// an in-memory input stream — no real provider call required. The caller keeps -/// the only writer, so any acknowledgements emitted here stay on the single -/// protocol-clean stdout stream. +/// This is the streaming + cancellation control point. 
It is generic over the +/// reader/writer and takes the boxed stream, so it is unit-tested with canned +/// in-memory streams and readers — no real provider call required. The caller +/// keeps the only writer, so streamed chunks and acknowledgements all stay on +/// the single protocol-clean stdout stream. /// -/// While the turn is in flight the prompt is single-flight: a cancel for this -/// session (request or notification form) ends it with [`PromptOutcome::Cancelled`]; -/// a cancel for a different session is acknowledged and ignored; any other -/// concurrent *request* is rejected with a clear error so the client is not left -/// waiting; notifications without an id are ignored. -async fn drive_prompt_with_cancellation( - prompt_future: F, +/// Returns [`PromptOutcome::Completed`] with the full accumulated text once the +/// stream ends (or emits `message_stop`), so the caller can record the turn in +/// history. A matching `session/cancel` (request or notification form) ends it +/// early with [`PromptOutcome::Cancelled`] — dropping the stream aborts the +/// underlying provider connection. The turn is single-flight: a cancel for a +/// different session is acknowledged and ignored; any other concurrent *request* +/// is rejected with a clear error so the client is not left waiting; +/// notifications without an id are ignored. +async fn drive_prompt_stream( + mut stream: StreamEventBox, session_id: &str, reader: &mut Lines, writer: &mut W, ) -> Result where - F: Future>, R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin, { - tokio::pin!(prompt_future); + let mut accumulated = String::new(); + // Once input closes mid-turn we stop selecting on the reader and just drain + // the stream to completion, rather than spinning on repeated EOFs. + let mut reader_open = true; loop { tokio::select! 
{ - result = &mut prompt_future => { - return Ok(PromptOutcome::Completed(result?)); + event = stream.next() => { + match event { + // Stream exhausted without an explicit stop: turn is done. + None => return Ok(PromptOutcome::Completed(accumulated)), + Some(Ok(event)) => { + if let Some(text) = stream_text_chunk(&event) { + if !text.is_empty() { + accumulated.push_str(text); + write_session_update(writer, session_id, text.to_string()).await?; + } + } + match event { + StreamEvent::MessageStop => { + return Ok(PromptOutcome::Completed(accumulated)); + } + StreamEvent::Error { error } => { + return Err(anyhow!("provider stream error: {error}")); + } + _ => {} + } + } + Some(Err(err)) => return Err(err), + } } - line = reader.next_line() => { + line = reader.next_line(), if reader_open => { let line = match line? { Some(line) => line, - // Input closed mid-turn: let the provider call finish. - None => return Ok(PromptOutcome::Completed((&mut prompt_future).await?)), + // Input closed mid-turn: stop watching it, keep draining. + None => { + reader_open = false; + continue; + } }; if line.trim().is_empty() { continue; @@ -216,6 +268,7 @@ where if let Some(id) = id { write_jsonrpc_result(writer, id, json!(null)).await?; } + // Dropping `stream` on return aborts the provider call. return Ok(PromptOutcome::Cancelled); } // Cancel for some other session: acknowledge, keep going. @@ -405,9 +458,10 @@ impl AcpServer { /// Validate a `session/prompt` request and append the user turn to history, /// returning the cloned conversation for the (borrow-free) provider call. /// - /// This is the `&mut self` half of a prompt turn; the long-running provider - /// call lives in [`AcpServer::run_prompt`] (which borrows `&self` only) so it - /// can be raced against the reader for cancellation. 
+ /// This is the `&mut self` half of a prompt turn; the streaming provider + /// call lives in [`AcpServer::open_prompt_stream`] (which borrows `&self` + /// only and returns a `'static` stream) so it can be raced against the + /// reader for cancellation. fn begin_prompt(&mut self, params: Value) -> std::result::Result { let session_id = params .get("sessionId") @@ -457,7 +511,17 @@ impl AcpServer { } } - async fn run_prompt(&self, messages: &[Message], cwd: &PathBuf) -> Result { + /// Resolve the route, build the streaming request, and open the provider + /// response stream. Borrows `&self` only to read config/model; the returned + /// [`StreamEventBox`] is `'static`, so the caller can race it against the + /// reader without holding any borrow on the server. The cwd guard only needs + /// to cover route resolution and client construction, not stream + /// consumption (ACP exposes no file/shell tools), so it is dropped here. + async fn open_prompt_stream( + &self, + messages: &[Message], + cwd: &PathBuf, + ) -> Result { let _cwd_guard = ScopedCurrentDir::new(cwd)?; let last_user_text = messages .iter() @@ -494,19 +558,12 @@ impl AcpServer { metadata: None, thinking: None, reasoning_effort, - stream: Some(false), + stream: Some(true), temperature: Some(0.2), top_p: Some(0.9), }; - let response = client.create_message(request).await?; - let mut output = String::new(); - for block in response.content { - if let ContentBlock::Text { text, .. } = block { - output.push_str(&text); - } - } - Ok(output) + client.create_message_stream(request).await } } @@ -984,37 +1041,84 @@ mod tests { BufReader::new(input.as_bytes()).lines() } + fn text_delta(text: &str) -> StreamEvent { + StreamEvent::ContentBlockDelta { + index: 0, + delta: Delta::TextDelta { + text: text.to_string(), + }, + } + } + + /// A stream that yields the given events immediately, then ends. 
+ fn ready_stream(events: Vec) -> StreamEventBox { + Box::pin(futures_util::stream::iter( + events.into_iter().map(Ok::<_, anyhow::Error>), + )) + } + + /// A stream that never yields, so a concurrent cancel always wins. + fn pending_stream() -> StreamEventBox { + Box::pin(futures_util::stream::pending::>()) + } + + /// A stream that yields `events` immediately, then emits `message_stop` + /// after a short delay — long enough that an already-buffered reader line is + /// processed first, making the ordering deterministic in tests. + fn events_then_delayed_stop(events: Vec) -> StreamEventBox { + let head = futures_util::stream::iter(events.into_iter().map(Ok::<_, anyhow::Error>)); + let tail = futures_util::stream::once(async { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + Ok(StreamEvent::MessageStop) + }); + Box::pin(head.chain(tail)) + } + + fn parse_lines(out: Vec) -> Vec { + String::from_utf8(out) + .expect("utf8") + .lines() + .filter(|line| !line.trim().is_empty()) + .map(|line| serde_json::from_str(line).expect("json")) + .collect() + } + #[tokio::test] - async fn drive_prompt_completes_when_no_cancel_arrives() { + async fn drive_prompt_streams_each_delta_as_a_chunk_then_completes() { + let stream = ready_stream(vec![ + text_delta("hello"), + text_delta(" world"), + StreamEvent::MessageStop, + ]); let mut reader = lines_from(""); let mut out = Vec::new(); - let prompt_future = async { Ok("the answer".to_string()) }; - let outcome = - drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) - .await - .expect("driver ok"); - - assert_eq!(outcome, PromptOutcome::Completed("the answer".to_string())); - assert!(out.is_empty(), "no protocol bytes written for clean turn"); + let outcome = drive_prompt_stream(stream, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); + + // Full text is accumulated for history... 
+ assert_eq!(outcome, PromptOutcome::Completed("hello world".to_string())); + // ...and each delta was emitted as its own session/update chunk. + let updates = parse_lines(out); + assert_eq!(updates.len(), 2); + assert!(updates.iter().all(|u| u["method"] == "session/update")); + assert_eq!(updates[0]["params"]["update"]["content"]["text"], "hello"); + assert_eq!(updates[1]["params"]["update"]["content"]["text"], " world"); } #[tokio::test] async fn drive_prompt_cancels_when_matching_cancel_arrives() { - // A provider call that effectively never finishes within the test. - let prompt_future = async { - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - Ok("late answer".to_string()) - }; + // A provider stream that never finishes within the test. + let stream = pending_stream(); let mut reader = lines_from( r#"{"jsonrpc":"2.0","method":"session/cancel","params":{"sessionId":"sess_1"}}"#, ); let mut out = Vec::new(); - let outcome = - drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) - .await - .expect("driver ok"); + let outcome = drive_prompt_stream(stream, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); assert_eq!(outcome, PromptOutcome::Cancelled); // Notification-form cancel (no id) is acknowledged by acting, not writing. @@ -1023,51 +1127,49 @@ mod tests { #[tokio::test] async fn drive_prompt_ignores_cancel_for_a_different_session() { - // Small delay so the unrelated cancel line is processed first, proving it - // does not abort the turn before the provider call resolves. - let prompt_future = async { - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - Ok("kept going".to_string()) - }; + // The unrelated cancel line is buffered and ready; the delayed stop makes + // it process first, proving it does not abort the turn. 
+ let stream = events_then_delayed_stop(vec![text_delta("kept")]); let mut reader = lines_from( r#"{"jsonrpc":"2.0","id":7,"method":"session/cancel","params":{"sessionId":"other"}}"#, ); let mut out = Vec::new(); - let outcome = - drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) - .await - .expect("driver ok"); + let outcome = drive_prompt_stream(stream, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); - assert_eq!(outcome, PromptOutcome::Completed("kept going".to_string())); + assert_eq!(outcome, PromptOutcome::Completed("kept".to_string())); // The other-session cancel carried an id, so it was acknowledged with null. - let line = String::from_utf8(out).expect("utf8"); - let value: Value = serde_json::from_str(line.trim()).expect("json"); - assert_eq!(value["id"], "7"); - assert_eq!(value["result"], Value::Null); + let lines = parse_lines(out); + assert!( + lines + .iter() + .any(|v| v["id"] == "7" && v["result"] == Value::Null), + "expected a null ack for the other-session cancel, got {lines:?}" + ); } #[tokio::test] async fn drive_prompt_rejects_a_concurrent_request_but_keeps_running() { - let prompt_future = async { - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - Ok("done".to_string()) - }; - // A non-cancel request arrives mid-turn, then EOF. + let stream = events_then_delayed_stop(vec![text_delta("done")]); + // A non-cancel request arrives mid-turn. 
let mut reader = lines_from(r#"{"jsonrpc":"2.0","id":9,"method":"session/new","params":{}}"#); let mut out = Vec::new(); - let outcome = - drive_prompt_with_cancellation(prompt_future, "sess_1", &mut reader, &mut out) - .await - .expect("driver ok"); + let outcome = drive_prompt_stream(stream, "sess_1", &mut reader, &mut out) + .await + .expect("driver ok"); assert_eq!(outcome, PromptOutcome::Completed("done".to_string())); - let line = String::from_utf8(out).expect("utf8"); - let value: Value = serde_json::from_str(line.trim()).expect("json"); - assert_eq!(value["id"], "9"); - assert_eq!(value["error"]["code"], -32603); + let lines = parse_lines(out); + assert!( + lines + .iter() + .any(|v| v["id"] == "9" && v["error"]["code"] == -32603), + "expected a prompt-in-progress error for the concurrent request, got {lines:?}" + ); } #[test] diff --git a/docs/ACP_REGISTRY_SUBMISSION.md b/docs/ACP_REGISTRY_SUBMISSION.md index ed2befd7d..a248460fe 100644 --- a/docs/ACP_REGISTRY_SUBMISSION.md +++ b/docs/ACP_REGISTRY_SUBMISSION.md @@ -49,21 +49,24 @@ Implemented locally: - `authMethods` with terminal auth: `auth set --provider ` - `session/new` creates an in-memory session with a cwd. - `session/prompt` accepts string prompts plus text/resource/resource_link - blocks, routes through the configured CodeWhale client, emits one - `session/update` agent message chunk, then returns `stopReason: "end_turn"`. -- `session/prompt` now runs concurrently with the input reader, so a - `session/cancel` for the same session interrupts the in-flight provider call - mid-turn and the prompt returns `stopReason: "cancelled"`. A no-prompt - `session/cancel` stays an idempotent `null` no-op. The turn is single-flight: - another request arriving mid-turn gets a clear "prompt in progress" error - instead of being silently dropped. + blocks and routes through the configured CodeWhale client. 
+- `session/prompt` **streams**: each provider text delta is emitted as a + `session/update` agent_message_chunk as it arrives, then the prompt returns + `stopReason: "end_turn"` (instead of buffering the whole turn and sending one + chunk at the end). +- The stream is consumed concurrently with the input reader, so a + `session/cancel` for the same session interrupts the turn mid-stream and the + prompt returns `stopReason: "cancelled"`; dropping the stream aborts the + underlying provider connection. A no-prompt `session/cancel` stays an + idempotent `null` no-op. The turn is single-flight: another request arriving + mid-turn gets a clear "prompt in progress" error instead of being silently + dropped. Known limitations to state clearly: - The adapter is baseline ACP, not the full interactive TUI/runtime surface. -- The response is emitted after the provider completes; it is not token - streaming. Cancellation aborts the awaited call but cannot interrupt within a - single non-streaming provider response. +- Streaming covers text deltas only; thinking/tool/server-tool deltas are not + surfaced over ACP (ACP baseline here is text-only, `tools: None`). - ACP does not expose shell tools, file-write tools, checkpoint replay, session loading, or the HTTP/SSE runtime API. - Registry submission should be gated on a local run of the upstream registry