[fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack#17751
Conversation
|
@eolivelli @codelipenghui @Technoboy- PTAL,thanks! |
d1ee2ee to
2f52eb1
Compare
2f52eb1 to
253273c
Compare
| */ | ||
| public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) { | ||
| List<Position> skippedPositions = new ArrayList<>(); | ||
| List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> { |
There was a problem hiding this comment.
I wonder if we can move this into filterEntriesForConsumer? This way we save some allocations
| entry.ledgerId = ledgerId; | ||
| entry.entryId = entryId; | ||
| entry.skipped = skipped; | ||
| entry.data = Unpooled.wrappedBuffer(new byte[0]); |
There was a problem hiding this comment.
do we really need to allocate this ? what about using a constant ?
There was a problem hiding this comment.
entry.data = Unpooled.EMPTY_BUFFER ?
| return entry; | ||
| } | ||
|
|
||
| public static EntryImpl create(long ledgerId, long entryId, boolean skipped) { |
There was a problem hiding this comment.
what about "createSkippedEntry" ?
otherwise people may want to use this factory method for other purposes
5b0c3a2 to
e835c9f
Compare
|
@codelipenghui @Technoboy- @Jason918 PTAL,thanks! |
| if (entriesToFiltered == null) { | ||
| entriesToFiltered = new ArrayList<>(); | ||
| } | ||
| entriesToFiltered.add(entryImpl.getPosition()); |
There was a problem hiding this comment.
Do we need entries.set(i, null); and entry.release();?
There was a problem hiding this comment.
OK , I will fix
| PositionImpl endPosition = (PositionImpl) nexReadPosition; | ||
| while (startPosition.compareTo(endPosition) < 0) { | ||
| skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId)); | ||
| startPosition = ledger.getNextValidPosition(startPosition); |
There was a problem hiding this comment.
It seems, normally, it will just goes from readPosition to nexReadPosition?
Will you miss other entries to be acked?
There was a problem hiding this comment.
It seems, normally, it will just goes from readPosition to nexReadPosition?
YES
Will you miss other entries to be acked?
IMO, won't miss other entries to be acked
|
It seems that #21210 addresses the same issue. |
Motivation
When isAutoSkipNonRecoverableData=true and individual ack, the markdelete position does not move forward.
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
Lines 100 to 114 in 72dd01b
Modifications
Documentation
doc-required(Your PR needs to update docs and you will update later)
doc-not-needed(Please explain why)
doc(Your PR contains doc changes)
doc-complete(Docs have been already added)
Matching PR in forked repository
PR in forked repository: lordcheng10#19