Skip to content

Commit a4ae127

Browse files
authored
Implement fleet state-spec consolidation to maintain nodes.min (#3047)
* Prototype fleet state consolidation for nodes.min * Allow specifying target in fleet nodes * Handle nodes backward compatibility * Remove TODO * Fix field docs * Test consolidation * Document fleet nodes * Fix next instance_num
1 parent a178834 commit a4ae127

15 files changed

Lines changed: 462 additions & 132 deletions

File tree

docs/docs/concepts/fleets.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,27 @@ Once the status of instances changes to `idle`, they can be used by dev environm
5959

6060
### Configuration options
6161

62+
#### Nodes { #nodes }
63+
64+
The `nodes` property controls how many instances to provision and maintain in the fleet:
65+
66+
<div editor-title=".dstack.yml">
67+
68+
```yaml
69+
type: fleet
70+
71+
name: my-fleet
72+
73+
nodes:
74+
min: 1 # Always maintain at least 1 instance
75+
target: 2 # Provision 2 instances initially
76+
max: 3 # Do not allow more than 3 instances
77+
```
78+
79+
</div>
80+
81+
`dstack` ensures the fleet always has at least `nodes.min` instances, creating new instances in the background if necessary. If you don't need to keep instances in the fleet forever, you can set `nodes.min` to `0`. By default, `dstack apply` also provisions `nodes.min` instances. The `nodes.target` property allows provisioning more instances initially than need to be maintained.
82+
6283
#### Placement { #cloud-placement }
6384

6485
To ensure instances are interconnected (e.g., for

src/dstack/_internal/core/compatibility/fleets.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ def get_fleet_spec_excludes(fleet_spec: FleetSpec) -> Optional[IncludeExcludeDic
5959
profile_excludes.add("stop_criteria")
6060
if profile.schedule is None:
6161
profile_excludes.add("schedule")
62+
if (
63+
fleet_spec.configuration.nodes
64+
and fleet_spec.configuration.nodes.min == fleet_spec.configuration.nodes.target
65+
):
66+
configuration_excludes["nodes"] = {"target"}
6267
if configuration_excludes:
6368
spec_excludes["configuration"] = configuration_excludes
6469
if profile_excludes:

src/dstack/_internal/core/models/fleets.py

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
TerminationPolicy,
2020
parse_idle_duration,
2121
)
22-
from dstack._internal.core.models.resources import Range, ResourcesSpec
22+
from dstack._internal.core.models.resources import ResourcesSpec
2323
from dstack._internal.utils.common import list_enum_values_for_annotation
2424
from dstack._internal.utils.json_schema import add_extra_schema_types
2525
from dstack._internal.utils.tags import tags_validator
@@ -141,6 +141,58 @@ def validate_network(cls, value):
141141
return value
142142

143143

144+
class FleetNodesSpec(CoreModel):
    # Describes how many instances a fleet provisions and maintains.
    # When omitted, `min` falls back to 0 and `target` falls back to `min`
    # (see `set_min_and_target_defaults`). After field validation,
    # `_post_validate_ranges` enforces `min <= target` and, when `max` is set,
    # `min <= max` and `target <= max`.
    min: Annotated[
        int, Field(description=("The minimum number of instances to maintain in the fleet"))
    ]
    target: Annotated[
        int,
        Field(
            description=(
                "The number of instances to provision on fleet apply."
                " `min` <= `target` <= `max`. Defaults to `min`"
            )
        ),
    ]
    max: Annotated[
        Optional[int],
        Field(
            description=(
                "The maximum number of instances allowed in the fleet. Unlimited if not specified"
            )
        ),
    ] = None

    @root_validator(pre=True)
    def set_min_and_target_defaults(cls, values):
        """
        Fills in `min` and `target` when they are missing or `None`.

        Runs before field validation, so the values seen here are still raw input.
        `min` defaults to `0`; `target` defaults to whatever `min` ends up being.
        """
        min_ = values.get("min")
        target = values.get("target")
        if min_ is None:
            values["min"] = 0
        if target is None:
            # Read `min` back from `values` so a defaulted `min` is picked up too.
            values["target"] = values["min"]
        return values

    @validator("min")
    def validate_min(cls, v: int) -> int:
        """Rejects a negative `min`."""
        if v < 0:
            raise ValueError("min cannot be negative")
        return v

    @root_validator(skip_on_failure=True)
    def _post_validate_ranges(cls, values):
        """
        Checks the ordering of `min`, `target`, and `max`.

        Skipped if any field validator already failed, so `min` and `target`
        are guaranteed to be present in `values`.
        """
        min_ = values["min"]
        target = values["target"]
        max_ = values.get("max")
        if target < min_:
            raise ValueError("target must not be less than min")
        if max_ is not None and max_ < min_:
            raise ValueError("max must not be less than min")
        if max_ is not None and max_ < target:
            raise ValueError("max must not be less than target")
        return values
194+
195+
144196
class InstanceGroupParams(CoreModel):
145197
env: Annotated[
146198
Env,
@@ -151,7 +203,9 @@ class InstanceGroupParams(CoreModel):
151203
Field(description="The parameters for adding instances via SSH"),
152204
] = None
153205

154-
nodes: Annotated[Optional[Range[int]], Field(description="The number of instances")] = None
206+
nodes: Annotated[
207+
Optional[FleetNodesSpec], Field(description="The number of instances in cloud fleet")
208+
] = None
155209
placement: Annotated[
156210
Optional[InstanceGroupPlacement],
157211
Field(description="The placement of instances: `any` or `cluster`"),
@@ -248,6 +302,16 @@ def schema_extra(schema: Dict[str, Any], model: Type):
248302
extra_types=[{"type": "string"}],
249303
)
250304

305+
@validator("nodes", pre=True)
def parse_nodes(cls, v: Optional[Union[dict, str, int]]) -> Optional[dict]:
    """
    Converts shorthand `nodes` values into a mapping before field validation.

    Supported shorthands:
      * a range string containing `..` (e.g. `1..3`, `1..`, `..3`); spaces are
        ignored and an empty bound becomes `None`;
      * any other string or an int, which is used as both `min` and `max`.

    Values of any other type are returned unchanged.

    Raises:
        ValueError: if a range string does not split into exactly two bounds.
    """
    if isinstance(v, str) and ".." in v:
        bounds = v.replace(" ", "").split("..")
        if len(bounds) != 2:
            # Fail with an explicit message instead of an opaque unpacking error.
            raise ValueError(f"Invalid nodes range: {v!r}. Expected format: `min..max`")
        lower, upper = bounds
        return dict(min=lower or None, max=upper or None)
    if isinstance(v, (str, int)):
        return dict(min=v, max=v)
    return v
314+
251315
_validate_idle_duration = validator("idle_duration", pre=True, allow_reuse=True)(
252316
parse_idle_duration
253317
)

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 107 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
from datetime import timedelta
22
from typing import List
3+
from uuid import UUID
34

45
from sqlalchemy import select, update
56
from sqlalchemy.ext.asyncio import AsyncSession
67
from sqlalchemy.orm import joinedload, load_only
78

8-
from dstack._internal.core.models.fleets import FleetStatus
9+
from dstack._internal.core.models.fleets import FleetSpec, FleetStatus
10+
from dstack._internal.core.models.instances import InstanceStatus
911
from dstack._internal.server.db import get_db, get_session_ctx
1012
from dstack._internal.server.models import (
1113
FleetModel,
@@ -15,7 +17,9 @@
1517
RunModel,
1618
)
1719
from dstack._internal.server.services.fleets import (
20+
create_fleet_instance_model,
1821
get_fleet_spec,
22+
get_next_instance_num,
1923
is_fleet_empty,
2024
is_fleet_in_use,
2125
)
@@ -65,31 +69,111 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel])
6569
res = await session.execute(
6670
select(FleetModel)
6771
.where(FleetModel.id.in_(fleet_ids))
68-
.options(joinedload(FleetModel.instances).load_only(InstanceModel.deleted))
6972
.options(
70-
joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id)
73+
joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id),
74+
joinedload(FleetModel.project),
7175
)
7276
.options(joinedload(FleetModel.runs).load_only(RunModel.status))
7377
.execution_options(populate_existing=True)
7478
)
7579
fleet_models = list(res.unique().scalars().all())
7680

