Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

#### Features Added
* Added support for N-Region synchronous commit feature - See [PR 47757](https://github.com/Azure/azure-sdk-for-java/pull/47757)
* Added support for Query Advisor feature - See [48160](https://github.com/Azure/azure-sdk-for-java/pull/48160)
* Added support for Query Advisor feature - See [48160](https://github.com/Azure/azure-sdk-for-java/pull/48160)
* Added write availability strategy (hedging) for Per-Partition Automatic Failover (PPAF) single-writer accounts. When a write to the current write region is slow or fails (410/21005, 503/21008, 403/3, 408), the SDK hedges the write to a read region via the existing availability strategy. On success, the PPAF manager records the new region so subsequent writes route directly there. Controlled by `COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF` system property (default: enabled).

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,11 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) {
request.requestContext.routeToLocation(this.regionalRoutingContext);
}

// In case PPAF is enabled and a location override exists for the partition key range assigned to the request
// In case PPAF is enabled and a location override exists for the partition key range assigned to the request.
// This also handles PPAF write hedging — when ppafWriteHedgeTargetRegion is set, it creates
// the conchashmap entry and routes the request to the target read region.
this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryAddPartitionLevelLocationOverride(request);

this.throttlingRetry.onBeforeSendRequest(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ public class Configs {
private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF";
private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE = "COSMOS_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF";

private static final String DEFAULT_IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "true";
private static final String IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF";
private static final String IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE = "COSMOS_IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF";

private static final int DEFAULT_WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF = 25;
private static final String WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF = "COSMOS.WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF";
private static final String WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF_VARIABLE = "COSMOS_WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF_VARIABLE";
Expand Down Expand Up @@ -1344,6 +1348,16 @@ public static boolean isReadAvailabilityStrategyEnabledWithPpaf() {
return Boolean.parseBoolean(isReadAvailabilityStrategyEnabledWithPpaf);
}

public static boolean isWriteAvailabilityStrategyEnabledWithPpaf() {
String isWriteAvailabilityStrategyEnabledWithPpaf = System.getProperty(
IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF,
firstNonNull(
emptyToNull(System.getenv().get(IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE)),
DEFAULT_IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF));

return Boolean.parseBoolean(isWriteAvailabilityStrategyEnabledWithPpaf);
}

public static String getAadScopeOverride() {
return System.getProperty(
AAD_SCOPE_OVERRIDE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -96,4 +97,21 @@ public void setPerPartitionFailoverInfo(PartitionLevelAutomaticFailoverInfo part
public PerPartitionAutomaticFailoverInfoHolder getPerPartitionAutomaticFailoverInfoHolder() {
return this.perPartitionAutomaticFailoverInfoHolder;
}

/**
* For PPAF write hedging on single-writer accounts, this field holds the target
* read region that the hedged write should be routed to. When set, {@code ClientRetryPolicy}
* uses {@code routeToLocation(RegionalRoutingContext)} to force-route the request
* to this region, bypassing the excluded-regions mechanism which cannot route writes
* to read regions on single-writer accounts.
*/
private volatile RegionalRoutingContext ppafWriteHedgeTargetRegion;

public RegionalRoutingContext getPpafWriteHedgeTargetRegion() {
return this.ppafWriteHedgeTargetRegion;
}

public void setPpafWriteHedgeTargetRegion(RegionalRoutingContext ppafWriteHedgeTargetRegion) {
this.ppafWriteHedgeTargetRegion = ppafWriteHedgeTargetRegion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
private List<CosmosOperationPolicy> operationPolicies;
private final AtomicReference<CosmosAsyncClient> cachedCosmosAsyncClientSnapshot;
private CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads;
private CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForWrites;
private Consumer<DatabaseAccount> perPartitionFailoverConfigModifier;

public RxDocumentClientImpl(URI serviceEndpoint,
Expand Down Expand Up @@ -3483,6 +3484,14 @@ private CosmosEndToEndOperationLatencyPolicyConfig getEffectiveEndToEndOperation
return this.ppafEnforcedE2ELatencyPolicyConfigForReads;
}

// For write point operations, apply PPAF-enforced write availability strategy — mirroring
// the read default. This enables hedging writes to read regions when the primary write
// region is unresponsive. Batch is excluded (not a point operation) because it bypasses
// wrapPointOperationWithAvailabilityStrategy.
if (operationType.isWriteOperation() && operationType.isPointOperation()) {
return this.ppafEnforcedE2ELatencyPolicyConfigForWrites;
}

return null;
}

Expand Down Expand Up @@ -7335,6 +7344,29 @@ private Mono<ResourceResponse<Document>> wrapPointOperationWithAvailabilityStrat
idempotentWriteRetriesEnabled,
nonNullRequestOptions);

// For PPAF write hedging on single-writer accounts, build a map of region name → RegionalRoutingContext
// so hedged requests can be force-routed to specific read regions via routeToLocation.
// This bypasses the excluded-regions mechanism which cannot route writes to read regions.
boolean isPpafWriteHedging = operationType.isWriteOperation()
&& !this.globalEndpointManager.canUseMultipleWriteLocations()
&& this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()
&& Configs.isWriteAvailabilityStrategyEnabledWithPpaf();

Map<String, RegionalRoutingContext> regionToRoutingContext = new HashMap<>();
if (isPpafWriteHedging) {
// Use ALL account-level read regions (not just preferred regions) as hedge candidates.
// PPAF write failover can target any read region, not just the ones in the preferred list.
List<RegionalRoutingContext> readRoutingContexts =
this.globalEndpointManager.getAvailableReadRoutingContexts();
for (RegionalRoutingContext rrc : readRoutingContexts) {
String regionName = this.globalEndpointManager.getRegionName(
rrc.getGatewayRegionalEndpoint(), OperationType.Read);
if (regionName != null) {
regionToRoutingContext.put(regionName.toLowerCase(Locale.ROOT), rrc);
}
}
}

AtomicBoolean isOperationSuccessful = new AtomicBoolean(false);
AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false);
PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder = new PerPartitionCircuitBreakerInfoHolder();
Expand Down Expand Up @@ -7448,6 +7480,15 @@ private Mono<ResourceResponse<Document>> wrapPointOperationWithAvailabilityStrat
perPartitionCircuitBreakerInfoHolder,
perPartitionAutomaticFailoverInfoHolder);

// For PPAF write hedging, set the target read region so ClientRetryPolicy
// routes the hedged write there via routeToLocation instead of excluded-regions.
if (isPpafWriteHedging) {
RegionalRoutingContext targetRegion = regionToRoutingContext.get(region.toLowerCase(Locale.ROOT));
if (targetRegion != null) {
crossRegionAvailabilityContextForHedgedRequest.setPpafWriteHedgeTargetRegion(targetRegion);
}
}

Mono<NonTransientPointOperationResult> regionalCrossRegionRetryMono =
callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForHedgedRequest)
.map(NonTransientPointOperationResult::new)
Expand Down Expand Up @@ -7652,6 +7693,55 @@ private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatenc
return null;
}

/**
* Evaluates whether a PPAF-enforced E2E latency policy should be applied for write operations.
* Uses the same timeout/threshold values as the read policy — the availability strategy
* hedging behavior should be symmetric for reads and writes under PPAF.
*/
private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForWrites(
GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
ConnectionPolicy connectionPolicy) {

if (!globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) {
return null;
}

if (Configs.isWriteAvailabilityStrategyEnabledWithPpaf()) {

logger.info("ATTN: As Per-Partition Automatic Failover (PPAF) is enabled a default End-to-End Operation Latency Policy will be applied for write operation types.");

if (connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT) {
Duration networkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout();

checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!");

Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND);
Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND);
Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND);

return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout)
.availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep))
.build();
} else {

Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout();

checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!");

Duration overallE2eLatencyTimeout = Utils.min(Utils.SIX_SECONDS, httpNetworkRequestTimeout);

Duration threshold = Utils.min(overallE2eLatencyTimeout.dividedBy(2), Utils.ONE_SECOND);
Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND);

