diff --git a/sdk/storage/azure-storage-blob-stress/pom.xml b/sdk/storage/azure-storage-blob-stress/pom.xml
index f214138d3f8b..c22d4d2e4096 100644
--- a/sdk/storage/azure-storage-blob-stress/pom.xml
+++ b/sdk/storage/azure-storage-blob-stress/pom.xml
@@ -19,8 +19,23 @@
-
+
+
+ 1.58.0
+
+
+
+ io.opentelemetry
+ opentelemetry-bom
+ ${opentelemetry.version}
+ pom
+ import
+
+
+
+
ch.qos.logback
@@ -64,6 +79,11 @@
opentelemetry-logback-appender-1.0
2.24.0-alpha
+
+ io.opentelemetry
+ opentelemetry-api-incubator
+ 1.58.0-alpha
+
com.azure
diff --git a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml
index 3a8920a67b35..9e8c1bad70f7 100644
--- a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml
+++ b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml
@@ -5,7 +5,7 @@ matrix:
testScenario: downloadtofile
sync: true
sizeBytes: 1024
- downloadFaults: true
+ downloadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -14,7 +14,7 @@ matrix:
testScenario: downloadtofile
sync: false
sizeBytes: 1024
- downloadFaults: true
+ downloadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -23,7 +23,7 @@ matrix:
testScenario: downloadtofile
sync: true
sizeBytes: "52428800"
- downloadFaults: true
+ downloadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -32,7 +32,7 @@ matrix:
testScenario: downloadstream
sync: true
sizeBytes: 1024
- downloadFaults: true
+ downloadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -41,7 +41,7 @@ matrix:
testScenario: downloadstream
sync: false
sizeBytes: 1024
- downloadFaults: true
+ downloadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -50,7 +50,7 @@ matrix:
testScenario: downloadstream
sync: true
sizeBytes: "52428800"
- downloadFaults: true
+ downloadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -59,7 +59,7 @@ matrix:
testScenario: downloadcontent
sync: true
sizeBytes: 1024
- downloadFaults: true
+ downloadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -68,7 +68,7 @@ matrix:
testScenario: downloadcontent
sync: true
sizeBytes: "52428800"
- downloadFaults: true
+ downloadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -77,7 +77,7 @@ matrix:
testScenario: openinputstream
sync: true
sizeBytes: 1024
- downloadFaults: true
+ downloadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -86,7 +86,7 @@ matrix:
testScenario: openinputstream
sync: true
sizeBytes: "52428800"
- downloadFaults: true
+ downloadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -95,7 +95,7 @@ matrix:
testScenario: openseekablebytechannelread
sync: true
sizeBytes: 1024
- downloadFaults: true
+ downloadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -104,7 +104,7 @@ matrix:
testScenario: openseekablebytechannelread
sync: true
sizeBytes: "52428800"
- downloadFaults: true
+ downloadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -113,7 +113,7 @@ matrix:
testScenario: appendblock
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -122,7 +122,7 @@ matrix:
testScenario: appendblock
sync: false
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -131,7 +131,7 @@ matrix:
testScenario: appendblock
sync: true
sizeBytes: "26214400"
- uploadFaults: true
+ uploadFaults: false
durationMin: 30
imageBuildDir: "../../.."
@@ -140,7 +140,7 @@ matrix:
testScenario: appendbloboutputstream
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -149,7 +149,7 @@ matrix:
testScenario: appendbloboutputstream
sync: true
sizeBytes: "10240"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -158,7 +158,7 @@ matrix:
testScenario: blockblobupload
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -167,7 +167,7 @@ matrix:
testScenario: blockblobupload
sync: true
sizeBytes: "26214400"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -176,7 +176,7 @@ matrix:
testScenario: blockbloboutputstream
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -185,7 +185,7 @@ matrix:
testScenario: blockbloboutputstream
sync: true
sizeBytes: "26214400"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -194,7 +194,7 @@ matrix:
testScenario: commitblocklist
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -203,7 +203,7 @@ matrix:
testScenario: commitblocklist
sync: true
sizeBytes: "26214400"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -212,7 +212,7 @@ matrix:
testScenario: openseekablebytechannelwrite
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 10
imageBuildDir: "../../.."
@@ -221,7 +221,7 @@ matrix:
testScenario: openseekablebytechannelwrite
sync: true
sizeBytes: "52428800"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -230,7 +230,7 @@ matrix:
testScenario: stageblock
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -239,7 +239,7 @@ matrix:
testScenario: stageblock
sync: true
sizeBytes: "26214400"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -248,7 +248,7 @@ matrix:
testScenario: pagebloboutputstream
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -257,7 +257,7 @@ matrix:
testScenario: pagebloboutputstream
sync: true
sizeBytes: "10240"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -266,7 +266,7 @@ matrix:
testScenario: uploadpages
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -275,7 +275,7 @@ matrix:
testScenario: uploadpages
sync: true
sizeBytes: "4194304"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -284,7 +284,7 @@ matrix:
testScenario: upload
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -293,7 +293,7 @@ matrix:
testScenario: upload
sync: false
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -302,7 +302,7 @@ matrix:
testScenario: upload
sync: true
sizeBytes: "52428800"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
@@ -311,7 +311,7 @@ matrix:
testScenario: uploadfromfile
sync: true
sizeBytes: 1024
- uploadFaults: true
+ uploadFaults: false
durationMin: 25
imageBuildDir: "../../.."
@@ -320,6 +320,249 @@ matrix:
testScenario: uploadfromfile
sync: true
sizeBytes: "52428800"
- uploadFaults: true
+ uploadFaults: false
durationMin: 60
imageBuildDir: "../../.."
+
+ # --- Content validation (ContentValidation*; default CRC64). Scenario keys are short: K8s label
+ # testInstance = "{Scenario}-{BaseName}-{revision}" must be <= 63 chars (see stress-test-job.yaml).
+
+ cv-appendblock-sm:
+ testScenario: contentvalidationappendblock
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-appendblock-lg:
+ testScenario: contentvalidationappendblock
+ sync: true
+ sizeBytes: "26214400"
+ uploadFaults: false
+ durationMin: 30
+ imageBuildDir: "../../.."
+
+ cv-appendblock-async-sm:
+ testScenario: contentvalidationappendblock
+ sync: false
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-appendblock-async-lg:
+ testScenario: contentvalidationappendblock
+ sync: false
+ sizeBytes: "26214400"
+ uploadFaults: false
+ durationMin: 30
+ imageBuildDir: "../../.."
+
+ cv-appendbloboutputstream-sm:
+ testScenario: contentvalidationappendbloboutputstream
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-appendbloboutputstream-lg:
+ testScenario: contentvalidationappendbloboutputstream
+ sync: true
+ sizeBytes: "10240"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-blockblobupload-sm:
+ testScenario: contentvalidationblockblobupload
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-blockblobupload-lg:
+ testScenario: contentvalidationblockblobupload
+ sync: true
+ sizeBytes: "26214400"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-blockbloboutputstream-sm:
+ testScenario: contentvalidationblockbloboutputstream
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-blockbloboutputstream-lg:
+ testScenario: contentvalidationblockbloboutputstream
+ sync: true
+ sizeBytes: "26214400"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-seekablebytechannelwrite-sm:
+ testScenario: contentvalidationseekablebytechannelwrite
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 10
+ imageBuildDir: "../../.."
+
+ cv-seekablebytechannelwrite-lg:
+ testScenario: contentvalidationseekablebytechannelwrite
+ sync: true
+ sizeBytes: "52428800"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-stageblock-sm:
+ testScenario: contentvalidationstageblock
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-stageblock-lg:
+ testScenario: contentvalidationstageblock
+ sync: true
+ sizeBytes: "26214400"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-stageblock-async-sm:
+ testScenario: contentvalidationstageblock
+ sync: false
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-stageblock-async-lg:
+ testScenario: contentvalidationstageblock
+ sync: false
+ sizeBytes: "26214400"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-pagebloboutputstream-sm:
+ testScenario: contentvalidationpagebloboutputstream
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-pagebloboutputstream-lg:
+ testScenario: contentvalidationpagebloboutputstream
+ sync: true
+ sizeBytes: "10240"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-uploadpages-sm:
+ testScenario: contentvalidationuploadpages
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-uploadpages-lg:
+ testScenario: contentvalidationuploadpages
+ sync: true
+ sizeBytes: "4194304"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-uploadpages-async-sm:
+ testScenario: contentvalidationuploadpages
+ sync: false
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-uploadpages-async-lg:
+ testScenario: contentvalidationuploadpages
+ sync: false
+ sizeBytes: "4194304"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-upload-sm:
+ testScenario: contentvalidationupload
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-upload-lg:
+ testScenario: contentvalidationupload
+ sync: true
+ sizeBytes: "52428800"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-upload-async-sm:
+ testScenario: contentvalidationupload
+ sync: false
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-upload-async-lg:
+ testScenario: contentvalidationupload
+ sync: false
+ sizeBytes: "52428800"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-uploadfromfile-sm:
+ testScenario: contentvalidationuploadfromfile
+ sync: true
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-uploadfromfile-lg:
+ testScenario: contentvalidationuploadfromfile
+ sync: true
+ sizeBytes: "52428800"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
+
+ cv-uploadfromfile-async-sm:
+ testScenario: contentvalidationuploadfromfile
+ sync: false
+ sizeBytes: 1024
+ uploadFaults: false
+ durationMin: 25
+ imageBuildDir: "../../.."
+
+ cv-uploadfromfile-async-lg:
+ testScenario: contentvalidationuploadfromfile
+ sync: false
+ sizeBytes: "52428800"
+ uploadFaults: false
+ durationMin: 60
+ imageBuildDir: "../../.."
\ No newline at end of file
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java
index e38bd16791ca..3f3219930596 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java
@@ -15,6 +15,16 @@ public static void main(String[] args) {
BlockBlobOutputStream.class,
BlockBlobUpload.class,
CommitBlockList.class,
+ ContentValidationAppendBlobOutputStream.class,
+ ContentValidationAppendBlock.class,
+ ContentValidationBlockBlobOutputStream.class,
+ ContentValidationBlockBlobUpload.class,
+ ContentValidationPageBlobOutputStream.class,
+ ContentValidationStageBlock.class,
+ ContentValidationSeekableByteChannelWrite.class,
+ ContentValidationUpload.class,
+ ContentValidationUploadFromFile.class,
+ ContentValidationUploadPages.class,
DownloadToFile.class,
DownloadStream.class,
DownloadContent.class,
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java
index 310c7618e5d9..5e63058a4ab5 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java
@@ -1,6 +1,3 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
package com.azure.storage.blob.stress;
import com.azure.core.util.Context;
@@ -11,7 +8,6 @@
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.blob.stress.utils.OriginalContent;
-import com.azure.storage.common.Utility;
import com.azure.storage.stress.CrcInputStream;
import com.azure.storage.stress.StorageStressOptions;
import reactor.core.publisher.Flux;
@@ -19,10 +15,17 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
public class CommitBlockList extends BlobScenarioBase {
+ // Per-operation ceilings. Must be smaller than the Azure LB idle timeout (~4 min)
+ // so that a stalled request fails fast rather than hangs the whole test run.
+ private static final Duration STAGE_TIMEOUT = Duration.ofSeconds(120);
+ private static final Duration COMMIT_TIMEOUT = Duration.ofSeconds(30);
+ private static final Duration CHECK_MATCH_TIMEOUT = Duration.ofSeconds(30);
+
private final OriginalContent originalContent = new OriginalContent();
private final BlobClient syncClient;
private final BlobAsyncClient asyncClient;
@@ -45,14 +48,11 @@ protected void runInternal(Context span) throws Exception {
String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString()
.getBytes(StandardCharsets.UTF_8));
try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
- // First perform non-faulted stage block to send data to the service
- blockBlobClientNoFault.stageBlockWithResponse(blockId, inputStream, options.getSize(), null, null, null,
- span);
- // Then perform faulted commit block list to commit the block
+ blockBlobClientNoFault.stageBlockWithResponse(blockId, inputStream, options.getSize(), null, null,
+ STAGE_TIMEOUT, span);
blockBlobClient.commitBlockListWithResponse(
- new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)), null, span);
- // Confirm the CRC matches for the uploaded input stream
- originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)), COMMIT_TIMEOUT, span);
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block(CHECK_MATCH_TIMEOUT);
}
}
@@ -64,13 +64,14 @@ protected Mono runInternalAsync(Context span) {
.getBytes(StandardCharsets.UTF_8));
Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())
.convertStreamToByteBuffer();
- // First perform non-faulted stage block to send data to the service
- return blockBlobAsyncClientNoFault.stageBlockWithResponse(blockId, byteBufferFlux, options.getSize(), null, null)
- // Then perform faulted commit block list to commit the block
- .then(blockBlobAsyncClient.commitBlockListWithResponse(
- new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId))))
- // Confirm the CRC matches for the uploaded byte buffer flux
- .then(originalContent.checkMatch(byteBufferFlux, span));
+ return blockBlobAsyncClientNoFault
+ .stageBlockWithResponse(blockId, byteBufferFlux, options.getSize(), null, null)
+ .timeout(STAGE_TIMEOUT)
+ .then(blockBlobAsyncClient
+ .commitBlockListWithResponse(
+ new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)))
+ .timeout(COMMIT_TIMEOUT))
+ .then(originalContent.checkMatch(byteBufferFlux, span).timeout(CHECK_MATCH_TIMEOUT));
}
@Override
@@ -83,4 +84,4 @@ public Mono cleanupAsync() {
return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
-}
+}
\ No newline at end of file
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java
new file mode 100644
index 000000000000..b2b9d1845fa4
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java
@@ -0,0 +1,88 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.options.AppendBlobOutputStreamOptions;
+import com.azure.storage.blob.specialized.AppendBlobClient;
+import com.azure.storage.blob.specialized.BlobOutputStream;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+
+import static com.azure.core.util.FluxUtil.monoError;
+
+/**
+ * Append blob output stream with {@link AppendBlobOutputStreamOptions#setContentValidationAlgorithm} (sync only).
+ */
+public class ContentValidationAppendBlobOutputStream extends BlobScenarioBase {
+ private static final ClientLogger LOGGER = new ClientLogger(ContentValidationAppendBlobOutputStream.class);
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+ /** Separate blob used to upload reference content for {@link OriginalContent} checksum (block blob). */
+ private final BlobAsyncClient tempSetupBlobClient;
+
+ public ContentValidationAppendBlobOutputStream(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ String tempBlobName = generateBlobName();
+
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName);
+ }
+
+ @Override
+ protected void runInternal(Context span) throws IOException {
+ AppendBlobClient appendBlobClient = syncClient.getAppendBlobClient();
+ // Reset the append blob at the start of each iteration. The boolean overload
+ // getBlobOutputStream(true) does this implicitly via create(true); the options overload
+ // does not, so we replicate that behavior here. Without this reset, fault-injection
+ // sequences that commit a block server-side but drop the response leave the cached
+ // appendPosition stale, causing subsequent retries to fail with 412 AppendPositionConditionNotMet,
+ // which combined with non-retriable Crc64Mismatch on truncated-body faults collapses the pass rate.
+ appendBlobClient.create(true);
+
+ AppendBlobOutputStreamOptions streamOptions = new AppendBlobOutputStreamOptions()
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm());
+
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize());
+ BlobOutputStream outputStream = appendBlobClient.getBlobOutputStream(streamOptions)) {
+ byte[] buffer = new byte[4096];
+ int bytesRead;
+
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+
+ outputStream.close();
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client"));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync()
+ .then(asyncNoFaultClient.getAppendBlobAsyncClient().create())
+ .then(originalContent.setupBlob(tempSetupBlobClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.deleteIfExists()
+ .then(tempSetupBlobClient.deleteIfExists())
+ .then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java
new file mode 100644
index 000000000000..bcbaf7681742
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java
@@ -0,0 +1,76 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.options.AppendBlobAppendBlockOptions;
+import com.azure.storage.blob.specialized.AppendBlobAsyncClient;
+import com.azure.storage.blob.specialized.AppendBlobClient;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Append block with {@link AppendBlobAppendBlockOptions#setContentValidationAlgorithm}.
+ */
+public class ContentValidationAppendBlock extends BlobScenarioBase {
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+ private final BlobAsyncClient tempSetupBlobClient;
+
+ public ContentValidationAppendBlock(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ String tempBlobName = generateBlobName();
+
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName);
+ this.tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName);
+ }
+
+ @Override
+ protected void runInternal(Context span) {
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
+ AppendBlobClient appendBlobClient = syncClient.getAppendBlobClient();
+ appendBlobClient.appendBlockWithResponse(
+ new AppendBlobAppendBlockOptions(inputStream, options.getSize())
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()),
+ null, span);
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ AppendBlobAsyncClient appendBlobAsyncClient = asyncClient.getAppendBlobAsyncClient();
+ Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())
+ .convertStreamToByteBuffer();
+ return appendBlobAsyncClient.appendBlockWithResponse(
+ new AppendBlobAppendBlockOptions(byteBufferFlux, options.getSize())
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()))
+ .then(originalContent.checkMatch(byteBufferFlux, span));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(asyncNoFaultClient.getAppendBlobAsyncClient().create())
+ .then(originalContent.setupBlob(tempSetupBlobClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.getAppendBlobAsyncClient().deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then(tempSetupBlobClient.deleteIfExists())
+ .then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java
new file mode 100644
index 000000000000..175b648002a5
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java
@@ -0,0 +1,75 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
+import com.azure.storage.blob.specialized.BlobOutputStream;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+
+import static com.azure.core.util.FluxUtil.monoError;
+
+/**
+ * Block blob output stream with {@link BlockBlobOutputStreamOptions#setContentValidationAlgorithm} (sync only).
+ */
+public class ContentValidationBlockBlobOutputStream extends BlobScenarioBase {
+ private static final ClientLogger LOGGER = new ClientLogger(ContentValidationBlockBlobOutputStream.class);
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+ private final ParallelTransferOptions parallelTransferOptions;
+
+ public ContentValidationBlockBlobOutputStream(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.parallelTransferOptions = new ParallelTransferOptions().setMaxConcurrency(options.getMaxConcurrency());
+ }
+
+ @Override
+ protected void runInternal(Context span) throws IOException {
+ BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient();
+ BlockBlobOutputStreamOptions streamOptions = new BlockBlobOutputStreamOptions()
+ .setParallelTransferOptions(parallelTransferOptions)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm());
+
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize());
+ BlobOutputStream outputStream = blockBlobClient.getBlobOutputStream(streamOptions, span)) {
+ byte[] buffer = new byte[4096];
+ int bytesRead;
+
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+
+ outputStream.close();
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client"));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.deleteIfExists().then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java
new file mode 100644
index 000000000000..e86a410e1945
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java
@@ -0,0 +1,70 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Single-shot block blob upload with request content validation
+ * ({@link BlockBlobSimpleUploadOptions#setContentValidationAlgorithm}).
+ */
+public class ContentValidationBlockBlobUpload extends BlobScenarioBase {
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+
+ public ContentValidationBlockBlobUpload(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName);
+ }
+
+ @Override
+ protected void runInternal(Context span) {
+ BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient();
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
+ blockBlobClient.uploadWithResponse(
+ new BlockBlobSimpleUploadOptions(inputStream, options.getSize())
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()),
+ null, span);
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())
+ .convertStreamToByteBuffer();
+ BlockBlobAsyncClient blockBlobAsyncClient = asyncClient.getBlockBlobAsyncClient();
+ return blockBlobAsyncClient.uploadWithResponse(
+ new BlockBlobSimpleUploadOptions(byteBufferFlux, options.getSize())
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()))
+ .then(originalContent.checkMatch(byteBufferFlux, span));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.deleteIfExists()
+ .then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java
new file mode 100644
index 000000000000..b49a0305f12f
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java
@@ -0,0 +1,105 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.models.PageRange;
+import com.azure.storage.blob.options.PageBlobOutputStreamOptions;
+import com.azure.storage.blob.specialized.BlobOutputStream;
+import com.azure.storage.blob.specialized.PageBlobAsyncClient;
+import com.azure.storage.blob.specialized.PageBlobClient;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static com.azure.core.util.FluxUtil.monoError;
+
+/**
+ * Page blob output stream with {@link PageBlobOutputStreamOptions#setContentValidationAlgorithm} (sync only).
+ */
+public class ContentValidationPageBlobOutputStream extends PageBlobScenarioBase {
+ private static final ClientLogger LOGGER = new ClientLogger(ContentValidationPageBlobOutputStream.class);
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+ /** Page blob used only to seed {@link OriginalContent} (same pattern as {@link PageBlobOutputStream}). */
+ private final PageBlobAsyncClient tempSetupPageBlobClient;
+
+ public ContentValidationPageBlobOutputStream(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ String tempBlobName = generateBlobName();
+
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ BlobAsyncClient tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName);
+ this.tempSetupPageBlobClient = tempSetupBlobClient.getPageBlobAsyncClient();
+ }
+
+ @Override
+ protected void runInternal(Context span) throws IOException {
+ PageBlobClient pageBlobClient = syncClient.getPageBlobClient();
+ PageBlobOutputStreamOptions streamOptions = new PageBlobOutputStreamOptions(
+ new PageRange().setStart(0).setEnd(options.getSize() - 1))
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm());
+
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize());
+ BlobOutputStream outputStream = pageBlobClient.getBlobOutputStream(streamOptions)) {
+ ByteArrayOutputStream bufferStream = new ByteArrayOutputStream();
+ byte[] buffer = new byte[512];
+ int bytesRead;
+
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ // Always accumulate into bufferStream to avoid dropping or reordering bytes
+ bufferStream.write(buffer, 0, bytesRead);
+ // Flush all full 512-byte pages from the accumulator
+ if (bufferStream.size() >= buffer.length) {
+ byte[] toWrite = bufferStream.toByteArray();
+ int length = toWrite.length - (toWrite.length % buffer.length);
+ if (length > 0) {
+ outputStream.write(toWrite, 0, length);
+ bufferStream.reset();
+ // Keep any remaining partial page bytes in the accumulator
+ bufferStream.write(toWrite, length, toWrite.length - length);
+ }
+ }
+ }
+ // For page blobs, total content size must be a multiple of 512 bytes.
+ // Any remaining bytes here indicate misalignment and would result in silent truncation.
+ if (bufferStream.size() != 0) {
+ throw new IOException("Remaining bytes in buffer that do not align to 512-byte page size.");
+ }
+
+ outputStream.close();
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client"));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync()
+ .then(asyncNoFaultClient.getPageBlobAsyncClient().create(options.getSize()))
+ .then(tempSetupPageBlobClient.create(options.getSize()))
+ .then(originalContent.setupPageBlob(tempSetupPageBlobClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then(tempSetupPageBlobClient.deleteIfExists())
+ .then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java
new file mode 100644
index 000000000000..65afda98636f
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java
@@ -0,0 +1,83 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.options.BlockBlobSeekableByteChannelWriteOptions;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.common.implementation.StorageSeekableByteChannel;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static com.azure.core.util.FluxUtil.monoError;
+import static com.azure.storage.blob.options.BlockBlobSeekableByteChannelWriteOptions.WriteMode.OVERWRITE;
+
+/**
+ * Block-blob seekable byte channel write with {@link BlockBlobSeekableByteChannelWriteOptions#setContentValidationAlgorithm}.
+ * Matches {@link com.azure.storage.blob.BlobContentValidationUploadTests} seekable-channel scenarios (sync only).
+ */
+public class ContentValidationSeekableByteChannelWrite extends BlobScenarioBase {
+ private static final ClientLogger LOGGER = new ClientLogger(ContentValidationSeekableByteChannelWrite.class);
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+
+ public ContentValidationSeekableByteChannelWrite(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ }
+
+ @Override
+ protected void runInternal(Context span) throws IOException {
+ BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient();
+ BlockBlobSeekableByteChannelWriteOptions writeOptions = new BlockBlobSeekableByteChannelWriteOptions(OVERWRITE)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm());
+
+ try (CrcInputStream crcInput = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
+ Flux byteBufferFlux = crcInput.convertStreamToByteBuffer();
+ try (StorageSeekableByteChannel channel = (StorageSeekableByteChannel) blockBlobClient.openSeekableByteChannelWrite(
+ writeOptions)) {
+ Mono writeOperation = byteBufferFlux
+ .doOnNext(buffer -> {
+ try {
+ while (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ } catch (IOException e) {
+ throw LOGGER.logExceptionAsError(new RuntimeException(e));
+ }
+ }).then();
+ writeOperation.block();
+ channel.getWriteBehavior().commit(options.getSize());
+ }
+ originalContent.checkMatch(byteBufferFlux, span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ return monoError(LOGGER, new RuntimeException(
+ "openSeekableByteChannelWrite() does not exist on the async client"));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.deleteIfExists().then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java
new file mode 100644
index 000000000000..c95dd7e9a98a
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java
@@ -0,0 +1,89 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.BinaryData;
+import com.azure.core.util.Context;
+import com.azure.core.util.CoreUtils;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions;
+import com.azure.storage.blob.options.BlockBlobStageBlockOptions;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+
+/**
+ * Stage block with request content validation on the faulted client, then commit via the non-faulted client.
+ */
+public class ContentValidationStageBlock extends BlobScenarioBase {
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncClient;
+ private final BlobClient syncNoFaultClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+
+ public ContentValidationStageBlock(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ this.syncNoFaultClient = getSyncContainerClientNoFault().getBlobClient(blobName);
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName);
+ }
+
+ @Override
+ protected void runInternal(Context span) {
+ BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient();
+ BlockBlobClient blockBlobClientNoFault = syncNoFaultClient.getBlockBlobClient();
+ String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString()
+ .getBytes(StandardCharsets.UTF_8));
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
+ BinaryData data = BinaryData.fromStream(inputStream, options.getSize());
+ blockBlobClient.stageBlockWithResponse(
+ new BlockBlobStageBlockOptions(blockId, data)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()),
+ null, span);
+ blockBlobClientNoFault.commitBlockListWithResponse(
+ new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)), null, span);
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ BlockBlobAsyncClient blockBlobAsyncClient = asyncClient.getBlockBlobAsyncClient();
+ BlockBlobAsyncClient blockBlobAsyncClientNoFault = asyncNoFaultClient.getBlockBlobAsyncClient();
+ String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString()
+ .getBytes(StandardCharsets.UTF_8));
+ Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())
+ .convertStreamToByteBuffer();
+ return BinaryData.fromFlux(byteBufferFlux, options.getSize(), false)
+ .flatMap(binaryData -> blockBlobAsyncClient.stageBlockWithResponse(
+ new BlockBlobStageBlockOptions(blockId, binaryData)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm())))
+ .then(blockBlobAsyncClientNoFault.commitBlockListWithResponse(
+ new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId))))
+ .then(originalContent.checkMatch(byteBufferFlux, span));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.deleteIfExists()
+ .then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java
new file mode 100644
index 000000000000..1e92d331d33e
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java
@@ -0,0 +1,26 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.storage.common.ContentValidationAlgorithm;
+import com.azure.storage.stress.StorageStressOptions;
+import com.beust.jcommander.Parameter;
+
+/**
+ * Options for stress scenarios that enable transactional request content validation on uploads
+ * (CRC64 / structured message). See {@link com.azure.storage.blob.BlobContentValidationUploadTests}.
+ */
+public class ContentValidationStressOptions extends StorageStressOptions {
+ /**
+ * Request checksum behavior for upload APIs. Use CRC64 or AUTO to exercise content validation.
+ * MD5 is not supported for uploadFromFile. NONE disables request validation.
+ */
+ @Parameter(names = { "--requestChecksumAlgorithm" },
+ description = "CRC64 (default), AUTO, NONE, or MD5 (not valid for upload-from-file scenarios)")
+ private ContentValidationAlgorithm requestChecksumAlgorithm = ContentValidationAlgorithm.CRC64;
+
+ public ContentValidationAlgorithm getContentValidationAlgorithm() {
+ return requestChecksumAlgorithm;
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java
new file mode 100644
index 000000000000..b2cc61909be3
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java
@@ -0,0 +1,70 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Parallel blob upload with {@link com.azure.storage.blob.options.BlobParallelUploadOptions#setContentValidationAlgorithm}
+ * enabled. Verifies the correctness of the upload request content via CRC (see {@code BlobContentValidationUploadTests}).
+ */
+public class ContentValidationUpload extends BlobScenarioBase {
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+ private final ParallelTransferOptions parallelTransferOptions;
+
+ public ContentValidationUpload(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName);
+ parallelTransferOptions = new ParallelTransferOptions()
+ .setMaxConcurrency(options.getMaxConcurrency())
+ .setMaxSingleUploadSizeLong(4 * 1024 * 1024L);
+ }
+
+ @Override
+ protected void runInternal(Context span) {
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
+ syncClient.uploadWithResponse(new BlobParallelUploadOptions(inputStream)
+ .setParallelTransferOptions(parallelTransferOptions)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), null, span);
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())
+ .convertStreamToByteBuffer();
+ return asyncClient.uploadWithResponse(new BlobParallelUploadOptions(byteBufferFlux)
+ .setParallelTransferOptions(parallelTransferOptions)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()))
+ .then(originalContent.checkMatch(byteBufferFlux, span));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.deleteIfExists()
+ .then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java
new file mode 100644
index 000000000000..a716563c1752
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java
@@ -0,0 +1,138 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.BinaryData;
+import com.azure.core.util.Context;
+import com.azure.core.util.CoreUtils;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.options.BlobDownloadToFileOptions;
+import com.azure.storage.blob.options.BlobUploadFromFileOptions;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Mono;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.UUID;
+
+/**
+ * Upload from file with {@link BlobUploadFromFileOptions#setContentValidationAlgorithm}.
+ */
+public class ContentValidationUploadFromFile extends BlobScenarioBase {
+ private static final ClientLogger LOGGER = new ClientLogger(ContentValidationUploadFromFile.class);
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncClient;
+ private final BlobClient syncNoFaultClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+ private final ParallelTransferOptions parallelTransferOptions;
+
+ public ContentValidationUploadFromFile(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncNoFaultClient = getSyncContainerClientNoFault().getBlobClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName);
+ this.parallelTransferOptions = new ParallelTransferOptions()
+ .setMaxConcurrency(options.getMaxConcurrency())
+ .setMaxSingleUploadSizeLong(4 * 1024 * 1024L);
+ }
+
+ @Override
+ protected void runInternal(Context span) {
+ Path downloadPath = getTempPath("test");
+ Path uploadFilePath = null;
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
+ uploadFilePath = generateFile(inputStream);
+ downloadPath = downloadPath.resolve(CoreUtils.randomUuid() + ".txt");
+ syncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString())
+ .setParallelTransferOptions(parallelTransferOptions)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()),
+ null, span);
+ syncNoFaultClient.downloadToFileWithResponse(
+ new BlobDownloadToFileOptions(downloadPath.toString()), null, span);
+ originalContent.checkMatch(BinaryData.fromFile(downloadPath), span).block();
+ } finally {
+ deleteFile(downloadPath);
+ deleteFile(uploadFilePath);
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ Path downloadPath = getTempPath("test");
+ // This is written differently than the other runInternalAsync methods because uploadFromFile requires a file
+ // path, so we need to generate the temp file.
+ return Mono.using(
+ () -> new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()),
+ inputStream -> uploadAndVerifyAsync(inputStream, downloadPath, span),
+ CrcInputStream::close);
+ }
+
+ private Mono uploadAndVerifyAsync(CrcInputStream inputStream, Path downloadDir, Context span) {
+ Path uploadFilePath = generateFile(inputStream);
+ Path downloadFilePath = downloadDir.resolve(UUID.randomUUID() + ".txt");
+
+ return asyncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString())
+ .setParallelTransferOptions(parallelTransferOptions)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()))
+ .flatMap(ignored -> asyncNoFaultClient.downloadToFileWithResponse(
+ new BlobDownloadToFileOptions(downloadFilePath.toString())))
+ .flatMap(ignored -> originalContent.checkMatch(BinaryData.fromFile(downloadFilePath), span))
+ .doFinally(signal -> {
+ deleteFile(uploadFilePath);
+ deleteFile(downloadFilePath);
+ });
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.deleteIfExists()
+ .then(super.cleanupAsync());
+ }
+
+ private Path getTempPath(String prefix) {
+ try {
+ return Files.createTempDirectory(prefix);
+ } catch (IOException e) {
+ throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
+ }
+ }
+
+ private static void deleteFile(Path path) {
+ try {
+ Files.deleteIfExists(path);
+ } catch (Throwable e) {
+ LOGGER.atError()
+ .addKeyValue("path", path)
+ .log("failed to delete file", e);
+ }
+ }
+
+ private static Path generateFile(InputStream inputStream) {
+ try {
+ File file = Files.createTempFile(CoreUtils.randomUuid().toString(), ".txt").toFile();
+ file.deleteOnExit();
+ Files.copy(inputStream, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ return file.toPath();
+ } catch (IOException e) {
+ throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
+ }
+ }
+}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java
new file mode 100644
index 000000000000..0e3c9603fe1c
--- /dev/null
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java
@@ -0,0 +1,82 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.storage.blob.stress;
+
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.models.PageRange;
+import com.azure.storage.blob.options.PageBlobUploadPagesOptions;
+import com.azure.storage.blob.specialized.PageBlobAsyncClient;
+import com.azure.storage.blob.specialized.PageBlobClient;
+import com.azure.storage.blob.stress.utils.OriginalContent;
+import com.azure.storage.stress.CrcInputStream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Page blob upload pages with {@link PageBlobUploadPagesOptions#setContentValidationAlgorithm}.
+ */
+public class ContentValidationUploadPages extends PageBlobScenarioBase {
+ private final OriginalContent originalContent = new OriginalContent();
+ private final BlobClient syncClient;
+ private final BlobAsyncClient asyncClient;
+ private final BlobAsyncClient asyncNoFaultClient;
+ private final PageBlobAsyncClient tempSetupPageBlobClient;
+
+ public ContentValidationUploadPages(ContentValidationStressOptions options) {
+ super(options);
+ String blobName = generateBlobName();
+ String tempBlobName = generateBlobName();
+
+ this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName);
+ this.syncClient = getSyncContainerClient().getBlobClient(blobName);
+ this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName);
+ BlobAsyncClient tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName);
+ this.tempSetupPageBlobClient = tempSetupBlobClient.getPageBlobAsyncClient();
+ }
+
+ @Override
+ protected void runInternal(Context span) {
+ try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) {
+ PageBlobClient pageBlobClient = syncClient.getPageBlobClient();
+ PageRange range = new PageRange().setStart(0).setEnd(options.getSize() - 1);
+ pageBlobClient.uploadPagesWithResponse(
+ new PageBlobUploadPagesOptions(range, inputStream)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()),
+ null, span);
+ originalContent.checkMatch(inputStream.getContentInfo(), span).block();
+ }
+ }
+
+ @Override
+ protected Mono runInternalAsync(Context span) {
+ PageBlobAsyncClient pageBlobAsyncClient = asyncClient.getPageBlobAsyncClient();
+ Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())
+ .convertStreamToByteBuffer();
+ PageRange range = new PageRange().setStart(0).setEnd(options.getSize() - 1);
+ return pageBlobAsyncClient.uploadPagesWithResponse(
+ new PageBlobUploadPagesOptions(range, byteBufferFlux)
+ .setContentValidationAlgorithm(options.getContentValidationAlgorithm()))
+ .then(originalContent.checkMatch(byteBufferFlux, span));
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync()
+ .then(asyncNoFaultClient.getPageBlobAsyncClient().create(options.getSize()))
+ .then(tempSetupPageBlobClient.create(options.getSize()))
+ .then(originalContent.setupPageBlob(tempSetupPageBlobClient, options.getSize()));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then(tempSetupPageBlobClient.deleteIfExists())
+ .then(super.cleanupAsync());
+ }
+}
diff --git a/sdk/storage/azure-storage-stress/pom.xml b/sdk/storage/azure-storage-stress/pom.xml
index d8907fe67014..bdf518e233b8 100644
--- a/sdk/storage/azure-storage-stress/pom.xml
+++ b/sdk/storage/azure-storage-stress/pom.xml
@@ -19,8 +19,23 @@
-
+
+
+ 1.58.0
+
+
+
+ io.opentelemetry
+ opentelemetry-bom
+ ${opentelemetry.version}
+ pom
+ import
+
+
+
+
ch.qos.logback
@@ -59,6 +74,11 @@
opentelemetry-logback-appender-1.0
2.24.0-alpha
+
+ io.opentelemetry
+ opentelemetry-api-incubator
+ 1.58.0-alpha
+
com.azure
azure-core-metrics-opentelemetry
diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java
index 4bf6e523eae1..b633479100b1 100644
--- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java
+++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java
@@ -124,7 +124,7 @@ public String getDescription() {
Cpu.registerObservers(otel);
MemoryPools.registerObservers(otel);
Threads.registerObservers(otel);
- GarbageCollector.registerObservers(otel);
+ GarbageCollector.registerObservers(otel, true);
OpenTelemetryAppender.install(otel);
return otel;
}