Commit f7a977d

Implement pipeline tasks (#3581)
* Rename tasks/ to scheduled_tasks/
* Support pipeline tasks
* Stop workers on pipeline shutdown
* Add pipeline draining
* Use returning instead of rowcount for heartbeat
* Add ComputeGroupPipeline and PlacementGroupPipeline
* Add TestComputeGroupWorker
* Add TestPlacementGroupWorker
* Add DSTACK_FF_PIPELINE_PROCESSING_ENABLED
* Fixes
* Rename scheduled_tasks tests
* Add TestHeartbeater
* Split pipeline migration in two
* Add pipeline indexes for compute and placement groups
* Make PipelineItem a dataclass
1 parent 42c0752 · commit f7a977d

52 files changed

Lines changed: 1792 additions & 196 deletions
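One change called out above, "Use returning instead of rowcount for heartbeat", is a common pattern swap. A minimal sketch of the idea in SQLAlchemy, with a hypothetical `InstanceModel` and session (not the actual dstack heartbeat code): an `UPDATE ... RETURNING` reports exactly which rows were touched, while `rowcount` only yields a count and is not reliably reported by every driver.

```python
import datetime

import sqlalchemy as sa

# Hypothetical model and session, for illustration only.
from myapp.db import InstanceModel, session

now = datetime.datetime.utcnow()

# rowcount approach: only tells you *how many* rows matched.
res = session.execute(
    sa.update(InstanceModel)
    .where(InstanceModel.busy.is_(True))
    .values(last_heartbeat_at=now)
)
updated_count = res.rowcount

# RETURNING approach: tells you *which* rows were updated,
# and works even where the driver's rowcount is unreliable.
res = session.execute(
    sa.update(InstanceModel)
    .where(InstanceModel.busy.is_(True))
    .values(last_heartbeat_at=now)
    .returning(InstanceModel.id)
)
updated_ids = res.scalars().all()
```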


src/dstack/_internal/core/models/compute_groups.py

Lines changed: 7 additions & 0 deletions
```diff
@@ -12,6 +12,13 @@ class ComputeGroupStatus(str, enum.Enum):
     RUNNING = "running"
     TERMINATED = "terminated"
 
+    @classmethod
+    def finished_statuses(cls) -> List["ComputeGroupStatus"]:
+        return [cls.TERMINATED]
+
+    def is_finished(self):
+        return self in self.finished_statuses()
+
 
 class ComputeGroupProvisioningData(CoreModel):
     compute_group_id: str
```
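A quick usage sketch of the new helpers; the import path and enum values come from the diff above:

```python
from dstack._internal.core.models.compute_groups import ComputeGroupStatus

# TERMINATED is the only finished status, so:
assert ComputeGroupStatus.TERMINATED.is_finished()
assert not ComputeGroupStatus.RUNNING.is_finished()
```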

src/dstack/_internal/server/app.py

Lines changed: 14 additions & 4 deletions
```diff
@@ -23,8 +23,9 @@
 from dstack._internal.proxy.lib.deps import get_injector_from_app
 from dstack._internal.proxy.lib.routers import model_proxy
 from dstack._internal.server import settings
-from dstack._internal.server.background import start_background_tasks
-from dstack._internal.server.background.tasks.process_probes import PROBES_SCHEDULER
+from dstack._internal.server.background.pipeline_tasks import start_pipeline_tasks
+from dstack._internal.server.background.scheduled_tasks import start_scheduled_tasks
+from dstack._internal.server.background.scheduled_tasks.probes import PROBES_SCHEDULER
 from dstack._internal.server.db import get_db, get_session_ctx, migrate
 from dstack._internal.server.routers import (
     auth,
@@ -163,8 +164,11 @@ async def lifespan(app: FastAPI):
     if settings.SERVER_S3_BUCKET is not None or settings.SERVER_GCS_BUCKET is not None:
         init_default_storage()
     scheduler = None
+    pipeline_manager = None
     if settings.SERVER_BACKGROUND_PROCESSING_ENABLED:
-        scheduler = start_background_tasks()
+        scheduler = start_scheduled_tasks()
+        pipeline_manager = start_pipeline_tasks()
+        app.state.pipeline_manager = pipeline_manager
     else:
         logger.info("Background processing is disabled")
     PROBES_SCHEDULER.start()
@@ -189,9 +193,15 @@ async def lifespan(app: FastAPI):
     for func in _ON_STARTUP_HOOKS:
         await func(app)
     yield
+    PROBES_SCHEDULER.shutdown(wait=False)
+    if pipeline_manager is not None:
+        pipeline_manager.shutdown()
     if scheduler is not None:
+        # Note: Scheduler does not cancel currently running jobs, so scheduled tasks cannot do cleanup.
+        # TODO: Track and cancel scheduled tasks.
         scheduler.shutdown()
-    PROBES_SCHEDULER.shutdown(wait=False)
+    if pipeline_manager is not None:
+        await pipeline_manager.drain()
     await gateway_connections_pool.remove_all()
     service_conn_pool = await get_injector_from_app(app).get_service_connection_pool()
     await service_conn_pool.remove_all()
```
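The teardown above is deliberately ordered: stop the probes scheduler, signal pipelines to stop, shut down the scheduler (which cannot cancel in-flight jobs, per the note), then await `drain()` so in-flight pipeline work finishes. A toy sketch of that stop-then-drain pattern in asyncio (hypothetical `Worker`, not dstack's actual `Pipeline`):

```python
import asyncio


class Worker:
    """Toy stop-then-drain worker: shutdown() signals, drain() awaits."""

    def __init__(self) -> None:
        self._stop = asyncio.Event()
        self._task: asyncio.Task | None = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while not self._stop.is_set():
            await asyncio.sleep(0.1)  # stand-in for processing one item

    def shutdown(self) -> None:
        self._stop.set()  # non-blocking: just ask the loop to exit

    async def drain(self) -> None:
        if self._task is not None:
            await self._task  # wait for in-flight work to complete


async def main() -> None:
    w = Worker()
    w.start()
    await asyncio.sleep(0.5)
    w.shutdown()     # signal first...
    await w.drain()  # ...then wait, mirroring the lifespan teardown


asyncio.run(main())
```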
src/dstack/_internal/server/background/__init__.py

Lines changed: 0 additions & 142 deletions

```diff
@@ -1,142 +0,0 @@
-from apscheduler.schedulers.asyncio import AsyncIOScheduler
-from apscheduler.triggers.interval import IntervalTrigger
-
-from dstack._internal.server import settings
-from dstack._internal.server.background.tasks.process_compute_groups import process_compute_groups
-from dstack._internal.server.background.tasks.process_events import delete_events
-from dstack._internal.server.background.tasks.process_fleets import process_fleets
-from dstack._internal.server.background.tasks.process_gateways import (
-    process_gateways,
-    process_gateways_connections,
-)
-from dstack._internal.server.background.tasks.process_idle_volumes import process_idle_volumes
-from dstack._internal.server.background.tasks.process_instances import (
-    delete_instance_health_checks,
-    process_instances,
-)
-from dstack._internal.server.background.tasks.process_metrics import (
-    collect_metrics,
-    delete_metrics,
-)
-from dstack._internal.server.background.tasks.process_placement_groups import (
-    process_placement_groups,
-)
-from dstack._internal.server.background.tasks.process_probes import process_probes
-from dstack._internal.server.background.tasks.process_prometheus_metrics import (
-    collect_prometheus_metrics,
-    delete_prometheus_metrics,
-)
-from dstack._internal.server.background.tasks.process_running_jobs import process_running_jobs
-from dstack._internal.server.background.tasks.process_runs import process_runs
-from dstack._internal.server.background.tasks.process_submitted_jobs import process_submitted_jobs
-from dstack._internal.server.background.tasks.process_terminating_jobs import (
-    process_terminating_jobs,
-)
-from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes
-
-_scheduler = AsyncIOScheduler()
-
-
-def get_scheduler() -> AsyncIOScheduler:
-    return _scheduler
-
-
-def start_background_tasks() -> AsyncIOScheduler:
-    # Background processing is implemented via in-memory locks on SQLite
-    # and SELECT FOR UPDATE on Postgres. Locks may be held for a long time.
-    # This is currently the main bottleneck for scaling dstack processing
-    # as processing more resources requires more DB connections.
-    # TODO: Make background processing efficient by committing locks to DB
-    # and processing outside of DB transactions.
-    #
-    # Now we just try to process as many resources as possible without exhausting DB connections.
-    #
-    # Quick tasks can process multiple resources per transaction.
-    # Potentially long tasks process one resource per transaction
-    # to avoid holding locks for all the resources if one is slow to process.
-    # Still, the next batch won't be processed unless all resources are processed,
-    # so larger batches do not increase processing rate linearly.
-    #
-    # The interval, batch_size, and max_instances determine background tasks processing rates.
-    # By default, one server replica can handle:
-    #
-    # * 150 active jobs with 2 minutes processing latency
-    # * 150 active runs with 2 minutes processing latency
-    # * 150 active instances with 2 minutes processing latency
-    #
-    # These latency numbers do not account for provisioning time,
-    # so it may be slower if a backend is slow to provision.
-    #
-    # Users can set SERVER_BACKGROUND_PROCESSING_FACTOR to process more resources per replica.
-    # They also need to increase max db connections on the client side and db side.
-    #
-    # In-memory locking via locksets does not guarantee
-    # that the first waiting for the lock will acquire it.
-    # The jitter is needed to give all tasks a chance to acquire locks.
-
-    _scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1))
-    _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1)
-    _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1)
-    _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1)
-    if settings.ENABLE_PROMETHEUS_METRICS:
-        _scheduler.add_job(
-            collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1
-        )
-        _scheduler.add_job(delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1)
-    _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15))
-    _scheduler.add_job(process_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5)
-    _scheduler.add_job(
-        process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5
-    )
-    _scheduler.add_job(
-        process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1
-    )
-    _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
-    _scheduler.add_job(
-        process_fleets,
-        IntervalTrigger(seconds=10, jitter=2),
-        max_instances=1,
-    )
-    _scheduler.add_job(delete_instance_health_checks, IntervalTrigger(minutes=5), max_instances=1)
-    for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR):
-        # Add multiple copies of tasks if requested.
-        # max_instances=1 for additional copies to avoid running too many tasks.
-        # Move other tasks here when they need per-replica scaling.
-        _scheduler.add_job(
-            process_submitted_jobs,
-            IntervalTrigger(seconds=4, jitter=2),
-            kwargs={"batch_size": 5},
-            max_instances=4 if replica == 0 else 1,
-        )
-        _scheduler.add_job(
-            process_running_jobs,
-            IntervalTrigger(seconds=4, jitter=2),
-            kwargs={"batch_size": 5},
-            max_instances=2 if replica == 0 else 1,
-        )
-        _scheduler.add_job(
-            process_terminating_jobs,
-            IntervalTrigger(seconds=4, jitter=2),
-            kwargs={"batch_size": 5},
-            max_instances=2 if replica == 0 else 1,
-        )
-        _scheduler.add_job(
-            process_runs,
-            IntervalTrigger(seconds=2, jitter=1),
-            kwargs={"batch_size": 5},
-            max_instances=2 if replica == 0 else 1,
-        )
-        _scheduler.add_job(
-            process_instances,
-            IntervalTrigger(seconds=4, jitter=2),
-            kwargs={"batch_size": 5},
-            max_instances=2 if replica == 0 else 1,
-        )
-        _scheduler.add_job(
-            process_compute_groups,
-            IntervalTrigger(seconds=15, jitter=2),
-            kwargs={"batch_size": 1},
-            max_instances=2 if replica == 0 else 1,
-        )
-    _scheduler.start()
-    return _scheduler
```
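The comment block in the deleted module describes the locking model that constrains scaling: each resource row is locked with SELECT FOR UPDATE on Postgres, and the lock plus its DB connection are held for the whole transaction. A minimal sketch of that pattern in SQLAlchemy, with a hypothetical `InstanceModel` and session factory (SKIP LOCKED added here for illustration; this is not the actual dstack task code):

```python
import sqlalchemy as sa

# Hypothetical async model and session factory, for illustration only.
from myapp.db import InstanceModel, get_session


async def process_one_instance() -> None:
    async with get_session() as session:
        # Lock one pending row; SKIP LOCKED lets concurrent task copies
        # pick different rows instead of queuing on the same lock.
        res = await session.execute(
            sa.select(InstanceModel)
            .where(InstanceModel.status == "pending")
            .limit(1)
            .with_for_update(skip_locked=True)
        )
        instance = res.scalar_one_or_none()
        if instance is None:
            return
        # The row lock (and its DB connection) is held for the whole
        # transaction, which is the bottleneck the comment describes.
        instance.status = "provisioning"
        await session.commit()
```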
src/dstack/_internal/server/background/pipeline_tasks/__init__.py

Lines changed: 69 additions & 0 deletions

```diff
@@ -0,0 +1,69 @@
+import asyncio
+
+from dstack._internal.server.background.pipeline_tasks.base import Pipeline
+from dstack._internal.server.background.pipeline_tasks.compute_groups import ComputeGroupPipeline
+from dstack._internal.server.background.pipeline_tasks.placement_groups import (
+    PlacementGroupPipeline,
+)
+from dstack._internal.settings import FeatureFlags
+from dstack._internal.utils.logging import get_logger
+
+logger = get_logger(__name__)
+
+
+class PipelineManager:
+    def __init__(self) -> None:
+        self._pipelines: list[Pipeline] = []
+        if FeatureFlags.PIPELINE_PROCESSING_ENABLED:
+            self._pipelines += [
+                ComputeGroupPipeline(),
+                PlacementGroupPipeline(),
+            ]
+        self._hinter = PipelineHinter(self._pipelines)
+
+    def start(self):
+        for pipeline in self._pipelines:
+            pipeline.start()
+
+    def shutdown(self):
+        for pipeline in self._pipelines:
+            pipeline.shutdown()
+
+    async def drain(self):
+        results = await asyncio.gather(
+            *[p.drain() for p in self._pipelines], return_exceptions=True
+        )
+        for pipeline, result in zip(self._pipelines, results):
+            if isinstance(result, BaseException):
+                logger.error(
+                    "Unexpected exception when draining pipeline %r",
+                    pipeline,
+                    exc_info=(type(result), result, result.__traceback__),
+                )
+
+    @property
+    def hinter(self):
+        return self._hinter
+
+
+class PipelineHinter:
+    def __init__(self, pipelines: list[Pipeline]) -> None:
+        self._pipelines = pipelines
+        self._hint_fetch_map = {p.hint_fetch_model_name: p for p in self._pipelines}
+
+    def hint_fetch(self, model_name: str):
+        pipeline = self._hint_fetch_map.get(model_name)
+        if pipeline is None:
+            logger.warning("Model %s not registered for fetch hints", model_name)
+            return
+        pipeline.hint_fetch()
+
+
+def start_pipeline_tasks() -> PipelineManager:
+    """
+    Start tasks processed by fetch-workers pipelines based on db + in-memory queues.
+    Suitable for tasks that run frequently and need to lock rows for a long time.
+    """
+    pipeline_manager = PipelineManager()
+    pipeline_manager.start()
+    return pipeline_manager
```
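`base.Pipeline` itself is not in this excerpt. Judging from the docstring ("fetch-workers pipelines based on db + in-memory queues") and the calls `PipelineManager` makes (`start`, `shutdown`, `drain`, `hint_fetch`, `hint_fetch_model_name`), a plausible shape is a fetcher task that polls the database into an in-memory queue consumed by worker tasks. The following is a hypothetical sketch of that interface, not the actual implementation:

```python
import asyncio


class SketchPipeline:
    """Hypothetical fetch-workers pipeline (not dstack's base.Pipeline).
    Illustrates the interface PipelineManager relies on."""

    hint_fetch_model_name = "example"  # assumed registration key for PipelineHinter

    def __init__(self, fetch_interval: float = 10.0, num_workers: int = 2) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._wakeup = asyncio.Event()  # hint_fetch() sets this to poll the DB early
        self._stop = asyncio.Event()
        self._tasks: list[asyncio.Task] = []
        self._fetch_interval = fetch_interval
        self._num_workers = num_workers

    def start(self) -> None:
        self._tasks.append(asyncio.create_task(self._fetch_loop()))
        for _ in range(self._num_workers):
            self._tasks.append(asyncio.create_task(self._worker_loop()))

    def hint_fetch(self) -> None:
        self._wakeup.set()

    def shutdown(self) -> None:
        self._stop.set()
        self._wakeup.set()  # unblock the fetcher so it observes the stop flag

    async def drain(self) -> None:
        # Wait for the fetcher and workers to finish in-flight items.
        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def _fetch_loop(self) -> None:
        while not self._stop.is_set():
            for item in await self._fetch_batch():
                self._queue.put_nowait(item)
            self._wakeup.clear()
            try:
                await asyncio.wait_for(self._wakeup.wait(), timeout=self._fetch_interval)
            except asyncio.TimeoutError:
                pass  # regular poll interval elapsed

    async def _worker_loop(self) -> None:
        while not self._stop.is_set():
            try:
                item = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.1)
                continue
            await self._process(item)

    async def _fetch_batch(self) -> list:
        return []  # stand-in for a SELECT of pending rows

    async def _process(self, item) -> None:
        pass  # stand-in for per-item processing
```

Under this reading, `hint_fetch` lets request handlers wake the fetcher right after creating a row instead of waiting out the poll interval, which is presumably what `PipelineHinter.hint_fetch` is routed to by model name.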
