Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 150 additions & 49 deletions lore-storage/src/fragment_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,45 @@ use crate::types::FragmentReference;
use crate::types::Partition;
use crate::write::store_fragment;

/// Splits content into fragments using `FastCDC` and stores them via
/// [`store_fragment`].
///
/// For each chunk the function calls [`store_fragment`] directly with the
/// provided store, partition, and optional remote session. This replaces the
/// previous closure-based `store_fn` pattern.
/// Figure out where to cut `buffer` into chunks, all in one go.
async fn chunk_boundaries(
buffer: Bytes,
fixed_size_chunk: usize,
) -> Result<Vec<(usize, usize)>, StorageError> {
let size = buffer.len();
if fixed_size_chunk > 0 {
let step = fixed_size_chunk;
Ok((0..size)
.step_by(step)
.map(|offset| (offset, (offset + step).min(size)))
.collect())
} else {
let chunker = {
// SAFETY: buffer lives until after the chunker finishes, so the reference is safe.
let slice: &[u8] = unsafe { extend_lifetime(buffer.as_ref()) };
fastcdc::v2020::FastCDC::with_level(
slice,
FRAGMENT_SIZE_MINIMUM as u32,
FRAGMENT_SIZE_EXPECTED as u32,
FRAGMENT_SIZE_THRESHOLD as u32,
fastcdc::v2020::Normalization::Level1,
)
};
let (tx, rx) = tokio::sync::oneshot::channel();
lore_base::runtime::compute_pool().spawn(move || {
let _ = tx.send(chunker.collect::<Vec<_>>());
});
let chunks = rx
.await
.map_err(|e| StorageError::internal_with_context(e, "chunker task failed"))?;
Ok(chunks
.into_iter()
.map(|c| (c.offset, c.offset + c.length))
.collect())
}
}

