From 861fe4873484e46dfed6874b38c4e8aa6da2470c Mon Sep 17 00:00:00 2001 From: leolinchen Date: Wed, 21 Sep 2022 00:41:35 +0800 Subject: [PATCH 01/12] When isAutoSkipNonRecoverableData=true, fix the problem that the markdelete position does not move forward --- .../bookkeeper/mledger/impl/EntryImpl.java | 16 +++ .../bookkeeper/mledger/impl/OpReadEntry.java | 9 ++ .../service/AbstractBaseDispatcher.java | 23 ++++ ...PersistentDispatcherMultipleConsumers.java | 3 +- ...sistentDispatcherSingleActiveConsumer.java | 10 +- .../service/BrokerBkEnsemblesTests.java | 101 +++++++++++++++++- 6 files changed, 157 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index c387aa970adb6..ae7251e5d853d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -43,6 +43,7 @@ protected EntryImpl newObject(Handle handle) { private long ledgerId; private long entryId; ByteBuf data; + private boolean skipped; public static EntryImpl create(LedgerEntry ledgerEntry) { EntryImpl entry = RECYCLER.get(); @@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) { return entry; } + public static EntryImpl create(long ledgerId, long entryId, boolean skipped) { + EntryImpl entry = RECYCLER.get(); + entry.timestamp = System.nanoTime(); + entry.ledgerId = ledgerId; + entry.entryId = entryId; + entry.skipped = skipped; + entry.data = Unpooled.wrappedBuffer(new byte[0]); + entry.setRefCnt(1); + return entry; + } + public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) { EntryImpl entry = RECYCLER.get(); entry.timestamp = System.nanoTime(); @@ -146,6 +158,10 @@ public long getEntryId() { return entryId; } + public boolean skipped() { + return skipped; + } + @Override public int compareTo(EntryImpl other) { if (this.ledgerId != other.ledgerId) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index b90527834398a..9dc89cfc5b8e9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -116,6 +116,15 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { recycle(); return; } + List skippedEntries = new ArrayList<>(); + PositionImpl startPosition = readPosition; + PositionImpl endPosition = (PositionImpl) nexReadPosition; + while (startPosition.compareTo(endPosition) < 0) { + skippedEntries.add(EntryImpl.create(startPosition.ledgerId, startPosition.entryId, true)); + startPosition = cursor.ledger.getNextValidPosition(startPosition); + } + List filteredEntries = cursor.filterReadEntries(skippedEntries); + entries.addAll(filteredEntries); updateReadPosition(nexReadPosition); checkReadCompletion(); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index df02bbd85d470..147033700a173 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.service; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +33,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; @@ -41,6 +44,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; @@ -91,6 +95,25 @@ public int filterEntriesForConsumer(List entries, EntryBatchSiz isReplayRead, consumer); } + /** + * 1. Acknowledge skipped messages; + * 2. Filter out skipped messages; + */ + public List filterAndAcknowledgeSkippedEntry(List entries) { + List skippedPositions = new ArrayList<>(); + List filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> { + if (entry instanceof EntryImpl) { + if (((EntryImpl) entry).skipped()) { + skippedPositions.add(new PositionImpl(entry.getLedgerId(), entry.getEntryId())); + return false; + } + } + return true; + })); + subscription.acknowledgeMessage(skippedPositions, CommandAck.AckType.Individual, Collections.emptyMap()); + return filterEntries; + } + /** * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 15b42fedd38ab..5093ff394a94b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -564,7 +564,8 @@ public final synchronized void readEntriesComplete(List entries, Object c protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List entries) { sendInProgress = true; try { - return trySendMessagesToConsumers(readType, entries); + List filterEntries = filterAndAcknowledgeSkippedEntry(entries); + return trySendMessagesToConsumers(readType, filterEntries); } finally { sendInProgress = false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 3ba7a82aa5e35..1dcbad02a1597 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -20,7 +20,11 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; import io.netty.util.Recycler; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -36,6 +40,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.tuple.Pair; @@ -51,6 +57,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException; import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; @@ -146,7 +153,8 @@ protected void cancelPendingRead() { @Override public void readEntriesComplete(final List entries, Object obj) { topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> { - internalReadEntriesComplete(entries, obj); + List filterEntries = filterAndAcknowledgeSkippedEntry(entries); + internalReadEntriesComplete(filterEntries, obj); })); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 8b0dbb76eecb7..aea388e9a53ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; import java.lang.reflect.Field; import java.util.Map.Entry; @@ -40,9 +38,11 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.util.StringUtils; import org.apache.pulsar.broker.BrokerTestUtil; @@ -166,6 +166,101 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { consumer.close(); } + @Test + public void testSkipCorruptDataLedgerAndCheckMarkdelete() throws Exception { + // Ensure intended state for autoSkipNonRecoverableData + admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true"); + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String ns1 = "prop/usc/crash-broker"; + final int totalMessages = 100; + final int totalDataLedgers = 5; + final int entriesPerLedger = totalMessages / totalDataLedgers; + + try { + admin.namespaces().createNamespace(ns1); + } catch (Exception e) { + } + + final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis(); + // Create subscription + Consumer consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") + .receiverQueueSize(5).subscribe(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); + Field configField = ManagedCursorImpl.class.getDeclaredField("config"); + configField.setAccessible(true); + // Create multiple data-ledger + ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + config.setMaxEntriesPerLedger(entriesPerLedger); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + // bookkeeper client + Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper"); + bookKeeperField.setAccessible(true); + // Create multiple data-ledger + BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml); + + // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger + Producer producer = client.newProducer().topic(topic1).create(); + for (int i = 0; i < totalMessages; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + producer.close(); + + Message msg = null; + // (2) consume 20 messages from first ledger + for (int i = 0; i < entriesPerLedger; i++) { + msg = consumer.receive(); + consumer.acknowledge(msg); + } + consumer.close(); + PositionImpl markDeletePosition1 = (PositionImpl) cursor.getMarkDeletedPosition(); + + // (3) delete first 4 data-ledgers and clear cache + NavigableMap ledgerInfo = ml.getLedgersInfo(); + Entry lastLedger = ledgerInfo.lastEntry(); + ledgerInfo.entrySet().forEach(entry -> { + if (!entry.equals(lastLedger)) { + assertEquals(entry.getValue().getEntries(), entriesPerLedger); + try { + bookKeeper.deleteLedger(entry.getKey()); + } catch (Exception e) { + log.warn("failed to delete ledger {}", entry.getKey(), e); + } + } + }); + pulsar.getBrokerService().removeTopicFromCache(topic1); + ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); + Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + ConcurrentHashMap> ledgers = (ConcurrentHashMap>) field + .get(factory); + ledgers.clear(); + + // (4) consumer will be able to consume 20 messages from last non-deleted ledger + consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") + .receiverQueueSize(5).subscribe(); + for (int i = 0; i < entriesPerLedger; i++) { + msg = consumer.receive(); + consumer.acknowledge(msg); + } + consumer.close(); + + topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); + ml = (ManagedLedgerImpl) topic.getManagedLedger(); + cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); + PositionImpl markDeletePosition2 = (PositionImpl) cursor.getMarkDeletedPosition(); + // markDeletePosition moves forward + assertTrue(markDeletePosition2.compareTo(markDeletePosition1) > 0); + } + /** * It verifies broker-configuration using which broker can skip non-recoverable data-ledgers. * From 5c698056f5d32445bba40fbb0dae4b5b381d0a5c Mon Sep 17 00:00:00 2001 From: leolinchen Date: Wed, 21 Sep 2022 09:00:19 +0800 Subject: [PATCH 02/12] check style --- .../PersistentDispatcherSingleActiveConsumer.java | 7 ------- .../pulsar/broker/service/BrokerBkEnsemblesTests.java | 1 - 2 files changed, 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 1dcbad02a1597..7b7b856e915a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -20,11 +20,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; import io.netty.util.Recycler; -import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -40,8 +36,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; -import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.tuple.Pair; @@ -57,7 +51,6 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException; import org.apache.pulsar.client.impl.Backoff; -import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index aea388e9a53ec..7fea47f0808e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -38,7 +38,6 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; From 1df5552a508965de0cab972076f6e511e2327739 Mon Sep 17 00:00:00 2001 From: leolinchen Date: Sun, 25 Sep 2022 21:47:16 +0800 Subject: [PATCH 03/12] Reuse filterEntriesForConsumer method --- .../service/AbstractBaseDispatcher.java | 26 +++++-------------- ...PersistentDispatcherMultipleConsumers.java | 3 +-- ...sistentDispatcherSingleActiveConsumer.java | 3 +-- 3 files changed, 9 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 147033700a173..151cbd82f6ad3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -95,25 +95,6 @@ public int filterEntriesForConsumer(List entries, EntryBatchSiz isReplayRead, consumer); } - /** - * 1. Acknowledge skipped messages; - * 2. Filter out skipped messages; - */ - public List filterAndAcknowledgeSkippedEntry(List entries) { - List skippedPositions = new ArrayList<>(); - List filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> { - if (entry instanceof EntryImpl) { - if (((EntryImpl) entry).skipped()) { - skippedPositions.add(new PositionImpl(entry.getLedgerId(), entry.getEntryId())); - return false; - } - } - return true; - })); - subscription.acknowledgeMessage(skippedPositions, CommandAck.AckType.Individual, Collections.emptyMap()); - return filterEntries; - } - /** * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry. * @@ -141,6 +122,13 @@ public int filterEntriesForConsumer(Optional optMetadataArray if (entry == null) { continue; } + if (entry instanceof EntryImpl) { + EntryImpl entryImpl = (EntryImpl) entry; + if (entryImpl.skipped()) { + entriesToFiltered.add(entryImpl.getPosition()); + } + continue; + } ByteBuf metadataAndPayload = entry.getDataBuffer(); final int metadataIndex = i + startOffset; final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex]) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 5093ff394a94b..15b42fedd38ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -564,8 +564,7 @@ public final synchronized void readEntriesComplete(List entries, Object c protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List entries) { sendInProgress = true; try { - List filterEntries = filterAndAcknowledgeSkippedEntry(entries); - return trySendMessagesToConsumers(readType, filterEntries); + return trySendMessagesToConsumers(readType, entries); } finally { sendInProgress = false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 7b7b856e915a8..3ba7a82aa5e35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -146,8 +146,7 @@ protected void cancelPendingRead() { @Override public void readEntriesComplete(final List entries, Object obj) { topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> { - List filterEntries = filterAndAcknowledgeSkippedEntry(entries); - internalReadEntriesComplete(filterEntries, obj); + internalReadEntriesComplete(entries, obj); })); } From b667e013fcae8fe01df9136061911c42761dbaee Mon Sep 17 00:00:00 2001 From: leolinchen Date: Sun, 25 Sep 2022 21:49:35 +0800 Subject: [PATCH 04/12] check style --- .../apache/pulsar/broker/service/AbstractBaseDispatcher.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 151cbd82f6ad3..4bcdbc0df1176 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -19,8 +19,6 @@ package org.apache.pulsar.broker.service; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import java.util.ArrayList; import java.util.Collections; @@ -44,7 +42,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; From 52ee2caaa9d89d6d9b7bc50d0171f2f398db4665 Mon Sep 17 00:00:00 2001 From: leolinchen Date: Sun, 25 Sep 2022 21:53:24 +0800 Subject: [PATCH 05/12] check style --- .../apache/pulsar/broker/service/BrokerBkEnsemblesTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 7fea47f0808e2..fe43d18538629 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -19,7 +19,10 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.fail; import java.lang.reflect.Field; import java.util.Map.Entry; From 8ae5d560afba5054273cb484fc75a0cf26c7438f Mon Sep 17 00:00:00 2001 From: leolinchen Date: Tue, 27 Sep 2022 13:56:22 +0800 Subject: [PATCH 06/12] fix test --- .../apache/pulsar/broker/service/AbstractBaseDispatcher.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 4bcdbc0df1176..d6de265c02f73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -122,9 +122,12 @@ public int filterEntriesForConsumer(Optional optMetadataArray if (entry instanceof EntryImpl) { EntryImpl entryImpl = (EntryImpl) entry; if (entryImpl.skipped()) { + if (entriesToFiltered == null) { + entriesToFiltered = new ArrayList<>(); + } entriesToFiltered.add(entryImpl.getPosition()); + continue; } - continue; } ByteBuf metadataAndPayload = entry.getDataBuffer(); final int metadataIndex = i + startOffset; From ecf4b63bdfba89ece559f3382453912608ee97d6 Mon Sep 17 00:00:00 2001 From: leolinchen Date: Tue, 27 Sep 2022 19:41:36 +0800 Subject: [PATCH 07/12] 1.add final;2.use getter --- .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java | 6 +++--- .../org/apache/bookkeeper/mledger/impl/OpReadEntry.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index ae7251e5d853d..5ba5f91d646f3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -67,13 +67,13 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) { return entry; } - public static EntryImpl create(long ledgerId, long entryId, boolean skipped) { + public static EntryImpl createSkippedEntry(long ledgerId, long entryId) { EntryImpl entry = RECYCLER.get(); entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; - entry.skipped = skipped; - entry.data = Unpooled.wrappedBuffer(new byte[0]); + entry.skipped = true; + entry.data = Unpooled.EMPTY_BUFFER; entry.setRefCnt(1); return entry; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 9dc89cfc5b8e9..0369f04f3164a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -120,7 +120,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { PositionImpl startPosition = readPosition; PositionImpl endPosition = (PositionImpl) nexReadPosition; while (startPosition.compareTo(endPosition) < 0) { - skippedEntries.add(EntryImpl.create(startPosition.ledgerId, startPosition.entryId, true)); + skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId)); startPosition = cursor.ledger.getNextValidPosition(startPosition); } List filteredEntries = cursor.filterReadEntries(skippedEntries); From 59a1369112ac4137019e646ece1e8f1d87fb39c0 Mon Sep 17 00:00:00 2001 From: leolinchen Date: Tue, 27 Sep 2022 19:58:22 +0800 Subject: [PATCH 08/12] use getManagedLedger method --- .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java | 2 +- .../java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 5ba5f91d646f3..ac9622e0a0a59 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -42,8 +42,8 @@ protected EntryImpl newObject(Handle handle) { private long timestamp; private long ledgerId; private long entryId; - ByteBuf data; private boolean skipped; + ByteBuf data; public static EntryImpl create(LedgerEntry ledgerEntry) { EntryImpl entry = RECYCLER.get(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 0369f04f3164a..2c4b6d1603c68 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -116,12 +116,13 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { recycle(); return; } + final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger(); List skippedEntries = new ArrayList<>(); PositionImpl startPosition = readPosition; PositionImpl endPosition = (PositionImpl) nexReadPosition; while (startPosition.compareTo(endPosition) < 0) { skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId)); - startPosition = cursor.ledger.getNextValidPosition(startPosition); + startPosition = ledger.getNextValidPosition(startPosition); } List filteredEntries = cursor.filterReadEntries(skippedEntries); entries.addAll(filteredEntries); From ff79d7d9ca4f6718aec174cd17ac17c86029605a Mon Sep 17 00:00:00 2001 From: leolinchen Date: Tue, 27 Sep 2022 21:37:46 +0800 Subject: [PATCH 09/12] fix entry.release and entries.set(i,null) --- .../apache/pulsar/broker/service/AbstractBaseDispatcher.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index d6de265c02f73..8436f8503f773 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -126,6 +126,8 @@ public int filterEntriesForConsumer(Optional optMetadataArray entriesToFiltered = new ArrayList<>(); } entriesToFiltered.add(entryImpl.getPosition()); + entries.set(i, null); + entry.release(); continue; } } From a8500520e5b83fce6d05cd528c92492fcced4ac3 Mon Sep 17 00:00:00 2001 From: leolinchen Date: Wed, 28 Sep 2022 14:30:33 +0800 Subject: [PATCH 10/12] reset skipped when deallocate --- .../main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index ac9622e0a0a59..b840c36225060 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -188,6 +188,7 @@ protected void deallocate() { timestamp = -1; ledgerId = -1; entryId = -1; + skipped = false; recyclerHandle.recycle(this); } From 5beed9db096790518109ba14338153d79b1dfa10 Mon Sep 17 00:00:00 2001 From: leolinchen Date: Thu, 29 Sep 2022 16:50:05 +0800 Subject: [PATCH 11/12] add config maxAckEntryNumForAutoSkipNonRecoverableData --- .../bookkeeper/mledger/ManagedLedgerConfig.java | 11 +++++++++++ .../apache/bookkeeper/mledger/impl/OpReadEntry.java | 8 +++++++- .../apache/pulsar/broker/ServiceConfiguration.java | 9 ++++++++- .../apache/pulsar/broker/service/BrokerService.java | 2 ++ 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 24228d8d6d238..a5e3cfb3d69fd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -86,6 +86,8 @@ public class ManagedLedgerConfig { private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + private int maxAckEntryNumForAutoSkipNonRecoverableData = 10000; + public boolean isCreateIfMissing() { return createIfMissing; } @@ -95,6 +97,15 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) { return this; } + public int getMaxAckEntryNumForAutoSkipNonRecoverableData() { + return maxAckEntryNumForAutoSkipNonRecoverableData; + } + + public ManagedLedgerConfig setMaxAckEntryNumForAutoSkipNonRecoverableData(int maxAckEntryNum) { + this.maxAckEntryNumForAutoSkipNonRecoverableData = maxAckEntryNum; + return this; + } + /** * @return the lazyCursorRecovery */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 2c4b6d1603c68..5a040ce9a7db9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -116,13 +116,19 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { recycle(); return; } - final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger(); + + int toAckEntryNum = 0; List skippedEntries = new ArrayList<>(); PositionImpl startPosition = readPosition; PositionImpl endPosition = (PositionImpl) nexReadPosition; while (startPosition.compareTo(endPosition) < 0) { skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId)); startPosition = ledger.getNextValidPosition(startPosition); + toAckEntryNum++; + if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) { + nexReadPosition = startPosition; + break; + } } List filteredEntries = cursor.filterReadEntries(skippedEntries); entries.addAll(filteredEntries); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 6683d36c36e06..1111c93fa2607 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2008,7 +2008,14 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se doc = "Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.\n\n" + " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger." ) - private boolean autoSkipNonRecoverableData = false; + private boolean autoSkipNonRecoverableData = false; + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "When autoSkipNonRecoverableData=true, " + + "the upper limit of the number of entries skipped by automatic ack." + ) + private int maxAckEntryNumForAutoSkipNonRecoverableData = 10000; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "operation timeout while updating managed-ledger metadata." diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 0ff8bfac45741..cd1f1dfa6fde3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1663,6 +1663,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t } } + managedLedgerConfig.setMaxAckEntryNumForAutoSkipNonRecoverableData( + serviceConfig.getMaxAckEntryNumForAutoSkipNonRecoverableData()); managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); From 51cf7f0d573b7507a542be5a1c7346c3afbda68c Mon Sep 17 00:00:00 2001 From: LinChen <1572139390@qq.com> Date: Thu, 29 Sep 2022 19:55:18 +0800 Subject: [PATCH 12/12] Update pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java Co-authored-by: JiangHaiting --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1111c93fa2607..13fbcbb785dfd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2008,7 +2008,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se doc = "Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.\n\n" + " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger." ) - private boolean autoSkipNonRecoverableData = false; + private boolean autoSkipNonRecoverableData = false; @FieldContext( dynamic = true, category = CATEGORY_STORAGE_ML,