-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.ts
More file actions
330 lines (311 loc) · 13 KB
/
engine.ts
File metadata and controls
330 lines (311 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import { query, type SDKUserMessage } from "@anthropic-ai/claude-agent-sdk";
import type {
ClaudeAgentOptions,
EngineCapabilities,
EngineContext,
EngineDriver,
EngineEvent,
UserMessage,
} from "@open-gitagent/protocol";
import { nopLogger } from "@open-gitagent/protocol";
import { buildCanUseTool, buildPreToolUseHook } from "./permission-bridge.js";
import { deriveEngineUuid } from "./derive-uuid.js";
const CAPABILITIES: EngineCapabilities = {
streamingInput: true,
partialMessages: true,
permissionCallback: true,
sessions: true,
budget: true,
};
/**
* EngineDriver implementation backed by @anthropic-ai/claude-agent-sdk.
*
* Owns:
* - Adapting our streaming-input queue (UserMessage[]) to the SDK's
* AsyncIterable<SDKUserMessage>.
* - Forwarding every SDKMessage as `EngineEvent { kind: "sdk_message" }`
* verbatim — the harness server emits these as SSE `event: sdk_message`.
* - Wiring permission callbacks via the bridge.
*
* Does NOT own:
* - Translation from GAP/identity → ClaudeAgentOptions (that's the loader's job).
* - HTTP routing or SSE serialization (framework's job).
*/
export class ClaudeAgentEngine implements EngineDriver<ClaudeAgentOptions> {
readonly name = "claude-agent-sdk";
readonly capabilities = CAPABILITIES;
async *startSession(
ctx: EngineContext<ClaudeAgentOptions>,
): AsyncIterable<EngineEvent> {
const log = ctx.logger ?? nopLogger;
const turnStartedAt = Date.now();
const model = (ctx.options as { model?: string }).model;
log.info("engine.start", {
engine: "claude-agent-sdk",
sessionId: ctx.sessionId,
model,
workdir: ctx.workdir,
});
const prompt = adaptUserMessages(ctx.userMessageQueue, ctx.sessionId);
const abortController = signalToController(ctx.abortSignal);
// SessionStore wiring. Strategy:
// - Always pass `sessionStore` when set (so the SDK appends turn
// entries).
// - Always pin `sessionId` to a deterministic UUIDv5 derived from
// the harness sessionId. This makes the SDK write+read under the
// SAME key across turns; without it the SDK auto-generates a fresh
// UUID per turn and the next turn can't find anything to resume.
// - Only pass `resume: <uuid>` when prior entries actually exist
// under that key. The SDK errors when asked to resume a UUID with
// no prior data.
const engineUuid = deriveEngineUuid(ctx.sessionId);
let storeOpts: {
sessionStore?: typeof ctx.sessionStore;
sessionId?: string;
resume?: string;
} = {};
if (ctx.sessionStore) {
const prior = await ctx.sessionStore.load({
projectKey: PROJECT_KEY,
sessionId: engineUuid,
});
if (prior && prior.length > 0) {
// Resuming an existing session: pass `resume` only. The Claude SDK
// uses the resumed session's id internally for future appends.
storeOpts = { sessionStore: ctx.sessionStore, resume: engineUuid };
} else {
// Fresh session under our deterministic key: pin `sessionId` so
// appends land under the same key the next turn will look up.
storeOpts = { sessionStore: ctx.sessionStore, sessionId: engineUuid };
}
}
// `temperature` is part of our cross-engine options surface (Wedge 1.7)
// but `@anthropic-ai/claude-agent-sdk` v0.2.x's public `Options` type
// doesn't expose it. Warn once per process so callers know it's a no-op
// here — they can switch to harness="gitagent" if temperature matters,
// or wait for the upstream SDK to expose the field.
const flatTemperature = (ctx.options as { temperature?: number }).temperature;
if (flatTemperature !== undefined) warnTemperatureUnsupported();
const options: ClaudeAgentOptions = {
...stripFlatTemperature(ctx.options),
cwd: ctx.workdir,
// Merge essential host envs (HOME, PATH, ...) with caller-supplied envs.
// The Claude CLI subprocess writes transcript-mirror JSONL files to
// $HOME/.claude/projects/<projectKey>/<sessionId>.jsonl, and the SDK's
// mirror watcher reads them back to call sessionStore.append(). If HOME
// isn't in the env we pass to query(), the subprocess can't resolve the
// write path and silently drops every mirror frame. ctx.envs alone
// typically only carries API keys — that's not enough.
env: { ...inheritEssentialHostEnv(), ...ctx.envs },
includePartialMessages: true,
abortController,
canUseTool: buildCanUseTool(ctx.onPermissionRequest),
// PreToolUse hook — fires even in bypassPermissions mode (where canUseTool
// is skipped) and its deny overrides any other permission decision. The
// harness routes this to its policy decider; without one, the hook
// returns allow and falls through to canUseTool / the default flow.
hooks: {
...((ctx.options as { hooks?: ClaudeAgentOptions["hooks"] }).hooks ?? {}),
PreToolUse: [{ hooks: [buildPreToolUseHook(ctx.onPermissionRequest)] }],
},
...(ctx.budget?.maxUsd !== undefined ? { maxBudgetUsd: ctx.budget.maxUsd } : {}),
...storeOpts,
};
try {
for await (const message of query({ prompt, options })) {
if (ctx.abortSignal.aborted) break;
logSdkMessage(log, ctx.sessionId, message);
// Surface token/cost telemetry BEFORE the message itself. SDKResultMessage
// is the turn terminator — once the SDK consumer sees it, my issue #2
// fix synthesizes a `ca_session_ended` and stops reading. So usage has
// to land on the wire BEFORE the result, otherwise it's dropped. The
// consumer (SDK aggregator) uses costSemantic="cumulative" to take the
// max and sums the per-turn tokens. Never compute cost client-side — see #5.
const snapshot = toUsageSnapshot(message);
if (snapshot) {
log.debug("engine.usage", {
sessionId: ctx.sessionId,
inputTokens: snapshot.kind === "ca_usage_snapshot" ? snapshot.inputTokens : undefined,
outputTokens: snapshot.kind === "ca_usage_snapshot" ? snapshot.outputTokens : undefined,
costUsd: snapshot.kind === "ca_usage_snapshot" ? snapshot.costUsd : undefined,
});
yield snapshot;
}
yield { kind: "sdk_message", payload: message };
}
log.info("engine.turn.end", {
engine: "claude-agent-sdk",
sessionId: ctx.sessionId,
durationMs: Date.now() - turnStartedAt,
});
} catch (err) {
log.error("engine.error", {
engine: "claude-agent-sdk",
sessionId: ctx.sessionId,
error: err instanceof Error ? err.message : String(err),
});
throw err;
}
}
}
/**
* Extract tool_use, tool_result, and assistant_text events from each
* SDKMessage as it flows through. Pure log emission — no transformation.
*/
function logSdkMessage(log: { debug: (e: string, f?: Record<string, unknown>) => void }, sessionId: string, message: unknown): void {
const m = message as {
type?: string;
message?: { content?: Array<{ type: string; name?: string; id?: string; text?: string; tool_use_id?: string; is_error?: boolean; content?: unknown }> };
};
if (m?.type === "assistant" && Array.isArray(m.message?.content)) {
for (const block of m.message.content) {
if (block.type === "tool_use") {
log.debug("engine.tool_use", { sessionId, name: block.name, callId: block.id });
} else if (block.type === "text" && typeof block.text === "string") {
log.debug("engine.assistant_text", { sessionId, textLen: block.text.length });
}
}
} else if (m?.type === "user" && Array.isArray(m.message?.content)) {
for (const block of m.message.content) {
if (block.type === "tool_result") {
const bytes = typeof block.content === "string" ? block.content.length : JSON.stringify(block.content ?? "").length;
log.debug("engine.tool_result", { sessionId, callId: block.tool_use_id, isError: block.is_error, bytes });
}
}
}
}
/**
* Translate an SDKMessage to a `ca_usage_snapshot` event if it carries usage data.
* Currently only `SDKResultMessage` (success or error) does. Returns `undefined`
* for every other message type so the caller can no-op.
*
* Field reads use snake_case (wire-side from Anthropic API) since the SDK
* forwards Anthropic's JSON shape unchanged at runtime.
*/
function toUsageSnapshot(message: unknown): EngineEvent | undefined {
const m = message as {
type?: string;
total_cost_usd?: number;
usage?: {
input_tokens?: number;
output_tokens?: number;
cache_creation_input_tokens?: number;
cache_read_input_tokens?: number;
};
};
if (m?.type !== "result") return undefined;
const u = m.usage;
// Skip the snapshot if neither tokens nor cost are present — no signal to forward.
if (!u && m.total_cost_usd === undefined) return undefined;
return {
kind: "ca_usage_snapshot",
...(u?.input_tokens !== undefined ? { inputTokens: u.input_tokens } : {}),
...(u?.output_tokens !== undefined ? { outputTokens: u.output_tokens } : {}),
...(u?.cache_creation_input_tokens !== undefined
? { cacheCreationInputTokens: u.cache_creation_input_tokens }
: {}),
...(u?.cache_read_input_tokens !== undefined
? { cacheReadInputTokens: u.cache_read_input_tokens }
: {}),
...(m.total_cost_usd !== undefined
? { costUsd: m.total_cost_usd, costSemantic: "cumulative" as const }
: {}),
};
}
/**
* Project key used when probing the SessionStore. Stable across processes so
* a session written by one harness can be loaded by another against the same
* store. Engines for other projects should use their own constants.
*/
const PROJECT_KEY = "computeragent";
/**
* Bridges our `AsyncIterable<UserMessage>` to the SDK's expected
* `AsyncIterable<SDKUserMessage>`. The SDK's shape requires a `session_id` on
* each message — we synthesize one if the host hasn't supplied one.
*/
async function* adaptUserMessages(
queue: AsyncIterable<UserMessage>,
sessionId: string = "computeragent-session",
): AsyncIterable<SDKUserMessage> {
for await (const m of queue) {
yield {
type: "user",
session_id: sessionId,
message: { role: "user", content: m.content as never },
parent_tool_use_id: null,
};
}
}
/** Wraps an AbortSignal in a fresh controller the SDK can hold a reference to. */
function signalToController(signal: AbortSignal): AbortController {
const ctrl = new AbortController();
if (signal.aborted) ctrl.abort();
else signal.addEventListener("abort", () => ctrl.abort(), { once: true });
return ctrl;
}
/**
* Forward the essential host environment so the Claude CLI subprocess can
* resolve standard paths ($HOME, PATH, ...). Without these, the SDK's
* transcript-mirror writes silently drop because the CLI can't find
* $HOME/.claude/projects/ to write JSONL files into, which means
* sessionStore.append() never fires for any session entry.
*
* Caller envs (api keys, etc.) override these on conflict.
*/
export function inheritEssentialHostEnv(): Record<string, string> {
const out: Record<string, string> = {};
for (const k of [
// POSIX + XDG basics — required for the SDK to resolve $HOME, $PATH, etc.
// Without these, transcript-mirror writes silently drop.
"HOME",
"PATH",
"USER",
"LOGNAME",
"LANG",
"LC_ALL",
"CLAUDE_CONFIG_DIR",
"XDG_CONFIG_HOME",
"XDG_DATA_HOME",
// Bedrock + AWS IRSA passthrough (task #68 Phase 2a) — when the caller
// routes the agent via Bedrock, the AWS SDK's default credential chain
// needs these. Picked up automatically from the pod env (IRSA injects
// AWS_ROLE_ARN + AWS_WEB_IDENTITY_TOKEN_FILE) or developer shell.
"CLAUDE_CODE_USE_BEDROCK",
"AWS_REGION",
"AWS_DEFAULT_REGION",
"AWS_BEDROCK_MODEL_ID",
"AWS_ROLE_ARN",
"AWS_WEB_IDENTITY_TOKEN_FILE",
"AWS_PROFILE",
"AWS_SHARED_CREDENTIALS_FILE",
"AWS_CONFIG_FILE",
]) {
const v = process.env[k];
if (v) out[k] = v;
}
return out;
}
/**
* Drop the flat `temperature` shortcut before spreading into ClaudeAgentOptions.
* The Claude Agent SDK v0.2.x doesn't accept `temperature` on `Options`, so
* leaving it in would land as an unknown property (mostly harmless but noisy).
* See Wedge 1.7.
*/
function stripFlatTemperature<T extends Record<string, unknown>>(
opts: T,
): Omit<T, "temperature"> {
const { temperature: _t, ...rest } = opts as T & { temperature?: number };
return rest as Omit<T, "temperature">;
}
let temperatureWarned = false;
function warnTemperatureUnsupported(): void {
if (temperatureWarned) return;
temperatureWarned = true;
// eslint-disable-next-line no-console
console.warn(
"[computeragent] `temperature` is set but the claude-agent-sdk engine (v0.2.x) " +
"doesn't expose temperature on its public Options type — it has no effect. " +
"Use `harness: \"gitagent\"` if temperature control matters, or wait for the " +
"Anthropic SDK to add the field. (warned once per process)",
);
}