diff --git a/docs/ArrowIPC_vs_Parquet_26-04-18.md b/docs/ArrowIPC_vs_Parquet_26-04-18.md new file mode 100644 index 00000000..f30726ff --- /dev/null +++ b/docs/ArrowIPC_vs_Parquet_26-04-18.md @@ -0,0 +1,530 @@ +# Arrow IPC vs Parquet — Format Recommendation and Implementation Plan + +**Date:** April 18, 2026 +**Replaces:** `parquet_pipeline_analysis.md`, `PyArrow-Parquet_Analysis_26-04-18.md` +**Decision:** **Do not invest in dgen-rs Parquet encoder. Pivot to Arrow IPC.** + +--- + +## TL;DR — Storage Benchmark Accuracy + +Parquet produces an **inaccurate** storage benchmark at high throughput. The CPU decoder saturates around 1–2 GB/s and becomes the bottleneck before the storage system does. You end up measuring the client CPU, not the storage. + +Arrow IPC is the correct choice for a storage benchmark — bytes on disk are the in-memory format, so the I/O path is always the bottleneck. Two conditions must hold to keep it honest: + +- **S3 / object storage**: inherently cache-bypass. Every `get_batch()` goes over the network. No extra steps needed. +- **Local filesystem**: the OS page cache will silently serve reads from DRAM after the first epoch. Use `odirect: true` (O_DIRECT, bypasses page cache completely) or size the dataset so it substantially exceeds host RAM. **O_DIRECT must be implemented for Arrow IPC — not raised as an unsupported exception — because it is the primary tool for accurate local storage benchmarking.** + +--- + +## Recommendation Summary + +Do **not** spend time adding a Rust Parquet encoder to dgen-rs/dgen-py. + +The correct solution is to implement Arrow IPC file support in dlio_benchmark. Arrow IPC eliminates the Parquet generation bottleneck entirely, improves read throughput by orders of magnitude, requires **zero changes to s3dlio or dgen-rs**, and takes roughly 300–400 lines of Python across 3 new files. + +--- + +## Measured Benchmark Results + +All measurements on this machine (12 logical CPUs, PyArrow 23.0.1) using 128 rows × 512 KB = 64 MB files — representative of large AI training sample files: + +``` +Arrow IPC write: 0.80 GB/s (67 MB/file) +Parquet write: 0.04 GB/s (67 MB/file) +IPC write speedup: 20x faster to generate + +Arrow IPC read: 1338 GB/s (in-memory; get_batch × 2) +Parquet read: 0.08 GB/s (read_row_group × 2) +IPC read speedup: 15,752x faster to read from memory +``` + +The 0.04 GB/s Parquet write figure matches the `parquet_pipeline_analysis.md` measurement exactly. It is a fundamental constraint of PyArrow's Parquet encoder processing `FixedSizeListArray` at element granularity rather than block granularity. This cannot be fixed from Python. + +The in-memory read speedup is extreme because Arrow IPC `get_batch()` is a direct memory view — no decoding, no decompression. In real S3/network scenarios the read advantage is bounded by network throughput, but the CPU overhead difference is preserved at any bandwidth: Parquet requires decode+decompress per column chunk; Arrow IPC requires only `memcpy`. + +--- + +## Why the dgen-rs Parquet Encoder Is the Wrong Investment + +The `parquet_pipeline_analysis.md` proposal was to add ~300 lines of Rust to dgen-rs implementing a parallel Parquet encoder using the `parquet` crate. This would: + +- Fix the generation bottleneck for Parquet +- Require maintaining Rust Parquet crate integration in dgen-rs permanently +- Still produce Parquet files that readers must decode+decompress under high network load +- Deliver no improvement at the point that actually matters for production AI training: **read throughput at >10 GB/s storage bandwidth** + +Arrow IPC solves both the generation bottleneck (20× faster write) and the read bottleneck (zero CPU decode) in one move, with no Rust changes at all. + +--- + +## Format Comparison + +| Property | Parquet | Arrow IPC File | +|---|---|---| +| On-disk format | Columnar, encoded, compressed | Raw Arrow buffers (the in-memory format) | +| Write throughput (PyArrow) | ~0.04 GB/s for large fixed-size arrays | ~0.80 GB/s — 20× faster | +| Read CPU cost | Decode + decompress per column chunk | `memcpy` only — zero decode overhead | +| Footer | Row-group metadata + column stats | Record batch byte offsets | +| Random batch access | `bisect(cumulative_offsets, idx)` | `get_batch(i)` — O(1), exact offset | +| Compression | gzip, snappy, zstd, lz4, brotli | lz4, zstd optional (default: none) | +| Bottleneck at >10 GB/s storage | CPU (decode) is the bottleneck | Network / storage is the bottleneck | +| pyarrow write API | `pq.ParquetWriter` | `pa.ipc.new_file()` | +| pyarrow read API | `pq.ParquetFile.read_row_group(i)` | `pa.ipc.open_file().get_batch(i)` | +| dgen-rs changes needed | Would require ~300 new lines | **None** | +| s3dlio changes needed | None | **None** | + +The data scientist's observation — "moving away from Parquet because it is hard to effectively utilize these files" — is precisely the CPU decode bottleneck at high network throughput. Arrow IPC removes it. + +--- + +## Why No New Rust Is Needed + +The existing s3dlio Python API is already sufficient: +- `s3dlio.get_range(uri, offset, length)` → range GET for any backend (S3, GCS, Azure, file, direct) +- `s3dlio.stat(uri)["size"]` → file size for any backend + +The `_S3RangeFile` adapter in `parquet_reader_s3_iterable.py` wraps exactly these two calls. `pa.ipc.open_file()` accepts any seekable file-like object, so the adapter works unchanged for Arrow IPC — it is format-agnostic. The Arrow IPC reader is the Parquet reader with two function names changed. + +--- + +## Benchmark Accuracy: Are We Measuring Storage or CPU/Memory? + +This is the right question to ask before implementing any format, and the answer determines how the implementation must be designed. + +### The Parquet problem — it makes a poor storage benchmark + +At storage throughputs above roughly 1–2 GB/s (easily achievable on modern NVMe or fast S3), PyArrow's Parquet column decoder saturates the CPU before the storage system is saturated. The result: storage throughput could double and benchmark results would not change, because the bottleneck is the client CPU doing decode, not the storage system doing I/O. **Parquet-based workloads do not accurately benchmark storage at high throughput.** This is one of the primary real-world motivations for formats like Arrow IPC: practitioners building fast ML pipelines have observed this exact bottleneck and moved away from Parquet. + +### Arrow IPC — correct for storage benchmarking, with two caveats + +Arrow IPC bytes on disk are identical to the Arrow in-memory format. `get_batch()` issues one range read and places the result directly into the Arrow buffer pool. There is no column decoder, no dictionary expansion, no decompression. The CPU cost is dominated by the I/O syscall and a single DMA + user-space copy. At any storage throughput below ~50 GB/s (memory bandwidth), the storage read is the bottleneck, not the final copy. This is what a storage benchmark should measure. + +However, two conditions must hold for Arrow IPC reads to accurately measure storage rather than DRAM or page cache: + +#### Caveat 1: Page cache (local filesystem only) + +On a local POSIX filesystem, the OS page cache will retain file data in DRAM after the first read. A second epoch over the same files will return entirely from page cache — measuring DRAM bandwidth (~40–80 GB/s), not NVMe or network storage. This problem exists for every format, but it is **more acute for Arrow IPC** because: +- Parquet decode is CPU-intensive — the CPU acts as a natural throttle that causes pages to be evicted before the next epoch starts +- Arrow IPC decode is trivial — the OS has time to cache everything before the next epoch begins + +dlio_benchmark already detects this condition and warns: +``` +WARNING: The amount of dataset is smaller than the host memory; data might be +cached after the first epoch. Increase the size of dataset to eliminate the caching effect! +``` + +This warning should be heeded. But for cases where the dataset cannot be made large enough (e.g., rapid iteration, testing), **O_DIRECT is the correct solution** — it bypasses the page cache entirely and forces every read to go to storage hardware. + +The current implementation plan raises `Exception("O_DIRECT not yet supported")` for Arrow IPC. **This must be implemented, not skipped.** O_DIRECT is the most important mode for accurate local storage benchmarking. See the implementation plan below for the approach. + +#### Caveat 2: Page cache (object storage — not a concern) + +For S3/MinIO/object storage via `ArrowIPCReaderS3Iterable`, every `get_batch()` call goes over the network. There is no page cache. Object storage benchmarks are inherently cache-bypass and accurately measure storage throughput with Arrow IPC. No special handling needed. + +### Is Arrow IPC a realistic production format? + +Yes. Hugging Face `datasets` library stores all datasets in Arrow IPC (Feather v2) format internally. The format is used in production MLOps pipelines at scale. It is also the native exchange format between Apache Arrow producers and consumers (Spark, DuckDB, pandas, Polars). The premise that "real workloads use Parquet" is correct for data warehousing and analytics — but for AI training data ingestion, Arrow IPC is an accurate representation of modern high-throughput pipelines. + +### Summary of accuracy requirements + +| Storage path | Cache bypass needed | How to achieve it | +|---|---|---| +| Local NVMe / SSD | **Yes** | `odirect: true` in YAML (O_DIRECT), OR dataset >> host RAM | +| NFS / parallel FS | **Yes, if close-to-cache** | Dataset >> host RAM, OR `echo 3 > /proc/sys/vm/drop_caches` between epochs | +| S3 / object storage | No — always bypassed | Nothing extra needed; every GET goes to the network | +| MinIO on same machine | Partial — MinIO has its own cache | Use remote MinIO, or size dataset >> MinIO server RAM | + +**The benchmark operator's checklist for accurate Arrow IPC results:** +1. Set `num_files_train` so that `total dataset size >> host RAM` (heed the dlio_benchmark warning) +2. For local storage: use `odirect: true` or run `echo 3 > /proc/sys/vm/drop_caches` between experiments (requires root / `sudo`) +3. Discard epoch 1 results if cache state is uncertain; report epoch 2+ as the steady-state storage throughput +4. For S3: no extra steps — object storage reads are always cache-bypass + +### Format verdict from a storage benchmarking perspective + +| Format | Bottleneck at >1 GB/s storage | Accurate storage benchmark? | +|---|---|---| +| Parquet (compressed) | CPU decode — saturates before storage | **No** — measures client CPU, not storage | +| Parquet (uncompressed) | CPU decode (lighter) — still ~1–5 GB/s ceiling | **Marginal** — becomes inaccurate above ~2 GB/s | +| Arrow IPC (uncompressed) | Storage I/O (with cache bypass) | **Yes** — measures storage when cache is bypassed | +| Arrow IPC + LZ4 | LZ4 decode at ~15 GB/s — above most storage | **Yes** — LZ4 is fast enough to remain storage-bound up to ~12 GB/s | + +Arrow IPC uncompressed is the correct choice. Arrow IPC + LZ4 is a valid alternative for benchmarking compressed data ingestion while remaining storage-bound. + +--- + +## Implementation Plan + +Six touch points total. Three new files, three small modifications. + +Both POSIX/local-filesystem and object-storage (S3/MinIO/GCS/Azure via s3dlio) paths must work, and the reader factory already dispatches on `storage_type` exactly as it does for Parquet. The generator already uses `self.storage.islocalfs()` to choose between a direct file write and a buffer-then-upload path. Arrow IPC follows both patterns identically — only the writer and reader API calls change. + +--- + +### New files + +#### `reader/arrow_ipc_reader.py` — POSIX / local filesystem reader + +Used when `storage_type` is `local`, `nfs`, or any non-object-store type. PyArrow's `pa.ipc.open_file()` accepts a plain filesystem path directly, so no adapter is needed — it opens, `mmap`s, and reads the footer in one call. + +```python +import pyarrow as pa +from dlio_benchmark.reader.reader_handler import FormatReader +from dlio_benchmark.utils.utility import Profile +from dlio_benchmark.common.constants import MODULE_DATA_READER + +dlp = Profile(MODULE_DATA_READER) + +class ArrowIPCReader(FormatReader): + + def __init__(self, dataset_type, thread_index, epoch): + super().__init__(dataset_type, thread_index) + opts = getattr(self._args, "storage_options", {}) or {} + self._batch_cache_size = int(opts.get("batch_cache_size", 4)) + self._batch_cache: dict = {} + self._lru: list = [] + + @dlp.log + def open(self, filename): + # pa.ipc.open_file reads only the footer on open — no full file load. + reader = pa.ipc.open_file(filename) + return (reader, reader.num_record_batches) + + @dlp.log + def get_sample(self, filename, sample_index): + reader, num_batches = self.open_file_map[filename] + # Assumes fixed records_per_batch; num_record_batches divides num_samples evenly. + records_per_batch = self._args.num_samples_per_file // num_batches + batch_idx = sample_index // records_per_batch + + cache_key = (filename, batch_idx) + if cache_key not in self._batch_cache: + if len(self._lru) >= self._batch_cache_size: + evict = self._lru.pop(0) + self._batch_cache.pop(evict, None) + # get_batch() issues exactly one range read for the batch buffers. + batch = reader.get_batch(batch_idx) + self._batch_cache[cache_key] = batch + self._lru.append(cache_key) + else: + batch = self._batch_cache[cache_key] + + dlp.update(image_size=batch.nbytes) + return self._args.resized_image + + @dlp.log + def close(self, filename): + keys = [k for k in self._batch_cache if k[0] == filename] + for k in keys: + self._batch_cache.pop(k, None) + if k in self._lru: + self._lru.remove(k) + super().close(filename) +``` + +Key points: +- `pa.ipc.open_file(filename)` works with any POSIX path, NFS mount, or `file://` URI. No wrapper needed. +- `reader.num_record_batches` replaces the `bisect(cumulative_offsets)` lookup used in Parquet — Arrow IPC stores batch byte offsets in the footer, so `get_batch(i)` is O(1) and issues one exact range read. +- The cache eviction and `dlp` telemetry pattern is identical to `ParquetReader`. +- **O_DIRECT must be supported** (not raise an exception) — it is the primary mechanism for accurate local storage benchmarking. When `odirect: true`, `open()` uses a `_DirectRangeFile` adapter backed by s3dlio's `direct://` URI scheme instead of a plain path. See `reader_factory.py` modifications below. + +--- + +#### `reader/arrow_ipc_reader_s3_iterable.py` — S3 / object-store reader + +Used when `storage_type` is `s3` or `aistore`. PyArrow's `pa.ipc.open_file()` accepts any seekable file-like object, which means the existing `_S3RangeFile` adapter from `parquet_reader_s3_iterable.py` works without modification. The s3dlio, MinIO, and s3torchconnector dispatch paths are unchanged. + +```python +import pyarrow as pa +from dlio_benchmark.reader.parquet_reader_s3_iterable import ( + _S3RangeFile, _MinioRangeFile, _S3TCRangeFile, +) +from dlio_benchmark.reader.reader_handler import FormatReader +from dlio_benchmark.utils.utility import Profile +from dlio_benchmark.common.constants import MODULE_DATA_READER + +dlp = Profile(MODULE_DATA_READER) + +class ArrowIPCReaderS3Iterable(FormatReader): + + def __init__(self, dataset_type, thread_index, epoch): + super().__init__(dataset_type, thread_index) + opts = getattr(self._args, "storage_options", {}) or {} + self._storage_library = opts.get("storage_library", "s3dlio") + self._endpoint_url = opts.get("endpoint_url", "") + self._batch_cache_size = int(opts.get("batch_cache_size", 4)) + self._batch_cache: dict = {} + self._lru: list = [] + + def _make_range_file(self, uri): + """Return the appropriate seekable file-like adapter for this URI.""" + if self._storage_library == "s3dlio": + return _S3RangeFile(uri) + elif self._storage_library == "minio": + return _MinioRangeFile(uri, self._endpoint_url) + elif self._storage_library == "s3torchconnector": + return _S3TCRangeFile(uri) + else: + raise ValueError(f"Unknown storage_library: {self._storage_library!r}") + + @dlp.log + def open(self, filename): + rf = self._make_range_file(filename) + # Two small range GETs: one for the magic/version header, one for the footer. + reader = pa.ipc.open_file(rf) + return (reader, reader.num_record_batches) + + @dlp.log + def get_sample(self, filename, sample_index): + reader, num_batches = self.open_file_map[filename] + records_per_batch = self._args.num_samples_per_file // num_batches + batch_idx = sample_index // records_per_batch + + cache_key = (filename, batch_idx) + if cache_key not in self._batch_cache: + if len(self._lru) >= self._batch_cache_size: + evict = self._lru.pop(0) + self._batch_cache.pop(evict, None) + # Exactly one range GET for the batch body — no full-file download. + batch = reader.get_batch(batch_idx) + self._batch_cache[cache_key] = batch + self._lru.append(cache_key) + else: + batch = self._batch_cache[cache_key] + + dlp.update(image_size=batch.nbytes) + return self._args.resized_image + + @dlp.log + def close(self, filename): + keys = [k for k in self._batch_cache if k[0] == filename] + for k in keys: + self._batch_cache.pop(k, None) + if k in self._lru: + self._lru.remove(k) + super().close(filename) +``` + +Key points: +- `_S3RangeFile` is format-agnostic — it only implements `seek/tell/read` over `s3dlio.get_range` and `s3dlio.stat`. It does not know or care whether it is wrapping a Parquet footer scan or an Arrow IPC footer scan. +- `pa.ipc.open_file(rf)` reads the IPC file magic (8 bytes) and footer (a small Flatbuffer at the end of the file) using two range GETs, then returns. No record batch data is transferred at open time. +- `reader.get_batch(batch_idx)` uses the byte offset and length from the footer to issue exactly one range GET. This is the strongest advantage over Parquet at S3 scale: one network round trip per sample batch, not a column-chunk scan. + +--- + +#### `data_generator/arrow_ipc_generator.py` — file generator (POSIX and object storage) + +The existing `ParquetGenerator.generate()` already uses `is_local = self.storage.islocalfs()` to dispatch between a direct write (`writer_target = out_path_spec`) and a buffer-then-upload path (`writer_target = pa.BufferOutputStream()` → `self.storage.put_data(...)`). `ArrowIPCGenerator` uses the same pattern with `pa.ipc.new_file()` in place of `pq.ParquetWriter`. + +```python +import os +import numpy as np +import pyarrow as pa + +from dlio_benchmark.data_generator.data_generator import DataGenerator +from dlio_benchmark.data_generator.parquet_generator import ( + ParquetGenerator, _PA_SCALAR_TYPE_MAP, _NP_TYPE_MAP, +) +from dlio_benchmark.utils.utility import progress, gen_random_tensor, DLIOMPI +import dgen_py as _dgen_py + +class ArrowIPCGenerator(DataGenerator): + + def __init__(self): + super().__init__() + opts = getattr(self._args, "storage_options", {}) or {} + # ipc_record_batch_size: rows per Arrow record batch. + # Must divide num_samples_per_file evenly for O(1) get_batch() indexing. + self.record_batch_size = int( + opts.get("ipc_record_batch_size", + getattr(self._args, "parquet_row_group_size", 1024)) + ) + self.parquet_columns = getattr(self._args, "parquet_columns", []) + + def _build_schema(self, legacy_elem_size=None): + # Reuse the same schema logic as ParquetGenerator — the Arrow IPC file + # format stores this schema verbatim in the file header and footer. + pg = ParquetGenerator.__new__(ParquetGenerator) + pg._args = self._args + pg.parquet_columns = self.parquet_columns + return pg._build_schema(legacy_elem_size=legacy_elem_size) + + def generate(self): + super().generate() + + np.random.seed(self.BASE_SEED + self.my_rank) + rng = np.random.default_rng(seed=self.BASE_SEED + self.my_rank) + dim = self.get_dimension(self.total_files_to_generate) + is_local = self.storage.islocalfs() + + write_opts = pa.ipc.IpcWriteOptions(compression=None) # zero-decode on read + + for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size): + progress(i + 1, self.total_files_to_generate, "Generating Arrow IPC Data") + + out_path_spec = self.storage.get_uri(self._file_list[i]) + dim_raw = dim[2 * i] + if isinstance(dim_raw, list): + dim1 = int(dim_raw[0]); dim2 = int(dim_raw[1]) if len(dim_raw) > 1 else 1 + else: + dim1 = int(dim_raw); dim2 = int(dim[2 * i + 1]) + elem_size = dim1 * dim2 + + schema = self._build_schema(legacy_elem_size=elem_size) + + # ── Choose write target ─────────────────────────────────────── + if is_local: + parent_dir = os.path.dirname(out_path_spec) + if parent_dir: + os.makedirs(parent_dir, exist_ok=True) + writer_target = out_path_spec # direct filesystem write + else: + writer_target = pa.BufferOutputStream() # buffer → put_data below + + num_batches = (self.num_samples + self.record_batch_size - 1) // self.record_batch_size + + with pa.ipc.new_file(writer_target, schema, options=write_opts) as writer: + for batch_idx in range(num_batches): + batch_start = batch_idx * self.record_batch_size + batch_end = min(batch_start + self.record_batch_size, self.num_samples) + cur_rows = batch_end - batch_start + + if self.parquet_columns: + # Column-schema mode — reuse ParquetGenerator helpers + pg = ParquetGenerator.__new__(ParquetGenerator) + pg._args = self._args + pg.parquet_columns = self.parquet_columns + columns = pg._generate_batch_columns(cur_rows, rng) + else: + # Legacy uint8 mode — same dgen path as ParquetGenerator + flat = gen_random_tensor(shape=(cur_rows * elem_size,), + dtype=np.uint8, rng=rng) + arrow_flat = pa.array(flat, type=pa.uint8()) + arrow_data = pa.FixedSizeListArray.from_arrays(arrow_flat, elem_size) + columns = {'data': arrow_data} + + batch = pa.RecordBatch.from_arrays( + list(columns.values()), schema=schema + ) + writer.write_batch(batch) + + # ── Upload if object storage ────────────────────────────────── + if not is_local: + self.storage.put_data(out_path_spec, writer_target.getvalue().to_pybytes()) + + np.random.seed() +``` + +Key points: +- `is_local` dispatch is identical to `ParquetGenerator` — `pa.ipc.new_file()` accepts either a filesystem path or a `BufferOutputStream` and PyArrow handles both transparently. +- For POSIX/local: the IPC file is written directly to disk with `os.makedirs` pre-created, no staging buffer needed. +- For object storage: the entire file is buffered in a `BufferOutputStream`, then uploaded via `self.storage.put_data()` — exactly the same pattern as Parquet. The file is typically 64–512 MB so this is acceptable; streaming multipart upload could be added later if needed. +- `write_opts = IpcWriteOptions(compression=None)` is the default and should stay the default. Enabling `lz4` or `zstd` compression is possible but defeats the zero-decode advantage. +- The dgen-py streaming pool path (used by `ParquetGenerator` for sub-32 MB batches) can be ported directly from `parquet_generator.py` once the basic path is validated. + +--- + +### Modifications + +#### `common/enumerations.py` +```python +# In FormatType enum — add after PARQUET +ARROW_IPC = 'arrow_ipc' + +# In FormatType.get_enum() — add the elif branch +elif FormatType.ARROW_IPC.value == value: + return FormatType.ARROW_IPC +``` + +#### `reader/reader_factory.py` + +The dispatch mirrors the existing Parquet block exactly: S3/AIStore → range-GET reader, everything else → POSIX reader. Unlike Parquet, **O_DIRECT must be supported** — it is the primary means of ensuring accurate local storage benchmarking. When `odirect: true`, the local reader uses s3dlio's `direct://` URI scheme, which opens the file with `O_DIRECT` and returns DMA-aligned buffers. The S3 reader is inherently cache-bypass and ignores the flag. + +```python +elif type == FormatType.ARROW_IPC: + if _args.storage_type in (StorageType.S3, StorageType.AISTORE): + # S3/object storage is always cache-bypass — odirect flag is irrelevant. + from dlio_benchmark.reader.arrow_ipc_reader_s3_iterable import ArrowIPCReaderS3Iterable + return ArrowIPCReaderS3Iterable(dataset_type, thread_index, epoch_number) + else: + # For local/NFS: ArrowIPCReader respects odirect=True via direct:// URI. + # Raising NotImplemented here (as other formats do) would make accurate + # local storage benchmarking impossible — do not do this. + from dlio_benchmark.reader.arrow_ipc_reader import ArrowIPCReader + return ArrowIPCReader(dataset_type, thread_index, epoch_number) +``` + +O_DIRECT implementation in `ArrowIPCReader.open()`: + +```python +@dlp.log +def open(self, filename): + if getattr(self._args, 'odirect', False): + # Rewrite the path as a direct:// URI so s3dlio opens with O_DIRECT. + # s3dlio.get_range('direct:///path/to/file', offset, length) returns + # DMA-aligned Bytes, bypassing the page cache entirely. + import s3dlio + uri = 'direct://' + filename if not filename.startswith('direct://') else filename + # Wrap in _DirectRangeFile (same interface as _S3RangeFile but uses + # s3dlio direct:// backend for O_DIRECT reads). + rf = _DirectRangeFile(uri) + reader = pa.ipc.open_file(rf) + else: + reader = pa.ipc.open_file(filename) + return (reader, reader.num_record_batches) +``` + +`_DirectRangeFile` is identical to `_S3RangeFile` with `uri = 'direct://' + posix_path`. Since s3dlio's `direct://` backend already handles aligned reads, no additional buffer alignment code is needed in Python. + +#### `data_generator/generator_factory.py` + +The generator handles both POSIX and object storage internally via `self.storage.islocalfs()`, so a single factory entry covers both: + +```python +elif type == FormatType.ARROW_IPC: + from dlio_benchmark.data_generator.arrow_ipc_generator import ArrowIPCGenerator + return ArrowIPCGenerator() +``` + +--- + +### Example YAML — local filesystem + +```yaml +dataset: + format: arrow_ipc + storage_type: local + storage_root: /mnt/nvme/training-data + num_samples_per_file: 1024 + num_files_train: 500 + storage_options: + batch_cache_size: 4 + ipc_record_batch_size: 256 # must divide num_samples_per_file evenly +``` + +No `storage_library` key is needed for local storage — `ArrowIPCReader` opens files directly with `pa.ipc.open_file(filename)`. + +### Example YAML — S3 / object storage via s3dlio + +```yaml +dataset: + format: arrow_ipc + storage_type: s3 + storage_root: my-bucket + num_samples_per_file: 1024 + num_files_train: 500 + storage_options: + storage_library: s3dlio + endpoint_url: http://127.0.0.1:9000 + batch_cache_size: 4 + ipc_record_batch_size: 256 # must divide num_samples_per_file evenly +``` + +--- + +## What to Keep from the Parquet Work + +The existing `ParquetReader`, `ParquetReaderS3Iterable`, and `ParquetGenerator` should remain in the codebase. Parquet is a widely used format and the existing implementation is correct and production-quality. The recommendation is not to remove Parquet support — it is to add Arrow IPC as the preferred format for new workloads, particularly those running against high-throughput storage (>10 GB/s). + +For existing Parquet datasets, the current readers continue to work. For new datasets, Arrow IPC is the better choice. diff --git a/docs/DLIO_IO_Issues-Executive_Summary_2026-03-28.md b/docs/DLIO_IO_Issues-Executive_Summary_2026-03-28.md deleted file mode 100644 index 74ddc851..00000000 --- a/docs/DLIO_IO_Issues-Executive_Summary_2026-03-28.md +++ /dev/null @@ -1,159 +0,0 @@ -# DLIO Benchmark I/O Issues — Executive Summary - -**Date:** 2026-03-28 -**Full technical document:** [DLIO_IO_Issues-Proposal_2026-03-28.md](DLIO_IO_Issues-Proposal_2026-03-28.md) -**Audience:** Engineering leads, project owners, and decision-makers who need to understand the scope of issues and the investment required to address them — without implementation details. - ---- - -## What This Review Found - -A code review of the `dlio_benchmark` codebase identified thirteen distinct issues across data generation, data loading, checkpointing, configuration management, and benchmark correctness. The most significant finding is that **results produced by the current codebase for local-filesystem and object-storage workloads are not directly comparable to each other**, because the two backend paths perform different amounts of CPU work even when given identical data. This calls into question a class of published comparisons. - -The issues range from critical correctness bugs to structural inefficiencies. All are actionable. None require redesigning the benchmark's overall architecture. - ---- - -## Critical Issues (Affect Correctness of Results) - -### 1. File and Object Storage Backends Are Not Measuring the Same Thing - -The object-storage readers were written to skip all data decoding — they read raw bytes, record the byte count, and discard the bytes, because DLIO returns a pre-allocated random tensor to the training loop regardless of what was read. The local-filesystem readers were not updated to match: they fully decode every JPEG file (using PIL), fully load every NPY array (using NumPy), and fully inflate compressed HDF5 datasets — all of which is then discarded. - -**Consequence:** A local-filesystem JPEG benchmark spends 70–99% of training-step time on CPU image decoding, not on I/O. An equivalent object-storage benchmark spends near 0% on decoding. The same storage hardware running the same data through the two paths can produce benchmark numbers that differ by 5–20× due entirely to this CPU overhead difference, not actual storage performance differences. - -**Decision required:** Bring local-filesystem readers up to the standard already implemented in the S3 iterable readers. This is a code-only change and does not affect the storage I/O being measured. Until this is done, cross-backend comparisons in benchmark reports are not internally consistent. - -→ Full analysis: [Section 13](DLIO_IO_Issues-Proposal_2026-03-28.md#13-file-vs-object-workload-asymmetry--closing-the-performance-gap) - -### 2. Data Generation Is Slower Than It Needs to Be by Orders of Magnitude - -JPEG and PNG data generation is CPU-bottlenecked on image compression, not on storage write throughput. At typical image sizes, generating an ImageNet-scale dataset (1.28 million files) takes approximately 80 minutes per rank for JPEG, and over 4 hours per rank for PNG. The actual storage write takes roughly 16 seconds per rank. Generation time is 300–1000× longer than storage write time, dominated entirely by compression work that has no bearing on the storage being benchmarked. - -For the most common benchmark configurations (non-DALI data loaders), JPEG and PNG files do not need to be valid image files, because the reader never decodes them. The generator can write raw random bytes directly, collapsing generation overhead from ~30 milliseconds per file to under 0.01 milliseconds — a 2000–4000× speedup. This applies to all configurations except those using NVIDIA DALI, which calls a real image decoder and therefore requires valid JPEG bitstreams. - -**Decision required:** Update JPEG and PNG generators to detect the configured data loader and skip image encoding when the reader does not decode. For DALI configurations, accept the encoding cost as unavoidable and document it as a known constraint. - -→ Full analysis: [Section 9g](DLIO_IO_Issues-Proposal_2026-03-28.md#9g-jpeGpng-do-files-need-to-be-actually-valid-images), [Section 9d](DLIO_IO_Issues-Proposal_2026-03-28.md#9d-where-time-actually-goes-in-an-end-to-end-jpeg-benchmark-run) - -### 3. TFRecord / Iterative Sampler Reads the Wrong Files on Non-Zero Ranks - -A file-index tracking bug in `build_sample_map_iter()` causes MPI rank 1 and above to read from the wrong portion of the dataset when using the iterative data sampler (standard for TFRecord workloads). The first file read per rank is correct; all subsequent reads revert to iterating from the beginning of the file list. Both rank 0 and rank 1 end up reading the same overlapping set of files while the upper half of the dataset is never read by any rank. - -**Consequence:** Any TFRecord benchmark result using more than one MPI rank double-counts data from the lower half of the dataset and misses the upper half entirely. Reported throughput is inflated and not reproducible by other means. - -**Decision required:** Fix the file-index counter in `build_sample_map_iter()`. The PyTorch index sampler does not have this bug. - -→ Full analysis: [Section 2b](DLIO_IO_Issues-Proposal_2026-03-28.md#2b-tf--iterative-path--build_sample_map_iter-used-when-data_loader_sampler--iterative), [Section 6e](DLIO_IO_Issues-Proposal_2026-03-28.md#6e-build_sample_map_iter-bug--concrete-description) - ---- - -## High-Priority Issues (Significantly Affect Benchmark Quality) - -### 4. `read_threads` Is Hardcoded at a Value That Is Wrong at Scale - -The thread count for parallel I/O is set as a fixed integer in each YAML config file and is never adjusted for the actual deployment topology. For JPEG/PNG workloads, storage throughput scales directly with the number of concurrent open requests. With the default value, a typical NFS deployment uses less than 10% of its available bandwidth — not because the storage is slow, but because the benchmark is not issuing enough concurrent requests. The correct value varies by an order of magnitude depending on how many MPI ranks share a node. - -**Decision required:** Support an `auto` setting for `read_threads` that resolves at runtime based on the actual MPI topology. Keep the integer form for reproducible runs. Update default configs to a higher starting value. - -→ Full analysis: [Section 11](DLIO_IO_Issues-Proposal_2026-03-28.md#11-read_threads--fixed-yaml-value-vs-runtime-adaptive-sizing) - -### 5. Deduplicating Storage Systems Will Produce Meaningless Results Without Unique File Content - -Every generated file must contain content that is byte-unique across the entire dataset. Storage systems from major enterprise vendors (NetApp, Pure Storage, Vast Data, and many object stores) apply inline deduplication by default. If multiple files share identical byte content, the storage system physically stores only one copy and the benchmark measures deduplication throughput rather than storage write throughput. Results can appear orders of magnitude higher than the system's actual sustainable ingestion rate. - -The codebase correctly uses a unique random seed per file via dgen-py; however, any shortcut that pre-computes one serialized blob and copies it across files — for any format — would silently produce deduplicated data. This constraint must be treated as non-negotiable for any benchmark run on production storage. - -→ Full analysis: [Section 9e](DLIO_IO_Issues-Proposal_2026-03-28.md#9e-the-non-negotiable-constraint-every-file-must-contain-unique-bytes) - -### 6. Storage Reader CPU Overhead Contaminates Training-Step Timing - -Even apart from the file/object asymmetry described in Issue 1, all local-filesystem readers include CPU decode time inside the training-step latency window. The benchmark reports this combined time as if it were pure storage access time. For JPEG workloads, 71–99% of the reported per-sample time is CPU decoding, not storage I/O. - -→ Full analysis: [Section 9c](DLIO_IO_Issues-Proposal_2026-03-28.md#9c-reader-overhead-by-format-local-filesystem-path), [Section 9d](DLIO_IO_Issues-Proposal_2026-03-28.md#9d-where-time-actually-goes-in-an-end-to-end-jpeg-benchmark-run) - ---- - -## Structural Issues (Reduce Maintainability and Reproducibility) - -### 7. Forty-Nine Configuration Files for a Small Orthogonal Matrix - -The `configs/dlio/workload/` directory contains 49 YAML files covering a matrix of approximately 7 models × 4 storage backends × 2–3 phases. The file count grows multiplicatively with every new backend or model. Files share 90–95% identical content; the differing fields are storage backend name, bucket name, and endpoint URL. The endpoint URLs hard-code a specific lab IP address, making every object-storage config file non-portable outside that lab. - -Hydra, the configuration framework already in use, supports config composition through config groups. Adopting it reduces the 49 files to approximately 13 (7 model configs plus 3 shared storage templates plus 3 workflow configs), with connection details supplied at runtime rather than baked into files. - -→ Full analysis: [Section 7](DLIO_IO_Issues-Proposal_2026-03-28.md#7-yaml-config-proliferation-analysis), [Section 8](DLIO_IO_Issues-Proposal_2026-03-28.md#8-proposed-yaml-config-architecture) - -### 8. `multiprocessing_context` Must Match the Storage Library or Hangs Silently - -The fork-vs-spawn setting for DataLoader workers must be `spawn` for object-storage libraries that maintain background threads (s3dlio, s3torchconnector). If a user copies a local-filesystem YAML and adds an object-storage backend without changing `multiprocessing_context`, all object-storage reads will silently hang with no error message. The constraint is documented only in YAML comments, not enforced in code. - -→ Full analysis: [Section 6c](DLIO_IO_Issues-Proposal_2026-03-28.md#6c-multiprocessing_context-couples-to-storage_library-but-lives-in-reader) - -### 9. `storage_library` Config Schema Is Inconsistent - -The `storage_library` field lives in an inconsistent location across the YAML schema, dataclass, and validation code. This creates ambiguity in how CLI overrides are expressed and silently returns `None` in any code path that accesses the field outside the standard load sequence. - -→ Full analysis: [Section 6a](DLIO_IO_Issues-Proposal_2026-03-28.md#6a-storage_library-promotion-inconsistency) - ---- - -## Lower-Priority Issues (Operational Efficiency) - -### 10. No Intra-Rank Parallelism for Data Generation - -Each MPI rank generates files sequentially. On multi-core nodes, all cores beyond the one doing the generation loop sit idle during what is usually the longest phase of a benchmark run. Adding thread-level parallelism within each rank would multiply generation throughput by the available core count. - -→ Full analysis: [Section 5, Item 2](DLIO_IO_Issues-Proposal_2026-03-28.md#5-specific-improvement-opportunities), [Section 12e, Item 3](DLIO_IO_Issues-Proposal_2026-03-28.md#12e-recommendations) - -### 11. Object Store Generation Has No Async Pipeline - -Each file is generated and uploaded synchronously. Generation and upload cannot overlap, meaning each rank waits for the upload acknowledgment before generating the next file. An async upload pipeline would allow the CPU to generate the next file while the network transfers the previous one. - -→ Full analysis: [Section 5, Item 4](DLIO_IO_Issues-Proposal_2026-03-28.md#5-specific-improvement-opportunities) - -### 12. MPI Topology Is Collected but Not Used for Resource Planning - -DLIO already collects per-node rank counts and node indices at startup, but does not use this information to auto-size thread counts, assign file-locality by node, or report topology in benchmark output. All three uses are straightforward given the existing data. - -→ Full analysis: [Section 12](DLIO_IO_Issues-Proposal_2026-03-28.md#12-mpi-multi-host-topology--available-infrastructure-missing-integration) - -### 13. No Settle-Time Guard After Generation on Eventual-Consistency Systems - -After data generation completes, the benchmark immediately begins listing the generated files. On object stores with eventual-consistency semantics or NFS with attribute caching, newly written objects may not be visible to a listing immediately. If the listing returns fewer files than expected, the benchmark aborts with an error rather than retrying. - -→ Full analysis: [Section 6f](DLIO_IO_Issues-Proposal_2026-03-28.md#6f-no-barrier-before-directory-walk-in-initialize) - ---- - -## Recommended Prioritization - -| Priority | Issue | Effort | Impact | -|---|---|---|---| -| **Immediate** | File vs. object reader asymmetry (Issue 1) | Medium | Invalidates cross-backend comparisons | -| **Immediate** | TFRecord iterative sampler bug (Issue 3) | Low | Invalidates multi-rank TFRecord results | -| **High** | JPEG/PNG generator skips encoding for non-DALI (Issue 2) | Medium | Reduces generation from hours to seconds | -| **High** | Unique-bytes constraint enforcement (Issue 5) | Low | Prevents meaningless results on dedup storage | -| **High** | Auto-size `read_threads` (Issue 4) | Low | Unlocks full storage bandwidth at scale | -| **Medium** | Derive `multiprocessing_context` automatically (Issue 8) | Low | Prevents silent hangs on config copy/paste | -| **Medium** | YAML config composition with Hydra (Issue 7) | High | Reduces maintenance burden by ~70% | -| **Medium** | Intra-rank generation parallelism (Issue 10) | Medium | Reduces generation wall-clock time proportionally | -| **Low** | Async object-store upload pipeline (Issue 11) | Medium | Marginal throughput improvement | -| **Low** | Node-local file affinity and topology logging (Issue 12) | Low | Improves NFS locality and result reproducibility | -| **Low** | Post-generation settle time (Issue 13) | Low | Prevents spurious failures on object stores | - ---- - -## What Is Already Working Well - -The following design decisions in the current codebase are correct and should be preserved: - -- **dgen-py for data generation**: the zero-copy Rust-backed PRNG is the right foundation for all format generators. It is fast enough to never be the bottleneck and produces genuinely unique content per file. -- **S3 iterable readers**: the skip-decode architecture is correct and complete. The task is to apply the same pattern to local-filesystem readers, not to change the object-storage path. -- **Per-rank checkpoint files**: the distributed checkpointing design (each rank writes its own file, no serialization, barriers only at epoch boundaries) is correct for the workload being simulated. -- **MPI topology collection in DLIOMPI**: the infrastructure to make topology-aware decisions is already present. It only needs to be wired into resource planning. -- **TFRecord reader**: already returns the pre-allocated tensor without touching file bytes — the correct behaviour that all other readers need to adopt. - ---- - -*Full technical analysis, code examples, and implementation details are in [DLIO_IO_Issues-Proposal_2026-03-28.md](DLIO_IO_Issues-Proposal_2026-03-28.md).* diff --git a/docs/DLIO_IO_Issues-Proposal_2026-03-28.md b/docs/DLIO_IO_Issues-Proposal_2026-03-28.md deleted file mode 100644 index 854d53d5..00000000 --- a/docs/DLIO_IO_Issues-Proposal_2026-03-28.md +++ /dev/null @@ -1,1052 +0,0 @@ -# MPI Sharding & Parallelism Investigation: `dlio_benchmark` - -**Date:** 2026-03-28 - ---- - -## 1. Data Generation - -**File:** `dlio_benchmark/data_generator/data_generator.py` - -**Sharding strategy — `_generate_files()`:** -```python -for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size): - ... -``` -Classic rank-stride sharding. Rank `r` owns files at global indices `r, r+comm_size, r+2*comm_size, …`. File paths are pre-computed in `self._file_list[i]`, which distributes them across `num_subfolders_train` round-robin. This is correct and reproducible. - -**Seed handling:** `BASE_SEED + my_rank` for the per-rank RNG. File-level seeds are derived from a flowing `rng.integers(0, 2**63)` — no adjacent-seed correlation. Reproducible across runs. - -**Directory creation bottleneck:** Only rank 0 creates directories (correct, but means all other ranks idle during the `create_node` loop for subfolders). On slow NFS with many subfolders, this is measurable latency. - -**Intra-rank parallelism:** **None.** Each rank generates files in a serial loop. No threading. For large datasets on fast storage, each rank is I/O-bound writing one file at a time. - -**Object store path:** After each file, `storage.put_data(path, bytes_value)` is called synchronously. No pipelining or async upload. - ---- - -## 2. Data Loading (Training) - -**Files:** `dlio_benchmark/utils/config.py` · `dlio_benchmark/data_loader/torch_data_loader.py` · `dlio_benchmark/reader/reader_handler.py` - -### 2a. PyTorch path — `get_global_map_index()` (used when `data_loader_sampler == INDEX`) -```python -samples_per_proc = ceil(total_samples / comm_size) -start_sample = my_rank * samples_per_proc -end_sample = (my_rank + 1) * samples_per_proc - 1 -# ... -file_index = floor(global_sample_index / num_samples_per_file) -abs_path = file_list[file_index] -``` -**Correct.** Each rank gets a contiguous slice of the global sample space. File-to-sample mapping is done via global index, so rank `r` naturally reads a contiguous block of files. The custom `dlio_sampler` pre-computes `[start_sample, end_sample]` and yields indices from that range. - -Thread-level parallelism comes from `read_threads` (PyTorch `DataLoader` `num_workers`) with `multiprocessing_context` and `prefetch_factor`. Each worker independently reads samples. - -### 2b. TF / iterative path — `build_sample_map_iter()` (used when `data_loader_sampler == ITERATIVE`) -```python -files_per_rank = (num_files // comm_size) % num_files -file_index = my_rank * files_per_rank # ← initial offset -for sample in sample_list: # sample_list is global-indexed - abs_path = file_list[file_index] - sample_index += 1 - file_index = (sample_index // num_samples_per_file) % num_files # ← LOCAL counter -``` -**Bug:** The initial `file_index` (rank-aware offset) is applied only to the **first** sample. After that, `file_index` is driven by a LOCAL `sample_index` that starts at 0 regardless of rank. For rank 1 with `sppf=500`, rank 1 reads `file[2]` (correct), then immediately falls back to reading `file[0], file[0], …, file[1], …`. - -This means the TF iterative path does **not** correctly shard files across ranks — it reads from mostly the wrong files for all non-rank-0 ranks. The PyTorch index path does not have this bug. - -### 2c. Cross-rank file distribution pattern -All ranks share the **same flat global file list** built by rank 0 (via `storage.walk_node()` + sort). There is no per-rank subdirectory affinity. With `num_subfolders_train > 0` the files are distributed across subfolders, but each rank reads from any subfolder in the list — there is no "this rank owns this directory" concept. - ---- - -## 3. Checkpointing - -**Files:** `dlio_benchmark/checkpointing/base_checkpointing.py` · `dlio_benchmark/checkpointing/pytorch_checkpointing.py` - -**Per-rank files:** Each checkpointing rank writes to `checkpoint_folder/global_epoch{E}_step{S}/model_states-{rank}.pt` independently — no rank serialization. Standard distributed checkpoint pattern. - -**Who checkpoints:** Controlled by `zero_stage`, `tensor_parallelism`, `pipeline_parallelism`, and `data_parallelism`. With `zero_stage=0`, only ranks `< model_parallelism` actually write (data-parallel copies are deduplicated). This is correct. - -**In-rank parallelism (checkpoint read):** `_get_streaming()` creates a `StreamingCheckpointing` instance with `num_parallel_readers=4`, `chunk_size=32MiB`. This parallelizes the read within a single rank's file. Writes happen via a single sequential stream. - -**Memory model:** `_SizePlaceholder` (no actual tensor allocation) + `_compute_state_bytes()` → correct byte count passed to the streaming backend. No RAM proportional to model size is used during save/load. - -**Barriers:** `comm.barrier()` after each checkpoint step in `_checkpoint_write()` / `_train()`. Optional `checkpoint_rank_sync` adds an extra barrier after every individual checkpoint. No barrier between individual layer writes within a rank. - -**Layer writes are serial:** Within a rank, layers are saved in a `for layer_index in range(start_layer, end_layer+1)` loop — no threading across layers. - ---- - -## 4. Summary Table - -| Component | MPI Sharding | Intra-rank Threads | Key Issue | -|---|---|---|---| -| Data generator | ✅ stride `range(rank, N, size)` | ❌ None (serial) | No parallel file writes; slow for large datasets | -| Data loading (PyTorch) | ✅ contiguous sample slice, correct file mapping | ✅ `read_threads` workers | No per-rank directory affinity | -| Data loading (TF/iter) | ⚠️ Bug: only first file uses rank offset | ✅ `read_threads` | `build_sample_map_iter()` file_index resets to 0 after first sample | -| Checkpointing (write) | ✅ each rank writes its own file | ❌ layers written serially | No parallel layer writes per rank | -| Checkpointing (read) | ✅ each rank reads its own file | ✅ 4 parallel readers | Only parallelized on the read path | - ---- - -## 5. Specific Improvement Opportunities - -1. **Per-rank subdirectory ownership during generation and loading**: Set `num_subfolders_train = comm_size` and have rank `r` exclusively write to (and read from) `train/{r:04d}/`. This eliminates namespace contention on NFS/Lustre and makes the I/O pattern far more realistic for distributed storage. Today `num_subfolders_train` partitions files into folders but without rank affinity. - -2. **Parallel intra-rank file generation**: Wrap the `_generate_files()` loop in a `ThreadPoolExecutor(max_workers=N)` — each thread writes an independent file (already uniquely seeded). This would N× generation throughput per rank on fast storage (NVMe, object store). - -3. **Fix `build_sample_map_iter()` file index tracking**: The local `sample_index` counter should be replaced with the global sample index for the file lookup, matching the logic in `get_global_map_index()`. Currently rank 1+ in TF mode reads wrong files. - -4. **Async object store upload**: In `_generate_files()`, the `storage.put_data(path, bytes)` call is synchronous. A bounded async queue (e.g., `asyncio` or `ThreadPoolExecutor`) would pipeline data generation and upload. - -5. **Parallel checkpoint layer writes per rank**: The inner `for layer_index in range(start_layer, end_layer+1)` loop in `save_checkpoint()` is serial. Since each layer writes to an independent file, these could be parallelized with threads — especially relevant for large models with many layers. - -6. **Read-ahead / file pinning**: The `read_threads` workers in PyTorch mode all operate on the global file list. Adding an optional `prefetch_list` derived from each rank's assigned file range (pinning files to DRAM via `mmap`) before training starts would eliminate open-file latency in tight training loops. - ---- - -## 6. Additional Issues Identified on Second Review - -### 6a. `storage_library` Promotion Inconsistency - -**File:** `dlio_benchmark/utils/config.py` — `LoadConfig()` (line ~1075) and `validate()` (line ~368) - -The YAML schema places `storage_library` as a top-level key under `storage:`: -```yaml -storage: - storage_type: s3 - storage_library: s3dlio # ← top-level in YAML - storage_options: - endpoint_url: https://... -``` - -But `validate()` reads it from inside `storage_options`: -```python -storage_library = (self.storage_options or {}).get("storage_library") -``` - -This only works because `LoadConfig()` performs an explicit "promotion" — it detects `config['storage']['storage_library']` and injects it into `args.storage_options['storage_library']`. So the YAML schema and the dataclass schema are inconsistent: `ConfigArguments` has no top-level `storage_library` field, and `validate()` assumes it has been promoted into `storage_options`. - -**Risk:** Any code path that evaluates `storage_library` before or outside `LoadConfig()` (e.g., a custom runner that builds `ConfigArguments` by hand) will see `None`. Additionally, the Hydra CLI override path is ambiguous — both `++workload.storage.storage_library=s3dlio` (promoted by LoadConfig) and `++workload.storage.storage_options.storage_library=s3dlio` (direct) work, but neither is documented clearly, and users who pass the wrong one get an opaque `None` check failure. - -**Recommendation:** Add `storage_library: str = ""` as a first-class field on `ConfigArguments`, read it directly in `validate()` from `self.storage_library`, and have `LoadConfig()` populate it without the promotion workaround. - -### 6b. `validate()` Called Before File List Is Available - -`derive_configurations()` (which calls `validate()`) is called twice: -1. During `DataGenerator.__init__()` with no file lists (the generator-only early path) -2. During `DLIOBenchmark.initialize()` after the storage walk - -On the first call, credential checks and `storage_library` validation run even when the run is a pure file-system operation. More importantly, some validation branches (e.g., NPZ reader import checks) are exercised before it is clear whether object storage will actually be used. This is harmless when it works but adds unnecessary error surface for misconfigured environments. - -**Recommendation:** Separate `validate_storage()` (called early, storage-type-aware) from `validate_workload()` (called after file lists are known). Only run credential checks when `storage_type == StorageType.S3`. - -### 6c. `multiprocessing_context` Couples to `storage_library` But Lives in `reader:` - -The `multiprocessing_context` key lives under `reader:` but its correct value depends entirely on the storage backend: - -| Storage library | Required `multiprocessing_context` | Reason | -|---|---|---| -| `local_fs` / `minio` | `fork` (default) | No async runtime in worker | -| `s3dlio` | `spawn` | Tokio runtime destroyed by fork | -| `s3torchconnector` | `spawn` | Background S3 threads destroyed by fork | - -This coupling is currently enforced only through comments in the YAML files. If a user copies a file-backend YAML and adds an s3dlio storage section without updating the reader section, all S3 reads will silently hang (the Tokio runtime is dead in the forked child). There is no runtime warning or error. - -**Recommendation:** In `derive_configurations()`, after `storage_library` is known, automatically set `self.multiprocessing_context = "spawn"` if the library is `s3dlio` or `s3torchconnector`, with a warning if the YAML had explicitly set `fork`. This makes the constraint self-enforcing rather than documentation-dependent. - -### 6d. Hardcoded Endpoint URIs in YAML Files - -The lab IP `https://172.16.1.40:9000` appears hardcoded in every object-storage YAML: -```yaml -storage_options: - endpoint_url: https://172.16.1.40:9000 -``` - -This makes every object-storage YAML file **environment-specific** — they fail immediately in any other environment (CI, cloud, different lab). It also means the same model config cannot be shared across teams without edits. - -**Recommendation:** Use environment variable resolution for all connection properties. Hydra supports `${oc.env:AWS_ENDPOINT_URL}` interpolation. Alternatively, treat `endpoint_url` as a required CLI override with no default, so the YAML template contains a clearly-marked placeholder: -```yaml -storage_options: - endpoint_url: ??? # Required: set via ++workload.storage.storage_options.endpoint_url=... -``` - -### 6e. `build_sample_map_iter()` Bug — Concrete Description - -For completeness, here is the exact effect of the file-index tracking bug (Section 2b) with a worked example: - -Given 8 files, 2 ranks, 4 files per rank, `num_samples_per_file=1`: -- Rank 1: `files_per_rank = (8 // 2) % 8 = 4`. `file_index` starts at `1 * 4 = 4` (correct, pointing to file[4]). -- First iteration: reads `file[4]` ✅ -- After first sample: `sample_index = 1`, `file_index = (1 // 1) % 8 = 1` — now pointing to `file[1]` ❌ (should be `file[5]`) -- All subsequent samples for rank 1 iterate through `file[1], file[2], file[3], …` — the same file range as rank 0. - -Both ranks read overlapping files, meaning the benchmark double-counts throughput from the same data and misses the upper half of the dataset entirely. The PyTorch index path (`get_global_map_index()`) does not have this bug. TFRecord workloads using the `ITERATIVE` sampler are affected. - -### 6f. No Barrier Before Directory Walk in `initialize()` - -In `DLIOBenchmark.initialize()`, when `generate_data=True`, all ranks barrier after generation: -```python -self.data_generator.generate() -self.comm.barrier() # ← correct -``` - -But then rank 0 immediately proceeds to `storage.walk_node()` inside the same `initialize()` call (after the barrier) to build `file_list_train`. On object stores with eventual-consistency semantics (or NFS with attribute caching), newly written objects may not yet be visible to a listing. There is no retry or wait logic before the walk. If the walk returns fewer files than expected, a hard exception is raised. - -**Recommendation:** Add a configurable `post_generation_settle_time` (default 0) with a rank-0 sleep + broadcast before the walk when `storage_type != local_fs`. - ---- - -## 7. YAML Config Proliferation Analysis - -### 7a. Current State - -The `configs/dlio/workload/` directory contains **49 YAML files** for what is effectively a small matrix of orthogonal dimensions: - -| Dimension | Values | -|---|---| -| Model / workload | unet3d, resnet50, cosmoflow, llama3_8b, dlrm, flux, retinanet | -| Storage backend | local_fs, s3+minio, s3+s3dlio, s3+s3torchconnector | -| Phase | datagen only, train only, checkpoint only, train+checkpoint | -| Scale | a100, h100, b200, mi355, 1t, 405b, 70b, 8b | - -The current approach creates one YAML per _combination_. For a single model (unet3d h100), this already produces 7 files: - -``` -unet3d_h100.yaml ← file, train -unet3d_h100_minio.yaml ← minio, train -unet3d_h100_minio_datagen.yaml ← minio, datagen -unet3d_h100_s3dlio.yaml ← s3dlio, train -unet3d_h100_s3dlio_datagen.yaml ← s3dlio, datagen -unet3d_h100_s3torch.yaml ← s3torchconnector, train -unet3d_h100_s3torch_datagen.yaml ← s3torchconnector, datagen -``` - -Similarly, llama3_8b generates 4 files; cosmoflow, resnet50, flux, retinanet, dlrm create additional duplicates. This pattern scales as `O(models × libraries × phases)`. - -### 7b. What Differs Between Files — and What Doesn't - -Comparing the three unet3d-h100 training variants (minio / s3dlio / s3torch), **the only fields that differ** are: -```yaml -# Differs: -storage.storage_library: minio | s3dlio | s3torchconnector -storage.storage_root: mlp-minio | mlp-s3dlio | mlp-s3torch -storage_options.endpoint_url: # same IP, but separate bucket implies separate data staging -reader.multiprocessing_context: spawn # same for all three object store variants -# s3dlio only: -storage_options.s3_force_path_style: true -# minio only: -storage_options.secure: false -``` - -**Everything else is identical**: model definition, framework, dataset sizes, record lengths, train epochs, computation time, batch size, read threads, shuffle settings, metric target AU. - -The datagen variants differ from the train variants only in: -```yaml -workflow.generate_data: True # vs False -workflow.train: False # vs True -``` - -### 7c. Root Causes of the Explosion - -1. **No config composition**: Hydra supports config groups (sub-directories with named YAML slices that can be composed), but the current setup uses a flat directory of monolithic files. There is no `defaults:` list or group structure. - -2. **Storage connection params are baked in**: The endpoint URL and bucket name are specific to a single lab, making every file non-portable. Portable configs require parameterizing these, which currently gets done by forking. - -3. **`workflow.generate_data` / `workflow.train` are toggled by file, not CLI**: Users fork the YAML to change phase rather than passing `++workload.workflow.generate_data=True` on the command line. - -4. **`storage_library` is not a CLI-first parameter**: The library choice (minio vs s3dlio vs s3torchconnector) is buried inside the YAML, requiring a separate file per library instead of a single override on the command line. - ---- - -## 8. Proposed YAML Config Architecture - -### 8a. Principle: Separate What Changes from What Doesn't - -The YAML files should capture stable model/workload facts (architecture, dataset sizes, target AU, epoch count, computation time). Storage backend and connection details should be supplied at runtime via CLI overrides or a small environment-local override file. - -### 8b. Recommended Directory Structure (Hydra Config Groups) - -``` -configs/dlio/ - config.yaml ← top-level Hydra config with defaults list - workload/ - models/ ← config group: model + dataset + training params - unet3d_h100.yaml - resnet50_a100.yaml - cosmoflow_a100.yaml - llama3_8b.yaml - dlrm_b200.yaml - flux_b200.yaml - retinanet_b200.yaml - storage/ ← config group: storage backend templates - file.yaml ← local_fs, no credentials required - s3_minio.yaml ← s3 + minio SDK, endpoint_url = ??? - s3_s3dlio.yaml ← s3 + s3dlio, endpoint_url = ??? - s3_s3torch.yaml ← s3 + s3torchconnector, endpoint_url = ??? - workflow/ ← config group: what phases to run - train.yaml ← generate_data: False, train: True - datagen.yaml ← generate_data: True, train: False - checkpoint.yaml ← generate_data: False, train: False, checkpoint: True - full.yaml ← generate_data: True, train: True -``` - -A model file (`models/unet3d_h100.yaml`) would contain only stable facts: -```yaml -# configs/dlio/workload/models/unet3d_h100.yaml -model: - name: unet3d - type: cnn - model_size: 499153191 - -framework: pytorch - -dataset: - data_folder: test-run/unet3d # relative path within bucket or filesystem root - format: npz - num_files_train: 168 - num_samples_per_file: 1 - record_length_bytes: 146600628 - record_length_bytes_stdev: 68341808 - record_length_bytes_resize: 2097152 - -reader: - data_loader: pytorch - batch_size: 7 - read_threads: 4 - file_shuffle: seed - sample_shuffle: seed - -train: - epochs: 5 - computation_time: 0.323 - -checkpoint: - checkpoint_folder: checkpoints/unet3d - checkpoint_after_epoch: 5 - epochs_between_checkpoints: 2 - -metric: - au: 0.90 -``` - -A storage template (`storage/s3_s3dlio.yaml`) would contain backend facts with required fields explicitly marked: -```yaml -# configs/dlio/workload/storage/s3_s3dlio.yaml -storage: - storage_type: s3 - storage_library: s3dlio - storage_root: ??? # Required: bucket name, set via CLI - storage_options: - endpoint_url: ??? # Required: set via ++workload.storage.storage_options.endpoint_url= - region: us-east-1 - s3_force_path_style: true - -reader: - multiprocessing_context: spawn # Required for s3dlio — Tokio is fork-unsafe -``` - -### 8c. Command-Line Patterns for Runtime Switching - -With this structure, switching backends requires only CLI overrides — no new YAML files: - -**File-backend training:** -```bash -dlio_benchmark \ - workload=models/unet3d_h100 \ - ++workload.storage.storage_type=local_fs \ - ++workload.storage.storage_root=/mnt/scratch/dlio-data \ - ++workload.workflow.generate_data=False \ - ++workload.workflow.train=True -``` - -**Object storage with s3dlio:** -```bash -dlio_benchmark \ - workload=models/unet3d_h100 \ - ++workload.storage.storage_type=s3 \ - ++workload.storage.storage_library=s3dlio \ - ++workload.storage.storage_root=mlp-s3dlio \ - ++workload.storage.storage_options.endpoint_url=${AWS_ENDPOINT_URL} \ - ++workload.workflow.train=True -``` - -**Switch to minio on the same command, same model:** -```bash -# Change only storage_library and storage_root (bucket name) -... ++workload.storage.storage_library=minio \ - ++workload.storage.storage_root=mlp-minio \ - ++workload.reader.multiprocessing_context=fork -``` - -**Datagen-only, then train:** -```bash -# Step 1: generate -dlio_benchmark workload=models/unet3d_h100 \ - ++workload.storage.storage_type=s3 \ - ++workload.storage.storage_library=s3dlio \ - ++workload.storage.storage_root=mlp-s3dlio \ - ++workload.storage.storage_options.endpoint_url=${AWS_ENDPOINT_URL} \ - ++workload.workflow.generate_data=True \ - ++workload.workflow.train=False - -# Step 2: train (identical flags, flip workflow) -dlio_benchmark workload=models/unet3d_h100 \ - ... \ - ++workload.workflow.generate_data=False \ - ++workload.workflow.train=True -``` - -### 8d. Environment-Local Override File (Alternative to Shell Functions) - -For teams with a fixed endpoint, a local override file can be sourced by Hydra without committing credentials to the repo: - -```yaml -# configs/dlio/local.yaml (gitignored) -defaults: - - override storage: s3_s3dlio - -storage: - storage_root: my-bucket - storage_options: - endpoint_url: https://my-minio.internal:9000 -``` - -Then run: -```bash -dlio_benchmark +local=local workload=models/unet3d_h100 ++workload.workflow.train=True -``` - -### 8e. Impact on File Count - -Under the proposed structure, the 7 unet3d-h100 files collapse to 1 model file plus 3 reusable storage templates (shared by all models). Across the full matrix of 7 models × 3 object libraries × 2 phases, the ~30 object-storage YAML files collapse to 7 model files + 3 storage templates + 3 workflow files = **13 files total** — a ~70% reduction, and all storage templates are shared across models. - -### 8f. Short-Term Mitigation (No Refactor Required) - -If the full Hydra config-group refactor is not immediately feasible, the proliferation can be stopped without changing existing files: - -1. **Stop adding `_minio.yaml`, `_s3dlio.yaml`, `_s3torch.yaml` variants.** Document the override pattern in `README_S3DLIO_CONFIGS.md` instead. -2. **Remove hardcoded IPs** from existing YAML files. Replace with `???` (Hydra's "required, no default" sentinel) and add `endpoint_url` to the run instructions. -3. **Add a shared `storage/` config group** with the three library templates. New models only need a model YAML; storage is composed at runtime. -4. **Derive `multiprocessing_context`** automatically from `storage_library` in `derive_configurations()` to remove the hidden coupling. -5. **Phase switches via CLI**: Add one-line documentation showing `++workload.workflow.generate_data=True` so users stop forking YAML files to change only the phase. - ---- - -## 9. The Core Principle: This Is a Storage Benchmark, Not a Data Processing Benchmark - -### 9a. The Design Intent Is Correct — But the Implementation Is Incomplete - -The codebase already recognises that decoded data has no value. In `reader_handler.py`, every read path ends with: - -```python -# FormatReader.next() and FormatReader.read_index() -... -self.get_sample(filename, sample_index) # reads + decodes file -self.preprocess() -return self._args.resized_image # ← decoded data is THROWN AWAY here -``` - -`resized_image` is a **single random tensor**, allocated once at startup in `derive_configurations()`: - -```python -self.resized_image = gen_random_tensor( - shape=self.transformed_record_dims, - dtype=self.transformed_record_element_dtype, rng=rng) -``` - -Every reader, every format, every sample in every epoch returns this same pre-allocated buffer. The content of what was read from storage is irrelevant by design. The benchmark measures how fast the storage can deliver bytes — not what those bytes mean. - -The TFRecord reader already honours this principle fully: `_parse_image()` returns `self._resized_image` without touching the raw bytes at all. The S3 iterable readers (`image_reader_s3_iterable.py`, `hdf5_reader_s3_iterable.py`, `tfrecord_reader_s3_iterable.py`) store only byte counts for telemetry, never decoded arrays. - -**The problem is that for local-filesystem readers and all generators, the code does substantial CPU-intensive data transformation work whose only output is a buffer that is immediately discarded.** Every CPU cycle spent on JPEG entropy coding, PIL decoding, protobuf serialization, or zlib compression is overhead injected into a storage benchmark that doesn't need it. - -### 9b. Generator Overhead by Format - -| Format | Generation work | Relevant to storage? | CPU cost | -|---|---|---|---| -| JPEG | `gen_random_tensor` → `PIL.fromarray` → `img.save(format='JPEG')` (DCT + quantize + Huffman) | ❌ | High: 10–60 ms/file | -| PNG | `gen_random_tensor` → `PIL.fromarray` → `img.save(format='PNG')` (Deflate lossless) | ❌ | Very high: 30–200 ms/file | -| NPY | `gen_random_tensor` (dgen-py) → `np.save()` (raw binary dump) | ✅ Near-minimal | Low: < 1 ms/file | -| NPZ (no compression) | `gen_random_tensor` → `np.savez()` (ZIP container, stored mode) | ✅ Near-minimal | Low | -| NPZ (zip compression) | `gen_random_tensor` → `np.savez_compressed()` (ZIP+Deflate) | ❌ | Medium–high: zlib per file | -| HDF5 (no compression) | `gen_random_tensor` → h5py metadata + raw dataset write | Mostly ✅ | Low–medium | -| HDF5 (gzip) | + GZIP compression per dataset | ❌ | Medium–high | -| TFRecord | `gen_random_tensor` → `tf.train.Example` → `SerializeToString()` per sample | ❌ partial | Medium: protobuf serialize | -| CSV | `gen_random_tensor` → `pd.DataFrame.to_csv()` (text encode + float formatting) | ❌ | Medium: text serialization | -| IndexedBinary | `gen_random_tensor` → MPI-IO raw byte write | ✅ Minimal | Low | -| Synthetic | single integer written as UTF-8 string | ✅ Minimal | Negligible | - -**JPEG and PNG are the worst offenders** because the encoder is CPU-bound and irreversibly entangled in the format: there is no way to construct a valid JPEG or PNG without running the compression algorithm, because the file format *is* the compressed output. - -### 9c. Reader Overhead by Format (Local Filesystem Path) - -| Format | Reader `open()` / `get_sample()` work | Decoded data used? | CPU cost | -|---|---|---|---| -| JPEG/PNG (`ImageReader`) | `PIL.Image.open()` + `np.asarray()` — full entropy decode | ❌ Discarded | High: 5–20 ms/file | -| NPY (`NPYReader`) | `np.load()` — mmap or full array load | ❌ Discarded | Low–medium | -| NPZ (`NPZReader`) | `np.load()['x']` — ZIP inflate + array load | ❌ Discarded | Medium | -| HDF5 (`HDF5Reader`) | `h5py.File()` + `dataset[sample_index]` — HDF5 chunk read + numpy convert | ❌ Discarded | Low–medium | -| TFRecord (`TFReader`) | raw bytes streamed by tf.data, `_parse_image()` returns `resized_image` directly | ✅ Already bypassed | None | -| S3 iterable readers | raw bytes fetched, byte count stored for telemetry | ✅ Already bypassed | None | - -The S3 iterable readers represent the correct pattern. They are documented explicitly: - -> *"No PIL or numpy decode is performed. DLIO's FormatReader.next() yields a pre-allocated random tensor regardless of file contents; only the byte count is needed for the image_size telemetry metric."* -> — `image_reader_s3_iterable.py` docstring - -The local-filesystem equivalents do not apply the same logic. - -### 9d. Where Time Actually Goes in an End-to-End JPEG Benchmark Run - -For a single 224×224 JPEG file on a local NFS filesystem: - -**Generation (once):** -``` -dgen_py random bytes: ~0.01 ms (fast Rust PRNG, zero-copy) -PIL.fromarray(): ~0.5 ms (copies bytes into PIL Image object) -img.save(JPEG): ~20–40 ms (DCT + quantization + Huffman coding) -write() syscall: ~0.1 ms (kernel buffer, NFS async) -Total per file: ~21–41 ms — 98% is the JPEG encoder -``` - -**Reading (every training step, every epoch):** -``` -open() syscall: ~1 ms (NFS RTT) -read() syscall: ~0.01 ms (115 KB at 10 GiB/s) -PIL.Image.open(): ~5–15 ms (JPEG entropy decode + YCbCr→RGB) -np.asarray(): ~0.5 ms (copy into numpy) -resized_image returned: decoded array discarded -Total per file: ~7–17 ms — storage I/O is < 5% of total time -``` - -The storage benchmark is spending more time on JPEG decode during reading than on actual I/O. The encode during generation is 200–4000× the storage write time. - -### 9e. The Non-Negotiable Constraint: Every File Must Contain Unique Bytes - -Before discussing any optimisation, one constraint must be stated explicitly: - -**Every generated file must contain content that is unique across the entire dataset. Reusing the same byte sequence across multiple files is a fundamental correctness error for a storage benchmark.** - -Modern storage systems — enterprise NAS arrays (NetApp, Vast Data, Pure Storage), object stores, and distributed file systems — routinely apply inline deduplication and compression. If two files have identical byte content, a deduplicating storage system stores only one physical copy, regardless of how many logical files are created. A benchmark that writes N files containing identical bytes is not measuring how fast the storage can absorb N files of unique data — it is measuring how fast the dedup engine can detect and discard duplicates. The measured throughput may be orders of magnitude higher than true storage write throughput, producing completely meaningless results. - -**The template-clone approach described in an earlier draft of this document was categorically wrong and has been withdrawn.** Writing the same pre-encoded JPEG bytes to every file would collapse 1.28 million "distinct" training images to a single unique 115 KB block in any deduplicating storage system. That is not a storage benchmark. - -The same logic applies to any "pre-compute one serialized blob and copy it N times" shortcut for any format. The byte content of every file must be independently unique. - -### 9f. dgen-py: The Correct Foundation for All Data Generation - -The correct solution to the CPU overhead problem is already present in the codebase: `gen_random_tensor()` backed by **dgen-py**, a zero-copy Rust-backed PRNG library written specifically for this project. - -Key properties that matter here: - -- **Speed**: ~155× faster than NumPy random generation. For a 224×224×3 uint8 array (150,528 bytes), dgen-py generates the raw bytes in < 0.01 ms, versus ~1.5 ms for NumPy. -- **Uniqueness**: every call with a different seed produces a statistically independent, non-repeating byte stream. Since `_generate_files()` uses a flowing RNG that advances per file (`seed = int(rng.integers(0, 2**63))`), every file gets a unique seed → unique bytes. -- **Zero-copy**: dgen-py returns a `BytesView` implementing the buffer protocol. `np.frombuffer(bytesview, dtype=dtype)` consumes it without an intermediate allocation. -- **Scalability**: because the bytes are generated in Rust with SIMD, generation throughput exceeds 50 GiB/s on modern CPUs — faster than any storage device can accept data. - -**dgen-py must be used for all new data generation, for all formats, without exception.** It is already wired into `gen_random_tensor()` and therefore already active for every format that calls it. The critical requirement is that no code path reuses byte content across file boundaries. - -For the formats where generation work is proportional to storage size (NPY, IndexedBinary, HDF5 without compression), the pipeline is already correct: - -``` -dgen-py (unique bytes, < 0.01 ms per file) → write() syscall to storage -``` - -dgen-py is the bottleneck only if the benchmark needs to generate faster than ~50 GiB/s per core, which exceeds every real storage system's ingestion bandwidth. - -### 9g. JPEG/PNG: Do Files Need to Be ACTUALLY Valid Images? - -The short answer: **it depends entirely on which data loader is configured.** - -This is the key question for generation cost. If files do not need to be valid JPEG/PNG bitstreams, the generator can write raw dgen-py bytes directly — no PIL, no DCT, no Huffman coding — reducing generation from ~20–40 ms/file to < 0.01 ms/file. That is a 2000–4000× speedup. - -#### When valid JPEG/PNG is required: DALI and NATIVE_DALI data loaders - -`dali_image_reader.py` constructs a DALI pipeline that calls: - -```python -images = fn.decoders.image(images, device='cpu') # line 80 -``` - -`fn.decoders.image()` is NVIDIA's GPU/CPU image decoder. It requires a syntactically valid JPEG or PNG bitstream. It will throw an error on random bytes, even if preceded by a correct-looking header. When `data_loader_type` is `dali` or `native_dali`, files MUST be valid images and PIL encoding is unavoidable. - -#### When valid JPEG/PNG is NOT required: all other data loaders - -The S3 iterable readers (`ImageReaderS3Iterable`) already prove this. They fetch raw bytes from object storage, record `len(raw_bytes)` for telemetry, and never call `PIL.Image.open()`. The benchmark runs correctly with files that contain arbitrary bytes — the format name attached to those bytes is irrelevant because the reader never decodes them. - -After the Section 9h fix (replacing `PIL.Image.open()` with `open(filename, 'rb').read()` in the local-filesystem `ImageReader`), the same is true for all non-DALI paths: - -| Data loader | Reader decodes image? | Files must be valid JPEG/PNG? | -|---|---|---| -| `pytorch` / `tensorflow` (local FS, current) | YES — `PIL.Image.open()` | YES (current) | -| `pytorch` / `tensorflow` (local FS, after 9h fix) | NO — raw byte read | **NO** | -| any (S3 iterable readers, already shipped) | NO — raw byte read | **NO** | -| `dali` / `native_dali` | YES — `fn.decoders.image()` | **YES, always** | - -#### The consequence for generators: branch on `data_loader_type` - -For non-DALI paths, `JPEGGenerator` and `PNGGenerator` can write raw dgen-py bytes directly, with no PIL pipeline at all: - -```python -def _write(i, dim_, dim1, dim2, file_seed, rng, out_path_spec, is_local, output): - if self._args.data_loader_type in (DataLoaderType.DALI, DataLoaderType.NATIVE_DALI): - # DALI pipeline calls fn.decoders.image() — must produce valid JPEG - records = gen_random_tensor(shape=(dim1, dim2), dtype=np.uint8, rng=rng) - img = PIL.Image.fromarray(np.clip(records, 0, 255).astype(np.uint8)) - img.save(output, format='JPEG', quality=75) - else: - # Reader reads raw bytes and discards them — any bytes work - raw = gen_random_tensor(shape=(dim1 * dim2 * 3,), dtype=np.uint8, rng=rng) - output.write(raw.tobytes()) -``` - -For the non-DALI branch the generation pipeline collapses to: - -``` -dgen-py (unique bytes, < 0.01 ms) → write() syscall to storage -``` - -This is identical to NPY generation. The "irreducible cost" of JPEG/PNG format disappears entirely for non-DALI configurations. - -#### File size note - -Raw dgen-py bytes for a 224×224×3 uint8 image = 150,528 bytes (~150 KB). A real JPEG of the same image is typically 50–115 KB (4:1–6:1 compression). The raw format produces slightly LARGER files than real JPEGs. For a storage benchmark, larger files per sample means more I/O per batch — a slightly more conservative (harder) test. This is acceptable. The `record_length` field in the benchmark config controls expected size; if exact size matching is needed, the raw write can be padded or truncated to `record_length` bytes. - -#### Remaining mitigations for the DALI path - -When `data_loader_type: dali` is configured, PIL encoding is unavoidable. The applicable mitigations are: - -1. **Lower JPEG quality.** `quality=10` encodes at 3–5× speed compared to `quality=75`. Files are still valid, unique JPEG bitstreams. -2. **Parallel intra-rank encoding via `ThreadPoolExecutor`.** PIL's JPEG encoder releases the Python GIL; 4–8 threads per rank reduces wall-clock time proportionally. -3. **Use NPY or HDF5 for pure storage benchmarks.** DALI supports NPY input natively. If the goal is to measure storage bandwidth/IOPS rather than to simulate a specific vision training pipeline, switch formats. NPY generation is already fast and the benchmark result is equivalent. - -**YAML warning recommendation for any JPEG/PNG config that uses `data_loader_type: dali`:** - -```yaml -# WARNING: DALI data loader requires valid JPEG files (fn.decoders.image() is a real decoder). -# Generation cost: ~20-40ms/file (PIL JPEG encode). For faster generation with equivalent -# storage I/O measurement, use data_loader_type: pytorch with NPY format instead. -``` - -### 9h. Reader Overhead: The Fix That Is Already Half-Done - -The S3 iterable readers already apply the correct pattern: fetch raw bytes, record the byte count for telemetry, discard the bytes, return `resized_image`. The local-filesystem `ImageReader` does not; it decodes the full JPEG via PIL. - -The raw-byte-read fix for `ImageReader.open()` is valid and does not introduce any deduplication concern — the storage read is still a real read of the on-disk file (unique bytes are fetched); only the subsequent CPU decode is skipped: - -```python -# Proposed replacement for ImageReader.open() -def open(self, filename): - with open(filename, 'rb') as f: - raw = f.read() - return len(raw) # byte count for telemetry, like ImageReaderS3Iterable - -def get_sample(self, filename, sample_index): - byte_count = self.open_file_map[filename] - dlp.update(image_size=byte_count) - dft_ai.update(image_size=byte_count) -``` - -This eliminates 5–20 ms of PIL decode overhead per sample from the training-step timing. The storage I/O — the thing being measured — is unchanged. - -**NPZ/HDF5 with Compression:** -The same principle applies to read decompression. When `compression=gzip` or `compression=zip` is enabled, the reader spends significant CPU time inflating data that is then discarded. These settings should default to `none`: - -``` -WARNING at startup when compression != NONE: -"compression= is enabled. Benchmark will include CPU decompression -in timings, not pure storage bandwidth. Set compression=none for -accurate storage performance measurement." -``` - -### 9i. Summary of Corrected Recommendations - -| Issue | Correct Action | Incorrect Action (Do Not Do) | -|---|---|---| -| JPEG/PNG generation with non-DALI data loaders | Write raw dgen-py bytes directly — no PIL, no DCT, no Huffman; generation drops from ~30 ms/file to < 0.01 ms/file | Always run PIL encode regardless of whether the reader decodes the file | -| JPEG/PNG generation with DALI / NATIVE_DALI | PIL encode is unavoidable (`fn.decoders.image()` is a real GPU decoder); use `quality=10` + `ThreadPoolExecutor` | Treat DALI path the same as non-DALI and write raw bytes — DALI will throw an error on invalid bitstream | -| TFRecord per-sample protobuf serialization | Use dgen-py for each sample's raw bytes (already done); accept protobuf overhead as format cost | Pre-compute one `Example` blob and replicate it — produces N logically distinct but physically identical records | -| `ImageReader.open()` decodes JPEG to discard | Read raw bytes, store byte count (like `ImageReaderS3Iterable`) | Skip the storage read entirely — would produce an I/O-free benchmark | -| NPZ/HDF5 compression adds CPU overhead | Default `compression: none`; warn at startup when enabled | Add compression without warning — benchmark silently measures CPU, not storage | -| CSV format for storage benchmarking | Document as not recommended; prefer NPY/IndexedBinary | Add multi-format CSV confusion | -| JPEG/PNG for large-scale storage benchmarks with DALI | Document as "inherently generation-slow on DALI path"; recommend NPY/HDF5 for pure I/O testing | Use JPEG/PNG + DALI for billion-file benchmarks where generation time dominates | -| All data generation must use dgen-py | `gen_random_tensor()` via `_generate_files()` already does this — enforce as mandatory, no exceptions | Use `np.zeros`, `np.ones`, or any repeated constant — these produce identical content across files | - ---- - -## 10. Small-File Workload Pathologies (JPEG / PNG) - -### 10a. What "Small File" Means Here - -JPEG and PNG formats always store exactly one sample per file (`num_samples_per_file = 1`). Typical sizes: - -| Workload | Image size | File size | -|---|---|---| -| ImageNet-1K (resnet50) | 224 × 224 × 3 | ~50–150 KB | -| CIFAR-10 | 32 × 32 × 3 | ~2–5 KB | -| Custom satellite / medical | 512 × 512 × 1 | ~100–500 KB | - -Unlike TFRecord, HDF5, or NPZ — which pack hundreds or thousands of samples into one file, amortising open/stat/read latency across many samples — every JPEG/PNG access is a full open → read → decode → close cycle for a single sample. This makes the number of IOPS required proportional to the sample count, not the batch count. - -### 10b. Data Generation Bottleneck - -`_generate_files()` in `data_generator.py` drives every format generator. Its core loop is: - -```python -for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size): - write_fn(i, dim_, dim1, dim2, ...) # serial within rank -``` - -There is no thread pool, no `asyncio`, no `concurrent.futures`. Each call to `write_fn` must complete before the next begins. - -For JPEG and PNG, `write_fn` is: - -```python -# jpeg_generator.py -img = Image.fromarray(arr.astype('uint8'), mode='RGB') -img.save(output, format='JPEG') # CPU-bound encode, ~10–60 ms - -# png_generator.py -img = Image.fromarray(arr.astype('uint8'), mode='RGB') -img.save(output, format='PNG') # CPU-bound lossless encode, ~30–200 ms -``` - -PIL's JPEG and PNG encoders are single-threaded inside each call. JPEG encode at quality 75 typically runs 15–40 ms for a 224×224×3 image on a modern core; PNG is 2–5× slower due to lossless compression. - -**Concrete example — ImageNet-scale dataset (1.28 M files) with NP=8:** - -| Metric | Value | -|---|---| -| Total files | 1,280,000 | -| Files per rank (`N / np`) | 160,000 | -| Encode time (JPEG, 30 ms/file) | 160,000 × 0.030 s ≈ **80 min per rank** | -| Encode time (PNG, 100 ms/file) | 160,000 × 0.100 s ≈ **4.4 hours per rank** | -| Storage write time (100 KB, 1 GiB/s NFS) | 160,000 × 0.0001 s ≈ **16 s** — negligible | - -The bottleneck is not I/O bandwidth — it is pure CPU time for compression. Because each rank is serial, adding more MPI ranks scales generation linearly, but the per-rank CPU time remains unchanged. Doubling NP from 8 to 16 halves the wall-clock time, but only by adding 8 more processes. There is no intra-rank parallelism to exploit the spare CPU cores that sit idle while one thread encodes. - -**Contrast with `hdf5_generator.py` and `npy_generator.py`:** NumPy native binary format saves raw memory-mapped arrays at speeds limited only by storage bandwidth (often 1–5 GiB/s per rank). JPEG/PNG generation is an order of magnitude slower for the same logical data volume. - -### 10c. Data Reading Bottleneck - -`image_reader.py` uses PIL to read files: - -```python -def open(self, filename): - # called once per sample, from a DataLoader worker process - img = Image.open(filename) - data = np.asarray(img) - self.open_file_map[filename] = data -``` - -Each call is a separate system-level open → read → JPEG decode → numpy conversion. There is no read-ahead, no batch opening, and no memory pooling across calls. - -**Throughput ceiling for `read_threads=1` (the default):** - -On NFS (RTT ~1 ms, bandwidth ~10 GiB/s), each file fetch is dominated by per-request latency: - -- Per-file time ≈ RTT + file_size/bandwidth = 1 ms + (115 KB / 10 GiB/s) ≈ 1.01 ms -- Maximum IOPS ≈ 990 files/sec -- Throughput ≈ 990 × 115 KB ≈ **114 MiB/s** — with 10 GiB/s of available bandwidth **98.9% idle** - -With `read_threads=8`: - -- 8 concurrent opens → 8 simultaneous RTTs → IOPS ≈ 7,920 → **912 MiB/s** — still only 9% of NFS bandwidth - -With `read_threads=32`: - -- 32 concurrent opens → IOPS ≈ 31,680 → **3.6 GiB/s** — 36% of NFS bandwidth - -The practical takeaway: **IOPS, not bandwidth, is the binding constraint for small-file JPEG/PNG reading**. The optimal `read_threads` value is `ceil(target_throughput / (file_size / bandwidth) + RTT * target_IOPS)`, which for typical deployments means 16–64 threads per rank, not the default of 1. - -### 10d. No Aggregated-Access Path - -Frameworks such as WebDataset, FFCV, and TFRecord address this problem by grouping many samples into sequential tar or binary shards. A single large sequential read then yields many samples, converting the random-IOPS problem into a streaming-bandwidth problem. DLIO has no sharding path for JPEG or PNG: every benchmark run, at every scale, reads each sample as an individual file. This is by design for the benchmark (measuring actual per-file I/O cost), but it means: - -1. Any benchmark result with JPEG/PNG and `read_threads` < 16 is almost certainly I/O starved, not representative of storage peak capability. -2. Results should always report `read_threads × comm_size` (total concurrent I/O streams) alongside throughput. - -### 10e. Sub-folder Namespace - -`num_subfolders_train` distributes files across sub-directories, reducing directory listing time on large NFS servers. It does not change the fundamental one-file-per-open access pattern. For datasets with > 100 K JPEG/PNG files, sub-folders are necessary to avoid NFS `readdir` stalls, but are not sufficient to close the throughput gap. - ---- - -## 11. `read_threads` — Fixed YAML Value vs. Runtime-Adaptive Sizing - -### 11a. Current Behaviour - -`read_threads` is defined in `ConfigArguments`: - -```python -read_threads: int = 1 # dlio_benchmark/utils/config.py -``` - -It is set at YAML-load time (before MPI is initialized) and passed verbatim to PyTorch `DataLoader(num_workers=read_threads)`. The only runtime check is in `validate()`: - -```python -if self.read_threads > 1: - cores_available = len(psutil.Process().cpu_affinity()) - if cores_available < self.read_threads: - self.logger.warning(...) # logs a warning, zero action taken -``` - -Validation checks the pinned CPU set of the current process, not the actual core count divided by ranks per node. It never modifies `read_threads`, caps it, or auto-computes a value. DLIOMPI's `npernode()` and `nnodes()` are never consulted from `config.py`. - -The `prefetch_factor` fed to PyTorch DataLoader is: - -```python -prefetch_factor = math.ceil(self._args.prefetch_size / self._args.read_threads) -``` - -This means that changing `read_threads` without correspondingly adjusting `prefetch_size` silently changes prefetch aggressiveness, which affects memory consumption and training-step hide latency in ways that are not visible in the YAML. - -### 11b. Thread Budget Analysis Across Deployment Scales - -When `read_threads = 8` is hardcoded (as in `resnet50_a100.yaml`), the total DataLoader worker processes per node is `read_threads × ranks_per_node`: - -| Deployment | ranks/node | read_threads | DataLoader workers/node | Total processes/node | 128-core utilisation | -|---|---|---|---|---|---| -| NP=1, 1 node | 1 | 8 | 8 | 9 | 7% | -| NP=8, 1 node | 8 | 8 | 64 | 72 | 56% | -| NP=8, 8 nodes | 1 | 8 | 8 | 9 | 7% | -| NP=64, 8 nodes | 8 | 8 | 64 | 72 | 56% | - -The same YAML sets the same thread count regardless of whether one or eight ranks share a node. On high-rank-density nodes (NP=8/node), `read_threads=8` allocates 64 reader processes per node and may saturate the NFS client connection pool or cause CPU thrashing. On single-rank nodes, `read_threads=8` leaves most cores idle while I/O is the bottleneck. - -**The correct thread budget formula is:** - -``` -read_threads_per_rank = max(1, floor(available_cores / ranks_per_node / cpu_per_io_thread)) -# For I/O-bound NFS: cpu_per_io_thread ≈ 0.5 (threads mostly sleep on syscalls) -# For CPU-bound JPEG decode: cpu_per_io_thread ≈ 1.0 -# Practical range: [2, 64] -``` - -DLIOMPI can provide all the inputs (`npernode()`, via `MPI.COMM_TYPE_SHARED`), and `os.cpu_count()` or `psutil.cpu_count()` gives the core total. The computation is straightforward but requires MPI to be initialized before validation, which conflicts with the current order of operations (see Section 6b). - -### 11c. The Fixed-vs-Auto Design Decision - -**Arguments for keeping `read_threads` as a fixed YAML integer:** -- Reproducibility: same YAML, same thread count, same result regardless of hardware. -- Simplicity: no implicit logic; user controls the knob directly. -- Explicit: reported clearly in output logs. - -**Arguments for auto-sizing:** -- The "correct" value differs by an order of magnitude between single-node and multi-node deployments of the same YAML. -- The default of 1 is severely under-threaded for any network storage workload. -- Users who do not know to raise `read_threads` will see misleadingly low throughput that is not representative of storage capability. - -**Recommendation:** Support `read_threads: auto` as a special sentinel value. When set to `auto`, compute at runtime: - -```python -import os -ppn = DLIOMPI.get_instance().npernode() -total_cores = os.cpu_count() or 8 -# Reserve 1 core per MPI rank for compute; divide remainder among I/O threads -io_threads = max(1, min(64, (total_cores - ppn) // ppn)) -self.read_threads = io_threads -``` - -Log the resolved value at the start of the run so it appears in benchmark results. Keep the integer form working unchanged for reproducible benchmark runs. - ---- - -## 12. MPI Multi-Host Topology — Available Infrastructure, Missing Integration - -### 12a. What DLIOMPI Already Tracks - -`DLIOMPI.initialize()` uses `MPI.COMM_TYPE_SHARED` to discover per-node topology at startup: - -```python -split_comm = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED) -local_ppn = split_comm.size # ranks sharing this node -self.mpi_local_rank = split_comm.rank -# Gather ppn across all nodes via leader communicator -self.mpi_ppn_list = COMM_WORLD.bcast(ppn_list, root=0) -self.mpi_nodes = len(self.mpi_ppn_list) -self.mpi_node = -``` - -The public API is: - -| Method | Returns | -|---|---| -| `rank()` | Global MPI rank (0…comm_size-1) | -| `size()` | Total MPI world size | -| `local_rank()` | Rank within this node (0…ppn-1) | -| `npernode()` | Ranks on this node (can vary per node) | -| `nnodes()` | Total node count | -| `node()` | Node index for this rank | - -This is a complete node-topology picture. It is used in `statscounter.py` (for the benchmark summary) and in `base_checkpointing.py` (line 424: cross-node checkpoint read offset). It is **not used** in `data_generator.py` or `config.py`. - -### 12b. Scaling Formulas as NP and HOST Vary - -The training sample distribution is: - -``` -samples_per_proc = ceil(total_samples / comm_size) -training_steps = ceil(total_samples / batch_size / comm_size) -``` - -where `comm_size = NP * HOST` (total ranks). These scale correctly with the product, but they contain no node-level term. The formulas do not distinguish between: - -- 64 ranks on 1 node (NP=64, HOST=1): all ranks share the same NFS mount, causing ~64× connection multiplexing -- 64 ranks on 64 nodes (NP=1, HOST=64): each node has a dedicated NFS mount, maximally parallelising metadata operations - -**For JPEG/PNG reading**, effective storage throughput scales as: - -``` -IOPS_total = ranks_total × read_threads × (1 / per_open_latency) -``` - -where `per_open_latency` includes NFS RTT, kernel VFS overhead, and JPEG decode time. This throughput grows with both axes (ranks and threads), but the per-node NFS mount bandwidth caps growth when all ranks share one mount. The benchmark currently cannot express or control which axis scales which way. - -**Concrete scale-up table (JPEG, 115 KB/file, NFS RTT=1ms, BW=10 GiB/s/node):** - -| NP | HOST | comm_size | read_threads | IOPS_total | Throughput | -|---|---|---|---|---|---| -| 1 | 1 | 1 | 1 | 990 | 114 MiB/s | -| 4 | 1 | 4 | 8 | 15,840 | 1.8 GiB/s | -| 8 | 1 | 8 | 8 | 31,680 | 3.6 GiB/s → NFS BW cap (10 GiB/s single mount) | -| 4 | 8 | 32 | 8 | 126,720 | 14.6 GiB/s → 8 × NFS BW cap | -| 8 | 32 | 256 | 8 | 1,013,760 | 116 GiB/s | - -The key insight: **scale-out across hosts is much more effective than adding ranks per node**, because each new host brings a fresh NFS connection budget and independent bandwidth. DLIO's fixed `read_threads` value in YAML does not guide the user toward this topology insight. - -### 12c. File Distribution and Node Locality - -Data generation currently assigns files via: - -```python -for i in range(my_rank, total_files, comm_size): - write_fn(file_list[i], ...) -``` - -This is a round-robin stride across the global rank space. With `num_subfolders_train > 1`, the file-to-subfolder assignment is: - -```python -subfolder = i % num_subfolders_train -``` - -Both mappings are rank-indexed, not node-indexed. If `num_subfolders_train = num_nodes`, the intent might be to give each node its own subfolder for locality, but the actual assignment distributes files from all nodes into all subfolders (because `i % comm_size` spans all ranks, not just the ranks on one node). Ranks on node 0 produce files in all subfolders, as do ranks on node 1, etc. - -For read locality on distributed file systems with per-directory locking (some NFS and Lustre configurations), concentrating each node's reads into its "own" subfolder can reduce contention. The current round-robin prevents this. A node-local assignment would be: - -```python -node_idx = DLIOMPI.get_instance().node() -subfolder = node_idx % num_subfolders_train -``` - -This is not currently implemented. - -### 12d. What Is Missing - -| Gap | Current state | Impact | -|---|---|---| -| `read_threads` not scaled by `npernode()` | Hardcoded YAML integer | Over-commits per-node CPU when ranks/node is high; under-commits on single-rank nodes | -| No intra-rank generation parallelism | Serial `_generate_files()` loop | JPEG/PNG generation CPU-bottlenecked; idle cores cannot be exploited | -| Node-local file affinity not implemented | Round-robin across all ranks | No NFS namespace locality; all nodes contend on all subfolders | -| Benchmark output does not report `npernode()` | `num_hosts` reported, `ppn` not | Cannot reconstruct per-node concurrency from published benchmark results | -| `read_threads` is set before MPI init | Load-time YAML evaluation | Auto-sizing using `npernode()` requires a post-MPI-init resolve step | - -### 12e. Recommendations - -1. **Log MPI topology in benchmark header**: At rank 0, emit `nnodes()`, `npernode()`, and `read_threads` so that any published result has sufficient information to reproduce the I/O concurrency. - -2. **Auto-size `read_threads` post-MPI-init**: If `read_threads: auto` (or `read_threads: 0` as a sentinel), resolve to `max(1, min(64, (os.cpu_count() - npernode()) // npernode()))` after `DLIOMPI.initialize()`. This requires moving the resolution step out of YAML parse and into `derive_configurations()`, which already runs inside the main process after MPI init. - -3. **Add intra-rank concurrency for JPEG/PNG generation**: Wrap the `_generate_files()` loop in a `concurrent.futures.ThreadPoolExecutor`. PIL's JPEG encoder releases the GIL during its C extension work; threads genuinely parallelise the CPU encode. A pool of `min(read_threads, 8)` workers per rank would reduce ImageNet-scale generation from hours to minutes without requiring any MPI changes. - -4. **Node-indexed subfolder assignment**: When `num_subfolders_train == nnodes()`, assign `subfolder = node()` per rank so that all reads for a given training step from one node hit one subfolder. This concentrates hot NFS metadata into per-node directories, reducing cross-node directory contention. - -5. **Document the NP vs HOST scaling trade-off**: Add a section to the benchmark README explaining that for JPEG/PNG workloads, scaling HOST outperforms scaling NP for the same `comm_size`, because each new host brings independent NFS bandwidth. Provide a concrete example using the IOPS formula above. - ---- - -## 13. File vs. Object Workload Asymmetry — Closing the Performance Gap - -### 13a. The Problem: Two Classes of Benchmark with Different Overhead Profiles - -The S3 iterable readers introduced for object storage were built with a correct understanding of DLIO's design principle: the benchmark measures storage throughput, not data transformation throughput. As a result, every S3 iterable reader — `ImageReaderS3Iterable`, `NPYReaderS3Iterable`, `HDF5ReaderS3Iterable`, `TFRecordReaderS3Iterable` — does the following: - -1. Fetch raw bytes from the storage system (the I/O operation being measured). -2. Record the byte count for telemetry (`image_size` metric). -3. Return `self._args.resized_image` (the pre-allocated random tensor). -4. Never decode, decompress, or numpy-convert the fetched bytes. - -The local-filesystem readers — `ImageReader`, `NPYReader`, `HDF5Reader` — do NOT apply this principle. `ImageReader` calls `PIL.Image.open()` and `np.asarray()` on every sample. `NPYReader` calls `np.load()`. `HDF5Reader` performs a full HDF5 chunk read and numpy conversion. All of this CPU work happens inside the training-step timing window, and all of it produces output that is immediately discarded. - -**The result is that the same workload, with the same files, produces fundamentally different benchmark numbers depending solely on whether the storage backend is local FS or object storage.** An object-storage run with `ImageReaderS3Iterable` and a local-FS run with `ImageReader` are not measuring the same thing — even if the physical data is identical. - -### 13b. Quantified Impact of the Asymmetry - -For a JPEG workload at 224×224×3 image size, the per-sample overhead difference: - -| Reader | Storage I/O time | CPU decode time | Total per sample | CPU fraction of total | -|---|---|---|---|---| -| `ImageReaderS3Iterable` (object) | ~1–5 ms net fetch | 0 ms | ~1–5 ms | 0% | -| `ImageReader` (local FS) | ~0.01 ms read | 5–20 ms PIL decode | ~5–21 ms | 71–99% | - -A benchmark using `ImageReader` on a fast NVMe filesystem can show **5–20× lower per-sample throughput than a benchmark using `ImageReaderS3Iterable` on the same data served from an object store** — not because the object store is faster, but because the local-FS reader does far more CPU work. Published benchmark comparisons between the two backend types are therefore not valid without correcting for this asymmetry. - -The same asymmetry exists at generation time: object store YAML configs typically target fewer total files or use NPY/HDF5 format (avoiding JPEG), while local FS YAML configs often use JPEG with no awareness of the PIL encode cost. This is an accident of how the configs evolved, not a deliberate design choice. - -### 13c. Why the Asymmetry Exists - -The object-store readers were written later, after the design principle (Section 9a) was understood. The local-filesystem readers predate that understanding and have not been updated. The S3 iterable reader docstrings explicitly document why decoding is wrong: - -> *"Calling `PIL.Image.open(BytesIO(raw))` on JPEG/PNG data is pure CPU overhead. DLIO's `FormatReader.next()` yields a pre-allocated random tensor regardless of file contents; only the byte count is needed for the image_size telemetry metric."* - -The same rationale applies to `ImageReader`, but that file contains no equivalent comment and no equivalent implementation. The optimization was applied to the new path and never back-ported to the original one. - -For data generators, the object-store configs incidentally avoid the worst-case formats (JPEG/PNG with PIL encode) because they were configured for network-storage scale testing where generation cost is more visible. The local-FS configs retain JPEG/PNG as the default for historical reasons. - -### 13d. The Rationalization Proposal - -The fix is to bring local-filesystem readers up to the standard already established by the S3 iterable readers. This is a code change only — no format changes, no YAML changes, no protocol changes. The storage I/O (the measured operation) is unchanged in every case. - -**Reader rationalization targets (by priority):** - -| Reader | Current behaviour | Rationalized behaviour | Change required | -|---|---|---|---| -| `ImageReader` (local FS JPEG/PNG) | PIL decode + numpy convert | Raw byte read, byte count for telemetry | Replace `PIL.Image.open()` with `open(rb).read()` | -| `NPYReader` (local FS NPY/NPZ) | `np.load()` — allocates full array | Raw byte read, byte count for telemetry | Replace `np.load()` with `open(rb).read()` | -| `HDF5Reader` (local FS HDF5) | `h5py.File()` + dataset slice | `os.stat()` for byte count (HDF5 does not expose raw bytes cleanly) | Use file size from stat, skip h5py decode | -| `TFReader` (TFRecord) | Already returns `resized_image`, no decode | No change needed | ✅ Already correct | -| S3 iterable readers | Already raw byte read | No change needed | ✅ Already correct | - -For `HDF5Reader`, full raw-byte skipping is complicated because HDF5 files contain many datasets and the per-sample byte cost is embedded inside the HDF5 container format. The pragmatic fix is to record the total file size (via `os.stat()`, which is already a real syscall) and use `ceil(file_size / num_samples_per_file)` as the per-sample byte count. This avoids `h5py` decoding while still exercising real storage I/O. - -**Generator rationalization targets:** - -The same data-loader-aware branch described in Section 9g applies to generation. For non-DALI data loaders, JPEG and PNG generators must write raw dgen-py bytes rather than running PIL encode. This produces files that the rationalized `ImageReader` reads correctly (raw bytes, byte count for telemetry). For the DALI path, PIL encode remains necessary and the DALI reader is already correct. - -### 13e. Validation: How to Confirm the Fix Works - -After rationalizing the local-FS readers, a correctly implemented benchmark should satisfy: - -1. **A file-backend and object-backend run of the same workload with the same dataset produce statistically equivalent samples/sec and MiB/s numbers**, adjusted for storage latency and bandwidth differences between the two systems. CPU overhead should not be a confounding variable. - -2. **The fraction of training-step time attributed to I/O wait (as reported in `dlp` traces) should be the dominant fraction (> 80%)** for both backends, for all formats, on any storage system faster than the benchmark's prefetch queue can drain. - -3. **Generator throughput for JPEG/PNG on non-DALI configurations should match NPY generator throughput** (within 2×), because both should be bottlenecked on storage write bandwidth, not CPU encoding. - -If any of these properties does not hold after rationalizing the readers, it indicates a remaining source of CPU overhead that has not been identified or removed. - -### 13f. Configuration-Level Rationalization - -Beyond code changes, the YAML configs should be audited to eliminate format choices that reflect historical defaults rather than deliberate workload simulation decisions: - -1. **Local-FS configs that use JPEG/PNG for non-imaging workloads** (e.g., testing batch read throughput of random data) should be migrated to NPY or HDF5 with compression disabled. This eliminates generation overhead that is independent of the format rationalization. - -2. **Object-store configs that use NPY/HDF5 while local-FS configs use JPEG/PNG for the "same" workload** create an implicit apples-to-oranges comparison. If a workload is defined as JPEG-format vision training, both its local-FS and object-store variants should use identical format settings. The storage backend is the variable; the format should be held constant. - -3. **The `multiprocessing_context` coupling** (Section 6c) means that a rationalized file-backend config and its object-store counterpart must differ in at least one reader setting (`fork` vs `spawn`). This is unavoidable given the Tokio runtime constraint, but should be the ONLY difference between the two, and should be auto-derived from `storage_library` rather than manually set. - -### 13g. Summary of the Rationalization Requirement - -The core requirement is simple: **every reader, for every format, for every storage backend, must behave consistently.** The S3 iterable readers already implement the correct behaviour. The local-filesystem readers must be updated to match. Until that update is made, no published DLIO benchmark result comparing local-filesystem and object-storage throughput can be considered internally consistent, because the benchmarks are not measuring the same thing on both backends. diff --git a/docs/DLIO_Issues_Resolution_26-04-18.md b/docs/DLIO_Issues_Resolution_26-04-18.md new file mode 100644 index 00000000..0c31b516 --- /dev/null +++ b/docs/DLIO_Issues_Resolution_26-04-18.md @@ -0,0 +1,41 @@ +# DLIO Benchmark — Issue Resolution Summary + +**Date:** April 18, 2026 +**Replaces:** `DLIO_IO_Issues-Proposal_2026-03-28.md`, `DLIO_IO_Issues-Executive_Summary_2026-03-28.md`, `DLIO_PR_Plan_26-04-13.md`, `DLIO_PR_Status-26-04-12.md` + +--- + +## All Non-DALI Issues — Fully Resolved + +| Issue | Description | Resolution | +|-------|-------------|------------| +| 1 / 6 | File vs object reader asymmetry; local readers incurring full CPU decode (PIL, NumPy, h5py) while S3 readers did not | `reader/_local_fs_iterable_mixin.py` added: parallel prefetch via `ThreadPoolExecutor`, byte count only, no decode. Affects `ImageReader`, `NPYReader`, `HDF5Reader`, `NPZReader`. | +| 2 | JPEG/PNG generator 300–1000× slower than necessary due to PIL encoding | `jpeg_generator.py` and `png_generator.py` now detect non-DALI loader and write raw bytes, skipping PIL encode. DALI path still produces valid encoded bitstreams. Confirmed: JPEG 3×, PNG 27× speedup. | +| 3 | TFRecord iterative sampler file-index bug: non-zero ranks read wrong files | `config.py` line 719: rank offset is now carried forward through all iterations via `my_rank * files_per_rank + sample_index // num_samples_per_file`. | +| 4 | `read_threads` hardcoded at 1, under-utilizing storage bandwidth | Auto-sized to `min(cpu_count // ranks_per_node, 8)` when user leaves default. Explicit YAML values respected. | +| 5 | Deduplication — files must be byte-unique | Already correct; `data_generator.py` uses `rng.integers(0, 2**63)` per file with `BASE_SEED + my_rank`. No code change needed. | +| 7 | 49 YAML config files with hardcoded lab IPs | External: mlp-storage now supplies endpoint/bucket/library via env vars and CLI overrides. Remaining S3 configs use `localhost` placeholder. | +| 8 | `multiprocessing_context` must be `spawn` for object-storage libraries | `config.py` auto-derives `spawn` when `storage_library` is `s3dlio` or `s3torchconnector`. Dataclass default changed from `fork` to `spawn`. | +| 9 | `storage_library` not wired to standard env vars; poor standalone usability | `config.py` `_apply_env_overrides()` now reads `DLIO_STORAGE_LIBRARY`, `DLIO_BUCKET`, `DLIO_STORAGE_TYPE`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_ENDPOINT_URL`, `AWS_REGION`, and an optional `.env` file. | +| 10 / 11 | Data generation serial per rank; object-store uploads blocking | `data_generator.py`: seeds pre-derived in main thread (preserving determinism), then writes dispatched to `ThreadPoolExecutor`. New `write_threads` config field, auto-sized via `ranks_per_node`. | +| 12 | `comm_size` used as thread denominator — wrong on multi-node runs | `DLIOMPI.ranks_per_node()` added (`MPI_Allgather` of hostnames). `read_threads` and `write_threads` auto-sizing now uses `cpu_count // ranks_per_node`. | +| 13 | No settle-time guard after generation on eventual-consistency stores | `post_generation_settle_seconds: float = 0.0` field added. Non-zero + non-LOCAL_FS: rank 0 sleeps, then broadcasts barrier. Default 0.0 — no behavior change for existing configs. | +| 6b | `validate()` called before file lists available | Investigated: not a real bug. `derive_configurations()` does not call `validate()`; the only `validate()` call is in `main.py` after the file-list walk. Resolved with a clarifying comment in `data_generator.py`. | + +--- + +## Outstanding Issues — DALI Only (Deferred: No GPU Available) + +These were identified but not implemented because they require GPU hardware for validation: + +| Issue | Description | State | +|-------|-------------|-------| +| DALI-1 | `shard_id` never passed in `dali_image_reader.py`, `dali_npy_reader.py`, `dali_tfrecord_reader.py` — all multi-rank DALI runs read shard 0 only | Branch `fix/dali-correctness` exists locally. Critical correctness bug — **do not use DALI with `comm_size > 1`** until fixed. | +| DALI-2 | `fn.python_function` callbacks re-introduce the GIL into DALI pipeline; full C++ JPEG decode is done and then discarded | Branch `feat/dali-modernization` exists locally. | +| DALI-3 | DALI 2.0 dynamic executor not adopted; `exec_dynamic=False` still in use | Deferred with DALI-2. | + +--- + +## Minor Remaining Note + +`storage_library` is still accessed via `(self.storage_options or {}).get("storage_library")` rather than as a first-class `ConfigArguments` dataclass field. Functionally correct — the env-var path from Issue 9 populates `storage_options['storage_library']` properly. The dataclass promotion (adding `storage_library: Optional[str] = None` directly) was not done; it is cosmetic and low risk. diff --git a/docs/PageCache_Drop_Between_Epochs_26-04-18.md b/docs/PageCache_Drop_Between_Epochs_26-04-18.md new file mode 100644 index 00000000..92998f73 --- /dev/null +++ b/docs/PageCache_Drop_Between_Epochs_26-04-18.md @@ -0,0 +1,168 @@ +# Page Cache Dropping Between Epochs — Why and How + +**Date:** April 18, 2026 +**Status:** Proposal — documentation only, no code changes made + +--- + +## Why This Matters + +dlio_benchmark is a **storage** benchmark. Its measured throughput should reflect the performance of the storage system (NVMe, NFS, S3, parallel filesystem), not the client machine's DRAM. + +On a local POSIX filesystem, the Linux kernel page cache automatically retains file data in DRAM after the first read. Subsequent reads of the same files are served from memory at 40–80 GB/s rather than from storage hardware. In a multi-epoch benchmark this means: + +- **Epoch 1**: reads come from storage — accurate +- **Epoch 2+**: reads come from the page cache — measures DRAM, not storage + +This is true for every format (NPZ, HDF5, Parquet, Arrow IPC), but the severity depends on the format. Parquet decode is CPU-intensive, which limits the effective read rate and naturally causes page eviction before the next epoch starts. Arrow IPC reads are trivial (memcpy only), so the OS can fully populate the cache and serve all subsequent epochs from DRAM. **Arrow IPC is more accurate for storage benchmarking in principle, but more vulnerable to page cache pollution in practice if the dataset fits in RAM.** + +dlio_benchmark already detects this risk and logs a warning: +``` +WARNING: The amount of dataset is smaller than the host memory; data might be +cached after the first epoch. Increase the size of dataset to eliminate the caching effect! +``` + +This warning is correct but passive. Dropping the page cache between epochs is a stronger, active solution that works regardless of dataset size. + +--- + +## What Dropping the Page Cache Does + +On Linux, writing `3` to `/proc/sys/vm/drop_caches` instructs the kernel to evict all clean pages (page cache, dentries, and inodes) from memory. This is a supported, non-destructive kernel interface — it only releases clean pages that can be re-read from storage; dirty pages (unsaved writes) are never evicted. The operation is instantaneous for moderate dataset sizes. + +```bash +sync && echo 3 | sudo tee /proc/sys/vm/drop_caches > /dev/null +``` + +The `sync` flushes any pending writes before the drop, ensuring no dirty data is discarded. `sudo` is required because the operation is privileged. + +After the drop, every read in the next epoch must come from storage hardware, making all epochs equally accurate. + +--- + +## Why dlio_benchmark Should Do This Automatically + +Currently there is no mechanism in dlio_benchmark to drop the page cache between epochs. The benchmark operator must do it manually before each run, which: + +1. Is error-prone (easy to forget between runs) +2. Does not help for multi-epoch runs within a single invocation +3. Requires a separate out-of-band script or `--pre-run` hook + +Building it into the benchmark ensures consistent, reproducible results across all epochs, not just epoch 1. + +--- + +## Required Code Changes + +Three files require modification. No new dependencies are introduced — only the Linux `/proc/sys/vm/drop_caches` interface is used, which is always available on Linux kernels 2.6.16+. + +### 1. `dlio_benchmark/utils/config.py` — new config field + +Add one boolean field to the `ConfigArguments` dataclass, near `odirect`: + +```python +# In ConfigArguments dataclass, near odirect: bool = False +drop_page_cache: bool = False +``` + +And parse it from YAML in the `ConfigArguments.initialize_args()` / hydra config section (near where `odirect` is parsed): + +```python +# In the reader config parsing block +if 'drop_page_cache' in reader: + args.drop_page_cache = reader['drop_page_cache'] +``` + +--- + +### 2. `dlio_benchmark/main.py` — cache drop helper function and epoch hook + +Add a module-level helper function (alongside the existing `_apply_settle_guard`): + +```python +def _drop_page_cache(args, comm) -> None: + """Drop the Linux page cache between epochs for accurate storage benchmarking. + + Only activates when ALL of the following are true: + - ``args.drop_page_cache`` is True + - ``args.storage_type`` is LOCAL_FS or NFS (object storage is always cache-bypass) + - The process is running on Linux + - Rank 0 has write permission to /proc/sys/vm/drop_caches + + Rank 0 performs the drop; all ranks then barrier so they proceed together. + If the write fails (not root, or not Linux), a warning is logged and + the benchmark continues normally — the setting is advisory, not fatal. + """ + if not args.drop_page_cache: + return + if args.storage_type not in (_StorageType.LOCAL_FS,): + return # object storage is inherently cache-bypass + if args.my_rank == 0: + try: + # sync first to flush dirty pages (safety measure) + import subprocess + subprocess.run(['sync'], check=True) + with open('/proc/sys/vm/drop_caches', 'w') as f: + f.write('3\n') + args.logger.info(f"{utcnow()} Page cache dropped before epoch") + except (OSError, PermissionError) as exc: + args.logger.warning( + f"{utcnow()} drop_page_cache=True but could not write to " + f"/proc/sys/vm/drop_caches: {exc}. " + f"Run as root or grant CAP_SYS_ADMIN, or use 'odirect: true' instead." + ) + comm.barrier() +``` + +Call this function in `run()` inside the epoch loop, **after** `finalize()` (which closes open file handles) and **before** `reconfigure()` / the next epoch's reads: + +```python +# In run(), inside the epoch loop — current code: +self.framework.get_loader(DatasetType.TRAIN).finalize() +if self.do_eval and epoch >= next_eval_epoch: + ... + self.framework.get_loader(DatasetType.VALID).finalize() +self.args.reconfigure(epoch + 1) +self.stats.end_epoch(epoch) + +# Insert the cache drop here, after finalize() and before reconfigure(): +self.framework.get_loader(DatasetType.TRAIN).finalize() +if self.do_eval and epoch >= next_eval_epoch: + ... + self.framework.get_loader(DatasetType.VALID).finalize() +_drop_page_cache(self.args, self.comm) # ← new line +self.args.reconfigure(epoch + 1) +self.stats.end_epoch(epoch) +``` + +The placement is important: +- **After** `finalize()` — ensures all file handles are closed so no pages are pinned +- **Before** `reconfigure()` — ensures the cache is clean before the framework reinitialises its file list for the next epoch +- **Outside** the timing window — `stats.end_epoch()` / `stats.start_epoch()` bracket the actual I/O; the cache drop occurs between them and does not inflate measured throughput + +--- + +### 3. YAML configuration + +```yaml +reader: + odirect: false # O_DIRECT (bypasses cache entirely — preferred) + drop_page_cache: true # Drop page cache between epochs (requires root / CAP_SYS_ADMIN) +``` + +The two settings are complementary, not mutually exclusive: +- `odirect: true` — best accuracy, never populates cache, works without root if the device supports it +- `drop_page_cache: true` — clears cache accumulated during the epoch, requires root but works with any format and any reader +- Using both together is safe and provides belt-and-suspenders cache avoidance + +--- + +## MPI / Distributed Considerations + +In a multi-rank run, only **rank 0** should write to `/proc/sys/vm/drop_caches`. The drop is a local kernel operation — it only affects the machine it runs on. In a distributed job where multiple hosts each run ranks, only one rank per node should issue the drop. The implementation above uses `my_rank == 0` (global rank), which covers the single-host case. For multi-host MPI jobs, this should be extended to `local_rank == 0` (first rank on each node) using `DLIOMPI.get_instance().local_rank()` or equivalent. + +--- + +## Relationship to the Existing `potential_caching` Warning + +`StatsCounter` already computes `potential_caching` and logs a warning when `data_size_per_host_GB <= host_memory_GB`. The `drop_page_cache` feature complements this: rather than relying on the operator to manually increase dataset size, the benchmark can actively ensure cache-clean reads on every epoch. The warning should remain — it informs the operator about the risk — but `drop_page_cache: true` can be offered as the in-benchmark remedy.