Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
instance_filters=instance_filters,
)
fleet_models = fleet_models_with_instances + fleet_models_without_instances
fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers(
fleet_model, fleet_instances_with_offers = await _find_optimal_fleet_with_offers(
project=project,
fleet_models=fleet_models,
run_model=run_model,
run_spec=run.run_spec,
Expand Down Expand Up @@ -492,7 +493,8 @@ async def _refetch_fleet_models_with_instances(
return fleet_models


def _find_optimal_fleet_with_offers(
async def _find_optimal_fleet_with_offers(
project: ProjectModel,
fleet_models: list[FleetModel],
run_model: RunModel,
run_spec: RunSpec,
Expand All @@ -502,58 +504,99 @@ def _find_optimal_fleet_with_offers(
) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]:
if run_model.fleet is not None:
# Using the fleet that was already chosen by the master job
fleet_instances_with_offers = _get_fleet_instances_with_offers(
fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers(
fleet_model=run_model.fleet,
run_spec=run_spec,
job=job,
master_job_provisioning_data=master_job_provisioning_data,
volumes=volumes,
)
return run_model.fleet, fleet_instances_with_offers
return run_model.fleet, fleet_instances_with_pool_offers

if len(fleet_models) == 0:
return None, []

nodes_required_num = _get_nodes_required_num_for_run(run_spec)
# The current strategy is to first consider fleets that can accommodate
# the run without additional provisioning and choose the one with the cheapest offer.
# Fallback to fleet with the cheapest offer among all fleets with offers.
# The current strategy is first to consider fleets that can accommodate
# the run without additional provisioning and choose the one with the cheapest pool offer.
# Then choose a fleet with the cheapest pool offer among all fleets with pool offers.
# If there are no fleets with pool offers, choose a fleet with a cheapest backend offer.
# Fallback to autocreated fleet if fleets have no pool or backend offers.
# TODO: Consider trying all backend offers and then choosing a fleet.
candidate_fleets_with_offers: list[
tuple[
Optional[FleetModel],
list[tuple[InstanceModel, InstanceOfferWithAvailability]],
int,
tuple[int, float],
int,
tuple[int, float, float],
]
] = []
for candidate_fleet_model in fleet_models:
fleet_instances_with_offers = _get_fleet_instances_with_offers(
fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers(
fleet_model=candidate_fleet_model,
run_spec=run_spec,
job=job,
master_job_provisioning_data=master_job_provisioning_data,
volumes=volumes,
)
fleet_available_offers = [
o for _, o in fleet_instances_with_offers if o.availability.is_available()
]
fleet_has_available_capacity = nodes_required_num <= len(fleet_available_offers)
fleet_cheapest_offer = math.inf
if len(fleet_available_offers) > 0:
fleet_cheapest_offer = fleet_available_offers[0].price
fleet_priority = (not fleet_has_available_capacity, fleet_cheapest_offer)
fleet_has_available_capacity = nodes_required_num <= len(fleet_instances_with_pool_offers)
fleet_cheapest_pool_offer = math.inf
if len(fleet_instances_with_pool_offers) > 0:
fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price

candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
profile = combine_fleet_and_run_profiles(
candidate_fleet.spec.merged_profile, run_spec.merged_profile
)
fleet_requirements = get_fleet_requirements(candidate_fleet.spec)
requirements = combine_fleet_and_run_requirements(
fleet_requirements, job.job_spec.requirements
)
multinode = (
candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
or job.job_spec.jobs_per_replica > 1
)
fleet_backend_offers = []
if (
_check_can_create_new_instance_in_fleet(candidate_fleet)
and profile is not None
and requirements is not None
):
fleet_backend_offers = await get_offers_by_requirements(
project=project,
profile=profile,
requirements=requirements,
exclude_not_available=True,
multinode=multinode,
master_job_provisioning_data=master_job_provisioning_data,
volumes=volumes,
privileged=job.job_spec.privileged,
instance_mounts=check_run_spec_requires_instance_mounts(run_spec),
)

fleet_cheapest_backend_offer = math.inf
if len(fleet_backend_offers) > 0:
fleet_cheapest_backend_offer = fleet_backend_offers[0][1].price

fleet_priority = (
not fleet_has_available_capacity,
fleet_cheapest_pool_offer,
fleet_cheapest_backend_offer,
)
candidate_fleets_with_offers.append(
(
candidate_fleet_model,
fleet_instances_with_offers,
len(fleet_available_offers),
fleet_instances_with_pool_offers,
len(fleet_instances_with_pool_offers),
len(fleet_backend_offers),
fleet_priority,
)
)
if run_spec.merged_profile.fleets is None and all(
t[2] == 0 for t in candidate_fleets_with_offers
t[2] == 0 and t[3] == 0 for t in candidate_fleets_with_offers
):
# If fleets are not specified and no fleets have available offers, create a new fleet.
# If fleets are not specified and no fleets have available pool or backend offers, create a new fleet.
# This is for compatibility with non-fleet-first UX when runs created new fleets
# if there are no instances to reuse.
return None, []
Expand All @@ -573,7 +616,7 @@ def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int:
return nodes_required_num


def _get_fleet_instances_with_offers(
def _get_fleet_instances_with_pool_offers(
fleet_model: FleetModel,
run_spec: RunSpec,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
InstanceStatus,
)
from dstack._internal.core.models.profiles import Profile
from dstack._internal.core.models.resources import Range, ResourcesSpec
from dstack._internal.core.models.runs import (
JobStatus,
JobTerminationReason,
Expand Down Expand Up @@ -744,7 +745,7 @@ async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session:

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified(
async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers_if_fleets_unspecified(
self, test_db, session: AsyncSession
):
project = await create_project(session)
Expand Down Expand Up @@ -782,6 +783,58 @@ async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified(
assert job.instance_id is None
assert job.fleet_id is None

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_assigns_job_to_elastic_empty_fleet_with_backend_offers_if_fleets_unspecified(
self, test_db, session: AsyncSession
):
project = await create_project(session)
user = await create_user(session)
repo = await create_repo(session=session, project_id=project.id)
fleet_spec1 = get_fleet_spec()
fleet_spec1.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
fleet1 = await create_fleet(
session=session, project=project, spec=fleet_spec1, name="fleet"
)
# Need a second non-empty fleet to have two-stage processing
fleet_spec2 = get_fleet_spec()
# Empty resources intersection to return no backend offers
fleet_spec2.configuration.resources = ResourcesSpec(cpu=Range(min=0, max=0))
fleet2 = await create_fleet(
session=session, project=project, spec=fleet_spec2, name="fleet2"
)
await create_instance(
session=session,
project=project,
fleet=fleet2,
instance_num=0,
status=InstanceStatus.BUSY,
)
run = await create_run(
session=session,
project=project,
repo=repo,
user=user,
)
job = await create_job(
session=session,
run=run,
instance_assigned=False,
)
aws_mock = Mock()
aws_mock.TYPE = BackendType.AWS
offer = get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0)
aws_mock.compute.return_value = Mock(spec=ComputeMockSpec)
aws_mock.compute.return_value.get_offers.return_value = [offer]
with patch("dstack._internal.server.services.backends.get_project_backends") as m:
m.return_value = [aws_mock]
await process_submitted_jobs()
await session.refresh(job)
assert job.status == JobStatus.SUBMITTED
assert job.instance_assigned
assert job.instance_id is None
assert job.fleet_id == fleet1.id

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified(
Expand Down
Loading