Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumMap;
Expand Down Expand Up @@ -78,6 +79,8 @@
* <p>Additionally limits the lock map size.
*/
public class HeapLockManager extends AbstractEventProducer<LockEvent, LockEventParameters> implements LockManager {
private static final LockMode[] LOCK_MODES = LockMode.values();

/** Table size. */
public static final int DEFAULT_SLOTS = 1_048_576;

Expand Down Expand Up @@ -761,6 +764,9 @@ public class LockState implements Releasable {
/** Waiters. */
private final TreeMap<UUID, WaiterImpl> waiters;

/** Number of waiters currently holding each lock mode. Indexed by {@link LockMode#ordinal()}. */
private final int[] lockModeHeldCount = new int[LOCK_MODES.length];

/** Lock key. */
private volatile LockKey key;

Expand Down Expand Up @@ -848,10 +854,14 @@ IgniteBiTuple<CompletableFuture<Void>, LockMode> tryAcquire(UUID txId, LockMode
// Reenter
if (prev != null) {
if (prev.locked() && prev.lockMode().allowReenter(lockMode)) {
// No count update: prev was replaced (not added) and allowReenter
// guarantees the mode is unchanged, so lockModeHeldCount stays in sync.
waiter.lock();

waiter.upgrade(prev);

assertLockModeHeldCount();

return new IgniteBiTuple<>(nullCompletedFuture(), prev.lockMode());
} else {
waiter.upgrade(prev);
Expand All @@ -871,6 +881,8 @@ IgniteBiTuple<CompletableFuture<Void>, LockMode> tryAcquire(UUID txId, LockMode
track(waiter.txId, this);
}

assertLockModeHeldCount();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

release flow seems to stay uncovered by this fix. Meanwhile, there is also a usage of heavy method under lock: unlockCompatibleWaiters


return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
}

Expand All @@ -884,6 +896,8 @@ IgniteBiTuple<CompletableFuture<Void>, LockMode> tryAcquire(UUID txId, LockMode
track(waiter.txId, this);
}
}

assertLockModeHeldCount();
}

// Notify outside the monitor.
Expand All @@ -903,6 +917,52 @@ public int waitersCount() {
}
}

/**
* Checks if any currently held lock mode is incompatible with the intended mode.
* Runs in O(1) time (constant number of lock modes). Must be called under {@code synchronized (waiters)}.
*/
private boolean hasAnyIncompatibleHolder(LockMode intended) {
for (LockMode held : LOCK_MODES) {
if (lockModeHeldCount[held.ordinal()] > 0 && !held.isCompatible(intended)) {
return true;
}
}
return false;
}

/** Adjusts {@code lockModeHeldCount} for a mode transition. Must be called under {@code synchronized (waiters)}. */
private void updateHeldCount(@Nullable LockMode oldMode, @Nullable LockMode newMode) {
if (oldMode == newMode) {
return;
}
if (oldMode != null) {
lockModeHeldCount[oldMode.ordinal()]--;
}
if (newMode != null) {
lockModeHeldCount[newMode.ordinal()]++;
}
}

/** Locks the waiter and updates {@code lockModeHeldCount}. Must be called under {@code synchronized (waiters)}. */
private void lockOnWaiter(WaiterImpl waiter) {
LockMode oldMode = waiter.lockMode();
waiter.lock();
updateHeldCount(oldMode, waiter.lockMode());
}

/** Validates lockModeHeldCount is consistent with actual waiter state. Must be called under {@code synchronized (waiters)}. */
private void assertLockModeHeldCount() {
int[] expected = new int[LOCK_MODES.length];
for (WaiterImpl w : waiters.values()) {
if (w.lockMode != null) {
expected[w.lockMode.ordinal()]++;
}
}
Comment on lines +956 to +960
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we want to check this every time?
Did you run benchmarks?

assert Arrays.equals(expected, lockModeHeldCount)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I remember, we didn't want to use assert where it's not necessary, is it still so?

: "lockModeHeldCount out of sync [expected=" + Arrays.toString(expected)
+ ", actual=" + Arrays.toString(lockModeHeldCount) + ']';
}

/**
* Checks current waiter. It can change the internal state of the waiter.
*
Expand All @@ -914,6 +974,14 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) {

assert intendedLockMode != null : "Intended lock mode is null";

// Fast path: no incompatible holders — grant immediately.
// Conservative for upgrades: the waiter's own mode is in the counts, causing a fallthrough to the slow path.
if (!hasAnyIncompatibleHolder(intendedLockMode)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it correct that the main idea is adding fast path for isWaiterReadyToNotify to reduce the time which tread spends inside synchronized?

lockOnWaiter(waiter);

return true;
}

for (Entry<UUID, WaiterImpl> entry : waiters.tailMap(waiter.txId(), false).entrySet()) {
WaiterImpl tmp = entry.getValue();
LockMode currentlyAcquiredLockMode = tmp.lockMode;
Expand Down Expand Up @@ -983,7 +1051,7 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) {
}
}

waiter.lock();
lockOnWaiter(waiter);

return true;
}
Expand All @@ -1000,6 +1068,7 @@ public boolean tryRelease(UUID txId) {

synchronized (waiters) {
toNotify = release(txId);
assertLockModeHeldCount();
}

// Notify outside the monitor.
Expand Down Expand Up @@ -1028,18 +1097,24 @@ boolean tryRelease(UUID txId, LockMode lockMode) {
assert LockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() :
"The lock is not locked in specified mode [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']';

LockMode modeFromDowngrade = waiter.recalculateMode(lockMode);
LockMode oldMode = waiter.lockMode();
waiter.recalculateMode(lockMode);
LockMode newMode = waiter.lockMode();

updateHeldCount(oldMode, newMode);

if (!waiter.locked() && !waiter.hasLockIntent()) {
// All locks are revoked - deqeue waiter.
// All locks are revoked - dequeue waiter.
waiters.remove(txId);
if (!waiters.isEmpty()) {
toNotify = unlockCompatibleWaiters();
}
} else if (modeFromDowngrade != waiter.lockMode()) {
} else if (oldMode != newMode) {
toNotify = unlockCompatibleWaiters();
}
}

assertLockModeHeldCount();
}

// Notify outside the monitor.
Expand All @@ -1060,7 +1135,13 @@ private List<WaiterImpl> release(UUID txId) {
WaiterImpl removed = waiters.remove(txId);

// Removing incomplete waiter doesn't affect lock state.
if (removed == null || waiters.isEmpty() || !removed.locked()) {
if (removed == null || !removed.locked()) {
return emptyList();
}

updateHeldCount(removed.lockMode(), null);

if (waiters.isEmpty()) {
return emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,40 @@ public void uncaughtException(Thread t, Throwable e) {
assertTrue(lockManager.queue(key).isEmpty());
}

@Test
public void testConcurrentCompatibleLocksOnSameKey() throws InterruptedException {
LockKey key = lockKey();
int threadCount = Runtime.getRuntime().availableProcessors() * 2;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicInteger granted = new AtomicInteger();

Thread[] threads = new Thread[threadCount];

for (int i = 0; i < threadCount; i++) {
UUID txId = TestTransactionIds.newTransactionId();
threads[i] = new Thread(() -> {
try {
barrier.await();
Lock lock = lockManager.acquire(txId, key, IX).join();
granted.incrementAndGet();
lockManager.release(lock);
} catch (Exception e) {
error.compareAndSet(null, e);
}
});
threads[i].start();
}

for (Thread t : threads) {
t.join(10_000);
}

assertNull(error.get(), () -> "Unexpected error: " + error.get());
assertEquals(threadCount, granted.get(), "All IX locks should be granted");
assertTrue(lockManager.queue(key).isEmpty());
}

private UUID[] generate(int num) {
UUID[] tmp = new UUID[num];

Expand Down