Skip to content

Commit badb924

Browse files
Removed unnecessary changes and refactor implementations
1 parent c83d2ed commit badb924

File tree

3 files changed

+84
-121
lines changed

3 files changed

+84
-121
lines changed

src/dstack/_internal/server/background/tasks/process_idle_volumes.py

Lines changed: 40 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -31,86 +31,74 @@ async def process_idle_volumes():
3131
.limit(10)
3232
.with_for_update(skip_locked=True)
3333
)
34-
volume_models = list(res.unique().scalars().all())
35-
if not volume_models:
34+
volumes = list(res.unique().scalars().all())
35+
if not volumes:
3636
return
37-
for volume_model in volume_models:
38-
await session.refresh(volume_model, ["project", "attachments"])
39-
lockset.add(volume_model.id)
37+
for volume in volumes:
38+
await session.refresh(volume, ["project", "attachments"])
39+
lockset.add(volume.id)
4040

4141
try:
42-
volumes_to_delete = []
43-
for volume_model in volume_models:
44-
if _should_delete_idle_volume(volume_model):
45-
volumes_to_delete.append(volume_model)
42+
to_delete = []
43+
for volume in volumes:
44+
if _should_delete_volume(volume):
45+
to_delete.append(volume)
4646

47-
if volumes_to_delete:
48-
await _delete_idle_volumes(session, volumes_to_delete)
47+
if to_delete:
48+
await _delete_volumes(session, to_delete)
4949

5050
finally:
51-
for volume_model in volume_models:
52-
lockset.discard(volume_model.id)
51+
for volume in volumes:
52+
lockset.discard(volume.id)
5353

5454

55-
def _should_delete_idle_volume(volume_model: VolumeModel) -> bool:
56-
configuration = get_volume_configuration(volume_model)
55+
def _should_delete_volume(volume: VolumeModel) -> bool:
56+
config = get_volume_configuration(volume)
5757

58-
if configuration.idle_duration is None:
58+
if not config.idle_duration:
5959
return False
6060

61-
if isinstance(configuration.idle_duration, int) and configuration.idle_duration < 0:
61+
if isinstance(config.idle_duration, int) and config.idle_duration < 0:
6262
return False
6363

64-
idle_duration_seconds = parse_duration(configuration.idle_duration)
65-
if idle_duration_seconds is None or idle_duration_seconds <= 0:
64+
duration_seconds = parse_duration(config.idle_duration)
65+
if not duration_seconds or duration_seconds <= 0:
6666
return False
6767

68-
if len(volume_model.attachments) > 0:
69-
logger.debug("Volume %s is still attached to instances, not deleting", volume_model.name)
68+
if volume.attachments:
7069
return False
7170

72-
idle_duration = _get_volume_idle_duration(volume_model)
73-
idle_threshold = datetime.timedelta(seconds=idle_duration_seconds)
71+
idle_time = _get_idle_time(volume)
72+
threshold = datetime.timedelta(seconds=duration_seconds)
7473

75-
if idle_duration > idle_threshold:
74+
if idle_time > threshold:
7675
logger.info(
77-
"Volume %s idle duration expired: idle time %.1f hours, threshold %.1f hours. Marking for deletion",
78-
volume_model.name,
79-
idle_duration.total_seconds() / 3600,
80-
idle_threshold.total_seconds() / 3600,
76+
"Deleting idle volume %s (idle %.1fh)", volume.name, idle_time.total_seconds() / 3600
8177
)
8278
return True
8379

8480
return False
8581

8682

87-
def _get_volume_idle_duration(volume_model: VolumeModel) -> datetime.timedelta:
88-
reference_time = volume_model.created_at
89-
if volume_model.last_job_processed_at is not None:
90-
reference_time = volume_model.last_job_processed_at
83+
def _get_idle_time(volume: VolumeModel) -> datetime.timedelta:
84+
last_used = volume.last_job_processed_at or volume.created_at
85+
last_used_utc = last_used.replace(tzinfo=datetime.timezone.utc)
86+
now = get_current_datetime()
9187

92-
reference_time_utc = reference_time.replace(tzinfo=datetime.timezone.utc)
93-
current_time = get_current_datetime()
88+
idle_time = now - last_used_utc
89+
return max(idle_time, datetime.timedelta(0))
9490

95-
idle_duration = current_time - reference_time_utc
9691

97-
if idle_duration.total_seconds() < 0:
98-
return datetime.timedelta(0)
92+
async def _delete_volumes(session: AsyncSession, volumes: List[VolumeModel]):
93+
by_project = {}
94+
for volume in volumes:
95+
project = volume.project
96+
if project not in by_project:
97+
by_project[project] = []
98+
by_project[project].append(volume.name)
9999

100-
return idle_duration
101-
102-
103-
async def _delete_idle_volumes(session: AsyncSession, volume_models: List[VolumeModel]):
104-
volumes_by_project = {}
105-
for volume_model in volume_models:
106-
project = volume_model.project
107-
if project not in volumes_by_project:
108-
volumes_by_project[project] = []
109-
volumes_by_project[project].append(volume_model.name)
110-
111-
for project, volume_names in volumes_by_project.items():
112-
logger.info("Deleting idle volumes for project %s: %s", project.name, volume_names)
100+
for project, names in by_project.items():
113101
try:
114-
await delete_volumes(session, project, volume_names)
102+
await delete_volumes(session, project, names)
115103
except Exception as e:
116-
logger.error("Failed to delete idle volumes for project %s: %s", project.name, str(e))
104+
logger.error("Failed to delete volumes for project %s: %s", project.name, str(e))

src/dstack/_internal/server/services/jobs/__init__.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -281,15 +281,18 @@ async def process_terminating_job(
281281
job_model.instance_id = None
282282
instance_model.last_job_processed_at = common.get_current_datetime()
283283

284-
if jrd is not None and jrd.volume_names is not None:
285-
volume_names = jrd.volume_names
286-
else:
287-
volume_names = [va.volume.name for va in instance_model.volume_attachments]
288-
volume_models = await list_project_volume_models(
289-
session=session, project=instance_model.project, names=volume_names
284+
# Update volume timestamps
285+
volume_names = (
286+
jrd.volume_names
287+
if jrd and jrd.volume_names
288+
else [va.volume.name for va in instance_model.volume_attachments]
290289
)
291-
for volume_model in volume_models:
292-
volume_model.last_job_processed_at = common.get_current_datetime()
290+
if volume_names:
291+
volumes = await list_project_volume_models(
292+
session=session, project=instance_model.project, names=volume_names
293+
)
294+
for volume in volumes:
295+
volume.last_job_processed_at = common.get_current_datetime()
293296

294297
logger.info(
295298
"%s: instance '%s' has been released, new status is %s",

src/tests/_internal/server/background/tasks/test_process_idle_volumes.py

Lines changed: 33 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from dstack._internal.core.models.backends.base import BackendType
88
from dstack._internal.core.models.volumes import VolumeStatus
99
from dstack._internal.server.background.tasks.process_idle_volumes import (
10-
_get_volume_idle_duration,
11-
_should_delete_idle_volume,
10+
_get_idle_time,
11+
_should_delete_volume,
1212
process_idle_volumes,
1313
)
1414
from dstack._internal.server.models import VolumeAttachmentModel
@@ -25,7 +25,7 @@
2525
@pytest.mark.asyncio
2626
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
2727
class TestProcessIdleVolumes:
28-
async def test_no_idle_duration_configured(self, test_db, session: AsyncSession):
28+
async def test_no_idle_duration(self, test_db, session: AsyncSession):
2929
project = await create_project(session=session)
3030
user = await create_user(session=session)
3131

@@ -39,106 +39,82 @@ async def test_no_idle_duration_configured(self, test_db, session: AsyncSession)
3939
volume_provisioning_data=get_volume_provisioning_data(),
4040
)
4141

42-
should_delete = _should_delete_idle_volume(volume)
43-
assert not should_delete
42+
assert not _should_delete_volume(volume)
4443

4544
async def test_idle_duration_disabled(self, test_db, session: AsyncSession):
4645
project = await create_project(session=session)
4746
user = await create_user(session=session)
4847

49-
volume_config = get_volume_configuration(name="test-volume")
50-
volume_config.idle_duration = -1
48+
config = get_volume_configuration(name="test-volume")
49+
config.idle_duration = -1
5150

5251
volume = await create_volume(
5352
session=session,
5453
project=project,
5554
user=user,
5655
status=VolumeStatus.ACTIVE,
5756
backend=BackendType.AWS,
58-
configuration=volume_config,
57+
configuration=config,
5958
volume_provisioning_data=get_volume_provisioning_data(),
6059
)
6160

62-
should_delete = _should_delete_idle_volume(volume)
63-
assert not should_delete
61+
assert not _should_delete_volume(volume)
6462

65-
async def test_volume_still_attached(self, test_db, session: AsyncSession):
63+
async def test_volume_attached(self, test_db, session: AsyncSession):
6664
project = await create_project(session=session)
6765
user = await create_user(session=session)
6866

69-
volume_config = get_volume_configuration(name="test-volume")
70-
volume_config.idle_duration = "1h"
67+
config = get_volume_configuration(name="test-volume")
68+
config.idle_duration = "1h"
7169

7270
volume = await create_volume(
7371
session=session,
7472
project=project,
7573
user=user,
7674
status=VolumeStatus.ACTIVE,
7775
backend=BackendType.AWS,
78-
configuration=volume_config,
76+
configuration=config,
7977
volume_provisioning_data=get_volume_provisioning_data(),
8078
)
8179

8280
instance = await create_instance(session=session, project=project)
83-
attachment = VolumeAttachmentModel(
84-
volume_id=volume.id,
85-
instance_id=instance.id,
81+
volume.attachments.append(
82+
VolumeAttachmentModel(volume_id=volume.id, instance_id=instance.id)
8683
)
87-
volume.attachments.append(attachment)
8884
await session.commit()
8985

90-
should_delete = _should_delete_idle_volume(volume)
91-
assert not should_delete
86+
assert not _should_delete_volume(volume)
9287

93-
async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession):
88+
async def test_idle_duration_threshold(self, test_db, session: AsyncSession):
9489
project = await create_project(session=session)
9590
user = await create_user(session=session)
9691

97-
volume_config = get_volume_configuration(name="test-volume")
98-
volume_config.idle_duration = "1h"
92+
config = get_volume_configuration(name="test-volume")
93+
config.idle_duration = "1h"
9994

10095
volume = await create_volume(
10196
session=session,
10297
project=project,
10398
user=user,
10499
status=VolumeStatus.ACTIVE,
105100
backend=BackendType.AWS,
106-
configuration=volume_config,
101+
configuration=config,
107102
volume_provisioning_data=get_volume_provisioning_data(),
108103
)
109104

105+
# Not exceeded - 30 minutes ago
110106
volume.last_job_processed_at = (
111107
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30)
112108
).replace(tzinfo=None)
109+
assert not _should_delete_volume(volume)
113110

114-
should_delete = _should_delete_idle_volume(volume)
115-
assert not should_delete
116-
117-
async def test_idle_duration_exceeded(self, test_db, session: AsyncSession):
118-
project = await create_project(session=session)
119-
user = await create_user(session=session)
120-
121-
volume_config = get_volume_configuration(name="test-volume")
122-
volume_config.idle_duration = "1h"
123-
124-
volume = await create_volume(
125-
session=session,
126-
project=project,
127-
user=user,
128-
status=VolumeStatus.ACTIVE,
129-
backend=BackendType.AWS,
130-
configuration=volume_config,
131-
volume_provisioning_data=get_volume_provisioning_data(),
132-
)
133-
111+
# Exceeded - 2 hours ago
134112
volume.last_job_processed_at = (
135113
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2)
136114
).replace(tzinfo=None)
115+
assert _should_delete_volume(volume)
137116

