From 8490ae1aa0f3a863816cc800ba9eade163fed553 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Tue, 24 Jun 2025 14:48:10 -0400 Subject: [PATCH 01/23] feat: add volume idle duration cleanup feature (#2497) --- src/dstack/_internal/core/models/volumes.py | 14 ++ .../_internal/server/background/__init__.py | 4 + .../background/tasks/process_idle_volumes.py | 134 +++++++++++ .../tasks/process_submitted_jobs.py | 3 + ...63c924_add_volume_last_job_processed_at.py | 38 +++ src/dstack/_internal/server/models.py | 1 + .../server/services/jobs/__init__.py | 13 + src/dstack/_internal/server/testing/common.py | 2 + .../tasks/test_process_idle_volumes.py | 226 ++++++++++++++++++ 9 files changed, 435 insertions(+) create mode 100644 src/dstack/_internal/server/background/tasks/process_idle_volumes.py create mode 100644 src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py create mode 100644 src/tests/_internal/server/background/tasks/test_process_idle_volumes.py diff --git a/src/dstack/_internal/core/models/volumes.py b/src/dstack/_internal/core/models/volumes.py index 773fd9429f..1f9c077487 100644 --- a/src/dstack/_internal/core/models/volumes.py +++ b/src/dstack/_internal/core/models/volumes.py @@ -9,6 +9,7 @@ from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import CoreModel +from dstack._internal.core.models.profiles import parse_idle_duration from dstack._internal.core.models.resources import Memory from dstack._internal.utils.common import get_or_error from dstack._internal.utils.tags import tags_validator @@ -44,6 +45,16 @@ class VolumeConfiguration(CoreModel): Optional[str], Field(description="The volume ID. Must be specified when registering external volumes"), ] = None + idle_duration: Annotated[ + Optional[Union[str, int, bool]], + Field( + description=( + "Time to wait after volume is no longer used by any job before deleting it. 
" + "Defaults to keep the volume indefinitely. " + "Use the value 'off' to disable auto-cleanup." + ) + ), + ] = None tags: Annotated[ Optional[Dict[str, str]], Field( @@ -56,6 +67,9 @@ class VolumeConfiguration(CoreModel): ] = None _validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator) + _validate_idle_duration = validator("idle_duration", pre=True, allow_reuse=True)( + parse_idle_duration + ) @property def size_gb(self) -> int: diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 2dd410cd28..cf802b877c 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -7,6 +7,7 @@ process_gateways_connections, process_submitted_gateways, ) +from dstack._internal.server.background.tasks.process_idle_volumes import process_idle_volumes from dstack._internal.server.background.tasks.process_instances import ( process_instances, ) @@ -76,6 +77,9 @@ def start_background_tasks() -> AsyncIOScheduler: _scheduler.add_job( process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5 ) + _scheduler.add_job( + process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1 + ) _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5)) for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR): # Add multiple copies of tasks if requested. 
diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py new file mode 100644 index 0000000000..9d49885d90 --- /dev/null +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -0,0 +1,134 @@ +import datetime +from typing import List + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload + +from dstack._internal.core.models.profiles import parse_duration +from dstack._internal.core.models.volumes import VolumeStatus +from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.models import VolumeModel +from dstack._internal.server.services.locking import get_locker +from dstack._internal.server.services.volumes import delete_volumes, get_volume_configuration +from dstack._internal.utils.common import get_current_datetime +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + + +async def process_idle_volumes(batch_size: int = 10): + """ + Process volumes to check if they have exceeded their idle_duration and delete them. 
+ """ + lock, lockset = get_locker().get_lockset(VolumeModel.__tablename__) + async with get_session_ctx() as session: + async with lock: + res = await session.execute( + select(VolumeModel) + .where( + VolumeModel.status == VolumeStatus.ACTIVE, + VolumeModel.deleted == False, + VolumeModel.id.not_in(lockset), + ) + .options(joinedload(VolumeModel.project)) + .options(joinedload(VolumeModel.attachments)) + .order_by(VolumeModel.last_processed_at.asc()) + .limit(batch_size) + .with_for_update(skip_locked=True) + ) + volume_models = list(res.unique().scalars().all()) + if not volume_models: + return + + # Add to lockset + for volume_model in volume_models: + lockset.add(volume_model.id) + + try: + volumes_to_delete = [] + for volume_model in volume_models: + if await _should_delete_idle_volume(volume_model): + volumes_to_delete.append(volume_model) + + if volumes_to_delete: + await _delete_idle_volumes(session, volumes_to_delete) + + finally: + # Remove from lockset + for volume_model in volume_models: + lockset.difference_update([volume_model.id]) + + +async def _should_delete_idle_volume(volume_model: VolumeModel) -> bool: + """ + Check if a volume should be deleted based on its idle duration. 
+ """ + # Get volume configuration + configuration = get_volume_configuration(volume_model) + + # If no idle_duration is configured, don't delete + if configuration.idle_duration is None: + return False + + # If idle_duration is disabled (negative value), don't delete + if isinstance(configuration.idle_duration, int) and configuration.idle_duration < 0: + return False + + # Parse idle duration + idle_duration_seconds = parse_duration(configuration.idle_duration) + if idle_duration_seconds is None or idle_duration_seconds <= 0: + return False + + # Check if volume is currently attached to any instance + if len(volume_model.attachments) > 0: + logger.debug("Volume %s is still attached to instances, not deleting", volume_model.name) + return False + + # Calculate how long the volume has been idle + idle_duration = _get_volume_idle_duration(volume_model) + idle_threshold = datetime.timedelta(seconds=idle_duration_seconds) + + if idle_duration > idle_threshold: + logger.info( + "Volume %s idle duration expired: idle time %s seconds, threshold %s seconds. Marking for deletion", + volume_model.name, + idle_duration.total_seconds(), + idle_threshold.total_seconds(), + ) + return True + + return False + + +def _get_volume_idle_duration(volume_model: VolumeModel) -> datetime.timedelta: + """ + Calculate how long a volume has been idle. + A volume is considered idle from the time it was last processed by a job. + If it was never used by a job, use the created_at time. + """ + last_time = volume_model.created_at.replace(tzinfo=datetime.timezone.utc) + if volume_model.last_job_processed_at is not None: + last_time = volume_model.last_job_processed_at.replace(tzinfo=datetime.timezone.utc) + return get_current_datetime() - last_time + + +async def _delete_idle_volumes(session: AsyncSession, volume_models: List[VolumeModel]): + """ + Delete volumes that have exceeded their idle duration. 
+ """ + # Group volumes by project + volumes_by_project = {} + for volume_model in volume_models: + project = volume_model.project + if project not in volumes_by_project: + volumes_by_project[project] = [] + volumes_by_project[project].append(volume_model.name) + + # Delete volumes by project + for project, volume_names in volumes_by_project.items(): + logger.info("Deleting idle volumes for project %s: %s", project.name, volume_names) + try: + await delete_volumes(session, project, volume_names) + except Exception as e: + logger.error("Failed to delete idle volumes for project %s: %s", project.name, str(e)) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index b9c2f9c946..219acc8ec8 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -739,3 +739,6 @@ async def _attach_volume( attachment_data=attachment_data.json(), ) instance.volume_attachments.append(volume_attachment_model) + + # Update volume last_job_processed_at when it's attached to a job + volume_model.last_job_processed_at = common_utils.get_current_datetime() diff --git a/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py b/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py new file mode 100644 index 0000000000..60bf75105d --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py @@ -0,0 +1,38 @@ +"""add_volume_last_job_processed_at + +Revision ID: 09f10d63c924 +Revises: 35e90e1b0d3e +Create Date: 2025-06-24 11:41:39.588797 + +""" + +import sqlalchemy as sa +from alembic import op + +import dstack._internal.server.models + +# revision identifiers, used by Alembic. 
+revision = "09f10d63c924" +down_revision = "35e90e1b0d3e" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("volumes", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "last_job_processed_at", + dstack._internal.server.models.NaiveDateTime(), + nullable=True, + ) + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("volumes", schema=None) as batch_op: + batch_op.drop_column("last_job_processed_at") + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index d39d07be10..c4dafe81e3 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -645,6 +645,7 @@ class VolumeModel(BaseModel): last_processed_at: Mapped[datetime] = mapped_column( NaiveDateTime, default=get_current_datetime ) + last_job_processed_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) deleted: Mapped[bool] = mapped_column(Boolean, default=False) deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 41aa496be7..36c3a55ef5 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -289,6 +289,19 @@ async def process_terminating_job( # so that stuck volumes don't prevent the instance from terminating. 
job_model.instance_id = None instance_model.last_job_processed_at = common.get_current_datetime() + + # Update volume last_job_processed_at when job is done using them + if jrd is not None and jrd.volume_names is not None: + volume_names = jrd.volume_names + else: + # Legacy jobs before job_runtime_data/blocks were introduced + volume_names = [va.volume.name for va in instance_model.volume_attachments] + volume_models = await list_project_volume_models( + session=session, project=instance_model.project, names=volume_names + ) + for volume_model in volume_models: + volume_model.last_job_processed_at = common.get_current_datetime() + logger.info( "%s: instance '%s' has been released, new status is %s", fmt(job_model), diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 047adb5c14..9c640bd282 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -820,6 +820,7 @@ def get_volume_configuration( region: str = "eu-west-1", size: Optional[Memory] = Memory(100), volume_id: Optional[str] = None, + idle_duration: Optional[Union[str, int, bool]] = None, ) -> VolumeConfiguration: return VolumeConfiguration( name=name, @@ -827,6 +828,7 @@ def get_volume_configuration( region=region, size=size, volume_id=volume_id, + idle_duration=idle_duration, ) diff --git a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py new file mode 100644 index 0000000000..3b74a4b7a4 --- /dev/null +++ b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py @@ -0,0 +1,226 @@ +import datetime +from unittest.mock import patch + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.volumes import VolumeStatus +from 
dstack._internal.server.background.tasks.process_idle_volumes import ( + _get_volume_idle_duration, + _should_delete_idle_volume, + process_idle_volumes, +) +from dstack._internal.server.models import VolumeAttachmentModel +from dstack._internal.server.testing.common import ( + create_instance, + create_project, + create_user, + create_volume, + get_volume_configuration, + get_volume_provisioning_data, +) + + +class TestProcessIdleVolumes: + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_no_idle_duration_configured(self, test_db, session: AsyncSession): + """Test that volumes without idle_duration configured are not deleted.""" + project = await create_project(session=session) + user = await create_user(session=session) + + # Create volume without idle_duration + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=get_volume_configuration(name="test-volume"), + volume_provisioning_data=get_volume_provisioning_data(), + ) + + should_delete = await _should_delete_idle_volume(volume) + assert not should_delete + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_idle_duration_disabled(self, test_db, session: AsyncSession): + """Test that volumes with idle_duration set to -1 (disabled) are not deleted.""" + project = await create_project(session=session) + user = await create_user(session=session) + + # Create volume with idle_duration disabled + volume_config = get_volume_configuration(name="test-volume") + volume_config.idle_duration = -1 + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=volume_config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + should_delete = await _should_delete_idle_volume(volume) + 
assert not should_delete + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_volume_still_attached(self, test_db, session: AsyncSession): + """Test that volumes still attached to instances are not deleted.""" + project = await create_project(session=session) + user = await create_user(session=session) + + # Create volume with idle_duration + volume_config = get_volume_configuration(name="test-volume") + volume_config.idle_duration = "1h" # 1 hour + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=volume_config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + # Create an instance and attach the volume to it + instance = await create_instance(session=session, project=project) + attachment = VolumeAttachmentModel( + volume_id=volume.id, + instance_id=instance.id, + ) + volume.attachments.append(attachment) + await session.commit() + + should_delete = await _should_delete_idle_volume(volume) + assert not should_delete + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession): + """Test that volumes within idle duration threshold are not deleted.""" + project = await create_project(session=session) + user = await create_user(session=session) + + # Create volume with idle_duration + volume_config = get_volume_configuration(name="test-volume") + volume_config.idle_duration = "1h" # 1 hour + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=volume_config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + # Set last_job_processed_at to 30 minutes ago (less than 1 hour) + volume.last_job_processed_at = ( + 
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30) + ).replace(tzinfo=None) + + should_delete = await _should_delete_idle_volume(volume) + assert not should_delete + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_idle_duration_exceeded(self, test_db, session: AsyncSession): + """Test that volumes exceeding idle duration threshold are marked for deletion.""" + project = await create_project(session=session) + user = await create_user(session=session) + + # Create volume with idle_duration + volume_config = get_volume_configuration(name="test-volume") + volume_config.idle_duration = "1h" # 1 hour + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=volume_config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + # Set last_job_processed_at to 2 hours ago (more than 1 hour) + volume.last_job_processed_at = ( + datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) + ).replace(tzinfo=None) + + should_delete = await _should_delete_idle_volume(volume) + assert should_delete + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_volume_never_used_by_job(self, test_db, session: AsyncSession): + """Test idle duration calculation for volumes never used by jobs.""" + project = await create_project(session=session) + user = await create_user(session=session) + + # Create volume with old created_at time + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=get_volume_configuration(name="test-volume"), + volume_provisioning_data=get_volume_provisioning_data(), + created_at=datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2), + ) + + # last_job_processed_at is 
None, so it should use created_at + volume.last_job_processed_at = None + + idle_duration = _get_volume_idle_duration(volume) + # Should be approximately 2 hours + assert idle_duration.total_seconds() >= 7000 # ~2 hours in seconds + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_process_idle_volumes_integration(self, test_db, session: AsyncSession): + """Integration test for the full process_idle_volumes function.""" + project = await create_project(session=session) + user = await create_user(session=session) + + # Create volume that should be deleted (exceeded idle duration) + volume_config = get_volume_configuration(name="test-volume") + volume_config.idle_duration = "1h" # 1 hour + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=volume_config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + # Set as idle for more than threshold + volume.last_job_processed_at = ( + datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) + ).replace(tzinfo=None) + + await session.commit() + + # Mock the delete_volumes function to avoid actual deletion + with patch( + "dstack._internal.server.background.tasks.process_idle_volumes.delete_volumes" + ) as mock_delete: + await process_idle_volumes() + + # Should have called delete_volumes with the volume + mock_delete.assert_called_once() + call_args = mock_delete.call_args + # The function is called with (session, project, volume_names_list) + assert call_args[0][2] == ["test-volume"] From 8514a12952107d9de2fa6a06f84e39a04173e458 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Tue, 24 Jun 2025 15:19:19 -0400 Subject: [PATCH 02/23] fix: avoid FOR UPDATE with outer join in process_idle_volumes (Postgres compatibility) --- .../server/background/tasks/process_idle_volumes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 
deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index 9d49885d90..06d1aa538f 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -3,7 +3,6 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload from dstack._internal.core.models.profiles import parse_duration from dstack._internal.core.models.volumes import VolumeStatus @@ -31,13 +30,14 @@ async def process_idle_volumes(batch_size: int = 10): VolumeModel.deleted == False, VolumeModel.id.not_in(lockset), ) - .options(joinedload(VolumeModel.project)) - .options(joinedload(VolumeModel.attachments)) .order_by(VolumeModel.last_processed_at.asc()) .limit(batch_size) .with_for_update(skip_locked=True) ) volume_models = list(res.unique().scalars().all()) + # Manually load relationships to avoid outer join in the locked query + for volume_model in volume_models: + await session.refresh(volume_model, ["project", "attachments"]) if not volume_models: return From 25f634aace05d0c2c066e4c69b27a09127aaa490 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Wed, 25 Jun 2025 11:01:23 -0400 Subject: [PATCH 03/23] Fix formatting issues --- .../server/background/tasks/process_idle_volumes.py | 11 ----------- .../server/background/tasks/process_submitted_jobs.py | 1 - src/dstack/_internal/server/services/jobs/__init__.py | 2 -- 3 files changed, 14 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index 06d1aa538f..508b7f1da7 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -35,13 +35,11 @@ async def process_idle_volumes(batch_size: 
int = 10): .with_for_update(skip_locked=True) ) volume_models = list(res.unique().scalars().all()) - # Manually load relationships to avoid outer join in the locked query for volume_model in volume_models: await session.refresh(volume_model, ["project", "attachments"]) if not volume_models: return - # Add to lockset for volume_model in volume_models: lockset.add(volume_model.id) @@ -55,7 +53,6 @@ async def process_idle_volumes(batch_size: int = 10): await _delete_idle_volumes(session, volumes_to_delete) finally: - # Remove from lockset for volume_model in volume_models: lockset.difference_update([volume_model.id]) @@ -64,28 +61,22 @@ async def _should_delete_idle_volume(volume_model: VolumeModel) -> bool: """ Check if a volume should be deleted based on its idle duration. """ - # Get volume configuration configuration = get_volume_configuration(volume_model) - # If no idle_duration is configured, don't delete if configuration.idle_duration is None: return False - # If idle_duration is disabled (negative value), don't delete if isinstance(configuration.idle_duration, int) and configuration.idle_duration < 0: return False - # Parse idle duration idle_duration_seconds = parse_duration(configuration.idle_duration) if idle_duration_seconds is None or idle_duration_seconds <= 0: return False - # Check if volume is currently attached to any instance if len(volume_model.attachments) > 0: logger.debug("Volume %s is still attached to instances, not deleting", volume_model.name) return False - # Calculate how long the volume has been idle idle_duration = _get_volume_idle_duration(volume_model) idle_threshold = datetime.timedelta(seconds=idle_duration_seconds) @@ -117,7 +108,6 @@ async def _delete_idle_volumes(session: AsyncSession, volume_models: List[Volume """ Delete volumes that have exceeded their idle duration. 
""" - # Group volumes by project volumes_by_project = {} for volume_model in volume_models: project = volume_model.project @@ -125,7 +115,6 @@ async def _delete_idle_volumes(session: AsyncSession, volume_models: List[Volume volumes_by_project[project] = [] volumes_by_project[project].append(volume_model.name) - # Delete volumes by project for project, volume_names in volumes_by_project.items(): logger.info("Deleting idle volumes for project %s: %s", project.name, volume_names) try: diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 219acc8ec8..eba9549b68 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -740,5 +740,4 @@ async def _attach_volume( ) instance.volume_attachments.append(volume_attachment_model) - # Update volume last_job_processed_at when it's attached to a job volume_model.last_job_processed_at = common_utils.get_current_datetime() diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 36c3a55ef5..76133c65ca 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -290,11 +290,9 @@ async def process_terminating_job( job_model.instance_id = None instance_model.last_job_processed_at = common.get_current_datetime() - # Update volume last_job_processed_at when job is done using them if jrd is not None and jrd.volume_names is not None: volume_names = jrd.volume_names else: - # Legacy jobs before job_runtime_data/blocks were introduced volume_names = [va.volume.name for va in instance_model.volume_attachments] volume_models = await list_project_volume_models( session=session, project=instance_model.project, names=volume_names From 127dc2bce883ca83e4d4633dfebc58d611c032ca Mon Sep 17 00:00:00 
2001 From: Haydn Li Date: Wed, 25 Jun 2025 16:45:03 -0400 Subject: [PATCH 04/23] Optimize idle volume cleanup implementation --- .../background/tasks/process_idle_volumes.py | 49 +++++++-------- .../tasks/test_process_idle_volumes.py | 59 ++++--------------- 2 files changed, 33 insertions(+), 75 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index 508b7f1da7..aaa7aa4fa4 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -16,10 +16,7 @@ logger = get_logger(__name__) -async def process_idle_volumes(batch_size: int = 10): - """ - Process volumes to check if they have exceeded their idle_duration and delete them. - """ +async def process_idle_volumes(): lock, lockset = get_locker().get_lockset(VolumeModel.__tablename__) async with get_session_ctx() as session: async with lock: @@ -31,22 +28,20 @@ async def process_idle_volumes(batch_size: int = 10): VolumeModel.id.not_in(lockset), ) .order_by(VolumeModel.last_processed_at.asc()) - .limit(batch_size) + .limit(10) .with_for_update(skip_locked=True) ) volume_models = list(res.unique().scalars().all()) - for volume_model in volume_models: - await session.refresh(volume_model, ["project", "attachments"]) if not volume_models: return - for volume_model in volume_models: + await session.refresh(volume_model, ["project", "attachments"]) lockset.add(volume_model.id) try: volumes_to_delete = [] for volume_model in volume_models: - if await _should_delete_idle_volume(volume_model): + if _should_delete_idle_volume(volume_model): volumes_to_delete.append(volume_model) if volumes_to_delete: @@ -54,13 +49,10 @@ async def process_idle_volumes(batch_size: int = 10): finally: for volume_model in volume_models: - lockset.difference_update([volume_model.id]) + lockset.discard(volume_model.id) -async def 
_should_delete_idle_volume(volume_model: VolumeModel) -> bool: - """ - Check if a volume should be deleted based on its idle duration. - """ +def _should_delete_idle_volume(volume_model: VolumeModel) -> bool: configuration = get_volume_configuration(volume_model) if configuration.idle_duration is None: @@ -82,10 +74,10 @@ async def _should_delete_idle_volume(volume_model: VolumeModel) -> bool: if idle_duration > idle_threshold: logger.info( - "Volume %s idle duration expired: idle time %s seconds, threshold %s seconds. Marking for deletion", + "Volume %s idle duration expired: idle time %.1f hours, threshold %.1f hours. Marking for deletion", volume_model.name, - idle_duration.total_seconds(), - idle_threshold.total_seconds(), + idle_duration.total_seconds() / 3600, + idle_threshold.total_seconds() / 3600, ) return True @@ -93,21 +85,22 @@ async def _should_delete_idle_volume(volume_model: VolumeModel) -> bool: def _get_volume_idle_duration(volume_model: VolumeModel) -> datetime.timedelta: - """ - Calculate how long a volume has been idle. - A volume is considered idle from the time it was last processed by a job. - If it was never used by a job, use the created_at time. - """ - last_time = volume_model.created_at.replace(tzinfo=datetime.timezone.utc) + reference_time = volume_model.created_at if volume_model.last_job_processed_at is not None: - last_time = volume_model.last_job_processed_at.replace(tzinfo=datetime.timezone.utc) - return get_current_datetime() - last_time + reference_time = volume_model.last_job_processed_at + + reference_time_utc = reference_time.replace(tzinfo=datetime.timezone.utc) + current_time = get_current_datetime() + + idle_duration = current_time - reference_time_utc + + if idle_duration.total_seconds() < 0: + return datetime.timedelta(0) + + return idle_duration async def _delete_idle_volumes(session: AsyncSession, volume_models: List[VolumeModel]): - """ - Delete volumes that have exceeded their idle duration. 
- """ volumes_by_project = {} for volume_model in volume_models: project = volume_model.project diff --git a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py index 3b74a4b7a4..ae61cee4e5 100644 --- a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py +++ b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py @@ -22,15 +22,13 @@ ) +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) class TestProcessIdleVolumes: - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_no_idle_duration_configured(self, test_db, session: AsyncSession): - """Test that volumes without idle_duration configured are not deleted.""" project = await create_project(session=session) user = await create_user(session=session) - # Create volume without idle_duration volume = await create_volume( session=session, project=project, @@ -41,17 +39,13 @@ async def test_no_idle_duration_configured(self, test_db, session: AsyncSession) volume_provisioning_data=get_volume_provisioning_data(), ) - should_delete = await _should_delete_idle_volume(volume) + should_delete = _should_delete_idle_volume(volume) assert not should_delete - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_idle_duration_disabled(self, test_db, session: AsyncSession): - """Test that volumes with idle_duration set to -1 (disabled) are not deleted.""" project = await create_project(session=session) user = await create_user(session=session) - # Create volume with idle_duration disabled volume_config = get_volume_configuration(name="test-volume") volume_config.idle_duration = -1 @@ -65,19 +59,15 @@ async def test_idle_duration_disabled(self, test_db, session: AsyncSession): volume_provisioning_data=get_volume_provisioning_data(), ) - 
should_delete = await _should_delete_idle_volume(volume) + should_delete = _should_delete_idle_volume(volume) assert not should_delete - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_volume_still_attached(self, test_db, session: AsyncSession): - """Test that volumes still attached to instances are not deleted.""" project = await create_project(session=session) user = await create_user(session=session) - # Create volume with idle_duration volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" # 1 hour + volume_config.idle_duration = "1h" volume = await create_volume( session=session, @@ -89,7 +79,6 @@ async def test_volume_still_attached(self, test_db, session: AsyncSession): volume_provisioning_data=get_volume_provisioning_data(), ) - # Create an instance and attach the volume to it instance = await create_instance(session=session, project=project) attachment = VolumeAttachmentModel( volume_id=volume.id, @@ -98,19 +87,15 @@ async def test_volume_still_attached(self, test_db, session: AsyncSession): volume.attachments.append(attachment) await session.commit() - should_delete = await _should_delete_idle_volume(volume) + should_delete = _should_delete_idle_volume(volume) assert not should_delete - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession): - """Test that volumes within idle duration threshold are not deleted.""" project = await create_project(session=session) user = await create_user(session=session) - # Create volume with idle_duration volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" # 1 hour + volume_config.idle_duration = "1h" volume = await create_volume( session=session, @@ -122,24 +107,19 @@ async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession): 
volume_provisioning_data=get_volume_provisioning_data(), ) - # Set last_job_processed_at to 30 minutes ago (less than 1 hour) volume.last_job_processed_at = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30) ).replace(tzinfo=None) - should_delete = await _should_delete_idle_volume(volume) + should_delete = _should_delete_idle_volume(volume) assert not should_delete - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_idle_duration_exceeded(self, test_db, session: AsyncSession): - """Test that volumes exceeding idle duration threshold are marked for deletion.""" project = await create_project(session=session) user = await create_user(session=session) - # Create volume with idle_duration volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" # 1 hour + volume_config.idle_duration = "1h" volume = await create_volume( session=session, @@ -151,22 +131,17 @@ async def test_idle_duration_exceeded(self, test_db, session: AsyncSession): volume_provisioning_data=get_volume_provisioning_data(), ) - # Set last_job_processed_at to 2 hours ago (more than 1 hour) volume.last_job_processed_at = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) ).replace(tzinfo=None) - should_delete = await _should_delete_idle_volume(volume) + should_delete = _should_delete_idle_volume(volume) assert should_delete - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_volume_never_used_by_job(self, test_db, session: AsyncSession): - """Test idle duration calculation for volumes never used by jobs.""" project = await create_project(session=session) user = await create_user(session=session) - # Create volume with old created_at time volume = await create_volume( session=session, project=project, @@ -178,23 +153,17 @@ async def test_volume_never_used_by_job(self, test_db, session: 
AsyncSession): created_at=datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2), ) - # last_job_processed_at is None, so it should use created_at volume.last_job_processed_at = None idle_duration = _get_volume_idle_duration(volume) - # Should be approximately 2 hours - assert idle_duration.total_seconds() >= 7000 # ~2 hours in seconds + assert idle_duration.total_seconds() >= 7000 - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_process_idle_volumes_integration(self, test_db, session: AsyncSession): - """Integration test for the full process_idle_volumes function.""" project = await create_project(session=session) user = await create_user(session=session) - # Create volume that should be deleted (exceeded idle duration) volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" # 1 hour + volume_config.idle_duration = "1h" volume = await create_volume( session=session, @@ -206,21 +175,17 @@ async def test_process_idle_volumes_integration(self, test_db, session: AsyncSes volume_provisioning_data=get_volume_provisioning_data(), ) - # Set as idle for more than threshold volume.last_job_processed_at = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) ).replace(tzinfo=None) await session.commit() - # Mock the delete_volumes function to avoid actual deletion with patch( "dstack._internal.server.background.tasks.process_idle_volumes.delete_volumes" ) as mock_delete: await process_idle_volumes() - # Should have called delete_volumes with the volume mock_delete.assert_called_once() call_args = mock_delete.call_args - # The function is called with (session, project, volume_names_list) assert call_args[0][2] == ["test-volume"] From 75dadddb998e87878ac0f5839370794cd3a95182 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Thu, 26 Jun 2025 14:10:04 -0400 Subject: [PATCH 05/23] Removed unnecessary changes and refactor 
implementations --- .../background/tasks/process_idle_volumes.py | 92 ++++++++---------- .../server/services/jobs/__init__.py | 19 ++-- .../tasks/test_process_idle_volumes.py | 94 +++++++------------ 3 files changed, 84 insertions(+), 121 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index aaa7aa4fa4..bf9c33d13a 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -31,86 +31,74 @@ async def process_idle_volumes(): .limit(10) .with_for_update(skip_locked=True) ) - volume_models = list(res.unique().scalars().all()) - if not volume_models: + volumes = list(res.unique().scalars().all()) + if not volumes: return - for volume_model in volume_models: - await session.refresh(volume_model, ["project", "attachments"]) - lockset.add(volume_model.id) + for volume in volumes: + await session.refresh(volume, ["project", "attachments"]) + lockset.add(volume.id) try: - volumes_to_delete = [] - for volume_model in volume_models: - if _should_delete_idle_volume(volume_model): - volumes_to_delete.append(volume_model) + to_delete = [] + for volume in volumes: + if _should_delete_volume(volume): + to_delete.append(volume) - if volumes_to_delete: - await _delete_idle_volumes(session, volumes_to_delete) + if to_delete: + await _delete_volumes(session, to_delete) finally: - for volume_model in volume_models: - lockset.discard(volume_model.id) + for volume in volumes: + lockset.discard(volume.id) -def _should_delete_idle_volume(volume_model: VolumeModel) -> bool: - configuration = get_volume_configuration(volume_model) +def _should_delete_volume(volume: VolumeModel) -> bool: + config = get_volume_configuration(volume) - if configuration.idle_duration is None: + if not config.idle_duration: return False - if isinstance(configuration.idle_duration, int) and 
configuration.idle_duration < 0: + if isinstance(config.idle_duration, int) and config.idle_duration < 0: return False - idle_duration_seconds = parse_duration(configuration.idle_duration) - if idle_duration_seconds is None or idle_duration_seconds <= 0: + duration_seconds = parse_duration(config.idle_duration) + if not duration_seconds or duration_seconds <= 0: return False - if len(volume_model.attachments) > 0: - logger.debug("Volume %s is still attached to instances, not deleting", volume_model.name) + if volume.attachments: return False - idle_duration = _get_volume_idle_duration(volume_model) - idle_threshold = datetime.timedelta(seconds=idle_duration_seconds) + idle_time = _get_idle_time(volume) + threshold = datetime.timedelta(seconds=duration_seconds) - if idle_duration > idle_threshold: + if idle_time > threshold: logger.info( - "Volume %s idle duration expired: idle time %.1f hours, threshold %.1f hours. Marking for deletion", - volume_model.name, - idle_duration.total_seconds() / 3600, - idle_threshold.total_seconds() / 3600, + "Deleting idle volume %s (idle %.1fh)", volume.name, idle_time.total_seconds() / 3600 ) return True return False -def _get_volume_idle_duration(volume_model: VolumeModel) -> datetime.timedelta: - reference_time = volume_model.created_at - if volume_model.last_job_processed_at is not None: - reference_time = volume_model.last_job_processed_at +def _get_idle_time(volume: VolumeModel) -> datetime.timedelta: + last_used = volume.last_job_processed_at or volume.created_at + last_used_utc = last_used.replace(tzinfo=datetime.timezone.utc) + now = get_current_datetime() - reference_time_utc = reference_time.replace(tzinfo=datetime.timezone.utc) - current_time = get_current_datetime() + idle_time = now - last_used_utc + return max(idle_time, datetime.timedelta(0)) - idle_duration = current_time - reference_time_utc - if idle_duration.total_seconds() < 0: - return datetime.timedelta(0) +async def _delete_volumes(session: AsyncSession, 
volumes: List[VolumeModel]): + by_project = {} + for volume in volumes: + project = volume.project + if project not in by_project: + by_project[project] = [] + by_project[project].append(volume.name) - return idle_duration - - -async def _delete_idle_volumes(session: AsyncSession, volume_models: List[VolumeModel]): - volumes_by_project = {} - for volume_model in volume_models: - project = volume_model.project - if project not in volumes_by_project: - volumes_by_project[project] = [] - volumes_by_project[project].append(volume_model.name) - - for project, volume_names in volumes_by_project.items(): - logger.info("Deleting idle volumes for project %s: %s", project.name, volume_names) + for project, names in by_project.items(): try: - await delete_volumes(session, project, volume_names) + await delete_volumes(session, project, names) except Exception as e: - logger.error("Failed to delete idle volumes for project %s: %s", project.name, str(e)) + logger.error("Failed to delete volumes for project %s: %s", project.name, str(e)) diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 76133c65ca..38b01fde11 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -290,15 +290,18 @@ async def process_terminating_job( job_model.instance_id = None instance_model.last_job_processed_at = common.get_current_datetime() - if jrd is not None and jrd.volume_names is not None: - volume_names = jrd.volume_names - else: - volume_names = [va.volume.name for va in instance_model.volume_attachments] - volume_models = await list_project_volume_models( - session=session, project=instance_model.project, names=volume_names + # Update volume timestamps + volume_names = ( + jrd.volume_names + if jrd and jrd.volume_names + else [va.volume.name for va in instance_model.volume_attachments] ) - for volume_model in volume_models: - 
volume_model.last_job_processed_at = common.get_current_datetime() + if volume_names: + volumes = await list_project_volume_models( + session=session, project=instance_model.project, names=volume_names + ) + for volume in volumes: + volume.last_job_processed_at = common.get_current_datetime() logger.info( "%s: instance '%s' has been released, new status is %s", diff --git a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py index ae61cee4e5..66a39e4aaf 100644 --- a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py +++ b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py @@ -7,8 +7,8 @@ from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.volumes import VolumeStatus from dstack._internal.server.background.tasks.process_idle_volumes import ( - _get_volume_idle_duration, - _should_delete_idle_volume, + _get_idle_time, + _should_delete_volume, process_idle_volumes, ) from dstack._internal.server.models import VolumeAttachmentModel @@ -25,7 +25,7 @@ @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) class TestProcessIdleVolumes: - async def test_no_idle_duration_configured(self, test_db, session: AsyncSession): + async def test_no_idle_duration(self, test_db, session: AsyncSession): project = await create_project(session=session) user = await create_user(session=session) @@ -39,15 +39,14 @@ async def test_no_idle_duration_configured(self, test_db, session: AsyncSession) volume_provisioning_data=get_volume_provisioning_data(), ) - should_delete = _should_delete_idle_volume(volume) - assert not should_delete + assert not _should_delete_volume(volume) async def test_idle_duration_disabled(self, test_db, session: AsyncSession): project = await create_project(session=session) user = await create_user(session=session) - volume_config = 
get_volume_configuration(name="test-volume") - volume_config.idle_duration = -1 + config = get_volume_configuration(name="test-volume") + config.idle_duration = -1 volume = await create_volume( session=session, @@ -55,19 +54,18 @@ async def test_idle_duration_disabled(self, test_db, session: AsyncSession): user=user, status=VolumeStatus.ACTIVE, backend=BackendType.AWS, - configuration=volume_config, + configuration=config, volume_provisioning_data=get_volume_provisioning_data(), ) - should_delete = _should_delete_idle_volume(volume) - assert not should_delete + assert not _should_delete_volume(volume) - async def test_volume_still_attached(self, test_db, session: AsyncSession): + async def test_volume_attached(self, test_db, session: AsyncSession): project = await create_project(session=session) user = await create_user(session=session) - volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" + config = get_volume_configuration(name="test-volume") + config.idle_duration = "1h" volume = await create_volume( session=session, @@ -75,27 +73,24 @@ async def test_volume_still_attached(self, test_db, session: AsyncSession): user=user, status=VolumeStatus.ACTIVE, backend=BackendType.AWS, - configuration=volume_config, + configuration=config, volume_provisioning_data=get_volume_provisioning_data(), ) instance = await create_instance(session=session, project=project) - attachment = VolumeAttachmentModel( - volume_id=volume.id, - instance_id=instance.id, + volume.attachments.append( + VolumeAttachmentModel(volume_id=volume.id, instance_id=instance.id) ) - volume.attachments.append(attachment) await session.commit() - should_delete = _should_delete_idle_volume(volume) - assert not should_delete + assert not _should_delete_volume(volume) - async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession): + async def test_idle_duration_threshold(self, test_db, session: AsyncSession): project = await 
create_project(session=session) user = await create_user(session=session) - volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" + config = get_volume_configuration(name="test-volume") + config.idle_duration = "1h" volume = await create_volume( session=session, @@ -103,42 +98,23 @@ async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession): user=user, status=VolumeStatus.ACTIVE, backend=BackendType.AWS, - configuration=volume_config, + configuration=config, volume_provisioning_data=get_volume_provisioning_data(), ) + # Not exceeded - 30 minutes ago volume.last_job_processed_at = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30) ).replace(tzinfo=None) + assert not _should_delete_volume(volume) - should_delete = _should_delete_idle_volume(volume) - assert not should_delete - - async def test_idle_duration_exceeded(self, test_db, session: AsyncSession): - project = await create_project(session=session) - user = await create_user(session=session) - - volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" - - volume = await create_volume( - session=session, - project=project, - user=user, - status=VolumeStatus.ACTIVE, - backend=BackendType.AWS, - configuration=volume_config, - volume_provisioning_data=get_volume_provisioning_data(), - ) - + # Exceeded - 2 hours ago volume.last_job_processed_at = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) ).replace(tzinfo=None) + assert _should_delete_volume(volume) - should_delete = _should_delete_idle_volume(volume) - assert should_delete - - async def test_volume_never_used_by_job(self, test_db, session: AsyncSession): + async def test_never_used_volume(self, test_db, session: AsyncSession): project = await create_project(session=session) user = await create_user(session=session) @@ -154,16 +130,15 @@ async def test_volume_never_used_by_job(self, test_db, session: 
AsyncSession): ) volume.last_job_processed_at = None + idle_time = _get_idle_time(volume) + assert idle_time.total_seconds() >= 7000 - idle_duration = _get_volume_idle_duration(volume) - assert idle_duration.total_seconds() >= 7000 - - async def test_process_idle_volumes_integration(self, test_db, session: AsyncSession): + async def test_integration(self, test_db, session: AsyncSession): project = await create_project(session=session) user = await create_user(session=session) - volume_config = get_volume_configuration(name="test-volume") - volume_config.idle_duration = "1h" + config = get_volume_configuration(name="test-volume") + config.idle_duration = "1h" volume = await create_volume( session=session, @@ -171,21 +146,18 @@ async def test_process_idle_volumes_integration(self, test_db, session: AsyncSes user=user, status=VolumeStatus.ACTIVE, backend=BackendType.AWS, - configuration=volume_config, + configuration=config, volume_provisioning_data=get_volume_provisioning_data(), ) volume.last_job_processed_at = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) ).replace(tzinfo=None) - await session.commit() with patch( "dstack._internal.server.background.tasks.process_idle_volumes.delete_volumes" - ) as mock_delete: + ) as mock: await process_idle_volumes() - - mock_delete.assert_called_once() - call_args = mock_delete.call_args - assert call_args[0][2] == ["test-volume"] + mock.assert_called_once() + assert mock.call_args[0][2] == ["test-volume"] From ad05cb30846263acd988d67cf72d31e4e7f99986 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Thu, 26 Jun 2025 14:10:04 -0400 Subject: [PATCH 06/23] Removed unnecessary changes and refactor implementations --- src/dstack/_internal/cli/services/profile.py | 2 +- src/dstack/_internal/core/models/profiles.py | 6 ++---- src/dstack/_internal/core/models/volumes.py | 12 ++++++------ .../server/background/tasks/process_idle_volumes.py | 10 +++++----- src/dstack/_internal/server/testing/common.py | 4 ++-- 
.../background/tasks/test_process_idle_volumes.py | 8 ++++---- 6 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/dstack/_internal/cli/services/profile.py b/src/dstack/_internal/cli/services/profile.py index 23bbe55ad3..d57ea2e130 100644 --- a/src/dstack/_internal/cli/services/profile.py +++ b/src/dstack/_internal/cli/services/profile.py @@ -159,7 +159,7 @@ def apply_profile_args( if args.idle_duration is not None: profile_settings.idle_duration = args.idle_duration elif args.dont_destroy: - profile_settings.idle_duration = False + profile_settings.idle_duration = "off" if args.creation_policy_reuse: profile_settings.creation_policy = CreationPolicy.REUSE diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index 62997ce4e4..0ebaef81fb 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -74,11 +74,9 @@ def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str return parse_duration(v) -def parse_idle_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]: - if v is False: +def parse_idle_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]: + if v == "off" or v == -1: return -1 - if v is True: - return None return parse_duration(v) diff --git a/src/dstack/_internal/core/models/volumes.py b/src/dstack/_internal/core/models/volumes.py index 1f9c077487..0f89b770f0 100644 --- a/src/dstack/_internal/core/models/volumes.py +++ b/src/dstack/_internal/core/models/volumes.py @@ -45,13 +45,13 @@ class VolumeConfiguration(CoreModel): Optional[str], Field(description="The volume ID. Must be specified when registering external volumes"), ] = None - idle_duration: Annotated[ - Optional[Union[str, int, bool]], + auto_cleanup_duration: Annotated[ + Optional[Union[str, int]], Field( description=( "Time to wait after volume is no longer used by any job before deleting it. 
" "Defaults to keep the volume indefinitely. " - "Use the value 'off' to disable auto-cleanup." + "Use the value 'off' or -1 to disable auto-cleanup." ) ), ] = None @@ -67,9 +67,9 @@ class VolumeConfiguration(CoreModel): ] = None _validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator) - _validate_idle_duration = validator("idle_duration", pre=True, allow_reuse=True)( - parse_idle_duration - ) + _validate_auto_cleanup_duration = validator( + "auto_cleanup_duration", pre=True, allow_reuse=True + )(parse_idle_duration) @property def size_gb(self) -> int: diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index bf9c33d13a..f54d5994d8 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -55,13 +55,13 @@ async def process_idle_volumes(): def _should_delete_volume(volume: VolumeModel) -> bool: config = get_volume_configuration(volume) - if not config.idle_duration: + if not config.auto_cleanup_duration: return False - if isinstance(config.idle_duration, int) and config.idle_duration < 0: + if isinstance(config.auto_cleanup_duration, int) and config.auto_cleanup_duration < 0: return False - duration_seconds = parse_duration(config.idle_duration) + duration_seconds = parse_duration(config.auto_cleanup_duration) if not duration_seconds or duration_seconds <= 0: return False @@ -100,5 +100,5 @@ async def _delete_volumes(session: AsyncSession, volumes: List[VolumeModel]): for project, names in by_project.items(): try: await delete_volumes(session, project, names) - except Exception as e: - logger.error("Failed to delete volumes for project %s: %s", project.name, str(e)) + except Exception: + logger.exception("Failed to delete volumes for project %s", project.name) diff --git a/src/dstack/_internal/server/testing/common.py 
b/src/dstack/_internal/server/testing/common.py index 9c640bd282..50e32e969c 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -820,7 +820,7 @@ def get_volume_configuration( region: str = "eu-west-1", size: Optional[Memory] = Memory(100), volume_id: Optional[str] = None, - idle_duration: Optional[Union[str, int, bool]] = None, + auto_cleanup_duration: Optional[Union[str, int]] = None, ) -> VolumeConfiguration: return VolumeConfiguration( name=name, @@ -828,7 +828,7 @@ def get_volume_configuration( region=region, size=size, volume_id=volume_id, - idle_duration=idle_duration, + auto_cleanup_duration=auto_cleanup_duration, ) diff --git a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py index 66a39e4aaf..27bac31526 100644 --- a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py +++ b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py @@ -46,7 +46,7 @@ async def test_idle_duration_disabled(self, test_db, session: AsyncSession): user = await create_user(session=session) config = get_volume_configuration(name="test-volume") - config.idle_duration = -1 + config.auto_cleanup_duration = -1 volume = await create_volume( session=session, @@ -65,7 +65,7 @@ async def test_volume_attached(self, test_db, session: AsyncSession): user = await create_user(session=session) config = get_volume_configuration(name="test-volume") - config.idle_duration = "1h" + config.auto_cleanup_duration = "1h" volume = await create_volume( session=session, @@ -90,7 +90,7 @@ async def test_idle_duration_threshold(self, test_db, session: AsyncSession): user = await create_user(session=session) config = get_volume_configuration(name="test-volume") - config.idle_duration = "1h" + config.auto_cleanup_duration = "1h" volume = await create_volume( session=session, @@ -138,7 +138,7 @@ async def 
test_integration(self, test_db, session: AsyncSession): user = await create_user(session=session) config = get_volume_configuration(name="test-volume") - config.idle_duration = "1h" + config.auto_cleanup_duration = "1h" volume = await create_volume( session=session, From 3d5649680a1b3fda8180d5cb15d5ee9f5d0c638c Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Thu, 3 Jul 2025 10:57:56 -0400 Subject: [PATCH 07/23] Fix volume auto-cleanup locking and mocking issues --- .../background/tasks/process_idle_volumes.py | 58 ++++++++++++------- .../tasks/test_process_idle_volumes.py | 14 ++--- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index f54d5994d8..93b24f4f4f 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -1,15 +1,16 @@ import datetime from typing import List -from sqlalchemy import select +from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload from dstack._internal.core.models.profiles import parse_duration from dstack._internal.core.models.volumes import VolumeStatus from dstack._internal.server.db import get_session_ctx from dstack._internal.server.models import VolumeModel from dstack._internal.server.services.locking import get_locker -from dstack._internal.server.services.volumes import delete_volumes, get_volume_configuration +from dstack._internal.server.services.volumes import get_volume_configuration from dstack._internal.utils.common import get_current_datetime from dstack._internal.utils.logging import get_logger @@ -19,9 +20,10 @@ async def process_idle_volumes(): lock, lockset = get_locker().get_lockset(VolumeModel.__tablename__) async with get_session_ctx() as session: + # Take lock, select IDs, add to lockset, release 
lock async with lock: res = await session.execute( - select(VolumeModel) + select(VolumeModel.id) .where( VolumeModel.status == VolumeStatus.ACTIVE, VolumeModel.deleted == False, @@ -31,12 +33,21 @@ async def process_idle_volumes(): .limit(10) .with_for_update(skip_locked=True) ) - volumes = list(res.unique().scalars().all()) - if not volumes: + volume_ids = list(res.scalars().all()) + if not volume_ids: return - for volume in volumes: - await session.refresh(volume, ["project", "attachments"]) - lockset.add(volume.id) + for volume_id in volume_ids: + lockset.add(volume_id) + + # Load volumes with related attributes in one query + res = await session.execute( + select(VolumeModel) + .where(VolumeModel.id.in_(volume_ids)) + .options(selectinload(VolumeModel.project)) + .options(selectinload(VolumeModel.attachments)) + .execution_options(populate_existing=True) + ) + volumes = list(res.unique().scalars().all()) try: to_delete = [] @@ -45,11 +56,11 @@ async def process_idle_volumes(): to_delete.append(volume) if to_delete: - await _delete_volumes(session, to_delete) + await _delete_idle_volumes(session, to_delete) finally: - for volume in volumes: - lockset.discard(volume.id) + for volume_id in volume_ids: + lockset.discard(volume_id) def _should_delete_volume(volume: VolumeModel) -> bool: @@ -89,16 +100,21 @@ def _get_idle_time(volume: VolumeModel) -> datetime.timedelta: return max(idle_time, datetime.timedelta(0)) -async def _delete_volumes(session: AsyncSession, volumes: List[VolumeModel]): - by_project = {} +async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]): + """Delete idle volumes without using the delete_volumes function to avoid locking conflicts.""" for volume in volumes: - project = volume.project - if project not in by_project: - by_project[project] = [] - by_project[project].append(volume.name) - - for project, names in by_project.items(): try: - await delete_volumes(session, project, names) + # Mark volume as deleted + 
await session.execute( + update(VolumeModel) + .where(VolumeModel.id == volume.id) + .values( + deleted=True, + deleted_at=get_current_datetime(), + ) + ) + logger.info("Marked idle volume %s for deletion", volume.name) except Exception: - logger.exception("Failed to delete volumes for project %s", project.name) + logger.exception("Failed to mark volume %s for deletion", volume.name) + + await session.commit() diff --git a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py index 27bac31526..62b95ec252 100644 --- a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py +++ b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py @@ -1,5 +1,4 @@ import datetime -from unittest.mock import patch import pytest from sqlalchemy.ext.asyncio import AsyncSession @@ -155,9 +154,10 @@ async def test_integration(self, test_db, session: AsyncSession): ).replace(tzinfo=None) await session.commit() - with patch( - "dstack._internal.server.background.tasks.process_idle_volumes.delete_volumes" - ) as mock: - await process_idle_volumes() - mock.assert_called_once() - assert mock.call_args[0][2] == ["test-volume"] + # Run the background task + await process_idle_volumes() + + # Refresh the volume to see if it was marked as deleted + await session.refresh(volume) + assert volume.deleted is True + assert volume.deleted_at is not None From 29187a8d887f285d36aac5a4e79a85c50e8b8e6f Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Mon, 7 Jul 2025 10:23:46 -0400 Subject: [PATCH 08/23] feat: add merge migration for volume cleanup and secrets --- ...bd76b_merge_volume_cleanup_and_secrets_.py | 21 +++++ .../server/services/test_fleets_yaml.py | 55 ++++++++++++ .../server/services/test_runs_yaml.py | 87 +++++++++++++++++++ ~/.dstack/server/config.yml | 7 ++ 4 files changed, 170 insertions(+) create mode 100644 
src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py create mode 100644 src/tests/_internal/server/services/test_fleets_yaml.py create mode 100644 src/tests/_internal/server/services/test_runs_yaml.py create mode 100644 ~/.dstack/server/config.yml diff --git a/src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py b/src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py new file mode 100644 index 0000000000..6d6ce58f34 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py @@ -0,0 +1,21 @@ +"""merge volume cleanup and secrets migrations + +Revision ID: d268739bd76b +Revises: 09f10d63c924, 644b8a114187 +Create Date: 2025-07-07 10:22:50.061037 + +""" + +# revision identifiers, used by Alembic. +revision = "d268739bd76b" +down_revision = ("09f10d63c924", "644b8a114187") +branch_labels = None +depends_on = None + + +def upgrade() -> None: + pass + + +def downgrade() -> None: + pass diff --git a/src/tests/_internal/server/services/test_fleets_yaml.py b/src/tests/_internal/server/services/test_fleets_yaml.py new file mode 100644 index 0000000000..b85d90ba6d --- /dev/null +++ b/src/tests/_internal/server/services/test_fleets_yaml.py @@ -0,0 +1,55 @@ +from dstack._internal.core.models.fleets import FleetConfiguration, FleetSpec +from dstack._internal.core.models.profiles import Profile +from dstack._internal.server.services.fleets import fleet_configuration_to_yaml + + +def test_fleet_configuration_to_yaml(): + """Test that fleet configuration is correctly converted to YAML.""" + # Create a simple fleet configuration + config = FleetConfiguration( + name="test-fleet", + nodes=2, + resources={"gpu": "24GB"}, + ) + + spec = FleetSpec( + configuration=config, + profile=Profile(name="test-profile"), + ) + + # Convert to YAML + yaml_content = fleet_configuration_to_yaml(spec) + + # 
Verify the YAML contains expected content + assert "name: test-fleet" in yaml_content + assert "min: 2" in yaml_content # nodes.min + assert "max: 2" in yaml_content # nodes.max + assert "gpu:" in yaml_content + assert "type: fleet" in yaml_content + + +def test_fleet_configuration_to_yaml_with_ssh(): + """Test that SSH fleet configuration is correctly converted to YAML.""" + # Create an SSH fleet configuration + config = FleetConfiguration( + name="ssh-fleet", + ssh_config={ + "user": "ubuntu", + "hosts": ["192.168.1.100", "192.168.1.101"], + }, + ) + + spec = FleetSpec( + configuration=config, + profile=Profile(name="test-profile"), + ) + + # Convert to YAML + yaml_content = fleet_configuration_to_yaml(spec) + + # Verify the YAML contains expected content + assert "name: ssh-fleet" in yaml_content + assert "user: ubuntu" in yaml_content + assert "192.168.1.100" in yaml_content + assert "192.168.1.101" in yaml_content + assert "type: fleet" in yaml_content diff --git a/src/tests/_internal/server/services/test_runs_yaml.py b/src/tests/_internal/server/services/test_runs_yaml.py new file mode 100644 index 0000000000..76032a98ac --- /dev/null +++ b/src/tests/_internal/server/services/test_runs_yaml.py @@ -0,0 +1,87 @@ +from dstack._internal.core.models.configurations import ( + DevEnvironmentConfiguration, + ServiceConfiguration, + TaskConfiguration, +) +from dstack._internal.core.models.runs import RunSpec +from dstack._internal.server.services.runs import run_configuration_to_yaml + + +class TestRunConfigurationYaml: + def test_task_configuration_to_yaml(self): + """Test converting task configuration to YAML""" + config = TaskConfiguration( + name="test-task", + commands=["echo 'Hello World'"], + resources={"cpu": 1, "memory": "1GB"}, + image="python:3.9", + ) + + run_spec = RunSpec( + run_name="test-run", configuration=config, ssh_key_pub="ssh-rsa test-key" + ) + + yaml_content = run_configuration_to_yaml(run_spec) + + assert "name: test-run" in yaml_content + 
assert "type: task" in yaml_content + assert "commands:" in yaml_content + assert "echo 'Hello World'" in yaml_content + assert "image: python:3.9" in yaml_content + + def test_service_configuration_to_yaml(self): + """Test converting service configuration to YAML""" + config = ServiceConfiguration( + name="test-service", + commands=["python app.py"], + port=8080, + resources={"cpu": 2, "memory": "2GB"}, + image="python:3.9", + ) + + run_spec = RunSpec( + run_name="test-service-run", configuration=config, ssh_key_pub="ssh-rsa test-key" + ) + + yaml_content = run_configuration_to_yaml(run_spec) + + assert "name: test-service-run" in yaml_content + assert "type: service" in yaml_content + assert "commands:" in yaml_content + assert "python app.py" in yaml_content + assert "port:" in yaml_content + + def test_dev_environment_configuration_to_yaml(self): + """Test converting dev environment configuration to YAML""" + config = DevEnvironmentConfiguration( + name="test-dev", + ide="vscode", + resources={"cpu": 1, "memory": "1GB"}, + image="python:3.9", + ) + + run_spec = RunSpec( + run_name="test-dev-run", configuration=config, ssh_key_pub="ssh-rsa test-key" + ) + + yaml_content = run_configuration_to_yaml(run_spec) + + assert "name: test-dev-run" in yaml_content + assert "type: dev-environment" in yaml_content + assert "ide: vscode" in yaml_content + + def test_configuration_without_run_name(self): + """Test converting configuration when run_name is not set""" + config = TaskConfiguration( + name="test-task", + commands=["echo 'Hello World'"], + resources={"cpu": 1, "memory": "1GB"}, + image="python:3.9", + ) + + run_spec = RunSpec(configuration=config, ssh_key_pub="ssh-rsa test-key") + + yaml_content = run_configuration_to_yaml(run_spec) + + assert "name: test-task" in yaml_content + assert "type: task" in yaml_content diff --git a/~/.dstack/server/config.yml b/~/.dstack/server/config.yml new file mode 100644 index 0000000000..75363802cf --- /dev/null +++ 
b/~/.dstack/server/config.yml @@ -0,0 +1,7 @@ +projects: +- name: main + backends: + - type: local + name: local +encryption: + keys: [] From d0b72b20b654a3caf8fb80be065ccb326c5a265f Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Mon, 7 Jul 2025 10:29:27 -0400 Subject: [PATCH 09/23] fix: remove accidentally committed test files with non-existent imports --- .../server/services/test_fleets_yaml.py | 55 ------------ .../server/services/test_runs_yaml.py | 87 ------------------- 2 files changed, 142 deletions(-) delete mode 100644 src/tests/_internal/server/services/test_fleets_yaml.py delete mode 100644 src/tests/_internal/server/services/test_runs_yaml.py diff --git a/src/tests/_internal/server/services/test_fleets_yaml.py b/src/tests/_internal/server/services/test_fleets_yaml.py deleted file mode 100644 index b85d90ba6d..0000000000 --- a/src/tests/_internal/server/services/test_fleets_yaml.py +++ /dev/null @@ -1,55 +0,0 @@ -from dstack._internal.core.models.fleets import FleetConfiguration, FleetSpec -from dstack._internal.core.models.profiles import Profile -from dstack._internal.server.services.fleets import fleet_configuration_to_yaml - - -def test_fleet_configuration_to_yaml(): - """Test that fleet configuration is correctly converted to YAML.""" - # Create a simple fleet configuration - config = FleetConfiguration( - name="test-fleet", - nodes=2, - resources={"gpu": "24GB"}, - ) - - spec = FleetSpec( - configuration=config, - profile=Profile(name="test-profile"), - ) - - # Convert to YAML - yaml_content = fleet_configuration_to_yaml(spec) - - # Verify the YAML contains expected content - assert "name: test-fleet" in yaml_content - assert "min: 2" in yaml_content # nodes.min - assert "max: 2" in yaml_content # nodes.max - assert "gpu:" in yaml_content - assert "type: fleet" in yaml_content - - -def test_fleet_configuration_to_yaml_with_ssh(): - """Test that SSH fleet configuration is correctly converted to YAML.""" - # Create an SSH fleet configuration - config 
= FleetConfiguration( - name="ssh-fleet", - ssh_config={ - "user": "ubuntu", - "hosts": ["192.168.1.100", "192.168.1.101"], - }, - ) - - spec = FleetSpec( - configuration=config, - profile=Profile(name="test-profile"), - ) - - # Convert to YAML - yaml_content = fleet_configuration_to_yaml(spec) - - # Verify the YAML contains expected content - assert "name: ssh-fleet" in yaml_content - assert "user: ubuntu" in yaml_content - assert "192.168.1.100" in yaml_content - assert "192.168.1.101" in yaml_content - assert "type: fleet" in yaml_content diff --git a/src/tests/_internal/server/services/test_runs_yaml.py b/src/tests/_internal/server/services/test_runs_yaml.py deleted file mode 100644 index 76032a98ac..0000000000 --- a/src/tests/_internal/server/services/test_runs_yaml.py +++ /dev/null @@ -1,87 +0,0 @@ -from dstack._internal.core.models.configurations import ( - DevEnvironmentConfiguration, - ServiceConfiguration, - TaskConfiguration, -) -from dstack._internal.core.models.runs import RunSpec -from dstack._internal.server.services.runs import run_configuration_to_yaml - - -class TestRunConfigurationYaml: - def test_task_configuration_to_yaml(self): - """Test converting task configuration to YAML""" - config = TaskConfiguration( - name="test-task", - commands=["echo 'Hello World'"], - resources={"cpu": 1, "memory": "1GB"}, - image="python:3.9", - ) - - run_spec = RunSpec( - run_name="test-run", configuration=config, ssh_key_pub="ssh-rsa test-key" - ) - - yaml_content = run_configuration_to_yaml(run_spec) - - assert "name: test-run" in yaml_content - assert "type: task" in yaml_content - assert "commands:" in yaml_content - assert "echo 'Hello World'" in yaml_content - assert "image: python:3.9" in yaml_content - - def test_service_configuration_to_yaml(self): - """Test converting service configuration to YAML""" - config = ServiceConfiguration( - name="test-service", - commands=["python app.py"], - port=8080, - resources={"cpu": 2, "memory": "2GB"}, - 
image="python:3.9", - ) - - run_spec = RunSpec( - run_name="test-service-run", configuration=config, ssh_key_pub="ssh-rsa test-key" - ) - - yaml_content = run_configuration_to_yaml(run_spec) - - assert "name: test-service-run" in yaml_content - assert "type: service" in yaml_content - assert "commands:" in yaml_content - assert "python app.py" in yaml_content - assert "port:" in yaml_content - - def test_dev_environment_configuration_to_yaml(self): - """Test converting dev environment configuration to YAML""" - config = DevEnvironmentConfiguration( - name="test-dev", - ide="vscode", - resources={"cpu": 1, "memory": "1GB"}, - image="python:3.9", - ) - - run_spec = RunSpec( - run_name="test-dev-run", configuration=config, ssh_key_pub="ssh-rsa test-key" - ) - - yaml_content = run_configuration_to_yaml(run_spec) - - assert "name: test-dev-run" in yaml_content - assert "type: dev-environment" in yaml_content - assert "ide: vscode" in yaml_content - - def test_configuration_without_run_name(self): - """Test converting configuration when run_name is not set""" - config = TaskConfiguration( - name="test-task", - commands=["echo 'Hello World'"], - resources={"cpu": 1, "memory": "1GB"}, - image="python:3.9", - ) - - run_spec = RunSpec(configuration=config, ssh_key_pub="ssh-rsa test-key") - - yaml_content = run_configuration_to_yaml(run_spec) - - assert "name: test-task" in yaml_content - assert "type: task" in yaml_content From 151eb02ea4c727cf7b2620e33935244b9663ffd6 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Mon, 7 Jul 2025 10:43:57 -0400 Subject: [PATCH 10/23] fix: add missing dialect_name parameter to get_locker call --- .../_internal/server/background/tasks/process_idle_volumes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index 93b24f4f4f..311c9610f5 100644 --- 
a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -7,7 +7,7 @@ from dstack._internal.core.models.profiles import parse_duration from dstack._internal.core.models.volumes import VolumeStatus -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import VolumeModel from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.volumes import get_volume_configuration @@ -18,7 +18,7 @@ async def process_idle_volumes(): - lock, lockset = get_locker().get_lockset(VolumeModel.__tablename__) + lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__) async with get_session_ctx() as session: # Take lock, select IDs, add to lockset, release lock async with lock: From fb56b818be4c574e936c61137e3d0d91f0cfc8e4 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Tue, 8 Jul 2025 09:59:39 -0400 Subject: [PATCH 11/23] fix: address code review comments --- .../server/background/tasks/process_idle_volumes.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index 311c9610f5..a750c5a022 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -20,7 +20,6 @@ async def process_idle_volumes(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__) async with get_session_ctx() as session: - # Take lock, select IDs, add to lockset, release lock async with lock: res = await session.execute( select(VolumeModel.id) @@ -59,8 +58,7 @@ async def process_idle_volumes(): await _delete_idle_volumes(session, to_delete) finally: - for volume_id in 
volume_ids: - lockset.discard(volume_id) + lockset.difference_update(volume_ids) def _should_delete_volume(volume: VolumeModel) -> bool: @@ -101,7 +99,7 @@ def _get_idle_time(volume: VolumeModel) -> datetime.timedelta: async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]): - """Delete idle volumes without using the delete_volumes function to avoid locking conflicts.""" + """Mark idle volumes as deleted.""" for volume in volumes: try: # Mark volume as deleted From 90a49eea87499d553d706bd90934c9434d961f03 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Tue, 8 Jul 2025 10:07:23 -0400 Subject: [PATCH 12/23] fix: actually delete volumes from cloud providers --- .../background/tasks/process_idle_volumes.py | 65 +++++++++++++++++-- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index a750c5a022..a2533f4df4 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -5,12 +5,19 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload +from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport +from dstack._internal.core.errors import BackendNotAvailable from dstack._internal.core.models.profiles import parse_duration from dstack._internal.core.models.volumes import VolumeStatus from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import VolumeModel +from dstack._internal.server.services import backends as backends_services from dstack._internal.server.services.locking import get_locker -from dstack._internal.server.services.volumes import get_volume_configuration +from dstack._internal.server.services.volumes import ( + get_volume_configuration, + volume_model_to_volume, +) +from 
dstack._internal.utils import common from dstack._internal.utils.common import get_current_datetime from dstack._internal.utils.logging import get_logger @@ -43,6 +50,7 @@ async def process_idle_volumes(): select(VolumeModel) .where(VolumeModel.id.in_(volume_ids)) .options(selectinload(VolumeModel.project)) + .options(selectinload(VolumeModel.user)) .options(selectinload(VolumeModel.attachments)) .execution_options(populate_existing=True) ) @@ -99,20 +107,63 @@ def _get_idle_time(volume: VolumeModel) -> datetime.timedelta: async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]): - """Mark idle volumes as deleted.""" - for volume in volumes: + """Delete idle volumes from cloud providers and mark as deleted in database.""" + for volume_model in volumes: + try: + # Try to delete from cloud provider first + await _delete_volume_from_cloud(session, volume_model) + except Exception: + logger.exception("Error when deleting volume %s from cloud", volume_model.name) + + # Always mark as deleted in database, even if cloud deletion failed try: - # Mark volume as deleted await session.execute( update(VolumeModel) - .where(VolumeModel.id == volume.id) + .where(VolumeModel.id == volume_model.id) .values( deleted=True, deleted_at=get_current_datetime(), ) ) - logger.info("Marked idle volume %s for deletion", volume.name) + logger.info("Deleted idle volume %s", volume_model.name) except Exception: - logger.exception("Failed to mark volume %s for deletion", volume.name) + logger.exception("Failed to mark volume %s as deleted in database", volume_model.name) await session.commit() + + +async def _delete_volume_from_cloud(session: AsyncSession, volume_model: VolumeModel): + """Delete volume from cloud provider. 
Based on volumes.py:_delete_volume""" + volume = volume_model_to_volume(volume_model) + + if volume.external: + # External volumes are not managed by dstack + return + + if volume.provisioning_data is None: + # The volume wasn't provisioned so there is nothing to delete + return + + if volume.provisioning_data.backend is None: + logger.error( + f"Failed to delete volume {volume_model.name}. volume.provisioning_data.backend is None." + ) + return + + try: + backend = await backends_services.get_project_backend_by_type_or_error( + project=volume_model.project, + backend_type=volume.provisioning_data.backend, + ) + except BackendNotAvailable: + logger.error( + f"Failed to delete volume {volume_model.name}. Backend {volume.provisioning_data.backend} not available." + ) + return + + compute = backend.compute() + assert isinstance(compute, ComputeWithVolumeSupport) + await common.run_async( + compute.delete_volume, + volume=volume, + ) From 81885746f2aeb8121ab321383295e2f6561e2fe0 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Tue, 8 Jul 2025 10:29:41 -0400 Subject: [PATCH 13/23] Remove redundant code changes --- .../server/background/tasks/process_idle_volumes.py | 6 ------ src/dstack/_internal/server/services/jobs/__init__.py | 1 - 2 files changed, 7 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index a2533f4df4..e784bfdbdc 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -45,7 +45,6 @@ async def process_idle_volumes(): for volume_id in volume_ids: lockset.add(volume_id) - # Load volumes with related attributes in one query res = await session.execute( select(VolumeModel) .where(VolumeModel.id.in_(volume_ids)) @@ -110,12 +109,9 @@ async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel] """Delete idle volumes from cloud 
providers and mark as deleted in database.""" for volume_model in volumes: try: - # Try to delete from cloud provider first await _delete_volume_from_cloud(session, volume_model) except Exception: logger.exception("Error when deleting volume %s from cloud", volume_model.name) - - # Always mark as deleted in database, even if cloud deletion failed try: await session.execute( update(VolumeModel) @@ -137,11 +133,9 @@ async def _delete_volume_from_cloud(session: AsyncSession, volume_model: VolumeM volume = volume_model_to_volume(volume_model) if volume.external: - # External volumes are not managed by dstack return if volume.provisioning_data is None: - # The volume wasn't provisioned so there is nothing to delete return if volume.provisioning_data.backend is None: diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 38b01fde11..221d1ef29d 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -290,7 +290,6 @@ async def process_terminating_job( job_model.instance_id = None instance_model.last_job_processed_at = common.get_current_datetime() - # Update volume timestamps volume_names = ( jrd.volume_names if jrd and jrd.volume_names From c073017d2a657a738daf69136d2bcc991e809859 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Tue, 8 Jul 2025 10:35:02 -0400 Subject: [PATCH 14/23] Remove accidentally committed local config file --- ~/.dstack/server/config.yml | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 ~/.dstack/server/config.yml diff --git a/~/.dstack/server/config.yml b/~/.dstack/server/config.yml deleted file mode 100644 index 75363802cf..0000000000 --- a/~/.dstack/server/config.yml +++ /dev/null @@ -1,7 +0,0 @@ -projects: -- name: main - backends: - - type: local - name: local -encryption: - keys: [] From 4e1b56cfc72338bb2ab10ad624beefcb269e6158 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Wed, 9 Jul 2025 
13:40:47 -0400 Subject: [PATCH 15/23] Fix MissingGreenlet error in process_idle_volumes --- .../server/background/tasks/process_idle_volumes.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index e784bfdbdc..1ac3b2db11 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -3,14 +3,14 @@ from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload +from sqlalchemy.orm import joinedload from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport from dstack._internal.core.errors import BackendNotAvailable from dstack._internal.core.models.profiles import parse_duration from dstack._internal.core.models.volumes import VolumeStatus from dstack._internal.server.db import get_db, get_session_ctx -from dstack._internal.server.models import VolumeModel +from dstack._internal.server.models import ProjectModel, VolumeModel from dstack._internal.server.services import backends as backends_services from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.volumes import ( @@ -45,12 +45,13 @@ async def process_idle_volumes(): for volume_id in volume_ids: lockset.add(volume_id) + # Refetch volumes with proper relationship loading to avoid MissingGreenlet res = await session.execute( select(VolumeModel) .where(VolumeModel.id.in_(volume_ids)) - .options(selectinload(VolumeModel.project)) - .options(selectinload(VolumeModel.user)) - .options(selectinload(VolumeModel.attachments)) + .options(joinedload(VolumeModel.project).joinedload(ProjectModel.backends)) + .options(joinedload(VolumeModel.user)) + .options(joinedload(VolumeModel.attachments)) 
.execution_options(populate_existing=True) ) volumes = list(res.unique().scalars().all()) From fb99485b06b73a3fc72e8895ccd639d92fc5de05 Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Wed, 9 Jul 2025 13:52:22 -0400 Subject: [PATCH 16/23] Add validation for external volumes with auto_cleanup_duration --- .../_internal/server/services/volumes.py | 17 +++++ .../_internal/server/services/test_volumes.py | 71 ++++++++++++++++++- 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/services/volumes.py b/src/dstack/_internal/server/services/volumes.py index 9a7ed53d34..4e4f63f4c4 100644 --- a/src/dstack/_internal/server/services/volumes.py +++ b/src/dstack/_internal/server/services/volumes.py @@ -400,6 +400,23 @@ def _validate_volume_configuration(configuration: VolumeConfiguration): ) if configuration.name is not None: validate_dstack_resource_name(configuration.name) + # External volumes (with volume_id) cannot have auto_cleanup_duration + if configuration.volume_id is not None and configuration.auto_cleanup_duration is not None: + if ( + isinstance(configuration.auto_cleanup_duration, int) + and configuration.auto_cleanup_duration >= 0 + ): + raise ServerClientError( + "External volumes (with volume_id) do not support auto_cleanup_duration. " + "Auto-cleanup only works for volumes created and managed by dstack." + ) + elif isinstance( + configuration.auto_cleanup_duration, str + ) and configuration.auto_cleanup_duration not in ("off", "-1"): + raise ServerClientError( + "External volumes (with volume_id) do not support auto_cleanup_duration. " + "Auto-cleanup only works for volumes created and managed by dstack." 
+ ) async def _delete_volume(session: AsyncSession, project: ProjectModel, volume_model: VolumeModel): diff --git a/src/tests/_internal/server/services/test_volumes.py b/src/tests/_internal/server/services/test_volumes.py index c2ba555a15..4de9c3f050 100644 --- a/src/tests/_internal/server/services/test_volumes.py +++ b/src/tests/_internal/server/services/test_volumes.py @@ -3,11 +3,78 @@ import pytest from freezegun import freeze_time -from dstack._internal.core.models.volumes import VolumeStatus -from dstack._internal.server.services.volumes import _get_volume_cost +from dstack._internal.core.errors import ServerClientError +from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.volumes import VolumeConfiguration, VolumeStatus +from dstack._internal.server.services.volumes import ( + _get_volume_cost, + _validate_volume_configuration, +) from dstack._internal.server.testing.common import get_volume, get_volume_provisioning_data +class TestValidateVolumeConfiguration: + def test_external_volume_with_auto_cleanup_duration_raises_error(self): + """External volumes (with volume_id) should not allow auto_cleanup_duration""" + config = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration="1h", + ) + with pytest.raises( + ServerClientError, match="External volumes.*do not support auto_cleanup_duration" + ): + _validate_volume_configuration(config) + + def test_external_volume_with_auto_cleanup_duration_int_raises_error(self): + """External volumes with integer auto_cleanup_duration should also raise error""" + config = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration=3600, + ) + with pytest.raises( + ServerClientError, match="External volumes.*do not support auto_cleanup_duration" + ): + _validate_volume_configuration(config) + + def 
test_external_volume_with_auto_cleanup_disabled_succeeds(self): + """External volumes with auto_cleanup_duration='off' or -1 should be allowed""" + config1 = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration="off", + ) + config2 = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration=-1, + ) + # Should not raise any errors + _validate_volume_configuration(config1) + _validate_volume_configuration(config2) + + def test_external_volume_without_auto_cleanup_succeeds(self): + """External volumes without auto_cleanup_duration should be allowed""" + config = VolumeConfiguration( + backend=BackendType.AWS, region="us-east-1", volume_id="vol-123456" + ) + # Should not raise any errors + _validate_volume_configuration(config) + + def test_new_volume_with_auto_cleanup_duration_succeeds(self): + """New volumes (without volume_id) with auto_cleanup_duration should be allowed""" + config = VolumeConfiguration( + backend=BackendType.AWS, region="us-east-1", size=100, auto_cleanup_duration="1h" + ) + # Should not raise any errors + _validate_volume_configuration(config) + + class TestGetVolumeCost: def test_returns_0_when_no_provisioning_data(self): volume = get_volume(provisioning_data=None) From 002be4dcdcf2ba59cb48ae1e8d92b2101c15d9ed Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Wed, 9 Jul 2025 14:01:37 -0400 Subject: [PATCH 17/23] Remove merge migration and rebase on master --- ...63c924_add_volume_last_job_processed_at.py | 4 ++-- ...bd76b_merge_volume_cleanup_and_secrets_.py | 21 ------------------- 2 files changed, 2 insertions(+), 23 deletions(-) delete mode 100644 src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py diff --git a/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py 
b/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py index 60bf75105d..ebff5358af 100644 --- a/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py +++ b/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py @@ -1,7 +1,7 @@ """add_volume_last_job_processed_at Revision ID: 09f10d63c924 -Revises: 35e90e1b0d3e +Revises: ffa99edd1988 Create Date: 2025-06-24 11:41:39.588797 """ @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. revision = "09f10d63c924" -down_revision = "35e90e1b0d3e" +down_revision = "ffa99edd1988" branch_labels = None depends_on = None diff --git a/src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py b/src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py deleted file mode 100644 index 6d6ce58f34..0000000000 --- a/src/dstack/_internal/server/migrations/versions/d268739bd76b_merge_volume_cleanup_and_secrets_.py +++ /dev/null @@ -1,21 +0,0 @@ -"""merge volume cleanup and secrets migrations - -Revision ID: d268739bd76b -Revises: 09f10d63c924, 644b8a114187 -Create Date: 2025-07-07 10:22:50.061037 - -""" - -# revision identifiers, used by Alembic. 
-revision = "d268739bd76b" -down_revision = ("09f10d63c924", "644b8a114187") -branch_labels = None -depends_on = None - - -def upgrade() -> None: - pass - - -def downgrade() -> None: - pass From e1d98dd8ccb5d7cca58404a6d265b58a17be4b4f Mon Sep 17 00:00:00 2001 From: Haydn Li Date: Wed, 9 Jul 2025 14:14:21 -0400 Subject: [PATCH 18/23] Added merge migration to resolve multiple heads --- ...440b9_merge_volume_cleanup_and_secrets_.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py diff --git a/src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py b/src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py new file mode 100644 index 0000000000..ae7dda4541 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py @@ -0,0 +1,21 @@ +"""merge volume cleanup and secrets migrations + +Revision ID: ac86efc440b9 +Revises: 09f10d63c924, 644b8a114187 +Create Date: 2025-07-09 14:13:07.602021 + +""" + +# revision identifiers, used by Alembic. 
+revision = "ac86efc440b9" +down_revision = ("09f10d63c924", "644b8a114187") +branch_labels = None +depends_on = None + + +def upgrade() -> None: + pass + + +def downgrade() -> None: + pass From d9ca2f2424df91ce1955cdc001ec71393e8100b5 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 15 Jul 2025 14:32:03 +0500 Subject: [PATCH 19/23] Rebase migrations --- ...440b9_merge_volume_cleanup_and_secrets_.py | 21 ------------------- ..._add_volumemodel_last_job_processed_at.py} | 14 +++++++------ 2 files changed, 8 insertions(+), 27 deletions(-) delete mode 100644 src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py rename src/dstack/_internal/server/migrations/versions/{09f10d63c924_add_volume_last_job_processed_at.py => d5863798bf41_add_volumemodel_last_job_processed_at.py} (81%) diff --git a/src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py b/src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py deleted file mode 100644 index ae7dda4541..0000000000 --- a/src/dstack/_internal/server/migrations/versions/ac86efc440b9_merge_volume_cleanup_and_secrets_.py +++ /dev/null @@ -1,21 +0,0 @@ -"""merge volume cleanup and secrets migrations - -Revision ID: ac86efc440b9 -Revises: 09f10d63c924, 644b8a114187 -Create Date: 2025-07-09 14:13:07.602021 - -""" - -# revision identifiers, used by Alembic. 
-revision = "ac86efc440b9" -down_revision = ("09f10d63c924", "644b8a114187") -branch_labels = None -depends_on = None - - -def upgrade() -> None: - pass - - -def downgrade() -> None: - pass diff --git a/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py b/src/dstack/_internal/server/migrations/versions/d5863798bf41_add_volumemodel_last_job_processed_at.py similarity index 81% rename from src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py rename to src/dstack/_internal/server/migrations/versions/d5863798bf41_add_volumemodel_last_job_processed_at.py index ebff5358af..1dc883e05e 100644 --- a/src/dstack/_internal/server/migrations/versions/09f10d63c924_add_volume_last_job_processed_at.py +++ b/src/dstack/_internal/server/migrations/versions/d5863798bf41_add_volumemodel_last_job_processed_at.py @@ -1,8 +1,8 @@ -"""add_volume_last_job_processed_at +"""Add VolumeModel.last_job_processed_at -Revision ID: 09f10d63c924 -Revises: ffa99edd1988 -Create Date: 2025-06-24 11:41:39.588797 +Revision ID: d5863798bf41 +Revises: 644b8a114187 +Create Date: 2025-07-15 14:26:22.981687 """ @@ -12,8 +12,8 @@ import dstack._internal.server.models # revision identifiers, used by Alembic. -revision = "09f10d63c924" -down_revision = "ffa99edd1988" +revision = "d5863798bf41" +down_revision = "644b8a114187" branch_labels = None depends_on = None @@ -28,6 +28,7 @@ def upgrade() -> None: nullable=True, ) ) + # ### end Alembic commands ### @@ -35,4 +36,5 @@ def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! 
### with op.batch_alter_table("volumes", schema=None) as batch_op: batch_op.drop_column("last_job_processed_at") + # ### end Alembic commands ### From a546d97876bbc550ff6c7e9b559c8e45852e1def Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 15 Jul 2025 14:33:45 +0500 Subject: [PATCH 20/23] Update idle_duration type to match updated parse_idle_duration --- src/dstack/_internal/core/models/profiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index 0ebaef81fb..f98213c6a5 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -247,7 +247,7 @@ class ProfileParams(CoreModel): ), ] = None idle_duration: Annotated[ - Optional[Union[Literal["off"], str, int, bool]], + Optional[Union[Literal["off"], str, int]], Field( description=( "Time to wait before terminating idle instances." From 96b928647d0c3e2d00ee73dd01da9dc5be425f97 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 15 Jul 2025 15:24:26 +0500 Subject: [PATCH 21/23] Refactor process_idle_volumes --- .../background/tasks/process_idle_volumes.py | 77 ++++++---------- src/dstack/_internal/server/testing/common.py | 2 + .../tasks/test_process_idle_volumes.py | 87 ++++++++++++------- 3 files changed, 85 insertions(+), 81 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py index 1ac3b2db11..33d9d5a9b4 100644 --- a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -1,7 +1,7 @@ import datetime from typing import List -from sqlalchemy import select, update +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload @@ -37,7 +37,7 @@ async def process_idle_volumes(): ) 
.order_by(VolumeModel.last_processed_at.asc()) .limit(10) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) volume_ids = list(res.scalars().all()) if not volume_ids: @@ -45,7 +45,6 @@ async def process_idle_volumes(): for volume_id in volume_ids: lockset.add(volume_id) - # Refetch volumes with proper relationship loading to avoid MissingGreenlet res = await session.execute( select(VolumeModel) .where(VolumeModel.id.in_(volume_ids)) @@ -54,89 +53,65 @@ async def process_idle_volumes(): .options(joinedload(VolumeModel.attachments)) .execution_options(populate_existing=True) ) - volumes = list(res.unique().scalars().all()) - + volume_models = list(res.unique().scalars().all()) try: - to_delete = [] - for volume in volumes: - if _should_delete_volume(volume): - to_delete.append(volume) - - if to_delete: - await _delete_idle_volumes(session, to_delete) - + volumes_to_delete = [v for v in volume_models if _should_delete_volume(v)] + if not volumes_to_delete: + return + await _delete_idle_volumes(session, volumes_to_delete) finally: lockset.difference_update(volume_ids) def _should_delete_volume(volume: VolumeModel) -> bool: - config = get_volume_configuration(volume) - - if not config.auto_cleanup_duration: + if volume.attachments: return False - if isinstance(config.auto_cleanup_duration, int) and config.auto_cleanup_duration < 0: + config = get_volume_configuration(volume) + if not config.auto_cleanup_duration: return False duration_seconds = parse_duration(config.auto_cleanup_duration) if not duration_seconds or duration_seconds <= 0: return False - if volume.attachments: - return False - idle_time = _get_idle_time(volume) threshold = datetime.timedelta(seconds=duration_seconds) - - if idle_time > threshold: - logger.info( - "Deleting idle volume %s (idle %.1fh)", volume.name, idle_time.total_seconds() / 3600 - ) - return True - - return False + return idle_time > threshold def _get_idle_time(volume: VolumeModel) -> 
datetime.timedelta: last_used = volume.last_job_processed_at or volume.created_at last_used_utc = last_used.replace(tzinfo=datetime.timezone.utc) - now = get_current_datetime() - - idle_time = now - last_used_utc + idle_time = get_current_datetime() - last_used_utc return max(idle_time, datetime.timedelta(0)) async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]): - """Delete idle volumes from cloud providers and mark as deleted in database.""" + # Note: Multiple volumes are deleted in the same transaction, + # so long deletion of one volume may block processing other volumes. for volume_model in volumes: + logger.info("Deleting idle volume %s", volume_model.name) try: - await _delete_volume_from_cloud(session, volume_model) - except Exception: - logger.exception("Error when deleting volume %s from cloud", volume_model.name) - try: - await session.execute( - update(VolumeModel) - .where(VolumeModel.id == volume_model.id) - .values( - deleted=True, - deleted_at=get_current_datetime(), - ) - ) - logger.info("Deleted idle volume %s", volume_model.name) + await _delete_idle_volume(session, volume_model) except Exception: - logger.exception("Failed to mark volume %s as deleted in database", volume_model.name) + logger.exception("Error when deleting idle volume %s", volume_model.name) + + volume_model.deleted = True + volume_model.deleted_at = get_current_datetime() + + logger.info("Deleted idle volume %s", volume_model.name) await session.commit() -async def _delete_volume_from_cloud(session: AsyncSession, volume_model: VolumeModel): - """Delete volume from cloud provider. Based on volumes.py:_delete_volume""" +async def _delete_idle_volume(session: AsyncSession, volume_model: VolumeModel): volume = volume_model_to_volume(volume_model) - if volume.external: - return - if volume.provisioning_data is None: + logger.error( + f"Failed to delete volume {volume_model.name}. volume.provisioning_data is None." 
+ ) return if volume.provisioning_data.backend is None: diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 50e32e969c..4eb868d4cb 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -742,6 +742,7 @@ async def create_volume( status: VolumeStatus = VolumeStatus.SUBMITTED, created_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), last_processed_at: Optional[datetime] = None, + last_job_processed_at: Optional[datetime] = None, configuration: Optional[VolumeConfiguration] = None, volume_provisioning_data: Optional[VolumeProvisioningData] = None, deleted_at: Optional[datetime] = None, @@ -759,6 +760,7 @@ async def create_volume( status=status, created_at=created_at, last_processed_at=last_processed_at, + last_job_processed_at=last_job_processed_at, configuration=configuration.json(), volume_provisioning_data=volume_provisioning_data.json() if volume_provisioning_data diff --git a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py index 62b95ec252..8f6621186c 100644 --- a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py +++ b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py @@ -1,4 +1,5 @@ import datetime +from unittest.mock import Mock, patch import pytest from sqlalchemy.ext.asyncio import AsyncSession @@ -12,6 +13,7 @@ ) from dstack._internal.server.models import VolumeAttachmentModel from dstack._internal.server.testing.common import ( + ComputeMockSpec, create_instance, create_project, create_user, @@ -24,6 +26,61 @@ @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) class TestProcessIdleVolumes: + async def test_deletes_idle_volumes(self, test_db, session: AsyncSession): + project = await create_project(session=session) + user = await 
create_user(session=session) + + config1 = get_volume_configuration( + name="test-volume", + auto_cleanup_duration="1h", + ) + config2 = get_volume_configuration( + name="test-volume", + auto_cleanup_duration="3h", + ) + volume1 = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=config1, + volume_provisioning_data=get_volume_provisioning_data(), + last_job_processed_at=datetime.datetime.now(datetime.timezone.utc) + - datetime.timedelta(hours=2), + ) + volume2 = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=config2, + volume_provisioning_data=get_volume_provisioning_data(), + last_job_processed_at=datetime.datetime.now(datetime.timezone.utc) + - datetime.timedelta(hours=2), + ) + await session.commit() + + with patch( + "dstack._internal.server.services.backends.get_project_backend_by_type_or_error" + ) as m: + aws_mock = Mock() + m.return_value = aws_mock + aws_mock.compute.return_value = Mock(spec=ComputeMockSpec) + await process_idle_volumes() + + await session.refresh(volume1) + await session.refresh(volume2) + assert volume1.deleted + assert volume1.deleted_at is not None + assert not volume2.deleted + assert volume2.deleted_at is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +class TestShouldDeleteVolume: async def test_no_idle_duration(self, test_db, session: AsyncSession): project = await create_project(session=session) user = await create_user(session=session) @@ -131,33 +188,3 @@ async def test_never_used_volume(self, test_db, session: AsyncSession): volume.last_job_processed_at = None idle_time = _get_idle_time(volume) assert idle_time.total_seconds() >= 7000 - - async def test_integration(self, test_db, session: AsyncSession): - project = await create_project(session=session) - user = await 
create_user(session=session) - - config = get_volume_configuration(name="test-volume") - config.auto_cleanup_duration = "1h" - - volume = await create_volume( - session=session, - project=project, - user=user, - status=VolumeStatus.ACTIVE, - backend=BackendType.AWS, - configuration=config, - volume_provisioning_data=get_volume_provisioning_data(), - ) - - volume.last_job_processed_at = ( - datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) - ).replace(tzinfo=None) - await session.commit() - - # Run the background task - await process_idle_volumes() - - # Refresh the volume to see if it was marked as deleted - await session.refresh(volume) - assert volume.deleted is True - assert volume.deleted_at is not None From 065dfeb76180bef06f9aabde195e2c689c4bde25 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 15 Jul 2025 16:50:39 +0500 Subject: [PATCH 22/23] Refactor External volumes check --- src/dstack/_internal/server/services/volumes.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/dstack/_internal/server/services/volumes.py b/src/dstack/_internal/server/services/volumes.py index 4e4f63f4c4..be43e02ce5 100644 --- a/src/dstack/_internal/server/services/volumes.py +++ b/src/dstack/_internal/server/services/volumes.py @@ -400,23 +400,19 @@ def _validate_volume_configuration(configuration: VolumeConfiguration): ) if configuration.name is not None: validate_dstack_resource_name(configuration.name) - # External volumes (with volume_id) cannot have auto_cleanup_duration + if configuration.volume_id is not None and configuration.auto_cleanup_duration is not None: if ( isinstance(configuration.auto_cleanup_duration, int) - and configuration.auto_cleanup_duration >= 0 + and configuration.auto_cleanup_duration > 0 + ) or ( + isinstance(configuration.auto_cleanup_duration, str) + and configuration.auto_cleanup_duration not in ("off", "-1") ): raise ServerClientError( "External volumes (with volume_id) do not 
support auto_cleanup_duration. " "Auto-cleanup only works for volumes created and managed by dstack." ) - elif isinstance( - configuration.auto_cleanup_duration, str - ) and configuration.auto_cleanup_duration not in ("off", "-1"): - raise ServerClientError( - "External volumes (with volume_id) do not support auto_cleanup_duration. " - "Auto-cleanup only works for volumes created and managed by dstack." - ) async def _delete_volume(session: AsyncSession, project: ProjectModel, volume_model: VolumeModel): From 6fc89f1e9ce74d6551a2b51c30107cc0ac2bc27f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 15 Jul 2025 17:02:53 +0500 Subject: [PATCH 23/23] Fix client backward compatibility --- src/dstack/_internal/core/compatibility/volumes.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dstack/_internal/core/compatibility/volumes.py b/src/dstack/_internal/core/compatibility/volumes.py index 7395674f93..4b7be6bb00 100644 --- a/src/dstack/_internal/core/compatibility/volumes.py +++ b/src/dstack/_internal/core/compatibility/volumes.py @@ -30,4 +30,6 @@ def _get_volume_configuration_excludes( configuration_excludes: IncludeExcludeDictType = {} if configuration.tags is None: configuration_excludes["tags"] = True + if configuration.auto_cleanup_duration is None: + configuration_excludes["auto_cleanup_duration"] = True return configuration_excludes