From b7a4d3663e904d5d9b20fa3d2fa28295aacfb4f5 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 10 Jun 2026 18:29:37 +0200 Subject: [PATCH 1/4] feat: reset_stream_at TransportParameter --- noq-proto/src/config/mod.rs | 23 +++++++++++------------ noq-proto/src/frame.rs | 2 ++ noq-proto/src/transport_parameters.rs | 26 +++++++++++++++++++++++++- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/noq-proto/src/config/mod.rs b/noq-proto/src/config/mod.rs index 32b944223..ffacae1d3 100644 --- a/noq-proto/src/config/mod.rs +++ b/noq-proto/src/config/mod.rs @@ -36,13 +36,15 @@ pub use transport::{AckFrequencyConfig, IdleTimeout, MtuDiscoveryConfig, Transpo /// Global configuration for the endpoint, affecting all connections /// /// Default values should be suitable for most internet applications. -#[derive(Clone)] +#[derive(Clone, derive_more::Debug)] pub struct EndpointConfig { + #[debug("HmacKey")] pub(crate) reset_key: Arc, pub(crate) max_udp_payload_size: VarInt, /// CID generator factory /// /// Create a cid generator for local cid in Endpoint struct + #[debug("ConnectionIdGenerator")] pub(crate) connection_id_generator_factory: Arc Box + Send + Sync>, pub(crate) supported_versions: Vec, @@ -51,6 +53,7 @@ pub struct EndpointConfig { pub(crate) min_reset_interval: Duration, /// Optional seed to be used internally for random number generation pub(crate) rng_seed: Option<[u8; 32]>, + pub(crate) reset_stream_at: bool, } impl EndpointConfig { @@ -66,6 +69,7 @@ impl EndpointConfig { grease_quic_bit: true, min_reset_interval: Duration::from_millis(20), rng_seed: None, + reset_stream_at: true, } } @@ -162,18 +166,13 @@ impl EndpointConfig { self.rng_seed = seed; self } -} -impl fmt::Debug for EndpointConfig { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("EndpointConfig") - // reset_key not debug - .field("max_udp_payload_size", &self.max_udp_payload_size) - // cid_generator_factory not debug - .field("supported_versions", &self.supported_versions) - .field("grease_quic_bit", &self.grease_quic_bit) - .field("rng_seed", &self.rng_seed) - .finish_non_exhaustive() + /// Enables the QUIC Stream Resets with Partial Delivery draft-07 extentsion. + /// + /// + pub fn reliable_stream_reset(&mut self, value: bool) -> &mut Self { + self.reset_stream_at = value; + self } } diff --git a/noq-proto/src/frame.rs b/noq-proto/src/frame.rs index 51ebd3bef..b7fa6c020 100644 --- a/noq-proto/src/frame.rs +++ b/noq-proto/src/frame.rs @@ -133,6 +133,8 @@ pub enum FrameType { ReachOutAtIpv6, #[assoc(to_u64 = 0x3d7f94)] RemoveAddress, + // #[assoc(to_u64 = 0x24)] + // ResetStreamAt, } /// Encounter a frame ID that was not valid. diff --git a/noq-proto/src/transport_parameters.rs b/noq-proto/src/transport_parameters.rs index 7dc2907c9..b721c2071 100644 --- a/noq-proto/src/transport_parameters.rs +++ b/noq-proto/src/transport_parameters.rs @@ -122,6 +122,9 @@ macro_rules! make_struct { /// Nat traversal draft pub max_remote_nat_traversal_addresses: Option, + + /// QUIC Stream Resets with Partial Delivery + pub(crate) reset_stream_at: bool, } // We deliberately don't implement the `Default` trait, since that would be public, and @@ -148,6 +151,7 @@ macro_rules! make_struct { address_discovery_role: address_discovery::Role::default(), initial_max_path_id: None, max_remote_nat_traversal_addresses: None, + reset_stream_at: false, } } } @@ -198,6 +202,7 @@ impl TransportParameters { address_discovery_role: config.address_discovery_role, initial_max_path_id: config.get_initial_max_path_id(), max_remote_nat_traversal_addresses: config.max_remote_nat_traversal_addresses, + reset_stream_at: endpoint_config.reset_stream_at, ..Self::default() } } @@ -216,6 +221,7 @@ impl TransportParameters { || cached.grease_quic_bit && !self.grease_quic_bit || cached.address_discovery_role != self.address_discovery_role || cached.max_remote_nat_traversal_addresses != self.max_remote_nat_traversal_addresses + || cached.reset_stream_at != self.reset_stream_at { return Err(TransportError::PROTOCOL_VIOLATION( "0-RTT accepted with incompatible transport parameters", @@ -423,6 +429,12 @@ impl TransportParameters { w.write(val.get()); } } + TransportParameterId::ResetStreamAt => { + if self.reset_stream_at { + w.write_var(id as u64); + w.write_var(0); + } + } id => { macro_rules! write_params { {$($(#[$doc:meta])* $name:ident ($id:ident) = $default:expr,)*} => { @@ -559,6 +571,12 @@ impl TransportParameters { params.max_remote_nat_traversal_addresses = Some(value); } + TransportParameterId::ResetStreamAt => { + if len != 0 || params.reset_stream_at { + return Err(Error::Malformed); + } + params.reset_stream_at = true; + } _ => { macro_rules! parse { {$($(#[$doc:meta])* $name:ident ($id:ident) = $default:expr,)*} => { @@ -731,11 +749,14 @@ pub(crate) enum TransportParameterId { // inspired by https://www.ietf.org/archive/id/draft-seemann-quic-nat-traversal-02.html, // simplified to n0's own protocol. N0NatTraversal = 0x3d7f91120401, + + // https://datatracker.ietf.org/doc/html/draft-ietf-quic-reliable-stream-reset + ResetStreamAt = 0x17f7586d2cb571, } impl TransportParameterId { /// Array with all supported transport parameter IDs - const SUPPORTED: [Self; 24] = [ + const SUPPORTED: [Self; 25] = [ Self::MaxIdleTimeout, Self::MaxUdpPayloadSize, Self::InitialMaxData, @@ -760,6 +781,7 @@ impl TransportParameterId { Self::ObservedAddr, Self::InitialMaxPathId, Self::N0NatTraversal, + Self::ResetStreamAt, ]; } @@ -802,6 +824,7 @@ impl TryFrom for TransportParameterId { id if Self::ObservedAddr == id => Self::ObservedAddr, id if Self::InitialMaxPathId == id => Self::InitialMaxPathId, id if Self::N0NatTraversal == id => Self::N0NatTraversal, + id if Self::ResetStreamAt == id => Self::ResetStreamAt, _ => return Err(()), }; Ok(param) @@ -843,6 +866,7 @@ mod test { address_discovery_role: address_discovery::Role::send_only(), initial_max_path_id: Some(PathId::MAX), max_remote_nat_traversal_addresses: Some(5u8.try_into().unwrap()), + reset_stream_at: true, ..TransportParameters::default() }; params.write(&mut buf); From a03d4ffa3f90df0825ba973f17051e0eac023707 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 10 Jun 2026 20:49:50 +0200 Subject: [PATCH 2/4] Add the frame type itself --- noq-proto/src/connection/mod.rs | 6 ++ noq-proto/src/connection/qlog.rs | 12 +++ noq-proto/src/connection/stats.rs | 134 ++++++++++----------------- noq-proto/src/frame.rs | 57 +++++++++++- noq-proto/src/tests/encode_decode.rs | 1 + 5 files changed, 123 insertions(+), 87 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 121a033cb..1400ae0e5 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -5099,6 +5099,9 @@ impl Connection { self.spaces[SpaceId::Data].pending.max_data = true; } } + Frame::ResetStreamAt(frame) => { + todo!(); + } Frame::DataBlocked(DataBlocked(offset)) => { debug!(offset, "peer claims to be blocked at connection level"); } @@ -7725,6 +7728,9 @@ impl SentFrames { StreamsBlocked(streams_blocked) => { self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true } + ResetStreamAt(reset_stream_at) => { + todo!(); + } } } } diff --git a/noq-proto/src/connection/qlog.rs b/noq-proto/src/connection/qlog.rs index 19f2f696d..155dd60c4 100644 --- a/noq-proto/src/connection/qlog.rs +++ b/noq-proto/src/connection/qlog.rs @@ -989,6 +989,17 @@ impl ToQlog for frame::StreamMetaEncoder { } } +#[cfg(feature = "qlog")] +impl ToQlog for frame::ResetStreamAt { + fn to_qlog(&self) -> QuicFrame { + // TODO: Teach qlog about this frame type. + QuicFrame::Unknown { + frame_type_bytes: Some(self.get_type().to_u64()), + raw: None, + } + } +} + #[cfg(feature = "qlog")] impl Frame { /// Converts a [`crate::Frame`] into a [`QuicFrame`]. @@ -1077,6 +1088,7 @@ impl Frame { Self::AddAddress(f) => f.to_qlog(), Self::ReachOut(f) => f.to_qlog(), Self::RemoveAddress(f) => f.to_qlog(), + Self::ResetStreamAt(f) => f.to_qlog(), } } } diff --git a/noq-proto/src/connection/stats.rs b/noq-proto/src/connection/stats.rs index dfaf02748..a0626bddd 100644 --- a/noq-proto/src/connection/stats.rs +++ b/noq-proto/src/connection/stats.rs @@ -33,47 +33,95 @@ impl UdpStats { } /// Number of frames transmitted or received of each frame type. -#[derive(Default, Copy, Clone, PartialEq, Eq, derive_more::Add, derive_more::AddAssign)] +#[derive( + Default, + Copy, + Clone, + PartialEq, + Eq, + derive_more::Debug, + derive_more::Add, + derive_more::AddAssign, +)] #[non_exhaustive] #[allow(missing_docs)] pub struct FrameStats { + #[debug("ACK")] pub acks: u64, + #[debug("PATH_ACK")] pub path_acks: u64, + #[debug("ACK_FREQUENCY")] pub ack_frequency: u64, + #[debug("CRYPTO")] pub crypto: u64, + #[debug("CONNECTION_CLOSE")] pub connection_close: u64, + #[debug("DATA_BLOCKED")] pub data_blocked: u64, + #[debug("DATAGRAM")] pub datagram: u64, + #[debug("HANDSHAKE_DONE")] pub handshake_done: u8, + #[debug("IMMEDIATE_ACK")] pub immediate_ack: u64, + #[debug("MAX_DATA")] pub max_data: u64, + #[debug("MAX_STREAM_DATA")] pub max_stream_data: u64, + #[debug("MAX_STREAMS_BIDI")] pub max_streams_bidi: u64, + #[debug("MAX_STREAMS_UNI")] pub max_streams_uni: u64, + #[debug("NEW_CONNECTION_ID")] pub new_connection_id: u64, + #[debug("PATH_NEW_CONNECTION_ID")] pub path_new_connection_id: u64, + #[debug("NEW_TOKEN")] pub new_token: u64, + #[debug("PATH_CHALLENGE")] pub path_challenge: u64, + #[debug("PATH_RESPONSE")] pub path_response: u64, + #[debug("PING")] pub ping: u64, + #[debug("RESET_STREAM")] pub reset_stream: u64, + #[debug("RETIRE_CONNECTION_ID")] pub retire_connection_id: u64, + #[debug("PATH_RETIRE_CONNECTION_ID")] pub path_retire_connection_id: u64, + #[debug("STREAM_DATA_BLOCKED")] pub stream_data_blocked: u64, + #[debug("STREAMS_BLOCKED_BIDI")] pub streams_blocked_bidi: u64, + #[debug("STREAMS_BLOCKED_UNI")] pub streams_blocked_uni: u64, + #[debug("STOP_SENDING")] pub stop_sending: u64, + #[debug("STREAM")] pub stream: u64, + #[debug("OBSERVED_ADDR")] pub observed_addr: u64, + #[debug("PATH_ABANDON")] pub path_abandon: u64, + #[debug("PATH_STATUS_AVAILABLE")] pub path_status_available: u64, + #[debug("PATH_STATUS_BACKUP")] pub path_status_backup: u64, + #[debug("MAX_PATH_ID")] pub max_path_id: u64, + #[debug("PATHS_BLOCKED")] pub paths_blocked: u64, + #[debug("PATH_CIDS_BLOCKED")] pub path_cids_blocked: u64, + #[debug("ADD_ADDRESS")] pub add_address: u64, + #[debug("REACH_OUT")] pub reach_out: u64, + #[debug("REMOVE_ADDRESS")] pub remove_address: u64, + #[debug("RESET_STREAM_AT")] + pub reset_stream_at: u64, } impl FrameStats { @@ -122,93 +170,11 @@ impl FrameStats { AddIpv4Address | AddIpv6Address => inc!(add_address), ReachOutAtIpv4 | ReachOutAtIpv6 => inc!(reach_out), RemoveAddress => inc!(remove_address), + ResetStreamAt => inc!(reset_stream_at), }; } } -impl std::fmt::Debug for FrameStats { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - acks, - path_acks, - ack_frequency, - crypto, - connection_close, - data_blocked, - datagram, - handshake_done, - immediate_ack, - max_data, - max_stream_data, - max_streams_bidi, - max_streams_uni, - new_connection_id, - path_new_connection_id, - new_token, - path_challenge, - path_response, - ping, - reset_stream, - retire_connection_id, - path_retire_connection_id, - stream_data_blocked, - streams_blocked_bidi, - streams_blocked_uni, - stop_sending, - stream, - observed_addr, - path_abandon, - path_status_available, - path_status_backup, - max_path_id, - paths_blocked, - path_cids_blocked, - add_address, - reach_out, - remove_address, - } = self; - f.debug_struct("FrameStats") - .field("ACK", acks) - .field("ACK_FREQUENCY", ack_frequency) - .field("CONNECTION_CLOSE", connection_close) - .field("CRYPTO", crypto) - .field("DATA_BLOCKED", data_blocked) - .field("DATAGRAM", datagram) - .field("HANDSHAKE_DONE", handshake_done) - .field("IMMEDIATE_ACK", immediate_ack) - .field("MAX_DATA", max_data) - .field("MAX_PATH_ID", max_path_id) - .field("MAX_STREAM_DATA", max_stream_data) - .field("MAX_STREAMS_BIDI", max_streams_bidi) - .field("MAX_STREAMS_UNI", max_streams_uni) - .field("NEW_CONNECTION_ID", new_connection_id) - .field("NEW_TOKEN", new_token) - .field("PATHS_BLOCKED", paths_blocked) - .field("PATH_ABANDON", path_abandon) - .field("PATH_ACK", path_acks) - .field("PATH_STATUS_AVAILABLE", path_status_available) - .field("PATH_STATUS_BACKUP", path_status_backup) - .field("PATH_CHALLENGE", path_challenge) - .field("PATH_CIDS_BLOCKED", path_cids_blocked) - .field("PATH_NEW_CONNECTION_ID", path_new_connection_id) - .field("PATH_RESPONSE", path_response) - .field("PATH_RETIRE_CONNECTION_ID", path_retire_connection_id) - .field("PING", ping) - .field("RESET_STREAM", reset_stream) - .field("RETIRE_CONNECTION_ID", retire_connection_id) - .field("STREAM_DATA_BLOCKED", stream_data_blocked) - .field("STREAMS_BLOCKED_BIDI", streams_blocked_bidi) - .field("STREAMS_BLOCKED_UNI", streams_blocked_uni) - .field("STOP_SENDING", stop_sending) - .field("STREAM", stream) - .field("OBSERVED_ADDRESS", observed_addr) - .field("ADD_ADDRESS", add_address) - .field("REACH_OUT", reach_out) - .field("REMOVE_ADDRESS", remove_address) - .finish() - } -} - /// Statistics related to a transmission path. #[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] #[non_exhaustive] diff --git a/noq-proto/src/frame.rs b/noq-proto/src/frame.rs index b7fa6c020..f8a06e7b0 100644 --- a/noq-proto/src/frame.rs +++ b/noq-proto/src/frame.rs @@ -133,8 +133,8 @@ pub enum FrameType { ReachOutAtIpv6, #[assoc(to_u64 = 0x3d7f94)] RemoveAddress, - // #[assoc(to_u64 = 0x24)] - // ResetStreamAt, + #[assoc(to_u64 = 0x24)] + ResetStreamAt, } /// Encounter a frame ID that was not valid. @@ -219,6 +219,7 @@ pub(super) enum EncodableFrame<'a> { MaxStreamData(MaxStreamData), MaxStreams(MaxStreams), StreamsBlocked(StreamsBlocked), + ResetStreamAt(ResetStreamAt), } impl<'a> EncodableFrame<'a> { @@ -253,7 +254,8 @@ impl<'a> EncodableFrame<'a> { | EncodableFrame::MaxData(_) | EncodableFrame::MaxStreamData(_) | EncodableFrame::MaxStreams(_) - | EncodableFrame::StreamsBlocked(_) => true, + | EncodableFrame::StreamsBlocked(_) + | EncodableFrame::ResetStreamAt(_) => true, } } } @@ -465,6 +467,7 @@ pub(crate) enum Frame { AddAddress(AddAddress), ReachOut(ReachOut), RemoveAddress(RemoveAddress), + ResetStreamAt(ResetStreamAt), } impl Frame { @@ -516,6 +519,7 @@ impl Frame { AddAddress(frame) => frame.get_type(), ReachOut(frame) => frame.get_type(), RemoveAddress(_) => self::RemoveAddress::TYPE, + ResetStreamAt(_) => FrameType::ResetStreamAt, } } @@ -1709,6 +1713,18 @@ impl Iter { self.take_remaining() }, }), + FrameType::ResetStreamAt => { + let frame = ResetStreamAt { + id: self.bytes.get()?, + error_code: self.bytes.get()?, + final_offset: self.bytes.get()?, + reliable_size: self.bytes.get()?, + }; + if frame.reliable_size > frame.final_offset { + return Err(IterErr::Malformed); + } + Frame::ResetStreamAt(frame) + } }) } @@ -2494,6 +2510,41 @@ impl Encodable for RemoveAddress { } } +/// RESET_STREAM_AT frame. +/// +/// +// #[allow(unreachable_pub)] // fuzzing only +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] +#[cfg_attr(test, derive(Arbitrary))] +#[derive(Debug, Copy, Clone, derive_more::Display)] +#[display("RESET_STREAM id: {id}")] +pub(crate) struct ResetStreamAt { + pub(crate) id: StreamId, + pub(crate) error_code: VarInt, + pub(crate) final_offset: VarInt, + pub(crate) reliable_size: VarInt, +} + +impl ResetStreamAt { + pub(crate) const fn get_type(&self) -> FrameType { + FrameType::ResetStreamAt + } +} + +impl FrameStruct for ResetStreamAt { + const SIZE_BOUND: usize = 1 + 8 + 8 + 8 + 8; +} + +impl Encodable for ResetStreamAt { + fn encode(&self, out: &mut W) { + out.write(FrameType::ResetStream); // 1 byte + out.write(self.id); // <= 8 bytes + out.write(self.error_code); // <= 8 bytes + out.write(self.final_offset); // <= 8 bytes + out.write(self.reliable_size); // <= 8 bytes + } +} + /// Helper struct for display implementations. // NOTE: Due to lifetimes in fmt::Arguments it's not possible to make this a simple function that // avoids allocations. diff --git a/noq-proto/src/tests/encode_decode.rs b/noq-proto/src/tests/encode_decode.rs index 69aa3e6ff..f822be908 100644 --- a/noq-proto/src/tests/encode_decode.rs +++ b/noq-proto/src/tests/encode_decode.rs @@ -58,6 +58,7 @@ fn encode_frame(frame: &Frame, buf: &mut B) { Frame::AddAddress(aa) => aa.encode(buf), Frame::ReachOut(ro) => ro.encode(buf), Frame::RemoveAddress(ra) => ra.encode(buf), + Frame::ResetStreamAt(f) => f.encode(buf), } } From c39c57eb8ad5a9a88110c16ea4d6816f944b30a3 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 10 Jun 2026 21:33:42 +0200 Subject: [PATCH 3/4] impl retransmit of RESET_STREAM_AT --- noq-proto/src/connection/spaces.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/noq-proto/src/connection/spaces.rs b/noq-proto/src/connection/spaces.rs index d6b901f1c..4803df9c1 100644 --- a/noq-proto/src/connection/spaces.rs +++ b/noq-proto/src/connection/spaces.rs @@ -558,6 +558,7 @@ pub struct Retransmits { pub(super) remove_address: BTreeSet, /// Round and local addresses to advertise in `REACH_OUT` frames pub(super) reach_out: PendingReachOutFrames, + pub(super) reset_stream_at: Option, } impl Retransmits { @@ -584,6 +585,7 @@ impl Retransmits { add_address, remove_address, reach_out, + reset_stream_at, } = &self; !max_data && !max_stream_id.iter().any(|x| *x) @@ -608,6 +610,7 @@ impl Retransmits { && add_address.is_empty() && remove_address.is_empty() && reach_out.is_empty() + && reset_stream_at.is_none() } } @@ -635,6 +638,7 @@ impl ::std::ops::BitOrAssign for Retransmits { add_address, remove_address, mut reach_out, + reset_stream_at, } = rhs; // We reduce in-stream head-of-line blocking by queueing retransmits before other data for @@ -664,6 +668,12 @@ impl ::std::ops::BitOrAssign for Retransmits { self.add_address.extend(add_address.iter().copied()); self.remove_address.extend(remove_address.iter().copied()); self.reach_out.append(&mut reach_out); + self.reset_stream_at = match (self.reset_stream_at, reset_stream_at) { + (None, None) => None, + (None, Some(v)) => Some(v), + (Some(v), None) => Some(v), + (Some(l), Some(r)) => Some(l.min(r)), + }; } } From a4a97fdebe9e21dfd358e222440417d7da674091 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 11 Jun 2026 17:16:42 +0200 Subject: [PATCH 4/4] allow sending the RESET_STREAM_AT --- noq-proto/src/connection/mod.rs | 12 ++++--- noq-proto/src/connection/send_buffer.rs | 38 ++++++++++++++++++++- noq-proto/src/connection/spaces.rs | 12 +++---- noq-proto/src/connection/streams/mod.rs | 42 ++++++++++++++++++++++++ noq-proto/src/connection/streams/send.rs | 25 ++++++++++++++ noq-proto/src/lib.rs | 4 +-- noq/src/send_stream.rs | 31 ++++++++++++++--- 7 files changed, 145 insertions(+), 19 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 1400ae0e5..44ae7d52e 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -98,8 +98,8 @@ pub use streams::StreamsState; #[cfg(not(fuzzing))] use streams::StreamsState; pub use streams::{ - Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream, - ShouldTransmit, StreamEvent, Streams, WriteError, + Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, ResetStreamAtError, + SendStream, ShouldTransmit, StreamEvent, Streams, WriteError, }; mod timer; @@ -7728,8 +7728,12 @@ impl SentFrames { StreamsBlocked(streams_blocked) => { self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true } - ResetStreamAt(reset_stream_at) => { - todo!(); + ResetStreamAt(frame) => { + self.retransmits_mut().reset_stream_at.push(( + frame.id, + frame.final_offset, + frame.error_code, + )); } } } diff --git a/noq-proto/src/connection/send_buffer.rs b/noq-proto/src/connection/send_buffer.rs index a09ddd198..c53a759de 100644 --- a/noq-proto/src/connection/send_buffer.rs +++ b/noq-proto/src/connection/send_buffer.rs @@ -94,6 +94,37 @@ impl SendBufferData { } } + /// Discard data from the end of the buffer. + /// + /// Calling this with offset outside of [`Self::range`] is essentially a no-op since + /// nothing needs to be truncated. + fn truncate(&mut self, offset: u64) { + if !self.range().contains(&offset) { + return; + } + + // clear truncated data + let mut n = self.offset; + for segment in self.segments.iter_mut() { + if (n + segment.len() as u64) < offset { + n += segment.len() as u64; + } else if n < offset { + segment.truncate((offset - n) as usize); + n += offset - n; + } else { + segment.clear(); + } + } + + // remove empty segments + self.segments.retain(|s| !s.is_empty()); + + // shrink segments if we have a lot of unused capacity + if self.segments.len() * 4 < self.segments.capacity() { + self.segments.shrink_to_fit(); + } + } + /// Discard data from the front of the buffer /// /// Calling this with n > len() is allowed and will simply clear the buffer. @@ -232,6 +263,11 @@ impl SendBuffer { self.retransmits.remove(0..self.fully_acked_offset()); } + pub(super) fn truncate(&mut self, offset: u64) { + self.data.truncate(offset); + self.retransmits.remove(offset..self.offset()); + } + /// Compute the next range to transmit on this stream and update state to account for that /// transmission. /// @@ -324,7 +360,7 @@ impl SendBuffer { } /// Offset up to which all data has been acknowledged - fn fully_acked_offset(&self) -> u64 { + pub(super) fn fully_acked_offset(&self) -> u64 { self.data.range().start } diff --git a/noq-proto/src/connection/spaces.rs b/noq-proto/src/connection/spaces.rs index 4803df9c1..573ace341 100644 --- a/noq-proto/src/connection/spaces.rs +++ b/noq-proto/src/connection/spaces.rs @@ -558,7 +558,8 @@ pub struct Retransmits { pub(super) remove_address: BTreeSet, /// Round and local addresses to advertise in `REACH_OUT` frames pub(super) reach_out: PendingReachOutFrames, - pub(super) reset_stream_at: Option, + /// Pending RESET_STREAM_AT frames: (StreamId, offset, error_code). + pub(super) reset_stream_at: Vec<(StreamId, VarInt, VarInt)>, } impl Retransmits { @@ -610,7 +611,7 @@ impl Retransmits { && add_address.is_empty() && remove_address.is_empty() && reach_out.is_empty() - && reset_stream_at.is_none() + && reset_stream_at.is_empty() } } @@ -668,12 +669,7 @@ impl ::std::ops::BitOrAssign for Retransmits { self.add_address.extend(add_address.iter().copied()); self.remove_address.extend(remove_address.iter().copied()); self.reach_out.append(&mut reach_out); - self.reset_stream_at = match (self.reset_stream_at, reset_stream_at) { - (None, None) => None, - (None, Some(v)) => Some(v), - (Some(v), None) => Some(v), - (Some(l), Some(r)) => Some(l.min(r)), - }; + self.reset_stream_at.extend_from_slice(&reset_stream_at); } } diff --git a/noq-proto/src/connection/streams/mod.rs b/noq-proto/src/connection/streams/mod.rs index 4a3f49d75..471cadde7 100644 --- a/noq-proto/src/connection/streams/mod.rs +++ b/noq-proto/src/connection/streams/mod.rs @@ -356,6 +356,30 @@ impl<'a> SendStream<'a> { Ok(()) } + /// Abandon transmitting data on a stream, deliver reliably up to `offset`. + pub fn reset_at( + &mut self, + offset: VarInt, + error_code: VarInt, + ) -> Result<(), ResetStreamAtError> { + let max_send_data = self.state.max_send_data(self.id); + let stream = self + .state + .send + .get_mut(&self.id) + .map(get_or_insert_send(max_send_data)) + .ok_or(ResetStreamAtError::ClosedStream)?; + + if matches!(stream.state, SendState::ResetSent) { + return Err(ResetStreamAtError::ClosedStream); + } + stream.reset_at(offset)?; + self.pending + .reset_stream_at + .push((self.id, offset, error_code)); + Ok(()) + } + /// Set the priority of a stream /// /// # Panics @@ -542,6 +566,24 @@ impl From for io::Error { } } +/// Errors for resetting a stream with partial delivery. +#[derive(Debug, Error, Clone, PartialEq, Eq)] +pub enum ResetStreamAtError { + /// The stream has already been stopped. + /// + /// The peer is no longer accepting data on this stream. + /// + /// Carries an application-defined error code. + #[error("stopped by peer: code {0}")] + Stopped(VarInt), + /// The stream has already been finished or reset. + #[error("closed stream")] + ClosedStream, + /// The reliable size is larger than the number of bytes sent on the stream. + #[error("invalid reliable size")] + InvalidReliableSize, +} + #[derive(Debug, Copy, Clone, Eq, PartialEq)] enum StreamHalf { Send, diff --git a/noq-proto/src/connection/streams/send.rs b/noq-proto/src/connection/streams/send.rs index cc30c2c8d..f99202a59 100644 --- a/noq-proto/src/connection/streams/send.rs +++ b/noq-proto/src/connection/streams/send.rs @@ -7,6 +7,8 @@ use crate::{ frame, }; +use super::ResetStreamAtError; + #[derive(Debug)] pub(super) struct Send { pub(super) max_data: u64, @@ -53,6 +55,29 @@ impl Send { } } + pub(super) fn reset_at(&mut self, offset: VarInt) -> Result<(), ResetStreamAtError> { + if let Some(error_code) = self.stop_reason { + return Err(ResetStreamAtError::Stopped(error_code)); + } + match self.state { + SendState::Ready | SendState::DataSent { .. } if offset.0 >= self.pending.offset() => { + Err(ResetStreamAtError::InvalidReliableSize) + } + SendState::Ready => { + self.state = SendState::DataSent { + finish_acked: false, + }; + self.pending.truncate(offset.0); + Ok(()) + } + SendState::DataSent { .. } => { + self.pending.truncate(offset.0); + Ok(()) + } + SendState::ResetSent => Err(ResetStreamAtError::ClosedStream), + } + } + pub(super) fn write<'a, S: BytesSource<'a>>( &mut self, source: &'a mut S, diff --git a/noq-proto/src/lib.rs b/noq-proto/src/lib.rs index 85bfdcf9e..7d6a181bc 100644 --- a/noq-proto/src/lib.rs +++ b/noq-proto/src/lib.rs @@ -45,8 +45,8 @@ pub use crate::connection::{ Chunk, Chunks, ClosePathError, ClosedPath, ClosedStream, Connection, ConnectionError, ConnectionStats, Datagrams, Event, FinishError, FrameStats, MultipathNotNegotiated, NetworkChangeHint, PathAbandonReason, PathError, PathEvent, PathId, PathStats, PathStatus, - ReadError, ReadableError, RecvStream, RttEstimator, SendDatagramError, SendStream, - SetPathStatusError, ShouldTransmit, StreamEvent, Streams, UdpStats, WriteError, + ReadError, ReadableError, RecvStream, ResetStreamAtError, RttEstimator, SendDatagramError, + SendStream, SetPathStatusError, ShouldTransmit, StreamEvent, Streams, UdpStats, WriteError, }; #[cfg(test)] use test_strategy::Arbitrary; diff --git a/noq/src/send_stream.rs b/noq/src/send_stream.rs index a46771a01..534e32d85 100644 --- a/noq/src/send_stream.rs +++ b/noq/src/send_stream.rs @@ -7,7 +7,7 @@ use std::{ use bytes::Bytes; use pin_project_lite::pin_project; -use proto::{ClosedStream, ConnectionError, FinishError, StreamId}; +use proto::{ClosedStream, ConnectionError, FinishError, ResetStreamAtError, StreamId}; use thiserror::Error; use tokio::sync::futures::OwnedNotified; @@ -79,7 +79,7 @@ impl SendStream { Ok(()) } - /// Writes [`Bytes`] from a slice of buffers into this stream, returning how many bytes were. + /// Writes [`Bytes`] from a slice of buffers into this stream, returning how many bytes were. /// written /// /// Bytes to try to write are provided to this method as an array of cheaply cloneable chunks. @@ -103,7 +103,7 @@ impl SendStream { poll_fn(|cx| self.execute_poll(cx, |s| s.write_chunks(bufs))).await } - /// Writes a single [`Bytes`] into this stream in its entirety. + /// Writes a single [`Bytes`] into this stream in its entirety. /// /// Bytes to write are provided to this method as a single cheaply cloneable chunk. This /// method repeatedly calls [`write_many_chunks`](Self::write_many_chunks) until all bytes @@ -117,7 +117,7 @@ impl SendStream { self.write_all_chunks(&mut [buf]).await } - /// Writes a slice of [`Bytes`] into this stream in its entirety. + /// Writes a slice of [`Bytes`] into this stream in its entirety. /// /// Bytes to write are provided to this method as an array of cheaply cloneable chunks. This /// method repeatedly calls [`write_many_chunks`](Self::write_many_chunks) until all bytes are @@ -219,6 +219,29 @@ impl SendStream { Ok(()) } + /// Close the send stream immediately, delivering data up to `offset`. + /// + /// No new data can be written after calling this method. Data up to `offset` is still + /// reliably delivered, but any remaining stream data may never be transmitted or lost. + /// + /// Fails if [`Self::finish`], [`Self::reset`] or [`Self::reset_at`] was previously + /// called, of if the remote stopped the stream. + pub fn reset_at( + &mut self, + offset: VarInt, + error_code: VarInt, + ) -> Result<(), ResetStreamAtError> { + let mut conn = self.conn.lock_and_wake("SendStream::reset_at"); + if self.is_0rtt && conn.check_0rtt().is_err() { + conn.skip_waking(); + return Ok(()); + } + conn.inner + .send_stream(self.stream) + .reset_at(offset, error_code)?; + Ok(()) + } + /// Set the priority of the send stream /// /// Every send stream has an initial priority of 0. Locally buffered data from streams with