Skip to content

Commit 942e444

Browse files
committed
Respect fleet specs when provisioning new instance on run apply
1 parent 407e983 commit 942e444

File tree

2 files changed

+35
-15
lines changed

2 files changed

+35
-15
lines changed

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from dstack._internal.server.services.backends import get_project_backend_by_type_or_error
5454
from dstack._internal.server.services.fleets import (
5555
fleet_model_to_fleet,
56+
get_fleet_requirements,
5657
)
5758
from dstack._internal.server.services.instances import (
5859
filter_pool_instances,
@@ -71,6 +72,10 @@
7172
from dstack._internal.server.services.locking import get_locker
7273
from dstack._internal.server.services.logging import fmt
7374
from dstack._internal.server.services.offers import get_offers_by_requirements
75+
from dstack._internal.server.services.requirements.combine import (
76+
combine_fleet_and_run_profiles,
77+
combine_fleet_and_run_requirements,
78+
)
7479
from dstack._internal.server.services.runs import (
7580
check_run_spec_requires_instance_mounts,
7681
run_model_to_run,
@@ -646,6 +651,8 @@ async def _run_job_on_new_instance(
646651
) -> Optional[Tuple[JobProvisioningData, InstanceOfferWithAvailability]]:
647652
if volumes is None:
648653
volumes = []
654+
profile = run.run_spec.merged_profile
655+
requirements = job.job_spec.requirements
649656
fleet = None
650657
if fleet_model is not None:
651658
fleet = fleet_model_to_fleet(fleet_model)
@@ -654,13 +661,26 @@ async def _run_job_on_new_instance(
654661
"%s: cannot fit new instance into fleet %s", fmt(job_model), fleet_model.name
655662
)
656663
return None
664+
profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, profile)
665+
if profile is None:
666+
logger.debug("%s: cannot combine fleet %s profile", fmt(job_model), fleet_model.name)
667+
return None
668+
fleet_requirements = get_fleet_requirements(fleet.spec)
669+
requirements = combine_fleet_and_run_requirements(fleet_requirements, requirements)
670+
if requirements is None:
671+
logger.debug(
672+
"%s: cannot combine fleet %s requirements", fmt(job_model), fleet_model.name
673+
)
674+
return None
675+
# TODO: Respect fleet provisioning properties such as tags
676+
657677
multinode = job.job_spec.jobs_per_replica > 1 or (
658678
fleet is not None and fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
659679
)
660680
offers = await get_offers_by_requirements(
661681
project=project,
662-
profile=run.run_spec.merged_profile,
663-
requirements=job.job_spec.requirements,
682+
profile=profile,
683+
requirements=requirements,
664684
exclude_not_available=True,
665685
multinode=multinode,
666686
master_job_provisioning_data=master_job_provisioning_data,

src/dstack/_internal/server/services/fleets.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ async def get_plan(
279279
offers_with_backends = await get_create_instance_offers(
280280
project=project,
281281
profile=effective_spec.merged_profile,
282-
requirements=_get_fleet_requirements(effective_spec),
282+
requirements=get_fleet_requirements(effective_spec),
283283
fleet_spec=effective_spec,
284284
blocks=effective_spec.configuration.blocks,
285285
)
@@ -458,7 +458,7 @@ async def create_fleet_instance_model(
458458
instance_num: int,
459459
) -> InstanceModel:
460460
profile = spec.merged_profile
461-
requirements = _get_fleet_requirements(spec)
461+
requirements = get_fleet_requirements(spec)
462462
instance_model = await instances_services.create_instance_model(
463463
session=session,
464464
project=project,
@@ -644,6 +644,17 @@ def is_fleet_empty(fleet_model: FleetModel) -> bool:
644644
return len(active_instances) == 0
645645

646646

647+
def get_fleet_requirements(fleet_spec: FleetSpec) -> Requirements:
# Build a Requirements object from a fleet spec.
# Resources and reservation are read from fleet_spec.configuration;
# max_price and the spot policy are read from fleet_spec.merged_profile.
# Returns the newly constructed Requirements.
648+
profile = fleet_spec.merged_profile
649+
requirements = Requirements(
650+
# Fall back to a default-constructed ResourcesSpec when the configured
# resources value is falsy (e.g. not set).
resources=fleet_spec.configuration.resources or ResourcesSpec(),
651+
max_price=profile.max_price,
652+
# SpotPolicy.ONDEMAND is passed to get_policy_map as its default.
spot=get_policy_map(profile.spot_policy, default=SpotPolicy.ONDEMAND),
653+
reservation=fleet_spec.configuration.reservation,
654+
)
655+
return requirements
656+
657+
647658
async def _create_fleet(
648659
session: AsyncSession,
649660
project: ProjectModel,
@@ -1004,17 +1015,6 @@ def _terminate_fleet_instances(fleet_model: FleetModel, instance_nums: Optional[
10041015
instance.status = InstanceStatus.TERMINATING
10051016

10061017

1007-
def _get_fleet_requirements(fleet_spec: FleetSpec) -> Requirements:
# Build a Requirements object from a fleet spec: resources and reservation
# come from fleet_spec.configuration, max_price and spot policy from
# fleet_spec.merged_profile.
# NOTE(review): the "-" gutter markers suggest this private variant is the
# removed side of the diff; the public get_fleet_requirements shown in the
# same diff has the same body — confirm before relying on this name.
1008-
profile = fleet_spec.merged_profile
1009-
requirements = Requirements(
1010-
# Fall back to a default-constructed ResourcesSpec when the configured
# resources value is falsy (e.g. not set).
resources=fleet_spec.configuration.resources or ResourcesSpec(),
1011-
max_price=profile.max_price,
1012-
# SpotPolicy.ONDEMAND is passed to get_policy_map as its default.
spot=get_policy_map(profile.spot_policy, default=SpotPolicy.ONDEMAND),
1013-
reservation=fleet_spec.configuration.reservation,
1014-
)
1015-
return requirements
1016-
1017-
10181018
def _get_next_instance_num(instance_nums: set[int]) -> int:
10191019
if not instance_nums:
10201020
return 0

0 commit comments

Comments
 (0)