Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,7 @@ dmypy.json
#Apple system files
.DS_Store
/.idea/
*venv*
*venv*
# Benchmark simulation output files
sim_*.tsv
sim_*.tsv.zst
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
- **Parquet format** (new) — full schema-driven generation and reading. Supports mixed-dtype column schemas (`int32`, `float32`, `float16`, `uint8`, `bool`, etc.) with configurable embedding sizes, LZ4/ZSTD compression, and per-column filtering on reads. Legacy single-column mode is preserved for backward compatibility.

### Storage Backends
- **S3 / S3-compatible object storage** (new) — three client libraries supported: [s3dlio](https://github.com/russfellows/s3dlio) (recommended, Rust-backed, multi-endpoint), [s3torchconnector](https://github.com/awslabs/s3-connector-for-pytorch) (PyTorch only), and the [MinIO Python SDK](https://min.io/docs/minio/linux/developers/python/API.html).
- **S3 / S3-compatible object storage** (new) — three client libraries supported: [s3dlio](https://pypi.org/project/s3dlio/) (recommended, Rust-backed, multi-endpoint), [s3torchconnector](https://github.com/awslabs/s3-connector-for-pytorch) (PyTorch only), and the [MinIO Python SDK](https://min.io/docs/minio/linux/developers/python/API.html).
- **Multi-endpoint load balancing** — `S3_ENDPOINT_URIS` distributes datagen write load across multiple S3 servers, one endpoint per MPI rank (round-robin). Eliminates single-node bottlenecks for large-scale data generation.
- **Storage env-var overrides** — `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_ENDPOINT_URL`, `AWS_REGION`, `DLIO_BUCKET`, `DLIO_STORAGE_TYPE`, and `DLIO_STORAGE_LIBRARY` are all read automatically; no YAML changes needed for credential injection.
- **Post-generation settle guard** — configurable `post_generation_settle_seconds` for eventual-consistency object stores that need time to propagate written objects before training reads begin.
Expand All @@ -49,7 +49,7 @@ DLIO is an I/O benchmark for Deep Learning. DLIO is aimed at emulating the I/O b

DLIO supports multiple storage backends out of the box:
- **Local filesystem** — the default, for NFS, Lustre, GPFS, and local NVMe
- **AWS S3 / S3-compatible object storage** — via [s3dlio](https://github.com/russfellows/s3dlio), [s3torchconnector](https://github.com/awslabs/s3-connector-for-pytorch), or the [MinIO Python SDK](https://min.io/docs/minio/linux/developers/python/API.html)
- **AWS S3 / S3-compatible object storage** — via [s3dlio](https://pypi.org/project/s3dlio/), [s3torchconnector](https://github.com/awslabs/s3-connector-for-pytorch), or the [MinIO Python SDK](https://min.io/docs/minio/linux/developers/python/API.html)
- **AIStore** — via the native AIStore Python SDK

Object storage backends are configured through the `storage:` block in the workload YAML file (see [Object Storage Configuration](#object-storage-configuration) below).
Expand Down Expand Up @@ -224,7 +224,7 @@ Object storage is enabled by adding a `storage:` block to the workload YAML. Th

| `storage_library` | Description | Framework support |
|---|---|---|
| `s3dlio` | High-performance Rust-backed client via [s3dlio](https://github.com/russfellows/s3dlio). Parallel GET, range optimization, multi-endpoint load balancing. | PyTorch + TensorFlow |
| `s3dlio` | High-performance Rust-backed client via [s3dlio](https://pypi.org/project/s3dlio/). Parallel GET, range optimization, multi-endpoint load balancing. | PyTorch + TensorFlow |
| `s3torchconnector` | AWS S3 Connector for PyTorch — streaming single-file GET. | PyTorch only |
| `minio` | MinIO Python SDK via `ThreadPoolExecutor`. | PyTorch + TensorFlow |

Expand Down Expand Up @@ -376,7 +376,7 @@ The YAML file is loaded through hydra (https://hydra.cc/). The default setting a

* File format support: we only support tfrecord, hdf5, npz, csv, jpg, jpeg formats. Other data formats can be extended.

* Storage backend support: we support local filesystem (`local_fs`), AWS S3 and S3-compatible object stores (`s3`), and AIStore (`aistore`). For S3 storage, three client libraries are available: [s3dlio](https://github.com/russfellows/s3dlio) (recommended), [s3torchconnector](https://github.com/awslabs/s3-connector-for-pytorch) (PyTorch only), and the [MinIO SDK](https://min.io/docs/minio/linux/developers/python/API.html). Other storage backends can be extended.
* Storage backend support: we support local filesystem (`local_fs`), AWS S3 and S3-compatible object stores (`s3`), and AIStore (`aistore`). For S3 storage, three client libraries are available: [s3dlio](https://pypi.org/project/s3dlio/) (recommended), [s3torchconnector](https://github.com/awslabs/s3-connector-for-pytorch) (PyTorch only), and the [MinIO SDK](https://min.io/docs/minio/linux/developers/python/API.html). Other storage backends can be extended.

* Data Loader support: we support reading datasets using TensorFlow tf.data data loader, PyTorch DataLoader, and a set of custom data readers implemented in ./reader. For TensorFlow tf.data data loader, PyTorch DataLoader
- We have complete support for tfrecord format in TensorFlow data loader.
Expand Down
74 changes: 74 additions & 0 deletions dlio_benchmark/configs/workload/dlrm_s3dlio_file.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# dlrm_s3dlio_file.yaml
#
# DLRMv2 parquet workload on local/network filesystem.
#
# Structurally identical to dlrm_s3dlio_s3.yaml for direct comparison.
#
# Two read modes — pass at runtime:
#
# direct (recommended, parity with S3):
# Uses s3dlio with direct:// URIs → O_DIRECT reads, bypasses page cache.
# Same Rust runtime, same GIL-releasing threads as the S3 path.
# ++workload.storage.storage_options.storage_library=direct
#
# posix (buffered fallback):
# Uses open()/seek()/read() via Python thread pool.
# Goes through OS page cache — useful to see caching effect.
# (storage_library unset, or ++workload.storage.storage_options.storage_library=posix)
#
# Data layout: /mnt/test/dlrm/train/img_NN_of_64.parquet
# 64 files, 1M rows each, 123 RGs @ 8192 rows/RG, ~8 MiB/RG compressed
#
# Usage:
# python -m dlio_benchmark.main workload=dlrm_s3dlio_file \
# ++workload.storage.storage_options.storage_library=direct
#
# Dry-run:
# python -m dlio_benchmark.main workload=dlrm_s3dlio_file \
# ++workload.storage.storage_options.storage_library=direct \
# ++workload.storage.storage_options.simulate_io=true

model:
name: dlrm_s3dlio_file

framework: pytorch

workflow:
generate_data: false
train: true
evaluation: false

storage:
storage_type: local_fs
storage_root: /mnt/test
storage_options:
# storage_library: set via ++workload.storage.storage_options.storage_library=<lib>
# Supported: direct (O_DIRECT via s3dlio, recommended), posix (buffered fallback)
prefetch_workers: 64 # threads per worker process (matches S3 config for parity)

dataset:
format: parquet
data_folder: /mnt/test/dlrm # DLIO scans /mnt/test/dlrm/train/*.parquet
num_files_train: 64
num_samples_per_file: 1000000 # 1M rows, 123 RGs @ 8192 rows/RG
record_length_bytes: 1024
record_length_bytes_stdev: 0

reader:
data_loader: pytorch
read_threads: 8 # 8 DataLoader worker processes
prefetch_size: 0
read_type: on_demand
batch_size: 2048

train:
computation_time: 0.000770031
epochs: 1
total_training_steps: -1
seed_change_epoch: false

profiling:
profiler: none

logging:
verbosity: INFO
69 changes: 69 additions & 0 deletions dlio_benchmark/configs/workload/dlrm_s3dlio_s3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# dlrm_s3dlio_s3.yaml
#
# DLRMv2 parquet workload on S3 via s3-ultra (port 9200).
#
# storage_library is NOT set here — pass at runtime:
# ++workload.storage.storage_options.storage_library=s3torchconnector
# ++workload.storage.storage_options.storage_library=s3dlio
# ++workload.storage.storage_options.storage_library=minio
#
# Data layout (64 files, 1M rows each, 123 RGs @ 8192 rows/RG):
# s3://mlp-flux/data/dlrm/train/train/img_NN_of_64.parquet
#
# Usage:
# export AWS_ACCESS_KEY_ID=minioadmin
# export AWS_SECRET_ACCESS_KEY=minioadmin
# export AWS_ENDPOINT_URL=http://127.0.0.1:9200
# python -m dlio_benchmark.main workload=dlrm_s3dlio_s3 \
# ++workload.storage.storage_options.storage_library=s3torchconnector

model:
name: dlrm_s3dlio_s3

framework: pytorch

workflow:
generate_data: false
train: true

storage:
storage_type: s3
storage_root: mlp-flux
storage_options:
# storage_library: set via ++workload.storage.storage_options.storage_library=<lib>
prefetch_workers: 64 # threads per worker process; must match prefetch_window for full pipelining
endpoint_url: http://127.0.0.1:9200

dataset:
format: parquet
data_folder: data/dlrm/train # DLIO auto-appends /train → data/dlrm/train/train/
num_files_train: 64
num_samples_per_file: 1000000 # 1M rows, 123 RGs @ 8192 rows/RG
record_length_bytes: 1024
record_length_bytes_stdev: 0

reader:
data_loader: pytorch
read_threads: 8 # 8 DataLoader worker processes × 8 prefetch_workers = 64 concurrent GETs
prefetch_size: 0 # disable DLIO-level prefetch (reader handles its own window)
read_type: on_demand
batch_size: 2048 # DLRMv2: 2048 real samples per GPU step

train:
# DLRMv2 / B200 trace parameters:
# batch_size=2048 → ~31,250 steps/epoch (64M samples / 2048)
# computation_time → 0.000770031 s/batch measured on B200
computation_time: 0.000770031
epochs: 1
total_training_steps: -1
seed_change_epoch: false

evaluation:
eval_time: 0.0
epochs_between_evals: 0

profiling:
profiler: none

logging:
verbosity: INFO
31 changes: 22 additions & 9 deletions dlio_benchmark/data_generator/data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ def _generate_files(self, write_fn, label: str = "Data") -> None:

storage.put_data(out_path_spec, output.getvalue())

**Zero-copy fast path**: if ``write_fn`` returns a non-None value it is
treated as the ready-to-upload payload and is passed directly to
``put_data``, bypassing the ``BytesIO`` step entirely. This is used by
``NPZGenerator`` when s3dlio's ``generate_npz_bytes()`` is available:
the returned ``BytesView`` supports the buffer protocol so s3dlio can
upload it with a single zero-copy pass.

**Parallel semantics** (Issue 10):

Seeds are pre-derived sequentially in the main thread so that
Expand Down Expand Up @@ -165,12 +172,14 @@ def _write_one(job):
progress(i + 1, self.total_files_to_generate, f"Generating {label}")
output = out_path_spec if is_local else io.BytesIO()
worker_rng = np.random.default_rng(seed=file_seed)
write_fn(i, dim_, dim1, dim2, file_seed, worker_rng,
out_path_spec, is_local, output)
payload = write_fn(i, dim_, dim1, dim2, file_seed, worker_rng,
out_path_spec, is_local, output)
if not is_local:
# Pass BytesIO directly so put_data can use getbuffer() (zero-copy
# memoryview) instead of getvalue() which makes a full copy.
self.storage.put_data(out_path_spec, output)
# If write_fn returned a payload (e.g. s3dlio BytesView from
# generate_npz_bytes), use it directly — zero-copy, no BytesIO
# intermediary. Otherwise fall back to the BytesIO content.
self.storage.put_data(out_path_spec,
payload if payload is not None else output)

write_threads = getattr(self._args, 'write_threads', 1)
n_workers = max(1, min(write_threads, len(jobs))) if jobs else 1
Expand Down Expand Up @@ -219,15 +228,19 @@ def _upload(path, buf, sem):
f"Generating {label}")
output = io.BytesIO()
worker_rng = np.random.default_rng(seed=file_seed)
# Generate in main thread (fast; Rust dgen or numpy)
write_fn(i, dim_, dim1, dim2, file_seed, worker_rng,
out_path_spec, False, output)
# Generate in main thread (fast; Rust dgen or numpy).
# A non-None return value is the ready-to-upload payload
# (e.g. s3dlio BytesView from generate_npz_bytes) — use it
# directly to skip the BytesIO intermediary copy.
payload = write_fn(i, dim_, dim1, dim2, file_seed, worker_rng,
out_path_spec, False, output)
upload_data = payload if payload is not None else output
# Block if n_workers uploads are already in flight
# (back-pressure to bound peak RAM usage).
_sem.acquire()
# Submit upload immediately; main thread continues generating.
_futures.append(
pool.submit(_upload, out_path_spec, output, _sem)
pool.submit(_upload, out_path_spec, upload_data, _sem)
)
# Wait for all in-flight uploads before leaving the with block.
for f in _futures:
Expand Down
20 changes: 16 additions & 4 deletions dlio_benchmark/data_generator/npz_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,16 @@ def __init__(self):
def generate(self):
"""
Generator for creating data in NPZ format of 3d dataset.
Uses the base-class template for seeding, BytesIO, and put_data.
Bug fix: pass output.getvalue() (bytes) to put_data, not the BytesIO object.

Fast path (s3dlio available, no ZIP compression, object storage):
generate_npz_bytes() produces a BytesView in Rust (hardware CRC32,
Rayon fill, no GIL). _write() returns it directly; _generate_files()
passes it straight to put_data() → MultipartUploadWriter — zero copies
of the payload at any point in the Python layer.

Slow path (numpy fallback or local FS):
np.savez() writes into BytesIO; put_data() reads via getbuffer()
(zero-copy memoryview).
"""
super().generate()
dtype = self._args.record_element_dtype
Expand All @@ -69,9 +77,13 @@ def _write(i, dim_, dim1, dim2, file_seed, rng,
if is_local:
with open(output, "wb") as f:
f.write(npz_view)
return None
else:
output.write(npz_view)
return
# Return the BytesView directly — zero-copy.
# _generate_files() uses the return value as the upload
# payload, bypassing the BytesIO write entirely.
# No Python-side copy of the 140 MiB buffer occurs.
return npz_view
# ── Slow path: numpy fallback ─────────────────────────────────
if isinstance(dim_, list):
records = gen_random_tensor(
Expand Down
15 changes: 15 additions & 0 deletions dlio_benchmark/data_generator/parquet_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self):
self.partition_by = getattr(self._args, 'parquet_partition_by', None)
batch = getattr(self._args, 'parquet_generation_batch_size', 0)
self.generation_batch_size = batch if batch > 0 else self.row_group_size
self.use_s3dlio_gen = getattr(self._args, 'parquet_use_s3dlio_gen', False)

# ── Schema ───────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -309,6 +310,20 @@ def generate(self):

out_path_spec = self.storage.get_uri(self._file_list[i])

# ── s3dlio pure-Rust generation path (streaming) ────────────────
# When enabled, hand off entirely to s3dlio.generate_and_write_parquet_schema_streaming().
# Row groups are pipelined: generation and multipart upload run
# concurrently — no full-file buffer, peak RAM ~2× one row group.
if self.use_s3dlio_gen and self.parquet_columns:
import s3dlio as _s3dlio
_cols = [(str(c.get('name', 'data')), int(c.get('size', 1)))
for c in self.parquet_columns]
_num_rg = max(1, self.num_samples // self.row_group_size)
_s3dlio.generate_and_write_parquet_schema_streaming(
out_path_spec, _cols, self.row_group_size, _num_rg)
continue
# ─────────────────────────────────────────────────────────────────

dim_raw = dim[2 * i]
if isinstance(dim_raw, list):
dim1 = int(dim_raw[0])
Expand Down
Loading
Loading