Skip to content

Commit b5e3610

Browse files
committed
Do not terminate fleet instances on idle_duration at nodes.min
1 parent 8951afb commit b5e3610

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,12 @@ async def _process_next_instance():
196196

197197

198198
async def _process_instance(session: AsyncSession, instance: InstanceModel):
199+
# Refetch to load related attributes.
200+
# Load related attributes only for statuses that always need them.
199201
if instance.status in (
200202
InstanceStatus.PENDING,
201203
InstanceStatus.TERMINATING,
202204
):
203-
# Refetch to load related attributes.
204-
# Load related attributes only for statuses that always need them.
205205
res = await session.execute(
206206
select(InstanceModel)
207207
.where(InstanceModel.id == instance.id)
@@ -211,6 +211,16 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel):
211211
.execution_options(populate_existing=True)
212212
)
213213
instance = res.unique().scalar_one()
214+
elif instance.status == InstanceStatus.IDLE:
215+
res = await session.execute(
216+
select(InstanceModel)
217+
.where(InstanceModel.id == instance.id)
218+
.options(joinedload(InstanceModel.project))
219+
.options(joinedload(InstanceModel.jobs).load_only(JobModel.id, JobModel.status))
220+
.options(joinedload(InstanceModel.fleet).joinedload(FleetModel.instances))
221+
.execution_options(populate_existing=True)
222+
)
223+
instance = res.unique().scalar_one()
214224

215225
if instance.status == InstanceStatus.PENDING:
216226
if instance.remote_connection_info is not None:
@@ -242,6 +252,14 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
242252
and not instance.jobs
243253
):
244254
return False
255+
if instance.fleet is not None and not _can_terminate_fleet_instances_on_idle_duration(
256+
instance.fleet
257+
):
258+
logger.debug(
259+
"Skipping instance %s termination on idle duration. Fleet is already at `nodes.min`.",
260+
instance.name,
261+
)
262+
return False
245263
idle_duration = _get_instance_idle_duration(instance)
246264
idle_seconds = instance.termination_idle_time
247265
delta = datetime.timedelta(seconds=idle_seconds)
@@ -261,6 +279,20 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
261279
return False
262280

263281

282+
def _can_terminate_fleet_instances_on_idle_duration(fleet_model: FleetModel) -> bool:
283+
# Do not terminate instances on idle duration if fleet is already at `nodes.min`.
284+
# This is an optimization to avoid terminate-create loop.
285+
# There may be race conditions since we don't take the fleet lock.
286+
# That's ok: in the worst case we go below `nodes.min`, but
287+
# the fleet consolidation logic will provision new nodes.
288+
fleet = fleet_model_to_fleet(fleet_model)
289+
if fleet.spec.configuration.nodes is None:
290+
return True
291+
active_instances = [i for i in fleet_model.instances if i.status.is_active()]
292+
active_instances_num = len(active_instances)
293+
return active_instances_num > fleet.spec.configuration.nodes.min
294+
295+
264296
async def _add_remote(instance: InstanceModel) -> None:
265297
logger.info("Adding ssh instance %s...", instance.name)
266298
if instance.status == InstanceStatus.PENDING:

0 commit comments

Comments
 (0)