Skip to content

Commit ede2e97

Browse files
authored
[fix] [python client] Better Python garbage collection management for C++-owned objects (#16535)
Fixes apache/pulsar#16527
1 parent ed3698e commit ede2e97

4 files changed

Lines changed: 81 additions & 48 deletions

File tree

pulsar/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,8 +461,7 @@ def __init__(self, service_url,
461461
conf.concurrent_lookup_requests(concurrent_lookup_requests)
462462
if log_conf_file_path:
463463
conf.log_conf_file_path(log_conf_file_path)
464-
if logger:
465-
conf.set_logger(logger)
464+
conf.set_logger(self._prepare_logger(logger) if logger else None)
466465
if listener_name:
467466
conf.listener_name(listener_name)
468467
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
@@ -476,6 +475,16 @@ def __init__(self, service_url,
476475
self._client = _pulsar.Client(service_url, conf)
477476
self._consumers = []
478477

478+
@staticmethod
479+
def _prepare_logger(logger):
480+
import logging
481+
def log(level, message):
482+
old_threads = logging.logThreads
483+
logging.logThreads = False
484+
logger.log(logging.getLevelName(level), message)
485+
logging.logThreads = old_threads
486+
return log
487+
479488
def create_producer(self, topic,
480489
producer_name=None,
481490
schema=schema.BytesSchema(),

pulsar_test.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#
2020

2121

22+
import threading
23+
import logging
2224
from unittest import TestCase, main
2325
import time
2426
import os
@@ -1249,6 +1251,35 @@ def test_json_schema_encode(self):
12491251
second_encode = schema.encode(record)
12501252
self.assertEqual(first_encode, second_encode)
12511253

1254+
def test_logger_thread_leaks(self):
1255+
def _do_connect(close):
1256+
logger = logging.getLogger(str(threading.current_thread().ident))
1257+
logger.setLevel(logging.INFO)
1258+
client = pulsar.Client(
1259+
service_url="pulsar://localhost:6650",
1260+
io_threads=4,
1261+
message_listener_threads=4,
1262+
operation_timeout_seconds=1,
1263+
log_conf_file_path=None,
1264+
authentication=None,
1265+
logger=logger,
1266+
)
1267+
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
1268+
if close:
1269+
client.close()
1270+
1271+
for should_close in (True, False):
1272+
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
1273+
_do_connect(should_close)
1274+
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
1275+
threads = []
1276+
for _ in range(10):
1277+
threads.append(threading.Thread(target=_do_connect, args=(should_close)))
1278+
threads[-1].start()
1279+
for thread in threads:
1280+
thread.join()
1281+
assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)
1282+
12521283
def test_chunking(self):
12531284
client = Client(self.serviceUrl)
12541285
data_size = 10 * 1024 * 1024

src/config.cc

Lines changed: 19 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -93,94 +93,67 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
9393
return conf;
9494
}
9595

96-
class LoggerWrapper : public Logger {
97-
PyObject* const _pyLogger;
98-
const int _pythonLogLevel;
96+
class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
9997
const std::unique_ptr<Logger> _fallbackLogger;
10098

101-
static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }
102-
10399
public:
104-
LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
105-
: _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
106-
Py_XINCREF(_pyLogger);
107-
}
100+
LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger)
101+
: CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {}
108102

109103
LoggerWrapper(const LoggerWrapper&) = delete;
110104
LoggerWrapper(LoggerWrapper&&) noexcept = delete;
111105
LoggerWrapper& operator=(const LoggerWrapper&) = delete;
112106
LoggerWrapper& operator=(LoggerWrapper&&) = delete;
113107

114-
virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }
115-
116-
bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }
108+
bool isEnabled(Level level) {
109+
return true; // Python loggers are always enabled; they decide internally whether or not to log.
110+
}
117111

118112
void log(Level level, int line, const std::string& message) {
119113
if (!Py_IsInitialized()) {
120114
// Python logger is unavailable - fallback to console logger
121115
_fallbackLogger->log(level, line, message);
122116
} else {
123117
PyGILState_STATE state = PyGILState_Ensure();
124-
118+
PyObject *type, *value, *traceback;
119+
PyErr_Fetch(&type, &value, &traceback);
125120
try {
126121
switch (level) {
127122
case Logger::LEVEL_DEBUG:
128-
py::call_method<void>(_pyLogger, "debug", message.c_str());
123+
py::call<void>(_captive, "DEBUG", message.c_str());
129124
break;
130125
case Logger::LEVEL_INFO:
131-
py::call_method<void>(_pyLogger, "info", message.c_str());
126+
py::call<void>(_captive, "INFO", message.c_str());
132127
break;
133128
case Logger::LEVEL_WARN:
134-
py::call_method<void>(_pyLogger, "warning", message.c_str());
129+
py::call<void>(_captive, "WARNING", message.c_str());
135130
break;
136131
case Logger::LEVEL_ERROR:
137-
py::call_method<void>(_pyLogger, "error", message.c_str());
132+
py::call<void>(_captive, "ERROR", message.c_str());
138133
break;
139134
}
140-
141135
} catch (const py::error_already_set& e) {
136+
PyErr_Print();
142137
_fallbackLogger->log(level, line, message);
143138
}
144-
139+
PyErr_Restore(type, value, traceback);
145140
PyGILState_Release(state);
146141
}
147142
}
148143
};
149144

150-
class LoggerWrapperFactory : public LoggerFactory {
145+
class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin {
151146
std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
152-
PyObject* _pyLogger;
153-
Optional<int> _pythonLogLevel{Optional<int>::empty()};
154-
155-
void initializePythonLogLevel() {
156-
PyGILState_STATE state = PyGILState_Ensure();
157-
158-
try {
159-
int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
160-
_pythonLogLevel = Optional<int>::of(level);
161-
} catch (const py::error_already_set& e) {
162-
// Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
163-
_pythonLogLevel = Optional<int>::empty();
164-
}
165-
166-
PyGILState_Release(state);
167-
}
168147

169148
public:
170-
LoggerWrapperFactory(py::object pyLogger) {
171-
_pyLogger = pyLogger.ptr();
172-
Py_XINCREF(_pyLogger);
173-
initializePythonLogLevel();
174-
}
175-
176-
virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
149+
LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {}
177150

178151
Logger* getLogger(const std::string& fileName) {
179152
const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
180-
if (_pythonLogLevel.is_present()) {
181-
return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
182-
} else {
153+
if (_captive == py::object().ptr()) {
183154
return fallbackLogger;
155+
} else {
156+
return new LoggerWrapper(_captive, fallbackLogger);
184157
}
185158
}
186159
};

src/utils.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,23 @@ struct CryptoKeyReaderWrapper {
8282
CryptoKeyReaderWrapper();
8383
CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
8484
};
85+
86+
class CaptivePythonObjectMixin {
87+
protected:
88+
PyObject* _captive;
89+
90+
CaptivePythonObjectMixin(PyObject* captive) {
91+
_captive = captive;
92+
PyGILState_STATE state = PyGILState_Ensure();
93+
Py_XINCREF(_captive);
94+
PyGILState_Release(state);
95+
}
96+
97+
~CaptivePythonObjectMixin() {
98+
if (Py_IsInitialized()) {
99+
PyGILState_STATE state = PyGILState_Ensure();
100+
Py_XDECREF(_captive);
101+
PyGILState_Release(state);
102+
}
103+
}
104+
};

0 commit comments

Comments
 (0)