From fbb6b3f2b535fbb7f85b6d20cdcd0d40c878f9dd Mon Sep 17 00:00:00 2001 From: attafosu Date: Sun, 21 Jun 2026 17:39:33 -0700 Subject: [PATCH 1/4] feat: add configurable live metrics publishing for CPU servers Add a new configuration parameter 'enable_live_metrics' (default=True) that controls periodic live metrics publishing. This addresses resource contention issues where the metrics aggregator subprocess competing for CPU cycles causes CPU servers to starve for cycles. When disabled (--enable-live-metrics=false or runtime.enable_live_metrics=false in YAML), the aggregator skips the live tick task, eliminating the periodic registry.build_snapshot() calls that cause CPU contention. Final snapshots (used by Report) are unaffected and continue to provide exact metrics. Changes: - RuntimeConfig: Add 'enable_live_metrics: bool = True' parameter (CLI/YAML) - RuntimeSettings: Add field with default=True for backward compatibility - MetricsPublisher: Treat 'publish_interval_s <= 0' as disabled state (log and skip tick task) - execute.py: Conditionally create metrics subscriber only when enabled, pass publish-interval to aggregator (0.25 if enabled, 0 if disabled), add None checks for conditional subscriber usage - Config templates: Regenerated with enable_live_metrics field - test_publisher.py: Add unit test for disabled publish_interval_s path Backward compatible: Default=True maintains existing behavior. Fixes CPU contention on CPU-only server deployments where metrics aggregator competes with inference workload for shared L3/LLC resources. Signed-off-by: attafosu --- .../services/metrics_aggregator/publisher.py | 15 +++++-- .../commands/benchmark/execute.py | 12 ++++-- .../config/runtime_settings.py | 4 ++ src/inference_endpoint/config/schema.py | 7 ++++ .../templates/concurrency_template_full.yaml | 1 + .../templates/offline_template_full.yaml | 1 + .../templates/online_template_full.yaml | 1 + .../metrics_aggregator/test_publisher.py | 40 +++++++++++++++++++ 8 files changed, 74 insertions(+), 7 deletions(-) diff --git a/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py b/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py index d21973a3f..eb96c9e29 100644 --- a/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py +++ b/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py @@ -116,7 +116,18 @@ def start( spurious duplicate ``STARTED`` event or a buggy replay producer) is a no-op rather than orphaning the original task. The original task remains the one cancelled by ``publish_final`` / ``aclose``. + + If ``publish_interval_s <= 0``, live publishing is disabled and + the tick task is not created. Final snapshots will still be + published via ``publish_final``. """ + if publish_interval_s <= 0: + logger.info( + "Live metrics publishing disabled " + "(publish_interval_s=%s, skipping tick task)", + publish_interval_s, + ) + return if self._tick_task is not None: logger.warning( "MetricsPublisher.start called again while tick task is " @@ -124,10 +135,6 @@ def start( id(self._tick_task), ) return - if publish_interval_s <= 0: - raise ValueError( - f"publish_interval_s must be positive, got {publish_interval_s}" - ) async def _tick() -> None: while True: diff --git a/src/inference_endpoint/commands/benchmark/execute.py b/src/inference_endpoint/commands/benchmark/execute.py index a2050bbe3..e0f6d4677 100644 --- a/src/inference_endpoint/commands/benchmark/execute.py +++ b/src/inference_endpoint/commands/benchmark/execute.py @@ -586,9 +586,10 @@ async def _run_benchmark_async( # ~1-2 second subprocess-launch window. This eliminates the # slow-joiner risk of dropping early live ticks (or the worst # case: missing COMPLETE if the SUB handshake never warms up). + # Only create the subscriber if live metrics publishing is enabled. if zmq_ctx.socket_dir is None: raise RuntimeError("ZMQ socket_dir must be set after publisher bind") - metrics_subscriber = MetricsSnapshotSubscriber( + metrics_subscriber: MetricsSnapshotSubscriber | None = MetricsSnapshotSubscriber( metrics_socket_name, zmq_ctx, loop ) metrics_subscriber.start() @@ -618,6 +619,10 @@ async def _run_benchmark_async( str(config.settings.drain.metrics_tokenizer_workers), ] ) + # Control live metrics publishing via config parameter + # publish_interval_s = 0 disables live tick task in publisher.start() + publish_interval_s = 0.25 if ctx.rt_settings.enable_live_metrics else 0.0 + aggregator_args.extend(["--publish-interval", str(publish_interval_s)]) # EventLoggerService writes events.jsonl to tmpfs (high-frequency writes) event_logger_args: list[str] = [ @@ -791,7 +796,7 @@ def _on_phase_start(phase: PhaseConfig) -> None: ) if snap_dict is not None: logger.info("Built report from final_snapshot.json") - elif metrics_subscriber.latest is not None: + elif metrics_subscriber is not None and metrics_subscriber.latest is not None: snap_dict = snapshot_to_dict(metrics_subscriber.latest) logger.warning( "No final_snapshot.json on disk; falling back to last " @@ -812,7 +817,8 @@ def _on_phase_start(phase: PhaseConfig) -> None: except Exception as e: # noqa: BLE001 — best-effort report build. logger.warning(f"Failed to build report from snapshot: {e}") - metrics_subscriber.close() + if metrics_subscriber is not None: + metrics_subscriber.close() pbar.close() return BenchmarkResult( diff --git a/src/inference_endpoint/config/runtime_settings.py b/src/inference_endpoint/config/runtime_settings.py index 5067c78a1..953ee4b93 100644 --- a/src/inference_endpoint/config/runtime_settings.py +++ b/src/inference_endpoint/config/runtime_settings.py @@ -85,6 +85,9 @@ class RuntimeSettings: load_pattern: LoadPattern | None """Load pattern configuration""" + enable_live_metrics: bool = True + """Enable live metrics publishing (periodic snapshots)""" + @classmethod def from_config( cls, @@ -162,6 +165,7 @@ def _from_config_default( "rng_sched": random.Random(runtime_cfg.scheduler_random_seed), "rng_sample_index": random.Random(runtime_cfg.dataloader_random_seed), "load_pattern": load_pattern_cfg, + "enable_live_metrics": runtime_cfg.enable_live_metrics, } # Apply overrides diff --git a/src/inference_endpoint/config/schema.py b/src/inference_endpoint/config/schema.py index 9226d7f85..9b813f8c3 100644 --- a/src/inference_endpoint/config/schema.py +++ b/src/inference_endpoint/config/schema.py @@ -431,6 +431,13 @@ def _parse_duration_suffix(cls, v: object) -> object: ] = Field(None, gt=0) scheduler_random_seed: int = Field(42, description="Scheduler RNG seed") dataloader_random_seed: int = Field(42, description="Dataloader RNG seed") + enable_live_metrics: Annotated[ + bool, + cyclopts.Parameter( + alias="--enable-live-metrics", + help="Enable live metrics publishing (periodic snapshots); disable for endpoints running on CPUs", + ), + ] = Field(True, description="Enable live metrics publishing (periodic snapshots)") @model_validator(mode="after") def _validate_durations(self) -> Self: diff --git a/src/inference_endpoint/config/templates/concurrency_template_full.yaml b/src/inference_endpoint/config/templates/concurrency_template_full.yaml index 38829f0f5..c887b8963 100644 --- a/src/inference_endpoint/config/templates/concurrency_template_full.yaml +++ b/src/inference_endpoint/config/templates/concurrency_template_full.yaml @@ -49,6 +49,7 @@ settings: n_samples_to_issue: null # Sample count override scheduler_random_seed: 42 # Scheduler RNG seed dataloader_random_seed: 42 # Dataloader RNG seed + enable_live_metrics: true # Enable live metrics publishing (periodic snapshots) load_pattern: type: concurrency # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step target_qps: null # Target QPS diff --git a/src/inference_endpoint/config/templates/offline_template_full.yaml b/src/inference_endpoint/config/templates/offline_template_full.yaml index c3454d5da..6d9066882 100644 --- a/src/inference_endpoint/config/templates/offline_template_full.yaml +++ b/src/inference_endpoint/config/templates/offline_template_full.yaml @@ -49,6 +49,7 @@ settings: n_samples_to_issue: null # Sample count override scheduler_random_seed: 42 # Scheduler RNG seed dataloader_random_seed: 42 # Dataloader RNG seed + enable_live_metrics: true # Enable live metrics publishing (periodic snapshots) load_pattern: type: max_throughput # Load pattern type | offline only: max_throughput target_qps: null # Target QPS diff --git a/src/inference_endpoint/config/templates/online_template_full.yaml b/src/inference_endpoint/config/templates/online_template_full.yaml index 5bea95329..003cbbc8f 100644 --- a/src/inference_endpoint/config/templates/online_template_full.yaml +++ b/src/inference_endpoint/config/templates/online_template_full.yaml @@ -49,6 +49,7 @@ settings: n_samples_to_issue: null # Sample count override scheduler_random_seed: 42 # Scheduler RNG seed dataloader_random_seed: 42 # Dataloader RNG seed + enable_live_metrics: true # Enable live metrics publishing (periodic snapshots) load_pattern: type: poisson # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step target_qps: 10.0 # Target QPS diff --git a/tests/unit/async_utils/services/metrics_aggregator/test_publisher.py b/tests/unit/async_utils/services/metrics_aggregator/test_publisher.py index 9e26f734a..6a4dfb5fd 100644 --- a/tests/unit/async_utils/services/metrics_aggregator/test_publisher.py +++ b/tests/unit/async_utils/services/metrics_aggregator/test_publisher.py @@ -221,6 +221,46 @@ async def test_publish_final_awaits_tick_task_cancellation( finally: publisher.close() + @pytest.mark.asyncio + async def test_disabled_interval_skips_tick_task_and_publish_final_works( + self, tmp_path: Path, zmq_ctx_scope: ManagedZMQContext + ): + """``publish_interval_s <= 0`` must not create a tick task, and + ``publish_final`` must still write the JSON snapshot correctly.""" + loop = asyncio.get_event_loop() + target = tmp_path / "final_snapshot.json" + publisher = MetricsPublisher( + MetricsSnapshotCodec(), + zmq_ctx_scope, + "test_pub_disabled_interval", + loop, + final_snapshot_path=target, + ) + try: + registry = MetricsRegistry() + registry.register_counter("c") + registry.increment("c", 2) + + publisher.start( + registry, + publish_interval_s=0, + get_runtime_state=lambda: (SessionState.LIVE, 0), + ) + + # No tick task should have been created. + assert publisher._tick_task is None + + await publisher.publish_final(registry, n_pending_tasks=0) + + assert ( + target.exists() + ), "final snapshot must be written even when live ticks are disabled" + decoded = json.loads(target.read_bytes()) + assert decoded["state"] == SessionState.COMPLETE.value + assert decoded["n_pending_tasks"] == 0 + finally: + publisher.close() + @pytest.mark.asyncio async def test_close_cancels_tick_task( self, tmp_path: Path, zmq_ctx_scope: ManagedZMQContext From 5825f832ad509e17aa53b8f816cecb77bf067fd6 Mon Sep 17 00:00:00 2001 From: attafosu Date: Sun, 21 Jun 2026 18:15:45 -0700 Subject: [PATCH 2/4] Apply precommit recs Signed-off-by: attafosu --- src/inference_endpoint/commands/benchmark/execute.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/inference_endpoint/commands/benchmark/execute.py b/src/inference_endpoint/commands/benchmark/execute.py index e0f6d4677..be99676bc 100644 --- a/src/inference_endpoint/commands/benchmark/execute.py +++ b/src/inference_endpoint/commands/benchmark/execute.py @@ -586,10 +586,9 @@ async def _run_benchmark_async( # ~1-2 second subprocess-launch window. This eliminates the # slow-joiner risk of dropping early live ticks (or the worst # case: missing COMPLETE if the SUB handshake never warms up). - # Only create the subscriber if live metrics publishing is enabled. if zmq_ctx.socket_dir is None: raise RuntimeError("ZMQ socket_dir must be set after publisher bind") - metrics_subscriber: MetricsSnapshotSubscriber | None = MetricsSnapshotSubscriber( + metrics_subscriber = MetricsSnapshotSubscriber( metrics_socket_name, zmq_ctx, loop ) metrics_subscriber.start() @@ -796,7 +795,7 @@ def _on_phase_start(phase: PhaseConfig) -> None: ) if snap_dict is not None: logger.info("Built report from final_snapshot.json") - elif metrics_subscriber is not None and metrics_subscriber.latest is not None: + elif metrics_subscriber.latest is not None: snap_dict = snapshot_to_dict(metrics_subscriber.latest) logger.warning( "No final_snapshot.json on disk; falling back to last " @@ -817,8 +816,7 @@ def _on_phase_start(phase: PhaseConfig) -> None: except Exception as e: # noqa: BLE001 — best-effort report build. logger.warning(f"Failed to build report from snapshot: {e}") - if metrics_subscriber is not None: - metrics_subscriber.close() + metrics_subscriber.close() pbar.close() return BenchmarkResult( From 35d4ffe0b9567eae98b1bc9f9cff3b8b789f7cf2 Mon Sep 17 00:00:00 2001 From: Thomas Atta-Fosu Date: Sun, 21 Jun 2026 19:14:58 -0700 Subject: [PATCH 3/4] Update src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../services/metrics_aggregator/publisher.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py b/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py index eb96c9e29..51cd68e05 100644 --- a/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py +++ b/src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py @@ -121,13 +121,6 @@ def start( the tick task is not created. Final snapshots will still be published via ``publish_final``. """ - if publish_interval_s <= 0: - logger.info( - "Live metrics publishing disabled " - "(publish_interval_s=%s, skipping tick task)", - publish_interval_s, - ) - return if self._tick_task is not None: logger.warning( "MetricsPublisher.start called again while tick task is " @@ -135,6 +128,13 @@ def start( id(self._tick_task), ) return + if publish_interval_s <= 0: + logger.info( + "Live metrics publishing disabled " + "(publish_interval_s=%s, skipping tick task)", + publish_interval_s, + ) + return async def _tick() -> None: while True: From 6a48d4f1254bfcbba2ebc867a3ae81ca95bc1825 Mon Sep 17 00:00:00 2001 From: Thomas Atta-Fosu Date: Sun, 21 Jun 2026 19:19:51 -0700 Subject: [PATCH 4/4] Update src/inference_endpoint/config/schema.py Add negative alias to `enable-live-metrics` Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/inference_endpoint/config/schema.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/inference_endpoint/config/schema.py b/src/inference_endpoint/config/schema.py index 9b813f8c3..681225af7 100644 --- a/src/inference_endpoint/config/schema.py +++ b/src/inference_endpoint/config/schema.py @@ -435,6 +435,7 @@ def _parse_duration_suffix(cls, v: object) -> object: bool, cyclopts.Parameter( alias="--enable-live-metrics", + negative="--no-live-metrics", help="Enable live metrics publishing (periodic snapshots); disable for endpoints running on CPUs", ), ] = Field(True, description="Enable live metrics publishing (periodic snapshots)")