
[improve][ml] Optimize read entries with skipCondition#22560

Open
dao-jun wants to merge 32 commits into apache:master from dao-jun:dev/ml_parallel_read

Conversation

@dao-jun
Member

@dao-jun dao-jun commented Apr 23, 2024

Motivation

In #19035 we introduced skipCondition to filter out delayed-delivery messages before reading entries from BookKeeper, and in #21739 we also filter out deleted (individually acked) messages before reading entries.

However, this can split one contiguous read into several smaller reads. For example:
entries to filter out: [3, 5, 7]
entries to read: [1, 10]
then the read is split into: [[1,2], [4], [6], [8,10]].

In the current implementation these ranges are read sequentially: after the read of [1,2] finishes, the read of [4] begins; after [4] finishes, the read of [6] starts, and so on.
This increases read latency, keeps the memory allocated for entries retained for a longer period, and reduces the throughput of the system.
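To make the splitting concrete, here is a minimal standalone sketch (not Pulsar's actual code; `RangeSplitter` and `split` are illustrative names) of how a read range [first, last] breaks into contiguous sub-ranges around entries that the skipCondition filters out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Illustrative sketch: split the inclusive read range [first, last]
// into contiguous sub-ranges, skipping entries matched by `skip`.
public class RangeSplitter {
    /** Each long[2] holds an inclusive {start, end} sub-range to read. */
    public static List<long[]> split(long first, long last, LongPredicate skip) {
        List<long[]> ranges = new ArrayList<>();
        long start = -1;
        for (long e = first; e <= last; e++) {
            if (skip.test(e)) {
                if (start != -1) {
                    ranges.add(new long[]{start, e - 1});
                    start = -1;
                }
            } else if (start == -1) {
                start = e;
            }
        }
        if (start != -1) {
            ranges.add(new long[]{start, last});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Entries 3, 5 and 7 are filtered out, as in the PR description.
        for (long[] r : split(1, 10, e -> e == 3 || e == 5 || e == 7)) {
            System.out.println(r[0] + "-" + r[1]);
        }
        // Prints: 1-2, 4-4, 6-6, 8-10
    }
}
```

Reading the four resulting ranges one after another is the sequential behavior this PR aims to improve.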

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@dao-jun dao-jun added the area/ML and category/performance ("Performance issues fix or improvements") labels Apr 23, 2024
@dao-jun dao-jun added this to the 3.3.0 milestone Apr 23, 2024
@dao-jun dao-jun self-assigned this Apr 23, 2024
@github-actions github-actions bot added the doc-not-needed ("Your PR changes do not impact docs") label Apr 23, 2024
@dao-jun dao-jun changed the title [improve][ml] Make ManagedLedger read entries parallel [improve][ml] Optimize read entries with skiCondition Apr 24, 2024
@dao-jun dao-jun changed the title [improve][ml] Optimize read entries with skiCondition [improve][ml] Optimize read entries with skipCondition Apr 25, 2024
@dao-jun dao-jun marked this pull request as ready for review April 25, 2024 08:18
@dao-jun dao-jun closed this Apr 25, 2024
@dao-jun dao-jun reopened this Apr 25, 2024
@dao-jun
Member Author

dao-jun commented Jun 11, 2025

@dao-jun This PR will be a good foundation for switching to BookKeeper's BP-62 Batch Read API (introduced in BK 4.17.0) in Pulsar. One of the gotchas of BP-62 is that a batch read might not return all entries even if there are remaining entries to be read. It seems that it would play nicely with the changes made in this PR, so that the next read could continue to retry and read more. Would you be interested in implementing BP-62 support in Pulsar after this current PR has been merged?

@lhotari Yes, I'd like to handle that case. I created a PR to support Bookie batch read about a year ago, but it's not finished yet: dao-jun#13

@dao-jun dao-jun requested a review from lhotari June 11, 2025 14:47
@lhotari
Member

lhotari commented Sep 23, 2025

@dao-jun Please rebase once again

@lhotari
Member

lhotari commented Feb 6, 2026

@dao-jun Please rebase once again

@dao-jun
Member Author

dao-jun commented Feb 7, 2026

@lhotari Will do in the next few days; I'm working on Kafka with BookKeeper right now.

dao-jun added 2 commits March 3, 2026 23:53
# Conflicts:
#	managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
#	managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
#	managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
#	managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@dao-jun
Member Author

dao-jun commented Mar 4, 2026

@lhotari Rebased, PTAL

long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
callback, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
Contributor


This parallelizes a logical read into multiple BK range reads, but timeout tracking still relies on a single lastReadCallback, which each sub‑read overwrites here.

That breaks the existing read-timeout semantics. For example, if Range A stalls while later ranges B and C update the lastReadCallback, then checkReadTimeout() cannot detect Range A, while BatchReadEntriesCallback may still be waiting for it. In that scenario, the outer read could hang indefinitely instead of timing out.

Should we track all pending sub-read callbacks for one OpReadEntry?

I think we also need a regression test for asyncReadEntriesWithSkip(...) with timeout enabled and more than one split range.
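One way to track every pending sub-read rather than only the last one could look like the following standalone sketch (the names `PendingReadTracker`, `SubRead`, and the method names are illustrative, not Pulsar's actual API; the real ReadEntryCallbackWrapper carries more state):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: keep a concurrent set of all in-flight
// sub-reads so a periodic timeout check can detect ANY stalled
// range, not just the most recently issued one.
public class PendingReadTracker {
    /** Minimal stand-in for a per-range read callback. */
    static final class SubRead {
        final long firstEntry;
        final long createdTimeNanos;
        SubRead(long firstEntry, long createdTimeNanos) {
            this.firstEntry = firstEntry;
            this.createdTimeNanos = createdTimeNanos;
        }
    }

    private final Set<SubRead> pending = ConcurrentHashMap.newKeySet();

    void onSubReadIssued(SubRead r)    { pending.add(r); }
    void onSubReadCompleted(SubRead r) { pending.remove(r); }

    /** Returns true if any still-pending sub-read exceeded the timeout. */
    boolean hasTimedOut(long nowNanos, long timeoutNanos) {
        for (SubRead r : pending) {
            if (nowNanos - r.createdTimeNanos > timeoutNanos) {
                return true; // this range stalled; fail the whole logical read
            }
        }
        return false;
    }

    public static void main(String[] args) {
        PendingReadTracker t = new PendingReadTracker();
        SubRead a = new SubRead(1, 0);
        SubRead b = new SubRead(4, 50);
        t.onSubReadIssued(a);
        t.onSubReadIssued(b);
        t.onSubReadCompleted(b); // range B finished; range A is still stalled
        System.out.println(t.hasTimedOut(200, 100)); // prints: true
    }
}
```

With a single `lastReadCallback` field, range A in this example would be invisible to the timeout check after B overwrote it.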

Member

@lhotari lhotari Mar 9, 2026


Sidenote: in the case of multiple reads, it would be useful to have a separate timeout value, so that an individual read would be retried until the total read timeout is met. The reason for this is that when a single read times out, all read results are discarded. This is already a problem with individual reads, so it should be handled in a separate PR.

This particular detail is why the use of the current read timeout setting results in a cascading effect that adds more load to the system when it is already under high load. I guess the impact wouldn't be so great if the entries are present in the broker cache when the retry happens. I haven't checked whether partial results are already added to the broker cache; that might already be handled.

lastValidEntry = entryId;
}
// Skip entries that don't match the predicate
LongSortedSet entryIds = new LongAVLTreeSet();
Contributor


This still adds avoidable allocations and CPU in the hot path that this PR aims to optimize. Would it be simpler to build contiguous ranges directly during the scan and preserve result ordering by range index or submission order in BatchReadEntriesCallback? That should eliminate the set and at least one of the sorts.
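The "preserve result ordering by range index" part of this suggestion could be sketched as follows (a standalone illustration; `OrderedBatchCollector` and its method names are hypothetical, not the actual BatchReadEntriesCallback):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: collect per-range results into a slot array
// indexed by range index, so sub-reads may complete out of order
// without any final sort over entry ids.
public class OrderedBatchCollector<T> {
    private final List<T>[] slots;        // one slot per sub-range
    private final AtomicInteger remaining;

    @SuppressWarnings("unchecked")
    public OrderedBatchCollector(int rangeCount) {
        this.slots = (List<T>[]) new List[rangeCount];
        this.remaining = new AtomicInteger(rangeCount);
    }

    /**
     * Called when the sub-read for rangeIndex completes. Returns the
     * merged, ordered result once every range has reported, else null.
     * Visibility of slot writes is ensured by the atomic decrement.
     */
    public List<T> onRangeComplete(int rangeIndex, List<T> entries) {
        slots[rangeIndex] = entries;
        if (remaining.decrementAndGet() == 0) {
            List<T> merged = new ArrayList<>();
            for (List<T> s : slots) {
                merged.addAll(s);
            }
            return merged;
        }
        return null;
    }
}
```

Combined with building the contiguous ranges directly during the scan, this would remove both the `LongAVLTreeSet` allocation and the post-read sort from the hot path.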

Comment on lines +23 to +26
final class MutablePositionImpl implements Position {

private volatile long ledgerId;
private volatile long entryId;
Member

@lhotari lhotari Mar 9, 2026


One possibility would be to use non-volatile fields and make it explicit that this class isn't thread-safe and shouldn't be used for purposes where the reference is held after a method call.

The benefit would be that volatile reads and writes would be avoided when this is used for the intended use case.
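A minimal sketch of that non-volatile variant, with the thread-confinement contract made explicit (standalone illustration; the `Position` interface here is a stand-in for Pulsar's, and the class is not the PR's actual implementation):

```java
public class MutablePositionSketch {
    /** Stand-in for the real Position interface. */
    interface Position {
        long getLedgerId();
        long getEntryId();
    }

    /**
     * NOT thread-safe. Intended for single-threaded, short-lived use:
     * callers must not retain the reference after the method call that
     * received it. Plain (non-volatile) fields avoid the cost of
     * volatile reads and writes on the hot path.
     */
    static final class MutablePosition implements Position {
        private long ledgerId;
        private long entryId;

        void set(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override public long getLedgerId() { return ledgerId; }
        @Override public long getEntryId()  { return entryId; }
    }

    public static void main(String[] args) {
        MutablePosition p = new MutablePosition();
        p.set(7, 42);
        System.out.println(p.getLedgerId() + ":" + p.getEntryId()); // prints: 7:42
    }
}
```

The trade-off is that any accidental publication of the instance across threads becomes a data race, which is why the javadoc contract matters.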


Labels

area/ML, category/performance ("Performance issues fix or improvements"), doc-not-needed ("Your PR changes do not impact docs"), ready-to-test, release/4.0.10, release/4.1.4, triage/lhotari/important ("lhotari's triaging label for important issues or PRs")


6 participants