Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion contributing/LOCKING.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ await session.commit()
select ...
```

> SQLite exhibits "snapshot isolation". When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html
> SQLite exhibits Snapshot Isolation. When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html

Thus, if a new transaction is not started, you won't see changes that concurrent transactions made before you acquired the lock.

Expand Down Expand Up @@ -82,3 +82,29 @@ In fact, using `joinedload` and `.with_for_update()` will trigger an error becau
**Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column**

If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (`.with_for_update(key_share=True)`) if primary key columns are not modified. `SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so no deadlock occurs.


**Lock unique names**

The following pattern can be used to lock a unique name of some resource type:

```python
lock_namespace = f"fleet_names_{project.name}"
if get_db().dialect_name == "sqlite":
# Start new transaction to see committed changes after lock
await session.commit()
elif get_db().dialect_name == "postgresql":
await session.execute(
select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace)))
)

lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace)
async with lock:
# ... select taken names, use a unique name
await session.commit()
```

Note that:

* This pattern works assuming that Postgres is using default isolation level Read Committed. By the time a transaction acquires the advisory lock, all other transactions that can take the name have committed, so their changes can be seen and a unique name is taken.
* SQLite needs a commit before selecting taken names due to Snapshot Isolation as noted above.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

from sqlalchemy import and_, not_, or_, select
from sqlalchemy import and_, func, not_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload

Expand Down Expand Up @@ -54,6 +54,7 @@
from dstack._internal.server.services.backends import get_project_backend_by_type_or_error
from dstack._internal.server.services.fleets import (
fleet_model_to_fleet,
generate_fleet_name,
get_fleet_requirements,
get_next_instance_num,
)
Expand All @@ -71,7 +72,7 @@
get_job_configured_volumes,
get_job_runtime_data,
)
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.locking import get_locker, string_to_lock_id
from dstack._internal.server.services.logging import fmt
from dstack._internal.server.services.offers import get_offers_by_requirements
from dstack._internal.server.services.requirements.combine import (
Expand Down Expand Up @@ -363,7 +364,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
job_model.job_provisioning_data = job_provisioning_data.json()
job_model.status = JobStatus.PROVISIONING
if fleet_model is None:
fleet_model = _create_fleet_model_for_job(
fleet_model = await _create_fleet_model_for_job(
session=session,
project=project,
run=run,
)
Expand Down Expand Up @@ -752,17 +754,28 @@ def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool:
return True


def _create_fleet_model_for_job(
async def _create_fleet_model_for_job(
session: AsyncSession,
project: ProjectModel,
run: Run,
) -> FleetModel:
placement = InstanceGroupPlacement.ANY
if run.run_spec.configuration.type == "task" and run.run_spec.configuration.nodes > 1:
placement = InstanceGroupPlacement.CLUSTER
nodes = _get_nodes_required_num_for_run(run.run_spec)

lock_namespace = f"fleet_names_{project.name}"
# TODO: Lock fleet names on SQLite.
# Needs some refactoring so that the lock is released after commit.
if get_db().dialect_name == "postgresql":
await session.execute(
select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace)))
)
fleet_name = await generate_fleet_name(session=session, project=project)

spec = FleetSpec(
configuration=FleetConfiguration(
name=run.run_spec.run_name,
name=fleet_name,
placement=placement,
reservation=run.run_spec.configuration.reservation,
nodes=FleetNodesSpec(
Expand All @@ -776,7 +789,7 @@ def _create_fleet_model_for_job(
)
fleet_model = FleetModel(
id=uuid.uuid4(),
name=run.run_spec.run_name,
name=fleet_name,
project=project,
status=FleetStatus.ACTIVE,
spec=spec.json(),
Expand Down
Loading