Skip to content

Commit

Permalink
[fix] [python client] Better Python garbage collection management for…
Browse files Browse the repository at this point in the history
… C++-owned objects (#16535)

Fixes apache/pulsar#16527
  • Loading branch information
zbentley authored Sep 22, 2022
1 parent ed3698e commit ede2e97
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 48 deletions.
13 changes: 11 additions & 2 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,7 @@ def __init__(self, service_url,
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
conf.set_logger(self._prepare_logger(logger) if logger else None)
if listener_name:
conf.listener_name(listener_name)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
Expand All @@ -476,6 +475,16 @@ def __init__(self, service_url,
self._client = _pulsar.Client(service_url, conf)
self._consumers = []

@staticmethod
def _prepare_logger(logger):
import logging
def log(level, message):
old_threads = logging.logThreads
logging.logThreads = False
logger.log(logging.getLevelName(level), message)
logging.logThreads = old_threads
return log

def create_producer(self, topic,
producer_name=None,
schema=schema.BytesSchema(),
Expand Down
31 changes: 31 additions & 0 deletions pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#


import threading
import logging
from unittest import TestCase, main
import time
import os
Expand Down Expand Up @@ -1249,6 +1251,35 @@ def test_json_schema_encode(self):
second_encode = schema.encode(record)
self.assertEqual(first_encode, second_encode)

def test_logger_thread_leaks(self):
def _do_connect(close):
logger = logging.getLogger(str(threading.current_thread().ident))
logger.setLevel(logging.INFO)
client = pulsar.Client(
service_url="pulsar://localhost:6650",
io_threads=4,
message_listener_threads=4,
operation_timeout_seconds=1,
log_conf_file_path=None,
authentication=None,
logger=logger,
)
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
if close:
client.close()

for should_close in (True, False):
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
_do_connect(should_close)
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
threads = []
for _ in range(10):
threads.append(threading.Thread(target=_do_connect, args=(should_close)))
threads[-1].start()
for thread in threads:
thread.join()
assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)

def test_chunking(self):
client = Client(self.serviceUrl)
data_size = 10 * 1024 * 1024
Expand Down
65 changes: 19 additions & 46 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,94 +93,67 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
return conf;
}

class LoggerWrapper : public Logger {
PyObject* const _pyLogger;
const int _pythonLogLevel;
class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
const std::unique_ptr<Logger> _fallbackLogger;

static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }

public:
LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
: _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
Py_XINCREF(_pyLogger);
}
LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger)
: CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {}

LoggerWrapper(const LoggerWrapper&) = delete;
LoggerWrapper(LoggerWrapper&&) noexcept = delete;
LoggerWrapper& operator=(const LoggerWrapper&) = delete;
LoggerWrapper& operator=(LoggerWrapper&&) = delete;

virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }

bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }
bool isEnabled(Level level) {
return true; // Python loggers are always enabled; they decide internally whether or not to log.
}

void log(Level level, int line, const std::string& message) {
if (!Py_IsInitialized()) {
// Python logger is unavailable - fallback to console logger
_fallbackLogger->log(level, line, message);
} else {
PyGILState_STATE state = PyGILState_Ensure();

PyObject *type, *value, *traceback;
PyErr_Fetch(&type, &value, &traceback);
try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
py::call<void>(_captive, "DEBUG", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
py::call<void>(_captive, "INFO", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
py::call<void>(_captive, "WARNING", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
py::call<void>(_captive, "ERROR", message.c_str());
break;
}

} catch (const py::error_already_set& e) {
PyErr_Print();
_fallbackLogger->log(level, line, message);
}

PyErr_Restore(type, value, traceback);
PyGILState_Release(state);
}
}
};

class LoggerWrapperFactory : public LoggerFactory {
class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin {
std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
PyObject* _pyLogger;
Optional<int> _pythonLogLevel{Optional<int>::empty()};

void initializePythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();

try {
int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
_pythonLogLevel = Optional<int>::of(level);
} catch (const py::error_already_set& e) {
// Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
_pythonLogLevel = Optional<int>::empty();
}

PyGILState_Release(state);
}

public:
LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
initializePythonLogLevel();
}

virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {}

Logger* getLogger(const std::string& fileName) {
const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
if (_pythonLogLevel.is_present()) {
return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
} else {
if (_captive == py::object().ptr()) {
return fallbackLogger;
} else {
return new LoggerWrapper(_captive, fallbackLogger);
}
}
};
Expand Down
20 changes: 20 additions & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,23 @@ struct CryptoKeyReaderWrapper {
CryptoKeyReaderWrapper();
CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
};

class CaptivePythonObjectMixin {
protected:
PyObject* _captive;

CaptivePythonObjectMixin(PyObject* captive) {
_captive = captive;
PyGILState_STATE state = PyGILState_Ensure();
Py_XINCREF(_captive);
PyGILState_Release(state);
}

~CaptivePythonObjectMixin() {
if (Py_IsInitialized()) {
PyGILState_STATE state = PyGILState_Ensure();
Py_XDECREF(_captive);
PyGILState_Release(state);
}
}
};

0 comments on commit ede2e97

Please sign in to comment.