Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,14 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent(
// trigger replicas to offline
onReplicaBecomeOffline(offlineReplicas);
}

// Try to complete rebalance tasks for the buckets in the response.
// This is essential for leader-only migrations to ensure they wait for the tablet
// server to acknowledge the leader change before proceeding to the next migration.
for (NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket :
notifyLeaderAndIsrResultForBuckets) {
tryToCompleteRebalanceTask(notifyLeaderAndIsrResultForBucket.getTableBucket());
Copy link
Copy Markdown
Contributor

@LiebingYu LiebingYu Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After switching to sequential leader transfer one bucket at a time, a new issue arises: if the leader transfer fails for any single bucket, it may block all subsequent leader transfers. However, I understand it is acceptable to not address this issue for now, but users must manually handle such blocking situations. WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @LiebingYu. This problem will be tracked by this issue: #3096

}
}

private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
Expand Down Expand Up @@ -1365,12 +1373,15 @@ public void tryToExecuteRebalanceTask(RebalancePlanForBucket planForBucket) {

if (planForBucket.isLeaderChanged() && !reassignment.isBeingReassigned()) {
// buckets only need to change leader like leader replica rebalance.
// Don't finish the task immediately; wait for the NotifyLeaderAndIsr response
// from the tablet server to confirm the leader change has been applied.
// This ensures leader migrations are executed sequentially, avoiding excessive
// pressure on tablet servers (especially for KV tables).
LOG.info("trigger leader election for tableBucket {}.", tableBucket);
tableBucketStateMachine.handleStateChange(
Collections.singleton(tableBucket),
OnlineBucket,
new ReassignmentLeaderElection(newReplicas));
rebalanceManager.finishRebalanceTask(tableBucket, RebalanceStatus.COMPLETED);
} else {
try {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
import org.apache.fluss.rpc.messages.ApiMessage;
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrRequest;
import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse;
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.protocol.ApiKeys;
Expand Down Expand Up @@ -102,6 +105,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1042,6 +1046,112 @@ void testDoBucketReassignment() throws Exception {
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupPath(ZkData.ServerIdZNode.path(3));
}

@Test
void testLeaderOnlyRebalanceExecutesSequentially() throws Exception {
    // Install gateways that can hold back NotifyLeaderAndIsr responses. They begin in
    // pass-through mode (needed while the table is created and initial leaders are
    // elected) and are flipped into controlled mode afterwards to observe
    // one-at-a-time leader migration.
    ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers =
            new ConcurrentLinkedDeque<>();
    int[] serverIds = zookeeperClient.getSortedTabletServerList();
    ControlledNotifyGateway[] heldGateways = new ControlledNotifyGateway[serverIds.length];
    Map<Integer, TabletServerGateway> gatewayByServer = new HashMap<>();
    for (int i = 0; i < serverIds.length; i++) {
        ControlledNotifyGateway gateway = new ControlledNotifyGateway(pendingTriggers);
        heldGateways[i] = gateway;
        gatewayByServer.put(serverIds[i], gateway);
    }
    testCoordinatorChannelManager.setGateways(gatewayByServer);

    // A table with 3 buckets, all assigned to replicas [0, 1, 2] and led by server 0.
    TablePath tablePath = TablePath.of(defaultDatabase, "test_leader_rebalance_sequential");
    Map<Integer, BucketAssignment> assignments = new HashMap<>();
    for (int bucket = 0; bucket < 3; bucket++) {
        assignments.put(bucket, BucketAssignment.of(0, 1, 2));
    }
    TableAssignment tableAssignment = new TableAssignment(assignments);
    long tableId = metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false);

    TableBucket tb0 = new TableBucket(tableId, 0);
    TableBucket tb1 = new TableBucket(tableId, 1);
    TableBucket tb2 = new TableBucket(tableId, 2);

    // Initial leader election must settle on server 0 for every bucket.
    verifyIsr(tb0, 0, Arrays.asList(0, 1, 2));
    verifyIsr(tb1, 0, Arrays.asList(0, 1, 2));
    verifyIsr(tb2, 0, Arrays.asList(0, 1, 2));

    // From here on, every NotifyLeaderAndIsr response is parked until the test
    // explicitly releases its trigger.
    for (ControlledNotifyGateway gateway : heldGateways) {
        gateway.enableControlMode();
    }
    pendingTriggers.clear();

    // Leader-only plan: the replica sets are untouched, only the leader moves.
    // tb0: 0 -> 1, tb1: 0 -> 2, tb2: 0 -> 1 (target leader listed first in newReplicas).
    Map<TableBucket, RebalancePlanForBucket> plan = new HashMap<>();
    plan.put(
            tb0,
            new RebalancePlanForBucket(
                    tb0, 0, 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 0, 2)));
    plan.put(
            tb1,
            new RebalancePlanForBucket(
                    tb1, 0, 2, Arrays.asList(0, 1, 2), Arrays.asList(2, 0, 1)));
    plan.put(
            tb2,
            new RebalancePlanForBucket(
                    tb2, 0, 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 0)));

    // Registering kicks off only the FIRST migration; the remaining tasks must wait
    // for the held NotifyLeaderAndIsr acknowledgement before they may start.
    eventProcessor
            .getRebalanceManager()
            .registerRebalance("rebalance-leader-sequential", plan, RebalanceStatus.NOT_STARTED);

    // === Step 1: exactly one migration is in flight ===
    // registerRebalance() is synchronous, so its leader election has already sent
    // NotifyLeaderAndIsr to the replica servers; the other tasks are blocked.
    assertThat(pendingTriggers).isNotEmpty();
    // All three tasks still count as in progress (one executing, two waiting).
    assertThat(countInProgressRebalanceTasks(tb0, tb1, tb2)).isEqualTo(3);

    // Release the held responses so the coordinator can finish task one and move on.
    drainPendingNotifyTriggers(pendingTriggers);

    // === Step 2: the second migration starts once the first is acknowledged ===
    retry(Duration.ofMinutes(1), () -> assertThat(pendingTriggers).isNotEmpty());
    assertThat(countInProgressRebalanceTasks(tb0, tb1, tb2)).isEqualTo(2);
    drainPendingNotifyTriggers(pendingTriggers);

    // === Step 3: the third migration starts after the second completes ===
    retry(Duration.ofMinutes(1), () -> assertThat(pendingTriggers).isNotEmpty());
    assertThat(countInProgressRebalanceTasks(tb0, tb1, tb2)).isEqualTo(1);
    drainPendingNotifyTriggers(pendingTriggers);

    // === Step 4: with the last response released, the whole rebalance finishes ===
    retry(
            Duration.ofMinutes(1),
            () ->
                    assertThat(eventProcessor.getRebalanceManager().hasInProgressRebalance())
                            .isFalse());

    // Every bucket ends up with its planned leader; replica sets are unchanged.
    verifyIsr(tb0, 1, Arrays.asList(0, 1, 2));
    verifyIsr(tb1, 2, Arrays.asList(0, 1, 2));
    verifyIsr(tb2, 1, Arrays.asList(0, 1, 2));
}

