From 71aad21c65609fba9d0fe7b0fc5b0454be45b823 Mon Sep 17 00:00:00 2001 From: Donghan Miao <45982807+donghanmiao@users.noreply.github.com> Date: Fri, 15 Mar 2019 09:36:44 -0700 Subject: [PATCH 1/3] changlog --- changlog.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changlog.md diff --git a/changlog.md b/changlog.md new file mode 100644 index 0000000..44cb8ef --- /dev/null +++ b/changlog.md @@ -0,0 +1,3 @@ + +# Title +- something is changed. From 1ea8cab2308c7775ff25a1125090b4ecaa1e22e2 Mon Sep 17 00:00:00 2001 From: Donghan Miao Date: Mon, 12 Aug 2019 12:57:41 -0700 Subject: [PATCH 2/3] print stats every 5 mins --- .../template/FullTraversalConnector.java | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java index b5e8156..987bd02 100644 --- a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java +++ b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/template/FullTraversalConnector.java @@ -31,10 +31,13 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.enterprise.cloudsearch.sdk.CheckpointCloseableIterable; +import com.google.enterprise.cloudsearch.sdk.ConnectorScheduler.OneAtATimeRunnable; import com.google.enterprise.cloudsearch.sdk.IncrementalChangeHandler; import com.google.enterprise.cloudsearch.sdk.InvalidConfigurationException; import com.google.enterprise.cloudsearch.sdk.RepositoryException; +import com.google.enterprise.cloudsearch.sdk.StatsManager; import com.google.enterprise.cloudsearch.sdk.config.Configuration; import com.google.enterprise.cloudsearch.sdk.indexing.DefaultAcl; import com.google.enterprise.cloudsearch.sdk.indexing.IndexingConnector; @@ -48,6 +51,9 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -137,8 +143,11 @@ public class FullTraversalConnector implements IndexingConnector, IncrementalCha private RepositoryContext repositoryContext; private CheckpointHandler checkpointHandler; private boolean useQueues; - @VisibleForTesting QueueCheckpoint queueCheckpoint; + @VisibleForTesting + QueueCheckpoint queueCheckpoint; private int partitionSize; + private ScheduledExecutorService scheduleExecutor; + private ExecutorService backgroundExecutor; /** * Creates an instance of {@link FullTraversalConnector} for performing full traversal over given @@ -148,6 +157,12 @@ public class FullTraversalConnector implements IndexingConnector, IncrementalCha */ public FullTraversalConnector(Repository repository) { this(repository, null); + scheduleExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(false).setNameFormat("schedule").build()); + backgroundExecutor = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(false).setNameFormat("background").build()); } /** @@ -304,6 +319,30 @@ private interface GetDocsFunction { CheckpointCloseableIterable apply(byte[] checkpoint) throws RepositoryException; } + /** + * Runnable that when invoked executes the delegate with {@link #backgroundExecutor} and then + * returns before completion. That implies that uses of this class must ensure they do not add an + * instance directly to {@link #backgroundExecutor}, otherwise an odd infinite loop will + * occur. + */ + protected class BackgroundRunnable implements Runnable { + + private final Runnable delegate; + + public BackgroundRunnable(Runnable delegate) { + this.delegate = checkNotNull(delegate); + } + + @Override + public void run() { + try { + backgroundExecutor.execute(delegate); + } catch (Throwable t) { + logger.log(Level.WARNING, "Failed to start background runnable", t); + } + } + } + /** * Performs a repository traversal of a given type. * @@ -313,6 +352,13 @@ private interface GetDocsFunction { private boolean doTraverse(String traversalType, String checkpointName, String queueName, GetDocsFunction getDocs) throws IOException, InterruptedException { + + Runnable loggingStatsRunnable = + new BackgroundRunnable( + new OneAtATimeRunnable( + () -> logger.info(StatsManager.getInstance().printStats()), "StatsLog")); + scheduleExecutor.scheduleAtFixedRate(loggingStatsRunnable, 1, 5, TimeUnit.MINUTES); + logger.log(Level.INFO, "Begin {0} traversal.", traversalType); ExecuteCounter executeCounter = new ExecuteCounter(); byte[] checkpoint = checkpointHandler.readCheckpoint(checkpointName); @@ -462,7 +508,7 @@ public List call() throws Exception { */ private List executeOperation( ApiOperation operation, ExecuteCounter executeCounter) - throws InterruptedException, IOException { + throws InterruptedException, IOException { // most should be update item ops, but allow other ops (delete item, etc.) executeCounter.incrementTotal(); String displayId = "[not an item update]"; From 1558f952cbb86e3ef12992c9aed32e656bdaf737 Mon Sep 17 00:00:00 2001 From: Donghan Miao <45982807+donghanmiao@users.noreply.github.com> Date: Mon, 12 Aug 2019 13:00:36 -0700 Subject: [PATCH 3/3] Delete changlog.md --- changlog.md | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 changlog.md diff --git a/changlog.md b/changlog.md deleted file mode 100644 index 44cb8ef..0000000 --- a/changlog.md +++ /dev/null @@ -1,3 +0,0 @@ - -# Title -- something is changed.