From 29b7203ae0f5f7cb702809ebc22b744541f32274 Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 19:25:26 +0000 Subject: [PATCH 01/11] docs(net): CircuitBreaker layer design spec (Slice 1, PR 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Design for the net-http CircuitBreaker resilience layer (ADR-0031 §5): a pure, clock-injected Breaker state machine (Closed/Open/Half-Open) behind a thin Arc> + Timer Service shell; trips on consecutive Connection/Timeout/5xx failures (or immediately on Throttled/429 with the long throttle_cooldown), fast-rejects with a new non-retryable HttpError::CircuitOpen / ErrorKind::CircuitOpen, lazy Half-Open probing with now()-only timing (no sleep, no futures-util, no new dependency). Adds a 4-class outcome partition (Failure / TripNow / Ignored / Success) so 4xx/Auth and unclassified errors neither trip nor mask a building outage; Unknown -> Ignored for v1 with the resilience4j fail-safe recorded as a future improvement. To be recorded as ADR-0034 amendment #9. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...04-net-http-circuitbreaker-layer-design.md | 407 ++++++++++++++++++ 1 file changed, 407 insertions(+) create mode 100644 docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md diff --git a/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md b/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md new file mode 100644 index 0000000..5da4f9c --- /dev/null +++ b/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md @@ -0,0 +1,407 @@ +# net-http `CircuitBreaker` layer — design (Slice 1, PR 4) + +## Context + +Slice 0 landed the net-http **construction surface** (transport contract ADR-0030; +`AuthSource`/`Auth`/`Guarded` in #66; the boot-time pacing config `RateKey`/ +`RateLimitConfig`/`validate_coverage` in #72). Slice 1 implements the resilience +*layers* of [ADR-0031](../../adr/0031-http-resilience-venue-pacing.md) — `Timeout`, +`RateLimit`, `Retry`, `CircuitBreaker`, `Tracing` — each a standalone, composable +`Service` generic over [`net-api::Timer`](../../adr/0029-network-adapter-stack-transport-split-compile-time-composition.md), +tested over inline service doubles + `MockTimer`. Assembly (`stack()`/`build()`) is +Slice 2. + +**PRs 1–3 landed `RateLimit`, `Timeout`, and `Retry`** (#76: `RateLimit` + +the `RateScope`/`Scope` directive, token-bucket + concurrency acquire, `Guarded` permit +lifetime; #78: `Timeout` + `TimeoutLayer`, the response-future race; the +`Retry` branch: `Retry` + `RetryLayer`, the `Retryable` marker, order-safe +retry with capped-exponential full-jitter backoff). This spec covers **PR 4: +`CircuitBreaker`** — the **reactive** backstop to `RateLimit`'s proactive guard. It +reuses every seam PRs 1–3 established: the `Layer`/`Service` contracts, +`net-api::Timer`, the `HttpError`/`ErrorKind` classification, and the inline-double + +`MockTimer` test pattern (net-http-api **cannot** dev-depend on `net-http-mock`'s +`MockClient` — that closes a crate cycle and the two builds' `Service` impls do not +unify; `rate_limit.rs`/`timeout.rs`/`retry.rs`/`body.rs` use inline doubles for exactly +this reason). + +### Governing ADRs + +- **ADR-0031 §5** — the CircuitBreaker decision record, the **source of truth** for + every behavior below: three states (`Closed`/`Open`/`Half-Open`), `Timer`-driven; + `CircuitBreakerConfig { failure_threshold, cooldown, throttle_cooldown, + half_open_probes }`; **Closed → Open** on `failure_threshold` consecutive + `Connection`/`Server`/`Timeout` (consecutive-count for v1; rolling-window later) **or + immediately on `Throttled`/429** with the long `throttle_cooldown`; **Open** rejects + fast with a non-retryable `CircuitOpen`; **Half-Open** admits `half_open_probes` + (success closes, failure re-opens); a **single per-host breaker**, state shared behind + `Arc`. +- **ADR-0031 §1–§2** — the default stack `Tracing → CircuitBreaker → Retry → RateLimit → + Timeout → BufferOrStream → Auth → leaf`. `CircuitBreaker` sits **outside `Retry`**, so + it counts *logical* (post-retry) outcomes and short-circuits **before** `Retry`/ + `RateLimit`/`Timeout` ever run. +- **ADR-0029** — `Timer` (`now()` + `sleep()`), compile-time composition, no `dyn`. The + breaker uses **`now()` only** — it never sleeps. +- **ADR-0030 §5 / ADR-0034 §2** — HTTP 4xx/5xx *statuses* are **not** `HttpError`s; they + flow through as `Ok(http::Response)` with the body intact (the adapter classifies). + The breaker therefore reads its failure signal from **both** the `Err(HttpError)` path + (`Connection`/`Timeout`) **and** the response `status()` on the `Ok` path (5xx, 429), + exactly as `Retry` does. +- **ADR-0034** — the construction-surface ADR the layer PRs append their amendments to; + this layer adds amendment **#9** (see §Definition of done). + +## Goal + +A `CircuitBreaker` `Service` (+ its `CircuitBreakerLayer` factory) that, having +seen `failure_threshold` consecutive transport failures, **trips open** and thereafter +**fast-rejects** every request with a non-retryable `HttpError::CircuitOpen` — never +touching the inner stack — until a `Timer`-measured `cooldown` elapses, at which point a +bounded number of **Half-Open probes** test recovery (a healthy response closes the +circuit; a failure re-opens it). A `Throttled`/429 trips the circuit **immediately** on +the long `throttle_cooldown` (IBKR's ~15-minute penalty box). Runtime-neutral +(`Timer`-generic, `now()`-only — **no sleep**, **no `futures-util`**, **no new +dependency**), body-transparent, single per-host breaker shared behind `Arc`, and fully +mockable with a fake clock. The highest-consequence logic — the state machine — lives in +a **pure, clock-injected `Breaker` unit** that is table-testable with zero async. + +## Scope (in) + +- The `CircuitBreaker` service + `CircuitBreakerLayer` factory (impl'ing + `net-api::Layer`), in `oath-adapter-net-http-api`, in a new `circuit_breaker.rs`. +- `CircuitBreakerConfig` plain-`Copy` data (`failure_threshold: NonZeroU32`, `cooldown`, + `throttle_cooldown`, `half_open_probes: NonZeroU32`) and an **infallible** + `CircuitBreakerLayer::new(cfg, timer)`. +- The **pure `Breaker` core** (`Breaker` + `BreakerState`): clock-injected transitions + `admit(&mut self, now) -> Admit` and `record(&mut self, class, now)` — no async, `now` + an input, fully table-tested. +- The **outcome classifier** `classify(&Result, HttpError>) -> Class` — a + pure, state-independent 4-way partition (Failure / TripNow / Ignored / Success, §3). +- The **new error** `HttpError::CircuitOpen` and its classification `→ + ErrorKind::CircuitOpen` (a **new** variant on the `net-api` `ErrorKind` enum), + non-retryable. +- The **thin async shell**: `call()` decides admission under a short lock (using + `timer.now()`), **releases the lock**, runs `inner.call(req).await` on admission (or + returns `CircuitOpen` on rejection without touching the leaf), then records the + classified outcome under a second short lock. The lock is **never** held across the + `await`. +- `MockTimer`-driven tests: pure-core table tests + Service integration over inline + service doubles. + +## Non-goals (deferred — each its own PR/slice or a documented future) + +| Deferred | Why | Where | +| --- | --- | --- | +| **`Unknown` → Failure (resilience4j fail-safe default).** v1 treats `Other`/`Unknown` as **Ignored** (conservative — never over-trip the whole gateway on an error we can't explain). | A sustained run of *unclassified* errors arguably signals a host problem; resilience4j's default records all exceptions as failures unless ignored. Deferred until we have evidence of what actually produces `HttpError::Other` from the hyper leaf — reclassifying is a one-line `classify` change plus a table-test row. | future PR | +| **Rolling-window failure counting** (a failure *rate* over a sliding window) | ADR-0031 §5 pins **consecutive-count for v1**; a window is an additive refinement to the `Closed` state's counter and its `record` arm. | future PR | +| **Per-key / per-endpoint breakers** | ADR-0031 §5: IBKR's penalty box is **per-IP, venue-wide**, so **one breaker per host** matches reality (v1). A keyed breaker mirrors `RateLimit`'s `RateKey` map if a venue ever needs per-route isolation. | when a venue needs it | +| **A breaker-state observation surface / watch** (exposing Open/Closed to Telemetry or a trading-halt consumer) | YAGNI for HTTP v1 — the WS stack's inverting breaker relocates the "break" to a risk-layer halt via a lifecycle watch (ADR-0033 §7), but the HTTP breaker (ADR-0031 §5) has no such consumer yet. `Tracing` (PR5) reads per-request outcomes; a watch is additive. | future PR / PR5 | +| **`stack()`/`build()` assembly, `HttpConfig`, Tokio `Timer`** | Construction/wiring / runtime-specific | Slice 2 | +| **`Tracing` layer** | Independent `Service`, outermost; emits per-request spans over the breaker's decision | Slice 1 PR 5 | + +## Decisions + +### 1. Layer shape & construction + +```rust +pub struct CircuitBreakerConfig { + pub failure_threshold: NonZeroU32, // consecutive failures in Closed → Open + pub cooldown: Duration, // general outage, e.g. 30s + pub throttle_cooldown: Duration, // penalty box ≈ 15 min, on Throttled/429 + pub half_open_probes: NonZeroU32, // probes admitted per Half-Open episode (1) +} + +pub struct CircuitBreakerLayer { breaker: Arc>, timer: T } +pub struct CircuitBreaker { inner: S, breaker: Arc>, timer: T } +``` + +- **`CircuitBreakerLayer::new(cfg, timer) -> Self` is infallible.** `NonZeroU32` on + `failure_threshold` and `half_open_probes` makes "≥ 1" a *type* invariant — a `0` + threshold is nonsense and `0` probes would leave a tripped circuit **stuck Open + forever** (nothing could ever be admitted to close it). This is the same type-safety + move `RetryConfig` made with `NonZeroU32` (which needs no `Result`, unlike + `RateLimitLayer::new`'s config-map validation). **This is a deliberate divergence from + ADR-0031 §5's sketch**, which types both as `u32`; recorded in the ADR-0034 amendment. + `CircuitBreakerConfig` is `Copy` plain data. +- **Single per-host breaker, shared.** `new` constructs the `Breaker` **once** into an + `Arc>`; `Layer::layer` **clones the `Arc`** into every produced service, + and `Clone` for `CircuitBreaker`/`CircuitBreakerLayer` clones the `Arc` (shared state, + bound `T: Clone`) — so every service the layer yields, and every clone the stack makes, + observes **one** breaker. This realises ADR-0031 §5's "single per-host breaker, state + shared behind `Arc`". +- `Debug`/`Clone` on `CircuitBreakerLayer`/`CircuitBreaker` are **hand-written** (as + `RateLimit`/`Timeout`/`Retry` do): `Debug` uses `finish_non_exhaustive`; `Clone` bounds + `T: Clone` (and, for `CircuitBreaker`, `S: Clone`) so the derives don't demand + `Debug`/`Clone` on the inner service. + +### 2. The pure core — `Breaker` + `BreakerState` (clock-injected, table-tested) + +The state machine is the highest-consequence logic in the layer, so — mirroring how +`Retry` isolated `SplitMix64` and how the WS resilience spec isolates `ReconnectPolicy` +as a pure, table-tested unit — it lives in a **standalone `Breaker`** whose transitions +are **pure functions with `now: Instant` as an input** (the async shell owns the `Timer` +and feeds it). Zero async, zero locks inside — the `Mutex` lives in the shell. + +```rust +enum BreakerState { + Closed { consecutive_failures: u32 }, + Open { reopen_at: Instant }, // cooldown target instant + HalfOpen { probes_left: u32, successes_needed: u32 }, +} +enum Admit { Pass, Reject } + +struct Breaker { state: BreakerState, cfg: CircuitBreakerConfig } +``` + +- **`admit(&mut self, now: Instant) -> Admit`** — the fast-path gate, called before the + inner send: + - **`Closed`** → `Pass` (state unchanged). + - **`Open { reopen_at }`** → if `now >= reopen_at`, transition to `HalfOpen { + probes_left: half_open_probes − 1, successes_needed: half_open_probes }` and return + `Pass` (**this** call becomes the first probe); else `Reject`. + - **`HalfOpen { probes_left, .. }`** → if `probes_left > 0`, decrement and `Pass`; else + `Reject` (the **concurrency gate** — no more than `half_open_probes` in-flight + probes; with `half_open_probes = 1`, exactly one at a time). +- **`record(&mut self, class: Class, now: Instant)`** — applied to the *current* state + after the outcome resolves: + + | State \ Class | `Failure` | `TripNow` | `Ignored` | `Success` | + | --- | --- | --- | --- | --- | + | **Closed** | `failures += 1`; at `failure_threshold` → `Open { now + cooldown }` | `Open { now + throttle_cooldown }` | no-op (streak untouched) | `failures = 0` | + | **Half-Open** | `Open { now + cooldown }` | `Open { now + throttle_cooldown }` | `successes_needed −= 1`; at 0 → `Closed{0}` | `successes_needed −= 1`; at 0 → `Closed{0}` | + | **Open** | no-op | no-op | no-op | no-op | + + - **Half-Open treats `Ignored` and `Success` identically** — both *resolve the probe* + and count toward closing. A probe's question is narrowly *"is the host reachable and + responding coherently?"*; a `4xx`/`Auth` answers **yes** (see §3), so it closes even + though the same outcome is a no-op in `Closed`. This is what avoids a **stuck + Half-Open** — every admitted probe reaches a decisive resolution. + - **`Open` records are no-ops (defensive).** Because the shell releases the lock across + the `await` (§4), a call admitted while `Closed`/`Half-Open` can have its outcome + recorded *after* a concurrent call already tripped the circuit; recording into `Open` + is simply dropped. This loses at most one data point per race and can never + *un*-trip a freshly-opened circuit — acceptable for a single global v1 breaker. + +### 3. Outcome classification — the 4-way partition + +```rust +enum Class { Failure, TripNow, Ignored, Success } + +fn classify(outcome: &Result, HttpError>) -> Class { + match outcome { + Err(e) => match e.kind() { + ErrorKind::Connection | ErrorKind::Timeout | ErrorKind::Server => Class::Failure, + ErrorKind::Throttled => Class::TripNow, + // Auth, Client, Unknown, CircuitOpen — and any future kind — are Ignored: + _ => Class::Ignored, + }, + Ok(resp) => { + let s = resp.status(); + if s == http::StatusCode::TOO_MANY_REQUESTS { Class::TripNow } // 429 + else if s.is_server_error() { Class::Failure } // 5xx + else if s.is_client_error() { Class::Ignored } // 4xx (non-429) + else { Class::Success } // 2xx / 3xx + } + } +} +``` + +| Class | Outcomes | Rationale | +| --- | --- | --- | +| **Failure** | `Err(Connection\|Timeout)`, `Ok(5xx)` | Genuine transport / server failure — the signal the breaker exists to count (ADR-0031 §5's "consecutive `Connection`/`Server`/`Timeout`"; `Server` = a 5xx status on the `Ok` path, since `HttpError` never carries `Server`). | +| **TripNow** | `Err(Throttled)`, `Ok(429)` | IBKR's penalty box — trip **immediately** on the long `throttle_cooldown`; retrying/continuing compounds the ban (ADR-0031 §5). | +| **Ignored** | `Ok(4xx non-429)`, `Err(Auth)`, **`Err(Other)`/`Unknown`** | The host answered coherently (4xx) or the fault is caller/credential-side (`Auth`), or unclassified: **do not trip, and do not reset** the failure streak. Not resetting is the key improvement — it stops a `5xx,4xx,5xx,4xx…` interleave from masking a building outage. Ignoring `Auth` is essential: an expired token must **not** trip the whole gateway (refresh is the `Auth` layer's job). `Unknown → Ignored` is the conservative v1 default (see Non-goals for the fail-safe alternative). | +| **Success** | `Ok(2xx\|3xx)` | The host is genuinely healthy → reset the `Closed` streak / resolve a Half-Open probe. | + +The `_ => Class::Ignored` catch-all on the `Err` arm is **deliberate and +self-documenting**: anything not explicitly a transport `Failure` or a `TripNow` +defaults to *Ignored* — the conservative "never over-trip on something we didn't +specifically classify" stance, consistent with the `Unknown → Ignored` decision. (Today +`HttpError::kind()` yields only `Timeout`/`Connection`/`Throttled`/`Auth`/`Unknown`; the +`Server`/`Client`/`CircuitOpen` arms are unreachable-via-`HttpError` but keep `classify` +total and correct if the mapping ever widens.) + +This partition is grounded in mature circuit-breaker practice — **resilience4j**'s +`recordExceptions`/`ignoreExceptions`, **Polly**'s `HandleTransientHttpError` +(5xx/408/`HttpRequestException` only), and **Envoy**'s consecutive-gateway-errors — all +of which distinguish an *ignored* outcome (neither success nor failure) from a healthy +one, precisely so client-side errors neither trip nor mask. + +### 4. The async shell — `call()` data flow + +```rust +impl Service> for CircuitBreaker +where + S: Service, Response = http::Response, Error = HttpError> + Sync, + T: Timer, +{ + type Response = http::Response; + type Error = HttpError; + + #[allow(clippy::manual_async_fn)] + fn call(&self, req: http::Request) + -> impl Future> + Send + { + async move { + let now = self.timer.now(); + let admit = self.lock().admit(now); // short lock; released here + if let Admit::Reject = admit { + return Err(HttpError::CircuitOpen); // fast reject — leaf untouched + } + let outcome = self.inner.call(req).await; // NO lock held across the await + let class = classify(&outcome); + self.lock().record(class, self.timer.now()); // short lock; released here + outcome + } + } +} +``` + +- **The lock is a `std::sync::Mutex` held only around the two pure `Breaker` calls, + never across the `await`** — the same discipline `RateLimit` uses for its bucket + `Mutex`. A poisoned lock is recovered with + `.unwrap_or_else(std::sync::PoisonError::into_inner)` (never `.lock().unwrap()`), per + the workspace no-`unwrap` rule; the `Breaker` holds no invariant a panic could + corrupt, so recovering is safe. +- **A rejected request never reaches the leaf** — `CircuitOpen` is synthesised locally. + This is the whole point: an Open circuit stops load cold, protecting a struggling (or + penalty-boxed) host. +- **Body-transparent** — an admitted request's `http::Response` is returned untouched + (no `Guarded`-style carrier, no `B` bound beyond the inner `Service`'s); the breaker + only *reads* `status()` via `classify`. +- **`S: Sync`** because the returned `Send` future borrows `&self` — the same bound + `RateLimit`/`Timeout`/`Retry` carry. +- **No sleep, no race.** Unlike `Timeout` (races a `Timer::sleep`) and `Retry` (sleeps + between attempts), the breaker only ever *reads* `timer.now()`; the Open→Half-Open + transition is a **lazy comparison on the next `admit`**, so there is no background + timer, no `futures-util::select`, and no new dependency. + +### 5. The new error — `HttpError::CircuitOpen` + +```rust +// net-api/src/error_kind.rs — new variant on the shared enum +pub enum ErrorKind { Timeout, Connection, Throttled, Auth, Client, Server, Unknown, CircuitOpen } + +// net-http-api/src/error.rs +pub enum HttpError { /* … */ CircuitOpen } // no source — a local decision +// impl HasErrorKind: +HttpError::CircuitOpen => ErrorKind::CircuitOpen +``` + +- **A genuinely new failure mode deserves a distinct kind.** A fast local reject is *not* + a transport failure and *not* a throttle; mapping it to `Unknown` would make an open + circuit observably indistinguishable from a real backend error in Telemetry, and + `Throttled` would conflate it with `RateLimit`'s proactive wait. So both a new + `HttpError::CircuitOpen` **and** a new `ErrorKind::CircuitOpen` are added. +- **Non-retryable by construction.** `Retry::is_transient` is `{Timeout, Connection}`, so + `CircuitOpen` is never retried even if it *were* seen — and it is not: the breaker sits + **outside `Retry`**, so `CircuitOpen` short-circuits above the retry loop and surfaces + straight to `Tracing`/the adapter. Nothing above the breaker retries. +- **Touch points:** the `net-api` `ErrorKind` enum + its exhaustive `kind()`-mapping test + and any exhaustive `match` in that crate; `net-http-api`'s `error.rs` `HasErrorKind` + impl + its `kind_maps_each_variant` test gain the `CircuitOpen` row. + +### 6. Stack interaction (ADR-0031 §1–§2, §5) + +`Tracing → CircuitBreaker → Retry → RateLimit → Timeout → BufferOrStream → Auth → leaf`. +`CircuitBreaker` is **outside `Retry`**, so it counts the **logical, post-retry** +outcome — a request that `Retry` re-issued three times and still failed is **one** +`Failure` to the breaker, not three — and an Open circuit short-circuits **before** +`Retry`/`RateLimit`/`Timeout` run, spending none of their budget. It is body-transparent, +composing with `Retry`'s (and below it, `RateLimit`'s `Guarded`) response type without +disturbing it — the breaker only reads `status()` and either forwards the response or +substitutes a local `CircuitOpen`. + +## Testing (MockTimer-driven; pure-core table tests + Service integration) + +The `Breaker` core is tested **synchronously** (no executor, `now` an input); the Service +is tested with `MockTimer::advance()` + an **inline** `Service` double that counts calls +and yields a scripted outcome sequence (no `MockClient` — cycle). `#[tokio::test]` +provides the executor. + +**Pure `Breaker` table tests (zero async):** + +- **Threshold trip.** `failure_threshold = 3`: two `Failure`s keep it `Closed`; the third + consecutive `Failure` → `Open { now + cooldown }`. +- **Streak reset.** `Failure, Failure, Success, Failure` never trips (the `Success` + resets the counter). +- **Ignored does not reset.** `Failure, Ignored, Failure, Failure` (threshold 3) **trips** + — the `Ignored` left the streak intact (the anti-masking property). +- **Immediate throttle trip.** A single `TripNow` from `Closed` → `Open`, and the reopen + instant is `now + throttle_cooldown` (asserted **distinct** from `cooldown`). +- **Open rejects then probes.** `Open` `admit`s `Reject` while `now < reopen_at`; at/after + `reopen_at` the first `admit` returns `Pass` and transitions to `Half-Open`. +- **Half-Open close / re-open.** A probe `Success` (and, separately, an `Ignored`) → + `Closed`; a probe `Failure` → `Open { now + cooldown }`; a probe `TripNow` → `Open { + now + throttle_cooldown }`. +- **Concurrency gate.** With `half_open_probes = 1`, once the single probe is admitted a + further `admit` returns `Reject` until the probe resolves; with `half_open_probes = 2`, + two probes admit and both must resolve non-failing to close. +- **`classify` partition.** A table over `(Err kind | Ok status) → Class` covering every + row of §3 (incl. `429 → TripNow`, `4xx → Ignored`, `Auth → Ignored`, `Unknown → + Ignored`, `5xx → Failure`, `2xx → Success`). + +**Service integration (`MockTimer` + inline leaf double):** + +- **Trip then fast-reject.** `failure_threshold` consecutive `Connection` errors trip the + circuit; the **next** call returns `Err(HttpError::CircuitOpen)` with the **leaf + call-count frozen** (the request never reached the inner double). +- **Immediate 429 trip.** One leaf `Ok(429)` trips the circuit on the long cooldown; a + subsequent call fast-rejects, and only advancing past `throttle_cooldown` (not the short + `cooldown`) admits a probe. +- **Recovery.** After tripping, `advance` past `cooldown` → the next call is admitted as a + probe (leaf hit **once**); a leaf `Ok(200)` **closes** (subsequent calls flow), whereas + a leaf `Ok(503)`/`Connection` **re-opens** (next call fast-rejects again). +- **Shared state.** Two `CircuitBreaker` clones produced from one `CircuitBreakerLayer` + observe the **same** trip (a failure streak driven through clone A opens the circuit + seen by clone B) — proving the single-per-host `Arc` sharing. + +## Dependencies + +**No new dependency, no `net-http-api` `Cargo.toml` change.** `http`/`bytes`/`http-body` +are crate deps; `oath-adapter-net-mock` (`MockTimer`) + `tokio` are dev-deps — all +present since #76/#78. The breaker uses `std::sync::{Arc, Mutex}`, +`std::time::{Duration, Instant}`, `std::num::NonZeroU32`, and `net-api::Timer::now()` +only — **no** `futures-util` race, **no** `sleep`, **no** +`tokio`/`hyper`/`reqwest`/`serde`. The **`net-api`** crate gains one enum variant +(`ErrorKind::CircuitOpen`) — no new dependency there either. + +## Definition of done + +- `CircuitBreaker` + `CircuitBreakerLayer` + `CircuitBreakerConfig` + the pure + `Breaker`/`BreakerState` core + the `classify` partition, implemented as specified in + `circuit_breaker.rs`, with the tests above. +- `net-api` gains `ErrorKind::CircuitOpen`; `net-http-api` gains `HttpError::CircuitOpen` + and its `HasErrorKind` row; both crates' exhaustive-mapping tests updated. +- `lib.rs` gains `pub mod circuit_breaker;` + re-exports + a module-doc bullet. +- ADR-0034 gains an **append-only** amendment (**#9** — the `Retry` layer takes #8; see + Open questions) recording: the `CircuitBreaker` layer, the new `CircuitOpen` + error/kind, the 4-class classification (`Connection`/`Timeout`/`5xx` → Failure; + `Throttled`/429 → immediate TripNow on the long cooldown; `4xx`/`Auth`/`Unknown` → + Ignored; `2xx`/`3xx` → Success), the `NonZeroU32` divergence from §5's `u32`, + consecutive-count v1, single per-host, lazy Half-Open, and the deferred + `Unknown → Failure` / rolling-window / per-key / state-watch. +- `just ci` green (fmt, lint = deny, test + doctests, doc, deny, typos, machete); no new + warnings; no `unsafe`/`unwrap`/`expect`/indexing in non-test code (the `Mutex` poison + is recovered, not unwrapped; there is no fallible arithmetic — counter increments are + bounded by `failure_threshold` and a `saturating_add` guards the degenerate case). +- `CHANGELOG.md` `[Unreleased]` updated. +- Delivered as one issue → one branch (worktree `.claude/worktrees/net-http-circuit-breaker`) + → one PR (`Closes #N`). + +## Open questions (for the implementation plan) + +1. **ADR amendment number.** On `main @ de2e5e4` ADR-0034's amendment list runs #1–#7; + the `Retry` branch adds #8. If this branch is cut **after `Retry` merges** (or off the + `Retry` branch), `CircuitBreaker` is **#9**. If it is cut off `main` **before `Retry` + merges**, both would provisionally claim #8 — renumber to #9 on rebase (the same + trail-completeness convention #78's Timeout spec and the `Retry` spec used). This spec + assumes **#9**. +2. **`Instant` source in `MockTimer`.** Confirm `MockTimer::now()` returns an `Instant` + that `advance()` moves forward monotonically, so the `reopen_at = now + cooldown` + comparison and the "past `throttle_cooldown` but not `cooldown`" test are expressible + (they are for `RateLimit`/`Timeout`/`Retry`, which already gate on `MockTimer` time). +3. **Test executor.** `#[tokio::test]` for the Service tests (parity with the shipped + layers); the pure `Breaker` tests need no executor at all — a benefit of the + pure-core cut. +4. **`half_open_probes > 1` semantics.** v1 ships the `NonZeroU32` knob and the + `probes_left`/`successes_needed` accounting handles `> 1`, but IBKR uses `1`; confirm + whether to table-test the `> 1` path now (cheap, proves the generality) or defer. From 61e094a912adfeea6b882def3093132f80e5d8ba Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 19:36:27 +0000 Subject: [PATCH 02/11] docs(net): CircuitBreaker layer implementation plan (Slice 1, PR 4) Co-Authored-By: Claude Opus 4.8 (1M context) --- ...026-07-04-net-http-circuitbreaker-layer.md | 1155 +++++++++++++++++ 1 file changed, 1155 insertions(+) create mode 100644 docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md diff --git a/docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md b/docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md new file mode 100644 index 0000000..f03b6c0 --- /dev/null +++ b/docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md @@ -0,0 +1,1155 @@ +# net-http `CircuitBreaker` Layer (Slice 1, PR 4) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build the `CircuitBreaker` HTTP middleware layer — the **reactive** backstop to `RateLimit`'s proactive guard — that trips **Open** after `failure_threshold` consecutive transport failures (or immediately on a `Throttled`/429), fast-rejects with a non-retryable `HttpError::CircuitOpen` without touching the inner stack, and after a `Timer`-measured cooldown admits bounded **Half-Open** probes to test recovery. + +**Architecture:** A `Timer`-generic, runtime-neutral `Service` wrapper in `oath-adapter-net-http-api`. The highest-consequence logic — the Closed/Open/Half-Open state machine — lives in a **pure, clock-injected `Breaker`** (`admit(now) -> Admit`, `record(class, now)`), table-testable with zero async. The `CircuitBreaker` shell is a thin `Arc>` + `Timer`: it locks briefly to `admit` (using `timer.now()`), **releases the lock**, runs `inner.call(req).await` (or returns `CircuitOpen` on rejection), then locks briefly to `record` the `classify`-d outcome. The breaker **never sleeps** — Open→Half-Open is a lazy `now()` comparison on the next `admit` — so there is no `futures-util` race, no lock held across an `await`, and no new dependency. Body-transparent: `http::Response` is forwarded untouched (the breaker only reads `status()`). + +**Tech Stack:** Rust (edition 2024, MSRV 1.90), `just`, `http`/`bytes`, `std::time::{Duration, Instant}`, `std::num::NonZeroU32`, `std::sync::{Arc, Mutex}`, `net-api::{Timer, ErrorKind, HasErrorKind, Layer}`. Tests use inline service doubles + `MockTimer` (`oath-adapter-net-mock`), driven on `tokio` (dev-only). **No new dependency**; the `net-api` contract crate gains one `ErrorKind` variant. + +## Global Constraints + +Every task implicitly includes these: + +- **Edition 2024, MSRV 1.90.** No `unsafe` — the crate is `#![forbid(unsafe_code)]`. +- **No `unwrap`/`expect`/indexing/panic in non-test code** — return `Result` or use total combinators. Recover a poisoned `std::sync::Mutex` with `.unwrap_or_else(std::sync::PoisonError::into_inner)`, **never** `.lock().unwrap()`. Counter increments use `saturating_add`. Test code is exempt for `unwrap`/`expect`/indexing. +- **`just lint` = clippy `-D warnings` + `pedantic`/`nursery`** — `#[must_use]` where asked, document all public items (`missing_docs`), `Debug` on all **public** types (`missing_debug_implementations` — hand-impl where a derive would demand `Debug`/`Clone` on `S`/`T`), `const fn` where `missing_const_for_fn` asks (but **not** `CircuitBreakerLayer::new` — it allocates an `Arc`; see Task 4). +- **`net-http-api` charter:** no async *runtime* — no `tokio`/`hyper`/`reqwest`/`serde` in non-dev deps. **This PR adds no dependency** (`http`/`bytes`/`http-body`/`async-lock` are crate deps; `oath-adapter-net-mock` + `tokio` are dev-deps — all present since #76/#78), so `cargo-deny`/`machete` are unaffected. The `net-api` change is a single enum variant, no new dep there either. +- **net-http-api tests must NOT dev-depend on `oath-adapter-net-http-mock` (`MockClient`)** — it normal-depends on this crate, so the dev-dep closes a cycle that recompiles a second, non-unifying copy of `net-http-api` (E0599: `MockClient` does not satisfy *this* crate's `Service`). Use **inline** service doubles + `oath-adapter-net-mock`'s `MockTimer`, exactly as `rate_limit.rs`/`timeout.rs`/`retry.rs`/`body.rs` do. +- **DoD per PR:** `just ci` green (fmt, lint, test + doctests, doc, deny, typos, machete). Update `CHANGELOG.md` `[Unreleased]`. One issue → one branch → worktree → one PR (`Closes #`). + +## Source spec + +[docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md](../specs/2026-07-04-net-http-circuitbreaker-layer-design.md), governed by [ADR-0031 §5](../../adr/0031-http-resilience-venue-pacing.md) and [ADR-0034](../../adr/0034-http-construction-surface-auth-guarded-boot-coverage.md). This is **Slice 1, PR 4** — the fourth of the resilience-layer PRs (RateLimit #76, Timeout #78, Retry #82 landed; then CircuitBreaker, Tracing). + +## File Structure + +- `crates/adapter/net/api/src/error_kind.rs` — **modify** (Task 1). Add the `ErrorKind::CircuitOpen` variant. +- `crates/adapter/net/http/api/src/error.rs` — **modify** (Task 1). Add the `HttpError::CircuitOpen` variant + its `HasErrorKind` arm + the mapping-test row. +- `crates/adapter/net/http/api/src/circuit_breaker.rs` — **new** (Tasks 2–4). `CircuitBreakerConfig`, `Class`/`classify`, `BreakerState`/`Admit`/`Breaker`, `CircuitBreakerLayer`, `CircuitBreaker`, the `Layer`/`Service` impls, and their tests. +- `crates/adapter/net/http/api/src/lib.rs` — **modify** (Tasks 2, 4). `pub mod circuit_breaker;` + re-exports + module-doc bullet. +- `docs/adr/0034-...md`, `CHANGELOG.md` — **modify** (Task 5). + +No `Cargo.toml` change. Each task is one commit; the tasks together are one PR/issue. + +--- + +## Setup: issue (worktree already exists) + +> The isolated worktree **already exists** at `.claude/worktrees/net-http-circuit-breaker` (branch `feat/net-http-circuit-breaker`, branched off `main` = `68d8f60`, which carries #76/#78/#80/#82). The design-spec commit (`29b7203`) is already on the branch. All tasks run inside the worktree. Only the GitHub issue remains to be created. + +- [ ] **Create the issue** + +```bash +gh issue create \ + --title "feat(net): CircuitBreaker resilience layer (Slice 1, PR 4)" \ + --label enhancement \ + --body "Slice 1 PR 4 of the net-http resilience layers (spec: docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md; ADR-0031 §5). + +- \`CircuitBreaker\` + \`CircuitBreakerLayer\` (impl \`net-api::Layer\`): the reactive backstop to RateLimit — trips Open after \`failure_threshold\` consecutive Connection/Timeout/5xx failures, or immediately on Throttled/429 with the long \`throttle_cooldown\`; fast-rejects with a new non-retryable \`HttpError::CircuitOpen\` / \`ErrorKind::CircuitOpen\`; lazy Half-Open probing (now()-only timing, no sleep, no new dependency) +- Pure clock-injected \`Breaker\` state machine (Closed/Open/Half-Open), table-tested with zero async; thin \`Arc>\` Service shell, single per-host breaker +- 4-class outcome partition (Failure / TripNow / Ignored / Success): 4xx/Auth/unclassified errors neither trip nor mask a building outage; Unknown -> Ignored for v1 (resilience4j fail-safe recorded as a future improvement) +- Body-transparent; sits outside Retry (counts logical post-retry outcomes)" +``` + +Note the issue number `#` for the PR body. + +--- + +## Task 1: the `CircuitOpen` error surface + +**Files:** +- Modify: `crates/adapter/net/api/src/error_kind.rs` +- Modify: `crates/adapter/net/http/api/src/error.rs` + +**Interfaces:** +- Consumes: nothing new. +- Produces: + - `oath_adapter_net_api::ErrorKind::CircuitOpen` — a new variant on the `#[non_exhaustive]` enum: a local fast-reject, distinct from a transport failure. + - `oath_adapter_net_http_api::HttpError::CircuitOpen` — a new variant (no source), `kind() -> ErrorKind::CircuitOpen`. Consumed by Task 4's Service shell. + +- [ ] **Step 1: Write the failing test** + +In `crates/adapter/net/http/api/src/error.rs`, extend the existing `kind_maps_each_variant` test with a `CircuitOpen` row (add this line inside that `#[test]` fn, after the `other` assertion): + +```rust + assert_eq!(HttpError::CircuitOpen.kind(), ErrorKind::CircuitOpen); +``` + +- [ ] **Step 2: Run it to verify it fails** + +Run: `just check` +Expected: FAIL — `no variant named CircuitOpen found for enum HttpError` (and, once that is added, `ErrorKind`). + +- [ ] **Step 3: Add both variants + the mapping** + +In `crates/adapter/net/api/src/error_kind.rs`, add the variant **after** `Unknown` (the enum is already `#[non_exhaustive]`, so downstream crates need no change): + +```rust + /// The error does not fit any other category. + Unknown, + + /// A circuit breaker rejected the request without sending it — the breaker is + /// Open after prior failures (or a throttle) and is failing fast until its + /// cooldown elapses. A deliberate local decision, not a transport outcome; + /// non-retryable. + CircuitOpen, +``` + +In `crates/adapter/net/http/api/src/error.rs`, add the `HttpError` variant (after `Other`, keeping the `#[non_exhaustive]` enum's `thiserror` style): + +```rust + /// The circuit breaker is open — the request was rejected without being sent. + CircuitOpen, +``` + +with its `#[error(...)]` message directly above it: + +```rust + #[error("circuit open: request rejected without being sent")] + CircuitOpen, +``` + +and add the `HasErrorKind` arm (in the `match self` inside `fn kind`, after the `Other` arm): + +```rust + Self::CircuitOpen => ErrorKind::CircuitOpen, +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `just check && cargo test -p oath-adapter-net-api && cargo test -p oath-adapter-net-http-api error && just lint` +Expected: PASS, warning-free. + +> Known risks: +> - Adding a variant to the `#[non_exhaustive]` `ErrorKind` can surface a **non-exhaustive `match`** in this or another crate that lacked a `_` arm. If `just check` reports one, add the missing arm — a `Class::Ignored`-equivalent "not a failure/trip" default where the match is a breaker/retry classifier, or the locally-correct value otherwise. (`retry.rs::is_transient` uses `matches!`, which is unaffected.) +> - `HttpError` is `#[non_exhaustive]`; its own `HasErrorKind` `match self` **is** exhaustive over `HttpError` and now covers the new arm. + +- [ ] **Step 5: Commit** + +```bash +git add crates/adapter/net/api/src/error_kind.rs crates/adapter/net/http/api/src/error.rs +git commit -m "feat(net): add CircuitOpen error kind + HttpError::CircuitOpen" +``` + +--- + +## Task 2: `CircuitBreakerConfig` + `Class`/`classify` + module scaffold + +**Files:** +- Create: `crates/adapter/net/http/api/src/circuit_breaker.rs` +- Modify: `crates/adapter/net/http/api/src/lib.rs` + +**Interfaces:** +- Consumes: `HttpError` (crate); `ErrorKind`, `HasErrorKind` (`oath_adapter_net_api`). +- Produces: + - `oath_adapter_net_http_api::CircuitBreakerConfig` — `struct { failure_threshold: NonZeroU32, cooldown: Duration, throttle_cooldown: Duration, half_open_probes: NonZeroU32 }` (`Debug`, `Clone`, `Copy`). Consumed by Tasks 3–4. + - `Class` (crate-private) — `enum { Failure, TripNow, Ignored, Success }` (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`). Consumed by Task 3's `Breaker::record`. + - `classify` (crate-private) — `fn classify(&Result, HttpError>) -> Class`. Consumed by Task 4's Service. + +- [ ] **Step 1: Write the failing test** + +Create `crates/adapter/net/http/api/src/circuit_breaker.rs` with the module doc, the config, the classifier, and its table test: + +```rust +//! The `CircuitBreaker` resilience layer (ADR-0031 §5): the reactive 429/outage +//! backstop to `RateLimit`'s proactive pacing. +//! +//! `RateLimit` tries never to hit a 429; `CircuitBreaker` stops cold if the host +//! fails anyway. It trips **Open** after [`CircuitBreakerConfig::failure_threshold`] +//! consecutive transport failures (`HttpError::{Connection, Timeout}` or a `5xx` +//! response), or **immediately** on a `Throttled`/429 with the long +//! [`CircuitBreakerConfig::throttle_cooldown`] (IBKR's ~15-minute penalty box). +//! While Open it **fast-rejects** every request with a non-retryable +//! [`HttpError::CircuitOpen`](crate::HttpError::CircuitOpen) — the inner stack is +//! never touched. After the cooldown a bounded number of **Half-Open** probes test +//! recovery: a reached-host response closes the circuit, a failure re-opens it. +//! +//! The state machine lives in a pure, clock-injected [`Breaker`] (transitions take +//! `now: Instant` as an input, table-tested with zero async); the [`CircuitBreaker`] +//! service is a thin `Arc>` + [`Timer`](oath_adapter_net_api::Timer) +//! shell. A **single per-host** breaker is shared behind `Arc`. Runtime-neutral and +//! `now()`-only — the breaker never sleeps (Open→Half-Open is a lazy comparison on +//! the next admit), so there is no timer race and no new dependency. Body-transparent +//! — `http::Response` is forwarded untouched. + +use crate::HttpError; +use oath_adapter_net_api::{ErrorKind, HasErrorKind}; +use std::num::NonZeroU32; +use std::time::Duration; + +/// The circuit breaker's thresholds, as plain `Copy` data (ADR-0031 §5). +/// +/// `failure_threshold` and `half_open_probes` are `NonZeroU32`: "≥ 1" is a type +/// invariant, so [`CircuitBreakerLayer::new`](crate::CircuitBreakerLayer) needs no +/// `Result` (a `0` threshold is nonsense and `0` probes would leave a tripped +/// circuit stuck Open forever). This types §5's `u32` sketch more precisely. +#[derive(Debug, Clone, Copy)] +pub struct CircuitBreakerConfig { + /// Consecutive failures in the Closed state that trip the circuit Open. + pub failure_threshold: NonZeroU32, + /// The cooldown before Half-Open probing after a failure-threshold trip. + pub cooldown: Duration, + /// The (longer) cooldown after a `Throttled`/429 trip — the penalty box. + pub throttle_cooldown: Duration, + /// Probes admitted per Half-Open episode; all must reach the host to close. + pub half_open_probes: NonZeroU32, +} + +/// The breaker-relevant classification of one call outcome (pure, state-independent). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Class { + /// A genuine transport/server failure — advances the Closed trip counter. + Failure, + /// A throttle/429 — trips the circuit **immediately** on the long cooldown. + TripNow, + /// Neither a failure nor a trip (4xx, `Auth`, unclassified) — a no-op in Closed; + /// resolves a Half-Open probe (a reached host proves recovery). + Ignored, + /// A healthy `2xx`/`3xx` response — resets the streak / resolves a probe. + Success, +} + +/// Classify a call outcome for the breaker (ADR-0031 §5). +/// +/// Only genuine transport failures (`Connection`/`Timeout`) and `5xx` are +/// `Failure`; `Throttled`/429 is `TripNow`; a `4xx`/`Auth`/unclassified error is +/// `Ignored` (never trips **and never resets** — so an interleave cannot mask a +/// building outage); `2xx`/`3xx` is `Success`. `Unknown → Ignored` is the +/// conservative v1 default (the resilience4j fail-safe `Unknown → Failure` is a +/// documented future improvement). +pub(crate) fn classify(outcome: &Result, HttpError>) -> Class { + match outcome { + Err(e) => match e.kind() { + ErrorKind::Connection | ErrorKind::Timeout | ErrorKind::Server => Class::Failure, + ErrorKind::Throttled => Class::TripNow, + // Auth, Client, Unknown, CircuitOpen — and any future kind — are Ignored. + _ => Class::Ignored, + }, + Ok(resp) => { + let s = resp.status(); + if s == http::StatusCode::TOO_MANY_REQUESTS { + Class::TripNow + } else if s.is_server_error() { + Class::Failure + } else if s.is_client_error() { + Class::Ignored + } else { + Class::Success + } + } + } +} + +#[cfg(test)] +mod classify_tests { + use super::{Class, classify}; + use crate::HttpError; + + fn ok(status: u16) -> Result, HttpError> { + let mut r = http::Response::new(()); + *r.status_mut() = http::StatusCode::from_u16(status).unwrap(); + Ok(r) + } + + #[test] + fn transport_errors_and_5xx_are_failures() { + assert_eq!(classify::<()>(&Err(HttpError::Timeout)), Class::Failure); + assert_eq!(classify::<()>(&Err(HttpError::connection("reset"))), Class::Failure); + assert_eq!(classify(&ok(500)), Class::Failure); + assert_eq!(classify(&ok(503)), Class::Failure); + } + + #[test] + fn throttle_and_429_trip_now() { + assert_eq!(classify::<()>(&Err(HttpError::Throttled)), Class::TripNow); + assert_eq!(classify(&ok(429)), Class::TripNow); + } + + #[test] + fn client_errors_auth_and_unknown_are_ignored() { + assert_eq!(classify(&ok(400)), Class::Ignored); + assert_eq!(classify(&ok(404)), Class::Ignored); + assert_eq!(classify::<()>(&Err(HttpError::auth("expired"))), Class::Ignored); + assert_eq!(classify::<()>(&Err(HttpError::other("boom"))), Class::Ignored); + } + + #[test] + fn success_statuses_are_success() { + assert_eq!(classify(&ok(200)), Class::Success); + assert_eq!(classify(&ok(301)), Class::Success); + } +} +``` + +In `lib.rs`, add the module-doc bullet (insert **after** the `retry` bullet, before the `timeout` bullet), the `pub mod` (insert **after** `pub mod body;`, before `pub mod client;` — alphabetical), and the re-export (insert **after** the `body::{…}` re-export, before `pub use client::HttpClient;`): + +Module-doc bullet: + +```rust +//! - [`circuit_breaker`] — the `CircuitBreaker` layer, its `CircuitBreakerLayer` +//! factory, and the `CircuitBreakerConfig` thresholds +``` + +Module declaration: + +```rust +pub mod circuit_breaker; +``` + +Re-export: + +```rust +pub use circuit_breaker::CircuitBreakerConfig; +``` + +(Task 4 extends this to `pub use circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerLayer};`.) + +- [ ] **Step 2: Run it to verify it fails** + +Run: `just check` +Expected: initially FAIL if the module wiring is added before the file exists; once `circuit_breaker.rs` and the `lib.rs` wiring are both in place it compiles. The `classify_tests` are the real gate. + +- [ ] **Step 3: (implementation already written in Step 1)** + +`CircuitBreakerConfig`, `Class`, and `classify` are fully defined in Step 1 — there is no separate implementation step for this task. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cargo test -p oath-adapter-net-http-api circuit_breaker && just lint` +Expected: PASS, warning-free. + +> Known risks: +> - `classify::<()>(&Err(...))` needs the turbofish because `B` is otherwise unconstrained on the `Err` arm; the `ok(...)` helper pins `B = ()` on the `Ok` arm. +> - If clippy `missing_docs` fires, note `Class`/`classify` are `pub(crate)` (no doc required) but are documented anyway; `CircuitBreakerConfig` and its fields are `pub` and **must** be documented (they are). + +- [ ] **Step 5: Commit** + +```bash +git add crates/adapter/net/http/api/src/circuit_breaker.rs crates/adapter/net/http/api/src/lib.rs +git commit -m "feat(net): CircuitBreakerConfig + outcome classifier for the breaker" +``` + +--- + +## Task 3: the pure `Breaker` state machine + +**Files:** +- Modify: `crates/adapter/net/http/api/src/circuit_breaker.rs` + +**Interfaces:** +- Consumes: `CircuitBreakerConfig`, `Class` (Task 2). +- Produces (crate-private — **not** re-exported): + - `Admit` — `enum { Pass, Reject }` (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`). + - `Breaker` — `struct` with `const fn new(cfg: CircuitBreakerConfig) -> Self`, `fn admit(&mut self, now: Instant) -> Admit`, `fn record(&mut self, class: Class, now: Instant)`. Task 4's shell owns one behind `Arc>`. + +- [ ] **Step 1: Write the failing tests** + +Append a **new** `#[cfg(test)]` module below `classify_tests` in `circuit_breaker.rs`: + +```rust +#[cfg(test)] +mod breaker_tests { + use super::{Admit, Breaker, CircuitBreakerConfig, Class}; + use std::num::NonZeroU32; + use std::time::{Duration, Instant}; + + fn cfg(threshold: u32, probes: u32) -> CircuitBreakerConfig { + CircuitBreakerConfig { + failure_threshold: NonZeroU32::new(threshold).unwrap(), + cooldown: Duration::from_secs(30), + throttle_cooldown: Duration::from_secs(900), + half_open_probes: NonZeroU32::new(probes).unwrap(), + } + } + + #[test] + fn closed_trips_after_threshold_consecutive_failures() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + assert_eq!(b.admit(now), Admit::Pass); + b.record(Class::Failure, now); + b.record(Class::Failure, now); + assert_eq!(b.admit(now), Admit::Pass, "still closed after 2 failures"); + b.record(Class::Failure, now); + assert_eq!(b.admit(now), Admit::Reject, "3rd consecutive failure → Open rejects"); + } + + #[test] + fn a_success_resets_the_failure_streak() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + b.record(Class::Failure, now); + b.record(Class::Failure, now); + b.record(Class::Success, now); // reset + b.record(Class::Failure, now); + b.record(Class::Failure, now); + assert_eq!(b.admit(now), Admit::Pass, "streak reset → not tripped"); + } + + #[test] + fn ignored_does_not_reset_the_streak() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + b.record(Class::Failure, now); + b.record(Class::Ignored, now); // a 4xx does NOT reset — anti-masking + b.record(Class::Failure, now); + b.record(Class::Failure, now); // 3rd failure overall → trips + assert_eq!(b.admit(now), Admit::Reject, "ignored left the streak intact → trips"); + } + + #[test] + fn throttle_trips_immediately_on_the_long_cooldown() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + b.record(Class::TripNow, now); // one throttle → Open, no threshold needed + assert_eq!(b.admit(now), Admit::Reject); + assert_eq!( + b.admit(now + Duration::from_secs(30)), + Admit::Reject, + "the short cooldown is insufficient for a throttle trip" + ); + assert_eq!( + b.admit(now + Duration::from_secs(900)), + Admit::Pass, + "throttle_cooldown elapsed → first probe admitted" + ); + } + + #[test] + fn open_rejects_until_cooldown_then_admits_one_probe() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); // trips on the first failure + b.record(Class::Failure, now); + assert_eq!(b.admit(now), Admit::Reject); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass, "cooldown elapsed → first probe"); + assert_eq!(b.admit(after), Admit::Reject, "concurrency gate: no 2nd probe"); + } + + #[test] + fn half_open_probe_success_closes() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass); + b.record(Class::Success, after); + assert_eq!(b.admit(after), Admit::Pass, "probe succeeded → closed"); + } + + #[test] + fn half_open_probe_ignored_also_closes() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass); + b.record(Class::Ignored, after); // a 4xx probe still proves the host is reachable + assert_eq!(b.admit(after), Admit::Pass, "ignored probe → closed (no stuck half-open)"); + } + + #[test] + fn half_open_probe_failure_reopens() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass); + b.record(Class::Failure, after); // probe fails → reopen with a fresh cooldown + assert_eq!(b.admit(after), Admit::Reject, "re-opened"); + assert_eq!( + b.admit(after + Duration::from_secs(30)), + Admit::Pass, + "the fresh cooldown runs from the failed probe" + ); + } + + #[test] + fn multi_probe_half_open_requires_all_to_close() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 2)); // 2 probes per episode + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass, "probe 1"); + assert_eq!(b.admit(after), Admit::Pass, "probe 2"); + assert_eq!(b.admit(after), Admit::Reject, "no probe 3 (gate)"); + b.record(Class::Success, after); // 1 of 2 + assert_eq!(b.admit(after), Admit::Reject, "still half-open, awaiting the 2nd"); + b.record(Class::Success, after); // 2 of 2 → close + assert_eq!(b.admit(after), Admit::Pass, "both probes reached → closed"); + } +} +``` + +- [ ] **Step 2: Run it to verify it fails** + +Run: `just check` +Expected: FAIL — `cannot find type Breaker`/`Admit` in module `circuit_breaker`. + +- [ ] **Step 3: Implement the state machine** + +Extend the top-of-file `use` block with `Instant`: + +```rust +use std::time::{Duration, Instant}; +``` + +(Replace the existing `use std::time::Duration;` line from Task 2 with the combined import above — keep a single copy.) + +Insert the types **between** the `classify` fn and the `classify_tests` module: + +```rust +/// The breaker's state (ADR-0031 §5). `Instant` deadlines are compared against +/// `Timer::now()` by the async shell — the core itself never reads a clock. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BreakerState { + /// Passing requests; `consecutive_failures` counts toward the trip threshold. + Closed { consecutive_failures: u32 }, + /// Rejecting fast until `reopen_at`; then the next admit begins Half-Open. + Open { reopen_at: Instant }, + /// Probing: `probes_left` may still be admitted, `successes_needed` must reach + /// the host before the circuit closes. + HalfOpen { probes_left: u32, successes_needed: u32 }, +} + +/// The admission verdict for one call. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Admit { + /// Admit the call to the inner stack. + Pass, + /// Reject the call fast with `CircuitOpen` — the inner stack is not touched. + Reject, +} + +/// The pure circuit-breaker state machine (ADR-0031 §5). +/// +/// Clock-injected: every transition takes `now: Instant` as an input, so the whole +/// unit is table-testable with zero async. The async [`CircuitBreaker`] shell owns +/// the `Mutex` and the `Timer`; this type holds neither. +#[derive(Debug, Clone)] +pub(crate) struct Breaker { + state: BreakerState, + cfg: CircuitBreakerConfig, +} + +impl Breaker { + /// A fresh breaker starts Closed with no failures. + pub(crate) const fn new(cfg: CircuitBreakerConfig) -> Self { + Self { + state: BreakerState::Closed { + consecutive_failures: 0, + }, + cfg, + } + } + + /// Decide whether to admit a call now, transitioning Open→Half-Open lazily. + pub(crate) fn admit(&mut self, now: Instant) -> Admit { + match &mut self.state { + BreakerState::Closed { .. } => Admit::Pass, + BreakerState::Open { reopen_at } => { + if now >= *reopen_at { + // Cooldown elapsed → begin a Half-Open episode; THIS call is the + // first probe (so `probes_left` starts one short of the budget). + let probes = self.cfg.half_open_probes.get(); + self.state = BreakerState::HalfOpen { + probes_left: probes - 1, + successes_needed: probes, + }; + Admit::Pass + } else { + Admit::Reject + } + } + BreakerState::HalfOpen { probes_left, .. } => { + if *probes_left > 0 { + *probes_left -= 1; + Admit::Pass + } else { + Admit::Reject // concurrency gate: no more than `half_open_probes` in flight + } + } + } + } + + /// Record a classified outcome, transitioning as ADR-0031 §5 dictates. + pub(crate) fn record(&mut self, class: Class, now: Instant) { + match self.state { + BreakerState::Closed { + consecutive_failures, + } => match class { + Class::Failure => { + let n = consecutive_failures.saturating_add(1); + self.state = if n >= self.cfg.failure_threshold.get() { + BreakerState::Open { + reopen_at: now + self.cfg.cooldown, + } + } else { + BreakerState::Closed { + consecutive_failures: n, + } + }; + } + Class::TripNow => { + self.state = BreakerState::Open { + reopen_at: now + self.cfg.throttle_cooldown, + }; + } + Class::Ignored => {} // streak untouched — a 4xx/Auth neither trips nor resets + Class::Success => { + self.state = BreakerState::Closed { + consecutive_failures: 0, + }; + } + }, + BreakerState::HalfOpen { + probes_left, + successes_needed, + } => match class { + Class::Failure => { + self.state = BreakerState::Open { + reopen_at: now + self.cfg.cooldown, + }; + } + Class::TripNow => { + self.state = BreakerState::Open { + reopen_at: now + self.cfg.throttle_cooldown, + }; + } + // A reached-host probe (2xx/3xx or 4xx/Auth) resolves; the last one closes. + Class::Ignored | Class::Success => { + self.state = if successes_needed <= 1 { + BreakerState::Closed { + consecutive_failures: 0, + } + } else { + BreakerState::HalfOpen { + probes_left, + successes_needed: successes_needed - 1, + } + }; + } + }, + // A stale outcome from a call admitted before a concurrent trip; drop it. + // Never un-trips a freshly-opened circuit (single global v1 breaker). + BreakerState::Open { .. } => {} + } + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `just check && cargo test -p oath-adapter-net-http-api circuit_breaker && just lint` +Expected: PASS, warning-free. + +> Known risks: +> - **`now + self.cfg.cooldown`** is `Instant + Duration`. A config cooldown so large it overflows `Instant` is not a real config; if clippy/overflow is a concern, it panics only in debug on an absurd (~292-billion-year) duration — acceptable, matching how `MockTimer`/`RateLimit` add `Duration` to `Instant`. Do **not** introduce `checked_add` here unless a test demands it (it would force an `Option` with no sensible fallback). +> - **`probes - 1` / `successes_needed - 1`** never underflow: `admit` only enters the `probes - 1` branch with `probes = half_open_probes.get() ≥ 1`, and the `successes_needed - 1` branch is guarded by `successes_needed <= 1 → close` (so the `else` has `successes_needed ≥ 2`). If clippy `arithmetic_side_effects` (nursery) flags them, they are provably safe; prefer a clarifying comment over `saturating_sub` (which would hide a logic bug). +> - **`consecutive_failures.saturating_add(1)`** guards the degenerate `failure_threshold = u32::MAX` case with no panic. +> - `Breaker`/`BreakerState`/`Admit` are `pub(crate)` — no `missing_docs`/`missing_debug_implementations` obligation, but they carry docs + `Debug` anyway. + +- [ ] **Step 5: Commit** + +```bash +git add crates/adapter/net/http/api/src/circuit_breaker.rs +git commit -m "feat(net): pure clock-injected Breaker state machine (Closed/Open/Half-Open)" +``` + +--- + +## Task 4: `CircuitBreaker` layer — the async shell + +**Files:** +- Modify: `crates/adapter/net/http/api/src/circuit_breaker.rs` +- Modify: `crates/adapter/net/http/api/src/lib.rs` + +**Interfaces:** +- Consumes: `CircuitBreakerConfig`, `Breaker`, `Admit`, `classify` (Tasks 2–3); `HttpError`, `Service` (crate); `Layer`, `Timer` (`oath_adapter_net_api`). +- Produces: + - `oath_adapter_net_http_api::CircuitBreakerLayer` — `impl Layer` factory; `pub fn new(cfg: CircuitBreakerConfig, timer: T) -> Self` (**infallible**; constructs the single shared `Arc>`). + - `oath_adapter_net_http_api::CircuitBreaker` — the wrapping `Service`; for an inner `S: Service, Response = http::Response, Error = HttpError> + Sync` and `T: Timer`, it is `Service, Response = http::Response, Error = HttpError>` (body-transparent — same `B`, no `B: Body` bound). + +- [ ] **Step 1: Write the failing tests** + +Append a **new** `#[cfg(test)]` module below `breaker_tests` in `circuit_breaker.rs`: + +```rust +#[cfg(test)] +mod service_tests { + use super::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerLayer}; + use crate::{HttpError, Service}; + use bytes::Bytes; + use oath_adapter_net_api::{ErrorKind, Layer}; + use oath_adapter_net_mock::MockTimer; + use std::future::Future; + use std::num::NonZeroU32; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + // One scripted outcome per attempt. `Copy` so the leaf reads it by index. + #[derive(Clone, Copy)] + enum Step { + Err(ErrorKind), + Status(u16), + } + + fn err_of(kind: ErrorKind) -> HttpError { + match kind { + ErrorKind::Timeout => HttpError::Timeout, + ErrorKind::Connection => HttpError::connection("reset"), + ErrorKind::Throttled => HttpError::Throttled, + ErrorKind::Auth => HttpError::auth("expired"), + _ => HttpError::other("boom"), + } + } + + // An inline leaf yielding a scripted sequence of outcomes, counting calls. Once + // the script is exhausted it repeats the last step. Body is `()` — the breaker + // only reads `status()`, never the body. Inline (not `MockClient`) to avoid the + // net-http-mock -> net-http-api dev-dep cycle. + #[derive(Clone)] + struct ScriptLeaf { + steps: Arc>, + calls: Arc, + } + impl ScriptLeaf { + fn new(steps: Vec) -> Self { + Self { + steps: Arc::new(steps), + calls: Arc::new(AtomicUsize::new(0)), + } + } + fn calls(&self) -> usize { + self.calls.load(Ordering::Relaxed) + } + } + impl Service> for ScriptLeaf { + type Response = http::Response<()>; + type Error = HttpError; + #[allow(clippy::manual_async_fn)] + fn call( + &self, + _req: http::Request, + ) -> impl Future> + Send { + let i = self.calls.fetch_add(1, Ordering::Relaxed); + let step = self + .steps + .get(i) + .copied() + .unwrap_or_else(|| *self.steps.last().unwrap()); + async move { + match step { + Step::Err(kind) => Err(err_of(kind)), + Step::Status(code) => { + let mut resp = http::Response::new(()); + *resp.status_mut() = http::StatusCode::from_u16(code).unwrap(); + Ok(resp) + } + } + } + } + } + + fn cfg(threshold: u32, cooldown: Duration, throttle: Duration, probes: u32) -> CircuitBreakerConfig { + CircuitBreakerConfig { + failure_threshold: NonZeroU32::new(threshold).unwrap(), + cooldown, + throttle_cooldown: throttle, + half_open_probes: NonZeroU32::new(probes).unwrap(), + } + } + + fn secs(n: u64) -> Duration { + Duration::from_secs(n) + } + + fn bare_req() -> http::Request { + http::Request::new(Bytes::new()) + } + + #[tokio::test] + async fn trips_after_threshold_then_fast_rejects_without_touching_the_leaf() { + let leaf = ScriptLeaf::new(vec![Step::Err(ErrorKind::Connection)]); // always fails + let svc = CircuitBreakerLayer::new(cfg(3, secs(30), secs(900), 1), MockTimer::new()) + .layer(leaf.clone()); + for _ in 0..3 { + let _ = svc.call(bare_req()).await; // 3 consecutive failures trip it + } + assert_eq!(leaf.calls(), 3); + let err = svc.call(bare_req()).await.unwrap_err(); + assert!(matches!(err, HttpError::CircuitOpen)); + assert_eq!(leaf.calls(), 3, "an open circuit fast-rejects; the leaf is untouched"); + } + + #[tokio::test] + async fn a_single_429_trips_immediately_on_the_long_cooldown() { + let timer = MockTimer::new(); + let leaf = ScriptLeaf::new(vec![Step::Status(429), Step::Status(200)]); + let svc = CircuitBreakerLayer::new(cfg(3, secs(30), secs(900), 1), timer.clone()) + .layer(leaf.clone()); + let resp = svc.call(bare_req()).await.expect("429 returns as Ok"); + assert_eq!(resp.status(), http::StatusCode::TOO_MANY_REQUESTS); + assert!( + matches!(svc.call(bare_req()).await.unwrap_err(), HttpError::CircuitOpen), + "one 429 trips the circuit" + ); + timer.advance(secs(30)); // the SHORT cooldown is not enough for a throttle trip + assert!(matches!(svc.call(bare_req()).await.unwrap_err(), HttpError::CircuitOpen)); + timer.advance(secs(900)); // now past throttle_cooldown + let resp = svc.call(bare_req()).await.expect("probe admitted, leaf returns 200"); + assert_eq!(resp.status(), http::StatusCode::OK); + assert_eq!(leaf.calls(), 2, "one 429 + one probe; the fast-rejects never hit the leaf"); + } + + #[tokio::test] + async fn recovers_when_the_cooldown_probe_succeeds() { + let timer = MockTimer::new(); + let leaf = ScriptLeaf::new(vec![ + Step::Err(ErrorKind::Timeout), + Step::Err(ErrorKind::Timeout), + Step::Status(200), + ]); + let svc = CircuitBreakerLayer::new(cfg(2, secs(30), secs(900), 1), timer.clone()) + .layer(leaf.clone()); + let _ = svc.call(bare_req()).await; // fail 1 + let _ = svc.call(bare_req()).await; // fail 2 → Open + assert!(matches!(svc.call(bare_req()).await.unwrap_err(), HttpError::CircuitOpen)); + timer.advance(secs(30)); + let ok = svc.call(bare_req()).await.expect("probe hits the leaf → 200"); + assert_eq!(ok.status(), http::StatusCode::OK); + let ok2 = svc.call(bare_req()).await.expect("closed → next call flows"); + assert_eq!(ok2.status(), http::StatusCode::OK); + assert_eq!(leaf.calls(), 4, "2 failures + 2 post-recovery sends; rejects skip the leaf"); + } + + #[tokio::test] + async fn reopens_when_the_cooldown_probe_fails() { + let timer = MockTimer::new(); + let leaf = ScriptLeaf::new(vec![ + Step::Err(ErrorKind::Connection), + Step::Err(ErrorKind::Connection), + Step::Status(503), + ]); + let svc = CircuitBreakerLayer::new(cfg(2, secs(30), secs(900), 1), timer.clone()) + .layer(leaf.clone()); + let _ = svc.call(bare_req()).await; + let _ = svc.call(bare_req()).await; // Open + assert!(matches!(svc.call(bare_req()).await.unwrap_err(), HttpError::CircuitOpen)); + timer.advance(secs(30)); + let resp = svc.call(bare_req()).await.expect("probe returns a 503 as Ok"); + assert_eq!(resp.status(), 503); + assert!( + matches!(svc.call(bare_req()).await.unwrap_err(), HttpError::CircuitOpen), + "the probe failed → re-opened" + ); + assert_eq!(leaf.calls(), 3); + } + + #[tokio::test] + async fn clones_from_one_layer_share_the_breaker() { + let leaf = ScriptLeaf::new(vec![Step::Err(ErrorKind::Connection)]); + let layer = CircuitBreakerLayer::new(cfg(2, secs(30), secs(900), 1), MockTimer::new()); + let a = layer.layer(leaf.clone()); + let b = a.clone(); // shares the Arc> + let _ = a.call(bare_req()).await; // fail 1 via A + let _ = a.call(bare_req()).await; // fail 2 via A → Open + assert!( + matches!(b.call(bare_req()).await.unwrap_err(), HttpError::CircuitOpen), + "clone B observes A's trip (single per-host breaker)" + ); + } +} +``` + +- [ ] **Step 2: Run it to verify it fails** + +Run: `just check` +Expected: FAIL — `cannot find type CircuitBreaker`/`CircuitBreakerLayer` in module `circuit_breaker`. + +- [ ] **Step 3: Implement the shell** + +Extend the top-of-file `use` block (merge with the Task 2/3 imports — keep one copy of each): + +```rust +use crate::{HttpError, Service}; +use bytes::Bytes; +use oath_adapter_net_api::{ErrorKind, HasErrorKind, Layer, Timer}; +use std::fmt; +use std::future::Future; +use std::num::NonZeroU32; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +``` + +Insert the layer + service **between** the `Breaker` impl (Task 3) and the `classify_tests` module: + +```rust +/// The `CircuitBreaker` [`Layer`] factory: holds the single shared breaker + clock. +/// +/// `new` constructs the breaker **once** into an `Arc>`; every service it +/// produces (and every clone) shares it — a single per-host breaker (ADR-0031 §5). +pub struct CircuitBreakerLayer { + breaker: Arc>, + timer: T, +} + +impl CircuitBreakerLayer { + /// Build the layer from thresholds and a [`Timer`] clock. + /// + /// **Infallible** — `NonZeroU32` makes the two counts "≥ 1" a type invariant + /// (contrast `RateLimitLayer::new`, which validates a config map). Not `const`: + /// it allocates the shared `Arc>`. + #[must_use] + pub fn new(cfg: CircuitBreakerConfig, timer: T) -> Self { + Self { + breaker: Arc::new(Mutex::new(Breaker::new(cfg))), + timer, + } + } +} + +impl Clone for CircuitBreakerLayer { + fn clone(&self) -> Self { + Self { + breaker: Arc::clone(&self.breaker), + timer: self.timer.clone(), + } + } +} + +impl fmt::Debug for CircuitBreakerLayer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CircuitBreakerLayer").finish_non_exhaustive() + } +} + +impl Layer for CircuitBreakerLayer { + type Service = CircuitBreaker; + + fn layer(&self, inner: S) -> CircuitBreaker { + CircuitBreaker { + inner, + breaker: Arc::clone(&self.breaker), + timer: self.timer.clone(), + } + } +} + +/// The `CircuitBreaker` middleware: fast-rejects while Open, else forwards. +/// +/// A thin shell over the pure [`Breaker`]: it locks briefly to `admit` (using +/// `timer.now()`), releases the lock, runs `inner.call` (or returns `CircuitOpen`), +/// then locks briefly to `record` the classified outcome. The lock is **never** +/// held across the `await`. Body-transparent — `http::Response` is forwarded. +pub struct CircuitBreaker { + inner: S, + breaker: Arc>, + timer: T, +} + +impl Clone for CircuitBreaker { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + breaker: Arc::clone(&self.breaker), + timer: self.timer.clone(), + } + } +} + +impl fmt::Debug for CircuitBreaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CircuitBreaker").finish_non_exhaustive() + } +} + +impl Service> for CircuitBreaker +where + S: Service, Response = http::Response, Error = HttpError> + Sync, + T: Timer, +{ + type Response = http::Response; + type Error = HttpError; + + // Not `async fn`: the trait requires the returned future to be `Send`. + #[allow(clippy::manual_async_fn)] + fn call( + &self, + req: http::Request, + ) -> impl Future> + Send { + async move { + // Admit decision under a short lock (released at the end of this block). + let admit = { + let now = self.timer.now(); + let mut breaker = self + .breaker + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + breaker.admit(now) + }; + if let Admit::Reject = admit { + return Err(HttpError::CircuitOpen); // fast reject — the leaf is not touched + } + + let outcome = self.inner.call(req).await; // NO lock held across the await + + // Record the classified outcome under a second short lock. + let class = classify(&outcome); + { + let now = self.timer.now(); + let mut breaker = self + .breaker + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + breaker.record(class, now); + } + outcome + } + } +} +``` + +In `lib.rs`, extend the Task 2 re-export: + +```rust +pub use circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerLayer}; +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `just check && cargo test -p oath-adapter-net-http-api circuit_breaker && just lint` +Expected: PASS, warning-free. + +> Known risks: +> - **No `B: Send` bound is needed** (unlike `Retry`): the only `.await` is `inner.call(req)`, and `outcome` is created *after* it with no subsequent await, so `B` never crosses an await point. If a future rustc generator analysis nonetheless demands it, add `B: Send` to the `where` clause (as `Retry`/`RateLimit` carry) — harmless. +> - **`S: Sync`** because the returned `Send` future borrows `&self`; `T: Sync` via `Timer`, and `Arc>: Sync`. Same bound the sibling layers carry. +> - **The `Mutex` guard must not cross the `await`** — each `lock()` is confined to its own `{ … }` block that ends before `inner.call(req).await` (admit block) or contains no await (record block). If clippy `await_holding_lock` fires, a guard escaped its block — re-scope it. +> - **`CircuitBreakerLayer::new` is intentionally not `const`** (it calls `Arc::new`). If clippy `missing_const_for_fn` flags it, that is a false positive here — leave it non-`const`. +> - The tests need **no** `spawn`/`yield`/`drain` (contrast `retry.rs`): the breaker never sleeps, so every `svc.call(...).await` resolves synchronously against the leaf; `MockTimer::advance` only moves `now()` for the lazy Open→Half-Open check between calls. + +- [ ] **Step 5: Commit** + +```bash +git add crates/adapter/net/http/api/src/circuit_breaker.rs crates/adapter/net/http/api/src/lib.rs +git commit -m "feat(net): CircuitBreaker layer — Arc> Service shell over the leaf" +``` + +--- + +## Task 5: ADR amendment, CHANGELOG, full gate, PR + +**Files:** +- Modify: `docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md` +- Modify: `CHANGELOG.md` + +- [ ] **Step 1: ADR-0034 append-only amendment** + +Append to ADR-0034's **Amendments (2026-07-04)** numbered list (after item 8, the `Retry` note) a new item 9: + +```markdown +9. **`CircuitBreaker` layer (Slice 1 PR 4).** The `CircuitBreaker` layer + + `CircuitBreakerLayer` factory add the **reactive** backstop to `RateLimit`'s + proactive pacing (ADR-0031 §5). A pure, clock-injected `Breaker` state machine + (Closed/Open/Half-Open) — table-tested with zero async — sits behind a thin + `Arc>` + `Timer` Service shell. It trips **Open** on + `CircuitBreakerConfig::failure_threshold` consecutive `Connection`/`Timeout`/`5xx` + failures, or **immediately** on a `Throttled`/429 with the long `throttle_cooldown` + (IBKR's penalty box); while Open it **fast-rejects** with a **new non-retryable + `HttpError::CircuitOpen` / `ErrorKind::CircuitOpen`** without touching the inner + stack; after the cooldown it admits `half_open_probes` **Half-Open** probes (a + reached-host outcome closes, a failure re-opens). Outcomes are a **4-class + partition**: `Connection`/`Timeout`/`5xx` → *Failure*; `Throttled`/429 → + *TripNow*; `4xx`/`Auth`/`Unknown` → *Ignored* (never trips **and never resets** — + so an interleave cannot mask a building outage; an `Auth` error must not trip the + gateway); `2xx`/`3xx` → *Success*. `failure_threshold`/`half_open_probes` are + `NonZeroU32` (typing §5's `u32` — "≥ 1" a type invariant, infallible `new`). A + **single per-host** breaker shared behind `Arc`; **consecutive-count** for v1; + `now()`-only timing (lazy Open→Half-Open, no sleep, no `futures-util`, no new + dependency). It sits **outside `Retry`**, counting logical post-retry outcomes. + Deferred: the resilience4j fail-safe `Unknown → Failure`, rolling-window counting, + per-key breakers, and a breaker-state observation watch. +``` + +- [ ] **Step 2: CHANGELOG** + +Add to `CHANGELOG.md` `[Unreleased] → Added` (after the Retry resilience-layer entry #82): + +```markdown +- `oath-adapter-net-http-api` `CircuitBreaker` resilience layer (Slice 1 PR 4) — the + `CircuitBreaker` service + `CircuitBreakerLayer` factory (`net-api::Layer`): + the reactive backstop to `RateLimit`. Trips Open after `failure_threshold` consecutive + `Connection`/`Timeout`/`5xx` failures, or immediately on a `Throttled`/429 with the long + `throttle_cooldown`; fast-rejects with a new non-retryable `HttpError::CircuitOpen` + (mapped to a new `ErrorKind::CircuitOpen`) without touching the inner stack; admits + bounded Half-Open probes after cooldown (reached-host closes, failure re-opens). Pure + clock-injected `Breaker` state machine (Closed/Open/Half-Open) behind a thin + `Arc>` + `Timer` shell; single per-host breaker; `now()`-only (no sleep, + no new dependency). 4-class outcome partition so `4xx`/`Auth`/unclassified errors neither + trip nor mask an outage. (ADR-0031 §5, ADR-0034.) +``` + +- [ ] **Step 3: Full local gate** + +Run: `just ci` +Expected: green (fmt, lint, test + doctests, doc, deny, typos, machete — no new dep, so `deny`/`machete` are unaffected). + +- [ ] **Step 4: Commit, push, PR** + +```bash +git add docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md CHANGELOG.md +git commit -m "docs(net): record CircuitBreaker layer amendment (ADR-0034 #9) + changelog" +git push -u origin feat/net-http-circuit-breaker +gh pr create \ + --title "feat(net): CircuitBreaker resilience layer (Slice 1, PR 4)" \ + --body "Closes # + +Slice 1 **PR 4** of the net-http resilience layers (spec: docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md; ADR-0031 §5). Builds on RateLimit (#76), Timeout (#78), Retry (#82). + +- **\`CircuitBreaker\`** + **\`CircuitBreakerLayer\`** (\`net-api::Layer\`) — the **reactive** backstop to \`RateLimit\`. Trips **Open** after \`failure_threshold\` consecutive \`Connection\`/\`Timeout\`/\`5xx\` failures, or **immediately** on a \`Throttled\`/429 with the long \`throttle_cooldown\` (IBKR's penalty box); **fast-rejects** with a new non-retryable **\`HttpError::CircuitOpen\`** / **\`ErrorKind::CircuitOpen\`** without touching the inner stack; admits bounded **Half-Open** probes after cooldown (reached-host closes, failure re-opens). +- **Pure clock-injected \`Breaker\`** state machine (Closed/Open/Half-Open), table-tested with zero async; thin **\`Arc>\`** + \`Timer\` Service shell, **single per-host** breaker. +- **4-class outcome partition** (Failure / TripNow / Ignored / Success): \`4xx\`/\`Auth\`/unclassified errors neither trip **nor reset** the streak, so an interleave cannot mask a building outage; \`Unknown → Ignored\` for v1 (resilience4j fail-safe recorded as a future improvement). +- **Body-transparent**; sits **outside \`Retry\`** (counts logical post-retry outcomes). \`now()\`-only timing — **no sleep, no \`futures-util\`, no new dependency**; the \`net-api\` contract crate gains one \`ErrorKind\` variant. Recorded as **ADR-0034 Amendment #9**. + +MockTimer-driven tests: pure-\`Breaker\` table tests (threshold trip, streak reset, anti-masking, throttle cooldown, half-open close/reopen, concurrency gate) + Service integration (fast-reject with the leaf frozen, immediate-429 trip, cooldown recovery/reopen, shared state across clones). + +Next: **Slice 1 PR 5** — the \`Tracing\` layer. + +🤖 Generated with [Claude Code](https://claude.com/claude-code)" +``` + +Expected: PR open, GitHub Actions CI green (same `just ci` + MSRV job). + +--- + +## Self-Review + +**Spec coverage (design doc §Scope + Decisions):** +- `CircuitBreaker` + `CircuitBreakerLayer` (`Layer`), infallible `new`, single-per-host `Arc` sharing — Task 4. ✅ +- `CircuitBreakerConfig` (`failure_threshold: NonZeroU32`, `cooldown`, `throttle_cooldown`, `half_open_probes: NonZeroU32`) — Task 2. ✅ +- Pure `Breaker`/`BreakerState` core, `admit(now) -> Admit`, `record(class, now)`, the full transition matrix — Task 3 (+ pure table tests). ✅ +- 4-class `classify` partition (Failure/TripNow/Ignored/Success; `Unknown → Ignored`) — Task 2 (+ `classify_tests`). ✅ +- New `HttpError::CircuitOpen` + new `ErrorKind::CircuitOpen`, non-retryable — Task 1. ✅ +- Thin shell: admit under lock → release → `inner.call().await` → record under lock; lock never across await; poison recovered not unwrapped; `now()`-only, no sleep — Task 4. ✅ +- Fast-reject leaves the leaf untouched; body-transparent (same `B`, no `B: Body` bound); `S: Sync` — Task 4. ✅ +- Sits outside `Retry` (counts logical outcomes) — documented in the module doc + amendment (composition is Slice 2, correctly not built here). ✅ +- ADR-0034 Amendment #9 + CHANGELOG — Task 5. ✅ +- Deferred (correctly absent): `Unknown → Failure` fail-safe, rolling-window counting, per-key breakers, state-watch, `stack()`/`build()` assembly, `Tracing` — noted, not built. ✅ + +**Placeholder scan:** none — every step carries actual code or an actual command with expected output (`#` is the PR-time issue number, per house convention; `#82` in the CHANGELOG anchor is the merged Retry PR). + +**Type consistency:** +- `CircuitBreakerConfig { failure_threshold: NonZeroU32, cooldown, throttle_cooldown, half_open_probes: NonZeroU32 }` — identical in Task 2's def, Task 3's `Breaker::{new, admit, record}` field reads, and both test `cfg(...)` helpers. +- `Class { Failure, TripNow, Ignored, Success }` — defined Task 2; consumed by Task 3's `record` (all four arms in Closed and Half-Open) and asserted in `classify_tests`. +- `Admit { Pass, Reject }` — defined Task 3; produced by `admit`, matched in Task 4's shell (`if let Admit::Reject`). +- `Breaker::{new(cfg) -> Self, admit(&mut self, Instant) -> Admit, record(&mut self, Class, Instant)}` — defined Task 3, used by Task 4's shell and Task 3's tests. +- `classify::(&Result, HttpError>) -> Class` — defined Task 2, called in Task 4's shell (`classify(&outcome)`) and `classify_tests`. +- `CircuitBreakerLayer::new(CircuitBreakerConfig, T) -> Self` + `.layer(inner) -> CircuitBreaker` — match the `Interfaces` block and every `service_tests` call. +- `CircuitBreaker` `Service` impl: inner `Response = http::Response` → `Response = http::Response` (transparent) — matches `ScriptLeaf` (`B = ()`). +- `HttpError::CircuitOpen` / `ErrorKind::CircuitOpen` — defined Task 1, produced in Task 4's shell, asserted in Task 1's `kind_maps_each_variant` row and every `service_tests` reject assertion. +- `lib.rs` re-export accumulates to `pub use circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerLayer};`; `Class`/`classify`/`Breaker`/`BreakerState`/`Admit` stay crate-private (not re-exported). + +**Known risks to watch during impl:** listed inline — Task 1 Step 4 (non-exhaustive `match` on the widened `#[non_exhaustive]` enum), Task 3 Step 4 (`Instant + Duration`, provably-safe `- 1`, `saturating_add`), Task 4 Step 4 (no `B: Send`, `S: Sync`, `await_holding_lock` re-scoping, non-`const` `new`, no spawn/drain in tests). From e3a58e6ee07cc49d7bd22ccac759410243b35e4e Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 19:40:53 +0000 Subject: [PATCH 03/11] feat(net): add CircuitOpen error kind + HttpError::CircuitOpen Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/adapter/net/api/src/error_kind.rs | 6 ++++++ crates/adapter/net/http/api/src/error.rs | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/crates/adapter/net/api/src/error_kind.rs b/crates/adapter/net/api/src/error_kind.rs index 32c5b81..3663e78 100644 --- a/crates/adapter/net/api/src/error_kind.rs +++ b/crates/adapter/net/api/src/error_kind.rs @@ -33,6 +33,12 @@ pub enum ErrorKind { /// The error does not fit any other category. Unknown, + + /// A circuit breaker rejected the request without sending it — the breaker is + /// Open after prior failures (or a throttle) and is failing fast until its + /// cooldown elapses. A deliberate local decision, not a transport outcome; + /// non-retryable. + CircuitOpen, } /// Implemented by error types that can be classified as an [`ErrorKind`]. diff --git a/crates/adapter/net/http/api/src/error.rs b/crates/adapter/net/http/api/src/error.rs index 37a98c7..a555bcf 100644 --- a/crates/adapter/net/http/api/src/error.rs +++ b/crates/adapter/net/http/api/src/error.rs @@ -31,6 +31,9 @@ pub enum HttpError { /// A backend error that does not fit another variant. #[error("network error")] Other(#[source] BoxError), + /// The circuit breaker is open — the request was rejected without being sent. + #[error("circuit open: request rejected without being sent")] + CircuitOpen, } impl HttpError { @@ -61,6 +64,7 @@ impl HasErrorKind for HttpError { Self::Throttled => ErrorKind::Throttled, Self::Auth(_) => ErrorKind::Auth, Self::Other(_) => ErrorKind::Unknown, + Self::CircuitOpen => ErrorKind::CircuitOpen, } } } @@ -77,6 +81,7 @@ mod tests { assert_eq!(HttpError::Throttled.kind(), ErrorKind::Throttled); assert_eq!(HttpError::auth("expired").kind(), ErrorKind::Auth); assert_eq!(HttpError::other("boom").kind(), ErrorKind::Unknown); + assert_eq!(HttpError::CircuitOpen.kind(), ErrorKind::CircuitOpen); } #[test] From bf80c44a88602d03e55f44fb87f4afb49641d38a Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 19:46:24 +0000 Subject: [PATCH 04/11] feat(net): CircuitBreakerConfig + outcome classifier for the breaker Co-Authored-By: Claude Opus 4.8 (1M context) --- .../net/http/api/src/circuit_breaker.rs | 140 ++++++++++++++++++ crates/adapter/net/http/api/src/lib.rs | 4 + 2 files changed, 144 insertions(+) create mode 100644 crates/adapter/net/http/api/src/circuit_breaker.rs diff --git a/crates/adapter/net/http/api/src/circuit_breaker.rs b/crates/adapter/net/http/api/src/circuit_breaker.rs new file mode 100644 index 0000000..4449c0a --- /dev/null +++ b/crates/adapter/net/http/api/src/circuit_breaker.rs @@ -0,0 +1,140 @@ +//! The `CircuitBreaker` resilience layer (ADR-0031 §5): the reactive 429/outage +//! backstop to `RateLimit`'s proactive pacing. +//! +//! `RateLimit` tries never to hit a 429; `CircuitBreaker` stops cold if the host +//! fails anyway. It trips **Open** after [`CircuitBreakerConfig::failure_threshold`] +//! consecutive transport failures (`HttpError::{Connection, Timeout}` or a `5xx` +//! response), or **immediately** on a `Throttled`/429 with the long +//! [`CircuitBreakerConfig::throttle_cooldown`] (IBKR's ~15-minute penalty box). +//! While Open it **fast-rejects** every request with a non-retryable +//! [`HttpError::CircuitOpen`](crate::HttpError::CircuitOpen) — the inner stack is +//! never touched. After the cooldown a bounded number of **Half-Open** probes test +//! recovery: a reached-host response closes the circuit, a failure re-opens it. +//! +//! The state machine lives in a pure, clock-injected [`Breaker`] (transitions take +//! `now: Instant` as an input, table-tested with zero async); the [`CircuitBreaker`] +//! service is a thin `Arc>` + [`Timer`](oath_adapter_net_api::Timer) +//! shell. A **single per-host** breaker is shared behind `Arc`. Runtime-neutral and +//! `now()`-only — the breaker never sleeps (Open→Half-Open is a lazy comparison on +//! the next admit), so there is no timer race and no new dependency. Body-transparent +//! — `http::Response` is forwarded untouched. + +use crate::HttpError; +use oath_adapter_net_api::{ErrorKind, HasErrorKind}; +use std::num::NonZeroU32; +use std::time::Duration; + +/// The circuit breaker's thresholds, as plain `Copy` data (ADR-0031 §5). +/// +/// `failure_threshold` and `half_open_probes` are `NonZeroU32`: "≥ 1" is a type +/// invariant, so [`CircuitBreakerLayer::new`](crate::CircuitBreakerLayer) needs no +/// `Result` (a `0` threshold is nonsense and `0` probes would leave a tripped +/// circuit stuck Open forever). This types §5's `u32` sketch more precisely. +#[derive(Debug, Clone, Copy)] +pub struct CircuitBreakerConfig { + /// Consecutive failures in the Closed state that trip the circuit Open. + pub failure_threshold: NonZeroU32, + /// The cooldown before Half-Open probing after a failure-threshold trip. + pub cooldown: Duration, + /// The (longer) cooldown after a `Throttled`/429 trip — the penalty box. + pub throttle_cooldown: Duration, + /// Probes admitted per Half-Open episode; all must reach the host to close. + pub half_open_probes: NonZeroU32, +} + +/// The breaker-relevant classification of one call outcome (pure, state-independent). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(dead_code)] +pub(crate) enum Class { + /// A genuine transport/server failure — advances the Closed trip counter. + Failure, + /// A throttle/429 — trips the circuit **immediately** on the long cooldown. + TripNow, + /// Neither a failure nor a trip (4xx, `Auth`, unclassified) — a no-op in Closed; + /// resolves a Half-Open probe (a reached host proves recovery). + Ignored, + /// A healthy `2xx`/`3xx` response — resets the streak / resolves a probe. + Success, +} + +/// Classify a call outcome for the breaker (ADR-0031 §5). +/// +/// Only genuine transport failures (`Connection`/`Timeout`) and `5xx` are +/// `Failure`; `Throttled`/429 is `TripNow`; a `4xx`/`Auth`/unclassified error is +/// `Ignored` (never trips **and never resets** — so an interleave cannot mask a +/// building outage); `2xx`/`3xx` is `Success`. `Unknown → Ignored` is the +/// conservative v1 default (the resilience4j fail-safe `Unknown → Failure` is a +/// documented future improvement). +#[allow(dead_code)] +pub(crate) fn classify(outcome: &Result, HttpError>) -> Class { + match outcome { + Err(e) => match e.kind() { + ErrorKind::Connection | ErrorKind::Timeout | ErrorKind::Server => Class::Failure, + ErrorKind::Throttled => Class::TripNow, + // Auth, Client, Unknown, CircuitOpen — and any future kind — are Ignored. + _ => Class::Ignored, + }, + Ok(resp) => { + let s = resp.status(); + if s == http::StatusCode::TOO_MANY_REQUESTS { + Class::TripNow + } else if s.is_server_error() { + Class::Failure + } else if s.is_client_error() { + Class::Ignored + } else { + Class::Success + } + }, + } +} + +#[cfg(test)] +mod classify_tests { + use super::{Class, classify}; + use crate::HttpError; + + #[allow(clippy::unnecessary_wraps)] + fn ok(status: u16) -> Result, HttpError> { + let mut r = http::Response::new(()); + *r.status_mut() = http::StatusCode::from_u16(status).unwrap(); + Ok(r) + } + + #[test] + fn transport_errors_and_5xx_are_failures() { + assert_eq!(classify::<()>(&Err(HttpError::Timeout)), Class::Failure); + assert_eq!( + classify::<()>(&Err(HttpError::connection("reset"))), + Class::Failure + ); + assert_eq!(classify(&ok(500)), Class::Failure); + assert_eq!(classify(&ok(503)), Class::Failure); + } + + #[test] + fn throttle_and_429_trip_now() { + assert_eq!(classify::<()>(&Err(HttpError::Throttled)), Class::TripNow); + assert_eq!(classify(&ok(429)), Class::TripNow); + } + + #[test] + fn client_errors_auth_and_unknown_are_ignored() { + assert_eq!(classify(&ok(400)), Class::Ignored); + assert_eq!(classify(&ok(404)), Class::Ignored); + assert_eq!( + classify::<()>(&Err(HttpError::auth("expired"))), + Class::Ignored + ); + assert_eq!( + classify::<()>(&Err(HttpError::other("boom"))), + Class::Ignored + ); + } + + #[test] + fn success_statuses_are_success() { + assert_eq!(classify(&ok(200)), Class::Success); + assert_eq!(classify(&ok(301)), Class::Success); + } +} diff --git a/crates/adapter/net/http/api/src/lib.rs b/crates/adapter/net/http/api/src/lib.rs index 203793e..2d08dbe 100644 --- a/crates/adapter/net/http/api/src/lib.rs +++ b/crates/adapter/net/http/api/src/lib.rs @@ -14,6 +14,8 @@ //! the `RateScope`/`Scope` per-request directive //! - [`retry`] — the `Retry` layer, its `RetryLayer` factory, and the //! `Retryable`/`RetryConfig` retry directive + schedule +//! - [`circuit_breaker`] — the `CircuitBreaker` layer, its `CircuitBreakerLayer` +//! factory, and the `CircuitBreakerConfig` thresholds //! - [`timeout`] — the `Timeout` layer, its `TimeoutLayer` factory, and the //! `RequestTimeout` per-request override //! @@ -23,6 +25,7 @@ pub mod auth; pub mod body; +pub mod circuit_breaker; pub mod client; pub mod error; pub mod rate; @@ -33,6 +36,7 @@ pub mod timeout; pub use auth::{Auth, AuthSource, NoAuth, SetHeaders}; pub use body::{BufferMode, Guarded, ResponseBody}; +pub use circuit_breaker::CircuitBreakerConfig; pub use client::HttpClient; pub use error::{BoxError, HttpError}; pub use rate::{ From b9986195c68dd883c8d10dc327cb892b951eee9c Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 19:56:29 +0000 Subject: [PATCH 05/11] docs(net): add just doc + dead_code-cleanup to CB plan verification Co-Authored-By: Claude Opus 4.8 (1M context) --- .../plans/2026-07-04-net-http-circuitbreaker-layer.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md b/docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md index f03b6c0..0159b4a 100644 --- a/docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md +++ b/docs/superpowers/plans/2026-07-04-net-http-circuitbreaker-layer.md @@ -642,7 +642,7 @@ impl Breaker { - [ ] **Step 4: Run tests to verify they pass** -Run: `just check && cargo test -p oath-adapter-net-http-api circuit_breaker && just lint` +Run: `just check && cargo test -p oath-adapter-net-http-api circuit_breaker && just lint && just doc` Expected: PASS, warning-free. > Known risks: @@ -650,6 +650,7 @@ Expected: PASS, warning-free. > - **`probes - 1` / `successes_needed - 1`** never underflow: `admit` only enters the `probes - 1` branch with `probes = half_open_probes.get() ≥ 1`, and the `successes_needed - 1` branch is guarded by `successes_needed <= 1 → close` (so the `else` has `successes_needed ≥ 2`). If clippy `arithmetic_side_effects` (nursery) flags them, they are provably safe; prefer a clarifying comment over `saturating_sub` (which would hide a logic bug). > - **`consecutive_failures.saturating_add(1)`** guards the degenerate `failure_threshold = u32::MAX` case with no panic. > - `Breaker`/`BreakerState`/`Admit` are `pub(crate)` — no `missing_docs`/`missing_debug_implementations` obligation, but they carry docs + `Debug` anyway. +> - **`#[allow(dead_code)]` on the new pure items.** Like Task 2's `Class`/`classify`, the lib target sees `Breaker`/`BreakerState`/`Admit` and their methods as unused (they are consumed only by `breaker_tests` and by Task 4's Service), so `just lint`'s `--all-targets` scope fails with `dead_code`. Add `#[allow(dead_code)]` where needed to keep `just lint` green; Task 4 deletes them all once the Service references them in non-test code. - [ ] **Step 5: Commit** @@ -1021,7 +1022,7 @@ pub use circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerLa - [ ] **Step 4: Run tests to verify they pass** -Run: `just check && cargo test -p oath-adapter-net-http-api circuit_breaker && just lint` +Run: `just check && cargo test -p oath-adapter-net-http-api circuit_breaker && just lint && just doc` Expected: PASS, warning-free. > Known risks: @@ -1030,6 +1031,8 @@ Expected: PASS, warning-free. > - **The `Mutex` guard must not cross the `await`** — each `lock()` is confined to its own `{ … }` block that ends before `inner.call(req).await` (admit block) or contains no await (record block). If clippy `await_holding_lock` fires, a guard escaped its block — re-scope it. > - **`CircuitBreakerLayer::new` is intentionally not `const`** (it calls `Arc::new`). If clippy `missing_const_for_fn` flags it, that is a false positive here — leave it non-`const`. > - The tests need **no** `spawn`/`yield`/`drain` (contrast `retry.rs`): the breaker never sleeps, so every `svc.call(...).await` resolves synchronously against the leaf; `MockTimer::advance` only moves `now()` for the lazy Open→Half-Open check between calls. +> - **Remove the transitional `#[allow(dead_code)]` allows.** Task 2 put `#[allow(dead_code)]` on `Class`/`classify` and Task 3 put them on `Breaker`/`BreakerState`/`Admit` (and their methods) because the lib target saw them as unused. This task's Service wires `classify`, `Breaker::{new, admit, record}`, `Class`, and `Admit` into non-test code, so those items are now reachable — **delete every `#[allow(dead_code)]`** from `circuit_breaker.rs` and confirm `just lint` stays green (an `allow` of a now-non-firing lint is untidy, though `allow` — unlike `expect` — does not itself warn). Optionally upgrade the module-doc `` `Breaker` ``/`` `CircuitBreaker` `` and the `CircuitBreakerConfig`-doc `` `CircuitBreakerLayer::new` `` from plain code spans to intra-doc links now that the targets exist — then re-run `just doc`. +> - **Run `just doc`** before committing — the module doc links resolve only once these types exist; net-http layer PRs have repeatedly shipped broken rustdoc links that `check`/`lint`/`test` miss. - [ ] **Step 5: Commit** From 741812e66a4160deea5aeb3194cca641db08042a Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 19:59:46 +0000 Subject: [PATCH 06/11] fix(net): resolve rustdoc links + document Server classification Co-Authored-By: Claude Opus 4.8 (1M context) --- .../net/http/api/src/circuit_breaker.rs | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/crates/adapter/net/http/api/src/circuit_breaker.rs b/crates/adapter/net/http/api/src/circuit_breaker.rs index 4449c0a..1ca6fc9 100644 --- a/crates/adapter/net/http/api/src/circuit_breaker.rs +++ b/crates/adapter/net/http/api/src/circuit_breaker.rs @@ -7,12 +7,12 @@ //! response), or **immediately** on a `Throttled`/429 with the long //! [`CircuitBreakerConfig::throttle_cooldown`] (IBKR's ~15-minute penalty box). //! While Open it **fast-rejects** every request with a non-retryable -//! [`HttpError::CircuitOpen`](crate::HttpError::CircuitOpen) — the inner stack is +//! [`HttpError::CircuitOpen`] — the inner stack is //! never touched. After the cooldown a bounded number of **Half-Open** probes test //! recovery: a reached-host response closes the circuit, a failure re-opens it. //! -//! The state machine lives in a pure, clock-injected [`Breaker`] (transitions take -//! `now: Instant` as an input, table-tested with zero async); the [`CircuitBreaker`] +//! The state machine lives in a pure, clock-injected `Breaker` (transitions take +//! `now: Instant` as an input, table-tested with zero async); the `CircuitBreaker` //! service is a thin `Arc>` + [`Timer`](oath_adapter_net_api::Timer) //! shell. A **single per-host** breaker is shared behind `Arc`. Runtime-neutral and //! `now()`-only — the breaker never sleeps (Open→Half-Open is a lazy comparison on @@ -27,7 +27,7 @@ use std::time::Duration; /// The circuit breaker's thresholds, as plain `Copy` data (ADR-0031 §5). /// /// `failure_threshold` and `half_open_probes` are `NonZeroU32`: "≥ 1" is a type -/// invariant, so [`CircuitBreakerLayer::new`](crate::CircuitBreakerLayer) needs no +/// invariant, so `CircuitBreakerLayer::new` needs no /// `Result` (a `0` threshold is nonsense and `0` probes would leave a tripped /// circuit stuck Open forever). This types §5's `u32` sketch more precisely. #[derive(Debug, Clone, Copy)] @@ -59,19 +59,24 @@ pub(crate) enum Class { /// Classify a call outcome for the breaker (ADR-0031 §5). /// -/// Only genuine transport failures (`Connection`/`Timeout`) and `5xx` are -/// `Failure`; `Throttled`/429 is `TripNow`; a `4xx`/`Auth`/unclassified error is -/// `Ignored` (never trips **and never resets** — so an interleave cannot mask a -/// building outage); `2xx`/`3xx` is `Success`. `Unknown → Ignored` is the -/// conservative v1 default (the resilience4j fail-safe `Unknown → Failure` is a -/// documented future improvement). +/// Genuine transport failures (`Connection`/`Timeout`), the error-side `Server` +/// kind, and `5xx` responses are all `Failure`; `Throttled`/429 is `TripNow`; a +/// `4xx`/`Auth`/unclassified error is `Ignored` (never trips **and never +/// resets** — so an interleave cannot mask a building outage); `2xx`/`3xx` is +/// `Success`. `Unknown → Ignored` is the conservative v1 default (the +/// resilience4j fail-safe `Unknown → Failure` is a documented future +/// improvement). #[allow(dead_code)] pub(crate) fn classify(outcome: &Result, HttpError>) -> Class { match outcome { Err(e) => match e.kind() { + // Server (5xx-equivalent error kind) grouped with transport failures — + // defensive: no HttpError maps here today, but keeps classify total if + // kind() widens. ErrorKind::Connection | ErrorKind::Timeout | ErrorKind::Server => Class::Failure, ErrorKind::Throttled => Class::TripNow, - // Auth, Client, Unknown, CircuitOpen — and any future kind — are Ignored. + // Auth, Client, Unknown, CircuitOpen — and any future kind — are Ignored + // (no HttpError maps to Client either today; same defensive rationale). _ => Class::Ignored, }, Ok(resp) => { From 3c383cde4de177a9861538d46c35a5770f7e9c70 Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 20:06:01 +0000 Subject: [PATCH 07/11] feat(net): pure clock-injected Breaker state machine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds BreakerState/Admit/Breaker (pub(crate), not re-exported) to circuit_breaker.rs per ADR-0031 §5: new/admit/record, clock-injected via now: Instant so the whole transition table is testable with zero async. Half-Open treats Ignored and Success identically (a reached-host probe resolves it); Open ignores stale record() calls (single global v1 breaker). Task 4's Service will consume these types, removing the transitional #[allow(dead_code)] added here to keep just lint --all-targets green. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../net/http/api/src/circuit_breaker.rs | 304 +++++++++++++++++- 1 file changed, 303 insertions(+), 1 deletion(-) diff --git a/crates/adapter/net/http/api/src/circuit_breaker.rs b/crates/adapter/net/http/api/src/circuit_breaker.rs index 1ca6fc9..2abaf9f 100644 --- a/crates/adapter/net/http/api/src/circuit_breaker.rs +++ b/crates/adapter/net/http/api/src/circuit_breaker.rs @@ -22,7 +22,7 @@ use crate::HttpError; use oath_adapter_net_api::{ErrorKind, HasErrorKind}; use std::num::NonZeroU32; -use std::time::Duration; +use std::time::{Duration, Instant}; /// The circuit breaker's thresholds, as plain `Copy` data (ADR-0031 §5). /// @@ -94,6 +94,151 @@ pub(crate) fn classify(outcome: &Result, HttpError>) -> Cla } } +/// The breaker's state (ADR-0031 §5). `Instant` deadlines are compared against +/// `Timer::now()` by the async shell — the core itself never reads a clock. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(dead_code)] +enum BreakerState { + /// Passing requests; `consecutive_failures` counts toward the trip threshold. + Closed { consecutive_failures: u32 }, + /// Rejecting fast until `reopen_at`; then the next admit begins Half-Open. + Open { reopen_at: Instant }, + /// Probing: `probes_left` may still be admitted, `successes_needed` must reach + /// the host before the circuit closes. + HalfOpen { + probes_left: u32, + successes_needed: u32, + }, +} + +/// The admission verdict for one call. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(dead_code)] +pub(crate) enum Admit { + /// Admit the call to the inner stack. + Pass, + /// Reject the call fast with `CircuitOpen` — the inner stack is not touched. + Reject, +} + +/// The pure circuit-breaker state machine (ADR-0031 §5). +/// +/// Clock-injected: every transition takes `now: Instant` as an input, so the whole +/// unit is table-testable with zero async. The async `CircuitBreaker` shell owns +/// the `Mutex` and the `Timer`; this type holds neither. +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) struct Breaker { + state: BreakerState, + cfg: CircuitBreakerConfig, +} + +#[allow(dead_code)] +impl Breaker { + /// A fresh breaker starts Closed with no failures. + pub(crate) const fn new(cfg: CircuitBreakerConfig) -> Self { + Self { + state: BreakerState::Closed { + consecutive_failures: 0, + }, + cfg, + } + } + + /// Decide whether to admit a call now, transitioning Open→Half-Open lazily. + pub(crate) fn admit(&mut self, now: Instant) -> Admit { + match &mut self.state { + BreakerState::Closed { .. } => Admit::Pass, + BreakerState::Open { reopen_at } => { + if now >= *reopen_at { + // Cooldown elapsed → begin a Half-Open episode; THIS call is the + // first probe (so `probes_left` starts one short of the budget). + let probes = self.cfg.half_open_probes.get(); + self.state = BreakerState::HalfOpen { + probes_left: probes - 1, + successes_needed: probes, + }; + Admit::Pass + } else { + Admit::Reject + } + }, + BreakerState::HalfOpen { probes_left, .. } => { + if *probes_left > 0 { + *probes_left -= 1; + Admit::Pass + } else { + Admit::Reject // concurrency gate: no more than `half_open_probes` in flight + } + }, + } + } + + /// Record a classified outcome, transitioning as ADR-0031 §5 dictates. + pub(crate) fn record(&mut self, class: Class, now: Instant) { + match self.state { + BreakerState::Closed { + consecutive_failures, + } => match class { + Class::Failure => { + let n = consecutive_failures.saturating_add(1); + self.state = if n >= self.cfg.failure_threshold.get() { + BreakerState::Open { + reopen_at: now + self.cfg.cooldown, + } + } else { + BreakerState::Closed { + consecutive_failures: n, + } + }; + }, + Class::TripNow => { + self.state = BreakerState::Open { + reopen_at: now + self.cfg.throttle_cooldown, + }; + }, + Class::Ignored => {}, // streak untouched — a 4xx/Auth neither trips nor resets + Class::Success => { + self.state = BreakerState::Closed { + consecutive_failures: 0, + }; + }, + }, + BreakerState::HalfOpen { + probes_left, + successes_needed, + } => match class { + Class::Failure => { + self.state = BreakerState::Open { + reopen_at: now + self.cfg.cooldown, + }; + }, + Class::TripNow => { + self.state = BreakerState::Open { + reopen_at: now + self.cfg.throttle_cooldown, + }; + }, + // A reached-host probe (2xx/3xx or 4xx/Auth) resolves; the last one closes. + Class::Ignored | Class::Success => { + self.state = if successes_needed <= 1 { + BreakerState::Closed { + consecutive_failures: 0, + } + } else { + BreakerState::HalfOpen { + probes_left, + successes_needed: successes_needed - 1, + } + }; + }, + }, + // A stale outcome from a call admitted before a concurrent trip; drop it. + // Never un-trips a freshly-opened circuit (single global v1 breaker). + BreakerState::Open { .. } => {}, + } + } +} + #[cfg(test)] mod classify_tests { use super::{Class, classify}; @@ -143,3 +288,160 @@ mod classify_tests { assert_eq!(classify(&ok(301)), Class::Success); } } + +#[cfg(test)] +mod breaker_tests { + use super::{Admit, Breaker, CircuitBreakerConfig, Class}; + use std::num::NonZeroU32; + use std::time::{Duration, Instant}; + + fn cfg(threshold: u32, probes: u32) -> CircuitBreakerConfig { + CircuitBreakerConfig { + failure_threshold: NonZeroU32::new(threshold).unwrap(), + cooldown: Duration::from_secs(30), + throttle_cooldown: Duration::from_secs(900), + half_open_probes: NonZeroU32::new(probes).unwrap(), + } + } + + #[test] + fn closed_trips_after_threshold_consecutive_failures() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + assert_eq!(b.admit(now), Admit::Pass); + b.record(Class::Failure, now); + b.record(Class::Failure, now); + assert_eq!(b.admit(now), Admit::Pass, "still closed after 2 failures"); + b.record(Class::Failure, now); + assert_eq!( + b.admit(now), + Admit::Reject, + "3rd consecutive failure → Open rejects" + ); + } + + #[test] + fn a_success_resets_the_failure_streak() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + b.record(Class::Failure, now); + b.record(Class::Failure, now); + b.record(Class::Success, now); // reset + b.record(Class::Failure, now); + b.record(Class::Failure, now); + assert_eq!(b.admit(now), Admit::Pass, "streak reset → not tripped"); + } + + #[test] + fn ignored_does_not_reset_the_streak() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + b.record(Class::Failure, now); + b.record(Class::Ignored, now); // a 4xx does NOT reset — anti-masking + b.record(Class::Failure, now); + b.record(Class::Failure, now); // 3rd failure overall → trips + assert_eq!( + b.admit(now), + Admit::Reject, + "ignored left the streak intact → trips" + ); + } + + #[test] + fn throttle_trips_immediately_on_the_long_cooldown() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + b.record(Class::TripNow, now); // one throttle → Open, no threshold needed + assert_eq!(b.admit(now), Admit::Reject); + assert_eq!( + b.admit(now + Duration::from_secs(30)), + Admit::Reject, + "the short cooldown is insufficient for a throttle trip" + ); + assert_eq!( + b.admit(now + Duration::from_secs(900)), + Admit::Pass, + "throttle_cooldown elapsed → first probe admitted" + ); + } + + #[test] + fn open_rejects_until_cooldown_then_admits_one_probe() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); // trips on the first failure + b.record(Class::Failure, now); + assert_eq!(b.admit(now), Admit::Reject); + let after = now + Duration::from_secs(30); + assert_eq!( + b.admit(after), + Admit::Pass, + "cooldown elapsed → first probe" + ); + assert_eq!( + b.admit(after), + Admit::Reject, + "concurrency gate: no 2nd probe" + ); + } + + #[test] + fn half_open_probe_success_closes() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass); + b.record(Class::Success, after); + assert_eq!(b.admit(after), Admit::Pass, "probe succeeded → closed"); + } + + #[test] + fn half_open_probe_ignored_also_closes() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass); + b.record(Class::Ignored, after); // a 4xx probe still proves the host is reachable + assert_eq!( + b.admit(after), + Admit::Pass, + "ignored probe → closed (no stuck half-open)" + ); + } + + #[test] + fn half_open_probe_failure_reopens() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass); + b.record(Class::Failure, after); // probe fails → reopen with a fresh cooldown + assert_eq!(b.admit(after), Admit::Reject, "re-opened"); + assert_eq!( + b.admit(after + Duration::from_secs(30)), + Admit::Pass, + "the fresh cooldown runs from the failed probe" + ); + } + + #[test] + fn multi_probe_half_open_requires_all_to_close() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 2)); // 2 probes per episode + b.record(Class::Failure, now); + let after = now + Duration::from_secs(30); + assert_eq!(b.admit(after), Admit::Pass, "probe 1"); + assert_eq!(b.admit(after), Admit::Pass, "probe 2"); + assert_eq!(b.admit(after), Admit::Reject, "no probe 3 (gate)"); + b.record(Class::Success, after); // 1 of 2 + assert_eq!( + b.admit(after), + Admit::Reject, + "still half-open, awaiting the 2nd" + ); + b.record(Class::Success, after); // 2 of 2 → close + assert_eq!(b.admit(after), Admit::Pass, "both probes reached → closed"); + } +} From 7a3a66c64a2f22c38da7e1e42dd23405f2f0f25b Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 20:17:13 +0000 Subject: [PATCH 08/11] =?UTF-8?q?feat(net):=20CircuitBreaker=20layer=20?= =?UTF-8?q?=E2=80=94=20async=20Service=20shell=20over=20the=20Breaker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add CircuitBreakerLayer (Layer factory) and CircuitBreaker (the wrapping Service) over the pure Breaker state machine from Tasks 2-3: locks briefly to admit (via timer.now()), releases the lock, runs inner.call().await (or fast-rejects CircuitOpen), then locks briefly to record the classified outcome. The Mutex guard never crosses the await. Removes the transitional #[allow(dead_code)] attributes from Tasks 2-3 now that Class/classify/Breaker/Admit are wired into non-test code. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../net/http/api/src/circuit_breaker.rs | 377 +++++++++++++++++- crates/adapter/net/http/api/src/lib.rs | 2 +- 2 files changed, 368 insertions(+), 11 deletions(-) diff --git a/crates/adapter/net/http/api/src/circuit_breaker.rs b/crates/adapter/net/http/api/src/circuit_breaker.rs index 2abaf9f..ed831bd 100644 --- a/crates/adapter/net/http/api/src/circuit_breaker.rs +++ b/crates/adapter/net/http/api/src/circuit_breaker.rs @@ -13,21 +13,25 @@ //! //! The state machine lives in a pure, clock-injected `Breaker` (transitions take //! `now: Instant` as an input, table-tested with zero async); the `CircuitBreaker` -//! service is a thin `Arc>` + [`Timer`](oath_adapter_net_api::Timer) +//! service is a thin `Arc>` + [`Timer`] //! shell. A **single per-host** breaker is shared behind `Arc`. Runtime-neutral and //! `now()`-only — the breaker never sleeps (Open→Half-Open is a lazy comparison on //! the next admit), so there is no timer race and no new dependency. Body-transparent //! — `http::Response` is forwarded untouched. -use crate::HttpError; -use oath_adapter_net_api::{ErrorKind, HasErrorKind}; +use crate::{HttpError, Service}; +use bytes::Bytes; +use oath_adapter_net_api::{ErrorKind, HasErrorKind, Layer, Timer}; +use std::fmt; +use std::future::Future; use std::num::NonZeroU32; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; /// The circuit breaker's thresholds, as plain `Copy` data (ADR-0031 §5). /// /// `failure_threshold` and `half_open_probes` are `NonZeroU32`: "≥ 1" is a type -/// invariant, so `CircuitBreakerLayer::new` needs no +/// invariant, so [`CircuitBreakerLayer::new`] needs no /// `Result` (a `0` threshold is nonsense and `0` probes would leave a tripped /// circuit stuck Open forever). This types §5's `u32` sketch more precisely. #[derive(Debug, Clone, Copy)] @@ -44,7 +48,6 @@ pub struct CircuitBreakerConfig { /// The breaker-relevant classification of one call outcome (pure, state-independent). #[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[allow(dead_code)] pub(crate) enum Class { /// A genuine transport/server failure — advances the Closed trip counter. Failure, @@ -66,7 +69,6 @@ pub(crate) enum Class { /// `Success`. `Unknown → Ignored` is the conservative v1 default (the /// resilience4j fail-safe `Unknown → Failure` is a documented future /// improvement). -#[allow(dead_code)] pub(crate) fn classify(outcome: &Result, HttpError>) -> Class { match outcome { Err(e) => match e.kind() { @@ -97,7 +99,6 @@ pub(crate) fn classify(outcome: &Result, HttpError>) -> Cla /// The breaker's state (ADR-0031 §5). `Instant` deadlines are compared against /// `Timer::now()` by the async shell — the core itself never reads a clock. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[allow(dead_code)] enum BreakerState { /// Passing requests; `consecutive_failures` counts toward the trip threshold. Closed { consecutive_failures: u32 }, @@ -113,7 +114,6 @@ enum BreakerState { /// The admission verdict for one call. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[allow(dead_code)] pub(crate) enum Admit { /// Admit the call to the inner stack. Pass, @@ -127,13 +127,11 @@ pub(crate) enum Admit { /// unit is table-testable with zero async. The async `CircuitBreaker` shell owns /// the `Mutex` and the `Timer`; this type holds neither. #[derive(Debug, Clone)] -#[allow(dead_code)] pub(crate) struct Breaker { state: BreakerState, cfg: CircuitBreakerConfig, } -#[allow(dead_code)] impl Breaker { /// A fresh breaker starts Closed with no failures. pub(crate) const fn new(cfg: CircuitBreakerConfig) -> Self { @@ -239,6 +237,131 @@ impl Breaker { } } +/// The `CircuitBreaker` [`Layer`] factory: holds the single shared breaker + clock. +/// +/// `new` constructs the breaker **once** into an `Arc>`; every service it +/// produces (and every clone) shares it — a single per-host breaker (ADR-0031 §5). +pub struct CircuitBreakerLayer { + breaker: Arc>, + timer: T, +} + +impl CircuitBreakerLayer { + /// Build the layer from thresholds and a [`Timer`] clock. + /// + /// **Infallible** — `NonZeroU32` makes the two counts "≥ 1" a type invariant + /// (contrast `RateLimitLayer::new`, which validates a config map). Not `const`: + /// it allocates the shared `Arc>`. + #[must_use] + pub fn new(cfg: CircuitBreakerConfig, timer: T) -> Self { + Self { + breaker: Arc::new(Mutex::new(Breaker::new(cfg))), + timer, + } + } +} + +impl Clone for CircuitBreakerLayer { + fn clone(&self) -> Self { + Self { + breaker: Arc::clone(&self.breaker), + timer: self.timer.clone(), + } + } +} + +impl fmt::Debug for CircuitBreakerLayer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CircuitBreakerLayer") + .finish_non_exhaustive() + } +} + +impl Layer for CircuitBreakerLayer { + type Service = CircuitBreaker; + + fn layer(&self, inner: S) -> CircuitBreaker { + CircuitBreaker { + inner, + breaker: Arc::clone(&self.breaker), + timer: self.timer.clone(), + } + } +} + +/// The `CircuitBreaker` middleware: fast-rejects while Open, else forwards. +/// +/// A thin shell over the pure `Breaker`: it locks briefly to `admit` (using +/// `timer.now()`), releases the lock, runs `inner.call` (or returns `CircuitOpen`), +/// then locks briefly to `record` the classified outcome. The lock is **never** +/// held across the `await`. Body-transparent — `http::Response` is forwarded. +pub struct CircuitBreaker { + inner: S, + breaker: Arc>, + timer: T, +} + +impl Clone for CircuitBreaker { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + breaker: Arc::clone(&self.breaker), + timer: self.timer.clone(), + } + } +} + +impl fmt::Debug for CircuitBreaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CircuitBreaker").finish_non_exhaustive() + } +} + +impl Service> for CircuitBreaker +where + S: Service, Response = http::Response, Error = HttpError> + Sync, + T: Timer, +{ + type Response = http::Response; + type Error = HttpError; + + // Not `async fn`: the trait requires the returned future to be `Send`. + #[allow(clippy::manual_async_fn)] + fn call( + &self, + req: http::Request, + ) -> impl Future> + Send { + async move { + // Admit decision under a short lock (released at the end of this block). + let admit = { + let now = self.timer.now(); + let mut breaker = self + .breaker + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + breaker.admit(now) + }; + if admit == Admit::Reject { + return Err(HttpError::CircuitOpen); // fast reject — the leaf is not touched + } + + let outcome = self.inner.call(req).await; // NO lock held across the await + + // Record the classified outcome under a second short lock. + let class = classify(&outcome); + { + let now = self.timer.now(); + let mut breaker = self + .breaker + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + breaker.record(class, now); + } + outcome + } + } +} + #[cfg(test)] mod classify_tests { use super::{Class, classify}; @@ -445,3 +568,237 @@ mod breaker_tests { assert_eq!(b.admit(after), Admit::Pass, "both probes reached → closed"); } } + +#[cfg(test)] +mod service_tests { + use super::{CircuitBreakerConfig, CircuitBreakerLayer}; + use crate::{HttpError, Service}; + use bytes::Bytes; + use oath_adapter_net_api::{ErrorKind, Layer}; + use oath_adapter_net_mock::MockTimer; + use std::future::Future; + use std::num::NonZeroU32; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + // One scripted outcome per attempt. `Copy` so the leaf reads it by index. + #[derive(Clone, Copy)] + enum Step { + Err(ErrorKind), + Status(u16), + } + + fn err_of(kind: ErrorKind) -> HttpError { + match kind { + ErrorKind::Timeout => HttpError::Timeout, + ErrorKind::Connection => HttpError::connection("reset"), + ErrorKind::Throttled => HttpError::Throttled, + ErrorKind::Auth => HttpError::auth("expired"), + _ => HttpError::other("boom"), + } + } + + // An inline leaf yielding a scripted sequence of outcomes, counting calls. Once + // the script is exhausted it repeats the last step. Body is `()` — the breaker + // only reads `status()`, never the body. Inline (not `MockClient`) to avoid the + // net-http-mock -> net-http-api dev-dep cycle. + #[derive(Clone)] + struct ScriptLeaf { + steps: Arc>, + calls: Arc, + } + impl ScriptLeaf { + fn new(steps: Vec) -> Self { + Self { + steps: Arc::new(steps), + calls: Arc::new(AtomicUsize::new(0)), + } + } + fn calls(&self) -> usize { + self.calls.load(Ordering::Relaxed) + } + } + impl Service> for ScriptLeaf { + type Response = http::Response<()>; + type Error = HttpError; + #[allow(clippy::manual_async_fn)] + fn call( + &self, + _req: http::Request, + ) -> impl Future> + Send { + let i = self.calls.fetch_add(1, Ordering::Relaxed); + let step = self + .steps + .get(i) + .copied() + .unwrap_or_else(|| *self.steps.last().unwrap()); + async move { + match step { + Step::Err(kind) => Err(err_of(kind)), + Step::Status(code) => { + let mut resp = http::Response::new(()); + *resp.status_mut() = http::StatusCode::from_u16(code).unwrap(); + Ok(resp) + }, + } + } + } + } + + fn cfg( + threshold: u32, + cooldown: Duration, + throttle: Duration, + probes: u32, + ) -> CircuitBreakerConfig { + CircuitBreakerConfig { + failure_threshold: NonZeroU32::new(threshold).unwrap(), + cooldown, + throttle_cooldown: throttle, + half_open_probes: NonZeroU32::new(probes).unwrap(), + } + } + + fn secs(n: u64) -> Duration { + Duration::from_secs(n) + } + + fn bare_req() -> http::Request { + http::Request::new(Bytes::new()) + } + + #[tokio::test] + async fn trips_after_threshold_then_fast_rejects_without_touching_the_leaf() { + let leaf = ScriptLeaf::new(vec![Step::Err(ErrorKind::Connection)]); // always fails + let svc = CircuitBreakerLayer::new(cfg(3, secs(30), secs(900), 1), MockTimer::new()) + .layer(leaf.clone()); + for _ in 0..3 { + let _ = svc.call(bare_req()).await; // 3 consecutive failures trip it + } + assert_eq!(leaf.calls(), 3); + let err = svc.call(bare_req()).await.unwrap_err(); + assert!(matches!(err, HttpError::CircuitOpen)); + assert_eq!( + leaf.calls(), + 3, + "an open circuit fast-rejects; the leaf is untouched" + ); + } + + #[tokio::test] + async fn a_single_429_trips_immediately_on_the_long_cooldown() { + let timer = MockTimer::new(); + let leaf = ScriptLeaf::new(vec![Step::Status(429), Step::Status(200)]); + let svc = CircuitBreakerLayer::new(cfg(3, secs(30), secs(900), 1), timer.clone()) + .layer(leaf.clone()); + let resp = svc.call(bare_req()).await.expect("429 returns as Ok"); + assert_eq!(resp.status(), http::StatusCode::TOO_MANY_REQUESTS); + assert!( + matches!( + svc.call(bare_req()).await.unwrap_err(), + HttpError::CircuitOpen + ), + "one 429 trips the circuit" + ); + timer.advance(secs(30)); // the SHORT cooldown is not enough for a throttle trip + assert!(matches!( + svc.call(bare_req()).await.unwrap_err(), + HttpError::CircuitOpen + )); + timer.advance(secs(900)); // now past throttle_cooldown + let resp = svc + .call(bare_req()) + .await + .expect("probe admitted, leaf returns 200"); + assert_eq!(resp.status(), http::StatusCode::OK); + assert_eq!( + leaf.calls(), + 2, + "one 429 + one probe; the fast-rejects never hit the leaf" + ); + } + + #[tokio::test] + async fn recovers_when_the_cooldown_probe_succeeds() { + let timer = MockTimer::new(); + let leaf = ScriptLeaf::new(vec![ + Step::Err(ErrorKind::Timeout), + Step::Err(ErrorKind::Timeout), + Step::Status(200), + ]); + let svc = CircuitBreakerLayer::new(cfg(2, secs(30), secs(900), 1), timer.clone()) + .layer(leaf.clone()); + let _ = svc.call(bare_req()).await; // fail 1 + let _ = svc.call(bare_req()).await; // fail 2 → Open + assert!(matches!( + svc.call(bare_req()).await.unwrap_err(), + HttpError::CircuitOpen + )); + timer.advance(secs(30)); + let ok = svc + .call(bare_req()) + .await + .expect("probe hits the leaf → 200"); + assert_eq!(ok.status(), http::StatusCode::OK); + let ok2 = svc + .call(bare_req()) + .await + .expect("closed → next call flows"); + assert_eq!(ok2.status(), http::StatusCode::OK); + assert_eq!( + leaf.calls(), + 4, + "2 failures + 2 post-recovery sends; rejects skip the leaf" + ); + } + + #[tokio::test] + async fn reopens_when_the_cooldown_probe_fails() { + let timer = MockTimer::new(); + let leaf = ScriptLeaf::new(vec![ + Step::Err(ErrorKind::Connection), + Step::Err(ErrorKind::Connection), + Step::Status(503), + ]); + let svc = CircuitBreakerLayer::new(cfg(2, secs(30), secs(900), 1), timer.clone()) + .layer(leaf.clone()); + let _ = svc.call(bare_req()).await; + let _ = svc.call(bare_req()).await; // Open + assert!(matches!( + svc.call(bare_req()).await.unwrap_err(), + HttpError::CircuitOpen + )); + timer.advance(secs(30)); + let resp = svc + .call(bare_req()) + .await + .expect("probe returns a 503 as Ok"); + assert_eq!(resp.status(), 503); + assert!( + matches!( + svc.call(bare_req()).await.unwrap_err(), + HttpError::CircuitOpen + ), + "the probe failed → re-opened" + ); + assert_eq!(leaf.calls(), 3); + } + + #[tokio::test] + async fn clones_from_one_layer_share_the_breaker() { + let leaf = ScriptLeaf::new(vec![Step::Err(ErrorKind::Connection)]); + let layer = CircuitBreakerLayer::new(cfg(2, secs(30), secs(900), 1), MockTimer::new()); + let a = layer.layer(leaf.clone()); + let b = a.clone(); // shares the Arc> + let _ = a.call(bare_req()).await; // fail 1 via A + let _ = a.call(bare_req()).await; // fail 2 via A → Open + assert!( + matches!( + b.call(bare_req()).await.unwrap_err(), + HttpError::CircuitOpen + ), + "clone B observes A's trip (single per-host breaker)" + ); + } +} diff --git a/crates/adapter/net/http/api/src/lib.rs b/crates/adapter/net/http/api/src/lib.rs index 2d08dbe..984f458 100644 --- a/crates/adapter/net/http/api/src/lib.rs +++ b/crates/adapter/net/http/api/src/lib.rs @@ -36,7 +36,7 @@ pub mod timeout; pub use auth::{Auth, AuthSource, NoAuth, SetHeaders}; pub use body::{BufferMode, Guarded, ResponseBody}; -pub use circuit_breaker::CircuitBreakerConfig; +pub use circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerLayer}; pub use client::HttpClient; pub use error::{BoxError, HttpError}; pub use rate::{ From 678207347902e1e85f1b334ce8c46446b5823c4b Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 20:23:23 +0000 Subject: [PATCH 09/11] docs(net): CircuitBreaker layer amendment (ADR-0034 #9) + changelog Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 11 ++++++++++ ...tion-surface-auth-guarded-boot-coverage.md | 22 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b7d3ce2..2b97ac4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 exhaustion; body-transparent (drops a superseded response, releasing its `Guarded` permit). Adds the `Retryable` marker + `RetryConfig` schedule; jitter via an internal seeded `SplitMix64` — no new dependency. (ADR-0031 §2, ADR-0034.) +- `oath-adapter-net-http-api` `CircuitBreaker` resilience layer (Slice 1 PR 4) — the + `CircuitBreaker` service + `CircuitBreakerLayer` factory (`net-api::Layer`): + the reactive backstop to `RateLimit`. Trips Open after `failure_threshold` consecutive + `Connection`/`Timeout`/`5xx` failures, or immediately on a `Throttled`/429 with the long + `throttle_cooldown`; fast-rejects with a new non-retryable `HttpError::CircuitOpen` + (mapped to a new `ErrorKind::CircuitOpen`) without touching the inner stack; admits + bounded Half-Open probes after cooldown (reached-host closes, failure re-opens). Pure + clock-injected `Breaker` state machine (Closed/Open/Half-Open) behind a thin + `Arc>` + `Timer` shell; single per-host breaker; `now()`-only (no sleep, + no new dependency). 4-class outcome partition so `4xx`/`Auth`/unclassified errors neither + trip nor mask an outage. (ADR-0031 §5, ADR-0034.) - net-http construction-surface design refinements (ADR-0034 append-only Amendments 2026-07-04, spec updated) — an absent `RateLimit` directive now **fails closed** (not "defaults to `Global`"), closing the last silent diff --git a/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md b/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md index 3f36eb9..d5b77c6 100644 --- a/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md +++ b/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md @@ -219,3 +219,25 @@ carries the full reasoning. seeded `SplitMix64` (no `rand` dependency, no injected `Jitter` generic — the RNG is a pure computation); a **total-elapsed retry budget** and **`Retry-After` parsing** are deferred (each an additive follow-up). No new dependency. + +9. **`CircuitBreaker` layer (Slice 1 PR 4).** The `CircuitBreaker` layer + + `CircuitBreakerLayer` factory add the **reactive** backstop to `RateLimit`'s + proactive pacing (ADR-0031 §5). A pure, clock-injected `Breaker` state machine + (Closed/Open/Half-Open) — table-tested with zero async — sits behind a thin + `Arc>` + `Timer` Service shell. It trips **Open** on + `CircuitBreakerConfig::failure_threshold` consecutive `Connection`/`Timeout`/`5xx` + failures, or **immediately** on a `Throttled`/429 with the long `throttle_cooldown` + (IBKR's penalty box); while Open it **fast-rejects** with a **new non-retryable + `HttpError::CircuitOpen` / `ErrorKind::CircuitOpen`** without touching the inner + stack; after the cooldown it admits `half_open_probes` **Half-Open** probes (a + reached-host outcome closes, a failure re-opens). Outcomes are a **4-class + partition**: `Connection`/`Timeout`/`5xx` → *Failure*; `Throttled`/429 → + *TripNow*; `4xx`/`Auth`/`Unknown` → *Ignored* (never trips **and never resets** — + so an interleave cannot mask a building outage; an `Auth` error must not trip the + gateway); `2xx`/`3xx` → *Success*. `failure_threshold`/`half_open_probes` are + `NonZeroU32` (typing §5's `u32` — "≥ 1" a type invariant, infallible `new`). A + **single per-host** breaker shared behind `Arc`; **consecutive-count** for v1; + `now()`-only timing (lazy Open→Half-Open, no sleep, no `futures-util`, no new + dependency). It sits **outside `Retry`**, counting logical post-retry outcomes. + Deferred: the resilience4j fail-safe `Unknown → Failure`, rolling-window counting, + per-key breakers, and a breaker-state observation watch. From 5f378bdbd616eb83a68c5eab40247b1aa7c7a770 Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sat, 4 Jul 2026 20:43:19 +0000 Subject: [PATCH 10/11] fix(net): close Half-Open cancellation wedge with a probe drop-guard If the CircuitBreaker call future is dropped before recording the outcome (caller cancellation, or an inner panic) while a Half-Open probe is in flight, the breaker was stranded at probes_left:0 forever. Add a state-aware Breaker::on_abandoned_probe (reopen from Half-Open, no-op in Closed/Open) and an RAII ProbeGuard that fires it on drop, so a cancelled probe self-heals after a fresh cooldown instead of wedging. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../net/http/api/src/circuit_breaker.rs | 219 ++++++++++++++++++ ...04-net-http-circuitbreaker-layer-design.md | 10 + 2 files changed, 229 insertions(+) diff --git a/crates/adapter/net/http/api/src/circuit_breaker.rs b/crates/adapter/net/http/api/src/circuit_breaker.rs index ed831bd..1214d76 100644 --- a/crates/adapter/net/http/api/src/circuit_breaker.rs +++ b/crates/adapter/net/http/api/src/circuit_breaker.rs @@ -235,6 +235,22 @@ impl Breaker { BreakerState::Open { .. } => {}, } } + + /// Resolve a Half-Open probe whose call was **abandoned** (the future was + /// dropped by caller cancellation, or the inner service panicked) before its + /// outcome could be recorded. Only meaningful in Half-Open: reopen so the + /// episode ends and the circuit self-heals after `cooldown` — a probe with an + /// unknown outcome must not optimistically close. A **no-op** in `Closed` (a + /// cancelled call is not a host-health signal, so it must not advance the trip + /// streak) and in `Open` (already tripped). This is what makes "every admitted + /// probe reaches a decisive resolution" hold even under cancellation. + pub(crate) fn on_abandoned_probe(&mut self, now: Instant) { + if matches!(self.state, BreakerState::HalfOpen { .. }) { + self.state = BreakerState::Open { + reopen_at: now + self.cfg.cooldown, + }; + } + } } /// The `CircuitBreaker` [`Layer`] factory: holds the single shared breaker + clock. @@ -317,6 +333,45 @@ impl fmt::Debug for CircuitBreaker { } } +/// Arms a safety net for an admitted call: if the [`CircuitBreaker::call`] future +/// is dropped (caller cancellation) or the inner service panics **before** the real +/// outcome is recorded, this guard's `Drop` resolves the (possibly Half-Open) probe +/// via [`Breaker::on_abandoned_probe`], so a cancelled probe can never strand the +/// breaker in a permanent Half-Open reject. Disarmed the instant the inner call +/// returns normally, so a completed call records its true outcome instead. +struct ProbeGuard<'a, T: Timer> { + breaker: &'a std::sync::Mutex, + timer: &'a T, + armed: bool, +} + +impl<'a, T: Timer> ProbeGuard<'a, T> { + const fn arm(breaker: &'a std::sync::Mutex, timer: &'a T) -> Self { + Self { + breaker, + timer, + armed: true, + } + } + + const fn disarm(&mut self) { + self.armed = false; + } +} + +impl Drop for ProbeGuard<'_, T> { + fn drop(&mut self) { + if self.armed { + let now = self.timer.now(); + let mut breaker = self + .breaker + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + breaker.on_abandoned_probe(now); + } + } +} + impl Service> for CircuitBreaker where S: Service, Response = http::Response, Error = HttpError> + Sync, @@ -345,7 +400,12 @@ where return Err(HttpError::CircuitOpen); // fast reject — the leaf is not touched } + // Arm the drop-guard: if this future is cancelled (or the leaf panics) + // before the real outcome is recorded below, the guard resolves the + // (possibly Half-Open) probe instead of stranding the breaker. + let mut guard = ProbeGuard::arm(&self.breaker, &self.timer); let outcome = self.inner.call(req).await; // NO lock held across the await + guard.disarm(); // the future was NOT cancelled — record the true outcome below // Record the classified outcome under a second short lock. let class = classify(&outcome); @@ -567,6 +627,77 @@ mod breaker_tests { b.record(Class::Success, after); // 2 of 2 → close assert_eq!(b.admit(after), Admit::Pass, "both probes reached → closed"); } + + #[test] + fn abandoned_probe_reopens_half_open() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); // → Open + let after = now + Duration::from_secs(30); + assert_eq!( + b.admit(after), + Admit::Pass, + "cooldown elapsed → probe admitted" + ); + b.on_abandoned_probe(after); // the probe's future was dropped + assert_eq!( + b.admit(after), + Admit::Reject, + "abandoned probe reopened → still within the fresh cooldown" + ); + assert_eq!( + b.admit(after + Duration::from_secs(30)), + Admit::Pass, + "self-healed after a fresh cooldown from the abandonment" + ); + } + + #[test] + fn abandoned_probe_is_a_noop_in_closed() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(3, 1)); + b.record(Class::Failure, now); // streak = 1 + b.record(Class::Failure, now); // streak = 2 + b.on_abandoned_probe(now); // must NOT advance the streak + assert_eq!( + b.admit(now), + Admit::Pass, + "2 real failures < threshold 3 — abandon was a no-op" + ); + b.record(Class::Failure, now); // the 3rd REAL failure trips it + assert_eq!(b.admit(now), Admit::Reject, "3rd real failure → tripped"); + } + + #[test] + fn abandoned_probe_is_a_noop_in_open() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); // → Open { reopen_at: now + 30s } + b.on_abandoned_probe(now + Duration::from_secs(5)); // must not push the deadline out + assert_eq!( + b.admit(now + Duration::from_secs(29)), + Admit::Reject, + "reopen_at unchanged by the no-op abandon" + ); + assert_eq!( + b.admit(now + Duration::from_secs(30)), + Admit::Pass, + "original cooldown still elapses on schedule" + ); + } + + #[test] + fn record_while_open_never_untrips() { + let now = Instant::now(); + let mut b = Breaker::new(cfg(1, 1)); + b.record(Class::Failure, now); // → Open + b.record(Class::Success, now); // a stale success from a pre-trip admit + assert_eq!( + b.admit(now), + Admit::Reject, + "the Open no-op arm must never un-trip a freshly-opened circuit" + ); + } } #[cfg(test)] @@ -801,4 +932,92 @@ mod service_tests { "clone B observes A's trip (single per-host breaker)" ); } + + // A leaf that fails on its first call (tripping `cfg(threshold=1)`) and then + // never resolves — models the Half-Open probe call getting cancelled in flight. + #[derive(Clone)] + struct FailThenHangLeaf { + calls: Arc, + } + impl FailThenHangLeaf { + fn new() -> Self { + Self { + calls: Arc::new(AtomicUsize::new(0)), + } + } + } + impl Service> for FailThenHangLeaf { + type Response = http::Response<()>; + type Error = HttpError; + #[allow(clippy::manual_async_fn)] + fn call( + &self, + _req: http::Request, + ) -> impl Future> + Send { + let i = self.calls.fetch_add(1, Ordering::Relaxed); + async move { + if i == 0 { + Err(err_of(ErrorKind::Connection)) + } else { + // Models an in-flight request that never returns until cancelled. + std::future::pending::, HttpError>>().await + } + } + } + } + + #[tokio::test] + async fn a_cancelled_half_open_probe_reopens_instead_of_wedging() { + let timer = MockTimer::new(); + let leaf = FailThenHangLeaf::new(); + let svc = + CircuitBreakerLayer::new(cfg(1, secs(30), secs(900), 1), timer.clone()).layer(leaf); + + // 1. First call is admitted (Closed) but fails → trips the circuit Open. + // The call returns the real transport error, not `CircuitOpen` — the + // circuit trips only *after* this outcome is recorded. + assert!(matches!( + svc.call(bare_req()).await.unwrap_err(), + HttpError::Connection(_) + )); + // Confirm the trip: the very next call fast-rejects. + let err = svc.call(bare_req()).await.unwrap_err(); + assert!(matches!(err, HttpError::CircuitOpen), "confirms Open"); + + // 2. Cooldown elapses. + timer.advance(secs(30)); + + // 3. Poll once: admits the Half-Open probe (state → HalfOpen{probes_left:0}) + // and parks on the never-resolving leaf call. + // `Box::pin` (not `std::pin::pin!`) so `drop(fut)` below actually runs the + // future's destructor early — `pin!`'s backing storage lives in a hidden + // stack slot until the enclosing scope ends, so dropping its `Pin<&mut _>` + // handle would NOT run `ProbeGuard::drop` at the point we need it to. + let mut fut = Box::pin(svc.call(bare_req())); + assert!( + futures_util::poll!(fut.as_mut()).is_pending(), + "the probe is admitted and parked on the hanging leaf" + ); + + // 4. Drop the parked future — simulates caller cancellation. `ProbeGuard::drop` + // must fire `Breaker::on_abandoned_probe`, reopening the circuit. + drop(fut); + + // 5. Self-heal, not a wedge: still within the fresh cooldown → fast-reject. + assert!( + matches!( + svc.call(bare_req()).await.unwrap_err(), + HttpError::CircuitOpen + ), + "reopened with a fresh cooldown from the abandonment" + ); + // After a fresh cooldown, a new probe is admitted again (parks on the leaf, + // i.e. polls Pending) instead of being permanently rejected as CircuitOpen. + timer.advance(secs(30)); + let mut fut2 = std::pin::pin!(svc.call(bare_req())); + assert!( + futures_util::poll!(fut2.as_mut()).is_pending(), + "self-healed: a fresh probe is admitted rather than wedged at probes_left:0" + ); + } } diff --git a/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md b/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md index 5da4f9c..60e7600 100644 --- a/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md +++ b/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md @@ -273,6 +273,16 @@ where between attempts), the breaker only ever *reads* `timer.now()`; the Open→Half-Open transition is a **lazy comparison on the next `admit`**, so there is no background timer, no `futures-util::select`, and no new dependency. +- **Cancellation-safe.** An admitted call arms an RAII `ProbeGuard` around + `inner.call(req).await`; if that future is **dropped** (caller cancellation via + `select!`/`timeout`) or the inner service **panics** before the real outcome is + recorded, the guard's `Drop` calls `Breaker::on_abandoned_probe` (Half-Open → + reopen on a fresh `cooldown`; a no-op in `Closed`/`Open`). The guard is disarmed the + instant `inner.call` returns normally, so a completed call still records its true + outcome. This closes the one wedge the "every admitted probe reaches a decisive + resolution" invariant (§2) didn't cover on its own: a **cancelled** Half-Open probe + now self-heals after `cooldown` instead of permanently stranding the breaker at + `probes_left: 0`. ### 5. The new error — `HttpError::CircuitOpen` From b6027bc3e4711076a39be3061b153b9f226465df Mon Sep 17 00:00:00 2001 From: NotAProfDev <84450364+NotAProfDev@users.noreply.github.com> Date: Sun, 5 Jul 2026 07:38:02 +0000 Subject: [PATCH 11/11] docs(net): scope CB Ignored "never resets" + resolve stale spec question MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address CodeRabbit review on #85: - ADR-0034 #9: "never resets" was unqualified but only holds for the Closed-state failure streak; in Half-Open a reached-host Ignored resolves the probe like a Success (circuit_breaker.rs record()). Scope the wording. - CB design spec open-question #4: convert the stale "table-test >1 now?" question to Resolved — the Testing section commits to the two-probe case and multi_probe_half_open_requires_all_to_close ships it. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...http-construction-surface-auth-guarded-boot-coverage.md | 7 ++++--- .../2026-07-04-net-http-circuitbreaker-layer-design.md | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md b/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md index d5b77c6..4259333 100644 --- a/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md +++ b/docs/adr/0034-http-construction-surface-auth-guarded-boot-coverage.md @@ -232,9 +232,10 @@ carries the full reasoning. stack; after the cooldown it admits `half_open_probes` **Half-Open** probes (a reached-host outcome closes, a failure re-opens). Outcomes are a **4-class partition**: `Connection`/`Timeout`/`5xx` → *Failure*; `Throttled`/429 → - *TripNow*; `4xx`/`Auth`/`Unknown` → *Ignored* (never trips **and never resets** — - so an interleave cannot mask a building outage; an `Auth` error must not trip the - gateway); `2xx`/`3xx` → *Success*. `failure_threshold`/`half_open_probes` are + *TripNow*; `4xx`/`Auth`/`Unknown` → *Ignored* (never trips, and **never resets the + Closed-state failure streak** — so an interleave cannot mask a building outage; an + `Auth` error must not trip the gateway; in **Half-Open** a reached-host `Ignored` + still resolves the probe like a `Success`); `2xx`/`3xx` → *Success*. `failure_threshold`/`half_open_probes` are `NonZeroU32` (typing §5's `u32` — "≥ 1" a type invariant, infallible `new`). A **single per-host** breaker shared behind `Arc`; **consecutive-count** for v1; `now()`-only timing (lazy Open→Half-Open, no sleep, no `futures-util`, no new diff --git a/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md b/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md index 60e7600..58e18a7 100644 --- a/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md +++ b/docs/superpowers/specs/2026-07-04-net-http-circuitbreaker-layer-design.md @@ -412,6 +412,7 @@ only — **no** `futures-util` race, **no** `sleep`, **no** 3. **Test executor.** `#[tokio::test]` for the Service tests (parity with the shipped layers); the pure `Breaker` tests need no executor at all — a benefit of the pure-core cut. -4. **`half_open_probes > 1` semantics.** v1 ships the `NonZeroU32` knob and the - `probes_left`/`successes_needed` accounting handles `> 1`, but IBKR uses `1`; confirm - whether to table-test the `> 1` path now (cheap, proves the generality) or defer. +4. **`half_open_probes > 1` semantics.** *Resolved:* v1 ships the `NonZeroU32` knob and + the `probes_left`/`successes_needed` accounting handles `> 1`; IBKR uses `1`, but the + `> 1` path **is table-tested now** (cheap, proves the generality) — see the Testing + section's two-probe concurrency-gate case.