diff --git a/CHANGELOG.md b/CHANGELOG.md index bf42e0e..6b7d2a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `oath-adapter-net-http-api` HTTP contract — `HttpError` (one concrete + transport/middleware error; HTTP statuses pass through as `Ok(Response)`), + `HttpClient` (blanket-impl'd `Service` sub-trait), `ResponseBody` (buffer-xor- + stream, forwarding `Body` metadata), and `BufferMode`. New `oath-adapter-net- + http-mock` test harness (`MockClient`, `MockBody`, `MockTimer`). - WebSocket transport design: ADR-0032 (contract — untyped duplex frame channel, asymmetric `Stream`/RPITIT split, epoch-stamped lifecycle, `WsConnector` leaf, per-transport `AuthSource`) and ADR-0033 (resilience — reconnect actor over a diff --git a/Cargo.lock b/Cargo.lock index 78d4298..a5a951e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,12 @@ version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8" +[[package]] +name = "bytes" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae3f5d315924270530207e2a68396c3cc547f6dca3fbdca317cfb1a51edb593" + [[package]] name = "cfg-if" version = "1.0.4" @@ -57,6 +63,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + [[package]] name = "getrandom" version = "0.3.4" @@ -80,6 +92,39 @@ dependencies = [ "r-efi 6.0.0", ] +[[package]] +name = "http" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + [[package]] name = "itoa" version = "1.0.18" @@ -98,12 +143,32 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "memchr" version = "2.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88904434abc2901f197fe8cc55f0445e7ded921dba5911dad2e2b39b48e663c4" +[[package]] +name = "mio" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -128,6 +193,29 @@ version = "0.1.0" [[package]] name = "oath-adapter-net-http-api" version = "0.1.0" +dependencies = [ + "bytes", + "http", + "http-body", + "http-body-util", + "oath-adapter-net-api", + "pin-project-lite", + "thiserror", + "tokio", +] + +[[package]] +name = "oath-adapter-net-http-mock" +version = "0.1.0" +dependencies = [ + "bytes", + "http", + "http-body", + "http-body-util", + "oath-adapter-net-api", + "oath-adapter-net-http-api", + "tokio", +] [[package]] name = "oath-bus-api" @@ -255,6 +343,35 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -357,6 +474,15 @@ dependencies = [ "rand_core", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + [[package]] name = "regex-syntax" version = "0.8.11" @@ -388,6 +514,12 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.228" @@ -431,6 +563,32 @@ dependencies = [ "zmij", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ed6a63f02c8539c91a8685a86f4099661ba3da017932f6ebbea6de3f0fa7c90" + +[[package]] +name = "socket2" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "syn" version = "2.0.117" @@ -475,6 +633,34 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio" +version = "1.52.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" +dependencies = [ + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unarray" version = "0.1.4" @@ -496,6 +682,12 @@ dependencies = [ "libc", ] +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + [[package]] name = "wasip2" version = "1.0.4+wasi-0.2.12" diff --git a/Cargo.toml b/Cargo.toml index e12aab4..b95b274 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/model", "crates/adapter/net/api", "crates/adapter/net/http/api", + "crates/adapter/net/http/mock", "crates/bus/api", "crates/event-log/api", "crates/persistence/api", @@ -42,6 +43,7 @@ categories = ["finance", "asynchronous"] oath-model = { path = "crates/model", version = "0.1.0" } oath-adapter-net-api = { path = "crates/adapter/net/api", version = "0.1.0" } oath-adapter-net-http-api = { path = "crates/adapter/net/http/api", version = "0.1.0" } +oath-adapter-net-http-mock = { path = "crates/adapter/net/http/mock", version = "0.1.0" } oath-bus-api = { path = "crates/bus/api", version = "0.1.0" } oath-event-log-api = { path = "crates/event-log/api", version = "0.1.0" } oath-persistence-api = { path = "crates/persistence/api", version = "0.1.0" } @@ -57,6 +59,8 @@ oath-core-kernel = { path = "crates/core/kernel", version = "0.1.0" } bytes = "1" http = "1" http-body = "1" +http-body-util = "0.1" +pin-project-lite = "0.2" futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } serde = { version = "1", features = ["derive"] } diff --git a/crates/adapter/net/http/api/Cargo.toml b/crates/adapter/net/http/api/Cargo.toml index 7db5fc5..e81c907 100644 --- a/crates/adapter/net/http/api/Cargo.toml +++ b/crates/adapter/net/http/api/Cargo.toml @@ -5,5 +5,17 @@ edition.workspace = true rust-version.workspace = true license.workspace = true +[dependencies] +oath-adapter-net-api = { workspace = true } +thiserror = { workspace = true } +http = { workspace = true } +bytes = { workspace = true } +http-body = { workspace = true } +http-body-util = { workspace = true } +pin-project-lite = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } + [lints] workspace = true diff --git a/crates/adapter/net/http/api/src/body.rs b/crates/adapter/net/http/api/src/body.rs new file mode 100644 index 0000000..66722e8 --- /dev/null +++ b/crates/adapter/net/http/api/src/body.rs @@ -0,0 +1,150 @@ +//! The canonical HTTP response body and the per-request buffer/stream directive. + +use crate::HttpError; +use bytes::Bytes; +use http_body::{Body, Frame, SizeHint}; +use http_body_util::Full; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Per-request directive: buffer the response body inside the retry boundary, or +/// return it streaming at headers (ADR-0030 §4). `Copy` so it survives the +/// request clone `Retry` makes. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BufferMode { + /// Collect the body to `Bytes` before returning (full retry coverage). + Buffer, + /// Return the live body at headers (adapter owns mid-stream recovery). + Stream, +} + +pin_project_lite::pin_project! { + /// The canonical response body: one buffered frame *xor* a live streaming + /// body, behind one stable type so adapters never name the buffer-vs-stream + /// machinery. Forwards all three `Body` methods to the active arm — a + /// wrapper that silently reported the default `size_hint`/`is_end_stream` + /// would make a caller's `.collect()` pre-size and any max-size guard wrong. + #[project = ResponseBodyProj] + #[allow(missing_docs)] + pub enum ResponseBody { + /// A fully-collected body (single frame). + Buffered { #[pin] body: Full }, + /// A live streaming backend body. + Streaming { #[pin] body: B }, + } +} + +impl ResponseBody { + /// Wrap already-collected bytes as a one-frame buffered body. + #[must_use] + pub fn buffered(bytes: Bytes) -> Self { + Self::Buffered { + body: Full::new(bytes), + } + } + + /// Wrap a live streaming backend body. + #[must_use] + pub const fn streaming(body: B) -> Self { + Self::Streaming { body } + } +} + +impl Body for ResponseBody +where + B: Body, +{ + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, HttpError>>> { + match self.project() { + // `Full`'s error is `Infallible`, so map it away to unify with `HttpError`. + ResponseBodyProj::Buffered { body } => body + .poll_frame(cx) + .map(|frame| frame.map(|res| res.map_err(|never| match never {}))), + ResponseBodyProj::Streaming { body } => body.poll_frame(cx), + } + } + + fn is_end_stream(&self) -> bool { + match self { + Self::Buffered { body } => body.is_end_stream(), + Self::Streaming { body } => body.is_end_stream(), + } + } + + fn size_hint(&self) -> SizeHint { + match self { + Self::Buffered { body } => body.size_hint(), + Self::Streaming { body } => body.size_hint(), + } + } +} + +#[cfg(test)] +mod tests { + use super::{BufferMode, ResponseBody}; + use crate::HttpError; + use bytes::Bytes; + use http_body::{Body, Frame, SizeHint}; + use std::pin::Pin; + use std::task::{Context, Poll}; + + // Inner body with a known, non-default size_hint / is_end_stream, so the + // parity assertion is meaningful. + struct Stub { + remaining: u64, + } + impl Body for Stub { + type Data = Bytes; + type Error = HttpError; + fn poll_frame( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, HttpError>>> { + Poll::Ready(None) + } + fn is_end_stream(&self) -> bool { + self.remaining == 0 + } + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(self.remaining) + } + } + + #[test] + fn streaming_forwards_size_hint_and_is_end_stream() { + let reference = Stub { remaining: 42 }; + let ref_hint = reference.size_hint().exact(); + let ref_end = reference.is_end_stream(); + let wrapped = ResponseBody::streaming(Stub { remaining: 42 }); + assert_eq!(wrapped.size_hint().exact(), ref_hint); // NOT silently None/unbounded + assert_eq!(wrapped.is_end_stream(), ref_end); + } + + #[test] + fn streaming_is_end_stream_is_forwarded_not_defaulted() { + // Inner `is_end_stream()` is `true`; the trait default is `false`, so this + // assertion fails if the override were dropped — unlike a `remaining: 42` + // (false) case, which the default would also satisfy. + let wrapped = ResponseBody::streaming(Stub { remaining: 0 }); + assert!(wrapped.is_end_stream()); + } + + #[test] + fn buffered_reports_exact_length() { + let body: ResponseBody = ResponseBody::buffered(Bytes::from_static(b"hello")); + assert_eq!(body.size_hint().exact(), Some(5)); + } + + #[test] + fn buffer_mode_is_copy() { + let m = BufferMode::Buffer; + let n = m; // Copy + assert_eq!(m, n); + } +} diff --git a/crates/adapter/net/http/api/src/client.rs b/crates/adapter/net/http/api/src/client.rs new file mode 100644 index 0000000..14ec599 --- /dev/null +++ b/crates/adapter/net/http/api/src/client.rs @@ -0,0 +1,90 @@ +//! The `HttpClient` dependency-inversion seam for adapters. +//! +//! A backend implements [`Service`] once and is `HttpClient` for free via blanket +//! impl (ADR-0030 §6). Per ADR-0029 §5 this is a compile-time seam — never `dyn`. + +use crate::{HttpError, Service}; +use bytes::Bytes; +use std::future::Future; + +/// A composed HTTP client: a [`Service`] from `http::Request` to +/// `http::Response` with `Error = HttpError`. +pub trait HttpClient: + Service, Response = http::Response, Error = HttpError> +{ + /// The response body type (generic, for zero-alloc flow-through). + type Body: http_body::Body; + + /// Send a request — sugar over [`Service::call`]. + fn send( + &self, + req: http::Request, + ) -> impl Future, HttpError>> + Send { + self.call(req) + } +} + +impl HttpClient for S +where + S: Service, Response = http::Response, Error = HttpError>, + B: http_body::Body, +{ + type Body = B; +} + +#[cfg(test)] +mod tests { + use super::HttpClient; + use crate::{HttpError, Service}; + use bytes::Bytes; + use http_body::{Body, Frame, SizeHint}; + use std::pin::Pin; + use std::task::{Context, Poll}; + + // Minimal body whose error is `HttpError` (stock `Full`/`Empty` are `Infallible`). + struct EmptyBody; + impl Body for EmptyBody { + type Data = Bytes; + type Error = HttpError; + fn poll_frame( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, HttpError>>> { + Poll::Ready(None) + } + fn is_end_stream(&self) -> bool { + true + } + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(0) + } + } + + #[derive(Clone)] + struct Leaf; + impl Service> for Leaf { + type Response = http::Response; + type Error = HttpError; + #[allow(clippy::manual_async_fn)] + fn call( + &self, + _req: http::Request, + ) -> impl std::future::Future> + Send { + async { Ok(http::Response::new(EmptyBody)) } + } + } + + #[test] + fn any_matching_service_is_httpclient() { + fn assert_http_client(_: &C) {} + assert_http_client(&Leaf); // blanket impl applies + } + + #[tokio::test] + async fn send_is_sugar_over_call() { + let resp = HttpClient::send(&Leaf, http::Request::new(Bytes::new())) + .await + .unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + } +} diff --git a/crates/adapter/net/http/api/src/error.rs b/crates/adapter/net/http/api/src/error.rs new file mode 100644 index 0000000..37a98c7 --- /dev/null +++ b/crates/adapter/net/http/api/src/error.rs @@ -0,0 +1,95 @@ +//! The single concrete error type across the HTTP stack — transport and +//! middleware failures only. +//! +//! HTTP 4xx/5xx *statuses* are NOT errors here: they flow through as +//! `Ok(http::Response)` with the body intact for the adapter to classify +//! (ADR-0030 §5). Retry/CircuitBreaker peek `Response::status()` for their +//! resilience decisions. + +use oath_adapter_net_api::{ErrorKind, HasErrorKind}; + +/// A boxed error source, preserving backend detail for logs without leaking the +/// concrete type. +pub type BoxError = Box; + +/// The single `Service::Error` (and every `Body::Error`) of the HTTP stack. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum HttpError { + /// The request did not complete within its timeout. + #[error("request timed out")] + Timeout, + /// A connection-level failure (DNS, TCP, TLS, backend transport). + #[error("connection failure")] + Connection(#[source] BoxError), + /// A pacing wait exceeded `max_wait` — the request was not sent. + #[error("throttled: pacing wait exceeded max_wait")] + Throttled, + /// Credential stamping or refresh failed. + #[error("authorization failed: {0}")] + Auth(String), + /// A backend error that does not fit another variant. + #[error("network error")] + Other(#[source] BoxError), +} + +impl HttpError { + /// Construct an [`HttpError::Auth`] from a message. `AuthSource` impls use this. + #[must_use] + pub fn auth(message: impl Into) -> Self { + Self::Auth(message.into()) + } + + /// Construct an [`HttpError::Connection`] from a source error. + #[must_use] + pub fn connection(source: impl Into) -> Self { + Self::Connection(source.into()) + } + + /// Construct an [`HttpError::Other`] from a source error. + #[must_use] + pub fn other(source: impl Into) -> Self { + Self::Other(source.into()) + } +} + +impl HasErrorKind for HttpError { + fn kind(&self) -> ErrorKind { + match self { + Self::Timeout => ErrorKind::Timeout, + Self::Connection(_) => ErrorKind::Connection, + Self::Throttled => ErrorKind::Throttled, + Self::Auth(_) => ErrorKind::Auth, + Self::Other(_) => ErrorKind::Unknown, + } + } +} + +#[cfg(test)] +mod tests { + use super::HttpError; + use oath_adapter_net_api::{ErrorKind, HasErrorKind}; + + #[test] + fn kind_maps_each_variant() { + assert_eq!(HttpError::Timeout.kind(), ErrorKind::Timeout); + assert_eq!(HttpError::connection("reset").kind(), ErrorKind::Connection); + assert_eq!(HttpError::Throttled.kind(), ErrorKind::Throttled); + assert_eq!(HttpError::auth("expired").kind(), ErrorKind::Auth); + assert_eq!(HttpError::other("boom").kind(), ErrorKind::Unknown); + } + + #[test] + fn auth_carries_message() { + assert_eq!( + HttpError::auth("no token").to_string(), + "authorization failed: no token" + ); + } + + #[test] + fn is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } +} diff --git a/crates/adapter/net/http/api/src/lib.rs b/crates/adapter/net/http/api/src/lib.rs index c93ae59..db8fe6c 100644 --- a/crates/adapter/net/http/api/src/lib.rs +++ b/crates/adapter/net/http/api/src/lib.rs @@ -1,11 +1,23 @@ //! `oath-adapter-net-http-api` — the HTTP transport contract over the kernel. //! //! Builds on `oath-adapter-net-api` (composition machinery + `ErrorKind` + -//! `Timer`) and adds the request/reply [`Service`] connection shape. The HTTP -//! data plane (`HttpError`, `HttpClient`, `ResponseBody`, the layers) lands in -//! later slices. No async runtime, `hyper`, `reqwest`, or `serde` here. +//! `Timer`). Defines the HTTP transport contract: +//! +//! - [`service`] — the `Service` request/reply connection shape +//! - [`error`] — `HttpError` and its `HasErrorKind` impl +//! - [`client`] — the `HttpClient` dependency-inversion seam +//! - [`body`] — `ResponseBody` and `BufferMode` +//! +//! The resilience layers, `stack`/`build` assembly, and backends land in later +//! slices. No async runtime, `hyper`, `reqwest`, or `serde` here. #![forbid(unsafe_code)] +pub mod body; +pub mod client; +pub mod error; pub mod service; +pub use body::{BufferMode, ResponseBody}; +pub use client::HttpClient; +pub use error::{BoxError, HttpError}; pub use service::Service; diff --git a/crates/adapter/net/http/mock/Cargo.toml b/crates/adapter/net/http/mock/Cargo.toml new file mode 100644 index 0000000..b197bf0 --- /dev/null +++ b/crates/adapter/net/http/mock/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "oath-adapter-net-http-mock" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +oath-adapter-net-api = { workspace = true } +oath-adapter-net-http-api = { workspace = true } +http = { workspace = true } +bytes = { workspace = true } +http-body = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } +http-body-util = { workspace = true } diff --git a/crates/adapter/net/http/mock/src/body.rs b/crates/adapter/net/http/mock/src/body.rs new file mode 100644 index 0000000..b752db3 --- /dev/null +++ b/crates/adapter/net/http/mock/src/body.rs @@ -0,0 +1,71 @@ +//! A response body that yields pre-set data frames, with a controllable +//! `size_hint`/`is_end_stream` for exercising body-metadata forwarding. + +use bytes::Bytes; +use http_body::{Body, Frame, SizeHint}; +use oath_adapter_net_http_api::HttpError; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A body that yields its configured frames in order, then ends. +#[derive(Debug, Default)] +pub struct MockBody { + frames: VecDeque, +} + +impl MockBody { + /// A body yielding `frames` in order. + #[must_use] + pub fn new(frames: impl IntoIterator) -> Self { + Self { + frames: frames.into_iter().collect(), + } + } + + /// An immediately-ended body. + #[must_use] + pub fn empty() -> Self { + Self::default() + } +} + +impl Body for MockBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, HttpError>>> { + // `MockBody` holds no pinned fields, so `get_mut` is sound (auto-`Unpin`). + let this = self.get_mut(); + Poll::Ready(this.frames.pop_front().map(|data| Ok(Frame::data(data)))) + } + + fn is_end_stream(&self) -> bool { + self.frames.is_empty() + } + + fn size_hint(&self) -> SizeHint { + let total: u64 = self.frames.iter().map(|f| f.len() as u64).sum(); + SizeHint::with_exact(total) + } +} + +#[cfg(test)] +mod tests { + use super::MockBody; + use bytes::Bytes; + use http_body::Body; + use http_body_util::BodyExt; + + #[tokio::test] + async fn yields_frames_then_ends_and_reports_exact_size() { + let body = MockBody::new([Bytes::from_static(b"ab"), Bytes::from_static(b"cde")]); + assert_eq!(body.size_hint().exact(), Some(5)); + assert!(!body.is_end_stream()); + let collected = body.collect().await.unwrap().to_bytes(); + assert_eq!(collected, Bytes::from_static(b"abcde")); + } +} diff --git a/crates/adapter/net/http/mock/src/client.rs b/crates/adapter/net/http/mock/src/client.rs new file mode 100644 index 0000000..ea89e6a --- /dev/null +++ b/crates/adapter/net/http/mock/src/client.rs @@ -0,0 +1,81 @@ +//! A canned-response `Service` leaf that records the requests it receives. + +use crate::{MockBody, lock}; +use bytes::Bytes; +use oath_adapter_net_http_api::{HttpError, Service}; +use std::future::Future; +use std::sync::{Arc, Mutex}; + +/// A leaf client that returns a fixed status + body and records every request. +#[derive(Debug, Clone)] +pub struct MockClient { + status: http::StatusCode, + frames: Vec, + requests: Arc>>>, +} + +impl MockClient { + /// A client returning `status` with a body of `frames`. + #[must_use] + pub fn new(status: http::StatusCode, frames: impl IntoIterator) -> Self { + Self { + status, + frames: frames.into_iter().collect(), + requests: Arc::new(Mutex::new(Vec::new())), + } + } + + /// A `200 OK` client whose body is `body`. + #[must_use] + pub fn ok(body: impl Into) -> Self { + Self::new(http::StatusCode::OK, [body.into()]) + } + + /// The requests this client has received, in order. + #[must_use] + pub fn recorded_requests(&self) -> Vec> { + lock(&self.requests).clone() + } +} + +impl Service> for MockClient { + type Response = http::Response; + type Error = HttpError; + + fn call( + &self, + req: http::Request, + ) -> impl Future> + Send { + let requests = Arc::clone(&self.requests); + let status = self.status; + let frames = self.frames.clone(); + async move { + lock(&requests).push(req); + let mut resp = http::Response::new(MockBody::new(frames)); + *resp.status_mut() = status; + Ok(resp) + } + } +} + +#[cfg(test)] +mod tests { + use super::MockClient; + use bytes::Bytes; + use http_body_util::BodyExt; + use oath_adapter_net_http_api::HttpClient; + + #[tokio::test] + async fn returns_canned_body_and_records_requests() { + let client = MockClient::ok(Bytes::from_static(b"pong")); + let mut req = http::Request::new(Bytes::from_static(b"ping")); + *req.uri_mut() = "/tickle".parse().unwrap(); + let resp = client.send(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(body, Bytes::from_static(b"pong")); + let recorded = client.recorded_requests(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].uri(), "/tickle"); + } +} diff --git a/crates/adapter/net/http/mock/src/lib.rs b/crates/adapter/net/http/mock/src/lib.rs new file mode 100644 index 0000000..e2ac4ad --- /dev/null +++ b/crates/adapter/net/http/mock/src/lib.rs @@ -0,0 +1,20 @@ +//! Test harness for the net-http stack: a canned-response `MockClient` leaf, a +//! frame-controllable `MockBody`, and a `MockTimer` virtual clock. Consumed by +//! downstream crates via `[dev-dependencies]` only — it has no production edge. +#![forbid(unsafe_code)] + +pub mod body; +pub mod client; +pub mod timer; + +pub use body::MockBody; +pub use client::MockClient; +pub use timer::MockTimer; + +use std::sync::{Mutex, MutexGuard, PoisonError}; + +/// Lock `mutex`, recovering the guard if a panic poisoned it — mock state stays +/// usable so a failing test reports its own assertion, not a poison panic. +fn lock(mutex: &Mutex) -> MutexGuard<'_, T> { + mutex.lock().unwrap_or_else(PoisonError::into_inner) +} diff --git a/crates/adapter/net/http/mock/src/timer.rs b/crates/adapter/net/http/mock/src/timer.rs new file mode 100644 index 0000000..f6eb486 --- /dev/null +++ b/crates/adapter/net/http/mock/src/timer.rs @@ -0,0 +1,173 @@ +//! A virtual clock for deterministically driving timing layers in tests. +//! +//! `std::time::Instant` has no value constructor, so `MockTimer` anchors to a +//! real `Instant::now()` at construction and advances via a stored offset +//! (behind interior mutability, since `Timer::now` takes `&self`). `sleep` +//! registers a waker released by `advance` — a no-op `sleep` would make +//! elapsed-time-dependent tests vacuous. Cf. `governor::clock::FakeRelativeClock`. + +use crate::lock; +use oath_adapter_net_api::Timer; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +struct State { + now: Instant, + waiters: Vec<(Instant, Waker)>, +} + +/// A cloneable virtual clock. Clones share one timeline. +#[derive(Debug, Clone)] +pub struct MockTimer { + state: Arc>, +} + +impl MockTimer { + /// A clock anchored at the current real instant. + #[must_use] + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(State { + now: Instant::now(), + waiters: Vec::new(), + })), + } + } + + /// Advance virtual time by `dur`, waking every sleeper now due. + pub fn advance(&self, dur: Duration) { + let mut state = lock(&self.state); + state.now += dur; + let now = state.now; + let mut due = Vec::new(); + state.waiters.retain(|(deadline, waker)| { + if *deadline <= now { + due.push(waker.clone()); + false + } else { + true + } + }); + drop(state); // release before waking, so a woken poll can re-lock + for waker in due { + waker.wake(); + } + } +} + +impl Default for MockTimer { + fn default() -> Self { + Self::new() + } +} + +/// The future returned by [`MockTimer::sleep`]. +#[derive(Debug)] +pub struct Sleep { + state: Arc>, + deadline: Instant, +} + +impl Future for Sleep { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let mut state = lock(&self.state); + if state.now >= self.deadline { + return Poll::Ready(()); + } + // Re-polls while pending (e.g. this `Sleep` in a `select!` woken by a + // sibling future) must not stack duplicate waiters. Waiters carry no + // per-future identity, so dedup on `(deadline, will_wake)`: skip when + // this exact waker is already queued for this deadline — a re-poll of + // the same future is a no-op, while an unrelated future that merely + // shares the deadline still registers its own distinct waker. + let already_registered = state + .waiters + .iter() + .any(|(deadline, waker)| *deadline == self.deadline && waker.will_wake(cx.waker())); + if !already_registered { + state.waiters.push((self.deadline, cx.waker().clone())); + } + Poll::Pending + } +} + +impl Timer for MockTimer { + fn sleep(&self, dur: Duration) -> impl Future + Send { + let deadline = { + let state = lock(&self.state); + state.now + dur + }; + Sleep { + state: Arc::clone(&self.state), + deadline, + } + } + + fn now(&self) -> Instant { + lock(&self.state).now + } +} + +#[cfg(test)] +mod tests { + use super::MockTimer; + use oath_adapter_net_api::Timer; + use std::future::Future; + use std::pin::pin; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::task::{Context, Poll, Wake, Waker}; + use std::time::Duration; + + // A waker with stable Arc identity (so `will_wake` treats a clone as equal) + // that records how often it is woken. + struct CountingWaker(AtomicUsize); + impl Wake for CountingWaker { + fn wake(self: Arc) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + + #[test] + fn repeated_poll_does_not_stack_waiters() { + let timer = MockTimer::new(); + let counter = Arc::new(CountingWaker(AtomicUsize::new(0))); + let waker = Waker::from(Arc::clone(&counter)); + let mut cx = Context::from_waker(&waker); + + let mut sleep = pin!(timer.sleep(Duration::from_secs(1))); + assert_eq!(sleep.as_mut().poll(&mut cx), Poll::Pending); + // A second poll with the same waker + deadline must not re-register. + assert_eq!(sleep.as_mut().poll(&mut cx), Poll::Pending); + + let waiters = timer.state.lock().unwrap().waiters.len(); + assert_eq!(waiters, 1, "duplicate waiter registered on re-poll"); + + // Advancing past the deadline wakes the single registration exactly once. + timer.advance(Duration::from_secs(1)); + assert_eq!( + counter.0.load(Ordering::SeqCst), + 1, + "sleeper woken more than once" + ); + } + + #[tokio::test] + async fn advance_moves_now_and_wakes_sleepers() { + let timer = MockTimer::new(); + let start = timer.now(); + let timer_for_spawn = timer.clone(); + // Wake the sleeper by advancing past its deadline on another task. + let handle = + tokio::spawn(async move { timer_for_spawn.sleep(Duration::from_secs(10)).await }); + tokio::task::yield_now().await; + timer.advance(Duration::from_secs(10)); + handle.await.unwrap(); + assert_eq!(timer.now().duration_since(start), Duration::from_secs(10)); + } +} diff --git a/docs/superpowers/plans/2026-07-01-net-http-contract.md b/docs/superpowers/plans/2026-07-01-net-http-contract.md new file mode 100644 index 0000000..48cb240 --- /dev/null +++ b/docs/superpowers/plans/2026-07-01-net-http-contract.md @@ -0,0 +1,951 @@ +# net-http HTTP Contract (Slice 0, PR 2) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Give `oath-adapter-net-http-api` its HTTP data-plane contracts — `HttpError`, `HttpClient`, `ResponseBody`, `BufferMode` — and ship the standalone `oath-adapter-net-http-mock` test harness, so later slices (auth/body/rate layers, then assembly) have a typed, mockable HTTP surface to build on. + +**Architecture:** The stack is pure transport — bytes in, bytes out. `HttpError` is one concrete error for **transport/middleware failures only** (HTTP 4xx/5xx statuses are NOT error-ified; they flow through as `Ok(Response)` with body intact for the adapter to classify). `HttpClient` is a blanket-impl'd `Service` sub-trait so any backend is `HttpClient` for free. `ResponseBody` is a `pin-project-lite` enum (buffered `Full` xor streaming `B`) that forwards all three `Body` methods. The mock crate is a self-contained harness (`MockClient`, `MockBody`, `MockTimer`) consumed by downstream slices via `[dev-dependencies]`. + +**Tech Stack:** Rust (edition 2024, MSRV 1.90), `just`, `http`/`http-body`/`http-body-util`/`bytes`/`pin-project-lite`/`thiserror`. **No** `tokio`/`hyper`/`reqwest`/`serde` — `net-http-api` stays runtime-free. + +**Source spec:** [docs/superpowers/specs/2026-06-30-net-http-construction-surface-design.md](../specs/2026-06-30-net-http-construction-surface-design.md). This is **Slice 0, PR 2**; it builds on PR 1 (#57, `feat/net-http-api-repartition`) and follows the PR 2 roadmap in [2026-06-30-net-http-foundation.md](2026-06-30-net-http-foundation.md). + +**Depends on PR 1 having merged** (or a branch stacked on it): PR 2 consumes `oath_adapter_net_http_api::Service`, `oath_adapter_net_api::{ErrorKind, HasErrorKind, Timer}`. + +## Global Constraints + +Every task implicitly includes these: + +- **Edition 2024, MSRV 1.90.** No `unsafe` (`unsafe_code = "deny"`). Body impls use `pin-project-lite`, never manual `unsafe`. +- **No `unwrap`/`expect`/indexing/panic in non-test code** — return `Result` / recover (`Mutex` poison via `unwrap_or_else(std::sync::PoisonError::into_inner)`). Test code is exempt for `unwrap`/`expect`/indexing only. +- **`just lint` runs `clippy --all-targets -- -D warnings`, which promotes `pedantic`/`nursery` (warn-level) to errors** — so all code, **including tests**, must be pedantic-clean: no `as` casts that trip `cast_possible_truncation` (use `u64::try_from(x).unwrap_or(u64::MAX)` or pick a `u64` field), add `#[must_use]` where clippy asks, document all public items (`missing_docs`), derive `Debug` (`missing_debug_implementations`), no unreachable `pub`. +- **`net-http-api` charter:** no `tokio`/`hyper`/`reqwest`/`serde`; free of any async runtime. Adds only `http`/`bytes`/`http-body`/`http-body-util`/`pin-project-lite`/`thiserror` (+ `oath-adapter-net-api`) as tasks use them. +- **`HttpError` is model A:** transport/middleware failures only. HTTP error statuses are never converted to `HttpError`. +- **Deps** via `[workspace.dependencies]` (explicit `version` for internal crates). Add a dep in the task that first *uses* it (keeps `cargo-machete` green). +- **DoD per PR:** `just ci` green. Update `CHANGELOG.md` `[Unreleased]`. One issue → one branch → worktree under `.claude/worktrees/` → one PR (`Closes #`). + +--- + +## File Structure + +- `crates/adapter/net/http/api/src/error.rs` — **new.** `HttpError`, `BoxError`, `HasErrorKind` impl. +- `crates/adapter/net/http/api/src/client.rs` — **new.** `HttpClient` trait + blanket impl. +- `crates/adapter/net/http/api/src/body.rs` — **new.** `ResponseBody`, `BufferMode`. +- `crates/adapter/net/http/api/src/lib.rs` — **modify.** Add `mod`/`pub use` for the above. +- `crates/adapter/net/http/api/Cargo.toml` — **modify.** Add deps as used. +- `crates/adapter/net/http/mock/{Cargo.toml,src/lib.rs,src/body.rs,src/client.rs,src/timer.rs}` — **new crate** `oath-adapter-net-http-mock`. +- `Cargo.toml` (workspace) — **modify.** Add `http-body-util`/`pin-project-lite` to `[workspace.dependencies]`; register the mock crate as a member + dep entry. +- `CHANGELOG.md` — **modify.** + +Each PR-2 task is one commit; the four tasks together are one PR/issue. + +--- + +## Task 2.1: `HttpError` + `HasErrorKind` + +**Files:** +- Create: `crates/adapter/net/http/api/src/error.rs` +- Modify: `crates/adapter/net/http/api/src/lib.rs`, `crates/adapter/net/http/api/Cargo.toml` + +**Interfaces:** +- Consumes: `oath_adapter_net_api::{ErrorKind, HasErrorKind}` (kernel). +- Produces: `oath_adapter_net_http_api::{HttpError, BoxError}`. `HttpError::auth(impl Into) -> HttpError`, `HttpError::connection(impl Into)`, `HttpError::other(impl Into)`. `impl HasErrorKind for HttpError`. + +- [ ] **Step 1: Add deps** + +In `crates/adapter/net/http/api/Cargo.toml`, add a `[dependencies]` section: + +```toml +[dependencies] +oath-adapter-net-api = { workspace = true } +thiserror = { workspace = true } +``` + +- [ ] **Step 2: Write the failing test** + +Create `crates/adapter/net/http/api/src/error.rs` with only the test, and add `pub mod error;` to `lib.rs`: + +```rust +//! The single concrete error type for the HTTP stack — placeholder; filled in step 4. + +#[cfg(test)] +mod tests { + use super::HttpError; + use oath_adapter_net_api::{ErrorKind, HasErrorKind}; + + #[test] + fn kind_maps_each_variant() { + assert_eq!(HttpError::Timeout.kind(), ErrorKind::Timeout); + assert_eq!(HttpError::connection("reset").kind(), ErrorKind::Connection); + assert_eq!(HttpError::Throttled.kind(), ErrorKind::Throttled); + assert_eq!(HttpError::auth("expired").kind(), ErrorKind::Auth); + assert_eq!(HttpError::other("boom").kind(), ErrorKind::Unknown); + } + + #[test] + fn auth_carries_message() { + assert_eq!(HttpError::auth("no token").to_string(), "authorization failed: no token"); + } + + #[test] + fn is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } +} +``` + +- [ ] **Step 3: Run it to verify it fails** + +Run: `just check` +Expected: FAIL — `cannot find type HttpError`. + +- [ ] **Step 4: Implement `HttpError`** + +Prepend to `error.rs` (above the test), replacing the placeholder `//!` line: + +```rust +//! The single concrete error type across the HTTP stack — transport and +//! middleware failures only. HTTP 4xx/5xx *statuses* are NOT errors here: they +//! flow through as `Ok(http::Response)` with the body intact for the adapter to +//! classify (ADR-0030 §5). Retry/CircuitBreaker peek `Response::status()` for +//! their resilience decisions. + +use oath_adapter_net_api::{ErrorKind, HasErrorKind}; + +/// A boxed error source, preserving backend detail for logs without leaking the +/// concrete type. +pub type BoxError = Box; + +/// The single `Service::Error` (and every `Body::Error`) of the HTTP stack. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum HttpError { + /// The request did not complete within its timeout. + #[error("request timed out")] + Timeout, + /// A connection-level failure (DNS, TCP, TLS, backend transport). + #[error("connection failure")] + Connection(#[source] BoxError), + /// A pacing wait exceeded `max_wait` — the request was not sent. + #[error("throttled: pacing wait exceeded max_wait")] + Throttled, + /// Credential stamping or refresh failed. + #[error("authorization failed: {0}")] + Auth(String), + /// A backend error that does not fit another variant. + #[error("network error")] + Other(#[source] BoxError), +} + +impl HttpError { + /// Construct an [`HttpError::Auth`] from a message. `AuthSource` impls use this. + #[must_use] + pub fn auth(message: impl Into) -> Self { + Self::Auth(message.into()) + } + + /// Construct an [`HttpError::Connection`] from a source error. + #[must_use] + pub fn connection(source: impl Into) -> Self { + Self::Connection(source.into()) + } + + /// Construct an [`HttpError::Other`] from a source error. + #[must_use] + pub fn other(source: impl Into) -> Self { + Self::Other(source.into()) + } +} + +impl HasErrorKind for HttpError { + fn kind(&self) -> ErrorKind { + match self { + Self::Timeout => ErrorKind::Timeout, + Self::Connection(_) => ErrorKind::Connection, + Self::Throttled => ErrorKind::Throttled, + Self::Auth(_) => ErrorKind::Auth, + Self::Other(_) => ErrorKind::Unknown, + } + } +} +``` + +Add to `lib.rs`: `pub mod error;`, `pub use error::{BoxError, HttpError};`, and an `error` entry in the `//!` module-list doc. + +- [ ] **Step 5: Run tests** + +Run: `just check && cargo test -p oath-adapter-net-http-api error && just lint` +Expected: PASS, warning-free. + +- [ ] **Step 6: Commit** + +```bash +git add crates/adapter/net/http/api/src/error.rs crates/adapter/net/http/api/src/lib.rs crates/adapter/net/http/api/Cargo.toml +git commit -m "feat(net): HttpError — one concrete transport/middleware error, HasErrorKind" +``` + +--- + +## Task 2.2: `HttpClient` blanket-impl'd `Service` sub-trait + +**Files:** +- Create: `crates/adapter/net/http/api/src/client.rs` +- Modify: `crates/adapter/net/http/api/src/lib.rs`, `crates/adapter/net/http/api/Cargo.toml` + +**Interfaces:** +- Consumes: `crate::{Service, HttpError}`; `http`, `bytes`, `http-body`. +- Produces: `oath_adapter_net_http_api::HttpClient` — a trait with `type Body: http_body::Body` and `fn send(&self, http::Request) -> impl Future, HttpError>> + Send`; blanket `impl HttpClient for S`. + +- [ ] **Step 1: Add deps** + +Append to `crates/adapter/net/http/api/Cargo.toml` `[dependencies]`: + +```toml +http = { workspace = true } +bytes = { workspace = true } +http-body = { workspace = true } +``` + +- [ ] **Step 2: Write the failing test** + +Create `crates/adapter/net/http/api/src/client.rs` with the test only; add `pub mod client;` to `lib.rs`: + +```rust +//! The `HttpClient` dependency-inversion seam — placeholder; filled in step 4. + +#[cfg(test)] +mod tests { + use super::HttpClient; + use crate::{HttpError, Service}; + use bytes::Bytes; + use http_body::{Body, Frame, SizeHint}; + use std::pin::Pin; + use std::task::{Context, Poll}; + + // Minimal body whose error is `HttpError` (stock `Full`/`Empty` are `Infallible`). + struct EmptyBody; + impl Body for EmptyBody { + type Data = Bytes; + type Error = HttpError; + fn poll_frame(self: Pin<&mut Self>, _: &mut Context<'_>) + -> Poll, HttpError>>> { + Poll::Ready(None) + } + fn is_end_stream(&self) -> bool { true } + fn size_hint(&self) -> SizeHint { SizeHint::with_exact(0) } + } + + #[derive(Clone)] + struct Leaf; + impl Service> for Leaf { + type Response = http::Response; + type Error = HttpError; + fn call(&self, _req: http::Request) + -> impl std::future::Future> + Send { + async { Ok(http::Response::new(EmptyBody)) } + } + } + + #[test] + fn any_matching_service_is_httpclient() { + fn assert_http_client(_: &C) {} + assert_http_client(&Leaf); // blanket impl applies + } + + #[tokio::test] + async fn send_is_sugar_over_call() { + let resp = HttpClient::send(&Leaf, http::Request::new(Bytes::new())).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + } +} +``` + +Add `tokio` as a **dev-dependency** to `crates/adapter/net/http/api/Cargo.toml` (test-only executor; does not touch the runtime-free production graph): + +```toml +[dev-dependencies] +tokio = { workspace = true } +``` + +- [ ] **Step 3: Run it to verify it fails** + +Run: `just check` +Expected: FAIL — `cannot find trait HttpClient`. + +- [ ] **Step 4: Implement `HttpClient`** + +Prepend to `client.rs`: + +```rust +//! The named dependency-inversion seam adapters code against. It *is* a +//! [`Service`], so the `Layer` machinery composes it; a backend implements +//! `Service` once and is `HttpClient` for free (ADR-0030 §6). Per ADR-0029 §5 it +//! is a compile-time `impl HttpClient` seam — never `dyn`. + +use crate::{HttpError, Service}; +use bytes::Bytes; +use std::future::Future; + +/// A composed HTTP client: a [`Service`] from `http::Request` to +/// `http::Response` with `Error = HttpError`. +pub trait HttpClient: + Service, Response = http::Response, Error = HttpError> +{ + /// The response body type (generic, for zero-alloc flow-through). + type Body: http_body::Body; + + /// Send a request — sugar over [`Service::call`]. + fn send( + &self, + req: http::Request, + ) -> impl Future, HttpError>> + Send { + self.call(req) + } +} + +impl HttpClient for S +where + S: Service, Response = http::Response, Error = HttpError>, + B: http_body::Body, +{ + type Body = B; +} +``` + +Add to `lib.rs`: `pub mod client;`, `pub use client::HttpClient;`, module-doc line. + +- [ ] **Step 5: Run tests** + +Run: `just check && cargo test -p oath-adapter-net-http-api client && just lint` +Expected: PASS. + +- [ ] **Step 6: Commit** + +```bash +git add crates/adapter/net/http/api/src/client.rs crates/adapter/net/http/api/src/lib.rs crates/adapter/net/http/api/Cargo.toml +git commit -m "feat(net): HttpClient — blanket-impl'd Service sub-trait with send sugar" +``` + +--- + +## Task 2.3: `ResponseBody` + `BufferMode` + +**Files:** +- Create: `crates/adapter/net/http/api/src/body.rs` +- Modify: `crates/adapter/net/http/api/src/lib.rs`, `crates/adapter/net/http/api/Cargo.toml`, root `Cargo.toml` + +**Interfaces:** +- Consumes: `crate::HttpError`; `http-body`, `http-body-util` (`Full`), `bytes`, `pin-project-lite`. +- Produces: `oath_adapter_net_http_api::ResponseBody` with `ResponseBody::buffered(Bytes) -> Self` and `ResponseBody::streaming(B) -> Self`, and `impl> Body for ResponseBody` forwarding `poll_frame`/`is_end_stream`/`size_hint`. `oath_adapter_net_http_api::BufferMode` (`Buffer`/`Stream`, `Copy`). + +- [ ] **Step 1: Add deps** + +In the root `Cargo.toml` `[workspace.dependencies]`, add: + +```toml +http-body-util = "0.1" +pin-project-lite = "0.2" +``` + +In `crates/adapter/net/http/api/Cargo.toml` `[dependencies]`, add: + +```toml +http-body-util = { workspace = true } +pin-project-lite = { workspace = true } +``` + +- [ ] **Step 2: Write the failing test** + +Create `crates/adapter/net/http/api/src/body.rs` with the test only; add `pub mod body;` to `lib.rs`: + +```rust +//! The canonical response body + buffer-mode directive — placeholder; step 4. + +#[cfg(test)] +mod tests { + use super::{BufferMode, ResponseBody}; + use crate::HttpError; + use bytes::Bytes; + use http_body::{Body, Frame, SizeHint}; + use std::pin::Pin; + use std::task::{Context, Poll}; + + // Inner body with a known, non-default size_hint / is_end_stream, so the + // parity assertion is meaningful. + struct Stub { + remaining: u64, + } + impl Body for Stub { + type Data = Bytes; + type Error = HttpError; + fn poll_frame(self: Pin<&mut Self>, _: &mut Context<'_>) + -> Poll, HttpError>>> { + Poll::Ready(None) + } + fn is_end_stream(&self) -> bool { self.remaining == 0 } + fn size_hint(&self) -> SizeHint { SizeHint::with_exact(self.remaining) } + } + + #[test] + fn streaming_forwards_size_hint_and_is_end_stream() { + let reference = Stub { remaining: 42 }; + let ref_hint = reference.size_hint().exact(); + let ref_end = reference.is_end_stream(); + let wrapped = ResponseBody::streaming(Stub { remaining: 42 }); + assert_eq!(wrapped.size_hint().exact(), ref_hint); // NOT silently None/unbounded + assert_eq!(wrapped.is_end_stream(), ref_end); + } + + #[test] + fn buffered_reports_exact_length() { + let body: ResponseBody = ResponseBody::buffered(Bytes::from_static(b"hello")); + assert_eq!(body.size_hint().exact(), Some(5)); + } + + #[test] + fn buffer_mode_is_copy() { + let m = BufferMode::Buffer; + let n = m; // Copy + assert_eq!(m, n); + } +} +``` + +- [ ] **Step 3: Run it to verify it fails** + +Run: `just check` +Expected: FAIL — `cannot find type ResponseBody` / `BufferMode`. + +- [ ] **Step 4: Implement `ResponseBody` + `BufferMode`** + +Prepend to `body.rs`: + +```rust +//! The canonical HTTP response body and the per-request buffer/stream directive. + +use crate::HttpError; +use bytes::Bytes; +use http_body::{Body, Frame, SizeHint}; +use http_body_util::Full; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Per-request directive: buffer the response body inside the retry boundary, or +/// return it streaming at headers (ADR-0030 §4). `Copy` so it survives the +/// request clone `Retry` makes. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BufferMode { + /// Collect the body to `Bytes` before returning (full retry coverage). + Buffer, + /// Return the live body at headers (adapter owns mid-stream recovery). + Stream, +} + +pin_project_lite::pin_project! { + /// The canonical response body: one buffered frame *xor* a live streaming + /// body, behind one stable type so adapters never name the buffer-vs-stream + /// machinery. Forwards all three `Body` methods to the active arm — a + /// wrapper that silently reported the default `size_hint`/`is_end_stream` + /// would make a caller's `.collect()` pre-size and any max-size guard wrong. + #[project = ResponseBodyProj] + pub enum ResponseBody { + /// A fully-collected body (single frame). + Buffered { #[pin] body: Full }, + /// A live streaming backend body. + Streaming { #[pin] body: B }, + } +} + +impl ResponseBody { + /// Wrap already-collected bytes as a one-frame buffered body. + #[must_use] + pub fn buffered(bytes: Bytes) -> Self { + Self::Buffered { body: Full::new(bytes) } + } + + /// Wrap a live streaming backend body. + pub fn streaming(body: B) -> Self { + Self::Streaming { body } + } +} + +impl Body for ResponseBody +where + B: Body, +{ + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, HttpError>>> { + match self.project() { + // `Full`'s error is `Infallible`; the `Err` arm is unreachable. + ResponseBodyProj::Buffered { body } => match body.poll_frame(cx) { + Poll::Ready(Some(Ok(frame))) => Poll::Ready(Some(Ok(frame))), + Poll::Ready(Some(Err(never))) => match never {}, + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }, + ResponseBodyProj::Streaming { body } => body.poll_frame(cx), + } + } + + fn is_end_stream(&self) -> bool { + match self { + Self::Buffered { body } => body.is_end_stream(), + Self::Streaming { body } => body.is_end_stream(), + } + } + + fn size_hint(&self) -> SizeHint { + match self { + Self::Buffered { body } => body.size_hint(), + Self::Streaming { body } => body.size_hint(), + } + } +} +``` + +Add to `lib.rs`: `pub mod body;`, `pub use body::{BufferMode, ResponseBody};`, module-doc line. + +- [ ] **Step 5: Run tests** + +Run: `just check && cargo test -p oath-adapter-net-http-api body && just lint` +Expected: PASS. (If `just machete` later flags `http-body-util`/`pin-project-lite` as unused, they are used here — re-run after this task lands.) + +- [ ] **Step 6: Commit** + +```bash +git add crates/adapter/net/http/api/src/body.rs crates/adapter/net/http/api/src/lib.rs crates/adapter/net/http/api/Cargo.toml Cargo.toml +git commit -m "feat(net): ResponseBody (buffer-xor-stream, forwards Body metadata) + BufferMode" +``` + +--- + +## Task 2.4: `oath-adapter-net-http-mock` harness + +**Files:** +- Create: `crates/adapter/net/http/mock/Cargo.toml`, `.../src/lib.rs`, `.../src/body.rs`, `.../src/client.rs`, `.../src/timer.rs` +- Modify: root `Cargo.toml` (member + dep entry) + +**Interfaces:** +- Consumes: `oath_adapter_net_http_api::{Service, HttpError}`; `oath_adapter_net_api::Timer`; `http`, `bytes`, `http-body`. +- Produces: `oath_adapter_net_http_mock::{MockBody, MockClient, MockTimer}`. `MockBody::new(frames)`, `MockBody::empty()`. `MockClient::ok(Bytes)`, `MockClient::new(StatusCode, frames)`, `MockClient::recorded_requests() -> Vec>`. `MockTimer::new() -> Self`, `MockTimer::advance(Duration)`. + +*(Standalone harness — `net-http-api` does NOT depend on it, so there is no dev-dep cycle. Downstream slices add it under their own `[dev-dependencies]`.)* + +- [ ] **Step 1: Register the crate** + +Root `Cargo.toml`: add `"crates/adapter/net/http/mock",` to `members`, and to `[workspace.dependencies]`: + +```toml +oath-adapter-net-http-mock = { path = "crates/adapter/net/http/mock", version = "0.1.0" } +``` + +Create `crates/adapter/net/http/mock/Cargo.toml`: + +```toml +[package] +name = "oath-adapter-net-http-mock" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +oath-adapter-net-api = { workspace = true } +oath-adapter-net-http-api = { workspace = true } +http = { workspace = true } +bytes = { workspace = true } +http-body = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } +``` + +Create `crates/adapter/net/http/mock/src/lib.rs`: + +```rust +//! Test harness for the net-http stack: a canned-response `MockClient` leaf, a +//! frame-controllable `MockBody`, and a `MockTimer` virtual clock. Consumed by +//! downstream crates via `[dev-dependencies]` only — it has no production edge. +#![forbid(unsafe_code)] + +pub mod body; +pub mod client; +pub mod timer; + +pub use body::MockBody; +pub use client::MockClient; +pub use timer::MockTimer; +``` + +- [ ] **Step 2: `MockBody` — write the failing test** + +Create `crates/adapter/net/http/mock/src/body.rs`: + +```rust +//! A response body that yields pre-set frames — placeholder; step 3. + +#[cfg(test)] +mod tests { + use super::MockBody; + use bytes::Bytes; + use http_body::Body; + use http_body_util::BodyExt; + + #[tokio::test] + async fn yields_frames_then_ends_and_reports_exact_size() { + let body = MockBody::new([Bytes::from_static(b"ab"), Bytes::from_static(b"cde")]); + assert_eq!(body.size_hint().exact(), Some(5)); + assert!(!body.is_end_stream()); + let collected = body.collect().await.unwrap().to_bytes(); + assert_eq!(collected, Bytes::from_static(b"abcde")); + } +} +``` + +Add `http-body-util` as a **dev-dependency** of the mock crate (for `BodyExt::collect` in tests) — append to its `Cargo.toml` `[dev-dependencies]`: `http-body-util = { workspace = true }`. + +- [ ] **Step 3: Implement `MockBody`** + +Prepend to `body.rs`: + +```rust +//! A response body that yields pre-set data frames, with a controllable +//! `size_hint`/`is_end_stream` for exercising body-metadata forwarding. + +use bytes::Bytes; +use http_body::{Body, Frame, SizeHint}; +use oath_adapter_net_http_api::HttpError; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A body that yields its configured frames in order, then ends. +#[derive(Debug, Default)] +pub struct MockBody { + frames: VecDeque, +} + +impl MockBody { + /// A body yielding `frames` in order. + #[must_use] + pub fn new(frames: impl IntoIterator) -> Self { + Self { frames: frames.into_iter().collect() } + } + + /// An immediately-ended body. + #[must_use] + pub fn empty() -> Self { + Self::default() + } +} + +impl Body for MockBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, HttpError>>> { + // `MockBody` holds no pinned fields, so `get_mut` is sound (auto-`Unpin`). + let this = self.get_mut(); + match this.frames.pop_front() { + Some(data) => Poll::Ready(Some(Ok(Frame::data(data)))), + None => Poll::Ready(None), + } + } + + fn is_end_stream(&self) -> bool { + self.frames.is_empty() + } + + fn size_hint(&self) -> SizeHint { + let total: u64 = self + .frames + .iter() + .map(|f| u64::try_from(f.len()).unwrap_or(u64::MAX)) + .sum(); + SizeHint::with_exact(total) + } +} +``` + +Run: `just check && cargo test -p oath-adapter-net-http-mock body && just lint` — Expected: PASS. + +- [ ] **Step 4: `MockTimer` — write the failing test** + +Create `crates/adapter/net/http/mock/src/timer.rs`: + +```rust +//! A virtual, controllable clock — placeholder; step 5. + +#[cfg(test)] +mod tests { + use super::MockTimer; + use oath_adapter_net_api::Timer; + use std::time::Duration; + + #[tokio::test] + async fn advance_moves_now_and_wakes_sleepers() { + let timer = MockTimer::new(); + let start = timer.now(); + let sleep = timer.sleep(Duration::from_secs(10)); + let advancer = timer.clone(); + // Wake the sleeper by advancing past its deadline on another task. + let handle = tokio::spawn(async move { sleep.await }); + tokio::task::yield_now().await; + advancer.advance(Duration::from_secs(10)); + handle.await.unwrap(); + assert_eq!(timer.now().duration_since(start), Duration::from_secs(10)); + } +} +``` + +- [ ] **Step 5: Implement `MockTimer`** + +Prepend to `timer.rs`: + +```rust +//! A virtual clock for deterministically driving timing layers in tests. +//! +//! `std::time::Instant` has no value constructor, so `MockTimer` anchors to a +//! real `Instant::now()` at construction and advances via a stored offset +//! (behind interior mutability, since `Timer::now` takes `&self`). `sleep` +//! registers a waker released by `advance` — a no-op `sleep` would make +//! elapsed-time-dependent tests vacuous. Cf. `governor::clock::FakeRelativeClock`. + +use oath_adapter_net_api::Timer; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex, PoisonError}; +use std::task::{Context, Poll, Waker}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +struct State { + now: Instant, + waiters: Vec<(Instant, Waker)>, +} + +/// A cloneable virtual clock. Clones share one timeline. +#[derive(Debug, Clone)] +pub struct MockTimer { + state: Arc>, +} + +impl MockTimer { + /// A clock anchored at the current real instant. + #[must_use] + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(State { now: Instant::now(), waiters: Vec::new() })), + } + } + + /// Advance virtual time by `dur`, waking every sleeper now due. + pub fn advance(&self, dur: Duration) { + let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner); + state.now += dur; + let now = state.now; + let mut due = Vec::new(); + state.waiters.retain(|(deadline, waker)| { + if *deadline <= now { + due.push(waker.clone()); + false + } else { + true + } + }); + drop(state); // release before waking, so a woken poll can re-lock + for waker in due { + waker.wake(); + } + } +} + +impl Default for MockTimer { + fn default() -> Self { + Self::new() + } +} + +/// The future returned by [`MockTimer::sleep`]. +#[derive(Debug)] +pub struct Sleep { + state: Arc>, + deadline: Instant, +} + +impl Future for Sleep { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner); + if state.now >= self.deadline { + Poll::Ready(()) + } else { + state.waiters.push((self.deadline, cx.waker().clone())); + Poll::Pending + } + } +} + +impl Timer for MockTimer { + fn sleep(&self, dur: Duration) -> impl Future + Send { + let deadline = { + let state = self.state.lock().unwrap_or_else(PoisonError::into_inner); + state.now + dur + }; + Sleep { state: Arc::clone(&self.state), deadline } + } + + fn now(&self) -> Instant { + self.state.lock().unwrap_or_else(PoisonError::into_inner).now + } +} +``` + +Run: `just check && cargo test -p oath-adapter-net-http-mock timer && just lint` — Expected: PASS. + +- [ ] **Step 6: `MockClient` — write the failing test** + +Create `crates/adapter/net/http/mock/src/client.rs`: + +```rust +//! A canned-response client leaf — placeholder; step 7. + +#[cfg(test)] +mod tests { + use super::MockClient; + use bytes::Bytes; + use http_body_util::BodyExt; + use oath_adapter_net_http_api::HttpClient; + + #[tokio::test] + async fn returns_canned_body_and_records_requests() { + let client = MockClient::ok(Bytes::from_static(b"pong")); + let mut req = http::Request::new(Bytes::from_static(b"ping")); + *req.uri_mut() = "/tickle".parse().unwrap(); + let resp = client.send(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(body, Bytes::from_static(b"pong")); + let recorded = client.recorded_requests(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].uri(), "/tickle"); + } +} +``` + +- [ ] **Step 7: Implement `MockClient`** + +Prepend to `client.rs`: + +```rust +//! A canned-response `Service` leaf that records the requests it receives. + +use crate::MockBody; +use bytes::Bytes; +use oath_adapter_net_http_api::{HttpError, Service}; +use std::future::Future; +use std::sync::{Arc, Mutex, PoisonError}; + +/// A leaf client that returns a fixed status + body and records every request. +#[derive(Debug, Clone)] +pub struct MockClient { + status: http::StatusCode, + frames: Vec, + requests: Arc>>>, +} + +impl MockClient { + /// A client returning `status` with a body of `frames`. + #[must_use] + pub fn new(status: http::StatusCode, frames: impl IntoIterator) -> Self { + Self { + status, + frames: frames.into_iter().collect(), + requests: Arc::new(Mutex::new(Vec::new())), + } + } + + /// A `200 OK` client whose body is `body`. + #[must_use] + pub fn ok(body: Bytes) -> Self { + Self::new(http::StatusCode::OK, [body]) + } + + /// The requests this client has received, in order. + #[must_use] + pub fn recorded_requests(&self) -> Vec> { + self.requests.lock().unwrap_or_else(PoisonError::into_inner).clone() + } +} + +impl Service> for MockClient { + type Response = http::Response; + type Error = HttpError; + + fn call( + &self, + req: http::Request, + ) -> impl Future> + Send { + let requests = Arc::clone(&self.requests); + let status = self.status; + let frames = self.frames.clone(); + async move { + requests.lock().unwrap_or_else(PoisonError::into_inner).push(req); + let mut resp = http::Response::new(MockBody::new(frames)); + *resp.status_mut() = status; + Ok(resp) + } + } +} +``` + +Run: `just check && cargo test -p oath-adapter-net-http-mock client && just lint` — Expected: PASS. + +- [ ] **Step 8: CHANGELOG + full gate + commit** + +Add to `CHANGELOG.md` `[Unreleased] → Added`: + +```markdown +- `oath-adapter-net-http-api` HTTP contract — `HttpError` (one concrete + transport/middleware error; HTTP statuses pass through as `Ok(Response)`), + `HttpClient` (blanket-impl'd `Service` sub-trait), `ResponseBody` (buffer-xor- + stream, forwarding `Body` metadata), and `BufferMode`. New `oath-adapter-net- + http-mock` test harness (`MockClient`, `MockBody`, `MockTimer`). +``` + +Run: `just ci` — Expected: green. + +```bash +git add crates/adapter/net/http/mock Cargo.toml CHANGELOG.md +git commit -m "feat(net): net-http-mock harness — MockClient, MockBody, MockTimer" +``` + +--- + +## Self-Review + +**Spec coverage (PR 2 roadmap in the foundation plan + construction-surface spec):** +- `HttpError` model A (transport/middleware only; statuses pass through) — Task 2.1. ✅ +- `HttpClient` blanket impl + `send` — Task 2.2. ✅ +- `ResponseBody` forwarding all three `Body` methods + `BufferMode` — Task 2.3 (the spec's transparency fix is the `is_end_stream`/`size_hint` match arms + the parity test). ✅ +- `net-http-mock` (`MockClient`, `MockBody`, `MockTimer` with observable `sleep`/`advance`) — Task 2.4. ✅ +- No dev-dep cycle (net-http-api uses inline doubles) — the roadmap's stated alternative. ✅ +- Deferred (correctly absent): `AuthSource`/`Auth`/`Guarded` (PR 3), `RateKey`/coverage (PR 4), the resilience layers + `stack`/`build` + hyper leaf (later slices), `CircuitOpen` variant (added with the CB layer). + +**Placeholder scan:** none — every step carries real code/commands. + +**Type consistency:** `HttpError` variants/constructors identical across 2.1 and their uses in 2.2–2.4; `Service`/`HttpClient` signatures match the PR-1 landed `Service` and the 2.2 definition; `MockBody`/`MockClient`/`MockTimer` names match `lib.rs` re-exports and the `Interfaces` blocks. `ResponseBody` arm names (`Buffered`/`Streaming`) consistent between the pin-project enum, the `project`ed `poll_frame`, and the `&self` `is_end_stream`/`size_hint` matches. + +**Known risk to watch during impl:** `http_body_util`/`pin_project_lite` exact API surface (`Full::new`, `Frame::data`, `SizeHint::with_exact`, `pin_project!` enum `#[project = …]`) — all stable in the pinned versions; if a signature differs, adjust at the failing-test step. The mock crate's dev-dep on `tokio`/`http-body-util` is test-only and does not touch any production graph.