Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion tests/integration/defs/perf/open_search_db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ def is_empty(value):
for field in match_keys:
history_value = history_data.get(field, None)
new_value = new_data.get(field, None)
# For boolean fields (b_ prefix), treat None/missing as False.
# This ensures backward compatibility when new boolean match keys
# are added — historical data without the field can still match
# current data where the field defaults to False.
if field.startswith("b_"):
if history_value is None:
history_value = False
if new_value is None:
new_value = False
if is_empty(history_value) and is_empty(new_value):
continue
if history_value != new_value:
Expand Down Expand Up @@ -138,7 +147,34 @@ def parse_timestamp(timestamp):
},
]
for key, value in common_values_dict.items():
must_clauses.append({"term": {key: value}})
if key.startswith("b_") and value is False:
# For boolean fields with value False, also match documents
# where the field is missing (backward compatibility for
# newly added boolean match keys).
must_clauses.append({
"bool": {
"should": [
{
"term": {
key: False
}
},
{
"bool": {
"must_not": [{
"exists": {
"field": key
}
}]
}
},
],
"minimum_should_match":
1,
}
})
else:
must_clauses.append({"term": {key: value}})
history_data_list = OpenSearchDB.queryPerfDataFromOpenSearchDB(
TEST_INFO_PROJECT_NAME, must_clauses, size=MAX_QUERY_SIZE)

Expand Down
125 changes: 122 additions & 3 deletions tests/integration/defs/perf/test_perf_sanity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
"""TensorRT LLM perf sanity tests."""

import copy
import fcntl
import glob
import os
import re
import shutil
import socket
import subprocess
import time
Expand Down Expand Up @@ -60,6 +62,50 @@
"H200": "h200",
}

# External serving-benchmark harness used when `use_nv_sa_benchmark` is set.
BENCH_SERVING_REPO = "https://github.com/kedarpotdar-nv/bench_serving.git"
# Pinned commit SHA so benchmark results stay reproducible across runs.
BENCH_SERVING_COMMIT = "f3ea022a5780de5d0babc5fffa53634e2023d28f"
# Shared clone location; guarded by a file lock in ensure_bench_serving_repo().
BENCH_SERVING_DIR = "/tmp/bench_serving"


def _bench_repo_is_ready(bench_script: str) -> bool:
    """Return True iff the bench_serving checkout exists and is at the pinned commit.

    Checking only for the script's existence is not enough: a partially
    failed setup (clone succeeded, pinned-commit fetch/checkout failed) or a
    later change of BENCH_SERVING_COMMIT would otherwise leave a stale
    checkout that is silently reused forever.
    """
    if not os.path.exists(bench_script):
        return False
    try:
        head = subprocess.check_output(
            ["git", "-C", BENCH_SERVING_DIR, "rev-parse", "HEAD"],
            text=True,
        ).strip()
    except (subprocess.CalledProcessError, OSError):
        # Broken/corrupt checkout — treat as not ready so it gets re-cloned.
        return False
    return head == BENCH_SERVING_COMMIT


def ensure_bench_serving_repo() -> str:
    """Clone the bench_serving repo at the pinned commit if not already present.

    Returns:
        Path to ``benchmark_serving.py`` inside the checkout.

    Uses an exclusive file lock so that multiple ranks within the same
    container do not race to clone the repository; the checkout is also
    validated against BENCH_SERVING_COMMIT so a stale or half-finished
    clone is rebuilt instead of being reused.
    """
    bench_script = os.path.join(BENCH_SERVING_DIR, "benchmark_serving.py")
    lock_file = BENCH_SERVING_DIR + ".lock"

    with open(lock_file, "w") as lf:
        fcntl.flock(lf, fcntl.LOCK_EX)
        try:
            if not _bench_repo_is_ready(bench_script):
                # Start from a clean slate; a wrong-commit or partial
                # checkout cannot be trusted.
                if os.path.exists(BENCH_SERVING_DIR):
                    shutil.rmtree(BENCH_SERVING_DIR)
                subprocess.check_call(
                    ["git", "clone", "--depth", "1", BENCH_SERVING_REPO, BENCH_SERVING_DIR]
                )
                # A depth-1 clone only has the default branch tip; fetch the
                # pinned SHA explicitly (GitHub allows fetching by SHA).
                subprocess.check_call(
                    [
                        "git",
                        "-C",
                        BENCH_SERVING_DIR,
                        "fetch",
                        "--depth",
                        "1",
                        "origin",
                        BENCH_SERVING_COMMIT,
                    ]
                )
                subprocess.check_call(
                    ["git", "-C", BENCH_SERVING_DIR, "checkout", BENCH_SERVING_COMMIT]
                )
        finally:
            fcntl.flock(lf, fcntl.LOCK_UN)

    return bench_script
Comment thread
chenfeiz0326 marked this conversation as resolved.


