fix(executor): track usage consistently and harden openai compat streaming #1926
shenshuoyaoyouguang wants to merge 4 commits into router-for-me:main
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly improves the robustness and accuracy of usage accounting for streaming API calls within the executor. It addresses inconsistencies in how usage is reported across different model providers and enhances compatibility with OpenAI-like APIs.
Code Review
This pull request improves executor logic by ensuring consistent usage accounting for streaming implementations with ensurePublished, hardening the OpenAI-compatible executor with a retry mechanism for streaming requests when stream_options.include_usage is rejected, and adding support for parsing aliased usage fields. However, the implementation of this retry mechanism introduces a potential Denial of Service (DoS) vulnerability due to unbounded memory allocation when reading upstream error responses. It also leaks sensitive API keys into application logs and lacks proper attribution fields in audit logs, which could impact security monitoring and billing accuracy. Additionally, there are a couple of minor suggestions for code simplification and consistency.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: be2f5a03b3
/gemini review
Code Review
This pull request introduces several valuable improvements to the executor implementations. The consistent usage tracking across all streaming executors via reporter.ensurePublished is a great reliability enhancement. The changes to usage_helpers.go to support more token field aliases in OpenAI-compatible streams and the simplification of the parsing logic are also excellent. The core of this PR, the new retry mechanism in OpenAICompatExecutor to handle providers that don't support stream_options.include_usage, is well-implemented and includes good testing. I have one suggestion to improve maintainability by refactoring a small piece of duplicated code.
```go
var authID, authLabel, authType, authValue string
if auth != nil {
	authID = auth.ID
	authLabel = auth.Label
	authType, authValue = auth.AccountInfo()
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
	URL:       retryReq.URL.String(),
	Method:    retryReq.Method,
	Headers:   retryReq.Header.Clone(),
	Body:      trimmed,
	Provider:  e.Identifier(),
	AuthID:    authID,
	AuthLabel: authLabel,
	AuthType:  authType,
	AuthValue: authValue,
})
```
This block of code for recording the API request is very similar to the one in ExecuteStream (lines 236-251). To improve maintainability and reduce code duplication, consider extracting this logic into a helper method on OpenAICompatExecutor.
For example, you could create a method like recordUpstreamRequest:
```go
func (e *OpenAICompatExecutor) recordUpstreamRequest(ctx context.Context, req *http.Request, body []byte, auth *cliproxyauth.Auth) {
	var authID, authLabel, authType, authValue string
	if auth != nil {
		authID = auth.ID
		authLabel = auth.Label
		authType, authValue = auth.AccountInfo()
	}
	recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
		URL:       req.URL.String(),
		Method:    req.Method,
		Headers:   req.Header.Clone(),
		Body:      body,
		Provider:  e.Identifier(),
		AuthID:    authID,
		AuthLabel: authLabel,
		AuthType:  authType,
		AuthValue: authValue,
	})
}
```

Then you could call e.recordUpstreamRequest(ctx, retryReq, trimmed, auth) here and e.recordUpstreamRequest(ctx, httpReq, translated, auth) in ExecuteStream.
Good suggestion. I kept the follow-up fix intentionally narrow so the retry-stream bugfix stayed low-risk and easy to review, which is why I did not fold the duplicated request-logging block into a helper in this PR. If we touch this executor logging path again, I agree that extracting a shared helper would be the right cleanup step.
/gemini review
Code Review
This pull request introduces several enhancements related to usage reporting and OpenAI-compatible streaming API interactions. For the Claude, Gemini, Kimi, and Qwen executors, request usage is now consistently published even when no usage chunk is received during streaming, particularly after failures.

The OpenAI-compatible executor now automatically injects stream_options.include_usage: true into streaming request payloads to proactively request usage information. A new retry mechanism was added for OpenAI-compatible streaming requests: if an initial request fails with a 400 or 422 status code because the stream_options.include_usage field is unsupported, the executor retries the request with this field removed from the payload.

Additionally, the usage parsing helpers (parseCodexUsage, parseOpenAIUsage, parseOpenAIStreamUsage) were made more resilient by simplifying existence checks and incorporating fallback logic for varying field names (e.g., prompt_tokens vs. input_tokens) to accommodate different provider implementations. New tests were also added to validate the include_usage injection and the retry logic.

