52 changes: 52 additions & 0 deletions src/dstack/_internal/server/background/metrics.py
@@ -0,0 +1,52 @@
from prometheus_client import Counter, Histogram


class RunMetrics:
"""Wrapper class for run-related Prometheus metrics."""

def __init__(self):
self._submit_to_provision_duration = Histogram(
"dstack_submit_to_provision_duration_seconds",
"Time from when a run has been submitted and first job provisioning",
# Bucket boundaries chosen for percentile estimation of provisioning times
buckets=[
15,
30,
45,
60,
90,
120,
180,
240,
300,
360,
420,
480,
540,
600,
900,
1200,
1800,
float("inf"),
],
labelnames=["project_name", "run_type"],
)

self._pending_runs_total = Counter(
"dstack_pending_runs_total",
"Number of pending runs",
labelnames=["project_name", "run_type"],
)

def log_submit_to_provision_duration(
self, duration_seconds: float, project_name: str, run_type: str
):
self._submit_to_provision_duration.labels(
project_name=project_name, run_type=run_type
).observe(duration_seconds)

def increment_pending_runs(self, project_name: str, run_type: str):
self._pending_runs_total.labels(project_name=project_name, run_type=run_type).inc()


run_metrics = RunMetrics()
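
A minimal usage sketch for this module, not part of the PR: it assumes the default prometheus_client registry and an arbitrary scrape port, and the metric values are made up.

from prometheus_client import start_http_server

from dstack._internal.server.background.metrics import run_metrics

# Expose the default registry on an arbitrary port for Prometheus to scrape.
start_http_server(9090)

# Record a hypothetical 42.5 s submission-to-provisioning duration.
run_metrics.log_submit_to_provision_duration(42.5, "my-project", "task")

# Count one run entering the pending state.
run_metrics.increment_pending_runs("my-project", "task")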
21 changes: 21 additions & 0 deletions src/dstack/_internal/server/background/tasks/process_runs.py
@@ -20,6 +20,7 @@
RunStatus,
RunTerminationReason,
)
from dstack._internal.server.background.metrics import run_metrics
from dstack._internal.server.db import get_session_ctx
from dstack._internal.server.models import JobModel, ProjectModel, RunModel
from dstack._internal.server.services.jobs import (
@@ -369,6 +370,26 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel):
run_model.status.name,
new_status.name,
)
if run_model.status == RunStatus.SUBMITTED and new_status == RunStatus.PROVISIONING:
current_time = datetime.datetime.now(datetime.timezone.utc)
submit_to_provision_duration = (
current_time - run_model.submitted_at.replace(tzinfo=datetime.timezone.utc)
).total_seconds()
logger.info(
"%s: run took %.2f seconds from submision to provisioning.",
fmt(run_model),
submit_to_provision_duration,
)
project_name = run_model.project.name
run_type = RunSpec.__response__.parse_raw(run_model.run_spec).configuration.type
run_metrics.log_submit_to_provision_duration(
submit_to_provision_duration, project_name, run_type
)

if new_status == RunStatus.PENDING:
run_spec = RunSpec.__response__.parse_raw(run_model.run_spec)
run_metrics.increment_pending_runs(run_model.project.name, run_spec.configuration.type)

run_model.status = new_status
run_model.termination_reason = termination_reason
# When a run goes to pending without provisioning, resubmission_attempt increases.
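One subtlety in the hunk above: run_model.submitted_at is evidently stored as a naive UTC datetime, so .replace(tzinfo=datetime.timezone.utc) attaches the zone before subtraction; subtracting a naive datetime from an aware one raises TypeError. A standalone sketch of the same computation, with a made-up timestamp:

import datetime

# Hypothetical naive UTC timestamp, as it would come back from the database.
submitted_at = datetime.datetime(2024, 5, 1, 12, 0, 0)

now = datetime.datetime.now(datetime.timezone.utc)

# Attach UTC before subtracting; naive minus aware raises TypeError.
duration = (now - submitted_at.replace(tzinfo=datetime.timezone.utc)).total_seconds()
print(f"{duration:.2f}s from submission to provisioning")
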
75 changes: 70 additions & 5 deletions src/tests/_internal/server/background/tasks/test_process_runs.py
@@ -75,7 +75,26 @@ class TestProcessRuns:
async def test_submitted_to_provisioning(self, test_db, session: AsyncSession):
run = await make_run(session, status=RunStatus.SUBMITTED)
await create_job(session=session, run=run, status=JobStatus.PROVISIONING)
await process_runs.process_runs()
current_time = datetime.datetime.now(datetime.timezone.utc)

expected_duration = (
current_time - run.submitted_at.replace(tzinfo=datetime.timezone.utc)
).total_seconds()

with patch(
"dstack._internal.server.background.tasks.process_runs.run_metrics"
) as mock_run_metrics:
await process_runs.process_runs()

mock_run_metrics.log_submit_to_provision_duration.assert_called_once()
args = mock_run_metrics.log_submit_to_provision_duration.call_args[0]
assert args[1] == run.project.name
assert args[2] == "service"
# Assert the duration is close to the expected duration (within a 0.05-second tolerance)
assert abs(args[0] - expected_duration) < 0.05, (
f"Expected duration ~{expected_duration:.3f}s, got {args[0]:.3f}s"
)

await session.refresh(run)
assert run.status == RunStatus.PROVISIONING

@@ -95,7 +114,14 @@ async def test_keep_provisioning(self, test_db, session: AsyncSession):
run = await make_run(session, status=RunStatus.PROVISIONING)
await create_job(session=session, run=run, status=JobStatus.PULLING)

await process_runs.process_runs()
with patch(
"dstack._internal.server.background.tasks.process_runs.run_metrics"
) as mock_run_metrics:
await process_runs.process_runs()

mock_run_metrics.log_submit_to_provision_duration.assert_not_called()
mock_run_metrics.increment_pending_runs.assert_not_called()

await session.refresh(run)
assert run.status == RunStatus.PROVISIONING

@@ -153,9 +179,19 @@ async def test_retry_running_to_pending(self, test_db, session: AsyncSession):
instance=instance,
job_provisioning_data=get_job_provisioning_data(),
)
with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock:
with (
patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock,
patch(
"dstack._internal.server.background.tasks.process_runs.run_metrics"
) as mock_run_metrics,
):
datetime_mock.return_value = run.submitted_at + datetime.timedelta(minutes=3)
await process_runs.process_runs()

mock_run_metrics.increment_pending_runs.assert_called_once_with(
run.project.name, "service"
)

await session.refresh(run)
assert run.status == RunStatus.PENDING

@@ -201,8 +237,27 @@ async def test_submitted_to_provisioning_if_any(self, test_db, session: AsyncSession):
run = await make_run(session, status=RunStatus.SUBMITTED, replicas=2)
await create_job(session=session, run=run, status=JobStatus.SUBMITTED, replica_num=0)
await create_job(session=session, run=run, status=JobStatus.PROVISIONING, replica_num=1)
current_time = datetime.datetime.now(datetime.timezone.utc)

expected_duration = (
current_time - run.submitted_at.replace(tzinfo=datetime.timezone.utc)
).total_seconds()

with patch(
"dstack._internal.server.background.tasks.process_runs.run_metrics"
) as mock_run_metrics:
await process_runs.process_runs()

mock_run_metrics.log_submit_to_provision_duration.assert_called_once()
args = mock_run_metrics.log_submit_to_provision_duration.call_args[0]
assert args[1] == run.project.name
assert args[2] == "service"
assert isinstance(args[0], float)
# Assert the duration is close to the expected duration (within a 0.05-second tolerance)
assert abs(args[0] - expected_duration) < 0.05, (
f"Expected duration ~{expected_duration:.3f}s, got {args[0]:.3f}s"
)

await process_runs.process_runs()
await session.refresh(run)
assert run.status == RunStatus.PROVISIONING

@@ -243,9 +298,19 @@ async def test_all_no_capacity_to_pending(self, test_db, session: AsyncSession):
instance=await create_instance(session, project=run.project, spot=True),
job_provisioning_data=get_job_provisioning_data(),
)
with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock:
with (
patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock,
patch(
"dstack._internal.server.background.tasks.process_runs.run_metrics"
) as mock_run_metrics,
):
datetime_mock.return_value = run.submitted_at + datetime.timedelta(minutes=3)
await process_runs.process_runs()

mock_run_metrics.increment_pending_runs.assert_called_once_with(
run.project.name, "service"
)

await session.refresh(run)
assert run.status == RunStatus.PENDING

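Note that these tests patch dstack._internal.server.background.tasks.process_runs.run_metrics rather than the metrics module itself: unittest.mock.patch must target the name in the namespace where it is looked up at call time. A minimal sketch of the pattern, reusing the module path from this PR:

from unittest.mock import patch

# Patch the name that process_runs resolves at call time, not the
# definition site in dstack._internal.server.background.metrics.
with patch(
    "dstack._internal.server.background.tasks.process_runs.run_metrics"
) as mock_run_metrics:
    pass  # exercise the code under test here

mock_run_metrics.increment_pending_runs.assert_not_called()
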
45 changes: 45 additions & 0 deletions src/tests/_internal/server/background/test_metrics.py
@@ -0,0 +1,45 @@
from unittest.mock import MagicMock

from dstack._internal.server.background.metrics import run_metrics


class TestRunMetrics:
def test_log_submit_to_provision_duration(self, monkeypatch):
mock_histogram = MagicMock()
mock_labels = MagicMock()
mock_histogram.labels.return_value = mock_labels
monkeypatch.setattr(run_metrics, "_submit_to_provision_duration", mock_histogram)

duration = 120.5
project_name = "test-project"
run_type = "dev"

run_metrics.log_submit_to_provision_duration(duration, project_name, run_type)

mock_histogram.labels.assert_called_once_with(project_name=project_name, run_type=run_type)
mock_labels.observe.assert_called_once_with(duration)

def test_increment_pending_runs(self, monkeypatch):
mock_counter = MagicMock()
mock_labels = MagicMock()
mock_counter.labels.return_value = mock_labels

monkeypatch.setattr(run_metrics, "_pending_runs_total", mock_counter)

project_name = "test-project"
run_type = "train"

run_metrics.increment_pending_runs(project_name, run_type)
mock_counter.labels.assert_called_once_with(project_name=project_name, run_type=run_type)
mock_labels.inc.assert_called_once()

def test_multiple_calls_to_log_submit_to_provision_duration(self):
run_metrics.log_submit_to_provision_duration(60.0, "project1", "dev")
run_metrics.log_submit_to_provision_duration(120.0, "project1", "prod")
run_metrics.log_submit_to_provision_duration(30.0, "project2", "dev")

def test_multiple_calls_to_increment_pending_runs(self):
run_metrics.increment_pending_runs("project1", "dev")
run_metrics.increment_pending_runs("project1", "prod")
run_metrics.increment_pending_runs("project2", "dev")
run_metrics.increment_pending_runs("project1", "dev")