diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java index ea4c3b371332..a1c79871832b 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java @@ -14,12 +14,17 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.GatewayConnectionConfig; +import com.azure.cosmos.Http2ConnectionConfig; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.models.CosmosClientTelemetryConfig; import com.azure.cosmos.models.CosmosMicrometerMetricsOptions; import com.azure.cosmos.models.CosmosContainerIdentity; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.ThroughputProperties; import io.micrometer.core.instrument.MeterRegistry; @@ -29,8 +34,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.util.retry.Retry; - import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -123,6 +126,15 @@ abstract class AsyncBenchmark implements Benchmark { } else { GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig(); gatewayConnectionConfig.setMaxConnectionPoolSize(cfg.getMaxConnectionPoolSize()); + if (cfg.isHttp2Enabled()) { + Http2ConnectionConfig http2Config = gatewayConnectionConfig.getHttp2ConnectionConfig(); + http2Config.setEnabled(true); + if (cfg.getHttp2MaxConcurrentStreams() != null) { + http2Config.setMaxConcurrentStreams(cfg.getHttp2MaxConcurrentStreams()); + } + logger.info("HTTP/2 enabled with maxConcurrentStreams: {}", + http2Config.getMaxConcurrentStreams()); + } benchmarkSpecificClientBuilder = benchmarkSpecificClientBuilder.gatewayMode(gatewayConnectionConfig); } @@ -184,68 +196,40 @@ abstract class AsyncBenchmark implements Benchmark { partitionKey = cosmosAsyncContainer.read().block().getProperties().getPartitionKeyDefinition() .getPaths().iterator().next().split("/")[1]; - ArrayList> createDocumentObservables = new ArrayList<>(); - if (cfg.getOperationType() != Operation.WriteLatency && cfg.getOperationType() != Operation.WriteThroughput && cfg.getOperationType() != Operation.ReadMyWrites) { logger.info("PRE-populating {} documents ....", cfg.getNumberOfPreCreatedDocuments()); String dataFieldValue = RandomStringUtils.randomAlphabetic(cfg.getDocumentDataFieldSize()); - for (int i = 0; i < cfg.getNumberOfPreCreatedDocuments(); i++) { - String uuid = UUID.randomUUID().toString(); - PojoizedJson newDoc = BenchmarkHelper.generateDocument(uuid, - dataFieldValue, - partitionKey, - cfg.getDocumentDataFieldCount()); - Flux obs = cosmosAsyncContainer - .createItem(newDoc) - .retryWhen(Retry.max(5).filter((error) -> { - if (!(error instanceof CosmosException)) { - return false; - } - final CosmosException cosmosException = (CosmosException) error; - if (cosmosException.getStatusCode() == 410 || - cosmosException.getStatusCode() == 408 || - cosmosException.getStatusCode() == 429 || - cosmosException.getStatusCode() == 500 || - cosmosException.getStatusCode() == 503) { - return true; - } - - return false; - })) - .onErrorResume( - (error) -> { - if (!(error instanceof CosmosException)) { - return false; - } - final CosmosException cosmosException = (CosmosException) error; - if (cosmosException.getStatusCode() == 409) { - return true; - } - - return false; - }, - (conflictException) -> cosmosAsyncContainer.readItem( - uuid, new PartitionKey(partitionKey), PojoizedJson.class) - ) - .map(resp -> { - PojoizedJson x = - resp.getItem(); - return x; - }) - .flux(); - createDocumentObservables.add(obs); - } - } + List generatedDocs = new ArrayList<>(); + + Flux bulkOperationFlux = Flux.range(0, cfg.getNumberOfPreCreatedDocuments()) + .map(i -> { + String uuid = UUID.randomUUID().toString(); + PojoizedJson newDoc = BenchmarkHelper.generateDocument(uuid, + dataFieldValue, + partitionKey, + cfg.getDocumentDataFieldCount()); + generatedDocs.add(newDoc); + return CosmosBulkOperations.getCreateItemOperation(newDoc, new PartitionKey(uuid)); + }); - if (createDocumentObservables.isEmpty()) { - docsToRead = new ArrayList<>(); + CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); + List> failedResponses = new ArrayList<>(); + cosmosAsyncContainer + .executeBulkOperations(bulkOperationFlux, bulkExecutionOptions) + .doOnNext(response -> { + if (response.getResponse() == null || !response.getResponse().isSuccessStatusCode()) { + failedResponses.add(response); + } + }) + .blockLast(Duration.ofMinutes(10)); + + BenchmarkHelper.retryFailedBulkOperations(failedResponses, cosmosAsyncContainer, partitionKey); + + docsToRead = generatedDocs; } else { - int prePopConcurrency = Math.max(1, Math.min(cfg.getConcurrency(), 100)); - docsToRead = Flux.merge(Flux.fromIterable(createDocumentObservables), prePopConcurrency) - .collectList() - .block(); + docsToRead = new ArrayList<>(); } logger.info("Finished pre-populating {} documents", cfg.getNumberOfPreCreatedDocuments()); diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java index 2bd8b5bd2b8a..1f8c2ee4ac5b 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java @@ -4,9 +4,23 @@ package com.azure.cosmos.benchmark; import java.time.Duration; +import java.util.List; import java.util.Map; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.PartitionKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + public class BenchmarkHelper { + + private static final Logger logger = LoggerFactory.getLogger(BenchmarkHelper.class); public static PojoizedJson generateDocument(String idString, String dataFieldValue, String partitionKey, int dataFieldCount) { PojoizedJson instance = new PojoizedJson(); @@ -30,4 +44,55 @@ public static boolean shouldContinue(long startTimeMillis, long iterationCount, return startTimeMillis + maxDurationTime.toMillis() > System.currentTimeMillis(); } + + /** + * Retries failed bulk operation responses by falling back to individual createItem calls. + * Ignores 409 (Conflict) errors since the document already exists. + * + * @param failedResponses list of failed bulk operation responses + * @param container the container to retry against + * @param partitionKeyName the partition key property name + */ + public static void retryFailedBulkOperations( + List> failedResponses, + CosmosAsyncContainer container, + String partitionKeyName) { + + if (failedResponses.isEmpty()) { + return; + } + + logger.info("Retrying {} failed bulk operations with individual createItem calls", failedResponses.size()); + + Flux.fromIterable(failedResponses) + .flatMap(failedResponse -> { + CosmosItemOperation operation = failedResponse.getOperation(); + PojoizedJson item = operation.getItem(); + PartitionKey pk = operation.getPartitionKeyValue(); + + return container.createItem(item, pk, null) + .retryWhen(Retry.max(5).filter(error -> { + if (!(error instanceof CosmosException)) { + return false; + } + int statusCode = ((CosmosException) error).getStatusCode(); + return statusCode == 410 + || statusCode == 408 + || statusCode == 429 + || statusCode == 500 + || statusCode == 503; + })) + .onErrorResume(error -> { + if (error instanceof CosmosException + && ((CosmosException) error).getStatusCode() == 409) { + return Mono.empty(); + } + logger.error("Failed to create item on retry: {}", error.getMessage()); + return Mono.empty(); + }); + }, 100) + .blockLast(Duration.ofMinutes(10)); + + logger.info("Finished retrying failed bulk operations"); + } } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java index 656fedd58b17..1136eed9f9b9 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java @@ -3,19 +3,11 @@ package com.azure.cosmos.benchmark; -import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.CosmosException; -import com.azure.cosmos.implementation.AsyncDocumentClient; -import com.azure.cosmos.implementation.CosmosPagedFluxOptions; -import com.azure.cosmos.implementation.Database; -import com.azure.cosmos.implementation.Document; -import com.azure.cosmos.implementation.DocumentCollection; -import com.azure.cosmos.implementation.NotFoundException; -import com.azure.cosmos.implementation.OperationType; -import com.azure.cosmos.implementation.QueryFeedOperationState; -import com.azure.cosmos.implementation.RequestOptions; -import com.azure.cosmos.implementation.ResourceResponse; -import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlParameter; @@ -26,6 +18,7 @@ import reactor.core.scheduler.Scheduler; import reactor.util.retry.Retry; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -42,15 +35,12 @@ * This workflow first will create some documents in cosmosdb and will store them all in its local cache. * Then at each step will randomly will try to do a write, read its own write, or query for its own write. */ -class ReadMyWriteWorkflow extends AsyncBenchmark { +class ReadMyWriteWorkflow extends AsyncBenchmark { private final static String QUERY_FIELD_NAME = "prop"; private final static String ORDER_BY_FIELD_NAME = "_ts"; private final static int MAX_TOP_QUERY_COUNT = 2000; - private AsyncDocumentClient client; - private DocumentCollection collection; - private String nameCollectionLink; - private ConcurrentHashMap cache; + private ConcurrentHashMap cache; private int cacheSize; ReadMyWriteWorkflow(TenantWorkloadConfig cfg, Scheduler scheduler) { @@ -59,21 +49,15 @@ class ReadMyWriteWorkflow extends AsyncBenchmark { @Override protected void init() { - // TODO: move read my writes to use v4 APIs - this.client = CosmosBridgeInternal.getAsyncDocumentClient(benchmarkWorkloadClient); - Database database = DocDBUtils.getDatabase(client, workloadConfig.getDatabaseId()); - this.collection = DocDBUtils.getCollection(client, database.getSelfLink(), workloadConfig.getContainerId()); - this.nameCollectionLink = String.format("dbs/%s/colls/%s", database.getId(), collection.getId()); - this.cacheSize = workloadConfig.getNumberOfPreCreatedDocuments(); this.cache = new ConcurrentHashMap<>(); this.populateCache(); } @Override - protected Mono performWorkload(long i) { + protected Mono performWorkload(long i) { - Flux obs; + Flux obs; boolean readyMyWrite = RandomUtils.nextBoolean(); if (readyMyWrite) { @@ -97,7 +81,7 @@ protected Mono performWorkload(long i) { // then try to query for the document which just was written obs = writeDocument() .flatMap(d -> singlePartitionQuery(d) - .switchIfEmpty(Flux.error(new NotFoundException( + .switchIfEmpty(Flux.error(new RuntimeException( "couldn't find my write in a single partition query!")))); break; case 2: @@ -105,7 +89,7 @@ protected Mono performWorkload(long i) { // then try to query for the document which just was written obs = writeDocument() .flatMap(d -> xPartitionQuery(generateQuery(d)) - .switchIfEmpty(Flux.error(new NotFoundException( + .switchIfEmpty(Flux.error(new RuntimeException( "couldn't find my write in a cross partition query!")))); break; default: @@ -134,13 +118,13 @@ protected Mono performWorkload(long i) { case 2: // randomly choose a document from the cache and do a single partition query obs = singlePartitionQuery(cache.get(cacheKey())) - .switchIfEmpty(Flux.error(new NotFoundException( + .switchIfEmpty(Flux.error(new RuntimeException( "couldn't find my cached write in a single partition query!"))); break; case 3: // randomly choose a document from the cache and do a cross partition query obs = xPartitionQuery(generateRandomQuery()) - .switchIfEmpty(Flux.error(new NotFoundException( + .switchIfEmpty(Flux.error(new RuntimeException( "couldn't find my cached write in a cross partition query!"))); break; default: @@ -153,14 +137,41 @@ protected Mono performWorkload(long i) { } private void populateCache() { - ArrayList> list = new ArrayList<>(); - for (int i = 0; i < cacheSize; i++) { - Flux observable = writeDocument(i); - list.add(observable); - } - logger.info("PRE-populating {} documents ....", cacheSize); - Flux.merge(Flux.fromIterable(list), workloadConfig.getConcurrency()).then().block(); + List generatedDocs = new ArrayList<>(); + + Flux bulkOperationFlux = Flux.range(0, cacheSize) + .map(i -> { + String idString = UUID.randomUUID().toString(); + String randomVal = UUID.randomUUID().toString(); + PojoizedJson newDoc = new PojoizedJson(); + newDoc.setProperty("id", idString); + newDoc.setProperty(partitionKey, idString); + newDoc.setProperty(QUERY_FIELD_NAME, randomVal); + newDoc.setProperty("dataField1", randomVal); + newDoc.setProperty("dataField2", randomVal); + newDoc.setProperty("dataField3", randomVal); + newDoc.setProperty("dataField4", randomVal); + generatedDocs.add(newDoc); + return CosmosBulkOperations.getCreateItemOperation(newDoc, new PartitionKey(idString)); + }); + + CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); + List> failedResponses = new ArrayList<>(); + cosmosAsyncContainer + .executeBulkOperations(bulkOperationFlux, bulkExecutionOptions) + .doOnNext(response -> { + if (response.getResponse() == null || !response.getResponse().isSuccessStatusCode()) { + failedResponses.add(response); + } + }) + .blockLast(Duration.ofMinutes(10)); + + BenchmarkHelper.retryFailedBulkOperations(failedResponses, cosmosAsyncContainer, partitionKey); + + for (int i = 0; i < generatedDocs.size(); i++) { + cache.put(i, generatedDocs.get(i)); + } logger.info("Finished pre-populating {} documents", cacheSize); } @@ -169,7 +180,7 @@ private void populateCache() { * * @return Observable of document */ - private Flux writeDocument() { + private Flux writeDocument() { return writeDocument(null); } @@ -178,20 +189,20 @@ private Flux writeDocument() { * * @return Observable of document */ - private Flux writeDocument(Integer i) { + private Flux writeDocument(Integer i) { String idString = UUID.randomUUID().toString(); String randomVal = UUID.randomUUID().toString(); - Document document = new Document(); - document.setId(idString); - document.set(partitionKey, idString); - document.set(QUERY_FIELD_NAME, randomVal); - document.set("dataField1", randomVal); - document.set("dataField2", randomVal); - document.set("dataField3", randomVal); - document.set("dataField4", randomVal); + PojoizedJson newDoc = new PojoizedJson(); + newDoc.setProperty("id", idString); + newDoc.setProperty(partitionKey, idString); + newDoc.setProperty(QUERY_FIELD_NAME, randomVal); + newDoc.setProperty("dataField1", randomVal); + newDoc.setProperty("dataField2", randomVal); + newDoc.setProperty("dataField3", randomVal); + newDoc.setProperty("dataField4", randomVal); Integer key = i == null ? cacheKey() : i; - return client.createDocument(getCollectionLink(), document, null, false) + return cosmosAsyncContainer.createItem(newDoc) .retryWhen(Retry.max(5).filter((error) -> { if (!(error instanceof CosmosException)) { return false; @@ -218,10 +229,11 @@ private Flux writeDocument(Integer i) { return false; }, - (conflictException) -> client.readDocument(getDocumentLink(document), null) + (conflictException) -> cosmosAsyncContainer.readItem( + idString, new PartitionKey(idString), PojoizedJson.class) ) - .doOnNext(r -> cache.put(key, r.getResource())) - .map(ResourceResponse::getResource).flux(); + .doOnNext(r -> cache.put(key, r.getItem())) + .map(r -> r.getItem()).flux(); } /** @@ -230,12 +242,10 @@ private Flux writeDocument(Integer i) { * @param d document to be read * @return Observable of document */ - private Flux readDocument(Document d) { - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(d.getString(partitionKey))); - - return client.readDocument(getDocumentLink(d), options) - .map(ResourceResponse::getResource).flux(); + private Flux readDocument(PojoizedJson d) { + return cosmosAsyncContainer.readItem( + d.getId(), new PartitionKey(d.getId()), PojoizedJson.class) + .map(r -> r.getItem()).flux(); } /** @@ -250,7 +260,7 @@ private SqlQuerySpec generateRandomQuery() { int key = RandomUtils.nextInt(0, cacheSize); keys.add(key); } - List documentList = null; + List documentList = null; if (RandomUtils.nextBoolean()) { documentList = keys.stream().map(cache::get).collect(Collectors.toList()); } @@ -267,24 +277,11 @@ private SqlQuerySpec generateRandomQuery() { * @param query to find document * @return Observable document */ - private Flux xPartitionQuery(SqlQuerySpec query) { + private Flux xPartitionQuery(SqlQuerySpec query) { CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); options.setMaxDegreeOfParallelism(-1); - QueryFeedOperationState state = new QueryFeedOperationState( - benchmarkWorkloadClient, - "xPartitionQuery", - workloadConfig.getDatabaseId(), - workloadConfig.getContainerId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - - return client.queryDocuments(getCollectionLink(), query, state, Document.class) - .flatMap(p -> Flux.fromIterable(p.getResults())); + return cosmosAsyncContainer.queryItems(query, options, PojoizedJson.class); } /** @@ -294,28 +291,15 @@ private Flux xPartitionQuery(SqlQuerySpec query) { * @param d document to be queried for. * @return Observable document */ - private Flux singlePartitionQuery(Document d) { + private Flux singlePartitionQuery(PojoizedJson d) { CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); - options.setPartitionKey(new PartitionKey(d.get(partitionKey))); + options.setPartitionKey(new PartitionKey(d.getProperty(partitionKey))); SqlQuerySpec sqlQuerySpec = new SqlQuerySpec(String.format("Select top 100 * from c where c.%s = '%s'", QUERY_FIELD_NAME, - d.getString(QUERY_FIELD_NAME))); - - QueryFeedOperationState state = new QueryFeedOperationState( - benchmarkWorkloadClient, - "singlePartitionQuery", - workloadConfig.getDatabaseId(), - workloadConfig.getContainerId(), - ResourceType.Document, - OperationType.Query, - null, - options, - new CosmosPagedFluxOptions() - ); - - return client.queryDocuments(getCollectionLink(), sqlQuerySpec, state, Document.class) - .flatMap(p -> Flux.fromIterable(p.getResults())); + (String) d.getProperty(QUERY_FIELD_NAME))); + + return cosmosAsyncContainer.queryItems(sqlQuerySpec, options, PojoizedJson.class); } /** @@ -326,7 +310,7 @@ private Flux singlePartitionQuery(Document d) { * @param documentList list of documents to be queried for * @return SqlQuerySpec */ - private SqlQuerySpec generateQuery(Document... documentList) { + private SqlQuerySpec generateQuery(PojoizedJson... documentList) { return generateQuery(Arrays.asList(documentList)); } @@ -338,7 +322,7 @@ private SqlQuerySpec generateQuery(Document... documentList) { * @param documentList list of documents to be queried for * @return SqlQuerySpec */ - private SqlQuerySpec generateQuery(List documentList) { + private SqlQuerySpec generateQuery(List documentList) { int top = RandomUtils.nextInt(0, MAX_TOP_QUERY_COUNT); boolean useOrderBy = RandomUtils.nextBoolean(); @@ -353,7 +337,7 @@ private SqlQuerySpec generateQuery(List documentList) { * @param withOrderBy if not null, the query will have an orderby clause * @return SqlQuerySpec */ - private SqlQuerySpec generateQuery(List documentList, Integer topCount, boolean withOrderBy) { + private SqlQuerySpec generateQuery(List documentList, Integer topCount, boolean withOrderBy) { QueryBuilder queryBuilder = new QueryBuilder(); if (withOrderBy) { queryBuilder.orderBy(ORDER_BY_FIELD_NAME); @@ -409,10 +393,10 @@ static class InWhereClause extends WhereClause { private final List parameters; private final String whereCondition; - static InWhereClause asInWhereClause(String fieldName, List documentList) { + static InWhereClause asInWhereClause(String fieldName, List documentList) { List parameters = new ArrayList<>(documentList.size()); for (int i = 0; i < documentList.size(); i++) { - Object value = documentList.get(i).get(fieldName); + Object value = documentList.get(i).getProperty(fieldName); SqlParameter sqlParameter = new SqlParameter("@param" + i, value); parameters.add(sqlParameter); } @@ -473,28 +457,4 @@ SqlQuerySpec toSqlQuerySpec() { } } - protected String getCollectionLink() { - if (workloadConfig.isUseNameLink()) { - return this.nameCollectionLink; - } else { - return collection.getSelfLink(); - } - } - - protected String getDocumentLink(Document doc) { - if (workloadConfig.isUseNameLink()) { - return this.nameCollectionLink + "/docs/" + doc.getId(); - } else { - return doc.getSelfLink(); - } - } - - @Override - public void shutdown() { - if (this.client != null) { - this.client.close(); - } - - super.shutdown(); - } } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java index e523852bcbd8..03f43408ff26 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java @@ -202,6 +202,12 @@ public enum Environment { @JsonProperty("connectionSharingAcrossClientsEnabled") private Boolean connectionSharingAcrossClientsEnabled; + @JsonProperty("http2Enabled") + private Boolean http2Enabled; + + @JsonProperty("http2MaxConcurrentStreams") + private Integer http2MaxConcurrentStreams; + @JsonProperty("preferredRegionsList") private String preferredRegionsList; @@ -338,6 +344,14 @@ public boolean isConnectionSharingAcrossClientsEnabled() { return connectionSharingAcrossClientsEnabled != null && connectionSharingAcrossClientsEnabled; } + public boolean isHttp2Enabled() { + return http2Enabled != null && http2Enabled; + } + + public Integer getHttp2MaxConcurrentStreams() { + return http2MaxConcurrentStreams; + } + public List getPreferredRegionsList() { if (preferredRegionsList == null || preferredRegionsList.isEmpty()) return null; List regions = new ArrayList<>(); @@ -508,6 +522,10 @@ private void applyField(String key, String value, boolean overwrite) { if (overwrite || environment == null) environment = value; break; case "useSync": if (overwrite || useSync == null) useSync = Boolean.parseBoolean(value); break; + case "http2Enabled": + if (overwrite || http2Enabled == null) http2Enabled = Boolean.parseBoolean(value); break; + case "http2MaxConcurrentStreams": + if (overwrite || http2MaxConcurrentStreams == null) http2MaxConcurrentStreams = Integer.parseInt(value); break; // JVM-global properties (minConnectionPoolSizePerEndpoint, isPartitionLevelCircuitBreakerEnabled, // isPerPartitionAutomaticFailoverRequired) are handled in BenchmarkConfig, not per-tenant. case "minConnectionPoolSizePerEndpoint": diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java index 8f3a94772025..ee068d8b2fb4 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java @@ -12,13 +12,18 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.GatewayConnectionConfig; +import com.azure.cosmos.Http2ConnectionConfig; import com.azure.cosmos.benchmark.Benchmark; import com.azure.cosmos.benchmark.BenchmarkHelper; import com.azure.cosmos.benchmark.PojoizedJson; import com.azure.cosmos.benchmark.TenantWorkloadConfig; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.ThroughputProperties; @@ -87,6 +92,13 @@ public AsyncCtlWorkload(TenantWorkloadConfig workloadCfg, Scheduler scheduler) { } else { GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig(); gatewayConnectionConfig.setMaxConnectionPoolSize(workloadCfg.getMaxConnectionPoolSize()); + if (workloadCfg.isHttp2Enabled()) { + Http2ConnectionConfig http2Config = gatewayConnectionConfig.getHttp2ConnectionConfig(); + http2Config.setEnabled(true); + if (workloadCfg.getHttp2MaxConcurrentStreams() != null) { + http2Config.setMaxConcurrentStreams(workloadCfg.getHttp2MaxConcurrentStreams()); + } + } cosmosClientBuilder = cosmosClientBuilder.gatewayMode(gatewayConnectionConfig); } cosmosClient = cosmosClientBuilder.buildAsyncClient(); @@ -244,33 +256,41 @@ private void parsedReadWriteQueryReadManyPct(String readWriteQueryReadManyPct) { private void createPrePopulatedDocs(int numberOfPreCreatedDocuments) { for (CosmosAsyncContainer container : containers) { + List generatedDocs = new ArrayList<>(); + + Flux bulkOperationFlux = Flux.range(0, numberOfPreCreatedDocuments) + .map(i -> { + String uId = UUID.randomUUID().toString(); + PojoizedJson newDoc = BenchmarkHelper.generateDocument(uId, + dataFieldValue, + partitionKey, + workloadConfig.getDocumentDataFieldCount()); + generatedDocs.add(newDoc); + return CosmosBulkOperations.getCreateItemOperation(newDoc, new PartitionKey(uId)); + }); + AtomicLong successCount = new AtomicLong(0); AtomicLong failureCount = new AtomicLong(0); - ArrayList> createDocumentObservables = new ArrayList<>(); - for (int i = 0; i < numberOfPreCreatedDocuments; i++) { - String uId = UUID.randomUUID().toString(); - PojoizedJson newDoc = BenchmarkHelper.generateDocument(uId, - dataFieldValue, - partitionKey, - workloadConfig.getDocumentDataFieldCount()); - - Flux obs = container.createItem(newDoc).map(resp -> { - PojoizedJson x = - resp.getItem(); - return x; - }).onErrorResume(throwable -> { - failureCount.incrementAndGet(); - logger.error("Error during pre populating item ", throwable.getMessage()); - return Mono.empty(); - }).doOnSuccess(pojoizedJson -> { - successCount.incrementAndGet(); - }).flux(); - createDocumentObservables.add(obs); - } - docsToRead.put(container.getId(), - Flux.merge(Flux.fromIterable(createDocumentObservables), 100).collectList().block()); + List> failedResponses = new ArrayList<>(); + CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); + container.executeBulkOperations(bulkOperationFlux, bulkExecutionOptions) + .doOnNext(response -> { + if (response.getResponse() != null && response.getResponse().isSuccessStatusCode()) { + successCount.incrementAndGet(); + } else { + failureCount.incrementAndGet(); + failedResponses.add(response); + logger.error("Error during pre populating item {}", + response.getException() != null ? response.getException().getMessage() : "unknown error"); + } + }) + .blockLast(Duration.ofMinutes(10)); + + BenchmarkHelper.retryFailedBulkOperations(failedResponses, container, partitionKey); + + docsToRead.put(container.getId(), generatedDocs); logger.info("Finished pre-populating {} documents for container {}", - successCount.get() - failureCount.get(), container.getId()); + successCount.get(), container.getId()); if (failureCount.get() > 0) { logger.info("Failed pre-populating {} documents for container {}", failureCount.get(), container.getId()); diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/encryption/AsyncEncryptionBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/encryption/AsyncEncryptionBenchmark.java index 6801350ca145..50280a118590 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/encryption/AsyncEncryptionBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/encryption/AsyncEncryptionBenchmark.java @@ -12,6 +12,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.GatewayConnectionConfig; +import com.azure.cosmos.Http2ConnectionConfig; import com.azure.cosmos.benchmark.Benchmark; import com.azure.cosmos.benchmark.BenchmarkHelper; import com.azure.cosmos.benchmark.Operation; @@ -27,9 +28,12 @@ import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.ClientEncryptionIncludedPath; import com.azure.cosmos.models.ClientEncryptionPolicy; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; import com.azure.cosmos.models.CosmosClientEncryptionKeyProperties; import com.azure.cosmos.models.CosmosContainerProperties; -import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.EncryptionKeyWrapMetadata; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; @@ -44,7 +48,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.util.retry.Retry; import java.io.IOException; import java.io.InputStream; @@ -108,6 +111,13 @@ public abstract class AsyncEncryptionBenchmark implements Benchmark { } else { GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig(); gatewayConnectionConfig.setMaxConnectionPoolSize(workloadCfg.getMaxConnectionPoolSize()); + if (workloadCfg.isHttp2Enabled()) { + Http2ConnectionConfig http2Config = gatewayConnectionConfig.getHttp2ConnectionConfig(); + http2Config.setEnabled(true); + if (workloadCfg.getHttp2MaxConcurrentStreams() != null) { + http2Config.setMaxConcurrentStreams(workloadCfg.getHttp2MaxConcurrentStreams()); + } + } cosmosClientBuilder = cosmosClientBuilder.gatewayMode(gatewayConnectionConfig); } cosmosClient = cosmosClientBuilder.buildAsyncClient(); @@ -116,71 +126,50 @@ public abstract class AsyncEncryptionBenchmark implements Benchmark { createEncryptionDatabaseAndContainer(); partitionKey = cosmosAsyncContainer.read().block().getProperties().getPartitionKeyDefinition() .getPaths().iterator().next().split("/")[1]; - ArrayList> createDocumentObservables = new ArrayList<>(); - if (workloadConfig.getOperationType() != Operation.WriteLatency && workloadConfig.getOperationType() != Operation.WriteThroughput && workloadConfig.getOperationType() != Operation.ReadMyWrites) { logger.info("PRE-populating {} documents ....", workloadCfg.getNumberOfPreCreatedDocuments()); String dataFieldValue = RandomStringUtils.randomAlphabetic(workloadCfg.getDocumentDataFieldSize()); - for (int i = 0; i < workloadCfg.getNumberOfPreCreatedDocuments(); i++) { - String uuid = UUID.randomUUID().toString(); - PojoizedJson newDoc = BenchmarkHelper.generateDocument(uuid, - dataFieldValue, - partitionKey, - workloadConfig.getDocumentDataFieldCount()); - for (int j = 1; j <= workloadCfg.getEncryptedStringFieldCount(); j++) { - newDoc.setProperty(ENCRYPTED_STRING_FIELD + j, uuid); - } - for (int j = 1; j <= workloadCfg.getEncryptedLongFieldCount(); j++) { - newDoc.setProperty(ENCRYPTED_LONG_FIELD + j, 1234l); - } - for (int j = 1; j <= workloadCfg.getEncryptedDoubleFieldCount(); j++) { - newDoc.setProperty(ENCRYPTED_DOUBLE_FIELD + j, 1234.01d); - } + List generatedDocs = new ArrayList<>(); + + Flux bulkOperationFlux = Flux.range(0, workloadCfg.getNumberOfPreCreatedDocuments()) + .map(i -> { + String uuid = UUID.randomUUID().toString(); + PojoizedJson newDoc = BenchmarkHelper.generateDocument(uuid, + dataFieldValue, + partitionKey, + workloadConfig.getDocumentDataFieldCount()); + for (int j = 1; j <= workloadCfg.getEncryptedStringFieldCount(); j++) { + newDoc.setProperty(ENCRYPTED_STRING_FIELD + j, uuid); + } + for (int j = 1; j <= workloadCfg.getEncryptedLongFieldCount(); j++) { + newDoc.setProperty(ENCRYPTED_LONG_FIELD + j, 1234l); + } + for (int j = 1; j <= workloadCfg.getEncryptedDoubleFieldCount(); j++) { + newDoc.setProperty(ENCRYPTED_DOUBLE_FIELD + j, 1234.01d); + } + generatedDocs.add(newDoc); + return CosmosBulkOperations.getCreateItemOperation(newDoc, new PartitionKey(uuid)); + }); - Flux obs = cosmosEncryptionAsyncContainer - .createItem(newDoc, new PartitionKey(uuid), new CosmosItemRequestOptions()) - .retryWhen(Retry.max(5).filter((error) -> { - if (!(error instanceof CosmosException)) { - return false; - } - final CosmosException cosmosException = (CosmosException) error; - if (cosmosException.getStatusCode() == 410 || - cosmosException.getStatusCode() == 408 || - cosmosException.getStatusCode() == 429 || - cosmosException.getStatusCode() == 503) { - return true; - } - - return false; - })) - .onErrorResume( - (error) -> { - if (!(error instanceof CosmosException)) { - return false; - } - final CosmosException cosmosException = (CosmosException) error; - if (cosmosException.getStatusCode() == 409) { - return true; - } - - return false; - }, - (conflictException) -> cosmosAsyncContainer.readItem( - uuid, new PartitionKey(partitionKey), PojoizedJson.class) - ) - .map(resp -> { - PojoizedJson x = - resp.getItem(); - return x; - }) - .flux(); - createDocumentObservables.add(obs); - } - } + CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); + List> failedResponses = new ArrayList<>(); + cosmosEncryptionAsyncContainer + .executeBulkOperations(bulkOperationFlux, bulkExecutionOptions) + .doOnNext(response -> { + if (response.getResponse() == null || !response.getResponse().isSuccessStatusCode()) { + failedResponses.add(response); + } + }) + .blockLast(Duration.ofMinutes(10)); + + BenchmarkHelper.retryFailedBulkOperations(failedResponses, cosmosAsyncContainer, partitionKey); - docsToRead = Flux.merge(Flux.fromIterable(createDocumentObservables), 100).collectList().block(); + docsToRead = generatedDocs; + } else { + docsToRead = new ArrayList<>(); + } logger.info("Finished pre-populating {} documents", workloadCfg.getNumberOfPreCreatedDocuments()); init(); diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/TenantWorkloadConfigApplyFieldTest.java b/sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/TenantWorkloadConfigApplyFieldTest.java new file mode 100644 index 000000000000..62143c20997e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/TenantWorkloadConfigApplyFieldTest.java @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.benchmark; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.testng.annotations.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Ensures every @JsonProperty field in TenantWorkloadConfig has a corresponding + * case in the applyField() switch statement, so that tenantDefaults inheritance works. + */ +public class TenantWorkloadConfigApplyFieldTest { + + // Fields that are intentionally excluded from applyField + private static final Set EXCLUDED_FIELDS = new HashSet<>(Arrays.asList( + "id" // tenant ID should not be inherited from defaults + )); + + @Test(groups = {"unit"}) + public void allJsonPropertiesShouldHaveApplyFieldCase() throws IOException { + // Collect all @JsonProperty names from the class + Set jsonPropertyNames = new HashSet<>(); + for (Field field : TenantWorkloadConfig.class.getDeclaredFields()) { + JsonProperty annotation = field.getAnnotation(JsonProperty.class); + if (annotation != null) { + jsonPropertyNames.add(annotation.value()); + } + } + + // Parse the source file to find all case "..." entries in applyField + Path sourceFile = Paths.get("src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java"); + String source = new String(Files.readAllBytes(sourceFile)); + Set caseNames = new HashSet<>(); + Matcher matcher = Pattern.compile("case\\s+\"([^\"]+)\"").matcher(source); + while (matcher.find()) { + caseNames.add(matcher.group(1)); + } + + // Every @JsonProperty (except excluded) should have a case in applyField + Set missingCases = new HashSet<>(); + for (String propName : jsonPropertyNames) { + if (!EXCLUDED_FIELDS.contains(propName) && !caseNames.contains(propName)) { + missingCases.add(propName); + } + } + + assertThat(missingCases) + .as("@JsonProperty fields missing from applyField() switch — " + + "these fields won't be inherited from tenantDefaults. " + + "Add a case in applyField() for each, or add to EXCLUDED_FIELDS if intentional.") + .isEmpty(); + } +}