diff --git a/sdk/storage/azure-storage-blob/assets.json b/sdk/storage/azure-storage-blob/assets.json index 0c3832771777..baee893b7849 100644 --- a/sdk/storage/azure-storage-blob/assets.json +++ b/sdk/storage/azure-storage-blob/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "java", "TagPrefix": "java/storage/azure-storage-blob", - "Tag": "java/storage/azure-storage-blob_1f689f90f0" + "Tag": "java/storage/azure-storage-blob_4691350e44" } diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationAsyncDownloadTests.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationAsyncDownloadTests.java index 38a96c2521a9..45c38f461df5 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationAsyncDownloadTests.java +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationAsyncDownloadTests.java @@ -13,6 +13,7 @@ import com.azure.storage.blob.options.BlobDownloadContentOptions; import com.azure.storage.blob.options.BlobDownloadStreamOptions; import com.azure.storage.blob.options.BlobDownloadToFileOptions; +import com.azure.storage.blob.options.BlobUploadFromFileOptions; import com.azure.storage.common.ParallelTransferOptions; import com.azure.storage.common.ContentValidationAlgorithm; import com.azure.storage.common.implementation.Constants; @@ -22,6 +23,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -35,6 +37,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -46,6 +49,13 @@ */ public class BlobContentValidationAsyncDownloadTests extends BlobTestBase { private static final int TEN_MB = 10 * Constants.MB; + /** + * {@link BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases()} starts at ~96 MiB; above this threshold fuzzy + * parallel download helpers use temp files + {@link BlobTestBase#compareFiles(File, File, long, long)} so the full + * payload never lives twice in heap. + */ + private static final int FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES = 96 * Constants.MB; + private final List createdFiles = new ArrayList<>(); @AfterEach @@ -439,4 +449,113 @@ public void interruptMultipleTimesWithDataIntact() { assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded)); } + // ---------- Fuzzy parallel download (deterministic grids) ---------- + + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadReplayableCases") + public void fuzzyParallelDownloadReplayableRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTripAsync("replayable", payloadBytes, blockSizeBytes, maxConcurrency); + } + + @LiveOnly // payload > blockSize for every tuple; chunked range GETs across many requests. + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadMediumMultiPartCases") + public void fuzzyParallelDownloadMediumMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTripAsync("mediumMultiPart", payloadBytes, blockSizeBytes, maxConcurrency); + } + + @LiveOnly // payload >> blockSize; ~96-320 MiB downloads. + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases") + public void fuzzyParallelDownloadLargeMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTripAsync("largeMultiPart", payloadBytes, blockSizeBytes, maxConcurrency); + } + + @LiveOnly // ~1 GiB single case; far too large for the test proxy. + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadOneGiBCases") + public void fuzzyParallelDownloadOneGiBRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTripAsync("oneGiB", payloadBytes, blockSizeBytes, maxConcurrency); + } + + private void assertParallelDownloadFuzzyRoundTripAsync(String caseKind, int payloadBytes, long blockSizeBytes, + int maxConcurrency) throws IOException { + List recorded = new CopyOnWriteArrayList<>(); + BlobAsyncClient client = createBlobAsyncClientWithRequestSniffer(recorded); + + ParallelTransferOptions parallelOptions + = new ParallelTransferOptions().setBlockSizeLong(blockSizeBytes).setMaxConcurrency(maxConcurrency); + + String assertionMessage = "Fuzzy parallel download [" + caseKind + "] payloadBytes=" + payloadBytes + + ", blockSize=" + blockSizeBytes + ", maxConcurrency=" + maxConcurrency; + + if (payloadBytes >= FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES) { + File sourceFile = getRandomFile(payloadBytes); + sourceFile.deleteOnExit(); + createdFiles.add(sourceFile); + File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-async", ".bin").toFile(); + outFile.deleteOnExit(); + createdFiles.add(outFile); + Files.deleteIfExists(outFile.toPath()); + + BlobUploadFromFileOptions uploadOptions + = new BlobUploadFromFileOptions(sourceFile.getAbsolutePath()).setParallelTransferOptions( + new com.azure.storage.blob.models.ParallelTransferOptions().setBlockSizeLong(blockSizeBytes) + .setMaxConcurrency(maxConcurrency)); + assertNotNull(client.uploadFromFileWithResponse(uploadOptions).block().getValue().getETag(), + assertionMessage); + + BlobDownloadToFileOptions downloadOptions + = new BlobDownloadToFileOptions(outFile.toPath().toString()).setParallelTransferOptions(parallelOptions) + .setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64); + + StepVerifier.create(client.downloadToFileWithResponse(downloadOptions)) + .assertNext(r -> assertNotNull(r.getValue(), assertionMessage)) + .verifyComplete(); + + assertTrue(compareFiles(sourceFile, outFile, 0, payloadBytes), assertionMessage); + } else { + byte[] randomData = getRandomByteArray(payloadBytes); + client.upload(BinaryData.fromBytes(randomData), true).block(); + + if (payloadBytes > blockSizeBytes) { + File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-async-mp", ".bin").toFile(); + outFile.deleteOnExit(); + createdFiles.add(outFile); + Files.deleteIfExists(outFile.toPath()); + + BlobDownloadToFileOptions downloadOptions = new BlobDownloadToFileOptions(outFile.toPath().toString()) + .setParallelTransferOptions(parallelOptions) + .setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64); + + StepVerifier.create(client.downloadToFileWithResponse(downloadOptions)) + .assertNext(r -> assertNotNull(r.getValue(), assertionMessage)) + .verifyComplete(); + + byte[] downloaded = Files.readAllBytes(outFile.toPath()); + assertArrayEquals(randomData, downloaded, assertionMessage); + } else { + BlobDownloadContentOptions downloadOptions + = new BlobDownloadContentOptions().setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64); + + StepVerifier.create(client.downloadContentWithResponse(downloadOptions)) + .assertNext(r -> assertArrayEquals(randomData, r.getValue().toBytes(), assertionMessage)) + .verifyComplete(); + + BlobDownloadStreamOptions streamOptions + = new BlobDownloadStreamOptions().setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64); + StepVerifier + .create(client.downloadStreamWithResponse(streamOptions) + .flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))) + .assertNext(bytes -> assertArrayEquals(randomData, bytes, assertionMessage)) + .verifyComplete(); + } + } + assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded), assertionMessage); + } + } diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationDownloadTests.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationDownloadTests.java index 86b7f116a60d..f7154fe50d39 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationDownloadTests.java +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationDownloadTests.java @@ -15,6 +15,7 @@ import com.azure.storage.blob.options.BlobDownloadStreamOptions; import com.azure.storage.blob.options.BlobDownloadToFileOptions; import com.azure.storage.blob.options.BlobInputStreamOptions; +import com.azure.storage.blob.options.BlobUploadFromFileOptions; import com.azure.storage.blob.options.BlobSeekableByteChannelReadOptions; import com.azure.storage.blob.specialized.BlobInputStream; import com.azure.storage.common.ParallelTransferOptions; @@ -32,6 +33,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.util.ArrayList; @@ -40,6 +42,7 @@ import java.util.stream.Stream; import static com.azure.storage.blob.specialized.BlobSeekableByteChannelTests.copy; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -48,9 +51,19 @@ /** * Sync tests for structured message decoding during blob downloads using StorageContentValidationDecoderPolicy. * These tests verify that the pipeline policy correctly decodes structured messages when content validation is enabled. + *

