-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathrun_server.py
More file actions
71 lines (51 loc) · 1.99 KB
/
run_server.py
File metadata and controls
71 lines (51 loc) · 1.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# Tencent is pleased to support the open source community by making tRPC-Agent-Python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# tRPC-Agent-Python is licensed under Apache-2.0.
"""A2A server example.

This example uses the standard A2A SDK server (A2AStarletteApplication) to serve
a trpc-agent as an A2A service over plain HTTP, using the standard protocol
(artifact-first streaming and unprefixed metadata keys).
"""
import uvicorn
from dotenv import load_dotenv
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from trpc_agent_sdk.server.a2a import TrpcA2aAgentExecutorConfig
from trpc_agent_sdk.server.a2a import TrpcA2aAgentService
# Load variables from a .env file (python-dotenv). This runs at import time,
# i.e. before create_a2a_service() performs its function-scope import of the
# agent module -- presumably so the agent can read its settings from the
# environment; confirm against agent/agent.py.
load_dotenv()
# Host and TCP port handed to uvicorn.run() in serve(); the same values are
# interpolated into the URLs serve() prints at startup.
HOST = "127.0.0.1"
PORT = 18081
def create_a2a_service() -> TrpcA2aAgentService:
    """Build and initialize the A2A service that fronts the example agent.

    ``root_agent`` is imported from ``agent.agent`` inside the function, so
    the import happens when this function is called rather than when the
    module is loaded. The agent is wrapped in a ``TrpcA2aAgentService`` named
    ``weather_agent_standard_service`` together with a default-constructed
    ``TrpcA2aAgentExecutorConfig``; this example describes that setup as the
    standard protocol (artifact-first streaming, unprefixed metadata keys).

    Returns:
        The service, after ``initialize()`` has been called on it.
    """
    # Deliberately imported at call time, not at module import time.
    from agent.agent import root_agent

    service = TrpcA2aAgentService(
        service_name="weather_agent_standard_service",
        agent=root_agent,
        executor_config=TrpcA2aAgentExecutorConfig(),
    )
    service.initialize()
    return service
def serve():
    """Serve the A2A service over plain HTTP with uvicorn.

    Creates the service via ``create_a2a_service()``, plugs it into the A2A
    SDK's ``DefaultRequestHandler`` as the agent executor (with an
    ``InMemoryTaskStore`` as task store), wraps the handler and the service's
    agent card in an ``A2AStarletteApplication``, prints the startup banner
    and finally hands the built app to ``uvicorn.run`` on ``HOST``/``PORT``.
    """
    service = create_a2a_service()

    handler = DefaultRequestHandler(
        agent_executor=service,
        task_store=InMemoryTaskStore(),
    )
    application = A2AStarletteApplication(
        agent_card=service.agent_card,
        http_handler=handler,
    )

    # NOTE(review): the agent-card path printed below is hard-coded; the
    # well-known path actually served may depend on the installed a2a-sdk
    # version -- confirm it matches.
    banner = (
        "Starting A2A server (standard protocol over HTTP)...",
        f"Listening on: http://{HOST}:{PORT}",
        f"Agent card: http://{HOST}:{PORT}/.well-known/agent.json",
    )
    for message in banner:
        print(message)

    # The ASGI app is built only after the banner has been printed.
    uvicorn.run(application.build(), host=HOST, port=PORT)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    serve()