Skip to content

Commit 5876f6f

Browse files
authored
Support elastic fleets (#2967)
* Do not auto-delete empty fleets that allow 0 nodes
* Refactor offers filtering
* Respect fleet nodes.max
* Fix replicas typing
* Support provisioning in empty fleets
* Rebase migrations
* Prioritize fleet choice by capacity and offer price
* Unassign run from fleet for retrying
* Comment on optimal fleet choice
* Refactor fleet/instance selects to fix for update with outer join
* If no fleets have available offers, create a new fleet
* Revert respect nodes.max
* Set nodes for autocreated fleet
* Forbid new fleets creation with fleets specified
* Use Range for replicas
* Ensure models Config is inherited
* Rebase migrations
* Use fleets with no instances if fleets specified
* Fix tests
* Fix tests
1 parent e44751e commit 5876f6f

File tree

23 files changed

+643
-250
lines changed

23 files changed

+643
-250
lines changed

src/dstack/_internal/core/backends/aws/compute.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -383,10 +383,7 @@ def is_suitable_placement_group(
383383
) -> bool:
384384
if not _offer_supports_placement_group(instance_offer, placement_group):
385385
return False
386-
return (
387-
placement_group.configuration.backend == BackendType.AWS
388-
and placement_group.configuration.region == instance_offer.region
389-
)
386+
return placement_group.configuration.region == instance_offer.region
390387

391388
def create_gateway(
392389
self,

src/dstack/_internal/core/backends/base/compute.py

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -263,10 +263,6 @@ def is_suitable_placement_group(
263263
Checks if the instance offer can be provisioned in the placement group.
264264
265265
Should return immediately, without performing API calls.
266-
267-
Can be called with an offer originating from a different backend, because some backends
268-
(BackendType.DSTACK) produce offers on behalf of other backends. Should return `False`
269-
in that case.
270266
"""
271267
pass
272268

src/dstack/_internal/core/backends/gcp/compute.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -448,10 +448,7 @@ def is_suitable_placement_group(
448448
placement_group: PlacementGroup,
449449
instance_offer: InstanceOffer,
450450
) -> bool:
451-
return (
452-
placement_group.configuration.backend == BackendType.GCP
453-
and placement_group.configuration.region == instance_offer.region
454-
)
451+
return placement_group.configuration.region == instance_offer.region
455452

456453
def create_gateway(
457454
self,

src/dstack/_internal/core/backends/nebius/compute.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -298,10 +298,7 @@ def is_suitable_placement_group(
298298
placement_group: PlacementGroup,
299299
instance_offer: InstanceOffer,
300300
) -> bool:
301-
if not (
302-
placement_group.configuration.backend == BackendType.NEBIUS
303-
and placement_group.configuration.region == instance_offer.region
304-
):
301+
if placement_group.configuration.region != instance_offer.region:
305302
return False
306303
assert placement_group.provisioning_data is not None
307304
backend_data = NebiusPlacementGroupBackendData.load(

src/dstack/_internal/core/models/common.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -102,7 +102,7 @@ class RegistryAuth(CoreModel):
102102
password (str): The password or access token
103103
"""
104104

105-
class Config:
105+
class Config(CoreModel.Config):
106106
frozen = True
107107

108108
username: Annotated[str, Field(description="The username")]

src/dstack/_internal/core/models/configurations.py

Lines changed: 16 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
from dstack._internal.core.models.unix import UnixUser
2121
from dstack._internal.core.models.volumes import MountPoint, VolumeConfiguration, parse_mount_point
2222
from dstack._internal.utils.common import has_duplicates
23+
from dstack._internal.utils.json_schema import add_extra_schema_types
2324
from dstack._internal.utils.json_utils import (
2425
pydantic_orjson_dumps_with_indent,
2526
)
@@ -561,7 +562,7 @@ class ServiceConfigurationParams(CoreModel):
561562
)
562563
auth: Annotated[bool, Field(description="Enable the authorization")] = True
563564
replicas: Annotated[
564-
Union[conint(ge=1), constr(regex=r"^[0-9]+..[1-9][0-9]*$"), Range[int]],
565+
Range[int],
565566
Field(
566567
description="The number of replicas. Can be a number (e.g. `2`) or a range (`0..4` or `1..8`). "
567568
"If it's a range, the `scaling` property is required"
@@ -592,20 +593,13 @@ def convert_model(cls, v: Optional[Union[AnyModel, str]]) -> Optional[AnyModel]:
592593
return v
593594

594595
@validator("replicas")
595-
def convert_replicas(cls, v: Any) -> Range[int]:
596-
if isinstance(v, str) and ".." in v:
597-
min, max = v.replace(" ", "").split("..")
598-
v = Range(min=min or 0, max=max or None)
599-
elif isinstance(v, (int, float)):
600-
v = Range(min=v, max=v)
596+
def convert_replicas(cls, v: Range[int]) -> Range[int]:
601597
if v.max is None:
602598
raise ValueError("The maximum number of replicas is required")
599+
if v.min is None:
600+
v.min = 0
603601
if v.min < 0:
604602
raise ValueError("The minimum number of replicas must be greater than or equal to 0")
605-
if v.max < v.min:
606-
raise ValueError(
607-
"The maximum number of replicas must be greater than or equal to the minimum number of replicas"
608-
)
609603
return v
610604

611605
@validator("gateway")
@@ -622,9 +616,9 @@ def validate_gateway(
622616
def validate_scaling(cls, values):
623617
scaling = values.get("scaling")
624618
replicas = values.get("replicas")
625-
if replicas.min != replicas.max and not scaling:
619+
if replicas and replicas.min != replicas.max and not scaling:
626620
raise ValueError("When you set `replicas` to a range, ensure to specify `scaling`.")
627-
if replicas.min == replicas.max and scaling:
621+
if replicas and replicas.min == replicas.max and scaling:
628622
raise ValueError("To use `scaling`, `replicas` must be set to a range.")
629623
return values
630624

@@ -655,6 +649,14 @@ class ServiceConfiguration(
655649
):
656650
type: Literal["service"] = "service"
657651

652+
class Config(CoreModel.Config):
653+
@staticmethod
654+
def schema_extra(schema: Dict[str, Any]):
655+
add_extra_schema_types(
656+
schema["properties"]["replicas"],
657+
extra_types=[{"type": "integer"}, {"type": "string"}],
658+
)
659+
658660

659661
AnyRunConfiguration = Union[DevEnvironmentConfiguration, TaskConfiguration, ServiceConfiguration]
660662

@@ -715,7 +717,7 @@ class DstackConfiguration(CoreModel):
715717
Field(discriminator="type"),
716718
]
717719

718-
class Config:
720+
class Config(CoreModel.Config):
719721
json_loads = orjson.loads
720722
json_dumps = pydantic_orjson_dumps_with_indent
721723

src/dstack/_internal/core/models/fleets.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -234,7 +234,7 @@ class InstanceGroupParams(CoreModel):
234234
termination_policy: Annotated[Optional[TerminationPolicy], Field(exclude=True)] = None
235235
termination_idle_time: Annotated[Optional[Union[str, int]], Field(exclude=True)] = None
236236

237-
class Config:
237+
class Config(CoreModel.Config):
238238
@staticmethod
239239
def schema_extra(schema: Dict[str, Any], model: Type):
240240
del schema["properties"]["termination_policy"]
@@ -279,7 +279,7 @@ class FleetSpec(CoreModel):
279279
# TODO: make merged_profile a computed field after migrating to pydanticV2
280280
merged_profile: Annotated[Profile, Field(exclude=True)] = None
281281

282-
class Config:
282+
class Config(CoreModel.Config):
283283
@staticmethod
284284
def schema_extra(schema: Dict[str, Any], model: Type) -> None:
285285
prop = schema.get("properties", {})

src/dstack/_internal/core/models/instances.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -122,7 +122,7 @@ class SSHConnectionParams(CoreModel):
122122
username: str
123123
port: int
124124

125-
class Config:
125+
class Config(CoreModel.Config):
126126
frozen = True
127127

128128

src/dstack/_internal/core/models/profiles.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -339,7 +339,7 @@ class ProfileParams(CoreModel):
339339
termination_policy: Annotated[Optional[TerminationPolicy], Field(exclude=True)] = None
340340
termination_idle_time: Annotated[Optional[Union[str, int]], Field(exclude=True)] = None
341341

342-
class Config:
342+
class Config(CoreModel.Config):
343343
@staticmethod
344344
def schema_extra(schema: Dict[str, Any]) -> None:
345345
del schema["properties"]["pool_name"]
@@ -379,7 +379,7 @@ class Profile(ProfileProps, ProfileParams):
379379
class ProfilesConfig(CoreModel):
380380
profiles: List[Profile]
381381

382-
class Config:
382+
class Config(CoreModel.Config):
383383
json_loads = orjson.loads
384384
json_dumps = pydantic_orjson_dumps_with_indent
385385

src/dstack/_internal/core/models/repos/remote.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ class RemoteRepoCreds(CoreModel):
3232
# TODO: remove in 0.20. Left for compatibility with CLI <=0.18.44
3333
protocol: Annotated[Optional[str], Field(exclude=True)] = None
3434

35-
class Config:
35+
class Config(CoreModel.Config):
3636
@staticmethod
3737
def schema_extra(schema: Dict[str, Any]) -> None:
3838
del schema["properties"]["protocol"]
@@ -47,7 +47,7 @@ class RemoteRepoInfo(BaseRepoInfo):
4747
repo_port: Annotated[Optional[int], Field(exclude=True)] = None
4848
repo_user_name: Annotated[Optional[str], Field(exclude=True)] = None
4949

50-
class Config:
50+
class Config(BaseRepoInfo.Config):
5151
@staticmethod
5252
def schema_extra(schema: Dict[str, Any]) -> None:
5353
del schema["properties"]["repo_host_name"]

0 commit comments

Comments
 (0)