diff --git a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java index b5e8156..987bd02 100644 --- a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java +++ b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java @@ -31,10 +31,13 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.enterprise.cloudsearch.sdk.CheckpointCloseableIterable; +import com.google.enterprise.cloudsearch.sdk.ConnectorScheduler.OneAtATimeRunnable; import com.google.enterprise.cloudsearch.sdk.IncrementalChangeHandler; import com.google.enterprise.cloudsearch.sdk.InvalidConfigurationException; import com.google.enterprise.cloudsearch.sdk.RepositoryException; +import com.google.enterprise.cloudsearch.sdk.StatsManager; import com.google.enterprise.cloudsearch.sdk.config.Configuration; import com.google.enterprise.cloudsearch.sdk.indexing.DefaultAcl; import com.google.enterprise.cloudsearch.sdk.indexing.IndexingConnector; @@ -48,6 +51,9 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -137,8 +143,11 @@ public class FullTraversalConnector implements IndexingConnector, IncrementalCha private RepositoryContext repositoryContext; private CheckpointHandler checkpointHandler; private boolean useQueues; - @VisibleForTesting QueueCheckpoint queueCheckpoint; + @VisibleForTesting + QueueCheckpoint queueCheckpoint; private int partitionSize; + private ScheduledExecutorService scheduleExecutor; + private ExecutorService backgroundExecutor; /** * Creates an instance of {@link FullTraversalConnector} for performing full traversal over given @@ -148,6 +157,12 @@ public class FullTraversalConnector implements IndexingConnector, IncrementalCha */ public FullTraversalConnector(Repository repository) { this(repository, null); + scheduleExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(false).setNameFormat("schedule").build()); + backgroundExecutor = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(false).setNameFormat("background").build()); } /** @@ -304,6 +319,30 @@ private interface GetDocsFunction { CheckpointCloseableIterable apply(byte[] checkpoint) throws RepositoryException; } + /** + * Runnable that when invoked executes the delegate with {@link #backgroundExecutor} and then + * returns before completion. That implies that uses of this class must ensure they do not add an + * instance directly to {@link #backgroundExecutor}, otherwise an odd infinite loop will + * occur. + */ + protected class BackgroundRunnable implements Runnable { + + private final Runnable delegate; + + public BackgroundRunnable(Runnable delegate) { + this.delegate = checkNotNull(delegate); + } + + @Override + public void run() { + try { + backgroundExecutor.execute(delegate); + } catch (Throwable t) { + logger.log(Level.WARNING, "Failed to start background runnable", t); + } + } + } + /** * Performs a repository traversal of a given type. * @@ -313,6 +352,13 @@ private interface GetDocsFunction { private boolean doTraverse(String traversalType, String checkpointName, String queueName, GetDocsFunction getDocs) throws IOException, InterruptedException { + + Runnable loggingStatsRunnable = + new BackgroundRunnable( + new OneAtATimeRunnable( + () -> logger.info(StatsManager.getInstance().printStats()), "StatsLog")); + scheduleExecutor.scheduleAtFixedRate(loggingStatsRunnable, 1, 5, TimeUnit.MINUTES); + logger.log(Level.INFO, "Begin {0} traversal.", traversalType); ExecuteCounter executeCounter = new ExecuteCounter(); byte[] checkpoint = checkpointHandler.readCheckpoint(checkpointName); @@ -462,7 +508,7 @@ public List call() throws Exception { */ private List executeOperation( ApiOperation operation, ExecuteCounter executeCounter) - throws InterruptedException, IOException { + throws InterruptedException, IOException { // most should be update item ops, but allow other ops (delete item, etc.) executeCounter.incrementTotal(); String displayId = "[not an item update]";