Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/dstack/_internal/core/models/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,12 @@ class InstanceGroupParams(CoreModel):
idle_duration: Annotated[
Optional[int],
Field(
description="Time to wait before terminating idle instances. Defaults to `5m` for runs and `3d` for fleets. Use `off` for unlimited duration"
description=(
"Time to wait before terminating idle instances."
" Instances are not terminated if the fleet is already at `nodes.min`."
" Defaults to `5m` for runs and `3d` for fleets."
" Use `off` for unlimited duration"
)
),
] = None

Expand Down
4 changes: 3 additions & 1 deletion src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ class ProfileParams(CoreModel):
Field(
description=(
"Time to wait before terminating idle instances."
" Defaults to `5m` for runs and `3d` for fleets. Use `off` for unlimited duration"
" Instances are not terminated if the fleet is already at `nodes.min`."
" Defaults to `5m` for runs and `3d` for fleets."
" Use `off` for unlimited duration"
)
),
] = None
Expand Down
36 changes: 34 additions & 2 deletions src/dstack/_internal/server/background/tasks/process_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ async def _process_next_instance():


async def _process_instance(session: AsyncSession, instance: InstanceModel):
# Refetch to load related attributes.
# Load related attributes only for statuses that always need them.
if instance.status in (
InstanceStatus.PENDING,
InstanceStatus.TERMINATING,
):
# Refetch to load related attributes.
# Load related attributes only for statuses that always need them.
res = await session.execute(
select(InstanceModel)
.where(InstanceModel.id == instance.id)
Expand All @@ -211,6 +211,16 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel):
.execution_options(populate_existing=True)
)
instance = res.unique().scalar_one()
elif instance.status == InstanceStatus.IDLE:
res = await session.execute(
select(InstanceModel)
.where(InstanceModel.id == instance.id)
.options(joinedload(InstanceModel.project))
.options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status))
.options(joinedload(InstanceModel.fleet).joinedload(FleetModel.instances))
.execution_options(populate_existing=True)
)
instance = res.unique().scalar_one()

if instance.status == InstanceStatus.PENDING:
if instance.remote_connection_info is not None:
Expand Down Expand Up @@ -242,6 +252,14 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
and not instance.jobs
):
return False
if instance.fleet is not None and not _can_terminate_fleet_instances_on_idle_duration(
instance.fleet
):
logger.debug(
"Skipping instance %s termination on idle duration. Fleet is already at `nodes.min`.",
instance.name,
)
return False
idle_duration = _get_instance_idle_duration(instance)
idle_seconds = instance.termination_idle_time
delta = datetime.timedelta(seconds=idle_seconds)
Expand All @@ -261,6 +279,20 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
return False


def _can_terminate_fleet_instances_on_idle_duration(fleet_model: FleetModel) -> bool:
# Do not terminate instances on idle duration if fleet is already at `nodes.min`.
# This is an optimization to avoid terminate-create loop.
# There may be race conditions since we don't take the fleet lock.
# That's ok: in the worst case we go below `nodes.min`, but
# the fleet consolidation logic will provision new nodes.
fleet = fleet_model_to_fleet(fleet_model)
if fleet.spec.configuration.nodes is None or fleet.spec.autocreated:
return True
active_instances = [i for i in fleet_model.instances if i.status.is_active()]
active_instances_num = len(active_instances)
return active_instances_num > fleet.spec.configuration.nodes.min


async def _add_remote(instance: InstanceModel) -> None:
logger.info("Adding ssh instance %s...", instance.name)
if instance.status == InstanceStatus.PENDING:
Expand Down