Skip to content

Commit 86accea

Browse files
author
Chojan Shang
committed
refactor: update examples for basic echo
1 parent a8afbd6 commit 86accea

3 files changed

Lines changed: 116 additions & 148 deletions

File tree

examples/agent.py

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,97 @@
11
import asyncio
2+
from typing import Any
23

34
from acp import (
45
Agent,
56
AgentSideConnection,
67
AuthenticateRequest,
8+
AuthenticateResponse,
79
CancelNotification,
810
InitializeRequest,
911
InitializeResponse,
10-
LoadSessionRequest,
1112
NewSessionRequest,
1213
NewSessionResponse,
1314
PromptRequest,
1415
PromptResponse,
16+
SessionNotification,
17+
SetSessionModeRequest,
18+
SetSessionModeResponse,
1519
stdio_streams,
20+
PROTOCOL_VERSION,
1621
)
22+
from acp.schema import ContentBlock1, SessionUpdate2
1723

1824

19-
class EchoAgent(Agent):
25+
class ExampleAgent(Agent):
26+
def __init__(self, conn: AgentSideConnection) -> None:
27+
self._conn = conn
28+
self._next_session_id = 0
29+
2030
async def initialize(self, params: InitializeRequest) -> InitializeResponse:
21-
# Avoid serializer warnings by omitting defaults
22-
return InitializeResponse(protocolVersion=params.protocolVersion)
31+
return InitializeResponse(protocolVersion=PROTOCOL_VERSION, agentCapabilities=None, authMethods=[])
2332

24-
async def newSession(self, params: NewSessionRequest) -> NewSessionResponse:
25-
return NewSessionResponse(sessionId="sess-1")
33+
async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse | None: # noqa: ARG002
34+
return {}
2635

27-
async def loadSession(self, params: LoadSessionRequest) -> None:
28-
return None
36+
async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: # noqa: ARG002
37+
session_id = f"sess-{self._next_session_id}"
38+
self._next_session_id += 1
39+
return NewSessionResponse(sessionId=session_id)
2940

30-
async def authenticate(self, params: AuthenticateRequest) -> None:
41+
async def loadSession(self, params): # type: ignore[override]
3142
return None
3243

44+
async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse | None: # noqa: ARG002
45+
return {}
46+
3347
async def prompt(self, params: PromptRequest) -> PromptResponse:
34-
# Normally you'd stream updates via sessionUpdate
48+
# Stream a couple of agent message chunks, then end the turn
49+
# 1) Prefix
50+
await self._conn.sessionUpdate(
51+
SessionNotification(
52+
sessionId=params.sessionId,
53+
update=SessionUpdate2(
54+
sessionUpdate="agent_message_chunk",
55+
content=ContentBlock1(type="text", text="Client sent: "),
56+
),
57+
)
58+
)
59+
# 2) Echo text blocks
60+
for block in params.prompt:
61+
if isinstance(block, dict):
62+
# tolerate raw dicts
63+
if block.get("type") == "text":
64+
text = str(block.get("text", ""))
65+
else:
66+
text = f"<{block.get('type', 'content')}>"
67+
else:
68+
# pydantic model ContentBlock1
69+
text = getattr(block, "text", "<content>")
70+
await self._conn.sessionUpdate(
71+
SessionNotification(
72+
sessionId=params.sessionId,
73+
update=SessionUpdate2(
74+
sessionUpdate="agent_message_chunk",
75+
content=ContentBlock1(type="text", text=text),
76+
),
77+
)
78+
)
3579
return PromptResponse(stopReason="end_turn")
3680

37-
async def cancel(self, params: CancelNotification) -> None:
81+
async def cancel(self, params: CancelNotification) -> None: # noqa: ARG002
82+
return None
83+
84+
async def extMethod(self, method: str, params: dict) -> dict: # noqa: ARG002
85+
return {"example": "response"}
86+
87+
async def extNotification(self, method: str, params: dict) -> None: # noqa: ARG002
3888
return None
3989

4090

4191
async def main() -> None:
4292
reader, writer = await stdio_streams()
4393
# For an agent process, local writes go to client stdin (writer=stdout)
44-
AgentSideConnection(lambda _conn: EchoAgent(), writer, reader)
45-
# Keep running; in a real agent you would await tasks or add your own loop
94+
AgentSideConnection(lambda conn: ExampleAgent(conn), writer, reader)
4695
await asyncio.Event().wait()
4796

4897

examples/client.py

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import os
33
import sys
4+
from typing import Optional
45

