Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 46 additions & 83 deletions kv_cache_benchmark/mlperf_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/usr/bin/env python3
"""MPI-rank-aware wrapper for MLPerf KV Cache benchmark.
"""MPI-rank-aware launcher for the MLPerf KV Cache benchmark.

Invoked by mpirun per-rank; reads OMPI_COMM_WORLD_RANK (OpenMPI) or PMI_RANK
(MPICH) to determine this rank's index. Computes per-rank seed, output
directory, and cache directory, then invokes kv-cache.py with the fixed MLPerf
v3.0 parameters for the specified option.
(MPICH) to determine this rank's index. The wrapper does NOT encode any
workload parameters — it only computes per-rank values (seed, output file,
cache directory) and forwards everything else to kv-cache.py. The caller
(mlpstorage_py.benchmarks.kvcache.KVCacheBenchmark) owns the per-option
workload parameter set and CLOSED/OPEN enforcement.
"""

import argparse
Expand All @@ -18,40 +20,6 @@
BASE_SEED = 42
TEST_DELAY = 90

# MLPerf v3.0 fixed parameters per option.
# All numeric values stored as int/float; converted to str when building cmd.
# 'generation-mode' is ALWAYS 'none' for MLPerf compliance — do NOT rely on
# kv-cache.py defaults; the default may change in future versions.
WORKLOAD_PARAMS = {
1: {
'model': 'llama3.1-8b',
'num-users': 200,
'duration': 300,
'gpu-mem-gb': 0,
'cpu-mem-gb': 0,
'max-concurrent-allocs': 16,
'generation-mode': 'none',
},
2: {
'model': 'llama3.1-8b',
'num-users': 100,
'duration': 300,
'gpu-mem-gb': 0,
'cpu-mem-gb': 4,
'max-concurrent-allocs': 16,
'generation-mode': 'none',
},
3: {
'model': 'llama3.1-70b-instruct',
'num-users': 70,
'duration': 300,
'gpu-mem-gb': 0,
'cpu-mem-gb': 0,
'max-concurrent-allocs': 4,
'generation-mode': 'none',
},
}


def get_rank() -> int:
"""Read global MPI rank from environment (no mpi4py).
Expand Down Expand Up @@ -79,89 +47,84 @@ def get_rank() -> int:


def main():
# allow_abbrev=False so forwarded kv-cache.py flags like --seed are not
# silently swallowed by a prefix match against --seed-base.
parser = argparse.ArgumentParser(
description="MLPerf KV Cache MPI-rank-aware wrapper"
description="MLPerf KV Cache MPI-rank-aware launcher",
allow_abbrev=False,
)
parser.add_argument(
'--option',
type=int,
choices=[1, 2, 3],
'--rank-output-base',
type=str,
required=True,
help="MLPerf v3.0 option (1=Max Storage Stress, 2=Storage Throughput, 3=Large Model 70B).",
dest='rank_output_base',
help="Base output directory. Per-rank results written to <rank-output-base>/rank_<N>/.",
)
parser.add_argument(
'--seed',
'--rank-cache-base',
type=str,
required=True,
dest='rank_cache_base',
help="Base cache directory. Per-rank cache written to <rank-cache-base>/rank_<N>/.",
)
parser.add_argument(
'--seed-base',
type=int,
default=BASE_SEED,
dest='seed_base',
help=f"Base random seed (default: {BASE_SEED}). Effective seed = base + rank.",
)
parser.add_argument(
'--base-output-dir',
type=str,
required=True,
dest='base_output_dir',
help="Base output directory. Per-rank results written to <base_output_dir>/rank_<N>/.",
)
parser.add_argument(
'--cache-dir',
type=str,
required=True,
dest='cache_dir',
help="Base cache directory. Per-rank cache written to <cache_dir>/rank_<N>/.",
'--start-delay',
type=int,
default=TEST_DELAY,
dest='start_delay',
help=f"Seconds to sleep before invoking kv-cache.py (default: {TEST_DELAY}).",
)
parser.add_argument(
'--config',
type=str,
default=None,
help="Path to YAML config file. Defaults to config.yaml adjacent to this script (D-02).",
'--end-delay',
type=int,
default=TEST_DELAY,
dest='end_delay',
help=f"Seconds to sleep after kv-cache.py exits (default: {TEST_DELAY}).",
)

