Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
standalone `validate_coverage` check: an unclassified endpoint or an
out-of-range policy param is a boot failure, not a first-live-order 429
(ADR-0034 §3). Closes Slice 0 of the net-http construction surface.
- `oath-adapter-net-http-api` `RateLimit` resilience layer (Slice 1) — the
`RateLimit<S, K, T>` service + `RateLimitLayer<K, T>` factory (`net-api::Layer`):
proactive per-endpoint pacing (token-bucket + concurrency policies) built from a
validated `RateLimitConfig`, driven by `net-api::Timer` (mockable clock). Adds the
`RateScope`/`Scope` per-request directive (absent → fails closed; `None` → opt-out;
a runtime coverage gap fails closed as `Throttled`, never sent). `LimitPolicy::
TokenBucket` gains `per: Duration` for sub-1/second venue limits, and the
≤1-concurrency-permit invariant is a boot check (`BuildError::MultipleConcurrency`).
(ADR-0031 §3–4.)
- net-http construction-surface design refinements (ADR-0034 append-only
Amendments 2026-07-04, spec updated) — an absent `RateLimit<K>` directive now
**fails closed** (not "defaults to `Global`"), closing the last silent
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/adapter/net/http/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ async-lock = { workspace = true }
thiserror = { workspace = true }
http = { workspace = true }
bytes = { workspace = true }
futures-util = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
pin-project-lite = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
oath-adapter-net-mock = { workspace = true }

[lints]
workspace = true
9 changes: 8 additions & 1 deletion crates/adapter/net/http/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
//! - [`auth`] — the `AuthSource` seam, `NoAuth`, and the `Auth`/`SetHeaders` layers
//! - [`rate`] — `RateKey`, the `LimitPolicy`/`LimitDecl` vocabulary, the total
//! `RateLimitConfig`, and the boot-time `validate_coverage` check
//! - [`rate_limit`] — the `RateLimit` layer, its `RateLimitLayer` factory, and
//! the `RateScope`/`Scope` per-request directive
//!
//! The resilience layers, `stack`/`build` assembly, and backends land in later
//! slices. No async runtime, `hyper`, `reqwest`, or `serde` here.
Expand All @@ -20,11 +22,16 @@ pub mod body;
pub mod client;
pub mod error;
pub mod rate;
pub mod rate_limit;
pub mod service;

pub use auth::{Auth, AuthSource, NoAuth, SetHeaders};
pub use body::{BufferMode, Guarded, ResponseBody};
pub use client::HttpClient;
pub use error::{BoxError, HttpError};
pub use rate::{BuildError, LimitDecl, LimitPolicy, RateKey, RateLimitConfig, validate_coverage};
pub use rate::{
BuildError, LimitDecl, LimitPolicy, RateKey, RateLimitConfig, validate_concurrency_singleton,
validate_coverage,
};
pub use rate_limit::{RateLimit, RateLimitLayer, RateScope, Scope};
pub use service::Service;
154 changes: 143 additions & 11 deletions crates/adapter/net/http/api/src/rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
//! explicitly classified — `LimitDecl::Policy` or `LimitDecl::GlobalOnly`,
//! never "absent". A missing or ill-configured bucket is caught at
//! construction ([`validate_coverage`]), so it is a boot failure rather than a
//! first-live-order 429 → 15-minute IBKR penalty box. This module is pure data
//! + one validator; the `RateLimit` layer that consumes it lands in Slice 1.
//! first-live-order 429 → 15-minute IBKR penalty box.
//!
//! This module is pure data + its two validators (`validate_coverage`,
//! `validate_concurrency_singleton`); the `RateLimit` layer that consumes them
//! lives in [`crate::rate_limit`].

use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::time::Duration;

