Skip to content
129 changes: 46 additions & 83 deletions kv_cache_benchmark/mlperf_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/usr/bin/env python3
"""MPI-rank-aware launcher for the MLPerf KV Cache benchmark.

Invoked by mpirun per-rank; reads OMPI_COMM_WORLD_RANK (OpenMPI) or PMI_RANK
(MPICH) to determine this rank's index. The wrapper does NOT encode any
workload parameters — it only computes per-rank values (seed, output file,
cache directory) and forwards everything else to kv-cache.py. The caller
(mlpstorage_py.benchmarks.kvcache.KVCacheBenchmark) owns the per-option
workload parameter set and CLOSED/OPEN enforcement.
"""

import argparse
Expand All @@ -18,40 +20,6 @@
BASE_SEED = 42
TEST_DELAY = 90

# Fixed MLPerf v3.0 workload parameters, keyed by option number.
# Values are kept as int/float here and are stringified only when the
# kv-cache.py command line is assembled.
# 'generation-mode' is spelled out as 'none' for every option on purpose:
# kv-cache.py's default may change in a later version, so it must not be
# relied upon for MLPerf compliance.
_OPTION_1_PARAMS = {
    'model': 'llama3.1-8b',
    'num-users': 200,
    'duration': 300,
    'gpu-mem-gb': 0,
    'cpu-mem-gb': 0,
    'max-concurrent-allocs': 16,
    'generation-mode': 'none',
}

# Options 2 and 3 are written as overrides of option 1. Overriding a key that
# already exists keeps its position, so every option lists its keys in the
# same order as option 1.
WORKLOAD_PARAMS = {
    1: dict(_OPTION_1_PARAMS),
    2: {**_OPTION_1_PARAMS, 'num-users': 100, 'cpu-mem-gb': 4},
    3: {
        **_OPTION_1_PARAMS,
        'model': 'llama3.1-70b-instruct',
        'num-users': 70,
        'max-concurrent-allocs': 4,
    },
}


