Skip to content

Use per-table merge lock in Manager instead of single lock#6378

Open
dlmarion wants to merge 4 commits into
apache:2.1from
dlmarion:per-table-merge-lock
Open

Use per-table merge lock in Manager instead of single lock#6378
dlmarion wants to merge 4 commits into
apache:2.1from
dlmarion:per-table-merge-lock

Conversation

@dlmarion
Copy link
Copy Markdown
Contributor

This change introduces a per-table merge lock instead of a single lock to protect reading/writing to ZooKeeper.

Closes #6374

This change introduces a per-table merge lock instead of
a single lock to protect reading/writing to ZooKeeper.

Closes apache#6374
@dlmarion dlmarion added this to the 2.1.5 milestone May 18, 2026
@dlmarion dlmarion self-assigned this May 18, 2026
Copy link
Copy Markdown
Member

@ctubbsii ctubbsii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use a Caffeine cache to handle this. The key could also be a hash of the TableId, if you wanted to stripe this, but less aggressively than one per table.

Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
private final MergeLocks mergeLocks = new MergeLocks();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final MergeLocks mergeLocks = new MergeLocks();
private final LoadingCache<TableId,ReentrantLock> mergeLocks = Caffeine.newBuilder().weakValues()
.scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock());

My main concern with this is that the TableId in the keys will prevent cleanup of the TableId.cache weak values. The scheduler is supposed to help with cleanup of the keys whose values have been garbage collected, but it may also be a good idea to use String (tableId.canonical()) for the key.

Comment on lines +176 to +199
private final class MergeLocks {

private final Object lock = new Object();
private Map<TableId,ReentrantLock> lockStorage = new HashMap<TableId,ReentrantLock>();

private ReentrantLock getLock(TableId tid) {
synchronized (lock) {
return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true));
}
}

private void cleanup() {
synchronized (lock) {
Set<TableId> removals = new HashSet<>();
for (Entry<TableId,ReentrantLock> e : lockStorage.entrySet()) {
if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) {
removals.add(e.getKey());
}
}
removals.forEach(lockStorage::remove);
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final class MergeLocks {
private final Object lock = new Object();
private Map<TableId,ReentrantLock> lockStorage = new HashMap<TableId,ReentrantLock>();
private ReentrantLock getLock(TableId tid) {
synchronized (lock) {
return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true));
}
}
private void cleanup() {
synchronized (lock) {
Set<TableId> removals = new HashSet<>();
for (Entry<TableId,ReentrantLock> e : lockStorage.entrySet()) {
if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) {
removals.add(e.getKey());
}
}
removals.forEach(lockStorage::remove);
}
}
}

Comment thread server/manager/src/main/java/org/apache/accumulo/manager/Manager.java Outdated
Comment thread server/manager/src/main/java/org/apache/accumulo/manager/Manager.java Outdated

public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException {
synchronized (mergeLock) {
final ReentrantLock l = mergeLocks.getLock(tableId);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final ReentrantLock l = mergeLocks.getLock(tableId);
final ReentrantLock l = mergeLocks.get(tableId);


ThreadPools.watchCriticalScheduledTask(
context.getScheduledExecutor().scheduleWithFixedDelay(mergeLocks::cleanup, 3, 3, HOURS));

Copy link
Copy Markdown
Member

@ctubbsii ctubbsii May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this cleanup code with the weakValues cache's scheduler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants