Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/runtime/executor/claude_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
log.Errorf("response body close error: %v", errClose)
}
}()
// Ensure every stream path, including Claude passthrough, records at least one usage entry.
defer reporter.ensurePublished(ctx)

// If from == to (Claude → Claude), directly forward the SSE stream without translation
if from == to {
Expand Down
105 changes: 105 additions & 0 deletions internal/runtime/executor/claude_executor_usage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package executor

import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
)

// authScopedUsagePlugin is a usage plugin that captures only the records
// emitted for one specific auth ID, buffering them on a channel for the test
// to consume.
type authScopedUsagePlugin struct {
	authID  string
	records chan usage.Record
}

// HandleUsage buffers a record when its AuthID matches the plugin's auth ID.
// The send is non-blocking: if the buffer is full the record is dropped, so a
// slow test reader can never stall the usage pipeline.
func (p *authScopedUsagePlugin) HandleUsage(_ context.Context, record usage.Record) {
	if p == nil {
		return
	}
	if record.AuthID != p.authID {
		return
	}
	// Best-effort delivery: drop instead of blocking when the buffer is full.
	select {
	case p.records <- record:
	default:
	}
}

var (
	// claudePassthroughUsagePluginOnce guards the one-time global plugin
	// registration; usage.RegisterPlugin is process-wide, so registering on
	// every test run must be avoided.
	claudePassthroughUsagePluginOnce sync.Once
	// claudePassthroughUsagePlugin captures usage records for the fixed test
	// auth ID below. The buffered channel keeps HandleUsage non-blocking.
	claudePassthroughUsagePlugin = &authScopedUsagePlugin{
		authID:  "claude-passthrough-no-usage",
		records: make(chan usage.Record, 8),
	}
)

// waitForUsageRecord blocks until a usage record arrives on records or two
// seconds elapse, failing the test on timeout. The zero-value return after
// t.Fatal only satisfies the compiler; Fatal stops the test goroutine.
func waitForUsageRecord(t *testing.T, records <-chan usage.Record) usage.Record {
	t.Helper()
	timeout := time.After(2 * time.Second)
	select {
	case rec := <-records:
		return rec
	case <-timeout:
		t.Fatal("timed out waiting for usage record")
		return usage.Record{}
	}
}

// TestClaudeExecutorExecuteStream_PassthroughPublishesFallbackUsageWithoutUsageChunk
// verifies that the Claude→Claude passthrough streaming path still publishes a
// fallback usage record even when the upstream SSE stream never emits a usage
// payload.
func TestClaudeExecutorExecuteStream_PassthroughPublishesFallbackUsageWithoutUsageChunk(t *testing.T) {
	// Register the capturing plugin exactly once; registration is global.
	claudePassthroughUsagePluginOnce.Do(func() {
		usage.RegisterPlugin(claudePassthroughUsagePlugin)
	})

	// Upstream stub: a Claude SSE stream with message_start, one text delta,
	// and message_stop — deliberately no usage chunk anywhere.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		_, _ = w.Write([]byte("data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\"}}\n\n"))
		_, _ = w.Write([]byte("data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n"))
		_, _ = w.Write([]byte("data: {\"type\":\"message_stop\"}\n\n"))
	}))
	defer server.Close()

	executor := NewClaudeExecutor(&config.Config{})
	// Auth ID matches the plugin's authID so the fallback record is captured;
	// base_url points the executor at the local stub.
	auth := &cliproxyauth.Auth{
		ID: "claude-passthrough-no-usage",
		Attributes: map[string]string{
			"api_key":  "key-123",
			"base_url": server.URL,
		},
	}
	payload := []byte(`{"messages":[{"role":"user","content":[{"type":"text","text":"hi"}]}]}`)

	// SourceFormat "claude" with a Claude model exercises the from == to
	// passthrough branch (SSE forwarded without translation).
	result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{
		Model:   "claude-3-5-sonnet-20241022",
		Payload: payload,
	}, cliproxyexecutor.Options{
		SourceFormat: sdktranslator.FromString("claude"),
		Stream:       true,
	})
	if err != nil {
		t.Fatalf("ExecuteStream error: %v", err)
	}

	// Drain the stream fully; any chunk-level error fails the test.
	for chunk := range result.Chunks {
		if chunk.Err != nil {
			t.Fatalf("unexpected stream chunk error: %v", chunk.Err)
		}
	}

	// The fallback record must identify the auth, name the claude provider,
	// be marked successful, and carry a zero-value (empty) usage detail.
	record := waitForUsageRecord(t, claudePassthroughUsagePlugin.records)
	if record.AuthID != auth.ID {
		t.Fatalf("usage record auth_id = %q, want %q", record.AuthID, auth.ID)
	}
	if record.Provider != "claude" {
		t.Fatalf("usage record provider = %q, want %q", record.Provider, "claude")
	}
	if record.Failed {
		t.Fatal("usage fallback should mark request as successful")
	}
	if record.Detail != (usage.Detail{}) {
		t.Fatalf("usage fallback detail = %+v, want zero-value detail", record.Detail)
	}
}
2 changes: 2 additions & 0 deletions internal/runtime/executor/gemini_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
// Ensure we record the request if no usage chunk was ever seen
reporter.ensurePublished(ctx)
}()
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions internal/runtime/executor/kimi_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
// Ensure we record the request if no usage chunk was ever seen
reporter.ensurePublished(ctx)
}()
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
}
Expand Down
112 changes: 109 additions & 3 deletions internal/runtime/executor/openai_compat_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ import (
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)