/// Cuts `buffer` into chunks and stores each one via [`store_fragment`].
#[allow(clippy::too_many_arguments)]
pub async fn write_fragmented(
store: Arc<dyn ImmutableStore>,
Expand All @@ -41,48 +74,19 @@ pub async fn write_fragmented(
remote_session: Option<Arc<StorageSession>>,
tracker: Option<Arc<crate::write_tracker::WriteTracker>>,
) -> Result<(Address, Fragment), StorageError> {
// Raw data, use content defined chunking with FastCDC
// Cut the file into chunks so we can store each piece.
let size = buffer.len();
let mut chunk_offset = 0usize;
let mut tasks = JoinSet::<Result<(usize, usize, Address), StorageError>>::new();
let mut chunk_index = 0usize;

let chunker = Arc::new({
// SAFETY: we await on all spawned tasks using this chunker before buffer
// is dropped, so the lifetime will not escape the scope of the buffer.
let buffer: &[u8] = unsafe { extend_lifetime(buffer.as_ref()) };
fastcdc::v2020::FastCDC::with_level(
buffer,
FRAGMENT_SIZE_MINIMUM as u32,
FRAGMENT_SIZE_EXPECTED as u32,
FRAGMENT_SIZE_THRESHOLD as u32,
fastcdc::v2020::Normalization::Level1,
)
});

lore_base::lore_trace!(
"Write and fragment buffer to immutable store: {size} bytes representing {size} bytes (flags {flags:?})",
);
while chunk_offset < size {
let chunk_remain = size - chunk_offset;
let chunk_end = if flags.fixed_size_chunk > 0 {
chunk_offset + std::cmp::min(flags.fixed_size_chunk, chunk_remain)
} else {
// FastCDC chunking is CPU-bound (rolling hash over the buffer).
// Run it on the shared compute pool alongside compression so it
// doesn't contend with blocking IO on tokio's blocking pool.
let chunker = chunker.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
lore_base::runtime::compute_pool().spawn(move || {
let _ = tx.send(chunker.cut(chunk_offset, chunk_remain));
});
let (_, chunk_end) = rx
.await
.map_err(|e| StorageError::internal_with_context(e, "chunker task failed"))?;
chunk_end
};

let chunk_boundaries = chunk_boundaries(buffer.clone(), flags.fixed_size_chunk).await?;

for (chunk_index, (chunk_offset, chunk_end)) in chunk_boundaries.into_iter().enumerate() {
let chunk_size = chunk_end - chunk_offset;
let chunk_buffer = buffer.slice(chunk_offset..(chunk_offset + chunk_size));
let chunk_buffer = buffer.slice(chunk_offset..chunk_end);

let fragment = Fragment {
flags: flags.into(),
Expand All @@ -91,7 +95,7 @@ pub async fn write_fragmented(
};

if chunk_offset == 0 && chunk_size == size {
// Everything was put in a single fragment
// Whole file fits in one chunk.
let chunk_buffer = if flags.clone_buffer {
Bytes::copy_from_slice(chunk_buffer.as_ref())
} else {
Expand Down Expand Up @@ -146,12 +150,7 @@ pub async fn write_fragmented(
};
Ok((chunk_index, chunk_offset, chunk_address))
});

chunk_offset += chunk_size;
chunk_index += 1;
}
drop(chunker);
drop(buffer);

let mut list_buffer =
BytesMut::with_capacity(tasks.len() * std::mem::size_of::<FragmentReference>());
Expand Down Expand Up @@ -391,11 +390,113 @@ pub fn write_fragmentlist(
))
}

/// Unsafe extension of lifetime. Caller must guarantee the reference outlives
/// all uses. Used for `FastCDC`'s borrow of the buffer slice.
/// Pretends a short-lived reference lives forever. Caller must make sure it really does.
pub(crate) unsafe fn extend_lifetime<T>(data: &T) -> &'static T
where
T: ?Sized,
{
unsafe { &*(data as *const T) }
}

#[cfg(test)]
mod tests {
use bytes::Bytes;

use super::*;

/// Chunk a buffer using the standard `FastCDC` iterator, used to check our new way gives the same answer.
fn reference_fastcdc_boundaries(buffer: &[u8]) -> Vec<(usize, usize)> {
let chunker = fastcdc::v2020::FastCDC::with_level(
buffer,
FRAGMENT_SIZE_MINIMUM as u32,
FRAGMENT_SIZE_EXPECTED as u32,
FRAGMENT_SIZE_THRESHOLD as u32,
fastcdc::v2020::Normalization::Level1,
);
chunker.map(|c| (c.offset, c.offset + c.length)).collect()
}

/// Make a buffer of random bytes so chunking has to actually cut it.
fn mixed_pattern_buffer(size: usize) -> Bytes {
use rand::Rng;
let mut data = vec![0u8; size];
rand::rng().fill(&mut data[..]);
Bytes::from(data)
}

#[tokio::test]
async fn fastcdc_batch_matches_reference() {
let buffer = mixed_pattern_buffer(256 * 1024);
let expected = reference_fastcdc_boundaries(&buffer);
assert!(
expected.len() > 1,
"test buffer should produce multiple chunks, got {}",
expected.len()
);

let actual = chunk_boundaries(buffer, 0)
.await
.expect("chunking succeeds");
assert_eq!(actual, expected);
}

#[tokio::test]
async fn fastcdc_batch_handles_buffer_smaller_than_min_chunk() {
// Tiny buffer — should be one chunk covering the whole thing.
let buffer = mixed_pattern_buffer(1024);
let actual = chunk_boundaries(buffer.clone(), 0)
.await
.expect("chunking succeeds");
assert_eq!(actual, vec![(0, buffer.len())]);
}

#[tokio::test]
async fn fastcdc_batch_handles_empty_buffer() {
let buffer = Bytes::new();
let actual = chunk_boundaries(buffer, 0)
.await
.expect("chunking succeeds");
assert!(actual.is_empty());
}

#[tokio::test]
async fn fastcdc_batch_handles_pathological_all_zero_buffer() {
// All-zero input — the rolling hash never matches, but the split should still be clean.
let buffer = Bytes::from(vec![0u8; 512 * 1024]);
let actual = chunk_boundaries(buffer.clone(), 0)
.await
.expect("chunking succeeds");
let reconstructed_size: usize = actual.iter().map(|(s, e)| e - s).sum();
assert_eq!(reconstructed_size, buffer.len());
assert!(actual.iter().all(|&(s, e)| s < e));
assert_eq!(actual.first().copied(), Some((0, actual[0].1)));
assert_eq!(actual.last().unwrap().1, buffer.len());
}

#[tokio::test]
async fn fixed_size_chunking_covers_whole_buffer() {
let buffer = mixed_pattern_buffer(10 * 1024 + 17);
let step = 1024;
let actual = chunk_boundaries(buffer.clone(), step)
.await
.expect("chunking succeeds");

let expected: Vec<(usize, usize)> = (0..buffer.len())
.step_by(step)
.map(|o| (o, (o + step).min(buffer.len())))
.collect();
assert_eq!(actual, expected);

let reconstructed_size: usize = actual.iter().map(|(s, e)| e - s).sum();
assert_eq!(reconstructed_size, buffer.len());
}

#[tokio::test]
async fn fixed_size_chunking_handles_buffer_smaller_than_step() {
let buffer = Bytes::from(vec![1u8; 100]);
let actual = chunk_boundaries(buffer.clone(), 1024)
.await
.expect("chunking succeeds");
assert_eq!(actual, vec![(0, buffer.len())]);
}
}