From cf6bd877d573341d3048542ce6e36bbb2a121d6a Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 22 Sep 2025 09:18:04 -0400 Subject: [PATCH] wip --- Makefile | 1 + docs/.vitepress/config.mts | 115 +++-- docs/reference/adr/_template.md | 32 ++ docs/reference/adr/index.md | 22 + docs/reference/adr/records.data.mjs | 20 + docs/reference/adr/records/001-bufferview.md | 54 +++ .../src/sources/dogstatsd/framer.rs | 23 +- .../src/sources/dogstatsd/mod.rs | 23 +- lib/saluki-io/src/buf/mod.rs | 11 +- lib/saluki-io/src/deser/framing/iter.rs | 429 ++++++++++++++++++ .../src/deser/framing/length_delimited.rs | 172 ++++--- lib/saluki-io/src/deser/framing/mod.rs | 267 +---------- lib/saluki-io/src/deser/framing/newline.rs | 93 ++-- 13 files changed, 805 insertions(+), 457 deletions(-) create mode 100644 docs/reference/adr/_template.md create mode 100644 docs/reference/adr/index.md create mode 100644 docs/reference/adr/records.data.mjs create mode 100644 docs/reference/adr/records/001-bufferview.md create mode 100644 lib/saluki-io/src/deser/framing/iter.rs diff --git a/Makefile b/Makefile index 1dffe5ae72..c990b9fdbb 100644 --- a/Makefile +++ b/Makefile @@ -508,6 +508,7 @@ test-miri: check-rust-build-tools ensure-rust-miri test-miri: ## Runs all Miri-specific unit tests @echo "[*] Running Miri-specific unit tests..." cargo +nightly-2025-06-16 miri test -p stringtheory + cargo +nightly-2025-06-16 miri test -p saluki-io deser::framing .PHONY: test-loom test-loom: check-rust-build-tools diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index d5dd9fbcd9..8963702f5c 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -1,67 +1,66 @@ -import { defineConfig } from 'vitepress' +import { defineConfig } from "vitepress"; export default defineConfig({ - title: "Saluki", - description: "A toolkit for building telemetry data planes in Rust.", - base: '/saluki/', - cleanUrls: true, - lastUpdated: true, - themeConfig: { - search: { - provider: 'local' - }, + title: "Saluki", + description: "A toolkit for building telemetry data planes in Rust.", + base: "/saluki/", + cleanUrls: true, + lastUpdated: true, + themeConfig: { + search: { + provider: "local", + }, - footer: { - copyright: 'Copyright © 2024-Present Datadog, Inc.', - }, + footer: { + copyright: "Copyright © 2024-Present Datadog, Inc.", + }, - editLink: { - pattern: 'https://github.com/DataDog/saluki/edit/main/docs/:path', - text: 'Edit this page on GitHub' - }, - - lastUpdated: { - text: 'Updated at', - formatOptions: { - dateStyle: 'full', - timeStyle: 'medium' - } - }, + editLink: { + pattern: "https://github.com/DataDog/saluki/edit/main/docs/:path", + text: "Edit this page on GitHub", + }, + + lastUpdated: { + text: "Updated at", + formatOptions: { + dateStyle: "full", + timeStyle: "medium", + }, + }, + + nav: [ + { text: "Home", link: "/" }, + { text: "Developer Guide", link: "/development" }, + ], - nav: [ - { text: 'Home', link: '/' }, - { text: 'Developer Guide', link: '/development' } - ], + sidebar: [ + { + text: "Developer Guide", + items: [ + { text: "Common Language", link: "/development/common-language" }, + { text: "Contributing", link: "/development/contributing" }, + { text: "Style Guide", link: "/development/style-guide" }, + ], + }, + { + text: "Reference Docs", + items: [ + { text: "Architecture", link: "/reference/architecture" }, + { text: "Architectural Decision Records", link: "/reference/adr" }, + ], + }, + { + text: "Agent Data Plane", + items: [{ text: "Releasing", link: "/agent-data-plane/releasing" }], + }, + ], - sidebar: [ - { - text: 'Developer Guide', - items: [ - { text: 'Common Language', link: '/development/common-language' }, - { text: 'Contributing', link: '/development/contributing' }, - { text: 'Style Guide', link: '/development/style-guide' } - ] - }, - { - text: 'Reference Docs', - items: [ - { text: 'Architecture', link: '/reference/architecture' } - ] - }, - { - text: 'Agent Data Plane', - items: [ - { text: 'Releasing', link: '/agent-data-plane/releasing' } - ] - } - ], + outline: { + level: [2, 3], + }, - outline: { - level: [2, 3] + socialLinks: [{ icon: "github", link: "https://github.com/DataDog/saluki" }], }, - socialLinks: [ - { icon: 'github', link: 'https://github.com/DataDog/saluki' } - ] - } -}) + srcExclude: ["docs/reference/adr/_template.md"], +}); diff --git a/docs/reference/adr/_template.md b/docs/reference/adr/_template.md new file mode 100644 index 0000000000..2b23951e99 --- /dev/null +++ b/docs/reference/adr/_template.md @@ -0,0 +1,32 @@ +--- +date: 2025-11-04 +title: ADR 000 - Short title, representative of solved problem and found solution +--- + +# ADR 000 - Short title, representative of solved problem and found solution + +## Problem Statement + +Begin with the problem statement, e.g., in free form using two to three sentences or in the form of an illustrative story. You may want to articulate the problem in form of a question and add links to collaboration boards or issue management systems. + +## Context + +Provide context on problem, including why the current design is not sufficient for meeting requirements, or what has changed that necessitates a new approach. + +## Considered Options + +* title of option 1 +* title of option 2 +* title of option 3 +* ... + +## Decision Outcome + +Chosen option: "(option here)", because [justification. e.g., only option which meets all requirements, etc]. + + +### Consequences + +* Good, because [positive consequence, e.g., improvement of one or more desired qualities, …] +* Bad, because [negative consequence, e.g., compromising one or more desired qualities, …] +* ... diff --git a/docs/reference/adr/index.md b/docs/reference/adr/index.md new file mode 100644 index 0000000000..ff4d90f802 --- /dev/null +++ b/docs/reference/adr/index.md @@ -0,0 +1,22 @@ + + +# Architectural Decision Records (ADRs) + +An **Architectural Decision (AD)** is a justified software design choice that addresses a functional or non-functional requirement of architectural significance. This decision is documented in an **Architectural Decision Record (ADR)**, which details a single AD and its underlying rationale. This section contains a list of all ADRs in Saluki. + +## Adding new ADRs + +Adding a new ADR is a straightforward process: + +1. Create a new file in the `docs/reference/adr/records` directory with a name following the pattern `number-title.md`. The `number` should be the next highest number in the sequence based on the existing ADRs. `title` should be a very short description of the decision, such as related acronyms or keywords related to the decision. +2. Write the ADR content in Markdown format, following the template provided in the `docs/reference/adr/_template.md` file. + +## List of ADRs + + diff --git a/docs/reference/adr/records.data.mjs b/docs/reference/adr/records.data.mjs new file mode 100644 index 0000000000..66740ffd75 --- /dev/null +++ b/docs/reference/adr/records.data.mjs @@ -0,0 +1,20 @@ +import { createContentLoader, createMarkdownRenderer } from "vitepress"; +import path from "node:path"; + +const config = globalThis.VITEPRESS_CONFIG; +const md = await createMarkdownRenderer(config.srcDir, config.markdown, config.site.base, config.logger); + +export default createContentLoader("./reference/adr/records/*.md", { + transform(rawData) { + // For each record, generate the full anchor element, including rendering the title as Markdown, so that + // we can get the right base URL and formatting all of that. + rawData.forEach((record) => { + record.pretty_title = md.render(record.frontmatter.title); + record.url = path.join(config.site.base, record.url); + }); + + return rawData.sort((a, b) => { + return +new Date(b.frontmatter.date) - +new Date(a.frontmatter.date); + }); + }, +}); diff --git a/docs/reference/adr/records/001-bufferview.md b/docs/reference/adr/records/001-bufferview.md new file mode 100644 index 0000000000..651440bca3 --- /dev/null +++ b/docs/reference/adr/records/001-bufferview.md @@ -0,0 +1,54 @@ +--- +date: 2025-11-04 +title: ADR 001 - Switching from `Bytes` to `BufferView<'a>` for improved memory efficiency in payload deserialization. +--- + +# ADR 001 - Switching from `Bytes` to `BufferView<'a>` for improved memory efficiency in payload deserialization. + +## Context and Problem Statement + +When deserializing payloads, such as DogStatsD, framers must allocate to hold the resulting frames that they extract. This is not memory efficient, as it means we always pay a penalty by allocating, even when a borrowed byte slice could otherwise be used. We want to be able to avoid allocating for every single frame that we extract from a payload, both for the obvious benefits of reduced memory usage and fragmentation, but also for the potential latency improvements by not allocating in the critical path of receiving data. + +## Context + +Saluki's overarching design goal is to provide efficient and deterministic memory usage. In order to achieve this, pre-allocated I/O buffers are preferred for accepting data over the network. We do this with a custom type, `BytesBuffer`, and some object pooling facilities to support reuse of buffers in an ergonomic way. On top of that, we utilize traits from the [`bytes`][bytes] crate, namely [`Buf`][bytes_Buf], to make it easy to work with these buffers. These traits provide ergonomic support for using buffers as cursors: read a portion of the buffer, and then "advance" the cursor, so that the same data cannot be read again. + +When framers extract a valid frame, we want to update the source buffer to mark those bytes as consumed. As mentioned above, we use [`Buf`][bytes_Buf] to achieve this, which works well as it stands: the act of carving out a certain chunk of bytes to create our "frame" also advances the source buffer. With the extracted frame, we can then use additional methods from [`Buf`][bytes_Buf] to constrain the frame. This is important because it allows us to remove the delimiter itself, which callers don't care about. This ensures we can both satisfy the need to consume the entire frame, while advancing the source buffer, and also return a sanitized frame that is ready to be used by callers. + +The main problem with this approach is that [`bytes`][bytes] has its own concrete implementations -- [`Bytes`][bytes_Bytes] and [`BytesMut`][bytes_BytesMut], which are not amenable to the type of object pooling we do. In order to ensure that our fixed number of I/O buffers can be reused, we need control over when _usages_ of those buffers is handed back to the caller that retrieved them from the pool in the first place. [`bytes`][bytes] follows an approach where a mutable buffer ([`BytesMut`][bytes_BytesMut]) is used for collecting data (such as the buffer being read into from a network socket read), and then is "frozen" to create a [`Bytes`][bytes_Bytes] instance, which is then able to be cheaply cloned and sliced by virtue of using atomic reference counting under the hood. We can only recover the original [`BytesMut`][bytes_BytesMut] by fallibly trying to consume a [`Bytes`][bytes_Bytes] instance, which may fail if other copies still exist. This means we cannot provide a misuse-resistant object pooling mechanism that ensures we only ever have a maximum of N buffers at any given time, which is ultimately why we created `BytesBuffer`. + +The crux of the problem that this ADR aims to solve is that, when using a custom implementation of [`Buf`][bytes_Buf], none of the optimizations in [`bytes`][bytes] are available to us. This means that the slicing/advancing operations that would otherwise be cheap atomic counter operations now become allocations for intermediate buffers and so on. We want to be able to use our custom buffer type (`BytesBuffer`) in a way that provides the ergonomics and safety of interacting with [`Bytes`][bytes_Bytes] through its [`Buf`][bytes_Buf] implementation, while also still supporting the ability to pool the buffers. + +## Considered Options + +We considered two possible approaches to improve the situation: + +- use borrowed byte slices directly (e.g. pass around `&[u8]`) +- create our own [`Bytes`][bytes_Bytes]-style abstraction to manage borrowed byte slices (`BufferView<'a>`) + +### Use borrowed byte slices directly + +Using borrowed byte slices meets our constraint of avoiding additional allocations for intermediate frames, as framers would simply be returning subslices of the input buffer they were given. However, borrowed byte slices are not ergonomic to work with in terms of ensuring they're advanced. + +In order to accept a byte slice as the input and be able to manipulate it such that we could ensure it was "advanced" past an extracted frame, a mutable borrow would be required: we have to be able to modify the original borrow passed in to the framer. However, since we would also need to take an immutable borrow to capture our "frame", this turns into a classic mutable/immutable borrowing violation. + +### Create our own `Bytes`-style abstraction to manage borrowed byte slices (`BufferView<'a>`) + +This option is an incremental addition on top of borrowed byte slices, which uses immutable borrows and index offsets to provide a "view" over a byte slice, generating the subslice on demand by holding the start and end indices of the slice. Concretely, this maintains the benefit of avoiding intermediate allocations while providing the necessary ergonomics to slice up a source buffer during the course of extracting frames. + +We would create a new type, `BufferView<'a>`, that holds an immutable borrow to an underlying byte slice -- the complete frame, in this case -- and then generates the "view" representing the cleaned up frame by tracking index offsets -- how far from the start, how far from the end -- which can then be used to exclude frame delimiters. Since `BufferView<'a>` holds a borrow of the full frame slice, it can also be used to determine the amount by which the input buffer needs to be advanced. This is the core part of the solution: we maintain immutable borrows while extracting frames, and carry enough information in `BufferView<'a>` to handle the advancing outside of the framer. + +This means that in order to meet our goal of ensuring the the source buffer is properly advanced, we need to handle that outside of the framer, which we've abstracted away in a new `Framed` type: this type provides an iterator-like interface over a source buffer and framer implementation, handling advancing the source buffer as frames are extracted. Finally, an additional helper is required to ensure that input buffers given to a framer are consumed from left to right: while we can theoretically represent any byte slice with `BufferView<'a>`, we don't want to mistakenly allow a frame to come from the middle of a buffer, as we can't "advance" in that way. This helper, `RawBuffer`, is a wrapper around a byte slice that provides the only methods for creating a `BufferView<'a>`. These methods ensure that views are created from the start of the buffer. + +With these helpers, we can now ensure that framers are only able to extract frames from the start of a buffer, that the frame they extract carries the entire frame length necessary for properly advancing the source buffer, and that the logic for advancing the source buffer exists in a single place that can be well tested. + +## Decision Outcome + +We will implement the "Create our own `Bytes`-style abstraction to manage borrowed byte slices (`BufferView<'a>`)" option. + +This option allows to meet our two main goals: avoiding intermediate allocations as well as providing the necessary ergonomics to ensure source buffers are automatically advanced/consumed as frames are extracted. It achieves this with otherwise straightforward helper types, although one main downside is that it requires a level of upfront documentation to explain the invariants, which are not able to be entirely upheld purely through the type system. + +[bytes]: https://docs.rs/bytes/latest/bytes/ +[bytes_Buf]: https://docs.rs/bytes/latest/bytes/buf/trait.Buf.html +[bytes_Bytes]: https://docs.rs/bytes/latest/bytes/struct.Bytes.html +[bytes_BytesMut]: https://docs.rs/bytes/latest/bytes/struct.BytesMut.html diff --git a/lib/saluki-components/src/sources/dogstatsd/framer.rs b/lib/saluki-components/src/sources/dogstatsd/framer.rs index a26e45b4a6..c061045d59 100644 --- a/lib/saluki-components/src/sources/dogstatsd/framer.rs +++ b/lib/saluki-components/src/sources/dogstatsd/framer.rs @@ -1,33 +1,36 @@ -use bytes::Bytes; use saluki_io::{ buf::ReadIoBuffer, - deser::framing::{Framer, FramingError, LengthDelimitedFramer, NestedFramer, NewlineFramer}, + deser::framing::{Framed, LengthDelimitedFramer, NewlineFramer}, net::ListenAddress, }; pub enum DsdFramer { NonStream(NewlineFramer), - Stream(NestedFramer), + Stream(LengthDelimitedFramer, NewlineFramer), } -impl Framer for DsdFramer { - fn next_frame(&mut self, buf: &mut B, is_eof: bool) -> Result, FramingError> { +impl DsdFramer { + pub fn framed<'buf, B>(&self, buf: &'buf mut B, is_eof: bool) -> Framed<'buf, '_, B> + where + B: ReadIoBuffer, + { match self { - Self::NonStream(framer) => framer.next_frame(buf, is_eof), - Self::Stream(framer) => framer.next_frame(buf, is_eof), + Self::NonStream(framer) => Framed::direct(framer, buf, is_eof), + Self::Stream(outer, inner) => Framed::nested(outer, inner, buf, is_eof), } } } -pub fn get_framer(listen_address: &ListenAddress) -> DsdFramer { +pub fn get_framer(listen_address: &ListenAddress, max_frame_size: usize) -> DsdFramer { + let length_delimited_framer = LengthDelimitedFramer::default().with_max_frame_size(max_frame_size); let newline_framer = NewlineFramer::default().required_on_eof(false); match listen_address { - ListenAddress::Tcp(_) => DsdFramer::Stream(NestedFramer::new(newline_framer, LengthDelimitedFramer)), + ListenAddress::Tcp(_) => DsdFramer::Stream(length_delimited_framer, newline_framer), ListenAddress::Udp(_) => DsdFramer::NonStream(newline_framer), #[cfg(unix)] ListenAddress::Unixgram(_) => DsdFramer::NonStream(newline_framer), #[cfg(unix)] - ListenAddress::Unix(_) => DsdFramer::Stream(NestedFramer::new(newline_framer, LengthDelimitedFramer)), + ListenAddress::Unix(_) => DsdFramer::Stream(length_delimited_framer, newline_framer), } } diff --git a/lib/saluki-components/src/sources/dogstatsd/mod.rs b/lib/saluki-components/src/sources/dogstatsd/mod.rs index b3efd1b97d..244d5ac10a 100644 --- a/lib/saluki-components/src/sources/dogstatsd/mod.rs +++ b/lib/saluki-components/src/sources/dogstatsd/mod.rs @@ -33,7 +33,7 @@ use saluki_env::WorkloadProvider; use saluki_error::{generic_error, ErrorContext as _, GenericError}; use saluki_io::{ buf::{BytesBuffer, FixedSizeVec}, - deser::{codec::dogstatsd::*, framing::FramerExt as _}, + deser::codec::dogstatsd::*, net::{ listener::{Listener, ListenerError}, ConnectionAddress, ListenAddress, Stream, @@ -425,6 +425,8 @@ impl SourceBuilder for DogStatsDConfiguration { )); } + let io_buffer_size = get_adjusted_buffer_size(self.buffer_size); + let maybe_origin_tags_resolver = self .workload_provider .clone() @@ -448,8 +450,9 @@ impl SourceBuilder for DogStatsDConfiguration { Ok(Box::new(DogStatsD { listeners, io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || { - FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size)) + FixedSizeVec::with_capacity(io_buffer_size) }), + io_buffer_size, codec, context_resolvers, enabled_filter: enable_payloads_filter, @@ -494,6 +497,7 @@ impl MemoryBounds for DogStatsDConfiguration { pub struct DogStatsD { listeners: Vec, io_buffer_pool: FixedSizeObjectPool, + io_buffer_size: usize, codec: DogstatsdCodec, context_resolvers: ContextResolvers, enabled_filter: EnablePayloadsFilter, @@ -504,6 +508,7 @@ struct ListenerContext { shutdown_handle: DynamicShutdownHandle, listener: Listener, io_buffer_pool: FixedSizeObjectPool, + io_buffer_size: usize, codec: DogstatsdCodec, context_resolvers: ContextResolvers, additional_tags: Arc<[String]>, @@ -680,6 +685,7 @@ impl Source for DogStatsD { shutdown_handle: listener_shutdown_coordinator.register(), listener, io_buffer_pool: self.io_buffer_pool.clone(), + io_buffer_size: self.io_buffer_size, codec: self.codec.clone(), context_resolvers: self.context_resolvers.clone(), additional_tags: self.additional_tags.clone(), @@ -725,6 +731,7 @@ async fn process_listener( shutdown_handle, mut listener, io_buffer_pool, + io_buffer_size, codec, context_resolvers, additional_tags, @@ -748,7 +755,7 @@ async fn process_listener( let handler_context = HandlerContext { listen_addr: listen_addr.clone(), - framer: get_framer(&listen_addr), + framer: get_framer(&listen_addr, io_buffer_size), codec: codec.clone(), io_buffer_pool: io_buffer_pool.clone(), metrics: build_metrics(&listen_addr, source_context.component_context()), @@ -792,7 +799,7 @@ async fn drive_stream( ) { let HandlerContext { listen_addr, - mut framer, + framer, codec, io_buffer_pool, metrics, @@ -860,12 +867,12 @@ async fn drive_stream( bytes_read ); - let mut frames = io_buffer.framed(&mut framer, reached_eof); + let mut frames = framer.framed(&mut io_buffer, reached_eof); 'frame: loop { - match frames.next() { + match frames.next_frame() { Some(Ok(frame)) => { trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame."); - match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) { + match handle_frame(frame, &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) { Ok(Some(event)) => { if let Some(event_buffer) = event_buffer_manager.try_push(event) { debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events."); @@ -880,7 +887,7 @@ async fn drive_stream( continue }, Err(e) => { - let frame_lossy_str = String::from_utf8_lossy(&frame); + let frame_lossy_str = String::from_utf8_lossy(frame); warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame."); }, } diff --git a/lib/saluki-io/src/buf/mod.rs b/lib/saluki-io/src/buf/mod.rs index 771c6ecb5d..69d835b25e 100644 --- a/lib/saluki-io/src/buf/mod.rs +++ b/lib/saluki-io/src/buf/mod.rs @@ -6,7 +6,7 @@ mod vec; pub use self::vec::{BytesBuffer, FixedSizeVec}; /// An I/O buffer that can be read from. -pub trait ReadIoBuffer: Buf { +pub trait ReadIoBuffer: Buf + Send { fn capacity(&self) -> usize; } @@ -28,6 +28,15 @@ impl ReadIoBuffer for VecDeque { } } +impl ReadIoBuffer for &mut T +where + T: ReadIoBuffer, +{ + fn capacity(&self) -> usize { + (**self).capacity() + } +} + /// An I/O buffer that can be written to. pub trait WriteIoBuffer: BufMut {} diff --git a/lib/saluki-io/src/deser/framing/iter.rs b/lib/saluki-io/src/deser/framing/iter.rs new file mode 100644 index 0000000000..8aa33c03d4 --- /dev/null +++ b/lib/saluki-io/src/deser/framing/iter.rs @@ -0,0 +1,429 @@ +use std::{marker::PhantomData, mem::ManuallyDrop, ptr::NonNull}; + +use super::{Framer, FramingError}; +use crate::buf::ReadIoBuffer; + +struct DirectIter<'framer, 'buf> { + framer: &'framer dyn Framer, + buf: &'buf [u8], + is_eof: bool, +} + +impl<'framer, 'buf> DirectIter<'framer, 'buf> { + fn new(framer: &'framer dyn Framer, buf: &'buf [u8], is_eof: bool) -> Self { + Self { framer, buf, is_eof } + } + + fn next_frame(&mut self) -> Result, FramingError> { + // Our buffer is empty, so we're all done. + if self.buf.is_empty() { + return Ok(None); + } + + match self.framer.next_frame(&mut self.buf, self.is_eof)? { + Some(frame) => Ok(Some(frame)), + None => Ok(None), + } + } + + fn finish(self) -> usize { + self.buf.len() + } +} + +struct NestedIter<'framer, 'buf> { + outer_framer: &'framer dyn Framer, + inner_framer: &'framer dyn Framer, + buf: &'buf [u8], + outer_frame_buf: &'buf [u8], + is_eof: bool, +} + +impl<'framer, 'buf> NestedIter<'framer, 'buf> { + fn new( + outer_framer: &'framer dyn Framer, inner_framer: &'framer dyn Framer, buf: &'buf [u8], is_eof: bool, + ) -> Self { + Self { + outer_framer, + inner_framer, + buf, + outer_frame_buf: &[], + is_eof, + } + } + + fn next_frame(&mut self) -> Result, FramingError> { + loop { + // Check if our outer frame is empty, and try to extract the next outer frame if so. + if self.outer_frame_buf.is_empty() { + // If our root buffer is also empty, then we're done. + if self.buf.is_empty() { + return Ok(None); + } + + match self.outer_framer.next_frame(&mut self.buf, self.is_eof)? { + Some(frame) => { + self.outer_frame_buf = frame; + } + None => return Ok(None), + } + } + + // Try to extract an inner frame from the outer frame. + match self.inner_framer.next_frame(&mut self.outer_frame_buf, true)? { + Some(frame) => return Ok(Some(frame)), + None => continue, + } + } + } + + fn finish(self) -> usize { + self.buf.len() + } +} + +enum Iter<'framer, 'buf> { + Direct(DirectIter<'framer, 'buf>), + Nested(NestedIter<'framer, 'buf>), +} + +impl<'framer, 'buf> Iter<'framer, 'buf> { + fn next_frame(&mut self) -> Result, FramingError> { + match self { + Self::Direct(iter) => iter.next_frame(), + Self::Nested(iter) => iter.next_frame(), + } + } + + fn finish(self) -> usize { + match self { + Self::Direct(iter) => iter.finish(), + Self::Nested(iter) => iter.finish(), + } + } +} + +/// Core primitive for advancing the source buffer passed to `Framed` on drop. +/// +/// Overall, our goal is to hold a mutable reference to our source buffer -- so that we can advance it when we're all +/// done extracting any complete frames -- while also returning _immutable_ references to subslices of that same buffer, +/// representing the complete frames, to avoid the need to allocate or copy any data. This would already be trivially +/// doable with the usage of the `Buf` trait which `ReadIoBuffer` is a superset of, except for our other constraint: +/// nested framing. +/// +/// Nested framing requires holding a subslice of the buffer and then extracting frames from _that_ until exhausted. This +/// is difficult to do with just `Buf` because if we're tying the lifetimes of the frames we emit to the caller to the frame +/// iterator, then we can't prove that we have exclusive access to the buffer in order to advance it after an outer frame +/// has been exhausted. This is sort of a chicken-and-egg problem, and there are some potential solutions in the vein of +/// vector indexing -- track regions of the source buffer instead of holding slices directly -- but they end up being +/// pretty ugly, IMO. +/// +/// We've chosen to solve this by splitting things up into two phases: extraction and advancement. Extraction is everything +/// from the time we create `Framed` up until the time it's dropped: we utilize immutable borrows of the source buffer internally +/// to make writing the framers themselves, and the direct/nested wrappers, as easy as possible. Advancement occurs during drop, +/// where we switch back to a mutable borrow of the source buffer in order to advance it. This is where the unsafety lies. +/// +/// In order to hold on to the original mutable reference despite needing _immutable_ references for the framers, we +/// convert the mutable reference into a mutable pointer. Creating this pointer initially, and accessing it during drop, +/// is the entirety of the unsafety. This creates a risk of aliasing violations because of how easy it is to create a +/// mutable reference to the buffer while there are active immutable references. We take care to ensure there are no +/// outstanding immutable references to the buffer before recreating the mutable reference from the pointer during drop, +/// which would otherwise represent immediate UB. +struct FramedInner<'buf, 'framer, B> +where + B: ReadIoBuffer, +{ + buf_ptr: NonNull, + iter: ManuallyDrop>, + _buf: PhantomData<&'buf mut B>, +} + +impl<'buf, 'framer, B> FramedInner<'buf, 'framer, B> +where + B: ReadIoBuffer, +{ + fn direct(framer: &'framer dyn Framer, buf: &'buf mut B, is_eof: bool) -> Self { + Self { + buf_ptr: NonNull::new(buf as *mut B).expect("buffer reference cannot be invalid"), + iter: ManuallyDrop::new(Iter::Direct(DirectIter::new(framer, buf.chunk(), is_eof))), + _buf: PhantomData, + } + } + + fn nested( + outer_framer: &'framer dyn Framer, inner_framer: &'framer dyn Framer, buf: &'buf mut B, is_eof: bool, + ) -> Self { + Self { + buf_ptr: NonNull::new(buf as *mut B).expect("buffer reference cannot be invalid"), + iter: ManuallyDrop::new(Iter::Nested(NestedIter::new( + outer_framer, + inner_framer, + buf.chunk(), + is_eof, + ))), + _buf: PhantomData, + } + } +} + +impl<'buf, 'framer, B> Drop for FramedInner<'buf, 'framer, B> +where + B: ReadIoBuffer, +{ + fn drop(&mut self) { + // Consume the inner iterator to ensure we have no outstanding references to the underlying buffer, and in doing + // so, get the final buffer length so we know how much to advance by. + // + // SAFETY: We only consume the iterator value here in the drop logic, so we know that this logic will only be + // triggered once, and the iterator can't be taken again. + let iter = unsafe { ManuallyDrop::take(&mut self.iter) }; + let iter_buf_len = iter.finish(); + + // Reconstitute our mutable reference to the underlying buffer. + // + // SAFETY: Our iterator implementations only give out immutable references to slices of the buffer tied to the + // lifetime of the iterator itself, and we've consumed and dropped the iterator right before this point, so we + // know that there are no other outstanding references to the buffer other than the one we were given when + // constructing `FramedInner` itself. + let buf = unsafe { self.buf_ptr.as_mut() }; + + // Figure out how far the buffer view from the iterator has been advanced and advance the underlying buffer by + // that many bytes, which keeps things consistent. + let advance_by = buf.remaining() - iter_buf_len; + buf.advance(advance_by); + } +} + +// SAFETY: `ReadIoBuffer` is `Send`, so we can safely hold a mutable pointer of `B` and still be `Send` ourselves. +unsafe impl<'buf, 'framer, B> Send for FramedInner<'buf, 'framer, B> where B: ReadIoBuffer {} + +/// An iterator of framed messages over a generic buffer. +/// +/// When reading messages from a network socket, it is typical to use a single buffer that is read into and then checked +/// for complete messages: read some data from the socket, extract as many complete messages as possible, over and over +/// again until the connection is closed. Framers help abstract the process of finding these messages in a buffer, but +/// it can still be cumbersome to ensure that the buffer is advanced once a message has been extracted. Advancing the buffer +/// is the final step that ensures we track the progress made, free up the space in the buffer, and allow the buffer to be +/// reused for the next read. +/// +/// `Framed` provides a safe abstraction over the process of using an arbitrary `Framer` implementation (or nested +/// implementation, see "Direct vs Nested" section) over an arbitrary buffer implementation. It does this while avoiding +/// unnecessary allocations and copies. +/// +/// # Direct vs Nested +/// +/// `Framed` is constructed in either "direct" or "nested" mode. In direct mode, a single `Framer` is provided and used +/// for all messages. This is the most common mode: messages are framed individually. +/// +/// In nested mode, two `Framer`s are provided: an "outer" framer and an "inner" framer. The outer framer is used to +/// extract top-level frames from the buffer, and when a top-level frame is extracted, the inner framer is used to +/// extract messages from the top-level frame itself, which are ultimately returned to the caller. This allows for more +/// complex framing scenarios where we only care about the "inner" frames, but still want the convenience of using a +/// `Framer` implementation for extracting the outer frames. +pub struct Framed<'buf, 'framer, B> +where + B: ReadIoBuffer, +{ + inner: FramedInner<'buf, 'framer, B>, +} + +impl<'buf, 'framer, B> Framed<'buf, 'framer, B> +where + B: ReadIoBuffer, +{ + /// Creates a new `Framed` iterator in direct mode. + pub fn direct(framer: &'framer dyn Framer, buf: &'buf mut B, is_eof: bool) -> Self { + Self { + inner: FramedInner::direct(framer, buf, is_eof), + } + } + + /// Creates a new `Framed` iterator in nested mode. + pub fn nested(outer: &'framer dyn Framer, inner: &'framer dyn Framer, buf: &'buf mut B, is_eof: bool) -> Self { + Self { + inner: FramedInner::nested(outer, inner, buf, is_eof), + } + } + + /// Attempt to extract the next frame from the buffer. + /// + /// If there was insufficient data to extract a frame, and it may be possible in the future to do so, `None` is + /// returned. Otherwise, `Some` is returned with either an extracted frame or an error indicating why a frame could + /// not be extracted. + /// + /// Behavior when EOF is reached is framer-specific and in some cases may allow for decoding a frame even when the + /// inherent delimiting data is not present. + /// + /// # Errors + /// + /// If an error is detected when reading the next frame, an error is returned. + pub fn next_frame(&mut self) -> Option> { + self.inner.iter.next_frame().transpose() + } +} + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + + use super::*; + use crate::deser::framing::{LengthDelimitedFramer, NewlineFramer}; + + fn nested_payload(inner_frames: &[&[u8]], outer_frame_count: usize) -> VecDeque { + let mut outer_frames = VecDeque::new(); + + let inner_frames_chunk_size = inner_frames.len() / outer_frame_count; + for inner_frames in inner_frames.chunks(inner_frames_chunk_size) { + let mut inner_frames_chunk = Vec::new(); + for inner_frame in inner_frames { + inner_frames_chunk.extend_from_slice(&inner_frame[..]); + inner_frames_chunk.push(b'\n'); + } + + outer_frames.extend(&(inner_frames_chunk.len() as u32).to_le_bytes()); + outer_frames.extend(inner_frames_chunk); + } + + outer_frames + } + + fn nested_payload_direct(raw_outer_frames: &[&[u8]]) -> VecDeque { + let mut outer_frames = VecDeque::new(); + + for raw_outer_frame in raw_outer_frames { + outer_frames.extend(&(raw_outer_frame.len() as u32).to_le_bytes()); + outer_frames.extend(raw_outer_frame.iter()); + } + + outer_frames + } + + #[test] + fn framed_nested_single_outer_multiple_inner() { + // We create a buffer that has a single outer (length delimited) frame with multiple inner (newline delimited) frames. + let input_frames = [&b"frame1"[..], &b"frame2"[..], &b"frame3"[..]]; + let mut buf = nested_payload(input_frames.as_slice(), 1); + + // Create our framer: length-delimited frames on the outside, and newline-delimited frames on the inside. + let outer = LengthDelimitedFramer::default(); + let inner = NewlineFramer::default(); + let mut framed = Framed::nested(&outer, &inner, &mut buf, false); + + // Now we should be able to extract our original three frames from the buffer. + for input_frame in input_frames { + let frame = framed + .next_frame() + .expect("should not fail to read from payload") + .expect("should not fail to extract frame from payload"); + assert_eq!(frame, input_frame); + } + + let maybe_frame = framed.next_frame(); + assert!(maybe_frame.is_none()); + + drop(framed); + + // We should have consumed the entire buffer. + assert!(buf.is_empty()); + } + + #[test] + fn framed_nested_multiple_outer_single_inner() { + // We create a buffer that has multiple outer (length delimited) frames each with a single inner (newline delimited) frame. + let input_frames = &[&b"frame1"[..], &b"frame2"[..], &b"frame3"[..]]; + let mut buf = nested_payload(input_frames.as_slice(), 3); + + // Create our framer: length-delimited frames on the outside, and newline-delimited frames on the inside. + let outer = LengthDelimitedFramer::default(); + let inner = NewlineFramer::default(); + let mut framed = Framed::nested(&outer, &inner, &mut buf, false); + + // Now we should be able to extract our original three frames from the buffer. + for input_frame in input_frames { + let frame = framed + .next_frame() + .expect("should not fail to read from payload") + .expect("should not fail to extract frame from payload"); + assert_eq!(frame, &input_frame[..]); + } + + let maybe_frame = framed.next_frame(); + assert!(maybe_frame.is_none()); + + drop(framed); + + // We should have consumed the entire buffer. + assert!(buf.is_empty()); + } + + #[test] + fn framed_nested_multiple_outer_multiple_inner() { + // We create a buffer that has multiple outer (length delimited) frames each with multiple inner (newline delimited) frames. + let input_frames = &[ + &b"frame1"[..], + &b"frame2"[..], + &b"frame3"[..], + &b"frame4"[..], + &b"frame5"[..], + &b"frame6"[..], + ]; + let mut buf = nested_payload(input_frames.as_slice(), 3); + + // Create our framer: length-delimited frames on the outside, and newline-delimited frames on the inside. + let outer = LengthDelimitedFramer::default(); + let inner = NewlineFramer::default(); + let mut framed = Framed::nested(&outer, &inner, &mut buf, false); + + // Now we should be able to extract our original six frames from the buffer. + for input_frame in input_frames { + let frame = framed + .next_frame() + .expect("should not fail to read from payload") + .expect("should not fail to extract frame from payload"); + assert_eq!(frame, &input_frame[..]); + } + + let maybe_frame = framed.next_frame(); + assert!(maybe_frame.is_none()); + + drop(framed); + + // We should have consumed the entire buffer. + assert!(buf.is_empty()); + } + + #[test] + fn framed_nested_multiple_outer_with_zero_length() { + // We create a buffer that has multiple outer (length delimited) frames each with multiple inner (newline + // delimited) frames, but one of the outer frames has an empty inner frame, such that the outer frame is just + // the length delimiter, with a value of 0. + let input_frames = &[&b"frame1\n"[..], &b""[..], &b"frame3\n"[..]]; + let mut buf = nested_payload_direct(input_frames.as_slice()); + + // Create our framer: length-delimited frames on the outside, and newline-delimited frames on the inside. + let outer = LengthDelimitedFramer::default(); + let inner = NewlineFramer::default(); + let mut framed = Framed::nested(&outer, &inner, &mut buf, false); + + // Now we should be able to extract our original two frames from the buffer. + // + // Specifically, this means we correctly handle skipping over the empty outer frame. We filter out the empty + // frame from our comparison, and we also adjust our comparison since our raw input frames include the newline + // delimiter. + for input_frame in input_frames.iter().filter(|frame| !frame.is_empty()) { + let frame = framed + .next_frame() + .expect("should not fail to read from payload") + .expect("should not fail to extract frame from payload"); + assert_eq!(frame, &input_frame[..input_frame.len() - 1]); + } + + let maybe_frame = framed.next_frame(); + assert!(maybe_frame.is_none()); + + drop(framed); + + // We should have consumed the entire buffer. + assert!(buf.is_empty()); + } +} diff --git a/lib/saluki-io/src/deser/framing/length_delimited.rs b/lib/saluki-io/src/deser/framing/length_delimited.rs index 6799ec5a0f..60ed7a5717 100644 --- a/lib/saluki-io/src/deser/framing/length_delimited.rs +++ b/lib/saluki-io/src/deser/framing/length_delimited.rs @@ -1,34 +1,44 @@ -use bytes::Bytes; use tracing::trace; use super::{Framer, FramingError}; -use crate::buf::ReadIoBuffer; /// Frames incoming data by splitting data based on a fixed-size length delimiter. /// /// All frames are prepended with a 4-byte integer, in little endian order, which indicates how much additional data is /// included in the frame. This framer only supports frame lengths that fit within the given buffer, which is to say /// that if the length described in the delimiter would exceed the current buffer, it is considered an invalid frame. -#[derive(Default)] -pub struct LengthDelimitedFramer; +pub struct LengthDelimitedFramer { + max_frame_size: usize, +} + +impl LengthDelimitedFramer { + /// Sets the maximum frame size that this framer will accept. + /// + /// This controls whether or not a frame is rejected after decoding the frame length delimiter. This should + /// generally be used if I/O buffers are fixed in size and cannot be expanded, as this represents the effective + /// upper bound on the size of frames that could be received with such buffers. + /// + /// Defaults to `u32::MAX`. + pub const fn with_max_frame_size(mut self, max_frame_size: usize) -> Self { + self.max_frame_size = max_frame_size; + self + } +} impl Framer for LengthDelimitedFramer { - fn next_frame(&mut self, buf: &mut B, is_eof: bool) -> Result, FramingError> { - trace!(buf_len = buf.remaining(), "Processing buffer."); + fn next_frame<'buf>(&self, buf: &mut &'buf [u8], is_eof: bool) -> Result, FramingError> { + trace!(buf_len = buf.len(), "Processing buffer."); - let chunk = buf.chunk(); - if chunk.is_empty() { + if buf.is_empty() { return Ok(None); } - trace!(chunk_len = chunk.len(), "Processing chunk."); - // See if there's enough data to read the frame length. - if chunk.len() < 4 { + if buf.len() < 4 { return if is_eof { Err(FramingError::PartialFrame { needed: 4, - remaining: chunk.len(), + remaining: buf.len(), }) } else { Ok(None) @@ -36,31 +46,29 @@ impl Framer for LengthDelimitedFramer { } // See if we have enough data to read the full frame. - let frame_len = u32::from_le_bytes(chunk[0..4].try_into().unwrap()) as usize; - let frame_len_with_length = frame_len.saturating_add(4); - if frame_len_with_length > buf.capacity() { + let frame_len = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize; + let full_frame_len = frame_len.saturating_add(4); + if full_frame_len > self.max_frame_size { return Err(oversized_frame_err(frame_len)); } - if chunk.len() < frame_len_with_length { + if buf.len() < full_frame_len { return if is_eof { // If we've hit EOF and we have a partial frame here, well, then... it's invalid. Err(FramingError::PartialFrame { - needed: frame_len_with_length, - remaining: chunk.len(), + needed: full_frame_len, + remaining: buf.len(), }) } else { Ok(None) }; } - // Split out the entire frame -- length delimiter included -- and then carve out the length delimiter from the - // frame that we return. - // - // TODO: This is a bit inefficient, as we're copying the entire frame here. We could potentially avoid this by - // adding some specialized trait methods to `ReadIoBuffer` that could let us, potentially, implement equivalent - // slicing that is object pool aware (i.e., somehow utilizing `FrozenBytesBuffer`, etc). - let frame = buf.copy_to_bytes(frame_len_with_length).slice(4..); + // Carve out the entire frame, and then adjust our view to start after the delimiter. + let mut frame = buf + .split_off(..full_frame_len) + .expect("buf should be long enough to extract full frame"); + frame = &frame[4..]; Ok(Some(frame)) } @@ -73,24 +81,31 @@ const fn oversized_frame_err(frame_len: usize) -> FramingError { } } +impl Default for LengthDelimitedFramer { + fn default() -> Self { + Self { + // Use `u32::MAX` since that's the maximum frame size that can be represented in the length delimiter. + max_frame_size: usize::try_from(u32::MAX).unwrap_or(usize::MAX), + } + } +} + #[cfg(test)] mod tests { - use std::collections::VecDeque; - use super::*; - fn get_delimited_payload(inner: &[u8], with_newline: bool) -> VecDeque { + fn get_delimited_payload(inner: &[u8], with_newline: bool) -> Vec { let payload_len = if with_newline { inner.len() + 1 } else { inner.len() }; get_delimited_payload_with_fixed_length(inner, payload_len as u32, with_newline) } - fn get_delimited_payload_with_fixed_length(inner: &[u8], frame_len: u32, with_newline: bool) -> VecDeque { - let mut payload = VecDeque::new(); + fn get_delimited_payload_with_fixed_length(inner: &[u8], frame_len: u32, with_newline: bool) -> Vec { + let mut payload = Vec::new(); payload.extend(&frame_len.to_le_bytes()); payload.extend(inner); if with_newline { - payload.push_back(b'\n'); + payload.push(b'\n'); } payload @@ -99,17 +114,17 @@ mod tests { #[test] fn basic() { let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, false); - - let mut framer = LengthDelimitedFramer; + let buf = get_delimited_payload(payload, false); + let mut src = &buf[..]; + let framer = LengthDelimitedFramer::default(); let frame = framer - .next_frame(&mut buf, false) + .next_frame(&mut src, false) .expect("should not fail to read from payload") .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], payload); - assert!(buf.is_empty()); + assert_eq!(frame, payload); + assert!(src.is_empty(), "frame should consume entire buffer"); } #[test] @@ -117,39 +132,51 @@ mod tests { // We create a full, valid frame and then take incrementally larger slices of it, ensuring that we can't // actually read the frame until we give the framer the entire buffer. let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, false); + let buf = get_delimited_payload(payload, false); - let mut framer = LengthDelimitedFramer; + let framer = LengthDelimitedFramer::default(); // Try reading a frame from a buffer that doesn't have enough bytes for the length delimiter itself. let mut no_delimiter_buf = buf.clone(); no_delimiter_buf.truncate(3); + let mut src1 = &no_delimiter_buf[..]; let maybe_frame = framer - .next_frame(&mut no_delimiter_buf, false) + .next_frame(&mut src1, false) .expect("should not fail to read from payload"); assert!(maybe_frame.is_none()); - assert_eq!(no_delimiter_buf.len(), 3); + assert_eq!( + no_delimiter_buf.len(), + src1.len(), + "should not consume from buffer if frame isn't returned" + ); // Try reading a frame from a buffer that has enough bytes for the length delimiter, but not as many bytes as // the length delimiter indicates. let mut delimiter_but_partial_buf = buf.clone(); delimiter_but_partial_buf.truncate(7); + let mut src2 = &delimiter_but_partial_buf[..]; let maybe_frame = framer - .next_frame(&mut delimiter_but_partial_buf, false) + .next_frame(&mut src2, false) .expect("should not fail to read from payload"); assert!(maybe_frame.is_none()); - assert_eq!(delimiter_but_partial_buf.len(), 7); + assert_eq!( + delimiter_but_partial_buf.len(), + src2.len(), + "should not consume from buffer if frame isn't returned" + ); // Now try reading a frame from the original buffer, which should succeed. + let mut src3 = &buf[..]; + let frame = framer - .next_frame(&mut buf, false) + .next_frame(&mut src3, false) .expect("should not fail to read from payload") .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], payload); - assert!(buf.is_empty()); + assert_eq!(frame, payload); + assert!(src3.is_empty(), "frame should consume entire buffer"); } #[test] @@ -157,62 +184,77 @@ mod tests { // We create a full, valid frame and then take incrementally larger slices of it, ensuring that we can't // actually read the frame until we give the framer the entire buffer. let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, false); - let frame_len = buf.len(); + let buf = get_delimited_payload(payload, false); - let mut framer = LengthDelimitedFramer; + let framer = LengthDelimitedFramer::default(); // Try reading a frame from a buffer that doesn't have enough bytes for the length delimiter itself. let mut no_delimiter_buf = buf.clone(); no_delimiter_buf.truncate(3); + let mut src1 = &no_delimiter_buf[..]; - let maybe_frame = framer.next_frame(&mut no_delimiter_buf, true); + let maybe_frame = framer.next_frame(&mut src1, true); assert_eq!( maybe_frame, Err(FramingError::PartialFrame { needed: 4, - remaining: 3 + remaining: no_delimiter_buf.len() }) ); - assert_eq!(no_delimiter_buf.len(), 3); + assert_eq!( + no_delimiter_buf.len(), + src1.len(), + "should not consume from buffer if frame isn't returned" + ); // Try reading a frame from a buffer that has enough bytes for the length delimiter, but not as many bytes as // the length delimiter indicates. let mut delimiter_but_partial_buf = buf.clone(); delimiter_but_partial_buf.truncate(7); + let mut src2 = &delimiter_but_partial_buf[..]; - let maybe_frame = framer.next_frame(&mut delimiter_but_partial_buf, true); + let maybe_frame = framer.next_frame(&mut src2, true); assert_eq!( maybe_frame, Err(FramingError::PartialFrame { - needed: frame_len, - remaining: 7 + needed: buf.len(), + remaining: delimiter_but_partial_buf.len(), }) ); - assert_eq!(delimiter_but_partial_buf.len(), 7); + assert_eq!( + delimiter_but_partial_buf.len(), + src2.len(), + "should not consume from buffer if frame isn't returned" + ); // Now try reading a frame from the original buffer, which should succeed. + let mut src3 = &buf[..]; + let frame = framer - .next_frame(&mut buf, true) + .next_frame(&mut src3, true) .expect("should not fail to read from payload") .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], payload); - assert!(buf.is_empty()); + assert_eq!(frame, payload); + assert!(src3.is_empty(), "frame should consume entire buffer"); } #[test] fn oversized_frame() { // We create an invalid frame with a length that exceeds the overall length of the resulting buffer. let payload = b"hello, world!"; - let mut buf = get_delimited_payload_with_fixed_length(payload, (payload.len() * 10) as u32, false); - let buf_len = buf.len(); + let buf = get_delimited_payload_with_fixed_length(payload, 32, false); + let mut src = &buf[..]; - let mut framer = LengthDelimitedFramer; + let framer = LengthDelimitedFramer::default().with_max_frame_size(24); // We should get back an error that the frame is invalid, and the original buffer should not be altered at all. - let maybe_frame = framer.next_frame(&mut buf, false); - assert_eq!(maybe_frame, Err(oversized_frame_err(payload.len() * 10))); - assert_eq!(buf.len(), buf_len); + let maybe_frame = framer.next_frame(&mut src, false); + assert_eq!(maybe_frame, Err(oversized_frame_err(32))); + assert_eq!( + buf.len(), + src.len(), + "should not consume from buffer if frame isn't returned" + ); } } diff --git a/lib/saluki-io/src/deser/framing/mod.rs b/lib/saluki-io/src/deser/framing/mod.rs index 32d5768324..8d46ea1146 100644 --- a/lib/saluki-io/src/deser/framing/mod.rs +++ b/lib/saluki-io/src/deser/framing/mod.rs @@ -1,8 +1,7 @@ -use bytes::Bytes; use snafu::Snafu; -use tracing::trace; -use crate::buf::ReadIoBuffer; +mod iter; +pub use self::iter::Framed; mod length_delimited; pub use self::length_delimited::LengthDelimitedFramer; @@ -37,7 +36,7 @@ pub enum FramingError { } /// A trait for reading framed messages from a buffer. -pub trait Framer { +pub trait Framer: Sync { /// Attempt to extract the next frame from the buffer. /// /// If enough data was present to extract a frame, `Ok(Some(frame))` is returned. If not enough data was present, and @@ -49,266 +48,14 @@ pub trait Framer { /// # Errors /// /// If an error is detected when reading the next frame, an error is returned. - fn next_frame(&mut self, buf: &mut B, is_eof: bool) -> Result, FramingError>; + fn next_frame<'buf>(&self, buf: &mut &'buf [u8], is_eof: bool) -> Result, FramingError>; } -/// A nested framer that extracts inner frames from outer frames. -/// -/// This framer takes two input framers -- the "outer" and "inner" framers -- and extracts outer frames, and once an -/// outer frame has been extract, extracts as many inner frames from the outer frame as possible. Callers deal -/// exclusively with the extracted inner frames. -pub struct NestedFramer { - inner: Inner, - outer: Outer, - current_outer_frame: Option, -} - -impl NestedFramer { - /// Creates a new `NestedFramer` from the given inner and outer framers. - pub fn new(inner: Inner, outer: Outer) -> Self { - Self { - inner, - outer, - current_outer_frame: None, - } - } -} - -impl Framer for NestedFramer -where - Inner: Framer, - Outer: Framer, -{ - fn next_frame(&mut self, buf: &mut B, is_eof: bool) -> Result, FramingError> { - loop { - // Take our current outer frame, or if we have none, try to get the next one. - let outer_frame = match self.current_outer_frame.as_mut() { - Some(frame) => { - trace!( - buf_len = buf.remaining(), - frame_len = frame.len(), - "Using existing outer frame." - ); - - frame - } - None => { - trace!(buf_len = buf.remaining(), "No existing outer frame."); - - match self.outer.next_frame(buf, is_eof)? { - Some(frame) => { - trace!( - buf_len = buf.remaining(), - frame_len = frame.len(), - ?frame, - "Extracted outer frame." - ); - - self.current_outer_frame.get_or_insert(frame) - } - - // If we can't get another outer frame, then we're done for now. - None => return Ok(None), - } - } - }; - - // Try to get the next inner frame. - match self.inner.next_frame(outer_frame, true)? { - Some(frame) => { - trace!( - buf_len = buf.remaining(), - outer_frame_len = outer_frame.len(), - inner_frame_len = frame.len(), - "Extracted inner frame." - ); - - return Ok(Some(frame)); - } - None => { - // We can't get anything else from our inner frame. If our outer frame is empty, and our input buffer - // isn't empty, clear the current outer frame so that we can try to grab the next one. - trace!( - buf_len = buf.remaining(), - outer_frame_len = outer_frame.len(), - "Couldn't extract inner frame from existing outer frame." - ); - - if outer_frame.is_empty() && buf.remaining() != 0 { - self.current_outer_frame = None; - continue; - } else { - return Ok(None); - } - } - } - } - } -} - -/// An iterator of framed messages over a generic buffer. -pub struct Framed<'a, F, B> { - framer: &'a mut F, - buffer: &'a mut B, - is_eof: bool, -} - -impl Iterator for Framed<'_, F, B> +impl Framer for &F where F: Framer, - B: ReadIoBuffer, -{ - type Item = Result; - - fn next(&mut self) -> Option { - self.framer.next_frame(self.buffer, self.is_eof).transpose() - } -} - -/// Extension trait for ergonomically working with framers and buffers. -pub trait FramerExt { - /// Creates a new `Framed` iterator over the buffer, using the given framer. - /// - /// Returns an iterator that extracts frames from the given buffer, consuming the bytes from the buffer as frames - /// are yielded. - fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self> - where - Self: ReadIoBuffer + Sized, - F: Framer; -} - -impl FramerExt for B -where - B: ReadIoBuffer, { - fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self> { - Framed { - framer, - buffer: self, - is_eof, - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::VecDeque; - - use super::{Framer as _, LengthDelimitedFramer, NestedFramer, NewlineFramer}; - - #[test] - fn nested_framer_single_outer_multiple_inner() { - let input_frames = &[b"frame1", b"frame2", b"frame3"]; - - // We create a framer that does length-delimited payloads as the outer layer, and newline-delimited payloads as - // the inner layer. - let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer); - - // Create a buffer that has a single length-delimited frame with three newline-delimited frames inside of that. - let mut inner_frames = Vec::new(); - - for inner_frame_data in input_frames { - inner_frames.extend_from_slice(&inner_frame_data[..]); - inner_frames.push(b'\n'); - } - - let mut buf = VecDeque::new(); - buf.extend(&(inner_frames.len() as u32).to_le_bytes()); - buf.extend(inner_frames); - - // Now we should be able to extract our original three frames from the buffer. - for input_frame in input_frames { - let frame = framer - .next_frame(&mut buf, false) - .expect("should not fail to read from payload") - .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], &input_frame[..]); - } - - let maybe_frame = framer - .next_frame(&mut buf, false) - .expect("should not fail to read from payload"); - assert!(maybe_frame.is_none()); - - // We should have consumed the entire buffer. - assert!(buf.is_empty()); - } - - #[test] - fn nested_framer_multiple_outer_single_inner() { - let input_frames = &[b"frame1", b"frame2", b"frame3"]; - - // We create a framer that does length-delimited payloads as the outer layer, and newline-delimited payloads as - // the inner layer. - let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer); - - // Create a buffer that has a three length-delimited frames with a single newline-delimited frame inside. - let mut buf = VecDeque::new(); - - for inner_frame_data in input_frames { - let mut inner_frame = Vec::new(); - inner_frame.extend_from_slice(&inner_frame_data[..]); - inner_frame.push(b'\n'); - - buf.extend(&(inner_frame.len() as u32).to_le_bytes()); - buf.extend(inner_frame); - } - - // Now we should be able to extract our original three frames from the buffer. - for input_frame in input_frames { - let frame = framer - .next_frame(&mut buf, false) - .expect("should not fail to read from payload") - .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], &input_frame[..]); - } - - let maybe_frame = framer - .next_frame(&mut buf, false) - .expect("should not fail to read from payload"); - assert!(maybe_frame.is_none()); - - // We should have consumed the entire buffer. - assert!(buf.is_empty()); - } - - #[test] - fn nested_framer_multiple_outer_multiple_inner() { - let input_frames = &[b"frame1", b"frame2", b"frame3", b"frame4", b"frame5", b"frame6"]; - - // We create a framer that does length-delimited payloads as the outer layer, and newline-delimited payloads as - // the inner layer. - let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer); - - // Create a buffer that has a three length-delimited frames with two newline-delimited frames inside. - let mut buf = VecDeque::new(); - - for inner_frame_data in input_frames.chunks(2) { - let mut inner_frames = Vec::new(); - inner_frames.extend_from_slice(&inner_frame_data[0][..]); - inner_frames.push(b'\n'); - inner_frames.extend_from_slice(&inner_frame_data[1][..]); - inner_frames.push(b'\n'); - - buf.extend(&(inner_frames.len() as u32).to_le_bytes()); - buf.extend(inner_frames); - } - - // Now we should be able to extract our original six frames from the buffer. - for input_frame in input_frames { - let frame = framer - .next_frame(&mut buf, false) - .expect("should not fail to read from payload") - .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], &input_frame[..]); - } - - let maybe_frame = framer - .next_frame(&mut buf, false) - .expect("should not fail to read from payload"); - assert!(maybe_frame.is_none()); - - // We should have consumed the entire buffer. - assert!(buf.is_empty()); + fn next_frame<'buf>(&self, buf: &mut &'buf [u8], is_eof: bool) -> Result, FramingError> { + (**self).next_frame(buf, is_eof) } } diff --git a/lib/saluki-io/src/deser/framing/newline.rs b/lib/saluki-io/src/deser/framing/newline.rs index f13ea2581c..3fa97c6a9e 100644 --- a/lib/saluki-io/src/deser/framing/newline.rs +++ b/lib/saluki-io/src/deser/framing/newline.rs @@ -1,8 +1,6 @@ -use bytes::Bytes; use tracing::trace; use super::{Framer, FramingError}; -use crate::buf::ReadIoBuffer; /// Frames incoming data by splitting on newlines. /// @@ -30,24 +28,19 @@ impl NewlineFramer { } impl Framer for NewlineFramer { - fn next_frame<'a, B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result, FramingError> { - trace!(buf_len = buf.remaining(), "Processing buffer."); + fn next_frame<'buf>(&self, buf: &mut &'buf [u8], is_eof: bool) -> Result, FramingError> { + trace!(buf_len = buf.len(), "Processing buffer."); - let chunk = buf.chunk(); - if chunk.is_empty() { + if buf.is_empty() { return Ok(None); } - trace!(chunk_len = chunk.len(), "Processing chunk."); - // Search through the buffer for our delimiter. - match find_newline(chunk) { + match find_newline(buf) { Some(idx) => { // If we found the delimiter, then we can return the frame. - let frame = buf.copy_to_bytes(idx); - - // Advance the buffer past the delimiter. - buf.advance(1); + let mut frame = buf.split_off(..idx + 1).unwrap(); + frame = &frame[..frame.len() - 1]; Ok(Some(frame)) } @@ -59,14 +52,10 @@ impl Framer for NewlineFramer { // If we're at EOF and we require the delimiter, then this is an invalid frame. if self.required_on_eof { - return Err(missing_delimiter_err(chunk.len())); + return Err(missing_delimiter_err(buf.len())); } - // TODO: This is a bit inefficient, as we're copying the entire frame here. We could potentially avoid - // this by adding some specialized trait methods to `ReadIoBuffer` that could let us, potentially, - // implement equivalent slicing that is object pool aware (i.e., somehow utilizing `FrozenBytesBuffer`, - // etc). - Ok(Some(buf.copy_to_bytes(chunk.len()))) + Ok(Some(buf.split_off(0..).unwrap())) } } } @@ -85,15 +74,13 @@ fn find_newline(haystack: &[u8]) -> Option { #[cfg(test)] mod tests { - use std::collections::VecDeque; - use super::*; - fn get_delimited_payload(inner: &[u8], with_newline: bool) -> VecDeque { - let mut payload = VecDeque::new(); + fn get_delimited_payload(inner: &[u8], with_newline: bool) -> Vec { + let mut payload = Vec::new(); payload.extend(inner); if with_newline { - payload.push_back(b'\n'); + payload.push(b'\n'); } payload @@ -102,78 +89,74 @@ mod tests { #[test] fn newline_no_eof() { let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, true); - - let mut framer = NewlineFramer::default(); + let buf = get_delimited_payload(payload, true); + let mut src = &buf[..]; + let framer = NewlineFramer::default(); let frame = framer - .next_frame(&mut buf, false) + .next_frame(&mut src, false) .expect("should not fail to read from payload") .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], payload); - assert!(buf.is_empty()); + assert_eq!(frame, payload); + assert!(src.is_empty(), "frame should consume entire buffer"); } #[test] fn no_newline_no_eof() { let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, false); - let buf_len = buf.len(); - - let mut framer = NewlineFramer::default(); + let buf = get_delimited_payload(payload, false); + let mut src = &buf[..]; + let framer = NewlineFramer::default(); let maybe_frame = framer - .next_frame(&mut buf, false) + .next_frame(&mut src, false) .expect("should not fail to read from payload"); assert_eq!(maybe_frame, None); - assert_eq!(buf.len(), buf_len); } #[test] fn newline_eof() { let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, true); - - let mut framer = NewlineFramer::default(); + let buf = get_delimited_payload(payload, true); + let mut src = &buf[..]; + let framer = NewlineFramer::default(); let frame = framer - .next_frame(&mut buf, true) + .next_frame(&mut src, true) .expect("should not fail to read from payload") .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], payload); - assert!(buf.is_empty()); + assert_eq!(frame, payload); + assert!(src.is_empty(), "frame should consume entire buffer"); } #[test] fn no_newline_eof_not_required_on_eof() { let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, false); - - let mut framer = NewlineFramer::default(); + let buf = get_delimited_payload(payload, false); + let mut src = &buf[..]; + let framer = NewlineFramer::default(); let frame = framer - .next_frame(&mut buf, true) + .next_frame(&mut src, true) .expect("should not fail to read from payload") .expect("should not fail to extract frame from payload"); - assert_eq!(&frame[..], payload); - assert!(buf.is_empty()); + assert_eq!(frame, payload); + assert!(src.is_empty(), "frame should consume entire buffer"); } #[test] fn no_newline_eof_required_on_eof() { let payload = b"hello, world!"; - let mut buf = get_delimited_payload(payload, false); - let buf_len = buf.len(); - - let mut framer = NewlineFramer::default().required_on_eof(true); + let buf = get_delimited_payload(payload, false); + let mut src = &buf[..]; - let maybe_frame = framer.next_frame(&mut buf, true); + let framer = NewlineFramer::default().required_on_eof(true); + let maybe_frame = framer.next_frame(&mut src, true); - assert_eq!(maybe_frame, Err(missing_delimiter_err(buf_len))); - assert_eq!(buf.len(), buf_len); + assert_eq!(maybe_frame, Err(missing_delimiter_err(buf.len()))); } }