From c4f42a84b1e2eaa591f10439160d1d67cdaac5f7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 28 May 2026 15:30:06 -0400 Subject: [PATCH 1/2] Move ToStreamBuilder's container builder to method generic (#792) * Move ToStreamBuilder's container builder to method generic Generic-over-Item trait with method-level CB enables method-call syntax `(0..3).to_stream_with_builder::<_, CB>(scope)` instead of the UFCS form `ToStreamBuilder::::to_stream_with_builder(...)`. Co-Authored-By: Claude Opus 4.7 (1M context) * Use associated type for ToStreamBuilder item Per review feedback: each implementor of `ToStreamBuilder` has exactly one item type (`I::Item` for the blanket impl), so an associated type fits better than a trait-level generic. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 4 ++++ .../src/dataflow/operators/core/to_stream.rs | 23 +++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d11c74a6..8887ceccf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Breaking changes + +- `ToStreamBuilder` exposes the item type via the `Item` associated type instead of a trait-level generic, and the container builder moves to a method-level generic. This enables method-call syntax: `(0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)` instead of the UFCS form `ToStreamBuilder::>::to_stream_with_builder(0..3, scope)`. + ## [0.29.0](https://github.com/TimelyDataflow/timely-dataflow/compare/timely-v0.28.1...timely-v0.29.0) - 2026-04-13 The theme in this release is simplifying specialization by removing monomorphization sprawl. diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index ef7b7648a..f2ca61eb8 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -7,7 +7,10 @@ use crate::dataflow::operators::generic::operator::source; use crate::dataflow::{Stream, Scope}; /// Converts to a timely [Stream], using a container builder. -pub trait ToStreamBuilder { +pub trait ToStreamBuilder { + /// The item type produced by this iterator-like source. + type Item; + /// Converts to a timely [Stream], using the supplied container builder type. /// /// # Examples @@ -18,10 +21,10 @@ pub trait ToStreamBuilder { /// use timely::container::CapacityContainerBuilder; /// /// let (data1, data2) = timely::example(|scope| { - /// let data1 = ToStreamBuilder::>::to_stream_with_builder(0..3, scope) + /// let data1 = (0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope) /// .container::>() /// .capture(); - /// let data2 = ToStreamBuilder::>::to_stream_with_builder(vec![0,1,2], scope) + /// let data2 = vec![0,1,2].to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope) /// .container::>() /// .capture(); /// (data1, data2) @@ -29,12 +32,18 @@ pub trait ToStreamBuilder { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>; + fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> + where + CB: PushInto; } -impl ToStreamBuilder for I where CB: PushInto { - fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> { +impl ToStreamBuilder for I { + type Item = I::Item; + fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> + where + CB: PushInto + { source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| { // Acquire an activator, so that the operator can rescheduled itself. @@ -84,6 +93,6 @@ pub trait ToStream { impl ToStream for I where C: PushInto { fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> { - ToStreamBuilder::>::to_stream_with_builder(self, scope) + ToStreamBuilder::to_stream_with_builder::<_, CapacityContainerBuilder>(self, scope) } } From bcc74cb8e347da09d67e9e34cd2e982eff793f99 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 28 May 2026 16:44:47 -0400 Subject: [PATCH 2/2] Correct soundness hole, add Sync bound (#800) --- CHANGELOG.md | 4 ++++ bytes/src/lib.rs | 15 +++++++++++++-- communication/examples/lgalloc.rs | 6 +++++- .../src/allocator/zero_copy/bytes_slab.rs | 4 ++-- communication/src/initialize.rs | 2 +- .../src/dataflow/operators/core/capture/event.rs | 2 +- 6 files changed, 26 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8887ceccf..699bb6463 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `ToStreamBuilder` exposes the item type via the `Item` associated type instead of a trait-level generic, and the container builder moves to a method-level generic. This enables method-call syntax: `(0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)` instead of the UFCS form `ToStreamBuilder::>::to_stream_with_builder(0..3, scope)`. +### `Bytes` is now `Sync`, and byte buffers must be `Send` + +`timely_bytes::arc::Bytes` now implements `Sync` in addition to `Send`. To make both impls sound, `BytesMut::from` now requires its payload to be `Send`. This is a breaking change to `timely_communication`: `BytesRefill::logic` now produces `Box + Send>` rather than `Box>`. Custom refills whose buffer type wraps a raw pointer (e.g. `NonNull`) must assert `unsafe impl Send` on that type. + ## [0.29.0](https://github.com/TimelyDataflow/timely-dataflow/compare/timely-v0.28.1...timely-v0.29.0) - 2026-04-13 The theme in this release is simplifying specialization by removing monomorphization sprawl. diff --git a/bytes/src/lib.rs b/bytes/src/lib.rs index 2d371b4d3..4c41b52e4 100644 --- a/bytes/src/lib.rs +++ b/bytes/src/lib.rs @@ -60,7 +60,7 @@ pub mod arc { impl BytesMut { /// Create a new instance from a byte allocation. - pub fn from(bytes: B) -> BytesMut where B : DerefMut+'static { + pub fn from(bytes: B) -> BytesMut where B : DerefMut+Send+'static { // Sequester allocation behind an `Arc`, which *should* keep the address // stable for the lifetime of `sequestered`. The `Arc` also serves as our @@ -187,9 +187,20 @@ pub mod arc { } // Synchronization happens through `self.sequestered`, which means to ensure that even - // across multiple threads the referenced range of bytes remain valid. + // across multiple threads the referenced range of bytes remains valid. unsafe impl Send for Bytes { } + // `Sync` holds because everything reachable through `&Bytes` is read-only or atomic: + // `Deref` yields `&[u8]` (and `u8: Sync`), the mutating methods take `&mut self`, and + // cloning only touches the atomic `Arc` refcount. There is no interior mutability and + // no path to a `&mut` from a shared reference. + // + // Note this requires only that the sequestered payload `B` be `Send` (enforced by + // `BytesMut::from`), not `Sync`: `B` is never exposed by reference, so it is never + // shared across threads. The only cross-thread use of `B` is its destructor, which may + // run on whichever thread drops the last `Arc` clone -- and that needs `Send`, not `Sync`. + unsafe impl Sync for Bytes { } + impl Bytes { /// Extracts [0, index) into a new `Bytes` which is returned, updating `self`. diff --git a/communication/examples/lgalloc.rs b/communication/examples/lgalloc.rs index 3f805d95e..f542ba668 100644 --- a/communication/examples/lgalloc.rs +++ b/communication/examples/lgalloc.rs @@ -34,6 +34,10 @@ mod example { Box::new(LgallocHandle { handle, pointer, capacity }) } + // `LgallocHandle` wraps a `NonNull`, which is `!Send`; lgalloc allocations are safe to + // move and deallocate across threads, so we assert `Send` to permit sharing as `Bytes`. + unsafe impl Send for LgallocHandle { } + struct LgallocHandle { handle: Option, pointer: NonNull, @@ -67,7 +71,7 @@ mod example { lgalloc::lgalloc_set_config(&lgconfig); let refill = BytesRefill { - logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box>), + logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box+Send>), limit: None, }; diff --git a/communication/src/allocator/zero_copy/bytes_slab.rs b/communication/src/allocator/zero_copy/bytes_slab.rs index e7eaac4b9..2eec454e2 100644 --- a/communication/src/allocator/zero_copy/bytes_slab.rs +++ b/communication/src/allocator/zero_copy/bytes_slab.rs @@ -21,7 +21,7 @@ pub struct BytesSlab { #[derive(Clone)] pub struct BytesRefill { /// Logic to acquire a new buffer of a certain number of bytes. - pub logic: std::sync::Arc Box>+Send+Sync>, + pub logic: std::sync::Arc Box+Send>+Send+Sync>, /// An optional limit on the number of empty buffers retained. pub limit: Option, } @@ -111,7 +111,7 @@ impl BytesSlab { /// A wrapper for `Box>` that dereferences to `T` rather than `dyn DerefMut`. struct BoxDerefMut { - boxed: Box+'static>, + boxed: Box+Send+'static>, } impl Deref for BoxDerefMut { diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index dcd856917..e99f51676 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -77,7 +77,7 @@ impl Default for Hooks { Self { log_fn: Arc::new(|_| None), refill: BytesRefill { - logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box>), + logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box+Send>), limit: None, }, spill: None, diff --git a/timely/src/dataflow/operators/core/capture/event.rs b/timely/src/dataflow/operators/core/capture/event.rs index 3bd68b9c1..d43a1e51c 100644 --- a/timely/src/dataflow/operators/core/capture/event.rs +++ b/timely/src/dataflow/operators/core/capture/event.rs @@ -264,7 +264,7 @@ pub mod binary { pub fn new(r: R) -> Self { let refill = BytesRefill { logic: Arc::new(|size| { - Box::new(vec![0_u8; size]) as Box> + Box::new(vec![0_u8; size]) as Box + Send> }), limit: None, };