From b68014219be4bd50242c8cc31cae526a86d1ea34 Mon Sep 17 00:00:00 2001 From: John Zhang Date: Wed, 11 Mar 2026 22:42:33 +0800 Subject: [PATCH 1/2] fix: correct streaming id/model extraction and multi-format usage tracking - stream.go: extract id/model from nested response object in response.created/in_progress events (was reading wrong top-level key) - stream.go: capture call_id/name from response.output_item.added so first tool_calls delta includes proper id and function.name header - stream.go: emit full tool call header (id, type, function.name, first args) on first delta; subsequent deltas carry args only - server.go: sseUsageExtractor.processChunk now handles Responses API response.completed events and OpenAI Chat final chunks (no "type" field, usage at top level with prompt_tokens/completion_tokens) - tests: assert id and model fields in streaming chunks; add response.output_item.added to tool call test; cover Responses API and OpenAI Chat usage extraction in sseUsageExtractor tests Co-Authored-By: Claude Sonnet 4.6 --- internal/proxy/server.go | 29 ++++- internal/proxy/server_test.go | 63 +++++++++++ internal/proxy/transform/stream.go | 135 +++++++++++++++--------- internal/proxy/transform/stream_test.go | 26 ++++- 4 files changed, 202 insertions(+), 51 deletions(-) diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 6a0e84b..77e49ac 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -1937,7 +1937,7 @@ func (e *sseUsageExtractor) processChunk(data []byte) { evType, _ := ev["type"].(string) switch evType { case "message_start": - // {"type":"message_start","message":{"usage":{"input_tokens":N}}} + // Anthropic: {"type":"message_start","message":{"usage":{"input_tokens":N}}} if msg, ok := ev["message"].(map[string]interface{}); ok { if u, ok := msg["usage"].(map[string]interface{}); ok { if v, ok := u["input_tokens"].(float64); ok { @@ -1946,12 +1946,37 @@ func (e *sseUsageExtractor) processChunk(data []byte) { } } case "message_delta": - // {"type":"message_delta","usage":{"output_tokens":N}} + // Anthropic: {"type":"message_delta","usage":{"output_tokens":N}} if u, ok := ev["usage"].(map[string]interface{}); ok { if v, ok := u["output_tokens"].(float64); ok { e.outputTok += int(v) } } + case "response.completed": + // Responses API: {"type":"response.completed","response":{"usage":{...}}} + if resp, ok := ev["response"].(map[string]interface{}); ok { + if u, ok := resp["usage"].(map[string]interface{}); ok { + if v, ok := u["input_tokens"].(float64); ok { + e.inputTok += int(v) + } + if v, ok := u["output_tokens"].(float64); ok { + e.outputTok += int(v) + } + } + } + default: + // OpenAI Chat Completions chunks have no "type" field; usage appears + // in the final chunk as top-level {"usage":{"prompt_tokens":N,"completion_tokens":M}}. + if evType == "" { + if u, ok := ev["usage"].(map[string]interface{}); ok { + if v, ok := u["prompt_tokens"].(float64); ok { + e.inputTok += int(v) + } + if v, ok := u["completion_tokens"].(float64); ok { + e.outputTok += int(v) + } + } + } } } e.partial = buf diff --git a/internal/proxy/server_test.go b/internal/proxy/server_test.go index 6029f9e..2f378bc 100644 --- a/internal/proxy/server_test.go +++ b/internal/proxy/server_test.go @@ -3382,6 +3382,69 @@ func TestSSEUsageExtractor(t *testing.T) { io.ReadAll(extractor) }) + t.Run("extracts_usage_from_responses_api_sse", func(t *testing.T) { + sse := strings.Join([]string{ + `data: {"type":"response.created","response":{"id":"resp_1","status":"in_progress"}}`, + ``, + `data: {"type":"response.output_text.delta","delta":"Hi"}`, + ``, + `data: {"type":"response.completed","response":{"status":"completed","usage":{"input_tokens":30,"output_tokens":15}}}`, + ``, + }, "\n") + + sessionID := "test-sse-responses-api" + ClearSessionUsage(sessionID) + extractor := &sseUsageExtractor{ + r: io.NopCloser(strings.NewReader(sse)), + sessionID: sessionID, + } + if _, err := io.ReadAll(extractor); err != nil { + t.Fatalf("unexpected read error: %v", err) + } + got := GetSessionUsage(sessionID) + if got == nil { + t.Fatal("expected session usage to be updated, got nil") + } + if got.InputTokens != 30 { + t.Errorf("InputTokens = %d, want 30", got.InputTokens) + } + if got.OutputTokens != 15 { + t.Errorf("OutputTokens = %d, want 15", got.OutputTokens) + } + }) + + t.Run("extracts_usage_from_openai_chat_sse", func(t *testing.T) { + // OpenAI Chat final chunk has no "type" field; usage at top level + sse := strings.Join([]string{ + `data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hi"}}]}`, + ``, + `data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"finish_reason":"stop"}],"usage":{"prompt_tokens":20,"completion_tokens":8,"total_tokens":28}}`, + ``, + `data: [DONE]`, + ``, + }, "\n") + + sessionID := "test-sse-openai-chat" + ClearSessionUsage(sessionID) + extractor := &sseUsageExtractor{ + r: io.NopCloser(strings.NewReader(sse)), + sessionID: sessionID, + } + if _, err := io.ReadAll(extractor); err != nil { + t.Fatalf("unexpected read error: %v", err) + } + got := GetSessionUsage(sessionID) + if got == nil { + t.Fatal("expected session usage to be updated, got nil") + } + if got.InputTokens != 20 { + t.Errorf("InputTokens = %d, want 20", got.InputTokens) + } + if got.OutputTokens != 8 { + t.Errorf("OutputTokens = %d, want 8", got.OutputTokens) + } + }) + t.Run("close_delegates_to_inner", func(t *testing.T) { extractor := &sseUsageExtractor{ r: io.NopCloser(strings.NewReader("")), diff --git a/internal/proxy/transform/stream.go b/internal/proxy/transform/stream.go index a6965e1..e71a17c 100644 --- a/internal/proxy/transform/stream.go +++ b/internal/proxy/transform/stream.go @@ -1101,11 +1101,20 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io var currentEvent string var dataBuffer bytes.Buffer - // Use a deterministic ID prefix; real responses will come from the event data. + // Fallback ID; overwritten from the first response.created event. completionID := fmt.Sprintf("chatcmpl-%d", time.Now().UnixNano()) model := st.Model created := time.Now().Unix() + // Per-tool-call metadata captured from response.output_item.added. + // Key = output_index (int), value = {callID, name, headerSent}. + type toolMeta struct { + callID string + name string + headerSent bool + } + toolCalls := make(map[int]*toolMeta) + emitChunk := func(delta map[string]interface{}, finishReason interface{}) { choice := map[string]interface{}{ "index": 0, @@ -1126,6 +1135,27 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io fmt.Fprintf(w, "data: %s\n\n", data) } + processCompleted := func(eventData map[string]interface{}) { + finishReason := "stop" + if resp, ok := eventData["response"].(map[string]interface{}); ok { + if status, ok := resp["status"].(string); ok && status == "incomplete" { + finishReason = "length" + } + if output, ok := resp["output"].([]interface{}); ok { + for _, item := range output { + if itemMap, ok := item.(map[string]interface{}); ok { + if itemMap["type"] == "function_call" { + finishReason = "tool_calls" + break + } + } + } + } + } + emitChunk(map[string]interface{}{}, finishReason) + fmt.Fprintf(w, "data: [DONE]\n\n") + } + for scanner.Scan() { line := scanner.Text() @@ -1155,27 +1185,72 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io switch currentEvent { case "response.created", "response.in_progress": - // Emit role delta at stream start - if id, ok := eventData["id"].(string); ok { - completionID = "chatcmpl-" + id - } - if m, ok := eventData["model"].(string); ok { - model = m + // id and model are inside the nested "response" object + if resp, ok := eventData["response"].(map[string]interface{}); ok { + if id, ok := resp["id"].(string); ok { + completionID = "chatcmpl-" + id + } + if m, ok := resp["model"].(string); ok { + model = m + } } emitChunk(map[string]interface{}{"role": "assistant", "content": ""}, nil) + case "response.output_item.added": + // Capture function_call metadata so we can emit proper headers later. + if item, ok := eventData["item"].(map[string]interface{}); ok { + if item["type"] == "function_call" { + idx := 0 + if v, ok := eventData["output_index"].(float64); ok { + idx = int(v) + } + toolCalls[idx] = &toolMeta{ + callID: fmt.Sprintf("%v", item["call_id"]), + name: fmt.Sprintf("%v", item["name"]), + } + } + } + case "response.output_text.delta": if delta, ok := eventData["delta"].(string); ok && delta != "" { emitChunk(map[string]interface{}{"content": delta}, nil) } case "response.function_call_arguments.delta": - // Streaming tool call arguments - if delta, ok := eventData["delta"].(string); ok && delta != "" { + idx := 0 + if v, ok := eventData["output_index"].(float64); ok { + idx = int(v) + } + delta, _ := eventData["delta"].(string) + + tc := toolCalls[idx] + if tc == nil { + tc = &toolMeta{} + toolCalls[idx] = tc + } + + if !tc.headerSent { + // First delta for this tool call: emit id, type, function.name + tc.headerSent = true + emitChunk(map[string]interface{}{ + "tool_calls": []interface{}{ + map[string]interface{}{ + "index": idx, + "id": tc.callID, + "type": "function", + "function": map[string]interface{}{ + "name": tc.name, + "arguments": delta, + }, + }, + }, + }, nil) + } else if delta != "" { + // Subsequent deltas: arguments only emitChunk(map[string]interface{}{ "tool_calls": []interface{}{ map[string]interface{}{ - "index": 0, + "index": idx, "function": map[string]interface{}{ "arguments": delta, }, @@ -1185,26 +1260,7 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io } case "response.completed": - // Determine finish reason from completed response - finishReason := "stop" - if resp, ok := eventData["response"].(map[string]interface{}); ok { - if status, ok := resp["status"].(string); ok && status == "incomplete" { - finishReason = "length" - } - // Check for tool use in output - if output, ok := resp["output"].([]interface{}); ok { - for _, item := range output { - if itemMap, ok := item.(map[string]interface{}); ok { - if itemMap["type"] == "function_call" { - finishReason = "tool_calls" - break - } - } - } - } - } - emitChunk(map[string]interface{}{}, finishReason) - fmt.Fprintf(w, "data: [DONE]\n\n") + processCompleted(eventData) } currentEvent = "" @@ -1214,24 +1270,7 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io if dataBuffer.Len() > 0 && currentEvent == "response.completed" { var eventData map[string]interface{} if json.Unmarshal(dataBuffer.Bytes(), &eventData) == nil { - finishReason := "stop" - if resp, ok := eventData["response"].(map[string]interface{}); ok { - if status, ok := resp["status"].(string); ok && status == "incomplete" { - finishReason = "length" - } - if output, ok := resp["output"].([]interface{}); ok { - for _, item := range output { - if itemMap, ok := item.(map[string]interface{}); ok { - if itemMap["type"] == "function_call" { - finishReason = "tool_calls" - break - } - } - } - } - } - emitChunk(map[string]interface{}{}, finishReason) - fmt.Fprintf(w, "data: [DONE]\n\n") + processCompleted(eventData) } } diff --git a/internal/proxy/transform/stream_test.go b/internal/proxy/transform/stream_test.go index 17cf0fc..c5613f2 100644 --- a/internal/proxy/transform/stream_test.go +++ b/internal/proxy/transform/stream_test.go @@ -1210,6 +1210,14 @@ func TestTransformResponsesAPIToOpenAIChat_Text(t *testing.T) { if !strings.Contains(result, `"chat.completion.chunk"`) { t.Error("should emit chat.completion.chunk objects") } + // id should be derived from response.id ("chatcmpl-resp_chat1") + if !strings.Contains(result, `chatcmpl-resp_chat1`) { + t.Errorf("should use id from response object, got: %s", result) + } + // model should come from response.model + if !strings.Contains(result, `"gpt-5"`) { + t.Errorf("should use model from response object, got: %s", result) + } if !strings.Contains(result, `"Hello"`) { t.Error("should include first delta text") } @@ -1229,11 +1237,17 @@ func TestTransformResponsesAPIToOpenAIChat_ToolCall(t *testing.T) { `event: response.created`, `data: {"type":"response.created","response":{"id":"resp_tc1","status":"in_progress","model":"gpt-5","output":[]}}`, ``, + `event: response.output_item.added`, + `data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","id":"fc_1","call_id":"call_abc","name":"get_weather"}}`, + ``, `event: response.function_call_arguments.delta`, `data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","output_index":0,"delta":"{\"loc"}`, ``, + `event: response.function_call_arguments.delta`, + `data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","output_index":0,"delta":"\":\"NYC\"}"}`, + ``, `event: response.completed`, - `data: {"type":"response.completed","response":{"id":"resp_tc1","status":"completed","model":"gpt-5","output":[{"type":"function_call","id":"fc_1","call_id":"call_1","name":"weather"}]}}`, + `data: {"type":"response.completed","response":{"id":"resp_tc1","status":"completed","model":"gpt-5","output":[{"type":"function_call","id":"fc_1","call_id":"call_abc","name":"get_weather"}]}}`, ``, }, "\n") @@ -1251,7 +1265,17 @@ func TestTransformResponsesAPIToOpenAIChat_ToolCall(t *testing.T) { if !strings.Contains(result, `"tool_calls"`) { t.Error("should emit tool_calls delta") } + // First delta should include call_id and function name + if !strings.Contains(result, `"call_abc"`) { + t.Errorf("should include tool call id from output_item.added, got: %s", result) + } + if !strings.Contains(result, `"get_weather"`) { + t.Errorf("should include function name from output_item.added, got: %s", result) + } if !strings.Contains(result, `"tool_calls"`) || !strings.Contains(result, `"finish_reason"`) { t.Error("should emit finish chunk with tool_calls finish_reason") } + if !strings.Contains(result, "data: [DONE]") { + t.Error("should emit [DONE] sentinel") + } } From 9202d61d42ab5f6900674838d60154b8381727f6 Mon Sep 17 00:00:00 2001 From: John Zhang Date: Thu, 12 Mar 2026 09:16:39 +0800 Subject: [PATCH 2/2] fix: guard against duplicate assistant role chunk in responses->openai-chat stream Both response.created and response.in_progress events triggered the initial role delta emit. Add roleSent guard so only the first of these events emits the chunk; subsequent ones still update id/model. Add TestTransformResponsesAPIToOpenAIChat_NoDuplicateRoleChunk to verify only one role:assistant chunk appears when both events arrive. Co-Authored-By: Claude Sonnet 4.6 --- internal/proxy/transform/stream.go | 8 +++++- internal/proxy/transform/stream_test.go | 35 +++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/internal/proxy/transform/stream.go b/internal/proxy/transform/stream.go index e71a17c..4052b75 100644 --- a/internal/proxy/transform/stream.go +++ b/internal/proxy/transform/stream.go @@ -1114,6 +1114,9 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io headerSent bool } toolCalls := make(map[int]*toolMeta) + // Guard against duplicate role chunks when both response.created and + // response.in_progress arrive for the same stream. + roleSent := false emitChunk := func(delta map[string]interface{}, finishReason interface{}) { choice := map[string]interface{}{ @@ -1194,7 +1197,10 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io model = m } } - emitChunk(map[string]interface{}{"role": "assistant", "content": ""}, nil) + if !roleSent { + roleSent = true + emitChunk(map[string]interface{}{"role": "assistant", "content": ""}, nil) + } case "response.output_item.added": // Capture function_call metadata so we can emit proper headers later. diff --git a/internal/proxy/transform/stream_test.go b/internal/proxy/transform/stream_test.go index c5613f2..dd17274 100644 --- a/internal/proxy/transform/stream_test.go +++ b/internal/proxy/transform/stream_test.go @@ -1279,3 +1279,38 @@ func TestTransformResponsesAPIToOpenAIChat_ToolCall(t *testing.T) { t.Error("should emit [DONE] sentinel") } } + +func TestTransformResponsesAPIToOpenAIChat_NoDuplicateRoleChunk(t *testing.T) { + // When both response.created and response.in_progress arrive, only one + // assistant role chunk should be emitted. + input := strings.Join([]string{ + `event: response.created`, + `data: {"type":"response.created","response":{"id":"resp_dup1","status":"in_progress","model":"gpt-5","output":[]}}`, + ``, + `event: response.in_progress`, + `data: {"type":"response.in_progress","response":{"id":"resp_dup1","status":"in_progress","model":"gpt-5","output":[]}}`, + ``, + `event: response.output_text.delta`, + `data: {"type":"response.output_text.delta","output_index":0,"content_index":0,"delta":"Hi"}`, + ``, + `event: response.completed`, + `data: {"type":"response.completed","response":{"id":"resp_dup1","status":"completed","model":"gpt-5","output":[]}}`, + ``, + }, "\n") + + st := &StreamTransformer{ + ClientFormat: FormatOpenAIChat, + ProviderFormat: FormatOpenAIResponses, + } + reader := st.TransformSSEStream(strings.NewReader(input)) + output, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Count how many times a role:assistant delta is emitted + roleCount := strings.Count(string(output), `"role":"assistant"`) + if roleCount != 1 { + t.Errorf("expected exactly 1 role:assistant chunk, got %d; output:\n%s", roleCount, string(output)) + } +}