From 6983f2fdef62ab5d7141f72a2386db6b2f4cd80d Mon Sep 17 00:00:00 2001 From: Nadine Handal Date: Thu, 5 Jun 2025 14:42:23 -0400 Subject: [PATCH 1/7] Add basic http metrics --- pyproject.toml | 1 + src/dstack/_internal/server/app.py | 1 + src/dstack/_internal/server/routers/prometheus.py | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index fba02a427c..62b558e80d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "gpuhunt==0.1.6", "argcomplete>=3.5.0", "ignore-python>=0.2.0", + "prometheus-fastapi-instrumentator>=7.1.0", ] [project.urls] diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index 6377e4c261..a58cee9cd3 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -90,6 +90,7 @@ def create_app() -> FastAPI: app = FastAPI(docs_url="/api/docs", lifespan=lifespan) app.state.proxy_dependency_injector = ServerProxyDependencyInjector() + Instrumentator().instrument(app, metric_namespace="dstack", metric_subsystem="server") return app diff --git a/src/dstack/_internal/server/routers/prometheus.py b/src/dstack/_internal/server/routers/prometheus.py index 28546e1eee..8957d706df 100644 --- a/src/dstack/_internal/server/routers/prometheus.py +++ b/src/dstack/_internal/server/routers/prometheus.py @@ -24,7 +24,7 @@ @router.get("/metrics") async def get_prometheus_metrics( session: Annotated[AsyncSession, Depends(get_session)], -) -> str: +): if not settings.ENABLE_PROMETHEUS_METRICS: raise error_not_found() custom_metrics = await prometheus.get_metrics(session=session) From 97aac8f6b25e24ad2d8f5ed2a3bac608c40b5c84 Mon Sep 17 00:00:00 2001 From: Nadine Handal Date: Fri, 6 Jun 2025 15:23:07 -0400 Subject: [PATCH 2/7] Implement custom http metrics --- src/dstack/_internal/server/app.py | 1 - src/dstack/_internal/server/routers/prometheus.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git 
a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index a58cee9cd3..6377e4c261 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -90,7 +90,6 @@ def create_app() -> FastAPI: app = FastAPI(docs_url="/api/docs", lifespan=lifespan) app.state.proxy_dependency_injector = ServerProxyDependencyInjector() - Instrumentator().instrument(app, metric_namespace="dstack", metric_subsystem="server") return app diff --git a/src/dstack/_internal/server/routers/prometheus.py b/src/dstack/_internal/server/routers/prometheus.py index 8957d706df..28546e1eee 100644 --- a/src/dstack/_internal/server/routers/prometheus.py +++ b/src/dstack/_internal/server/routers/prometheus.py @@ -24,7 +24,7 @@ @router.get("/metrics") async def get_prometheus_metrics( session: Annotated[AsyncSession, Depends(get_session)], -): +) -> str: if not settings.ENABLE_PROMETHEUS_METRICS: raise error_not_found() custom_metrics = await prometheus.get_metrics(session=session) From ecc2ad77ee32b59d7170532ba44f330e4848ddd4 Mon Sep 17 00:00:00 2001 From: Nadine Handal Date: Fri, 6 Jun 2025 15:34:50 -0400 Subject: [PATCH 3/7] Update docs --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 62b558e80d..fba02a427c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,6 @@ dependencies = [ "gpuhunt==0.1.6", "argcomplete>=3.5.0", "ignore-python>=0.2.0", - "prometheus-fastapi-instrumentator>=7.1.0", ] [project.urls] From 36300d423f50083d0cc5181975ba216aa4d9cc18 Mon Sep 17 00:00:00 2001 From: Nadine Handal Date: Tue, 10 Jun 2025 11:48:31 -0400 Subject: [PATCH 4/7] Add custom health metrics --- .../_internal/server/services/prometheus.py | 3 + src/dstack/_internal/server/services/runs.py | 95 ++++++++++++++----- 2 files changed, 73 insertions(+), 25 deletions(-) diff --git a/src/dstack/_internal/server/services/prometheus.py b/src/dstack/_internal/server/services/prometheus.py index 
9f9c64ed15..5c056f5477 100644 --- a/src/dstack/_internal/server/services/prometheus.py +++ b/src/dstack/_internal/server/services/prometheus.py @@ -109,6 +109,7 @@ async def get_run_metrics(session: AsyncSession) -> Iterable[Metric]: metrics.add_sample(_RUN_COUNT_TOTAL, labels, sum(statuses.values())) metrics.add_sample(_RUN_COUNT_TERMINATED, labels, statuses[RunStatus.TERMINATED]) metrics.add_sample(_RUN_COUNT_FAILED, labels, statuses[RunStatus.FAILED]) + metrics.add_sample(_RUN_COUNT_PENDING, labels, statuses[RunStatus.PENDING]) metrics.add_sample(_RUN_COUNT_DONE, labels, statuses[RunStatus.DONE]) return metrics.values() @@ -194,6 +195,7 @@ async def get_job_metrics(session: AsyncSession) -> Iterable[Metric]: _RUN_COUNT_TERMINATED = "dstack_run_count_terminated_total" _RUN_COUNT_FAILED = "dstack_run_count_failed_total" _RUN_COUNT_DONE = "dstack_run_count_done_total" +_RUN_COUNT_PENDING = "dstack_run_count_pending_total" _JOB_DURATION = "dstack_job_duration_seconds_total" _JOB_PRICE = "dstack_job_price_dollars_per_hour" _JOB_GPU_COUNT = "dstack_job_gpu_count" @@ -246,6 +248,7 @@ class _RunMetrics(_Metrics): (_RUN_COUNT_TERMINATED, _COUNTER, "Terminated runs count"), (_RUN_COUNT_FAILED, _COUNTER, "Failed runs count"), (_RUN_COUNT_DONE, _COUNTER, "Done runs count"), + (_RUN_COUNT_PENDING, _COUNTER, "Pending runs count"), ] diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index a1ec23e466..7ce7258fcd 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -1,10 +1,12 @@ import itertools import math +import time import uuid from datetime import datetime, timezone from typing import List, Optional import pydantic +from prometheus_client import REGISTRY, Histogram from sqlalchemy import and_, func, or_, select, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, selectinload @@ -88,6 +90,13 @@ logger = 
get_logger(__name__) +RUN_SUBMISSION_TIME = Histogram( + "dstack_run_submission_duration_seconds", + "Time taken to submit a run", + labelnames=["project_name", "run_type", "action_type"], + buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0, float("inf")], + registry=REGISTRY, # Explicitly specify the registry +) JOB_TERMINATION_REASONS_TO_RETRY = { JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, @@ -381,6 +390,8 @@ async def apply_plan( plan: ApplyRunPlanInput, force: bool, ) -> Run: + submission_start_time = time.time() + run_spec = plan.run_spec run_spec = await apply_plugin_policies( user=user.name, @@ -390,25 +401,50 @@ async def apply_plan( # Spec must be copied by parsing to calculate merged_profile run_spec = RunSpec.parse_obj(run_spec.dict()) _validate_run_spec_and_set_defaults(run_spec) + + # Prepare labels for metrics + project_name = project.name + run_type = run_spec.configuration.type + if run_spec.run_name is None: - return await submit_run( - session=session, - user=user, - project=project, - run_spec=run_spec, - ) + try: + run = await submit_run( + session=session, + user=user, + project=project, + run_spec=run_spec, + ) + RUN_SUBMISSION_TIME.labels( + project_name=project_name, run_type=run_type, action_type="submit" + ).observe(time.time() - submission_start_time) + return run + except Exception: + RUN_SUBMISSION_TIME.labels( + project_name=project_name, run_type=run_type, action_type="submit" + ).observe(time.time() - submission_start_time) + raise current_resource = await get_run_by_name( session=session, project=project, run_name=run_spec.run_name, ) if current_resource is None or current_resource.status.is_finished(): - return await submit_run( - session=session, - user=user, - project=project, - run_spec=run_spec, - ) + try: + run = await submit_run( + session=session, + user=user, + project=project, + run_spec=run_spec, + ) + RUN_SUBMISSION_TIME.labels( + project_name=project_name, run_type=run_type, action_type="submit" + 
).observe(time.time() - submission_start_time) + return run + except Exception: + RUN_SUBMISSION_TIME.labels( + project_name=project_name, run_type=run_type, action_type="submit" + ).observe(time.time() - submission_start_time) + raise # For backward compatibility (current_resource may has been submitted before # some fields, e.g., CPUSpec.arch, were added) @@ -433,20 +469,29 @@ async def apply_plan( ) # FIXME: potentially long write transaction # Avoid getting run_model after update - await session.execute( - update(RunModel) - .where(RunModel.id == current_resource.id) - .values( - run_spec=run_spec.json(), - priority=run_spec.configuration.priority, + try: + await session.execute( + update(RunModel) + .where(RunModel.id == current_resource.id) + .values( + run_spec=run_spec.json(), + priority=run_spec.configuration.priority, + ) ) - ) - run = await get_run_by_name( - session=session, - project=project, - run_name=run_spec.run_name, - ) - return common_utils.get_or_error(run) + run = await get_run_by_name( + session=session, + project=project, + run_name=run_spec.run_name, + ) + RUN_SUBMISSION_TIME.labels( + project_name=project_name, run_type=run_type, action_type="update" + ).observe(time.time() - submission_start_time) + return common_utils.get_or_error(run) + except Exception: + RUN_SUBMISSION_TIME.labels( + project_name=project_name, run_type=run_type, action_type="update" + ).observe(time.time() - submission_start_time) + raise async def submit_run( From f19732701359c8a439d302945bc215d01f48de3c Mon Sep 17 00:00:00 2001 From: Nadine Handal Date: Thu, 12 Jun 2025 12:10:58 -0400 Subject: [PATCH 5/7] Add prometheus wrapper class --- .../_internal/server/background/metrics.py | 52 ++++++++++ .../server/background/tasks/process_runs.py | 21 ++++ .../_internal/server/services/prometheus.py | 3 - src/dstack/_internal/server/services/runs.py | 95 +++++-------------- .../background/tasks/test_process_runs.py | 75 ++++++++++++++- .../server/background/test_metrics.py 
| 45 +++++++++ 6 files changed, 213 insertions(+), 78 deletions(-) create mode 100644 src/dstack/_internal/server/background/metrics.py create mode 100644 src/tests/_internal/server/background/test_metrics.py diff --git a/src/dstack/_internal/server/background/metrics.py b/src/dstack/_internal/server/background/metrics.py new file mode 100644 index 0000000000..d25971ef78 --- /dev/null +++ b/src/dstack/_internal/server/background/metrics.py @@ -0,0 +1,52 @@ +from prometheus_client import Counter, Histogram + + +class RunMetrics: + """Wrapper class for run-related Prometheus metrics.""" + + def __init__(self): + self._submit_to_provision_duration = Histogram( + "dstack_submit_to_provision_duration_seconds", + "Time from when a run has been submitted and first job provisioning", + # Buckets optimized for percentile calculation + buckets=[ + 15, + 30, + 45, + 60, + 90, + 120, + 180, + 240, + 300, + 360, + 420, + 480, + 540, + 600, + 900, + 1200, + 1800, + float("inf"), + ], + labelnames=["project_name", "run_type"], + ) + + self._pending_runs_total = Counter( + "dstack_pending_runs_total", + "Number of pending runs", + labelnames=["project_name", "run_type"], + ) + + def log_submit_to_provision_duration( + self, duration_seconds: float, project_name: str, run_type: str + ): + self._submit_to_provision_duration.labels( + project_name=project_name, run_type=run_type + ).observe(duration_seconds) + + def increment_pending_runs(self, project_name: str, run_type: str): + self._pending_runs_total.labels(project_name=project_name, run_type=run_type).inc() + + +run_metrics = RunMetrics() diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 547a4cd5a0..d8699d17f1 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -20,6 +20,7 @@ RunStatus, RunTerminationReason, ) +from 
dstack._internal.server.background.metrics import run_metrics from dstack._internal.server.db import get_session_ctx from dstack._internal.server.models import JobModel, ProjectModel, RunModel from dstack._internal.server.services.jobs import ( @@ -369,6 +370,26 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): run_model.status.name, new_status.name, ) + if run_model.status == RunStatus.SUBMITTED and new_status == RunStatus.PROVISIONING: + current_time = datetime.datetime.now(datetime.timezone.utc) + submit_to_provision_duration = ( + current_time - run_model.submitted_at.replace(tzinfo=datetime.timezone.utc) + ).total_seconds() + logger.info( + "%s: run took %.2f seconds from submission to provisioning.", + fmt(run_model), + submit_to_provision_duration, + ) + project_name = run_model.project.name + run_type = RunSpec.__response__.parse_raw(run_model.run_spec).configuration.type + run_metrics.log_submit_to_provision_duration( + submit_to_provision_duration, project_name, run_type + ) + + if new_status == RunStatus.PENDING: + run_spec = RunSpec.__response__.parse_raw(run_model.run_spec) + run_metrics.increment_pending_runs(run_model.project.name, run_spec.configuration.type) + run_model.status = new_status run_model.termination_reason = termination_reason # While a run goes to pending without provisioning, resubmission_attempt increases. 
diff --git a/src/dstack/_internal/server/services/prometheus.py b/src/dstack/_internal/server/services/prometheus.py index 5c056f5477..9f9c64ed15 100644 --- a/src/dstack/_internal/server/services/prometheus.py +++ b/src/dstack/_internal/server/services/prometheus.py @@ -109,7 +109,6 @@ async def get_run_metrics(session: AsyncSession) -> Iterable[Metric]: metrics.add_sample(_RUN_COUNT_TOTAL, labels, sum(statuses.values())) metrics.add_sample(_RUN_COUNT_TERMINATED, labels, statuses[RunStatus.TERMINATED]) metrics.add_sample(_RUN_COUNT_FAILED, labels, statuses[RunStatus.FAILED]) - metrics.add_sample(_RUN_COUNT_PENDING, labels, statuses[RunStatus.PENDING]) metrics.add_sample(_RUN_COUNT_DONE, labels, statuses[RunStatus.DONE]) return metrics.values() @@ -195,7 +194,6 @@ async def get_job_metrics(session: AsyncSession) -> Iterable[Metric]: _RUN_COUNT_TERMINATED = "dstack_run_count_terminated_total" _RUN_COUNT_FAILED = "dstack_run_count_failed_total" _RUN_COUNT_DONE = "dstack_run_count_done_total" -_RUN_COUNT_PENDING = "dstack_run_count_pending_total" _JOB_DURATION = "dstack_job_duration_seconds_total" _JOB_PRICE = "dstack_job_price_dollars_per_hour" _JOB_GPU_COUNT = "dstack_job_gpu_count" @@ -248,7 +246,6 @@ class _RunMetrics(_Metrics): (_RUN_COUNT_TERMINATED, _COUNTER, "Terminated runs count"), (_RUN_COUNT_FAILED, _COUNTER, "Failed runs count"), (_RUN_COUNT_DONE, _COUNTER, "Done runs count"), - (_RUN_COUNT_PENDING, _COUNTER, "Pending runs count"), ] diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 7ce7258fcd..a1ec23e466 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -1,12 +1,10 @@ import itertools import math -import time import uuid from datetime import datetime, timezone from typing import List, Optional import pydantic -from prometheus_client import REGISTRY, Histogram from sqlalchemy import and_, func, or_, select, update from 
sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, selectinload @@ -90,13 +88,6 @@ logger = get_logger(__name__) -RUN_SUBMISSION_TIME = Histogram( - "dstack_run_submission_duration_seconds", - "Time taken to submit a run", - labelnames=["project_name", "run_type", "action_type"], - buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0, float("inf")], - registry=REGISTRY, # Explicitly specify the registry -) JOB_TERMINATION_REASONS_TO_RETRY = { JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, @@ -390,8 +381,6 @@ async def apply_plan( plan: ApplyRunPlanInput, force: bool, ) -> Run: - submission_start_time = time.time() - run_spec = plan.run_spec run_spec = await apply_plugin_policies( user=user.name, @@ -401,50 +390,25 @@ async def apply_plan( # Spec must be copied by parsing to calculate merged_profile run_spec = RunSpec.parse_obj(run_spec.dict()) _validate_run_spec_and_set_defaults(run_spec) - - # Prepare labels for metrics - project_name = project.name - run_type = run_spec.configuration.type - if run_spec.run_name is None: - try: - run = await submit_run( - session=session, - user=user, - project=project, - run_spec=run_spec, - ) - RUN_SUBMISSION_TIME.labels( - project_name=project_name, run_type=run_type, action_type="submit" - ).observe(time.time() - submission_start_time) - return run - except Exception: - RUN_SUBMISSION_TIME.labels( - project_name=project_name, run_type=run_type, action_type="submit" - ).observe(time.time() - submission_start_time) - raise + return await submit_run( + session=session, + user=user, + project=project, + run_spec=run_spec, + ) current_resource = await get_run_by_name( session=session, project=project, run_name=run_spec.run_name, ) if current_resource is None or current_resource.status.is_finished(): - try: - run = await submit_run( - session=session, - user=user, - project=project, - run_spec=run_spec, - ) - RUN_SUBMISSION_TIME.labels( - project_name=project_name, run_type=run_type, 
action_type="submit" - ).observe(time.time() - submission_start_time) - return run - except Exception: - RUN_SUBMISSION_TIME.labels( - project_name=project_name, run_type=run_type, action_type="submit" - ).observe(time.time() - submission_start_time) - raise + return await submit_run( + session=session, + user=user, + project=project, + run_spec=run_spec, + ) # For backward compatibility (current_resource may has been submitted before # some fields, e.g., CPUSpec.arch, were added) @@ -469,29 +433,20 @@ async def apply_plan( ) # FIXME: potentially long write transaction # Avoid getting run_model after update - try: - await session.execute( - update(RunModel) - .where(RunModel.id == current_resource.id) - .values( - run_spec=run_spec.json(), - priority=run_spec.configuration.priority, - ) - ) - run = await get_run_by_name( - session=session, - project=project, - run_name=run_spec.run_name, + await session.execute( + update(RunModel) + .where(RunModel.id == current_resource.id) + .values( + run_spec=run_spec.json(), + priority=run_spec.configuration.priority, ) - RUN_SUBMISSION_TIME.labels( - project_name=project_name, run_type=run_type, action_type="update" - ).observe(time.time() - submission_start_time) - return common_utils.get_or_error(run) - except Exception: - RUN_SUBMISSION_TIME.labels( - project_name=project_name, run_type=run_type, action_type="update" - ).observe(time.time() - submission_start_time) - raise + ) + run = await get_run_by_name( + session=session, + project=project, + run_name=run_spec.run_name, + ) + return common_utils.get_or_error(run) async def submit_run( diff --git a/src/tests/_internal/server/background/tasks/test_process_runs.py b/src/tests/_internal/server/background/tasks/test_process_runs.py index bbef75b764..f5e764c0df 100644 --- a/src/tests/_internal/server/background/tasks/test_process_runs.py +++ b/src/tests/_internal/server/background/tasks/test_process_runs.py @@ -75,7 +75,26 @@ class TestProcessRuns: async def 
test_submitted_to_provisioning(self, test_db, session: AsyncSession): run = await make_run(session, status=RunStatus.SUBMITTED) await create_job(session=session, run=run, status=JobStatus.PROVISIONING) - await process_runs.process_runs() + current_time = datetime.datetime.now(datetime.timezone.utc) + + expected_duration = ( + current_time - run.submitted_at.replace(tzinfo=datetime.timezone.utc) + ).total_seconds() + + with patch( + "dstack._internal.server.background.tasks.process_runs.run_metrics" + ) as mock_run_metrics: + await process_runs.process_runs() + + mock_run_metrics.log_submit_to_provision_duration.assert_called_once() + args = mock_run_metrics.log_submit_to_provision_duration.call_args[0] + assert args[1] == run.project.name + assert args[2] == "service" + # Assert the duration is close to our expected duration (within 0.05 second tolerance) + assert abs(args[0] - expected_duration) < 0.05, ( + f"Expected duration ~{expected_duration:.3f}s, got {args[0]:.3f}s" + ) + await session.refresh(run) assert run.status == RunStatus.PROVISIONING @@ -95,7 +114,14 @@ async def test_keep_provisioning(self, test_db, session: AsyncSession): run = await make_run(session, status=RunStatus.PROVISIONING) await create_job(session=session, run=run, status=JobStatus.PULLING) - await process_runs.process_runs() + with patch( + "dstack._internal.server.background.tasks.process_runs.run_metrics" + ) as mock_run_metrics: + await process_runs.process_runs() + + mock_run_metrics.log_submit_to_provision_duration.assert_not_called() + mock_run_metrics.increment_pending_runs.assert_not_called() + await session.refresh(run) assert run.status == RunStatus.PROVISIONING @@ -153,9 +179,19 @@ async def test_retry_running_to_pending(self, test_db, session: AsyncSession): instance=instance, job_provisioning_data=get_job_provisioning_data(), ) - with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: + with ( + 
patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock, + patch( + "dstack._internal.server.background.tasks.process_runs.run_metrics" + ) as mock_run_metrics, + ): datetime_mock.return_value = run.submitted_at + datetime.timedelta(minutes=3) await process_runs.process_runs() + + mock_run_metrics.increment_pending_runs.assert_called_once_with( + run.project.name, "service" + ) + await session.refresh(run) assert run.status == RunStatus.PENDING @@ -201,8 +237,27 @@ async def test_submitted_to_provisioning_if_any(self, test_db, session: AsyncSes run = await make_run(session, status=RunStatus.SUBMITTED, replicas=2) await create_job(session=session, run=run, status=JobStatus.SUBMITTED, replica_num=0) await create_job(session=session, run=run, status=JobStatus.PROVISIONING, replica_num=1) + current_time = datetime.datetime.now(datetime.timezone.utc) + + expected_duration = ( + current_time - run.submitted_at.replace(tzinfo=datetime.timezone.utc) + ).total_seconds() + + with patch( + "dstack._internal.server.background.tasks.process_runs.run_metrics" + ) as mock_run_metrics: + await process_runs.process_runs() + + mock_run_metrics.log_submit_to_provision_duration.assert_called_once() + args = mock_run_metrics.log_submit_to_provision_duration.call_args[0] + assert args[1] == run.project.name + assert args[2] == "service" + assert isinstance(args[0], float) + # Assert the duration is close to our expected duration (within 0.05 second tolerance) + assert abs(args[0] - expected_duration) < 0.05, ( + f"Expected duration ~{expected_duration:.3f}s, got {args[0]:.3f}s" + ) - await process_runs.process_runs() await session.refresh(run) assert run.status == RunStatus.PROVISIONING @@ -243,9 +298,19 @@ async def test_all_no_capacity_to_pending(self, test_db, session: AsyncSession): instance=await create_instance(session, project=run.project, spot=True), job_provisioning_data=get_job_provisioning_data(), ) - with 
patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: + with ( + patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock, + patch( + "dstack._internal.server.background.tasks.process_runs.run_metrics" + ) as mock_run_metrics, + ): datetime_mock.return_value = run.submitted_at + datetime.timedelta(minutes=3) await process_runs.process_runs() + + mock_run_metrics.increment_pending_runs.assert_called_once_with( + run.project.name, "service" + ) + await session.refresh(run) assert run.status == RunStatus.PENDING diff --git a/src/tests/_internal/server/background/test_metrics.py b/src/tests/_internal/server/background/test_metrics.py new file mode 100644 index 0000000000..82c81cae36 --- /dev/null +++ b/src/tests/_internal/server/background/test_metrics.py @@ -0,0 +1,45 @@ +from unittest.mock import MagicMock + +from dstack._internal.server.background.metrics import run_metrics + + +class TestRunMetrics: + def test_log_submit_to_provision_duration(self, monkeypatch): + mock_histogram = MagicMock() + mock_labels = MagicMock() + mock_histogram.labels.return_value = mock_labels + monkeypatch.setattr(run_metrics, "_submit_to_provision_duration", mock_histogram) + + duration = 120.5 + project_name = "test-project" + run_type = "dev" + + run_metrics.log_submit_to_provision_duration(duration, project_name, run_type) + + mock_histogram.labels.assert_called_once_with(project_name=project_name, run_type=run_type) + mock_labels.observe.assert_called_once_with(duration) + + def test_increment_pending_runs(self, monkeypatch): + mock_counter = MagicMock() + mock_labels = MagicMock() + mock_counter.labels.return_value = mock_labels + + monkeypatch.setattr(run_metrics, "_pending_runs_total", mock_counter) + + project_name = "test-project" + run_type = "train" + + run_metrics.increment_pending_runs(project_name, run_type) + mock_counter.labels.assert_called_once_with(project_name=project_name, run_type=run_type) + 
mock_labels.inc.assert_called_once() + + def test_multiple_calls_to_log_submit_to_provision_duration(self): + run_metrics.log_submit_to_provision_duration(60.0, "project1", "dev") + run_metrics.log_submit_to_provision_duration(120.0, "project1", "prod") + run_metrics.log_submit_to_provision_duration(30.0, "project2", "dev") + + def test_multiple_calls_to_increment_pending_runs(self): + run_metrics.increment_pending_runs("project1", "dev") + run_metrics.increment_pending_runs("project1", "prod") + run_metrics.increment_pending_runs("project2", "dev") + run_metrics.increment_pending_runs("project1", "dev") From 7227265e4c3fb750cf6ce2632283b44b2bc107d8 Mon Sep 17 00:00:00 2001 From: Nadine Handal Date: Wed, 18 Jun 2025 14:16:29 -0400 Subject: [PATCH 6/7] Apply PR comments --- .../server/background/tasks/process_runs.py | 8 +++----- .../_internal/server/routers/prometheus.py | 4 ++-- .../server/services/prometheus/__init__.py | 0 .../pull_metrics.py} | 0 .../prometheus/push_metrics.py} | 0 .../background/tasks/test_process_runs.py | 17 ++++++++--------- .../server/services/prometheus/__init__.py | 0 .../prometheus/test_push_metrics.py} | 2 +- 8 files changed, 14 insertions(+), 17 deletions(-) create mode 100644 src/dstack/_internal/server/services/prometheus/__init__.py rename src/dstack/_internal/server/services/{prometheus.py => prometheus/pull_metrics.py} (100%) rename src/dstack/_internal/server/{background/metrics.py => services/prometheus/push_metrics.py} (100%) create mode 100644 src/tests/_internal/server/services/prometheus/__init__.py rename src/tests/_internal/server/{background/test_metrics.py => services/prometheus/test_push_metrics.py} (95%) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index d8699d17f1..3db539eb20 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -20,7 
+20,6 @@ RunStatus, RunTerminationReason, ) -from dstack._internal.server.background.metrics import run_metrics from dstack._internal.server.db import get_session_ctx from dstack._internal.server.models import JobModel, ProjectModel, RunModel from dstack._internal.server.services.jobs import ( @@ -29,6 +28,7 @@ group_jobs_by_replica_latest, ) from dstack._internal.server.services.locking import get_locker +from dstack._internal.server.services.prometheus.push_metrics import run_metrics from dstack._internal.server.services.runs import ( create_job_model_for_new_submission, fmt, @@ -371,7 +371,7 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): new_status.name, ) if run_model.status == RunStatus.SUBMITTED and new_status == RunStatus.PROVISIONING: - current_time = datetime.datetime.now(datetime.timezone.utc) + current_time = common.get_current_datetime() submit_to_provision_duration = ( current_time - run_model.submitted_at.replace(tzinfo=datetime.timezone.utc) ).total_seconds() @@ -381,13 +381,11 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): submit_to_provision_duration, ) project_name = run_model.project.name - run_type = RunSpec.__response__.parse_raw(run_model.run_spec).configuration.type run_metrics.log_submit_to_provision_duration( - submit_to_provision_duration, project_name, run_type + submit_to_provision_duration, project_name, run_spec.configuration.type ) if new_status == RunStatus.PENDING: - run_spec = RunSpec.__response__.parse_raw(run_model.run_spec) run_metrics.increment_pending_runs(run_model.project.name, run_spec.configuration.type) run_model.status = new_status diff --git a/src/dstack/_internal/server/routers/prometheus.py b/src/dstack/_internal/server/routers/prometheus.py index 28546e1eee..ccd85c4b65 100644 --- a/src/dstack/_internal/server/routers/prometheus.py +++ b/src/dstack/_internal/server/routers/prometheus.py @@ -9,7 +9,7 @@ from dstack._internal.server import settings from 
dstack._internal.server.db import get_session from dstack._internal.server.security.permissions import OptionalServiceAccount -from dstack._internal.server.services import prometheus +from dstack._internal.server.services.prometheus import pull_metrics from dstack._internal.server.utils.routers import error_not_found _auth = OptionalServiceAccount(os.getenv("DSTACK_PROMETHEUS_AUTH_TOKEN")) @@ -27,6 +27,6 @@ async def get_prometheus_metrics( ) -> str: if not settings.ENABLE_PROMETHEUS_METRICS: raise error_not_found() - custom_metrics = await prometheus.get_metrics(session=session) + custom_metrics = await pull_metrics.get_metrics(session=session) prometheus_metrics = generate_latest() return custom_metrics + prometheus_metrics.decode() diff --git a/src/dstack/_internal/server/services/prometheus/__init__.py b/src/dstack/_internal/server/services/prometheus/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/dstack/_internal/server/services/prometheus.py b/src/dstack/_internal/server/services/prometheus/pull_metrics.py similarity index 100% rename from src/dstack/_internal/server/services/prometheus.py rename to src/dstack/_internal/server/services/prometheus/pull_metrics.py diff --git a/src/dstack/_internal/server/background/metrics.py b/src/dstack/_internal/server/services/prometheus/push_metrics.py similarity index 100% rename from src/dstack/_internal/server/background/metrics.py rename to src/dstack/_internal/server/services/prometheus/push_metrics.py diff --git a/src/tests/_internal/server/background/tasks/test_process_runs.py b/src/tests/_internal/server/background/tasks/test_process_runs.py index f5e764c0df..1ff885b383 100644 --- a/src/tests/_internal/server/background/tasks/test_process_runs.py +++ b/src/tests/_internal/server/background/tasks/test_process_runs.py @@ -3,6 +3,7 @@ from unittest.mock import patch import pytest +from freezegun import freeze_time from pydantic import parse_obj_as from sqlalchemy.ext.asyncio import 
AsyncSession @@ -28,6 +29,7 @@ get_job_provisioning_data, get_run_spec, ) +from dstack._internal.utils import common pytestmark = pytest.mark.usefixtures("image_config_mock") @@ -72,10 +74,11 @@ async def make_run( class TestProcessRuns: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @freeze_time(datetime.datetime(2023, 1, 2, 3, 5, 20, tzinfo=datetime.timezone.utc)) async def test_submitted_to_provisioning(self, test_db, session: AsyncSession): run = await make_run(session, status=RunStatus.SUBMITTED) await create_job(session=session, run=run, status=JobStatus.PROVISIONING) - current_time = datetime.datetime.now(datetime.timezone.utc) + current_time = common.get_current_datetime() expected_duration = ( current_time - run.submitted_at.replace(tzinfo=datetime.timezone.utc) @@ -91,9 +94,7 @@ async def test_submitted_to_provisioning(self, test_db, session: AsyncSession): assert args[1] == run.project.name assert args[2] == "service" # Assert the duration is close to our expected duration (within 0.05 second tolerance) - assert abs(args[0] - expected_duration) < 0.05, ( - f"Expected duration ~{expected_duration:.3f}s, got {args[0]:.3f}s" - ) + assert args[0] == expected_duration await session.refresh(run) assert run.status == RunStatus.PROVISIONING @@ -233,11 +234,12 @@ async def test_pending_to_submitted(self, test_db, session: AsyncSession): class TestProcessRunsReplicas: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @freeze_time(datetime.datetime(2023, 1, 2, 3, 5, 20, tzinfo=datetime.timezone.utc)) async def test_submitted_to_provisioning_if_any(self, test_db, session: AsyncSession): run = await make_run(session, status=RunStatus.SUBMITTED, replicas=2) await create_job(session=session, run=run, status=JobStatus.SUBMITTED, replica_num=0) await create_job(session=session, run=run, status=JobStatus.PROVISIONING, replica_num=1) - current_time = 
datetime.datetime.now(datetime.timezone.utc) + current_time = common.get_current_datetime() expected_duration = ( current_time - run.submitted_at.replace(tzinfo=datetime.timezone.utc) @@ -253,10 +255,7 @@ async def test_submitted_to_provisioning_if_any(self, test_db, session: AsyncSes assert args[1] == run.project.name assert args[2] == "service" assert isinstance(args[0], float) - # Assert the duration is close to our expected duration (within 0.05 second tolerance) - assert abs(args[0] - expected_duration) < 0.05, ( - f"Expected duration ~{expected_duration:.3f}s, got {args[0]:.3f}s" - ) + assert args[0] == expected_duration await session.refresh(run) assert run.status == RunStatus.PROVISIONING diff --git a/src/tests/_internal/server/services/prometheus/__init__.py b/src/tests/_internal/server/services/prometheus/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/tests/_internal/server/background/test_metrics.py b/src/tests/_internal/server/services/prometheus/test_push_metrics.py similarity index 95% rename from src/tests/_internal/server/background/test_metrics.py rename to src/tests/_internal/server/services/prometheus/test_push_metrics.py index 82c81cae36..dddd1a45c8 100644 --- a/src/tests/_internal/server/background/test_metrics.py +++ b/src/tests/_internal/server/services/prometheus/test_push_metrics.py @@ -1,6 +1,6 @@ from unittest.mock import MagicMock -from dstack._internal.server.background.metrics import run_metrics +from dstack._internal.server.services.prometheus.push_metrics import run_metrics class TestRunMetrics: From 1583387ddcf5498448d8c8e39df7a009cbdec1f4 Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Thu, 26 Jun 2025 08:51:23 +0000 Subject: [PATCH 7/7] Rename modules --- .../_internal/server/background/tasks/process_runs.py | 2 +- src/dstack/_internal/server/routers/prometheus.py | 10 +++++----- .../prometheus/{push_metrics.py => client_metrics.py} | 0 .../prometheus/{pull_metrics.py => custom_metrics.py} | 0 
src/tests/_internal/server/routers/test_prometheus.py | 4 ++-- .../{test_push_metrics.py => test_client_metrics.py} | 2 +- 6 files changed, 9 insertions(+), 9 deletions(-) rename src/dstack/_internal/server/services/prometheus/{push_metrics.py => client_metrics.py} (100%) rename src/dstack/_internal/server/services/prometheus/{pull_metrics.py => custom_metrics.py} (100%) rename src/tests/_internal/server/services/prometheus/{test_push_metrics.py => test_client_metrics.py} (95%) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 3db539eb20..f0d374e3c2 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -28,7 +28,7 @@ group_jobs_by_replica_latest, ) from dstack._internal.server.services.locking import get_locker -from dstack._internal.server.services.prometheus.push_metrics import run_metrics +from dstack._internal.server.services.prometheus.client_metrics import run_metrics from dstack._internal.server.services.runs import ( create_job_model_for_new_submission, fmt, diff --git a/src/dstack/_internal/server/routers/prometheus.py b/src/dstack/_internal/server/routers/prometheus.py index ccd85c4b65..a5538edfec 100644 --- a/src/dstack/_internal/server/routers/prometheus.py +++ b/src/dstack/_internal/server/routers/prometheus.py @@ -1,15 +1,15 @@ import os from typing import Annotated +import prometheus_client from fastapi import APIRouter, Depends from fastapi.responses import PlainTextResponse -from prometheus_client import generate_latest from sqlalchemy.ext.asyncio import AsyncSession from dstack._internal.server import settings from dstack._internal.server.db import get_session from dstack._internal.server.security.permissions import OptionalServiceAccount -from dstack._internal.server.services.prometheus import pull_metrics +from dstack._internal.server.services.prometheus import 
custom_metrics from dstack._internal.server.utils.routers import error_not_found _auth = OptionalServiceAccount(os.getenv("DSTACK_PROMETHEUS_AUTH_TOKEN")) @@ -27,6 +27,6 @@ async def get_prometheus_metrics( ) -> str: if not settings.ENABLE_PROMETHEUS_METRICS: raise error_not_found() - custom_metrics = await pull_metrics.get_metrics(session=session) - prometheus_metrics = generate_latest() - return custom_metrics + prometheus_metrics.decode() + custom_metrics_ = await custom_metrics.get_metrics(session=session) + client_metrics = prometheus_client.generate_latest().decode() + return custom_metrics_ + client_metrics diff --git a/src/dstack/_internal/server/services/prometheus/push_metrics.py b/src/dstack/_internal/server/services/prometheus/client_metrics.py similarity index 100% rename from src/dstack/_internal/server/services/prometheus/push_metrics.py rename to src/dstack/_internal/server/services/prometheus/client_metrics.py diff --git a/src/dstack/_internal/server/services/prometheus/pull_metrics.py b/src/dstack/_internal/server/services/prometheus/custom_metrics.py similarity index 100% rename from src/dstack/_internal/server/services/prometheus/pull_metrics.py rename to src/dstack/_internal/server/services/prometheus/custom_metrics.py diff --git a/src/tests/_internal/server/routers/test_prometheus.py b/src/tests/_internal/server/routers/test_prometheus.py index 4cb7c01c2f..1f7e1e274b 100644 --- a/src/tests/_internal/server/routers/test_prometheus.py +++ b/src/tests/_internal/server/routers/test_prometheus.py @@ -100,7 +100,7 @@ def enable_metrics(monkeypatch: pytest.MonkeyPatch): @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @pytest.mark.usefixtures("image_config_mock", "test_db", "enable_metrics") class TestGetPrometheusMetrics: - @patch("dstack._internal.server.routers.prometheus.generate_latest", lambda: BASE_HTTP_METRICS) + @patch("prometheus_client.generate_latest", lambda: BASE_HTTP_METRICS) async def 
test_returns_metrics(self, session: AsyncSession, client: AsyncClient): user = await create_user(session=session, name="test-user", global_role=GlobalRole.USER) offer = get_instance_offer_with_availability( @@ -335,7 +335,7 @@ async def test_returns_metrics(self, session: AsyncSession, client: AsyncClient) ) assert response.text.strip() == expected - @patch("dstack._internal.server.routers.prometheus.generate_latest", lambda: BASE_HTTP_METRICS) + @patch("prometheus_client.generate_latest", lambda: BASE_HTTP_METRICS) async def test_returns_empty_response_if_no_runs(self, client: AsyncClient): response = await client.get("/metrics") assert response.status_code == 200 diff --git a/src/tests/_internal/server/services/prometheus/test_push_metrics.py b/src/tests/_internal/server/services/prometheus/test_client_metrics.py similarity index 95% rename from src/tests/_internal/server/services/prometheus/test_push_metrics.py rename to src/tests/_internal/server/services/prometheus/test_client_metrics.py index dddd1a45c8..9d21ff5360 100644 --- a/src/tests/_internal/server/services/prometheus/test_push_metrics.py +++ b/src/tests/_internal/server/services/prometheus/test_client_metrics.py @@ -1,6 +1,6 @@ from unittest.mock import MagicMock -from dstack._internal.server.services.prometheus.push_metrics import run_metrics +from dstack._internal.server.services.prometheus.client_metrics import run_metrics class TestRunMetrics: