diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java index ddbe637e1e..3068352c06 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java @@ -1460,6 +1460,47 @@ public ApiFuture awaitReplicationAsync(final String tableId) { return stub.awaitReplicationCallable().futureCall(tableName); } + /** + * Generates a consistency token and polls it until replication is consistent. Blocks until + * completion. + * + * @param tableId The table to check. + */ + public void waitForConsistency(String tableId) { + awaitReplication(tableId); + } + + /** + * Asynchronously generates a token and polls until consistent. + * + * @param tableId The table to check. + */ + public ApiFuture waitForConsistencyAsync(String tableId) { + return awaitReplicationAsync(tableId); + } + + /** + * Polls an existing consistency token until replication is consistent. Useful for checking + * consistency of a token generated in a separate process. Blocks until completion. + * + * @param tableId The table to check. + * @param consistencyToken The token to poll. + */ + public void waitForConsistency(String tableId, String consistencyToken) { + ApiExceptions.callAndTranslateApiException(waitForConsistencyAsync(tableId, consistencyToken)); + } + + /** + * Asynchronously polls an existing consistency token. + * + * @param tableId The table to check. + * @param consistencyToken The token to poll. + */ + public ApiFuture waitForConsistencyAsync(String tableId, String consistencyToken) { + return stub.awaitConsistencyCallable() + .futureCall(ConsistencyRequest.forReplication(tableId, consistencyToken)); + } + /** * Creates a new authorized view with the specified configuration. * diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java index 0718af03c1..b2a79efcc1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java @@ -24,6 +24,7 @@ import com.google.bigtable.admin.v2.TableName; import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext; import javax.annotation.Nonnull; +import javax.annotation.Nullable; @AutoValue public abstract class ConsistencyRequest { @@ -33,14 +34,33 @@ public abstract class ConsistencyRequest { @Nonnull protected abstract CheckConsistencyRequest.ModeCase getMode(); + /** + * Internal accessor for the consistency token. Must be public to be accessible from the stub + * package. + */ + @InternalApi + @Nullable + public abstract String getConsistencyToken(); + public static ConsistencyRequest forReplication(String tableId) { return new AutoValue_ConsistencyRequest( - tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES); + tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null); + } + + /** + * Creates a request to check consistency using an existing token. + * + * @param tableId The table ID. + * @param consistencyToken The token to check. + */ + public static ConsistencyRequest forReplication(String tableId, String consistencyToken) { + return new AutoValue_ConsistencyRequest( + tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, consistencyToken); } public static ConsistencyRequest forDataBoost(String tableId) { return new AutoValue_ConsistencyRequest( - tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES); + tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null); } @InternalApi diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java index 7cdcb66599..395a77cdb3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java @@ -93,6 +93,15 @@ static AwaitConsistencyCallable create( @Override public ApiFuture futureCall( final ConsistencyRequest consistencyRequest, final ApiCallContext apiCallContext) { + + // If the token is already provided, skip generation and poll directly. + if (consistencyRequest.getConsistencyToken() != null) { + CheckConsistencyRequest request = + consistencyRequest.toCheckConsistencyProto( + requestContext, consistencyRequest.getConsistencyToken()); + return pollToken(request, apiCallContext); + } + ApiFuture tokenFuture = generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java index 0bf3731161..e89bd8fbb5 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java @@ -1657,6 +1657,31 @@ public void testTestIamPermissions() { assertThat(actualResult).containsExactly("bigtable.backups.get"); } + @Test + public void testWaitForConsistencyWithToken() { + // Setup + Mockito.when(mockStub.awaitConsistencyCallable()).thenReturn(mockAwaitConsistencyCallable); + + String token = "my-token"; + ConsistencyRequest expectedRequest = ConsistencyRequest.forReplication(TABLE_ID, token); + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + Mockito.when(mockAwaitConsistencyCallable.futureCall(expectedRequest)) + .thenAnswer( + (Answer>) + invocationOnMock -> { + wasCalled.set(true); + return ApiFutures.immediateFuture(null); + }); + + // Execute + adminClient.waitForConsistency(TABLE_ID, token); + + // Verify + assertThat(wasCalled.get()).isTrue(); + } + private void mockOperationResult( OperationCallable callable, ReqT request, diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java index d9e40242a1..c3b99a4e68 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java @@ -79,4 +79,22 @@ public void testToGenerateTokenProto() { assertThat(generateRequest.getName()) .isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID)); } + + @Test + public void testToCheckConsistencyProtoWithToken() { + ConsistencyRequest consistencyRequest = + ConsistencyRequest.forReplication(TABLE_ID, CONSISTENCY_TOKEN); + + TableAdminRequestContext requestContext = + TableAdminRequestContext.create(PROJECT_ID, INSTANCE_ID); + + CheckConsistencyRequest checkConsistencyRequest = + consistencyRequest.toCheckConsistencyProto(requestContext, CONSISTENCY_TOKEN); + + assertThat(checkConsistencyRequest.getName()) + .isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID)); + assertThat(checkConsistencyRequest.getConsistencyToken()).isEqualTo(CONSISTENCY_TOKEN); + assertThat(checkConsistencyRequest.getModeCase()) + .isEqualTo(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java index 2628cdf224..0aad99b232 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java @@ -42,6 +42,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; @@ -325,4 +326,37 @@ public void testAwaitReplicationCallablePolling() throws Exception { consistentFuture.get(1, TimeUnit.SECONDS); } + + @Test + public void testWithProvidedToken() throws Exception { + // 1. Setup: Request with a pre-existing token + String existingToken = "existing-token"; + ConsistencyRequest consistencyRequest = + ConsistencyRequest.forReplication(TABLE_ID, existingToken); + + // 2. Setup: Mock the check operation to succeed immediately + CheckConsistencyRequest expectedCheckRequest = + CheckConsistencyRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .setConsistencyToken(existingToken) + .setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build()) + .build(); + CheckConsistencyResponse expectedResponse = + CheckConsistencyResponse.newBuilder().setConsistent(true).build(); + + Mockito.when(mockCheckConsistencyCallable.futureCall(expectedCheckRequest, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse)); + + // 3. Execute + ApiFuture future = awaitConsistencyCallable.futureCall(consistencyRequest, CALL_CONTEXT); + future.get(1, TimeUnit.SECONDS); + + // 4. Verify: Generate was NEVER called, Check WAS called + Mockito.verify(mockGenerateConsistencyTokenCallable, Mockito.never()) + .futureCall( + ArgumentMatchers.any(GenerateConsistencyTokenRequest.class), + ArgumentMatchers.any(ApiCallContext.class)); + Mockito.verify(mockCheckConsistencyCallable, Mockito.times(1)) + .futureCall(expectedCheckRequest, CALL_CONTEXT); + } }