Skip to content

Commit cd27344

Browse files
authored
Fix error submitting run to empty imported fleet (#3661)
Avoid `fleet_model_to_fleet()`, since it requires the fleet project to be loaded from the database, which is not always the case for imported fleets. Now, `fleet_model_to_fleet()` is only used in API-related services. Also stop loading the fleet project model where it was previously necessary only for the redundant `fleet_model_to_fleet()` call.
1 parent b6fb321 commit cd27344

File tree

4 files changed

+103
-28
lines changed

4 files changed

+103
-28
lines changed

src/dstack/_internal/server/background/scheduled_tasks/instances.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@
7777
from dstack._internal.server.services import backends as backends_services
7878
from dstack._internal.server.services import events
7979
from dstack._internal.server.services.fleets import (
80-
fleet_model_to_fleet,
8180
get_create_instance_offers,
81+
get_fleet_spec,
8282
is_cloud_cluster,
8383
)
8484
from dstack._internal.server.services.instances import (
@@ -310,12 +310,12 @@ def _can_terminate_fleet_instances_on_idle_duration(fleet_model: FleetModel) ->
310310
# There may be race conditions since we don't take the fleet lock.
311311
# That's ok: in the worst case we go below `nodes.min`, but
312312
# the fleet consolidation logic will provision new nodes.
313-
fleet = fleet_model_to_fleet(fleet_model)
314-
if fleet.spec.configuration.nodes is None or fleet.spec.autocreated:
313+
fleet_spec = get_fleet_spec(fleet_model)
314+
if fleet_spec.configuration.nodes is None or fleet_spec.autocreated:
315315
return True
316316
active_instances = [i for i in fleet_model.instances if i.status.is_active()]
317317
active_instances_num = len(active_instances)
318-
return active_instances_num > fleet.spec.configuration.nodes.min
318+
return active_instances_num > fleet_spec.configuration.nodes.min
319319

320320

321321
async def _add_remote(session: AsyncSession, instance: InstanceModel) -> None:
@@ -1223,8 +1223,8 @@ def _get_instance_offer_for_instance(
12231223
) -> InstanceOfferWithAvailability:
12241224
if instance.fleet is None:
12251225
return instance_offer
1226-
fleet = fleet_model_to_fleet(instance.fleet)
1227-
if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
1226+
fleet_spec = get_fleet_spec(instance.fleet)
1227+
if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
12281228
master_job_provisioning_data = get_instance_provisioning_data(master_instance)
12291229
return get_instance_offer_with_restricted_az(
12301230
instance_offer=instance_offer,

src/dstack/_internal/server/services/runs/plan.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
from sqlalchemy import and_, exists, not_, or_, select
55
from sqlalchemy.ext.asyncio import AsyncSession
6-
from sqlalchemy.orm import contains_eager, joinedload, noload
6+
from sqlalchemy.orm import contains_eager, noload
77

88
from dstack._internal.core.backends.base.backend import Backend
9-
from dstack._internal.core.models.fleets import Fleet, FleetSpec, InstanceGroupPlacement
9+
from dstack._internal.core.models.fleets import FleetSpec, InstanceGroupPlacement
1010
from dstack._internal.core.models.instances import (
1111
InstanceAvailability,
1212
InstanceOfferWithAvailability,
@@ -31,7 +31,6 @@
3131
)
3232
from dstack._internal.server.services.fleets import (
3333
check_can_create_new_cloud_instance_in_fleet,
34-
fleet_model_to_fleet,
3534
get_fleet_master_instance_provisioning_data,
3635
get_fleet_requirements,
3736
get_fleet_spec,
@@ -251,12 +250,7 @@ async def select_run_candidate_fleet_models_with_filters(
251250
.join(FleetModel.instances)
252251
.where(*fleet_filters)
253252
.where(*instance_filters)
254-
.options(
255-
contains_eager(FleetModel.instances),
256-
joinedload(FleetModel.project)
257-
.load_only(ProjectModel.name)
258-
.joinedload(ProjectModel.backends),
259-
)
253+
.options(contains_eager(FleetModel.instances))
260254
.execution_options(populate_existing=True)
261255
)
262256
if lock_instances:
@@ -341,18 +335,18 @@ async def find_optimal_fleet_with_offers(
341335
]
342336
] = []
343337
for candidate_fleet_model in fleet_models:
344-
candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
338+
candidate_fleet_spec = get_fleet_spec(candidate_fleet_model)
345339
if (
346340
is_multinode_job(job)
347-
and candidate_fleet.spec.configuration.placement != InstanceGroupPlacement.CLUSTER
341+
and candidate_fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER
348342
):
349343
# Limit multinode runs to cluster fleets to guarantee best connectivity.
350344
continue
351345

352-
if not _run_can_fit_into_fleet(run_spec, candidate_fleet):
346+
if not _run_can_fit_into_fleet(run_spec, candidate_fleet_model, candidate_fleet_spec):
353347
logger.debug(
354348
"Skipping fleet %s from consideration: run cannot fit into fleet",
355-
candidate_fleet.name,
349+
candidate_fleet_model.name,
356350
)
357351
continue
358352

@@ -376,7 +370,7 @@ async def find_optimal_fleet_with_offers(
376370
backend_offers = await _get_backend_offers_in_fleet(
377371
project=project,
378372
fleet_model=candidate_fleet_model,
379-
fleet_spec=candidate_fleet.spec,
373+
fleet_spec=candidate_fleet_spec,
380374
run_spec=run_spec,
381375
job=job,
382376
volumes=volumes,
@@ -509,7 +503,9 @@ def _get_instance_offers_in_fleet(
509503
return instances_with_offers
510504

511505

512-
def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool:
506+
def _run_can_fit_into_fleet(
507+
run_spec: RunSpec, fleet_model: FleetModel, fleet_spec: FleetSpec
508+
) -> bool:
513509
"""
514510
Returns `False` if the run cannot fit into fleet for sure.
515511
This is a helpful heuristic to avoid even considering fleets too small for a run.
@@ -522,19 +518,19 @@ def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool:
522518
# how many jobs such fleets can accommodate.
523519
nodes_required_num = get_nodes_required_num(run_spec)
524520
if (
525-
fleet.spec.configuration.nodes is not None
526-
and fleet.spec.configuration.blocks == 1
527-
and fleet.spec.configuration.nodes.max is not None
521+
fleet_spec.configuration.nodes is not None
522+
and fleet_spec.configuration.blocks == 1
523+
and fleet_spec.configuration.nodes.max is not None
528524
):
529-
busy_instances = [i for i in fleet.instances if i.busy_blocks > 0]
530-
fleet_available_capacity = fleet.spec.configuration.nodes.max - len(busy_instances)
525+
busy_instances = [i for i in fleet_model.instances if i.busy_blocks > 0]
526+
fleet_available_capacity = fleet_spec.configuration.nodes.max - len(busy_instances)
531527
if fleet_available_capacity < nodes_required_num:
532528
return False
533-
elif fleet.spec.configuration.ssh_config is not None:
529+
elif fleet_spec.configuration.ssh_config is not None:
534530
# Currently assume that each idle block can run a job.
535531
# TODO: Take resources / eligible offers into account.
536532
total_idle_blocks = 0
537-
for instance in fleet.instances:
533+
for instance in fleet_model.instances:
538534
total_blocks = instance.total_blocks or 1
539535
total_idle_blocks += total_blocks - instance.busy_blocks
540536
if total_idle_blocks < nodes_required_num:

src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,42 @@ async def test_not_assigns_job_to_foreign_fleet_if_not_imported(
464464
assert not job.instance_assigned
465465
assert job.instance is None
466466

467+
@pytest.mark.asyncio
468+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
469+
async def test_job_fails_if_imported_ssh_fleet_is_empty(self, test_db, session: AsyncSession):
470+
user = await create_user(session, global_role=GlobalRole.USER)
471+
exporter_project = await create_project(session, name="exporter-project", owner=user)
472+
importer_project = await create_project(session, name="importer-project", owner=user)
473+
fleet = await create_fleet(
474+
session=session,
475+
project=exporter_project,
476+
spec=get_fleet_spec(get_ssh_fleet_configuration()),
477+
name="exported-fleet",
478+
)
479+
await create_export(
480+
session=session,
481+
exporter_project=exporter_project,
482+
importer_projects=[importer_project],
483+
exported_fleets=[fleet],
484+
)
485+
repo = await create_repo(session=session, project_id=importer_project.id)
486+
run = await create_run(
487+
session=session,
488+
project=importer_project,
489+
repo=repo,
490+
user=user,
491+
)
492+
job = await create_job(
493+
session=session,
494+
run=run,
495+
instance_assigned=False,
496+
status=JobStatus.SUBMITTED,
497+
)
498+
await process_submitted_jobs()
499+
await session.refresh(job)
500+
assert job.status == JobStatus.TERMINATING
501+
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
502+
467503
@pytest.mark.asyncio
468504
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
469505
async def test_assigns_second_replica_to_same_imported_fleet(

src/tests/_internal/server/routers/test_runs.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,6 +1436,49 @@ async def test_returns_run_plan_with_offer_from_imported_fleet(
14361436
assert response_json["project_name"] == "importer-project"
14371437
assert response_json["job_plans"][0]["offers"][0]["backend"] == "remote"
14381438

1439+
@pytest.mark.asyncio
1440+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
1441+
async def test_returns_no_offers_if_imported_ssh_fleet_is_empty(
1442+
self,
1443+
test_db,
1444+
session: AsyncSession,
1445+
client: AsyncClient,
1446+
) -> None:
1447+
importer_user = await create_user(session, global_role=GlobalRole.USER)
1448+
exporter_project = await create_project(session, name="exporter-project")
1449+
importer_project = await create_project(
1450+
session, name="importer-project", owner=importer_user
1451+
)
1452+
await add_project_member(
1453+
session=session,
1454+
project=importer_project,
1455+
user=importer_user,
1456+
project_role=ProjectRole.USER,
1457+
)
1458+
fleet = await create_fleet(
1459+
session=session,
1460+
project=exporter_project,
1461+
spec=get_fleet_spec(get_ssh_fleet_configuration()),
1462+
)
1463+
await create_export(
1464+
session=session,
1465+
exporter_project=exporter_project,
1466+
importer_projects=[importer_project],
1467+
exported_fleets=[fleet],
1468+
)
1469+
1470+
run_spec = {"configuration": {"type": "dev-environment", "ide": "vscode"}}
1471+
body = {"run_spec": run_spec}
1472+
response = await client.post(
1473+
"/api/project/importer-project/runs/get_plan",
1474+
headers=get_auth_headers(importer_user.token),
1475+
json=body,
1476+
)
1477+
assert response.status_code == 200, response.json()
1478+
response_json = response.json()
1479+
assert response_json["project_name"] == "importer-project"
1480+
assert len(response_json["job_plans"][0]["offers"]) == 0
1481+
14391482
@pytest.mark.parametrize(
14401483
("client_version", "expected_availability"),
14411484
[

0 commit comments

Comments
 (0)