diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index ef6a03d107040..0a2fd37182204 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -134,6 +134,11 @@ slf4j-api + + it.unimi.dsi + fastutil + + diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java index 481619b8db7a6..1a12d70a6ecc3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java @@ -48,6 +48,7 @@ public static Position create(long ledgerId, long entryId) { return new ImmutablePositionImpl(ledgerId, entryId); } + /** * Create a new position or returns the other instance if it's immutable. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7009ac750fe15..0ab4f48d91497 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -121,7 +121,7 @@ @SuppressWarnings("checkstyle:javadoctype") public class ManagedCursorImpl implements ManagedCursor { - private static final Comparator ENTRY_COMPARATOR = (e1, e2) -> { + static final Comparator ENTRY_COMPARATOR = (e1, e2) -> { if (e1.getLedgerId() != e2.getLedgerId()) { return e1.getLedgerId() < e2.getLedgerId() ? -1 : 1; } @@ -3928,6 +3928,9 @@ public long[] getBatchPositionAckSet(Position position) { public Position getNextAvailablePosition(Position position) { lock.readLock().lock(); try { + if (individualDeletedMessages.isEmpty()) { + return ledger.getNextValidPosition(position); + } Range range = individualDeletedMessages.rangeContaining(position.getLedgerId(), position.getEntryId()); if (range != null) { @@ -3935,7 +3938,7 @@ public Position getNextAvailablePosition(Position position) { return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext(); } - return position.getNext(); + return ledger.getNextValidPosition(position); } finally { lock.readLock().unlock(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ce0bf59edc70f..0f5ec9a6b8e2f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -31,6 +31,9 @@ import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; +import it.unimi.dsi.fastutil.longs.LongAVLTreeSet; +import it.unimi.dsi.fastutil.longs.LongLongPair; +import it.unimi.dsi.fastutil.longs.LongSortedSet; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; @@ -2373,43 +2376,153 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); - // Filer out and skip unnecessary read entry - if (opReadEntry.skipCondition != null) { - long firstValidEntry = -1L; - long lastValidEntry = -1L; - long entryId = firstEntry; - for (; entryId <= lastEntry; entryId++) { - if (opReadEntry.skipCondition.test(PositionFactory.create(ledger.getId(), entryId))) { - if (firstValidEntry != -1L) { - break; - } - } else { - if (firstValidEntry == -1L) { - firstValidEntry = entryId; - } + Predicate skipCondition = opReadEntry.skipCondition; + if (skipCondition == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, + lastEntry); + } + asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); + return; + } - lastValidEntry = entryId; - } + // Skip entries that don't match the predicate + LongSortedSet entryIds = new LongAVLTreeSet(); + MutablePositionImpl position = new MutablePositionImpl(); + for (long entryId = firstEntry; entryId <= lastEntry; entryId++) { + position.changePositionTo(ledger.getId(), entryId); + if (skipCondition.test(position)) { + continue; } + entryIds.add(entryId); + } - // If all messages in [firstEntry...lastEntry] are filter out, - // then manual call internalReadEntriesComplete to advance read position. - if (firstValidEntry == -1L) { - final var nextReadPosition = PositionFactory.create(ledger.getId(), lastEntry).getNext(); - opReadEntry.updateReadPosition(nextReadPosition); - opReadEntry.checkReadCompletion(); - return; + Position lastReadPosition = PositionFactory.create(ledger.getId(), lastEntry); + if (entryIds.isEmpty()) { + final var nextReadPosition = lastReadPosition.getNext(); + opReadEntry.updateReadPosition(nextReadPosition); + opReadEntry.checkReadCompletion(); + return; + } + + List ranges = toRanges(entryIds); + ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry, lastReadPosition); + for (LongLongPair pair : ranges) { + long start = pair.firstLong(); + long end = pair.secondLong(); + asyncReadEntry(ledger, start, end, opReadEntry.cursor, callback, opReadEntry.ctx); + } + } + + @VisibleForTesting + public static List toRanges(LongSortedSet entryIds) { + List ranges = new ArrayList<>(); + long start = entryIds.firstLong(); + long end = start; + for (long entryId : entryIds) { + if (entryId - end > 1) { + ranges.add(LongLongPair.of(start, end)); + start = entryId; + end = start; + } else { + end = entryId; } + } + ranges.add(LongLongPair.of(start, end)); + return ranges; + } + + @VisibleForTesting + public static class BatchReadEntriesCallback implements ReadEntriesCallback { + private final LongSortedSet entryIds; + private final List entries; + private final OpReadEntry callback; + private volatile boolean completed = false; + private final Position lastReadPosition; - firstEntry = firstValidEntry; - lastEntry = lastValidEntry; + @VisibleForTesting + public BatchReadEntriesCallback(LongSortedSet entryIdSet, OpReadEntry callback, Position lastReadPosition) { + this.entryIds = entryIdSet; + this.entries = new ArrayList<>(entryIdSet.size()); + this.callback = callback; + this.lastReadPosition = lastReadPosition; } - if (log.isDebugEnabled()) { - log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, - lastEntry); + @Override + public synchronized void readEntriesComplete(List entries0, Object ctx) { + if (completed) { + for (Entry entry : entries0) { + entry.release(); + } + return; + } + entries.addAll(entries0); + // If read empty batch from Bookie, we have to complete the call. + // Otherwise, it maybe blocks forever, see: PR 24515. + if (entries.size() < entryIds.size() && !entries0.isEmpty()) { + return; + } + completed = true; + // Make sure the entries are in the correct order + entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR); + // If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3], [4,5] are filtered, so we need to pass + // the `lastReadPosition([5])` to make sure the cursor read position is correct. + // If we pass nonnull `lastReadPosition` to call if the entries0.isEmpty, it will skip some entries. + callback.internalReadEntriesComplete(entries, entries0.isEmpty() ? null : lastReadPosition); + } + + @Override + public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + if (completed) { + return; + } + completed = true; + // If there are entries been read success, try to let the read operation success as possible. + List entries = filterEntries(); + if (!entries.isEmpty()) { + // Move the read position of the cursor to the next position of the last read entry, + // or we will deliver the same entry to the consumer more than once. + Entry entry = entries.get(entries.size() - 1); + Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId()); + Position nextReadPosition = callback.cursor.getNextAvailablePosition(position); + callback.updateReadPosition(nextReadPosition); + } + callback.internalReadEntriesFailed(entries, exception, ctx); + } + + /** + * Filter the entries that have been read success. + *

+ * If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5] successfully, [3] is read failed, + * only return [1,2] to the caller, to make sure the read operation success as possible + * and keep the ordering guarantee. + * + * @return filtered entries + */ + private List filterEntries() { + if (entries.isEmpty()) { + return Collections.emptyList(); + } + entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR); + List entries0 = new ArrayList<>(); + for (long entryId : entryIds) { + if (this.entries.isEmpty()) { + break; + } + Entry entry = this.entries.remove(0); + if (entry.getEntryId() == entryId) { + entries0.add(entry); + } else { + entry.release(); + break; + } + } + // Release the entries that are not in the result. + for (Entry entry : entries) { + entry.release(); + } + return entries0; } - asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); } protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) { @@ -2443,6 +2556,22 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry } } + protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, ManagedCursorImpl cursor, + ReadEntriesCallback callback, Object ctx) { + IntSupplier expectedReadCount = cursor::getNumberOfCursorsAtSamePositionOrBefore; + if (config.getReadEntryTimeoutSeconds() > 0) { + // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled + long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); + long createdTime = System.nanoTime(); + ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, + callback, readOpCount, createdTime, ctx); + lastReadCallback = readCallback; + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount); + } else { + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, callback, ctx); + } + } + static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback { volatile ReadEntryCallback readEntryCallback; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java new file mode 100644 index 0000000000000..c4584da798c48 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import org.apache.bookkeeper.mledger.Position; + +final class MutablePositionImpl implements Position { + + private volatile long ledgerId; + private volatile long entryId; + + MutablePositionImpl(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; + } + + MutablePositionImpl() { + this.ledgerId = -1; + this.entryId = -1; + } + + /** + * Change the ledgerId and entryId. + * + * @param ledgerId + * @param entryId + */ + public void changePositionTo(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; + } + + @Override + public long getLedgerId() { + return ledgerId; + } + + @Override + public long getEntryId() { + return entryId; + } + + /** + * String representation of virtual cursor - LedgerId:EntryId. + */ + @Override + public String toString() { + return ledgerId + ":" + entryId; + } + + @Override + public int hashCode() { + return hashCodeForPosition(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Position && compareTo((Position) obj) == 0; + } + +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index b618a25aa3d75..26df4895e4314 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -21,6 +21,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -31,6 +32,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,8 +81,8 @@ public static OpReadEntry create(ManagedCursorImpl cursor, Position readPosition return op; } - private void internalReadEntriesComplete(List returnedEntries) { - if (returnedEntries.isEmpty()) { + void internalReadEntriesComplete(List returnedEntries, Position lastReadPosition) { + if (returnedEntries.isEmpty() && lastReadPosition == null) { log.warn("[{}] Read no entries unexpectedly", this); checkReadCompletion(); return; @@ -99,13 +101,13 @@ private void internalReadEntriesComplete(List returnedEntries) { } // Entries might be released after `filterReadEntries`, so retrieve the last position before that - final var lastPosition = returnedEntries.get(entriesCount - 1).getPosition(); + final var lastPosition = lastReadPosition != null ? lastReadPosition + : returnedEntries.get(entriesCount - 1).getPosition(); final var filteredEntries = cursor.filterReadEntries(returnedEntries); entries.addAll(filteredEntries); // if entries have been filtered out then try to skip reading of already deletedMessages in that range - final Position nexReadPosition = entriesCount != filteredEntries.size() - ? cursor.getNextAvailablePosition(lastPosition) : lastPosition.getNext(); + final Position nexReadPosition = cursor.getNextAvailablePosition(lastPosition); updateReadPosition(nexReadPosition); checkReadCompletion(); } @@ -113,7 +115,7 @@ private void internalReadEntriesComplete(List returnedEntries) { @Override public void readEntriesComplete(List returnedEntries, Object ctx) { try { - internalReadEntriesComplete(returnedEntries); + internalReadEntriesComplete(returnedEntries, null); } catch (Throwable throwable) { log.error("[{}] Fallback to readEntriesFailed for exception in readEntriesComplete", this, throwable); readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx); @@ -123,16 +125,18 @@ public void readEntriesComplete(List returnedEntries, Object ctx) { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { try { - internalReadEntriesFailed(exception, ctx); + internalReadEntriesFailed(null, exception, ctx); } catch (Throwable throwable) { // At least we should complete the callback fail(ManagedLedgerException.getManagedLedgerException(throwable), ctx); } } - private void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) { + void internalReadEntriesFailed(Collection ret, ManagedLedgerException exception, Object ctx) { cursor.readOperationCompleted(); - + if (CollectionUtils.isNotEmpty(ret)) { + entries.addAll(ret); + } if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them complete(ctx); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 522e12903201c..61da2a770d1df 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -5011,7 +5011,7 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception { }) .when(ledger) .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), - Mockito.anyLong(), Mockito.any(), Mockito.any()); + Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -5117,7 +5117,7 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws }) .when(ledger) .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), - Mockito.anyLong(), Mockito.any(), Mockito.any()); + Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 32f7bf6b77956..514b30ae770c7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -48,6 +48,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.concurrent.DefaultThreadFactory; +import it.unimi.dsi.fastutil.longs.LongAVLTreeSet; +import it.unimi.dsi.fastutil.longs.LongLongPair; +import it.unimi.dsi.fastutil.longs.LongSortedSet; import java.lang.reflect.Field; import java.nio.ReadOnlyBufferException; import java.nio.charset.Charset; @@ -5337,4 +5340,193 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), expectedProperties); } + + @Test + public void testToRanges() { + LongSortedSet set = new LongAVLTreeSet(); + set.add(1L); + set.add(2L); + set.add(4L); + set.add(6L); + set.add(7L); + set.add(8L); + set.add(10L); + + List ranges = ManagedLedgerImpl.toRanges(set); + assertEquals(ranges.size(), 4); + + LongLongPair pair0 = ranges.get(0); + assertEquals(pair0.firstLong(), 1L); + assertEquals(pair0.secondLong(), 2L); + + LongLongPair pair1 = ranges.get(1); + assertEquals(pair1.firstLong(), 4L); + assertEquals(pair1.secondLong(), 4L); + + LongLongPair pair2 = ranges.get(2); + assertEquals(pair2.firstLong(), 6L); + assertEquals(pair2.secondLong(), 8L); + + LongLongPair pair3 = ranges.get(3); + assertEquals(pair3.firstLong(), 10L); + assertEquals(pair3.secondLong(), 10L); + } + + + @Test + public void testBatchReadEntriesCallback() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testBatchReadEntriesCallback"); + @Cleanup + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor"); + for (int i = 0; i < 10; i++) { + ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + List entries = new ArrayList<>(); + OpReadEntry opReadEntry = OpReadEntry.create(cursor, cursor.readPosition, 10, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries0, Object ctx) { + entries.addAll(entries0); + latch.countDown(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, ledger.lastConfirmedEntry, position -> position.getEntryId() % 2 == 0, true); + + LongSortedSet entryIds = new LongAVLTreeSet(); + entryIds.add(1L); + entryIds.add(3L); + entryIds.add(5L); + entryIds.add(7L); + entryIds.add(9L); + ManagedLedgerImpl.BatchReadEntriesCallback callback = new ManagedLedgerImpl + .BatchReadEntriesCallback(entryIds, opReadEntry, null); + long ledgerId = ledger.currentLedger.getId(); + + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null); + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[3])), null); + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 7, new byte[7])), null); + callback.readEntriesFailed( + new ManagedLedgerException.InvalidCursorPositionException("Invalid cursor position"), null); + // After call readEntriesFailed, the following readEntriesComplete should be ignored. + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[5])), null); + + latch.await(); + // should not fail + assertFalse(failed.get()); + assertEquals(entries.size(), 2); + + // `entries` should be only the entries with entryId 1 and 3. + assertEquals(entries.get(0).getEntryId(), 1); + assertEquals(entries.get(1).getEntryId(), 3); + + // ReadPosition should be updated to [4] + assertEquals(cursor.getReadPosition().getEntryId(), 4); + } + + @Test + public void testReadEntriesFromDifferentLedgersWithSkipCondition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(5); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config); + ledger = Mockito.spy(ledger); + + AtomicInteger counter = new AtomicInteger(); + Mockito.doAnswer(inv -> { + counter.incrementAndGet(); + return inv.callRealMethod(); + }).when(ledger).asyncReadEntries(Mockito.any()); + @Cleanup + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor"); + + Position lastPosition = null; + for (int i = 0; i < 12; i++) { + lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + List entries = new ArrayList<>(); + cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries0, Object ctx) { + entries.addAll(entries0); + latch.countDown(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0); + + latch.await(); + assertFalse(failed.get()); + assertEquals(entries.size(), 5); + // Read entries from 3 ledgers, the counter is 3. + assertEquals(counter.get(), 3); + Position readPosition = cursor.getReadPosition(); + assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId() + && readPosition.getEntryId() == lastPosition.getEntryId() + 1); + } + + @Test + public void testReadEntriesFromOneSameLedgerWithSkipCondition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config); + ledger = Mockito.spy(ledger); + + AtomicInteger counter = new AtomicInteger(); + Mockito.doAnswer(inv -> { + counter.incrementAndGet(); + return inv.callRealMethod(); + }).when(ledger).asyncReadEntries(Mockito.any()); + + @Cleanup + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor"); + + Position lastPosition = null; + for (int i = 0; i < 10; i++) { + lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + List entries = new ArrayList<>(); + cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries0, Object ctx) { + entries.addAll(entries0); + latch.countDown(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0); + + latch.await(); + assertEquals(counter.get(), 1); + + assertFalse(failed.get()); + assertEquals(entries.size(), 5); + + Position readPosition = cursor.getReadPosition(); + assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId() + && readPosition.getEntryId() == lastPosition.getEntryId() + 1); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 962749fbd49e9..4577db8c4167e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -218,7 +218,7 @@ public void testRedelivery(boolean useOpenRangeSet) throws Exception { assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0); // markDelete position should be one position behind read position - assertEquals(cursor.getReadPosition(), cursor.getMarkDeletedPosition().getNext()); + assertEquals(cursor.getReadPosition(), cursor.getNextAvailablePosition(cursor.getMarkDeletedPosition())); producer.close(); consumer2.close();