Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0075cd0
Use BulkExecutor for pre-populating documents in benchmarks
Mar 12, 2026
4df5cde
Use streaming Flux for bulk pre-population to reduce memory pressure
Mar 12, 2026
cd9b3d9
Add createItem fallback retry for failed bulk operations
Mar 12, 2026
403aeab
Reduce Gateway mode per-request CPU overhead
Mar 12, 2026
2f1c8a7
Revert HttpHeaders toLowerCaseIfNeeded optimization
Mar 12, 2026
0d19062
Eliminate URI parsing on Gateway request hot path
Mar 12, 2026
0d8e708
Add HTTP/2 support to benchmark workloads
Mar 12, 2026
7916e5c
Fix http2Enabled/http2MaxConcurrentStreams not applied from tenantDef…
Mar 12, 2026
277cb83
Add test to guard applyField coverage for new config fields
Mar 12, 2026
89339b4
Remove test-output from tracking and add to .gitignore
Mar 12, 2026
c500bdd
R3: Lazy response header access — skip intermediate HttpHeaders copy
Mar 12, 2026
f1ba753
Eliminate Map→String[]→Map header copy chain in response path
Mar 12, 2026
6f29ee3
Remove test-output from tracking
Mar 12, 2026
9c91ec3
Deprecate getResponseHeaderNames/Values, fix toArray garbage
Mar 12, 2026
f7a841d
Add test-output/ to gitignore
Mar 12, 2026
a36bc14
Revert headerMap() — keep HttpHeaders copy, retain StoreResponse Map
Mar 13, 2026
0e28165
R2: Set initialBufferSize(16384) for HTTP response decoder
Mar 13, 2026
e12678a
Revert to v4 (URI elim) + add initialBufferSize(16384)
Mar 13, 2026
5a9e0ea
Revert to v4 — remove initialBufferSize(16384)
Mar 13, 2026
1c15947
Merge branch 'main' into optimizeBenchmarkPreCreateDocFlow
Mar 17, 2026
d4ce345
revert URI elimiation change
Mar 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.GatewayConnectionConfig;
import com.azure.cosmos.Http2ConnectionConfig;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
import com.azure.cosmos.models.CosmosMicrometerMetricsOptions;
import com.azure.cosmos.models.CosmosContainerIdentity;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.ThroughputProperties;

import io.micrometer.core.instrument.MeterRegistry;
Expand All @@ -29,8 +34,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -123,6 +126,15 @@ abstract class AsyncBenchmark<T> implements Benchmark {
} else {
GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig();
gatewayConnectionConfig.setMaxConnectionPoolSize(cfg.getMaxConnectionPoolSize());
if (cfg.isHttp2Enabled()) {
Http2ConnectionConfig http2Config = gatewayConnectionConfig.getHttp2ConnectionConfig();
http2Config.setEnabled(true);
if (cfg.getHttp2MaxConcurrentStreams() != null) {
http2Config.setMaxConcurrentStreams(cfg.getHttp2MaxConcurrentStreams());
}
logger.info("HTTP/2 enabled with maxConcurrentStreams: {}",
http2Config.getMaxConcurrentStreams());
}
benchmarkSpecificClientBuilder = benchmarkSpecificClientBuilder.gatewayMode(gatewayConnectionConfig);
}

