Skip to content

Commit cb82141

Browse files
authored
Fix ack non-persistent topic will be blocked. (#240)
1 parent fc6dbe5 commit cb82141

2 files changed

Lines changed: 30 additions & 3 deletions

File tree

lib/AckGroupingTracker.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,25 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
7171
* @param[in] msgId ID of the message to be ACKed.
7272
* @param[in] callback the callback that is triggered when the message is acknowledged
7373
*/
74-
virtual void addAcknowledge(const MessageId& msgId, ResultCallback callback) {}
74+
virtual void addAcknowledge(const MessageId& msgId, ResultCallback callback) { callback(ResultOk); }
7575

7676
/**
7777
* Adding message ID list into ACK group for individual ACK.
7878
* @param[in] msgIds of the message to be ACKed.
7979
* @param[in] callback the callback that is triggered when the messages are acknowledged
8080
*/
81-
virtual void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback callback) {}
81+
virtual void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback callback) {
82+
callback(ResultOk);
83+
}
8284

8385
/**
8486
* Adding message ID into ACK group for cumulative ACK.
8587
* @param[in] msgId ID of the message to be ACKed.
8688
* @param[in] callback the callback that is triggered when the message is acknowledged
8789
*/
88-
virtual void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {}
90+
virtual void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {
91+
callback(ResultOk);
92+
}
8993

9094
/**
9195
* Flush all the pending grouped ACKs (as flush() does), and stop period ACKs sending.

tests/ConsumerTest.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,29 @@ TEST(ConsumerTest, testNegativeAcksTrackerClose) {
11181118
client.close();
11191119
}
11201120

1121+
TEST(ConsumerTest, testAckNotPersistentTopic) {
1122+
Client client(lookupUrl);
1123+
auto topicName = "non-persistent://public/default/testAckNotPersistentTopic";
1124+
1125+
Consumer consumer;
1126+
client.subscribe(topicName, "test-sub", consumer);
1127+
1128+
Producer producer;
1129+
client.createProducer(topicName, producer);
1130+
1131+
for (int i = 0; i < 10; ++i) {
1132+
producer.send(MessageBuilder().setContent(std::to_string(i)).build());
1133+
}
1134+
1135+
Message msg;
1136+
for (int i = 0; i < 10; ++i) {
1137+
ASSERT_EQ(ResultOk, consumer.receive(msg));
1138+
ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
1139+
}
1140+
1141+
client.close();
1142+
}
1143+
11211144
INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
11221145

11231146
} // namespace pulsar

0 commit comments

Comments
 (0)