Skip to content

Commit a3ef364

Browse files
committed
Refactor fleet/instance selects to fix for update with outer join
1 parent c50d578 commit a3ef364

File tree

1 file changed

+92
-44
lines changed

1 file changed

+92
-44
lines changed

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 92 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,10 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
235235
# Then, the job runs on the assigned instance or a new instance is provisioned.
236236
# This is needed to avoid holding instances lock for a long time.
237237
if not job_model.instance_assigned:
238+
# If another job freed the instance but is still trying to detach volumes,
239+
# do not provision on it to prevent attaching volumes that are currently detaching.
240+
detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session)
241+
238242
fleet_filters = [
239243
FleetModel.project_id == project.id,
240244
FleetModel.deleted == False,
@@ -243,60 +247,41 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
243247
fleet_filters.append(FleetModel.id == run_model.fleet_id)
244248
if run_spec.configuration.fleets is not None:
245249
fleet_filters.append(FleetModel.name.in_(run_spec.configuration.fleets))
246-
res = await session.execute(
247-
select(FleetModel)
248-
.outerjoin(FleetModel.instances)
249-
.where(*fleet_filters)
250-
.where(
251-
or_(
252-
InstanceModel.id.is_(None),
253-
and_(
254-
InstanceModel.deleted == False,
255-
InstanceModel.total_blocks > InstanceModel.busy_blocks,
256-
),
257-
)
258-
)
259-
.options(contains_eager(FleetModel.instances))
260-
.order_by(InstanceModel.id) # take locks in order
261-
.with_for_update(key_share=True, of=InstanceModel)
250+
251+
instance_filters = [
252+
InstanceModel.deleted == False,
253+
InstanceModel.total_blocks > InstanceModel.busy_blocks,
254+
InstanceModel.id.not_in(detaching_instances_ids),
255+
]
256+
257+
fleet_models_with_instances, fleet_models_without_instances = await _select_fleet_models(
258+
session=session,
259+
fleet_filters=fleet_filters,
260+
instance_filters=instance_filters,
262261
)
263-
fleet_models = list(res.unique().scalars().all())
264-
fleets_ids = sorted([f.id for f in fleet_models])
265262
instances_ids = sorted(
266-
itertools.chain.from_iterable([i.id for i in f.instances] for f in fleet_models)
263+
itertools.chain.from_iterable(
264+
[i.id for i in f.instances] for f in fleet_models_with_instances
265+
)
267266
)
267+
fleet_models = fleet_models_with_instances + fleet_models_without_instances
268+
fleets_ids = [f.id for f in fleet_models]
269+
268270
if get_db().dialect_name == "sqlite":
269271
# Start new transaction to see committed changes after lock
270272
await session.commit()
273+
271274
async with get_locker(get_db().dialect_name).lock_ctx(
272275
InstanceModel.__tablename__, instances_ids
273276
):
274-
# If another job freed the instance but is still trying to detach volumes,
275-
# do not provision on it to prevent attaching volumes that are currently detaching.
276-
detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session)
277-
# Refetch after lock
278-
res = await session.execute(
279-
select(FleetModel)
280-
.outerjoin(FleetModel.instances)
281-
.where(
282-
FleetModel.id.in_(fleets_ids),
283-
*fleet_filters,
284-
)
285-
.where(
286-
or_(
287-
InstanceModel.id.is_(None),
288-
and_(
289-
InstanceModel.id.not_in(detaching_instances_ids),
290-
InstanceModel.id.in_(instances_ids),
291-
InstanceModel.deleted == False,
292-
InstanceModel.total_blocks > InstanceModel.busy_blocks,
293-
),
294-
)
277+
if get_db().dialect_name == "sqlite":
278+
fleet_models = await _refetch_fleet_models(
279+
session=session,
280+
fleets_ids=fleets_ids,
281+
instances_ids=instances_ids,
282+
fleet_filters=fleet_filters,
283+
instance_filters=instance_filters,
295284
)
296-
.options(contains_eager(FleetModel.instances))
297-
.execution_options(populate_existing=True)
298-
)
299-
fleet_models = list(res.unique().scalars().all())
300285
fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers(
301286
fleet_models=fleet_models,
302287
run_model=run_model,
@@ -419,6 +404,69 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
419404
await session.commit()
420405

421406

407+
async def _select_fleet_models(
408+
session: AsyncSession, fleet_filters: list, instance_filters: list
409+
) -> tuple[list[FleetModel], list[FleetModel]]:
410+
# Selecting fleets in two queries since Postgres does not allow
411+
# locking nullable side of an outer join. So, first lock instances with inner join.
412+
# Then select left out fleets without instances.
413+
res = await session.execute(
414+
select(FleetModel)
415+
.join(FleetModel.instances)
416+
.where(*fleet_filters)
417+
.where(*instance_filters)
418+
.options(contains_eager(FleetModel.instances))
419+
.order_by(InstanceModel.id) # take locks in order
420+
.with_for_update(key_share=True, of=InstanceModel)
421+
)
422+
fleet_models_with_instances = list(res.unique().scalars().all())
423+
fleet_models_with_instances_ids = [f.id for f in fleet_models_with_instances]
424+
res = await session.execute(
425+
select(FleetModel)
426+
.outerjoin(FleetModel.instances)
427+
.where(
428+
*fleet_filters,
429+
FleetModel.id.not_in(fleet_models_with_instances_ids),
430+
)
431+
.where(InstanceModel.id.is_(None))
432+
.options(contains_eager(FleetModel.instances)) # loading empty relation
433+
)
434+
fleet_models_without_instances = list(res.unique().scalars().all())
435+
return fleet_models_with_instances, fleet_models_without_instances
436+
437+
438+
async def _refetch_fleet_models(
439+
session: AsyncSession,
440+
fleets_ids: list[uuid.UUID],
441+
instances_ids: list[uuid.UUID],
442+
fleet_filters: list,
443+
instance_filters: list,
444+
) -> list[FleetModel]:
445+
res = await session.execute(
446+
select(FleetModel)
447+
.outerjoin(FleetModel.instances)
448+
.where(
449+
FleetModel.id.in_(fleets_ids),
450+
*fleet_filters,
451+
)
452+
.where(
453+
and_(
454+
InstanceModel.id.in_(instances_ids),
455+
or_(
456+
InstanceModel.id.is_(None),
457+
and_(
458+
*instance_filters,
459+
),
460+
),
461+
)
462+
)
463+
.options(contains_eager(FleetModel.instances))
464+
.execution_options(populate_existing=True)
465+
)
466+
fleet_models = list(res.unique().scalars().all())
467+
return fleet_models
468+
469+
422470
def _find_optimal_fleet_with_offers(
423471
fleet_models: list[FleetModel],
424472
run_model: RunModel,

0 commit comments

Comments
 (0)