From 374df8a6d3141cec29aa0d978675858c3cf8fe30 Mon Sep 17 00:00:00 2001 From: Nicolas Dreno Date: Mon, 4 May 2026 14:48:08 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(ai-proxy):=20/v1/responses=20translati?= =?UTF-8?q?on=20(ADR-0030=20=C2=A72)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the OpenAI Responses API surface as the second protocol on ai-proxy. Path-based dispatch routes POST /v1/responses through the new Responses adapter; everything else still routes to Chat Completions. Per-provider behavior ===================== - **OpenAI**: passthrough at /v1/responses upstream. Streaming uses host_http_stream like Chat Completions. - **Anthropic**: translate input[] items ↔ Messages content blocks: input_text / input_image → text / image content blocks function_call → tool_use block function_call_output → tool_result block reasoning → dropped (Anthropic doesn't accept client-supplied reasoning input); counted + Warning: 299 surfaced. `instructions` is hoisted to Anthropic's top-level `system` field. Streaming is buffered to a single terminal event (mirrors ADR-0024 Chat Completions until true SSE translation lands). - **Ollama**: 400 problem+json with code: responses_not_supported_for_provider. Ollama's OpenAI-compat surface is Chat Completions only as of 2026-04. Spec-level guards (preflight, before target resolution) ======================================================= - previous_response_id present (and not null) → 400 problem+json with code: previous_response_id_not_supported. The stateful Responses API requires session-scoped storage that ADR-0030 §2 explicitly defers; this is the forward-compat hook. - store: true | absent → permissive (process statelessly, attach Warning: 299 — "store ignored; gateway is stateless"). Most clients send store: true as an unexamined default; rejecting it would break them gratuitously. Operators see the downgrade via the barbacane_plugin_ai_proxy_responses_store_downgrades_total counter. - store: false → no warning, no counter. Synthetic Responses id ====================== Format: resp_<uuid-v7>. v7 is time-ordered, so a `resp_*` grep across access logs comes out chronologically without needing a separate sort key. Built manually from host_time_now + a per-instance counter — the wasm32-unknown-unknown target has no system RNG, but the v7 spec only requires monotonicity within a node, which the counter provides. Provider refactor ================= providers/anthropic.rs gains an anthropic_messages_call_raw helper that sends a pre-built Messages body and returns the raw upstream Response. The existing anthropic_call (used by Chat Completions) now layers on top of it; the new responses module uses it directly. No protocol-side duplication of auth headers / version pinning. Tests ===== - Plugin unit tests: 67 → 90 (+23). 8 preflight tests, 7 input-item translation cases, 5 output-item translation cases, 4 Warning header cases, 1 error response shape. - Integration tests (ai_proxy.rs): 9 → 15 (+6 covering OpenAI passthrough, previous_response_id 400, Ollama 400, full Anthropic translation roundtrip, store Warning header, reasoning Warning header). Schema description ================== config-schema.json now documents the path-dispatch matrix and the forward-compat scope of /v1/responses (stateless, what's rejected, synthetic id format). The schema for the route table itself is unchanged — Responses is a path concern, not a config one.
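Worked example for the Anthropic path (shapes abbreviated from the integration roundtrip test below):

  client → gateway    {"model":"claude-sonnet-4-6","input":[{"type":"input_text","role":"user","content":"Hi"}]}
  gateway → upstream  POST /v1/messages with messages/max_tokens built as above
  upstream → gateway  {"content":[{"type":"text","text":"Hello!"}],"usage":{"input_tokens":4,"output_tokens":2}}
  gateway → client    {"id":"resp_<uuid-v7>","object":"response","output":[{"type":"output_text","text":"Hello!"}],"usage":{"input_tokens":4,"output_tokens":2,"total_tokens":6}}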
--- CHANGELOG.md | 6 + README.md | 4 +- crates/barbacane-test/tests/ai_proxy.rs | 311 +++++++ plugins/ai-proxy/Cargo.lock | 7 + plugins/ai-proxy/Cargo.toml | 5 + plugins/ai-proxy/config-schema.json | 2 +- plugins/ai-proxy/src/lib.rs | 37 +- plugins/ai-proxy/src/protocols/mod.rs | 1 + plugins/ai-proxy/src/protocols/responses.rs | 956 ++++++++++++++++++++ plugins/ai-proxy/src/providers/anthropic.rs | 36 +- 10 files changed, 1343 insertions(+), 22 deletions(-) create mode 100644 plugins/ai-proxy/src/protocols/responses.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a1eb7d..08f08a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **`ai-response-guard` middleware plugin**: inspects LLM responses (OpenAI chat-completion format) in on_response. Named profiles carry `redact` rules (regex → replacement, scoped to `choices[].message.content` and `delta.content`) and `blocked_patterns` (match replaces the response with 502). Streamed responses cannot be redacted after the fact; the plugin emits `redactions_skipped_streaming_total` instead. - **Named-profile + CEL composition pattern**: all four AI middlewares read a `context_key` (default `ai.policy`, overridable) to select the active profile. A `cel` middleware upstream writes `ai.policy` via `on_match.set_context`; one CEL decision fans out to prompt strictness, token budget, redaction strictness, and the `ai-proxy` dispatcher's named targets (via `ai.target`). +### Added +- **plugin**: `ai-proxy` `POST /v1/responses` — OpenAI Responses API support, stateless only (ADR-0030 §2). For OpenAI, the dispatcher passes through to the upstream `/v1/responses`. For Anthropic, the request is translated to Messages API: `input_text`/`input_image` → `text`/`image` content blocks, `function_call` + `function_call_output` → `tool_use` + `tool_result`, `reasoning` items are dropped (Anthropic doesn't accept client-supplied reasoning). The response is translated back to Responses shape with a synthetic time-ordered `id` (format `resp_<uuid-v7>`). For Ollama, returns 400 `responses_not_supported_for_provider` (Ollama's OpenAI-compat surface is Chat Completions only). +- **plugin**: `ai-proxy` `previous_response_id` returns 400 `previous_response_id_not_supported`. The stateful Responses API (`previous_response_id` + `GET /v1/responses/{id}` retrieval) requires session-scoped storage that ADR-0030 §2 explicitly defers; the rejection is the forward-compatibility hook. +- **plugin**: `ai-proxy` `store` flag is permissive — `true`, `false`, and absent all flow through unchanged. When `store ≠ false` (most clients send `true` as an unexamined default), the dispatcher emits a `Warning: 299 - "store ignored; gateway is stateless, see ADR-0030"` header and increments `barbacane_plugin_ai_proxy_responses_store_downgrades_total`. Operators can quantify stateful-API usage and decide whether to prioritize the future session-storage capability. +- **plugin**: `ai-proxy` `reasoning` items dropped on the Responses → Anthropic translation path emit `Warning: 299 - "reasoning items dropped..."` and increment `barbacane_plugin_ai_proxy_responses_reasoning_dropped_total`. Silent reasoning drops can degrade output quality on multi-turn agent flows in ways the client cannot detect. + ### Fixed - **plugin**: `ai-proxy` no longer returns `404 Not Found` when the operation is bound to a path other than `/v1/chat/completions`.
The path-based dispatch added in PR-1 was too strict — operators are free to bind `ai-proxy` to any operation path, and the dispatcher routes Chat Completions requests through unchanged. PR-4 will reintroduce path-based dispatch narrowly when `/v1/responses` actually has a second protocol to differentiate. diff --git a/README.md b/README.md index bc5b5dc..f531af0 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,8 @@ CI Documentation Unit Tests - Plugin Tests - Integration Tests + Plugin Tests + Integration Tests CLI Tests UI Tests E2E Tests diff --git a/crates/barbacane-test/tests/ai_proxy.rs b/crates/barbacane-test/tests/ai_proxy.rs index d335b3b..586c03c 100644 --- a/crates/barbacane-test/tests/ai_proxy.rs +++ b/crates/barbacane-test/tests/ai_proxy.rs @@ -594,3 +594,314 @@ paths: assert_eq!(resp.status(), 403, "deny must return 403, not escalate"); } + +// ========================================================================= +// ADR-0030 §2 — Responses API at POST /v1/responses +// ========================================================================= + +/// Build a temp spec exposing `/v1/responses` bound to `ai-proxy` with the +/// given provider + base_url. The path is the canonical OpenAI Responses +/// path so the dispatcher's path-match (PR-4) routes through the Responses +/// adapter. +fn create_responses_spec( + provider: &str, + base_url: &str, +) -> (tempfile::TempDir, std::path::PathBuf) { + let temp_dir = tempfile::TempDir::new().expect("temp dir"); + let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")); + let plugins_dir = manifest_dir + .parent() + .unwrap() + .parent() + .unwrap() + .join("plugins"); + let ai_proxy_path = plugins_dir.join("ai-proxy/ai-proxy.wasm"); + + std::fs::write( + temp_dir.path().join("barbacane.yaml"), + format!( + "plugins:\n ai-proxy:\n path: {}\n", + ai_proxy_path.display() + ), + ) + .unwrap(); + + let spec_path = temp_dir.path().join("responses.yaml"); + let api_key_line = match provider { + "anthropic" | "openai" => " api_key: \"sk-test\"\n", + _ => "", + }; + std::fs::write( + &spec_path, + format!( + r#"openapi: "3.0.3" +info: + title: Responses API integration + version: "1.0.0" +paths: + /v1/responses: + post: + operationId: responses + requestBody: + required: true + content: + application/json: + schema: + type: object + x-barbacane-dispatch: + name: ai-proxy + config: + provider: {provider} +{api_key_line} base_url: "{base_url}" + timeout: 10 + max_tokens: 1024 + responses: + "200": + description: ok + "400": + description: client error +"#, + provider = provider, + api_key_line = api_key_line, + base_url = base_url, + ), + ) + .unwrap(); + (temp_dir, spec_path) +} + +#[tokio::test] +async fn test_ai_proxy_responses_openai_passthrough() { + // Mock the canonical OpenAI Responses endpoint upstream and verify the + // gateway just passes through (no translation for the OpenAI provider). 
+ let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string( + r#"{"id":"resp_abc","object":"response","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}"#, + ) + .insert_header("content-type", "application/json"), + ) + .expect(1) + .mount(&mock_server) + .await; + + let (_tmp, spec_path) = create_responses_spec("openai", &mock_server.uri()); + let gateway = TestGateway::from_spec(spec_path.to_str().unwrap()) + .await + .expect("gateway"); + + let resp = gateway + .post( + "/v1/responses", + r#"{"model":"gpt-4o","input":[{"type":"input_text","role":"user","content":"hi"}]}"#, + ) + .await + .unwrap(); + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["object"], "response"); +} + +#[tokio::test] +async fn test_ai_proxy_responses_400_on_previous_response_id() { + // The mock must NOT be reached — the preflight check rejects this body + // before target resolution. + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(0) + .mount(&mock_server) + .await; + + let (_tmp, spec_path) = create_responses_spec("openai", &mock_server.uri()); + let gateway = TestGateway::from_spec(spec_path.to_str().unwrap()) + .await + .expect("gateway"); + + let resp = gateway + .post( + "/v1/responses", + r#"{"model":"gpt-4o","input":[],"previous_response_id":"resp_old"}"#, + ) + .await + .unwrap(); + assert_eq!(resp.status(), 400); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["code"], "previous_response_id_not_supported"); +} + +#[tokio::test] +async fn test_ai_proxy_responses_400_on_ollama_provider() { + let mock_server = MockServer::start().await; + // Ollama doesn't have a Responses surface — the mock must NOT be reached. + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(ResponseTemplate::new(200).set_body_string("{}")) + .expect(0) + .mount(&mock_server) + .await; + + let (_tmp, spec_path) = create_responses_spec("ollama", &mock_server.uri()); + let gateway = TestGateway::from_spec(spec_path.to_str().unwrap()) + .await + .expect("gateway"); + + let resp = gateway + .post( + "/v1/responses", + r#"{"model":"mistral","input":[{"type":"input_text","role":"user","content":"hi"}]}"#, + ) + .await + .unwrap(); + assert_eq!(resp.status(), 400); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["code"], "responses_not_supported_for_provider"); +} + +#[tokio::test] +async fn test_ai_proxy_responses_anthropic_translation_roundtrip() { + // Mock Anthropic /v1/messages returning a Messages-format response. The + // gateway must translate it into Responses format for the client. 
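+    // Concretely (asserted below): Messages `content[{"type":"text"}]` must come
+    // back as Responses `output[{"type":"output_text"}]`, usage mapped
+    // field-for-field, and a synthetic `resp_`-prefixed id in place of
+    // Anthropic's `msg_*` id.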
+ let mock_server = MockServer::start().await; + let messages_response = r#"{ + "id":"msg_xyz","type":"message","role":"assistant","model":"claude-sonnet-4-6", + "content":[{"type":"text","text":"Hello!"}], + "stop_reason":"end_turn", + "usage":{"input_tokens":4,"output_tokens":2} + }"#; + Mock::given(method("POST")) + .and(path("/v1/messages")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string(messages_response) + .insert_header("content-type", "application/json"), + ) + .expect(1) + .mount(&mock_server) + .await; + + let (_tmp, spec_path) = create_responses_spec("anthropic", &mock_server.uri()); + let gateway = TestGateway::from_spec(spec_path.to_str().unwrap()) + .await + .expect("gateway"); + + let resp = gateway + .post( + "/v1/responses", + r#"{ + "model":"claude-sonnet-4-6", + "store":false, + "input":[{"type":"input_text","role":"user","content":"Hi"}] + }"#, + ) + .await + .unwrap(); + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["object"], "response"); + let id = body["id"].as_str().unwrap(); + assert!(id.starts_with("resp_"), "synthetic id: {}", id); + assert_eq!(body["model"], "claude-sonnet-4-6"); + assert_eq!(body["output"][0]["type"], "output_text"); + assert_eq!(body["output"][0]["text"], "Hello!"); + assert_eq!(body["usage"]["input_tokens"], 4); + assert_eq!(body["usage"]["output_tokens"], 2); +} + +#[tokio::test] +async fn test_ai_proxy_responses_warning_header_on_store_downgrade() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/messages")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string( + r#"{"id":"msg","model":"claude","content":[{"type":"text","text":"ok"}],"usage":{"input_tokens":1,"output_tokens":1}}"#, + ) + .insert_header("content-type", "application/json"), + ) + .mount(&mock_server) + .await; + + let (_tmp, spec_path) = create_responses_spec("anthropic", &mock_server.uri()); + let gateway = TestGateway::from_spec(spec_path.to_str().unwrap()) + .await + .expect("gateway"); + + // store: true is the OpenAI default — gateway downgrades and tells the client. 
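+    // Expected annotation (value verbatim from attach_warnings in responses.rs):
+    //   Warning: 299 - "store ignored; gateway is stateless, see ADR-0030"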
+ let resp = gateway + .post( + "/v1/responses", + r#"{ + "model":"claude-sonnet-4-6", + "store":true, + "input":[{"type":"input_text","role":"user","content":"hi"}] + }"#, + ) + .await + .unwrap(); + assert_eq!(resp.status(), 200); + let warning = resp + .headers() + .get("warning") + .expect("warning header set") + .to_str() + .unwrap(); + assert!( + warning.contains("store ignored"), + "warning should announce the store downgrade: {}", + warning + ); +} + +#[tokio::test] +async fn test_ai_proxy_responses_warning_header_on_reasoning_dropped() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/messages")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string( + r#"{"id":"msg","model":"claude","content":[{"type":"text","text":"ok"}],"usage":{"input_tokens":1,"output_tokens":1}}"#, + ) + .insert_header("content-type", "application/json"), + ) + .mount(&mock_server) + .await; + + let (_tmp, spec_path) = create_responses_spec("anthropic", &mock_server.uri()); + let gateway = TestGateway::from_spec(spec_path.to_str().unwrap()) + .await + .expect("gateway"); + + let resp = gateway + .post( + "/v1/responses", + r#"{ + "model":"claude-sonnet-4-6", + "store":false, + "input":[ + {"type":"reasoning","summary":"thinking..."}, + {"type":"input_text","role":"user","content":"hi"} + ] + }"#, + ) + .await + .unwrap(); + assert_eq!(resp.status(), 200); + let warning = resp + .headers() + .get("warning") + .expect("warning header set") + .to_str() + .unwrap(); + assert!( + warning.contains("reasoning items dropped"), + "warning should announce reasoning drop: {}", + warning + ); +} diff --git a/plugins/ai-proxy/Cargo.lock b/plugins/ai-proxy/Cargo.lock index c8315fb..b80bd19 100644 --- a/plugins/ai-proxy/Cargo.lock +++ b/plugins/ai-proxy/Cargo.lock @@ -19,6 +19,7 @@ dependencies = [ "globset", "serde", "serde_json", + "uuid", ] [[package]] @@ -173,6 +174,12 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "uuid" +version = "1.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" + [[package]] name = "zmij" version = "1.0.21" diff --git a/plugins/ai-proxy/Cargo.toml b/plugins/ai-proxy/Cargo.toml index e611f35..5cf95a4 100644 --- a/plugins/ai-proxy/Cargo.toml +++ b/plugins/ai-proxy/Cargo.toml @@ -15,6 +15,11 @@ barbacane-plugin-sdk = { path = "../../crates/barbacane-plugin-sdk" } globset = { version = "0.4", default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" +# Used only for formatting bytes as the UUID dashed-hex form. v7 is built +# manually from `host_time_now` + a per-instance counter — the wasm32- +# unknown-unknown target has no system RNG, and the v7 spec only requires +# monotonicity within a node, which the counter provides. ADR-0030 §2. +uuid = { version = "1", default-features = false } [profile.release] opt-level = "s" diff --git a/plugins/ai-proxy/config-schema.json b/plugins/ai-proxy/config-schema.json index c5964fc..1ad6e9f 100644 --- a/plugins/ai-proxy/config-schema.json +++ b/plugins/ai-proxy/config-schema.json @@ -2,7 +2,7 @@ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "urn:barbacane:plugin:ai-proxy:config", "title": "AI Proxy Dispatcher Config", - "description": "Configuration for the AI proxy dispatcher plugin. 
Exposes a unified OpenAI-compatible API and routes to LLM providers (OpenAI, Anthropic, Ollama). Supports named targets for policy-driven routing (via the `cel` middleware), glob-based dynamic model routing (`routes`), provider fallback on 5xx, and token count propagation into context for downstream middlewares.\n\nThe model identifier is **always** taken from the client's `model` field on the request body (ADR-0030 §0 — caller-owned model). Gateway config declares **providers** (where to go, with what credentials), never an authoritative model list. A request that omits `model` is rejected with 400 problem+json (`urn:barbacane:error:model_required`).\n\nThree resolution modes (evaluated in order):\n- **Context** (`ai.target`): a `cel` middleware writes `ai.target: <name>` into context; the dispatcher uses `targets[]`.\n- **Routes**: each entry's glob `pattern` is matched against the client's `model`; first match wins.\n- **Default / flat**: `default_target` → `targets[]`, else the top-level `provider` config.\n\nCatalog policy is attached to the **target** via `allow` / `deny` glob lists. They apply on every resolution path that produces a target carrying them — including context-driven dispatch — so a `cel` misconfig cannot leak a denied model. A denied model returns 403 (`urn:barbacane:error:model_not_permitted`) and does not fall through to a different provider.\n\nThe `fallback` list is tried in order when the resolved target returns a 5xx or a connection error.", + "description": "Configuration for the AI proxy dispatcher plugin. Exposes a unified OpenAI-compatible API and routes to LLM providers (OpenAI, Anthropic, Ollama). Supports named targets for policy-driven routing (via the `cel` middleware), glob-based dynamic model routing (`routes`), provider fallback on 5xx, and token count propagation into context for downstream middlewares.\n\n**Protocol surfaces** (path-based dispatch, ADR-0030 §1):\n- `POST /v1/chat/completions` — OpenAI Chat Completions (translated for Anthropic).\n- `POST /v1/responses` — OpenAI Responses API, stateless (ADR-0030 §2). Translated for Anthropic. Rejected for Ollama (no upstream Responses surface). `previous_response_id` returns 400; `store: true` is permissive but emits a `Warning: 299` header. The synthetic response `id` is `resp_<uuid-v7>`.\n- Any other path routes through Chat Completions for backward compatibility with operator-defined custom paths.\n\nThe model identifier is **always** taken from the client's `model` field on the request body (ADR-0030 §0 — caller-owned model). Gateway config declares **providers** (where to go, with what credentials), never an authoritative model list. A request that omits `model` is rejected with 400 problem+json (`urn:barbacane:error:model_required`).\n\nThree resolution modes (evaluated in order):\n- **Context** (`ai.target`): a `cel` middleware writes `ai.target: <name>` into context; the dispatcher uses `targets[]`.\n- **Routes**: each entry's glob `pattern` is matched against the client's `model`; first match wins.\n- **Default / flat**: `default_target` → `targets[]`, else the top-level `provider` config.\n\nCatalog policy is attached to the **target** via `allow` / `deny` glob lists. They apply on every resolution path that produces a target carrying them — including context-driven dispatch — so a `cel` misconfig cannot leak a denied model.
A denied model returns 403 (`urn:barbacane:error:model_not_permitted`) and does not fall through to a different provider.\n\nThe `fallback` list is tried in order when the resolved target returns a 5xx or a connection error.", "type": "object", "$defs": { "GlobPattern": { diff --git a/plugins/ai-proxy/src/lib.rs b/plugins/ai-proxy/src/lib.rs index 8497e33..4217a63 100644 --- a/plugins/ai-proxy/src/lib.rs +++ b/plugins/ai-proxy/src/lib.rs @@ -240,15 +240,34 @@ type ProtocolHandler = impl AiProxy { pub fn dispatch(&mut self, req: Request) -> Response { - // ADR-0030 §1: the dispatcher is protocol-aware via path. PR-4 will - // add an explicit `/v1/responses` arm that routes to - // protocols::responses::handle, and PR-5 a `/v1/models` arm that - // serves the model catalog. Until those exist, every path routes to - // Chat Completions — the operator's choice of operation path is not - // the dispatcher's business when only one protocol is on offer, and - // returning 404 here would force every operator and every test - // fixture to use the canonical `/v1/chat/completions` path. - self.dispatch_chat_completion(req) + // ADR-0030 §1: the dispatcher is protocol-aware via path. The + // canonical OpenAI Responses path takes the Responses adapter; every + // other path (including the canonical /v1/chat/completions and any + // operator-defined custom path) routes to Chat Completions. This + // matches how operators bind ai-proxy via the shipped spec fragment + // (canonical paths) while still supporting custom-path Chat + // Completions fixtures. + match req.path.as_str() { + "/v1/responses" => self.dispatch_responses(req), + // PR-5 will add: "/v1/models" => protocols::models::handle + _ => self.dispatch_chat_completion(req), + } + } + + fn dispatch_responses(&mut self, req: Request) -> Response { + // Reject `previous_response_id` and parse `store` upfront — this is + // the spec-level surface the gateway can validate before resolving a + // target, and it produces stable 4xx responses. + match protocols::responses::ResponsesPreflight::from_body(&req.body) { + Ok(_) => {} + Err(resp) => return resp, + } + + let client_model = match extract_client_model(&req.body) { + Some(m) => m, + None => return model_required_response(), + }; + self.dispatch_with_handler(req, &client_model, protocols::responses::handle) } fn dispatch_chat_completion(&mut self, req: Request) -> Response { diff --git a/plugins/ai-proxy/src/protocols/mod.rs b/plugins/ai-proxy/src/protocols/mod.rs index 736750d..3c19f44 100644 --- a/plugins/ai-proxy/src/protocols/mod.rs +++ b/plugins/ai-proxy/src/protocols/mod.rs @@ -6,3 +6,4 @@ //! layer up in [`crate::dispatch`] so they're shared across protocols. pub mod chat_completion; +pub mod responses; diff --git a/plugins/ai-proxy/src/protocols/responses.rs b/plugins/ai-proxy/src/protocols/responses.rs new file mode 100644 index 0000000..e2dc3ca --- /dev/null +++ b/plugins/ai-proxy/src/protocols/responses.rs @@ -0,0 +1,956 @@ +//! OpenAI Responses API protocol adapter (ADR-0030 §2). +//! +//! Path: `POST /v1/responses`. Stateless only — the gateway never persists +//! conversation state, so: +//! - `previous_response_id` is rejected with 400 `previous_response_id_not_supported`. +//! - `store` is permissive: `true` / `false` / absent all flow through. +//! `store ≠ false` adds a `Warning: 299` header and increments a counter +//! (`barbacane_plugin_ai_proxy_responses_store_downgrades_total`) so +//! operators can quantify stateful-API usage. +//! 
- The synthetic `id` on the response is `resp_<uuid-v7>` — time-ordered +//! so logs grep chronologically without a separate sort key. +//! +//! Per-provider behavior: +//! - **OpenAI**: passthrough at `/v1/responses` upstream. +//! - **Anthropic**: translate `input[]` items ↔ Messages API `content` blocks. +//! - **Ollama**: 400 `responses_not_supported_for_provider` (Ollama's +//! OpenAI-compat surface is Chat Completions only as of 2026-04). +//! +//! Streaming: SSE is buffered into a single terminal event for the Anthropic +//! path (mirrors ADR-0024 Chat Completions until true SSE translation lands). +//! The OpenAI passthrough streams normally via `host_http_stream`. + +use crate::providers::openai::{openai_headers, openai_url}; +use crate::{ + error_response, host, http_call, host_http_stream, AiProxy, HttpRequest, Provider, Response, + TargetConfig, +}; +use barbacane_plugin_sdk::prelude::*; +use std::collections::BTreeMap; + +// --------------------------------------------------------------------------- +// Preflight: the cheap, pre-target-resolution checks +// --------------------------------------------------------------------------- + +/// Spec-level validation that runs before target resolution. Reject +/// `previous_response_id` (the only genuinely stateful feature a client can +/// invoke) so operators see 400 at the surface where the client is trying to +/// use state the gateway can't provide. +pub(crate) struct ResponsesPreflight { + /// True when the client sent `store: true` (or omitted it — server-side + /// default per OpenAI). Used downstream to attach a `Warning: 299` header + /// and increment the downgrade counter. + pub store_downgrade: bool, +} + +impl ResponsesPreflight { + pub(crate) fn from_body(body: &Option<Vec<u8>>) -> Result<Self, Response> { + let raw = body.as_deref().unwrap_or(b"{}"); + let v: serde_json::Value = match serde_json::from_slice(raw) { + Ok(v) => v, + // Malformed JSON — let downstream handlers handle their own + // shape errors (the dispatch layer doesn't try to parse here). + Err(_) => return Ok(Self { store_downgrade: false }), + }; + + if v.get("previous_response_id").is_some() + && !matches!(v.get("previous_response_id"), Some(serde_json::Value::Null)) + { + return Err(previous_response_id_not_supported_response()); + } + + // OpenAI defaults `store` to true server-side. Treat `Some(true)` and + // a missing field as downgrade-required; only an explicit `store: false` + // skips the warning. + let store_downgrade = match v.get("store") { + Some(serde_json::Value::Bool(false)) => false, + _ => true, + }; + + Ok(Self { store_downgrade }) + } +} + +// --------------------------------------------------------------------------- +// Per-protocol handler invoked by `dispatch_with_handler` +// --------------------------------------------------------------------------- + +pub(crate) fn handle( + plugin: &AiProxy, + target: &TargetConfig, + req: &Request, + client_model: &str, + streaming: bool, +) -> Result<Response, String> { + match target.provider { + Provider::OpenAI => openai_passthrough(plugin, target, req, streaming), + Provider::Ollama => Ok(responses_not_supported_for_provider_response("ollama")), + Provider::Anthropic => { + // Re-parse the body for translation. The preflight already + // confirmed previous_response_id is absent and captured the + // store flag, but the translator needs the full document.
+ let raw = req.body.as_deref().unwrap_or(b"{}"); + let body: serde_json::Value = serde_json::from_slice(raw) + .map_err(|e| format!("invalid Responses request body: {}", e))?; + + let preflight = ResponsesPreflight::from_body(&req.body) + .expect("preflight already ran in dispatch"); + + let translation = ResponsesToAnthropic::translate( + &body, + client_model, + streaming, + plugin.max_tokens, + )?; + + // Buffered Anthropic call — true SSE translation deferred per + // ADR-0030 §2; mirror the Chat Completions buffering behavior. + if streaming { + host::log_warn( + "ai-proxy: Anthropic streaming for Responses not yet supported; buffering response", + ); + } + let raw_resp = plugin.anthropic_messages_call_raw(target, translation.body.as_bytes())?; + + // 4xx/5xx pass through as-is (don't mangle upstream errors). + if !(200..300).contains(&raw_resp.status) { + return Ok(raw_resp); + } + + let body_str = raw_resp.body_str().unwrap_or("").to_string(); + let translated = AnthropicToResponses::translate( + &body_str, + client_model, + preflight.store_downgrade, + translation.dropped_reasoning_count, + )?; + + // Annotate the response with Warning headers + emit counters so + // operators can quantify both "this client sends store: true" and + // "this client sends reasoning items we dropped". + let mut headers = raw_resp.headers; + attach_warnings(&mut headers, preflight.store_downgrade, translation.dropped_reasoning_count > 0); + + if preflight.store_downgrade { + host::metric_counter_inc( + "responses_store_downgrades_total", + &crate::labels1("provider", target.provider.name()), + 1, + ); + } + if translation.dropped_reasoning_count > 0 { + host::metric_counter_inc( + "responses_reasoning_dropped_total", + &crate::labels1("provider", target.provider.name()), + translation.dropped_reasoning_count as u64, + ); + } + + Ok(Response { + status: raw_resp.status, + headers, + body: Some(translated.into_bytes()), + }) + } + } +} + +// --------------------------------------------------------------------------- +// OpenAI passthrough — the wire format already matches `/v1/responses`, +// so this is just an HTTP call against `${base_url}/v1/responses`. +// Streaming is delegated to `host_http_stream` like Chat Completions. +// --------------------------------------------------------------------------- + +fn openai_passthrough( + plugin: &AiProxy, + target: &TargetConfig, + req: &Request, + streaming: bool, +) -> Result<Response, String> { + let url = openai_url(target, &req.path); + let mut headers = openai_headers(target); + if streaming { + headers.insert("accept".to_string(), "text/event-stream".to_string()); + } + + if let Some(b) = req.body.as_ref() { + set_http_request_body(b); + } + + let http_req = HttpRequest { + method: req.method.clone(), + url, + headers, + timeout_ms: Some(plugin.timeout * 1000), + }; + + if streaming { + let req_json = serde_json::to_vec(&http_req).map_err(|e| e.to_string())?; + let result = + unsafe { host_http_stream(req_json.as_ptr() as i32, req_json.len() as i32) }; + if result < 0 { + return Err("upstream stream failed".to_string()); + } + Ok(streamed_response()) + } else { + let resp_bytes = http_call(&http_req)?; + Ok(crate::build_response(resp_bytes)) + } +} + +// --------------------------------------------------------------------------- +// Responses → Anthropic translation +// --------------------------------------------------------------------------- + +/// Result of translating a Responses-format request body into the equivalent +/// Anthropic Messages body.
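+/// (`instructions` hoists into Anthropic's top-level `system`; `input[]` items
+/// flatten into role-coalesced `messages[]` content blocks.)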
+pub(crate) struct ResponsesToAnthropic { + /// JSON body to send to Anthropic's `/v1/messages`. + pub body: String, + /// Number of `reasoning` items that were dropped during translation. + /// Anthropic doesn't accept client-supplied reasoning input — silently + /// dropping them can degrade output quality on multi-turn agent flows, + /// so the dispatcher emits a `Warning: 299` header and a counter + /// whenever this is non-zero. + pub dropped_reasoning_count: usize, +} + +impl ResponsesToAnthropic { + pub(crate) fn translate( + responses: &serde_json::Value, + client_model: &str, + stream: bool, + default_max_tokens: Option<u32>, + ) -> Result<Self, String> { + let input_items = responses + .get("input") + .and_then(|v| v.as_array()) + .ok_or("Responses request must include `input` array")?; + + let mut system_parts: Vec<String> = Vec::new(); + if let Some(instructions) = responses.get("instructions").and_then(|v| v.as_str()) { + // Responses API's `instructions` field maps to Anthropic's + // top-level `system` field. + system_parts.push(instructions.to_string()); + } + + let mut messages: Vec<serde_json::Value> = Vec::new(); + let mut current_role: Option<String> = None; + let mut current_blocks: Vec<serde_json::Value> = Vec::new(); + let mut dropped_reasoning_count = 0usize; + + // Helper: flush the buffer into a message when the role changes. + let flush = + |role: Option<&String>, + blocks: &mut Vec<serde_json::Value>, + messages: &mut Vec<serde_json::Value>| { + if let Some(r) = role { + if !blocks.is_empty() { + messages.push(serde_json::json!({ + "role": r, + "content": std::mem::take(blocks), + })); + } + } + }; + + for item in input_items { + let item_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); + let role = item.get("role").and_then(|v| v.as_str()).unwrap_or("user"); + + match item_type { + "input_text" | "" => { + // Plain message (no `type`) or explicitly `input_text`. + let content = item.get("content").or_else(|| item.get("text")); + let blocks = build_text_or_array_blocks(content); + if role == "system" { + // Hoist the text into Anthropic's system field. + for block in blocks { + if let Some(t) = block.get("text").and_then(|v| v.as_str()) { + system_parts.push(t.to_string()); + } + } + } else { + if current_role.as_deref() != Some(role) { + flush(current_role.as_ref(), &mut current_blocks, &mut messages); + current_role = Some(role.to_string()); + } + current_blocks.extend(blocks); + } + } + "input_image" => { + // `input_image` carries either `image_url` or base64 + // bytes. Anthropic accepts both via the `image` block; + // we forward whatever we got and let the upstream tell + // the client if the format is wrong. + let block = build_image_block(item); + if current_role.as_deref() != Some(role) { + flush(current_role.as_ref(), &mut current_blocks, &mut messages); + current_role = Some(role.to_string()); + } + current_blocks.push(block); + } + "function_call" => { + // Assistant-issued tool call → Anthropic `tool_use` block.
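+                    // e.g. {"type":"function_call","call_id":"call_1","name":"get_time","arguments":"{\"tz\":\"UTC\"}"}
+                    // →    {"type":"tool_use","id":"call_1","name":"get_time","input":{"tz":"UTC"}}
+                    // (`arguments` arrives as a JSON-encoded string; Anthropic takes the parsed object).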
+ let tool_use = serde_json::json!({ + "type": "tool_use", + "id": item.get("call_id").or_else(|| item.get("id")).cloned().unwrap_or(serde_json::Value::Null), + "name": item.get("name").cloned().unwrap_or(serde_json::Value::Null), + "input": item + .get("arguments") + .and_then(|v| v.as_str()) + .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()) + .unwrap_or(serde_json::Value::Object(Default::default())), + }); + if current_role.as_deref() != Some("assistant") { + flush(current_role.as_ref(), &mut current_blocks, &mut messages); + current_role = Some("assistant".to_string()); + } + current_blocks.push(tool_use); + } + "function_call_output" => { + // Client-provided tool result → Anthropic `tool_result` block. + let tool_result = serde_json::json!({ + "type": "tool_result", + "tool_use_id": item.get("call_id").cloned().unwrap_or(serde_json::Value::Null), + "content": item.get("output").cloned().unwrap_or(serde_json::Value::String(String::new())), + }); + if current_role.as_deref() != Some("user") { + flush(current_role.as_ref(), &mut current_blocks, &mut messages); + current_role = Some("user".to_string()); + } + current_blocks.push(tool_result); + } + "reasoning" => { + // Anthropic does not accept client-supplied reasoning input. + // Drop, count, and signal via Warning header + metric upstack. + dropped_reasoning_count += 1; + } + other => { + // Unknown item type — drop (with a warning log) rather than fail. + // Forward-compatible: a future OpenAI item type that + // Barbacane doesn't know yet shouldn't break translation. + host::log_warn(&format!( + "ai-proxy: unknown Responses input item type {:?}; dropping", + other + )); + } + } + } + flush(current_role.as_ref(), &mut current_blocks, &mut messages); + + // Anthropic requires `max_tokens`. The translator falls back to the + // dispatcher's default; if that's also unset, we use 4096 — same + // floor as the Chat Completions translator. + let max_tokens = responses + .get("max_output_tokens") + .and_then(|v| v.as_u64()) + .map(|v| v as u32) + .or(default_max_tokens) + .unwrap_or(4096); + + let mut anthropic = serde_json::Map::new(); + anthropic.insert("model".to_string(), serde_json::Value::String(client_model.to_string())); + anthropic.insert("messages".to_string(), serde_json::Value::Array(messages)); + if !system_parts.is_empty() { + anthropic.insert( + "system".to_string(), + serde_json::Value::String(system_parts.join("\n")), + ); + } + anthropic.insert( + "max_tokens".to_string(), + serde_json::Value::Number(max_tokens.into()), + ); + if let Some(t) = responses.get("temperature").cloned() { + anthropic.insert("temperature".to_string(), t); + } + if let Some(t) = responses.get("top_p").cloned() { + anthropic.insert("top_p".to_string(), t); + } + if stream { + anthropic.insert("stream".to_string(), serde_json::Value::Bool(true)); + } + + let body = serde_json::to_string(&serde_json::Value::Object(anthropic)) + .map_err(|e| e.to_string())?; + Ok(Self { + body, + dropped_reasoning_count, + }) + } +} + +/// Convert `input_text`'s `content` field into a vec of Anthropic text blocks. +/// Accepts either a plain string or the OpenAI array-of-parts shape.
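+/// e.g. `"content": "hi"` and `"content": [{"type":"input_text","text":"hi"}]`
+/// both yield `[{"type":"text","text":"hi"}]`.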
+fn build_text_or_array_blocks(value: Option<&serde_json::Value>) -> Vec<serde_json::Value> { + match value { + Some(serde_json::Value::String(s)) => { + vec![serde_json::json!({ "type": "text", "text": s })] + } + Some(serde_json::Value::Array(parts)) => parts + .iter() + .filter_map(|part| { + let pt = part.get("type").and_then(|v| v.as_str()).unwrap_or("input_text"); + match pt { + "input_text" | "text" => part + .get("text") + .and_then(|v| v.as_str()) + .map(|t| serde_json::json!({ "type": "text", "text": t })), + "input_image" => Some(build_image_block(part)), + _ => None, + } + }) + .collect(), + _ => Vec::new(), + } +} + +/// Build an Anthropic `image` content block from an OpenAI Responses +/// `input_image` item (or part). Forwards whatever URL/base64 form the client +/// sent — Anthropic returns its own error if the source isn't accepted. +fn build_image_block(part: &serde_json::Value) -> serde_json::Value { + if let Some(url) = part.get("image_url").and_then(|v| v.as_str()) { + return serde_json::json!({ + "type": "image", + "source": { "type": "url", "url": url } + }); + } + serde_json::json!({ + "type": "image", + "source": part.get("image").cloned().unwrap_or(serde_json::Value::Null), + }) +} + +// --------------------------------------------------------------------------- +// Anthropic → Responses translation +// --------------------------------------------------------------------------- + +struct AnthropicToResponses; + +impl AnthropicToResponses { + fn translate( + body: &str, + client_model: &str, + store_downgrade: bool, + dropped_reasoning_count: usize, + ) -> Result<String, String> { + let _ = (store_downgrade, dropped_reasoning_count); // signaled via headers/metrics, not body + let anthropic: serde_json::Value = + serde_json::from_str(body).map_err(|e| format!("invalid Anthropic response: {}", e))?; + + let mut output_items: Vec<serde_json::Value> = Vec::new(); + if let Some(content) = anthropic.get("content").and_then(|v| v.as_array()) { + for block in content { + let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or(""); + match block_type { + "text" => { + let text = block.get("text").and_then(|v| v.as_str()).unwrap_or(""); + output_items.push(serde_json::json!({ + "type": "output_text", + "text": text, + "annotations": [], + })); + } + "tool_use" => { + output_items.push(serde_json::json!({ + "type": "function_call", + "call_id": block.get("id"), + "name": block.get("name"), + "arguments": block.get("input").map(|v| v.to_string()), + })); + } + other => { + // Anthropic-introduced block types we don't recognize: + // pass through under a generic `unknown` shape so the + // client at least sees something rather than dropping. + output_items.push(serde_json::json!({ + "type": format!("unknown:{}", other), + "raw": block, + })); + } + } + } + } + + let input_tokens = anthropic + .get("usage") + .and_then(|u| u.get("input_tokens")) + .and_then(|v| v.as_u64()) + .unwrap_or(0); + let output_tokens = anthropic + .get("usage") + .and_then(|u| u.get("output_tokens")) + .and_then(|v| v.as_u64()) + .unwrap_or(0); + + // ADR-0030 §2: synthetic id is uuid-v7 — time-ordered, opaque, non- + // retrievable (the gateway is stateless). Built manually from the + // host clock + a per-instance counter so we don't drag a wasm32- + // unsupported RNG dep in for a non-cryptographic id.
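+        // Layout per RFC 9562 §5.7: unix_ts_ms(48) | ver(4)=0b0111 | rand_a(12) | var(2)=0b10 | rand_b(62) — rand_a/rand_b filled from the per-instance counter (see make_uuid_v7 below).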
+ let id = format!("resp_{}", make_uuid_v7()); + + let response = serde_json::json!({ + "id": id, + "object": "response", + "created_at": now_secs(), + "status": "completed", + "model": client_model, + "output": output_items, + "usage": { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + }, + }); + + serde_json::to_string(&response).map_err(|e| e.to_string()) + } +} + +fn now_secs() -> u64 { + host::time_now_ms() / 1000 +} + +/// Build a UUID v7 (RFC 9562) using the host clock for the upper 48 timestamp +/// bits and a per-instance counter for the rand_a / rand_b portions. +/// +/// The wasm32-unknown-unknown target has no system RNG, so a true random tail +/// would force pulling in `getrandom`'s JS backend (which doesn't exist in +/// the Barbacane runtime) or adding a `host_random_bytes` capability. We +/// don't need cryptographic randomness for a non-retrievable opaque +/// identifier — the v7 spec only requires monotonicity within a node, which +/// the counter satisfies. Two ids generated in the same millisecond differ +/// in their counter portion; ids in successive milliseconds order +/// chronologically by the timestamp. +fn make_uuid_v7() -> uuid::Uuid { + use std::cell::Cell; + thread_local! { + static COUNTER: Cell<u64> = const { Cell::new(0) }; + } + + let timestamp_ms = host::time_now_ms(); + let counter = COUNTER.with(|c| { + let next = c.get().wrapping_add(1); + c.set(next); + next + }); + + let mut bytes = [0u8; 16]; + // bits 0..47 — timestamp in milliseconds (big-endian, 48 bits). + bytes[0..6].copy_from_slice(&timestamp_ms.to_be_bytes()[2..8]); + // bits 48..51 — version = 0b0111 + // bits 52..63 — rand_a (12 bits) — low 12 bits of the counter. + bytes[6] = 0x70 | ((counter >> 8) & 0x0F) as u8; + bytes[7] = (counter & 0xFF) as u8; + // bits 64..65 — variant = 0b10 + // bits 66..127 — rand_b (62 bits) — filled from the counter's remaining + // bits (we don't have entropy; a counter-only tail is not + // security-critical for an opaque, non-retrievable id). + bytes[8] = 0x80 | ((counter >> 56) & 0x3F) as u8; + bytes[9] = (counter >> 48) as u8; + bytes[10] = (counter >> 40) as u8; + bytes[11] = (counter >> 32) as u8; + bytes[12] = (counter >> 24) as u8; + bytes[13] = (counter >> 16) as u8; + bytes[14] = (counter >> 8) as u8; + bytes[15] = counter as u8; + + uuid::Uuid::from_bytes(bytes) +} + +// --------------------------------------------------------------------------- +// Warning header construction +// --------------------------------------------------------------------------- + +/// Append `Warning: 299` values to the response headers per ADR-0030 §2. +/// HTTP allows multiple Warning entries; we comma-join into a single value +/// since the SDK's `Response.headers` is `BTreeMap<String, String>`.
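+/// When both fire, the merged value reads: `299 - "store ignored; gateway is stateless, see ADR-0030", 299 - "reasoning items dropped; Anthropic upstream does not accept client-supplied reasoning input"`.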
+fn attach_warnings( + headers: &mut BTreeMap<String, String>, + store_downgrade: bool, + reasoning_dropped: bool, +) { + let mut parts: Vec<&str> = Vec::new(); + if store_downgrade { + parts.push(r#"299 - "store ignored; gateway is stateless, see ADR-0030""#); + } + if reasoning_dropped { + parts.push( + r#"299 - "reasoning items dropped; Anthropic upstream does not accept client-supplied reasoning input""#, + ); + } + if parts.is_empty() { + return; + } + let merged = parts.join(", "); + headers + .entry("warning".to_string()) + .and_modify(|existing| { + existing.push_str(", "); + existing.push_str(&merged); + }) + .or_insert(merged); +} + +// --------------------------------------------------------------------------- +// problem+json error helpers +// --------------------------------------------------------------------------- + +fn previous_response_id_not_supported_response() -> Response { + let body = serde_json::json!({ + "type": "urn:barbacane:error:previous_response_id_not_supported", + "title": "Bad Request", + "status": 400, + "code": "previous_response_id_not_supported", + "detail": "ai-proxy: this gateway is stateless. \ + `previous_response_id` requires server-side conversation \ + storage that ADR-0030 §2 explicitly defers. Resend the \ + full `input[]` each turn.", + }); + let mut headers = BTreeMap::new(); + headers.insert( + "content-type".to_string(), + "application/problem+json".to_string(), + ); + Response { + status: 400, + headers, + body: Some(serde_json::to_vec(&body).unwrap_or_default()), + } +} + +fn responses_not_supported_for_provider_response(provider: &str) -> Response { + let body = serde_json::json!({ + "type": "urn:barbacane:error:responses_not_supported_for_provider", + "title": "Bad Request", + "status": 400, + "code": "responses_not_supported_for_provider", + "detail": format!( + "ai-proxy: provider {:?} does not implement the OpenAI Responses API \ + (no upstream surface to translate to or passthrough). Use \ + `/v1/chat/completions` instead, or route to OpenAI/Anthropic.", + provider + ), + }); + let mut headers = BTreeMap::new(); + headers.insert( + "content-type".to_string(), + "application/problem+json".to_string(), + ); + Response { + status: 400, + headers, + body: Some(serde_json::to_vec(&body).unwrap_or_default()), + } +} + +// Re-export so dispatch can use the same helper without duplicating the +// problem+json shape (the orchestration layer already has error_response +// for generic 5xx — these are domain-specific 400 cases). +#[allow(dead_code)] +fn _domain_errors_module_is_used(_: ()) { + // Keep the error_response import alive even when no caller uses it + // directly within this file (dispatch layer constructs the responses).
+ let _ = error_response; +} + +#[cfg(test)] +mod tests { + use super::*; + + // --- Preflight --- + + #[test] + fn preflight_passes_when_previous_response_id_absent() { + let body = Some(br#"{"input":[]}"#.to_vec()); + let pre = ResponsesPreflight::from_body(&body).expect("ok"); + assert!(pre.store_downgrade); // store omitted → downgrade + } + + #[test] + fn preflight_rejects_previous_response_id_with_400() { + let body = Some(br#"{"input":[],"previous_response_id":"resp_abc"}"#.to_vec()); + let resp = match ResponsesPreflight::from_body(&body) { + Ok(_) => panic!("must reject"), + Err(r) => r, + }; + assert_eq!(resp.status, 400); + let body: serde_json::Value = + serde_json::from_slice(resp.body.as_ref().unwrap()).unwrap(); + assert_eq!(body["code"], "previous_response_id_not_supported"); + assert_eq!( + body["type"], + "urn:barbacane:error:previous_response_id_not_supported" + ); + } + + #[test] + fn preflight_treats_explicit_null_previous_response_id_as_absent() { + // OpenAI clients sometimes serialize Optional fields as null. Don't + // 400 those — treat them as if the field weren't there. + let body = Some(br#"{"input":[],"previous_response_id":null}"#.to_vec()); + ResponsesPreflight::from_body(&body).expect("null pri must be allowed"); + } + + #[test] + fn preflight_store_true_triggers_downgrade() { + let body = Some(br#"{"input":[],"store":true}"#.to_vec()); + assert!(ResponsesPreflight::from_body(&body).unwrap().store_downgrade); + } + + #[test] + fn preflight_store_false_skips_downgrade() { + let body = Some(br#"{"input":[],"store":false}"#.to_vec()); + assert!(!ResponsesPreflight::from_body(&body).unwrap().store_downgrade); + } + + #[test] + fn preflight_store_omitted_treated_as_default_true() { + // OpenAI server-side default for store is true; clients that don't + // send it expect server-side persistence. We downgrade. 
+ let body = Some(br#"{"input":[]}"#.to_vec()); + assert!(ResponsesPreflight::from_body(&body).unwrap().store_downgrade); + } + + // --- Responses → Anthropic translation --- + + fn translate_in(json: &str) -> ResponsesToAnthropic { + let v: serde_json::Value = serde_json::from_str(json).unwrap(); + ResponsesToAnthropic::translate(&v, "claude-sonnet-4-6", false, Some(1024)) + .expect("translate") + } + + #[test] + fn translate_in_input_text_becomes_text_block() { + let res = translate_in( + r#"{"input":[{"type":"input_text","role":"user","content":"Hello"}]}"#, + ); + let body: serde_json::Value = serde_json::from_str(&res.body).unwrap(); + let messages = body["messages"].as_array().unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0]["role"], "user"); + let blocks = messages[0]["content"].as_array().unwrap(); + assert_eq!(blocks[0]["type"], "text"); + assert_eq!(blocks[0]["text"], "Hello"); + } + + #[test] + fn translate_in_instructions_hoist_to_system() { + let res = translate_in( + r#"{"instructions":"Be concise.","input":[{"type":"input_text","role":"user","content":"hi"}]}"#, + ); + let body: serde_json::Value = serde_json::from_str(&res.body).unwrap(); + assert_eq!(body["system"], "Be concise."); + } + + #[test] + fn translate_in_function_call_becomes_tool_use() { + let res = translate_in( + r#"{"input":[ + {"type":"function_call","role":"assistant","call_id":"call_1","name":"get_time","arguments":"{\"tz\":\"UTC\"}"} + ]}"#, + ); + let body: serde_json::Value = serde_json::from_str(&res.body).unwrap(); + let block = &body["messages"][0]["content"][0]; + assert_eq!(block["type"], "tool_use"); + assert_eq!(block["id"], "call_1"); + assert_eq!(block["name"], "get_time"); + assert_eq!(block["input"]["tz"], "UTC"); + } + + #[test] + fn translate_in_function_call_output_becomes_tool_result() { + let res = translate_in( + r#"{"input":[ + {"type":"function_call_output","call_id":"call_1","output":"2026-04-30T12:00:00Z"} + ]}"#, + ); + let body: serde_json::Value = serde_json::from_str(&res.body).unwrap(); + let block = &body["messages"][0]["content"][0]; + assert_eq!(block["type"], "tool_result"); + assert_eq!(block["tool_use_id"], "call_1"); + assert_eq!(block["content"], "2026-04-30T12:00:00Z"); + } + + #[test] + fn translate_in_reasoning_items_dropped_and_counted() { + let res = translate_in( + r#"{"input":[ + {"type":"reasoning","summary":"thinking..."}, + {"type":"input_text","role":"user","content":"hi"}, + {"type":"reasoning","summary":"more thinking"} + ]}"#, + ); + assert_eq!(res.dropped_reasoning_count, 2); + let body: serde_json::Value = serde_json::from_str(&res.body).unwrap(); + // Only the input_text item should remain in messages. + let blocks = body["messages"][0]["content"].as_array().unwrap(); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0]["type"], "text"); + } + + #[test] + fn translate_in_max_output_tokens_used_or_default() { + let with_explicit = translate_in( + r#"{"max_output_tokens":42,"input":[{"type":"input_text","role":"user","content":"hi"}]}"#, + ); + let v: serde_json::Value = serde_json::from_str(&with_explicit.body).unwrap(); + assert_eq!(v["max_tokens"], 42); + + let without = + translate_in(r#"{"input":[{"type":"input_text","role":"user","content":"hi"}]}"#); + let v: serde_json::Value = serde_json::from_str(&without.body).unwrap(); + // Falls back to the dispatcher's default_max_tokens=1024 (test config). 
+ assert_eq!(v["max_tokens"], 1024); + } + + #[test] + fn translate_in_uses_client_model_not_request_field() { + // Responses requests carry `model` in the body; the translator must + // use the caller-owned value plumbed in (ADR-0030 §0), not parse + // the body field separately. + let res = translate_in( + r#"{"model":"some-other-model","input":[{"type":"input_text","role":"user","content":"hi"}]}"#, + ); + let v: serde_json::Value = serde_json::from_str(&res.body).unwrap(); + assert_eq!(v["model"], "claude-sonnet-4-6"); + } + + // --- Anthropic → Responses translation --- + + fn translate_out(body: &str) -> serde_json::Value { + let s = AnthropicToResponses::translate(body, "claude-sonnet-4-6", false, 0).unwrap(); + serde_json::from_str(&s).unwrap() + } + + #[test] + fn translate_out_text_block_becomes_output_text() { + let v = translate_out( + r#"{ + "id":"msg_1","type":"message","role":"assistant","model":"claude-sonnet-4-6", + "content":[{"type":"text","text":"Hello"}], + "stop_reason":"end_turn", + "usage":{"input_tokens":3,"output_tokens":1} + }"#, + ); + assert_eq!(v["object"], "response"); + assert_eq!(v["status"], "completed"); + let item = &v["output"][0]; + assert_eq!(item["type"], "output_text"); + assert_eq!(item["text"], "Hello"); + assert!(item["annotations"].is_array()); + } + + #[test] + fn translate_out_tool_use_block_becomes_function_call() { + let v = translate_out( + r#"{ + "id":"msg_2","model":"claude-sonnet-4-6", + "content":[{"type":"tool_use","id":"call_42","name":"get_time","input":{"tz":"UTC"}}], + "usage":{"input_tokens":1,"output_tokens":1} + }"#, + ); + let item = &v["output"][0]; + assert_eq!(item["type"], "function_call"); + assert_eq!(item["call_id"], "call_42"); + assert_eq!(item["name"], "get_time"); + // Arguments are serialized as a JSON string per the OpenAI Responses + // contract — clients re-parse them. + assert!(item["arguments"].as_str().unwrap().contains("UTC")); + } + + #[test] + fn translate_out_synthetic_id_is_resp_uuid_v7() { + let v = translate_out( + r#"{"id":"msg","model":"x","content":[],"usage":{"input_tokens":0,"output_tokens":0}}"#, + ); + let id = v["id"].as_str().unwrap(); + assert!(id.starts_with("resp_"), "id should start with resp_, got {}", id); + // After the prefix is a 36-character UUID dashed-hex form. + let uuid_part = id.trim_start_matches("resp_"); + assert_eq!(uuid_part.len(), 36, "{}", uuid_part); + // Version-7 UUIDs have the version nibble (13th hex char of the + // dashed form, position 14 due to the dashes) equal to '7'. + assert_eq!( + uuid_part.as_bytes()[14], + b'7', + "expected v7 nibble, got {}", + uuid_part + ); + } + + #[test] + fn translate_out_uses_caller_supplied_model_not_anthropic_model() { + // ADR-0030 §0: ai.model and the response.model are the client's + // requested model. Even if Anthropic echoes a different model + // string, the gateway returns the caller-owned value. 
+ let v = translate_out( + r#"{"id":"msg","model":"claude-anything-anthropic-says","content":[],"usage":{"input_tokens":0,"output_tokens":0}}"#, + ); + assert_eq!(v["model"], "claude-sonnet-4-6"); + } + + #[test] + fn translate_out_usage_maps_directly() { + let v = translate_out( + r#"{"id":"msg","model":"x","content":[],"usage":{"input_tokens":42,"output_tokens":10}}"#, + ); + assert_eq!(v["usage"]["input_tokens"], 42); + assert_eq!(v["usage"]["output_tokens"], 10); + assert_eq!(v["usage"]["total_tokens"], 52); + } + + // --- Domain error responses --- + + #[test] + fn responses_not_supported_for_provider_shape() { + let resp = responses_not_supported_for_provider_response("ollama"); + assert_eq!(resp.status, 400); + let body: serde_json::Value = + serde_json::from_slice(resp.body.as_ref().unwrap()).unwrap(); + assert_eq!(body["code"], "responses_not_supported_for_provider"); + assert_eq!( + body["type"], + "urn:barbacane:error:responses_not_supported_for_provider" + ); + assert!(body["detail"].as_str().unwrap().contains("ollama")); + } + + // --- Warning header --- + + #[test] + fn warnings_attached_for_store_only() { + let mut headers: BTreeMap<String, String> = BTreeMap::new(); + attach_warnings(&mut headers, true, false); + let warning = headers.get("warning").expect("warning set"); + assert!(warning.contains("store ignored")); + assert!(!warning.contains("reasoning items dropped")); + } + + #[test] + fn warnings_attached_for_reasoning_only() { + let mut headers: BTreeMap<String, String> = BTreeMap::new(); + attach_warnings(&mut headers, false, true); + let warning = headers.get("warning").expect("warning set"); + assert!(warning.contains("reasoning items dropped")); + assert!(!warning.contains("store ignored")); + } + + #[test] + fn warnings_merged_when_both_fire() { + let mut headers: BTreeMap<String, String> = BTreeMap::new(); + attach_warnings(&mut headers, true, true); + let warning = headers.get("warning").expect("warning set"); + assert!(warning.contains("store ignored")); + assert!(warning.contains("reasoning items dropped")); + } + + #[test] + fn warnings_skipped_when_neither_fires() { + let mut headers: BTreeMap<String, String> = BTreeMap::new(); + attach_warnings(&mut headers, false, false); + assert!(headers.get("warning").is_none()); + } +} + diff --git a/plugins/ai-proxy/src/providers/anthropic.rs b/plugins/ai-proxy/src/providers/anthropic.rs index 69fdefa..d7eb13e 100644 --- a/plugins/ai-proxy/src/providers/anthropic.rs +++ b/plugins/ai-proxy/src/providers/anthropic.rs @@ -1,6 +1,7 @@ -//! Anthropic Messages API transport. The Chat Completions ↔ Messages -//! translation lives in [`crate::protocols::chat_completion`]; this module -//! only handles the wire format and request building. +//! Anthropic Messages API transport. The protocol-specific translation +//! (Chat Completions, Responses) lives in [`crate::protocols`]; this module +//! handles only the wire format — building the request, sending it, and +//! returning the raw upstream response. //! //! API version pinned to 2024-10-22 (ADR-0024 contract-test-and-bump). @@ -15,12 +16,14 @@ use std::collections::BTreeMap; pub(crate) const ANTHROPIC_API_VERSION: &str = "2024-10-22"; impl AiProxy { - pub(crate) fn anthropic_call( + /// Send a pre-built Anthropic Messages body to the target's `/v1/messages` + /// endpoint and return the raw upstream `Response` (status + headers + + /// body). Translation in/out lives in the calling protocol module — this + /// helper only handles authentication, the version header, and transport.
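+    /// Callers: `anthropic_call` below (Chat Completions) and the Anthropic arm
+    /// of `protocols::responses::handle`.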
+    pub(crate) fn anthropic_messages_call_raw(
         &self,
         target: &TargetConfig,
-        req: &Request,
-        client_model: &str,
-        stream: bool,
     ) -> Result<Response, String> {
+        body: &[u8],
         let base = target.effective_base_url().trim_end_matches('/');
         let url = format!("{}/v1/messages", base);
@@ -35,8 +38,7 @@ impl AiProxy {
             headers.insert("x-api-key".to_string(), key.clone());
         }
 
-        let body = translate_to_anthropic(&req.body, client_model, stream, self.max_tokens)?;
-        set_http_request_body(body.as_bytes());
+        set_http_request_body(body);
 
         let http_req = HttpRequest {
             method: "POST".to_string(),
@@ -46,7 +48,21 @@ impl AiProxy {
         };
 
         let resp_bytes = http_call(&http_req)?;
-        let resp = build_response(resp_bytes);
+        Ok(build_response(resp_bytes))
+    }
+
+    /// Chat Completions ↔ Messages dispatch entrypoint. Builds the Anthropic
+    /// Messages body from the OpenAI-format request, calls the upstream, and
+    /// translates the response back. Used by [`crate::protocols::chat_completion`].
+    pub(crate) fn anthropic_call(
+        &self,
+        target: &TargetConfig,
+        req: &Request,
+        client_model: &str,
+        stream: bool,
+    ) -> Result<Response, String> {
+        let body = translate_to_anthropic(&req.body, client_model, stream, self.max_tokens)?;
+        let resp = self.anthropic_messages_call_raw(target, body.as_bytes())?;
 
         // Only translate 2xx responses; pass error responses through as-is
         if resp.status >= 200 && resp.status < 300 {

From 57622c4ffc940f71f2a8d1c8fd417a951812a5b3 Mon Sep 17 00:00:00 2001
From: Nicolas Dreno
Date: Mon, 4 May 2026 15:51:22 +0200
Subject: [PATCH 2/2] review(ai-proxy): address PR review items on Responses API
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

R1 (load-bearing) — OpenAI passthrough was leaking the upstream response
`id` to the client, violating ADR-0030 §2's stateless-uniform contract. A
client could read OpenAI's real id and send it back as
`previous_response_id`, which the gateway rejects with 400 — the bug is the
inconsistency between leaking the real id and rejecting its reuse.

Fix: in the non-streaming passthrough path, parse the 2xx upstream body,
replace `id` with a synthetic `resp_<uuidv7>`, re-serialize. Streaming SSE
rewrite is harder (the id is buried in `response.created` event payloads);
marked as a known gap inline, inheriting ADR-0030 §2's existing SSE
deferral.

R2 — `AnthropicToResponses::translate` had two unused parameters
(`store_downgrade`, `dropped_reasoning_count`) carried forward "for
symmetry" with a `let _ = (...);` discard. Both signals flow through
headers/metrics, never the body. Drop the parameters; less to read, less to
wonder about.

R3 — The Anthropic path was parsing the request body 4 times: preflight,
extract_client_model, the handler's full-body translation, and again inside
`ResponsesPreflight::from_body` with `.expect("preflight already ran in
dispatch")` to recover `store_downgrade`. Stash the flag on context
(`ai.responses.store_downgrade`) in `dispatch_responses`; the handler reads
it back via `host::context_get`. Eliminates the .expect smell and one
parse. Mirrors how `ai.target` already flows from cel into ai-proxy.

R4 — `make_uuid_v7` doc-comment was over-promising "Two ids generated in
the same millisecond differ in their counter portion." Tightened to "by the
same plugin instance" + made the per-instance reset and 2^64 wrap explicit.
Both inconsequential at realistic throughput, but worth saying out loud.

R5 — `responses_not_supported_for_provider_response` took a `&str` and the
call site passed the literal `"ollama"`.
Type as `Provider`, call `.name()` for the body string. Compile-time
guarantee against typos.

R6 — `build_text_or_array_blocks` silently dropped unknown content-part
types while the top-level item handler logs a warning. Made consistent —
part-type drops also log via `host::log_warn`, so a future OpenAI part-type
that Barbacane doesn't know yet stays diagnosable.

R7 — The `flush` closure inside `ResponsesToAnthropic::translate` took
three `&mut` arguments and was invoked 4 times. Promoted to a private
`flush_message(role, blocks, messages)` helper. No semantic change. Reads
better; new tests can exercise it directly.

R8 — Added unit test for interleaved [user, assistant, user] role sequences
in `input[]` to lock in `flush_message`'s ordering — the most plausible
regression site for a future change to the translation loop. Plus a
coalescing test (consecutive same-role items land in one message) and two
flush_message direct tests (no-op on empty buffer and on missing role).

Tests: plugin 90 → 97 (+7: role-switch, coalescing, two flush_message
direct, three rewrite_response_id_if_2xx covering 2xx, 4xx pass-through,
and unparseable body pass-through). Integration 15 → 15 (the existing test
renamed to `..._rewrites_id` and now asserts the id is synthetic + not
equal to the upstream's real id, which would have caught the original bug).
---
 CHANGELOG.md                                 |   2 +-
 README.md                                    |   2 +-
 crates/barbacane-test/tests/ai_proxy.rs      |  26 +-
 plugins/ai-proxy/src/lib.rs                  |  14 +-
 plugins/ai-proxy/src/protocols/responses.rs  | 269 ++++++++++++++++----
 5 files changed, 251 insertions(+), 62 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 08f08a3..a9b66dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,7 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - **Named-profile + CEL composition pattern**: all four AI middlewares read a `context_key` (default `ai.policy`, overridable) to select the active profile. A `cel` middleware upstream writes `ai.policy` via `on_match.set_context`; one CEL decision fans out to prompt strictness, token budget, redaction strictness, and the `ai-proxy` dispatcher's named targets (via `ai.target`).
 
 ### Added
+- **plugin**: `ai-proxy` `POST /v1/responses` — OpenAI Responses API support, stateless only (ADR-0030 §2). For OpenAI provider, the dispatcher passes through to the upstream `/v1/responses` and **rewrites the response `id` to a synthetic `resp_<uuidv7>`** so the gateway's stateless contract holds uniformly across providers — without this, OpenAI's real id leaks to the client and they could send it back as `previous_response_id` (which we 400 on). For Anthropic, the request is translated to Messages API: `input_text`/`input_image` → `text`/`image` content blocks, `function_call` + `function_call_output` → `tool_use` + `tool_result`, `reasoning` items are dropped (Anthropic doesn't accept client-supplied reasoning). The response is translated back to Responses shape with a synthetic time-ordered `id`. For Ollama, returns 400 `responses_not_supported_for_provider` (Ollama's OpenAI-compat surface is Chat Completions only). Streaming SSE on the OpenAI passthrough does not rewrite the in-event id — true SSE handling is deferred for both protocols (ADR-0030 §2 "Streaming").
 - **plugin**: `ai-proxy` `previous_response_id` returns 400 `previous_response_id_not_supported`. The stateful Responses API (`previous_response_id` + `GET /v1/responses/{id}` retrieval) requires session-scoped storage that ADR-0030 §2 explicitly defers; the rejection is the forward-compatibility hook.
 - **plugin**: `ai-proxy` `store` flag is permissive — `true`, `false`, and absent all flow through unchanged. When `store ≠ false` (most clients send `true` as an unexamined default), the dispatcher emits a `Warning: 299 - "store ignored; gateway is stateless, see ADR-0030"` header and increments `barbacane_plugin_ai_proxy_responses_store_downgrades_total`. Operators can quantify stateful-API usage and decide whether to prioritize the future session-storage capability.
 - **plugin**: `ai-proxy` `reasoning` items dropped on the Responses → Anthropic translation path emit `Warning: 299 - "reasoning items dropped..."` and increment `barbacane_plugin_ai_proxy_responses_reasoning_dropped_total`. Silent reasoning drops can degrade output quality on multi-turn agent flows in ways the client cannot detect.
diff --git a/README.md b/README.md
index f531af0..d978572 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@
 CI
 Documentation
 Unit Tests
-Plugin Tests
+Plugin Tests
 Integration Tests
 CLI Tests
 UI Tests
diff --git a/crates/barbacane-test/tests/ai_proxy.rs b/crates/barbacane-test/tests/ai_proxy.rs
index 586c03c..5069f0f 100644
--- a/crates/barbacane-test/tests/ai_proxy.rs
+++ b/crates/barbacane-test/tests/ai_proxy.rs
@@ -671,17 +671,21 @@ paths:
 }
 
 #[tokio::test]
-async fn test_ai_proxy_responses_openai_passthrough() {
-    // Mock the canonical OpenAI Responses endpoint upstream and verify the
-    // gateway just passes through (no translation for the OpenAI provider).
+async fn test_ai_proxy_responses_openai_passthrough_rewrites_id() {
+    // ADR-0030 §2 — the gateway is uniformly stateless. Even on the OpenAI
+    // passthrough path we must rewrite the upstream `id` to a synthetic
+    // `resp_<uuidv7>`; otherwise OpenAI's real id leaks to the client and
+    // they could send it back as `previous_response_id` (which we 400 on).
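+    // (The mock upstream below returns a fixed, recognizable id so the
+    // assertions can prove the gateway minted a fresh synthetic one instead
+    // of echoing it.)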
     let mock_server = MockServer::start().await;
+    let upstream_id = "resp_real_openai_should_not_leak";
 
     Mock::given(method("POST"))
         .and(path("/v1/responses"))
         .respond_with(
             ResponseTemplate::new(200)
-                .set_body_string(
-                    r#"{"id":"resp_abc","object":"response","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}"#,
-                )
+                .set_body_string(format!(
+                    r#"{{"id":"{}","object":"response","output":[],"usage":{{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}}"#,
+                    upstream_id
+                ))
                 .insert_header("content-type", "application/json"),
         )
         .expect(1)
@@ -703,6 +707,16 @@
     assert_eq!(resp.status(), 200);
     let body: serde_json::Value = resp.json().await.unwrap();
     assert_eq!(body["object"], "response");
+    let id = body["id"].as_str().unwrap();
+    assert!(
+        id.starts_with("resp_"),
+        "id should be a synthetic resp_<uuidv7>: {}",
+        id
+    );
+    assert_ne!(
+        id, upstream_id,
+        "upstream id leaked to client — gateway is no longer stateless"
+    );
 }
 
 #[tokio::test]
diff --git a/plugins/ai-proxy/src/lib.rs b/plugins/ai-proxy/src/lib.rs
index 4217a63..778075d 100644
--- a/plugins/ai-proxy/src/lib.rs
+++ b/plugins/ai-proxy/src/lib.rs
@@ -257,11 +257,17 @@ impl AiProxy {
     fn dispatch_responses(&mut self, req: Request) -> Response {
         // Reject `previous_response_id` and parse `store` upfront — this is
         // the spec-level surface the gateway can validate before resolving a
-        // target, and it produces stable 4xx responses.
-        match protocols::responses::ResponsesPreflight::from_body(&req.body) {
-            Ok(_) => {}
+        // target, and it produces stable 4xx responses. Stash the
+        // `store_downgrade` flag on context so the protocol handler can read
+        // it back without re-parsing the body.
+        let preflight = match protocols::responses::ResponsesPreflight::from_body(&req.body) {
+            Ok(p) => p,
             Err(resp) => return resp,
-        }
+        };
+        host::context_set(
+            protocols::responses::CTX_STORE_DOWNGRADE,
+            if preflight.store_downgrade { "true" } else { "false" },
+        );
 
         let client_model = match extract_client_model(&req.body) {
             Some(m) => m,
diff --git a/plugins/ai-proxy/src/protocols/responses.rs b/plugins/ai-proxy/src/protocols/responses.rs
index e2dc3ca..c53d73f 100644
--- a/plugins/ai-proxy/src/protocols/responses.rs
+++ b/plugins/ai-proxy/src/protocols/responses.rs
@@ -28,6 +28,12 @@ use crate::{
 use barbacane_plugin_sdk::prelude::*;
 use std::collections::BTreeMap;
 
+/// Context key that carries the preflight `store_downgrade` flag from
+/// `dispatch_responses` (where the body was first parsed) into [`handle`]
+/// (where the response is built). Avoids a second parse of the inbound body
+/// solely to recover this one boolean.
+pub(crate) const CTX_STORE_DOWNGRADE: &str = "ai.responses.store_downgrade";
+
 // ---------------------------------------------------------------------------
 // Preflight: the cheap, pre-target-resolution checks
 // ---------------------------------------------------------------------------
@@ -84,17 +90,19 @@ pub(crate) fn handle(
 ) -> Result<Response, String> {
     match target.provider {
         Provider::OpenAI => openai_passthrough(plugin, target, req, streaming),
-        Provider::Ollama => Ok(responses_not_supported_for_provider_response("ollama")),
+        Provider::Ollama => Ok(responses_not_supported_for_provider_response(Provider::Ollama)),
         Provider::Anthropic => {
-            // Re-parse the body for translation. The preflight already
-            // confirmed previous_response_id is absent and captured the
-            // store flag, but the translator needs the full document.
+            // Parse the body for translation. The preflight already confirmed
+            // `previous_response_id` is absent and stashed `store_downgrade`
+            // on context (read back below), so this is the only parse we
+            // need on the Anthropic path.
             let raw = req.body.as_deref().unwrap_or(b"{}");
             let body: serde_json::Value = serde_json::from_slice(raw)
                 .map_err(|e| format!("invalid Responses request body: {}", e))?;
 
-            let preflight = ResponsesPreflight::from_body(&req.body)
-                .expect("preflight already ran in dispatch");
+            let store_downgrade = host::context_get(CTX_STORE_DOWNGRADE)
+                .map(|v| v == "true")
+                .unwrap_or(true);
 
             let translation = ResponsesToAnthropic::translate(
                 &body,
@@ -118,20 +126,15 @@
             }
 
             let body_str = raw_resp.body_str().unwrap_or("").to_string();
-            let translated = AnthropicToResponses::translate(
-                &body_str,
-                client_model,
-                preflight.store_downgrade,
-                translation.dropped_reasoning_count,
-            )?;
+            let translated = AnthropicToResponses::translate(&body_str, client_model)?;
 
             // Annotate the response with Warning headers + emit counters so
             // operators can quantify both "this client sends store: true" and
             // "this client sends reasoning items we dropped".
             let mut headers = raw_resp.headers;
-            attach_warnings(&mut headers, preflight.store_downgrade, translation.dropped_reasoning_count > 0);
+            attach_warnings(&mut headers, store_downgrade, translation.dropped_reasoning_count > 0);
 
-            if preflight.store_downgrade {
+            if store_downgrade {
                 host::metric_counter_inc(
                     "responses_store_downgrades_total",
                     &crate::labels1("provider", target.provider.name()),
@@ -185,6 +188,12 @@ fn openai_passthrough(
     };
 
     if streaming {
+        // Known gap: ADR-0030 §2 requires the response `id` to be a synthetic
+        // `resp_<uuidv7>` so the gateway's stateless contract holds uniformly.
+        // For non-streaming we rewrite the id post-call (below); for streaming
+        // the id is buried in `response.created` SSE event payloads which
+        // we'd need to parse and rewrite mid-stream. True SSE handling is
+        // already deferred for both protocols — see ADR-0030 §2 "Streaming".
         let req_json = serde_json::to_vec(&http_req).map_err(|e| e.to_string())?;
         let result =
             unsafe { host_http_stream(req_json.as_ptr() as i32, req_json.len() as i32) };
@@ -194,7 +203,42 @@
         Ok(streamed_response())
     } else {
         let resp_bytes = http_call(&http_req)?;
-        Ok(crate::build_response(resp_bytes))
+        let resp = crate::build_response(resp_bytes);
+        // Stateless contract: rewrite the upstream `id` to a synthetic
+        // `resp_<uuidv7>`. Without this, OpenAI's real id leaks to the
+        // client, who could then send it back as `previous_response_id`
+        // and get 400 — the rejection lands consistently for every provider.
+        Ok(rewrite_response_id_if_2xx(resp))
+    }
+}
+
+/// On a 2xx upstream Responses payload, replace the `id` field with a
+/// synthetic `resp_<uuidv7>`. Non-2xx and unparseable bodies pass through
+/// untouched (errors carry their own shape; we don't risk mangling them).
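+///
+/// Outcome sketch (statuses and bodies illustrative):
+///
+/// ```ignore
+/// // 200 + {"id": "resp_real", ...}  -> id replaced with resp_<uuidv7>
+/// // 400 + {"error": {...}}          -> returned untouched
+/// // 200 + "not even json"           -> returned untouched
+/// ```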
+fn rewrite_response_id_if_2xx(resp: Response) -> Response {
+    if !(200..300).contains(&resp.status) {
+        return resp;
+    }
+    let body_bytes = match resp.body.as_ref() {
+        Some(b) => b,
+        None => return resp,
+    };
+    let mut v: serde_json::Value = match serde_json::from_slice(body_bytes) {
+        Ok(v) => v,
+        Err(_) => return resp,
+    };
+    if let Some(obj) = v.as_object_mut() {
+        obj.insert(
+            "id".to_string(),
+            serde_json::Value::String(format!("resp_{}", make_uuid_v7())),
+        );
+    } else {
+        return resp;
+    }
+    Response {
+        status: resp.status,
+        headers: resp.headers,
+        body: Some(serde_json::to_vec(&v).unwrap_or(body_bytes.clone())),
+    }
+}
@@ -239,21 +283,6 @@ impl ResponsesToAnthropic {
         let mut current_blocks: Vec<serde_json::Value> = Vec::new();
         let mut dropped_reasoning_count = 0usize;
 
-        // Helper: flush the buffer into a message when the role changes.
-        let flush =
-            |role: Option<&String>,
-             blocks: &mut Vec<serde_json::Value>,
-             messages: &mut Vec<serde_json::Value>| {
-                if let Some(r) = role {
-                    if !blocks.is_empty() {
-                        messages.push(serde_json::json!({
-                            "role": r,
-                            "content": std::mem::take(blocks),
-                        }));
-                    }
-                }
-            };
-
         for item in input_items {
             let item_type = item.get("type").and_then(|v| v.as_str()).unwrap_or("");
             let role = item.get("role").and_then(|v| v.as_str()).unwrap_or("user");
@@ -272,7 +301,7 @@
                 }
             } else {
                 if current_role.as_deref() != Some(role) {
-                    flush(current_role.as_ref(), &mut current_blocks, &mut messages);
+                    flush_message(current_role.as_deref(), &mut current_blocks, &mut messages);
                     current_role = Some(role.to_string());
                 }
                 current_blocks.extend(blocks);
@@ -285,7 +314,7 @@
             // the client if the format is wrong.
             let block = build_image_block(item);
             if current_role.as_deref() != Some(role) {
-                flush(current_role.as_ref(), &mut current_blocks, &mut messages);
+                flush_message(current_role.as_deref(), &mut current_blocks, &mut messages);
                 current_role = Some(role.to_string());
             }
             current_blocks.push(block);
@@ -303,7 +332,7 @@
                     .unwrap_or(serde_json::Value::Object(Default::default())),
             });
             if current_role.as_deref() != Some("assistant") {
-                flush(current_role.as_ref(), &mut current_blocks, &mut messages);
+                flush_message(current_role.as_deref(), &mut current_blocks, &mut messages);
                 current_role = Some("assistant".to_string());
             }
             current_blocks.push(tool_use);
@@ -316,7 +345,7 @@
                 "content": item.get("output").cloned().unwrap_or(serde_json::Value::String(String::new())),
             });
             if current_role.as_deref() != Some("user") {
-                flush(current_role.as_ref(), &mut current_blocks, &mut messages);
+                flush_message(current_role.as_deref(), &mut current_blocks, &mut messages);
                 current_role = Some("user".to_string());
             }
             current_blocks.push(tool_result);
@@ -337,7 +366,7 @@
                 }
             }
         }
-        flush(current_role.as_ref(), &mut current_blocks, &mut messages);
+        flush_message(current_role.as_deref(), &mut current_blocks, &mut messages);
 
         // Anthropic requires `max_tokens`. The translator falls back to the
         // dispatcher's default; if that's also unset, we use 4096 — same
@@ -381,6 +410,27 @@
     }
 }
 
+/// Push a `{role, content}` message into `messages` if there are any blocks
+/// buffered for the current role, and clear the buffer. Used by the
+/// translator to flush out a message when the role changes.
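+///
+/// Worked example (blocks invented): a role sequence [user, user, assistant]
+/// flushes once at the role switch and once after the loop, yielding
+/// `{role: "user", content: [b1, b2]}` then `{role: "assistant", content: [b3]}`.
+/// A `None` role or an empty buffer makes it a no-op, so the translation loop
+/// can call it unconditionally.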
+fn flush_message(
+    role: Option<&str>,
+    blocks: &mut Vec<serde_json::Value>,
+    messages: &mut Vec<serde_json::Value>,
+) {
+    let r = match role {
+        Some(r) => r,
+        None => return,
+    };
+    if blocks.is_empty() {
+        return;
+    }
+    messages.push(serde_json::json!({
+        "role": r,
+        "content": std::mem::take(blocks),
+    }));
+}
+
 /// Convert `input_text`'s `content` field into a vec of Anthropic text blocks.
 /// Accepts either a plain string or the OpenAI array-of-parts shape.
 fn build_text_or_array_blocks(value: Option<&serde_json::Value>) -> Vec<serde_json::Value> {
@@ -398,7 +448,17 @@ fn build_text_or_array_blocks(value: Option<&serde_json::Value>) -> Vec<serde_json::Value>
                     Some(build_image_block(part)),
-                    _ => None,
+                    other => {
+                        // Match the top-level item handler — unknown part-types
+                        // log a warning rather than dropping silently. Future
+                        // OpenAI part-types we don't know yet stay diagnosable
+                        // without breaking translation.
+                        host::log_warn(&format!(
+                            "ai-proxy: unknown Responses content part type {:?}; dropping",
+                            other
+                        ));
+                        None
+                    }
                 }
             })
             .collect(),
@@ -429,13 +489,12 @@ fn build_image_block(part: &serde_json::Value) -> serde_json::Value {
 struct AnthropicToResponses;
 
 impl AnthropicToResponses {
-    fn translate(
-        body: &str,
-        client_model: &str,
-        store_downgrade: bool,
-        dropped_reasoning_count: usize,
-    ) -> Result<String, String> {
-        let _ = (store_downgrade, dropped_reasoning_count); // signaled via headers/metrics, not body
+    /// Translate a 2xx Anthropic Messages-format response body into an
+    /// equivalent OpenAI Responses payload. Warning headers and counters for
+    /// `store_downgrade` and dropped reasoning items are surfaced by the
+    /// caller via [`attach_warnings`] / `metric_counter_inc`; nothing about
+    /// them flows into the response body.
+    fn translate(body: &str, client_model: &str) -> Result<String, String> {
         let anthropic: serde_json::Value =
             serde_json::from_str(body).map_err(|e| format!("invalid Anthropic response: {}", e))?;
 
@@ -520,9 +579,15 @@ fn now_secs() -> u64 {
 /// the Barbacane runtime) or adding a `host_random_bytes` capability. We
 /// don't need cryptographic randomness for a non-retrievable opaque
 /// identifier — the v7 spec only requires monotonicity within a node, which
-/// the counter satisfies. Two ids generated in the same millisecond differ
-/// in their counter portion; ids in successive milliseconds order
-/// chronologically by the timestamp.
+/// the counter satisfies.
+///
+/// **Scope.** The counter is per-plugin-instance and resets to 0 on instance
+/// restart; it wraps with `wrapping_add` at 2^64. Two ids generated in the
+/// same millisecond **by the same plugin instance** differ in their counter
+/// portion. Two instances starting in the same millisecond can theoretically
+/// collide on early ids, and a single instance running 1.8×10^19 requests
+/// would collide on wrap — both inconsequential for a non-retrievable
+/// opaque tracking handle, but worth knowing.
 fn make_uuid_v7() -> uuid::Uuid {
     use std::cell::Cell;
     thread_local! {
@@ -620,7 +685,7 @@ fn previous_response_id_not_supported_response() -> Response {
     }
 }
 
-fn responses_not_supported_for_provider_response(provider: &str) -> Response {
+fn responses_not_supported_for_provider_response(provider: Provider) -> Response {
     let body = serde_json::json!({
         "type": "urn:barbacane:error:responses_not_supported_for_provider",
         "title": "Bad Request",
@@ -630,7 +695,7 @@ fn responses_not_supported_for_provider_response(provider: &str) -> Response {
         "detail": format!(
             "ai-proxy: provider {:?} does not implement the OpenAI Responses API \
             (no upstream surface to translate to or passthrough). Use \
            `/v1/chat/completions` instead, or route to OpenAI/Anthropic.",
-            provider
+            provider.name()
         ),
     });
     let mut headers = BTreeMap::new();
@@ -817,10 +882,114 @@ mod tests {
         assert_eq!(v["model"], "claude-sonnet-4-6");
     }
 
+    #[test]
+    fn translate_in_interleaved_roles_preserve_order() {
+        // R8: the role-switching `flush_message` logic is the most plausible
+        // regression site. Send an interleaved [user, assistant, user]
+        // sequence and verify each segment becomes its own message in order
+        // — not collapsed or reordered.
+        let res = translate_in(
+            r#"{"input":[
+                {"type":"input_text","role":"user","content":"first user"},
+                {"type":"input_text","role":"assistant","content":"first assistant"},
+                {"type":"input_text","role":"user","content":"second user"}
+            ]}"#,
+        );
+        let body: serde_json::Value = serde_json::from_str(&res.body).unwrap();
+        let messages = body["messages"].as_array().unwrap();
+        assert_eq!(messages.len(), 3);
+        assert_eq!(messages[0]["role"], "user");
+        assert_eq!(messages[0]["content"][0]["text"], "first user");
+        assert_eq!(messages[1]["role"], "assistant");
+        assert_eq!(messages[1]["content"][0]["text"], "first assistant");
+        assert_eq!(messages[2]["role"], "user");
+        assert_eq!(messages[2]["content"][0]["text"], "second user");
+    }
+
+    #[test]
+    fn translate_in_consecutive_same_role_items_coalesce() {
+        // The flip side of the role-switch test: two `user` items in a row
+        // should land in the same message's `content` array, not produce two
+        // separate messages.
+        let res = translate_in(
+            r#"{"input":[
+                {"type":"input_text","role":"user","content":"part one"},
+                {"type":"input_text","role":"user","content":"part two"}
+            ]}"#,
+        );
+        let body: serde_json::Value = serde_json::from_str(&res.body).unwrap();
+        let messages = body["messages"].as_array().unwrap();
+        assert_eq!(messages.len(), 1);
+        let blocks = messages[0]["content"].as_array().unwrap();
+        assert_eq!(blocks.len(), 2);
+        assert_eq!(blocks[0]["text"], "part one");
+        assert_eq!(blocks[1]["text"], "part two");
+    }
+
+    #[test]
+    fn flush_message_helper_is_a_no_op_with_empty_buffer() {
+        let mut blocks: Vec<serde_json::Value> = Vec::new();
+        let mut messages: Vec<serde_json::Value> = Vec::new();
+        flush_message(Some("user"), &mut blocks, &mut messages);
+        assert!(messages.is_empty());
+    }
+
+    #[test]
+    fn flush_message_helper_is_a_no_op_with_no_role() {
+        let mut blocks = vec![serde_json::json!({"type":"text","text":"orphaned"})];
+        let mut messages: Vec<serde_json::Value> = Vec::new();
+        flush_message(None, &mut blocks, &mut messages);
+        assert!(messages.is_empty());
+    }
+
+    // --- R1: id rewriter on the OpenAI passthrough path ---
+
+    #[test]
+    fn rewrite_response_id_replaces_2xx_id_with_synthetic() {
+        let upstream = Response {
+            status: 200,
+            headers: BTreeMap::new(),
+            body: Some(br#"{"id":"resp_real_openai","object":"response"}"#.to_vec()),
+        };
+        let out = rewrite_response_id_if_2xx(upstream);
+        let body: serde_json::Value = serde_json::from_slice(out.body.as_ref().unwrap()).unwrap();
+        let id = body["id"].as_str().unwrap();
+        assert!(id.starts_with("resp_"), "{}", id);
+        assert_ne!(id, "resp_real_openai");
+        assert_eq!(body["object"], "response");
+    }
+
+    #[test]
+    fn rewrite_response_id_passes_4xx_through_unchanged() {
+        // Don't mangle upstream errors — they have their own shape.
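+        // (For an OpenAI-style upstream that's typically an {"error": {...}}
+        // envelope with no top-level `id` to rewrite in the first place.)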
+ let upstream = Response { + status: 400, + headers: BTreeMap::new(), + body: Some(br#"{"error":{"code":"invalid_request","message":"bad"}}"#.to_vec()), + }; + let out = rewrite_response_id_if_2xx(upstream); + let body: serde_json::Value = serde_json::from_slice(out.body.as_ref().unwrap()).unwrap(); + assert_eq!(body["error"]["code"], "invalid_request"); + assert!(body.get("id").is_none()); + } + + #[test] + fn rewrite_response_id_passes_unparseable_body_through_unchanged() { + // Some upstreams (or middleboxes) return non-JSON. Don't fail — + // pass through and let the client see what we saw. + let upstream = Response { + status: 200, + headers: BTreeMap::new(), + body: Some(b"not even json".to_vec()), + }; + let out = rewrite_response_id_if_2xx(upstream); + assert_eq!(out.body.as_ref().unwrap(), b"not even json"); + } + // --- Anthropic → Responses translation --- fn translate_out(body: &str) -> serde_json::Value { - let s = AnthropicToResponses::translate(body, "claude-sonnet-4-6", false, 0).unwrap(); + let s = AnthropicToResponses::translate(body, "claude-sonnet-4-6").unwrap(); serde_json::from_str(&s).unwrap() } @@ -905,7 +1074,7 @@ mod tests { #[test] fn responses_not_supported_for_provider_shape() { - let resp = responses_not_supported_for_provider_response("ollama"); + let resp = responses_not_supported_for_provider_response(Provider::Ollama); assert_eq!(resp.status, 400); let body: serde_json::Value = serde_json::from_slice(resp.body.as_ref().unwrap()).unwrap();