-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchat-handle.ts
More file actions
245 lines (229 loc) · 9.58 KB
/
chat-handle.ts
File metadata and controls
245 lines (229 loc) · 9.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import type { HarnessEvent, Logger } from "@open-gitagent/protocol";
import { nopLogger } from "@open-gitagent/protocol";
import type { ChatResult, PermissionDecision, UsageRollup } from "./types.js";
import { decisionToBody } from "./types.js";
interface ChatHandleDeps {
/** Resolves to the real sessionId once `POST /v1/sessions` returns. */
readonly sessionIdPromise: Promise<string>;
readonly events: AsyncIterable<HarnessEvent>;
/** Harness URL — async because substrates may boot lazily. */
readonly harnessUrlPromise: Promise<string>;
readonly fetchImpl: typeof fetch;
/** Hook fired by the handle for each ca_permission_request, before it auto-decides. */
readonly onPermissionRequest?: (
callId: string,
toolName: string,
input: unknown,
risk?: "low" | "medium" | "high" | "destructive",
) => Promise<PermissionDecision> | PermissionDecision;
/** Optional cleanup (e.g. delete the session) when the handle is fully consumed. */
readonly onComplete?: () => Promise<void> | void;
/**
* Optional client-side logger. When set, emits a one-line summary per
* consumed HarnessEvent — useful for the `debug: true` constructor flag.
*/
readonly logger?: Logger;
}
/**
* The return value of `agent.chat(input)`.
*
* Dual interface, by design:
* - `for await (const ev of handle)` — iterate raw HarnessEvents
* - `await handle` (or `await handle.result()`) — drain to a ChatResult
*
* Inspired by Anthropic's `client.messages.stream(...)` shape.
*/
export class ChatHandle implements AsyncIterable<HarnessEvent>, PromiseLike<ChatResult> {
private resultPromise: Promise<ChatResult> | undefined;
private readonly collectedMessages: unknown[] = [];
/** Running aggregate of usage snapshots seen during the turn. */
private readonly usage = {
inputTokens: 0,
outputTokens: 0,
cacheCreationInputTokens: 0,
cacheReadInputTokens: 0,
cumulativeCostUsd: undefined as number | undefined, // for "cumulative" semantic — track max
deltaCostUsdSum: 0, // for "delta" semantic — sum
sawCost: false,
sawDelta: false,
};
constructor(private readonly deps: ChatHandleDeps) {}
/** Resolves to the session id (after `POST /v1/sessions` returns). */
sessionId(): Promise<string> {
return this.deps.sessionIdPromise;
}
/**
* Snapshot of the usage aggregator at this moment. Callable during or
* after iteration — `for await (const ev of handle) { ... handle.getUsage() ... }`
* works, as does reading it after the loop exits.
*
* Equivalent to `result.usage` after `await handle`, but doesn't require
* a second drain — useful when the caller is streaming events themselves
* and just wants the rollup at the end.
*/
getUsage(): UsageRollup {
return this.finalizeUsage();
}
/** Iterate raw events. Yields once per HarnessEvent the server emits. */
async *[Symbol.asyncIterator](): AsyncIterator<HarnessEvent> {
const log = this.deps.logger ?? nopLogger;
for await (const ev of this.deps.events) {
logHarnessEvent(log, ev);
if (ev.kind === "sdk_message") this.collectedMessages.push(ev.payload);
if (ev.kind === "ca_permission_request") {
await this.handlePermission(ev.callId, ev.toolName, ev.input, ev.risk);
}
if (ev.kind === "ca_usage_snapshot") this.absorbUsage(ev);
yield ev;
if (ev.kind === "ca_session_ended") {
if (this.deps.onComplete) await this.deps.onComplete();
return;
}
}
}
/**
* Fold a usage snapshot into the running aggregate. Tokens always SUM.
* Cost depends on the engine's `costSemantic`:
* - "cumulative" (Claude SDK): keep the MAX value seen
* - "delta" (gitclaw): SUM the per-message values
* - undefined: treat as cumulative for safety
*
* If a single turn somehow mixes both semantics (e.g. two engines in a
* chain — not currently possible but defensive), we prefer the cumulative
* value to avoid double-counting.
*/
private absorbUsage(ev: Extract<HarnessEvent, { kind: "ca_usage_snapshot" }>): void {
if (ev.inputTokens !== undefined) this.usage.inputTokens += ev.inputTokens;
if (ev.outputTokens !== undefined) this.usage.outputTokens += ev.outputTokens;
if (ev.cacheCreationInputTokens !== undefined) {
this.usage.cacheCreationInputTokens += ev.cacheCreationInputTokens;
}
if (ev.cacheReadInputTokens !== undefined) {
this.usage.cacheReadInputTokens += ev.cacheReadInputTokens;
}
if (ev.costUsd !== undefined) {
this.usage.sawCost = true;
if (ev.costSemantic === "delta") {
this.usage.deltaCostUsdSum += ev.costUsd;
this.usage.sawDelta = true;
} else {
// "cumulative" or undefined — track the max.
if (this.usage.cumulativeCostUsd === undefined || ev.costUsd > this.usage.cumulativeCostUsd) {
this.usage.cumulativeCostUsd = ev.costUsd;
}
}
}
}
private finalizeUsage(): UsageRollup {
let costUsd: number | undefined;
if (this.usage.sawCost) {
// Prefer cumulative when both seen — see absorbUsage doc.
if (this.usage.cumulativeCostUsd !== undefined) {
costUsd = this.usage.cumulativeCostUsd;
} else if (this.usage.sawDelta) {
costUsd = this.usage.deltaCostUsdSum;
}
}
return {
inputTokens: this.usage.inputTokens,
outputTokens: this.usage.outputTokens,
cacheCreationInputTokens: this.usage.cacheCreationInputTokens,
cacheReadInputTokens: this.usage.cacheReadInputTokens,
costUsd,
};
}
/** Drain to completion and return the final result. Memoized. */
result(): Promise<ChatResult> {
if (!this.resultPromise) this.resultPromise = this.drain();
return this.resultPromise;
}
/** PromiseLike: `await handle` works the same as `await handle.result()`. */
then<TResult1 = ChatResult, TResult2 = never>(
onfulfilled?: ((value: ChatResult) => TResult1 | PromiseLike<TResult1>) | null | undefined,
onrejected?: ((reason: unknown) => TResult2 | PromiseLike<TResult2>) | null | undefined,
): PromiseLike<TResult1 | TResult2> {
return this.result().then(onfulfilled, onrejected);
}
/** Cancel the in-flight session via the harness server. */
async cancel(): Promise<void> {
const [sid, harnessUrl] = await Promise.all([this.deps.sessionIdPromise, this.deps.harnessUrlPromise]);
await this.deps.fetchImpl(`${harnessUrl}/v1/sessions/${sid}/cancel`, {
method: "POST",
});
}
/** Manually answer a permission request — mainly for callers iterating events directly. */
async respondToPermission(callId: string, decision: PermissionDecision): Promise<void> {
const [sid, harnessUrl] = await Promise.all([this.deps.sessionIdPromise, this.deps.harnessUrlPromise]);
await postPermission(this.deps.fetchImpl, harnessUrl, sid, callId, decision);
}
private async drain(): Promise<ChatResult> {
let ended: Extract<HarnessEvent, { kind: "ca_session_ended" }> | undefined;
for await (const ev of this) {
if (ev.kind === "ca_session_ended") ended = ev;
}
if (!ended) {
throw new Error("ChatHandle: stream closed without ca_session_ended");
}
return {
sessionId: await this.deps.sessionIdPromise,
messages: this.collectedMessages,
ended,
usage: this.finalizeUsage(),
};
}
private async handlePermission(
callId: string,
toolName: string,
input: unknown,
risk?: "low" | "medium" | "high" | "destructive",
): Promise<void> {
const decision = this.deps.onPermissionRequest
? await this.deps.onPermissionRequest(callId, toolName, input, risk)
: { decision: "allow" as const };
const [sid, harnessUrl] = await Promise.all([this.deps.sessionIdPromise, this.deps.harnessUrlPromise]);
await postPermission(this.deps.fetchImpl, harnessUrl, sid, callId, decision);
}
}
/**
* Surface one log line per HarnessEvent the client consumes. Keep it terse:
* the harness-side logs already give per-step detail; this is for the SDK
* caller to see at a glance what's coming across the wire.
*/
function logHarnessEvent(log: Logger, ev: HarnessEvent): void {
switch (ev.kind) {
case "ca_session_started":
log.info("event.session_started", { sessionId: ev.sessionId, engine: ev.engine });
break;
case "ca_session_ended":
log.info("event.session_ended", { sessionId: ev.sessionId, reason: ev.reason });
break;
case "ca_permission_request":
log.info("event.permission_request", { sessionId: ev.sessionId, callId: ev.callId, toolName: ev.toolName, risk: ev.risk });
break;
case "ca_usage_snapshot":
log.debug("event.usage", { sessionId: ev.sessionId, inputTokens: ev.inputTokens, outputTokens: ev.outputTokens, costUsd: ev.costUsd });
break;
case "sdk_message": {
const p = ev.payload as { type?: string; toolName?: string };
log.debug("event.sdk_message", { sessionId: ev.sessionId, type: p?.type, toolName: p?.toolName });
break;
}
default:
// Fallthrough: ev is `never` here because the union is exhausted. Cast
// for forward-compat — when new event kinds land we log them generically.
log.debug("event.unknown", { kind: (ev as { kind: string }).kind });
}
}
async function postPermission(
fetchImpl: typeof fetch,
harnessUrl: string,
sessionId: string,
callId: string,
decision: PermissionDecision,
): Promise<void> {
await fetchImpl(`${harnessUrl}/v1/sessions/${sessionId}/permission/${callId}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(decisionToBody(decision)),
});
}