Skip to content

Commit 2af4779

Browse files
[fluss-server] Restrict replicasOnOffline to fatal errors only
Only fatal errors (STORAGE_EXCEPTION, LOG_STORAGE_EXCEPTION, KV_STORAGE_EXCEPTION, UNKNOWN_SERVER_ERROR) mark replicas offline. All other NotifyLeaderAndIsr errors are transient and do not affect leader election eligibility. Also fixes ReplicaManager to return LeaderNotAvailableException instead of StorageException when leaderId is null or negative.
1 parent 0e265ad commit 2af4779

4 files changed

Lines changed: 426 additions & 12 deletions

File tree

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.fluss.rpc.messages.RebalanceResponse;
6060
import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
6161
import org.apache.fluss.rpc.protocol.ApiError;
62+
import org.apache.fluss.rpc.protocol.Errors;
6263
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
6364
import org.apache.fluss.server.coordinator.event.AddServerTagEvent;
6465
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
@@ -342,8 +343,14 @@ private void initCoordinatorContext() throws Exception {
342343
long start4loadTabletServer = System.currentTimeMillis();
343344
Map<Integer, TabletServerRegistration> tabletServerRegistrations =
344345
zooKeeperClient.getTabletServers(currentServers);
346+
List<Integer> skippedNullRegistration = new ArrayList<>();
347+
List<Integer> skippedNoEndpoint = new ArrayList<>();
345348
for (int server : currentServers) {
346349
TabletServerRegistration registration = tabletServerRegistrations.get(server);
350+
if (registration == null) {
351+
skippedNullRegistration.add(server);
352+
continue;
353+
}
347354
ServerInfo serverInfo =
348355
new ServerInfo(
349356
server,
@@ -357,6 +364,7 @@ private void initCoordinatorContext() throws Exception {
357364
"Can not find endpoint for listener name {} for tablet server {}",
358365
internalListenerName,
359366
serverInfo);
367+
skippedNoEndpoint.add(server);
360368
continue;
361369
}
362370
tabletServerInfos.add(serverInfo);
@@ -370,8 +378,30 @@ private void initCoordinatorContext() throws Exception {
370378

371379
coordinatorContext.setLiveTabletServers(tabletServerInfos);
372380
LOG.info(
373-
"Load tablet servers success in {}ms when initializing coordinator context.",
374-
System.currentTimeMillis() - start4loadTabletServer);
381+
"Load tablet servers success in {}ms when initializing coordinator context. "
382+
+ "ZK returned {} servers, loaded {} into liveSet, "
383+
+ "skipped {} (null registration), skipped {} (no endpoint). "
384+
+ "Live server IDs: {}",
385+
System.currentTimeMillis() - start4loadTabletServer,
386+
currentServers.length,
387+
tabletServerInfos.size(),
388+
skippedNullRegistration.size(),
389+
skippedNoEndpoint.size(),
390+
tabletServerInfos.stream()
391+
.map(s -> String.valueOf(s.id()))
392+
.collect(Collectors.joining(",")));
393+
if (!skippedNullRegistration.isEmpty()) {
394+
LOG.warn(
395+
"Skipped {} servers with null ZK registration: {}",
396+
skippedNullRegistration.size(),
397+
skippedNullRegistration);
398+
}
399+
if (!skippedNoEndpoint.isEmpty()) {
400+
LOG.warn(
401+
"Skipped {} servers with no internal endpoint: {}",
402+
skippedNoEndpoint.size(),
403+
skippedNoEndpoint);
404+
}
375405

376406
// init tablet server channels
377407
coordinatorChannelManager.startup(internalServerNodes);
@@ -938,11 +968,25 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent(
938968
notifyLeaderAndIsrResponseReceivedEvent.getNotifyLeaderAndIsrResultForBuckets();
939969
for (NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket :
940970
notifyLeaderAndIsrResultForBuckets) {
941-
// if the error code is not none, we will consider it as offline
942971
if (notifyLeaderAndIsrResultForBucket.failed()) {
943-
offlineReplicas.add(
944-
new TableBucketReplica(
945-
notifyLeaderAndIsrResultForBucket.getTableBucket(), serverId));
972+
Errors error = notifyLeaderAndIsrResultForBucket.getError().error();
973+
TableBucket tableBucket = notifyLeaderAndIsrResultForBucket.getTableBucket();
974+
if (isFatalReplicaError(error)) {
975+
LOG.warn(
976+
"Fatal NotifyLeaderAndIsr error for bucket {} on server {}: {}. "
977+
+ "Marking replica offline.",
978+
tableBucket,
979+
serverId,
980+
notifyLeaderAndIsrResultForBucket.getError());
981+
offlineReplicas.add(new TableBucketReplica(tableBucket, serverId));
982+
} else {
983+
LOG.warn(
984+
"Transient NotifyLeaderAndIsr error for bucket {} on server {}: {}. "
985+
+ "Replica remains online.",
986+
tableBucket,
987+
serverId,
988+
notifyLeaderAndIsrResultForBucket.getError());
989+
}
946990
}
947991
}
948992
if (!offlineReplicas.isEmpty()) {
@@ -951,6 +995,18 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent(
951995
}
952996
}
953997

998+
/**
999+
* Returns true if the error indicates a fatal replica failure (storage corruption, unknown
1000+
* internal error) that warrants excluding this replica from future leader elections. All other
1001+
* errors are considered transient and should NOT mark the replica offline.
1002+
*/
1003+
private static boolean isFatalReplicaError(Errors error) {
1004+
return error == Errors.STORAGE_EXCEPTION
1005+
|| error == Errors.LOG_STORAGE_EXCEPTION
1006+
|| error == Errors.KV_STORAGE_EXCEPTION
1007+
|| error == Errors.UNKNOWN_SERVER_ERROR;
1008+
}
1009+
9541010
private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
9551011
LOG.info("The replica {} become offline.", offlineReplicas);
9561012
for (TableBucketReplica offlineReplica : offlineReplicas) {

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.exception.InvalidColumnProjectionException;
2828
import org.apache.fluss.exception.InvalidCoordinatorException;
2929
import org.apache.fluss.exception.InvalidRequiredAcksException;
30+
import org.apache.fluss.exception.LeaderNotAvailableException;
3031
import org.apache.fluss.exception.LogOffsetOutOfRangeException;
3132
import org.apache.fluss.exception.LogStorageException;
3233
import org.apache.fluss.exception.NotLeaderOrFollowerException;
@@ -1188,17 +1189,16 @@ private void addFetcherForReplicas(
11881189
Integer leaderId = replica.getLeaderId();
11891190
TableBucket tb = replica.getTableBucket();
11901191
LogTablet logTablet = replica.getLogTablet();
1191-
if (leaderId == null) {
1192+
if (leaderId == null || leaderId < 0) {
11921193
result.put(
11931194
tb,
11941195
new NotifyLeaderAndIsrResultForBucket(
11951196
tb,
11961197
ApiError.fromThrowable(
1197-
new StorageException(
1198+
new LeaderNotAvailableException(
11981199
String.format(
1199-
"Could not find leader for follower replica %s while make "
1200-
+ "follower for %s.",
1201-
serverId, tb)))));
1200+
"No leader available for follower replica %s on server %s.",
1201+
tb, serverId)))));
12021202
} else {
12031203
bucketAndStatus.put(
12041204
tb,

0 commit comments

Comments
 (0)