args = parser.parse_args()
# Wrapper-specific flags are consumed; everything else is forwarded to
# kv-cache.py verbatim (including --config, --model, --num-users, etc.).
args, forwarded = parser.parse_known_args()

rank = get_rank()
effective_seed = args.seed + rank
effective_seed = args.seed_base + rank

rank_output_dir = Path(args.base_output_dir) / f"rank_{rank}"
rank_cache_dir = Path(args.cache_dir) / f"rank_{rank}"
rank_output_dir = Path(args.rank_output_base) / f"rank_{rank}"
rank_cache_dir = Path(args.rank_cache_base) / f"rank_{rank}"

rank_output_dir.mkdir(parents=True, exist_ok=True)
rank_cache_dir.mkdir(parents=True, exist_ok=True)

ts = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = rank_output_dir / f"kvcache_results_{ts}.json"

# D-01: kv-cache.py located relative to this script; both share kv_cache_benchmark/
kvcache_script = Path(__file__).parent / 'kv-cache.py'
# D-02: config.yaml located relative to this script; user may override via --config
config_path = args.config or str(Path(__file__).parent / 'config.yaml')

params = WORKLOAD_PARAMS[args.option]

# Per-rank --seed/--output/--cache-dir are appended last so that argparse's
# store action (last occurrence wins) prefers them over any duplicates
# present in the forwarded args.
cmd = [
sys.executable,
str(kvcache_script),
'--config', config_path,
*forwarded,
'--seed', str(effective_seed),
'--output', str(output_file),
'--cache-dir', str(rank_cache_dir),
'--model', params['model'],
'--num-users', str(params['num-users']),
'--duration', str(params['duration']),
'--gpu-mem-gb', str(params['gpu-mem-gb']),
'--cpu-mem-gb', str(params['cpu-mem-gb']),
'--max-concurrent-allocs', str(params['max-concurrent-allocs']),
# '--generation-mode' is passed EXPLICITLY — do NOT omit even though the
# value is always 'none'. kv-cache.py defaults may change in future versions.
'--generation-mode', params['generation-mode'],
]

print(f"KV Cache Wrapper - Start delay for {TEST_DELAY} seconds")
time.sleep(TEST_DELAY)
print(f"KV Cache Wrapper - Start delay for {args.start_delay} seconds")
time.sleep(args.start_delay)
print(f"KV Cache Wrapper - Starting benchmark pass...")

result = subprocess.run(cmd)
print(f"KV Cache Wrapper - End delay for {TEST_DELAY} seconds")
time.sleep(TEST_DELAY)
print(f"KV Cache Wrapper - End delay for {args.end_delay} seconds")
time.sleep(args.end_delay)
print(f"KV Cache Wrapper - Finished benchmark pass")

sys.exit(result.returncode)
Expand Down
95 changes: 89 additions & 6 deletions mlpstorage_py/benchmarks/kvcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,40 @@
from mlpstorage_py.utils import generate_mpi_prefix_cmd, MLPSJsonEncoder


# Fixed MLPerf v3.0 workload parameters, keyed by option number. This table is
# the single definition of what each option means in terms of kv-cache.py
# flags: CLOSED runs use the values as-is, OPEN runs treat them as per-option
# defaults that user-supplied CLI flags may override.
#
# Only model, num-users, cpu-mem-gb and max-concurrent-allocs vary between
# options; duration, gpu-mem-gb and generation-mode are the same for all three.
# Key order is kept stable because it determines the order of emitted flags.
WORKLOAD_PARAMS = {
    option: {
        'model': model,
        'num-users': num_users,
        'duration': 300,
        'gpu-mem-gb': 0,
        'cpu-mem-gb': cpu_mem_gb,
        'max-concurrent-allocs': max_concurrent_allocs,
        'generation-mode': 'none',
    }
    for option, model, num_users, cpu_mem_gb, max_concurrent_allocs in (
        (1, 'llama3.1-8b', 200, 0, 16),
        (2, 'llama3.1-8b', 100, 4, 16),
        (3, 'llama3.1-70b-instruct', 70, 0, 4),
    )
}


