-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][ml] Optimize read entries with skipCondition #22560
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7de1758
302ab0d
7236265
eb505c0
6a4f740
b0a1f9b
a622263
a04d788
3949101
9eec2ba
e656c45
b71b022
cbd449a
0cbe85a
d675bae
5a30448
f438312
ec248c0
bba345a
60e090d
ad47126
ae51bbc
47fca1d
8786644
8c700ec
9f0a50d
b826594
9f1245a
11824bc
cf51802
62c70ec
34951b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,9 @@ | |
| import io.netty.buffer.Unpooled; | ||
| import io.netty.util.Recycler; | ||
| import io.netty.util.Recycler.Handle; | ||
| import it.unimi.dsi.fastutil.longs.LongAVLTreeSet; | ||
| import it.unimi.dsi.fastutil.longs.LongLongPair; | ||
| import it.unimi.dsi.fastutil.longs.LongSortedSet; | ||
| import java.time.Clock; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
|
|
@@ -2373,43 +2376,153 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) | |
|
|
||
| long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); | ||
|
|
||
| // Filter out and skip unnecessary read entries | ||
| if (opReadEntry.skipCondition != null) { | ||
| long firstValidEntry = -1L; | ||
| long lastValidEntry = -1L; | ||
| long entryId = firstEntry; | ||
| for (; entryId <= lastEntry; entryId++) { | ||
| if (opReadEntry.skipCondition.test(PositionFactory.create(ledger.getId(), entryId))) { | ||
| if (firstValidEntry != -1L) { | ||
| break; | ||
| } | ||
| } else { | ||
| if (firstValidEntry == -1L) { | ||
| firstValidEntry = entryId; | ||
| } | ||
| Predicate<Position> skipCondition = opReadEntry.skipCondition; | ||
| if (skipCondition == null) { | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, | ||
| lastEntry); | ||
| } | ||
| asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); | ||
| return; | ||
| } | ||
|
|
||
| lastValidEntry = entryId; | ||
| } | ||
| // Skip entries that don't match the predicate | ||
| LongSortedSet entryIds = new LongAVLTreeSet(); | ||
| MutablePositionImpl position = new MutablePositionImpl(); | ||
| for (long entryId = firstEntry; entryId <= lastEntry; entryId++) { | ||
| position.changePositionTo(ledger.getId(), entryId); | ||
| if (skipCondition.test(position)) { | ||
| continue; | ||
| } | ||
| entryIds.add(entryId); | ||
| } | ||
|
|
||
| // If all messages in [firstEntry...lastEntry] are filtered out, | ||
| // then manually call internalReadEntriesComplete to advance the read position. | ||
| if (firstValidEntry == -1L) { | ||
| final var nextReadPosition = PositionFactory.create(ledger.getId(), lastEntry).getNext(); | ||
| opReadEntry.updateReadPosition(nextReadPosition); | ||
| opReadEntry.checkReadCompletion(); | ||
| return; | ||
| Position lastReadPosition = PositionFactory.create(ledger.getId(), lastEntry); | ||
| if (entryIds.isEmpty()) { | ||
| final var nextReadPosition = lastReadPosition.getNext(); | ||
| opReadEntry.updateReadPosition(nextReadPosition); | ||
| opReadEntry.checkReadCompletion(); | ||
| return; | ||
| } | ||
|
|
||
| List<LongLongPair> ranges = toRanges(entryIds); | ||
| ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry, lastReadPosition); | ||
| for (LongLongPair pair : ranges) { | ||
| long start = pair.firstLong(); | ||
| long end = pair.secondLong(); | ||
| asyncReadEntry(ledger, start, end, opReadEntry.cursor, callback, opReadEntry.ctx); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public static List<LongLongPair> toRanges(LongSortedSet entryIds) { | ||
| List<LongLongPair> ranges = new ArrayList<>(); | ||
| long start = entryIds.firstLong(); | ||
| long end = start; | ||
| for (long entryId : entryIds) { | ||
| if (entryId - end > 1) { | ||
| ranges.add(LongLongPair.of(start, end)); | ||
| start = entryId; | ||
| end = start; | ||
| } else { | ||
| end = entryId; | ||
| } | ||
| } | ||
| ranges.add(LongLongPair.of(start, end)); | ||
| return ranges; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public static class BatchReadEntriesCallback implements ReadEntriesCallback { | ||
| private final LongSortedSet entryIds; | ||
| private final List<Entry> entries; | ||
| private final OpReadEntry callback; | ||
| private volatile boolean completed = false; | ||
| private final Position lastReadPosition; | ||
|
|
||
| firstEntry = firstValidEntry; | ||
| lastEntry = lastValidEntry; | ||
| @VisibleForTesting | ||
| public BatchReadEntriesCallback(LongSortedSet entryIdSet, OpReadEntry callback, Position lastReadPosition) { | ||
| this.entryIds = entryIdSet; | ||
| this.entries = new ArrayList<>(entryIdSet.size()); | ||
| this.callback = callback; | ||
| this.lastReadPosition = lastReadPosition; | ||
| } | ||
|
|
||
| if (log.isDebugEnabled()) { | ||
| log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, | ||
| lastEntry); | ||
| @Override | ||
| public synchronized void readEntriesComplete(List<Entry> entries0, Object ctx) { | ||
| if (completed) { | ||
| for (Entry entry : entries0) { | ||
| entry.release(); | ||
| } | ||
| return; | ||
| } | ||
| entries.addAll(entries0); | ||
| // If we read an empty batch from Bookie, we have to complete the call. | ||
| // Otherwise, it may block forever, see: PR 24515. | ||
| if (entries.size() < entryIds.size() && !entries0.isEmpty()) { | ||
| return; | ||
| } | ||
| completed = true; | ||
| // Make sure the entries are in the correct order | ||
| entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR); | ||
| // If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3], [4,5] are filtered, so we need to pass | ||
| // the `lastReadPosition([5])` to make sure the cursor read position is correct. | ||
| // If we passed a non-null `lastReadPosition` when entries0 is empty, some entries would be skipped. | ||
| callback.internalReadEntriesComplete(entries, entries0.isEmpty() ? null : lastReadPosition); | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) { | ||
| if (completed) { | ||
| return; | ||
| } | ||
| completed = true; | ||
| // If some entries have been read successfully, try to let the read operation succeed as far as possible. | ||
| List<Entry> entries = filterEntries(); | ||
| if (!entries.isEmpty()) { | ||
| // Move the read position of the cursor to the next position of the last read entry, | ||
| // or we will deliver the same entry to the consumer more than once. | ||
| Entry entry = entries.get(entries.size() - 1); | ||
| Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId()); | ||
| Position nextReadPosition = callback.cursor.getNextAvailablePosition(position); | ||
| callback.updateReadPosition(nextReadPosition); | ||
| } | ||
| callback.internalReadEntriesFailed(entries, exception, ctx); | ||
| } | ||
|
|
||
| /** | ||
| * Filter the entries that have been read success. | ||
| * <p> | ||
| * If we want to read [1, 2, 3, 4, 5], but only [1, 2, 4, 5] were read successfully and [3] failed to read, | ||
| * only return [1,2] to the caller, to let the read operation succeed as far as possible | ||
| * and keep the ordering guarantee. | ||
| * | ||
| * @return filtered entries | ||
| */ | ||
| private List<Entry> filterEntries() { | ||
dao-jun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (entries.isEmpty()) { | ||
| return Collections.emptyList(); | ||
| } | ||
| entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR); | ||
| List<Entry> entries0 = new ArrayList<>(); | ||
| for (long entryId : entryIds) { | ||
| if (this.entries.isEmpty()) { | ||
| break; | ||
| } | ||
| Entry entry = this.entries.remove(0); | ||
dao-jun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (entry.getEntryId() == entryId) { | ||
| entries0.add(entry); | ||
| } else { | ||
| entry.release(); | ||
| break; | ||
| } | ||
| } | ||
| // Release the entries that are not in the result. | ||
| for (Entry entry : entries) { | ||
| entry.release(); | ||
| } | ||
| return entries0; | ||
| } | ||
| asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); | ||
| } | ||
|
|
||
| protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) { | ||
|
|
@@ -2443,6 +2556,22 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry | |
| } | ||
| } | ||
|
|
||
| protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, ManagedCursorImpl cursor, | ||
| ReadEntriesCallback callback, Object ctx) { | ||
| IntSupplier expectedReadCount = cursor::getNumberOfCursorsAtSamePositionOrBefore; | ||
| if (config.getReadEntryTimeoutSeconds() > 0) { | ||
| // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled | ||
| long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); | ||
| long createdTime = System.nanoTime(); | ||
| ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, | ||
| callback, readOpCount, createdTime, ctx); | ||
| lastReadCallback = readCallback; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This parallelizes a logical read into multiple BK range reads, but timeout tracking still relies on a single That breaks the existing read-timeout semantics. For example, if Range A stalls while later ranges B and C update the Should we track all pending sub-read callbacks for one I think we also need a regression test for
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sidenote: In the case of multiple reads, it would be useful to have a separate timeout value. So that an individual read would be retried until the total read timeout is met. The reason for this is that when a single read times out, all read results are discarded. This is already a problem with the individual reads, so it should be something that is handled in a separate PR. This particular detail is why the use of the current read timeout setting results in a cascading effect that adds more load to the system when the system is already under high load. I guess the impact wouldn't be so great if the entries are present in the broker cache when the retry happens. I haven't checked the detail whether partial results are already added to the broker cache. That might already be handled. |
||
| entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount); | ||
| } else { | ||
| entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, callback, ctx); | ||
| } | ||
| } | ||
|
|
||
| static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback { | ||
|
|
||
| volatile ReadEntryCallback readEntryCallback; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.bookkeeper.mledger.impl; | ||
|
|
||
| import org.apache.bookkeeper.mledger.Position; | ||
|
|
||
| final class MutablePositionImpl implements Position { | ||
|
|
||
| private volatile long ledgerId; | ||
| private volatile long entryId; | ||
|
Comment on lines
+23
to
+26
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One possibility would be to use non-volatile fields and make it explicit that this class isn't thread-safe and shouldn't be used for purposes where the reference is held after a method call. The benefit would be that volatile reads and writes would be avoided when this is used for the intended use case. |
||
|
|
||
| MutablePositionImpl(long ledgerId, long entryId) { | ||
| this.ledgerId = ledgerId; | ||
| this.entryId = entryId; | ||
| } | ||
|
|
||
| MutablePositionImpl() { | ||
| this.ledgerId = -1; | ||
| this.entryId = -1; | ||
| } | ||
|
|
||
| /** | ||
| * Change the ledgerId and entryId. | ||
| * | ||
| * @param ledgerId | ||
| * @param entryId | ||
| */ | ||
| public void changePositionTo(long ledgerId, long entryId) { | ||
| this.ledgerId = ledgerId; | ||
| this.entryId = entryId; | ||
| } | ||
|
|
||
| @Override | ||
| public long getLedgerId() { | ||
| return ledgerId; | ||
| } | ||
|
|
||
| @Override | ||
| public long getEntryId() { | ||
| return entryId; | ||
| } | ||
|
|
||
| /** | ||
| * String representation of virtual cursor - LedgerId:EntryId. | ||
| */ | ||
| @Override | ||
| public String toString() { | ||
| return ledgerId + ":" + entryId; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return hashCodeForPosition(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object obj) { | ||
| return obj instanceof Position && compareTo((Position) obj) == 0; | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still adds avoidable allocations and CPU in the hot path that this PR aims to optimize. Would it be simpler to build contiguous ranges directly during the scan and preserve result ordering by range index or submission order in
BatchReadEntriesCallback? That should eliminate the set and at least one of the sorts.