Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/dbzero/bindings/python/PyAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <dbzero/core/dram/DRAM_Prefix.hpp>
#include <dbzero/core/vspace/v_object.hpp>
#include <dbzero/core/serialization/Types.hpp>
#include <dbzero/core/threading/SafeRMutex.hpp>

namespace db0::python

Expand Down Expand Up @@ -1146,7 +1147,7 @@ namespace db0::python

PyObject *tryWait(const char *prefix, long state, long timeout)
{
std::unique_lock<std::recursive_mutex> api_lock;
db0::SafeRLock api_lock;
{
// GIL have to be released to safely lock API
db0::python::WithGIL_Unlocked no_gil;
Expand Down
1 change: 1 addition & 0 deletions src/dbzero/bindings/python/PyLocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "PyLocks.hpp"

namespace db0::python

{

GIL_Lock::GIL_Lock()
Expand Down
4 changes: 1 addition & 3 deletions src/dbzero/bindings/python/PyLocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

#include <Python.h>

#define PY_API_FUNC PyThreadState *__save = PyEval_SaveThread(); \
auto __api_lock = db0::python::PyToolkit::lockApi(); \
PyEval_RestoreThread(__save);
#define PY_API_FUNC auto __api_lock = db0::python::PyToolkit::lockPyApi();

namespace db0::python

Expand Down
21 changes: 18 additions & 3 deletions src/dbzero/bindings/python/PyToolkit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace db0::python


PyToolkit::PyWorkspace PyToolkit::m_py_workspace;
std::recursive_mutex PyToolkit::m_api_mutex;
SafeRMutex PyToolkit::m_api_mutex;

std::string PyToolkit::getTypeName(ObjectPtr py_object) {
return getTypeName(Py_TYPE(py_object));
Expand Down Expand Up @@ -709,10 +709,25 @@ namespace db0::python
return PyClassObject_Check(py_object);
}

std::unique_lock<std::recursive_mutex> PyToolkit::lockApi() {
return std::unique_lock<std::recursive_mutex>(m_api_mutex);
SafeRLock PyToolkit::lockApi() {
return { m_api_mutex };
}

SafeRLock PyToolkit::lockPyApi()
{
if (m_api_mutex.isOwnedByThisThread()) {
// already locked by this thread
return {};
}

// unlock GIL while waiting for the API mutex
PyThreadState *__save = PyEval_SaveThread();
auto result = SafeRLock(m_api_mutex);
// restore GIL
PyEval_RestoreThread(__save);
return result;
}

PyToolkit::TypeObjectPtr PyToolkit::getBaseType(TypeObjectPtr py_object) {
return py_object->tp_base;
}
Expand Down
15 changes: 7 additions & 8 deletions src/dbzero/bindings/python/PyToolkit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
#include "PyLocks.hpp"
#include <dbzero/core/collections/pools/StringPools.hpp>
#include <dbzero/core/memory/swine_ptr.hpp>

#define PY_API_FUNC PyThreadState *__save = PyEval_SaveThread(); \
auto __api_lock = db0::python::PyToolkit::lockApi(); \
PyEval_RestoreThread(__save);
#include <dbzero/core/threading/SafeRMutex.hpp>

namespace db0

Expand Down Expand Up @@ -240,8 +237,10 @@ namespace db0::python
static std::optional<bool> getBool(ObjectPtr py_object, const std::string &key);
static std::optional<std::string> getString(ObjectPtr py_object, const std::string &key);

// block until lock acquired
static std::unique_lock<std::recursive_mutex> lockApi();
// Blocks until lock acquired
static SafeRLock lockApi();
// locks API from a Python context (releases GIL while waiting for the lock)
static SafeRLock lockPyApi();

// return base type of TypeObject
static TypeObjectPtr getBaseType(TypeObjectPtr py_object);
Expand All @@ -259,10 +258,10 @@ namespace db0::python
// decRef operation for memo objects
// @return true if reference count was decremented to zero (!hasRefs)
static bool decRefMemo(bool is_tag, ObjectPtr py_object);

private:
static PyWorkspace m_py_workspace;
static std::recursive_mutex m_api_mutex;
static SafeRMutex m_api_mutex;
};

}
35 changes: 35 additions & 0 deletions src/dbzero/core/threading/SafeRMutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2025 DBZero Software sp. z o.o.

#include "SafeRMutex.hpp"

namespace db0

{

void SafeRMutex::lock()
{
std::thread::id this_id = std::this_thread::get_id();
// We can use relaxed ordering because we only care if WE wrote it previously.
if (m_owner.load(std::memory_order_relaxed) == this_id) {
++m_recursion_count;
return;
}

m_mutex.lock();
m_owner.store(this_id, std::memory_order_relaxed);
m_recursion_count = 1;
}

void SafeRMutex::unlock()
{
--m_recursion_count;

if (m_recursion_count == 0) {
// Clear ownership BEFORE unlocking to avoid race conditions with future lockers
m_owner.store(std::thread::id(), std::memory_order_relaxed);
m_mutex.unlock();
}
}

}
77 changes: 77 additions & 0 deletions src/dbzero/core/threading/SafeRMutex.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2025 DBZero Software sp. z o.o.

#pragma once

#include <mutex>
#include <atomic>
#include <thread>
#include <iostream>

namespace db0

{

// Safe recursive mutex with thread tracking
// allowing additional checks (e.g. for proper integration with Python GIL)
class SafeRMutex
{
std::mutex m_mutex;
std::atomic<std::thread::id> m_owner = {};
int m_recursion_count = 0;

public:
bool isOwnedByThisThread() const {
return m_owner.load(std::memory_order_relaxed) == std::this_thread::get_id();
}

void lock();
void unlock();
};

class SafeRLock
{
public:
SafeRLock() = default;
SafeRLock(const SafeRLock &) = delete;

SafeRLock(SafeRLock &&other) noexcept
: m_mutex_ptr(other.m_mutex_ptr)
{
other.m_mutex_ptr = nullptr;
}

SafeRLock(SafeRMutex &mutex)
: m_mutex_ptr(&mutex)
{
m_mutex_ptr->lock();
}

~SafeRLock() {
unlock();
}

void unlock()
{
if (m_mutex_ptr) {
m_mutex_ptr->unlock();
m_mutex_ptr = nullptr;
}
}

SafeRLock &operator=(const SafeRLock &) = delete;

SafeRLock &operator=(SafeRLock &&other) noexcept
{
if (this != &other) {
m_mutex_ptr = other.m_mutex_ptr;
other.m_mutex_ptr = nullptr;
}
return *this;
}

private:
SafeRMutex *m_mutex_ptr = nullptr;
};

}
30 changes: 0 additions & 30 deletions src/dbzero/core/threading/ThreadTracker.cpp

This file was deleted.

24 changes: 0 additions & 24 deletions src/dbzero/core/threading/ThreadTracker.hpp

This file was deleted.

11 changes: 2 additions & 9 deletions src/dbzero/workspace/FixtureThreads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "FixtureThreads.hpp"
#include <dbzero/core/memory/Prefix.hpp>
#include <dbzero/object_model/LangConfig.hpp>
#include <dbzero/core/threading/ThreadTracker.hpp>
#include "AtomicContext.hpp"
#include "LockedContext.hpp"

Expand Down Expand Up @@ -206,25 +205,19 @@ namespace db0
void AutoCommitThread::onUpdate(Fixture &fixture)
{
using LangToolkit = db0::object_model::LangConfig::LangToolkit;

// need to lock the language API first
// otherwise it may deadlock on trying to invoke API calls from auto-commit
// (e.g. instance destruction triggered by LangCache::clear)
auto __api_lock = LangToolkit::lockApi();
// NOTE: since this a separate thread, we must acuire the language interpreter's lock (where required)
// // NOTE: since this a separate thread, we must acuire the language interpreter's lock (where required)
auto lang_lock = LangToolkit::ensureLocked();
#ifndef NDEBUG
ThreadTracker::beginUnique();
#endif
auto callbacks = fixture.onAutoCommit();
if (!callbacks.empty()) {
assert(m_context && "AutoSaveContext must exist here!");
// These callbacks have to be executed when 'everything' is unlocked. Otherwise we are risking a deadlock.
m_context->appendCallbacks(std::move(callbacks));
}
#ifndef NDEBUG
ThreadTracker::end();
#endif
}

void AutoCommitThread::prepareContext()
Expand Down