Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`HttpClient` (blanket-impl'd `Service` sub-trait), `ResponseBody` (buffer-xor-
stream, forwarding `Body` metadata), and `BufferMode`. New `oath-adapter-net-
http-mock` test harness (`MockClient`, `MockBody`, `MockTimer`).
- `oath-adapter-net-ws-api` WebSocket contract (ADR-0032/0033) — `Frame`/`CloseFrame`
(RFC 6455 frame vocabulary), `WsError` (one concrete transport error with
`HasErrorKind`), the split owned halves (`WsSink` one-shot RPITIT send half with
terminal `close(self)`; `WsSource` blanket `Stream` recv half), the epoch-stamped
lifecycle watch channel (`ConnState`, `LifecycleSnapshot`, `Lifecycle`/
`LifecycleSender` over runtime-neutral `async-watch`), and the `WsConnector` leaf
seam. New `oath-adapter-net-ws-mock` test harness (`MockWsConnector`, `MockSink`,
`MockSource`).
- WebSocket transport design: ADR-0032 (contract — untyped duplex frame channel,
asymmetric `Stream`/RPITIT split, epoch-stamped lifecycle, `WsConnector` leaf,
per-transport `AuthSource`) and ADR-0033 (resilience — reconnect actor over a
Expand Down
78 changes: 78 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ members = [
"crates/adapter/net/api",
"crates/adapter/net/http/api",
"crates/adapter/net/http/mock",
"crates/adapter/net/ws/api",
"crates/adapter/net/ws/mock",
"crates/bus/api",
"crates/event-log/api",
"crates/persistence/api",
Expand Down Expand Up @@ -44,6 +46,8 @@ oath-model = { path = "crates/model", version = "0.1.0" }
oath-adapter-net-api = { path = "crates/adapter/net/api", version = "0.1.0" }
oath-adapter-net-http-api = { path = "crates/adapter/net/http/api", version = "0.1.0" }
oath-adapter-net-http-mock = { path = "crates/adapter/net/http/mock", version = "0.1.0" }
oath-adapter-net-ws-api = { path = "crates/adapter/net/ws/api", version = "0.1.0" }
oath-adapter-net-ws-mock = { path = "crates/adapter/net/ws/mock", version = "0.1.0" }
oath-bus-api = { path = "crates/bus/api", version = "0.1.0" }
oath-event-log-api = { path = "crates/event-log/api", version = "0.1.0" }
oath-persistence-api = { path = "crates/persistence/api", version = "0.1.0" }
Expand All @@ -56,13 +60,17 @@ oath-core-api = { path = "crates/core/api", version = "0.1.0" }
oath-core-kernel = { path = "crates/core/kernel", version = "0.1.0" }

# External shared — backend-specific deps belong in individual crate Cargo.toml files
# Runtime-neutral last-value channel (extracted from tokio::sync::watch,
# event-listener family) — keeps tokio out of net-ws-api (ADR-0033 §5).
async-watch = "0.3"
bytes = "1"
http = "1"
http-body = "1"
http-body-util = "0.1"
pin-project-lite = "0.2"
futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false }
futures-util = "0.3"
serde = { version = "1", features = ["derive"] }
thiserror = "2"
tokio = { version = "1", features = ["full"] }
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Every subsystem is defined behind a trait. Backends, adapters, transports, and s
| `oath-adapter-api` | Harness + `Broker` / `DataProvider` traits for venue adapters |
| `oath-adapter-net-api` | Transport-neutral composition primitives (`Layer`, `ServiceBuilder`, `Stack`) + `ErrorKind` / `Timer` |
| `oath-adapter-net-http-api` | HTTP transport contract (`Service`, …) over the `oath-adapter-net-api` kernel |
| `oath-adapter-net-ws-api` | WebSocket transport contract (`Frame`, `WsSink`/`WsSource`, `Lifecycle`, `WsConnector`, …) over the `oath-adapter-net-api` kernel |
| `oath-strategy-api` | User-facing `Strategy` trait and Signal ergonomics (the canonical `Signal` payload lives in `oath-model`, per ADR-0028) |
| `oath-strategy-host` | Strategy Node binary: hosts user strategies, isolated from Core |
| `oath-cli` | The first Frontend (MVP) |
Expand All @@ -43,6 +44,7 @@ graph TD
stratapi[oath-strategy-api] --> model
netapi[oath-adapter-net-api]
nethttpapi[oath-adapter-net-http-api] --> netapi
netwsapi[oath-adapter-net-ws-api] --> netapi

risk[oath-core-risk] --> coreapi
risk --> model
Expand Down
21 changes: 21 additions & 0 deletions crates/adapter/net/ws/api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "oath-adapter-net-ws-api"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true

[lints]
workspace = true

[dependencies]
async-watch = { workspace = true }
bytes = { workspace = true }
http = { workspace = true }
oath-adapter-net-api = { workspace = true }
thiserror = { workspace = true }
futures-core = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
futures-util = { workspace = true }
87 changes: 87 additions & 0 deletions crates/adapter/net/ws/api/src/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//! The named dependency-inversion seam the composition stack builds on.
//!
//! The `HttpClient` analogue for WS (ADR-0032 §7). The WS upgrade *is* an
//! HTTP GET, so the handshake is an `http::Request<()>` (body-agnostic parts
//! — the same shape the shared `AuthSource` will stamp, §8). Per ADR-0029 §5
//! it is a compile-time `impl WsConnector` seam — never `dyn`.
//!
//! This is the **composition** seam: the leaf and every inner resilience
//! layer implement it (ADR-0033 §1). The richer usage seam an adapter holds
//! (`ReconnectingConnector`/`ReconnectingConnection` + `WsControl`) is
//! produced only at the assembly boundary, in a later slice.

use crate::{Lifecycle, WsError, WsSink, WsSource};
use std::future::Future;

/// Establish WebSocket connections: one handshake in, three handles out.
pub trait WsConnector {
/// The send half produced by a successful connect.
type Sink: WsSink;
/// The receive half produced by a successful connect.
type Source: WsSource;

/// Perform the upgrade handshake and yield the two single-owner halves
/// plus the lifecycle read channel (ADR-0032 §2/§4). `&self` — a connector
/// is shared; the reconnect layer calls it once per (re)connect.
fn connect(
&self,
handshake: http::Request<()>,
) -> impl Future<Output = Result<(Self::Sink, Self::Source, Lifecycle), WsError>> + Send;
}

#[cfg(test)]
mod tests {
use super::WsConnector;
use crate::{Frame, Lifecycle, LifecycleSnapshot, WsError, WsSink};
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct StubSink;
impl WsSink for StubSink {
fn send(&mut self, _frame: Frame) -> impl Future<Output = Result<(), WsError>> + Send {
std::future::ready(Ok(()))
}
fn close(self) -> impl Future<Output = Result<(), WsError>> + Send {
std::future::ready(Ok(()))
}
}

struct StubSource;
impl Stream for StubSource {
type Item = Result<Frame, WsError>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}

struct StubConnector;
impl WsConnector for StubConnector {
type Sink = StubSink;
type Source = StubSource;
#[allow(clippy::manual_async_fn)]
fn connect(
&self,
_handshake: http::Request<()>,
) -> impl Future<Output = Result<(StubSink, StubSource, Lifecycle), WsError>> + Send
{
async {
let (_tx, lifecycle) = Lifecycle::channel(LifecycleSnapshot::connected(0));
Ok((StubSink, StubSource, lifecycle))
}
}
}

#[tokio::test]
async fn connect_yields_the_three_handles() {
fn assert_connector<C: WsConnector>(_: &C) {}
assert_connector(&StubConnector);

let mut handshake = http::Request::new(());
*handshake.uri_mut() = "wss://api.ibkr.com/v1/api/ws".parse().unwrap();
let (sink, _source, lifecycle) = StubConnector.connect(handshake).await.unwrap();
assert_eq!(lifecycle.snapshot().epoch, 0);
sink.close().await.unwrap();
}
}
Loading