Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/storage/azure-storage-blob",
"Tag": "java/storage/azure-storage-blob_1f689f90f0"
"Tag": "java/storage/azure-storage-blob_f0eadf5927"
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.http.HttpHeaders;
import com.azure.core.util.BinaryData;
import com.azure.core.util.FluxUtil;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.PageRange;
import com.azure.storage.blob.models.ParallelTransferOptions;
Expand All @@ -25,13 +26,16 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
Expand Down Expand Up @@ -59,6 +63,13 @@ public class BlobContentValidationAsyncUploadTests extends BlobTestBase {
private static final long LARGE_UPLOAD_BLOCK_SIZE_BYTES = 8L * Constants.MB;
private static final int LARGE_UPLOAD_MAX_CONCURRENCY = 8;

/**
* {@link BlobTestBase#fuzzyParallelUploadLargeMultiPartCases()} starts at ~96 MiB; above this threshold the fuzzy
* parallel upload helpers stream from a temp source file and verify via {@code downloadToFile} +
* {@link BlobTestBase#compareFiles(File, File, long, long)} to avoid materializing the full payload twice in heap.
*/
private static final int FUZZY_PARALLEL_UPLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES = 96 * Constants.MB;

private static final String MD5_AND_CRC64_EXCLUSIVE_MESSAGE
= "Only one form of transactional content validation may be used.";

Expand Down Expand Up @@ -875,6 +886,104 @@ public void uploadChunkedRandomSizesRoundTripDataIntegrity() {
+ ")");
}

// ---------- Fuzzy parallel upload (async) ----------

