entries = filterEntries();
+ if (!entries.isEmpty()) {
+ // Move the read position of the cursor to the next position of the last read entry,
+ // or we will deliver the same entry to the consumer more than once.
+ Entry entry = entries.get(entries.size() - 1);
+ Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
+ Position nextReadPosition = callback.cursor.getNextAvailablePosition(position);
+ callback.updateReadPosition(nextReadPosition);
+ }
+ callback.internalReadEntriesFailed(entries, exception, ctx);
+ }
+
+ /**
+ * Filter the entries that have been read success.
+ *
+ * If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5] successfully, [3] is read failed,
+ * only return [1,2] to the caller, to make sure the read operation success as possible
+ * and keep the ordering guarantee.
+ *
+ * @return filtered entries
+ */
+ private List filterEntries() {
+ if (entries.isEmpty()) {
+ return Collections.emptyList();
+ }
+ entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR);
+ List entries0 = new ArrayList<>();
+ for (long entryId : entryIds) {
+ if (this.entries.isEmpty()) {
+ break;
+ }
+ Entry entry = this.entries.remove(0);
+ if (entry.getEntryId() == entryId) {
+ entries0.add(entry);
+ } else {
+ entry.release();
+ break;
+ }
+ }
+ // Release the entries that are not in the result.
+ for (Entry entry : entries) {
+ entry.release();
+ }
+ return entries0;
}
- asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}
protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) {
@@ -2443,6 +2556,22 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry
}
}
+ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, ManagedCursorImpl cursor,
+ ReadEntriesCallback callback, Object ctx) {
+ IntSupplier expectedReadCount = cursor::getNumberOfCursorsAtSamePositionOrBefore;
+ if (config.getReadEntryTimeoutSeconds() > 0) {
+ // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
+ long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+ long createdTime = System.nanoTime();
+ ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
+ callback, readOpCount, createdTime, ctx);
+ lastReadCallback = readCallback;
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount);
+ } else {
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, callback, ctx);
+ }
+ }
+
static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {
volatile ReadEntryCallback readEntryCallback;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java
new file mode 100644
index 0000000000000..c4584da798c48
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import org.apache.bookkeeper.mledger.Position;
+
+final class MutablePositionImpl implements Position {
+
+ private volatile long ledgerId;
+ private volatile long entryId;
+
+ MutablePositionImpl(long ledgerId, long entryId) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ }
+
+ MutablePositionImpl() {
+ this.ledgerId = -1;
+ this.entryId = -1;
+ }
+
+ /**
+ * Change the ledgerId and entryId.
+ *
+ * @param ledgerId
+ * @param entryId
+ */
+ public void changePositionTo(long ledgerId, long entryId) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ }
+
+ @Override
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ @Override
+ public long getEntryId() {
+ return entryId;
+ }
+
+ /**
+ * String representation of virtual cursor - LedgerId:EntryId.
+ */
+ @Override
+ public String toString() {
+ return ledgerId + ":" + entryId;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCodeForPosition();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Position && compareTo((Position) obj) == 0;
+ }
+
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index b618a25aa3d75..26df4895e4314 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -21,6 +21,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
@@ -31,6 +32,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,8 +81,8 @@ public static OpReadEntry create(ManagedCursorImpl cursor, Position readPosition
return op;
}
- private void internalReadEntriesComplete(List returnedEntries) {
- if (returnedEntries.isEmpty()) {
+ void internalReadEntriesComplete(List returnedEntries, Position lastReadPosition) {
+ if (returnedEntries.isEmpty() && lastReadPosition == null) {
log.warn("[{}] Read no entries unexpectedly", this);
checkReadCompletion();
return;
@@ -99,13 +101,13 @@ private void internalReadEntriesComplete(List returnedEntries) {
}
// Entries might be released after `filterReadEntries`, so retrieve the last position before that
- final var lastPosition = returnedEntries.get(entriesCount - 1).getPosition();
+ final var lastPosition = lastReadPosition != null ? lastReadPosition
+ : returnedEntries.get(entriesCount - 1).getPosition();
final var filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);
// if entries have been filtered out then try to skip reading of already deletedMessages in that range
- final Position nexReadPosition = entriesCount != filteredEntries.size()
- ? cursor.getNextAvailablePosition(lastPosition) : lastPosition.getNext();
+ final Position nexReadPosition = cursor.getNextAvailablePosition(lastPosition);
updateReadPosition(nexReadPosition);
checkReadCompletion();
}
@@ -113,7 +115,7 @@ private void internalReadEntriesComplete(List returnedEntries) {
@Override
public void readEntriesComplete(List returnedEntries, Object ctx) {
try {
- internalReadEntriesComplete(returnedEntries);
+ internalReadEntriesComplete(returnedEntries, null);
} catch (Throwable throwable) {
log.error("[{}] Fallback to readEntriesFailed for exception in readEntriesComplete", this, throwable);
readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx);
@@ -123,16 +125,18 @@ public void readEntriesComplete(List returnedEntries, Object ctx) {
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
try {
- internalReadEntriesFailed(exception, ctx);
+ internalReadEntriesFailed(null, exception, ctx);
} catch (Throwable throwable) {
// At least we should complete the callback
fail(ManagedLedgerException.getManagedLedgerException(throwable), ctx);
}
}
- private void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ void internalReadEntriesFailed(Collection ret, ManagedLedgerException exception, Object ctx) {
cursor.readOperationCompleted();
-
+ if (CollectionUtils.isNotEmpty(ret)) {
+ entries.addAll(ret);
+ }
if (!entries.isEmpty()) {
// There were already some entries that were read before, we can return them
complete(ctx);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 522e12903201c..61da2a770d1df 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -5011,7 +5011,7 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception {
})
.when(ledger)
.asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(),
- Mockito.anyLong(), Mockito.any(), Mockito.any());
+ Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");
@@ -5117,7 +5117,7 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws
})
.when(ledger)
.asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(),
- Mockito.anyLong(), Mockito.any(), Mockito.any());
+ Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 32f7bf6b77956..514b30ae770c7 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -48,6 +48,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
+import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
+import it.unimi.dsi.fastutil.longs.LongLongPair;
+import it.unimi.dsi.fastutil.longs.LongSortedSet;
import java.lang.reflect.Field;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;
@@ -5337,4 +5340,193 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
// Verify properties are preserved after cursor reset
assertEquals(cursor.getProperties(), expectedProperties);
}
+
+ @Test
+ public void testToRanges() {
+ LongSortedSet set = new LongAVLTreeSet();
+ set.add(1L);
+ set.add(2L);
+ set.add(4L);
+ set.add(6L);
+ set.add(7L);
+ set.add(8L);
+ set.add(10L);
+
+ List ranges = ManagedLedgerImpl.toRanges(set);
+ assertEquals(ranges.size(), 4);
+
+ LongLongPair pair0 = ranges.get(0);
+ assertEquals(pair0.firstLong(), 1L);
+ assertEquals(pair0.secondLong(), 2L);
+
+ LongLongPair pair1 = ranges.get(1);
+ assertEquals(pair1.firstLong(), 4L);
+ assertEquals(pair1.secondLong(), 4L);
+
+ LongLongPair pair2 = ranges.get(2);
+ assertEquals(pair2.firstLong(), 6L);
+ assertEquals(pair2.secondLong(), 8L);
+
+ LongLongPair pair3 = ranges.get(3);
+ assertEquals(pair3.firstLong(), 10L);
+ assertEquals(pair3.secondLong(), 10L);
+ }
+
+
+ @Test
+ public void testBatchReadEntriesCallback() throws Exception {
+ @Cleanup
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("testBatchReadEntriesCallback");
+ @Cleanup
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor");
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ List entries = new ArrayList<>();
+ OpReadEntry opReadEntry = OpReadEntry.create(cursor, cursor.readPosition, 10, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List entries0, Object ctx) {
+ entries.addAll(entries0);
+ latch.countDown();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, ledger.lastConfirmedEntry, position -> position.getEntryId() % 2 == 0, true);
+
+ LongSortedSet entryIds = new LongAVLTreeSet();
+ entryIds.add(1L);
+ entryIds.add(3L);
+ entryIds.add(5L);
+ entryIds.add(7L);
+ entryIds.add(9L);
+ ManagedLedgerImpl.BatchReadEntriesCallback callback = new ManagedLedgerImpl
+ .BatchReadEntriesCallback(entryIds, opReadEntry, null);
+ long ledgerId = ledger.currentLedger.getId();
+
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null);
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[3])), null);
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 7, new byte[7])), null);
+ callback.readEntriesFailed(
+ new ManagedLedgerException.InvalidCursorPositionException("Invalid cursor position"), null);
+ // After call readEntriesFailed, the following readEntriesComplete should be ignored.
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[5])), null);
+
+ latch.await();
+ // should not fail
+ assertFalse(failed.get());
+ assertEquals(entries.size(), 2);
+
+ // `entries` should be only the entries with entryId 1 and 3.
+ assertEquals(entries.get(0).getEntryId(), 1);
+ assertEquals(entries.get(1).getEntryId(), 3);
+
+ // ReadPosition should be updated to [4]
+ assertEquals(cursor.getReadPosition().getEntryId(), 4);
+ }
+
+ @Test
+ public void testReadEntriesFromDifferentLedgersWithSkipCondition() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(5);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config);
+ ledger = Mockito.spy(ledger);
+
+ AtomicInteger counter = new AtomicInteger();
+ Mockito.doAnswer(inv -> {
+ counter.incrementAndGet();
+ return inv.callRealMethod();
+ }).when(ledger).asyncReadEntries(Mockito.any());
+ @Cleanup
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor");
+
+ Position lastPosition = null;
+ for (int i = 0; i < 12; i++) {
+ lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ List entries = new ArrayList<>();
+ cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List entries0, Object ctx) {
+ entries.addAll(entries0);
+ latch.countDown();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0);
+
+ latch.await();
+ assertFalse(failed.get());
+ assertEquals(entries.size(), 5);
+ // Read entries from 3 ledgers, the counter is 3.
+ assertEquals(counter.get(), 3);
+ Position readPosition = cursor.getReadPosition();
+ assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId()
+ && readPosition.getEntryId() == lastPosition.getEntryId() + 1);
+ }
+
+ @Test
+ public void testReadEntriesFromOneSameLedgerWithSkipCondition() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config);
+ ledger = Mockito.spy(ledger);
+
+ AtomicInteger counter = new AtomicInteger();
+ Mockito.doAnswer(inv -> {
+ counter.incrementAndGet();
+ return inv.callRealMethod();
+ }).when(ledger).asyncReadEntries(Mockito.any());
+
+ @Cleanup
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor");
+
+ Position lastPosition = null;
+ for (int i = 0; i < 10; i++) {
+ lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ List entries = new ArrayList<>();
+ cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List entries0, Object ctx) {
+ entries.addAll(entries0);
+ latch.countDown();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0);
+
+ latch.await();
+ assertEquals(counter.get(), 1);
+
+ assertFalse(failed.get());
+ assertEquals(entries.size(), 5);
+
+ Position readPosition = cursor.getReadPosition();
+ assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId()
+ && readPosition.getEntryId() == lastPosition.getEntryId() + 1);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
index 962749fbd49e9..4577db8c4167e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
@@ -218,7 +218,7 @@ public void testRedelivery(boolean useOpenRangeSet) throws Exception {
assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0);
// markDelete position should be one position behind read position
- assertEquals(cursor.getReadPosition(), cursor.getMarkDeletedPosition().getNext());
+ assertEquals(cursor.getReadPosition(), cursor.getNextAvailablePosition(cursor.getMarkDeletedPosition()));
producer.close();
consumer2.close();