Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;

private int maxAckEntryNumForAutoSkipNonRecoverableData = 10000;

/**
 * @return the createIfMissing flag — whether the managed ledger should be
 *         created when it does not already exist
 */
public boolean isCreateIfMissing() {
    return createIfMissing;
}
Expand All @@ -95,6 +97,15 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
return this;
}

/**
 * @return the upper limit on the number of entries that may be automatically
 *         acknowledged in one pass when skipping non-recoverable (lost) data
 */
public int getMaxAckEntryNumForAutoSkipNonRecoverableData() {
    return maxAckEntryNumForAutoSkipNonRecoverableData;
}

/**
 * Sets the upper limit on the number of entries that may be automatically
 * acknowledged in one pass when skipping non-recoverable (lost) data.
 *
 * @param maxAckEntryNum maximum number of entries to auto-ack per skip pass
 * @return this {@code ManagedLedgerConfig} instance, for call chaining
 */
public ManagedLedgerConfig setMaxAckEntryNumForAutoSkipNonRecoverableData(int maxAckEntryNum) {
    this.maxAckEntryNumForAutoSkipNonRecoverableData = maxAckEntryNum;
    return this;
}

/**
* @return the lazyCursorRecovery
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private long timestamp;
private long ledgerId;
private long entryId;
private boolean skipped;
ByteBuf data;

public static EntryImpl create(LedgerEntry ledgerEntry) {
Expand All @@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
return entry;
}

/**
 * Builds a placeholder entry marking position (ledgerId, entryId) as skipped:
 * it carries no payload (an empty buffer) and has its skipped flag set, so
 * downstream filtering can recognize and drop it.
 *
 * @param ledgerId ledger id of the skipped position
 * @param entryId  entry id of the skipped position
 * @return a pooled entry instance with refCnt initialized to 1
 */
public static EntryImpl createSkippedEntry(long ledgerId, long entryId) {
    final EntryImpl skippedEntry = RECYCLER.get();
    skippedEntry.ledgerId = ledgerId;
    skippedEntry.entryId = entryId;
    // No real payload exists for a skipped position; use the shared empty buffer.
    skippedEntry.data = Unpooled.EMPTY_BUFFER;
    skippedEntry.skipped = true;
    skippedEntry.timestamp = System.nanoTime();
    skippedEntry.setRefCnt(1);
    return skippedEntry;
}

public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
Expand Down Expand Up @@ -146,6 +158,10 @@ public long getEntryId() {
return entryId;
}

/**
 * @return true if this entry is a payload-less placeholder created via
 *         {@link #createSkippedEntry(long, long)}, false for a real entry
 */
public boolean skipped() {
    return skipped;
}

@Override
public int compareTo(EntryImpl other) {
if (this.ledgerId != other.ledgerId) {
Expand All @@ -172,6 +188,7 @@ protected void deallocate() {
timestamp = -1;
ledgerId = -1;
entryId = -1;
skipped = false;
recyclerHandle.recycle(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
recycle();
return;
}

int toAckEntryNum = 0;
List<Entry> skippedEntries = new ArrayList<>();
PositionImpl startPosition = readPosition;
PositionImpl endPosition = (PositionImpl) nexReadPosition;
while (startPosition.compareTo(endPosition) < 0) {
skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
startPosition = ledger.getNextValidPosition(startPosition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that, normally, it will just go from readPosition to nexReadPosition?
Will you miss other entries to be acked?

Copy link
Contributor Author

@lordcheng10 lordcheng10 Sep 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems, normally, it will just goes from readPosition to nexReadPosition?

YES

Will you miss other entries to be acked?

IMO, it won't miss other entries to be acked.

toAckEntryNum++;
if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the state of the cursor is like this:

read position {1:0}
individual deleted messages [ {1:0}, {1:50001} ]

After the entry filter, the nexReadPosition will be {1,10000} and filteredEntries will be {1,0}, so maxAckEntryNumForAutoSkipNonRecoverableData could not work perfectly. Can we let the cursor do the filtering?

Copy link
Contributor Author

@lordcheng10 lordcheng10 Oct 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix

nexReadPosition = startPosition;
break;
}
}
List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just drop the data with the instruction cursor.delete(positions)? This saves the memory of entries and is easier to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK , I will fix

entries.addAll(filteredEntries);
updateReadPosition(nexReadPosition);
checkReadCompletion();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,6 +2009,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
+ " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
)
private boolean autoSkipNonRecoverableData = false;
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "When autoSkipNonRecoverableData=true, "
+ "the upper limit of the number of entries skipped by automatic ack."
)
private int maxAckEntryNumForAutoSkipNonRecoverableData = 10000;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "operation timeout while updating managed-ledger metadata."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -118,6 +119,18 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
if (entry == null) {
continue;
}
if (entry instanceof EntryImpl) {
EntryImpl entryImpl = (EntryImpl) entry;
if (entryImpl.skipped()) {
if (entriesToFiltered == null) {
entriesToFiltered = new ArrayList<>();
}
entriesToFiltered.add(entryImpl.getPosition());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need entries.set(i, null); and entry.release();?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK , I will fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, PTAL, thanks! @Jason918

entries.set(i, null);
entry.release();
continue;
}
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
final int metadataIndex = i + startOffset;
final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
}
}

