Skip to content

Commit 109cab4

Browse files
authored
Check lock_expires_at in deprecated background tasks (#3689)
* Check lock_expires_at in deprecated background tasks
* Check empty_fleet_model.lock_expires_at in submitted_jobs
1 parent: 48cbbad; commit: 109cab4

10 files changed

Lines changed: 29 additions & 1 deletion

File tree

src/dstack/_internal/server/background/scheduled_tasks/compute_groups.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ async def _process_next_compute_group():
4949
res = await session.execute(
5050
select(ComputeGroupModel)
5151
.where(
52+
ComputeGroupModel.lock_expires_at.is_(None),
5253
ComputeGroupModel.deleted == False,
5354
ComputeGroupModel.id.not_in(lockset),
5455
ComputeGroupModel.last_processed_at

src/dstack/_internal/server/background/scheduled_tasks/fleets.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ async def process_fleets():
5555
res = await session.execute(
5656
select(FleetModel)
5757
.where(
58+
FleetModel.lock_expires_at.is_(None),
5859
FleetModel.deleted == False,
5960
FleetModel.id.not_in(fleet_lockset),
6061
FleetModel.last_processed_at
@@ -75,6 +76,7 @@ async def process_fleets():
7576
res = await session.execute(
7677
select(InstanceModel)
7778
.where(
79+
InstanceModel.lock_expires_at.is_(None),
7880
InstanceModel.id.not_in(instance_lockset),
7981
InstanceModel.fleet_id.in_(fleet_ids),
8082
InstanceModel.deleted == False,

src/dstack/_internal/server/background/scheduled_tasks/gateways.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ async def process_gateways():
4545
res = await session.execute(
4646
select(GatewayModel)
4747
.where(
48+
GatewayModel.lock_expires_at.is_(None),
4849
GatewayModel.status.in_([GatewayStatus.SUBMITTED, GatewayStatus.PROVISIONING]),
4950
GatewayModel.id.not_in(lockset),
5051
)

src/dstack/_internal/server/background/scheduled_tasks/instances.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -176,6 +176,7 @@ async def _process_next_instance():
176176
InstanceModel.compute_group_id.is_not(None),
177177
)
178178
),
179+
InstanceModel.lock_expires_at.is_(None),
179180
InstanceModel.id.not_in(lockset),
180181
InstanceModel.last_processed_at
181182
< get_current_datetime() - MIN_PROCESSING_INTERVAL,

src/dstack/_internal/server/background/scheduled_tasks/placement_groups.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ async def process_placement_groups():
3131
res = await session.execute(
3232
select(PlacementGroupModel)
3333
.where(
34+
PlacementGroupModel.lock_expires_at.is_(None),
3435
PlacementGroupModel.fleet_deleted == True,
3536
PlacementGroupModel.deleted == False,
3637
PlacementGroupModel.id.not_in(lockset),

src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,7 @@ async def _process_next_running_job():
121121
select(JobModel)
122122
.join(JobModel.run)
123123
.where(
124+
JobModel.lock_expires_at.is_(None),
124125
JobModel.status.in_(
125126
[JobStatus.PROVISIONING, JobStatus.PULLING, JobStatus.RUNNING]
126127
),

src/dstack/_internal/server/background/scheduled_tasks/runs.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,8 @@
7373
ROLLING_DEPLOYMENT_MAX_SURGE = 1 # at most one extra replica during rolling deployment
7474

7575

76+
# NOTE: This scheduled task is going to be deprecated in favor of `RunPipeline`.
77+
# If this logic changes before removal, keep `pipeline_tasks/runs/__init__.py` in sync.
7678
async def process_runs(batch_size: int = 1):
7779
tasks = []
7880
for _ in range(batch_size):
@@ -90,6 +92,7 @@ async def _process_next_run():
9092
res = await session.execute(
9193
select(RunModel)
9294
.where(
95+
RunModel.lock_expires_at.is_(None),
9396
RunModel.id.not_in(run_lockset),
9497
RunModel.last_processed_at < now - MIN_PROCESSING_INTERVAL,
9598
# Filter out runs that don't need to be processed.

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -151,6 +151,8 @@
151151
last_processed_at: Optional[datetime] = None
152152

153153

154+
# NOTE: This scheduled task is going to be deprecated in favor of `JobSubmittedPipeline`.
155+
# If this logic changes before removal, keep `pipeline_tasks/jobs_submitted.py` in sync.
154156
async def process_submitted_jobs(batch_size: int = 1):
155157
tasks = []
156158
effective_batch_size = _get_effective_batch_size(batch_size)
@@ -177,6 +179,7 @@ async def _process_next_submitted_job():
177179
select(JobModel)
178180
.join(JobModel.run)
179181
.where(
182+
JobModel.lock_expires_at.is_(None),
180183
JobModel.status == JobStatus.SUBMITTED,
181184
JobModel.waiting_master_job.is_not(True),
182185
JobModel.id.not_in(lockset),
@@ -634,6 +637,14 @@ async def _process_new_capacity_provisioning_path(
634637
job=context.job,
635638
)
636639

640+
if context.fleet_model is not None and fleet_model is None:
641+
await _defer_submitted_job(
642+
session=session,
643+
job_model=context.job_model,
644+
log_message="cluster fleet is locked",
645+
)
646+
return None
647+
637648
# master_job_provisioning_data is present if there is a master job.
638649
# master_instance_provisioning_data is present if there is a master instance (non empty cluster fleet).
639650
master_provisioning_data = master_job_provisioning_data or master_instance_provisioning_data
@@ -1018,6 +1029,8 @@ async def _lock_fleet_and_get_master_provisioning_data(
10181029
)
10191030
await sqlite_commit(session)
10201031
fleet_model = await _refetch_cluster_master_fleet(session=session, fleet_model=fleet_model)
1032+
if fleet_model is None:
1033+
return None, None
10211034
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
10221035
fleet_model=fleet_model,
10231036
fleet_spec=fleet_spec,
@@ -1034,7 +1047,7 @@ def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]:
10341047

10351048
async def _refetch_cluster_master_fleet(
10361049
session: AsyncSession, fleet_model: FleetModel
1037-
) -> FleetModel:
1050+
) -> Optional[FleetModel]:
10381051
res = await session.execute(
10391052
select(FleetModel)
10401053
.where(
@@ -1050,6 +1063,9 @@ async def _refetch_cluster_master_fleet(
10501063
)
10511064
empty_fleet_model = res.unique().scalar()
10521065
if empty_fleet_model is not None:
1066+
if empty_fleet_model.lock_expires_at is not None:
1067+
# Defer while a pipeline owns the empty cluster fleet.
1068+
return None
10531069
return empty_fleet_model
10541070

10551071
res = await session.execute(

src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,7 @@ async def _process_next_terminating_job():
8181
res = await session.execute(
8282
select(JobModel)
8383
.where(
84+
JobModel.lock_expires_at.is_(None),
8485
JobModel.id.not_in(job_lockset),
8586
JobModel.status == JobStatus.TERMINATING,
8687
or_(

src/dstack/_internal/server/background/scheduled_tasks/volumes.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ async def process_submitted_volumes():
3434
res = await session.execute(
3535
select(VolumeModel)
3636
.where(
37+
VolumeModel.lock_expires_at.is_(None),
3738
VolumeModel.status == VolumeStatus.SUBMITTED,
3839
VolumeModel.deleted == False,
3940
VolumeModel.id.not_in(lockset),

0 commit comments

Comments
 (0)