From 34788464fe384ca31c478c90e523c8f72d2ae0a6 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sun, 17 May 2026 13:00:10 +0200 Subject: [PATCH 1/4] nested atomic operations --- AGENTS.md | 1 + python_tests/test_atomic.py | 274 ++++++++++++++++++ python_tests/test_atomic_lifecycle.py | 94 ++++++ src/dbzero/bindings/python/PyAtomic.cpp | 6 +- src/dbzero/bindings/python/PyToolkit.cpp | 14 +- src/dbzero/bindings/python/PyWorkspace.cpp | 2 +- .../bindings/python/collections/PyIndex.cpp | 14 +- src/dbzero/core/memory/BoundaryLock.cpp | 2 +- src/dbzero/core/memory/BoundaryLock.hpp | 2 +- src/dbzero/core/memory/Memspace.cpp | 13 +- src/dbzero/core/memory/Memspace.hpp | 4 +- src/dbzero/core/memory/MetaAllocator.cpp | 17 +- src/dbzero/core/memory/MetaAllocator.hpp | 4 +- src/dbzero/core/memory/PrefixCache.cpp | 87 ++++-- src/dbzero/core/memory/PrefixCache.hpp | 18 +- src/dbzero/core/memory/PrefixImpl.cpp | 22 +- src/dbzero/core/memory/PrefixImpl.hpp | 6 +- src/dbzero/core/memory/SlabManager.cpp | 68 +++-- src/dbzero/core/memory/SlabManager.hpp | 17 +- src/dbzero/core/memory/VObjectCache.cpp | 21 +- src/dbzero/core/memory/VObjectCache.hpp | 18 +- src/dbzero/object_model/LangCache.cpp | 7 +- src/dbzero/workspace/AtomicContext.cpp | 24 +- src/dbzero/workspace/AtomicContext.hpp | 12 +- src/dbzero/workspace/Fixture.cpp | 23 +- src/dbzero/workspace/Fixture.hpp | 9 +- src/dbzero/workspace/FixtureThreads.cpp | 4 +- src/dbzero/workspace/GC0.cpp | 27 +- src/dbzero/workspace/GC0.hpp | 14 +- src/dbzero/workspace/Workspace.cpp | 44 ++- src/dbzero/workspace/Workspace.hpp | 11 +- tests/unit_tests/PrefixImplTest.cpp | 109 +++++++ 32 files changed, 792 insertions(+), 196 deletions(-) create mode 100644 python_tests/test_atomic_lifecycle.py diff --git a/AGENTS.md b/AGENTS.md index 7e077217..9fbeaa2b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,6 +24,7 @@ All tests must pass before a change is considered complete. - Python tests: `./scripts/run_tests.sh` - C++ tests after a `-t` build: `./build/release/tests.x` +- During development, do not run stress tests by default; they are intentionally slow. Run focused tests specific to the feature or refactor being worked on before finalization. - If any C++ source under the native/core part of the project was modified, also run the C++ test suite (do not rely on the Python tests alone to cover native changes). Never mark a task done while tests are failing. diff --git a/python_tests/test_atomic.py b/python_tests/test_atomic.py index 98208245..470c0e52 100644 --- a/python_tests/test_atomic.py +++ b/python_tests/test_atomic.py @@ -2,6 +2,8 @@ # Copyright (c) 2025 DBZero Software sp. z o.o. import time +import gc +import random import pytest import dbzero as db0 from .memo_test_types import MemoTestClass, MemoTestSingleton, MemoScopedSingleton, MemoScopedClass @@ -382,6 +384,62 @@ def test_atomic_stress_test_1(db0_no_autocommit): print(f"Atomic operations completed: {count}") +@pytest.mark.stress_test +def test_nested_atomic_stress_test_1(db0_no_autocommit): + rng = random.Random(0xDB0A70) + buf = db0.list() + state = MemoTestClass({"counter": 0}) + expected_count = 0 + + def make_payload(outer_index, group_index, level, item_index): + header = f"{outer_index}:{group_index}:{level}:{item_index}:" + return header + rand_string(4096 - len(header)) + + def append_items(outer_index, group_index, level, item_count): + for item_index in range(item_count): + buf.append(MemoTestClass(make_payload(outer_index, group_index, level, item_index))) + state.value["counter"] = state.value["counter"] + item_count + return item_count + + def run_nested_block(outer_index, group_index, level): + mode = rng.choices(["commit", "cancel", "exception"], weights=[6, 2, 2], k=1)[0] + max_depth = rng.randint(2, 4) + item_count = rng.randint(1, 4) + committed_count = 0 + + try: + with db0.atomic() as atomic: + committed_count += append_items(outer_index, group_index, level, item_count) + + if level < max_depth and rng.random() < 0.75: + committed_count += run_nested_block(outer_index, group_index, level + 1) + + if mode == "cancel": + atomic.cancel() + return 0 + + if mode == "exception": + raise RuntimeError("nested atomic rollback") + + return committed_count + except RuntimeError: + return 0 + + for outer_index in range(250): + outer_committed_count = 0 + with db0.atomic(): + outer_committed_count += append_items(outer_index, -1, 0, 40) + + for group_index in range(10): + outer_committed_count += run_nested_block(outer_index, group_index, 1) + + expected_count += outer_committed_count + print(f"Nested atomic operations completed: {outer_index + 1}") + + assert len(buf) == expected_count + assert state.value["counter"] == expected_count + + def test_atomic_deletion(db0_fixture): obj = MemoTestClass(MemoTestClass(123)) dep_uuid = db0.uuid(obj.value) @@ -490,3 +548,219 @@ def test_atomic_context_does_not_increase_state_num(db0_fixture): state_1 = db0.get_state_num() with db0.atomic(): assert db0.get_state_num() == state_1 + + +def test_nested_atomic_cancel_reverts_only_nested_changes(db0_fixture): + object_1 = MemoTestClass(1) + with db0.atomic(): + object_1.value += 10 + try: + with db0.atomic(): + object_1.value += 20 + raise RuntimeError("nested failure") + except RuntimeError: + pass + + assert object_1.value == 11 + + +def test_nested_atomic_success_merges_into_parent(db0_fixture): + object_1 = MemoTestClass(1) + with db0.atomic(): + object_1.value += 10 + with db0.atomic(): + object_1.value += 20 + + assert object_1.value == 31 + + +def test_deep_nested_atomic_cancel_reverts_top_only(db0_fixture): + object_1 = MemoTestClass(1) + with db0.atomic(): + object_1.value += 10 + with db0.atomic(): + object_1.value += 20 + with db0.atomic() as atomic: + object_1.value += 30 + atomic.cancel() + + assert object_1.value == 31 + + +def test_parent_cancel_reverts_successful_nested_atomic(db0_fixture): + object_1 = MemoTestClass(1) + with db0.atomic() as atomic: + object_1.value += 10 + with db0.atomic(): + object_1.value += 20 + atomic.cancel() + + assert object_1.value == 1 + + +def test_nested_atomic_list_cancel_reverts_only_nested_changes(db0_fixture): + object_1 = MemoTestClass([1]) + with db0.atomic(): + object_1.value.append(2) + try: + with db0.atomic(): + object_1.value.append(3) + raise RuntimeError("nested failure") + except RuntimeError: + pass + object_1.value.append(4) + + assert list(object_1.value) == [1, 2, 4] + + +def test_nested_atomic_can_begin_after_grandchild_rollback_with_list_update(db0_fixture): + items = db0.list() + root = MemoTestClass({"items": items, "counter": 0}) + + with db0.atomic(): + root.value["counter"] = 1 + committed_child = MemoTestClass("child") + items.append(committed_child) + + try: + with db0.atomic(): + rolled_child = MemoTestClass("rolled") + root.value["counter"] = 999 + items.append(rolled_child) + + with db0.atomic(): + root.value["counter"] = 1000 + raise RuntimeError("grandchild rollback") + except RuntimeError: + pass + + with db0.atomic(): + root.value["counter"] = 2 + + assert root.value["counter"] == 2 + assert [obj.value for obj in items] == ["child"] + root = items = committed_child = None + gc.collect() + + +def test_nested_atomic_rollback_of_new_tagged_object_is_gc_safe(db0_fixture): + items = db0.list() + + with db0.atomic(): + committed_child = MemoTestClass("child") + items.append(committed_child) + + try: + with db0.atomic(): + rolled_child = MemoTestClass("rolled") + items.append(rolled_child) + db0.tags(rolled_child).add("nested-rolled") + + with db0.atomic(): + raise RuntimeError("grandchild rollback") + except RuntimeError: + pass + + assert [obj.value for obj in items] == ["child"] + assert len(list(db0.find("nested-rolled"))) == 0 + items = committed_child = rolled_child = None + gc.collect() + + +def test_nested_atomic_cancel_reverts_index_add_without_corrupting_index(db0_fixture): + index = db0.index() + committed = MemoTestClass("committed") + canceled = None + + with db0.atomic(): + index.add(1, committed) + + with db0.atomic() as atomic: + canceled = MemoTestClass("canceled") + index.add(2, canceled) + atomic.cancel() + canceled = None + + assert [obj.value for obj in index.select(0, 10)] == ["committed"] + + assert [obj.value for obj in index.select(0, 10)] == ["committed"] + index = committed = canceled = None + gc.collect() + + +def test_deep_nested_atomic_mixed_commit_and_rollback(db0_fixture): + items = db0.list() + index = db0.index() + keep = MemoTestClass("keep") + drop = MemoTestClass("drop") + root = MemoTestClass({"drop": drop, "counter": 0}) + drop = None + + items.append(keep) + index.add(1, keep) + db0.tags(keep).add("nested-keep") + + with db0.atomic(): + root.value["counter"] = 1 + committed_child = MemoTestClass("child-commit") + items.append(committed_child) + index.add(2, committed_child) + db0.tags(committed_child).add("nested-child-commit") + + try: + with db0.atomic(): + nonlocal_marker = MemoTestClass("child-rollback") + root.value["counter"] = 999 + root.value["drop"] = None + items.append(nonlocal_marker) + index.add(3, nonlocal_marker) + db0.tags(nonlocal_marker).add("nested-child-rollback") + + with db0.atomic(): + root.value["counter"] = 1000 + raise RuntimeError("level 3 rollback") + except RuntimeError: + nonlocal_marker = None + pass + + assert root.value["counter"] == 1 + assert [obj.value for obj in items] == ["keep", "child-commit"] + assert {obj.value for obj in index.select(0, 10)} == {"keep", "child-commit"} + assert len(list(db0.find("nested-child-rollback"))) == 0 + assert root.value["drop"].value == "drop" + + with db0.atomic(): + root.value["counter"] = 2 + grandchild = MemoTestClass("grandchild-commit") + items.append(grandchild) + index.add(4, grandchild) + db0.tags(grandchild).add("nested-grandchild-commit") + + assert root.value["counter"] == 2 + assert [obj.value for obj in items] == [ + "keep", + "child-commit", + "grandchild-commit", + ] + + root.value["drop"] = None + + assert root.value["counter"] == 2 + assert [obj.value for obj in items] == [ + "keep", + "child-commit", + "grandchild-commit", + ] + assert {obj.value for obj in index.select(0, 10)} == { + "keep", + "child-commit", + "grandchild-commit", + } + assert len(list(db0.find("nested-keep"))) == 1 + assert len(list(db0.find("nested-child-commit"))) == 1 + assert len(list(db0.find("nested-grandchild-commit"))) == 1 + assert len(list(db0.find("nested-child-rollback"))) == 0 + assert root.value["drop"] is None + + root = items = index = keep = committed_child = grandchild = None + gc.collect() diff --git a/python_tests/test_atomic_lifecycle.py b/python_tests/test_atomic_lifecycle.py new file mode 100644 index 00000000..f258edbf --- /dev/null +++ b/python_tests/test_atomic_lifecycle.py @@ -0,0 +1,94 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# Copyright (c) 2025 DBZero Software sp. z o.o. + +import gc + +import dbzero as db0 + +from .memo_test_types import MemoTestClass + + +def test_nested_atomic_lifecycle_previous_pytest_frame(db0_fixture): + obj = MemoTestClass([1]) + + with db0.atomic(): + obj.value.append(2) + try: + with db0.atomic(): + obj.value.append(3) + raise RuntimeError("nested failure") + except RuntimeError: + pass + obj.value.append(4) + + assert list(obj.value) == [1, 2, 4] + + +def test_deep_nested_atomic_lifecycle_after_previous_test(db0_fixture): + items = db0.list() + index = db0.index() + keep = MemoTestClass("keep") + drop = MemoTestClass("drop") + root = MemoTestClass({"drop": drop, "counter": 0}) + drop = None + + items.append(keep) + index.add(1, keep) + db0.tags(keep).add("nested-keep") + + with db0.atomic(): + root.value["counter"] = 1 + committed_child = MemoTestClass("child-commit") + items.append(committed_child) + index.add(2, committed_child) + db0.tags(committed_child).add("nested-child-commit") + + try: + with db0.atomic(): + nonlocal_marker = MemoTestClass("child-rollback") + root.value["counter"] = 999 + root.value["drop"] = None + items.append(nonlocal_marker) + index.add(3, nonlocal_marker) + db0.tags(nonlocal_marker).add("nested-child-rollback") + + with db0.atomic(): + root.value["counter"] = 1000 + raise RuntimeError("level 3 rollback") + except RuntimeError: + nonlocal_marker = None + + assert root.value["counter"] == 1 + assert [obj.value for obj in items] == ["keep", "child-commit"] + assert {obj.value for obj in index.select(0, 10)} == {"keep", "child-commit"} + assert len(list(db0.find("nested-child-rollback"))) == 0 + assert root.value["drop"].value == "drop" + + with db0.atomic(): + root.value["counter"] = 2 + grandchild = MemoTestClass("grandchild-commit") + items.append(grandchild) + index.add(4, grandchild) + db0.tags(grandchild).add("nested-grandchild-commit") + + root.value["drop"] = None + + assert root.value["counter"] == 2 + assert [obj.value for obj in items] == [ + "keep", + "child-commit", + "grandchild-commit", + ] + assert {obj.value for obj in index.select(0, 10)} == { + "keep", + "child-commit", + "grandchild-commit", + } + assert len(list(db0.find("nested-keep"))) == 1 + assert len(list(db0.find("nested-child-commit"))) == 1 + assert len(list(db0.find("nested-grandchild-commit"))) == 1 + assert len(list(db0.find("nested-child-rollback"))) == 0 + assert root.value["drop"] is None + + root = items = index = keep = committed_child = grandchild = None + gc.collect() diff --git a/src/dbzero/bindings/python/PyAtomic.cpp b/src/dbzero/bindings/python/PyAtomic.cpp index be982f3b..533a804c 100644 --- a/src/dbzero/bindings/python/PyAtomic.cpp +++ b/src/dbzero/bindings/python/PyAtomic.cpp @@ -45,7 +45,7 @@ namespace db0::python .tp_free = PyObject_Free, }; - PyAtomic *PyAPI_tryBeginAtomic(PyObject *self, std::unique_lock &&lock) + PyAtomic *PyAPI_tryBeginAtomic(PyObject *self, std::unique_lock &&lock) { PY_API_FUNC auto py_object = Py_OWN(PyAtomic_new(&PyAtomicType, NULL, NULL)); @@ -62,7 +62,7 @@ namespace db0::python } // need to acquire atomic lock before API lock - std::unique_lock atomic_lock; + std::unique_lock atomic_lock; { // this is to prevent GIL-related deadlocks WithGIL_Unlocked no_gil; @@ -99,4 +99,4 @@ namespace db0::python return runSafe(tryPyAtomic_cancel, reinterpret_cast(self)); } -} \ No newline at end of file +} diff --git a/src/dbzero/bindings/python/PyToolkit.cpp b/src/dbzero/bindings/python/PyToolkit.cpp index a6349735..dcf58e62 100644 --- a/src/dbzero/bindings/python/PyToolkit.cpp +++ b/src/dbzero/bindings/python/PyToolkit.cpp @@ -452,11 +452,21 @@ namespace db0::python lang_cache.add(address, py_index.get()); auto py_index_ptr = py_index.get(); - py_index->ext().setDirtyCallback([py_index_ptr](bool incRef) { + // Dirty/clean notifications are state transitions, but nested atomic + // rollback/merge paths may emit a clean notification for work that did + // not take a matching Python self-reference in this wrapper callback. + // Keep the callback's own ref balance explicit so an unmatched clean + // does not drop the LangCache-owned Index wrapper. + auto dirty_ref_count = std::make_shared(0); + py_index->ext().setDirtyCallback([py_index_ptr, dirty_ref_count](bool incRef) { if (incRef) { Py_INCREF(py_index_ptr); + ++(*dirty_ref_count); } else { - Py_DECREF(py_index_ptr); + if (*dirty_ref_count > 0) { + --(*dirty_ref_count); + Py_DECREF(py_index_ptr); + } } }); diff --git a/src/dbzero/bindings/python/PyWorkspace.cpp b/src/dbzero/bindings/python/PyWorkspace.cpp index 6e094526..3acbe550 100644 --- a/src/dbzero/bindings/python/PyWorkspace.cpp +++ b/src/dbzero/bindings/python/PyWorkspace.cpp @@ -133,4 +133,4 @@ namespace db0::python } return m_config; } -} \ No newline at end of file +} diff --git a/src/dbzero/bindings/python/collections/PyIndex.cpp b/src/dbzero/bindings/python/collections/PyIndex.cpp index 47679d3f..9b6912ba 100644 --- a/src/dbzero/bindings/python/collections/PyIndex.cpp +++ b/src/dbzero/bindings/python/collections/PyIndex.cpp @@ -78,11 +78,21 @@ namespace db0::python // NOTE: this callback is important for proper lifecycle management // we must prevent dirty Index instance from deletion auto py_index_ptr = py_index.get(); - index.setDirtyCallback([py_index_ptr](bool incRef) { + // Dirty/clean notifications are state transitions, but nested atomic + // rollback/merge paths may emit a clean notification for work that did + // not take a matching Python self-reference in this wrapper callback. + // Keep the callback's own ref balance explicit so an unmatched clean + // does not drop the LangCache-owned Index wrapper. + auto dirty_ref_count = std::make_shared(0); + index.setDirtyCallback([py_index_ptr, dirty_ref_count](bool incRef) { if (incRef) { Py_INCREF(py_index_ptr); + ++(*dirty_ref_count); } else { - Py_DECREF(py_index_ptr); + if (*dirty_ref_count > 0) { + --(*dirty_ref_count); + Py_DECREF(py_index_ptr); + } } }); diff --git a/src/dbzero/core/memory/BoundaryLock.cpp b/src/dbzero/core/memory/BoundaryLock.cpp index 5280eab9..275124b9 100644 --- a/src/dbzero/core/memory/BoundaryLock.cpp +++ b/src/dbzero/core/memory/BoundaryLock.cpp @@ -139,4 +139,4 @@ namespace db0 } #endif -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/BoundaryLock.hpp b/src/dbzero/core/memory/BoundaryLock.hpp index 5e78277f..7b0281d6 100644 --- a/src/dbzero/core/memory/BoundaryLock.hpp +++ b/src/dbzero/core/memory/BoundaryLock.hpp @@ -48,4 +48,4 @@ namespace db0 bool _tryFlush(FlushMethod); }; -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/Memspace.cpp b/src/dbzero/core/memory/Memspace.cpp index a2354e75..e6bf26be 100644 --- a/src/dbzero/core/memory/Memspace.cpp +++ b/src/dbzero/core/memory/Memspace.cpp @@ -121,8 +121,7 @@ namespace db0 void Memspace::beginAtomic() { - assert(!m_atomic); - m_atomic = true; + ++m_atomic_depth; getAllocatorForUpdate().commit(); // note that we don't flush from prefix on begin atomic m_prefix->beginAtomic(); @@ -130,16 +129,16 @@ namespace db0 void Memspace::endAtomic() { - assert(m_atomic); - m_atomic = false; + assert(m_atomic_depth > 0); + --m_atomic_depth; getAllocator().detach(); m_prefix->endAtomic(); } void Memspace::cancelAtomic() { - assert(m_atomic); - m_atomic = false; + assert(m_atomic_depth > 0); + --m_atomic_depth; // NOTE: the deferred operations on the allocator get cancelled getAllocator().detach(); m_prefix->cancelAtomic(); @@ -164,4 +163,4 @@ namespace db0 return m_allocator_ptr->isAllocated(address, realm_id, size_of_result); } -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/Memspace.hpp b/src/dbzero/core/memory/Memspace.hpp index f02f7950..106d1f5f 100644 --- a/src/dbzero/core/memory/Memspace.hpp +++ b/src/dbzero/core/memory/Memspace.hpp @@ -133,8 +133,8 @@ namespace db0 Allocator *m_allocator_ptr = nullptr; // UUID (if passed from a derived class) std::optional m_derived_UUID; - // flag indicating if the atomic operation is in progress - bool m_atomic = false; + // atomic operation nesting depth + std::uint32_t m_atomic_depth = 0; std::size_t m_page_size = 0; unsigned int m_page_shift = 0; // exhaustive list of instances which may need flush diff --git a/src/dbzero/core/memory/MetaAllocator.cpp b/src/dbzero/core/memory/MetaAllocator.cpp index d3c067ea..098bf196 100644 --- a/src/dbzero/core/memory/MetaAllocator.cpp +++ b/src/dbzero/core/memory/MetaAllocator.cpp @@ -332,7 +332,7 @@ namespace db0 { // NOTE: if atomic operation is in progress, the deferred free operations are not flushed // this is not a finalized and potentially reversible commit - if (!m_atomic) { + if (m_atomic_depth == 0) { flush(); } m_realms.commit(); @@ -352,28 +352,27 @@ namespace db0 void MetaAllocator::flush() const { - assert(!m_atomic); + assert(m_atomic_depth == 0); m_realms.flush(); } void MetaAllocator::beginAtomic() { - assert(!m_atomic); - m_atomic = true; + ++m_atomic_depth; m_realms.beginAtomic(); } void MetaAllocator::endAtomic() { - assert(m_atomic); - m_atomic = false; + assert(m_atomic_depth > 0); + --m_atomic_depth; m_realms.endAtomic(); } void MetaAllocator::cancelAtomic() { - assert(m_atomic); - m_atomic = false; + assert(m_atomic_depth > 0); + --m_atomic_depth; m_realms.cancelAtomic(); } @@ -472,4 +471,4 @@ namespace db0 return m_realms.getDeferredFreeCount(); } -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/MetaAllocator.hpp b/src/dbzero/core/memory/MetaAllocator.hpp index 99370117..912e8644 100644 --- a/src/dbzero/core/memory/MetaAllocator.hpp +++ b/src/dbzero/core/memory/MetaAllocator.hpp @@ -217,8 +217,8 @@ DB0_PACKED_END RealmsVector m_realms; SlabRecycler *m_recycler_ptr; std::function m_slab_id_function; - // flag indicating if the atomic operation is in progress - bool m_atomic = false; + // atomic operation nesting depth + std::uint32_t m_atomic_depth = 0; /** * Reads header information from the prefix diff --git a/src/dbzero/core/memory/PrefixCache.cpp b/src/dbzero/core/memory/PrefixCache.cpp index f856613f..dc864708 100644 --- a/src/dbzero/core/memory/PrefixCache.cpp +++ b/src/dbzero/core/memory/PrefixCache.cpp @@ -52,7 +52,8 @@ namespace db0 assert(lock); if (is_volatile) { // NOTE: volatile locks are not registered with the recycler - m_volatile_locks.push_back(lock); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_dp_locks.push_back(lock); } else { // register or update lock with the recycler if (m_cache_recycler_ptr) { @@ -119,7 +120,8 @@ namespace db0 dp_lock->setDirty(); // upgraded locks may need to be registered as volatile if (is_volatile) { - m_volatile_locks.push_back(dp_lock); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_dp_locks.push_back(dp_lock); } } } @@ -151,7 +153,8 @@ namespace db0 assert(lock); // register with the volatile locks if (is_volatile) { - m_volatile_wide_locks.push_back(lock); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_wide_locks.push_back(lock); } else { // register or update lock with the recycler if (m_cache_recycler_ptr) { @@ -244,7 +247,8 @@ namespace db0 read_state_num = state_num; // upgraded locks may need to be registered as volatile if (is_volatile) { - m_volatile_wide_locks.push_back(wide_lock); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_wide_locks.push_back(wide_lock); } } } @@ -325,7 +329,8 @@ namespace db0 m_boundary_map.insert(read_state_num, br_lock); // the new lock may need to be registered as "potentially" volatile because parent locks may already be volatile if (access_mode[AccessOptions::no_flush]) { - m_volatile_boundary_locks.push_back(br_lock); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_boundary_locks.push_back(br_lock); } } @@ -358,7 +363,8 @@ namespace db0 // register with the volatile locks if (access_mode[AccessOptions::no_flush]) { - m_volatile_locks.push_back(result); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_dp_locks.push_back(result); } else { // register with the recycler if (m_cache_recycler_ptr) { @@ -377,7 +383,8 @@ namespace db0 // register with the volatile locks if (access_mode[AccessOptions::no_flush]) { - m_volatile_wide_locks.push_back(result); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_wide_locks.push_back(result); } else { // register with the recycler if (m_cache_recycler_ptr) { @@ -404,7 +411,8 @@ namespace db0 // note that BoundaryLocks are not recycled // register with the volatile locks if (access_mode[AccessOptions::no_flush]) { - m_volatile_boundary_locks.push_back(result); + assert(!m_volatile_lock_stack.empty()); + m_volatile_lock_stack.back().m_boundary_locks.push_back(result); } return result; @@ -426,9 +434,12 @@ namespace db0 void PrefixCache::release() { - discardAll(m_volatile_boundary_locks); - discardAll(m_volatile_wide_locks); - discardAll(m_volatile_locks); + for (auto &volatileLocks : m_volatile_lock_stack) { + discardAll(volatileLocks.m_boundary_locks); + discardAll(volatileLocks.m_wide_locks); + discardAll(volatileLocks.m_dp_locks); + } + m_volatile_lock_stack.clear(); // undo write / remove dirty flag from all owned locks forEach([&](ResourceLock &lock) { lock.discard(); @@ -470,6 +481,11 @@ namespace db0 }); } + void PrefixCache::beginAtomic() + { + m_volatile_lock_stack.emplace_back(); + } + void PrefixCache::commit(ProcessTimer *parent_timer) { std::unique_ptr timer; @@ -515,25 +531,25 @@ namespace db0 void PrefixCache::rollback(StateNumType state_num) { + assert(!m_volatile_lock_stack.empty()); + auto volatileLocks = std::move(m_volatile_lock_stack.back()); + m_volatile_lock_stack.pop_back(); // remove all volatile locks - for (auto &lock: m_volatile_boundary_locks) { + for (auto &lock : volatileLocks.m_boundary_locks) { // erase range eraseBoundaryRange(lock->getAddress(), lock->size(), state_num); lock->discard(); } - m_volatile_boundary_locks.clear(); - for (auto &lock: m_volatile_wide_locks) { + for (auto &lock : volatileLocks.m_wide_locks) { // erase range eraseRange(lock->getAddress(), lock->size(), state_num); lock->discard(); } - m_volatile_wide_locks.clear(); - for (auto &lock: m_volatile_locks) { + for (auto &lock : volatileLocks.m_dp_locks) { // erase range eraseRange(lock->getAddress(), lock->size(), state_num); lock->discard(); } - m_volatile_locks.clear(); } void PrefixCache::eraseRange(std::uint64_t address, std::size_t size, StateNumType state_num) @@ -566,7 +582,10 @@ namespace db0 if (end_page == first_page + 1) { auto existing_lock = m_dp_map.replace(state_num, new_lock, first_page); if (existing_lock) { - assert(!existing_lock->isVolatile()); + // In nested atomic blocks the parent state may already own a + // volatile lock for this page. Replacing means merging the + // child's changes into that parent lock, not necessarily into + // a committed transaction lock. return existing_lock; } else { // must clear the no_flush flag if lock was reused @@ -576,7 +595,8 @@ namespace db0 } else { auto existing_lock = m_wide_map.replace(state_num, std::dynamic_pointer_cast(new_lock), first_page); if (existing_lock) { - assert(!existing_lock->isVolatile()); + // Same as DP locks above: a nested child can merge into a + // volatile wide lock owned by its parent atomic frame. return existing_lock; } else { // must clear the no_flush flag if lock was reused @@ -593,7 +613,8 @@ namespace db0 assert(isBoundaryRange(first_page, ((address + size - 1) >> m_shift) + 1, address & m_mask)); auto existing_lock = m_boundary_map.replace(state_num, new_lock, first_page); if (existing_lock) { - assert(!existing_lock->isVolatile()); + // Nested merge can target a boundary lock from the parent atomic + // frame, so the existing lock is allowed to be volatile here. return true; } else { // must clear the no_flush flag if lock was reused (i.e. added to cache under a different state) @@ -605,16 +626,20 @@ namespace db0 void PrefixCache::merge(StateNumType from_state_num, StateNumType to_state_num, std::vector > &reused_locks) { + assert(!m_volatile_lock_stack.empty()); + auto volatileLocks = std::move(m_volatile_lock_stack.back()); + m_volatile_lock_stack.pop_back(); + // Remove all volatile boundary locks before merge, to flush them // into the underlying parent DP-locks // This is fine because in the head transaction we've released all boundary locks // so they need not to be merged - m_volatile_boundary_locks.clear(); + volatileLocks.m_boundary_locks.clear(); // first collect residual parts from dirty wide locks // which need to be trated as dirty, otherwise may not be merged / flushed properly std::unordered_set dirty_res_locks; - for (auto &lock: m_volatile_wide_locks) { + for (auto &lock: volatileLocks.m_wide_locks) { if (lock->isDirty()) { // collect dirty residual locks dirty_res_locks.insert(lock->getResLockPtr()); @@ -623,7 +648,7 @@ namespace db0 std::unordered_map > rebase_map; // merge DP-locks first - for (auto &lock: m_volatile_locks) { + for (auto &lock: volatileLocks.m_dp_locks) { // erase volatile range related lock eraseRange(lock->getAddress(), lock->size(), from_state_num); // NOTE: volatile locks may not be dirty if first accessed as read-only from the atomic context @@ -636,13 +661,15 @@ namespace db0 rebase_map[lock.get()] = existing_lock; } else { reused_locks.push_back(lock); + if (!m_volatile_lock_stack.empty()) { + m_volatile_lock_stack.back().m_dp_locks.push_back(lock); + } } } } - m_volatile_locks.clear(); // merge wide locks next & rebase residual locks if needed - for (auto &lock: m_volatile_wide_locks) { + for (auto &lock: volatileLocks.m_wide_locks) { // erase volatile range related lock eraseRange(lock->getAddress(), lock->size(), from_state_num); // NOTE: volatile locks may not be dirty if first accessed as read-only from the atomic context @@ -656,10 +683,12 @@ namespace db0 // need to rebase parent locks if the boundary lock was reused lock->rebase(rebase_map); reused_locks.push_back(lock); + if (!m_volatile_lock_stack.empty()) { + m_volatile_lock_stack.back().m_wide_locks.push_back(lock); + } } } } - m_volatile_wide_locks.clear(); } std::size_t PrefixCache::getPageSize() const { @@ -708,9 +737,7 @@ namespace db0 { // NOTE boundary map may contain non-expired locks - e.g. ones supported by Snapshot (e.g. PrefixViewImpl) // there must be no volatile locks - assert(m_volatile_locks.empty()); - assert(m_volatile_wide_locks.empty()); - assert(m_volatile_boundary_locks.empty()); + assert(m_volatile_lock_stack.empty()); // clear all expired locks from the boundary map m_boundary_map.clear(); } @@ -737,4 +764,4 @@ namespace db0 return { dp_total, dp_cow }; } -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/PrefixCache.hpp b/src/dbzero/core/memory/PrefixCache.hpp index dd816c1f..16ddad56 100644 --- a/src/dbzero/core/memory/PrefixCache.hpp +++ b/src/dbzero/core/memory/PrefixCache.hpp @@ -123,6 +123,8 @@ namespace db0 // Flush managed boundary locks only void flushBoundary(); + void beginAtomic(); + /** * Relase / rollback all locks stored by the cache * this is required for a proper winding down in unit tests @@ -184,10 +186,16 @@ namespace db0 // marker lock (to mark missing ranges) const std::shared_ptr m_missing_dp_lock_ptr; const std::shared_ptr m_missing_wide_lock_ptr; - // locks (DP_Lock or WideLock) with no_flush flag (e.g. from an atomic update) - mutable std::vector > m_volatile_locks; - mutable std::vector > m_volatile_wide_locks; - mutable std::vector > m_volatile_boundary_locks; + // One volatile-lock frame per nested atomic block. Boundary locks are tracked by frame + // because their parent DP states can change during merge/rebase, so state-derived + // ownership is not reliable enough for scoped rollback. + struct VolatileLocks + { + std::vector > m_dp_locks; + std::vector > m_wide_locks; + std::vector > m_boundary_locks; + }; + mutable std::vector m_volatile_lock_stack; /** * Execute specific function for each stored resource lock, boundary locks processed first @@ -216,4 +224,4 @@ namespace db0 volatile_locks.clear(); } -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/PrefixImpl.cpp b/src/dbzero/core/memory/PrefixImpl.cpp index 60eb9def..bbe988aa 100644 --- a/src/dbzero/core/memory/PrefixImpl.cpp +++ b/src/dbzero/core/memory/PrefixImpl.cpp @@ -55,7 +55,7 @@ namespace db0 assert(state_num > 0); assert(size > 0); // for atomic operations use no_flush flag to allow reverting changes - if (m_atomic) { + if (m_atomic_depth > 0) { access_mode.set(AccessOptions::no_flush, true); } @@ -285,12 +285,12 @@ namespace db0 StateNumType PrefixImpl::getStateNum(bool finalized) const { // NOTE: must apply atomic operation adjustment - int adjust = m_atomic ? -1 : 0; + auto adjust = static_cast(m_atomic_depth); if (finalized) { // in case of read/write prefixes the head state number is never finalized - return (m_access_type == AccessType::READ_WRITE) ? (m_head_state_num - 1 + adjust):(m_head_state_num + adjust); + return (m_access_type == AccessType::READ_WRITE) ? (m_head_state_num - 1 - adjust):(m_head_state_num - adjust); } else { - return m_head_state_num + adjust; + return m_head_state_num - adjust; } } @@ -385,26 +385,26 @@ namespace db0 void PrefixImpl::beginAtomic() { - assert(!m_atomic); // Flush all boundary locks before the start of a new atomic operation // this is to avoid flushing (which in case of the boundary locks - mutates the underlying DPs) // during the atomic operation. Otherwise it would result in a data inconsistency - // this is because the atomic operation needs to start over a DP-consistent state // Due to the same reason, also flush the residual parts of wide locks m_cache.flushBoundary(); + m_cache.beginAtomic(); // increment state number to allow isolation ++m_head_state_num; - m_atomic = true; + ++m_atomic_depth; } void PrefixImpl::endAtomic() { - assert(m_atomic); + assert(m_atomic_depth > 0); std::vector > reused_locks; // merge all results into the current transaction m_cache.merge(m_head_state_num, m_head_state_num - 1, reused_locks); --m_head_state_num; - m_atomic = false; + --m_atomic_depth; // update reused locks with CacheRecycler // this can only be done AFTER completing the atomic operation (as it's a potentially mutable operation) @@ -418,10 +418,10 @@ namespace db0 void PrefixImpl::cancelAtomic() { - assert(m_atomic); + assert(m_atomic_depth > 0); m_cache.rollback(m_head_state_num); --m_head_state_num; - m_atomic = false; + --m_atomic_depth; } BaseStorage &PrefixImpl::getStorage() const { @@ -448,4 +448,4 @@ namespace db0 callback("dirty_dp_cow", cow_stats.second); } -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/PrefixImpl.hpp b/src/dbzero/core/memory/PrefixImpl.hpp index e191b464..6e7ca1ed 100644 --- a/src/dbzero/core/memory/PrefixImpl.hpp +++ b/src/dbzero/core/memory/PrefixImpl.hpp @@ -99,8 +99,8 @@ namespace db0 const std::uint32_t m_shift; StateNumType m_head_state_num; mutable PrefixCache m_cache; - // flag indicating atomic operation in progress - bool m_atomic = false; + // atomic operation nesting depth + std::uint32_t m_atomic_depth = 0; std::shared_ptr mapPage(std::uint64_t page_num, StateNumType state_num, FlagSet); std::shared_ptr mapBoundaryRange(std::uint64_t page_num, std::uint64_t address, @@ -115,4 +115,4 @@ namespace db0 void adjustAccessMode(FlagSet &access_mode, std::uint64_t address, std::size_t size) const; }; -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/SlabManager.cpp b/src/dbzero/core/memory/SlabManager.cpp index 652cd742..74b80f01 100644 --- a/src/dbzero/core/memory/SlabManager.cpp +++ b/src/dbzero/core/memory/SlabManager.cpp @@ -131,9 +131,9 @@ namespace db0 auto capacity = SlabAllocator::formatSlab(m_prefix, address, m_slab_size, m_page_size); // NOTE: for a new slab, the initial lost capacity is 0 auto slab = std::make_shared(m_prefix, address, m_slab_size, m_page_size, capacity, 0); - if (m_atomic) { + if (!m_atomic_stack.empty()) { // if atomic operation is in progress, add to the volatile slabs - m_volatile_slabs.push_back(address); + m_atomic_stack.back().m_volatile_slabs.push_back(address); } return { slab, slab_id }; @@ -343,34 +343,45 @@ namespace db0 void SlabManager::beginAtomic() { - assert(!m_atomic); - assert(m_volatile_slabs.empty()); - m_atomic = true; + m_atomic_stack.emplace_back(); } void SlabManager::endAtomic() { - assert(m_atomic); + assert(!m_atomic_stack.empty()); + auto atomicFrame = std::move(m_atomic_stack.back()); + m_atomic_stack.pop_back(); + // merge atomic deferred free operations - if (!m_atomic_deferred_free_ops.empty()) { - for (auto addr : m_atomic_deferred_free_ops) { - m_deferred_free_ops.insert(addr); + if (!atomicFrame.m_deferred_free_ops.empty()) { + if (!m_atomic_stack.empty()) { + auto &parentDeferredFreeOps = m_atomic_stack.back().m_deferred_free_ops; + parentDeferredFreeOps.insert(parentDeferredFreeOps.end(), atomicFrame.m_deferred_free_ops.begin(), + atomicFrame.m_deferred_free_ops.end()); + } else { + for (auto addr : atomicFrame.m_deferred_free_ops) { + m_deferred_free_ops.insert(addr); + } } - m_atomic_deferred_free_ops.clear(); } - m_volatile_slabs.clear(); - m_atomic = false; + if (!atomicFrame.m_volatile_slabs.empty() && !m_atomic_stack.empty()) { + auto &parentVolatileSlabs = m_atomic_stack.back().m_volatile_slabs; + parentVolatileSlabs.insert(parentVolatileSlabs.end(), atomicFrame.m_volatile_slabs.begin(), + atomicFrame.m_volatile_slabs.end()); + } } void SlabManager::cancelAtomic() { - assert(m_atomic); - // rollback atomic deferred free operations - m_atomic_deferred_free_ops.clear(); + assert(!m_atomic_stack.empty()); + auto atomicFrame = std::move(m_atomic_stack.back()); + m_atomic_stack.pop_back(); - // revert all volatile slabs from cache - for (auto slab_addr : m_volatile_slabs) { + // Rollback restores the prefix state, so cached slab objects from the + // canceled frame may carry stale capacity metadata. Drop all slab cache + // state and reopen slabs lazily from the restored prefix. + for (auto slab_addr : atomicFrame.m_volatile_slabs) { auto it = m_slabs.find(slab_addr); if (it != m_slabs.end()) { auto slab_item = it->second.lock(); @@ -380,9 +391,19 @@ namespace db0 m_slabs.erase(it); } } + for (auto &slab_item : m_dirty_slabs) { + slab_item->m_is_dirty = false; + } + m_dirty_slabs.clear(); + if (m_recycler_ptr) { + m_recycler_ptr->close([this](const SlabItem &slab) { + return &slab->getPrefix() == m_prefix.get(); + }); + } + m_slabs.clear(); + m_reserved_slabs.clear(); m_active_slab = {}; - m_volatile_slabs.clear(); - m_atomic = false; + m_next_slab_id = {}; } void SlabManager::saveItem(SlabItem &item) const @@ -676,8 +697,8 @@ namespace db0 void SlabManager::deferredFree(Address address) { - if (m_atomic) { - m_atomic_deferred_free_ops.push_back(address); + if (!m_atomic_stack.empty()) { + m_atomic_stack.back().m_deferred_free_ops.push_back(address); } else { m_deferred_free_ops.insert(address); } @@ -685,8 +706,7 @@ namespace db0 void SlabManager::flush() const { - assert(!m_atomic); - assert(m_atomic_deferred_free_ops.empty()); + assert(m_atomic_stack.empty()); // perform the deferred free operations if (!m_deferred_free_ops.empty()) { for (auto addr : m_deferred_free_ops) { @@ -700,4 +720,4 @@ namespace db0 return m_deferred_free_ops.size(); } -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/SlabManager.hpp b/src/dbzero/core/memory/SlabManager.hpp index 4b2653e3..f09acde1 100644 --- a/src/dbzero/core/memory/SlabManager.hpp +++ b/src/dbzero/core/memory/SlabManager.hpp @@ -154,11 +154,16 @@ namespace db0 std::function m_slab_address_func; std::function m_slab_id_func; mutable std::optional m_next_slab_id; - // addresses of slabs newly created during atomic operations (potentially to be reverted) - mutable std::vector m_volatile_slabs; - // the atomic operation's flag - bool m_atomic = false; - std::vector
m_atomic_deferred_free_ops; + // One frame per nested atomic block. New slabs and deferred frees must be + // promoted or discarded together so child commit/rollback keeps allocator state scoped. + struct AtomicFrame + { + // Slabs created in this atomic block; removed from cache if the block is rolled back. + std::vector m_volatile_slabs; + // Frees requested in this atomic block; promoted on commit, discarded on rollback. + std::vector
m_deferred_free_ops; + }; + mutable std::vector m_atomic_stack; const bool m_deferred_free; mutable std::unordered_set
m_deferred_free_ops; // the list of modified slabs (need backend refresh) @@ -186,4 +191,4 @@ namespace db0 void _free(Address, std::uint32_t slab_id); }; -} \ No newline at end of file +} diff --git a/src/dbzero/core/memory/VObjectCache.cpp b/src/dbzero/core/memory/VObjectCache.cpp index 9f003840..6c448be4 100644 --- a/src/dbzero/core/memory/VObjectCache.cpp +++ b/src/dbzero/core/memory/VObjectCache.cpp @@ -138,27 +138,30 @@ namespace db0 void VObjectCache::beginAtomic() { - assert(!m_atomic); commit(); - m_atomic = true; + m_volatile_stack.emplace_back(); } void VObjectCache::endAtomic() { - assert(m_atomic); - m_atomic = false; - m_volatile.clear(); + assert(!m_volatile_stack.empty()); + auto volatileInstances = std::move(m_volatile_stack.back()); + m_volatile_stack.pop_back(); + if (!m_volatile_stack.empty()) { + auto &parentVolatileInstances = m_volatile_stack.back(); + parentVolatileInstances.insert(volatileInstances.begin(), volatileInstances.end()); + } } void VObjectCache::cancelAtomic() { - assert(m_atomic); - m_atomic = false; + assert(!m_volatile_stack.empty()); // remove volatile instances from cache - for (auto address : m_volatile) { + auto volatileInstances = std::move(m_volatile_stack.back()); + m_volatile_stack.pop_back(); + for (auto address : volatileInstances) { erase(address); } - m_volatile.clear(); } } diff --git a/src/dbzero/core/memory/VObjectCache.hpp b/src/dbzero/core/memory/VObjectCache.hpp index f704d6de..6991630b 100644 --- a/src/dbzero/core/memory/VObjectCache.hpp +++ b/src/dbzero/core/memory/VObjectCache.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -119,9 +120,8 @@ namespace db0 // note that detach function may not be present (non-detachable) mutable std::unordered_map, std::uint32_t, std::function, std::function > > m_cache; - bool m_atomic = false; - // volatile instances - i.e. ones created during atomic operation - mutable std::unordered_set m_volatile; + // volatile instance stack - i.e. instances used during atomic operations + mutable std::vector > m_volatile_stack; }; template @@ -148,8 +148,8 @@ namespace db0 }; } m_cache[result_ptr->getAddress()] = { ptr, index, commit_func, detach_func }; - if (m_atomic) { - m_volatile.insert(result_ptr->getAddress()); + if (!m_volatile_stack.empty()) { + m_volatile_stack.back().insert(result_ptr->getAddress()); } return result_ptr; } @@ -162,8 +162,8 @@ namespace db0 auto lock = std::get<0>(it->second).lock(); if (lock) { // for atomic operations register the address as volatile - if (m_atomic) { - m_volatile.insert(address); + if (!m_volatile_stack.empty()) { + m_volatile_stack.back().insert(address); } return std::static_pointer_cast(lock); } else { @@ -187,8 +187,8 @@ namespace db0 m_cache.erase(it); return nullptr; } - if (m_atomic) { - m_volatile.insert(address); + if (!m_volatile_stack.empty()) { + m_volatile_stack.back().insert(address); } return std::static_pointer_cast(lock); } diff --git a/src/dbzero/object_model/LangCache.cpp b/src/dbzero/object_model/LangCache.cpp index a28a5ba1..ae818b82 100644 --- a/src/dbzero/object_model/LangCache.cpp +++ b/src/dbzero/object_model/LangCache.cpp @@ -114,7 +114,10 @@ namespace db0 } bool LangCache::erase(const Fixture &fixture, Address address, bool expired_only, bool as_defunct) { - return erase(getFixtureId(fixture), address, expired_only); + // Preserve the defunct-cleanup mode for callers that enter through a + // Fixture. Dropping it here would make interpreter-shutdown cleanup try + // to DECREF Python objects after the safe cleanup window. + return erase(getFixtureId(fixture), address, expired_only, as_defunct); } bool LangCache::erase(std::uint16_t fixture_id, Address address, bool expired_only, bool as_defunct) @@ -317,4 +320,4 @@ namespace db0 } } -} \ No newline at end of file +} diff --git a/src/dbzero/workspace/AtomicContext.cpp b/src/dbzero/workspace/AtomicContext.cpp index 73c89da3..7d447e73 100644 --- a/src/dbzero/workspace/AtomicContext.cpp +++ b/src/dbzero/workspace/AtomicContext.cpp @@ -14,7 +14,7 @@ namespace db0 { - std::mutex AtomicContext::m_atomic_mutex; + std::recursive_mutex AtomicContext::m_atomic_mutex; // NOTE: since objects might've been destroyed inside atomic operation, we need to check before detaching template void detachExisting(const T &obj) @@ -68,12 +68,15 @@ namespace db0 functions[static_cast(TypeId::DB0_TUPLE)] = detachObject; } - AtomicContext::AtomicContext(std::shared_ptr &workspace, std::unique_lock &&lock) + AtomicContext::AtomicContext(std::shared_ptr &workspace, std::unique_lock &&lock) : m_workspace(workspace) + , m_parent(workspace->currentAtomicContext()) , m_atomic_lock(std::move(lock)) { assert(isActive()); - m_workspace->preAtomic(); + if (!m_parent) { + m_workspace->preAtomic(); + } m_workspace->beginAtomic(this); } @@ -89,8 +92,8 @@ namespace db0 for (auto &pair : m_objects) { detachObject(type_manager.getTypeId(pair.second.get()), pair.second.get()); } - m_workspace->cancelAtomic(); m_objects.clear(); + m_workspace->cancelAtomic(this); } catch (...) { m_atomic_lock.unlock(); throw; @@ -121,7 +124,12 @@ namespace db0 detachObject(type_manager.getTypeId(pair.second.get()), pair.second.get()); } - m_workspace->endAtomic(); + m_workspace->endAtomic(this); + if (m_parent) { + for (auto &pair : m_objects) { + m_parent->add(pair.first, pair.second.get()); + } + } m_objects.clear(); } catch (...) { m_atomic_lock.unlock(); @@ -147,12 +155,12 @@ namespace db0 } } - std::unique_lock AtomicContext::lock() { - return std::unique_lock(m_atomic_mutex); + std::unique_lock AtomicContext::lock() { + return std::unique_lock(m_atomic_mutex); } bool AtomicContext::isActive() const { return m_atomic_lock.owns_lock(); } -} \ No newline at end of file +} diff --git a/src/dbzero/workspace/AtomicContext.hpp b/src/dbzero/workspace/AtomicContext.hpp index 3e2a4433..e582b618 100644 --- a/src/dbzero/workspace/AtomicContext.hpp +++ b/src/dbzero/workspace/AtomicContext.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include namespace db0 @@ -49,7 +50,7 @@ namespace db0 using ObjectSharedPtr = LangToolkit::ObjectSharedPtr; using ObjectSharedExtPtr = LangToolkit::ObjectSharedExtPtr; - AtomicContext(std::shared_ptr &, std::unique_lock &&); + AtomicContext(std::shared_ptr &, std::unique_lock &&); // Register specific instance with the current transaction (for rollback/detach) void add(Address, ObjectPtr); @@ -61,18 +62,19 @@ namespace db0 void cancel(); void close(); - static std::unique_lock lock(); + static std::unique_lock lock(); private: std::shared_ptr m_workspace; + AtomicContext *m_parent = nullptr; std::unordered_map m_objects; // mutex / lock to prevent mutliple concurrent atomic operations // also acquired by the autocommit-thread to prevent auto-commit during atomic operation - static std::mutex m_atomic_mutex; - std::unique_lock m_atomic_lock; + static std::recursive_mutex m_atomic_mutex; + std::unique_lock m_atomic_lock; bool isActive() const; }; -} \ No newline at end of file +} diff --git a/src/dbzero/workspace/Fixture.cpp b/src/dbzero/workspace/Fixture.cpp index ca36d4f3..deb5837a 100644 --- a/src/dbzero/workspace/Fixture.cpp +++ b/src/dbzero/workspace/Fixture.cpp @@ -475,8 +475,8 @@ namespace db0 void Fixture::beginAtomic(AtomicContext *context) { - assert(!m_atomic_context_ptr); - m_atomic_context_ptr = context; + preAtomic(); + m_atomic_context_stack.push_back(context); m_meta_allocator.beginAtomic(); getGC0().beginAtomic(); m_string_pool.commit(); @@ -507,10 +507,11 @@ namespace db0 Memspace::detach(); } - void Fixture::endAtomic() + void Fixture::endAtomic(AtomicContext *context) { - assert(m_atomic_context_ptr); - m_atomic_context_ptr = nullptr; + assert(!m_atomic_context_stack.empty()); + assert(m_atomic_context_stack.back() == context); + m_atomic_context_stack.pop_back(); m_meta_allocator.endAtomic(); m_v_object_cache.endAtomic(); @@ -518,10 +519,11 @@ namespace db0 Memspace::endAtomic(); } - void Fixture::cancelAtomic() + void Fixture::cancelAtomic(AtomicContext *context) { - assert(m_atomic_context_ptr); - m_atomic_context_ptr = nullptr; + assert(!m_atomic_context_stack.empty()); + assert(m_atomic_context_stack.back() == context); + m_atomic_context_stack.pop_back(); m_meta_allocator.cancelAtomic(); getGC0().cancelAtomic(); // rollback any uncommited changes @@ -539,7 +541,10 @@ namespace db0 } AtomicContext *Fixture::tryGetAtomicContext() const { - return m_atomic_context_ptr; + if (m_atomic_context_stack.empty()) { + return nullptr; + } + return m_atomic_context_stack.back(); } void Fixture::forAllSlabs(std::function f) const { diff --git a/src/dbzero/workspace/Fixture.hpp b/src/dbzero/workspace/Fixture.hpp index ced527d8..aee1a6c1 100644 --- a/src/dbzero/workspace/Fixture.hpp +++ b/src/dbzero/workspace/Fixture.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -253,9 +254,9 @@ DB0_PACKED_BEGIN void beginAtomic(AtomicContext *context); - void endAtomic(); + void endAtomic(AtomicContext *context); - void cancelAtomic(); + void cancelAtomic(AtomicContext *context); void detach(); @@ -301,7 +302,9 @@ DB0_PACKED_BEGIN ObjectCatalogue m_object_catalogue; // internal cache for dbzero based collections mutable VObjectCache m_v_object_cache; - AtomicContext *m_atomic_context_ptr = nullptr; + // Non-owning active atomic stack. AtomicContext instances are owned by the + // Python wrapper; fixtures only observe the current lexical/LIFO context. + std::vector m_atomic_context_stack; std::atomic m_closed = false; std::atomic m_commit_pending = false; diff --git a/src/dbzero/workspace/FixtureThreads.cpp b/src/dbzero/workspace/FixtureThreads.cpp index c5aba1b5..1b3bbc7b 100644 --- a/src/dbzero/workspace/FixtureThreads.cpp +++ b/src/dbzero/workspace/FixtureThreads.cpp @@ -176,13 +176,13 @@ namespace db0 { std::unique_lock m_commit_lock; std::unique_lock m_locked_context_lock; - std::unique_lock m_atomic_lock; + std::unique_lock m_atomic_lock; public: AutoSaveContext( std::unique_lock &&commit_lock, std::unique_lock &&locked_context_lock, - std::unique_lock &&atomic_lock) + std::unique_lock &&atomic_lock) : m_commit_lock(std::move(commit_lock)) , m_locked_context_lock(std::move(locked_context_lock)) , m_atomic_lock(std::move(atomic_lock)) diff --git a/src/dbzero/workspace/GC0.cpp b/src/dbzero/workspace/GC0.cpp index eba3ebfa..8c37334e 100644 --- a/src/dbzero/workspace/GC0.cpp +++ b/src/dbzero/workspace/GC0.cpp @@ -210,23 +210,28 @@ namespace db0 void GC0::beginAtomic() { - assert(!m_atomic); // commmit all active v_object instances so that the underlying locks can be re-created (CoW) commitAll(); - m_atomic = true; + m_volatile_stack.emplace_back(); } void GC0::endAtomic() { - assert(m_atomic); - m_volatile.clear(); - m_atomic = false; + assert(!m_volatile_stack.empty()); + auto volatilePtrs = std::move(m_volatile_stack.back()); + m_volatile_stack.pop_back(); + if (!m_volatile_stack.empty()) { + auto &parentVolatilePtrs = m_volatile_stack.back(); + parentVolatilePtrs.insert(parentVolatilePtrs.end(), volatilePtrs.begin(), volatilePtrs.end()); + } } void GC0::cancelAtomic() { - assert(m_atomic); - for (auto vptr : m_volatile) { + assert(!m_volatile_stack.empty()); + auto volatilePtrs = std::move(m_volatile_stack.back()); + m_volatile_stack.pop_back(); + for (auto vptr : volatilePtrs) { if (vptr) { tryRemove(vptr, true); } @@ -236,8 +241,6 @@ namespace db0 for (auto &item : m_flush_map) { ops_list[item.second].flush(item.first, true); } - m_volatile.clear(); - m_atomic = false; } std::optional GC0::erase(void *vptr) @@ -252,8 +255,8 @@ namespace db0 m_flush_map.erase(it); } - if (m_atomic) { - for (auto &volatile_ptr: m_volatile) { + if (!m_volatile_stack.empty()) { + for (auto &volatile_ptr: m_volatile_stack.back()) { if (volatile_ptr == vptr) { volatile_ptr = nullptr; } @@ -285,4 +288,4 @@ namespace db0 return std::make_unique(*this); } -} \ No newline at end of file +} diff --git a/src/dbzero/workspace/GC0.hpp b/src/dbzero/workspace/GC0.hpp index d02dc458..e7147fa8 100644 --- a/src/dbzero/workspace/GC0.hpp +++ b/src/dbzero/workspace/GC0.hpp @@ -163,10 +163,8 @@ namespace db0 std::unordered_map m_flush_map; // objects irrevocably scheduled for deletion std::unordered_map m_scheduled_for_deletion; - // flag indicating atomic operation in progress - bool m_atomic = false; - // the list of volatile instances - i.e. created during atomic operation - std::vector m_volatile; + // volatile instance stack - i.e. instances created during atomic operations + std::vector > m_volatile_stack; mutable std::mutex m_mutex; void commitAll(); @@ -194,8 +192,8 @@ namespace db0 if (ops_list[T::m_gc_ops_id].flush) { m_flush_map[vptr] = T::m_gc_ops_id; } - if (m_atomic) { - m_volatile.push_back(vptr); + if (!m_volatile_stack.empty()) { + m_volatile_stack.back().push_back(vptr); } } @@ -208,8 +206,8 @@ namespace db0 if (flush_op) { m_flush_map[vptr] = *flush_op; } - if (m_atomic) { - m_volatile.push_back(vptr); + if (!m_volatile_stack.empty()) { + m_volatile_stack.back().push_back(vptr); } } diff --git a/src/dbzero/workspace/Workspace.cpp b/src/dbzero/workspace/Workspace.cpp index be3321b0..e76253e4 100644 --- a/src/dbzero/workspace/Workspace.cpp +++ b/src/dbzero/workspace/Workspace.cpp @@ -374,9 +374,11 @@ namespace db0 fixture->commit(); } - if (m_atomic_context_ptr && *access_type == AccessType::READ_WRITE) { - // begin atomic with the new read/write fixture - fixture->beginAtomic(m_atomic_context_ptr); + if (!m_atomic_context_stack.empty() && *access_type == AccessType::READ_WRITE) { + // begin all active atomic levels with the new read/write fixture + for (auto *context : m_atomic_context_stack) { + fixture->beginAtomic(context); + } } it = m_fixtures.emplace(fixture->getUUID(), fixture).first; @@ -646,8 +648,7 @@ namespace db0 void Workspace::preAtomic() { - assert(!m_atomic_context_ptr); - // begin atomic with all open read/write fixtures + // prepare all currently open read/write fixtures before the root atomic block for (auto &[uuid, fixture] : m_fixtures) { if (fixture->getAccessType() == AccessType::READ_WRITE) { fixture->preAtomic(); @@ -657,14 +658,21 @@ namespace db0 void Workspace::beginAtomic(AtomicContext *context) { - assert(!m_atomic_context_ptr); // begin atomic with all open read/write fixtures for (auto &[uuid, fixture] : m_fixtures) { if (fixture->getAccessType() == AccessType::READ_WRITE) { fixture->beginAtomic(context); } } - m_atomic_context_ptr = context; + m_atomic_context_stack.push_back(context); + } + + AtomicContext *Workspace::currentAtomicContext() const + { + if (m_atomic_context_stack.empty()) { + return nullptr; + } + return m_atomic_context_stack.back(); } unsigned int Workspace::beginLocked() @@ -725,26 +733,30 @@ namespace db0 } } - void Workspace::endAtomic() + void Workspace::endAtomic(AtomicContext *context) { - assert(m_atomic_context_ptr); + assert(!m_atomic_context_stack.empty()); + assert(m_atomic_context_stack.back() == context); // end atomic with all open fixtures for (auto &[uuid, fixture] : m_fixtures) { if (fixture->getAccessType() == AccessType::READ_WRITE) { - fixture->endAtomic(); + fixture->endAtomic(context); } } - m_atomic_context_ptr = nullptr; + m_atomic_context_stack.pop_back(); } - void Workspace::cancelAtomic() + void Workspace::cancelAtomic(AtomicContext *context) { - assert(m_atomic_context_ptr); - // end atomic with all open fixtures + assert(!m_atomic_context_stack.empty()); + assert(m_atomic_context_stack.back() == context); + // cancel atomic with all open read/write fixtures for (auto &[uuid, fixture] : m_fixtures) { - fixture->cancelAtomic(); + if (fixture->getAccessType() == AccessType::READ_WRITE) { + fixture->cancelAtomic(context); + } } - m_atomic_context_ptr = nullptr; + m_atomic_context_stack.pop_back(); } void Workspace::setAutocommitInterval(std::uint64_t interval_ms) { diff --git a/src/dbzero/workspace/Workspace.hpp b/src/dbzero/workspace/Workspace.hpp index f2e1a9a6..2fd16b5d 100644 --- a/src/dbzero/workspace/Workspace.hpp +++ b/src/dbzero/workspace/Workspace.hpp @@ -273,10 +273,11 @@ namespace db0 // prepare for an atomic operation - e.g. by flushing internal update buffers void preAtomic(); void beginAtomic(AtomicContext *context); + AtomicContext *currentAtomicContext() const; void detach(); - void endAtomic(); + void endAtomic(AtomicContext *context); - void cancelAtomic(); + void cancelAtomic(AtomicContext *context); void setCacheSize(std::size_t cache_size); @@ -323,8 +324,10 @@ namespace db0 std::vector m_current_prefix_history; // shared object list is for maintainig v_object cache evition policy at a process level mutable FixedObjectList m_shared_object_list; - // flag indicating atomic operation in progress - AtomicContext *m_atomic_context_ptr = nullptr; + // active atomic operation stack + // Non-owning active atomic stack. AtomicContext instances are owned by the + // Python wrapper; the workspace only observes the current lexical/LIFO context. + std::vector m_atomic_context_stack; mutable std::shared_ptr m_lang_cache; std::unique_ptr m_workspace_threads; // associated workspace views (some of which may already be deleted) diff --git a/tests/unit_tests/PrefixImplTest.cpp b/tests/unit_tests/PrefixImplTest.cpp index e68efaad..edd68352 100644 --- a/tests/unit_tests/PrefixImplTest.cpp +++ b/tests/unit_tests/PrefixImplTest.cpp @@ -459,6 +459,115 @@ namespace tests cut.close(); } + TEST_F( PrefixImplTest , testNestedAtomicDPLockCommitMergesIntoParent ) + { + BDevStorage::create(file_name); + PrefixImpl cut(file_name, m_dirty_meter, &m_cache_recycler, std::make_shared(file_name)); + + { + auto w1 = cut.mapRange(16, 8, { AccessOptions::write }); + memcpy(w1.modify(), "base0000", 8); + } + + cut.beginAtomic(); + { + auto w1 = cut.mapRange(16, 8, { AccessOptions::read, AccessOptions::write }); + memcpy(w1.modify(), "outer000", 8); + } + + cut.beginAtomic(); + { + auto w1 = cut.mapRange(16, 8, { AccessOptions::read, AccessOptions::write }); + memcpy((char*)w1.modify() + 2, "INNER", 5); + } + cut.endAtomic(); + cut.endAtomic(); + + { + auto lock = cut.mapRange(16, 8, { AccessOptions::read }); + auto str_value = std::string((char *)lock.m_buffer, 8); + ASSERT_EQ(str_value, "ouINNER0"); + } + + cut.close(); + } + + TEST_F( PrefixImplTest , testNestedAtomicBoundaryLockRollbackKeepsParentUpdate ) + { + BDevStorage::create(file_name); + PrefixImpl cut(file_name, m_dirty_meter, &m_cache_recycler, std::make_shared(file_name)); + auto page_size = cut.getPageSize(); + + { + auto w1 = cut.mapRange(page_size - 4, 8, { AccessOptions::write }); + memcpy(w1.modify(), "base0000", 8); + } + + cut.beginAtomic(); + { + auto w1 = cut.mapRange(page_size - 4, 8, { AccessOptions::read, AccessOptions::write }); + memcpy(w1.modify(), "outer000", 8); + } + + cut.beginAtomic(); + { + auto w1 = cut.mapRange(page_size - 4, 8, { AccessOptions::read, AccessOptions::write }); + memcpy((char*)w1.modify() + 2, "ROLL", 4); + } + cut.cancelAtomic(); + cut.endAtomic(); + + { + auto lock = cut.mapRange(page_size - 4, 8, { AccessOptions::read }); + auto str_value = std::string((char *)lock.m_buffer, 8); + ASSERT_EQ(str_value, "outer000"); + + auto lhs = cut.mapRange(page_size - 4, 4, { AccessOptions::read }); + str_value = std::string((char *)lhs.m_buffer, 4); + ASSERT_EQ(str_value, "oute"); + + auto rhs = cut.mapRange(page_size, 4, { AccessOptions::read }); + str_value = std::string((char *)rhs.m_buffer, 4); + ASSERT_EQ(str_value, "r000"); + } + + cut.close(); + } + + TEST_F( PrefixImplTest , testNestedAtomicWideLockCommitMergesIntoParent ) + { + BDevStorage::create(file_name); + PrefixImpl cut(file_name, m_dirty_meter, &m_cache_recycler, std::make_shared(file_name)); + auto page_size = cut.getPageSize(); + + { + auto w1 = cut.mapRange(page_size, page_size + 8, { AccessOptions::write }); + memcpy((char*)w1.modify() + page_size, "base0000", 8); + } + + cut.beginAtomic(); + { + auto w1 = cut.mapRange(page_size, page_size + 8, { AccessOptions::read, AccessOptions::write }); + memcpy((char*)w1.modify() + page_size, "outer000", 8); + } + + cut.beginAtomic(); + { + auto w1 = cut.mapRange(page_size, page_size + 8, { AccessOptions::read, AccessOptions::write }); + memcpy((char*)w1.modify() + page_size + 2, "INNER", 5); + } + cut.endAtomic(); + cut.endAtomic(); + + { + auto lock = cut.mapRange(page_size, page_size + 8, { AccessOptions::read }); + auto str_value = std::string((char *)lock.m_buffer + page_size, 8); + ASSERT_EQ(str_value, "ouINNER0"); + } + + cut.close(); + } + TEST_F( PrefixImplTest , testMergingAtomicAndNonAtomicUpdates ) { BDevStorage::create(file_name); From 1e24536513afeaeb5828a633c9097499ae0b1d1a Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sun, 17 May 2026 13:26:31 +0200 Subject: [PATCH 2/4] post-review cleanups --- python_tests/test_atomic.py | 26 ++++++ src/dbzero/core/memory/PrefixCache.cpp | 14 +++- src/dbzero/core/memory/PrefixCache.hpp | 18 ++++- src/dbzero/core/memory/PrefixImpl.cpp | 2 +- src/dbzero/core/memory/SlabManager.cpp | 108 +++++++++++++++++-------- src/dbzero/core/memory/SlabManager.hpp | 7 ++ tests/unit_tests/PrefixCacheTest.cpp | 24 ++++++ 7 files changed, 158 insertions(+), 41 deletions(-) diff --git a/python_tests/test_atomic.py b/python_tests/test_atomic.py index 470c0e52..59e1715c 100644 --- a/python_tests/test_atomic.py +++ b/python_tests/test_atomic.py @@ -688,6 +688,32 @@ def test_nested_atomic_cancel_reverts_index_add_without_corrupting_index(db0_fix gc.collect() +def test_nested_atomic_rollback_preserves_parent_list_slab_metadata(db0_fixture): + items = db0.list() + + with db0.atomic(): + for i in range(20): + items.append(f"parent-before-{i}") + + try: + with db0.atomic(): + for i in range(20): + items.append(f"child-rollback-{i}") + raise RuntimeError("rollback child list writes") + except RuntimeError: + pass + + for i in range(20): + items.append(f"parent-after-{i}") + + assert list(items) == [ + *(f"parent-before-{i}" for i in range(20)), + *(f"parent-after-{i}" for i in range(20)), + ] + items = None + gc.collect() + + def test_deep_nested_atomic_mixed_commit_and_rollback(db0_fixture): items = db0.list() index = db0.index() diff --git a/src/dbzero/core/memory/PrefixCache.cpp b/src/dbzero/core/memory/PrefixCache.cpp index dc864708..f6e6d99d 100644 --- a/src/dbzero/core/memory/PrefixCache.cpp +++ b/src/dbzero/core/memory/PrefixCache.cpp @@ -481,9 +481,13 @@ namespace db0 }); } - void PrefixCache::beginAtomic() + void PrefixCache::beginAtomic(StateNumType state_num) { - m_volatile_lock_stack.emplace_back(); + // Atomic state numbers grow as nested blocks are entered. Recording the state + // on the frame makes rollback/merge validate the same LIFO order that owns + // the volatile locks. + assert(m_volatile_lock_stack.empty() || m_volatile_lock_stack.back().m_state_num < state_num); + m_volatile_lock_stack.emplace_back(state_num); } void PrefixCache::commit(ProcessTimer *parent_timer) @@ -532,6 +536,9 @@ namespace db0 void PrefixCache::rollback(StateNumType state_num) { assert(!m_volatile_lock_stack.empty()); + // Only the youngest atomic frame may be rolled back. If this assertion trips, + // the caller is trying to unwind a parent while child volatile locks still exist. + assert(m_volatile_lock_stack.back().m_state_num == state_num); auto volatileLocks = std::move(m_volatile_lock_stack.back()); m_volatile_lock_stack.pop_back(); // remove all volatile locks @@ -627,6 +634,9 @@ namespace db0 std::vector > &reused_locks) { assert(!m_volatile_lock_stack.empty()); + // Merge is the commit path for an atomic frame, so it must consume the stack + // top associated with the temporary state being merged. + assert(m_volatile_lock_stack.back().m_state_num == from_state_num); auto volatileLocks = std::move(m_volatile_lock_stack.back()); m_volatile_lock_stack.pop_back(); diff --git a/src/dbzero/core/memory/PrefixCache.hpp b/src/dbzero/core/memory/PrefixCache.hpp index 16ddad56..ac52c544 100644 --- a/src/dbzero/core/memory/PrefixCache.hpp +++ b/src/dbzero/core/memory/PrefixCache.hpp @@ -123,7 +123,7 @@ namespace db0 // Flush managed boundary locks only void flushBoundary(); - void beginAtomic(); + void beginAtomic(StateNumType state_num); /** * Relase / rollback all locks stored by the cache @@ -186,11 +186,21 @@ namespace db0 // marker lock (to mark missing ranges) const std::shared_ptr m_missing_dp_lock_ptr; const std::shared_ptr m_missing_wide_lock_ptr; - // One volatile-lock frame per nested atomic block. Boundary locks are tracked by frame - // because their parent DP states can change during merge/rebase, so state-derived - // ownership is not reliable enough for scoped rollback. + // One volatile-lock frame per nested atomic block. Each frame records the temporary + // state number assigned to its atomic block, which lets rollback/merge assert that + // nested atomic contexts are unwound strictly from the stack top. + // + // Boundary locks are tracked by frame because their parent DP states can change + // during merge/rebase, so state-derived ownership is not reliable enough for + // scoped rollback. struct VolatileLocks { + explicit VolatileLocks(StateNumType state_num) + : m_state_num(state_num) + { + } + + StateNumType m_state_num; std::vector > m_dp_locks; std::vector > m_wide_locks; std::vector > m_boundary_locks; diff --git a/src/dbzero/core/memory/PrefixImpl.cpp b/src/dbzero/core/memory/PrefixImpl.cpp index bbe988aa..41c81b5e 100644 --- a/src/dbzero/core/memory/PrefixImpl.cpp +++ b/src/dbzero/core/memory/PrefixImpl.cpp @@ -391,8 +391,8 @@ namespace db0 // this is because the atomic operation needs to start over a DP-consistent state // Due to the same reason, also flush the residual parts of wide locks m_cache.flushBoundary(); - m_cache.beginAtomic(); // increment state number to allow isolation + m_cache.beginAtomic(m_head_state_num + 1); ++m_head_state_num; ++m_atomic_depth; } diff --git a/src/dbzero/core/memory/SlabManager.cpp b/src/dbzero/core/memory/SlabManager.cpp index 74b80f01..df2b71d8 100644 --- a/src/dbzero/core/memory/SlabManager.cpp +++ b/src/dbzero/core/memory/SlabManager.cpp @@ -2,6 +2,7 @@ // Copyright (c) 2025 DBZero Software sp. z o.o. #include "SlabManager.hpp" +#include namespace db0 @@ -66,6 +67,16 @@ namespace db0 assert(locality < m_active_slab.size()); m_active_slab[locality] = {}; } + + void SlabManager::resetActiveSlab(std::shared_ptr slab) + { + if (m_active_slab[0] == slab) { + m_active_slab[0] = {}; + } + if (m_active_slab[1] == slab) { + m_active_slab[1] = {}; + } + } std::shared_ptr SlabManager::findFirst(std::size_t size, unsigned char locality) { @@ -228,6 +239,37 @@ namespace db0 } return slab; } + + void SlabManager::markDirty(std::shared_ptr slab) + { + if (!m_atomic_stack.empty()) { + m_atomic_stack.back().m_dirty_slabs.insert(m_slab_address_func(slab->m_cap_item.m_slab_id)); + } + if (!slab->m_is_dirty) { + slab->m_is_dirty = true; + m_dirty_slabs.push_back(slab); + } + } + + void SlabManager::invalidateCachedSlab(std::uint64_t address) + { + auto it = m_slabs.find(address); + if (it == m_slabs.end()) { + return; + } + + auto slab_item = it->second.lock(); + if (slab_item) { + slab_item->m_is_dirty = false; + resetActiveSlab(slab_item); + if (m_recycler_ptr) { + m_recycler_ptr->closeOne([&slab_item](const SlabItem &item) { + return slab_item.get() == &item; + }); + } + } + m_slabs.erase(it); + } void SlabManager::erase(std::shared_ptr slab) { erase(slab, true); @@ -342,7 +384,11 @@ namespace db0 } void SlabManager::beginAtomic() - { + { + // Parent-frame slab capacity metadata lives in SlabItem until saved. + // Persist dirty metadata before opening a child frame so child rollback can + // evict stale child-mutated SlabItem instances without losing parent updates. + saveDirtySlabs(); m_atomic_stack.emplace_back(); } @@ -370,6 +416,11 @@ namespace db0 parentVolatileSlabs.insert(parentVolatileSlabs.end(), atomicFrame.m_volatile_slabs.begin(), atomicFrame.m_volatile_slabs.end()); } + + if (!atomicFrame.m_dirty_slabs.empty() && !m_atomic_stack.empty()) { + auto &parentDirtySlabs = m_atomic_stack.back().m_dirty_slabs; + parentDirtySlabs.insert(atomicFrame.m_dirty_slabs.begin(), atomicFrame.m_dirty_slabs.end()); + } } void SlabManager::cancelAtomic() @@ -378,32 +429,26 @@ namespace db0 auto atomicFrame = std::move(m_atomic_stack.back()); m_atomic_stack.pop_back(); - // Rollback restores the prefix state, so cached slab objects from the - // canceled frame may carry stale capacity metadata. Drop all slab cache - // state and reopen slabs lazily from the restored prefix. - for (auto slab_addr : atomicFrame.m_volatile_slabs) { - auto it = m_slabs.find(slab_addr); - if (it != m_slabs.end()) { - auto slab_item = it->second.lock(); - if (slab_item) { - slab_item->m_is_dirty = false; - } - m_slabs.erase(it); - } + // Rollback restores prefix data for the canceled frame. Only slab + // allocators mutated by that frame can now carry stale in-memory state, + // so evict just those slabs and let later reads reopen them lazily. + for (auto slab_addr : atomicFrame.m_dirty_slabs) { + invalidateCachedSlab(slab_addr); } - for (auto &slab_item : m_dirty_slabs) { - slab_item->m_is_dirty = false; + for (auto slab_addr : atomicFrame.m_volatile_slabs) { + invalidateCachedSlab(slab_addr); } - m_dirty_slabs.clear(); - if (m_recycler_ptr) { - m_recycler_ptr->close([this](const SlabItem &slab) { - return &slab->getPrefix() == m_prefix.get(); - }); + + m_dirty_slabs.erase( + std::remove_if(m_dirty_slabs.begin(), m_dirty_slabs.end(), + [](const std::shared_ptr &slab_item) { + return !slab_item || !slab_item->m_is_dirty; + }), + m_dirty_slabs.end()); + + if (!atomicFrame.m_dirty_slabs.empty() || !atomicFrame.m_volatile_slabs.empty()) { + m_next_slab_id = {}; } - m_slabs.clear(); - m_reserved_slabs.clear(); - m_active_slab = {}; - m_next_slab_id = {}; } void SlabManager::saveItem(SlabItem &item) const @@ -576,11 +621,9 @@ namespace db0 } if (!unique || ((*slab)->tryMakeAddressUnique(*addr, instance_id))) { - // modified, add to dirty slabs - if (!slab->m_is_dirty) { - slab->m_is_dirty = true; - m_dirty_slabs.push_back(slab); - } + // Modified slabs are tracked per atomic frame so rollback can evict + // only stale allocator instances from that frame. + markDirty(slab); return addr; } @@ -643,15 +686,12 @@ namespace db0 auto slab = find(slab_id); assert(slab); (*slab)->free(address); + markDirty(slab); if ((*slab)->empty()) { // erase or mark as erased erase(slab); } else { - // modified, add to dirty slabs - if (!slab->m_is_dirty) { - slab->m_is_dirty = true; - m_dirty_slabs.push_back(slab); - } + // Slab metadata will be saved lazily by saveDirtySlabs(). } } diff --git a/src/dbzero/core/memory/SlabManager.hpp b/src/dbzero/core/memory/SlabManager.hpp index f09acde1..dfd6e5ac 100644 --- a/src/dbzero/core/memory/SlabManager.hpp +++ b/src/dbzero/core/memory/SlabManager.hpp @@ -15,6 +15,7 @@ #include #include #include +#include namespace db0 @@ -106,6 +107,7 @@ namespace db0 */ std::shared_ptr tryGetActiveSlab(unsigned char locality); void resetActiveSlab(unsigned char locality); + void resetActiveSlab(std::shared_ptr); /** * Retrieve the 1st slab to allocate a block of at least min_capacity @@ -128,6 +130,8 @@ namespace db0 // Find existing slab by ID std::shared_ptr tryFind(std::uint32_t slab_id) const; std::shared_ptr find(std::uint32_t slab_id) const; + void markDirty(std::shared_ptr); + void invalidateCachedSlab(std::uint64_t); /** * Erase if 'slab' is the last slab @@ -160,6 +164,9 @@ namespace db0 { // Slabs created in this atomic block; removed from cache if the block is rolled back. std::vector m_volatile_slabs; + // Existing slabs whose in-memory allocator state was changed in this block. + // Rollback evicts only these slabs so they are reopened lazily from the restored prefix. + std::unordered_set m_dirty_slabs; // Frees requested in this atomic block; promoted on commit, discarded on rollback. std::vector
m_deferred_free_ops; }; diff --git a/tests/unit_tests/PrefixCacheTest.cpp b/tests/unit_tests/PrefixCacheTest.cpp index 7bd89900..fdccf72d 100644 --- a/tests/unit_tests/PrefixCacheTest.cpp +++ b/tests/unit_tests/PrefixCacheTest.cpp @@ -93,5 +93,29 @@ namespace tests ASSERT_FALSE(lock_1); cut.release(); } + + TEST_F( PrefixCacheTest , testNestedAtomicRollbackConsumesTopVolatileFrame ) + { + db0::Storage0 dev_null; + PrefixCache cut(dev_null, nullptr, 0); + + cut.beginAtomic(2); + auto parent_lock = cut.createPage(0, 0, 2, { AccessOptions::write, AccessOptions::no_flush }); + + cut.beginAtomic(3); + auto child_lock = cut.createPage(1, 0, 3, { AccessOptions::write, AccessOptions::no_flush }); + + StateNumType read_state_num; + ASSERT_EQ(cut.findPage(0, 2, {}, read_state_num), parent_lock); + ASSERT_EQ(cut.findPage(1, 3, {}, read_state_num), child_lock); + + cut.rollback(3); + ASSERT_EQ(cut.findPage(1, 3, {}, read_state_num), nullptr); + ASSERT_EQ(cut.findPage(0, 2, {}, read_state_num), parent_lock); + + cut.rollback(2); + ASSERT_EQ(cut.findPage(0, 2, {}, read_state_num), nullptr); + cut.release(); + } } From 4e65579cf6c94b57dafc36227be6420f268a8cf9 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sun, 17 May 2026 13:32:57 +0200 Subject: [PATCH 3/4] post-review improvements --- src/dbzero/core/memory/SlabManager.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/dbzero/core/memory/SlabManager.cpp b/src/dbzero/core/memory/SlabManager.cpp index df2b71d8..689ef307 100644 --- a/src/dbzero/core/memory/SlabManager.cpp +++ b/src/dbzero/core/memory/SlabManager.cpp @@ -242,13 +242,16 @@ namespace db0 void SlabManager::markDirty(std::shared_ptr slab) { + if (slab->m_is_dirty) { + return; + } if (!m_atomic_stack.empty()) { + // The first dirty transition registers this slab with the active frame. + // Subsequent mutations are covered by the dirty flag and dirty-list entry. m_atomic_stack.back().m_dirty_slabs.insert(m_slab_address_func(slab->m_cap_item.m_slab_id)); } - if (!slab->m_is_dirty) { - slab->m_is_dirty = true; - m_dirty_slabs.push_back(slab); - } + slab->m_is_dirty = true; + m_dirty_slabs.push_back(slab); } void SlabManager::invalidateCachedSlab(std::uint64_t address) From f20dc19e8e666da3bf46cc09894e25788182fa8f Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sun, 17 May 2026 13:36:17 +0200 Subject: [PATCH 4/4] dirty pre-check --- src/dbzero/core/memory/SlabManager.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/dbzero/core/memory/SlabManager.cpp b/src/dbzero/core/memory/SlabManager.cpp index 689ef307..b4dc5778 100644 --- a/src/dbzero/core/memory/SlabManager.cpp +++ b/src/dbzero/core/memory/SlabManager.cpp @@ -626,7 +626,9 @@ namespace db0 if (!unique || ((*slab)->tryMakeAddressUnique(*addr, instance_id))) { // Modified slabs are tracked per atomic frame so rollback can evict // only stale allocator instances from that frame. - markDirty(slab); + if (!slab->m_is_dirty) { + markDirty(slab); + } return addr; } @@ -689,7 +691,9 @@ namespace db0 auto slab = find(slab_id); assert(slab); (*slab)->free(address); - markDirty(slab); + if (!slab->m_is_dirty) { + markDirty(slab); + } if ((*slab)->empty()) { // erase or mark as erased erase(slab);