Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ public interface QueryServices extends SQLCloseable {
// Check HAGroup is Stale for mutations
public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED =
"phoenix.ha.group.stale.for.mutation.check.enabled";
// Enable prewarming of HAGroupStoreClients at RegionServer startup
String HA_GROUP_STORE_CLIENT_PREWARM_ENABLED
= "phoenix.ha.group.store.client.prewarm.enabled";
// Enable Thread Pool Creation in CQSI to be used for HBase Client.
String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled";
// CQSI Thread Pool Related Configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED;
import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
Expand Down Expand Up @@ -493,6 +494,7 @@ public class QueryServicesOptions {

public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
public static final Boolean DEFAULT_HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = true;
public static final Boolean DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED = true;
public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false;
public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25;
Expand Down Expand Up @@ -639,7 +641,9 @@ public static QueryServicesOptions withDefaults() {
.setIfUnset(CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS)
.setIfUnset(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS)
.setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS);
DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS)
.setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED);

// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
Expand Down Expand Up @@ -58,18 +64,25 @@ public class PhoenixRegionServerEndpoint extends
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
private String zkUrl;
private ExecutorService prewarmExecutor;

// regionserver level thread pool used by Uncovered Indexes to scan data table rows
private static TaskRunner uncoveredIndexThreadPool;

@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.conf = env.getConfiguration();
this.metricsSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
initUncoveredIndexThreadPool(this.conf);
this.zkUrl = getLocalZkUrl(conf);
this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
.getInstance().getMetadataCachingSource();
// Start async prewarming of HAGroupStoreClients if enabled
if (conf.getBoolean(
QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
QueryServicesOptions
.DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED)) {
startHAGroupStoreClientPrewarming();
} else {
LOGGER.info("HAGroupStoreClient prewarming is disabled");
}
// Start replication log replay
ReplicationLogReplayService.getInstance(conf).start();
}
Expand All @@ -84,6 +97,10 @@ public void stop(CoprocessorEnvironment env) throws IOException {
.stop("PhoenixRegionServerEndpoint is stopping. Shutting down uncovered index threadpool.");
}
ServerUtil.ConnectionFactory.shutdown();
// Stop prewarming executor
if (prewarmExecutor != null) {
prewarmExecutor.shutdownNow();
}
}

@Override
Expand Down Expand Up @@ -215,4 +232,90 @@ private static void initUncoveredIndexThreadPool(Configuration conf) {
LOGGER.info("Initialized region level thread pool for Uncovered Global Indexes.");
}

/**
 * Prewarms HAGroupStoreClients in a background daemon thread with retry.
 * Initializes all HA group clients asynchronously at startup so that the
 * first real request does not pay the initialization cost.
 * <p>
 * Phase 1: Retry indefinitely until HAGroupStoreManager is initialized
 * and the HA group names are retrieved. If the SYSTEM.HA_GROUP table
 * region is not yet ready, {@code manager.getHAGroupNames()} throws, so
 * we keep retrying until the region is available.
 * <p>
 * Phase 2: Prewarm each individual HAGroupStoreClient with retry. If a
 * client is not yet ready, {@code manager.getClusterRoleRecord(haGroup)}
 * throws; that group stays in the pending list and is retried on the
 * next pass.
 * <p>
 * The task exits promptly on interruption (executor shutdown in
 * {@code stop()}) and restores the thread's interrupt status. Any other
 * unexpected failure is logged rather than vanishing into the ignored
 * {@link java.util.concurrent.Future} returned by {@code submit}.
 */
private void startHAGroupStoreClientPrewarming() {
    // Pause between retry attempts in both phases.
    final long retryIntervalMs = 2000L;

    prewarmExecutor = Executors.newSingleThreadExecutor(r -> {
        // Daemon thread so prewarming never blocks RegionServer shutdown.
        Thread t = new Thread(r, "HAGroupStoreClient-Prewarm");
        t.setDaemon(true);
        return t;
    });

    prewarmExecutor.submit(() -> {
        try {
            HAGroupStoreManager manager = null;
            List<String> pending = null;

            // Phase 1: retry until the manager is available and the group
            // names have been loaded from SYSTEM.HA_GROUP.
            while (pending == null) {
                try {
                    manager = HAGroupStoreManager.getInstance(conf);
                    if (manager != null) {
                        pending = new ArrayList<>(manager.getHAGroupNames());
                        LOGGER.info("Starting prewarming for {} HAGroupStoreClients",
                            pending.size());
                    } else {
                        LOGGER.debug("HAGroupStoreManager is null, retrying in {}ms...",
                            retryIntervalMs);
                        Thread.sleep(retryIntervalMs);
                    }
                } catch (InterruptedException e) {
                    // Defer to the unified interruption handler below.
                    throw e;
                } catch (Exception e) {
                    LOGGER.debug("Failed to initialize HAGroupStoreManager, retrying in {}ms...",
                        retryIntervalMs, e);
                    Thread.sleep(retryIntervalMs);
                }
            }

            // Phase 2: prewarm each client; keep failed groups pending and
            // retry them until the whole list drains.
            while (!pending.isEmpty()) {
                Iterator<String> iterator = pending.iterator();
                while (iterator.hasNext()) {
                    String haGroup = iterator.next();
                    try {
                        manager.getClusterRoleRecord(haGroup);
                        iterator.remove();
                        LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)",
                            haGroup, pending.size());
                    } catch (Exception e) {
                        LOGGER.debug("Failed to prewarm {}, will retry", haGroup, e);
                    }
                }
                if (!pending.isEmpty()) {
                    Thread.sleep(retryIntervalMs);
                }
            }

            LOGGER.info("Completed prewarming all HAGroupStoreClients");
        } catch (InterruptedException e) {
            // Executor shutdown (stop()) or explicit interrupt: exit cleanly.
            LOGGER.info("HAGroupStoreClient prewarming interrupted");
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Without this, the failure would be swallowed by the unobserved
            // Future returned from submit() and prewarming would die silently.
            LOGGER.error("Unexpected failure during HAGroupStoreClient prewarming", e);
        }
    });
}

}
Loading