From 7c68eee08b6af823bc66467e761443a95221a5fb Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Tue, 19 May 2026 09:07:12 +0000 Subject: [PATCH] Adding full object checksum for bidi flow --- .../BaseObjectReadSessionStreamRead.java | 4 +- .../storage/ObjectReadSessionStream.java | 15 + .../storage/ObjectReadSessionStreamTest.java | 392 ++++++++++++++++++ 3 files changed, 409 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java index dc71350d70..39d09a5aee 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java @@ -150,7 +150,7 @@ private AccumulatingRead( IOAutoCloseable onCloseCallback) { super(rangeSpec, retryContext, onCloseCallback); this.readId = readId; - this.hasher = hasher; + this.hasher = (rangeSpec.begin() == 0) ? new CumulativeHasher(hasher, 0, rangeSpec.maxLength()) : hasher; this.complete = SettableApiFuture.create(); this.childRefs = Collections.synchronizedList(new ArrayList<>()); } @@ -280,7 +280,7 @@ static class StreamingRead extends BaseObjectReadSessionStreamRead(2); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java index 6f02b16866..beda7e5f77 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java @@ -351,6 +351,12 @@ public void onResponse(BidiReadObjectResponse response) { executor.execute( StorageException.liftToRunnable( () -> { + try { + validateCumulativeChecksum(read); + } catch (UncheckedCumulativeChecksumMismatchException e) { + state.removeOutstandingReadOnFailure(id, read::fail).onFailure(e); + return; + } read.eof(); // don't remove the outstanding read until the future has been resolved state.removeOutstandingRead(id); @@ -545,6 +551,15 @@ public void onComplete() { } } + private void validateCumulativeChecksum(ObjectReadSessionStreamRead read) { + Hasher hasher = read.hasher(); + if (hasher instanceof CumulativeHasher) { + CumulativeHasher cumulativeHasher = (CumulativeHasher) hasher; + com.google.storage.v2.Object metadata = state.getMetadata(); + cumulativeHasher.validateCumulativeChecksum(metadata); + } + } + static ObjectReadSessionStream create( ScheduledExecutorService executor, ZeroCopyBidiStreamingCallable callable, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java index 75fec2cb2d..c56ecfb355 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.storage.ByteSizeConstants._2MiB; import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; @@ -32,6 +33,7 @@ import com.google.api.gax.rpc.ClientStreamReadyObserver; import com.google.api.gax.rpc.ResponseObserver; import com.google.cloud.storage.Backoff.Jitterer; +import com.google.cloud.storage.it.ChecksummedTestContent; import com.google.cloud.storage.BaseObjectReadSessionStreamRead.AccumulatingRead; import com.google.cloud.storage.BaseObjectReadSessionStreamRead.StreamingRead; import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable; @@ -47,8 +49,10 @@ import com.google.storage.v2.BidiReadObjectSpec; import com.google.storage.v2.BucketName; import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -388,4 +392,392 @@ static TestObjectReadSessionStreamRead of() { id, RangeSpec.of(0, 10), RetryContext.neverRetry()); } } + + @Test + public void validateCumulativeChecksum_bidi_success() throws Exception { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object bidiMetadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c()) + .build()) + .build(); + + SettableApiFuture> observerFuture = SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override public void send(BidiReadObjectRequest request) {} + @Override public void closeSendWithError(Throwable t) {} + @Override + public void closeSend() { + responseObserver.onComplete(); + } + @Override public boolean isSendReady() { return true; } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + BidiReadObjectResponse resp = + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(0).build()) + .setChecksummedData(testContent.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build(); + + observer.onResponse(resp); + + byte[] resultBytes = read1.get(2, TimeUnit.SECONDS); + assertThat(xxd(ByteBuffer.wrap(resultBytes))).isEqualTo(xxd(testContent.asByteBuffer())); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_failure() throws Exception { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object bidiMetadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + SettableApiFuture> observerFuture = SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override public void send(BidiReadObjectRequest request) {} + @Override public void closeSendWithError(Throwable t) {} + @Override + public void closeSend() { + responseObserver.onComplete(); + } + @Override public boolean isSendReady() { return true; } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + BidiReadObjectResponse resp = + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(0).build()) + .setChecksummedData(testContent.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build(); + + observer.onResponse(resp); + + ExecutionException exception = assertThrows(ExecutionException.class, () -> read1.get(2, TimeUnit.SECONDS)); + assertThat(exception.getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_skippedForRangedRead() throws Exception { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object bidiMetadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + SettableApiFuture> observerFuture = SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override public void send(BidiReadObjectRequest request) {} + @Override public void closeSendWithError(Throwable t) {} + @Override + public void closeSend() { + responseObserver.onComplete(); + } + @Override public boolean isSendReady() { return true; } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.of(0, 5), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + BidiReadObjectResponse resp = + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(0).build()) + .setChecksummedData(testContent.slice(0, 5).asChecksummedData()) + .setRangeEnd(true) + .build()) + .build(); + + observer.onResponse(resp); + + byte[] resultBytes = read1.get(2, TimeUnit.SECONDS); + assertThat(xxd(ByteBuffer.wrap(resultBytes))).isEqualTo(xxd(testContent.slice(0, 5).asByteBuffer())); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_multipleChunks_success() throws Exception { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object bidiMetadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(fullContent.getCrc32c()) + .build()) + .build(); + + SettableApiFuture> observerFuture = SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override public void send(BidiReadObjectRequest request) {} + @Override public void closeSendWithError(Throwable t) {} + @Override + public void closeSend() { + responseObserver.onComplete(); + } + @Override public boolean isSendReady() { return true; } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(0).build()) + .setChecksummedData(chunk1.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(5).build()) + .setChecksummedData(chunk2.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(10).build()) + .setChecksummedData(chunk3.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build()); + + byte[] resultBytes = read1.get(2, TimeUnit.SECONDS); + assertThat(xxd(ByteBuffer.wrap(resultBytes))).isEqualTo(xxd(fullContent.asByteBuffer())); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_multipleChunks_failure() throws Exception { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object bidiMetadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(fullContent.getCrc32c() + 1) + .build()) + .build(); + + SettableApiFuture> observerFuture = SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override public void send(BidiReadObjectRequest request) {} + @Override public void closeSendWithError(Throwable t) {} + @Override + public void closeSend() { + responseObserver.onComplete(); + } + @Override public boolean isSendReady() { return true; } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(0).build()) + .setChecksummedData(chunk1.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(5).build()) + .setChecksummedData(chunk2.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange(com.google.storage.v2.ReadRange.newBuilder().setReadId(1).setReadOffset(10).build()) + .setChecksummedData(chunk3.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build()); + + ExecutionException exception = assertThrows(ExecutionException.class, () -> read1.get(2, TimeUnit.SECONDS)); + assertThat(exception.getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + } }