Skip to content

Commit 2018a06

Browse files
Add BatchedMessageIdImpl to acknowledge batched messages (#132)
* Add BatchedMessageIdImpl to acknowledge batched messages Fixes #130 ### Motivation It's a catchup for apache/pulsar#1424 ### Modifications Migrate `BitSet` implementation from JDK. Though we have `vector<bool>` or `boost::dynamic_bitset`, to support batch index ACK in future, it's better to have the same method to convert a bit set to the underlying long array. Add `BatchedMessageIdImpl` to maintain a `BatchMessageAcker`, which is shared by messages in the same batch. The acker is responsible to record which messages are acknowledged. Only if all messages in the batch are acknowledged will the message id be acknowledged. The stats for individual ACKs only updates after the whole batch are acknowledged. Before this PR, each time the single message is acknowledged, the stats increase by one.
1 parent 18dcdd5 commit 2018a06

16 files changed

Lines changed: 735 additions & 359 deletions

lib/BatchAcknowledgementTracker.cc

Lines changed: 0 additions & 174 deletions
This file was deleted.

lib/BatchAcknowledgementTracker.h

Lines changed: 0 additions & 106 deletions
This file was deleted.

lib/BatchMessageAcker.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <atomic>
22+
#include <memory>
23+
#include <mutex>
24+
25+
#include "BitSet.h"
26+
#include "ProtoApiEnums.h"
27+
28+
namespace pulsar {
29+
30+
class BatchMessageAcker;
31+
using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;
32+
33+
class BatchMessageAcker {
34+
public:
35+
using Lock = std::lock_guard<std::mutex>;
36+
37+
static BatchMessageAckerPtr create(int32_t batchSize) {
38+
return std::make_shared<BatchMessageAcker>(batchSize);
39+
}
40+
41+
BatchMessageAcker(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0, batchSize); }
42+
43+
bool ackIndividual(int32_t batchIndex) {
44+
Lock lock{mutex_};
45+
bitSet_.clear(batchIndex);
46+
return bitSet_.isEmpty();
47+
}
48+
49+
bool ackCumulative(int32_t batchIndex) {
50+
Lock lock{mutex_};
51+
// The range of cumulative acknowledgment is closed while BitSet::clear accepts a left-closed
52+
// right-open range.
53+
bitSet_.clear(0, batchIndex + 1);
54+
return bitSet_.isEmpty();
55+
}
56+
57+
bool shouldAckPreviousMessageId() noexcept {
58+
bool expectedValue = false;
59+
return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
60+
}
61+
62+
private:
63+
BitSet bitSet_;
64+
// When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
65+
// without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to
66+
// determine whether to acknowledge the previous message id.
67+
std::atomic_bool prevBatchCumulativelyAcked_{false};
68+
mutable std::mutex mutex_;
69+
};
70+
71+
} // namespace pulsar

0 commit comments

Comments
 (0)