From 96fb58fa955899efdbfc057356ef1c99179a3898 Mon Sep 17 00:00:00 2001 From: browndav Date: Wed, 14 Jan 2026 14:36:59 -0500 Subject: [PATCH 01/25] removed enableDeterministic --- .../storage/blob/stress/BlobScenarioBase.java | 64 ++++++++++++++++++- .../blob/stress/PageBlobScenarioBase.java | 7 +- .../datalake/stress/DataLakeScenarioBase.java | 60 ++++++++++++++++- .../file/share/stress/ShareScenarioBase.java | 60 ++++++++++++++++- 4 files changed, 184 insertions(+), 7 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index 0e68fb7dc864..e35ca0c3bbcc 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -6,6 +6,7 @@ import com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; @@ -22,9 +23,11 @@ import java.time.Instant; import java.util.UUID; +import java.time.Duration; public abstract class BlobScenarioBase extends PerfStressTest { private static final String CONTAINER_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(BlobScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final BlobContainerClient syncContainerClient; private final BlobContainerAsyncClient asyncContainerClient; @@ -72,8 +75,57 @@ public Mono globalSetupAsync() { @Override public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); + 
return cleanupContainerWithRetry() + .then(super.globalCleanupAsync()) + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Container cleanup failed"); + return super.globalCleanupAsync(); + }); + } + + /** + * Enhanced cleanup with timeout and retry logic to ensure containers are properly destroyed. + */ + private Mono cleanupContainerWithRetry() { return asyncNoFaultContainerClient.deleteIfExists() - .then(super.globalCleanupAsync()); + .then() // Convert Mono to Mono + .timeout(Duration.ofSeconds(30)) + .retry(3) + .onErrorResume(error -> { + // If container deletion fails, try to delete all blobs first then retry container deletion + return deleteAllBlobsInContainer() + .then(asyncNoFaultContainerClient.deleteIfExists()) + .then() // Convert Mono to Mono + .timeout(Duration.ofSeconds(30)) + .onErrorResume(finalError -> { + // Log the error but don't fail the test + LOGGER.atWarning() + .addKeyValue("error", finalError.getMessage()) + .log("Final container cleanup failed after retries"); + return Mono.empty(); + }); + }); + } + + /** + * Delete all blobs in the container to help with cleanup. 
+ */ + private Mono deleteAllBlobsInContainer() { + return asyncNoFaultContainerClient.listBlobs() + .flatMap(blobItem -> + asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete()) + .then() + .timeout(Duration.ofSeconds(60)) + .onErrorResume(error -> { + // Log but continue - some blobs might have been deleted + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Blob cleanup partially failed"); + return Mono.empty(); + }); } @SuppressWarnings("try") @@ -86,7 +138,15 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + .retry(3) // Retry failed operations up to 3 times to handle transient faults + .onErrorMap(e -> { + // Log the error for debugging but let legitimate failures propagate + LOGGER.atError() + .addKeyValue("error", e.getMessage()) + .addKeyValue("errorType", e.getClass().getSimpleName()) + .log("Test operation failed after retries"); + return e; + }); } protected abstract void runInternal(Context context) throws Exception; diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java index b8b4a5115473..19a7dc5373e5 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java @@ -86,7 +86,12 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + .retry(3) // Retry failed operations up to 3 times to handle transient faults + .onErrorMap(e -> { + // Log the error for debugging but let legitimate failures propagate + System.err.println("Test operation 
failed after retries: " + e.getMessage()); + return e; + }); } protected abstract void runInternal(Context context) throws Exception; diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index e64ff0684c01..8681f0d858f7 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -6,6 +6,7 @@ import com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; @@ -25,6 +26,7 @@ public abstract class DataLakeScenarioBase extends PerfStressTest { private static final String FILE_SYSTEM_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(DataLakeScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final DataLakeFileSystemClient syncFileSystemClient; private final DataLakeFileSystemAsyncClient asyncFileSystemClient; @@ -72,8 +74,57 @@ public Mono globalSetupAsync() { @Override public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); + return cleanupFileSystemWithRetry() + .then(super.globalCleanupAsync()) + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("FileSystem cleanup failed"); + return super.globalCleanupAsync(); + }); + } + + /** + * 
Enhanced cleanup with timeout and retry logic to ensure file systems are properly destroyed. + */ + private Mono cleanupFileSystemWithRetry() { return asyncNoFaultFileSystemClient.deleteIfExists() - .then(super.globalCleanupAsync()); + .then() // Convert Mono to Mono + .timeout(java.time.Duration.ofSeconds(30)) + .retry(3) + .onErrorResume(error -> { + // If file system deletion fails, try to delete all files first then retry + return deleteAllFilesInFileSystem() + .then(asyncNoFaultFileSystemClient.deleteIfExists()) + .then() // Convert Mono to Mono + .timeout(java.time.Duration.ofSeconds(30)) + .onErrorResume(finalError -> { + // Log the error but don't fail the test + LOGGER.atWarning() + .addKeyValue("error", finalError.getMessage()) + .log("Final file system cleanup failed after retries"); + return Mono.empty(); + }); + }); + } + + /** + * Delete all files in the file system to help with cleanup. + */ + private Mono deleteAllFilesInFileSystem() { + return asyncNoFaultFileSystemClient.listPaths() + .flatMap(pathItem -> + asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName()).delete()) + .then() + .timeout(java.time.Duration.ofSeconds(60)) + .onErrorResume(error -> { + // Log but continue - some files might have been deleted + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("File cleanup partially failed"); + return Mono.empty(); + }); } @SuppressWarnings("try") @@ -86,7 +137,12 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + .retry(3) // Retry failed operations up to 3 times to handle transient faults + .onErrorMap(e -> { + // Log the error for debugging but let legitimate failures propagate + System.err.println("DataLake test operation failed after retries: " + e.getMessage()); + return e; + }); } protected abstract void runInternal(Context context) throws Exception; diff --git 
a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 870f0d7eb755..6f18eb176ff2 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -6,6 +6,7 @@ import com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; @@ -26,6 +27,7 @@ public abstract class ShareScenarioBase extends PerfStressTest { private static final String SHARE_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(ShareScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final ShareClient syncShareClient; private final ShareAsyncClient asyncShareClient; @@ -74,8 +76,57 @@ public Mono globalSetupAsync() { @Override public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); + return cleanupShareWithRetry() + .then(super.globalCleanupAsync()) + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Share cleanup failed"); + return super.globalCleanupAsync(); + }); + } + + /** + * Enhanced cleanup with timeout and retry logic to ensure shares are properly destroyed. 
+ */ + private Mono cleanupShareWithRetry() { return asyncNoFaultShareClient.deleteIfExists() - .then(super.globalCleanupAsync()); + .then() // Convert Mono to Mono + .timeout(java.time.Duration.ofSeconds(30)) + .retry(3) + .onErrorResume(error -> { + // If share deletion fails, try to delete all files first then retry + return deleteAllFilesInShare() + .then(asyncNoFaultShareClient.deleteIfExists()) + .then() // Convert Mono to Mono + .timeout(java.time.Duration.ofSeconds(30)) + .onErrorResume(finalError -> { + // Log the error but don't fail the test + LOGGER.atWarning() + .addKeyValue("error", finalError.getMessage()) + .log("Final share cleanup failed after retries"); + return Mono.empty(); + }); + }); + } + + /** + * Delete all files in the share to help with cleanup. + */ + private Mono deleteAllFilesInShare() { + return asyncNoFaultShareClient.getDirectoryClient("").listFilesAndDirectories() + .flatMap(fileRef -> + asyncNoFaultShareClient.getFileClient(fileRef.getName()).delete()) + .then() + .timeout(java.time.Duration.ofSeconds(60)) + .onErrorResume(error -> { + // Log but continue - some files might have been deleted + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("File cleanup partially failed"); + return Mono.empty(); + }); } @SuppressWarnings("try") @@ -88,7 +139,12 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + .retry(3) // Retry failed operations up to 3 times to handle transient faults + .onErrorMap(e -> { + // Log the error for debugging but let legitimate failures propagate + System.err.println("Share test operation failed after retries: " + e.getMessage()); + return e; + }); } protected abstract void runInternal(Context context) throws Exception; From 0db7e677103ca1dd775c581a53ce4bdd97ff9db4 Mon Sep 17 00:00:00 2001 From: browndav Date: Thu, 5 Mar 2026 13:27:10 -0500 Subject: [PATCH 02/25] change 
.delete() to .deleteIfExists() --- .../blob/stress/AppendBlobOutputStream.java | 2 +- .../storage/blob/stress/AppendBlock.java | 5 +- .../storage/blob/stress/BlobScenarioBase.java | 3 +- .../blob/stress/BlockBlobOutputStream.java | 2 +- .../storage/blob/stress/BlockBlobUpload.java | 2 +- .../storage/blob/stress/CommitBlockList.java | 2 +- .../blob/stress/PageBlobOutputStream.java | 5 +- .../blob/stress/PageBlobScenarioBase.java | 58 ++++++++++++++++++- .../azure/storage/blob/stress/StageBlock.java | 2 +- .../com/azure/storage/blob/stress/Upload.java | 2 +- .../storage/blob/stress/UploadPages.java | 5 +- .../datalake/stress/DataLakeScenarioBase.java | 3 +- .../storage/file/datalake/stress/Upload.java | 2 +- .../file/datalake/stress/UploadFromFile.java | 2 +- .../file/share/stress/ShareScenarioBase.java | 3 +- .../file/share/stress/UploadFromFile.java | 2 +- 16 files changed, 79 insertions(+), 21 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java index 9dd302492c3c..7543e7c86658 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java @@ -70,7 +70,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java index 89aefd7fc131..0c59b7a2851d 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java +++ 
b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java @@ -62,8 +62,9 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.getAppendBlobAsyncClient().delete() - .then(tempSetupBlobClient.delete()) + return asyncNoFaultClient.getAppendBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupBlobClient.deleteIfExists()) .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index e35ca0c3bbcc..f821ea330c61 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -138,7 +138,8 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .retry(3) // Retry failed operations up to 3 times to handle transient faults + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) .onErrorMap(e -> { // Log the error for debugging but let legitimate failures propagate LOGGER.atError() diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java index 6d6dd563cb25..780ef149ece9 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java @@ -70,7 +70,7 @@ public Mono setupAsync() { 
@Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java index 738f1abc9c47..efef9f68a4c4 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java @@ -56,7 +56,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java index f19801ee61f2..310c7618e5d9 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java @@ -80,7 +80,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java index 1226c7f0d624..121c78f0333d 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java +++ 
b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java @@ -93,8 +93,9 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.getPageBlobAsyncClient().delete() - .then(tempSetupPageBlobClient.delete()) + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java index 19a7dc5373e5..cdc7539e8ecf 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java @@ -6,6 +6,7 @@ import com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; @@ -20,12 +21,13 @@ import com.azure.storage.stress.StorageStressOptions; import reactor.core.publisher.Mono; +import java.time.Duration; import java.time.Instant; -import java.util.Objects; import java.util.UUID; public abstract class PageBlobScenarioBase extends PerfStressTest { private static final String CONTAINER_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(PageBlobScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final BlobServiceClient noFaultServiceClient; private final BlobContainerClient syncContainerClient; @@ 
-72,8 +74,57 @@ public Mono globalSetupAsync() { @Override public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); + return cleanupContainerWithRetry() + .then(super.globalCleanupAsync()) + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Container cleanup failed"); + return super.globalCleanupAsync(); + }); + } + + /** + * Enhanced cleanup with timeout and retry logic to ensure containers are properly destroyed. + */ + private Mono cleanupContainerWithRetry() { return asyncNoFaultContainerClient.deleteIfExists() - .then(super.globalCleanupAsync()); + .then() // Convert Mono to Mono + .timeout(Duration.ofSeconds(30)) + .retry(3) + .onErrorResume(error -> { + // If container deletion fails, try to delete all blobs first then retry container deletion + return deleteAllBlobsInContainer() + .then(asyncNoFaultContainerClient.deleteIfExists()) + .then() // Convert Mono to Mono + .timeout(Duration.ofSeconds(30)) + .onErrorResume(finalError -> { + // Log the error but don't fail the test + LOGGER.atWarning() + .addKeyValue("error", finalError.getMessage()) + .log("Final container cleanup failed after retries"); + return Mono.empty(); + }); + }); + } + + /** + * Delete all blobs in the container to help with cleanup. 
+ */ + private Mono deleteAllBlobsInContainer() { + return asyncNoFaultContainerClient.listBlobs() + .flatMap(blobItem -> + asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete()) + .then() + .timeout(Duration.ofSeconds(60)) + .onErrorResume(error -> { + // Log but continue - some blobs might have been deleted + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Blob cleanup partially failed"); + return Mono.empty(); + }); } @SuppressWarnings("try") @@ -86,7 +137,8 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .retry(3) // Retry failed operations up to 3 times to handle transient faults + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) .onErrorMap(e -> { // Log the error for debugging but let legitimate failures propagate System.err.println("Test operation failed after retries: " + e.getMessage()); diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java index f30b4aeeeab6..b39efef299b7 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java @@ -79,7 +79,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java index 0393fae1e57f..d2af3a9cf70e 100644 --- 
a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java @@ -61,7 +61,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java index c221742dc651..13e6557e888c 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java @@ -69,8 +69,9 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.getPageBlobAsyncClient().delete() - .then(tempSetupPageBlobClient.delete()) + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index 8681f0d858f7..a34136644a7f 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -137,7 +137,8 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .retry(3) // Retry failed operations up to 3 
times to handle transient faults + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) .onErrorMap(e -> { // Log the error for debugging but let legitimate failures propagate System.err.println("DataLake test operation failed after retries: " + e.getMessage()); diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java index dd6a3221bfb6..123f97ef49b1 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java @@ -60,7 +60,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java index 15a64abb228f..b4a12e52888c 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java @@ -87,7 +87,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java 
b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 6f18eb176ff2..b1ff827f59ac 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -139,7 +139,8 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .retry(3) // Retry failed operations up to 3 times to handle transient faults + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) .onErrorMap(e -> { // Log the error for debugging but let legitimate failures propagate System.err.println("Share test operation failed after retries: " + e.getMessage()); diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java index 3689c2dd7f65..4bd64c8580d5 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java @@ -80,7 +80,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } From 3c6060881b16c1526459a3e6f2cf2e03ab9c3401 Mon Sep 17 00:00:00 2001 From: browndav Date: Fri, 6 Mar 2026 18:11:14 -0500 Subject: [PATCH 03/25] remove Sinks.EmitFailureHandler.FAIL_FAST from CrcInputStream - read functions had FAIL_FAST which would throw an error when the stream had reached then end and we 
wanted to read from the stream again. So we removed from both reads. - refactor code so that the exit criteria is a tthe beginning - refactor the emitContentInfo for dry --- .../azure/storage/stress/CrcInputStream.java | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java index 9c53830d9e09..37e46eff29b0 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java @@ -43,33 +43,42 @@ public CrcInputStream(InputStream source) { @Override public synchronized int read() throws IOException { int b = inputStream.read(); - if (b >= 0) { - crc.update(b); - if (head.hasRemaining()) { - head.put((byte) b); - } - length++; - } else { - sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + if (b < 0) { + emitContentInfo(); + return b; + } + + crc.update(b); + if (head.hasRemaining()) { + head.put((byte) b); } + length++; return b; } @Override public synchronized int read(byte buf[], int off, int len) throws IOException { int read = inputStream.read(buf, off, len); - if (read >= 0) { - length += read; - crc.update(buf, off, read); - if (head.hasRemaining()) { - head.put(buf, off, Math.min(read, head.remaining())); - } - } else { - sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + if (read < 0) { + emitContentInfo(); + return read; } + + crc.update(buf, off, read); + if (head.hasRemaining()) { + head.put(buf, off, Math.min(read, head.remaining())); + } + length += read; return read; } + // Uses tryEmitValue instead of emitValue(FAIL_FAST) so that resubscriptions + // (SDK retries, verification passes) don't throw on the second 
EOF. + private void emitContentInfo() { + //old was sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); + } + @Override public synchronized void mark(int readLimit) { if (markSupported) { @@ -136,6 +145,12 @@ public Flux convertStreamToByteBuffer() { return FluxUtil.fluxError(LOGGER, new UncheckedIOException(e)); } + // Reset CRC tracking state so resubscriptions (SDK retries or verification) + // compute the correct checksum from scratch. + crc.reset(); + length = 0; + head.clear(); + final long[] currentTotalLength = new long[1]; return Flux.generate(() -> inputStream, (is, sink) -> { long pos = currentTotalLength[0]; From 025a9e43756c160c0a4756916a6830400e61d6f1 Mon Sep 17 00:00:00 2001 From: browndav Date: Fri, 6 Mar 2026 18:13:46 -0500 Subject: [PATCH 04/25] prevent crashes on reattempted close on stream - changed emitValue to tryEmitValue - remove Sinks.EmitFailureHandler.FAIL_FAST so that multiple closes does not cause an error to be thrown --- .../main/java/com/azure/storage/stress/CrcOutputStream.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index a118d45e6cc1..77944f2954fd 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -30,9 +30,12 @@ public synchronized void write(byte buf[], int off, int len) { length += len; } + // Uses tryEmitValue so that double-close (e.g. explicit close + try-with-resources) + // doesn't throw on the second call. 
@Override public void close() throws IOException { - sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + //old: sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); super.close(); } From 30bd8dc4da7b70527f01e56debbbe7d1b14a89c0 Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 9 Mar 2026 10:39:35 -0400 Subject: [PATCH 05/25] fix telemetry so that it doesnt swallow errors --- .../java/com/azure/storage/stress/TelemetryHelper.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java index e3a756bf0c51..9471cefc8eb4 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java @@ -142,7 +142,9 @@ public void instrumentRun(ThrowingFunction oneRun) { oneRun.run(ctx); trackSuccess(start, span); } catch (Throwable e) { - if (e.getMessage().contains("Timeout on blocking read") || e instanceof InterruptedException || e instanceof TimeoutException) { + String message = e.getMessage(); + if (e instanceof InterruptedException || e instanceof TimeoutException + || (message != null && message.contains("Timeout on blocking read"))) { trackCancellation(start, span); } else { trackFailure(start, e, span); @@ -201,7 +203,9 @@ private void trackFailure(Instant start, Throwable e, Span span) { // already a NativeIoException/TimeoutException if (unwrapped instanceof RuntimeException) { String message = unwrapped.getMessage(); - if (message.contains("NativeIoException")) { + if (message == null) { + // No message to inspect — skip NativeIoException/TimeoutException detection. 
+ } else if (message.contains("NativeIoException")) { unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE); } else if (message.contains("TimeoutException")) { unwrapped = new TimeoutException(message); From 303ad802e71e9aafa0105bdde5af4bf7701c35f3 Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 9 Mar 2026 13:37:44 -0400 Subject: [PATCH 06/25] roll back two deps because they were causing failures in the containers - opentelemetry-runtime-telemetry-java8 from 2.24.0-alpha -> 2.15.0-alpha - opentelemetry-logback-appender-1.0 from 2.24.0-alpha -> 2.15.0-alpha --- sdk/storage/azure-storage-blob-stress/pom.xml | 4 ++-- sdk/storage/azure-storage-file-datalake-stress/pom.xml | 4 ++-- sdk/storage/azure-storage-file-share-stress/pom.xml | 4 ++-- sdk/storage/azure-storage-stress/pom.xml | 4 ++-- .../main/java/com/azure/storage/stress/TelemetryHelper.java | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/pom.xml b/sdk/storage/azure-storage-blob-stress/pom.xml index e644eebf05fd..39072049dc20 100644 --- a/sdk/storage/azure-storage-blob-stress/pom.xml +++ b/sdk/storage/azure-storage-blob-stress/pom.xml @@ -57,12 +57,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha diff --git a/sdk/storage/azure-storage-file-datalake-stress/pom.xml b/sdk/storage/azure-storage-file-datalake-stress/pom.xml index df12914f5100..7d243eefa694 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/pom.xml +++ b/sdk/storage/azure-storage-file-datalake-stress/pom.xml @@ -57,12 +57,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha diff --git 
a/sdk/storage/azure-storage-file-share-stress/pom.xml b/sdk/storage/azure-storage-file-share-stress/pom.xml index 48823a16964d..852242f0285d 100644 --- a/sdk/storage/azure-storage-file-share-stress/pom.xml +++ b/sdk/storage/azure-storage-file-share-stress/pom.xml @@ -57,12 +57,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha diff --git a/sdk/storage/azure-storage-stress/pom.xml b/sdk/storage/azure-storage-stress/pom.xml index d8907fe67014..9a6cd7caf2a6 100644 --- a/sdk/storage/azure-storage-stress/pom.xml +++ b/sdk/storage/azure-storage-stress/pom.xml @@ -52,12 +52,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha com.azure diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java index 9471cefc8eb4..06b31dbab9ff 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java @@ -124,7 +124,7 @@ public String getDescription() { Cpu.registerObservers(otel); MemoryPools.registerObservers(otel); Threads.registerObservers(otel); - GarbageCollector.registerObservers(otel, false); // false disables the capture of the GC cause + GarbageCollector.registerObservers(otel); OpenTelemetryAppender.install(otel); return otel; } From 0a25f4df3778cc8a227e0cf93e9f47755822da77 Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 9 Mar 2026 13:40:26 -0400 Subject: [PATCH 07/25] rollback azure-client-sdk-parent linting extensions from 1.0.0-beta.2 t0 beta.1 --- 
sdk/parents/azure-client-sdk-parent/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/parents/azure-client-sdk-parent/pom.xml b/sdk/parents/azure-client-sdk-parent/pom.xml index 9d4f9dfd5739..bc50d0f0c042 100644 --- a/sdk/parents/azure-client-sdk-parent/pom.xml +++ b/sdk/parents/azure-client-sdk-parent/pom.xml @@ -691,7 +691,7 @@ io.clientcore linting-extensions - 1.0.0-beta.2 + 1.0.0-beta.1 com.puppycrawl.tools From ec051d81eac9adeaf492f254a0c83b39b30a309d Mon Sep 17 00:00:00 2001 From: browndav Date: Wed, 11 Mar 2026 12:58:22 -0400 Subject: [PATCH 08/25] revert linting extensions to beta2 --- sdk/parents/azure-client-sdk-parent/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/parents/azure-client-sdk-parent/pom.xml b/sdk/parents/azure-client-sdk-parent/pom.xml index bc50d0f0c042..9d4f9dfd5739 100644 --- a/sdk/parents/azure-client-sdk-parent/pom.xml +++ b/sdk/parents/azure-client-sdk-parent/pom.xml @@ -691,7 +691,7 @@ io.clientcore linting-extensions - 1.0.0-beta.1 + 1.0.0-beta.2 com.puppycrawl.tools From 8917a7c45f12c6158bdc40e14920808d688a6820 Mon Sep 17 00:00:00 2001 From: browndav Date: Wed, 11 Mar 2026 14:57:33 -0400 Subject: [PATCH 09/25] remove comments with old code --- .../src/main/java/com/azure/storage/stress/CrcInputStream.java | 1 - .../src/main/java/com/azure/storage/stress/CrcOutputStream.java | 1 - 2 files changed, 2 deletions(-) diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java index 37e46eff29b0..baea75f0247f 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java @@ -75,7 +75,6 @@ public synchronized int read(byte buf[], int off, int len) throws IOException { // Uses tryEmitValue instead of 
emitValue(FAIL_FAST) so that resubscriptions // (SDK retries, verification passes) don't throw on the second EOF. private void emitContentInfo() { - //old was sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); } diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index 77944f2954fd..e6868d3d3e4d 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -34,7 +34,6 @@ public synchronized void write(byte buf[], int off, int len) { // doesn't throw on the second call. @Override public void close() throws IOException { - //old: sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); super.close(); } From 9ba9c62a2e48e211a302699a37a0710ac49a8996 Mon Sep 17 00:00:00 2001 From: browndav Date: Wed, 11 Mar 2026 16:28:06 -0400 Subject: [PATCH 10/25] add logging for errors --- .../datalake/stress/DataLakeScenarioBase.java | 11 ++++--- .../azure/storage/stress/CrcInputStream.java | 24 ++++++++++++++- .../azure/storage/stress/CrcOutputStream.java | 30 ++++++++++++++++--- 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index a34136644a7f..5b101ee46aa3 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ 
b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -15,10 +15,10 @@ import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; -import com.azure.storage.stress.TelemetryHelper; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.StorageStressOptions; +import com.azure.storage.stress.TelemetryHelper; import reactor.core.publisher.Mono; import java.time.Instant; @@ -139,14 +139,13 @@ public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) .retryWhen(reactor.util.retry.Retry.max(3) .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) - .onErrorMap(e -> { - // Log the error for debugging but let legitimate failures propagate - System.err.println("DataLake test operation failed after retries: " + e.getMessage()); - return e; - }); + .doOnError(e -> LOGGER.atWarning() + .addKeyValue("error", e.getMessage()) + .log("DataLake test operation failed after retries")); } protected abstract void runInternal(Context context) throws Exception; + protected abstract Mono runInternalAsync(Context context); protected DataLakeFileSystemClient getSyncFileSystemClient() { diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java index baea75f0247f..cf46cb608e5e 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java @@ -75,7 +75,29 @@ public synchronized int read(byte buf[], int off, int len) 
throws IOException { // Uses tryEmitValue instead of emitValue(FAIL_FAST) so that resubscriptions // (SDK retries, verification passes) don't throw on the second EOF. private void emitContentInfo() { - sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); + String baseErrorMessage = "Failed to emit content because "; + Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); + switch (emitResult) { + case OK: + LOGGER.info("CrcInputStream: OK"); + break; + case FAIL_TERMINATED: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the sink was previously terminated successfully or with an error" + emitResult)); + case FAIL_CANCELLED: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the sink was previously interrupted by its consumer: " + emitResult)); + case FAIL_OVERFLOW: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the buffer is full: " + emitResult)); + case FAIL_NON_SERIALIZED: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "two threads called emit at " + + "once: " + emitResult)); + case FAIL_ZERO_SUBSCRIBER: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " + + "subscriber:" + emitResult)); + default: + throw LOGGER.logExceptionAsError(new RuntimeException("Unknown emit result: " + emitResult)); + } } @Override diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index e6868d3d3e4d..198dc239cc86 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -1,5 +1,6 @@ package com.azure.storage.stress; +import com.azure.core.util.logging.ClientLogger; import 
reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -13,16 +14,18 @@ public class CrcOutputStream extends OutputStream { private final CRC32 crc = new CRC32(); private long length = 0; private final ByteBuffer head = ByteBuffer.allocate(1024); + private final static ClientLogger LOGGER = new ClientLogger(CrcOutputStream.class); + @Override public synchronized void write(int b) { crc.update(b); if (head.hasRemaining()) { - head.put((byte)b); + head.put((byte) b); } - length ++; + length++; } - public synchronized void write(byte buf[], int off, int len) { + public synchronized void write(byte[] buf, int off, int len) { crc.update(buf, off, len); if (head.hasRemaining()) { head.put(buf, off, Math.min(len, head.remaining())); @@ -34,7 +37,26 @@ public synchronized void write(byte buf[], int off, int len) { // doesn't throw on the second call. @Override public void close() throws IOException { - sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); + Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); + + switch (emitResult) { + case OK: + break; + case FAIL_TERMINATED: + throw LOGGER.logExceptionAsError(new RuntimeException("Sink was terminated before emitting content: " + + "info: " + emitResult)); + case FAIL_CANCELLED: + throw LOGGER.logExceptionAsError(new RuntimeException("The subscriber cancelled before the value was " + + "emitted: " + emitResult)); + case FAIL_OVERFLOW: + throw LOGGER.logExceptionAsError(new RuntimeException("Buffer full: " + emitResult)); + case FAIL_NON_SERIALIZED: + throw LOGGER.logExceptionAsError(new RuntimeException("Two threads call emit at once: " + emitResult)); + case FAIL_ZERO_SUBSCRIBER: + throw LOGGER.logExceptionAsError(new RuntimeException("Sink requires a subscriber: " + emitResult)); + default: + throw LOGGER.logExceptionAsError(new RuntimeException("Unknown emit result: " + emitResult)); + } super.close(); } From ec879d5e073e1cf50c757966e9cbb678d1fd12a9 
Mon Sep 17 00:00:00 2001 From: browndav Date: Wed, 11 Mar 2026 18:58:15 -0400 Subject: [PATCH 11/25] remove catches for double close issue and okay status --- .../azure/storage/stress/CrcInputStream.java | 11 +++------ .../azure/storage/stress/CrcOutputStream.java | 24 +++++++++---------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java index cf46cb608e5e..7335573010d9 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java @@ -78,12 +78,9 @@ private void emitContentInfo() { String baseErrorMessage = "Failed to emit content because "; Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); switch (emitResult) { - case OK: - LOGGER.info("CrcInputStream: OK"); - break; - case FAIL_TERMINATED: - throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + - " the sink was previously terminated successfully or with an error" + emitResult)); + // OK and FAIL_TERMINATED are expected results and not logged as errors. OK means the content info was + // emitted successfully. 
FAIL_TERMINATED means the content info was already emitted and the sink was terminated, + // which can happen if the stream is read to completion multiple times on things like retries case FAIL_CANCELLED: throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + " the sink was previously interrupted by its consumer: " + emitResult)); @@ -95,8 +92,6 @@ private void emitContentInfo() { case FAIL_ZERO_SUBSCRIBER: throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " + "subscriber:" + emitResult)); - default: - throw LOGGER.logExceptionAsError(new RuntimeException("Unknown emit result: " + emitResult)); } } diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index 198dc239cc86..311a899c8ae0 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -37,25 +37,23 @@ public synchronized void write(byte[] buf, int off, int len) { // doesn't throw on the second call. @Override public void close() throws IOException { + String baseErrorMessage = "Failed to emit content because "; Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); - switch (emitResult) { - case OK: - break; - case FAIL_TERMINATED: - throw LOGGER.logExceptionAsError(new RuntimeException("Sink was terminated before emitting content: " + - "info: " + emitResult)); + // OK and FAIL_TERMINATED are expected results and not logged as errors. OK means the content info was + // emitted successfully. 
FAIL_TERMINATED means the content info was already emitted and the sink was terminated, + // which can happen if the stream is read to completion multiple times on things like retries case FAIL_CANCELLED: - throw LOGGER.logExceptionAsError(new RuntimeException("The subscriber cancelled before the value was " + - "emitted: " + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the sink was previously interrupted by its consumer: " + emitResult)); case FAIL_OVERFLOW: - throw LOGGER.logExceptionAsError(new RuntimeException("Buffer full: " + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the buffer is full: " + emitResult)); case FAIL_NON_SERIALIZED: - throw LOGGER.logExceptionAsError(new RuntimeException("Two threads call emit at once: " + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "two threads called emit at " + + "once: " + emitResult)); case FAIL_ZERO_SUBSCRIBER: - throw LOGGER.logExceptionAsError(new RuntimeException("Sink requires a subscriber: " + emitResult)); - default: - throw LOGGER.logExceptionAsError(new RuntimeException("Unknown emit result: " + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " + + "subscriber:" + emitResult)); } super.close(); } From 01792374c265ac8cc944e025c8be7593e89f47c8 Mon Sep 17 00:00:00 2001 From: browndav Date: Sun, 15 Mar 2026 19:54:13 -0400 Subject: [PATCH 12/25] recursively delete files then delete the directory --- .../file/share/stress/ShareScenarioBase.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index b1ff827f59ac..15a493e9ce5c 100644 --- 
a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -12,6 +12,7 @@ import com.azure.perf.test.core.PerfStressTest; import com.azure.storage.file.share.ShareAsyncClient; import com.azure.storage.file.share.ShareClient; +import com.azure.storage.file.share.ShareDirectoryAsyncClient; import com.azure.storage.file.share.ShareServiceAsyncClient; import com.azure.storage.file.share.ShareServiceClient; import com.azure.storage.file.share.ShareServiceClientBuilder; @@ -115,10 +116,7 @@ private Mono cleanupShareWithRetry() { * Delete all files in the share to help with cleanup. */ private Mono deleteAllFilesInShare() { - return asyncNoFaultShareClient.getDirectoryClient("").listFilesAndDirectories() - .flatMap(fileRef -> - asyncNoFaultShareClient.getFileClient(fileRef.getName()).delete()) - .then() + return deleteDirectoryContentsRecursively(asyncNoFaultShareClient.getDirectoryClient("")) .timeout(java.time.Duration.ofSeconds(60)) .onErrorResume(error -> { // Log but continue - some files might have been deleted @@ -129,6 +127,27 @@ private Mono deleteAllFilesInShare() { }); } + /** + * Recursively delete all contents of a directory (files first, then subdirectories). 
+ */ + private Mono deleteDirectoryContentsRecursively( + ShareDirectoryAsyncClient directoryClient) { + return directoryClient.listFilesAndDirectories() + // Use concatMap to ensure we process each file/directory sequentially, which is important for correct deletion order + .concatMap(fileRef -> { + if (fileRef.isDirectory()) { + ShareDirectoryAsyncClient subDirClient = + directoryClient.getSubdirectoryClient(fileRef.getName()); + // First delete all contents recursively, then delete the directory itself + return deleteDirectoryContentsRecursively(subDirClient) + .then(subDirClient.delete()); + } else { + return directoryClient.getFileClient(fileRef.getName()).delete(); + } + }) + .then(); + } + @SuppressWarnings("try") @Override public void run() { From ae74f4a3842090f2434867508c1f0a5376ce5edf Mon Sep 17 00:00:00 2001 From: browndav Date: Sun, 15 Mar 2026 20:11:41 -0400 Subject: [PATCH 13/25] change to sync deletes, refactor for easier reading --- .../storage/blob/stress/BlobScenarioBase.java | 65 ++++++++++++------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index f821ea330c61..0215ae3c3158 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -86,41 +86,50 @@ public Mono globalCleanupAsync() { }); } - /** - * Enhanced cleanup with timeout and retry logic to ensure containers are properly destroyed. 
- */ + private static final int DELETE_TIMEOUT_SECONDS = 30; + private static final int BLOB_CLEANUP_TIMEOUT_SECONDS = 60; + private static final int MAX_RETRY_ATTEMPTS = 3; + private Mono cleanupContainerWithRetry() { + return tryDeleteContainer() + .onErrorResume(error -> fallbackCleanup()); + } + + private Mono tryDeleteContainer() { return asyncNoFaultContainerClient.deleteIfExists() - .then() // Convert Mono to Mono - .timeout(Duration.ofSeconds(30)) - .retry(3) - .onErrorResume(error -> { - // If container deletion fails, try to delete all blobs first then retry container deletion - return deleteAllBlobsInContainer() - .then(asyncNoFaultContainerClient.deleteIfExists()) - .then() // Convert Mono to Mono - .timeout(Duration.ofSeconds(30)) - .onErrorResume(finalError -> { - // Log the error but don't fail the test - LOGGER.atWarning() - .addKeyValue("error", finalError.getMessage()) - .log("Final container cleanup failed after retries"); - return Mono.empty(); - }); - }); + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)) + .retry(MAX_RETRY_ATTEMPTS); + } + + private Mono fallbackCleanup() { + return deleteAllBlobsInContainer() + .then(tryDeleteContainerOnce()) + .onErrorResume(this::logCleanupFailure); + } + + private Mono tryDeleteContainerOnce() { + return asyncNoFaultContainerClient.deleteIfExists() + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)); + } + + private Mono logCleanupFailure(Throwable error) { + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Final container cleanup failed after retries"); + return Mono.empty(); } /** - * Delete all blobs in the container to help with cleanup. + * Deletes all blobs in the container sequentially to avoid throttling. 
*/ private Mono deleteAllBlobsInContainer() { return asyncNoFaultContainerClient.listBlobs() - .flatMap(blobItem -> - asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete()) + .concatMap(this::deleteBlobQuietly) .then() - .timeout(Duration.ofSeconds(60)) + .timeout(Duration.ofSeconds(BLOB_CLEANUP_TIMEOUT_SECONDS)) .onErrorResume(error -> { - // Log but continue - some blobs might have been deleted LOGGER.atWarning() .addKeyValue("error", error.getMessage()) .log("Blob cleanup partially failed"); @@ -128,6 +137,12 @@ private Mono deleteAllBlobsInContainer() { }); } + private Mono deleteBlobQuietly(com.azure.storage.blob.models.BlobItem blobItem) { + return asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()) + .delete() + .onErrorResume(e -> Mono.empty()); + } + @SuppressWarnings("try") @Override public void run() { From c77be01d9b6f8716ea8b9cdc7e5807e5cff1f685 Mon Sep 17 00:00:00 2001 From: browndav Date: Sun, 15 Mar 2026 20:15:47 -0400 Subject: [PATCH 14/25] restructing share clean up so super calls only once --- .../azure/storage/file/share/stress/ShareScenarioBase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 15a493e9ce5c..905415aa685f 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -84,8 +84,9 @@ public Mono globalCleanupAsync() { LOGGER.atWarning() .addKeyValue("error", error.getMessage()) .log("Share cleanup failed"); - return super.globalCleanupAsync(); - }); + return Mono.empty(); + }) + .then(super.globalCleanupAsync()); } /** From 
ac25ae175c100abef03d24dea6a4f90c4ddae24d Mon Sep 17 00:00:00 2001 From: browndav Date: Sun, 15 Mar 2026 20:55:19 -0400 Subject: [PATCH 15/25] incorporate copilot suggestions --- .../storage/blob/stress/PageBlobScenarioBase.java | 10 ++++------ .../file/share/stress/ShareScenarioBase.java | 6 ++---- .../com/azure/storage/stress/TelemetryHelper.java | 14 +++++++------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java index cdc7539e8ecf..3d5f823c0e60 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java @@ -114,7 +114,7 @@ private Mono cleanupContainerWithRetry() { */ private Mono deleteAllBlobsInContainer() { return asyncNoFaultContainerClient.listBlobs() - .flatMap(blobItem -> + .concatMap(blobItem -> asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete()) .then() .timeout(Duration.ofSeconds(60)) @@ -139,11 +139,9 @@ public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) .retryWhen(reactor.util.retry.Retry.max(3) .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) - .onErrorMap(e -> { - // Log the error for debugging but let legitimate failures propagate - System.err.println("Test operation failed after retries: " + e.getMessage()); - return e; - }); + .doOnError(e -> LOGGER.atError() + .addKeyValue("error", e.getMessage()) + .log("Test operation failed after retries")); } protected abstract void runInternal(Context context) throws Exception; diff --git 
a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 905415aa685f..8bfe6a5cdb99 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -78,7 +78,6 @@ public Mono globalSetupAsync() { public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); return cleanupShareWithRetry() - .then(super.globalCleanupAsync()) .onErrorResume(error -> { // Log cleanup failure but don't fail the overall test LOGGER.atWarning() @@ -161,10 +160,9 @@ public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) .retryWhen(reactor.util.retry.Retry.max(3) .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) - .onErrorMap(e -> { + .doOnError(e -> { // Log the error for debugging but let legitimate failures propagate - System.err.println("Share test operation failed after retries: " + e.getMessage()); - return e; + LOGGER.error("Share test operation failed after retries.", e); }); } diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java index 06b31dbab9ff..ab54e94a8386 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java @@ -203,13 +203,13 @@ private void trackFailure(Instant start, Throwable e, Span span) { // already a NativeIoException/TimeoutException if (unwrapped instanceof RuntimeException) { String 
message = unwrapped.getMessage(); - if (message == null) { - // No message to inspect — skip NativeIoException/TimeoutException detection. - } else if (message.contains("NativeIoException")) { - unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE); - } else if (message.contains("TimeoutException")) { - unwrapped = new TimeoutException(message); - } + // Only inspect the message when it is non-null. + if (message != null) { + if (message.contains("NativeIoException")) { + unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE); + } else if (message.contains("TimeoutException")) { + unwrapped = new TimeoutException(message); + } } span.recordException(unwrapped); From 12364bce4e16ff3cf4ee06c4090089aad41a3770 Mon Sep 17 00:00:00 2001 From: browndav Date: Sun, 15 Mar 2026 21:19:12 -0400 Subject: [PATCH 16/25] incorporate copilot suggestions --- .../azure/storage/blob/stress/BlobScenarioBase.java | 3 +-- .../com/azure/storage/stress/CrcInputStream.java | 10 +++++++--- .../com/azure/storage/stress/CrcOutputStream.java | 12 ++++++++---- .../com/azure/storage/stress/TelemetryHelper.java | 2 +- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index 0215ae3c3158..dc3ffc7bba4b 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -155,13 +155,12 @@ public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) .retryWhen(reactor.util.retry.Retry.max(3) .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof 
com.azure.storage.stress.ContentMismatchException))) - .onErrorMap(e -> { + .doOnError(e -> { // Log the error for debugging but let legitimate failures propagate LOGGER.atError() .addKeyValue("error", e.getMessage()) .addKeyValue("errorType", e.getClass().getSimpleName()) .log("Test operation failed after retries"); - return e; }); } diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java index 7335573010d9..7e48798b30b0 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java @@ -78,9 +78,10 @@ private void emitContentInfo() { String baseErrorMessage = "Failed to emit content because "; Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); switch (emitResult) { - // OK and FAIL_TERMINATED are expected results and not logged as errors. OK means the content info was - // emitted successfully. FAIL_TERMINATED means the content info was already emitted and the sink was terminated, - // which can happen if the stream is read to completion multiple times on things like retries + case OK: + case FAIL_TERMINATED: + // No action needed for successful or already-terminated emissions. 
+ break; case FAIL_CANCELLED: throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + " the sink was previously interrupted by its consumer: " + emitResult)); @@ -92,6 +93,9 @@ private void emitContentInfo() { case FAIL_ZERO_SUBSCRIBER: throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " + "subscriber:" + emitResult)); + default: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "unexpected emit result: " + + emitResult)); } } diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index 311a899c8ae0..2b908e8f7b39 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -14,7 +14,7 @@ public class CrcOutputStream extends OutputStream { private final CRC32 crc = new CRC32(); private long length = 0; private final ByteBuffer head = ByteBuffer.allocate(1024); - private final static ClientLogger LOGGER = new ClientLogger(CrcOutputStream.class); + private static final ClientLogger LOGGER = new ClientLogger(CrcOutputStream.class); @Override public synchronized void write(int b) { @@ -40,9 +40,10 @@ public void close() throws IOException { String baseErrorMessage = "Failed to emit content because "; Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); switch (emitResult) { - // OK and FAIL_TERMINATED are expected results and not logged as errors. OK means the content info was - // emitted successfully. 
FAIL_TERMINATED means the content info was already emitted and the sink was terminated, - // which can happen if the stream is read to completion multiple times on things like retries + case OK: + case FAIL_TERMINATED: + // Expected successful outcomes; nothing further to do. + break; case FAIL_CANCELLED: throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + " the sink was previously interrupted by its consumer: " + emitResult)); @@ -54,6 +55,9 @@ public void close() throws IOException { case FAIL_ZERO_SUBSCRIBER: throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " + "subscriber:" + emitResult)); + default: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + "an unexpected emit result was returned: " + emitResult)); } super.close(); } diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java index ab54e94a8386..4bf6e523eae1 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java @@ -203,13 +203,13 @@ private void trackFailure(Instant start, Throwable e, Span span) { // already a NativeIoException/TimeoutException if (unwrapped instanceof RuntimeException) { String message = unwrapped.getMessage(); - // Only inspect the message when it is non-null. 
if (message != null) { if (message.contains("NativeIoException")) { unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE); } else if (message.contains("TimeoutException")) { unwrapped = new TimeoutException(message); } + } } span.recordException(unwrapped); From 28a6fecbfc6d674bd901d91759b59d31397e0843 Mon Sep 17 00:00:00 2001 From: browndav Date: Sun, 15 Mar 2026 21:28:41 -0400 Subject: [PATCH 17/25] incorporate copilot suggestions --- .../storage/blob/stress/PageBlobScenarioBase.java | 11 ++++++++++- .../com/azure/storage/stress/CrcOutputStream.java | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java index 3d5f823c0e60..40f264444f2c 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java @@ -115,7 +115,16 @@ private Mono cleanupContainerWithRetry() { private Mono deleteAllBlobsInContainer() { return asyncNoFaultContainerClient.listBlobs() .concatMap(blobItem -> - asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()).delete()) + asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()) + .deleteIfExists() + .onErrorResume(error -> { + // Log but continue - failures for individual blobs should not stop cleanup + LOGGER.atWarning() + .addKeyValue("blobName", blobItem.getName()) + .addKeyValue("error", error.getMessage()) + .log("Failed to delete blob during cleanup"); + return Mono.empty(); + })) .then() .timeout(Duration.ofSeconds(60)) .onErrorResume(error -> { diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java 
b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index 2b908e8f7b39..0d467c9bcd24 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -36,7 +36,7 @@ public synchronized void write(byte[] buf, int off, int len) { // Uses tryEmitValue so that double-close (e.g. explicit close + try-with-resources) // doesn't throw on the second call. @Override - public void close() throws IOException { + public synchronized void close() throws IOException { String baseErrorMessage = "Failed to emit content because "; Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); switch (emitResult) { From 6efe321487e94885c361e811efdadf349cd54bd4 Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 16 Mar 2026 11:10:34 -0400 Subject: [PATCH 18/25] incorporate copilot suggestions --- .../file/share/stress/ShareScenarioBase.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 8bfe6a5cdb99..d3e9a3c54711 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -140,9 +140,23 @@ private Mono deleteDirectoryContentsRecursively( directoryClient.getSubdirectoryClient(fileRef.getName()); // First delete all contents recursively, then delete the directory itself return deleteDirectoryContentsRecursively(subDirClient) - .then(subDirClient.delete()); + .then(subDirClient.deleteIfExists()) + 
.onErrorResume(error -> { + LOGGER.atWarning() + .addKeyValue("directory", fileRef.getName()) + .addKeyValue("error", error.getMessage()) + .log("Failed to delete directory during share cleanup; continuing with remaining items."); + return Mono.empty(); + }); } else { - return directoryClient.getFileClient(fileRef.getName()).delete(); + return directoryClient.getFileClient(fileRef.getName()) + .deleteIfExists() + .onErrorResume(error -> { + LOGGER.atWarning() + .addKeyValue("file", fileRef.getName()) + .addKeyValue("error", error.getMessage()) + .log("Failed to delete file during share cleanup; continuing with remaining items."); + return Mono.empty(); } }) .then(); From 5839164a4fe4a58fd2bad4bea33ab1d3e8187d9f Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 16 Mar 2026 11:58:01 -0400 Subject: [PATCH 19/25] fix all deletes to make sync and wrap in try-catch --- .../com/azure/storage/blob/stress/BlobScenarioBase.java | 5 +++-- .../java/com/azure/storage/blob/stress/DownloadToFile.java | 2 +- .../java/com/azure/storage/blob/stress/UploadFromFile.java | 2 +- .../storage/file/datalake/stress/DataLakeScenarioBase.java | 6 ++++-- .../com/azure/storage/file/datalake/stress/ReadToFile.java | 2 +- .../azure/storage/file/datalake/stress/UploadFromFile.java | 2 +- .../com/azure/storage/file/share/stress/DownloadToFile.java | 2 +- .../com/azure/storage/file/share/stress/UploadFromFile.java | 2 +- 8 files changed, 13 insertions(+), 10 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index dc3ffc7bba4b..ca1edbe70fe2 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -139,8 +139,9 @@ private Mono 
deleteAllBlobsInContainer() { private Mono deleteBlobQuietly(com.azure.storage.blob.models.BlobItem blobItem) { return asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()) - .delete() - .onErrorResume(e -> Mono.empty()); + .deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(); } @SuppressWarnings("try") diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java index 059365f0aa15..bcb2baae2c8f 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java @@ -66,7 +66,7 @@ protected Mono runInternalAsync(Context span) { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java index 41a6c68bdecf..24289287b368 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java @@ -107,7 +107,7 @@ private Path getTempPath(String prefix) { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java 
b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index 5b101ee46aa3..5de274309835 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -114,8 +114,10 @@ private Mono cleanupFileSystemWithRetry() { */ private Mono deleteAllFilesInFileSystem() { return asyncNoFaultFileSystemClient.listPaths() - .flatMap(pathItem -> - asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName()).delete()) + .concatMap(pathItem -> + asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName()).deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then()) .then() .timeout(java.time.Duration.ofSeconds(60)) .onErrorResume(error -> { diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java index 6dca37ecb92c..228425064123 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java @@ -72,7 +72,7 @@ public Mono cleanupAsync() { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java index b4a12e52888c..9b889f1a393c 100644 --- 
a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java @@ -101,7 +101,7 @@ private Path getTempPath(String prefix) { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java index 02be0b18f68b..7226e59d1e28 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java @@ -67,7 +67,7 @@ public Mono cleanupAsync() { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java index 4bd64c8580d5..c3de9eb9da1f 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java @@ -94,7 +94,7 @@ private Path getTempPath(String prefix) { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) From 
9850c8a4ebc0e841c53ab209051803f06181f3f5 Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 16 Mar 2026 12:25:55 -0400 Subject: [PATCH 20/25] fix tests so that super.globalCleanupAsync() is only called once --- sdk/parents/azure-client-sdk-parent/pom.xml | 2 +- .../com/azure/storage/blob/stress/BlobScenarioBase.java | 7 ++++--- .../azure/storage/blob/stress/PageBlobScenarioBase.java | 7 ++++--- .../storage/file/datalake/stress/DataLakeScenarioBase.java | 7 ++++--- .../azure/storage/file/share/stress/ShareScenarioBase.java | 1 + 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sdk/parents/azure-client-sdk-parent/pom.xml b/sdk/parents/azure-client-sdk-parent/pom.xml index 9d4f9dfd5739..bc50d0f0c042 100644 --- a/sdk/parents/azure-client-sdk-parent/pom.xml +++ b/sdk/parents/azure-client-sdk-parent/pom.xml @@ -691,7 +691,7 @@ io.clientcore linting-extensions - 1.0.0-beta.2 + 1.0.0-beta.1 com.puppycrawl.tools diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index ca1edbe70fe2..ec074426c4bc 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -76,14 +76,15 @@ public Mono globalSetupAsync() { public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); return cleanupContainerWithRetry() - .then(super.globalCleanupAsync()) .onErrorResume(error -> { // Log cleanup failure but don't fail the overall test LOGGER.atWarning() .addKeyValue("error", error.getMessage()) .log("Container cleanup failed"); - return super.globalCleanupAsync(); - }); + + return Mono.empty(); + }) + .then(super.globalCleanupAsync()); } private static final int DELETE_TIMEOUT_SECONDS = 30; diff --git 
a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java index 40f264444f2c..fbeb7647bab7 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java @@ -75,14 +75,15 @@ public Mono globalSetupAsync() { public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); return cleanupContainerWithRetry() - .then(super.globalCleanupAsync()) .onErrorResume(error -> { // Log cleanup failure but don't fail the overall test LOGGER.atWarning() .addKeyValue("error", error.getMessage()) .log("Container cleanup failed"); - return super.globalCleanupAsync(); - }); + + return Mono.empty(); + }) + .then(super.globalCleanupAsync()); } /** diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index 5de274309835..b4e7c625d133 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -75,14 +75,15 @@ public Mono globalSetupAsync() { public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); return cleanupFileSystemWithRetry() - .then(super.globalCleanupAsync()) .onErrorResume(error -> { // Log cleanup failure but don't fail the overall test LOGGER.atWarning() .addKeyValue("error", error.getMessage()) .log("FileSystem cleanup failed"); - return super.globalCleanupAsync(); - }); + + return Mono.empty(); + }) + 
.then(super.globalCleanupAsync()); } /** diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index d3e9a3c54711..85cfa11d04af 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -157,6 +157,7 @@ private Mono deleteDirectoryContentsRecursively( .addKeyValue("error", error.getMessage()) .log("Failed to delete file during share cleanup; continuing with remaining items."); return Mono.empty(); + }); } }) .then(); From 462a4b06e77aa028f6524a976254b00599b05d03 Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 16 Mar 2026 12:49:22 -0400 Subject: [PATCH 21/25] change telemetry to logging; only returns final state instead of failed retries when ultimately successful --- .../storage/blob/stress/BlobScenarioBase.java | 24 +++++++++++-------- .../blob/stress/PageBlobScenarioBase.java | 8 +++++-- .../datalake/stress/DataLakeScenarioBase.java | 4 +++- .../file/share/stress/ShareScenarioBase.java | 4 +++- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index ec074426c4bc..6bf3dc004108 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -15,10 +15,12 @@ import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceClient; import
com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.TelemetryHelper; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.StorageStressOptions; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.time.Instant; @@ -154,16 +156,18 @@ public void run() { @SuppressWarnings("try") @Override public Mono runAsync() { - return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .retryWhen(reactor.util.retry.Retry.max(3) - .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) - .doOnError(e -> { - // Log the error for debugging but let legitimate failures propagate - LOGGER.atError() - .addKeyValue("error", e.getMessage()) - .addKeyValue("errorType", e.getClass().getSimpleName()) - .log("Test operation failed after retries"); - }); + return telemetryHelper.instrumentRunAsync(ctx -> + runInternalAsync(ctx) + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(Exceptions.unwrap(e) + instanceof ContentMismatchException))) + .doOnError(e -> { + // Log the error for debugging but let legitimate failures propagate + LOGGER.atError() + .addKeyValue("error", e.getMessage()) + .addKeyValue("errorType", e.getClass().getSimpleName()) + .log("Test operation failed after retries"); + })); } protected abstract void runInternal(Context context) throws Exception; diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java index fbeb7647bab7..352e2ae790aa 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java +++ 
b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java @@ -15,10 +15,12 @@ import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.TelemetryHelper; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.StorageStressOptions; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.time.Duration; @@ -146,9 +148,11 @@ public void run() { @SuppressWarnings("try") @Override public Mono runAsync() { - return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) + return telemetryHelper.instrumentRunAsync(ctx -> + runInternalAsync(ctx) .retryWhen(reactor.util.retry.Retry.max(3) - .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) + .filter(e -> !(Exceptions.unwrap(e) + instanceof ContentMismatchException)))) .doOnError(e -> LOGGER.atError() .addKeyValue("error", e.getMessage()) .log("Test operation failed after retries")); diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index b4e7c625d133..dc15309e9533 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -15,10 +15,12 @@ import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import 
com.azure.storage.file.datalake.DataLakeServiceClientBuilder; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.StorageStressOptions; import com.azure.storage.stress.TelemetryHelper; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.time.Instant; @@ -141,7 +143,7 @@ public void run() { public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) .retryWhen(reactor.util.retry.Retry.max(3) - .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) + .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException))) .doOnError(e -> LOGGER.atWarning() .addKeyValue("error", e.getMessage()) .log("DataLake test operation failed after retries")); diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 85cfa11d04af..2b714fc17f22 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -17,10 +17,12 @@ import com.azure.storage.file.share.ShareServiceClient; import com.azure.storage.file.share.ShareServiceClientBuilder; import com.azure.storage.file.share.models.ShareTokenIntent; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.TelemetryHelper; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.StorageStressOptions; +import reactor.core.Exceptions; import 
reactor.core.publisher.Mono; import java.time.Instant; @@ -174,7 +174,7 @@ public void run() { public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) .retryWhen(reactor.util.retry.Retry.max(3) - .filter(e -> !(reactor.core.Exceptions.unwrap(e) instanceof com.azure.storage.stress.ContentMismatchException))) + .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException))) .doOnError(e -> { // Log the error for debugging but let legitimate failures propagate LOGGER.error("Share test operation failed after retries.", e); From 43886c3a92672138d4daff92ded34ef66bd415ac Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 16 Mar 2026 13:09:17 -0400 Subject: [PATCH 22/25] undo version downgrade for linting-extensions --- sdk/parents/azure-client-sdk-parent/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/parents/azure-client-sdk-parent/pom.xml b/sdk/parents/azure-client-sdk-parent/pom.xml index bc50d0f0c042..9d4f9dfd5739 100644 --- a/sdk/parents/azure-client-sdk-parent/pom.xml +++ b/sdk/parents/azure-client-sdk-parent/pom.xml @@ -691,7 +691,7 @@ io.clientcore linting-extensions - 1.0.0-beta.1 + 1.0.0-beta.2 com.puppycrawl.tools From 617c585936ca14bd1f6628499b74b8e8462350f5 Mon Sep 17 00:00:00 2001 From: browndav-msft Date: Mon, 16 Mar 2026 14:45:28 -0400 Subject: [PATCH 23/25] Fixing spacing in error messages Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../azure/storage/stress/CrcOutputStream.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index 0d467c9bcd24..a9445c399cad 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++
b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -37,7 +37,7 @@ public synchronized void write(byte[] buf, int off, int len) { // doesn't throw on the second call. @Override public synchronized void close() throws IOException { - String baseErrorMessage = "Failed to emit content because "; + String baseErrorMessage = "Failed to emit content because"; Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); switch (emitResult) { case OK: @@ -45,19 +45,20 @@ public synchronized void close() throws IOException { // Expected successful outcomes; nothing further to do. break; case FAIL_CANCELLED: - throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + - " the sink was previously interrupted by its consumer: " + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the sink was previously interrupted by its consumer: " + emitResult)); case FAIL_OVERFLOW: - throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the buffer is full: " + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the buffer is full: " + emitResult)); case FAIL_NON_SERIALIZED: - throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "two threads called emit at " + - "once: " + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " two threads called emit at once: " + emitResult)); case FAIL_ZERO_SUBSCRIBER: - throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " + - "subscriber:" + emitResult)); + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the sink requires a subscriber: " + emitResult)); default: - throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + - "an unexpected emit result was returned: " + emitResult)); + throw LOGGER.logExceptionAsError(new 
RuntimeException(baseErrorMessage + + " an unexpected emit result was returned: " + emitResult)); } super.close(); } From 26b3a37830186aa1a9a92394aa5a33df3a9c0a24 Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 16 Mar 2026 15:32:37 -0400 Subject: [PATCH 24/25] refactor datalake delete all so that it is easier to read --- .../datalake/stress/DataLakeScenarioBase.java | 84 ++++++++++++------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index dc15309e9533..965cf2e3d981 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -15,6 +15,7 @@ import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; +import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions; import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; @@ -23,6 +24,7 @@ import reactor.core.Exceptions; import reactor.core.publisher.Mono; +import java.time.Duration; import java.time.Instant; import java.util.UUID; @@ -88,50 +90,74 @@ public Mono globalCleanupAsync() { .then(super.globalCleanupAsync()); } - /** - * Enhanced cleanup with timeout and retry logic to ensure file systems are properly destroyed. 
- */ + private static final int DELETE_TIMEOUT_SECONDS = 30; + private static final int PATH_CLEANUP_TIMEOUT_SECONDS = 60; + private static final int MAX_RETRY_ATTEMPTS = 3; + private Mono cleanupFileSystemWithRetry() { + return tryDeleteFileSystem() + .onErrorResume(error -> fallbackCleanup()); + } + + private Mono tryDeleteFileSystem() { return asyncNoFaultFileSystemClient.deleteIfExists() - .then() // Convert Mono to Mono - .timeout(java.time.Duration.ofSeconds(30)) - .retry(3) - .onErrorResume(error -> { - // If file system deletion fails, try to delete all files first then retry - return deleteAllFilesInFileSystem() - .then(asyncNoFaultFileSystemClient.deleteIfExists()) - .then() // Convert Mono to Mono - .timeout(java.time.Duration.ofSeconds(30)) - .onErrorResume(finalError -> { - // Log the error but don't fail the test - LOGGER.atWarning() - .addKeyValue("error", finalError.getMessage()) - .log("Final file system cleanup failed after retries"); - return Mono.empty(); - }); - }); + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)) + .retry(MAX_RETRY_ATTEMPTS); + } + + private Mono fallbackCleanup() { + return deleteAllPathsInFileSystem() + .then(tryDeleteFileSystemOnce()) + .onErrorResume(this::logCleanupFailure); + } + + private Mono tryDeleteFileSystemOnce() { + return asyncNoFaultFileSystemClient.deleteIfExists() + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)); + } + + private Mono logCleanupFailure(Throwable error) { + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Final file system cleanup failed after retries"); + return Mono.empty(); } /** - * Delete all files in the file system to help with cleanup. + * Deletes all paths in the file system sequentially to avoid throttling. 
*/ - private Mono deleteAllFilesInFileSystem() { + private Mono deleteAllPathsInFileSystem() { return asyncNoFaultFileSystemClient.listPaths() - .concatMap(pathItem -> - asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName()).deleteIfExists() - .onErrorResume(e -> Mono.empty()) - .then()) + .concatMap(this::deletePathQuietly) .then() - .timeout(java.time.Duration.ofSeconds(60)) + .timeout(Duration.ofSeconds(PATH_CLEANUP_TIMEOUT_SECONDS)) .onErrorResume(error -> { - // Log but continue - some files might have been deleted LOGGER.atWarning() .addKeyValue("error", error.getMessage()) - .log("File cleanup partially failed"); + .log("Path cleanup partially failed"); return Mono.empty(); }); } + private Mono deletePathQuietly(com.azure.storage.file.datalake.models.PathItem pathItem) { + DataLakePathDeleteOptions deleteOptions = new DataLakePathDeleteOptions(); + deleteOptions.setIsRecursive(true); + + if (pathItem.isDirectory()) { + return asyncNoFaultFileSystemClient.getDirectoryAsyncClient(pathItem.getName()) + .deleteIfExistsWithResponse(deleteOptions) + .onErrorResume(e -> Mono.empty()) + .then(); + } else { + return asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName()) + .deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(); + } + } + @SuppressWarnings("try") @Override public void run() { From 81c818eeefb9719b424048252d0a635463715eff Mon Sep 17 00:00:00 2001 From: browndav Date: Mon, 16 Mar 2026 15:42:29 -0400 Subject: [PATCH 25/25] refactor runAsync in ShareScenarioBase so retry failures dont show as failures upon success --- .../file/share/stress/ShareScenarioBase.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 2b714fc17f22..fad48901ac1f 100644 --- 
a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -174,13 +174,14 @@ public void run() { @SuppressWarnings("try") @Override public Mono runAsync() { - return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .retryWhen(reactor.util.retry.Retry.max(3) - .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException))) - .doOnError(e -> { - // Log the error for debugging but let legitimate failures propagate - LOGGER.error("Share test operation failed after retries.", e); - }); + return telemetryHelper.instrumentRunAsync(ctx -> + runInternalAsync(ctx) + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException))) + .doOnError(e -> { + // Log the error for debugging but let legitimate failures propagate + LOGGER.error("Share test operation failed after retries.", e); + })); } protected abstract void runInternal(Context context) throws Exception;