-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Cosmos][NO Review] Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling #49062
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[Cosmos][NO Review] Replace per-client schedulers with shared CosmosSchedulers to fix thread scaling #49062
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
| import com.azure.cosmos.CosmosDiagnostics; | ||
| import com.azure.cosmos.CosmosException; | ||
| import com.azure.cosmos.implementation.Configs; | ||
| import com.azure.cosmos.implementation.CosmosSchedulers; | ||
| import com.azure.cosmos.implementation.GlobalEndpointManager; | ||
| import com.azure.cosmos.implementation.HttpConstants; | ||
| import com.azure.cosmos.implementation.OperationType; | ||
|
|
@@ -22,10 +23,9 @@ | |
| import com.azure.cosmos.implementation.routing.RegionalRoutingContext; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import reactor.core.Disposable; | ||
| import reactor.core.publisher.Flux; | ||
| import reactor.core.publisher.Mono; | ||
| import reactor.core.scheduler.Scheduler; | ||
| import reactor.core.scheduler.Schedulers; | ||
|
|
||
| import java.net.URI; | ||
| import java.time.Duration; | ||
|
|
@@ -56,9 +56,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker impleme | |
| private final ConcurrentHashMap<RegionalRoutingContext, String> regionalRoutingContextToRegion; | ||
| private final AtomicBoolean isClosed = new AtomicBoolean(false); | ||
| private final AtomicBoolean isPartitionRecoveryTaskRunning = new AtomicBoolean(false); | ||
| private final Scheduler partitionRecoveryScheduler = Schedulers.newSingle( | ||
| "partition-availability-staleness-check", | ||
| true); | ||
| private final AtomicReference<Disposable> partitionRecoveryDisposable = new AtomicReference<>(); | ||
|
|
||
| public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpointManager globalEndpointManager) { | ||
| this.partitionKeyRangeToLocationSpecificUnavailabilityInfo = new ConcurrentHashMap<>(); | ||
|
|
@@ -73,8 +71,12 @@ public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpoin | |
| } | ||
|
|
||
| public void init() { | ||
| if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) { | ||
| this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe(); | ||
| if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() | ||
| && this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) { | ||
|
|
||
| this.partitionRecoveryDisposable.set(this.updateStaleLocationInfo() | ||
| .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC) | ||
| .subscribe()); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -449,7 +451,10 @@ public void setGlobalAddressResolver(GlobalAddressResolver globalAddressResolver | |
| @Override | ||
| public void close() { | ||
| this.isClosed.set(true); | ||
| this.partitionRecoveryScheduler.dispose(); | ||
| Disposable disposable = this.partitionRecoveryDisposable.getAndSet(null); | ||
| if (disposable != null && !disposable.isDisposed()) { | ||
| disposable.dispose(); | ||
| } | ||
| } | ||
|
Comment on lines
451
to
458
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Worth noting: the old […] That said, tracking the […] AI-generated review may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning. |
||
|
|
||
| private class PartitionLevelLocationUnavailabilityInfo { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important context: the old `scheduler.dispose()` was already minimally effective here. The delay is explicitly scheduled on `CosmosSchedulers.COSMOS_PARALLEL` at line 344, not on the per-instance scheduler. `subscribeOn(scheduler)` only controls where the initial subscribe signal propagates — once subscribed, the delay timer runs independently on `COSMOS_PARALLEL`. So the old `scheduler.dispose()` only rejected new subscribe dispatches, which `isClosed` at line 324 now handles equivalently.

The `Disposable` tracking suggestion is still valid for cleaner shutdown semantics (immediate cancellation of the pending `Mono.delay` and faster GC), but its absence is not a functional regression from pre-PR behavior. The `isClosed` volatile checks at lines 324 and 347 were already the real protection mechanism.

AI-generated review may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.