-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproxy_core.py
More file actions
138 lines (107 loc) · 4.85 KB
/
proxy_core.py
File metadata and controls
138 lines (107 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
代理核心模块 - 提供路由层共用的请求解析、错误格式化和代理流式响应构建逻辑。
消除 routes_codex.py 和 routes_claude.py 中的重复代码。
"""
from __future__ import annotations
import logging
from typing import Any, Optional, Type
from fastapi import Request, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from codebuddy_client import CodeBuddyClient, CodeBuddyClientError
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════
# 统一错误格式
# ═══════════════════════════════════════════════════════════════
def make_error(code: str, message: str) -> dict:
"""构造统一错误响应体
所有路由和全局异常处理器统一使用此格式:
{"error": {"code": "...", "message": "..."}}
"""
return {"error": {"code": code, "message": message}}
def raise_upstream_error(e: CodeBuddyClientError) -> None:
"""将 CodeBuddyClientError 转换为统一的 HTTPException"""
raise HTTPException(
status_code=e.status_code or 502,
detail=make_error("UPSTREAM_ERROR", str(e)),
)
def raise_internal_error(e: Exception) -> None:
"""将内部异常转换为统一的 HTTPException"""
raise HTTPException(
status_code=500,
detail=make_error("INTERNAL_ERROR", str(e)),
)
# ═══════════════════════════════════════════════════════════════
# 请求解析
# ═══════════════════════════════════════════════════════════════
async def parse_request_body(request: Request) -> dict:
"""读取并解析请求 JSON 体"""
try:
return await request.json()
except Exception as e:
logger.error("Failed to parse request JSON: %s", e)
raise HTTPException(
status_code=400,
detail=make_error("BAD_REQUEST", f"Invalid JSON: {e}"),
)
def validate_request(raw_body: dict, model_class: Type[BaseModel]) -> BaseModel:
"""用 Pydantic 模型校验请求体"""
try:
return model_class.model_validate(raw_body)
except Exception as e:
logger.error(
"Request validation failed for %s: %s\nRaw keys: %s",
model_class.__name__,
e,
list(raw_body.keys()) if isinstance(raw_body, dict) else type(raw_body),
)
raise HTTPException(
status_code=422,
detail=make_error("VALIDATION_ERROR", f"Request validation failed: {e}"),
)
# ═══════════════════════════════════════════════════════════════
# 流式代理响应构建
# ═══════════════════════════════════════════════════════════════
async def build_streaming_response(
client: CodeBuddyClient,
prompt_text: str,
model: str,
stream_translator,
extra_headers: Optional[dict[str, str]] = None,
) -> StreamingResponse:
"""构建流式 SSE 代理响应的通用逻辑。
1. 创建 CodeBuddy run
2. 订阅 SSE 流
3. 翻译为客户端格式
4. 包装为 StreamingResponse
Args:
client: CodeBuddy 客户端
prompt_text: 翻译后的 prompt 文本
model: 客户端指定的模型名
stream_translator: 翻译函数,签名为 (stream: AsyncGenerator, model: str) -> AsyncGenerator
extra_headers: 额外的响应头
Returns:
StreamingResponse 对象
"""
try:
run_id = await client.post_run(text=prompt_text)
codebuddy_stream = client.stream_run(run_id)
translated_stream = stream_translator(codebuddy_stream, model=model)
headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
if extra_headers:
headers.update(extra_headers)
return StreamingResponse(
translated_stream,
media_type="text/event-stream",
headers=headers,
)
except CodeBuddyClientError as e:
logger.error("CodeBuddy error in proxy: %s", e)
raise_upstream_error(e)
except Exception as e:
logger.exception("Unexpected error in proxy")
raise_internal_error(e)