+ * Encoder/decoder-only fuzzy roundtrip and corruption grids live in {@link BlobContentValidationStructuredMessageFuzzyTests} + * without extending {@link BlobTestBase} (no test-proxy setup). */ public class BlobContentValidationDownloadTests extends BlobTestBase { private static final int TEN_MB = 10 * Constants.MB; + /** + * {@link BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases()} starts at ~96 MiB; above this threshold fuzzy + * parallel download helpers use temp files + {@link BlobTestBase#compareFiles(File, File, long, long)} so the full + * payload never lives twice in heap. + */ + private static final int FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES = 96 * Constants.MB; + private final List createdFiles = new ArrayList<>(); @AfterEach @@ -426,6 +439,114 @@ public void openSeekableByteChannelReadContentValidation(Integer streamBufferSiz assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded)); } + // ---------- Fuzzy parallel download (deterministic grids) ---------- + + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadReplayableCases") + public void fuzzyParallelDownloadReplayableRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTrip("replayable", payloadBytes, blockSizeBytes, maxConcurrency); + } + + @LiveOnly // payload > blockSize for every tuple; chunked range GETs across many requests. + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadMediumMultiPartCases") + public void fuzzyParallelDownloadMediumMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTrip("mediumMultiPart", payloadBytes, blockSizeBytes, maxConcurrency); + } + + @LiveOnly // payload >> blockSize; ~96-320 MiB downloads. + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases") + public void fuzzyParallelDownloadLargeMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTrip("largeMultiPart", payloadBytes, blockSizeBytes, maxConcurrency); + } + + @LiveOnly // ~1 GiB single case; far too large for the test proxy. + @ParameterizedTest + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadOneGiBCases") + public void fuzzyParallelDownloadOneGiBRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) + throws IOException { + assertParallelDownloadFuzzyRoundTrip("oneGiB", payloadBytes, blockSizeBytes, maxConcurrency); + } + + private void assertParallelDownloadFuzzyRoundTrip(String caseKind, int payloadBytes, long blockSizeBytes, + int maxConcurrency) throws IOException { + List recorded = new CopyOnWriteArrayList<>(); + BlobClient client = createBlobClientWithRequestSniffer(recorded); + + ParallelTransferOptions parallelOptions + = new ParallelTransferOptions().setBlockSizeLong(blockSizeBytes).setMaxConcurrency(maxConcurrency); + + String assertionMessage = "Fuzzy parallel download [" + caseKind + "] payloadBytes=" + payloadBytes + + ", blockSize=" + blockSizeBytes + ", maxConcurrency=" + maxConcurrency; + + if (payloadBytes >= FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES) { + File sourceFile = getRandomFile(payloadBytes); + sourceFile.deleteOnExit(); + createdFiles.add(sourceFile); + File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl", ".bin").toFile(); + outFile.deleteOnExit(); + createdFiles.add(outFile); + Files.deleteIfExists(outFile.toPath()); + + BlobUploadFromFileOptions uploadOptions + = new BlobUploadFromFileOptions(sourceFile.getAbsolutePath()).setParallelTransferOptions( + new com.azure.storage.blob.models.ParallelTransferOptions().setBlockSizeLong(blockSizeBytes) + .setMaxConcurrency(maxConcurrency)); + assertNotNull(client.uploadFromFileWithResponse(uploadOptions, null, Context.NONE).getValue().getETag(), + assertionMessage); + + BlobDownloadToFileOptions downloadOptions + = new BlobDownloadToFileOptions(outFile.toPath().toString()).setParallelTransferOptions(parallelOptions) + .setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64); + assertNotNull(client.downloadToFileWithResponse(downloadOptions, null, Context.NONE).getValue(), + assertionMessage); + + assertTrue(compareFiles(sourceFile, outFile, 0, payloadBytes), assertionMessage); + } else { + byte[] randomData = getRandomByteArray(payloadBytes); + client.upload(BinaryData.fromBytes(randomData), true); + + if (payloadBytes > blockSizeBytes) { + File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-mp", ".bin").toFile(); + outFile.deleteOnExit(); + createdFiles.add(outFile); + Files.deleteIfExists(outFile.toPath()); + + BlobDownloadToFileOptions downloadOptions = new BlobDownloadToFileOptions(outFile.toPath().toString()) + .setParallelTransferOptions(parallelOptions) + .setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64); + assertNotNull(client.downloadToFileWithResponse(downloadOptions, null, Context.NONE).getValue(), + assertionMessage); + + byte[] downloaded = readAllBytesFromFile(outFile); + assertArrayEquals(randomData, downloaded, assertionMessage); + } else { + BlobDownloadContentOptions downloadOptions + = new BlobDownloadContentOptions().setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64); + byte[] downloaded + = client.downloadContentWithResponse(downloadOptions, null, Context.NONE).getValue().toBytes(); + assertArrayEquals(randomData, downloaded, assertionMessage); + } + } + assertTrue(hasOnlyStructuredMessageDownloadHeaders(recorded), assertionMessage); + } + + private static byte[] readAllBytesFromFile(File file) throws IOException { + try (InputStream is = Files.newInputStream(file.toPath())) { + byte[] buffer = new byte[(int) file.length()]; + int offset = 0; + int read; + while (offset < buffer.length && (read = is.read(buffer, offset, buffer.length - offset)) != -1) { + offset += read; + } + return buffer; + } + } + static Stream channelReadDataSupplier() { return Stream.of(Arguments.of(50, 40, Constants.KB), Arguments.of(Constants.KB + 50, 40, Constants.KB), Arguments.of(null, Constants.MB, TEN_MB)); diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationStructuredMessageFuzzyTests.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationStructuredMessageFuzzyTests.java new file mode 100644 index 000000000000..c7be1154697b --- /dev/null +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobContentValidationStructuredMessageFuzzyTests.java @@ -0,0 +1,216 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob; + +import com.azure.core.util.FluxUtil; +import com.azure.storage.common.implementation.contentvalidation.StructuredMessageConstants; +import com.azure.storage.common.implementation.contentvalidation.StructuredMessageDecoder; +import com.azure.storage.common.implementation.contentvalidation.StructuredMessageEncoder; +import com.azure.storage.common.implementation.contentvalidation.StructuredMessageFlags; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Encoder + decoder fuzzy coverage. + */ +public final class BlobContentValidationStructuredMessageFuzzyTests { + + private static ByteBuffer collectStructuredMessageFlux(Flux flux) { + return ByteBuffer.wrap(FluxUtil.collectBytesInByteBufferStream(flux).block()).order(ByteOrder.LITTLE_ENDIAN); + } + + private static byte[] encodeStructuredMessage(byte[] originalData, int segmentLength, StructuredMessageFlags flags) + throws IOException { + StructuredMessageEncoder encoder = new StructuredMessageEncoder(originalData.length, segmentLength, flags); + ByteBuffer encoded = collectStructuredMessageFlux(encoder.encode(ByteBuffer.wrap(originalData))); + byte[] encodedBytes = new byte[encoded.remaining()]; + encoded.get(encodedBytes); + return encodedBytes; + } + + private static byte[] structuredMessageDeterministicPayload(int size, int seedBase) { + byte[] data = new byte[size]; + long state + = 0x9E3779B97F4A7C15L ^ ((long) seedBase * 0xBF58476D1CE4E5B9L) ^ ((long) size * 0x94D049BB133111EBL); + for (int i = 0; i < size; i++) { + state ^= state >>> 30; + state *= 0xBF58476D1CE4E5B9L; + state ^= state >>> 27; + state *= 0x94D049BB133111EBL; + state ^= state >>> 31; + data[i] = (byte) state; + } + return data; + } + + static Stream fuzzyStructuredMessageRoundTripCases() { + return Stream.of(Arguments.of(1, 1), Arguments.of(1, 64), Arguments.of(64, 1), Arguments.of(127, 1), + Arguments.of(257, 7), Arguments.of(63, 64), Arguments.of(64, 64), Arguments.of(65, 64), + Arguments.of(255, 256), Arguments.of(256, 256), Arguments.of(257, 256), + Arguments.of(7 * 1024 + 3, 16 * 1024), Arguments.of(41 * 1024 + 17, 128 * 1024), + Arguments.of(199 * 1024 + 5, 32 * 1024), Arguments.of(7 * 1024 + 3, 1024), + Arguments.of(199 * 1024 + 5, 4 * 1024 + 17), Arguments.of(512 * 1024 - 31, 8 * 1024), + Arguments.of(1 * 1024 * 1024, 1 * 1024 * 1024), Arguments.of(1 * 1024 * 1024 + 1, 1 * 1024 * 1024), + Arguments.of(1 * 1024 * 1024 - 1, 1 * 1024 * 1024), Arguments.of(2 * 1024 * 1024 + 333, 256 * 1024), + Arguments.of(4 * 1024 * 1024 + 1, 1024 * 1024 + 17)); + } + + @ParameterizedTest + @MethodSource("fuzzyStructuredMessageRoundTripCases") + void fuzzyStructuredMessageRoundTrip(int payloadBytes, int segmentBytes) throws IOException { + String assertionMessage + = "Fuzzy structured-message round trip payloadBytes=" + payloadBytes + ", segmentBytes=" + segmentBytes; + + byte[] data = structuredMessageDeterministicPayload(payloadBytes, segmentBytes); + byte[] encoded = encodeStructuredMessage(data, segmentBytes, StructuredMessageFlags.STORAGE_CRC64); + + StructuredMessageDecoder decoder = new StructuredMessageDecoder(encoded.length); + ByteBuffer result = decoder.decodeChunk(ByteBuffer.wrap(encoded).order(ByteOrder.LITTLE_ENDIAN)); + + assertTrue(decoder.isComplete(), assertionMessage); + assertNotNull(result, assertionMessage); + byte[] decoded = new byte[result.remaining()]; + result.get(decoded); + assertArrayEquals(data, decoded, assertionMessage); + } + + @ParameterizedTest + @MethodSource("fuzzyStructuredMessageRoundTripCases") + void fuzzyStructuredMessageRoundTripNoCrc(int payloadBytes, int segmentBytes) throws IOException { + String assertionMessage = "Fuzzy structured-message round trip (no-CRC) payloadBytes=" + payloadBytes + + ", segmentBytes=" + segmentBytes; + + byte[] data = structuredMessageDeterministicPayload(payloadBytes, segmentBytes ^ 0x55); + byte[] encoded = encodeStructuredMessage(data, segmentBytes, StructuredMessageFlags.NONE); + + StructuredMessageDecoder decoder = new StructuredMessageDecoder(encoded.length); + ByteBuffer result = decoder.decodeChunk(ByteBuffer.wrap(encoded).order(ByteOrder.LITTLE_ENDIAN)); + + assertTrue(decoder.isComplete(), assertionMessage); + assertNotNull(result, assertionMessage); + byte[] decoded = new byte[result.remaining()]; + result.get(decoded); + assertArrayEquals(data, decoded, assertionMessage); + } + + private static int[] structuredMessageDeterministicMutationOffsets(int encodedLength, int payloadLength, + int segmentLength) { + int afterHeader = StructuredMessageConstants.V1_HEADER_LENGTH; + int insidePayload + = StructuredMessageConstants.V1_HEADER_LENGTH + StructuredMessageConstants.V1_SEGMENT_HEADER_LENGTH + + Math.min(Math.max(payloadLength - 1, 0), segmentLength / 2); + int nearSegmentFooter = Math.min(StructuredMessageConstants.V1_HEADER_LENGTH + + StructuredMessageConstants.V1_SEGMENT_HEADER_LENGTH + payloadLength + 1, encodedLength - 1); + int nearMessageFooter = Math.max(encodedLength - 4, 0); + int nearEnd = encodedLength - 1; + return new int[] { afterHeader, insidePayload, nearSegmentFooter, nearMessageFooter, nearEnd }; + } + + static Stream fuzzyStructuredMessageCorruptionCases() { + return Stream.of(Arguments.of(64, 64), Arguments.of(257, 64), Arguments.of(1024, 256), + Arguments.of(7 * 1024 + 3, 1024), Arguments.of(199 * 1024 + 5, 32 * 1024), + Arguments.of(1 * 1024 * 1024 + 1, 1 * 1024 * 1024)); + } + + @ParameterizedTest + @MethodSource("fuzzyStructuredMessageCorruptionCases") + void fuzzyStructuredMessageRejectsInjectedByte(int payloadBytes, int segmentBytes) throws IOException { + byte[] data = structuredMessageDeterministicPayload(payloadBytes, segmentBytes ^ 0xA5); + byte[] encoded = encodeStructuredMessage(data, segmentBytes, StructuredMessageFlags.STORAGE_CRC64); + + for (int offset : structuredMessageDeterministicMutationOffsets(encoded.length, payloadBytes, segmentBytes)) { + byte injectByte = (byte) (encoded[offset] ^ 0xFF); + byte[] mutated = new byte[encoded.length + 1]; + System.arraycopy(encoded, 0, mutated, 0, offset); + mutated[offset] = injectByte; + System.arraycopy(encoded, offset, mutated, offset + 1, encoded.length - offset); + + String assertionMessage = "Fuzzy structured-message rejects injected byte payloadBytes=" + payloadBytes + + ", segmentBytes=" + segmentBytes + ", mutationOffset=" + offset; + + StructuredMessageDecoder decoder = new StructuredMessageDecoder(encoded.length); + assertThrows(IllegalArgumentException.class, + () -> decoder.decodeChunk(ByteBuffer.wrap(mutated, 0, encoded.length).order(ByteOrder.LITTLE_ENDIAN)), + assertionMessage); + } + } + + @ParameterizedTest + @MethodSource("fuzzyStructuredMessageCorruptionCases") + void fuzzyStructuredMessageRejectsRemovedBytes(int payloadBytes, int segmentBytes) throws IOException { + byte[] data = structuredMessageDeterministicPayload(payloadBytes, segmentBytes ^ 0x3C); + byte[] encoded = encodeStructuredMessage(data, segmentBytes, StructuredMessageFlags.STORAGE_CRC64); + + for (int offset : structuredMessageDeterministicMutationOffsets(encoded.length, payloadBytes, segmentBytes)) { + byte[] mutated = new byte[encoded.length - 1]; + System.arraycopy(encoded, 0, mutated, 0, offset); + System.arraycopy(encoded, offset + 1, mutated, offset, encoded.length - offset - 1); + + String assertionMessage = "Fuzzy structured-message rejects removed byte payloadBytes=" + payloadBytes + + ", segmentBytes=" + segmentBytes + ", mutationOffset=" + offset; + + StructuredMessageDecoder decoder = new StructuredMessageDecoder(mutated.length); + assertThrows(IllegalArgumentException.class, + () -> decoder.decodeChunk(ByteBuffer.wrap(mutated).order(ByteOrder.LITTLE_ENDIAN)), assertionMessage); + } + } + + @ParameterizedTest + @MethodSource("fuzzyStructuredMessageCorruptionCases") + void fuzzyStructuredMessageRejectsRemovedRange(int payloadBytes, int segmentBytes) throws IOException { + byte[] data = structuredMessageDeterministicPayload(payloadBytes, segmentBytes ^ 0x71); + byte[] encoded = encodeStructuredMessage(data, segmentBytes, StructuredMessageFlags.STORAGE_CRC64); + + final int rangeLen = 4; + for (int rawOffset : structuredMessageDeterministicMutationOffsets(encoded.length, payloadBytes, + segmentBytes)) { + int offset = Math.min(rawOffset, encoded.length - rangeLen - 1); + if (offset < 0) { + continue; + } + byte[] mutated = new byte[encoded.length - rangeLen]; + System.arraycopy(encoded, 0, mutated, 0, offset); + System.arraycopy(encoded, offset + rangeLen, mutated, offset, encoded.length - offset - rangeLen); + + String assertionMessage = "Fuzzy structured-message rejects removed bytes payloadBytes=" + payloadBytes + + ", segmentBytes=" + segmentBytes + ", mutationOffset=" + offset + ", rangeLen=" + rangeLen; + + StructuredMessageDecoder decoder = new StructuredMessageDecoder(mutated.length); + assertThrows(IllegalArgumentException.class, + () -> decoder.decodeChunk(ByteBuffer.wrap(mutated).order(ByteOrder.LITTLE_ENDIAN)), assertionMessage); + } + } + + @ParameterizedTest + @MethodSource("fuzzyStructuredMessageCorruptionCases") + void fuzzyStructuredMessageRejectsFlippedByte(int payloadBytes, int segmentBytes) throws IOException { + byte[] data = structuredMessageDeterministicPayload(payloadBytes, segmentBytes ^ 0x1B); + byte[] encoded = encodeStructuredMessage(data, segmentBytes, StructuredMessageFlags.STORAGE_CRC64); + + for (int offset : structuredMessageDeterministicMutationOffsets(encoded.length, payloadBytes, segmentBytes)) { + byte[] mutated = Arrays.copyOf(encoded, encoded.length); + mutated[offset] ^= 0x01; + + String assertionMessage = "Fuzzy structured-message rejects flipped byte payloadBytes=" + payloadBytes + + ", segmentBytes=" + segmentBytes + ", mutationOffset=" + offset; + + StructuredMessageDecoder decoder = new StructuredMessageDecoder(mutated.length); + assertThrows(IllegalArgumentException.class, + () -> decoder.decodeChunk(ByteBuffer.wrap(mutated).order(ByteOrder.LITTLE_ENDIAN)), assertionMessage); + } + } +} diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobTestBase.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobTestBase.java index 514ff455fb90..59b08a21a0ab 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobTestBase.java +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobTestBase.java @@ -1518,4 +1518,85 @@ protected static long expectedStructuredMessageEncodedLengthChunked(int totalUne } return sum; } + + /** + * Every tuple keeps payloadBytes <= blockSizeBytes, so the parallel download path issues a single GET (no + * follow-on range requests for additional blocks), which replays under the test proxy. + *

