diff --git a/dbzero/dbzero/dbzero.py b/dbzero/dbzero/dbzero.py index c9e4f4dc..21899e3d 100644 --- a/dbzero/dbzero/dbzero.py +++ b/dbzero/dbzero/dbzero.py @@ -10,7 +10,7 @@ def load_dynamic(name, path): def __bootstrap__(): global __bootstrap__, __loader__, __file__ - paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"] + paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"] __file__ = None for path in paths: if os.path.isdir(path): diff --git a/pyproject.toml b/pyproject.toml index 19cb59f4..9fdaa6ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,9 +3,9 @@ build-backend = 'mesonpy' requires = ['meson-python'] [project] -name = 'DBzero' +name = 'dbzero' version = '0.1.0' -description = 'DBZero Community edition' +description = 'DBZero Community Edition' readme = 'README.md' requires-python = '>=3.8' license = {file = 'LICENSE'} diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index 2b37169d..72cab0e8 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -91,7 +91,10 @@ def test_copy_prefix_custom_step_size(db0_fixture): def writer_process(prefix, obj_count = 50, commit_count = 50, long_run = False): db0.init(DB0_DIR) db0.open(prefix, "rw") + # create new or open an existing root object root = MemoTestSingleton([]) + if (len(root.value) > 0): + print(f"Writer process: opened existing prefix with {len(root.value)} objects") for i in range(commit_count): for _ in range(obj_count): root.value.append(MemoTestClass("b" * 1024)) # 1 KB string @@ -100,7 +103,7 @@ def writer_process(prefix, obj_count = 50, commit_count = 50, long_run = False): print(f"Writer process: committed {i * obj_count} objects") else: time.sleep(0.1) - + if long_run: print(db0.get_storage_stats()) db0.commit() @@ -216,72 +219,165 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): return result db0.close() - obj_count = 5000 - commit_count = 100 - # start the writer process for a long run - p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True)) - p.start() - db0.init(DB0_DIR) - db0.open(px_name, "r") - last_len = 0 - while True: - try: - root = db0.fetch(MemoTestSingleton) - if len(root.value) > 1: - last_len = len(root.value) + # in each 'epoch' we modify prefix while making copies + # then drop the original prefix and restore if from the last copy + epoch_count = 2 + for epoch in range(epoch_count): + print(f"=== Epoch {epoch} ===") + # obj_count = 5000 + # commit_count = 100 + obj_count = 500 + commit_count = 100 + # start the writer process for a long run + p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True)) + p.start() + + db0.init(DB0_DIR) + db0.open(px_name, "r") + last_len = 0 + while True: + try: + root = db0.fetch(MemoTestSingleton) + if len(root.value) > 1: + last_len = len(root.value) + break + except Exception: + pass + time.sleep(0.1) + + copy_id = 0 + # copy the prefix multiple times while it is being modified + while True: + if not p.is_alive(): break - except Exception: - pass - time.sleep(0.1) + file_name = f"./test-copy-{copy_id}.db0" + if os.path.exists(file_name): + os.remove(file_name) + # copy prefix without opening it, use default step size + print("--- Copying prefix iteration", copy_id) + db0.copy_prefix(file_name, prefix=px_name) + print("--- copy finished") + copy_id += 1 + if not p.is_alive(): + break + time.sleep(2.5) # wait a bit before next copy + + p.join() + + # validate original prefix (no copy yet) + print("Validating final prefix ...") + db0.open(px_name, "r") + validate_current_prefix(expected_len = obj_count * commit_count) + + # make final stale copy (i.e. without active modifications) + final_copy = f"./test-copy-final.db0" + if os.path.exists(final_copy): + os.remove(final_copy) + db0.copy_prefix(final_copy, prefix=px_name) + db0.close() + + print("Validating all copies") + validate_copy("final", expected_len = obj_count * commit_count) + for i in range(copy_id): + last_len = validate_copy(i, expected_min_len = last_len) + print(f"--- Copy {i} valid with {last_len} objects") + + # now, continue modifications starting from the last restored copy (making new copies) + + +def test_modify_copied_prefix(db0_fixture): + file_name = "./test-copy.db0" + # remove file if it exists + if os.path.exists(file_name): + os.remove(file_name) + + px_name = db0.get_current_prefix().name + px_path = os.path.join(DB0_DIR, px_name + ".db0") + root = MemoTestSingleton([]) + total_len = 0 - copy_id = 0 - # copy the prefix multiple times while it is being modified - while True: - if not p.is_alive(): - break - file_name = f"./test-copy-{copy_id}.db0" - if os.path.exists(file_name): - os.remove(file_name) - # copy prefix without opening it, use default step size - print("--- Copying prefix iteration", copy_id) - db0.copy_prefix(file_name, prefix=px_name) - print("--- copy finished") - copy_id += 1 - if not p.is_alive(): - break - time.sleep(2.5) # wait a bit before next copy + def modify_prefix(): + append_count = 0 + root = db0.fetch(MemoTestSingleton) + for _ in range(50): + root.value.append(MemoTestClass("a" * 1024)) # 1 KB string + append_count += 1 + db0.commit() + return append_count - p.join() + total_len += modify_prefix() + db0.copy_prefix(file_name) + db0.close() - # validate original prefix (no copy yet) - print("Validating final prefix ...") - db0.open(px_name, "r") - validate_current_prefix(expected_len = obj_count * commit_count) + # drop original file and replace with copy + os.remove(px_path) + os.rename(file_name, px_path) - # make final stale copy (i.e. without active modifications) - final_copy = f"./test-copy-final.db0" - if os.path.exists(final_copy): - os.remove(final_copy) - db0.copy_prefix(final_copy, prefix=px_name) + # open recovered prefix for update + db0.init(DB0_DIR, prefix=px_name, read_write=True) + total_len += modify_prefix() db0.close() + + # open prefix from recovered and modified copy + db0.init(DB0_DIR, prefix=px_name, read_write=False) + root = db0.fetch(MemoTestSingleton) + for item in root.value: + assert item.value == "a" * 1024 + assert len(root.value) == total_len + + +def test_copy_prefix_of_recovered_copy(db0_fixture): + file_name = "./test-copy.db0" + # remove file if it exists + if os.path.exists(file_name): + os.remove(file_name) + + px_name = db0.get_current_prefix().name + px_path = os.path.join(DB0_DIR, px_name + ".db0") + root = MemoTestSingleton([]) + total_len = 0 + charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - print("Validating all copies") - validate_copy("final", expected_len = obj_count * commit_count) - for i in range(copy_id): - last_len = validate_copy(i, expected_min_len = last_len) - print(f"--- Copy {i} valid with {last_len} objects") + def modify_prefix(op_count = 50): + append_count = 0 + root = db0.fetch(MemoTestSingleton) + for _ in range(op_count): + c = charset[len(root.value) % len(charset)] + root.value.append(MemoTestClass(c * 1024)) # 1 KB string + append_count += 1 + db0.commit() + return append_count -def test_copy_prefix_throws_on_path_passed(db0_fixture): - path = "./invalid-dir/nonexistent/-copy/" - # remove path if it exists - if os.path.exists(path): - os.rmdir(path) + def validate(expected_len): + root = db0.fetch(MemoTestSingleton) + for i, item in enumerate(root.value): + c = charset[i % len(charset)] + assert item.value == c * 1024 + assert len(root.value) == expected_len + + total_len += modify_prefix(150) + db0.copy_prefix(file_name, page_io_step_size=64 << 10) + db0.close() + + # drop original file and replace with copy + os.remove(px_path) + os.rename(file_name, px_path) - root = MemoTestSingleton([]) - for _ in range(50): - root.value.append(MemoTestClass("a" * 1024)) # 1 KB string - db0.commit() + # open recovered prefix for update + db0.init(DB0_DIR, prefix=px_name, read_write=True) + total_len += modify_prefix(100) + + db0.close() + db0.init(DB0_DIR, prefix=px_name, read_write=True) + validate(total_len) + db0.copy_prefix(file_name) + db0.close() - with pytest.raises(OSError) as excinfo: - db0.copy_prefix(path) + # restore copy of a restored and modified copy + os.remove(px_path) + os.rename(file_name, px_path) + + # open prefix from recovered and modified copy of a copy + db0.init(DB0_DIR, prefix=px_name, read_write=False) + validate(total_len) \ No newline at end of file diff --git a/src/dbzero/bindings/python/PyInternalAPI.cpp b/src/dbzero/bindings/python/PyInternalAPI.cpp index 02c3fe4a..8a8df3d1 100644 --- a/src/dbzero/bindings/python/PyInternalAPI.cpp +++ b/src/dbzero/bindings/python/PyInternalAPI.cpp @@ -946,13 +946,14 @@ namespace db0::python PyErr_Format(PyExc_OSError, "Output file already exists: '%s'", output_file_name); return nullptr; } - + // use either explicit step size, input step size (if > 1) or default = 4MB if (!page_io_step_size) { - auto in_step_size = src_storage.getPageIO().getStepSize(); - page_io_step_size = in_step_size > 1 ? in_step_size : (4u << 20); + auto &page_io = src_storage.getPageIO(); + auto in_step_size = page_io.getStepSize(); + page_io_step_size = in_step_size > 1 ? (in_step_size * page_io.getBlockSize()) : (4u << 20); } - + if (!meta_io_step_size) { auto in_meta_step_size = src_storage.getMetaIO().getStepSize(); meta_io_step_size = in_meta_step_size > 1 ? in_meta_step_size : (1u << 20); diff --git a/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp b/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp index 2d73717d..85eab5c5 100644 --- a/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp +++ b/src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp @@ -227,6 +227,7 @@ DB0_PACKED_END using CompT = typename super_t::CompT; using NodeItemCompT = typename super_t::NodeItemCompT; using NodeItemEqualT = typename super_t::NodeItemEqualT; + using const_iterator = typename super_t::const_iterator; // as null / invalid SGB_CompressedLookupTree() = default; @@ -274,7 +275,11 @@ DB0_PACKED_END sg_tree_const_iterator cend_nodes() const { return base_t::end(); } - + + bool empty() const { + return super_t::empty(); + } + std::size_t size() const { return super_t::size(); } @@ -337,7 +342,7 @@ DB0_PACKED_END return std::nullopt; } - + // Locate first element which is greater or equal to the key template std::optional upper_equal_bound(const KeyT &key) const { @@ -423,6 +428,35 @@ DB0_PACKED_END bool operator!() const { return super_t::operator!(); } + + class uncompressed_const_iterator: protected super_t::const_iterator + { + public: + uncompressed_const_iterator(const const_iterator &iterator) + : super_t::const_iterator(iterator) + { + } + + bool is_end() const { + return super_t::const_iterator::is_end(); + } + + uncompressed_const_iterator &operator++() + { + super_t::const_iterator::operator++(); + return *this; + } + + ItemT operator*() const { + // return uncompressed item from the underlying iterator + return this->m_tree_it->header().uncompress(super_t::const_iterator::operator*()); + } + }; + + // Begin sorted iteration over all items (uncompressed) + uncompressed_const_iterator cbegin() const { + return super_t::cbegin(); + } private: ItemCompT m_raw_item_comp; diff --git a/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp b/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp index 4da9091a..92385e6f 100644 --- a/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp +++ b/src/dbzero/core/collections/SGB_Tree/SGB_LookupTree.hpp @@ -82,11 +82,11 @@ DB0_PACKED_BEGIN const_iterator cbegin() const { - if (!is_reversed()) { - return super_t::cbegin(); + if (is_reversed()) { + // reversed begin + return super_t::cbegin() + this->maxItems() - 1; } - // reversed begin - return super_t::cbegin() + this->maxItems() - 1; + return super_t::cbegin(); } iterator begin() const { @@ -95,10 +95,10 @@ DB0_PACKED_BEGIN const_iterator cend() const { - if (!is_reversed()) { - return super_t::cend(); + if (is_reversed()) { + return this->cbegin() - this->m_size; } - return this->cbegin() - this->m_size; + return super_t::cend(); } iterator end() { @@ -107,11 +107,11 @@ DB0_PACKED_BEGIN const_iterator clast() const { - if (!is_reversed()) { - return super_t::clast(); - } - // reversed last - return super_t::cbegin() - 1; + if (is_reversed()) { + // reversed last + return super_t::cbegin() - 1; + } + return super_t::clast(); } iterator last() { @@ -119,7 +119,7 @@ DB0_PACKED_BEGIN } const ItemT &keyItem() const { - // key item is the first heap item + // key item is the first heap item (or first sorted item) return *this->cbegin(); } @@ -153,7 +153,8 @@ DB0_PACKED_BEGIN * * @return true if item was erased */ - template bool erase(const KeyT &key, const HeapCompT &comp) + template + bool erase(const KeyT &key, const HeapCompT &comp) { if (this->is_reversed()) { auto item_ptr = dheap::rfind(this->begin(), this->end(), key, comp.itemEqual); @@ -178,7 +179,8 @@ DB0_PACKED_BEGIN return this->header().m_flags[LookupHeaderFlags::reversed] ? -1 : 1; } - template const_iterator lower_equal_bound(const KeyT &key, const HeapCompT &comp) const + template + const_iterator lower_equal_bound(const KeyT &key, CompT comp) const { const_iterator result = nullptr; if (is_sorted()) { @@ -206,7 +208,8 @@ DB0_PACKED_BEGIN return result; } - template const_iterator upper_equal_bound(const KeyT &key, const HeapCompT &comp) const + template + const_iterator upper_equal_bound(const KeyT &key, const HeapCompT &comp) const { const_iterator result = nullptr; if (is_sorted()) { @@ -319,6 +322,85 @@ DB0_PACKED_BEGIN return this->erase_existing(this->itemAt(at), comp); } + class const_sorting_iterator + { + public: + const_sorting_iterator() = default; + const_sorting_iterator(const ItemT *ptr, const ItemT *end_ptr, const HeapCompT &comp, + bool is_sorted, bool is_reversed) + : m_ptr(is_sorted ? ptr : nullptr) + , m_end_ptr(is_sorted ? end_ptr : nullptr) + , m_is_sorted(is_sorted) + , m_is_reversed(is_reversed) + , m_step(is_reversed ? -1 : 1) + { + if (!is_sorted) { + if (is_reversed) { + // NOTE: pointers are reversed as well + assert(!(ptr <= end_ptr)); + // copy items in reversed heap order + std::vector items; + items.reserve(std::distance(end_ptr, ptr)); + while (ptr != end_ptr) { + items.push_back(*ptr); + --ptr; + } + m_it = { std::move(items), comp }; + } else { + m_it = { ptr, end_ptr, comp }; + } + } + } + + const_sorting_iterator &operator++() + { + assert(!is_end()); + if (m_is_sorted) { + m_ptr += m_step; + } else { + assert(!!m_it); + ++m_it; + } + return *this; + } + + // Check if the instance is valid + bool operator!() const { + return !(m_ptr || !!m_it); + } + + bool is_end() const + { + if (m_is_sorted) { + return m_ptr == m_end_ptr; + } else { + return m_it.is_end(); + } + } + + ItemT operator*() const + { + assert(!is_end()); + if (m_is_sorted) { + return *m_ptr; + } else { + return *m_it; + } + } + + private: + typename super_t::const_sorting_iterator m_it; + const ItemT *m_ptr = nullptr; + const ItemT *m_end_ptr = nullptr; + bool m_is_sorted = false; + bool m_is_reversed = false; + int m_step; + }; + + const_sorting_iterator cbegin_sorted(const HeapCompT &comp) const { + return { this->cbegin(), this->cend(), comp, is_sorted(), is_reversed() }; + } + private: /** diff --git a/src/dbzero/core/collections/SGB_Tree/SGB_Tree.hpp b/src/dbzero/core/collections/SGB_Tree/SGB_Tree.hpp index 150641e6..88a2e0cd 100644 --- a/src/dbzero/core/collections/SGB_Tree/SGB_Tree.hpp +++ b/src/dbzero/core/collections/SGB_Tree/SGB_Tree.hpp @@ -209,11 +209,13 @@ namespace db0 item.first = nullptr; #endif } - + + // Sorted order iterator class const_iterator { public: - const_iterator(const sg_tree_const_iterator &tree_it, const sg_tree_const_iterator &tree_end, const HeapCompT &comp) + const_iterator(const sg_tree_const_iterator &tree_it, const sg_tree_const_iterator &tree_end, + const HeapCompT &comp) : m_tree_it(tree_it) , m_tree_end(tree_end) , m_comp(comp) @@ -236,11 +238,11 @@ namespace db0 return *this; } - ItemT operator*() const { + auto operator*() const { return *m_node_it; } - const ItemT *operator->() const { + auto operator->() const { return m_node_it.get_ptr(); } @@ -248,12 +250,12 @@ namespace db0 return m_tree_it == m_tree_end || m_node_it.is_end(); } - private: + protected: sg_tree_const_iterator m_tree_it, m_tree_end; sgb_node_const_sorting_iterator m_node_it; HeapCompT m_comp; }; - + const_iterator cbegin() const { return const_iterator(super_t::begin(), super_t::end(), m_heap_comp); } @@ -582,7 +584,7 @@ namespace db0 ); return { new_node->begin(), new_node }; } - + }; /** diff --git a/src/dbzero/core/collections/SGB_Tree/sgb_tree_node.hpp b/src/dbzero/core/collections/SGB_Tree/sgb_tree_node.hpp index 95c6a2ec..3c97fc57 100644 --- a/src/dbzero/core/collections/SGB_Tree/sgb_tree_node.hpp +++ b/src/dbzero/core/collections/SGB_Tree/sgb_tree_node.hpp @@ -44,7 +44,7 @@ DB0_PACKED_BEGIN using const_iterator = const ItemT *; using CompT = ItemCompT; using EqualT = ItemEqualT; - + // tree pointers (possibly relative to slab) sgb_tree_ptr_set ptr_set; // total number of available (allocated) bytes @@ -52,7 +52,7 @@ DB0_PACKED_BEGIN // actual number of stored elements CapacityT m_size = 0; - /// inverts items for the min heap + // Reverses items for the min heap struct HeapCompT { ItemCompT itemComp; @@ -223,10 +223,12 @@ DB0_PACKED_BEGIN /** * const_sorting_iterator uses additional memory to sort items on-the-fly + * from the heap order to the sorted order */ class const_sorting_iterator { public: + // as null / invalid const_sorting_iterator() = default; const_sorting_iterator(const ItemT *ptr, const ItemT *end_ptr, const HeapCompT &comp) : m_items(ptr, end_ptr) @@ -234,8 +236,30 @@ DB0_PACKED_BEGIN , m_end_ptr(m_items.data() + m_items.size()) , m_comp(comp) { + assert(ptr <= end_ptr); + } + + const_sorting_iterator(const const_sorting_iterator &other) + : m_items(other.m_items) + // rebase items + , m_ptr(m_items.data() + (other.m_ptr - other.m_items.data())) + , m_end_ptr(m_items.data() + (other.m_end_ptr - other.m_items.data())) + , m_comp(other.m_comp) + { + } + + const_sorting_iterator(const_sorting_iterator &&other) { + (*this) = std::move(other); } + const_sorting_iterator(std::vector &&items, const HeapCompT &comp) + : m_items(std::move(items)) + , m_ptr(m_items.data()) + , m_end_ptr(m_items.data() + m_items.size()) + , m_comp(comp) + { + } + const_sorting_iterator &operator++() { assert(!is_end()); @@ -244,6 +268,11 @@ DB0_PACKED_BEGIN return *this; } + // Check if the instance is valid + bool operator!() const { + return !m_ptr || !m_end_ptr; + } + bool is_end() const { return m_ptr == m_end_ptr; } @@ -260,14 +289,41 @@ DB0_PACKED_BEGIN return m_ptr; } + const_sorting_iterator &operator=(const const_sorting_iterator &other) + { + if (this != &other) { + m_items = other.m_items; + // rebase items + m_ptr = m_items.data() + (other.m_ptr - other.m_items.data()); + m_end_ptr = m_items.data() + (other.m_end_ptr - other.m_items.data()); + m_comp = other.m_comp; + } + return *this; + } + + const_sorting_iterator &operator=(const_sorting_iterator &&other) + { + if (this != &other) { + auto ptr_diff = other.m_ptr - other.m_items.data(); + auto end_ptr_diff = other.m_end_ptr - other.m_items.data(); + m_items = std::move(other.m_items); + // rebase items + m_ptr = m_items.data() + ptr_diff; + m_end_ptr = m_items.data() + end_ptr_diff; + m_comp = other.m_comp; + } + return *this; + } + private: std::vector m_items; - ItemT *m_ptr = nullptr, *m_end_ptr = nullptr; + ItemT *m_ptr = nullptr; + ItemT *m_end_ptr = nullptr; HeapCompT m_comp; }; const_sorting_iterator cbegin_sorted(const HeapCompT &comp) const { - return const_sorting_iterator(cbegin(), cend(), comp); + return { cbegin(), cend(), comp }; } const_iterator find_max(const HeapCompT &comp) const { diff --git a/src/dbzero/core/collections/sgtree/v_sgtree.hpp b/src/dbzero/core/collections/sgtree/v_sgtree.hpp index 9dce0348..d44955a7 100644 --- a/src/dbzero/core/collections/sgtree/v_sgtree.hpp +++ b/src/dbzero/core/collections/sgtree/v_sgtree.hpp @@ -350,11 +350,11 @@ DB0_PACKED_END template iterator lower_bound(const KeyT &key) const { return SG_Tree::lower_bound(this->head(), key, _comp); } - + template iterator lower_equal_bound(const KeyT &key) const { return SG_Tree::lower_equal_bound(this->head(), key, _comp); } - + /** * Find upper-bound node by initializer / key */ diff --git a/src/dbzero/core/exception/AbstractException.cpp b/src/dbzero/core/exception/AbstractException.cpp index 72144da4..3c164b75 100644 --- a/src/dbzero/core/exception/AbstractException.cpp +++ b/src/dbzero/core/exception/AbstractException.cpp @@ -532,3 +532,11 @@ const char* AbstractException::what() const throw() { string AbstractException::getName() const { return typeid(*this).name(); } + +std::ostream &db0::showStackTrace(std::ostream &os, unsigned int pruneTop, bool omitLastNotDemangled) +{ + TraceInfo ti; + ti.generateInfo(); + os << ti.getPrintableInfo(pruneTop, omitLastNotDemangled); + return os; +} diff --git a/src/dbzero/core/exception/AbstractException.hpp b/src/dbzero/core/exception/AbstractException.hpp index 59c4e8f0..363187c7 100644 --- a/src/dbzero/core/exception/AbstractException.hpp +++ b/src/dbzero/core/exception/AbstractException.hpp @@ -267,4 +267,6 @@ namespace db0 mutable std::string formattedMsg; }; + std::ostream &showStackTrace(std::ostream &os, unsigned int pruneTop = 0, bool omitLastNotDemangled = true); + } \ No newline at end of file diff --git a/src/dbzero/core/memory/utils.hpp b/src/dbzero/core/memory/utils.hpp index 6b21b907..75099b23 100644 --- a/src/dbzero/core/memory/utils.hpp +++ b/src/dbzero/core/memory/utils.hpp @@ -73,5 +73,18 @@ namespace db0 } return has_mutation; } + + template + std::optional optional_max(const std::optional &a, const std::optional &b) + { + if (a && b) { + return std::max(*a, *b); + } else if (a) { + return a; + } else { + return b; + } + return {}; + } } diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index 702d3ee1..221c0291 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -154,7 +154,7 @@ namespace db0 auto config = new (buffer.data()) o_prefix_config( block_size, *page_size, dram_page_size, getPageIOStepSize(block_size, step_size_hint) ); - + std::uint64_t offset = CONFIG_BLOCK_SIZE; auto next_block_offset = [&]() { @@ -300,12 +300,12 @@ namespace db0 // query.first yields the full-DP (if it exists) std::uint64_t page_io_id = query.first(); - if (page_io_id) { + if (page_io_id) { if (!!m_ext_space) { // convert relative page number back to absolute page_io_id = m_ext_space.getAbsolute(page_io_id); } - // read full DP + // read full DP m_page_io.read(page_io_id, read_buf); } else { // requesting a diff-DP only encoded page, use zero buffer as a base @@ -406,7 +406,7 @@ namespace db0 // we cannot append as diff but need to overwrite the full page instead if (state_num != query.firstStateNum() && query.leftLessThan(max_len)) { bool is_first_page; - // append as diff-page (NOTE: diff-writes are only appended) + // append as diff-page (NOTE: diff-writes are only appended) auto [page_io_id, overflow] = m_page_io.appendDiff(buffer, { page_num, state_num }, diff_data, &is_first_page); if (!!m_ext_space) { // NOTE: first page (of each step) must be registered with REL_Index if it's maintained @@ -455,7 +455,7 @@ namespace db0 m_ext_dram_changelog_io->flush(); return true; } - + bool BDevStorage::flush(ProcessTimer *parent_timer) { std::unique_lock lock(m_mutex); @@ -483,10 +483,11 @@ namespace db0 m_page_io.flush(); // Extract & flush sparse index change log first (on condition of any updates) // we also need to collect the end storage page number, possibly relative (sentinel) - auto end_page_io_page_num = m_page_io.getEndPageNum(); + bool is_first = false; + auto end_page_io_page_num = m_page_io.getEndPageNum(&is_first); if (!!m_ext_space) { // convert to relative page number - end_page_io_page_num = m_ext_space.assignRelative(end_page_io_page_num, false); + end_page_io_page_num = m_ext_space.assignRelative(end_page_io_page_num, is_first); } m_sparse_pair.extractChangeLog(m_dp_changelog_io, end_page_io_page_num); @@ -570,15 +571,25 @@ namespace db0 return result; } - Diff_IO BDevStorage::getPage_IO(std::uint64_t next_page_hint, std::uint32_t step_size) - { - auto block_id = (next_page_hint * m_config.m_page_size) / m_config.m_block_size; + Diff_IO BDevStorage::getPage_IO(std::optional next_page_hint, std::uint32_t step_size) + { auto block_capacity = m_config.m_block_size / m_config.m_page_size; std::optional block_num; std::uint64_t address = 0; std::uint32_t page_count = 0; - if (next_page_hint == 0) { + + if (next_page_hint) { + auto block_id = (*next_page_hint * m_config.m_page_size) / m_config.m_block_size; + address = CONFIG_BLOCK_SIZE + block_id * m_config.m_block_size; + page_count = static_cast(*next_page_hint % block_capacity); + + // position at the end of the last existing block + if (page_count == 0) { + address -= m_config.m_block_size; + page_count = block_capacity; + } + } else { // assign first page address = std::max(m_dram_io.tail(), m_meta_io.tail()); address = std::max(address, m_dram_changelog_io.tail()); @@ -591,15 +602,6 @@ namespace db0 // NOTE: initialize with a known block num = 0 (first block of the first step) block_num = 0; - } else { - address = CONFIG_BLOCK_SIZE + block_id * m_config.m_block_size; - page_count = static_cast(next_page_hint % block_capacity); - - // position at the end of the last existing block - if (page_count == 0) { - address -= m_config.m_block_size; - page_count = block_capacity; - } } // NOTE: block num is unknown in this case @@ -872,7 +874,7 @@ namespace db0 if (!!m_ext_space) { end_page_num = m_ext_space.getAbsolute(end_page_num); } - copyPageIO(m_page_io, out.m_page_io, end_page_num, out.m_ext_space); + copyPageIO(m_page_io, m_ext_space, out.m_page_io, end_page_num, out.m_ext_space); } copyStream(m_meta_io, out.m_meta_io); @@ -890,14 +892,20 @@ namespace db0 return *this; } - std::uint64_t BDevStorage::getNextStoragePageNum() const - { + std::optional BDevStorage::getNextStoragePageNum() const + { // NOTE: in no-load mode we cannot use sparse_pair - // therefore will calculate end page bound from the file size + // therefore will calculate end page bound from the file size (absolute page number) if (m_flags[StorageOptions::NO_LOAD]) { return (m_file.size() - CONFIG_BLOCK_SIZE) / m_config.m_page_size; } - return m_sparse_pair.getNextStoragePageNum(); + + auto page_io_id = m_sparse_pair.getNextStoragePageNum(); + if (!!m_ext_space && page_io_id) { + // convert to absolute page number + page_io_id = m_ext_space.getAbsolute(*page_io_id); + } + return page_io_id; } - + } \ No newline at end of file diff --git a/src/dbzero/core/storage/BDevStorage.hpp b/src/dbzero/core/storage/BDevStorage.hpp index c8700a00..ffbd299c 100644 --- a/src/dbzero/core/storage/BDevStorage.hpp +++ b/src/dbzero/core/storage/BDevStorage.hpp @@ -216,7 +216,7 @@ DB0_PACKED_END */ std::uint64_t getBlockCount(std::uint64_t file_size) const; - std::uint64_t getNextStoragePageNum() const; + std::optional getNextStoragePageNum() const; BlockIOStream getBlockIOStream(std::uint64_t first_block_pos, AccessType); @@ -229,7 +229,7 @@ DB0_PACKED_END { return { m_file, first_block_pos, m_config.m_block_size, getTailFunction(), access_type }; } - + template std::unique_ptr tryGetChangeLogIOStream(std::uint64_t first_block_pos, AccessType access_type) { @@ -245,7 +245,7 @@ DB0_PACKED_END MetaIOStream getMetaIOStream(std::uint64_t first_block_pos, std::size_t step_size, AccessType); - Diff_IO getPage_IO(std::uint64_t next_page_hint, std::uint32_t step_size); + Diff_IO getPage_IO(std::optional next_page_hint, std::uint32_t step_size); o_prefix_config readConfig() const; diff --git a/src/dbzero/core/storage/BlockIOStream.cpp b/src/dbzero/core/storage/BlockIOStream.cpp index 2d7430c6..f7c1fe01 100644 --- a/src/dbzero/core/storage/BlockIOStream.cpp +++ b/src/dbzero/core/storage/BlockIOStream.cpp @@ -418,6 +418,7 @@ namespace db0 std::uint64_t BlockIOStream::tail() const { if (!m_eos) { + assert(false); THROWF(db0::InternalException) << "BlockIOStream::tail: Failed (must be EOS)"; } return m_address + m_block_size; diff --git a/src/dbzero/core/storage/CFile.cpp b/src/dbzero/core/storage/CFile.cpp index 2982be60..3cb40ef7 100644 --- a/src/dbzero/core/storage/CFile.cpp +++ b/src/dbzero/core/storage/CFile.cpp @@ -190,7 +190,7 @@ namespace db0 } void CFile::write(std::uint64_t address, std::size_t size, const void *buffer) - { + { std::unique_lock lock(m_mutex); assert(m_access_type != AccessType::READ_ONLY); if (address != m_file_pos) { diff --git a/src/dbzero/core/storage/ChangeLogTypes.hpp b/src/dbzero/core/storage/ChangeLogTypes.hpp index 7a67fe30..04f5b246 100644 --- a/src/dbzero/core/storage/ChangeLogTypes.hpp +++ b/src/dbzero/core/storage/ChangeLogTypes.hpp @@ -15,7 +15,7 @@ DB0_PACKED_BEGIN // state number this change log corresponds to StateNumType m_state_num; // sentinel storage page number for this transaction (see Page_IO::getEndPageNum()) - // always the ABSOLUTE storage page number + // NOTE: this value might be relative if the mapping is active std::uint64_t m_end_storage_page_num; o_dp_changelog_header(StateNumType state_num, std::uint64_t end_storage_page_num) diff --git a/src/dbzero/core/storage/DiffIndex.cpp b/src/dbzero/core/storage/DiffIndex.cpp index 2f090169..a5b303a1 100644 --- a/src/dbzero/core/storage/DiffIndex.cpp +++ b/src/dbzero/core/storage/DiffIndex.cpp @@ -149,6 +149,10 @@ namespace db0 { } + bool DiffIndex::empty() const { + return super_t::empty(); + } + std::size_t DiffIndex::size() const { return super_t::size(); } @@ -190,8 +194,8 @@ namespace db0 Address DiffIndex::getIndexAddress() const { return super_t::getIndexAddress(); } - - typename DiffIndex::PageNumT DiffIndex::getNextStoragePageNum() const { + + std::optional DiffIndex::getNextStoragePageNum() const { return super_t::getNextStoragePageNum(); } diff --git a/src/dbzero/core/storage/DiffIndex.hpp b/src/dbzero/core/storage/DiffIndex.hpp index bedb4748..cbb2b271 100644 --- a/src/dbzero/core/storage/DiffIndex.hpp +++ b/src/dbzero/core/storage/DiffIndex.hpp @@ -133,6 +133,7 @@ DB0_PACKED_END // been written witn an "overflow" - in which case actually 2 DPs were written void insert(PageNumT page_num, StateNumT state_num, PageNumT storage_page_num, bool overflow = false); + bool empty() const; std::size_t size() const; // Find mutation of page_num where state >= state_num @@ -140,7 +141,7 @@ DB0_PACKED_END // Find mutation ID of page_num where state <= state_num StateNumT findLower(PageNumT page_num, StateNumT state_num) const; - PageNumT getNextStoragePageNum() const; + std::optional getNextStoragePageNum() const; StateNumT getMaxStateNum() const; diff --git a/src/dbzero/core/storage/ExtSpace.cpp b/src/dbzero/core/storage/ExtSpace.cpp index cf55511e..fbfddce6 100644 --- a/src/dbzero/core/storage/ExtSpace.cpp +++ b/src/dbzero/core/storage/ExtSpace.cpp @@ -87,4 +87,12 @@ namespace db0 ); } + std::unique_ptr ExtSpace::tryBegin() const + { + if (!(*this) || !m_rel_index) { + return {}; + } + return std::make_unique(m_rel_index->cbegin()); + } + } \ No newline at end of file diff --git a/src/dbzero/core/storage/ExtSpace.hpp b/src/dbzero/core/storage/ExtSpace.hpp index decec414..8732de7b 100644 --- a/src/dbzero/core/storage/ExtSpace.hpp +++ b/src/dbzero/core/storage/ExtSpace.hpp @@ -46,6 +46,8 @@ DB0_PACKED_END public: using DP_ChangeLogT = BaseStorage::DP_ChangeLogT; using DP_ChangeLogStreamT = db0::ChangeLogIOStream; + using const_iterator = REL_Index::const_iterator; + struct tag_create {}; // NOTE: dram pair may be nullptr (for a null ExtSpace) @@ -69,6 +71,11 @@ DB0_PACKED_END return m_rel_index->getAbsolute(rel_page_num); } + std::uint64_t getRelative(std::uint64_t storage_page_num) const { + assert(m_rel_index); + return m_rel_index->getRelative(storage_page_num); + } + // Registers a new mapping rel_page_num -> storage_page_num // exception raised if unable to add the mapping void addMapping(std::uint64_t storage_page_num, std::uint64_t rel_page_num) { @@ -76,6 +83,9 @@ DB0_PACKED_END m_rel_index->addMapping(storage_page_num, rel_page_num); } + // Begins the iterator over sorted elements (on condition that ExtSpace is valid) + std::unique_ptr tryBegin() const; + void refresh(); void commit(); diff --git a/src/dbzero/core/storage/Page_IO.cpp b/src/dbzero/core/storage/Page_IO.cpp index f098875d..d7d44b44 100644 --- a/src/dbzero/core/storage/Page_IO.cpp +++ b/src/dbzero/core/storage/Page_IO.cpp @@ -26,6 +26,7 @@ namespace db0 , m_block_num(block_num) { assert(block_size % page_size == 0); + assert(m_address == m_header_size + m_first_page_num * m_page_size); } Page_IO::Page_IO(std::size_t header_size, CFile &file, std::uint32_t page_size) @@ -88,12 +89,14 @@ namespace db0 // allocate next block within the step m_address += m_block_size; m_first_page_num += m_block_capacity; + assert(m_address == m_header_size + m_first_page_num * m_page_size); m_page_count = 0; ++(*m_block_num); } else { // allocate the next step / block by appending it to the file m_address = std::max(this->tail(), m_tail_function()); m_first_page_num = getPageNum(m_address); + assert(m_address == m_header_size + m_first_page_num * m_page_size); m_page_count = 0; // initiate the next full step m_block_num = 0; @@ -113,7 +116,7 @@ namespace db0 } std::uint64_t Page_IO::getPageNum(std::uint64_t address) const { - return ((address - m_header_size) / m_block_size) * m_block_capacity; + return (address - m_header_size) / m_page_size; } std::uint64_t Page_IO::tail() const @@ -123,7 +126,7 @@ namespace db0 // reserve space up to end of the step return m_address + (m_step_size - *m_block_num) * m_block_size; } else { - // step not known, return end of current block + // step not known, return end of the current block return m_address + m_block_size; } } @@ -145,16 +148,68 @@ namespace db0 return { m_first_page_num + m_page_count, m_block_capacity - m_page_count }; } - std::uint64_t Page_IO::getEndPageNum() const + std::uint64_t Page_IO::getEndPageNum(bool *is_first_page_ptr) const { assert(m_access_type == AccessType::READ_WRITE); - return m_first_page_num + m_page_count; + if (is_first_page_ptr) { + // first page of the first block in the step + *is_first_page_ptr = (m_page_count == 0) && (m_block_num && *m_block_num == 0); + } + return m_first_page_num + m_page_count; } - Page_IO::Reader::Reader(const Page_IO &page_io, std::optional end_page_num) + Page_IO::StepIterator::StepIterator(const ExtSpace &ext_space) + : m_next_it(ext_space.tryBegin()) + { + if (m_next_it && !m_next_it->is_end()) { + m_current_page_num = (**m_next_it).m_storage_page_num; + m_current_rel_page_num = (**m_next_it).m_rel_page_num; + ++(*m_next_it); + } + } + + bool Page_IO::StepIterator::operator!() const { + return !m_next_it.get(); + } + + bool Page_IO::StepIterator::is_end() const { + return !m_current_page_num.has_value(); + } + + std::uint64_t Page_IO::StepIterator::operator*() const { + return *m_current_page_num; + } + + Page_IO::StepIterator &Page_IO::StepIterator::operator++() + { + if (m_next_it && !m_next_it->is_end()) { + m_current_page_num = (**m_next_it).m_storage_page_num; + m_current_rel_page_num = (**m_next_it).m_rel_page_num; + ++(*m_next_it); + } else { + m_current_page_num = std::nullopt; + m_current_rel_page_num = std::nullopt; + } + return *this; + } + + std::optional Page_IO::StepIterator::tryGetStepPages() const + { + if (m_next_it && !m_next_it->is_end()) { + // step size may not be larger the the distance between the 2 consecutive ext-space entries + // NOTE: the distance is measure between relative page numbers + return (**m_next_it).m_rel_page_num - *m_current_rel_page_num; + } + return std::nullopt; + } + + Page_IO::Reader::Reader(const Page_IO &page_io, const ExtSpace &ext_space, + std::optional end_page_num) : m_page_io(page_io) + , m_step_it(ext_space) , m_end_page_num(std::min(end_page_num.value_or(std::numeric_limits::max()), page_io.getEndPageNum())) - { + , m_current_page_num(getFirstPageNum(ext_space)) + { } std::uint32_t Page_IO::Reader::next(std::vector &buf, std::uint64_t &start_page_num, @@ -168,10 +223,34 @@ namespace db0 start_page_num = m_current_page_num; auto to_read = std::min(max_pages, m_end_page_num - m_current_page_num); + // align with the step size (if defined) + if (!!m_step_it) { + if (!m_step_it.is_end()) { + auto step_pages = m_step_it.tryGetStepPages(); + if (step_pages) { + auto step_end_page = *m_step_it + *step_pages; + to_read = std::min(to_read, step_end_page - m_current_page_num); + } + } + } + if (to_read > 0) { m_page_io.read(m_current_page_num, buf.data(), static_cast(to_read)); m_current_page_num += to_read; - return static_cast(to_read); + // move on to the next step if end of the current step reached + if (!!m_step_it) { + auto step_pages = m_step_it.tryGetStepPages(); + if (step_pages) { + auto step_end_page = *m_step_it + *step_pages; + if (m_current_page_num >= step_end_page) { + ++m_step_it; + if (!m_step_it.is_end()) { + // position at the beginning of the next step + m_current_page_num = *m_step_it; + } + } + } + } } return to_read; } @@ -186,6 +265,17 @@ namespace db0 return (file_size - m_page_io.m_header_size) / m_page_io.m_page_size; } + std::uint64_t Page_IO::Reader::getFirstPageNum(const ExtSpace &ext_space) const + { + if (!!ext_space) { + auto it = ext_space.tryBegin(); + if (it && !it->is_end()) { + return (**it).m_storage_page_num; + } + } + return 0; + } + void Page_IO::moveBy(std::uint32_t page_count) { if (!m_block_num) { @@ -212,6 +302,7 @@ namespace db0 // set new position variables (might be end of the block / step) m_first_page_num += page_diff; m_address += page_diff * m_page_size; + assert(m_address == m_header_size + m_first_page_num * m_page_size); m_block_num = new_block_num; m_page_count = page_count; } diff --git a/src/dbzero/core/storage/Page_IO.hpp b/src/dbzero/core/storage/Page_IO.hpp index 15715880..e53adc2c 100644 --- a/src/dbzero/core/storage/Page_IO.hpp +++ b/src/dbzero/core/storage/Page_IO.hpp @@ -4,6 +4,7 @@ #pragma once #include "CFile.hpp" +#include "ExtSpace.hpp" #include namespace db0 @@ -22,7 +23,7 @@ namespace db0 // @param file the underlying file object // @param page_size the size of a single page in bytes // @param block_size size of a unit block of pages to be pre-allocated by the stream - // @param address of the currently active block + // @param address of the currently active block (for append) // @param page_count the number of pages already stored in the current block // @param step_size number of blocks per single indivisible step (for REL_Index mapping) // @param tail_function a function returning current (unflushed) size of the file (Page IO excluded) @@ -63,7 +64,7 @@ namespace db0 // Get the page number which is > all pages currently stored // This value can act as a "sentinel" for end-of-stream (at the moment of the call) // NOTE: the member is only available in read/write mode - std::uint64_t getEndPageNum() const; + std::uint64_t getEndPageNum(bool *is_first_page = nullptr) const; // Get the next page number to be assigned by the "append" method (first) // and the number of consecutive pages available in the current block @@ -72,29 +73,64 @@ namespace db0 // Get the number of pages remaining in the current step (for append) std::uint32_t getCurrentStepRemainingPages() const; - std::uint32_t getStepSize() const { + // @return step size in number of blocks + std::size_t getStepSize() const { return m_step_size; } + + // @return block size in bytes + std::size_t getBlockSize() const { + return m_block_size; + } + + class StepIterator + { + public: + StepIterator(const ExtSpace &); + + bool operator!() const; + + bool is_end() const; + // @retrun storage page number of the current step + std::uint64_t operator*() const; + + StepIterator &operator++(); + std::optional tryGetStepPages() const; + + private: + std::optional m_current_page_num; + std::optional m_current_rel_page_num; + // next step's iterator (may be end) + std::unique_ptr m_next_it; + }; // Reads entire blocks / steps sequentially // until reaching the end_page_num or end-of-stream whichever comes first class Reader { public: - Reader(const Page_IO &page_io, std::optional end_page_num = {}); + // @param ext_space optional ExtSpace for locating data "steps" and + // for translating into relative page numbers + Reader(const Page_IO &page_io, const ExtSpace &ext_space, + std::optional end_page_num = {}); // Reads up to max_bytes of data + // @param start_page_num the first storage page number read in this call // @return number of pages read, 0 if end-of-stream reached std::uint32_t next(std::vector &, std::uint64_t &start_page_num, std::size_t max_bytes = 64u << 20); private: const Page_IO &m_page_io; + StepIterator m_step_it; std::uint64_t m_end_page_num; + // current storage page number std::uint64_t m_current_page_num = 0; // Calculate end page number from actual file size std::uint64_t endPageNum() const; + // First storage page number to read from + std::uint64_t getFirstPageNum(const ExtSpace &) const; }; protected: diff --git a/src/dbzero/core/storage/REL_Index.cpp b/src/dbzero/core/storage/REL_Index.cpp index 4b302c34..989f054f 100644 --- a/src/dbzero/core/storage/REL_Index.cpp +++ b/src/dbzero/core/storage/REL_Index.cpp @@ -15,6 +15,14 @@ namespace db0 return ss.str(); } + bool REL_ItemCompT::operator()(const REL_Item &item, REL_StoragePageNum storage_page_num) const { + return item.m_storage_page_num < storage_page_num.m_value; + } + + bool REL_ItemCompT::operator()(REL_StoragePageNum storage_page_num, const REL_Item &item) const { + return storage_page_num.m_value < item.m_storage_page_num; + } + bool REL_ItemCompT::operator()(const REL_Item &lhs, const REL_Item &rhs) const { return lhs.m_rel_page_num < rhs.m_rel_page_num; } @@ -56,7 +64,15 @@ namespace db0 bool REL_CompressedItemEqualT::operator()(const REL_CompressedItem &lhs, const REL_CompressedItem &rhs) const { return lhs.m_compressed_rel_page_num == rhs.m_compressed_rel_page_num; } - + + bool REL_CompressedItemCompT::operator()(const REL_CompressedItem &item, REL_StoragePageNum storage_page_num) const { + return item.m_storage_page_num < storage_page_num.m_value; + } + + bool REL_CompressedItemCompT::operator()(REL_StoragePageNum storage_page_num, const REL_CompressedItem &item) const { + return storage_page_num.m_value < item.m_storage_page_num; + } + REL_CompressedItem::REL_CompressedItem(std::uint32_t first_rel_page_num, const REL_Item &item) : m_storage_page_num(item.m_storage_page_num) , m_flags(item.m_flags) @@ -124,6 +140,14 @@ namespace db0 return m_first_page_num == (rel_page_num >> 32); } + bool REL_IndexTypes::BlockHeader::canFit(REL_StoragePageNum) const { + return true; + } + + REL_StoragePageNum REL_IndexTypes::BlockHeader::compress(REL_StoragePageNum storage_page_num) const { + return storage_page_num; + } + std::string REL_IndexTypes::BlockHeader::toString(const CompressedItemT &item) const { auto full_item = uncompress(item); @@ -163,20 +187,20 @@ namespace db0 } std::uint64_t REL_Index::assignRelative(std::uint64_t storage_page_num, bool is_first_in_step) - { - if (is_first_in_step) { + { + if (is_first_in_step) { super_t::insert({ ++m_max_rel_page_num, storage_page_num }); assert(storage_page_num > m_last_storage_page_num); m_last_storage_page_num = storage_page_num; m_rel_page_num = m_max_rel_page_num; } - + assert(storage_page_num >= m_last_storage_page_num); auto result = m_rel_page_num + (storage_page_num - m_last_storage_page_num); if (result > m_max_rel_page_num) { m_max_rel_page_num = result; } - + return result; } @@ -217,10 +241,26 @@ namespace db0 return result->m_storage_page_num + (rel_page_num - result->m_rel_page_num); } + std::uint64_t REL_Index::getRelative(std::uint64_t storage_page_num) const + { + // Query using an alternative comparator + // - by storage page num only (which is stored preserving the same order as relative page num) + auto result = super_t::lower_equal_bound(REL_StoragePageNum { storage_page_num}); + if (!result) { + THROWF(db0::InternalException) << "REL_Index: page lookup failed on: " << storage_page_num; + } + // translate to relative page number + return result->m_rel_page_num + (storage_page_num - result->m_storage_page_num); + } + std::uint64_t REL_Index::size() const { return super_t::size(); } + REL_Index::const_iterator REL_Index::cbegin() const { + return super_t::cbegin(); + } + } namespace std diff --git a/src/dbzero/core/storage/REL_Index.hpp b/src/dbzero/core/storage/REL_Index.hpp index f9d701ae..732aa326 100644 --- a/src/dbzero/core/storage/REL_Index.hpp +++ b/src/dbzero/core/storage/REL_Index.hpp @@ -25,11 +25,21 @@ namespace db0 using REL_Flags = FlagSet; + // Type to enable comparing by storage page number only + struct REL_StoragePageNum + { + std::uint64_t m_value; + }; + struct REL_ItemCompT { bool operator()(const REL_Item &lhs, const REL_Item &rhs) const; bool operator()(const REL_Item &lhs, std::uint64_t rhs) const; bool operator()(std::uint64_t lhs, const REL_Item &rhs) const; + + // Comparison by storage page number only + bool operator()(const REL_Item &, REL_StoragePageNum) const; + bool operator()(REL_StoragePageNum, const REL_Item &) const; }; struct REL_ItemEqualT @@ -68,6 +78,9 @@ DB0_PACKED_END struct REL_CompressedItemCompT { bool operator()(const REL_CompressedItem &, const REL_CompressedItem &) const; + // compare by absolute storage page number + bool operator()(const REL_CompressedItem &, REL_StoragePageNum) const; + bool operator()(REL_StoragePageNum, const REL_CompressedItem &) const; }; struct REL_CompressedItemEqualT @@ -75,19 +88,27 @@ DB0_PACKED_END bool operator()(const REL_CompressedItem &, const REL_CompressedItem &) const; }; + // Alternative comparators, by the absolute storage page number + struct REL_CompressedItemAltCompT + { + bool operator()(const REL_CompressedItem &, const REL_CompressedItem &) const; + }; + // Compressed items are actual in-memory representation DB0_PACKED_BEGIN struct DB0_PACKED_ATTR REL_CompressedItem { using CompT = REL_CompressedItemCompT; using EqualT = REL_CompressedItemEqualT; + + REL_CompressedItem() = default; // construct REL-compressed item relative to the specific page number - i.e. first_page_num REL_CompressedItem(std::uint32_t first_rel_page_num, const REL_Item &); REL_CompressedItem(std::uint32_t first_rel_page_num, std::uint64_t rel_page_num, std::uint64_t storage_page_num, REL_Flags flags = {}); - std::uint32_t m_compressed_rel_page_num; - std::uint64_t m_storage_page_num; + std::uint32_t m_compressed_rel_page_num = 0; + std::uint64_t m_storage_page_num = 0; REL_Flags m_flags; // uncompress relative to a specific page number @@ -132,12 +153,16 @@ DB0_PACKED_END std::uint64_t getRelPageNum(const CompressedItemT &) const; bool canFit(const ItemT &) const; - bool canFit(std::uint64_t rel_page_num) const; + bool canFit(std::uint64_t rel_page_num) const; std::string toString(const CompressedItemT &) const; std::string toString() const; - }; + // members added for type compatibility + bool canFit(REL_StoragePageNum) const; + REL_StoragePageNum compress(REL_StoragePageNum) const; + }; + // DRAM space deployed REL-index (in-memory) using IndexT = SGB_CompressedLookupTree< REL_Item, REL_CompressedItem, BlockHeader, @@ -146,6 +171,7 @@ DB0_PACKED_END using ConstNodeIterator = typename IndexT::sg_tree_const_iterator; using ConstItemIterator = typename IndexT::ConstItemIterator; + using const_iterator = typename IndexT::uncompressed_const_iterator; }; // REL_Index holds a complete mapping from relative to absolute Page IO addresses @@ -155,6 +181,7 @@ DB0_PACKED_END { public: using super_t = REL_IndexTypes::IndexT; + using const_iterator = REL_IndexTypes::const_iterator; // as null REL_Index() = default; @@ -168,6 +195,8 @@ DB0_PACKED_END // Retrieve storage (absolute) page num for a given relative page num std::uint64_t getAbsolute(std::uint64_t rel_page_num) const; + // Retrieve relative page num for a given storage (absolute) page num + std::uint64_t getRelative(std::uint64_t storage_page_num) const; db0::Address getAddress() const; @@ -182,6 +211,8 @@ DB0_PACKED_END void refresh(); std::uint64_t size() const; + + const_iterator cbegin() const; private: // values maintained in-sync with the tree diff --git a/src/dbzero/core/storage/SparseIndexBase.hpp b/src/dbzero/core/storage/SparseIndexBase.hpp index 23abd946..0b8a3bd4 100644 --- a/src/dbzero/core/storage/SparseIndexBase.hpp +++ b/src/dbzero/core/storage/SparseIndexBase.hpp @@ -89,7 +89,7 @@ namespace db0 /** * Get next storage page number expected to be assigned */ - PageNumT getNextStoragePageNum() const; + std::optional getNextStoragePageNum() const; /** * Get the maximum used state number @@ -104,7 +104,9 @@ namespace db0 void forAll(std::function callback) const { m_index.forAll(callback); } - + + bool empty() const; + // Get the total number of data page descriptors stored in the index std::size_t size() const; @@ -361,11 +363,15 @@ DB0_PACKED_END } template - typename SparseIndexBase::PageNumT - SparseIndexBase::getNextStoragePageNum() const { + std::optional::PageNumT> + SparseIndexBase::getNextStoragePageNum() const + { + if (this->empty() ) { + return std::nullopt; + } return m_next_page_num; } - + template typename SparseIndexBase::StateNumT SparseIndexBase::getMaxStateNum() const { @@ -393,6 +399,11 @@ DB0_PACKED_END return _str.str(); } + template + bool SparseIndexBase::empty() const { + return m_index.empty(); + } + template std::size_t SparseIndexBase::size() const { return m_index.size(); diff --git a/src/dbzero/core/storage/SparsePair.cpp b/src/dbzero/core/storage/SparsePair.cpp index 6ef242e7..05dfff06 100644 --- a/src/dbzero/core/storage/SparsePair.cpp +++ b/src/dbzero/core/storage/SparsePair.cpp @@ -2,6 +2,7 @@ // Copyright (c) 2025 DBZero Software sp. z o.o. #include "SparsePair.hpp" +#include namespace db0 @@ -31,8 +32,8 @@ namespace db0 { } - typename SparsePair::PageNumT SparsePair::getNextStoragePageNum() const { - return std::max(m_sparse_index.getNextStoragePageNum(), m_diff_index.getNextStoragePageNum()); + std::optional SparsePair::getNextStoragePageNum() const { + return optional_max(m_sparse_index.getNextStoragePageNum(), m_diff_index.getNextStoragePageNum()); } typename SparsePair::StateNumT SparsePair::getMaxStateNum() const { @@ -49,6 +50,10 @@ namespace db0 return m_sparse_index.size() + m_diff_index.size(); } + bool SparsePair::empty() const { + return m_sparse_index.empty() && m_diff_index.empty(); + } + const SparsePair::DP_ChangeLogT &SparsePair::extractChangeLog(DP_ChangeLogStreamT &changelog_io, std::uint64_t end_storage_page_num) { diff --git a/src/dbzero/core/storage/SparsePair.hpp b/src/dbzero/core/storage/SparsePair.hpp index 3942d33f..d7a4d0b1 100644 --- a/src/dbzero/core/storage/SparsePair.hpp +++ b/src/dbzero/core/storage/SparsePair.hpp @@ -47,11 +47,12 @@ namespace db0 } // combine from both underlyig indexes - PageNumT getNextStoragePageNum() const; + std::optional getNextStoragePageNum() const; // combine from both underlyig indexes StateNumT getMaxStateNum() const; + bool empty() const; std::size_t size() const; void refresh(); diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index 385d11f4..35d5c45e 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -57,27 +57,59 @@ namespace db0 return o_change_log_t::__const_ref(last_chunk_buf.data()); } - void copyPageIO(const Page_IO &in, Page_IO &out, std::uint64_t end_page_num, ExtSpace &ext_space) + // Debug & validation function - to compare pages of the 2 streams (e.g. source and copy) + // NOTE: both streams may store under different absolute page numbers but same relative + // @param rel_page_num relative page number in the ExtSpace + bool comparePages(const Page_IO &first, const ExtSpace &first_ext_space, const Page_IO &second, + const ExtSpace &second_ext_space, std::uint64_t rel_page_num) + { + if (first.getPageSize() != second.getPageSize()) { + THROWF(db0::IOException) << "comparePages: page size mismatch between input streams"; + } + auto page_size = first.getPageSize(); + auto page_num_1 = rel_page_num; + if (!!first_ext_space) { + page_num_1 = first_ext_space.getAbsolute(rel_page_num); + assert(rel_page_num == first_ext_space.getRelative(page_num_1)); + } + auto page_num_2 = rel_page_num; + if (!!second_ext_space) { + page_num_2 = second_ext_space.getAbsolute(rel_page_num); + assert(rel_page_num == second_ext_space.getRelative(page_num_2)); + } + std::vector buf_1(page_size); + first.read(page_num_1, buf_1.data()); + std::vector buf_2(page_size); + second.read(page_num_2, buf_2.data()); + return memcmp(buf_1.data(), buf_2.data(), page_size) == 0; + } + + void copyPageIO(const Page_IO &in, const ExtSpace &src_ext_space, Page_IO &out, + std::uint64_t end_page_num, ExtSpace &ext_space) { std::size_t page_size = in.getPageSize(); if (page_size != out.getPageSize()) { THROWF(db0::IOException) << "copyPageIO: page size mismatch between input and output streams"; } - - Page_IO::Reader reader(in, end_page_num); + + Page_IO::Reader reader(in, src_ext_space, end_page_num); std::vector buffer; std::uint64_t start_page_num = 0; while (auto page_count = reader.next(buffer, start_page_num)) { auto buf_ptr = buffer.data(); + if (!!src_ext_space) { + // translate to relative page number + start_page_num = src_ext_space.getRelative(start_page_num); + } while (page_count > 0) { // page number (absolute) in the output stream auto storage_page_num = out.getNextPageNum().first; auto count = std::min(page_count, out.getCurrentStepRemainingPages()); - // append as many pages as possible in current "step" + // append as many pages as possible in the current "step" out.append(buf_ptr, count); buf_ptr += page_size * count; // note start_page_num must be registered as relative to storage_page_num - // note each step might require is own mapping (unless stored as consecutive pages) + // note each step might require its own mapping (unless stored as consecutive pages) // the de-duplication logic is handled by ExtSpace ext_space.addMapping(storage_page_num, start_page_num); page_count -= count; diff --git a/src/dbzero/core/storage/copy_prefix.hpp b/src/dbzero/core/storage/copy_prefix.hpp index 72402733..8c0dd8f3 100644 --- a/src/dbzero/core/storage/copy_prefix.hpp +++ b/src/dbzero/core/storage/copy_prefix.hpp @@ -35,11 +35,13 @@ namespace db0 // Copy raw contents of a specific Page_IO up to a specific storage page number // @param in the input (source) Page_IO (must NOT define ext-space - i.e. absolute / relative mapping) + // @param src_ext_space the source ExtSpace (to retrieve relative mappings if any) // @param out the output Page_IO // @param end_page_num the storage page number (not to be exceeded on copy) // @param ext_space the ExtSpace to assign new relative page numbers on copy // NOTE: after copy the source "absolute" page numbers will be corresponding do destination's relative page numbers // therefore we have no need to translate the source DRAM_IO - void copyPageIO(const Page_IO &in, Page_IO &out, std::uint64_t end_page_num, ExtSpace &ext_space); + void copyPageIO(const Page_IO &in, const ExtSpace &src_ext_space, Page_IO &out, + std::uint64_t end_page_num, ExtSpace &ext_space); } \ No newline at end of file diff --git a/tests/unit_tests/REL_IndexTest.cpp b/tests/unit_tests/REL_IndexTest.cpp new file mode 100644 index 00000000..1191cbb6 --- /dev/null +++ b/tests/unit_tests/REL_IndexTest.cpp @@ -0,0 +1,123 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (c) 2025 DBZero Software sp. z o.o. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace db0; +using namespace db0::tests; + +namespace tests + +{ + + class REL_IndexTest: public testing::Test + { + }; + + TEST_F( REL_IndexTest , testREL_IndexGetAbsolute ) + { + auto node_size = 16u << 10; + auto memspace = DRAMSpace::create(node_size); + REL_Index cut(memspace, 16u << 10, AccessType::READ_WRITE); + std::vector items { + // relative page number, absolute page number + { 0, 100 }, { 50, 200 }, { 100, 300 }, { 150, 400 }, { 200, 500 } + }; + + for (auto &item: items) { + cut.addMapping(item.m_storage_page_num, item.m_rel_page_num); + } + + // relative -> absolute queries + std::vector > queries { + { 13, 113 }, { 50, 200 }, { 75, 225 }, { 100, 300 }, { 125, 325 }, { 150, 400 }, + { 175, 425 }, { 200, 500 }, { 0, 100 } + }; + + for (auto &query: queries) { + auto abs_page_num = cut.getAbsolute(query.first); + ASSERT_EQ(abs_page_num, query.second) + << "Relative page num: " << query.first; + } + } + + TEST_F( REL_IndexTest , testREL_IndexGetRelative ) + { + auto node_size = 16u << 10; + auto memspace = DRAMSpace::create(node_size); + REL_Index cut(memspace, 16u << 10, AccessType::READ_WRITE); + std::vector items { + // relative page number, absolute page number + { 0, 100 }, { 50, 200 }, { 100, 300 }, { 150, 400 }, { 200, 500 } + }; + + for (auto &item: items) { + cut.addMapping(item.m_storage_page_num, item.m_rel_page_num); + } + + // absolute -> relative queries + std::vector > queries { + { 113, 13 }, { 200, 50 }, { 225, 75 }, { 300, 100 }, { 325, 125 }, { 400, 150 }, + { 425, 175 }, { 500, 200 }, { 100, 0 } + }; + + for (auto &query: queries) { + auto rel_page_num = cut.getRelative(query.first); + ASSERT_EQ(rel_page_num, query.second) + << "Absolute page num: " << query.first; + } + } + + TEST_F( REL_IndexTest , testREL_IndexSortedIteration ) + { + auto node_size = 16u << 10; + auto memspace = DRAMSpace::create(node_size); + REL_Index cut(memspace, 16u << 10, AccessType::READ_WRITE); + std::vector items { + // relative page number, absolute page number + { 0, 100 }, { 50, 200 }, { 60, 210 }, { 100, 300 }, { 150, 400 }, { 160, 410 }, { 200, 500 } + }; + + for (auto &item: items) { + cut.addMapping(item.m_storage_page_num, item.m_rel_page_num); + } + + std::vector rel_page_nums; + auto it = cut.cbegin(); + while (!it.is_end()) { + rel_page_nums.push_back((*it).m_rel_page_num); + ++it; + } + } + + TEST_F( REL_IndexTest , testREL_IndexIteratorIssue1 ) + { + auto node_size = 16u << 10; + auto memspace = DRAMSpace::create(node_size); + REL_Index cut(memspace, 16u << 10, AccessType::READ_WRITE); + cut.addMapping(32, 0); + cut.addMapping(64, 14); + cut.assignRelative(128, true); + cut.assignRelative(144, true); + + std::vector rel_page_nums; + auto it = cut.cbegin(); + while (!it.is_end()) { + rel_page_nums.push_back((*it).m_storage_page_num); + ++it; + } + ASSERT_EQ(rel_page_nums, (std::vector{32, 64, 128, 144})); + } + +}