Skip to content

[fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack#17751

Open
lordcheng10 wants to merge 12 commits intoapache:masterfrom
lordcheng10:fix_skip_entry_no_ack
Open

[fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack#17751
lordcheng10 wants to merge 12 commits intoapache:masterfrom
lordcheng10:fix_skip_entry_no_ack

Conversation

@lordcheng10
Copy link
Contributor

@lordcheng10 lordcheng10 commented Sep 20, 2022

Motivation

When isAutoSkipNonRecoverableData=true and individual ack, the markdelete position does not move forward.

} else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
readPosition, exception.getMessage());
// try to find and move to next valid ledger
final Position nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
// fail callback if it couldn't find next valid ledger
if (nexReadPosition == null) {
callback.readEntriesFailed(exception, ctx);
cursor.ledger.mbean.recordReadEntriesError();
recycle();
return;
}
updateReadPosition(nexReadPosition);
checkReadCompletion();
} else {

Modifications

  1. Add a skipped variable in EntryImpl to record whether the entry is automatically skipped;
  2. When pushing entrys to the consumer, automatically ack the skipped entrys;

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Matching PR in forked repository

PR in forked repository: lordcheng10#19

@lordcheng10 lordcheng10 changed the title When isAutoSkipNonRecoverableData=true, fix the problem that the mark… [fix][broker] Fix the problem that the markdelete position does not move forward Sep 20, 2022
@lordcheng10 lordcheng10 changed the title [fix][broker] Fix the problem that the markdelete position does not move forward [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true Sep 20, 2022
@lordcheng10 lordcheng10 changed the title [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack Sep 20, 2022
@lordcheng10
Copy link
Contributor Author

@eolivelli @codelipenghui @Technoboy- PTAL,thanks!

*/
public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
List<Position> skippedPositions = new ArrayList<>();
List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can move this into filterEntriesForConsumer? This way we save some allocations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed!
@eolivelli PTAL,thanks!

entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.skipped = skipped;
entry.data = Unpooled.wrappedBuffer(new byte[0]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to allocate this ? what about using a constant ?

Copy link
Contributor Author

@lordcheng10 lordcheng10 Sep 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entry.data = Unpooled.EMPTY_BUFFER ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! PTAL,thanks! @eolivelli

return entry;
}

public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about "createSkippedEntry" ?
otherwise people may want to use this factory method for other purposes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! PTAL,thanks! @eolivelli

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lordcheng10
Copy link
Contributor Author

@codelipenghui @Technoboy- @Jason918 PTAL,thanks!

if (entriesToFiltered == null) {
entriesToFiltered = new ArrayList<>();
}
entriesToFiltered.add(entryImpl.getPosition());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need entries.set(i, null); and entry.release();?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK , I will fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, PTAL, thanks! @Jason918

PositionImpl endPosition = (PositionImpl) nexReadPosition;
while (startPosition.compareTo(endPosition) < 0) {
skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
startPosition = ledger.getNextValidPosition(startPosition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems, normally, it will just goes from readPosition to nexReadPosition?
Will you miss other entries to be acked?

Copy link
Contributor Author

@lordcheng10 lordcheng10 Sep 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems, normally, it will just goes from readPosition to nexReadPosition?

YES

Will you miss other entries to be acked?

IMO, won't miss other entries to be acked

@lordcheng10 lordcheng10 requested review from Jason918 and removed request for AnonHxy September 27, 2022 13:43
@github-actions github-actions bot removed the Stale label Jun 28, 2023
@Technoboy- Technoboy- added this to the 3.2.0 milestone Jul 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
@lhotari
Copy link
Member

lhotari commented Oct 14, 2024

It seems that #21210 addresses the same issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/broker doc-not-needed Your PR changes do not impact docs release/2.8.5 release/2.10.4 release/2.11.1 release/4.0.10 type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.