diff --git a/scripts/regenerate_templates.py b/scripts/regenerate_templates.py index d74b384af..6cb5c4358 100644 --- a/scripts/regenerate_templates.py +++ b/scripts/regenerate_templates.py @@ -280,9 +280,9 @@ def _add_comments(text: str, comments: dict[str, str]) -> str: for key, comment in sorted(comments.items(), key=lambda x: -len(x[0])): text = re.sub( rf"^(\s*{re.escape(key)}.*)$", - lambda m, c=comment: m.group(0) - if "#" in m.group(0) - else f"{m.group(0)} {c}", + lambda m, c=comment: ( + m.group(0) if "#" in m.group(0) else f"{m.group(0)} {c}" + ), text, count=0, flags=re.MULTILINE, @@ -326,6 +326,15 @@ def _build_full(model_cls: type[BenchmarkConfig], overrides: dict) -> dict: if overrides: data = _deep_merge(data, overrides) + # Mirror LoadPattern's model_serializer: use_legacy_loadgen_qps_metrics + # applies only to poisson, so drop it from other patterns' templates. + # TODO(vir): remove this prune when use_legacy_loadgen_qps_metrics is removed. + settings = data.get("settings") + if isinstance(settings, dict): + load_pattern = settings.get("load_pattern") + if isinstance(load_pattern, dict) and load_pattern.get("type") != "poisson": + load_pattern.pop("use_legacy_loadgen_qps_metrics", None) + # Resolve streaming AUTO → off/on (mirrors schema validator) test_type = data.get("type") mp = data.get("model_params", {}) diff --git a/src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py b/src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py index f01c9753c..728a51db9 100644 --- a/src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py +++ b/src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py @@ -70,6 +70,8 @@ class MetricCounterKey(str, Enum): # tracked row still exists when the aggregator sees the ERROR. TRACKED_SAMPLES_FAILED = "tracked_samples_failed" TRACKED_DURATION_NS = "tracked_duration_ns" + # Legacy MLPerf LoadGen Server "completed" window (final_query_all_samples_done_time). + LEGACY_LOADGEN_WINDOW_DURATION_NS = "legacy_loadgen_window_duration_ns" # Total wall-clock duration since session start. Updated on every event as # max(current, event_timestamp - session_start). Stored as a counter # rather than computed from (now - start) at read time because @@ -320,6 +322,10 @@ async def process(self, records: list[EventRecord]) -> None: MetricCounterKey.TRACKED_DURATION_NS.value, table.total_tracked_duration_ns, ) + registry.set_counter( + MetricCounterKey.LEGACY_LOADGEN_WINDOW_DURATION_NS.value, + table.total_loadgen_window_ns, + ) logger.debug("Session event: %s", ev) continue @@ -392,6 +398,10 @@ async def process(self, records: list[EventRecord]) -> None: MetricCounterKey.TRACKED_DURATION_NS.value, table.total_tracked_duration_ns, ) + registry.set_counter( + MetricCounterKey.LEGACY_LOADGEN_WINDOW_DURATION_NS.value, + table.total_loadgen_window_ns, + ) try: await self._publisher.publish_final(registry, n_pending_tasks=n_pending) finally: diff --git a/src/inference_endpoint/async_utils/services/metrics_aggregator/metrics_table.py b/src/inference_endpoint/async_utils/services/metrics_aggregator/metrics_table.py index 46a17e92f..1efd7ca2e 100644 --- a/src/inference_endpoint/async_utils/services/metrics_aggregator/metrics_table.py +++ b/src/inference_endpoint/async_utils/services/metrics_aggregator/metrics_table.py @@ -451,6 +451,13 @@ def __init__(self, registry: MetricsRegistry) -> None: self.session_started_ns: int | None = None self.tracked_blocks: list[TrackedBlock] = [] + # LoadGen window anchors: start at the FIRST issued tracked request + # (LoadGen t=0), end at the completion of the last-issued request that + # completed. See total_loadgen_window_ns. + self._loadgen_max_issued_ns: int = -1 + self._loadgen_window_end_ns: int | None = None + self._loadgen_window_start_ns: int | None = None + # --- Trigger registration --- def add_trigger(self, field_name: str, trigger: EmitTrigger) -> None: @@ -501,6 +508,16 @@ def total_completed_tracked_samples(self) -> int: """Total samples completed across all tracking blocks.""" return sum(b.completed_samples for b in self.tracked_blocks) + @property + def total_loadgen_window_ns(self) -> int: + """Window from the first issued tracked request (LoadGen t=0) to the + completion of the last-issued request that completed — the legacy MLPerf + LoadGen ``final_query_all_samples_done_time`` analog. Returns 0 (=> + the legacy metric falls back to native) when no tracked request completed.""" + if self._loadgen_window_end_ns is None or self._loadgen_window_start_ns is None: + return 0 + return max(0, self._loadgen_window_end_ns - self._loadgen_window_start_ns) + # --- Field updates --- def set_field( @@ -526,6 +543,8 @@ def set_field( return row = self._create_row(sample_uuid) row.tracked_block_idx = len(self.tracked_blocks) - 1 + if self._loadgen_window_start_ns is None: + self._loadgen_window_start_ns = value else: row = self._in_flight.get(sample_uuid) if row is None: @@ -608,3 +627,8 @@ def _update_tracked_block(self, row: SampleRow, complete_ns: int) -> None: if complete_ns > block.last_complete_ns: block.last_complete_ns = complete_ns block.completed_samples += 1 + # End the legacy LoadGen window at the completion of the last-issued + # (largest issued_ns) request that completed. + if row.issued_ns is not None and row.issued_ns > self._loadgen_max_issued_ns: + self._loadgen_max_issued_ns = row.issued_ns + self._loadgen_window_end_ns = complete_ns diff --git a/src/inference_endpoint/commands/benchmark/execute.py b/src/inference_endpoint/commands/benchmark/execute.py index 31b6cd53b..267b61b89 100644 --- a/src/inference_endpoint/commands/benchmark/execute.py +++ b/src/inference_endpoint/commands/benchmark/execute.py @@ -804,6 +804,7 @@ def _on_phase_start(phase: PhaseConfig) -> None: try: runtime = ctx.config.settings.runtime warmup = ctx.config.settings.warmup + load_pattern = ctx.config.settings.load_pattern report = Report.from_snapshot( snap_dict, seeds={ @@ -811,6 +812,10 @@ def _on_phase_start(phase: PhaseConfig) -> None: "dataloader_random_seed": runtime.dataloader_random_seed, "warmup_random_seed": warmup.warmup_random_seed, }, + use_legacy_loadgen_qps_metrics=( + load_pattern.type == LoadPatternType.POISSON + and load_pattern.use_legacy_loadgen_qps_metrics + ), ) if not report.complete: logger.warning( @@ -818,6 +823,14 @@ def _on_phase_start(phase: PhaseConfig) -> None: report.state, snap_dict.get("n_pending_tasks", 0), ) + if report.legacy_loadgen_window_duration_ns is not None: + logger.warning( + "Reporting QPS/TPS with the legacy MLPerf LoadGen Server " + "'completed' definition (deprecated; to be removed once a " + "formal tail-cutting mechanism lands). Pass " + "--no-use-legacy-loadgen-qps-metrics for endpoints-native " + "metrics." + ) except Exception as e: # noqa: BLE001 — best-effort report build. logger.warning(f"Failed to build report from snapshot: {e}") diff --git a/src/inference_endpoint/config/schema.py b/src/inference_endpoint/config/schema.py index 9226d7f85..751d8e477 100644 --- a/src/inference_endpoint/config/schema.py +++ b/src/inference_endpoint/config/schema.py @@ -34,9 +34,11 @@ ConfigDict, Discriminator, Field, + SerializerFunctionWrapHandler, Tag, TypeAdapter, field_validator, + model_serializer, model_validator, ) @@ -466,6 +468,31 @@ class LoadPattern(BaseModel): cyclopts.Parameter(alias="--concurrency", help="Concurrent requests"), ] = Field(None, gt=0) + # TODO(vir): remove once the formal tail-cutting mechanism lands. + use_legacy_loadgen_qps_metrics: Annotated[ + bool, + cyclopts.Parameter( + negative="--no-use-legacy-loadgen-qps-metrics", + help=( + "Only applies to the poisson load pattern. Report QPS/TPS using " + "the legacy MLPerf LoadGen Server 'completed' definition — (completed-1)/T " + "and tokens/T, T = first issued request to completion of the " + "last-issued request (see mlcommons/inference loadgen/results.cc). " + "--no-... uses endpoints-native completed/duration. Ignored for " + "non-poisson patterns." + ), + ), + ] = True + + @model_serializer(mode="wrap") + def _serialize(self, handler: SerializerFunctionWrapHandler) -> dict[str, Any]: + # use_legacy_loadgen_qps_metrics only applies to poisson; drop it from + # the serialized form (and thus YAML templates) for other patterns. + data = handler(self) + if self.type != LoadPatternType.POISSON: + data.pop("use_legacy_loadgen_qps_metrics", None) + return data + @model_validator(mode="after") def _validate_completeness(self) -> Self: if self.type == LoadPatternType.POISSON and ( diff --git a/src/inference_endpoint/config/templates/online_template_full.yaml b/src/inference_endpoint/config/templates/online_template_full.yaml index 5bea95329..baeff0047 100644 --- a/src/inference_endpoint/config/templates/online_template_full.yaml +++ b/src/inference_endpoint/config/templates/online_template_full.yaml @@ -53,6 +53,7 @@ settings: type: poisson # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step target_qps: 10.0 # Target QPS target_concurrency: null # Concurrent requests + use_legacy_loadgen_qps_metrics: true # Only applies to the poisson load pattern. Report QPS/TPS using the legacy MLPerf LoadGen Server 'completed' definition — (completed-1)/T and tokens/T, T = first issued request to completion of the last-issued request (see mlcommons/inference loadgen/results.cc). --no-... uses endpoints-native completed/duration. Ignored for non-poisson patterns. client: num_workers: -1 # Worker processes (-1=auto) log_level: INFO # Worker log level diff --git a/src/inference_endpoint/metrics/report.py b/src/inference_endpoint/metrics/report.py index 10dc6a631..88a650442 100644 --- a/src/inference_endpoint/metrics/report.py +++ b/src/inference_endpoint/metrics/report.py @@ -130,6 +130,14 @@ class Report(msgspec.Struct, frozen=True): # type: ignore[call-arg] tpot: dict[str, Any] latency: dict[str, Any] output_sequence_lengths: dict[str, Any] + # Legacy MLPerf LoadGen Server "completed" window (poisson only): first + # issued request -> completion of the last-issued request + # (final_query_all_samples_done_time analog; see mlcommons/inference + # loadgen/results.cc). Not None iff QPS/TPS were computed over this window; + # None means the endpoints-native full-run window was used. Recorded so + # result_summary.json is self-describing about which view it holds. + # TODO(vir): deprecate once endpoints has a formal tail-cutting mechanism. + legacy_loadgen_window_duration_ns: int | None = None # Derived throughput, computed once in from_snapshot so the serialized # report (result_summary.json) is self-complete. qps is None without a @@ -146,7 +154,11 @@ class Report(msgspec.Struct, frozen=True): # type: ignore[call-arg] @classmethod def from_snapshot( - cls, snap: dict[str, Any], *, seeds: dict[str, int] | None = None + cls, + snap: dict[str, Any], + *, + seeds: dict[str, int] | None = None, + use_legacy_loadgen_qps_metrics: bool = True, ) -> Report: """Build a Report from a snapshot dict. @@ -169,6 +181,13 @@ def from_snapshot( honest "incomplete" report on missing fields instead of crashing: missing ``state`` defaults to ``"interrupted"`` (worst-case), missing counters / series to zero / empty. + + The snapshot always carries BOTH ``tracked_duration_ns`` and + ``legacy_loadgen_window_duration_ns``, so it stays config-agnostic and + fully reinterpretable either way. Which window the reported QPS/TPS use + is decided by the run config (``use_legacy_loadgen_qps_metrics``, + recorded in ``config.yaml`` and in this Report's serialized JSON), not + by the snapshot. """ counters: dict[str, int | float] = {} series: dict[str, dict[str, Any]] = {} @@ -197,13 +216,41 @@ def _series_dict(key: str) -> dict[str, Any]: n_completed = _counter("tracked_samples_completed") osl = _series_dict("osl") - # Derived throughput. qps needs a duration; tps additionally needs OSL. - if duration_ns is None: - qps = tps = None + # Legacy MLPerf LoadGen Server "completed" window (poisson only): first + # issued request -> completion of the last-issued request + # (final_query_all_samples_done_time analog; see mlcommons/inference + # loadgen/results.cc). + # TODO(vir): deprecate once endpoints has a formal tail-cutting mechanism. + raw_loadgen_window_ns = _counter("legacy_loadgen_window_duration_ns") + + # Derived throughput, computed once so result_summary.json is + # self-complete. Legacy LoadGen QPS = (completed-1)/window; otherwise + # native completed/duration. The walrus assigns the window + # unconditionally (first operand of the `and`), so it stays in scope + # for the tps computation and the stored field below — no second + # counter read. + if ( + legacy_loadgen_window_duration_ns := ( + raw_loadgen_window_ns + if (use_legacy_loadgen_qps_metrics and raw_loadgen_window_ns > 0) + else None + ) + ) is not None and n_completed >= 2: + qps = (n_completed - 1) / (legacy_loadgen_window_duration_ns / 1e9) + elif duration_ns is not None: + qps = n_completed / (duration_ns / 1e9) else: - duration_s = duration_ns / 1e9 - qps = n_completed / duration_s - tps = (osl.get("total", 0) / duration_s) if osl else None + qps = None + tps_window_ns = ( + legacy_loadgen_window_duration_ns + if legacy_loadgen_window_duration_ns is not None + else duration_ns + ) + tps = ( + (osl.get("total", 0) / (tps_window_ns / 1e9)) + if (tps_window_ns and osl) + else None + ) # Default missing state to "interrupted" — a malformed / partial # snapshot dict is treated as worst-case (run did not reach a @@ -226,6 +273,7 @@ def _series_dict(key: str) -> dict[str, Any]: tpot=_series_dict("tpot_ns"), latency=_series_dict("sample_latency_ns"), output_sequence_lengths=osl, + legacy_loadgen_window_duration_ns=legacy_loadgen_window_duration_ns, qps=qps, tps=tps, seeds=seeds, diff --git a/tests/unit/async_utils/services/metrics_aggregator/test_aggregator.py b/tests/unit/async_utils/services/metrics_aggregator/test_aggregator.py index 9877aee5d..b74c30944 100644 --- a/tests/unit/async_utils/services/metrics_aggregator/test_aggregator.py +++ b/tests/unit/async_utils/services/metrics_aggregator/test_aggregator.py @@ -725,6 +725,50 @@ async def test_ended_in_second_batch(self, tmp_path): agg.close() +# --------------------------------------------------------------------------- +# LoadGen window aggregation (end-to-end through the event router) +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestLoadgenWindowAggregation: + @pytest.mark.asyncio + async def test_loadgen_window_duration_emitted(self, tmp_path): + """The aggregator emits ``legacy_loadgen_window_duration_ns`` = first + issue to the completion of the last-issued request. + + Sequence: STARTED, START_PERFORMANCE_TRACKING, ISSUED(s1, t=100), + COMPLETE(s1, t=500), ISSUED(s2, t=200, last-issued), COMPLETE(s2, + t=600), STOP_PERFORMANCE_TRACKING. Window = 600 - 100 = 500. + """ + loop = asyncio.get_event_loop() + with ManagedZMQContext.scoped(socket_dir=str(tmp_path)) as ctx: + agg, registry, _ = make_aggregator(ctx, loop, "agg_loadgen_window") + try: + await agg.process( + [ + session_event(SessionEventType.STARTED, ts=0), + session_event( + SessionEventType.START_PERFORMANCE_TRACKING, ts=10 + ), + sample_event(SampleEventType.ISSUED, "s1", ts=100), + sample_event(SampleEventType.COMPLETE, "s1", ts=500), + sample_event(SampleEventType.ISSUED, "s2", ts=200), + sample_event(SampleEventType.COMPLETE, "s2", ts=600), + session_event( + SessionEventType.STOP_PERFORMANCE_TRACKING, ts=700 + ), + ] + ) + counters = snapshot_counters(registry) + assert ( + counters[MetricCounterKey.LEGACY_LOADGEN_WINDOW_DURATION_NS.value] + == 600 - 100 + ) + finally: + agg.close() + + # --------------------------------------------------------------------------- # Counter accounting (issued / completed) # --------------------------------------------------------------------------- diff --git a/tests/unit/async_utils/services/metrics_aggregator/test_metrics_table.py b/tests/unit/async_utils/services/metrics_aggregator/test_metrics_table.py index 077923ff8..0933205dc 100644 --- a/tests/unit/async_utils/services/metrics_aggregator/test_metrics_table.py +++ b/tests/unit/async_utils/services/metrics_aggregator/test_metrics_table.py @@ -447,3 +447,75 @@ async def test_tpot_uses_tool_call_deltas_after_first_chunk(self): await task assert snapshot_series_total(registry, "tpot_ns") == pytest.approx(2000.0) + + +def _start_tracking(table: MetricsTable, start_ns: int = 0) -> None: + """Open a real tracking block at ``start_ns`` (STARTED + START).""" + table.handle_session_event( + EventRecord(event_type=SessionEventType.STARTED, timestamp_ns=start_ns) + ) + table.handle_session_event( + EventRecord( + event_type=SessionEventType.START_PERFORMANCE_TRACKING, + timestamp_ns=start_ns, + ) + ) + + +def _issue(table: MetricsTable, uuid: str, ts: int) -> None: + table.set_field( + uuid, + "issued_ns", + ts, + EventRecord( + event_type=SampleEventType.ISSUED, timestamp_ns=ts, sample_uuid=uuid + ), + ) + + +def _complete(table: MetricsTable, uuid: str, ts: int) -> None: + table.set_field( + uuid, + "complete_ns", + ts, + EventRecord( + event_type=SampleEventType.COMPLETE, timestamp_ns=ts, sample_uuid=uuid + ), + ) + + +@pytest.mark.unit +class TestLoadgenWindow: + """``MetricsTable.total_loadgen_window_ns`` — the legacy MLPerf LoadGen Server + ``final_query_all_samples_done_time`` analog: from the first issued + tracked request to the completion of the last-issued request that + completed. + """ + + def test_first_issue_to_last_issued_completion(self): + """s1 issued early (t=100) completes late (t=1000); s2 issued late + (t=200) completes early (t=300). s2 has the largest issued_ns, so the + window ends at s2's completion (300) and starts at the first issue + (100): window = 300 - 100. + """ + table = _new_table() + _start_tracking(table, start_ns=50) + _issue(table, "s1", 100) + _issue(table, "s2", 200) + _complete(table, "s2", 300) + _complete(table, "s1", 1000) + + # Start at first issue (100), not block start (50); end at the + # last-issued (s2) completion (300). + assert table.total_loadgen_window_ns == 300 - 100 + + def test_no_completions_yields_zero(self): + """If no tracked request ever COMPLETEs, the window end is never set → + total_loadgen_window_ns == 0. + """ + table = _new_table() + _start_tracking(table, start_ns=0) + _issue(table, "s1", 100) + # s1 is issued but never completes. + + assert table.total_loadgen_window_ns == 0 diff --git a/tests/unit/commands/test_benchmark.py b/tests/unit/commands/test_benchmark.py index 1c90554fb..07c542668 100644 --- a/tests/unit/commands/test_benchmark.py +++ b/tests/unit/commands/test_benchmark.py @@ -25,6 +25,7 @@ import pandas as pd import pytest from inference_endpoint.commands.benchmark.cli import ( + benchmark_app, from_config, offline, online, @@ -269,6 +270,49 @@ def test_command_handler( assert called_config.datasets[0].type == expected_dtype assert called_mode == mode + @pytest.mark.unit + def test_use_legacy_loadgen_qps_metrics_default_and_disable(self): + """LoadPattern flag defaults True; --no-use-legacy-loadgen-qps-metrics + sets False (poisson only). + """ + base = [ + "online", + "--endpoints", + "http://h:80", + "--model", + "m", + "--dataset", + "d.jsonl", + "--load-pattern", + "poisson", + "--target-qps", + "100", + ] + _, bound, _ = benchmark_app.parse_args(base, exit_on_error=False) + lp = bound.arguments["config"].settings.load_pattern + assert lp.use_legacy_loadgen_qps_metrics is True + + _, bound, _ = benchmark_app.parse_args( + [*base, "--no-use-legacy-loadgen-qps-metrics"], exit_on_error=False + ) + lp = bound.arguments["config"].settings.load_pattern + assert lp.use_legacy_loadgen_qps_metrics is False + + @pytest.mark.unit + def test_loadgen_flag_serialized_only_for_poisson(self): + """``use_legacy_loadgen_qps_metrics`` is dropped from the serialized + form for non-poisson patterns (so it does not pollute their YAML + templates), and present for poisson. + """ + poisson = LoadPattern(type=LoadPatternType.POISSON, target_qps=100) + assert "use_legacy_loadgen_qps_metrics" in poisson.model_dump() + + for lp in ( + LoadPattern(type=LoadPatternType.MAX_THROUGHPUT), + LoadPattern(type=LoadPatternType.CONCURRENCY, target_concurrency=10), + ): + assert "use_legacy_loadgen_qps_metrics" not in lp.model_dump() + @pytest.mark.unit @patch("inference_endpoint.commands.benchmark.cli.run_benchmark") def test_from_config_handler(self, mock_run, tmp_path): diff --git a/tests/unit/metrics/test_report_builder.py b/tests/unit/metrics/test_report_builder.py index 682e9624f..a3ccba153 100644 --- a/tests/unit/metrics/test_report_builder.py +++ b/tests/unit/metrics/test_report_builder.py @@ -107,11 +107,19 @@ def _make_registry(n_samples: int = 50) -> MetricsRegistry: return registry +def _set_loadgen_window(registry: MetricsRegistry, *, duration_ns: int) -> None: + """Populate the legacy LoadGen window counter a loadgen-view Report reads.""" + registry.set_counter( + MetricCounterKey.LEGACY_LOADGEN_WINDOW_DURATION_NS.value, duration_ns + ) + + def _build_report( registry: MetricsRegistry, *, state: SessionState = SessionState.COMPLETE, n_pending_tasks: int = 0, + use_legacy_loadgen_qps_metrics: bool = True, ) -> Report: """Build a Report from a snapshot dict (matches the consumer contract). @@ -122,7 +130,10 @@ def _build_report( does (loaded JSON file → Report). """ snap = registry.build_snapshot(state=state, n_pending_tasks=n_pending_tasks) - return Report.from_snapshot(snapshot_to_dict(snap)) + return Report.from_snapshot( + snapshot_to_dict(snap), + use_legacy_loadgen_qps_metrics=use_legacy_loadgen_qps_metrics, + ) # --------------------------------------------------------------------------- @@ -151,7 +162,9 @@ def test_empty_registry(self): def test_with_metrics(self): registry = _make_registry(n_samples=50) - report = _build_report(registry) + # Native view (completed / tracked_duration) so QPS/TPS are computable + # from tracked_duration_ns alone (no loadgen window counter set here). + report = _build_report(registry, use_legacy_loadgen_qps_metrics=False) assert report.n_samples_issued == 50 assert report.n_samples_completed == 50 @@ -491,6 +504,64 @@ def test_display_handles_scrubbed_nan_percentiles(self): assert "N/A" in output +# --------------------------------------------------------------------------- +# use_legacy_loadgen_qps_metrics: legacy MLPerf LoadGen "completed" (default) vs +# endpoints' native throughput, with native fallback when the window is +# unavailable. +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestLoadgenQpsMetrics: + def test_default_uses_loadgen_window(self): + """Default: QPS = (completed-1)/W, TPS = tokens/W, where W is the + legacy LoadGen window. + """ + registry = _make_registry(n_samples=50) + _set_loadgen_window(registry, duration_ns=8_000_000_000) + report = _build_report(registry) + assert report.legacy_loadgen_window_duration_ns == 8_000_000_000 + # (50 - 1) / 8 s + assert report.qps == pytest.approx(49 / 8.0) + total = report.output_sequence_lengths["total"] + assert report.tps == pytest.approx(total / 8.0) + + def test_disabled_uses_native(self): + """--no-use-legacy-loadgen-qps-metrics → native completed/duration and + tokens/duration, ignoring the legacy LoadGen window. + """ + registry = _make_registry(n_samples=50) + _set_loadgen_window(registry, duration_ns=8_000_000_000) + report = _build_report(registry, use_legacy_loadgen_qps_metrics=False) + # Native view selected → window not recorded on the report. + assert report.legacy_loadgen_window_duration_ns is None + # Native: 50 / 10 s. + assert report.qps == pytest.approx(5.0) + total = report.output_sequence_lengths["total"] + assert report.tps == pytest.approx(total / 10.0) + + def test_falls_back_to_native_when_window_unavailable(self): + """loadgen with absent/zero window → native fallback (not None), so the + default never silently drops the headline. + """ + registry = _make_registry(n_samples=50) # no window counter set + report = _build_report(registry) + assert report.legacy_loadgen_window_duration_ns is None + assert report.qps == pytest.approx(5.0) + total = report.output_sequence_lengths["total"] + assert report.tps == pytest.approx(total / 10.0) + + def test_loadgen_qps_falls_back_when_completed_lt_2(self): + """Fewer than 2 completions → native QPS (the (completed-1)/W form is + undefined for a single sample). + """ + registry = _make_registry(n_samples=1) + _set_loadgen_window(registry, duration_ns=1_000_000_000) + report = _build_report(registry) + # Native QPS = 1 / 10 s (tracked_duration set to 10s by _make_registry). + assert report.qps == pytest.approx(0.1) + + @pytest.mark.unit def test_scrub_nonfinite_round_trip_yields_none(): """End-to-end: a registry that records a non-finite series value