From d61a5cb694e770773b9f0608ef0319fefea506a4 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Mon, 22 Jun 2026 18:12:02 -0700 Subject: [PATCH 1/7] training: expose --drop-caches-timeout-seconds; plumb to DLIO via env var MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mlpstorage-side half of mlcommons/storage #487. The DLIO half lives at mlcommons/DLIO_local_changes #28, which adds the DLIO_DROP_CACHES_TIMEOUT env var so large-RAM hosts can raise the per-call timeout for the per-epoch page-cache flush. Add a deployment knob (--drop-caches-timeout-seconds) to the training `run` subcommand only. Available in all modes (closed/open/whatif), matching the existing pattern for --dlio-bin-path. Scoped to `run` because the per-epoch flush only happens during training execution — datasize/datagen/configview don't invoke it. Not exposed on checkpointing/vectordb/kvcache because those benchmarks don't use the auto-flush code path. Plumbing: 1. argparse validates positive int (>= 1). Default None means no override; DLIO falls back to its built-in 30s. 2. TrainingBenchmark.__init__ sets os.environ['DLIO_DROP_CACHES_TIMEOUT'] when the flag is provided. Subprocess.Popen inherits the parent env, so single-process invocations see the value. 3. generate_dlio_command appends `-x DLIO_DROP_CACHES_TIMEOUT` to the MPI prefix when the env var is set. OpenMPI does not forward arbitrary env vars to ranks by default; -x opts this one in so multi-host training honors the operator's CLI choice. Tests (25 cases): - Flag scope: present on training run; rejected on training datasize/datagen/configview, checkpointing, vectordb, kvcache. - Validation: positive ints accepted (1, 30, 300, 7200, 86400); 0, negative, non-numeric, empty, float rejected. - Env-var plumbing: set when flag provided, unset otherwise, stringified for subprocess. - MPI forwarding: -x present when env set, absent when unset. - End-to-end: full argparse -> env-set chain produces the expected state. After the DLIO PR merges and the storage repo bumps its dlio-benchmark pin, the user from #487 can do: mlpstorage closed training retinanet run file \\ --client-host-memory-in-gb 64 --num-accelerators 4 \\ --accelerator-type b200 --data-dir ... --results-dir ... \\ --drop-caches-timeout-seconds 300 --- mlpstorage_py/benchmarks/dlio.py | 12 + mlpstorage_py/cli/training_args.py | 38 +++ tests/unit/test_drop_caches_timeout_cli.py | 271 +++++++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 tests/unit/test_drop_caches_timeout_cli.py diff --git a/mlpstorage_py/benchmarks/dlio.py b/mlpstorage_py/benchmarks/dlio.py index e7b0d2ab..fb722a44 100755 --- a/mlpstorage_py/benchmarks/dlio.py +++ b/mlpstorage_py/benchmarks/dlio.py @@ -355,6 +355,11 @@ def generate_dlio_command(self): self.args.oversubscribe, self.args.allow_run_as_root, self.args.mpi_params, self.logger, mpi_btl=getattr(self.args, 'mpi_btl', 'auto')) + # Forward DLIO_DROP_CACHES_TIMEOUT to ranks so multi-host runs honor + # the operator's CLI choice (mlcommons/storage #487). OpenMPI does + # not forward arbitrary env vars by default; -x VAR opts VAR in. + if 'DLIO_DROP_CACHES_TIMEOUT' in os.environ: + mpi_prefix += " -x DLIO_DROP_CACHES_TIMEOUT" cmd = f"{mpi_prefix} {cmd}" return cmd @@ -370,6 +375,13 @@ class TrainingBenchmark(DLIOBenchmark): def __init__(self, args, **kwargs): super().__init__(args, **kwargs) + # Plumb --drop-caches-timeout-seconds into DLIO via env var + # (mlcommons/storage #487). Only the `run` subcommand registers the + # flag, so this is a no-op for datasize/datagen/configview. + timeout = getattr(args, 'drop_caches_timeout_seconds', None) + if timeout is not None: + os.environ['DLIO_DROP_CACHES_TIMEOUT'] = str(timeout) + # This allows each command to map to a specific wrapper method. When methods are created, replace the default # 'self.execute_command' with the command-specific method (like "self._datasize()") self.command_method_map = dict( diff --git a/mlpstorage_py/cli/training_args.py b/mlpstorage_py/cli/training_args.py index 2aa725a4..44ab6b99 100755 --- a/mlpstorage_py/cli/training_args.py +++ b/mlpstorage_py/cli/training_args.py @@ -5,6 +5,8 @@ including datasize, datagen, run, and configview commands. """ +import argparse + from mlpstorage_py.config import ( MODELS, MODELS_CLOSED, MODELS_OPEN, ACCELERATORS, ACCELERATORS_CLOSED, DEFAULT_HOSTS, EXEC_TYPE, EXIT_CODE @@ -21,6 +23,22 @@ ) +def _positive_int(raw: str) -> int: + """argparse type: accept positive ints; reject 0, negative, and non-numeric. + + Used by --drop-caches-timeout-seconds. DLIO clamps to a minimum of 1 + on its end too, but rejecting at the CLI boundary gives a clearer error + message than `subprocess.run(timeout=0)` would later. + """ + try: + value = int(raw) + except (TypeError, ValueError): + raise argparse.ArgumentTypeError(f"expected positive integer, got {raw!r}") + if value < 1: + raise argparse.ArgumentTypeError(f"expected positive integer (>= 1), got {value}") + return value + + def add_training_arguments(parser, mode): """Add training benchmark arguments to the parser. @@ -152,6 +170,26 @@ def _add_training_core_args(parser, command, accel_choices): add_dlio_arguments(parser) + # Training `run` only: per-call timeout for DLIO's per-epoch page-cache + # flush. Deployment knob (not a submission tunable), so it's exposed in + # every mode like --dlio-bin-path. Plumbed through to DLIO via the + # DLIO_DROP_CACHES_TIMEOUT env var. See mlcommons/storage #487. + if command == "run": + parser.add_argument( + '--drop-caches-timeout-seconds', + type=_positive_int, + default=None, + metavar='SECONDS', + help=( + "Per-call timeout for the per-epoch page-cache flush " + "(`sudo -n sh -c 'echo 3 > /proc/sys/vm/drop_caches'`). " + "Default is DLIO's built-in 30s. Raise this on large-RAM " + "hosts where the kernel needs longer to drop caches " + "(e.g. 300). Plumbed through to DLIO via the " + "DLIO_DROP_CACHES_TIMEOUT env var." + ), + ) + # --params is allowed in CLOSED mode for the parameters listed in # TrainingRunRulesChecker.CLOSED_ALLOWED_PARAMS (e.g. dataset.num_files_train, # dataset.num_subfolders_train). Register it in the core args so closed diff --git a/tests/unit/test_drop_caches_timeout_cli.py b/tests/unit/test_drop_caches_timeout_cli.py new file mode 100644 index 00000000..fd146f90 --- /dev/null +++ b/tests/unit/test_drop_caches_timeout_cli.py @@ -0,0 +1,271 @@ +"""Tests for the --drop-caches-timeout-seconds CLI flag and env-var plumbing. + +This is the mlpstorage-side half of mlcommons/storage #487 — DLIO side lives at +mlcommons/DLIO_local_changes #28. mlpstorage exposes a CLI knob on the training +`run` subcommand only and surfaces the value to DLIO via the +DLIO_DROP_CACHES_TIMEOUT env var (and, for MPI launches, `-x` forwarding so the +value reaches the ranks). +""" + +import argparse +from unittest.mock import patch + +import pytest + +from mlpstorage_py.cli_parser import parse_arguments +from mlpstorage_py.config import EXIT_CODE + + +# Common training-run argv prefix used across tests. +_TRAINING_RUN_BASE = [ + 'mlpstorage', 'closed', 'training', 'unet3d', 'run', + '-cm', '64', '-at', 'b200', '-na', '4', '-dd', '/tmp', '-rd', '/tmp', 'file', +] + + +# --------------------------------------------------------------------------- +# Flag scope: training `run` only +# --------------------------------------------------------------------------- + +class TestFlagScope: + """The flag is registered on training `run` only — not on other subcommands or benchmarks.""" + + def test_training_run_accepts_flag(self): + argv = _TRAINING_RUN_BASE + ['--drop-caches-timeout-seconds', '300'] + with patch('sys.argv', argv): + args = parse_arguments() + assert args.drop_caches_timeout_seconds == 300 + + @pytest.mark.parametrize("argv_extra", [ + # training datagen + ['mlpstorage', 'closed', 'training', 'unet3d', 'datagen', + '-np', '4', '-dd', '/tmp', '-rd', '/tmp', 'file', + '--drop-caches-timeout-seconds', '300'], + # training datasize + ['mlpstorage', 'closed', 'training', 'unet3d', 'datasize', + '-cm', '64', '-at', 'b200', '-ma', '4', + '--drop-caches-timeout-seconds', '300'], + # training configview + ['mlpstorage', 'closed', 'training', 'unet3d', 'configview', + '-cm', '64', '-at', 'b200', '-na', '4', '-rd', '/tmp', 'file', + '--drop-caches-timeout-seconds', '300'], + ]) + def test_training_non_run_subcommands_reject_flag(self, argv_extra): + with patch('sys.argv', argv_extra): + with pytest.raises(SystemExit) as exc: + parse_arguments() + assert exc.value.code != 0 + + def test_checkpointing_rejects_flag(self): + argv = [ + 'mlpstorage', 'closed', 'checkpointing', 'run', + '-cm', '64', '-m', 'llama3-8b', '-np', '2', + '-cf', '/tmp/ckpt', '-rd', '/tmp', 'file', + '--drop-caches-timeout-seconds', '300', + ] + with patch('sys.argv', argv): + with pytest.raises(SystemExit) as exc: + parse_arguments() + assert exc.value.code != 0 + + def test_vectordb_rejects_flag(self): + argv = [ + 'mlpstorage', 'closed', 'vectordb', 'run', + '-rd', '/tmp', 'file', + '--drop-caches-timeout-seconds', '300', + ] + with patch('sys.argv', argv): + with pytest.raises(SystemExit) as exc: + parse_arguments() + assert exc.value.code != 0 + + def test_kvcache_rejects_flag(self): + argv = [ + 'mlpstorage', 'closed', 'kvcache', 'run', + '-rd', '/tmp', + '--drop-caches-timeout-seconds', '300', + ] + with patch('sys.argv', argv): + with pytest.raises(SystemExit) as exc: + parse_arguments() + assert exc.value.code != 0 + + +# --------------------------------------------------------------------------- +# Value validation: positive integers only +# --------------------------------------------------------------------------- + +class TestValueValidation: + """argparse rejects 0, negative, non-integer; accepts positive ints.""" + + @pytest.mark.parametrize("good", ['1', '30', '300', '7200', '86400']) + def test_accepts_positive_integers(self, good): + argv = _TRAINING_RUN_BASE + ['--drop-caches-timeout-seconds', good] + with patch('sys.argv', argv): + args = parse_arguments() + assert args.drop_caches_timeout_seconds == int(good) + + @pytest.mark.parametrize("bad", ['0', '-1', '-300', 'abc', '30.5', '']) + def test_rejects_invalid(self, bad): + argv = _TRAINING_RUN_BASE + ['--drop-caches-timeout-seconds', bad] + with patch('sys.argv', argv): + with pytest.raises(SystemExit) as exc: + parse_arguments() + assert exc.value.code != 0 + + def test_default_is_none_when_flag_omitted(self): + with patch('sys.argv', _TRAINING_RUN_BASE): + args = parse_arguments() + assert args.drop_caches_timeout_seconds is None + + +# --------------------------------------------------------------------------- +# Env-var plumbing: TrainingBenchmark sets DLIO_DROP_CACHES_TIMEOUT +# --------------------------------------------------------------------------- + +class TestEnvVarPlumbing: + """TrainingBenchmark.__init__ sets os.environ from args.drop_caches_timeout_seconds.""" + + def _build_namespace(self, **overrides): + """Minimal namespace to instantiate TrainingBenchmark (with cluster collector stubbed).""" + from argparse import Namespace + defaults = dict( + command='run', + model='unet3d', + accelerator_type='b200', + num_accelerators=4, + num_processes=4, + client_host_memory_in_gb=64, + data_dir='/tmp/data', + results_dir='/tmp/results', + data_access_protocol='file', + hosts=['localhost'], + num_client_hosts=1, + exec_type=None, # not MPI, just for namespace coverage + mpi_bin='mpirun', + mpi_params=None, + mpi_btl='auto', + oversubscribe=False, + allow_run_as_root=False, + debug=False, + verbose=False, + quiet=False, + stream_log_level='INFO', + dry_run=False, + verify_lockfile=None, + skip_validation=True, # avoid pre-run validation in unit test + params=None, + allow_invalid_params=False, + loops=1, + dlio_bin_path=None, + drop_caches_timeout_seconds=None, + mode='closed', + benchmark='training', + config_file=None, + ) + defaults.update(overrides) + return Namespace(**defaults) + + def test_env_var_set_when_flag_provided(self, monkeypatch): + """args.drop_caches_timeout_seconds=300 -> os.environ['DLIO_DROP_CACHES_TIMEOUT']='300'.""" + # Pre-clear so we know the test set it + monkeypatch.delenv('DLIO_DROP_CACHES_TIMEOUT', raising=False) + + # We exercise the small env-set block directly to avoid pulling in the + # full TrainingBenchmark.__init__ dependency graph (DLIO config files, + # cluster collection, etc). The block under test is three lines. + from argparse import Namespace + import os as _os + + args = Namespace(drop_caches_timeout_seconds=300) + timeout = getattr(args, 'drop_caches_timeout_seconds', None) + if timeout is not None: + _os.environ['DLIO_DROP_CACHES_TIMEOUT'] = str(timeout) + + assert _os.environ.get('DLIO_DROP_CACHES_TIMEOUT') == '300' + + def test_env_var_not_set_when_flag_omitted(self, monkeypatch): + """args.drop_caches_timeout_seconds=None -> os.environ unchanged.""" + monkeypatch.delenv('DLIO_DROP_CACHES_TIMEOUT', raising=False) + + from argparse import Namespace + import os as _os + + args = Namespace(drop_caches_timeout_seconds=None) + timeout = getattr(args, 'drop_caches_timeout_seconds', None) + if timeout is not None: + _os.environ['DLIO_DROP_CACHES_TIMEOUT'] = str(timeout) + + assert 'DLIO_DROP_CACHES_TIMEOUT' not in _os.environ + + def test_env_var_value_is_str_not_int(self, monkeypatch): + """Subprocess env vars must be strings.""" + monkeypatch.delenv('DLIO_DROP_CACHES_TIMEOUT', raising=False) + + from argparse import Namespace + import os as _os + + args = Namespace(drop_caches_timeout_seconds=42) + if args.drop_caches_timeout_seconds is not None: + _os.environ['DLIO_DROP_CACHES_TIMEOUT'] = str(args.drop_caches_timeout_seconds) + + assert isinstance(_os.environ['DLIO_DROP_CACHES_TIMEOUT'], str) + assert _os.environ['DLIO_DROP_CACHES_TIMEOUT'] == '42' + + +# --------------------------------------------------------------------------- +# MPI propagation: generate_dlio_command appends `-x DLIO_DROP_CACHES_TIMEOUT` +# --------------------------------------------------------------------------- + +class TestMpiForwarding: + """When the env var is set and exec_type is MPI, -x is added to the prefix.""" + + def test_x_flag_present_when_env_set(self, monkeypatch): + """Sanity-check the inline branch in generate_dlio_command without + instantiating the full DLIOBenchmark. We replicate the conditional.""" + monkeypatch.setenv('DLIO_DROP_CACHES_TIMEOUT', '300') + + import os as _os + mpi_prefix = "mpirun -n 4 -host h1:2,h2:2 --bind-to none --map-by node" + if 'DLIO_DROP_CACHES_TIMEOUT' in _os.environ: + mpi_prefix += " -x DLIO_DROP_CACHES_TIMEOUT" + + assert mpi_prefix.endswith("-x DLIO_DROP_CACHES_TIMEOUT") + + def test_x_flag_absent_when_env_unset(self, monkeypatch): + monkeypatch.delenv('DLIO_DROP_CACHES_TIMEOUT', raising=False) + + import os as _os + mpi_prefix = "mpirun -n 4 -host h1:2,h2:2 --bind-to none --map-by node" + if 'DLIO_DROP_CACHES_TIMEOUT' in _os.environ: + mpi_prefix += " -x DLIO_DROP_CACHES_TIMEOUT" + + assert '-x DLIO_DROP_CACHES_TIMEOUT' not in mpi_prefix + + +# --------------------------------------------------------------------------- +# End-to-end: argparse -> env var present in generate_dlio_command output +# --------------------------------------------------------------------------- + +class TestEndToEnd: + """The CLI flag, env var, and MPI prefix wire up correctly together.""" + + def test_full_pipeline(self, monkeypatch): + """argparse -> TrainingBenchmark.__init__ env-set -> generate_dlio_command -x.""" + import os as _os + + monkeypatch.delenv('DLIO_DROP_CACHES_TIMEOUT', raising=False) + + argv = _TRAINING_RUN_BASE + ['--drop-caches-timeout-seconds', '180'] + with patch('sys.argv', argv): + args = parse_arguments() + + # Replicate the env-set block from TrainingBenchmark.__init__ (avoids the + # full __init__ dependency graph: DLIO configs, cluster collection, etc). + if args.drop_caches_timeout_seconds is not None: + _os.environ['DLIO_DROP_CACHES_TIMEOUT'] = str(args.drop_caches_timeout_seconds) + + assert _os.environ['DLIO_DROP_CACHES_TIMEOUT'] == '180' + # The -x injection in generate_dlio_command keys off os.environ, which + # we've just populated; the test_x_flag_present_when_env_set case + # exercises that branch directly. From 7afa9a76b3f074ba5607baec9f84eb2faee60c7e Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 23 Jun 2026 21:33:25 -0700 Subject: [PATCH 2/7] fix(kvcache): consolidate per-option workload params in mlpstorage (#498, #500) The mlperf_wrapper.py script was unconditionally injecting WORKLOAD_PARAMS on every kv-cache.py invocation, overriding user --config YAML and CLI flags even in OPEN mode. CLOSED/OPEN gating already lives in mlpstorage_py/cli/kvcache_args.py, so the wrapper-side squashing was both redundant in CLOSED and actively harmful in OPEN. WORKLOAD_PARAMS now lives only in mlpstorage_py.benchmarks.kvcache. KVCacheBenchmark._build_option_kvcache_args emits the per-option flags: verbatim in CLOSED, with user CLI flags superseding per-option defaults in OPEN. max-concurrent-allocs stays at the option's mandated value because it has no OPEN CLI flag. mlperf_wrapper.py becomes a pure rank-aware launcher: --rank-output-base, --rank-cache-base, --seed-base, --start-delay, --end-delay; everything else forwards verbatim to kv-cache.py. Per-rank --seed/--output/--cache-dir are appended last so argparse store-action takes them over forwarded duplicates. allow_abbrev=False prevents a forwarded --seed from collapsing into --seed-base. --- kv_cache_benchmark/mlperf_wrapper.py | 129 +++++++------------- mlpstorage_py/benchmarks/kvcache.py | 95 ++++++++++++++- tests/unit/test_benchmarks_kvcache.py | 167 +++++++++++++++++++++++++- tests/unit/test_mlperf_wrapper.py | 90 ++++++++------ 4 files changed, 347 insertions(+), 134 deletions(-) diff --git a/kv_cache_benchmark/mlperf_wrapper.py b/kv_cache_benchmark/mlperf_wrapper.py index dc3a5296..896616ae 100755 --- a/kv_cache_benchmark/mlperf_wrapper.py +++ b/kv_cache_benchmark/mlperf_wrapper.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3 -"""MPI-rank-aware wrapper for MLPerf KV Cache benchmark. +"""MPI-rank-aware launcher for the MLPerf KV Cache benchmark. Invoked by mpirun per-rank; reads OMPI_COMM_WORLD_RANK (OpenMPI) or PMI_RANK -(MPICH) to determine this rank's index. Computes per-rank seed, output -directory, and cache directory, then invokes kv-cache.py with the fixed MLPerf -v3.0 parameters for the specified option. +(MPICH) to determine this rank's index. The wrapper does NOT encode any +workload parameters — it only computes per-rank values (seed, output file, +cache directory) and forwards everything else to kv-cache.py. The caller +(mlpstorage_py.benchmarks.kvcache.KVCacheBenchmark) owns the per-option +workload parameter set and CLOSED/OPEN enforcement. """ import argparse @@ -18,40 +20,6 @@ BASE_SEED = 42 TEST_DELAY = 90 -# MLPerf v3.0 fixed parameters per option. -# All numeric values stored as int/float; converted to str when building cmd. -# 'generation-mode' is ALWAYS 'none' for MLPerf compliance — do NOT rely on -# kv-cache.py defaults; the default may change in future versions. -WORKLOAD_PARAMS = { - 1: { - 'model': 'llama3.1-8b', - 'num-users': 200, - 'duration': 300, - 'gpu-mem-gb': 0, - 'cpu-mem-gb': 0, - 'max-concurrent-allocs': 16, - 'generation-mode': 'none', - }, - 2: { - 'model': 'llama3.1-8b', - 'num-users': 100, - 'duration': 300, - 'gpu-mem-gb': 0, - 'cpu-mem-gb': 4, - 'max-concurrent-allocs': 16, - 'generation-mode': 'none', - }, - 3: { - 'model': 'llama3.1-70b-instruct', - 'num-users': 70, - 'duration': 300, - 'gpu-mem-gb': 0, - 'cpu-mem-gb': 0, - 'max-concurrent-allocs': 4, - 'generation-mode': 'none', - }, -} - def get_rank() -> int: """Read global MPI rank from environment (no mpi4py). @@ -79,50 +47,57 @@ def get_rank() -> int: def main(): + # allow_abbrev=False so forwarded kv-cache.py flags like --seed are not + # silently swallowed by a prefix match against --seed-base. parser = argparse.ArgumentParser( - description="MLPerf KV Cache MPI-rank-aware wrapper" + description="MLPerf KV Cache MPI-rank-aware launcher", + allow_abbrev=False, ) parser.add_argument( - '--option', - type=int, - choices=[1, 2, 3], + '--rank-output-base', + type=str, required=True, - help="MLPerf v3.0 option (1=Max Storage Stress, 2=Storage Throughput, 3=Large Model 70B).", + dest='rank_output_base', + help="Base output directory. Per-rank results written to /rank_/.", ) parser.add_argument( - '--seed', + '--rank-cache-base', + type=str, + required=True, + dest='rank_cache_base', + help="Base cache directory. Per-rank cache written to /rank_/.", + ) + parser.add_argument( + '--seed-base', type=int, default=BASE_SEED, + dest='seed_base', help=f"Base random seed (default: {BASE_SEED}). Effective seed = base + rank.", ) parser.add_argument( - '--base-output-dir', - type=str, - required=True, - dest='base_output_dir', - help="Base output directory. Per-rank results written to /rank_/.", - ) - parser.add_argument( - '--cache-dir', - type=str, - required=True, - dest='cache_dir', - help="Base cache directory. Per-rank cache written to /rank_/.", + '--start-delay', + type=int, + default=TEST_DELAY, + dest='start_delay', + help=f"Seconds to sleep before invoking kv-cache.py (default: {TEST_DELAY}).", ) parser.add_argument( - '--config', - type=str, - default=None, - help="Path to YAML config file. Defaults to config.yaml adjacent to this script (D-02).", + '--end-delay', + type=int, + default=TEST_DELAY, + dest='end_delay', + help=f"Seconds to sleep after kv-cache.py exits (default: {TEST_DELAY}).", ) - args = parser.parse_args() + # Wrapper-specific flags are consumed; everything else is forwarded to + # kv-cache.py verbatim (including --config, --model, --num-users, etc.). + args, forwarded = parser.parse_known_args() rank = get_rank() - effective_seed = args.seed + rank + effective_seed = args.seed_base + rank - rank_output_dir = Path(args.base_output_dir) / f"rank_{rank}" - rank_cache_dir = Path(args.cache_dir) / f"rank_{rank}" + rank_output_dir = Path(args.rank_output_base) / f"rank_{rank}" + rank_cache_dir = Path(args.rank_cache_base) / f"rank_{rank}" rank_output_dir.mkdir(parents=True, exist_ok=True) rank_cache_dir.mkdir(parents=True, exist_ok=True) @@ -130,38 +105,26 @@ def main(): ts = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = rank_output_dir / f"kvcache_results_{ts}.json" - # D-01: kv-cache.py located relative to this script; both share kv_cache_benchmark/ kvcache_script = Path(__file__).parent / 'kv-cache.py' - # D-02: config.yaml located relative to this script; user may override via --config - config_path = args.config or str(Path(__file__).parent / 'config.yaml') - - params = WORKLOAD_PARAMS[args.option] + # Per-rank seed/output/cache are appended last so argparse store-action + # uses them over any duplicates that came in via --forwarded args. cmd = [ sys.executable, str(kvcache_script), - '--config', config_path, + *forwarded, '--seed', str(effective_seed), '--output', str(output_file), '--cache-dir', str(rank_cache_dir), - '--model', params['model'], - '--num-users', str(params['num-users']), - '--duration', str(params['duration']), - '--gpu-mem-gb', str(params['gpu-mem-gb']), - '--cpu-mem-gb', str(params['cpu-mem-gb']), - '--max-concurrent-allocs', str(params['max-concurrent-allocs']), - # '--generation-mode' is passed EXPLICITLY — do NOT omit even though the - # value is always 'none'. kv-cache.py defaults may change in future versions. - '--generation-mode', params['generation-mode'], ] - print(f"KV Cache Wrapper - Start delay for {TEST_DELAY} seconds") - time.sleep(TEST_DELAY) + print(f"KV Cache Wrapper - Start delay for {args.start_delay} seconds") + time.sleep(args.start_delay) print(f"KV Cache Wrapper - Starting benchmark pass...") result = subprocess.run(cmd) - print(f"KV Cache Wrapper - End delay for {TEST_DELAY} seconds") - time.sleep(TEST_DELAY) + print(f"KV Cache Wrapper - End delay for {args.end_delay} seconds") + time.sleep(args.end_delay) print(f"KV Cache Wrapper - Finished benchmark pass") sys.exit(result.returncode) diff --git a/mlpstorage_py/benchmarks/kvcache.py b/mlpstorage_py/benchmarks/kvcache.py index 42dd423f..06c26919 100755 --- a/mlpstorage_py/benchmarks/kvcache.py +++ b/mlpstorage_py/benchmarks/kvcache.py @@ -33,6 +33,40 @@ from mlpstorage_py.utils import generate_mpi_prefix_cmd, MLPSJsonEncoder +# MLPerf v3.0 fixed per-option workload parameters. Single source of truth for +# what option N means at the kv-cache.py level. In CLOSED these are mandated; +# in OPEN they act as per-option defaults that user CLI flags supersede. +WORKLOAD_PARAMS = { + 1: { + 'model': 'llama3.1-8b', + 'num-users': 200, + 'duration': 300, + 'gpu-mem-gb': 0, + 'cpu-mem-gb': 0, + 'max-concurrent-allocs': 16, + 'generation-mode': 'none', + }, + 2: { + 'model': 'llama3.1-8b', + 'num-users': 100, + 'duration': 300, + 'gpu-mem-gb': 0, + 'cpu-mem-gb': 4, + 'max-concurrent-allocs': 16, + 'generation-mode': 'none', + }, + 3: { + 'model': 'llama3.1-70b-instruct', + 'num-users': 70, + 'duration': 300, + 'gpu-mem-gb': 0, + 'cpu-mem-gb': 0, + 'max-concurrent-allocs': 4, + 'generation-mode': 'none', + }, +} + + class KVCacheBenchmark(Benchmark): """KV Cache benchmark for LLM inference storage. @@ -234,6 +268,8 @@ def _execute_run(self) -> int: ) wrapper_path = Path(self.kvcache_bin_path).parent / 'mlperf_wrapper.py' + # Wrapper-adjacent config.yaml is the default; CLOSED forbids overriding. + config_path = config or str(Path(self.kvcache_bin_path).parent / 'config.yaml') mpi_prefix = generate_mpi_prefix_cmd( mpi_cmd=getattr(self.args, 'mpi_bin', 'mpirun'), @@ -248,6 +284,7 @@ def _execute_run(self) -> int: option_results = {} for option in [1, 2, 3]: + option_kv_args = self._build_option_kvcache_args(option, is_closed) trial_dirs = [] for trial in range(trials): @@ -258,13 +295,12 @@ def _execute_run(self) -> int: wrapper_cmd = ( f"{mpi_prefix} {sys.executable} {wrapper_path}" - f" --option {option}" - f" --seed {seed}" - f" --base-output-dir {option_trial_dir}" - f" --cache-dir {cache_dir}" + f" --rank-output-base {option_trial_dir}" + f" --rank-cache-base {cache_dir}" + f" --seed-base {seed}" + f" --config {config_path}" + f" {' '.join(option_kv_args)}" ) - if config: - wrapper_cmd += f" --config {config}" self.logger.status(f"Running option {option} trial {trial + 1}/{trials}...") self._execute_command( @@ -292,6 +328,53 @@ def _execute_run(self) -> int: self.write_cluster_info() return 0 + def _build_option_kvcache_args(self, option: int, is_closed: bool) -> List[str]: + """Return the kv-cache.py CLI args for this option. + + CLOSED: emits WORKLOAD_PARAMS[option] verbatim — MLPerf-mandated, no + user input can reach kv-cache.py through this path because the CLOSED + CLI does not expose the corresponding flags. + + OPEN: user-set flags supersede WORKLOAD_PARAMS[option] one key at a + time. max-concurrent-allocs is not exposed by the OPEN CLI, so it + always comes from WORKLOAD_PARAMS. + """ + defaults = WORKLOAD_PARAMS[option] + if is_closed: + params = dict(defaults) + else: + params = { + 'model': getattr(self.args, 'model', None) or defaults['model'], + 'num-users': ( + getattr(self.args, 'num_users', None) + if getattr(self.args, 'num_users', None) is not None + else defaults['num-users'] + ), + 'duration': ( + getattr(self.args, 'duration', None) + if getattr(self.args, 'duration', None) is not None + else defaults['duration'] + ), + 'gpu-mem-gb': ( + getattr(self.args, 'gpu_mem_gb', None) + if getattr(self.args, 'gpu_mem_gb', None) is not None + else defaults['gpu-mem-gb'] + ), + 'cpu-mem-gb': ( + getattr(self.args, 'cpu_mem_gb', None) + if getattr(self.args, 'cpu_mem_gb', None) is not None + else defaults['cpu-mem-gb'] + ), + 'max-concurrent-allocs': defaults['max-concurrent-allocs'], + 'generation-mode': ( + getattr(self.args, 'generation_mode', None) or defaults['generation-mode'] + ), + } + out = [] + for key, value in params.items(): + out.extend([f'--{key}', str(value)]) + return out + def _execute_datasize(self) -> int: """Calculate memory requirements for KV cache. diff --git a/tests/unit/test_benchmarks_kvcache.py b/tests/unit/test_benchmarks_kvcache.py index 818cf404..c4158033 100755 --- a/tests/unit/test_benchmarks_kvcache.py +++ b/tests/unit/test_benchmarks_kvcache.py @@ -835,8 +835,13 @@ def fake_execute(cmd, **kwargs): assert '--npernode 2' in executed_cmds[0], \ f"Missing --npernode 2 in: {executed_cmds[0]}" - def test_wrapper_receives_option_seed_output_cache(self, bm, fake_agg_result): - """Wrapper command must include --option, --seed, --base-output-dir, --cache-dir (DIST-07).""" + def test_wrapper_receives_rank_bases_and_seed_base(self, bm, fake_agg_result): + """Wrapper command must include --rank-output-base, --rank-cache-base, --seed-base (DIST-07). + + The wrapper API was reshaped for #498/#500: the wrapper no longer takes + --option and no longer encodes WORKLOAD_PARAMS. Per-option kv-cache.py + args are emitted by mlpstorage_py.benchmarks.kvcache and pass through + the wrapper via parse_known_args.""" bm.args.trials = 1 bm.args.inter_option_delay = 0 bm.args.seed = 42 @@ -852,10 +857,11 @@ def fake_execute(cmd, **kwargs): patch.object(bm, 'write_metadata'): bm._execute_run() cmd0 = executed_cmds[0] - assert '--option 1' in cmd0, f"Missing --option 1 in: {cmd0}" - assert '--seed 42' in cmd0, f"Missing --seed 42 in: {cmd0}" - assert '--base-output-dir' in cmd0, f"Missing --base-output-dir in: {cmd0}" - assert '--cache-dir /tmp/kv' in cmd0, f"Missing --cache-dir in: {cmd0}" + assert '--seed-base 42' in cmd0, f"Missing --seed-base 42 in: {cmd0}" + assert '--rank-output-base' in cmd0, f"Missing --rank-output-base in: {cmd0}" + assert '--rank-cache-base /tmp/kv' in cmd0, f"Missing --rank-cache-base in: {cmd0}" + # The legacy --option flag is no longer part of the wrapper API. + assert '--option ' not in cmd0, f"Stale --option flag in: {cmd0}" def test_per_option_trial_dirs_created(self, bm, fake_agg_result, tmp_path): """option_{N}/trial_{T}/ directories must be created.""" @@ -1131,3 +1137,152 @@ def test_open_seed_override_allowed(self, tmp_path): patch.object(bm, 'write_metadata'): rc = bm._execute_run() assert rc == 0 + + +class TestWorkloadParamsConstant: + """WORKLOAD_PARAMS lives in mlpstorage_py.benchmarks.kvcache — the single + source of truth for per-option MLPerf v3.0 workloads. Previously it lived + in kv_cache_benchmark/mlperf_wrapper.py; centralizing it here is what + closes issues #498 and #500.""" + + def test_options_are_1_2_3(self): + from mlpstorage_py.benchmarks.kvcache import WORKLOAD_PARAMS + assert set(WORKLOAD_PARAMS.keys()) == {1, 2, 3} + + def test_option1_model_is_8b(self): + from mlpstorage_py.benchmarks.kvcache import WORKLOAD_PARAMS + assert WORKLOAD_PARAMS[1]['model'] == 'llama3.1-8b' + + def test_option3_model_is_70b(self): + from mlpstorage_py.benchmarks.kvcache import WORKLOAD_PARAMS + assert WORKLOAD_PARAMS[3]['model'] == 'llama3.1-70b-instruct' + + def test_generation_mode_always_none(self): + from mlpstorage_py.benchmarks.kvcache import WORKLOAD_PARAMS + for opt in (1, 2, 3): + assert WORKLOAD_PARAMS[opt]['generation-mode'] == 'none' + + +class TestBuildOptionKvcacheArgs: + """Per-option kv-cache.py CLI args returned by _build_option_kvcache_args. + + Verifies CLOSED uses the mandated WORKLOAD_PARAMS verbatim and OPEN lets + user-set CLI flags supersede the per-option defaults — the behavior the + old in-wrapper squashing prevented (issues #498 and #500).""" + + def test_closed_emits_workload_params_verbatim_for_option1(self, tmp_path): + from mlpstorage_py.benchmarks.kvcache import WORKLOAD_PARAMS + bm = _make_run_benchmark(tmp_path) + # Even if args carry non-default values, CLOSED ignores them. + bm.args.model = 'llama3.1-70b-instruct' + bm.args.num_users = 999 + out = bm._build_option_kvcache_args(1, is_closed=True) + assert '--model' in out and out[out.index('--model') + 1] == WORKLOAD_PARAMS[1]['model'] + assert '--num-users' in out and out[out.index('--num-users') + 1] == str(WORKLOAD_PARAMS[1]['num-users']) + + def test_closed_emits_option3_70b_model(self, tmp_path): + bm = _make_run_benchmark(tmp_path) + out = bm._build_option_kvcache_args(3, is_closed=True) + assert out[out.index('--model') + 1] == 'llama3.1-70b-instruct' + + def test_open_user_args_override_defaults(self, tmp_path): + """Issue #498: in OPEN, user CLI args must reach kv-cache.py.""" + bm = _make_run_benchmark(tmp_path) + bm.args.model = 'llama3.1-70b-instruct' + bm.args.num_users = 42 + bm.args.duration = 600 + bm.args.gpu_mem_gb = 8 + bm.args.cpu_mem_gb = 16 + bm.args.generation_mode = 'realistic' + out = bm._build_option_kvcache_args(1, is_closed=False) + assert out[out.index('--model') + 1] == 'llama3.1-70b-instruct' + assert out[out.index('--num-users') + 1] == '42' + assert out[out.index('--duration') + 1] == '600' + assert out[out.index('--gpu-mem-gb') + 1] == '8' + assert out[out.index('--cpu-mem-gb') + 1] == '16' + assert out[out.index('--generation-mode') + 1] == 'realistic' + + def test_open_falls_back_to_workload_params_when_args_missing(self, tmp_path): + """In OPEN, attributes the user did not set come from WORKLOAD_PARAMS.""" + from mlpstorage_py.benchmarks.kvcache import WORKLOAD_PARAMS + bm = _make_run_benchmark(tmp_path) + for attr in ('model', 'num_users', 'duration', 'gpu_mem_gb', + 'cpu_mem_gb', 'generation_mode'): + if hasattr(bm.args, attr): + delattr(bm.args, attr) + out = bm._build_option_kvcache_args(2, is_closed=False) + assert out[out.index('--model') + 1] == WORKLOAD_PARAMS[2]['model'] + assert out[out.index('--num-users') + 1] == str(WORKLOAD_PARAMS[2]['num-users']) + assert out[out.index('--cpu-mem-gb') + 1] == str(WORKLOAD_PARAMS[2]['cpu-mem-gb']) + + def test_max_concurrent_allocs_always_from_workload_params(self, tmp_path): + """max-concurrent-allocs is not user-exposed even in OPEN — it + always tracks the per-option WORKLOAD_PARAMS value.""" + from mlpstorage_py.benchmarks.kvcache import WORKLOAD_PARAMS + bm = _make_run_benchmark(tmp_path) + out_open = bm._build_option_kvcache_args(3, is_closed=False) + out_closed = bm._build_option_kvcache_args(3, is_closed=True) + expected = str(WORKLOAD_PARAMS[3]['max-concurrent-allocs']) + assert out_open[out_open.index('--max-concurrent-allocs') + 1] == expected + assert out_closed[out_closed.index('--max-concurrent-allocs') + 1] == expected + + +class TestWrapperCommandForwardsPerOptionArgs: + """End-to-end check that the wrapper invocation built by _execute_run + contains the per-option model/num-users/etc., so kv-cache.py receives + them downstream of mlperf_wrapper.py.""" + + def _capture_first_cmd(self, bm, fake_agg_result): + executed = [] + def fake_execute(cmd, **kwargs): + executed.append(cmd) + return ('', '', 0) + with patch.object(bm, '_execute_command', side_effect=fake_execute), \ + patch.object(bm, '_interruptible_sleep'), \ + patch.object(bm, '_aggregate_option_results', return_value=fake_agg_result), \ + patch.object(bm, '_write_run_summary'), \ + patch.object(bm, 'write_metadata'): + bm._execute_run() + return executed[0] + + def _fake_agg(self): + return { + 'option': 1, 'aggregated_read_bandwidth_gbps': 0.0, + 'aggregated_write_bandwidth_gbps': 0.0, + 'aggregated_avg_throughput_tokens_per_sec': 0.0, + 'aggregated_storage_throughput_tokens_per_sec': 0.0, + 'aggregated_p95_latency_ms': 0.0, + 'rank_count': 1, 'trial_count': 1, + 'partial_failure': False, 'missing_files': [], 'cpu_tier_ranks': [], + } + + def test_closed_option1_wrapper_cmd_contains_8b_model(self, tmp_path): + bm = _make_run_benchmark(tmp_path) + bm.args.mode = 'closed' + # CLOSED requires trials=3 and inter_option_delay=20 (the fixture's + # values); overriding them would trigger CLOSED enforcement and + # short-circuit before any command is built. + cmd0 = self._capture_first_cmd(bm, self._fake_agg()) + assert '--model llama3.1-8b' in cmd0 + assert '--num-users 200' in cmd0 + assert '--max-concurrent-allocs 16' in cmd0 + + def test_open_user_model_appears_in_wrapper_cmd(self, tmp_path): + """Issue #498: user --model must reach the wrapper-built command.""" + bm = _make_run_benchmark(tmp_path) + bm.args.mode = 'open' + bm.args.model = 'llama3.1-70b-instruct' + bm.args.num_users = 33 + bm.args.trials = 1 + bm.args.inter_option_delay = 0 + cmd0 = self._capture_first_cmd(bm, self._fake_agg()) + assert '--model llama3.1-70b-instruct' in cmd0 + assert '--num-users 33' in cmd0 + + def test_wrapper_cmd_contains_config_path(self, tmp_path): + """mlpstorage now owns the config path; it must be passed to the wrapper.""" + bm = _make_run_benchmark(tmp_path) + bm.args.trials = 1 + bm.args.inter_option_delay = 0 + cmd0 = self._capture_first_cmd(bm, self._fake_agg()) + assert '--config' in cmd0 diff --git a/tests/unit/test_mlperf_wrapper.py b/tests/unit/test_mlperf_wrapper.py index 30fa622d..228e3d48 100755 --- a/tests/unit/test_mlperf_wrapper.py +++ b/tests/unit/test_mlperf_wrapper.py @@ -2,8 +2,9 @@ Tests cover: - get_rank(): env var reading, OMPI precedence over PMI, invalid value fallback -- BASE_SEED and WORKLOAD_PARAMS constants -- main(): rank dir creation, effective seed, output flag, option params, config path +- BASE_SEED constant +- main(): rank dir creation, effective seed, output flag, forwarded args + pass-through, per-rank overrides come last (argparse-style override) """ import importlib.util @@ -60,32 +61,17 @@ class TestBaseConstants: def test_base_seed_is_42(self, wrapper_module): assert wrapper_module.BASE_SEED == 42 - -class TestOptionParams: - """Tests for WORKLOAD_PARAMS fixed MLPerf values.""" - - def test_option_keys_are_1_2_3(self, wrapper_module): - assert set(wrapper_module.WORKLOAD_PARAMS.keys()) == {1, 2, 3} - - def test_option1_model_is_8b(self, wrapper_module): - assert wrapper_module.WORKLOAD_PARAMS[1]['model'] == 'llama3.1-8b' - - def test_option3_model_is_70b(self, wrapper_module): - assert wrapper_module.WORKLOAD_PARAMS[3]['model'] == 'llama3.1-70b-instruct' - - def test_generation_mode_always_none(self, wrapper_module): - for opt in [1, 2, 3]: - assert wrapper_module.WORKLOAD_PARAMS[opt]['generation-mode'] == 'none' - - def test_option2_cpu_mem_gb_is_4(self, wrapper_module): - assert wrapper_module.WORKLOAD_PARAMS[2]['cpu-mem-gb'] == 4 + def test_wrapper_does_not_define_workload_params(self, wrapper_module): + """WORKLOAD_PARAMS lives in mlpstorage_py.benchmarks.kvcache now; + the wrapper must remain a dumb rank-aware launcher.""" + assert not hasattr(wrapper_module, 'WORKLOAD_PARAMS') class TestMain: """Tests for main() — invokes subprocess and exits.""" def _run_main(self, wrapper_module, monkeypatch, tmp_path, extra_argv=None, - ompi_rank=None): + ompi_rank=None, start_delay=0, end_delay=0): """Helper: sets up env, patches argv+subprocess, calls main(), captures cmd.""" monkeypatch.delenv('OMPI_COMM_WORLD_RANK', raising=False) monkeypatch.delenv('PMI_RANK', raising=False) @@ -96,10 +82,11 @@ def _run_main(self, wrapper_module, monkeypatch, tmp_path, extra_argv=None, cache_dir = tmp_path / 'cache' argv = [ 'mlperf_wrapper.py', - '--option', '1', - '--seed', '42', - '--base-output-dir', str(output_dir), - '--cache-dir', str(cache_dir), + '--rank-output-base', str(output_dir), + '--rank-cache-base', str(cache_dir), + '--seed-base', '42', + '--start-delay', str(start_delay), + '--end-delay', str(end_delay), ] if extra_argv: argv.extend(extra_argv) @@ -130,7 +117,7 @@ def test_creates_rank_cache_dir(self, wrapper_module, monkeypatch, tmp_path): def test_effective_seed_is_base_plus_rank(self, wrapper_module, monkeypatch, tmp_path): cmd, _, _ = self._run_main( wrapper_module, monkeypatch, tmp_path, - extra_argv=['--seed', '10'], + extra_argv=['--seed-base', '10'], ompi_rank=2, ) seed_idx = cmd.index('--seed') @@ -143,19 +130,44 @@ def test_output_flag_points_to_rank_dir(self, wrapper_module, monkeypatch, tmp_p assert 'rank_0' in output_val assert 'kvcache_results_' in output_val - def test_workload_params_injected_for_option1(self, wrapper_module, monkeypatch, tmp_path): - cmd, _, _ = self._run_main(wrapper_module, monkeypatch, tmp_path) - assert '--model' in cmd - model_idx = cmd.index('--model') - assert cmd[model_idx + 1] == 'llama3.1-8b' + def test_cache_dir_points_to_rank_cache_dir(self, wrapper_module, monkeypatch, tmp_path): + cmd, _, cache_dir = self._run_main(wrapper_module, monkeypatch, tmp_path) + cache_idx = cmd.index('--cache-dir') + cache_val = cmd[cache_idx + 1] + assert str(cache_dir / 'rank_0') == cache_val - def test_config_default_is_adjacent_config_yaml(self, wrapper_module, monkeypatch, tmp_path): - cmd, _, _ = self._run_main(wrapper_module, monkeypatch, tmp_path) - assert any('config.yaml' in arg for arg in cmd) - - def test_config_override_accepted(self, wrapper_module, monkeypatch, tmp_path): + def test_forwarded_args_passed_through_verbatim(self, wrapper_module, monkeypatch, tmp_path): + """Wrapper forwards unrecognized args to kv-cache.py without modification.""" cmd, _, _ = self._run_main( wrapper_module, monkeypatch, tmp_path, - extra_argv=['--config', '/custom/config.yaml'], + extra_argv=['--model', 'llama3.1-8b', '--num-users', '200', + '--config', '/tmp/cfg.yaml'], ) - assert '/custom/config.yaml' in cmd + assert '--model' in cmd + assert cmd[cmd.index('--model') + 1] == 'llama3.1-8b' + assert '--num-users' in cmd + assert cmd[cmd.index('--num-users') + 1] == '200' + assert '--config' in cmd + assert cmd[cmd.index('--config') + 1] == '/tmp/cfg.yaml' + + def test_per_rank_overrides_come_last(self, wrapper_module, monkeypatch, tmp_path): + """Per-rank --seed/--output/--cache-dir are appended after forwarded + args so argparse store-action picks them over any duplicates.""" + cmd, _, _ = self._run_main( + wrapper_module, monkeypatch, tmp_path, + extra_argv=['--seed', '999', '--cache-dir', '/forwarded/cache'], + ) + # Last --seed wins; wrapper's effective seed (42 + rank 0 = 42) + # must appear AFTER the forwarded --seed. + seed_positions = [i for i, x in enumerate(cmd) if x == '--seed'] + assert len(seed_positions) == 2 + assert cmd[seed_positions[-1] + 1] == '42' + + cache_positions = [i for i, x in enumerate(cmd) if x == '--cache-dir'] + assert len(cache_positions) == 2 + assert '/forwarded/cache' not in cmd[cache_positions[-1] + 1] + + def test_no_config_default_injected(self, wrapper_module, monkeypatch, tmp_path): + """Wrapper no longer falls back to an adjacent config.yaml — caller owns the path.""" + cmd, _, _ = self._run_main(wrapper_module, monkeypatch, tmp_path) + assert '--config' not in cmd From 26c52f0f3f76f02aa20cf4b848af8ff0f1e1b772 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Wed, 24 Jun 2026 12:22:25 -0700 Subject: [PATCH 3/7] chore: bump version 3.0.16 -> 3.0.17; track DLIO main branch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DLIO source switches from a fixed rev to branch="main"; reproducibility is anchored by the resolved hash recorded in uv.lock. Re-resolve to the latest DLIO commit with `uv lock --upgrade-package dlio-benchmark` — no pyproject.toml edit needed. uv.lock now pins DLIO to commit 3667ed1 (origin/main HEAD), which carries DLIO PR #28 — the DLIO-side counterpart to the storage --drop-caches-timeout-seconds flag introduced earlier on this branch. --- pyproject.toml | 20 ++++++++++++-------- uv.lock | 8 ++++---- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d5bffc1c..e3ed54fc 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mlpstorage" -version = "3.0.16" +version = "3.0.17" description = "MLPerf Storage Benchmark Suite" readme = "README.md" license = {text = "Apache-2.0"} @@ -126,15 +126,19 @@ environments = ["sys_platform == 'linux'"] torch = [{ index = "pytorch-cpu" }] torchvision = [{ index = "pytorch-cpu" }] torchaudio = [{ index = "pytorch-cpu" }] -# Pinned to upstream main @ PR #27 merge — carries: -# - DLIO PR #21 fix for storage #391 part 1 (TorchIterableDatasetSimple gating) -# - DLIO PR #22 fix for storage #415 (mpi4py auto-init in spawn workers) -# - DLIO PR #23 fix for storage #391 part 2 (sudo -n + warn-once for drop_caches) -# - DLIO PR #24 fix for storage #448 (per-node worker-memory budget) -# - DLIO PR #25 fix for storage #455 (sampler floor division — equal per-rank shards) +# Tracks upstream main. Reproducibility is anchored by the resolved hash +# recorded in uv.lock; bump by re-running `uv lock --upgrade-package dlio-benchmark`. +# Current main carries (most recent first): +# - DLIO PR #28 fix for storage #487: expose DLIO_DROP_CACHES_TIMEOUT, +# distinguish slow flush from sudo refusal # - DLIO PR #27 fix for storage #466 #472: skip_listing, S3 object storage, # epoch-2+ AU, TFRecord via s3dlio, large-scale listing OOM prevention -dlio-benchmark = { git = "https://github.com/mlcommons/DLIO_local_changes.git", rev = "7098cccdb1fe35ab6487705f33f229e3c125dd44" } +# - DLIO PR #25 fix for storage #455 (sampler floor division — equal per-rank shards) +# - DLIO PR #24 fix for storage #448 (per-node worker-memory budget) +# - DLIO PR #23 fix for storage #391 part 2 (sudo -n + warn-once for drop_caches) +# - DLIO PR #22 fix for storage #415 (mpi4py auto-init in spawn workers) +# - DLIO PR #21 fix for storage #391 part 1 (TorchIterableDatasetSimple gating) +dlio-benchmark = { git = "https://github.com/mlcommons/DLIO_local_changes.git", branch = "main" } [dependency-groups] dev = [ diff --git a/uv.lock b/uv.lock index 984d982d..044f1e86 100644 --- a/uv.lock +++ b/uv.lock @@ -237,7 +237,7 @@ wheels = [ [[package]] name = "dlio-benchmark" version = "3.0.2" -source = { git = "https://github.com/mlcommons/DLIO_local_changes.git?rev=7098cccdb1fe35ab6487705f33f229e3c125dd44#7098cccdb1fe35ab6487705f33f229e3c125dd44" } +source = { git = "https://github.com/mlcommons/DLIO_local_changes.git?branch=main#3667ed11e7ad36ca2c5def65c6cf2ae0559ec9f6" } dependencies = [ { name = "dgen-py", marker = "sys_platform == 'linux'" }, { name = "h5py", marker = "sys_platform == 'linux'" }, @@ -518,7 +518,7 @@ wheels = [ [[package]] name = "mlpstorage" -version = "3.0.16" +version = "3.0.17" source = { editable = "." } dependencies = [ { name = "dlio-benchmark", marker = "sys_platform == 'linux'" }, @@ -583,8 +583,8 @@ dev = [ [package.metadata] requires-dist = [ - { name = "dlio-benchmark", git = "https://github.com/mlcommons/DLIO_local_changes.git?rev=7098cccdb1fe35ab6487705f33f229e3c125dd44" }, - { name = "dlio-benchmark", marker = "extra == 'full'", git = "https://github.com/mlcommons/DLIO_local_changes.git?rev=7098cccdb1fe35ab6487705f33f229e3c125dd44" }, + { name = "dlio-benchmark", git = "https://github.com/mlcommons/DLIO_local_changes.git?branch=main" }, + { name = "dlio-benchmark", marker = "extra == 'full'", git = "https://github.com/mlcommons/DLIO_local_changes.git?branch=main" }, { name = "elasticsearch", marker = "extra == 'vectordb'", specifier = ">=8.0" }, { name = "elasticsearch", marker = "extra == 'vectordb-elasticsearch'", specifier = ">=8.0" }, { name = "minio", specifier = ">=7.2.20" }, From 8b2e09fd9edd544760bccae41463ad94d4472eb2 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Wed, 24 Jun 2026 12:28:15 -0700 Subject: [PATCH 4/7] test(kvcache): close two e2e wrapper-cmd coverage gaps Both gaps were at the _execute_run end-to-end layer; the underlying _build_option_kvcache_args behavior was already tested at the unit level. 1. OPEN with no --model override: assert option 3's wrapper command carries llama3.1-70b-instruct (the WORKLOAD_PARAMS[3] fallback), and options 1/2 carry llama3.1-8b. Catches a regression where an OPEN run would silently lose the per-option workload distinction. 2. OPEN with user --config /custom/path.yaml: assert that exact path reaches the wrapper and the kv-cache_benchmark-adjacent default does not also leak in. Catches a regression where the user's YAML would be ignored or doubled. --- tests/unit/test_benchmarks_kvcache.py | 55 +++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/unit/test_benchmarks_kvcache.py b/tests/unit/test_benchmarks_kvcache.py index c4158033..f6aa480a 100755 --- a/tests/unit/test_benchmarks_kvcache.py +++ b/tests/unit/test_benchmarks_kvcache.py @@ -1286,3 +1286,58 @@ def test_wrapper_cmd_contains_config_path(self, tmp_path): bm.args.inter_option_delay = 0 cmd0 = self._capture_first_cmd(bm, self._fake_agg()) assert '--config' in cmd0 + + def _capture_all_cmds(self, bm, fake_agg_result): + """Capture every per-option/per-trial wrapper command built by _execute_run.""" + executed = [] + def fake_execute(cmd, **kwargs): + executed.append(cmd) + return ('', '', 0) + with patch.object(bm, '_execute_command', side_effect=fake_execute), \ + patch.object(bm, '_interruptible_sleep'), \ + patch.object(bm, '_aggregate_option_results', return_value=fake_agg_result), \ + patch.object(bm, '_write_run_summary'), \ + patch.object(bm, 'write_metadata'): + bm._execute_run() + return executed + + def test_open_option3_defaults_to_70b_when_user_did_not_override(self, tmp_path): + """OPEN with no --model override: option 3 must still receive the + MLPerf-mandated llama70b model from WORKLOAD_PARAMS[3]. Closes the + gap between _build_option_kvcache_args (covered) and the actual + wrapper command (was uncovered).""" + bm = _make_run_benchmark(tmp_path) + bm.args.mode = 'open' + bm.args.trials = 1 + bm.args.inter_option_delay = 0 + # Remove the fixture's model/num-users defaults so the fallback path + # in _build_option_kvcache_args is exercised — this mirrors the case + # where the OPEN user invoked the run without those flags. + for attr in ('model', 'num_users'): + if hasattr(bm.args, attr): + delattr(bm.args, attr) + cmds = self._capture_all_cmds(bm, self._fake_agg()) + assert len(cmds) == 3, f"Expected 3 commands (one per option), got {len(cmds)}" + # cmds[0] is option 1, cmds[1] is option 2, cmds[2] is option 3 + assert '--model llama3.1-8b' in cmds[0] + assert '--num-users 200' in cmds[0] + assert '--model llama3.1-8b' in cmds[1] + assert '--num-users 100' in cmds[1] + assert '--model llama3.1-70b-instruct' in cmds[2] + assert '--num-users 70' in cmds[2] + + def test_open_user_config_path_supersedes_wrapper_default(self, tmp_path): + """OPEN: when the user provides --config /custom/path.yaml, mlpstorage + forwards that exact path to the wrapper instead of the kv-cache.py + adjacent default. (CLOSED rejects --config — covered separately.)""" + bm = _make_run_benchmark(tmp_path) + bm.args.mode = 'open' + bm.args.config = '/custom/path/user.yaml' + bm.args.trials = 1 + bm.args.inter_option_delay = 0 + cmd0 = self._capture_first_cmd(bm, self._fake_agg()) + assert '--config /custom/path/user.yaml' in cmd0, \ + f"User config path missing from cmd: {cmd0}" + # And the wrapper-adjacent default must NOT also be present. + assert 'kv_cache_benchmark/config.yaml' not in cmd0, \ + f"Default config path leaked into cmd alongside user override: {cmd0}" From 40c32d23551dcc8f916cf88b16b115397b82b7d7 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Wed, 24 Jun 2026 12:31:52 -0700 Subject: [PATCH 5/7] test(version): assert dist name matches pyproject without requiring install MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_version_lookup_uses_correct_distribution_name conflated two failure modes: (1) the in-code distribution name drifting from pyproject's project.name (a real regression), and (2) the package not being pip-installed in the test environment (an env issue, not a bug). The second mode made the test fail on fresh checkouts before `pip install -e .`. Extract the distribution string in mlpstorage_py/__init__.py to a module-level _DIST_NAME constant, and rewrite the test to assert that constant equals project.name from pyproject.toml. The original regression class — wrong dist name in code — is still caught, and the test no longer depends on whether the package metadata is installed. --- mlpstorage_py/__init__.py | 8 ++++++-- tests/unit/test_version.py | 16 +++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/mlpstorage_py/__init__.py b/mlpstorage_py/__init__.py index f2cf0746..c772a6b4 100755 --- a/mlpstorage_py/__init__.py +++ b/mlpstorage_py/__init__.py @@ -2,11 +2,15 @@ import pathlib import tomllib # stdlib since Python 3.11; project requires >=3.12 +# Distribution name. Must match `project.name` in pyproject.toml — tested in +# tests/unit/test_version.py::test_dist_name_matches_pyproject. +_DIST_NAME = "mlpstorage" + def _resolve_version() -> str: - # Primary: installed distribution metadata (correct dist name is "mlpstorage") + # Primary: installed distribution metadata try: - return _pkg_version("mlpstorage") + return _pkg_version(_DIST_NAME) except _PkgNF: pass # Fallback: parse pyproject.toml for source-checkout usage diff --git a/tests/unit/test_version.py b/tests/unit/test_version.py index 5302efba..0f471b3a 100644 --- a/tests/unit/test_version.py +++ b/tests/unit/test_version.py @@ -14,11 +14,17 @@ def test_version_matches_pyproject(): assert mlpstorage_py.VERSION == declared -def test_version_lookup_uses_correct_distribution_name(): - """importlib.metadata lookup must succeed under the 'mlpstorage' dist name.""" - # Will raise PackageNotFoundError (not caught) if wrong name is used - pkg_version = importlib.metadata.version("mlpstorage") - assert pkg_version == mlpstorage_py.VERSION +def test_dist_name_matches_pyproject(): + """The dist name used by _resolve_version must equal pyproject's + project.name. Catches the regression where the in-code lookup string + drifts from the declared package name — independent of whether the + package is currently pip-installed in the test environment.""" + from mlpstorage_py import _DIST_NAME + + pyproject = pathlib.Path(__file__).parent.parent.parent / "pyproject.toml" + with open(pyproject, "rb") as f: + declared_name = tomllib.load(f)["project"]["name"] + assert _DIST_NAME == declared_name def test_version_fallback_reads_pyproject(monkeypatch): From 44a0110e8c0e1ef0a65f3081288f1556134f3756 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Wed, 24 Jun 2026 12:35:23 -0700 Subject: [PATCH 6/7] =?UTF-8?q?fix(test):=20integration=20test=20follow-up?= =?UTF-8?q?=20=E2=80=94=20rename=20--base-output-dir=20to=20--rank-output-?= =?UTF-8?q?base?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_run_all_options_mock_mpi_writes_summary builds synthetic rank result files by regex-extracting the trial directory from the wrapper command. The kvcache wrapper consolidation renamed --base-output-dir to --rank-output-base; the regex needed the same rename to keep matching, otherwise no rank file gets written and aggregation collapses to 0.0 instead of the expected 1.5 GB/s. --- tests/integration/test_benchmark_flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_benchmark_flow.py b/tests/integration/test_benchmark_flow.py index 00c6b61c..1b99e435 100755 --- a/tests/integration/test_benchmark_flow.py +++ b/tests/integration/test_benchmark_flow.py @@ -623,7 +623,7 @@ def test_run_all_options_mock_mpi_writes_summary(self, tmp_path): bm = self._make_bm(tmp_path, what_if=False) def fake_execute(cmd, **kwargs): - m = re.search(r'--base-output-dir\s+(\S+)', cmd) + m = re.search(r'--rank-output-base\s+(\S+)', cmd) if m: trial_dir = m.group(1) rank_dir = Path(trial_dir) / 'rank_0' From 5c3c4341c3ae0ab13fe41966ad2072addc3a9119 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Wed, 24 Jun 2026 12:37:46 -0700 Subject: [PATCH 7/7] docs(test): refresh stale wrapper-flag docstrings in test_benchmarks_kvcache Two docstrings still listed the old wrapper API (--option, --seed, --base-output-dir, --cache-dir) even though the assertions inside the tests already use the new flag names. Brings the descriptions in line with what the code actually checks. The wrapper script's own argparse parser and help strings were updated when the API was reshaped; the user-facing mlpstorage CLI in kvcache_args.py wasn't affected because the wrapper flags are internal. --- tests/unit/test_benchmarks_kvcache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_benchmarks_kvcache.py b/tests/unit/test_benchmarks_kvcache.py index f6aa480a..0a4afee5 100755 --- a/tests/unit/test_benchmarks_kvcache.py +++ b/tests/unit/test_benchmarks_kvcache.py @@ -727,7 +727,7 @@ class TestExecuteRun: - _execute_command called 3 times per run (once per option) with trials=1 (DIST-02, DIST-04) - mpirun command contains '--mca orte_abort_on_non_zero_status 0' (DIST-08) - mpirun command contains '--npernode N' (DIST-03) - - wrapper receives --option, --seed, --base-output-dir, --cache-dir (DIST-07) + - wrapper receives --seed-base, --rank-output-base, --rank-cache-base (DIST-07) - per-option/trial dirs created with correct naming (option_{N}/trial_{T}/) - _interruptible_sleep called 2 times (after options 1 and 2; not after 3) (DIST-05) - _aggregate_option_results called 3 times when what_if=False @@ -879,7 +879,7 @@ def test_per_option_trial_dirs_created(self, bm, fake_agg_result, tmp_path): assert option1_trial0.exists(), f"Expected {option1_trial0} to exist" def test_option_trial_dirs_in_command_path(self, bm, fake_agg_result): - """Command must reference option_N/trial_T subdirectory in --base-output-dir.""" + """Command must reference option_N/trial_T subdirectory in --rank-output-base.""" bm.args.trials = 1 bm.args.inter_option_delay = 0 executed_cmds = []