
Support forced upstream streaming for OpenAI-compatible providers#2089

Open
jlcbk wants to merge 2 commits into router-for-me:main from jlcbk:codex/openai-compat-force-upstream-stream

Conversation


@jlcbk jlcbk commented Mar 12, 2026

Summary

Add an opt-in force-upstream-stream flag for openai-compatibility providers. When enabled, non-stream downstream requests are sent upstream with stream=true; the SSE response is aggregated into a final OpenAI chat.completion JSON and returned to the downstream client.

Why

Some private/self-hosted OpenAI-compatible upstreams only behave correctly in SSE mode. In non-stream mode they may return SSE fragments that cannot be parsed as JSON. This change lets CLIProxyAPI normalize that behavior without changing defaults for existing providers.

What changed

  • config: add force-upstream-stream field to openai-compatibility
  • management API PATCH: support toggling the new field
  • executor: optionally force upstream SSE + aggregate to non-stream JSON
  • diff watcher: include the new flag in signatures and summaries
  • tests: cover aggregation of content/reasoning/tool-calls and default behavior
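For example, the flag might be enabled per provider like this (a sketch: only force-upstream-stream comes from this PR; the surrounding key names and values are placeholders following the usual openai-compatibility shape):

```yaml
openai-compatibility:
  - name: my-private-upstream            # placeholder provider name
    base-url: https://example.internal/v1  # placeholder URL, not a real upstream
    # Opt-in: send stream=true upstream and aggregate the SSE
    # into a single chat.completion JSON for non-stream clients.
    force-upstream-stream: true
```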

Testing

  • Not run locally (no Go toolchain in this environment). Please run:
    • go test ./internal/runtime/executor ./internal/watcher/diff ./internal/watcher/synthesizer
    • go test ./... (optional)
  • gofmt not run (no gofmt in this environment)

Notes

  • Feature is opt-in; default behavior remains unchanged.
  • No provider URLs are included in this PR to avoid exposing private upstreams.

@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a crucial feature for OpenAI-compatible providers, allowing the system to normalize responses from upstreams that may only behave correctly in Server-Sent Events (SSE) mode. By adding an opt-in force-upstream-stream flag, the CLIProxyAPI can now send non-stream downstream requests as stream=true to the upstream, aggregate the SSE fragments into a final OpenAI chat.completion JSON, and then return it to the client. This ensures compatibility and correct parsing for self-hosted or private OpenAI-compatible services without altering existing default behaviors.

Highlights

  • Configuration: Added a new force-upstream-stream field to the openai-compatibility configuration, allowing an opt-in mechanism to force upstream streaming.
  • Management API: Updated the management API to support toggling the new force-upstream-stream field for OpenAI-compatible providers.
  • Executor Logic: Implemented logic within the executor to optionally force upstream requests to use SSE (Server-Sent Events) and then aggregate these SSE fragments into a single non-stream JSON response for downstream clients.
  • Configuration Watcher: Ensured the new force-upstream-stream flag is included in configuration signatures and summaries generated by the diff watcher.
  • Testing: Added comprehensive tests covering the aggregation of content, reasoning, and tool calls from SSE streams, and confirmed that the default behavior remains unchanged when the flag is not enabled.


Changelog
  • config.example.yaml
    • Added a commented-out example for the new force-upstream-stream configuration field.
  • internal/api/handlers/management/config_lists.go
    • Introduced ForceUpstreamStream as an optional boolean field in the PatchOpenAICompat request body.
    • Updated the patching logic to apply the ForceUpstreamStream value to the configuration entry.
  • internal/config/config.go
    • Defined a new ForceUpstreamStream boolean field in the OpenAICompatibility struct, with YAML and JSON tags for serialization.
  • internal/runtime/executor/openai_compat_executor.go
    • Implemented logic to check for forceUpstreamStream and modify the upstream request payload to include stream=true.
    • Added Accept and Cache-Control headers to upstream requests when forceUpstreamStream is enabled.
    • Modified response processing to aggregate SSE responses into a single JSON payload using aggregateOpenAIChatCompletionSSE when forceUpstreamStream is active.
    • Created a shouldForceUpstreamStream helper function to determine if upstream streaming should be forced.
  • internal/runtime/executor/openai_compat_executor_stream_aggregate_test.go
    • Added a new test file to verify the aggregation of reasoning and content from upstream SSE streams.
    • Included tests to confirm correct aggregation of tool calls from SSE streams.
    • Added a test to ensure that the default behavior of the executor remains unchanged when force-upstream-stream is not enabled.
  • internal/runtime/executor/openai_compat_sse_aggregate.go
    • Created a new file containing the aggregateOpenAIChatCompletionSSE function, which processes OpenAI-style chat completion chunk SSEs and converts them into a single, complete JSON response.
    • Defined a toolCallAggregate struct to assist in accumulating tool call data during SSE aggregation.
  • internal/watcher/diff/openai_compat.go
    • Updated describeOpenAICompatibilityUpdate to report changes in the force-upstream-stream setting.
    • Modified openAICompatSignature to include the ForceUpstreamStream field in the configuration signature, ensuring changes are detected.
  • internal/watcher/diff/openai_compat_test.go
    • Updated existing tests to account for the new ForceUpstreamStream field in OpenAICompatibility configurations.
    • Modified assertions to reflect the inclusion of force-upstream-stream changes in provider update summaries and signatures.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a force-upstream-stream flag for OpenAI-compatible providers, which is a useful feature for handling upstreams that only support streaming. The implementation is mostly solid, with good configuration options, management API support, and test coverage. I've found one area with redundant code in the executor logic that could be simplified for better clarity and maintainability. Overall, a great addition.

Comment on lines +97 to 110
```go
upstreamStream := forceUpstreamStream
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, upstreamStream)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, upstreamStream)
requestedModel := payloadRequestedModel(opts, req.Model)
translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel)
if opts.Alt == "responses/compact" {
	if updated, errDelete := sjson.DeleteBytes(translated, "stream"); errDelete == nil {
		translated = updated
	}
} else if forceUpstreamStream {
	if updated, errSet := sjson.SetBytes(translated, "stream", true); errSet == nil {
		translated = updated
	}
}
```

medium

The logic for setting the upstream stream parameter seems to have some redundancy. The upstreamStream variable is passed to sdktranslator.TranslateRequest, which should handle setting the stream property in the request payload. The ExecuteStream function already relies on this behavior. The subsequent else if forceUpstreamStream block, which also sets stream: true, appears to be redundant and can be removed for clarity. Additionally, the upstreamStream variable is an alias for forceUpstreamStream in this context and can be removed to simplify the code.

```go
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, forceUpstreamStream)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, forceUpstreamStream)
requestedModel := payloadRequestedModel(opts, req.Model)
translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel)
if opts.Alt == "responses/compact" {
	if updated, errDelete := sjson.DeleteBytes(translated, "stream"); errDelete == nil {
		translated = updated
	}
}
```


jlcbk commented Mar 12, 2026

Tests (local on macOS arm64):

  • go test ./internal/runtime/executor ./internal/watcher/diff ./internal/watcher/synthesizer

All passing.


jlcbk commented Mar 12, 2026

Note: earlier failures were from the new openai-compat stream aggregation tests; fixed in latest push and now passing.


@luispater luispater left a comment


Summary

This opt-in flag is useful, and the overall approach makes sense. The new coverage for content/reasoning aggregation is helpful. I have one blocking correctness issue in the SSE aggregation path before this is safe to merge.

Key findings

  • Blocking: aggregateOpenAIChatCompletionSSE currently defaults finish_reason to "stop" whenever the upstream SSE does not send an explicit finish reason. That is incorrect for tool-call-only responses. If the stream contains only delta.tool_calls and then [DONE], the aggregated non-stream response is returned with finish_reason: "stop" instead of finish_reason: "tool_calls".
  • Why this matters: downstream clients often use finish_reason to distinguish a normal assistant completion from a tool invocation. Returning "stop" here changes the meaning of the response and can break tool execution flows for non-stream callers.
  • Expected fix: when at least one tool call was aggregated and no explicit finish reason was provided by upstream, infer finish_reason = "tool_calls" for the final chat.completion payload.
  • Test gap: please add a unit test for the tool-call-only SSE case (tool call deltas, no explicit finish reason, [DONE] terminator) and assert that the aggregated response uses finish_reason: "tool_calls".

Test plan

  • go test ./internal/runtime/executor ./internal/watcher/diff -count=1
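The inference rule requested above can be sketched in isolation. inferFinishReason is a hypothetical helper, not the PR's actual code; it only demonstrates the expected behavior: an explicit upstream finish reason wins, a tool-call-only stream yields "tool_calls", and only otherwise does "stop" apply.

```go
package main

import "fmt"

// inferFinishReason picks the finish_reason for the aggregated non-stream
// response. explicit is the last finish_reason seen in the SSE ("" if none);
// sawToolCalls reports whether any delta.tool_calls were aggregated.
func inferFinishReason(explicit string, sawToolCalls bool) string {
	if explicit != "" {
		return explicit // upstream stated a reason; pass it through
	}
	if sawToolCalls {
		// Tool-call-only stream ended with [DONE] and no explicit reason:
		// defaulting to "stop" would break clients that branch on "tool_calls".
		return "tool_calls"
	}
	return "stop"
}

func main() {
	fmt.Println(inferFinishReason("", true))       // tool-call-only stream
	fmt.Println(inferFinishReason("length", true)) // explicit reason wins
	fmt.Println(inferFinishReason("", false))      // plain content stream
}
```

A unit test for the aggregation path would then feed tool-call deltas with no finish_reason, terminate with [DONE], and assert the aggregated body reports finish_reason: "tool_calls".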
