Skip to content

Commit 72361d4

Browse files
feat: add circuit breaker observability methods (#1176) (#118)
* feat: add circuit breaker observability methods (#1176)
  New SDK methods: get_circuit_breaker_status, get_circuit_breaker_history, get_circuit_breaker_config, update_circuit_breaker_config.
* fix: add X-Org-ID header + handle null list responses
  Circuit breaker endpoints require the X-Org-ID header. The API returns null for empty arrays, which Pydantic rejects; use an `or []` fallback.
* revert: remove global X-Org-ID, fix on platform side instead
  The circuit breaker handler will fall back to X-Tenant-ID when X-Org-ID is missing. Keep the null list fix.
* chore: add v4.2.0 changelog entry
* style: apply ruff formatting
* fix: satisfy mypy no-any-return for update_circuit_breaker_config
1 parent 83e728f commit 72361d4

5 files changed

Lines changed: 858 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ All notable changes to the AxonFlow Python SDK will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [4.2.0] - 2026-03-16
9+
10+
### Added
11+
12+
- `get_circuit_breaker_status()` — query active circuit breaker circuits and emergency stop state
13+
- `get_circuit_breaker_history(limit)` — retrieve circuit breaker trip/reset audit trail
14+
- `get_circuit_breaker_config(tenant_id)` — get effective circuit breaker config (global or tenant-specific)
15+
- `update_circuit_breaker_config(config)` — update per-tenant circuit breaker thresholds
16+
17+
---
18+
819
## [4.1.0] - 2026-03-14
920

1021
### Added

axonflow/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@
152152
BudgetStatus,
153153
CacheConfig,
154154
CancelPlanResponse,
155+
CircuitBreakerConfig,
156+
CircuitBreakerConfigUpdate,
157+
CircuitBreakerHistoryEntry,
158+
CircuitBreakerHistoryResponse,
159+
CircuitBreakerStatusResponse,
155160
ClientRequest,
156161
ClientResponse,
157162
CodeArtifact,
@@ -312,6 +317,12 @@
312317
# Audit Tool Call types (Issue #1260)
313318
"AuditToolCallRequest",
314319
"AuditToolCallResponse",
320+
# Circuit Breaker Observability types (Issue #1176)
321+
"CircuitBreakerStatusResponse",
322+
"CircuitBreakerHistoryEntry",
323+
"CircuitBreakerHistoryResponse",
324+
"CircuitBreakerConfig",
325+
"CircuitBreakerConfigUpdate",
315326
# Execution Replay types
316327
"ExecutionSummary",
317328
"ExecutionSnapshot",

axonflow/client.py

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@
140140
BudgetStatus,
141141
CacheConfig,
142142
CancelPlanResponse,
143+
CircuitBreakerConfig,
144+
CircuitBreakerConfigUpdate,
145+
CircuitBreakerHistoryEntry,
146+
CircuitBreakerHistoryResponse,
147+
CircuitBreakerStatusResponse,
143148
ClientRequest,
144149
ClientResponse,
145150
ConnectorHealthStatus,
@@ -1850,6 +1855,171 @@ async def audit_tool_call(
18501855
timestamp=response["timestamp"],
18511856
)
18521857

1858+
# =========================================================================
1859+
# Circuit Breaker Observability Methods
1860+
# =========================================================================
1861+
1862+
async def get_circuit_breaker_status(self) -> CircuitBreakerStatusResponse:
    """Fetch the current circuit breaker status.

    Issues ``GET /api/v1/circuit-breaker/status`` and maps the payload onto
    a ``CircuitBreakerStatusResponse``.

    Returns:
        CircuitBreakerStatusResponse carrying the active circuits, their
        count, and the emergency-stop flag.

    Raises:
        AxonFlowError: If the request fails.

    Example:
        >>> status = await client.get_circuit_breaker_status()
        >>> print(f"{status.count} active circuits")
        >>> if status.emergency_stop_active:
        ...     print("Emergency stop is active!")
    """
    if self._config.debug:
        self._logger.debug("Getting circuit breaker status")

    raw = await self._request("GET", "/api/v1/circuit-breaker/status")
    # Read from the "data" envelope when present, otherwise from the
    # top-level response object.
    payload = raw.get("data", raw)

    # A null/empty list from the API is normalised to an empty list.
    circuits = payload.get("active_circuits") or []
    total = payload.get("count", 0)
    stop_active = payload.get("emergency_stop_active", False)

    return CircuitBreakerStatusResponse(
        active_circuits=circuits,
        count=total,
        emergency_stop_active=stop_active,
    )
1891+
1892+
async def get_circuit_breaker_history(
    self,
    limit: int | None = None,
) -> CircuitBreakerHistoryResponse:
    """Fetch the circuit breaker audit history.

    Issues ``GET /api/v1/circuit-breaker/history`` (with ``?limit=<n>``
    appended when ``limit`` is given) and converts every returned entry
    into a ``CircuitBreakerHistoryEntry``.

    Args:
        limit: Maximum number of history entries to return. When ``None``
            no ``limit`` query parameter is sent.

    Returns:
        CircuitBreakerHistoryResponse with the parsed history entries and
        the reported count.

    Raises:
        AxonFlowError: If the request fails.

    Example:
        >>> history = await client.get_circuit_breaker_history(limit=50)
        >>> for entry in history.history:
        ...     print(f"{entry.scope}/{entry.scope_id}: {entry.state}")
    """
    if self._config.debug:
        self._logger.debug(
            "Getting circuit breaker history",
            limit=limit,
        )

    base = "/api/v1/circuit-breaker/history"
    path = base if limit is None else f"{base}?limit={limit}"

    raw = await self._request("GET", path)
    # Read from the "data" envelope when present, otherwise from the
    # top-level response object.
    payload = raw.get("data", raw)

    # A null/empty "history" value is treated as no entries.
    entries: list[CircuitBreakerHistoryEntry] = []
    for item in payload.get("history") or []:
        entries.append(CircuitBreakerHistoryEntry(**item))

    return CircuitBreakerHistoryResponse(
        history=entries,
        count=payload.get("count", 0),
    )
1934+
1935+
async def get_circuit_breaker_config(
    self,
    tenant_id: str | None = None,
) -> CircuitBreakerConfig:
    """Get circuit breaker configuration (global or tenant-specific).

    Issues ``GET /api/v1/circuit-breaker/config``. When ``tenant_id`` is
    given it is sent as a percent-encoded ``tenant_id`` query parameter.

    Args:
        tenant_id: If provided, returns tenant-specific config with
            any overrides applied. Otherwise returns global defaults.

    Returns:
        CircuitBreakerConfig with thresholds and recovery settings.

    Raises:
        AxonFlowError: If the request fails.

    Example:
        >>> config = await client.get_circuit_breaker_config()
        >>> print(f"Error threshold: {config.error_threshold}")
        >>> tenant_config = await client.get_circuit_breaker_config(
        ...     tenant_id="tenant-123"
        ... )
    """
    # Function-scope import: only this method needs it.
    from urllib.parse import urlencode

    if self._config.debug:
        self._logger.debug(
            "Getting circuit breaker config",
            tenant_id=tenant_id,
        )

    path = "/api/v1/circuit-breaker/config"
    if tenant_id is not None:
        # Percent-encode the value so characters such as "&", "#", "+",
        # "?" or spaces cannot corrupt the query string or smuggle in
        # extra parameters. Plain IDs (e.g. "tenant-123") are unchanged.
        path = f"{path}?{urlencode({'tenant_id': tenant_id})}"

    response = await self._request("GET", path)
    # Read from the "data" envelope when present, otherwise from the
    # top-level response object.
    data = response.get("data", response)

    return CircuitBreakerConfig(**data)
1972+
1973+
async def update_circuit_breaker_config(
    self,
    config: CircuitBreakerConfigUpdate,
) -> dict[str, Any]:
    """Apply per-tenant circuit breaker overrides.

    Serialises ``config`` (omitting unset/``None`` fields) and sends it via
    ``PUT /api/v1/circuit-breaker/config``.

    Args:
        config: Configuration update with tenant_id and override values.

    Returns:
        Server response confirming the update (the ``data`` envelope when
        present, otherwise the whole response).

    Raises:
        ValueError: If tenant_id is empty or whitespace-only.
        AxonFlowError: If the request fails.

    Example:
        >>> from axonflow.types import CircuitBreakerConfigUpdate
        >>> result = await client.update_circuit_breaker_config(
        ...     CircuitBreakerConfigUpdate(
        ...         tenant_id="tenant-123",
        ...         error_threshold=10,
        ...         violation_threshold=5,
        ...     )
        ... )
    """
    tenant = config.tenant_id
    # Reject before any network traffic when there is no usable tenant id.
    if not (tenant and tenant.strip()):
        error_text = "tenant_id is required and cannot be empty"
        raise ValueError(error_text)

    if self._config.debug:
        self._logger.debug(
            "Updating circuit breaker config",
            tenant_id=tenant,
        )

    body = config.model_dump(by_alias=True, exclude_none=True)
    raw = await self._request(
        "PUT",
        "/api/v1/circuit-breaker/config",
        json_data=body,
    )

    # The annotated local keeps mypy's no-any-return check satisfied.
    outcome: dict[str, Any] = raw.get("data", raw)
    return outcome
2022+
18532023
# =========================================================================
18542024
# Audit Log Read Methods
18552025
# =========================================================================
@@ -6271,6 +6441,33 @@ def audit_tool_call(
62716441
"""Record a non-LLM tool call in the audit trail."""
62726442
return self._run_sync(self._async_client.audit_tool_call(request))
62736443

6444+
# Circuit Breaker Observability sync wrappers
6445+
6446+
def get_circuit_breaker_status(self) -> CircuitBreakerStatusResponse:
    """Synchronous wrapper around the async ``get_circuit_breaker_status``.

    Returns:
        The CircuitBreakerStatusResponse produced by the async client.
    """
    pending = self._async_client.get_circuit_breaker_status()
    return self._run_sync(pending)
6449+
6450+
def get_circuit_breaker_history(
    self,
    limit: int | None = None,
) -> CircuitBreakerHistoryResponse:
    """Synchronous wrapper around the async ``get_circuit_breaker_history``.

    Args:
        limit: Forwarded unchanged to the async client.

    Returns:
        The CircuitBreakerHistoryResponse produced by the async client.
    """
    pending = self._async_client.get_circuit_breaker_history(limit=limit)
    return self._run_sync(pending)
6456+
6457+
def get_circuit_breaker_config(
    self,
    tenant_id: str | None = None,
) -> CircuitBreakerConfig:
    """Synchronous wrapper around the async ``get_circuit_breaker_config``.

    Args:
        tenant_id: Forwarded unchanged to the async client.

    Returns:
        The CircuitBreakerConfig produced by the async client.
    """
    pending = self._async_client.get_circuit_breaker_config(tenant_id=tenant_id)
    return self._run_sync(pending)
6463+
6464+
def update_circuit_breaker_config(
    self,
    config: CircuitBreakerConfigUpdate,
) -> dict[str, Any]:
    """Synchronous wrapper around the async ``update_circuit_breaker_config``.

    Args:
        config: Forwarded unchanged to the async client.

    Returns:
        The dict returned by the async client.
    """
    pending = self._async_client.update_circuit_breaker_config(config)
    return self._run_sync(pending)
6470+
62746471
# Policy CRUD sync wrappers
62756472

62766473
def list_static_policies(

axonflow/types.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,3 +1168,85 @@ class AuditToolCallResponse(BaseModel):
11681168
audit_id: str = Field(description="Unique ID for the audit entry")
11691169
status: str = Field(description="Recording status (e.g., recorded)")
11701170
timestamp: str = Field(description="Timestamp when the audit entry was recorded")
1171+
1172+
1173+
# =========================================================================
1174+
# Circuit Breaker Observability Types
1175+
# =========================================================================
1176+
1177+
1178+
class CircuitBreakerStatusResponse(BaseModel):
    """Response from circuit breaker status endpoint."""

    model_config = ConfigDict(populate_by_name=True)

    # Optional: falls back to a fresh empty list when not supplied.
    active_circuits: list[dict[str, Any]] = Field(
        description="List of active (open) circuits",
        default_factory=list,
    )
    # Required: no default is declared for the two fields below.
    count: int = Field(
        description="Number of active circuits",
    )
    emergency_stop_active: bool = Field(
        description="Whether any circuit is open",
    )
1188+
1189+
1190+
class CircuitBreakerHistoryEntry(BaseModel):
    """A single circuit breaker history entry."""

    model_config = ConfigDict(populate_by_name=True)

    # --- Identity and state -------------------------------------------------
    id: str = Field(
        description="Circuit ID",
    )
    org_id: str = Field(
        description="Organization ID",
    )
    scope: str = Field(
        description="Circuit scope (global, tenant, client, policy)",
    )
    scope_id: str = Field(
        description="Scope identifier",
        default="",
    )
    state: str = Field(
        description="Circuit state (closed, open, half_open)",
    )

    # --- Trip details (all optional, default None) ---------------------------
    trip_reason: str | None = Field(
        description="Why the circuit was tripped",
        default=None,
    )
    tripped_by: str | None = Field(
        description="Who/what tripped the circuit",
        default=None,
    )
    tripped_at: str | None = Field(
        description="When the circuit was tripped",
        default=None,
    )
    expires_at: str | None = Field(
        description="When the circuit will auto-reset",
        default=None,
    )

    # --- Reset details (optional, default None) ------------------------------
    reset_by: str | None = Field(
        description="Who reset the circuit",
        default=None,
    )
    reset_at: str | None = Field(
        description="When the circuit was reset",
        default=None,
    )

    # --- Counters (default 0) ------------------------------------------------
    error_count: int = Field(
        description="Number of errors in current window",
        default=0,
    )
    violation_count: int = Field(
        description="Number of violations in current window",
        default=0,
    )
1208+
1209+
1210+
class CircuitBreakerHistoryResponse(BaseModel):
    """Response from circuit breaker history endpoint."""

    model_config = ConfigDict(populate_by_name=True)

    # Optional: falls back to a fresh empty list when not supplied.
    history: list[CircuitBreakerHistoryEntry] = Field(
        description="Circuit history entries",
        default_factory=list,
    )
    # Required: no default is declared.
    count: int = Field(
        description="Number of history entries",
    )
1219+
1220+
1221+
class CircuitBreakerConfig(BaseModel):
    """Circuit breaker configuration (effective for a tenant or global)."""

    model_config = ConfigDict(populate_by_name=True)

    # --- Required settings (no defaults declared) ----------------------------
    source: str = Field(
        description="Config source: 'global' or 'tenant'",
    )
    error_threshold: int = Field(
        description="Error threshold for auto-trip",
    )
    violation_threshold: int = Field(
        description="Policy violation threshold",
    )
    window_seconds: int = Field(
        description="Sliding window duration in seconds",
    )
    default_timeout_seconds: int = Field(
        description="Default circuit open timeout in seconds",
    )
    max_timeout_seconds: int = Field(
        description="Maximum allowed timeout in seconds",
    )
    enable_auto_recovery: bool = Field(
        description="Whether auto-recovery is enabled",
    )

    # --- Tenant-specific extras (optional, default None) ---------------------
    tenant_id: str | None = Field(
        description="Tenant ID if tenant-specific",
        default=None,
    )
    overrides: dict[str, Any] | None = Field(
        description="Tenant-specific overrides",
        default=None,
    )
1235+
1236+
1237+
class CircuitBreakerConfigUpdate(BaseModel):
    """Request to update per-tenant circuit breaker config."""

    model_config = ConfigDict(populate_by_name=True)

    # Required: identifies the tenant whose overrides are being set.
    tenant_id: str = Field(
        description="Tenant ID to configure",
    )

    # Every override below is optional and defaults to None (unset).
    error_threshold: int | None = Field(
        description="Override error threshold",
        default=None,
    )
    violation_threshold: int | None = Field(
        description="Override violation threshold",
        default=None,
    )
    window_seconds: int | None = Field(
        description="Override window duration",
        default=None,
    )
    default_timeout_seconds: int | None = Field(
        description="Override default timeout",
        default=None,
    )
    max_timeout_seconds: int | None = Field(
        description="Override max timeout",
        default=None,
    )
    enable_auto_recovery: bool | None = Field(
        description="Override auto-recovery",
        default=None,
    )

0 commit comments

Comments
 (0)