def get_rank() -> int:
"""Read global MPI rank from environment (no mpi4py).
Expand Down Expand Up @@ -79,89 +47,84 @@ def get_rank() -> int:


def main():
    """Launch kv-cache.py for this MPI rank and exit with its return code.

    Only the wrapper-specific flags (--rank-output-base, --rank-cache-base,
    --seed-base, --start-delay, --end-delay) are parsed here. Every other
    command-line argument is forwarded to kv-cache.py unchanged, followed by
    the per-rank --seed, --output and --cache-dir values derived from the
    rank index returned by get_rank().
    """
    # allow_abbrev=False so forwarded kv-cache.py flags like --seed are not
    # silently swallowed by a prefix match against --seed-base.
    parser = argparse.ArgumentParser(
        description="MLPerf KV Cache MPI-rank-aware launcher",
        allow_abbrev=False,
    )
    parser.add_argument(
        '--rank-output-base',
        type=str,
        required=True,
        dest='rank_output_base',
        help="Base output directory. Per-rank results written to <rank-output-base>/rank_<N>/.",
    )
    parser.add_argument(
        '--rank-cache-base',
        type=str,
        required=True,
        dest='rank_cache_base',
        help="Base cache directory. Per-rank cache written to <rank-cache-base>/rank_<N>/.",
    )
    parser.add_argument(
        '--seed-base',
        type=int,
        default=BASE_SEED,
        dest='seed_base',
        help=f"Base random seed (default: {BASE_SEED}). Effective seed = base + rank.",
    )
    parser.add_argument(
        '--start-delay',
        type=int,
        default=TEST_DELAY,
        dest='start_delay',
        help=f"Seconds to sleep before invoking kv-cache.py (default: {TEST_DELAY}).",
    )
    parser.add_argument(
        '--end-delay',
        type=int,
        default=TEST_DELAY,
        dest='end_delay',
        help=f"Seconds to sleep after kv-cache.py exits (default: {TEST_DELAY}).",
    )

    # Wrapper-specific flags are consumed; everything else is forwarded to
    # kv-cache.py verbatim (including --config, --model, --num-users, etc.).
    args, forwarded = parser.parse_known_args()

    rank = get_rank()
    effective_seed = args.seed_base + rank

    rank_output_dir = Path(args.rank_output_base) / f"rank_{rank}"
    rank_cache_dir = Path(args.rank_cache_base) / f"rank_{rank}"

    rank_output_dir.mkdir(parents=True, exist_ok=True)
    rank_cache_dir.mkdir(parents=True, exist_ok=True)

    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_file = rank_output_dir / f"kvcache_results_{ts}.json"

    # D-01: kv-cache.py located relative to this script; both share kv_cache_benchmark/
    kvcache_script = Path(__file__).parent / 'kv-cache.py'

    # Per-rank seed/output/cache are appended last so argparse store-action
    # uses them over any duplicates that came in via the forwarded args.
    cmd = [
        sys.executable,
        str(kvcache_script),
        *forwarded,
        '--seed', str(effective_seed),
        '--output', str(output_file),
        '--cache-dir', str(rank_cache_dir),
    ]

    print(f"KV Cache Wrapper - Start delay for {args.start_delay} seconds")
    time.sleep(args.start_delay)
    print("KV Cache Wrapper - Starting benchmark pass...")

    # Argument list (no shell), so forwarded values are passed through as-is.
    result = subprocess.run(cmd)

    print(f"KV Cache Wrapper - End delay for {args.end_delay} seconds")
    time.sleep(args.end_delay)
    print("KV Cache Wrapper - Finished benchmark pass")

    # Propagate kv-cache.py's exit status to the MPI launcher.
    sys.exit(result.returncode)
Expand Down
12 changes: 12 additions & 0 deletions mlpstorage_py/benchmarks/dlio.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ def generate_dlio_command(self):
self.args.oversubscribe, self.args.allow_run_as_root,
self.args.mpi_params, self.logger,
mpi_btl=getattr(self.args, 'mpi_btl', 'auto'))
# Forward DLIO_DROP_CACHES_TIMEOUT to ranks so multi-host runs honor
# the operator's CLI choice (mlcommons/storage #487). OpenMPI does
# not forward arbitrary env vars by default; -x VAR opts VAR in.
if 'DLIO_DROP_CACHES_TIMEOUT' in os.environ:
mpi_prefix += " -x DLIO_DROP_CACHES_TIMEOUT"
cmd = f"{mpi_prefix} {cmd}"

return cmd
Expand All @@ -370,6 +375,13 @@ class TrainingBenchmark(DLIOBenchmark):
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)

# Plumb --drop-caches-timeout-seconds into DLIO via env var
# (mlcommons/storage #487). Only the `run` subcommand registers the
# flag, so this is a no-op for datasize/datagen/configview.
timeout = getattr(args, 'drop_caches_timeout_seconds', None)
if timeout is not None:
os.environ['DLIO_DROP_CACHES_TIMEOUT'] = str(timeout)

# This allows each command to map to a specific wrapper method. When methods are created, replace the default
# 'self.execute_command' with the command-specific method (like "self._datasize()")
self.command_method_map = dict(
Expand Down
95 changes: 89 additions & 6 deletions mlpstorage_py/benchmarks/kvcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,40 @@
from mlpstorage_py.utils import generate_mpi_prefix_cmd, MLPSJsonEncoder


# MLPerf v3.0 fixed per-option workload parameters: the single source of truth
# for what option N means at the kv-cache.py level. CLOSED runs use these
# values as mandated; OPEN runs treat them as per-option defaults that
# user-supplied CLI flags supersede.
def _option_params(*, model, num_users, cpu_mem_gb, max_concurrent_allocs):
    """Return one option's kv-cache.py parameters in the canonical key order.

    duration, gpu-mem-gb and generation-mode are identical for every option,
    so they are fixed here instead of being repeated per entry.
    """
    return {
        'model': model,
        'num-users': num_users,
        'duration': 300,
        'gpu-mem-gb': 0,
        'cpu-mem-gb': cpu_mem_gb,
        'max-concurrent-allocs': max_concurrent_allocs,
        'generation-mode': 'none',
    }


WORKLOAD_PARAMS = {
    1: _option_params(
        model='llama3.1-8b',
        num_users=200,
        cpu_mem_gb=0,
        max_concurrent_allocs=16,
    ),
    2: _option_params(
        model='llama3.1-8b',
        num_users=100,
        cpu_mem_gb=4,
        max_concurrent_allocs=16,
    ),
    3: _option_params(
        model='llama3.1-70b-instruct',
        num_users=70,
        cpu_mem_gb=0,
        max_concurrent_allocs=4,
    ),
}


class KVCacheBenchmark(Benchmark):
"""KV Cache benchmark for LLM inference storage.

