-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
executable file
·158 lines (125 loc) · 5.11 KB
/
client.py
File metadata and controls
executable file
·158 lines (125 loc) · 5.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
"""
Claude-in-a-Box CLI Client
Send tasks to the server and watch streaming output.
Usage:
./client.py "Fix the bug in main.py"
./client.py --server http://localhost:8080 "Run tests"
./client.py status
./client.py stop
./client.py history
"""
import argparse
import json
import sys
import httpx
import os
# Default server base URL for the --server option. Taken from the
# CLAUDE_BOX_SERVER environment variable when it is set, otherwise the
# local address below.
DEFAULT_SERVER = os.environ.get("CLAUDE_BOX_SERVER", "http://localhost:8080")
def _print_claude_line(line_text):
    """Render the ``line`` payload of one ``output`` event.

    The line is first tried as a JSON message (Claude's stream-json format):
    ``assistant`` messages have their text blocks printed inline and their
    tool calls printed on their own line, ``result`` messages print the cost.
    Anything that is not a JSON object is printed verbatim.
    """
    try:
        parsed = json.loads(line_text)
    except (json.JSONDecodeError, TypeError):
        # Plain text output (TypeError covers a non-string payload such as null).
        print(line_text)
        return
    if not isinstance(parsed, dict):
        # Valid JSON but not a message object (e.g. a bare number): the
        # original text is what the user should see.
        print(line_text)
        return

    message_type = parsed.get("type")
    if message_type == "assistant":
        message = parsed.get("message")
        content = message.get("content") if isinstance(message, dict) else None
        if not isinstance(content, list):
            content = []
        for block in content:
            if not isinstance(block, dict):
                continue
            block_type = block.get("type")
            if block_type == "text":
                print(block.get("text", ""), end="", flush=True)
            elif block_type == "tool_use":
                print(f"\n[TOOL] {block.get('name')}: {block.get('input', {})}")
    elif message_type == "result":
        # `or 0` keeps the float format spec from failing on an explicit null.
        print(f"\n[RESULT] Cost: ${parsed.get('cost_usd') or 0:.4f}")


def _handle_sse_event(event_type, data):
    """Print the user-facing output for one decoded SSE event."""
    if event_type == "start":
        print(f"[STARTED] Task ID: {data.get('task_id')}")
        print("-" * 60)
    elif event_type == "output":
        _print_claude_line(data.get("line", ""))
    elif event_type == "done":
        print("-" * 60)
        state = data.get("state", "unknown")
        exit_code = data.get("exit_code", -1)
        # str() guards against a null state in the payload.
        print(f"[{str(state).upper()}] Exit code: {exit_code}")
    elif event_type == "error":
        print(f"[ERROR] {data.get('error')}")
    elif event_type == "cancelled":
        print("[CANCELLED] Task was stopped")


def send_task(server: str, prompt: str, workdir: str = None) -> None:
    """Send a task to the server and print its SSE output as it streams.

    Args:
        server: Base URL of the server; the task is POSTed to ``{server}/task``.
        prompt: Task prompt. Only the first 100 characters are echoed locally.
        workdir: Optional working directory, sent as ``workdir`` when truthy.

    Raises:
        SystemExit: With code 1 when the server answers 409 (agent busy) or
            any other non-200 status.
    """
    print(f"Sending task to {server}...")
    print(f"Prompt: {prompt[:100]}{'...' if len(prompt) > 100 else ''}")
    print("-" * 60)

    payload = {"prompt": prompt}
    if workdir:
        payload["workdir"] = workdir

    with httpx.stream(
        "POST",
        f"{server}/task",
        json=payload,
        timeout=None,  # No timeout for streaming
    ) as response:
        if response.status_code == 409:
            # A streamed response has no body loaded yet; without read(),
            # .json()/.text raise httpx.ResponseNotRead.
            response.read()
            try:
                busy = response.json()
            except ValueError:
                busy = None
            current = busy.get("current_task") if isinstance(busy, dict) else None
            print(f"Error: Agent is busy with task {current}")
            sys.exit(1)

        if response.status_code != 200:
            response.read()
            print(f"Error: {response.status_code}")
            print(response.text)
            sys.exit(1)

        # Parse SSE stream: an "event:" line names the event, and the
        # following "data:" line carries its JSON payload.
        event_type = None
        for raw_line in response.iter_lines():
            line = raw_line.strip()

            if line.startswith("event:"):
                event_type = line[6:].strip()
                continue
            if not line.startswith("data:"):
                # Blank separators and ":" comment/heartbeat lines are ignored.
                continue

            try:
                data = json.loads(line[5:].strip())
            except json.JSONDecodeError:
                continue
            if not isinstance(data, dict):
                # Every handler expects a JSON object; skip anything else
                # the same way undecodable data is skipped.
                continue

            _handle_sse_event(event_type, data)
            event_type = None
def get_status(server: str):
    """Fetch ``{server}/status`` and pretty-print the JSON body."""
    status_url = f"{server}/status"
    body = httpx.get(status_url).json()
    print(json.dumps(body, indent=2))
def stop_task(server: str):
    """POST to ``{server}/stop`` and pretty-print the JSON reply."""
    stop_url = f"{server}/stop"
    reply = httpx.post(stop_url).json()
    print(json.dumps(reply, indent=2))
def get_history(server: str, limit: int = 10):
    """Print one summary line per task returned by ``{server}/history``.

    ``limit`` is forwarded to the server as the ``limit`` query parameter.
    Each line shows the task state, id, the first 50 characters of its
    prompt, and its exit code; missing state/exit code are shown as ``?``.
    """
    payload = httpx.get(f"{server}/history", params={"limit": limit}).json()
    for entry in payload.get("history", []):
        task_id = entry.get("id")
        summary = entry.get("prompt", "")[:50]
        status = entry.get("state", "?")
        code = entry.get("exit_code", "?")
        print(f"[{status}] {task_id} - {summary}... (exit: {code})")
def main():
    """Parse the command line and run the requested action.

    The positional argument is either one of the built-in commands
    (``status``, ``stop``, ``history``) or, failing that, a task prompt that
    is sent to the server. With no positional argument the help text is
    printed and the process exits with status 1.
    """
    cli = argparse.ArgumentParser(description="Claude-in-a-Box CLI")
    cli.add_argument("command", nargs="?", help="Task prompt or command (status/stop/history)")
    cli.add_argument("--server", "-s", default=DEFAULT_SERVER, help="Server URL")
    cli.add_argument("--workdir", "-w", help="Working directory for task")
    cli.add_argument("--limit", "-n", type=int, default=10, help="History limit")
    opts = cli.parse_args()

    if not opts.command:
        cli.print_help()
        sys.exit(1)

    # Built-in commands; anything not listed here is treated as a task prompt.
    builtin_commands = {
        "status": lambda: get_status(opts.server),
        "stop": lambda: stop_task(opts.server),
        "history": lambda: get_history(opts.server, opts.limit),
    }
    handler = builtin_commands.get(opts.command)
    if handler is None:
        send_task(opts.server, opts.command, opts.workdir)
    else:
        handler()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()