From 80d041bfdf1ee732b95ecf28287f4380aa05b215 Mon Sep 17 00:00:00 2001 From: Oleksii Habrusiev Date: Tue, 23 Jun 2026 10:20:31 -0700 Subject: [PATCH] lore-storage: Batch FastCDC chunking to cut per-chunk overhead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit write_fragmented called FastCDC::cut once per chunk, shipping the work to the compute pool, awaiting the result, and repeating. For a 1 GiB buffer with ~4 KiB chunks that is ~256k spawn dispatches, oneshot allocations and await yields — overhead that can rival the chunking work itself for small remaining chunks. Compute all FastCDC boundaries in a single compute_pool task by driving the FastCDC Iterator to completion. Fixed-size chunking has trivial per-step math so its boundaries are computed inline. The single-fragment fast path and the storage dispatch loop are unchanged. Verified by a new parity test that compares the batched boundaries against a reference FastCDC iteration on a 256 KiB random buffer, plus edge cases for empty, sub-min-size, all-zero, and non-aligned fixed-size inputs. Signed-off-by: Oleksii Habrusiev --- lore-storage/src/fragment_engine.rs | 199 +++++++++++++++++++++------- 1 file changed, 150 insertions(+), 49 deletions(-) diff --git a/lore-storage/src/fragment_engine.rs b/lore-storage/src/fragment_engine.rs index 4a88e26..3f779a0 100644 --- a/lore-storage/src/fragment_engine.rs +++ b/lore-storage/src/fragment_engine.rs @@ -24,12 +24,45 @@ use crate::types::FragmentReference; use crate::types::Partition; use crate::write::store_fragment; -/// Splits content into fragments using `FastCDC` and stores them via -/// [`store_fragment`]. -/// -/// For each chunk the function calls [`store_fragment`] directly with the -/// provided store, partition, and optional remote session. This replaces the -/// previous closure-based `store_fn` pattern. +/// Figure out where to cut `buffer` into chunks, all in one go. 
+///
+/// Returns half-open `(start, end)` byte ranges in ascending order. A positive
+/// `fixed_size_chunk` selects fixed-size steps computed inline; `0` selects
+/// `FastCDC`, run as one task on the compute pool.
+///
+/// Fails only if that task ends without sending its result.
+async fn chunk_boundaries(
+    buffer: Bytes,
+    fixed_size_chunk: usize,
+) -> Result<Vec<(usize, usize)>, StorageError> {
+    if fixed_size_chunk > 0 {
+        let size = buffer.len();
+        return Ok((0..size)
+            .step_by(fixed_size_chunk)
+            .map(|start| (start, size.min(start.saturating_add(fixed_size_chunk))))
+            .collect());
+    }
+
+    // The task owns `buffer`, so the slice handed to `FastCDC` stays valid even if
+    // this future is dropped while awaiting; no lifetime extension is needed.
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    lore_base::runtime::compute_pool().spawn(move || {
+        let boundaries: Vec<(usize, usize)> = fastcdc::v2020::FastCDC::with_level(
+            buffer.as_ref(),
+            FRAGMENT_SIZE_MINIMUM as u32,
+            FRAGMENT_SIZE_EXPECTED as u32,
+            FRAGMENT_SIZE_THRESHOLD as u32,
+            fastcdc::v2020::Normalization::Level1,
+        )
+        .map(|chunk| (chunk.offset, chunk.offset + chunk.length))
+        .collect();
+        let _ = tx.send(boundaries);
+    });
+    rx.await
+        .map_err(|e| StorageError::internal_with_context(e, "chunker task failed"))
+}
+
+/// Cuts `buffer` into chunks and stores each one via [`store_fragment`].
 #[allow(clippy::too_many_arguments)]
 pub async fn write_fragmented(
     store: Arc,
@@ -41,48 +74,19 @@ pub async fn write_fragmented(
     remote_session: Option>,
     tracker: Option>,
 ) -> Result<(Address, Fragment), StorageError> {
-    // Raw data, use content defined chunking with FastCDC
+    // Cut the file into chunks so we can store each piece.
     let size = buffer.len();
-    let mut chunk_offset = 0usize;
     let mut tasks = JoinSet::>::new();
-    let mut chunk_index = 0usize;
-
-    let chunker = Arc::new({
-        // SAFETY: we await on all spawned tasks using this chunker before buffer
-        // is dropped, so the lifetime will not escape the scope of the buffer.
- let buffer: &[u8] = unsafe { extend_lifetime(buffer.as_ref()) }; - fastcdc::v2020::FastCDC::with_level( - buffer, - FRAGMENT_SIZE_MINIMUM as u32, - FRAGMENT_SIZE_EXPECTED as u32, - FRAGMENT_SIZE_THRESHOLD as u32, - fastcdc::v2020::Normalization::Level1, - ) - }); lore_base::lore_trace!( "Write and fragment buffer to immutable store: {size} bytes representing {size} bytes (flags {flags:?})", ); - while chunk_offset < size { - let chunk_remain = size - chunk_offset; - let chunk_end = if flags.fixed_size_chunk > 0 { - chunk_offset + std::cmp::min(flags.fixed_size_chunk, chunk_remain) - } else { - // FastCDC chunking is CPU-bound (rolling hash over the buffer). - // Run it on the shared compute pool alongside compression so it - // doesn't contend with blocking IO on tokio's blocking pool. - let chunker = chunker.clone(); - let (tx, rx) = tokio::sync::oneshot::channel(); - lore_base::runtime::compute_pool().spawn(move || { - let _ = tx.send(chunker.cut(chunk_offset, chunk_remain)); - }); - let (_, chunk_end) = rx - .await - .map_err(|e| StorageError::internal_with_context(e, "chunker task failed"))?; - chunk_end - }; + + let chunk_boundaries = chunk_boundaries(buffer.clone(), flags.fixed_size_chunk).await?; + + for (chunk_index, (chunk_offset, chunk_end)) in chunk_boundaries.into_iter().enumerate() { let chunk_size = chunk_end - chunk_offset; - let chunk_buffer = buffer.slice(chunk_offset..(chunk_offset + chunk_size)); + let chunk_buffer = buffer.slice(chunk_offset..chunk_end); let fragment = Fragment { flags: flags.into(), @@ -91,7 +95,7 @@ pub async fn write_fragmented( }; if chunk_offset == 0 && chunk_size == size { - // Everything was put in a single fragment + // Whole file fits in one chunk. 
let chunk_buffer = if flags.clone_buffer { Bytes::copy_from_slice(chunk_buffer.as_ref()) } else { @@ -146,12 +150,7 @@ pub async fn write_fragmented( }; Ok((chunk_index, chunk_offset, chunk_address)) }); - - chunk_offset += chunk_size; - chunk_index += 1; } - drop(chunker); - drop(buffer); let mut list_buffer = BytesMut::with_capacity(tasks.len() * std::mem::size_of::()); @@ -391,11 +390,113 @@ pub fn write_fragmentlist( )) } -/// Unsafe extension of lifetime. Caller must guarantee the reference outlives -/// all uses. Used for `FastCDC`'s borrow of the buffer slice. +/// Pretends a short-lived reference lives forever. Caller must make sure it really does. pub(crate) unsafe fn extend_lifetime(data: &T) -> &'static T where T: ?Sized, { unsafe { &*(data as *const T) } } + +#[cfg(test)] +mod tests { + use bytes::Bytes; + + use super::*; + + /// Chunk a buffer using the standard `FastCDC` iterator, used to check our new way gives the same answer. + fn reference_fastcdc_boundaries(buffer: &[u8]) -> Vec<(usize, usize)> { + let chunker = fastcdc::v2020::FastCDC::with_level( + buffer, + FRAGMENT_SIZE_MINIMUM as u32, + FRAGMENT_SIZE_EXPECTED as u32, + FRAGMENT_SIZE_THRESHOLD as u32, + fastcdc::v2020::Normalization::Level1, + ); + chunker.map(|c| (c.offset, c.offset + c.length)).collect() + } + + /// Make a buffer of random bytes so chunking has to actually cut it. 
+    fn mixed_pattern_buffer(size: usize) -> Bytes {
+        // Fixed-seed xorshift64: every run sees the same bytes, so failures reproduce.
+        let mut state: u64 = 0x9E37_79B9_7F4A_7C15;
+        let mut data = vec![0u8; size];
+        for byte in &mut data {
+            state ^= state << 13;
+            state ^= state >> 7;
+            state ^= state << 17;
+            *byte = (state >> 56) as u8;
+        }
+        Bytes::from(data)
+    }
+
+    #[tokio::test]
+    async fn fastcdc_batch_matches_reference() {
+        let buffer = mixed_pattern_buffer(256 * 1024);
+        let expected = reference_fastcdc_boundaries(&buffer);
+        assert!(
+            expected.len() > 1,
+            "test buffer should produce multiple chunks, got {}",
+            expected.len()
+        );
+
+        let actual = chunk_boundaries(buffer, 0)
+            .await
+            .expect("chunking succeeds");
+        assert_eq!(actual, expected);
+    }
+
+    #[tokio::test]
+    async fn fastcdc_batch_handles_buffer_smaller_than_min_chunk() {
+        // Tiny buffer — should be one chunk covering the whole thing.
+        let buffer = mixed_pattern_buffer(1024);
+        let actual = chunk_boundaries(buffer.clone(), 0)
+            .await
+            .expect("chunking succeeds");
+        assert_eq!(actual, vec![(0, buffer.len())]);
+    }
+
+    #[tokio::test]
+    async fn fastcdc_batch_handles_empty_buffer() {
+        let actual = chunk_boundaries(Bytes::new(), 0)
+            .await
+            .expect("chunking succeeds");
+        assert!(actual.is_empty());
+    }
+
+    #[tokio::test]
+    async fn fastcdc_batch_handles_pathological_all_zero_buffer() {
+        let buffer = Bytes::from(vec![0u8; 512 * 1024]);
+        let actual = chunk_boundaries(buffer.clone(), 0)
+            .await
+            .expect("chunking succeeds");
+        assert!(actual.iter().all(|&(s, e)| s < e));
+        assert!(actual.windows(2).all(|w| w[0].1 == w[1].0));
+        assert_eq!(actual.first().map(|c| c.0), Some(0));
+        assert_eq!(actual.last().map(|c| c.1), Some(buffer.len()));
+        assert_eq!(actual, reference_fastcdc_boundaries(&buffer));
+    }
+
+    #[tokio::test]
+    async fn fixed_size_chunking_covers_whole_buffer() {
+        let step = 1024;
+        let buffer = mixed_pattern_buffer(10 * step + 17);
+        let actual = chunk_boundaries(buffer.clone(), step)
+            .await
+            .expect("chunking succeeds");
+        // Ten full steps followed by the 17-byte remainder, with no gaps.
+        assert_eq!(actual.len(), 11);
+        assert!(actual[..10].iter().all(|&(s, e)| e - s == step));
+        assert!(actual.windows(2).all(|w| w[0].1 == w[1].0));
+        assert_eq!(actual.last().copied(), Some((10 * step, 10 * step + 17)));
+    }
+
+    #[tokio::test]
+    async fn fixed_size_chunking_handles_buffer_smaller_than_step() {
+        let buffer = Bytes::from(vec![1u8; 100]);
+        let actual = chunk_boundaries(buffer.clone(), 1024)
+            .await
+            .expect("chunking succeeds");
+        assert_eq!(actual, vec![(0, buffer.len())]);
+    }
+}