Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
119 changes: 74 additions & 45 deletions src/endpoints_submission_cli/submissions/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,23 @@
into the SubmissionChecker-compatible layout:

<org>/
systems/<system_id>.json
pareto/<system_id>/<model>/points/point_<concurrency>.yaml
pareto/<system_id>/<model>/results/point_<concurrency>/
mlperf_endpoints_log_summary.json
mlperf_endpoints_log_detail.json
pareto/<system_id>/<model>/accuracy/
accuracy_result.json
accuracy.txt
<system_id>/
system_desc.json
<model>/
sweep_summary.csv
sweep_distributions.csv
r<concurrency>/
point.yaml
mlperf_endpoints_log_summary.json
mlperf_endpoints_log_detail.json
run_metadata.json
report.txt
accuracy/
accuracy_result.json
accuracy.txt
src/
<implementation>/
(endpoint interface code)
"""

from __future__ import annotations
Expand Down Expand Up @@ -88,16 +97,14 @@ def build_submission_folder(
for (system_id, model), runs in groups.items():
all_system_runs = runs_by_system[system_id]
if system_id not in written_systems:
max_concurrency = max(_extract_concurrency(r["config"]) for r in all_system_runs)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect. The max concurrency is itself a field within the submission structure. I failed to notice that before, but I'll make that change when we get clarification.

_write_system_description(
submission_dir, system_id, model, all_system_runs, division
submission_dir, system_id, model, all_system_runs, division, max_concurrency
)
written_systems.add(system_id)
max_concurrency = max(_extract_concurrency(r["config"]) for r in all_system_runs)
_write_pareto_entries(submission_dir, system_id, model, runs, max_concurrency)
_write_accuracy_placeholders(submission_dir, system_id, model)

if _normalize_division(division) == "Standardized":
(submission_dir / "src").mkdir(exist_ok=True)
_write_run_entries(submission_dir, system_id, model, runs, max_concurrency)
_write_model_sweep_stubs(submission_dir, system_id, model)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function definitely shouldn't exist. We'll need to confirm what the csv files are, and generate them on the fly, probably.


return submission_dir

Expand Down Expand Up @@ -224,13 +231,10 @@ def _write_system_description(
model: str,
runs: list[dict[str, Any]],
division: str,
max_concurrency: int,
) -> None:
systems_dir = submission_dir / "systems"
systems_dir.mkdir(parents=True, exist_ok=True)

# Derive max_supported_concurrency from the highest concurrency across all runs
concurrencies = [_extract_concurrency(r["config"]) for r in runs]
max_concurrency = max(concurrencies) if concurrencies else 64
system_dir = submission_dir / system_id
system_dir.mkdir(parents=True, exist_ok=True)

si = runs[0]["system_info"]
cfg = runs[0]["config"]
Expand Down Expand Up @@ -281,12 +285,12 @@ def _write_system_description(
):
system_desc["host_processor_core_count"] = 1

(systems_dir / f"{system_id}.json").write_text(
(system_dir / "system_desc.json").write_text(
json.dumps(system_desc, indent=2), encoding="utf-8"
)


def _write_pareto_entries(
def _write_run_entries(
submission_dir: Path,
system_id: str,
model: str,
Expand All @@ -299,13 +303,10 @@ def _write_pareto_entries(

for run in runs:
concurrency = _extract_concurrency(run["config"])
model_dir = submission_dir / "pareto" / system_id / model
points_dir = model_dir / "points"
result_dir = model_dir / "results" / f"point_{concurrency}"
points_dir.mkdir(parents=True, exist_ok=True)
result_dir.mkdir(parents=True, exist_ok=True)
run_dir = submission_dir / system_id / model / f"r{concurrency}"
run_dir.mkdir(parents=True, exist_ok=True)

# Build point YAML from config.yaml + runtime_settings.json
# Build point.yaml from config.yaml + runtime_settings.json
cfg_settings = run["config"].get("settings", {}) or {}
load_pattern = cfg_settings.get("load_pattern", {}) or {}
rt_json = run.get("runtime_settings", {}) or {}
Expand All @@ -326,37 +327,65 @@ def _write_pareto_entries(
"dataset": dataset_name,
"runtime_settings": runtime_settings_out,
}
(points_dir / f"point_{concurrency}.yaml").write_text(
(run_dir / "point.yaml").write_text(
yaml.dump(point_cfg, default_flow_style=False), encoding="utf-8"
)

(result_dir / "mlperf_endpoints_log_summary.json").write_text(
(run_dir / "mlperf_endpoints_log_summary.json").write_text(
json.dumps(run["result_summary"], indent=2), encoding="utf-8"
)
(result_dir / "mlperf_endpoints_log_detail.json").write_text(
(run_dir / "mlperf_endpoints_log_detail.json").write_text(

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is also suspect. I can't believe I missed this earlier, but it's worrying that we build a submission with an empty directory here. We need to use real data.

"{}", encoding="utf-8"
)
(result_dir / "system_desc.json").write_text(
json.dumps(run["system_info"], indent=2), encoding="utf-8"
(run_dir / "run_metadata.json").write_text(
json.dumps(
{
"serving_framework": run["system_info"].get("framework", ""),
"parallelism": {},

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

"precision": "bfloat16",
},
indent=2,
),
encoding="utf-8",
)
(run_dir / "report.txt").write_text(
f"Run r{concurrency} — auto-generated placeholder\n", encoding="utf-8"
)

# src/<impl>/ stub — submitter fills in endpoint interface code
src_dir = run_dir / "src" / "vllm"
src_dir.mkdir(parents=True, exist_ok=True)
(src_dir / ".gitkeep").write_text("", encoding="utf-8")

# Per-run accuracy placeholder

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. This should be sourced from our database. If that isn't the case, we need to change things to match.

accuracy_dir = run_dir / "accuracy"
accuracy_dir.mkdir(exist_ok=True)
(accuracy_dir / "accuracy.txt").write_text("Accuracy pending\n", encoding="utf-8")
placeholder = {
"metric": "rouge1",
"score": 0.0,
"quality_target": 0.0,
"passed": True,
}
(accuracy_dir / "accuracy_result.json").write_text(
json.dumps(placeholder, indent=2), encoding="utf-8"
)


def _write_accuracy_placeholders(
def _write_model_sweep_stubs(

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting this again - we'll need to change this to an actual csv creator.

submission_dir: Path,
system_id: str,
model: str,
) -> None:
accuracy_dir = submission_dir / "pareto" / system_id / model / "accuracy"
accuracy_dir.mkdir(parents=True, exist_ok=True)
(accuracy_dir / "accuracy.txt").write_text("Accuracy pending\n", encoding="utf-8")
placeholder = {
"metric": "rouge1",
"score": 0.0,
"quality_target": 0.0,
"passed": True,
}
(accuracy_dir / "accuracy_result.json").write_text(
json.dumps(placeholder, indent=2), encoding="utf-8"
model_dir = submission_dir / system_id / model
model_dir.mkdir(parents=True, exist_ok=True)
(model_dir / "sweep_summary.csv").write_text(
"concurrency,qps,ttft_p50_ms,ttft_p95_ms,tpot_p50_ms,system_tps\n",
encoding="utf-8",
)
(model_dir / "sweep_distributions.csv").write_text(
"concurrency,percentile,ttft_ms,tpot_ms,output_tokens\n",
encoding="utf-8",
)


Expand Down
104 changes: 56 additions & 48 deletions src/endpoints_submission_cli/submissions/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from __future__ import annotations

import os
import re
import shutil
import subprocess
from pathlib import Path
Expand Down Expand Up @@ -194,55 +195,62 @@ def prepare_pr_branch_merge(
repo_org_dir = repo_dir / submission_dir.name # e.g. repo_dir / "NVIDIA"

if repo_org_dir.exists():
fresh_pareto = submission_dir / "pareto"
repo_pareto = repo_org_dir / "pareto"

for fresh_model_dir in fresh_pareto.glob("*/*"): # <system_id>/<model>
rel = fresh_model_dir.relative_to(fresh_pareto)
repo_model_dir = repo_pareto / rel
repo_model_dir.mkdir(parents=True, exist_ok=True)

# points/ and accuracy/ — replace entirely from fresh build
for subdir_name in ("points", "accuracy"):
dest = repo_model_dir / subdir_name
if dest.exists():
shutil.rmtree(dest)
src = fresh_model_dir / subdir_name
if src.exists():
shutil.copytree(src, dest)

# results/ — surgical per-point update
fresh_results = fresh_model_dir / "results"
repo_results = repo_model_dir / "results"
if fresh_results.exists():
repo_results.mkdir(exist_ok=True)
# Remove point dirs no longer present in fresh build
fresh_point_names = {p.name for p in fresh_results.iterdir() if p.is_dir()}
for repo_point in list(repo_results.iterdir()):
if repo_point.is_dir() and repo_point.name not in fresh_point_names:
shutil.rmtree(repo_point)
# Update each point: replace log files, preserve system_desc.json
for fresh_point in fresh_results.iterdir():
if not fresh_point.is_dir():
# Iterate system directories in fresh build (identified by system_desc.json)
for fresh_sys_dir in sorted(submission_dir.iterdir()):
if not fresh_sys_dir.is_dir():
continue
repo_sys_dir = repo_org_dir / fresh_sys_dir.name
repo_sys_dir.mkdir(parents=True, exist_ok=True)

# system_desc.json: preserve PR version; seed from fresh build if absent
repo_sysdesc = repo_sys_dir / "system_desc.json"
fresh_sysdesc = fresh_sys_dir / "system_desc.json"
if not repo_sysdesc.exists() and fresh_sysdesc.exists():
shutil.copy2(fresh_sysdesc, repo_sysdesc)

for fresh_model_dir in sorted(fresh_sys_dir.iterdir()):
if not fresh_model_dir.is_dir():
continue
repo_model_dir = repo_sys_dir / fresh_model_dir.name
repo_model_dir.mkdir(parents=True, exist_ok=True)

# sweep CSVs — always replace from fresh build
for csv_name in ("sweep_summary.csv", "sweep_distributions.csv"):
src_csv = fresh_model_dir / csv_name
if src_csv.exists():
shutil.copy2(src_csv, repo_model_dir / csv_name)

# r<N>/ run dirs — surgical per-run update
fresh_run_names = {
d.name
for d in fresh_model_dir.iterdir()
if d.is_dir() and re.match(r"^r\d+$", d.name)
}
# Remove run dirs no longer present in fresh build
if repo_model_dir.exists():
for repo_run in list(repo_model_dir.iterdir()):
if repo_run.is_dir() and re.match(r"^r\d+$", repo_run.name) and repo_run.name not in fresh_run_names:
shutil.rmtree(repo_run)
# Update each run dir
for fresh_run in sorted(fresh_model_dir.iterdir()):
if not fresh_run.is_dir() or not re.match(r"^r\d+$", fresh_run.name):
continue
repo_point = repo_results / fresh_point.name
is_new_point = not repo_point.exists()
repo_point.mkdir(exist_ok=True)
for src_file in fresh_point.iterdir():
if src_file.name != "system_desc.json":
shutil.copy2(src_file, repo_point / src_file.name)
# system_desc.json: preserve PR version; seed only for new points
repo_sysdesc = repo_point / "system_desc.json"
if is_new_point or not repo_sysdesc.exists():
fresh_sysdesc = fresh_point / "system_desc.json"
if fresh_sysdesc.exists():
shutil.copy2(fresh_sysdesc, repo_sysdesc)
elif repo_results.exists():
shutil.rmtree(repo_results)

# systems/ — preserve PR version; seed from fresh build if absent
if not (repo_org_dir / "systems").exists():
shutil.copytree(submission_dir / "systems", repo_org_dir / "systems")
repo_run = repo_model_dir / fresh_run.name
is_new = not repo_run.exists()
repo_run.mkdir(exist_ok=True)
# Replace all items EXCEPT src/ (may have manually-authored endpoint config)
for item in fresh_run.iterdir():
dest = repo_run / item.name
if item.name == "src":
# Preserve from PR; seed only for new runs
if is_new and not dest.exists():
shutil.copytree(item, dest)
elif item.is_dir():
if dest.exists():
shutil.rmtree(dest)
shutil.copytree(item, dest)
else:
shutil.copy2(item, dest)
else:
# Org dir not yet on the PR branch — full copy (first push edge case).
shutil.copytree(submission_dir, repo_org_dir)
Expand Down
Loading
Loading