From 85736e5d5a792d2817bfb6229eada7d9f73366ad Mon Sep 17 00:00:00 2001 From: Jin Seop Kim Date: Mon, 26 Jan 2026 16:08:44 -0500 Subject: [PATCH 1/2] feat: Add methods to wait for consistency with a token This change adds support for checking consistency using a provided token, enabling distributed workflows. It also adds convenience methods to BigtableTableAdminClient for generating and waiting for a token automatically. This addresses the Consistency Tokens CUJ. Tracking Bug: b/475820272 --- .../admin/v2/BigtableTableAdminClient.java | 43 +++++++++++++++++++ .../admin/v2/models/ConsistencyRequest.java | 26 +++++++++-- .../v2/stub/AwaitConsistencyCallable.java | 9 ++++ .../v2/BigtableTableAdminClientTests.java | 25 +++++++++++ .../v2/models/ConsistencyRequestTest.java | 18 ++++++++ .../v2/stub/AwaitConsistencyCallableTest.java | 32 ++++++++++++++ 6 files changed, 150 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java index ddbe637e1e..9e4a1a4bf5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java @@ -1460,6 +1460,49 @@ public ApiFuture awaitReplicationAsync(final String tableId) { return stub.awaitReplicationCallable().futureCall(tableName); } + /** + * Generates a consistency token and polls it until replication is consistent. + * Blocks until completion. + * + * @param tableId The table to check. + */ + public void waitForConsistency(String tableId) { + awaitReplication(tableId); + } + + /** + * Asynchronously generates a token and polls until consistent. + * + * @param tableId The table to check. + */ + public ApiFuture waitForConsistencyAsync(String tableId) { + return awaitReplicationAsync(tableId); + } + + /** + * Polls an existing consistency token until replication is consistent. + * Useful for checking consistency of a token generated in a separate process. + * Blocks until completion. + * + * @param tableId The table to check. + * @param consistencyToken The token to poll. + */ + public void waitForConsistency(String tableId, String consistencyToken) { + ApiExceptions.callAndTranslateApiException( + waitForConsistencyAsync(tableId, consistencyToken)); + } + + /** + * Asynchronously polls an existing consistency token. + * + * @param tableId The table to check. + * @param consistencyToken The token to poll. + */ + public ApiFuture waitForConsistencyAsync(String tableId, String consistencyToken) { + return stub.awaitConsistencyCallable().futureCall( + ConsistencyRequest.forReplication(tableId, consistencyToken)); + } + /** * Creates a new authorized view with the specified configuration. * diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java index 0718af03c1..9dc7f76626 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java @@ -24,6 +24,7 @@ import com.google.bigtable.admin.v2.TableName; import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext; import javax.annotation.Nonnull; +import javax.annotation.Nullable; @AutoValue public abstract class ConsistencyRequest { @@ -33,14 +34,33 @@ public abstract class ConsistencyRequest { @Nonnull protected abstract CheckConsistencyRequest.ModeCase getMode(); + /** + * Internal accessor for the consistency token. + * Must be public to be accessible from the stub package. + */ + @InternalApi + @Nullable + public abstract String getConsistencyToken(); + public static ConsistencyRequest forReplication(String tableId) { return new AutoValue_ConsistencyRequest( - tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES); + tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null); + } + + /** + * Creates a request to check consistency using an existing token. + * + * @param tableId The table ID. + * @param consistencyToken The token to check. + */ + public static ConsistencyRequest forReplication(String tableId, String consistencyToken) { + return new AutoValue_ConsistencyRequest( + tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, consistencyToken); } public static ConsistencyRequest forDataBoost(String tableId) { return new AutoValue_ConsistencyRequest( - tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES); + tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null); } @InternalApi @@ -68,4 +88,4 @@ public GenerateConsistencyTokenRequest toGenerateTokenProto( return builder.setName(tableName.toString()).build(); } -} +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java index 7cdcb66599..395a77cdb3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java @@ -93,6 +93,15 @@ static AwaitConsistencyCallable create( @Override public ApiFuture futureCall( final ConsistencyRequest consistencyRequest, final ApiCallContext apiCallContext) { + + // If the token is already provided, skip generation and poll directly. + if (consistencyRequest.getConsistencyToken() != null) { + CheckConsistencyRequest request = + consistencyRequest.toCheckConsistencyProto( + requestContext, consistencyRequest.getConsistencyToken()); + return pollToken(request, apiCallContext); + } + ApiFuture tokenFuture = generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java index 0bf3731161..e89bd8fbb5 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java @@ -1657,6 +1657,31 @@ public void testTestIamPermissions() { assertThat(actualResult).containsExactly("bigtable.backups.get"); } + @Test + public void testWaitForConsistencyWithToken() { + // Setup + Mockito.when(mockStub.awaitConsistencyCallable()).thenReturn(mockAwaitConsistencyCallable); + + String token = "my-token"; + ConsistencyRequest expectedRequest = ConsistencyRequest.forReplication(TABLE_ID, token); + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + Mockito.when(mockAwaitConsistencyCallable.futureCall(expectedRequest)) + .thenAnswer( + (Answer>) + invocationOnMock -> { + wasCalled.set(true); + return ApiFutures.immediateFuture(null); + }); + + // Execute + adminClient.waitForConsistency(TABLE_ID, token); + + // Verify + assertThat(wasCalled.get()).isTrue(); + } + private void mockOperationResult( OperationCallable callable, ReqT request, diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java index d9e40242a1..c3b99a4e68 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java @@ -79,4 +79,22 @@ public void testToGenerateTokenProto() { assertThat(generateRequest.getName()) .isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID)); } + + @Test + public void testToCheckConsistencyProtoWithToken() { + ConsistencyRequest consistencyRequest = + ConsistencyRequest.forReplication(TABLE_ID, CONSISTENCY_TOKEN); + + TableAdminRequestContext requestContext = + TableAdminRequestContext.create(PROJECT_ID, INSTANCE_ID); + + CheckConsistencyRequest checkConsistencyRequest = + consistencyRequest.toCheckConsistencyProto(requestContext, CONSISTENCY_TOKEN); + + assertThat(checkConsistencyRequest.getName()) + .isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID)); + assertThat(checkConsistencyRequest.getConsistencyToken()).isEqualTo(CONSISTENCY_TOKEN); + assertThat(checkConsistencyRequest.getModeCase()) + .isEqualTo(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java index 2628cdf224..a39020b54d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java @@ -42,6 +42,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; @@ -325,4 +326,35 @@ public void testAwaitReplicationCallablePolling() throws Exception { consistentFuture.get(1, TimeUnit.SECONDS); } + + @Test + public void testWithProvidedToken() throws Exception { + // 1. Setup: Request with a pre-existing token + String existingToken = "existing-token"; + ConsistencyRequest consistencyRequest = + ConsistencyRequest.forReplication(TABLE_ID, existingToken); + + // 2. Setup: Mock the check operation to succeed immediately + CheckConsistencyRequest expectedCheckRequest = + CheckConsistencyRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .setConsistencyToken(existingToken) + .setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build()) + .build(); + CheckConsistencyResponse expectedResponse = + CheckConsistencyResponse.newBuilder().setConsistent(true).build(); + + Mockito.when(mockCheckConsistencyCallable.futureCall(expectedCheckRequest, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse)); + + // 3. Execute + ApiFuture future = awaitConsistencyCallable.futureCall(consistencyRequest, CALL_CONTEXT); + future.get(1, TimeUnit.SECONDS); + + // 4. Verify: Generate was NEVER called, Check WAS called + Mockito.verify(mockGenerateConsistencyTokenCallable, Mockito.never()) + .futureCall(ArgumentMatchers.any(GenerateConsistencyTokenRequest.class), ArgumentMatchers.any(ApiCallContext.class)); + Mockito.verify(mockCheckConsistencyCallable, Mockito.times(1)) + .futureCall(expectedCheckRequest, CALL_CONTEXT); + } } From 6fb1fabff42241764b0eba700da89d24a9bd342e Mon Sep 17 00:00:00 2001 From: cloud-java-bot Date: Mon, 26 Jan 2026 21:12:10 +0000 Subject: [PATCH 2/2] chore: generate libraries at Mon Jan 26 21:09:41 UTC 2026 --- .../admin/v2/BigtableTableAdminClient.java | 16 +++++++--------- .../admin/v2/models/ConsistencyRequest.java | 6 +++--- .../v2/stub/AwaitConsistencyCallableTest.java | 4 +++- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java index 9e4a1a4bf5..3068352c06 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java @@ -1461,8 +1461,8 @@ public ApiFuture awaitReplicationAsync(final String tableId) { } /** - * Generates a consistency token and polls it until replication is consistent. - * Blocks until completion. + * Generates a consistency token and polls it until replication is consistent. Blocks until + * completion. * * @param tableId The table to check. */ @@ -1480,16 +1480,14 @@ public ApiFuture waitForConsistencyAsync(String tableId) { } /** - * Polls an existing consistency token until replication is consistent. - * Useful for checking consistency of a token generated in a separate process. - * Blocks until completion. + * Polls an existing consistency token until replication is consistent. Useful for checking + * consistency of a token generated in a separate process. Blocks until completion. * * @param tableId The table to check. * @param consistencyToken The token to poll. */ public void waitForConsistency(String tableId, String consistencyToken) { - ApiExceptions.callAndTranslateApiException( - waitForConsistencyAsync(tableId, consistencyToken)); + ApiExceptions.callAndTranslateApiException(waitForConsistencyAsync(tableId, consistencyToken)); } /** @@ -1499,8 +1497,8 @@ public void waitForConsistency(String tableId, String consistencyToken) { * @param consistencyToken The token to poll. */ public ApiFuture waitForConsistencyAsync(String tableId, String consistencyToken) { - return stub.awaitConsistencyCallable().futureCall( - ConsistencyRequest.forReplication(tableId, consistencyToken)); + return stub.awaitConsistencyCallable() + .futureCall(ConsistencyRequest.forReplication(tableId, consistencyToken)); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java index 9dc7f76626..b2a79efcc1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java @@ -35,8 +35,8 @@ public abstract class ConsistencyRequest { protected abstract CheckConsistencyRequest.ModeCase getMode(); /** - * Internal accessor for the consistency token. - * Must be public to be accessible from the stub package. + * Internal accessor for the consistency token. Must be public to be accessible from the stub + * package. */ @InternalApi @Nullable @@ -88,4 +88,4 @@ public GenerateConsistencyTokenRequest toGenerateTokenProto( return builder.setName(tableName.toString()).build(); } -} \ No newline at end of file +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java index a39020b54d..0aad99b232 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java @@ -353,7 +353,9 @@ public void testWithProvidedToken() throws Exception { // 4. Verify: Generate was NEVER called, Check WAS called Mockito.verify(mockGenerateConsistencyTokenCallable, Mockito.never()) - .futureCall(ArgumentMatchers.any(GenerateConsistencyTokenRequest.class), ArgumentMatchers.any(ApiCallContext.class)); + .futureCall( + ArgumentMatchers.any(GenerateConsistencyTokenRequest.class), + ArgumentMatchers.any(ApiCallContext.class)); Mockito.verify(mockCheckConsistencyCallable, Mockito.times(1)) .futureCall(expectedCheckRequest, CALL_CONTEXT); }