Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8490ae1
feat: add volume idle duration cleanup feature (#2497)
haydnli-shopify Jun 24, 2025
8514a12
fix: avoid FOR UPDATE with outer join in process_idle_volumes (Postgr…
haydnli-shopify Jun 24, 2025
25f634a
Fix formatting issues
haydnli-shopify Jun 25, 2025
127dc2b
Optimize idle volume cleanup implementation
haydnli-shopify Jun 25, 2025
75daddd
Removed unnecessary changes and refactored implementations
haydnli-shopify Jun 26, 2025
ad05cb3
Removed unnecessary changes and refactored implementations
haydnli-shopify Jun 26, 2025
3d56496
Fix volume auto-cleanup locking and mocking issues
haydnli-shopify Jul 3, 2025
29187a8
feat: add merge migration for volume cleanup and secrets
haydnli-shopify Jul 7, 2025
d0b72b2
fix: remove accidentally committed test files with non-existent imports
haydnli-shopify Jul 7, 2025
151eb02
fix: add missing dialect_name parameter to get_locker call
haydnli-shopify Jul 7, 2025
fb56b81
fix: address code review comments
haydnli-shopify Jul 8, 2025
90a49ee
fix: actually delete volumes from cloud providers
haydnli-shopify Jul 8, 2025
8188574
Remove redundant code changes
haydnli-shopify Jul 8, 2025
c073017
Remove accidentally committed local config file
haydnli-shopify Jul 8, 2025
4e1b56c
Fix MissingGreenlet error in process_idle_volumes
haydnli-shopify Jul 9, 2025
fb99485
Add validation for external volumes with auto_cleanup_duration
haydnli-shopify Jul 9, 2025
002be4d
Remove merge migration and rebase on master
haydnli-shopify Jul 9, 2025
e1d98dd
Added merge migration to resolve multiple heads
haydnli-shopify Jul 9, 2025
d9ca2f2
Rebase migrations
r4victor Jul 15, 2025
a546d97
Update idle_duration type to match updated parse_idle_duration
r4victor Jul 15, 2025
96b9286
Refactor process_idle_volumes
r4victor Jul 15, 2025
065dfeb
Refactor External volumes check
r4victor Jul 15, 2025
6fc89f1
Fix client backward compatibility
r4victor Jul 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dstack/_internal/cli/services/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def apply_profile_args(
if args.idle_duration is not None:
profile_settings.idle_duration = args.idle_duration
elif args.dont_destroy:
profile_settings.idle_duration = False
profile_settings.idle_duration = "off"
if args.creation_policy_reuse:
profile_settings.creation_policy = CreationPolicy.REUSE

Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/compatibility/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ def _get_volume_configuration_excludes(
configuration_excludes: IncludeExcludeDictType = {}
if configuration.tags is None:
configuration_excludes["tags"] = True
if configuration.auto_cleanup_duration is None:
configuration_excludes["auto_cleanup_duration"] = True
return configuration_excludes
8 changes: 3 additions & 5 deletions src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,9 @@ def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str
return parse_duration(v)


def parse_idle_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
if v is False:
def parse_idle_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
if v == "off" or v == -1:
return -1
if v is True:
return None
return parse_duration(v)


Expand Down Expand Up @@ -249,7 +247,7 @@ class ProfileParams(CoreModel):
),
] = None
idle_duration: Annotated[
Optional[Union[Literal["off"], str, int, bool]],
Optional[Union[Literal["off"], str, int]],
Field(
description=(
"Time to wait before terminating idle instances."
Expand Down
14 changes: 14 additions & 0 deletions src/dstack/_internal/core/models/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.common import CoreModel
from dstack._internal.core.models.profiles import parse_idle_duration
from dstack._internal.core.models.resources import Memory
from dstack._internal.utils.common import get_or_error
from dstack._internal.utils.tags import tags_validator
Expand Down Expand Up @@ -44,6 +45,16 @@ class VolumeConfiguration(CoreModel):
Optional[str],
Field(description="The volume ID. Must be specified when registering external volumes"),
] = None
auto_cleanup_duration: Annotated[
Optional[Union[str, int]],
Field(
description=(
"Time to wait after volume is no longer used by any job before deleting it. "
"Defaults to keep the volume indefinitely. "
"Use the value 'off' or -1 to disable auto-cleanup."
)
),
] = None
tags: Annotated[
Optional[Dict[str, str]],
Field(
Expand All @@ -56,6 +67,9 @@ class VolumeConfiguration(CoreModel):
] = None

_validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator)
_validate_auto_cleanup_duration = validator(
"auto_cleanup_duration", pre=True, allow_reuse=True
)(parse_idle_duration)

@property
def size_gb(self) -> int:
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/server/background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
process_gateways_connections,
process_submitted_gateways,
)
from dstack._internal.server.background.tasks.process_idle_volumes import process_idle_volumes
from dstack._internal.server.background.tasks.process_instances import (
process_instances,
)
Expand Down Expand Up @@ -76,6 +77,9 @@ def start_background_tasks() -> AsyncIOScheduler:
_scheduler.add_job(
process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5
)
_scheduler.add_job(
process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1
)
_scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR):
# Add multiple copies of tasks if requested.
Expand Down
139 changes: 139 additions & 0 deletions src/dstack/_internal/server/background/tasks/process_idle_volumes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import datetime
from typing import List

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload

from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport
from dstack._internal.core.errors import BackendNotAvailable
from dstack._internal.core.models.profiles import parse_duration
from dstack._internal.core.models.volumes import VolumeStatus
from dstack._internal.server.db import get_db, get_session_ctx
from dstack._internal.server.models import ProjectModel, VolumeModel
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.volumes import (
get_volume_configuration,
volume_model_to_volume,
)
from dstack._internal.utils import common
from dstack._internal.utils.common import get_current_datetime
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)


async def process_idle_volumes():
    """Background task: delete ACTIVE volumes whose ``auto_cleanup_duration`` has elapsed.

    Concurrency protection is two-layered: an in-process lockset (so coroutines in
    this server process skip ids already being handled) plus row-level
    ``SELECT ... FOR UPDATE SKIP LOCKED`` (so other server replicas skip them too).
    """
    lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__)
    async with get_session_ctx() as session:
        async with lock:
            # Select only ids here (no outer joins) — FOR UPDATE with joined
            # eager loads is not supported on PostgreSQL. Oldest-processed first,
            # capped at 10 per run.
            res = await session.execute(
                select(VolumeModel.id)
                .where(
                    VolumeModel.status == VolumeStatus.ACTIVE,
                    VolumeModel.deleted == False,
                    VolumeModel.id.not_in(lockset),
                )
                .order_by(VolumeModel.last_processed_at.asc())
                .limit(10)
                .with_for_update(skip_locked=True, key_share=True)
            )
            volume_ids = list(res.scalars().all())
            if not volume_ids:
                return
            for volume_id in volume_ids:
                lockset.add(volume_id)

        # Re-fetch full models with relationships eagerly loaded;
        # populate_existing refreshes rows already present in the session.
        res = await session.execute(
            select(VolumeModel)
            .where(VolumeModel.id.in_(volume_ids))
            .options(joinedload(VolumeModel.project).joinedload(ProjectModel.backends))
            .options(joinedload(VolumeModel.user))
            .options(joinedload(VolumeModel.attachments))
            .execution_options(populate_existing=True)
        )
        volume_models = list(res.unique().scalars().all())
        try:
            volumes_to_delete = [v for v in volume_models if _should_delete_volume(v)]
            if not volumes_to_delete:
                return
            await _delete_idle_volumes(session, volumes_to_delete)
        finally:
            # Always release the in-process locks, including on early return or error.
            lockset.difference_update(volume_ids)


def _should_delete_volume(volume: VolumeModel) -> bool:
    """Return True if the volume is unattached and idle past its auto-cleanup threshold."""
    # A volume attached to any instance is in use — never a cleanup candidate.
    if volume.attachments:
        return False

    duration = get_volume_configuration(volume).auto_cleanup_duration
    # No duration configured (None/0/empty) means keep the volume indefinitely.
    if not duration:
        return False

    seconds = parse_duration(duration)
    # parse_duration may yield None, 0, or -1 ("off") — all disable cleanup.
    if not seconds or seconds <= 0:
        return False

    return _get_idle_time(volume) > datetime.timedelta(seconds=seconds)


def _get_idle_time(volume: VolumeModel) -> datetime.timedelta:
    """Return how long the volume has been unused (never negative)."""
    # Fall back to creation time for volumes that never had a job attached.
    reference = volume.last_job_processed_at or volume.created_at
    # DB timestamps are stored naive; treat them as UTC before subtracting.
    elapsed = get_current_datetime() - reference.replace(tzinfo=datetime.timezone.utc)
    zero = datetime.timedelta(0)
    return elapsed if elapsed > zero else zero


async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]):
    """Delete each idle volume from its cloud backend, mark the rows deleted, commit once.

    Note: Multiple volumes are deleted in the same transaction,
    so long deletion of one volume may block processing other volumes.
    """
    for volume_model in volumes:
        logger.info("Deleting idle volume %s", volume_model.name)
        try:
            await _delete_idle_volume(session, volume_model)
        except Exception:
            logger.exception("Error when deleting idle volume %s", volume_model.name)

        # NOTE(review): the row is marked deleted even when the backend deletion
        # above failed — presumably intentional best-effort cleanup so a broken
        # backend can't keep the volume alive forever; confirm.
        volume_model.deleted = True
        volume_model.deleted_at = get_current_datetime()

        logger.info("Deleted idle volume %s", volume_model.name)

    # Single commit covers all deletions in this batch.
    await session.commit()


