One wrapper between you and runaway execution.
Minimal circuit breaker for AI agents. Wrap any supported agent and pick a mode: the breaker cuts the run short once provider-reported token usage crosses a limit (budget-guard) or the agent starts revisiting the same state (loop-killer), and can optionally refuse an oversized prompt before it is even sent.
This is the Python port of @monetisebg/circuit-breaker (the TypeScript/npm package).
- Zero-config: defaults work out of the box.
- Two modes, pick one: budget-guard (token caps) and loop-killer (state-repeat detection).
- Post-hoc enforcement by default: token limits are checked after each call or turn boundary, so the call that crosses the limit still counts. Use the optional estimate_input_tokens preflight (see below) to reject oversized initial inputs before any provider work happens.
- Visible: emits CircuitBreakerEvent objects as the run progresses.
- Typed: raises a CircuitBreakerError, or routes through your on_trip handler.
- Optional dependencies: only install the framework you actually use.
- No bundled tokenizer: bring your own (tiktoken, transformers, provider SDK).
Shipped adapters: LangChain, OpenAI Agents SDK, Claude Agent SDK, LangGraph Platform SDK. The core is framework-agnostic; rolling your own adapter is a few lines.
Requires Python ≥ 3.10.
pip install monetise-circuit-breaker
# plus the framework you use (only the one you need):
pip install "monetise-circuit-breaker[langchain]"
pip install "monetise-circuit-breaker[openai-agents]"
pip install "monetise-circuit-breaker[claude-agent-sdk]"
pip install "monetise-circuit-breaker[langgraph-sdk]"

from monetise_circuit_breaker.openai_agents import with_circuit_breaker
safe_agent = with_circuit_breaker(agent)  # defaults: 10k input + 10k output
await safe_agent.run("Analyze this dataset")  # run inside an async function

budget-guard caps input and output tokens independently. Default limits:
max_input_token = 10_000, max_output_token = 10_000. Token usage is read
from each provider response, so the breaker trips on the next call/turn
after either bucket is exceeded; the call that pushed the bucket over the
limit still counts. To reject an oversized first prompt before it is sent, pass
an optional estimate_input_tokens preflight (next section).
with_circuit_breaker(
    agent,
    mode="budget-guard",  # optional: this is the default
    max_input_token=50_000,
    max_output_token=20_000,
)

import tiktoken
enc = tiktoken.encoding_for_model("gpt-4o")
with_circuit_breaker(
    agent,
    max_input_token=50_000,
    # receives the wrapper's call argument (its shape varies per adapter)
    estimate_input_tokens=lambda prompt: (
        len(enc.encode(prompt)) if isinstance(prompt, str) else None
    ),
)

If the estimate exceeds max_input_token, the wrapper raises
CircuitBreakerError with reason == "max_input_tokens" before the
underlying runnable / runner / query is called. Return None to skip the
check for that invocation (e.g. when you can't tokenize the input shape). The
preflight is opt-in: without an estimator, the wrapper enforces limits only
post-hoc. No tokenizer is bundled.
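The difference between post-hoc enforcement and the opt-in preflight can be sketched in a few self-contained lines. Everything here is hypothetical (BudgetTripped, whitespace_estimate and guarded_run are illustrative names, not the package's API), and the sketch assumes "exceeded" means strictly greater than the limit, which matches the 10120/10000 trip message shown in the logging section. Treat it as a mental model, not the library's implementation.

```python
from typing import Callable, Iterable, Optional


class BudgetTripped(Exception):
    """Hypothetical stand-in for CircuitBreakerError(reason="max_input_tokens")."""

    def __init__(self, used: int, limit: int, calls: int) -> None:
        super().__init__(f"input token budget exceeded ({used}/{limit}; calls: {calls})")
        self.reason = "max_input_tokens"
        self.used, self.limit, self.calls = used, limit, calls


def whitespace_estimate(prompt: str) -> Optional[int]:
    """Crude hypothetical estimator: one token per whitespace-separated word."""
    return len(prompt.split())


def guarded_run(
    prompt: str,
    reported_input: Iterable[int],
    max_input: int,
    estimate: Optional[Callable[[str], Optional[int]]] = None,
) -> int:
    """Simulate one wrapped run and return how many provider calls were made.

    reported_input holds the input tokens the provider reports for each call.
    """
    # Preflight (opt-in): runs once on the wrapper's input, before any call.
    if estimate is not None:
        guess = estimate(prompt)
        if guess is not None and guess > max_input:
            raise BudgetTripped(guess, max_input, calls=0)

    used = calls = 0
    for tokens in reported_input:
        calls += 1      # the provider call happens...
        used += tokens  # ...and only then is its usage known
        # Post-hoc check: the call that crossed the limit has already counted.
        if used > max_input:
            raise BudgetTripped(used, max_input, calls)
    return calls
```

With 4,000 input tokens per call and a 10,000 limit, the third call still runs (bringing the total to 12,000) before the trip. With whitespace_estimate, a five-word prompt against a limit of 3 is rejected with zero provider calls, and an estimator that returns None simply skips the preflight.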
with_circuit_breaker(
    agent,
    mode="loop-killer",
    max_retries=3,  # default
    detect_repeated_state=True,  # default: hashes each step's state
)

With detect_repeated_state=True (default), the breaker hashes each step's
state (the latest message / turn input) and trips when any single state recurs
more than max_retries times. Set detect_repeated_state=False to fall back
to a plain iteration cap.
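The repeated-state rule can be sketched as a counter keyed by a hash of each step's state. ToyLoopKiller and trip_index are hypothetical names; hashing a JSON dump with SHA-256 is this sketch's own choice of state key, and it reads "recurs more than max_retries times" as "the first sighting is not a recurrence", so with max_retries=3 the fifth sighting of the same state trips. The library may hash or count differently.

```python
import hashlib
import json
from collections import Counter
from typing import Any, Iterable, Optional


class LoopTripped(Exception):
    """Hypothetical stand-in for CircuitBreakerError in loop-killer mode."""


class ToyLoopKiller:
    """Illustrative repeated-state detector; not the library's implementation."""

    def __init__(self, max_retries: int = 3, detect_repeated_state: bool = True) -> None:
        self.max_retries = max_retries
        self.detect_repeated_state = detect_repeated_state
        self.sightings: Counter = Counter()

    @staticmethod
    def state_key(state: Any) -> str:
        # Stable digest of the step's state; sort_keys makes dict order irrelevant.
        blob = json.dumps(state, sort_keys=True, default=str)
        return hashlib.sha256(blob.encode("utf-8")).hexdigest()

    def record_iteration(self, state: Any) -> None:
        # With detection off every step shares one key, so this degrades
        # into a plain iteration cap.
        key = self.state_key(state) if self.detect_repeated_state else "*"
        self.sightings[key] += 1
        recurrences = self.sightings[key] - 1  # the first sighting is not a recurrence
        if recurrences > self.max_retries:
            raise LoopTripped(f"state recurred {recurrences} times (max_retries={self.max_retries})")


def trip_index(states: Iterable[Any], **options: Any) -> Optional[int]:
    """Return the index of the step that trips the breaker, or None if none does."""
    breaker = ToyLoopKiller(**options)
    for i, state in enumerate(states):
        try:
            breaker.record_iteration(state)
        except LoopTripped:
            return i
    return None
```

With max_retries=3, the sequence a, b, a, a, a, a trips on its last step (the fifth sighting of a). With detection off, the same sequence trips one step earlier, on the fifth iteration overall.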
The breaker emits events you can log, surface in your UI, or pipe to your observability stack.
def handle(event):  # event: CircuitBreakerEvent
    print(event)

with_circuit_breaker(agent, mode="loop-killer", max_retries=2, on_event=handle)

CircuitBreakerEvent shapes (frozen dataclasses):
| Event | When | Modes |
|---|---|---|
| RetryEvent(type="retry", retries: int) | A state recurred (detect_repeated_state=True), or any iteration past the first (detect_repeated_state=False) | loop-killer |
| StopEvent(type="stop", reason: StopReason, saved: int) | The breaker tripped | both |

saved is the signed difference limit - usage: a positive value is headroom that
won't be spent; a negative value means the call that pushed usage over the limit
still counted.
StopReason is one of "max_input_tokens", "max_output_tokens",
"max_retries", "repeated_state".
Provide on_trip to suppress the raise and return a fallback value:
safe = with_circuit_breaker(
    agent,
    max_input_token=50_000,
    max_output_token=20_000,
    on_trip=lambda ctx: {
        "output": "Sorry, I had to stop early.",
        "reason": ctx.reason,
        "metrics": ctx.metrics,
    },
)

on_trip receives a TripContext (frozen dataclass) and may be sync or async:
@dataclass(frozen=True)
class TripContext:
    reason: StopReason
    mode: Mode  # "budget-guard" | "loop-killer"
    metrics: Metrics  # iterations, retries, tokens
    limits: ResolvedLimits  # the limits actually in force
    saved: int
    message: str

from monetise_circuit_breaker.langchain import with_circuit_breaker
safe_executor = with_circuit_breaker(
    executor,  # any LangChain Runnable (e.g. AgentExecutor)
    max_input_token=50_000,
    max_output_token=20_000,
)
await safe_executor.ainvoke({"input": "..."})  # or safe_executor.invoke(...)

Iterations are counted on on_llm_start / on_chat_model_start. Token usage
is read from on_llm_end with provider-agnostic extraction (OpenAI
token_usage, Anthropic usage, the newer usage_metadata). The handler sets
raise_error = True, so a trip propagates out of LangChain's callback dispatch
instead of being swallowed.
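The provider-agnostic extraction described above amounts to checking a few known usage shapes in turn. This is a minimal sketch over plain dicts; extract_usage is a hypothetical helper, and the precedence order (usage_metadata first, then OpenAI-style token_usage, then Anthropic-style usage) is this sketch's own choice. In a real callback, on_llm_end receives an LLMResult, where these dicts typically live in llm_output and in the usage_metadata of each generation's message.

```python
from typing import Any, Mapping, Optional, Tuple


def extract_usage(
    llm_output: Optional[Mapping[str, Any]],
    usage_metadata: Optional[Mapping[str, Any]] = None,
) -> Tuple[int, int]:
    """Return (input_tokens, output_tokens) from whichever usage shape is present."""
    # Newer LangChain chat messages: usage_metadata with input/output counts.
    if usage_metadata:
        return int(usage_metadata.get("input_tokens", 0)), int(usage_metadata.get("output_tokens", 0))
    llm_output = llm_output or {}
    # OpenAI-style llm_output: {"token_usage": {"prompt_tokens", "completion_tokens"}}
    openai_usage = llm_output.get("token_usage")
    if openai_usage:
        return int(openai_usage.get("prompt_tokens", 0)), int(openai_usage.get("completion_tokens", 0))
    # Anthropic-style llm_output: {"usage": {"input_tokens", "output_tokens"}}
    anthropic_usage = llm_output.get("usage")
    if anthropic_usage:
        return int(anthropic_usage.get("input_tokens", 0)), int(anthropic_usage.get("output_tokens", 0))
    # No usage reported: count nothing rather than guess.
    return 0, 0
```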
You can also attach the handler directly:
from monetise_circuit_breaker.langchain import CircuitBreakerCallback
breaker = CircuitBreakerCallback(max_input_token=50_000)
await runnable.ainvoke(payload, config={"callbacks": [breaker]})

from agents import Agent
from monetise_circuit_breaker.openai_agents import with_circuit_breaker
agent = Agent(name="Assistant", instructions="...", tools=[...])
safe_agent = with_circuit_breaker(agent, mode="loop-killer", max_retries=3)
await safe_agent.run("Hello")

Iterations are counted on each LLM call (one per turn); the most recent input
item is hashed for loop detection. Tokens are read from the cumulative
RunContext.usage snapshot at each turn boundary. Because the SDK awaits
its lifecycle hooks, a trip raises CircuitBreakerError straight out of
Runner.run, so no extra cancellation plumbing is needed. Extra keyword arguments to
run(...) are forwarded to Runner.run; a caller-supplied hooks object is
composed with the breaker's, and run_config (passed at wrap time or per call)
is forwarded too.
Streaming is not yet supported by this adapter; use the core
CircuitBreaker directly if you need it.
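Composing a caller-supplied hooks object with the breaker's can be done with a small fan-out wrapper. This is a generic Python sketch, not the adapter's code: CompositeHooks is a hypothetical name, and the hook method names in the usage below (on_llm_start, on_agent_end) are illustrative. Each awaited hook call is forwarded to every wrapped object, in order, so an exception raised by the breaker's hook propagates to the runner.

```python
from typing import Any, Awaitable, Callable


class CompositeHooks:
    """Forward every awaited hook call to several hook objects, in order."""

    def __init__(self, *hooks: Any) -> None:
        # Drop missing hook objects (e.g. the caller passed hooks=None).
        self._hooks = [h for h in hooks if h is not None]

    def __getattr__(self, name: str) -> Callable[..., Awaitable[None]]:
        # Only called for names not found normally, i.e. hook method names.
        async def fan_out(*args: Any, **kwargs: Any) -> None:
            for hook in self._hooks:
                method = getattr(hook, name, None)
                if method is not None:
                    await method(*args, **kwargs)  # a breaker hook may raise here
        return fan_out
```

Because __getattr__ answers for any name, hasattr() is always true on the composite; if a framework checks for specific methods or requires its own hook base class, subclass that base class and forward each method explicitly instead.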
from claude_agent_sdk import query
from monetise_circuit_breaker.claude_agent_sdk import with_circuit_breaker
safe_query = with_circuit_breaker(
    query,
    max_input_token=50_000,
    max_output_token=20_000,
)
async for message in safe_query(prompt="Analyze this repo"):
    ...  # messages stream through untouched

The wrapper takes the SDK's query function and returns a drop-in replacement
with the same call signature. It is itself an async generator: messages stream
through unchanged while the breaker watches them.
Iterations are counted on each AssistantMessage (one per turn); its content
blocks are hashed for loop-killer detection. Tokens are read from each
assistant message's usage (input counts input_tokens plus cache
read/creation tokens). When a limit is hit, iteration stops and the underlying
generator is closed. estimate_input_tokens receives the prompt.
With on_trip, the callback's return value is yielded as the generator's final
item instead of raising.
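The pass-through-and-close behaviour can be sketched as an async generator that yields each message unchanged, tallies input usage, and closes the underlying generator on a trip. This is a hypothetical sketch over plain dicts (guarded_stream, InputBudgetExceeded and collect are illustrative names); the usage field names follow the Anthropic API's input_tokens, cache_read_input_tokens and cache_creation_input_tokens, but how the real SDK exposes usage on its message objects is not shown here.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List, Optional


class InputBudgetExceeded(Exception):
    """Hypothetical stand-in for CircuitBreakerError."""


def input_tokens(usage: Dict[str, int]) -> int:
    # Input side = fresh input tokens plus cache read/creation tokens.
    return (
        usage.get("input_tokens", 0)
        + usage.get("cache_read_input_tokens", 0)
        + usage.get("cache_creation_input_tokens", 0)
    )


async def guarded_stream(
    source: AsyncIterator[Dict[str, Any]],
    max_input: int,
    on_trip: Optional[Callable[[str], Any]] = None,
) -> AsyncIterator[Any]:
    """Yield messages unchanged; stop (and close source) once over budget."""
    used = 0
    try:
        async for message in source:
            yield message  # pass-through: the caller sees every message
            usage = message.get("usage")
            if not usage:
                continue
            used += input_tokens(usage)
            if used > max_input:
                if on_trip is None:
                    raise InputBudgetExceeded(f"{used}/{max_input}")
                yield on_trip("max_input_tokens")  # fallback becomes the final item
                return
    finally:
        aclose = getattr(source, "aclose", None)
        if aclose is not None:
            await aclose()  # shut down the underlying generator


def collect(stream: AsyncIterator[Any]) -> List[Any]:
    """Demo helper: drain an async stream from synchronous code."""
    async def drain() -> List[Any]:
        return [item async for item in stream]
    return asyncio.run(drain())
```

The message that crosses the limit is still delivered before the trip, mirroring the post-hoc rule; with on_trip set, the fallback arrives as one extra, final item.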
This adapter is for graphs deployed to LangGraph Platform and driven through the remote
langgraph-sdk client. (For an in-process langgraph graph, use the
LangChain adapter: a compiled graph is a Runnable and
propagates callbacks.)
from langgraph_sdk import get_client
from monetise_circuit_breaker.langgraph_sdk import with_circuit_breaker
client = get_client(url="http://localhost:2024")
runs = with_circuit_breaker(
    client.runs,
    max_input_token=50_000,
    max_output_token=20_000,
)
async for chunk in runs.stream(
    thread_id,
    "agent",
    input={"messages": [{"role": "user", "content": "Analyze this repo"}]},
    stream_mode="updates",
):
    ...  # chunks stream through untouched

The wrapper takes client.runs and returns an object with the same
stream(thread_id, assistant_id, **payload) signature.
Because the graph executes server-side, the breaker is driven off the events
stream mode — the only mode that reports both per-LLM-call boundaries and token
usage. The wrapper forces events into the run's stream_mode; if you
didn't request it, those injected chunks are consumed internally and never
yielded, so your stream is unchanged. Iterations are counted on each
on_chat_model_start; tokens are read from each on_chat_model_end's
usage_metadata. For loop-killer, the latest input message is hashed.
On a trip the wrapper stops the local stream and calls
client.runs.cancel(...) to stop the run server-side (the run id is taken from
the metadata event) — closing the connection alone would leave the graph
running. estimate_input_tokens receives
{"thread_id", "assistant_id", "payload"}.
With on_trip, the callback's return value is yielded as the generator's final
item instead of raising.
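The events-mode injection and filtering can be sketched with plain tuples. This is a simplified model, not the SDK's types: chunks are modelled as hypothetical (mode, data) pairs, the event dict layout (an "event" name plus data.output.usage_metadata on on_chat_model_end) is an assumed shape, and the sketch does not perform the server-side cancel call. inject_events_mode, Meter and visible_chunks are illustrative names.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union

Chunk = Tuple[str, Dict[str, Any]]  # (stream mode, data): simplified stand-in


def inject_events_mode(stream_mode: Union[str, List[str]]) -> Tuple[List[str], bool]:
    """Ensure "events" is requested; report whether the caller asked for it."""
    modes = [stream_mode] if isinstance(stream_mode, str) else list(stream_mode)
    requested = "events" in modes
    if not requested:
        modes.append("events")
    return modes, requested


class Meter:
    """Counts LLM calls and token usage from "events" chunks."""

    def __init__(self) -> None:
        self.iterations = 0
        self.input_tokens = 0
        self.output_tokens = 0

    def observe(self, data: Dict[str, Any]) -> None:
        kind = data.get("event")
        if kind == "on_chat_model_start":
            self.iterations += 1
        elif kind == "on_chat_model_end":
            output = data.get("data", {}).get("output") or {}
            usage = output.get("usage_metadata") or {}
            self.input_tokens += usage.get("input_tokens", 0)
            self.output_tokens += usage.get("output_tokens", 0)


def visible_chunks(chunks: Iterable[Chunk], requested: bool, meter: Meter) -> Iterator[Chunk]:
    """Feed "events" chunks to the meter; hide them unless the caller asked."""
    for mode, data in chunks:
        if mode == "events":
            meter.observe(data)
            if not requested:
                continue  # injected: consumed internally, never yielded
        yield mode, data
```

A caller who asked only for "updates" still sees the metadata and updates chunks, while the meter records one LLM call and its token counts from the hidden events chunks.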
When a limit is reached the breaker logs (via the standard logging module
under the monetise_circuit_breaker logger) and raises:
[circuit-breaker] Agent stopped: input token budget exceeded (10120/10000; iterations: 8).
Pass silent=True to suppress the log, or logger=lambda msg, ctx: ... to
send it elsewhere.
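A custom logger only needs to accept (message, context). The sketch below emits one JSON line per trip through an application logger; json_trip_logger, format_trip and the logger name "my_app.circuit_breaker" are hypothetical, and it assumes the context exposes TripContext-style attributes, reading them with getattr so a missing field becomes null rather than an error.

```python
import json
import logging
from typing import Any

app_log = logging.getLogger("my_app.circuit_breaker")  # hypothetical logger name


def format_trip(message: str, ctx: Any) -> str:
    """Render a trip as one JSON line; ctx fields are read defensively."""
    record = {
        "message": message,
        "reason": getattr(ctx, "reason", None),
        "mode": getattr(ctx, "mode", None),
        "saved": getattr(ctx, "saved", None),
    }
    return json.dumps(record, sort_keys=True)


def json_trip_logger(message: str, ctx: Any) -> None:
    app_log.warning(format_trip(message, ctx))


# Hypothetical wiring: with_circuit_breaker(agent, logger=json_trip_logger)
```

If you only want to redirect or filter the default output, ordinary logging configuration on the monetise_circuit_breaker logger (handlers, levels) is an alternative to a custom callable.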
| Field | Mode | Type | Default | Description |
|---|---|---|---|---|
| mode | both | Mode | "budget-guard" | "budget-guard" or "loop-killer". |
| max_input_token | budget-guard | int ≥ 1 | 10_000 | Max aggregate input tokens before trip (post-hoc). |
| max_output_token | budget-guard | int ≥ 1 | 10_000 | Max aggregate output tokens before trip (post-hoc). |
| estimate_input_tokens | budget-guard | (input) -> int \| None | — | Preflight estimator; trips before the call when the estimate exceeds max_input_token. |
| max_retries | loop-killer | int ≥ 1 | 3 | Max times the same state may recur (or, with detection off, raw iterations). |
| detect_repeated_state | loop-killer | bool | True | Hash each step's state for loop detection. |
| silent | both | bool | False | Suppress the default trip log. |
| logger | both | (message, context) -> None | default logger | Custom trip logger. Ignored when silent=True. |
| on_event | both | EventListener | — | Receives CircuitBreakerEvent updates. |
| on_trip | wrappers | OnTrip (sync or async) | — | Suppress the raise and use the callback's return value instead. |
All numeric options are validated at construction. A wrong type raises
TypeError (any non-integer, such as 1.5, float("nan"), or float("inf")); a
value of the right type but out of range raises ValueError (0 or a negative integer).
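The two validation rules can be sketched as a single check per option. validate_limit is a hypothetical helper, not the package's validator; rejecting bool (which is a subclass of int in Python) is this sketch's own choice, since the text does not say how the library treats it.

```python
from numbers import Integral
from typing import Any


def validate_limit(name: str, value: Any) -> int:
    """Sketch of the documented rules: non-integers -> TypeError, < 1 -> ValueError."""
    # bool subclasses int in Python; rejecting it is this sketch's own choice.
    if isinstance(value, bool) or not isinstance(value, Integral):
        raise TypeError(f"{name} must be an int, got {value!r}")
    if value < 1:
        raise ValueError(f"{name} must be >= 1, got {value}")
    return int(value)
```

float("nan") and float("inf") are floats, so they fail the integer check and raise TypeError before any range comparison is attempted.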
The core is framework-agnostic — use CircuitBreaker directly for any
framework not shipped here:
from monetise_circuit_breaker import CircuitBreaker, CircuitBreakerError
breaker = CircuitBreaker(max_input_token=50_000, max_output_token=20_000)
# on each new LLM call / agent turn:
breaker.record_iteration(state_key) # state_key summarises the step (loop-killer)
# on per-call usage:
breaker.add_tokens(input_delta, output_delta)
# or, when the framework exposes running totals:
breaker.set_token_snapshot(total_input, total_output)

See AGENTS.md for the full adapter recipe.
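Choosing between add_tokens and set_token_snapshot matters when a framework reports running totals: feeding cumulative numbers to an accumulating method double-counts. ToyTokenLedger below is a hypothetical stand-in that reuses the two method names from the snippet above; the "replace, never add" behaviour of set_token_snapshot is inferred from its name and the comment, not confirmed from the library's source. to_deltas is an illustrative helper.

```python
from typing import Iterable, Iterator, Tuple


class ToyTokenLedger:
    """Hypothetical stand-in showing add_tokens vs set_token_snapshot semantics."""

    def __init__(self) -> None:
        self.input_tokens = 0
        self.output_tokens = 0

    def add_tokens(self, input_delta: int, output_delta: int) -> None:
        # Per-call usage: accumulate.
        self.input_tokens += input_delta
        self.output_tokens += output_delta

    def set_token_snapshot(self, total_input: int, total_output: int) -> None:
        # Running totals reported by the framework: replace, never add.
        self.input_tokens = total_input
        self.output_tokens = total_output


def to_deltas(totals: Iterable[Tuple[int, int]]) -> Iterator[Tuple[int, int]]:
    """Convert running totals into per-turn deltas (if you must use add_tokens)."""
    prev_in = prev_out = 0
    for total_in, total_out in totals:
        yield total_in - prev_in, total_out - prev_out
        prev_in, prev_out = total_in, total_out
```

For running totals of (1,000, 200), (2,500, 450), (4,000, 700), snapshots end at (4,000, 700), while passing the same totals to add_tokens would report (7,500, 1,350).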
We built Circuit Breaker to solve the immediate, visceral pain of runaway agent
costs and infinite loops. The API is intentionally minimal — budget-guard and
loop-killer — and the roadmap is driven by how you use (or fight) the tool in
the wild.
We especially want to hear from you if it almost fits, if you're building workarounds, or if your use case diverges from the defaults. Open an issue or share a snippet — your edge cases are our roadmap.
See AGENTS.md for the project layout, test commands, and the
recipe for adding a new framework adapter.
Apache-2.0 — © 2026 MonetiseBG