From 115212cf0586cd27483c4d2ad449603dfa5c17bb Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 15 Sep 2025 14:51:12 +0500 Subject: [PATCH 1/2] Consider backend offers when choosing optimal fleet --- .../tasks/process_submitted_jobs.py | 87 ++++++++++++++----- 1 file changed, 65 insertions(+), 22 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 5a1ff64a50..41f925ca1d 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -289,7 +289,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): instance_filters=instance_filters, ) fleet_models = fleet_models_with_instances + fleet_models_without_instances - fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers( + fleet_model, fleet_instances_with_offers = await _find_optimal_fleet_with_offers( + project=project, fleet_models=fleet_models, run_model=run_model, run_spec=run.run_spec, @@ -492,7 +493,8 @@ async def _refetch_fleet_models_with_instances( return fleet_models -def _find_optimal_fleet_with_offers( +async def _find_optimal_fleet_with_offers( + project: ProjectModel, fleet_models: list[FleetModel], run_model: RunModel, run_spec: RunSpec, @@ -502,58 +504,99 @@ def _find_optimal_fleet_with_offers( ) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]: if run_model.fleet is not None: # Using the fleet that was already chosen by the master job - fleet_instances_with_offers = _get_fleet_instances_with_offers( + fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers( fleet_model=run_model.fleet, run_spec=run_spec, job=job, master_job_provisioning_data=master_job_provisioning_data, volumes=volumes, ) - return run_model.fleet, fleet_instances_with_offers + return run_model.fleet, fleet_instances_with_pool_offers if len(fleet_models) == 0: return None, [] nodes_required_num = _get_nodes_required_num_for_run(run_spec) - # The current strategy is to first consider fleets that can accommodate - # the run without additional provisioning and choose the one with the cheapest offer. - # Fallback to fleet with the cheapest offer among all fleets with offers. + # The current strategy is first to consider fleets that can accommodate + # the run without additional provisioning and choose the one with the cheapest pool offer. + # Then choose a fleet with the cheapest pool offer among all fleets with pool offers. + # If there are no fleets with pool offers, choose a fleet with a cheapest backend offer. + # Fallback to autocreated fleet if fleets have no pool or backend offers. + # TODO: Consider trying all backend offers and then choosing a fleet. candidate_fleets_with_offers: list[ tuple[ Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]], int, - tuple[int, float], + int, + tuple[int, float, float], ] ] = [] for candidate_fleet_model in fleet_models: - fleet_instances_with_offers = _get_fleet_instances_with_offers( + fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers( fleet_model=candidate_fleet_model, run_spec=run_spec, job=job, master_job_provisioning_data=master_job_provisioning_data, volumes=volumes, ) - fleet_available_offers = [ - o for _, o in fleet_instances_with_offers if o.availability.is_available() - ] - fleet_has_available_capacity = nodes_required_num <= len(fleet_available_offers) - fleet_cheapest_offer = math.inf - if len(fleet_available_offers) > 0: - fleet_cheapest_offer = fleet_available_offers[0].price - fleet_priority = (not fleet_has_available_capacity, fleet_cheapest_offer) + fleet_has_available_capacity = nodes_required_num <= len(fleet_instances_with_pool_offers) + fleet_cheapest_pool_offer = math.inf + if len(fleet_instances_with_pool_offers) > 0: + fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price + + candidate_fleet = fleet_model_to_fleet(candidate_fleet_model) + profile = combine_fleet_and_run_profiles( + candidate_fleet.spec.merged_profile, run_spec.merged_profile + ) + fleet_requirements = get_fleet_requirements(candidate_fleet.spec) + requirements = combine_fleet_and_run_requirements( + fleet_requirements, job.job_spec.requirements + ) + multinode = ( + candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER + or job.job_spec.jobs_per_replica > 1 + ) + fleet_backend_offers = [] + if ( + _check_can_create_new_instance_in_fleet(candidate_fleet) + and profile is not None + and requirements is not None + ): + fleet_backend_offers = await get_offers_by_requirements( + project=project, + profile=profile, + requirements=requirements, + exclude_not_available=True, + multinode=multinode, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + privileged=job.job_spec.privileged, + instance_mounts=check_run_spec_requires_instance_mounts(run_spec), + ) + + fleet_cheapest_backend_offer = math.inf + if len(fleet_backend_offers) > 0: + fleet_cheapest_backend_offer = fleet_backend_offers[0][1].price + + fleet_priority = ( + not fleet_has_available_capacity, + fleet_cheapest_pool_offer, + fleet_cheapest_backend_offer, + ) candidate_fleets_with_offers.append( ( candidate_fleet_model, - fleet_instances_with_offers, - len(fleet_available_offers), + fleet_instances_with_pool_offers, + len(fleet_instances_with_pool_offers), + len(fleet_backend_offers), fleet_priority, ) ) if run_spec.merged_profile.fleets is None and all( - t[2] == 0 for t in candidate_fleets_with_offers + t[2] == 0 and t[3] == 0 for t in candidate_fleets_with_offers ): - # If fleets are not specified and no fleets have available offers, create a new fleet. + # If fleets are not specified and no fleets have available pool or backend offers, create a new fleet. # This is for compatibility with non-fleet-first UX when runs created new fleets # if there are no instances to reuse. return None, [] @@ -573,7 +616,7 @@ def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int: return nodes_required_num -def _get_fleet_instances_with_offers( +def _get_fleet_instances_with_pool_offers( fleet_model: FleetModel, run_spec: RunSpec, job: Job, From 89616e62061a415e4d79a3a07a6e46e14649f762 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 16 Sep 2025 10:22:48 +0500 Subject: [PATCH 2/2] Test --- .../tasks/test_process_submitted_jobs.py | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index 868bfb6355..cbf3872846 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -16,6 +16,7 @@ InstanceStatus, ) from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.resources import Range, ResourcesSpec from dstack._internal.core.models.runs import ( JobStatus, JobTerminationReason, @@ -744,7 +745,7 @@ async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified( + async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers_if_fleets_unspecified( self, test_db, session: AsyncSession ): project = await create_project(session) @@ -782,6 +783,58 @@ async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified( assert job.instance_id is None assert job.fleet_id is None + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_assigns_job_to_elastic_empty_fleet_with_backend_offers_if_fleets_unspecified( + self, test_db, session: AsyncSession + ): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec1 = get_fleet_spec() + fleet_spec1.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + fleet1 = await create_fleet( + session=session, project=project, spec=fleet_spec1, name="fleet" + ) + # Need a second non-empty fleet to have two-stage processing + fleet_spec2 = get_fleet_spec() + # Empty resources intersection to return no backend offers + fleet_spec2.configuration.resources = ResourcesSpec(cpu=Range(min=0, max=0)) + fleet2 = await create_fleet( + session=session, project=project, spec=fleet_spec2, name="fleet2" + ) + await create_instance( + session=session, + project=project, + fleet=fleet2, + instance_num=0, + status=InstanceStatus.BUSY, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + instance_assigned=False, + ) + aws_mock = Mock() + aws_mock.TYPE = BackendType.AWS + offer = get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0) + aws_mock.compute.return_value = Mock(spec=ComputeMockSpec) + aws_mock.compute.return_value.get_offers.return_value = [offer] + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + m.return_value = [aws_mock] + await process_submitted_jobs() + await session.refresh(job) + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.instance_id is None + assert job.fleet_id == fleet1.id + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified(