diff --git a/.gitignore b/.gitignore index 75f5c463387..d2c752024bf 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,7 @@ data/ token_estimator_test.go skills-lock.json .playwright-mcp + +# Local-only live probes and scratch test workspaces. +.local-tests/ +service/openaicompat/chat_responses_live_local_test.go diff --git a/dto/openai_response.go b/dto/openai_response.go index 0e6b818dbd8..e503f5f91c6 100644 --- a/dto/openai_response.go +++ b/dto/openai_response.go @@ -334,7 +334,7 @@ func (o *OpenAIResponsesResponse) GetSize() string { } type IncompleteDetails struct { - Reasoning string `json:"reasoning"` + Reason string `json:"reason"` } type ResponsesOutput struct { diff --git a/relay/channel/openai/chat_via_responses.go b/relay/channel/openai/chat_via_responses.go index 2c0752275da..8eb4821ce53 100644 --- a/relay/channel/openai/chat_via_responses.go +++ b/relay/channel/openai/chat_via_responses.go @@ -1,6 +1,7 @@ package openai import ( + "bufio" "fmt" "io" "net/http" @@ -13,31 +14,12 @@ import ( relaycommon "github.com/QuantumNous/new-api/relay/common" "github.com/QuantumNous/new-api/relay/helper" "github.com/QuantumNous/new-api/service" + "github.com/QuantumNous/new-api/service/openaicompat" "github.com/QuantumNous/new-api/types" "github.com/gin-gonic/gin" ) -func responsesStreamIndexKey(itemID string, idx *int) string { - if itemID == "" { - return "" - } - if idx == nil { - return itemID - } - return fmt.Sprintf("%s:%d", itemID, *idx) -} - -func stringDeltaFromPrefix(prev string, next string) string { - if next == "" { - return "" - } - if prev != "" && strings.HasPrefix(next, prev) { - return next[len(prev):] - } - return next -} - func OaiResponsesToChatHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) { if resp == nil || resp.Body == nil { return nil, types.NewOpenAIError(fmt.Errorf("invalid response"), types.ErrorCodeBadResponse, http.StatusInternalServerError) @@ -90,6 +72,109 @@ func OaiResponsesToChatHandler(c *gin.Context, info *relaycommon.RelayInfo, resp return usage, nil } +func OaiResponsesToChatBufferedStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) { + if resp == nil || resp.Body == nil { + return nil, types.NewOpenAIError(fmt.Errorf("invalid response"), types.ErrorCodeBadResponse, http.StatusInternalServerError) + } + defer service.CloseResponseBodyGracefully(resp) + + accumulator := openaicompat.NewResponsesBufferedAccumulator() + var finalResponse *dto.OpenAIResponsesResponse + var streamErr *types.NewAPIError + + scanner := helper.NewStreamScanner(resp.Body) + scanner.Split(bufio.ScanLines) + for scanner.Scan() { + line := scanner.Text() + if len(line) < 6 || line[:5] != "data:" { + continue + } + data := line[5:] + data = strings.TrimSpace(data) + if data == "" || data == "[DONE]" { + if data == "[DONE]" { + break + } + continue + } + + var streamResp dto.ResponsesStreamResponse + if err := common.UnmarshalJsonStr(data, &streamResp); err != nil { + logger.LogError(c, "failed to unmarshal buffered responses stream event: "+err.Error()) + streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError) + break + } + accumulator.ProcessEvent(&streamResp) + switch streamResp.Type { + case "response.completed", "response.done", "response.incomplete": + finalResponse = streamResp.Response + if streamResp.Type == "response.incomplete" { + if finalResponse == nil { + finalResponse = &dto.OpenAIResponsesResponse{} + } + if len(finalResponse.Status) == 0 { + finalResponse.Status = []byte(`"incomplete"`) + } + } + case "response.failed", "response.error": + if streamResp.Response != nil { + if oaiErr := streamResp.Response.GetOpenAIError(); oaiErr != nil && oaiErr.Type != "" { + streamErr = types.WithOpenAIError(*oaiErr, http.StatusInternalServerError) + break + } + } + streamErr = types.NewOpenAIError(fmt.Errorf("responses stream error: %s", streamResp.Type), types.ErrorCodeBadResponse, http.StatusInternalServerError) + } + if streamErr != nil || finalResponse != nil { + break + } + } + if streamErr != nil { + return nil, streamErr + } + if err := scanner.Err(); err != nil { + return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError) + } + if finalResponse == nil { + finalResponse = &dto.OpenAIResponsesResponse{ + ID: helper.GetResponseID(c), + CreatedAt: int(time.Now().Unix()), + Model: info.UpstreamModelName, + Status: []byte(`"completed"`), + } + } + accumulator.SupplementResponseOutput(finalResponse) + + chatId := helper.GetResponseID(c) + chatResp, usage, err := service.ResponsesResponseToChatCompletionsResponse(finalResponse, chatId) + if err != nil { + return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError) + } + if usage == nil || usage.TotalTokens == 0 { + text := service.ExtractOutputTextFromResponses(finalResponse) + usage = service.ResponseText2Usage(c, text, info.UpstreamModelName, info.GetEstimatePromptTokens()) + chatResp.Usage = *usage + } + + var responseBody []byte + switch info.RelayFormat { + case types.RelayFormatClaude: + claudeResp := service.ResponseOpenAI2Claude(chatResp, info) + responseBody, err = common.Marshal(claudeResp) + case types.RelayFormatGemini: + geminiResp := service.ResponseOpenAI2Gemini(chatResp, info) + responseBody, err = common.Marshal(geminiResp) + default: + responseBody, err = common.Marshal(chatResp) + } + if err != nil { + return nil, types.NewOpenAIError(err, types.ErrorCodeJsonMarshalFailed, http.StatusInternalServerError) + } + + service.IOCopyBytesGracefully(c, resp, responseBody) + return usage, nil +} + func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) { if resp == nil || resp.Body == nil { return nil, types.NewOpenAIError(fmt.Errorf("invalid response"), types.ErrorCodeBadResponse, http.StatusInternalServerError) @@ -99,44 +184,28 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo responseId := helper.GetResponseID(c) createAt := time.Now().Unix() - model := info.UpstreamModelName - - var ( - usage = &dto.Usage{} - outputText strings.Builder - usageText strings.Builder - sentStart bool - sentStop bool - sawToolCall bool - streamErr *types.NewAPIError - ) - - toolCallIndexByID := make(map[string]int) - toolCallNameByID := make(map[string]string) - toolCallArgsByID := make(map[string]string) - toolCallNameSent := make(map[string]bool) - toolCallCanonicalIDByItemID := make(map[string]string) - hasSentReasoningSummary := false - needsReasoningSummarySeparator := false - //reasoningSummaryTextByKey := make(map[string]string) + state := openaicompat.NewResponsesToChatStreamState(info.UpstreamModelName, false) + state.ID = responseId + state.Created = createAt + streamErr := (*types.NewAPIError)(nil) if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo == nil { info.ClaudeConvertInfo = &relaycommon.ClaudeConvertInfo{LastMessagesType: relaycommon.LastMessageTypeNone} } - sendChatChunk := func(chunk *dto.ChatCompletionsStreamResponse) bool { - if chunk == nil { + sendChatChunk := func(chunk dto.ChatCompletionsStreamResponse) bool { + if len(chunk.Choices) == 0 && chunk.Usage == nil { return true } if info.RelayFormat == types.RelayFormatOpenAI { - if err := helper.ObjectData(c, chunk); err != nil { + if err := helper.ObjectData(c, &chunk); err != nil { streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError) return false } return true } - chunkData, err := common.Marshal(chunk) + chunkData, err := common.Marshal(&chunk) if err != nil { streamErr = types.NewOpenAIError(err, types.ErrorCodeJsonMarshalFailed, http.StatusInternalServerError) return false @@ -148,154 +217,6 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo return true } - sendStartIfNeeded := func() bool { - if sentStart { - return true - } - if !sendChatChunk(helper.GenerateStartEmptyResponse(responseId, createAt, model, nil)) { - return false - } - sentStart = true - return true - } - - //sendReasoningDelta := func(delta string) bool { - // if delta == "" { - // return true - // } - // if !sendStartIfNeeded() { - // return false - // } - // - // usageText.WriteString(delta) - // chunk := &dto.ChatCompletionsStreamResponse{ - // Id: responseId, - // Object: "chat.completion.chunk", - // Created: createAt, - // Model: model, - // Choices: []dto.ChatCompletionsStreamResponseChoice{ - // { - // Index: 0, - // Delta: dto.ChatCompletionsStreamResponseChoiceDelta{ - // ReasoningContent: &delta, - // }, - // }, - // }, - // } - // if err := helper.ObjectData(c, chunk); err != nil { - // streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError) - // return false - // } - // return true - //} - - sendReasoningSummaryDelta := func(delta string) bool { - if delta == "" { - return true - } - if needsReasoningSummarySeparator { - if strings.HasPrefix(delta, "\n\n") { - needsReasoningSummarySeparator = false - } else if strings.HasPrefix(delta, "\n") { - delta = "\n" + delta - needsReasoningSummarySeparator = false - } else { - delta = "\n\n" + delta - needsReasoningSummarySeparator = false - } - } - if !sendStartIfNeeded() { - return false - } - - usageText.WriteString(delta) - chunk := &dto.ChatCompletionsStreamResponse{ - Id: responseId, - Object: "chat.completion.chunk", - Created: createAt, - Model: model, - Choices: []dto.ChatCompletionsStreamResponseChoice{ - { - Index: 0, - Delta: dto.ChatCompletionsStreamResponseChoiceDelta{ - ReasoningContent: &delta, - }, - }, - }, - } - if !sendChatChunk(chunk) { - return false - } - hasSentReasoningSummary = true - return true - } - - sendToolCallDelta := func(callID string, name string, argsDelta string) bool { - if callID == "" { - return true - } - if outputText.Len() > 0 { - // Prefer streaming assistant text over tool calls to match non-stream behavior. - return true - } - if !sendStartIfNeeded() { - return false - } - - idx, ok := toolCallIndexByID[callID] - if !ok { - idx = len(toolCallIndexByID) - toolCallIndexByID[callID] = idx - } - if name != "" { - toolCallNameByID[callID] = name - } - if toolCallNameByID[callID] != "" { - name = toolCallNameByID[callID] - } - - tool := dto.ToolCallResponse{ - ID: callID, - Type: "function", - Function: dto.FunctionResponse{ - Arguments: argsDelta, - }, - } - tool.SetIndex(idx) - if name != "" && !toolCallNameSent[callID] { - tool.Function.Name = name - toolCallNameSent[callID] = true - } - - chunk := &dto.ChatCompletionsStreamResponse{ - Id: responseId, - Object: "chat.completion.chunk", - Created: createAt, - Model: model, - Choices: []dto.ChatCompletionsStreamResponseChoice{ - { - Index: 0, - Delta: dto.ChatCompletionsStreamResponseChoiceDelta{ - ToolCalls: []dto.ToolCallResponse{tool}, - }, - }, - }, - } - if !sendChatChunk(chunk) { - return false - } - sawToolCall = true - - // Include tool call data in the local builder for fallback token estimation. - if tool.Function.Name != "" { - usageText.WriteString(tool.Function.Name) - } - if argsDelta != "" { - usageText.WriteString(argsDelta) - } - return true - } - helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) { if streamErr != nil { sr.Stop(streamErr) @@ -309,193 +230,7 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo return } - switch streamResp.Type { - case "response.created": - if streamResp.Response != nil { - if streamResp.Response.Model != "" { - model = streamResp.Response.Model - } - if streamResp.Response.CreatedAt != 0 { - createAt = int64(streamResp.Response.CreatedAt) - } - } - - //case "response.reasoning_text.delta": - //if !sendReasoningDelta(streamResp.Delta) { - // sr.Stop(streamErr) - // return - //} - - //case "response.reasoning_text.done": - - case "response.reasoning_summary_text.delta": - if !sendReasoningSummaryDelta(streamResp.Delta) { - sr.Stop(streamErr) - return - } - - case "response.reasoning_summary_text.done": - if hasSentReasoningSummary { - needsReasoningSummarySeparator = true - } - - //case "response.reasoning_summary_part.added", "response.reasoning_summary_part.done": - // key := responsesStreamIndexKey(strings.TrimSpace(streamResp.ItemID), streamResp.SummaryIndex) - // if key == "" || streamResp.Part == nil { - // break - // } - // // Only handle summary text parts, ignore other part types. - // if streamResp.Part.Type != "" && streamResp.Part.Type != "summary_text" { - // break - // } - // prev := reasoningSummaryTextByKey[key] - // next := streamResp.Part.Text - // delta := stringDeltaFromPrefix(prev, next) - // reasoningSummaryTextByKey[key] = next - // if !sendReasoningSummaryDelta(delta) { - // sr.Stop(streamErr) - // return - // } - - case "response.output_text.delta": - if !sendStartIfNeeded() { - sr.Stop(streamErr) - return - } - - if streamResp.Delta != "" { - outputText.WriteString(streamResp.Delta) - usageText.WriteString(streamResp.Delta) - delta := streamResp.Delta - chunk := &dto.ChatCompletionsStreamResponse{ - Id: responseId, - Object: "chat.completion.chunk", - Created: createAt, - Model: model, - Choices: []dto.ChatCompletionsStreamResponseChoice{ - { - Index: 0, - Delta: dto.ChatCompletionsStreamResponseChoiceDelta{ - Content: &delta, - }, - }, - }, - } - if !sendChatChunk(chunk) { - sr.Stop(streamErr) - return - } - } - - case "response.output_item.added", "response.output_item.done": - if streamResp.Item == nil { - break - } - if streamResp.Item.Type != "function_call" { - break - } - - itemID := strings.TrimSpace(streamResp.Item.ID) - callID := strings.TrimSpace(streamResp.Item.CallId) - if callID == "" { - callID = itemID - } - if itemID != "" && callID != "" { - toolCallCanonicalIDByItemID[itemID] = callID - } - name := strings.TrimSpace(streamResp.Item.Name) - if name != "" { - toolCallNameByID[callID] = name - } - - newArgs := streamResp.Item.ArgumentsString() - prevArgs := toolCallArgsByID[callID] - argsDelta := "" - if newArgs != "" { - if strings.HasPrefix(newArgs, prevArgs) { - argsDelta = newArgs[len(prevArgs):] - } else { - argsDelta = newArgs - } - toolCallArgsByID[callID] = newArgs - } - - if !sendToolCallDelta(callID, name, argsDelta) { - sr.Stop(streamErr) - return - } - - case "response.function_call_arguments.delta": - itemID := strings.TrimSpace(streamResp.ItemID) - callID := toolCallCanonicalIDByItemID[itemID] - if callID == "" { - callID = itemID - } - if callID == "" { - break - } - toolCallArgsByID[callID] += streamResp.Delta - if !sendToolCallDelta(callID, "", streamResp.Delta) { - sr.Stop(streamErr) - return - } - - case "response.function_call_arguments.done": - - case "response.completed": - if streamResp.Response != nil { - if streamResp.Response.Model != "" { - model = streamResp.Response.Model - } - if streamResp.Response.CreatedAt != 0 { - createAt = int64(streamResp.Response.CreatedAt) - } - if streamResp.Response.Usage != nil { - if streamResp.Response.Usage.InputTokens != 0 { - usage.PromptTokens = streamResp.Response.Usage.InputTokens - usage.InputTokens = streamResp.Response.Usage.InputTokens - } - if streamResp.Response.Usage.OutputTokens != 0 { - usage.CompletionTokens = streamResp.Response.Usage.OutputTokens - usage.OutputTokens = streamResp.Response.Usage.OutputTokens - } - if streamResp.Response.Usage.TotalTokens != 0 { - usage.TotalTokens = streamResp.Response.Usage.TotalTokens - } else { - usage.TotalTokens = usage.PromptTokens + usage.CompletionTokens - } - if streamResp.Response.Usage.InputTokensDetails != nil { - usage.PromptTokensDetails.CachedTokens = streamResp.Response.Usage.InputTokensDetails.CachedTokens - usage.PromptTokensDetails.ImageTokens = streamResp.Response.Usage.InputTokensDetails.ImageTokens - usage.PromptTokensDetails.AudioTokens = streamResp.Response.Usage.InputTokensDetails.AudioTokens - } - if streamResp.Response.Usage.CompletionTokenDetails.ReasoningTokens != 0 { - usage.CompletionTokenDetails.ReasoningTokens = streamResp.Response.Usage.CompletionTokenDetails.ReasoningTokens - } - } - } - - if !sendStartIfNeeded() { - sr.Stop(streamErr) - return - } - if !sentStop { - if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo != nil { - info.ClaudeConvertInfo.Usage = usage - } - finishReason := "stop" - if sawToolCall && outputText.Len() == 0 { - finishReason = "tool_calls" - } - stop := helper.GenerateStopResponse(responseId, createAt, model, finishReason) - if !sendChatChunk(stop) { - sr.Stop(streamErr) - return - } - sentStop = true - } - - case "response.error", "response.failed": + if streamResp.Type == "response.error" || streamResp.Type == "response.failed" { if streamResp.Response != nil { if oaiErr := streamResp.Response.GetOpenAIError(); oaiErr != nil && oaiErr.Type != "" { streamErr = types.WithOpenAIError(*oaiErr, http.StatusInternalServerError) @@ -506,8 +241,19 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo streamErr = types.NewOpenAIError(fmt.Errorf("responses stream error: %s", streamResp.Type), types.ErrorCodeBadResponse, http.StatusInternalServerError) sr.Stop(streamErr) return + } - default: + chunks, err := openaicompat.ResponsesStreamEventToChatChunks(&streamResp, state) + if err != nil { + streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError) + sr.Stop(streamErr) + return + } + for _, chunk := range chunks { + if !sendChatChunk(chunk) { + sr.Stop(streamErr) + return + } } }) @@ -515,30 +261,22 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo return nil, streamErr } + usage := state.Usage if usage.TotalTokens == 0 { - usage = service.ResponseText2Usage(c, usageText.String(), info.UpstreamModelName, info.GetEstimatePromptTokens()) + usage = service.ResponseText2Usage(c, state.UsageText(), info.UpstreamModelName, info.GetEstimatePromptTokens()) + state.Usage = usage } - if !sentStart { - if !sendChatChunk(helper.GenerateStartEmptyResponse(responseId, createAt, model, nil)) { - return nil, streamErr - } + if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo != nil { + info.ClaudeConvertInfo.Usage = usage } - if !sentStop { - if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo != nil { - info.ClaudeConvertInfo.Usage = usage - } - finishReason := "stop" - if sawToolCall && outputText.Len() == 0 { - finishReason = "tool_calls" - } - stop := helper.GenerateStopResponse(responseId, createAt, model, finishReason) - if !sendChatChunk(stop) { + for _, chunk := range openaicompat.FinalizeResponsesToChatStream(state) { + if !sendChatChunk(chunk) { return nil, streamErr } } if info.RelayFormat == types.RelayFormatOpenAI && info.ShouldIncludeUsage && usage != nil { - if err := helper.ObjectData(c, helper.GenerateFinalUsageResponse(responseId, createAt, model, *usage)); err != nil { + if err := helper.ObjectData(c, helper.GenerateFinalUsageResponse(responseId, state.Created, state.Model, *usage)); err != nil { return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError) } } diff --git a/relay/channel/openai/chat_via_responses_test.go b/relay/channel/openai/chat_via_responses_test.go new file mode 100644 index 00000000000..7f5caf26b38 --- /dev/null +++ b/relay/channel/openai/chat_via_responses_test.go @@ -0,0 +1,128 @@ +package openai + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/QuantumNous/new-api/common" + "github.com/QuantumNous/new-api/constant" + relaycommon "github.com/QuantumNous/new-api/relay/common" + "github.com/QuantumNous/new-api/types" + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" +) + +func newResponsesChatTestContext(t *testing.T, body string, isStream bool) (*gin.Context, *httptest.ResponseRecorder, *http.Response, *relaycommon.RelayInfo) { + t.Helper() + + recorder := httptest.NewRecorder() + c, _ := gin.CreateTestContext(recorder) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + c.Set(common.RequestIdKey, "responses-test") + + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(body)), + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + } + info := &relaycommon.RelayInfo{ + ChannelMeta: &relaycommon.ChannelMeta{UpstreamModelName: "gpt-test"}, + IsStream: isStream, + RelayFormat: types.RelayFormatOpenAI, + ShouldIncludeUsage: true, + DisablePing: true, + } + return c, recorder, resp, info +} + +func TestOaiResponsesToChatStreamHandlerConvertsSSEOrderAndUsage(t *testing.T) { + oldMode := gin.Mode() + gin.SetMode(gin.TestMode) + t.Cleanup(func() { gin.SetMode(oldMode) }) + + oldTimeout := constant.StreamingTimeout + constant.StreamingTimeout = 30 + t.Cleanup(func() { constant.StreamingTimeout = oldTimeout }) + + body := strings.Join([]string{ + `data: {"type":"response.created","response":{"id":"resp_1","model":"gpt-test","created_at":1710000000}}`, + `data: {"type":"response.output_text.delta","delta":"hello"}`, + `data: {"type":"response.output_item.added","output_index":1,"item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"lookup"}}`, + `data: {"type":"response.function_call_arguments.delta","output_index":1,"delta":"{\"q\":\"x\"}"}`, + `data: {"type":"response.completed","response":{"status":"completed","usage":{"input_tokens":2,"output_tokens":3,"total_tokens":5}}}`, + `data: [DONE]`, + ``, + }, "\n") + + c, recorder, resp, info := newResponsesChatTestContext(t, body, true) + + usage, err := OaiResponsesToChatStreamHandler(c, info, resp) + require.Nil(t, err) + require.NotNil(t, usage) + require.Equal(t, 2, usage.PromptTokens) + require.Equal(t, 3, usage.CompletionTokens) + require.Equal(t, 5, usage.TotalTokens) + + got := recorder.Body.String() + require.Equal(t, "text/event-stream", recorder.Header().Get("Content-Type")) + require.Contains(t, got, `"role":"assistant"`) + require.Contains(t, got, `"content":"hello"`) + require.Contains(t, got, `"name":"lookup"`) + require.Contains(t, got, `"arguments":"{\"q\":\"x\"}"`) + require.Contains(t, got, `"finish_reason":"tool_calls"`) + require.Contains(t, got, `"usage":{"prompt_tokens":2,"completion_tokens":3,"total_tokens":5`) + require.Contains(t, got, `data: [DONE]`) + requireOrderedSubstrings(t, got, + `"role":"assistant"`, + `"content":"hello"`, + `"name":"lookup"`, + `"arguments":"{\"q\":\"x\"}"`, + `"finish_reason":"tool_calls"`, + `"usage":{"prompt_tokens":2,"completion_tokens":3,"total_tokens":5`, + `data: [DONE]`, + ) +} + +func TestOaiResponsesToChatBufferedStreamHandlerReturnsJSONFromSSE(t *testing.T) { + oldMode := gin.Mode() + gin.SetMode(gin.TestMode) + t.Cleanup(func() { gin.SetMode(oldMode) }) + + body := strings.Join([]string{ + `data: {"type":"response.output_text.delta","delta":"buffered text"}`, + `data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"lookup"}}`, + `data: {"type":"response.function_call_arguments.delta","output_index":0,"delta":"{\"q\":\"x\"}"}`, + `data: {"type":"response.done","response":{"model":"gpt-test","status":"completed","usage":{"input_tokens":1,"output_tokens":2,"total_tokens":3}}}`, + `data: [DONE]`, + ``, + }, "\n") + + c, recorder, resp, info := newResponsesChatTestContext(t, body, false) + + usage, err := OaiResponsesToChatBufferedStreamHandler(c, info, resp) + require.Nil(t, err) + require.NotNil(t, usage) + require.Equal(t, 3, usage.TotalTokens) + + got := recorder.Body.String() + require.NotContains(t, got, `data:`) + require.Contains(t, got, `"object":"chat.completion"`) + require.Contains(t, got, `"content":"buffered text"`) + require.Contains(t, got, `"name":"lookup"`) + require.Contains(t, got, `"arguments":"{\"q\":\"x\"}"`) + require.Contains(t, got, `"finish_reason":"tool_calls"`) +} + +func requireOrderedSubstrings(t *testing.T, s string, parts ...string) { + t.Helper() + + offset := 0 + for _, part := range parts { + idx := strings.Index(s[offset:], part) + require.NotEqualf(t, -1, idx, "missing %q after byte offset %d", part, offset) + offset += idx + len(part) + } +} diff --git a/relay/chat_completions_via_responses.go b/relay/chat_completions_via_responses.go index c47da1fab67..ce197f95542 100644 --- a/relay/chat_completions_via_responses.go +++ b/relay/chat_completions_via_responses.go @@ -145,14 +145,16 @@ func chatCompletionsViaResponses(c *gin.Context, info *relaycommon.RelayInfo, ad statusCodeMappingStr := c.GetString("status_code_mapping") httpResp = resp.(*http.Response) - info.IsStream = info.IsStream || strings.HasPrefix(httpResp.Header.Get("Content-Type"), "text/event-stream") + clientStream := info.IsStream + upstreamStream := strings.HasPrefix(httpResp.Header.Get("Content-Type"), "text/event-stream") + info.IsStream = clientStream || upstreamStream if httpResp.StatusCode != http.StatusOK { newApiErr := service.RelayErrorHandler(c.Request.Context(), httpResp, false) service.ResetStatusCode(newApiErr, statusCodeMappingStr) return nil, newApiErr } - if info.IsStream { + if upstreamStream && clientStream { usage, newApiErr := openaichannel.OaiResponsesToChatStreamHandler(c, info, httpResp) if newApiErr != nil { service.ResetStatusCode(newApiErr, statusCodeMappingStr) @@ -160,6 +162,15 @@ func chatCompletionsViaResponses(c *gin.Context, info *relaycommon.RelayInfo, ad } return usage, nil } + if upstreamStream { + info.IsStream = false + usage, newApiErr := openaichannel.OaiResponsesToChatBufferedStreamHandler(c, info, httpResp) + if newApiErr != nil { + service.ResetStatusCode(newApiErr, statusCodeMappingStr) + return nil, newApiErr + } + return usage, nil + } usage, newApiErr := openaichannel.OaiResponsesToChatHandler(c, info, httpResp) if newApiErr != nil { diff --git a/service/convert.go b/service/convert.go index 95acf835ee4..7a1d87476ea 100644 --- a/service/convert.go +++ b/service/convert.go @@ -615,24 +615,25 @@ func ResponseOpenAI2Claude(openAIResponse *dto.OpenAITextResponse, info *relayco } for _, choice := range openAIResponse.Choices { stopReason = stopReasonOpenAI2Claude(choice.FinishReason) - if choice.FinishReason == "tool_calls" { - for _, toolUse := range choice.Message.ParseToolCalls() { - claudeContent := dto.ClaudeMediaMessage{} - claudeContent.Type = "tool_use" - claudeContent.Id = toolUse.ID - claudeContent.Name = toolUse.Function.Name - var mapParams map[string]interface{} - if err := common.Unmarshal([]byte(toolUse.Function.Arguments), &mapParams); err == nil { - claudeContent.Input = mapParams - } else { - claudeContent.Input = toolUse.Function.Arguments - } - contents = append(contents, claudeContent) - } - } else { + textContent := choice.Message.StringContent() + toolCalls := choice.Message.ParseToolCalls() + if textContent != "" || len(toolCalls) == 0 { claudeContent := dto.ClaudeMediaMessage{} claudeContent.Type = "text" - claudeContent.SetText(choice.Message.StringContent()) + claudeContent.SetText(textContent) + contents = append(contents, claudeContent) + } + for _, toolUse := range toolCalls { + claudeContent := dto.ClaudeMediaMessage{} + claudeContent.Type = "tool_use" + claudeContent.Id = toolUse.ID + claudeContent.Name = toolUse.Function.Name + var mapParams map[string]interface{} + if err := common.Unmarshal([]byte(toolUse.Function.Arguments), &mapParams); err == nil { + claudeContent.Input = mapParams + } else { + claudeContent.Input = toolUse.Function.Arguments + } contents = append(contents, claudeContent) } } @@ -863,37 +864,32 @@ func ResponseOpenAI2Gemini(openAIResponse *dto.OpenAITextResponse, info *relayco Parts: make([]dto.GeminiPart, 0), } - // 处理工具调用 - toolCalls := choice.Message.ParseToolCalls() - if len(toolCalls) > 0 { - for _, toolCall := range toolCalls { - // 解析参数 - var args map[string]interface{} - if toolCall.Function.Arguments != "" { - if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &args); err != nil { - args = map[string]interface{}{"arguments": toolCall.Function.Arguments} - } - } else { - args = make(map[string]interface{}) - } + textContent := choice.Message.StringContent() + if textContent != "" { + part := dto.GeminiPart{ + Text: textContent, + } + content.Parts = append(content.Parts, part) + } - part := dto.GeminiPart{ - FunctionCall: &dto.FunctionCall{ - FunctionName: toolCall.Function.Name, - Arguments: args, - }, + toolCalls := choice.Message.ParseToolCalls() + for _, toolCall := range toolCalls { + var args map[string]interface{} + if toolCall.Function.Arguments != "" { + if err := common.Unmarshal([]byte(toolCall.Function.Arguments), &args); err != nil { + args = map[string]interface{}{"arguments": toolCall.Function.Arguments} } - content.Parts = append(content.Parts, part) + } else { + args = make(map[string]interface{}) } - } else { - // 处理文本内容 - textContent := choice.Message.StringContent() - if textContent != "" { - part := dto.GeminiPart{ - Text: textContent, - } - content.Parts = append(content.Parts, part) + + part := dto.GeminiPart{ + FunctionCall: &dto.FunctionCall{ + FunctionName: toolCall.Function.Name, + Arguments: args, + }, } + content.Parts = append(content.Parts, part) } candidate.Content = content diff --git a/service/openai_chat_responses_compat.go b/service/openai_chat_responses_compat.go index 2e887386339..e0202580a81 100644 --- a/service/openai_chat_responses_compat.go +++ b/service/openai_chat_responses_compat.go @@ -13,6 +13,10 @@ func ResponsesResponseToChatCompletionsResponse(resp *dto.OpenAIResponsesRespons return openaicompat.ResponsesResponseToChatCompletionsResponse(resp, id) } +func ResponsesFinishReasonFromStatus(resp *dto.OpenAIResponsesResponse) (string, bool) { + return openaicompat.ResponsesFinishReasonFromStatus(resp) +} + func ExtractOutputTextFromResponses(resp *dto.OpenAIResponsesResponse) string { return openaicompat.ExtractOutputTextFromResponses(resp) } diff --git a/service/openaicompat/chat_responses_compat_test.go b/service/openaicompat/chat_responses_compat_test.go new file mode 100644 index 00000000000..4661f9d94d1 --- /dev/null +++ b/service/openaicompat/chat_responses_compat_test.go @@ -0,0 +1,343 @@ +package openaicompat + +import ( + "testing" + + "github.com/QuantumNous/new-api/dto" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" +) + +func TestChatCompletionsRequestToResponsesRequestInstructionsAndTools(t *testing.T) { + req := &dto.GeneralOpenAIRequest{ + Model: "gpt-test", + N: lo.ToPtr(1), + Messages: []dto.Message{ + {Role: "system", Content: "system rules"}, + {Role: "developer", Content: "developer rules"}, + {Role: "user", Content: []any{ + map[string]any{"type": "text", "text": "look"}, + map[string]any{"type": "image_url", "image_url": map[string]any{"url": "https://example.test/a.png"}}, + }}, + assistantMessageWithTool("partial text", "call_1", "lookup", `{"q":"x"}`), + {Role: "tool", ToolCallId: "call_1", Content: "tool result"}, + }, + } + + got, err := ChatCompletionsRequestToResponsesRequest(req) + require.NoError(t, err) + + assert.Equal(t, "gpt-test", got.Model) + assert.Equal(t, `"system rules\n\ndeveloper rules"`, string(got.Instructions)) + assert.Equal(t, "input_image", gjson.GetBytes(got.Input, "0.content.1.type").String()) + assert.Equal(t, "function_call", gjson.GetBytes(got.Input, "2.type").String()) + assert.Equal(t, "call_1", gjson.GetBytes(got.Input, "2.call_id").String()) + assert.Equal(t, "function_call_output", gjson.GetBytes(got.Input, "3.type").String()) +} + +func TestChatCompletionsRequestToResponsesRequestRejectsMultipleChoices(t *testing.T) { + _, err := ChatCompletionsRequestToResponsesRequest(&dto.GeneralOpenAIRequest{ + Model: "gpt-test", + N: lo.ToPtr(2), + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "n>1") +} + +func TestResponsesResponseToChatCompletionsPreservesTextAndToolCalls(t *testing.T) { + resp := &dto.OpenAIResponsesResponse{ + ID: "resp_1", + CreatedAt: 123, + Model: "gpt-test", + Status: []byte(`"completed"`), + Output: []dto.ResponsesOutput{ + { + Type: responsesOutputTypeMessage, + Role: "assistant", + Content: []dto.ResponsesOutputContent{ + {Type: "output_text", Text: "I will call a tool."}, + }, + }, + { + Type: responsesOutputTypeFunctionCall, + ID: "fc_1", + CallId: "call_1", + Name: "lookup", + Arguments: []byte(`{"q":"x"}`), + }, + }, + Usage: &dto.Usage{InputTokens: 3, OutputTokens: 4, TotalTokens: 7}, + } + + chat, usage, err := ResponsesResponseToChatCompletionsResponse(resp, "chatcmpl_1") + require.NoError(t, err) + require.NotNil(t, usage) + + require.Len(t, chat.Choices, 1) + assert.Equal(t, "tool_calls", chat.Choices[0].FinishReason) + assert.Equal(t, "I will call a tool.", chat.Choices[0].Message.StringContent()) + toolCalls := chat.Choices[0].Message.ParseToolCalls() + require.Len(t, toolCalls, 1) + assert.Equal(t, "call_1", toolCalls[0].ID) + assert.Equal(t, "lookup", toolCalls[0].Function.Name) + assert.Equal(t, `{"q":"x"}`, toolCalls[0].Function.Arguments) + assert.Equal(t, 7, usage.TotalTokens) +} + +func TestResponsesResponseToChatCompletionsPreservesReasoningSummary(t *testing.T) { + resp := &dto.OpenAIResponsesResponse{ + ID: "resp_1", + Model: "gpt-test", + Status: []byte(`"completed"`), + Output: []dto.ResponsesOutput{ + { + Type: responsesOutputTypeReasoning, + Content: []dto.ResponsesOutputContent{ + {Type: "summary_text", Text: "first summary"}, + {Type: "summary_text", Text: "\n\nsecond summary"}, + }, + }, + { + Type: responsesOutputTypeMessage, + Role: "assistant", + Content: []dto.ResponsesOutputContent{ + {Type: "output_text", Text: "final"}, + }, + }, + }, + } + + chat, _, err := ResponsesResponseToChatCompletionsResponse(resp, "chatcmpl_1") + require.NoError(t, err) + assert.Equal(t, "first summary\n\nsecond summary", chat.Choices[0].Message.GetReasoningContent()) + assert.Equal(t, "final", chat.Choices[0].Message.StringContent()) +} + +func TestResponsesFinishReasonFromIncompleteStatus(t *testing.T) { + tests := []struct { + name string + reason string + want string + }{ + {name: "max output", reason: responsesIncompleteReasonMaxTokens, want: "length"}, + {name: "content filter", reason: responsesIncompleteReasonContentFilter, want: "content_filter"}, + {name: "unknown", reason: "other", want: "length"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, ok := ResponsesFinishReasonFromStatus(&dto.OpenAIResponsesResponse{ + Status: []byte(`"incomplete"`), + IncompleteDetails: &dto.IncompleteDetails{Reason: tt.reason}, + }) + require.True(t, ok) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestResponsesStreamEventToChatChunksUsesOutputIndexForToolArguments(t *testing.T) { + state := newTestResponsesStreamState() + outputIndex := 1 + + var chunks []dto.ChatCompletionsStreamResponse + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{Type: responsesEventCreated})...) + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{Type: responsesEventOutputTextDelta, Delta: "text before tool"})...) + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventFunctionArgsDelta, + OutputIndex: &outputIndex, + Delta: `{"cmd":"ls"}`, + })...) + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventOutputItemAdded, + OutputIndex: &outputIndex, + Item: &dto.ResponsesOutput{ + Type: responsesOutputTypeFunctionCall, + ID: "fc_1", + CallId: "call_1", + Name: "exec", + }, + })...) + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventCompleted, + Response: &dto.OpenAIResponsesResponse{ + Status: []byte(`"completed"`), + Usage: &dto.Usage{InputTokens: 1, OutputTokens: 2, TotalTokens: 3}, + }, + })...) + + require.Len(t, chunks, 4) + assert.Equal(t, "assistant", chunks[0].Choices[0].Delta.Role) + assert.Equal(t, "text before tool", chunks[1].Choices[0].Delta.GetContentString()) + tool := chunks[2].Choices[0].Delta.ToolCalls[0] + require.NotNil(t, tool.Index) + assert.Equal(t, 0, *tool.Index) + assert.Equal(t, "call_1", tool.ID) + assert.Equal(t, "exec", tool.Function.Name) + assert.Equal(t, `{"cmd":"ls"}`, tool.Function.Arguments) + require.NotNil(t, chunks[3].Choices[0].FinishReason) + assert.Equal(t, "tool_calls", *chunks[3].Choices[0].FinishReason) + assert.Equal(t, 3, state.Usage.TotalTokens) +} + +func TestResponsesStreamEventToChatChunksCustomToolAndReasoning(t *testing.T) { + state := newTestResponsesStreamState() + outputIndex := 0 + + chunks := mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventReasoningTextDelta, + Delta: "thinking", + }) + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventOutputItemAdded, + OutputIndex: &outputIndex, + Item: &dto.ResponsesOutput{ + Type: responsesOutputTypeCustomToolCall, + ID: "ct_1", + CallId: "call_custom", + Name: "apply_patch", + }, + })...) + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventCustomToolInputDelta, + OutputIndex: &outputIndex, + Delta: "patch body", + })...) + chunks = append(chunks, mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventIncomplete, + Response: &dto.OpenAIResponsesResponse{ + IncompleteDetails: &dto.IncompleteDetails{Reason: responsesIncompleteReasonContentFilter}, + }, + })...) + + require.Len(t, chunks, 5) + assert.Equal(t, "thinking", chunks[1].Choices[0].Delta.GetReasoningContent()) + assert.Equal(t, "apply_patch", chunks[2].Choices[0].Delta.ToolCalls[0].Function.Name) + assert.Equal(t, "patch body", chunks[3].Choices[0].Delta.ToolCalls[0].Function.Arguments) + require.NotNil(t, chunks[4].Choices[0].FinishReason) + assert.Equal(t, "content_filter", *chunks[4].Choices[0].FinishReason) +} + +func TestResponsesStreamEventToChatChunksUsesTerminalDoneOutput(t *testing.T) { + state := newTestResponsesStreamState() + chunks := mustStreamChunks(t, state, &dto.ResponsesStreamResponse{ + Type: responsesEventDone, + Response: &dto.OpenAIResponsesResponse{ + Status: []byte(`"completed"`), + Output: []dto.ResponsesOutput{ + { + Type: responsesOutputTypeMessage, + Role: "assistant", + Content: []dto.ResponsesOutputContent{ + {Type: "output_text", Text: "terminal text"}, + }, + }, + { + Type: responsesOutputTypeFunctionCall, + ID: "fc_1", + CallId: "call_1", + Name: "lookup", + Arguments: []byte(`{"q":"x"}`), + }, + }, + }, + }) + + require.Len(t, chunks, 4) + assert.Equal(t, "assistant", chunks[0].Choices[0].Delta.Role) + assert.Equal(t, "terminal text", chunks[1].Choices[0].Delta.GetContentString()) + tool := chunks[2].Choices[0].Delta.ToolCalls[0] + assert.Equal(t, "lookup", tool.Function.Name) + assert.Equal(t, `{"q":"x"}`, tool.Function.Arguments) + require.NotNil(t, chunks[3].Choices[0].FinishReason) + assert.Equal(t, "tool_calls", *chunks[3].Choices[0].FinishReason) +} + +func TestFinalizeResponsesToChatStreamFlushesPendingDeltaOnlyArguments(t *testing.T) { + state := newTestResponsesStreamState() + outputIndex := 2 + _, err := ResponsesStreamEventToChatChunks(&dto.ResponsesStreamResponse{ + Type: responsesEventFunctionArgsDelta, + OutputIndex: &outputIndex, + Delta: `{"pending":true}`, + }, state) + require.NoError(t, err) + + chunks := FinalizeResponsesToChatStream(state) + require.Len(t, chunks, 3) + tool := chunks[1].Choices[0].Delta.ToolCalls[0] + assert.Equal(t, "call_output_2", tool.ID) + assert.Equal(t, `{"pending":true}`, tool.Function.Arguments) + require.NotNil(t, chunks[2].Choices[0].FinishReason) + assert.Equal(t, "tool_calls", *chunks[2].Choices[0].FinishReason) +} + +func TestResponsesStreamEventToChatChunksFailedEventReturnsError(t *testing.T) { + _, err := ResponsesStreamEventToChatChunks(&dto.ResponsesStreamResponse{Type: responsesEventFailed}, newTestResponsesStreamState()) + require.Error(t, err) +} + +func TestResponsesBufferedAccumulatorSupplementsEmptyTerminalOutput(t *testing.T) { + acc := NewResponsesBufferedAccumulator() + outputIndex := 1 + acc.ProcessEvent(&dto.ResponsesStreamResponse{Type: responsesEventOutputTextDelta, Delta: "buffered text"}) + acc.ProcessEvent(&dto.ResponsesStreamResponse{ + Type: responsesEventOutputItemAdded, + OutputIndex: &outputIndex, + Item: &dto.ResponsesOutput{ + Type: responsesOutputTypeFunctionCall, + ID: "fc_1", + CallId: "call_1", + Name: "lookup", + }, + }) + acc.ProcessEvent(&dto.ResponsesStreamResponse{ + Type: responsesEventFunctionArgsDelta, + OutputIndex: &outputIndex, + Delta: `{"q":"x"}`, + }) + + resp := &dto.OpenAIResponsesResponse{ + Status: []byte(`"completed"`), + Model: "gpt-test", + } + acc.SupplementResponseOutput(resp) + + chat, _, err := ResponsesResponseToChatCompletionsResponse(resp, "chatcmpl_1") + require.NoError(t, err) + assert.Equal(t, "buffered text", chat.Choices[0].Message.StringContent()) + toolCalls := chat.Choices[0].Message.ParseToolCalls() + require.Len(t, toolCalls, 1) + assert.Equal(t, `{"q":"x"}`, toolCalls[0].Function.Arguments) +} + +func assistantMessageWithTool(content string, id string, name string, args string) dto.Message { + msg := dto.Message{Role: "assistant", Content: content} + msg.SetToolCalls([]dto.ToolCallRequest{ + { + ID: id, + Type: "function", + Function: dto.FunctionRequest{ + Name: name, + Arguments: args, + }, + }, + }) + return msg +} + +func newTestResponsesStreamState() *ResponsesToChatStreamState { + state := NewResponsesToChatStreamState("gpt-test", false) + state.ID = "chatcmpl_test" + state.Created = 123 + return state +} + +func mustStreamChunks(t *testing.T, state *ResponsesToChatStreamState, event *dto.ResponsesStreamResponse) []dto.ChatCompletionsStreamResponse { + t.Helper() + chunks, err := ResponsesStreamEventToChatChunks(event, state) + require.NoError(t, err) + return chunks +} diff --git a/service/openaicompat/responses_to_chat.go b/service/openaicompat/responses_to_chat.go index d1c7473fe8a..d7a9fbc5320 100644 --- a/service/openaicompat/responses_to_chat.go +++ b/service/openaicompat/responses_to_chat.go @@ -2,49 +2,77 @@ package openaicompat import ( "errors" + "fmt" + "sort" "strings" + "time" + "github.com/QuantumNous/new-api/common" "github.com/QuantumNous/new-api/dto" ) +const ( + responsesEventCreated = "response.created" + responsesEventCompleted = "response.completed" + responsesEventDone = "response.done" + responsesEventIncomplete = "response.incomplete" + responsesEventFailed = "response.failed" + responsesEventError = "response.error" + responsesEventOutputTextDelta = "response.output_text.delta" + responsesEventOutputItemAdded = "response.output_item.added" + responsesEventOutputItemDone = "response.output_item.done" + responsesEventFunctionArgsDelta = "response.function_call_arguments.delta" + responsesEventFunctionArgsDone = "response.function_call_arguments.done" + responsesEventCustomToolInputDelta = "response.custom_tool_call_input.delta" + responsesEventCustomToolInputDone = "response.custom_tool_call_input.done" + responsesEventReasoningSummaryDelta = "response.reasoning_summary_text.delta" + responsesEventReasoningSummaryDone = "response.reasoning_summary_text.done" + responsesEventReasoningTextDelta = "response.reasoning_text.delta" + responsesEventReasoningTextDone = "response.reasoning_text.done" + responsesOutputTypeFunctionCall = "function_call" + responsesOutputTypeCustomToolCall = "custom_tool_call" + responsesOutputTypeMessage = "message" + responsesOutputTypeReasoning = "reasoning" + responsesIncompleteReasonContentFilter = "content_filter" + responsesIncompleteReasonMaxTokens = "max_output_tokens" +) + +func ResponsesFinishReasonFromStatus(resp *dto.OpenAIResponsesResponse) (string, bool) { + if resp == nil { + return "", false + } + + status := responseStatusString(resp) + if status != "incomplete" { + return "", false + } + + reason := "" + if resp.IncompleteDetails != nil { + reason = strings.TrimSpace(resp.IncompleteDetails.Reason) + } + if reason == responsesIncompleteReasonContentFilter { + return "content_filter", true + } + return "length", true +} + func ResponsesResponseToChatCompletionsResponse(resp *dto.OpenAIResponsesResponse, id string) (*dto.OpenAITextResponse, *dto.Usage, error) { if resp == nil { return nil, nil, errors.New("response is nil") } text := ExtractOutputTextFromResponses(resp) + reasoning := ExtractReasoningTextFromResponses(resp) - usage := &dto.Usage{} - if resp.Usage != nil { - if resp.Usage.InputTokens != 0 { - usage.PromptTokens = resp.Usage.InputTokens - usage.InputTokens = resp.Usage.InputTokens - } - if resp.Usage.OutputTokens != 0 { - usage.CompletionTokens = resp.Usage.OutputTokens - usage.OutputTokens = resp.Usage.OutputTokens - } - if resp.Usage.TotalTokens != 0 { - usage.TotalTokens = resp.Usage.TotalTokens - } else { - usage.TotalTokens = usage.PromptTokens + usage.CompletionTokens - } - if resp.Usage.InputTokensDetails != nil { - usage.PromptTokensDetails.CachedTokens = resp.Usage.InputTokensDetails.CachedTokens - usage.PromptTokensDetails.ImageTokens = resp.Usage.InputTokensDetails.ImageTokens - usage.PromptTokensDetails.AudioTokens = resp.Usage.InputTokensDetails.AudioTokens - } - if resp.Usage.CompletionTokenDetails.ReasoningTokens != 0 { - usage.CompletionTokenDetails.ReasoningTokens = resp.Usage.CompletionTokenDetails.ReasoningTokens - } - } + usage := UsageFromResponsesUsage(resp.Usage) created := resp.CreatedAt var toolCalls []dto.ToolCallResponse - if text == "" && len(resp.Output) > 0 { + if len(resp.Output) > 0 { for _, out := range resp.Output { - if out.Type != "function_call" { + if !isResponsesToolOutputType(out.Type) { continue } name := strings.TrimSpace(out.Name) @@ -67,7 +95,9 @@ func ResponsesResponseToChatCompletionsResponse(resp *dto.OpenAIResponsesRespons } finishReason := "stop" - if len(toolCalls) > 0 { + if mappedReason, ok := ResponsesFinishReasonFromStatus(resp); ok { + finishReason = mappedReason + } else if len(toolCalls) > 0 { finishReason = "tool_calls" } @@ -75,9 +105,11 @@ func ResponsesResponseToChatCompletionsResponse(resp *dto.OpenAIResponsesRespons Role: "assistant", Content: text, } + if reasoning != "" { + msg.ReasoningContent = &reasoning + } if len(toolCalls) > 0 { msg.SetToolCalls(toolCalls) - msg.Content = "" } out := &dto.OpenAITextResponse{ @@ -98,6 +130,35 @@ func ResponsesResponseToChatCompletionsResponse(resp *dto.OpenAIResponsesRespons return out, usage, nil } +func UsageFromResponsesUsage(src *dto.Usage) *dto.Usage { + usage := &dto.Usage{} + if src == nil { + return usage + } + if src.InputTokens != 0 { + usage.PromptTokens = src.InputTokens + usage.InputTokens = src.InputTokens + } + if src.OutputTokens != 0 { + usage.CompletionTokens = src.OutputTokens + usage.OutputTokens = src.OutputTokens + } + if src.TotalTokens != 0 { + usage.TotalTokens = src.TotalTokens + } else { + usage.TotalTokens = usage.PromptTokens + usage.CompletionTokens + } + if src.InputTokensDetails != nil { + usage.PromptTokensDetails.CachedTokens = src.InputTokensDetails.CachedTokens + usage.PromptTokensDetails.ImageTokens = src.InputTokensDetails.ImageTokens + usage.PromptTokensDetails.AudioTokens = src.InputTokensDetails.AudioTokens + } + if src.CompletionTokenDetails.ReasoningTokens != 0 { + usage.CompletionTokenDetails.ReasoningTokens = src.CompletionTokenDetails.ReasoningTokens + } + return usage +} + func ExtractOutputTextFromResponses(resp *dto.OpenAIResponsesResponse) string { if resp == nil || len(resp.Output) == 0 { return "" @@ -131,3 +192,764 @@ func ExtractOutputTextFromResponses(resp *dto.OpenAIResponsesResponse) string { } return sb.String() } + +func ExtractReasoningTextFromResponses(resp *dto.OpenAIResponsesResponse) string { + if resp == nil || len(resp.Output) == 0 { + return "" + } + + var sb strings.Builder + for _, out := range resp.Output { + if out.Type != responsesOutputTypeReasoning { + continue + } + for _, c := range out.Content { + if c.Text != "" { + sb.WriteString(c.Text) + } + } + } + return sb.String() +} + +type ResponsesToChatStreamState struct { + ID string + Model string + Created int64 + IncludeUsage bool + + Usage *dto.Usage + + sentStart bool + finalized bool + hasSentText bool + sawToolCall bool + hasSentReasoning bool + needsReasoningSummaryBreak bool + nextToolIndex int + toolByKey map[string]*responsesStreamTool + outputIndexToKey map[int]string + itemIDToKey map[string]string + callIDToKey map[string]string + pendingArgsByOutputIndex map[int]string + pendingArgsByItemID map[string]string + usageText strings.Builder +} + +type responsesStreamTool struct { + Key string + CallID string + ItemID string + Name string + Arguments string + Index int + Sent bool + NameSent bool + ArgsSentAt int +} + +func NewResponsesToChatStreamState(model string, includeUsage bool) *ResponsesToChatStreamState { + return &ResponsesToChatStreamState{ + Model: model, + Created: time.Now().Unix(), + IncludeUsage: includeUsage, + Usage: &dto.Usage{}, + toolByKey: make(map[string]*responsesStreamTool), + outputIndexToKey: make(map[int]string), + itemIDToKey: make(map[string]string), + callIDToKey: make(map[string]string), + pendingArgsByOutputIndex: make(map[int]string), + pendingArgsByItemID: make(map[string]string), + } +} + +func (s *ResponsesToChatStreamState) UsageText() string { + if s == nil { + return "" + } + return s.usageText.String() +} + +func ResponsesStreamEventToChatChunks(event *dto.ResponsesStreamResponse, state *ResponsesToChatStreamState) ([]dto.ChatCompletionsStreamResponse, error) { + if event == nil || state == nil { + return nil, nil + } + + switch event.Type { + case responsesEventCreated: + state.applyResponseMetadata(event.Response) + return state.ensureStart(), nil + case responsesEventReasoningSummaryDelta, responsesEventReasoningTextDelta: + return state.reasoningDelta(event.Delta), nil + case responsesEventReasoningSummaryDone, responsesEventReasoningTextDone: + if state.hasSentReasoning { + state.needsReasoningSummaryBreak = true + } + return nil, nil + case responsesEventOutputTextDelta: + return state.textDelta(event.Delta), nil + case responsesEventOutputItemAdded, responsesEventOutputItemDone: + if event.Item == nil || !isResponsesToolOutputType(event.Item.Type) { + return nil, nil + } + return state.toolItem(event), nil + case responsesEventFunctionArgsDelta, responsesEventCustomToolInputDelta: + return state.toolArgumentsDelta(event), nil + case responsesEventFunctionArgsDone, responsesEventCustomToolInputDone: + return state.flushPendingTool(event), nil + case responsesEventCompleted, responsesEventDone, responsesEventIncomplete: + response := event.Response + if event.Type == responsesEventIncomplete { + response = ensureIncompleteResponse(response) + } + state.applyResponseMetadata(response) + chunks := state.terminalOutputChunks(response) + chunks = append(chunks, state.finalize(response)...) + return chunks, nil + case responsesEventFailed, responsesEventError: + return nil, fmt.Errorf("responses stream error: %s", event.Type) + default: + return nil, nil + } +} + +func FinalizeResponsesToChatStream(state *ResponsesToChatStreamState) []dto.ChatCompletionsStreamResponse { + if state == nil { + return nil + } + return state.finalize(nil) +} + +func (s *ResponsesToChatStreamState) applyResponseMetadata(response *dto.OpenAIResponsesResponse) { + if response == nil { + return + } + if response.ID != "" && s.ID == "" { + s.ID = response.ID + } + if response.Model != "" { + s.Model = response.Model + } + if response.CreatedAt != 0 { + s.Created = int64(response.CreatedAt) + } + if response.Usage != nil { + s.Usage = UsageFromResponsesUsage(response.Usage) + } +} + +func (s *ResponsesToChatStreamState) ensureStart() []dto.ChatCompletionsStreamResponse { + if s.sentStart { + return nil + } + s.sentStart = true + return []dto.ChatCompletionsStreamResponse{s.makeChunk(dto.ChatCompletionsStreamResponseChoiceDelta{ + Role: "assistant", + Content: common.GetPointer(""), + }, nil)} +} + +func (s *ResponsesToChatStreamState) textDelta(delta string) []dto.ChatCompletionsStreamResponse { + if delta == "" { + return nil + } + s.usageText.WriteString(delta) + s.hasSentText = true + chunks := s.ensureStart() + chunks = append(chunks, s.makeChunk(dto.ChatCompletionsStreamResponseChoiceDelta{ + Content: &delta, + }, nil)) + return chunks +} + +func (s *ResponsesToChatStreamState) terminalOutputChunks(response *dto.OpenAIResponsesResponse) []dto.ChatCompletionsStreamResponse { + if s == nil || response == nil || len(response.Output) == 0 { + return nil + } + + var chunks []dto.ChatCompletionsStreamResponse + for i := range response.Output { + out := &response.Output[i] + switch { + case out.Type == responsesOutputTypeMessage && !s.hasSentText: + var text strings.Builder + for _, c := range out.Content { + if c.Type == "output_text" && c.Text != "" { + text.WriteString(c.Text) + } + } + chunks = append(chunks, s.textDelta(text.String())...) + case out.Type == responsesOutputTypeReasoning && !s.hasSentReasoning: + var reasoning strings.Builder + for _, c := range out.Content { + if c.Text != "" { + reasoning.WriteString(c.Text) + } + } + chunks = append(chunks, s.reasoningDelta(reasoning.String())...) + case isResponsesToolOutputType(out.Type): + chunks = append(chunks, s.toolItem(&dto.ResponsesStreamResponse{Item: out})...) + } + } + return chunks +} + +func (s *ResponsesToChatStreamState) reasoningDelta(delta string) []dto.ChatCompletionsStreamResponse { + if delta == "" { + return nil + } + if s.needsReasoningSummaryBreak { + if strings.HasPrefix(delta, "\n\n") { + s.needsReasoningSummaryBreak = false + } else if strings.HasPrefix(delta, "\n") { + delta = "\n" + delta + s.needsReasoningSummaryBreak = false + } else { + delta = "\n\n" + delta + s.needsReasoningSummaryBreak = false + } + } + s.usageText.WriteString(delta) + chunks := s.ensureStart() + chunks = append(chunks, s.makeChunk(dto.ChatCompletionsStreamResponseChoiceDelta{ + ReasoningContent: &delta, + }, nil)) + s.hasSentReasoning = true + return chunks +} + +func (s *ResponsesToChatStreamState) toolItem(event *dto.ResponsesStreamResponse) []dto.ChatCompletionsStreamResponse { + tool := s.ensureToolForEvent(event) + if tool == nil { + return nil + } + args := event.Item.ArgumentsString() + if args != "" { + tool.Arguments = args + } + return s.toolDelta(tool, "") +} + +func (s *ResponsesToChatStreamState) toolArgumentsDelta(event *dto.ResponsesStreamResponse) []dto.ChatCompletionsStreamResponse { + if event.Delta == "" { + return nil + } + tool := s.findToolForEvent(event) + if tool == nil { + if event.OutputIndex != nil { + s.pendingArgsByOutputIndex[*event.OutputIndex] += event.Delta + } + if itemID := strings.TrimSpace(event.ItemID); itemID != "" { + s.pendingArgsByItemID[itemID] += event.Delta + } + return nil + } + tool.Arguments += event.Delta + return s.toolDelta(tool, event.Delta) +} + +func (s *ResponsesToChatStreamState) flushPendingTool(event *dto.ResponsesStreamResponse) []dto.ChatCompletionsStreamResponse { + tool := s.findToolForEvent(event) + if tool == nil { + tool = s.ensureFallbackToolForEvent(event) + } + if tool == nil { + return nil + } + return s.toolDelta(tool, "") +} + +func (s *ResponsesToChatStreamState) ensureToolForEvent(event *dto.ResponsesStreamResponse) *responsesStreamTool { + if event == nil || event.Item == nil { + return nil + } + key := s.keyForEvent(event) + if key == "" { + key = fallbackToolKey(event.Item.ID, event.Item.CallId, event.OutputIndex) + } + if key == "" { + return nil + } + + tool := s.toolByKey[key] + if tool == nil { + tool = &responsesStreamTool{Key: key, Index: s.nextToolIndex} + s.nextToolIndex++ + s.toolByKey[key] = tool + } + + if event.OutputIndex != nil { + s.outputIndexToKey[*event.OutputIndex] = key + if pending := s.pendingArgsByOutputIndex[*event.OutputIndex]; pending != "" { + tool.Arguments += pending + delete(s.pendingArgsByOutputIndex, *event.OutputIndex) + } + } + if itemID := strings.TrimSpace(event.Item.ID); itemID != "" { + tool.ItemID = itemID + s.itemIDToKey[itemID] = key + if pending := s.pendingArgsByItemID[itemID]; pending != "" { + tool.Arguments += pending + delete(s.pendingArgsByItemID, itemID) + } + } + if callID := strings.TrimSpace(event.Item.CallId); callID != "" { + tool.CallID = callID + s.callIDToKey[callID] = key + } else if tool.CallID == "" { + tool.CallID = strings.TrimSpace(event.Item.ID) + } + if name := strings.TrimSpace(event.Item.Name); name != "" { + tool.Name = name + } + return tool +} + +func (s *ResponsesToChatStreamState) findToolForEvent(event *dto.ResponsesStreamResponse) *responsesStreamTool { + if event == nil { + return nil + } + if event.OutputIndex != nil { + if key := s.outputIndexToKey[*event.OutputIndex]; key != "" { + return s.toolByKey[key] + } + } + if itemID := strings.TrimSpace(event.ItemID); itemID != "" { + if key := s.itemIDToKey[itemID]; key != "" { + return s.toolByKey[key] + } + } + if event.Item != nil { + if key := s.keyForEvent(event); key != "" { + return s.toolByKey[key] + } + } + return nil +} + +func (s *ResponsesToChatStreamState) ensureFallbackToolForEvent(event *dto.ResponsesStreamResponse) *responsesStreamTool { + if event == nil { + return nil + } + key := "" + if event.OutputIndex != nil { + key = fmt.Sprintf("output:%d", *event.OutputIndex) + } + if key == "" && strings.TrimSpace(event.ItemID) != "" { + key = "item:" + strings.TrimSpace(event.ItemID) + } + if key == "" { + return nil + } + tool := s.toolByKey[key] + if tool == nil { + tool = &responsesStreamTool{ + Key: key, + Index: s.nextToolIndex, + CallID: fallbackCallID(event), + } + s.nextToolIndex++ + s.toolByKey[key] = tool + } + if event.OutputIndex != nil { + s.outputIndexToKey[*event.OutputIndex] = key + if pending := s.pendingArgsByOutputIndex[*event.OutputIndex]; pending != "" { + tool.Arguments += pending + delete(s.pendingArgsByOutputIndex, *event.OutputIndex) + } + } + if itemID := strings.TrimSpace(event.ItemID); itemID != "" { + tool.ItemID = itemID + s.itemIDToKey[itemID] = key + if pending := s.pendingArgsByItemID[itemID]; pending != "" { + tool.Arguments += pending + delete(s.pendingArgsByItemID, itemID) + } + } + return tool +} + +func (s *ResponsesToChatStreamState) toolDelta(tool *responsesStreamTool, explicitDelta string) []dto.ChatCompletionsStreamResponse { + if tool == nil { + return nil + } + + argsDelta := explicitDelta + if argsDelta == "" && len(tool.Arguments) > tool.ArgsSentAt { + argsDelta = tool.Arguments[tool.ArgsSentAt:] + } + if tool.Sent && argsDelta == "" && (tool.Name == "" || tool.NameSent) { + return nil + } + + chunks := s.ensureStart() + callID := strings.TrimSpace(tool.CallID) + if callID == "" { + callID = tool.Key + } + responseTool := dto.ToolCallResponse{ + ID: callID, + Type: "function", + Function: dto.FunctionResponse{ + Arguments: argsDelta, + }, + } + responseTool.SetIndex(tool.Index) + if !tool.NameSent && tool.Name != "" { + responseTool.Function.Name = tool.Name + tool.NameSent = true + } + if !tool.Sent { + tool.Sent = true + } + if argsDelta != "" { + tool.ArgsSentAt += len(argsDelta) + s.usageText.WriteString(argsDelta) + } + if responseTool.Function.Name != "" { + s.usageText.WriteString(responseTool.Function.Name) + } + + chunks = append(chunks, s.makeChunk(dto.ChatCompletionsStreamResponseChoiceDelta{ + ToolCalls: []dto.ToolCallResponse{responseTool}, + }, nil)) + s.sawToolCall = true + return chunks +} + +func (s *ResponsesToChatStreamState) finalize(response *dto.OpenAIResponsesResponse) []dto.ChatCompletionsStreamResponse { + if s.finalized { + return nil + } + s.finalized = true + + chunks := s.flushAllPendingTools() + chunks = append(chunks, s.ensureStart()...) + + finishReason := "stop" + if mappedReason, ok := ResponsesFinishReasonFromStatus(response); ok { + finishReason = mappedReason + } else if s.sawToolCall { + finishReason = "tool_calls" + } + chunks = append(chunks, s.makeChunk(dto.ChatCompletionsStreamResponseChoiceDelta{}, &finishReason)) + if s.IncludeUsage && s.Usage != nil { + chunks = append(chunks, dto.ChatCompletionsStreamResponse{ + Id: s.ID, + Object: "chat.completion.chunk", + Created: s.Created, + Model: s.Model, + Choices: make([]dto.ChatCompletionsStreamResponseChoice, 0), + Usage: s.Usage, + }) + } + return chunks +} + +func (s *ResponsesToChatStreamState) flushAllPendingTools() []dto.ChatCompletionsStreamResponse { + keys := make([]string, 0, len(s.toolByKey)+len(s.pendingArgsByOutputIndex)+len(s.pendingArgsByItemID)) + seen := make(map[string]bool) + for key := range s.toolByKey { + keys = append(keys, key) + seen[key] = true + } + for outputIndex := range s.pendingArgsByOutputIndex { + key := fmt.Sprintf("output:%d", outputIndex) + if !seen[key] { + keys = append(keys, key) + seen[key] = true + } + } + for itemID := range s.pendingArgsByItemID { + key := "item:" + itemID + if !seen[key] { + keys = append(keys, key) + seen[key] = true + } + } + sort.Strings(keys) + + var chunks []dto.ChatCompletionsStreamResponse + for _, key := range keys { + tool := s.toolByKey[key] + if tool == nil { + callID := strings.TrimPrefix(key, "item:") + if strings.HasPrefix(key, "output:") { + callID = "call_output_" + strings.TrimPrefix(key, "output:") + } + tool = &responsesStreamTool{ + Key: key, + Index: s.nextToolIndex, + CallID: callID, + } + s.nextToolIndex++ + s.toolByKey[key] = tool + } + if strings.HasPrefix(key, "output:") { + var outputIndex int + if _, err := fmt.Sscanf(key, "output:%d", &outputIndex); err == nil { + tool.Arguments += s.pendingArgsByOutputIndex[outputIndex] + delete(s.pendingArgsByOutputIndex, outputIndex) + } + } + if strings.HasPrefix(key, "item:") { + itemID := strings.TrimPrefix(key, "item:") + tool.Arguments += s.pendingArgsByItemID[itemID] + delete(s.pendingArgsByItemID, itemID) + } + chunks = append(chunks, s.toolDelta(tool, "")...) + } + return chunks +} + +func (s *ResponsesToChatStreamState) makeChunk(delta dto.ChatCompletionsStreamResponseChoiceDelta, finishReason *string) dto.ChatCompletionsStreamResponse { + return dto.ChatCompletionsStreamResponse{ + Id: s.ID, + Object: "chat.completion.chunk", + Created: s.Created, + Model: s.Model, + Choices: []dto.ChatCompletionsStreamResponseChoice{ + { + Index: 0, + Delta: delta, + FinishReason: finishReason, + }, + }, + } +} + +func (s *ResponsesToChatStreamState) keyForEvent(event *dto.ResponsesStreamResponse) string { + if event == nil { + return "" + } + if event.OutputIndex != nil { + return fmt.Sprintf("output:%d", *event.OutputIndex) + } + if event.Item != nil { + if itemID := strings.TrimSpace(event.Item.ID); itemID != "" { + return "item:" + itemID + } + if callID := strings.TrimSpace(event.Item.CallId); callID != "" { + return "call:" + callID + } + } + if itemID := strings.TrimSpace(event.ItemID); itemID != "" { + return "item:" + itemID + } + return "" +} + +type ResponsesBufferedAccumulator struct { + text strings.Builder + reasoning strings.Builder + tools []*responsesBufferedTool + outputIndexToToolIdx map[int]int + itemIDToToolIdx map[string]int + pendingByOutputIndex map[int]string + pendingByItemID map[string]string +} + +type responsesBufferedTool struct { + CallID string + ItemID string + Name string + Arguments strings.Builder +} + +func NewResponsesBufferedAccumulator() *ResponsesBufferedAccumulator { + return &ResponsesBufferedAccumulator{ + outputIndexToToolIdx: make(map[int]int), + itemIDToToolIdx: make(map[string]int), + pendingByOutputIndex: make(map[int]string), + pendingByItemID: make(map[string]string), + } +} + +func (a *ResponsesBufferedAccumulator) ProcessEvent(event *dto.ResponsesStreamResponse) { + if a == nil || event == nil { + return + } + switch event.Type { + case responsesEventOutputTextDelta: + a.text.WriteString(event.Delta) + case responsesEventReasoningSummaryDelta, responsesEventReasoningTextDelta: + a.reasoning.WriteString(event.Delta) + case responsesEventOutputItemAdded, responsesEventOutputItemDone: + if event.Item != nil && isResponsesToolOutputType(event.Item.Type) { + tool := a.ensureTool(event) + if args := event.Item.ArgumentsString(); args != "" { + tool.Arguments.Reset() + tool.Arguments.WriteString(args) + } + } + case responsesEventFunctionArgsDelta, responsesEventCustomToolInputDelta: + if idx, ok := a.findToolIndex(event); ok { + a.tools[idx].Arguments.WriteString(event.Delta) + return + } + if event.OutputIndex != nil { + a.pendingByOutputIndex[*event.OutputIndex] += event.Delta + } + if itemID := strings.TrimSpace(event.ItemID); itemID != "" { + a.pendingByItemID[itemID] += event.Delta + } + } +} + +func (a *ResponsesBufferedAccumulator) SupplementResponseOutput(resp *dto.OpenAIResponsesResponse) { + if a == nil || resp == nil || len(resp.Output) > 0 { + return + } + resp.Output = a.BuildOutput() +} + +func (a *ResponsesBufferedAccumulator) BuildOutput() []dto.ResponsesOutput { + if a == nil { + return nil + } + out := make([]dto.ResponsesOutput, 0, 2+len(a.tools)) + if a.reasoning.Len() > 0 { + out = append(out, dto.ResponsesOutput{ + Type: responsesOutputTypeReasoning, + Content: []dto.ResponsesOutputContent{ + {Type: "summary_text", Text: a.reasoning.String()}, + }, + }) + } + if a.text.Len() > 0 { + out = append(out, dto.ResponsesOutput{ + Type: responsesOutputTypeMessage, + Role: "assistant", + Content: []dto.ResponsesOutputContent{ + {Type: "output_text", Text: a.text.String()}, + }, + }) + } + for _, tool := range a.tools { + if tool == nil { + continue + } + argsRaw, _ := common.Marshal(tool.Arguments.String()) + out = append(out, dto.ResponsesOutput{ + Type: responsesOutputTypeFunctionCall, + ID: tool.ItemID, + CallId: tool.CallID, + Name: tool.Name, + Arguments: argsRaw, + }) + } + return out +} + +func (a *ResponsesBufferedAccumulator) ensureTool(event *dto.ResponsesStreamResponse) *responsesBufferedTool { + if idx, ok := a.findToolIndex(event); ok { + tool := a.tools[idx] + a.applyToolMetadata(tool, event) + return tool + } + tool := &responsesBufferedTool{} + a.applyToolMetadata(tool, event) + idx := len(a.tools) + a.tools = append(a.tools, tool) + if event.OutputIndex != nil { + a.outputIndexToToolIdx[*event.OutputIndex] = idx + if pending := a.pendingByOutputIndex[*event.OutputIndex]; pending != "" { + tool.Arguments.WriteString(pending) + delete(a.pendingByOutputIndex, *event.OutputIndex) + } + } + if tool.ItemID != "" { + a.itemIDToToolIdx[tool.ItemID] = idx + if pending := a.pendingByItemID[tool.ItemID]; pending != "" { + tool.Arguments.WriteString(pending) + delete(a.pendingByItemID, tool.ItemID) + } + } + return tool +} + +func (a *ResponsesBufferedAccumulator) applyToolMetadata(tool *responsesBufferedTool, event *dto.ResponsesStreamResponse) { + if tool == nil || event == nil || event.Item == nil { + return + } + if itemID := strings.TrimSpace(event.Item.ID); itemID != "" { + tool.ItemID = itemID + } + if callID := strings.TrimSpace(event.Item.CallId); callID != "" { + tool.CallID = callID + } else if tool.CallID == "" { + tool.CallID = strings.TrimSpace(event.Item.ID) + } + if name := strings.TrimSpace(event.Item.Name); name != "" { + tool.Name = name + } +} + +func (a *ResponsesBufferedAccumulator) findToolIndex(event *dto.ResponsesStreamResponse) (int, bool) { + if event == nil { + return 0, false + } + if event.OutputIndex != nil { + if idx, ok := a.outputIndexToToolIdx[*event.OutputIndex]; ok { + return idx, true + } + } + itemID := strings.TrimSpace(event.ItemID) + if itemID == "" && event.Item != nil { + itemID = strings.TrimSpace(event.Item.ID) + } + if itemID != "" { + idx, ok := a.itemIDToToolIdx[itemID] + return idx, ok + } + return 0, false +} + +func responseStatusString(resp *dto.OpenAIResponsesResponse) string { + if resp == nil || len(resp.Status) == 0 { + return "" + } + var status string + _ = common.Unmarshal(resp.Status, &status) + return strings.TrimSpace(status) +} + +func ensureIncompleteResponse(resp *dto.OpenAIResponsesResponse) *dto.OpenAIResponsesResponse { + if resp == nil { + resp = &dto.OpenAIResponsesResponse{} + } + if len(resp.Status) == 0 { + resp.Status = []byte(`"incomplete"`) + } + return resp +} + +func isResponsesToolOutputType(outputType string) bool { + return outputType == responsesOutputTypeFunctionCall || outputType == responsesOutputTypeCustomToolCall +} + +func fallbackToolKey(itemID string, callID string, outputIndex *int) string { + if outputIndex != nil { + return fmt.Sprintf("output:%d", *outputIndex) + } + if strings.TrimSpace(itemID) != "" { + return "item:" + strings.TrimSpace(itemID) + } + if strings.TrimSpace(callID) != "" { + return "call:" + strings.TrimSpace(callID) + } + return "" +} + +func fallbackCallID(event *dto.ResponsesStreamResponse) string { + if event == nil { + return "" + } + if strings.TrimSpace(event.ItemID) != "" { + return strings.TrimSpace(event.ItemID) + } + if event.OutputIndex != nil { + return fmt.Sprintf("call_output_%d", *event.OutputIndex) + } + return "" +}