const openAICompatRetryErrorBodyLimit = 1 << 20

// OpenAICompatExecutor implements a stateless executor for OpenAI-compatible providers.
// It performs request/response translation and executes against the provider base URL
// using per-auth credentials (API key) and per-auth HTTP transport (proxy) from context.
Expand Down Expand Up @@ -199,15 +202,22 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, true)
requestedModel := payloadRequestedModel(opts, req.Model)
translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel)
// Preserve historical behavior: if include_usage is omitted or explicitly
// sent as false/null, still force it on so upstreams can emit real usage
// chunks. Only an explicit true counts as caller-enabled.
autoInjectedStreamUsage := !gjson.GetBytes(translated, "stream_options.include_usage").Bool()
Comment on lines 204 to +208
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Derive retry eligibility from the caller payload, not payload rules

autoInjectedStreamUsage is computed after applyPayloadConfigWithRoot has already applied cfg.Payload defaults/overrides, so a proxy-configured rule that injects stream_options.include_usage=true is treated as if the client had supplied it. In that case retryStreamWithoutInjectedUsage is never allowed to run, and OpenAI-compatible backends that reject include_usage still fail with 400/422 even though the unsupported field was added by this executor stack rather than by the caller.

Useful? React with 👍 / 👎.


translated, err = thinking.ApplyThinking(translated, req.Model, from.String(), to.String(), e.Identifier())
if err != nil {
return nil, err
}

// Request usage data in the final streaming chunk so that token statistics
// are captured even when the upstream is an OpenAI-compatible provider.
translated, _ = sjson.SetBytes(translated, "stream_options.include_usage", true)
if autoInjectedStreamUsage {
translated, err = sjson.SetBytes(translated, "stream_options.include_usage", true)
if err != nil {
return nil, fmt.Errorf("openai compat executor: failed to set stream_options in payload: %w", err)
}
}

url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
Expand Down Expand Up @@ -250,6 +260,13 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
if retryResp, retryErr := e.retryStreamWithoutInjectedUsage(ctx, auth, httpClient, httpReq, translated, httpResp, autoInjectedStreamUsage); retryResp != nil || retryErr != nil {
httpResp = retryResp
if retryErr != nil {
recordAPIResponseError(ctx, e.cfg, retryErr)
return nil, retryErr
}
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
Expand Down Expand Up @@ -304,6 +321,95 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
}

