diff --git a/contributing/LOCKING.md b/contributing/LOCKING.md index f146a5e151..107b296d2a 100644 --- a/contributing/LOCKING.md +++ b/contributing/LOCKING.md @@ -40,7 +40,6 @@ Concurrency is hard. Below you'll find common patterns and gotchas when working This is a common sense approach. An alternative could be the inverse: job processing cannot run in parallel with run processing, so job processing takes run lock. This indirection complicates things and is discouraged. In this example, run processing should take job lock instead. - **Start new transaction after acquiring a lock to see other transactions changes in SQLite.** ```python @@ -76,6 +75,10 @@ unlock resources If a transaction releases a lock before committing changes, the changes may not be visible to another transaction that acquired the lock and relies upon seeing all committed changes. -**Don't use joinedload when selecting .with_for_update()** +**Don't use `joinedload` when selecting `.with_for_update()`** In fact, using `joinedload` and `.with_for_update()` will trigger an error because `joinedload` produces OUTER LEFT JOIN that cannot be used with SELECT FOR UPDATE. A regular `.join()` can be used to lock related resources but it may lead to no rows if there is no row to join. Usually, you'd select with `selectinload` or first select with `.with_for_update()` without loading related attributes and then re-selecting with `joinedload` without `.with_for_update()`. + +**Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column** + +If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (`.with_for_update(key_share=True)`) if primary key columns are not modified. 
`SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so no deadlock. diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 9fd5e092a1..860aa5238e 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -301,7 +301,7 @@ class JobSubmission(CoreModel): job_provisioning_data: Optional[JobProvisioningData] job_runtime_data: Optional[JobRuntimeData] # TODO: make status_message and error a computed field after migrating to pydanticV2 - status_message: Optional[str] + status_message: Optional[str] = None error: Optional[str] = None @property diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index fd98f9c3d5..9a7aae0ae8 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -29,7 +29,7 @@ async def process_fleets(): ) .order_by(FleetModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) fleet_model = res.scalar() if fleet_model is None: diff --git a/src/dstack/_internal/server/background/tasks/process_gateways.py b/src/dstack/_internal/server/background/tasks/process_gateways.py index 12131e6b73..c9a7b673c4 100644 --- a/src/dstack/_internal/server/background/tasks/process_gateways.py +++ b/src/dstack/_internal/server/background/tasks/process_gateways.py @@ -40,7 +40,7 @@ async def process_submitted_gateways(): .options(lazyload(GatewayModel.gateway_compute)) .order_by(GatewayModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) gateway_model = res.scalar() if gateway_model is None: diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py 
index f1d1647699..c567b90eff 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -149,7 +149,7 @@ async def _process_next_instance(): .options(lazyload(InstanceModel.jobs)) .order_by(InstanceModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) instance = res.scalar() if instance is None: diff --git a/src/dstack/_internal/server/background/tasks/process_placement_groups.py b/src/dstack/_internal/server/background/tasks/process_placement_groups.py index 9231d2611e..aff96b60ba 100644 --- a/src/dstack/_internal/server/background/tasks/process_placement_groups.py +++ b/src/dstack/_internal/server/background/tasks/process_placement_groups.py @@ -30,7 +30,7 @@ async def process_placement_groups(): PlacementGroupModel.id.not_in(lockset), ) .order_by(PlacementGroupModel.id) # take locks in order - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) placement_group_models = res.scalars().all() if len(placement_group_models) == 0: diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index b834db39b9..f287d11128 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -101,7 +101,7 @@ async def _process_next_running_job(): ) .order_by(JobModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) job_model = res.unique().scalar() if job_model is None: diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 73dfbbe026..ed7b4bdb0f 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ 
b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -62,7 +62,7 @@ async def _process_next_run(): ) .order_by(RunModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) run_model = res.scalar() if run_model is None: @@ -74,7 +74,7 @@ async def _process_next_run(): JobModel.id.not_in(job_lockset), ) .order_by(JobModel.id) # take locks in order - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) job_models = res.scalars().all() if len(run_model.jobs) != len(job_models): diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index cce9c89a74..bf59e16814 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -99,7 +99,7 @@ async def _process_next_submitted_job(): JobModel.id.not_in(lockset), ) # Jobs are process in FIFO sorted by priority globally, - # thus runs from different project can "overtake" each other by using higher priorities. + # thus runs from different projects can "overtake" each other by using higher priorities. # That's not a big problem as long as projects do not compete for the same compute resources. # Jobs with lower priorities from other projects will be processed without major lag # as long as new higher priority runs are not constantly submitted. @@ -108,7 +108,13 @@ async def _process_next_submitted_job(): # there can be many projects and we are limited by the max DB connections. .order_by(RunModel.priority.desc(), JobModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update( + skip_locked=True, + key_share=True, + # Do not lock joined run, only job. + # Locking run here may cause deadlock. 
+ of=JobModel, + ) ) job_model = res.scalar() if job_model is None: @@ -201,7 +207,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): ) .options(lazyload(InstanceModel.jobs)) .order_by(InstanceModel.id) # take locks in order - .with_for_update() + .with_for_update(key_share=True) ) pool_instances = list(res.unique().scalars().all()) instances_ids = sorted([i.id for i in pool_instances]) @@ -326,7 +332,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): .where(VolumeModel.id.in_(volumes_ids)) .options(selectinload(VolumeModel.user)) .order_by(VolumeModel.id) # take locks in order - .with_for_update() + .with_for_update(key_share=True) ) async with get_locker().lock_ctx(VolumeModel.__tablename__, volumes_ids): if len(volume_models) > 0: diff --git a/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py b/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py index a97c9e7a7f..4fde2e7898 100644 --- a/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py @@ -45,7 +45,7 @@ async def _process_next_terminating_job(): ) .order_by(JobModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) job_model = res.scalar() if job_model is None: @@ -58,7 +58,7 @@ async def _process_next_terminating_job(): InstanceModel.id.not_in(instance_lockset), ) .options(lazyload(InstanceModel.jobs)) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) instance_model = res.scalar() if instance_model is None: diff --git a/src/dstack/_internal/server/background/tasks/process_volumes.py b/src/dstack/_internal/server/background/tasks/process_volumes.py index cf07f15d41..a77633c63b 100644 --- a/src/dstack/_internal/server/background/tasks/process_volumes.py +++ 
b/src/dstack/_internal/server/background/tasks/process_volumes.py @@ -33,7 +33,7 @@ async def process_submitted_volumes(): ) .order_by(VolumeModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True) + .with_for_update(skip_locked=True, key_share=True) ) volume_model = res.scalar() if volume_model is None: diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 842ba60c58..d29c2b68c3 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -532,7 +532,7 @@ async def delete_fleets( .options(selectinload(FleetModel.runs)) .execution_options(populate_existing=True) .order_by(FleetModel.id) # take locks in order - .with_for_update() + .with_for_update(key_share=True) ) fleet_models = res.scalars().unique().all() fleets = [fleet_model_to_fleet(m) for m in fleet_models] diff --git a/src/dstack/_internal/server/services/gateways/__init__.py b/src/dstack/_internal/server/services/gateways/__init__.py index 9cd9dcde0d..648ad8c17e 100644 --- a/src/dstack/_internal/server/services/gateways/__init__.py +++ b/src/dstack/_internal/server/services/gateways/__init__.py @@ -240,7 +240,7 @@ async def delete_gateways( .options(selectinload(GatewayModel.gateway_compute)) .execution_options(populate_existing=True) .order_by(GatewayModel.id) # take locks in order - .with_for_update() + .with_for_update(key_share=True) ) gateway_models = res.scalars().all() for gateway_model in gateway_models: diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index badc2a0276..385660f944 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -589,7 +589,7 @@ async def stop_run(session: AsyncSession, run_model: RunModel, abort: bool): select(RunModel) .where(RunModel.id == run_model.id) .order_by(RunModel.id) # take locks in order - .with_for_update() + 
.with_for_update(key_share=True) .execution_options(populate_existing=True) ) run_model = res.scalar_one() @@ -597,7 +597,7 @@ async def stop_run(session: AsyncSession, run_model: RunModel, abort: bool): select(JobModel) .where(JobModel.run_id == run_model.id) .order_by(JobModel.id) # take locks in order - .with_for_update() + .with_for_update(key_share=True) .execution_options(populate_existing=True) ) if run_model.status.is_finished(): @@ -633,7 +633,7 @@ async def delete_runs( select(RunModel) .where(RunModel.id.in_(run_ids)) .order_by(RunModel.id) # take locks in order - .with_for_update() + .with_for_update(key_share=True) ) run_models = res.scalars().all() active_runs = [r for r in run_models if not r.status.is_finished()] diff --git a/src/dstack/_internal/server/services/volumes.py b/src/dstack/_internal/server/services/volumes.py index 8913be3b09..8d5a8a18cc 100644 --- a/src/dstack/_internal/server/services/volumes.py +++ b/src/dstack/_internal/server/services/volumes.py @@ -275,7 +275,7 @@ async def delete_volumes(session: AsyncSession, project: ProjectModel, names: Li .options(selectinload(VolumeModel.attachments)) .execution_options(populate_existing=True) .order_by(VolumeModel.id) # take locks in order - .with_for_update() + .with_for_update(key_share=True) ) volume_models = res.scalars().unique().all() for volume_model in volume_models: