Skip to content

Commit 8ee924e

Browse files
Replace Instance.termination_reason values with codes (#3187)
Co-authored-by: Jvst Me <git@jvst.me>
1 parent 018b40e commit 8ee924e

9 files changed

Lines changed: 225 additions & 38 deletions

File tree

src/dstack/_internal/core/models/instances.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
from dstack._internal.core.models.health import HealthStatus
1616
from dstack._internal.core.models.volumes import Volume
1717
from dstack._internal.utils.common import pretty_resources
18+
from dstack._internal.utils.logging import get_logger
19+
20+
logger = get_logger(__name__)
1821

1922

2023
class Gpu(CoreModel):
@@ -254,6 +257,70 @@ def finished_statuses(cls) -> List["InstanceStatus"]:
254257
return [cls.TERMINATING, cls.TERMINATED]
255258

256259

260+
class InstanceTerminationReason(str, Enum):
    """Codes identifying why an instance was terminated."""

    IDLE_TIMEOUT = "idle_timeout"
    PROVISIONING_TIMEOUT = "provisioning_timeout"
    ERROR = "error"
    JOB_FINISHED = "job_finished"
    UNREACHABLE = "unreachable"
    NO_OFFERS = "no_offers"
    MASTER_FAILED = "master_failed"
    MAX_INSTANCES_LIMIT = "max_instances_limit"
    NO_BALANCE = "no_balance"  # used in dstack Sky

    @classmethod
    def from_legacy_str(cls, v: str) -> "InstanceTerminationReason":
        """
        Translate a legacy free-form termination reason into an enum member.

        dstack versions prior to 0.20.1 represented instance termination reasons as raw
        strings. Such strings may still be stored in the database.

        Strings that match neither a known legacy message nor the legacy
        parse-error prefix are logged as a warning and reported as ERROR.
        """
        known_reason = _LEGACY_TERMINATION_REASONS.get(v)
        if known_reason is not None:
            return known_reason
        # Parse errors carried the exception text after a fixed prefix,
        # so they can only be recognized by that prefix.
        if v.startswith(_LEGACY_PARSE_ERROR_PREFIX):
            return cls.ERROR
        logger.warning("Unexpected instance termination reason string: %r", v)
        return cls.ERROR


# Prefix of legacy parse-error messages; the rest of the message varied.
_LEGACY_PARSE_ERROR_PREFIX = "Error to parse profile, requirements or instance_configuration:"

# Exact legacy messages and the enum member each one corresponds to.
_LEGACY_TERMINATION_REASONS = {
    "Idle timeout": InstanceTerminationReason.IDLE_TIMEOUT,
    "Instance has not become running in time": InstanceTerminationReason.PROVISIONING_TIMEOUT,
    "Provisioning timeout expired": InstanceTerminationReason.PROVISIONING_TIMEOUT,
    # typo is intentional
    "Proivisioning timeout expired": InstanceTerminationReason.PROVISIONING_TIMEOUT,
    # typo is intentional
    "The proivisioning timeout expired": InstanceTerminationReason.PROVISIONING_TIMEOUT,
    "Unsupported private SSH key type": InstanceTerminationReason.ERROR,
    "Failed to locate internal IP address on the given network": InstanceTerminationReason.ERROR,
    "Specified internal IP not found among instance interfaces": InstanceTerminationReason.ERROR,
    "Cannot split into blocks": InstanceTerminationReason.ERROR,
    "Backend not available": InstanceTerminationReason.ERROR,
    "Error while waiting for instance to become running": InstanceTerminationReason.ERROR,
    "Empty profile, requirements or instance_configuration": InstanceTerminationReason.ERROR,
    "Unable to locate the internal ip-address for the given network": (
        InstanceTerminationReason.ERROR
    ),
    "Private SSH key is encrypted, password required": InstanceTerminationReason.ERROR,
    "Cannot parse private key, key type is not supported": InstanceTerminationReason.ERROR,
    "All offers failed": InstanceTerminationReason.NO_OFFERS,
    "No offers found": InstanceTerminationReason.NO_OFFERS,
    "There were no offers found": InstanceTerminationReason.NO_OFFERS,
    "Retry duration expired": InstanceTerminationReason.NO_OFFERS,
    "The retry's duration expired": InstanceTerminationReason.NO_OFFERS,
    "Master instance failed to start": InstanceTerminationReason.MASTER_FAILED,
    "Instance job finished": InstanceTerminationReason.JOB_FINISHED,
    "Termination deadline": InstanceTerminationReason.UNREACHABLE,
    "Fleet has too many instances": InstanceTerminationReason.MAX_INSTANCES_LIMIT,
    "Low account balance": InstanceTerminationReason.NO_BALANCE,
}
322+
323+
257324
class Instance(CoreModel):
258325
id: UUID
259326
project_name: str
@@ -268,7 +335,10 @@ class Instance(CoreModel):
268335
status: InstanceStatus
269336
unreachable: bool = False
270337
health_status: HealthStatus = HealthStatus.HEALTHY
338+
# termination_reason stores InstanceTerminationReason.
339+
# str allows adding new enum members without breaking compatibility with old clients.
271340
termination_reason: Optional[str] = None
341+
termination_reason_message: Optional[str] = None
272342
created: datetime.datetime
273343
region: Optional[str] = None
274344
availability_zone: Optional[str] = None

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from sqlalchemy.orm import joinedload, load_only, selectinload
99

1010
from dstack._internal.core.models.fleets import FleetSpec, FleetStatus
11-
from dstack._internal.core.models.instances import InstanceStatus
11+
from dstack._internal.core.models.instances import InstanceStatus, InstanceTerminationReason
1212
from dstack._internal.server.db import get_db, get_session_ctx
1313
from dstack._internal.server.models import (
1414
FleetModel,
@@ -213,7 +213,8 @@ def _maintain_fleet_nodes_in_min_max_range(
213213
break
214214
if instance.status in [InstanceStatus.IDLE]:
215215
instance.status = InstanceStatus.TERMINATING
216-
instance.termination_reason = "Fleet has too many instances"
216+
instance.termination_reason = InstanceTerminationReason.MAX_INSTANCES_LIMIT
217+
instance.termination_reason_message = "Fleet has too many instances"
217218
nodes_redundant -= 1
218219
logger.info(
219220
"Terminating instance %s: %s",

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
InstanceOfferWithAvailability,
4848
InstanceRuntime,
4949
InstanceStatus,
50+
InstanceTerminationReason,
5051
RemoteConnectionInfo,
5152
SSHKey,
5253
)
@@ -274,7 +275,7 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
274275
delta = datetime.timedelta(seconds=idle_seconds)
275276
if idle_duration > delta:
276277
instance.status = InstanceStatus.TERMINATING
277-
instance.termination_reason = "Idle timeout"
278+
instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT
278279
logger.info(
279280
"Instance %s idle duration expired: idle time %ss. Terminating",
280281
instance.name,
@@ -310,7 +311,7 @@ async def _add_remote(instance: InstanceModel) -> None:
310311
retry_duration_deadline = instance.created_at + timedelta(seconds=PROVISIONING_TIMEOUT_SECONDS)
311312
if retry_duration_deadline < get_current_datetime():
312313
instance.status = InstanceStatus.TERMINATED
313-
instance.termination_reason = "Provisioning timeout expired"
314+
instance.termination_reason = InstanceTerminationReason.PROVISIONING_TIMEOUT
314315
logger.warning(
315316
"Failed to start instance %s in %d seconds. Terminating...",
316317
instance.name,
@@ -333,7 +334,8 @@ async def _add_remote(instance: InstanceModel) -> None:
333334
ssh_proxy_pkeys = None
334335
except (ValueError, PasswordRequiredException):
335336
instance.status = InstanceStatus.TERMINATED
336-
instance.termination_reason = "Unsupported private SSH key type"
337+
instance.termination_reason = InstanceTerminationReason.ERROR
338+
instance.termination_reason_message = "Unsupported private SSH key type"
337339
logger.warning(
338340
"Failed to add instance %s: unsupported private SSH key type",
339341
instance.name,
@@ -391,7 +393,10 @@ async def _add_remote(instance: InstanceModel) -> None:
391393
)
392394
if instance_network is not None and internal_ip is None:
393395
instance.status = InstanceStatus.TERMINATED
394-
instance.termination_reason = "Failed to locate internal IP address on the given network"
396+
instance.termination_reason = InstanceTerminationReason.ERROR
397+
instance.termination_reason_message = (
398+
"Failed to locate internal IP address on the given network"
399+
)
395400
logger.warning(
396401
"Failed to add instance %s: failed to locate internal IP address on the given network",
397402
instance.name,
@@ -404,7 +409,8 @@ async def _add_remote(instance: InstanceModel) -> None:
404409
if internal_ip is not None:
405410
if not is_ip_among_addresses(ip_address=internal_ip, addresses=host_network_addresses):
406411
instance.status = InstanceStatus.TERMINATED
407-
instance.termination_reason = (
412+
instance.termination_reason = InstanceTerminationReason.ERROR
413+
instance.termination_reason_message = (
408414
"Specified internal IP not found among instance interfaces"
409415
)
410416
logger.warning(
@@ -426,7 +432,8 @@ async def _add_remote(instance: InstanceModel) -> None:
426432
instance.total_blocks = blocks
427433
else:
428434
instance.status = InstanceStatus.TERMINATED
429-
instance.termination_reason = "Cannot split into blocks"
435+
instance.termination_reason = InstanceTerminationReason.ERROR
436+
instance.termination_reason_message = "Cannot split into blocks"
430437
logger.warning(
431438
"Failed to add instance %s: cannot split into blocks",
432439
instance.name,
@@ -545,7 +552,8 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
545552
requirements = get_instance_requirements(instance)
546553
except ValidationError as e:
547554
instance.status = InstanceStatus.TERMINATED
548-
instance.termination_reason = (
555+
instance.termination_reason = InstanceTerminationReason.ERROR
556+
instance.termination_reason_message = (
549557
f"Error to parse profile, requirements or instance_configuration: {e}"
550558
)
551559
logger.warning(
@@ -671,19 +679,28 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
671679
)
672680
return
673681

674-
_mark_terminated(instance, "All offers failed" if offers else "No offers found")
682+
_mark_terminated(
683+
instance,
684+
InstanceTerminationReason.NO_OFFERS,
685+
"All offers failed" if offers else "No offers found",
686+
)
675687
if instance.fleet and is_fleet_master_instance(instance) and is_cloud_cluster(instance.fleet):
676688
# Do not attempt to deploy other instances, as they won't determine the correct cluster
677689
# backend, region, and placement group without a successfully deployed master instance
678690
for sibling_instance in instance.fleet.instances:
679691
if sibling_instance.id == instance.id:
680692
continue
681-
_mark_terminated(sibling_instance, "Master instance failed to start")
693+
_mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED)
682694

683695

684-
def _mark_terminated(instance: InstanceModel, termination_reason: str) -> None:
696+
def _mark_terminated(
697+
instance: InstanceModel,
698+
termination_reason: InstanceTerminationReason,
699+
termination_reason_message: Optional[str] = None,
700+
) -> None:
685701
instance.status = InstanceStatus.TERMINATED
686702
instance.termination_reason = termination_reason
703+
instance.termination_reason_message = termination_reason_message
687704
logger.info(
688705
"Terminated instance %s: %s",
689706
instance.name,
@@ -703,7 +720,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
703720
):
704721
# A busy instance could have no active jobs due to this bug: https://github.com/dstackai/dstack/issues/2068
705722
instance.status = InstanceStatus.TERMINATING
706-
instance.termination_reason = "Instance job finished"
723+
instance.termination_reason = InstanceTerminationReason.JOB_FINISHED
707724
logger.info(
708725
"Detected busy instance %s with finished job. Marked as TERMINATING",
709726
instance.name,
@@ -832,7 +849,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
832849
deadline = instance.termination_deadline
833850
if get_current_datetime() > deadline:
834851
instance.status = InstanceStatus.TERMINATING
835-
instance.termination_reason = "Termination deadline"
852+
instance.termination_reason = InstanceTerminationReason.UNREACHABLE
836853
logger.warning(
837854
"Instance %s shim waiting timeout. Marked as TERMINATING",
838855
instance.name,
@@ -861,7 +878,8 @@ async def _wait_for_instance_provisioning_data(
861878
"Instance %s failed because instance has not become running in time", instance.name
862879
)
863880
instance.status = InstanceStatus.TERMINATING
864-
instance.termination_reason = "Instance has not become running in time"
881+
instance.termination_reason = InstanceTerminationReason.PROVISIONING_TIMEOUT
882+
instance.termination_reason_message = "Backend did not complete provisioning in time"
865883
return
866884

867885
backend = await backends_services.get_project_backend_by_type(
@@ -874,7 +892,8 @@ async def _wait_for_instance_provisioning_data(
874892
instance.name,
875893
)
876894
instance.status = InstanceStatus.TERMINATING
877-
instance.termination_reason = "Backend not available"
895+
instance.termination_reason = InstanceTerminationReason.ERROR
896+
instance.termination_reason_message = "Backend not available"
878897
return
879898
try:
880899
await run_async(
@@ -891,7 +910,8 @@ async def _wait_for_instance_provisioning_data(
891910
repr(e),
892911
)
893912
instance.status = InstanceStatus.TERMINATING
894-
instance.termination_reason = "Error while waiting for instance to become running"
913+
instance.termination_reason = InstanceTerminationReason.ERROR
914+
instance.termination_reason_message = "Error while waiting for instance to become running"
895915
except Exception:
896916
logger.exception(
897917
"Got exception when updating instance %s provisioning data", instance.name
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""Add instances.termination_reason_message
2+
3+
Revision ID: 903c91e24634
4+
Revises: 1aa9638ad963
5+
Create Date: 2025-12-22 12:17:58.573457
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
from alembic import op
11+
12+
# revision identifiers, used by Alembic.
13+
revision = "903c91e24634"
14+
down_revision = "1aa9638ad963"
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade() -> None:
    """Add the nullable ``termination_reason_message`` column to ``instances``."""
    message_column = sa.Column(
        "termination_reason_message",
        sa.String(length=4000),
        nullable=True,
    )
    with op.batch_alter_table("instances", schema=None) as instances_table:
        instances_table.add_column(message_column)
27+
28+
29+
def downgrade() -> None:
    """Drop the ``termination_reason_message`` column from ``instances``."""
    with op.batch_alter_table("instances", schema=None) as instances_table:
        instances_table.drop_column("termination_reason_message")

src/dstack/_internal/server/models.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import enum
22
import uuid
33
from datetime import datetime, timezone
4-
from typing import Callable, List, Optional, Union
4+
from typing import Callable, Generic, List, Optional, TypeVar, Union
55

66
from sqlalchemy import (
77
BigInteger,
@@ -30,7 +30,7 @@
3030
from dstack._internal.core.models.fleets import FleetStatus
3131
from dstack._internal.core.models.gateways import GatewayStatus
3232
from dstack._internal.core.models.health import HealthStatus
33-
from dstack._internal.core.models.instances import InstanceStatus
33+
from dstack._internal.core.models.instances import InstanceStatus, InstanceTerminationReason
3434
from dstack._internal.core.models.profiles import (
3535
DEFAULT_FLEET_TERMINATION_IDLE_TIME,
3636
TerminationPolicy,
@@ -141,26 +141,45 @@ def process_result_value(self, value: Optional[str], dialect) -> Optional[Decryp
141141
return DecryptedString(plaintext=None, decrypted=False, exc=e)
142142

143143

144-
class EnumAsString(TypeDecorator):
144+
E = TypeVar("E", bound=enum.Enum)
145+
146+
147+
class EnumAsString(TypeDecorator, Generic[E]):
    """
    A custom type decorator that keeps enum members in a string column,
    writing each member's name to the DB and resolving it back on read.
    """

    impl = String
    cache_ok = True

    def __init__(
        self,
        enum_class: type[E],
        *args,
        fallback_deserializer: Optional[Callable[[str], E]] = None,
        **kwargs,
    ):
        """
        Args:
            enum_class: The enum class to be stored.
            fallback_deserializer: An optional function used when the string
                from the DB does not match any enum member name. If not
                provided, an exception will be raised in such cases.
        """
        self.enum_class = enum_class
        self.fallback_deserializer = fallback_deserializer
        super().__init__(*args, **kwargs)

    def process_bind_param(self, value: Optional[E], dialect) -> Optional[str]:
        """Return the member's name for storage, or None for a missing value."""
        return None if value is None else value.name

    def process_result_value(self, value: Optional[str], dialect) -> Optional[E]:
        """Resolve a stored string to an enum member, or None for a missing value."""
        if value is None:
            return None
        is_member_name = value in self.enum_class.__members__
        if is_member_name or self.fallback_deserializer is None:
            # Without a fallback, an unknown name raises KeyError here.
            return self.enum_class[value]
        return self.fallback_deserializer(value)
165184

166185

@@ -641,7 +660,17 @@ class InstanceModel(BaseModel):
641660

642661
# instance termination handling
643662
termination_deadline: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
644-
termination_reason: Mapped[Optional[str]] = mapped_column(String(4000))
663+
# dstack versions prior to 0.20.1 represented instance termination reasons as raw strings.
664+
# Such strings may still be stored in the database, so we are using a wide column (4000 chars)
665+
# and a fallback deserializer to convert them to relevant enum members.
666+
termination_reason: Mapped[Optional[InstanceTerminationReason]] = mapped_column(
667+
EnumAsString(
668+
InstanceTerminationReason,
669+
4000,
670+
fallback_deserializer=InstanceTerminationReason.from_legacy_str,
671+
)
672+
)
673+
termination_reason_message: Mapped[Optional[str]] = mapped_column(String(4000))
645674
# Deprecated since 0.19.22, not used
646675
health_status: Mapped[Optional[str]] = mapped_column(String(4000), deferred=True)
647676
health: Mapped[HealthStatus] = mapped_column(

0 commit comments

Comments
 (0)