Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 28 additions & 118 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ def to_status(self) -> "RunStatus":
}
return mapping[self]

def to_error(self) -> Optional[str]:
if self == RunTerminationReason.RETRY_LIMIT_EXCEEDED:
return "retry limit exceeded"
elif self == RunTerminationReason.SERVER_ERROR:
return "server error"
else:
return None


class JobTerminationReason(str, Enum):
# Set by the server
Expand Down Expand Up @@ -162,6 +170,24 @@ def to_retry_event(self) -> Optional[RetryEvent]:
default = RetryEvent.ERROR if self.to_status() == JobStatus.FAILED else None
return mapping.get(self, default)

def to_error(self) -> Optional[str]:
# Should return None for values that are already
# handled and shown in status_message.
error_mapping = {
JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable",
JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded",
JobTerminationReason.VOLUME_ERROR: "volume error",
JobTerminationReason.GATEWAY_ERROR: "gateway error",
JobTerminationReason.SCALED_DOWN: "scaled down",
JobTerminationReason.INACTIVITY_DURATION_EXCEEDED: "inactivity duration exceeded",
JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY: "utilization policy",
JobTerminationReason.PORTS_BINDING_FAILED: "ports binding failed",
JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error",
JobTerminationReason.EXECUTOR_ERROR: "executor error",
JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded",
}
return error_mapping.get(self)


class Requirements(CoreModel):
# TODO: Make requirements' fields required
Expand Down Expand Up @@ -305,13 +331,12 @@ class JobSubmission(CoreModel):
finished_at: Optional[datetime]
inactivity_secs: Optional[int]
status: JobStatus
status_message: str = "" # default for backward compatibility
termination_reason: Optional[JobTerminationReason]
termination_reason_message: Optional[str]
exit_status: Optional[int]
job_provisioning_data: Optional[JobProvisioningData]
job_runtime_data: Optional[JobRuntimeData]
# TODO: make status_message and error a computed field after migrating to pydanticV2
status_message: Optional[str] = None
error: Optional[str] = None

@property
Expand All @@ -325,71 +350,11 @@ def duration(self) -> timedelta:
end_time = self.finished_at
return end_time - self.submitted_at

def dict(self, *args, **kwargs) -> Dict:
status_message = self._get_status_message()
error = self._get_error()
# super() does not work with pydantic-duality
res = CoreModel.dict(self, *args, **kwargs)
res["status_message"] = status_message
res["error"] = error
return res

def _get_status_message(self) -> Optional[str]:
if self.status == JobStatus.DONE:
return "exited (0)"
elif self.status == JobStatus.FAILED:
if self.termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR:
return f"exited ({self.exit_status})"
elif (
self.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
):
return "no offers"
elif self.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY:
return "interrupted"
else:
return "error"
elif self.status == JobStatus.TERMINATED:
if self.termination_reason == JobTerminationReason.TERMINATED_BY_USER:
return "stopped"
elif self.termination_reason == JobTerminationReason.ABORTED_BY_USER:
return "aborted"
return self.status.value

def _get_error(self) -> Optional[str]:
return JobSubmission._termination_reason_to_error(
termination_reason=self.termination_reason
)

@staticmethod
def _termination_reason_to_error(
termination_reason: Optional[JobTerminationReason],
) -> Optional[str]:
error_mapping = {
JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable",
JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded",
JobTerminationReason.VOLUME_ERROR: "volume error",
JobTerminationReason.GATEWAY_ERROR: "gateway error",
JobTerminationReason.SCALED_DOWN: "scaled down",
JobTerminationReason.INACTIVITY_DURATION_EXCEEDED: "inactivity duration exceeded",
JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY: "utilization policy",
JobTerminationReason.PORTS_BINDING_FAILED: "ports binding failed",
JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error",
JobTerminationReason.EXECUTOR_ERROR: "executor error",
JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded",
}
return error_mapping.get(termination_reason)


class Job(CoreModel):
job_spec: JobSpec
job_submissions: List[JobSubmission]

def get_last_termination_reason(self) -> Optional[JobTerminationReason]:
for submission in reversed(self.job_submissions):
if submission.termination_reason is not None:
return submission.termination_reason
return None


