Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions contributing/LOCKING.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ Concurrency is hard. Below you'll find common patterns and gotchas when working

This is a common sense approach. An alternative could be the inverse: job processing cannot run in parallel with run processing, so job processing takes run lock. This indirection complicates things and is discouraged. In this example, run processing should take job lock instead.


**Start new transaction after acquiring a lock to see other transactions changes in SQLite.**

```python
Expand Down Expand Up @@ -76,6 +75,10 @@ unlock resources

If a transaction releases a lock before committing changes, the changes may not be visible to another transaction that acquired the lock and relies upon seeing all committed changes.

**Don't use joinedload when selecting .with_for_update()**
**Don't use `joinedload` when selecting `.with_for_update()`**

In fact, using `joinedload` together with `.with_for_update()` will trigger an error because `joinedload` produces a LEFT OUTER JOIN, which cannot be used with SELECT FOR UPDATE. A regular `.join()` can be used to lock related resources, but it may return no rows if there is no row to join. Usually, you'd either select with `selectinload`, or first select with `.with_for_update()` without loading related attributes and then re-select with `joinedload` without `.with_for_update()`.

**Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column**

If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated, because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (`.with_for_update(key_share=True)`) if primary key columns are not modified. `SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so there is no deadlock.
2 changes: 1 addition & 1 deletion src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class JobSubmission(CoreModel):
job_provisioning_data: Optional[JobProvisioningData]
job_runtime_data: Optional[JobRuntimeData]
# TODO: make status_message and error a computed field after migrating to pydanticV2
status_message: Optional[str]
status_message: Optional[str] = None
error: Optional[str] = None

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def process_fleets():
)
.order_by(FleetModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
fleet_model = res.scalar()
if fleet_model is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def process_submitted_gateways():
.options(lazyload(GatewayModel.gateway_compute))
.order_by(GatewayModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
gateway_model = res.scalar()
if gateway_model is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async def _process_next_instance():
.options(lazyload(InstanceModel.jobs))
.order_by(InstanceModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
instance = res.scalar()
if instance is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def process_placement_groups():
PlacementGroupModel.id.not_in(lockset),
)
.order_by(PlacementGroupModel.id) # take locks in order
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
placement_group_models = res.scalars().all()
if len(placement_group_models) == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def _process_next_running_job():
)
.order_by(JobModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
job_model = res.unique().scalar()
if job_model is None:
Expand Down
4 changes: 2 additions & 2 deletions src/dstack/_internal/server/background/tasks/process_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def _process_next_run():
)
.order_by(RunModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
run_model = res.scalar()
if run_model is None:
Expand All @@ -74,7 +74,7 @@ async def _process_next_run():
JobModel.id.not_in(job_lockset),
)
.order_by(JobModel.id) # take locks in order
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
job_models = res.scalars().all()
if len(run_model.jobs) != len(job_models):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def _process_next_submitted_job():
JobModel.id.not_in(lockset),
)
# Jobs are processed in FIFO order, sorted by priority globally,
# thus runs from different project can "overtake" each other by using higher priorities.
# thus runs from different projects can "overtake" each other by using higher priorities.
# That's not a big problem as long as projects do not compete for the same compute resources.
# Jobs with lower priorities from other projects will be processed without major lag
# as long as new higher priority runs are not constantly submitted.
Expand All @@ -108,7 +108,13 @@ async def _process_next_submitted_job():
# there can be many projects and we are limited by the max DB connections.
.order_by(RunModel.priority.desc(), JobModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(
skip_locked=True,
key_share=True,
# Do not lock joined run, only job.
# Locking run here may cause deadlock.
of=JobModel,
)
)
job_model = res.scalar()
if job_model is None:
Expand Down Expand Up @@ -201,7 +207,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
)
.options(lazyload(InstanceModel.jobs))
.order_by(InstanceModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
)
pool_instances = list(res.unique().scalars().all())
instances_ids = sorted([i.id for i in pool_instances])
Expand Down Expand Up @@ -326,7 +332,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
.where(VolumeModel.id.in_(volumes_ids))
.options(selectinload(VolumeModel.user))
.order_by(VolumeModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
)
async with get_locker().lock_ctx(VolumeModel.__tablename__, volumes_ids):
if len(volume_models) > 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def _process_next_terminating_job():
)
.order_by(JobModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
job_model = res.scalar()
if job_model is None:
Expand All @@ -58,7 +58,7 @@ async def _process_next_terminating_job():
InstanceModel.id.not_in(instance_lockset),
)
.options(lazyload(InstanceModel.jobs))
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
instance_model = res.scalar()
if instance_model is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def process_submitted_volumes():
)
.order_by(VolumeModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
.with_for_update(skip_locked=True, key_share=True)
)
volume_model = res.scalar()
if volume_model is None:
Expand Down
2 changes: 1 addition & 1 deletion src/dstack/_internal/server/services/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ async def delete_fleets(
.options(selectinload(FleetModel.runs))
.execution_options(populate_existing=True)
.order_by(FleetModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
)
fleet_models = res.scalars().unique().all()
fleets = [fleet_model_to_fleet(m) for m in fleet_models]
Expand Down
2 changes: 1 addition & 1 deletion src/dstack/_internal/server/services/gateways/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async def delete_gateways(
.options(selectinload(GatewayModel.gateway_compute))
.execution_options(populate_existing=True)
.order_by(GatewayModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
)
gateway_models = res.scalars().all()
for gateway_model in gateway_models:
Expand Down
6 changes: 3 additions & 3 deletions src/dstack/_internal/server/services/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,15 @@ async def stop_run(session: AsyncSession, run_model: RunModel, abort: bool):
select(RunModel)
.where(RunModel.id == run_model.id)
.order_by(RunModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
.execution_options(populate_existing=True)
)
run_model = res.scalar_one()
await session.execute(
select(JobModel)
.where(JobModel.run_id == run_model.id)
.order_by(JobModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
.execution_options(populate_existing=True)
)
if run_model.status.is_finished():
Expand Down Expand Up @@ -633,7 +633,7 @@ async def delete_runs(
select(RunModel)
.where(RunModel.id.in_(run_ids))
.order_by(RunModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
)
run_models = res.scalars().all()
active_runs = [r for r in run_models if not r.status.is_finished()]
Expand Down
2 changes: 1 addition & 1 deletion src/dstack/_internal/server/services/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ async def delete_volumes(session: AsyncSession, project: ProjectModel, names: Li
.options(selectinload(VolumeModel.attachments))
.execution_options(populate_existing=True)
.order_by(VolumeModel.id) # take locks in order
.with_for_update()
.with_for_update(key_share=True)
)
volume_models = res.scalars().unique().all()
for volume_model in volume_models:
Expand Down
Loading