From ebd0f061114a4520d015176a643000ec57f6af70 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 3 Sep 2025 14:47:59 +0500 Subject: [PATCH 1/3] Add run.fleet to API responses --- src/dstack/_internal/core/models/runs.py | 6 ++++++ .../background/tasks/process_running_jobs.py | 2 ++ .../server/background/tasks/process_runs.py | 2 ++ src/dstack/_internal/server/services/runs.py | 16 ++++++++++++++++ src/dstack/_internal/server/testing/common.py | 2 ++ src/tests/_internal/server/routers/test_runs.py | 14 ++++++++++++++ 6 files changed, 42 insertions(+) diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index c8085b55bd..0f0a87c13a 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -519,10 +519,16 @@ def is_finished(self): return self in self.finished_statuses() +class RunFleet(CoreModel): + id: UUID4 + name: str + + class Run(CoreModel): id: UUID4 project_name: str user: str + fleet: Optional[RunFleet] = None submitted_at: datetime last_processed_at: datetime status: RunStatus diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 19cb089b11..985cbfee8d 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -41,6 +41,7 @@ from dstack._internal.server.background.tasks.common import get_provisioning_timeout from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( + FleetModel, InstanceModel, JobModel, ProbeModel, @@ -151,6 +152,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): .options(joinedload(RunModel.project)) .options(joinedload(RunModel.user)) .options(joinedload(RunModel.repo)) + .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) .options(joinedload(RunModel.jobs)) ) run_model = res.unique().scalar_one() diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index e9d13a5009..03017202b4 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -21,6 +21,7 @@ ) from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( + FleetModel, InstanceModel, JobModel, ProjectModel, @@ -145,6 +146,7 @@ async def _process_run(session: AsyncSession, run_model: RunModel): .execution_options(populate_existing=True) .options(joinedload(RunModel.project).load_only(ProjectModel.id, ProjectModel.name)) .options(joinedload(RunModel.user).load_only(UserModel.name)) + .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) .options( selectinload(RunModel.jobs) .joinedload(JobModel.instance) diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 1e4a2d2ad6..a0f72ed3c3 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -43,6 +43,7 @@ JobTerminationReason, ProbeSpec, Run, + RunFleet, RunPlan, RunSpec, RunStatus, @@ -58,6 +59,7 @@ from dstack._internal.server import settings from dstack._internal.server.db import get_db from dstack._internal.server.models import ( + FleetModel, JobModel, ProbeModel, ProjectModel, @@ -227,6 +229,7 @@ async def list_projects_run_models( select(RunModel) .where(*filters) .options(joinedload(RunModel.user).load_only(UserModel.name)) + .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) .options(selectinload(RunModel.jobs).joinedload(JobModel.probes)) .order_by(*order_by) .limit(limit) @@ -269,6 +272,7 @@ async def get_run_by_name( RunModel.deleted == False, ) .options(joinedload(RunModel.user)) + .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) .options(selectinload(RunModel.jobs).joinedload(JobModel.probes)) ) run_model = res.scalar() @@ -289,6 +293,7 @@ async def get_run_by_id( RunModel.id == run_id, ) .options(joinedload(RunModel.user)) + .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) .options(selectinload(RunModel.jobs).joinedload(JobModel.probes)) ) run_model = res.scalar() @@ -709,10 +714,12 @@ def run_model_to_run( status_message = _get_run_status_message(run_model) error = _get_run_error(run_model) + fleet = _get_run_fleet(run_model) run = Run( id=run_model.id, project_name=run_model.project.name, user=run_model.user.name, + fleet=fleet, submitted_at=run_model.submitted_at, last_processed_at=run_model.last_processed_at, status=run_model.status, @@ -821,6 +828,15 @@ def _get_run_error(run_model: RunModel) -> Optional[str]: return run_model.termination_reason.to_error() +def _get_run_fleet(run_model: RunModel) -> Optional[RunFleet]: + if run_model.fleet is None: + return None + return RunFleet( + id=run_model.fleet.id, + name=run_model.fleet.name, + ) + + async def _get_pool_offers( session: AsyncSession, project: ProjectModel, diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 4a40bb9eb9..bf3fc98b68 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -285,6 +285,7 @@ async def create_run( project: ProjectModel, repo: RepoModel, user: UserModel, + fleet: Optional[FleetModel] = None, run_name: str = "test-run", status: RunStatus = RunStatus.SUBMITTED, termination_reason: Optional[RunTerminationReason] = None, @@ -310,6 +311,7 @@ async def create_run( project_id=project.id, repo_id=repo.id, user_id=user.id, + fleet_id=fleet.id if fleet else None, submitted_at=submitted_at, run_name=run_name, status=status, diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 3361f49e47..efd571ef10 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -49,6 +49,7 @@ from dstack._internal.server.services.runs import run_model_to_run from dstack._internal.server.testing.common import ( create_backend, + create_fleet, create_gateway, create_gateway_compute, create_instance, @@ -337,6 +338,7 @@ def get_dev_env_run_dict( "id": run_id, "project_name": project_name, "user": username, + "fleet": None, "submitted_at": submitted_at, "last_processed_at": last_processed_at, "status": "submitted", @@ -558,6 +560,7 @@ async def test_returns_40x_if_not_authenticated( async def test_lists_runs(self, test_db, session: AsyncSession, client: AsyncClient): user = await create_user(session=session, global_role=GlobalRole.USER) project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project) await add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) @@ -571,6 +574,7 @@ async def test_lists_runs(self, test_db, session: AsyncSession, client: AsyncCli project=project, repo=repo, user=user, + fleet=fleet, submitted_at=run1_submitted_at, ) run1_spec = RunSpec.parse_raw(run1.run_spec) @@ -587,6 +591,7 @@ async def test_lists_runs(self, test_db, session: AsyncSession, client: AsyncCli project=project, repo=repo, user=user, + fleet=fleet, submitted_at=run2_submitted_at, ) run2_spec = RunSpec.parse_raw(run2.run_spec) @@ -601,6 +606,10 @@ async def test_lists_runs(self, test_db, session: AsyncSession, client: AsyncCli "id": str(run1.id), "project_name": project.name, "user": user.name, + "fleet": { + "id": str(fleet.id), + "name": fleet.name, + }, "submitted_at": run1_submitted_at.isoformat(), "last_processed_at": run1_submitted_at.isoformat(), "status": "submitted", @@ -660,6 +669,10 @@ async def test_lists_runs(self, test_db, session: AsyncSession, client: AsyncCli "id": str(run2.id), "project_name": project.name, "user": user.name, + "fleet": { + "id": str(fleet.id), + "name": fleet.name, + }, "submitted_at": run2_submitted_at.isoformat(), "last_processed_at": run2_submitted_at.isoformat(), "status": "submitted", @@ -784,6 +797,7 @@ async def test_limits_job_submissions( "id": str(run.id), "project_name": project.name, "user": user.name, + "fleet": None, "submitted_at": run_submitted_at.isoformat(), "last_processed_at": run_submitted_at.isoformat(), "status": "submitted", From 0d781bcdb3c3cb3e1dc55e8cd9d0ff010654af36 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 3 Sep 2025 15:38:05 +0500 Subject: [PATCH 2/3] Show warning when using autocreated fleets --- .../cli/services/configurators/run.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/dstack/_internal/cli/services/configurators/run.py b/src/dstack/_internal/cli/services/configurators/run.py index 3c7c03be9f..5cb15cbc09 100644 --- a/src/dstack/_internal/cli/services/configurators/run.py +++ b/src/dstack/_internal/cli/services/configurators/run.py @@ -57,6 +57,7 @@ from dstack._internal.utils.path import is_absolute_posix_path from dstack.api._public.repos import get_ssh_keypair from dstack.api._public.runs import Run +from dstack.api.server import APIClient from dstack.api.utils import load_profile _KNOWN_AMD_GPUS = {gpu.name.lower() for gpu in gpuhunt.KNOWN_AMD_GPUS} @@ -222,6 +223,9 @@ def apply_configuration( format_date=local_time, ) ) + + _warn_fleet_autocreated(self.api.client, run) + console.print( f"\n[code]{run.name}[/] provisioning completed [secondary]({run.status.value})[/]" ) @@ -865,3 +869,17 @@ def render_run_spec_diff(old_spec: RunSpec, new_spec: RunSpec) -> Optional[str]: item = NestedListItem(spec_field.replace("_", " ").capitalize()) nested_list.children.append(item) return nested_list.render() + + +def _warn_fleet_autocreated(api: APIClient, run: Run): + if run._run.fleet is None: + return + fleet = api.fleets.get(project_name=run._project, name=run._run.fleet.name) + if not fleet.spec.autocreated: + return + warn( + f"\nThe run is using automatically created fleet [code]{fleet.name}[/code].\n" + "Future [code]dstack[/code] versions will stop creating fleets automatically.\n" + "Create the fleet explicitly to remove this warning and prepare for upcoming changes.\n" + "Learn more about fleets: https://dstack.ai/docs/concepts/fleets/" + ) From 9e19d51c0e310ebe975d34e744e6dfe87b89012d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 4 Sep 2025 10:43:45 +0500 Subject: [PATCH 3/3] Update warning text --- src/dstack/_internal/cli/services/configurators/run.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/cli/services/configurators/run.py b/src/dstack/_internal/cli/services/configurators/run.py index 5cb15cbc09..757c75fcf0 100644 --- a/src/dstack/_internal/cli/services/configurators/run.py +++ b/src/dstack/_internal/cli/services/configurators/run.py @@ -878,8 +878,7 @@ def _warn_fleet_autocreated(api: APIClient, run: Run): if not fleet.spec.autocreated: return warn( - f"\nThe run is using automatically created fleet [code]{fleet.name}[/code].\n" - "Future [code]dstack[/code] versions will stop creating fleets automatically.\n" - "Create the fleet explicitly to remove this warning and prepare for upcoming changes.\n" - "Learn more about fleets: https://dstack.ai/docs/concepts/fleets/" + f"\nNo existing fleet matched, so the run created a new fleet [code]{fleet.name}[/code].\n" + "Future dstack versions won't create fleets automatically.\n" + "Create a fleet explicitly: https://dstack.ai/docs/concepts/fleets/" )