diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b7d2a0..030bf9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `HttpClient` (blanket-impl'd `Service` sub-trait), `ResponseBody` (buffer-xor- stream, forwarding `Body` metadata), and `BufferMode`. New `oath-adapter-net- http-mock` test harness (`MockClient`, `MockBody`, `MockTimer`). +- `oath-adapter-net-ws-api` WebSocket contract (ADR-0032/0033) — `Frame`/`CloseFrame` + (RFC 6455 frame vocabulary), `WsError` (one concrete transport error with + `HasErrorKind`), the split owned halves (`WsSink` one-shot RPITIT send half with + terminal `close(self)`; `WsSource` blanket `Stream` recv half), the epoch-stamped + lifecycle watch channel (`ConnState`, `LifecycleSnapshot`, `Lifecycle`/ + `LifecycleSender` over runtime-neutral `async-watch`), and the `WsConnector` leaf + seam. New `oath-adapter-net-ws-mock` test harness (`MockWsConnector`, `MockSink`, + `MockSource`). - WebSocket transport design: ADR-0032 (contract — untyped duplex frame channel, asymmetric `Stream`/RPITIT split, epoch-stamped lifecycle, `WsConnector` leaf, per-transport `AuthSource`) and ADR-0033 (resilience — reconnect actor over a diff --git a/Cargo.lock b/Cargo.lock index a5a951e..a865021 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "async-watch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a078faf4e27c0c6cc0efb20e5da59dcccc04968ebf2801d8e0b2195124cdcdb2" +dependencies = [ + "event-listener", +] + [[package]] name = "autocfg" version = "1.5.1" @@ -51,6 +60,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "2.4.1" @@ -69,6 +84,36 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "slab", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -217,6 +262,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "oath-adapter-net-ws-api" +version = "0.1.0" +dependencies = [ + "async-watch", + "bytes", + "futures-core", + "futures-util", + "http", + "oath-adapter-net-api", + "thiserror", + "tokio", +] + +[[package]] +name = "oath-adapter-net-ws-mock" +version = "0.1.0" +dependencies = [ + "bytes", + "futures-core", + "futures-util", + "http", + "oath-adapter-net-api", + "oath-adapter-net-ws-api", + "tokio", +] + [[package]] name = "oath-bus-api" version = "0.1.0" @@ -573,6 +645,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.2" diff --git a/Cargo.toml b/Cargo.toml index b95b274..b6103e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,8 @@ members = [ "crates/adapter/net/api", "crates/adapter/net/http/api", "crates/adapter/net/http/mock", + "crates/adapter/net/ws/api", + "crates/adapter/net/ws/mock", "crates/bus/api", "crates/event-log/api", "crates/persistence/api", @@ -44,6 +46,8 @@ oath-model = { path = "crates/model", version = "0.1.0" } oath-adapter-net-api = { path = "crates/adapter/net/api", version = "0.1.0" } oath-adapter-net-http-api = { path = "crates/adapter/net/http/api", version = "0.1.0" } oath-adapter-net-http-mock = { path = "crates/adapter/net/http/mock", version = "0.1.0" } +oath-adapter-net-ws-api = { path = "crates/adapter/net/ws/api", version = "0.1.0" } +oath-adapter-net-ws-mock = { path = "crates/adapter/net/ws/mock", version = "0.1.0" } oath-bus-api = { path = "crates/bus/api", version = "0.1.0" } oath-event-log-api = { path = "crates/event-log/api", version = "0.1.0" } oath-persistence-api = { path = "crates/persistence/api", version = "0.1.0" } @@ -56,6 +60,9 @@ oath-core-api = { path = "crates/core/api", version = "0.1.0" } oath-core-kernel = { path = "crates/core/kernel", version = "0.1.0" } # External shared — backend-specific deps belong in individual crate Cargo.toml files +# Runtime-neutral last-value channel (extracted from tokio::sync::watch, +# event-listener family) — keeps tokio out of net-ws-api (ADR-0033 §5). +async-watch = "0.3" bytes = "1" http = "1" http-body = "1" @@ -63,6 +70,7 @@ http-body-util = "0.1" pin-project-lite = "0.2" futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } +futures-util = "0.3" serde = { version = "1", features = ["derive"] } thiserror = "2" tokio = { version = "1", features = ["full"] } diff --git a/README.md b/README.md index b2dc4f4..fe3a4c9 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Every subsystem is defined behind a trait. Backends, adapters, transports, and s | `oath-adapter-api` | Harness + `Broker` / `DataProvider` traits for venue adapters | | `oath-adapter-net-api` | Transport-neutral composition primitives (`Layer`, `ServiceBuilder`, `Stack`) + `ErrorKind` / `Timer` | | `oath-adapter-net-http-api` | HTTP transport contract (`Service`, …) over the `oath-adapter-net-api` kernel | +| `oath-adapter-net-ws-api` | WebSocket transport contract (`Frame`, `WsSink`/`WsSource`, `Lifecycle`, `WsConnector`, …) over the `oath-adapter-net-api` kernel | | `oath-strategy-api` | User-facing `Strategy` trait and Signal ergonomics (the canonical `Signal` payload lives in `oath-model`, per ADR-0028) | | `oath-strategy-host` | Strategy Node binary: hosts user strategies, isolated from Core | | `oath-cli` | The first Frontend (MVP) | @@ -43,6 +44,7 @@ graph TD stratapi[oath-strategy-api] --> model netapi[oath-adapter-net-api] nethttpapi[oath-adapter-net-http-api] --> netapi + netwsapi[oath-adapter-net-ws-api] --> netapi risk[oath-core-risk] --> coreapi risk --> model diff --git a/crates/adapter/net/ws/api/Cargo.toml b/crates/adapter/net/ws/api/Cargo.toml new file mode 100644 index 0000000..cde6196 --- /dev/null +++ b/crates/adapter/net/ws/api/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "oath-adapter-net-ws-api" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +async-watch = { workspace = true } +bytes = { workspace = true } +http = { workspace = true } +oath-adapter-net-api = { workspace = true } +thiserror = { workspace = true } +futures-core = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } +futures-util = { workspace = true } diff --git a/crates/adapter/net/ws/api/src/connector.rs b/crates/adapter/net/ws/api/src/connector.rs new file mode 100644 index 0000000..737d983 --- /dev/null +++ b/crates/adapter/net/ws/api/src/connector.rs @@ -0,0 +1,87 @@ +//! The named dependency-inversion seam the composition stack builds on. +//! +//! The `HttpClient` analogue for WS (ADR-0032 §7). The WS upgrade *is* an +//! HTTP GET, so the handshake is an `http::Request<()>` (body-agnostic parts +//! — the same shape the shared `AuthSource` will stamp, §8). Per ADR-0029 §5 +//! it is a compile-time `impl WsConnector` seam — never `dyn`. +//! +//! This is the **composition** seam: the leaf and every inner resilience +//! layer implement it (ADR-0033 §1). The richer usage seam an adapter holds +//! (`ReconnectingConnector`/`ReconnectingConnection` + `WsControl`) is +//! produced only at the assembly boundary, in a later slice. + +use crate::{Lifecycle, WsError, WsSink, WsSource}; +use std::future::Future; + +/// Establish WebSocket connections: one handshake in, three handles out. +pub trait WsConnector { + /// The send half produced by a successful connect. + type Sink: WsSink; + /// The receive half produced by a successful connect. + type Source: WsSource; + + /// Perform the upgrade handshake and yield the two single-owner halves + /// plus the lifecycle read channel (ADR-0032 §2/§4). `&self` — a connector + /// is shared; the reconnect layer calls it once per (re)connect. + fn connect( + &self, + handshake: http::Request<()>, + ) -> impl Future> + Send; +} + +#[cfg(test)] +mod tests { + use super::WsConnector; + use crate::{Frame, Lifecycle, LifecycleSnapshot, WsError, WsSink}; + use futures_core::Stream; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + struct StubSink; + impl WsSink for StubSink { + fn send(&mut self, _frame: Frame) -> impl Future> + Send { + std::future::ready(Ok(())) + } + fn close(self) -> impl Future> + Send { + std::future::ready(Ok(())) + } + } + + struct StubSource; + impl Stream for StubSource { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(None) + } + } + + struct StubConnector; + impl WsConnector for StubConnector { + type Sink = StubSink; + type Source = StubSource; + #[allow(clippy::manual_async_fn)] + fn connect( + &self, + _handshake: http::Request<()>, + ) -> impl Future> + Send + { + async { + let (_tx, lifecycle) = Lifecycle::channel(LifecycleSnapshot::connected(0)); + Ok((StubSink, StubSource, lifecycle)) + } + } + } + + #[tokio::test] + async fn connect_yields_the_three_handles() { + fn assert_connector(_: &C) {} + assert_connector(&StubConnector); + + let mut handshake = http::Request::new(()); + *handshake.uri_mut() = "wss://api.ibkr.com/v1/api/ws".parse().unwrap(); + let (sink, _source, lifecycle) = StubConnector.connect(handshake).await.unwrap(); + assert_eq!(lifecycle.snapshot().epoch, 0); + sink.close().await.unwrap(); + } +} diff --git a/crates/adapter/net/ws/api/src/error.rs b/crates/adapter/net/ws/api/src/error.rs new file mode 100644 index 0000000..d342c99 --- /dev/null +++ b/crates/adapter/net/ws/api/src/error.rs @@ -0,0 +1,124 @@ +//! The single concrete error type across the WS stack — transport and +//! middleware failures only. +//! +//! Venue-level errors arriving as data frames are NOT errors here: they flow +//! through the source as `Text`/`Binary` for the adapter to classify +//! (ADR-0032 §1). `WsError` implements [`HasErrorKind`] once; the resilience +//! layers branch only on [`ErrorKind`] (ADR-0033 §7). + +use crate::CloseFrame; +use oath_adapter_net_api::{ErrorKind, HasErrorKind}; + +/// A boxed error source, preserving backend detail for logs without leaking +/// the concrete type. +pub type BoxError = Box; + +/// The single error of the WS transport: `connect`, `send`, `close`, and every +/// source item use it. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum WsError { + /// The operation did not complete within its timeout. + #[error("operation timed out")] + Timeout, + /// A connection-level failure (DNS, TCP, TLS, WS handshake or protocol). + #[error("connection failure")] + Connection(#[source] BoxError), + /// Credential stamping or refresh failed. + #[error("authorization failed: {0}")] + Auth(String), + /// The peer closed the connection (close frame, possibly with code+reason). + #[error("connection closed by peer{}", .0.as_ref().map_or_else( + String::new, + |frame| format!(" (code {}, reason {:?})", frame.code, frame.reason), + ))] + Closed(Option), + /// A backend error that does not fit another variant. + #[error("websocket error")] + Other(#[source] BoxError), +} + +impl WsError { + /// Construct a [`WsError::Connection`] from a source error. + #[must_use] + pub fn connection(source: impl Into) -> Self { + Self::Connection(source.into()) + } + + /// Construct a [`WsError::Auth`] from a message. + #[must_use] + pub fn auth(message: impl Into) -> Self { + Self::Auth(message.into()) + } + + /// Construct a [`WsError::Other`] from a source error. + #[must_use] + pub fn other(source: impl Into) -> Self { + Self::Other(source.into()) + } +} + +impl HasErrorKind for WsError { + fn kind(&self) -> ErrorKind { + match self { + Self::Timeout => ErrorKind::Timeout, + // A peer close is a connection-level loss for retry purposes; + // per-close-code refinement is the adapter's classification hook + // (ADR-0033 §7), not a transport concern. + Self::Connection(_) | Self::Closed(_) => ErrorKind::Connection, + Self::Auth(_) => ErrorKind::Auth, + Self::Other(_) => ErrorKind::Unknown, + } + } +} + +#[cfg(test)] +mod tests { + use super::WsError; + use crate::CloseFrame; + use oath_adapter_net_api::{ErrorKind, HasErrorKind}; + + #[test] + fn kind_maps_each_variant() { + assert_eq!(WsError::Timeout.kind(), ErrorKind::Timeout); + assert_eq!(WsError::connection("reset").kind(), ErrorKind::Connection); + assert_eq!(WsError::auth("expired").kind(), ErrorKind::Auth); + assert_eq!(WsError::Closed(None).kind(), ErrorKind::Connection); + let close = CloseFrame { + code: 1000, + reason: String::new(), + }; + assert_eq!(WsError::Closed(Some(close)).kind(), ErrorKind::Connection); + assert_eq!(WsError::other("boom").kind(), ErrorKind::Unknown); + } + + #[test] + fn auth_carries_message() { + assert_eq!( + WsError::auth("session expired").to_string(), + "authorization failed: session expired" + ); + } + + #[test] + fn closed_surfaces_code_and_reason_when_present() { + assert_eq!( + WsError::Closed(None).to_string(), + "connection closed by peer" + ); + let close = CloseFrame { + code: 1006, + reason: "abnormal".to_owned(), + }; + assert_eq!( + WsError::Closed(Some(close)).to_string(), + "connection closed by peer (code 1006, reason \"abnormal\")" + ); + } + + #[test] + fn is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } +} diff --git a/crates/adapter/net/ws/api/src/frame.rs b/crates/adapter/net/ws/api/src/frame.rs new file mode 100644 index 0000000..1ad8072 --- /dev/null +++ b/crates/adapter/net/ws/api/src/frame.rs @@ -0,0 +1,75 @@ +//! The WebSocket frame vocabulary — the leaf/inter-layer unit of transport. +//! +//! "Untyped" (ADR-0032 §1) means *no venue/JSON typing* — not flattening +//! WebSocket's own protocol frame kinds, which are transport concerns +//! (RFC 6455), not venue concerns (§3). After the ADR-0033 default stack the +//! adapter-facing source delivers only `Text`/`Binary` data frames; control +//! frames are absorbed by the heartbeat layer and bypass the data buffer. + +use bytes::Bytes; + +/// The payload of a `Close` frame: an RFC 6455 close code plus a UTF-8 reason. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CloseFrame { + /// The RFC 6455 §7.4 close code (e.g. `1000` normal closure). + pub code: u16, + /// The UTF-8 close reason; may be empty. + pub reason: String, +} + +/// One WebSocket frame. Payloads are raw [`Bytes`] — the transport never +/// parses them (grammar-blindness, ADR-0032 §1). +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Frame { + /// A UTF-8 text data frame (kept as bytes; the adapter owns parsing). + Text(Bytes), + /// A binary data frame. + Binary(Bytes), + /// A protocol ping control frame. + Ping(Bytes), + /// A protocol pong control frame. + Pong(Bytes), + /// A close control frame with an optional code + reason. + Close(Option), +} + +impl Frame { + /// Whether this is a data frame (`Text`/`Binary`) — what the default + /// stack delivers to the adapter (ADR-0032 §3). + #[must_use] + pub const fn is_data(&self) -> bool { + matches!(self, Self::Text(_) | Self::Binary(_)) + } + + /// Whether this is a control frame (`Ping`/`Pong`/`Close`) — absorbed by + /// the heartbeat layer; bypasses the data buffer (ADR-0032 §3/§6). + #[must_use] + pub const fn is_control(&self) -> bool { + !self.is_data() + } +} + +#[cfg(test)] +mod tests { + use super::{CloseFrame, Frame}; + use bytes::Bytes; + + #[test] + fn data_and_control_frames_classify() { + assert!(Frame::Text(Bytes::from_static(b"{}")).is_data()); + assert!(Frame::Binary(Bytes::new()).is_data()); + assert!(Frame::Ping(Bytes::new()).is_control()); + assert!(Frame::Pong(Bytes::new()).is_control()); + assert!(Frame::Close(None).is_control()); + } + + #[test] + fn frames_are_cloneable_and_comparable() { + let close = Frame::Close(Some(CloseFrame { + code: 1000, + reason: "bye".to_owned(), + })); + assert_eq!(close.clone(), close); + assert_ne!(close, Frame::Close(None)); + } +} diff --git a/crates/adapter/net/ws/api/src/lib.rs b/crates/adapter/net/ws/api/src/lib.rs new file mode 100644 index 0000000..549f4ec --- /dev/null +++ b/crates/adapter/net/ws/api/src/lib.rs @@ -0,0 +1,34 @@ +//! `oath-adapter-net-ws-api` — the WebSocket transport contract over the kernel. +//! +//! Builds on `oath-adapter-net-api` (composition machinery + `ErrorKind` + +//! `Timer`). Defines the WS transport contract (ADR-0032, as amended by +//! ADR-0033 §5): an untyped duplex frame channel — the transport moves frames +//! and knows nothing of venue grammar (subscriptions, topics, JSON), which +//! stays in the adapter (ADR-0003). +//! +//! - [`frame`] — the `Frame`/`CloseFrame` transport vocabulary +//! - [`error`] — `WsError` and its `HasErrorKind` impl +//! - [`sink`] — `WsSink`, the owned send half +//! - [`source`] — `WsSource`, the owned recv half +//! - [`lifecycle`] — the out-of-band `Lifecycle` watch channel +//! - [`connector`] — `WsConnector`, the composition seam (handshake in, three +//! handles out) +//! +//! The resilience stack (reconnect actor, heartbeat, buffer, `stack()`) and +//! the tungstenite backend land in later slices. No async runtime, `tokio`, +//! `tokio-tungstenite`, or `serde` here. +#![forbid(unsafe_code)] + +pub mod connector; +pub mod error; +pub mod frame; +pub mod lifecycle; +pub mod sink; +pub mod source; + +pub use connector::WsConnector; +pub use error::{BoxError, WsError}; +pub use frame::{CloseFrame, Frame}; +pub use lifecycle::{ConnState, Lifecycle, LifecycleSender, LifecycleSnapshot}; +pub use sink::WsSink; +pub use source::WsSource; diff --git a/crates/adapter/net/ws/api/src/lifecycle.rs b/crates/adapter/net/ws/api/src/lifecycle.rs new file mode 100644 index 0000000..463ae5c --- /dev/null +++ b/crates/adapter/net/ws/api/src/lifecycle.rs @@ -0,0 +1,190 @@ +//! The out-of-band lifecycle channel: connection health as a third handle, +//! not an item interleaved into the data stream (ADR-0032 §4). +//! +//! Delivery is a **last-value watch of an epoch-stamped snapshot** (ADR-0033 +//! §5): the producer overwrites and never blocks, so a slow risk consumer can +//! never backpressure the socket-owning actor; the monotonic `epoch` makes +//! coalescing lossless for the safety fact ({ currently-down ∨ epoch-advanced } +//! is total). Every snapshot field is **level or monotonic-cumulative, never a +//! per-event delta** — overwrite semantics would lose a delta. + +use std::fmt; +use std::time::Instant; + +/// The connection phase — the level component of [`LifecycleSnapshot`]. +/// +/// `Stale`/`Reconnecting` are first-class: for a trading system the +/// safety-critical event is the feed going *down*, not its recovery +/// (ADR-0032 §4). `Unrecoverable` is the one terminal phase (ADR-0033 §7). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum ConnState { + /// The connection is up. `epoch` echoes [`LifecycleSnapshot::epoch`]. + Connected { + /// The connection epoch at the time of this phase. + epoch: u64, + }, + /// The feed is stale (idle-read timeout / missed liveness) — treat as down. + Stale, + /// The connection is down; the reconnect actor is re-establishing it. + Reconnecting, + /// A reconnect completed; `epoch` bounds the adapter's reconcile window + /// ("reconcile since epoch N", ADR-0032 §4/§5). + Resumed { + /// The new connection epoch after the completed down-cycle. + epoch: u64, + }, + /// A classified permanent failure — the stack has stopped retrying and + /// will not self-heal (ADR-0033 §7). + Unrecoverable, +} + +/// One epoch-stamped, level-semantics view of connection health. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct LifecycleSnapshot { + /// The current connection phase (level). + pub phase: ConnState, + /// Monotonic; bumped on every completed down-cycle. The canonical source + /// of truth — the value echoed in `Connected`/`Resumed` is this field; + /// consumers diff it. + pub epoch: u64, + /// When the current down phase began (`None` while up). + pub down_since: Option, + /// Monotonic count of connection attempts. + pub attempts: u64, + /// Monotonic cumulative count of frames dropped by the buffer layer — the + /// `Lagged` signal (ADR-0032 §6); a per-event delta would be lost to + /// overwrite, so consumers diff this total. + pub total_lagged: u64, +} + +impl LifecycleSnapshot { + /// A healthy just-connected snapshot at `epoch`. + #[must_use] + pub const fn connected(epoch: u64) -> Self { + Self { + phase: ConnState::Connected { epoch }, + epoch, + down_since: None, + attempts: 0, + total_lagged: 0, + } + } +} + +/// The read side of the lifecycle channel — the third handle `connect` yields. +/// +/// Cloneable: risk loop and adapter each hold their own cursor. +#[derive(Clone)] +pub struct Lifecycle { + rx: async_watch::Receiver, +} + +impl Lifecycle { + /// Create a linked producer/consumer pair seeded with `initial`. + #[must_use] + pub fn channel(initial: LifecycleSnapshot) -> (LifecycleSender, Self) { + let (tx, rx) = async_watch::channel(initial); + (LifecycleSender { tx }, Self { rx }) + } + + /// The latest snapshot — a level read; never blocks. + #[must_use] + pub fn snapshot(&self) -> LifecycleSnapshot { + *self.rx.borrow() + } + + /// Wait until a snapshot newer than the last one seen by *this* handle is + /// published. Returns `false` once the producer is dropped (terminal — no + /// further updates will ever arrive). + pub async fn changed(&mut self) -> bool { + self.rx.changed().await.is_ok() + } +} + +impl fmt::Debug for Lifecycle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Lifecycle") + .field("snapshot", &self.snapshot()) + .finish() + } +} + +/// The write side, held by the connection owner (a leaf, the reconnect actor, +/// or a mock). `send` overwrites — it never blocks on a slow consumer. +pub struct LifecycleSender { + tx: async_watch::Sender, +} + +impl LifecycleSender { + /// Publish a new snapshot. Best-effort: sending with every receiver gone + /// is a no-op, not a failure — the producer never depends on consumers. + pub fn send(&self, snapshot: LifecycleSnapshot) { + // Err means all receivers dropped — nothing to notify. + let _ = self.tx.send(snapshot); + } +} + +impl fmt::Debug for LifecycleSender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LifecycleSender").finish_non_exhaustive() + } +} + +#[cfg(test)] +mod tests { + use super::{ConnState, Lifecycle, LifecycleSnapshot}; + + #[test] + fn connected_seeds_a_consistent_snapshot() { + let snap = LifecycleSnapshot::connected(3); + assert_eq!(snap.phase, ConnState::Connected { epoch: 3 }); + assert_eq!(snap.epoch, 3); + assert_eq!(snap.down_since, None); + assert_eq!(snap.attempts, 0); + assert_eq!(snap.total_lagged, 0); + } + + #[test] + fn snapshot_reads_the_latest_value_without_blocking() { + let (tx, lifecycle) = Lifecycle::channel(LifecycleSnapshot::connected(0)); + assert_eq!( + lifecycle.snapshot().phase, + ConnState::Connected { epoch: 0 } + ); + tx.send(LifecycleSnapshot { + phase: ConnState::Stale, + ..LifecycleSnapshot::connected(0) + }); + assert_eq!(lifecycle.snapshot().phase, ConnState::Stale); + } + + #[test] + fn overwrite_coalesces_but_the_epoch_carries_the_cycle_count() { + // ADR-0033 §5: a slow consumer may miss transient phases, but + // { currently-down ∨ epoch-advanced } is total — the epoch delta + // recovers fully-coalesced down-cycles. + let (tx, lifecycle) = Lifecycle::channel(LifecycleSnapshot::connected(5)); + tx.send(LifecycleSnapshot { + phase: ConnState::Reconnecting, + ..LifecycleSnapshot::connected(5) + }); + tx.send(LifecycleSnapshot::connected(9)); // several cycles later + let seen = lifecycle.snapshot(); + assert_eq!(seen.phase, ConnState::Connected { epoch: 9 }); + assert_eq!(seen.epoch - 5, 4); // four down-cycles, regardless of what was witnessed + } + + #[tokio::test] + async fn changed_wakes_on_update_and_ends_when_the_sender_drops() { + let (tx, mut lifecycle) = Lifecycle::channel(LifecycleSnapshot::connected(0)); + tx.send(LifecycleSnapshot { + phase: ConnState::Reconnecting, + ..LifecycleSnapshot::connected(0) + }); + assert!(lifecycle.changed().await); // pending update is observed + assert_eq!(lifecycle.snapshot().phase, ConnState::Reconnecting); + drop(tx); + assert!(!lifecycle.changed().await); // producer gone — no more updates + } +} diff --git a/crates/adapter/net/ws/api/src/sink.rs b/crates/adapter/net/ws/api/src/sink.rs new file mode 100644 index 0000000..2aa2684 --- /dev/null +++ b/crates/adapter/net/ws/api/src/sink.rs @@ -0,0 +1,61 @@ +//! The owned send half: one-shot RPITIT `send`, terminal `close`. +//! +//! Deliberately **not** `futures::Sink` — its `poll_ready`/`start_send`/ +//! `poll_flush` is the poll-handshake the `Service` design walked away from +//! (ADR-0032 §2); subscribe/heartbeat traffic is low-volume, so a one-shot +//! `send` suffices. The half is single-owner and moves to its own task. + +use crate::{Frame, WsError}; +use std::future::Future; + +/// The send half of one WebSocket connection. +/// +/// `Send` because the halves move to separate tasks (concurrent send of +/// subscribe/heartbeat vs. receive of frames). `'static` is not required +/// here — it is enforced at the composition boundary, as for `Service`. +pub trait WsSink: Send { + /// Send one frame. + fn send(&mut self, frame: Frame) -> impl Future> + Send; + + /// Initiate the closing handshake. Consumes the sink — shutdown is one-way + /// and terminal, so the sink cannot be used after close is requested + /// (enforced by the type system, ADR-0032 §2). + fn close(self) -> impl Future> + Send; +} + +#[cfg(test)] +mod tests { + use super::WsSink; + use crate::{Frame, WsError}; + use bytes::Bytes; + use std::future::Future; + + /// Inline double: records sent frames, succeeds on close. + #[derive(Default)] + struct VecSink { + sent: Vec, + } + + impl WsSink for VecSink { + fn send(&mut self, frame: Frame) -> impl Future> + Send { + self.sent.push(frame); + std::future::ready(Ok(())) + } + + fn close(self) -> impl Future> + Send { + std::future::ready(Ok(())) + } + } + + #[tokio::test] + async fn send_is_one_shot_and_close_consumes_the_sink() { + let mut sink = VecSink::default(); + sink.send(Frame::Text(Bytes::from_static(b"smd+265598+{}"))) + .await + .unwrap(); + assert_eq!(sink.sent.len(), 1); + // `close(self)` takes the sink by value — `sink.send(...)` after this + // line would be a compile error (ADR-0032 §2's type-system guarantee). + sink.close().await.unwrap(); + } +} diff --git a/crates/adapter/net/ws/api/src/source.rs b/crates/adapter/net/ws/api/src/source.rs new file mode 100644 index 0000000..777eb6a --- /dev/null +++ b/crates/adapter/net/ws/api/src/source.rs @@ -0,0 +1,65 @@ +//! The owned recv half: a stream of frames. +//! +//! Recv is `futures_core::Stream`, not a hand-rolled pull iterator — the +//! `http_body::Body` precedent (ADR-0032 §2): monomorphised, zero-box, and it +//! gives the resilience layers `StreamExt`/`unfold` instead of manual +//! `poll_next`. Exclusive `&mut` access is inherent in `Stream::poll_next`. + +use crate::{Frame, WsError}; +use futures_core::Stream; + +/// The receive half of one WebSocket connection: frames in arrival order, +/// each `Ok(Frame)` or a terminal-ish `Err(WsError)`. +/// +/// Blanket-implemented for every matching stream — a backend implements +/// `Stream` once and is a `WsSource` for free (the `HttpClient` move). +pub trait WsSource: Stream> + Send {} + +impl WsSource for S where S: Stream> + Send {} + +#[cfg(test)] +mod tests { + use super::WsSource; + use crate::{Frame, WsError}; + use bytes::Bytes; + use futures_core::Stream; + use futures_util::StreamExt; + use std::collections::VecDeque; + use std::pin::Pin; + use std::task::{Context, Poll}; + + /// Inline double: yields scripted items, then ends. + struct ScriptSource { + items: VecDeque>, + } + + impl Stream for ScriptSource { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // No pinned fields — `get_mut` is sound (auto-`Unpin`). + Poll::Ready(self.get_mut().items.pop_front()) + } + } + + #[test] + fn any_matching_stream_is_a_ws_source() { + fn assert_ws_source(_: &S) {} + let source = ScriptSource { + items: VecDeque::new(), + }; + assert_ws_source(&source); // blanket impl applies + } + + #[tokio::test] + async fn source_yields_items_then_ends() { + let mut source = ScriptSource { + items: VecDeque::from([ + Ok(Frame::Text(Bytes::from_static(b"{\"topic\":\"system\"}"))), + Err(WsError::connection("reset")), + ]), + }; + assert!(source.next().await.unwrap().is_ok()); + assert!(source.next().await.unwrap().is_err()); + assert!(source.next().await.is_none()); + } +} diff --git a/crates/adapter/net/ws/mock/Cargo.toml b/crates/adapter/net/ws/mock/Cargo.toml new file mode 100644 index 0000000..403befa --- /dev/null +++ b/crates/adapter/net/ws/mock/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "oath-adapter-net-ws-mock" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +oath-adapter-net-ws-api = { workspace = true } +http = { workspace = true } +futures-core = { workspace = true } + +[dev-dependencies] +oath-adapter-net-api = { workspace = true } +bytes = { workspace = true } +tokio = { workspace = true } +futures-util = { workspace = true } diff --git a/crates/adapter/net/ws/mock/src/connector.rs b/crates/adapter/net/ws/mock/src/connector.rs new file mode 100644 index 0000000..49b4592 --- /dev/null +++ b/crates/adapter/net/ws/mock/src/connector.rs @@ -0,0 +1,229 @@ +//! A scriptable `WsConnector` leaf. +//! +//! Each `connect` consumes the next script (a frame sequence, or a failure), +//! records the handshake, and exposes what every connection sent, whether it +//! closed, and its `LifecycleSender` so a test can drive lifecycle +//! transitions (ADR-0033 §9's "scripted frames + injectable disconnects and +//! `ErrorKind`s"). + +use crate::sink::ConnectionRecord; +use crate::{MockSink, MockSource, lock}; +use oath_adapter_net_ws_api::{ + Frame, Lifecycle, LifecycleSender, LifecycleSnapshot, WsConnector, WsError, +}; +use std::collections::VecDeque; +use std::future::Future; +use std::sync::{Arc, Mutex}; + +#[derive(Debug)] +enum Script { + /// The next connect succeeds; its source yields these items, then ends. + Yield(VecDeque>), + /// The next connect fails with this error. + Fail(WsError), +} + +#[derive(Debug, Default)] +struct State { + scripts: VecDeque