class KVCacheBenchmark(Benchmark):
"""KV Cache benchmark for LLM inference storage.

Expand Down Expand Up @@ -234,6 +268,8 @@ def _execute_run(self) -> int:
)

wrapper_path = Path(self.kvcache_bin_path).parent / 'mlperf_wrapper.py'
# Wrapper-adjacent config.yaml is the default; CLOSED forbids overriding.
config_path = config or str(Path(self.kvcache_bin_path).parent / 'config.yaml')

mpi_prefix = generate_mpi_prefix_cmd(
mpi_cmd=getattr(self.args, 'mpi_bin', 'mpirun'),
Expand All @@ -248,6 +284,7 @@ def _execute_run(self) -> int:

option_results = {}
for option in [1, 2, 3]:
option_kv_args = self._build_option_kvcache_args(option, is_closed)
trial_dirs = []

for trial in range(trials):
Expand All @@ -258,13 +295,12 @@ def _execute_run(self) -> int:

wrapper_cmd = (
f"{mpi_prefix} {sys.executable} {wrapper_path}"
f" --option {option}"
f" --seed {seed}"
f" --base-output-dir {option_trial_dir}"
f" --cache-dir {cache_dir}"
f" --rank-output-base {option_trial_dir}"
f" --rank-cache-base {cache_dir}"
f" --seed-base {seed}"
f" --config {config_path}"
f" {' '.join(option_kv_args)}"
)
if config:
wrapper_cmd += f" --config {config}"

self.logger.status(f"Running option {option} trial {trial + 1}/{trials}...")
self._execute_command(
Expand Down Expand Up @@ -292,6 +328,53 @@ def _execute_run(self) -> int:
self.write_cluster_info()
return 0

def _build_option_kvcache_args(self, option: int, is_closed: bool) -> List[str]:
    """Build the kv-cache.py command-line arguments for one MLPerf option.

    CLOSED: a copy of WORKLOAD_PARAMS[option] is emitted unchanged; nothing
    on self.args is consulted.

    OPEN: each parameter is taken from self.args when the user set it and
    from WORKLOAD_PARAMS[option] otherwise. max-concurrent-allocs is never
    read from self.args and always comes from WORKLOAD_PARAMS.

    Args:
        option: Key into WORKLOAD_PARAMS selecting the parameter set.
        is_closed: True for a CLOSED run, False for an OPEN run.

    Returns:
        A flat list of alternating ``--<key>`` / ``str(value)`` tokens, in
        the parameter set's key order.

    Raises:
        KeyError: If ``option`` is not a key of WORKLOAD_PARAMS.
    """
    option_defaults = WORKLOAD_PARAMS[option]

    if is_closed:
        resolved = dict(option_defaults)
    else:
        cli_args = self.args

        def unless_none(attr, key):
            # Numeric flags: only None means "not supplied", so an explicit
            # 0 from the user is kept rather than replaced by the default.
            user_value = getattr(cli_args, attr, None)
            return option_defaults[key] if user_value is None else user_value

        def unless_falsy(attr, key):
            # String flags: an empty or missing value falls back to the default.
            return getattr(cli_args, attr, None) or option_defaults[key]

        resolved = {
            'model': unless_falsy('model', 'model'),
            'num-users': unless_none('num_users', 'num-users'),
            'duration': unless_none('duration', 'duration'),
            'gpu-mem-gb': unless_none('gpu_mem_gb', 'gpu-mem-gb'),
            'cpu-mem-gb': unless_none('cpu_mem_gb', 'cpu-mem-gb'),
            'max-concurrent-allocs': option_defaults['max-concurrent-allocs'],
            'generation-mode': unless_falsy('generation_mode', 'generation-mode'),
        }

    # Flatten {key: value} into ['--key', 'value', ...], preserving dict order.
    return [
        token
        for key, value in resolved.items()
        for token in (f'--{key}', str(value))
    ]

def _execute_datasize(self) -> int:
"""Calculate memory requirements for KV cache.

Expand Down
Loading
Loading