
[Cosmos][NO Review] Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling #49062

Open

xinlian12 wants to merge 1 commit into Azure:main from xinlian12:fix/shared-schedulers-thread-scaling

Conversation

@xinlian12
Member

Problem

PR testing revealed that global-ep-mgr and partition-availability-staleness-check thread counts increase linearly with tenant/client count because both GlobalEndpointManager and GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker create per-instance Schedulers.newSingle() schedulers.

With N clients → N dedicated threads for each component → 2N extra threads just for background location refresh and circuit breaker staleness checks.
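For context, the pre-PR shape looked roughly like the following (a simplified sketch; the field name and the CosmosDaemonThreadFactory constructor arguments are illustrative, not the exact SDK code):

```java
// Each GlobalEndpointManager instance owned its own single-threaded scheduler,
// so N client instances meant N dedicated "global-ep-mgr" threads, and the
// per-partition circuit breaker manager added another N for staleness checks.
private final Scheduler scheduler =
    Schedulers.newSingle(new CosmosDaemonThreadFactory("global-ep-mgr"));
```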

Solution

Replace per-instance Schedulers.newSingle() with shared static BoundedElastic schedulers in CosmosSchedulers, following the existing pattern used for COSMOS_PARALLEL, TRANSPORT_RESPONSE_BOUNDED_ELASTIC, etc.

Changes

CosmosSchedulers.java

  • Added GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC shared scheduler
  • Added PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC shared scheduler (see the sketch below)
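A minimal sketch of the two additions, following the Schedulers.newBoundedElastic pattern already used in CosmosSchedulers; the thread names, cap values, and TTL constant below are illustrative rather than copied from the diff:

```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class CosmosSchedulers {
    private static final int TTL_FOR_SCHEDULED_IDLE_THREAD_IN_SECONDS = 60;

    // Shared across all client instances; idle threads are reclaimed after the TTL,
    // so the pool shrinks back when background work goes quiet.
    public static final Scheduler GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
        Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
        Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
        "cosmos-global-ep-mgr-bounded-elastic",
        TTL_FOR_SCHEDULED_IDLE_THREAD_IN_SECONDS,
        true);

    public static final Scheduler PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
        Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
        Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
        "cosmos-partition-availability-check-bounded-elastic",
        TTL_FOR_SCHEDULED_IDLE_THREAD_IN_SECONDS,
        true);
}
```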

GlobalEndpointManager.java

  • Replaced per-instance Schedulers.newSingle(CosmosDaemonThreadFactory) with CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC
  • Track background refresh Disposable via AtomicReference with getAndSet() to atomically clean up old subscriptions on concurrent calls
  • close() cancels the tracked subscription instead of disposing the scheduler (sketched below)
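A hedged sketch of the tracking pattern; startRefreshLocationTimerAsync() and close() are the method names from this description, but the field names (refreshLocationTimerSubscription, backgroundRefreshLocationTimeInterval) and the elided refresh step are illustrative:

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

private final AtomicReference<Disposable> refreshLocationTimerSubscription = new AtomicReference<>();

private void startRefreshLocationTimerAsync() {
    if (this.isClosed) {
        return;
    }

    Disposable newSubscription = Mono.delay(this.backgroundRefreshLocationTimeInterval, CosmosSchedulers.COSMOS_PARALLEL)
        // ... perform the location refresh here and reschedule the next run ...
        .subscribeOn(CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC)
        .subscribe();

    // getAndSet atomically publishes the new subscription and hands back the old one,
    // so concurrent callers cannot leak an untracked Disposable.
    Disposable previous = this.refreshLocationTimerSubscription.getAndSet(newSubscription);
    if (previous != null && !previous.isDisposed()) {
        previous.dispose();
    }
}

public void close() {
    this.isClosed = true;
    // Cancel the tracked subscription; the shared scheduler itself is never disposed.
    Disposable current = this.refreshLocationTimerSubscription.getAndSet(null);
    if (current != null && !current.isDisposed()) {
        current.dispose();
    }
}
```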

GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java

  • Replaced per-instance Schedulers.newSingle("partition-availability-staleness-check") with CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC
  • Track recovery Disposable via AtomicReference for consistent cleanup on close() (see the sketch below)
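The circuit breaker side mirrors the same idea on close(); a short sketch, with the field name partitionRecoveryDisposable being illustrative:

```java
private final AtomicReference<Disposable> partitionRecoveryDisposable = new AtomicReference<>();

@Override
public void close() {
    this.isClosed.set(true);
    // Dispose the tracked recovery subscription; the shared
    // PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC scheduler stays up for other clients.
    Disposable recovery = this.partitionRecoveryDisposable.getAndSet(null);
    if (recovery != null && !recovery.isDisposed()) {
        recovery.dispose();
    }
}
```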

Design Decisions

  • BoundedElastic over Single — supports concurrent background tasks from multiple clients; threads auto-reclaim with 60s TTL
  • Disposable tracking — shared schedulers cannot be disposed per-client, so background subscriptions are tracked and cancelled individually in close()
  • AtomicReference.getAndSet() — prevents Disposable leaks when startRefreshLocationTimerAsync() is called concurrently
  • Existing isClosed guards in both classes provide additional protection against post-close work

@github-actions github-actions Bot added the Cosmos label May 5, 2026
@xinlian12 xinlian12 force-pushed the fix/shared-schedulers-thread-scaling branch from db98eb9 to c42cfe8 on May 5, 2026 22:36
@xinlian12 xinlian12 changed the title [Cosmos] Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling [Cosmos][NO Review] Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling May 5, 2026
@xinlian12 xinlian12 marked this pull request as ready for review May 5, 2026 22:38
Copilot AI review requested due to automatic review settings May 5, 2026 22:38
@xinlian12 xinlian12 requested review from a team and kirankumarkolli as code owners May 5, 2026 22:38
@xinlian12
Member Author

@sdkReviewAgent

Contributor

Copilot AI left a comment


Pull request overview

This PR addresses a thread-scaling issue in the Cosmos Java SDK where per-client Schedulers.newSingle() usage caused background thread counts to grow linearly with the number of client instances. It introduces shared schedulers in CosmosSchedulers and updates background work to run on those shared schedulers instead of allocating dedicated per-client threads.

Changes:

  • Added shared bounded-elastic schedulers to CosmosSchedulers for Global Endpoint Manager refresh and per-partition availability checks.
  • Updated GlobalEndpointManager background refresh to use the shared scheduler and removed per-instance scheduler disposal.
  • Updated GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker to use the shared scheduler and removed per-instance scheduler disposal.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java | Adds shared bounded-elastic schedulers for endpoint refresh and partition availability checks. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java | Switches background location refresh work from a per-instance single scheduler to the shared bounded-elastic scheduler. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java | Switches staleness-check work from a per-instance single scheduler to the shared bounded-elastic scheduler. |

Comment on lines +71 to +76
```diff
 public void init() {
     if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) {
-        this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe();
+        this.updateStaleLocationInfo()
+            .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
+            .doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true))
+            .subscribe();
```
Member Author


This race is amplified by the scheduler change in this PR. Previously, even if two init() calls both passed the guard and created duplicate subscriptions, they'd both land on Schedulers.newSingle(), which serialized all work onto one thread. With the new BoundedElastic scheduler, duplicate subscriptions genuinely run in parallel on different threads. Two concurrent updateStaleLocationInfo() chains would issue duplicate recovery probes (doubled network calls) and potentially apply conflicting health-state transitions via concurrent .compute() calls on the same ConcurrentHashMap entries.

The compareAndSet(false, true) fix suggested here is a one-line change that becomes particularly important with this PR.
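One possible shape of that guard (a sketch, assuming isPartitionRecoveryTaskRunning is an AtomicBoolean); setting the flag via compareAndSet up front makes the doOnSubscribe flip unnecessary:

```java
public void init() {
    if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled()
        && this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) {
        // Only the caller that wins the compareAndSet starts the recovery chain,
        // so concurrent init() calls cannot create duplicate subscriptions.
        this.updateStaleLocationInfo()
            .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
            .subscribe();
    }
}
```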

AI-generated review may be incorrect. Agree? resolve the conversation. Disagree? reply with your reasoning.

Comment on lines 448 to 451
```java
@Override
public void close() {
    this.isClosed.set(true);
    this.partitionRecoveryScheduler.dispose();
}
```
Member Author


Worth noting: the old partitionRecoveryScheduler.dispose() was less effective at cancellation than it appears. delayElement(Duration) without a scheduler parameter uses Schedulers.parallel() internally (Reactor's default), not the subscribeOn scheduler. So disposing the per-instance scheduler never actually cancelled the pending delay; the .repeat(() -> !this.isClosed.get()) predicate was already the effective termination mechanism even before this PR.

That said, tracking the Disposable returned by .subscribe() and calling .dispose() on it in close() would still be an improvement, giving immediate cleanup and faster GC of closed instances. The PR description explicitly claims this tracking was implemented (AtomicReference.getAndSet()), but the diff doesn't include it; worth reconciling the description with the actual implementation.
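To illustrate the cancellation point (standalone demo code, not from the PR): delayElement(Duration) schedules its timer on Schedulers.parallel(), so only disposing the subscription itself cancels a pending delay:

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class DelayElementCancellationDemo {
    public static void main(String[] args) {
        Scheduler perInstance = Schedulers.newSingle("per-instance-demo");

        // The delay timer below runs on Schedulers.parallel() (Reactor's default for
        // delayElement), not on 'perInstance', so perInstance.dispose() would not
        // cancel the pending delay.
        Disposable subscription = Mono.just(1)
            .delayElement(Duration.ofSeconds(30))
            .subscribeOn(perInstance)
            .subscribe(v -> System.out.println("emitted " + v));

        // Disposing the subscription cancels the pending delay immediately, which is
        // why tracking the Disposable in close() gives cleaner shutdown semantics.
        subscription.dispose();
        perInstance.dispose();
    }
}
```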

AI-generated review may be incorrect. Agree? resolve the conversation. Disagree? reply with your reasoning.

Comment on lines 189 to 193
```java
public void close() {
    this.isClosed = true;
    this.perPartitionAutomaticFailoverConfigModifier = null;
    this.scheduler.dispose();
    logger.debug("GlobalEndpointManager closed.");
}
```
Member Author


Important context: the old scheduler.dispose() was already minimally effective here. The delay is explicitly scheduled on CosmosSchedulers.COSMOS_PARALLEL at line 344, not on the per-instance scheduler. subscribeOn(scheduler) only controls where the initial subscribe signal propagates; once subscribed, the delay timer runs independently on COSMOS_PARALLEL. So the old scheduler.dispose() only rejected new subscribe dispatches, which the isClosed check at line 324 now handles equivalently.

The Disposable tracking suggestion is still valid for cleaner shutdown semantics (immediate cancellation of the pending Mono.delay and faster GC), but its absence is not a functional regression from pre-PR behavior. The isClosed volatile checks at lines 324 and 347 were already the real protection mechanism.

AI-generated review may be incorrect. Agree? resolve the conversation. Disagree? reply with your reasoning.

@xinlian12
Member Author

Review complete (47:35)

No new comments — existing review coverage is sufficient.

Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage

…ead scaling

Thread count for 'global-ep-mgr' and 'partition-availability-staleness-check'
threads was scaling linearly with tenant/client count because both
GlobalEndpointManager and GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker
created per-instance Schedulers.newSingle() schedulers.

Changes:
- Add GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC and
  PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC shared schedulers to CosmosSchedulers
- GlobalEndpointManager: Replace per-instance scheduler with shared scheduler,
  track background refresh Disposable via AtomicReference for immediate cleanup
  on close(). Use getAndSet() to atomically dispose old subscriptions on reschedule.
- GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker: Replace per-instance
  scheduler with shared scheduler, track recovery Disposable via AtomicReference
  for immediate cleanup on close(). Use compareAndSet on isPartitionRecoveryTaskRunning
  to prevent duplicate background tasks under concurrent init() calls.

Shared BoundedElastic schedulers reuse threads with 60s TTL, preventing thread
count from growing with client count while still supporting concurrent background
tasks from multiple clients.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@xinlian12 xinlian12 force-pushed the fix/shared-schedulers-thread-scaling branch from c42cfe8 to 02b230e on May 6, 2026 04:19
