diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 9e93e7b2a..5f5ac0eeb 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -98,6 +98,7 @@ get_instance_offer, get_instance_provisioning_data, switch_instance_status, + try_atomic_busy_blocks_increment, ) from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, @@ -575,6 +576,22 @@ async def _apply_assignment_result( return instance_model, current_offer = current_instance_offers[0] + if not await try_atomic_busy_blocks_increment( + session=session, + instance_id=instance_model.id, + blocks=current_offer.blocks, + ): + await _reset_job_lock_for_retry(session=session, item=item) + return + + # Refetch instance to get updated busy_blocks from the atomic update + res = await session.execute( + select(InstanceModel) + .where(InstanceModel.id == instance_model.id) + .options(joinedload(InstanceModel.volume_attachments)) + ) + instance_model = res.unique().scalar_one() + _assign_instance_to_job( session=session, job_model=job_model, @@ -880,7 +897,6 @@ def _assign_instance_to_job( job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json() switch_instance_status(session, instance_model, InstanceStatus.BUSY) - instance_model.busy_blocks += offer.blocks events.emit( session, ( diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index eaa452380..f65b3fff0 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -93,6 +93,7 @@ format_instance_blocks_for_event, get_instance_provisioning_data, switch_instance_status, + try_atomic_busy_blocks_increment, ) from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, @@ -1079,6 +1080,12 @@ async def _assign_existing_instance_to_job( instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0) instance, offer = instances_with_offers[0] + if not await try_atomic_busy_blocks_increment( + session=session, instance_id=instance.id, blocks=offer.blocks + ): + job_model.instance_assigned = False + return + # Reload InstanceModel with volume attachments res = await session.execute( select(InstanceModel) @@ -1087,7 +1094,6 @@ async def _assign_existing_instance_to_job( ) instance = res.unique().scalar_one() switch_instance_status(session, instance, InstanceStatus.BUSY) - instance.busy_blocks += offer.blocks job_model.instance = instance job_model.used_instance_id = instance.id diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index d54ec8b68..7fd373b99 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -5,7 +5,7 @@ from typing import Dict, List, Literal, Optional, Union import gpuhunt -from sqlalchemy import and_, exists, false, or_, select +from sqlalchemy import and_, exists, false, or_, select, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, load_only @@ -63,6 +63,29 @@ logger = get_logger(__name__) +async def try_atomic_busy_blocks_increment( + session: AsyncSession, + instance_id: uuid.UUID, + blocks: int, +) -> bool: + """ + Atomically increment busy_blocks if the instance has capacity. + Returns True if the update affected a row, False otherwise. + """ + res = await session.execute( + update(InstanceModel) + .where( + InstanceModel.id == instance_id, + or_( + InstanceModel.total_blocks.is_(None), + InstanceModel.busy_blocks + blocks <= InstanceModel.total_blocks, + ), + ) + .values(busy_blocks=InstanceModel.busy_blocks + blocks) + ) + return res.rowcount > 0 + + def switch_instance_status( session: AsyncSession, instance_model: InstanceModel,