From 0323bcc785021e429ad6b7ba2065643748c46a49 Mon Sep 17 00:00:00 2001 From: Isabelle Date: Wed, 29 Apr 2026 11:50:59 -0700 Subject: [PATCH] Add encoder stress tests --- sdk/storage/azure-storage-blob-stress/pom.xml | 20 ++ .../scenarios-matrix.yaml | 315 ++++++++++++++++-- .../com/azure/storage/blob/stress/App.java | 10 + .../storage/blob/stress/CommitBlockList.java | 39 +-- ...ntentValidationAppendBlobOutputStream.java | 88 +++++ .../stress/ContentValidationAppendBlock.java | 76 +++++ ...ontentValidationBlockBlobOutputStream.java | 75 +++++ .../ContentValidationBlockBlobUpload.java | 70 ++++ ...ContentValidationPageBlobOutputStream.java | 105 ++++++ ...entValidationSeekableByteChannelWrite.java | 83 +++++ .../stress/ContentValidationStageBlock.java | 89 +++++ .../ContentValidationStressOptions.java | 26 ++ .../blob/stress/ContentValidationUpload.java | 70 ++++ .../ContentValidationUploadFromFile.java | 138 ++++++++ .../stress/ContentValidationUploadPages.java | 82 +++++ sdk/storage/azure-storage-stress/pom.xml | 20 ++ .../azure/storage/stress/TelemetryHelper.java | 2 +- 17 files changed, 1252 insertions(+), 56 deletions(-) create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java diff --git a/sdk/storage/azure-storage-blob-stress/pom.xml b/sdk/storage/azure-storage-blob-stress/pom.xml index f214138d3f8b..c22d4d2e4096 100644 --- a/sdk/storage/azure-storage-blob-stress/pom.xml +++ b/sdk/storage/azure-storage-blob-stress/pom.xml @@ -19,8 +19,23 @@ - + + + 1.58.0 + + + + io.opentelemetry + opentelemetry-bom + ${opentelemetry.version} + pom + import + + + + ch.qos.logback @@ -64,6 +79,11 @@ opentelemetry-logback-appender-1.0 2.24.0-alpha + + io.opentelemetry + opentelemetry-api-incubator + 1.58.0-alpha + com.azure diff --git a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml index 3a8920a67b35..9e8c1bad70f7 100644 --- a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml +++ b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml @@ -5,7 +5,7 @@ matrix: testScenario: downloadtofile sync: true sizeBytes: 1024 - downloadFaults: true + downloadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -14,7 +14,7 @@ matrix: testScenario: downloadtofile sync: false sizeBytes: 1024 - downloadFaults: true + downloadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -23,7 +23,7 @@ matrix: testScenario: downloadtofile sync: true sizeBytes: "52428800" - downloadFaults: true + downloadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -32,7 +32,7 @@ matrix: testScenario: downloadstream sync: true sizeBytes: 1024 - downloadFaults: true + downloadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -41,7 +41,7 @@ matrix: testScenario: downloadstream sync: false sizeBytes: 1024 - downloadFaults: true + downloadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -50,7 +50,7 @@ matrix: testScenario: downloadstream sync: true sizeBytes: "52428800" - downloadFaults: true + downloadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -59,7 +59,7 @@ matrix: testScenario: downloadcontent sync: true sizeBytes: 1024 - downloadFaults: true + downloadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -68,7 +68,7 @@ matrix: testScenario: downloadcontent sync: true sizeBytes: "52428800" - downloadFaults: true + downloadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -77,7 +77,7 @@ matrix: testScenario: openinputstream sync: true sizeBytes: 1024 - downloadFaults: true + downloadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -86,7 +86,7 @@ matrix: testScenario: openinputstream sync: true sizeBytes: "52428800" - downloadFaults: true + downloadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -95,7 +95,7 @@ matrix: testScenario: openseekablebytechannelread sync: true sizeBytes: 1024 - downloadFaults: true + downloadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -104,7 +104,7 @@ matrix: testScenario: openseekablebytechannelread sync: true sizeBytes: "52428800" - downloadFaults: true + downloadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -113,7 +113,7 @@ matrix: testScenario: appendblock sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -122,7 +122,7 @@ matrix: testScenario: appendblock sync: false sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -131,7 +131,7 @@ matrix: testScenario: appendblock sync: true sizeBytes: "26214400" - uploadFaults: true + uploadFaults: false durationMin: 30 imageBuildDir: "../../.." @@ -140,7 +140,7 @@ matrix: testScenario: appendbloboutputstream sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -149,7 +149,7 @@ matrix: testScenario: appendbloboutputstream sync: true sizeBytes: "10240" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -158,7 +158,7 @@ matrix: testScenario: blockblobupload sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -167,7 +167,7 @@ matrix: testScenario: blockblobupload sync: true sizeBytes: "26214400" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -176,7 +176,7 @@ matrix: testScenario: blockbloboutputstream sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -185,7 +185,7 @@ matrix: testScenario: blockbloboutputstream sync: true sizeBytes: "26214400" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -194,7 +194,7 @@ matrix: testScenario: commitblocklist sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -203,7 +203,7 @@ matrix: testScenario: commitblocklist sync: true sizeBytes: "26214400" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -212,7 +212,7 @@ matrix: testScenario: openseekablebytechannelwrite sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 10 imageBuildDir: "../../.." @@ -221,7 +221,7 @@ matrix: testScenario: openseekablebytechannelwrite sync: true sizeBytes: "52428800" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -230,7 +230,7 @@ matrix: testScenario: stageblock sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -239,7 +239,7 @@ matrix: testScenario: stageblock sync: true sizeBytes: "26214400" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -248,7 +248,7 @@ matrix: testScenario: pagebloboutputstream sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -257,7 +257,7 @@ matrix: testScenario: pagebloboutputstream sync: true sizeBytes: "10240" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -266,7 +266,7 @@ matrix: testScenario: uploadpages sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -275,7 +275,7 @@ matrix: testScenario: uploadpages sync: true sizeBytes: "4194304" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -284,7 +284,7 @@ matrix: testScenario: upload sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -293,7 +293,7 @@ matrix: testScenario: upload sync: false sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -302,7 +302,7 @@ matrix: testScenario: upload sync: true sizeBytes: "52428800" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." @@ -311,7 +311,7 @@ matrix: testScenario: uploadfromfile sync: true sizeBytes: 1024 - uploadFaults: true + uploadFaults: false durationMin: 25 imageBuildDir: "../../.." @@ -320,6 +320,249 @@ matrix: testScenario: uploadfromfile sync: true sizeBytes: "52428800" - uploadFaults: true + uploadFaults: false durationMin: 60 imageBuildDir: "../../.." + + # --- Content validation (ContentValidation*; default CRC64). Scenario keys are short: K8s label + # testInstance = "{Scenario}-{BaseName}-{revision}" must be <= 63 chars (see stress-test-job.yaml). + + cv-appendblock-sm: + testScenario: contentvalidationappendblock + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-appendblock-lg: + testScenario: contentvalidationappendblock + sync: true + sizeBytes: "26214400" + uploadFaults: false + durationMin: 30 + imageBuildDir: "../../.." + + cv-appendblock-async-sm: + testScenario: contentvalidationappendblock + sync: false + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-appendblock-async-lg: + testScenario: contentvalidationappendblock + sync: false + sizeBytes: "26214400" + uploadFaults: false + durationMin: 30 + imageBuildDir: "../../.." + + cv-appendbloboutputstream-sm: + testScenario: contentvalidationappendbloboutputstream + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-appendbloboutputstream-lg: + testScenario: contentvalidationappendbloboutputstream + sync: true + sizeBytes: "10240" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-blockblobupload-sm: + testScenario: contentvalidationblockblobupload + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-blockblobupload-lg: + testScenario: contentvalidationblockblobupload + sync: true + sizeBytes: "26214400" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-blockbloboutputstream-sm: + testScenario: contentvalidationblockbloboutputstream + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-blockbloboutputstream-lg: + testScenario: contentvalidationblockbloboutputstream + sync: true + sizeBytes: "26214400" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-seekablebytechannelwrite-sm: + testScenario: contentvalidationseekablebytechannelwrite + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 10 + imageBuildDir: "../../.." + + cv-seekablebytechannelwrite-lg: + testScenario: contentvalidationseekablebytechannelwrite + sync: true + sizeBytes: "52428800" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-stageblock-sm: + testScenario: contentvalidationstageblock + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-stageblock-lg: + testScenario: contentvalidationstageblock + sync: true + sizeBytes: "26214400" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-stageblock-async-sm: + testScenario: contentvalidationstageblock + sync: false + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-stageblock-async-lg: + testScenario: contentvalidationstageblock + sync: false + sizeBytes: "26214400" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-pagebloboutputstream-sm: + testScenario: contentvalidationpagebloboutputstream + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-pagebloboutputstream-lg: + testScenario: contentvalidationpagebloboutputstream + sync: true + sizeBytes: "10240" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-uploadpages-sm: + testScenario: contentvalidationuploadpages + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-uploadpages-lg: + testScenario: contentvalidationuploadpages + sync: true + sizeBytes: "4194304" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-uploadpages-async-sm: + testScenario: contentvalidationuploadpages + sync: false + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-uploadpages-async-lg: + testScenario: contentvalidationuploadpages + sync: false + sizeBytes: "4194304" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-upload-sm: + testScenario: contentvalidationupload + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-upload-lg: + testScenario: contentvalidationupload + sync: true + sizeBytes: "52428800" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-upload-async-sm: + testScenario: contentvalidationupload + sync: false + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-upload-async-lg: + testScenario: contentvalidationupload + sync: false + sizeBytes: "52428800" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-uploadfromfile-sm: + testScenario: contentvalidationuploadfromfile + sync: true + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-uploadfromfile-lg: + testScenario: contentvalidationuploadfromfile + sync: true + sizeBytes: "52428800" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." + + cv-uploadfromfile-async-sm: + testScenario: contentvalidationuploadfromfile + sync: false + sizeBytes: 1024 + uploadFaults: false + durationMin: 25 + imageBuildDir: "../../.." + + cv-uploadfromfile-async-lg: + testScenario: contentvalidationuploadfromfile + sync: false + sizeBytes: "52428800" + uploadFaults: false + durationMin: 60 + imageBuildDir: "../../.." \ No newline at end of file diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java index e38bd16791ca..3f3219930596 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java @@ -15,6 +15,16 @@ public static void main(String[] args) { BlockBlobOutputStream.class, BlockBlobUpload.class, CommitBlockList.class, + ContentValidationAppendBlobOutputStream.class, + ContentValidationAppendBlock.class, + ContentValidationBlockBlobOutputStream.class, + ContentValidationBlockBlobUpload.class, + ContentValidationPageBlobOutputStream.class, + ContentValidationStageBlock.class, + ContentValidationSeekableByteChannelWrite.class, + ContentValidationUpload.class, + ContentValidationUploadFromFile.class, + ContentValidationUploadPages.class, DownloadToFile.class, DownloadStream.class, DownloadContent.class, diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java index 310c7618e5d9..5e63058a4ab5 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java @@ -1,6 +1,3 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - package com.azure.storage.blob.stress; import com.azure.core.util.Context; @@ -11,7 +8,6 @@ import com.azure.storage.blob.specialized.BlockBlobAsyncClient; import com.azure.storage.blob.specialized.BlockBlobClient; import com.azure.storage.blob.stress.utils.OriginalContent; -import com.azure.storage.common.Utility; import com.azure.storage.stress.CrcInputStream; import com.azure.storage.stress.StorageStressOptions; import reactor.core.publisher.Flux; @@ -19,10 +15,17 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Base64; import java.util.Collections; public class CommitBlockList extends BlobScenarioBase { + // Per-operation ceilings. Must be smaller than the Azure LB idle timeout (~4 min) + // so that a stalled request fails fast rather than hangs the whole test run. + private static final Duration STAGE_TIMEOUT = Duration.ofSeconds(120); + private static final Duration COMMIT_TIMEOUT = Duration.ofSeconds(30); + private static final Duration CHECK_MATCH_TIMEOUT = Duration.ofSeconds(30); + private final OriginalContent originalContent = new OriginalContent(); private final BlobClient syncClient; private final BlobAsyncClient asyncClient; @@ -45,14 +48,11 @@ protected void runInternal(Context span) throws Exception { String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString() .getBytes(StandardCharsets.UTF_8)); try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { - // First perform non-faulted stage block to send data to the service - blockBlobClientNoFault.stageBlockWithResponse(blockId, inputStream, options.getSize(), null, null, null, - span); - // Then perform faulted commit block list to commit the block + blockBlobClientNoFault.stageBlockWithResponse(blockId, inputStream, options.getSize(), null, null, + STAGE_TIMEOUT, span); blockBlobClient.commitBlockListWithResponse( - new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)), null, span); - // Confirm the CRC matches for the uploaded input stream - originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)), COMMIT_TIMEOUT, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(CHECK_MATCH_TIMEOUT); } } @@ -64,13 +64,14 @@ protected Mono runInternalAsync(Context span) { .getBytes(StandardCharsets.UTF_8)); Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) .convertStreamToByteBuffer(); - // First perform non-faulted stage block to send data to the service - return blockBlobAsyncClientNoFault.stageBlockWithResponse(blockId, byteBufferFlux, options.getSize(), null, null) - // Then perform faulted commit block list to commit the block - .then(blockBlobAsyncClient.commitBlockListWithResponse( - new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)))) - // Confirm the CRC matches for the uploaded byte buffer flux - .then(originalContent.checkMatch(byteBufferFlux, span)); + return blockBlobAsyncClientNoFault + .stageBlockWithResponse(blockId, byteBufferFlux, options.getSize(), null, null) + .timeout(STAGE_TIMEOUT) + .then(blockBlobAsyncClient + .commitBlockListWithResponse( + new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId))) + .timeout(COMMIT_TIMEOUT)) + .then(originalContent.checkMatch(byteBufferFlux, span).timeout(CHECK_MATCH_TIMEOUT)); } @Override @@ -83,4 +84,4 @@ public Mono cleanupAsync() { return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } -} +} \ No newline at end of file diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java new file mode 100644 index 000000000000..b2b9d1845fa4 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java @@ -0,0 +1,88 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.AppendBlobOutputStreamOptions; +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.IOException; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Append blob output stream with {@link AppendBlobOutputStreamOptions#setContentValidationAlgorithm} (sync only). + */ +public class ContentValidationAppendBlobOutputStream extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationAppendBlobOutputStream.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + /** Separate blob used to upload reference content for {@link OriginalContent} checksum (block blob). */ + private final BlobAsyncClient tempSetupBlobClient; + + public ContentValidationAppendBlobOutputStream(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + } + + @Override + protected void runInternal(Context span) throws IOException { + AppendBlobClient appendBlobClient = syncClient.getAppendBlobClient(); + // Reset the append blob at the start of each iteration. The boolean overload + // getBlobOutputStream(true) does this implicitly via create(true); the options overload + // does not, so we replicate that behavior here. Without this reset, fault-injection + // sequences that commit a block server-side but drop the response leave the cached + // appendPosition stale, causing subsequent retries to fail with 412 AppendPositionConditionNotMet, + // which combined with non-retriable Crc64Mismatch on truncated-body faults collapses the pass rate. + appendBlobClient.create(true); + + AppendBlobOutputStreamOptions streamOptions = new AppendBlobOutputStreamOptions() + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()); + + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()); + BlobOutputStream outputStream = appendBlobClient.getBlobOutputStream(streamOptions)) { + byte[] buffer = new byte[4096]; + int bytesRead; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + + outputStream.close(); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(asyncNoFaultClient.getAppendBlobAsyncClient().create()) + .then(originalContent.setupBlob(tempSetupBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(tempSetupBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java new file mode 100644 index 000000000000..bcbaf7681742 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.AppendBlobAppendBlockOptions; +import com.azure.storage.blob.specialized.AppendBlobAsyncClient; +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Append block with {@link AppendBlobAppendBlockOptions#setContentValidationAlgorithm}. + */ +public class ContentValidationAppendBlock extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final BlobAsyncClient tempSetupBlobClient; + + public ContentValidationAppendBlock(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + this.tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + } + + @Override + protected void runInternal(Context span) { + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + AppendBlobClient appendBlobClient = syncClient.getAppendBlobClient(); + appendBlobClient.appendBlockWithResponse( + new AppendBlobAppendBlockOptions(inputStream, options.getSize()) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + AppendBlobAsyncClient appendBlobAsyncClient = asyncClient.getAppendBlobAsyncClient(); + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + return appendBlobAsyncClient.appendBlockWithResponse( + new AppendBlobAppendBlockOptions(byteBufferFlux, options.getSize()) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(asyncNoFaultClient.getAppendBlobAsyncClient().create()) + .then(originalContent.setupBlob(tempSetupBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.getAppendBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java new file mode 100644 index 000000000000..175b648002a5 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlockBlobOutputStreamOptions; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.IOException; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Block blob output stream with {@link BlockBlobOutputStreamOptions#setContentValidationAlgorithm} (sync only). + */ +public class ContentValidationBlockBlobOutputStream extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationBlockBlobOutputStream.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final ParallelTransferOptions parallelTransferOptions; + + public ContentValidationBlockBlobOutputStream(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.parallelTransferOptions = new ParallelTransferOptions().setMaxConcurrency(options.getMaxConcurrency()); + } + + @Override + protected void runInternal(Context span) throws IOException { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + BlockBlobOutputStreamOptions streamOptions = new BlockBlobOutputStreamOptions() + .setParallelTransferOptions(parallelTransferOptions) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()); + + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()); + BlobOutputStream outputStream = blockBlobClient.getBlobOutputStream(streamOptions, span)) { + byte[] buffer = new byte[4096]; + int bytesRead; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + + outputStream.close(); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists().then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java new file mode 100644 index 000000000000..e86a410e1945 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions; +import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Single-shot block blob upload with request content validation + * ({@link BlockBlobSimpleUploadOptions#setContentValidationAlgorithm}). + */ +public class ContentValidationBlockBlobUpload extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationBlockBlobUpload(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + blockBlobClient.uploadWithResponse( + new BlockBlobSimpleUploadOptions(inputStream, options.getSize()) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + BlockBlobAsyncClient blockBlobAsyncClient = asyncClient.getBlockBlobAsyncClient(); + return blockBlobAsyncClient.uploadWithResponse( + new BlockBlobSimpleUploadOptions(byteBufferFlux, options.getSize()) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java new file mode 100644 index 000000000000..b49a0305f12f --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java @@ -0,0 +1,105 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.PageRange; +import com.azure.storage.blob.options.PageBlobOutputStreamOptions; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.PageBlobAsyncClient; +import com.azure.storage.blob.specialized.PageBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Page blob output stream with {@link PageBlobOutputStreamOptions#setContentValidationAlgorithm} (sync only). + */ +public class ContentValidationPageBlobOutputStream extends PageBlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationPageBlobOutputStream.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + /** Page blob used only to seed {@link OriginalContent} (same pattern as {@link PageBlobOutputStream}). */ + private final PageBlobAsyncClient tempSetupPageBlobClient; + + public ContentValidationPageBlobOutputStream(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + BlobAsyncClient tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + this.tempSetupPageBlobClient = tempSetupBlobClient.getPageBlobAsyncClient(); + } + + @Override + protected void runInternal(Context span) throws IOException { + PageBlobClient pageBlobClient = syncClient.getPageBlobClient(); + PageBlobOutputStreamOptions streamOptions = new PageBlobOutputStreamOptions( + new PageRange().setStart(0).setEnd(options.getSize() - 1)) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()); + + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()); + BlobOutputStream outputStream = pageBlobClient.getBlobOutputStream(streamOptions)) { + ByteArrayOutputStream bufferStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[512]; + int bytesRead; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + // Always accumulate into bufferStream to avoid dropping or reordering bytes + bufferStream.write(buffer, 0, bytesRead); + // Flush all full 512-byte pages from the accumulator + if (bufferStream.size() >= buffer.length) { + byte[] toWrite = bufferStream.toByteArray(); + int length = toWrite.length - (toWrite.length % buffer.length); + if (length > 0) { + outputStream.write(toWrite, 0, length); + bufferStream.reset(); + // Keep any remaining partial page bytes in the accumulator + bufferStream.write(toWrite, length, toWrite.length - length); + } + } + } + // For page blobs, total content size must be a multiple of 512 bytes. + // Any remaining bytes here indicate misalignment and would result in silent truncation. + if (bufferStream.size() != 0) { + throw new IOException("Remaining bytes in buffer that do not align to 512-byte page size."); + } + + outputStream.close(); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(asyncNoFaultClient.getPageBlobAsyncClient().create(options.getSize())) + .then(tempSetupPageBlobClient.create(options.getSize())) + .then(originalContent.setupPageBlob(tempSetupPageBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java new file mode 100644 index 000000000000..65afda98636f --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java @@ -0,0 +1,83 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlockBlobSeekableByteChannelWriteOptions; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.common.implementation.StorageSeekableByteChannel; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static com.azure.core.util.FluxUtil.monoError; +import static com.azure.storage.blob.options.BlockBlobSeekableByteChannelWriteOptions.WriteMode.OVERWRITE; + +/** + * Block-blob seekable byte channel write with {@link BlockBlobSeekableByteChannelWriteOptions#setContentValidationAlgorithm}. + * Matches {@link com.azure.storage.blob.BlobContentValidationUploadTests} seekable-channel scenarios (sync only). + */ +public class ContentValidationSeekableByteChannelWrite extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationSeekableByteChannelWrite.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationSeekableByteChannelWrite(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) throws IOException { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + BlockBlobSeekableByteChannelWriteOptions writeOptions = new BlockBlobSeekableByteChannelWriteOptions(OVERWRITE) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()); + + try (CrcInputStream crcInput = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + Flux byteBufferFlux = crcInput.convertStreamToByteBuffer(); + try (StorageSeekableByteChannel channel = (StorageSeekableByteChannel) blockBlobClient.openSeekableByteChannelWrite( + writeOptions)) { + Mono writeOperation = byteBufferFlux + .doOnNext(buffer -> { + try { + while (buffer.hasRemaining()) { + channel.write(buffer); + } + } catch (IOException e) { + throw LOGGER.logExceptionAsError(new RuntimeException(e)); + } + }).then(); + writeOperation.block(); + channel.getWriteBehavior().commit(options.getSize()); + } + originalContent.checkMatch(byteBufferFlux, span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException( + "openSeekableByteChannelWrite() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists().then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java new file mode 100644 index 000000000000..c95dd7e9a98a --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.CoreUtils; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions; +import com.azure.storage.blob.options.BlockBlobStageBlockOptions; +import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; + +/** + * Stage block with request content validation on the faulted client, then commit via the non-faulted client. + */ +public class ContentValidationStageBlock extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobClient syncNoFaultClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationStageBlock(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.syncNoFaultClient = getSyncContainerClientNoFault().getBlobClient(blobName); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + BlockBlobClient blockBlobClientNoFault = syncNoFaultClient.getBlockBlobClient(); + String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString() + .getBytes(StandardCharsets.UTF_8)); + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + BinaryData data = BinaryData.fromStream(inputStream, options.getSize()); + blockBlobClient.stageBlockWithResponse( + new BlockBlobStageBlockOptions(blockId, data) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + null, span); + blockBlobClientNoFault.commitBlockListWithResponse( + new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)), null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + BlockBlobAsyncClient blockBlobAsyncClient = asyncClient.getBlockBlobAsyncClient(); + BlockBlobAsyncClient blockBlobAsyncClientNoFault = asyncNoFaultClient.getBlockBlobAsyncClient(); + String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString() + .getBytes(StandardCharsets.UTF_8)); + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + return BinaryData.fromFlux(byteBufferFlux, options.getSize(), false) + .flatMap(binaryData -> blockBlobAsyncClient.stageBlockWithResponse( + new BlockBlobStageBlockOptions(blockId, binaryData) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()))) + .then(blockBlobAsyncClientNoFault.commitBlockListWithResponse( + new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)))) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java new file mode 100644 index 000000000000..1e92d331d33e --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.storage.common.ContentValidationAlgorithm; +import com.azure.storage.stress.StorageStressOptions; +import com.beust.jcommander.Parameter; + +/** + * Options for stress scenarios that enable transactional request content validation on uploads + * (CRC64 / structured message). See {@link com.azure.storage.blob.BlobContentValidationUploadTests}. + */ +public class ContentValidationStressOptions extends StorageStressOptions { + /** + * Request checksum behavior for upload APIs. Use CRC64 or AUTO to exercise content validation. + * MD5 is not supported for uploadFromFile. NONE disables request validation. + */ + @Parameter(names = { "--requestChecksumAlgorithm" }, + description = "CRC64 (default), AUTO, NONE, or MD5 (not valid for upload-from-file scenarios)") + private ContentValidationAlgorithm requestChecksumAlgorithm = ContentValidationAlgorithm.CRC64; + + public ContentValidationAlgorithm getContentValidationAlgorithm() { + return requestChecksumAlgorithm; + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java new file mode 100644 index 000000000000..b2cc61909be3 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Parallel blob upload with {@link com.azure.storage.blob.options.BlobParallelUploadOptions#setContentValidationAlgorithm} + * enabled. Verifies the correctness of the upload request content via CRC (see {@code BlobContentValidationUploadTests}). + */ +public class ContentValidationUpload extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final ParallelTransferOptions parallelTransferOptions; + + public ContentValidationUpload(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + parallelTransferOptions = new ParallelTransferOptions() + .setMaxConcurrency(options.getMaxConcurrency()) + .setMaxSingleUploadSizeLong(4 * 1024 * 1024L); + } + + @Override + protected void runInternal(Context span) { + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + syncClient.uploadWithResponse(new BlobParallelUploadOptions(inputStream) + .setParallelTransferOptions(parallelTransferOptions) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + return asyncClient.uploadWithResponse(new BlobParallelUploadOptions(byteBufferFlux) + .setParallelTransferOptions(parallelTransferOptions) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java new file mode 100644 index 000000000000..a716563c1752 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java @@ -0,0 +1,138 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.CoreUtils; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlobDownloadToFileOptions; +import com.azure.storage.blob.options.BlobUploadFromFileOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.UUID; + +/** + * Upload from file with {@link BlobUploadFromFileOptions#setContentValidationAlgorithm}. + */ +public class ContentValidationUploadFromFile extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationUploadFromFile.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobClient syncNoFaultClient; + private final BlobAsyncClient asyncNoFaultClient; + private final ParallelTransferOptions parallelTransferOptions; + + public ContentValidationUploadFromFile(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncNoFaultClient = getSyncContainerClientNoFault().getBlobClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + this.parallelTransferOptions = new ParallelTransferOptions() + .setMaxConcurrency(options.getMaxConcurrency()) + .setMaxSingleUploadSizeLong(4 * 1024 * 1024L); + } + + @Override + protected void runInternal(Context span) { + Path downloadPath = getTempPath("test"); + Path uploadFilePath = null; + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + uploadFilePath = generateFile(inputStream); + downloadPath = downloadPath.resolve(CoreUtils.randomUuid() + ".txt"); + syncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString()) + .setParallelTransferOptions(parallelTransferOptions) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + null, span); + syncNoFaultClient.downloadToFileWithResponse( + new BlobDownloadToFileOptions(downloadPath.toString()), null, span); + originalContent.checkMatch(BinaryData.fromFile(downloadPath), span).block(); + } finally { + deleteFile(downloadPath); + deleteFile(uploadFilePath); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + Path downloadPath = getTempPath("test"); + // This is written differently than the other runInternalAsync methods because uploadFromFile requires a file + // path, so we need to generate the temp file. + return Mono.using( + () -> new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()), + inputStream -> uploadAndVerifyAsync(inputStream, downloadPath, span), + CrcInputStream::close); + } + + private Mono uploadAndVerifyAsync(CrcInputStream inputStream, Path downloadDir, Context span) { + Path uploadFilePath = generateFile(inputStream); + Path downloadFilePath = downloadDir.resolve(UUID.randomUUID() + ".txt"); + + return asyncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString()) + .setParallelTransferOptions(parallelTransferOptions) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .flatMap(ignored -> asyncNoFaultClient.downloadToFileWithResponse( + new BlobDownloadToFileOptions(downloadFilePath.toString()))) + .flatMap(ignored -> originalContent.checkMatch(BinaryData.fromFile(downloadFilePath), span)) + .doFinally(signal -> { + deleteFile(uploadFilePath); + deleteFile(downloadFilePath); + }); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } + + private Path getTempPath(String prefix) { + try { + return Files.createTempDirectory(prefix); + } catch (IOException e) { + throw LOGGER.logExceptionAsError(new UncheckedIOException(e)); + } + } + + private static void deleteFile(Path path) { + try { + Files.deleteIfExists(path); + } catch (Throwable e) { + LOGGER.atError() + .addKeyValue("path", path) + .log("failed to delete file", e); + } + } + + private static Path generateFile(InputStream inputStream) { + try { + File file = Files.createTempFile(CoreUtils.randomUuid().toString(), ".txt").toFile(); + file.deleteOnExit(); + Files.copy(inputStream, file.toPath(), StandardCopyOption.REPLACE_EXISTING); + return file.toPath(); + } catch (IOException e) { + throw LOGGER.logExceptionAsError(new UncheckedIOException(e)); + } + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java new file mode 100644 index 000000000000..0e3c9603fe1c --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java @@ -0,0 +1,82 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.PageRange; +import com.azure.storage.blob.options.PageBlobUploadPagesOptions; +import com.azure.storage.blob.specialized.PageBlobAsyncClient; +import com.azure.storage.blob.specialized.PageBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Page blob upload pages with {@link PageBlobUploadPagesOptions#setContentValidationAlgorithm}. + */ +public class ContentValidationUploadPages extends PageBlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final PageBlobAsyncClient tempSetupPageBlobClient; + + public ContentValidationUploadPages(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + BlobAsyncClient tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + this.tempSetupPageBlobClient = tempSetupBlobClient.getPageBlobAsyncClient(); + } + + @Override + protected void runInternal(Context span) { + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + PageBlobClient pageBlobClient = syncClient.getPageBlobClient(); + PageRange range = new PageRange().setStart(0).setEnd(options.getSize() - 1); + pageBlobClient.uploadPagesWithResponse( + new PageBlobUploadPagesOptions(range, inputStream) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + PageBlobAsyncClient pageBlobAsyncClient = asyncClient.getPageBlobAsyncClient(); + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + PageRange range = new PageRange().setStart(0).setEnd(options.getSize() - 1); + return pageBlobAsyncClient.uploadPagesWithResponse( + new PageBlobUploadPagesOptions(range, byteBufferFlux) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(asyncNoFaultClient.getPageBlobAsyncClient().create(options.getSize())) + .then(tempSetupPageBlobClient.create(options.getSize())) + .then(originalContent.setupPageBlob(tempSetupPageBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-stress/pom.xml b/sdk/storage/azure-storage-stress/pom.xml index d8907fe67014..bdf518e233b8 100644 --- a/sdk/storage/azure-storage-stress/pom.xml +++ b/sdk/storage/azure-storage-stress/pom.xml @@ -19,8 +19,23 @@ - + + + 1.58.0 + + + + io.opentelemetry + opentelemetry-bom + ${opentelemetry.version} + pom + import + + + + ch.qos.logback @@ -59,6 +74,11 @@ opentelemetry-logback-appender-1.0 2.24.0-alpha + + io.opentelemetry + opentelemetry-api-incubator + 1.58.0-alpha + com.azure azure-core-metrics-opentelemetry diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java index 4bf6e523eae1..b633479100b1 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java @@ -124,7 +124,7 @@ public String getDescription() { Cpu.registerObservers(otel); MemoryPools.registerObservers(otel); Threads.registerObservers(otel); - GarbageCollector.registerObservers(otel); + GarbageCollector.registerObservers(otel, true); OpenTelemetryAppender.install(otel); return otel; }