Skip to content

Commit 26bb9fc

Browse files
committed
Fix error when an imported fleet has no capacity
Avoid `fleet_model_to_fleet()`, since it requires the fleet project to be loaded from the database, which is not always the case for imported fleets.
1 parent 07184b3 commit 26bb9fc

4 files changed

Lines changed: 158 additions & 27 deletions

File tree

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@
8181
from dstack._internal.server.services.backends import get_project_backend_by_type_or_error
8282
from dstack._internal.server.services.fleets import (
8383
check_can_create_new_cloud_instance_in_fleet,
84-
fleet_model_to_fleet,
8584
generate_fleet_name,
8685
get_fleet_master_instance_provisioning_data,
86+
get_fleet_spec,
8787
get_next_instance_num,
8888
is_cloud_cluster,
8989
)
@@ -580,8 +580,8 @@ async def _fetch_fleet_with_master_instance_provisioning_data(
580580
# as FleetPipeline/InstancePipeline.
581581
master_instance_provisioning_data = None
582582
if is_master_job(job) and fleet_model is not None:
583-
fleet = fleet_model_to_fleet(fleet_model)
584-
if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
583+
fleet_spec = get_fleet_spec(fleet_model)
584+
if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
585585
# To avoid violating fleet placement cluster during master provisioning,
586586
# we must lock empty fleets and respect existing instances in non-empty fleets.
587587
# On SQLite always take the lock during master provisioning for simplicity.
@@ -624,7 +624,7 @@ async def _fetch_fleet_with_master_instance_provisioning_data(
624624
fleet_model = res.unique().scalar_one()
625625
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
626626
fleet_model=fleet_model,
627-
fleet_spec=fleet.spec,
627+
fleet_spec=fleet_spec,
628628
)
629629
return master_instance_provisioning_data
630630

@@ -730,15 +730,14 @@ async def _run_jobs_on_new_instances(
730730
job = jobs[0]
731731
profile = run.run_spec.merged_profile
732732
requirements = job.job_spec.requirements
733-
fleet = None
734733
if fleet_model is not None:
735-
fleet = fleet_model_to_fleet(fleet_model)
734+
fleet_spec = get_fleet_spec(fleet_model)
736735
try:
737-
check_can_create_new_cloud_instance_in_fleet(fleet)
736+
check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec)
738737
profile, requirements = get_run_profile_and_requirements_in_fleet(
739738
job=job,
740739
run_spec=run.run_spec,
741-
fleet=fleet,
740+
fleet_spec=fleet_spec,
742741
)
743742
except ValueError as e:
744743
logger.debug("%s: %s", fmt(job_model), e.args[0])

src/dstack/_internal/server/services/fleets.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -953,23 +953,23 @@ def get_fleet_master_instance_provisioning_data(
953953
return master_instance_provisioning_data
954954

955955

956-
def can_create_new_cloud_instance_in_fleet(fleet: Fleet) -> bool:
957-
if fleet.spec.configuration.ssh_config is not None:
956+
def can_create_new_cloud_instance_in_fleet(fleet_model: FleetModel, fleet_spec: FleetSpec) -> bool:
957+
if fleet_spec.configuration.ssh_config is not None:
958958
return False
959-
active_instances = [i for i in fleet.instances if i.status.is_active()]
959+
active_instances = [i for i in fleet_model.instances if i.status.is_active()]
960960
# nodes.max is a soft limit that can be exceeded when provisioning concurrently.
961961
# The fleet consolidation logic will remove redundant nodes eventually.
962962
if (
963-
fleet.spec.configuration.nodes is not None
964-
and fleet.spec.configuration.nodes.max is not None
965-
and len(active_instances) >= fleet.spec.configuration.nodes.max
963+
fleet_spec.configuration.nodes is not None
964+
and fleet_spec.configuration.nodes.max is not None
965+
and len(active_instances) >= fleet_spec.configuration.nodes.max
966966
):
967967
return False
968968
return True
969969

970970

971-
def check_can_create_new_cloud_instance_in_fleet(fleet: Fleet):
972-
if not can_create_new_cloud_instance_in_fleet(fleet):
971+
def check_can_create_new_cloud_instance_in_fleet(fleet_model: FleetModel, fleet_spec: FleetSpec):
972+
if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec):
973973
raise ValueError("Cannot fit new cloud instance into fleet")
974974

975975

src/dstack/_internal/server/services/runs/plan.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from sqlalchemy.orm import contains_eager, joinedload, noload
77

88
from dstack._internal.core.backends.base.backend import Backend
9-
from dstack._internal.core.models.fleets import Fleet, InstanceGroupPlacement
9+
from dstack._internal.core.models.fleets import Fleet, FleetSpec, InstanceGroupPlacement
1010
from dstack._internal.core.models.instances import (
1111
InstanceAvailability,
1212
InstanceOfferWithAvailability,
@@ -34,6 +34,7 @@
3434
fleet_model_to_fleet,
3535
get_fleet_master_instance_provisioning_data,
3636
get_fleet_requirements,
37+
get_fleet_spec,
3738
)
3839
from dstack._internal.server.services.instances import (
3940
filter_pool_instances,
@@ -375,7 +376,7 @@ async def find_optimal_fleet_with_offers(
375376
backend_offers = await _get_backend_offers_in_fleet(
376377
project=project,
377378
fleet_model=candidate_fleet_model,
378-
fleet=candidate_fleet,
379+
fleet_spec=candidate_fleet.spec,
379380
run_spec=run_spec,
380381
job=job,
381382
volumes=volumes,
@@ -436,12 +437,12 @@ async def find_optimal_fleet_with_offers(
436437
def get_run_profile_and_requirements_in_fleet(
437438
job: Job,
438439
run_spec: RunSpec,
439-
fleet: Fleet,
440+
fleet_spec: FleetSpec,
440441
) -> tuple[Profile, Requirements]:
441-
profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile)
442+
profile = combine_fleet_and_run_profiles(fleet_spec.merged_profile, run_spec.merged_profile)
442443
if profile is None:
443444
raise ValueError("Cannot combine fleet profile")
444-
fleet_requirements = get_fleet_requirements(fleet.spec)
445+
fleet_requirements = get_fleet_requirements(fleet_spec)
445446
requirements = combine_fleet_and_run_requirements(
446447
fleet_requirements, job.job_spec.requirements
447448
)
@@ -547,25 +548,25 @@ async def _get_backend_offers_in_fleet(
547548
run_spec: RunSpec,
548549
job: Job,
549550
volumes: Optional[list[list[Volume]]],
550-
fleet: Optional[Fleet] = None,
551+
fleet_spec: Optional[FleetSpec] = None,
551552
max_offers: Optional[int] = None,
552553
) -> list[tuple[Backend, InstanceOfferWithAvailability]]:
553-
if fleet is None:
554-
fleet = fleet_model_to_fleet(fleet_model)
554+
if fleet_spec is None:
555+
fleet_spec = get_fleet_spec(fleet_model)
555556
try:
556-
check_can_create_new_cloud_instance_in_fleet(fleet)
557+
check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec)
557558
profile, requirements = get_run_profile_and_requirements_in_fleet(
558559
job=job,
559560
run_spec=run_spec,
560-
fleet=fleet,
561+
fleet_spec=fleet_spec,
561562
)
562563
except ValueError:
563564
backend_offers = []
564565
else:
565566
# Master job offers must be in the same cluster as existing instances.
566567
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
567568
fleet_model=fleet_model,
568-
fleet_spec=fleet.spec,
569+
fleet_spec=fleet_spec,
569570
)
570571
# Handle multinode for old jobs that don't have requirements.multinode set.
571572
# TODO: Drop multinode param.

src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,137 @@ async def test_not_assigns_job_to_foreign_fleet_if_not_imported(
470470
assert not job.instance_assigned
471471
assert job.instance is None
472472

473+
@pytest.mark.asyncio
474+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
475+
async def test_assigns_second_replica_to_same_imported_fleet(
476+
self, test_db, session: AsyncSession
477+
):
478+
user = await create_user(session, global_role=GlobalRole.USER)
479+
exporter_project = await create_project(session, name="exporter-project", owner=user)
480+
importer_project = await create_project(session, name="importer-project", owner=user)
481+
fleet = await create_fleet(
482+
session=session,
483+
project=exporter_project,
484+
spec=get_fleet_spec(get_ssh_fleet_configuration()),
485+
name="exported-fleet",
486+
)
487+
instance_0 = await create_instance(
488+
session=session,
489+
project=exporter_project,
490+
fleet=fleet,
491+
name="exported-fleet-0",
492+
status=InstanceStatus.BUSY,
493+
)
494+
instance_1 = await create_instance(
495+
session=session,
496+
project=exporter_project,
497+
fleet=fleet,
498+
name="exported-fleet-1",
499+
status=InstanceStatus.IDLE,
500+
)
501+
await create_export(
502+
session=session,
503+
exporter_project=exporter_project,
504+
importer_projects=[importer_project],
505+
exported_fleets=[fleet],
506+
)
507+
repo = await create_repo(session=session, project_id=importer_project.id)
508+
run = await create_run(
509+
session=session,
510+
project=importer_project,
511+
repo=repo,
512+
user=user,
513+
fleet=fleet,
514+
)
515+
await create_job(
516+
session=session,
517+
run=run,
518+
fleet=fleet,
519+
instance=instance_0,
520+
instance_assigned=True,
521+
status=JobStatus.RUNNING,
522+
job_num=0,
523+
replica_num=0,
524+
)
525+
job_1 = await create_job(
526+
session=session,
527+
run=run,
528+
instance_assigned=False,
529+
status=JobStatus.SUBMITTED,
530+
job_num=0,
531+
replica_num=1,
532+
)
533+
await process_submitted_jobs()
534+
await session.refresh(job_1)
535+
assert job_1.status == JobStatus.SUBMITTED
536+
assert job_1.instance_assigned
537+
assert job_1.instance_id == instance_1.id
538+
assert job_1.fleet_id == fleet.id
539+
540+
@pytest.mark.asyncio
541+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
542+
async def test_second_job_fails_if_imported_fleet_has_no_capacity(
543+
self, test_db, session: AsyncSession
544+
):
545+
user = await create_user(session, global_role=GlobalRole.USER)
546+
exporter_project = await create_project(session, name="exporter-project", owner=user)
547+
importer_project = await create_project(session, name="importer-project", owner=user)
548+
fleet = await create_fleet(
549+
session=session,
550+
project=exporter_project,
551+
spec=get_fleet_spec(get_ssh_fleet_configuration()),
552+
name="exported-fleet",
553+
)
554+
instance_0 = await create_instance(
555+
session=session,
556+
project=exporter_project,
557+
fleet=fleet,
558+
name="exported-fleet-0",
559+
status=InstanceStatus.BUSY,
560+
)
561+
await create_export(
562+
session=session,
563+
exporter_project=exporter_project,
564+
importer_projects=[importer_project],
565+
exported_fleets=[fleet],
566+
)
567+
repo = await create_repo(session=session, project_id=importer_project.id)
568+
run = await create_run(
569+
session=session,
570+
project=importer_project,
571+
repo=repo,
572+
user=user,
573+
fleet=fleet,
574+
)
575+
await create_job(
576+
session=session,
577+
run=run,
578+
fleet=fleet,
579+
instance=instance_0,
580+
instance_assigned=True,
581+
status=JobStatus.RUNNING,
582+
job_num=0,
583+
replica_num=0,
584+
)
585+
job_1 = await create_job(
586+
session=session,
587+
run=run,
588+
instance_assigned=False,
589+
status=JobStatus.SUBMITTED,
590+
job_num=0,
591+
replica_num=1,
592+
)
593+
await process_submitted_jobs()
594+
await session.refresh(job_1)
595+
assert job_1.status == JobStatus.SUBMITTED
596+
assert job_1.instance_assigned
597+
assert job_1.instance_id is None
598+
599+
await process_submitted_jobs()
600+
await session.refresh(job_1)
601+
assert job_1.status == JobStatus.TERMINATING
602+
assert job_1.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
603+
473604
@pytest.mark.asyncio
474605
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
475606
async def test_does_no_reuse_unavailable_instances(self, test_db, session: AsyncSession):

0 commit comments

Comments
 (0)