Omni Agent 现在支持服务器发送事件 (SSE) 的流式输出功能,让你可以实时查看 agent 的执行过程。
- ✅ 实时思考流 - 查看 agent 的思考过程
- ✅ 内容流式输出 - 逐字输出 agent 的回复
- ✅ 工具调用事件 - 实时显示工具的调用和结果
- ✅ 步骤进度 - 追踪 agent 执行的每个步骤
- ✅ Token 使用情况 - 实时监控 token 使用量
POST /api/v1/agent/run/stream
{
"message": "你的任务描述",
"max_steps": 50 // 可选,默认 50
}使用 Server-Sent Events (SSE) 格式,每个事件包含:
data: {"type": "event_type", "data": {...}}
日志文件路径信息
{
"type": "log_file",
"data": {
"log_file": "/home/user/.omni-agents/log/agent_run_20251114_114221.log"
}
}步骤开始信息
{
"type": "step",
"data": {
"step": 1,
"max_steps": 50,
"tokens": 154,
"token_limit": 120000
}
}Agent 思考过程(增量)
{
"type": "thinking",
"data": {
"delta": "用户要求我..."
}
}Agent 回复内容(增量)
{
"type": "content",
"data": {
"delta": "你好!"
}
}工具调用开始
{
"type": "tool_call",
"data": {
"tool": "bash",
"arguments": {
"command": "date '+%A'"
}
}
}工具执行结果
{
"type": "tool_result",
"data": {
"tool": "bash",
"success": true,
"content": "Friday",
"error": null,
"execution_time": 0.01
}
}任务完成
{
"type": "done",
"data": {
"message": "最终的回复内容",
"steps": 2,
"reason": "completed"
}
}错误事件
{
"type": "error",
"data": {
"message": "错误描述",
"reason": "max_steps_reached"
}
}import httpx
import json
import asyncio
async def stream_agent():
url = "http://localhost:8000/api/v1/agent/run/stream"
payload = {
"message": "你好,请介绍一下自己",
"max_steps": 10
}
async with httpx.AsyncClient(timeout=60.0) as client:
async with client.stream("POST", url, json=payload) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
event = json.loads(line[6:])
event_type = event["type"]
event_data = event["data"]
if event_type == "content":
print(event_data["delta"], end="", flush=True)
elif event_type == "done":
print("\n完成!")
break
asyncio.run(stream_agent())curl -X POST "http://localhost:8000/api/v1/agent/run/stream" \
-H "Content-Type: application/json" \
-d '{"message": "你好", "max_steps": 5}' \
--no-bufferasync function streamAgent(message) {
const response = await fetch('http://localhost:8000/api/v1/agent/run/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { value, done } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const event = JSON.parse(line.slice(6));
if (event.type === 'content') {
process.stdout.write(event.data.delta);
} else if (event.type === 'done') {
console.log('\n完成!');
}
}
}
}
}
streamAgent('你好,请介绍一下自己');LLMClient.generate_stream() 方法:
- 发送
stream=True参数到 LLM API - 解析 SSE 流式响应
- 生成增量事件(thinking_delta, content_delta, tool_use)
Agent.run_stream() 方法:
- 协调整个执行流程
- 调用 LLM 流式生成
- 执行工具并流式返回结果
- 管理消息历史和状态
/run/stream endpoint:
- 接收用户请求
- 调用 agent 流式方法
- 将事件转换为 SSE 格式
- 返回 StreamingResponse
-
设置合理的超时时间
httpx.AsyncClient(timeout=120.0) # 2分钟超时
-
处理网络中断
try: async for event in stream: process_event(event) except httpx.ReadTimeout: print("连接超时,请重试")
-
缓冲输出
- 前端显示时考虑使用缓冲区
- 避免每个字符都触发 UI 重绘
| 特性 | 普通 API (/run) |
流式 API (/run/stream) |
|---|---|---|
| 响应方式 | 等待完成后返回 | 实时流式输出 |
| 用户体验 | 等待时间长 | 实时反馈,体验好 |
| 适用场景 | 后台任务、批处理 | 交互式应用、聊天界面 |
| 实现复杂度 | 简单 | 需要处理 SSE |
| Token 监控 | 事后查看 | 实时监控 |
可能原因:
- Nginx/代理服务器缓冲
- 客户端未正确处理 SSE
- 超时设置太短
解决方案:
# Nginx 配置
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
chunked_transfer_encoding off;可能原因:
- 任务执行时间超过超时限制
- 网络不稳定
解决方案:
- 增加客户端超时时间
- 实现重连机制
- 添加心跳检测
-
始终处理错误事件
if event_type == "error": handle_error(event_data["message"])
-
显示执行进度
if event_type == "step": progress = step / max_steps * 100 update_progress_bar(progress)
-
区分思考和内容
if event_type == "thinking": show_in_gray(delta) # 以灰色显示思考过程 elif event_type == "content": show_in_black(delta) # 正常显示回复内容
-
超时处理
async with httpx.AsyncClient(timeout=httpx.Timeout( connect=10.0, # 连接超时 read=120.0, # 读取超时 write=10.0, # 写入超时 )) as client: ...
项目提供了测试脚本 test_stream.py:
# 运行流式输出测试
uv run python test_stream.py该脚本会:
- 连接流式 API
- 彩色显示不同类型的事件
- 展示完整的执行过程
- 并发限制 - 每个连接会占用一个 agent 实例,注意服务器资源
- 超时设置 - 确保客户端超时时间大于任务预期执行时间
- 错误处理 - 必须处理网络中断、超时等异常情况
- 资源清理 - 确保连接正确关闭,释放资源