diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index b7b9390c5be..77c9a60f7cf 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -450,6 +450,9 @@ public interface QueryServices extends SQLCloseable {
// Check HAGroup is Stale for mutations
public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED =
"phoenix.ha.group.stale.for.mutation.check.enabled";
+ // Enable prewarming of HAGroupStoreClients at RegionServer startup
+ String HA_GROUP_STORE_CLIENT_PREWARM_ENABLED
+ = "phoenix.ha.group.store.client.prewarm.enabled";
// Enable Thread Pool Creation in CQSI to be used for HBase Client.
String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled";
// CQSI Thread Pool Related Configuration.
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 6b5374d6548..dadcb47dbca 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -60,6 +60,7 @@
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED;
import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
@@ -493,6 +494,7 @@ public class QueryServicesOptions {
public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
public static final Boolean DEFAULT_HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = true;
+ public static final Boolean DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED = true;
public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false;
public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25;
@@ -639,7 +641,9 @@ public static QueryServicesOptions withDefaults() {
.setIfUnset(CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS)
.setIfUnset(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS)
.setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
- DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS);
+ DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS)
+ .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
+ DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 99a66bd2d7e..3a2c31c3b5d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -24,7 +24,13 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
@@ -58,7 +64,7 @@ public class PhoenixRegionServerEndpoint extends
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
- private String zkUrl;
+ private ExecutorService prewarmExecutor;
// regionserver level thread pool used by Uncovered Indexes to scan data table rows
private static TaskRunner uncoveredIndexThreadPool;
@@ -66,10 +72,17 @@ public class PhoenixRegionServerEndpoint extends
@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.conf = env.getConfiguration();
- this.metricsSource =
- MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
- initUncoveredIndexThreadPool(this.conf);
- this.zkUrl = getLocalZkUrl(conf);
+ this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
+ .getInstance().getMetadataCachingSource();
+ // Start async prewarming of HAGroupStoreClients if enabled
+ if (conf.getBoolean(
+ QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
+ QueryServicesOptions
+ .DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED)) {
+ startHAGroupStoreClientPrewarming();
+ } else {
+ LOGGER.info("HAGroupStoreClient prewarming is disabled");
+ }
// Start replication log replay
ReplicationLogReplayService.getInstance(conf).start();
}
@@ -84,6 +97,10 @@ public void stop(CoprocessorEnvironment env) throws IOException {
.stop("PhoenixRegionServerEndpoint is stopping. Shutting down uncovered index threadpool.");
}
ServerUtil.ConnectionFactory.shutdown();
+ // Stop prewarming executor
+ if (prewarmExecutor != null) {
+ prewarmExecutor.shutdownNow();
+ }
}
@Override
@@ -215,4 +232,90 @@ private static void initUncoveredIndexThreadPool(Configuration conf) {
LOGGER.info("Initialized region level thread pool for Uncovered Global Indexes.");
}
+ /**
+ * Prewarms HAGroupStoreClients in background thread with retry.
+ * Initializes all HA group clients asynchronously at startup.
+ *
+ * Phase 1 : Retry indefinitely until HAGroupStoreManager is initialized
+ * and HAGroupNames are retrieved. If the SYSTEM.HA_GROUP table region
+ * is not ready, manager.getHAGroupNames() would return an exception.
+ * So we need to retry until the SYSTEM.HA_GROUP table region is ready
+ * and then retrieve the HAGroupNames for prewarming.
+ *
+ *
+ * Phase 2 : Prewarm individual HAGroupStoreClients with retry.
+ * If the HAGroupStoreClient is not ready/initialized,
+ * manager.getClusterRoleRecord(haGroup) would throw an exception.
+ * So we need to retry until the HAGroupStoreClient is ready/initialized.
+ */
+ private void startHAGroupStoreClientPrewarming() {
+ prewarmExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "HAGroupStoreClient-Prewarm");
+ t.setDaemon(true);
+ return t;
+ });
+
+ prewarmExecutor.submit(() -> {
+ HAGroupStoreManager manager = null;
+ List pending = null;
+ // Phase 1: Retry indefinitely until HAGroupStoreManager is initialized
+ // and HAGroupNames are retrieved.
+ while (pending == null) {
+ try {
+ manager = HAGroupStoreManager.getInstance(conf);
+ if (manager != null) {
+ pending = new ArrayList<>(manager.getHAGroupNames());
+ LOGGER.info("Starting prewarming for {} HAGroupStoreClients",
+ pending.size());
+ } else {
+ LOGGER.debug("HAGroupStoreManager is null, retrying in 2s...");
+ Thread.sleep(2000);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.info("HAGroupStoreClient prewarming interrupted during "
+ + "initialization");
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ LOGGER.debug("Failed to initialize HAGroupStoreManager, retrying in "
+ + "2s...", e);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ie) {
+ LOGGER.info("HAGroupStoreClient prewarming interrupted");
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
+ // Phase 2: Prewarm individual HAGroupStoreClients with retry
+ try {
+ while (!pending.isEmpty()) {
+ Iterator iterator = pending.iterator();
+ while (iterator.hasNext()) {
+ String haGroup = iterator.next();
+ try {
+ manager.getClusterRoleRecord(haGroup);
+ iterator.remove();
+ LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)",
+ haGroup, pending.size());
+ } catch (Exception e) {
+ LOGGER.debug("Failed to prewarm {}, will retry", haGroup, e);
+ }
+ }
+
+ if (!pending.isEmpty()) {
+ Thread.sleep(2000);
+ }
+ }
+
+ LOGGER.info("Completed prewarming all HAGroupStoreClients");
+ } catch (InterruptedException e) {
+ LOGGER.info("HAGroupStoreClient prewarming interrupted during warmup");
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java
index edfb7bd2076..d92e3f564aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java
@@ -24,9 +24,18 @@
import static org.junit.Assert.assertNotNull;
import com.google.protobuf.RpcCallback;
+
+import java.io.IOException;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.OnlineRegions;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
@@ -38,20 +47,26 @@
import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
import org.apache.phoenix.jdbc.PhoenixHAAdmin;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.HAGroupStoreTestUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@Category({ NeedsOwnMiniClusterTest.class })
public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTest {
+ private static final Logger LOGGER
+ = LoggerFactory.getLogger(PhoenixRegionServerEndpointWithConsistentFailoverIT.class);
private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L;
private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS =
new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
@@ -65,22 +80,110 @@ public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTes
public static synchronized void doSetup() throws Exception {
Map props = Maps.newHashMapWithExpectedSize(1);
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ // Set prewarm enabled to true for cluster 1 and false for cluster 2 for comparison.
+ CLUSTERS.getHBaseCluster1().getConfiguration().setBoolean(
+ QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, true);
+ CLUSTERS.getHBaseCluster2().getConfiguration().setBoolean(
+ QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, false);
CLUSTERS.start();
}
+ @AfterClass
+ public static synchronized void doTeardown() throws Exception {
+ CLUSTERS.close();
+ }
+
@Before
public void setUp() throws Exception {
- zkUrl = getLocalZkUrl(config);
peerZkUrl = CLUSTERS.getZkUrl2();
- HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl,
- peerZkUrl, CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
- ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null);
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(),
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
+ null);
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl2(), CLUSTERS.getZkUrl1(),
+ CLUSTERS.getMasterAddress2(), CLUSTERS.getMasterAddress1(),
+ ClusterRoleRecord.ClusterRole.STANDBY, ClusterRoleRecord.ClusterRole.ACTIVE,
+ null);
+
+ }
+
+ @Test
+ public void testHAGroupStoreClientPrewarming() throws Exception {
+ // Use a different HA group name to avoid interference with setUp() method
+ String haGroupName = testName.getMethodName() + "_test";
+
+ // There is a race condition between when RegionServerEndpoint Coproc starts and
+ // when the HAGroupStoreRecord is inserted into the system table.
+ // To handle this condition and get predictable results, we will insert the HAGroupStoreRecord into the system table first.
+ // Once the HAGroupStoreRecord is inserted into the system table, we will start the RegionServerEndpoint Coproc again.
+ // This will ensure that the RegionServerEndpoint Coproc starts after the HAGroupStoreRecord is inserted into the system table.
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(),
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
+ null);
+
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(),
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
+ CLUSTERS.getZkUrl2());
+
+ // Get RegionServer instances from both clusters
+ HRegionServer regionServer1 = CLUSTERS.getHBaseCluster1().getHBaseCluster().getRegionServer(0);
+ PhoenixRegionServerEndpoint coprocessor1 = getPhoenixRegionServerEndpoint(regionServer1);
+
+ // Start the RegionServerEndpoint Coproc for cluster 1
+ coprocessor1.start(getTestCoprocessorEnvironment(CLUSTERS.getHBaseCluster1().getConfiguration()));
+
+ HRegionServer regionServer2 = CLUSTERS.getHBaseCluster2().getHBaseCluster().getRegionServer(0);
+ PhoenixRegionServerEndpoint coprocessor2 = getPhoenixRegionServerEndpoint(regionServer2);
+
+ // Start the RegionServerEndpoint Coproc for cluster 2
+ coprocessor2.start(getTestCoprocessorEnvironment(CLUSTERS.getHBaseCluster2().getConfiguration()));
+
+ // Wait for prewarming to complete on cluster 1 (cluster 2 won't prewarm)
+ Thread.sleep(5000);
+
+ // Expected records for each cluster
+ ClusterRoleRecord expectedRecord1 = buildExpectedClusterRoleRecord(haGroupName,
+ ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.UNKNOWN);
+ ClusterRoleRecord expectedRecord2 = buildExpectedClusterRoleRecord(haGroupName,
+ ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY);
+
+ // Test Cluster 1 WITH prewarming
+ ServerRpcController controller1 = new ServerRpcController();
+ long startTimeCluster1 = System.currentTimeMillis();
+ executeGetClusterRoleRecordAndVerify(coprocessor1, controller1,
+ haGroupName, expectedRecord1, false);
+ long timeCluster1 = System.currentTimeMillis() - startTimeCluster1;
+ LOGGER.info("Cluster 1 WITH prewarming (after restart, prewarmed at startup): {} ms for {}",
+ timeCluster1, CLUSTERS.getZkUrl1());
+
+ // Test Cluster 2 WITHOUT prewarming
+ ServerRpcController controller2 = new ServerRpcController();
+ long startTimeCluster2 = System.currentTimeMillis();
+ executeGetClusterRoleRecordAndVerify(coprocessor2, controller2,
+ haGroupName, expectedRecord2, false);
+ long timeCluster2 = System.currentTimeMillis() - startTimeCluster2;
+ LOGGER.info("Cluster 2 WITHOUT prewarming (after restart, cold start): {} ms for {}",
+ timeCluster2, CLUSTERS.getZkUrl2());
+
+ // Compare performance
+ LOGGER.info("Performance comparison: Cluster 1 (prewarmed) took {} ms, " +
+ "Cluster 2 (not prewarmed) took {} ms",
+ timeCluster1, timeCluster2);
+ LOGGER.info("Performance improvement: {} ms faster with prewarming",
+ (timeCluster2 - timeCluster1));
+
+ // Prewarmed cluster should be faster than non-prewarmed cluster
+ assert (timeCluster1 < timeCluster2) :
+ String.format("Prewarmed cluster (cluster 1: %d ms) should be faster than " +
+ "non-prewarmed cluster (cluster 2: %d ms)", timeCluster1, timeCluster2);
}
@Test
public void testGetClusterRoleRecordAndInvalidate() throws Exception {
String haGroupName = testName.getMethodName();
- HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
+ HRegionServer regionServer = CLUSTERS.getHBaseCluster1().getHBaseCluster().getRegionServer(0);
PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer);
assertNotNull(coprocessor);
ServerRpcController controller = new ServerRpcController();
@@ -103,12 +206,12 @@ public void testGetClusterRoleRecordAndInvalidate() throws Exception {
// Delete the HAGroupStoreRecord from ZK
try (PhoenixHAAdmin haAdmin =
- new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE)) {
+ new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE)) {
haAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
}
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Delete the row from System Table
- HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl);
+ HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1());
// Expect exception when getting ClusterRoleRecord because the HAGroupStoreRecord is not found
// in ZK
@@ -117,7 +220,7 @@ public void testGetClusterRoleRecordAndInvalidate() throws Exception {
true);
// Update the row
- HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl,
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl1(),
peerZkUrl, CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, ClusterRoleRecord.ClusterRole.STANDBY, null);
@@ -187,4 +290,69 @@ private PhoenixRegionServerEndpoint getPhoenixRegionServerEndpoint(HRegionServer
requestBuilder.setHaGroupName(ByteStringer.wrap(Bytes.toBytes(haGroupName)));
return requestBuilder.build();
}
+
+ private RegionServerCoprocessorEnvironment getTestCoprocessorEnvironment(Configuration conf) {
+ return new RegionServerCoprocessorEnvironment() {
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public String getHBaseVersion() {
+ return "";
+ }
+
+ @Override
+ public RegionServerCoprocessor getInstance() {
+ return null;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public int getLoadSequence() {
+ return 0;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return null;
+ }
+
+ @Override
+ public OnlineRegions getOnlineRegions() {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() {
+ return null;
+ }
+
+ @Override
+ public Connection createConnection(Configuration conf) throws IOException {
+ return null;
+ }
+
+ @Override
+ public MetricRegistry getMetricRegistryForRegionServer() {
+ return null;
+ }
+ };
+ }
}