[improve][ml] Optimize read entries with skipCondition#22560
dao-jun wants to merge 32 commits into apache:master
Conversation
# Conflicts:
#	managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@lhotari Yes, I'd like to handle that case. I created a PR to support Bookie batch read about a year ago, but it's not finished yet: dao-jun#13
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java
@dao-jun Please rebase once again
@lhotari Will do in the next few days; I'm working on Kafka with BookKeeper now
# Conflicts:
#	managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
#	managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
#	managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
#	managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@lhotari Rebased, PTAL
```java
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
        callback, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
```
This parallelizes a logical read into multiple BK range reads, but timeout tracking still relies on a single lastReadCallback, which each sub‑read overwrites here.
That breaks the existing read-timeout semantics. For example, if Range A stalls while later ranges B and C update the lastReadCallback, then checkReadTimeout() cannot detect Range A, while BatchReadEntriesCallback may still be waiting for it. In that scenario, the outer read could hang indefinitely instead of timing out.
Should we track all pending sub-read callbacks for one OpReadEntry?
I think we also need a regression test for asyncReadEntriesWithSkip(...) with timeout enabled and more than one split range.
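One way to track all pending sub-reads, sketched below. This is a hypothetical illustration of the suggestion, not the actual ManagedLedger code: the class and method names (`PendingReadTracker`, `onSubReadStarted`, `hasTimedOut`) are invented. The idea is to keep every outstanding sub-read callback for one logical read in a concurrent set instead of overwriting a single `lastReadCallback` reference, so the periodic timeout check can see a stalled Range A even after later ranges complete.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: track every pending sub-read of a logical read so
// the timeout check can inspect all of them, not just the last one issued.
final class PendingReadTracker {
    static final class PendingRead {
        final long createdAtNanos = System.nanoTime();
        final long firstEntry;
        PendingRead(long firstEntry) { this.firstEntry = firstEntry; }
    }

    private final Set<PendingRead> pending = ConcurrentHashMap.newKeySet();

    void onSubReadStarted(PendingRead read)   { pending.add(read); }
    void onSubReadCompleted(PendingRead read) { pending.remove(read); }

    // Invoked periodically (e.g. from checkReadTimeout()): a sub-read that has
    // been outstanding longer than the timeout is reported even if sub-reads
    // started after it have already completed.
    boolean hasTimedOut(long timeoutMillis) {
        long now = System.nanoTime();
        return pending.stream().anyMatch(r ->
                now - r.createdAtNanos > TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
    }
}
```

With this shape, completing ranges B and C removes only their own entries, and the stalled Range A entry keeps aging until the timeout check reports it.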
Sidenote: in the case of multiple reads, it would be useful to have a separate per-attempt timeout, so that an individual read is retried until the total read timeout is reached. The reason is that when a single read times out, all read results are discarded. This is already a problem with individual reads, so it should be handled in a separate PR.

This detail is why using the current read timeout setting can have a cascading effect that adds more load to the system precisely when it is already under high load. The impact may be smaller if the entries are present in the broker cache when the retry happens; I haven't checked whether partial results are already added to the broker cache. That might already be handled.
```java
        lastValidEntry = entryId;
    }
    // Skip entries that don't match the predicate
    LongSortedSet entryIds = new LongAVLTreeSet();
```
This still adds avoidable allocations and CPU in the hot path that this PR aims to optimize. Would it be simpler to build contiguous ranges directly during the scan and preserve result ordering by range index or submission order in BatchReadEntriesCallback? That should eliminate the set and at least one of the sorts.
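A minimal sketch of what "build contiguous ranges directly during the scan" could look like. The names (`RangeScan`, `contiguousRanges`) are illustrative and not from the PR; the point is that a single forward pass produces the ranges already in order, so neither the `LongAVLTreeSet` nor a later sort is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical sketch: one forward scan over [first, last] that emits
// contiguous [start, end] ranges of entries the skip predicate accepts.
final class RangeScan {
    static List<long[]> contiguousRanges(long first, long last, LongPredicate skip) {
        List<long[]> ranges = new ArrayList<>();
        long rangeStart = -1;
        for (long id = first; id <= last; id++) {
            if (skip.test(id)) {
                if (rangeStart != -1) {            // close the currently open range
                    ranges.add(new long[]{rangeStart, id - 1});
                    rangeStart = -1;
                }
            } else if (rangeStart == -1) {         // open a new range
                rangeStart = id;
            }
        }
        if (rangeStart != -1) {                    // close the trailing range
            ranges.add(new long[]{rangeStart, last});
        }
        return ranges;
    }
}
```

For the PR's example, skipping {3, 5, 7} over [1, 10] yields [1,2], [4,4], [6,6], [8,10] in order, so result ordering in `BatchReadEntriesCallback` can follow the range index directly.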
```java
final class MutablePositionImpl implements Position {

    private volatile long ledgerId;
    private volatile long entryId;
```
One possibility would be to use non-volatile fields and make it explicit that this class isn't thread-safe and shouldn't be used for purposes where the reference is held after a method call.
The benefit would be that volatile reads and writes would be avoided when this is used for the intended use case.
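A sketch of that alternative, assuming nothing beyond what the comment describes (the class name and accessors here are illustrative, and `Position` is simplified away so the snippet is self-contained): plain fields plus an explicit thread-safety note in the Javadoc.

```java
/**
 * Hypothetical sketch: a mutable position intended to be used only within a
 * single method invocation on one thread. NOT thread-safe; callers must not
 * retain or publish the reference after the method call that received it.
 */
final class MutablePosition {
    private long ledgerId;   // plain fields: no volatile read/write barriers
    private long entryId;

    void set(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    long getLedgerId() { return ledgerId; }
    long getEntryId()  { return entryId; }
}
```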
Motivation
In #19035 we introduced skipCondition to filter out delayed-delivery messages before reading entries from BookKeeper, and in #21739 we also started filtering out deleted (individually acked) messages before reading entries. However, this can split one contiguous read range into several segments. For example:
entries to filter out: [3, 5, 7]
entries to read: [1, 10]
then the read is split into: [[1,2], [4], [6], [8,10]].
In the current implementation the segments are read serially: only after the read of [1,2] finishes does the read of [4] begin, only after [4] finishes does the read of [6] begin, and so on.
This increases latency, retains the memory allocated for entries for a longer period, and reduces the throughput of the system.
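The serial-vs-parallel difference can be sketched as follows. This is a hedged illustration of the idea behind the PR, not the actual ManagedLedger code: `RangeReader` and `readAll` are invented names. All range reads are issued at once, and results are combined in submission order once every future completes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: instead of reading the split ranges one after another,
// issue all range reads immediately and join their results in range order.
final class ParallelRangeRead {
    interface RangeReader {
        CompletableFuture<List<Long>> read(long first, long last);
    }

    static CompletableFuture<List<Long>> readAll(List<long[]> ranges, RangeReader reader) {
        List<CompletableFuture<List<Long>>> futures = ranges.stream()
                .map(r -> reader.read(r[0], r[1]))     // all reads start now
                .toList();
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()       // preserve submission order
                        .flatMap(f -> f.join().stream())
                        .toList());
    }
}
```

With the example above, the four sub-reads [1,2], [4], [6], [8,10] run concurrently, so the end-to-end latency is bounded by the slowest sub-read rather than the sum of all of them.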
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc / doc-required / doc-not-needed / doc-complete

Matching PR in forked repository
PR in forked repository: