Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7de1758
Improve for skipCondition.
dao-jun Jan 4, 2024
302ab0d
Merge branch 'master' into dev/ml_parallel_read
dao-jun Jan 19, 2024
7236265
code improve
dao-jun Jan 20, 2024
eb505c0
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun Apr 21, 2024
6a4f740
Fix code
dao-jun Apr 23, 2024
b0a1f9b
Fix code
dao-jun Apr 23, 2024
a622263
Fix code
dao-jun Apr 23, 2024
a04d788
Fix code
dao-jun Apr 24, 2024
3949101
Fix code
dao-jun Apr 25, 2024
9eec2ba
Fix code
dao-jun Apr 25, 2024
e656c45
Fix code
dao-jun Apr 25, 2024
b71b022
Fix code
dao-jun Apr 25, 2024
cbd449a
fix codestyle
dao-jun Apr 25, 2024
0cbe85a
fix tests
dao-jun Apr 25, 2024
d675bae
improve code
dao-jun Apr 26, 2024
5a30448
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun Apr 27, 2024
f438312
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun May 3, 2024
ec248c0
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun May 8, 2024
bba345a
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun May 13, 2024
60e090d
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun Oct 17, 2024
ad47126
rebase master
dao-jun Oct 17, 2024
ae51bbc
Merge remote-tracking branch 'origin/master' into dev/ml_parallel_read
lhotari Oct 22, 2024
47fca1d
Merge branch 'master' into dev/ml_parallel_read
dao-jun May 26, 2025
8786644
fix code
dao-jun May 26, 2025
8c700ec
address comment
dao-jun May 26, 2025
9f0a50d
address comment
dao-jun Jun 3, 2025
b826594
Address review comment
dao-jun Jun 11, 2025
9f1245a
Address review comment
dao-jun Jun 11, 2025
11824bc
Address review comment
dao-jun Jun 11, 2025
cf51802
Address review comment
dao-jun Jun 12, 2025
62c70ec
Merge branch 'master' into dev/ml_parallel_read
dao-jun Mar 3, 2026
34951b8
rebase master
dao-jun Mar 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static Position create(long ledgerId, long entryId) {
return new ImmutablePositionImpl(ledgerId, entryId);
}


