Skip to content

Commit aaacb43

Browse files
committed
Default to with_for_update(key_share=True) to prevent deadlock
1 parent caef0b9 commit aaacb43

File tree

14 files changed

+24
-20
lines changed

14 files changed

+24
-20
lines changed

contributing/LOCKING.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ Concurrency is hard. Below you'll find common patterns and gotchas when working
4040

4141
This is a common sense approach. An alternative could be the inverse: job processing cannot run in parallel with run processing, so job processing takes run lock. This indirection complicates things and is discouraged. In this example, run processing should take job lock instead.
4242

43-
4443
**Start new transaction after acquiring a lock to see other transactions changes in SQLite.**
4544

4645
```python
@@ -76,6 +75,10 @@ unlock resources
7675

7776
If a transaction releases a lock before committing changes, the changes may not be visible to another transaction that acquired the lock and relies upon seeing all committed changes.
7877

79-
**Don't use joinedload when selecting .with_for_update()**
78+
**Don't use `joinedload` when selecting `.with_for_update()`**
8079

8180
In fact, using `joinedload` and `.with_for_update()` will trigger an error because `joinedload` produces OUTER LEFT JOIN that cannot be used with SELECT FOR UPDATE. A regular `.join()` can be used to lock related resources but it may lead to no rows if there is no row to join. Usually, you'd select with `selectinload` or first select with `.with_for_update()` without loading related attributes and then re-selecting with `joinedload` without `.with_for_update()`.
81+
82+
**Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column**
83+
84+
If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (.`with_for_update(key_share=True)`) if primary key columns are not modified. `SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so no deadlock.

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async def process_fleets():
2929
)
3030
.order_by(FleetModel.last_processed_at.asc())
3131
.limit(1)
32-
.with_for_update(skip_locked=True)
32+
.with_for_update(skip_locked=True, key_share=True)
3333
)
3434
fleet_model = res.scalar()
3535
if fleet_model is None:

src/dstack/_internal/server/background/tasks/process_gateways.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async def process_submitted_gateways():
4040
.options(lazyload(GatewayModel.gateway_compute))
4141
.order_by(GatewayModel.last_processed_at.asc())
4242
.limit(1)
43-
.with_for_update(skip_locked=True)
43+
.with_for_update(skip_locked=True, key_share=True)
4444
)
4545
gateway_model = res.scalar()
4646
if gateway_model is None:

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async def _process_next_instance():
149149
.options(lazyload(InstanceModel.jobs))
150150
.order_by(InstanceModel.last_processed_at.asc())
151151
.limit(1)
152-
.with_for_update(skip_locked=True)
152+
.with_for_update(skip_locked=True, key_share=True)
153153
)
154154
instance = res.scalar()
155155
if instance is None:

src/dstack/_internal/server/background/tasks/process_placement_groups.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async def process_placement_groups():
3030
PlacementGroupModel.id.not_in(lockset),
3131
)
3232
.order_by(PlacementGroupModel.id) # take locks in order
33-
.with_for_update(skip_locked=True)
33+
.with_for_update(skip_locked=True, key_share=True)
3434
)
3535
placement_group_models = res.scalars().all()
3636
if len(placement_group_models) == 0:

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ async def _process_next_running_job():
101101
)
102102
.order_by(JobModel.last_processed_at.asc())
103103
.limit(1)
104-
.with_for_update(skip_locked=True)
104+
.with_for_update(skip_locked=True, key_share=True)
105105
)
106106
job_model = res.unique().scalar()
107107
if job_model is None:

src/dstack/_internal/server/background/tasks/process_runs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def _process_next_run():
6262
)
6363
.order_by(RunModel.last_processed_at.asc())
6464
.limit(1)
65-
.with_for_update(skip_locked=True)
65+
.with_for_update(skip_locked=True, key_share=True)
6666
)
6767
run_model = res.scalar()
6868
if run_model is None:
@@ -74,7 +74,7 @@ async def _process_next_run():
7474
JobModel.id.not_in(job_lockset),
7575
)
7676
.order_by(JobModel.id) # take locks in order
77-
.with_for_update(skip_locked=True)
77+
.with_for_update(skip_locked=True, key_share=True)
7878
)
7979
job_models = res.scalars().all()
8080
if len(run_model.jobs) != len(job_models):

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ async def _process_next_submitted_job():
110110
.limit(1)
111111
.with_for_update(
112112
skip_locked=True,
113+
key_share=True,
113114
# Do not lock joined run, only job.
114115
# Locking run here may cause deadlock.
115116
of=JobModel,
@@ -206,7 +207,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
206207
)
207208
.options(lazyload(InstanceModel.jobs))
208209
.order_by(InstanceModel.id) # take locks in order
209-
.with_for_update()
210+
.with_for_update(key_share=True)
210211
)
211212
pool_instances = list(res.unique().scalars().all())
212213
instances_ids = sorted([i.id for i in pool_instances])
@@ -331,7 +332,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
331332
.where(VolumeModel.id.in_(volumes_ids))
332333
.options(selectinload(VolumeModel.user))
333334
.order_by(VolumeModel.id) # take locks in order
334-
.with_for_update()
335+
.with_for_update(key_share=True)
335336
)
336337
async with get_locker().lock_ctx(VolumeModel.__tablename__, volumes_ids):
337338
if len(volume_models) > 0:

src/dstack/_internal/server/background/tasks/process_terminating_jobs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def _process_next_terminating_job():
4545
)
4646
.order_by(JobModel.last_processed_at.asc())
4747
.limit(1)
48-
.with_for_update(skip_locked=True)
48+
.with_for_update(skip_locked=True, key_share=True)
4949
)
5050
job_model = res.scalar()
5151
if job_model is None:
@@ -58,7 +58,7 @@ async def _process_next_terminating_job():
5858
InstanceModel.id.not_in(instance_lockset),
5959
)
6060
.options(lazyload(InstanceModel.jobs))
61-
.with_for_update(skip_locked=True)
61+
.with_for_update(skip_locked=True, key_share=True)
6262
)
6363
instance_model = res.scalar()
6464
if instance_model is None:

src/dstack/_internal/server/background/tasks/process_volumes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def process_submitted_volumes():
3333
)
3434
.order_by(VolumeModel.last_processed_at.asc())
3535
.limit(1)
36-
.with_for_update(skip_locked=True)
36+
.with_for_update(skip_locked=True, key_share=True)
3737
)
3838
volume_model = res.scalar()
3939
if volume_model is None:

0 commit comments

Comments
 (0)