Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/storage/azure-storage-blob",
"Tag": "java/storage/azure-storage-blob_1f689f90f0"
"Tag": "java/storage/azure-storage-blob_4691350e44"
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.storage.blob.options.BlobDownloadContentOptions;
import com.azure.storage.blob.options.BlobDownloadStreamOptions;
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
import com.azure.storage.blob.options.BlobUploadFromFileOptions;
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.ContentValidationAlgorithm;
import com.azure.storage.common.implementation.Constants;
Expand All @@ -22,6 +23,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
Expand All @@ -35,6 +37,7 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -46,6 +49,13 @@
*/
public class BlobContentValidationAsyncDownloadTests extends BlobTestBase {
private static final int TEN_MB = 10 * Constants.MB;
/**
* {@link BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases()} starts at ~96 MiB; above this threshold fuzzy
* parallel download helpers use temp files + {@link BlobTestBase#compareFiles(File, File, long, long)} so the full
* payload never lives twice in heap.
*/
private static final int FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES = 96 * Constants.MB;

private final List<File> createdFiles = new ArrayList<>();

@AfterEach
Expand Down Expand Up @@ -439,4 +449,113 @@ public void interruptMultipleTimesWithDataIntact() {
assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded));
}

// ---------- Fuzzy parallel download (deterministic grids) ----------

@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadReplayableCases")
public void fuzzyParallelDownloadReplayableRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTripAsync("replayable", payloadBytes, blockSizeBytes, maxConcurrency);
}

@LiveOnly // payload > blockSize for every tuple; chunked range GETs across many requests.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadMediumMultiPartCases")
public void fuzzyParallelDownloadMediumMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTripAsync("mediumMultiPart", payloadBytes, blockSizeBytes, maxConcurrency);
}

@LiveOnly // payload >> blockSize; ~96-320 MiB downloads.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases")
public void fuzzyParallelDownloadLargeMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTripAsync("largeMultiPart", payloadBytes, blockSizeBytes, maxConcurrency);
}

@LiveOnly // ~1 GiB single case; far too large for the test proxy.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadOneGiBCases")
public void fuzzyParallelDownloadOneGiBRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTripAsync("oneGiB", payloadBytes, blockSizeBytes, maxConcurrency);
}

private void assertParallelDownloadFuzzyRoundTripAsync(String caseKind, int payloadBytes, long blockSizeBytes,
int maxConcurrency) throws IOException {
List<HttpHeaders> recorded = new CopyOnWriteArrayList<>();
BlobAsyncClient client = createBlobAsyncClientWithRequestSniffer(recorded);

ParallelTransferOptions parallelOptions
= new ParallelTransferOptions().setBlockSizeLong(blockSizeBytes).setMaxConcurrency(maxConcurrency);

String assertionMessage = "Fuzzy parallel download [" + caseKind + "] payloadBytes=" + payloadBytes
+ ", blockSize=" + blockSizeBytes + ", maxConcurrency=" + maxConcurrency;

if (payloadBytes >= FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES) {
File sourceFile = getRandomFile(payloadBytes);
sourceFile.deleteOnExit();
createdFiles.add(sourceFile);
File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-async", ".bin").toFile();
outFile.deleteOnExit();
createdFiles.add(outFile);
Files.deleteIfExists(outFile.toPath());

BlobUploadFromFileOptions uploadOptions
= new BlobUploadFromFileOptions(sourceFile.getAbsolutePath()).setParallelTransferOptions(
new com.azure.storage.blob.models.ParallelTransferOptions().setBlockSizeLong(blockSizeBytes)
.setMaxConcurrency(maxConcurrency));
assertNotNull(client.uploadFromFileWithResponse(uploadOptions).block().getValue().getETag(),
assertionMessage);

BlobDownloadToFileOptions downloadOptions
= new BlobDownloadToFileOptions(outFile.toPath().toString()).setParallelTransferOptions(parallelOptions)
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);

StepVerifier.create(client.downloadToFileWithResponse(downloadOptions))
.assertNext(r -> assertNotNull(r.getValue(), assertionMessage))
.verifyComplete();

assertTrue(compareFiles(sourceFile, outFile, 0, payloadBytes), assertionMessage);
} else {
byte[] randomData = getRandomByteArray(payloadBytes);
client.upload(BinaryData.fromBytes(randomData), true).block();

if (payloadBytes > blockSizeBytes) {
File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-async-mp", ".bin").toFile();
outFile.deleteOnExit();
createdFiles.add(outFile);
Files.deleteIfExists(outFile.toPath());

BlobDownloadToFileOptions downloadOptions = new BlobDownloadToFileOptions(outFile.toPath().toString())
.setParallelTransferOptions(parallelOptions)
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);

StepVerifier.create(client.downloadToFileWithResponse(downloadOptions))
.assertNext(r -> assertNotNull(r.getValue(), assertionMessage))
.verifyComplete();

byte[] downloaded = Files.readAllBytes(outFile.toPath());
assertArrayEquals(randomData, downloaded, assertionMessage);
} else {
BlobDownloadContentOptions downloadOptions
= new BlobDownloadContentOptions().setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);

StepVerifier.create(client.downloadContentWithResponse(downloadOptions))
.assertNext(r -> assertArrayEquals(randomData, r.getValue().toBytes(), assertionMessage))
.verifyComplete();

BlobDownloadStreamOptions streamOptions
= new BlobDownloadStreamOptions().setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
StepVerifier
.create(client.downloadStreamWithResponse(streamOptions)
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
.assertNext(bytes -> assertArrayEquals(randomData, bytes, assertionMessage))
.verifyComplete();
}
}
assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded), assertionMessage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.storage.blob.options.BlobDownloadStreamOptions;
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.options.BlobUploadFromFileOptions;
import com.azure.storage.blob.options.BlobSeekableByteChannelReadOptions;
import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.common.ParallelTransferOptions;
Expand All @@ -32,6 +33,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.util.ArrayList;
Expand All @@ -40,6 +42,7 @@
import java.util.stream.Stream;

import static com.azure.storage.blob.specialized.BlobSeekableByteChannelTests.copy;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -48,9 +51,19 @@
/**
* Sync tests for structured message decoding during blob downloads using StorageContentValidationDecoderPolicy.
* These tests verify that the pipeline policy correctly decodes structured messages when content validation is enabled.
* <p>
* Encoder/decoder-only fuzzy roundtrip and corruption grids live in {@link BlobContentValidationStructuredMessageFuzzyTests}
* without extending {@link BlobTestBase} (no test-proxy setup).
*/
public class BlobContentValidationDownloadTests extends BlobTestBase {
private static final int TEN_MB = 10 * Constants.MB;
/**
* {@link BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases()} starts at ~96 MiB; above this threshold fuzzy
* parallel download helpers use temp files + {@link BlobTestBase#compareFiles(File, File, long, long)} so the full
* payload never lives twice in heap.
*/
private static final int FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES = 96 * Constants.MB;

private final List<File> createdFiles = new ArrayList<>();

@AfterEach
Expand Down Expand Up @@ -426,6 +439,114 @@ public void openSeekableByteChannelReadContentValidation(Integer streamBufferSiz
assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded));
}

// ---------- Fuzzy parallel download (deterministic grids) ----------

@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadReplayableCases")
public void fuzzyParallelDownloadReplayableRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTrip("replayable", payloadBytes, blockSizeBytes, maxConcurrency);
}

@LiveOnly // payload > blockSize for every tuple; chunked range GETs across many requests.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadMediumMultiPartCases")
public void fuzzyParallelDownloadMediumMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTrip("mediumMultiPart", payloadBytes, blockSizeBytes, maxConcurrency);
}

@LiveOnly // payload >> blockSize; ~96-320 MiB downloads.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases")
public void fuzzyParallelDownloadLargeMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTrip("largeMultiPart", payloadBytes, blockSizeBytes, maxConcurrency);
}

@LiveOnly // ~1 GiB single case; far too large for the test proxy.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadOneGiBCases")
public void fuzzyParallelDownloadOneGiBRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency)
throws IOException {
assertParallelDownloadFuzzyRoundTrip("oneGiB", payloadBytes, blockSizeBytes, maxConcurrency);
}

private void assertParallelDownloadFuzzyRoundTrip(String caseKind, int payloadBytes, long blockSizeBytes,
int maxConcurrency) throws IOException {
List<HttpHeaders> recorded = new CopyOnWriteArrayList<>();
BlobClient client = createBlobClientWithRequestSniffer(recorded);

ParallelTransferOptions parallelOptions
= new ParallelTransferOptions().setBlockSizeLong(blockSizeBytes).setMaxConcurrency(maxConcurrency);

String assertionMessage = "Fuzzy parallel download [" + caseKind + "] payloadBytes=" + payloadBytes
+ ", blockSize=" + blockSizeBytes + ", maxConcurrency=" + maxConcurrency;

if (payloadBytes >= FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES) {
File sourceFile = getRandomFile(payloadBytes);
sourceFile.deleteOnExit();
createdFiles.add(sourceFile);
File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl", ".bin").toFile();
outFile.deleteOnExit();
createdFiles.add(outFile);
Files.deleteIfExists(outFile.toPath());

BlobUploadFromFileOptions uploadOptions
= new BlobUploadFromFileOptions(sourceFile.getAbsolutePath()).setParallelTransferOptions(
new com.azure.storage.blob.models.ParallelTransferOptions().setBlockSizeLong(blockSizeBytes)
.setMaxConcurrency(maxConcurrency));
assertNotNull(client.uploadFromFileWithResponse(uploadOptions, null, Context.NONE).getValue().getETag(),
assertionMessage);

BlobDownloadToFileOptions downloadOptions
= new BlobDownloadToFileOptions(outFile.toPath().toString()).setParallelTransferOptions(parallelOptions)
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
assertNotNull(client.downloadToFileWithResponse(downloadOptions, null, Context.NONE).getValue(),
assertionMessage);

assertTrue(compareFiles(sourceFile, outFile, 0, payloadBytes), assertionMessage);
} else {
byte[] randomData = getRandomByteArray(payloadBytes);
client.upload(BinaryData.fromBytes(randomData), true);

if (payloadBytes > blockSizeBytes) {
File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-mp", ".bin").toFile();
outFile.deleteOnExit();
createdFiles.add(outFile);
Files.deleteIfExists(outFile.toPath());

BlobDownloadToFileOptions downloadOptions = new BlobDownloadToFileOptions(outFile.toPath().toString())
.setParallelTransferOptions(parallelOptions)
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
assertNotNull(client.downloadToFileWithResponse(downloadOptions, null, Context.NONE).getValue(),
assertionMessage);

byte[] downloaded = readAllBytesFromFile(outFile);
assertArrayEquals(randomData, downloaded, assertionMessage);
} else {
BlobDownloadContentOptions downloadOptions
= new BlobDownloadContentOptions().setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
byte[] downloaded
= client.downloadContentWithResponse(downloadOptions, null, Context.NONE).getValue().toBytes();
assertArrayEquals(randomData, downloaded, assertionMessage);
}
}
assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded), assertionMessage);
}

private static byte[] readAllBytesFromFile(File file) throws IOException {
try (InputStream is = Files.newInputStream(file.toPath())) {
byte[] buffer = new byte[(int) file.length()];
int offset = 0;
int read;
while (offset < buffer.length && (read = is.read(buffer, offset, buffer.length - offset)) != -1) {
offset += read;
}
return buffer;
}
}

static Stream<Arguments> channelReadDataSupplier() {
return Stream.of(Arguments.of(50, 40, Constants.KB), Arguments.of(Constants.KB + 50, 40, Constants.KB),
Arguments.of(null, Constants.MB, TEN_MB));
Expand Down
Loading