Support forced upstream streaming for OpenAI-compatible providers#2089
jlcbk wants to merge 2 commits into router-for-me:main
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a feature for OpenAI-compatible providers, allowing the system to normalize responses from upstreams that may only behave correctly in Server-Sent Events (SSE) mode, by adding an opt-in flag.
Code Review
This pull request introduces a force-upstream-stream flag for OpenAI-compatible providers, which is a useful feature for handling upstreams that only support streaming. The implementation is mostly solid, with good configuration options, management API support, and test coverage. I've found one area with redundant code in the executor logic that could be simplified for better clarity and maintainability. Overall, a great addition.
upstreamStream := forceUpstreamStream
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, upstreamStream)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, upstreamStream)
requestedModel := payloadRequestedModel(opts, req.Model)
translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel)
if opts.Alt == "responses/compact" {
	if updated, errDelete := sjson.DeleteBytes(translated, "stream"); errDelete == nil {
		translated = updated
	}
} else if forceUpstreamStream {
	if updated, errSet := sjson.SetBytes(translated, "stream", true); errSet == nil {
		translated = updated
	}
}
The logic for setting the upstream stream parameter seems to have some redundancy. The upstreamStream variable is passed to sdktranslator.TranslateRequest, which should handle setting the stream property in the request payload. The ExecuteStream function already relies on this behavior. The subsequent else if forceUpstreamStream block, which also sets stream: true, appears to be redundant and can be removed for clarity. Additionally, the upstreamStream variable is an alias for forceUpstreamStream in this context and can be removed to simplify the code.
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, forceUpstreamStream)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, forceUpstreamStream)
requestedModel := payloadRequestedModel(opts, req.Model)
translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel)
if opts.Alt == "responses/compact" {
if updated, errDelete := sjson.DeleteBytes(translated, "stream"); errDelete == nil {
translated = updated
}
}
Tests (local on macOS arm64):
- go test ./internal/runtime/executor ./internal/watcher/diff ./internal/watcher/synthesizer

All passing.
Note: earlier failures were from the new openai-compat stream aggregation tests; fixed in latest push and now passing.
luispater left a comment
Summary
This opt-in flag is useful, and the overall approach makes sense. The new coverage for content/reasoning aggregation is helpful. I have one blocking correctness issue in the SSE aggregation path before this is safe to merge.
Key findings
- Blocking: aggregateOpenAIChatCompletionSSE currently defaults finish_reason to "stop" whenever the upstream SSE does not send an explicit finish reason. That is incorrect for tool-call-only responses. If the stream contains only delta.tool_calls and then [DONE], the aggregated non-stream response is returned with finish_reason: "stop" instead of finish_reason: "tool_calls".
- Why this matters: downstream clients often use finish_reason to distinguish a normal assistant completion from a tool invocation. Returning "stop" here changes the meaning of the response and can break tool execution flows for non-stream callers.
- Expected fix: when at least one tool call was aggregated and no explicit finish reason was provided by upstream, infer finish_reason = "tool_calls" for the final chat.completion payload.
- Test gap: please add a unit test for the tool-call-only SSE case (tool call deltas, no explicit finish reason, [DONE] terminator) and assert that the aggregated response uses finish_reason: "tool_calls".
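The expected fix can be sketched as follows. This is a hypothetical, minimal aggregator — aggregateSSE and the trimmed chunk shapes here are illustrative and are not the PR's actual aggregateOpenAIChatCompletionSSE. It concatenates delta.content, tracks tool-call fragments, and infers finish_reason = "tool_calls" when tool calls were seen but no explicit finish reason arrived before [DONE]:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// sseChunk is a trimmed, illustrative view of one SSE chunk; real chunks carry more fields.
type sseChunk struct {
	Choices []struct {
		Delta struct {
			Content   string            `json:"content"`
			ToolCalls []json.RawMessage `json:"tool_calls"`
		} `json:"delta"`
		FinishReason *string `json:"finish_reason"`
	} `json:"choices"`
}

// aggregateSSE reads "data:" lines, concatenates delta.content, and infers
// finish_reason: an explicit upstream value wins; otherwise "tool_calls" if
// any tool-call deltas were seen, else "stop".
func aggregateSSE(raw string) (content, finishReason string, err error) {
	var sb strings.Builder
	explicit := ""
	sawToolCalls := false
	sc := bufio.NewScanner(strings.NewReader(raw))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if payload == "[DONE]" {
			break
		}
		var chunk sseChunk
		if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
			return "", "", err
		}
		for _, c := range chunk.Choices {
			sb.WriteString(c.Delta.Content)
			if len(c.Delta.ToolCalls) > 0 {
				sawToolCalls = true
			}
			if c.FinishReason != nil && *c.FinishReason != "" {
				explicit = *c.FinishReason
			}
		}
	}
	switch {
	case explicit != "":
		return sb.String(), explicit, nil
	case sawToolCalls:
		return sb.String(), "tool_calls", nil
	default:
		return sb.String(), "stop", nil
	}
}

func main() {
	// Tool-call-only stream: no explicit finish reason before [DONE].
	toolOnly := "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0}]}}]}\n" +
		"data: [DONE]\n"
	_, fr, _ := aggregateSSE(toolOnly)
	fmt.Println(fr) // tool_calls

	// Plain text stream with an explicit finish reason.
	text := "data: {\"choices\":[{\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n" +
		"data: [DONE]\n"
	c, fr2, _ := aggregateSSE(text)
	fmt.Println(c, fr2) // hi stop
}
```

The key design point matches the review: the default only applies when neither an explicit finish reason nor tool-call deltas were observed.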
Test plan
go test ./internal/runtime/executor ./internal/watcher/diff -count=1
Summary
Add an opt-in force-upstream-stream flag for openai-compatibility providers. When enabled, non-stream downstream requests are sent upstream with stream=true, the SSE stream is aggregated into a final OpenAI chat.completion JSON response, and then returned to the downstream client.

Why
Some private/self-hosted OpenAI-compatible upstreams only behave correctly in SSE mode. In non-stream mode they may return SSE fragments that cannot be parsed as JSON. This change lets CLIProxyAPI normalize that behavior without changing defaults for existing providers.
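For illustration, the flag would sit on a provider entry in the configuration. This is a hedged sketch: only force-upstream-stream and the openai-compatibility section come from this PR; the surrounding keys and values (name, base-url) are assumptions.

```yaml
openai-compatibility:
  - name: "my-upstream"                          # hypothetical entry name
    base-url: "https://llm.example.internal/v1"  # hypothetical upstream URL
    force-upstream-stream: true                  # opt-in: non-stream requests go upstream as SSE
```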
What changed

- Added a force-upstream-stream field to the openai-compatibility provider configuration

Testing

- go test ./internal/runtime/executor ./internal/watcher/diff ./internal/watcher/synthesizer
- go test ./... (optional)

Notes