From d04edb4e616f26990a2af2ab4fac17a5fe18fcd5 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 29 Jul 2025 11:18:30 +0500 Subject: [PATCH] Set up background tasks Sentry tracing --- src/dstack/_internal/server/app.py | 32 +++++++++++++------ .../server/background/tasks/process_fleets.py | 2 ++ .../background/tasks/process_gateways.py | 2 ++ .../background/tasks/process_idle_volumes.py | 2 ++ .../background/tasks/process_instances.py | 2 ++ .../background/tasks/process_metrics.py | 3 ++ .../tasks/process_placement_groups.py | 2 ++ .../tasks/process_prometheus_metrics.py | 3 ++ .../background/tasks/process_running_jobs.py | 4 +++ .../server/background/tasks/process_runs.py | 2 ++ .../tasks/process_submitted_jobs.py | 2 ++ .../tasks/process_terminating_jobs.py | 2 ++ .../background/tasks/process_volumes.py | 2 ++ src/dstack/_internal/server/settings.py | 3 ++ .../_internal/server/utils/sentry_utils.py | 12 +++++++ 15 files changed, 65 insertions(+), 10 deletions(-) create mode 100644 src/dstack/_internal/server/utils/sentry_utils.py diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index 8aff963c1a..6c8f4aa83e 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -13,6 +13,7 @@ from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from prometheus_client import Counter, Histogram +from sentry_sdk.types import SamplingContext from dstack._internal.cli.utils.common import console from dstack._internal.core.errors import ForbiddenError, ServerClientError @@ -81,16 +82,6 @@ def create_app() -> FastAPI: - if settings.SENTRY_DSN is not None: - sentry_sdk.init( - dsn=settings.SENTRY_DSN, - release=DSTACK_VERSION, - environment=settings.SERVER_ENVIRONMENT, - enable_tracing=True, - traces_sample_rate=settings.SENTRY_TRACES_SAMPLE_RATE, - profiles_sample_rate=settings.SENTRY_PROFILES_SAMPLE_RATE, - ) - app = FastAPI( docs_url="/api/docs", lifespan=lifespan, @@ -102,6 +93,15 @@ def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): configure_logging() + if settings.SENTRY_DSN is not None: + sentry_sdk.init( + dsn=settings.SENTRY_DSN, + release=DSTACK_VERSION, + environment=settings.SERVER_ENVIRONMENT, + enable_tracing=True, + traces_sampler=_sentry_traces_sampler, + profiles_sample_rate=settings.SENTRY_PROFILES_SAMPLE_RATE, + ) server_executor = ThreadPoolExecutor(max_workers=settings.SERVER_EXECUTOR_MAX_WORKERS) asyncio.get_running_loop().set_default_executor(server_executor) await migrate() @@ -379,3 +379,15 @@ def _print_dstack_logo(): ╰━━┻━━┻╯╱╰╯╰━━┻╯ [/]""" ) + + +def _sentry_traces_sampler(sampling_context: SamplingContext) -> float: + parent_sampling_decision = sampling_context["parent_sampled"] + if parent_sampling_decision is not None: + return float(parent_sampling_decision) + transaction_context = sampling_context["transaction_context"] + name = transaction_context.get("name") + if name is not None: + if name.startswith("background."): + return settings.SENTRY_TRACES_BACKGROUND_SAMPLE_RATE + return settings.SENTRY_TRACES_SAMPLE_RATE diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index 4b2b73c547..0388ac96ba 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -19,6 +19,7 @@ is_fleet_in_use, ) from dstack._internal.server.services.locking import get_locker +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import get_current_datetime from dstack._internal.utils.logging import get_logger @@ -29,6 +30,7 @@ MIN_PROCESSING_INTERVAL = timedelta(seconds=30) +@sentry_utils.instrument_background_task async def process_fleets(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(FleetModel.__tablename__) async with get_session_ctx() as session: diff --git a/src/dstack/_internal/server/background/tasks/process_gateways.py b/src/dstack/_internal/server/background/tasks/process_gateways.py index f8de6202d1..ef6c1aebe6 100644 --- a/src/dstack/_internal/server/background/tasks/process_gateways.py +++ b/src/dstack/_internal/server/background/tasks/process_gateways.py @@ -17,6 +17,7 @@ ) from dstack._internal.server.services.locking import advisory_lock_ctx, get_locker from dstack._internal.server.services.logging import fmt +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import get_current_datetime from dstack._internal.utils.logging import get_logger @@ -28,6 +29,7 @@ async def process_gateways_connections(): await _process_active_connections() +@sentry_utils.instrument_background_task async def process_gateways(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(GatewayModel.__tablename__) async with get_session_ctx() as session: diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index 067314817f..2557012c2b 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -17,6 +17,7 @@ get_volume_configuration, volume_model_to_volume, ) +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils import common from dstack._internal.utils.common import get_current_datetime from dstack._internal.utils.logging import get_logger @@ -24,6 +25,7 @@ logger = get_logger(__name__) +@sentry_utils.instrument_background_task async def process_idle_volumes(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__) async with get_session_ctx() as session: diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 2cb6407ab8..2ad6ca4837 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -105,6 +105,7 @@ from dstack._internal.server.services.runner import client as runner_client from dstack._internal.server.services.runner.client import HealthStatus from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import ( get_current_datetime, get_or_error, @@ -136,6 +137,7 @@ async def process_instances(batch_size: int = 1): await asyncio.gather(*tasks) +@sentry_utils.instrument_background_task async def _process_next_instance(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(InstanceModel.__tablename__) async with get_session_ctx() as session: diff --git a/src/dstack/_internal/server/background/tasks/process_metrics.py b/src/dstack/_internal/server/background/tasks/process_metrics.py index 97fd2ce72f..d2197d4229 100644 --- a/src/dstack/_internal/server/background/tasks/process_metrics.py +++ b/src/dstack/_internal/server/background/tasks/process_metrics.py @@ -15,6 +15,7 @@ from dstack._internal.server.services.jobs import get_job_provisioning_data, get_job_runtime_data from dstack._internal.server.services.runner import client from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import batched, get_current_datetime, get_or_error, run_async from dstack._internal.utils.logging import get_logger @@ -26,6 +27,7 @@ MIN_COLLECT_INTERVAL_SECONDS = 9 +@sentry_utils.instrument_background_task async def collect_metrics(): async with get_session_ctx() as session: res = await session.execute( @@ -45,6 +47,7 @@ async def collect_metrics(): await _collect_jobs_metrics(batch) +@sentry_utils.instrument_background_task async def delete_metrics(): now_timestamp_micro = int(get_current_datetime().timestamp() * 1_000_000) running_timestamp_micro_cutoff = ( diff --git a/src/dstack/_internal/server/background/tasks/process_placement_groups.py b/src/dstack/_internal/server/background/tasks/process_placement_groups.py index 54b46cd91e..1f61300016 100644 --- a/src/dstack/_internal/server/background/tasks/process_placement_groups.py +++ b/src/dstack/_internal/server/background/tasks/process_placement_groups.py @@ -12,12 +12,14 @@ from dstack._internal.server.services import backends as backends_services from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.placement import placement_group_model_to_placement_group +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import get_current_datetime, run_async from dstack._internal.utils.logging import get_logger logger = get_logger(__name__) +@sentry_utils.instrument_background_task async def process_placement_groups(): lock, lockset = get_locker(get_db().dialect_name).get_lockset( PlacementGroupModel.__tablename__ diff --git a/src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py b/src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py index 8a7ba9b299..2f8bf72142 100644 --- a/src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py +++ b/src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py @@ -19,6 +19,7 @@ from dstack._internal.server.services.jobs import get_job_provisioning_data, get_job_runtime_data from dstack._internal.server.services.runner import client from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel +from dstack._internal.server.utils import sentry_utils from dstack._internal.server.utils.common import gather_map_async from dstack._internal.utils.common import batched, get_current_datetime, get_or_error, run_async from dstack._internal.utils.logging import get_logger @@ -34,6 +35,7 @@ METRICS_TTL_SECONDS = 600 +@sentry_utils.instrument_background_task async def collect_prometheus_metrics(): now = get_current_datetime() cutoff = now - timedelta(seconds=MIN_COLLECT_INTERVAL_SECONDS) @@ -61,6 +63,7 @@ async def collect_prometheus_metrics(): await _collect_jobs_metrics(batch, now) +@sentry_utils.instrument_background_task async def delete_prometheus_metrics(): now = get_current_datetime() cutoff = now - timedelta(seconds=METRICS_TTL_SECONDS) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 18f04137d7..4a3c9b7a01 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -73,6 +73,7 @@ ) from dstack._internal.server.services.secrets import get_project_secrets_mapping from dstack._internal.server.services.storage import get_default_storage +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils import common as common_utils from dstack._internal.utils.interpolator import InterpolatorError, VariablesInterpolator from dstack._internal.utils.logging import get_logger @@ -94,6 +95,7 @@ async def process_running_jobs(batch_size: int = 1): await asyncio.gather(*tasks) +@sentry_utils.instrument_background_task async def _process_next_running_job(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) async with get_session_ctx() as session: @@ -159,6 +161,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER job_model.last_processed_at = common_utils.get_current_datetime() + await session.commit() return job = find_job(run.jobs, job_model.replica_num, job_model.job_num) @@ -204,6 +207,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER job_model.termination_reason_message = e.args[0] job_model.last_processed_at = common_utils.get_current_datetime() + await session.commit() return server_ssh_private_keys = get_instance_ssh_private_keys( diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index ad79a4973d..ab0ae1340c 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -43,6 +43,7 @@ ) from dstack._internal.server.services.secrets import get_project_secrets_mapping from dstack._internal.server.services.services import update_service_desired_replica_count +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils import common from dstack._internal.utils.logging import get_logger @@ -59,6 +60,7 @@ async def process_runs(batch_size: int = 1): await asyncio.gather(*tasks) +@sentry_utils.instrument_background_task async def _process_next_run(): run_lock, run_lockset = get_locker(get_db().dialect_name).get_lockset(RunModel.__tablename__) job_lock, job_lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 005d615f87..e33271064f 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -75,6 +75,7 @@ from dstack._internal.server.services.volumes import ( volume_model_to_volume, ) +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils import common as common_utils from dstack._internal.utils import env as env_utils from dstack._internal.utils.logging import get_logger @@ -109,6 +110,7 @@ def _get_effective_batch_size(batch_size: int) -> int: return batch_size +@sentry_utils.instrument_background_task async def _process_next_submitted_job(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) async with get_session_ctx() as session: diff --git a/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py b/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py index 1951df1e15..cd81765636 100644 --- a/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py @@ -18,6 +18,7 @@ ) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.logging import fmt +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import ( get_current_datetime, get_or_error, @@ -34,6 +35,7 @@ async def process_terminating_jobs(batch_size: int = 1): await asyncio.gather(*tasks) +@sentry_utils.instrument_background_task async def _process_next_terminating_job(): job_lock, job_lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) instance_lock, instance_lockset = get_locker(get_db().dialect_name).get_lockset( diff --git a/src/dstack/_internal/server/background/tasks/process_volumes.py b/src/dstack/_internal/server/background/tasks/process_volumes.py index 561e1edb8d..4e37f6997b 100644 --- a/src/dstack/_internal/server/background/tasks/process_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_volumes.py @@ -16,12 +16,14 @@ from dstack._internal.server.services import backends as backends_services from dstack._internal.server.services import volumes as volumes_services from dstack._internal.server.services.locking import get_locker +from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import get_current_datetime, run_async from dstack._internal.utils.logging import get_logger logger = get_logger(__name__) +@sentry_utils.instrument_background_task async def process_submitted_volumes(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__) async with get_session_ctx() as session: diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index f025d71644..0ca72ea9f9 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -93,6 +93,9 @@ SENTRY_DSN = os.getenv("DSTACK_SENTRY_DSN") SENTRY_TRACES_SAMPLE_RATE = float(os.getenv("DSTACK_SENTRY_TRACES_SAMPLE_RATE", 0.1)) +SENTRY_TRACES_BACKGROUND_SAMPLE_RATE = float( + os.getenv("DSTACK_SENTRY_TRACES_BACKGROUND_SAMPLE_RATE", 0.01) +) SENTRY_PROFILES_SAMPLE_RATE = float(os.getenv("DSTACK_SENTRY_PROFILES_SAMPLE_RATE", 0)) DEFAULT_CREDS_DISABLED = os.getenv("DSTACK_DEFAULT_CREDS_DISABLED") is not None diff --git a/src/dstack/_internal/server/utils/sentry_utils.py b/src/dstack/_internal/server/utils/sentry_utils.py new file mode 100644 index 0000000000..c878e1e912 --- /dev/null +++ b/src/dstack/_internal/server/utils/sentry_utils.py @@ -0,0 +1,12 @@ +import functools + +import sentry_sdk + + +def instrument_background_task(f): + @functools.wraps(f) + async def wrapper(*args, **kwargs): + with sentry_sdk.start_transaction(name=f"background.{f.__name__}"): + return await f(*args, **kwargs) + + return wrapper