class RunSpec(CoreModel):
# TODO: run_name, working_dir are redundant here since they already passed in configuration
Expand Down Expand Up @@ -519,72 +484,17 @@ class Run(CoreModel):
submitted_at: datetime
last_processed_at: datetime
status: RunStatus
status_message: Optional[str] = None
status_message: str = "" # default for backward compatibility
termination_reason: Optional[RunTerminationReason] = None
run_spec: RunSpec
jobs: List[Job]
latest_job_submission: Optional[JobSubmission] = None
cost: float = 0
service: Optional[ServiceSpec] = None
deployment_num: int = 0 # default for compatibility with pre-0.19.14 servers
# TODO: make error a computed field after migrating to pydanticV2
error: Optional[str] = None
deleted: Optional[bool] = None

def dict(self, *args, **kwargs) -> Dict:
status_message = self._get_status_message()
error = self._get_error()
# super() does not work with pydantic-duality
res = CoreModel.dict(self, *args, **kwargs)
res["status_message"] = status_message
res["error"] = error
return res

def _get_error(self) -> Optional[str]:
return Run._termination_reason_to_error(termination_reason=self.termination_reason)

@staticmethod
def _termination_reason_to_error(
termination_reason: Optional[RunTerminationReason],
) -> Optional[str]:
if termination_reason == RunTerminationReason.RETRY_LIMIT_EXCEEDED:
return "retry limit exceeded"
elif termination_reason == RunTerminationReason.SERVER_ERROR:
return "server error"
else:
return None

def _get_status_message(self) -> Optional[str]:
if len(self.jobs) == 0:
return self.status.value

last_job = self.jobs[0]
# FIXME: status_message should not require all job submissions for status calculation
# since it's very expensive and is not required for anything else.
# May return a different status if not all job submissions requested.
# TODO: Calculate status_message by looking at job models directly instead job submissions.
last_job_termination_reason = last_job.get_last_termination_reason()

if len(self.jobs) == 1:
# FIXME: Clarify why show "pulling" only in case of one job
if (
last_job.job_submissions
and last_job.job_submissions[-1].status == JobStatus.PULLING
):
return "pulling"

retry_on_events = last_job.job_spec.retry.on_events if last_job.job_spec.retry else []
# Currently, `retrying` is shown only for `no-capacity` events
if (
self.status in [RunStatus.SUBMITTED, RunStatus.PENDING]
and last_job_termination_reason
== JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
and RetryEvent.NO_CAPACITY in retry_on_events
):
return "retrying"

return self.status.value

