Skip to content

Commit d513c79

Browse files
committed
Add status-specific processing interval
1 parent f3b8eaf commit d513c79

3 files changed

Lines changed: 36 additions & 5 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/base.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ async def heartbeat(self):
255255

256256
class Fetcher(Generic[ItemT], ABC):
257257
_DEFAULT_FETCH_DELAYS = [0.5, 1, 2, 5]
258+
"""Increasing fetch delays on empty fetches to avoid frequent selects on low-activity/low-resource servers."""
258259

259260
def __init__(
260261
self,
@@ -319,7 +320,15 @@ async def fetch(self, limit: int) -> list[ItemT]:
319320
pass
320321

321322
def _next_fetch_delay(self, empty_fetch_count: int) -> float:
322-
next_delay = self._fetch_delays[min(empty_fetch_count, len(self._fetch_delays) - 1)]
323+
effective_empty_fetch_count = empty_fetch_count
324+
if random.random() < 0.1:
325+
# Empty fetch count can be 0 not because there are no items in the DB,
326+
# but for other reasons such as waiting for parent resource processing.
327+
# From time to time, force the minimal next delay to avoid empty results due to rare fetches.
328+
effective_empty_fetch_count = 0
329+
next_delay = self._fetch_delays[
330+
min(effective_empty_fetch_count, len(self._fetch_delays) - 1)
331+
]
323332
jitter = random.random() * 0.4 - 0.2
324333
return next_delay * (1 + jitter)
325334

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def __init__(
117117
workers_num: int = 20,
118118
queue_lower_limit_factor: float = 0.5,
119119
queue_upper_limit_factor: float = 2.0,
120-
min_processing_interval: timedelta = timedelta(seconds=10),
120+
min_processing_interval: timedelta = timedelta(seconds=5),
121121
lock_timeout: timedelta = timedelta(seconds=30),
122122
heartbeat_trigger: timedelta = timedelta(seconds=15),
123123
) -> None:
@@ -196,7 +196,19 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
196196
[JobStatus.PROVISIONING, JobStatus.PULLING, JobStatus.RUNNING]
197197
),
198198
RunModel.status.not_in([RunStatus.TERMINATING]),
199-
JobModel.last_processed_at <= now - self._min_processing_interval,
199+
or_(
200+
# Process provisioning and pulling jobs quicker for low-latency provisioning.
201+
# Active jobs processing can be less frequent to minimize contention with `RunPipeline`.
202+
and_(
203+
JobModel.status.in_([JobStatus.PROVISIONING, JobStatus.PULLING]),
204+
JobModel.last_processed_at <= now - self._min_processing_interval,
205+
),
206+
and_(
207+
JobModel.status.in_([JobStatus.RUNNING]),
208+
JobModel.last_processed_at
209+
<= now - self._min_processing_interval * 2,
210+
),
211+
),
200212
or_(
201213
and_(
202214
# Do not try to lock jobs if the run is waiting for the lock,

src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def __init__(
5555
workers_num: int = 10,
5656
queue_lower_limit_factor: float = 0.5,
5757
queue_upper_limit_factor: float = 2.0,
58-
min_processing_interval: timedelta = timedelta(seconds=10),
58+
min_processing_interval: timedelta = timedelta(seconds=5),
5959
lock_timeout: timedelta = timedelta(seconds=30),
6060
heartbeat_trigger: timedelta = timedelta(seconds=15),
6161
) -> None:
@@ -164,7 +164,17 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]:
164164
),
165165
),
166166
or_(
167-
RunModel.last_processed_at <= now - self._min_processing_interval,
167+
# Process submitted runs quicker for low-latency provisioning.
168+
# Active run processing can be less frequent to minimize contention with `JobRunningPipeline`.
169+
and_(
170+
RunModel.status == RunStatus.SUBMITTED,
171+
RunModel.last_processed_at <= now - self._min_processing_interval,
172+
),
173+
and_(
174+
RunModel.status != RunStatus.SUBMITTED,
175+
RunModel.last_processed_at
176+
<= now - self._min_processing_interval * 2,
177+
),
168178
RunModel.last_processed_at == RunModel.submitted_at,
169179
),
170180
or_(

0 commit comments

Comments
 (0)