Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.replication.metrics;

/**
 * Metrics contract for ReplicationLogDiscoveryReplay operations. Adds replay-specific metrics
 * on top of the base {@link MetricsReplicationLogDiscovery} contract.
 */
public interface MetricsReplicationLogDiscoveryReplay extends MetricsReplicationLogDiscovery {

  /** Metric key under which the consistency point gauge is registered. */
  String CONSISTENCY_POINT = "consistencyPoint";
  /** Human-readable description registered alongside the consistency point gauge. */
  String CONSISTENCY_POINT_DESC =
    "Consistency point timestamp in milliseconds for the HA Group during replay";

  /**
   * Records a new value for the consistency point metric. The consistency point is the
   * timestamp up to which all mutations have been replayed, i.e. the point at which the data
   * is consistent for failover or read operations.
   * @param consistencyPointMs the consistency point timestamp, in milliseconds
   */
  void updateConsistencyPoint(long consistencyPointMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,29 @@
*/
package org.apache.phoenix.replication.metrics;

import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

/** Implementation of metrics source for ReplicationLogDiscoveryReplay operations. */
public class MetricsReplicationLogDiscoveryReplayImpl extends MetricsReplicationLogDiscoveryImpl {
public class MetricsReplicationLogDiscoveryReplayImpl extends MetricsReplicationLogDiscoveryImpl
implements MetricsReplicationLogDiscoveryReplay {

private static final String METRICS_NAME = "ReplicationLogDiscoveryReplay";
private static final String METRICS_DESCRIPTION =
"Metrics about Replication Replay Log Discovery for an HA Group";
private static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;

private final MutableGaugeLong consistencyPoint;

public MetricsReplicationLogDiscoveryReplayImpl(final String haGroupName) {
super(MetricsReplicationLogDiscoveryReplayImpl.METRICS_NAME,
MetricsReplicationLogDiscoveryReplayImpl.METRICS_DESCRIPTION,
MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT,
MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT + ",haGroup=" + haGroupName);
consistencyPoint = getMetricsRegistry().newGauge(CONSISTENCY_POINT, CONSISTENCY_POINT_DESC, 0L);
}

@Override
public void updateConsistencyPoint(long consistencyPointMs) {
consistencyPoint.set(consistencyPointMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.phoenix.replication.ReplicationLogTracker;
import org.apache.phoenix.replication.ReplicationRound;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplay;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -290,6 +291,18 @@ protected void initializeLastRoundProcessed() throws IOException {
public void replay() throws IOException {
LOG.info("Starting replay with lastRoundProcessed={}, lastRoundInSync={}", lastRoundProcessed,
lastRoundInSync);

// Update consistency point metric at the start of replay
try {
long consistencyPoint = getConsistencyPoint();
LOG.debug("Consistency point for HAGroup: {} before starting the replay is {}.", haGroupName,
consistencyPoint);
getReplayMetrics().updateConsistencyPoint(consistencyPoint);
} catch (IOException exception) {
LOG.warn("Failed to get the consistency point for HA Group: {} at start of replay",
haGroupName, exception);
}

Optional<ReplicationRound> optionalNextRound = getFirstRoundToProcess();
LOG.info("Found first round to process as {} for haGroup: {}", optionalNextRound, haGroupName);
while (optionalNextRound.isPresent()) {
Expand Down Expand Up @@ -340,6 +353,18 @@ public void replay() throws IOException {
default:
throw new IllegalStateException("Unexpected state: " + currentState);
}

// Update consistency point metric after processing each round
try {
long consistencyPoint = getConsistencyPoint();
LOG.debug("Consistency point for HAGroup: {} after processing round: {} is {}", haGroupName,
replicationRound, consistencyPoint);
getReplayMetrics().updateConsistencyPoint(consistencyPoint);
} catch (IOException exception) {
LOG.warn("Failed to get the consistency point for HA Group: {} after processing round: {}",
haGroupName, replicationRound, exception);
}

optionalNextRound = getNextRoundToProcess();
}

Expand Down Expand Up @@ -383,6 +408,14 @@ protected MetricsReplicationLogDiscovery createMetricsSource() {
return new MetricsReplicationLogDiscoveryReplayImpl(haGroupName);
}

/**
 * Exposes this instance's metrics through the replay-specific interface.
 * @return the MetricsReplicationLogDiscoveryReplay view of the metrics source
 */
protected MetricsReplicationLogDiscoveryReplay getReplayMetrics() {
  // Cast assumes createMetricsSource() supplied a replay-specific implementation —
  // NOTE(review): a subclass overriding createMetricsSource() with a non-replay source
  // would make this throw ClassCastException.
  MetricsReplicationLogDiscovery baseMetrics = getMetrics();
  return (MetricsReplicationLogDiscoveryReplay) baseMetrics;
}

@Override
public String getExecutorThreadNameFormat() {
return EXECUTOR_THREAD_NAME_FORMAT;
Expand Down Expand Up @@ -487,4 +520,55 @@ public enum ReplicationReplayState {
DEGRADED, // degraded for writer
SYNCED_RECOVERY // came back from degraded → standby, needs rewind
}

/**
 * Derives the consistency point timestamp from the current replication replay state. On a
 * standby cluster the consistency point is the timestamp such that every mutation with a
 * timestamp below it has already been replayed.
 * @return the consistency point timestamp in milliseconds
 * @throws IOException if the consistency point cannot be determined in the current state
 */
public long getConsistencyPoint() throws IOException {

  ReplicationReplayState currentState = replicationReplayState.get();

  switch (currentState) {
    case SYNC:
      // While in sync, the minimum timestamp of any in-progress file is the tightest
      // bound; fall back to the end of the last synced round when nothing is in flight.
      Optional<Long> minInProgressTimestamp = getMinTimestampFromInProgressFiles();
      if (minInProgressTimestamp.isPresent()) {
        return minInProgressTimestamp.get();
      }
      if (lastRoundInSync != null) {
        // In sync mode lastRoundProcessed and lastRoundInSync should be identical;
        // lastRoundInSync is used as the safer of the two.
        return lastRoundInSync.getEndTime();
      }
      throw new IOException(
        "Not able to derive consistency point because In Progress directory is empty and lastRoundInSync is not initialized.");
    case DEGRADED:
    case SYNCED_RECOVERY:
      // Degraded or recovering: the last known sync point before degradation/recovery is
      // the only safe answer.
      if (lastRoundInSync != null) {
        return lastRoundInSync.getEndTime();
      }
      throw new IOException(
        "Not able to derive consistency point because lastRoundInSync is not initialized.");
    default:
      // Invalid or uninitialized state.
      throw new IOException(
        "Not able to derive consistency point for current state: " + currentState);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -193,7 +194,7 @@ protected void startReplicationReplay() throws IOException, SQLException {
List<String> replicationGroups = getReplicationGroups();
LOG.info("{} number of HA Groups found to start Replication Replay", replicationGroups.size());
for (String replicationGroup : replicationGroups) {
ReplicationLogReplay.get(conf, replicationGroup).startReplay();
getReplicationLogReplay(replicationGroup).startReplay();
}
}

Expand All @@ -204,14 +205,36 @@ protected void stopReplicationReplay() throws IOException, SQLException {
List<String> replicationGroups = getReplicationGroups();
LOG.info("{} number of HA Groups found to stop Replication Replay", replicationGroups.size());
for (String replicationGroup : replicationGroups) {
ReplicationLogReplay replicationLogReplay = ReplicationLogReplay.get(conf, replicationGroup);
ReplicationLogReplay replicationLogReplay = getReplicationLogReplay(replicationGroup);
replicationLogReplay.stopReplay();
replicationLogReplay.close();
}
}

/**
 * Computes the minimum consistency point across all HA groups in the cluster. See
 * {@link ReplicationLogDiscoveryReplay#getConsistencyPoint()} for what the consistency point
 * means for an individual HA group.
 * @return the smallest consistency point timestamp in milliseconds over all HA groups
 * @throws IOException if a group's consistency point cannot be retrieved
 * @throws SQLException if the HA group list cannot be read
 */
protected long getConsistencyPoint() throws IOException, SQLException {
  // The current time is the upper bound; each group's own consistency point can only
  // lower it. With zero groups this returns "now".
  long minConsistencyPoint = EnvironmentEdgeManager.currentTime();
  for (String haGroup : getReplicationGroups()) {
    long groupConsistencyPoint =
      getReplicationLogReplay(haGroup).getReplicationReplayLogDiscovery().getConsistencyPoint();
    if (groupConsistencyPoint < minConsistencyPoint) {
      minConsistencyPoint = groupConsistencyPoint;
    }
  }
  return minConsistencyPoint;
}

/**
 * Lists the names of all HA groups configured on this cluster.
 * @return the HA group names
 * @throws SQLException if the HA group store cannot be queried
 */
protected List<String> getReplicationGroups() throws SQLException {
  HAGroupStoreManager storeManager = HAGroupStoreManager.getInstance(conf);
  return storeManager.getHAGroupNames();
}

/**
 * Obtains the ReplicationLogReplay handle for the given HA group via the class-level factory.
 * @param haGroupName name of the HA group
 * @return the replay handle for that group
 */
protected ReplicationLogReplay getReplicationLogReplay(final String haGroupName) {
  ReplicationLogReplay replay = ReplicationLogReplay.get(conf, haGroupName);
  return replay;
}
}
Loading