Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class CosmosSchedulers {
private final static String OPEN_CONNECTIONS_BOUNDED_ELASTIC_THREAD_NAME = "open-connections-bounded-elastic";
private final static String ASYNC_CACHE_BACKGROUND_REFRESH_THREAD_NAME = "async-cache-background-refresh-bounded-elastic";
private final static String FAULT_INJECTION_CONNECTION_ERROR_THREAD_NAME = "fault-injection-connection-error-bounded-elastic";
private final static String GLOBAL_ENDPOINT_MANAGER_THREAD_NAME = "global-endpoint-manager-bounded-elastic";
private final static String PARTITION_AVAILABILITY_CHECK_THREAD_NAME = "partition-availability-check-bounded-elastic";

private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS

Expand Down Expand Up @@ -97,4 +99,26 @@ public class CosmosSchedulers {
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
true
);

// Shared scheduler for GlobalEndpointManager background location refresh tasks.
// Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle()
// to prevent thread count from scaling linearly with client/tenant count.
public final static Scheduler GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
GLOBAL_ENDPOINT_MANAGER_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
true
);

// Shared scheduler for per-partition circuit breaker availability staleness checks.
// Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle()
// to prevent thread count from scaling linearly with client/tenant count.
public final static Scheduler PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
PARTITION_AVAILABILITY_CHECK_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
true
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URI;
import java.time.Duration;
Expand All @@ -23,6 +22,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -34,8 +34,6 @@
public class GlobalEndpointManager implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(GlobalEndpointManager.class);

private static final CosmosDaemonThreadFactory theadFactory = new CosmosDaemonThreadFactory("cosmos-global-endpoint-mgr");

private final int backgroundRefreshLocationTimeIntervalInMS;
private final int backgroundRefreshJitterMaxInSeconds;
private final LocationCache locationCache;
Expand All @@ -45,7 +43,7 @@ public class GlobalEndpointManager implements AutoCloseable {
private final DatabaseAccountManagerInternal owner;
private final AtomicBoolean isRefreshing;
private final AtomicBoolean refreshInBackground;
private final Scheduler scheduler = Schedulers.newSingle(theadFactory);
private final AtomicReference<Disposable> backgroundRefreshDisposable = new AtomicReference<>();
private volatile boolean isClosed;
private volatile DatabaseAccount latestDatabaseAccount;
private final AtomicBoolean hasThinClientReadLocations = new AtomicBoolean(false);
Expand Down Expand Up @@ -194,7 +192,10 @@ public boolean canUseMultipleWriteLocations(RxDocumentServiceRequest request) {
public void close() {
this.isClosed = true;
this.perPartitionAutomaticFailoverConfigModifier = null;
this.scheduler.dispose();
Disposable disposable = this.backgroundRefreshDisposable.getAndSet(null);
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
logger.debug("GlobalEndpointManager closed.");
}
Comment on lines 192 to 200
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important context: the old scheduler.dispose() was already minimally effective here. The delay is explicitly scheduled on CosmosSchedulers.COSMOS_PARALLEL at line 344, not on the per-instance scheduler. subscribeOn(scheduler) only controls where the initial subscribe signal propagates once subscribed, the delay timer runs independently on COSMOS_PARALLEL. So the old scheduler.dispose() only rejected new subscribe dispatches, which isClosed at line 324 now handles equivalently.

The Disposable tracking suggestion is still valid for cleaner shutdown semantics (immediate cancellation of the pending Mono.delay and faster GC), but its absence is not a functional regression from pre-PR behavior. The isClosed volatile checks at lines 324 and 347 were already the real protection mechanism.

AI-generated review may be incorrect. Agree? resolve the conversation. Disagree? → reply with your reasoning.


Expand Down Expand Up @@ -322,7 +323,11 @@ private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount)
}

private void startRefreshLocationTimerAsync() {
startRefreshLocationTimerAsync(false).subscribe();
Disposable newDisposable = startRefreshLocationTimerAsync(false).subscribe();
Disposable oldDisposable = this.backgroundRefreshDisposable.getAndSet(newDisposable);
if (oldDisposable != null && !oldDisposable.isDisposed()) {
oldDisposable.dispose();
}
}

private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {
Expand Down Expand Up @@ -371,7 +376,7 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {

this.startRefreshLocationTimerAsync();
return Mono.empty();
}).subscribeOn(scheduler);
}).subscribeOn(CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC);
}

public boolean hasThinClientReadLocations() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
Expand All @@ -22,10 +23,9 @@
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URI;
import java.time.Duration;
Expand Down Expand Up @@ -56,9 +56,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker impleme
private final ConcurrentHashMap<RegionalRoutingContext, String> regionalRoutingContextToRegion;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isPartitionRecoveryTaskRunning = new AtomicBoolean(false);
private final Scheduler partitionRecoveryScheduler = Schedulers.newSingle(
"partition-availability-staleness-check",
true);
private final AtomicReference<Disposable> partitionRecoveryDisposable = new AtomicReference<>();

public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpointManager globalEndpointManager) {
this.partitionKeyRangeToLocationSpecificUnavailabilityInfo = new ConcurrentHashMap<>();
Expand All @@ -73,8 +71,12 @@ public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpoin
}

public void init() {
if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) {
this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe();
if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled()
&& this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) {

this.partitionRecoveryDisposable.set(this.updateStaleLocationInfo()
.subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
.subscribe());
}
}

Expand Down Expand Up @@ -449,7 +451,10 @@ public void setGlobalAddressResolver(GlobalAddressResolver globalAddressResolver
@Override
public void close() {
this.isClosed.set(true);
this.partitionRecoveryScheduler.dispose();
Disposable disposable = this.partitionRecoveryDisposable.getAndSet(null);
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
}
Comment on lines 451 to 458
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth noting: the old partitionRecoveryScheduler.dispose() was less effective at cancellation than it appears. delayElement(Duration) without a scheduler parameter uses Schedulers.parallel() internally (Reactor default), not the subscribeOn scheduler. So disposing the per-instance scheduler never actually cancelled the pending delay the .repeat(() -> !this.isClosed.get()) predicate was already the effective termination mechanism even before this PR.

That said, tracking the Disposable returned by .subscribe() and calling .dispose() in close() would still be an improvement for immediate cleanup and faster GC of closed instances. The PR description explicitly claims this tracking was implemented (AtomicReference.getAndSet()), but the diff doesn't include it worth reconciling the description with the actual implementation.

AI-generated review may be incorrect. Agree? resolve the conversation. Disagree? reply with your reasoning.


private class PartitionLevelLocationUnavailabilityInfo {
Expand Down
Loading