/// An adapter's rate-limit key with a **finite universe** — the enumeration
/// that makes the boot-time coverage check possible (ADR-0034 §3).
Expand All @@ -35,10 +39,14 @@ pub trait RateKey: Hash + Eq + Clone + Send + Sync + 'static {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum LimitPolicy {
/// A refilling token bucket: `rate` tokens/second, up to `burst` in hand.
/// A refilling token bucket: `rate` tokens per `per` window, up to `burst`
/// in hand. `per` lets sub-1/second venue limits (IBKR `1/5s`, `1/min`,
/// `1/15min`) be expressed exactly with integer parameters.
TokenBucket {
/// Steady-state tokens per second (must be `>= 1`).
/// Tokens replenished per `per` window (must be `>= 1`).
rate: u32,
/// The replenishment window (must be non-zero).
per: Duration,
/// Maximum tokens available at once (must be `>= 1`).
burst: u32,
},
Expand Down Expand Up @@ -79,7 +87,7 @@ impl LimitPolicy {
/// `burst == 0`, `max == 0`).
fn validate(self) -> Result<(), BuildError> {
match self {
Self::TokenBucket { rate, burst } => {
Self::TokenBucket { rate, per, burst } => {
if rate == 0 {
return Err(BuildError::InvalidPolicy(format!(
"token-bucket rate must be >= 1, got {rate}"
Expand All @@ -90,6 +98,11 @@ impl LimitPolicy {
"token-bucket burst must be >= 1, got {burst}"
)));
}
if per.is_zero() {
return Err(BuildError::InvalidPolicy(format!(
"token-bucket period must be non-zero, got {per:?}"
)));
}
Ok(())
},
Self::Concurrency { max } => {
Expand Down Expand Up @@ -122,6 +135,14 @@ pub enum BuildError {
/// A policy carries out-of-range parameters (`rate`/`burst`/`max` of 0).
#[error("invalid rate-limit policy: {0}")]
InvalidPolicy(String),
/// A config in which a `Both`-scoped request could require two held
/// concurrency permits (global `Concurrency` **and** a local `Concurrency`)
/// — [`Guarded`](crate::Guarded) holds one, so this is a boot failure, not a
/// silent runtime permit truncation.
#[error(
"config has both a global and a local Concurrency policy; a Both-scoped request would need two held permits (Guarded holds one)"
)]
MultipleConcurrency,
}

/// Validate that `cfg` is a **total**, param-sane pacing configuration.
Expand Down Expand Up @@ -149,10 +170,37 @@ where
Ok(())
}

/// Reject a config whose `Both`-scoped requests could require two held
/// concurrency permits — global `Concurrency` **and** any local `Concurrency`.
///
/// `RateLimitLayer::new` calls this alongside [`validate_coverage`], turning the
/// ≤1-concurrency-permit invariant into a boot failure (spec Decision 6).
///
/// # Errors
/// [`BuildError::MultipleConcurrency`] if the global policy is `Concurrency` and
/// any local `Policy` is also `Concurrency`.
pub fn validate_concurrency_singleton<K>(cfg: &RateLimitConfig<K>) -> Result<(), BuildError>
where
K: RateKey,
{
if matches!(cfg.global, LimitPolicy::Concurrency { .. }) {
for decl in cfg.local.values() {
if matches!(decl, LimitDecl::Policy(LimitPolicy::Concurrency { .. })) {
return Err(BuildError::MultipleConcurrency);
}
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::{BuildError, LimitDecl, LimitPolicy, RateKey, RateLimitConfig, validate_coverage};
use super::{
BuildError, LimitDecl, LimitPolicy, RateKey, RateLimitConfig,
validate_concurrency_singleton, validate_coverage,
};
use std::collections::HashMap;
use std::time::Duration;

/// A stand-in endpoint key for the tests — the shape an adapter provides.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -189,6 +237,7 @@ mod tests {
let cfg = RateLimitConfig {
global: LimitPolicy::TokenBucket {
rate: 10,
per: Duration::from_secs(1),
burst: 20,
},
local: HashMap::from([
Expand All @@ -198,7 +247,11 @@ mod tests {
),
(
TestKey::Snapshot,
LimitDecl::Policy(LimitPolicy::TokenBucket { rate: 5, burst: 5 }),
LimitDecl::Policy(LimitPolicy::TokenBucket {
rate: 5,
per: Duration::from_secs(1),
burst: 5,
}),
),
(TestKey::History, LimitDecl::GlobalOnly),
]),
Expand All @@ -208,6 +261,7 @@ mod tests {
cfg.global,
LimitPolicy::TokenBucket {
rate: 10,
per: Duration::from_secs(1),
burst: 20
}
);
Expand All @@ -220,6 +274,7 @@ mod tests {
RateLimitConfig {
global: LimitPolicy::TokenBucket {
rate: 10,
per: Duration::from_secs(1),
burst: 20,
},
local: HashMap::from([
Expand All @@ -229,7 +284,11 @@ mod tests {
),
(
TestKey::Snapshot,
LimitDecl::Policy(LimitPolicy::TokenBucket { rate: 5, burst: 5 }),
LimitDecl::Policy(LimitPolicy::TokenBucket {
rate: 5,
per: Duration::from_secs(1),
burst: 5,
}),
),
(TestKey::History, LimitDecl::GlobalOnly),
]),
Expand All @@ -254,7 +313,11 @@ mod tests {
let mut cfg = total_config();
cfg.local.insert(
TestKey::Snapshot,
LimitDecl::Policy(LimitPolicy::TokenBucket { rate: 0, burst: 5 }),
LimitDecl::Policy(LimitPolicy::TokenBucket {
rate: 0,
per: Duration::from_secs(1),
burst: 5,
}),
);
assert!(matches!(
validate_coverage(&cfg),
Expand All @@ -267,7 +330,28 @@ mod tests {
let mut cfg = total_config();
cfg.local.insert(
TestKey::Snapshot,
LimitDecl::Policy(LimitPolicy::TokenBucket { rate: 5, burst: 0 }),
LimitDecl::Policy(LimitPolicy::TokenBucket {
rate: 5,
per: Duration::from_secs(1),
burst: 0,
}),
);
assert!(matches!(
validate_coverage(&cfg),
Err(BuildError::InvalidPolicy(_))
));
}

#[test]
fn zero_period_token_bucket_is_invalid() {
let mut cfg = total_config();
cfg.local.insert(
TestKey::Snapshot,
LimitDecl::Policy(LimitPolicy::TokenBucket {
rate: 5,
per: Duration::ZERO,
burst: 5,
}),
);
assert!(matches!(
validate_coverage(&cfg),
Expand All @@ -291,13 +375,61 @@ mod tests {
#[test]
fn bad_global_policy_is_invalid() {
let mut cfg = total_config();
cfg.global = LimitPolicy::TokenBucket { rate: 0, burst: 1 };
cfg.global = LimitPolicy::TokenBucket {
rate: 0,
per: Duration::from_secs(1),
burst: 1,
};
assert!(matches!(
validate_coverage(&cfg),
Err(BuildError::InvalidPolicy(_))
));
}

#[test]
fn token_bucket_carries_a_period_for_sub_1_per_second_rates() {
// IBKR orders = 1 per 5s — inexpressible as tokens/second under u32.
let p = LimitPolicy::TokenBucket {
rate: 1,
per: Duration::from_secs(5),
burst: 1,
};
assert!(matches!(
p,
LimitPolicy::TokenBucket {
rate: 1,
burst: 1,
..
}
));
}

#[test]
fn global_and_local_concurrency_is_rejected() {
// Both-scoped request would need two held permits; Guarded holds one.
let cfg = RateLimitConfig {
global: LimitPolicy::Concurrency { max: 5 },
local: HashMap::from([
(
TestKey::PlaceOrder,
LimitDecl::Policy(LimitPolicy::Concurrency { max: 1 }),
),
(TestKey::Snapshot, LimitDecl::GlobalOnly),
(TestKey::History, LimitDecl::GlobalOnly),
]),
};
assert_eq!(
validate_concurrency_singleton(&cfg),
Err(BuildError::MultipleConcurrency)
);
}

#[test]
fn global_rate_with_local_concurrency_is_allowed() {
// The real IBKR shape: global 10/s rate + /history concurrency.
assert_eq!(validate_concurrency_singleton(&total_config()), Ok(()));
}

#[test]
fn global_only_endpoints_need_no_local_params() {
// A `GlobalOnly` decl carries no policy, so it is always coverage-valid
Expand Down
Loading