diff --git a/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py b/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py index d21973a3f..51cd68e05 100644 --- a/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py +++ b/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py @@ -116,6 +116,10 @@ def start( spurious duplicate ``STARTED`` event or a buggy replay producer) is a no-op rather than orphaning the original task. The original task remains the one cancelled by ``publish_final`` / ``aclose``. + + If ``publish_interval_s <= 0``, live publishing is disabled and + the tick task is not created. Final snapshots will still be + published via ``publish_final``. """ if self._tick_task is not None: logger.warning( @@ -125,9 +129,12 @@ def start( ) return if publish_interval_s <= 0: - raise ValueError( - f"publish_interval_s must be positive, got {publish_interval_s}" + logger.info( + "Live metrics publishing disabled " + "(publish_interval_s=%s, skipping tick task)", + publish_interval_s, ) + return async def _tick() -> None: while True: diff --git a/src/inference_endpoint/commands/benchmark/execute.py b/src/inference_endpoint/commands/benchmark/execute.py index 0f8ae5743..42c964465 100644 --- a/src/inference_endpoint/commands/benchmark/execute.py +++ b/src/inference_endpoint/commands/benchmark/execute.py @@ -730,6 +730,10 @@ async def _run_benchmark_async( str(config.settings.drain.metrics_tokenizer_workers), ] ) + # Control live metrics publishing via config parameter + # publish_interval_s = 0 disables live tick task in publisher.start() + publish_interval_s = 0.25 if ctx.rt_settings.enable_live_metrics else 0.0 + aggregator_args.extend(["--publish-interval", str(publish_interval_s)]) # EventLoggerService writes events.jsonl to tmpfs (high-frequency writes) event_logger_args: list[str] = [ diff --git a/src/inference_endpoint/config/runtime_settings.py 
b/src/inference_endpoint/config/runtime_settings.py index 5067c78a1..953ee4b93 100644 --- a/src/inference_endpoint/config/runtime_settings.py +++ b/src/inference_endpoint/config/runtime_settings.py @@ -85,6 +85,9 @@ class RuntimeSettings: load_pattern: LoadPattern | None """Load pattern configuration""" + enable_live_metrics: bool = True + """Enable live metrics publishing (periodic snapshots)""" + @classmethod def from_config( cls, @@ -162,6 +165,7 @@ def _from_config_default( "rng_sched": random.Random(runtime_cfg.scheduler_random_seed), "rng_sample_index": random.Random(runtime_cfg.dataloader_random_seed), "load_pattern": load_pattern_cfg, + "enable_live_metrics": runtime_cfg.enable_live_metrics, } # Apply overrides diff --git a/src/inference_endpoint/config/schema.py b/src/inference_endpoint/config/schema.py index 6fe50b875..bd50e5e83 100644 --- a/src/inference_endpoint/config/schema.py +++ b/src/inference_endpoint/config/schema.py @@ -431,6 +431,14 @@ def _parse_duration_suffix(cls, v: object) -> object: ] = Field(None, gt=0) scheduler_random_seed: int = Field(42, description="Scheduler RNG seed") dataloader_random_seed: int = Field(42, description="Dataloader RNG seed") + enable_live_metrics: Annotated[ + bool, + cyclopts.Parameter( + alias="--enable-live-metrics", + negative="--no-live-metrics", + help="Enable live metrics publishing (periodic snapshots); disable for endpoints running on CPUs", + ), + ] = Field(True, description="Enable live metrics publishing (periodic snapshots)") @model_validator(mode="after") def _validate_durations(self) -> Self: diff --git a/src/inference_endpoint/config/templates/concurrency_template_full.yaml b/src/inference_endpoint/config/templates/concurrency_template_full.yaml index 382e3cc00..0f9b13795 100644 --- a/src/inference_endpoint/config/templates/concurrency_template_full.yaml +++ b/src/inference_endpoint/config/templates/concurrency_template_full.yaml @@ -49,6 +49,7 @@ settings: n_samples_to_issue: null # Sample count 
override scheduler_random_seed: 42 # Scheduler RNG seed dataloader_random_seed: 42 # Dataloader RNG seed + enable_live_metrics: true # Enable live metrics publishing (periodic snapshots) load_pattern: type: concurrency # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step target_qps: null # Target QPS diff --git a/src/inference_endpoint/config/templates/offline_template_full.yaml b/src/inference_endpoint/config/templates/offline_template_full.yaml index 6a2998812..2396afb88 100644 --- a/src/inference_endpoint/config/templates/offline_template_full.yaml +++ b/src/inference_endpoint/config/templates/offline_template_full.yaml @@ -49,6 +49,7 @@ settings: n_samples_to_issue: null # Sample count override scheduler_random_seed: 42 # Scheduler RNG seed dataloader_random_seed: 42 # Dataloader RNG seed + enable_live_metrics: true # Enable live metrics publishing (periodic snapshots) load_pattern: type: max_throughput # Load pattern type | offline only: max_throughput target_qps: null # Target QPS diff --git a/src/inference_endpoint/config/templates/online_template_full.yaml b/src/inference_endpoint/config/templates/online_template_full.yaml index 27df614de..5f6169a4e 100644 --- a/src/inference_endpoint/config/templates/online_template_full.yaml +++ b/src/inference_endpoint/config/templates/online_template_full.yaml @@ -49,6 +49,7 @@ settings: n_samples_to_issue: null # Sample count override scheduler_random_seed: 42 # Scheduler RNG seed dataloader_random_seed: 42 # Dataloader RNG seed + enable_live_metrics: true # Enable live metrics publishing (periodic snapshots) load_pattern: type: poisson # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step target_qps: 10.0 # Target QPS diff --git a/tests/unit/async_utils/services/metrics_aggregator/test_publisher.py b/tests/unit/async_utils/services/metrics_aggregator/test_publisher.py index 9e26f734a..6a4dfb5fd 100644 --- 
@pytest.mark.asyncio
async def test_disabled_interval_skips_tick_task_and_publish_final_works(
    self, tmp_path: Path, zmq_ctx_scope: ManagedZMQContext
):
    """``publish_interval_s <= 0`` must not create a tick task, and
    ``publish_final`` must still write the JSON snapshot correctly.

    Scenario:
        1. Build a publisher whose final snapshot is written under
           ``tmp_path``.
        2. Call ``start`` with ``publish_interval_s=0`` (live publishing
           disabled).
        3. Verify no tick task exists, then call ``publish_final``.

    Expectations:
        * ``publisher._tick_task`` stays ``None`` after ``start``.
        * The final snapshot file exists and decodes as JSON with
          ``state == SessionState.COMPLETE.value`` and
          ``n_pending_tasks == 0``.
    """
    # Inside a coroutine the running loop is the one we want;
    # ``get_running_loop`` is the documented preferred call here and
    # never falls back to policy-dependent loop lookup or creation.
    loop = asyncio.get_running_loop()
    target = tmp_path / "final_snapshot.json"
    publisher = MetricsPublisher(
        MetricsSnapshotCodec(),
        zmq_ctx_scope,
        "test_pub_disabled_interval",
        loop,
        final_snapshot_path=target,
    )
    try:
        registry = MetricsRegistry()
        registry.register_counter("c")
        registry.increment("c", 2)

        publisher.start(
            registry,
            publish_interval_s=0,
            get_runtime_state=lambda: (SessionState.LIVE, 0),
        )

        # No tick task should have been created.
        assert publisher._tick_task is None

        await publisher.publish_final(registry, n_pending_tasks=0)

        assert (
            target.exists()
        ), "final snapshot must be written even when live ticks are disabled"
        decoded = json.loads(target.read_bytes())
        assert decoded["state"] == SessionState.COMPLETE.value
        assert decoded["n_pending_tasks"] == 0
    finally:
        # Always release the publisher's resources, even when an
        # assertion above fails.
        publisher.close()