From 9f2f9bff543418dff6f97553c3f33b9c6a2fed4f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 30 Mar 2026 11:23:57 +0500 Subject: [PATCH 1/5] Move stop_runner() to JobTerminating pipeline --- .../pipeline_tasks/jobs_terminating.py | 27 ++++++++++++++-- .../pipeline_tasks/runs/terminating.py | 13 +++----- ...e38f2_add_jobmodel_graceful_termination.py | 32 +++++++++++++++++++ src/dstack/_internal/server/models.py | 7 +++- 4 files changed, 66 insertions(+), 13 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 2925dd844..9179663c1 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -60,6 +60,7 @@ get_job_provisioning_data, get_job_runtime_data, get_job_spec, + stop_runner, ) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.logging import fmt @@ -267,6 +268,7 @@ class _JobUpdateMap(ItemUpdateMap, total=False): instance_id: Optional[uuid.UUID] volumes_detached_at: UpdateMapDateTime registered: bool + remove_at: UpdateMapDateTime class _InstanceUpdateMap(ItemUpdateMap, total=False): @@ -580,9 +582,11 @@ async def _process_terminating_job( instance_model: Optional[InstanceModel], ) -> _ProcessResult: """ - Stops the job: tells shim to stop the container, detaches the job from the instance, - and detaches volumes from the instance. - Graceful stop should already be done by the run terminating path. + Terminates the job: + 1. tells the runner to stop the job's command + 2. tells the shim to stop the container + 3. detaches the job from the instance + 4. and detaches volumes from the instance. """ instance_update_map = None if instance_model is None else _InstanceUpdateMap() result = _ProcessResult(instance_update_map=instance_update_map) @@ -592,6 +596,10 @@ async def _process_terminating_job( result.job_update_map["status"] = _get_job_termination_status(job_model) return result + if job_model.graceful_termination and job_model.remove_at is None: + result.job_update_map = await _stop_job_gracefully(job_model, instance_model) + return result + jrd = get_job_runtime_data(job_model) jpd = get_job_provisioning_data(job_model) if jpd is not None: @@ -642,6 +650,19 @@ async def _process_terminating_job( return result +async def _stop_job_gracefully( + job_model: JobModel, instance_model: InstanceModel +) -> _JobUpdateMap: + """ + Tells the runner to stop the job's command. Sets `removed_at` for graceful termination: + `_process_terminating_job()` will stop the container on the next iteration after that time. + """ + job_update_map = _JobUpdateMap() + await stop_runner(job_model=job_model, instance_model=instance_model) + job_update_map["remove_at"] = get_current_datetime() + timedelta(seconds=10) + return job_update_map + + async def _process_job_volumes_detaching( job_model: JobModel, instance_model: InstanceModel, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py index a84d11cbf..0d1cc41de 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py @@ -1,6 +1,6 @@ import uuid from dataclasses import dataclass, field -from datetime import datetime, timedelta +from datetime import datetime from typing import Optional import httpx @@ -17,10 +17,9 @@ from dstack._internal.server.db import get_session_ctx from dstack._internal.server.services import events from dstack._internal.server.services.gateways import get_or_add_gateway_connection -from dstack._internal.server.services.jobs import stop_runner from dstack._internal.server.services.logging import fmt from dstack._internal.server.services.runs import _get_next_triggered_at, get_run_spec -from dstack._internal.utils.common import get_current_datetime, get_or_error +from dstack._internal.utils.common import get_or_error from dstack._internal.utils.logging import get_logger logger = get_logger(__name__) @@ -35,7 +34,7 @@ class TerminatingRunUpdateMap(ItemUpdateMap, total=False): class TerminatingRunJobUpdateMap(ItemUpdateMap, total=False): status: JobStatus termination_reason: Optional[JobTerminationReason] - remove_at: Optional[datetime] + graceful_termination: bool @dataclass @@ -77,10 +76,6 @@ async def process_terminating_run(context: TerminatingContext) -> TerminatingRes JobTerminationReason.ABORTED_BY_USER, JobTerminationReason.DONE_BY_RUNNER, }: - # Send a signal to stop the job gracefully. - await stop_runner( - job_model=job_model, instance_model=get_or_error(job_model.instance) - ) delayed_job_ids.append(job_model.id) continue regular_job_ids.append(job_model.id) @@ -123,7 +118,7 @@ def _get_job_id_to_update_map( job_id_to_update_map[job_id] = TerminatingRunJobUpdateMap( status=JobStatus.TERMINATING, termination_reason=job_termination_reason, - remove_at=get_current_datetime() + timedelta(seconds=15), + graceful_termination=True, ) return job_id_to_update_map diff --git a/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py b/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py new file mode 100644 index 000000000..cca097213 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py @@ -0,0 +1,32 @@ +"""Add JobModel.graceful_termination + +Revision ID: aae8323e38f2 +Revises: c1c2ecaee45c +Create Date: 2026-03-30 06:19:29.078302+00:00 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "aae8323e38f2" +down_revision = "c1c2ecaee45c" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.add_column(sa.Column("graceful_termination", sa.Boolean(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.drop_column("graceful_termination") + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index b599c4314..a1572b144 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -497,8 +497,13 @@ class JobModel(PipelineModelMixin, BaseModel): runner_timestamp: Mapped[Optional[int]] = mapped_column(BigInteger) inactivity_secs: Mapped[Optional[int]] = mapped_column(Integer) """`inactivity_secs` uses `0` for active jobs and `None` when inactivity is not applicable.""" + graceful_termination: Mapped[Optional[bool]] = mapped_column(Boolean) + """`graceful_termination` is set for terminating jobs if they need graceful termination. + """ remove_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) - """`remove_at` is used to ensure the instance is killed after the job is finished.""" + """`remove_at` is used to ensure the instance is killed after the job is finished + when performing graceful termination. + """ volumes_detached_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) instance_assigned: Mapped[bool] = mapped_column(Boolean, default=False) """`instance_assigned` shows whether instance assignment has already been attempted. From d734c1eceb6c8c25366681db56482620dfa49ea5 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 30 Mar 2026 11:50:50 +0500 Subject: [PATCH 2/5] Update tests --- .../test_runs/test_termination.py | 47 +++----- .../pipeline_tasks/test_terminating_jobs.py | 109 +++++++++++++++++- 2 files changed, 126 insertions(+), 30 deletions(-) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py index d056df8a8..89c97e9d1 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py @@ -20,6 +20,10 @@ JobTerminatingPipeline, ) from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline, RunWorker +from dstack._internal.server.background.pipeline_tasks.runs.terminating import ( + TerminatingResult, + process_terminating_run, +) from dstack._internal.server.testing.common import ( create_fleet, create_instance, @@ -84,32 +88,14 @@ async def test_transitions_running_jobs_to_terminating( ) lock_run(run) await session.commit() - item = run_to_pipeline_item(run) - observed_job_lock = {} - - async def record_stop_call(**kwargs) -> None: - observed_job_lock["lock_token"] = kwargs["job_model"].lock_token - observed_job_lock["lock_owner"] = kwargs["job_model"].lock_owner - - with patch( - "dstack._internal.server.background.pipeline_tasks.runs.terminating.stop_runner", - new=AsyncMock(side_effect=record_stop_call), - ) as stop_runner: - await worker.process(item) - - assert stop_runner.await_count == 1 - stop_call = stop_runner.await_args - assert stop_call is not None - assert stop_call.kwargs["job_model"].id == job.id - assert observed_job_lock["lock_token"] == item.lock_token - assert observed_job_lock["lock_owner"] == RunPipeline.__name__ - assert stop_call.kwargs["instance_model"].id == instance.id + await worker.process(run_to_pipeline_item(run)) await session.refresh(job) await session.refresh(run) assert job.status == JobStatus.TERMINATING assert job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER - assert job.remove_at is not None + assert job.graceful_termination is True + assert job.remove_at is None assert job.lock_token is None assert job.lock_expires_at is None assert job.lock_owner is None @@ -154,19 +140,17 @@ async def test_updates_delayed_and_regular_jobs_separately( lock_run(run) await session.commit() - with patch( - "dstack._internal.server.background.pipeline_tasks.runs.terminating.stop_runner", - new=AsyncMock(), - ): - await worker.process(run_to_pipeline_item(run)) + await worker.process(run_to_pipeline_item(run)) await session.refresh(delayed_job) await session.refresh(regular_job) assert delayed_job.status == JobStatus.TERMINATING assert delayed_job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER - assert delayed_job.remove_at is not None + assert delayed_job.graceful_termination is True + assert delayed_job.remove_at is None assert regular_job.status == JobStatus.TERMINATING assert regular_job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER + assert regular_job.graceful_termination is None assert regular_job.remove_at is None async def test_finishes_non_scheduled_run_when_all_jobs_are_finished( @@ -273,14 +257,16 @@ async def test_noops_when_run_lock_changes_after_processing( await session.commit() item = run_to_pipeline_item(run) new_lock_token = uuid.uuid4() + original_process_terminating_run = process_terminating_run - async def change_run_lock(**kwargs) -> None: + async def change_run_lock(context) -> TerminatingResult: run.lock_token = new_lock_token run.lock_expires_at = get_current_datetime() + timedelta(minutes=1) await session.commit() + return await original_process_terminating_run(context) with patch( - "dstack._internal.server.background.pipeline_tasks.runs.terminating.stop_runner", + "dstack._internal.server.background.pipeline_tasks.runs.terminating.process_terminating_run", new=AsyncMock(side_effect=change_run_lock), ): await worker.process(item) @@ -289,7 +275,10 @@ async def change_run_lock(**kwargs) -> None: await session.refresh(job) assert run.status == RunStatus.TERMINATING assert run.lock_token == new_lock_token + assert run.lock_owner == RunPipeline.__name__ assert job.status == JobStatus.RUNNING + assert job.graceful_termination is None + assert job.remove_at is None assert job.lock_token is None assert job.lock_expires_at is None assert job.lock_owner is None diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py index 822d6798a..66efdcaf0 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py @@ -1,7 +1,7 @@ import asyncio import uuid from datetime import datetime, timedelta, timezone -from unittest.mock import Mock, patch +from unittest.mock import AsyncMock, Mock, patch import pytest from sqlalchemy import select @@ -236,6 +236,113 @@ async def test_fetch_returns_oldest_jobs_first_up_to_limit( @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @pytest.mark.usefixtures("image_config_mock") class TestJobTerminatingWorker: + async def test_stops_job_gracefully_before_terminating_container( + self, test_db, session: AsyncSession, worker: JobTerminatingWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + instance = await create_instance( + session=session, + project=project, + status=InstanceStatus.BUSY, + ) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job( + session=session, + run=run, + status=JobStatus.TERMINATING, + termination_reason=JobTerminationReason.TERMINATED_BY_USER, + job_provisioning_data=get_job_provisioning_data(dockerized=True), + instance=instance, + ) + job.graceful_termination = True + _lock_job(job) + await session.commit() + + with ( + patch( + "dstack._internal.server.background.pipeline_tasks.jobs_terminating.stop_runner", + new=AsyncMock(), + ) as stop_runner, + patch( + "dstack._internal.server.background.pipeline_tasks.jobs_terminating._stop_container", + new=AsyncMock(return_value=True), + ) as stop_container, + ): + await worker.process(_job_to_pipeline_item(job)) + + stop_runner.assert_awaited_once() + stop_container.assert_not_awaited() + + await session.refresh(job) + await session.refresh(instance) + assert job.status == JobStatus.TERMINATING + assert job.graceful_termination is True + assert job.remove_at is not None + assert job.instance_id == instance.id + assert job.volumes_detached_at is None + assert job.lock_token is None + assert job.lock_expires_at is None + assert job.lock_owner is None + assert instance.lock_token is None + assert instance.lock_expires_at is None + assert instance.lock_owner is None + + async def test_terminates_gracefully_stopped_job_after_remove_at( + self, test_db, session: AsyncSession, worker: JobTerminatingWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + instance = await create_instance( + session=session, + project=project, + status=InstanceStatus.BUSY, + ) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job( + session=session, + run=run, + status=JobStatus.TERMINATING, + termination_reason=JobTerminationReason.TERMINATED_BY_USER, + job_provisioning_data=get_job_provisioning_data(dockerized=True), + instance=instance, + ) + job.graceful_termination = True + job.remove_at = get_current_datetime() - timedelta(minutes=1) + _lock_job(job) + await session.commit() + + with ( + patch( + "dstack._internal.server.background.pipeline_tasks.jobs_terminating.stop_runner", + new=AsyncMock(), + ) as stop_runner, + patch( + "dstack._internal.server.background.pipeline_tasks.jobs_terminating._stop_container", + new=AsyncMock(return_value=True), + ) as stop_container, + ): + await worker.process(_job_to_pipeline_item(job)) + + stop_runner.assert_not_awaited() + stop_container.assert_awaited_once() + + await session.refresh(job) + await session.refresh(instance) + assert job.status == JobStatus.TERMINATED + assert job.graceful_termination is True + assert job.remove_at is not None + assert job.instance_id is None + assert job.lock_token is None + assert job.lock_expires_at is None + assert job.lock_owner is None + assert instance.status == InstanceStatus.IDLE + assert instance.lock_token is None + assert instance.lock_expires_at is None + assert instance.lock_owner is None + async def test_terminates_job( self, test_db, session: AsyncSession, worker: JobTerminatingWorker ): From f6543c0a8575ab32d7e351751e4557b752f41689 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 30 Mar 2026 12:36:20 +0500 Subject: [PATCH 3/5] Replace graceful_termination with graceful_termination_attempts --- .../background/pipeline_tasks/jobs_terminating.py | 8 +++++--- .../background/pipeline_tasks/runs/terminating.py | 4 ++-- ..._aae8323e38f2_add_jobmodel_graceful_termination.py | 8 +++++--- src/dstack/_internal/server/models.py | 11 +++++++---- .../pipeline_tasks/test_runs/test_termination.py | 8 ++++---- .../pipeline_tasks/test_terminating_jobs.py | 8 ++++---- 6 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 9179663c1..49e0d9179 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -266,6 +266,7 @@ class _JobUpdateMap(ItemUpdateMap, total=False): termination_reason: Optional[JobTerminationReason] termination_reason_message: Optional[str] instance_id: Optional[uuid.UUID] + graceful_termination_attempts: int volumes_detached_at: UpdateMapDateTime registered: bool remove_at: UpdateMapDateTime @@ -596,7 +597,7 @@ async def _process_terminating_job( result.job_update_map["status"] = _get_job_termination_status(job_model) return result - if job_model.graceful_termination and job_model.remove_at is None: + if job_model.graceful_termination_attempts == 0 and job_model.remove_at is None: result.job_update_map = await _stop_job_gracefully(job_model, instance_model) return result @@ -654,11 +655,12 @@ async def _stop_job_gracefully( job_model: JobModel, instance_model: InstanceModel ) -> _JobUpdateMap: """ - Tells the runner to stop the job's command. Sets `removed_at` for graceful termination: - `_process_terminating_job()` will stop the container on the next iteration after that time. + Tells the runner to stop the job's command. Records the first graceful-stop attempt and + sets `remove_at` so `_process_terminating_job()` stops the container on a later iteration. """ job_update_map = _JobUpdateMap() await stop_runner(job_model=job_model, instance_model=instance_model) + job_update_map["graceful_termination_attempts"] = 1 job_update_map["remove_at"] = get_current_datetime() + timedelta(seconds=10) return job_update_map diff --git a/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py index 0d1cc41de..be46bf0ed 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py @@ -34,7 +34,7 @@ class TerminatingRunUpdateMap(ItemUpdateMap, total=False): class TerminatingRunJobUpdateMap(ItemUpdateMap, total=False): status: JobStatus termination_reason: Optional[JobTerminationReason] - graceful_termination: bool + graceful_termination_attempts: int @dataclass @@ -118,7 +118,7 @@ def _get_job_id_to_update_map( job_id_to_update_map[job_id] = TerminatingRunJobUpdateMap( status=JobStatus.TERMINATING, termination_reason=job_termination_reason, - graceful_termination=True, + graceful_termination_attempts=0, ) return job_id_to_update_map diff --git a/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py b/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py index cca097213..6b2d4136c 100644 --- a/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py +++ b/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py @@ -1,4 +1,4 @@ -"""Add JobModel.graceful_termination +"""Add JobModel.graceful_termination_attempts Revision ID: aae8323e38f2 Revises: c1c2ecaee45c @@ -19,7 +19,9 @@ def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### with op.batch_alter_table("jobs", schema=None) as batch_op: - batch_op.add_column(sa.Column("graceful_termination", sa.Boolean(), nullable=True)) + batch_op.add_column( + sa.Column("graceful_termination_attempts", sa.Integer(), nullable=True) + ) # ### end Alembic commands ### @@ -27,6 +29,6 @@ def upgrade() -> None: def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### with op.batch_alter_table("jobs", schema=None) as batch_op: - batch_op.drop_column("graceful_termination") + batch_op.drop_column("graceful_termination_attempts") # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index a1572b144..b477b20cc 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -497,12 +497,15 @@ class JobModel(PipelineModelMixin, BaseModel): runner_timestamp: Mapped[Optional[int]] = mapped_column(BigInteger) inactivity_secs: Mapped[Optional[int]] = mapped_column(Integer) """`inactivity_secs` uses `0` for active jobs and `None` when inactivity is not applicable.""" - graceful_termination: Mapped[Optional[bool]] = mapped_column(Boolean) - """`graceful_termination` is set for terminating jobs if they need graceful termination. + graceful_termination_attempts: Mapped[Optional[int]] = mapped_column(Integer) + """`graceful_termination_attempts` is used for terminating jobs. + * `None` means graceful termination is not needed + * `0` means it is needed but not attempted, + * `>= 1` means at least one graceful stop attempt was sent. """ remove_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) - """`remove_at` is used to ensure the instance is killed after the job is finished - when performing graceful termination. + """`remove_at` is used to ensure the container/instance is killed after the job is gracefully finished. + Cannot kill the container/instance until `remove_at` is set. """ volumes_detached_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) instance_assigned: Mapped[bool] = mapped_column(Boolean, default=False) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py index 89c97e9d1..15881c007 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py @@ -94,7 +94,7 @@ async def test_transitions_running_jobs_to_terminating( await session.refresh(run) assert job.status == JobStatus.TERMINATING assert job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER - assert job.graceful_termination is True + assert job.graceful_termination_attempts == 0 assert job.remove_at is None assert job.lock_token is None assert job.lock_expires_at is None @@ -146,11 +146,11 @@ async def test_updates_delayed_and_regular_jobs_separately( await session.refresh(regular_job) assert delayed_job.status == JobStatus.TERMINATING assert delayed_job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER - assert delayed_job.graceful_termination is True + assert delayed_job.graceful_termination_attempts == 0 assert delayed_job.remove_at is None assert regular_job.status == JobStatus.TERMINATING assert regular_job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER - assert regular_job.graceful_termination is None + assert regular_job.graceful_termination_attempts is None assert regular_job.remove_at is None async def test_finishes_non_scheduled_run_when_all_jobs_are_finished( @@ -277,7 +277,7 @@ async def change_run_lock(context) -> TerminatingResult: assert run.lock_token == new_lock_token assert run.lock_owner == RunPipeline.__name__ assert job.status == JobStatus.RUNNING - assert job.graceful_termination is None + assert job.graceful_termination_attempts is None assert job.remove_at is None assert job.lock_token is None assert job.lock_expires_at is None diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py index 66efdcaf0..ed5381c08 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py @@ -256,7 +256,7 @@ async def test_stops_job_gracefully_before_terminating_container( job_provisioning_data=get_job_provisioning_data(dockerized=True), instance=instance, ) - job.graceful_termination = True + job.graceful_termination_attempts = 0 _lock_job(job) await session.commit() @@ -278,7 +278,7 @@ async def test_stops_job_gracefully_before_terminating_container( await session.refresh(job) await session.refresh(instance) assert job.status == JobStatus.TERMINATING - assert job.graceful_termination is True + assert job.graceful_termination_attempts == 1 assert job.remove_at is not None assert job.instance_id == instance.id assert job.volumes_detached_at is None @@ -309,7 +309,7 @@ async def test_terminates_gracefully_stopped_job_after_remove_at( job_provisioning_data=get_job_provisioning_data(dockerized=True), instance=instance, ) - job.graceful_termination = True + job.graceful_termination_attempts = 1 job.remove_at = get_current_datetime() - timedelta(minutes=1) _lock_job(job) await session.commit() @@ -332,7 +332,7 @@ async def test_terminates_gracefully_stopped_job_after_remove_at( await session.refresh(job) await session.refresh(instance) assert job.status == JobStatus.TERMINATED - assert job.graceful_termination is True + assert job.graceful_termination_attempts == 1 assert job.remove_at is not None assert job.instance_id is None assert job.lock_token is None From f3b8eaf83e5a943f581e9094ec61f346181f1333 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 30 Mar 2026 13:41:58 +0500 Subject: [PATCH 4/5] Rebase migration --- ...e9d81c97c042_add_jobmodel_graceful_termination_.py} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename src/dstack/_internal/server/migrations/versions/2026/{03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py => 03_30_0841_e9d81c97c042_add_jobmodel_graceful_termination_.py} (83%) diff --git a/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py b/src/dstack/_internal/server/migrations/versions/2026/03_30_0841_e9d81c97c042_add_jobmodel_graceful_termination_.py similarity index 83% rename from src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py rename to src/dstack/_internal/server/migrations/versions/2026/03_30_0841_e9d81c97c042_add_jobmodel_graceful_termination_.py index 6b2d4136c..2e744e05d 100644 --- a/src/dstack/_internal/server/migrations/versions/2026/03_30_0619_aae8323e38f2_add_jobmodel_graceful_termination.py +++ b/src/dstack/_internal/server/migrations/versions/2026/03_30_0841_e9d81c97c042_add_jobmodel_graceful_termination_.py @@ -1,8 +1,8 @@ """Add JobModel.graceful_termination_attempts -Revision ID: aae8323e38f2 -Revises: c1c2ecaee45c -Create Date: 2026-03-30 06:19:29.078302+00:00 +Revision ID: e9d81c97c042 +Revises: 59e328ced74c +Create Date: 2026-03-30 08:41:29.308250+00:00 """ @@ -10,8 +10,8 @@ from alembic import op # revision identifiers, used by Alembic. -revision = "aae8323e38f2" -down_revision = "c1c2ecaee45c" +revision = "e9d81c97c042" +down_revision = "59e328ced74c" branch_labels = None depends_on = None From d513c792b878ff6577bc989be30ea2b0283170c4 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 30 Mar 2026 14:48:31 +0500 Subject: [PATCH 5/5] Add status-specific processing interval --- .../server/background/pipeline_tasks/base.py | 11 ++++++++++- .../background/pipeline_tasks/jobs_running.py | 16 ++++++++++++++-- .../background/pipeline_tasks/runs/__init__.py | 14 ++++++++++++-- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/base.py b/src/dstack/_internal/server/background/pipeline_tasks/base.py index b37940ad1..402be0dc0 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/base.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/base.py @@ -255,6 +255,7 @@ async def heartbeat(self): class Fetcher(Generic[ItemT], ABC): _DEFAULT_FETCH_DELAYS = [0.5, 1, 2, 5] + """Increasing fetch delays on empty fetches to avoid frequent selects on low-activity/low-resource servers.""" def __init__( self, @@ -319,7 +320,15 @@ async def fetch(self, limit: int) -> list[ItemT]: pass def _next_fetch_delay(self, empty_fetch_count: int) -> float: - next_delay = self._fetch_delays[min(empty_fetch_count, len(self._fetch_delays) - 1)] + effective_empty_fetch_count = empty_fetch_count + if random.random() < 0.1: + # Empty fetch count can be 0 not because there are no items in the DB, + # but for other reasons such as waiting parent resource processing. + # From time to time, force minimal next delay to avoid empty results due to rare fetches. + effective_empty_fetch_count = 0 + next_delay = self._fetch_delays[ + min(effective_empty_fetch_count, len(self._fetch_delays) - 1) + ] jitter = random.random() * 0.4 - 0.2 return next_delay * (1 + jitter) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index 73af7d302..e9e6acdd9 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -117,7 +117,7 @@ def __init__( workers_num: int = 20, queue_lower_limit_factor: float = 0.5, queue_upper_limit_factor: float = 2.0, - min_processing_interval: timedelta = timedelta(seconds=10), + min_processing_interval: timedelta = timedelta(seconds=5), lock_timeout: timedelta = timedelta(seconds=30), heartbeat_trigger: timedelta = timedelta(seconds=15), ) -> None: @@ -196,7 +196,19 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]: [JobStatus.PROVISIONING, JobStatus.PULLING, JobStatus.RUNNING] ), RunModel.status.not_in([RunStatus.TERMINATING]), - JobModel.last_processed_at <= now - self._min_processing_interval, + or_( + # Process provisioning and pulling jobs quicker for low-latency provisioning. + # Active jobs processing can be less frequent to minimize contention with `RunPipeline`. + and_( + JobModel.status.in_([JobStatus.PROVISIONING, JobStatus.PULLING]), + JobModel.last_processed_at <= now - self._min_processing_interval, + ), + and_( + JobModel.status.in_([JobStatus.RUNNING]), + JobModel.last_processed_at + <= now - self._min_processing_interval * 2, + ), + ), or_( and_( # Do not try to lock jobs if the run is waiting for the lock, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py index 5c3ec260b..c82a69eda 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py @@ -55,7 +55,7 @@ def __init__( workers_num: int = 10, queue_lower_limit_factor: float = 0.5, queue_upper_limit_factor: float = 2.0, - min_processing_interval: timedelta = timedelta(seconds=10), + min_processing_interval: timedelta = timedelta(seconds=5), lock_timeout: timedelta = timedelta(seconds=30), heartbeat_trigger: timedelta = timedelta(seconds=15), ) -> None: @@ -164,7 +164,17 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]: ), ), or_( - RunModel.last_processed_at <= now - self._min_processing_interval, + # Process submitted runs quicker for low-latency provisioning. + # Active run processing can be less frequent to minimize contention with `JobRunningPipeline`. + and_( + RunModel.status == RunStatus.SUBMITTED, + RunModel.last_processed_at <= now - self._min_processing_interval, + ), + and_( + RunModel.status != RunStatus.SUBMITTED, + RunModel.last_processed_at + <= now - self._min_processing_interval * 2, + ), RunModel.last_processed_at == RunModel.submitted_at, ), or_(