DEFAULT_TIMEOUT = 5400
AGG_CONFIG_FOLDER = os.environ.get("AGG_CONFIG_FOLDER", "tests/scripts/perf-sanity/aggregated")
DISAGG_CONFIG_FOLDER = os.environ.get(
Expand Down Expand Up @@ -439,6 +485,7 @@ def __init__(
self.trust_remote_code = client_config_data.get("trust_remote_code", True)
self.model_path = ""
self.dataset_file = client_config_data.get("dataset_file", "")
self.use_nv_sa_benchmark = client_config_data.get("use_nv_sa_benchmark", False)
self.env_vars = env_vars

# Generate default name if not provided
Expand All @@ -450,6 +497,48 @@ def to_cmd(self) -> List[str]:
"""Generate benchmark command."""
model_dir = get_model_dir(self.model_name)
self.model_path = model_dir if os.path.exists(model_dir) else self.model_name

if self.use_nv_sa_benchmark:
return self._to_sa_benchmark_cmd()
else:
return self._to_default_benchmark_cmd()

def _to_sa_benchmark_cmd(self) -> List[str]:
    """Build the command line for the SA benchmark (bench_serving repo).

    Returns the argv list invoking ``benchmark_serving.py`` with random
    dataset generation sized from this client's concurrency/iteration and
    input/output length settings, plus backend/chat-template flags.
    """
    script_path = ensure_bench_serving_repo()

    total_prompts = self.concurrency * self.iterations
    cmd: List[str] = ["python", script_path]
    cmd += ["--model", self.model_path]
    cmd += ["--dataset-name", "random"]
    cmd += ["--num-prompts", str(total_prompts)]
    cmd += ["--max-concurrency", str(self.concurrency)]
    cmd.append("--ignore-eos")
    cmd += ["--random-input-len", str(self.isl)]
    cmd += ["--random-output-len", str(self.osl)]
    cmd += ["--random-range-ratio", str(self.random_range_ratio)]
    cmd.append("--save-result")
    cmd += ["--percentile-metrics", "ttft,tpot,itl,e2el"]

    if self.backend:
        cmd += ["--backend", self.backend]
    if self.trust_remote_code:
        cmd.append("--trust-remote-code")
    if self.use_chat_template:
        cmd.append("--use-chat-template")
    # Note: bench_serving has no --non-streaming flag; streaming is backend-determined
    return cmd
Comment thread
chenfeiz0326 marked this conversation as resolved.

def _to_default_benchmark_cmd(self) -> List[str]:
"""Generate default benchmark command (tensorrt_llm benchmark_serving)."""
dataset_path = get_dataset_dir(self.dataset_file)
benchmark_cmd = [
"python",
Expand Down Expand Up @@ -513,6 +602,7 @@ def to_match_keys(self) -> List[str]:
"s_backend",
"b_use_chat_template",
"b_streaming",
"b_use_nv_sa_benchmark",
]

def to_db_data(self) -> dict:
Expand All @@ -529,6 +619,7 @@ def to_db_data(self) -> dict:
"b_use_chat_template": self.use_chat_template,
"b_streaming": self.streaming,
"b_trust_remote_code": self.trust_remote_code,
"b_use_nv_sa_benchmark": self.use_nv_sa_benchmark,
"s_client_log_link": "",
"s_client_env_vars": self.env_vars,
}
Expand Down Expand Up @@ -1292,6 +1383,7 @@ def _parse_disagg_config_file(self, config_file_path: str, config_file: str):
# For ctx_only: OSL is set to 1 and dataset_file is empty
osl = 1 if benchmark_mode == "ctx_only" else benchmark.get("output_length", 1024)
dataset_file = "" if benchmark_mode == "ctx_only" else benchmark.get("dataset_file", "")
use_nv_sa_benchmark = benchmark.get("use_nv_sa_benchmark", False)

client_configs = []
for concurrency in concurrency_values:
Expand All @@ -1305,6 +1397,7 @@ def _parse_disagg_config_file(self, config_file_path: str, config_file: str):
"use_chat_template": False,
"streaming": benchmark.get("streaming", True),
"dataset_file": dataset_file,
"use_nv_sa_benchmark": use_nv_sa_benchmark,
}
client_config = ClientConfig(
client_config_data,
Expand Down Expand Up @@ -1426,19 +1519,36 @@ def _check_benchmark_errors(self, output: str) -> None:
if not output:
return

# Check for non-zero failed requests
# Check for non-zero failed requests (default benchmark)
failed_requests_match = re.search(r"Failed requests:\s+(\d+)", output)
if failed_requests_match:
failed_count = int(failed_requests_match.group(1))
if failed_count > 0:
error_msg = f"Benchmark output contains {failed_count} failed requests."
raise RuntimeError(error_msg)

# Check for explicit failure markers
# Check for explicit failure markers (default benchmark)
if "!FAILED REQUESTS!" in output or "!CHECK LOG FOR ERRORS!" in output:
error_msg = "Benchmark output contains failure markers."
raise RuntimeError(error_msg)

# SA benchmark (bench_serving) only prints "Successful requests:"
# without "Failed requests:". Detect failures by comparing successful
# count against num_prompts from the Namespace output.
if not failed_requests_match:
successful_match = re.search(r"Successful requests:\s+(\d+)", output)
num_prompts_match = re.search(r"num_prompts=(\d+)", output)
if successful_match and num_prompts_match:
successful_count = int(successful_match.group(1))
num_prompts = int(num_prompts_match.group(1))
failed_count = num_prompts - successful_count
if failed_count > 0:
error_msg = (
f"SA benchmark: {failed_count} of {num_prompts} requests failed "
f"({successful_count} successful)."
)
raise RuntimeError(error_msg)

def run_ex(self, commands) -> Dict[int, List[str]]:
"""Run commands and collect outputs."""
outputs = {}
Expand Down Expand Up @@ -1478,8 +1588,17 @@ def parse_metrics_from_output(output: str) -> Optional[Dict[str, float]]:
for server_idx, client_configs in self.server_client_configs.items():
self._perf_results[server_idx] = []
server_outputs = outputs.get(server_idx, [])
for output in server_outputs:
for client_idx, output in enumerate(server_outputs):
metrics = parse_metrics_from_output(output)
# SA benchmark (bench_serving) doesn't report user_throughput.
# Use None as sentinel to distinguish "not available" from actual zero.
if (
metrics
and "user_throughput" not in metrics
and client_idx < len(client_configs)
and client_configs[client_idx].use_nv_sa_benchmark
):
metrics["user_throughput"] = None
self._perf_results[server_idx].append(metrics)

def check_test_failure(self):
Expand Down
Loading