diff --git a/server/manager/pom.xml b/server/manager/pom.xml index b2280f88f46..0da212e0240 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -31,6 +31,10 @@ Apache Accumulo Manager Server The manager server for Apache Accumulo for load balancing and other system-wide operations. + + com.github.ben-manes.caffeine + caffeine + com.google.code.gson gson diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 0b3ea6ab404..01399b8d1f5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -157,6 +158,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.collect.ImmutableSortedMap; import com.google.common.util.concurrent.RateLimiter; @@ -201,8 +205,11 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final Migrations migrations = new Migrations(); + + private final LoadingCache mergeLocks = Caffeine.newBuilder().weakValues() + .scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock()); + final EventCoordinator nextEvent = new EventCoordinator(); - private final Object mergeLock = new Object(); private Thread replicationWorkThread; private Thread replicationAssignerThread; RecoveryManager recoveryManager = null; @@ -477,7 +484,9 @@ public TServerConnection getConnection(TServerInstance server) { public MergeInfo getMergeInfo(TableId tableId) { ServerContext context = getContext(); - synchronized (mergeLock) { + final ReentrantLock l = mergeLocks.get(tableId.canonical()); + l.lock(); + try { try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; if (!context.getZooReaderWriter().exists(path)) { @@ -496,15 +505,19 @@ public MergeInfo getMergeInfo(TableId tableId) { log.warn("Unexpected error reading merge state", ex); return new MergeInfo(); } + } finally { + l.unlock(); } } public void setMergeState(MergeInfo info, MergeState state) throws KeeperException, InterruptedException { ServerContext context = getContext(); - synchronized (mergeLock) { - String path = - getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().tableId() + "/merge"; + final TableId tid = info.getExtent().tableId(); + final ReentrantLock l = mergeLocks.get(tid.canonical()); + l.lock(); + try { + String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid + "/merge"; info.setState(state); if (state.equals(MergeState.NONE)) { context.getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP); @@ -519,16 +532,20 @@ public void setMergeState(MergeInfo info, MergeState state) state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE); } - mergeLock.notifyAll(); + } finally { + l.unlock(); } nextEvent.event("Merge state of %s set to %s", info.getExtent(), state); } public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException { - synchronized (mergeLock) { + final ReentrantLock l = mergeLocks.get(tableId.canonical()); + l.lock(); + try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; getContext().getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP); - mergeLock.notifyAll(); + } finally { + l.unlock(); } nextEvent.event("Merge state of %s cleared", tableId); }