Skip to content

Commit 6721cb7

Browse files
feat: add volume idle duration cleanup feature (#2497)
1 parent 026ba42 commit 6721cb7

File tree

9 files changed

+435
-0
lines changed

9 files changed

+435
-0
lines changed

src/dstack/_internal/core/models/volumes.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from dstack._internal.core.models.backends.base import BackendType
1111
from dstack._internal.core.models.common import CoreModel
12+
from dstack._internal.core.models.profiles import parse_idle_duration
1213
from dstack._internal.core.models.resources import Memory
1314
from dstack._internal.utils.common import get_or_error
1415
from dstack._internal.utils.tags import tags_validator
@@ -44,6 +45,16 @@ class VolumeConfiguration(CoreModel):
4445
Optional[str],
4546
Field(description="The volume ID. Must be specified when registering external volumes"),
4647
] = None
48+
idle_duration: Annotated[
49+
Optional[Union[str, int, bool]],
50+
Field(
51+
description=(
52+
"Time to wait after volume is no longer used by any job before deleting it. "
53+
"Defaults to keep the volume indefinitely. "
54+
"Use the value 'off' to disable auto-cleanup."
55+
)
56+
),
57+
] = None
4758
tags: Annotated[
4859
Optional[Dict[str, str]],
4960
Field(
@@ -56,6 +67,9 @@ class VolumeConfiguration(CoreModel):
5667
] = None
5768

5869
_validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator)
70+
_validate_idle_duration = validator("idle_duration", pre=True, allow_reuse=True)(
71+
parse_idle_duration
72+
)
5973

6074
@property
6175
def size_gb(self) -> int:

src/dstack/_internal/server/background/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
process_gateways_connections,
88
process_submitted_gateways,
99
)
10+
from dstack._internal.server.background.tasks.process_idle_volumes import process_idle_volumes
1011
from dstack._internal.server.background.tasks.process_instances import (
1112
process_instances,
1213
)
@@ -92,6 +93,9 @@ def start_background_tasks() -> AsyncIOScheduler:
9293
_scheduler.add_job(
9394
process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5
9495
)
96+
_scheduler.add_job(
97+
process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1
98+
)
9599
_scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
96100
_scheduler.start()
97101
return _scheduler
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import datetime
2+
from typing import List
3+
4+
from sqlalchemy import select
5+
from sqlalchemy.ext.asyncio import AsyncSession
6+
from sqlalchemy.orm import joinedload
7+
8+
from dstack._internal.core.models.profiles import parse_duration
9+
from dstack._internal.core.models.volumes import VolumeStatus
10+
from dstack._internal.server.db import get_session_ctx
11+
from dstack._internal.server.models import VolumeModel
12+
from dstack._internal.server.services.locking import get_locker
13+
from dstack._internal.server.services.volumes import delete_volumes, get_volume_configuration
14+
from dstack._internal.utils.common import get_current_datetime
15+
from dstack._internal.utils.logging import get_logger
16+
17+
logger = get_logger(__name__)
18+
19+
20+
async def process_idle_volumes(batch_size: int = 10):
21+
"""
22+
Process volumes to check if they have exceeded their idle_duration and delete them.
23+
"""
24+
lock, lockset = get_locker().get_lockset(VolumeModel.__tablename__)
25+
async with get_session_ctx() as session:
26+
async with lock:
27+
res = await session.execute(
28+
select(VolumeModel)
29+
.where(
30+
VolumeModel.status == VolumeStatus.ACTIVE,
31+
VolumeModel.deleted == False,
32+
VolumeModel.id.not_in(lockset),
33+
)
34+
.options(joinedload(VolumeModel.project))
35+
.options(joinedload(VolumeModel.attachments))
36+
.order_by(VolumeModel.last_processed_at.asc())
37+
.limit(batch_size)
38+
.with_for_update(skip_locked=True)
39+
)
40+
volume_models = list(res.unique().scalars().all())
41+
if not volume_models:
42+
return
43+
44+
# Add to lockset
45+
for volume_model in volume_models:
46+
lockset.add(volume_model.id)
47+
48+
try:
49+
volumes_to_delete = []
50+
for volume_model in volume_models:
51+
if await _should_delete_idle_volume(volume_model):
52+
volumes_to_delete.append(volume_model)
53+
54+
if volumes_to_delete:
55+
await _delete_idle_volumes(session, volumes_to_delete)
56+
57+
finally:
58+
# Remove from lockset
59+
for volume_model in volume_models:
60+
lockset.difference_update([volume_model.id])
61+
62+
63+
async def _should_delete_idle_volume(volume_model: VolumeModel) -> bool:
64+
"""
65+
Check if a volume should be deleted based on its idle duration.
66+
"""
67+
# Get volume configuration
68+
configuration = get_volume_configuration(volume_model)
69+
70+
# If no idle_duration is configured, don't delete
71+
if configuration.idle_duration is None:
72+
return False
73+
74+
# If idle_duration is disabled (negative value), don't delete
75+
if isinstance(configuration.idle_duration, int) and configuration.idle_duration < 0:
76+
return False
77+
78+
# Parse idle duration
79+
idle_duration_seconds = parse_duration(configuration.idle_duration)
80+
if idle_duration_seconds is None or idle_duration_seconds <= 0:
81+
return False
82+
83+
# Check if volume is currently attached to any instance
84+
if len(volume_model.attachments) > 0:
85+
logger.debug("Volume %s is still attached to instances, not deleting", volume_model.name)
86+
return False
87+
88+
# Calculate how long the volume has been idle
89+
idle_duration = _get_volume_idle_duration(volume_model)
90+
idle_threshold = datetime.timedelta(seconds=idle_duration_seconds)
91+
92+
if idle_duration > idle_threshold:
93+
logger.info(
94+
"Volume %s idle duration expired: idle time %s seconds, threshold %s seconds. Marking for deletion",
95+
volume_model.name,
96+
idle_duration.total_seconds(),
97+
idle_threshold.total_seconds(),
98+
)
99+
return True
100+
101+
return False
102+
103+
104+
def _get_volume_idle_duration(volume_model: VolumeModel) -> datetime.timedelta:
105+
"""
106+
Calculate how long a volume has been idle.
107+
A volume is considered idle from the time it was last processed by a job.
108+
If it was never used by a job, use the created_at time.
109+
"""
110+
last_time = volume_model.created_at.replace(tzinfo=datetime.timezone.utc)
111+
if volume_model.last_job_processed_at is not None:
112+
last_time = volume_model.last_job_processed_at.replace(tzinfo=datetime.timezone.utc)
113+
return get_current_datetime() - last_time
114+
115+
116+
async def _delete_idle_volumes(session: AsyncSession, volume_models: List[VolumeModel]):
117+
"""
118+
Delete volumes that have exceeded their idle duration.
119+
"""
120+
# Group volumes by project
121+
volumes_by_project = {}
122+
for volume_model in volume_models:
123+
project = volume_model.project
124+
if project not in volumes_by_project:
125+
volumes_by_project[project] = []
126+
volumes_by_project[project].append(volume_model.name)
127+
128+
# Delete volumes by project
129+
for project, volume_names in volumes_by_project.items():
130+
logger.info("Deleting idle volumes for project %s: %s", project.name, volume_names)
131+
try:
132+
await delete_volumes(session, project, volume_names)
133+
except Exception as e:
134+
logger.error("Failed to delete idle volumes for project %s: %s", project.name, str(e))

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,3 +704,6 @@ async def _attach_volume(
704704
attachment_data=attachment_data.json(),
705705
)
706706
instance.volume_attachments.append(volume_attachment_model)
707+
708+
# Update volume last_job_processed_at when it's attached to a job
709+
volume_model.last_job_processed_at = common_utils.get_current_datetime()
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""add_volume_last_job_processed_at
2+
3+
Revision ID: 09f10d63c924
4+
Revises: 35e90e1b0d3e
5+
Create Date: 2025-06-24 11:41:39.588797
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
from alembic import op
11+
12+
import dstack._internal.server.models
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "09f10d63c924"
16+
down_revision = "35e90e1b0d3e"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
# ### commands auto generated by Alembic - please adjust! ###
23+
with op.batch_alter_table("volumes", schema=None) as batch_op:
24+
batch_op.add_column(
25+
sa.Column(
26+
"last_job_processed_at",
27+
dstack._internal.server.models.NaiveDateTime(),
28+
nullable=True,
29+
)
30+
)
31+
# ### end Alembic commands ###
32+
33+
34+
def downgrade() -> None:
35+
# ### commands auto generated by Alembic - please adjust! ###
36+
with op.batch_alter_table("volumes", schema=None) as batch_op:
37+
batch_op.drop_column("last_job_processed_at")
38+
# ### end Alembic commands ###

