From 316d0ee1d340d97c845dfa064c1a25979c6899b2 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 16:37:48 -0700 Subject: [PATCH] [fix][test] Fix flaky AdminApiTest.persistentTopicsCursorResetAfterReset timeout The test calls `consumer.receive()` (no timeout) inside a loop after `admin.topics().resetCursor(...)`. If the broker doesn't push the expected redelivery in time, the consumer blocks indefinitely on `GrowableArrayBlockingQueue.take()` until the 5-minute test timeout fires: ``` ThreadTimeoutException: Method o.a.p.b.admin.AdminApiTest.persistentTopicsCursorResetAfterReset() didn't finish within the time-out 300000 at j.u.c.locks.LockSupport.park(LockSupport.java:371) at o.a.p.c.u.collections.GrowableArrayBlockingQueue.take(...) at o.a.p.c.impl.ConsumerImpl.internalReceive(ConsumerImpl.java:531) at o.a.p.c.impl.ConsumerBase.receive(ConsumerBase.java:282) at AdminApiTest.persistentTopicsCursorResetAfterReset(:2945) ``` Switch every `consumer.receive()` in this test to `receive(30, TimeUnit.SECONDS)` and assert the message is non-null with a descriptive message. Now if the redelivery genuinely doesn't arrive, the test fails fast (in 30s) with a clear diagnostic instead of hanging for the full 5-minute timeout. --- .../org/apache/pulsar/broker/admin/AdminApiTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index f01eb69ea4b9e..60375f9d335ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2932,7 +2932,8 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep }); for (int i = 0; i < 10; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(30, TimeUnit.SECONDS); + assertNotNull(message, "Failed to receive initial message-" + i); consumer.acknowledge(message); } @@ -2942,7 +2943,9 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep // Should received messages from 5-9 for (int i = 5; i < 10; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(30, TimeUnit.SECONDS); + assertNotNull(message, + "Failed to receive message-" + i + " after first resetCursor (firstTimestamp)"); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; @@ -2956,7 +2959,9 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep // Should received messages from 8-9 for (int i = 8; i < 10; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(30, TimeUnit.SECONDS); + assertNotNull(message, + "Failed to receive message-" + i + " after second resetCursor (secondTimestamp)"); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i;