Skip to content

Commit 6fe89ac

Browse files
committed
Add more events
- All Run/Job/Fleet status transitions - All Job/Instance creation cases - Run deletion
1 parent 77fe555 commit 6fe89ac

File tree

13 files changed

+324
-138
lines changed

13 files changed

+324
-138
lines changed

contributing/RUNS-AND-JOBS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ A run can spawn one or multiple jobs, depending on the configuration. A task tha
3030
- STEP 4: Once all jobs are finished, the run becomes `TERMINATED`, `DONE`, or `FAILED` based on `RunTerminationReason`.
3131
- STEP 0: If the run is `PENDING`, `background.tasks.process_runs` will resubmit jobs. The run becomes `SUBMITTED` again.
3232

33+
> Use `switch_run_status()` for all status transitions. Do not set `RunModel.status` directly.
34+
3335
> No one must assign the finished status to the run, except `services.runs.process_terminating_run`. To terminate the run, assign `TERMINATING` status and `RunTerminationReason`.
3436
3537
### Services
@@ -68,6 +70,8 @@ Services' lifecycle has some modifications:
6870
- Once `remove_at` is in the past, it stops the container via `dstack-shim`, detaches instance volumes, and releases the instance. The job becomes `TERMINATED`, `DONE`, `FAILED`, or `ABORTED` based on `JobTerminationReason`.
6971
- If some volumes fail to detach, it keeps the job `TERMINATING` and checks volumes attachment status.
7072

73+
> Use `switch_job_status()` for all status transitions. Do not set `JobModel.status` directly.
74+
7175
> No one must assign the finished status to the job, except `services.jobs.process_terminating_job`. To terminate the job, assign `TERMINATING` status and `JobTerminationReason`.
7276
7377
### Services' Jobs

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
PlacementGroupModel,
1818
RunModel,
1919
)
20+
from dstack._internal.server.services import events
2021
from dstack._internal.server.services.fleets import (
2122
create_fleet_instance_model,
2223
get_fleet_spec,
2324
get_next_instance_num,
2425
is_fleet_empty,
2526
is_fleet_in_use,
27+
switch_fleet_status,
2628
)
29+
from dstack._internal.server.services.instances import format_instance_status_for_event
2730
from dstack._internal.server.services.locking import get_locker
2831
from dstack._internal.server.utils import sentry_utils
2932
from dstack._internal.utils.common import get_current_datetime
@@ -121,7 +124,7 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel])
121124
deleted_fleets_ids = []
122125
for fleet_model in fleet_models:
123126
_consolidate_fleet_state_with_spec(session, fleet_model)
124-
deleted = _autodelete_fleet(fleet_model)
127+
deleted = _autodelete_fleet(session, fleet_model)
125128
if deleted:
126129
deleted_fleets_ids.append(fleet_model.id)
127130
fleet_model.last_processed_at = get_current_datetime()
@@ -228,17 +231,26 @@ def _maintain_fleet_nodes_in_min_max_range(
228231
spec=fleet_spec,
229232
instance_num=get_next_instance_num({i.instance_num for i in active_instances}),
230233
)
234+
events.emit(
235+
session,
236+
(
237+
"Instance created to meet target fleet node count."
238+
f" Status: {format_instance_status_for_event(instance_model)}"
239+
),
240+
actor=events.SystemActor(),
241+
targets=[events.Target.from_model(instance_model)],
242+
)
231243
active_instances.append(instance_model)
232244
fleet_model.instances.append(instance_model)
233245
logger.info("Added %s instances to fleet %s", nodes_missing, fleet_model.name)
234246
return True
235247

236248

237-
def _autodelete_fleet(fleet_model: FleetModel) -> bool:
249+
def _autodelete_fleet(session: AsyncSession, fleet_model: FleetModel) -> bool:
238250
if fleet_model.project.deleted:
239251
# It used to be possible to delete project with active resources:
240252
# https://github.com/dstackai/dstack/issues/3077
241-
fleet_model.status = FleetStatus.TERMINATED
253+
switch_fleet_status(session, fleet_model, FleetStatus.TERMINATED)
242254
fleet_model.deleted = True
243255
logger.info("Fleet %s deleted due to deleted project", fleet_model.name)
244256
return True
@@ -256,7 +268,7 @@ def _autodelete_fleet(fleet_model: FleetModel) -> bool:
256268
return False
257269