138-
should_delete = _should_delete_idle_volume(volume)
139-
assert should_delete
140-
141-
async def test_volume_never_used_by_job(self, test_db, session: AsyncSession):
117+
async def test_never_used_volume(self, test_db, session: AsyncSession):
142118
project = await create_project(session=session)
143119
user = await create_user(session=session)
144120

@@ -154,38 +130,34 @@ async def test_volume_never_used_by_job(self, test_db, session: AsyncSession):
154130
)
155131

156132
volume.last_job_processed_at = None
133+
idle_time = _get_idle_time(volume)
134+
assert idle_time.total_seconds() >= 7000
157135

158-
idle_duration = _get_volume_idle_duration(volume)
159-
assert idle_duration.total_seconds() >= 7000
160-
161-
async def test_process_idle_volumes_integration(self, test_db, session: AsyncSession):
136+
async def test_integration(self, test_db, session: AsyncSession):
162137
project = await create_project(session=session)
163138
user = await create_user(session=session)
164139

165-
volume_config = get_volume_configuration(name="test-volume")
166-
volume_config.idle_duration = "1h"
140+
config = get_volume_configuration(name="test-volume")
141+
config.idle_duration = "1h"
167142

168143
volume = await create_volume(
169144
session=session,
170145
project=project,
171146
user=user,
172147
status=VolumeStatus.ACTIVE,
173148
backend=BackendType.AWS,
174-
configuration=volume_config,
149+
configuration=config,
175150
volume_provisioning_data=get_volume_provisioning_data(),
176151
)
177152

178153
volume.last_job_processed_at = (
179154
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2)
180155
).replace(tzinfo=None)
181-
182156
await session.commit()
183157

184158
with patch(
185159
"dstack._internal.server.background.tasks.process_idle_volumes.delete_volumes"
186-
) as mock_delete:
160+
) as mock:
187161
await process_idle_volumes()
188-
189-
mock_delete.assert_called_once()
190-
call_args = mock_delete.call_args
191-
assert call_args[0][2] == ["test-volume"]
162+
mock.assert_called_once()
163+
assert mock.call_args[0][2] == ["test-volume"]

0 commit comments

Comments
 (0)