diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index b7b9390c5be..77c9a60f7cf 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -450,6 +450,9 @@ public interface QueryServices extends SQLCloseable { // Check HAGroup is Stale for mutations public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = "phoenix.ha.group.stale.for.mutation.check.enabled"; + // Enable prewarming of HAGroupStoreClients at RegionServer startup + String HA_GROUP_STORE_CLIENT_PREWARM_ENABLED + = "phoenix.ha.group.store.client.prewarm.enabled"; // Enable Thread Pool Creation in CQSI to be used for HBase Client. String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled"; // CQSI Thread Pool Related Configuration. diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 6b5374d6548..dadcb47dbca 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -60,6 +60,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED; import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB; import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB; @@ -493,6 +494,7 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false; public static final Boolean DEFAULT_HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = true; + public static final Boolean DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED = true; public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false; public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60; public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25; @@ -639,7 +641,9 @@ public static QueryServicesOptions withDefaults() { .setIfUnset(CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS) .setIfUnset(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS) .setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS, - DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS); + DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS) + .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, + DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java index 99a66bd2d7e..3a2c31c3b5d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java @@ -24,7 +24,13 @@ import com.google.protobuf.RpcController; import com.google.protobuf.Service; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; @@ -58,7 +64,7 @@ public class PhoenixRegionServerEndpoint extends private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class); private MetricsMetadataCachingSource metricsSource; protected Configuration conf; - private String zkUrl; + private ExecutorService prewarmExecutor; // regionserver level thread pool used by Uncovered Indexes to scan data table rows private static TaskRunner uncoveredIndexThreadPool; @@ -66,10 +72,17 @@ public class PhoenixRegionServerEndpoint extends @Override public void start(CoprocessorEnvironment env) throws IOException { this.conf = env.getConfiguration(); - this.metricsSource = - MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource(); - initUncoveredIndexThreadPool(this.conf); - this.zkUrl = getLocalZkUrl(conf); + this.metricsSource = MetricsPhoenixCoprocessorSourceFactory + .getInstance().getMetadataCachingSource(); + // Start async prewarming of HAGroupStoreClients if enabled + if (conf.getBoolean( + QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, + QueryServicesOptions + .DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED)) { + startHAGroupStoreClientPrewarming(); + } else { + LOGGER.info("HAGroupStoreClient prewarming is disabled"); + } // Start replication log replay ReplicationLogReplayService.getInstance(conf).start(); } @@ -84,6 +97,10 @@ public void stop(CoprocessorEnvironment env) throws IOException { .stop("PhoenixRegionServerEndpoint is stopping. Shutting down uncovered index threadpool."); } ServerUtil.ConnectionFactory.shutdown(); + // Stop prewarming executor + if (prewarmExecutor != null) { + prewarmExecutor.shutdownNow(); + } } @Override @@ -215,4 +232,90 @@ private static void initUncoveredIndexThreadPool(Configuration conf) { LOGGER.info("Initialized region level thread pool for Uncovered Global Indexes."); } + /** + * Prewarms HAGroupStoreClients in background thread with retry. + * Initializes all HA group clients asynchronously at startup. + *

+ * Phase 1 : Retry indefinitely until HAGroupStoreManager is initialized + * and HAGroupNames are retrieved. If the SYSTEM.HA_GROUP table region + * is not ready, manager.getHAGroupNames() would return an exception. + * So we need to retry until the SYSTEM.HA_GROUP table region is ready + * and then retrieve the HAGroupNames for prewarming. + * + *

+ * Phase 2 : Prewarm individual HAGroupStoreClients with retry. + * If the HAGroupStoreClient is not ready/initialized, + * manager.getClusterRoleRecord(haGroup) would throw an exception. + * So we need to retry until the HAGroupStoreClient is ready/initialized. + */ + private void startHAGroupStoreClientPrewarming() { + prewarmExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "HAGroupStoreClient-Prewarm"); + t.setDaemon(true); + return t; + }); + + prewarmExecutor.submit(() -> { + HAGroupStoreManager manager = null; + List pending = null; + // Phase 1: Retry indefinitely until HAGroupStoreManager is initialized + // and HAGroupNames are retrieved. + while (pending == null) { + try { + manager = HAGroupStoreManager.getInstance(conf); + if (manager != null) { + pending = new ArrayList<>(manager.getHAGroupNames()); + LOGGER.info("Starting prewarming for {} HAGroupStoreClients", + pending.size()); + } else { + LOGGER.debug("HAGroupStoreManager is null, retrying in 2s..."); + Thread.sleep(2000); + } + } catch (InterruptedException e) { + LOGGER.info("HAGroupStoreClient prewarming interrupted during " + + "initialization"); + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + LOGGER.debug("Failed to initialize HAGroupStoreManager, retrying in " + + "2s...", e); + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + LOGGER.info("HAGroupStoreClient prewarming interrupted"); + Thread.currentThread().interrupt(); + return; + } + } + } + + // Phase 2: Prewarm individual HAGroupStoreClients with retry + try { + while (!pending.isEmpty()) { + Iterator iterator = pending.iterator(); + while (iterator.hasNext()) { + String haGroup = iterator.next(); + try { + manager.getClusterRoleRecord(haGroup); + iterator.remove(); + LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)", + haGroup, pending.size()); + } catch (Exception e) { + LOGGER.debug("Failed to prewarm {}, will retry", haGroup, e); + } + } + + if (!pending.isEmpty()) { + Thread.sleep(2000); + } + } + + LOGGER.info("Completed prewarming all HAGroupStoreClients"); + } catch (InterruptedException e) { + LOGGER.info("HAGroupStoreClient prewarming interrupted during warmup"); + Thread.currentThread().interrupt(); + } + }); + } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java index edfb7bd2076..d92e3f564aa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java @@ -24,9 +24,18 @@ import static org.junit.Assert.assertNotNull; import com.google.protobuf.RpcCallback; + +import java.io.IOException; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.OnlineRegions; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint; @@ -38,20 +47,26 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility; import org.apache.phoenix.jdbc.PhoenixHAAdmin; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.HAGroupStoreTestUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; @Category({ NeedsOwnMiniClusterTest.class }) public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTest { + private static final Logger LOGGER + = LoggerFactory.getLogger(PhoenixRegionServerEndpointWithConsistentFailoverIT.class); private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L; private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair(); @@ -65,22 +80,110 @@ public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTes public static synchronized void doSetup() throws Exception { Map props = Maps.newHashMapWithExpectedSize(1); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + // Set prewarm enabled to true for cluster 1 and false for cluster 2 for comparison. + CLUSTERS.getHBaseCluster1().getConfiguration().setBoolean( + QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, true); + CLUSTERS.getHBaseCluster2().getConfiguration().setBoolean( + QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, false); CLUSTERS.start(); } + @AfterClass + public static synchronized void doTeardown() throws Exception { + CLUSTERS.close(); + } + @Before public void setUp() throws Exception { - zkUrl = getLocalZkUrl(config); peerZkUrl = CLUSTERS.getZkUrl2(); - HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl, - peerZkUrl, CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), - ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null); + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(), + CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, + null); + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl2(), CLUSTERS.getZkUrl1(), + CLUSTERS.getMasterAddress2(), CLUSTERS.getMasterAddress1(), + ClusterRoleRecord.ClusterRole.STANDBY, ClusterRoleRecord.ClusterRole.ACTIVE, + null); + + } + + @Test + public void testHAGroupStoreClientPrewarming() throws Exception { + // Use a different HA group name to avoid interference with setUp() method + String haGroupName = testName.getMethodName() + "_test"; + + // There is a race condition between when RegionServerEndpoint Coproc starts and + // when the HAGroupStoreRecord is inserted into the system table. + // To handle this condition and get predictable results, we will insert the HAGroupStoreRecord into the system table first. + // Once the HAGroupStoreRecord is inserted into the system table, we will start the RegionServerEndpoint Coproc again. + // This will ensure that the RegionServerEndpoint Coproc starts after the HAGroupStoreRecord is inserted into the system table. + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(), + CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, + null); + + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(), + CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, + CLUSTERS.getZkUrl2()); + + // Get RegionServer instances from both clusters + HRegionServer regionServer1 = CLUSTERS.getHBaseCluster1().getHBaseCluster().getRegionServer(0); + PhoenixRegionServerEndpoint coprocessor1 = getPhoenixRegionServerEndpoint(regionServer1); + + // Start the RegionServerEndpoint Coproc for cluster 1 + coprocessor1.start(getTestCoprocessorEnvironment(CLUSTERS.getHBaseCluster1().getConfiguration())); + + HRegionServer regionServer2 = CLUSTERS.getHBaseCluster2().getHBaseCluster().getRegionServer(0); + PhoenixRegionServerEndpoint coprocessor2 = getPhoenixRegionServerEndpoint(regionServer2); + + // Start the RegionServerEndpoint Coproc for cluster 2 + coprocessor2.start(getTestCoprocessorEnvironment(CLUSTERS.getHBaseCluster2().getConfiguration())); + + // Wait for prewarming to complete on cluster 1 (cluster 2 won't prewarm) + Thread.sleep(5000); + + // Expected records for each cluster + ClusterRoleRecord expectedRecord1 = buildExpectedClusterRoleRecord(haGroupName, + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.UNKNOWN); + ClusterRoleRecord expectedRecord2 = buildExpectedClusterRoleRecord(haGroupName, + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY); + + // Test Cluster 1 WITH prewarming + ServerRpcController controller1 = new ServerRpcController(); + long startTimeCluster1 = System.currentTimeMillis(); + executeGetClusterRoleRecordAndVerify(coprocessor1, controller1, + haGroupName, expectedRecord1, false); + long timeCluster1 = System.currentTimeMillis() - startTimeCluster1; + LOGGER.info("Cluster 1 WITH prewarming (after restart, prewarmed at startup): {} ms for {}", + timeCluster1, CLUSTERS.getZkUrl1()); + + // Test Cluster 2 WITHOUT prewarming + ServerRpcController controller2 = new ServerRpcController(); + long startTimeCluster2 = System.currentTimeMillis(); + executeGetClusterRoleRecordAndVerify(coprocessor2, controller2, + haGroupName, expectedRecord2, false); + long timeCluster2 = System.currentTimeMillis() - startTimeCluster2; + LOGGER.info("Cluster 2 WITHOUT prewarming (after restart, cold start): {} ms for {}", + timeCluster2, CLUSTERS.getZkUrl2()); + + // Compare performance + LOGGER.info("Performance comparison: Cluster 1 (prewarmed) took {} ms, " + + "Cluster 2 (not prewarmed) took {} ms", + timeCluster1, timeCluster2); + LOGGER.info("Performance improvement: {} ms faster with prewarming", + (timeCluster2 - timeCluster1)); + + // Prewarmed cluster should be faster than non-prewarmed cluster + assert (timeCluster1 < timeCluster2) : + String.format("Prewarmed cluster (cluster 1: %d ms) should be faster than " + + "non-prewarmed cluster (cluster 2: %d ms)", timeCluster1, timeCluster2); } @Test public void testGetClusterRoleRecordAndInvalidate() throws Exception { String haGroupName = testName.getMethodName(); - HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0); + HRegionServer regionServer = CLUSTERS.getHBaseCluster1().getHBaseCluster().getRegionServer(0); PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer); assertNotNull(coprocessor); ServerRpcController controller = new ServerRpcController(); @@ -103,12 +206,12 @@ public void testGetClusterRoleRecordAndInvalidate() throws Exception { // Delete the HAGroupStoreRecord from ZK try (PhoenixHAAdmin haAdmin = - new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE)) { + new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE)) { haAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); } Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); // Delete the row from System Table - HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl); + HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1()); // Expect exception when getting ClusterRoleRecord because the HAGroupStoreRecord is not found // in ZK @@ -117,7 +220,7 @@ public void testGetClusterRoleRecordAndInvalidate() throws Exception { true); // Update the row - HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl, + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl1(), peerZkUrl, CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, ClusterRoleRecord.ClusterRole.STANDBY, null); @@ -187,4 +290,69 @@ private PhoenixRegionServerEndpoint getPhoenixRegionServerEndpoint(HRegionServer requestBuilder.setHaGroupName(ByteStringer.wrap(Bytes.toBytes(haGroupName))); return requestBuilder.build(); } + + private RegionServerCoprocessorEnvironment getTestCoprocessorEnvironment(Configuration conf) { + return new RegionServerCoprocessorEnvironment() { + + @Override + public int getVersion() { + return 0; + } + + @Override + public String getHBaseVersion() { + return ""; + } + + @Override + public RegionServerCoprocessor getInstance() { + return null; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public int getLoadSequence() { + return 0; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + + @Override + public ServerName getServerName() { + return null; + } + + @Override + public OnlineRegions getOnlineRegions() { + return null; + } + + @Override + public Connection getConnection() { + return null; + } + + @Override + public Connection createConnection(Configuration conf) throws IOException { + return null; + } + + @Override + public MetricRegistry getMetricRegistryForRegionServer() { + return null; + } + }; + } }