Skip to content

MonetiseBG/circuit-breaker-python

Repository files navigation

monetise-circuit-breaker

CI

One wrapper between you and runaway execution.

Minimal circuit breaker for AI agents. Wrap any supported agent and pick a mode — the breaker cuts the run short once provider-reported usage crosses a limit, and (optionally) refuses an oversized prompt before it is even sent.

This is the Python port of @monetisebg/circuit-breaker (the TypeScript/npm package).

  • Zero-config: defaults work out of the box.
  • Two modes, pick one: budget-guard (token caps) and loop-killer (state-repeat detection).
  • Post-hoc enforcement by default: token tripping happens after each call or turn boundary, so the call that crosses the limit still counts. Use the optional estimate_input_tokens preflight (see below) to reject oversized initial inputs before any provider work happens.
  • Visible: emits CircuitBreakerEvents as the run progresses.
  • Typed: raises a CircuitBreakerError, or routes through your on_trip handler.
  • Optional dependencies — only install the framework you actually use.
  • No bundled tokenizer: bring your own (tiktoken, transformers, provider SDK).

Shipped adapters: LangChain, OpenAI Agents SDK, Claude Agent SDK, LangGraph Platform SDK. The core is framework-agnostic; rolling your own adapter is a few lines.

Install

Requires Python ≥ 3.10.

pip install monetise-circuit-breaker
# plus the framework you use (only the one you need):
pip install "monetise-circuit-breaker[langchain]"
pip install "monetise-circuit-breaker[openai-agents]"
pip install "monetise-circuit-breaker[claude-agent-sdk]"
pip install "monetise-circuit-breaker[langgraph-sdk]"

Quick start (budget-guard, the default)

from monetise_circuit_breaker.openai_agents import with_circuit_breaker

safe_agent = with_circuit_breaker(agent)  # defaults: 10k input + 10k output

await safe_agent.run("Analyze this dataset")

budget-guard caps input and output tokens independently. Default limits: max_input_token = 10_000, max_output_token = 10_000. Token usage is read from each provider response, so the breaker trips on the next call/turn after either bucket is exceeded — the call that pushed the bucket over the limit still counts. To reject an oversized first prompt before it is sent, pass an optional estimate_input_tokens preflight (next section).

with_circuit_breaker(
    agent,
    mode="budget-guard",      # optional — this is the default
    max_input_token=50_000,
    max_output_token=20_000,
)

Preflight — estimate_input_tokens

import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")

with_circuit_breaker(
    agent,
    max_input_token=50_000,
    # input is the wrapper's call argument (per adapter)
    estimate_input_tokens=lambda input: (
        len(enc.encode(input)) if isinstance(input, str) else None
    ),
)