async def _delete_idle_volume(session: AsyncSession, volume_model: VolumeModel):
    """Delete a single volume from its cloud provider.

    Logs and returns early when the volume lacks the data needed to locate its
    backend; backend/compute errors propagate to the caller, which decides how
    to handle them.
    """
    volume = volume_model_to_volume(volume_model)

    if volume.provisioning_data is None:
        logger.error(
            "Failed to delete volume %s. volume.provisioning_data is None.",
            volume_model.name,
        )
        return

    if volume.provisioning_data.backend is None:
        logger.error(
            "Failed to delete volume %s. volume.provisioning_data.backend is None.",
            volume_model.name,
        )
        return

    try:
        backend = await backends_services.get_project_backend_by_type_or_error(
            project=volume_model.project,
            backend_type=volume.provisioning_data.backend,
        )
    except BackendNotAvailable:
        # Fix: report the backend we actually looked up (provisioning_data.backend),
        # not volume.configuration.backend, which may differ or be unset.
        logger.error(
            "Failed to delete volume %s. Backend %s not available.",
            volume_model.name,
            volume.provisioning_data.backend,
        )
        return

    compute = backend.compute()
    # Any backend that provisioned this volume must support volume operations.
    assert isinstance(compute, ComputeWithVolumeSupport)
    # delete_volume is a blocking call; run it off the event loop.
    await common.run_async(
        compute.delete_volume,
        volume=volume,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -739,3 +739,5 @@ async def _attach_volume(
attachment_data=attachment_data.json(),
)
instance.volume_attachments.append(volume_attachment_model)

volume_model.last_job_processed_at = common_utils.get_current_datetime()
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Add VolumeModel.last_job_processed_at

Revision ID: d5863798bf41
Revises: 644b8a114187
Create Date: 2025-07-15 14:26:22.981687

"""

import sqlalchemy as sa
from alembic import op

import dstack._internal.server.models

# revision identifiers, used by Alembic.
revision = "d5863798bf41"
down_revision = "644b8a114187"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Add the nullable timestamp tracking when a volume last had a job attached,
    # used by the idle-volume auto-cleanup task. batch_alter_table keeps the
    # ALTER compatible with SQLite.
    with op.batch_alter_table("volumes", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "last_job_processed_at",
                dstack._internal.server.models.NaiveDateTime(),
                nullable=True,
            )
        )

    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    # Reverse of upgrade(): drop the idle-tracking column.
    with op.batch_alter_table("volumes", schema=None) as batch_op:
        batch_op.drop_column("last_job_processed_at")

    # ### end Alembic commands ###
1 change: 1 addition & 0 deletions src/dstack/_internal/server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ class VolumeModel(BaseModel):
last_processed_at: Mapped[datetime] = mapped_column(
NaiveDateTime, default=get_current_datetime
)
last_job_processed_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
deleted: Mapped[bool] = mapped_column(Boolean, default=False)
deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)

Expand Down
13 changes: 13 additions & 0 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,19 @@ async def process_terminating_job(
# so that stuck volumes don't prevent the instance from terminating.
job_model.instance_id = None
instance_model.last_job_processed_at = common.get_current_datetime()

volume_names = (
jrd.volume_names
if jrd and jrd.volume_names
else [va.volume.name for va in instance_model.volume_attachments]
)
if volume_names:
volumes = await list_project_volume_models(
session=session, project=instance_model.project, names=volume_names
)
for volume in volumes:
volume.last_job_processed_at = common.get_current_datetime()

logger.info(
"%s: instance '%s' has been released, new status is %s",
fmt(job_model),
Expand Down
13 changes: 13 additions & 0 deletions src/dstack/_internal/server/services/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,19 @@ def _validate_volume_configuration(configuration: VolumeConfiguration):
if configuration.name is not None:
validate_dstack_resource_name(configuration.name)

if configuration.volume_id is not None and configuration.auto_cleanup_duration is not None:
if (
isinstance(configuration.auto_cleanup_duration, int)
and configuration.auto_cleanup_duration > 0
) or (
isinstance(configuration.auto_cleanup_duration, str)
and configuration.auto_cleanup_duration not in ("off", "-1")
):
raise ServerClientError(
"External volumes (with volume_id) do not support auto_cleanup_duration. "
"Auto-cleanup only works for volumes created and managed by dstack."
)


async def _delete_volume(session: AsyncSession, project: ProjectModel, volume_model: VolumeModel):
volume = volume_model_to_volume(volume_model)
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/server/testing/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ async def create_volume(
status: VolumeStatus = VolumeStatus.SUBMITTED,
created_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc),
last_processed_at: Optional[datetime] = None,
last_job_processed_at: Optional[datetime] = None,
configuration: Optional[VolumeConfiguration] = None,
volume_provisioning_data: Optional[VolumeProvisioningData] = None,
deleted_at: Optional[datetime] = None,
Expand All @@ -759,6 +760,7 @@ async def create_volume(
status=status,
created_at=created_at,
last_processed_at=last_processed_at,
last_job_processed_at=last_job_processed_at,
configuration=configuration.json(),
volume_provisioning_data=volume_provisioning_data.json()
if volume_provisioning_data
Expand Down Expand Up @@ -820,13 +822,15 @@ def get_volume_configuration(
region: str = "eu-west-1",
size: Optional[Memory] = Memory(100),
volume_id: Optional[str] = None,
auto_cleanup_duration: Optional[Union[str, int]] = None,
) -> VolumeConfiguration:
return VolumeConfiguration(
name=name,
backend=backend,
region=region,
size=size,
volume_id=volume_id,
auto_cleanup_duration=auto_cleanup_duration,
)


Expand Down
Loading
Loading