def is_deployment_in_progress(self) -> bool:
return any(
not j.job_submissions[-1].status.is_finished()
Expand Down
32 changes: 32 additions & 0 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ def job_model_to_job_submission(job_model: JobModel) -> JobSubmission:
finished_at = None
if job_model.status.is_finished():
finished_at = last_processed_at
status_message = _get_job_status_message(job_model)
error = _get_job_error(job_model)
return JobSubmission(
id=job_model.id,
submission_num=job_model.submission_num,
Expand All @@ -143,11 +145,13 @@ def job_model_to_job_submission(job_model: JobModel) -> JobSubmission:
finished_at=finished_at,
inactivity_secs=job_model.inactivity_secs,
status=job_model.status,
status_message=status_message,
termination_reason=job_model.termination_reason,
termination_reason_message=job_model.termination_reason_message,
exit_status=job_model.exit_status,
job_provisioning_data=job_provisioning_data,
job_runtime_data=get_job_runtime_data(job_model),
error=error,
)


Expand Down Expand Up @@ -693,3 +697,31 @@ def _get_job_mount_point_attached_volume(
continue
return volume
raise ServerClientError("Failed to find an eligible volume for the mount point")


def _get_job_status_message(job_model: JobModel) -> str:
if job_model.status == JobStatus.DONE:
return "exited (0)"
elif job_model.status == JobStatus.FAILED:
if job_model.termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR:
return f"exited ({job_model.exit_status})"
elif (
job_model.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
):
return "no offers"
elif job_model.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY:
return "interrupted"
else:
return "error"
elif job_model.status == JobStatus.TERMINATED:
if job_model.termination_reason == JobTerminationReason.TERMINATED_BY_USER:
return "stopped"
elif job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER:
return "aborted"
return job_model.status.value


def _get_job_error(job_model: JobModel) -> Optional[str]:
if job_model.termination_reason is None:
return None
return job_model.termination_reason.to_error()
51 changes: 51 additions & 0 deletions src/dstack/_internal/server/services/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from dstack._internal.core.models.profiles import (
CreationPolicy,
RetryEvent,
)
from dstack._internal.core.models.repos.virtual import DEFAULT_VIRTUAL_REPO_ID, VirtualRunRepoData
from dstack._internal.core.models.runs import (
Expand Down Expand Up @@ -686,19 +687,23 @@ def run_model_to_run(
if run_model.service_spec is not None:
service_spec = ServiceSpec.__response__.parse_raw(run_model.service_spec)

status_message = _get_run_status_message(run_model)
error = _get_run_error(run_model)
run = Run(
id=run_model.id,
project_name=run_model.project.name,
user=run_model.user.name,
submitted_at=run_model.submitted_at.replace(tzinfo=timezone.utc),
last_processed_at=run_model.last_processed_at.replace(tzinfo=timezone.utc),
status=run_model.status,
status_message=status_message,
termination_reason=run_model.termination_reason,
run_spec=run_spec,
jobs=jobs,
latest_job_submission=latest_job_submission,
service=service_spec,
deployment_num=run_model.deployment_num,
error=error,
deleted=run_model.deleted,
)
run.cost = _get_run_cost(run)
Expand Down Expand Up @@ -746,6 +751,52 @@ def _get_run_jobs_with_submissions(
return jobs


def _get_run_status_message(run_model: RunModel) -> str:
if len(run_model.jobs) == 0:
return run_model.status.value

sorted_job_models = sorted(
run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num)
)
job_models_grouped_by_job = list(
list(jm)
for _, jm in itertools.groupby(sorted_job_models, key=lambda j: (j.replica_num, j.job_num))
)

if all(job_models[-1].status == JobStatus.PULLING for job_models in job_models_grouped_by_job):
# Show `pulling`` if last job submission of all jobs is pulling
return "pulling"

if run_model.status in [RunStatus.SUBMITTED, RunStatus.PENDING]:
# Show `retrying` if any job caused the run to retry
for job_models in job_models_grouped_by_job:
last_job_spec = JobSpec.__response__.parse_raw(job_models[-1].job_spec_data)
retry_on_events = last_job_spec.retry.on_events if last_job_spec.retry else []
last_job_termination_reason = _get_last_job_termination_reason(job_models)
if (
last_job_termination_reason
== JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
and RetryEvent.NO_CAPACITY in retry_on_events
):
# TODO: Show `retrying` for other retry events
return "retrying"

return run_model.status.value


def _get_last_job_termination_reason(job_models: List[JobModel]) -> Optional[JobTerminationReason]:
for job_model in reversed(job_models):
if job_model.termination_reason is not None:
return job_model.termination_reason
return None


def _get_run_error(run_model: RunModel) -> Optional[str]:
if run_model.termination_reason is None:
return None
return run_model.termination_reason.to_error()


async def _get_pool_offers(
session: AsyncSession,
project: ProjectModel,
Expand Down
7 changes: 3 additions & 4 deletions src/tests/_internal/core/models/test_runs.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from dstack._internal.core.models.profiles import RetryEvent
from dstack._internal.core.models.runs import (
JobStatus,
JobSubmission,
JobTerminationReason,
Run,
RunStatus,
RunTerminationReason,
)
Expand Down Expand Up @@ -35,6 +33,7 @@ def test_job_termination_reason_to_retry_event_works_with_all_enum_variants():

# Will fail if JobTerminationReason value is added without updating JobSubmission._get_error
def test_get_error_returns_expected_messages():
# already handled and shown in status_message
no_error_reasons = [
JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY,
Expand All @@ -47,7 +46,7 @@ def test_get_error_returns_expected_messages():
]

for reason in JobTerminationReason:
if JobSubmission._termination_reason_to_error(reason) is None:
if reason.to_error() is None:
# Fail no-error reason is not in the list
assert reason in no_error_reasons

Expand All @@ -62,6 +61,6 @@ def test_run_get_error_returns_none_for_specific_reasons():
]

for reason in RunTerminationReason:
if Run._termination_reason_to_error(reason) is None:
if reason.to_error() is None:
# Fail no-error reason is not in the list
assert reason in no_error_reasons
Loading