diff --git a/sdk/storage/azure-storage-blob-stress/pom.xml b/sdk/storage/azure-storage-blob-stress/pom.xml
index e644eebf05fd..39072049dc20 100644
--- a/sdk/storage/azure-storage-blob-stress/pom.xml
+++ b/sdk/storage/azure-storage-blob-stress/pom.xml
@@ -57,12 +57,12 @@
io.opentelemetry.instrumentation
opentelemetry-runtime-telemetry-java8
- 2.24.0-alpha
+ 2.15.0-alpha
io.opentelemetry.instrumentation
opentelemetry-logback-appender-1.0
- 2.24.0-alpha
+ 2.15.0-alpha
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java
index 9dd302492c3c..7543e7c86658 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlobOutputStream.java
@@ -70,7 +70,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java
index 89aefd7fc131..0c59b7a2851d 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/AppendBlock.java
@@ -62,8 +62,9 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.getAppendBlobAsyncClient().delete()
- .then(tempSetupBlobClient.delete())
+ return asyncNoFaultClient.getAppendBlobAsyncClient().deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then(tempSetupBlobClient.deleteIfExists())
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java
index 0e68fb7dc864..6bf3dc004108 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlobScenarioBase.java
@@ -6,6 +6,7 @@
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.perf.test.core.PerfStressTest;
@@ -14,17 +15,21 @@
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.stress.ContentMismatchException;
import com.azure.storage.stress.TelemetryHelper;
import com.azure.storage.stress.FaultInjectionProbabilities;
import com.azure.storage.stress.FaultInjectingHttpPolicy;
import com.azure.storage.stress.StorageStressOptions;
+import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.util.UUID;
+import java.time.Duration;
public abstract class BlobScenarioBase extends PerfStressTest {
private static final String CONTAINER_NAME = "stress-" + UUID.randomUUID();
+ private static final ClientLogger LOGGER = new ClientLogger(BlobScenarioBase.class);
protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass());
private final BlobContainerClient syncContainerClient;
private final BlobContainerAsyncClient asyncContainerClient;
@@ -72,10 +77,76 @@ public Mono globalSetupAsync() {
@Override
public Mono globalCleanupAsync() {
telemetryHelper.recordEnd(startTime);
- return asyncNoFaultContainerClient.deleteIfExists()
+ return cleanupContainerWithRetry()
+ .onErrorResume(error -> {
+ // Log cleanup failure but don't fail the overall test
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Container cleanup failed");
+
+ return Mono.empty();
+ })
.then(super.globalCleanupAsync());
}
+ private static final int DELETE_TIMEOUT_SECONDS = 30;
+ private static final int BLOB_CLEANUP_TIMEOUT_SECONDS = 60;
+ private static final int MAX_RETRY_ATTEMPTS = 3;
+
+ private Mono cleanupContainerWithRetry() {
+ return tryDeleteContainer()
+ .onErrorResume(error -> fallbackCleanup());
+ }
+
+ private Mono tryDeleteContainer() {
+ return asyncNoFaultContainerClient.deleteIfExists()
+ .then()
+ .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS))
+ .retry(MAX_RETRY_ATTEMPTS);
+ }
+
+ private Mono fallbackCleanup() {
+ return deleteAllBlobsInContainer()
+ .then(tryDeleteContainerOnce())
+ .onErrorResume(this::logCleanupFailure);
+ }
+
+ private Mono tryDeleteContainerOnce() {
+ return asyncNoFaultContainerClient.deleteIfExists()
+ .then()
+ .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS));
+ }
+
+ private Mono logCleanupFailure(Throwable error) {
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Final container cleanup failed after retries");
+ return Mono.empty();
+ }
+
+ /**
+ * Deletes all blobs in the container sequentially to avoid throttling.
+ */
+ private Mono deleteAllBlobsInContainer() {
+ return asyncNoFaultContainerClient.listBlobs()
+ .concatMap(this::deleteBlobQuietly)
+ .then()
+ .timeout(Duration.ofSeconds(BLOB_CLEANUP_TIMEOUT_SECONDS))
+ .onErrorResume(error -> {
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Blob cleanup partially failed");
+ return Mono.empty();
+ });
+ }
+
+ private Mono deleteBlobQuietly(com.azure.storage.blob.models.BlobItem blobItem) {
+ return asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName())
+ .deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then();
+ }
+
@SuppressWarnings("try")
@Override
public void run() {
@@ -85,8 +156,18 @@ public void run() {
@SuppressWarnings("try")
@Override
public Mono runAsync() {
- return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx))
- .onErrorResume(e -> Mono.empty());
+ return telemetryHelper.instrumentRunAsync(ctx ->
+ runInternalAsync(ctx)
+ .retryWhen(reactor.util.retry.Retry.max(3)
+ .filter(e -> !(Exceptions.unwrap(e)
+ instanceof ContentMismatchException)))
+ .doOnError(e -> {
+ // Log the error for debugging but let legitimate failures propagate
+ LOGGER.atError()
+ .addKeyValue("error", e.getMessage())
+ .addKeyValue("errorType", e.getClass().getSimpleName())
+ .log("Test operation failed after retries");
+ }));
}
protected abstract void runInternal(Context context) throws Exception;
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java
index 6d6dd563cb25..780ef149ece9 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobOutputStream.java
@@ -70,7 +70,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java
index 738f1abc9c47..efef9f68a4c4 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/BlockBlobUpload.java
@@ -56,7 +56,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java
index f19801ee61f2..310c7618e5d9 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/CommitBlockList.java
@@ -80,7 +80,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java
index 059365f0aa15..bcb2baae2c8f 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/DownloadToFile.java
@@ -66,7 +66,7 @@ protected Mono runInternalAsync(Context span) {
private static void deleteFile(Path path) {
try {
- path.toFile().delete();
+ Files.deleteIfExists(path);
} catch (Throwable e) {
LOGGER.atError()
.addKeyValue("path", path)
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java
index 1226c7f0d624..121c78f0333d 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobOutputStream.java
@@ -93,8 +93,9 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.getPageBlobAsyncClient().delete()
- .then(tempSetupPageBlobClient.delete())
+ return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then(tempSetupPageBlobClient.deleteIfExists())
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java
index b8b4a5115473..352e2ae790aa 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/PageBlobScenarioBase.java
@@ -6,6 +6,7 @@
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.perf.test.core.PerfStressTest;
@@ -14,18 +15,21 @@
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.stress.ContentMismatchException;
import com.azure.storage.stress.TelemetryHelper;
import com.azure.storage.stress.FaultInjectingHttpPolicy;
import com.azure.storage.stress.FaultInjectionProbabilities;
import com.azure.storage.stress.StorageStressOptions;
+import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
+import java.time.Duration;
import java.time.Instant;
-import java.util.Objects;
import java.util.UUID;
public abstract class PageBlobScenarioBase extends PerfStressTest {
private static final String CONTAINER_NAME = "stress-" + UUID.randomUUID();
+ private static final ClientLogger LOGGER = new ClientLogger(PageBlobScenarioBase.class);
protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass());
private final BlobServiceClient noFaultServiceClient;
private final BlobContainerClient syncContainerClient;
@@ -72,10 +76,69 @@ public Mono globalSetupAsync() {
@Override
public Mono globalCleanupAsync() {
telemetryHelper.recordEnd(startTime);
- return asyncNoFaultContainerClient.deleteIfExists()
+ return cleanupContainerWithRetry()
+ .onErrorResume(error -> {
+ // Log cleanup failure but don't fail the overall test
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Container cleanup failed");
+
+ return Mono.empty();
+ })
.then(super.globalCleanupAsync());
}
+ /**
+ * Enhanced cleanup with timeout and retry logic to ensure containers are properly destroyed.
+ */
+ private Mono cleanupContainerWithRetry() {
+ return asyncNoFaultContainerClient.deleteIfExists()
+ .then() // Convert Mono<Boolean> to Mono<Void>
+ .timeout(Duration.ofSeconds(30))
+ .retry(3)
+ .onErrorResume(error -> {
+ // If container deletion fails, try to delete all blobs first then retry container deletion
+ return deleteAllBlobsInContainer()
+ .then(asyncNoFaultContainerClient.deleteIfExists())
+ .then() // Convert Mono<Boolean> to Mono<Void>
+ .timeout(Duration.ofSeconds(30))
+ .onErrorResume(finalError -> {
+ // Log the error but don't fail the test
+ LOGGER.atWarning()
+ .addKeyValue("error", finalError.getMessage())
+ .log("Final container cleanup failed after retries");
+ return Mono.empty();
+ });
+ });
+ }
+
+ /**
+ * Delete all blobs in the container to help with cleanup.
+ */
+ private Mono deleteAllBlobsInContainer() {
+ return asyncNoFaultContainerClient.listBlobs()
+ .concatMap(blobItem ->
+ asyncNoFaultContainerClient.getBlobAsyncClient(blobItem.getName())
+ .deleteIfExists()
+ .onErrorResume(error -> {
+ // Log but continue - failures for individual blobs should not stop cleanup
+ LOGGER.atWarning()
+ .addKeyValue("blobName", blobItem.getName())
+ .addKeyValue("error", error.getMessage())
+ .log("Failed to delete blob during cleanup");
+ return Mono.empty();
+ }))
+ .then()
+ .timeout(Duration.ofSeconds(60))
+ .onErrorResume(error -> {
+ // Log but continue - some blobs might have been deleted
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Blob cleanup partially failed");
+ return Mono.empty();
+ });
+ }
+
@SuppressWarnings("try")
@Override
public void run() {
@@ -85,8 +148,14 @@ public void run() {
@SuppressWarnings("try")
@Override
public Mono runAsync() {
- return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx))
- .onErrorResume(e -> Mono.empty());
+ return telemetryHelper.instrumentRunAsync(ctx ->
+ runInternalAsync(ctx)
+ .retryWhen(reactor.util.retry.Retry.max(3)
+ .filter(e -> !(Exceptions.unwrap(e)
+ instanceof ContentMismatchException))))
+ .doOnError(e -> LOGGER.atError()
+ .addKeyValue("error", e.getMessage())
+ .log("Test operation failed after retries"));
}
protected abstract void runInternal(Context context) throws Exception;
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java
index f30b4aeeeab6..b39efef299b7 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/StageBlock.java
@@ -79,7 +79,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java
index 0393fae1e57f..d2af3a9cf70e 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/Upload.java
@@ -61,7 +61,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java
index 41a6c68bdecf..24289287b368 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadFromFile.java
@@ -107,7 +107,7 @@ private Path getTempPath(String prefix) {
private static void deleteFile(Path path) {
try {
- path.toFile().delete();
+ Files.deleteIfExists(path);
} catch (Throwable e) {
LOGGER.atError()
.addKeyValue("path", path)
diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java
index c221742dc651..13e6557e888c 100644
--- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java
+++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/UploadPages.java
@@ -69,8 +69,9 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.getPageBlobAsyncClient().delete()
- .then(tempSetupPageBlobClient.delete())
+ return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then(tempSetupPageBlobClient.deleteIfExists())
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-file-datalake-stress/pom.xml b/sdk/storage/azure-storage-file-datalake-stress/pom.xml
index df12914f5100..7d243eefa694 100644
--- a/sdk/storage/azure-storage-file-datalake-stress/pom.xml
+++ b/sdk/storage/azure-storage-file-datalake-stress/pom.xml
@@ -57,12 +57,12 @@
io.opentelemetry.instrumentation
opentelemetry-runtime-telemetry-java8
- 2.24.0-alpha
+ 2.15.0-alpha
io.opentelemetry.instrumentation
opentelemetry-logback-appender-1.0
- 2.24.0-alpha
+ 2.15.0-alpha
diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java
index e64ff0684c01..965cf2e3d981 100644
--- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java
+++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/DataLakeScenarioBase.java
@@ -6,6 +6,7 @@
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.perf.test.core.PerfStressTest;
@@ -14,17 +15,22 @@
import com.azure.storage.file.datalake.DataLakeServiceAsyncClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-import com.azure.storage.stress.TelemetryHelper;
+import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions;
+import com.azure.storage.stress.ContentMismatchException;
import com.azure.storage.stress.FaultInjectingHttpPolicy;
import com.azure.storage.stress.FaultInjectionProbabilities;
import com.azure.storage.stress.StorageStressOptions;
+import com.azure.storage.stress.TelemetryHelper;
+import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
+import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
public abstract class DataLakeScenarioBase extends PerfStressTest {
private static final String FILE_SYSTEM_NAME = "stress-" + UUID.randomUUID();
+ private static final ClientLogger LOGGER = new ClientLogger(DataLakeScenarioBase.class);
protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass());
private final DataLakeFileSystemClient syncFileSystemClient;
private final DataLakeFileSystemAsyncClient asyncFileSystemClient;
@@ -72,10 +78,86 @@ public Mono globalSetupAsync() {
@Override
public Mono globalCleanupAsync() {
telemetryHelper.recordEnd(startTime);
- return asyncNoFaultFileSystemClient.deleteIfExists()
+ return cleanupFileSystemWithRetry()
+ .onErrorResume(error -> {
+ // Log cleanup failure but don't fail the overall test
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("FileSystem cleanup failed");
+
+ return Mono.empty();
+ })
.then(super.globalCleanupAsync());
}
+ private static final int DELETE_TIMEOUT_SECONDS = 30;
+ private static final int PATH_CLEANUP_TIMEOUT_SECONDS = 60;
+ private static final int MAX_RETRY_ATTEMPTS = 3;
+
+ private Mono cleanupFileSystemWithRetry() {
+ return tryDeleteFileSystem()
+ .onErrorResume(error -> fallbackCleanup());
+ }
+
+ private Mono tryDeleteFileSystem() {
+ return asyncNoFaultFileSystemClient.deleteIfExists()
+ .then()
+ .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS))
+ .retry(MAX_RETRY_ATTEMPTS);
+ }
+
+ private Mono fallbackCleanup() {
+ return deleteAllPathsInFileSystem()
+ .then(tryDeleteFileSystemOnce())
+ .onErrorResume(this::logCleanupFailure);
+ }
+
+ private Mono tryDeleteFileSystemOnce() {
+ return asyncNoFaultFileSystemClient.deleteIfExists()
+ .then()
+ .timeout(Duration.ofSeconds(DELETE_TIMEOUT_SECONDS));
+ }
+
+ private Mono logCleanupFailure(Throwable error) {
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Final file system cleanup failed after retries");
+ return Mono.empty();
+ }
+
+ /**
+ * Deletes all paths in the file system sequentially to avoid throttling.
+ */
+ private Mono deleteAllPathsInFileSystem() {
+ return asyncNoFaultFileSystemClient.listPaths()
+ .concatMap(this::deletePathQuietly)
+ .then()
+ .timeout(Duration.ofSeconds(PATH_CLEANUP_TIMEOUT_SECONDS))
+ .onErrorResume(error -> {
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Path cleanup partially failed");
+ return Mono.empty();
+ });
+ }
+
+ private Mono deletePathQuietly(com.azure.storage.file.datalake.models.PathItem pathItem) {
+ DataLakePathDeleteOptions deleteOptions = new DataLakePathDeleteOptions();
+ deleteOptions.setIsRecursive(true);
+
+ if (pathItem.isDirectory()) {
+ return asyncNoFaultFileSystemClient.getDirectoryAsyncClient(pathItem.getName())
+ .deleteIfExistsWithResponse(deleteOptions)
+ .onErrorResume(e -> Mono.empty())
+ .then();
+ } else {
+ return asyncNoFaultFileSystemClient.getFileAsyncClient(pathItem.getName())
+ .deleteIfExists()
+ .onErrorResume(e -> Mono.empty())
+ .then();
+ }
+ }
+
@SuppressWarnings("try")
@Override
public void run() {
@@ -86,10 +168,15 @@ public void run() {
@Override
public Mono runAsync() {
return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx))
- .onErrorResume(e -> Mono.empty());
+ .retryWhen(reactor.util.retry.Retry.max(3)
+ .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException)))
+ .doOnError(e -> LOGGER.atWarning()
+ .addKeyValue("error", e.getMessage())
+ .log("DataLake test operation failed after retries"));
}
protected abstract void runInternal(Context context) throws Exception;
+
protected abstract Mono runInternalAsync(Context context);
protected DataLakeFileSystemClient getSyncFileSystemClient() {
diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java
index 6dca37ecb92c..228425064123 100644
--- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java
+++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/ReadToFile.java
@@ -72,7 +72,7 @@ public Mono cleanupAsync() {
private static void deleteFile(Path path) {
try {
- path.toFile().delete();
+ Files.deleteIfExists(path);
} catch (Throwable e) {
LOGGER.atError()
.addKeyValue("path", path)
diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java
index dd6a3221bfb6..123f97ef49b1 100644
--- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java
+++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/Upload.java
@@ -60,7 +60,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
}
diff --git a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java
index 15a64abb228f..9b889f1a393c 100644
--- a/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java
+++ b/sdk/storage/azure-storage-file-datalake-stress/src/main/java/com/azure/storage/file/datalake/stress/UploadFromFile.java
@@ -87,7 +87,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
@@ -101,7 +101,7 @@ private Path getTempPath(String prefix) {
private static void deleteFile(Path path) {
try {
- path.toFile().delete();
+ Files.deleteIfExists(path);
} catch (Throwable e) {
LOGGER.atError()
.addKeyValue("path", path)
diff --git a/sdk/storage/azure-storage-file-share-stress/pom.xml b/sdk/storage/azure-storage-file-share-stress/pom.xml
index 48823a16964d..852242f0285d 100644
--- a/sdk/storage/azure-storage-file-share-stress/pom.xml
+++ b/sdk/storage/azure-storage-file-share-stress/pom.xml
@@ -57,12 +57,12 @@
io.opentelemetry.instrumentation
opentelemetry-runtime-telemetry-java8
- 2.24.0-alpha
+ 2.15.0-alpha
io.opentelemetry.instrumentation
opentelemetry-logback-appender-1.0
- 2.24.0-alpha
+ 2.15.0-alpha
diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java
index 02be0b18f68b..7226e59d1e28 100644
--- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java
+++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/DownloadToFile.java
@@ -67,7 +67,7 @@ public Mono cleanupAsync() {
private static void deleteFile(Path path) {
try {
- path.toFile().delete();
+ Files.deleteIfExists(path);
} catch (Throwable e) {
LOGGER.atError()
.addKeyValue("path", path)
diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java
index 870f0d7eb755..fad48901ac1f 100644
--- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java
+++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/ShareScenarioBase.java
@@ -6,19 +6,23 @@
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.perf.test.core.PerfStressTest;
import com.azure.storage.file.share.ShareAsyncClient;
import com.azure.storage.file.share.ShareClient;
+import com.azure.storage.file.share.ShareDirectoryAsyncClient;
import com.azure.storage.file.share.ShareServiceAsyncClient;
import com.azure.storage.file.share.ShareServiceClient;
import com.azure.storage.file.share.ShareServiceClientBuilder;
import com.azure.storage.file.share.models.ShareTokenIntent;
+import com.azure.storage.stress.ContentMismatchException;
import com.azure.storage.stress.TelemetryHelper;
import com.azure.storage.stress.FaultInjectingHttpPolicy;
import com.azure.storage.stress.FaultInjectionProbabilities;
import com.azure.storage.stress.StorageStressOptions;
+import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import java.time.Instant;
@@ -26,6 +30,7 @@
public abstract class ShareScenarioBase extends PerfStressTest {
private static final String SHARE_NAME = "stress-" + UUID.randomUUID();
+ private static final ClientLogger LOGGER = new ClientLogger(ShareScenarioBase.class);
protected final TelemetryHelper telemetryHelper = new TelemetryHelper(this.getClass());
private final ShareClient syncShareClient;
private final ShareAsyncClient asyncShareClient;
@@ -74,10 +79,92 @@ public Mono globalSetupAsync() {
@Override
public Mono globalCleanupAsync() {
telemetryHelper.recordEnd(startTime);
- return asyncNoFaultShareClient.deleteIfExists()
+ return cleanupShareWithRetry()
+ .onErrorResume(error -> {
+ // Log cleanup failure but don't fail the overall test
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("Share cleanup failed");
+ return Mono.empty();
+ })
.then(super.globalCleanupAsync());
}
+ /**
+ * Enhanced cleanup with timeout and retry logic to ensure shares are properly destroyed.
+ */
+ private Mono cleanupShareWithRetry() {
+ return asyncNoFaultShareClient.deleteIfExists()
+ .then() // Convert Mono<Boolean> to Mono<Void>
+ .timeout(java.time.Duration.ofSeconds(30))
+ .retry(3)
+ .onErrorResume(error -> {
+ // If share deletion fails, try to delete all files first then retry
+ return deleteAllFilesInShare()
+ .then(asyncNoFaultShareClient.deleteIfExists())
+ .then() // Convert Mono<Boolean> to Mono<Void>
+ .timeout(java.time.Duration.ofSeconds(30))
+ .onErrorResume(finalError -> {
+ // Log the error but don't fail the test
+ LOGGER.atWarning()
+ .addKeyValue("error", finalError.getMessage())
+ .log("Final share cleanup failed after retries");
+ return Mono.empty();
+ });
+ });
+ }
+
+ /**
+ * Delete all files in the share to help with cleanup.
+ */
+ private Mono deleteAllFilesInShare() {
+ return deleteDirectoryContentsRecursively(asyncNoFaultShareClient.getDirectoryClient(""))
+ .timeout(java.time.Duration.ofSeconds(60))
+ .onErrorResume(error -> {
+ // Log but continue - some files might have been deleted
+ LOGGER.atWarning()
+ .addKeyValue("error", error.getMessage())
+ .log("File cleanup partially failed");
+ return Mono.empty();
+ });
+ }
+
+ /**
+ * Recursively delete all contents of a directory (files first, then subdirectories).
+ */
+ private Mono deleteDirectoryContentsRecursively(
+ ShareDirectoryAsyncClient directoryClient) {
+ return directoryClient.listFilesAndDirectories()
+ // Use concatMap to ensure we process each file/directory sequentially, which is important for correct deletion order
+ .concatMap(fileRef -> {
+ if (fileRef.isDirectory()) {
+ ShareDirectoryAsyncClient subDirClient =
+ directoryClient.getSubdirectoryClient(fileRef.getName());
+ // First delete all contents recursively, then delete the directory itself
+ return deleteDirectoryContentsRecursively(subDirClient)
+ .then(subDirClient.deleteIfExists())
+ .onErrorResume(error -> {
+ LOGGER.atWarning()
+ .addKeyValue("directory", fileRef.getName())
+ .addKeyValue("error", error.getMessage())
+ .log("Failed to delete directory during share cleanup; continuing with remaining items.");
+ return Mono.empty();
+ });
+ } else {
+ return directoryClient.getFileClient(fileRef.getName())
+ .deleteIfExists()
+ .onErrorResume(error -> {
+ LOGGER.atWarning()
+ .addKeyValue("file", fileRef.getName())
+ .addKeyValue("error", error.getMessage())
+ .log("Failed to delete file during share cleanup; continuing with remaining items.");
+ return Mono.empty();
+ });
+ }
+ })
+ .then();
+ }
+
@SuppressWarnings("try")
@Override
public void run() {
@@ -87,8 +174,14 @@ public void run() {
@SuppressWarnings("try")
@Override
public Mono runAsync() {
- return telemetryHelper.instrumentRunAsync(ctx -> runInternalAsync(ctx))
- .onErrorResume(e -> Mono.empty());
+ return telemetryHelper.instrumentRunAsync(ctx ->
+ runInternalAsync(ctx)
+ // Retry transient faults up to 3 times, but never retry data-integrity
+ // failures: a ContentMismatchException must surface as a real test failure.
+ .retryWhen(reactor.util.retry.Retry.max(3)
+ .filter(e -> !(Exceptions.unwrap(e) instanceof ContentMismatchException)))
+ .doOnError(e -> {
+ // Log the error for debugging but let legitimate failures propagate
+ LOGGER.error("Share test operation failed after retries.", e);
+ }));
}
protected abstract void runInternal(Context context) throws Exception;
diff --git a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java
index 3689c2dd7f65..c3de9eb9da1f 100644
--- a/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java
+++ b/sdk/storage/azure-storage-file-share-stress/src/main/java/com/azure/storage/file/share/stress/UploadFromFile.java
@@ -80,7 +80,7 @@ public Mono setupAsync() {
@Override
public Mono cleanupAsync() {
- return asyncNoFaultClient.delete()
+ // deleteIfExists tolerates the resource already being gone (e.g. removed by an
+ // injected fault during the run), so cleanup never fails the scenario.
+ return asyncNoFaultClient.deleteIfExists()
.then(super.cleanupAsync());
}
@@ -94,7 +94,7 @@ private Path getTempPath(String prefix) {
private static void deleteFile(Path path) {
try {
- path.toFile().delete();
+ Files.deleteIfExists(path);
} catch (Throwable e) {
LOGGER.atError()
.addKeyValue("path", path)
diff --git a/sdk/storage/azure-storage-stress/pom.xml b/sdk/storage/azure-storage-stress/pom.xml
index d8907fe67014..9a6cd7caf2a6 100644
--- a/sdk/storage/azure-storage-stress/pom.xml
+++ b/sdk/storage/azure-storage-stress/pom.xml
@@ -52,12 +52,12 @@
io.opentelemetry.instrumentation
opentelemetry-runtime-telemetry-java8
- 2.24.0-alpha
+ 2.15.0-alpha
io.opentelemetry.instrumentation
opentelemetry-logback-appender-1.0
- 2.24.0-alpha
+ 2.15.0-alpha
com.azure
diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java
index 9c53830d9e09..7e48798b30b0 100644
--- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java
+++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcInputStream.java
@@ -43,33 +43,62 @@ public CrcInputStream(InputStream source) {
@Override
public synchronized int read() throws IOException {
int b = inputStream.read();
- if (b >= 0) {
- crc.update(b);
- if (head.hasRemaining()) {
- head.put((byte) b);
- }
- length++;
- } else {
- sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST);
+ if (b < 0) {
+ // EOF: publish the accumulated CRC/length/head snapshot to the sink.
+ emitContentInfo();
+ return b;
+ }
+
+ crc.update(b);
+ // Capture the leading bytes only while the head buffer still has room.
+ if (head.hasRemaining()) {
+ head.put((byte) b);
}
+ length++;
return b;
}
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
int read = inputStream.read(buf, off, len);
- if (read >= 0) {
- length += read;
- crc.update(buf, off, read);
- if (head.hasRemaining()) {
- head.put(buf, off, Math.min(read, head.remaining()));
- }
- } else {
- sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST);
+ if (read < 0) {
+ // EOF: publish the accumulated CRC/length/head snapshot to the sink.
+ emitContentInfo();
+ return read;
}
+
+ crc.update(buf, off, read);
+ if (head.hasRemaining()) {
+ // Capture only as many leading bytes as the head buffer can still hold.
+ head.put(buf, off, Math.min(read, head.remaining()));
+ }
+ length += read;
return read;
}
+ // Uses tryEmitValue instead of emitValue(FAIL_FAST) so that resubscriptions
+ // (SDK retries, verification passes) don't throw on the second EOF.
+ // Message wording/spacing mirrors CrcOutputStream.close() for consistency.
+ private void emitContentInfo() {
+ String baseErrorMessage = "Failed to emit content because";
+ Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head));
+ switch (emitResult) {
+ case OK:
+ case FAIL_TERMINATED:
+ // No action needed for successful or already-terminated emissions.
+ break;
+ case FAIL_CANCELLED:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " the sink was previously interrupted by its consumer: " + emitResult));
+ case FAIL_OVERFLOW:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " the buffer is full: " + emitResult));
+ case FAIL_NON_SERIALIZED:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " two threads called emit at once: " + emitResult));
+ case FAIL_ZERO_SUBSCRIBER:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " the sink requires a subscriber: " + emitResult));
+ default:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " an unexpected emit result was returned: " + emitResult));
+ }
+ }
+
@Override
public synchronized void mark(int readLimit) {
if (markSupported) {
@@ -136,6 +165,12 @@ public Flux convertStreamToByteBuffer() {
return FluxUtil.fluxError(LOGGER, new UncheckedIOException(e));
}
+ // Reset CRC tracking state so resubscriptions (SDK retries or verification)
+ // compute the correct checksum from scratch.
+ crc.reset();
+ length = 0;
+ head.clear();
+
final long[] currentTotalLength = new long[1];
return Flux.generate(() -> inputStream, (is, sink) -> {
long pos = currentTotalLength[0];
diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java
index a118d45e6cc1..a9445c399cad 100644
--- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java
+++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/CrcOutputStream.java
@@ -1,5 +1,6 @@
package com.azure.storage.stress;
+import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
@@ -13,16 +14,18 @@ public class CrcOutputStream extends OutputStream {
private final CRC32 crc = new CRC32();
private long length = 0;
private final ByteBuffer head = ByteBuffer.allocate(1024);
+ private static final ClientLogger LOGGER = new ClientLogger(CrcOutputStream.class);
+
@Override
public synchronized void write(int b) {
+ // Fold the byte into the running CRC and keep the leading bytes in 'head'
+ // while it has capacity; 'length' tracks the total bytes written.
crc.update(b);
if (head.hasRemaining()) {
- head.put((byte)b);
+ head.put((byte) b);
}
- length ++;
+ length++;
}
- public synchronized void write(byte buf[], int off, int len) {
+ public synchronized void write(byte[] buf, int off, int len) {
crc.update(buf, off, len);
if (head.hasRemaining()) {
head.put(buf, off, Math.min(len, head.remaining()));
@@ -30,9 +33,33 @@ public synchronized void write(byte buf[], int off, int len) {
length += len;
}
+ // Uses tryEmitValue so that double-close (e.g. explicit close + try-with-resources)
+ // doesn't throw on the second call.
@Override
- public void close() throws IOException {
- sink.emitValue(new ContentInfo(crc.getValue(), length, head), Sinks.EmitFailureHandler.FAIL_FAST);
+ public synchronized void close() throws IOException {
+ String baseErrorMessage = "Failed to emit content because";
+ // Publish the final CRC/length/head snapshot; FAIL_TERMINATED is expected on a repeat close.
+ Sinks.EmitResult emitResult = sink.tryEmitValue(new ContentInfo(crc.getValue(), length, head));
+ switch (emitResult) {
+ case OK:
+ case FAIL_TERMINATED:
+ // Expected successful outcomes; nothing further to do.
+ break;
+ case FAIL_CANCELLED:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " the sink was previously interrupted by its consumer: " + emitResult));
+ case FAIL_OVERFLOW:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " the buffer is full: " + emitResult));
+ case FAIL_NON_SERIALIZED:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " two threads called emit at once: " + emitResult));
+ case FAIL_ZERO_SUBSCRIBER:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " the sink requires a subscriber: " + emitResult));
+ default:
+ throw LOGGER.logExceptionAsError(new RuntimeException(baseErrorMessage
+ + " an unexpected emit result was returned: " + emitResult));
+ }
super.close();
}
diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java
index e3a756bf0c51..4bf6e523eae1 100644
--- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java
+++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java
@@ -124,7 +124,7 @@ public String getDescription() {
Cpu.registerObservers(otel);
MemoryPools.registerObservers(otel);
Threads.registerObservers(otel);
- GarbageCollector.registerObservers(otel, false); // false disables the capture of the GC cause
+ GarbageCollector.registerObservers(otel);
OpenTelemetryAppender.install(otel);
return otel;
}
@@ -142,7 +142,9 @@ public void instrumentRun(ThrowingFunction oneRun) {
oneRun.run(ctx);
trackSuccess(start, span);
} catch (Throwable e) {
- if (e.getMessage().contains("Timeout on blocking read") || e instanceof InterruptedException || e instanceof TimeoutException) {
+ String message = e.getMessage();
+ if (e instanceof InterruptedException || e instanceof TimeoutException
+ || (message != null && message.contains("Timeout on blocking read"))) {
trackCancellation(start, span);
} else {
trackFailure(start, e, span);
@@ -201,10 +203,12 @@ private void trackFailure(Instant start, Throwable e, Span span) {
// already a NativeIoException/TimeoutException
if (unwrapped instanceof RuntimeException) {
String message = unwrapped.getMessage();
- if (message.contains("NativeIoException")) {
- unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE);
- } else if (message.contains("TimeoutException")) {
- unwrapped = new TimeoutException(message);
+ if (message != null) {
+ if (message.contains("NativeIoException")) {
+ unwrapped = new io.netty.channel.unix.Errors.NativeIoException("recvAddress", Errors.ERRNO_ECONNRESET_NEGATIVE);
+ } else if (message.contains("TimeoutException")) {
+ unwrapped = new TimeoutException(message);
+ }
}
}