If the estimate exceeds max_input_token the wrapper raises CircuitBreakerError with reason == "max_input_tokens" before the underlying runnable / runner / query is called. Return None to skip the check for that invocation (e.g. when you can't tokenize the input shape). This is opt-in — without an estimator the wrapper behaves as before. No tokenizer is bundled.

loop-killer mode

with_circuit_breaker(
    agent,
    mode="loop-killer",
    max_retries=3,               # default
    detect_repeated_state=True,  # default — hashes each step's state
)

With detect_repeated_state=True (default), the breaker hashes each step's state (the latest message / turn input) and trips when any single state recurs more than max_retries times. Set detect_repeated_state=False to fall back to a plain iteration cap.

Visibility — on_event

The breaker emits events you can log, surface in your UI, or pipe to your observability stack.

def handle(event):  # event: CircuitBreakerEvent
    print(event)

with_circuit_breaker(agent, mode="loop-killer", max_retries=2, on_event=handle)

CircuitBreakerEvent shapes (frozen dataclasses):

Event When Modes
RetryEvent(type="retry", retries: int) A state recurred (detect_repeated_state=True) or each iteration past the first (detect_repeated_state=False) loop-killer
StopEvent(type="stop", reason: StopReason, saved: int) The breaker tripped both

saved is signed limit - usage: positive means headroom that won't be spent, negative means the call that pushed us over the limit still counted.

StopReason is one of "max_input_tokens", "max_output_tokens", "max_retries", "repeated_state".

Graceful handling — on_trip

Provide on_trip to suppress the raise and return a fallback value:

safe = with_circuit_breaker(
    agent,
    max_input_token=50_000,
    max_output_token=20_000,
    on_trip=lambda ctx: {
        "output": "Sorry, I had to stop early.",
        "reason": ctx.reason,
        "metrics": ctx.metrics,
    },
)

on_trip receives a TripContext (frozen dataclass) and may be sync or async:

@dataclass(frozen=True)
class TripContext:
    reason: StopReason
    mode: Mode                  # "budget-guard" | "loop-killer"
    metrics: Metrics            # iterations, retries, tokens
    limits: ResolvedLimits      # the limits actually in force
    saved: int
    message: str

LangChain

from monetise_circuit_breaker.langchain import with_circuit_breaker

safe_executor = with_circuit_breaker(
    executor,                   # any LangChain Runnable (e.g. AgentExecutor)
    max_input_token=50_000,
    max_output_token=20_000,
)

await safe_executor.ainvoke({"input": "..."})   # or safe_executor.invoke(...)

Iterations are counted on on_llm_start / on_chat_model_start. Token usage is read from on_llm_end with provider-agnostic extraction (OpenAI token_usage, Anthropic usage, the newer usage_metadata). The handler sets raise_error = True, so a trip propagates out of LangChain's callback dispatch instead of being swallowed.

You can also attach the handler directly:

from monetise_circuit_breaker.langchain import CircuitBreakerCallback

breaker = CircuitBreakerCallback(max_input_token=50_000)
await runnable.ainvoke(payload, config={"callbacks": [breaker]})

OpenAI Agents SDK

from agents import Agent
from monetise_circuit_breaker.openai_agents import with_circuit_breaker

agent = Agent(name="Assistant", instructions="...", tools=[...])

safe_agent = with_circuit_breaker(agent, mode="loop-killer", max_retries=3)

await safe_agent.run("Hello")

Iterations are counted on each LLM call (one per turn); the most recent input item is hashed for loop detection. Tokens are read from the cumulative RunContext.usage snapshot at each turn boundary. Because the SDK awaits its lifecycle hooks, a trip raises CircuitBreakerError straight out of Runner.run — no AbortSignal plumbing needed. Extra keyword arguments to run(...) are forwarded to Runner.run; a caller-supplied hooks object is composed with the breaker's, and run_config (passed at wrap time or per call) is forwarded too.

Streaming is not yet supported. Use the core CircuitBreaker directly if you need it.

Claude Agent SDK

from claude_agent_sdk import query
from monetise_circuit_breaker.claude_agent_sdk import with_circuit_breaker

safe_query = with_circuit_breaker(
    query,
    max_input_token=50_000,
    max_output_token=20_000,
)

async for message in safe_query(prompt="Analyze this repo"):
    ...  # messages stream through untouched

The wrapper takes the SDK's query function and returns a drop-in replacement with the same call signature. It's itself an async generator — messages stream through unchanged while the breaker watches them.

Iterations are counted on each AssistantMessage (one per turn); its content blocks are hashed for loop-killer detection. Tokens are read from each assistant message's usage (input counts input_tokens plus cache read/creation tokens). When a limit is hit, iteration stops and the underlying generator is closed. estimate_input_tokens receives the prompt.

With on_trip, the callback's return value is yielded as the generator's final item instead of raising.

LangGraph Platform SDK

For graphs deployed to LangGraph Platform and driven through the remote langgraph-sdk client. (For an in-process langgraph graph, use the LangChain adapter — a compiled graph is a Runnable and propagates callbacks.)

from langgraph_sdk import get_client
from monetise_circuit_breaker.langgraph_sdk import with_circuit_breaker

client = get_client(url="http://localhost:2024")
runs = with_circuit_breaker(
    client.runs,
    max_input_token=50_000,
    max_output_token=20_000,
)

async for chunk in runs.stream(
    thread_id,
    "agent",
    input={"messages": [{"role": "user", "content": "Analyze this repo"}]},
    stream_mode="updates",
):
    ...  # chunks stream through untouched

The wrapper takes client.runs and returns an object with the same stream(thread_id, assistant_id, **payload) signature.

Because the graph executes server-side, the breaker is driven off the events stream mode — the only mode that reports both per-LLM-call boundaries and token usage. The wrapper forces events into the run's stream_mode; if you didn't request it, those injected chunks are consumed internally and never yielded, so your stream is unchanged. Iterations are counted on each on_chat_model_start; tokens are read from each on_chat_model_end's usage_metadata. For loop-killer, the latest input message is hashed.

On a trip the wrapper stops the local stream and calls client.runs.cancel(...) to stop the run server-side (the run id is taken from the metadata event) — closing the connection alone would leave the graph running. estimate_input_tokens receives {"thread_id", "assistant_id", "payload"}.

With on_trip, the callback's return value is yielded as the generator's final item instead of raising.

Trip output

When a limit is reached the breaker logs (via the standard logging module under the monetise_circuit_breaker logger) and raises:

[circuit-breaker] Agent stopped: input token budget exceeded (10120/10000; iterations: 8).

Pass silent=True to suppress the log, or logger=lambda msg, ctx: ... to send it elsewhere.

Options reference

Field Mode Type Default Description
mode both Mode "budget-guard" "budget-guard" or "loop-killer".
max_input_token budget-guard int ≥ 1 10_000 Max aggregate input tokens before trip (post-hoc).
max_output_token budget-guard int ≥ 1 10_000 Max aggregate output tokens before trip (post-hoc).
estimate_input_tokens budget-guard (input) -> int | None Preflight estimator; trips before the call when the estimate exceeds max_input_token.
max_retries loop-killer int ≥ 1 3 Max times the same state may recur (or, with detection off, raw iterations).
detect_repeated_state loop-killer bool True Hash each step's state for loop detection.
silent both bool False Suppress the default trip log.
logger both (message, context) -> None default logger Custom trip logger. Ignored when silent=True.
on_event both EventListener Receives CircuitBreakerEvent updates.
on_trip wrappers OnTrip (sync or async) Suppress the raise and use the callback's return value instead.

All numeric options are validated at construction. A wrong type raises TypeError (a non-integer such as 1.5, NaN, or Infinity); a wrong value of the right type raises ValueError (0 or a negative integer).

Rolling your own adapter

The core is framework-agnostic — use CircuitBreaker directly for any framework not shipped here:

from monetise_circuit_breaker import CircuitBreaker, CircuitBreakerError

breaker = CircuitBreaker(max_input_token=50_000, max_output_token=20_000)

# on each new LLM call / agent turn:
breaker.record_iteration(state_key)        # state_key summarises the step (loop-killer)
# on per-call usage:
breaker.add_tokens(input_delta, output_delta)
# or, when the framework exposes running totals:
breaker.set_token_snapshot(total_input, total_output)

See AGENTS.md for the full adapter recipe.

Contributing

We built Circuit Breaker to solve the immediate, visceral pain of runaway agent costs and infinite loops. The API is intentionally minimal — budget-guard and loop-killer — and the roadmap is driven by how you use (or fight) the tool in the wild.

We especially want to hear from you if it almost fits, if you're building workarounds, or if your use case diverges from the defaults. Open an issue or share a snippet — your edge cases are our roadmap.

See AGENTS.md for the project layout, test commands, and the recipe for adding a new framework adapter.

License

Apache-2.0 — © 2026 MonetiseBG

About

Runtime economic governance for AI agents. Circuit Breaker adds risk-managed execution, cost ceilings, and kill switches to autonomous AI workflows.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages