diff --git a/sdk/storage/azure-storage-blob-stress/pom.xml b/sdk/storage/azure-storage-blob-stress/pom.xml index e644eebf05fd..39072049dc20 100644 --- a/sdk/storage/azure-storage-blob-stress/pom.xml +++ b/sdk/storage/azure-storage-blob-stress/pom.xml @@ -57,12 +57,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java index 9dd302492c3c..7543e7c86658 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java @@ -70,7 +70,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java index 89aefd7fc131..0c59b7a2851d 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java @@ -62,8 +62,9 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.getAppendBlobAsyncClient().delete() - .then(tempSetupBlobClient.delete()) + return asyncNoFaultClient.getAppendBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupBlobClient.deleteIfExists()) 
.then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java index 0e68fb7dc864..6bf3dc004108 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java @@ -6,6 +6,7 @@ import com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; @@ -14,17 +15,21 @@ import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.TelemetryHelper; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.StorageStressOptions; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.time.Instant; import java.util.UUID; +import java.time.Duration; public abstract class BlobScenarioBase extends PerfStressTest { private static final String CONTAINER_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(BlobScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final BlobContainerClient syncContainerClient; private final BlobContainerAsyncClient asyncContainerClient; @@ -72,10 +77,76 @@ public Mono globalSetupAsync() { @Override public Mono 
globalCleanupAsync() { telemetryHelper.recordEnd(startTime); - return asyncNoFaultContainerClient.deleteIfExists() + return cleanupContainerWithRetry() + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Container cleanup failed"); + + return Mono.empty(); + }) .then(super.globalCleanupAsync()); } + private static final int DELETE_TIMEOUT_SECONDS = 30; + private static final int BLOB_CLEANUP_TIMEOUT_SECONDS = 60; + private static final int MAX_RETRY_ATTEMPTS = 3; + + private Mono cleanupContainerWithRetry() { + return tryDeleteContainer() + .onErrorResume(error -> fallbackCleanup()); + } + + private Mono tryDeleteContainer() { + return asyncNoFaultContainerClient.deleteIfExists() + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)) + .retry(MAX_RETRY_ATTEMPTS); + } + + private Mono fallbackCleanup() { + return deleteAllBlobsInContainer() + .then(tryDeleteContainerOnce()) + .onErrorResume(this::logCleanupFailure); + } + + private Mono tryDeleteContainerOnce() { + return asyncNoFaultContainerClient.deleteIfExists() + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)); + } + + private Mono logCleanupFailure(Throwable error) { + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Final container cleanup failed after retries"); + return Mono.empty(); + } + + /** + * Deletes all blobs in the container sequentially to avoid throttling. 
+ */ + private Mono deleteAllBlobsInContainer() { + return asyncNoFaultContainerClient.listBlobs() + .concatMap(this::deleteBlobQuietly) + .then() + .timeout(Duration.ofSeconds(BLOB_CLEANUP_TIMEOUT_SECONDS)) + .onErrorResume(error -> { + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Blob cleanup partially failed"); + return Mono.empty(); + }); + } + + private Mono deleteBlobQuietly(com.azure.storage.blob.models.BlobItem blobItem) { + return asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()) + .deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(); + } + @SuppressWarnings("try") @Override public void run() { @@ -85,8 +156,18 @@ public void run() { @SuppressWarnings("try") @Override public Mono runAsync() { - return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + return telemetryHelper.instrumentRunAsync(ctx -> + runInternalAsync(ctx) + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(Exceptions.unwrap(e) + instanceof ContentMismatchException))) + .doOnError(e -> { + // Log the error for debugging but let legitimate failures propagate + LOGGER.atError() + .addKeyValue("error", e.getMessage()) + .addKeyValue("errorType", e.getClass().getSimpleName()) + .log("Test operation failed after retries"); + })); } protected abstract void runInternal(Context context) throws Exception; diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java index 6d6dd563cb25..780ef149ece9 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java @@ -70,7 +70,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - 
return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java index 738f1abc9c47..efef9f68a4c4 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java @@ -56,7 +56,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java index f19801ee61f2..310c7618e5d9 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java @@ -80,7 +80,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java index 059365f0aa15..bcb2baae2c8f 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java @@ -66,7 +66,7 @@ protected Mono runInternalAsync(Context 
span) { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java index 1226c7f0d624..121c78f0333d 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java @@ -93,8 +93,9 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.getPageBlobAsyncClient().delete() - .then(tempSetupPageBlobClient.delete()) + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java index b8b4a5115473..352e2ae790aa 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java @@ -6,6 +6,7 @@ import com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; @@ -14,18 +15,21 @@ import com.azure.storage.blob.BlobServiceAsyncClient; import 
com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.TelemetryHelper; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.StorageStressOptions; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; +import java.time.Duration; import java.time.Instant; -import java.util.Objects; import java.util.UUID; public abstract class PageBlobScenarioBase extends PerfStressTest { private static final String CONTAINER_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(PageBlobScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final BlobServiceClient noFaultServiceClient; private final BlobContainerClient syncContainerClient; @@ -72,10 +76,69 @@ public Mono globalSetupAsync() { @Override public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); - return asyncNoFaultContainerClient.deleteIfExists() + return cleanupContainerWithRetry() + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Container cleanup failed"); + + return Mono.empty(); + }) .then(super.globalCleanupAsync()); } + /** + * Enhanced cleanup with timeout and retry logic to ensure containers are properly destroyed. 
+ */ + private Mono cleanupContainerWithRetry() { + return asyncNoFaultContainerClient.deleteIfExists() + .then() // Convert Mono to Mono + .timeout(Duration.ofSeconds(30)) + .retry(3) + .onErrorResume(error -> { + // If container deletion fails, try to delete all blobs first then retry container deletion + return deleteAllBlobsInContainer() + .then(asyncNoFaultContainerClient.deleteIfExists()) + .then() // Convert Mono to Mono + .timeout(Duration.ofSeconds(30)) + .onErrorResume(finalError -> { + // Log the error but don't fail the test + LOGGER.atWarning() + .addKeyValue("error", finalError.getMessage()) + .log("Final container cleanup failed after retries"); + return Mono.empty(); + }); + }); + } + + /** + * Delete all blobs in the container to help with cleanup. + */ + private Mono deleteAllBlobsInContainer() { + return asyncNoFaultContainerClient.listBlobs() + .concatMap(blobItem -> + asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName()) + .deleteIfExists() + .onErrorResume(error -> { + // Log but continue - failures for individual blobs should not stop cleanup + LOGGER.atWarning() + .addKeyValue("blobName", blobItem.getName()) + .addKeyValue("error", error.getMessage()) + .log("Failed to delete blob during cleanup"); + return Mono.empty(); + })) + .then() + .timeout(Duration.ofSeconds(60)) + .onErrorResume(error -> { + // Log but continue - some blobs might have been deleted + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Blob cleanup partially failed"); + return Mono.empty(); + }); + } + @SuppressWarnings("try") @Override public void run() { @@ -85,8 +148,14 @@ public void run() { @SuppressWarnings("try") @Override public Mono runAsync() { - return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + return telemetryHelper.instrumentRunAsync(ctx -> + runInternalAsync(ctx) + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(Exceptions.unwrap(e) + instanceof 
ContentMismatchException)))) + .doOnError(e -> LOGGER.atError() + .addKeyValue("error", e.getMessage()) + .log("Test operation failed after retries")); } protected abstract void runInternal(Context context) throws Exception; diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java index f30b4aeeeab6..b39efef299b7 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java @@ -79,7 +79,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java index 0393fae1e57f..d2af3a9cf70e 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java @@ -61,7 +61,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java index 41a6c68bdecf..24289287b368 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java 
@@ -107,7 +107,7 @@ private Path getTempPath(String prefix) { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java index c221742dc651..13e6557e888c 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java @@ -69,8 +69,9 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.getPageBlobAsyncClient().delete() - .then(tempSetupPageBlobClient.delete()) + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-file-datalake-stress/pom.xml b/sdk/storage/azure-storage-file-datalake-stress/pom.xml index df12914f5100..7d243eefa694 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/pom.xml +++ b/sdk/storage/azure-storage-file-datalake-stress/pom.xml @@ -57,12 +57,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java index e64ff0684c01..965cf2e3d981 100644 --- 
a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java @@ -6,6 +6,7 @@ import com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; @@ -14,17 +15,22 @@ import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; -import com.azure.storage.stress.TelemetryHelper; +import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.StorageStressOptions; +import com.azure.storage.stress.TelemetryHelper; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; +import java.time.Duration; import java.time.Instant; import java.util.UUID; public abstract class DataLakeScenarioBase extends PerfStressTest { private static final String FILE_SYSTEM_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(DataLakeScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final DataLakeFileSystemClient syncFileSystemClient; private final DataLakeFileSystemAsyncClient asyncFileSystemClient; @@ -72,10 +78,86 @@ public Mono globalSetupAsync() { @Override public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); - return 
asyncNoFaultFileSystemClient.deleteIfExists() + return cleanupFileSystemWithRetry() + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("FileSystem cleanup failed"); + + return Mono.empty(); + }) .then(super.globalCleanupAsync()); } + private static final int DELETE_TIMEOUT_SECONDS = 30; + private static final int PATH_CLEANUP_TIMEOUT_SECONDS = 60; + private static final int MAX_RETRY_ATTEMPTS = 3; + + private Mono cleanupFileSystemWithRetry() { + return tryDeleteFileSystem() + .onErrorResume(error -> fallbackCleanup()); + } + + private Mono tryDeleteFileSystem() { + return asyncNoFaultFileSystemClient.deleteIfExists() + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)) + .retry(MAX_RETRY_ATTEMPTS); + } + + private Mono fallbackCleanup() { + return deleteAllPathsInFileSystem() + .then(tryDeleteFileSystemOnce()) + .onErrorResume(this::logCleanupFailure); + } + + private Mono tryDeleteFileSystemOnce() { + return asyncNoFaultFileSystemClient.deleteIfExists() + .then() + .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS)); + } + + private Mono logCleanupFailure(Throwable error) { + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Final file system cleanup failed after retries"); + return Mono.empty(); + } + + /** + * Deletes all paths in the file system sequentially to avoid throttling. 
+ */ + private Mono deleteAllPathsInFileSystem() { + return asyncNoFaultFileSystemClient.listPaths() + .concatMap(this::deletePathQuietly) + .then() + .timeout(Duration.ofSeconds(PATH_CLEANUP_TIMEOUT_SECONDS)) + .onErrorResume(error -> { + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Path cleanup partially failed"); + return Mono.empty(); + }); + } + + private Mono deletePathQuietly(com.azure.storage.file.datalake.models.PathItem pathItem) { + DataLakePathDeleteOptions deleteOptions = new DataLakePathDeleteOptions(); + deleteOptions.setIsRecursive(true); + + if (pathItem.isDirectory()) { + return asyncNoFaultFileSystemClient.getDirectoryAsyncClient(pathItem.getName()) + .deleteIfExistsWithResponse(deleteOptions) + .onErrorResume(e -> Mono.empty()) + .then(); + } else { + return asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName()) + .deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(); + } + } + @SuppressWarnings("try") @Override public void run() { @@ -86,10 +168,15 @@ public void run() { @Override public Mono runAsync() { return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException))) + .doOnError(e -> LOGGER.atWarning() + .addKeyValue("error", e.getMessage()) + .log("DataLake test operation failed after retries")); } protected abstract void runInternal(Context context) throws Exception; + protected abstract Mono runInternalAsync(Context context); protected DataLakeFileSystemClient getSyncFileSystemClient() { diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java index 6dca37ecb92c..228425064123 100644 --- 
a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java @@ -72,7 +72,7 @@ public Mono cleanupAsync() { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java index dd6a3221bfb6..123f97ef49b1 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java @@ -60,7 +60,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } } diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java index 15a64abb228f..9b889f1a393c 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java @@ -87,7 +87,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } @@ -101,7 +101,7 @@ private Path getTempPath(String prefix) { private static 
void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-file-share-stress/pom.xml b/sdk/storage/azure-storage-file-share-stress/pom.xml index 48823a16964d..852242f0285d 100644 --- a/sdk/storage/azure-storage-file-share-stress/pom.xml +++ b/sdk/storage/azure-storage-file-share-stress/pom.xml @@ -57,12 +57,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java index 02be0b18f68b..7226e59d1e28 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java @@ -67,7 +67,7 @@ public Mono cleanupAsync() { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java index 870f0d7eb755..fad48901ac1f 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java @@ -6,19 +6,23 @@ import 
com.azure.core.http.policy.HttpLogDetailLevel; import com.azure.core.http.policy.HttpLogOptions; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.perf.test.core.PerfStressTest; import com.azure.storage.file.share.ShareAsyncClient; import com.azure.storage.file.share.ShareClient; +import com.azure.storage.file.share.ShareDirectoryAsyncClient; import com.azure.storage.file.share.ShareServiceAsyncClient; import com.azure.storage.file.share.ShareServiceClient; import com.azure.storage.file.share.ShareServiceClientBuilder; import com.azure.storage.file.share.models.ShareTokenIntent; +import com.azure.storage.stress.ContentMismatchException; import com.azure.storage.stress.TelemetryHelper; import com.azure.storage.stress.FaultInjectingHttpPolicy; import com.azure.storage.stress.FaultInjectionProbabilities; import com.azure.storage.stress.StorageStressOptions; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.time.Instant; @@ -26,6 +30,7 @@ public abstract class ShareScenarioBase extends PerfStressTest { private static final String SHARE_NAME = "stress-" + UUID.randomUUID(); + private static final ClientLogger LOGGER = new ClientLogger(ShareScenarioBase.class); protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass()); private final ShareClient syncShareClient; private final ShareAsyncClient asyncShareClient; @@ -74,10 +79,92 @@ public Mono globalSetupAsync() { @Override public Mono globalCleanupAsync() { telemetryHelper.recordEnd(startTime); - return asyncNoFaultShareClient.deleteIfExists() + return cleanupShareWithRetry() + .onErrorResume(error -> { + // Log cleanup failure but don't fail the overall test + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("Share cleanup failed"); + return Mono.empty(); + }) 
.then(super.globalCleanupAsync()); } + /** + * Enhanced cleanup with timeout and retry logic to ensure shares are properly destroyed. + */ + private Mono cleanupShareWithRetry() { + return asyncNoFaultShareClient.deleteIfExists() + .then() // Convert Mono to Mono + .timeout(java.time.Duration.ofSeconds(30)) + .retry(3) + .onErrorResume(error -> { + // If share deletion fails, try to delete all files first then retry + return deleteAllFilesInShare() + .then(asyncNoFaultShareClient.deleteIfExists()) + .then() // Convert Mono to Mono + .timeout(java.time.Duration.ofSeconds(30)) + .onErrorResume(finalError -> { + // Log the error but don't fail the test + LOGGER.atWarning() + .addKeyValue("error", finalError.getMessage()) + .log("Final share cleanup failed after retries"); + return Mono.empty(); + }); + }); + } + + /** + * Delete all files in the share to help with cleanup. + */ + private Mono deleteAllFilesInShare() { + return deleteDirectoryContentsRecursively(asyncNoFaultShareClient.getDirectoryClient("")) + .timeout(java.time.Duration.ofSeconds(60)) + .onErrorResume(error -> { + // Log but continue - some files might have been deleted + LOGGER.atWarning() + .addKeyValue("error", error.getMessage()) + .log("File cleanup partially failed"); + return Mono.empty(); + }); + } + + /** + * Recursively delete all contents of a directory (files first, then subdirectories). 
+ */ + private Mono deleteDirectoryContentsRecursively( + ShareDirectoryAsyncClient directoryClient) { + return directoryClient.listFilesAndDirectories() + // Use concatMap to ensure we process each file/directory sequentially, which is important for correct deletion order + .concatMap(fileRef -> { + if (fileRef.isDirectory()) { + ShareDirectoryAsyncClient subDirClient = + directoryClient.getSubdirectoryClient(fileRef.getName()); + // First delete all contents recursively, then delete the directory itself + return deleteDirectoryContentsRecursively(subDirClient) + .then(subDirClient.deleteIfExists()) + .onErrorResume(error -> { + LOGGER.atWarning() + .addKeyValue("directory", fileRef.getName()) + .addKeyValue("error", error.getMessage()) + .log("Failed to delete directory during share cleanup; continuing with remaining items."); + return Mono.empty(); + }); + } else { + return directoryClient.getFileClient(fileRef.getName()) + .deleteIfExists() + .onErrorResume(error -> { + LOGGER.atWarning() + .addKeyValue("file", fileRef.getName()) + .addKeyValue("error", error.getMessage()) + .log("Failed to delete file during share cleanup; continuing with remaining items."); + return Mono.empty(); + }); + } + }) + .then(); + } + @SuppressWarnings("try") @Override public void run() { @@ -87,8 +174,14 @@ public void run() { @SuppressWarnings("try") @Override public Mono runAsync() { - return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx)) - .onErrorResume(e -> Mono.empty()); + return telemetryHelper.instrumentRunAsync(ctx -> + runInternalAsync(ctx) + .retryWhen(reactor.util.retry.Retry.max(3) + .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException))) + .doOnError(e -> { + // Log the error for debugging but let legitimate failures propagate + LOGGER.error("Share test operation failed after retries.", e); + })); } protected abstract void runInternal(Context context) throws Exception; diff --git 
a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java index 3689c2dd7f65..c3de9eb9da1f 100644 --- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java +++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java @@ -80,7 +80,7 @@ public Mono setupAsync() { @Override public Mono cleanupAsync() { - return asyncNoFaultClient.delete() + return asyncNoFaultClient.deleteIfExists() .then(super.cleanupAsync()); } @@ -94,7 +94,7 @@ private Path getTempPath(String prefix) { private static void deleteFile(Path path) { try { - path.toFile().delete(); + Files.deleteIfExists(path); } catch (Throwable e) { LOGGER.atError() .addKeyValue("path", path) diff --git a/sdk/storage/azure-storage-stress/pom.xml b/sdk/storage/azure-storage-stress/pom.xml index d8907fe67014..9a6cd7caf2a6 100644 --- a/sdk/storage/azure-storage-stress/pom.xml +++ b/sdk/storage/azure-storage-stress/pom.xml @@ -52,12 +52,12 @@ io.opentelemetry.instrumentation opentelemetry-runtime-telemetry-java8 - 2.24.0-alpha + 2.15.0-alpha io.opentelemetry.instrumentation opentelemetry-logback-appender-1.0 - 2.24.0-alpha + 2.15.0-alpha com.azure diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java index 9c53830d9e09..7e48798b30b0 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java @@ -43,33 +43,62 @@ public CrcInputStream(InputStream source) { @Override public synchronized int read() throws IOException { int b = inputStream.read(); - if (b 
>= 0) { - crc.update(b); - if (head.hasRemaining()) { - head.put((byte) b); - } - length++; - } else { - sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + if (b < 0) { + emitContentInfo(); + return b; + } + + crc.update(b); + if (head.hasRemaining()) { + head.put((byte) b); } + length++; return b; } @Override public synchronized int read(byte buf[], int off, int len) throws IOException { int read = inputStream.read(buf, off, len); - if (read >= 0) { - length += read; - crc.update(buf, off, read); - if (head.hasRemaining()) { - head.put(buf, off, Math.min(read, head.remaining())); - } - } else { - sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + if (read < 0) { + emitContentInfo(); + return read; } + + crc.update(buf, off, read); + if (head.hasRemaining()) { + head.put(buf, off, Math.min(read, head.remaining())); + } + length += read; return read; } + // Uses tryEmitValue instead of emitValue(FAIL_FAST) so that resubscriptions + // (SDK retries, verification passes) don't throw on the second EOF. + private void emitContentInfo() { + String baseErrorMessage = "Failed to emit content because "; + Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); + switch (emitResult) { + case OK: + case FAIL_TERMINATED: + // No action needed for successful or already-terminated emissions. 
+ break; + case FAIL_CANCELLED: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + "the sink was previously interrupted by its consumer: " + emitResult)); + case FAIL_OVERFLOW: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the buffer is full: " + emitResult)); + case FAIL_NON_SERIALIZED: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "two threads called emit at " + + "once: " + emitResult)); + case FAIL_ZERO_SUBSCRIBER: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "the sink requires a " + + "subscriber: " + emitResult)); + default: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + "unexpected emit result: " + + emitResult)); + } + } + @Override public synchronized void mark(int readLimit) { if (markSupported) { @@ -136,6 +165,12 @@ public Flux<ByteBuffer> convertStreamToByteBuffer() { return FluxUtil.fluxError(LOGGER, new UncheckedIOException(e)); } + // Reset CRC tracking state so resubscriptions (SDK retries or verification) + // compute the correct checksum from scratch. 
+ crc.reset(); + length = 0; + head.clear(); + final long[] currentTotalLength = new long[1]; return Flux.generate(() -> inputStream, (is, sink) -> { long pos = currentTotalLength[0]; diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java index a118d45e6cc1..a9445c399cad 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java @@ -1,5 +1,6 @@ package com.azure.storage.stress; +import com.azure.core.util.logging.ClientLogger; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -13,16 +14,18 @@ public class CrcOutputStream extends OutputStream { private final CRC32 crc = new CRC32(); private long length = 0; private final ByteBuffer head = ByteBuffer.allocate(1024); + private static final ClientLogger LOGGER = new ClientLogger(CrcOutputStream.class); + @Override public synchronized void write(int b) { crc.update(b); if (head.hasRemaining()) { - head.put((byte)b); + head.put((byte) b); } - length ++; + length++; } - public synchronized void write(byte buf[], int off, int len) { + public synchronized void write(byte[] buf, int off, int len) { crc.update(buf, off, len); if (head.hasRemaining()) { head.put(buf, off, Math.min(len, head.remaining())); @@ -30,9 +33,33 @@ public synchronized void write(byte buf[], int off, int len) { length += len; } + // Uses tryEmitValue so that double-close (e.g. explicit close + try-with-resources) + // doesn't throw on the second call. 
@Override - public void close() throws IOException { - sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST); + public synchronized void close() throws IOException { + String baseErrorMessage = "Failed to emit content because"; + Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head)); + switch (emitResult) { + case OK: + case FAIL_TERMINATED: + // Expected successful outcomes; nothing further to do. + break; + case FAIL_CANCELLED: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the sink was previously interrupted by its consumer: " + emitResult)); + case FAIL_OVERFLOW: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the buffer is full: " + emitResult)); + case FAIL_NON_SERIALIZED: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " two threads called emit at once: " + emitResult)); + case FAIL_ZERO_SUBSCRIBER: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " the sink requires a subscriber: " + emitResult)); + default: + throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage + + " an unexpected emit result was returned: " + emitResult)); + } super.close(); } diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java index e3a756bf0c51..4bf6e523eae1 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java @@ -124,7 +124,7 @@ public String getDescription() { Cpu.registerObservers(otel); MemoryPools.registerObservers(otel); Threads.registerObservers(otel); - GarbageCollector.registerObservers(otel, false); // false disables the capture of the GC cause + 
GarbageCollector.registerObservers(otel); OpenTelemetryAppender.install(otel); return otel; } @@ -142,7 +142,9 @@ public void instrumentRun(ThrowingFunction oneRun) { oneRun.run(ctx); trackSuccess(start, span); } catch (Throwable e) { - if (e.getMessage().contains("Timeout on blocking read") || e instanceof InterruptedException || e instanceof TimeoutException) { + String message = e.getMessage(); + if (e instanceof InterruptedException || e instanceof TimeoutException + || (message != null && message.contains("Timeout on blocking read"))) { trackCancellation(start, span); } else { trackFailure(start, e, span); @@ -201,10 +203,12 @@ private void trackFailure(Instant start, Throwable e, Span span) { // already a NativeIoException/TimeoutException if (unwrapped instanceof RuntimeException) { String message = unwrapped.getMessage(); - if (message.contains("NativeIoException")) { - unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE); - } else if (message.contains("TimeoutException")) { - unwrapped = new TimeoutException(message); + if (message != null) { + if (message.contains("NativeIoException")) { + unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE); + } else if (message.contains("TimeoutException")) { + unwrapped = new TimeoutException(message); + } } }