diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f84178d0..db582cdb 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -471,5 +471,5 @@ jobs: run: cargo fuzz run fuzz_volume_manifest -- -max_total_time=30 - name: Fuzz hot set run: cargo fuzz run fuzz_hot_set -- -max_total_time=30 - - name: Fuzz LZ4 decompress - run: cargo fuzz run fuzz_lz4_decompress -- -max_total_time=30 + - name: Fuzz block decompress (LZ4 + zstd auto-detect) + run: cargo fuzz run fuzz_decompress_block -- -max_total_time=30 diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 75481d89..554c2f28 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1,6 +1,6 @@ # GlideFS Architecture -Takes block I/O commands (read/write/flush/write_zeroes) from a Linux kernel block device (`/dev/nbdN` or `/dev/ublkbN`), serves reads from a tiered cache (local SSD → in-memory Foyer → SSD Foyer → S3), buffers writes to local SSD (~5µs), and asynchronously uploads dirty blocks to S3 as LZ4-compressed, content-addressed packs. Transport-agnostic: NBD (default, cross-platform) and ublk (Linux 6.0+, io_uring-based, opt-in via `--features ublk`). +Takes block I/O commands (read/write/flush/write_zeroes) from a Linux kernel block device (`/dev/nbdN` or `/dev/ublkbN`), serves reads from a tiered cache (local SSD → in-memory Foyer → SSD Foyer → S3), buffers writes to local SSD (~5µs), and asynchronously uploads dirty blocks to S3 as compressed (zstd by default; legacy LZ4 packs still read via codec auto-detection), content-addressed packs. Transport-agnostic: NBD (default, cross-platform) and ublk (Linux 6.0+, io_uring-based, opt-in via `--features ublk`). ## Data Flow @@ -82,7 +82,7 @@ WriteCache ──► is_present(block_idx)? └── ContentStore::get_chunk_block(chunk_idx, pack_id, offset, comp_length) ├── S3 error → EIO to guest (block stays NOT_PRESENT; next read retries) ├── BLAKE3 mismatch → HashMismatch error → EIO to guest - └── OK → LZ4 decompress → verify BLAKE3 → insert CleanCache → return + └── OK → decompress (zstd or legacy LZ4, auto-detected) → verify BLAKE3 → insert CleanCache → return ``` Multi-block reads fan out with `futures::future::try_join_all()`. Sequential access (3+ consecutive chunk accesses) triggers prefetch of the next pack boundary to hide S3 latency. (`readahead.rs`) @@ -109,7 +109,7 @@ For each chunk (one pack per chunk per flush cycle): │ ├── pread block from SSD │ ├── CRC32 verify from SparseCrcMap (if available) │ ├── Skip zero blocks (well-known hash sentinel) - │ ├── BLAKE3-128 hash → LZ4 compress + │ ├── BLAKE3-128 hash → compress (zstd by default; per-cache level) │ └── Collect into Vec<(hash, chunk_offset, compressed)> ├── ContentStore::stream_chunk_pack(): │ ├── WriteMultipart::new(put_multipart_opts(...)) ← streaming S3 upload @@ -155,7 +155,7 @@ Each pack is self-describing — the block index is a footer (trailer → index | Export | A virtual block device served over a transport, with its own cache and S3 prefix | Not a filesystem — raw blocks only | | Block | Fixed-size unit of data (default 128 KB to match ZFS recordsize) | Not variable-sized | | Volume Chunk | 128 MiB range of blocks (1,024 blocks of 128 KB = 1 ext4 block group). The unit of pack scoping, compaction, and metadata management. | Not a 128 KB block — "chunk" means 128 MiB range. Aligns with ext4 block groups, bounding database scatter to 2–3 chunks per flush | -| Pack | GLPK S3 object containing LZ4-compressed blocks scoped to one volume chunk. Footer-indexed: header + block data + index footer + GLIX trailer. | Not cross-chunk | +| Pack | GLPK S3 object containing compressed blocks scoped to one volume chunk (zstd by default; legacy LZ4 packs still read via per-block codec auto-detection). Footer-indexed: header + block data + index footer + GLIX trailer. | Not cross-chunk | | PackId | 8-byte random `u64` identifying one pack within its chunk. Hex string in S3 key. | Not a UUID. Collision-safe: birthday bound ~4.3 billion per chunk, and chunks see hundreds of IDs over their lifetime | | VolumeManifest (GLVM) | Binary file mapping `chunk_idx → [pack_id, ...]`. Sparse: only written chunks appear. The root of an export's metadata. CRC32-protected. | Not the full block index — pack IDs point to self-describing packs that contain the block-level index | | ChunkEntry | `Vec` for one chunk, ordered oldest-to-newest. After compaction: single entry. | Not block-level index — that lives in each pack's embedded index | @@ -220,7 +220,7 @@ The write path avoids all locks. Three techniques make this possible: Every block is identified by its BLAKE3-128 hash (16 bytes, truncated from 256-bit), computed at flush time (not on the write path). This enables: - **Within-batch deduplication**: During flush, zero blocks and within-batch duplicates are deduplicated (seen_hashes set). Two blocks at different `chunk_offsets` with the same hash each get their own index entry — required for the read path to resolve them by position. -- **Integrity verification**: Read path verifies hash after S3 fetch and LZ4 decompression. +- **Integrity verification**: Read path verifies the hash after S3 fetch and decompression (codec auto-detected: zstd or legacy LZ4). - **Sparse manifests**: VolumeManifest only stores chunks that have been written — a 500 GB export with 2 GB of data has a tiny manifest. The well-known hash of a 128 KB zero block (`zero_block_hash()`) lets the flush path skip blocks that are all-zeros — they're deduplicated without storage or S3 interaction. (`block_map.rs`) @@ -272,6 +272,38 @@ A lock-free circuit breaker protects against S3 outages. All mutable state is pa Two failure policies: **Consecutive** (N failures in a row) and **Windowed** (N failures within a time window). Only connectivity errors count — business logic errors (404, etc.) don't trip the breaker. (`circuit_breaker.rs`) +## Deduplication Model + +Dedup happens in three places, at three different granularities, and they don't behave the same way. Knowing which is which explains what actually shares storage and what doesn't. + +| Tier | Addressed by | Granularity | What it dedups | +|------|--------------|-------------|----------------| +| **Lineage (CoW)** | manifest reference | block (pack-id list) | A fork/snapshot inherits the parent's manifest and shares the parent's packs (same `s3_prefix`). This is the **primary** cross-volume dedup — but only *along ancestry*. | +| **Host clean cache** | **content** (BLAKE3-128) | 128 KiB block | Host-global, shared across all exports. Two *unrelated* volumes that read a byte-identical block resolve to **one resident copy** in RAM/SSD. No lineage, no opt-in. | +| **S3 packs** | **position** (chunk + offset) | pack | A pack lays blocks out in `chunk_offset` order. Dedup is limited to: zero blocks (skipped), cross-flush re-writes of the *same (offset, content)* (`blocks_cross_deduped`), identical *whole* packs within a prefix (`head_chunk_pack`), and OCI `--layered` (whole layers by digest, global `layers/{digest}`). | + +### The addressing asymmetry (and why it's deliberate) + +**S3 is position-addressed; the host cache is content-addressed.** That is a design choice matched to each tier's access pattern, not an inconsistency: + +- **S3 (cold, bulk):** consecutive logical blocks sit contiguous in a pack, so a multi-block read is **one ranged GET** and a flush is **one PUT**. S3 bills per request, so locality and batching dominate cost. Content-addressing each block would scatter consecutive blocks by hash → N random GETs, N× requests, no batching. +- **Host cache (hot, random-access):** there is no locality concern in RAM, and memory is scarce, so **dedup is density**. Content-addressing is the right primitive. + +Content-addressing and range-read locality are fundamentally opposed (this is the same tension that rules out content-defined chunking here). So each tier picks the axis that matters for it. + +### Consequences (the things this implies) + +- **Cross-lineage content overlap is *not* deduped in S3** — only the host cache (per-host, hot set) and OCI `--layered` (whole shared base layers) catch it. Two independently-blessed images in different prefixes store their shared bytes twice in S3. +- **Within a rootfs, identical content at different offsets is stored once per offset in S3** (position-addressed). Zeros (skipped) and hardlinks (shared extents) — the bulk of intra-image duplication — are already neutralized; what remains is non-hardlinked identical files, usually small. The host cache dedups all of it on read regardless. +- **`WriterOption::AlignData` helps the cache, not intra-rootfs S3.** Aligning a file to the block grid makes it produce identical *block hashes* at stable offsets, which the content-addressed cache exploits and which lets *whole packs* match across deterministic re-blesses. It does **not** dedup those blocks within a rootfs in S3, because packs are position-addressed. +- **More S3 dedup is only available at pack/layer granularity** (`--layered`, or a future global content-addressed pack store), never sub-pack block dedup — that would break range reads. A global pack store's cost is GC: pack liveness is O(all manifests), whereas layer liveness is O(images) — which is why `--layered` exists and finer-grained global dedup doesn't. + +### Compression + +Blocks are compressed independently at flush time via `block_map::compress_block(data, level)`. The default is **zstd-1** for runtime exports (~LZ4 compress cost, ~23% smaller) and **zstd-19** for `bless` (offline, write-once/read-many; ~37% smaller, and zstd decode is ~level-independent so the most-read data pays only at build time). `GLIDEFS_COMPRESSION_LEVEL` overrides the default; `0` pins legacy LZ4. + +The read path (`decompress_block`) detects the codec per block by sniffing the zstd frame magic, so **legacy LZ4 packs remain readable forever** — there was no on-disk format change. A pack may even hold both codecs (compaction reuses each block's original compressed bytes). `content_pack_id` mixes the compressed bytes, so a zstd pack simply gets a new id; cross-flush dedup keys on the *uncompressed* BLAKE3 hash and is codec-independent. Compression is orthogonal to dedup — it shrinks stored/transferred bytes without changing what shares. + ## File Rotation & Eviction Local SSD is a bounded write-back buffer, not a persistent cache. After each flush to S3, blocks are evicted (SYNCING→NOT_PRESENT) and the flushing file is deleted. SSD footprint per export: `(dirty + syncing) × block_size` — only blocks modified since the last flush consume local space. @@ -298,7 +330,7 @@ After flush: {name}.cache ← active 6. Swap `data_file` handle (new active file goes into the RwLock) 7. Store old handle in `flushing_file: Mutex>>` 8. Release write lock (~15µs total hold time) -9. `compute_flush_batch` reads from `flushing_file` (rayon parallel: pread + CRC32 + BLAKE3 + LZ4) +9. `compute_flush_batch` reads from `flushing_file` (rayon parallel: pread + CRC32 + BLAKE3 + compress) 10. Stream GLPK v3 packs to S3 11. Finalize: CAS SYNCING→NOT_PRESENT (evict), copy skipped blocks flushing→active 12. `flushing_active.store(false)`, drop flushing_file, `unlink("{name}.flushing")` @@ -496,7 +528,7 @@ Self-describing S3 object. Each pack is scoped to one volume chunk. The block in │ chunk_size: u32 LE _reserved: [u8; 4] │ ├────────────────────────────────────────────────────────────┤ │ Block Data (immediately after header) │ -│ [LZ4-compressed blocks, concatenated] │ +│ [compressed blocks (zstd default; legacy LZ4 reads too)] │ │ Offsets in index are absolute from pack start │ ├────────────────────────────────────────────────────────────┤ │ Block Index footer (28 bytes × block_count) │ @@ -622,7 +654,7 @@ Every layer has a verification mechanism. The goal: corruption is detected befor | Layer | What's Protected | Hash/Check | When Verified | On Failure | |-------|-----------------|------------|---------------|------------| -| S3 packs | Block data in transit/at rest | BLAKE3-128 | Read path: after S3 fetch + LZ4 decompress | `HashMismatch` error | +| S3 packs | Block data in transit/at rest | BLAKE3-128 | Read path: after S3 fetch + decompress (zstd/LZ4) | `HashMismatch` error | | Clean cache (Foyer) | Cached blocks on SSD/memory | BLAKE3-128 | Background scrubber | Evict from cache → re-fetch from S3 | | VolumeManifest | Chunk pack list root | CRC32 trailer | On deserialization | Reject manifest | | GLPK pack | Block index + data | BLAKE3-128 per block | On block read from S3 | `HashMismatch` error | @@ -755,7 +787,7 @@ Histogram buckets: `<100µs`, `<1ms`, `<10ms`, `<100ms`, `<1s`, `>=1s`. **What the system verifies (rejects if invalid):** -- Block data integrity: BLAKE3-128 verified on every S3 fetch + LZ4 decompress +- Block data integrity: BLAKE3-128 verified on every S3 fetch + decompress (zstd/LZ4) - Manifest integrity: CRC32 trailer verified on every deserialization - WAL integrity: CRC32 per entry, replay stops at first corrupt entry - Dirty block integrity: CRC32 verified at flush time before uploading to S3 @@ -833,7 +865,7 @@ Histogram buckets: `<100µs`, `<1ms`, `<10ms`, `<100ms`, `<1s`, `>=1s`. | `block/pack_index_cache.rs` | `PackIndexCache`: Foyer HybridCache keyed by `PackId`; `lookup_block`, `insert_entries`, `known_hashes` | | `block/content_store.rs` | S3 typed I/O: `stream_chunk_pack` (WriteMultipart), `get_chunk_block`, `get_pack_index` (suffix-read), manifests, snapshots | | `block/manifest.rs` | S3 key helpers: `manifest_s3_key`, `snapshot_s3_key` | -| `block/block_map.rs` | `SparseStateMap`, `SparseCrcMap`, `Blake3Hash`, `blake3_128`, `lz4_compress`, `lz4_decompress` | +| `block/block_map.rs` | `SparseStateMap`, `SparseCrcMap`, `Blake3Hash`, `blake3_128`; block codec: `compress_block`/`decompress_block` (zstd + legacy-LZ4 auto-detect), `zstd_compress`, `lz4_compress`/`lz4_decompress` | | `block/cache.rs` | `BlockCache` trait (CleanCache) + Foyer implementation | ### Background & Observability diff --git a/README.md b/README.md index 27a7b039..0aaf3107 100644 --- a/README.md +++ b/README.md @@ -6,14 +6,26 @@ Built for microVM storage at [Beyond](https://beyond.dev). ## How It Works -Guests see a standard block device (NBD or ublk). Writes go to local SSD immediately. A background scheduler packs dirty blocks, compresses with LZ4, and uploads to S3. Reads serve from local cache; misses pull from S3, verify BLAKE3 hashes, and cache locally. +Guests see a standard block device (NBD or ublk). Writes go to local SSD immediately. A background scheduler packs dirty blocks, compresses them (zstd; codec is detected on read, so legacy LZ4 packs still read), and uploads to S3. Reads serve from local cache; misses pull from S3, verify BLAKE3 hashes, and cache locally. ``` Write path: Guest → NBD/ublk → local SSD pwrite() → return OK ~5µs Read path: Guest → NBD/ublk → local cache hit → return data ~500µs - Guest → NBD/ublk → cache miss → S3 GET → LZ4 → verify → cache → return 50-300ms + Guest → NBD/ublk → cache miss → S3 GET → decompress → verify → cache → return 50-300ms ``` +## Core Properties + +- **Write-back over object storage.** Writes acknowledge against local NVMe (~5µs) and sync to S3 asynchronously as compressed, content-addressed packs. The durable copy is S3; latency is local. +- **Copy-on-write volumes.** Forks and snapshots are manifest operations — O(metadata), no data copied — so a 500 GB volume forks in milliseconds (see [Deployments](#deployments)). +- **Position-addressed in S3, content-addressed in cache.** In S3 a block is located by position — its offset within a (content-named) pack within a chunk — which keeps consecutive blocks contiguous so a multi-block read is one ranged GET. The host cache locates blocks by BLAKE3-128 content hash. The two tiers optimize opposite things on purpose: range-read/request economics in S3, dedup density in cache. +- **Content-addressed host cache.** That cache is shared across every export on the node, so identical blocks from unrelated volumes occupy a single resident copy — regardless of lineage. +- **Deterministic images.** `bless` produces byte-identical ext4 for identical input, and large file payloads are aligned to the block grid, so identical content hashes identically and is stored/cached once. +- **Bounded local cache.** Local SSD is a write-back buffer sized to the working set, not the volume; evicted blocks are re-fetched from S3 and BLAKE3-verified. +- **Standard block device.** Exposed as NBD or ublk — no guest cooperation, no custom filesystem. + +Deduplication spans three tiers at three granularities (lineage CoW, the content-addressed host cache, and position-addressed S3 packs); see [ARCHITECTURE.md → Deduplication Model](ARCHITECTURE.md#deduplication-model). + ## Install ```sh @@ -449,7 +461,7 @@ At 1,000 blocks/sec with 128KB blocks: ~2% of one core for BLAKE3 hashing, ~128M ## Key Design Choices -- **128KB blocks** match ZFS recordsize. Each flush creates one LZ4-compressed pack per modified 128MiB chunk. +- **128KB blocks** match ZFS recordsize. Each flush creates one compressed pack (zstd by default) per modified 128MiB chunk. - **BLAKE3-128 hashing** for content addressing and integrity verification. Truncated from 256-bit; 128-bit collision resistance is sufficient for dedup. - **Lock-free write path** using `pread`/`pwrite`, atomic block map with CAS, and monotonic sequence numbers. - **Typestate pattern** enforces valid lifecycle transitions at compile time. Can't write to a recovering cache. diff --git a/glidefs/fuzz/Cargo.toml b/glidefs/fuzz/Cargo.toml index b9f747fe..bb7624ae 100644 --- a/glidefs/fuzz/Cargo.toml +++ b/glidefs/fuzz/Cargo.toml @@ -78,8 +78,8 @@ doc = false bench = false [[bin]] -name = "fuzz_lz4_decompress" -path = "fuzz_targets/fuzz_lz4_decompress.rs" +name = "fuzz_decompress_block" +path = "fuzz_targets/fuzz_decompress_block.rs" test = false doc = false bench = false diff --git a/glidefs/fuzz/fuzz_targets/fuzz_decompress_block.rs b/glidefs/fuzz/fuzz_targets/fuzz_decompress_block.rs new file mode 100644 index 00000000..b051409c --- /dev/null +++ b/glidefs/fuzz/fuzz_targets/fuzz_decompress_block.rs @@ -0,0 +1,15 @@ +//! Fuzz target for codec-detecting block decompression. +//! +//! `decompress_block` is called on every S3 cache-miss read. It sniffs the zstd +//! magic and dispatches to zstd or legacy LZ4. Corrupted pack data (bit flips, +//! partial uploads, adversarial size prefixes) must produce an error, never a +//! panic or unbounded allocation. Arbitrary input exercises both codec branches. + +#![no_main] + +use glidefs::block::block_map::decompress_block; +use libfuzzer_sys::fuzz_target; + +fuzz_target!(|data: &[u8]| { + let _ = decompress_block(data); +}); diff --git a/glidefs/fuzz/fuzz_targets/fuzz_lz4_decompress.rs b/glidefs/fuzz/fuzz_targets/fuzz_lz4_decompress.rs deleted file mode 100644 index a44efc89..00000000 --- a/glidefs/fuzz/fuzz_targets/fuzz_lz4_decompress.rs +++ /dev/null @@ -1,14 +0,0 @@ -//! Fuzz target for LZ4 decompression. -//! -//! `lz4_decompress` is called on every S3 cache-miss read. Corrupted -//! pack data (bit flips, partial uploads) must produce a DecompressError, -//! not a panic. This exercises lz4_flex's size-prepended format parser. - -#![no_main] - -use glidefs::block::block_map::lz4_decompress; -use libfuzzer_sys::fuzz_target; - -fuzz_target!(|data: &[u8]| { - let _ = lz4_decompress(data); -}); diff --git a/glidefs/src/bin/compress_probe.rs b/glidefs/src/bin/compress_probe.rs new file mode 100644 index 00000000..8b30a809 --- /dev/null +++ b/glidefs/src/bin/compress_probe.rs @@ -0,0 +1,141 @@ +#![allow(clippy::cast_precision_loss)] +//! Measure per-block compression: LZ4 (production) vs zstd at several levels, on +//! real ext4 images. Production compresses each 128 KiB block *independently* +//! (see block_map::lz4_compress at flush), so we do the same here — whole-stream +//! compression would overstate the ratio. Zero blocks are skipped (never stored). +//! +//! Reports stored bytes per scheme = the S3 pack payload size (and thus the +//! storage + egress footprint) you'd get by switching the block compressor. +//! +//! Usage: cargo run --release --bin compress_probe -- img1.ext4 [img2 ...] + +use std::fs::File; +use std::io::Read; +use std::time::Instant; + +use glidefs::block::block_map::{lz4_compress, lz4_decompress}; + +const BLOCK: usize = 128 * 1024; +const ZSTD_LEVELS: [i32; 4] = [1, 3, 9, 19]; + +fn is_zero(d: &[u8]) -> bool { + let (p, c, s) = unsafe { d.align_to::() }; + p.iter().all(|&b| b == 0) && c.iter().all(|&w| w == 0) && s.iter().all(|&b| b == 0) +} + +fn human(b: u64) -> String { + let f = b as f64; + if b >= 1 << 30 { + format!("{:.2} GiB", f / (1u64 << 30) as f64) + } else if b >= 1 << 20 { + format!("{:.1} MiB", f / (1u64 << 20) as f64) + } else { + format!("{:.1} KiB", f / (1u64 << 10) as f64) + } +} + +#[derive(Default)] +struct Totals { + nonzero_raw: u64, + nonzero_blocks: u64, + lz4: u64, + zstd: [u64; 4], + lz4_decode_ns: u128, + zstd_decode_ns: u128, // decoding the zstd-3 form (representative; ~level-independent) +} + +fn main() { + let paths: Vec = std::env::args().skip(1).collect(); + if paths.is_empty() { + eprintln!("usage: compress_probe [...]"); + std::process::exit(2); + } + + let mut grand = Totals::default(); + println!("Per-128KiB-block compression (zeros skipped):\n"); + + for path in &paths { + let mut f = File::open(path).unwrap_or_else(|e| panic!("open {path}: {e}")); + let mut buf = vec![0u8; BLOCK]; + let mut t = Totals::default(); + loop { + let mut filled = 0; + while filled < BLOCK { + match f.read(&mut buf[filled..]) { + Ok(0) => break, + Ok(n) => filled += n, + Err(e) => panic!("read {path}: {e}"), + } + } + if filled == 0 { + break; + } + let block = &buf[..BLOCK]; + if is_zero(block) { + continue; + } + t.nonzero_blocks += 1; + t.nonzero_raw += BLOCK as u64; + let lz4c = lz4_compress(block); + t.lz4 += lz4c.len() as u64; + // Time LZ4 decode (read path on cache miss). + let d0 = Instant::now(); + let _ = lz4_decompress(&lz4c).expect("lz4 decode"); + t.lz4_decode_ns += d0.elapsed().as_nanos(); + for (i, lvl) in ZSTD_LEVELS.iter().enumerate() { + let zc = zstd::bulk::compress(block, *lvl).expect("zstd"); + t.zstd[i] += zc.len() as u64; + if *lvl == 3 { + let d1 = Instant::now(); + let _ = zstd::bulk::decompress(&zc, BLOCK).expect("zstd decode"); + t.zstd_decode_ns += d1.elapsed().as_nanos(); + } + } + } + + let name = path.rsplit('/').next().unwrap_or(path); + println!("{name} ({} non-zero blocks, {} raw)", t.nonzero_blocks, human(t.nonzero_raw)); + println!(" LZ4 (today): {:>10} ratio {:.2}x", human(t.lz4), t.nonzero_raw as f64 / t.lz4 as f64); + for (i, lvl) in ZSTD_LEVELS.iter().enumerate() { + let z = t.zstd[i]; + println!( + " zstd-{:<2}: {:>10} ratio {:.2}x vs LZ4: {:.1}% smaller", + lvl, + human(z), + t.nonzero_raw as f64 / z as f64, + 100.0 * (t.lz4 as f64 - z as f64) / t.lz4 as f64 + ); + } + let n = t.nonzero_blocks.max(1) as f64; + println!( + " decode/block: LZ4 {:.1} µs, zstd-3 {:.1} µs (a 128KiB block; vs ~10–100ms per S3 GET → decode is ~0.1% of read latency)", + t.lz4_decode_ns as f64 / 1000.0 / n, + t.zstd_decode_ns as f64 / 1000.0 / n, + ); + println!(); + + grand.nonzero_raw += t.nonzero_raw; + grand.nonzero_blocks += t.nonzero_blocks; + grand.lz4 += t.lz4; + grand.lz4_decode_ns += t.lz4_decode_ns; + grand.zstd_decode_ns += t.zstd_decode_ns; + for i in 0..4 { + grand.zstd[i] += t.zstd[i]; + } + } + + if paths.len() > 1 { + println!("==== TOTAL across {} images ({} raw) ====", paths.len(), human(grand.nonzero_raw)); + println!(" LZ4 (today): {:>10} ratio {:.2}x", human(grand.lz4), grand.nonzero_raw as f64 / grand.lz4 as f64); + for (i, lvl) in ZSTD_LEVELS.iter().enumerate() { + let z = grand.zstd[i]; + println!( + " zstd-{:<2}: {:>10} ratio {:.2}x vs LZ4: {:.1}% smaller", + lvl, + human(z), + grand.nonzero_raw as f64 / z as f64, + 100.0 * (grand.lz4 as f64 - z as f64) / grand.lz4 as f64 + ); + } + } +} diff --git a/glidefs/src/block/block_map.rs b/glidefs/src/block/block_map.rs index 598729b8..f1ec40c1 100644 --- a/glidefs/src/block/block_map.rs +++ b/glidefs/src/block/block_map.rs @@ -770,6 +770,99 @@ pub fn lz4_decompress( lz4_flex::decompress_size_prepended(compressed) } +// ============================================================================ +// Codec-agnostic block compression (LZ4 legacy + zstd) +// ============================================================================ +// +// New packs are written with zstd; existing LZ4 packs stay readable forever. +// The codec is detected on read by sniffing the zstd frame magic — no on-disk +// format change, and per-block self-describing frames survive compaction (which +// reuses each block's original compressed bytes when merging packs). + +/// First 4 bytes of every zstd frame (RFC 8878). An `lz4_compress` block can +/// never collide: it begins with a u32-LE uncompressed size ≤ 2 MiB, so its +/// byte[3] is always `0x00`, whereas zstd requires `0xFD`. +const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD]; + +/// Cap on decompressed block size — guards against an adversarial/corrupt frame +/// claiming a huge size and OOMing the process (matches the LZ4 path's bound). +const MAX_DECOMPRESSED_SIZE: usize = 2 * 1024 * 1024; + +/// Sentinel `compression_level` selecting the legacy LZ4 codec on write. Any +/// other value selects zstd at that level. The read path always auto-detects, +/// so this only governs what new packs are written as. +pub const COMPRESSION_LZ4: i32 = 0; + +/// Default zstd level for runtime volume flushes. Level 1 compresses at roughly +/// LZ4 speed (near cost-neutral on guest-serving hosts) while still ~23% smaller. +pub const COMPRESSION_RUNTIME_DEFAULT: i32 = 1; + +/// zstd level for bless (offline, write-once/read-many). Highest ratio (~37% +/// smaller than LZ4); slow to compress but bless is offline and decode speed is +/// ~level-independent, so the most-read data pays only at build time. +pub const COMPRESSION_BLESS: i32 = 19; + +/// Default compression level applied to a freshly-opened cache. zstd-1 unless +/// overridden by the `GLIDEFS_COMPRESSION_LEVEL` env var (set it to `0` to pin +/// legacy LZ4, or e.g. `19` for max ratio). zstd is the system default so a +/// cache-open path that forgets to choose still gets compression rather than +/// silently falling back to LZ4. bless still overrides to `COMPRESSION_BLESS`. +pub fn env_default_compression_level() -> i32 { + std::env::var("GLIDEFS_COMPRESSION_LEVEL") + .ok() + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or(COMPRESSION_RUNTIME_DEFAULT) +} + +/// Error from `decompress_block`, preserving which codec failed. +#[derive(Debug)] +pub enum CompressError { + Lz4(lz4_flex::block::DecompressError), + Zstd(std::io::Error), +} + +impl std::fmt::Display for CompressError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CompressError::Lz4(e) => write!(f, "lz4 decompress: {e}"), + CompressError::Zstd(e) => write!(f, "zstd decompress: {e}"), + } + } +} + +impl std::error::Error for CompressError {} + +/// Compress with zstd at `level`. zstd's bulk compressor only errors on OOM or +/// internal failure, neither of which is recoverable here. +#[inline] +pub fn zstd_compress(data: &[u8], level: i32) -> Vec { + zstd::bulk::compress(data, level).expect("zstd bulk compress (fails only on OOM)") +} + +/// Compress a block with the codec selected by `level`: `COMPRESSION_LZ4` +/// selects legacy LZ4, any other value selects zstd at that level. Used by all +/// production write paths (flush, ext4 ingest). +#[inline] +pub fn compress_block(data: &[u8], level: i32) -> Vec { + if level == COMPRESSION_LZ4 { + lz4_compress(data) + } else { + zstd_compress(data, level) + } +} + +/// Decompress a block, auto-detecting LZ4 vs zstd by frame magic. Used by all +/// production read paths so mixed-codec packs (during/after migration) read +/// transparently. Output is capped at `MAX_DECOMPRESSED_SIZE`. +#[inline] +pub fn decompress_block(compressed: &[u8]) -> Result, CompressError> { + if compressed.len() >= 4 && compressed[..4] == ZSTD_MAGIC { + zstd::bulk::decompress(compressed, MAX_DECOMPRESSED_SIZE).map_err(CompressError::Zstd) + } else { + lz4_decompress(compressed).map_err(CompressError::Lz4) + } +} + // ============================================================================ // Tests // ============================================================================ @@ -907,6 +1000,58 @@ mod tests { assert_eq!(decompressed, data); } + #[test] + fn test_zstd_roundtrip() { + let mut data = vec![0u8; 131072]; + for (i, b) in data.iter_mut().enumerate() { + *b = (i % 251) as u8; + } + for level in [1, 3, 19] { + let c = compress_block(&data, level); + assert!(c.len() < data.len(), "zstd-{level} should shrink"); + assert_eq!(decompress_block(&c).unwrap(), data, "zstd-{level} roundtrip"); + } + } + + #[test] + fn test_codec_autodetect_reads_both() { + // The read path must transparently handle LZ4 (legacy) and zstd (new), + // detecting the codec from the frame — this is what keeps old packs + // readable after the compressor swap. + let data: Vec = (0u32..70000) + .map(|i| (i.wrapping_mul(2654435761) >> 13) as u8) + .collect(); + + let lz4 = compress_block(&data, COMPRESSION_LZ4); + let zstd = compress_block(&data, 3); + + // zstd frames carry the magic; LZ4 size-prefix never does (byte[3]==0). + assert_eq!(&zstd[..4], &ZSTD_MAGIC); + assert_ne!(&lz4[..4], &ZSTD_MAGIC); + assert_eq!(lz4[3], 0, "LZ4 size-prefix high byte is 0, never 0xFD"); + + assert_eq!(decompress_block(&lz4).unwrap(), data, "auto-detect LZ4"); + assert_eq!(decompress_block(&zstd).unwrap(), data, "auto-detect zstd"); + } + + #[test] + fn test_decompress_block_reads_legacy_lz4_helper() { + // A block written by the pre-swap `lz4_compress` must still decode. + let data = b"legacy pack written before the zstd swap"; + let legacy = lz4_compress(data); + assert_eq!(decompress_block(&legacy).unwrap(), data); + } + + #[test] + fn test_legacy_lz4_decompress_rejects_zstd_frame() { + // Documents the single-shot rollback floor: a pre-swap binary calling + // the raw LZ4 decompressor on a new zstd pack must FAIL cleanly (the + // zstd magic's high byte 0xFD reads as a >2MiB claimed size → guard + // trips), never silently return wrong bytes. + let zstd = compress_block(b"written after the swap", 3); + assert!(lz4_decompress(&zstd).is_err(), "old LZ4 path must reject a zstd frame, not corrupt"); + } + // ======================================================================== // SparseStateMap tests // ======================================================================== diff --git a/glidefs/src/block/pack.rs b/glidefs/src/block/pack.rs index e4805345..dc44a156 100644 --- a/glidefs/src/block/pack.rs +++ b/glidefs/src/block/pack.rs @@ -497,7 +497,10 @@ pub fn extract_block(pack_data: &[u8], offset: u32, comp_length: u32) -> Option< #[cfg(test)] mod tests { use super::*; - use crate::block::block_map::{blake3_128, lz4_compress, lz4_decompress}; + use crate::block::block_map::{ + blake3_128, compress_block, decompress_block, lz4_compress, lz4_decompress, + COMPRESSION_LZ4, + }; /// Helper: generate deterministic test data for block `i`. fn test_block_data(i: usize, size: usize) -> Vec { @@ -542,6 +545,38 @@ mod tests { } } + #[test] + fn test_pack_round_trip_mixed_codec() { + // Compaction reuses each block's ORIGINAL compressed bytes when merging + // packs, so a single pack can legitimately hold both LZ4- and zstd-framed + // blocks. The read path must auto-detect per block. + let chunk_size: u32 = 131072; + let block_count = 8; + let mut blocks = Vec::with_capacity(block_count); + let mut originals = Vec::with_capacity(block_count); + for i in 0..block_count { + let data = test_block_data(i, chunk_size as usize); + let hash = blake3_128(&data); + // Alternate codecs block-by-block. + let level = if i % 2 == 0 { COMPRESSION_LZ4 } else { 3 }; + let compressed = compress_block(&data, level); + originals.push((hash, data)); + blocks.push((hash, i as u32, compressed)); + } + + let (pack_bytes, _entries) = assemble_pack(blocks, chunk_size).unwrap(); + let index = parse_pack_index(&pack_bytes).unwrap(); + assert_eq!(index.entries.len(), block_count); + + for entry in &index.entries { + let i = entry.chunk_offset as usize; + let compressed = extract_block(&pack_bytes, entry.offset, entry.comp_length).unwrap(); + let decompressed = decompress_block(compressed).unwrap(); + assert_eq!(blake3_128(&decompressed), originals[i].0, "hash mismatch at block {i}"); + assert_eq!(decompressed, originals[i].1, "data mismatch at block {i}"); + } + } + #[test] fn test_pack_header_and_trailer() { let data = vec![0u8; 4096]; diff --git a/glidefs/src/block/router.rs b/glidefs/src/block/router.rs index b04615ba..fc20e58b 100644 --- a/glidefs/src/block/router.rs +++ b/glidefs/src/block/router.rs @@ -1308,6 +1308,10 @@ impl ExportRouter { (Arc::new(cache), volume_manifest) }; + // Runtime exports keep the cache's default codec (zstd-1, or whatever + // GLIDEFS_COMPRESSION_LEVEL sets fleet-wide); only bless overrides it. + // The read path auto-detects, so existing LZ4 packs stay readable. + // Record any recovery issues in metrics let rw = cache.recovery_warning_count(); if rw > 0 { @@ -1842,6 +1846,8 @@ impl ExportRouter { block_size: self.block_size, wal_sync: false, })?); + // Bless is offline + write-once/read-many: use the highest zstd level. + cache.set_compression_level(crate::block::block_map::COMPRESSION_BLESS); #[allow(clippy::cast_possible_truncation)] let block_size_u32 = self.block_size as u32; // block_size typically 4096, fits in u32 diff --git a/glidefs/src/block/write_cache/compact.rs b/glidefs/src/block/write_cache/compact.rs index e935c772..79e060b4 100644 --- a/glidefs/src/block/write_cache/compact.rs +++ b/glidefs/src/block/write_cache/compact.rs @@ -240,7 +240,7 @@ pub async fn compact_chunk( // S3 data could become the sole copy after GC deletes the // original packs. The decompress+hash cost is acceptable // since compaction is not latency-sensitive. - let decompressed = crate::block::block_map::lz4_decompress(&compressed) + let decompressed = crate::block::block_map::decompress_block(&compressed) .map_err(|e| CacheError::DecompressFailed(format!( "compaction: chunk {} pack {:016x} offset {}: {}", chunk_idx, pid, pack_offset, e diff --git a/glidefs/src/block/write_cache/flush.rs b/glidefs/src/block/write_cache/flush.rs index d4b0f4db..a9e0aa8f 100644 --- a/glidefs/src/block/write_cache/flush.rs +++ b/glidefs/src/block/write_cache/flush.rs @@ -7,7 +7,7 @@ use std::sync::atomic::Ordering; use bytes::Bytes; use tracing::{debug, error, info, instrument, warn}; -use crate::block::block_map::{Blake3Hash, SparseBlockState, blake3_128, lz4_compress}; +use crate::block::block_map::{Blake3Hash, SparseBlockState, blake3_128, compress_block}; use crate::block::cache::BlockCache; use crate::block::content_store::ContentStore; use crate::block::state::{Active, Draining}; @@ -72,6 +72,9 @@ fn compute_flush_batch( let block_size = inner.config.block_size; let device_size = inner.config.device_size; + // Codec/level for new packs (read once; the rayon closure captures the Copy + // i32 rather than reaching into `inner`). LZ4 by default; production sets zstd. + let compression_level = inner.compression_level.load(Ordering::Relaxed); // Snap the flushing file Arc before entering rayon so workers share it // lock-free. Without this, every rayon thread serializes on the Mutex @@ -153,7 +156,7 @@ fn compute_flush_batch( let hash = blake3_128(&chunk_buf); - let compressed = Some(Bytes::from(lz4_compress(&chunk_buf[..]))); + let compressed = Some(Bytes::from(compress_block(&chunk_buf[..], compression_level))); // Warm clean_cache if let Some(ref cache) = clean_cache { @@ -244,6 +247,17 @@ fn drain_page_crcs(inner: &CacheInner) -> HashMap> { } impl WriteCache { + /// Set the block compression level/codec used by the flush path. + /// `block_map::COMPRESSION_LZ4` selects legacy LZ4; any other value selects + /// zstd at that level. Set once after open, before any flush (runtime + /// exports use a low zstd level, bless uses a high one). The read path + /// always auto-detects, so this only affects newly written packs. + pub fn set_compression_level(&self, level: i32) { + self.inner + .compression_level + .store(level, std::sync::atomic::Ordering::Relaxed); + } + /// Flush the local cache file and WAL to durable storage. /// /// Syncs both the data file and the WAL so that all dirty block metadata diff --git a/glidefs/src/block/write_cache/init.rs b/glidefs/src/block/write_cache/init.rs index d80c2252..ff06356b 100644 --- a/glidefs/src/block/write_cache/init.rs +++ b/glidefs/src/block/write_cache/init.rs @@ -328,6 +328,11 @@ impl WriteCache { let inner = Arc::new(CacheInner { config, + // Default codec: zstd-1, or GLIDEFS_COMPRESSION_LEVEL if set. bless + // overrides to a high level via WriteCache::set_compression_level. + compression_level: std::sync::atomic::AtomicI32::new( + crate::block::block_map::env_default_compression_level(), + ), data_file: Arc::new(parking_lot::RwLock::new(data_file)), flushing_file: parking_lot::Mutex::new(None), state_map, @@ -416,6 +421,11 @@ impl WriteCache { let inner = Arc::new(CacheInner { config, + // Default codec: zstd-1, or GLIDEFS_COMPRESSION_LEVEL if set. bless + // overrides to a high level via WriteCache::set_compression_level. + compression_level: std::sync::atomic::AtomicI32::new( + crate::block::block_map::env_default_compression_level(), + ), data_file: Arc::new(parking_lot::RwLock::new(data_file)), flushing_file: parking_lot::Mutex::new(None), state_map, diff --git a/glidefs/src/block/write_cache/inner.rs b/glidefs/src/block/write_cache/inner.rs index 5b9e29ef..7dcb7c0d 100644 --- a/glidefs/src/block/write_cache/inner.rs +++ b/glidefs/src/block/write_cache/inner.rs @@ -3,7 +3,7 @@ use parking_lot::Mutex; use std::fs::{File, OpenOptions}; use std::io::{Read, Write as IoWrite}; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering}; use tracing::{debug, info, warn}; use crate::block::block_map::{Blake3Hash, SequenceNumber, SparseBlockState, SparseStateMap}; @@ -360,6 +360,14 @@ pub(crate) struct CacheInner { /// Configuration pub(super) config: WriteCacheConfig, + /// Block compression level used by the flush path. `COMPRESSION_LZ4` + /// selects legacy LZ4; any other value selects zstd at that level. Defaults + /// to zstd-1 (or `GLIDEFS_COMPRESSION_LEVEL`); bless overrides to zstd-19 via + /// `WriteCache::set_compression_level`. The read path always auto-detects, + /// so legacy LZ4 packs stay readable regardless of this. + /// Atomic so it can be set post-construction; set once before any flush. + pub(super) compression_level: AtomicI32, + /// Local cache file (data). /// Uses positional I/O (pread/pwrite) which is thread-safe. RwLock is /// read-locked for all I/O (~2ns overhead); write-locked only during diff --git a/glidefs/src/block/write_cache/read.rs b/glidefs/src/block/write_cache/read.rs index f92a6eff..b8ff3681 100644 --- a/glidefs/src/block/write_cache/read.rs +++ b/glidefs/src/block/write_cache/read.rs @@ -876,7 +876,7 @@ impl WriteCache { pack_offset: u32, comp_length: u32, ) -> Result { - use crate::block::block_map::{blake3_128, lz4_decompress}; + use crate::block::block_map::{blake3_128, decompress_block}; let fetch_start = std::time::Instant::now(); let compressed = match content_store @@ -892,7 +892,7 @@ impl WriteCache { } }; - let decompressed = lz4_decompress(&compressed) + let decompressed = decompress_block(&compressed) .map_err(|e| CacheError::DecompressFailed(e.to_string()))?; let actual_hash = blake3_128(&decompressed); @@ -926,7 +926,7 @@ impl WriteCache { clean_cache: &dyn BlockCache, metrics: Option<&super::super::metrics::ExportMetrics>, ) -> Result, CacheError> { - use crate::block::block_map::{blake3_128, lz4_decompress}; + use crate::block::block_map::{blake3_128, decompress_block}; if entries.is_empty() { return Ok(HashMap::new()); @@ -985,7 +985,7 @@ impl WriteCache { m.record_s3_read(compressed.len() as u64); m.record_s3_fetch_latency(fetch_start.elapsed()); } - let decompressed = lz4_decompress(&compressed) + let decompressed = decompress_block(&compressed) .map_err(|e| CacheError::DecompressFailed(e.to_string()))?; let actual_hash = blake3_128(&decompressed); if actual_hash != expected_hash { @@ -1030,7 +1030,7 @@ impl WriteCache { let local_end = local_start + comp_len as usize; let compressed = coalesced.slice(local_start..local_end); - let decompressed = lz4_decompress(&compressed) + let decompressed = decompress_block(&compressed) .map_err(|e| CacheError::DecompressFailed(e.to_string()))?; let actual_hash = blake3_128(&decompressed); diff --git a/glidefs/src/block/write_cache/tests.rs b/glidefs/src/block/write_cache/tests.rs index dc2f7f04..06b1564a 100644 --- a/glidefs/src/block/write_cache/tests.rs +++ b/glidefs/src/block/write_cache/tests.rs @@ -3040,3 +3040,68 @@ async fn test_cross_flush_dedup_cold_read_from_s3() { let data = h.read(0, 128 * 1024).await; assert!(data.iter().all(|&b| b == 0x42), "cold read from S3 after dedup must return correct data"); } + +/// End-to-end zstd: the production flush path compresses with zstd, uploads to +/// S3, the block is evicted, and a cold read resolves pack-index → S3 → +/// `decompress_block`. Asserts BOTH that the stored pack is actually zstd-framed +/// (codec really switched, not silently LZ4) and that the bytes round-trip. +/// Existing flush/read tests run at the default LZ4 level, so this is the only +/// coverage of the real zstd write→read cycle. +#[tokio::test] +async fn test_zstd_flush_and_cold_read_from_s3() { + const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD]; + // Pack data begins right after the 16-byte GLPK header, so byte 16 is the + // first block's compressed frame. + const PACK_HEADER_SIZE: u32 = 16; + + for level in [1i32, 3, 19] { + let h = V2Harness::with_config(128 * 1024 * 4, 128 * 1024).await; + h.cache.set_compression_level(level); + + // Compressible, level-distinct content. + let mut block = vec![0u8; 128 * 1024]; + for (i, b) in block.iter_mut().enumerate() { + *b = ((i / 512) as u8).wrapping_add(level as u8); + } + h.cache.write(0, &block).unwrap(); + let s = h.flush().await; + assert!(s.packs_uploaded > 0, "level {level}: pack uploaded"); + + // Prove the stored pack's first block frame is ACTUALLY zstd. + let pack_id = h.manifest().chunk_pack_ids(0).expect("chunk 0 has packs")[0]; + let frame = h + .content_store + .get_chunk_block(0, pack_id, PACK_HEADER_SIZE, 4) + .await + .unwrap(); + assert_eq!(&frame[..], &ZSTD_MAGIC, "level {level}: stored pack must be zstd-framed"); + + // Evicted after flush → cold read: pack-index → S3 → decompress_block. + let data = h.read(0, 128 * 1024).await; + assert_eq!(&data[..], &block[..], "level {level}: cold zstd read-back mismatch"); + } +} + +/// Mixed-codec across flushes: a chunk written as LZ4 (legacy), then more blocks +/// appended as zstd, must read back correctly for both — the real migration +/// scenario where old LZ4 packs and new zstd packs coexist for one volume. +#[tokio::test] +async fn test_mixed_codec_across_flushes_cold_read() { + let h = V2Harness::with_config(128 * 1024 * 4, 128 * 1024).await; + + // Flush block 0 as legacy LZ4. + h.cache.set_compression_level(crate::block::block_map::COMPRESSION_LZ4); + let b0 = vec![0xAB; 128 * 1024]; + h.cache.write(0, &b0).unwrap(); + assert!(h.flush().await.packs_uploaded > 0); + + // Switch codec and flush block 1 as zstd (simulates a post-upgrade write). + h.cache.set_compression_level(19); + let b1 = vec![0xCD; 128 * 1024]; + h.cache.write(128 * 1024, &b1).unwrap(); + assert!(h.flush().await.packs_uploaded > 0); + + // Both cold-read correctly via per-block codec auto-detection. + assert_eq!(&h.read(0, 128 * 1024).await[..], &b0[..], "legacy LZ4 block"); + assert_eq!(&h.read(128 * 1024, 128 * 1024).await[..], &b1[..], "zstd block"); +} diff --git a/glidefs/src/cli/bless.rs b/glidefs/src/cli/bless.rs index 2ba0dea5..52a45ef1 100644 --- a/glidefs/src/cli/bless.rs +++ b/glidefs/src/cli/bless.rs @@ -72,7 +72,7 @@ pub async fn run_bless( // --- Stream image: read blocks, upload each chunk as it completes --- let (volume_manifest, hot_set_indices, stats) = - store_ext4_stream(&content_store, file, device_size).await?; + store_ext4_stream(&content_store, file, device_size, crate::block::block_map::COMPRESSION_BLESS).await?; // --- Upload manifest --- let manifest_key = format!("bases/{}", name); @@ -198,6 +198,8 @@ pub async fn run_bless_oci( }; let cache = Arc::new(WriteCache::open_fresh_active(cache_config)?); + // Bless is offline + write-once/read-many: use the highest zstd level. + cache.set_compression_level(crate::block::block_map::COMPRESSION_BLESS); let volume_manifest = Arc::new(parking_lot::RwLock::new(VolumeManifest::new( device_size, BLOCK_SIZE, @@ -458,7 +460,7 @@ pub async fn run_bless_oci_layered( #[cfg(test)] mod tests { use super::*; - use crate::block::block_map::{blake3_128, lz4_decompress}; + use crate::block::block_map::{blake3_128, decompress_block}; use crate::block::pack::{extract_block, lookup_block_in_index, parse_pack_index, PackId}; use object_store::memory::InMemory; use object_store::path::Path as ObjectPath; @@ -502,7 +504,7 @@ mod tests { )); let (volume_manifest, hot_set_indices, stats) = - store_ext4_stream(&content_store, std::io::Cursor::new(image_data.to_vec()), device_size) + store_ext4_stream(&content_store, std::io::Cursor::new(image_data.to_vec()), device_size, crate::block::block_map::COMPRESSION_BLESS) .await?; content_store @@ -602,7 +604,7 @@ mod tests { // Extract and decompress the block from pack let compressed = extract_block(&pack_bytes, entry.offset, entry.comp_length).unwrap(); - let decompressed = lz4_decompress(compressed).unwrap(); + let decompressed = decompress_block(compressed).unwrap(); assert_eq!(blake3_128(&decompressed), entry.hash); assert_eq!(&decompressed[..], original_block); @@ -797,7 +799,7 @@ mod tests { pie.comp_length, ) .unwrap(); - let decompressed = lz4_decompress(compressed).unwrap(); + let decompressed = decompress_block(compressed).unwrap(); assert_eq!( &decompressed[..], original_block, @@ -841,7 +843,7 @@ mod tests { let compressed = extract_block(&pack_bytes, pack_offset, comp_length).unwrap(); - let decompressed = lz4_decompress(compressed).unwrap(); + let decompressed = decompress_block(compressed).unwrap(); assert_eq!(blake3_128(&decompressed), hash); let expected = vec![(offset + 1) as u8; BLOCK_SIZE as usize]; diff --git a/glidefs/src/oci/ext4_store.rs b/glidefs/src/oci/ext4_store.rs index d286582e..42a9893f 100644 --- a/glidefs/src/oci/ext4_store.rs +++ b/glidefs/src/oci/ext4_store.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; use bytes::Bytes; -use crate::block::block_map::{blake3_128, lz4_compress, shared_zero_block, Blake3Hash}; +use crate::block::block_map::{blake3_128, compress_block, shared_zero_block, Blake3Hash}; use crate::block::content_store::ContentStore; use crate::block::pack::{content_pack_id, PackId}; use crate::block::volume_manifest::VolumeManifest; @@ -62,6 +62,8 @@ pub async fn store_ext4_stream( content_store: &Arc, mut source: R, device_size: u64, + // Block compression level (`block_map::COMPRESSION_LZ4` = LZ4, else zstd level). + level: i32, ) -> Result<(VolumeManifest, Vec, StoreStats)> { let vm_template = VolumeManifest::new(device_size, BLOCK_SIZE); let total_blocks = device_size.div_ceil(u64::from(BLOCK_SIZE)) as usize; @@ -90,7 +92,7 @@ pub async fn store_ext4_stream( let chunk_idx = vm_template.chunk_idx_for_block(block_index as u64); let block_offset = vm_template.block_offset_in_chunk(block_index as u64); stats.unique_blocks += 1; - let compressed = Bytes::from(lz4_compress(&buf)); + let compressed = Bytes::from(compress_block(&buf, level)); if pending_chunk.as_ref().is_some_and(|(idx, _)| *idx != chunk_idx) { let (completed_idx, blocks) = pending_chunk.take().unwrap(); diff --git a/glidefs/src/oci/layer_store.rs b/glidefs/src/oci/layer_store.rs index 9e9afaf1..a0accf00 100644 --- a/glidefs/src/oci/layer_store.rs +++ b/glidefs/src/oci/layer_store.rs @@ -152,7 +152,7 @@ pub async fn ensure_layer_stored( // Stream the ext4 into content-addressed packs + manifest under layers/{digest}. let (volume_manifest, _hot_set, stats) = - store_ext4_stream(&content_store, ext4_tmp, device_size).await?; + store_ext4_stream(&content_store, ext4_tmp, device_size, crate::block::block_map::COMPRESSION_BLESS).await?; content_store .put_manifest(LAYER_MANIFEST_NAME, volume_manifest.serialize()?, None) .await diff --git a/glidefs/tests/docker_integration/oci_layer_dedup.rs b/glidefs/tests/docker_integration/oci_layer_dedup.rs index a8ff737b..5cb45d7c 100644 --- a/glidefs/tests/docker_integration/oci_layer_dedup.rs +++ b/glidefs/tests/docker_integration/oci_layer_dedup.rs @@ -272,9 +272,10 @@ async fn merged_stored_bytes( Arc::clone(object_store), &format!("{db_path}/exports/{name}"), )); - let (_vm, _hot, stats) = store_ext4_stream(&cs, Cursor::new(ext4), device_size) - .await - .unwrap(); + let (_vm, _hot, stats) = + store_ext4_stream(&cs, Cursor::new(ext4), device_size, glidefs::block::block_map::COMPRESSION_BLESS) + .await + .unwrap(); stats.bytes_uploaded }