Merged
29 changes: 27 additions & 2 deletions internal/proxy/server.go
@@ -1937,7 +1937,7 @@ func (e *sseUsageExtractor) processChunk(data []byte) {
 		evType, _ := ev["type"].(string)
 		switch evType {
 		case "message_start":
-			// {"type":"message_start","message":{"usage":{"input_tokens":N}}}
+			// Anthropic: {"type":"message_start","message":{"usage":{"input_tokens":N}}}
 			if msg, ok := ev["message"].(map[string]interface{}); ok {
 				if u, ok := msg["usage"].(map[string]interface{}); ok {
 					if v, ok := u["input_tokens"].(float64); ok {
@@ -1946,12 +1946,37 @@ func (e *sseUsageExtractor) processChunk(data []byte) {
 				}
 			}
 		case "message_delta":
-			// {"type":"message_delta","usage":{"output_tokens":N}}
+			// Anthropic: {"type":"message_delta","usage":{"output_tokens":N}}
 			if u, ok := ev["usage"].(map[string]interface{}); ok {
 				if v, ok := u["output_tokens"].(float64); ok {
 					e.outputTok += int(v)
 				}
 			}
+		case "response.completed":
+			// Responses API: {"type":"response.completed","response":{"usage":{...}}}
+			if resp, ok := ev["response"].(map[string]interface{}); ok {
+				if u, ok := resp["usage"].(map[string]interface{}); ok {
+					if v, ok := u["input_tokens"].(float64); ok {
+						e.inputTok += int(v)
+					}
+					if v, ok := u["output_tokens"].(float64); ok {
+						e.outputTok += int(v)
+					}
+				}
+			}
+		default:
+			// OpenAI Chat Completions chunks have no "type" field; usage appears
+			// in the final chunk as top-level {"usage":{"prompt_tokens":N,"completion_tokens":M}}.
+			if evType == "" {
+				if u, ok := ev["usage"].(map[string]interface{}); ok {
+					if v, ok := u["prompt_tokens"].(float64); ok {
+						e.inputTok += int(v)
+					}
+					if v, ok := u["completion_tokens"].(float64); ok {
+						e.outputTok += int(v)
+					}
+				}
+			}
 		}
 	}
 	e.partial = buf
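As a reviewer aid, here is a minimal standalone sketch (not part of this PR) of how the three chunk shapes the extractor now recognizes map onto token counts. The payloads are abbreviated samples modeled on the tests below, and the usage helper is hypothetical:

package main

import (
	"encoding/json"
	"fmt"
)

// usage walks a decoded event along path and returns the token count found
// at the end, or 0 if any step is missing.
func usage(ev map[string]interface{}, path ...string) int {
	var cur interface{} = ev
	for _, key := range path {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return 0
		}
		cur = m[key]
	}
	f, _ := cur.(float64)
	return int(f)
}

func main() {
	var in, out int
	for _, raw := range []string{
		// Anthropic: input tokens on message_start, output on message_delta.
		`{"type":"message_start","message":{"usage":{"input_tokens":10}}}`,
		`{"type":"message_delta","usage":{"output_tokens":5}}`,
		// Responses API: both counts arrive on the final response.completed.
		`{"type":"response.completed","response":{"usage":{"input_tokens":30,"output_tokens":15}}}`,
		// OpenAI Chat Completions: no "type" field; usage rides the last chunk.
		`{"usage":{"prompt_tokens":20,"completion_tokens":8}}`,
	} {
		var ev map[string]interface{}
		if err := json.Unmarshal([]byte(raw), &ev); err != nil {
			continue
		}
		switch ev["type"] {
		case "message_start":
			in += usage(ev, "message", "usage", "input_tokens")
		case "message_delta":
			out += usage(ev, "usage", "output_tokens")
		case "response.completed":
			in += usage(ev, "response", "usage", "input_tokens")
			out += usage(ev, "response", "usage", "output_tokens")
		case nil:
			in += usage(ev, "usage", "prompt_tokens")
			out += usage(ev, "usage", "completion_tokens")
		}
	}
	fmt.Printf("input=%d output=%d\n", in, out) // input=60 output=28
}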
63 changes: 63 additions & 0 deletions internal/proxy/server_test.go
@@ -3382,6 +3382,69 @@ func TestSSEUsageExtractor(t *testing.T) {
 		io.ReadAll(extractor)
 	})
 
+	t.Run("extracts_usage_from_responses_api_sse", func(t *testing.T) {
+		sse := strings.Join([]string{
+			`data: {"type":"response.created","response":{"id":"resp_1","status":"in_progress"}}`,
+			``,
+			`data: {"type":"response.output_text.delta","delta":"Hi"}`,
+			``,
+			`data: {"type":"response.completed","response":{"status":"completed","usage":{"input_tokens":30,"output_tokens":15}}}`,
+			``,
+		}, "\n")
+
+		sessionID := "test-sse-responses-api"
+		ClearSessionUsage(sessionID)
+		extractor := &sseUsageExtractor{
+			r: io.NopCloser(strings.NewReader(sse)),
+			sessionID: sessionID,
+		}
+		if _, err := io.ReadAll(extractor); err != nil {
+			t.Fatalf("unexpected read error: %v", err)
+		}
+		got := GetSessionUsage(sessionID)
+		if got == nil {
+			t.Fatal("expected session usage to be updated, got nil")
+		}
+		if got.InputTokens != 30 {
+			t.Errorf("InputTokens = %d, want 30", got.InputTokens)
+		}
+		if got.OutputTokens != 15 {
+			t.Errorf("OutputTokens = %d, want 15", got.OutputTokens)
+		}
+	})
+
+	t.Run("extracts_usage_from_openai_chat_sse", func(t *testing.T) {
+		// OpenAI Chat final chunk has no "type" field; usage at top level
+		sse := strings.Join([]string{
+			`data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hi"}}]}`,
+			``,
+			`data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"finish_reason":"stop"}],"usage":{"prompt_tokens":20,"completion_tokens":8,"total_tokens":28}}`,
+			``,
+			`data: [DONE]`,
+			``,
+		}, "\n")
+
+		sessionID := "test-sse-openai-chat"
+		ClearSessionUsage(sessionID)
+		extractor := &sseUsageExtractor{
+			r: io.NopCloser(strings.NewReader(sse)),
+			sessionID: sessionID,
+		}
+		if _, err := io.ReadAll(extractor); err != nil {
+			t.Fatalf("unexpected read error: %v", err)
+		}
+		got := GetSessionUsage(sessionID)
+		if got == nil {
+			t.Fatal("expected session usage to be updated, got nil")
+		}
+		if got.InputTokens != 20 {
+			t.Errorf("InputTokens = %d, want 20", got.InputTokens)
+		}
+		if got.OutputTokens != 8 {
+			t.Errorf("OutputTokens = %d, want 8", got.OutputTokens)
+		}
+	})
+
 	t.Run("close_delegates_to_inner", func(t *testing.T) {
 		extractor := &sseUsageExtractor{
 			r: io.NopCloser(strings.NewReader("")),
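To exercise just the extractor cases above in isolation (the command assumes the repository's standard Go module layout):

go test ./internal/proxy -run TestSSEUsageExtractor -v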
135 changes: 87 additions & 48 deletions internal/proxy/transform/stream.go
@@ -1101,11 +1101,20 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io
 
 	var currentEvent string
 	var dataBuffer bytes.Buffer
-	// Use a deterministic ID prefix; real responses will come from the event data.
+	// Fallback ID; overwritten from the first response.created event.
 	completionID := fmt.Sprintf("chatcmpl-%d", time.Now().UnixNano())
 	model := st.Model
 	created := time.Now().Unix()
 
+	// Per-tool-call metadata captured from response.output_item.added.
+	// Key = output_index (int), value = {callID, name, headerSent}.
+	type toolMeta struct {
+		callID string
+		name string
+		headerSent bool
+	}
+	toolCalls := make(map[int]*toolMeta)
+
 	emitChunk := func(delta map[string]interface{}, finishReason interface{}) {
 		choice := map[string]interface{}{
 			"index": 0,
@@ -1126,6 +1135,27 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io
 		fmt.Fprintf(w, "data: %s\n\n", data)
 	}
 
+	processCompleted := func(eventData map[string]interface{}) {
+		finishReason := "stop"
+		if resp, ok := eventData["response"].(map[string]interface{}); ok {
+			if status, ok := resp["status"].(string); ok && status == "incomplete" {
+				finishReason = "length"
+			}
+			if output, ok := resp["output"].([]interface{}); ok {
+				for _, item := range output {
+					if itemMap, ok := item.(map[string]interface{}); ok {
+						if itemMap["type"] == "function_call" {
+							finishReason = "tool_calls"
+							break
+						}
+					}
+				}
+			}
+		}
+		emitChunk(map[string]interface{}{}, finishReason)
+		fmt.Fprintf(w, "data: [DONE]\n\n")
+	}
+
 	for scanner.Scan() {
 		line := scanner.Text()
 
Expand Down Expand Up @@ -1155,27 +1185,72 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io

switch currentEvent {
case "response.created", "response.in_progress":
// Emit role delta at stream start
if id, ok := eventData["id"].(string); ok {
completionID = "chatcmpl-" + id
}
if m, ok := eventData["model"].(string); ok {
model = m
// id and model are inside the nested "response" object
if resp, ok := eventData["response"].(map[string]interface{}); ok {
if id, ok := resp["id"].(string); ok {
completionID = "chatcmpl-" + id
}
if m, ok := resp["model"].(string); ok {
model = m
}
}
emitChunk(map[string]interface{}{"role": "assistant", "content": ""}, nil)

case "response.output_item.added":
// Capture function_call metadata so we can emit proper headers later.
if item, ok := eventData["item"].(map[string]interface{}); ok {
if item["type"] == "function_call" {
idx := 0
if v, ok := eventData["output_index"].(float64); ok {
idx = int(v)
}
toolCalls[idx] = &toolMeta{
callID: fmt.Sprintf("%v", item["call_id"]),
name: fmt.Sprintf("%v", item["name"]),
}
}
}

case "response.output_text.delta":
if delta, ok := eventData["delta"].(string); ok && delta != "" {
emitChunk(map[string]interface{}{"content": delta}, nil)
}

case "response.function_call_arguments.delta":
// Streaming tool call arguments
if delta, ok := eventData["delta"].(string); ok && delta != "" {
idx := 0
if v, ok := eventData["output_index"].(float64); ok {
idx = int(v)
}
delta, _ := eventData["delta"].(string)

tc := toolCalls[idx]
if tc == nil {
tc = &toolMeta{}
toolCalls[idx] = tc
}

if !tc.headerSent {
// First delta for this tool call: emit id, type, function.name
tc.headerSent = true
emitChunk(map[string]interface{}{
"tool_calls": []interface{}{
map[string]interface{}{
"index": idx,
"id": tc.callID,
"type": "function",
"function": map[string]interface{}{
"name": tc.name,
"arguments": delta,
},
},
},
}, nil)
} else if delta != "" {
// Subsequent deltas: arguments only
emitChunk(map[string]interface{}{
"tool_calls": []interface{}{
map[string]interface{}{
"index": 0,
"index": idx,
"function": map[string]interface{}{
"arguments": delta,
},
Expand All @@ -1185,26 +1260,7 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io
}

case "response.completed":
// Determine finish reason from completed response
finishReason := "stop"
if resp, ok := eventData["response"].(map[string]interface{}); ok {
if status, ok := resp["status"].(string); ok && status == "incomplete" {
finishReason = "length"
}
// Check for tool use in output
if output, ok := resp["output"].([]interface{}); ok {
for _, item := range output {
if itemMap, ok := item.(map[string]interface{}); ok {
if itemMap["type"] == "function_call" {
finishReason = "tool_calls"
break
}
}
}
}
}
emitChunk(map[string]interface{}{}, finishReason)
fmt.Fprintf(w, "data: [DONE]\n\n")
processCompleted(eventData)
}

currentEvent = ""
@@ -1214,24 +1270,7 @@ func (st *StreamTransformer) transformResponsesAPIToOpenAIChat(r io.Reader, w io
 	if dataBuffer.Len() > 0 && currentEvent == "response.completed" {
 		var eventData map[string]interface{}
 		if json.Unmarshal(dataBuffer.Bytes(), &eventData) == nil {
-			finishReason := "stop"
-			if resp, ok := eventData["response"].(map[string]interface{}); ok {
-				if status, ok := resp["status"].(string); ok && status == "incomplete" {
-					finishReason = "length"
-				}
-				if output, ok := resp["output"].([]interface{}); ok {
-					for _, item := range output {
-						if itemMap, ok := item.(map[string]interface{}); ok {
-							if itemMap["type"] == "function_call" {
-								finishReason = "tool_calls"
-								break
-							}
-						}
-					}
-				}
-			}
-			emitChunk(map[string]interface{}{}, finishReason)
-			fmt.Fprintf(w, "data: [DONE]\n\n")
+			processCompleted(eventData)
 		}
 	}
 
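To make the new tool-call path concrete, a sketch of the chunk sequence the transformer should now emit for the tool-call stream in the test below (abbreviated: the object, created, model, and null finish_reason fields are elided, whitespace illustrative):

data: {"id":"chatcmpl-resp_tc1","choices":[{"index":0,"delta":{"role":"assistant","content":""}}]}
data: {"id":"chatcmpl-resp_tc1","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_abc","type":"function","function":{"name":"get_weather","arguments":"{\"loc"}}]}}]}
data: {"id":"chatcmpl-resp_tc1","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"NYC\"}"}}]}}]}
data: {"id":"chatcmpl-resp_tc1","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}
data: [DONE]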
26 changes: 25 additions & 1 deletion internal/proxy/transform/stream_test.go
@@ -1210,6 +1210,14 @@ func TestTransformResponsesAPIToOpenAIChat_Text(t *testing.T) {
 	if !strings.Contains(result, `"chat.completion.chunk"`) {
 		t.Error("should emit chat.completion.chunk objects")
 	}
+	// id should be derived from response.id ("chatcmpl-resp_chat1")
+	if !strings.Contains(result, `chatcmpl-resp_chat1`) {
+		t.Errorf("should use id from response object, got: %s", result)
+	}
+	// model should come from response.model
+	if !strings.Contains(result, `"gpt-5"`) {
+		t.Errorf("should use model from response object, got: %s", result)
+	}
 	if !strings.Contains(result, `"Hello"`) {
 		t.Error("should include first delta text")
 	}
@@ -1229,11 +1237,17 @@ func TestTransformResponsesAPIToOpenAIChat_ToolCall(t *testing.T) {
 		`event: response.created`,
 		`data: {"type":"response.created","response":{"id":"resp_tc1","status":"in_progress","model":"gpt-5","output":[]}}`,
 		``,
+		`event: response.output_item.added`,
+		`data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","id":"fc_1","call_id":"call_abc","name":"get_weather"}}`,
+		``,
+		`event: response.function_call_arguments.delta`,
+		`data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","output_index":0,"delta":"{\"loc"}`,
+		``,
+		`event: response.function_call_arguments.delta`,
+		`data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","output_index":0,"delta":"\":\"NYC\"}"}`,
+		``,
 		`event: response.completed`,
-		`data: {"type":"response.completed","response":{"id":"resp_tc1","status":"completed","model":"gpt-5","output":[{"type":"function_call","id":"fc_1","call_id":"call_1","name":"weather"}]}}`,
+		`data: {"type":"response.completed","response":{"id":"resp_tc1","status":"completed","model":"gpt-5","output":[{"type":"function_call","id":"fc_1","call_id":"call_abc","name":"get_weather"}]}}`,
 		``,
 	}, "\n")
 
@@ -1251,7 +1265,17 @@ func TestTransformResponsesAPIToOpenAIChat_ToolCall(t *testing.T) {
 	if !strings.Contains(result, `"tool_calls"`) {
 		t.Error("should emit tool_calls delta")
 	}
+	// First delta should include call_id and function name
+	if !strings.Contains(result, `"call_abc"`) {
+		t.Errorf("should include tool call id from output_item.added, got: %s", result)
+	}
+	if !strings.Contains(result, `"get_weather"`) {
+		t.Errorf("should include function name from output_item.added, got: %s", result)
+	}
 	if !strings.Contains(result, `"tool_calls"`) || !strings.Contains(result, `"finish_reason"`) {
 		t.Error("should emit finish chunk with tool_calls finish_reason")
 	}
+	if !strings.Contains(result, "data: [DONE]") {
+		t.Error("should emit [DONE] sentinel")
+	}
 }
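As with the extractor tests, these can be run on their own under the same layout assumption:

go test ./internal/proxy/transform -run TestTransformResponsesAPIToOpenAIChat -v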