56
from acp import (
67
Client,
@@ -9,64 +10,66 @@
910
InitializeRequest,
1011
NewSessionRequest,
1112
PromptRequest,
12-
ReadTextFileRequest,
13-
ReadTextFileResponse,
14-
RequestPermissionRequest,
15-
RequestPermissionResponse,
1613
SessionNotification,
17-
WriteTextFileRequest,
18-
stdio_streams,
1914
)
2015

2116

22-
class MinimalClient(Client):
23-
async def writeTextFile(self, params: WriteTextFileRequest) -> None:
24-
print(f"write {params.path}", file=sys.stderr)
25-
26-
async def readTextFile(self, params: ReadTextFileRequest) -> ReadTextFileResponse:
27-
return ReadTextFileResponse(content="example")
17+
class ExampleClient(Client):
18+
async def sessionUpdate(self, params: SessionNotification) -> None:
19+
update = params.update
20+
kind = getattr(update, "sessionUpdate", None) if not isinstance(update, dict) else update.get("sessionUpdate")
21+
if kind == "agent_message_chunk":
22+
# Handle both dict and model shapes
23+
content = update["content"] if isinstance(update, dict) else getattr(update, "content", None)
24+
text = content.get("text") if isinstance(content, dict) else getattr(content, "text", "<content>")
25+
print(f"| Agent: {text}")
2826

29-
async def requestPermission(self, params: RequestPermissionRequest) -> RequestPermissionResponse:
30-
return RequestPermissionResponse.model_validate({"outcome": {"outcome": "selected", "optionId": "allow"}})
3127

32-
async def sessionUpdate(self, params: SessionNotification) -> None:
33-
print(f"session update: {params}", file=sys.stderr)
28+
async def interactive_loop(conn: ClientSideConnection, session_id: str) -> None:
29+
loop = asyncio.get_running_loop()
30+
while True:
31+
try:
32+
line = await loop.run_in_executor(None, lambda: input("> "))
33+
except EOFError:
34+
break
35+
if not line:
36+
continue
37+
try:
38+
await conn.prompt(PromptRequest(sessionId=session_id, prompt=[{"type": "text", "text": line}]))
39+
except Exception as e: # noqa: BLE001
40+
print(f"error: {e}", file=sys.stderr)
3441

35-
# Optional terminal methods (not implemented in this minimal client)
36-
async def createTerminal(self, params) -> None:
37-
pass
3842

39-
async def terminalOutput(self, params) -> None:
40-
pass
43+
async def main(argv: list[str]) -> int:
44+
if len(argv) < 2:
45+
print("Usage: python examples/client.py AGENT_PROGRAM [ARGS...]", file=sys.stderr)
46+
return 2
4147

42-
async def releaseTerminal(self, params) -> None:
43-
pass
48+
# Spawn agent subprocess
49+
proc = await asyncio.create_subprocess_exec(
50+
sys.executable,
51+
*argv[1:],
52+
stdin=asyncio.subprocess.PIPE,
53+
stdout=asyncio.subprocess.PIPE,
54+
)
55+
assert proc.stdin and proc.stdout
4456

45-
async def waitForTerminalExit(self, params) -> None:
46-
pass
57+
# Connect to agent stdio
58+
conn = ClientSideConnection(lambda _agent: ExampleClient(), proc.stdin, proc.stdout)
4759

48-
async def killTerminal(self, params) -> None:
49-
pass
60+
# Initialize and create session
61+
await conn.initialize(InitializeRequest(protocolVersion=PROTOCOL_VERSION, clientCapabilities=None))
62+
new_sess = await conn.newSession(NewSessionRequest(mcpServers=[], cwd=os.getcwd()))
5063

64+
# Run REPL until EOF
65+
await interactive_loop(conn, new_sess.sessionId)
5166

52-
async def main() -> None:
53-
reader, writer = await stdio_streams()
54-
client_conn = ClientSideConnection(lambda _agent: MinimalClient(), writer, reader)
55-
# 1) initialize
56-
resp = await client_conn.initialize(InitializeRequest(protocolVersion=PROTOCOL_VERSION))
57-
print(f"Initialized with protocol version: {resp.protocolVersion}", file=sys.stderr)
58-
# 2) new session
59-
new_sess = await client_conn.newSession(NewSessionRequest(mcpServers=[], cwd=os.getcwd()))
60-
# 3) prompt
61-
await client_conn.prompt(
62-
PromptRequest(
63-
sessionId=new_sess.sessionId,
64-
prompt=[{"type": "text", "text": "Hello from client"}],
65-
)
66-
)
67-
# Small grace period to allow duplex messages to flush
68-
await asyncio.sleep(0.2)
67+
try:
68+
proc.terminate()
69+
except ProcessLookupError:
70+
pass
71+
return 0
6972