@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadPutBlobReplayableCases")
public void fuzzyParallelUploadPutBlobReplayableRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTripAsync("putBlobReplay", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases: Put Block URLs include random IDs.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSmallPayloadStagingCases")
public void fuzzyParallelUploadSmallPayloadRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) throws IOException {
assertParallelUploadFuzzyRoundTripAsync("smallPayloadStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload > segment for every tuple; always staging/Put Block.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSub4MiBCases")
public void fuzzyParallelUploadSubFourMiBBlobRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTripAsync("subFourMiB", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadFourMiBBoundaryStagingCases")
public void fuzzyParallelUploadFourMiBBoundaryRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) throws IOException {
assertParallelUploadFuzzyRoundTripAsync("fourMiBBoundaryStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Chunked uploads only.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadMediumMultiPartCases")
public void fuzzyParallelUploadMediumMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTripAsync("mediumMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Large chunked uploads.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadLargeMultiPartCases")
public void fuzzyParallelUploadLargeMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTripAsync("largeMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

private void assertParallelUploadFuzzyRoundTripAsync(String caseKind, int payloadBytes, long segmentBytes,
int maxConcurrency) throws IOException {
BlobAsyncClient client = createBlobAsyncClientWithRequestSniffer(new CopyOnWriteArrayList<>());

ParallelTransferOptions parallelOptions = new ParallelTransferOptions().setBlockSizeLong(segmentBytes)
.setMaxSingleUploadSizeLong(segmentBytes)
.setMaxConcurrency(maxConcurrency);

String assertionMessage = "Fuzzy parallel upload [" + caseKind + "] payloadBytes=" + payloadBytes
+ ", segmentBytes=" + segmentBytes + ", maxConcurrency=" + maxConcurrency;

if (payloadBytes >= FUZZY_PARALLEL_UPLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES) {
File sourceFile = getRandomFile(payloadBytes);
File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-async", ".bin").toFile();
outFile.deleteOnExit();
int readChunkSize = (int) Math.min(8L * Constants.MB, Math.max(64 * Constants.KB, segmentBytes));
AsynchronousFileChannel channel
= AsynchronousFileChannel.open(sourceFile.toPath(), StandardOpenOption.READ);
try {
Flux<ByteBuffer> data = FluxUtil.readFile(channel, readChunkSize, 0, payloadBytes);
BlobParallelUploadOptions options
= new BlobParallelUploadOptions(data).setParallelTransferOptions(parallelOptions)
.setRequestConditions(new BlobRequestConditions())
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
client.uploadWithResponse(options).block();
} finally {
channel.close();
}
client.downloadToFile(outFile.getPath(), true).block();
assertTrue(compareFiles(sourceFile, outFile, 0, payloadBytes), assertionMessage);
if (!sourceFile.delete()) {
sourceFile.deleteOnExit();
}
if (!outFile.delete()) {
outFile.deleteOnExit();
}
} else {
byte[] randomData = getRandomByteArray(payloadBytes);
Flux<ByteBuffer> data = Flux.just(ByteBuffer.wrap(randomData));
BlobParallelUploadOptions options
= new BlobParallelUploadOptions(data).setParallelTransferOptions(parallelOptions)
.setRequestConditions(new BlobRequestConditions())
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
client.uploadWithResponse(options).block();
byte[] downloaded = client.downloadContent().block().toBytes();
assertArrayEquals(randomData, downloaded, assertionMessage);
}
}

@LiveOnly // This test is too large for the test proxy.
@Test
public void blockBlobSimpleUploadRandomSizeRoundTripDataIntegrity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -66,6 +68,13 @@ public class BlobContentValidationUploadTests extends BlobTestBase {
private static final long LARGE_UPLOAD_BLOCK_SIZE_BYTES = 8L * Constants.MB;
private static final int LARGE_UPLOAD_MAX_CONCURRENCY = 8;

/**
* {@link BlobTestBase#fuzzyParallelUploadLargeMultiPartCases()} starts at ~96 MiB; above this threshold the fuzzy
* parallel upload helpers use temp files and streaming download/compare to avoid holding the full payload twice in
* heap (upload buffer + {@code downloadContent().toBytes()}).
*/
private static final int FUZZY_PARALLEL_UPLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES = 96 * Constants.MB;

private static final String MD5_AND_CRC64_EXCLUSIVE_MESSAGE
= "Only one form of transactional content validation may be used.";

Expand Down Expand Up @@ -1133,6 +1142,98 @@ public void uploadChunkedRandomSizesRoundTripDataIntegrity() {
+ ")");
}

// ---------- Fuzzy parallel upload (deterministic grids) ----------

@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadPutBlobReplayableCases")
public void fuzzyParallelUploadPutBlobReplayableRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTrip("putBlobReplay", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases: Put Block URLs include random IDs
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSmallPayloadStagingCases")
public void fuzzyParallelUploadSmallPayloadRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) throws IOException {
assertParallelUploadFuzzyRoundTrip("smallPayloadStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload > segment for every tuple; always staging/Put Block.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSub4MiBCases")
public void fuzzyParallelUploadSubFourMiBBlobRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTrip("subFourMiB", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadFourMiBBoundaryStagingCases")
public void fuzzyParallelUploadFourMiBBoundaryRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) throws IOException {
assertParallelUploadFuzzyRoundTrip("fourMiBBoundaryStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload > segment throughout; chunked upload.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadMediumMultiPartCases")
public void fuzzyParallelUploadMediumMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTrip("mediumMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload >> segment throughout; chunked upload / large payloads.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadLargeMultiPartCases")
public void fuzzyParallelUploadLargeMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency)
throws IOException {
assertParallelUploadFuzzyRoundTrip("largeMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

private void assertParallelUploadFuzzyRoundTrip(String caseKind, int payloadBytes, long segmentBytes,
int maxConcurrency) throws IOException {
BlobClient client = createBlobClientWithRequestSniffer(new CopyOnWriteArrayList<>());

ParallelTransferOptions parallelOptions = new ParallelTransferOptions().setBlockSizeLong(segmentBytes)
.setMaxSingleUploadSizeLong(segmentBytes)
.setMaxConcurrency(maxConcurrency);

String assertionMessage = "Fuzzy parallel upload [" + caseKind + "] payloadBytes=" + payloadBytes
+ ", segmentBytes=" + segmentBytes + ", maxConcurrency=" + maxConcurrency;

if (payloadBytes >= FUZZY_PARALLEL_UPLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES) {
File sourceFile = getRandomFile(payloadBytes);
File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl", ".bin").toFile();
outFile.deleteOnExit();
try (InputStream data = new FileInputStream(sourceFile)) {
BlobParallelUploadOptions options
= new BlobParallelUploadOptions(data).setParallelTransferOptions(parallelOptions)
.setRequestConditions(new BlobRequestConditions())
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
client.uploadWithResponse(options, null, Context.NONE);
}
client.downloadToFile(outFile.getPath(), true);
assertTrue(compareFiles(sourceFile, outFile, 0, payloadBytes), assertionMessage);
if (!sourceFile.delete()) {
sourceFile.deleteOnExit();
}
if (!outFile.delete()) {
outFile.deleteOnExit();
}
} else {
byte[] randomData = getRandomByteArray(payloadBytes);
InputStream data = new ByteArrayInputStream(randomData);
BlobParallelUploadOptions options
= new BlobParallelUploadOptions(data).setParallelTransferOptions(parallelOptions)
.setRequestConditions(new BlobRequestConditions())
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);
client.uploadWithResponse(options, null, Context.NONE);
byte[] downloaded = client.downloadContent().toBytes();
assertArrayEquals(randomData, downloaded, assertionMessage);
}
}

@LiveOnly // This test is too large for the test proxy.
@Test
public void blockBlobSimpleUploadRandomSizeRoundTripDataIntegrity() {
Expand Down
Loading
Loading