From 4dba3a198ebdf4f0f78338e7ec928c0425247fa3 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 8 Sep 2025 16:33:36 +0500 Subject: [PATCH 1/2] Generate unique fleet name for autocreated fleets --- .../tasks/process_submitted_jobs.py | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 21b699a0ba..13b5d330d7 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta from typing import List, Optional, Tuple -from sqlalchemy import and_, not_, or_, select +from sqlalchemy import and_, func, not_, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload @@ -54,6 +54,7 @@ from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.fleets import ( fleet_model_to_fleet, + generate_fleet_name, get_fleet_requirements, get_next_instance_num, ) @@ -71,7 +72,7 @@ get_job_configured_volumes, get_job_runtime_data, ) -from dstack._internal.server.services.locking import get_locker +from dstack._internal.server.services.locking import get_locker, string_to_lock_id from dstack._internal.server.services.logging import fmt from dstack._internal.server.services.offers import get_offers_by_requirements from dstack._internal.server.services.requirements.combine import ( @@ -363,7 +364,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): job_model.job_provisioning_data = job_provisioning_data.json() job_model.status = JobStatus.PROVISIONING if fleet_model is None: - fleet_model = _create_fleet_model_for_job( + fleet_model = await _create_fleet_model_for_job( + session=session, project=project, run=run, ) @@ -752,7 +754,8 @@ def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool: return True -def _create_fleet_model_for_job( +async def _create_fleet_model_for_job( + session: AsyncSession, project: ProjectModel, run: Run, ) -> FleetModel: @@ -760,9 +763,19 @@ def _create_fleet_model_for_job( if run.run_spec.configuration.type == "task" and run.run_spec.configuration.nodes > 1: placement = InstanceGroupPlacement.CLUSTER nodes = _get_nodes_required_num_for_run(run.run_spec) + + lock_namespace = f"fleet_names_{project.name}" + # TODO: Lock fleet names on SQLite. + # Needs some refactoring so that the lock is released after commit. + if get_db().dialect_name == "postgresql": + await session.execute( + select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace))) + ) + fleet_name = await generate_fleet_name(session=session, project=project) + spec = FleetSpec( configuration=FleetConfiguration( - name=run.run_spec.run_name, + name=fleet_name, placement=placement, reservation=run.run_spec.configuration.reservation, nodes=FleetNodesSpec( @@ -776,7 +789,7 @@ def _create_fleet_model_for_job( ) fleet_model = FleetModel( id=uuid.uuid4(), - name=run.run_spec.run_name, + name=fleet_name, project=project, status=FleetStatus.ACTIVE, spec=spec.json(), From d7aa34ec0f15ea4f1a9c21bbb7bd4a92e90ca4cb Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 8 Sep 2025 16:45:51 +0500 Subject: [PATCH 2/2] Document Lock unique names --- contributing/LOCKING.md | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/contributing/LOCKING.md b/contributing/LOCKING.md index 107b296d2a..54ee31991c 100644 --- a/contributing/LOCKING.md +++ b/contributing/LOCKING.md @@ -50,7 +50,7 @@ await session.commit() select ... ``` -> SQLite exhibits "snapshot isolation". When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html +> SQLite exhibits Snapshot Isolation. When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html Thus, if a new transaction is not started, you won't see changes that concurrent transactions made before you acquired the lock. @@ -82,3 +82,29 @@ In fact, using `joinedload` and `.with_for_update()` will trigger an error becau **Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column** If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (.`with_for_update(key_share=True)`) if primary key columns are not modified. `SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so no deadlock. + + +**Lock unique names** + +The following pattern can be used to lock a unique name of some resource type: + +```python +lock_namespace = f"fleet_names_{project.name}" +if get_db().dialect_name == "sqlite": + # Start new transaction to see committed changes after lock + await session.commit() +elif get_db().dialect_name == "postgresql": + await session.execute( + select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace))) + ) + +lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace) +async with lock: + # ... select taken names, use a unique name + await session.commit() +``` + +Note that: + +* This pattern works assuming that Postgres is using default isolation level Read Committed. By the time a transaction acquires the advisory lock, all other transactions that can take the name have committed, so their changes can be seen and a unique name is taken. +* SQLite needs a commit before selecting taken names due to Snapshot Isolation as noted above.