-
-
Notifications
You must be signed in to change notification settings - Fork 223
Expand file tree
/
Copy pathfleets.py
More file actions
200 lines (180 loc) · 6.08 KB
/
fleets.py
File metadata and controls
200 lines (180 loc) · 6.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
from typing import List, Optional, Tuple
from fastapi import APIRouter, Depends
from packaging.version import Version
from sqlalchemy.ext.asyncio import AsyncSession
import dstack._internal.server.services.fleets as fleets_services
from dstack._internal.core.errors import ResourceNotExistsError
from dstack._internal.core.models.fleets import Fleet, FleetPlan
from dstack._internal.server.compatibility.common import patch_offers_list
from dstack._internal.server.db import get_session
from dstack._internal.server.models import ProjectModel, UserModel
from dstack._internal.server.schemas.fleets import (
ApplyFleetPlanRequest,
CreateFleetRequest,
DeleteFleetInstancesRequest,
DeleteFleetsRequest,
GetFleetPlanRequest,
GetFleetRequest,
ListFleetsRequest,
)
from dstack._internal.server.security.permissions import Authenticated, ProjectMember
from dstack._internal.server.utils.routers import (
CustomORJSONResponse,
get_base_api_additional_responses,
get_client_version,
)
root_router = APIRouter(
prefix="/api/fleets",
tags=["fleets"],
responses=get_base_api_additional_responses(),
)
project_router = APIRouter(
prefix="/api/project/{project_name}/fleets",
tags=["fleets"],
responses=get_base_api_additional_responses(),
)
@root_router.post("/list", response_model=List[Fleet])
async def list_fleets(
body: ListFleetsRequest,
session: AsyncSession = Depends(get_session),
user: UserModel = Depends(Authenticated()),
):
"""
Returns all fleets and instances within them visible to user sorted by descending `created_at`.
`project_name` and `only_active` can be specified as filters.
The results are paginated. To get the next page, pass `created_at` and `id` of
the last fleet from the previous page as `prev_created_at` and `prev_id`.
"""
return CustomORJSONResponse(
await fleets_services.list_fleets(
session=session,
user=user,
project_name=body.project_name,
only_active=body.only_active,
prev_created_at=body.prev_created_at,
prev_id=body.prev_id,
limit=body.limit,
ascending=body.ascending,
)
)
@project_router.post("/list", response_model=List[Fleet])
async def list_project_fleets(
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
):
"""
Returns all fleets in the project.
"""
_, project = user_project
return CustomORJSONResponse(
await fleets_services.list_project_fleets(session=session, project=project)
)
@project_router.post("/get", response_model=Fleet)
async def get_fleet(
body: GetFleetRequest,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
):
"""
Returns a fleet given `name` or `id`.
If given `name`, does not return deleted fleets.
If given `id`, returns deleted fleets.
"""
_, project = user_project
fleet = await fleets_services.get_fleet(
session=session, project=project, name=body.name, fleet_id=body.id
)
if fleet is None:
raise ResourceNotExistsError()
return CustomORJSONResponse(fleet)
@project_router.post("/get_plan", response_model=FleetPlan)
async def get_plan(
body: GetFleetPlanRequest,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
client_version: Optional[Version] = Depends(get_client_version),
):
"""
Returns a fleet plan for the given fleet configuration.
"""
user, project = user_project
plan = await fleets_services.get_plan(
session=session,
project=project,
user=user,
spec=body.spec,
)
patch_offers_list(plan.offers, client_version)
return CustomORJSONResponse(plan)
@project_router.post("/apply", response_model=Fleet)
async def apply_plan(
body: ApplyFleetPlanRequest,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
):
"""
Creates a new fleet or updates an existing fleet.
Errors if the expected current resource from the plan does not match the current resource.
Use `force: true` to apply even if the current resource does not match.
"""
user, project = user_project
return CustomORJSONResponse(
await fleets_services.apply_plan(
session=session,
user=user,
project=project,
plan=body.plan,
force=body.force,
)
)
@project_router.post("/create", response_model=Fleet, deprecated=True)
async def create_fleet(
body: CreateFleetRequest,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
):
"""
Creates a fleet given a fleet configuration.
"""
user, project = user_project
return CustomORJSONResponse(
await fleets_services.create_fleet(
session=session,
project=project,
user=user,
spec=body.spec,
)
)
@project_router.post("/delete")
async def delete_fleets(
body: DeleteFleetsRequest,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
):
"""
Deletes one or more fleets.
"""
user, project = user_project
await fleets_services.delete_fleets(
session=session,
project=project,
user=user,
names=body.names,
)
@project_router.post("/delete_instances")
async def delete_fleet_instances(
body: DeleteFleetInstancesRequest,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
):
"""
Deletes one or more instances within the fleet.
"""
user, project = user_project
await fleets_services.delete_fleets(
session=session,
project=project,
user=user,
names=[body.name],
instance_nums=body.instance_nums,
)