From 70bb18021413bcaceb7821bd0f82622730e41d9f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 30 Jun 2025 16:46:57 +0500 Subject: [PATCH 01/20] Process fleets in batches --- src/dstack/_internal/server/background/__init__.py | 7 ++++++- .../server/background/tasks/process_fleets.py | 11 ++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 4ac4fb25bf..59bf63287d 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -84,7 +84,12 @@ def start_background_tasks() -> AsyncIOScheduler: kwargs={"batch_size": 5}, max_instances=2, ) - _scheduler.add_job(process_fleets, IntervalTrigger(seconds=10, jitter=2)) + _scheduler.add_job( + process_fleets, + IntervalTrigger(seconds=10, jitter=2), + kwargs={"batch_size": 5}, + max_instances=2, + ) _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15)) _scheduler.add_job( process_submitted_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5 diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index 9a7aae0ae8..d8c825aa94 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -1,3 +1,5 @@ +import asyncio + from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload @@ -17,7 +19,14 @@ logger = get_logger(__name__) -async def process_fleets(): +async def process_fleets(batch_size: int = 1): + tasks = [] + for _ in range(batch_size): + tasks.append(_process_next_fleet()) + await asyncio.gather(*tasks) + + +async def _process_next_fleet(): lock, lockset = get_locker().get_lockset(FleetModel.__tablename__) async with get_session_ctx() as session: async with lock: From 72d5ee7a2486523923d2d159b6c020acfb4c7df5 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 1 Jul 2025 12:39:25 +0500 Subject: [PATCH 02/20] Increase offers_cache --- src/dstack/_internal/core/backends/base/compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 4efcbc851c..cca7d2be49 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -57,7 +57,7 @@ class Compute(ABC): def __init__(self): self._offers_cache_lock = threading.Lock() - self._offers_cache = TTLCache(maxsize=5, ttl=30) + self._offers_cache = TTLCache(maxsize=10, ttl=120) @abstractmethod def get_offers( From 7e2da7b52a823811d798a1979a1ce36ffc870606 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 1 Jul 2025 12:42:22 +0500 Subject: [PATCH 03/20] Cache redundant AWS API calls --- .../_internal/core/backends/aws/compute.py | 225 ++++++++++++++++-- 1 file changed, 209 insertions(+), 16 deletions(-) diff --git a/src/dstack/_internal/core/backends/aws/compute.py b/src/dstack/_internal/core/backends/aws/compute.py index 249acb4e6f..17d1836ed2 100644 --- a/src/dstack/_internal/core/backends/aws/compute.py +++ b/src/dstack/_internal/core/backends/aws/compute.py @@ -1,14 +1,21 @@ +import threading from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List, Optional, Tuple import boto3 import botocore.client import 
botocore.exceptions +from cachetools import Cache, TTLCache, cachedmethod +from cachetools.keys import hashkey from pydantic import ValidationError import dstack._internal.core.backends.aws.resources as aws_resources from dstack._internal import settings -from dstack._internal.core.backends.aws.models import AWSAccessKeyCreds, AWSConfig +from dstack._internal.core.backends.aws.models import ( + AWSAccessKeyCreds, + AWSConfig, + AWSOSImageConfig, +) from dstack._internal.core.backends.base.compute import ( Compute, ComputeWithCreateInstanceSupport, @@ -66,6 +73,10 @@ class AWSVolumeBackendData(CoreModel): iops: int +def _ec2client_cache_methodkey(self, ec2_client, *args, **kwargs): + return hashkey(*args, **kwargs) + + class AWSCompute( ComputeWithCreateInstanceSupport, ComputeWithMultinodeSupport, @@ -86,6 +97,23 @@ def __init__(self, config: AWSConfig): ) else: # default creds self.session = boto3.Session() + # Caches to avoid redundant API calls when provisioning many instances + # get_offers is already cached but we still cache its sub-functions + # with more aggressive/longer caches. + self._get_regions_to_quotas_cache_lock = threading.Lock() + self._get_regions_to_quotas_cache = TTLCache(maxsize=10, ttl=300) + self._get_regions_to_zones_cache_lock = threading.Lock() + self._get_regions_to_zones_cache = Cache(maxsize=10) + self._get_vpc_id_subnet_id_or_error_cache_lock = threading.Lock() + self._get_vpc_id_subnet_id_or_error_cache = TTLCache(maxsize=100, ttl=600) + self._get_maximum_efa_interfaces_cache_lock = threading.Lock() + self._get_maximum_efa_interfaces_cache = Cache(maxsize=100) + self._get_subnets_availability_zones_cache_lock = threading.Lock() + self._get_subnets_availability_zones_cache = Cache(maxsize=100) + self._create_security_group_cache_lock = threading.Lock() + self._create_security_group_cache = TTLCache(maxsize=100, ttl=600) + self._get_image_id_and_username_cache_lock = threading.Lock() + self._get_image_id_and_username_cache = TTLCache(maxsize=100, ttl=600) def get_offers( self, requirements: Optional[Requirements] = None @@ -126,8 +154,8 @@ def _supported_instances_with_reservation(offer: InstanceOffer) -> bool: extra_filter=filter, ) regions = list(set(i.region for i in offers)) - regions_to_quotas = _get_regions_to_quotas(self.session, regions) - regions_to_zones = _get_regions_to_zones(self.session, regions) + regions_to_quotas = self._get_regions_to_quotas(self.session, regions) + regions_to_zones = self._get_regions_to_zones(self.session, regions) availability_offers = [] for offer in offers: @@ -186,21 +214,24 @@ def create_instance( tags = aws_resources.filter_invalid_tags(tags) disk_size = round(instance_offer.instance.resources.disk.size_mib / 1024) - max_efa_interfaces = _get_maximum_efa_interfaces( - ec2_client=ec2_client, instance_type=instance_offer.instance.name + max_efa_interfaces = self._get_maximum_efa_interfaces( + ec2_client=ec2_client, + region=instance_offer.region, + instance_type=instance_offer.instance.name, ) enable_efa = max_efa_interfaces > 0 is_capacity_block = False try: - vpc_id, subnet_ids = get_vpc_id_subnet_id_or_error( + vpc_id, subnet_ids = self._get_vpc_id_subnet_id_or_error( ec2_client=ec2_client, config=self.config, region=instance_offer.region, allocate_public_ip=allocate_public_ip, availability_zones=zones, ) - subnet_id_to_az_map = aws_resources.get_subnets_availability_zones( + subnet_id_to_az_map = self._get_subnets_availability_zones( ec2_client=ec2_client, + region=instance_offer.region, subnet_ids=subnet_ids, ) if 
instance_config.reservation: @@ -229,12 +260,19 @@ def create_instance( tried_zones.add(az) try: logger.debug("Trying provisioning %s in %s", instance_offer.instance.name, az) - image_id, username = aws_resources.get_image_id_and_username( + image_id, username = self._get_image_id_and_username( ec2_client=ec2_client, + region=instance_offer.region, cuda=len(instance_offer.instance.resources.gpus) > 0, instance_type=instance_offer.instance.name, image_config=self.config.os_images, ) + security_group_id = self._create_security_group( + ec2_client=ec2_client, + region=instance_offer.region, + project_id=project_name, + vpc_id=vpc_id, + ) response = ec2_resource.create_instances( **aws_resources.create_instances_struct( disk_size=disk_size, @@ -243,11 +281,7 @@ def create_instance( iam_instance_profile=self.config.iam_instance_profile, user_data=get_user_data(authorized_keys=instance_config.get_public_keys()), tags=aws_resources.make_tags(tags), - security_group_id=aws_resources.create_security_group( - ec2_client=ec2_client, - project_id=project_name, - vpc_id=vpc_id, - ), + security_group_id=security_group_id, spot=instance_offer.instance.resources.spot, subnet_id=subnet_id, allocate_public_ip=allocate_public_ip, @@ -361,7 +395,7 @@ def create_gateway( tags = aws_resources.filter_invalid_tags(tags) tags = aws_resources.make_tags(tags) - vpc_id, subnets_ids = get_vpc_id_subnet_id_or_error( + vpc_id, subnets_ids = self._get_vpc_id_subnet_id_or_error( ec2_client=ec2_client, config=self.config, region=configuration.region, @@ -696,6 +730,165 @@ def is_volume_detached(self, volume: Volume, provisioning_data: JobProvisioningD return True return True + def _get_regions_to_quotas_key( + self, + session: boto3.Session, + regions: List[str], + ) -> tuple: + return hashkey(tuple(regions)) + + @cachedmethod( + cache=lambda self: self._get_regions_to_quotas_cache, + key=_get_regions_to_quotas_key, + lock=lambda self: self._get_regions_to_quotas_cache_lock, + ) + def _get_regions_to_quotas( + self, + session: boto3.Session, + regions: List[str], + ) -> Dict[str, Dict[str, int]]: + return _get_regions_to_quotas(session=session, regions=regions) + + def _get_regions_to_zones_key( + self, + session: boto3.Session, + regions: List[str], + ) -> tuple: + return hashkey(tuple(regions)) + + @cachedmethod( + cache=lambda self: self._get_regions_to_zones_cache, + key=_get_regions_to_zones_key, + lock=lambda self: self._get_regions_to_zones_cache_lock, + ) + def _get_regions_to_zones( + self, + session: boto3.Session, + regions: List[str], + ) -> Dict[str, List[str]]: + return _get_regions_to_zones(session=session, regions=regions) + + def _get_vpc_id_subnet_id_or_error_cache_key( + self, + ec2_client: botocore.client.BaseClient, + config: AWSConfig, + region: str, + allocate_public_ip: bool, + availability_zones: Optional[List[str]] = None, + ) -> tuple: + return hashkey( + region, allocate_public_ip, tuple(availability_zones) if availability_zones else None + ) + + @cachedmethod( + cache=lambda self: self._get_vpc_id_subnet_id_or_error_cache, + key=_get_vpc_id_subnet_id_or_error_cache_key, + lock=lambda self: self._get_vpc_id_subnet_id_or_error_cache_lock, + ) + def _get_vpc_id_subnet_id_or_error( + self, + ec2_client: botocore.client.BaseClient, + config: AWSConfig, + region: str, + allocate_public_ip: bool, + availability_zones: Optional[List[str]] = None, + ) -> Tuple[str, List[str]]: + return get_vpc_id_subnet_id_or_error( + ec2_client=ec2_client, + config=config, + region=region, + 
allocate_public_ip=allocate_public_ip, + availability_zones=availability_zones, + ) + + @cachedmethod( + cache=lambda self: self._get_maximum_efa_interfaces_cache, + key=_ec2client_cache_methodkey, + lock=lambda self: self._get_maximum_efa_interfaces_cache_lock, + ) + def _get_maximum_efa_interfaces( + self, + ec2_client: botocore.client.BaseClient, + region: str, + instance_type: str, + ) -> int: + return _get_maximum_efa_interfaces( + ec2_client=ec2_client, + instance_type=instance_type, + ) + + def _get_subnets_availability_zones_key( + self, + ec2_client: botocore.client.BaseClient, + region: str, + subnet_ids: List[str], + ) -> tuple: + return hashkey(region, tuple(subnet_ids)) + + @cachedmethod( + cache=lambda self: self._get_subnets_availability_zones_cache, + key=_get_subnets_availability_zones_key, + lock=lambda self: self._get_subnets_availability_zones_cache_lock, + ) + def _get_subnets_availability_zones( + self, + ec2_client: botocore.client.BaseClient, + region: str, + subnet_ids: List[str], + ) -> Dict[str, str]: + return aws_resources.get_subnets_availability_zones( + ec2_client=ec2_client, + subnet_ids=subnet_ids, + ) + + @cachedmethod( + cache=lambda self: self._create_security_group_cache, + key=_ec2client_cache_methodkey, + lock=lambda self: self._create_security_group_cache_lock, + ) + def _create_security_group( + self, + ec2_client: botocore.client.BaseClient, + region: str, + project_id: str, + vpc_id: Optional[str], + ) -> str: + return aws_resources.create_security_group( + ec2_client=ec2_client, + project_id=project_id, + vpc_id=vpc_id, + ) + + def _get_image_id_and_username_cache_key( + self, + ec2_client: botocore.client.BaseClient, + region: str, + cuda: bool, + instance_type: str, + image_config: Optional[AWSOSImageConfig] = None, + ) -> tuple: + return hashkey(region, cuda, instance_type, image_config.json() if image_config else None) + + @cachedmethod( + cache=lambda self: self._get_image_id_and_username_cache, + key=_get_image_id_and_username_cache_key, + lock=lambda self: self._get_image_id_and_username_cache_lock, + ) + def _get_image_id_and_username( + self, + ec2_client: botocore.client.BaseClient, + region: str, + cuda: bool, + instance_type: str, + image_config: Optional[AWSOSImageConfig] = None, + ) -> tuple[str, str]: + return aws_resources.get_image_id_and_username( + ec2_client=ec2_client, + cuda=cuda, + instance_type=instance_type, + image_config=image_config, + ) + def get_vpc_id_subnet_id_or_error( ec2_client: botocore.client.BaseClient, @@ -798,7 +991,7 @@ def get_region_quotas(client: botocore.client.BaseClient) -> Dict[str, int]: return region_quotas regions_to_quotas = {} - with ThreadPoolExecutor(max_workers=8) as executor: + with ThreadPoolExecutor(max_workers=12) as executor: future_to_region = {} for region in regions: future = executor.submit( @@ -823,7 +1016,7 @@ def _has_quota(quotas: Dict[str, int], instance_name: str) -> Optional[bool]: def _get_regions_to_zones(session: boto3.Session, regions: List[str]) -> Dict[str, List[str]]: regions_to_zones = {} - with ThreadPoolExecutor(max_workers=8) as executor: + with ThreadPoolExecutor(max_workers=12) as executor: future_to_region = {} for region in regions: future = executor.submit( From 5d1783db1aaf6dbd34763fc429bd8a214c59fb6d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 1 Jul 2025 14:49:32 +0500 Subject: [PATCH 04/20] Set MIN_PROCESSING_INTERVAL --- .../server/background/tasks/process_instances.py | 4 ++++ .../server/background/tasks/process_running_jobs.py | 4 ++++ 
.../_internal/server/background/tasks/process_runs.py | 4 ++++ src/dstack/_internal/server/testing/common.py | 8 +++++--- .../server/background/tasks/test_process_running_jobs.py | 4 ++++ 5 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index c567b90eff..84d09bcf9b 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -110,6 +110,8 @@ pkey_from_str, ) +MIN_PROCESSING_INTERVAL = timedelta(seconds=5) + PENDING_JOB_RETRY_INTERVAL = timedelta(seconds=60) TERMINATION_DEADLINE_OFFSET = timedelta(minutes=20) @@ -145,6 +147,8 @@ async def _process_next_instance(): ] ), InstanceModel.id.not_in(lockset), + InstanceModel.last_processed_at + < get_current_datetime().replace(tzinfo=None) - MIN_PROCESSING_INTERVAL, ) .options(lazyload(InstanceModel.jobs)) .order_by(InstanceModel.last_processed_at.asc()) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 5249efe5d2..5af453c5f4 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -79,6 +79,7 @@ logger = get_logger(__name__) +MIN_PROCESSING_INTERVAL = timedelta(seconds=5) # Minimum time before terminating active job in case of connectivity issues. # Should be sufficient to survive most problems caused by # the server network flickering and providers' glitches. @@ -103,6 +104,9 @@ async def _process_next_running_job(): [JobStatus.PROVISIONING, JobStatus.PULLING, JobStatus.RUNNING] ), JobModel.id.not_in(lockset), + JobModel.last_processed_at + < common_utils.get_current_datetime().replace(tzinfo=None) + - MIN_PROCESSING_INTERVAL, ) .order_by(JobModel.last_processed_at.asc()) .limit(1) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 5a2d596664..82f55f47e4 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -41,6 +41,8 @@ from dstack._internal.utils.logging import get_logger logger = get_logger(__name__) + +MIN_PROCESSING_INTERVAL = datetime.timedelta(seconds=5) ROLLING_DEPLOYMENT_MAX_SURGE = 1 # at most one extra replica during rolling deployment @@ -61,6 +63,8 @@ async def _process_next_run(): .where( RunModel.status.not_in(RunStatus.finished_statuses()), RunModel.id.not_in(run_lockset), + RunModel.last_processed_at + < common.get_current_datetime().replace(tzinfo=None) - MIN_PROCESSING_INTERVAL, ) .order_by(RunModel.last_processed_at.asc()) .limit(1) diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 4bcb95404e..61b39d1cfd 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -560,10 +560,10 @@ async def create_instance( instance_id: Optional[UUID] = None, job: Optional[JobModel] = None, instance_num: int = 0, - backend: Optional[BackendType] = BackendType.DATACRUNCH, + backend: BackendType = BackendType.DATACRUNCH, termination_policy: Optional[TerminationPolicy] = None, termination_idle_time: int = DEFAULT_FLEET_TERMINATION_IDLE_TIME, - region: Optional[str] = "eu-west", + region: str = "eu-west", 
remote_connection_info: Optional[RemoteConnectionInfo] = None, offer: Optional[Union[InstanceOfferWithAvailability, Literal["auto"]]] = "auto", job_provisioning_data: Optional[Union[JobProvisioningData, Literal["auto"]]] = "auto", @@ -572,6 +572,7 @@ async def create_instance( name: str = "test_instance", volumes: Optional[List[VolumeModel]] = None, price: float = 1.0, + last_processed_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), ) -> InstanceModel: if instance_id is None: instance_id = uuid.uuid4() @@ -608,6 +609,7 @@ async def create_instance( fleet=fleet, project=project, status=status, + last_processed_at=last_processed_at, unreachable=unreachable, created_at=created_at, started_at=created_at, @@ -1038,7 +1040,7 @@ class ComputeMockSpec( ComputeWithVolumeSupport, ): """ - Can be used to create Compute mocks that pass all isinstance asserts. + Can be used to create Compute mocks that pass all `isinstance()` asserts. """ pass diff --git a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py index 59a08ddc4d..a16151f7da 100644 --- a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py @@ -224,6 +224,7 @@ async def test_updates_running_job(self, test_db, session: AsyncSession, tmp_pat instance=instance, instance_assigned=True, ) + last_processed_at = job.last_processed_at with ( patch("dstack._internal.server.services.runner.ssh.SSHTunnel") as SSHTunnelMock, patch( @@ -244,6 +245,8 @@ async def test_updates_running_job(self, test_db, session: AsyncSession, tmp_pat assert job is not None assert job.status == JobStatus.RUNNING assert job.runner_timestamp == 1 + job.last_processed_at = last_processed_at + await session.commit() with ( patch("dstack._internal.server.services.runner.ssh.SSHTunnel") as SSHTunnelMock, patch( @@ -776,6 +779,7 @@ async def test_gpu_utilization( job_provisioning_data=get_job_provisioning_data(), instance=instance, instance_assigned=True, + last_processed_at=datetime(2023, 1, 1, 11, 30, tzinfo=timezone.utc), ) for timestamp, gpu_util in samples: # two GPUs, the second one always 100% utilized From cb8515dd5da5a45fa25a6da330cc7061eaa7b815 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 08:27:28 +0500 Subject: [PATCH 05/20] Prevent concurrent quotas requests --- src/dstack/_internal/core/backends/aws/compute.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/backends/aws/compute.py b/src/dstack/_internal/core/backends/aws/compute.py index 17d1836ed2..edc26136b0 100644 --- a/src/dstack/_internal/core/backends/aws/compute.py +++ b/src/dstack/_internal/core/backends/aws/compute.py @@ -101,6 +101,7 @@ def __init__(self, config: AWSConfig): # get_offers is already cached but we still cache its sub-functions # with more aggressive/longer caches. 
self._get_regions_to_quotas_cache_lock = threading.Lock() + self._get_regions_to_quotas_execution_lock = threading.Lock() self._get_regions_to_quotas_cache = TTLCache(maxsize=10, ttl=300) self._get_regions_to_zones_cache_lock = threading.Lock() self._get_regions_to_zones_cache = Cache(maxsize=10) @@ -154,7 +155,10 @@ def _supported_instances_with_reservation(offer: InstanceOffer) -> bool: extra_filter=filter, ) regions = list(set(i.region for i in offers)) - regions_to_quotas = self._get_regions_to_quotas(self.session, regions) + with self._get_regions_to_quotas_execution_lock: + # Cache lock does not prevent concurrent execution. + # We use a separate lock to avoid requesting quotas in parallel and hitting rate limits. + regions_to_quotas = self._get_regions_to_quotas(self.session, regions) regions_to_zones = self._get_regions_to_zones(self.session, regions) availability_offers = [] From 6bbdcd2ed0aefb3df21e2aebd95f0ad25e026766 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 08:28:48 +0500 Subject: [PATCH 06/20] Increase MIN_PROCESSING_INTERVAL --- .../_internal/server/background/tasks/process_instances.py | 2 +- .../_internal/server/background/tasks/process_running_jobs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 84d09bcf9b..7ab11f23fc 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -110,7 +110,7 @@ pkey_from_str, ) -MIN_PROCESSING_INTERVAL = timedelta(seconds=5) +MIN_PROCESSING_INTERVAL = timedelta(seconds=10) PENDING_JOB_RETRY_INTERVAL = timedelta(seconds=60) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 5af453c5f4..282ea13c8f 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -79,7 +79,7 @@ logger = get_logger(__name__) -MIN_PROCESSING_INTERVAL = timedelta(seconds=5) +MIN_PROCESSING_INTERVAL = timedelta(seconds=10) # Minimum time before terminating active job in case of connectivity issues. # Should be sufficient to survive most problems caused by # the server network flickering and providers' glitches. 
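
The caching changes above lean on `cachetools.cachedmethod` with per-instance caches, custom key functions that drop the non-hashable boto3 client from the cache key, and a separate lock guarding each cache (the lock protects cache access only, which is why PATCH 05 adds an extra "execution lock"). The following standalone sketch illustrates that pattern; `RegionClient` and its `describe_account_quotas` call are hypothetical names for illustration, not dstack or boto3 APIs.

```python
# Standalone sketch (not dstack code): cachetools.cachedmethod with a custom key,
# so a non-hashable argument such as a boto3 client is excluded from the cache key.
# Note: the lock only guards cache reads/writes; it does not prevent two threads
# from computing the same value concurrently.
import threading

from cachetools import TTLCache, cachedmethod
from cachetools.keys import hashkey


class RegionClient:  # hypothetical class for illustration
    def __init__(self):
        self._quotas_cache = TTLCache(maxsize=10, ttl=300)
        self._quotas_cache_lock = threading.Lock()

    def _quotas_key(self, ec2_client, region: str) -> tuple:
        # Drop self and ec2_client from the key; cache per region only.
        return hashkey(region)

    @cachedmethod(
        cache=lambda self: self._quotas_cache,
        key=_quotas_key,
        lock=lambda self: self._quotas_cache_lock,
    )
    def get_quotas(self, ec2_client, region: str) -> dict:
        # Expensive API call, executed at most once per region per TTL window.
        return ec2_client.describe_account_quotas(region)  # hypothetical call
```
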
From 99d83fa21429ea1ea10392899e452a70cae29981 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 09:40:49 +0500 Subject: [PATCH 07/20] Make runs stopping async --- src/dstack/_internal/server/services/runs.py | 59 +++++++------------ .../_internal/server/routers/test_runs.py | 27 +++------ 2 files changed, 30 insertions(+), 56 deletions(-) diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index bb775bdf29..8b2c5eab8a 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -586,46 +586,29 @@ async def stop_runs( ) run_models = res.scalars().all() run_ids = sorted([r.id for r in run_models]) - res = await session.execute(select(JobModel).where(JobModel.run_id.in_(run_ids))) - job_models = res.scalars().all() - job_ids = sorted([j.id for j in job_models]) await session.commit() - async with ( - get_locker().lock_ctx(RunModel.__tablename__, run_ids), - get_locker().lock_ctx(JobModel.__tablename__, job_ids), - ): + async with get_locker().lock_ctx(RunModel.__tablename__, run_ids): + res = await session.execute( + select(RunModel) + .where(RunModel.id.in_(run_ids)) + .order_by(RunModel.id) # take locks in order + .with_for_update(key_share=True) + .execution_options(populate_existing=True) + ) + run_models = res.scalars().all() + now = common_utils.get_current_datetime() for run_model in run_models: - await stop_run(session=session, run_model=run_model, abort=abort) - - -async def stop_run(session: AsyncSession, run_model: RunModel, abort: bool): - res = await session.execute( - select(RunModel) - .where(RunModel.id == run_model.id) - .order_by(RunModel.id) # take locks in order - .with_for_update(key_share=True) - .execution_options(populate_existing=True) - ) - run_model = res.scalar_one() - await session.execute( - select(JobModel) - .where(JobModel.run_id == run_model.id) - .order_by(JobModel.id) # take locks in order - .with_for_update(key_share=True) - .execution_options(populate_existing=True) - ) - if run_model.status.is_finished(): - return - run_model.status = RunStatus.TERMINATING - if abort: - run_model.termination_reason = RunTerminationReason.ABORTED_BY_USER - else: - run_model.termination_reason = RunTerminationReason.STOPPED_BY_USER - # process the run out of turn - logger.debug("%s: terminating because %s", fmt(run_model), run_model.termination_reason.name) - await process_terminating_run(session, run_model) - run_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() + if run_model.status.is_finished(): + continue + run_model.status = RunStatus.TERMINATING + if abort: + run_model.termination_reason = RunTerminationReason.ABORTED_BY_USER + else: + run_model.termination_reason = RunTerminationReason.STOPPED_BY_USER + run_model.last_processed_at = now + # The run will be terminated by process_runs. + # Terminating synchronously is problematic since it may take a long time. 
+ await session.commit() async def delete_runs( diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 880c3be5df..942e5d906b 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -33,7 +33,6 @@ ApplyRunPlanInput, JobSpec, JobStatus, - JobTerminationReason, Run, RunSpec, RunStatus, @@ -1480,7 +1479,7 @@ async def test_returns_403_if_not_project_member( @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_terminates_submitted_run( + async def test_marks_submitted_run_as_terminating( self, test_db, session: AsyncSession, client: AsyncClient ): user = await create_user(session=session, global_role=GlobalRole.USER) @@ -1498,7 +1497,7 @@ async def test_terminates_submitted_run( repo=repo, user=user, ) - job = await create_job( + await create_job( session=session, run=run, ) @@ -1511,13 +1510,10 @@ async def test_terminates_submitted_run( await session.refresh(run) assert run.status == RunStatus.TERMINATING assert run.termination_reason == RunTerminationReason.STOPPED_BY_USER - await session.refresh(job) - assert job.status == JobStatus.TERMINATING - assert job.termination_reason == JobTerminationReason.TERMINATED_BY_USER @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_terminates_running_run( + async def test_marks_running_run_as_terminating( self, test_db, session: AsyncSession, client: AsyncClient ): user = await create_user(session=session, global_role=GlobalRole.USER) @@ -1541,7 +1537,7 @@ async def test_terminates_running_run( project=project, status=InstanceStatus.BUSY, ) - job = await create_job( + await create_job( session=session, run=run, job_provisioning_data=get_job_provisioning_data(), @@ -1549,20 +1545,15 @@ async def test_terminates_running_run( instance=instance, instance_assigned=True, ) - with patch("dstack._internal.server.services.jobs._stop_runner") as stop_runner: - response = await client.post( - f"/api/project/{project.name}/runs/stop", - headers=get_auth_headers(user.token), - json={"runs_names": [run.run_name], "abort": False}, - ) - stop_runner.assert_called_once() + response = await client.post( + f"/api/project/{project.name}/runs/stop", + headers=get_auth_headers(user.token), + json={"runs_names": [run.run_name], "abort": False}, + ) assert response.status_code == 200 await session.refresh(run) assert run.status == RunStatus.TERMINATING assert run.termination_reason == RunTerminationReason.STOPPED_BY_USER - await session.refresh(job) - assert job.status == JobStatus.TERMINATING - assert job.termination_reason == JobTerminationReason.TERMINATED_BY_USER @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) From b46f3f02ad21fe2fcf9da238be4e70d3d3f58806 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 12:47:36 +0500 Subject: [PATCH 08/20] Introduce SERVER_BACKGROUND_PROCESSING_RATE --- .../_internal/server/background/__init__.py | 29 ++++++++++++------- .../background/tasks/process_running_jobs.py | 9 +++++- src/dstack/_internal/server/settings.py | 16 +++++++--- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 59bf63287d..0d08910c67 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ 
b/src/dstack/_internal/server/background/__init__.py @@ -40,12 +40,19 @@ def start_background_tasks() -> AsyncIOScheduler: # In-memory locking via locksets does not guarantee # that the first waiting for the lock will acquire it. # The jitter is needed to give all tasks a chance to acquire locks. - + # # The batch_size and interval determine background tasks processing rates. - # Currently one server replica can handle: - # * 150 active jobs with up to 2 minutes processing latency - # * 150 active runs with up to 2 minutes processing latency - # * 150 active instances with up to 2 minutes processing latency + # By default, one server replica can handle: + # + # * 150 active jobs with 2 minutes processing latency + # * 150 active runs with 2 minutes processing latency + # * 150 active instances with 2 minutes processing latency + # + # These latency numbers do not account for provisioning time, + # so it may be slower if a backend is slow to provision. + # + # Using larger batches to process more resources can lead to DB connections exhaustion. + # Users can set SERVER_BACKGROUND_PROCESSING_RATE to process more resources per replica. _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) if settings.ENABLE_PROMETHEUS_METRICS: @@ -57,37 +64,37 @@ def start_background_tasks() -> AsyncIOScheduler: _scheduler.add_job( process_submitted_jobs, IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, + kwargs={"batch_size": 10 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, max_instances=2, ) _scheduler.add_job( process_running_jobs, IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, + kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, max_instances=2, ) _scheduler.add_job( process_terminating_jobs, IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, + kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, max_instances=2, ) _scheduler.add_job( process_runs, IntervalTrigger(seconds=2, jitter=1), - kwargs={"batch_size": 5}, + kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, max_instances=2, ) _scheduler.add_job( process_instances, IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5}, + kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, max_instances=2, ) _scheduler.add_job( process_fleets, IntervalTrigger(seconds=10, jitter=2), - kwargs={"batch_size": 5}, + kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, max_instances=2, ) _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15)) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 282ea13c8f..14ec35f45f 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -34,6 +34,7 @@ JobTerminationReason, Run, RunSpec, + RunStatus, ) from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint from dstack._internal.server.background.tasks.common import get_provisioning_timeout @@ -99,10 +100,12 @@ async def _process_next_running_job(): async with lock: res = await session.execute( select(JobModel) + .join(JobModel.run) .where( JobModel.status.in_( [JobStatus.PROVISIONING, JobStatus.PULLING, JobStatus.RUNNING] ), + 
RunModel.status.not_in([RunStatus.TERMINATING]), JobModel.id.not_in(lockset), JobModel.last_processed_at < common_utils.get_current_datetime().replace(tzinfo=None) @@ -110,7 +113,11 @@ async def _process_next_running_job(): ) .order_by(JobModel.last_processed_at.asc()) .limit(1) - .with_for_update(skip_locked=True, key_share=True) + .with_for_update( + skip_locked=True, + key_share=True, + of=JobModel, + ) ) job_model = res.unique().scalar() if job_model is None: diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index e71c8ce657..d42bd10846 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -27,10 +27,18 @@ ALEMBIC_MIGRATIONS_LOCATION = os.getenv( "DSTACK_ALEMBIC_MIGRATIONS_LOCATION", "dstack._internal.server:migrations" ) -# Users may want to increase pool size to support more concurrent resources -# if their db supports many connections -DB_POOL_SIZE = int(os.getenv("DSTACK_DB_POOL_SIZE", 10)) -DB_MAX_OVERFLOW = int(os.getenv("DSTACK_DB_MAX_OVERFLOW", 10)) + +# Users may want to increase client pool size to support more concurrent resources +# if their db supports many connections. +DB_POOL_SIZE = int(os.getenv("DSTACK_DB_POOL_SIZE", 20)) +DB_MAX_OVERFLOW = int(os.getenv("DSTACK_DB_MAX_OVERFLOW", 20)) + +# Increases the size of processing batches +# allowing to process more resource on one server replica. +# Not recommended to change on SQLite. +# DSTACK_DB_POOL_SIZE and DSTACK_DB_MAX_OVERFLOW +# must be increased proportionally to handle larger batches. +SERVER_BACKGROUND_PROCESSING_RATE = int(os.getenv("DSTACK_SERVER_BACKGROUND_PROCESSING_RATE", 1)) MAX_OFFERS_TRIED = int(os.getenv("DSTACK_SERVER_MAX_OFFERS_TRIED", 25)) From 55bca44be5173b5834da7ee50e581b83b53214c8 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 14:55:10 +0500 Subject: [PATCH 09/20] Use dummy locking on Postgres --- .../_internal/core/backends/base/compute.py | 2 +- .../server/background/tasks/process_fleets.py | 4 +- .../background/tasks/process_gateways.py | 2 +- .../background/tasks/process_instances.py | 4 +- .../tasks/process_placement_groups.py | 6 +- .../background/tasks/process_running_jobs.py | 4 +- .../server/background/tasks/process_runs.py | 6 +- .../tasks/process_submitted_jobs.py | 12 +- .../tasks/process_terminating_jobs.py | 8 +- .../background/tasks/process_volumes.py | 4 +- .../_internal/server/services/fleets.py | 6 +- .../server/services/gateways/__init__.py | 6 +- .../_internal/server/services/locking.py | 113 ++++++++++++++++-- src/dstack/_internal/server/services/runs.py | 6 +- .../_internal/server/services/volumes.py | 4 +- src/dstack/_internal/server/settings.py | 2 +- 16 files changed, 144 insertions(+), 45 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index cca7d2be49..0125f6066b 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -57,7 +57,7 @@ class Compute(ABC): def __init__(self): self._offers_cache_lock = threading.Lock() - self._offers_cache = TTLCache(maxsize=10, ttl=120) + self._offers_cache = TTLCache(maxsize=10, ttl=180) @abstractmethod def get_offers( diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index d8c825aa94..6a8d5645f3 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ 
b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -5,7 +5,7 @@ from sqlalchemy.orm import joinedload from dstack._internal.core.models.fleets import FleetStatus -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import FleetModel from dstack._internal.server.services.fleets import ( is_fleet_empty, @@ -27,7 +27,7 @@ async def process_fleets(batch_size: int = 1): async def _process_next_fleet(): - lock, lockset = get_locker().get_lockset(FleetModel.__tablename__) + lock, lockset = get_locker(get_db().dialect_name).get_lockset(FleetModel.__tablename__) async with get_session_ctx() as session: async with lock: res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_gateways.py b/src/dstack/_internal/server/background/tasks/process_gateways.py index c9a7b673c4..e2a17aa151 100644 --- a/src/dstack/_internal/server/background/tasks/process_gateways.py +++ b/src/dstack/_internal/server/background/tasks/process_gateways.py @@ -28,7 +28,7 @@ async def process_gateways_connections(): async def process_submitted_gateways(): - lock, lockset = get_locker().get_lockset(GatewayModel.__tablename__) + lock, lockset = get_locker(get_db().dialect_name).get_lockset(GatewayModel.__tablename__) async with get_session_ctx() as session: async with lock: res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 7ab11f23fc..677ed9ddb8 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -73,7 +73,7 @@ from dstack._internal.core.services.profiles import get_retry from dstack._internal.server import settings as server_settings from dstack._internal.server.background.tasks.common import get_provisioning_timeout -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( FleetModel, InstanceModel, @@ -131,7 +131,7 @@ async def process_instances(batch_size: int = 1): async def _process_next_instance(): - lock, lockset = get_locker().get_lockset(InstanceModel.__tablename__) + lock, lockset = get_locker(get_db().dialect_name).get_lockset(InstanceModel.__tablename__) async with get_session_ctx() as session: async with lock: res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_placement_groups.py b/src/dstack/_internal/server/background/tasks/process_placement_groups.py index aff96b60ba..54b46cd91e 100644 --- a/src/dstack/_internal/server/background/tasks/process_placement_groups.py +++ b/src/dstack/_internal/server/background/tasks/process_placement_groups.py @@ -7,7 +7,7 @@ from dstack._internal.core.backends.base.compute import ComputeWithPlacementGroupSupport from dstack._internal.core.errors import PlacementGroupInUseError -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import PlacementGroupModel, ProjectModel from dstack._internal.server.services import backends as backends_services from dstack._internal.server.services.locking import get_locker @@ -19,7 +19,9 @@ async def process_placement_groups(): - lock, lockset = get_locker().get_lockset(PlacementGroupModel.__tablename__) + lock, 
lockset = get_locker(get_db().dialect_name).get_lockset( + PlacementGroupModel.__tablename__ + ) async with get_session_ctx() as session: async with lock: res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 14ec35f45f..368cd6137b 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -38,7 +38,7 @@ ) from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint from dstack._internal.server.background.tasks.common import get_provisioning_timeout -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( InstanceModel, JobModel, @@ -95,7 +95,7 @@ async def process_running_jobs(batch_size: int = 1): async def _process_next_running_job(): - lock, lockset = get_locker().get_lockset(JobModel.__tablename__) + lock, lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) async with get_session_ctx() as session: async with lock: res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 82f55f47e4..3e7cafb61e 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -19,7 +19,7 @@ RunStatus, RunTerminationReason, ) -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import JobModel, ProjectModel, RunModel from dstack._internal.server.services.jobs import ( find_job, @@ -54,8 +54,8 @@ async def process_runs(batch_size: int = 1): async def _process_next_run(): - run_lock, run_lockset = get_locker().get_lockset(RunModel.__tablename__) - job_lock, job_lockset = get_locker().get_lockset(JobModel.__tablename__) + run_lock, run_lockset = get_locker(get_db().dialect_name).get_lockset(RunModel.__tablename__) + job_lock, job_lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) async with get_session_ctx() as session: async with run_lock, job_lock: res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index bf59e16814..56a66ff82d 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -88,7 +88,7 @@ async def process_submitted_jobs(batch_size: int = 1): async def _process_next_submitted_job(): - lock, lockset = get_locker().get_lockset(JobModel.__tablename__) + lock, lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) async with get_session_ctx() as session: async with lock: res = await session.execute( @@ -214,7 +214,9 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): if get_db().dialect_name == "sqlite": # Start new transaction to see committed changes after lock await session.commit() - async with get_locker().lock_ctx(InstanceModel.__tablename__, instances_ids): + async with get_locker(get_db().dialect_name).lock_ctx( + InstanceModel.__tablename__, instances_ids + ): # If another job freed the 
instance but is still trying to detach volumes, # do not provision on it to prevent attaching volumes that are currently detaching. detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) @@ -334,7 +336,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): .order_by(VolumeModel.id) # take locks in order .with_for_update(key_share=True) ) - async with get_locker().lock_ctx(VolumeModel.__tablename__, volumes_ids): + async with get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids): if len(volume_models) > 0: await _attach_volumes( session=session, @@ -527,7 +529,9 @@ async def _get_next_instance_num(session: AsyncSession, fleet_model: FleetModel) if len(fleet_model.instances) == 0: # No instances means the fleet is not in the db yet, so don't lock. return 0 - async with get_locker().lock_ctx(FleetModel.__tablename__, [fleet_model.id]): + async with get_locker(get_db().dialect_name).lock_ctx( + FleetModel.__tablename__, [fleet_model.id] + ): fleet_model = ( ( await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py b/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py index 4fde2e7898..4514ffb2bc 100644 --- a/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_terminating_jobs.py @@ -5,7 +5,7 @@ from sqlalchemy.orm import joinedload, lazyload from dstack._internal.core.models.runs import JobStatus -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( InstanceModel, JobModel, @@ -32,8 +32,10 @@ async def process_terminating_jobs(batch_size: int = 1): async def _process_next_terminating_job(): - job_lock, job_lockset = get_locker().get_lockset(JobModel.__tablename__) - instance_lock, instance_lockset = get_locker().get_lockset(InstanceModel.__tablename__) + job_lock, job_lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) + instance_lock, instance_lockset = get_locker(get_db().dialect_name).get_lockset( + InstanceModel.__tablename__ + ) async with get_session_ctx() as session: async with job_lock, instance_lock: res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_volumes.py b/src/dstack/_internal/server/background/tasks/process_volumes.py index a77633c63b..6434d96f0a 100644 --- a/src/dstack/_internal/server/background/tasks/process_volumes.py +++ b/src/dstack/_internal/server/background/tasks/process_volumes.py @@ -5,7 +5,7 @@ from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport from dstack._internal.core.errors import BackendError, BackendNotAvailable from dstack._internal.core.models.volumes import VolumeStatus -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( InstanceModel, ProjectModel, @@ -22,7 +22,7 @@ async def process_submitted_volumes(): - lock, lockset = get_locker().get_lockset(VolumeModel.__tablename__) + lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__) async with get_session_ctx() as session: async with lock: res = await session.execute( diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index d29c2b68c3..d54173115a 100644 --- 
a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -362,7 +362,7 @@ async def create_fleet( select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace))) ) - lock, _ = get_locker().get_lockset(lock_namespace) + lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace) async with lock: if spec.configuration.name is not None: fleet_model = await get_project_fleet_model_by_name( @@ -516,8 +516,8 @@ async def delete_fleets( await session.commit() logger.info("Deleting fleets: %s", [v.name for v in fleet_models]) async with ( - get_locker().lock_ctx(FleetModel.__tablename__, fleets_ids), - get_locker().lock_ctx(InstanceModel.__tablename__, instances_ids), + get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, fleets_ids), + get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids), ): # Refetch after lock # TODO lock instances with FOR UPDATE? diff --git a/src/dstack/_internal/server/services/gateways/__init__.py b/src/dstack/_internal/server/services/gateways/__init__.py index 648ad8c17e..1564592576 100644 --- a/src/dstack/_internal/server/services/gateways/__init__.py +++ b/src/dstack/_internal/server/services/gateways/__init__.py @@ -162,7 +162,7 @@ async def create_gateway( select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace))) ) - lock, _ = get_locker().get_lockset(lock_namespace) + lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace) async with lock: if configuration.name is None: configuration.name = await generate_gateway_name(session=session, project=project) @@ -229,7 +229,9 @@ async def delete_gateways( gateways_ids = sorted([g.id for g in gateway_models]) await session.commit() logger.info("Deleting gateways: %s", [g.name for g in gateway_models]) - async with get_locker().lock_ctx(GatewayModel.__tablename__, gateways_ids): + async with get_locker(get_db().dialect_name).lock_ctx( + GatewayModel.__tablename__, gateways_ids + ): # Refetch after lock res = await session.execute( select(GatewayModel) diff --git a/src/dstack/_internal/server/services/locking.py b/src/dstack/_internal/server/services/locking.py index d5fb275fc6..37807b37a8 100644 --- a/src/dstack/_internal/server/services/locking.py +++ b/src/dstack/_internal/server/services/locking.py @@ -1,8 +1,10 @@ import asyncio +import collections.abc import hashlib +from abc import abstractmethod from asyncio import Lock from contextlib import asynccontextmanager -from typing import AsyncGenerator, Dict, List, Set, Tuple, TypeVar, Union +from typing import AsyncGenerator, Iterable, Iterator, Protocol, TypeVar, Union from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncConnection, AsyncSession @@ -10,23 +12,54 @@ KeyT = TypeVar("KeyT") -class ResourceLocker: - def __init__(self): - self.namespace_to_locks_map: Dict[str, Tuple[Lock, set]] = {} +class LocksetLock(Protocol): + async def acquire(self) -> bool: ... + def release(self) -> None: ... + async def __aenter__(self): ... + async def __aexit__(self, exc_type, exc, tb): ... + + +T = TypeVar("T") + - def get_lockset(self, namespace: str) -> Tuple[Lock, set]: +class Lockset(Protocol[T]): + def __contains__(self, item: T) -> bool: ... + def __iter__(self) -> Iterator[T]: ... + def __len__(self) -> int: ... + def add(self, item: T) -> None: ... + def discard(self, item: T) -> None: ... + def update(self, other: Iterable[T]) -> None: ... + def difference_update(self, other: Iterable[T]) -> None: ... 
+ + +class ResourceLocker: + @abstractmethod + def get_lockset(self, namespace: str) -> tuple[LocksetLock, Lockset]: """ Returns a lockset containing locked resources for in-memory locking. Also returns a lock that guards the lockset. """ - return self.namespace_to_locks_map.setdefault(namespace, (Lock(), set())) + pass + @abstractmethod @asynccontextmanager - async def lock_ctx(self, namespace: str, keys: List[KeyT]): + async def lock_ctx(self, namespace: str, keys: list[KeyT]): """ Acquires locks for all keys in namespace. The keys must be sorted to prevent deadlock. """ + yield + + +class InMemoryResourceLocker(ResourceLocker): + def __init__(self): + self.namespace_to_locks_map: dict[str, tuple[Lock, set]] = {} + + def get_lockset(self, namespace: str) -> tuple[Lock, set]: + return self.namespace_to_locks_map.setdefault(namespace, (Lock(), set())) + + @asynccontextmanager + async def lock_ctx(self, namespace: str, keys: list[KeyT]): lock, lockset = self.get_lockset(namespace) try: await _wait_to_lock_many(lock, lockset, keys) @@ -35,6 +68,56 @@ async def lock_ctx(self, namespace: str, keys: List[KeyT]): lockset.difference_update(keys) +class DummyAsyncLock: + async def __aenter__(self): + pass + + async def __aexit__(self, exc_type, exc, tb): + pass + + async def acquire(self): + return True + + def release(self): + pass + + +class DummySet(collections.abc.MutableSet): + def __contains__(self, item): + return False + + def __iter__(self): + return iter(()) + + def __len__(self): + return 0 + + def add(self, value): + pass + + def discard(self, value): + pass + + def update(self, other): + pass + + def difference_update(self, other): + pass + + +class DummyResourceLocker(ResourceLocker): + def __init__(self): + self.lock = DummyAsyncLock() + self.lockset = DummySet() + + def get_lockset(self, namespace: str) -> tuple[DummyAsyncLock, DummySet]: + return self.lock, self.lockset + + @asynccontextmanager + async def lock_ctx(self, namespace: str, keys: list[KeyT]): + yield + + def string_to_lock_id(s: str) -> int: return int(hashlib.sha256(s.encode()).hexdigest(), 16) % (2**63) @@ -67,15 +150,21 @@ async def try_advisory_lock_ctx( await bind.execute(select(func.pg_advisory_unlock(string_to_lock_id(resource)))) -_locker = ResourceLocker() +_in_memory_locker = InMemoryResourceLocker() +_dummy_locker = DummyResourceLocker() -def get_locker() -> ResourceLocker: - return _locker +def get_locker(dialect_name: str) -> ResourceLocker: + if dialect_name == "sqlite": + return _in_memory_locker + # We could use an in-memory locker on Postgres + # but it can lead to unnecessary lock contention, + # so we use a dummy locker that does not take any locks. + return _dummy_locker async def _wait_to_lock_many( - lock: asyncio.Lock, locked: Set[KeyT], keys: List[KeyT], *, delay: float = 0.1 + lock: asyncio.Lock, locked: set[KeyT], keys: list[KeyT], *, delay: float = 0.1 ): """ Retry locking until all the keys are locked. 
@@ -88,7 +177,7 @@ async def _wait_to_lock_many( locked_now_num = 0 for key in left_to_lock: if key in locked: - # Someone already aquired the lock, wait + # Someone already acquired the lock, wait break locked.add(key) locked_now_num += 1 diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 8b2c5eab8a..25b67383d6 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -482,7 +482,7 @@ async def submit_run( select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace))) ) - lock, _ = get_locker().get_lockset(lock_namespace) + lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace) async with lock: if run_spec.run_name is None: run_spec.run_name = await _generate_run_name( @@ -587,7 +587,7 @@ async def stop_runs( run_models = res.scalars().all() run_ids = sorted([r.id for r in run_models]) await session.commit() - async with get_locker().lock_ctx(RunModel.__tablename__, run_ids): + async with get_locker(get_db().dialect_name).lock_ctx(RunModel.__tablename__, run_ids): res = await session.execute( select(RunModel) .where(RunModel.id.in_(run_ids)) @@ -625,7 +625,7 @@ async def delete_runs( run_models = res.scalars().all() run_ids = sorted([r.id for r in run_models]) await session.commit() - async with get_locker().lock_ctx(RunModel.__tablename__, run_ids): + async with get_locker(get_db().dialect_name).lock_ctx(RunModel.__tablename__, run_ids): res = await session.execute( select(RunModel) .where(RunModel.id.in_(run_ids)) diff --git a/src/dstack/_internal/server/services/volumes.py b/src/dstack/_internal/server/services/volumes.py index 8d5a8a18cc..9a7ed53d34 100644 --- a/src/dstack/_internal/server/services/volumes.py +++ b/src/dstack/_internal/server/services/volumes.py @@ -223,7 +223,7 @@ async def create_volume( select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace))) ) - lock, _ = get_locker().get_lockset(lock_namespace) + lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace) async with lock: if configuration.name is not None: volume_model = await get_project_volume_model_by_name( @@ -262,7 +262,7 @@ async def delete_volumes(session: AsyncSession, project: ProjectModel, names: Li volumes_ids = sorted([v.id for v in volume_models]) await session.commit() logger.info("Deleting volumes: %s", [v.name for v in volume_models]) - async with get_locker().lock_ctx(VolumeModel.__tablename__, volumes_ids): + async with get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids): # Refetch after lock res = await session.execute( select(VolumeModel) diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index d42bd10846..22ed28cd43 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -34,7 +34,7 @@ DB_MAX_OVERFLOW = int(os.getenv("DSTACK_DB_MAX_OVERFLOW", 20)) # Increases the size of processing batches -# allowing to process more resource on one server replica. +# allowing to process more resources on one server replica. # Not recommended to change on SQLite. # DSTACK_DB_POOL_SIZE and DSTACK_DB_MAX_OVERFLOW # must be increased proportionally to handle larger batches. 
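
The locking change above (PATCH 09) selects the locker by DB dialect: SQLite keeps the in-memory lockset, since the whole server runs in one process, while Postgres gets a no-op locker because the `FOR UPDATE SKIP LOCKED` row locks taken by the queries already coordinate replicas. A minimal usage sketch of the resulting API follows; `process_some_volumes` is a hypothetical caller, while the imports and methods are the ones introduced in these patches.

```python
# Usage sketch of the dialect-dependent locker.
# On SQLite the in-memory locker serializes tasks within the single server process;
# on Postgres the locker is a no-op and row-level locks do the coordination.
from dstack._internal.server.db import get_db, get_session_ctx
from dstack._internal.server.models import VolumeModel
from dstack._internal.server.services.locking import get_locker


async def process_some_volumes(volume_ids: list) -> None:  # hypothetical task
    locker = get_locker(get_db().dialect_name)
    # Keys are sorted so concurrent callers acquire in-memory locks in the same order.
    async with locker.lock_ctx(VolumeModel.__tablename__, sorted(volume_ids)):
        async with get_session_ctx() as session:
            ...  # re-select the rows with .with_for_update(skip_locked=True) and process them
```
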
From df2546036e91b62256f55670ba1361cbf79cb454 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 15:31:04 +0500 Subject: [PATCH 10/20] Process submitted jobs in one steps if no instance lock --- .../server/background/tasks/process_submitted_jobs.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 56a66ff82d..7873afb6dd 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -245,8 +245,10 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): ) job_model.instance_assigned = True job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return + if len(pool_instances) > 0: + await session.commit() + return + # If no instances were locked, we can proceed in the same transaction. if job_model.instance is not None: res = await session.execute( From 76696ccccf10093c05435a5913cba7b37613f18c Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 17:35:54 +0500 Subject: [PATCH 11/20] Fix missing cache lock --- .../_internal/server/services/jobs/configurators/base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index 349f2bcf02..e1fd4247aa 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -1,5 +1,6 @@ import shlex import sys +import threading from abc import ABC, abstractmethod from pathlib import PurePosixPath from typing import Dict, List, Optional, Union @@ -351,7 +352,10 @@ def _join_shell_commands(commands: List[str]) -> str: return " && ".join(commands) -@cached(TTLCache(maxsize=2048, ttl=80)) +@cached( + cache=TTLCache(maxsize=2048, ttl=80), + lock=threading.Lock(), +) def _get_image_config(image: str, registry_auth: Optional[RegistryAuth]) -> ImageConfig: try: return get_image_config(image, registry_auth).config From 2ed8c77ac00700e5c7b820134a03a68c1cbf39f8 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 2 Jul 2025 17:36:31 +0500 Subject: [PATCH 12/20] Configure custom executor --- src/dstack/_internal/server/app.py | 3 +++ src/dstack/_internal/server/settings.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index 6a76dadf48..3d295885fa 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -2,6 +2,7 @@ import importlib.resources import os import time +from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager from pathlib import Path from typing import Awaitable, Callable, List @@ -97,6 +98,8 @@ def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): configure_logging() + server_executor = ThreadPoolExecutor(max_workers=settings.SERVER_EXECUTOR_MAX_WORKERS) + asyncio.get_running_loop().set_default_executor(server_executor) await migrate() _print_dstack_logo() if not check_required_ssh_version(): diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 22ed28cd43..7589366423 100644 --- a/src/dstack/_internal/server/settings.py +++ 
b/src/dstack/_internal/server/settings.py @@ -40,6 +40,8 @@ # must be increased proportionally to handle larger batches. SERVER_BACKGROUND_PROCESSING_RATE = int(os.getenv("DSTACK_SERVER_BACKGROUND_PROCESSING_RATE", 1)) +SERVER_EXECUTOR_MAX_WORKERS = int(os.getenv("DSTACK_SERVER_EXECUTOR_MAX_WORKERS", 128)) + MAX_OFFERS_TRIED = int(os.getenv("DSTACK_SERVER_MAX_OFFERS_TRIED", 25)) SERVER_CONFIG_DISABLED = os.getenv("DSTACK_SERVER_CONFIG_DISABLED") is not None From b51806a8c0a4f1516a974e376e3694332735ec49 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 3 Jul 2025 11:08:17 +0500 Subject: [PATCH 13/20] Fix typo --- src/dstack/_internal/server/background/tasks/process_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 3e7cafb61e..88320eab5a 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -341,7 +341,7 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): current_time - run_model.submitted_at.replace(tzinfo=datetime.timezone.utc) ).total_seconds() logger.info( - "%s: run took %.2f seconds from submision to provisioning.", + "%s: run took %.2f seconds from submission to provisioning.", fmt(run_model), submit_to_provision_duration, ) From a02f2c3a216d98ade6838d29f5afee96207b5d7d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 3 Jul 2025 11:37:05 +0500 Subject: [PATCH 14/20] Increase process_submitted_jobs max_instances --- .../_internal/server/background/__init__.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 0d08910c67..4300cb193f 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -53,6 +53,13 @@ def start_background_tasks() -> AsyncIOScheduler: # # Using larger batches to process more resources can lead to DB connections exhaustion. # Users can set SERVER_BACKGROUND_PROCESSING_RATE to process more resources per replica. + # + # Potentially long tasks (e.g. provisioning) process one resource per transaction + # to avoid holding locks for other resources if one is slow to process. + # Still, the next batch won't be processed unless all resources are processed. + # So larger batches do not increase processing linearly. + # Consider increasing max_instances in that case. + _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) if settings.ENABLE_PROMETHEUS_METRICS: @@ -60,12 +67,12 @@ def start_background_tasks() -> AsyncIOScheduler: collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 ) _scheduler.add_job(delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1) - # process_submitted_jobs and process_instances max processing rate is 75 jobs(instances) per minute. + # process_submitted_jobs and process_instances max processing rate is 75 jobs/instances per minute. 
_scheduler.add_job( process_submitted_jobs, IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 10 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=2, + kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, + max_instances=4, ) _scheduler.add_job( process_running_jobs, @@ -89,7 +96,7 @@ def start_background_tasks() -> AsyncIOScheduler: process_instances, IntervalTrigger(seconds=4, jitter=2), kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=2, + max_instances=4, ) _scheduler.add_job( process_fleets, From 993eb9771712ee60fc35558a0f1768b4e9bd4ebc Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 3 Jul 2025 12:00:50 +0500 Subject: [PATCH 15/20] Implement offers cache warm up trick --- .../tasks/process_submitted_jobs.py | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 7873afb6dd..224e7c13fb 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -1,5 +1,6 @@ import asyncio import uuid +from datetime import datetime, timedelta from typing import List, Optional, Tuple from sqlalchemy import select @@ -80,13 +81,32 @@ logger = get_logger(__name__) +# Track when we last processed a job. +# This is needed for a trick: +# If no tasks were processed recently, we force batch_size 1. +# If there are lots of runs/jobs with same offers submitted, +# we warm up the cache instead of requesting the offers concurrently. +BATCH_SIZE_RESET_TIMEOUT = timedelta(minutes=2) +last_processed_at: Optional[datetime] = None + + async def process_submitted_jobs(batch_size: int = 1): tasks = [] - for _ in range(batch_size): + effective_batch_size = _get_effective_batch_size(batch_size) + for _ in range(effective_batch_size): tasks.append(_process_next_submitted_job()) await asyncio.gather(*tasks) +def _get_effective_batch_size(batch_size: int) -> int: + if ( + last_processed_at is None + or last_processed_at < common_utils.get_current_datetime() - BATCH_SIZE_RESET_TIMEOUT + ): + return 1 + return batch_size + + async def _process_next_submitted_job(): lock, lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__) async with get_session_ctx() as session: @@ -125,6 +145,8 @@ async def _process_next_submitted_job(): await _process_submitted_job(session=session, job_model=job_model) finally: lockset.difference_update([job_model_id]) + global last_processed_at + last_processed_at = common_utils.get_current_datetime() async def _process_submitted_job(session: AsyncSession, job_model: JobModel): From c39b6e4c29b267915bf7e92827adebf56de27b95 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 3 Jul 2025 13:40:47 +0500 Subject: [PATCH 16/20] Replace DSTACK_SERVER_BACKGROUND_PROCESSING_RATE with DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR --- .../_internal/server/background/__init__.py | 101 +++++++++--------- src/dstack/_internal/server/settings.py | 8 +- 2 files changed, 58 insertions(+), 51 deletions(-) diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 4300cb193f..7c261af305 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -37,11 +37,15 @@ def get_scheduler() -> 
AsyncIOScheduler: def start_background_tasks() -> AsyncIOScheduler: - # In-memory locking via locksets does not guarantee - # that the first waiting for the lock will acquire it. - # The jitter is needed to give all tasks a chance to acquire locks. + # We try to process as many resources as possible without exhausting DB connections. + # + # Quick tasks can process multiple resources per transaction. + # Potentially long tasks process one resource per transaction + # to avoid holding locks for all the resources if one is slow to process. + # Still, the next batch won't be processed unless all resources are processed, + # so larger batches do not increase processing rate linearly. # - # The batch_size and interval determine background tasks processing rates. + # The interval, batch_size, and max_instances determine background tasks processing rates. # By default, one server replica can handle: # # * 150 active jobs with 2 minutes processing latency @@ -51,14 +55,12 @@ def start_background_tasks() -> AsyncIOScheduler: # These latency numbers do not account for provisioning time, # so it may be slower if a backend is slow to provision. # - # Using larger batches to process more resources can lead to DB connections exhaustion. - # Users can set SERVER_BACKGROUND_PROCESSING_RATE to process more resources per replica. + # Users can set SERVER_BACKGROUND_PROCESSING_FACTOR to process more resources per replica. + # They also need to increase max db connections on the client side and db side. # - # Potentially long tasks (e.g. provisioning) process one resource per transaction - # to avoid holding locks for other resources if one is slow to process. - # Still, the next batch won't be processed unless all resources are processed. - # So larger batches do not increase processing linearly. - # Consider increasing max_instances in that case. + # In-memory locking via locksets does not guarantee + # that the first waiting for the lock will acquire it. + # The jitter is needed to give all tasks a chance to acquire locks. _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) @@ -67,43 +69,6 @@ def start_background_tasks() -> AsyncIOScheduler: collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 ) _scheduler.add_job(delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1) - # process_submitted_jobs and process_instances max processing rate is 75 jobs/instances per minute. 
- _scheduler.add_job( - process_submitted_jobs, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=4, - ) - _scheduler.add_job( - process_running_jobs, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=2, - ) - _scheduler.add_job( - process_terminating_jobs, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=2, - ) - _scheduler.add_job( - process_runs, - IntervalTrigger(seconds=2, jitter=1), - kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=2, - ) - _scheduler.add_job( - process_instances, - IntervalTrigger(seconds=4, jitter=2), - kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=4, - ) - _scheduler.add_job( - process_fleets, - IntervalTrigger(seconds=10, jitter=2), - kwargs={"batch_size": 5 * settings.SERVER_BACKGROUND_PROCESSING_RATE}, - max_instances=2, - ) _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15)) _scheduler.add_job( process_submitted_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5 @@ -112,5 +77,45 @@ def start_background_tasks() -> AsyncIOScheduler: process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5 ) _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5)) + for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR): + # Add multiple copies of tasks if requested. + # max_instances=1 for additional copies to avoid running too many tasks. + # Move other tasks here when they need per-replica scaling. + _scheduler.add_job( + process_submitted_jobs, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=4 if replica == 0 else 1, + ) + _scheduler.add_job( + process_running_jobs, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) + _scheduler.add_job( + process_terminating_jobs, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) + _scheduler.add_job( + process_runs, + IntervalTrigger(seconds=2, jitter=1), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) + _scheduler.add_job( + process_instances, + IntervalTrigger(seconds=4, jitter=2), + kwargs={"batch_size": 5}, + max_instances=4 if replica == 0 else 1, + ) + _scheduler.add_job( + process_fleets, + IntervalTrigger(seconds=10, jitter=2), + kwargs={"batch_size": 5}, + max_instances=2 if replica == 0 else 1, + ) _scheduler.start() return _scheduler diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 7589366423..d4b52adda4 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -33,12 +33,14 @@ DB_POOL_SIZE = int(os.getenv("DSTACK_DB_POOL_SIZE", 20)) DB_MAX_OVERFLOW = int(os.getenv("DSTACK_DB_MAX_OVERFLOW", 20)) -# Increases the size of processing batches +# Scale the number of background processing tasks # allowing to process more resources on one server replica. # Not recommended to change on SQLite. # DSTACK_DB_POOL_SIZE and DSTACK_DB_MAX_OVERFLOW -# must be increased proportionally to handle larger batches. -SERVER_BACKGROUND_PROCESSING_RATE = int(os.getenv("DSTACK_SERVER_BACKGROUND_PROCESSING_RATE", 1)) +# must be increased proportionally. 
+SERVER_BACKGROUND_PROCESSING_FACTOR = int( + os.getenv("DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR", 1) +) SERVER_EXECUTOR_MAX_WORKERS = int(os.getenv("DSTACK_SERVER_EXECUTOR_MAX_WORKERS", 128)) From 6311ba0d1a8bb973271451c6c457a3a3fb12c954 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 3 Jul 2025 15:32:27 +0500 Subject: [PATCH 17/20] Document DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR --- docs/docs/guides/server-deployment.md | 14 +++++++++++++- docs/docs/reference/environment-variables.md | 7 +++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/docs/docs/guides/server-deployment.md b/docs/docs/guides/server-deployment.md index c13e5b451b..8ff4034e62 100644 --- a/docs/docs/guides/server-deployment.md +++ b/docs/docs/guides/server-deployment.md @@ -381,8 +381,20 @@ A single `dstack` server replica can support: * Up to 150 active jobs. * Up to 150 active instances. -Having more active resources can affect server performance. +Having more active resources will work but can affect server performance. If you hit these limits, consider using Postgres with multiple server replicas. +You can also increase processing rates of a replica by setting the `DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR` environment variable. +You should also increase `DSTACK_DB_POOL_SIZE` and `DSTACK_DB_MAX_OVERFLOW` proportionally. +For example, to increase processing rates 4 times, set: + +``` +export DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR=4 +export DSTACK_DB_POOL_SIZE=80 +export DSTACK_DB_MAX_OVERFLOW=80 +``` + +You have to ensure your Postgres installation supports that many connections by +configuring [`max_connections`](https://www.postgresql.org/docs/current/runtime-config-connection.html#GUC-MAX-CONNECTIONS) and/or using connection pooler. ## FAQs diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md index 27fda53938..4c5d44bd56 100644 --- a/docs/docs/reference/environment-variables.md +++ b/docs/docs/reference/environment-variables.md @@ -116,10 +116,13 @@ For more details on the options below, refer to the [server deployment](../guide - `DSTACK_ENABLE_PROMETHEUS_METRICS`{ #DSTACK_ENABLE_PROMETHEUS_METRICS } — Enables Prometheus metrics collection and export. - `DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE`{ #DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE } – Request body size limit for services running with a gateway, in bytes. Defaults to 64 MiB. - `DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY`{ #DSTACK_FORBID_SERVICES_WITHOUT_GATEWAY } – Forbids registering new services without a gateway if set to any value. -- `DSTACK_SERVER_CODE_UPLOAD_LIMIT`{ #DSTACK_SERVER_CODE_UPLOAD_LIMIT } - The repo size limit when uploading diffs or local repos, in bytes. Set to 0 to disable size limits. Defaults to 2MiB. +- `DSTACK_SERVER_CODE_UPLOAD_LIMIT`{ #DSTACK_SERVER_CODE_UPLOAD_LIMIT } - The repo size limit when uploading diffs or local repos, in bytes. Set to `0` to disable size limits. Defaults to `2MiB`. - `DSTACK_SERVER_S3_BUCKET`{ #DSTACK_SERVER_S3_BUCKET } - The bucket that repo diffs will be uploaded to if set. If unset, diffs are uploaded to the database. - `DSTACK_SERVER_S3_BUCKET_REGION`{ #DSTACK_SERVER_S3_BUCKET_REGION } - The region of the S3 Bucket. -- `DSTACK_SERVER_GCS_BUCKET`{ #DSTACK_SERVER_GCD_BUCKET } - The bucket that repo diffs will be uploaded to if set. If unset, diffs are uploaded to the database. +- `DSTACK_SERVER_GCS_BUCKET`{ #DSTACK_SERVER_GCS_BUCKET } - The bucket that repo diffs will be uploaded to if set. 
If unset, diffs are uploaded to the database. +- `DSTACK_DB_POOL_SIZE`{ #DSTACK_DB_POOL_SIZE } - The client DB connections pool size. Defaults to `20`, +- `DSTACK_DB_MAX_OVERFLOW`{ #DSTACK_DB_MAX_OVERFLOW } - The client DB connections pool allowed overflow. Defaults to `20`. +- `DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR`{ #DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR } - The number of background jobs for processing server resources. Increase if you need to process more resources per server replica quickly. Defaults to `1`. ??? info "Internal environment variables" The following environment variables are intended for development purposes: From d659cea3d72f2c8035e6a8cf8038dfc633931a79 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 3 Jul 2025 16:24:40 +0500 Subject: [PATCH 18/20] Do not create clusters from burstable AWS instances Fixes #2872 --- .../_internal/core/backends/aws/compute.py | 26 +++++++++++++++++-- src/dstack/_internal/core/errors.py | 4 +++ .../_internal/server/background/__init__.py | 2 +- .../background/tasks/process_instances.py | 7 +++++ 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/core/backends/aws/compute.py b/src/dstack/_internal/core/backends/aws/compute.py index edc26136b0..82e628e585 100644 --- a/src/dstack/_internal/core/backends/aws/compute.py +++ b/src/dstack/_internal/core/backends/aws/compute.py @@ -33,7 +33,12 @@ merge_tags, ) from dstack._internal.core.backends.base.offers import get_catalog_offers -from dstack._internal.core.errors import ComputeError, NoCapacityError, PlacementGroupInUseError +from dstack._internal.core.errors import ( + ComputeError, + NoCapacityError, + PlacementGroupInUseError, + PlacementGroupNotSupportedError, +) from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import CoreModel from dstack._internal.core.models.gateways import ( @@ -46,7 +51,11 @@ InstanceOffer, InstanceOfferWithAvailability, ) -from dstack._internal.core.models.placement import PlacementGroup, PlacementGroupProvisioningData +from dstack._internal.core.models.placement import ( + PlacementGroup, + PlacementGroupProvisioningData, + PlacementStrategy, +) from dstack._internal.core.models.resources import Memory, Range from dstack._internal.core.models.runs import JobProvisioningData, Requirements from dstack._internal.core.models.volumes import ( @@ -334,6 +343,8 @@ def create_placement_group( placement_group: PlacementGroup, master_instance_offer: InstanceOffer, ) -> PlacementGroupProvisioningData: + if not _offer_supports_placement_group(master_instance_offer, placement_group): + raise PlacementGroupNotSupportedError() ec2_client = self.session.client("ec2", region_name=placement_group.configuration.region) logger.debug("Creating placement group %s...", placement_group.name) ec2_client.create_placement_group( @@ -370,6 +381,8 @@ def is_suitable_placement_group( placement_group: PlacementGroup, instance_offer: InstanceOffer, ) -> bool: + if not _offer_supports_placement_group(instance_offer, placement_group): + return False return ( placement_group.configuration.backend == BackendType.AWS and placement_group.configuration.region == instance_offer.region @@ -1059,6 +1072,15 @@ def _supported_instances(offer: InstanceOffer) -> bool: return False +def _offer_supports_placement_group(offer: InstanceOffer, placement_group: PlacementGroup) -> bool: + if placement_group.configuration.placement_strategy != PlacementStrategy.CLUSTER: + return True + for family in ["t3.", 
"t2."]: + if offer.instance.name.startswith(family): + return False + return True + + def _get_maximum_efa_interfaces(ec2_client: botocore.client.BaseClient, instance_type: str) -> int: try: response = ec2_client.describe_instance_types( diff --git a/src/dstack/_internal/core/errors.py b/src/dstack/_internal/core/errors.py index 0e8edd05b5..0bfd5f6f33 100644 --- a/src/dstack/_internal/core/errors.py +++ b/src/dstack/_internal/core/errors.py @@ -110,6 +110,10 @@ class PlacementGroupInUseError(ComputeError): pass +class PlacementGroupNotSupportedError(ComputeError): + pass + + class NotYetTerminated(ComputeError): """ Used by Compute.terminate_instance to signal that instance termination is not complete diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 7c261af305..2dd410cd28 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -109,7 +109,7 @@ def start_background_tasks() -> AsyncIOScheduler: process_instances, IntervalTrigger(seconds=4, jitter=2), kwargs={"batch_size": 5}, - max_instances=4 if replica == 0 else 1, + max_instances=2 if replica == 0 else 1, ) _scheduler.add_job( process_fleets, diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 677ed9ddb8..2dd5b7bb00 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -45,6 +45,7 @@ from dstack._internal.core.errors import ( BackendError, NotYetTerminated, + PlacementGroupNotSupportedError, ProvisioningError, ) from dstack._internal.core.models.backends.base import BackendType @@ -1067,6 +1068,12 @@ async def _create_placement_group( placement_group_model_to_placement_group(placement_group_model), master_instance_offer, ) + except PlacementGroupNotSupportedError: + logger.debug( + "Skipping offer %s because placement group not supported", + master_instance_offer.instance.name, + ) + return None except BackendError as e: logger.warning( "Failed to create placement group %s in %s/%s: %r", From 401987246009c4144c4b5e9ce798615d0e1c2355 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 4 Jul 2025 09:18:41 +0500 Subject: [PATCH 19/20] Set MIN_PROCESSING_INTERVAL for process_fleets --- .../_internal/server/background/tasks/process_fleets.py | 7 +++++++ .../server/background/tasks/process_submitted_jobs.py | 1 + src/dstack/_internal/server/testing/common.py | 2 ++ 3 files changed, 10 insertions(+) diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index 6a8d5645f3..9a8dd56636 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -1,4 +1,5 @@ import asyncio +from datetime import timedelta from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -19,6 +20,9 @@ logger = get_logger(__name__) +MIN_PROCESSING_INTERVAL = timedelta(seconds=30) + + async def process_fleets(batch_size: int = 1): tasks = [] for _ in range(batch_size): @@ -35,6 +39,8 @@ async def _process_next_fleet(): .where( FleetModel.deleted == False, FleetModel.id.not_in(lockset), + FleetModel.last_processed_at + < get_current_datetime().replace(tzinfo=None) - MIN_PROCESSING_INTERVAL, ) .order_by(FleetModel.last_processed_at.asc()) 
.limit(1) @@ -52,6 +58,7 @@ async def _process_next_fleet(): async def _process_fleet(session: AsyncSession, fleet_model: FleetModel): + logger.info("Processing fleet %s", fleet_model.name) # Refetch to load related attributes. # joinedload produces LEFT OUTER JOIN that can't be used with FOR UPDATE. res = await session.execute( diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 224e7c13fb..b9c2f9c946 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -86,6 +86,7 @@ # If no tasks were processed recently, we force batch_size 1. # If there are lots of runs/jobs with same offers submitted, # we warm up the cache instead of requesting the offers concurrently. +# Mostly useful when runs are submitted via API without getting run plan first. BATCH_SIZE_RESET_TIMEOUT = timedelta(minutes=2) last_processed_at: Optional[datetime] = None diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 61b39d1cfd..6c458ed88a 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -500,6 +500,7 @@ async def create_fleet( status: FleetStatus = FleetStatus.ACTIVE, deleted: bool = False, name: Optional[str] = None, + last_processed_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), ) -> FleetModel: if fleet_id is None: fleet_id = uuid.uuid4() @@ -517,6 +518,7 @@ async def create_fleet( spec=spec.json(), instances=[], runs=[], + last_processed_at=last_processed_at, ) session.add(fm) await session.commit() From d91a4c29a8986136f6e3799f4a4c943c736843f0 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 4 Jul 2025 09:21:37 +0500 Subject: [PATCH 20/20] Add TODO --- src/dstack/_internal/server/services/fleets.py | 3 ++- src/dstack/_internal/server/services/runs.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index d54173115a..9925483e45 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -520,7 +520,8 @@ async def delete_fleets( get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids), ): # Refetch after lock - # TODO lock instances with FOR UPDATE? + # TODO: Lock instances with FOR UPDATE? + # TODO: Do not lock fleet when deleting only instances res = await session.execute( select(FleetModel) .where( diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 25b67383d6..d3ca67502c 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -484,6 +484,7 @@ async def submit_run( lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace) async with lock: + # FIXME: delete_runs commits, so Postgres lock is released too early. if run_spec.run_name is None: run_spec.run_name = await _generate_run_name( session=session,