Skip to content

Commit 3ec2f23

Browse files
committed
Fix missing instance lock in delete_fleets
1 parent 104834e commit 3ec2f23

File tree

1 file changed

+34
-20
lines changed
  • src/dstack/_internal/server/services

1 file changed

+34
-20
lines changed

src/dstack/_internal/server/services/fleets.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,12 @@
4242
)
4343
from dstack._internal.core.models.projects import Project
4444
from dstack._internal.core.models.resources import ResourcesSpec
45-
from dstack._internal.core.models.runs import JobProvisioningData, Requirements, get_policy_map
45+
from dstack._internal.core.models.runs import (
46+
JobProvisioningData,
47+
Requirements,
48+
RunStatus,
49+
get_policy_map,
50+
)
4651
from dstack._internal.core.models.users import GlobalRole
4752
from dstack._internal.core.services import validate_dstack_resource_name
4853
from dstack._internal.core.services.diff import ModelDiff, copy_model, diff_models
@@ -53,6 +58,7 @@
5358
JobModel,
5459
MemberModel,
5560
ProjectModel,
61+
RunModel,
5662
UserModel,
5763
)
5864
from dstack._internal.server.services import events
@@ -613,48 +619,56 @@ async def delete_fleets(
613619
instance_nums: Optional[List[int]] = None,
614620
):
615621
res = await session.execute(
616-
select(FleetModel)
622+
select(FleetModel.id)
617623
.where(
618624
FleetModel.project_id == project.id,
619625
FleetModel.name.in_(names),
620626
FleetModel.deleted == False,
621627
)
622-
.options(joinedload(FleetModel.instances))
628+
.order_by(FleetModel.id) # take locks in order
629+
.with_for_update(key_share=True)
623630
)
624-
fleet_models = res.scalars().unique().all()
625-
fleets_ids = sorted([f.id for f in fleet_models])
626-
instances_ids = sorted([i.id for f in fleet_models for i in f.instances])
627-
await session.commit()
628-
logger.info("Deleting fleets: %s", [v.name for v in fleet_models])
631+
fleets_ids = list(res.scalars().unique().all())
632+
res = await session.execute(
633+
select(InstanceModel.id)
634+
.where(
635+
InstanceModel.fleet_id.in_(fleets_ids),
636+
InstanceModel.deleted == False,
637+
)
638+
.order_by(InstanceModel.id) # take locks in order
639+
.with_for_update(key_share=True)
640+
)
641+
instances_ids = list(res.scalars().unique().all())
642+
if is_db_sqlite():
643+
# Start new transaction to see committed changes after lock
644+
await session.commit()
629645
async with (
630646
get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, fleets_ids),
631647
get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids),
632648
):
633-
# Refetch after lock
634-
# TODO: Lock instances with FOR UPDATE?
635-
# TODO: Do not lock fleet when deleting only instances
649+
# Refetch after lock.
650+
# TODO: Do not lock fleet when deleting only instances.
636651
res = await session.execute(
637652
select(FleetModel)
638-
.where(
639-
FleetModel.project_id == project.id,
640-
FleetModel.name.in_(names),
641-
FleetModel.deleted == False,
642-
)
653+
.where(FleetModel.id.in_(fleets_ids))
643654
.options(
644-
selectinload(FleetModel.instances)
655+
joinedload(FleetModel.instances.and_(InstanceModel.id.in_(instances_ids)))
645656
.joinedload(InstanceModel.jobs)
646657
.load_only(JobModel.id)
647658
)
648-
.options(selectinload(FleetModel.runs))
659+
.options(
660+
joinedload(
661+
FleetModel.runs.and_(RunModel.status.not_in(RunStatus.finished_statuses()))
662+
)
663+
)
649664
.execution_options(populate_existing=True)
650-
.order_by(FleetModel.id) # take locks in order
651-
.with_for_update(key_share=True)
652665
)
653666
fleet_models = res.scalars().unique().all()
654667
fleets = [fleet_model_to_fleet(m) for m in fleet_models]
655668
for fleet in fleets:
656669
if fleet.spec.configuration.ssh_config is not None:
657670
_check_can_manage_ssh_fleets(user=user, project=project)
671+
logger.info("Deleting fleets: %s", [f.name for f in fleet_models])
658672
for fleet_model in fleet_models:
659673
_terminate_fleet_instances(fleet_model=fleet_model, instance_nums=instance_nums)
660674
# TERMINATING fleets are deleted by process_fleets after instances are terminated

0 commit comments

Comments
 (0)