Skip to content

Commit 54d2d0a

Browse files
authored
Emit events for instance status changes (#3477)
- Emit an event on every instance status change - To make events more informative, set termination reasons whenever terminating instances - Add `terminated_by_user` termination reason - Remove redundant logging now covered by events - Refactor runtime-only status changes that were not persisted and did not affect logic - For event readability, only include the busy blocks count in job assigned/unassigned events, which is the only place where the count can change
1 parent 29076ba commit 54d2d0a

11 files changed

Lines changed: 214 additions & 220 deletions

File tree

src/dstack/_internal/core/models/instances.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ def finished_statuses(cls) -> List["InstanceStatus"]:
256256

257257

258258
class InstanceTerminationReason(str, Enum):
259+
TERMINATED_BY_USER = "terminated_by_user"
259260
IDLE_TIMEOUT = "idle_timeout"
260261
PROVISIONING_TIMEOUT = "provisioning_timeout"
261262
ERROR = "error"

src/dstack/_internal/server/background/tasks/process_compute_groups.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from dstack._internal.server.services import backends as backends_services
1919
from dstack._internal.server.services.compute_groups import compute_group_model_to_compute_group
20+
from dstack._internal.server.services.instances import switch_instance_status
2021
from dstack._internal.server.services.locking import get_locker
2122
from dstack._internal.server.utils import sentry_utils
2223
from dstack._internal.utils.common import get_current_datetime, run_async
@@ -83,12 +84,14 @@ async def _process_compute_group(session: AsyncSession, compute_group_model: Com
8384
)
8485
compute_group_model = res.unique().scalar_one()
8586
if all(i.status == InstanceStatus.TERMINATING for i in compute_group_model.instances):
86-
await _terminate_compute_group(compute_group_model)
87+
await _terminate_compute_group(session, compute_group_model)
8788
compute_group_model.last_processed_at = get_current_datetime()
8889
await session.commit()
8990

9091

91-
async def _terminate_compute_group(compute_group_model: ComputeGroupModel) -> None:
92+
async def _terminate_compute_group(
93+
session: AsyncSession, compute_group_model: ComputeGroupModel
94+
) -> None:
9295
if (
9396
compute_group_model.last_termination_retry_at is not None
9497
and _next_termination_retry_at(compute_group_model) > get_current_datetime()
@@ -147,7 +150,7 @@ async def _terminate_compute_group(compute_group_model: ComputeGroupModel) -> No
147150
instance_model.deleted = True
148151
instance_model.deleted_at = get_current_datetime()
149152
instance_model.finished_at = get_current_datetime()
150-
instance_model.status = InstanceStatus.TERMINATED
153+
switch_instance_status(session, instance_model, InstanceStatus.TERMINATED)
151154
logger.info(
152155
"Terminated compute group %s",
153156
compute_group.name,

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
is_fleet_in_use,
2727
switch_fleet_status,
2828
)
29-
from dstack._internal.server.services.instances import format_instance_status_for_event
29+
from dstack._internal.server.services.instances import switch_instance_status
3030
from dstack._internal.server.services.locking import get_locker
3131
from dstack._internal.server.utils import sentry_utils
3232
from dstack._internal.utils.common import get_current_datetime
@@ -219,15 +219,10 @@ def _maintain_fleet_nodes_in_min_max_range(
219219
if nodes_redundant == 0:
220220
break
221221
if instance.status in [InstanceStatus.IDLE]:
222-
instance.status = InstanceStatus.TERMINATING
223222
instance.termination_reason = InstanceTerminationReason.MAX_INSTANCES_LIMIT
224223
instance.termination_reason_message = "Fleet has too many instances"
224+
switch_instance_status(session, instance, InstanceStatus.TERMINATING)
225225
nodes_redundant -= 1
226-
logger.info(
227-
"Terminating instance %s: %s",
228-
instance.name,
229-
instance.termination_reason,
230-
)
231226
return True
232227
nodes_missing = fleet_spec.configuration.nodes.min - active_instances_num
233228
for i in range(nodes_missing):
@@ -243,7 +238,7 @@ def _maintain_fleet_nodes_in_min_max_range(
243238
session,
244239
(
245240
"Instance created to meet target fleet node count."
246-
f" Status: {format_instance_status_for_event(instance_model)}"
241+
f" Status: {instance_model.status.upper()}"
247242
),
248243
actor=events.SystemActor(),
249244
targets=[events.Target.from_model(instance_model)],

0 commit comments

Comments
 (0)