Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions noq-proto/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ pub use transport::{AckFrequencyConfig, IdleTimeout, MtuDiscoveryConfig, Transpo
/// Global configuration for the endpoint, affecting all connections
///
/// Default values should be suitable for most internet applications.
#[derive(Clone)]
#[derive(Clone, derive_more::Debug)]
pub struct EndpointConfig {
#[debug("HmacKey")]
pub(crate) reset_key: Arc<dyn HmacKey>,
pub(crate) max_udp_payload_size: VarInt,
/// CID generator factory
///
/// Create a cid generator for local cid in Endpoint struct
#[debug("ConnectionIdGenerator")]
pub(crate) connection_id_generator_factory:
Arc<dyn Fn() -> Box<dyn ConnectionIdGenerator> + Send + Sync>,
pub(crate) supported_versions: Vec<u32>,
Expand All @@ -51,6 +53,7 @@ pub struct EndpointConfig {
pub(crate) min_reset_interval: Duration,
/// Optional seed to be used internally for random number generation
pub(crate) rng_seed: Option<[u8; 32]>,
pub(crate) reset_stream_at: bool,
}

impl EndpointConfig {
Expand All @@ -66,6 +69,7 @@ impl EndpointConfig {
grease_quic_bit: true,
min_reset_interval: Duration::from_millis(20),
rng_seed: None,
reset_stream_at: true,
}
}

Expand Down Expand Up @@ -162,18 +166,13 @@ impl EndpointConfig {
self.rng_seed = seed;
self
}
}

impl fmt::Debug for EndpointConfig {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("EndpointConfig")
// reset_key not debug
.field("max_udp_payload_size", &self.max_udp_payload_size)
// cid_generator_factory not debug
.field("supported_versions", &self.supported_versions)
.field("grease_quic_bit", &self.grease_quic_bit)
.field("rng_seed", &self.rng_seed)
.finish_non_exhaustive()
/// Enables the QUIC Stream Resets with Partial Delivery draft-07 extentsion.
///
/// <https://datatracker.ietf.org/doc/html/draft-ietf-quic-reliable-stream-reset>
pub fn reliable_stream_reset(&mut self, value: bool) -> &mut Self {
self.reset_stream_at = value;
self
}
}

Expand Down
14 changes: 12 additions & 2 deletions noq-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
ShouldTransmit, StreamEvent, Streams, WriteError,
Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, ResetStreamAtError,
SendStream, ShouldTransmit, StreamEvent, Streams, WriteError,
};

mod timer;
Expand Down Expand Up @@ -5099,6 +5099,9 @@ impl Connection {
self.spaces[SpaceId::Data].pending.max_data = true;
}
}
Frame::ResetStreamAt(frame) => {
todo!();
}
Frame::DataBlocked(DataBlocked(offset)) => {
debug!(offset, "peer claims to be blocked at connection level");
}
Expand Down Expand Up @@ -7725,6 +7728,13 @@ impl SentFrames {
StreamsBlocked(streams_blocked) => {
self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true
}
ResetStreamAt(frame) => {
self.retransmits_mut().reset_stream_at.push((
frame.id,
frame.final_offset,
frame.error_code,
));
}
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions noq-proto/src/connection/qlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,17 @@ impl ToQlog for frame::StreamMetaEncoder {
}
}

#[cfg(feature = "qlog")]
impl ToQlog for frame::ResetStreamAt {
fn to_qlog(&self) -> QuicFrame {
// TODO: Teach qlog about this frame type.
QuicFrame::Unknown {
frame_type_bytes: Some(self.get_type().to_u64()),
raw: None,
}
}
}

#[cfg(feature = "qlog")]
impl Frame {
/// Converts a [`crate::Frame`] into a [`QuicFrame`].
Expand Down Expand Up @@ -1077,6 +1088,7 @@ impl Frame {
Self::AddAddress(f) => f.to_qlog(),
Self::ReachOut(f) => f.to_qlog(),
Self::RemoveAddress(f) => f.to_qlog(),
Self::ResetStreamAt(f) => f.to_qlog(),
}
}
}
Expand Down
38 changes: 37 additions & 1 deletion noq-proto/src/connection/send_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,37 @@ impl SendBufferData {
}
}

/// Discard data from the end of the buffer.
///
/// Calling this with offset outside of [`Self::range`] is essentially a no-op since
/// nothing needs to be truncated.
fn truncate(&mut self, offset: u64) {
if !self.range().contains(&offset) {
return;
}

// clear truncated data
let mut n = self.offset;
for segment in self.segments.iter_mut() {
if (n + segment.len() as u64) < offset {
n += segment.len() as u64;
} else if n < offset {
segment.truncate((offset - n) as usize);
n += offset - n;
} else {
segment.clear();
}
}

// remove empty segments
self.segments.retain(|s| !s.is_empty());

// shrink segments if we have a lot of unused capacity
if self.segments.len() * 4 < self.segments.capacity() {
self.segments.shrink_to_fit();
}
}

/// Discard data from the front of the buffer
///
/// Calling this with n > len() is allowed and will simply clear the buffer.
Expand Down Expand Up @@ -232,6 +263,11 @@ impl SendBuffer {
self.retransmits.remove(0..self.fully_acked_offset());
}

pub(super) fn truncate(&mut self, offset: u64) {
self.data.truncate(offset);
self.retransmits.remove(offset..self.offset());
}

/// Compute the next range to transmit on this stream and update state to account for that
/// transmission.
///
Expand Down Expand Up @@ -324,7 +360,7 @@ impl SendBuffer {
}

/// Offset up to which all data has been acknowledged
fn fully_acked_offset(&self) -> u64 {
pub(super) fn fully_acked_offset(&self) -> u64 {
self.data.range().start
}

Expand Down
6 changes: 6 additions & 0 deletions noq-proto/src/connection/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,8 @@ pub struct Retransmits {
pub(super) remove_address: BTreeSet<RemoveAddress>,
/// Round and local addresses to advertise in `REACH_OUT` frames
pub(super) reach_out: PendingReachOutFrames,
/// Pending RESET_STREAM_AT frames: (StreamId, offset, error_code).
pub(super) reset_stream_at: Vec<(StreamId, VarInt, VarInt)>,
}

impl Retransmits {
Expand All @@ -584,6 +586,7 @@ impl Retransmits {
add_address,
remove_address,
reach_out,
reset_stream_at,
} = &self;
!max_data
&& !max_stream_id.iter().any(|x| *x)
Expand All @@ -608,6 +611,7 @@ impl Retransmits {
&& add_address.is_empty()
&& remove_address.is_empty()
&& reach_out.is_empty()
&& reset_stream_at.is_empty()
}
}

Expand Down Expand Up @@ -635,6 +639,7 @@ impl ::std::ops::BitOrAssign for Retransmits {
add_address,
remove_address,
mut reach_out,
reset_stream_at,
} = rhs;

// We reduce in-stream head-of-line blocking by queueing retransmits before other data for
Expand Down Expand Up @@ -664,6 +669,7 @@ impl ::std::ops::BitOrAssign for Retransmits {
self.add_address.extend(add_address.iter().copied());
self.remove_address.extend(remove_address.iter().copied());
self.reach_out.append(&mut reach_out);
self.reset_stream_at.extend_from_slice(&reset_stream_at);
}
}

Expand Down
Loading
Loading