private void verifyIsr(TableBucket tb, int expectedLeader, List<Integer> expectedIsr)
throws Exception {
LeaderAndIsr leaderAndIsr =
Expand Down Expand Up @@ -1433,6 +1543,58 @@ private static List<TableBucket> allTableBuckets(
.collect(Collectors.toList());
}

/**
 * Completes and removes every pending NotifyLeaderAndIsr release trigger, allowing the
 * held gateway responses to be delivered to the coordinator.
 */
private static void drainPendingNotifyTriggers(
        ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers) {
    for (CompletableFuture<Void> trigger = pendingTriggers.poll();
            trigger != null;
            trigger = pendingTriggers.poll()) {
        trigger.complete(null);
    }
}

/**
 * Counts how many of the given buckets still have a rebalance task registered with the
 * rebalance manager (i.e. a non-null plan is returned for the bucket).
 */
private int countInProgressRebalanceTasks(TableBucket... buckets) {
    int inProgress = 0;
    for (TableBucket bucket : buckets) {
        boolean hasTask =
                eventProcessor.getRebalanceManager().getRebalancePlanForBucket(bucket) != null;
        if (hasTask) {
            inProgress++;
        }
    }
    return inProgress;
}

/**
 * A {@link TestTabletServerGateway} that can park NotifyLeaderAndIsr responses so a test can
 * verify sequential execution of leader migrations. While in pass-through mode it behaves
 * exactly like its parent; once controlled mode is enabled, each response is withheld behind a
 * {@link CompletableFuture} trigger that the test must complete before the caller sees it.
 */
private static class ControlledNotifyGateway extends TestTabletServerGateway {
    private final ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers;
    private volatile boolean controlMode = false;

    ControlledNotifyGateway(ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers) {
        super(false, Collections.emptySet());
        this.pendingTriggers = pendingTriggers;
    }

    /** Switches from pass-through to controlled mode; responses are held from now on. */
    void enableControlMode() {
        controlMode = true;
    }

    @Override
    public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
            NotifyLeaderAndIsrRequest request) {
        CompletableFuture<NotifyLeaderAndIsrResponse> parentResponse =
                super.notifyLeaderAndIsr(request);
        if (!controlMode) {
            return parentResponse;
        }
        // Controlled mode: materialize the parent's success response now, then gate its
        // delivery on a trigger that only the test can complete.
        NotifyLeaderAndIsrResponse response = parentResponse.join();
        CompletableFuture<Void> trigger = new CompletableFuture<>();
        pendingTriggers.add(trigger);
        return trigger.thenApply(ignored -> response);
    }
}

private static class PartitionIdName {
private final long partitionId;
private final String partitionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,29 @@ public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
notifyLeaderAndIsrResponse.addAllNotifyBucketsLeaderResps(bucketsResps);
return CompletableFuture.completedFuture(notifyLeaderAndIsrResponse);
} else {
return CompletableFuture.completedFuture(new NotifyLeaderAndIsrResponse());
// Build success responses for all buckets in the request so that
// the coordinator can identify which buckets have been acknowledged.
List<PbNotifyLeaderAndIsrRespForBucket> bucketsResps = new ArrayList<>();
for (PbNotifyLeaderAndIsrReqForBucket pbNotifyLeaderForBucket :
notifyLeaderAndIsrRequest.getNotifyBucketsLeaderReqsList()) {
PbNotifyLeaderAndIsrRespForBucket pbNotifyLeaderRespForBucket =
new PbNotifyLeaderAndIsrRespForBucket();
pbNotifyLeaderRespForBucket
.setTableBucket()
.setTableId(pbNotifyLeaderForBucket.getTableBucket().getTableId())
.setBucketId(pbNotifyLeaderForBucket.getTableBucket().getBucketId());
if (pbNotifyLeaderForBucket.getTableBucket().hasPartitionId()) {
pbNotifyLeaderRespForBucket
.getTableBucket()
.setPartitionId(
pbNotifyLeaderForBucket.getTableBucket().getPartitionId());
}
bucketsResps.add(pbNotifyLeaderRespForBucket);
}
NotifyLeaderAndIsrResponse notifyLeaderAndIsrResponse =
new NotifyLeaderAndIsrResponse();
notifyLeaderAndIsrResponse.addAllNotifyBucketsLeaderResps(bucketsResps);
return CompletableFuture.completedFuture(notifyLeaderAndIsrResponse);
}
}

Expand Down