258270
logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name)
259-
fleet_model.status = FleetStatus.TERMINATED
271+
switch_fleet_status(session, fleet_model, FleetStatus.TERMINATED)
260272
fleet_model.deleted = True
261273
logger.info("Fleet %s deleted", fleet_model.name)
262274
return True

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
get_job_runtime_data,
6262
is_master_job,
6363
job_model_to_job_submission,
64+
switch_job_status,
6465
)
6566
from dstack._internal.server.services.locking import get_locker
6667
from dstack._internal.server.services.logging import fmt
@@ -164,8 +165,11 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
164165
job_provisioning_data = job_submission.job_provisioning_data
165166
if job_provisioning_data is None:
166167
logger.error("%s: job_provisioning_data of an active job is None", fmt(job_model))
167-
job_model.status = JobStatus.TERMINATING
168168
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
169+
job_model.termination_reason_message = (
170+
"Unexpected server error: job_provisioning_data of an active job is None"
171+
)
172+
switch_job_status(session, job_model, JobStatus.TERMINATING)
169173
job_model.last_processed_at = common_utils.get_current_datetime()
170174
await session.commit()
171175
return
@@ -216,10 +220,9 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
216220
try:
217221
_interpolate_secrets(secrets, job.job_spec)
218222
except InterpolatorError as e:
219-
logger.info("%s: terminating due to secrets interpolation error", fmt(job_model))
220-
job_model.status = JobStatus.TERMINATING
221223
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
222-
job_model.termination_reason_message = e.args[0]
224+
job_model.termination_reason_message = f"Secrets interpolation error: {e.args[0]}"
225+
switch_job_status(session, job_model, JobStatus.TERMINATING)
223226
job_model.last_processed_at = common_utils.get_current_datetime()
224227
await session.commit()
225228
return
@@ -230,7 +233,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
230233

231234
if initial_status == JobStatus.PROVISIONING:
232235
if job_provisioning_data.hostname is None:
233-
await _wait_for_instance_provisioning_data(job_model=job_model)
236+
await _wait_for_instance_provisioning_data(session, job_model)
234237
job_model.last_processed_at = common_utils.get_current_datetime()
235238
await session.commit()
236239
return
@@ -258,6 +261,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
258261
server_ssh_private_keys,
259262
job_provisioning_data,
260263
None,
264+
session,
261265
run,
262266
job_model,
263267
job_provisioning_data,
@@ -292,6 +296,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
292296
server_ssh_private_keys,
293297
job_provisioning_data,
294298
None,
299+
session,
295300
run,
296301
job_model,
297302
job,
@@ -305,17 +310,17 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
305310

306311
if not success:
307312
# check timeout
308-
if job_submission.age > get_provisioning_timeout(
313+
provisioning_timeout = get_provisioning_timeout(
309314
backend_type=job_provisioning_data.get_base_backend(),
310315
instance_type_name=job_provisioning_data.instance_type.name,
311-
):
312-
logger.warning(
313-
"%s: failed because runner has not become available in time, age=%s",
314-
fmt(job_model),
315-
job_submission.age,
316-
)
317-
job_model.status = JobStatus.TERMINATING
316+
)
317+
if job_submission.age > provisioning_timeout:
318318
job_model.termination_reason = JobTerminationReason.WAITING_RUNNER_LIMIT_EXCEEDED
319+
job_model.termination_reason_message = (
320+
f"Runner did not become available within {provisioning_timeout.total_seconds()}s."
321+
f" Job submission age: {job_submission.age.total_seconds()}s)"
322+
)
323+
switch_job_status(session, job_model, JobStatus.TERMINATING)
319324
# instance will be emptied by process_terminating_jobs
320325