/**
* Create a new position or returns the other instance if it's immutable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@

@SuppressWarnings("checkstyle:javadoctype")
public class ManagedCursorImpl implements ManagedCursor {
private static final Comparator<Entry> ENTRY_COMPARATOR = (e1, e2) -> {
static final Comparator<Entry> ENTRY_COMPARATOR = (e1, e2) -> {
if (e1.getLedgerId() != e2.getLedgerId()) {
return e1.getLedgerId() < e2.getLedgerId() ? -1 : 1;
}
Expand Down Expand Up @@ -3928,14 +3928,17 @@ public long[] getBatchPositionAckSet(Position position) {
public Position getNextAvailablePosition(Position position) {
lock.readLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
return ledger.getNextValidPosition(position);
}
Range<Position> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
position.getEntryId());
if (range != null) {
Position nextPosition = range.upperEndpoint().getNext();
return (nextPosition != null && nextPosition.compareTo(position) > 0)
? nextPosition : position.getNext();
}
return position.getNext();
return ledger.getNextValidPosition(position);
} finally {
lock.readLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
import it.unimi.dsi.fastutil.longs.LongLongPair;
import it.unimi.dsi.fastutil.longs.LongSortedSet;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -2373,43 +2376,153 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)

long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);

// Filter out and skip unnecessary read entries
if (opReadEntry.skipCondition != null) {
long firstValidEntry = -1L;
long lastValidEntry = -1L;
long entryId = firstEntry;
for (; entryId <= lastEntry; entryId++) {
if (opReadEntry.skipCondition.test(PositionFactory.create(ledger.getId(), entryId))) {
if (firstValidEntry != -1L) {
break;
}
} else {
if (firstValidEntry == -1L) {
firstValidEntry = entryId;
}
Predicate<Position> skipCondition = opReadEntry.skipCondition;
if (skipCondition == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
}
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
return;
}

lastValidEntry = entryId;
}
// Skip entries that don't match the predicate
LongSortedSet entryIds = new LongAVLTreeSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still adds avoidable allocations and CPU in the hot path that this PR aims to optimize. Would it be simpler to build contiguous ranges directly during the scan and preserve result ordering by range index or submission order in BatchReadEntriesCallback? That should eliminate the set and at least one of the sorts.

MutablePositionImpl position = new MutablePositionImpl();
for (long entryId = firstEntry; entryId <= lastEntry; entryId++) {
position.changePositionTo(ledger.getId(), entryId);
if (skipCondition.test(position)) {
continue;
}
entryIds.add(entryId);
}

// If all messages in [firstEntry...lastEntry] are filtered out,
// then manually call internalReadEntriesComplete to advance the read position.
if (firstValidEntry == -1L) {
final var nextReadPosition = PositionFactory.create(ledger.getId(), lastEntry).getNext();
opReadEntry.updateReadPosition(nextReadPosition);
opReadEntry.checkReadCompletion();
return;
Position lastReadPosition = PositionFactory.create(ledger.getId(), lastEntry);
if (entryIds.isEmpty()) {
final var nextReadPosition = lastReadPosition.getNext();
opReadEntry.updateReadPosition(nextReadPosition);
opReadEntry.checkReadCompletion();
return;
}

List<LongLongPair> ranges = toRanges(entryIds);
ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry, lastReadPosition);
for (LongLongPair pair : ranges) {
long start = pair.firstLong();
long end = pair.secondLong();
asyncReadEntry(ledger, start, end, opReadEntry.cursor, callback, opReadEntry.ctx);
}
}

/**
 * Collapses a sorted set of entry ids into a list of inclusive, contiguous
 * [start, end] ranges, preserved in ascending order.
 *
 * @param entryIds non-empty ascending set of entry ids
 * @return inclusive ranges covering exactly the given ids
 */
@VisibleForTesting
public static List<LongLongPair> toRanges(LongSortedSet entryIds) {
    List<LongLongPair> result = new ArrayList<>();
    long rangeStart = entryIds.firstLong();
    long rangeEnd = rangeStart;
    for (long id : entryIds) {
        if (id > rangeEnd + 1) {
            // Gap detected: close the current range and start a new one.
            result.add(LongLongPair.of(rangeStart, rangeEnd));
            rangeStart = id;
            rangeEnd = id;
        } else {
            // Contiguous (or the first element): extend the current range.
            rangeEnd = id;
        }
    }
    result.add(LongLongPair.of(rangeStart, rangeEnd));
    return result;
}

@VisibleForTesting
public static class BatchReadEntriesCallback implements ReadEntriesCallback {
private final LongSortedSet entryIds;
private final List<Entry> entries;
private final OpReadEntry callback;
private volatile boolean completed = false;
private final Position lastReadPosition;

firstEntry = firstValidEntry;
lastEntry = lastValidEntry;
@VisibleForTesting
public BatchReadEntriesCallback(LongSortedSet entryIdSet, OpReadEntry callback, Position lastReadPosition) {
this.entryIds = entryIdSet;
this.entries = new ArrayList<>(entryIdSet.size());
this.callback = callback;
this.lastReadPosition = lastReadPosition;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
@Override
public synchronized void readEntriesComplete(List<Entry> entries0, Object ctx) {
if (completed) {
for (Entry entry : entries0) {
entry.release();
}
return;
}
entries.addAll(entries0);
// If read empty batch from Bookie, we have to complete the call.
// Otherwise, it maybe blocks forever, see: PR 24515.
if (entries.size() < entryIds.size() && !entries0.isEmpty()) {
return;
}
completed = true;
// Make sure the entries are in the correct order
entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR);
// If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3], [4,5] are filtered, so we need to pass
// the `lastReadPosition([5])` to make sure the cursor read position is correct.
// If we pass nonnull `lastReadPosition` to call if the entries0.isEmpty, it will skip some entries.
callback.internalReadEntriesComplete(entries, entries0.isEmpty() ? null : lastReadPosition);
}

@Override
public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (completed) {
return;
}
completed = true;
// If there are entries been read success, try to let the read operation success as possible.
List<Entry> entries = filterEntries();
if (!entries.isEmpty()) {
// Move the read position of the cursor to the next position of the last read entry,
// or we will deliver the same entry to the consumer more than once.
Entry entry = entries.get(entries.size() - 1);
Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
Position nextReadPosition = callback.cursor.getNextAvailablePosition(position);
callback.updateReadPosition(nextReadPosition);
}
callback.internalReadEntriesFailed(entries, exception, ctx);
}