7073

7174
if __name__ == "__main__":
72-
asyncio.run(main())
75+
raise SystemExit(asyncio.run(main(sys.argv)))

examples/duet.py

Lines changed: 6 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,10 @@
11
import asyncio
2-
import contextlib
3-
import json
42
import os
5-
import signal
63
import sys
74
from pathlib import Path
85

96

10-
async def _relay(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, tag: str):
11-
try:
12-
while True:
13-
line = await reader.readline()
14-
if not line:
15-
break
16-
# Mirror to the other end unchanged
17-
writer.write(line)
18-
try:
19-
await writer.drain()
20-
except ConnectionError:
21-
break
22-
# Try to pretty-print the JSON-RPC message for visibility
23-
try:
24-
obj = json.loads(line.decode("utf-8", errors="replace"))
25-
pretty = json.dumps(obj, ensure_ascii=False, indent=2)
26-
print(f"[{tag}] {pretty}", file=sys.stderr)
27-
except Exception:
28-
# Non-JSON (shouldn't happen on the protocol stream)
29-
print(f"[{tag}] {line!r}", file=sys.stderr)
30-
finally:
31-
try:
32-
writer.close()
33-
await writer.wait_closed()
34-
except Exception:
35-
pass
36-
37-
38-
async def main() -> None:
7+
async def main() -> int:
398
root = Path(__file__).resolve().parent
409
agent_path = str(root / "agent.py")
4110
client_path = str(root / "client.py")
@@ -45,68 +14,15 @@ async def main() -> None:
4514
src_dir = str((root.parent / "src").resolve())
4615
env["PYTHONPATH"] = src_dir + os.pathsep + env.get("PYTHONPATH", "")
4716

48-
agent = await asyncio.create_subprocess_exec(
49-
sys.executable,
50-
agent_path,
51-
stdin=asyncio.subprocess.PIPE,
52-
stdout=asyncio.subprocess.PIPE,
53-
stderr=sys.stderr,
54-
env=env,
55-
)
56-
client = await asyncio.create_subprocess_exec(
17+
# Run the client and let it spawn the agent, wiring stdio automatically.
18+
proc = await asyncio.create_subprocess_exec(
5719
sys.executable,
5820
client_path,
59-
stdin=asyncio.subprocess.PIPE,
60-
stdout=asyncio.subprocess.PIPE,
61-
stderr=sys.stderr,
21+
agent_path,
6222
env=env,
6323
)
64-
65-
assert agent.stdout and agent.stdin and client.stdout and client.stdin
66-
67-
# Wire: agent.stdout -> client.stdin, client.stdout -> agent.stdin
68-
t1 = asyncio.create_task(_relay(agent.stdout, client.stdin, "agent→client"))
69-
t2 = asyncio.create_task(_relay(client.stdout, agent.stdin, "client→agent"))
70-
71-
# Handle shutdown
72-
stop = asyncio.Event()
73-
74-
def _on_sigint(*_):
75-
stop.set()
76-
77-
loop = asyncio.get_running_loop()
78-
try:
79-
loop.add_signal_handler(signal.SIGINT, _on_sigint)
80-
loop.add_signal_handler(signal.SIGTERM, _on_sigint)
81-
except NotImplementedError:
82-
pass
83-
84-
done, _ = await asyncio.wait(
85-
{
86-
t1,
87-
t2,
88-
asyncio.create_task(agent.wait()),
89-
asyncio.create_task(client.wait()),
90-
asyncio.create_task(stop.wait()),
91-
},
92-
return_when=asyncio.FIRST_COMPLETED,
93-
)
94-
95-
# Teardown
96-
for proc in (agent, client):
97-
if proc.returncode is None:
98-
with contextlib.suppress(ProcessLookupError):
99-
proc.terminate()
100-
try:
101-
await asyncio.wait_for(proc.wait(), 2)
102-
except asyncio.TimeoutError:
103-
with contextlib.suppress(ProcessLookupError):
104-
proc.kill()
105-
for task in (t1, t2):
106-
task.cancel()
107-
with contextlib.suppress(asyncio.CancelledError):
108-
await task
24+
return await proc.wait()
10925

11026

11127
if __name__ == "__main__":
112-
asyncio.run(main())
28+
raise SystemExit(asyncio.run(main()))

0 commit comments

Comments
 (0)