diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index a90e39d9cd..6984117ffa 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -101,6 +101,14 @@ def to_status(self) -> "RunStatus": } return mapping[self] + def to_error(self) -> Optional[str]: + if self == RunTerminationReason.RETRY_LIMIT_EXCEEDED: + return "retry limit exceeded" + elif self == RunTerminationReason.SERVER_ERROR: + return "server error" + else: + return None + class JobTerminationReason(str, Enum): # Set by the server @@ -162,6 +170,24 @@ def to_retry_event(self) -> Optional[RetryEvent]: default = RetryEvent.ERROR if self.to_status() == JobStatus.FAILED else None return mapping.get(self, default) + def to_error(self) -> Optional[str]: + # Should return None for values that are already + # handled and shown in status_message. + error_mapping = { + JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable", + JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded", + JobTerminationReason.VOLUME_ERROR: "volume error", + JobTerminationReason.GATEWAY_ERROR: "gateway error", + JobTerminationReason.SCALED_DOWN: "scaled down", + JobTerminationReason.INACTIVITY_DURATION_EXCEEDED: "inactivity duration exceeded", + JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY: "utilization policy", + JobTerminationReason.PORTS_BINDING_FAILED: "ports binding failed", + JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error", + JobTerminationReason.EXECUTOR_ERROR: "executor error", + JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded", + } + return error_mapping.get(self) + class Requirements(CoreModel): # TODO: Make requirements' fields required @@ -305,13 +331,12 @@ class JobSubmission(CoreModel): finished_at: Optional[datetime] inactivity_secs: Optional[int] status: JobStatus + status_message: str = "" # default for backward compatibility termination_reason: Optional[JobTerminationReason] termination_reason_message: Optional[str] exit_status: Optional[int] job_provisioning_data: Optional[JobProvisioningData] job_runtime_data: Optional[JobRuntimeData] - # TODO: make status_message and error a computed field after migrating to pydanticV2 - status_message: Optional[str] = None error: Optional[str] = None @property @@ -325,71 +350,11 @@ def duration(self) -> timedelta: end_time = self.finished_at return end_time - self.submitted_at - def dict(self, *args, **kwargs) -> Dict: - status_message = self._get_status_message() - error = self._get_error() - # super() does not work with pydantic-duality - res = CoreModel.dict(self, *args, **kwargs) - res["status_message"] = status_message - res["error"] = error - return res - - def _get_status_message(self) -> Optional[str]: - if self.status == JobStatus.DONE: - return "exited (0)" - elif self.status == JobStatus.FAILED: - if self.termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR: - return f"exited ({self.exit_status})" - elif ( - self.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY - ): - return "no offers" - elif self.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY: - return "interrupted" - else: - return "error" - elif self.status == JobStatus.TERMINATED: - if self.termination_reason == JobTerminationReason.TERMINATED_BY_USER: - return "stopped" - elif self.termination_reason == JobTerminationReason.ABORTED_BY_USER: - return "aborted" - return self.status.value - - def _get_error(self) -> Optional[str]: - return JobSubmission._termination_reason_to_error( - termination_reason=self.termination_reason - ) - - @staticmethod - def _termination_reason_to_error( - termination_reason: Optional[JobTerminationReason], - ) -> Optional[str]: - error_mapping = { - JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable", - JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded", - JobTerminationReason.VOLUME_ERROR: "volume error", - JobTerminationReason.GATEWAY_ERROR: "gateway error", - JobTerminationReason.SCALED_DOWN: "scaled down", - JobTerminationReason.INACTIVITY_DURATION_EXCEEDED: "inactivity duration exceeded", - JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY: "utilization policy", - JobTerminationReason.PORTS_BINDING_FAILED: "ports binding failed", - JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error", - JobTerminationReason.EXECUTOR_ERROR: "executor error", - JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded", - } - return error_mapping.get(termination_reason) - class Job(CoreModel): job_spec: JobSpec job_submissions: List[JobSubmission] - def get_last_termination_reason(self) -> Optional[JobTerminationReason]: - for submission in reversed(self.job_submissions): - if submission.termination_reason is not None: - return submission.termination_reason - return None - class RunSpec(CoreModel): # TODO: run_name, working_dir are redundant here since they already passed in configuration @@ -519,7 +484,7 @@ class Run(CoreModel): submitted_at: datetime last_processed_at: datetime status: RunStatus - status_message: Optional[str] = None + status_message: str = "" # default for backward compatibility termination_reason: Optional[RunTerminationReason] = None run_spec: RunSpec jobs: List[Job] @@ -527,64 +492,9 @@ class Run(CoreModel): cost: float = 0 service: Optional[ServiceSpec] = None deployment_num: int = 0 # default for compatibility with pre-0.19.14 servers - # TODO: make error a computed field after migrating to pydanticV2 error: Optional[str] = None deleted: Optional[bool] = None - def dict(self, *args, **kwargs) -> Dict: - status_message = self._get_status_message() - error = self._get_error() - # super() does not work with pydantic-duality - res = CoreModel.dict(self, *args, **kwargs) - res["status_message"] = status_message - res["error"] = error - return res - - def _get_error(self) -> Optional[str]: - return Run._termination_reason_to_error(termination_reason=self.termination_reason) - - @staticmethod - def _termination_reason_to_error( - termination_reason: Optional[RunTerminationReason], - ) -> Optional[str]: - if termination_reason == RunTerminationReason.RETRY_LIMIT_EXCEEDED: - return "retry limit exceeded" - elif termination_reason == RunTerminationReason.SERVER_ERROR: - return "server error" - else: - return None - - def _get_status_message(self) -> Optional[str]: - if len(self.jobs) == 0: - return self.status.value - - last_job = self.jobs[0] - # FIXME: status_message should not require all job submissions for status calculation - # since it's very expensive and is not required for anything else. - # May return a different status if not all job submissions requested. - # TODO: Calculate status_message by looking at job models directly instead job submissions. - last_job_termination_reason = last_job.get_last_termination_reason() - - if len(self.jobs) == 1: - # FIXME: Clarify why show "pulling" only in case of one job - if ( - last_job.job_submissions - and last_job.job_submissions[-1].status == JobStatus.PULLING - ): - return "pulling" - - retry_on_events = last_job.job_spec.retry.on_events if last_job.job_spec.retry else [] - # Currently, `retrying` is shown only for `no-capacity` events - if ( - self.status in [RunStatus.SUBMITTED, RunStatus.PENDING] - and last_job_termination_reason - == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY - and RetryEvent.NO_CAPACITY in retry_on_events - ): - return "retrying" - - return self.status.value - def is_deployment_in_progress(self) -> bool: return any( not j.job_submissions[-1].status.is_finished() diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 41aa496be7..a96448ef3b 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -134,6 +134,8 @@ def job_model_to_job_submission(job_model: JobModel) -> JobSubmission: finished_at = None if job_model.status.is_finished(): finished_at = last_processed_at + status_message = _get_job_status_message(job_model) + error = _get_job_error(job_model) return JobSubmission( id=job_model.id, submission_num=job_model.submission_num, @@ -143,11 +145,13 @@ def job_model_to_job_submission(job_model: JobModel) -> JobSubmission: finished_at=finished_at, inactivity_secs=job_model.inactivity_secs, status=job_model.status, + status_message=status_message, termination_reason=job_model.termination_reason, termination_reason_message=job_model.termination_reason_message, exit_status=job_model.exit_status, job_provisioning_data=job_provisioning_data, job_runtime_data=get_job_runtime_data(job_model), + error=error, ) @@ -693,3 +697,31 @@ def _get_job_mount_point_attached_volume( continue return volume raise ServerClientError("Failed to find an eligible volume for the mount point") + + +def _get_job_status_message(job_model: JobModel) -> str: + if job_model.status == JobStatus.DONE: + return "exited (0)" + elif job_model.status == JobStatus.FAILED: + if job_model.termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR: + return f"exited ({job_model.exit_status})" + elif ( + job_model.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + ): + return "no offers" + elif job_model.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY: + return "interrupted" + else: + return "error" + elif job_model.status == JobStatus.TERMINATED: + if job_model.termination_reason == JobTerminationReason.TERMINATED_BY_USER: + return "stopped" + elif job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER: + return "aborted" + return job_model.status.value + + +def _get_job_error(job_model: JobModel) -> Optional[str]: + if job_model.termination_reason is None: + return None + return job_model.termination_reason.to_error() diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 5462d6b018..dad59f6007 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -24,6 +24,7 @@ ) from dstack._internal.core.models.profiles import ( CreationPolicy, + RetryEvent, ) from dstack._internal.core.models.repos.virtual import DEFAULT_VIRTUAL_REPO_ID, VirtualRunRepoData from dstack._internal.core.models.runs import ( @@ -686,6 +687,8 @@ def run_model_to_run( if run_model.service_spec is not None: service_spec = ServiceSpec.__response__.parse_raw(run_model.service_spec) + status_message = _get_run_status_message(run_model) + error = _get_run_error(run_model) run = Run( id=run_model.id, project_name=run_model.project.name, @@ -693,12 +696,14 @@ def run_model_to_run( submitted_at=run_model.submitted_at.replace(tzinfo=timezone.utc), last_processed_at=run_model.last_processed_at.replace(tzinfo=timezone.utc), status=run_model.status, + status_message=status_message, termination_reason=run_model.termination_reason, run_spec=run_spec, jobs=jobs, latest_job_submission=latest_job_submission, service=service_spec, deployment_num=run_model.deployment_num, + error=error, deleted=run_model.deleted, ) run.cost = _get_run_cost(run) @@ -746,6 +751,52 @@ def _get_run_jobs_with_submissions( return jobs +def _get_run_status_message(run_model: RunModel) -> str: + if len(run_model.jobs) == 0: + return run_model.status.value + + sorted_job_models = sorted( + run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num) + ) + job_models_grouped_by_job = list( + list(jm) + for _, jm in itertools.groupby(sorted_job_models, key=lambda j: (j.replica_num, j.job_num)) + ) + + if all(job_models[-1].status == JobStatus.PULLING for job_models in job_models_grouped_by_job): + # Show `pulling`` if last job submission of all jobs is pulling + return "pulling" + + if run_model.status in [RunStatus.SUBMITTED, RunStatus.PENDING]: + # Show `retrying` if any job caused the run to retry + for job_models in job_models_grouped_by_job: + last_job_spec = JobSpec.__response__.parse_raw(job_models[-1].job_spec_data) + retry_on_events = last_job_spec.retry.on_events if last_job_spec.retry else [] + last_job_termination_reason = _get_last_job_termination_reason(job_models) + if ( + last_job_termination_reason + == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + and RetryEvent.NO_CAPACITY in retry_on_events + ): + # TODO: Show `retrying` for other retry events + return "retrying" + + return run_model.status.value + + +def _get_last_job_termination_reason(job_models: List[JobModel]) -> Optional[JobTerminationReason]: + for job_model in reversed(job_models): + if job_model.termination_reason is not None: + return job_model.termination_reason + return None + + +def _get_run_error(run_model: RunModel) -> Optional[str]: + if run_model.termination_reason is None: + return None + return run_model.termination_reason.to_error() + + async def _get_pool_offers( session: AsyncSession, project: ProjectModel, diff --git a/src/tests/_internal/core/models/test_runs.py b/src/tests/_internal/core/models/test_runs.py index 6837a4528c..23b27c0185 100644 --- a/src/tests/_internal/core/models/test_runs.py +++ b/src/tests/_internal/core/models/test_runs.py @@ -1,9 +1,7 @@ from dstack._internal.core.models.profiles import RetryEvent from dstack._internal.core.models.runs import ( JobStatus, - JobSubmission, JobTerminationReason, - Run, RunStatus, RunTerminationReason, ) @@ -35,6 +33,7 @@ def test_job_termination_reason_to_retry_event_works_with_all_enum_variants(): # Will fail if JobTerminationReason value is added without updating JobSubmission._get_error def test_get_error_returns_expected_messages(): + # already handled and shown in status_message no_error_reasons = [ JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, @@ -47,7 +46,7 @@ def test_get_error_returns_expected_messages(): ] for reason in JobTerminationReason: - if JobSubmission._termination_reason_to_error(reason) is None: + if reason.to_error() is None: # Fail no-error reason is not in the list assert reason in no_error_reasons @@ -62,6 +61,6 @@ def test_run_get_error_returns_none_for_specific_reasons(): ] for reason in RunTerminationReason: - if Run._termination_reason_to_error(reason) is None: + if reason.to_error() is None: # Fail no-error reason is not in the list assert reason in no_error_reasons