
fix(executor): track usage consistently and harden openai compat streaming#1926

Open
shenshuoyaoyouguang wants to merge 4 commits into router-for-me:main from shenshuoyaoyouguang:pr/executor-usage-streaming

Conversation

@shenshuoyaoyouguang
Contributor

Summary

  • publish usage accounting consistently across executor stream implementations
  • parse stream usage aliases for openai-compatible providers
  • retry openai-compatible streaming requests without injected include_usage when upstream rejects that field

Business value

  • improves usage accounting reliability for streaming traffic
  • restores compatibility with providers that reject stream_options.include_usage
  • keeps executor/runtime behavior isolated from auth pool logic

Tests

  • go test ./internal/runtime/executor

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly improves the robustness and accuracy of usage accounting for streaming API calls within the executor. It addresses inconsistencies in how usage is reported across different model providers and enhances compatibility with OpenAI-like APIs by intelligently handling stream_options.include_usage. The changes ensure that usage data is reliably captured and that requests are successfully processed even when upstream providers have varying support for specific streaming options.

Highlights

  • Consistent Usage Accounting: Ensured that usage accounting is consistently published across various streaming executor implementations (Claude, Gemini, Kimi, Qwen) by adding a final ensurePublished call in deferred functions.
  • OpenAI Compat Streaming Hardening: Implemented logic to automatically inject stream_options.include_usage: true into OpenAI-compatible streaming requests if not already present, and added a retry mechanism for requests that fail due to this injected field being rejected by the upstream provider.
  • Flexible Usage Parsing: Enhanced OpenAI stream usage parsing to support alternative field names like input_tokens and output_tokens (and their detailed counterparts) in addition to prompt_tokens and completion_tokens.
  • New Test Coverage: Added new test cases to validate the OpenAICompatExecutor's behavior regarding stream_options.include_usage injection and the retry mechanism, as well as the updated usage parsing logic.
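The injection step in the second highlight can be sketched with the standard library; the PR itself manipulates the payload with sjson/gjson, so the helper below is an illustrative approximation, not the executor's actual code:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ensureIncludeUsage injects stream_options.include_usage=true into a
// streaming request payload when the caller has not set it, and reports
// whether the field was auto-injected so a later retry can safely remove it.
func ensureIncludeUsage(payload []byte) ([]byte, bool, error) {
	var req map[string]any
	if err := json.Unmarshal(payload, &req); err != nil {
		return nil, false, err
	}
	opts, _ := req["stream_options"].(map[string]any)
	if opts != nil {
		if _, ok := opts["include_usage"]; ok {
			return payload, false, nil // caller already chose a value; leave it
		}
	} else {
		opts = map[string]any{}
	}
	opts["include_usage"] = true
	req["stream_options"] = opts
	out, err := json.Marshal(req)
	return out, true, err
}

func main() {
	out, injected, _ := ensureIncludeUsage([]byte(`{"model":"m","stream":true}`))
	fmt.Println(injected, string(out))
}
```

Tracking the auto-injected flag is the key design point: only requests where the proxy added the field should ever be retried without it, so a caller's explicit choice is never silently rewritten.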


Changelog
  • internal/runtime/executor/claude_executor.go
    • Ensured usage reporting by calling reporter.ensurePublished in the ExecuteStream function's defer block.
  • internal/runtime/executor/gemini_executor.go
    • Ensured usage reporting by calling reporter.ensurePublished in the ExecuteStream function's defer block.
  • internal/runtime/executor/kimi_executor.go
    • Ensured usage reporting by calling reporter.ensurePublished in the ExecuteStream function's defer block.
  • internal/runtime/executor/openai_compat_executor.go
    • Imported github.com/tidwall/gjson for JSON parsing.
    • Added logic to automatically inject stream_options.include_usage: true into the request payload if not already present.
    • Introduced a retry mechanism for streaming requests that return 400 Bad Request or 422 Unprocessable Entity if the error message indicates issues with stream_options or include_usage, removing the injected field for the retry.
    • Implemented retryStreamWithoutInjectedUsage function to handle the retry logic.
  • internal/runtime/executor/openai_compat_executor_stream_test.go
    • Added a new test file containing tests for OpenAICompatExecutor streaming behavior.
    • Included TestOpenAICompatExecutorExecuteStreamSetsIncludeUsage to verify include_usage injection.
    • Included TestOpenAICompatExecutorExecuteStreamRetriesWithoutInjectedIncludeUsage to verify the retry mechanism.
  • internal/runtime/executor/qwen_executor.go
    • Ensured usage reporting by calling reporter.ensurePublished in the ExecuteStream function's defer block.
  • internal/runtime/executor/usage_helpers.go
    • Modified parseOpenAIStreamUsage to check for input_tokens and output_tokens as aliases for prompt_tokens and completion_tokens respectively, including their _details fields.
  • internal/runtime/executor/usage_helpers_test.go
    • Added TestParseOpenAIStreamUsageResponses to validate the updated parseOpenAIStreamUsage function with alias fields.
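The alias fallback described for parseOpenAIStreamUsage can be approximated as follows; this is a stdlib sketch with illustrative names, while the real parser uses gjson and also handles the `_details` sub-fields:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// streamUsage holds the totals the executor publishes (illustrative struct).
type streamUsage struct {
	PromptTokens     int64
	CompletionTokens int64
}

// parseUsageWithAliases prefers the canonical prompt_tokens/completion_tokens
// fields and falls back to the input_tokens/output_tokens aliases used by
// some OpenAI-compatible providers when the canonical names are absent.
func parseUsageWithAliases(raw []byte) (streamUsage, bool) {
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return streamUsage{}, false
	}
	pick := func(primary, alias string) (int64, bool) {
		for _, key := range []string{primary, alias} {
			if f, ok := m[key].(float64); ok {
				return int64(f), true
			}
		}
		return 0, false
	}
	p, okP := pick("prompt_tokens", "input_tokens")
	c, okC := pick("completion_tokens", "output_tokens")
	return streamUsage{PromptTokens: p, CompletionTokens: c}, okP || okC
}

func main() {
	u, ok := parseUsageWithAliases([]byte(`{"input_tokens":12,"output_tokens":3}`))
	fmt.Println(ok, u.PromptTokens, u.CompletionTokens)
}
```

Checking the canonical name first keeps behavior unchanged for providers that send both spellings, which is the safe ordering for a compatibility shim.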

Contributor

@gemini-code-assist bot left a comment


Code Review

This pull request improves executor logic by ensuring consistent usage accounting for streaming implementations with ensurePublished, hardening the OpenAI-compatible executor with a retry mechanism for streaming requests when stream_options.include_usage is rejected, and adding support for parsing aliased usage fields. However, the implementation of this retry mechanism introduces a potential Denial of Service (DoS) vulnerability due to unbounded memory allocation when reading upstream error responses. It also leaks sensitive API keys into application logs and lacks proper attribution fields in audit logs, which could impact security monitoring and billing accuracy. Additionally, there are a couple of minor suggestions for code simplification and consistency.

@shenshuoyaoyouguang shenshuoyaoyouguang marked this pull request as ready for review March 7, 2026 09:33

@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: be2f5a03b3


@shenshuoyaoyouguang
Contributor Author

/gemini review

Contributor

@gemini-code-assist bot left a comment


Code Review

This pull request introduces several valuable improvements to the executor implementations. The consistent usage tracking across all streaming executors via reporter.ensurePublished is a great reliability enhancement. The changes to usage_helpers.go to support more token field aliases in OpenAI-compatible streams and the simplification of the parsing logic are also excellent. The core of this PR, the new retry mechanism in OpenAICompatExecutor to handle providers that don't support stream_options.include_usage, is well-implemented and includes good testing. I have one suggestion to improve maintainability by refactoring a small piece of duplicated code.

Comment on lines +362 to +378
var authID, authLabel, authType, authValue string
if auth != nil {
	authID = auth.ID
	authLabel = auth.Label
	authType, authValue = auth.AccountInfo()
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
	URL:       retryReq.URL.String(),
	Method:    retryReq.Method,
	Headers:   retryReq.Header.Clone(),
	Body:      trimmed,
	Provider:  e.Identifier(),
	AuthID:    authID,
	AuthLabel: authLabel,
	AuthType:  authType,
	AuthValue: authValue,
})
Contributor


medium

This block of code for recording the API request is very similar to the one in ExecuteStream (lines 236-251). To improve maintainability and reduce code duplication, consider extracting this logic into a helper method on OpenAICompatExecutor.

For example, you could create a method like recordUpstreamRequest:

func (e *OpenAICompatExecutor) recordUpstreamRequest(ctx context.Context, req *http.Request, body []byte, auth *cliproxyauth.Auth) {
	var authID, authLabel, authType, authValue string
	if auth != nil {
		authID = auth.ID
		authLabel = auth.Label
		authType, authValue = auth.AccountInfo()
	}
	recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
		URL:       req.URL.String(),
		Method:    req.Method,
		Headers:   req.Header.Clone(),
		Body:      body,
		Provider:  e.Identifier(),
		AuthID:    authID,
		AuthLabel: authLabel,
		AuthType:  authType,
		AuthValue: authValue,
	})
}

Then you could call e.recordUpstreamRequest(ctx, retryReq, trimmed, auth) here and e.recordUpstreamRequest(ctx, httpReq, translated, auth) in ExecuteStream.

Contributor Author


Good suggestion. I kept the follow-up fix intentionally narrow so the retry-stream bugfix stayed low-risk and easy to review, which is why I did not fold the duplicated request-logging block into a helper in this PR. If we touch this executor logging path again, I agree that extracting a shared helper would be the right cleanup step.

@shenshuoyaoyouguang
Contributor Author

/gemini review

Contributor

@gemini-code-assist bot left a comment


Code Review

This pull request introduces several enhancements related to usage reporting and OpenAI-compatible streaming API interactions. For Claude, Gemini, Kimi, and Qwen executors, a change was implemented to ensure that request usage is consistently published, even in cases where no usage chunk is received during streaming, particularly after failures. The OpenAI-compatible executor now automatically injects stream_options.include_usage: true into streaming request payloads to proactively request usage information. A new retry mechanism was added for OpenAI-compatible streaming requests: if an initial request fails with a 400 or 422 status code due to an unsupported stream_options.include_usage field, the executor retries the request with this field removed from the payload. Additionally, the usage parsing helper functions (parseCodexUsage, parseOpenAIUsage, parseOpenAIStreamUsage) were made more resilient by simplifying existence checks and incorporating fallback logic for varying field names (e.g., prompt_tokens vs. input_tokens) to accommodate different provider implementations. New tests were also added to validate the include_usage injection and the retry logic. A review comment suggested changing a log.Errorf to log.Warnf in the new retry function for consistency, as failing to close a response body after a successful read is considered a less critical issue, aligning with other logging practices in the same function.

@shenshuoyaoyouguang
Contributor Author

/gemini review

Contributor

@gemini-code-assist bot left a comment


Code Review

This pull request introduces consistent usage tracking for streaming executors and hardens the OpenAI-compatible executor by adding a retry mechanism for providers that don't support stream_options.include_usage. The changes are well-implemented and include relevant tests. I've suggested a minor refactoring in openai_compat_executor.go to improve the maintainability of the new retry logic.

Comment on lines +321 to +380
func (e *OpenAICompatExecutor) retryStreamWithoutInjectedUsage(ctx context.Context, auth *cliproxyauth.Auth, httpClient *http.Client, httpReq *http.Request, translated []byte, httpResp *http.Response, autoInjected bool) (*http.Response, error) {
	if !autoInjected || httpResp == nil {
		return nil, nil
	}
	if httpResp.StatusCode != http.StatusBadRequest && httpResp.StatusCode != http.StatusUnprocessableEntity {
		return nil, nil
	}
	body, err := io.ReadAll(io.LimitReader(httpResp.Body, openAICompatRetryErrorBodyLimit+1))
	if err != nil {
		if errClose := httpResp.Body.Close(); errClose != nil {
			log.Warnf("openai compat executor: failed to close body after read error: %v", errClose)
		}
		return nil, err
	}
	if errClose := httpResp.Body.Close(); errClose != nil {
		log.Errorf("openai compat executor: close fallback response body error: %v", errClose)
	}
	if len(body) > openAICompatRetryErrorBodyLimit {
		log.Warnf("openai compat executor: fallback response body exceeded %d bytes; skip retry without include_usage", openAICompatRetryErrorBodyLimit)
		httpResp.Body = io.NopCloser(bytes.NewReader(body[:openAICompatRetryErrorBodyLimit]))
		return httpResp, nil
	}
	if !strings.Contains(strings.ToLower(string(body)), "stream_options") && !strings.Contains(strings.ToLower(string(body)), "include_usage") {
		httpResp.Body = io.NopCloser(bytes.NewReader(body))
		return httpResp, nil
	}
	trimmed, err := sjson.DeleteBytes(translated, "stream_options.include_usage")
	if err != nil {
		return nil, fmt.Errorf("openai compat executor: failed to remove unsupported stream_options in payload: %w", err)
	}
	if streamOptions := gjson.GetBytes(trimmed, "stream_options"); streamOptions.Exists() && len(streamOptions.Map()) == 0 {
		trimmed, err = sjson.DeleteBytes(trimmed, "stream_options")
		if err != nil {
			return nil, fmt.Errorf("openai compat executor: failed to remove empty stream_options in payload: %w", err)
		}
	}
	retryReq, err := http.NewRequestWithContext(ctx, httpReq.Method, httpReq.URL.String(), bytes.NewReader(trimmed))
	if err != nil {
		return nil, err
	}
	retryReq.Header = httpReq.Header.Clone()
	var authID, authLabel, authType, authValue string
	if auth != nil {
		authID = auth.ID
		authLabel = auth.Label
		authType, authValue = auth.AccountInfo()
	}
	recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
		URL:       retryReq.URL.String(),
		Method:    retryReq.Method,
		Headers:   retryReq.Header.Clone(),
		Body:      trimmed,
		Provider:  e.Identifier(),
		AuthID:    authID,
		AuthLabel: authLabel,
		AuthType:  authType,
		AuthValue: authValue,
	})
	return httpClient.Do(retryReq)
}
Contributor


medium

This function is quite long and handles multiple responsibilities: checking for retry conditions, reading the response body, preparing the new request, and executing it. This increases its complexity and makes it harder to understand and maintain.

Consider refactoring this function by extracting some of its logic into smaller, more focused helper functions. For example, you could have:

  1. A function that determines if a retry is necessary. This function would encapsulate reading the response body and checking its content. It could return a boolean, the body bytes, and an error.
  2. The main retryStreamWithoutInjectedUsage function would then use this helper to decide whether to proceed with preparing and sending the retry request.

This would improve readability and separation of concerns.
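The retry-decision helper described in point 1 could look roughly like this; the function name, signature, and wiring are hypothetical, not code from this PR:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// shouldRetryWithoutInjectedUsage encapsulates the retry-condition checks the
// review suggests extracting: only auto-injected requests that failed with a
// 400/422 whose body mentions the injected field are worth a second attempt.
func shouldRetryWithoutInjectedUsage(status int, body []byte, autoInjected bool) bool {
	if !autoInjected {
		return false // the caller set stream_options itself; nothing to strip
	}
	if status != http.StatusBadRequest && status != http.StatusUnprocessableEntity {
		return false // other failures are not validation rejections
	}
	lower := strings.ToLower(string(body))
	return strings.Contains(lower, "stream_options") || strings.Contains(lower, "include_usage")
}

func main() {
	fmt.Println(shouldRetryWithoutInjectedUsage(400, []byte(`{"error":"unknown field stream_options"}`), true))
	fmt.Println(shouldRetryWithoutInjectedUsage(400, []byte(`{"error":"bad model"}`), true))
}
```

With a predicate in this shape, `retryStreamWithoutInjectedUsage` would shrink to body handling plus request preparation, which is the separation of concerns the comment is after.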

Contributor Author


Thanks. I kept this follow-up intentionally focused on the blocking streaming accounting gap and the retry false-positive path, so I did not split retryStreamWithoutInjectedUsage further in this patch. The new predicate helper was the smallest safe extraction for this round; I can revisit a larger decomposition in a later cleanup PR if we touch this executor again.

@shenshuoyaoyouguang
Contributor Author

Review Navigation: the high-risk issues flagged in the early automated reviews of this PR (error-body read limit, retry audit fields, etc.) have been resolved in subsequent commits and are now closer to maintenance-suggestion level. CI has passed, so this PR can be placed at second-tier review priority.

Collaborator

@luispater left a comment


Summary: Nice direction overall — the usage alias parsing and the OpenAI-compatible retry path both make sense, and the new tests cover the happy path well. I found one blocking gap before this is ready.

Blocking:

  • In internal/runtime/executor/claude_executor.go, the Claude->Claude passthrough branch returns before reporter.ensurePublished(ctx), while the fallback publish only happens in the translated branch. That means streams without a usage chunk still won't be accounted for on the passthrough path, so the fix is not yet consistent across Claude streaming. Please move the fallback publish into a defer at the top of the goroutine or call it before that early return, and add a regression test for the from == to path.
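The defer placement being requested can be illustrated with a toy reporter; the types and method names here are illustrative stand-ins, not the executor package's actual API:

```go
package main

import "fmt"

// usageReporter approximates the executor's reporter: ensurePublished emits a
// fallback usage record exactly once if no usage chunk was observed.
type usageReporter struct{ published bool }

func (r *usageReporter) publish(prompt, completion int64) {
	r.published = true
	fmt.Println("usage:", prompt, completion)
}

func (r *usageReporter) ensurePublished() {
	if !r.published {
		r.publish(0, 0) // fallback record so the request is still accounted for
	}
}

// streamPassthrough models the Claude->Claude passthrough branch: with the
// defer at the top of the goroutine, even an early return (a stream that never
// emits a usage chunk) still publishes a fallback record.
func streamPassthrough(r *usageReporter, sawUsageChunk bool) {
	defer r.ensurePublished()
	if !sawUsageChunk {
		return // early return no longer skips accounting
	}
	r.publish(10, 2)
}

func main() {
	r := &usageReporter{}
	streamPassthrough(r, false)
	fmt.Println("published:", r.published)
}
```

Hanging the fallback on a defer rather than on the happy path is what makes the accounting branch-agnostic: any return, including the passthrough early return, passes through it.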

Non-blocking:

  • In internal/runtime/executor/openai_compat_executor.go, the retry predicate matches any 400/422 body containing stream_options or include_usage. It would be safer to tighten that check, or at least add a negative test proving we do not retry unrelated validation errors that merely echo those strings.
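One way to tighten the predicate as suggested is to inspect the structured error object instead of substring-matching the whole body; this sketch is hypothetical (the field names follow the common OpenAI-style error envelope, and the wording checks are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// retryIndicatesUnsupportedIncludeUsage requires that the structured error
// both names the injected field and uses rejection wording, so validation
// errors that merely echo the payload do not trigger a second upstream call.
func retryIndicatesUnsupportedIncludeUsage(body []byte) bool {
	var resp struct {
		Error struct {
			Message string `json:"message"`
			Param   string `json:"param"`
		} `json:"error"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return false // not a structured error; do not guess
	}
	if resp.Error.Param == "stream_options" || resp.Error.Param == "stream_options.include_usage" {
		return true
	}
	msg := strings.ToLower(resp.Error.Message)
	mentionsField := strings.Contains(msg, "stream_options") || strings.Contains(msg, "include_usage")
	rejected := strings.Contains(msg, "unsupported") || strings.Contains(msg, "unknown") ||
		strings.Contains(msg, "unrecognized") || strings.Contains(msg, "invalid")
	return mentionsField && rejected
}

func main() {
	fmt.Println(retryIndicatesUnsupportedIncludeUsage([]byte(`{"error":{"message":"Unknown parameter: stream_options"}}`)))
	fmt.Println(retryIndicatesUnsupportedIncludeUsage([]byte(`{"error":{"message":"messages[0]: text mentions stream_options but is too long"}}`)))
}
```

A negative test over the second body above is exactly the kind of regression coverage the comment asks for: the field name appears, but no rejection wording does, so no retry fires.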

Test plan:

  • Reviewed the diff and passing CI checks (build, ensure-no-translator-changes).

@shenshuoyaoyouguang
Contributor Author

Pushed follow-up commit c5fb855 to address the remaining streaming review gaps. This update makes the Claude->Claude passthrough stream publish a fallback usage record even when no usage chunk is emitted, and tightens the OpenAI-compatible retry predicate so unrelated 400/422 validation errors do not trigger a second upstream request. I also added regression coverage for both cases and re-ran go test ./internal/runtime/executor.
