Skip to content

Commit a1d9703

Browse files
authored
Implement run pipeline (#3686)
* Add RunModel pipeline columns * Add RunPipeline scaffolding * Add run pipeline terminating worker * Implement pending path for non-services * Implement active path for non-services * Fix stale run.jobs view * Document Locking related resources before refetch * Fix stale fleet.instances view * Early return on fleet termination * Implement pending path for services * Implement active services scaling * Implement active services rolling deployment * Clarify scaling and rolling deploy conflict * Wire * Hint run fetch * Add ix_runs_pipeline_fetch_q index * Cleanup * Fix _load_active_context extra return param * Fix double unlock * Fix unfinished jobs in retry replica * Add tests * Respect parent pipeline priority * Fix unions syntax * Decrease fleets min_processing_interval * Process legacy instances without fleets * Fix tests
1 parent 3b51bfa commit a1d9703

28 files changed

Lines changed: 4404 additions & 59 deletions

contributing/PIPELINES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ It's ok not to force all pipelines into one exact shape.
9292

9393
When writing processing results, update the main row with a filter by both `id` and `lock_token`. This guarantees that only the worker that still owns the lock can apply its results. If the update affects no rows, treat the item as stale and skip applying other changes (status changes, related updates, events). A stale item means another worker or replica already continued processing.
9494

95+
**Locking related resources before refetch**
96+
97+
If you first refetch a main resource and only afterwards lock the related resources, you need to ensure the worker doesn't get a stale view of the related resources, or that it works correctly even if it does. It's often more robust to lock the related resources first and then refetch the main resource with the related resources already locked.
98+
9599
**Locking many related resources**
96100

97101
A pipeline may need to lock a potentially big set of related resources, e.g. the fleet pipeline locking all of a fleet's instances. For this, do one SELECT FOR UPDATE of non-locked instances and one SELECT to see how many instances there are, and check whether you managed to lock all of them. If you fail to lock them all, release the main lock and retry processing on another fetch iteration. You may keep `lock_owner` on the main resource, or set `lock_owner` on the locked related resources and make other pipelines respect it, to guarantee the eventual locking of all related resources and avoid lock starvation.

src/dstack/_internal/server/background/pipeline_tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from dstack._internal.server.background.pipeline_tasks.placement_groups import (
1616
PlacementGroupPipeline,
1717
)
18+
from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline
1819
from dstack._internal.server.background.pipeline_tasks.volumes import VolumePipeline
1920
from dstack._internal.utils.logging import get_logger
2021

@@ -32,6 +33,7 @@ def __init__(self) -> None:
3233
JobTerminatingPipeline(),
3334
InstancePipeline(),
3435
PlacementGroupPipeline(),
36+
RunPipeline(),
3537
VolumePipeline(),
3638
]
3739
self._hinter = PipelineHinter(self._pipelines)

src/dstack/_internal/server/background/pipeline_tasks/fleets.py

Lines changed: 67 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def __init__(
6262
workers_num: int = 10,
6363
queue_lower_limit_factor: float = 0.5,
6464
queue_upper_limit_factor: float = 2.0,
65-
min_processing_interval: timedelta = timedelta(seconds=60),
65+
min_processing_interval: timedelta = timedelta(seconds=30),
6666
lock_timeout: timedelta = timedelta(seconds=20),
6767
heartbeat_trigger: timedelta = timedelta(seconds=10),
6868
) -> None:
@@ -199,19 +199,13 @@ async def process(self, item: PipelineItem):
199199
process_context = await _load_process_context(item)
200200
if process_context is None:
201201
return
202-
result = await _process_fleet(
203-
process_context.fleet_model,
204-
consolidation_fleet_spec=process_context.consolidation_fleet_spec,
205-
consolidation_instances=process_context.consolidation_instances,
206-
)
202+
result = await _process_fleet(process_context.fleet_model)
207203
await _apply_process_result(item, process_context, result)
208204

209205

210206
@dataclass
211207
class _ProcessContext:
212208
fleet_model: FleetModel
213-
consolidation_fleet_spec: Optional[FleetSpec]
214-
consolidation_instances: Optional[list[InstanceModel]]
215209
locked_instance_ids: set[uuid.UUID] = field(default_factory=set)
216210

217211

@@ -260,34 +254,64 @@ def has_changes(self) -> bool:
260254

261255
async def _load_process_context(item: PipelineItem) -> Optional[_ProcessContext]:
262256
async with get_session_ctx() as session:
263-
fleet_model = await _refetch_locked_fleet(session=session, item=item)
257+
fleet_model = await _refetch_locked_fleet_for_lock_decision(session=session, item=item)
264258
if fleet_model is None:
265259
log_lock_token_mismatch(logger, item)
266260
return None
267261