return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout)
.availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep))
.build();
}
}

return null;
}

private DiagnosticsClientContext getEffectiveClientContext(DiagnosticsClientContext clientContextOverride) {
if (clientContextOverride != null) {
return clientContextOverride;
Expand Down Expand Up @@ -7723,18 +7813,49 @@ private List<String> getApplicableRegionsForSpeculation(
}

if (operationType.isWriteOperation() && !isIdempotentWriteRetriesEnabled) {
return EMPTY_REGION_LIST;
}

if (operationType.isWriteOperation() && !this.globalEndpointManager.canUseMultipleWriteLocations()) {
// For PPAF-enabled single-writer accounts, write hedging is allowed even without
// explicit idempotent write retries because PPAF provides partition-level write
// consistency guarantees — the service ensures exactly-once semantics for writes
// to failed-over partitions.
boolean isPpafEnabled =
this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()
&& Configs.isWriteAvailabilityStrategyEnabledWithPpaf();

if (!isPpafEnabled) {
return EMPTY_REGION_LIST;
}
}

// For PPAF-enabled single-writer accounts, allow write hedging using read regions.
// In PPAF, a partition can fail over to a read region for writes, so read regions
// are valid hedge targets even when the account has only one write region.
//
// Design tradeoff: We relax the multi-write-location gate here because PPAF
// fundamentally changes the write routing model — the service accepts writes
// at read regions for failed-over partitions. Without this, write hedging would
// never activate for the most common PPAF scenario (single-writer accounts),
// leaving the customer waiting up to 60-120s for the retry-based failover path.
boolean isPpafWriteHedgingApplicable = operationType.isWriteOperation()
&& !this.globalEndpointManager.canUseMultipleWriteLocations()
&& this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()
&& Configs.isWriteAvailabilityStrategyEnabledWithPpaf();

if (operationType.isWriteOperation()
&& !this.globalEndpointManager.canUseMultipleWriteLocations()
&& !isPpafWriteHedgingApplicable) {
return EMPTY_REGION_LIST;
}

if (!(endToEndPolicyConfig.getAvailabilityStrategy() instanceof ThresholdBasedAvailabilityStrategy)) {
return EMPTY_REGION_LIST;
}

List<RegionalRoutingContext> regionalRoutingContextList = getApplicableEndPoints(operationType, excludedRegions);
// For PPAF write hedging on single-writer accounts, use ALL account-level read regions
// as hedge candidates (not just preferred regions). PPAF failover can target any read region.
List<RegionalRoutingContext> regionalRoutingContextList =
isPpafWriteHedgingApplicable
? withoutNulls(new ArrayList<>(this.globalEndpointManager.getAvailableReadRoutingContexts()))
: getApplicableEndPoints(operationType, excludedRegions);

HashSet<String> normalizedExcludedRegions = new HashSet<>();
if (excludedRegions != null) {
Expand All @@ -7743,7 +7864,11 @@ private List<String> getApplicableRegionsForSpeculation(

List<String> orderedRegionsForSpeculation = new ArrayList<>();
regionalRoutingContextList.forEach(consolidatedLocationEndpoints -> {
String regionName = this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), operationType);
// For PPAF write hedging, resolve region names against read endpoints since
// the hedged write targets are read regions (not write regions).
String regionName = isPpafWriteHedgingApplicable
? this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), OperationType.Read)
: this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), operationType);
if (!normalizedExcludedRegions.contains(regionName.toLowerCase(Locale.ROOT))) {
orderedRegionsForSpeculation.add(regionName);
}
Expand Down Expand Up @@ -8067,6 +8192,7 @@ private synchronized void initializePerPartitionFailover(DatabaseAccount databas
initializePerPartitionAutomaticFailover(databaseAccountSnapshot);
initializePerPartitionCircuitBreaker();
enableAvailabilityStrategyForReads();
enableAvailabilityStrategyForWrites();

checkNotNull(this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover, "Argument 'globalPartitionEndpointManagerForPerPartitionAutomaticFailover' cannot be null.");
checkNotNull(this.globalPartitionEndpointManagerForPerPartitionCircuitBreaker, "Argument 'globalPartitionEndpointManagerForPerPartitionCircuitBreaker' cannot be null.");
Expand Down Expand Up @@ -8115,6 +8241,19 @@ private void enableAvailabilityStrategyForReads() {
}
}

private void enableAvailabilityStrategyForWrites() {
this.ppafEnforcedE2ELatencyPolicyConfigForWrites = this.evaluatePpafEnforcedE2eLatencyPolicyCfgForWrites(
this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
this.connectionPolicy
);

if (this.ppafEnforcedE2ELatencyPolicyConfigForWrites != null) {
logger.info("ATTN: Per-Partition Automatic Failover (PPAF) enforced E2E Latency Policy for writes is enabled.");
} else {
logger.info("ATTN: Per-Partition Automatic Failover (PPAF) enforced E2E Latency Policy for writes is disabled.");
}
}

public boolean useThinClient() {
return useThinClient;
}
Expand Down
Loading