managedLedgerConfig.setMaxAckEntryNumForAutoSkipNonRecoverableData(
serviceConfig.getMaxAckEntryNumForAutoSkipNonRecoverableData());
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;

Expand All @@ -43,6 +44,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand Down Expand Up @@ -166,6 +168,101 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
consumer.close();
}

/**
 * Verifies that, with autoSkipNonRecoverableData enabled, a cursor stuck on
 * deleted (non-recoverable) data-ledgers skips over the lost entries so that
 * its mark-delete position still moves forward after reconsuming.
 */
@Test
public void testSkipCorruptDataLedgerAndCheckMarkdelete() throws Exception {
    // Ensure intended state for autoSkipNonRecoverableData
    admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
    @Cleanup
    PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsar.getWebServiceAddress())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();

    final String ns1 = "prop/usc/crash-broker";
    final int totalMessages = 100;
    final int totalDataLedgers = 5;
    final int entriesPerLedger = totalMessages / totalDataLedgers;

    try {
        admin.namespaces().createNamespace(ns1);
    } catch (Exception e) {
        // best-effort: the namespace may already exist from a previous test
    }

    final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
    // Create subscription
    Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
            .receiverQueueSize(5).subscribe();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
    ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
    ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
    // Reach into the cursor's private config via reflection to force frequent ledger rollover
    Field configField = ManagedCursorImpl.class.getDeclaredField("config");
    configField.setAccessible(true);
    // Make the managed ledger roll over after every `entriesPerLedger` entries
    ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
    config.setMaxEntriesPerLedger(entriesPerLedger);
    config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
    // bookkeeper client
    Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
    bookKeeperField.setAccessible(true);
    // Grab the broker's BookKeeper client so we can delete ledgers out from under the topic
    BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);

    // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
    Producer<byte[]> producer = client.newProducer().topic(topic1).create();
    for (int i = 0; i < totalMessages; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }
    producer.close();

    Message<byte[]> msg = null;
    // (2) consume 20 messages from first ledger
    for (int i = 0; i < entriesPerLedger; i++) {
        msg = consumer.receive();
        consumer.acknowledge(msg);
    }
    consumer.close();
    // Remember where the cursor's mark-delete position is before the corruption
    PositionImpl markDeletePosition1 = (PositionImpl) cursor.getMarkDeletedPosition();

    // (3) delete first 4 data-ledgers and clear cache
    NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
    Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
    ledgerInfo.entrySet().forEach(entry -> {
        if (!entry.equals(lastLedger)) {
            assertEquals(entry.getValue().getEntries(), entriesPerLedger);
            try {
                bookKeeper.deleteLedger(entry.getKey());
            } catch (Exception e) {
                log.warn("failed to delete ledger {}", entry.getKey(), e);
            }
        }
    });
    // Evict the topic and its managed ledger so the next subscribe reopens
    // the (now partially deleted) ledgers from scratch
    pulsar.getBrokerService().removeTopicFromCache(topic1);
    ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
    Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
    field.setAccessible(true);
    @SuppressWarnings("unchecked")
    ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field
            .get(factory);
    ledgers.clear();

    // (4) consumer will be able to consume 20 messages from last non-deleted ledger
    consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
            .receiverQueueSize(5).subscribe();
    for (int i = 0; i < entriesPerLedger; i++) {
        msg = consumer.receive();
        consumer.acknowledge(msg);
    }
    consumer.close();

    // Re-fetch the topic/cursor (the cache was cleared above) to read the new state
    topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
    ml = (ManagedLedgerImpl) topic.getManagedLedger();
    cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
    PositionImpl markDeletePosition2 = (PositionImpl) cursor.getMarkDeletedPosition();
    // markDeletePosition moves forward
    assertTrue(markDeletePosition2.compareTo(markDeletePosition1) > 0);
}

/**
* It verifies broker-configuration using which broker can skip non-recoverable data-ledgers.
*
Expand Down
Loading