src/dstack/_internal/server/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,7 @@ class VolumeModel(BaseModel):
630630
last_processed_at: Mapped[datetime] = mapped_column(
631631
NaiveDateTime, default=get_current_datetime
632632
)
633+
last_job_processed_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
633634
deleted: Mapped[bool] = mapped_column(Boolean, default=False)
634635
deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
635636

src/dstack/_internal/server/services/jobs/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,19 @@ async def process_terminating_job(
280280
# so that stuck volumes don't prevent the instance from terminating.
281281
job_model.instance_id = None
282282
instance_model.last_job_processed_at = common.get_current_datetime()
283+
284+
# Update volume last_job_processed_at when job is done using them
285+
if jrd is not None and jrd.volume_names is not None:
286+
volume_names = jrd.volume_names
287+
else:
288+
# Legacy jobs before job_runtime_data/blocks were introduced
289+
volume_names = [va.volume.name for va in instance_model.volume_attachments]
290+
volume_models = await list_project_volume_models(
291+
session=session, project=instance_model.project, names=volume_names
292+
)
293+
for volume_model in volume_models:
294+
volume_model.last_job_processed_at = common.get_current_datetime()
295+
283296
logger.info(
284297
"%s: instance '%s' has been released, new status is %s",
285298
fmt(job_model),

src/dstack/_internal/server/testing/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,13 +794,15 @@ def get_volume_configuration(
794794
region: str = "eu-west-1",
795795
size: Optional[Memory] = Memory(100),
796796
volume_id: Optional[str] = None,
797+
idle_duration: Optional[Union[str, int, bool]] = None,
797798
) -> VolumeConfiguration:
798799
return VolumeConfiguration(
799800
name=name,
800801
backend=backend,
801802
region=region,
802803
size=size,
803804
volume_id=volume_id,
805+
idle_duration=idle_duration,
804806
)
805807

806808

0 commit comments

Comments
 (0)