+ * Sizes are deliberately non-power-of-two (e.g. 7 * KB + 3) and use mixed block ceilings (64 KiB through + * multi-MiB) to catch alignment and decoder edge cases at structural boundaries (message header, segment + * footer, message footer); the 4 MiB boundary row exercises the exact service-side default segment length. + */ + protected static Stream fuzzyParallelDownloadReplayableCases() { + return Stream.of(Arguments.of(1, 64L * Constants.KB, 1), + Arguments.of(7 * Constants.KB + 3, 64L * Constants.KB, 1), + Arguments.of(7 * Constants.KB + 3, 128L * Constants.KB, 4), + Arguments.of(41 * Constants.KB + 17, 256L * Constants.KB, 1), + Arguments.of(41 * Constants.KB + 17, 256L * Constants.KB, 8), + Arguments.of(199 * Constants.KB + 5, 512L * Constants.KB, 2), + Arguments.of(512 * Constants.KB - 31, 1L * Constants.MB, 8), + Arguments.of(896 * Constants.KB + 101, 1L * Constants.MB, 6), + Arguments.of(2 * Constants.MB - 1, 4L * Constants.MB, 4), + Arguments.of(2 * Constants.MB + 33, 4L * Constants.MB, 1), + Arguments.of(4 * Constants.MB - 1, 4L * Constants.MB, 2), + Arguments.of(4 * Constants.MB, 4L * Constants.MB, 1), + Arguments.of(4 * Constants.MB, 7L * Constants.MB + 919, 3)); + } + + /** + * payloadBytes > blockSizeBytes, so downloads always go through multiple ranged GETs (parallel download + * fan-out) with totals roughly 6-80 MiB. Large enough to exercise the structured-message decoder over + * multiple HTTP responses, but cheaper than {@link #fuzzyParallelDownloadLargeMultiPartCases}. + *

+ * Block sizes step through common service limits (1-8 MiB, half-MiB tail values); concurrency 1-8 pairs + * with imbalanced payloads (e.g. 701, 333) to flush merge/retry edge cases. + */ + protected static Stream fuzzyParallelDownloadMediumMultiPartCases() { + return Stream.of(Arguments.of(6 * Constants.MB + 701, Constants.MB, 1), + Arguments.of(6 * Constants.MB + 701, 3L * Constants.MB + 271, 4), + Arguments.of(9 * Constants.MB + 333, 2L * Constants.MB, 1), + Arguments.of(9 * Constants.MB + 333, 3L * Constants.MB + 199, 8), + Arguments.of(12 * Constants.MB + 901, 4L * Constants.MB + 901, 2), + Arguments.of(14 * Constants.MB, 500L * Constants.KB + 13, 6), + Arguments.of(18 * Constants.MB - 4021, 5L * Constants.MB - 701, 3), + Arguments.of(24 * Constants.MB, 8L * Constants.MB, 8), + Arguments.of(28 * Constants.MB + 56789, 7L * Constants.MB + 13, 2), + Arguments.of(31 * Constants.MB, 1024L * Constants.KB + 17, 4), + Arguments.of(40 * Constants.MB + 12345, 7L * Constants.MB + 13, 3), + Arguments.of(48 * Constants.MB - 777, 5L * Constants.MB + 809L * Constants.KB, 6), + Arguments.of(56 * Constants.MB + 19, 9L * Constants.MB + 4096, 8), + Arguments.of(72 * Constants.MB, 4L * Constants.MB + 65536, 8), + Arguments.of(80 * Constants.MB + 321, 13L * Constants.MB - 3073, 1)); + } + + /** + * Stresses high block counts and long-running parallel downloads (~96-320 MiB payloads) with service-realistic + * block sizes (8-61 MiB class) and heavy concurrency. + *

+ * The final rows use named near-256/288/320 MiB totals with irregular byte tails to keep total bytes and block + * remainders off common multiples while still bounding runtime for Live-only CI. + */ + protected static Stream fuzzyParallelDownloadLargeMultiPartCases() { + final int payload257MiBPlus = (int) (257L * Constants.MB + 18881); + final int payload288MiBPlus = (int) (288L * Constants.MB + 7777); + final int payload320MiBPlus = (int) (320L * Constants.MB + 1999); + return Stream.of(Arguments.of(96 * Constants.MB + 17, 8L * Constants.MB + 511, 2), + Arguments.of(112 * Constants.MB, 15L * Constants.MB + 4096, 8), + Arguments.of(128 * Constants.MB + 45673, 17L * Constants.MB - 11264 + 173, 4), + Arguments.of(160 * Constants.MB + 12345, 12L * Constants.MB + 8192, 8), + Arguments.of(192 * Constants.MB + 9876, 31L * Constants.MB - 513, 8), + Arguments.of(224 * Constants.MB, 23L * Constants.MB + 524288, 8), + Arguments.of(payload257MiBPlus, 61L * Constants.MB + 23L * Constants.KB, 6), + Arguments.of(payload288MiBPlus, 36L * Constants.MB + 513, 8), + Arguments.of(payload320MiBPlus, 16L * Constants.MB + 511, 8)); + } + + /** + * Single ~1 GiB download with high concurrency and an awkward (non-aligned) tail to exercise the structured + * message decoder under a sustained, fan-out-heavy parallel download. Live-only and file-backed so payload + * never materializes twice in heap. + */ + protected static Stream fuzzyParallelDownloadOneGiBCases() { + return Stream.of(Arguments.of((int) (1L * Constants.GB + 1377), 16L * Constants.MB + 511, 8)); + } }