From 02b230e06522870090cfc1b09ba60490f6c9134f Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 5 May 2026 14:18:53 -0700 Subject: [PATCH 1/2] Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling Thread count for 'global-ep-mgr' and 'partition-availability-staleness-check' threads was scaling linearly with tenant/client count because both GlobalEndpointManager and GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker created per-instance Schedulers.newSingle() schedulers. Changes: - Add GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC and PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC shared schedulers to CosmosSchedulers - GlobalEndpointManager: Replace per-instance scheduler with shared scheduler, track background refresh Disposable via AtomicReference for immediate cleanup on close(). Use getAndSet() to atomically dispose old subscriptions on reschedule. - GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker: Replace per-instance scheduler with shared scheduler, track recovery Disposable via AtomicReference for immediate cleanup on close(). Use compareAndSet on isPartitionRecoveryTaskRunning to prevent duplicate background tasks under concurrent init() calls. Shared BoundedElastic schedulers reuse threads with 60s TTL, preventing thread count from growing with client count while still supporting concurrent background tasks from multiple clients. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../implementation/CosmosSchedulers.java | 24 +++++++++++++++++++ .../implementation/GlobalEndpointManager.java | 21 +++++++++------- ...tManagerForPerPartitionCircuitBreaker.java | 21 +++++++++------- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java index b7db63c7e10b..65bb2c9a7e9f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java @@ -16,6 +16,8 @@ public class CosmosSchedulers { private final static String OPEN_CONNECTIONS_BOUNDED_ELASTIC_THREAD_NAME = "open-connections-bounded-elastic"; private final static String ASYNC_CACHE_BACKGROUND_REFRESH_THREAD_NAME = "async-cache-background-refresh-bounded-elastic"; private final static String FAULT_INJECTION_CONNECTION_ERROR_THREAD_NAME = "fault-injection-connection-error-bounded-elastic"; + private final static String GLOBAL_ENDPOINT_MANAGER_THREAD_NAME = "global-endpoint-manager-bounded-elastic"; + private final static String PARTITION_AVAILABILITY_CHECK_THREAD_NAME = "partition-availability-check-bounded-elastic"; private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS @@ -97,4 +99,26 @@ public class CosmosSchedulers { TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true ); + + // Shared scheduler for GlobalEndpointManager background location refresh tasks. + // Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle() + // to prevent thread count from scaling linearly with client/tenant count. 
+ public final static Scheduler GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + GLOBAL_ENDPOINT_MANAGER_THREAD_NAME, + TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, + true + ); + + // Shared scheduler for per-partition circuit breaker availability staleness checks. + // Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle() + // to prevent thread count from scaling linearly with client/tenant count. + public final static Scheduler PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + PARTITION_AVAILABILITY_CHECK_THREAD_NAME, + TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, + true + ); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index d67f8d77088c..e313b5219713 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -9,10 +9,9 @@ import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.net.URI; import java.time.Duration; @@ -23,6 +22,7 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; 
@@ -34,8 +34,6 @@ public class GlobalEndpointManager implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(GlobalEndpointManager.class); - private static final CosmosDaemonThreadFactory theadFactory = new CosmosDaemonThreadFactory("cosmos-global-endpoint-mgr"); - private final int backgroundRefreshLocationTimeIntervalInMS; private final int backgroundRefreshJitterMaxInSeconds; private final LocationCache locationCache; @@ -45,7 +43,7 @@ public class GlobalEndpointManager implements AutoCloseable { private final DatabaseAccountManagerInternal owner; private final AtomicBoolean isRefreshing; private final AtomicBoolean refreshInBackground; - private final Scheduler scheduler = Schedulers.newSingle(theadFactory); + private final AtomicReference<Disposable> backgroundRefreshDisposable = new AtomicReference<>(); private volatile boolean isClosed; private volatile DatabaseAccount latestDatabaseAccount; private final AtomicBoolean hasThinClientReadLocations = new AtomicBoolean(false); @@ -194,7 +192,10 @@ public boolean canUseMultipleWriteLocations(RxDocumentServiceRequest request) { public void close() { this.isClosed = true; this.perPartitionAutomaticFailoverConfigModifier = null; - this.scheduler.dispose(); + Disposable disposable = this.backgroundRefreshDisposable.getAndSet(null); + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } logger.debug("GlobalEndpointManager closed."); } @@ -322,7 +323,11 @@ private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) } private void startRefreshLocationTimerAsync() { - startRefreshLocationTimerAsync(false).subscribe(); + Disposable newDisposable = startRefreshLocationTimerAsync(false).subscribe(); + Disposable oldDisposable = this.backgroundRefreshDisposable.getAndSet(newDisposable); + if (oldDisposable != null && !oldDisposable.isDisposed()) { + oldDisposable.dispose(); + } } private Mono startRefreshLocationTimerAsync(boolean initialization) { @@ -371,7 
+376,7 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { this.startRefreshLocationTimerAsync(); return Mono.empty(); - }).subscribeOn(scheduler); + }).subscribeOn(CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC); } public boolean hasThinClientReadLocations() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java index 0213f2255144..cf5ffe7aa874 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java @@ -7,6 +7,7 @@ import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.CosmosSchedulers; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.OperationType; @@ -22,10 +23,9 @@ import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.net.URI; import java.time.Duration; @@ -56,9 +56,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker impleme private final ConcurrentHashMap regionalRoutingContextToRegion; private final AtomicBoolean isClosed = new AtomicBoolean(false); 
private final AtomicBoolean isPartitionRecoveryTaskRunning = new AtomicBoolean(false); - private final Scheduler partitionRecoveryScheduler = Schedulers.newSingle( - "partition-availability-staleness-check", - true); + private final AtomicReference<Disposable> partitionRecoveryDisposable = new AtomicReference<>(); public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpointManager globalEndpointManager) { this.partitionKeyRangeToLocationSpecificUnavailabilityInfo = new ConcurrentHashMap<>(); @@ -73,8 +71,12 @@ public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpoin } public void init() { - if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) { - this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe(); + if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() + && this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) { + + this.partitionRecoveryDisposable.set(this.updateStaleLocationInfo() + .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC) + .subscribe()); } } @@ -449,7 +451,10 @@ public void setGlobalAddressResolver(GlobalAddressResolver globalAddressResolver @Override public void close() { this.isClosed.set(true); - this.partitionRecoveryScheduler.dispose(); + Disposable disposable = this.partitionRecoveryDisposable.getAndSet(null); + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } } private class PartitionLevelLocationUnavailabilityInfo { From ce677db33966dec30259a6ff411852d662243393 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Wed, 6 May 2026 09:19:10 -0700 Subject: [PATCH 2/2] Update CHANGELOG for shared scheduler thread scaling fix Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- 
sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 13c8130c8f1d..6eb1b76bad4e 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Replaced per-client `Schedulers.newSingle()` schedulers in `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` with shared `BoundedElastic` schedulers in `CosmosSchedulers` to prevent thread count from scaling linearly with client/tenant count. - See [PR 49062](https://github.com/Azure/azure-sdk-for-java/pull/49062) ### 4.80.0 (2026-05-01)