[Cosmos][NO Review] Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling#49062
Conversation
Force-pushed from db98eb9 to c42cfe8.
@sdkReviewAgent
Pull request overview
This PR addresses a thread-scaling issue in the Cosmos Java SDK where per-client Schedulers.newSingle() usage caused background thread counts to grow linearly with the number of client instances. It introduces shared schedulers in CosmosSchedulers and updates background work to run on those shared schedulers instead of allocating dedicated per-client threads.
Changes:
- Added shared bounded-elastic schedulers to `CosmosSchedulers` for Global Endpoint Manager refresh and per-partition availability checks.
- Updated `GlobalEndpointManager` background refresh to use the shared scheduler and removed per-instance scheduler disposal.
- Updated `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` to use the shared scheduler and removed per-instance scheduler disposal.
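For context, shared schedulers of this kind are typically declared as static fields. The sketch below is illustrative only: the thread cap, queue cap, scheduler names, and 60-second TTL are assumptions, not necessarily the values in this PR.

```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class CosmosSchedulers {
    // Sketch of the shared-scheduler pattern; caps, names, and TTL are assumed values.
    public static final Scheduler GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC =
        Schedulers.newBoundedElastic(
            Runtime.getRuntime().availableProcessors(), // thread cap
            100_000,                                    // queued task cap
            "global-ep-mgr-bounded-elastic",            // thread name prefix
            60,                                         // idle thread TTL in seconds
            true);                                      // daemon threads

    public static final Scheduler PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC =
        Schedulers.newBoundedElastic(
            Runtime.getRuntime().availableProcessors(),
            100_000,
            "partition-availability-check-bounded-elastic",
            60,
            true);
}
```

Because both fields are static, every client instance shares the same pool; idle threads are reclaimed after the TTL instead of living for the client's lifetime.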
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java` | Adds shared bounded-elastic schedulers for endpoint refresh and partition availability checks. |
| `sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java` | Switches background location refresh from a per-instance single scheduler to the shared bounded-elastic scheduler. |
| `sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java` | Switches the staleness check from a per-instance single scheduler to the shared bounded-elastic scheduler. |
```diff
 public void init() {
     if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) {
-        this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe();
+        this.updateStaleLocationInfo()
+            .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
+            .doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true))
+            .subscribe();
```
This race is amplified by the scheduler change in this PR. Previously, even if two init() calls both passed the guard and created duplicate subscriptions, they'd both land on the per-instance Schedulers.newSingle() scheduler, which serialized all work onto one thread. With the new BoundedElastic scheduler, duplicate subscriptions genuinely run in parallel on different threads. Two concurrent updateStaleLocationInfo() chains would issue duplicate recovery probes (doubled network calls) and could apply conflicting health-state transitions via concurrent .compute() calls on the same ConcurrentHashMap entries.
The compareAndSet(false, true) fix suggested here is a one-line change that becomes particularly important with this PR.
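A minimal sketch of that fix, assuming the field names shown in the diff above:

```java
public void init() {
    if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled()
            // compareAndSet makes the check-then-act atomic: only one concurrent
            // init() call can flip false -> true and start the background task.
            && this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) {
        this.updateStaleLocationInfo()
            .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
            .subscribe();
    }
}
```

Setting the flag via compareAndSet before subscribing also removes the window the old doOnSubscribe-based approach left open between the guard check and the flag write.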
```diff
 @Override
 public void close() {
     this.isClosed.set(true);
-    this.partitionRecoveryScheduler.dispose();
 }
```
Worth noting: the old partitionRecoveryScheduler.dispose() was less effective at cancellation than it appears. delayElement(Duration) without a scheduler parameter uses Schedulers.parallel() internally (Reactor's default), not the subscribeOn scheduler. So disposing the per-instance scheduler never actually cancelled the pending delay; the .repeat(() -> !this.isClosed.get()) predicate was already the effective termination mechanism even before this PR.
That said, tracking the Disposable returned by .subscribe() and calling .dispose() on it in close() would still be an improvement, giving immediate cleanup and faster GC of closed instances. The PR description explicitly claims this tracking was implemented (AtomicReference.getAndSet()), but the diff doesn't include it, so the description should be reconciled with the actual implementation.
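A hedged sketch of what that tracking could look like, combined with the compareAndSet guard from the earlier comment. The recoveryTask field and its wiring are illustrative assumptions, since the diff shown here does not include them:

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;

// Hypothetical field, not present in the diff above.
private final AtomicReference<Disposable> recoveryTask = new AtomicReference<>();

public void init() {
    if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled()
            && this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) {
        Disposable d = this.updateStaleLocationInfo()
            .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
            .subscribe();
        this.recoveryTask.set(d);
    }
}

@Override
public void close() {
    this.isClosed.set(true);
    // getAndSet(null) hands exactly one caller the live subscription to dispose,
    // cancelling immediately instead of waiting for the repeat predicate to observe isClosed.
    Disposable d = this.recoveryTask.getAndSet(null);
    if (d != null) {
        d.dispose();
    }
}
```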
```diff
 public void close() {
     this.isClosed = true;
     this.perPartitionAutomaticFailoverConfigModifier = null;
-    this.scheduler.dispose();
     logger.debug("GlobalEndpointManager closed.");
 }
```
Important context: the old scheduler.dispose() was already minimally effective here. The delay is explicitly scheduled on CosmosSchedulers.COSMOS_PARALLEL at line 344, not on the per-instance scheduler. subscribeOn(scheduler) only controls where the initial subscribe signal propagates; once subscribed, the delay timer runs independently on COSMOS_PARALLEL. So the old scheduler.dispose() only rejected new subscribe dispatches, which the isClosed check at line 324 now handles equivalently.
The Disposable-tracking suggestion is still valid for cleaner shutdown semantics (immediate cancellation of the pending Mono.delay and faster GC), but its absence is not a functional regression from pre-PR behavior. The isClosed volatile checks at lines 324 and 347 were already the real protection mechanism.
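A small standalone Reactor sketch (not SDK code) illustrating the underlying point: the timer for delayElement runs on Schedulers.parallel() by default, so disposing the subscribeOn scheduler does not cancel a pending delay.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class DelaySchedulerDemo {
    public static void main(String[] args) throws InterruptedException {
        Scheduler single = Schedulers.newSingle("demo-single");
        Mono.just("tick")
            .delayElement(Duration.ofMillis(200)) // timer is scheduled on Schedulers.parallel()
            .subscribeOn(single)                  // 'single' only carries the initial subscribe signal
            .subscribe(v ->
                System.out.println("received on " + Thread.currentThread().getName()));

        Thread.sleep(50);  // let the subscribe signal propagate and the timer start
        single.dispose();  // does NOT cancel the pending delay...
        Thread.sleep(300); // ...the element still arrives, on a parallel-* thread
    }
}
```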
✅ Review complete (47:35). No new comments; existing review coverage is sufficient. Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage
Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling

Thread count for 'global-ep-mgr' and 'partition-availability-staleness-check' threads was scaling linearly with tenant/client count because both GlobalEndpointManager and GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker created per-instance Schedulers.newSingle() schedulers.

Changes:
- Add GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC and PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC shared schedulers to CosmosSchedulers
- GlobalEndpointManager: Replace per-instance scheduler with shared scheduler; track background refresh Disposable via AtomicReference for immediate cleanup on close(). Use getAndSet() to atomically dispose old subscriptions on reschedule.
- GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker: Replace per-instance scheduler with shared scheduler; track recovery Disposable via AtomicReference for immediate cleanup on close(). Use compareAndSet on isPartitionRecoveryTaskRunning to prevent duplicate background tasks under concurrent init() calls.

Shared BoundedElastic schedulers reuse threads with a 60s TTL, preventing thread count from growing with client count while still supporting concurrent background tasks from multiple clients.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Force-pushed from c42cfe8 to 02b230e.
Problem
PR testing revealed that `global-ep-mgr` and `partition-availability-staleness-check` thread counts increase linearly with tenant/client count, because both `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` create per-instance `Schedulers.newSingle()` schedulers. With N clients → N dedicated threads for each component → 2N extra threads just for background location refresh and circuit breaker staleness checks.
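A standalone sketch (not SDK code) that makes the failure mode concrete: every `Schedulers.newSingle()` instance pins its own thread once used, so background thread count tracks client count.

```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ThreadScalingDemo {
    public static void main(String[] args) throws InterruptedException {
        int clients = 50; // simulate 50 client instances
        for (int i = 0; i < clients; i++) {
            // Mirrors the old per-instance pattern: each "client" gets its own
            // scheduler, and scheduling any task forces its dedicated thread to start.
            Scheduler s = Schedulers.newSingle("global-ep-mgr", true);
            s.schedule(() -> { /* background refresh work */ });
        }
        Thread.sleep(200);
        // Expect roughly 'clients' extra live threads named global-ep-mgr-*.
        System.out.println("Live threads: " + Thread.activeCount());
    }
}
```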
Solution
Replace per-instance `Schedulers.newSingle()` with shared static `BoundedElastic` schedulers in `CosmosSchedulers`, following the existing pattern used for `COSMOS_PARALLEL`, `TRANSPORT_RESPONSE_BOUNDED_ELASTIC`, etc.
Changes
`CosmosSchedulers.java`
- Add `GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC` shared scheduler
- Add `PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC` shared scheduler

`GlobalEndpointManager.java`
- Replace per-instance `Schedulers.newSingle(CosmosDaemonThreadFactory)` with `CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC`
- Track the background refresh `Disposable` via `AtomicReference` with `getAndSet()` to atomically clean up old subscriptions on concurrent calls
- `close()` cancels the tracked subscription instead of disposing the scheduler

`GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java`
- Replace per-instance `Schedulers.newSingle("partition-availability-staleness-check")` with `CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC`
- Track the recovery `Disposable` via `AtomicReference` for consistent cleanup on `close()`
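A hedged sketch of the `getAndSet()` reschedule pattern described above for `GlobalEndpointManager`. Field and method shapes (`refreshTask`, `refreshLocationAsync()`) are hypothetical; only the pattern itself is from the PR description.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.Disposables;

// Hypothetical field shape; the actual code may differ.
private final AtomicReference<Disposable> refreshTask =
    new AtomicReference<>(Disposables.disposed());

private void startRefreshLocationTimerAsync() {
    if (this.isClosed) {
        return; // post-close guard, as described in the PR
    }
    Disposable newTask = this.refreshLocationAsync() // hypothetical refresh chain
        .subscribeOn(CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC)
        .subscribe();
    // getAndSet atomically swaps in the new subscription and disposes the old one,
    // so concurrent reschedules cannot leak a running background chain.
    this.refreshTask.getAndSet(newTask).dispose();
}

public void close() {
    this.isClosed = true;
    this.refreshTask.getAndSet(Disposables.disposed()).dispose();
}
```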
close()startRefreshLocationTimerAsync()is called concurrentlyisClosedguards in both classes provide additional protection against post-close work