From 5cdd616aae821366b85ae3a4eea326feebcca648 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Wed, 19 Nov 2025 12:50:30 +0100 Subject: [PATCH 1/2] test code --- python_tests/test_list.py | 39 ++++++++++++++++++- src/dbzero/bindings/python/Memo.cpp | 36 +++++------------ .../python/collections/PyByteArray.cpp | 6 +-- .../SGB_Tree/SGB_CompressedLookupTree.hpp | 30 ++++++++++++++ .../collections/SGB_Tree/SGB_LookupTree.hpp | 2 +- src/dbzero/core/memory/PrefixImpl.cpp | 2 - src/dbzero/core/metaprog/misc_utils.hpp | 14 +++++++ src/dbzero/core/storage/BDevStorage.cpp | 17 ++++++-- src/dbzero/core/storage/BDevStorage.hpp | 2 + src/dbzero/core/storage/SparseIndexBase.hpp | 39 ++++++++++++++++--- .../object_model/object/ObjectImplBase.cpp | 2 +- src/dbzero/object_model/value/Member.cpp | 17 ++++---- 12 files changed, 156 insertions(+), 50 deletions(-) diff --git a/python_tests/test_list.py b/python_tests/test_list.py index 68f116ab..cc46252b 100644 --- a/python_tests/test_list.py +++ b/python_tests/test_list.py @@ -4,6 +4,7 @@ from .memo_test_types import MemoTestClass, MemoTestSingleton from .conftest import DB0_DIR from .memo_test_types import MemoTestClass +import random def make_python_list(): @@ -465,12 +466,14 @@ def test_list_extend_with_none(db0_fixture): for i in range(1024): assert cut[i] is None + def test_db0_list_str_same_as_python_list(db0_fixture): db0_list = db0.list([1, "two", 3.0, None]) py_list = [1, "two", 3.0, None] assert str(db0_list) == str(py_list) assert repr(db0_list) == repr(py_list) + def test_db0_list_str_with_nested_objects(db0_fixture): inner_list = db0.list([1, 2, 3]) db0_list = db0.list([inner_list, "test", None]) @@ -479,6 +482,7 @@ def test_db0_list_str_with_nested_objects(db0_fixture): assert str(db0_list) == str(py_list) assert repr(db0_list) == repr(py_list) + def test_db0_list_str_with_nested_memo_objects(db0_fixture): inner_memo = MemoTestClass("inner") db0_list = db0.list([inner_memo, "test", None]) @@ -487,15 +491,48 @@ def test_db0_list_str_with_nested_memo_objects(db0_fixture): assert str(db0_list) == str(py_list) assert repr(db0_list) == repr(py_list) + def test_db0_list_islice_iteration(db0_fixture): db0_list = db0.list(range(30)) expected_values = [10, 12, 14, 16, 18] for index, value in enumerate(itertools.islice(db0_list, 10, 20, 2)): assert value == expected_values[index] + def test_db0_list_compare_with_other_typse(db0_fixture): db0_list = db0.list([1, 2, 3]) python_tuple = (1, 2, 3) python_set = {1, 2, 3} assert db0_list != python_tuple - assert db0_list != python_set \ No newline at end of file + assert db0_list != python_set + + +@pytest.mark.stress_test +@pytest.mark.parametrize("db0_autocommit_fixture", [500], indirect=True) +def test_append_to_random_lists(db0_autocommit_fixture): + print("Creating multiple lists") + db0.set_cache_size(8 << 30) + lists = db0.dict() + for k in range(100000): + lists[k] = db0.list() + + RANDOM_BYTES = b'DB0'*22000 + count = 0 + db0.commit() + print(f"Appending objects to random {len(lists)} lists") + for _ in range(200000): + item = lists[random.randint(0, len(lists) - 1)] + if random.randint(0, 100) < 10: + # 20% chance to create a large object + data_size = random.randint(8000, 56000) + else: + # mostly create small objects + data_size = random.randint(1, 1500) + + item.append(MemoTestClass(value = RANDOM_BYTES[0:data_size])) + count += 1 + if count % 10000 == 0: + print(f"Appended {count} objects") + print(f"Prefix size = {db0.get_storage_stats()['prefix_size']} bytes") + + db0.commit() diff --git a/src/dbzero/bindings/python/Memo.cpp b/src/dbzero/bindings/python/Memo.cpp index 0b339cf8..305b4e54 100644 --- a/src/dbzero/bindings/python/Memo.cpp +++ b/src/dbzero/bindings/python/Memo.cpp @@ -198,7 +198,7 @@ namespace db0::python } template - void MemoObject_del(MemoImplT *memo_obj) + void PyAPI_MemoObject_del(MemoImplT *memo_obj) { PY_API_FUNC // destroy associated db0 Object instance @@ -340,21 +340,7 @@ namespace db0::python if (member.get()) { return member.steal(); } - - /* FIXME: log - // Use type's tp_getattro to avoid instance dict access issues in Python 3.10 - // Since we disable Py_TPFLAGS_MANAGED_DICT, PyObject_GenericGetAttr can crash - // when it tries to access the instance dictionary. Instead, we use the base type's - // getattro or fall back to PyType_Type's implementation. - PyTypeObject *type = Py_TYPE(memo_obj); - PyTypeObject *base = type->tp_base; - - // Use base class tp_getattro if it's not the memo wrapper itself - if (base && base->tp_getattro && base->tp_getattro != (getattrofunc)PyAPI_MemoObject_getattro) { - return base->tp_getattro(reinterpret_cast(memo_obj), attr); - } - */ - + // Fallback to type-level attribute lookup only (no instance dict) return PyObject_GenericGetAttr(reinterpret_cast(memo_obj), attr); } @@ -507,7 +493,7 @@ namespace db0::python // Regular memo slots static PyType_Slot MemoObject_common_slots[] = { {Py_tp_new, (void *)PyAPI_MemoObject_new}, - {Py_tp_dealloc, (void *)(MemoObject_del)}, + {Py_tp_dealloc, (void *)(PyAPI_MemoObject_del)}, {Py_tp_init, (void *)PyAPI_MemoObject_init}, {Py_tp_getattro, (void *)PyAPI_MemoObject_getattro}, {Py_tp_setattro, (void *)PyAPI_MemoObject_setattro}, @@ -521,7 +507,7 @@ namespace db0::python // Immutable memo slots static PyType_Slot MemoImmutableObject_common_slots[] = { {Py_tp_new, (void *)PyAPI_MemoObject_new}, - {Py_tp_dealloc, (void *)(MemoObject_del)}, + {Py_tp_dealloc, (void *)(PyAPI_MemoObject_del)}, {Py_tp_init, (void *)PyAPI_MemoObject_init}, {Py_tp_getattro, (void *)PyAPI_MemoObject_getattro}, // set available only on pre-initialized objects @@ -884,8 +870,6 @@ namespace db0::python return member.steal(); } - // FIXME: log - // return _PyObject_GenericGetAttrWithDict(reinterpret_cast(memo_obj), attr, NULL, 0); return PyObject_GenericGetAttr(reinterpret_cast(memo_obj), attr); } @@ -1000,7 +984,7 @@ namespace db0::python } Py_RETURN_FALSE; } - + PyObject *tryGetSchema(PyTypeObject *py_type) { using SchemaTypeId = db0::object_model::SchemaTypeId; @@ -1053,8 +1037,8 @@ namespace db0::python bool PyAnyMemoType_Check(PyTypeObject *type) { assert(type); - return type->tp_dealloc == reinterpret_cast((void(*)(MemoObject*))MemoObject_del) || - type->tp_dealloc == reinterpret_cast((void(*)(MemoImmutableObject*))MemoObject_del); + return type->tp_dealloc == reinterpret_cast((void(*)(MemoObject*))PyAPI_MemoObject_del) || + type->tp_dealloc == reinterpret_cast((void(*)(MemoImmutableObject*))PyAPI_MemoObject_del); } template @@ -1063,7 +1047,7 @@ namespace db0::python assert(obj); // needs to stay as 2 lines to proper compile on window auto expected = reinterpret_cast( - static_cast(&MemoObject_del) + static_cast(&PyAPI_MemoObject_del) ); return obj->ob_type->tp_dealloc == expected; } @@ -1072,9 +1056,9 @@ namespace db0::python bool PyMemoType_Check(PyTypeObject *type) { assert(type); - // needs to stay as 2 lines to proper compile on window + // needs to stay as 2 lines to proper compile on windows auto expected = reinterpret_cast( - static_cast(&MemoObject_del) + static_cast(&PyAPI_MemoObject_del) ); return type->tp_dealloc == expected; } diff --git a/src/dbzero/bindings/python/collections/PyByteArray.cpp b/src/dbzero/bindings/python/collections/PyByteArray.cpp index 6b2a5f6a..bd90927d 100644 --- a/src/dbzero/bindings/python/collections/PyByteArray.cpp +++ b/src/dbzero/bindings/python/collections/PyByteArray.cpp @@ -295,11 +295,11 @@ namespace db0::python lock->getLangCache().add(bytearray_object.get()->ext().getAddress(), bytearray_object.get()); return bytearray_object.steal(); } - - ByteArrayObject *PyAPI_makeByteArray(PyObject *self, PyObject *const *args, Py_ssize_t nargs){ + + ByteArrayObject *PyAPI_makeByteArray(PyObject *self, PyObject *const *args, Py_ssize_t nargs) { PY_API_FUNC return runSafe(tryPyAPI_makeByteArray, self, args, nargs); - } + } bool ByteArrayObject_Check(PyObject *object) { return Py_TYPE(object) == &ByteArrayObjectType; diff --git a/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp b/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp index a5015bd7..4fad1fb3 100644 --- a/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp +++ b/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp @@ -8,6 +8,7 @@ #include "SGB_LookupTree.hpp" #include #include +#include namespace db0 @@ -286,6 +287,13 @@ DB0_PACKED_END if (this->m_access_type == AccessType::READ_WRITE) { this->onNodeLookup(node); } + + // FIXME: log + if (!node->header().canFit(key)) { + std::cout << "Unable to fit key: " << key << std::endl; + std::cout << "The header is: " << node->header().toString() << std::endl; + } + /* FIXME: causing segfault in some cases, need to investigate if (!node->header().canFit(key)) { return { nullptr, sg_tree_const_iterator() }; @@ -309,6 +317,13 @@ DB0_PACKED_END if (this->m_access_type == AccessType::READ_WRITE) { this->onNodeLookup(node); } + + // FIXME: log + if (!node->header().canFit(key)) { + std::cout << "Unable to fit key: " << key << std::endl; + std::cout << "The header is: " << node->header().toString() << std::endl; + } + // within the node look up by compressed key // NOTE: if unable to fit key then the item cannot be present in the node /* FIXME: causing segfault in some cases, need to investigate @@ -343,6 +358,12 @@ DB0_PACKED_END --node; } + // FIXME: log + if (!node->header().canFit(key)) { + std::cout << "Unable to fit key: " << key << std::endl; + std::cout << "The header is: " << node->header().toString() << std::endl; + } + // node will be sorted if needed (only if opened as READ/WRITE) if (this->m_access_type == AccessType::READ_WRITE) { this->onNodeLookup(node); @@ -384,9 +405,18 @@ DB0_PACKED_END if (this->m_access_type == AccessType::READ_WRITE) { this->onNodeLookup(node); } + + // FIXME: log + if (!node->header().canFit(key)) { + std::cout << "Unable to fit key: " << key << std::endl; + std::cout << "The header is: " << node->header().toString() << std::endl; + } + + /* FIXME: log if (!node->header().canFit(key)) { return nullptr; } + */ // within the node look up by compressed key // NOTE: if unable to fit key then the item cannot be present in the node return node->lower_equal_bound(node->header().compress(key), this->m_heap_comp); diff --git a/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp b/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp index 23c263da..567bb9e3 100644 --- a/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp +++ b/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp @@ -488,5 +488,5 @@ DB0_PACKED_END { } }; - + } \ No newline at end of file diff --git a/src/dbzero/core/memory/PrefixImpl.cpp b/src/dbzero/core/memory/PrefixImpl.cpp index 33aa7901..7e45cc6b 100644 --- a/src/dbzero/core/memory/PrefixImpl.cpp +++ b/src/dbzero/core/memory/PrefixImpl.cpp @@ -313,8 +313,6 @@ namespace db0 if (timer_ptr) { timer = std::make_unique("Prefix::close", timer_ptr); } - // FIXME: log - // m_cache.release(); m_storage_ptr->close(); } diff --git a/src/dbzero/core/metaprog/misc_utils.hpp b/src/dbzero/core/metaprog/misc_utils.hpp index db6d3a66..35242960 100644 --- a/src/dbzero/core/metaprog/misc_utils.hpp +++ b/src/dbzero/core/metaprog/misc_utils.hpp @@ -1,6 +1,8 @@ #pragma once #include +#include +#include namespace db0 @@ -26,3 +28,15 @@ namespace db0 {}; } + +namespace std + +{ + + template + ostream& operator<<(ostream& os, const std::pair& p) { + return os << "(" << p.first << ", " << p.second << ")"; + } + +} + diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index 9368e1e6..d58ab5af 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -151,12 +151,14 @@ namespace db0 bool BDevStorage::tryFindMutation(std::uint64_t page_num, StateNumType state_num, StateNumType &mutation_id) const { + std::shared_lock lock(m_mutex); return db0::tryFindMutation(m_sparse_index, m_diff_index, page_num, state_num, mutation_id); } StateNumType BDevStorage::findMutation(std::uint64_t page_num, StateNumType state_num) const { StateNumType result; + std::shared_lock lock(m_mutex); if (!db0::tryFindMutation(m_sparse_index, m_diff_index, page_num, state_num, result)) { assert(false && "BDevStorage::findMutation: page not found"); THROWF(db0::IOException) @@ -168,6 +170,7 @@ namespace db0 void BDevStorage::read(std::uint64_t address, StateNumType state_num, std::size_t size, void *buffer, FlagSet flags) const { + std::shared_lock lock(m_mutex); _read(address, state_num, size, buffer, flags); } @@ -231,11 +234,13 @@ namespace db0 assert(state_num > 0 && "BDevStorage::write: state number must be > 0"); assert((address % m_config.m_page_size == 0) && "BDevStorage::write: address must be page-aligned"); assert((size % m_config.m_page_size == 0) && "BDevStorage::write: size must be page-aligned"); - + auto begin_page = address / m_config.m_page_size; auto end_page = begin_page + size / m_config.m_page_size; std::byte *write_buf = reinterpret_cast(buffer); + + std::unique_lock lock(m_mutex); // write as physical pages and register with the sparse index for (auto page_num = begin_page; page_num != end_page; ++page_num, write_buf += m_config.m_page_size) { // look up if page has already been added in current transaction @@ -265,6 +270,7 @@ namespace db0 auto page_num = address / m_config.m_page_size; + std::unique_lock lock(m_mutex); // Use SparseIndexQuery to determine the current sequence length & check limits SparseIndexQuery query(m_sparse_index, m_diff_index, page_num, state_num); // if a page has already been written as full-DP in the current transaction then @@ -300,6 +306,7 @@ namespace db0 bool BDevStorage::flush(ProcessTimer *parent_timer) { + std::unique_lock lock(m_mutex); std::unique_ptr timer; if (parent_timer) { timer = std::make_unique("BDevStorage::flush", parent_timer); @@ -522,6 +529,7 @@ namespace db0 void BDevStorage::getStats(std::function callback) const { + std::unique_lock lock(m_mutex); callback("dram_io_rand_ops", m_dram_io.getRandOpsCount()); callback("dram_prefix_size", m_dram_io.getDRAMPrefix().size()); auto file_rand_ops = m_file.getRandOps(); @@ -541,7 +549,9 @@ namespace db0 #endif } - std::pair BDevStorage::getDiff_IOStats() const { + std::pair BDevStorage::getDiff_IOStats() const + { + std::unique_lock lock(m_mutex); return m_page_io.getStats(); } @@ -572,6 +582,7 @@ namespace db0 void BDevStorage::fetchChangeLogs(StateNumType begin_state, std::optional end_state, std::function f) const { + std::unique_lock lock(m_mutex); if (m_dp_changelog_io.modified()) { THROWF(db0::IOException) << "BDevStorage::fetchChangeLogs: dp-changelog is modified and needs to be flushed first"; } @@ -624,7 +635,7 @@ namespace db0 #endif } - void BDevStorage::endCommit() + void BDevStorage::endCommit() { #ifndef NDEBUG m_commit_pending = false; diff --git a/src/dbzero/core/storage/BDevStorage.hpp b/src/dbzero/core/storage/BDevStorage.hpp index 30d0b1f7..77ddb77e 100644 --- a/src/dbzero/core/storage/BDevStorage.hpp +++ b/src/dbzero/core/storage/BDevStorage.hpp @@ -17,6 +17,7 @@ #include "MetaIOStream.hpp" #include #include +#include namespace db0 @@ -149,6 +150,7 @@ DB0_PACKED_END // the stream for storing & reading full-DPs and diff-encoded DPs Diff_IO m_page_io; bool m_refresh_pending = false; + mutable std::shared_mutex m_mutex; #ifndef NDEBUG // total number of bytes from mutated data pages std::uint64_t m_page_io_raw_bytes = 0; diff --git a/src/dbzero/core/storage/SparseIndexBase.hpp b/src/dbzero/core/storage/SparseIndexBase.hpp index eaaa699c..ee3420ba 100644 --- a/src/dbzero/core/storage/SparseIndexBase.hpp +++ b/src/dbzero/core/storage/SparseIndexBase.hpp @@ -2,6 +2,16 @@ #include #include + +namespace db0 +{ + // Forward declarations for operator<< to be used in SGB_LookupTree.hpp + template class SparseIndexBase; + + template + std::ostream &operator<<(std::ostream &os, const typename db0::SparseIndexBase::BlockHeader &header); +} + #include #include #include @@ -99,9 +109,6 @@ namespace db0 void commit(); - protected: - friend class SparsePair; - struct BlockHeader { // number of the 1st page in a data block / node (high order bits) @@ -122,8 +129,12 @@ namespace db0 bool canFit(const ItemT &) const; std::string toString(const CompressedItemT &) const; + std::string toString() const; }; + protected: + friend class SparsePair; + DB0_PACKED_BEGIN // tree-level header type struct DB0_PACKED_ATTR o_sparse_index_header: o_fixed_versioned @@ -284,13 +295,21 @@ DB0_PACKED_END CompressedItemT SparseIndexBase::BlockHeader::compress(const ItemT &item) const { assert(m_first_page_num == (item.m_page_num >> 24)); + // FIXME: log + if (m_first_page_num != (item.m_page_num >> 24)) { + std::terminate(); + } return CompressedItemT(m_first_page_num, item); } - + template CompressedItemT SparseIndexBase::BlockHeader::compress(std::pair item) const { assert(m_first_page_num == (item.first >> 24)); + // FIXME: log + if (m_first_page_num != (item.first >> 24)) { + std::terminate(); + } return CompressedItemT(m_first_page_num, item.first, item.second); } @@ -365,7 +384,15 @@ DB0_PACKED_END std::string SparseIndexBase::BlockHeader::toString(const CompressedItemT &item) const { return item.toString(); } - + + template + std::string SparseIndexBase::BlockHeader::toString() const + { + std::stringstream _str; + _str << "BlockHeader{ first_page_num: " << m_first_page_num << " }"; + return _str.str(); + } + template std::size_t SparseIndexBase::size() const { return m_index.size(); @@ -414,4 +441,4 @@ DB0_PACKED_END m_index.commit(); } -} +} \ No newline at end of file diff --git a/src/dbzero/object_model/object/ObjectImplBase.cpp b/src/dbzero/object_model/object/ObjectImplBase.cpp index 3b67394d..708026aa 100644 --- a/src/dbzero/object_model/object/ObjectImplBase.cpp +++ b/src/dbzero/object_model/object/ObjectImplBase.cpp @@ -178,7 +178,7 @@ namespace db0::object_model auto type_id = LangToolkit::getTypeManager().getTypeId(lang_value); // NOTE: allow storage as PACK_2 auto pre_storage_class = TypeUtils::m_storage_class_mapper.getPreStorageClass(type_id, true); - if (type_id == TypeId::MEMO_OBJECT) { + if (type_id == TypeId::MEMO_OBJECT || type_id == TypeId::MEMO_IMMUTABLE_OBJECT) { // object reference must be from the same fixture auto &obj = LangToolkit::getTypeManager().extractAnyObject(lang_value); if (fixture.getUUID() != obj.getFixture()->getUUID()) { diff --git a/src/dbzero/object_model/value/Member.cpp b/src/dbzero/object_model/value/Member.cpp index 34d4fbbe..fc8a0e59 100644 --- a/src/dbzero/object_model/value/Member.cpp +++ b/src/dbzero/object_model/value/Member.cpp @@ -56,18 +56,17 @@ namespace db0::object_model return db0::v_object(*fixture, PyUnicode_AsUTF8(obj_ptr), access_mode).getAddress(); } - // OBJECT specialization (mutable) - template <> Value createMember(db0::swine_ptr &fixture, + // OBJECT specialization (mutable or immutable) + template Value createObjectMember(db0::swine_ptr &fixture, PyObjectPtr obj_ptr, StorageClass, AccessFlags) - { - using MemoObject = PyToolkit::TypeManager::MemoObject; - auto &obj = PyToolkit::getTypeManager().extractMutableObject(obj_ptr); + { + auto &obj = PyToolkit::getTypeManager().extractMutableObject(obj_ptr); assert(obj.hasInstance()); assureSameFixture(fixture, obj); obj.modify().incRef(false); return obj.getAddress(); } - + // LIST specialization template <> Value createMember(db0::swine_ptr &fixture, PyObjectPtr obj_ptr, StorageClass, AccessFlags) @@ -332,13 +331,17 @@ namespace db0::object_model template <> void registerCreateMemberFunctions( std::vector &, PyObjectPtr, StorageClass, AccessFlags)> &functions) { + using MemoObject = PyToolkit::TypeManager::MemoObject; + using MemoImmutableObject = PyToolkit::TypeManager::MemoImmutableObject; + functions.resize(static_cast(TypeId::COUNT)); std::fill(functions.begin(), functions.end(), nullptr); functions[static_cast(TypeId::NONE)] = createMember; functions[static_cast(TypeId::INTEGER)] = createMember; functions[static_cast(TypeId::FLOAT)] = createMember; functions[static_cast(TypeId::STRING)] = createMember; - functions[static_cast(TypeId::MEMO_OBJECT)] = createMember; + functions[static_cast(TypeId::MEMO_OBJECT)] = createObjectMember; + functions[static_cast(TypeId::MEMO_IMMUTABLE_OBJECT)] = createObjectMember; functions[static_cast(TypeId::DB0_LIST)] = createMember; functions[static_cast(TypeId::DB0_INDEX)] = createMember; functions[static_cast(TypeId::DB0_SET)] = createMember; From 756b626fb265b4663433ca386cebc91b898caf19 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Wed, 19 Nov 2025 20:55:55 +0100 Subject: [PATCH 2/2] auto-commit / commit conflict fixes --- dbzero/dbzero/dbzero.py | 2 +- python_tests/test_list.py | 8 +- .../SGB_Tree/SGB_CompressedLookupTree.hpp | 28 +----- src/dbzero/core/storage/SparseIndexBase.hpp | 8 -- src/dbzero/workspace/Fixture.cpp | 2 +- src/dbzero/workspace/FixtureThreads.cpp | 87 ++++++++++--------- src/dbzero/workspace/FixtureThreads.hpp | 25 +++--- 7 files changed, 67 insertions(+), 93 deletions(-) diff --git a/dbzero/dbzero/dbzero.py b/dbzero/dbzero/dbzero.py index c9e4f4dc..21899e3d 100644 --- a/dbzero/dbzero/dbzero.py +++ b/dbzero/dbzero/dbzero.py @@ -10,7 +10,7 @@ def load_dynamic(name, path): def __bootstrap__(): global __bootstrap__, __loader__, __file__ - paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"] + paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"] __file__ = None for path in paths: if os.path.isdir(path): diff --git a/python_tests/test_list.py b/python_tests/test_list.py index cc46252b..d33d8e81 100644 --- a/python_tests/test_list.py +++ b/python_tests/test_list.py @@ -508,13 +508,13 @@ def test_db0_list_compare_with_other_typse(db0_fixture): @pytest.mark.stress_test -@pytest.mark.parametrize("db0_autocommit_fixture", [500], indirect=True) +@pytest.mark.parametrize("db0_autocommit_fixture", [50], indirect=True) def test_append_to_random_lists(db0_autocommit_fixture): print("Creating multiple lists") db0.set_cache_size(8 << 30) lists = db0.dict() for k in range(100000): - lists[k] = db0.list() + lists[k] = db0.index() RANDOM_BYTES = b'DB0'*22000 count = 0 @@ -529,8 +529,10 @@ def test_append_to_random_lists(db0_autocommit_fixture): # mostly create small objects data_size = random.randint(1, 1500) - item.append(MemoTestClass(value = RANDOM_BYTES[0:data_size])) + item.add(count, MemoTestClass(value = RANDOM_BYTES[0:data_size])) count += 1 + if count % 5000 == 0: + db0.commit() if count % 10000 == 0: print(f"Appended {count} objects") print(f"Prefix size = {db0.get_storage_stats()['prefix_size']} bytes") diff --git a/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp b/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp index 4fad1fb3..f21b57a2 100644 --- a/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp +++ b/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp @@ -288,12 +288,6 @@ DB0_PACKED_END this->onNodeLookup(node); } - // FIXME: log - if (!node->header().canFit(key)) { - std::cout << "Unable to fit key: " << key << std::endl; - std::cout << "The header is: " << node->header().toString() << std::endl; - } - /* FIXME: causing segfault in some cases, need to investigate if (!node->header().canFit(key)) { return { nullptr, sg_tree_const_iterator() }; @@ -318,12 +312,6 @@ DB0_PACKED_END this->onNodeLookup(node); } - // FIXME: log - if (!node->header().canFit(key)) { - std::cout << "Unable to fit key: " << key << std::endl; - std::cout << "The header is: " << node->header().toString() << std::endl; - } - // within the node look up by compressed key // NOTE: if unable to fit key then the item cannot be present in the node /* FIXME: causing segfault in some cases, need to investigate @@ -335,7 +323,7 @@ DB0_PACKED_END } } */ - + auto item_ptr = node->lower_equal_bound(node->header().compress(key), this->m_heap_comp); if (item_ptr) { // return uncompressed @@ -358,12 +346,6 @@ DB0_PACKED_END --node; } - // FIXME: log - if (!node->header().canFit(key)) { - std::cout << "Unable to fit key: " << key << std::endl; - std::cout << "The header is: " << node->header().toString() << std::endl; - } - // node will be sorted if needed (only if opened as READ/WRITE) if (this->m_access_type == AccessType::READ_WRITE) { this->onNodeLookup(node); @@ -405,12 +387,6 @@ DB0_PACKED_END if (this->m_access_type == AccessType::READ_WRITE) { this->onNodeLookup(node); } - - // FIXME: log - if (!node->header().canFit(key)) { - std::cout << "Unable to fit key: " << key << std::endl; - std::cout << "The header is: " << node->header().toString() << std::endl; - } /* FIXME: log if (!node->header().canFit(key)) { @@ -421,7 +397,7 @@ DB0_PACKED_END // NOTE: if unable to fit key then the item cannot be present in the node return node->lower_equal_bound(node->header().compress(key), this->m_heap_comp); } - + const TreeHeaderT &treeHeader() const { return base_t::getData()->treeHeader(); } diff --git a/src/dbzero/core/storage/SparseIndexBase.hpp b/src/dbzero/core/storage/SparseIndexBase.hpp index ee3420ba..3d0d9994 100644 --- a/src/dbzero/core/storage/SparseIndexBase.hpp +++ b/src/dbzero/core/storage/SparseIndexBase.hpp @@ -295,10 +295,6 @@ DB0_PACKED_END CompressedItemT SparseIndexBase::BlockHeader::compress(const ItemT &item) const { assert(m_first_page_num == (item.m_page_num >> 24)); - // FIXME: log - if (m_first_page_num != (item.m_page_num >> 24)) { - std::terminate(); - } return CompressedItemT(m_first_page_num, item); } @@ -306,10 +302,6 @@ DB0_PACKED_END CompressedItemT SparseIndexBase::BlockHeader::compress(std::pair item) const { assert(m_first_page_num == (item.first >> 24)); - // FIXME: log - if (m_first_page_num != (item.first >> 24)) { - std::terminate(); - } return CompressedItemT(m_first_page_num, item.first, item.second); } diff --git a/src/dbzero/workspace/Fixture.cpp b/src/dbzero/workspace/Fixture.cpp index 80582486..e8e49078 100644 --- a/src/dbzero/workspace/Fixture.cpp +++ b/src/dbzero/workspace/Fixture.cpp @@ -555,7 +555,7 @@ namespace db0 StateReachedCallbackList result_callbacks; auto current_state_num = prefix_ptr->getStateNum(true); auto it = m_state_num_callbacks.begin(); - while(it != m_state_num_callbacks.end() && it->first <= current_state_num) { + while (it != m_state_num_callbacks.end() && it->first <= current_state_num) { StateReachedCallbackList &state_num_callbacks = it->second; std::move(state_num_callbacks.begin(), state_num_callbacks.end(), std::back_inserter(result_callbacks)); it = m_state_num_callbacks.erase(it); diff --git a/src/dbzero/workspace/FixtureThreads.cpp b/src/dbzero/workspace/FixtureThreads.cpp index 21971969..86096c0b 100644 --- a/src/dbzero/workspace/FixtureThreads.cpp +++ b/src/dbzero/workspace/FixtureThreads.cpp @@ -14,16 +14,15 @@ namespace db0 using StateReachedCallbackList = Fixture::StateReachedCallbackList; virtual ~FixtureThreadCallbacksContext() = default; - + virtual void finalize() override { for(auto &callback : m_callbacks) { callback->execute(); } } - - void appendCallbacks(StateReachedCallbackList &&callbacks) - { + + void appendCallbacks(StateReachedCallbackList &&callbacks) { // As of writing this, the purpose of these callbacks is solely to notify observers of prefix state number being reached std::move(callbacks.begin(), callbacks.end(), std::back_inserter(m_callbacks)); } @@ -62,8 +61,8 @@ namespace db0 } void FixtureThread::run() - { - while (true) { + { + while (true) { std::unique_lock lock(m_mutex); m_cv.wait_for(lock, std::chrono::milliseconds(m_interval_ms)); if (m_stopped) { @@ -71,12 +70,12 @@ namespace db0 } // prepare commit context if configured lock.unlock(); - std::shared_ptr context = prepareContext(); + prepareContext(); // collect fixtures first std::vector > fixtures; - fixtures.reserve(m_fixtures.size()); lock.lock(); - for (auto it = m_fixtures.begin(); it != m_fixtures.end(); ) { + fixtures.reserve(m_fixtures.size()); + for (auto it = m_fixtures.begin(); it != m_fixtures.end();) { auto fixture_ptr = it->lock(); if (!fixture_ptr) { it = m_fixtures.erase(it); @@ -88,25 +87,17 @@ namespace db0 // then process as unlocked lock.unlock(); for (auto &fixture_ptr : fixtures) { - onUpdate(*fixture_ptr); - } - - if (context) { - context->finalize(); + onUpdate(*fixture_ptr); } - } + + closeContext(); + } } - + void FixtureThread::onFixtureAdded(Fixture &) { } - std::shared_ptr FixtureThread::prepareContext() { - return nullptr; - } - - - RefreshThread::RefreshThread() : FixtureThread(250) { @@ -119,11 +110,17 @@ namespace db0 m_fixture_status[uuid] = FixtureUpdateStatus{fixture.getPrefix().getLastUpdated(), ClockType::now()}; } - std::shared_ptr RefreshThread::prepareContext() + void RefreshThread::prepareContext() { - auto context = std::make_shared(); - m_tmp_context = context; - return context; + assert(!m_context && "Only one FixtureThreadCallbacksContext should exist at the time!"); + m_context = std::make_shared(); + } + + void RefreshThread::closeContext() + { + assert(m_context && "FixtureThreadCallbacksContext must exist here!"); + m_context->finalize(); + m_context = nullptr; } void RefreshThread::onUpdate(Fixture &fixture) @@ -164,9 +161,8 @@ namespace db0 auto callbacks = fixture.onRefresh(); if (!callbacks.empty()) { - auto context = m_tmp_context.lock(); - assert(context); - context->appendCallbacks(std::move(callbacks)); + assert(m_context && "FixtureThreadCallbacksContext must exist here!"); + m_context->appendCallbacks(std::move(callbacks)); } } @@ -217,32 +213,37 @@ namespace db0 auto lang_lock = LangToolkit::ensureLocked(); #ifndef NDEBUG ThreadTracker::beginUnique(); -#endif +#endif auto callbacks = fixture.onAutoCommit(); - // This should always succeed. We just want the context lifetime to be managed by the FixtureThread. - auto context = m_tmp_context.lock(); - assert(context); - // These callbacks have to be executed when 'everything' is unlocked. Otherwise we are risking a deadlock. - context->appendCallbacks(std::move(callbacks)); + if (!callbacks.empty()) { + assert(m_context && "AutoSaveContext must exist here!"); + // These callbacks have to be executed when 'everything' is unlocked. Otherwise we are risking a deadlock. + m_context->appendCallbacks(std::move(callbacks)); + } #ifndef NDEBUG ThreadTracker::end(); -#endif +#endif } - std::shared_ptr AutoCommitThread::prepareContext() + void AutoCommitThread::prepareContext() { - assert(!m_tmp_context.lock() && "Only one AutoSaveContext should exist at the time!"); + assert(!m_context && "Only one AutoSaveContext should exist at the time!"); auto commit_lock = std::unique_lock(m_commit_mutex); // must acquire unique lock-context's lock auto locked_context_lock = db0::LockedContext::lockUnique(); // and the atomic lock next (order is relevant here !!) auto atomic_lock = db0::AtomicContext::lock(); - auto context = std::make_shared(std::move(commit_lock), - std::move(locked_context_lock), std::move(atomic_lock) - ); // To collect callbacks from fixtures as we proceed with commiting - m_tmp_context = context; - return context; + m_context = std::make_shared(std::move(commit_lock), + std::move(locked_context_lock), std::move(atomic_lock) + ); + } + + void AutoCommitThread::closeContext() + { + assert(m_context && "AutoSaveContext must exist here!"); + m_context->finalize(); + m_context = nullptr; } std::unique_lock AutoCommitThread::preventAutoCommit() { diff --git a/src/dbzero/workspace/FixtureThreads.hpp b/src/dbzero/workspace/FixtureThreads.hpp index cbf83f2c..b4da8600 100644 --- a/src/dbzero/workspace/FixtureThreads.hpp +++ b/src/dbzero/workspace/FixtureThreads.hpp @@ -41,8 +41,6 @@ namespace db0 virtual void onFixtureAdded(Fixture &); - virtual std::shared_ptr prepareContext(); - protected: std::atomic m_interval_ms; std::condition_variable m_cv; @@ -50,6 +48,9 @@ namespace db0 bool m_stopped = false; std::vector> m_fixtures; + + virtual void prepareContext() = 0; + virtual void closeContext() = 0; }; /** @@ -65,9 +66,7 @@ namespace db0 virtual void onUpdate(Fixture &) override; virtual void onFixtureAdded(Fixture &) override; - - virtual std::shared_ptr prepareContext() override; - + private: void tryRefresh(Fixture &fixture); @@ -79,7 +78,10 @@ namespace db0 }; std::unordered_map m_fixture_status; - std::weak_ptr m_tmp_context; + std::shared_ptr m_context; + + void prepareContext() override; + void closeContext() override; }; /** @@ -91,17 +93,18 @@ namespace db0 { public: AutoCommitThread(std::uint64_t commit_interval_ms = 250); - - virtual void onUpdate(Fixture &) override; - - virtual std::shared_ptr prepareContext() override; + virtual void onUpdate(Fixture &) override; + // This lock prevents auto-commit thread collision (even if auto-commit thread is already waiting) static std::unique_lock preventAutoCommit(); private: static std::mutex m_commit_mutex; - std::weak_ptr m_tmp_context; + std::shared_ptr m_context; + + void prepareContext() override; + void closeContext() override; }; }