Skip to content

Commit f4c9604

Browse files
authored
Fix Postgres deadlocks (#2843)
* Fix run locking when processing jobs * Default to with_for_update(key_share=True) to prevent deadlock
1 parent 2db1752 commit f4c9604

15 files changed

Lines changed: 32 additions & 23 deletions

contributing/LOCKING.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ Concurrency is hard. Below you'll find common patterns and gotchas when working
4040

4141
This is a common sense approach. An alternative could be the inverse: job processing cannot run in parallel with run processing, so job processing takes run lock. This indirection complicates things and is discouraged. In this example, run processing should take job lock instead.
4242

43-
4443
**Start new transaction after acquiring a lock to see other transactions changes in SQLite.**
4544

4645
```python
@@ -76,6 +75,10 @@ unlock resources
7675

7776
If a transaction releases a lock before committing changes, the changes may not be visible to another transaction that acquired the lock and relies upon seeing all committed changes.
7877

79-
**Don't use joinedload when selecting .with_for_update()**
78+
**Don't use `joinedload` when selecting `.with_for_update()`**
8079

8180
In fact, using `joinedload` and `.with_for_update()` will trigger an error because `joinedload` produces OUTER LEFT JOIN that cannot be used with SELECT FOR UPDATE. A regular `.join()` can be used to lock related resources but it may lead to no rows if there is no row to join. Usually, you'd select with `selectinload` or first select with `.with_for_update()` without loading related attributes and then re-selecting with `joinedload` without `.with_for_update()`.
81+
82+
**Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column**
83+
84+
If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (`.with_for_update(key_share=True)`) if primary key columns are not modified. `SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so no deadlock occurs.

src/dstack/_internal/core/models/runs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ class JobSubmission(CoreModel):
301301
job_provisioning_data: Optional[JobProvisioningData]
302302
job_runtime_data: Optional[JobRuntimeData]
303303
# TODO: make status_message and error a computed field after migrating to pydanticV2
304-
status_message: Optional[str]
304+
status_message: Optional[str] = None
305305
error: Optional[str] = None
306306

307307
@property

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async def process_fleets():
2929
)
3030
.order_by(FleetModel.last_processed_at.asc())
3131
.limit(1)
32-
.with_for_update(skip_locked=True)
32+
.with_for_update(skip_locked=True, key_share=True)
3333
)
3434
fleet_model = res.scalar()
3535
if fleet_model is None:

src/dstack/_internal/server/background/tasks/process_gateways.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async def process_submitted_gateways():
4040
.options(lazyload(GatewayModel.gateway_compute))
4141
.order_by(GatewayModel.last_processed_at.asc())
4242
.limit(1)
43-
.with_for_update(skip_locked=True)
43+
.with_for_update(skip_locked=True, key_share=True)
4444
)
4545
gateway_model = res.scalar()
4646
if gateway_model is None:

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async def _process_next_instance():
149149
.options(lazyload(InstanceModel.jobs))
150150
.order_by(InstanceModel.last_processed_at.asc())
151151
.limit(1)
152-
.with_for_update(skip_locked=True)
152+
.with_for_update(skip_locked=True, key_share=True)
153153
)
154154
instance = res.scalar()
155155
if instance is None:

src/dstack/_internal/server/background/tasks/process_placement_groups.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async def process_placement_groups():
3030
PlacementGroupModel.id.not_in(lockset),
3131
)
3232
.order_by(PlacementGroupModel.id) # take locks in order
33-
.with_for_update(skip_locked=True)
33+
.with_for_update(skip_locked=True, key_share=True)
3434
)
3535
placement_group_models = res.scalars().all()
3636
if len(placement_group_models) == 0:

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ async def _process_next_running_job():
101101
)
102102
.order_by(JobModel.last_processed_at.asc())
103103
.limit(1)
104-
.with_for_update(skip_locked=True)
104+
.with_for_update(skip_locked=True, key_share=True)
105105
)
106106
job_model = res.unique().scalar()
107107
if job_model is None:

src/dstack/_internal/server/background/tasks/process_runs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def _process_next_run():
6262
)
6363
.order_by(RunModel.last_processed_at.asc())
6464
.limit(1)
65-
.with_for_update(skip_locked=True)
65+
.with_for_update(skip_locked=True, key_share=True)
6666
)
6767
run_model = res.scalar()
6868
if run_model is None:
@@ -74,7 +74,7 @@ async def _process_next_run():
7474
JobModel.id.not_in(job_lockset),
7575
)
7676
.order_by(JobModel.id) # take locks in order
77-
.with_for_update(skip_locked=True)
77+
.with_for_update(skip_locked=True, key_share=True)
7878
)
7979
job_models = res.scalars().all()
8080
if len(run_model.jobs) != len(job_models):

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ async def _process_next_submitted_job():
9999
JobModel.id.not_in(lockset),
100100
)
101101
# Jobs are process in FIFO sorted by priority globally,
102-
# thus runs from different project can "overtake" each other by using higher priorities.
102+
# thus runs from different projects can "overtake" each other by using higher priorities.
103103
# That's not a big problem as long as projects do not compete for the same compute resources.
104104
# Jobs with lower priorities from other projects will be processed without major lag
105105
# as long as new higher priority runs are not constantly submitted.
@@ -108,7 +108,13 @@ async def _process_next_submitted_job():
108108
# there can be many projects and we are limited by the max DB connections.
109109
.order_by(RunModel.priority.desc(), JobModel.last_processed_at.asc())
110110
.limit(1)
111-
.with_for_update(skip_locked=True)
111+
.with_for_update(
112+
skip_locked=True,
113+
key_share=True,
114+
# Do not lock joined run, only job.
115+
# Locking run here may cause deadlock.
116+
of=JobModel,
117+
)
112118
)
113119
job_model = res.scalar()
114120
if job_model is None:
@@ -201,7 +207,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
201207
)
202208
.options(lazyload(InstanceModel.jobs))
203209
.order_by(InstanceModel.id) # take locks in order
204-
.with_for_update()
210+
.with_for_update(key_share=True)
205211
)
206212
pool_instances = list(res.unique().scalars().all())
207213
instances_ids = sorted([i.id for i in pool_instances])
@@ -326,7 +332,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
326332
.where(VolumeModel.id.in_(volumes_ids))
327333
.options(selectinload(VolumeModel.user))
328334
.order_by(VolumeModel.id) # take locks in order
329-
.with_for_update()
335+
.with_for_update(key_share=True)
330336
)
331337
async with get_locker().lock_ctx(VolumeModel.__tablename__, volumes_ids):
332338
if len(volume_models) > 0:

src/dstack/_internal/server/background/tasks/process_terminating_jobs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def _process_next_terminating_job():
4545
)
4646
.order_by(JobModel.last_processed_at.asc())
4747
.limit(1)
48-
.with_for_update(skip_locked=True)
48+
.with_for_update(skip_locked=True, key_share=True)
4949
)
5050
job_model = res.scalar()
5151
if job_model is None:
@@ -58,7 +58,7 @@ async def _process_next_terminating_job():
5858
InstanceModel.id.not_in(instance_lockset),
5959
)
6060
.options(lazyload(InstanceModel.jobs))
61-
.with_for_update(skip_locked=True)
61+
.with_for_update(skip_locked=True, key_share=True)
6262
)
6363
instance_model = res.scalar()
6464
if instance_model is None:

0 commit comments

Comments
 (0)