268-
consolidation_fleet_spec = _get_fleet_spec_if_ready_for_consolidation(fleet_model)
269-
consolidation_instances = None
270-
if consolidation_fleet_spec is not None:
271-
consolidation_instances = await _lock_fleet_instances_for_consolidation(
272-
session=session,
273-
item=item,
274-
)
275-
if consolidation_instances is None:
276-
return None
262+
locked_instance_ids = await _lock_fleet_instances_for_processing(
263+
session=session,
264+
item=item,
265+
fleet_model=fleet_model,
266+
)
267+
if locked_instance_ids is None:
268+
return None
269+
270+
fleet_model = await _refetch_locked_fleet_for_processing(session=session, item=item)
271+
if fleet_model is None:
272+
log_lock_token_mismatch(logger, item)
273+
if locked_instance_ids:
274+
await _unlock_fleet_locked_instances(
275+
session=session,
276+
item=item,
277+
locked_instance_ids=locked_instance_ids,
278+
)
279+
await session.commit()
280+
return None
277281

278282
return _ProcessContext(
279283
fleet_model=fleet_model,
280-
consolidation_fleet_spec=consolidation_fleet_spec,
281-
consolidation_instances=consolidation_instances,
282-
locked_instance_ids=(
283-
set()
284-
if consolidation_instances is None
285-
else {i.id for i in consolidation_instances}
286-
),
284+
locked_instance_ids=locked_instance_ids,
287285
)
288286

289287

290-
async def _refetch_locked_fleet(
288+
async def _refetch_locked_fleet_for_lock_decision(
289+
session: AsyncSession,
290+
item: PipelineItem,
291+
) -> Optional[FleetModel]:
292+
res = await session.execute(
293+
select(FleetModel)
294+
.where(
295+
FleetModel.id == item.id,
296+
FleetModel.lock_token == item.lock_token,
297+
)
298+
.options(
299+
load_only(
300+
FleetModel.id,
301+
FleetModel.status,
302+
FleetModel.spec,
303+
FleetModel.current_master_instance_id,
304+
FleetModel.consolidation_attempt,
305+
FleetModel.last_consolidated_at,
306+
FleetModel.last_processed_at,
307+
)
308+
)
309+
.execution_options(populate_existing=True)
310+
)
311+
return res.unique().scalar_one_or_none()
312+
313+
314+
async def _refetch_locked_fleet_for_processing(
291315
session: AsyncSession,
292316
item: PipelineItem,
293317
) -> Optional[FleetModel]:
@@ -308,6 +332,7 @@ async def _refetch_locked_fleet(
308332
FleetModel.runs.and_(RunModel.status.not_in(RunStatus.finished_statuses()))
309333
).load_only(RunModel.status)
310334
)
335+
.execution_options(populate_existing=True)
311336
)
312337
return res.unique().scalar_one_or_none()
313338

@@ -326,10 +351,17 @@ def _get_fleet_spec_if_ready_for_consolidation(fleet_model: FleetModel) -> Optio
326351
return consolidation_fleet_spec
327352

328353

