From 99e90fc06e1010093cc93aa2ff4b9f1f05884f32 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 7 Aug 2025 15:19:22 +0500 Subject: [PATCH 01/20] Do not auto-delete empty fleets that allow 0 nodes --- .../server/background/tasks/process_fleets.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index 0388ac96ba..4ce819e59a 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -15,6 +15,7 @@ RunModel, ) from dstack._internal.server.services.fleets import ( + get_fleet_spec, is_fleet_empty, is_fleet_in_use, ) @@ -92,11 +93,18 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel]) def _autodelete_fleet(fleet_model: FleetModel) -> bool: - # Currently all empty fleets are autodeleted. - # TODO: If fleets with `nodes: 0..` are supported, their deletion should be skipped. if is_fleet_in_use(fleet_model) or not is_fleet_empty(fleet_model): return False + fleet_spec = get_fleet_spec(fleet_model) + if ( + fleet_model.status != FleetStatus.TERMINATING + and fleet_spec.configuration.nodes is not None + and (fleet_spec.configuration.nodes.min is None or fleet_spec.configuration.nodes.min == 0) + ): + # Empty fleets that allow 0 nodes should not be auto-deleted + return False + logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name) fleet_model.status = FleetStatus.TERMINATED fleet_model.deleted = True From 1be6ff662d7403b23df9b69f4c37e4d5ed627fd7 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 8 Aug 2025 10:16:53 +0500 Subject: [PATCH 02/20] Refactor offers filtering --- .../_internal/core/backends/aws/compute.py | 5 +- .../_internal/core/backends/base/compute.py | 4 - .../_internal/core/backends/gcp/compute.py | 5 +- .../_internal/core/backends/nebius/compute.py | 5 +- .../_internal/server/services/offers.py | 79 +++++++++++-------- 5 files changed, 51 insertions(+), 47 deletions(-) diff --git a/src/dstack/_internal/core/backends/aws/compute.py b/src/dstack/_internal/core/backends/aws/compute.py index 82e628e585..f88cefee23 100644 --- a/src/dstack/_internal/core/backends/aws/compute.py +++ b/src/dstack/_internal/core/backends/aws/compute.py @@ -383,10 +383,7 @@ def is_suitable_placement_group( ) -> bool: if not _offer_supports_placement_group(instance_offer, placement_group): return False - return ( - placement_group.configuration.backend == BackendType.AWS - and placement_group.configuration.region == instance_offer.region - ) + return placement_group.configuration.region == instance_offer.region def create_gateway( self, diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 53d062567b..9e604286af 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -263,10 +263,6 @@ def is_suitable_placement_group( Checks if the instance offer can be provisioned in the placement group. Should return immediately, without performing API calls. - - Can be called with an offer originating from a different backend, because some backends - (BackendType.DSTACK) produce offers on behalf of other backends. Should return `False` - in that case. """ pass diff --git a/src/dstack/_internal/core/backends/gcp/compute.py b/src/dstack/_internal/core/backends/gcp/compute.py index 2f0239239e..4956d66ae3 100644 --- a/src/dstack/_internal/core/backends/gcp/compute.py +++ b/src/dstack/_internal/core/backends/gcp/compute.py @@ -448,10 +448,7 @@ def is_suitable_placement_group( placement_group: PlacementGroup, instance_offer: InstanceOffer, ) -> bool: - return ( - placement_group.configuration.backend == BackendType.GCP - and placement_group.configuration.region == instance_offer.region - ) + return placement_group.configuration.region == instance_offer.region def create_gateway( self, diff --git a/src/dstack/_internal/core/backends/nebius/compute.py b/src/dstack/_internal/core/backends/nebius/compute.py index 48be255aa7..c355df1b83 100644 --- a/src/dstack/_internal/core/backends/nebius/compute.py +++ b/src/dstack/_internal/core/backends/nebius/compute.py @@ -298,10 +298,7 @@ def is_suitable_placement_group( placement_group: PlacementGroup, instance_offer: InstanceOffer, ) -> bool: - if not ( - placement_group.configuration.backend == BackendType.NEBIUS - and placement_group.configuration.region == instance_offer.region - ): + if placement_group.configuration.region != instance_offer.region: return False assert placement_group.provisioning_data is not None backend_data = NebiusPlacementGroupBackendData.load( diff --git a/src/dstack/_internal/server/services/offers.py b/src/dstack/_internal/server/services/offers.py index 1e1b6ff586..3b1f3c8d8c 100644 --- a/src/dstack/_internal/server/services/offers.py +++ b/src/dstack/_internal/server/services/offers.py @@ -49,6 +49,7 @@ async def get_offers_by_requirements( backend_types = profile.backends regions = profile.regions availability_zones = profile.availability_zones + instance_types = profile.instance_types if volumes: mount_point_volumes = volumes[0] @@ -97,9 +98,43 @@ async def get_offers_by_requirements( exclude_not_available=exclude_not_available, ) - # Filter offers again for backends since a backend - # can return offers of different backend types (e.g. BackendType.DSTACK). - # The first filter should remain as an optimization. + offers = filter_offers( + offers=offers, + # Double filtering by backends if backend returns offers for other backend. + backend_types=backend_types, + regions=regions, + availability_zones=availability_zones, + instance_types=instance_types, + placement_group=placement_group, + ) + + if blocks == 1: + return offers + + shareable_offers = [] + for backend, offer in offers: + resources = offer.instance.resources + cpu_count = resources.cpus + gpu_count = len(resources.gpus) + if gpu_count > 0 and resources.gpus[0].vendor == gpuhunt.AcceleratorVendor.GOOGLE: + # TPUs cannot be shared + gpu_count = 1 + divisible, _blocks = is_divisible_into_blocks(cpu_count, gpu_count, blocks) + if not divisible: + continue + offer.total_blocks = _blocks + shareable_offers.append((backend, offer)) + return shareable_offers + + +def filter_offers( + offers: List[Tuple[Backend, InstanceOfferWithAvailability]], + backend_types: Optional[List[BackendType]] = None, + regions: Optional[List[str]] = None, + availability_zones: Optional[List[str]] = None, + instance_types: Optional[List[str]] = None, + placement_group: Optional[PlacementGroup] = None, +) -> List[Tuple[Backend, InstanceOfferWithAvailability]]: if backend_types is not None: offers = [(b, o) for b, o in offers if o.backend in backend_types] @@ -119,39 +154,21 @@ async def get_offers_by_requirements( new_offers.append((b, new_offer)) offers = new_offers + if instance_types is not None: + instance_types = [i.lower() for i in instance_types] + offers = [(b, o) for b, o in offers if o.instance.name.lower() in instance_types] + if placement_group is not None: new_offers = [] for b, o in offers: - for backend in backends: - compute = backend.compute() - if isinstance( - compute, ComputeWithPlacementGroupSupport - ) and compute.is_suitable_placement_group(placement_group, o): - new_offers.append((b, o)) - break + compute = b.compute() + if isinstance( + compute, ComputeWithPlacementGroupSupport + ) and compute.is_suitable_placement_group(placement_group, o): + new_offers.append((b, o)) offers = new_offers - if profile.instance_types is not None: - instance_types = [i.lower() for i in profile.instance_types] - offers = [(b, o) for b, o in offers if o.instance.name.lower() in instance_types] - - if blocks == 1: - return offers - - shareable_offers = [] - for backend, offer in offers: - resources = offer.instance.resources - cpu_count = resources.cpus - gpu_count = len(resources.gpus) - if gpu_count > 0 and resources.gpus[0].vendor == gpuhunt.AcceleratorVendor.GOOGLE: - # TPUs cannot be shared - gpu_count = 1 - divisible, _blocks = is_divisible_into_blocks(cpu_count, gpu_count, blocks) - if not divisible: - continue - offer.total_blocks = _blocks - shareable_offers.append((backend, offer)) - return shareable_offers + return offers def is_divisible_into_blocks( From 40ab38d87a9f3c0c6602187a71c2119c5c60c1b6 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 8 Aug 2025 10:35:41 +0500 Subject: [PATCH 03/20] Respect fleet nodes.max --- .../tasks/process_submitted_jobs.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index e33271064f..fe17c86f5d 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -12,6 +12,7 @@ from dstack._internal.core.errors import BackendError, ServerClientError from dstack._internal.core.models.common import NetworkMode from dstack._internal.core.models.fleets import ( + Fleet, FleetConfiguration, FleetSpec, FleetStatus, @@ -463,6 +464,14 @@ async def _run_job_on_new_instance( fleet = None if fleet_model is not None: fleet = fleet_model_to_fleet(fleet_model) + # FIXME: Concurrent provisioning may violate nodes.max + # To fix, lock fleet and split instance model creation + # and instance provisioning into separate transactions. + if not _check_can_create_new_instance_in_fleet(fleet): + logger.debug( + "%s: cannot fit new instance into fleet %s", fmt(job_model), fleet_model.name + ) + return None multinode = job.job_spec.jobs_per_replica > 1 or ( fleet is not None and fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER ) @@ -522,6 +531,18 @@ async def _run_job_on_new_instance( return None +def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool: + if fleet.spec.configuration.ssh_config is not None: + return False + if ( + fleet.spec.configuration.nodes is not None + and fleet.spec.configuration.nodes.max is not None + and fleet.spec.configuration.nodes.max <= len(fleet.instances) + ): + return False + return True + + def _get_or_create_fleet_model_for_job( project: ProjectModel, run_model: RunModel, From 12463ea2cb608800f6d63d975cbc0ae814218bb3 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 8 Aug 2025 14:36:37 +0500 Subject: [PATCH 04/20] Fix replicas typing --- .../_internal/core/models/configurations.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index 39530696ab..f9fb0749be 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -20,6 +20,7 @@ from dstack._internal.core.models.unix import UnixUser from dstack._internal.core.models.volumes import MountPoint, VolumeConfiguration, parse_mount_point from dstack._internal.utils.common import has_duplicates +from dstack._internal.utils.json_schema import add_extra_schema_types from dstack._internal.utils.json_utils import ( pydantic_orjson_dumps_with_indent, ) @@ -561,7 +562,7 @@ class ServiceConfigurationParams(CoreModel): ) auth: Annotated[bool, Field(description="Enable the authorization")] = True replicas: Annotated[ - Union[conint(ge=1), constr(regex=r"^[0-9]+..[1-9][0-9]*$"), Range[int]], + Range[int], Field( description="The number of replicas. Can be a number (e.g. `2`) or a range (`0..4` or `1..8`). " "If it's a range, the `scaling` property is required" @@ -592,20 +593,13 @@ def convert_model(cls, v: Optional[Union[AnyModel, str]]) -> Optional[AnyModel]: return v @validator("replicas") - def convert_replicas(cls, v: Any) -> Range[int]: - if isinstance(v, str) and ".." in v: - min, max = v.replace(" ", "").split("..") - v = Range(min=min or 0, max=max or None) - elif isinstance(v, (int, float)): - v = Range(min=v, max=v) + def convert_replicas(cls, v: Range[int]) -> Range[int]: if v.max is None: raise ValueError("The maximum number of replicas is required") + if v.min is None: + v.min = 0 if v.min < 0: raise ValueError("The minimum number of replicas must be greater than or equal to 0") - if v.max < v.min: - raise ValueError( - "The maximum number of replicas must be greater than or equal to the minimum number of replicas" - ) return v @validator("gateway") @@ -622,9 +616,9 @@ def validate_gateway( def validate_scaling(cls, values): scaling = values.get("scaling") replicas = values.get("replicas") - if replicas.min != replicas.max and not scaling: + if replicas and replicas.min != replicas.max and not scaling: raise ValueError("When you set `replicas` to a range, ensure to specify `scaling`.") - if replicas.min == replicas.max and scaling: + if replicas and replicas.min == replicas.max and scaling: raise ValueError("To use `scaling`, `replicas` must be set to a range.") return values @@ -655,6 +649,14 @@ class ServiceConfiguration( ): type: Literal["service"] = "service" + class Config: + @staticmethod + def schema_extra(schema: Dict[str, Any]): + add_extra_schema_types( + schema["properties"]["replicas"], + extra_types=[{"type": "integer"}, {"type": "string"}], + ) + AnyRunConfiguration = Union[DevEnvironmentConfiguration, TaskConfiguration, ServiceConfiguration] From 780e7ca86ac59d580408bd5602830d222efa8e3f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 8 Aug 2025 18:05:08 +0500 Subject: [PATCH 05/20] Support provisioning in empty fleets --- .../tasks/process_submitted_jobs.py | 163 +++++++++---- .../ecd3cfc5c86e_add_jobmodel_fleet.py | 41 ++++ src/dstack/_internal/server/models.py | 9 + src/dstack/_internal/server/testing/common.py | 5 +- .../tasks/test_process_submitted_jobs.py | 229 +++++++++--------- 5 files changed, 293 insertions(+), 154 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/ecd3cfc5c86e_add_jobmodel_fleet.py diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index fe17c86f5d..24c4e37e07 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -1,11 +1,13 @@ import asyncio +import itertools +import math import uuid from datetime import datetime, timedelta from typing import List, Optional, Tuple -from sqlalchemy import select +from sqlalchemy import and_, or_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload, load_only, selectinload +from sqlalchemy.orm import contains_eager, joinedload, load_only, selectinload from dstack._internal.core.backends.base.backend import Backend from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport @@ -51,6 +53,7 @@ from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.fleets import ( fleet_model_to_fleet, + get_fleet_spec, ) from dstack._internal.server.services.instances import ( filter_pool_instances, @@ -158,7 +161,10 @@ async def _process_next_submitted_job(): async def _process_submitted_job(session: AsyncSession, job_model: JobModel): # Refetch to load related attributes. res = await session.execute( - select(JobModel).where(JobModel.id == job_model.id).options(joinedload(JobModel.instance)) + select(JobModel) + .where(JobModel.id == job_model.id) + .options(joinedload(JobModel.instance)) + .options(joinedload(JobModel.fleet).joinedload(FleetModel.instances)) ) job_model = res.unique().scalar_one() res = await session.execute( @@ -177,6 +183,12 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): profile = run_spec.merged_profile job = find_job(run.jobs, job_model.replica_num, job_model.job_num) + # Master job chooses fleet for the run. + # Due to two-step processing, it's saved to job_model.fleet. + # Other jobs just inherit fleet from run_model.fleet. + # If master job chooses no fleet, the new fleet will be created. + fleet_model = run_model.fleet or job_model.fleet + master_job = find_job(run.jobs, job_model.replica_num, 0) master_job_provisioning_data = None if job.job_spec.job_num != 0: @@ -224,19 +236,36 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): # Then, the job runs on the assigned instance or a new instance is provisioned. # This is needed to avoid holding instances lock for a long time. if not job_model.instance_assigned: - # Try assigning an existing instance + fleet_filters = [ + FleetModel.project_id == project.id, + FleetModel.deleted == False, + ] + if run_model.fleet is not None: + fleet_filters.append(FleetModel.id == run_model.fleet_id) + if run_spec.configuration.fleets is not None: + fleet_filters.append(FleetModel.name.in_(run_spec.configuration.fleets)) res = await session.execute( - select(InstanceModel) + select(FleetModel) + .outerjoin(FleetModel.instances) + .where(*fleet_filters) .where( - InstanceModel.project_id == project.id, - InstanceModel.deleted == False, - InstanceModel.total_blocks > InstanceModel.busy_blocks, + or_( + InstanceModel.id.is_(None), + and_( + InstanceModel.deleted == False, + InstanceModel.total_blocks > InstanceModel.busy_blocks, + ), + ) ) + .options(contains_eager(FleetModel.instances)) .order_by(InstanceModel.id) # take locks in order - .with_for_update(key_share=True) + .with_for_update(key_share=True, of=InstanceModel) + ) + fleet_models = list(res.unique().scalars().all()) + fleets_ids = sorted([f.id for f in fleet_models]) + instances_ids = sorted( + itertools.chain.from_iterable([i.id for i in f.instances] for f in fleet_models) ) - pool_instances = list(res.unique().scalars().all()) - instances_ids = sorted([i.id for i in pool_instances]) if get_db().dialect_name == "sqlite": # Start new transaction to see committed changes after lock await session.commit() @@ -248,30 +277,77 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) # Refetch after lock res = await session.execute( - select(InstanceModel) + select(FleetModel) + .outerjoin(FleetModel.instances) .where( - InstanceModel.id.not_in(detaching_instances_ids), - InstanceModel.id.in_(instances_ids), - InstanceModel.deleted == False, - InstanceModel.total_blocks > InstanceModel.busy_blocks, + FleetModel.id.in_(fleets_ids), + *fleet_filters, + ) + .where( + or_( + InstanceModel.id.is_(None), + and_( + InstanceModel.id.not_in(detaching_instances_ids), + InstanceModel.id.in_(instances_ids), + InstanceModel.deleted == False, + InstanceModel.total_blocks > InstanceModel.busy_blocks, + ), + ) ) - .options(joinedload(InstanceModel.fleet)) + .options(contains_eager(FleetModel.instances)) .execution_options(populate_existing=True) ) - pool_instances = list(res.unique().scalars().all()) - instance = await _assign_job_to_pool_instance( + fleet_models = list(res.unique().scalars().all()) + fleet_instances_with_offers = [] + for candidate_fleet_model in fleet_models: + fleet_instances_with_offers = await _get_fleet_instances_with_offers( + fleet_model=candidate_fleet_model, + run_spec=run_spec, + job=job, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + ) + if run_model.fleet_id is not None: + # Using the first fleet that was already chosen by the master job. + fleet_model = candidate_fleet_model + break + # Looking for an eligible fleet for the run. + # TODO: Pick optimal fleet instead of the first eligible one. + fleet_spec = get_fleet_spec(candidate_fleet_model) + fleet_capacity = len( + [o for o in fleet_instances_with_offers if o[1].availability.is_available()] + ) + if fleet_spec.configuration.nodes is not None: + if fleet_spec.configuration.nodes.max is None: + fleet_capacity = math.inf + else: + # FIXME: Multiple service jobs can be provisioned on one instance with blocks. + # Current capacity calculation does not take future provisioned blocks into account. + # It may be impossible to do since we cannot be sure which instance will be provisioned. + fleet_capacity += fleet_spec.configuration.nodes.max - len( + candidate_fleet_model.instances + ) + instances_required = 1 + if run_spec.configuration.type == "task": + instances_required = run_spec.configuration.nodes + elif ( + run_spec.configuration.type == "service" + and run_spec.configuration.replicas.min is not None + ): + instances_required = run_spec.configuration.replicas.min + if fleet_capacity >= instances_required: + # TODO: Ensure we use the chosen fleet when there are no instance assigned. + fleet_model = candidate_fleet_model + break + instance = await _assign_job_to_fleet_instance( session=session, - pool_instances=pool_instances, - run_spec=run_spec, + instances_with_offers=fleet_instances_with_offers, job_model=job_model, - job=job, - fleet_model=run_model.fleet, - master_job_provisioning_data=master_job_provisioning_data, - volumes=volumes, ) + job_model.fleet = fleet_model job_model.instance_assigned = True job_model.last_processed_at = common_utils.get_current_datetime() - if len(pool_instances) > 0: + if len(instances_ids) > 0: await session.commit() return # If no instances were locked, we can proceed in the same transaction. @@ -298,7 +374,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): # Create a new cloud instance run_job_result = await _run_job_on_new_instance( project=project, - fleet_model=run_model.fleet, + fleet_model=fleet_model, job_model=job_model, run=run, job=job, @@ -319,11 +395,11 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): job_provisioning_data, offer = run_job_result job_model.job_provisioning_data = job_provisioning_data.json() job_model.status = JobStatus.PROVISIONING - fleet_model = _get_or_create_fleet_model_for_job( - project=project, - run_model=run_model, - run=run, - ) + if fleet_model is None: + fleet_model = _create_fleet_model_for_job( + project=project, + run=run, + ) instance_num = await _get_next_instance_num( session=session, fleet_model=fleet_model, @@ -377,16 +453,14 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): await session.commit() -async def _assign_job_to_pool_instance( - session: AsyncSession, - pool_instances: List[InstanceModel], +async def _get_fleet_instances_with_offers( + fleet_model: FleetModel, run_spec: RunSpec, - job_model: JobModel, job: Job, - fleet_model: Optional[FleetModel], master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[List[List[Volume]]] = None, -) -> Optional[InstanceModel]: +) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: + pool_instances = fleet_model.instances instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] profile = run_spec.merged_profile multinode = job.job_spec.jobs_per_replica > 1 @@ -415,7 +489,15 @@ async def _assign_job_to_pool_instance( volumes=volumes, ) instances_with_offers.extend(shared_instances_with_offers) + instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0) + return instances_with_offers + +async def _assign_job_to_fleet_instance( + session: AsyncSession, + instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]], + job_model: JobModel, +) -> Optional[InstanceModel]: if len(instances_with_offers) == 0: return None @@ -543,13 +625,10 @@ def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool: return True -def _get_or_create_fleet_model_for_job( +def _create_fleet_model_for_job( project: ProjectModel, - run_model: RunModel, run: Run, ) -> FleetModel: - if run_model.fleet is not None: - return run_model.fleet placement = InstanceGroupPlacement.ANY if run.run_spec.configuration.type == "task" and run.run_spec.configuration.nodes > 1: placement = InstanceGroupPlacement.CLUSTER diff --git a/src/dstack/_internal/server/migrations/versions/ecd3cfc5c86e_add_jobmodel_fleet.py b/src/dstack/_internal/server/migrations/versions/ecd3cfc5c86e_add_jobmodel_fleet.py new file mode 100644 index 0000000000..ff92108ce3 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/ecd3cfc5c86e_add_jobmodel_fleet.py @@ -0,0 +1,41 @@ +"""Add JobModel.fleet + +Revision ID: ecd3cfc5c86e +Revises: 728b1488b1b4 +Create Date: 2025-08-08 17:51:27.267140 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +from alembic import op + +# revision identifiers, used by Alembic. +revision = "ecd3cfc5c86e" +down_revision = "728b1488b1b4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "fleet_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True + ) + ) + batch_op.create_foreign_key( + batch_op.f("fk_jobs_fleet_id_fleets"), "fleets", ["fleet_id"], ["id"] + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f("fk_jobs_fleet_id_fleets"), type_="foreignkey") + batch_op.drop_column("fleet_id") + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index fed48a37c3..befa4235e3 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -390,10 +390,18 @@ class JobModel(BaseModel): id: Mapped[uuid.UUID] = mapped_column( UUIDType(binary=False), primary_key=True, default=uuid.uuid4 ) + project_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE")) project: Mapped["ProjectModel"] = relationship() + run_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("runs.id", ondelete="CASCADE")) run: Mapped["RunModel"] = relationship() + + # Jobs need to reference fleets because we may choose an optimal fleet for a master job + # but not yet create an instance for it. + fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id")) + fleet: Mapped[Optional["FleetModel"]] = relationship(back_populates="jobs") + run_name: Mapped[str] = mapped_column(String(100)) job_num: Mapped[int] = mapped_column(Integer) job_name: Mapped[str] = mapped_column(String(100)) @@ -537,6 +545,7 @@ class FleetModel(BaseModel): spec: Mapped[str] = mapped_column(Text) runs: Mapped[List["RunModel"]] = relationship(back_populates="fleet") + jobs: Mapped[List["JobModel"]] = relationship(back_populates="fleet") instances: Mapped[List["InstanceModel"]] = relationship(back_populates="fleet") diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 646a03e88b..b7908f758c 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -330,6 +330,7 @@ async def create_run( async def create_job( session: AsyncSession, run: RunModel, + fleet: Optional[FleetModel] = None, submission_num: int = 0, status: JobStatus = JobStatus.SUBMITTED, submitted_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), @@ -353,6 +354,7 @@ async def create_job( job_spec.job_num = job_num job = JobModel( project_id=run.project_id, + fleet=fleet, run_id=run.id, run_name=run.run_name, job_num=job_num, @@ -733,6 +735,7 @@ def get_instance_offer_with_availability( availability_zones: Optional[List[str]] = None, price: float = 1.0, instance_type: str = "instance", + availability: InstanceAvailability = InstanceAvailability.AVAILABLE, ): gpus = [ Gpu( @@ -756,7 +759,7 @@ def get_instance_offer_with_availability( ), region=region, price=price, - availability=InstanceAvailability.AVAILABLE, + availability=availability, availability_zones=availability_zones, blocks=blocks, total_blocks=total_blocks, diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index 901a91be4d..13adb1aacd 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -11,14 +11,11 @@ from dstack._internal.core.models.health import HealthStatus from dstack._internal.core.models.instances import ( InstanceAvailability, - InstanceOfferWithAvailability, InstanceStatus, - InstanceType, - Resources, ) from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.resources import Range from dstack._internal.core.models.runs import ( - JobProvisioningData, JobStatus, JobTerminationReason, ) @@ -40,7 +37,9 @@ create_run, create_user, create_volume, + get_fleet_spec, get_instance_offer_with_availability, + get_job_provisioning_data, get_run_spec, get_volume_provisioning_data, ) @@ -113,14 +112,8 @@ async def test_provisions_job( run=run, instance_assigned=True, ) - offer = InstanceOfferWithAvailability( + offer = get_instance_offer_with_availability( backend=backend, - instance=InstanceType( - name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), - ), - region="us", - price=1.0, availability=InstanceAvailability.AVAILABLE, ) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -128,20 +121,7 @@ async def test_provisions_job( m.return_value = [backend_mock] backend_mock.TYPE = backend backend_mock.compute.return_value.get_offers_cached.return_value = [offer] - backend_mock.compute.return_value.run_job.return_value = JobProvisioningData( - backend=offer.backend, - instance_type=offer.instance, - instance_id="instance_id", - hostname="1.1.1.1", - internal_ip=None, - region=offer.region, - price=offer.price, - username="ubuntu", - ssh_port=22, - ssh_proxy=None, - dockerized=True, - backend_data=None, - ) + backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data() await process_submitted_jobs() m.assert_called_once() backend_mock.compute.return_value.get_offers_cached.assert_called_once() @@ -179,14 +159,8 @@ async def test_fails_job_when_privileged_true_and_no_offers_with_create_instance run=run, instance_assigned=True, ) - offer = InstanceOfferWithAvailability( + offer = get_instance_offer_with_availability( backend=BackendType.RUNPOD, - instance=InstanceType( - name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), - ), - region="us", - price=1.0, availability=InstanceAvailability.AVAILABLE, ) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -194,20 +168,7 @@ async def test_fails_job_when_privileged_true_and_no_offers_with_create_instance m.return_value = [backend_mock] backend_mock.TYPE = BackendType.RUNPOD backend_mock.compute.return_value.get_offers_cached.return_value = [offer] - backend_mock.compute.return_value.run_job.return_value = JobProvisioningData( - backend=offer.backend, - instance_type=offer.instance, - instance_id="instance_id", - hostname="1.1.1.1", - internal_ip=None, - region=offer.region, - price=offer.price, - username="ubuntu", - ssh_port=22, - ssh_proxy=None, - dockerized=True, - backend_data=None, - ) + backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data() with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: datetime_mock.return_value = datetime(2023, 1, 2, 3, 30, 0, tzinfo=timezone.utc) await process_submitted_jobs() @@ -248,14 +209,8 @@ async def test_fails_job_when_instance_mounts_and_no_offers_with_create_instance run=run, instance_assigned=True, ) - offer = InstanceOfferWithAvailability( + offer = get_instance_offer_with_availability( backend=BackendType.RUNPOD, - instance=InstanceType( - name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), - ), - region="us", - price=1.0, availability=InstanceAvailability.AVAILABLE, ) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -263,20 +218,7 @@ async def test_fails_job_when_instance_mounts_and_no_offers_with_create_instance m.return_value = [backend_mock] backend_mock.TYPE = BackendType.RUNPOD backend_mock.compute.return_value.get_offers_cached.return_value = [offer] - backend_mock.compute.return_value.run_job.return_value = JobProvisioningData( - backend=offer.backend, - instance_type=offer.instance, - instance_id="instance_id", - hostname="1.1.1.1", - internal_ip=None, - region=offer.region, - price=offer.price, - username="ubuntu", - ssh_port=22, - ssh_proxy=None, - dockerized=True, - backend_data=None, - ) + backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data() with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: datetime_mock.return_value = datetime(2023, 1, 2, 3, 30, 0, tzinfo=timezone.utc) await process_submitted_jobs() @@ -319,14 +261,8 @@ async def test_provisions_job_with_optional_instance_volume_not_attached( run=run, instance_assigned=True, ) - offer = InstanceOfferWithAvailability( + offer = get_instance_offer_with_availability( backend=BackendType.RUNPOD, - instance=InstanceType( - name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), - ), - region="us", - price=1.0, availability=InstanceAvailability.AVAILABLE, ) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -334,20 +270,7 @@ async def test_provisions_job_with_optional_instance_volume_not_attached( m.return_value = [backend_mock] backend_mock.TYPE = BackendType.RUNPOD backend_mock.compute.return_value.get_offers_cached.return_value = [offer] - backend_mock.compute.return_value.run_job.return_value = JobProvisioningData( - backend=offer.backend, - instance_type=offer.instance, - instance_id="instance_id", - hostname="1.1.1.1", - internal_ip=None, - region=offer.region, - price=offer.price, - username="ubuntu", - ssh_port=22, - ssh_proxy=None, - dockerized=False, - backend_data=None, - ) + backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data() await process_submitted_jobs() await session.refresh(job) @@ -401,9 +324,11 @@ async def test_assignes_job_to_instance(self, test_db, session: AsyncSession): session=session, project_id=project.id, ) + fleet = await create_fleet(session=session, project=project) instance = await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, ) run = await create_run( @@ -435,16 +360,19 @@ async def test_does_no_reuse_unavailable_instances(self, test_db, session: Async session=session, project_id=project.id, ) + fleet = await create_fleet(session=session, project=project) # busy await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.BUSY, ) # unreachable await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, unreachable=True, ) @@ -452,6 +380,7 @@ async def test_does_no_reuse_unavailable_instances(self, test_db, session: Async await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, health_status=HealthStatus.FAILURE, ) @@ -494,9 +423,11 @@ async def test_assigns_job_to_instance_with_volumes(self, test_db, session: Asyn backend=BackendType.AWS, region="us-east-1", ) + fleet = await create_fleet(session=session, project=project) instance = await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, backend=BackendType.AWS, region="us-east-1", @@ -557,9 +488,11 @@ async def test_assigns_job_to_shared_instance(self, test_db, session: AsyncSessi project_id=project.id, ) offer = get_instance_offer_with_availability(gpu_count=8, cpu_count=64, memory_gib=128) + fleet = await create_fleet(session=session, project=project) instance = await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, offer=offer, total_blocks=4, @@ -598,9 +531,11 @@ async def test_assigns_multi_node_job_to_shared_instance(self, test_db, session: project_id=project.id, ) offer = get_instance_offer_with_availability(gpu_count=8, cpu_count=64, memory_gib=128) + fleet = await create_fleet(session=session, project=project) instance = await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, backend=BackendType.AWS, offer=offer, @@ -646,9 +581,11 @@ async def test_cannot_assign_multi_node_job_to_partially_busy_shared_instance( project_id=project.id, ) offer = get_instance_offer_with_availability(gpu_count=8, cpu_count=64, memory_gib=128) + fleet = await create_fleet(session=session, project=project) instance = await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, backend=BackendType.AWS, offer=offer, @@ -712,14 +649,19 @@ async def test_assigns_job_to_specific_fleet(self, test_db, session: AsyncSessio @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_creates_new_instance_in_existing_fleet(self, test_db, session: AsyncSession): + async def test_creates_new_instance_in_existing_non_empty_fleet( + self, test_db, session: AsyncSession + ): project = await create_project(session) user = await create_user(session) repo = await create_repo(session=session, project_id=project.id) - fleet = await create_fleet(session=session, project=project) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = Range(min=1, max=2) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) instance = await create_instance( session=session, project=project, + fleet=fleet, instance_num=0, status=InstanceStatus.BUSY, ) @@ -738,14 +680,8 @@ async def test_creates_new_instance_in_existing_fleet(self, test_db, session: As ) await session.commit() - offer = InstanceOfferWithAvailability( + offer = get_instance_offer_with_availability( backend=BackendType.AWS, - instance=InstanceType( - name="instance", - resources=Resources(cpus=4, memory_mib=8192, spot=False, gpus=[]), - ), - region="us", - price=1.0, availability=InstanceAvailability.AVAILABLE, ) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -753,20 +689,7 @@ async def test_creates_new_instance_in_existing_fleet(self, test_db, session: As m.return_value = [backend_mock] backend_mock.TYPE = BackendType.AWS backend_mock.compute.return_value.get_offers_cached.return_value = [offer] - backend_mock.compute.return_value.run_job.return_value = JobProvisioningData( - backend=offer.backend, - instance_type=offer.instance, - instance_id="instance_id", - hostname="1.1.1.1", - internal_ip=None, - region=offer.region, - price=offer.price, - username="ubuntu", - ssh_port=22, - ssh_proxy=None, - dockerized=True, - backend_data=None, - ) + backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data() await process_submitted_jobs() m.assert_called_once() backend_mock.compute.return_value.get_offers_cached.assert_called_once() @@ -780,6 +703,88 @@ async def test_creates_new_instance_in_existing_fleet(self, test_db, session: As assert job.instance.instance_num == 1 assert job.instance.fleet_id == fleet.id + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncSession): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = Range(min=0, max=1) + await create_fleet(session=session, project=project, spec=fleet_spec) + # Need a second non-empty fleet to have two-stage processing + fleet2 = await create_fleet(session=session, project=project, spec=fleet_spec) + await create_instance( + session=session, + project=project, + fleet=fleet2, + instance_num=0, + status=InstanceStatus.BUSY, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + instance_assigned=False, + ) + await process_submitted_jobs() + await session.refresh(job) + res = await session.execute(select(JobModel)) + job = res.unique().scalar_one() + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_creates_new_instance_in_existing_empty_fleet( + self, test_db, session: AsyncSession + ): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = Range(min=0, max=1) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + fleet=fleet, + instance_assigned=True, + ) + offer = get_instance_offer_with_availability( + backend=BackendType.AWS, + availability=InstanceAvailability.AVAILABLE, + ) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + m.return_value = [backend_mock] + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers_cached.return_value = [offer] + backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data() + await process_submitted_jobs() + m.assert_called_once() + backend_mock.compute.return_value.get_offers_cached.assert_called_once() + backend_mock.compute.return_value.run_job.assert_called_once() + + await session.refresh(job) + res = await session.execute(select(JobModel).options(joinedload(JobModel.instance))) + job = res.unique().scalar_one() + assert job.status == JobStatus.PROVISIONING + assert job.instance is not None + assert job.instance.instance_num == 0 + assert job.instance.fleet_id == fleet.id + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_picks_high_priority_jobs_first(self, test_db, session: AsyncSession): @@ -789,9 +794,11 @@ async def test_picks_high_priority_jobs_first(self, test_db, session: AsyncSessi session=session, project_id=project.id, ) + fleet = await create_fleet(session=session, project=project) instance = await create_instance( session=session, project=project, + fleet=fleet, status=InstanceStatus.IDLE, ) run1 = await create_run( From 173eba01dfb0832d814a3fefceaaf08228e76d54 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 11 Aug 2025 11:34:45 +0500 Subject: [PATCH 06/20] Rebase migrations --- ...del_fleet.py => 474af8479c09_add_jobmodel_fleet.py} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename src/dstack/_internal/server/migrations/versions/{ecd3cfc5c86e_add_jobmodel_fleet.py => 474af8479c09_add_jobmodel_fleet.py} (87%) diff --git a/src/dstack/_internal/server/migrations/versions/ecd3cfc5c86e_add_jobmodel_fleet.py b/src/dstack/_internal/server/migrations/versions/474af8479c09_add_jobmodel_fleet.py similarity index 87% rename from src/dstack/_internal/server/migrations/versions/ecd3cfc5c86e_add_jobmodel_fleet.py rename to src/dstack/_internal/server/migrations/versions/474af8479c09_add_jobmodel_fleet.py index ff92108ce3..b3c2b605f2 100644 --- a/src/dstack/_internal/server/migrations/versions/ecd3cfc5c86e_add_jobmodel_fleet.py +++ b/src/dstack/_internal/server/migrations/versions/474af8479c09_add_jobmodel_fleet.py @@ -1,8 +1,8 @@ """Add JobModel.fleet -Revision ID: ecd3cfc5c86e -Revises: 728b1488b1b4 -Create Date: 2025-08-08 17:51:27.267140 +Revision ID: 474af8479c09 +Revises: 74a1f55209bd +Create Date: 2025-08-11 11:32:39.847360 """ @@ -11,8 +11,8 @@ from alembic import op # revision identifiers, used by Alembic. -revision = "ecd3cfc5c86e" -down_revision = "728b1488b1b4" +revision = "474af8479c09" +down_revision = "74a1f55209bd" branch_labels = None depends_on = None From bfe3521fb91e57385b36589fadc2e49114018a61 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 11 Aug 2025 13:47:20 +0500 Subject: [PATCH 07/20] Prioritize fleet choice by capacity and offer price --- .../tasks/process_submitted_jobs.py | 118 +++++++++++------- .../tasks/test_process_submitted_jobs.py | 52 ++++++++ 2 files changed, 127 insertions(+), 43 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 24c4e37e07..e570100217 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -53,7 +53,6 @@ from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.fleets import ( fleet_model_to_fleet, - get_fleet_spec, ) from dstack._internal.server.services.instances import ( filter_pool_instances, @@ -298,47 +297,14 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): .execution_options(populate_existing=True) ) fleet_models = list(res.unique().scalars().all()) - fleet_instances_with_offers = [] - for candidate_fleet_model in fleet_models: - fleet_instances_with_offers = await _get_fleet_instances_with_offers( - fleet_model=candidate_fleet_model, - run_spec=run_spec, - job=job, - master_job_provisioning_data=master_job_provisioning_data, - volumes=volumes, - ) - if run_model.fleet_id is not None: - # Using the first fleet that was already chosen by the master job. - fleet_model = candidate_fleet_model - break - # Looking for an eligible fleet for the run. - # TODO: Pick optimal fleet instead of the first eligible one. - fleet_spec = get_fleet_spec(candidate_fleet_model) - fleet_capacity = len( - [o for o in fleet_instances_with_offers if o[1].availability.is_available()] - ) - if fleet_spec.configuration.nodes is not None: - if fleet_spec.configuration.nodes.max is None: - fleet_capacity = math.inf - else: - # FIXME: Multiple service jobs can be provisioned on one instance with blocks. - # Current capacity calculation does not take future provisioned blocks into account. - # It may be impossible to do since we cannot be sure which instance will be provisioned. - fleet_capacity += fleet_spec.configuration.nodes.max - len( - candidate_fleet_model.instances - ) - instances_required = 1 - if run_spec.configuration.type == "task": - instances_required = run_spec.configuration.nodes - elif ( - run_spec.configuration.type == "service" - and run_spec.configuration.replicas.min is not None - ): - instances_required = run_spec.configuration.replicas.min - if fleet_capacity >= instances_required: - # TODO: Ensure we use the chosen fleet when there are no instance assigned. - fleet_model = candidate_fleet_model - break + fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers( + fleet_models=fleet_models, + run_model=run_model, + run_spec=run.run_spec, + job=job, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + ) instance = await _assign_job_to_fleet_instance( session=session, instances_with_offers=fleet_instances_with_offers, @@ -453,7 +419,73 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): await session.commit() -async def _get_fleet_instances_with_offers( +def _find_optimal_fleet_with_offers( + fleet_models: list[FleetModel], + run_model: RunModel, + run_spec: RunSpec, + job: Job, + master_job_provisioning_data: Optional[JobProvisioningData], + volumes: Optional[list[list[Volume]]], +) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]: + if run_model.fleet is not None: + # Using the fleet that was already chosen by the master job + fleet_instances_with_offers = _get_fleet_instances_with_offers( + fleet_model=run_model.fleet, + run_spec=run_spec, + job=job, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + ) + return run_model.fleet, fleet_instances_with_offers + + if len(fleet_models) == 0: + return None, [] + + nodes_required_num = _get_nodes_required_num_for_run(run_spec) + + candidate_fleets_with_offers: list[ + tuple[ + Optional[FleetModel], + list[tuple[InstanceModel, InstanceOfferWithAvailability]], + tuple[int, float], + ] + ] = [] + for candidate_fleet_model in fleet_models: + fleet_instances_with_offers = _get_fleet_instances_with_offers( + fleet_model=candidate_fleet_model, + run_spec=run_spec, + job=job, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + ) + fleet_available_offers = [ + o for _, o in fleet_instances_with_offers if o.availability.is_available() + ] + fleet_has_available_capacity = nodes_required_num <= len(fleet_available_offers) + fleet_cheapest_offer = math.inf + if len(fleet_available_offers) > 0: + fleet_cheapest_offer = fleet_available_offers[0].price + fleet_priority = (not fleet_has_available_capacity, fleet_cheapest_offer) + candidate_fleets_with_offers.append( + (candidate_fleet_model, fleet_instances_with_offers, fleet_priority) + ) + candidate_fleets_with_offers.sort(key=lambda t: t[-1]) + return candidate_fleets_with_offers[0][:2] + + +def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int: + nodes_required_num = 1 + if run_spec.configuration.type == "task": + nodes_required_num = run_spec.configuration.nodes + elif ( + run_spec.configuration.type == "service" + and run_spec.configuration.replicas.min is not None + ): + nodes_required_num = run_spec.configuration.replicas.min + return nodes_required_num + + +def _get_fleet_instances_with_offers( fleet_model: FleetModel, run_spec: RunSpec, job: Job, diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index 13adb1aacd..ca1ecd961d 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -785,6 +785,58 @@ async def test_creates_new_instance_in_existing_empty_fleet( assert job.instance.instance_num == 0 assert job.instance.fleet_id == fleet.id + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_assigns_job_to_optimal_fleet(self, test_db, session: AsyncSession): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo(session=session, project_id=project.id) + fleet1 = await create_fleet(session=session, project=project) + fleet2 = await create_fleet(session=session, project=project) + fleet3 = await create_fleet(session=session, project=project) + await create_instance( + session=session, + project=project, + fleet=fleet1, + instance_num=0, + status=InstanceStatus.BUSY, + price=1, + ) + await create_instance( + session=session, + project=project, + fleet=fleet2, + instance_num=0, + status=InstanceStatus.IDLE, + price=2, + ) + await create_instance( + session=session, + project=project, + fleet=fleet3, + instance_num=0, + status=InstanceStatus.IDLE, + price=3, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + instance_assigned=False, + ) + await process_submitted_jobs() + await session.refresh(job) + res = await session.execute(select(JobModel)) + job = res.unique().scalar_one() + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.fleet_id == fleet2.id + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_picks_high_priority_jobs_first(self, test_db, session: AsyncSession): From 1f47e2b98ef7aff27316dbf2f6ecd2b66b3dc40a Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 11 Aug 2025 13:54:44 +0500 Subject: [PATCH 08/20] Unassign run from fleet for retrying --- src/dstack/_internal/server/background/tasks/process_runs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 3371621a9d..13349acf4b 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -402,6 +402,8 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): if new_status == RunStatus.PENDING: run_metrics.increment_pending_runs(run_model.project.name, run_spec.configuration.type) + # Unassign run from fleet so that the new fleet can be chosen when retrying + run_model.fleet = None run_model.status = new_status run_model.termination_reason = termination_reason From c50d5782f919efdc4a9daf921539d67144a311ce Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 11 Aug 2025 14:04:16 +0500 Subject: [PATCH 09/20] Comment on optimal fleet choice --- .../server/background/tasks/process_submitted_jobs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index e570100217..853c159881 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -442,7 +442,9 @@ def _find_optimal_fleet_with_offers( return None, [] nodes_required_num = _get_nodes_required_num_for_run(run_spec) - + # The current strategy is to first consider fleets that can accommodate + # the run without additional provisioning and choose the one with the cheapest offer. + # Fallback to fleet with the cheapest offer among all fleets. candidate_fleets_with_offers: list[ tuple[ Optional[FleetModel], From a3ef3644972d0627d791d258a3bfcd689c2de45b Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 11 Aug 2025 15:22:25 +0500 Subject: [PATCH 10/20] Refactor fleet/instance selects to fix for update with outer join --- .../tasks/process_submitted_jobs.py | 136 ++++++++++++------ 1 file changed, 92 insertions(+), 44 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 853c159881..95b4efb543 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -235,6 +235,10 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): # Then, the job runs on the assigned instance or a new instance is provisioned. # This is needed to avoid holding instances lock for a long time. if not job_model.instance_assigned: + # If another job freed the instance but is still trying to detach volumes, + # do not provision on it to prevent attaching volumes that are currently detaching. + detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) + fleet_filters = [ FleetModel.project_id == project.id, FleetModel.deleted == False, @@ -243,60 +247,41 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): fleet_filters.append(FleetModel.id == run_model.fleet_id) if run_spec.configuration.fleets is not None: fleet_filters.append(FleetModel.name.in_(run_spec.configuration.fleets)) - res = await session.execute( - select(FleetModel) - .outerjoin(FleetModel.instances) - .where(*fleet_filters) - .where( - or_( - InstanceModel.id.is_(None), - and_( - InstanceModel.deleted == False, - InstanceModel.total_blocks > InstanceModel.busy_blocks, - ), - ) - ) - .options(contains_eager(FleetModel.instances)) - .order_by(InstanceModel.id) # take locks in order - .with_for_update(key_share=True, of=InstanceModel) + + instance_filters = [ + InstanceModel.deleted == False, + InstanceModel.total_blocks > InstanceModel.busy_blocks, + InstanceModel.id.not_in(detaching_instances_ids), + ] + + fleet_models_with_instances, fleet_models_without_instances = await _select_fleet_models( + session=session, + fleet_filters=fleet_filters, + instance_filters=instance_filters, ) - fleet_models = list(res.unique().scalars().all()) - fleets_ids = sorted([f.id for f in fleet_models]) instances_ids = sorted( - itertools.chain.from_iterable([i.id for i in f.instances] for f in fleet_models) + itertools.chain.from_iterable( + [i.id for i in f.instances] for f in fleet_models_with_instances + ) ) + fleet_models = fleet_models_with_instances + fleet_models_without_instances + fleets_ids = [f.id for f in fleet_models] + if get_db().dialect_name == "sqlite": # Start new transaction to see committed changes after lock await session.commit() + async with get_locker(get_db().dialect_name).lock_ctx( InstanceModel.__tablename__, instances_ids ): - # If another job freed the instance but is still trying to detach volumes, - # do not provision on it to prevent attaching volumes that are currently detaching. - detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) - # Refetch after lock - res = await session.execute( - select(FleetModel) - .outerjoin(FleetModel.instances) - .where( - FleetModel.id.in_(fleets_ids), - *fleet_filters, - ) - .where( - or_( - InstanceModel.id.is_(None), - and_( - InstanceModel.id.not_in(detaching_instances_ids), - InstanceModel.id.in_(instances_ids), - InstanceModel.deleted == False, - InstanceModel.total_blocks > InstanceModel.busy_blocks, - ), - ) + if get_db().dialect_name == "sqlite": + fleet_models = await _refetch_fleet_models( + session=session, + fleets_ids=fleets_ids, + instances_ids=instances_ids, + fleet_filters=fleet_filters, + instance_filters=instance_filters, ) - .options(contains_eager(FleetModel.instances)) - .execution_options(populate_existing=True) - ) - fleet_models = list(res.unique().scalars().all()) fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers( fleet_models=fleet_models, run_model=run_model, @@ -419,6 +404,69 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): await session.commit() +async def _select_fleet_models( + session: AsyncSession, fleet_filters: list, instance_filters: list +) -> tuple[list[FleetModel], list[FleetModel]]: + # Selecting fleets in two queries since Postgres does not allow + # locking nullable side of an outer join. So, first lock instances with inner join. + # Then select left out fleets without instances. + res = await session.execute( + select(FleetModel) + .join(FleetModel.instances) + .where(*fleet_filters) + .where(*instance_filters) + .options(contains_eager(FleetModel.instances)) + .order_by(InstanceModel.id) # take locks in order + .with_for_update(key_share=True, of=InstanceModel) + ) + fleet_models_with_instances = list(res.unique().scalars().all()) + fleet_models_with_instances_ids = [f.id for f in fleet_models_with_instances] + res = await session.execute( + select(FleetModel) + .outerjoin(FleetModel.instances) + .where( + *fleet_filters, + FleetModel.id.not_in(fleet_models_with_instances_ids), + ) + .where(InstanceModel.id.is_(None)) + .options(contains_eager(FleetModel.instances)) # loading empty relation + ) + fleet_models_without_instances = list(res.unique().scalars().all()) + return fleet_models_with_instances, fleet_models_without_instances + + +async def _refetch_fleet_models( + session: AsyncSession, + fleets_ids: list[uuid.UUID], + instances_ids: list[uuid.UUID], + fleet_filters: list, + instance_filters: list, +) -> list[FleetModel]: + res = await session.execute( + select(FleetModel) + .outerjoin(FleetModel.instances) + .where( + FleetModel.id.in_(fleets_ids), + *fleet_filters, + ) + .where( + and_( + InstanceModel.id.in_(instances_ids), + or_( + InstanceModel.id.is_(None), + and_( + *instance_filters, + ), + ), + ) + ) + .options(contains_eager(FleetModel.instances)) + .execution_options(populate_existing=True) + ) + fleet_models = list(res.unique().scalars().all()) + return fleet_models + + def _find_optimal_fleet_with_offers( fleet_models: list[FleetModel], run_model: RunModel, From c6fb80c1056aee40751a917f222546b2c5f19081 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 11 Aug 2025 16:15:46 +0500 Subject: [PATCH 11/20] If no fleets have available offers, create a new fleet --- .../tasks/process_submitted_jobs.py | 13 +++++-- .../tasks/test_process_submitted_jobs.py | 36 +++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 95b4efb543..fff76ee828 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -492,11 +492,12 @@ def _find_optimal_fleet_with_offers( nodes_required_num = _get_nodes_required_num_for_run(run_spec) # The current strategy is to first consider fleets that can accommodate # the run without additional provisioning and choose the one with the cheapest offer. - # Fallback to fleet with the cheapest offer among all fleets. + # Fallback to fleet with the cheapest offer among all fleets with offers. candidate_fleets_with_offers: list[ tuple[ Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]], + int, tuple[int, float], ] ] = [] @@ -517,8 +518,16 @@ def _find_optimal_fleet_with_offers( fleet_cheapest_offer = fleet_available_offers[0].price fleet_priority = (not fleet_has_available_capacity, fleet_cheapest_offer) candidate_fleets_with_offers.append( - (candidate_fleet_model, fleet_instances_with_offers, fleet_priority) + ( + candidate_fleet_model, + fleet_instances_with_offers, + len(fleet_available_offers), + fleet_priority, + ) ) + if all(t[2] == 0 for t in candidate_fleets_with_offers): + # If no fleets have available offers, create a new fleet. + return None, [] candidate_fleets_with_offers.sort(key=lambda t: t[-1]) return candidate_fleets_with_offers[0][:2] diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index ca1ecd961d..b88b2484bc 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -703,6 +703,42 @@ async def test_creates_new_instance_in_existing_non_empty_fleet( assert job.instance.instance_num == 1 assert job.instance.fleet_id == fleet.id + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session: AsyncSession): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project) + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=0, + status=InstanceStatus.BUSY, + ) + fleet.instances.append(instance) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + instance_assigned=False, + ) + await session.commit() + await process_submitted_jobs() + await session.refresh(job) + res = await session.execute(select(JobModel).options(joinedload(JobModel.instance))) + job = res.unique().scalar_one() + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.instance is None + assert job.fleet is None + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncSession): From 24c0ca42e867bb002aec181083aa3aad244d7b49 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 11 Aug 2025 16:33:17 +0500 Subject: [PATCH 12/20] Revert respect nodes.max --- .../background/tasks/process_submitted_jobs.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index fff76ee828..6220c5d751 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -637,9 +637,6 @@ async def _run_job_on_new_instance( fleet = None if fleet_model is not None: fleet = fleet_model_to_fleet(fleet_model) - # FIXME: Concurrent provisioning may violate nodes.max - # To fix, lock fleet and split instance model creation - # and instance provisioning into separate transactions. if not _check_can_create_new_instance_in_fleet(fleet): logger.debug( "%s: cannot fit new instance into fleet %s", fmt(job_model), fleet_model.name @@ -707,12 +704,10 @@ async def _run_job_on_new_instance( def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool: if fleet.spec.configuration.ssh_config is not None: return False - if ( - fleet.spec.configuration.nodes is not None - and fleet.spec.configuration.nodes.max is not None - and fleet.spec.configuration.nodes.max <= len(fleet.instances) - ): - return False + # TODO: Respect nodes.max + # Ensure concurrent provisioning does not violate nodes.max + # E.g. lock fleet and split instance model creation + # and instance provisioning into separate transactions. return True From 28f235acfb58eb46a6203cc049d8010be445d50f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 12 Aug 2025 09:42:53 +0500 Subject: [PATCH 13/20] Set nodes for autocreated fleet --- .../server/background/tasks/process_submitted_jobs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 6220c5d751..2c7e90f9ca 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -26,7 +26,7 @@ CreationPolicy, TerminationPolicy, ) -from dstack._internal.core.models.resources import Memory +from dstack._internal.core.models.resources import Memory, Range from dstack._internal.core.models.runs import ( Job, JobProvisioningData, @@ -723,6 +723,7 @@ def _create_fleet_model_for_job( name=run.run_spec.run_name, placement=placement, reservation=run.run_spec.configuration.reservation, + nodes=Range(min=_get_nodes_required_num_for_run(run.run_spec), max=None), ), profile=run.run_spec.merged_profile, autocreated=True, From 71d4c8d39f2bf760452679550eafa43080472fd1 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 12 Aug 2025 09:57:15 +0500 Subject: [PATCH 14/20] Forbid new fleets creation with fleets specified --- .../tasks/process_submitted_jobs.py | 10 ++++ src/dstack/_internal/server/testing/common.py | 2 +- .../tasks/test_process_submitted_jobs.py | 47 +++++++++++++++---- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 2c7e90f9ca..3a189b88e8 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -290,6 +290,16 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): master_job_provisioning_data=master_job_provisioning_data, volumes=volumes, ) + if fleet_model is None and run_spec.configuration.fleets is not None: + # Run cannot create new fleets when fleets are specified + logger.debug("%s: failed to use specified fleets", fmt(job_model)) + job_model.status = JobStatus.TERMINATING + job_model.termination_reason = ( + JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + ) + job_model.last_processed_at = common_utils.get_current_datetime() + await session.commit() + return instance = await _assign_job_to_fleet_instance( session=session, instances_with_offers=fleet_instances_with_offers, diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index b7908f758c..0731938ab2 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -258,8 +258,8 @@ async def create_file_archive( def get_run_spec( - run_name: str, repo_id: str, + run_name: str = "test-run", configuration_path: str = "dstack.yaml", profile: Union[Profile, Callable[[], Profile], None] = lambda: Profile(name="default"), configuration: Optional[AnyRunConfiguration] = None, diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index b88b2484bc..20c94a9060 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -732,12 +732,47 @@ async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session: await session.commit() await process_submitted_jobs() await session.refresh(job) - res = await session.execute(select(JobModel).options(joinedload(JobModel.instance))) - job = res.unique().scalar_one() assert job.status == JobStatus.SUBMITTED assert job.instance_assigned - assert job.instance is None - assert job.fleet is None + assert job.instance_id is None + assert job.fleet_id is None + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_fails_with_no_capacity_when_specified_fleets_occupied( + self, test_db, session: AsyncSession + ): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project, name="test-fleet") + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=0, + status=InstanceStatus.BUSY, + ) + fleet.instances.append(instance) + run_spec = get_run_spec(repo_id=repo.name) + run_spec.configuration.fleets = ["test-fleet"] + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + job = await create_job( + session=session, + run=run, + instance_assigned=False, + ) + await session.commit() + await process_submitted_jobs() + await session.refresh(job) + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @@ -770,8 +805,6 @@ async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncS ) await process_submitted_jobs() await session.refresh(job) - res = await session.execute(select(JobModel)) - job = res.unique().scalar_one() assert job.status == JobStatus.SUBMITTED assert job.instance_assigned @@ -867,8 +900,6 @@ async def test_assigns_job_to_optimal_fleet(self, test_db, session: AsyncSession ) await process_submitted_jobs() await session.refresh(job) - res = await session.execute(select(JobModel)) - job = res.unique().scalar_one() assert job.status == JobStatus.SUBMITTED assert job.instance_assigned assert job.fleet_id == fleet2.id From eb17a56420122aa50b02264c182f6dce0c08ef78 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 14 Aug 2025 16:14:21 +0500 Subject: [PATCH 15/20] Use Range for replicas --- src/tests/_internal/server/routers/test_runs.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 320e857659..945e039495 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -1176,13 +1176,13 @@ async def test_returns_run_plan_instance_volumes( ServiceConfiguration( commands=["one", "two"], port=80, - replicas=1, + replicas=Range(min=1, max=1), scaling=None, ), ServiceConfiguration( commands=["one", "two"], port=80, - replicas="2..4", + replicas=Range(min=2, max=4), scaling=ScalingSpec(metric="rps", target=5), ), "update", @@ -1193,14 +1193,14 @@ async def test_returns_run_plan_instance_volumes( commands=["one", "two"], port=80, gateway=None, - replicas=1, + replicas=Range(min=1, max=1), scaling=None, ), ServiceConfiguration( commands=["one", "two"], port=8080, gateway="test-gateway", # not updatable - replicas="2..4", + replicas=Range(min=2, max=4), scaling=ScalingSpec(metric="rps", target=5), ), "create", @@ -1345,7 +1345,7 @@ async def test_updates_run(self, test_db, session: AsyncSession, client: AsyncCl type="service", commands=["one", "two"], port=80, - replicas=1, + replicas=Range(min=1, max=1), ), ) run_model = await create_run( @@ -1357,7 +1357,7 @@ async def test_updates_run(self, test_db, session: AsyncSession, client: AsyncCl run_spec=run_spec, ) run = run_model_to_run(run_model) - run_spec.configuration.replicas = 2 + run_spec.configuration.replicas = Range(min=2, max=2) response = await client.post( f"/api/project/{project.name}/runs/apply", headers=get_auth_headers(user.token), From 36b6bb608d5b5c06dbd47040343fea44c141875b Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 15 Aug 2025 11:23:54 +0500 Subject: [PATCH 16/20] Ensure models Config is inherited --- src/dstack/_internal/core/models/common.py | 2 +- src/dstack/_internal/core/models/configurations.py | 4 ++-- src/dstack/_internal/core/models/fleets.py | 4 ++-- src/dstack/_internal/core/models/instances.py | 2 +- src/dstack/_internal/core/models/profiles.py | 4 ++-- src/dstack/_internal/core/models/repos/remote.py | 4 ++-- src/dstack/_internal/core/models/resources.py | 8 ++++---- src/dstack/_internal/core/models/runs.py | 2 +- src/dstack/_internal/server/models.py | 2 +- src/dstack/_internal/server/schemas/gateways.py | 2 +- src/dstack/_internal/server/services/docker.py | 2 +- 11 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/dstack/_internal/core/models/common.py b/src/dstack/_internal/core/models/common.py index a139226712..4c4e45fd09 100644 --- a/src/dstack/_internal/core/models/common.py +++ b/src/dstack/_internal/core/models/common.py @@ -102,7 +102,7 @@ class RegistryAuth(CoreModel): password (str): The password or access token """ - class Config: + class Config(CoreModel.Config): frozen = True username: Annotated[str, Field(description="The username")] diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index f9fb0749be..ee0ec61b5f 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -649,7 +649,7 @@ class ServiceConfiguration( ): type: Literal["service"] = "service" - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]): add_extra_schema_types( @@ -717,7 +717,7 @@ class DstackConfiguration(CoreModel): Field(discriminator="type"), ] - class Config: + class Config(CoreModel.Config): json_loads = orjson.loads json_dumps = pydantic_orjson_dumps_with_indent diff --git a/src/dstack/_internal/core/models/fleets.py b/src/dstack/_internal/core/models/fleets.py index fd616b7547..8aaf0d18ee 100644 --- a/src/dstack/_internal/core/models/fleets.py +++ b/src/dstack/_internal/core/models/fleets.py @@ -234,7 +234,7 @@ class InstanceGroupParams(CoreModel): termination_policy: Annotated[Optional[TerminationPolicy], Field(exclude=True)] = None termination_idle_time: Annotated[Optional[Union[str, int]], Field(exclude=True)] = None - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any], model: Type): del schema["properties"]["termination_policy"] @@ -279,7 +279,7 @@ class FleetSpec(CoreModel): # TODO: make merged_profile a computed field after migrating to pydanticV2 merged_profile: Annotated[Profile, Field(exclude=True)] = None - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any], model: Type) -> None: prop = schema.get("properties", {}) diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index 40a86e5d5f..5aefcf3b62 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -122,7 +122,7 @@ class SSHConnectionParams(CoreModel): username: str port: int - class Config: + class Config(CoreModel.Config): frozen = True diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index aca5f5707f..5572ae25dd 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -339,7 +339,7 @@ class ProfileParams(CoreModel): termination_policy: Annotated[Optional[TerminationPolicy], Field(exclude=True)] = None termination_idle_time: Annotated[Optional[Union[str, int]], Field(exclude=True)] = None - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]) -> None: del schema["properties"]["pool_name"] @@ -379,7 +379,7 @@ class Profile(ProfileProps, ProfileParams): class ProfilesConfig(CoreModel): profiles: List[Profile] - class Config: + class Config(CoreModel.Config): json_loads = orjson.loads json_dumps = pydantic_orjson_dumps_with_indent diff --git a/src/dstack/_internal/core/models/repos/remote.py b/src/dstack/_internal/core/models/repos/remote.py index 638582c96f..366767fe79 100644 --- a/src/dstack/_internal/core/models/repos/remote.py +++ b/src/dstack/_internal/core/models/repos/remote.py @@ -32,7 +32,7 @@ class RemoteRepoCreds(CoreModel): # TODO: remove in 0.20. Left for compatibility with CLI <=0.18.44 protocol: Annotated[Optional[str], Field(exclude=True)] = None - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]) -> None: del schema["properties"]["protocol"] @@ -47,7 +47,7 @@ class RemoteRepoInfo(BaseRepoInfo): repo_port: Annotated[Optional[int], Field(exclude=True)] = None repo_user_name: Annotated[Optional[str], Field(exclude=True)] = None - class Config: + class Config(BaseRepoInfo.Config): @staticmethod def schema_extra(schema: Dict[str, Any]) -> None: del schema["properties"]["repo_host_name"] diff --git a/src/dstack/_internal/core/models/resources.py b/src/dstack/_internal/core/models/resources.py index 15c80f7166..13d5dcf2a9 100644 --- a/src/dstack/_internal/core/models/resources.py +++ b/src/dstack/_internal/core/models/resources.py @@ -130,7 +130,7 @@ def __str__(self): class CPUSpec(CoreModel): - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]): add_extra_schema_types( @@ -191,7 +191,7 @@ def _validate_arch(cls, v: Any) -> Any: class GPUSpec(CoreModel): - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]): add_extra_schema_types( @@ -314,7 +314,7 @@ def _vendor_from_string(cls, v: str) -> gpuhunt.AcceleratorVendor: class DiskSpec(CoreModel): - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]): add_extra_schema_types( @@ -340,7 +340,7 @@ def _parse(cls, v: Any) -> Any: class ResourcesSpec(CoreModel): - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]): add_extra_schema_types( diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index d112b52dfa..87a274a0c3 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -444,7 +444,7 @@ class RunSpec(CoreModel): # TODO: make merged_profile a computed field after migrating to pydanticV2 merged_profile: Annotated[Profile, Field(exclude=True)] = None - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any], model: Type) -> None: prop = schema.get("properties", {}) diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index befa4235e3..9ee54fc659 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -84,7 +84,7 @@ class DecryptedString(CoreModel): decrypted: bool = True exc: Optional[Exception] = None - class Config: + class Config(CoreModel.Config): arbitrary_types_allowed = True def get_plaintext_or_error(self) -> str: diff --git a/src/dstack/_internal/server/schemas/gateways.py b/src/dstack/_internal/server/schemas/gateways.py index 2ffc891432..c4d7ebcb77 100644 --- a/src/dstack/_internal/server/schemas/gateways.py +++ b/src/dstack/_internal/server/schemas/gateways.py @@ -14,7 +14,7 @@ class CreateGatewayRequest(CoreModel): backend_type: Annotated[Optional[BackendType], Field(exclude=True)] = None region: Annotated[Optional[str], Field(exclude=True)] = None - class Config: + class Config(CoreModel.Config): @staticmethod def schema_extra(schema: Dict[str, Any]) -> None: del schema["properties"]["name"] diff --git a/src/dstack/_internal/server/services/docker.py b/src/dstack/_internal/server/services/docker.py index 3f0f1a47fa..49e8d8e857 100644 --- a/src/dstack/_internal/server/services/docker.py +++ b/src/dstack/_internal/server/services/docker.py @@ -32,7 +32,7 @@ def __call__(self, dxf: DXF, response: requests.Response) -> None: class DockerImage(CoreModel): - class Config: + class Config(CoreModel.Config): frozen = True image: str From 605985b65ef8dbd64daa103034ff577fbef9ce2a Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 15 Aug 2025 11:26:32 +0500 Subject: [PATCH 17/20] Rebase migrations --- ...del_fleet.py => e2d08cd1b8d9_add_jobmodel_fleet.py} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename src/dstack/_internal/server/migrations/versions/{474af8479c09_add_jobmodel_fleet.py => e2d08cd1b8d9_add_jobmodel_fleet.py} (87%) diff --git a/src/dstack/_internal/server/migrations/versions/474af8479c09_add_jobmodel_fleet.py b/src/dstack/_internal/server/migrations/versions/e2d08cd1b8d9_add_jobmodel_fleet.py similarity index 87% rename from src/dstack/_internal/server/migrations/versions/474af8479c09_add_jobmodel_fleet.py rename to src/dstack/_internal/server/migrations/versions/e2d08cd1b8d9_add_jobmodel_fleet.py index b3c2b605f2..6136b7843c 100644 --- a/src/dstack/_internal/server/migrations/versions/474af8479c09_add_jobmodel_fleet.py +++ b/src/dstack/_internal/server/migrations/versions/e2d08cd1b8d9_add_jobmodel_fleet.py @@ -1,8 +1,8 @@ """Add JobModel.fleet -Revision ID: 474af8479c09 -Revises: 74a1f55209bd -Create Date: 2025-08-11 11:32:39.847360 +Revision ID: e2d08cd1b8d9 +Revises: 3d7f6c2ec000 +Create Date: 2025-08-15 11:26:05.670591 """ @@ -11,8 +11,8 @@ from alembic import op # revision identifiers, used by Alembic. -revision = "474af8479c09" -down_revision = "74a1f55209bd" +revision = "e2d08cd1b8d9" +down_revision = "3d7f6c2ec000" branch_labels = None depends_on = None From c9c84bc4f749ca1f74f343577285df7f61ddd0c5 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 15 Aug 2025 15:01:06 +0500 Subject: [PATCH 18/20] Use fleets with no instances if fleets specified --- .../tasks/process_submitted_jobs.py | 20 ++++--- .../tasks/test_process_submitted_jobs.py | 56 +++++++++++++++++-- 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 3a189b88e8..9470e39b79 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -460,13 +460,11 @@ async def _refetch_fleet_models( *fleet_filters, ) .where( - and_( - InstanceModel.id.in_(instances_ids), - or_( - InstanceModel.id.is_(None), - and_( - *instance_filters, - ), + or_( + InstanceModel.id.is_(None), + and_( + InstanceModel.id.in_(instances_ids), + *instance_filters, ), ) ) @@ -535,8 +533,12 @@ def _find_optimal_fleet_with_offers( fleet_priority, ) ) - if all(t[2] == 0 for t in candidate_fleets_with_offers): - # If no fleets have available offers, create a new fleet. + if run_spec.configuration.fleets is None and all( + t[2] == 0 for t in candidate_fleets_with_offers + ): + # If fleets are not specified and no fleets have available offers, create a new fleet. + # This is for compatibility with non-fleet-first UX when runs created new fleets + # if there are no instances to reuse. return None, [] candidate_fleets_with_offers.sort(key=lambda t: t[-1]) return candidate_fleets_with_offers[0][:2] diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index 20c94a9060..c6ea80d2e3 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -776,15 +776,19 @@ async def test_fails_with_no_capacity_when_specified_fleets_occupied( @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncSession): + async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified( + self, test_db, session: AsyncSession + ): project = await create_project(session) user = await create_user(session) repo = await create_repo(session=session, project_id=project.id) fleet_spec = get_fleet_spec() fleet_spec.configuration.nodes = Range(min=0, max=1) - await create_fleet(session=session, project=project, spec=fleet_spec) + await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet") # Need a second non-empty fleet to have two-stage processing - fleet2 = await create_fleet(session=session, project=project, spec=fleet_spec) + fleet2 = await create_fleet( + session=session, project=project, spec=fleet_spec, name="fleet2" + ) await create_instance( session=session, project=project, @@ -807,6 +811,51 @@ async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncS await session.refresh(job) assert job.status == JobStatus.SUBMITTED assert job.instance_assigned + assert job.instance_id is None + assert job.fleet_id is None + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified( + self, test_db, session: AsyncSession + ): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = Range(min=0, max=1) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet") + # Need a second non-empty fleet to have two-stage processing + fleet2 = await create_fleet( + session=session, project=project, spec=fleet_spec, name="fleet2" + ) + await create_instance( + session=session, + project=project, + fleet=fleet2, + instance_num=0, + status=InstanceStatus.BUSY, + ) + run_spec = get_run_spec(repo_id=repo.name) + run_spec.configuration.fleets = [fleet.name, fleet2.name] + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + job = await create_job( + session=session, + run=run, + instance_assigned=False, + ) + await process_submitted_jobs() + await session.refresh(job) + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.instance_id is None + assert job.fleet_id == fleet.id @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @@ -852,7 +901,6 @@ async def test_creates_new_instance_in_existing_empty_fleet( assert job.status == JobStatus.PROVISIONING assert job.instance is not None assert job.instance.instance_num == 0 - assert job.instance.fleet_id == fleet.id @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) From e304b898371beca038019c66069f8ff07288dab2 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 15 Aug 2025 15:26:51 +0500 Subject: [PATCH 19/20] Fix tests --- .../tasks/test_process_submitted_jobs.py | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index c6ea80d2e3..42a98a5f4a 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -737,43 +737,6 @@ async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session: assert job.instance_id is None assert job.fleet_id is None - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_fails_with_no_capacity_when_specified_fleets_occupied( - self, test_db, session: AsyncSession - ): - project = await create_project(session) - user = await create_user(session) - repo = await create_repo(session=session, project_id=project.id) - fleet = await create_fleet(session=session, project=project, name="test-fleet") - instance = await create_instance( - session=session, - project=project, - fleet=fleet, - instance_num=0, - status=InstanceStatus.BUSY, - ) - fleet.instances.append(instance) - run_spec = get_run_spec(repo_id=repo.name) - run_spec.configuration.fleets = ["test-fleet"] - run = await create_run( - session=session, - project=project, - repo=repo, - user=user, - run_spec=run_spec, - ) - job = await create_job( - session=session, - run=run, - instance_assigned=False, - ) - await session.commit() - await process_submitted_jobs() - await session.refresh(job) - assert job.status == JobStatus.TERMINATING - assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY - @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified( From 7eeb35b47a4a1c06f7e756118ba7a52f174e2dbd Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 15 Aug 2025 15:32:14 +0500 Subject: [PATCH 20/20] Fix tests --- .../tasks/test_process_submitted_jobs.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index 42a98a5f4a..2bc226dded 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -788,19 +788,8 @@ async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified( fleet_spec = get_fleet_spec() fleet_spec.configuration.nodes = Range(min=0, max=1) fleet = await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet") - # Need a second non-empty fleet to have two-stage processing - fleet2 = await create_fleet( - session=session, project=project, spec=fleet_spec, name="fleet2" - ) - await create_instance( - session=session, - project=project, - fleet=fleet2, - instance_num=0, - status=InstanceStatus.BUSY, - ) run_spec = get_run_spec(repo_id=repo.name) - run_spec.configuration.fleets = [fleet.name, fleet2.name] + run_spec.configuration.fleets = [fleet.name] run = await create_run( session=session, project=project, @@ -815,7 +804,6 @@ async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified( ) await process_submitted_jobs() await session.refresh(job) - assert job.status == JobStatus.SUBMITTED assert job.instance_assigned assert job.instance_id is None assert job.fleet_id == fleet.id