From 4a536099213d9e221bdd55684e9d54b841852eb0 Mon Sep 17 00:00:00 2001 From: "Byron Miller (MOBB)" Date: Mon, 20 Apr 2026 08:35:45 -0500 Subject: [PATCH] Add streaming decode circuit breakers for budget and safety. Enable streamed backend decoding with cancellation hooks and apply a shared FSM guard so runs can be interrupted mid-generation on budget or safety violations before further side effects. Add parser and guard tests plus documentation for runtime control, model adaptation, and budget-exhaustion fallback routing. Made-with: Cursor --- docs/architecture.md | 2 + docs/model-adaptation-budget-control.md | 76 ++++ harness/README.md | 11 + harness/src/agents/chat-agent.ts | 27 +- harness/src/agents/coder-agent.ts | 27 +- harness/src/agents/governed-agent.ts | 31 +- harness/src/backends/index.ts | 11 +- harness/src/backends/mock.ts | 84 +++- harness/src/backends/openai-compat.ts | 359 +++++++++++++++++- harness/src/backends/types.ts | 49 ++- harness/src/core/index.ts | 6 + harness/src/core/llm-circuit-breaker.ts | 192 ++++++++++ harness/tests/llm-circuit-breaker.test.ts | 133 +++++++ harness/tests/openai-compat-streaming.test.ts | 164 ++++++++ 14 files changed, 1110 insertions(+), 62 deletions(-) create mode 100644 docs/model-adaptation-budget-control.md create mode 100644 harness/src/core/llm-circuit-breaker.ts create mode 100644 harness/tests/llm-circuit-breaker.test.ts create mode 100644 harness/tests/openai-compat-streaming.test.ts diff --git a/docs/architecture.md b/docs/architecture.md index 4f6f9cf..0a4017b 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -24,6 +24,8 @@ Picture data moving left to right on the **happy path**: Side channels include **budget** enforcement (RFC 0038) and **sandbox** allow/deny lists (RFC 0017), which can pre-empt a transition or force `fail_safe` without giving unsafe payloads back to the model. 
+For streamed decoding, deployments should treat budget control as an active circuit-breaker (preflight budget gate + mid-stream cancellation), not a post-hoc accounting report. The reference harness now supports this runtime pattern; see [Model adaptation for budget control](./model-adaptation-budget-control.md). + ## Major components These names describe responsibilities; a single deployment may fold multiple roles into one service, but the boundaries stay conceptually distinct. diff --git a/docs/model-adaptation-budget-control.md b/docs/model-adaptation-budget-control.md new file mode 100644 index 0000000..7865f07 --- /dev/null +++ b/docs/model-adaptation-budget-control.md @@ -0,0 +1,76 @@ +# Model Adaptation for Budget-Constrained Reasoning + +This note describes practical guidance for running Open-CoT with strict token budgets, streamed cancellation, and fallback routing. + +## Runtime control path (what the harness enforces) + +In the reference harness, budget/safety control is enforced at two layers: + +1. **Provider-side cap**: each call gets a per-request `max_tokens` cap derived from remaining budget. +2. **Harness-side stream breaker**: + - preflight prompt estimate gate, + - mid-stream completion-budget interruption, + - mid-stream safety interruption (runaway/pattern checks), + - forced transition to terminal FSM status before any follow-on side effect. + +This means budget enforcement is not dependent on model obedience. + +## Model behavior profile: what tends to work best + +Budget-following quality is usually higher for models with: + +- strong instruction adherence in system prompts, +- native tool-call behavior and function-schema compliance, +- stable short-form planning (can compress plans under hard limits), +- lower tendency to emit long reflective preambles. 
+ +Budget-following quality is usually worse for models with: + +- weak instruction tuning (treats budget as advisory text), +- verbose default style (long chain-style narration before action), +- fragile tool-call formatting under constrained output length. + +These are deployment traits, not absolute rules. Evaluate with your own tasks and policies. + +## Fine-tuning / adaptation recommendations + +If you train or adapt models for this harness, prioritize data and objectives that reward controlled reasoning depth: + +1. **Budget-conditioned demonstrations** + - Include explicit `budget_remaining` context in prompts. + - Provide successful traces at multiple budgets (tight/medium/high). +2. **Compression preference** + - Reward concise plans that keep high-value steps and drop redundant rationale. +3. **Tool-first economy** + - Reward early tool requests when external evidence is required, instead of long speculative reasoning. +4. **Truncation-aware recovery** + - Include examples where the model says what is missing and asks for retry/escalation when budget is insufficient. +5. **Policy-aware refusal** + - Include traces that correctly stop/escalate when policy or safety constraints prevent completion. + +## Routing when reasoning is incomplete due to budget + +When a run ends with `budget_exhausted`, use a deterministic policy instead of silent retries: + +1. **Narrow retry (same model)** + Retry with a smaller objective slice (single sub-problem) and an explicit compact-output instruction. +2. **Model escalation (same route family)** + Route to a stronger instruction-following model for a bounded rescue pass. +3. **Tool-heavy route** + Shift from free-form reasoning to an evidence/tool-driven route with minimal synthesis tokens. +4. **Human escalation** + If policy-critical or high-risk, require a human approval/resolution path. + +Each retry should carry forward prior observations and a remaining-budget contract so failures are auditable rather than hidden. 
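The routing order above can be sketched as a small pure function. This is a minimal sketch under stated assumptions, not the harness's implementation: `FallbackRoute`, `RunOutcome`, `FallbackDecision`, `chooseFallback`, and every field on them are hypothetical names introduced for illustration. It also assumes that policy-critical runs skip the retry ladder entirely, which is one possible reading of the list.

```typescript
// Hypothetical types for illustration; none of these names exist in the harness.
type FallbackRoute =
  | "narrow_retry"
  | "model_escalation"
  | "tool_heavy"
  | "human_escalation"
  | "none";

interface RunOutcome {
  completionStatus: string; // e.g. "budget_exhausted"
  policyCritical: boolean; // assumption: set by the deployment's policy layer
  attempts: number; // fallback attempts already made for this objective
  retryBudgetTokens: number; // assumption: token contract granted to the next attempt
  observations: string[]; // prior observations to carry forward
}

interface FallbackDecision {
  route: FallbackRoute;
  carriedObservations: string[];
  budgetContract: number;
}

// Deterministic ladder: the same outcome always yields the same route.
function chooseFallback(outcome: RunOutcome): FallbackDecision {
  const carry = {
    carriedObservations: [...outcome.observations],
    budgetContract: outcome.retryBudgetTokens,
  };
  if (outcome.completionStatus !== "budget_exhausted") {
    return { route: "none", ...carry };
  }
  // Assumption: policy-critical work goes straight to a human.
  if (outcome.policyCritical) {
    return { route: "human_escalation", ...carry };
  }
  const ladder: FallbackRoute[] = ["narrow_retry", "model_escalation", "tool_heavy"];
  // Once the ladder is used up, fall through to human escalation.
  const route = ladder[outcome.attempts] ?? "human_escalation";
  return { route, ...carry };
}
```

Because the decision depends only on the recorded outcome, each hop can be written to the trace and replayed later, which is what keeps retries auditable rather than silent.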
+ +## Suggested evaluation matrix + +For each candidate model family, track at least: + +- completion-under-budget rate, +- correctness at fixed budget tiers, +- tool-call validity under low output caps, +- rate of `budget_exhausted` recoveries that succeed after one retry, +- safety/fail-safe trigger precision (true positives vs false positives). + +Open-CoT experiments under `docs/experiments/` can be used as baseline scaffolding for this matrix. diff --git a/harness/README.md b/harness/README.md index b3a00d5..ffffdb6 100644 --- a/harness/README.md +++ b/harness/README.md @@ -111,6 +111,17 @@ The budget tracker (RFC 0038) enforces: When any hard-enforced budget hits zero, the agent is force-stopped with `budget_exhausted` status and the trace records why. +### Streaming decode circuit breaker + +The harness now enforces token/safety limits during streamed decoding (not only after full responses): + +- **Preflight budget gate**: estimate prompt token cost before each model call; if insufficient remaining budget, stop before decode starts. +- **Mid-stream token breaker**: stream callbacks track emitted completion tokens and abort decode once the remaining completion allowance is exhausted. +- **Mid-stream safety breaker**: stream callbacks can stop runaway or unsafe output patterns and route to `fail_safe`. +- **FSM-first shutdown**: on breaker trip, the run is forced into terminal state (`budget_exhausted`, `fail_safe`, or `external_stop`) before any subsequent tool side effects. + +This keeps authority in the harness FSM even when a model ignores budget instructions. 
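The preflight gate and the mid-stream token breaker reduce to simple arithmetic. The sketch below mirrors the estimates used in `llm-circuit-breaker.ts` in this patch (about four characters per token, four extra tokens per message, and a four-token reserve). `Message`, `completionAllowance`, and `shouldTrip` are illustrative names, not harness exports, and the sketch leaves out the FSM transition and the abort signal.

```typescript
// Illustrative message shape; the harness uses its own LLMMessage type.
interface Message {
  role: string;
  content: string;
}

// Provider-agnostic estimate: roughly 4 characters per token.
function estimateTextTokens(text: string): number {
  if (!text) return 0;
  return Math.max(1, Math.ceil(text.length / 4));
}

// Adds a flat 4-token overhead per message, as the patch does.
function estimateMessageTokens(messages: Message[]): number {
  return messages.reduce(
    (sum, msg) => sum + estimateTextTokens(msg.content) + 4,
    0,
  );
}

// Hypothetical helper: completion tokens left after the prompt estimate and
// a small reserve. A result of 0 means "stop before decode starts".
function completionAllowance(
  tokensRemaining: number,
  messages: Message[],
  reserve = 4,
): number {
  return Math.max(0, tokensRemaining - estimateMessageTokens(messages) - reserve);
}

// Hypothetical helper: the mid-stream breaker trips once the running
// completion estimate reaches the allowance.
function shouldTrip(completionTokensEstimated: number, allowance: number): boolean {
  return completionTokensEstimated >= allowance;
}
```

For example, with 90 tokens remaining and the single 16-character prompt `stream stop test`, the prompt estimate is 8 tokens, so the allowance is 90 - 8 - 4 = 78 completion tokens and the breaker trips when the streamed estimate reaches 78.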
+ ## Tool contracts Every tool is registered with a contract (RFC 0003 + RFC 0018): diff --git a/harness/src/agents/chat-agent.ts b/harness/src/agents/chat-agent.ts index 10387d8..3bd12c5 100644 --- a/harness/src/agents/chat-agent.ts +++ b/harness/src/agents/chat-agent.ts @@ -8,6 +8,7 @@ import type { AgentState } from "../core/state.js"; import { createAgentState } from "../core/state.js"; import { transition, forceStop } from "../core/transitions.js"; import { createBudgetTracker } from "../core/budget-tracker.js"; +import { callLLMWithCircuitBreaker } from "../core/llm-circuit-breaker.js"; import type { ToolRegistry } from "../core/tool-registry.js"; import { emitPlan, @@ -56,19 +57,19 @@ export async function runChatAgent( let lastResponse: LLMResponseWithTools | undefined; const callLLM = async (messages: LLMMessage[]): Promise => { - if (halted(state)) { - return { content: "", tokensUsed: 0, model: "noop", finishReason: "stop" }; - } - try { - const response = await backend.chat(messages); - budget.recordTokens(state, response.tokensUsed, `LLM (${backend.name})`); - lastResponse = response; - return response; - } catch (err) { - const msg = err instanceof Error ? 
err.message : String(err); - forceStop(state, "fail_safe", msg); - return { content: "", tokensUsed: 0, model: "error", finishReason: "stop" }; - } + const response = await callLLMWithCircuitBreaker({ + backend, + messages, + state, + budget, + llmReason: `LLM (${backend.name})`, + stream: true, + safety: { + maxDecodedChars: 12_000, + }, + }); + lastResponse = response; + return response; }; const end = (answer: string): Trace => { diff --git a/harness/src/agents/coder-agent.ts b/harness/src/agents/coder-agent.ts index a1f32f6..4e4545b 100644 --- a/harness/src/agents/coder-agent.ts +++ b/harness/src/agents/coder-agent.ts @@ -7,6 +7,7 @@ import type { AgentState } from "../core/state.js"; import { createAgentState } from "../core/state.js"; import { transition, forceStop } from "../core/transitions.js"; import { createBudgetTracker } from "../core/budget-tracker.js"; +import { callLLMWithCircuitBreaker } from "../core/llm-circuit-breaker.js"; import type { ToolRegistry } from "../core/tool-registry.js"; import { emitPlan, @@ -55,19 +56,19 @@ export async function runCoderAgent( let lastResponse: LLMResponseWithTools | undefined; const callLLM = async (messages: LLMMessage[]): Promise => { - if (halted(state)) { - return { content: "", tokensUsed: 0, model: "noop", finishReason: "stop" }; - } - try { - const response = await backend.chat(messages); - budget.recordTokens(state, response.tokensUsed, `LLM (${backend.name})`); - lastResponse = response; - return response; - } catch (err) { - const msg = err instanceof Error ? 
err.message : String(err); - forceStop(state, "fail_safe", msg); - return { content: "", tokensUsed: 0, model: "error", finishReason: "stop" }; - } + const response = await callLLMWithCircuitBreaker({ + backend, + messages, + state, + budget, + llmReason: `LLM (${backend.name})`, + stream: true, + safety: { + maxDecodedChars: 20_000, + }, + }); + lastResponse = response; + return response; }; const end = (answer: string): Trace => { diff --git a/harness/src/agents/governed-agent.ts b/harness/src/agents/governed-agent.ts index 54e31ed..ee469d8 100644 --- a/harness/src/agents/governed-agent.ts +++ b/harness/src/agents/governed-agent.ts @@ -9,6 +9,7 @@ import type { AgentState } from "../core/state.js"; import { createAgentState } from "../core/state.js"; import { transition, forceStop } from "../core/transitions.js"; import { createBudgetTracker } from "../core/budget-tracker.js"; +import { callLLMWithCircuitBreaker } from "../core/llm-circuit-breaker.js"; import type { ToolRegistry } from "../core/tool-registry.js"; import { PermissionManager } from "../governance/permission-manager.js"; import { PolicyEvaluator } from "../governance/policy-evaluator.js"; @@ -75,23 +76,19 @@ export async function runGovernedAgent( const callLLM = async ( messages: LLMMessage[], ): Promise => { - if (halted(state)) { - return { content: "", tokensUsed: 0, model: "noop", finishReason: "stop" }; - } - try { - const response = await config.backend.chat(messages); - budget.recordTokens( - state, - response.tokensUsed, - `LLM (${config.backend.name})`, - ); - lastResponse = response; - return response; - } catch (err) { - const msg = err instanceof Error ? 
err.message : String(err); - forceStop(state, "fail_safe", `LLM failure: ${msg}`); - return { content: "", tokensUsed: 0, model: "error", finishReason: "stop" }; - } + const response = await callLLMWithCircuitBreaker({ + backend: config.backend, + messages, + state, + budget, + llmReason: `LLM (${config.backend.name})`, + stream: true, + safety: { + maxDecodedChars: 16_000, + }, + }); + lastResponse = response; + return response; }; const finish = (answer: string): GovernedAgentResult => { diff --git a/harness/src/backends/index.ts b/harness/src/backends/index.ts index d6cf457..32ee8db 100644 --- a/harness/src/backends/index.ts +++ b/harness/src/backends/index.ts @@ -1,4 +1,13 @@ -export type { LLMBackend, LLMMessage, LLMResponse, LLMResponseWithTools, ToolCallRequest } from "./types.js"; +export type { + LLMBackend, + LLMChatOptions, + LLMFinishReason, + LLMMessage, + LLMResponse, + LLMResponseWithTools, + LLMStreamChunk, + ToolCallRequest, +} from "./types.js"; export { MockLLMBackend } from "./mock.js"; export { OpenAICompatBackend, configFromEnv } from "./openai-compat.js"; export type { OpenAICompatConfig } from "./openai-compat.js"; diff --git a/harness/src/backends/mock.ts b/harness/src/backends/mock.ts index 1511e4b..599080f 100644 --- a/harness/src/backends/mock.ts +++ b/harness/src/backends/mock.ts @@ -7,6 +7,7 @@ import type { LLMBackend, + LLMChatOptions, LLMMessage, LLMResponseWithTools, ToolCallRequest, @@ -31,7 +32,24 @@ export class MockLLMBackend implements LLMBackend { this.routes.push(route); } - async chat(messages: LLMMessage[]): Promise { + async chat( + messages: LLMMessage[], + options?: LLMChatOptions, + ): Promise { + if (options?.signal?.aborted) { + throw makeAbortError("Mock backend aborted before decode"); + } + + const response = this.resolveResponse(messages); + const capped = applyOutputCap(response, options?.maxOutputTokens); + + if (options?.stream && options.onChunk && capped.content) { + await emitStream(capped.content, 
options); + } + return capped; + } + + private resolveResponse(messages: LLMMessage[]): LLMResponseWithTools { const raw = [...messages].reverse().find((m) => m.role === "user")?.content ?? ""; @@ -90,6 +108,70 @@ export class MockLLMBackend implements LLMBackend { } } +function makeAbortError(message: string): Error { + const err = new Error(message); + err.name = "AbortError"; + return err; +} + +function estimateTokens(text: string): number { + if (!text) return 0; + return Math.max(1, Math.ceil(text.length / 4)); +} + +function applyOutputCap( + response: LLMResponseWithTools, + maxOutputTokens?: number, +): LLMResponseWithTools { + if ( + maxOutputTokens === undefined || + maxOutputTokens <= 0 || + response.content.length === 0 + ) { + return response; + } + + const completionEstimate = estimateTokens(response.content); + if (completionEstimate <= maxOutputTokens) { + return response; + } + + const maxChars = Math.max(1, maxOutputTokens * 4); + const truncated = response.content.slice(0, maxChars); + const promptEstimate = Math.max( + 0, + response.tokensUsed - completionEstimate, + ); + return { + ...response, + content: truncated, + tokensUsed: promptEstimate + estimateTokens(truncated), + finishReason: "length", + }; +} + +async function emitStream( + content: string, + options: LLMChatOptions, +): Promise { + const chunks = content.match(/\S+\s*/g) ?? 
[content]; + let aggregate = ""; + let completionTokensEstimated = 0; + + for (const piece of chunks) { + if (options.signal?.aborted) { + throw makeAbortError("Mock backend aborted during streamed decode"); + } + aggregate += piece; + completionTokensEstimated += estimateTokens(piece); + await options.onChunk?.({ + contentDelta: piece, + content: aggregate, + completionTokensEstimated, + }); + } +} + function getDefaultRoutes(): MockRoute[] { return [ { diff --git a/harness/src/backends/openai-compat.ts b/harness/src/backends/openai-compat.ts index 7788f11..925c830 100644 --- a/harness/src/backends/openai-compat.ts +++ b/harness/src/backends/openai-compat.ts @@ -12,6 +12,8 @@ import type { LLMBackend, + LLMChatOptions, + LLMFinishReason, LLMMessage, LLMResponseWithTools, ToolCallRequest, @@ -45,13 +47,34 @@ export class OpenAICompatBackend implements LLMBackend { this.name = `openai-compat:${this.config.model}`; } - async chat(messages: LLMMessage[]): Promise { + async chat( + messages: LLMMessage[], + options?: LLMChatOptions, + ): Promise { + if (options?.stream) { + return this.chatStreaming(messages, options); + } + return this.chatBuffered(messages, options); + } + + private resolveMaxTokens(limit?: number): number { + const configured = this.config.maxTokens ?? 
4096; + if (limit === undefined || limit <= 0) { + return configured; + } + return Math.min(configured, limit); + } + + private async chatBuffered( + messages: LLMMessage[], + options?: LLMChatOptions, + ): Promise { const url = `${this.config.baseUrl}/chat/completions`; const body = { model: this.config.model, messages, temperature: this.config.temperature, - max_tokens: this.config.maxTokens, + max_tokens: this.resolveMaxTokens(options?.maxOutputTokens), }; const res = await fetch(url, { @@ -63,6 +86,7 @@ export class OpenAICompatBackend implements LLMBackend { : {}), }, body: JSON.stringify(body), + signal: options?.signal, }); if (!res.ok) { @@ -76,12 +100,7 @@ export class OpenAICompatBackend implements LLMBackend { const choice = data.choices?.[0]; if (!choice) throw new Error("No choices in OpenAI response"); - const toolCalls: ToolCallRequest[] = ( - choice.message.tool_calls ?? [] - ).map((tc) => ({ - toolName: tc.function.name, - arguments: JSON.parse(tc.function.arguments) as Record, - })); + const toolCalls = parseToolCalls(choice.message.tool_calls ?? []); const tokensUsed = (data.usage?.prompt_tokens ?? 0) + (data.usage?.completion_tokens ?? 0); @@ -90,8 +109,105 @@ export class OpenAICompatBackend implements LLMBackend { content: choice.message.content ?? "", tokensUsed, model: data.model ?? this.config.model, - finishReason: - choice.finish_reason === "tool_calls" ? "tool_calls" : "stop", + finishReason: normalizeFinishReason(choice.finish_reason), + toolCalls: toolCalls.length > 0 ? 
toolCalls : undefined, + }; + } + + private async chatStreaming( + messages: LLMMessage[], + options: LLMChatOptions, + ): Promise { + const url = `${this.config.baseUrl}/chat/completions`; + const body = { + model: this.config.model, + messages, + temperature: this.config.temperature, + max_tokens: this.resolveMaxTokens(options.maxOutputTokens), + stream: true, + stream_options: { include_usage: true }, + }; + + const res = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(this.config.apiKey + ? { Authorization: `Bearer ${this.config.apiKey}` } + : {}), + }, + body: JSON.stringify(body), + signal: options.signal, + }); + + if (!res.ok) { + const text = await res.text(); + throw new Error( + `OpenAI API error ${res.status}: ${text.slice(0, 500)}`, + ); + } + if (!res.body) { + throw new Error("OpenAI streaming response has no body"); + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + const toolCallsByIndex = new Map(); + let content = ""; + let completionTokensEstimated = 0; + let finishReason: LLMFinishReason = "stop"; + let model = this.config.model; + let promptTokens = 0; + let completionTokens = 0; + let done = false; + let buffer = ""; + + while (!done) { + const { value, done: streamDone } = await reader.read(); + buffer += streamDone + ? 
decoder.decode() + : decoder.decode(value, { stream: true }); + buffer = buffer.replace(/\r\n/g, "\n"); + + const drained = await drainSseBuffer({ + buffer, + flushTrailing: streamDone, + toolCallsByIndex, + context: { + content, + completionTokensEstimated, + finishReason, + model, + promptTokens, + completionTokens, + }, + options, + }); + buffer = drained.buffer; + done = drained.done; + content = drained.context.content; + completionTokensEstimated = drained.context.completionTokensEstimated; + finishReason = drained.context.finishReason; + model = drained.context.model; + promptTokens = drained.context.promptTokens; + completionTokens = drained.context.completionTokens; + + if (streamDone) { + break; + } + } + + const toolCalls = finalizeStreamToolCalls(toolCallsByIndex); + const tokensUsed = + promptTokens + completionTokens > 0 + ? promptTokens + completionTokens + : estimateMessagesTokens(messages) + completionTokensEstimated; + + return { + content, + tokensUsed, + model, + finishReason, toolCalls: toolCalls.length > 0 ? 
toolCalls : undefined, }; } @@ -99,13 +215,23 @@ export class OpenAICompatBackend implements LLMBackend { interface OpenAIChatResponse { choices?: Array<{ - message: { + message: OpenAIMessage; + finish_reason: string; + }>; + model?: string; + usage?: { + prompt_tokens?: number; + completion_tokens?: number; + }; +} + +interface OpenAIStreamChunk { + choices?: Array<{ + delta?: { content?: string; - tool_calls?: Array<{ - function: { name: string; arguments: string }; - }>; + tool_calls?: Array; }; - finish_reason: string; + finish_reason?: string | null; }>; model?: string; usage?: { @@ -113,3 +239,206 @@ interface OpenAIChatResponse { completion_tokens?: number; }; } + +interface OpenAIToolCallDelta { + index?: number; + id?: string; + function?: { + name?: string; + arguments?: string; + }; +} + +interface OpenAIFullToolCall { + function: { name: string; arguments: string }; +} + +interface OpenAIMessage { + content?: string; + tool_calls?: OpenAIFullToolCall[]; +} + +interface StreamToolAccumulator { + name?: string; + arguments: string; +} + +interface StreamContext { + content: string; + completionTokensEstimated: number; + finishReason: LLMFinishReason; + model: string; + promptTokens: number; + completionTokens: number; + done?: boolean; +} + +function estimateTextTokens(text: string): number { + if (!text) return 0; + return Math.max(1, Math.ceil(text.length / 4)); +} + +function estimateMessagesTokens(messages: LLMMessage[]): number { + let total = 0; + for (const msg of messages) { + total += estimateTextTokens(msg.content) + 4; + } + return total; +} + +function normalizeFinishReason(reason: string | null | undefined): LLMFinishReason { + if (reason === "tool_calls") return "tool_calls"; + if (reason === "length") return "length"; + if (reason === "cancelled") return "cancelled"; + return "stop"; +} + +function parseToolCalls(calls: OpenAIFullToolCall[]): ToolCallRequest[] { + return calls.map((tc) => ({ + toolName: tc.function.name, + arguments: 
parseToolArguments(tc.function.arguments), + })); +} + +function parseToolArguments(raw: string): Record { + if (!raw.trim()) { + return {}; + } + return JSON.parse(raw) as Record; +} + +function extractSseData(rawEvent: string): string | null { + const dataLines = rawEvent + .split(/\r?\n/) + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()); + if (dataLines.length === 0) { + return null; + } + return dataLines.join("\n"); +} + +function accumulateToolDeltas( + toolCallsByIndex: Map, + deltas: OpenAIToolCallDelta[], +): void { + for (const delta of deltas) { + const index = delta.index ?? 0; + const curr = toolCallsByIndex.get(index) ?? { arguments: "" }; + if (delta.function?.name) { + curr.name = delta.function.name; + } + if (delta.function?.arguments) { + curr.arguments += delta.function.arguments; + } + toolCallsByIndex.set(index, curr); + } +} + +function finalizeStreamToolCalls( + toolCallsByIndex: Map, +): ToolCallRequest[] { + return [...toolCallsByIndex.entries()] + .sort(([a], [b]) => a - b) + .map(([, acc]) => { + if (!acc.name) { + throw new Error("Streaming tool call missing function name"); + } + return { + toolName: acc.name, + arguments: parseToolArguments(acc.arguments), + }; + }); +} + +async function drainSseBuffer(args: { + buffer: string; + flushTrailing: boolean; + toolCallsByIndex: Map; + context: StreamContext; + options: LLMChatOptions; +}): Promise<{ buffer: string; context: StreamContext; done: boolean }> { + let buffer = args.buffer; + let context = args.context; + let done = false; + + let boundary = buffer.indexOf("\n\n"); + while (boundary !== -1) { + const rawEvent = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + boundary = buffer.indexOf("\n\n"); + context = await consumeSseEvent( + rawEvent, + args.toolCallsByIndex, + context, + args.options, + ); + if (context.done) { + done = true; + return { buffer: "", context, done }; + } + } + + if (args.flushTrailing && 
buffer.trim().length > 0) { + context = await consumeSseEvent( + buffer, + args.toolCallsByIndex, + context, + args.options, + ); + buffer = ""; + if (context.done) { + done = true; + } + } + + return { buffer, context, done }; +} + +async function consumeSseEvent( + rawEvent: string, + toolCallsByIndex: Map, + context: StreamContext, + options: LLMChatOptions, +): Promise { + const payload = extractSseData(rawEvent); + if (!payload) { + return context; + } + if (payload === "[DONE]") { + return { ...context, done: true }; + } + + const chunk = JSON.parse(payload) as OpenAIStreamChunk; + const next: StreamContext = { + ...context, + model: chunk.model ?? context.model, + promptTokens: chunk.usage?.prompt_tokens ?? context.promptTokens, + completionTokens: chunk.usage?.completion_tokens ?? context.completionTokens, + }; + + const choice = chunk.choices?.[0]; + if (!choice) { + return next; + } + + if (choice.delta?.content) { + next.content += choice.delta.content; + next.completionTokensEstimated += estimateTextTokens(choice.delta.content); + } + if (choice.delta?.tool_calls) { + accumulateToolDeltas(toolCallsByIndex, choice.delta.tool_calls); + } + if (choice.finish_reason) { + next.finishReason = normalizeFinishReason(choice.finish_reason); + } + + await options.onChunk?.({ + contentDelta: choice.delta?.content, + content: next.content, + completionTokensEstimated: next.completionTokensEstimated, + finishReason: next.finishReason, + }); + + return next; +} diff --git a/harness/src/backends/types.ts b/harness/src/backends/types.ts index ed4f266..d9f2ad6 100644 --- a/harness/src/backends/types.ts +++ b/harness/src/backends/types.ts @@ -10,11 +10,13 @@ export interface LLMMessage { content: string; } +export type LLMFinishReason = "stop" | "length" | "tool_calls" | "cancelled"; + export interface LLMResponse { content: string; tokensUsed: number; model: string; - finishReason: "stop" | "length" | "tool_calls"; + finishReason: LLMFinishReason; } export interface 
ToolCallRequest { @@ -26,7 +28,50 @@ export interface LLMResponseWithTools extends LLMResponse { toolCalls?: ToolCallRequest[]; } +export interface LLMStreamChunk { + /** + * New text emitted by this chunk. + */ + contentDelta?: string; + /** + * Aggregated text emitted so far in this response. + */ + content: string; + /** + * Running estimate of completion tokens emitted so far. + */ + completionTokensEstimated: number; + /** + * Optional finish reason when the provider emits it mid-stream. + */ + finishReason?: LLMFinishReason; +} + +export interface LLMChatOptions { + /** + * Request streamed decoding. Backends MAY ignore this and fall back to + * buffered responses, but the OpenAI-compatible backend supports it. + */ + stream?: boolean; + /** + * Max completion tokens for this request, typically derived from the + * remaining runtime budget. + */ + maxOutputTokens?: number; + /** + * Abort signal for circuit-breakers (budget/safety/external stop). + */ + signal?: AbortSignal; + /** + * Stream callback invoked for each content/tool delta. 
+ */ + onChunk?: (chunk: LLMStreamChunk) => void | Promise; +} + export interface LLMBackend { - chat(messages: LLMMessage[]): Promise; + chat( + messages: LLMMessage[], + options?: LLMChatOptions, + ): Promise; readonly name: string; } diff --git a/harness/src/core/index.ts b/harness/src/core/index.ts index aa585a5..dc1e208 100644 --- a/harness/src/core/index.ts +++ b/harness/src/core/index.ts @@ -32,3 +32,9 @@ export { validateTermination, } from "./validator.js"; export type { ValidationResult } from "./validator.js"; +export { + callLLMWithCircuitBreaker, + estimateMessageTokens, + estimateTextTokens, +} from "./llm-circuit-breaker.js"; +export type { CircuitBreakerOptions, StreamSafetyConfig } from "./llm-circuit-breaker.js"; diff --git a/harness/src/core/llm-circuit-breaker.ts b/harness/src/core/llm-circuit-breaker.ts new file mode 100644 index 0000000..f1b3ce0 --- /dev/null +++ b/harness/src/core/llm-circuit-breaker.ts @@ -0,0 +1,192 @@ +import type { + LLMBackend, + LLMMessage, + LLMResponseWithTools, + LLMStreamChunk, +} from "../backends/types.js"; +import type { BudgetTracker } from "./budget-tracker.js"; +import type { AgentState } from "./state.js"; +import { forceStop } from "./transitions.js"; +import type { CompletionStatus } from "../schemas/audit-envelope.js"; + +const NOOP_RESPONSE: LLMResponseWithTools = { + content: "", + tokensUsed: 0, + model: "noop", + finishReason: "stop", +}; + +const DEFAULT_MAX_DECODED_CHARS = 16_000; + +export interface StreamSafetyConfig { + /** + * Hard ceiling on streamed decoded characters to prevent runaway output. + */ + maxDecodedChars: number; + /** + * Optional denylist for decoded output. If any pattern matches, decoding is + * interrupted and the run enters fail_safe. 
+ */ + blockedPatterns: RegExp[]; +} + +export interface CircuitBreakerOptions { + backend: LLMBackend; + messages: LLMMessage[]; + state: AgentState; + budget: BudgetTracker; + llmReason?: string; + stream?: boolean; + safety?: Partial<StreamSafetyConfig>; +} + +function isAbortError(err: unknown): boolean { + return ( + (err instanceof DOMException && err.name === "AbortError") || + (err instanceof Error && err.name === "AbortError") + ); +} + +function abortWithReason(controller: AbortController, reason: string): void { + if (!controller.signal.aborted) { + controller.abort(reason); + } +} + +export function estimateTextTokens(text: string): number { + if (!text) return 0; + // Deliberately simple, provider-agnostic estimate (good enough for guardrails). + return Math.max(1, Math.ceil(text.length / 4)); +} + +export function estimateMessageTokens(messages: LLMMessage[]): number { + let total = 0; + for (const msg of messages) { + total += estimateTextTokens(msg.content) + 4; + } + return total; +} + +function toNoopErrorResponse(): LLMResponseWithTools { + return { + content: "", + tokensUsed: 0, + model: "error", + finishReason: "stop", + }; +} + +function normalizeSafetyConfig( + partial?: Partial<StreamSafetyConfig>, +): StreamSafetyConfig { + return { + maxDecodedChars: partial?.maxDecodedChars ?? DEFAULT_MAX_DECODED_CHARS, + blockedPatterns: partial?.blockedPatterns ?? [], + }; +} + +export async function callLLMWithCircuitBreaker( + options: CircuitBreakerOptions, +): Promise<LLMResponseWithTools> { + const { backend, messages, state, budget } = options; + if (state.phase === "audit_seal") { + return NOOP_RESPONSE; + } + + const llmReason = options.llmReason ?? 
`LLM (${backend.name})`; + const hardBudget = state.budgetPolicy.enforcement === "hard"; + const promptEstimate = estimateMessageTokens(messages); + const reserve = 4; + const completionBudget = Math.max( + 0, + state.budget.tokensRemaining - promptEstimate - reserve, + ); + const safety = normalizeSafetyConfig(options.safety); + const requestOutputCap = + hardBudget && completionBudget > 0 ? completionBudget : undefined; + + if (hardBudget && completionBudget <= 0) { + forceStop( + state, + "budget_exhausted", + `Insufficient token budget before decode (remaining=${state.budget.tokensRemaining}, prompt_estimate=${promptEstimate})`, + ); + return toNoopErrorResponse(); + } + + const controller = new AbortController(); + let streamedText = ""; + let streamedCompletionEstimate = 0; + let localStopStatus: CompletionStatus | null = null; + let localStopReason = ""; + + const onChunk = async (chunk: LLMStreamChunk): Promise => { + const delta = chunk.contentDelta ?? ""; + if (delta.length > 0) { + streamedText += delta; + } else if (chunk.content.length > streamedText.length) { + streamedText = chunk.content; + } + + streamedCompletionEstimate = Math.max( + streamedCompletionEstimate, + chunk.completionTokensEstimated, + ); + + if (hardBudget && streamedCompletionEstimate >= completionBudget) { + localStopStatus = "budget_exhausted"; + localStopReason = `Token budget exhausted mid-decode (completion_estimate=${streamedCompletionEstimate}, allowance=${completionBudget})`; + abortWithReason(controller, localStopReason); + return; + } + + if (streamedText.length > safety.maxDecodedChars) { + localStopStatus = "fail_safe"; + localStopReason = `Safety circuit breaker: decoded output exceeded ${safety.maxDecodedChars} characters`; + abortWithReason(controller, localStopReason); + return; + } + + for (const pattern of safety.blockedPatterns) { + if (pattern.test(streamedText)) { + localStopStatus = "fail_safe"; + localStopReason = `Safety circuit breaker matched blocked output 
pattern: ${pattern}`; + abortWithReason(controller, localStopReason); + return; + } + } + }; + + try { + const response = await backend.chat(messages, { + stream: options.stream ?? true, + maxOutputTokens: requestOutputCap, + signal: controller.signal, + onChunk: options.stream === false ? undefined : onChunk, + }); + budget.recordTokens(state, response.tokensUsed, llmReason); + return response; + } catch (err) { + if (localStopStatus) { + forceStop(state, localStopStatus, localStopReason); + const estimatedTotal = promptEstimate + streamedCompletionEstimate; + if (estimatedTotal > 0) { + budget.recordTokens( + state, + estimatedTotal, + `${llmReason} (stream-estimated)`, + ); + } + return toNoopErrorResponse(); + } + + if (isAbortError(err)) { + forceStop(state, "external_stop", "LLM stream aborted"); + return toNoopErrorResponse(); + } + + const msg = err instanceof Error ? err.message : String(err); + forceStop(state, "fail_safe", `LLM failure: ${msg}`); + return toNoopErrorResponse(); + } +} diff --git a/harness/tests/llm-circuit-breaker.test.ts b/harness/tests/llm-circuit-breaker.test.ts new file mode 100644 index 0000000..6fbb9a7 --- /dev/null +++ b/harness/tests/llm-circuit-breaker.test.ts @@ -0,0 +1,133 @@ +import { describe, it, expect } from "vitest"; +import { MockLLMBackend } from "../src/backends/mock.js"; +import type { BudgetPolicy } from "../src/schemas/budget.js"; +import { createAgentState } from "../src/core/state.js"; +import { createBudgetTracker } from "../src/core/budget-tracker.js"; +import { callLLMWithCircuitBreaker } from "../src/core/llm-circuit-breaker.js"; + +function makePolicy(maxTokens: number): BudgetPolicy { + return { + maxTokens, + maxCost: 10, + maxSteps: 50, + maxToolCalls: 20, + maxRetries: 5, + enforcement: "hard", + }; +} + +describe("callLLMWithCircuitBreaker", () => { + it("stops before decode when prompt estimate exceeds remaining budget", async () => { + const state = createAgentState({ + objective: "tiny budget", + 
budgetPolicy: makePolicy(8), + }); + const budget = createBudgetTracker(); + + const response = await callLLMWithCircuitBreaker({ + backend: new MockLLMBackend(), + state, + budget, + messages: [ + { + role: "user", + content: "This prompt is intentionally long enough to exceed tiny budget.", + }, + ], + }); + + expect(response.content).toBe(""); + expect(state.completionStatus).toBe("budget_exhausted"); + expect(state.phase).toBe("audit_seal"); + }); + + it("interrupts streamed decoding when completion allowance is exhausted", async () => { + const state = createAgentState({ + objective: "mid-stream budget stop", + budgetPolicy: makePolicy(90), + }); + const budget = createBudgetTracker(); + const backend = new MockLLMBackend([ + { + pattern: /stream stop test/i, + response: + "token ".repeat(400), + tokensUsed: 500, + }, + ]); + + const response = await callLLMWithCircuitBreaker({ + backend, + state, + budget, + messages: [ + { role: "user", content: "stream stop test" }, + ], + stream: true, + }); + + expect(response.model).toBe("error"); + expect(state.completionStatus).toBe("budget_exhausted"); + expect(state.phase).toBe("audit_seal"); + expect(state.budget.tokensUsed).toBeGreaterThan(0); + }); + + it("enters fail_safe when streamed output exceeds safety ceiling", async () => { + const state = createAgentState({ + objective: "safety ceiling stop", + budgetPolicy: makePolicy(500), + }); + const budget = createBudgetTracker(); + const backend = new MockLLMBackend([ + { + pattern: /safety test/i, + response: "safe ".repeat(200), + tokensUsed: 300, + }, + ]); + + await callLLMWithCircuitBreaker({ + backend, + state, + budget, + messages: [{ role: "user", content: "safety test" }], + stream: true, + safety: { + maxDecodedChars: 60, + }, + }); + + expect(state.completionStatus).toBe("fail_safe"); + expect(state.phase).toBe("audit_seal"); + }); + + it("enters fail_safe when blocked stream pattern appears", async () => { + const state = createAgentState({ + objective: 
"pattern stop", + budgetPolicy: makePolicy(500), + }); + const budget = createBudgetTracker(); + const backend = new MockLLMBackend([ + { + pattern: /pattern test/i, + response: "normal output ... -----BEGIN PRIVATE KEY----- ... trailing", + tokensUsed: 200, + }, + ]); + + await callLLMWithCircuitBreaker({ + backend, + state, + budget, + messages: [{ role: "user", content: "pattern test" }], + stream: true, + safety: { + maxDecodedChars: 5000, + blockedPatterns: [/-----BEGIN PRIVATE KEY-----/], + }, + }); + + expect(state.completionStatus).toBe("fail_safe"); + expect(state.phase).toBe("audit_seal"); + }); +}); diff --git a/harness/tests/openai-compat-streaming.test.ts b/harness/tests/openai-compat-streaming.test.ts new file mode 100644 index 0000000..41d8199 --- /dev/null +++ b/harness/tests/openai-compat-streaming.test.ts @@ -0,0 +1,164 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { OpenAICompatBackend } from "../src/backends/openai-compat.js"; +import type { LLMMessage } from "../src/backends/types.js"; + +const messages: LLMMessage[] = [{ role: "user", content: "hello" }]; + +function makeSseResponse(events: unknown[]): Response { + const payload = + events.map((event) => `data: ${JSON.stringify(event)}\n\n`).join("") + + "data: [DONE]\n\n"; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(payload)); + controller.close(); + }, + }); + return new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }); +} + +afterEach(() => { + vi.restoreAllMocks(); + vi.unstubAllGlobals(); +}); + +describe("OpenAICompatBackend streaming", () => { + it("parses streamed text deltas and usage accounting", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue( + makeSseResponse([ + { choices: [{ delta: { content: "Hel" }, finish_reason: null }] }, + { + choices: [{ delta: { content: "lo" }, finish_reason: "stop" }], + usage: { prompt_tokens: 5, 
completion_tokens: 2 }, + }, + ]), + ); + vi.stubGlobal("fetch", fetchMock); + + const backend = new OpenAICompatBackend({ + baseUrl: "https://example.test/v1", + apiKey: "", + model: "stream-model", + maxTokens: 200, + }); + + const chunks: string[] = []; + const resp = await backend.chat(messages, { + stream: true, + onChunk: (chunk) => { + chunks.push(chunk.content); + }, + }); + + expect(resp.content).toBe("Hello"); + expect(resp.finishReason).toBe("stop"); + expect(resp.tokensUsed).toBe(7); + expect(chunks.length).toBeGreaterThan(0); + + const [, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + const body = JSON.parse(String(init.body)); + expect(body.stream).toBe(true); + expect(body.stream_options.include_usage).toBe(true); + }); + + it("reconstructs streamed tool calls with split arguments", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue( + makeSseResponse([ + { + choices: [ + { + delta: { + tool_calls: [ + { + index: 0, + function: { + name: "search", + arguments: "{\"query\":\"Tok", + }, + }, + ], + }, + finish_reason: null, + }, + ], + }, + { + choices: [ + { + delta: { + tool_calls: [ + { + index: 0, + function: { arguments: "yo\"}" }, + }, + ], + }, + finish_reason: "tool_calls", + }, + ], + usage: { prompt_tokens: 11, completion_tokens: 4 }, + }, + ]), + ); + vi.stubGlobal("fetch", fetchMock); + + const backend = new OpenAICompatBackend({ + baseUrl: "https://example.test/v1", + apiKey: "", + model: "stream-tool-model", + maxTokens: 200, + }); + + const resp = await backend.chat(messages, { stream: true }); + + expect(resp.finishReason).toBe("tool_calls"); + expect(resp.toolCalls).toEqual([ + { + toolName: "search", + arguments: { query: "Tokyo" }, + }, + ]); + }); + + it("applies per-call output cap to max_tokens", async () => { + const fetchMock = vi.fn().mockResolvedValue( + new Response( + JSON.stringify({ + model: "buffered-model", + choices: [ + { + message: { content: "ok" }, + finish_reason: "stop", + }, + ], + 
usage: { prompt_tokens: 2, completion_tokens: 1 }, + }), + { + status: 200, + headers: { "Content-Type": "application/json" }, + }, + ), + ); + vi.stubGlobal("fetch", fetchMock); + + const backend = new OpenAICompatBackend({ + baseUrl: "https://example.test/v1", + apiKey: "", + model: "buffered-model", + maxTokens: 4096, + }); + + await backend.chat(messages, { maxOutputTokens: 32 }); + + const [, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + const body = JSON.parse(String(init.body)); + expect(body.max_tokens).toBe(32); + }); +});
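
Reviewer note: the budget arithmetic in `callLLMWithCircuitBreaker` can be checked in isolation. The sketch below is a minimal illustration, not part of the patch. The names `preflight`, `shouldInterrupt`, and `Preflight` are hypothetical, and the token counts are made-up inputs rather than values produced by `estimateMessageTokens`. It assumes the same rules as the patch: a fixed reserve of 4 tokens, a hard-budget stop when the completion allowance is zero or less, and a mid-stream interrupt once the streamed estimate reaches the allowance.

```typescript
// Hypothetical helpers that mirror the arithmetic in the patch; not the harness API.
interface Preflight {
  completionBudget: number;               // tokens left for the completion
  requestOutputCap: number | undefined;   // value that would be sent as the per-call cap
  stopBeforeDecode: boolean;              // true when the call should not be made at all
}

function preflight(
  tokensRemaining: number,
  promptEstimate: number,
  hardBudget: boolean,
  reserve = 4, // same fixed reserve as the patch
): Preflight {
  const completionBudget = Math.max(
    0,
    tokensRemaining - promptEstimate - reserve,
  );
  return {
    completionBudget,
    requestOutputCap:
      hardBudget && completionBudget > 0 ? completionBudget : undefined,
    stopBeforeDecode: hardBudget && completionBudget <= 0,
  };
}

// Mid-stream check: interrupt once the running estimate reaches the allowance.
function shouldInterrupt(
  completionEstimate: number,
  p: Preflight,
  hardBudget: boolean,
): boolean {
  return hardBudget && completionEstimate >= p.completionBudget;
}

// Illustrative inputs only.
const tiny = preflight(8, 16, true);   // prompt alone exceeds the budget
const roomy = preflight(90, 4, true);  // 90 - 4 - 4 = 82 tokens of allowance
console.log(tiny.stopBeforeDecode, roomy.requestOutputCap);
```

With soft enforcement (`hardBudget === false`) neither the preflight stop nor the mid-stream interrupt fires, and no per-call cap is derived, which matches the conditionals in the patch.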