321326
else: # fails are not acceptable
@@ -342,6 +347,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
342347
server_ssh_private_keys,
343348
job_provisioning_data,
344349
None,
350+
session,
345351
run,
346352
job_model,
347353
job,
@@ -360,6 +366,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
360366
server_ssh_private_keys,
361367
job_provisioning_data,
362368
job_submission.job_runtime_data,
369+
session,
363370
run_model,
364371
job_model,
365372
)
@@ -374,21 +381,17 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
374381
job_model.termination_reason.value,
375382
job_submission.age,
376383
)
377-
job_model.status = JobStatus.TERMINATING
384+
switch_job_status(session, job_model, JobStatus.TERMINATING)
378385
# job will be terminated and instance will be emptied by process_terminating_jobs
379386
else:
380387
# No job_model.termination_reason set means ssh connection failed
381388
if job_model.disconnected_at is None:
382389
job_model.disconnected_at = common_utils.get_current_datetime()
383390
if _should_terminate_job_due_to_disconnect(job_model):
384-
logger.warning(
385-
"%s: failed because instance is unreachable, age=%s",
386-
fmt(job_model),
387-
job_submission.age,
388-
)
389391
# TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE for on-demand.
390392
job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
391-
job_model.status = JobStatus.TERMINATING
393+
job_model.termination_reason_message = "Instance is unreachable"
394+
switch_job_status(session, job_model, JobStatus.TERMINATING)
392395
else:
393396
logger.warning(
394397
"%s: is unreachable, waiting for the instance to become reachable again, age=%s",
@@ -418,7 +421,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
418421
await session.commit()
419422

420423

421-
async def _wait_for_instance_provisioning_data(job_model: JobModel):
424+
async def _wait_for_instance_provisioning_data(session: AsyncSession, job_model: JobModel):
422425
"""
423426
This function will be called until instance IP address appears
424427
in `job_model.instance.job_provisioning_data` or instance is terminated on timeout.
@@ -437,8 +440,9 @@ async def _wait_for_instance_provisioning_data(job_model: JobModel):
437440
return
438441

439442
if job_model.instance.status == InstanceStatus.TERMINATED:
440-
job_model.status = JobStatus.TERMINATING
441443
job_model.termination_reason = JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED
444+
job_model.termination_reason_message = "Instance is terminated"
445+
switch_job_status(session, job_model, JobStatus.TERMINATING)
442446
return
443447

444448
job_model.job_provisioning_data = job_model.instance.job_provisioning_data
@@ -489,6 +493,7 @@ def _should_wait_for_other_nodes(run: Run, job: Job, job_model: JobModel) -> boo
489493
@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT], retries=1)
490494
def _process_provisioning_with_shim(
491495
ports: Dict[int, int],
496+
session: AsyncSession,
492497
run: Run,
493498
job_model: JobModel,
494499
job_provisioning_data: JobProvisioningData,
@@ -615,14 +620,14 @@ def _process_provisioning_with_shim(
615620
shim_client.stop(force=True)
616621
return False
617622

618-
job_model.status = JobStatus.PULLING
619-
logger.info("%s: now is %s", fmt(job_model), job_model.status.name)
623+
switch_job_status(session, job_model, JobStatus.PULLING)
620624
return True
621625

622626

623627
@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT])
624628
def _process_pulling_with_shim(
625629
ports: Dict[int, int],
630+
session: AsyncSession,
626631
run: Run,
627632
job_model: JobModel,
628633
job: Job,
@@ -700,6 +705,7 @@ def _process_pulling_with_shim(
700705
server_ssh_private_keys,
701706
job_provisioning_data,
702707
job_runtime_data,
708+
session=session,
703709
run=run,
704710
job_model=job_model,
705711
job=job,
@@ -715,6 +721,7 @@ def _process_pulling_with_shim(
715721
@runner_ssh_tunnel(ports=[DSTACK_RUNNER_HTTP_PORT])
716722
def _process_running(
717723
ports: Dict[int, int],
724+
session: AsyncSession,
718725
run_model: RunModel,
719726
job_model: JobModel,
720727
) -> bool:
@@ -740,35 +747,37 @@ def _process_running(
740747
runner_logs=resp.runner_logs,
741748
job_logs=resp.job_logs,
742749
)
743-
previous_status = job_model.status
744750
if len(resp.job_states) > 0:
745751
latest_state_event = resp.job_states[-1]
746752
latest_status = latest_state_event.state
747753
if latest_status == JobStatus.DONE:
748-
job_model.status = JobStatus.TERMINATING
749754
job_model.termination_reason = JobTerminationReason.DONE_BY_RUNNER
755+
switch_job_status(session, job_model, JobStatus.TERMINATING)
750756
elif latest_status in {JobStatus.FAILED, JobStatus.TERMINATED}:
751-
job_model.status = JobStatus.TERMINATING
752757
job_model.termination_reason = JobTerminationReason.CONTAINER_EXITED_WITH_ERROR
753758
if latest_state_event.termination_reason:
754759
job_model.termination_reason = JobTerminationReason(
755760
latest_state_event.termination_reason.lower()
756761
)
757762
if latest_state_event.termination_message:
758763
job_model.termination_reason_message = latest_state_event.termination_message
764+
switch_job_status(session, job_model, JobStatus.TERMINATING)
759765
if (exit_status := latest_state_event.exit_status) is not None:
760766
job_model.exit_status = exit_status
761767
if exit_status != 0:
762768
logger.info("%s: non-zero exit status %s", fmt(job_model), exit_status)
763769
else:
764-
_terminate_if_inactivity_duration_exceeded(run_model, job_model, resp.no_connections_secs)
765-
if job_model.status != previous_status:
766-
logger.info("%s: now is %s", fmt(job_model), job_model.status.name)
770+
_terminate_if_inactivity_duration_exceeded(
771+
session, run_model, job_model, resp.no_connections_secs
772+
)
767773
return True
768774

769775

770776
def _terminate_if_inactivity_duration_exceeded(
771-
run_model: RunModel, job_model: JobModel, no_connections_secs: Optional[int]
777+
session: AsyncSession,
778+
run_model: RunModel,
779+
job_model: JobModel,
780+
no_connections_secs: Optional[int],
772781
) -> None:
773782
conf = RunSpec.__response__.parse_raw(run_model.run_spec).configuration
774783
if not isinstance(conf, DevEnvironmentConfiguration) or not isinstance(
@@ -781,20 +790,20 @@ def _terminate_if_inactivity_duration_exceeded(
781790
job_model.inactivity_secs = no_connections_secs
782791
if no_connections_secs is None:
783792
# TODO(0.19 or earlier): make no_connections_secs required
784-
job_model.status = JobStatus.TERMINATING
785793
job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
786794
job_model.termination_reason_message = (
787795
"The selected instance was created before dstack 0.18.41"
788796
" and does not support inactivity_duration"
789797
)
798+
switch_job_status(session, job_model, JobStatus.TERMINATING)
790799
elif no_connections_secs >= conf.inactivity_duration:
791-
job_model.status = JobStatus.TERMINATING
792800
# TODO(0.19 or earlier): set JobTerminationReason.INACTIVITY_DURATION_EXCEEDED
793801
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
794802
job_model.termination_reason_message = (
795803
f"The job was inactive for {no_connections_secs} seconds,"
796804
f" exceeding the inactivity_duration of {conf.inactivity_duration} seconds"
797805
)
806+
switch_job_status(session, job_model, JobStatus.TERMINATING)
798807

799808

800809
def _should_terminate_job_due_to_disconnect(job_model: JobModel) -> bool:
@@ -851,8 +860,10 @@ async def _maybe_register_replica(
851860
fmt(job_model),
852861
e,
853862
)
854-
job_model.status = JobStatus.TERMINATING
855863
job_model.termination_reason = JobTerminationReason.GATEWAY_ERROR
864+
# Not including e.args[0] in the message to avoid exposing internal details
865+
job_model.termination_reason_message = "Failed to register service replica"
866+
switch_job_status(session, job_model, JobStatus.TERMINATING)
856867

857868

858869
async def _check_gpu_utilization(session: AsyncSession, job_model: JobModel, job: Job) -> None:
@@ -873,14 +884,14 @@ async def _check_gpu_utilization(session: AsyncSession, job_model: JobModel, job
873884
if _should_terminate_due_to_low_gpu_util(
874885
policy.min_gpu_utilization, [m.values for m in gpus_util_metrics]
875886
):
876-
logger.info("%s: GPU utilization check: terminating", fmt(job_model))
877-
job_model.status = JobStatus.TERMINATING
887+
logger.debug("%s: GPU utilization check: terminating", fmt(job_model))
878888
# TODO(0.19 or earlier): set JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY
879889
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
880890
job_model.termination_reason_message = (
881891
f"The job GPU utilization below {policy.min_gpu_utilization}%"
882892
f" for {policy.time_window} seconds"
883893
)
894+
switch_job_status(session, job_model, JobStatus.TERMINATING)
884895
else:
885896
logger.debug("%s: GPU utilization check: OK", fmt(job_model))
886897

@@ -998,6 +1009,7 @@ async def _get_job_file_archive(
9981009
@runner_ssh_tunnel(ports=[DSTACK_RUNNER_HTTP_PORT], retries=1)
9991010
def _submit_job_to_runner(
10001011
ports: Dict[int, int],
1012+
session: AsyncSession,
10011013
run: Run,
10021014
job_model: JobModel,
10031015
job: Job,
@@ -1053,7 +1065,7 @@ def _submit_job_to_runner(
10531065
logger.debug("%s: starting job", fmt(job_model))
10541066
runner_client.run_job()
10551067

1056-
job_model.status = JobStatus.RUNNING
1068+
switch_job_status(session, job_model, JobStatus.RUNNING)
10571069
# do not log here, because the runner will send a new status
10581070

10591071
return True

0 commit comments

Comments
 (0)