diff --git a/src/dstack/_internal/cli/services/profile.py b/src/dstack/_internal/cli/services/profile.py index 23bbe55ad3..d57ea2e130 100644 --- a/src/dstack/_internal/cli/services/profile.py +++ b/src/dstack/_internal/cli/services/profile.py @@ -159,7 +159,7 @@ def apply_profile_args( if args.idle_duration is not None: profile_settings.idle_duration = args.idle_duration elif args.dont_destroy: - profile_settings.idle_duration = False + profile_settings.idle_duration = "off" if args.creation_policy_reuse: profile_settings.creation_policy = CreationPolicy.REUSE diff --git a/src/dstack/_internal/core/compatibility/volumes.py b/src/dstack/_internal/core/compatibility/volumes.py index 7395674f93..4b7be6bb00 100644 --- a/src/dstack/_internal/core/compatibility/volumes.py +++ b/src/dstack/_internal/core/compatibility/volumes.py @@ -30,4 +30,6 @@ def _get_volume_configuration_excludes( configuration_excludes: IncludeExcludeDictType = {} if configuration.tags is None: configuration_excludes["tags"] = True + if configuration.auto_cleanup_duration is None: + configuration_excludes["auto_cleanup_duration"] = True return configuration_excludes diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index 62997ce4e4..f98213c6a5 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -74,11 +74,9 @@ def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str return parse_duration(v) -def parse_idle_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]: - if v is False: +def parse_idle_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]: + if v == "off" or v == -1: return -1 - if v is True: - return None return parse_duration(v) @@ -249,7 +247,7 @@ class ProfileParams(CoreModel): ), ] = None idle_duration: Annotated[ - Optional[Union[Literal["off"], str, int, bool]], + Optional[Union[Literal["off"], str, int]], 
Field( description=( "Time to wait before terminating idle instances." diff --git a/src/dstack/_internal/core/models/volumes.py b/src/dstack/_internal/core/models/volumes.py index 773fd9429f..0f89b770f0 100644 --- a/src/dstack/_internal/core/models/volumes.py +++ b/src/dstack/_internal/core/models/volumes.py @@ -9,6 +9,7 @@ from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import CoreModel +from dstack._internal.core.models.profiles import parse_idle_duration from dstack._internal.core.models.resources import Memory from dstack._internal.utils.common import get_or_error from dstack._internal.utils.tags import tags_validator @@ -44,6 +45,16 @@ class VolumeConfiguration(CoreModel): Optional[str], Field(description="The volume ID. Must be specified when registering external volumes"), ] = None + auto_cleanup_duration: Annotated[ + Optional[Union[str, int]], + Field( + description=( + "Time to wait after volume is no longer used by any job before deleting it. " + "Defaults to keep the volume indefinitely. " + "Use the value 'off' or -1 to disable auto-cleanup." 
+ ) + ), + ] = None tags: Annotated[ Optional[Dict[str, str]], Field( @@ -56,6 +67,9 @@ class VolumeConfiguration(CoreModel): ] = None _validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator) + _validate_auto_cleanup_duration = validator( + "auto_cleanup_duration", pre=True, allow_reuse=True + )(parse_idle_duration) @property def size_gb(self) -> int: diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 2dd410cd28..cf802b877c 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -7,6 +7,7 @@ process_gateways_connections, process_submitted_gateways, ) +from dstack._internal.server.background.tasks.process_idle_volumes import process_idle_volumes from dstack._internal.server.background.tasks.process_instances import ( process_instances, ) @@ -76,6 +77,9 @@ def start_background_tasks() -> AsyncIOScheduler: _scheduler.add_job( process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5 ) + _scheduler.add_job( + process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1 + ) _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5)) for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR): # Add multiple copies of tasks if requested. 
diff --git a/src/dstack/_internal/server/background/tasks/process_idle_volumes.py b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py new file mode 100644 index 0000000000..33d9d5a9b4 --- /dev/null +++ b/src/dstack/_internal/server/background/tasks/process_idle_volumes.py @@ -0,0 +1,139 @@ +import datetime +from typing import List + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload + +from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport +from dstack._internal.core.errors import BackendNotAvailable +from dstack._internal.core.models.profiles import parse_duration +from dstack._internal.core.models.volumes import VolumeStatus +from dstack._internal.server.db import get_db, get_session_ctx +from dstack._internal.server.models import ProjectModel, VolumeModel +from dstack._internal.server.services import backends as backends_services +from dstack._internal.server.services.locking import get_locker +from dstack._internal.server.services.volumes import ( + get_volume_configuration, + volume_model_to_volume, +) +from dstack._internal.utils import common +from dstack._internal.utils.common import get_current_datetime +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + + +async def process_idle_volumes(): + lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__) + async with get_session_ctx() as session: + async with lock: + res = await session.execute( + select(VolumeModel.id) + .where( + VolumeModel.status == VolumeStatus.ACTIVE, + VolumeModel.deleted == False, + VolumeModel.id.not_in(lockset), + ) + .order_by(VolumeModel.last_processed_at.asc()) + .limit(10) + .with_for_update(skip_locked=True, key_share=True) + ) + volume_ids = list(res.scalars().all()) + if not volume_ids: + return + for volume_id in volume_ids: + lockset.add(volume_id) + + res = await session.execute( + 
select(VolumeModel) + .where(VolumeModel.id.in_(volume_ids)) + .options(joinedload(VolumeModel.project).joinedload(ProjectModel.backends)) + .options(joinedload(VolumeModel.user)) + .options(joinedload(VolumeModel.attachments)) + .execution_options(populate_existing=True) + ) + volume_models = list(res.unique().scalars().all()) + try: + volumes_to_delete = [v for v in volume_models if _should_delete_volume(v)] + if not volumes_to_delete: + return + await _delete_idle_volumes(session, volumes_to_delete) + finally: + lockset.difference_update(volume_ids) + + +def _should_delete_volume(volume: VolumeModel) -> bool: + if volume.attachments: + return False + + config = get_volume_configuration(volume) + if not config.auto_cleanup_duration: + return False + + duration_seconds = parse_duration(config.auto_cleanup_duration) + if not duration_seconds or duration_seconds <= 0: + return False + + idle_time = _get_idle_time(volume) + threshold = datetime.timedelta(seconds=duration_seconds) + return idle_time > threshold + + +def _get_idle_time(volume: VolumeModel) -> datetime.timedelta: + last_used = volume.last_job_processed_at or volume.created_at + last_used_utc = last_used.replace(tzinfo=datetime.timezone.utc) + idle_time = get_current_datetime() - last_used_utc + return max(idle_time, datetime.timedelta(0)) + + +async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]): + # Note: Multiple volumes are deleted in the same transaction, + # so long deletion of one volume may block processing other volumes. 
+ for volume_model in volumes: + logger.info("Deleting idle volume %s", volume_model.name) + try: + await _delete_idle_volume(session, volume_model) + except Exception: + logger.exception("Error when deleting idle volume %s", volume_model.name) + + volume_model.deleted = True + volume_model.deleted_at = get_current_datetime() + + logger.info("Deleted idle volume %s", volume_model.name) + + await session.commit() + + +async def _delete_idle_volume(session: AsyncSession, volume_model: VolumeModel): + volume = volume_model_to_volume(volume_model) + + if volume.provisioning_data is None: + logger.error( + f"Failed to delete volume {volume_model.name}. volume.provisioning_data is None." + ) + return + + if volume.provisioning_data.backend is None: + logger.error( + f"Failed to delete volume {volume_model.name}. volume.provisioning_data.backend is None." + ) + return + + try: + backend = await backends_services.get_project_backend_by_type_or_error( + project=volume_model.project, + backend_type=volume.provisioning_data.backend, + ) + except BackendNotAvailable: + logger.error( + f"Failed to delete volume {volume_model.name}. Backend {volume.provisioning_data.backend} not available." 
+ ) + return + + compute = backend.compute() + assert isinstance(compute, ComputeWithVolumeSupport) + await common.run_async( + compute.delete_volume, + volume=volume, + ) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index b9c2f9c946..eba9549b68 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -739,3 +739,5 @@ async def _attach_volume( attachment_data=attachment_data.json(), ) instance.volume_attachments.append(volume_attachment_model) + + volume_model.last_job_processed_at = common_utils.get_current_datetime() diff --git a/src/dstack/_internal/server/migrations/versions/d5863798bf41_add_volumemodel_last_job_processed_at.py b/src/dstack/_internal/server/migrations/versions/d5863798bf41_add_volumemodel_last_job_processed_at.py new file mode 100644 index 0000000000..1dc883e05e --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/d5863798bf41_add_volumemodel_last_job_processed_at.py @@ -0,0 +1,40 @@ +"""Add VolumeModel.last_job_processed_at + +Revision ID: d5863798bf41 +Revises: 644b8a114187 +Create Date: 2025-07-15 14:26:22.981687 + +""" + +import sqlalchemy as sa +from alembic import op + +import dstack._internal.server.models + +# revision identifiers, used by Alembic. +revision = "d5863798bf41" +down_revision = "644b8a114187" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("volumes", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "last_job_processed_at", + dstack._internal.server.models.NaiveDateTime(), + nullable=True, + ) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("volumes", schema=None) as batch_op: + batch_op.drop_column("last_job_processed_at") + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index d39d07be10..c4dafe81e3 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -645,6 +645,7 @@ class VolumeModel(BaseModel): last_processed_at: Mapped[datetime] = mapped_column( NaiveDateTime, default=get_current_datetime ) + last_job_processed_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) deleted: Mapped[bool] = mapped_column(Boolean, default=False) deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 41aa496be7..221d1ef29d 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -289,6 +289,19 @@ async def process_terminating_job( # so that stuck volumes don't prevent the instance from terminating. 
job_model.instance_id = None instance_model.last_job_processed_at = common.get_current_datetime() + + volume_names = ( + jrd.volume_names + if jrd and jrd.volume_names + else [va.volume.name for va in instance_model.volume_attachments] + ) + if volume_names: + volumes = await list_project_volume_models( + session=session, project=instance_model.project, names=volume_names + ) + for volume in volumes: + volume.last_job_processed_at = common.get_current_datetime() + logger.info( "%s: instance '%s' has been released, new status is %s", fmt(job_model), diff --git a/src/dstack/_internal/server/services/volumes.py b/src/dstack/_internal/server/services/volumes.py index 9a7ed53d34..be43e02ce5 100644 --- a/src/dstack/_internal/server/services/volumes.py +++ b/src/dstack/_internal/server/services/volumes.py @@ -401,6 +401,19 @@ def _validate_volume_configuration(configuration: VolumeConfiguration): if configuration.name is not None: validate_dstack_resource_name(configuration.name) + if configuration.volume_id is not None and configuration.auto_cleanup_duration is not None: + if ( + isinstance(configuration.auto_cleanup_duration, int) + and configuration.auto_cleanup_duration > 0 + ) or ( + isinstance(configuration.auto_cleanup_duration, str) + and configuration.auto_cleanup_duration not in ("off", "-1") + ): + raise ServerClientError( + "External volumes (with volume_id) do not support auto_cleanup_duration. " + "Auto-cleanup only works for volumes created and managed by dstack." 
+ ) + async def _delete_volume(session: AsyncSession, project: ProjectModel, volume_model: VolumeModel): volume = volume_model_to_volume(volume_model) diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 047adb5c14..4eb868d4cb 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -742,6 +742,7 @@ async def create_volume( status: VolumeStatus = VolumeStatus.SUBMITTED, created_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), last_processed_at: Optional[datetime] = None, + last_job_processed_at: Optional[datetime] = None, configuration: Optional[VolumeConfiguration] = None, volume_provisioning_data: Optional[VolumeProvisioningData] = None, deleted_at: Optional[datetime] = None, @@ -759,6 +760,7 @@ async def create_volume( status=status, created_at=created_at, last_processed_at=last_processed_at, + last_job_processed_at=last_job_processed_at, configuration=configuration.json(), volume_provisioning_data=volume_provisioning_data.json() if volume_provisioning_data @@ -820,6 +822,7 @@ def get_volume_configuration( region: str = "eu-west-1", size: Optional[Memory] = Memory(100), volume_id: Optional[str] = None, + auto_cleanup_duration: Optional[Union[str, int]] = None, ) -> VolumeConfiguration: return VolumeConfiguration( name=name, @@ -827,6 +830,7 @@ def get_volume_configuration( region=region, size=size, volume_id=volume_id, + auto_cleanup_duration=auto_cleanup_duration, ) diff --git a/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py new file mode 100644 index 0000000000..8f6621186c --- /dev/null +++ b/src/tests/_internal/server/background/tasks/test_process_idle_volumes.py @@ -0,0 +1,190 @@ +import datetime +from unittest.mock import Mock, patch + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from 
dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.volumes import VolumeStatus +from dstack._internal.server.background.tasks.process_idle_volumes import ( + _get_idle_time, + _should_delete_volume, + process_idle_volumes, +) +from dstack._internal.server.models import VolumeAttachmentModel +from dstack._internal.server.testing.common import ( + ComputeMockSpec, + create_instance, + create_project, + create_user, + create_volume, + get_volume_configuration, + get_volume_provisioning_data, +) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +class TestProcessIdleVolumes: + async def test_deletes_idle_volumes(self, test_db, session: AsyncSession): + project = await create_project(session=session) + user = await create_user(session=session) + + config1 = get_volume_configuration( + name="test-volume", + auto_cleanup_duration="1h", + ) + config2 = get_volume_configuration( + name="test-volume", + auto_cleanup_duration="3h", + ) + volume1 = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=config1, + volume_provisioning_data=get_volume_provisioning_data(), + last_job_processed_at=datetime.datetime.now(datetime.timezone.utc) + - datetime.timedelta(hours=2), + ) + volume2 = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=config2, + volume_provisioning_data=get_volume_provisioning_data(), + last_job_processed_at=datetime.datetime.now(datetime.timezone.utc) + - datetime.timedelta(hours=2), + ) + await session.commit() + + with patch( + "dstack._internal.server.services.backends.get_project_backend_by_type_or_error" + ) as m: + aws_mock = Mock() + m.return_value = aws_mock + aws_mock.compute.return_value = Mock(spec=ComputeMockSpec) + await process_idle_volumes() + + await 
session.refresh(volume1) + await session.refresh(volume2) + assert volume1.deleted + assert volume1.deleted_at is not None + assert not volume2.deleted + assert volume2.deleted_at is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +class TestShouldDeleteVolume: + async def test_no_idle_duration(self, test_db, session: AsyncSession): + project = await create_project(session=session) + user = await create_user(session=session) + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=get_volume_configuration(name="test-volume"), + volume_provisioning_data=get_volume_provisioning_data(), + ) + + assert not _should_delete_volume(volume) + + async def test_idle_duration_disabled(self, test_db, session: AsyncSession): + project = await create_project(session=session) + user = await create_user(session=session) + + config = get_volume_configuration(name="test-volume") + config.auto_cleanup_duration = -1 + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + assert not _should_delete_volume(volume) + + async def test_volume_attached(self, test_db, session: AsyncSession): + project = await create_project(session=session) + user = await create_user(session=session) + + config = get_volume_configuration(name="test-volume") + config.auto_cleanup_duration = "1h" + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + instance = await create_instance(session=session, project=project) + volume.attachments.append( + VolumeAttachmentModel(volume_id=volume.id, 
instance_id=instance.id) + ) + await session.commit() + + assert not _should_delete_volume(volume) + + async def test_idle_duration_threshold(self, test_db, session: AsyncSession): + project = await create_project(session=session) + user = await create_user(session=session) + + config = get_volume_configuration(name="test-volume") + config.auto_cleanup_duration = "1h" + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=config, + volume_provisioning_data=get_volume_provisioning_data(), + ) + + # Not exceeded - 30 minutes ago + volume.last_job_processed_at = ( + datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30) + ).replace(tzinfo=None) + assert not _should_delete_volume(volume) + + # Exceeded - 2 hours ago + volume.last_job_processed_at = ( + datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) + ).replace(tzinfo=None) + assert _should_delete_volume(volume) + + async def test_never_used_volume(self, test_db, session: AsyncSession): + project = await create_project(session=session) + user = await create_user(session=session) + + volume = await create_volume( + session=session, + project=project, + user=user, + status=VolumeStatus.ACTIVE, + backend=BackendType.AWS, + configuration=get_volume_configuration(name="test-volume"), + volume_provisioning_data=get_volume_provisioning_data(), + created_at=datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2), + ) + + volume.last_job_processed_at = None + idle_time = _get_idle_time(volume) + assert idle_time.total_seconds() >= 7000 diff --git a/src/tests/_internal/server/services/test_volumes.py b/src/tests/_internal/server/services/test_volumes.py index c2ba555a15..4de9c3f050 100644 --- a/src/tests/_internal/server/services/test_volumes.py +++ b/src/tests/_internal/server/services/test_volumes.py @@ -3,11 +3,78 @@ import pytest from freezegun import 
freeze_time -from dstack._internal.core.models.volumes import VolumeStatus -from dstack._internal.server.services.volumes import _get_volume_cost +from dstack._internal.core.errors import ServerClientError +from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.volumes import VolumeConfiguration, VolumeStatus +from dstack._internal.server.services.volumes import ( + _get_volume_cost, + _validate_volume_configuration, +) from dstack._internal.server.testing.common import get_volume, get_volume_provisioning_data +class TestValidateVolumeConfiguration: + def test_external_volume_with_auto_cleanup_duration_raises_error(self): + """External volumes (with volume_id) should not allow auto_cleanup_duration""" + config = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration="1h", + ) + with pytest.raises( + ServerClientError, match="External volumes.*do not support auto_cleanup_duration" + ): + _validate_volume_configuration(config) + + def test_external_volume_with_auto_cleanup_duration_int_raises_error(self): + """External volumes with integer auto_cleanup_duration should also raise error""" + config = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration=3600, + ) + with pytest.raises( + ServerClientError, match="External volumes.*do not support auto_cleanup_duration" + ): + _validate_volume_configuration(config) + + def test_external_volume_with_auto_cleanup_disabled_succeeds(self): + """External volumes with auto_cleanup_duration='off' or -1 should be allowed""" + config1 = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration="off", + ) + config2 = VolumeConfiguration( + backend=BackendType.AWS, + region="us-east-1", + volume_id="vol-123456", + auto_cleanup_duration=-1, + ) + # Should not raise any errors + 
_validate_volume_configuration(config1) + _validate_volume_configuration(config2) + + def test_external_volume_without_auto_cleanup_succeeds(self): + """External volumes without auto_cleanup_duration should be allowed""" + config = VolumeConfiguration( + backend=BackendType.AWS, region="us-east-1", volume_id="vol-123456" + ) + # Should not raise any errors + _validate_volume_configuration(config) + + def test_new_volume_with_auto_cleanup_duration_succeeds(self): + """New volumes (without volume_id) with auto_cleanup_duration should be allowed""" + config = VolumeConfiguration( + backend=BackendType.AWS, region="us-east-1", size=100, auto_cleanup_duration="1h" + ) + # Should not raise any errors + _validate_volume_configuration(config) + + class TestGetVolumeCost: def test_returns_0_when_no_provisioning_data(self): volume = get_volume(provisioning_data=None)