Skip to content

Commit 24cfa30

Browse files
committed
Instrument all pipelines
1 parent 5aa90fd commit 24cfa30

3 files changed

Lines changed: 9 additions & 0 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/compute_groups.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from dstack._internal.server.services.compute_groups import compute_group_model_to_compute_group
2828
from dstack._internal.server.services.instances import emit_instance_status_change_event
2929
from dstack._internal.server.services.locking import get_locker
30+
from dstack._internal.server.utils import sentry_utils
3031
from dstack._internal.utils.common import get_current_datetime, run_async
3132
from dstack._internal.utils.logging import get_logger
3233

@@ -107,6 +108,7 @@ def __init__(
107108
queue_check_delay=queue_check_delay,
108109
)
109110

111+
@sentry_utils.instrument_named_task("pipeline_tasks.ComputeGroupFetcher.fetch")
110112
async def fetch(self, limit: int) -> list[PipelineItem]:
111113
compute_group_lock, _ = get_locker(get_db().dialect_name).get_lockset(
112114
ComputeGroupModel.__tablename__
@@ -172,6 +174,7 @@ def __init__(
172174
heartbeater=heartbeater,
173175
)
174176

177+
@sentry_utils.instrument_named_task("pipeline_tasks.ComputeGroupWorker.process")
175178
async def process(self, item: PipelineItem):
176179
async with get_session_ctx() as session:
177180
res = await session.execute(

src/dstack/_internal/server/background/pipeline_tasks/gateways.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from dstack._internal.server.services.gateways.pool import gateway_connections_pool
3636
from dstack._internal.server.services.locking import get_locker
3737
from dstack._internal.server.services.logging import fmt
38+
from dstack._internal.server.utils import sentry_utils
3839
from dstack._internal.utils.common import get_current_datetime, run_async
3940
from dstack._internal.utils.logging import get_logger
4041

@@ -118,6 +119,7 @@ def __init__(
118119
queue_check_delay=queue_check_delay,
119120
)
120121

122+
@sentry_utils.instrument_named_task("pipeline_tasks.GatewayFetcher.fetch")
121123
async def fetch(self, limit: int) -> list[GatewayPipelineItem]:
122124
gateway_lock, _ = get_locker(get_db().dialect_name).get_lockset(GatewayModel.__tablename__)
123125
async with gateway_lock:
@@ -193,6 +195,7 @@ def __init__(
193195
heartbeater=heartbeater,
194196
)
195197

198+
@sentry_utils.instrument_named_task("pipeline_tasks.GatewayWorker.process")
196199
async def process(self, item: GatewayPipelineItem):
197200
if item.to_be_deleted:
198201
await _process_to_be_deleted_item(item)

src/dstack/_internal/server/background/pipeline_tasks/placement_groups.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from dstack._internal.server.services import backends as backends_services
2727
from dstack._internal.server.services.locking import get_locker
2828
from dstack._internal.server.services.placement import placement_group_model_to_placement_group
29+
from dstack._internal.server.utils import sentry_utils
2930
from dstack._internal.utils.common import get_current_datetime, run_async
3031
from dstack._internal.utils.logging import get_logger
3132

@@ -103,6 +104,7 @@ def __init__(
103104
queue_check_delay=queue_check_delay,
104105
)
105106

107+
@sentry_utils.instrument_named_task("pipeline_tasks.PlacementGroupFetcher.fetch")
106108
async def fetch(self, limit: int) -> list[PipelineItem]:
107109
placement_group_lock, _ = get_locker(get_db().dialect_name).get_lockset(
108110
PlacementGroupModel.__tablename__
@@ -170,6 +172,7 @@ def __init__(
170172
heartbeater=heartbeater,
171173
)
172174

175+
@sentry_utils.instrument_named_task("pipeline_tasks.PlacementGroupWorker.process")
173176
async def process(self, item: PipelineItem):
174177
async with get_session_ctx() as session:
175178
res = await session.execute(

0 commit comments

Comments
 (0)