From 0e8556c4ae2a62da81a74362d2ffe639bbb1e5da Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Mon, 18 Aug 2025 17:21:11 -0700 Subject: [PATCH 1/3] Use pytest --pyargs --- eval_protocol/benchmarks/__init__.py | 7 - eval_protocol/benchmarks/registry.py | 329 ------------------ eval_protocol/benchmarks/run.py | 100 ------ eval_protocol/benchmarks/suites/__init__.py | 1 - .../{suites/aime25.py => test_aime25.py} | 2 - .../{suites/gpqa.py => test_gpqa.py} | 6 +- ...sis.py => test_livebench_data_analysis.py} | 23 +- ...nch_retail.py => test_tau_bench_retail.py} | 16 +- eval_protocol/pytest/evaluation_test.py | 282 --------------- 9 files changed, 18 insertions(+), 748 deletions(-) delete mode 100644 eval_protocol/benchmarks/registry.py delete mode 100644 eval_protocol/benchmarks/run.py delete mode 100644 eval_protocol/benchmarks/suites/__init__.py rename eval_protocol/benchmarks/{suites/aime25.py => test_aime25.py} (97%) rename eval_protocol/benchmarks/{suites/gpqa.py => test_gpqa.py} (96%) rename eval_protocol/benchmarks/{suites/livebench_data_analysis.py => test_livebench_data_analysis.py} (95%) rename eval_protocol/benchmarks/{suites/tau_bench_retail.py => test_tau_bench_retail.py} (95%) diff --git a/eval_protocol/benchmarks/__init__.py b/eval_protocol/benchmarks/__init__.py index e248fe9b..e69de29b 100644 --- a/eval_protocol/benchmarks/__init__.py +++ b/eval_protocol/benchmarks/__init__.py @@ -1,7 +0,0 @@ -from .registry import export_benchmark, get_benchmark_runner, list_benchmarks - -__all__ = [ - "export_benchmark", - "get_benchmark_runner", - "list_benchmarks", -] diff --git a/eval_protocol/benchmarks/registry.py b/eval_protocol/benchmarks/registry.py deleted file mode 100644 index ce3c698e..00000000 --- a/eval_protocol/benchmarks/registry.py +++ /dev/null @@ -1,329 +0,0 @@ -""" -Benchmark registry and export decorator. - -This module provides a lightweight registry for benchmarks and a decorator -`@export_benchmark(name)` that can be stacked with `@evaluation_test`. - -It registers a runnable handle that executes the exact same evaluation pipeline -as the pytest flow by calling `run_evaluation_test_direct` with the parameters -captured from the decorated function. - -Usage in a suite module (stack under @evaluation_test): - - from eval_protocol.benchmarks.registry import export_benchmark - - @export_benchmark("aime25_low") - @evaluation_test(...) - def test_aime_pointwise(row: EvaluationRow) -> EvaluationRow: - ... - -Programmatic run: - - from eval_protocol.benchmarks.registry import get_benchmark_runner - get_benchmark_runner("aime25_low")(model="fireworks_ai/...", print_summary=True, out="artifacts/aime.json") -""" - -from __future__ import annotations - -import json -import os -from typing import Any, Callable, Dict, List, Optional - -# Global registry: name -> callable runner -_BENCHMARK_REGISTRY: Dict[str, Callable[..., Any]] = {} - - -def list_benchmarks() -> List[str]: - return sorted(_BENCHMARK_REGISTRY.keys()) - - -def get_benchmark_runner(name: str) -> Callable[..., Any]: - try: - return _BENCHMARK_REGISTRY[name] - except KeyError as exc: - raise KeyError(f"Benchmark '{name}' not found. Available: {list_benchmarks()}") from exc - - -def export_benchmark(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]: - """ - Decorator to export a benchmark test into the global registry. - - This expects to be stacked with `@evaluation_test`, so the decorated function - should carry `__ep_config` and `__ep_original_test_func` attributes that the - decorator can read to construct a direct runner. - - The registered runner supports a subset of convenient overrides and maps them - to the same EP_* environment variables used by the pytest plugin to ensure - identical summaries and JSON artifact behavior. - """ - - def _decorator(test_wrapper: Callable[..., Any]) -> Callable[..., Any]: - # Pull through metadata attached by evaluation_test - ep_config: Dict[str, Any] = getattr(test_wrapper, "__ep_config", {}) - original_test_func: Optional[Callable[..., Any]] = getattr(test_wrapper, "__ep_original_test_func", None) - - def _runner( - *, - model: Optional[str] = None, - print_summary: bool = False, - out: Optional[str] = None, - reasoning_effort: Optional[str] = None, - max_rows: Optional[int | str] = None, - num_runs: Optional[int] = None, - input_params_override: Optional[Dict[str, Any]] = None, - max_concurrency: Optional[int] = None, - ) -> Any: - # Map convenience flags to EP_* env used by the pytest flow - if print_summary: - os.environ["EP_PRINT_SUMMARY"] = "1" - if out: - os.environ["EP_SUMMARY_JSON"] = out - # Merge reasoning effort and arbitrary overrides into EP_INPUT_PARAMS_JSON - merged: Dict[str, Any] = {} - if reasoning_effort: - # Fireworks OpenAI-compatible endpoint expects extra_body.reasoning_effort, not nested reasoning dict - merged.setdefault("extra_body", {})["reasoning_effort"] = str(reasoning_effort) - if input_params_override: - - def _deep_update(base: Dict[str, Any], over: Dict[str, Any]) -> Dict[str, Any]: - for k, v in over.items(): - if isinstance(v, dict) and isinstance(base.get(k), dict): - _deep_update(base[k], v) - else: - base[k] = v - return base - - merged = _deep_update(merged, dict(input_params_override)) - if merged: - os.environ["EP_INPUT_PARAMS_JSON"] = json.dumps(merged) - - if max_rows is not None: - if isinstance(max_rows, str) and max_rows.strip().lower() == "all": - os.environ["EP_MAX_DATASET_ROWS"] = "None" - else: - os.environ["EP_MAX_DATASET_ROWS"] = str(max_rows) - - # Build effective parameters, preferring overrides - models: List[str] = ep_config.get("model") or [] - model_to_use = model or (models[0] if models else None) - if not model_to_use: - raise ValueError(f"No model provided and none captured from evaluation_test for benchmark '{name}'") - - input_messages = ep_config.get("input_messages") - input_dataset = ep_config.get("input_dataset") - dataset_adapter = ep_config.get("dataset_adapter") - rollout_input_params_list = ep_config.get("rollout_input_params") - rollout_processor = ep_config.get("rollout_processor") - rollout_processor_kwargs = ep_config.get("rollout_processor_kwargs") - aggregation_method = ep_config.get("aggregation_method") - threshold = ep_config.get("threshold_of_success") - default_num_runs = ep_config.get("num_runs") - max_dataset_rows = ep_config.get("max_dataset_rows") - mcp_config_path = ep_config.get("mcp_config_path") - max_concurrent_rollouts = ep_config.get("max_concurrent_rollouts") - if max_concurrency is not None: - max_concurrent_rollouts = int(max_concurrency) - server_script_path = ep_config.get("server_script_path") - steps = ep_config.get("steps") - mode = ep_config.get("mode") - # combine_datasets captured but not used here - - # Choose the first rollout param set by default - rollout_params = None - if isinstance(rollout_input_params_list, list) and rollout_input_params_list: - rollout_params = rollout_input_params_list[0] - - # Import runner lazily to avoid hard import dependencies and circulars - import importlib - - _mod = importlib.import_module("eval_protocol.pytest.evaluation_test") - run_evaluation_test_direct = getattr(_mod, "run_evaluation_test_direct") - - return run_evaluation_test_direct( - test_func=original_test_func or test_wrapper, - model=model_to_use, - input_messages=input_messages, - input_dataset=input_dataset, - dataset_adapter=dataset_adapter, - rollout_input_params=rollout_params, - rollout_processor=rollout_processor, - rollout_processor_kwargs=rollout_processor_kwargs, - aggregation_method=aggregation_method, - threshold_of_success=threshold, - num_runs=(num_runs if num_runs is not None else default_num_runs), - max_dataset_rows=max_dataset_rows, - mcp_config_path=mcp_config_path, - max_concurrent_rollouts=max_concurrent_rollouts, - server_script_path=server_script_path, - steps=steps, - mode=mode, - ) - - # Register runner - if name in _BENCHMARK_REGISTRY: - # Overwrite with latest definition - _BENCHMARK_REGISTRY[name] = _runner - else: - _BENCHMARK_REGISTRY[name] = _runner - - return test_wrapper - - return _decorator - - -def register_composite_benchmark(name: str, children: List[str]) -> None: - """ - Register a composite benchmark that runs multiple exported benchmarks and aggregates results. - - The composite runner forwards common overrides to each child benchmark and aggregates - a combined score as a rows-weighted mean of each child's aggregated score. - - Args: - name: Name of the composite benchmark to register. - children: List of child benchmark names previously registered via export_benchmark. - """ - - def _composite_runner( - *, - model: Optional[str] = None, - print_summary: bool = False, - out: Optional[str] = None, - reasoning_effort: Optional[str] = None, - max_rows: Optional[int | str] = None, - num_runs: Optional[int] = None, - input_params_override: Optional[Dict[str, Any]] = None, - max_concurrency: Optional[int] = None, - ) -> Dict[str, Any]: - # Resolve child runners at call-time to ensure all suites are imported - # Local import avoided to prevent circular import at module import time - _get_benchmark_runner = get_benchmark_runner - import pathlib as _pathlib - import time as _time - - _json = json - - child_summaries: List[Dict[str, Any]] = [] - total_rows = 0 - weighted_sum = 0.0 - # For per-metric aggregation across children - metric_weighted_sums: Dict[str, float] = {} - metric_total_rows: Dict[str, int] = {} - combined_rows: List[Any] = [] - - # If 'out' is a file path, also compute a directory for child artifacts - child_out_dir: Optional[str] = None - if out: - p = _pathlib.Path(out) - if p.suffix.lower() == ".json" and not str(out).endswith("/"): - # Use parent directory for child artifacts - child_out_dir = str(p.parent) - else: - child_out_dir = out - - for child_name in children: - runner = _get_benchmark_runner(child_name) - result = runner( - model=model, - print_summary=print_summary, - out=child_out_dir, - reasoning_effort=reasoning_effort, - max_rows=max_rows, - num_runs=num_runs, - input_params_override=input_params_override, - max_concurrency=max_concurrency, - ) - summary = (result or {}).get("summary") if isinstance(result, dict) else None - if not summary: - continue - # Gather underlying rows to recompute CI across children - try: - rows_obj = result.get("results") if isinstance(result, dict) else None - if isinstance(rows_obj, list): - combined_rows.extend(rows_obj) - except Exception: - pass - child_summaries.append(summary) - rows = int(summary.get("rows", 0) or 0) - agg = summary.get("agg_score") - if isinstance(agg, (int, float)) and rows > 0: - total_rows += rows - weighted_sum += float(agg) * rows - # Combine per-metric means if available - metrics_agg = summary.get("metrics_agg") or {} - if isinstance(metrics_agg, dict): - for m_name, m_vals in metrics_agg.items(): - m_mean = m_vals.get("mean") - if isinstance(m_mean, (int, float)) and rows > 0: - metric_weighted_sums[m_name] = metric_weighted_sums.get(m_name, 0.0) + float(m_mean) * rows - metric_total_rows[m_name] = metric_total_rows.get(m_name, 0) + rows - - combined_agg = (weighted_sum / total_rows) if total_rows > 0 else None - # Compute 95% CI for combined rows if available - ci_low: Optional[float] = None - ci_high: Optional[float] = None - if combined_rows: - try: - from eval_protocol.stats.confidence_intervals import compute_fixed_set_mu_ci as _compute_ci - - r = _compute_ci(combined_rows) - if r and len(r) >= 3 and r[1] is not None and r[2] is not None: - ci_low = float(r[1]) - ci_high = float(r[2]) - except Exception: - ci_low = None - ci_high = None - combined_metrics: Dict[str, Dict[str, float]] = {} - for m_name, wsum in metric_weighted_sums.items(): - denom = metric_total_rows.get(m_name, 0) - if denom > 0: - combined_metrics[m_name] = {"mean": float(wsum / denom)} - combined = { - "suite": name, - "model": model, - "agg_score": float(combined_agg) if combined_agg is not None else None, - "rows": total_rows, - "children": child_summaries, - "num_runs": num_runs, - **({"metrics_agg": combined_metrics} if combined_metrics else {}), - **({"agg_ci_low": ci_low, "agg_ci_high": ci_high} if (ci_low is not None and ci_high is not None) else {}), - } - - # Optional print and persist - # Respect either function arg or EP_PRINT_SUMMARY env - _should_print = print_summary or (os.getenv("EP_PRINT_SUMMARY") == "1") - if _should_print: - try: - if combined_agg is not None: - if ci_low is not None and ci_high is not None: - print( - f"EP Summary | suite={name} model={model} agg={combined['agg_score']:.3f} ci95=[{ci_low:.3f},{ci_high:.3f}] rows={total_rows}" - ) - else: - print( - f"EP Summary | suite={name} model={model} agg={combined['agg_score']:.3f} rows={total_rows}" - ) - else: - print(f"EP Summary | suite={name} model={model} agg=None rows={total_rows}") - except Exception: - pass - - if out: - out_path = _pathlib.Path(out) - if out_path.suffix.lower() == ".json" and not str(out).endswith("/"): - # Write to the specified file - out_path.parent.mkdir(parents=True, exist_ok=True) - with open(out_path, "w", encoding="utf-8") as f: - _json.dump({**combined, "timestamp": int(_time.time())}, f) - else: - # Treat as directory - dir_path = out_path - dir_path.mkdir(parents=True, exist_ok=True) - safe_name = name.replace("/", "__") - file_path = dir_path / f"{safe_name}__composite.json" - with open(file_path, "w", encoding="utf-8") as f: - _json.dump({**combined, "timestamp": int(_time.time())}, f) - - return {"summary": combined} - - # Register (overwrite if exists) - _BENCHMARK_REGISTRY[name] = _composite_runner diff --git a/eval_protocol/benchmarks/run.py b/eval_protocol/benchmarks/run.py deleted file mode 100644 index a5afe900..00000000 --- a/eval_protocol/benchmarks/run.py +++ /dev/null @@ -1,100 +0,0 @@ -""" -Minimal CLI runner for exported benchmarks. - -Usage: - - python -m eval_protocol.benchmarks.run aime25_low \ - --model fireworks_ai/accounts/fireworks/models/gpt-oss-120b \ - --print-summary \ - --out artifacts/aime25_low.json \ - --max-rows 50 \ - --reasoning-effort low -""" - -from __future__ import annotations - -import argparse -import pkgutil -from importlib import import_module -from typing import Any - -import eval_protocol.benchmarks.suites as suites_pkg -from eval_protocol.benchmarks.registry import get_benchmark_runner, list_benchmarks - - -def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Run an exported eval-protocol benchmark") - parser.add_argument("name", help=f"Benchmark name. Known: {', '.join(list_benchmarks()) or '(none)'}") - parser.add_argument("--model", required=True, help="Model identifier (provider/model)") - parser.add_argument("--print-summary", action="store_true", help="Print concise EP summary line") - parser.add_argument("--out", help="Write JSON summary artifact to path or directory") - parser.add_argument( - "--reasoning-effort", - choices=["low", "medium", "high"], - help="Sets extra_body.reasoning.effort via EP_INPUT_PARAMS_JSON", - ) - parser.add_argument( - "--max-rows", - help="Limit rows: integer or 'all' for no limit (maps to EP_MAX_DATASET_ROWS)", - ) - parser.add_argument("--num-runs", type=int, help="Override num_runs if provided") - parser.add_argument("--max-tokens", type=int, help="Override max_tokens for generation requests") - parser.add_argument("--max-concurrency", type=int, help="Override max concurrent rollouts") - # Allow overriding reasoning effort explicitly (low/medium/high). If omitted, suite default is used. - # Already mapped by --reasoning-effort above. - return parser.parse_args() - - -def main() -> int: - args = _parse_args() - # Auto-import all suite modules so their @export_benchmark decorators register - # Import all suite modules so their @export_benchmark decorators register - import sys - import traceback - - for modinfo in pkgutil.iter_modules(suites_pkg.__path__): - mod_name = f"{suites_pkg.__name__}.{modinfo.name}" - try: - import_module(mod_name) - except Exception as e: - print(f"[bench] failed to import suite module: {mod_name}: {e}", file=sys.stderr) - traceback.print_exc() - # Fallback: if nothing registered yet and a known suite was requested, try explicit import - if not list_benchmarks(): - known_map = { - "aime25_low": "eval_protocol.benchmarks.suites.aime25", - } - forced = known_map.get(args.name) - if forced: - try: - import_module(forced) - except Exception as e: - print(f"[bench] explicit import failed for {forced}: {e}", file=sys.stderr) - runner = get_benchmark_runner(args.name) - max_rows: int | str | None = None - if args.max_rows is not None: - try: - max_rows = int(args.max_rows) - except Exception: - max_rows = str(args.max_rows) - # Build input params override if needed - ip_override = {} - if args.max_tokens is not None: - ip_override["max_tokens"] = int(args.max_tokens) - - _ = runner( - model=args.model, - print_summary=args.print_summary, - out=args.out, - reasoning_effort=args.reasoning_effort, - max_rows=max_rows, - num_runs=args.num_runs, - input_params_override=(ip_override or None), - max_concurrency=args.max_concurrency, - ) - # Non-zero exit on failure gate is handled within the runner via assertions - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/eval_protocol/benchmarks/suites/__init__.py b/eval_protocol/benchmarks/suites/__init__.py deleted file mode 100644 index d0effd69..00000000 --- a/eval_protocol/benchmarks/suites/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Suite modules are auto-imported by eval_protocol.benchmarks.run to register benchmarks. diff --git a/eval_protocol/benchmarks/suites/aime25.py b/eval_protocol/benchmarks/test_aime25.py similarity index 97% rename from eval_protocol/benchmarks/suites/aime25.py rename to eval_protocol/benchmarks/test_aime25.py index 755795df..79769cf2 100644 --- a/eval_protocol/benchmarks/suites/aime25.py +++ b/eval_protocol/benchmarks/test_aime25.py @@ -1,6 +1,5 @@ from typing import Any, Dict, List, Optional -from eval_protocol.benchmarks.registry import export_benchmark from eval_protocol.models import EvaluateResult, EvaluationRow, Message, MetricResult from eval_protocol.pytest.default_single_turn_rollout_process import ( SingleTurnRolloutProcessor, @@ -58,7 +57,6 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: return converted -@export_benchmark("aime25") @evaluation_test( input_dataset=[ "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-I.jsonl", diff --git a/eval_protocol/benchmarks/suites/gpqa.py b/eval_protocol/benchmarks/test_gpqa.py similarity index 96% rename from eval_protocol/benchmarks/suites/gpqa.py rename to eval_protocol/benchmarks/test_gpqa.py index ced8ac9f..7fc8a6f0 100644 --- a/eval_protocol/benchmarks/suites/gpqa.py +++ b/eval_protocol/benchmarks/test_gpqa.py @@ -6,7 +6,6 @@ import requests -from eval_protocol.benchmarks.registry import export_benchmark from eval_protocol.models import EvaluateResult, EvaluationRow, Message, MetricResult from eval_protocol.pytest.default_single_turn_rollout_process import ( SingleTurnRolloutProcessor, @@ -90,7 +89,6 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> return self.single_turn_processor(processed, config) -@export_benchmark("gpqa") @evaluation_test( input_messages=_GPQA_INPUT_MESSAGES, completion_params=[ @@ -99,10 +97,10 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> rollout_processor=GPQAStripGTRolloutProcessor(), aggregation_method="mean", passed_threshold=None, - num_runs=8, + num_runs=1, mode="pointwise", ) -def gpqa_pointwise(row: EvaluationRow) -> EvaluationRow: +def test_gpqa_pointwise(row: EvaluationRow) -> EvaluationRow: assistant_msgs = [m for m in row.messages if m.role == "assistant"] content = assistant_msgs[-1].content if assistant_msgs else "" diff --git a/eval_protocol/benchmarks/suites/livebench_data_analysis.py b/eval_protocol/benchmarks/test_livebench_data_analysis.py similarity index 95% rename from eval_protocol/benchmarks/suites/livebench_data_analysis.py rename to eval_protocol/benchmarks/test_livebench_data_analysis.py index da384439..d1eaaead 100644 --- a/eval_protocol/benchmarks/suites/livebench_data_analysis.py +++ b/eval_protocol/benchmarks/test_livebench_data_analysis.py @@ -2,7 +2,6 @@ import re from typing import Any, Dict, List, Optional -from eval_protocol.benchmarks.registry import export_benchmark, register_composite_benchmark from eval_protocol.models import EvaluateResult, EvaluationRow, Message, MetricResult from eval_protocol.pytest.default_single_turn_rollout_process import ( SingleTurnRolloutProcessor, @@ -370,7 +369,6 @@ def _extract_gt(row: EvaluationRow) -> Dict[str, Any]: _CTA_ROWS = _load_livebench_da_messages("cta") -@export_benchmark("live_bench/data_analysis/cta") @evaluation_test( completion_params=[{"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b"}], input_messages=[[m for m in r.messages] for r in _CTA_ROWS], @@ -381,7 +379,7 @@ def _extract_gt(row: EvaluationRow) -> Dict[str, Any]: num_runs=4, mode="pointwise", ) -def livebench_cta_pointwise(row: EvaluationRow) -> EvaluationRow: +def test_livebench_cta_pointwise(row: EvaluationRow) -> EvaluationRow: assistant_msgs = [m for m in row.messages if m.role == "assistant"] content = assistant_msgs[-1].content if assistant_msgs else "" payload = _extract_gt(row) @@ -413,7 +411,6 @@ def livebench_cta_pointwise(row: EvaluationRow) -> EvaluationRow: _TABLEJOIN_ROWS = _load_livebench_da_messages("tablejoin") -@export_benchmark("live_bench/data_analysis/tablejoin") @evaluation_test( completion_params=[{"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b"}], input_messages=[[m for m in r.messages] for r in _TABLEJOIN_ROWS], @@ -421,10 +418,10 @@ def livebench_cta_pointwise(row: EvaluationRow) -> EvaluationRow: rollout_processor=SingleTurnRolloutProcessor(), aggregation_method="mean", passed_threshold=None, - num_runs=4, + num_runs=1, mode="pointwise", ) -def livebench_tablejoin_pointwise(row: EvaluationRow) -> EvaluationRow: +def test_livebench_tablejoin_pointwise(row: EvaluationRow) -> EvaluationRow: user_msgs = [m for m in row.messages if m.role == "user"] question = user_msgs[-1].content if user_msgs else "" assistant_msgs = [m for m in row.messages if m.role == "assistant"] @@ -457,7 +454,6 @@ def livebench_tablejoin_pointwise(row: EvaluationRow) -> EvaluationRow: _TABLEREFORMAT_ROWS = _load_livebench_da_messages("tablereformat") -@export_benchmark("live_bench/data_analysis/tablereformat") @evaluation_test( completion_params=[{"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b"}], input_messages=[[m for m in r.messages] for r in _TABLEREFORMAT_ROWS], @@ -468,7 +464,7 @@ def livebench_tablejoin_pointwise(row: EvaluationRow) -> EvaluationRow: num_runs=4, mode="pointwise", ) -def livebench_tablereformat_pointwise(row: EvaluationRow) -> EvaluationRow: +def test_livebench_tablereformat_pointwise(row: EvaluationRow) -> EvaluationRow: user_msgs = [m for m in row.messages if m.role == "user"] question = user_msgs[-1].content if user_msgs else "" assistant_msgs = [m for m in row.messages if m.role == "assistant"] @@ -496,14 +492,3 @@ def livebench_tablereformat_pointwise(row: EvaluationRow) -> EvaluationRow: }, ) return row - - -# Register a composite benchmark that aggregates all three LiveBench Data Analysis tests -register_composite_benchmark( - name="live_bench/data_analysis", - children=[ - "live_bench/data_analysis/cta", - "live_bench/data_analysis/tablejoin", - "live_bench/data_analysis/tablereformat", - ], -) diff --git a/eval_protocol/benchmarks/suites/tau_bench_retail.py b/eval_protocol/benchmarks/test_tau_bench_retail.py similarity index 95% rename from eval_protocol/benchmarks/suites/tau_bench_retail.py rename to eval_protocol/benchmarks/test_tau_bench_retail.py index 6c0a8a36..7ee7b7bd 100644 --- a/eval_protocol/benchmarks/suites/tau_bench_retail.py +++ b/eval_protocol/benchmarks/test_tau_bench_retail.py @@ -10,7 +10,6 @@ from pathlib import Path from typing import Any, Dict, List -from eval_protocol.benchmarks.registry import export_benchmark from eval_protocol.models import EvaluateResult, EvaluationRow, InputMetadata, Message from eval_protocol.pytest import evaluation_test from eval_protocol.pytest.default_mcp_gym_rollout_processor import MCPGymRolloutProcessor @@ -29,6 +28,16 @@ from vendor.tau2.registry import registry +def _get_retail_dataset_path() -> str: + """Get the retail dataset file path.""" + return str(Path(__file__).parent.parent.parent.parent / "tests" / "pytest" / "data" / "retail_dataset.jsonl") + + +def _get_server_script_path() -> str: + """Get the tau2 mcp server script path.""" + return str(Path(__file__).parent.parent.parent.parent / "examples" / "tau2_mcp" / "server.py") + + def tau_bench_retail_to_evaluation_row(data: List[Dict[str, Any]]) -> List[EvaluationRow]: """ Convert entries from retail dataset to EvaluationRow objects. @@ -62,9 +71,8 @@ def tau_bench_retail_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evalu return rows -@export_benchmark("tau_bench_retail") @evaluation_test( - input_dataset=["tests/pytest/data/retail_dataset.jsonl"], + input_dataset=[_get_retail_dataset_path()], dataset_adapter=tau_bench_retail_to_evaluation_row, completion_params=[ { @@ -78,7 +86,7 @@ def tau_bench_retail_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evalu num_runs=8, mode="pointwise", max_concurrent_rollouts=50, - server_script_path="examples/tau2_mcp/server.py", + server_script_path=_get_server_script_path(), ) def test_tau_bench_retail_evaluation(row: EvaluationRow) -> EvaluationRow: """ diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index beac9acb..2433b597 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -684,288 +684,6 @@ async def dual_mode_wrapper(*args, **kwargs): # Create the dual mode wrapper dual_mode_wrapper = create_dual_mode_wrapper() - # Attach metadata so non-pytest runners (e.g., export_benchmark) can reconstruct runs - try: - dual_mode_wrapper.__ep_original_test_func = test_func # type: ignore[attr-defined] - dual_mode_wrapper.__ep_config = { - "input_messages": input_messages, - "input_dataset": input_dataset, - "dataset_adapter": dataset_adapter, - "rollout_input_params": completion_params, - "rollout_processor": rollout_processor, - "evaluation_test_kwargs": evaluation_test_kwargs, - "rollout_processor_kwargs": rollout_processor_kwargs, - "aggregation_method": aggregation_method, - "passed_threshold": passed_threshold, - "num_runs": num_runs, - "max_dataset_rows": max_dataset_rows, - "mcp_config_path": mcp_config_path, - "max_concurrent_rollouts": max_concurrent_rollouts, - "server_script_path": server_script_path, - "steps": steps, - "mode": mode, - "combine_datasets": combine_datasets, - } # type: ignore[attr-defined] - - # Provide a direct runner method to avoid external imports - def __ep_run_direct( - *, - model_override: str | None = None, - num_runs_override: int | None = None, - rollout_input_params_override: Dict[str, Any] | None = None, - ): - cfg = dual_mode_wrapper.__ep_config # type: ignore[attr-defined] - models = cfg.get("model") or [] - _model = model_override or (models[0] if models else None) - if not _model: - raise ValueError("No model provided for direct run") - rip = rollout_input_params_override - if rip is None: - rip_list = cfg.get("rollout_input_params") - rip = rip_list[0] if isinstance(rip_list, list) and rip_list else {} - return run_evaluation_test_direct( - test_func=dual_mode_wrapper.__ep_original_test_func, # type: ignore[attr-defined] - input_messages=cfg.get("input_messages"), - input_dataset=cfg.get("input_dataset"), - dataset_adapter=cfg.get("dataset_adapter"), - completion_params=rip, - rollout_processor=cfg.get("rollout_processor"), - aggregation_method=cfg.get("aggregation_method"), - passed_threshold=cfg.get("passed_threshold"), - num_runs=(num_runs_override if num_runs_override is not None else cfg.get("num_runs")), - max_dataset_rows=cfg.get("max_dataset_rows"), - mcp_config_path=cfg.get("mcp_config_path"), - max_concurrent_rollouts=cfg.get("max_concurrent_rollouts"), - server_script_path=cfg.get("server_script_path"), - steps=cfg.get("steps"), - mode=cfg.get("mode"), - combine_datasets=cfg.get("combine_datasets"), - ) - - dual_mode_wrapper.__ep_run_direct = __ep_run_direct # type: ignore[attr-defined] - except Exception: - # Best-effort; never fail pytest setup due to metadata attachment - pass - return dual_mode_wrapper return decorator - - -def run_evaluation_test_direct( - *, - test_func: TestFunction, - input_messages: Optional[List[InputMessagesParam]] = None, - input_dataset: Optional[List[DatasetPathParam]] = None, - dataset_adapter: Callable[[List[Dict[str, Any]]], Dataset] = default_dataset_adapter, - completion_params: Optional[CompletionParams] = None, - rollout_processor: RolloutProcessor = NoOpRolloutProcessor(), - rollout_processor_kwargs: Optional[RolloutProcessorInputParam] = None, - aggregation_method: AggregationMethod = "mean", - passed_threshold: Optional[Union[EvaluationThreshold, float]] = None, - num_runs: int = 1, - max_dataset_rows: Optional[int] = None, - mcp_config_path: Optional[str] = None, - max_concurrent_rollouts: int = 8, - server_script_path: Optional[str] = None, - steps: int = 30, - mode: EvaluationTestMode = "batch", - combine_datasets: bool = True, -) -> Dict[str, Any]: - """ - Programmatic runner that executes the same pipeline as @evaluation_test without pytest. - Honors EP_* env overrides and emits the same summary/JSON artifact. - Returns a dict with keys: summary, results. - """ - - if passed_threshold is not None and not isinstance(passed_threshold, EvaluationThreshold): - passed_threshold = EvaluationThreshold(success=passed_threshold) - - # Build dataset/messages - data: List[EvaluationRow] = [] - if input_dataset is not None: - # Concatenate rows across multiple paths/URLs - data_jsonl: List[Dict[str, Any]] = [] - for p in input_dataset: - data_jsonl.extend(load_jsonl(p)) - effective_max_rows = parse_ep_max_rows(max_dataset_rows) - if effective_max_rows is not None: - data_jsonl = data_jsonl[:effective_max_rows] - data = dataset_adapter(data_jsonl) - elif input_messages is not None: - effective_max_rows = parse_ep_max_rows(max_dataset_rows) - msgs = input_messages - if effective_max_rows is not None and isinstance(msgs, list): - msgs = msgs[:effective_max_rows] # type: ignore - if isinstance(msgs, list) and msgs and isinstance(msgs[0], Message): - data = [EvaluationRow(messages=msgs)] # type: ignore[arg-type] - else: - data = [EvaluationRow(messages=m) for m in msgs] # type: ignore - else: - raise ValueError("No input dataset or input messages provided") - - # Build input params and apply env JSON override - completion_params: Dict[str, Any] = completion_params or {} - try: - import json as _json - - _env_override = os.getenv("EP_INPUT_PARAMS_JSON") - if _env_override: - override_obj = _json.loads(_env_override) - if isinstance(override_obj, dict): - completion_params = deep_update_dict(dict(completion_params), override_obj) - except Exception: - pass - - # Prepare metadata - eval_metadata = EvalMetadata( - name=test_func.__name__, - description=test_func.__doc__, - status="running", - num_runs=num_runs, - aggregation_method=aggregation_method, - passed_threshold=passed_threshold, - passed=None, - ) - - for row in data: - if row.input_metadata is None: - row.input_metadata = InputMetadata() - row.input_metadata.completion_params = completion_params - if row.input_metadata.session_data is None: - row.input_metadata.session_data = {} - row.input_metadata.session_data["mode"] = mode - row.eval_metadata = eval_metadata - row.pid = os.getpid() - default_logger.log(row) - - config = RolloutProcessorConfig( - completion_params=completion_params, - mcp_config_path=mcp_config_path or "", - max_concurrent_rollouts=max_concurrent_rollouts, - server_script_path=server_script_path, - steps=steps, - kwargs=rollout_processor_kwargs or {}, - ) - - all_results: List[EvaluationRow] = [] - try: - for _ in range(num_runs): - fresh_rows = [copy.deepcopy(r) for r in data] - processed_rows = execute_function(rollout_processor, rows=fresh_rows, config=config) - if mode == "pointwise": - for row in processed_rows: - result = execute_function(test_func, row=row) - if result is None or not isinstance(result, EvaluationRow): - raise ValueError( - f"Test function {test_func.__name__} did not return an EvaluationRow instance." - ) - all_results.append(result) - else: - results = execute_function(test_func, rows=processed_rows) - if results is None or not isinstance(results, list) or not results: - raise ValueError( - f"Test function {test_func.__name__} did not return a non-empty list of EvaluationRow instances." - ) - if not all(isinstance(r, EvaluationRow) for r in results): - raise ValueError( - f"Test function {test_func.__name__} returned a list containing non-EvaluationRow instances." - ) - all_results.extend(results) - - scores = [r.evaluation_result.score for r in all_results if r.evaluation_result] - agg_score = aggregate(scores, aggregation_method) - - ci_low: float | None = None - ci_high: float | None = None - if aggregation_method == "mean": - try: - result_ci = compute_fixed_set_mu_ci(all_results) - mu_ci_low, mu_ci_high = result_ci[1], result_ci[2] - if mu_ci_low is not None and mu_ci_high is not None: - ci_low = float(mu_ci_low) - ci_high = float(mu_ci_high) - except Exception: - ci_low = None - ci_high = None - - passed = None - if passed_threshold is not None: - passed = agg_score >= passed_threshold.success - for r in all_results: - if r.eval_metadata is not None: - r.eval_metadata.status = "finished" - r.eval_metadata.passed = passed - default_logger.log(r) - - # Summary/JSON artifact (same EP_* env behavior) - summary_obj: Dict[str, Any] = {} - try: - should_print = os.getenv("EP_PRINT_SUMMARY") == "1" - summary_path = os.getenv("EP_SUMMARY_JSON") - suite_name = test_func.__name__ - total_rows = len(all_results) - summary_obj = { - "suite": suite_name, - "model": config.completion_params["model"], - "agg_score": float(agg_score) if agg_score is not None else None, - "num_runs": num_runs, - "rows": total_rows, - } - if ci_low is not None and ci_high is not None: - summary_obj["agg_ci_low"] = ci_low - summary_obj["agg_ci_high"] = ci_high - if should_print: - if ci_low is not None and ci_high is not None: - print( - f"EP Summary | suite={suite_name} model={config.completion_params['model']} agg={summary_obj['agg_score']:.3f} ci95=[{ci_low:.3f},{ci_high:.3f}] runs={num_runs} rows={total_rows}" - ) - else: - print( - f"EP Summary | suite={suite_name} model={config.completion_params['model']} agg={summary_obj['agg_score']:.3f} runs={num_runs} rows={total_rows}" - ) - if summary_path: - import json as _json - import pathlib as _pathlib - import time as _time - - model_slug = sanitize_filename(config.completion_params["model"]) - effort_tag = extract_effort_tag(completion_params) or "" - effort_suffix = f"__effort-{sanitize_filename(effort_tag)}" if effort_tag else "" - base_name = f"{suite_name}__{model_slug}{effort_suffix}__{mode}__runs{num_runs}.json" - - p = _pathlib.Path(summary_path) - summary_obj["timestamp"] = int(_time.time()) - if p.suffix.lower() != ".json" or str(summary_path).endswith("/") or p.is_dir(): - out_dir = p - out_dir.mkdir(parents=True, exist_ok=True) - out_file = out_dir / base_name - else: - parent = p.parent - parent.mkdir(parents=True, exist_ok=True) - if effort_tag: - out_file = parent / f"{p.stem}__{sanitize_filename(effort_tag)}{p.suffix}" - else: - out_file = p - with open(out_file, "w", encoding="utf-8") as f: - _json.dump(summary_obj, f) - except Exception: - pass - - if passed_threshold is not None and not passed: - assert agg_score >= passed_threshold.success, ( - f"Aggregated score {agg_score:.3f} below threshold {passed_threshold}" - ) - - return {"summary": summary_obj, "results": all_results} - except Exception: - # Mark errors on rows - if eval_metadata is not None: - eval_metadata.status = "error" - eval_metadata.passed = False - for r in data or []: - if r.eval_metadata is not None: - r.eval_metadata.status = "error" - r.eval_metadata.passed = False - default_logger.log(r) - raise From 00cb2ec27b8f7723d51e96d8b4a2a1dbdfe609b8 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Mon, 18 Aug 2025 17:25:51 -0700 Subject: [PATCH 2/3] update --- eval_protocol/benchmarks/test_tau_bench_retail.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eval_protocol/benchmarks/test_tau_bench_retail.py b/eval_protocol/benchmarks/test_tau_bench_retail.py index 7ee7b7bd..a47d1520 100644 --- a/eval_protocol/benchmarks/test_tau_bench_retail.py +++ b/eval_protocol/benchmarks/test_tau_bench_retail.py @@ -30,12 +30,12 @@ def _get_retail_dataset_path() -> str: """Get the retail dataset file path.""" - return str(Path(__file__).parent.parent.parent.parent / "tests" / "pytest" / "data" / "retail_dataset.jsonl") + return str(Path(__file__).parent.parent.parent / "tests" / "pytest" / "data" / "retail_dataset.jsonl") def _get_server_script_path() -> str: """Get the tau2 mcp server script path.""" - return str(Path(__file__).parent.parent.parent.parent / "examples" / "tau2_mcp" / "server.py") + return str(Path(__file__).parent.parent.parent / "examples" / "tau2_mcp" / "server.py") def tau_bench_retail_to_evaluation_row(data: List[Dict[str, Any]]) -> List[EvaluationRow]: @@ -43,7 +43,7 @@ def tau_bench_retail_to_evaluation_row(data: List[Dict[str, Any]]) -> List[Evalu Convert entries from retail dataset to EvaluationRow objects. """ rows = [] - test_dir = Path(__file__).parent.parent.parent.parent / "examples" / "tau2_mcp" / "tests" + test_dir = Path(__file__).parent.parent.parent / "examples" / "tau2_mcp" / "tests" # Load system prompt from file so we can change it in one place domain = data[0]["environment_context"]["domain"] From 090f7065bd0fea6f84bfab352fbcb6c81e4bded4 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Mon, 18 Aug 2025 17:34:36 -0700 Subject: [PATCH 3/3] ignoring benchmarks --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8c1b0691..e5a3446f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -99,6 +99,7 @@ jobs: --ignore=tests/test_tau_bench_airline_smoke.py \ --ignore=tests/pytest/test_svgbench.py \ --ignore=tests/pytest/test_livesvgbench.py \ + --ignore=eval_protocol/benchmarks/ \ --cov=eval_protocol --cov-append --cov-report=xml --cov-report=term-missing -v --durations=10 - name: Store coverage file