From 2705c724081194876ba65f073b258870c61c82d7 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 16 Jun 2026 13:16:24 +0200 Subject: [PATCH 1/2] refactor: split modules into files --- src/channel.rs | 46 ++ src/channel/mpsc.rs | 446 +++++++++++ src/channel/none.rs | 16 + src/channel/oneshot.rs | 249 ++++++ src/lib.rs | 1638 +-------------------------------------- src/rpc.rs | 720 +++++++++++++++++ src/span_propagation.rs | 112 +++ 7 files changed, 1599 insertions(+), 1628 deletions(-) create mode 100644 src/channel.rs create mode 100644 src/channel/mpsc.rs create mode 100644 src/channel/none.rs create mode 100644 src/channel/oneshot.rs create mode 100644 src/rpc.rs create mode 100644 src/span_propagation.rs diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..9d5f62c --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,46 @@ +//! Channels that abstract over local or remote sending + +use std::io; + +use n0_error::stack_error; + +pub mod mpsc; +pub mod none; +pub mod oneshot; + +/// Error when sending a oneshot or mpsc message. For local communication, +/// the only thing that can go wrong is that the receiver has been dropped. +/// +/// For rpc communication, there can be any number of errors, so this is a +/// generic io error. +#[stack_error(derive, add_meta, from_sources)] +pub enum SendError { + /// The receiver has been closed. This is the only error that can occur + /// for local communication. + #[error("Receiver closed")] + ReceiverClosed, + /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). + /// + /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE + #[error("Maximum message size exceeded")] + MaxMessageSizeExceeded, + /// The underlying io error. This can occur for remote communication, + /// due to a network error or serialization error. + #[error("Io error")] + Io { + #[error(std_err)] + source: io::Error, + }, +} + +impl From for io::Error { + fn from(e: SendError) -> Self { + match e { + SendError::ReceiverClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e), + SendError::MaxMessageSizeExceeded { .. } => { + io::Error::new(io::ErrorKind::InvalidData, e) + } + SendError::Io { source, .. } => source, + } + } +} diff --git a/src/channel/mpsc.rs b/src/channel/mpsc.rs new file mode 100644 index 0000000..74138c6 --- /dev/null +++ b/src/channel/mpsc.rs @@ -0,0 +1,446 @@ +//! SPSC channel, similar to tokio's mpsc channel +//! +//! For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc. + +use std::{fmt::Debug, future::Future, io, marker::PhantomData, pin::Pin, sync::Arc}; + +use n0_error::{e, stack_error}; + +use super::SendError; + +/// Error when receiving a oneshot or mpsc message. For local communication, +/// the only thing that can go wrong is that the sender has been closed. +/// +/// For rpc communication, there can be any number of errors, so this is a +/// generic io error. +#[stack_error(derive, add_meta, from_sources)] +pub enum RecvError { + /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). + /// + /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE + #[error("Maximum message size exceeded")] + MaxMessageSizeExceeded, + /// An io error occurred. This can occur for remote communication, + /// due to a network error or deserialization error. + #[error("Io error")] + Io { + #[error(std_err)] + source: io::Error, + }, +} + +impl From for io::Error { + fn from(e: RecvError) -> Self { + match e { + RecvError::Io { source, .. } => source, + RecvError::MaxMessageSizeExceeded { .. } => { + io::Error::new(io::ErrorKind::InvalidData, e) + } + } + } +} + +/// Create a local mpsc sender and receiver pair, with the given buffer size. +/// +/// This is currently using a tokio channel pair internally. +pub fn channel(buffer: usize) -> (Sender, Receiver) { + let (tx, rx) = tokio::sync::mpsc::channel(buffer); + (tx.into(), rx.into()) +} + +/// Single producer, single consumer sender. +/// +/// For the local case, this wraps a tokio::sync::mpsc::Sender. +pub enum Sender { + Tokio(tokio::sync::mpsc::Sender), + Boxed(Arc>), +} + +impl Clone for Sender { + fn clone(&self) -> Self { + match self { + Self::Tokio(tx) => Self::Tokio(tx.clone()), + Self::Boxed(inner) => Self::Boxed(inner.clone()), + } + } +} + +impl Sender { + pub fn is_rpc(&self) -> bool + where + T: 'static, + { + match self { + Sender::Tokio(_) => false, + Sender::Boxed(x) => x.is_rpc(), + } + } + + #[cfg(feature = "stream")] + pub fn into_sink(self) -> impl n0_future::Sink + Send + 'static + where + T: Send + Sync + 'static, + { + futures_util::sink::unfold(self, |sink, value| async move { + sink.send(value).await?; + Ok(sink) + }) + } +} + +impl Sender { + /// Applies a filter before sending. + /// + /// Messages that don't pass the filter are dropped. + /// + /// If you want to combine multiple filters and maps with minimal + /// overhead, use `with_filter_map` directly. + pub fn with_filter(self, f: F) -> Sender + where + F: Fn(&T) -> bool + Send + Sync + 'static, + { + self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) + } + + /// Applies a transform before sending. + /// + /// If you want to combine multiple filters and maps with minimal + /// overhead, use `with_filter_map` directly. + pub fn with_map(self, f: F) -> Sender + where + F: Fn(U) -> T + Send + Sync + 'static, + U: Send + Sync + 'static, + { + self.with_filter_map(move |u| Some(f(u))) + } + + /// Applies a filter and transform before sending. + /// + /// Any combination of filters and maps can be expressed using + /// a single filter_map. + pub fn with_filter_map(self, f: F) -> Sender + where + F: Fn(U) -> Option + Send + Sync + 'static, + U: Send + Sync + 'static, + { + let inner: Arc> = Arc::new(FilterMapSender { + f, + sender: self, + _p: PhantomData, + }); + Sender::Boxed(inner) + } + + /// Future that resolves when the sender is closed + pub async fn closed(&self) { + match self { + Sender::Tokio(tx) => tx.closed().await, + Sender::Boxed(sink) => sink.closed().await, + } + } +} + +impl From> for Sender { + fn from(tx: tokio::sync::mpsc::Sender) -> Self { + Self::Tokio(tx) + } +} + +impl TryFrom> for tokio::sync::mpsc::Sender { + type Error = Sender; + + fn try_from(value: Sender) -> Result { + match value { + Sender::Tokio(tx) => Ok(tx), + Sender::Boxed(_) => Err(value), + } + } +} + +/// A sender that can be wrapped in a `Arc>`. +pub trait DynSender: Debug + Send + Sync + 'static { + /// Send a message. + /// + /// For the remote case, if the message can not be completely sent, + /// this must return an error and disable the channel. + fn send(&self, value: T) -> Pin> + Send + '_>>; + + /// Try to send a message, returning as fast as possible if sending + /// is not currently possible. + /// + /// For the remote case, it must be guaranteed that the message is + /// either completely sent or not at all. + fn try_send( + &self, + value: T, + ) -> Pin> + Send + '_>>; + + /// Await the sender close + fn closed(&self) -> Pin + Send + Sync + '_>>; + + /// True if this is a remote sender + fn is_rpc(&self) -> bool; +} + +/// A receiver that can be wrapped in a `Box>`. +pub trait DynReceiver: Debug + Send + Sync + 'static { + fn recv( + &mut self, + ) -> Pin, RecvError>> + Send + Sync + '_>>; +} + +impl Debug for Sender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Tokio(x) => f + .debug_struct("Tokio") + .field("avail", &x.capacity()) + .field("cap", &x.max_capacity()) + .finish(), + Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(), + } + } +} + +impl Sender { + /// Send a message and yield until either it is sent or an error occurs. + /// + /// ## Cancellation safety + /// + /// If the future is dropped before completion, and if this is a remote sender, + /// then the sender will be closed and further sends will return an [`SendError::Io`] + /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the + /// future until completion if you want to reuse the sender or any clone afterwards. + pub async fn send(&self, value: T) -> Result<(), SendError> { + match self { + Sender::Tokio(tx) => tx + .send(value) + .await + .map_err(|_| e!(SendError::ReceiverClosed)), + Sender::Boxed(sink) => sink.send(value).await, + } + } + + /// Try to send a message, returning as fast as possible if sending + /// is not currently possible. This can be used to send ephemeral + /// messages. + /// + /// For the local case, this will immediately return false if the + /// channel is full. + /// + /// For the remote case, it will attempt to send the message and + /// return false if sending the first byte fails, otherwise yield + /// until the message is completely sent or an error occurs. This + /// guarantees that the message is sent either completely or not at + /// all. + /// + /// Returns true if the message was sent. + /// + /// ## Cancellation safety + /// + /// If the future is dropped before completion, and if this is a remote sender, + /// then the sender will be closed and further sends will return an [`SendError::Io`] + /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the + /// future until completion if you want to reuse the sender or any clone afterwards. + pub async fn try_send(&self, value: T) -> Result { + match self { + Sender::Tokio(tx) => match tx.try_send(value) { + Ok(()) => Ok(true), + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + Err(e!(SendError::ReceiverClosed)) + } + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => Ok(false), + }, + Sender::Boxed(sink) => sink.try_send(value).await, + } + } +} + +impl crate::sealed::Sealed for Sender {} +impl crate::Sender for Sender {} + +pub enum Receiver { + Tokio(tokio::sync::mpsc::Receiver), + Boxed(Box>), +} + +impl Receiver { + /// Receive a message + /// + /// Returns Ok(None) if the sender has been dropped or the remote end has + /// cleanly closed the connection. + /// + /// Returns an an io error if there was an error receiving the message. + pub async fn recv(&mut self) -> Result, RecvError> { + match self { + Self::Tokio(rx) => Ok(rx.recv().await), + Self::Boxed(rx) => Ok(rx.recv().await?), + } + } + + /// Map messages, transforming them from type T to type U. + pub fn map(self, f: F) -> Receiver + where + F: Fn(T) -> U + Send + Sync + 'static, + U: Send + Sync + 'static, + { + self.filter_map(move |u| Some(f(u))) + } + + /// Filter messages, only passing through those for which the predicate returns true. + /// + /// Messages that don't pass the filter are dropped. + pub fn filter(self, f: F) -> Receiver + where + F: Fn(&T) -> bool + Send + Sync + 'static, + { + self.filter_map(move |u| if f(&u) { Some(u) } else { None }) + } + + /// Filter and map messages, only passing through those for which the function returns Some. + /// + /// Messages that don't pass the filter are dropped. + pub fn filter_map(self, f: F) -> Receiver + where + U: Send + Sync + 'static, + F: Fn(T) -> Option + Send + Sync + 'static, + { + let inner: Box> = Box::new(FilterMapReceiver { + f, + receiver: self, + _p: PhantomData, + }); + Receiver::Boxed(inner) + } + + #[cfg(feature = "stream")] + pub fn into_stream( + self, + ) -> impl n0_future::Stream> + Send + Sync + 'static { + n0_future::stream::unfold(self, |mut recv| async move { + recv.recv().await.transpose().map(|msg| (msg, recv)) + }) + } +} + +impl From> for Receiver { + fn from(rx: tokio::sync::mpsc::Receiver) -> Self { + Self::Tokio(rx) + } +} + +impl TryFrom> for tokio::sync::mpsc::Receiver { + type Error = Receiver; + + fn try_from(value: Receiver) -> Result { + match value { + Receiver::Tokio(tx) => Ok(tx), + Receiver::Boxed(_) => Err(value), + } + } +} + +impl Debug for Receiver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Tokio(inner) => f + .debug_struct("Tokio") + .field("avail", &inner.capacity()) + .field("cap", &inner.max_capacity()) + .finish(), + Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(), + } + } +} + +struct FilterMapSender { + f: F, + sender: Sender, + _p: PhantomData, +} + +impl Debug for FilterMapSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FilterMapSender").finish_non_exhaustive() + } +} + +impl DynSender for FilterMapSender +where + F: Fn(U) -> Option + Send + Sync + 'static, + T: Send + Sync + 'static, + U: Send + Sync + 'static, +{ + fn send(&self, value: U) -> Pin> + Send + '_>> { + Box::pin(async move { + match (self.f)(value) { + Some(v) => self.sender.send(v).await, + _ => Ok(()), + } + }) + } + + fn try_send( + &self, + value: U, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + match (self.f)(value) { + Some(v) => self.sender.try_send(v).await, + _ => Ok(true), + } + }) + } + + fn is_rpc(&self) -> bool { + self.sender.is_rpc() + } + + fn closed(&self) -> Pin + Send + Sync + '_>> { + match self { + FilterMapSender { + sender: Sender::Tokio(tx), + .. + } => Box::pin(tx.closed()), + FilterMapSender { + sender: Sender::Boxed(sink), + .. + } => sink.closed(), + } + } +} + +struct FilterMapReceiver { + f: F, + receiver: Receiver, + _p: PhantomData, +} + +impl Debug for FilterMapReceiver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FilterMapReceiver").finish_non_exhaustive() + } +} + +impl DynReceiver for FilterMapReceiver +where + F: Fn(T) -> Option + Send + Sync + 'static, + T: Send + Sync + 'static, + U: Send + Sync + 'static, +{ + fn recv( + &mut self, + ) -> Pin, RecvError>> + Send + Sync + '_>> { + Box::pin(async move { + while let Some(msg) = self.receiver.recv().await? { + if let Some(v) = (self.f)(msg) { + return Ok(Some(v)); + } + } + Ok(None) + }) + } +} + +impl crate::sealed::Sealed for Receiver {} +impl crate::Receiver for Receiver {} diff --git a/src/channel/none.rs b/src/channel/none.rs new file mode 100644 index 0000000..93acdd6 --- /dev/null +++ b/src/channel/none.rs @@ -0,0 +1,16 @@ +//! No channels, used when no communication is needed + +use crate::sealed::Sealed; + +/// A sender that does nothing. This is used when no communication is needed. +#[derive(Debug)] +pub struct NoSender; +impl Sealed for NoSender {} +impl crate::Sender for NoSender {} + +/// A receiver that does nothing. This is used when no communication is needed. +#[derive(Debug)] +pub struct NoReceiver; + +impl Sealed for NoReceiver {} +impl crate::Receiver for NoReceiver {} diff --git a/src/channel/oneshot.rs b/src/channel/oneshot.rs new file mode 100644 index 0000000..921424a --- /dev/null +++ b/src/channel/oneshot.rs @@ -0,0 +1,249 @@ +//! Oneshot channel, similar to tokio's oneshot channel + +use std::{fmt::Debug, future::Future, io, pin::Pin, task}; + +use n0_error::{e, stack_error}; +use n0_future::future::Boxed as BoxFuture; + +use super::SendError; +use crate::util::FusedOneshotReceiver; + +/// Error when receiving a oneshot or mpsc message. For local communication, +/// the only thing that can go wrong is that the sender has been closed. +/// +/// For rpc communication, there can be any number of errors, so this is a +/// generic io error. +#[stack_error(derive, add_meta, from_sources)] +pub enum RecvError { + /// The sender has been closed. This is the only error that can occur + /// for local communication. + #[error("Sender closed")] + SenderClosed, + /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). + /// + /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE + #[error("Maximum message size exceeded")] + MaxMessageSizeExceeded, + /// An io error occurred. This can occur for remote communication, + /// due to a network error or deserialization error. + #[error("Io error")] + Io { + #[error(std_err)] + source: io::Error, + }, +} + +impl From for io::Error { + fn from(e: RecvError) -> Self { + match e { + RecvError::Io { source, .. } => source, + RecvError::SenderClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e), + RecvError::MaxMessageSizeExceeded { .. } => { + io::Error::new(io::ErrorKind::InvalidData, e) + } + } + } +} + +/// Create a local oneshot sender and receiver pair. +/// +/// This is currently using a tokio channel pair internally. +pub fn channel() -> (Sender, Receiver) { + let (tx, rx) = tokio::sync::oneshot::channel(); + (tx.into(), rx.into()) +} + +/// A generic boxed sender. +/// +/// Remote senders are always boxed, since for remote communication the boxing +/// overhead is negligible. However, boxing can also be used for local communication, +/// e.g. when applying a transform or filter to the message before sending it. +pub type BoxedSender = + Box BoxFuture> + Send + Sync + 'static>; + +/// A sender that can be wrapped in a `Box>`. +/// +/// In addition to implementing `Future`, this provides a fn to check if the sender is +/// an rpc sender. +/// +/// Remote receivers are always boxed, since for remote communication the boxing +/// overhead is negligible. However, boxing can also be used for local communication, +/// e.g. when applying a transform or filter to the message before receiving it. +pub trait DynSender: Future> + Send + Sync + 'static { + fn is_rpc(&self) -> bool; +} + +/// A generic boxed receiver +/// +/// Remote receivers are always boxed, since for remote communication the boxing +/// overhead is negligible. However, boxing can also be used for local communication, +/// e.g. when applying a transform or filter to the message before receiving it. +pub type BoxedReceiver = BoxFuture>; + +/// A oneshot sender. +/// +/// Compared to a local onehsot sender, sending a message is async since in the case +/// of remote communication, sending over the wire is async. Other than that it +/// behaves like a local oneshot sender and has no overhead in the local case. +pub enum Sender { + Tokio(tokio::sync::oneshot::Sender), + /// we can't yet distinguish between local and remote boxed oneshot senders. + /// If we ever want to have local boxed oneshot senders, we need to add a + /// third variant here. + Boxed(BoxedSender), +} + +impl Debug for Sender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Tokio(_) => f.debug_tuple("Tokio").finish(), + Self::Boxed(_) => f.debug_tuple("Boxed").finish(), + } + } +} + +impl From> for Sender { + fn from(tx: tokio::sync::oneshot::Sender) -> Self { + Self::Tokio(tx) + } +} + +impl TryFrom> for tokio::sync::oneshot::Sender { + type Error = Sender; + + fn try_from(value: Sender) -> Result { + match value { + Sender::Tokio(tx) => Ok(tx), + Sender::Boxed(_) => Err(value), + } + } +} + +impl Sender { + /// Send a message + /// + /// If this is a boxed sender that represents a remote connection, sending may yield or fail with an io error. + /// Local senders will never yield, but can fail if the receiver has been closed. + pub async fn send(self, value: T) -> Result<(), SendError> { + match self { + Sender::Tokio(tx) => tx.send(value).map_err(|_| e!(SendError::ReceiverClosed)), + Sender::Boxed(f) => f(value).await, + } + } + + /// Check if this is a remote sender + pub fn is_rpc(&self) -> bool + where + T: 'static, + { + match self { + Sender::Tokio(_) => false, + Sender::Boxed(_) => true, + } + } +} + +impl Sender { + /// Applies a filter before sending. + /// + /// Messages that don't pass the filter are dropped. + pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender { + self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) + } + + /// Applies a transform before sending. + pub fn with_map(self, f: F) -> Sender + where + F: Fn(U) -> T + Send + Sync + 'static, + U: Send + Sync + 'static, + { + self.with_filter_map(move |u| Some(f(u))) + } + + /// Applies a filter and transform before sending. + /// + /// Messages that don't pass the filter are dropped. + pub fn with_filter_map(self, f: F) -> Sender + where + F: Fn(U) -> Option + Send + Sync + 'static, + U: Send + Sync + 'static, + { + let inner: BoxedSender = Box::new(move |value| { + let opt = f(value); + Box::pin(async move { + if let Some(v) = opt { + self.send(v).await + } else { + Ok(()) + } + }) + }); + Sender::Boxed(inner) + } +} + +impl crate::sealed::Sealed for Sender {} +impl crate::Sender for Sender {} + +/// A oneshot receiver. +/// +/// Compared to a local oneshot receiver, receiving a message can fail not just +/// when the sender has been closed, but also when the remote connection fails. +pub enum Receiver { + Tokio(FusedOneshotReceiver), + Boxed(BoxedReceiver), +} + +impl Future for Receiver { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll { + match self.get_mut() { + Self::Tokio(rx) => Pin::new(rx) + .poll(cx) + .map_err(|_| e!(RecvError::SenderClosed)), + Self::Boxed(rx) => Pin::new(rx).poll(cx), + } + } +} + +/// Convert a tokio oneshot receiver to a receiver for this crate +impl From> for Receiver { + fn from(rx: tokio::sync::oneshot::Receiver) -> Self { + Self::Tokio(FusedOneshotReceiver(rx)) + } +} + +impl TryFrom> for tokio::sync::oneshot::Receiver { + type Error = Receiver; + + fn try_from(value: Receiver) -> Result { + match value { + Receiver::Tokio(tx) => Ok(tx.0), + Receiver::Boxed(_) => Err(value), + } + } +} + +/// Convert a function that produces a future to a receiver for this crate +impl From for Receiver +where + F: FnOnce() -> Fut, + Fut: Future> + Send + 'static, +{ + fn from(f: F) -> Self { + Self::Boxed(Box::pin(f())) + } +} + +impl Debug for Receiver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Tokio(_) => f.debug_tuple("Tokio").finish(), + Self::Boxed(_) => f.debug_tuple("Boxed").finish(), + } + } +} + +impl crate::sealed::Sealed for Receiver {} +impl crate::Receiver for Receiver {} diff --git a/src/lib.rs b/src/lib.rs index 0f84702..f0bc270 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -318,128 +318,23 @@ use self::{ }; use crate::channel::SendError; +pub mod channel; +#[cfg(feature = "rpc")] +#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] +pub mod span_propagation; #[cfg(test)] mod tests; pub mod util; - -mod sealed { - pub trait Sealed {} +#[cfg(not(feature = "rpc"))] +pub mod rpc { + pub struct RemoteSender(std::marker::PhantomData); } - -/// Span context propagation for remote RPC calls -/// -/// This module provides the `SpanContextCarrier` type for propagating trace context -/// across remote boundaries. The type is always available when `rpc` feature is enabled, -/// but actual OpenTelemetry integration requires the `tracing-opentelemetry` feature. -/// -/// The propagated context is scoped to a single request handler via a tokio task-local, -/// installed by the dispatch loop in `handle_connection`. This isolates concurrent -/// requests from each other and is robust to thread migration across `.await` points. #[cfg(feature = "rpc")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] -pub mod span_propagation { - use std::{collections::HashMap, future::Future}; - - use serde::{Deserialize, Serialize}; - - #[cfg(feature = "tracing-opentelemetry")] - tokio::task_local! { - static SPAN_CONTEXT: opentelemetry::Context; - } - - /// Carrier for propagating span context across RPC boundaries using W3C Trace Context format. - /// - /// This type is always available for serialization purposes. When the - /// `tracing-opentelemetry` feature is enabled, it can extract/inject actual - /// OpenTelemetry trace context. Without that feature, it simply serializes as an - /// empty map. - #[derive(Debug, Clone, Serialize, Deserialize, Default)] - pub struct SpanContextCarrier { - headers: HashMap, - } - - #[cfg(feature = "tracing-opentelemetry")] - impl opentelemetry::propagation::Injector for SpanContextCarrier { - fn set(&mut self, key: &str, value: String) { - self.headers.insert(key.to_string(), value); - } - } - - #[cfg(feature = "tracing-opentelemetry")] - impl opentelemetry::propagation::Extractor for SpanContextCarrier { - fn get(&self, key: &str) -> Option<&str> { - self.headers.get(key).map(|v| v.as_str()) - } - - fn keys(&self) -> Vec<&str> { - self.headers.keys().map(|k| k.as_str()).collect() - } - } - - impl SpanContextCarrier { - /// Create a carrier from the current OpenTelemetry context. - /// - /// When `tracing-opentelemetry` feature is enabled, this extracts the current - /// trace context. Without the feature, this returns an empty carrier. - #[cfg(feature = "tracing-opentelemetry")] - pub fn from_current() -> Self { - use opentelemetry::global; - use tracing_opentelemetry::OpenTelemetrySpanExt; - let mut carrier = Self::default(); - // Get the OTel context from the current tracing span, not from - // opentelemetry::Context::current(). The tracing-opentelemetry layer - // stores OTel spans inside tracing spans, so the thread-local OTel - // context won't have the right span. - let ctx = tracing::Span::current().context(); - global::get_text_map_propagator(|prop| { - prop.inject_context(&ctx, &mut carrier); - }); - carrier - } - - #[cfg(not(feature = "tracing-opentelemetry"))] - pub fn from_current() -> Self { - Self::default() - } - - /// Extract an OpenTelemetry context from this carrier. - #[cfg(feature = "tracing-opentelemetry")] - pub fn to_context(&self) -> opentelemetry::Context { - use opentelemetry::global; - global::get_text_map_propagator(|prop| { - prop.extract_with_context(&opentelemetry::Context::current(), self) - }) - } - } - - /// Run `fut` with `carrier`'s context installed as the per-task scope read by - /// [`set_span_parent_from_remote`]. - /// - /// Used by transport implementations (`irpc::rpc`, `irpc-iroh`) to wrap a single - /// request handler. Most users will not call this directly. - pub async fn scope_remote(carrier: Option, fut: F) -> F::Output { - #[cfg(feature = "tracing-opentelemetry")] - if let Some(carrier) = carrier { - return SPAN_CONTEXT.scope(carrier.to_context(), fut).await; - } - let _ = carrier; - fut.await - } +pub mod rpc; - /// Set the parent of a span from the propagated remote context, if one is in scope. - /// - /// Called by the code generated by `rpc_requests(span_propagation)`. Looks up the - /// task-local installed by the dispatch loop; no-op outside that scope. - pub fn set_span_parent_from_remote(span: &tracing::Span) { - #[cfg(feature = "tracing-opentelemetry")] - { - let _ = SPAN_CONTEXT.try_with(|ctx| { - use tracing_opentelemetry::OpenTelemetrySpanExt; - let _ = span.set_parent(ctx.clone()); - }); - } - let _ = span; - } +mod sealed { + pub trait Sealed {} } /// Requirements for a RPC message @@ -496,777 +391,6 @@ pub trait Channels: Send + 'static { type Rx: Receiver; } -/// Channels that abstract over local or remote sending -pub mod channel { - use std::io; - - use n0_error::stack_error; - - /// Oneshot channel, similar to tokio's oneshot channel - pub mod oneshot { - use std::{fmt::Debug, future::Future, io, pin::Pin, task}; - - use n0_error::{e, stack_error}; - use n0_future::future::Boxed as BoxFuture; - - use super::SendError; - use crate::util::FusedOneshotReceiver; - - /// Error when receiving a oneshot or mpsc message. For local communication, - /// the only thing that can go wrong is that the sender has been closed. - /// - /// For rpc communication, there can be any number of errors, so this is a - /// generic io error. - #[stack_error(derive, add_meta, from_sources)] - pub enum RecvError { - /// The sender has been closed. This is the only error that can occur - /// for local communication. - #[error("Sender closed")] - SenderClosed, - /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). - /// - /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE - #[error("Maximum message size exceeded")] - MaxMessageSizeExceeded, - /// An io error occurred. This can occur for remote communication, - /// due to a network error or deserialization error. - #[error("Io error")] - Io { - #[error(std_err)] - source: io::Error, - }, - } - - impl From for io::Error { - fn from(e: RecvError) -> Self { - match e { - RecvError::Io { source, .. } => source, - RecvError::SenderClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e), - RecvError::MaxMessageSizeExceeded { .. } => { - io::Error::new(io::ErrorKind::InvalidData, e) - } - } - } - } - - /// Create a local oneshot sender and receiver pair. - /// - /// This is currently using a tokio channel pair internally. - pub fn channel() -> (Sender, Receiver) { - let (tx, rx) = tokio::sync::oneshot::channel(); - (tx.into(), rx.into()) - } - - /// A generic boxed sender. - /// - /// Remote senders are always boxed, since for remote communication the boxing - /// overhead is negligible. However, boxing can also be used for local communication, - /// e.g. when applying a transform or filter to the message before sending it. - pub type BoxedSender = - Box BoxFuture> + Send + Sync + 'static>; - - /// A sender that can be wrapped in a `Box>`. - /// - /// In addition to implementing `Future`, this provides a fn to check if the sender is - /// an rpc sender. - /// - /// Remote receivers are always boxed, since for remote communication the boxing - /// overhead is negligible. However, boxing can also be used for local communication, - /// e.g. when applying a transform or filter to the message before receiving it. - pub trait DynSender: - Future> + Send + Sync + 'static - { - fn is_rpc(&self) -> bool; - } - - /// A generic boxed receiver - /// - /// Remote receivers are always boxed, since for remote communication the boxing - /// overhead is negligible. However, boxing can also be used for local communication, - /// e.g. when applying a transform or filter to the message before receiving it. - pub type BoxedReceiver = BoxFuture>; - - /// A oneshot sender. - /// - /// Compared to a local onehsot sender, sending a message is async since in the case - /// of remote communication, sending over the wire is async. Other than that it - /// behaves like a local oneshot sender and has no overhead in the local case. - pub enum Sender { - Tokio(tokio::sync::oneshot::Sender), - /// we can't yet distinguish between local and remote boxed oneshot senders. - /// If we ever want to have local boxed oneshot senders, we need to add a - /// third variant here. - Boxed(BoxedSender), - } - - impl Debug for Sender { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Tokio(_) => f.debug_tuple("Tokio").finish(), - Self::Boxed(_) => f.debug_tuple("Boxed").finish(), - } - } - } - - impl From> for Sender { - fn from(tx: tokio::sync::oneshot::Sender) -> Self { - Self::Tokio(tx) - } - } - - impl TryFrom> for tokio::sync::oneshot::Sender { - type Error = Sender; - - fn try_from(value: Sender) -> Result { - match value { - Sender::Tokio(tx) => Ok(tx), - Sender::Boxed(_) => Err(value), - } - } - } - - impl Sender { - /// Send a message - /// - /// If this is a boxed sender that represents a remote connection, sending may yield or fail with an io error. - /// Local senders will never yield, but can fail if the receiver has been closed. - pub async fn send(self, value: T) -> Result<(), SendError> { - match self { - Sender::Tokio(tx) => tx.send(value).map_err(|_| e!(SendError::ReceiverClosed)), - Sender::Boxed(f) => f(value).await, - } - } - - /// Check if this is a remote sender - pub fn is_rpc(&self) -> bool - where - T: 'static, - { - match self { - Sender::Tokio(_) => false, - Sender::Boxed(_) => true, - } - } - } - - impl Sender { - /// Applies a filter before sending. - /// - /// Messages that don't pass the filter are dropped. - pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender { - self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) - } - - /// Applies a transform before sending. - pub fn with_map(self, f: F) -> Sender - where - F: Fn(U) -> T + Send + Sync + 'static, - U: Send + Sync + 'static, - { - self.with_filter_map(move |u| Some(f(u))) - } - - /// Applies a filter and transform before sending. - /// - /// Messages that don't pass the filter are dropped. - pub fn with_filter_map(self, f: F) -> Sender - where - F: Fn(U) -> Option + Send + Sync + 'static, - U: Send + Sync + 'static, - { - let inner: BoxedSender = Box::new(move |value| { - let opt = f(value); - Box::pin(async move { - if let Some(v) = opt { - self.send(v).await - } else { - Ok(()) - } - }) - }); - Sender::Boxed(inner) - } - } - - impl crate::sealed::Sealed for Sender {} - impl crate::Sender for Sender {} - - /// A oneshot receiver. - /// - /// Compared to a local oneshot receiver, receiving a message can fail not just - /// when the sender has been closed, but also when the remote connection fails. - pub enum Receiver { - Tokio(FusedOneshotReceiver), - Boxed(BoxedReceiver), - } - - impl Future for Receiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll { - match self.get_mut() { - Self::Tokio(rx) => Pin::new(rx) - .poll(cx) - .map_err(|_| e!(RecvError::SenderClosed)), - Self::Boxed(rx) => Pin::new(rx).poll(cx), - } - } - } - - /// Convert a tokio oneshot receiver to a receiver for this crate - impl From> for Receiver { - fn from(rx: tokio::sync::oneshot::Receiver) -> Self { - Self::Tokio(FusedOneshotReceiver(rx)) - } - } - - impl TryFrom> for tokio::sync::oneshot::Receiver { - type Error = Receiver; - - fn try_from(value: Receiver) -> Result { - match value { - Receiver::Tokio(tx) => Ok(tx.0), - Receiver::Boxed(_) => Err(value), - } - } - } - - /// Convert a function that produces a future to a receiver for this crate - impl From for Receiver - where - F: FnOnce() -> Fut, - Fut: Future> + Send + 'static, - { - fn from(f: F) -> Self { - Self::Boxed(Box::pin(f())) - } - } - - impl Debug for Receiver { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Tokio(_) => f.debug_tuple("Tokio").finish(), - Self::Boxed(_) => f.debug_tuple("Boxed").finish(), - } - } - } - - impl crate::sealed::Sealed for Receiver {} - impl crate::Receiver for Receiver {} - } - - /// SPSC channel, similar to tokio's mpsc channel - /// - /// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc. - pub mod mpsc { - use std::{fmt::Debug, future::Future, io, marker::PhantomData, pin::Pin, sync::Arc}; - - use n0_error::{e, stack_error}; - - use super::SendError; - - /// Error when receiving a oneshot or mpsc message. For local communication, - /// the only thing that can go wrong is that the sender has been closed. - /// - /// For rpc communication, there can be any number of errors, so this is a - /// generic io error. - #[stack_error(derive, add_meta, from_sources)] - pub enum RecvError { - /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). - /// - /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE - #[error("Maximum message size exceeded")] - MaxMessageSizeExceeded, - /// An io error occurred. This can occur for remote communication, - /// due to a network error or deserialization error. - #[error("Io error")] - Io { - #[error(std_err)] - source: io::Error, - }, - } - - impl From for io::Error { - fn from(e: RecvError) -> Self { - match e { - RecvError::Io { source, .. } => source, - RecvError::MaxMessageSizeExceeded { .. } => { - io::Error::new(io::ErrorKind::InvalidData, e) - } - } - } - } - - /// Create a local mpsc sender and receiver pair, with the given buffer size. - /// - /// This is currently using a tokio channel pair internally. - pub fn channel(buffer: usize) -> (Sender, Receiver) { - let (tx, rx) = tokio::sync::mpsc::channel(buffer); - (tx.into(), rx.into()) - } - - /// Single producer, single consumer sender. - /// - /// For the local case, this wraps a tokio::sync::mpsc::Sender. - pub enum Sender { - Tokio(tokio::sync::mpsc::Sender), - Boxed(Arc>), - } - - impl Clone for Sender { - fn clone(&self) -> Self { - match self { - Self::Tokio(tx) => Self::Tokio(tx.clone()), - Self::Boxed(inner) => Self::Boxed(inner.clone()), - } - } - } - - impl Sender { - pub fn is_rpc(&self) -> bool - where - T: 'static, - { - match self { - Sender::Tokio(_) => false, - Sender::Boxed(x) => x.is_rpc(), - } - } - - #[cfg(feature = "stream")] - pub fn into_sink(self) -> impl n0_future::Sink + Send + 'static - where - T: Send + Sync + 'static, - { - futures_util::sink::unfold(self, |sink, value| async move { - sink.send(value).await?; - Ok(sink) - }) - } - } - - impl Sender { - /// Applies a filter before sending. - /// - /// Messages that don't pass the filter are dropped. - /// - /// If you want to combine multiple filters and maps with minimal - /// overhead, use `with_filter_map` directly. - pub fn with_filter(self, f: F) -> Sender - where - F: Fn(&T) -> bool + Send + Sync + 'static, - { - self.with_filter_map(move |u| if f(&u) { Some(u) } else { None }) - } - - /// Applies a transform before sending. - /// - /// If you want to combine multiple filters and maps with minimal - /// overhead, use `with_filter_map` directly. - pub fn with_map(self, f: F) -> Sender - where - F: Fn(U) -> T + Send + Sync + 'static, - U: Send + Sync + 'static, - { - self.with_filter_map(move |u| Some(f(u))) - } - - /// Applies a filter and transform before sending. - /// - /// Any combination of filters and maps can be expressed using - /// a single filter_map. - pub fn with_filter_map(self, f: F) -> Sender - where - F: Fn(U) -> Option + Send + Sync + 'static, - U: Send + Sync + 'static, - { - let inner: Arc> = Arc::new(FilterMapSender { - f, - sender: self, - _p: PhantomData, - }); - Sender::Boxed(inner) - } - - /// Future that resolves when the sender is closed - pub async fn closed(&self) { - match self { - Sender::Tokio(tx) => tx.closed().await, - Sender::Boxed(sink) => sink.closed().await, - } - } - } - - impl From> for Sender { - fn from(tx: tokio::sync::mpsc::Sender) -> Self { - Self::Tokio(tx) - } - } - - impl TryFrom> for tokio::sync::mpsc::Sender { - type Error = Sender; - - fn try_from(value: Sender) -> Result { - match value { - Sender::Tokio(tx) => Ok(tx), - Sender::Boxed(_) => Err(value), - } - } - } - - /// A sender that can be wrapped in a `Arc>`. - pub trait DynSender: Debug + Send + Sync + 'static { - /// Send a message. - /// - /// For the remote case, if the message can not be completely sent, - /// this must return an error and disable the channel. - fn send( - &self, - value: T, - ) -> Pin> + Send + '_>>; - - /// Try to send a message, returning as fast as possible if sending - /// is not currently possible. - /// - /// For the remote case, it must be guaranteed that the message is - /// either completely sent or not at all. - fn try_send( - &self, - value: T, - ) -> Pin> + Send + '_>>; - - /// Await the sender close - fn closed(&self) -> Pin + Send + Sync + '_>>; - - /// True if this is a remote sender - fn is_rpc(&self) -> bool; - } - - /// A receiver that can be wrapped in a `Box>`. - pub trait DynReceiver: Debug + Send + Sync + 'static { - fn recv( - &mut self, - ) -> Pin, RecvError>> + Send + Sync + '_>>; - } - - impl Debug for Sender { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Tokio(x) => f - .debug_struct("Tokio") - .field("avail", &x.capacity()) - .field("cap", &x.max_capacity()) - .finish(), - Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(), - } - } - } - - impl Sender { - /// Send a message and yield until either it is sent or an error occurs. - /// - /// ## Cancellation safety - /// - /// If the future is dropped before completion, and if this is a remote sender, - /// then the sender will be closed and further sends will return an [`SendError::Io`] - /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the - /// future until completion if you want to reuse the sender or any clone afterwards. - pub async fn send(&self, value: T) -> Result<(), SendError> { - match self { - Sender::Tokio(tx) => tx - .send(value) - .await - .map_err(|_| e!(SendError::ReceiverClosed)), - Sender::Boxed(sink) => sink.send(value).await, - } - } - - /// Try to send a message, returning as fast as possible if sending - /// is not currently possible. This can be used to send ephemeral - /// messages. - /// - /// For the local case, this will immediately return false if the - /// channel is full. - /// - /// For the remote case, it will attempt to send the message and - /// return false if sending the first byte fails, otherwise yield - /// until the message is completely sent or an error occurs. This - /// guarantees that the message is sent either completely or not at - /// all. - /// - /// Returns true if the message was sent. - /// - /// ## Cancellation safety - /// - /// If the future is dropped before completion, and if this is a remote sender, - /// then the sender will be closed and further sends will return an [`SendError::Io`] - /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the - /// future until completion if you want to reuse the sender or any clone afterwards. - pub async fn try_send(&self, value: T) -> Result { - match self { - Sender::Tokio(tx) => match tx.try_send(value) { - Ok(()) => Ok(true), - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - Err(e!(SendError::ReceiverClosed)) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => Ok(false), - }, - Sender::Boxed(sink) => sink.try_send(value).await, - } - } - } - - impl crate::sealed::Sealed for Sender {} - impl crate::Sender for Sender {} - - pub enum Receiver { - Tokio(tokio::sync::mpsc::Receiver), - Boxed(Box>), - } - - impl Receiver { - /// Receive a message - /// - /// Returns Ok(None) if the sender has been dropped or the remote end has - /// cleanly closed the connection. - /// - /// Returns an an io error if there was an error receiving the message. - pub async fn recv(&mut self) -> Result, RecvError> { - match self { - Self::Tokio(rx) => Ok(rx.recv().await), - Self::Boxed(rx) => Ok(rx.recv().await?), - } - } - - /// Map messages, transforming them from type T to type U. - pub fn map(self, f: F) -> Receiver - where - F: Fn(T) -> U + Send + Sync + 'static, - U: Send + Sync + 'static, - { - self.filter_map(move |u| Some(f(u))) - } - - /// Filter messages, only passing through those for which the predicate returns true. - /// - /// Messages that don't pass the filter are dropped. - pub fn filter(self, f: F) -> Receiver - where - F: Fn(&T) -> bool + Send + Sync + 'static, - { - self.filter_map(move |u| if f(&u) { Some(u) } else { None }) - } - - /// Filter and map messages, only passing through those for which the function returns Some. - /// - /// Messages that don't pass the filter are dropped. - pub fn filter_map(self, f: F) -> Receiver - where - U: Send + Sync + 'static, - F: Fn(T) -> Option + Send + Sync + 'static, - { - let inner: Box> = Box::new(FilterMapReceiver { - f, - receiver: self, - _p: PhantomData, - }); - Receiver::Boxed(inner) - } - - #[cfg(feature = "stream")] - pub fn into_stream( - self, - ) -> impl n0_future::Stream> + Send + Sync + 'static - { - n0_future::stream::unfold(self, |mut recv| async move { - recv.recv().await.transpose().map(|msg| (msg, recv)) - }) - } - } - - impl From> for Receiver { - fn from(rx: tokio::sync::mpsc::Receiver) -> Self { - Self::Tokio(rx) - } - } - - impl TryFrom> for tokio::sync::mpsc::Receiver { - type Error = Receiver; - - fn try_from(value: Receiver) -> Result { - match value { - Receiver::Tokio(tx) => Ok(tx), - Receiver::Boxed(_) => Err(value), - } - } - } - - impl Debug for Receiver { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Tokio(inner) => f - .debug_struct("Tokio") - .field("avail", &inner.capacity()) - .field("cap", &inner.max_capacity()) - .finish(), - Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(), - } - } - } - - struct FilterMapSender { - f: F, - sender: Sender, - _p: PhantomData, - } - - impl Debug for FilterMapSender { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FilterMapSender").finish_non_exhaustive() - } - } - - impl DynSender for FilterMapSender - where - F: Fn(U) -> Option + Send + Sync + 'static, - T: Send + Sync + 'static, - U: Send + Sync + 'static, - { - fn send( - &self, - value: U, - ) -> Pin> + Send + '_>> { - Box::pin(async move { - match (self.f)(value) { - Some(v) => self.sender.send(v).await, - _ => Ok(()), - } - }) - } - - fn try_send( - &self, - value: U, - ) -> Pin> + Send + '_>> { - Box::pin(async move { - match (self.f)(value) { - Some(v) => self.sender.try_send(v).await, - _ => Ok(true), - } - }) - } - - fn is_rpc(&self) -> bool { - self.sender.is_rpc() - } - - fn closed(&self) -> Pin + Send + Sync + '_>> { - match self { - FilterMapSender { - sender: Sender::Tokio(tx), - .. - } => Box::pin(tx.closed()), - FilterMapSender { - sender: Sender::Boxed(sink), - .. - } => sink.closed(), - } - } - } - - struct FilterMapReceiver { - f: F, - receiver: Receiver, - _p: PhantomData, - } - - impl Debug for FilterMapReceiver { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FilterMapReceiver").finish_non_exhaustive() - } - } - - impl DynReceiver for FilterMapReceiver - where - F: Fn(T) -> Option + Send + Sync + 'static, - T: Send + Sync + 'static, - U: Send + Sync + 'static, - { - fn recv( - &mut self, - ) -> Pin, RecvError>> + Send + Sync + '_>> - { - Box::pin(async move { - while let Some(msg) = self.receiver.recv().await? { - if let Some(v) = (self.f)(msg) { - return Ok(Some(v)); - } - } - Ok(None) - }) - } - } - - impl crate::sealed::Sealed for Receiver {} - impl crate::Receiver for Receiver {} - } - - /// No channels, used when no communication is needed - pub mod none { - use crate::sealed::Sealed; - - /// A sender that does nothing. This is used when no communication is needed. - #[derive(Debug)] - pub struct NoSender; - impl Sealed for NoSender {} - impl crate::Sender for NoSender {} - - /// A receiver that does nothing. This is used when no communication is needed. - #[derive(Debug)] - pub struct NoReceiver; - - impl Sealed for NoReceiver {} - impl crate::Receiver for NoReceiver {} - } - - /// Error when sending a oneshot or mpsc message. For local communication, - /// the only thing that can go wrong is that the receiver has been dropped. - /// - /// For rpc communication, there can be any number of errors, so this is a - /// generic io error. - #[stack_error(derive, add_meta, from_sources)] - pub enum SendError { - /// The receiver has been closed. This is the only error that can occur - /// for local communication. - #[error("Receiver closed")] - ReceiverClosed, - /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). - /// - /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE - #[error("Maximum message size exceeded")] - MaxMessageSizeExceeded, - /// The underlying io error. This can occur for remote communication, - /// due to a network error or serialization error. - #[error("Io error")] - Io { - #[error(std_err)] - source: io::Error, - }, - } - - impl From for io::Error { - fn from(e: SendError) -> Self { - match e { - SendError::ReceiverClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e), - SendError::MaxMessageSizeExceeded { .. } => { - io::Error::new(io::ErrorKind::InvalidData, e) - } - SendError::Io { source, .. } => source, - } - } - } -} - /// A wrapper for a message with channels to send and receive it. /// This expands the protocol message to a full message that includes the /// active and unserializable channels. @@ -1887,748 +1011,6 @@ impl From> for LocalSender< } } -#[cfg(not(feature = "rpc"))] -pub mod rpc { - pub struct RemoteSender(std::marker::PhantomData); -} - -#[cfg(feature = "rpc")] -#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))] -pub mod rpc { - //! Module for cross-process RPC using [`noq`]. - use std::{ - fmt::Debug, future::Future, io, marker::PhantomData, ops::DerefMut, pin::Pin, sync::Arc, - }; - - use n0_error::{e, stack_error}; - use n0_future::{future::Boxed as BoxFuture, task::JoinSet}; - /// This is used by irpc-derive to refer to noq types (SendStream and RecvStream) - /// to make generated code work for users without having to depend on noq directly - /// (i.e. when using iroh). - #[doc(hidden)] - pub use noq; - use noq::{ConnectionError, PathId}; - use serde::de::DeserializeOwned; - use smallvec::SmallVec; - use tracing::{Instrument, debug, error_span, trace, warn}; - - use crate::{ - LocalSender, RequestError, RpcMessage, Service, - channel::{ - SendError, - mpsc::{self, DynReceiver, DynSender}, - none::NoSender, - oneshot, - }, - util::{AsyncReadVarintExt, WriteVarintExt, now_or_never}, - }; - - /// Default max message size (16 MiB). - pub const MAX_MESSAGE_SIZE: u64 = 1024 * 1024 * 16; - - /// Error code on streams if the max message size was exceeded. - pub const ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED: u32 = 1; - - /// Error code on streams if the sender tried to send an message that could not be postcard serialized. - pub const ERROR_CODE_INVALID_POSTCARD: u32 = 2; - - /// Error that can occur when writing the initial message when doing a - /// cross-process RPC. - #[stack_error(derive, add_meta, from_sources)] - pub enum WriteError { - /// Error writing to the stream with noq - #[error("Error writing to stream")] - Noq { - #[error(std_err)] - source: noq::WriteError, - }, - /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). - #[error("Maximum message size exceeded")] - MaxMessageSizeExceeded, - /// Generic IO error, e.g. when serializing the message or when using - /// other transports. - #[error("Error serializing")] - Io { - #[error(std_err)] - source: io::Error, - }, - } - - impl From for WriteError { - fn from(value: postcard::Error) -> Self { - e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value)) - } - } - - impl From for SendError { - fn from(value: postcard::Error) -> Self { - e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value)) - } - } - - impl From for io::Error { - fn from(e: WriteError) -> Self { - match e { - WriteError::Io { source, .. } => source, - WriteError::MaxMessageSizeExceeded { .. } => { - io::Error::new(io::ErrorKind::InvalidData, e) - } - WriteError::Noq { source, .. } => source.into(), - } - } - } - - impl From for SendError { - fn from(err: noq::WriteError) -> Self { - match err { - noq::WriteError::Stopped(code) - if code == ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into() => - { - e!(SendError::MaxMessageSizeExceeded) - } - _ => e!(SendError::Io, io::Error::from(err)), - } - } - } - - /// Trait to abstract over a client connection to a remote service. - /// - /// This isn't really that much abstracted, since the result of open_bi must - /// still be a noq::SendStream and noq::RecvStream. This is just so we - /// can have different connection implementations for normal noq connections, - /// iroh connections, and possibly noq connections with disabled encryption - /// for performance. - /// - /// This is done as a trait instead of an enum, so we don't need an iroh - /// dependency in the main crate. - pub trait RemoteConnection: Send + Sync + Debug + 'static { - /// Boxed clone so the trait is dynable. - fn clone_boxed(&self) -> Box; - - /// Open a bidirectional stream to the remote service. - fn open_bi( - &self, - ) -> BoxFuture>; - - /// Returns whether 0-RTT data was rejected by the server. - /// - /// For connections that were fully authenticated before allowing to send any data, this should return `false`. - fn zero_rtt_rejected(&self) -> BoxFuture; - } - - /// A connection to a remote service. - /// - /// Initially this does just have the endpoint and the address. Once a - /// connection is established, it will be stored. - #[derive(Debug, Clone)] - pub(crate) struct NoqLazyRemoteConnection(Arc); - - #[derive(Debug)] - struct NoqLazyRemoteConnectionInner { - pub endpoint: noq::Endpoint, - pub addr: std::net::SocketAddr, - pub connection: tokio::sync::Mutex>, - } - - impl RemoteConnection for noq::Connection { - fn clone_boxed(&self) -> Box { - Box::new(self.clone()) - } - - fn open_bi( - &self, - ) -> BoxFuture> - { - let conn = self.clone(); - Box::pin(async move { - let pair = conn.open_bi().await?; - Ok(pair) - }) - } - - fn zero_rtt_rejected(&self) -> BoxFuture { - Box::pin(async { false }) - } - } - - impl NoqLazyRemoteConnection { - pub fn new(endpoint: noq::Endpoint, addr: std::net::SocketAddr) -> Self { - Self(Arc::new(NoqLazyRemoteConnectionInner { - endpoint, - addr, - connection: Default::default(), - })) - } - } - - impl RemoteConnection for NoqLazyRemoteConnection { - fn clone_boxed(&self) -> Box { - Box::new(self.clone()) - } - - fn open_bi( - &self, - ) -> BoxFuture> - { - let this = self.0.clone(); - Box::pin(async move { - let mut guard = this.connection.lock().await; - let pair = match guard.as_mut() { - Some(conn) => { - // try to reuse the connection - match conn.open_bi().await { - Ok(pair) => pair, - Err(_) => { - // try with a new connection, just once - *guard = None; - connect_and_open_bi(&this.endpoint, &this.addr, guard).await? - } - } - } - None => connect_and_open_bi(&this.endpoint, &this.addr, guard).await?, - }; - Ok(pair) - }) - } - - fn zero_rtt_rejected(&self) -> BoxFuture { - Box::pin(async { false }) - } - } - - async fn connect_and_open_bi( - endpoint: &noq::Endpoint, - addr: &std::net::SocketAddr, - mut guard: tokio::sync::MutexGuard<'_, Option>, - ) -> Result<(noq::SendStream, noq::RecvStream), RequestError> { - let conn = endpoint.connect(*addr, "localhost")?.await?; - let (send, recv) = conn.open_bi().await?; - *guard = Some(conn); - Ok((send, recv)) - } - - /// A connection to a remote service that can be used to send the initial message. - #[derive(Debug)] - pub struct RemoteSender( - noq::SendStream, - noq::RecvStream, - std::marker::PhantomData, - ); - - /// Serialize a message for sending over the wire. - /// - /// When `S::SPAN_PROPAGATION` is true, the message is wrapped in a tuple with - /// span context: `(Option, msg)`. - /// When false, the message is serialized directly. - pub(crate) fn prepare_write( - msg: impl Into, - ) -> Result, WriteError> { - let msg = msg.into(); - let mut buf = SmallVec::<[u8; 128]>::new(); - - if S::SPAN_PROPAGATION { - // Include span context in wire format - let span_ctx = Some(crate::span_propagation::SpanContextCarrier::from_current()); - let payload = (span_ctx, msg); - if postcard::experimental::serialized_size(&payload)? as u64 > MAX_MESSAGE_SIZE { - return Err(e!(WriteError::MaxMessageSizeExceeded)); - } - buf.write_length_prefixed(&payload)?; - } else { - // Original wire format without span context - if postcard::experimental::serialized_size(&msg)? as u64 > MAX_MESSAGE_SIZE { - return Err(e!(WriteError::MaxMessageSizeExceeded)); - } - buf.write_length_prefixed(&msg)?; - } - - Ok(buf) - } - - impl RemoteSender { - pub fn new(send: noq::SendStream, recv: noq::RecvStream) -> Self { - Self(send, recv, PhantomData) - } - - pub async fn write( - self, - msg: impl Into, - ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> { - let buf = prepare_write(msg)?; - self.write_raw(&buf).await - } - - pub(crate) async fn write_raw( - self, - buf: &[u8], - ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> { - let RemoteSender(mut send, recv, _) = self; - send.write_all(buf).await?; - Ok((send, recv)) - } - } - - impl From for oneshot::Receiver { - fn from(mut read: noq::RecvStream) -> Self { - let fut = async move { - let size = read.read_varint_u64().await?.ok_or(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to read size", - ))?; - if size > MAX_MESSAGE_SIZE { - read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok(); - return Err(e!(oneshot::RecvError::MaxMessageSizeExceeded)); - } - let rest = read - .read_to_end(size as usize) - .await - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let msg: T = postcard::from_bytes(&rest) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(msg) - }; - oneshot::Receiver::from(|| fut) - } - } - - impl From for crate::channel::none::NoReceiver { - fn from(read: noq::RecvStream) -> Self { - drop(read); - Self - } - } - - impl From for mpsc::Receiver { - fn from(read: noq::RecvStream) -> Self { - mpsc::Receiver::Boxed(Box::new(NoqReceiver { - recv: read, - _marker: PhantomData, - })) - } - } - - impl From for NoSender { - fn from(write: noq::SendStream) -> Self { - let _ = write; - NoSender - } - } - - impl From for oneshot::Sender { - fn from(mut writer: noq::SendStream) -> Self { - oneshot::Sender::Boxed(Box::new(move |value| { - Box::pin(async move { - let size = match postcard::experimental::serialized_size(&value) { - Ok(size) => size, - Err(e) => { - writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); - return Err(e!( - SendError::Io, - io::Error::new(io::ErrorKind::InvalidData, e,) - )); - } - }; - if size as u64 > MAX_MESSAGE_SIZE { - writer - .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()) - .ok(); - return Err(e!(SendError::MaxMessageSizeExceeded)); - } - // write via a small buffer to avoid allocation for small values - let mut buf = SmallVec::<[u8; 128]>::new(); - if let Err(e) = buf.write_length_prefixed(value) { - writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); - return Err(e.into()); - } - writer.write_all(&buf).await?; - Ok(()) - }) - })) - } - } - - impl From for mpsc::Sender { - fn from(write: noq::SendStream) -> Self { - mpsc::Sender::Boxed(Arc::new(NoqSender(tokio::sync::Mutex::new( - NoqSenderState::Open(NoqSenderInner { - send: write, - buffer: SmallVec::new(), - _marker: PhantomData, - }), - )))) - } - } - - struct NoqReceiver { - recv: noq::RecvStream, - _marker: std::marker::PhantomData, - } - - impl Debug for NoqReceiver { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NoqReceiver").finish() - } - } - - impl DynReceiver for NoqReceiver { - fn recv( - &mut self, - ) -> Pin, mpsc::RecvError>> + Send + Sync + '_>> - { - Box::pin(async { - let read = &mut self.recv; - let Some(size) = read.read_varint_u64().await? else { - return Ok(None); - }; - if size > MAX_MESSAGE_SIZE { - self.recv - .stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()) - .ok(); - return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded)); - } - let mut buf = vec![0; size as usize]; - read.read_exact(&mut buf) - .await - .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; - let msg: T = postcard::from_bytes(&buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(Some(msg)) - }) - } - } - - impl Drop for NoqReceiver { - fn drop(&mut self) {} - } - - struct NoqSenderInner { - send: noq::SendStream, - buffer: SmallVec<[u8; 128]>, - _marker: std::marker::PhantomData, - } - - impl NoqSenderInner { - fn send( - &mut self, - value: T, - ) -> Pin> + Send + Sync + '_>> { - Box::pin(async { - let size = match postcard::experimental::serialized_size(&value) { - Ok(size) => size, - Err(e) => { - self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); - return Err(e!( - SendError::Io, - io::Error::new(io::ErrorKind::InvalidData, e) - )); - } - }; - if size as u64 > MAX_MESSAGE_SIZE { - self.send - .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()) - .ok(); - return Err(e!(SendError::MaxMessageSizeExceeded)); - } - let value = value; - self.buffer.clear(); - if let Err(e) = self.buffer.write_length_prefixed(value) { - self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); - return Err(e.into()); - } - self.send.write_all(&self.buffer).await?; - self.buffer.clear(); - Ok(()) - }) - } - - fn try_send( - &mut self, - value: T, - ) -> Pin> + Send + Sync + '_>> { - Box::pin(async { - if postcard::experimental::serialized_size(&value)? as u64 > MAX_MESSAGE_SIZE { - return Err(e!(SendError::MaxMessageSizeExceeded)); - } - // todo: move the non-async part out of the box. Will require a new return type. - let value = value; - self.buffer.clear(); - self.buffer.write_length_prefixed(value)?; - let Some(n) = now_or_never(self.send.write(&self.buffer)) else { - return Ok(false); - }; - let n = n?; - self.send.write_all(&self.buffer[n..]).await?; - self.buffer.clear(); - Ok(true) - }) - } - - fn closed(&mut self) -> Pin + Send + Sync + '_>> { - Box::pin(async move { - self.send.stopped().await.ok(); - }) - } - } - - #[derive(Default)] - enum NoqSenderState { - Open(NoqSenderInner), - #[default] - Closed, - } - - struct NoqSender(tokio::sync::Mutex>); - - impl Debug for NoqSender { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NoqSender").finish() - } - } - - impl DynSender for NoqSender { - fn send( - &self, - value: T, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let mut guard = self.0.lock().await; - let sender = std::mem::take(guard.deref_mut()); - match sender { - NoqSenderState::Open(mut sender) => { - let res = sender.send(value).await; - if res.is_ok() { - *guard = NoqSenderState::Open(sender); - } - res - } - NoqSenderState::Closed => { - Err(io::Error::from(io::ErrorKind::BrokenPipe).into()) - } - } - }) - } - - fn try_send( - &self, - value: T, - ) -> Pin> + Send + '_>> { - Box::pin(async { - let mut guard = self.0.lock().await; - let sender = std::mem::take(guard.deref_mut()); - match sender { - NoqSenderState::Open(mut sender) => { - let res = sender.try_send(value).await; - if res.is_ok() { - *guard = NoqSenderState::Open(sender); - } - res - } - NoqSenderState::Closed => { - Err(io::Error::from(io::ErrorKind::BrokenPipe).into()) - } - } - }) - } - - fn closed(&self) -> Pin + Send + Sync + '_>> { - Box::pin(async { - let mut guard = self.0.lock().await; - match guard.deref_mut() { - NoqSenderState::Open(sender) => sender.closed().await, - NoqSenderState::Closed => {} - } - }) - } - - fn is_rpc(&self) -> bool { - true - } - } - - /// Type alias for a handler fn for remote requests - pub type Handler = Arc< - dyn Fn(R, noq::RecvStream, noq::SendStream) -> BoxFuture> - + Send - + Sync - + 'static, - >; - - /// Extension trait to [`Service`] to create a [`Service::Message`] from a [`Service`] - /// and a pair of QUIC streams. - /// - /// This trait is auto-implemented when using the [`crate::rpc_requests`] macro. - pub trait RemoteService: Service + Sized { - /// Returns the message enum for this request by combining `self` (the protocol enum) - /// with a pair of QUIC streams for `tx` and `rx` channels. - fn with_remote_channels(self, rx: noq::RecvStream, tx: noq::SendStream) -> Self::Message; - - /// Creates a [`Handler`] that forwards all messages to a [`LocalSender`]. - fn remote_handler(local_sender: LocalSender) -> Handler { - Arc::new(move |msg, rx, tx| { - // `with_remote_channels` reads the task-local span context installed by - // the dispatch loop, so it must run inside the future (which is polled - // within that scope) rather than eagerly here. - let local_sender = local_sender.clone(); - Box::pin(async move { - let msg = Self::with_remote_channels(msg, rx, tx); - local_sender.send_raw(msg).await - }) - }) - } - } - - /// Utility function to listen for incoming connections and handle them with the provided handler. - /// - /// The wire format used depends on `S::SPAN_PROPAGATION` - if true, span context is expected. - pub async fn listen(endpoint: noq::Endpoint, handler: Handler) { - let mut request_id = 0u64; - let mut tasks = JoinSet::new(); - loop { - let incoming = tokio::select! { - Some(res) = tasks.join_next(), if !tasks.is_empty() => { - res.expect("irpc connection task panicked"); - continue; - } - incoming = endpoint.accept() => { - match incoming { - None => break, - Some(incoming) => incoming - } - } - }; - let handler = handler.clone(); - let fut = async move { - match incoming.await { - Ok(connection) => match handle_connection(connection, handler).await { - Err(err) => warn!("connection closed with error: {err:?}"), - Ok(()) => debug!("connection closed"), - }, - Err(cause) => { - warn!("failed to accept connection: {cause:?}"); - } - }; - }; - let span = error_span!("rpc", id = request_id, remote = tracing::field::Empty); - tasks.spawn(fut.instrument(span)); - request_id += 1; - } - } - - /// Handles a quic connection with the provided `handler`. - /// - /// This function handles requests for a service `S`. The wire format used depends on - /// `S::SPAN_PROPAGATION` - if true, span context is expected in the wire format. - pub async fn handle_connection( - connection: noq::Connection, - handler: Handler, - ) -> io::Result<()> { - let remote = connection - .path(PathId::ZERO) - .and_then(|p| p.remote_address().ok()); - if let Some(remote) = remote { - tracing::Span::current().record("remote", tracing::field::display(remote)); - } - debug!("connection accepted"); - loop { - let Some((msg, carrier, rx, tx)) = read_request_inner::(&connection).await? else { - return Ok(()); - }; - crate::span_propagation::scope_remote(carrier, handler(msg, rx, tx)).await?; - } - } - - /// Reads a request from a connection and converts it to a message enum. - /// - /// This combines `read_request_raw` with `RemoteService::with_remote_channels`. - pub async fn read_request( - connection: &noq::Connection, - ) -> std::io::Result> { - let Some((msg, carrier, rx, tx)) = read_request_inner::(connection).await? else { - return Ok(None); - }; - Ok(Some( - crate::span_propagation::scope_remote(carrier, async move { - S::with_remote_channels(msg, rx, tx) - }) - .await, - )) - } - - /// Reads a single request from the connection. - /// - /// This accepts a bi-directional stream from the connection and reads and parses the request. - /// - /// When `S::SPAN_PROPAGATION` is true, any propagated span context on the wire is - /// silently dropped. Use [`handle_connection`] (or [`read_request`]) if you need - /// the propagated context to reach the generated handler spans. - /// - /// Returns the parsed request and the stream pair if reading and parsing the request succeeded. - /// Returns None if the remote closed the connection with error code `0`. - /// Returns an error for all other failure cases. - pub async fn read_request_raw( - connection: &noq::Connection, - ) -> std::io::Result> { - Ok(read_request_inner::(connection) - .await? - .map(|(msg, _carrier, rx, tx)| (msg, rx, tx))) - } - - /// Internal: read a request and also return the propagated span context carrier. - /// - /// The carrier is `Some` iff `S::SPAN_PROPAGATION` is true and the remote sent one. - async fn read_request_inner( - connection: &noq::Connection, - ) -> std::io::Result< - Option<( - S, - Option, - noq::RecvStream, - noq::SendStream, - )>, - > { - let (send, mut recv) = match connection.accept_bi().await { - Ok((s, r)) => (s, r), - Err(ConnectionError::ApplicationClosed(cause)) - if cause.error_code.into_inner() == 0 => - { - trace!("remote side closed connection {cause:?}"); - return Ok(None); - } - Err(cause) => { - warn!("failed to accept bi stream {cause:?}"); - return Err(cause.into()); - } - }; - let size = recv - .read_varint_u64() - .await? - .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size"))?; - if size > MAX_MESSAGE_SIZE { - connection.close( - ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(), - b"request exceeded max message size", - ); - return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded).into()); - } - let mut buf = vec![0; size as usize]; - recv.read_exact(&mut buf) - .await - .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; - - let (carrier, msg): (Option, S) = - if S::SPAN_PROPAGATION { - postcard::from_bytes(&buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? - } else { - let msg = postcard::from_bytes(&buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - (None, msg) - }; - - Ok(Some((msg, carrier, recv, send))) - } -} - /// A request to a service. This can be either local or remote. #[derive(Debug)] pub enum Request { diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 0000000..49a05f1 --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,720 @@ +//! Module for cross-process RPC using [`noq`]. +//! +use std::{ + fmt::Debug, future::Future, io, marker::PhantomData, ops::DerefMut, pin::Pin, sync::Arc, +}; + +use n0_error::{e, stack_error}; +use n0_future::{future::Boxed as BoxFuture, task::JoinSet}; +/// This is used by irpc-derive to refer to noq types (SendStream and RecvStream) +/// to make generated code work for users without having to depend on noq directly +/// (i.e. when using iroh). +#[doc(hidden)] +pub use noq; +use noq::{ConnectionError, PathId}; +use serde::de::DeserializeOwned; +use smallvec::SmallVec; +use tracing::{Instrument, debug, error_span, trace, warn}; + +use crate::{ + LocalSender, RequestError, RpcMessage, Service, + channel::{ + SendError, + mpsc::{self, DynReceiver, DynSender}, + none::NoSender, + oneshot, + }, + util::{AsyncReadVarintExt, WriteVarintExt, now_or_never}, +}; + +/// Default max message size (16 MiB). +pub const MAX_MESSAGE_SIZE: u64 = 1024 * 1024 * 16; + +/// Error code on streams if the max message size was exceeded. +pub const ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED: u32 = 1; + +/// Error code on streams if the sender tried to send an message that could not be postcard serialized. +pub const ERROR_CODE_INVALID_POSTCARD: u32 = 2; + +/// Error that can occur when writing the initial message when doing a +/// cross-process RPC. +#[stack_error(derive, add_meta, from_sources)] +pub enum WriteError { + /// Error writing to the stream with noq + #[error("Error writing to stream")] + Noq { + #[error(std_err)] + source: noq::WriteError, + }, + /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]). + #[error("Maximum message size exceeded")] + MaxMessageSizeExceeded, + /// Generic IO error, e.g. when serializing the message or when using + /// other transports. + #[error("Error serializing")] + Io { + #[error(std_err)] + source: io::Error, + }, +} + +impl From for WriteError { + fn from(value: postcard::Error) -> Self { + e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value)) + } +} + +impl From for SendError { + fn from(value: postcard::Error) -> Self { + e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value)) + } +} + +impl From for io::Error { + fn from(e: WriteError) -> Self { + match e { + WriteError::Io { source, .. } => source, + WriteError::MaxMessageSizeExceeded { .. } => { + io::Error::new(io::ErrorKind::InvalidData, e) + } + WriteError::Noq { source, .. } => source.into(), + } + } +} + +impl From for SendError { + fn from(err: noq::WriteError) -> Self { + match err { + noq::WriteError::Stopped(code) + if code == ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into() => + { + e!(SendError::MaxMessageSizeExceeded) + } + _ => e!(SendError::Io, io::Error::from(err)), + } + } +} + +/// Trait to abstract over a client connection to a remote service. +/// +/// This isn't really that much abstracted, since the result of open_bi must +/// still be a noq::SendStream and noq::RecvStream. This is just so we +/// can have different connection implementations for normal noq connections, +/// iroh connections, and possibly noq connections with disabled encryption +/// for performance. +/// +/// This is done as a trait instead of an enum, so we don't need an iroh +/// dependency in the main crate. +pub trait RemoteConnection: Send + Sync + Debug + 'static { + /// Boxed clone so the trait is dynable. + fn clone_boxed(&self) -> Box; + + /// Open a bidirectional stream to the remote service. + fn open_bi( + &self, + ) -> BoxFuture>; + + /// Returns whether 0-RTT data was rejected by the server. + /// + /// For connections that were fully authenticated before allowing to send any data, this should return `false`. + fn zero_rtt_rejected(&self) -> BoxFuture; +} + +/// A connection to a remote service. +/// +/// Initially this does just have the endpoint and the address. Once a +/// connection is established, it will be stored. +#[derive(Debug, Clone)] +pub(crate) struct NoqLazyRemoteConnection(Arc); + +#[derive(Debug)] +struct NoqLazyRemoteConnectionInner { + pub endpoint: noq::Endpoint, + pub addr: std::net::SocketAddr, + pub connection: tokio::sync::Mutex>, +} + +impl RemoteConnection for noq::Connection { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn open_bi( + &self, + ) -> BoxFuture> { + let conn = self.clone(); + Box::pin(async move { + let pair = conn.open_bi().await?; + Ok(pair) + }) + } + + fn zero_rtt_rejected(&self) -> BoxFuture { + Box::pin(async { false }) + } +} + +impl NoqLazyRemoteConnection { + pub fn new(endpoint: noq::Endpoint, addr: std::net::SocketAddr) -> Self { + Self(Arc::new(NoqLazyRemoteConnectionInner { + endpoint, + addr, + connection: Default::default(), + })) + } +} + +impl RemoteConnection for NoqLazyRemoteConnection { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn open_bi( + &self, + ) -> BoxFuture> { + let this = self.0.clone(); + Box::pin(async move { + let mut guard = this.connection.lock().await; + let pair = match guard.as_mut() { + Some(conn) => { + // try to reuse the connection + match conn.open_bi().await { + Ok(pair) => pair, + Err(_) => { + // try with a new connection, just once + *guard = None; + connect_and_open_bi(&this.endpoint, &this.addr, guard).await? + } + } + } + None => connect_and_open_bi(&this.endpoint, &this.addr, guard).await?, + }; + Ok(pair) + }) + } + + fn zero_rtt_rejected(&self) -> BoxFuture { + Box::pin(async { false }) + } +} + +async fn connect_and_open_bi( + endpoint: &noq::Endpoint, + addr: &std::net::SocketAddr, + mut guard: tokio::sync::MutexGuard<'_, Option>, +) -> Result<(noq::SendStream, noq::RecvStream), RequestError> { + let conn = endpoint.connect(*addr, "localhost")?.await?; + let (send, recv) = conn.open_bi().await?; + *guard = Some(conn); + Ok((send, recv)) +} + +/// A connection to a remote service that can be used to send the initial message. +#[derive(Debug)] +pub struct RemoteSender( + noq::SendStream, + noq::RecvStream, + std::marker::PhantomData, +); + +/// Serialize a message for sending over the wire. +/// +/// When `S::SPAN_PROPAGATION` is true, the message is wrapped in a tuple with +/// span context: `(Option, msg)`. +/// When false, the message is serialized directly. +pub(crate) fn prepare_write( + msg: impl Into, +) -> Result, WriteError> { + let msg = msg.into(); + let mut buf = SmallVec::<[u8; 128]>::new(); + + if S::SPAN_PROPAGATION { + // Include span context in wire format + let span_ctx = Some(crate::span_propagation::SpanContextCarrier::from_current()); + let payload = (span_ctx, msg); + if postcard::experimental::serialized_size(&payload)? as u64 > MAX_MESSAGE_SIZE { + return Err(e!(WriteError::MaxMessageSizeExceeded)); + } + buf.write_length_prefixed(&payload)?; + } else { + // Original wire format without span context + if postcard::experimental::serialized_size(&msg)? as u64 > MAX_MESSAGE_SIZE { + return Err(e!(WriteError::MaxMessageSizeExceeded)); + } + buf.write_length_prefixed(&msg)?; + } + + Ok(buf) +} + +impl RemoteSender { + pub fn new(send: noq::SendStream, recv: noq::RecvStream) -> Self { + Self(send, recv, PhantomData) + } + + pub async fn write( + self, + msg: impl Into, + ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> { + let buf = prepare_write(msg)?; + self.write_raw(&buf).await + } + + pub(crate) async fn write_raw( + self, + buf: &[u8], + ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> { + let RemoteSender(mut send, recv, _) = self; + send.write_all(buf).await?; + Ok((send, recv)) + } +} + +impl From for oneshot::Receiver { + fn from(mut read: noq::RecvStream) -> Self { + let fut = async move { + let size = read.read_varint_u64().await?.ok_or(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to read size", + ))?; + if size > MAX_MESSAGE_SIZE { + read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok(); + return Err(e!(oneshot::RecvError::MaxMessageSizeExceeded)); + } + let rest = read + .read_to_end(size as usize) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let msg: T = postcard::from_bytes(&rest) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + Ok(msg) + }; + oneshot::Receiver::from(|| fut) + } +} + +impl From for crate::channel::none::NoReceiver { + fn from(read: noq::RecvStream) -> Self { + drop(read); + Self + } +} + +impl From for mpsc::Receiver { + fn from(read: noq::RecvStream) -> Self { + mpsc::Receiver::Boxed(Box::new(NoqReceiver { + recv: read, + _marker: PhantomData, + })) + } +} + +impl From for NoSender { + fn from(write: noq::SendStream) -> Self { + let _ = write; + NoSender + } +} + +impl From for oneshot::Sender { + fn from(mut writer: noq::SendStream) -> Self { + oneshot::Sender::Boxed(Box::new(move |value| { + Box::pin(async move { + let size = match postcard::experimental::serialized_size(&value) { + Ok(size) => size, + Err(e) => { + writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); + return Err(e!( + SendError::Io, + io::Error::new(io::ErrorKind::InvalidData, e,) + )); + } + }; + if size as u64 > MAX_MESSAGE_SIZE { + writer + .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()) + .ok(); + return Err(e!(SendError::MaxMessageSizeExceeded)); + } + // write via a small buffer to avoid allocation for small values + let mut buf = SmallVec::<[u8; 128]>::new(); + if let Err(e) = buf.write_length_prefixed(value) { + writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); + return Err(e.into()); + } + writer.write_all(&buf).await?; + Ok(()) + }) + })) + } +} + +impl From for mpsc::Sender { + fn from(write: noq::SendStream) -> Self { + mpsc::Sender::Boxed(Arc::new(NoqSender(tokio::sync::Mutex::new( + NoqSenderState::Open(NoqSenderInner { + send: write, + buffer: SmallVec::new(), + _marker: PhantomData, + }), + )))) + } +} + +struct NoqReceiver { + recv: noq::RecvStream, + _marker: std::marker::PhantomData, +} + +impl Debug for NoqReceiver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NoqReceiver").finish() + } +} + +impl DynReceiver for NoqReceiver { + fn recv( + &mut self, + ) -> Pin, mpsc::RecvError>> + Send + Sync + '_>> { + Box::pin(async { + let read = &mut self.recv; + let Some(size) = read.read_varint_u64().await? else { + return Ok(None); + }; + if size > MAX_MESSAGE_SIZE { + self.recv + .stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()) + .ok(); + return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded)); + } + let mut buf = vec![0; size as usize]; + read.read_exact(&mut buf) + .await + .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; + let msg: T = postcard::from_bytes(&buf) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + Ok(Some(msg)) + }) + } +} + +impl Drop for NoqReceiver { + fn drop(&mut self) {} +} + +struct NoqSenderInner { + send: noq::SendStream, + buffer: SmallVec<[u8; 128]>, + _marker: std::marker::PhantomData, +} + +impl NoqSenderInner { + fn send( + &mut self, + value: T, + ) -> Pin> + Send + Sync + '_>> { + Box::pin(async { + let size = match postcard::experimental::serialized_size(&value) { + Ok(size) => size, + Err(e) => { + self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); + return Err(e!( + SendError::Io, + io::Error::new(io::ErrorKind::InvalidData, e) + )); + } + }; + if size as u64 > MAX_MESSAGE_SIZE { + self.send + .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()) + .ok(); + return Err(e!(SendError::MaxMessageSizeExceeded)); + } + let value = value; + self.buffer.clear(); + if let Err(e) = self.buffer.write_length_prefixed(value) { + self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok(); + return Err(e.into()); + } + self.send.write_all(&self.buffer).await?; + self.buffer.clear(); + Ok(()) + }) + } + + fn try_send( + &mut self, + value: T, + ) -> Pin> + Send + Sync + '_>> { + Box::pin(async { + if postcard::experimental::serialized_size(&value)? as u64 > MAX_MESSAGE_SIZE { + return Err(e!(SendError::MaxMessageSizeExceeded)); + } + // todo: move the non-async part out of the box. Will require a new return type. + let value = value; + self.buffer.clear(); + self.buffer.write_length_prefixed(value)?; + let Some(n) = now_or_never(self.send.write(&self.buffer)) else { + return Ok(false); + }; + let n = n?; + self.send.write_all(&self.buffer[n..]).await?; + self.buffer.clear(); + Ok(true) + }) + } + + fn closed(&mut self) -> Pin + Send + Sync + '_>> { + Box::pin(async move { + self.send.stopped().await.ok(); + }) + } +} + +#[derive(Default)] +enum NoqSenderState { + Open(NoqSenderInner), + #[default] + Closed, +} + +struct NoqSender(tokio::sync::Mutex>); + +impl Debug for NoqSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NoqSender").finish() + } +} + +impl DynSender for NoqSender { + fn send(&self, value: T) -> Pin> + Send + '_>> { + Box::pin(async { + let mut guard = self.0.lock().await; + let sender = std::mem::take(guard.deref_mut()); + match sender { + NoqSenderState::Open(mut sender) => { + let res = sender.send(value).await; + if res.is_ok() { + *guard = NoqSenderState::Open(sender); + } + res + } + NoqSenderState::Closed => Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), + } + }) + } + + fn try_send( + &self, + value: T, + ) -> Pin> + Send + '_>> { + Box::pin(async { + let mut guard = self.0.lock().await; + let sender = std::mem::take(guard.deref_mut()); + match sender { + NoqSenderState::Open(mut sender) => { + let res = sender.try_send(value).await; + if res.is_ok() { + *guard = NoqSenderState::Open(sender); + } + res + } + NoqSenderState::Closed => Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), + } + }) + } + + fn closed(&self) -> Pin + Send + Sync + '_>> { + Box::pin(async { + let mut guard = self.0.lock().await; + match guard.deref_mut() { + NoqSenderState::Open(sender) => sender.closed().await, + NoqSenderState::Closed => {} + } + }) + } + + fn is_rpc(&self) -> bool { + true + } +} + +/// Type alias for a handler fn for remote requests +pub type Handler = Arc< + dyn Fn(R, noq::RecvStream, noq::SendStream) -> BoxFuture> + + Send + + Sync + + 'static, +>; + +/// Extension trait to [`Service`] to create a [`Service::Message`] from a [`Service`] +/// and a pair of QUIC streams. +/// +/// This trait is auto-implemented when using the [`crate::rpc_requests`] macro. +pub trait RemoteService: Service + Sized { + /// Returns the message enum for this request by combining `self` (the protocol enum) + /// with a pair of QUIC streams for `tx` and `rx` channels. + fn with_remote_channels(self, rx: noq::RecvStream, tx: noq::SendStream) -> Self::Message; + + /// Creates a [`Handler`] that forwards all messages to a [`LocalSender`]. + fn remote_handler(local_sender: LocalSender) -> Handler { + Arc::new(move |msg, rx, tx| { + // `with_remote_channels` reads the task-local span context installed by + // the dispatch loop, so it must run inside the future (which is polled + // within that scope) rather than eagerly here. + let local_sender = local_sender.clone(); + Box::pin(async move { + let msg = Self::with_remote_channels(msg, rx, tx); + local_sender.send_raw(msg).await + }) + }) + } +} + +/// Utility function to listen for incoming connections and handle them with the provided handler. +/// +/// The wire format used depends on `S::SPAN_PROPAGATION` - if true, span context is expected. +pub async fn listen(endpoint: noq::Endpoint, handler: Handler) { + let mut request_id = 0u64; + let mut tasks = JoinSet::new(); + loop { + let incoming = tokio::select! { + Some(res) = tasks.join_next(), if !tasks.is_empty() => { + res.expect("irpc connection task panicked"); + continue; + } + incoming = endpoint.accept() => { + match incoming { + None => break, + Some(incoming) => incoming + } + } + }; + let handler = handler.clone(); + let fut = async move { + match incoming.await { + Ok(connection) => match handle_connection(connection, handler).await { + Err(err) => warn!("connection closed with error: {err:?}"), + Ok(()) => debug!("connection closed"), + }, + Err(cause) => { + warn!("failed to accept connection: {cause:?}"); + } + }; + }; + let span = error_span!("rpc", id = request_id, remote = tracing::field::Empty); + tasks.spawn(fut.instrument(span)); + request_id += 1; + } +} + +/// Handles a quic connection with the provided `handler`. +/// +/// This function handles requests for a service `S`. The wire format used depends on +/// `S::SPAN_PROPAGATION` - if true, span context is expected in the wire format. +pub async fn handle_connection( + connection: noq::Connection, + handler: Handler, +) -> io::Result<()> { + let remote = connection + .path(PathId::ZERO) + .and_then(|p| p.remote_address().ok()); + if let Some(remote) = remote { + tracing::Span::current().record("remote", tracing::field::display(remote)); + } + debug!("connection accepted"); + loop { + let Some((msg, carrier, rx, tx)) = read_request_inner::(&connection).await? else { + return Ok(()); + }; + crate::span_propagation::scope_remote(carrier, handler(msg, rx, tx)).await?; + } +} + +/// Reads a request from a connection and converts it to a message enum. +/// +/// This combines `read_request_raw` with `RemoteService::with_remote_channels`. +pub async fn read_request( + connection: &noq::Connection, +) -> std::io::Result> { + let Some((msg, carrier, rx, tx)) = read_request_inner::(connection).await? else { + return Ok(None); + }; + Ok(Some( + crate::span_propagation::scope_remote(carrier, async move { + S::with_remote_channels(msg, rx, tx) + }) + .await, + )) +} + +/// Reads a single request from the connection. +/// +/// This accepts a bi-directional stream from the connection and reads and parses the request. +/// +/// When `S::SPAN_PROPAGATION` is true, any propagated span context on the wire is +/// silently dropped. Use [`handle_connection`] (or [`read_request`]) if you need +/// the propagated context to reach the generated handler spans. +/// +/// Returns the parsed request and the stream pair if reading and parsing the request succeeded. +/// Returns None if the remote closed the connection with error code `0`. +/// Returns an error for all other failure cases. +pub async fn read_request_raw( + connection: &noq::Connection, +) -> std::io::Result> { + Ok(read_request_inner::(connection) + .await? + .map(|(msg, _carrier, rx, tx)| (msg, rx, tx))) +} + +/// Internal: read a request and also return the propagated span context carrier. +/// +/// The carrier is `Some` iff `S::SPAN_PROPAGATION` is true and the remote sent one. +async fn read_request_inner( + connection: &noq::Connection, +) -> std::io::Result< + Option<( + S, + Option, + noq::RecvStream, + noq::SendStream, + )>, +> { + let (send, mut recv) = match connection.accept_bi().await { + Ok((s, r)) => (s, r), + Err(ConnectionError::ApplicationClosed(cause)) if cause.error_code.into_inner() == 0 => { + trace!("remote side closed connection {cause:?}"); + return Ok(None); + } + Err(cause) => { + warn!("failed to accept bi stream {cause:?}"); + return Err(cause.into()); + } + }; + let size = recv + .read_varint_u64() + .await? + .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size"))?; + if size > MAX_MESSAGE_SIZE { + connection.close( + ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(), + b"request exceeded max message size", + ); + return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded).into()); + } + let mut buf = vec![0; size as usize]; + recv.read_exact(&mut buf) + .await + .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; + + let (carrier, msg): (Option, S) = + if S::SPAN_PROPAGATION { + postcard::from_bytes(&buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? + } else { + let msg = postcard::from_bytes(&buf) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + (None, msg) + }; + + Ok(Some((msg, carrier, recv, send))) +} diff --git a/src/span_propagation.rs b/src/span_propagation.rs new file mode 100644 index 0000000..a230c95 --- /dev/null +++ b/src/span_propagation.rs @@ -0,0 +1,112 @@ +//! Span context propagation for remote RPC calls +//! +//! This module provides the `SpanContextCarrier` type for propagating trace context +//! across remote boundaries. The type is always available when `rpc` feature is enabled, +//! but actual OpenTelemetry integration requires the `tracing-opentelemetry` feature. +//! +//! The propagated context is scoped to a single request handler via a tokio task-local, +//! installed by the dispatch loop in `handle_connection`. This isolates concurrent +//! requests from each other and is robust to thread migration across `.await` points. + +use std::{collections::HashMap, future::Future}; + +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "tracing-opentelemetry")] +tokio::task_local! { + static SPAN_CONTEXT: opentelemetry::Context; +} + +/// Carrier for propagating span context across RPC boundaries using W3C Trace Context format. +/// +/// This type is always available for serialization purposes. When the +/// `tracing-opentelemetry` feature is enabled, it can extract/inject actual +/// OpenTelemetry trace context. Without that feature, it simply serializes as an +/// empty map. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct SpanContextCarrier { + headers: HashMap, +} + +#[cfg(feature = "tracing-opentelemetry")] +impl opentelemetry::propagation::Injector for SpanContextCarrier { + fn set(&mut self, key: &str, value: String) { + self.headers.insert(key.to_string(), value); + } +} + +#[cfg(feature = "tracing-opentelemetry")] +impl opentelemetry::propagation::Extractor for SpanContextCarrier { + fn get(&self, key: &str) -> Option<&str> { + self.headers.get(key).map(|v| v.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.headers.keys().map(|k| k.as_str()).collect() + } +} + +impl SpanContextCarrier { + /// Create a carrier from the current OpenTelemetry context. + /// + /// When `tracing-opentelemetry` feature is enabled, this extracts the current + /// trace context. Without the feature, this returns an empty carrier. + #[cfg(feature = "tracing-opentelemetry")] + pub fn from_current() -> Self { + use opentelemetry::global; + use tracing_opentelemetry::OpenTelemetrySpanExt; + let mut carrier = Self::default(); + // Get the OTel context from the current tracing span, not from + // opentelemetry::Context::current(). The tracing-opentelemetry layer + // stores OTel spans inside tracing spans, so the thread-local OTel + // context won't have the right span. + let ctx = tracing::Span::current().context(); + global::get_text_map_propagator(|prop| { + prop.inject_context(&ctx, &mut carrier); + }); + carrier + } + + #[cfg(not(feature = "tracing-opentelemetry"))] + pub fn from_current() -> Self { + Self::default() + } + + /// Extract an OpenTelemetry context from this carrier. + #[cfg(feature = "tracing-opentelemetry")] + pub fn to_context(&self) -> opentelemetry::Context { + use opentelemetry::global; + global::get_text_map_propagator(|prop| { + prop.extract_with_context(&opentelemetry::Context::current(), self) + }) + } +} + +/// Run `fut` with `carrier`'s context installed as the per-task scope read by +/// [`set_span_parent_from_remote`]. +/// +/// Used by transport implementations (`irpc::rpc`, `irpc-iroh`) to wrap a single +/// request handler. Most users will not call this directly. +pub async fn scope_remote(carrier: Option, fut: F) -> F::Output { + #[cfg(feature = "tracing-opentelemetry")] + if let Some(carrier) = carrier { + return SPAN_CONTEXT.scope(carrier.to_context(), fut).await; + } + let _ = carrier; + fut.await +} + +/// Set the parent of a span from the propagated remote context, if one is in scope. +/// +/// Called by the code generated by `rpc_requests(span_propagation)`. Looks up the +/// task-local installed by the dispatch loop; no-op outside that scope. +pub fn set_span_parent_from_remote(span: &tracing::Span) { + #[cfg(feature = "tracing-opentelemetry")] + { + let _ = SPAN_CONTEXT.try_with(|ctx| { + use tracing_opentelemetry::OpenTelemetrySpanExt; + let _ = span.set_parent(ctx.clone()); + }); + } + let _ = span; +} From 055dbe47fc3f0232ff27116f0f1944951dc5b253 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 16 Jun 2026 14:02:16 +0200 Subject: [PATCH 2/2] chore: fmt --- src/rpc.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpc.rs b/src/rpc.rs index 49a05f1..44757e6 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,5 +1,4 @@ //! Module for cross-process RPC using [`noq`]. -//! use std::{ fmt::Debug, future::Future, io, marker::PhantomData, ops::DerefMut, pin::Pin, sync::Arc, };