Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions dlio_benchmark/data_generator/parquet_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import pyarrow as pa
import pyarrow.parquet as pq

from dlio_benchmark.common.enumerations import Compression
from dlio_benchmark.common.enumerations import Compression, StorageType
from dlio_benchmark.data_generator.data_generator import DataGenerator
from dlio_benchmark.utils.utility import progress, gen_random_tensor, DLIOMPI
import dgen_py as _dgen_py
Expand Down Expand Up @@ -314,7 +314,10 @@ def generate(self):
# When enabled, hand off entirely to s3dlio.generate_and_write_parquet_schema_streaming().
# Row groups are pipelined: generation and multipart upload run
# concurrently — no full-file buffer, peak RAM ~2× one row group.
if self.use_s3dlio_gen and self.parquet_columns:
# Restricted to object storage (S3/AISTORE): s3dlio requires an
# s3:// URI and raises RuntimeError for local paths (issue #385).
_s3_storage = (StorageType.S3, StorageType.AISTORE)
if self.use_s3dlio_gen and self.parquet_columns and self._args.storage_type in _s3_storage:
import s3dlio as _s3dlio
_cols = [(str(c.get('name', 'data')), int(c.get('size', 1)))
for c in self.parquet_columns]
Expand Down
9 changes: 8 additions & 1 deletion dlio_benchmark/data_loader/torch_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,16 +465,23 @@ def read(self):
StorageType.LOCAL_FS,
)
)
_s3_types = (StorageType.S3, StorageType.AISTORE)
# TorchIterableDatasetSimple uses DataLoader(num_workers>0), which forks
# worker processes via os.fork(). On LOCAL_FS, this fork-after-module-import
# pattern causes a ThreadPoolExecutor deadlock (the executor's background
# thread is not fork-safe). Restrict the iterable path to object storage
# (S3/AISTORE) only, where the prefetch benefit is most significant and
# the fork issue does not apply. LOCAL_FS falls through to map-style TorchDataset.
use_simple_iterable_dataset = (
self.format_type in _simple_iterable_formats
and not use_rg_iterable_dataset
and self._args.storage_type in _s3_types
)

# Determine concrete reader class name and access pattern for logging.
_opts = getattr(self._args, "storage_options", {}) or {}
_lib = _opts.get("storage_library", "none")
_st = self._args.storage_type
_s3_types = (StorageType.S3, StorageType.AISTORE)
_s3_libs = ("s3dlio", "s3torchconnector", "minio")
_nw = self._args.read_threads

Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading