Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ bytes = "1.11"
static_assertions = "1.1"
thiserror = "2.0.9"
# cdr-encoding = { path = "../cdr-encoding"}
cdr-encoding = { version="0.10" }
# cdr-encoding = { version="0.10" }
cdr-encoding = { git = "https://github.com/iblnkn/cdr-encoding/", branch="zero-copy-byte-deserialization" }
cdr-encoding-size = { version="^0.5" }
futures = "0.3"
io-extras = "0.18.0"
Expand Down Expand Up @@ -118,4 +119,4 @@ termion = "4.0.2"


[target.'cfg(target_os = "linux")'.dev-dependencies]
procfs = "0.17" # for ddsperf
procfs = "0.17" # for ddsperf
63 changes: 37 additions & 26 deletions src/dds/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,17 @@ pub mod no_key {
///
/// `encoding` must be something given by `supported_encodings()`, or
/// implementation may fail with Err or `panic!()`.
fn from_bytes_with<S>(
input_bytes: &[u8],
///
/// The input slice's lifetime is tied to the deserialization lifetime
/// `'de`, so adapters that produce borrowed data (zero-copy) are
/// supported.
fn from_bytes_with<'de, S>(
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
decoder: S,
) -> Result<D, S::Error>
where
S: Decode<Self::Decoded>,
S: Decode<'de, Self::Decoded>,
{
decoder
.decode_bytes(input_bytes, encoding)
Expand All @@ -158,8 +162,9 @@ pub mod no_key {
/// Type of the default decoder.
///
/// The default decoder needs to be clonable to be usable for async stream
/// creation (as it's needed multiple times).
type Decoder: Decode<Self::Decoded, Error = Self::Error> + Clone;
/// creation (as it's needed multiple times). It must work for any input
/// lifetime, hence the higher-ranked `for<'de>` bound.
type Decoder: for<'de> Decode<'de, Self::Decoded, Error = Self::Error> + Clone;

/// The default decoder value.
///
Expand All @@ -168,21 +173,23 @@ pub mod no_key {
const DECODER: Self::Decoder;
}

/// The trait `Decode` defines a decoder object that produced a value of type
/// `Dec` from a slice of bytes and a [`RepresentationIdentifier`].
/// The trait `Decode` defines a decoder object that produces a value of type
/// `Decoded` from a slice of bytes and a [`RepresentationIdentifier`].
///
/// Note that `Decoded` maps to associated type `Decoded` in
/// `DeserializerAdapter`, not `D`.
///
/// Note
/// that `Decoded` maps to associated type `Decoded` in
/// `DeserializerAdapter` , not `D`.
pub trait Decode<Decoded> {
/// The lifetime `'de` is the deserialization lifetime: the returned
/// `Decoded` value may borrow from `input_bytes` for `'de`.
pub trait Decode<'de, Decoded> {
/// The decoding error type returned by [`Self::decode_bytes`].
type Error: std::error::Error;

/// Tries to decode the given byte slice to a value of type `D` using the
/// given encoding.
/// Tries to decode the given byte slice to a value of type `Decoded`
/// using the given encoding.
fn decode_bytes(
self,
input_bytes: &[u8],
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<Decoded, Self::Error>;
}
Expand Down Expand Up @@ -242,13 +249,13 @@ pub mod with_key {
///
/// `encoding` must be something given by `supported_encodings()`, or
/// implementation may fail with Err or `panic!()`.
fn key_from_bytes_with<S>(
input_bytes: &[u8],
fn key_from_bytes_with<'de, S>(
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
decoder: S,
) -> Result<D::K, S::Error>
where
S: Decode<Self::Decoded, Self::DecodedKey>,
S: Decode<'de, Self::Decoded, Self::DecodedKey>,
{
decoder
.decode_key_bytes(input_bytes, encoding)
Expand Down Expand Up @@ -280,8 +287,9 @@ pub mod with_key {
/// Type of the default decoder.
///
/// The default decoder needs to be clonable to be usable for async stream
/// creation (as it's needed multiple times).
type Decoder: Decode<Self::Decoded, Self::DecodedKey, Error = Self::Error> + Clone;
/// creation (as it's needed multiple times). It must work for any input
/// lifetime, hence the higher-ranked `for<'de>` bound.
type Decoder: for<'de> Decode<'de, Self::Decoded, Self::DecodedKey, Error = Self::Error> + Clone;

/// The default decoder value.
///
Expand All @@ -290,15 +298,18 @@ pub mod with_key {
const DECODER: Self::Decoder;
}

/// Decodes a value of type `Dec` from a slice of bytes and a
/// [`RepresentationIdentifier`]. Note that `Dec` maps to associated type
/// `Decoded` in `DeserializerAdapter` , not `D`.
pub trait Decode<Dec, DecKey>: no_key::Decode<Dec> {
/// Tries to decode the given byte slice to a value of type `D` using the
/// given encoding.
/// Decodes a value of type `Dec` (or its key `DecKey`) from a slice of
/// bytes and a [`RepresentationIdentifier`]. Note that `Dec` maps to
/// associated type `Decoded` in `DeserializerAdapter`, not `D`.
///
/// The lifetime `'de` is the deserialization lifetime: the returned key
/// may borrow from `input_key_bytes` for `'de`.
pub trait Decode<'de, Dec, DecKey>: no_key::Decode<'de, Dec> {
/// Tries to decode the given byte slice to a value of type `DecKey`
/// using the given encoding.
fn decode_key_bytes(
self,
input_key_bytes: &[u8],
input_key_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<DecKey, Self::Error>;
}
Expand Down
4 changes: 2 additions & 2 deletions src/dds/no_key/simpledatareader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where

pub fn try_take_one_with<S>(&self, decoder: S) -> ReadResult<Option<DeserializedCacheChange<D>>>
where
S: Decode<DA::Decoded> + Clone,
S: for<'de> Decode<'de, DA::Decoded, Error = DA::Error> + Clone,
{
match self
.keyed_simpledatareader
Expand Down Expand Up @@ -101,7 +101,7 @@ where
decoder: S,
) -> impl FusedStream<Item = ReadResult<DeserializedCacheChange<D>>> + 'a
where
S: Decode<DA::Decoded> + Clone + 'a,
S: for<'de> Decode<'de, DA::Decoded, Error = DA::Error> + Clone + 'a,
{
self
.keyed_simpledatareader
Expand Down
12 changes: 6 additions & 6 deletions src/dds/no_key/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ impl<NoKeyDecode> DecodeWrapper<NoKeyDecode> {

// re-implement no_key::Decode<Decoded> for the wrapper also. Wrapped type
// already does it for us.
impl<Decoded, NoKeyDecode> no_key::Decode<Decoded> for DecodeWrapper<NoKeyDecode>
impl<'de, Decoded, NoKeyDecode> no_key::Decode<'de, Decoded> for DecodeWrapper<NoKeyDecode>
where
NoKeyDecode: no_key::Decode<Decoded>,
NoKeyDecode: no_key::Decode<'de, Decoded>,
{
type Error = NoKeyDecode::Error;

fn decode_bytes(
self,
input_bytes: &[u8],
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<Decoded, Self::Error> {
self.no_key.decode_bytes(input_bytes, encoding)
Expand All @@ -162,13 +162,13 @@ where
// implement with_key::Decode<Decoded> for the wrapper.
// The key has type `()`, so the decoded value is always `()` regardless of the
// input bytes.
impl<Decoded, NoKeyDecode> with_key::Decode<Decoded, ()> for DecodeWrapper<NoKeyDecode>
impl<'de, Decoded, NoKeyDecode> with_key::Decode<'de, Decoded, ()> for DecodeWrapper<NoKeyDecode>
where
NoKeyDecode: no_key::Decode<Decoded>,
NoKeyDecode: no_key::Decode<'de, Decoded>,
{
fn decode_key_bytes(
self,
_input_key_bytes: &[u8],
_input_key_bytes: &'de [u8],
_encoding: RepresentationIdentifier,
) -> Result<(), Self::Error> {
Ok(())
Expand Down
16 changes: 8 additions & 8 deletions src/dds/with_key/simpledatareader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ where
decoder: S,
) -> ReadResult<DeserializedCacheChange<D>>
where
S: Decode<DA::Decoded, DA::DecodedKey>,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error>,
{
match cc.data_value {
DDSData::Data {
Expand Down Expand Up @@ -342,7 +342,7 @@ where
#[allow(clippy::needless_pass_by_value)]
pub fn try_take_one_with<S>(&self, decoder: S) -> ReadResult<Option<DeserializedCacheChange<D>>>
where
S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error> + Clone,
{
let is_reliable = matches!(
self.qos_policy.reliability(),
Expand Down Expand Up @@ -414,14 +414,14 @@ where
where
DA: DefaultDecoder<D, Decoder = S>,
DA::Decoder: Clone,
S: Decode<DA::Decoded, DA::DecodedKey>,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error>,
{
Self::as_async_stream_with(self, DA::DECODER)
}

pub fn as_async_stream_with<S>(&self, decoder: S) -> SimpleDataReaderStream<'_, D, S, DA>
where
S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error> + Clone,
{
SimpleDataReaderStream {
simple_datareader: self,
Expand Down Expand Up @@ -553,7 +553,7 @@ where
pub struct SimpleDataReaderStream<
'a,
D: Keyed + 'static,
S: Decode<DA::Decoded, DA::DecodedKey>,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error>,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
simple_datareader: &'a SimpleDataReader<D, DA>,
Expand All @@ -568,15 +568,15 @@ impl<D, S, DA> Unpin for SimpleDataReaderStream<'_, D, S, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
S: Decode<DA::Decoded, DA::DecodedKey> + Unpin,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error> + Unpin,
{
}

impl<D, S, DA> Stream for SimpleDataReaderStream<'_, D, S, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error> + Clone,
{
type Item = ReadResult<DeserializedCacheChange<D>>;

Expand Down Expand Up @@ -633,7 +633,7 @@ impl<D, S, DA> FusedStream for SimpleDataReaderStream<'_, D, S, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
S: for<'de> Decode<'de, DA::Decoded, DA::DecodedKey, Error = DA::Error> + Clone,
{
fn is_terminated(&self) -> bool {
false // Never terminate. This means it is always valid to call poll_next().
Expand Down
34 changes: 18 additions & 16 deletions src/serialization/cdr_adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ where
}
}

/// A default decoder is available for all types that implement
/// `serde::Deserialize`.
impl<'de, D> no_key::DefaultDecoder<D> for CDRDeserializerAdapter<D>
/// A default decoder is available for all owned types that implement
/// `serde::Deserialize`. Borrowing types use `from_bytes_with` with an
/// explicit decoder instead.
impl<D> no_key::DefaultDecoder<D> for CDRDeserializerAdapter<D>
where
D: serde::Deserialize<'de>,
D: DeserializeOwned,
{
type Decoder = CdrDeserializeDecoder<D>;
const DECODER: Self::Decoder = CdrDeserializeDecoder(PhantomData);
Expand All @@ -157,25 +158,25 @@ where
/// Decode type based on a `serde::Deserialize` implementation.
pub struct CdrDeserializeDecoder<D>(PhantomData<D>);

impl<'de, D> no_key::Decode<D> for CdrDeserializeDecoder<D>
impl<'de, D> no_key::Decode<'de, D> for CdrDeserializeDecoder<D>
where
D: serde::Deserialize<'de>,
{
type Error = Error;

fn decode_bytes(self, input_bytes: &[u8], encoding: RepresentationIdentifier) -> Result<D> {
fn decode_bytes(self, input_bytes: &'de [u8], encoding: RepresentationIdentifier) -> Result<D> {
deserialize_from_cdr_with_decoder_and_rep_id(input_bytes, encoding, PhantomData).map(|r| r.0)
}
}

impl<Dec, DecKey> with_key::Decode<Dec, DecKey> for CdrDeserializeDecoder<Dec>
impl<'de, Dec, DecKey> with_key::Decode<'de, Dec, DecKey> for CdrDeserializeDecoder<Dec>
where
Dec: DeserializeOwned,
DecKey: DeserializeOwned,
Dec: serde::Deserialize<'de>,
DecKey: serde::Deserialize<'de>,
{
fn decode_key_bytes(
self,
input_key_bytes: &[u8],
input_key_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<DecKey> {
deserialize_from_cdr_with_decoder_and_rep_id(input_key_bytes, encoding, PhantomData)
Expand Down Expand Up @@ -210,26 +211,27 @@ where
}

/// Decode type based on a [`serde::de::DeserializeSeed`]-based decoder.
impl<'de, D, S, SK> no_key::Decode<D> for CdrDeserializeSeedDecoder<S, SK>
impl<'de, D, S, SK> no_key::Decode<'de, D> for CdrDeserializeSeedDecoder<S, SK>
where
S: serde::de::DeserializeSeed<'de, Value = D>,
{
type Error = Error;

fn decode_bytes(self, input_bytes: &[u8], encoding: RepresentationIdentifier) -> Result<D> {
fn decode_bytes(self, input_bytes: &'de [u8], encoding: RepresentationIdentifier) -> Result<D> {
deserialize_from_cdr_with_decoder_and_rep_id(input_bytes, encoding, self.value_seed)
.map(|r| r.0)
}
}

impl<'de, Dec, DecKey, S, SK> with_key::Decode<Dec, DecKey> for CdrDeserializeSeedDecoder<S, SK>
impl<'de, Dec, DecKey, S, SK> with_key::Decode<'de, Dec, DecKey>
for CdrDeserializeSeedDecoder<S, SK>
where
S: serde::de::DeserializeSeed<'de, Value = Dec>,
SK: serde::de::DeserializeSeed<'de, Value = DecKey>,
{
fn decode_key_bytes(
self,
input_key_bytes: &[u8],
input_key_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<DecKey> {
deserialize_from_cdr_with_decoder_and_rep_id(input_key_bytes, encoding, self.key_seed)
Expand All @@ -241,7 +243,7 @@ where
///
/// Returns deserialized object. Byte count is discarded.
pub fn deserialize_from_cdr_with_rep_id<'de, T>(
input_bytes: &[u8],
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<(T, usize)>
where
Expand All @@ -254,7 +256,7 @@ where
///
/// Returns deserialized object and byte count of stream consumed.
pub fn deserialize_from_cdr_with_decoder_and_rep_id<'de, S>(
input_bytes: &[u8],
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
decoder: S,
) -> Result<(S::Value, usize)>
Expand Down
8 changes: 4 additions & 4 deletions src/serialization/pl_cdr_adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,15 @@ where
/// Decode type based on [`PlCdrDeserialize`] implementation.
pub struct PlCdrDeserializer<D>(PhantomData<D>);

impl<D> no_key::Decode<D> for PlCdrDeserializer<D>
impl<'de, D> no_key::Decode<'de, D> for PlCdrDeserializer<D>
where
D: PlCdrDeserialize,
{
type Error = PlCdrDeserializeError;

fn decode_bytes(
self,
input_bytes: &[u8],
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<D, Self::Error> {
match encoding {
Expand All @@ -189,14 +189,14 @@ where
}
}

impl<Dec, DecKey> with_key::Decode<Dec, DecKey> for PlCdrDeserializer<Dec>
impl<'de, Dec, DecKey> with_key::Decode<'de, Dec, DecKey> for PlCdrDeserializer<Dec>
where
Dec: PlCdrDeserialize,
DecKey: PlCdrDeserialize,
{
fn decode_key_bytes(
self,
input_bytes: &[u8],
input_bytes: &'de [u8],
encoding: RepresentationIdentifier,
) -> Result<DecKey, Self::Error> {
match encoding {
Expand Down
Loading