Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,47 @@ public ApiFuture<Void> awaitReplicationAsync(final String tableId) {
return stub.awaitReplicationCallable().futureCall(tableName);
}

/**
* Generates a consistency token and polls it until replication is consistent. Blocks until
* completion.
*
* @param tableId The table to check.
*/
public void waitForConsistency(String tableId) {
awaitReplication(tableId);
}

/**
* Asynchronously generates a token and polls until consistent.
*
* @param tableId The table to check.
*/
public ApiFuture<Void> waitForConsistencyAsync(String tableId) {
return awaitReplicationAsync(tableId);
}

/**
* Polls an existing consistency token until replication is consistent. Useful for checking
* consistency of a token generated in a separate process. Blocks until completion.
*
* @param tableId The table to check.
* @param consistencyToken The token to poll.
*/
public void waitForConsistency(String tableId, String consistencyToken) {
ApiExceptions.callAndTranslateApiException(waitForConsistencyAsync(tableId, consistencyToken));
}

/**
* Asynchronously polls an existing consistency token.
*
* @param tableId The table to check.
* @param consistencyToken The token to poll.
*/
public ApiFuture<Void> waitForConsistencyAsync(String tableId, String consistencyToken) {
return stub.awaitConsistencyCallable()
.futureCall(ConsistencyRequest.forReplication(tableId, consistencyToken));
}

/**
* Creates a new authorized view with the specified configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

@AutoValue
public abstract class ConsistencyRequest {
Expand All @@ -33,14 +34,33 @@ public abstract class ConsistencyRequest {
@Nonnull
protected abstract CheckConsistencyRequest.ModeCase getMode();

/**
* Internal accessor for the consistency token. Must be public to be accessible from the stub
* package.
*/
@InternalApi
@Nullable
public abstract String getConsistencyToken();

public static ConsistencyRequest forReplication(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null);
}

/**
* Creates a request to check consistency using an existing token.
*
* @param tableId The table ID.
* @param consistencyToken The token to check.
*/
public static ConsistencyRequest forReplication(String tableId, String consistencyToken) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, consistencyToken);
}

public static ConsistencyRequest forDataBoost(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES);
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null);
}

@InternalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ static AwaitConsistencyCallable create(
@Override
public ApiFuture<Void> futureCall(
final ConsistencyRequest consistencyRequest, final ApiCallContext apiCallContext) {

// If the token is already provided, skip generation and poll directly.
if (consistencyRequest.getConsistencyToken() != null) {
CheckConsistencyRequest request =
consistencyRequest.toCheckConsistencyProto(
requestContext, consistencyRequest.getConsistencyToken());
return pollToken(request, apiCallContext);
}

ApiFuture<GenerateConsistencyTokenResponse> tokenFuture =
generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,31 @@ public void testTestIamPermissions() {
assertThat(actualResult).containsExactly("bigtable.backups.get");
}

@Test
public void testWaitForConsistencyWithToken() {
// Setup
Mockito.when(mockStub.awaitConsistencyCallable()).thenReturn(mockAwaitConsistencyCallable);

String token = "my-token";
ConsistencyRequest expectedRequest = ConsistencyRequest.forReplication(TABLE_ID, token);

final AtomicBoolean wasCalled = new AtomicBoolean(false);

Mockito.when(mockAwaitConsistencyCallable.futureCall(expectedRequest))
.thenAnswer(
(Answer<ApiFuture<Void>>)
invocationOnMock -> {
wasCalled.set(true);
return ApiFutures.immediateFuture(null);
});

// Execute
adminClient.waitForConsistency(TABLE_ID, token);

// Verify
assertThat(wasCalled.get()).isTrue();
}

private <ReqT, RespT, MetaT> void mockOperationResult(
OperationCallable<ReqT, RespT, MetaT> callable,
ReqT request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,22 @@ public void testToGenerateTokenProto() {
assertThat(generateRequest.getName())
.isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID));
}

@Test
public void testToCheckConsistencyProtoWithToken() {
ConsistencyRequest consistencyRequest =
ConsistencyRequest.forReplication(TABLE_ID, CONSISTENCY_TOKEN);

TableAdminRequestContext requestContext =
TableAdminRequestContext.create(PROJECT_ID, INSTANCE_ID);

CheckConsistencyRequest checkConsistencyRequest =
consistencyRequest.toCheckConsistencyProto(requestContext, CONSISTENCY_TOKEN);

assertThat(checkConsistencyRequest.getName())
.isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID));
assertThat(checkConsistencyRequest.getConsistencyToken()).isEqualTo(CONSISTENCY_TOKEN);
assertThat(checkConsistencyRequest.getModeCase())
.isEqualTo(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
Expand Down Expand Up @@ -325,4 +326,37 @@ public void testAwaitReplicationCallablePolling() throws Exception {

consistentFuture.get(1, TimeUnit.SECONDS);
}

@Test
public void testWithProvidedToken() throws Exception {
// 1. Setup: Request with a pre-existing token
String existingToken = "existing-token";
ConsistencyRequest consistencyRequest =
ConsistencyRequest.forReplication(TABLE_ID, existingToken);

// 2. Setup: Mock the check operation to succeed immediately
CheckConsistencyRequest expectedCheckRequest =
CheckConsistencyRequest.newBuilder()
.setName(TABLE_NAME.toString())
.setConsistencyToken(existingToken)
.setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build())
.build();
CheckConsistencyResponse expectedResponse =
CheckConsistencyResponse.newBuilder().setConsistent(true).build();

Mockito.when(mockCheckConsistencyCallable.futureCall(expectedCheckRequest, CALL_CONTEXT))
.thenReturn(ApiFutures.immediateFuture(expectedResponse));

// 3. Execute
ApiFuture<Void> future = awaitConsistencyCallable.futureCall(consistencyRequest, CALL_CONTEXT);
future.get(1, TimeUnit.SECONDS);

// 4. Verify: Generate was NEVER called, Check WAS called
Mockito.verify(mockGenerateConsistencyTokenCallable, Mockito.never())
.futureCall(
ArgumentMatchers.any(GenerateConsistencyTokenRequest.class),
ArgumentMatchers.any(ApiCallContext.class));
Mockito.verify(mockCheckConsistencyCallable, Mockito.times(1))
.futureCall(expectedCheckRequest, CALL_CONTEXT);
}
}
Loading