/**
* Filter the entries that have been read success.
* <p>
* If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5] successfully, [3] is read failed,
* only return [1,2] to the caller, to make sure the read operation success as possible
* and keep the ordering guarantee.
*
* @return filtered entries
*/
private List<Entry> filterEntries() {
if (entries.isEmpty()) {
return Collections.emptyList();
}
entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR);
List<Entry> entries0 = new ArrayList<>();
for (long entryId : entryIds) {
if (this.entries.isEmpty()) {
break;
}
Entry entry = this.entries.remove(0);
if (entry.getEntryId() == entryId) {
entries0.add(entry);
} else {
entry.release();
break;
}
}
// Release the entries that are not in the result.
for (Entry entry : entries) {
entry.release();
}
return entries0;
}
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}

protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) {
Expand Down Expand Up @@ -2443,6 +2556,22 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry
}
}

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, ManagedCursorImpl cursor,
ReadEntriesCallback callback, Object ctx) {
IntSupplier expectedReadCount = cursor::getNumberOfCursorsAtSamePositionOrBefore;
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
callback, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parallelizes a logical read into multiple BK range reads, but timeout tracking still relies on a single lastReadCallback, which each sub‑read overwrites here.

That breaks the existing read-timeout semantics. For example, if Range A stalls while later ranges B and C update the lastReadCallback, then checkReadTimeout() cannot detect Range A, while BatchReadEntriesCallback may still be waiting for it. In that scenario, the outer read could hang indefinitely instead of timing out.

Should we track all pending sub-read callbacks for one OpReadEntry?

I think we also need a regression test for asyncReadEntriesWithSkip(...) with timeout enabled and more than one split range.

Copy link
Member

@lhotari lhotari Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sidenote: In the case of multiple reads, it would be useful to have a separate timeout value. So that an individual read would be retried until the total read timeout is met. The reason for this is that when a single read times out, all read results are discarded. This is already a problem with the individual reads, so it should be something that is handled in a separate PR. This particular detail is why the use of the current read timeout setting results in a cascading effect that adds more load to the system when the system is already under high load. I guess the impact wouldn't be so great if the entries are present in the broker cache when the retry happens. I haven't checked the detail whether partial results are already added to the broker cache. That might already be handled.

entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, callback, ctx);
}
}

static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {

volatile ReadEntryCallback readEntryCallback;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import org.apache.bookkeeper.mledger.Position;

final class MutablePositionImpl implements Position {

private volatile long ledgerId;
private volatile long entryId;
Comment on lines +23 to +26
Copy link
Member

@lhotari lhotari Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One possibility would be to use non-volatile fields and make it explicit that this class isn't thread-safe and shouldn't be used for purposes where the reference is held after a method call.

The benefit would be that volatile reads and writes would be avoided when this is used for the intended use case.


MutablePositionImpl(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
}

MutablePositionImpl() {
this.ledgerId = -1;
this.entryId = -1;
}

/**
* Change the ledgerId and entryId.
*
* @param ledgerId
* @param entryId
*/
public void changePositionTo(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
}

@Override
public long getLedgerId() {
return ledgerId;
}

@Override
public long getEntryId() {
return entryId;
}

/**
* String representation of virtual cursor - LedgerId:EntryId.
*/
@Override
public String toString() {
return ledgerId + ":" + entryId;
}

@Override
public int hashCode() {
return hashCodeForPosition();
}

@Override
public boolean equals(Object obj) {
return obj instanceof Position && compareTo((Position) obj) == 0;
}

}
Loading
Loading