Skip to content

Commit 08793c1

Browse files
committed
Check if run cannot fit into fleet
1 parent 9d1e053 commit 08793c1

2 files changed

Lines changed: 110 additions & 22 deletions

File tree

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,6 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
260260

261261
instance_filters = [
262262
InstanceModel.deleted == False,
263-
InstanceModel.total_blocks > InstanceModel.busy_blocks,
264263
InstanceModel.id.not_in(detaching_instances_ids),
265264
]
266265

@@ -514,9 +513,6 @@ async def _find_optimal_fleet_with_offers(
514513
)
515514
return run_model.fleet, fleet_instances_with_pool_offers
516515

517-
if len(fleet_models) == 0:
518-
return None, []
519-
520516
nodes_required_num = _get_nodes_required_num_for_run(run_spec)
521517
# The current strategy is first to consider fleets that can accommodate
522518
# the run without additional provisioning and choose the one with the cheapest pool offer.
@@ -534,31 +530,29 @@ async def _find_optimal_fleet_with_offers(
534530
]
535531
] = []
536532
for candidate_fleet_model in fleet_models:
533+
candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
537534
fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers(
538535
fleet_model=candidate_fleet_model,
539536
run_spec=run_spec,
540537
job=job,
541538
master_job_provisioning_data=master_job_provisioning_data,
542539
volumes=volumes,
543540
)
544-
fleet_has_available_capacity = nodes_required_num <= len(fleet_instances_with_pool_offers)
541+
fleet_has_pool_capacity = nodes_required_num <= len(fleet_instances_with_pool_offers)
545542
fleet_cheapest_pool_offer = math.inf
546543
if len(fleet_instances_with_pool_offers) > 0:
547544
fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price
548545

549-
candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
550-
profile = None
551-
requirements = None
552546
try:
547+
_check_can_create_new_instance_in_fleet(candidate_fleet)
553548
profile, requirements = _get_run_profile_and_requirements_in_fleet(
554549
job=job,
555550
run_spec=run_spec,
556551
fleet=candidate_fleet,
557552
)
558553
except ValueError:
559-
pass
560-
fleet_backend_offers = []
561-
if profile is not None and requirements is not None:
554+
fleet_backend_offers = []
555+
else:
562556
multinode = (
563557
candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
564558
or job.job_spec.jobs_per_replica > 1
@@ -579,8 +573,12 @@ async def _find_optimal_fleet_with_offers(
579573
if len(fleet_backend_offers) > 0:
580574
fleet_cheapest_backend_offer = fleet_backend_offers[0][1].price
581575

576+
if not _run_can_fit_into_fleet(run_spec, candidate_fleet):
577+
logger.debug("Skipping fleet %s from consideration: run cannot fit into fleet")
578+
continue
579+
582580
fleet_priority = (
583-
not fleet_has_available_capacity,
581+
not fleet_has_pool_capacity,
584582
fleet_cheapest_pool_offer,
585583
fleet_cheapest_backend_offer,
586584
)
@@ -593,10 +591,13 @@ async def _find_optimal_fleet_with_offers(
593591
fleet_priority,
594592
)
595593
)
594+
if len(candidate_fleets_with_offers) == 0:
595+
return None, []
596596
if run_spec.merged_profile.fleets is None and all(
597597
t[2] == 0 and t[3] == 0 for t in candidate_fleets_with_offers
598598
):
599-
# If fleets are not specified and no fleets have available pool or backend offers, create a new fleet.
599+
# If fleets are not specified and no fleets have available pool
600+
# or backend offers, create a new fleet.
600601
# This is for compatibility with non-fleet-first UX when runs created new fleets
601602
# if there are no instances to reuse.
602603
return None, []
@@ -616,6 +617,31 @@ def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int:
616617
return nodes_required_num
617618

618619

620+
def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool:
621+
"""
622+
Returns `False` only if the run certainly cannot fit into the fleet.
623+
This is a helpful heuristic to avoid even considering fleets too small for a run.
624+
A run may not fit even if this function returns `True`.
625+
When that happens, some jobs may fail due to exceeding `nodes.max`,
626+
or more than `nodes.max` instances may be provisioned
627+
and eventually removed by the fleet consolidation logic.
628+
"""
629+
# No check for cloud fleets with blocks > 1 since we don't know
630+
# how many jobs such fleets can accommodate.
631+
# TODO: Check if cannot fit into SSH fleet.
632+
nodes_required_num = _get_nodes_required_num_for_run(run_spec)
633+
if (
634+
fleet.spec.configuration.nodes is not None
635+
and fleet.spec.configuration.blocks == 1
636+
and fleet.spec.configuration.nodes.max is not None
637+
):
638+
busy_instances = [i for i in fleet.instances if i.busy_blocks > 0]
639+
fleet_available_capacity = fleet.spec.configuration.nodes.max - len(busy_instances)
640+
if fleet_available_capacity < nodes_required_num:
641+
return False
642+
return True
643+
644+
619645
def _get_fleet_instances_with_pool_offers(
620646
fleet_model: FleetModel,
621647
run_spec: RunSpec,
@@ -713,6 +739,7 @@ async def _run_job_on_new_instance(
713739
if fleet_model is not None:
714740
fleet = fleet_model_to_fleet(fleet_model)
715741
try:
742+
_check_can_create_new_instance_in_fleet(fleet)
716743
profile, requirements = _get_run_profile_and_requirements_in_fleet(
717744
job=job,
718745
run_spec=run.run_spec,
@@ -787,8 +814,6 @@ def _get_run_profile_and_requirements_in_fleet(
787814
run_spec: RunSpec,
788815
fleet: Fleet,
789816
) -> tuple[Profile, Requirements]:
790-
if not _check_can_create_new_instance_in_fleet(fleet):
791-
raise ValueError("Cannot fit new instance into fleet")
792817
profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile)
793818
if profile is None:
794819
raise ValueError("Cannot combine fleet profile")
@@ -801,13 +826,23 @@ def _get_run_profile_and_requirements_in_fleet(
801826
return profile, requirements
802827

803828

804-
def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool:
829+
def _check_can_create_new_instance_in_fleet(fleet: Fleet):
830+
if not _can_create_new_instance_in_fleet(fleet):
831+
raise ValueError("Cannot fit new instance into fleet")
832+
833+
834+
def _can_create_new_instance_in_fleet(fleet: Fleet) -> bool:
805835
if fleet.spec.configuration.ssh_config is not None:
806836
return False
807-
# TODO: Respect nodes.max
808-
# Ensure concurrent provisioning does not violate nodes.max
809-
# E.g. lock fleet and split instance model creation
810-
# and instance provisioning into separate transactions.
837+
active_instances = [i for i in fleet.instances if i.status.is_active()]
838+
# nodes.max is a soft limit that can be exceeded when provisioning concurrently.
839+
# The fleet consolidation logic will remove redundant nodes eventually.
840+
if (
841+
fleet.spec.configuration.nodes is not None
842+
and fleet.spec.configuration.nodes.max is not None
843+
and len(active_instances) >= fleet.spec.configuration.nodes.max
844+
):
845+
return False
811846
return True
812847

813848

src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,9 @@ async def test_assigns_job_to_shared_instance(self, test_db, session: AsyncSessi
494494
project_id=project.id,
495495
)
496496
offer = get_instance_offer_with_availability(gpu_count=8, cpu_count=64, memory_gib=128)
497-
fleet = await create_fleet(session=session, project=project)
497+
fleet_spec = get_fleet_spec()
498+
fleet_spec.configuration.blocks = 4
499+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
498500
instance = await create_instance(
499501
session=session,
500502
project=project,
@@ -537,7 +539,9 @@ async def test_assigns_multi_node_job_to_shared_instance(self, test_db, session:
537539
project_id=project.id,
538540
)
539541
offer = get_instance_offer_with_availability(gpu_count=8, cpu_count=64, memory_gib=128)
540-
fleet = await create_fleet(session=session, project=project)
542+
fleet_spec = get_fleet_spec()
543+
fleet_spec.configuration.nodes = FleetNodesSpec(min=1, target=1, max=None)
544+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
541545
instance = await create_instance(
542546
session=session,
543547
project=project,
@@ -743,6 +747,55 @@ async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session:
743747
assert job.instance_id is None
744748
assert job.fleet_id is None
745749

750+
@pytest.mark.asyncio
751+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
752+
async def test_assigns_no_fleet_if_run_cannot_fit(self, test_db, session: AsyncSession):
753+
project = await create_project(session)
754+
user = await create_user(session)
755+
repo = await create_repo(session=session, project_id=project.id)
756+
fleet_spec = get_fleet_spec()
757+
fleet_spec.configuration.nodes = FleetNodesSpec(min=1, target=1, max=3)
758+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
759+
instance1 = await create_instance(
760+
session=session,
761+
project=project,
762+
fleet=fleet,
763+
instance_num=0,
764+
status=InstanceStatus.BUSY,
765+
busy_blocks=1,
766+
)
767+
instance2 = await create_instance(
768+
session=session,
769+
project=project,
770+
fleet=fleet,
771+
instance_num=1,
772+
status=InstanceStatus.IDLE,
773+
busy_blocks=0,
774+
)
775+
fleet.instances.append(instance1)
776+
fleet.instances.append(instance2)
777+
run_spec = get_run_spec(repo_id=repo.name)
778+
run_spec.configuration = TaskConfiguration(nodes=3, commands=["echo"])
779+
run = await create_run(
780+
session=session,
781+
project=project,
782+
repo=repo,
783+
user=user,
784+
run_spec=run_spec,
785+
)
786+
job = await create_job(
787+
session=session,
788+
run=run,
789+
instance_assigned=False,
790+
)
791+
await session.commit()
792+
await process_submitted_jobs()
793+
await session.refresh(job)
794+
assert job.status == JobStatus.SUBMITTED
795+
assert job.instance_assigned
796+
assert job.instance_id is None
797+
assert job.fleet_id is None
798+
746799
@pytest.mark.asyncio
747800
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
748801
async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers_if_fleets_unspecified(

0 commit comments

Comments
 (0)