Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ def start(
spurious duplicate ``STARTED`` event or a buggy replay producer)
is a no-op rather than orphaning the original task. The original
task remains the one cancelled by ``publish_final`` / ``aclose``.

If ``publish_interval_s <= 0``, live publishing is disabled and
the tick task is not created. Final snapshots will still be
published via ``publish_final``.
"""
if self._tick_task is not None:
logger.warning(
Expand All @@ -125,9 +129,12 @@ def start(
)
return
if publish_interval_s <= 0:
raise ValueError(
f"publish_interval_s must be positive, got {publish_interval_s}"
logger.info(
"Live metrics publishing disabled "
"(publish_interval_s=%s, skipping tick task)",
publish_interval_s,
)
return

async def _tick() -> None:
while True:
Expand Down
4 changes: 4 additions & 0 deletions src/inference_endpoint/commands/benchmark/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,10 @@ async def _run_benchmark_async(
str(config.settings.drain.metrics_tokenizer_workers),
]
)
# Control live metrics publishing via config parameter
# publish_interval_s = 0 disables live tick task in publisher.start()
publish_interval_s = 0.25 if ctx.rt_settings.enable_live_metrics else 0.0
aggregator_args.extend(["--publish-interval", str(publish_interval_s)])
Comment on lines +733 to +736
Comment on lines +733 to +736

# EventLoggerService writes events.jsonl to tmpfs (high-frequency writes)
event_logger_args: list[str] = [
Expand Down
4 changes: 4 additions & 0 deletions src/inference_endpoint/config/runtime_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class RuntimeSettings:
load_pattern: LoadPattern | None
"""Load pattern configuration"""

enable_live_metrics: bool = True
"""Enable live metrics publishing (periodic snapshots)"""

@classmethod
def from_config(
cls,
Expand Down Expand Up @@ -162,6 +165,7 @@ def _from_config_default(
"rng_sched": random.Random(runtime_cfg.scheduler_random_seed),
"rng_sample_index": random.Random(runtime_cfg.dataloader_random_seed),
"load_pattern": load_pattern_cfg,
"enable_live_metrics": runtime_cfg.enable_live_metrics,
}

# Apply overrides
Expand Down
8 changes: 8 additions & 0 deletions src/inference_endpoint/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,14 @@ def _parse_duration_suffix(cls, v: object) -> object:
] = Field(None, gt=0)
scheduler_random_seed: int = Field(42, description="Scheduler RNG seed")
dataloader_random_seed: int = Field(42, description="Dataloader RNG seed")
enable_live_metrics: Annotated[
bool,
cyclopts.Parameter(
alias="--enable-live-metrics",
negative="--no-live-metrics",
help="Enable live metrics publishing (periodic snapshots); disable for endpoints running on CPUs",
),
Comment thread
Copilot marked this conversation as resolved.
] = Field(True, description="Enable live metrics publishing (periodic snapshots)")
Comment thread
attafosu marked this conversation as resolved.
Comment thread
Copilot marked this conversation as resolved.

@model_validator(mode="after")
def _validate_durations(self) -> Self:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ settings:
n_samples_to_issue: null # Sample count override
scheduler_random_seed: 42 # Scheduler RNG seed
dataloader_random_seed: 42 # Dataloader RNG seed
enable_live_metrics: true # Enable live metrics publishing (periodic snapshots)
load_pattern:
type: concurrency # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step
target_qps: null # Target QPS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ settings:
n_samples_to_issue: null # Sample count override
scheduler_random_seed: 42 # Scheduler RNG seed
dataloader_random_seed: 42 # Dataloader RNG seed
enable_live_metrics: true # Enable live metrics publishing (periodic snapshots)
load_pattern:
type: max_throughput # Load pattern type | offline only: max_throughput
target_qps: null # Target QPS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ settings:
n_samples_to_issue: null # Sample count override
scheduler_random_seed: 42 # Scheduler RNG seed
dataloader_random_seed: 42 # Dataloader RNG seed
enable_live_metrics: true # Enable live metrics publishing (periodic snapshots)
load_pattern:
type: poisson # Load pattern type | options: max_throughput, poisson, concurrency, agentic_inference, burst, step
target_qps: 10.0 # Target QPS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,46 @@ async def test_publish_final_awaits_tick_task_cancellation(
finally:
publisher.close()

@pytest.mark.asyncio
async def test_disabled_interval_skips_tick_task_and_publish_final_works(
self, tmp_path: Path, zmq_ctx_scope: ManagedZMQContext
):
"""``publish_interval_s <= 0`` must not create a tick task, and
``publish_final`` must still write the JSON snapshot correctly."""
loop = asyncio.get_event_loop()
target = tmp_path / "final_snapshot.json"
publisher = MetricsPublisher(
MetricsSnapshotCodec(),
zmq_ctx_scope,
"test_pub_disabled_interval",
loop,
final_snapshot_path=target,
)
try:
registry = MetricsRegistry()
registry.register_counter("c")
registry.increment("c", 2)

publisher.start(
registry,
publish_interval_s=0,
get_runtime_state=lambda: (SessionState.LIVE, 0),
)

# No tick task should have been created.
assert publisher._tick_task is None

await publisher.publish_final(registry, n_pending_tasks=0)

assert (
target.exists()
), "final snapshot must be written even when live ticks are disabled"
decoded = json.loads(target.read_bytes())
assert decoded["state"] == SessionState.COMPLETE.value
assert decoded["n_pending_tasks"] == 0
finally:
publisher.close()

@pytest.mark.asyncio
async def test_close_cancels_tick_task(
self, tmp_path: Path, zmq_ctx_scope: ManagedZMQContext
Expand Down
Loading