diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index f01eb69ea4b9e..60375f9d335ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2932,7 +2932,8 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep }); for (int i = 0; i < 10; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(30, TimeUnit.SECONDS); + assertNotNull(message, "Failed to receive initial message-" + i); consumer.acknowledge(message); } @@ -2942,7 +2943,9 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep // Should received messages from 5-9 for (int i = 5; i < 10; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(30, TimeUnit.SECONDS); + assertNotNull(message, + "Failed to receive message-" + i + " after first resetCursor (firstTimestamp)"); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; @@ -2956,7 +2959,9 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep // Should received messages from 8-9 for (int i = 8; i < 10; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(30, TimeUnit.SECONDS); + assertNotNull(message, + "Failed to receive message-" + i + " after second resetCursor (secondTimestamp)"); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i;