diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java new file mode 100644 index 00000000000..f78b7621466 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.metrics; + +/** + * Interface for metrics specific to ReplicationLogDiscoveryReplay operations. Extends the base + * MetricsReplicationLogDiscovery with replay-specific metrics. + */ +public interface MetricsReplicationLogDiscoveryReplay extends MetricsReplicationLogDiscovery { + + String CONSISTENCY_POINT = "consistencyPoint"; + String CONSISTENCY_POINT_DESC = + "Consistency point timestamp in milliseconds for the HA Group during replay"; + + /** + * Updates the consistency point metric. The consistency point represents the timestamp up to + * which all mutations have been replayed and the data is consistent for failover or read + * operations. + * @param consistencyPointMs The consistency point timestamp in milliseconds + */ + void updateConsistencyPoint(long consistencyPointMs); +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java index c7dcf4f6ae9..16e65287d98 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java @@ -17,18 +17,29 @@ */ package org.apache.phoenix.replication.metrics; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + /** Implementation of metrics source for ReplicationLogDiscoveryReplay operations. */ -public class MetricsReplicationLogDiscoveryReplayImpl extends MetricsReplicationLogDiscoveryImpl { +public class MetricsReplicationLogDiscoveryReplayImpl extends MetricsReplicationLogDiscoveryImpl + implements MetricsReplicationLogDiscoveryReplay { private static final String METRICS_NAME = "ReplicationLogDiscoveryReplay"; private static final String METRICS_DESCRIPTION = "Metrics about Replication Replay Log Discovery for an HA Group"; private static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + private final MutableGaugeLong consistencyPoint; + public MetricsReplicationLogDiscoveryReplayImpl(final String haGroupName) { super(MetricsReplicationLogDiscoveryReplayImpl.METRICS_NAME, MetricsReplicationLogDiscoveryReplayImpl.METRICS_DESCRIPTION, MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT, MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT + ",haGroup=" + haGroupName); + consistencyPoint = getMetricsRegistry().newGauge(CONSISTENCY_POINT, CONSISTENCY_POINT_DESC, 0L); + } + + @Override + public void updateConsistencyPoint(long consistencyPointMs) { + consistencyPoint.set(consistencyPointMs); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index 36daca8b97d..991ee35b7b6 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -32,6 +32,7 @@ import org.apache.phoenix.replication.ReplicationLogTracker; import org.apache.phoenix.replication.ReplicationRound; import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplay; import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -290,6 +291,18 @@ protected void initializeLastRoundProcessed() throws IOException { public void replay() throws IOException { LOG.info("Starting replay with lastRoundProcessed={}, lastRoundInSync={}", lastRoundProcessed, lastRoundInSync); + + // Update consistency point metric at the start of replay + try { + long consistencyPoint = getConsistencyPoint(); + LOG.debug("Consistency point for HAGroup: {} before starting the replay is {}.", haGroupName, + consistencyPoint); + getReplayMetrics().updateConsistencyPoint(consistencyPoint); + } catch (IOException exception) { + LOG.warn("Failed to get the consistency point for HA Group: {} at start of replay", + haGroupName, exception); + } + Optional optionalNextRound = getFirstRoundToProcess(); LOG.info("Found first round to process as {} for haGroup: {}", optionalNextRound, haGroupName); while (optionalNextRound.isPresent()) { @@ -340,6 +353,18 @@ public void replay() throws IOException { default: throw new IllegalStateException("Unexpected state: " + currentState); } + + // Update consistency point metric after processing each round + try { + long consistencyPoint = getConsistencyPoint(); + LOG.debug("Consistency point for HAGroup: {} after processing round: {} is {}", haGroupName, + replicationRound, consistencyPoint); + getReplayMetrics().updateConsistencyPoint(consistencyPoint); + } catch (IOException exception) { + LOG.warn("Failed to get the consistency point for HA Group: {} after processing round: {}", + haGroupName, replicationRound, exception); + } + optionalNextRound = getNextRoundToProcess(); } @@ -383,6 +408,14 @@ protected MetricsReplicationLogDiscovery createMetricsSource() { return new MetricsReplicationLogDiscoveryReplayImpl(haGroupName); } + /** + * Returns the replay-specific metrics interface. + * @return MetricsReplicationLogDiscoveryReplay instance + */ + protected MetricsReplicationLogDiscoveryReplay getReplayMetrics() { + return (MetricsReplicationLogDiscoveryReplay) getMetrics(); + } + @Override public String getExecutorThreadNameFormat() { return EXECUTOR_THREAD_NAME_FORMAT; @@ -487,4 +520,55 @@ public enum ReplicationReplayState { DEGRADED, // degraded for writer SYNCED_RECOVERY // came back from degraded → standby, needs rewind } + + /** + * Returns the consistency point timestamp based on the current replication replay state. The + * consistency point in a standby cluster is defined as the timestamp such that all mutations + * whose timestamp less than this consistency point timestamp have been replayed + * @return The consistency point timestamp in milliseconds + * @throws IOException if the consistency point cannot be determined based on current state + */ + public long getConsistencyPoint() throws IOException { + + ReplicationReplayState currentState = replicationReplayState.get(); + long consistencyPoint = 0L; + + switch (currentState) { + case SYNC: + // In SYNC state: prefer minimum timestamp from in-progress files (if any), + // otherwise use lastRoundInSync end time + Optional optionalMinTimestampInProgressTimestamp = + getMinTimestampFromInProgressFiles(); + if (optionalMinTimestampInProgressTimestamp.isPresent()) { + // Use minimum timestamp from in-progress files as consistency point + consistencyPoint = optionalMinTimestampInProgressTimestamp.get(); + } else if (lastRoundInSync != null) { + // Use lastRoundInSync end time if no in-progress files + // Since we are in sync mode, both lastRoundProcessed and lastRoundInSync would be same. + // However, using lastRoundInSync to be on safe side. + consistencyPoint = lastRoundInSync.getEndTime(); + } else { + throw new IOException( + "Not able to derive consistency point because In Progress directory is empty and lastRoundInSync is not initialized."); + } + break; + case DEGRADED: + case SYNCED_RECOVERY: + // In DEGRADED or SYNCED_RECOVERY state: use lastRoundInSync end time + // (the last known sync point before degradation/recovery) + if (lastRoundInSync != null) { + consistencyPoint = lastRoundInSync.getEndTime(); + } else { + throw new IOException( + "Not able to derive consistency point because lastRoundInSync is not initialized."); + } + break; + default: + // Invalid or uninitialized state + throw new IOException( + "Not able to derive consistency point for current state: " + currentState); + } + + return consistencyPoint; + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index fe380dea857..24d40faac77 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.phoenix.jdbc.HAGroupStoreManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,7 +194,7 @@ protected void startReplicationReplay() throws IOException, SQLException { List replicationGroups = getReplicationGroups(); LOG.info("{} number of HA Groups found to start Replication Replay", replicationGroups.size()); for (String replicationGroup : replicationGroups) { - ReplicationLogReplay.get(conf, replicationGroup).startReplay(); + getReplicationLogReplay(replicationGroup).startReplay(); } } @@ -204,14 +205,36 @@ protected void stopReplicationReplay() throws IOException, SQLException { List replicationGroups = getReplicationGroups(); LOG.info("{} number of HA Groups found to stop Replication Replay", replicationGroups.size()); for (String replicationGroup : replicationGroups) { - ReplicationLogReplay replicationLogReplay = ReplicationLogReplay.get(conf, replicationGroup); + ReplicationLogReplay replicationLogReplay = getReplicationLogReplay(replicationGroup); replicationLogReplay.stopReplay(); replicationLogReplay.close(); } } + /** + * Returns the minimum consistency point across all HA groups in the cluster. See + * {@link ReplicationLogDiscoveryReplay#getConsistencyPoint()} for definition of consistency point + * for a particular HA Group. + * @return The minimum consistency point timestamp in milliseconds across all HA groups + * @throws IOException if there's an error retrieving consistency points from replication groups + * @throws SQLException if there's an error accessing HA group information + */ + protected long getConsistencyPoint() throws IOException, SQLException { + long consistencyPoint = EnvironmentEdgeManager.currentTime(); + List replicationGroups = getReplicationGroups(); + for (String replicationGroup : replicationGroups) { + consistencyPoint = Math.min(getReplicationLogReplay(replicationGroup) + .getReplicationReplayLogDiscovery().getConsistencyPoint(), consistencyPoint); + } + return consistencyPoint; + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); } + + protected ReplicationLogReplay getReplicationLogReplay(final String haGroupName) { + return ReplicationLogReplay.get(conf, haGroupName); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java index 68045bf5e04..0e85520eef9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java @@ -22,11 +22,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; import java.io.IOException; import java.net.URI; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -48,8 +51,7 @@ import org.apache.phoenix.replication.ReplicationLogTracker; import org.apache.phoenix.replication.ReplicationRound; import org.apache.phoenix.replication.ReplicationShardDirectoryManager; -import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker; -import org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl; +import org.apache.phoenix.replication.metrics.*; import org.apache.phoenix.util.HAGroupStoreTestUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.After; @@ -591,6 +593,10 @@ public void testReplay_SyncState_ProcessMultipleRounds() throws IOException { try { TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + + // Initialize the discovery + discovery.init(); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); discovery.setLastRoundProcessed(initialRound); @@ -639,6 +645,11 @@ public void testReplay_SyncState_ProcessMultipleRounds() throws IOException { // Verify triggerFailover was not called assertEquals("triggerFailover should not be called", 0, discovery.getTriggerFailoverCallCount()); + + // Verify consistency point: in SYNC state, should be lastRoundInSync.getEndTime() + long consistencyPoint = discovery.getConsistencyPoint(); + assertEquals("Consistency point should match lastRoundInSync.getEndTime() in SYNC state", + expectedRound3.getEndTime(), consistencyPoint); } finally { EnvironmentEdgeManager.reset(); } @@ -678,6 +689,10 @@ public void testReplay_DegradedState_MultipleRounds() throws IOException { try { TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + + // Initialize the discovery + discovery.init(); + discovery.setLastRoundProcessed( new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime)); ReplicationRound lastRoundInSyncBeforeReplay = @@ -724,6 +739,12 @@ public void testReplay_DegradedState_MultipleRounds() throws IOException { // Verify triggerFailover was not called assertEquals("triggerFailover should not be called", 0, discovery.getTriggerFailoverCallCount()); + + // Verify consistency point: in DEGRADED state, should be lastRoundInSync.getEndTime() + long consistencyPoint = discovery.getConsistencyPoint(); + assertEquals( + "Consistency point should match lastRoundInSync.getEndTime() in DEGRADED state", + lastRoundInSyncBeforeReplay.getEndTime(), consistencyPoint); } finally { EnvironmentEdgeManager.reset(); } @@ -764,6 +785,9 @@ public void testReplay_SyncedRecoveryState_RewindToLastInSync() throws IOExcepti TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + // Initialize the discovery + discovery.init(); + // Set initial state: lastRoundProcessed is ahead, lastRoundInSync is behind ReplicationRound lastInSyncRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); @@ -842,6 +866,13 @@ public void testReplay_SyncedRecoveryState_RewindToLastInSync() throws IOExcepti // Verify triggerFailover was not called assertEquals("triggerFailover should not be called", 0, discovery.getTriggerFailoverCallCount()); + + // Verify consistency point: after transition to SYNC, should be + // lastRoundInSync.getEndTime() + long consistencyPoint = discovery.getConsistencyPoint(); + assertEquals( + "Consistency point should match lastRoundInSync.getEndTime() after SYNC transition", + expectedSixthRound.getEndTime(), consistencyPoint); } finally { EnvironmentEdgeManager.reset(); } @@ -881,6 +912,10 @@ public void testReplay_StateTransition_SyncToDegradedDuringProcessing() throws I try { TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + + // Initialize the discovery + discovery.init(); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); discovery.setLastRoundProcessed(initialRound); @@ -947,6 +982,12 @@ public void testReplay_StateTransition_SyncToDegradedDuringProcessing() throws I // Verify triggerFailover was not called assertEquals("triggerFailover should not be called", 0, discovery.getTriggerFailoverCallCount()); + + // Verify consistency point: in DEGRADED state, should be lastRoundInSync.getEndTime() + long consistencyPoint = discovery.getConsistencyPoint(); + assertEquals( + "Consistency point should match lastRoundInSync.getEndTime() in DEGRADED state", + expectedRound1.getEndTime(), consistencyPoint); } finally { EnvironmentEdgeManager.reset(); } @@ -986,6 +1027,9 @@ public void testReplay_StateTransition_DegradedToSyncedRecovery() throws IOExcep TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + // Initialize the discovery + discovery.init(); + ReplicationRound lastInSyncRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); discovery.setLastRoundProcessed(new ReplicationRound(initialEndTime + roundTimeMills, @@ -1140,6 +1184,9 @@ else if (roundCount == 5) { } }; + // Initialize the discovery + discoveryWithTransitions.init(); + discoveryWithTransitions.setLastRoundProcessed(initialRound); discoveryWithTransitions.setLastRoundInSync(initialRound); discoveryWithTransitions @@ -1217,6 +1264,13 @@ else if (roundCount == 5) { // Verify triggerFailover was not called assertEquals("triggerFailover should not be called", 0, discoveryWithTransitions.getTriggerFailoverCallCount()); + + // Verify consistency point: after transition to SYNC, should be + // lastRoundInSync.getEndTime() + long consistencyPoint = discoveryWithTransitions.getConsistencyPoint(); + assertEquals( + "Consistency point should match lastRoundInSync.getEndTime() after SYNC transition", + expectedLastRound.getEndTime(), consistencyPoint); } finally { EnvironmentEdgeManager.reset(); } @@ -1254,6 +1308,10 @@ public void testReplay_NoRoundsToProcess() throws IOException { try { TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + + // Initialize the discovery + discovery.init(); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); discovery.setLastRoundProcessed(initialRound); @@ -1288,6 +1346,11 @@ public void testReplay_NoRoundsToProcess() throws IOException { // Verify triggerFailover was not called assertEquals("triggerFailover should not be called", 0, discovery.getTriggerFailoverCallCount()); + + // Verify consistency point: in SYNC state, should be lastRoundInSync.getEndTime() + long consistencyPoint = discovery.getConsistencyPoint(); + assertEquals("Consistency point should match lastRoundInSync.getEndTime() in SYNC state", + lastRoundInSyncBeforeReplay.getEndTime(), consistencyPoint); } finally { EnvironmentEdgeManager.reset(); } @@ -1399,6 +1462,9 @@ public void testReplay_LastRoundInSync_NotInitialized() throws IOException { TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + // Initialize the discovery + discovery.init(); + // Set lastRoundInSync with start time 0 (the new case being tested) ReplicationRound lastRoundInSyncWithZeroStart = new ReplicationRound(0L, roundTimeMills); discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart); @@ -1604,6 +1670,9 @@ protected void processRound(ReplicationRound replicationRound) throws IOExceptio } }; + // Initialize the discovery + discovery.init(); + // Set lastRoundInSync with start time 0 ReplicationRound lastRoundInSyncWithZeroStart = new ReplicationRound(0L, roundTimeMills); discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart); @@ -1718,6 +1787,10 @@ public void testReplay_TriggerFailoverAfterProcessing() throws IOException { try { TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + + // Initialize the discovery + discovery.init(); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); discovery.setLastRoundProcessed(initialRound); @@ -1776,6 +1849,11 @@ public void testReplay_TriggerFailoverAfterProcessing() throws IOException { assertFalse("failoverPending should be set to false after failover is triggered", discovery.getFailoverPending()); + // Verify consistency point: in SYNC state, should be lastRoundInSync.getEndTime() + long consistencyPoint = discovery.getConsistencyPoint(); + assertEquals("Consistency point should match lastRoundInSync.getEndTime() in SYNC state", + expectedRound3.getEndTime(), consistencyPoint); + // TODO: Ensure cluster state is updated to ACTIVE_IN_SYNC once failover is triggered. } finally { EnvironmentEdgeManager.reset(); @@ -2033,6 +2111,115 @@ public void testTriggerFailover() throws IOException, SQLException { } } + /** + * Tests getConsistencyPoint method in SYNC state with in-progress files present. Should return + * the minimum timestamp from in-progress files. + */ + @Test + public void testGetConsistencyPointSyncStateWithInProgressFiles() throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to SYNC + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + // Create mock in-progress files + // Note: getFileTimestamp() parses the filename, so we don't need to mock it + Path file1 = new Path("/test/1704153660000_rs2.plog"); + Path file2 = new Path("/test/1704153600000_rs1.plog"); + Path file3 = new Path("/test/1704153720000_rs3.plog"); + List inProgressFiles = Arrays.asList(file1, file2, file3); + + // Mock fileTracker to return in-progress files + doReturn(inProgressFiles).when(tracker).getInProgressFiles(); + + // Call getConsistencyPoint + long consistencyPoint = discovery.getConsistencyPoint(); + + // Should return the minimum timestamp from in-progress files + assertEquals("Should return minimum timestamp from in-progress files", 1704153600000L, + consistencyPoint); + } + + /** + * Tests getConsistencyPoint method in SYNC state without in-progress files but with + * lastRoundInSync set. Should return lastRoundInSync.getEndTime(). + */ + @Test + public void testGetConsistencyPointSyncStateWithoutInProgressFilesWithLastRoundInSync() + throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to SYNC + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + // Mock empty in-progress files + doReturn(Collections.emptyList()).when(tracker).getInProgressFiles(); + + // Set lastRoundInSync + long endTime = 1704153600000L; + long roundTimeMills = discovery.getRoundTimeMills(); + ReplicationRound lastRoundInSync = new ReplicationRound(endTime - roundTimeMills, endTime); + discovery.setLastRoundInSync(lastRoundInSync); + + // Call getConsistencyPoint + long consistencyPoint = discovery.getConsistencyPoint(); + + // Should return lastRoundInSync.getEndTime() + assertEquals("Should return lastRoundInSync.getEndTime()", endTime, consistencyPoint); + } + + /** + * Tests getConsistencyPoint method in SYNC state without in-progress files and without + * lastRoundInSync. Should throw IOException. + */ + @Test + public void testGetConsistencyPointSyncStateWithoutInProgressFilesWithoutLastRoundInSync() + throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to SYNC + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + // Mock empty in-progress files + doReturn(Collections.emptyList()).when(tracker).getInProgressFiles(); + + // Don't set lastRoundInSync (should be null) + + // Call getConsistencyPoint - should throw IOException + try { + discovery.getConsistencyPoint(); + fail("Should throw IOException when in-progress files are empty and lastRoundInSync is null"); + } catch (IOException e) { + assertEquals("Error message should match", + "Not able to derive consistency point because In Progress directory is empty and lastRoundInSync is not initialized.", + e.getMessage()); + } + } + /** * Tests triggerFailover when * HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName) throws @@ -2103,6 +2290,168 @@ public void testTriggerFailover_InvalidClusterRoleTransitionExceptionFromHAGroup } } + /** + * Tests getConsistencyPoint method in DEGRADED state with lastRoundInSync set. Should return + * lastRoundInSync.getEndTime(). + */ + @Test + public void testGetConsistencyPointDegradedStateWithLastRoundInSync() throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to DEGRADED + discovery + .setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + + // Set lastRoundInSync + long endTime = 1704153600000L; + long roundTimeMills = discovery.getRoundTimeMills(); + ReplicationRound lastRoundInSync = new ReplicationRound(endTime - roundTimeMills, endTime); + discovery.setLastRoundInSync(lastRoundInSync); + + // Call getConsistencyPoint + long consistencyPoint = discovery.getConsistencyPoint(); + + // Should return lastRoundInSync.getEndTime() + assertEquals("Should return lastRoundInSync.getEndTime()", endTime, consistencyPoint); + } + + /** + * Tests getConsistencyPoint method in DEGRADED state without lastRoundInSync. Should throw + * IOException. + */ + @Test + public void testGetConsistencyPointDegradedStateWithoutLastRoundInSync() throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to DEGRADED + discovery + .setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + + // Don't set lastRoundInSync (should be null) + + // Call getConsistencyPoint - should throw IOException + try { + discovery.getConsistencyPoint(); + fail("Should throw IOException when lastRoundInSync is null in DEGRADED state"); + } catch (IOException e) { + assertEquals("Error message should match", + "Not able to derive consistency point because lastRoundInSync is not initialized.", + e.getMessage()); + } + } + + /** + * Tests getConsistencyPoint method in SYNCED_RECOVERY state with lastRoundInSync set. Should + * return lastRoundInSync.getEndTime(). + */ + @Test + public void testGetConsistencyPointSyncedRecoveryStateWithLastRoundInSync() throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to SYNCED_RECOVERY + discovery.setReplicationReplayState( + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY); + + // Set lastRoundInSync + long endTime = 1704153600000L; + long roundTimeMills = discovery.getRoundTimeMills(); + ReplicationRound lastRoundInSync = new ReplicationRound(endTime - roundTimeMills, endTime); + discovery.setLastRoundInSync(lastRoundInSync); + + // Call getConsistencyPoint + long consistencyPoint = discovery.getConsistencyPoint(); + + // Should return lastRoundInSync.getEndTime() + assertEquals("Should return lastRoundInSync.getEndTime()", endTime, consistencyPoint); + } + + /** + * Tests getConsistencyPoint method in SYNCED_RECOVERY state without lastRoundInSync. Should throw + * IOException. + */ + @Test + public void testGetConsistencyPointSyncedRecoveryStateWithoutLastRoundInSync() + throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to SYNCED_RECOVERY + discovery.setReplicationReplayState( + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY); + + // Don't set lastRoundInSync (should be null) + + // Call getConsistencyPoint - should throw IOException + try { + discovery.getConsistencyPoint(); + fail("Should throw IOException when lastRoundInSync is null in SYNCED_RECOVERY state"); + } catch (IOException e) { + assertEquals("Error message should match", + "Not able to derive consistency point because lastRoundInSync is not initialized.", + e.getMessage()); + } + } + + /** + * Tests getConsistencyPoint method in NOT_INITIALIZED state. Should throw IOException. + */ + @Test + public void testGetConsistencyPointNotInitializedState() throws IOException { + // Create ReplicationLogTracker + ReplicationLogTracker tracker = + Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri)); + HAGroupStoreRecord haGroupStoreRecord = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(), + peerZkUrl, zkUrl, peerZkUrl, 0L); + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); + + // Set state to NOT_INITIALIZED + discovery.setReplicationReplayState( + ReplicationLogDiscoveryReplay.ReplicationReplayState.NOT_INITIALIZED); + + // Call getConsistencyPoint - should throw IOException + try { + discovery.getConsistencyPoint(); + fail("Should throw IOException when state is NOT_INITIALIZED"); + } catch (IOException e) { + assertEquals("Error message should match", + "Not able to derive consistency point for current state: NOT_INITIALIZED", e.getMessage()); + } + } + /** * Testable implementation of ReplicationLogDiscoveryReplay for unit testing. Provides dependency * injection for HAGroupStoreRecord, tracks processed rounds, and supports simulating state @@ -2187,5 +2536,9 @@ public int getProcessRoundCallCount() { public List getProcessedRounds() { return new java.util.ArrayList<>(processedRounds); } + + public long getRoundTimeMills() { + return roundTimeMills; + } } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java new file mode 100644 index 00000000000..88b7a20b702 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.URI; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.ClusterRoleRecord; +import org.apache.phoenix.jdbc.HAGroupStoreClient; +import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility; +import org.apache.phoenix.jdbc.PhoenixHAAdmin; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.util.HAGroupStoreTestUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class ReplicationLogReplayServiceTestIT extends BaseTest { + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = + new HighAvailabilityTestingUtility.HBaseTestingUtilityPair(); + private String zkUrl; + private String peerZkUrl; + private FileSystem localFs; + private URI standbyUri; + private PhoenixHAAdmin haAdmin; + private PhoenixHAAdmin peerHaAdmin; + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + CLUSTERS.start(); + } + + @Before + public void setUp() throws Exception { + zkUrl = getLocalZkUrl(config); + peerZkUrl = CLUSTERS.getZkUrl2(); + localFs = FileSystem.getLocal(config); + standbyUri = testFolder.getRoot().toURI(); + haAdmin = new PhoenixHAAdmin(zkUrl, config, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); + peerHaAdmin = new PhoenixHAAdmin(peerZkUrl, config, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); + cleanupHAGroupState(); + + // Set the required configuration for ReplicationLogReplay + config.set(ReplicationLogReplay.REPLICATION_LOG_REPLAY_HDFS_URL_KEY, standbyUri.toString()); + // Enable replication replay service + config.setBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, true); + } + + @After + public void tearDown() throws IOException { + localFs.delete(new Path(testFolder.getRoot().toURI()), true); + } + + /** + * Tests getConsistencyPoint method of ReplicationLogReplayService with multiple HA groups. + * Verifies that it returns the minimum consistency point across all HA groups. + */ + @Test + public void testGetConsistencyPointMultipleGroups() throws IOException, SQLException { + final String haGroupName1 = "testGroup1"; + final String haGroupName2 = "testGroup2"; + + // Insert HAGroupStoreRecords into the system table + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName1, zkUrl, peerZkUrl, + CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null); + + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName2, zkUrl, peerZkUrl, + CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null); + + // Set up consistency points for both groups + long consistencyPoint1 = 1704153600000L; // 2024-01-02 00:00:00 + long consistencyPoint2 = 1704240000000L; // 2024-01-03 00:00:00 + + // Create testable replays with mocked consistency points + TestableReplicationLogReplay testableReplay1 = + new TestableReplicationLogReplay(config, haGroupName1); + testableReplay1.setConsistencyPoint(consistencyPoint1); + + TestableReplicationLogReplay testableReplay2 = + new TestableReplicationLogReplay(config, haGroupName2); + testableReplay2.setConsistencyPoint(consistencyPoint2); + + // Create a spy on ReplicationLogReplayService and mock getReplicationLogReplay + ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(config); + ReplicationLogReplayService serviceSpy = Mockito.spy(service); + + Mockito.doReturn(testableReplay1).when(serviceSpy).getReplicationLogReplay(haGroupName1); + Mockito.doReturn(testableReplay2).when(serviceSpy).getReplicationLogReplay(haGroupName2); + + // Call getConsistencyPoint + long consistencyPoint = serviceSpy.getConsistencyPoint(); + + // Should return the minimum consistency point across all groups + // Since consistencyPoint1 < consistencyPoint2, consistencyPoint should be consistencyPoint1 + assertEquals("Consistency point should be the minimum across all HA groups", consistencyPoint1, + consistencyPoint); + } + + /** + * Tests getConsistencyPoint method of ReplicationLogReplayService with a single HA group. + * Verifies that it returns the consistency point for that group. + */ + @Test + public void testGetConsistencyPointSingleGroup() throws IOException, SQLException { + final String haGroupName = "testGroup"; + + // Insert HAGroupStoreRecord into the system table + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, peerZkUrl, + CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null); + + // Set up consistency point for the group + long consistencyPoint = 1704153600000L; // 2024-01-02 00:00:00 + + // Create testable replay with mocked consistency point + TestableReplicationLogReplay testableReplay = + new TestableReplicationLogReplay(config, haGroupName); + testableReplay.setConsistencyPoint(consistencyPoint); + + // Create a spy on ReplicationLogReplayService and mock getReplicationLogReplay + ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(config); + ReplicationLogReplayService serviceSpy = Mockito.spy(service); + + Mockito.doReturn(testableReplay).when(serviceSpy).getReplicationLogReplay(haGroupName); + + // Call getConsistencyPoint + long result = serviceSpy.getConsistencyPoint(); + + // Should return the consistency point for the single group + // Since we start with currentTime and compare with Math.min, it should return the group's + // consistency point + assertEquals("Consistency point should match the single HA group's consistency point", + consistencyPoint, result); + } + + /** + * Tests getConsistencyPoint method when no HA groups exist. Should return currentTime (initial + * value) when there are no groups. + */ + @Test + public void testGetConsistencyPointWithNoGroups() throws IOException, SQLException { + // Ensure no HA groups exist + cleanupHAGroupState(); + + // Mock current time + long mockedCurrentTime = 1704153600000L; // 2024-01-02 00:00:00 + EnvironmentEdge edge = () -> mockedCurrentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + // Get ReplicationLogReplayService instance + ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(config); + + // Call getConsistencyPoint + long consistencyPoint = service.getConsistencyPoint(); + + // Should return mocked currentTime when no groups exist (since it's initialized with + // currentTime) + assertEquals("Consistency point should equal mocked currentTime when no HA groups exist", + mockedCurrentTime, consistencyPoint); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + private void cleanupHAGroupState() throws SQLException { + // Clean up existing HAGroupStoreRecords + try { + List haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl); + for (String haGroupName : haGroupNames) { + haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName)); + peerHaAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName)); + } + + } catch (Exception e) { + // Ignore cleanup errors + } + // Remove any existing entries in the system table + HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl); + } + + /** + * Testable implementation of ReplicationLogReplay for testing. Allows mocking the + * getConsistencyPoint method via getReplicationReplayLogDiscovery(). + */ + private static class TestableReplicationLogReplay extends ReplicationLogReplay { + private ReplicationLogDiscoveryReplay mockDiscovery; + + public TestableReplicationLogReplay(org.apache.hadoop.conf.Configuration conf, + String haGroupName) { + super(conf, haGroupName); + // Create a mock discovery that we can configure + mockDiscovery = Mockito.mock(ReplicationLogDiscoveryReplay.class); + } + + public void setConsistencyPoint(long consistencyPoint) { + try { + when(mockDiscovery.getConsistencyPoint()).thenReturn(consistencyPoint); + } catch (IOException e) { + // This shouldn't happen during mock setup, but handle it just in case + throw new RuntimeException("Failed to set consistency point", e); + } + } + + @Override + protected ReplicationLogDiscoveryReplay getReplicationReplayLogDiscovery() { + return mockDiscovery; + } + } +}