81+
# TODO: Drop fleets auto-deletion after dropping fleets auto-creation.
7782
deleted_fleets_ids = []
78-
now = get_current_datetime()
7983
for fleet_model in fleet_models:
84+
_consolidate_fleet_state_with_spec(session, fleet_model)
8085
deleted = _autodelete_fleet(fleet_model)
8186
if deleted:
8287
deleted_fleets_ids.append(fleet_model.id)
83-
fleet_model.last_processed_at = now
88+
fleet_model.last_processed_at = get_current_datetime()
89+
await _update_deleted_fleets_placement_groups(session, deleted_fleets_ids)
90+
await session.commit()
8491

85-
await session.execute(
86-
update(PlacementGroupModel)
87-
.where(
88-
PlacementGroupModel.fleet_id.in_(deleted_fleets_ids),
92+
93+
def _consolidate_fleet_state_with_spec(session: AsyncSession, fleet_model: FleetModel):
    """
    Brings the fleet's instances in line with its spec, subject to a retry delay.

    Nothing happens for terminating fleets, fleets without `nodes`, autocreated
    fleets, or fleets that are not yet ready for another consolidation attempt.
    Otherwise the attempt counter and `last_consolidated_at` are updated.
    """
    if fleet_model.status == FleetStatus.TERMINATING:
        return
    spec = get_fleet_spec(fleet_model)
    # Only explicitly created cloud fleets are consolidated.
    is_explicit_cloud_fleet = spec.configuration.nodes is not None and not spec.autocreated
    if not is_explicit_cloud_fleet:
        return
    if not _is_fleet_ready_for_consolidation(fleet_model):
        return
    instances_added = _maintain_fleet_nodes_min(session, fleet_model, spec)
    # When nothing was added, the fleet is either already consolidated or a
    # consolidation is still in progress. The counter is reset in both cases
    # for simplicity: the latter does not strictly need it, but it only occurs
    # when consolidation takes longer than the delay, so it is infrequent.
    # TODO: Reset consolidation_attempt on fleet in-place update.
    fleet_model.consolidation_attempt = (
        fleet_model.consolidation_attempt + 1 if instances_added else 0
    )
    fleet_model.last_consolidated_at = get_current_datetime()
113+
114+
115+
def _is_fleet_ready_for_consolidation(fleet_model: FleetModel) -> bool:
    """
    Tells whether the retry delay for the fleet's current attempt has elapsed.

    The elapsed time is measured from `last_consolidated_at`, falling back to
    `last_processed_at` when the former is not set.
    """
    reference_time = fleet_model.last_consolidated_at or fleet_model.last_processed_at
    elapsed = get_current_datetime() - reference_time
    return elapsed >= _get_consolidation_retry_delay(fleet_model.consolidation_attempt)
120+
121+
122+
# We use exponentially increasing consolidation retry delays so that
123+
# consolidation does not happen too often. In particular, this prevents
124+
# retrying instance provisioning constantly in case of no offers.
125+
# TODO: Adjust delays.
126+
# Position N holds the delay to wait after N consecutive consolidation attempts.
_CONSOLIDATION_RETRY_DELAYS = [
    timedelta(seconds=30),
    timedelta(minutes=1),
    timedelta(minutes=2),
    timedelta(minutes=5),
    timedelta(minutes=10),
]


def _get_consolidation_retry_delay(consolidation_attempt: int) -> timedelta:
    """
    Returns the delay to wait before the next consolidation attempt.

    Args:
        consolidation_attempt: the number of consecutive attempts made so far.

    Returns:
        The delay at that position in `_CONSOLIDATION_RETRY_DELAYS`. Attempts past
        the end of the table get the last (longest) delay, and negative attempts
        get the first (shortest) one.
    """
    last_index = len(_CONSOLIDATION_RETRY_DELAYS) - 1
    # Clamp so that a negative value never wraps around to the end of the list.
    index = min(max(consolidation_attempt, 0), last_index)
    return _CONSOLIDATION_RETRY_DELAYS[index]
139+
140+
141+
def _maintain_fleet_nodes_min(
    session: AsyncSession,
    fleet_model: FleetModel,
    fleet_spec: FleetSpec,
) -> bool:
    """
    Tops the fleet up to `nodes.min` instances.

    Terminated instances that are not yet marked deleted are marked deleted first,
    then one new instance model is created for every missing node.
    Returns `True` if new instances were added and `False` otherwise.
    """
    nodes_spec = fleet_spec.configuration.nodes
    assert nodes_spec is not None
    for existing in fleet_model.instances:
        if existing.deleted or existing.status != InstanceStatus.TERMINATED:
            continue
        # Terminated instances are about to be replaced with new ones, so mark
        # them deleted. Doing this without the instance lock is safe since
        # no other task modifies already terminated instances.
        existing.deleted = True
        existing.deleted_at = get_current_datetime()
    live_instances = [m for m in fleet_model.instances if not m.deleted]
    shortfall = nodes_spec.min - len(live_instances)
    if shortfall <= 0:
        return False
    for _ in range(shortfall):
        # Recomputed every round so that numbers of just-created instances count as taken.
        taken_nums = {m.instance_num for m in live_instances}
        new_instance = create_fleet_instance_model(
            session=session,
            project=fleet_model.project,
            # TODO: Store fleet.user and pass it instead of the project owner.
            username=fleet_model.project.owner.name,
            spec=fleet_spec,
            instance_num=get_next_instance_num(taken_nums),
        )
        live_instances.append(new_instance)
        fleet_model.instances.append(new_instance)
    logger.info("Added %s instances to fleet %s", shortfall, fleet_model.name)
    return True
93177

94178

95179
def _autodelete_fleet(fleet_model: FleetModel) -> bool:
@@ -100,7 +184,7 @@ def _autodelete_fleet(fleet_model: FleetModel) -> bool:
100184
if (
101185
fleet_model.status != FleetStatus.TERMINATING
102186
and fleet_spec.configuration.nodes is not None
103-
and (fleet_spec.configuration.nodes.min is None or fleet_spec.configuration.nodes.min == 0)
187+
and fleet_spec.configuration.nodes.min == 0
104188
):
105189
# Empty fleets that allow 0 nodes should not be auto-deleted
106190
return False
@@ -110,3 +194,13 @@ def _autodelete_fleet(fleet_model: FleetModel) -> bool:
110194
fleet_model.deleted = True
111195
logger.info("Fleet %s deleted", fleet_model.name)
112196
return True
197+
198+
199+
async def _update_deleted_fleets_placement_groups(session: AsyncSession, fleets_ids: list[UUID]):
    """
    Sets `fleet_deleted=True` on every placement group whose `fleet_id` is in `fleets_ids`.

    The statement is only executed here; this function does not commit.
    """
    mark_fleet_deleted = (
        update(PlacementGroupModel)
        .where(PlacementGroupModel.fleet_id.in_(fleets_ids))
        .values(fleet_deleted=True)
    )
    await session.execute(mark_fleet_deleted)

0 commit comments

Comments
 (0)