Skip to content

Commit c12340a

Browse files
committed
Revert stats related changes
1 parent bba4802 commit c12340a

6 files changed

Lines changed: 8 additions & 13 deletions

lib/ConsumerImpl.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,8 +1036,7 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCall
10361036
const auto& msgIdToAck = pair.first;
10371037
const auto& readyToAck = pair.second;
10381038
if (readyToAck) {
1039-
int numRemoved = unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
1040-
consumerStatsBasePtr_->messageAcknowledged(ResultOk, CommandAck_AckType_Cumulative, numRemoved);
1039+
unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
10411040
ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck);
10421041
}
10431042
if (callback) {

lib/UnAckedMessageTrackerDisabled.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class UnAckedMessageTrackerDisabled : public UnAckedMessageTrackerInterface {
2626
bool add(const MessageId& m) { return false; }
2727
bool remove(const MessageId& m) { return false; }
2828
void remove(const MessageIdList& msgIds) {}
29-
int removeMessagesTill(const MessageId& msgId) { return 0; }
29+
void removeMessagesTill(const MessageId& msgId) {}
3030
void removeTopicMessage(const std::string& topic) {}
3131

3232
void clear() {}

lib/UnAckedMessageTrackerEnabled.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,17 @@ long UnAckedMessageTrackerEnabled::size() {
137137
return messageIdPartitionMap.size();
138138
}
139139

140-
int UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
140+
void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
141141
std::lock_guard<std::recursive_mutex> acquire(lock_);
142-
int numRemoved = 0;
143142
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
144143
MessageId msgIdInMap = it->first;
145144
if (msgIdInMap <= msgId) {
146145
it->second.erase(msgIdInMap);
147146
messageIdPartitionMap.erase(it++);
148-
numRemoved++;
149147
} else {
150148
it++;
151149
}
152150
}
153-
return numRemoved;
154151
}
155152

156153
// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.

lib/UnAckedMessageTrackerEnabled.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
4242
bool add(const MessageId& msgId);
4343
bool remove(const MessageId& msgId);
4444
void remove(const MessageIdList& msgIds);
45-
int removeMessagesTill(const MessageId& msgId);
45+
void removeMessagesTill(const MessageId& msgId);
4646
void removeTopicMessage(const std::string& topic);
4747
void timeoutHandler();
4848

lib/UnAckedMessageTrackerInterface.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class UnAckedMessageTrackerInterface {
3131
virtual bool add(const MessageId& m) = 0;
3232
virtual bool remove(const MessageId& m) = 0;
3333
virtual void remove(const MessageIdList& msgIds) = 0;
34-
virtual int removeMessagesTill(const MessageId& msgId) = 0;
34+
virtual void removeMessagesTill(const MessageId& msgId) = 0;
3535
virtual void clear() = 0;
3636
// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's
3737
// message.

tests/AcknowledgeTest.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,15 +200,14 @@ TEST_F(AcknowledgeTest, testBatchedMessageId) {
200200
consumers[2].acknowledgeAndRedeliver({batchingMaxMessages + 1}, CommandAck_AckType_Cumulative);
201201
ASSERT_EQ(consumers[2].receive(msg), ResultOk);
202202
EXPECT_EQ(msg.getMessageId(), consumers[2].messageIdList()[batchingMaxMessages]);
203-
// NOTE: Currently the unacked message tracker doesn't count the batch size, so the result is the number
204-
// of entries (not messages)
205-
ASSERT_EQ(consumers[2].getNumAcked(CommandAck_AckType_Cumulative), 1);
203+
// TODO: Currently there is no stats for cumulative ACK
204+
ASSERT_EQ(consumers[2].getNumAcked(CommandAck_AckType_Cumulative), 0);
206205

207206
// the whole 2nd batch is acknowledged
208207
consumers[3].acknowledgeAndRedeliver({batchingMaxMessages + 2}, CommandAck_AckType_Cumulative);
209208
ASSERT_EQ(consumers[3].receive(msg), ResultOk);
210209
EXPECT_EQ(msg.getMessageId(), consumers[3].messageIdList()[batchingMaxMessages * 2]);
211-
ASSERT_EQ(consumers[3].getNumAcked(CommandAck_AckType_Cumulative), 2);
210+
ASSERT_EQ(consumers[3].getNumAcked(CommandAck_AckType_Cumulative), 0);
212211
}
213212

214213
INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));

0 commit comments

Comments
 (0)