A review comment suggested changing a log.Errorf to log.Warnf in the new retry function for consistency, since failing to close a response body after a successful read is a less critical issue, aligning with other logging practices in the same function.
/gemini review
Code Review
This pull request introduces consistent usage tracking for streaming executors and hardens the OpenAI-compatible executor by adding a retry mechanism for providers that don't support stream_options.include_usage. The changes are well-implemented and include relevant tests. I've suggested a minor refactoring in openai_compat_executor.go to improve the maintainability of the new retry logic.
```go
func (e *OpenAICompatExecutor) retryStreamWithoutInjectedUsage(ctx context.Context, auth *cliproxyauth.Auth, httpClient *http.Client, httpReq *http.Request, translated []byte, httpResp *http.Response, autoInjected bool) (*http.Response, error) {
	if !autoInjected || httpResp == nil {
		return nil, nil
	}
	if httpResp.StatusCode != http.StatusBadRequest && httpResp.StatusCode != http.StatusUnprocessableEntity {
		return nil, nil
	}
	body, err := io.ReadAll(io.LimitReader(httpResp.Body, openAICompatRetryErrorBodyLimit+1))
	if err != nil {
		if errClose := httpResp.Body.Close(); errClose != nil {
			log.Warnf("openai compat executor: failed to close body after read error: %v", errClose)
		}
		return nil, err
	}
	if errClose := httpResp.Body.Close(); errClose != nil {
		log.Errorf("openai compat executor: close fallback response body error: %v", errClose)
	}
	if len(body) > openAICompatRetryErrorBodyLimit {
		log.Warnf("openai compat executor: fallback response body exceeded %d bytes; skip retry without include_usage", openAICompatRetryErrorBodyLimit)
		httpResp.Body = io.NopCloser(bytes.NewReader(body[:openAICompatRetryErrorBodyLimit]))
		return httpResp, nil
	}
	if !strings.Contains(strings.ToLower(string(body)), "stream_options") && !strings.Contains(strings.ToLower(string(body)), "include_usage") {
		httpResp.Body = io.NopCloser(bytes.NewReader(body))
		return httpResp, nil
	}
	trimmed, err := sjson.DeleteBytes(translated, "stream_options.include_usage")
	if err != nil {
		return nil, fmt.Errorf("openai compat executor: failed to remove unsupported stream_options in payload: %w", err)
	}
	if streamOptions := gjson.GetBytes(trimmed, "stream_options"); streamOptions.Exists() && len(streamOptions.Map()) == 0 {
		trimmed, err = sjson.DeleteBytes(trimmed, "stream_options")
		if err != nil {
			return nil, fmt.Errorf("openai compat executor: failed to remove empty stream_options in payload: %w", err)
		}
	}
	retryReq, err := http.NewRequestWithContext(ctx, httpReq.Method, httpReq.URL.String(), bytes.NewReader(trimmed))
	if err != nil {
		return nil, err
	}
	retryReq.Header = httpReq.Header.Clone()
	var authID, authLabel, authType, authValue string
	if auth != nil {
		authID = auth.ID
		authLabel = auth.Label
		authType, authValue = auth.AccountInfo()
	}
	recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
		URL:       retryReq.URL.String(),
		Method:    retryReq.Method,
		Headers:   retryReq.Header.Clone(),
		Body:      trimmed,
		Provider:  e.Identifier(),
		AuthID:    authID,
		AuthLabel: authLabel,
		AuthType:  authType,
		AuthValue: authValue,
	})
	return httpClient.Do(retryReq)
}
```
This function is quite long and handles multiple responsibilities: checking for retry conditions, reading the response body, preparing the new request, and executing it. This increases its complexity and makes it harder to understand and maintain.
Consider refactoring this function by extracting some of its logic into smaller, more focused helper functions. For example, you could have:
- A function that determines if a retry is necessary. This function would encapsulate reading the response body and checking its content. It could return a boolean, the body bytes, and an error.
- The main retryStreamWithoutInjectedUsage function would then use this helper to decide whether to proceed with preparing and sending the retry request.
This would improve readability and separation of concerns.
Thanks. I kept this follow-up intentionally focused on the blocking streaming accounting gap and the retry false-positive path, so I did not split retryStreamWithoutInjectedUsage further in this patch. The new predicate helper was the smallest safe extraction for this round; I can revisit a larger decomposition in a later cleanup PR if we touch this executor again.
Review Navigation: the high-risk issues flagged in the early automated reviews of this PR (error-body read limit, retry audit fields, etc.) have been resolved in subsequent commits; the remaining feedback is closer to maintenance-suggestion level. CI has passed, so this PR can be placed in second-tier review priority.
luispater left a comment
Summary: Nice direction overall — the usage alias parsing and the OpenAI-compatible retry path both make sense, and the new tests cover the happy path well. I found one blocking gap before this is ready.
Blocking:
- In internal/runtime/executor/claude_executor.go, the Claude->Claude passthrough branch returns before reporter.ensurePublished(ctx), while the fallback publish only happens in the translated branch. That means streams without a usage chunk still won't be accounted for on the passthrough path, so the fix is not yet consistent across Claude streaming. Please move the fallback publish into a defer at the top of the goroutine or call it before that early return, and add a regression test for the from == to path.
Non-blocking:
- In internal/runtime/executor/openai_compat_executor.go, the retry predicate matches any 400/422 body containing stream_options or include_usage. It would be safer to tighten that check, or at least add a negative test proving we do not retry unrelated validation errors that merely echo those strings.
Test plan:
- Reviewed the diff and passing CI checks (build, ensure-no-translator-changes).
Pushed follow-up commit c5fb855 to address the remaining streaming review gaps. This update makes the Claude->Claude passthrough stream publish a fallback usage record even when no usage chunk is emitted, and tightens the OpenAI-compatible retry predicate so unrelated 400/422 validation errors do not trigger a second upstream request. I also added regression coverage for both cases and re-ran go test ./internal/runtime/executor.