-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathjsonpipe.py
More file actions
197 lines (158 loc) · 5.34 KB
/
jsonpipe.py
File metadata and controls
197 lines (158 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python3
"""JSON output normalizer for claude CLI.
Modes (pass as first arg):
json — read all stdin, parse as JSON, normalize keys, output
stream-json — read JSONL lines, normalize each, output immediately
json-verbose — read JSONL, assemble into single JSON with turns array
"""
import hashlib
import json
import sys
_CONTENT_TRUNCATE = 2000
def _to_camel(name: str) -> str:
parts = name.split("_")
return parts[0] + "".join(p.capitalize() for p in parts[1:])
def _normalize_keys(obj):
if isinstance(obj, dict):
return {_to_camel(k): _normalize_keys(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_normalize_keys(item) for item in obj]
return obj
def _normalize_line(line: str) -> str:
line = line.strip()
if not line:
return ""
try:
parsed = json.loads(line)
return json.dumps(_normalize_keys(parsed))
except (json.JSONDecodeError, ValueError):
return line
# ── json-verbose assembly ───────────────────────────────────────────────────
def _extract_tool_uses(content):
out = []
for block in content:
if block.get("type") != "tool_use":
continue
out.append(
{
"type": "tool_use",
"id": block["id"],
"name": block["name"],
"input": block.get("input", {}),
}
)
return out
def _extract_text(content):
out = []
for block in content:
if block.get("type") != "text":
continue
out.append({"type": "text", "text": block["text"]})
return out
def _truncate_content(text: str) -> dict:
if len(text) <= _CONTENT_TRUNCATE:
return {"content": text}
sha = hashlib.sha256(text.encode()).hexdigest()
return {
"content": text[:_CONTENT_TRUNCATE],
"truncated": True,
"total_length": len(text),
"sha256": sha,
}
def _extract_tool_results(content):
out = []
for block in content:
if block.get("type") != "tool_result":
continue
raw = block.get("content", "")
if isinstance(raw, str):
text = raw
elif isinstance(raw, list):
texts = [b.get("text", "") for b in raw if b.get("type") == "text"]
text = "\n".join(texts) if texts else str(raw)
else:
text = str(raw)
out.append(
{
"type": "tool_result",
"tool_use_id": block.get("tool_use_id", ""),
"is_error": block.get("is_error", False),
**_truncate_content(text),
}
)
return out
def _assemble(lines):
turns = []
result = None
system_init = None
for line in lines:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
etype = event.get("type", "")
if etype == "system" and event.get("subtype") == "init":
system_init = {
"session_id": event.get("session_id", ""),
"model": event.get("model", ""),
"cwd": event.get("cwd", ""),
"tools": event.get("tools", []),
}
continue
if etype == "assistant":
msg = event.get("message", {})
content = msg.get("content", [])
parts = _extract_text(content) + _extract_tool_uses(content)
if not parts:
continue
turns.append({"role": "assistant", "content": parts})
continue
if etype == "user":
msg = event.get("message", {})
content = msg.get("content", [])
tool_results = _extract_tool_results(content)
if not tool_results:
continue
turns.append({"role": "tool_result", "content": tool_results})
continue
if etype == "result":
result = event
continue
if not result:
return {
"type": "result",
"subtype": "error",
"isError": True,
"result": "no result event found in stream",
"turns": turns,
}
result["turns"] = turns
if system_init:
result["system"] = system_init
return _normalize_keys(result)
# ── main ────────────────────────────────────────────────────────────────────
def main():
mode = sys.argv[1] if len(sys.argv) > 1 else "json"
if mode == "stream-json":
for line in sys.stdin:
normalized = _normalize_line(line)
if normalized:
sys.stdout.write(normalized + "\n")
sys.stdout.flush()
return
if mode == "json-verbose":
lines = sys.stdin.readlines()
output = _assemble(lines)
json.dump(output, sys.stdout)
sys.stdout.write("\n")
return
# json mode — read all, normalize
raw = sys.stdin.read().strip()
if not raw:
return
sys.stdout.write(_normalize_line(raw) + "\n")
if __name__ == "__main__":
main()