diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index cef751213c..776cb13e2e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -33,6 +33,7 @@ import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException; import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef; import com.google.cloud.storage.Retrying.Retrier; +import java.util.OptionalLong; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; import com.google.common.base.Suppliers; import com.google.protobuf.ByteString; @@ -91,7 +92,9 @@ final class GapicUnbufferedReadableByteChannel this.result = result; this.read = read; this.req = req; - this.hasher = hasher; + this.hasher = (req.getReadOffset() == 0) + ? new CumulativeHasher(hasher, 0, req.getReadLimit() <= 0 ? OptionalLong.empty() : OptionalLong.of(req.getReadLimit())) + : hasher; this.fetchOffset = new AtomicLong(req.getReadOffset()); this.blobOffset = req.getReadOffset(); this.retrier = retrier; @@ -174,6 +177,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { } if (take == EOF_MARKER) { complete = true; + validateCumulativeChecksum(); break; } @@ -311,6 +315,18 @@ private IOException createError(String message) throws IOException { return new IOException(message, cause); } + private void validateCumulativeChecksum() throws IOException { + if (hasher instanceof CumulativeHasher) { + CumulativeHasher cumulativeHasher = (CumulativeHasher) hasher; + try { + cumulativeHasher.validateCumulativeChecksum(metadata); + } catch (UncheckedCumulativeChecksumMismatchException exception) { + throw new IOException(StorageException.coalesce(exception)); + } + } + } + + private final class ReadObjectObserver extends StateCheckingResponseObserver { private final SettableApiFuture open = SettableApiFuture.create(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java index 27d96ef6f0..44553923ca 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiCallContext; @@ -26,6 +27,8 @@ import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable; import com.google.cloud.storage.Retrying.Retrier; import com.google.cloud.storage.it.ChecksummedTestContent; +import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.ReadObjectRequest; import com.google.storage.v2.ReadObjectResponse; import java.io.IOException; @@ -75,4 +78,427 @@ public void call( assertThat(close.get()).isTrue(); } } + + @Test + public void validateCumulativeChecksum_success() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c()) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(new ByteBuffer[] {buffer}, 0, 1); + assertThat(read).isEqualTo(testContent.length()); + assertThat(xxd(buffer)).isEqualTo(xxd(testContent.getBytes())); + } + } + + @Test + public void validateCumulativeChecksum_failure() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + IOException exception = assertThrows(IOException.class, () -> c.read(buffer)); + assertThat(exception.getCause()).isInstanceOf(StorageException.class); + assertThat(exception.getCause().getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + + @Test + public void validateCumulativeChecksum_skippedForRangedRead() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + ReadObjectRequest req = ReadObjectRequest.newBuilder().setReadLimit(5).build(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.slice(0, 5).asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + req, + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(5); + assertThat(xxd(buffer)).isEqualTo(xxd(testContent.slice(0, 5).getBytes())); + } + } + + @Test + public void validateCumulativeChecksum_multipleChunks_success() throws IOException { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object metadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(fullContent.getCrc32c()) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + new Thread(() -> { + try { + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk1.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk2.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk3.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } catch (Throwable t) { + respond.onError(t); + } + }).start(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(20); + int read = (int) c.read(new ByteBuffer[] {buffer}, 0, 1); + assertThat(read).isEqualTo(15); + assertThat(xxd(buffer)).isEqualTo(xxd(fullContent.getBytes())); + } + } + + @Test + public void validateCumulativeChecksum_multipleChunks_failure() throws IOException { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object metadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(fullContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + new Thread(() -> { + try { + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk1.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk2.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk3.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } catch (Throwable t) { + respond.onError(t); + } + }).start(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(20); + IOException exception = assertThrows(IOException.class, () -> { + c.read(new ByteBuffer[] {buffer}, 0, 1); + }); + assertThat(exception.getCause()).isInstanceOf(StorageException.class); + assertThat(exception.getCause().getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + + @Test + public void validateCumulativeChecksum_metadataMissingCrc32c_skipped() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums(ObjectChecksums.newBuilder().build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(10); + } + } + + @Test + public void validateCumulativeChecksum_nonZeroOffset_skipped() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + ReadObjectRequest req = ReadObjectRequest.newBuilder().setReadOffset(5).build(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.slice(5, 5).asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + req, + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(5); + } + } + + @Test + public void validateCumulativeChecksum_zeroByteObject_success() throws IOException { + Object metadata = + Object.newBuilder() + .setSize(0) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(0) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(0); + assertThat(c.read(buffer)).isEqualTo(-1); + } + } }