diff --git a/server/manager/pom.xml b/server/manager/pom.xml
index b2280f88f46..0da212e0240 100644
--- a/server/manager/pom.xml
+++ b/server/manager/pom.xml
@@ -31,6 +31,10 @@
Apache Accumulo Manager Server
The manager server for Apache Accumulo for load balancing and other system-wide operations.
+
+ com.github.ben-manes.caffeine
+ caffeine
+
com.google.code.gson
gson
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0b3ea6ab404..01399b8d1f5 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
@@ -157,6 +158,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.util.concurrent.RateLimiter;
@@ -201,8 +205,11 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener,
Collections.synchronizedMap(new HashMap<>());
final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
+
+ private final LoadingCache mergeLocks = Caffeine.newBuilder().weakValues()
+ .scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock());
+
final EventCoordinator nextEvent = new EventCoordinator();
- private final Object mergeLock = new Object();
private Thread replicationWorkThread;
private Thread replicationAssignerThread;
RecoveryManager recoveryManager = null;
@@ -477,7 +484,9 @@ public TServerConnection getConnection(TServerInstance server) {
public MergeInfo getMergeInfo(TableId tableId) {
ServerContext context = getContext();
- synchronized (mergeLock) {
+ final ReentrantLock l = mergeLocks.get(tableId.canonical());
+ l.lock();
+ try {
try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
if (!context.getZooReaderWriter().exists(path)) {
@@ -496,15 +505,19 @@ public MergeInfo getMergeInfo(TableId tableId) {
log.warn("Unexpected error reading merge state", ex);
return new MergeInfo();
}
+ } finally {
+ l.unlock();
}
}
public void setMergeState(MergeInfo info, MergeState state)
throws KeeperException, InterruptedException {
ServerContext context = getContext();
- synchronized (mergeLock) {
- String path =
- getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().tableId() + "/merge";
+ final TableId tid = info.getExtent().tableId();
+ final ReentrantLock l = mergeLocks.get(tid.canonical());
+ l.lock();
+ try {
+ String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid + "/merge";
info.setState(state);
if (state.equals(MergeState.NONE)) {
context.getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
@@ -519,16 +532,20 @@ public void setMergeState(MergeInfo info, MergeState state)
state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL
: ZooUtil.NodeExistsPolicy.OVERWRITE);
}
- mergeLock.notifyAll();
+ } finally {
+ l.unlock();
}
nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
}
public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException {
- synchronized (mergeLock) {
+ final ReentrantLock l = mergeLocks.get(tableId.canonical());
+ l.lock();
+ try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
getContext().getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
- mergeLock.notifyAll();
+ } finally {
+ l.unlock();
}
nextEvent.event("Merge state of %s cleared", tableId);
}