329-
async def _lock_fleet_instances_for_consolidation(
354+
async def _lock_fleet_instances_for_processing(
330355
session: AsyncSession,
331356
item: PipelineItem,
332-
) -> Optional[list[InstanceModel]]:
357+
fleet_model: FleetModel,
358+
) -> Optional[set[uuid.UUID]]:
359+
if _get_fleet_spec_if_ready_for_consolidation(fleet_model) is None:
360+
if fleet_model.current_master_instance_id is None:
361+
return set()
362+
if not _is_cloud_cluster_fleet_spec(get_fleet_spec(fleet_model)):
363+
return set()
364+
333365
instance_lock, _ = get_locker(get_db().dialect_name).get_lockset(InstanceModel.__tablename__)
334366
async with instance_lock:
335367
res = await session.execute(
@@ -347,6 +379,7 @@ async def _lock_fleet_instances_for_consolidation(
347379
),
348380
)
349381
.with_for_update(skip_locked=True, key_share=True, of=InstanceModel)
382+
.options(load_only(InstanceModel.id))
350383
)
351384
locked_instance_models = list(res.scalars().all())
352385
locked_instance_ids = {instance_model.id for instance_model in locked_instance_models}
@@ -389,7 +422,7 @@ async def _lock_fleet_instances_for_consolidation(
389422
instance_model.lock_token = item.lock_token
390423
instance_model.lock_owner = FleetPipeline.__name__
391424
await session.commit()
392-
return locked_instance_models
425+
return locked_instance_ids
393426

394427

395428
async def _apply_process_result(
@@ -461,30 +494,29 @@ async def _apply_process_result(
461494

462495
async def _process_fleet(
463496
fleet_model: FleetModel,
464-
consolidation_fleet_spec: Optional[FleetSpec] = None,
465-
consolidation_instances: Optional[Sequence[InstanceModel]] = None,
466497
) -> _ProcessResult:
467498
result = _ProcessResult()
468-
effective_instances = list(consolidation_instances or fleet_model.instances)
499+
consolidation_fleet_spec = _get_fleet_spec_if_ready_for_consolidation(fleet_model)
469500
if consolidation_fleet_spec is not None:
470501
result = _consolidate_fleet_state_with_spec(
471502
fleet_model,
472503
consolidation_fleet_spec=consolidation_fleet_spec,
473-
consolidation_instances=effective_instances,
504+
consolidation_instances=fleet_model.instances,
474505
)
475506
if len(result.new_instance_creates) == 0 and _should_delete_fleet(fleet_model):
476507
result.fleet_update_map["status"] = FleetStatus.TERMINATED
477508
result.fleet_update_map["deleted"] = True
478509
result.fleet_update_map["deleted_at"] = NOW_PLACEHOLDER
510+
return result
479511
_set_fail_instances_on_master_bootstrap_failure(
480512
fleet_model=fleet_model,
481-
instance_models=effective_instances,
513+
instance_models=fleet_model.instances,
482514
instance_id_to_update_map=result.instance_id_to_update_map,
483515
)
484516
_set_current_master_instance_id(
485517
fleet_model=fleet_model,
486518
fleet_update_map=result.fleet_update_map,
487-
instance_models=effective_instances,
519+
instance_models=fleet_model.instances,
488520
instance_id_to_update_map=result.instance_id_to_update_map,
489521
new_instance_creates=result.new_instance_creates,
490522
)

src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
)
4141
from dstack._internal.server.db import get_db, get_session_ctx
4242
from dstack._internal.server.models import (
43+
FleetModel,
4344
InstanceHealthCheckModel,
4445
InstanceModel,
4546
JobModel,
@@ -147,6 +148,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
147148
now = get_current_datetime()
148149
res = await session.execute(
149150
select(InstanceModel)
151+
.join(InstanceModel.fleet, isouter=True)
150152
.where(
151153
InstanceModel.status.in_(
152154
[
@@ -164,6 +166,11 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
164166
)
165167
),
166168
InstanceModel.deleted == False,
169+
or_(
170+
# Do not try to lock instances if the fleet is waiting for the lock.
171+
InstanceModel.fleet_id.is_(None),
172+
FleetModel.lock_owner.is_(None),
173+
),
167174
or_(
168175
InstanceModel.last_processed_at <= now - self._min_processing_interval,
169176
InstanceModel.last_processed_at == InstanceModel.created_at,
@@ -239,6 +246,9 @@ async def process(self, item: InstancePipelineItem):
239246
if process_context is None:
240247
return
241248

249+
# Keep apply centralized here because every instance path returns the same
250+
# `ProcessResult` shape for one primary model, with only a small set of
251+
# optional side effects such as health checks or placement-group scheduling.
242252
await _apply_process_result(
243253
item=item,
244254
instance_model=process_context.instance_model,

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
197197
),
198198
RunModel.status.not_in([RunStatus.TERMINATING]),
199199
JobModel.last_processed_at <= now - self._min_processing_interval,
200+
# Do not try to lock jobs if the run is waiting for the lock.
201+
RunModel.lock_owner.is_(None),
200202
or_(
201203
JobModel.lock_expires_at.is_(None),
202204
JobModel.lock_expires_at < now,

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
243243
JobModel.last_processed_at <= now - self._min_processing_interval,
244244
JobModel.last_processed_at == JobModel.submitted_at,
245245
),
246+
# Do not try to lock jobs if the run is waiting for the lock.
247+
RunModel.lock_owner.is_(None),
246248
or_(
247249
JobModel.lock_expires_at.is_(None),
248250
JobModel.lock_expires_at < now,

src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ async def _process_terminating_job(
582582
"""
583583
Stops the job: tells shim to stop the container, detaches the job from the instance,
584584
and detaches volumes from the instance.
585-
Graceful stop should already be done by `process_terminating_run`.
585+
Graceful stop should already be done by the run terminating path.
586586
"""
587587
instance_update_map = None if instance_model is None else _InstanceUpdateMap()
588588
result = _ProcessResult(instance_update_map=instance_update_map)

0 commit comments

Comments
 (0)