Skip to content

Commit 0efe86b

Browse files
committed
[fix] Fix deadlock when closing the partitioned producer
1 parent 8990b93 commit 0efe86b

3 files changed

Lines changed: 57 additions & 40 deletions

File tree

lib/PartitionedProducerImpl.cc

Lines changed: 21 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -267,73 +267,54 @@ void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
267267
}
268268
};
269269

270-
if (state_ == Closed || state_.exchange(Closing) == Closing) {
270+
if (state_ == Closed || state_ == Failed || state_.exchange(Closing) == Closing) {
271271
closeCallback(ResultAlreadyClosed);
272272
return;
273273
}
274274

275-
cancelTimers();
275+
auto remainProducersCount = std::make_shared<std::atomic<unsigned long>>(producers_.size());
276+
277+
auto subProducerClosedCallback = [remainProducersCount, closeCallback](unsigned int partitions,
278+
Result result) {
279+
(*remainProducersCount)--;
280+
LOG_DEBUG("Closed Producer for partition "
281+
<< partitions << " successfully. Remain producers to be closed: " << *remainProducersCount);
282+
if (!*remainProducersCount) {
283+
closeCallback(result);
284+
}
285+
};
276286

277-
unsigned int producerAlreadyClosed = 0;
287+
cancelTimers();
278288

279289
// Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
280290
// when `state_` is Ready
281291
for (auto& producer : producers_) {
282292
if (!producer->isClosed()) {
283293
auto self = shared_from_this();
284294
const auto partition = static_cast<unsigned int>(producer->partition());
285-
producer->closeAsync([this, self, partition, closeCallback](Result result) {
286-
handleSinglePartitionProducerClose(result, partition, closeCallback);
295+
producer->closeAsync([this, self, partition, subProducerClosedCallback](Result result) {
296+
handleSinglePartitionProducerClose(result, partition,
297+
[partition, subProducerClosedCallback](Result result) {
298+
subProducerClosedCallback(partition, result);
299+
});
287300
});
288301
} else {
289-
producerAlreadyClosed++;
302+
subProducerClosedCallback(producer->partition(), ResultOk);
290303
}
291304
}
292-
const auto numProducers = producers_.size();
293-
294-
/*
295-
* No need to set state since:-
296-
* a. If closeAsync before creation then state == Closed, since producers_.size() = producerAlreadyClosed
297-
* = 0
298-
* b. If closeAsync called after all sub partitioned producer connected -
299-
* handleSinglePartitionProducerClose handles the closing
300-
* c. If closeAsync called due to failure in creating just one sub producer then state is set by
301-
* handleSinglePartitionProducerCreated
302-
*/
303-
if (producerAlreadyClosed == numProducers) {
304-
closeCallback(ResultOk);
305-
}
306305
}
307306

308-
// `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()`
309307
void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
310308
const unsigned int partitionIndex,
311309
CloseCallback callback) {
312-
if (state_ == Failed) {
313-
// we should have already notified the client by callback
314-
return;
315-
}
316310
if (result != ResultOk) {
317311
LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
318-
callback(result);
319312
state_ = Failed;
320-
return;
321-
}
322-
assert(partitionIndex < getNumPartitionsWithLock());
323-
if (numProducersCreated_ > 0) {
324-
numProducersCreated_--;
325-
}
326-
// closed all successfully
327-
if (!numProducersCreated_) {
328-
// set the producerCreatedPromise to failure, if client called
329-
// closeAsync and it's not failure to create producer, the promise
330-
// is set second time here, first time it was successful. So check
331-
// if there's any adverse effect of setting it again. It should not
332-
// be but must check. MUSTCHECK changeme
333-
partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
334313
callback(result);
335314
return;
336315
}
316+
assert(partitionIndex < getNumPartitionsWithLock());
317+
callback(result);
337318
}
338319

339320
// override

tests/ProducerTest.cc

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
#include <thread>
2323

2424
#include "HttpHelper.h"
25+
#include "PulsarFriend.h"
2526
#include "lib/Future.h"
2627
#include "lib/Latch.h"
2728
#include "lib/LogUtils.h"
@@ -467,4 +468,27 @@ TEST(ProducerTest, testCloseProducerBeforeCreated) {
467468
client.close();
468469
}
469470

471+
TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUpdate) {
472+
const std::string topic = "public/default/partition-test" + std::to_string(time(nullptr));
473+
std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions";
474+
475+
int res = makePutRequest(topicOperateUrl, "2");
476+
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
477+
478+
ClientConfiguration clientConf;
479+
clientConf.setPartititionsUpdateInterval(1);
480+
Client client(serviceUrl, clientConf);
481+
ProducerConfiguration conf;
482+
Producer producer;
483+
client.createProducer(topic, conf, producer);
484+
PartitionedProducerImpl& partitionedProducer = PulsarFriend::getPartitionedProducerImpl(producer);
485+
486+
// TODO: Replace by producer interceptor to reproduce the issue then we can remove
487+
// PulsarFriend::updatePartitions
488+
PulsarFriend::updatePartitions(partitionedProducer, 3);
489+
490+
producer.close();
491+
client.close();
492+
}
493+
470494
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

tests/PulsarFriend.h

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,12 @@ class PulsarFriend {
7979
return *producerImpl;
8080
}
8181

82+
static PartitionedProducerImpl& getPartitionedProducerImpl(Producer producer) {
83+
PartitionedProducerImpl* partitionedProducer =
84+
static_cast<PartitionedProducerImpl*>(producer.impl_.get());
85+
return *partitionedProducer;
86+
}
87+
8288
static ProducerImpl& getInternalProducerImpl(Producer producer, int index) {
8389
PartitionedProducerImpl* producerImpl = static_cast<PartitionedProducerImpl*>(producer.impl_.get());
8490
return *(producerImpl->producers_[index]);
@@ -181,6 +187,12 @@ class PulsarFriend {
181187
static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; }
182188

183189
static std::shared_ptr<MessageIdImpl> getMessageIdImpl(MessageId& msgId) { return msgId.impl_; }
190+
191+
static void updatePartitions(PartitionedProducerImpl& partitionedProducer, int newPartitions) {
192+
LookupDataResultPtr lookupData = std::make_shared<LookupDataResult>();
193+
lookupData->setPartitions(newPartitions);
194+
partitionedProducer.handleGetPartitions(ResultOk, lookupData);
195+
}
184196
};
185197
} // namespace pulsar
186198

0 commit comments

Comments
 (0)