diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 7f6994191875e..9753e68dfb16d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -21,6 +21,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask; +import static org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.newInFlightTaskCtx; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleGaugeValue; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.mockito.ArgumentMatchers.eq; @@ -663,7 +664,9 @@ public void testReplicatorClearBacklog() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); + Object inFlightTaskCtx = newInFlightTaskCtx(replicator, PositionFactory.create(1, 1), 1); + replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), + inFlightTaskCtx); replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage @@ -693,7 +696,9 @@ public void testReplicatorExpireMsgAsync() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); + Object inFlightTaskCtx = newInFlightTaskCtx(replicator, PositionFactory.create(1, 1), 1); + replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), + inFlightTaskCtx); replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java index 8c26aece76440..c52b9c6896b75 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.Position; import org.awaitility.Awaitility; public class BrokerServicePersistInternalMethodInvoker { @@ -42,4 +43,8 @@ public static void ensureNoBacklogByInflightTask(PersistentReplicator replicator return true; }); } + + public static Object newInFlightTaskCtx(PersistentReplicator replicator, Position readPos, int readingEntries) { + return new PersistentReplicator.InFlightTask(readPos, readingEntries, replicator.getReplicatorId()); + } }