diff --git a/ext4/src/tar_convert.rs b/ext4/src/tar_convert.rs index ee38d68..10e298f 100644 --- a/ext4/src/tar_convert.rs +++ b/ext4/src/tar_convert.rs @@ -14,6 +14,8 @@ use crate::writer::{File, Writer, WriterOption}; const WHITEOUT_PREFIX: &str = ".wh."; const OPAQUE_WHITEOUT: &str = ".wh..wh..opq"; +/// overlayfs marks an opaque directory with this xattr (value `"y"`). +const OVERLAY_OPAQUE_XATTR: &str = "trusted.overlay.opaque"; /// Options for tar-to-ext4 conversion. #[derive(Default)] @@ -88,6 +90,123 @@ where fs.close() } +/// Convert a single OCI layer into an ext4 image that is a valid **overlayfs +/// lower layer** — i.e. layers *survive* instead of being flattened. +/// +/// Unlike [`convert_oci_layers_to_ext4`], this does not merge or drop deletion +/// markers. Instead it translates OCI whiteouts to the on-disk form overlayfs +/// understands, so the resulting images can be stacked at runtime: +/// - `.wh.<name>` → a character device `0,0` at `<dir>/<name>` +/// - `.wh..wh..opq` → `trusted.overlay.opaque=y` xattr on `<dir>` +/// +/// The output is deterministic for a fixed UUID, so the same layer (addressed +/// by its digest) always yields byte-identical ext4 — which is what makes +/// shared layers dedup across images. +/// +/// `layer` must be a seekable decompressed tar stream (two passes: one to find +/// opaque directories, one to write). +pub fn convert_layer_to_ext4( + mut layer: R, + output: W, + options: &ConvertOptions, +) -> io::Result +where + R: Read + Seek, + W: Read + Write + Seek, +{ + // Pass 1: find directories marked opaque so we can stamp the xattr when we + // write the directory entry (the writer is forward-only — xattrs must be set + // at create() time). 
+ let opaque_dirs = scan_opaque_dirs(&mut layer, options)?; + layer.seek(SeekFrom::Start(0))?; + + let mut opaque_xattr = BTreeMap::new(); + opaque_xattr.insert(OVERLAY_OPAQUE_XATTR.to_string(), b"y".to_vec()); + let empty_xattrs: BTreeMap> = BTreeMap::new(); + + let mut fs = Writer::new(output, &options.writer_options); + let mut seen_dirs: HashSet = HashSet::new(); + + let mut archive = tar::Archive::new(&mut layer); + for entry_result in archive.entries()? { + let mut entry = entry_result?; + let (name, link_name) = extract_names(entry.header(), options)?; + let normalized = name.trim_end_matches('/').to_string(); + + if let Some((dir, file)) = split_dir_file(&name) { + // Opaque marker: handled via the directory's xattr, not as a file. + if file == OPAQUE_WHITEOUT { + continue; + } + // Plain whiteout: emit an overlayfs char-device deletion marker. + if let Some(stripped) = file.strip_prefix(WHITEOUT_PREFIX) { + let target = if dir.is_empty() { + stripped.to_string() + } else { + format!("{dir}{stripped}") + }; + fs.make_parents(&target)?; + let whiteout = File { + mode: format::S_IFCHR, // rdev 0,0 (devmajor/devminor default to 0) + ..Default::default() + }; + fs.create(&target, &whiteout)?; + continue; + } + } + + fs.make_parents(&name)?; + let is_dir = entry.header().entry_type() == tar::EntryType::Directory; + let extra = if is_dir && opaque_dirs.contains(&normalized) { + &opaque_xattr + } else { + &empty_xattrs + }; + write_tar_entry_inner(&mut fs, &mut entry, &name, &link_name, extra)?; + if is_dir { + seen_dirs.insert(normalized); + } + } + + // Opaque directories that had no explicit entry in the tar: synthesize them + // so the opaque xattr is recorded. `create()` treats caller xattrs as + // authoritative, so this also fixes up dirs auto-created by make_parents. 
+ for dir in &opaque_dirs { + if seen_dirs.contains(dir) { + continue; + } + fs.make_parents(dir)?; + let f = File { + mode: format::S_IFDIR | 0o755, + xattrs: opaque_xattr.clone(), + ..Default::default() + }; + fs.create(dir, &f)?; + } + + fs.close() +} + +/// Scan a single layer tar for directories carrying an opaque whiteout. +fn scan_opaque_dirs( + layer: &mut R, + options: &ConvertOptions, +) -> io::Result> { + let mut opaque = HashSet::new(); + layer.seek(SeekFrom::Start(0))?; + let mut archive = tar::Archive::new(layer); + for entry_result in archive.entries()? { + let entry = entry_result?; + let (name, _) = extract_names(entry.header(), options)?; + if let Some((dir, file)) = split_dir_file(&name) + && file == OPAQUE_WHITEOUT + { + opaque.insert(dir.trim_end_matches('/').to_string()); + } + } + Ok(opaque) +} + // --------------------------------------------------------------------------- // Shared helpers // --------------------------------------------------------------------------- @@ -131,6 +250,18 @@ fn write_tar_entry_with_pax( entry: &mut tar::Entry<'_, R>, name: &str, link_name: &str, +) -> io::Result<()> { + write_tar_entry_inner(fs, entry, name, link_name, &BTreeMap::new()) +} + +/// Write a tar entry, merging `extra_xattrs` on top of any PAX xattrs (used to +/// stamp `trusted.overlay.opaque` on opaque directories). 
+fn write_tar_entry_inner( + fs: &mut Writer, + entry: &mut tar::Entry<'_, R>, + name: &str, + link_name: &str, + extra_xattrs: &BTreeMap>, ) -> io::Result<()> { let header = entry.header().clone(); let entry_type = header.entry_type(); @@ -176,6 +307,9 @@ fn write_tar_entry_with_pax( } } } + for (k, v) in extra_xattrs { + xattrs.insert(k.clone(), v.clone()); + } let fs_mtime = mtime & 0x3ffffffff; @@ -633,6 +767,71 @@ mod tests { assert_eq!(read_file(&a, "/app/main.bin").unwrap(), b"\x00\x01\x02\x03binary-ish\xff"); } + /// Overlay-preserving single-layer conversion must keep deletion markers in + /// the on-disk form overlayfs understands: char-device `0,0` for `.wh.` + /// and `trusted.overlay.opaque=y` for `.wh..wh..opq`. + #[test] + fn test_layer_overlay_whiteout_and_opaque() { + let layer = build_tar_with_dirs(&[ + TarEntry::Dir("etc/"), + TarEntry::File("etc/keep", b"k"), + TarEntry::Whiteout("etc/.wh.removed"), + TarEntry::Dir("opq/"), + TarEntry::Whiteout("opq/.wh..wh..opq"), + TarEntry::File("opq/new", b"n"), + ]); + let opts = ConvertOptions { + convert_backslash: false, + writer_options: vec![WriterOption::Uuid([0x42u8; 16]), WriterOption::Journal(1024)], + }; + let image = convert_layer_to_ext4(Cursor::new(layer), Cursor::new(Vec::new()), &opts) + .unwrap() + .into_inner(); + + let mut reader = crate::reader::Reader::new(Cursor::new(&image)).unwrap(); + let entries = reader.walk().unwrap(); + let by = |p: &str| entries.iter().find(|e| e.path == p); + + assert!(by("etc/keep").is_some(), "normal file preserved"); + let wh = by("etc/removed").expect("whiteout present as a node"); + assert_eq!(wh.mode & format::TYPE_MASK, format::S_IFCHR, "whiteout is a char device"); + assert_eq!((wh.devmajor, wh.devminor), (0, 0), "whiteout rdev is 0,0"); + let opq = by("opq").expect("opaque dir present"); + assert_eq!( + opq.xattrs.get(OVERLAY_OPAQUE_XATTR).map(Vec::as_slice), + Some(b"y".as_slice()), + "opaque dir carries trusted.overlay.opaque=y", + ); + // The 
whiteout marker itself must not survive as a regular file. + assert!(by("etc/.wh.removed").is_none()); + assert!(by("opq/.wh..wh..opq").is_none()); + } + + /// Single-layer overlay conversion is byte-deterministic for a fixed UUID — + /// the property that lets a shared layer dedup across images. + #[test] + fn test_layer_overlay_is_byte_deterministic() { + let opts = ConvertOptions { + convert_backslash: false, + writer_options: vec![ + WriterOption::MaximumDiskSize(64 * 1024 * 1024), + WriterOption::Uuid([0x7u8; 16]), + WriterOption::Journal(1024), + ], + }; + let make = || { + let layer = build_tar_with_dirs(&[ + TarEntry::Dir("a/"), + TarEntry::File("a/f", b"contents"), + TarEntry::Whiteout("a/.wh.gone"), + ]); + convert_layer_to_ext4(Cursor::new(layer), Cursor::new(Vec::new()), &opts) + .unwrap() + .into_inner() + }; + assert_eq!(make(), make(), "same layer + same UUID must be byte-identical"); + } + /// A different UUID must change the bytes (proving the UUID genuinely flows /// into the image) while everything else stays fixed — so reproducibility /// depends solely on pinning the UUID, which `bless` now derives diff --git a/glidefs/src/bin/oci_dedup_measure.rs b/glidefs/src/bin/oci_dedup_measure.rs new file mode 100644 index 0000000..57e0c42 --- /dev/null +++ b/glidefs/src/bin/oci_dedup_measure.rs @@ -0,0 +1,415 @@ +#![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::cast_possible_truncation)] +//! Measure content-addressed dedup for OCI images: **merged** (one ext4 per image, +//! all layers flattened — what `bless --oci` does today) vs **layers-survive** +//! (each OCI layer stored as its own content-addressed ext4 artifact). +//! +//! Crucially this drives the *real* GlideFS code paths so the numbers reflect +//! production, not a re-implementation: +//! - `ext4::tar_convert::convert_oci_layers_to_ext4` (the actual merge + writer) +//! - the production deterministic-UUID derivation from `bless.rs` +//! 
- `block_map::{blake3_128, lz4_compress}` (the actual flush hashing/compression) +//! - `pack::content_pack_id` (the actual S3 pack key derivation) +//! +//! It reports dedup at TWO granularities: +//! - BLOCK level: union of unique 128 KiB block hashes (theoretical upper bound; +//! this is what `dedup_measure` computes). +//! - PACK level: union of unique (chunk_idx, pack_id) packs — this is what GlideFS +//! *actually* dedups against in S3, since the pack key is +//! `chunks/{chunk_idx:04}/{pack_id:016x}.pack`. +//! +//! Usage: +//! skopeo copy docker://debian:bookworm-slim dir:/tmp/oci/debian +//! skopeo copy docker://python:3.12-slim-bookworm dir:/tmp/oci/python +//! cargo run --release --bin oci_dedup_measure -- /tmp/oci/debian /tmp/oci/python ... + +use std::collections::{HashMap, HashSet}; +use std::fs::File; +use std::io::{Read, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; + +use ext4::tar_convert::{ConvertOptions, convert_oci_layers_to_ext4}; +use ext4::writer::WriterOption; +use glidefs::block::block_map::{Blake3Hash, blake3_128, lz4_compress}; +use glidefs::block::pack::{PackId, content_pack_id}; + +const BLOCK_SIZE: usize = 128 * 1024; // 131072 — production BLOCK_SIZE +const CHUNK_SIZE: usize = 128 * 1024 * 1024; // 1 ext4 block group == 1 GlideFS chunk +const BLOCKS_PER_CHUNK: usize = CHUNK_SIZE / BLOCK_SIZE; + +/// Exact copy of `bless.rs::deterministic_uuid` — content-addressed UUID so that +/// identical input (same manifest, or same layer) yields a byte-identical ext4. +fn deterministic_uuid(seed: &str) -> [u8; 16] { + let mut uuid = *blake3_128(seed.as_bytes()).as_bytes(); + uuid[6] = (uuid[6] & 0x0f) | 0x80; // version 8 (custom) + uuid[8] = (uuid[8] & 0x3f) | 0x80; // variant 1 (RFC 4122) + uuid +} + +/// Production device-size estimate from `bless.rs::run_bless_oci`. 
+fn device_size_for(total_compressed: u64) -> i64 { + let estimated = (total_compressed * 3).max(64 * 1024 * 1024); + estimated.next_power_of_two() as i64 +} + +/// Decompress a gzip or zstd OCI layer blob into a fresh seekable temp file. +fn decompress_layer(blob: &Path) -> std::io::Result { + let mut f = File::open(blob)?; + let mut magic = [0u8; 4]; + f.read_exact(&mut magic)?; + f.seek(SeekFrom::Start(0))?; + + let mut out = tempfile::tempfile()?; + if magic[0] == 0x1f && magic[1] == 0x8b { + let mut dec = flate2::read::GzDecoder::new(f); + std::io::copy(&mut dec, &mut out)?; + } else if magic == [0x28, 0xb5, 0x2f, 0xfd] { + let mut dec = zstd::Decoder::new(f)?; + std::io::copy(&mut dec, &mut out)?; + } else { + // already an uncompressed tar + std::io::copy(&mut f, &mut out)?; + } + out.seek(SeekFrom::Start(0))?; + Ok(out) +} + +struct ImageSpec { + name: String, + manifest_digest_seed: String, // content-addressed seed for the merged UUID + layers: Vec, // bottom-to-top +} + +#[derive(Clone)] +struct LayerRef { + digest: String, // "sha256:..." + blob: PathBuf, // path to the (compressed) blob file + compressed_size: u64, +} + +fn load_image(dir: &Path) -> ImageSpec { + let manifest_path = dir.join("manifest.json"); + let bytes = std::fs::read(&manifest_path) + .unwrap_or_else(|e| panic!("read {}: {e}", manifest_path.display())); + // Content-addressed seed for the merged image. The literal value is irrelevant + // to the comparison (production uses the sha256 manifest digest); all that + // matters is that it is stable per-image and differs across images — which any + // hash of the manifest guarantees. 
+ let seed = format!("blake3:{:032x}", u128::from_le_bytes(*blake3_128(&bytes).as_bytes())); + + let v: serde_json::Value = serde_json::from_slice(&bytes).expect("parse manifest.json"); + let mut layers = Vec::new(); + for l in v["layers"].as_array().expect("manifest.layers[]") { + let digest = l["digest"].as_str().expect("layer.digest").to_string(); + let hex = digest.strip_prefix("sha256:").unwrap_or(&digest); + let blob = dir.join(hex); + let compressed_size = std::fs::metadata(&blob).map(|m| m.len()).unwrap_or(0); + layers.push(LayerRef { digest, blob, compressed_size }); + } + + ImageSpec { + name: dir.file_name().unwrap_or_default().to_string_lossy().to_string(), + manifest_digest_seed: seed, + layers, + } +} + +/// Build a deterministic ext4 from `layer_blobs` (bottom-to-top), using the exact +/// production writer options. Returns the ext4 image bytes (as a temp File). +fn build_ext4(layer_blobs: &[PathBuf], total_compressed: u64, uuid_seed: &str) -> File { + let mut layers: Vec = + layer_blobs.iter().map(|p| decompress_layer(p).expect("decompress layer")).collect(); + + let opts = ConvertOptions { + convert_backslash: false, + writer_options: vec![ + WriterOption::MaximumDiskSize(device_size_for(total_compressed)), + WriterOption::Uuid(deterministic_uuid(uuid_seed)), + WriterOption::Journal(1024), // 4 MiB journal — same as bless + ], + }; + + let out = tempfile::tempfile().expect("tempfile"); + let mut fs = convert_oci_layers_to_ext4(&mut layers, out, &opts).expect("convert to ext4"); + fs.seek(SeekFrom::Start(0)).expect("seek ext4"); + fs +} + +fn is_zero(data: &[u8]) -> bool { + let (prefix, chunks, suffix) = unsafe { data.align_to::() }; + prefix.iter().all(|&b| b == 0) + && chunks.iter().all(|&w| w == 0) + && suffix.iter().all(|&b| b == 0) +} + +/// One ext4 image's content, decomposed exactly as GlideFS would store it. 
+struct ImageBlocks { + /// unique non-zero block hash -> compressed length (block-level view) + block_comp: HashMap, + /// (chunk_idx, pack_id) -> pack stored bytes (pack-level view = real S3 objects) + packs: HashMap<(u32, PackId), usize>, + nonzero_blocks: usize, + zero_blocks: usize, +} + +/// Read an ext4 image and decompose into block-level and pack-level dedup units, +/// using the production hashing (`blake3_128`), compression (`lz4_compress`) and +/// pack-id (`content_pack_id`). +fn decompose(mut img: File) -> ImageBlocks { + let mut block_comp: HashMap = HashMap::new(); + let mut packs: HashMap<(u32, PackId), usize> = HashMap::new(); + let mut nonzero_blocks = 0usize; + let mut zero_blocks = 0usize; + + img.seek(SeekFrom::Start(0)).unwrap(); + let mut buf = vec![0u8; BLOCK_SIZE]; + let mut block_idx: usize = 0; + // Accumulate the current chunk's non-zero blocks for pack-id computation. + let mut cur_chunk: u32 = 0; + let mut cur_blocks: Vec<(Blake3Hash, u32, bytes::Bytes)> = Vec::new(); + + let flush_pack = + |chunk: u32, blocks: &mut Vec<(Blake3Hash, u32, bytes::Bytes)>, packs: &mut HashMap<(u32, PackId), usize>| { + if blocks.is_empty() { + return; + } + // content_pack_id requires blocks sorted by chunk_offset (the flush path + // produces them in offset order); ours already are. 
+ let pid = content_pack_id(blocks); + let stored: usize = blocks.iter().map(|(_, _, c)| c.len()).sum(); + packs.entry((chunk, pid)).or_insert(stored); + blocks.clear(); + }; + + loop { + buf.fill(0); + let mut filled = 0; + while filled < BLOCK_SIZE { + match img.read(&mut buf[filled..]) { + Ok(0) => break, + Ok(n) => filled += n, + Err(e) => panic!("read ext4: {e}"), + } + } + if filled == 0 { + break; + } + + let chunk = (block_idx / BLOCKS_PER_CHUNK) as u32; + if chunk != cur_chunk { + flush_pack(cur_chunk, &mut cur_blocks, &mut packs); + cur_chunk = chunk; + } + + let data = &buf[..BLOCK_SIZE]; + if is_zero(data) { + zero_blocks += 1; // zero blocks are never uploaded by GlideFS + } else { + nonzero_blocks += 1; + let hash = blake3_128(data); + let comp = lz4_compress(data); + block_comp.entry(hash).or_insert(comp.len()); + let chunk_offset = (block_idx % BLOCKS_PER_CHUNK) as u32; + cur_blocks.push((hash, chunk_offset, bytes::Bytes::from(comp))); + } + block_idx += 1; + } + flush_pack(cur_chunk, &mut cur_blocks, &mut packs); + + ImageBlocks { block_comp, packs, nonzero_blocks, zero_blocks } +} + +fn human(bytes: usize) -> String { + let b = bytes as f64; + if bytes >= 1 << 30 { + format!("{:.2} GiB", b / (1u64 << 30) as f64) + } else if bytes >= 1 << 20 { + format!("{:.1} MiB", b / (1u64 << 20) as f64) + } else if bytes >= 1 << 10 { + format!("{:.1} KiB", b / (1u64 << 10) as f64) + } else { + format!("{bytes} B") + } +} + +fn main() { + let dirs: Vec = std::env::args().skip(1).map(PathBuf::from).collect(); + if dirs.len() < 2 { + eprintln!("usage: oci_dedup_measure [...]"); + std::process::exit(2); + } + + let images: Vec = dirs.iter().map(|d| load_image(d)).collect(); + + // --- Layer inventory & shared-layer detection --- + println!("Images & layers (bottom→top):"); + let mut layer_count: HashMap = HashMap::new(); + let mut layer_size: HashMap = HashMap::new(); + for img in &images { + println!(" {} ({} layers)", img.name, img.layers.len()); + for l in 
&img.layers { + *layer_count.entry(l.digest.clone()).or_insert(0) += 1; + layer_size.insert(l.digest.clone(), l.compressed_size); + println!(" {} {}", &l.digest[7..7 + 16], human(l.compressed_size as usize)); + } + } + println!(); + println!("Shared layers (present in >1 image):"); + let mut any_shared = false; + for (d, &c) in &layer_count { + if c > 1 { + any_shared = true; + println!( + " {} in {} images, {} compressed", + &d[7..7 + 16], + c, + human(layer_size[d] as usize) + ); + } + } + if !any_shared { + println!(" (none — comparison will be uninformative)"); + } + println!(); + + // ============================================================ + // STRATEGY A — MERGED: one ext4 per image, all layers flattened. + // ============================================================ + eprintln!("Building merged ext4 per image (real convert_oci_layers_to_ext4)..."); + let mut merged_blocks: HashMap = HashMap::new(); + let mut merged_packs: HashMap<(u32, PackId), usize> = HashMap::new(); + let mut merged_naive_block = 0usize; // sum of per-image unique (no cross-image dedup) + let mut merged_naive_pack = 0usize; + let mut per_image_rows: Vec<(String, usize, usize, usize)> = Vec::new(); + + for img in &images { + let total_compressed: u64 = img.layers.iter().map(|l| l.compressed_size).sum(); + let blobs: Vec = img.layers.iter().map(|l| l.blob.clone()).collect(); + eprint!(" {} ...", img.name); + let ext4 = build_ext4(&blobs, total_compressed, &img.manifest_digest_seed); + let d = decompose(ext4); + eprintln!(" {} non-zero blocks", d.nonzero_blocks); + + let img_block_bytes: usize = d.block_comp.values().sum(); + let img_pack_bytes: usize = d.packs.values().sum(); + merged_naive_block += img_block_bytes; + merged_naive_pack += img_pack_bytes; + per_image_rows.push((img.name.clone(), d.nonzero_blocks, img_block_bytes, img_pack_bytes)); + + for (h, c) in d.block_comp { + merged_blocks.entry(h).or_insert(c); + } + for (k, v) in d.packs { + 
merged_packs.entry(k).or_insert(v); + } + } + let merged_block_bytes: usize = merged_blocks.values().sum(); + let merged_pack_bytes: usize = merged_packs.values().sum(); + + // ============================================================ + // STRATEGY B — LAYERS-SURVIVE: each unique layer is its own + // content-addressed ext4. Shared layers stored once. + // ============================================================ + eprintln!("Building per-layer ext4 (each unique layer once)..."); + let mut layers_block_bytes_total = 0usize; + let mut layers_pack_bytes_total = 0usize; + // Also compute the *naive* (no cross-image dedup) layers cost: sum over images + // of each image's layer artifacts, counting shared layers once per image. + let mut naive_layers_block = 0usize; + let mut naive_layers_pack = 0usize; + + // Build each unique layer once. + let mut unique_layers: Vec<&LayerRef> = Vec::new(); + let mut seen = HashSet::new(); + for img in &images { + for l in &img.layers { + if seen.insert(l.digest.clone()) { + unique_layers.push(l); + } + } + } + let mut per_layer_cost: HashMap = HashMap::new(); // digest -> (block_bytes, pack_bytes) + for l in &unique_layers { + eprint!(" layer {} ...", &l.digest[7..7 + 16]); + let ext4 = build_ext4(&[l.blob.clone()], l.compressed_size, &l.digest); + let d = decompose(ext4); + let bb: usize = d.block_comp.values().sum(); + let pb: usize = d.packs.values().sum(); + eprintln!(" {} non-zero blocks", d.nonzero_blocks); + layers_block_bytes_total += bb; + layers_pack_bytes_total += pb; + per_layer_cost.insert(l.digest.clone(), (bb, pb)); + } + for img in &images { + for l in &img.layers { + let (bb, pb) = per_layer_cost[&l.digest]; + naive_layers_block += bb; + naive_layers_pack += pb; + } + } + + // ============================================================ + // RESULTS + // ============================================================ + println!("Per-image merged ext4 (non-zero blocks, then stored bytes):"); + for (name, nz, bb, 
pb) in &per_image_rows { + println!(" {:32} {:>7} blk block-lvl {:>10} pack-lvl {:>10}", name, nz, human(*bb), human(*pb)); + } + println!(); + + println!("============ DEDUP COMPARISON ============"); + println!(); + println!(" NAIVE (no cross-image dedup, store each image whole):"); + println!(" merged images: block-level {:>11} pack-level {:>11}", human(merged_naive_block), human(merged_naive_pack)); + println!(); + println!(" BLOCK-LEVEL dedup (theoretical upper bound — global 128KiB CAS):"); + println!(" A) merged: {:>11}", human(merged_block_bytes)); + println!(" B) layers-survive: {:>11}", human(layers_block_bytes_total)); + if merged_block_bytes > 0 { + let saved = merged_block_bytes as i64 - layers_block_bytes_total as i64; + println!( + " → layers saves {} ({:.1}% smaller)", + human(saved.max(0) as usize), + 100.0 * saved as f64 / merged_block_bytes as f64 + ); + } + println!(); + println!(" PACK-LEVEL dedup (what GlideFS ACTUALLY stores in S3):"); + println!(" key = chunks/{{chunk_idx}}/{{pack_id}}.pack"); + println!(" A) merged (same export prefix, best case): {:>11}", human(merged_pack_bytes)); + println!(" B) layers-survive (each layer once): {:>11}", human(layers_pack_bytes_total)); + if merged_pack_bytes > 0 { + let saved = merged_pack_bytes as i64 - layers_pack_bytes_total as i64; + println!( + " → layers saves {} ({:.1}% smaller)", + human(saved.max(0) as usize), + 100.0 * saved as f64 / merged_pack_bytes as f64 + ); + } + println!(); + println!(" (reference) naive layers-survive, shared layer per-image: block {} / pack {}", + human(naive_layers_block), human(naive_layers_pack)); + + // Cross-image block sharing actually realized by the merged ext4 layout. + println!(); + println!("Cross-image BLOCK sharing in the MERGED layout (does ext4 alignment preserve it?):"); + // Re-decompose per image to get per-image hash sets (cheap-ish; rebuild). 
+ let mut sets: Vec<(String, HashSet)> = Vec::new(); + for img in &images { + let total_compressed: u64 = img.layers.iter().map(|l| l.compressed_size).sum(); + let blobs: Vec = img.layers.iter().map(|l| l.blob.clone()).collect(); + let ext4 = build_ext4(&blobs, total_compressed, &img.manifest_digest_seed); + let d = decompose(ext4); + sets.push((img.name.clone(), d.block_comp.into_keys().collect())); + } + for i in 0..sets.len() { + for j in (i + 1)..sets.len() { + let shared = sets[i].1.intersection(&sets[j].1).count(); + let union = sets[i].1.union(&sets[j].1).count(); + let jac = if union > 0 { 100.0 * shared as f64 / union as f64 } else { 0.0 }; + println!( + " {} <-> {}: {} shared blocks ({:.1}% Jaccard) = {} if globally CAS'd", + sets[i].0, sets[j].0, shared, jac, human(shared * BLOCK_SIZE) + ); + } + } +} diff --git a/glidefs/src/cli/bless.rs b/glidefs/src/cli/bless.rs index 1f01420..66961f4 100644 --- a/glidefs/src/cli/bless.rs +++ b/glidefs/src/cli/bless.rs @@ -1,24 +1,24 @@ #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::cast_possible_truncation)] -use bytes::Bytes; -use crate::block::block_map::{blake3_128, lz4_compress, shared_zero_block, Blake3Hash}; use crate::block::cache::{BlockCache, FoyerBlockCache, FoyerCacheConfig}; use crate::block::content_store::ContentStore; use crate::block::handler::BlockHandler; use crate::block::manifest::serialize_hot_set; use crate::block::metrics::ExportMetrics; -use crate::block::pack::{content_pack_id, PackId, DEFAULT_FLUSH_THRESHOLD}; +use crate::block::pack::DEFAULT_FLUSH_THRESHOLD; use crate::block::pack_index_cache::PackIndexCache; use crate::block::volume_manifest::VolumeManifest; use crate::block::write_cache::{WriteCache, WriteCacheConfig}; use crate::config::Settings; +use crate::oci::ext4_store::{deterministic_uuid, store_ext4_stream, BLOCK_SIZE}; use crate::oci::ingest::IngestOptions; -use crate::oci::pull::pull_image; +use crate::oci::layer_store::{ + ensure_layer_stored, 
put_image_descriptor, ImageDescriptor, +}; +use crate::oci::pull::{pull_image, pull_layer_to_tempfile}; use crate::parse_object_store::parse_url_opts; use anyhow::{Context, Result}; use ext4::writer::WriterOption; use oci_registry::{Credentials, RegistryClient}; -use std::collections::HashMap; -use std::io::Read; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicU64; @@ -26,9 +26,6 @@ use std::time::Instant; use tokio::sync::Notify; use tracing::info; -/// Fixed block size for the chunked architecture: 128KB. -const BLOCK_SIZE: u32 = 131_072; - pub async fn run_bless( image_path: PathBuf, name: String, @@ -66,92 +63,16 @@ pub async fn run_bless( info!(image = %image_path.display(), name = %name, "starting bless"); // --- Read image --- - let mut file = std::fs::File::open(&image_path) + let file = std::fs::File::open(&image_path) .with_context(|| format!("Failed to open image {}", image_path.display()))?; let device_size = file.metadata()?.len(); - - let volume_manifest_template = VolumeManifest::new(device_size, BLOCK_SIZE); - let blocks_per_chunk = volume_manifest_template.blocks_per_chunk(); let total_blocks = device_size.div_ceil(u64::from(BLOCK_SIZE)) as usize; - info!(device_size, total_blocks, blocks_per_chunk, "reading image"); + info!(device_size, total_blocks, "reading image"); // --- Stream image: read blocks, upload each chunk as it completes --- - let (_, zero_hash) = shared_zero_block(BLOCK_SIZE as usize); - let mut buf = vec![0u8; BLOCK_SIZE as usize]; - - let mut volume_manifest = VolumeManifest::new(device_size, BLOCK_SIZE); - let mut stats = BlessStats::default(); - let mut hot_set_indices: Vec = Vec::new(); - - // Current chunk accumulator — flushed when we move to the next chunk. - let mut pending_chunk: Option<(u32, Vec)> = None; - // In-flight S3 upload — overlaps with reading the next chunk. 
- let mut in_flight: Option>> = None; - - for block_index in 0..total_blocks { - let bytes_read = read_full(&mut file, &mut buf)?; - if bytes_read < BLOCK_SIZE as usize { - buf[bytes_read..].fill(0); - } - - let hash = blake3_128(&buf); - - // Skip zero blocks entirely - if hash == zero_hash { - stats.zero_blocks += 1; - continue; - } - - // Record non-zero block index for hot set (prefetch at boot) - hot_set_indices.push(block_index as u64); - - let chunk_idx = volume_manifest_template.chunk_idx_for_block(block_index as u64); - let block_offset = volume_manifest_template.block_offset_in_chunk(block_index as u64); - - stats.unique_blocks += 1; - - let compressed = Bytes::from(lz4_compress(&buf)); - - // If we've moved to a new chunk, prepare and upload the previous one. - if pending_chunk.as_ref().is_some_and(|(idx, _)| *idx != chunk_idx) { - let (completed_idx, blocks) = pending_chunk.take().unwrap(); - in_flight = start_chunk_upload( - &content_store, - &mut volume_manifest, - &mut stats, - in_flight, - completed_idx, - blocks, - ) - .await?; - } - - pending_chunk - .get_or_insert_with(|| (chunk_idx, Vec::new())) - .1 - .push(BlockInfo { - block_offset, - hash, - compressed, - }); - } - - // Flush the final chunk. - if let Some((chunk_idx, blocks)) = pending_chunk.take() { - in_flight = start_chunk_upload( - &content_store, - &mut volume_manifest, - &mut stats, - in_flight, - chunk_idx, - blocks, - ) - .await?; - } - - // Wait for last upload. - join_upload(&mut volume_manifest, &mut stats, in_flight).await?; + let (volume_manifest, hot_set_indices, stats) = + store_ext4_stream(&content_store, file, device_size).await?; // --- Upload manifest --- let manifest_key = format!("bases/{}", name); @@ -198,21 +119,6 @@ pub async fn run_bless( Ok(()) } -/// Derive a deterministic, stable ext4 filesystem UUID from an OCI manifest -/// digest. 
-/// -/// The manifest digest (`sha256:...`) is content-addressed: the same image -/// content always resolves to the same digest, so hashing it yields the same -/// UUID on every bless. We hash rather than slice the digest directly so the -/// result is uniformly distributed over the 16-byte space, then stamp the -/// RFC 4122 version (8 = custom) and variant bits so it is a well-formed UUID. -fn deterministic_uuid(manifest_digest: &str) -> [u8; 16] { - let mut uuid = blake3_128(manifest_digest.as_bytes()).0; - uuid[6] = (uuid[6] & 0x0f) | 0x80; // version 8 (custom) - uuid[8] = (uuid[8] & 0x3f) | 0x80; // variant 1 (RFC 4122) - uuid -} - /// Bless an OCI image into a content-addressed base image. /// /// Pulls layers from the registry, converts to ext4, writes through @@ -446,121 +352,104 @@ pub async fn run_bless_oci( Ok(()) } -/// Result of a completed chunk upload. -struct ChunkUploadResult { - chunk_idx: u32, - pack_id: PackId, - pack_size: u64, -} - -/// Join the previous in-flight upload (if any) and apply its results. -async fn join_upload( - volume_manifest: &mut VolumeManifest, - stats: &mut BlessStats, - in_flight: Option>>, +/// Bless an OCI image as **content-addressed layers** (layers survive). +/// +/// Each layer is converted independently to a deterministic, overlay-preserving +/// ext4 and stored once under a global `layers/{digest}` namespace; the image is +/// recorded as an ordered list of layer digests under `images/{name}`. Two +/// images that share a layer share its storage — the dedup that flattening into +/// one merged ext4 cannot achieve. 
+pub async fn run_bless_oci_layered( + image_ref: String, + name: String, + config_path: PathBuf, ) -> Result<()> { - if let Some(handle) = in_flight { - let result = handle.await.context("upload task panicked")??; - volume_manifest.append_pack(result.chunk_idx, result.pack_id); - stats.packs_uploaded += 1; - stats.bytes_uploaded += result.pack_size; - stats.chunks_written += 1; - } - Ok(()) -} + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or(tracing_subscriber::EnvFilter::new("info")), + ) + .with_writer(std::io::stderr) + .init(); -/// Dedup + assemble pack (CPU), then spawn S3 upload overlapped with next chunk's reads. -/// -/// Joins the previous in-flight upload before spawning a new one, so at most -/// one upload is in flight at a time. -async fn start_chunk_upload( - content_store: &Arc, - volume_manifest: &mut VolumeManifest, - stats: &mut BlessStats, - prev_in_flight: Option>>, - chunk_idx: u32, - blocks: Vec, -) -> Result>>> { - // Share compressed Bytes across duplicate hashes (avoids redundant allocations) - // but keep every block offset in the pack index. Two blocks with the same - // hash but different chunk_offsets both need entries — otherwise the read - // path can't find the second block and returns zeros (BlockLocation::Zero). - let mut first_seen: HashMap = HashMap::new(); - let mut pack_blocks: Vec<(Blake3Hash, u32, Bytes)> = Vec::new(); - - for block in blocks { - let compressed = first_seen - .entry(block.hash) - .or_insert_with(|| block.compressed.clone()) - .clone(); - pack_blocks.push((block.hash, block.block_offset, compressed)); - } + let start = Instant::now(); - if pack_blocks.is_empty() { - // All-zero chunk — just join previous and move on. 
- join_upload(volume_manifest, stats, prev_in_flight).await?; - stats.chunks_written += 1; - return Ok(None); - } + // --- S3 setup --- + let settings = Settings::from_file(&config_path) + .with_context(|| format!("Failed to load config from {}", config_path.display()))?; + let url = settings.storage.url.clone(); + let env_vars = settings.cloud_provider_env_vars(); + let (object_store, path_from_url) = parse_url_opts( + &url.parse()?, + env_vars.into_iter(), + Some(settings.storage.connect_timeout()), + Some(settings.storage.request_timeout()), + )?; + let object_store: Arc = Arc::from(object_store); + let db_path = path_from_url.to_string(); - // Sort by chunk_offset for canonical ordering before computing the - // content-addressed pack ID (same as flush and compaction paths). - pack_blocks.sort_by_key(|(_, co, _)| *co); - let pack_id = content_pack_id(&pack_blocks); + // --- Resolve image --- + let registry_client = RegistryClient::new(); + let image: oci_registry::Reference = image_ref + .parse() + .map_err(|e| anyhow::anyhow!("invalid image reference: {e}"))?; - // Join previous upload before spawning next (keeps at most 1 in flight). - join_upload(volume_manifest, stats, prev_in_flight).await?; + info!(image = %image_ref, name = %name, "resolving OCI image (layered)"); + let resolved = registry_client + .resolve(&image, &Credentials::Anonymous) + .await + .map_err(|e| anyhow::anyhow!("failed to resolve image: {e}"))?; - // Spawn streaming S3 upload — runs concurrently with next chunk's disk reads. 
- let cs = Arc::clone(content_store); - let handle = tokio::spawn(async move { - let entries = cs - .stream_chunk_pack(chunk_idx, pack_id, pack_blocks, BLOCK_SIZE) - .await - .context("Failed to stream chunk pack")?; - let pack_size = entries.iter().map(|e| u64::from(e.comp_length)).sum::(); - Ok(ChunkUploadResult { - chunk_idx, - pack_id, - pack_size, - }) - }); + // --- Store each layer once (content-addressed) --- + let mut layer_digests = Vec::with_capacity(resolved.layers.len()); + let mut layer_sizes = Vec::with_capacity(resolved.layers.len()); + let mut total_stored: u64 = 0; + let mut reused = 0usize; - Ok(Some(handle)) -} + for (i, layer) in resolved.layers.iter().enumerate() { + info!(layer = i, digest = %layer.digest, size = layer.size, "ensuring layer"); + let decompressed = + pull_layer_to_tempfile(&registry_client, &image, layer, &Credentials::Anonymous) + .await + .with_context(|| format!("pull layer {}", layer.digest))?; + let stored = ensure_layer_stored(&object_store, &db_path, &layer.digest, decompressed) + .await + .with_context(|| format!("store layer {}", layer.digest))?; + if stored.already_present { + reused += 1; + } + total_stored += stored.stored_bytes; + layer_digests.push(layer.digest.clone()); + layer_sizes.push(layer.size as u64); + } -/// Block info accumulated during the image scan.
-struct BlockInfo { - block_offset: u32, - hash: Blake3Hash, - compressed: Bytes, -} + // --- Record the image descriptor --- + let descriptor = ImageDescriptor { + image_ref: image_ref.clone(), + config_digest: resolved.manifest.config.digest.clone(), + layers: layer_digests, + layer_sizes, + }; + put_image_descriptor(&object_store, &db_path, &name, &descriptor) + .await + .context("write image descriptor")?; -#[derive(Default)] -struct BlessStats { - zero_blocks: usize, - unique_blocks: usize, - packs_uploaded: usize, - bytes_uploaded: u64, - chunks_written: usize, -} + let elapsed = start.elapsed(); + println!("Blessed '{}' from OCI image (layered) successfully:", name); + println!(" Image: {}", image_ref); + println!(" Layers: {}", resolved.layers.len()); + println!(" Layers reused: {} (already stored)", reused); + println!(" Bytes uploaded: {:.1} MB", total_stored as f64 / 1e6); + println!(" Elapsed: {:.1}s", elapsed.as_secs_f64()); + println!(" Descriptor: images/{}", name); -/// Read exactly buf.len() bytes, or fewer at EOF. -fn read_full(file: &mut std::fs::File, buf: &mut [u8]) -> std::io::Result { - let mut total = 0; - while total < buf.len() { - match file.read(&mut buf[total..])? 
{ - 0 => break, - n => total += n, - } - } - Ok(total) + Ok(()) } #[cfg(test)] mod tests { use super::*; - use crate::block::block_map::lz4_decompress; + use crate::block::block_map::{blake3_128, lz4_decompress}; use crate::block::pack::{extract_block, lookup_block_in_index, parse_pack_index, PackId}; use object_store::memory::InMemory; use object_store::path::Path as ObjectPath; @@ -596,80 +485,16 @@ mod tests { content_store: &ContentStore, name: &str, image_data: &[u8], - ) -> Result { + ) -> Result { let device_size = image_data.len() as u64; - let vm_template = VolumeManifest::new(device_size, BLOCK_SIZE); - let total_blocks = device_size.div_ceil(BLOCK_SIZE as u64) as usize; - let (_, zero_hash) = shared_zero_block(BLOCK_SIZE as usize); - let content_store = Arc::new(ContentStore::new( content_store.object_store().clone(), content_store.base_path(), )); - let mut volume_manifest = VolumeManifest::new(device_size, BLOCK_SIZE); - let mut stats = BlessStats::default(); - let mut hot_set_indices: Vec = Vec::new(); - let mut pending_chunk: Option<(u32, Vec)> = None; - let mut in_flight: Option>> = None; - - for block_index in 0..total_blocks { - let start = block_index * BLOCK_SIZE as usize; - let end = (start + BLOCK_SIZE as usize).min(image_data.len()); - let mut buf = vec![0u8; BLOCK_SIZE as usize]; - buf[..end - start].copy_from_slice(&image_data[start..end]); - - let hash = blake3_128(&buf); - - if hash == zero_hash { - stats.zero_blocks += 1; - continue; - } - hot_set_indices.push(block_index as u64); - - let chunk_idx = vm_template.chunk_idx_for_block(block_index as u64); - let block_offset = vm_template.block_offset_in_chunk(block_index as u64); - - stats.unique_blocks += 1; - - let compressed = Bytes::from(lz4_compress(&buf)); - - if pending_chunk.as_ref().is_some_and(|(idx, _)| *idx != chunk_idx) { - let (completed_idx, blocks) = pending_chunk.take().unwrap(); - in_flight = start_chunk_upload( - &content_store, - &mut volume_manifest, - &mut stats, - 
in_flight, - completed_idx, - blocks, - ) + let (volume_manifest, hot_set_indices, stats) = + store_ext4_stream(&content_store, std::io::Cursor::new(image_data.to_vec()), device_size) .await?; - } - - pending_chunk - .get_or_insert_with(|| (chunk_idx, Vec::new())) - .1 - .push(BlockInfo { - block_offset, - hash, - compressed, - }); - } - - if let Some((chunk_idx, blocks)) = pending_chunk.take() { - in_flight = start_chunk_upload( - &content_store, - &mut volume_manifest, - &mut stats, - in_flight, - chunk_idx, - blocks, - ) - .await?; - } - - join_upload(&mut volume_manifest, &mut stats, in_flight).await?; content_store .put_manifest(&format!("bases/{}", name), volume_manifest.serialize()?, None) diff --git a/glidefs/src/cli/gc.rs b/glidefs/src/cli/gc.rs index 3d64611..6345645 100644 --- a/glidefs/src/cli/gc.rs +++ b/glidefs/src/cli/gc.rs @@ -34,6 +34,11 @@ use crate::parse_object_store::parse_url_opts; pub struct GcState { /// Pack key ("{chunk_idx:04}/{pack_id:016x}") -> first-seen-dead ISO 8601 timestamp. pub(crate) dead_packs: HashMap, + /// Layer digest (hex) -> first-seen-dead ISO 8601 timestamp. A layer is dead + /// when no `images/*` descriptor references it. `#[serde(default)]` so old + /// state files (without this field) still load. + #[serde(default)] + pub(crate) dead_layers: HashMap, } /// Format a composite key for a (chunk_idx, pack_id) pair. @@ -88,6 +93,11 @@ impl GcState { Self::is_key_eligible(&self.dead_packs, &key, grace_period) } + /// Whether an unreferenced layer has been dead long enough to delete. 
+ fn is_layer_eligible(&self, digest_hex: &str, grace_period: Duration) -> bool { + Self::is_key_eligible(&self.dead_layers, digest_hex, grace_period) + } + fn is_key_eligible(map: &HashMap, key: &str, grace_period: Duration) -> bool { if let Some(ts_str) = map.get(key) && let Ok(ts) = ts_str.parse::>() @@ -111,6 +121,12 @@ struct GcStateDelta { revived_packs: Vec, /// Packs successfully deleted deleted_packs: Vec, + /// Layers (by hex digest) newly seen as dead: (digest, timestamp) + newly_dead_layers: Vec<(String, String)>, + /// Layers that became live again (referenced by an image descriptor) + revived_layers: Vec, + /// Layers successfully deleted + deleted_layers: Vec, } impl GcState { @@ -124,6 +140,15 @@ impl GcState { for key in delta.deleted_packs { self.dead_packs.remove(&key); } + for (digest, ts) in delta.newly_dead_layers { + self.dead_layers.entry(digest).or_insert(ts); + } + for digest in delta.revived_layers { + self.dead_layers.remove(&digest); + } + for digest in delta.deleted_layers { + self.dead_layers.remove(&digest); + } } } @@ -142,6 +167,11 @@ struct GcStats { eligible_for_deletion: usize, packs_deleted: usize, snapshots_checked: usize, + // Shared-layer pool (layered OCI bless). 
+ live_layers: usize, + dead_layers_found: usize, + eligible_layers: usize, + layers_deleted: usize, } impl GcStats { @@ -155,6 +185,10 @@ impl GcStats { self.eligible_for_deletion += other.eligible_for_deletion; self.packs_deleted += other.packs_deleted; self.snapshots_checked += other.snapshots_checked; + self.live_layers += other.live_layers; + self.dead_layers_found += other.dead_layers_found; + self.eligible_layers += other.eligible_layers; + self.layers_deleted += other.layers_deleted; } } @@ -233,6 +267,10 @@ pub async fn run_gc( println!("Dead packs found: {}", stats.dead_found); println!("Eligible for deletion: {}", stats.eligible_for_deletion); println!("Packs deleted: {}", stats.packs_deleted); + println!("Live layers: {}", stats.live_layers); + println!("Dead layers found: {}", stats.dead_layers_found); + println!("Eligible layers: {}", stats.eligible_layers); + println!("Layers deleted: {}", stats.layers_deleted); Ok(()) } @@ -282,9 +320,152 @@ async fn gc_orchestrate( state.apply_delta(delta); stats.merge(prefix_stats); } + + // Reconcile the shared layer pool (layered OCI bless). Layers live outside + // `exports/`, so the per-prefix pass above never touches them; they are + // ref-counted by `images/*` descriptors instead. + let (layer_delta, layer_stats) = + reconcile_layers(object_store, db_path, state, grace_period, &budget, dry_run).await?; + state.apply_delta(layer_delta); + stats.merge(layer_stats); + Ok(stats) } +// --------------------------------------------------------------------------- +// Shared layer-pool reconciliation (layered OCI bless) +// --------------------------------------------------------------------------- + +/// Normalize an OCI digest to its hex form (the `layers/{hex}` path segment). +fn digest_hex(digest: &str) -> &str { + digest.strip_prefix("sha256:").unwrap_or(digest) +} + +/// Reconcile the global `layers/` pool: a layer is live iff some `images/*` +/// descriptor references its digest. 
Unreferenced layers are marked dead and, +/// after the grace period, their whole subtree is deleted. +/// +/// The grace period is what makes this safe against the layered-bless write race +/// (layers are uploaded *before* the image descriptor): a freshly stored layer +/// briefly looks unreferenced, but the descriptor lands within seconds — long +/// before any sane grace period elapses — so the layer is revived, not deleted. +async fn reconcile_layers( + object_store: &Arc, + db_path: &str, + state: &GcState, + grace_period: Duration, + budget: &AtomicUsize, + dry_run: bool, +) -> Result<(GcStateDelta, GcStats)> { + let live = collect_referenced_layers(object_store, db_path).await?; + let present = list_layer_digests(object_store, db_path).await?; + + let mut delta = GcStateDelta::default(); + let mut stats = GcStats::default(); + + for digest in present { + if live.contains(&digest) { + stats.live_layers += 1; + // Referenced again — clear any stale dead mark. + if state.dead_layers.contains_key(&digest) { + delta.revived_layers.push(digest); + } + continue; + } + + // Unreferenced layer. + stats.dead_layers_found += 1; + if state.is_layer_eligible(&digest, grace_period) { + if dry_run { + stats.eligible_layers += 1; + continue; + } + // One budget slot per layer keeps a single run's churn bounded. + if !try_claim_delete_slot(budget) { + continue; + } + let deleted = delete_layer_subtree(object_store, db_path, &digest).await?; + info!(layer = %digest, objects = deleted, "deleted orphaned layer"); + stats.layers_deleted += 1; + delta.deleted_layers.push(digest); + } else if !state.dead_layers.contains_key(&digest) { + delta + .newly_dead_layers + .push((digest, Utc::now().to_rfc3339())); + } + } + + Ok((delta, stats)) +} + +/// Collect the set of layer digests (hex) referenced by any `images/*` descriptor. 
+async fn collect_referenced_layers( + object_store: &Arc, + db_path: &str, +) -> Result> { + let names = crate::oci::layer_store::list_image_descriptors(object_store, db_path) + .await + .context("list image descriptors")?; + let mut live = HashSet::new(); + for name in names { + if let Some(desc) = + crate::oci::layer_store::get_image_descriptor(object_store, db_path, &name) + .await + .with_context(|| format!("read image descriptor {name}"))? + { + for layer in &desc.layers { + live.insert(digest_hex(layer).to_string()); + } + } + } + Ok(live) +} + +/// List the layer digests (hex) physically present under `{db_path}/layers/`. +async fn list_layer_digests( + object_store: &Arc, + db_path: &str, +) -> Result> { + let prefix_str = format!("{}/layers/", db_path.trim_end_matches('/')); + let prefix = ObjectPath::from(prefix_str.clone()); + let mut digests = HashSet::new(); + let mut stream = object_store.list(Some(&prefix)); + while let Some(result) = stream.next().await { + let meta = result?; + let path_str = meta.location.to_string(); + if let Some(rel) = path_str.strip_prefix(&prefix_str) + && let Some(slash) = rel.find('/') + { + digests.insert(rel[..slash].to_string()); + } + } + Ok(digests) +} + +/// Delete every object under `{db_path}/layers/{digest}/`. Returns the count. +async fn delete_layer_subtree( + object_store: &Arc, + db_path: &str, + digest_hex: &str, +) -> Result { + let prefix_str = format!("{}/layers/{}/", db_path.trim_end_matches('/'), digest_hex); + let prefix = ObjectPath::from(prefix_str); + let mut deleted = 0usize; + let mut stream = object_store.list(Some(&prefix)); + let mut paths = Vec::new(); + while let Some(result) = stream.next().await { + paths.push(result?.location); + } + for path in paths { + match object_store.delete(&path).await { + Ok(()) => deleted += 1, + Err(object_store::Error::NotFound { .. 
}) => {} + Err(e) => return Err(e).context("delete layer object"), + } + } + Ok(deleted) +} + /// Atomically claim one delete slot from the shared budget. /// Returns true if a slot was claimed (caller may delete), false if exhausted. fn try_claim_delete_slot(budget: &AtomicUsize) -> bool { @@ -1099,6 +1280,139 @@ mod tests { .contains_key(&pack_key(chunk_idx, dead_pack))); } + #[tokio::test] + async fn test_gc_layer_pool_keeps_referenced_reaps_orphans() { + use crate::oci::layer_store::{ + ensure_layer_stored, layer_base_path, put_image_descriptor, ImageDescriptor, + }; + + fn tiny_tar(marker: u8) -> Vec { + let mut b = tar::Builder::new(Vec::new()); + let data = vec![marker; 4096]; + let mut h = tar::Header::new_gnu(); + h.set_path("file").unwrap(); + h.set_size(data.len() as u64); + h.set_mode(0o644); + h.set_entry_type(tar::EntryType::Regular); + h.set_cksum(); + b.append(&h, &data[..]).unwrap(); + b.into_inner().unwrap() + } + async fn count(store: &Arc, prefix: &str) -> usize { + let p = ObjectPath::from(prefix.to_string()); + store + .list(Some(&p)) + .filter_map(|r| async move { r.ok() }) + .count() + .await + } + + let s3: Arc = Arc::new(InMemory::new()); + let db = "test"; + let keep = format!("sha256:{}", "aa".repeat(32)); + let orphan = format!("sha256:{}", "bb".repeat(32)); + + ensure_layer_stored(&s3, db, &keep, std::io::Cursor::new(tiny_tar(1))) + .await + .unwrap(); + ensure_layer_stored(&s3, db, &orphan, std::io::Cursor::new(tiny_tar(2))) + .await + .unwrap(); + + // An image references only `keep`. + put_image_descriptor( + &s3, + db, + "img", + &ImageDescriptor { + image_ref: "img:latest".into(), + config_digest: "sha256:cfg".into(), + layers: vec![keep.clone()], + layer_sizes: vec![4096], + }, + ) + .await + .unwrap(); + + let budget = AtomicUsize::new(100); + let mut state = GcState::default(); + + // Run 1 (grace 0): orphan is first seen dead → marked, not yet deleted. 
+ let (delta, stats) = + reconcile_layers(&s3, db, &state, Duration::ZERO, &budget, false).await.unwrap(); + assert_eq!(stats.live_layers, 1, "keep is referenced"); + assert_eq!(stats.dead_layers_found, 1, "orphan unreferenced"); + assert_eq!(stats.layers_deleted, 0, "first sighting is never deleted"); + state.apply_delta(delta); + + // Run 2: orphan now past (zero) grace → deleted; keep survives. + let (delta, stats) = + reconcile_layers(&s3, db, &state, Duration::ZERO, &budget, false).await.unwrap(); + assert_eq!(stats.layers_deleted, 1); + state.apply_delta(delta); + + assert!(count(&s3, &layer_base_path(db, &keep)).await > 0, "referenced layer kept"); + assert_eq!(count(&s3, &layer_base_path(db, &orphan)).await, 0, "orphan reaped"); + } + + #[tokio::test] + async fn test_gc_layer_revived_when_image_added() { + use crate::oci::layer_store::{ensure_layer_stored, put_image_descriptor, ImageDescriptor}; + + fn tiny_tar() -> Vec { + let mut b = tar::Builder::new(Vec::new()); + let mut h = tar::Header::new_gnu(); + h.set_path("file").unwrap(); + h.set_size(8); + h.set_mode(0o644); + h.set_entry_type(tar::EntryType::Regular); + h.set_cksum(); + b.append(&h, &b"contents"[..]).unwrap(); + b.into_inner().unwrap() + } + + let s3: Arc = Arc::new(InMemory::new()); + let db = "test"; + let digest = format!("sha256:{}", "cc".repeat(32)); + ensure_layer_stored(&s3, db, &digest, std::io::Cursor::new(tiny_tar())) + .await + .unwrap(); + + let budget = AtomicUsize::new(100); + let mut state = GcState::default(); + + // No image yet → layer marked dead. + let (delta, _) = + reconcile_layers(&s3, db, &state, Duration::ZERO, &budget, false).await.unwrap(); + state.apply_delta(delta); + assert!(state.dead_layers.contains_key(&"cc".repeat(32))); + + // An image now references it → revived (dead mark cleared), never deleted. 
+ put_image_descriptor( + &s3, + db, + "img", + &ImageDescriptor { + image_ref: "img:latest".into(), + config_digest: "sha256:cfg".into(), + layers: vec![digest.clone()], + layer_sizes: vec![8], + }, + ) + .await + .unwrap(); + + let (delta, stats) = + reconcile_layers(&s3, db, &state, Duration::ZERO, &budget, false).await.unwrap(); + state.apply_delta(delta); + assert_eq!(stats.live_layers, 1); + assert_eq!(stats.layers_deleted, 0); + assert!( + !state.dead_layers.contains_key(&"cc".repeat(32)), + "layer must be revived once referenced" + ); + } + #[tokio::test] async fn test_gc_zero_grace_period_deletes_immediately() { let s3: Arc = Arc::new(InMemory::new()); diff --git a/glidefs/src/cli/mod.rs b/glidefs/src/cli/mod.rs index 44038cb..c19570f 100644 --- a/glidefs/src/cli/mod.rs +++ b/glidefs/src/cli/mod.rs @@ -35,10 +35,16 @@ pub enum Commands { /// OCI image reference, e.g. "docker.io/library/ubuntu:24.04" (mutually exclusive with --image) #[arg(long, required_unless_present = "image", conflicts_with = "image")] oci: Option, + /// Store OCI layers as separate content-addressed artifacts (layers + /// survive) so layers shared across images dedup in storage, instead of + /// flattening into one merged ext4. Only valid with --oci. + #[arg(long, requires = "oci")] + layered: bool, /// Base image name (e.g., "ubuntu-22.04-node20-v3") #[arg(long)] name: String, - /// S3 prefix (export namespace) to write the blessed image into + /// S3 prefix (export namespace) to write the blessed image into. + /// Ignored with --layered (layer artifacts use a global namespace). 
#[arg(long)] s3_prefix: String, /// Config file (for storage URL + credentials) diff --git a/glidefs/src/main.rs b/glidefs/src/main.rs index b705fb7..447467e 100644 --- a/glidefs/src/main.rs +++ b/glidefs/src/main.rs @@ -87,6 +87,7 @@ async fn main() -> Result<()> { cli::Commands::Bless { image, oci, + layered, name, s3_prefix, config, @@ -94,7 +95,11 @@ async fn main() -> Result<()> { if let Some(image_path) = image { cli::bless::run_bless(image_path, name, s3_prefix, config).await?; } else if let Some(image_ref) = oci { - cli::bless::run_bless_oci(image_ref, name, s3_prefix, config).await?; + if layered { + cli::bless::run_bless_oci_layered(image_ref, name, config).await?; + } else { + cli::bless::run_bless_oci(image_ref, name, s3_prefix, config).await?; + } } } cli::Commands::Push { diff --git a/glidefs/src/oci/ext4_store.rs b/glidefs/src/oci/ext4_store.rs new file mode 100644 index 0000000..d286582 --- /dev/null +++ b/glidefs/src/oci/ext4_store.rs @@ -0,0 +1,234 @@ +#![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::cast_possible_truncation)] +//! Shared core for writing an ext4 byte stream into content-addressed storage. +//! +//! This is the streaming engine behind every bless/store path: read fixed-size +//! blocks, skip zeros, dedup within a 128 MiB chunk, and upload each chunk as a +//! content-addressed pack (overlapping upload with the next chunk's reads), +//! producing a [`VolumeManifest`]. It lives in the always-on library (not the +//! feature-gated `cli` module) so both `cli::bless` and `oci::layer_store` can +//! use it. 
+ +use std::collections::HashMap; +use std::io::Read; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use bytes::Bytes; + +use crate::block::block_map::{blake3_128, lz4_compress, shared_zero_block, Blake3Hash}; +use crate::block::content_store::ContentStore; +use crate::block::pack::{content_pack_id, PackId}; +use crate::block::volume_manifest::VolumeManifest; + +/// Fixed block size for the chunked architecture: 128 KiB. +pub const BLOCK_SIZE: u32 = 131_072; + +/// Derive a deterministic, stable ext4 filesystem UUID from a content-addressed +/// digest (an OCI manifest digest, or a single layer digest). +/// +/// The same content always resolves to the same digest, so hashing it yields +/// the same UUID on every run — which makes the produced ext4 byte-identical and +/// therefore content-dedupable. We hash rather than slice the digest so the +/// result is uniformly distributed, then stamp the RFC 4122 version (8 = custom) +/// and variant bits so it is a well-formed UUID. +pub fn deterministic_uuid(digest: &str) -> [u8; 16] { + let mut uuid = blake3_128(digest.as_bytes()).0; + uuid[6] = (uuid[6] & 0x0f) | 0x80; // version 8 (custom) + uuid[8] = (uuid[8] & 0x3f) | 0x80; // variant 1 (RFC 4122) + uuid +} + +/// Upload stats for a stream. +#[derive(Default, Debug, Clone)] +pub struct StoreStats { + pub zero_blocks: usize, + pub unique_blocks: usize, + pub packs_uploaded: usize, + pub bytes_uploaded: u64, + pub chunks_written: usize, +} + +/// Stream an ext4 byte source into content-addressed packs + a `VolumeManifest` +/// under `content_store`'s base path. +/// +/// Returns the manifest, the non-zero block indices (hot set), and upload stats. +/// The caller owns persisting the manifest / hot set under whatever name it +/// chooses. +/// +/// `source` is read for exactly `ceil(device_size / BLOCK_SIZE)` blocks; bytes +/// past EOF are treated as zero (so an ext4 image smaller than `device_size` +/// just yields trailing zero blocks, which are skipped). 
+pub async fn store_ext4_stream( + content_store: &Arc, + mut source: R, + device_size: u64, +) -> Result<(VolumeManifest, Vec, StoreStats)> { + let vm_template = VolumeManifest::new(device_size, BLOCK_SIZE); + let total_blocks = device_size.div_ceil(u64::from(BLOCK_SIZE)) as usize; + let (_, zero_hash) = shared_zero_block(BLOCK_SIZE as usize); + let mut buf = vec![0u8; BLOCK_SIZE as usize]; + + let mut volume_manifest = VolumeManifest::new(device_size, BLOCK_SIZE); + let mut stats = StoreStats::default(); + let mut hot_set_indices: Vec = Vec::new(); + let mut pending_chunk: Option<(u32, Vec)> = None; + let mut in_flight: Option>> = None; + + for block_index in 0..total_blocks { + let bytes_read = read_full(&mut source, &mut buf)?; + if bytes_read < BLOCK_SIZE as usize { + buf[bytes_read..].fill(0); + } + + let hash = blake3_128(&buf); + if hash == zero_hash { + stats.zero_blocks += 1; + continue; + } + + hot_set_indices.push(block_index as u64); + let chunk_idx = vm_template.chunk_idx_for_block(block_index as u64); + let block_offset = vm_template.block_offset_in_chunk(block_index as u64); + stats.unique_blocks += 1; + let compressed = Bytes::from(lz4_compress(&buf)); + + if pending_chunk.as_ref().is_some_and(|(idx, _)| *idx != chunk_idx) { + let (completed_idx, blocks) = pending_chunk.take().unwrap(); + in_flight = start_chunk_upload( + content_store, + &mut volume_manifest, + &mut stats, + in_flight, + completed_idx, + blocks, + ) + .await?; + } + + pending_chunk + .get_or_insert_with(|| (chunk_idx, Vec::new())) + .1 + .push(BlockInfo { + block_offset, + hash, + compressed, + }); + } + + if let Some((chunk_idx, blocks)) = pending_chunk.take() { + in_flight = start_chunk_upload( + content_store, + &mut volume_manifest, + &mut stats, + in_flight, + chunk_idx, + blocks, + ) + .await?; + } + + join_upload(&mut volume_manifest, &mut stats, in_flight).await?; + Ok((volume_manifest, hot_set_indices, stats)) +} + +/// Result of a completed chunk upload. 
+struct ChunkUploadResult { + chunk_idx: u32, + pack_id: PackId, + pack_size: u64, +} + +/// Join the previous in-flight upload (if any) and apply its results. +async fn join_upload( + volume_manifest: &mut VolumeManifest, + stats: &mut StoreStats, + in_flight: Option>>, +) -> Result<()> { + if let Some(handle) = in_flight { + let result = handle.await.context("upload task panicked")??; + volume_manifest.append_pack(result.chunk_idx, result.pack_id); + stats.packs_uploaded += 1; + stats.bytes_uploaded += result.pack_size; + stats.chunks_written += 1; + } + Ok(()) +} + +/// Dedup + assemble pack (CPU), then spawn S3 upload overlapped with next chunk's reads. +/// +/// Joins the previous in-flight upload before spawning a new one, so at most +/// one upload is in flight at a time. +async fn start_chunk_upload( + content_store: &Arc, + volume_manifest: &mut VolumeManifest, + stats: &mut StoreStats, + prev_in_flight: Option>>, + chunk_idx: u32, + blocks: Vec, +) -> Result>>> { + // Share compressed Bytes across duplicate hashes (avoids redundant allocations) + // but keep every block offset in the pack index. Two blocks with the same + // hash but different chunk_offsets both need entries — otherwise the read + // path can't find the second block and returns zeros (BlockLocation::Zero). + let mut first_seen: HashMap = HashMap::new(); + let mut pack_blocks: Vec<(Blake3Hash, u32, Bytes)> = Vec::new(); + + for block in blocks { + let compressed = first_seen + .entry(block.hash) + .or_insert_with(|| block.compressed.clone()) + .clone(); + pack_blocks.push((block.hash, block.block_offset, compressed)); + } + + if pack_blocks.is_empty() { + // All-zero chunk — just join previous and move on. + join_upload(volume_manifest, stats, prev_in_flight).await?; + stats.chunks_written += 1; + return Ok(None); + } + + // Sort by chunk_offset for canonical ordering before computing the + // content-addressed pack ID (same as flush and compaction paths). 
+ pack_blocks.sort_by_key(|(_, co, _)| *co); + let pack_id = content_pack_id(&pack_blocks); + + // Join previous upload before spawning next (keeps at most 1 in flight). + join_upload(volume_manifest, stats, prev_in_flight).await?; + + // Spawn streaming S3 upload — runs concurrently with next chunk's disk reads. + let cs = Arc::clone(content_store); + let handle = tokio::spawn(async move { + let entries = cs + .stream_chunk_pack(chunk_idx, pack_id, pack_blocks, BLOCK_SIZE) + .await + .context("Failed to stream chunk pack")?; + let pack_size = entries.iter().map(|e| u64::from(e.comp_length)).sum::(); + Ok(ChunkUploadResult { + chunk_idx, + pack_id, + pack_size, + }) + }); + + Ok(Some(handle)) +} + +/// Block info accumulated during the image scan. +struct BlockInfo { + block_offset: u32, + hash: Blake3Hash, + compressed: Bytes, +} + +/// Read exactly buf.len() bytes, or fewer at EOF. +fn read_full(file: &mut R, buf: &mut [u8]) -> std::io::Result { + let mut total = 0; + while total < buf.len() { + match file.read(&mut buf[total..])? { + 0 => break, + n => total += n, + } + } + Ok(total) +} diff --git a/glidefs/src/oci/layer_store.rs b/glidefs/src/oci/layer_store.rs new file mode 100644 index 0000000..1b6edfa --- /dev/null +++ b/glidefs/src/oci/layer_store.rs @@ -0,0 +1,311 @@ +#![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::cast_possible_truncation)] +//! Content-addressed OCI **layer** storage — the heart of layer-surviving dedup. +//! +//! Instead of flattening an image's layers into one ext4 (where a shared base +//! layer lands at different block offsets per image and never dedups), each OCI +//! layer is converted independently to a deterministic, overlay-preserving ext4 +//! and stored once under a global, content-addressed namespace keyed by the +//! layer digest: +//! +//! ```text +//! {db_path}/layers/{sha256-hex}/manifests/layer ← VolumeManifest +//! {db_path}/layers/{sha256-hex}/chunks/****/*.pack ← content-addressed packs +//! 
{db_path}/images/{name} ← ImageDescriptor (layer list) +//! ``` +//! +//! Two images that share a layer write to the *same* `layers/{digest}` path, so +//! the packs are identical and stored exactly once. Storing is idempotent: a +//! layer already present (manifest HEAD hit) is skipped. + +use std::io::{Read, Seek, SeekFrom}; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use ext4::tar_convert::{convert_layer_to_ext4, ConvertOptions}; +use ext4::writer::WriterOption; +use object_store::path::Path as ObjectPath; +use object_store::{ObjectStore, PutPayload}; +use serde::{Deserialize, Serialize}; + +use crate::block::content_store::ContentStore; +use crate::oci::ext4_store::{deterministic_uuid, store_ext4_stream}; + +/// Manifest name for a stored layer (its sole VolumeManifest). +const LAYER_MANIFEST_NAME: &str = "layer"; + +/// Strip the `sha256:` algorithm prefix, leaving the hex digest used as the +/// content-addressed path segment. +fn digest_hex(digest: &str) -> &str { + digest.strip_prefix("sha256:").unwrap_or(digest) +} + +/// Global content-addressed base path for a layer: `{db_path}/layers/{hex}`. +/// +/// Deliberately *not* under any per-image `exports/{prefix}` namespace — the +/// shared location is what lets the same layer dedup across different images. +pub fn layer_base_path(db_path: &str, digest: &str) -> String { + format!("{}/layers/{}", db_path, digest_hex(digest)) +} + +/// S3 key for an image descriptor: `{db_path}/images/{name}`. +pub fn image_key(db_path: &str, name: &str) -> String { + format!("{}/images/{}", db_path, name) +} + +/// Outcome of storing one layer. +#[derive(Debug, Clone)] +pub struct StoredLayer { + pub digest: String, + /// True if the layer was already present (idempotent skip). + pub already_present: bool, + /// Bytes uploaded for this layer (0 when skipped). + pub stored_bytes: u64, +} + +/// An image as an ordered list of content-addressed layers. 
This is all that is +/// stored per image — the layer artifacts hold the actual blocks. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ImageDescriptor { + /// OCI image reference this was blessed from (informational). + pub image_ref: String, + /// OCI config digest (`sha256:...`). + pub config_digest: String, + /// Layer digests, bottom-to-top (`sha256:...`). + pub layers: Vec, + /// Compressed size of each layer blob, parallel to `layers`. + pub layer_sizes: Vec, +} + +impl ImageDescriptor { + pub fn to_json(&self) -> Result> { + Ok(serde_json::to_vec_pretty(self)?) + } + pub fn from_json(bytes: &[u8]) -> Result { + Ok(serde_json::from_slice(bytes)?) + } +} + +/// Deterministic device size for a layer's ext4 image. +/// +/// Derived solely from the decompressed tar length so the same layer always +/// produces the same geometry (hence byte-identical ext4 → identical packs). +/// Zero blocks past the real content are skipped at store time, so oversizing +/// costs nothing in storage. +fn layer_device_size(tar_len: u64) -> u64 { + (tar_len.saturating_mul(2).max(64 * 1024 * 1024)).next_power_of_two() +} + +/// Ensure a single OCI layer is stored as a content-addressed ext4 artifact. +/// +/// `decompressed_tar` is the layer's *decompressed* tar stream (seekable). +/// Idempotent: if the layer's manifest already exists, returns immediately +/// without re-converting or re-uploading. +pub async fn ensure_layer_stored( + object_store: &Arc, + db_path: &str, + digest: &str, + mut decompressed_tar: R, +) -> Result { + let base = layer_base_path(db_path, digest); + let content_store = Arc::new(ContentStore::new(Arc::clone(object_store), &base)); + + // Idempotent skip: the layer is content-addressed, so an existing manifest + // means the identical bytes are already stored. + if content_store + .head_manifest(LAYER_MANIFEST_NAME) + .await + .context("HEAD layer manifest")? 
+ { + return Ok(StoredLayer { + digest: digest.to_string(), + already_present: true, + stored_bytes: 0, + }); + } + + // Size the device from the decompressed tar length (deterministic per layer). + let tar_len = decompressed_tar.seek(SeekFrom::End(0))?; + decompressed_tar.seek(SeekFrom::Start(0))?; + let device_size = layer_device_size(tar_len); + + // Convert the layer to an overlay-preserving, deterministic ext4. The UUID + // is derived from the layer digest so the output is byte-identical across + // images — the property that makes the packs dedup. + let opts = ConvertOptions { + convert_backslash: false, + writer_options: vec![ + WriterOption::MaximumDiskSize(device_size as i64), + WriterOption::Uuid(deterministic_uuid(digest)), + WriterOption::Journal(1024), // 4 MiB journal — same as bless + ], + }; + let mut ext4_tmp = tempfile::tempfile().context("layer ext4 tempfile")?; + convert_layer_to_ext4(&mut decompressed_tar, &mut ext4_tmp, &opts) + .context("convert layer to ext4")?; + ext4_tmp.seek(SeekFrom::Start(0))?; + + // Stream the ext4 into content-addressed packs + manifest under layers/{digest}. + let (volume_manifest, _hot_set, stats) = + store_ext4_stream(&content_store, ext4_tmp, device_size).await?; + content_store + .put_manifest(LAYER_MANIFEST_NAME, volume_manifest.serialize()?, None) + .await + .context("put layer manifest")?; + + Ok(StoredLayer { + digest: digest.to_string(), + already_present: false, + stored_bytes: stats.bytes_uploaded, + }) +} + +/// Persist an image descriptor at `images/{name}`. +pub async fn put_image_descriptor( + object_store: &Arc, + db_path: &str, + name: &str, + descriptor: &ImageDescriptor, +) -> Result<()> { + let path = ObjectPath::from(image_key(db_path, name)); + object_store + .put(&path, PutPayload::from(descriptor.to_json()?)) + .await + .context("put image descriptor")?; + Ok(()) +} + +/// Load an image descriptor, or None if it does not exist. 
+pub async fn get_image_descriptor( + object_store: &Arc, + db_path: &str, + name: &str, +) -> Result> { + let path = ObjectPath::from(image_key(db_path, name)); + match object_store.get(&path).await { + Ok(resp) => { + let bytes = resp.bytes().await.context("read image descriptor")?; + Ok(Some(ImageDescriptor::from_json(&bytes)?)) + } + Err(object_store::Error::NotFound { .. }) => Ok(None), + Err(e) => Err(e).context("get image descriptor"), + } +} + +/// List all image descriptor names under `{db_path}/images/`. +pub async fn list_image_descriptors( + object_store: &Arc, + db_path: &str, +) -> Result> { + use futures::StreamExt; + let prefix_str = format!("{}/images/", db_path); + let prefix = ObjectPath::from(prefix_str.clone()); + let mut names = Vec::new(); + let mut stream = object_store.list(Some(&prefix)); + while let Some(meta) = stream.next().await { + let meta = meta.context("list images")?; + let loc = meta.location.to_string(); + if let Some(rel) = loc.strip_prefix(&prefix_str) + && !rel.is_empty() + { + names.push(rel.to_string()); + } + } + Ok(names) +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use object_store::memory::InMemory; + + fn tar_with(files: &[(&str, &[u8])]) -> Vec { + let mut b = tar::Builder::new(Vec::new()); + for (path, data) in files { + let mut h = tar::Header::new_gnu(); + h.set_path(path).unwrap(); + h.set_size(data.len() as u64); + h.set_mode(0o644); + h.set_entry_type(tar::EntryType::Regular); + h.set_cksum(); + b.append(&h, *data).unwrap(); + } + b.into_inner().unwrap() + } + + async fn count_objects(store: &Arc, prefix: &str) -> usize { + let p = ObjectPath::from(prefix.to_string()); + store + .list(Some(&p)) + .filter_map(|r| async move { r.ok() }) + .count() + .await + } + + #[tokio::test] + async fn ensure_layer_stored_is_idempotent() { + let store: Arc = Arc::new(InMemory::new()); + let db = "db"; + let digest = "sha256:1111111111111111111111111111111111111111111111111111111111111111"; + let 
tar = tar_with(&[("a.txt", b"hello"), ("b.txt", b"world")]); + + let r1 = ensure_layer_stored(&store, db, digest, std::io::Cursor::new(tar.clone())) + .await + .unwrap(); + assert!(!r1.already_present); + assert!(r1.stored_bytes > 0); + let n1 = count_objects(&store, &layer_base_path(db, digest)).await; + assert!(n1 >= 2, "expected manifest + ≥1 pack, got {n1}"); + + // Re-store the same digest → idempotent skip, no new objects. + let r2 = ensure_layer_stored(&store, db, digest, std::io::Cursor::new(tar)) + .await + .unwrap(); + assert!(r2.already_present); + assert_eq!(r2.stored_bytes, 0); + let n2 = count_objects(&store, &layer_base_path(db, digest)).await; + assert_eq!(n1, n2, "no new objects on idempotent re-store"); + } + + #[tokio::test] + async fn identical_layers_share_storage() { + let store: Arc = Arc::new(InMemory::new()); + let db = "db"; + // Same content → same digest → same base path → stored once. + let digest = "sha256:2222222222222222222222222222222222222222222222222222222222222222"; + let tar = tar_with(&[("bin/sh", &[7u8; 200_000])]); + + ensure_layer_stored(&store, db, digest, std::io::Cursor::new(tar.clone())) + .await + .unwrap(); + let after_first = count_objects(&store, &format!("{db}/layers/")).await; + + // A second image references the same layer digest: skipped, no growth. 
+ let r = ensure_layer_stored(&store, db, digest, std::io::Cursor::new(tar)) + .await + .unwrap(); + assert!(r.already_present); + let after_second = count_objects(&store, &format!("{db}/layers/")).await; + assert_eq!(after_first, after_second, "shared layer stored exactly once"); + } + + #[tokio::test] + async fn image_descriptor_roundtrip() { + let store: Arc = Arc::new(InMemory::new()); + let db = "db"; + let d = ImageDescriptor { + image_ref: "img:latest".into(), + config_digest: "sha256:cfg".into(), + layers: vec!["sha256:a".into(), "sha256:b".into()], + layer_sizes: vec![10, 20], + }; + put_image_descriptor(&store, db, "img", &d).await.unwrap(); + let got = get_image_descriptor(&store, db, "img") + .await + .unwrap() + .expect("descriptor present"); + assert_eq!(got, d); + assert_eq!(list_image_descriptors(&store, db).await.unwrap(), vec!["img".to_string()]); + assert!(get_image_descriptor(&store, db, "missing").await.unwrap().is_none()); + } +} diff --git a/glidefs/src/oci/mod.rs b/glidefs/src/oci/mod.rs index 853bf7b..6bf4b58 100644 --- a/glidefs/src/oci/mod.rs +++ b/glidefs/src/oci/mod.rs @@ -1,6 +1,8 @@ pub mod block_adapter; pub mod export; +pub mod ext4_store; pub mod ingest; +pub mod layer_store; pub mod pull; pub mod push; diff --git a/glidefs/src/oci/pull.rs b/glidefs/src/oci/pull.rs index b260a11..373cb7c 100644 --- a/glidefs/src/oci/pull.rs +++ b/glidefs/src/oci/pull.rs @@ -80,7 +80,7 @@ pub async fn pull_image( } /// Download and decompress a single layer to a seekable temp file. 
-async fn pull_layer_to_tempfile( +pub async fn pull_layer_to_tempfile( client: &oci_registry::RegistryClient, image: &Reference, layer: &OciDescriptor, diff --git a/glidefs/tests/docker_integration/mod.rs b/glidefs/tests/docker_integration/mod.rs index 58ea23d..5645e27 100644 --- a/glidefs/tests/docker_integration/mod.rs +++ b/glidefs/tests/docker_integration/mod.rs @@ -1076,6 +1076,7 @@ mod integrity_suite; mod live_migration; mod multi_export; mod oci_distribution; +mod oci_layer_dedup; mod oci_push; mod persistence; mod range_reads; diff --git a/glidefs/tests/docker_integration/oci_layer_dedup.rs b/glidefs/tests/docker_integration/oci_layer_dedup.rs new file mode 100644 index 0000000..a8ff737 --- /dev/null +++ b/glidefs/tests/docker_integration/oci_layer_dedup.rs @@ -0,0 +1,473 @@ +//! End-to-end proof that **layered** OCI bless dedups shared layers. +//! +//! Builds three images that share an identical base layer, pushes them to a +//! local `registry:2`, then runs the layered bless (each layer → its own +//! content-addressed ext4 under `layers/{digest}`) and asserts: +//! +//! 1. the shared base layer is stored exactly once (idempotent across images), +//! 2. total layered storage is smaller than merged-bless storage of the same +//! three images (which can't dedup the shared layer — proven empirically +//! elsewhere to share 0 blocks), +//! 3. overlay-composing the per-layer ext4s reconstructs the same filesystem +//! the merged path produces (including a whiteout deletion and an opaque +//! directory), +//! 4. the image→layer reference invariant GC relies on holds (dropping one +//! image keeps a layer still referenced by the others). +//! +//! 
Run: `cargo test --features docker-tests --test docker_integration oci_layer_dedup` + +use std::collections::{BTreeMap, HashSet}; +use std::io::{Cursor, Write}; +use std::sync::Arc; + +use bytes::Bytes; +use flate2::write::GzEncoder; +use flate2::Compression; +use object_store::memory::InMemory; +use object_store::path::Path as ObjectPath; +use object_store::ObjectStore; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, GenericImage}; + +use ext4::format; +use ext4::reader::Reader; +use ext4::tar_convert::{convert_layer_to_ext4, convert_oci_layers_to_ext4, ConvertOptions}; +use ext4::writer::WriterOption; + +use glidefs::block::content_store::ContentStore; +use glidefs::oci::ext4_store::{deterministic_uuid, store_ext4_stream}; +use glidefs::oci::hex_sha256; +use glidefs::oci::layer_store::{ + ensure_layer_stored, get_image_descriptor, layer_base_path, list_image_descriptors, + put_image_descriptor, ImageDescriptor, StoredLayer, +}; +use glidefs::oci::pull::pull_layer_to_tempfile; +use oci_registry::{ + ClientConfig, ClientProtocol, Credentials, OciDescriptor, OciImageManifest, Reference, + RegistryClient, +}; + +// --------------------------------------------------------------------------- +// Registry + client (mirrors bless_api.rs / oci_push.rs) +// --------------------------------------------------------------------------- + +async fn start_registry() -> (ContainerAsync, String) { + let container = GenericImage::new("registry", "2") + .with_exposed_port(5000.into()) + .start() + .await + .expect("failed to start registry:2"); + let host = container.get_host().await.unwrap(); + let mut port = None; + for _ in 0..20 { + if let Ok(p) = container.get_host_port_ipv4(5000).await { + port = Some(p); + break; + } + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + } + let addr = format!("{}:{}", host, port.expect("registry port")); + for _ in 0..50 { + if tokio::net::TcpStream::connect(&addr).await.is_ok() { + break; + } + 
tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + (container, addr) +} + +fn http_client() -> RegistryClient { + RegistryClient::with_config(ClientConfig { + protocol: ClientProtocol::Http, + ..Default::default() + }) +} + +// --------------------------------------------------------------------------- +// Building & pushing controlled multi-layer images +// --------------------------------------------------------------------------- + +/// A tar entry to synthesize a layer. +enum E<'a> { + File(&'a str, &'a [u8]), + Dir(&'a str), + /// OCI whiteout: deletes `path` from lower layers. Stored as `.wh.`. + Whiteout(&'a str), + /// OCI opaque whiteout: hides all lower entries in `dir`. + Opaque(&'a str), +} + +/// Build a deterministic (mtime=0) tar from entries. +fn build_tar(entries: &[E]) -> Vec { + let mut b = tar::Builder::new(Vec::new()); + let mut push = |path: String, data: &[u8], dir: bool| { + let mut h = tar::Header::new_gnu(); + h.set_path(&path).unwrap(); + h.set_size(data.len() as u64); + h.set_mode(if dir { 0o755 } else { 0o644 }); + h.set_mtime(0); + h.set_entry_type(if dir { + tar::EntryType::Directory + } else { + tar::EntryType::Regular + }); + h.set_cksum(); + b.append(&h, data).unwrap(); + }; + for e in entries { + match e { + E::File(p, d) => push((*p).to_string(), d, false), + E::Dir(p) => push((*p).to_string(), &[], true), + E::Whiteout(p) => { + // `dir/name` → `dir/.wh.name`; bare `name` → `.wh.name`. + let wh = match p.rsplit_once('/') { + Some((dir, name)) => format!("{dir}/.wh.{name}"), + None => format!(".wh.{p}"), + }; + push(wh, &[], false); + } + E::Opaque(dir) => push(format!("{dir}/.wh..wh..opq"), &[], false), + } + } + b.into_inner().unwrap() +} + +/// Deterministic gzip so identical layer content → identical blob → identical +/// digest (the basis of cross-image layer dedup). 
+fn gzip(raw: &[u8]) -> Vec { + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + e.write_all(raw).unwrap(); + e.finish().unwrap() +} + +/// Push a multi-layer image (gzip layers + minimal config + manifest) and return +/// its reference string. +async fn push_image(client: &RegistryClient, addr: &str, repo_tag: &str, layers_raw: &[Vec]) -> String { + let auth = Credentials::Anonymous; + let image_ref = format!("{addr}/test/{repo_tag}"); + let image: Reference = image_ref.parse().unwrap(); + + let config_bytes = + br#"{"architecture":"amd64","os":"linux","rootfs":{"type":"layers","diff_ids":[]}}"#.to_vec(); + let config_digest = format!("sha256:{}", hex_sha256(&config_bytes)); + client + .push_blob( + &image, + futures::stream::iter([Ok(Bytes::from(config_bytes.clone()))]), + &config_digest, + &auth, + ) + .await + .unwrap(); + + let mut descriptors = Vec::new(); + for raw in layers_raw { + let blob = gzip(raw); + let digest = format!("sha256:{}", hex_sha256(&blob)); + client + .push_blob( + &image, + futures::stream::iter([Ok(Bytes::from(blob.clone()))]), + &digest, + &auth, + ) + .await + .unwrap(); + descriptors.push(OciDescriptor { + media_type: "application/vnd.oci.image.layer.v1.tar+gzip".to_string(), + digest, + size: blob.len() as i64, + ..Default::default() + }); + } + + let manifest = OciImageManifest { + schema_version: 2, + media_type: Some("application/vnd.oci.image.manifest.v1+json".to_string()), + config: OciDescriptor { + media_type: "application/vnd.oci.image.config.v1+json".to_string(), + digest: config_digest, + size: config_bytes.len() as i64, + ..Default::default() + }, + layers: descriptors, + ..Default::default() + }; + client.push_manifest(&image, &manifest, &auth).await.unwrap(); + image_ref +} + +// --------------------------------------------------------------------------- +// Layered bless (mirrors run_bless_oci_layered, but with the http client) +// 
--------------------------------------------------------------------------- + +async fn bless_layered( + client: &RegistryClient, + image_ref: &str, + object_store: &Arc, + db_path: &str, + name: &str, +) -> Vec { + let auth = Credentials::Anonymous; + let image: Reference = image_ref.parse().unwrap(); + let resolved = client.resolve(&image, &auth).await.unwrap(); + + let mut results = Vec::new(); + let mut digests = Vec::new(); + let mut sizes = Vec::new(); + for layer in &resolved.layers { + let tmp = pull_layer_to_tempfile(client, &image, layer, &auth) + .await + .unwrap(); + let stored = ensure_layer_stored(object_store, db_path, &layer.digest, tmp) + .await + .unwrap(); + digests.push(layer.digest.clone()); + sizes.push(layer.size as u64); + results.push(stored); + } + + put_image_descriptor( + object_store, + db_path, + name, + &ImageDescriptor { + image_ref: image_ref.to_string(), + config_digest: resolved.manifest.config.digest.clone(), + layers: digests, + layer_sizes: sizes, + }, + ) + .await + .unwrap(); + results +} + +// --------------------------------------------------------------------------- +// Merged-bless baseline (what flattening costs, no cross-image dedup) +// --------------------------------------------------------------------------- + +async fn merged_stored_bytes( + object_store: &Arc, + db_path: &str, + name: &str, + layers_raw: &[Vec], + uuid_seed: &str, +) -> u64 { + let mut layers: Vec>> = layers_raw.iter().cloned().map(Cursor::new).collect(); + let total: u64 = layers_raw.iter().map(|l| l.len() as u64).sum(); + let device_size = (total.saturating_mul(3).max(64 * 1024 * 1024)).next_power_of_two(); + let opts = ConvertOptions { + convert_backslash: false, + writer_options: vec![ + WriterOption::MaximumDiskSize(device_size as i64), + WriterOption::Uuid(deterministic_uuid(uuid_seed)), + WriterOption::Journal(1024), + ], + }; + let ext4 = convert_oci_layers_to_ext4(&mut layers, Cursor::new(Vec::new()), &opts) + .unwrap() + .into_inner(); + 
let cs = Arc::new(ContentStore::new( + Arc::clone(object_store), + &format!("{db_path}/exports/{name}"), + )); + let (_vm, _hot, stats) = store_ext4_stream(&cs, Cursor::new(ext4), device_size) + .await + .unwrap(); + stats.bytes_uploaded +} + +// --------------------------------------------------------------------------- +// Composition: overlay-merge per-layer ext4s vs the merged ext4 +// --------------------------------------------------------------------------- + +fn read_regular(reader: &mut Reader>, inode_number: u32) -> Vec { + let inode = reader.read_inode(inode_number).unwrap(); + reader.read_data(&inode).unwrap() +} + +/// Visible regular files of the merged ext4 (the canonical answer). +fn merged_files(layers_raw: &[Vec]) -> BTreeMap> { + let mut layers: Vec>> = layers_raw.iter().cloned().map(Cursor::new).collect(); + let opts = ConvertOptions { + convert_backslash: false, + writer_options: vec![WriterOption::Uuid([0x33; 16]), WriterOption::Journal(1024)], + }; + let image = convert_oci_layers_to_ext4(&mut layers, Cursor::new(Vec::new()), &opts) + .unwrap() + .into_inner(); + let mut reader = Reader::new(Cursor::new(image.as_slice())).unwrap(); + let mut out = BTreeMap::new(); + for e in reader.walk().unwrap() { + if e.mode & format::TYPE_MASK == format::S_IFREG { + let data = read_regular(&mut reader, e.inode_number); + out.insert(e.path, data); + } + } + out +} + +/// Visible regular files obtained by overlay-stacking the per-layer ext4s, +/// applying overlayfs semantics (char-dev whiteout, opaque dirs). 
+fn overlay_files(layers_raw: &[Vec]) -> BTreeMap> { + let mut visible: BTreeMap> = BTreeMap::new(); + for raw in layers_raw { + let opts = ConvertOptions { + convert_backslash: false, + writer_options: vec![WriterOption::Uuid([0x44; 16]), WriterOption::Journal(1024)], + }; + let image = convert_layer_to_ext4(Cursor::new(raw.clone()), Cursor::new(Vec::new()), &opts) + .unwrap() + .into_inner(); + let mut reader = Reader::new(Cursor::new(image.as_slice())).unwrap(); + let entries = reader.walk().unwrap(); + for e in &entries { + let kind = e.mode & format::TYPE_MASK; + if kind == format::S_IFCHR && e.devmajor == 0 && e.devminor == 0 { + // Whiteout: delete this path (and any subtree) from lower layers. + let prefix = format!("{}/", e.path); + visible.remove(&e.path); + visible.retain(|k, _| !k.starts_with(&prefix)); + } else if kind == format::S_IFDIR + && e.xattrs.get("trusted.overlay.opaque").map(Vec::as_slice) == Some(b"y") + { + // Opaque dir: drop everything from lower layers under it. + let prefix = format!("{}/", e.path); + visible.retain(|k, _| !k.starts_with(&prefix)); + } + } + // Add this layer's regular files (after opaque purge above). + for e in &entries { + if e.mode & format::TYPE_MASK == format::S_IFREG { + let data = read_regular(&mut reader, e.inode_number); + visible.insert(e.path.clone(), data); + } + } + } + visible +} + +fn digest_hex(d: &str) -> &str { + d.strip_prefix("sha256:").unwrap_or(d) +} + +/// Replicates GC's live-set collection from image descriptors (the invariant +/// GC ref-counting relies on). 
+async fn referenced_layers(store: &Arc, db: &str) -> HashSet { + let mut live = HashSet::new(); + for name in list_image_descriptors(store, db).await.unwrap() { + if let Some(d) = get_image_descriptor(store, db, &name).await.unwrap() { + for l in &d.layers { + live.insert(digest_hex(l).to_string()); + } + } + } + live +} + +async fn count_objects(store: &Arc, prefix: &str) -> usize { + use futures::StreamExt; + let p = ObjectPath::from(prefix.to_string()); + store + .list(Some(&p)) + .filter_map(|r| async move { r.ok() }) + .count() + .await +} + +// --------------------------------------------------------------------------- +// The test +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn layered_oci_bless_dedups_shared_layers_e2e() { + let (_registry, addr) = start_registry().await; + let client = http_client(); + + // A base layer shared by all three images (a big shared file dominates size). + let base = build_tar(&[ + E::Dir("etc"), + E::File("etc/shared.bin", &vec![0x5A_u8; 300_000]), + E::File("etc/keep", b"keep"), + E::File("b", b"to-be-removed"), + E::Dir("dir"), + E::File("dir/x", b"x"), + E::File("dir/y", b"y"), + ]); + let upper1 = build_tar(&[E::File("app1.bin", &vec![1u8; 50_000])]); + // upper2 exercises a whiteout (delete /b) and an opaque dir (/dir). 
+ let upper2 = build_tar(&[ + E::File("app2.bin", &vec![2u8; 60_000]), + E::Whiteout("b"), + E::Opaque("dir"), + E::File("dir/z", b"z"), + ]); + let upper3 = build_tar(&[E::File("app3.bin", &vec![3u8; 70_000])]); + + let img1_layers = vec![base.clone(), upper1]; + let img2_layers = vec![base.clone(), upper2]; + let img3_layers = vec![base.clone(), upper3]; + + let ref1 = push_image(&client, &addr, "img1:v1", &img1_layers).await; + let ref2 = push_image(&client, &addr, "img2:v1", &img2_layers).await; + let ref3 = push_image(&client, &addr, "img3:v1", &img3_layers).await; + + let store: Arc = Arc::new(InMemory::new()); + let db = "test"; + + let r1 = bless_layered(&client, &ref1, &store, db, "img1").await; + let r2 = bless_layered(&client, &ref2, &store, db, "img2").await; + let r3 = bless_layered(&client, &ref3, &store, db, "img3").await; + + // (1) Shared base layer stored once; reused by images 2 and 3. + assert!(!r1[0].already_present, "first image stores the base layer"); + assert!(r2[0].already_present, "second image reuses the base layer"); + assert!(r3[0].already_present, "third image reuses the base layer"); + assert!(!r1[1].already_present && !r2[1].already_present && !r3[1].already_present, + "each upper layer is unique and stored fresh"); + assert_eq!(r1[0].digest, r2[0].digest, "base layer digest is identical across images"); + assert_eq!(r2[0].digest, r3[0].digest); + let base_digest = r1[0].digest.clone(); + assert!( + count_objects(&store, &layer_base_path(db, &base_digest)).await >= 2, + "base layer artifact (manifest + ≥1 pack) exists once", + ); + + // (2) Layered storage < merged storage of the same three images. 
+ let layered_total: u64 = [&r1, &r2, &r3] + .iter() + .flat_map(|r| r.iter()) + .map(|s| s.stored_bytes) + .sum(); + let mstore: Arc = Arc::new(InMemory::new()); + let merged_total = merged_stored_bytes(&mstore, db, "m1", &img1_layers, "seed-1").await + + merged_stored_bytes(&mstore, db, "m2", &img2_layers, "seed-2").await + + merged_stored_bytes(&mstore, db, "m3", &img3_layers, "seed-3").await; + assert!( + layered_total < merged_total, + "layered storage {layered_total} must be smaller than merged {merged_total}", + ); + + // (3) Composition correctness: overlay-stacking the per-layer ext4s yields + // the same filesystem the merged path produces — including the /b whiteout + // deletion and the opaque /dir. + let overlay = overlay_files(&img2_layers); + let merged = merged_files(&img2_layers); + assert_eq!(overlay, merged, "overlay composition must equal the merged filesystem"); + assert!(!merged.contains_key("b"), "whiteout deleted /b"); + assert!(!merged.contains_key("dir/x") && !merged.contains_key("dir/y"), "opaque dropped lower /dir entries"); + assert_eq!(merged.get("dir/z").map(Vec::as_slice), Some(b"z".as_slice())); + assert_eq!(merged.get("app2.bin").map(|v| v.len()), Some(60_000)); + + // (4) GC reference invariant: dropping one image keeps the base referenced. + store + .delete(&ObjectPath::from(format!("{db}/images/img1"))) + .await + .unwrap(); + let live = referenced_layers(&store, db).await; + assert!( + live.contains(digest_hex(&base_digest)), + "base layer still referenced by img2/img3 after dropping img1", + ); +}