Skip to content

Commit a3d654c

Browse files
committed
fix crash
1 parent bda51d6 commit a3d654c

4 files changed

Lines changed: 79 additions & 5 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
622622
if (state == Closing || state == Closed) {
623623
return;
624624
}
625+
if (!listenerExecutor_) {
626+
LOG_ERROR(getName() << " listenerExecutor_ is null, discarding message to avoid null dereference");
627+
increaseAvailablePermits(cnx);
628+
return;
629+
}
625630
uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
626631
if (ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
627632
LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer.");
@@ -663,8 +668,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
663668
return;
664669
}
665670
// Trigger message listener callback in a separate thread
666-
while (numOfMessageReceived--) {
667-
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
671+
if (listenerExecutor_) {
672+
while (numOfMessageReceived--) {
673+
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
674+
}
668675
}
669676
}
670677
}
@@ -713,8 +720,12 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {
713720

714721
// has pending receive, direct callback.
715722
if (asyncReceivedWaiting) {
716-
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
717-
get_shared_this_ptr(), ResultOk, msg, callback));
723+
if (listenerExecutor_) {
724+
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
725+
get_shared_this_ptr(), ResultOk, msg, callback));
726+
} else {
727+
notifyPendingReceivedCallback(ResultOk, msg, callback);
728+
}
718729
return;
719730
}
720731

lib/ExecutorService.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#include "ExecutorService.h"
2020

21+
#include <algorithm>
22+
2123
#include "LogUtils.h"
2224
#include "TimeUtils.h"
2325
DECLARE_LOG_OBJECT()
@@ -130,9 +132,12 @@ void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_c
130132
/////////////////////
131133

132134
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
133-
: executors_(nthreads), executorIdx_(0), mutex_() {}
135+
: executors_(std::max(1, nthreads)), executorIdx_(0), mutex_() {}
134136

135137
ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
138+
if (executors_.empty()) {
139+
return nullptr;
140+
}
136141
idx %= executors_.size();
137142
Lock lock(mutex_);
138143

tests/ReaderTest.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
#include <gtest/gtest.h>
2020
#include <pulsar/Client.h>
21+
#include <pulsar/ClientConfiguration.h>
2122
#include <pulsar/Reader.h>
2223
#include <time.h>
2324

@@ -955,5 +956,50 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
955956
assertStartMessageId(false, secondMsgId);
956957
}
957958

959+
// Regression test for segfault when Reader is used with messageListenerThreads=0.
960+
// Verifies ExecutorServiceProvider(0) does not cause undefined behavior and
961+
// ConsumerImpl::messageReceived does not dereference null listenerExecutor_.
962+
TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
963+
ClientConfiguration clientConf;
964+
clientConf.setMessageListenerThreads(0);
965+
Client client(serviceUrl, clientConf);
966+
967+
const std::string topicName =
968+
"testReaderWithZeroMessageListenerThreads-" + std::to_string(time(nullptr));
969+
970+
Producer producer;
971+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
972+
973+
ReaderConfiguration readerConf;
974+
Reader reader;
975+
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
976+
977+
constexpr int numMessages = 5;
978+
for (int i = 0; i < numMessages; i++) {
979+
Message msg = MessageBuilder().setContent("msg-" + std::to_string(i)).build();
980+
ASSERT_EQ(ResultOk, producer.send(msg));
981+
}
982+
983+
int received = 0;
984+
for (int i = 0; i < numMessages + 2; i++) {
985+
bool hasMessageAvailable = false;
986+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
987+
if (!hasMessageAvailable) {
988+
break;
989+
}
990+
Message msg;
991+
Result res = reader.readNext(msg, 3000);
992+
ASSERT_EQ(ResultOk, res) << "readNext failed at iteration " << i;
993+
std::string content = msg.getDataAsString();
994+
EXPECT_EQ("msg-" + std::to_string(received), content);
995+
++received;
996+
}
997+
EXPECT_EQ(received, numMessages);
998+
999+
producer.close();
1000+
reader.close();
1001+
client.close();
1002+
}
1003+
9581004
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
9591005
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));

tests/RetryableOperationCacheTest.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <chrono>
2323
#include <stdexcept>
2424

25+
#include "lib/ExecutorService.h"
2526
#include "lib/RetryableOperationCache.h"
2627

2728
namespace pulsar {
@@ -82,6 +83,17 @@ class RetryableOperationCacheTest : public ::testing::Test {
8283

8384
using namespace pulsar;
8485

86+
// Regression test: ExecutorServiceProvider(0) must not cause undefined behavior (e.g. idx % 0).
87+
// After fix, nthreads is clamped to at least 1, so get() returns a valid executor.
88+
TEST(ExecutorServiceProviderTest, ZeroThreadsReturnsValidExecutor) {
89+
ExecutorServiceProviderPtr provider = std::make_shared<ExecutorServiceProvider>(0);
90+
for (int i = 0; i < 3; i++) {
91+
ExecutorServicePtr executor = provider->get();
92+
ASSERT_NE(executor, nullptr) << "get() must not return null when created with 0 threads (clamped to 1)";
93+
}
94+
provider->close();
95+
}
96+
8597
TEST_F(RetryableOperationCacheTest, testRetry) {
8698
auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
8799
for (int i = 0; i < 10; i++) {

0 commit comments

Comments
 (0)