-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
122 lines (84 loc) · 4.37 KB
/
utils.py
File metadata and controls
122 lines (84 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
通用工具函数 - 统一 ID 生成、时间戳、SSE 事件构建和流辅助工具。
供 schemas、sse_translator、routes 等模块共享使用。
"""
from __future__ import annotations
import json
import time
import uuid
import logging
import asyncio
from typing import AsyncGenerator, Optional
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════
# ID 生成
# ═══════════════════════════════════════════════════════════════
def now_unix() -> int:
"""返回当前 Unix 时间戳(秒)"""
return int(time.time())
def gen_id(prefix: str = "id") -> str:
"""生成带前缀的唯一 ID"""
return f"{prefix}_{uuid.uuid4().hex[:12]}"
def generate_response_id() -> str:
"""生成 OpenAI Responses API 格式的 response ID"""
return gen_id("resp")
def generate_message_id() -> str:
"""生成 Anthropic Messages API 格式的 message ID"""
return gen_id("msg")
def generate_content_block_id() -> str:
"""生成 Anthropic 格式的 content block ID"""
return gen_id("cb")
def generate_item_id() -> str:
"""生成 OpenAI 格式的 item ID"""
return gen_id("item")
# ═══════════════════════════════════════════════════════════════
# SSE 事件构建
# ═══════════════════════════════════════════════════════════════
def sse_event(data: dict) -> str:
"""构造仅含 data 行的 SSE 事件(Anthropic 标准格式)"""
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
def sse_event_with_type(event_type: str, data: dict) -> str:
"""构造含 event + data 行的 SSE 事件(OpenAI 格式)
统一替代原来的 _sse_event_openai / _sse_event_anthropic 两个完全相同的函数。
"""
return (
f"event: {event_type}\n"
f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
)
# ═══════════════════════════════════════════════════════════════
# Token 估算
# ═══════════════════════════════════════════════════════════════
def estimate_tokens(text: str, divisor: int = 4) -> int:
"""粗略估算文本的 token 数"""
return max(1, len(text) // divisor)
# ═══════════════════════════════════════════════════════════════
# SSE 流 keepalive 包装器(通用)
# ═══════════════════════════════════════════════════════════════
async def stream_with_keepalive(
event_stream: AsyncGenerator[str, None],
interval: int = 15,
ping_event: Optional[str] = None,
) -> AsyncGenerator[str, None]:
"""在 SSE 流中插入周期性的 ping keepalive 事件,防止客户端断连。
支持 OpenAI 和 Anthropic 两种 ping 格式。
Args:
event_stream: SSE 事件字符串生成器
interval: keepalive 发送间隔(秒)
ping_event: 自定义 ping 事件字符串(None 则使用默认 Anthropic ping 格式)
Yields:
SSE 格式字符串
"""
if ping_event is None:
ping_event = f"event: ping\ndata: {json.dumps({'type': 'ping'})}\n\n"
last_event = asyncio.get_event_loop().time()
while True:
try:
coro = event_stream.__anext__()
event_text = await asyncio.wait_for(coro, timeout=interval)
last_event = asyncio.get_event_loop().time()
yield event_text
except StopAsyncIteration:
break
except asyncio.TimeoutError:
yield ping_event
last_event = asyncio.get_event_loop().time()