Expand Down Expand Up @@ -234,6 +268,8 @@ def _execute_run(self) -> int:
)

wrapper_path = Path(self.kvcache_bin_path).parent / 'mlperf_wrapper.py'
# Wrapper-adjacent config.yaml is the default; CLOSED forbids overriding.
config_path = config or str(Path(self.kvcache_bin_path).parent / 'config.yaml')

mpi_prefix = generate_mpi_prefix_cmd(
mpi_cmd=getattr(self.args, 'mpi_bin', 'mpirun'),
Expand All @@ -248,6 +284,7 @@ def _execute_run(self) -> int:

option_results = {}
for option in [1, 2, 3]:
option_kv_args = self._build_option_kvcache_args(option, is_closed)
trial_dirs = []

for trial in range(trials):
Expand All @@ -258,13 +295,12 @@ def _execute_run(self) -> int:

wrapper_cmd = (
f"{mpi_prefix} {sys.executable} {wrapper_path}"
f" --option {option}"
f" --seed {seed}"
f" --base-output-dir {option_trial_dir}"
f" --cache-dir {cache_dir}"
f" --rank-output-base {option_trial_dir}"
f" --rank-cache-base {cache_dir}"
f" --seed-base {seed}"
f" --config {config_path}"
f" {' '.join(option_kv_args)}"
)
if config:
wrapper_cmd += f" --config {config}"

self.logger.status(f"Running option {option} trial {trial + 1}/{trials}...")
self._execute_command(
Expand Down Expand Up @@ -292,6 +328,53 @@ def _execute_run(self) -> int:
self.write_cluster_info()
return 0

def _build_option_kvcache_args(self, option: int, is_closed: bool) -> List[str]:
    """Build the kv-cache.py command-line arguments for one option.

    CLOSED: WORKLOAD_PARAMS[option] is emitted verbatim.

    OPEN: each value set on self.args replaces the matching
    WORKLOAD_PARAMS[option] entry, one key at a time. max-concurrent-allocs
    is never read from self.args and always comes from WORKLOAD_PARAMS.

    Returns a flat list of alternating '--<key>' and str(value) items, in
    the key order of the selected parameter set.
    """
    mandated = WORKLOAD_PARAMS[option]

    if is_closed:
        chosen = dict(mandated)
    else:
        def unless_none(attr, key):
            # Numeric flags: 0 is a legitimate user value, so only a
            # missing/None attribute falls back to the option default.
            supplied = getattr(self.args, attr, None)
            return mandated[key] if supplied is None else supplied

        def unless_falsy(attr, key):
            # String flags: None or an empty string falls back to the
            # option default.
            return getattr(self.args, attr, None) or mandated[key]

        chosen = {
            'model': unless_falsy('model', 'model'),
            'num-users': unless_none('num_users', 'num-users'),
            'duration': unless_none('duration', 'duration'),
            'gpu-mem-gb': unless_none('gpu_mem_gb', 'gpu-mem-gb'),
            'cpu-mem-gb': unless_none('cpu_mem_gb', 'cpu-mem-gb'),
            'max-concurrent-allocs': mandated['max-concurrent-allocs'],
            'generation-mode': unless_falsy('generation_mode', 'generation-mode'),
        }

    cli_args: List[str] = []
    for flag, setting in chosen.items():
        cli_args += [f'--{flag}', str(setting)]
    return cli_args

def _execute_datasize(self) -> int:
"""Calculate memory requirements for KV cache.

Expand Down
Loading
Loading