Skip to content

Commit d1fd238

Browse files
authored
Implement volume pipeline (#3604)
* Fix process_submitted_volumes selecting deleted volumes * Implement pipeline for submitted volumes * Implement delete volume async API * Add TestVolumeWorkerDeleted * Make process_idle_volumes work with pipelines * Handle locked volumes when attaching * Test idle volumes with pipelines * Add index ix_volumes_pipeline_fetch_q * Add sentry instrumentation for pipeline tasks * Instrument all pipelines * Fix to_be_deleted server_default
1 parent 798f6bc commit d1fd238

32 files changed

Lines changed: 1369 additions & 50 deletions

src/dstack/_internal/core/models/volumes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
class VolumeStatus(str, Enum):
1919
SUBMITTED = "submitted"
20+
# PROVISIONING is currently not used since on all backends supporting volumes,
21+
# volumes become ACTIVE (ready to be used) almost immediately after provisioning.
2022
PROVISIONING = "provisioning"
2123
ACTIVE = "active"
2224
FAILED = "failed"

src/dstack/_internal/server/background/pipeline_tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from dstack._internal.server.background.pipeline_tasks.placement_groups import (
77
PlacementGroupPipeline,
88
)
9+
from dstack._internal.server.background.pipeline_tasks.volumes import VolumePipeline
910
from dstack._internal.utils.logging import get_logger
1011

1112
logger = get_logger(__name__)
@@ -17,6 +18,7 @@ def __init__(self) -> None:
1718
ComputeGroupPipeline(),
1819
GatewayPipeline(),
1920
PlacementGroupPipeline(),
21+
VolumePipeline(),
2022
]
2123
self._hinter = PipelineHinter(self._pipelines)
2224

src/dstack/_internal/server/background/pipeline_tasks/compute_groups.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from dstack._internal.server.services.compute_groups import compute_group_model_to_compute_group
2828
from dstack._internal.server.services.instances import emit_instance_status_change_event
2929
from dstack._internal.server.services.locking import get_locker
30+
from dstack._internal.server.utils import sentry_utils
3031
from dstack._internal.utils.common import get_current_datetime, run_async
3132
from dstack._internal.utils.logging import get_logger
3233

@@ -107,6 +108,7 @@ def __init__(
107108
queue_check_delay=queue_check_delay,
108109
)
109110

111+
@sentry_utils.instrument_named_task("pipeline_tasks.ComputeGroupFetcher.fetch")
110112
async def fetch(self, limit: int) -> list[PipelineItem]:
111113
compute_group_lock, _ = get_locker(get_db().dialect_name).get_lockset(
112114
ComputeGroupModel.__tablename__
@@ -172,6 +174,7 @@ def __init__(
172174
heartbeater=heartbeater,
173175
)
174176

177+
@sentry_utils.instrument_named_task("pipeline_tasks.ComputeGroupWorker.process")
175178
async def process(self, item: PipelineItem):
176179
async with get_session_ctx() as session:
177180
res = await session.execute(

src/dstack/_internal/server/background/pipeline_tasks/gateways.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from dstack._internal.server.services.gateways.pool import gateway_connections_pool
3636
from dstack._internal.server.services.locking import get_locker
3737
from dstack._internal.server.services.logging import fmt
38+
from dstack._internal.server.utils import sentry_utils
3839
from dstack._internal.utils.common import get_current_datetime, run_async
3940
from dstack._internal.utils.logging import get_logger
4041

@@ -118,6 +119,7 @@ def __init__(
118119
queue_check_delay=queue_check_delay,
119120
)
120121

122+
@sentry_utils.instrument_named_task("pipeline_tasks.GatewayFetcher.fetch")
121123
async def fetch(self, limit: int) -> list[GatewayPipelineItem]:
122124
gateway_lock, _ = get_locker(get_db().dialect_name).get_lockset(GatewayModel.__tablename__)
123125
async with gateway_lock:
@@ -193,6 +195,7 @@ def __init__(
193195
heartbeater=heartbeater,
194196
)
195197

198+
@sentry_utils.instrument_named_task("pipeline_tasks.GatewayWorker.process")
196199
async def process(self, item: GatewayPipelineItem):
197200
if item.to_be_deleted:
198201
await _process_to_be_deleted_item(item)

src/dstack/_internal/server/background/pipeline_tasks/placement_groups.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from dstack._internal.server.services import backends as backends_services
2727
from dstack._internal.server.services.locking import get_locker
2828
from dstack._internal.server.services.placement import placement_group_model_to_placement_group
29+
from dstack._internal.server.utils import sentry_utils
2930
from dstack._internal.utils.common import get_current_datetime, run_async
3031
from dstack._internal.utils.logging import get_logger
3132

@@ -103,6 +104,7 @@ def __init__(
103104
queue_check_delay=queue_check_delay,
104105
)
105106

107+
@sentry_utils.instrument_named_task("pipeline_tasks.PlacementGroupFetcher.fetch")
106108
async def fetch(self, limit: int) -> list[PipelineItem]:
107109
placement_group_lock, _ = get_locker(get_db().dialect_name).get_lockset(
108110
PlacementGroupModel.__tablename__
@@ -170,6 +172,7 @@ def __init__(
170172
heartbeater=heartbeater,
171173
)
172174

175+
@sentry_utils.instrument_named_task("pipeline_tasks.PlacementGroupWorker.process")
173176
async def process(self, item: PipelineItem):
174177
async with get_session_ctx() as session:
175178
res = await session.execute(
@@ -230,6 +233,7 @@ async def _delete_placement_group(placement_group_model: PlacementGroupModel) ->
230233
backend_type=placement_group.provisioning_data.backend,
231234
)
232235
if backend is None:
236+
# TODO: Retry deletion
233237
logger.error(
234238
"Failed to delete placement group %s. Backend not available. Please delete it manually.",
235239
placement_group.name,
@@ -245,6 +249,7 @@ async def _delete_placement_group(placement_group_model: PlacementGroupModel) ->
245249
)
246250
return {}
247251
except Exception:
252+
# TODO: Retry deletion
248253
logger.exception(
249254
"Got exception when deleting placement group %s. Please delete it manually.",
250255
placement_group.name,

0 commit comments

Comments
 (0)