-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack #17751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
861fe48
5c69805
1df5552
b667e01
52ee2ca
8ae5d56
ecf4b63
59a1369
ff79d7d
a850052
5beed9d
51cf7f0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,6 +116,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { | |
| recycle(); | ||
| return; | ||
| } | ||
|
|
||
| int toAckEntryNum = 0; | ||
| List<Entry> skippedEntries = new ArrayList<>(); | ||
| PositionImpl startPosition = readPosition; | ||
| PositionImpl endPosition = (PositionImpl) nexReadPosition; | ||
| while (startPosition.compareTo(endPosition) < 0) { | ||
| skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId)); | ||
| startPosition = ledger.getNextValidPosition(startPosition); | ||
| toAckEntryNum++; | ||
| if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the state of the cursor is like this: After the entry filter, the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix |
||
| nexReadPosition = startPosition; | ||
| break; | ||
| } | ||
| } | ||
| List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just drop the data with the instruction
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK , I will fix |
||
| entries.addAll(filteredEntries); | ||
| updateReadPosition(nexReadPosition); | ||
| checkReadCompletion(); | ||
| } else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ | |
| import org.apache.bookkeeper.mledger.Entry; | ||
| import org.apache.bookkeeper.mledger.ManagedCursor; | ||
| import org.apache.bookkeeper.mledger.Position; | ||
| import org.apache.bookkeeper.mledger.impl.EntryImpl; | ||
| import org.apache.bookkeeper.mledger.impl.PositionImpl; | ||
| import org.apache.commons.collections4.CollectionUtils; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
|
|
@@ -118,6 +119,18 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray | |
| if (entry == null) { | ||
| continue; | ||
| } | ||
| if (entry instanceof EntryImpl) { | ||
| EntryImpl entryImpl = (EntryImpl) entry; | ||
| if (entryImpl.skipped()) { | ||
| if (entriesToFiltered == null) { | ||
| entriesToFiltered = new ArrayList<>(); | ||
| } | ||
| entriesToFiltered.add(entryImpl.getPosition()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK , I will fix
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed, PTAL, thanks! @Jason918 |
||
| entries.set(i, null); | ||
| entry.release(); | ||
| continue; | ||
| } | ||
| } | ||
| ByteBuf metadataAndPayload = entry.getDataBuffer(); | ||
| final int metadataIndex = i + startOffset; | ||
| final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex]) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems, normally, it will just goes from
readPositiontonexReadPosition?Will you miss other entries to be acked?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
YES
IMO, won't miss other entries to be acked