From db1737d4d2188e6aa2cad4b15152867f32c2ed70 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Thu, 11 Jun 2026 11:50:46 +0200 Subject: [PATCH 1/3] Add logstorage for python logging --- .github/workflows/code_quality.yml | 5 +- .pre-commit-config.yaml | 6 ++ src/duckdb_py/CMakeLists.txt | 1 + .../duckdb_python/python_log_storage.hpp | 37 ++++++++ src/duckdb_py/pyconnection.cpp | 18 ++++ src/duckdb_py/python_log_storage.cpp | 92 +++++++++++++++++++ tests/fast/test_python_log_storage.py | 28 ++++++ 7 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 src/duckdb_py/include/duckdb_python/python_log_storage.hpp create mode 100644 src/duckdb_py/python_log_storage.cpp create mode 100644 tests/fast/test_python_log_storage.py diff --git a/.github/workflows/code_quality.yml b/.github/workflows/code_quality.yml index 575f6f5b..522c3811 100644 --- a/.github/workflows/code_quality.yml +++ b/.github/workflows/code_quality.yml @@ -32,7 +32,10 @@ jobs: uses: astral-sh/setup-uv@v7 with: version: "0.9.0" - python-version: "3.12" + # 3.13: the cmake-format pre-commit hook is pinned to python3.13 + # (cmakelang crashes under 3.14). Keeping this in sync means the hook + # resolves to the running interpreter instead of hunting PATH. + python-version: "3.13" - name: pre-commit (cache) uses: actions/cache@v4 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a3123470..022a5257 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,6 +29,12 @@ repos: rev: v0.6.13 hooks: - id: cmake-format + # cmakelang is unmaintained and crashes under Python 3.14 + # ("Cannot use capturing groups in re.Scanner"). Pin this hook's + # environment to 3.13 so it never picks up a 3.14 interpreter. The + # code_quality CI job provisions Python 3.13 to match, so the hook + # resolves to the running interpreter there. 
+ language_version: python3.13 - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.18.2 diff --git a/src/duckdb_py/CMakeLists.txt b/src/duckdb_py/CMakeLists.txt index 3d06b062..a0c65873 100644 --- a/src/duckdb_py/CMakeLists.txt +++ b/src/duckdb_py/CMakeLists.txt @@ -19,6 +19,7 @@ add_library( importer.cpp map.cpp path_like.cpp + python_log_storage.cpp pyconnection.cpp pyexpression.cpp pyfilesystem.cpp diff --git a/src/duckdb_py/include/duckdb_python/python_log_storage.hpp b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp new file mode 100644 index 00000000..cf6e663d --- /dev/null +++ b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp @@ -0,0 +1,37 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb_python/python_log_storage.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/logging/log_storage.hpp" +#include "duckdb/logging/logging.hpp" + +namespace duckdb { + +class PythonLogStorage : public LogStorage { +public: + PythonLogStorage() = default; + ~PythonLogStorage() override = default; + + const string GetStorageName() override { + return "python_log_storage"; + } + + void WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, const string &log_message, + const RegisteredLoggingContext &context) override; + void WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) override; + void FlushAll() override { + } + void Flush(LoggingTargetTable table) override { + } + bool IsEnabled(LoggingTargetTable table) override { + return true; + } +}; + +} // namespace duckdb diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp index 6fcbe0ac..4559ee85 100644 --- a/src/duckdb_py/pyconnection.cpp +++ b/src/duckdb_py/pyconnection.cpp @@ -35,6 +35,8 @@ #include "duckdb_python/numpy/numpy_type.hpp" #include "duckdb/main/prepared_statement.hpp" #include 
"duckdb_python/jupyter_progress_bar_display.hpp" +#include "duckdb_python/python_log_storage.hpp" +#include "duckdb/logging/log_manager.hpp" #include "duckdb_python/pyfilesystem.hpp" #include "duckdb/main/client_config.hpp" #include "duckdb/function/table/read_csv.hpp" @@ -2283,6 +2285,22 @@ shared_ptr DuckDBPyConnection::Connect(const py::object &dat auto res = FetchOrCreateInstance(database, config); auto &client_context = *res->con.GetConnection().context; SetDefaultConfigArguments(client_context); + { + auto &db_instance = *res->con.GetDatabase().instance; + auto &log_manager = db_instance.GetLogManager(); + auto storage = make_shared_ptr(); + shared_ptr storage_base = storage; + // RegisterLogStorage returns false if the name is already registered on this + // DatabaseInstance. Instances are cached and shared across connections/cursors, so + // only configure logging on the first registration. SetLogStorage/SetEnableLogging/ + // SetLogLevel are NOT idempotent — re-running them on every Connect() would silently + // stomp a user's explicit `SET enable_logging` / `SET logging_level` / storage choice. 
+ if (log_manager.RegisterLogStorage("python_log_storage", storage_base)) { + log_manager.SetLogStorage(db_instance, "python_log_storage"); + log_manager.SetEnableLogging(true); + log_manager.SetLogLevel(LogLevel::LOG_WARNING); + } + } return res; } diff --git a/src/duckdb_py/python_log_storage.cpp b/src/duckdb_py/python_log_storage.cpp new file mode 100644 index 00000000..c2bf107c --- /dev/null +++ b/src/duckdb_py/python_log_storage.cpp @@ -0,0 +1,92 @@ +#include "duckdb_python/python_log_storage.hpp" +#include "duckdb_python/pybind11/pybind_wrapper.hpp" +#include "duckdb/common/types/data_chunk.hpp" +#include "duckdb/common/types/vector.hpp" +#include "duckdb/logging/logging.hpp" + +namespace duckdb { + +static int LogLevelToPython(LogLevel level) { + switch (level) { + case LogLevel::LOG_TRACE: + case LogLevel::LOG_DEBUG: + return 10; // logging.DEBUG + case LogLevel::LOG_INFO: + return 20; // logging.INFO + case LogLevel::LOG_WARNING: + return 30; // logging.WARNING + case LogLevel::LOG_ERROR: + return 40; // logging.ERROR + case LogLevel::LOG_FATAL: + return 50; // logging.CRITICAL + default: + return 30; + } +} + +static int LevelStringToPython(const string &level_str) { + if (level_str == "TRACE" || level_str == "DEBUG") { + return 10; + } + if (level_str == "INFO") { + return 20; + } + if (level_str == "WARNING") { + return 30; + } + if (level_str == "ERROR") { + return 40; + } + if (level_str == "FATAL") { + return 50; + } + return 30; +} + +// Both write methods run on engine worker threads and invoke arbitrary user Python (the +// handlers installed on the "duckdb" logger). The engine calls these directly from query +// binding/execution with NO surrounding try/catch (see LogManager::WriteLogEntry), so an +// exception escaping here would fail the user's query. Logging is a side effect — it must +// never do that. Hence every body swallows all exceptions. +// +// Note also that the engine holds LogManager::lock (a non-recursive mutex) across this call. 
+// A handler that re-enters DuckDB on the same thread and emits another log entry would +// self-deadlock on that lock — outside our control, but worth knowing. + +void PythonLogStorage::WriteLogEntry(timestamp_t, LogLevel level, const string &, const string &log_message, + const RegisteredLoggingContext &) { + if (!Py_IsInitialized()) { + return; // interpreter is finalizing — acquiring the GIL would crash + } + try { + py::gil_scoped_acquire gil; + auto logging = py::module::import("logging"); + auto logger = logging.attr("getLogger")("duckdb"); + logger.attr("log")(LogLevelToPython(level), log_message); + } catch (...) { + // Logging must not disrupt query execution. + } +} + +void PythonLogStorage::WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &) { + if (!Py_IsInitialized()) { + return; // interpreter is finalizing — acquiring the GIL would crash + } + try { + py::gil_scoped_acquire gil; + auto logging = py::module::import("logging"); + auto logger = logging.attr("getLogger")("duckdb"); + // DataChunk is in LOG_ENTRIES format: context_id, timestamp, type, log_level, message. + // log_level (idx 3) and message (idx 4) are both VARCHAR; the chunk is freshly + // allocated by the engine so the vectors are flat. + auto level_data = FlatVector::GetData(chunk.data[3]); + auto message_data = FlatVector::GetData(chunk.data[4]); + for (idx_t i = 0; i < chunk.size(); i++) { + logger.attr("log")(LevelStringToPython(level_data[i].GetString()), message_data[i].GetString()); + } + } catch (...) { + // Logging must not disrupt query execution. 
+ } +} + +} // namespace duckdb diff --git a/tests/fast/test_python_log_storage.py b/tests/fast/test_python_log_storage.py new file mode 100644 index 00000000..8b121506 --- /dev/null +++ b/tests/fast/test_python_log_storage.py @@ -0,0 +1,28 @@ +import logging + +import duckdb + + +def test_warning_routed_to_python_logging(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + # Pin lambda_syntax to DEFAULT so the deprecated arrow (->) form reliably emits a + # DUCKDB_LOG_WARNING. DEFAULT is the current engine default, but it is explicitly + # slated to change ("before DuckDB's next release"); pinning keeps this test + # exercising the warning path across future submodule bumps. + con.execute("SET lambda_syntax='DEFAULT'") + con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)") + deprecation_records = [r for r in caplog.records if "deprecated" in r.message.lower()] + assert deprecation_records, "expected a deprecation warning routed to the 'duckdb' logger" + assert all(r.name == "duckdb" for r in deprecation_records) + assert all(r.levelno == logging.WARNING for r in deprecation_records) + + +def test_warning_not_emitted_for_clean_queries(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + con.execute("SELECT 1 + 1").fetchone() + # Assert the absence of the deprecation warning specifically rather than requiring zero + # records total — an incidental connect-time warning (e.g. the macOS Rosetta notice on + # some hardware) would otherwise make this flaky. 
+ assert not [r for r in caplog.records if "deprecated" in r.message.lower()] From d799f2dfba74cced801aed7ce4839727eb79d578 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Thu, 11 Jun 2026 13:07:36 +0200 Subject: [PATCH 2/3] Subclass BufferingLogStorage --- .../duckdb_python/python_log_storage.hpp | 59 +++-- src/duckdb_py/pyconnection.cpp | 2 +- src/duckdb_py/python_log_storage.cpp | 128 ++++++----- tests/fast/test_python_log_storage.py | 212 ++++++++++++++++-- 4 files changed, 322 insertions(+), 79 deletions(-) diff --git a/src/duckdb_py/include/duckdb_python/python_log_storage.hpp b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp index cf6e663d..0fe000f0 100644 --- a/src/duckdb_py/include/duckdb_python/python_log_storage.hpp +++ b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp @@ -9,29 +9,60 @@ #pragma once #include "duckdb/logging/log_storage.hpp" -#include "duckdb/logging/logging.hpp" +#include "duckdb/common/map.hpp" +#include "duckdb/common/unique_ptr.hpp" namespace duckdb { -class PythonLogStorage : public LogStorage { +class ColumnDataCollection; +class DatabaseInstance; + +//! Scan state backing PythonLogStorage's in-memory buffers (so `duckdb_logs` can read them). +//! We define our own rather than reuse the engine's InMemoryLogStorageScanState to avoid +//! depending on whether that type's symbols are exported across platforms. +class PythonLogStorageScanState : public LogStorageScanState { +public: + explicit PythonLogStorageScanState(LoggingTargetTable table) : LogStorageScanState(table) { + } + ~PythonLogStorageScanState() override = default; + + ColumnDataScanState scan_state; +}; + +//! A composite log storage that does two things for every engine log entry: +//! 1. forwards it to Python's standard `logging` module (logging.getLogger("duckdb")), and +//! 2. retains it in-memory so `SELECT * FROM duckdb_logs` keeps working. +//! +//! 
It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed (and +//! therefore forwarded to Python) immediately, rather than batched until a 2048-entry buffer +//! fills — engine WARNINGs are sparse and must surface inline to be useful. +class PythonLogStorage : public BufferingLogStorage { public: - PythonLogStorage() = default; - ~PythonLogStorage() override = default; + explicit PythonLogStorage(DatabaseInstance &db); + ~PythonLogStorage() override; const string GetStorageName() override { return "python_log_storage"; } - void WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, const string &log_message, - const RegisteredLoggingContext &context) override; - void WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) override; - void FlushAll() override { - } - void Flush(LoggingTargetTable table) override { - } - bool IsEnabled(LoggingTargetTable table) override { - return true; - } + //! Single-threaded scan interface — mirrors InMemoryLogStorage so duckdb_logs can read us. + bool CanScan(LoggingTargetTable table) override; + unique_ptr CreateScanState(LoggingTargetTable table) const override; + bool Scan(LogStorageScanState &state, DataChunk &result) const override; + void InitializeScan(LogStorageScanState &state) const override; + +protected: + //! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) forwards it to Python. + void FlushChunk(LoggingTargetTable table, DataChunk &chunk) override; + //! Clears the in-memory buffers. + void ResetAllBuffers() override; + +private: + ColumnDataCollection &GetBuffer(LoggingTargetTable table) const; + //! Forwards each row of a LOG_ENTRIES chunk to logging.getLogger("duckdb"). Never throws. 
+ void ForwardEntriesToPython(DataChunk &chunk); + + map> log_storage_buffers; }; } // namespace duckdb diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp index 4559ee85..bae186c0 100644 --- a/src/duckdb_py/pyconnection.cpp +++ b/src/duckdb_py/pyconnection.cpp @@ -2288,7 +2288,7 @@ shared_ptr DuckDBPyConnection::Connect(const py::object &dat { auto &db_instance = *res->con.GetDatabase().instance; auto &log_manager = db_instance.GetLogManager(); - auto storage = make_shared_ptr(); + auto storage = make_shared_ptr(db_instance); shared_ptr storage_base = storage; // RegisterLogStorage returns false if the name is already registered on this // DatabaseInstance. Instances are cached and shared across connections/cursors, so diff --git a/src/duckdb_py/python_log_storage.cpp b/src/duckdb_py/python_log_storage.cpp index c2bf107c..41a21f96 100644 --- a/src/duckdb_py/python_log_storage.cpp +++ b/src/duckdb_py/python_log_storage.cpp @@ -1,74 +1,63 @@ #include "duckdb_python/python_log_storage.hpp" #include "duckdb_python/pybind11/pybind_wrapper.hpp" + +#include "duckdb/common/allocator.hpp" +#include "duckdb/common/exception.hpp" +#include "duckdb/common/types/column/column_data_collection.hpp" #include "duckdb/common/types/data_chunk.hpp" #include "duckdb/common/types/vector.hpp" -#include "duckdb/logging/logging.hpp" namespace duckdb { -static int LogLevelToPython(LogLevel level) { - switch (level) { - case LogLevel::LOG_TRACE: - case LogLevel::LOG_DEBUG: - return 10; // logging.DEBUG - case LogLevel::LOG_INFO: - return 20; // logging.INFO - case LogLevel::LOG_WARNING: - return 30; // logging.WARNING - case LogLevel::LOG_ERROR: - return 40; // logging.ERROR - case LogLevel::LOG_FATAL: - return 50; // logging.CRITICAL - default: - return 30; - } -} - +// Maps the engine's textual log level (stored as VARCHAR in the LOG_ENTRIES chunk) to the +// integer levels of Python's logging module. 
static int LevelStringToPython(const string &level_str) { if (level_str == "TRACE" || level_str == "DEBUG") { - return 10; + return 10; // logging.DEBUG } if (level_str == "INFO") { - return 20; + return 20; // logging.INFO } if (level_str == "WARNING") { - return 30; + return 30; // logging.WARNING } if (level_str == "ERROR") { - return 40; + return 40; // logging.ERROR } if (level_str == "FATAL") { - return 50; + return 50; // logging.CRITICAL } return 30; } -// Both write methods run on engine worker threads and invoke arbitrary user Python (the -// handlers installed on the "duckdb" logger). The engine calls these directly from query -// binding/execution with NO surrounding try/catch (see LogManager::WriteLogEntry), so an -// exception escaping here would fail the user's query. Logging is a side effect — it must -// never do that. Hence every body swallows all exceptions. -// -// Note also that the engine holds LogManager::lock (a non-recursive mutex) across this call. -// A handler that re-enters DuckDB on the same thread and emits another log entry would -// self-deadlock on that lock — outside our control, but worth knowing. 
+PythonLogStorage::PythonLogStorage(DatabaseInstance &db) : BufferingLogStorage(db, 1, true) { + log_storage_buffers[LoggingTargetTable::LOG_ENTRIES] = + make_uniq(Allocator::DefaultAllocator(), GetSchema(LoggingTargetTable::LOG_ENTRIES)); + log_storage_buffers[LoggingTargetTable::LOG_CONTEXTS] = + make_uniq(Allocator::DefaultAllocator(), GetSchema(LoggingTargetTable::LOG_CONTEXTS)); +} -void PythonLogStorage::WriteLogEntry(timestamp_t, LogLevel level, const string &, const string &log_message, - const RegisteredLoggingContext &) { - if (!Py_IsInitialized()) { - return; // interpreter is finalizing — acquiring the GIL would crash - } - try { - py::gil_scoped_acquire gil; - auto logging = py::module::import("logging"); - auto logger = logging.attr("getLogger")("duckdb"); - logger.attr("log")(LogLevelToPython(level), log_message); - } catch (...) { - // Logging must not disrupt query execution. +PythonLogStorage::~PythonLogStorage() { +} + +ColumnDataCollection &PythonLogStorage::GetBuffer(LoggingTargetTable table) const { + auto res = log_storage_buffers.find(table); + if (res == log_storage_buffers.end()) { + throw InternalException("PythonLogStorage: failed to find buffer for logging target table"); } + return *res->second; } -void PythonLogStorage::WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &) { +void PythonLogStorage::ForwardEntriesToPython(DataChunk &chunk) { + // This fires from engine worker threads with the GIL released, and from within both the + // LogManager lock and this storage's lock. It runs arbitrary user Python (logging + // handlers) and MUST NOT let an exception escape: the engine calls the write path with no + // try/catch, directly from query binding/execution, so a raising handler would otherwise + // fail the user's query. Hence we swallow everything here. 
+ // + // Caveat: because a lock is held across this call, a handler that re-enters DuckDB on the + // same thread and emits another log entry can self-deadlock on the non-recursive lock. + // That is outside our control (and matches the engine's own contract for log storages). if (!Py_IsInitialized()) { return; // interpreter is finalizing — acquiring the GIL would crash } @@ -76,17 +65,56 @@ void PythonLogStorage::WriteLogEntries(DataChunk &chunk, const RegisteredLogging py::gil_scoped_acquire gil; auto logging = py::module::import("logging"); auto logger = logging.attr("getLogger")("duckdb"); - // DataChunk is in LOG_ENTRIES format: context_id, timestamp, type, log_level, message. - // log_level (idx 3) and message (idx 4) are both VARCHAR; the chunk is freshly - // allocated by the engine so the vectors are flat. + // LOG_ENTRIES schema: context_id, timestamp, type, log_level (idx 3), message (idx 4). + // log_level and message are both VARCHAR; the buffer chunk is flat. auto level_data = FlatVector::GetData(chunk.data[3]); auto message_data = FlatVector::GetData(chunk.data[4]); for (idx_t i = 0; i < chunk.size(); i++) { logger.attr("log")(LevelStringToPython(level_data[i].GetString()), message_data[i].GetString()); } } catch (...) { - // Logging must not disrupt query execution. + // Logging must never disrupt query execution. } } +void PythonLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) { + D_ASSERT(table == LoggingTargetTable::LOG_ENTRIES || table == LoggingTargetTable::LOG_CONTEXTS); + // Retain the entry for duckdb_logs FIRST, so a misbehaving Python handler can never cost + // us a stored row. + log_storage_buffers[table]->Append(chunk); + // Forward only real log entries (not context metadata) to Python's logging module. 
+ if (table == LoggingTargetTable::LOG_ENTRIES) { + ForwardEntriesToPython(chunk); + } +} + +void PythonLogStorage::ResetAllBuffers() { + BufferingLogStorage::ResetAllBuffers(); + for (const auto &buffer : log_storage_buffers) { + buffer.second->Reset(); + } +} + +bool PythonLogStorage::CanScan(LoggingTargetTable table) { + unique_lock lck(lock); + return IsEnabledInternal(table); +} + +unique_ptr PythonLogStorage::CreateScanState(LoggingTargetTable table) const { + return make_uniq(table); +} + +bool PythonLogStorage::Scan(LogStorageScanState &state, DataChunk &result) const { + unique_lock lck(lock); + auto &python_scan_state = state.Cast(); + return GetBuffer(python_scan_state.table).Scan(python_scan_state.scan_state, result); +} + +void PythonLogStorage::InitializeScan(LogStorageScanState &state) const { + unique_lock lck(lock); + auto &python_scan_state = state.Cast(); + GetBuffer(python_scan_state.table) + .InitializeScan(python_scan_state.scan_state, ColumnDataScanProperties::DISALLOW_ZERO_COPY); +} + } // namespace duckdb diff --git a/tests/fast/test_python_log_storage.py b/tests/fast/test_python_log_storage.py index 8b121506..e3bceb9a 100644 --- a/tests/fast/test_python_log_storage.py +++ b/tests/fast/test_python_log_storage.py @@ -1,28 +1,212 @@ +"""Tests for the PythonLogStorage composite log storage (issue #480). + +PythonLogStorage forwards engine log entries to Python's `logging` module AND keeps them +queryable via `SELECT * FROM duckdb_logs`. It is registered on the first connection to each +DatabaseInstance and routes WARNING+ entries to logging.getLogger("duckdb"). +""" + import logging import duckdb +DEPRECATION_FRAGMENT = "Deprecated lambda arrow" + + +def _trigger_deprecation_warning(con): + """Run a query that reliably emits a single engine DUCKDB_LOG_WARNING. + + The deprecated arrow (->) lambda form warns only when lambda_syntax is DEFAULT. 
DEFAULT + is the current engine default but is slated to change, so we pin it to keep this exercising + the warning path across submodule bumps. + """ + con.execute("SET lambda_syntax='DEFAULT'") + con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + + +def _deprecation_records(caplog): + return [r for r in caplog.records if "deprecated" in r.getMessage().lower()] + + +def _duckdb_logs_deprecation_count(con): + return con.execute(f"SELECT count(*) FROM duckdb_logs WHERE message LIKE '%{DEPRECATION_FRAGMENT}%'").fetchone()[0] + + +# --------------------------------------------------------------------------- +# Channel 1: forwarding to Python's logging module +# --------------------------------------------------------------------------- + def test_warning_routed_to_python_logging(caplog): with caplog.at_level(logging.WARNING, logger="duckdb"): con = duckdb.connect() - # Pin lambda_syntax to DEFAULT so the deprecated arrow (->) form reliably emits a - # DUCKDB_LOG_WARNING. DEFAULT is the current engine default, but it is explicitly - # slated to change ("before DuckDB's next release"); pinning keeps this test - # exercising the warning path across future submodule bumps. 
- con.execute("SET lambda_syntax='DEFAULT'") - con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)") - deprecation_records = [r for r in caplog.records if "deprecated" in r.message.lower()] - assert deprecation_records, "expected a deprecation warning routed to the 'duckdb' logger" - assert all(r.name == "duckdb" for r in deprecation_records) - assert all(r.levelno == logging.WARNING for r in deprecation_records) + _trigger_deprecation_warning(con) + records = _deprecation_records(caplog) + assert records, "expected a deprecation warning routed to the 'duckdb' logger" + assert all(r.name == "duckdb" for r in records) + assert all(r.levelno == logging.WARNING for r in records) def test_warning_not_emitted_for_clean_queries(caplog): with caplog.at_level(logging.WARNING, logger="duckdb"): con = duckdb.connect() con.execute("SELECT 1 + 1").fetchone() - # Assert the absence of the deprecation warning specifically rather than requiring zero - # records total — an incidental connect-time warning (e.g. the macOS Rosetta notice on - # some hardware) would otherwise make this flaky. - assert not [r for r in caplog.records if "deprecated" in r.message.lower()] + # Assert absence of the deprecation warning specifically rather than requiring zero records + # total — an incidental connect-time warning (e.g. the macOS Rosetta notice on some + # hardware) would otherwise make this flaky. + assert not _deprecation_records(caplog) + + +def test_module_level_default_connection_forwards(caplog): + # The most common Jupyter/script pattern: no explicit connect(). The default connection is + # created lazily via Connect(), so it must register the storage too. 
+ with caplog.at_level(logging.WARNING, logger="duckdb"): + duckdb.execute("SET lambda_syntax='DEFAULT'") + duckdb.sql("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + assert _deprecation_records(caplog), "default connection should route warnings to logging" + + +# --------------------------------------------------------------------------- +# Channel 2: duckdb_logs stays queryable (the regression this design fixes) +# --------------------------------------------------------------------------- + + +def test_duckdb_logs_still_populated(): + con = duckdb.connect() + _trigger_deprecation_warning(con) + assert _duckdb_logs_deprecation_count(con) >= 1, "SELECT * FROM duckdb_logs must still surface engine warnings" + + +def test_single_warning_visible_immediately(): + # Guards against regressing to a batched (buffer_size=2048) storage where a lone warning + # would never flush. With buffer_size=1 it must appear after a single triggering query. + con = duckdb.connect() + assert _duckdb_logs_deprecation_count(con) == 0 + _trigger_deprecation_warning(con) + assert _duckdb_logs_deprecation_count(con) >= 1 + + +def test_duckdb_logs_schema_and_content(): + con = duckdb.connect() + _trigger_deprecation_warning(con) + row = con.execute( + "SELECT log_level, message, type, timestamp " + f"FROM duckdb_logs WHERE message LIKE '%{DEPRECATION_FRAGMENT}%' LIMIT 1" + ).fetchone() + assert row is not None + log_level, message, log_type, timestamp = row + assert log_level == "WARNING" + assert DEPRECATION_FRAGMENT in message + # `type` is a VARCHAR but may be empty for some engine warnings (the deprecation notice + # carries no log type), so only assert the schema, not non-emptiness. 
+ assert isinstance(log_type, str) + assert timestamp is not None + + +# --------------------------------------------------------------------------- +# Both channels together +# --------------------------------------------------------------------------- + + +def test_both_channels_receive_the_same_entry(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + _trigger_deprecation_warning(con) + table_rows = con.execute( + f"SELECT message FROM duckdb_logs WHERE message LIKE '%{DEPRECATION_FRAGMENT}%'" + ).fetchall() + logging_records = _deprecation_records(caplog) + assert logging_records, "logging channel missing the entry" + assert table_rows, "duckdb_logs channel missing the entry" + # The message content must agree across both channels. + assert any(DEPRECATION_FRAGMENT in r.getMessage() for r in logging_records) + assert all(DEPRECATION_FRAGMENT in row[0] for row in table_rows) + + +def test_repeated_warnings_accumulate_in_both_channels(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + _trigger_deprecation_warning(con) + after_first = _duckdb_logs_deprecation_count(con) + records_after_first = len(_deprecation_records(caplog)) + _trigger_deprecation_warning(con) + after_second = _duckdb_logs_deprecation_count(con) + records_after_second = len(_deprecation_records(caplog)) + # No deduplication: a second occurrence is recorded again in both channels. 
+ assert after_second > after_first + assert records_after_second > records_after_first + + +# --------------------------------------------------------------------------- +# Robustness +# --------------------------------------------------------------------------- + + +def test_raising_handler_does_not_fail_query_and_row_persists(): + # A user logging handler that raises must not fail the query (the engine has no try/catch + # around the log write path), and because the entry is stored BEFORE forwarding, the + # duckdb_logs row must still be present. + class BoomHandler(logging.Handler): + def emit(self, record): + # Intentionally raise to exercise the C++ exception safety net (bare raise keeps + # ruff's EM101/TRY003 happy). + raise RuntimeError + + logger = logging.getLogger("duckdb") + handler = BoomHandler() + previous_level = logger.level + logger.addHandler(handler) + logger.setLevel(logging.WARNING) + try: + con = duckdb.connect() + con.execute("SET lambda_syntax='DEFAULT'") + result = con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + assert result == [([2, 3, 4],)] + assert _duckdb_logs_deprecation_count(con) >= 1 + finally: + logger.removeHandler(handler) + logger.setLevel(previous_level) + + +def test_default_storage_configuration(): + con = duckdb.connect() + assert con.execute("SELECT current_setting('logging_storage')").fetchone()[0] == "python_log_storage" + assert con.execute("SELECT current_setting('enable_logging')").fetchone()[0] + assert con.execute("SELECT current_setting('logging_level')").fetchone()[0] == "WARNING" + + +def test_switching_to_memory_storage_disables_forwarding(caplog): + # The user escape hatch: SET logging_storage='memory' detaches our storage. Forwarding to + # logging stops, but the table path still works (now via the engine's in-memory storage). 
+ with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + con.execute("SET logging_storage='memory'") + _trigger_deprecation_warning(con) + table_count = _duckdb_logs_deprecation_count(con) + assert table_count >= 1, "memory storage should still populate duckdb_logs" + assert not _deprecation_records(caplog), "forwarding should stop once our storage is detached" + + +def test_separate_databases_are_independent(caplog): + # Logging is per-DatabaseInstance; each fresh database registers its own storage and keeps + # its own duckdb_logs, while both forward to the shared process-wide 'duckdb' logger. + with caplog.at_level(logging.WARNING, logger="duckdb"): + con_a = duckdb.connect() + con_b = duckdb.connect() + _trigger_deprecation_warning(con_a) + assert _duckdb_logs_deprecation_count(con_a) >= 1 + assert _duckdb_logs_deprecation_count(con_b) == 0, "con_b has its own, untouched storage" + _trigger_deprecation_warning(con_b) + assert _duckdb_logs_deprecation_count(con_b) >= 1 + assert len(_deprecation_records(caplog)) >= 2 + + +def test_cursor_shares_storage(caplog): + # A cursor shares the parent's DatabaseInstance, so warnings it triggers land in the same + # duckdb_logs and route to logging. 
+ with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + cur = con.cursor() + _trigger_deprecation_warning(cur) + assert _duckdb_logs_deprecation_count(con) >= 1 + assert _duckdb_logs_deprecation_count(cur) >= 1 + assert _deprecation_records(caplog) From f70d2d06d1c9e7cb3f043c553364e5326dc6d7a8 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Thu, 11 Jun 2026 20:26:18 +0200 Subject: [PATCH 3/3] try separate thread --- src/duckdb_py/duckdb_python.cpp | 4 + .../duckdb_python/python_log_storage.hpp | 30 ++- src/duckdb_py/pyconnection.cpp | 4 + src/duckdb_py/python_log_storage.cpp | 172 +++++++++++++++--- tests/fast/test_python_log_storage.py | 56 +++++- 5 files changed, 229 insertions(+), 37 deletions(-) diff --git a/src/duckdb_py/duckdb_python.cpp b/src/duckdb_py/duckdb_python.cpp index d950960d..9b0a8695 100644 --- a/src/duckdb_py/duckdb_python.cpp +++ b/src/duckdb_py/duckdb_python.cpp @@ -5,6 +5,7 @@ #include "duckdb/parser/parser.hpp" #include "duckdb_python/python_objects.hpp" +#include "duckdb_python/python_log_storage.hpp" #include "duckdb_python/pyconnection/pyconnection.hpp" #include "duckdb_python/pystatement.hpp" #include "duckdb_python/pyrelation.hpp" @@ -1135,6 +1136,9 @@ PYBIND11_MODULE(DUCKDB_PYTHON_LIB_NAME, m) { // NOLINT "Tokenizes a SQL string, returning a list of (position, type) tuples that can be " "used for e.g., syntax highlighting", py::arg("query")); + m.def("_drain_log_forwarding", &PythonLogStorage::DrainForwarder, + "Block until all engine log entries queued for Python's logging module have been " + "forwarded. 
Forwarding is asynchronous; this is a test/synchronization aid."); py::enum_(m, "token_type", py::module_local()) .value("identifier", PySQLTokenType::PY_SQL_TOKEN_IDENTIFIER) .value("numeric_const", PySQLTokenType::PY_SQL_TOKEN_NUMERIC_CONSTANT) diff --git a/src/duckdb_py/include/duckdb_python/python_log_storage.hpp b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp index 0fe000f0..f00d6b28 100644 --- a/src/duckdb_py/include/duckdb_python/python_log_storage.hpp +++ b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp @@ -33,9 +33,16 @@ class PythonLogStorageScanState : public LogStorageScanState { //! 1. forwards it to Python's standard `logging` module (logging.getLogger("duckdb")), and //! 2. retains it in-memory so `SELECT * FROM duckdb_logs` keeps working. //! -//! It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed (and -//! therefore forwarded to Python) immediately, rather than batched until a 2048-entry buffer -//! fills — engine WARNINGs are sparse and must surface inline to be useful. +//! It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed +//! immediately, rather than batched until a 2048-entry buffer fills — engine WARNINGs are +//! sparse and must surface promptly to be useful. +//! +//! Forwarding to Python is ASYNCHRONOUS. The engine calls FlushChunk while holding +//! LogManager::lock (a non-recursive mutex also taken by CreateLogger/WriteLogEntry). Acquiring +//! the GIL there would deadlock against any other thread that holds the GIL and then enters one +//! of those LogManager methods (i.e. ordinary concurrent queries). So FlushChunk only copies +//! (level, message) into a process-global queue, and a single background thread — which holds +//! no engine lock — drains it and forwards to `logging`. See python_log_storage.cpp. 
class PythonLogStorage : public BufferingLogStorage { public: explicit PythonLogStorage(DatabaseInstance &db); @@ -45,6 +52,16 @@ class PythonLogStorage : public BufferingLogStorage { return "python_log_storage"; } + //! Starts the process-global forwarder thread (idempotent). MUST be called with the GIL held + //! and no engine lock held — i.e. from Connect(), never from the engine log-write path. + static void EnsureForwarderStarted(); + + //! Blocks (releasing the GIL) until every queued entry has been forwarded to `logging`. + //! Forwarding is asynchronous, so callers that need to observe a just-emitted warning on the + //! Python side must drain first. Exposed to Python as `_duckdb._drain_log_forwarding` + //! for deterministic tests; harmless if the forwarder was never started. + static void DrainForwarder(); + //! Single-threaded scan interface — mirrors InMemoryLogStorage so duckdb_logs can read us. bool CanScan(LoggingTargetTable table) override; unique_ptr CreateScanState(LoggingTargetTable table) const override; @@ -52,15 +69,16 @@ class PythonLogStorage : public BufferingLogStorage { void InitializeScan(LogStorageScanState &state) const override; protected: - //! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) forwards it to Python. + //! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) queues it for async forwarding. void FlushChunk(LoggingTargetTable table, DataChunk &chunk) override; //! Clears the in-memory buffers. void ResetAllBuffers() override; private: ColumnDataCollection &GetBuffer(LoggingTargetTable table) const; - //! Forwards each row of a LOG_ENTRIES chunk to logging.getLogger("duckdb"). Never throws. - void ForwardEntriesToPython(DataChunk &chunk); + //! Copies each row of a LOG_ENTRIES chunk into the global forward queue. Never touches the + //! GIL or calls Python (it runs under LogManager::lock). Never throws. 
+ void EnqueueEntriesForPython(DataChunk &chunk); map> log_storage_buffers; }; diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp index bae186c0..ec2546a8 100644 --- a/src/duckdb_py/pyconnection.cpp +++ b/src/duckdb_py/pyconnection.cpp @@ -2299,6 +2299,10 @@ shared_ptr DuckDBPyConnection::Connect(const py::object &dat log_manager.SetLogStorage(db_instance, "python_log_storage"); log_manager.SetEnableLogging(true); log_manager.SetLogLevel(LogLevel::LOG_WARNING); + // Start the background thread that forwards queued entries to Python's logging + // module. We're here with the GIL held and no engine lock taken — the only safe + // place to do it (the engine log-write path holds LogManager::lock). + PythonLogStorage::EnsureForwarderStarted(); } } return res; diff --git a/src/duckdb_py/python_log_storage.cpp b/src/duckdb_py/python_log_storage.cpp index 41a21f96..5f30a1c7 100644 --- a/src/duckdb_py/python_log_storage.cpp +++ b/src/duckdb_py/python_log_storage.cpp @@ -7,6 +7,10 @@ #include "duckdb/common/types/data_chunk.hpp" #include "duckdb/common/types/vector.hpp" +#include +#include +#include + namespace duckdb { // Maps the engine's textual log level (stored as VARCHAR in the LOG_ENTRIES chunk) to the @@ -30,6 +34,129 @@ static int LevelStringToPython(const string &level_str) { return 30; } +//===--------------------------------------------------------------------===// +// Asynchronous forwarder +// +// The engine invokes FlushChunk while holding LogManager::lock — a non-recursive mutex that is +// also taken by LogManager::CreateLogger / WriteLogEntry / Flush. Acquiring the GIL from inside +// that lock deadlocks: a worker thread holding the lock blocks on the GIL, while another thread +// holding the GIL blocks on the lock (e.g. via CreateLogger at the start of a concurrent query). +// We observed exactly this with two threads each running execute() on one database. +// +// So forwarding is decoupled. 
FlushChunk only copies plain (level, message) data into this +// process-global queue (no GIL, no Python). A single background thread drains the queue and +// forwards to logging.getLogger("duckdb") with the GIL held but NO engine lock held — breaking +// the lock-ordering cycle. One global thread (not one per DatabaseInstance) avoids spawning a +// thread per connection. The queue holds owned copies, so it is independent of any storage's +// lifetime. +//===--------------------------------------------------------------------===// +namespace { + +struct PendingLogEntry { + int level; + string message; +}; + +struct LogForwarder { + std::mutex mutex; // guards the fields below; NEVER held while acquiring the GIL + std::condition_variable cv; // forwarder waits here for work + std::condition_variable idle_cv; // drainers wait here for the queue to empty + vector queue; + bool stop = false; + bool started = false; + bool busy = false; // a batch has been dequeued but not yet forwarded + std::thread thread; +}; + +LogForwarder &GetForwarder() { + static LogForwarder forwarder; + return forwarder; +} + +void ForwarderLoop() { + auto &fwd = GetForwarder(); + while (true) { + vector batch; + { + std::unique_lock lck(fwd.mutex); + fwd.cv.wait(lck, [&fwd] { return fwd.stop || !fwd.queue.empty(); }); + if (fwd.stop && fwd.queue.empty()) { + return; + } + batch.swap(fwd.queue); + fwd.busy = true; // queue is empty again, but this batch isn't delivered yet + } + // No engine lock and no forwarder lock held here, so acquiring the GIL cannot deadlock. + if (Py_IsInitialized()) { // else interpreter is finalizing — acquiring the GIL would crash + try { + py::gil_scoped_acquire gil; + auto logging = py::module::import("logging"); + auto logger = logging.attr("getLogger")("duckdb"); + for (auto &entry : batch) { + logger.attr("log")(entry.level, entry.message); + } + } catch (...) { + // Logging must never disrupt anything. 
+ } + } + { + std::unique_lock lck(fwd.mutex); + fwd.busy = false; + fwd.idle_cv.notify_all(); // wake any DrainForwarder() waiters + } + } +} + +// atexit callback: stop and join the forwarder while the interpreter is still alive. Runs on the +// main thread with the GIL held; the GIL is released around join() because the forwarder may be +// parked in take_gil and could not otherwise wake to observe `stop`. +void StopForwarder() { + auto &fwd = GetForwarder(); + { + std::unique_lock lck(fwd.mutex); + if (!fwd.started) { + return; + } + fwd.stop = true; + } + fwd.cv.notify_all(); + if (fwd.thread.joinable()) { + py::gil_scoped_release release; + fwd.thread.join(); + } +} + +} // namespace + +void PythonLogStorage::EnsureForwarderStarted() { + // Called from Connect() with the GIL held and no engine lock held. + auto &fwd = GetForwarder(); + { + std::unique_lock lck(fwd.mutex); + if (fwd.started) { + return; + } + fwd.started = true; + fwd.thread = std::thread(ForwarderLoop); + } + // Stop+join before interpreter finalization. Joining a GIL-blocked thread after Py_Finalize + // would crash, so we hook atexit (which runs while the interpreter is still valid). + try { + auto atexit = py::module::import("atexit"); + atexit.attr("register")(py::cpp_function([]() { StopForwarder(); })); + } catch (...) { + } +} + +void PythonLogStorage::DrainForwarder() { + auto &fwd = GetForwarder(); + // Release the GIL while waiting: the forwarder thread needs it to finish its in-flight batch + // and signal idle. Holding it here would deadlock the very thread we're waiting on. 
+ py::gil_scoped_release release; + std::unique_lock lck(fwd.mutex); + fwd.idle_cv.wait(lck, [&fwd] { return fwd.queue.empty() && !fwd.busy; }); +} + PythonLogStorage::PythonLogStorage(DatabaseInstance &db) : BufferingLogStorage(db, 1, true) { log_storage_buffers[LoggingTargetTable::LOG_ENTRIES] = make_uniq(Allocator::DefaultAllocator(), GetSchema(LoggingTargetTable::LOG_ENTRIES)); @@ -48,33 +175,28 @@ ColumnDataCollection &PythonLogStorage::GetBuffer(LoggingTargetTable table) cons return *res->second; } -void PythonLogStorage::ForwardEntriesToPython(DataChunk &chunk) { - // This fires from engine worker threads with the GIL released, and from within both the - // LogManager lock and this storage's lock. It runs arbitrary user Python (logging - // handlers) and MUST NOT let an exception escape: the engine calls the write path with no - // try/catch, directly from query binding/execution, so a raising handler would otherwise - // fail the user's query. Hence we swallow everything here. +void PythonLogStorage::EnqueueEntriesForPython(DataChunk &chunk) { + // Runs under LogManager::lock (and our scan lock). It MUST NOT touch the GIL or call Python: + // doing so here would deadlock against any thread that holds the GIL and then enters a + // LogManager method that needs the same lock (CreateLogger / WriteLogEntry / Flush). So we + // only copy plain data into the global queue; the forwarder thread does the Python work + // lock-free. The strings are deep-copied (GetString), so they outlive this chunk. // - // Caveat: because a lock is held across this call, a handler that re-enters DuckDB on the - // same thread and emits another log entry can self-deadlock on the non-recursive lock. - // That is outside our control (and matches the engine's own contract for log storages). 
- if (!Py_IsInitialized()) { - return; // interpreter is finalizing — acquiring the GIL would crash - } - try { - py::gil_scoped_acquire gil; - auto logging = py::module::import("logging"); - auto logger = logging.attr("getLogger")("duckdb"); - // LOG_ENTRIES schema: context_id, timestamp, type, log_level (idx 3), message (idx 4). - // log_level and message are both VARCHAR; the buffer chunk is flat. - auto level_data = FlatVector::GetData(chunk.data[3]); - auto message_data = FlatVector::GetData(chunk.data[4]); + // A side benefit of decoupling: a user logging handler that raises now runs on the forwarder + // thread, where the exception is swallowed — it can never reach the engine's query path. + // + // LOG_ENTRIES schema: context_id, timestamp, type, log_level (idx 3), message (idx 4). + // log_level and message are both VARCHAR; the buffer chunk is flat. + auto level_data = FlatVector::GetData(chunk.data[3]); + auto message_data = FlatVector::GetData(chunk.data[4]); + auto &fwd = GetForwarder(); + { + std::unique_lock lck(fwd.mutex); for (idx_t i = 0; i < chunk.size(); i++) { - logger.attr("log")(LevelStringToPython(level_data[i].GetString()), message_data[i].GetString()); + fwd.queue.push_back({LevelStringToPython(level_data[i].GetString()), message_data[i].GetString()}); } - } catch (...) { - // Logging must never disrupt query execution. } + fwd.cv.notify_one(); } void PythonLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) { @@ -82,9 +204,9 @@ void PythonLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) { // Retain the entry for duckdb_logs FIRST, so a misbehaving Python handler can never cost // us a stored row. log_storage_buffers[table]->Append(chunk); - // Forward only real log entries (not context metadata) to Python's logging module. + // Queue only real log entries (not context metadata) for async forwarding to logging. 
if (table == LoggingTargetTable::LOG_ENTRIES) { - ForwardEntriesToPython(chunk); + EnqueueEntriesForPython(chunk); } } diff --git a/tests/fast/test_python_log_storage.py b/tests/fast/test_python_log_storage.py index e3bceb9a..955d640c 100644 --- a/tests/fast/test_python_log_storage.py +++ b/tests/fast/test_python_log_storage.py @@ -3,24 +3,42 @@ PythonLogStorage forwards engine log entries to Python's `logging` module AND keeps them queryable via `SELECT * FROM duckdb_logs`. It is registered on the first connection to each DatabaseInstance and routes WARNING+ entries to logging.getLogger("duckdb"). + +Forwarding to `logging` is ASYNCHRONOUS (a background thread drains a queue), because the engine +calls the log-write path while holding LogManager::lock and acquiring the GIL there would +deadlock. So any assertion about the `logging` channel must first call `_drain()` to wait for the +forwarder to catch up. The `duckdb_logs` table channel is synchronous and needs no drain. """ import logging +import _duckdb +import pytest + import duckdb DEPRECATION_FRAGMENT = "Deprecated lambda arrow" +def _drain(): + """Block until the async forwarder has delivered every queued entry to `logging`.""" + _duckdb._drain_log_forwarding() + + def _trigger_deprecation_warning(con): - """Run a query that reliably emits a single engine DUCKDB_LOG_WARNING. + """Run a query that reliably emits a single engine DUCKDB_LOG_WARNING, then drain. The deprecated arrow (->) lambda form warns only when lambda_syntax is DEFAULT. DEFAULT is the current engine default but is slated to change, so we pin it to keep this exercising the warning path across submodule bumps. + + We drain the async forwarder before returning so the entry is delivered to `logging` while + the caller's caplog handler is still attached — callers may read records after their + `with caplog` block exits. 
""" con.execute("SET lambda_syntax='DEFAULT'") con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + _drain() def _deprecation_records(caplog): @@ -62,6 +80,7 @@ def test_module_level_default_connection_forwards(caplog): with caplog.at_level(logging.WARNING, logger="duckdb"): duckdb.execute("SET lambda_syntax='DEFAULT'") duckdb.sql("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + _drain() # this test triggers directly rather than via _trigger_deprecation_warning assert _deprecation_records(caplog), "default connection should route warnings to logging" @@ -142,13 +161,14 @@ def test_repeated_warnings_accumulate_in_both_channels(caplog): def test_raising_handler_does_not_fail_query_and_row_persists(): - # A user logging handler that raises must not fail the query (the engine has no try/catch - # around the log write path), and because the entry is stored BEFORE forwarding, the - # duckdb_logs row must still be present. + # A user logging handler that raises must not disrupt anything. Forwarding runs on a + # background thread (decoupled from the query path), so the exception is swallowed there by + # the forwarder's catch(...) — the query can never see it. The entry is also stored BEFORE + # being queued, so the duckdb_logs row must still be present. We drain while the handler is + # attached so it actually fires and exercises the C++ exception safety net. class BoomHandler(logging.Handler): def emit(self, record): - # Intentionally raise to exercise the C++ exception safety net (bare raise keeps - # ruff's EM101/TRY003 happy). + # Intentionally raise (bare raise keeps ruff's EM101/TRY003 happy). 
raise RuntimeError logger = logging.getLogger("duckdb") @@ -160,6 +180,7 @@ def emit(self, record): con = duckdb.connect() con.execute("SET lambda_syntax='DEFAULT'") result = con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + _drain() # force the raising handler to fire on the forwarder thread assert result == [([2, 3, 4],)] assert _duckdb_logs_deprecation_count(con) >= 1 finally: @@ -167,6 +188,29 @@ def emit(self, record): logger.setLevel(previous_level) +@pytest.mark.timeout(60) +def test_concurrent_warning_queries_do_not_deadlock(): + # Regression guard. Forwarding used to acquire the GIL from inside FlushChunk, which runs + # under LogManager::lock. With two threads each running a warning-emitting query, one thread + # would hold that lock and block on the GIL while another held the GIL and blocked on the + # lock (via LogManager::CreateLogger) — a hard deadlock. Forwarding is now async, so this + # must complete quickly. pytest-timeout (configured for the suite) fails the test if it hangs. + from concurrent.futures import ThreadPoolExecutor + + def hammer(con): + cur = con.cursor() + cur.execute("SET lambda_syntax='DEFAULT'") + for _ in range(20): + cur.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + + con = duckdb.connect() + with ThreadPoolExecutor(max_workers=4) as pool: + futures = [pool.submit(hammer, con) for _ in range(4)] + for future in futures: + future.result() + _drain() + + def test_default_storage_configuration(): con = duckdb.connect() assert con.execute("SELECT current_setting('logging_storage')").fetchone()[0] == "python_log_storage"