// retryStreamWithoutInjectedUsage inspects a failed streaming response and,
// when the failure looks like the upstream rejecting the auto-injected
// stream_options.include_usage field, re-sends the request once with that
// field removed. Returns (nil, nil) when no retry applies (caller keeps the
// original response), (httpResp, nil) with the body restored when the error
// is unrelated or too large to inspect, or the retry attempt's response and
// error otherwise.
func (e *OpenAICompatExecutor) retryStreamWithoutInjectedUsage(ctx context.Context, auth *cliproxyauth.Auth, httpClient *http.Client, httpReq *http.Request, translated []byte, httpResp *http.Response, autoInjected bool) (*http.Response, error) {
	// Only retry when this executor injected include_usage itself; a
	// caller-supplied value is the caller's responsibility.
	if !autoInjected || httpResp == nil {
		return nil, nil
	}
	// Unsupported request fields surface as 400 or 422 upstream.
	if httpResp.StatusCode != http.StatusBadRequest && httpResp.StatusCode != http.StatusUnprocessableEntity {
		return nil, nil
	}
	// Read a bounded copy of the error body (limit+1 so overflow is
	// detectable) and always close the original body.
	body, err := io.ReadAll(io.LimitReader(httpResp.Body, openAICompatRetryErrorBodyLimit+1))
	if err != nil {
		if errClose := httpResp.Body.Close(); errClose != nil {
			log.Warnf("openai compat executor: failed to close body after read error: %v", errClose)
		}
		return nil, err
	}
	if errClose := httpResp.Body.Close(); errClose != nil {
		log.Warnf("openai compat executor: close fallback response body error: %v", errClose)
	}
	// Oversized error bodies are not inspected; hand the (truncated) response
	// back to the caller unchanged.
	if len(body) > openAICompatRetryErrorBodyLimit {
		log.Warnf("openai compat executor: fallback response body exceeded %d bytes; skip retry without include_usage", openAICompatRetryErrorBodyLimit)
		httpResp.Body = io.NopCloser(bytes.NewReader(body[:openAICompatRetryErrorBodyLimit]))
		return httpResp, nil
	}
	// Unrelated validation error: restore the body so the caller can report it.
	if !isUnsupportedInjectedUsageError(body) {
		httpResp.Body = io.NopCloser(bytes.NewReader(body))
		return httpResp, nil
	}
	// Strip the injected field; drop stream_options entirely when removing
	// include_usage leaves it empty.
	trimmed, err := sjson.DeleteBytes(translated, "stream_options.include_usage")
	if err != nil {
		return nil, fmt.Errorf("openai compat executor: failed to remove unsupported stream_options in payload: %w", err)
	}
	if streamOptions := gjson.GetBytes(trimmed, "stream_options"); streamOptions.Exists() && len(streamOptions.Map()) == 0 {
		trimmed, err = sjson.DeleteBytes(trimmed, "stream_options")
		if err != nil {
			return nil, fmt.Errorf("openai compat executor: failed to remove empty stream_options in payload: %w", err)
		}
	}
	// Rebuild the request with the original method, URL, and headers.
	retryReq, err := http.NewRequestWithContext(ctx, httpReq.Method, httpReq.URL.String(), bytes.NewReader(trimmed))
	if err != nil {
		return nil, err
	}
	retryReq.Header = httpReq.Header.Clone()
	var authID, authLabel, authType, authValue string
	if auth != nil {
		authID = auth.ID
		authLabel = auth.Label
		authType, authValue = auth.AccountInfo()
	}
	// Log the retry upstream request the same way as the primary attempt.
	recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
		URL:       retryReq.URL.String(),
		Method:    retryReq.Method,
		Headers:   retryReq.Header.Clone(),
		Body:      trimmed,
		Provider:  e.Identifier(),
		AuthID:    authID,
		AuthLabel: authLabel,
		AuthType:  authType,
		AuthValue: authValue,
	})
	return httpClient.Do(retryReq)
}
Comment on lines +324 to +383
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This function is quite long and handles multiple responsibilities: checking for retry conditions, reading the response body, preparing the new request, and executing it. This increases its complexity and makes it harder to understand and maintain.

Consider refactoring this function by extracting some of its logic into smaller, more focused helper functions. For example, you could have:

  1. A function that determines if a retry is necessary. This function would encapsulate reading the response body and checking its content. It could return a boolean, the body bytes, and an error.
  2. The main retryStreamWithoutInjectedUsage function would then use this helper to decide whether to proceed with preparing and sending the retry request.

This would improve readability and separation of concerns.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I kept this follow-up intentionally focused on the blocking streaming accounting gap and the retry false-positive path, so I did not split retryStreamWithoutInjectedUsage further in this patch. The new predicate helper was the smallest safe extraction for this round; I can revisit a larger decomposition in a later cleanup PR if we touch this executor again.


// isUnsupportedInjectedUsageError reports whether an upstream error body looks
// like a rejection of the stream_options / include_usage field. It requires
// both a mention of one of those field names and a generic
// unknown/unsupported-field validation marker, all matched case-insensitively.
func isUnsupportedInjectedUsageError(body []byte) bool {
	if len(body) == 0 {
		return false
	}
	text := strings.ToLower(string(body))
	// Must reference the field this executor injects; otherwise the error is
	// about something else entirely.
	mentionsUsageField := strings.Contains(text, "stream_options") || strings.Contains(text, "include_usage")
	if !mentionsUsageField {
		return false
	}
	// Validation-style markers providers commonly use for rejected fields.
	for _, marker := range []string{
		"unknown field",
		"unknown parameter",
		"unknown argument",
		"unrecognized field",
		"unrecognized parameter",
		"unsupported field",
		"unsupported parameter",
		"not allowed",
		"not permitted",
		"extra inputs are not permitted",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}

func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
baseModel := thinking.ParseSuffix(req.Model).ModelName

Expand Down
Loading
Loading