Expand Down Expand Up @@ -184,68 +196,40 @@ abstract class AsyncBenchmark<T> implements Benchmark {
partitionKey = cosmosAsyncContainer.read().block().getProperties().getPartitionKeyDefinition()
.getPaths().iterator().next().split("/")[1];

ArrayList<Flux<PojoizedJson>> createDocumentObservables = new ArrayList<>();

if (cfg.getOperationType() != Operation.WriteLatency
&& cfg.getOperationType() != Operation.WriteThroughput
&& cfg.getOperationType() != Operation.ReadMyWrites) {
logger.info("PRE-populating {} documents ....", cfg.getNumberOfPreCreatedDocuments());
String dataFieldValue = RandomStringUtils.randomAlphabetic(cfg.getDocumentDataFieldSize());
for (int i = 0; i < cfg.getNumberOfPreCreatedDocuments(); i++) {
String uuid = UUID.randomUUID().toString();
PojoizedJson newDoc = BenchmarkHelper.generateDocument(uuid,
dataFieldValue,
partitionKey,
cfg.getDocumentDataFieldCount());
Flux<PojoizedJson> obs = cosmosAsyncContainer
.createItem(newDoc)
.retryWhen(Retry.max(5).filter((error) -> {
if (!(error instanceof CosmosException)) {
return false;
}
final CosmosException cosmosException = (CosmosException) error;
if (cosmosException.getStatusCode() == 410 ||
cosmosException.getStatusCode() == 408 ||
cosmosException.getStatusCode() == 429 ||
cosmosException.getStatusCode() == 500 ||
cosmosException.getStatusCode() == 503) {
return true;
}

return false;
}))
.onErrorResume(
(error) -> {
if (!(error instanceof CosmosException)) {
return false;
}
final CosmosException cosmosException = (CosmosException) error;
if (cosmosException.getStatusCode() == 409) {
return true;
}

return false;
},
(conflictException) -> cosmosAsyncContainer.readItem(
uuid, new PartitionKey(partitionKey), PojoizedJson.class)
)
.map(resp -> {
PojoizedJson x =
resp.getItem();
return x;
})
.flux();
createDocumentObservables.add(obs);
}
}
List<PojoizedJson> generatedDocs = new ArrayList<>();

Flux<CosmosItemOperation> bulkOperationFlux = Flux.range(0, cfg.getNumberOfPreCreatedDocuments())
.map(i -> {
String uuid = UUID.randomUUID().toString();
PojoizedJson newDoc = BenchmarkHelper.generateDocument(uuid,
dataFieldValue,
partitionKey,
cfg.getDocumentDataFieldCount());
generatedDocs.add(newDoc);
return CosmosBulkOperations.getCreateItemOperation(newDoc, new PartitionKey(uuid));
});

if (createDocumentObservables.isEmpty()) {
docsToRead = new ArrayList<>();
CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
List<CosmosBulkOperationResponse<Object>> failedResponses = new ArrayList<>();
cosmosAsyncContainer
.executeBulkOperations(bulkOperationFlux, bulkExecutionOptions)
.doOnNext(response -> {
if (response.getResponse() == null || !response.getResponse().isSuccessStatusCode()) {
failedResponses.add(response);
}
})
.blockLast(Duration.ofMinutes(10));

BenchmarkHelper.retryFailedBulkOperations(failedResponses, cosmosAsyncContainer, partitionKey);

docsToRead = generatedDocs;
} else {
int prePopConcurrency = Math.max(1, Math.min(cfg.getConcurrency(), 100));
docsToRead = Flux.merge(Flux.fromIterable(createDocumentObservables), prePopConcurrency)
.collectList()
.block();
docsToRead = new ArrayList<>();
}
logger.info("Finished pre-populating {} documents", cfg.getNumberOfPreCreatedDocuments());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,23 @@
package com.azure.cosmos.benchmark;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.PartitionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class BenchmarkHelper {

private static final Logger logger = LoggerFactory.getLogger(BenchmarkHelper.class);
public static PojoizedJson generateDocument(String idString, String dataFieldValue, String partitionKey,
int dataFieldCount) {
PojoizedJson instance = new PojoizedJson();
Expand All @@ -30,4 +44,55 @@ public static boolean shouldContinue(long startTimeMillis, long iterationCount,

return startTimeMillis + maxDurationTime.toMillis() > System.currentTimeMillis();
}

/**
* Retries failed bulk operation responses by falling back to individual createItem calls.
* Ignores 409 (Conflict) errors since the document already exists.
*
* @param failedResponses list of failed bulk operation responses
* @param container the container to retry against
* @param partitionKeyName the partition key property name
*/
public static <TContext> void retryFailedBulkOperations(
List<CosmosBulkOperationResponse<TContext>> failedResponses,
CosmosAsyncContainer container,
String partitionKeyName) {

if (failedResponses.isEmpty()) {
return;
}

logger.info("Retrying {} failed bulk operations with individual createItem calls", failedResponses.size());

Flux.fromIterable(failedResponses)
.flatMap(failedResponse -> {
CosmosItemOperation operation = failedResponse.getOperation();
PojoizedJson item = operation.getItem();
PartitionKey pk = operation.getPartitionKeyValue();

return container.createItem(item, pk, null)
.retryWhen(Retry.max(5).filter(error -> {
if (!(error instanceof CosmosException)) {
return false;
}
int statusCode = ((CosmosException) error).getStatusCode();
return statusCode == 410
|| statusCode == 408
|| statusCode == 429
|| statusCode == 500
|| statusCode == 503;
}))
.onErrorResume(error -> {
if (error instanceof CosmosException
&& ((CosmosException) error).getStatusCode() == 409) {
return Mono.empty();
}
logger.error("Failed to create item on retry: {}", error.getMessage());
return Mono.empty();
});
}, 100)
.blockLast(Duration.ofMinutes(10));

logger.info("Finished retrying failed bulk operations");
}
}
Loading