Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 99 additions & 1 deletion src/dstack/_internal/cli/services/configurators/fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ServerClientError,
URLNotFoundError,
)
from dstack._internal.core.models.common import ApplyAction
from dstack._internal.core.models.configurations import ApplyConfigurationType
from dstack._internal.core.models.fleets import (
Fleet,
Expand Down Expand Up @@ -72,7 +73,104 @@ def apply_configuration(
spec=spec,
)
_print_plan_header(plan)
if plan.action is not None:
self._apply_plan(plan, command_args)
else:
# Old servers don't support spec update
self._apply_plan_on_old_server(plan, command_args)

def _apply_plan(self, plan: FleetPlan, command_args: argparse.Namespace):
delete_fleet_name: Optional[str] = None
action_message = ""
confirm_message = ""
if plan.current_resource is None:
if plan.spec.configuration.name is not None:
action_message += (
f"Fleet [code]{plan.spec.configuration.name}[/] does not exist yet."
)
confirm_message += "Create the fleet?"
else:
action_message += f"Found fleet [code]{plan.spec.configuration.name}[/]."
if plan.action == ApplyAction.CREATE:
delete_fleet_name = plan.current_resource.name
action_message += (
" Configuration changes detected. Cannot update the fleet in-place"
)
confirm_message += "Re-create the fleet?"
elif plan.current_resource.spec == plan.effective_spec:
if command_args.yes and not command_args.force:
# --force is required only with --yes,
# otherwise we may ask for force apply interactively.
console.print(
"No configuration changes detected. Use --force to apply anyway."
)
return
delete_fleet_name = plan.current_resource.name
action_message += " No configuration changes detected."
confirm_message += "Re-create the fleet?"
else:
action_message += " Configuration changes detected."
confirm_message += "Update the fleet in-place?"

console.print(action_message)
if not command_args.yes and not confirm_ask(confirm_message):
console.print("\nExiting...")
return

if delete_fleet_name is not None:
with console.status("Deleting existing fleet..."):
self.api.client.fleets.delete(
project_name=self.api.project, names=[delete_fleet_name]
)
# Fleet deletion is async. Wait for fleet to be deleted.
while True:
try:
self.api.client.fleets.get(
project_name=self.api.project, name=delete_fleet_name
)
except ResourceNotExistsError:
break
else:
time.sleep(1)

try:
with console.status("Applying plan..."):
fleet = self.api.client.fleets.apply_plan(project_name=self.api.project, plan=plan)
except ServerClientError as e:
raise CLIError(e.msg)
if command_args.detach:
console.print("Fleet configuration submitted. Exiting...")
return
try:
with MultiItemStatus(
f"Provisioning [code]{fleet.name}[/]...", console=console
) as live:
while not _finished_provisioning(fleet):
table = get_fleets_table([fleet])
live.update(table)
time.sleep(LIVE_TABLE_PROVISION_INTERVAL_SECS)
fleet = self.api.client.fleets.get(self.api.project, fleet.name)
except KeyboardInterrupt:
if confirm_ask("Delete the fleet before exiting?"):
with console.status("Deleting fleet..."):
self.api.client.fleets.delete(
project_name=self.api.project, names=[fleet.name]
)
else:
console.print("Exiting... Fleet provisioning will continue in the background.")
return
console.print(
get_fleets_table(
[fleet],
verbose=_failed_provisioning(fleet),
format_date=local_time,
)
)
if _failed_provisioning(fleet):
console.print("\n[error]Some instances failed. Check the table above for errors.[/]")
exit(1)

def _apply_plan_on_old_server(self, plan: FleetPlan, command_args: argparse.Namespace):
action_message = ""
confirm_message = ""
if plan.current_resource is None:
Expand All @@ -86,7 +184,7 @@ def apply_configuration(
diff = diff_models(
old=plan.current_resource.spec.configuration,
new=plan.spec.configuration,
ignore={
reset={
"ssh_config": {
"ssh_key": True,
"proxy_jump": {"ssh_key"},
Expand Down
3 changes: 2 additions & 1 deletion src/dstack/_internal/core/models/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing_extensions import Annotated, Literal

from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.common import CoreModel
from dstack._internal.core.models.common import ApplyAction, CoreModel
from dstack._internal.core.models.envs import Env
from dstack._internal.core.models.instances import Instance, InstanceOfferWithAvailability, SSHKey
from dstack._internal.core.models.profiles import (
Expand Down Expand Up @@ -324,6 +324,7 @@ class FleetPlan(CoreModel):
offers: List[InstanceOfferWithAvailability]
total_offers: int
max_offer_price: Optional[float] = None
action: Optional[ApplyAction] = None # default value for backward compatibility

def get_effective_spec(self) -> FleetSpec:
if self.effective_spec is not None:
Expand Down
40 changes: 30 additions & 10 deletions src/dstack/_internal/core/services/diff.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Optional, TypedDict
from typing import Any, Optional, TypedDict, TypeVar

from pydantic import BaseModel

Expand All @@ -15,20 +15,19 @@ class ModelFieldDiff(TypedDict):

# TODO: calculate nested diffs
def diff_models(
old: BaseModel, new: BaseModel, ignore: Optional[IncludeExcludeType] = None
old: BaseModel, new: BaseModel, reset: Optional[IncludeExcludeType] = None
) -> ModelDiff:
"""
Returns a diff of model instances fields.

NOTE: `ignore` is implemented as `BaseModel.parse_obj(BaseModel.dict(exclude=ignore))`,
that is, the "ignored" fields are actually not ignored but reset to the default values
before comparison, meaning that 1) any field in `ignore` must have a default value,
2) the default value must be equal to itself (e.g. `math.nan` != `math.nan`).
The fields specified in the `reset` option are reset to their default values, effectively
excluding them from comparison (assuming that the default value is equal to itself, e.g,
`None == None`, `"task" == "task"`, but `math.nan != math.nan`).

Args:
old: The "old" model instance.
new: The "new" model instance.
ignore: Optional fields to ignore.
reset: Fields to reset to their default values before comparison.

Returns:
A dict of changed fields in the form of
Expand All @@ -37,9 +36,9 @@ def diff_models(
if type(old) is not type(new):
raise TypeError("Both instances must be of the same Pydantic model class.")

if ignore is not None:
old = type(old).parse_obj(old.dict(exclude=ignore))
new = type(new).parse_obj(new.dict(exclude=ignore))
if reset is not None:
old = copy_model(old, reset=reset)
new = copy_model(new, reset=reset)

changes: ModelDiff = {}
for field in old.__fields__:
Expand All @@ -49,3 +48,24 @@ def diff_models(
changes[field] = {"old": old_value, "new": new_value}

return changes


M = TypeVar("M", bound=BaseModel)


def copy_model(model: M, reset: Optional[IncludeExcludeType] = None) -> M:
"""
Returns a deep copy of the model instance.

Implemented as `BaseModel.parse_obj(BaseModel.dict())`, thus,
unlike `BaseModel.copy(deep=True)`, runs all validations.

The fields specified in the `reset` option are reset to their default values.

Args:
reset: Fields to reset to their default values.

Returns:
A deep copy of the model instance.
"""
return type(model).parse_obj(model.dict(exclude=reset))
Loading
Loading