diff --git a/python_tests/test_refresh.py b/python_tests/test_refresh.py index e8156510..1bc8d0ee 100644 --- a/python_tests/test_refresh.py +++ b/python_tests/test_refresh.py @@ -377,47 +377,6 @@ def rand_string(str_len): import string return ''.join(random.choice(string.ascii_letters) for i in range(str_len)) -def create_process_refresh_query_while_adding(px_name, num_iterations, - num_objects, str_len): - db0.init(DB0_DIR) - db0.open(px_name, "rw") - for _ in range(num_iterations): - for index in range(num_objects): - obj = MemoTestClass(rand_string(str_len)) - db0.tags(obj).add("tag1") - if index % 3 == 0: - db0.tags(obj).add("tag2") - db0.commit() - db0.close() - -@pytest.mark.stress_test -def test_refresh_query_while_adding_new_objects(db0_fixture): - px_name = db0.get_current_prefix().name - - db0.commit() - db0.close() - - num_iterations = 1 - num_objects = 1000 - str_len = 4096 - p = multiprocessing.Process(target=create_process_refresh_query_while_adding, - args = (px_name, num_iterations, num_objects, str_len)) - p.start() - - try: - db0.init(DB0_DIR) - db0.open(px_name, "r") - while True: - db0.refresh() - time.sleep(0.1) - query_len = len(list(db0.find(MemoTestClass, "tag1"))) - print(f"Query length: {query_len}") - if query_len == num_iterations * num_objects: - break - finally: - p.terminate() - p.join() - db0.close() def writer_process(prefix, writer_sem, reader_sem): db0.init(DB0_DIR) @@ -430,6 +389,7 @@ def writer_process(prefix, writer_sem, reader_sem): _obj = MemoTestClass(123) db0.commit() + def test_wait_for_updates(db0_fixture): prefix = db0.get_current_prefix().name db0.commit() @@ -484,17 +444,6 @@ def make_trasaction(n): p.join() -def writer_process(prefix, writer_sem, reader_sem): - db0.init(DB0_DIR) - db0.open(prefix, "rw") - reader_sem.release() - while True: - if not writer_sem.acquire(timeout=10.0): - return # Safeguard - time.sleep(0.1) - _obj = MemoTestClass(123) - db0.commit() - def make_trasaction(writer_sem, n): for _ in range(n): writer_sem.release() @@ -553,147 +502,4 @@ async def test_async_wait_for_updates(db0_fixture): p.terminate() p.join() - - -def append_to_prefix(prefix, obj_count = 50, commit_count = 50, long_run = False): - db0.init(DB0_DIR) - db0.open(prefix, "rw") - # create new or open an existing root object - root = MemoTestSingleton([]) - if (len(root.value) > 0): - print(f"Writer process: opened existing prefix with {len(root.value)} objects") - for i in range(commit_count): - for _ in range(obj_count): - root.value.append(MemoTestClass("b" * 1024)) # 1 KB string - db0.commit() - if long_run: - print(f"Writer process: committed {(i + 1) * obj_count} objects", flush=True) - else: - time.sleep(0.1) - - if long_run: - print(db0.get_storage_stats(), flush=True) - db0.close() - - -@pytest.mark.stress_test -def test_refresh_prefix_continuous_process_with_snapshot(db0_fixture): - px_name = db0.get_current_prefix().name - - def validate_current_prefix(expected_len = None, expected_min_len = None): - snap = db0.snapshot() - root = snap.fetch(MemoTestSingleton) - assert not expected_min_len or len(root.value) >= expected_min_len - assert not expected_len or len(root.value) == expected_len - for item in root.value: - assert item.value == "b" * 1024 - return len(root.value) - - db0.close() - - # in each 'epoch' we modify prefix while making copies - # then drop the original prefix and restore if from the last copy - epoch_count = 2 - total_len = 0 - for epoch in range(epoch_count): - print(f"=== Epoch {epoch} ===") - obj_count = 5000 - commit_count = 100 - # start the writer process for a long run - p = multiprocessing.Process(target=append_to_prefix, args=(px_name, obj_count, commit_count, True)) - p.start() - - db0.init(DB0_DIR) - db0.open(px_name, "r") - last_len = 0 - time.sleep(2.0) - while True: - try: - root = db0.fetch(MemoTestSingleton) - if len(root.value) > 1: - last_len = len(root.value) - break - except Exception: - pass - time.sleep(0.1) - - # validate prefix while writer is actively modifying it - while True: - if not p.is_alive(): - break - print("--- Validate prefix iteration", flush=True) - last_len = validate_current_prefix(expected_min_len = last_len) - print(f"--- Prefix valid with {last_len} objects", flush=True) - if not p.is_alive(): - break - time.sleep(2.5) # wait a bit before next copy - - p.join() - total_len += obj_count * commit_count - - print("Validating final prefix ...", flush=True) - validate_current_prefix(expected_len = total_len) - db0.close() - - -@pytest.mark.stress_test -# @pytest.mark.skip(reason="Test disabled due to issue #605") -# test failing due to issue: https://github.com/dbzero-software/dbzero/issues/605 -def test_refresh_prefix_continuous_process(db0_fixture): - px_name = db0.get_current_prefix().name - - def validate_current_prefix(expected_len = None, expected_min_len = None): - root = db0.fetch(MemoTestSingleton) - assert not expected_min_len or len(root.value) >= expected_min_len - assert not expected_len or len(root.value) == expected_len - for item in root.value: - assert item.value == "b" * 1024 - return len(root.value) - - db0.close() - - # in each 'epoch' we modify prefix while making copies - # then drop the original prefix and restore if from the last copy - epoch_count = 2 - total_len = 0 - for epoch in range(epoch_count): - print(f"=== Epoch {epoch} ===") - obj_count = 5000 - commit_count = 100 - # start the writer process for a long run - p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True)) - p.start() - - db0.init(DB0_DIR) - db0.open(px_name, "r") - last_len = 0 - while True: - try: - if not db0.exists(MemoTestSingleton): - time.sleep(0.1) - continue - root = db0.fetch(MemoTestSingleton) - if len(root.value) > 1: - last_len = len(root.value) - break - except Exception: - pass - time.sleep(0.1) - - # validate prefix while writer is actively modifying it - while True: - if not p.is_alive(): - break - print("--- Validate prefix iteration", flush=True) - last_len = validate_current_prefix(expected_min_len = last_len) - print(f"--- Prefix valid with {last_len} objects", flush=True) - if not p.is_alive(): - break - time.sleep(2.5) # wait a bit before next copy - - p.join() - total_len += obj_count * commit_count - - print("Validating final prefix ...", flush=True) - validate_current_prefix(expected_len = total_len) - db0.close() + \ No newline at end of file diff --git a/python_tests/test_refresh_stress_tests.py b/python_tests/test_refresh_stress_tests.py new file mode 100644 index 00000000..e76a861e --- /dev/null +++ b/python_tests/test_refresh_stress_tests.py @@ -0,0 +1,148 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (c) 2025 DBZero Software sp. z o.o. + +import pytest +import multiprocessing +import time +import dbzero as db0 +import os +from .conftest import DB0_DIR +from .memo_test_types import MemoTestClass, MemoTestSingleton + + +def append_to_prefix(prefix, obj_count = 50, commit_count = 50, long_run = False): + db0.init(DB0_DIR) + db0.open(prefix, "rw") + # create new or open an existing root object + root = MemoTestSingleton([]) + if (len(root.value) > 0): + print(f"Writer process: opened existing prefix with {len(root.value)} objects") + for i in range(commit_count): + for _ in range(obj_count): + root.value.append(MemoTestClass("b" * 1024)) # 1 KB string + db0.commit() + if long_run: + print(f"Writer process: committed {(i + 1) * obj_count} objects", flush=True) + else: + time.sleep(0.1) + + if long_run: + print(db0.get_storage_stats(), flush=True) + db0.close() + + +def validate_current_prefix(expected_len = None, expected_min_len = None): + # refresh to assure we have latest data + db0.refresh() + # NOTE: reader process needs to use snapshots for concurrency safety + with db0.snapshot() as snap: + root = snap.fetch(MemoTestSingleton) + print("--- begin iterate / validation", flush=True) + assert not expected_min_len or len(root.value) >= expected_min_len + assert not expected_len or len(root.value) == expected_len + for item in root.value: + assert item.value == "b" * 1024 + print(f"--- end iterate len = {len(root.value)}", flush=True) + return len(root.value) + + +def rand_string(str_len): + import random + import string + return ''.join(random.choice(string.ascii_letters) for i in range(str_len)) + + +def create_process_refresh_query_while_adding(px_name, num_iterations, + num_objects, str_len): + db0.init(DB0_DIR) + db0.open(px_name, "rw") + for _ in range(num_iterations): + for index in range(num_objects): + obj = MemoTestClass(rand_string(str_len)) + db0.tags(obj).add("tag1") + if index % 3 == 0: + db0.tags(obj).add("tag2") + db0.commit() + db0.close() + + +@pytest.mark.stress_test +def test_refresh_query_while_adding_new_objects(db0_fixture): + px_name = db0.get_current_prefix().name + + db0.commit() + db0.close() + + num_iterations = 1 + num_objects = 1000 + str_len = 4096 + p = multiprocessing.Process(target=create_process_refresh_query_while_adding, + args = (px_name, num_iterations, num_objects, str_len)) + p.start() + + try: + db0.init(DB0_DIR) + db0.open(px_name, "r") + while True: + db0.refresh() + time.sleep(0.1) + query_len = len(list(db0.find(MemoTestClass, "tag1"))) + print(f"Query length: {query_len}") + if query_len == num_iterations * num_objects: + break + finally: + p.terminate() + p.join() + db0.close() + + +@pytest.mark.stress_test +def test_continuous_refresh_process(db0_fixture): + px_name = db0.get_current_prefix().name + db0.close() + + # in each 'epoch' we modify prefix while making copies + # then drop the original prefix and restore if from the last copy + epoch_count = 2 + total_len = 0 + for epoch in range(epoch_count): + print(f"=== Epoch {epoch} ===") + obj_count = 5000 + commit_count = 100 + # start the writer process for a long run + p = multiprocessing.Process(target=append_to_prefix, args=(px_name, obj_count, commit_count, True)) + p.start() + + db0.init(DB0_DIR) + db0.open(px_name, "r") + last_len = 0 + while True: + # NOTE: reader needs to use snapshots for concurrency safety + with db0.snapshot() as snap: + if not snap.exists(MemoTestSingleton): + time.sleep(0.1) + continue + root = snap.fetch(MemoTestSingleton) + if len(root.value) > 1: + last_len = len(root.value) + break + time.sleep(0.1) + + # validate prefix while writer is actively modifying it + while True: + if not p.is_alive(): + break + print("--- Validate prefix iteration", flush=True) + last_len = validate_current_prefix(expected_min_len = last_len) + print(f"--- Prefix valid with {last_len} objects", flush=True) + if not p.is_alive(): + break + time.sleep(0.25) + + p.terminate() + p.join() + total_len += obj_count * commit_count + + print("Validating final prefix ...", flush=True) + validate_current_prefix(expected_len = total_len) + db0.close() diff --git a/src/dbzero/bindings/python/PyAPI.cpp b/src/dbzero/bindings/python/PyAPI.cpp index a188db63..56e02907 100644 --- a/src/dbzero/bindings/python/PyAPI.cpp +++ b/src/dbzero/bindings/python/PyAPI.cpp @@ -128,31 +128,11 @@ namespace db0::python shared_py_object tryFetch(PyObject *py_id, PyTypeObject *type, const char *prefix_name) { return tryFetchFrom(PyToolkit::getPyWorkspace().getWorkspace(), py_id, type, prefix_name); } - + PyObject* tryExists(PyObject *py_id, PyTypeObject *type, const char *prefix_name) { return PyBool_fromBool(tryExistsIn(PyToolkit::getPyWorkspace().getWorkspace(), py_id, type, prefix_name)); } - - bool tryParseFetchArgs(PyObject *args, PyObject *kwargs, PyObject *&py_id, - PyObject *&py_type, const char *&prefix_name) - { - static const char *kwlist[] = { "identifier", "expected_type", "prefix", NULL }; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Os", const_cast(kwlist), &py_id, &py_type, &prefix_name)) { - return false; - } - - // NOTE: for backwards compatibility, swap parameters if one is a type and the other is UUID - if (py_id && py_type && PyType_Check(py_id) && PyUnicode_Check(py_type)) { - std::swap(py_id, py_type); - } - - if (py_type && !PyType_Check(py_type)) { - PyErr_SetString(PyExc_TypeError, "Invalid argument type: type"); - return false; - } - return true; - } - + PyObject *PyAPI_fetch(PyObject *, PyObject *args, PyObject *kwargs) { PyObject *py_id = nullptr; diff --git a/src/dbzero/bindings/python/PyInternalAPI.cpp b/src/dbzero/bindings/python/PyInternalAPI.cpp index c52a9e7c..3377b360 100644 --- a/src/dbzero/bindings/python/PyInternalAPI.cpp +++ b/src/dbzero/bindings/python/PyInternalAPI.cpp @@ -210,6 +210,26 @@ namespace db0::python THROWF(db0::InputException) << "Invalid object ID" << THROWF_END; } + bool tryParseFetchArgs(PyObject *args, PyObject *kwargs, PyObject *&py_id, + PyObject *&py_type, const char *&prefix_name) + { + static const char *kwlist[] = { "identifier", "expected_type", "prefix", NULL }; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Os", const_cast(kwlist), &py_id, &py_type, &prefix_name)) { + return false; + } + + // NOTE: for backwards compatibility, swap parameters if one is a type and the other is UUID + if (py_id && py_type && PyType_Check(py_id) && PyUnicode_Check(py_type)) { + std::swap(py_id, py_type); + } + + if (py_type && !PyType_Check(py_type)) { + PyErr_SetString(PyExc_TypeError, "Invalid argument type: type"); + return false; + } + return true; + } + PyObject *fetchSingletonObject(db0::swine_ptr &fixture, PyTypeObject *py_type) { auto &class_factory = fixture->get(); diff --git a/src/dbzero/bindings/python/PyInternalAPI.hpp b/src/dbzero/bindings/python/PyInternalAPI.hpp index cbc15e1a..74a294ae 100644 --- a/src/dbzero/bindings/python/PyInternalAPI.hpp +++ b/src/dbzero/bindings/python/PyInternalAPI.hpp @@ -164,6 +164,9 @@ namespace db0::python bool isExistingObject(db0::Snapshot &, ObjectId object_id, PyTypeObject *py_expected_type = nullptr); + bool tryParseFetchArgs(PyObject *args, PyObject *kwargs, PyObject *&py_id, + PyObject *&py_type, const char *&prefix_name); + /** * Open dbzero singleton by its corresponding Python type */ diff --git a/src/dbzero/bindings/python/PySnapshot.cpp b/src/dbzero/bindings/python/PySnapshot.cpp index fee7c8a4..924ce97f 100644 --- a/src/dbzero/bindings/python/PySnapshot.cpp +++ b/src/dbzero/bindings/python/PySnapshot.cpp @@ -124,6 +124,17 @@ namespace db0::python return tryFetchFrom(snapshot, py_id, type, prefix_name).steal(); } + PyObject* tryPySnapshot_exists(PyObject *self, PyObject *py_id, PyTypeObject *type, const char *prefix_name) + { + if (!PySnapshot_Check(self)) { + PyErr_SetString(PyExc_TypeError, "Invalid argument type"); + return NULL; + } + + auto &snapshot = reinterpret_cast(self)->modifyExt(); + return PyBool_fromBool(tryExistsIn(snapshot, py_id, type, prefix_name)); + } + PyObject *tryPySnapshot_find(PyObject *self, PyObject *args, PyObject *kwargs) { if (!PySnapshot_Check(self)) { @@ -172,6 +183,21 @@ namespace db0::python return runSafe(tryGetStateNum, snapshot, args, kwargs); } + PyObject *PyAPI_PySnapshot_exists(PyObject *self, PyObject *args, PyObject *kwargs) + { + PyObject *py_id = nullptr; + PyObject *py_type = nullptr; + const char *prefix_name = nullptr; + // takes same arguments as fetch + if (!tryParseFetchArgs(args, kwargs, py_id, py_type, prefix_name)) { + // error already set in tryParseFetchArgs + return NULL; + } + + PY_API_FUNC + return runSafe(tryPySnapshot_exists, self, py_id, reinterpret_cast(py_type), prefix_name); + } + PyObject *PyAPI_PySnapshot_fetch(PyObject *self, PyObject *args, PyObject *kwargs) { PY_API_FUNC @@ -294,9 +320,10 @@ namespace db0::python // check if a specific prefix belongs to the snapshot .sq_contains = (objobjproc)PyAPI_PySnapshot_HasItem }; - + static PyMethodDef PySnapshot_methods[] = { + {"exists", (PyCFunction)&PyAPI_PySnapshot_exists, METH_VARARGS | METH_KEYWORDS, "Check if a specific UUID points to a valid dbzero object instance or if singleton of a given type exists"}, {"fetch", (PyCFunction)&PyAPI_PySnapshot_fetch, METH_VARARGS | METH_KEYWORDS, "Fetch dbzero object instance by its ID or type (in case of a singleton)"}, {"find", (PyCFunction)&PyAPI_PySnapshot_find, METH_VARARGS | METH_KEYWORDS, ""}, {"deserialize", (PyCFunction)&PyAPI_PySnapshot_deserialize, METH_FASTCALL, "Deserialize from bytes within the snapshot's context"}, diff --git a/src/dbzero/bindings/python/dbzero.cpp b/src/dbzero/bindings/python/dbzero.cpp index c2b98c9c..fb1af17f 100644 --- a/src/dbzero/bindings/python/dbzero.cpp +++ b/src/dbzero/bindings/python/dbzero.cpp @@ -40,7 +40,7 @@ static PyMethodDef dbzero_methods[] = {"drop", &py::PyAPI_drop, METH_VARARGS, "Drop prefix (if exists)"}, {"commit", &py::PyAPI_commit, METH_VARARGS, "Commit data to disk / persistent storage"}, {"fetch", (PyCFunction)&py::PyAPI_fetch, METH_VARARGS | METH_KEYWORDS, "Retrieve dbzero object instance by its UUID or type (in case of a singleton)"}, - {"exists", (PyCFunction)&py::PyAPI_exists, METH_VARARGS | METH_KEYWORDS, "Check if a specific UUID points to a valid dbzero object instance (or existing singleton instance)"}, + {"exists", (PyCFunction)&py::PyAPI_exists, METH_VARARGS | METH_KEYWORDS, "Check if a specific UUID points to a valid dbzero object instance or if singleton of a given type exists"}, {"delete", &py::PyAPI_del, METH_VARARGS, "Delete dbzero object and the corresponding Python instance"}, {"get_type_info", &py::PyAPI_getTypeInfo, METH_VARARGS, "Get dbzero type information"}, {"uuid", (PyCFunction)&py::PyAPI_getUUID, METH_FASTCALL, "Get unique object ID"}, diff --git a/src/dbzero/core/utils/weak_vector.hpp b/src/dbzero/core/utils/weak_vector.hpp index 0c3639cb..91ea2ec3 100644 --- a/src/dbzero/core/utils/weak_vector.hpp +++ b/src/dbzero/core/utils/weak_vector.hpp @@ -22,7 +22,7 @@ namespace db0 this->cleanup(); m_data.emplace_back(ptr); } - + // remove expired weak pointers // @return true if the vector is empty after cleanup bool cleanup() @@ -47,8 +47,8 @@ namespace db0 void forEach(std::function f) const { for (const auto &ptr : m_data) { - if (auto p = ptr.lock()) { - f(*p); + if (auto p = ptr.lock()) { + f(*p); } } } @@ -56,8 +56,8 @@ namespace db0 void forEach(std::function f) { for (const auto &ptr : m_data) { - if (auto p = ptr.lock()) { - f(*p); + if (auto p = ptr.lock()) { + f(*p); } } } diff --git a/src/dbzero/object_model/iterators/BaseIterator.hpp b/src/dbzero/object_model/iterators/BaseIterator.hpp index 900fdc75..50690185 100644 --- a/src/dbzero/object_model/iterators/BaseIterator.hpp +++ b/src/dbzero/object_model/iterators/BaseIterator.hpp @@ -54,10 +54,10 @@ namespace db0::object_model assureAttached(); return m_iterator.is_end(); } - + void detach() const { - // NOTE: this needs to be reimplemented to save key + iterator invalidation + // NOTE: this needs to be reimplemented to save key + iterator invalidation m_detached = true; } diff --git a/src/dbzero/object_model/list/List.cpp b/src/dbzero/object_model/list/List.cpp index ea9591af..e7fa19bb 100644 --- a/src/dbzero/object_model/list/List.cpp +++ b/src/dbzero/object_model/list/List.cpp @@ -223,10 +223,12 @@ namespace db0::object_model void List::detach() const { - // detach object and the associated iterators - m_iterators.forEach([](ListIterator &iter) { - iter.detach(); - }); + // FIXME: detaching iterators is not safe, need to investigate why + // NOTE: this is not necessary as long as developers properly use the "snapshot" mechanism + // // detach object and the associated iterators + // m_iterators.forEach([](ListIterator &iter) { + // iter.detach(); + // }); super_t::detach(); } diff --git a/src/dbzero/object_model/list/ListIterator.cpp b/src/dbzero/object_model/list/ListIterator.cpp index 0cd63f8a..1e318b44 100644 --- a/src/dbzero/object_model/list/ListIterator.cpp +++ b/src/dbzero/object_model/list/ListIterator.cpp @@ -16,6 +16,7 @@ namespace db0::object_model ListIterator::ObjectSharedPtr ListIterator::next() { assureAttached(); + assert(!is_end()); auto fixture = m_collection->getFixture(); auto [storage_class, value] = *m_iterator; ++m_iterator; @@ -25,9 +26,12 @@ namespace db0::object_model void ListIterator::restore() { - m_index = std::min(m_index, this->m_collection->size()); - // NOTE: may set the iterator as end - m_iterator = this->m_collection->begin(m_index); + if (m_index m_collection->size()) { + m_iterator = this->m_collection->begin(m_index); + } else { + // restore as end iterator + m_iterator = this->m_collection->end(); + } } } \ No newline at end of file diff --git a/src/dbzero/workspace/FixtureThreads.cpp b/src/dbzero/workspace/FixtureThreads.cpp index fdb42fc6..46f2b93b 100644 --- a/src/dbzero/workspace/FixtureThreads.cpp +++ b/src/dbzero/workspace/FixtureThreads.cpp @@ -111,7 +111,7 @@ namespace db0 m_fixture_status[uuid] = FixtureUpdateStatus{fixture.getPrefix().getLastUpdated(), ClockType::now()}; } - void RefreshThread::prepareContext() + void RefreshThread::prepareContext() { assert(!m_context && "Only one FixtureThreadCallbacksContext should exist at the time!"); m_context = std::make_shared(); @@ -127,11 +127,11 @@ namespace db0 void RefreshThread::onUpdate(Fixture &fixture) { auto prefix_ptr = fixture.getPrefixPtr(); - // prefix_ptr may not exist a fixture has already been closed + // prefix_ptr may not exist if fixture has already been closed if (!prefix_ptr) { return; } - + std::uint64_t uuid = fixture.getUUID(); auto last_updated = prefix_ptr->getLastUpdated(); auto now = ClockType::now();