-
-
Notifications
You must be signed in to change notification settings - Fork 5k
fix(executor): track usage consistently and harden openai compat streaming #1926
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
be2f5a0
12ab70f
20ddd76
c5fb855
72cb44a
e1e336f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| package executor | ||
|
|
||
| import ( | ||
| "context" | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/router-for-me/CLIProxyAPI/v6/internal/config" | ||
| cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" | ||
| cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" | ||
| "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" | ||
| sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" | ||
| ) | ||
|
|
||
// authScopedUsagePlugin is a test-only usage plugin. Its HandleUsage method
// forwards a record to the records channel only when the record's AuthID
// equals authID.
| type authScopedUsagePlugin struct { | ||
// authID is the auth identifier a record must carry to be captured.
| authID string | ||
// records receives the captured usage records.
| records chan usage.Record | ||
| } | ||
|
|
||
| func (p *authScopedUsagePlugin) HandleUsage(_ context.Context, record usage.Record) { | ||
| if p == nil || record.AuthID != p.authID { | ||
| return | ||
| } | ||
| select { | ||
| case p.records <- record: | ||
| default: | ||
| } | ||
| } | ||
|
|
||
// Shared fixtures for the Claude passthrough usage test.
| var ( | ||
// claudePassthroughUsagePluginOnce guards the one-time registration of the
// capture plugin via usage.RegisterPlugin.
| claudePassthroughUsagePluginOnce sync.Once | ||
// claudePassthroughUsagePlugin captures records for the auth ID
// "claude-passthrough-no-usage"; its channel buffers up to 8 records.
| claudePassthroughUsagePlugin = &authScopedUsagePlugin{ | ||
| authID: "claude-passthrough-no-usage", | ||
| records: make(chan usage.Record, 8), | ||
| } | ||
| ) | ||
|
|
||
| func waitForUsageRecord(t *testing.T, records <-chan usage.Record) usage.Record { | ||
| t.Helper() | ||
| select { | ||
| case record := <-records: | ||
| return record | ||
| case <-time.After(2 * time.Second): | ||
| t.Fatal("timed out waiting for usage record") | ||
| return usage.Record{} | ||
| } | ||
| } | ||
|
|
||
| func TestClaudeExecutorExecuteStream_PassthroughPublishesFallbackUsageWithoutUsageChunk(t *testing.T) { | ||
| claudePassthroughUsagePluginOnce.Do(func() { | ||
| usage.RegisterPlugin(claudePassthroughUsagePlugin) | ||
| }) | ||
|
|
||
| server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| w.Header().Set("Content-Type", "text/event-stream") | ||
| _, _ = w.Write([]byte("data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\"}}\n\n")) | ||
| _, _ = w.Write([]byte("data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n")) | ||
| _, _ = w.Write([]byte("data: {\"type\":\"message_stop\"}\n\n")) | ||
| })) | ||
| defer server.Close() | ||
|
|
||
| executor := NewClaudeExecutor(&config.Config{}) | ||
| auth := &cliproxyauth.Auth{ | ||
| ID: "claude-passthrough-no-usage", | ||
| Attributes: map[string]string{ | ||
| "api_key": "key-123", | ||
| "base_url": server.URL, | ||
| }, | ||
| } | ||
| payload := []byte(`{"messages":[{"role":"user","content":[{"type":"text","text":"hi"}]}]}`) | ||
|
|
||
| result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{ | ||
| Model: "claude-3-5-sonnet-20241022", | ||
| Payload: payload, | ||
| }, cliproxyexecutor.Options{ | ||
| SourceFormat: sdktranslator.FromString("claude"), | ||
| Stream: true, | ||
| }) | ||
| if err != nil { | ||
| t.Fatalf("ExecuteStream error: %v", err) | ||
| } | ||
|
|
||
| for chunk := range result.Chunks { | ||
| if chunk.Err != nil { | ||
| t.Fatalf("unexpected stream chunk error: %v", chunk.Err) | ||
| } | ||
| } | ||
|
|
||
| record := waitForUsageRecord(t, claudePassthroughUsagePlugin.records) | ||
| if record.AuthID != auth.ID { | ||
| t.Fatalf("usage record auth_id = %q, want %q", record.AuthID, auth.ID) | ||
| } | ||
| if record.Provider != "claude" { | ||
| t.Fatalf("usage record provider = %q, want %q", record.Provider, "claude") | ||
| } | ||
| if record.Failed { | ||
| t.Fatal("usage fallback should mark request as successful") | ||
| } | ||
| if record.Detail != (usage.Detail{}) { | ||
| t.Fatalf("usage fallback detail = %+v, want zero-value detail", record.Detail) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,9 +17,12 @@ import ( | |
| cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" | ||
| sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" | ||
| log "github.com/sirupsen/logrus" | ||
| "github.com/tidwall/gjson" | ||
| "github.com/tidwall/sjson" | ||
| ) | ||
|
|
||
// openAICompatRetryErrorBodyLimit is the maximum number of bytes (1 MiB) of an
// upstream error body that retryStreamWithoutInjectedUsage inspects; when the
// body is larger, the retry without include_usage is skipped.
| const openAICompatRetryErrorBodyLimit = 1 << 20 | ||
|
|
||
| // OpenAICompatExecutor implements a stateless executor for OpenAI-compatible providers. | ||
| // It performs request/response translation and executes against the provider base URL | ||
| // using per-auth credentials (API key) and per-auth HTTP transport (proxy) from context. | ||
|
|
@@ -199,15 +202,22 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy | |
| translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, true) | ||
| requestedModel := payloadRequestedModel(opts, req.Model) | ||
| translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel) | ||
| // Preserve historical behavior: if include_usage is omitted or explicitly | ||
| // sent as false/null, still force it on so upstreams can emit real usage | ||
| // chunks. Only an explicit true counts as caller-enabled. | ||
| autoInjectedStreamUsage := !gjson.GetBytes(translated, "stream_options.include_usage").Bool() | ||
|
|
||
| translated, err = thinking.ApplyThinking(translated, req.Model, from.String(), to.String(), e.Identifier()) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // Request usage data in the final streaming chunk so that token statistics | ||
| // are captured even when the upstream is an OpenAI-compatible provider. | ||
| translated, _ = sjson.SetBytes(translated, "stream_options.include_usage", true) | ||
| if autoInjectedStreamUsage { | ||
| translated, err = sjson.SetBytes(translated, "stream_options.include_usage", true) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("openai compat executor: failed to set stream_options in payload: %w", err) | ||
| } | ||
| } | ||
|
|
||
| url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" | ||
| httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated)) | ||
|
|
@@ -250,6 +260,13 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy | |
| recordAPIResponseError(ctx, e.cfg, err) | ||
| return nil, err | ||
| } | ||
| if retryResp, retryErr := e.retryStreamWithoutInjectedUsage(ctx, auth, httpClient, httpReq, translated, httpResp, autoInjectedStreamUsage); retryResp != nil || retryErr != nil { | ||
| httpResp = retryResp | ||
| if retryErr != nil { | ||
| recordAPIResponseError(ctx, e.cfg, retryErr) | ||
| return nil, retryErr | ||
| } | ||
| } | ||
| recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) | ||
| if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { | ||
| b, _ := io.ReadAll(httpResp.Body) | ||
|
|
@@ -304,6 +321,95 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy | |
| return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil | ||
| } | ||
|
|
||
| func (e *OpenAICompatExecutor) retryStreamWithoutInjectedUsage(ctx context.Context, auth *cliproxyauth.Auth, httpClient *http.Client, httpReq *http.Request, translated []byte, httpResp *http.Response, autoInjected bool) (*http.Response, error) { | ||
| if !autoInjected || httpResp == nil { | ||
| return nil, nil | ||
| } | ||
| if httpResp.StatusCode != http.StatusBadRequest && httpResp.StatusCode != http.StatusUnprocessableEntity { | ||
| return nil, nil | ||
| } | ||
| body, err := io.ReadAll(io.LimitReader(httpResp.Body, openAICompatRetryErrorBodyLimit+1)) | ||
| if err != nil { | ||
| if errClose := httpResp.Body.Close(); errClose != nil { | ||
| log.Warnf("openai compat executor: failed to close body after read error: %v", errClose) | ||
| } | ||
| return nil, err | ||
| } | ||
| if errClose := httpResp.Body.Close(); errClose != nil { | ||
| log.Warnf("openai compat executor: close fallback response body error: %v", errClose) | ||
| } | ||
| if len(body) > openAICompatRetryErrorBodyLimit { | ||
| log.Warnf("openai compat executor: fallback response body exceeded %d bytes; skip retry without include_usage", openAICompatRetryErrorBodyLimit) | ||
| httpResp.Body = io.NopCloser(bytes.NewReader(body[:openAICompatRetryErrorBodyLimit])) | ||
| return httpResp, nil | ||
| } | ||
| if !isUnsupportedInjectedUsageError(body) { | ||
| httpResp.Body = io.NopCloser(bytes.NewReader(body)) | ||
| return httpResp, nil | ||
| } | ||
| trimmed, err := sjson.DeleteBytes(translated, "stream_options.include_usage") | ||
|
shenshuoyaoyouguang marked this conversation as resolved.
|
||
| if err != nil { | ||
| return nil, fmt.Errorf("openai compat executor: failed to remove unsupported stream_options in payload: %w", err) | ||
| } | ||
| if streamOptions := gjson.GetBytes(trimmed, "stream_options"); streamOptions.Exists() && len(streamOptions.Map()) == 0 { | ||
| trimmed, err = sjson.DeleteBytes(trimmed, "stream_options") | ||
| if err != nil { | ||
| return nil, fmt.Errorf("openai compat executor: failed to remove empty stream_options in payload: %w", err) | ||
| } | ||
| } | ||
| retryReq, err := http.NewRequestWithContext(ctx, httpReq.Method, httpReq.URL.String(), bytes.NewReader(trimmed)) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| retryReq.Header = httpReq.Header.Clone() | ||
| var authID, authLabel, authType, authValue string | ||
| if auth != nil { | ||
| authID = auth.ID | ||
| authLabel = auth.Label | ||
| authType, authValue = auth.AccountInfo() | ||
| } | ||
| recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ | ||
| URL: retryReq.URL.String(), | ||
| Method: retryReq.Method, | ||
| Headers: retryReq.Header.Clone(), | ||
| Body: trimmed, | ||
| Provider: e.Identifier(), | ||
| AuthID: authID, | ||
| AuthLabel: authLabel, | ||
| AuthType: authType, | ||
| AuthValue: authValue, | ||
| }) | ||
|
shenshuoyaoyouguang marked this conversation as resolved.
Comment on lines
+365
to
+381
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This block of code for recording the API request is very similar to the one used for the initial request in `ExecuteStream`; consider extracting it into a shared helper. For example, you could create a method like:
func (e *OpenAICompatExecutor) recordUpstreamRequest(ctx context.Context, req *http.Request, body []byte, auth *cliproxyauth.Auth) {
var authID, authLabel, authType, authValue string
if auth != nil {
authID = auth.ID
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
URL: req.URL.String(),
Method: req.Method,
Headers: req.Header.Clone(),
Body: body,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
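}
Then you could call `e.recordUpstreamRequest(ctx, retryReq, trimmed, auth)` from this retry path instead of repeating the block inline.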
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good suggestion. I kept the follow-up fix intentionally narrow so the retry-stream bugfix stayed low-risk and easy to review, which is why I did not fold the duplicated request-logging block into a helper in this PR. If we touch this executor logging path again, I agree that extracting a shared helper would be the right cleanup step. |
||
| return httpClient.Do(retryReq) | ||
| } | ||
|
Comment on lines
+324
to
+383
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This function is quite long and handles multiple responsibilities: checking for retry conditions, reading the response body, preparing the new request, and executing it. This increases its complexity and makes it harder to understand and maintain. Consider refactoring this function by extracting some of its logic into smaller, more focused helper functions. For example, you could have one helper for each of the responsibilities listed above (deciding whether a retry applies, reading the bounded error body, and building the retry request).
This would improve readability and separation of concerns.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks. I kept this follow-up intentionally focused on the blocking streaming accounting gap and the retry false-positive path, so I did not split `retryStreamWithoutInjectedUsage` into smaller helpers in this PR. |
||
|
|
||
// isUnsupportedInjectedUsageError reports whether an upstream error body looks
// like a rejection of the stream_options.include_usage field.
//
// The check is a case-insensitive text heuristic; both conditions must hold:
//   - the body mentions "stream_options" or "include_usage", and
//   - the body contains one of the known "unknown/unsupported field" phrases.
//
// An empty (or nil) body never matches. Because only the error text is
// inspected, the function cannot tell which stream_options field the upstream
// rejected: an error about a different stream_options field matches as well.
func isUnsupportedInjectedUsageError(body []byte) bool {
	if len(body) == 0 {
		return false
	}
	lower := strings.ToLower(string(body))
	if !strings.Contains(lower, "stream_options") && !strings.Contains(lower, "include_usage") {
		return false
	}
	// Markers are lowercase because they are matched against the lowercased body.
	unsupportedMarkers := []string{
		"unknown field",
		"unknown parameter",
		"unknown argument",
		"unrecognized field",
		"unrecognized parameter",
		"unrecognized argument",
		// OpenAI-style wording: "Unrecognized request argument supplied: <name>".
		"unrecognized request argument",
		"unsupported field",
		"unsupported parameter",
		"not allowed",
		"not permitted",
		"extra inputs are not permitted",
	}
	for _, marker := range unsupportedMarkers {
		if strings.Contains(lower, marker) {
			return true
		}
	}
	return false
}
|
|
||
| func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { | ||
| baseModel := thinking.ParseSuffix(req.Model).ModelName | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`autoInjectedStreamUsage` is computed after `applyPayloadConfigWithRoot` has already applied `cfg.Payload` defaults/overrides, so a proxy-configured rule that injects `stream_options.include_usage=true` is treated as if the client had supplied it. In that case `retryStreamWithoutInjectedUsage` is never allowed to run, and OpenAI-compatible backends that reject `include_usage` still fail with 400/422 even though the unsupported field was added by this executor stack rather than by the caller. Useful? React with 👍 / 👎.