From 5484178430457c7f73c2bcc13dce7790baccf7df Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 25 Aug 2025 15:45:07 +0500 Subject: [PATCH] Fix duplicate instance_num --- .../tasks/process_submitted_jobs.py | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 81b7d27acd..6d93eb523f 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta from typing import List, Optional, Tuple -from sqlalchemy import and_, not_, or_, select +from sqlalchemy import and_, func, not_, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload @@ -365,6 +365,10 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): project=project, run=run, ) + # FIXME: Fleet is not locked which may lead to duplicate instance_num. + # This is currently hard to fix without locking the fleet for entire provisioning duration. + # Processing should be done in multiple steps so that + # InstanceModel is created before provisioning. instance_num = await _get_next_instance_num( session=session, fleet_model=fleet_model, @@ -773,25 +777,11 @@ def _create_fleet_model_for_job( async def _get_next_instance_num(session: AsyncSession, fleet_model: FleetModel) -> int: - if len(fleet_model.instances) == 0: - # No instances means the fleet is not in the db yet, so don't lock. - return 0 - async with get_locker(get_db().dialect_name).lock_ctx( - FleetModel.__tablename__, [fleet_model.id] - ): - fleet_model = ( - ( - await session.execute( - select(FleetModel) - .where(FleetModel.id == fleet_model.id) - .options(joinedload(FleetModel.instances)) - .execution_options(populate_existing=True) - ) - ) - .unique() - .scalar_one() - ) - return len(fleet_model.instances) + res = await session.execute( + select(func.count(InstanceModel.id)).where(InstanceModel.fleet_id == fleet_model.id) + ) + instance_count = res.scalar_one() + return instance_count def _create_instance_model_for_job(