From 95185f524fa9eeb077e0e6e73c6d89f85bf34d4f Mon Sep 17 00:00:00 2001 From: Wojtek Date: Tue, 9 Dec 2025 19:02:37 +0100 Subject: [PATCH 1/3] WIP: save work --- python_tests/test_copy_prefix.py | 28 ++++--- python_tests/test_issues_14.py | 78 +++++++++++++++++++ src/dbzero/core/memory/config.cpp | 5 ++ src/dbzero/core/memory/config.hpp | 3 + src/dbzero/core/storage/BDevStorage.cpp | 11 +++ src/dbzero/core/storage/BlockIOStream.hpp | 6 +- src/dbzero/core/storage/ChangeLogIOStream.cpp | 20 ++++- src/dbzero/core/storage/ChangeLogIOStream.hpp | 4 +- src/dbzero/core/storage/Diff_IO.cpp | 25 ++---- src/dbzero/core/storage/Diff_IO.hpp | 2 - src/dbzero/core/storage/ExtSpace.hpp | 6 +- src/dbzero/core/storage/MetaIOStream.cpp | 13 ++-- src/dbzero/core/storage/REL_Index.cpp | 33 ++++---- src/dbzero/core/storage/REL_Index.hpp | 8 +- src/dbzero/core/storage/copy_prefix.cpp | 2 +- 15 files changed, 177 insertions(+), 67 deletions(-) create mode 100644 python_tests/test_issues_14.py diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index 82bec855..977bb502 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -100,14 +100,13 @@ def writer_process(prefix, obj_count = 50, commit_count = 50, long_run = False): root.value.append(MemoTestClass("b" * 1024)) # 1 KB string db0.commit() if long_run: - print(f"Writer process: committed {i * obj_count} objects") + print(f"Writer process: committed {(i + 1) * obj_count} objects", flush=True) else: time.sleep(0.1) if long_run: - print(db0.get_storage_stats()) - db0.commit() - db0.close() + print(db0.get_storage_stats(), flush=True) + db0.close() def test_copy_prefix_being_actively_modified(db0_fixture): @@ -194,13 +193,15 @@ def test_copy_prefix_without_opening_it(db0_fixture): @pytest.mark.stress_test +# FIXME: log +@pytest.mark.parametrize("db0_fixture", [{"autocommit": False}], indirect=True) def test_copy_prefix_continuous_process(db0_fixture): px_name = db0.get_current_prefix().name px_path = os.path.join(DB0_DIR, px_name + ".db0") def validate_current_prefix(expected_len = None, expected_min_len = None): root = db0.fetch(MemoTestSingleton) - assert not expected_min_len or len(root.value) > expected_min_len + assert not expected_min_len or len(root.value) >= expected_min_len assert not expected_len or len(root.value) == expected_len for item in root.value: assert item.value == "b" * 1024 @@ -223,6 +224,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): # in each 'epoch' we modify prefix while making copies # then drop the original prefix and restore if from the last copy epoch_count = 2 + total_len = 0 for epoch in range(epoch_count): print(f"=== Epoch {epoch} ===") obj_count = 5000 @@ -253,7 +255,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): if os.path.exists(file_name): os.remove(file_name) # copy prefix without opening it, use default step size - print("--- Copying prefix iteration", copy_id) + print("--- Copying prefix iteration", copy_id) db0.copy_prefix(file_name, prefix=px_name) print("--- copy finished") copy_id += 1 @@ -262,11 +264,11 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): time.sleep(2.5) # wait a bit before next copy p.join() + total_len += obj_count * commit_count # validate original prefix (no copy yet) - print("Validating final prefix ...") - db0.open(px_name, "r") - validate_current_prefix(expected_len = obj_count * commit_count) + # print("Validating final prefix ...", flush=True) + # validate_current_prefix(expected_len = total_len) # make final stale copy (i.e. without active modifications) final_copy = f"./test-copy-final.db0" @@ -276,10 +278,12 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): db0.close() print("Validating all copies") - validate_copy("final", expected_len = obj_count * commit_count) + validate_copy("final", expected_len = total_len) for i in range(copy_id): last_len = validate_copy(i, expected_min_len = last_len) print(f"--- Copy {i} valid with {last_len} objects") + # this is the restored version + total_len = last_len # now, continue modifications starting from the last restored copy (making new copies) @@ -364,9 +368,9 @@ def validate(expected_len): os.rename(file_name, px_path) # open recovered prefix for update - db0.init(DB0_DIR, prefix=px_name, read_write=True) + db0.init(DB0_DIR, prefix=px_name, read_write=True) total_len += modify_prefix(1350) - + db0.close() db0.init(DB0_DIR, prefix=px_name, read_write=True) validate(total_len) diff --git a/python_tests/test_issues_14.py b/python_tests/test_issues_14.py new file mode 100644 index 00000000..1a4bde08 --- /dev/null +++ b/python_tests/test_issues_14.py @@ -0,0 +1,78 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (c) 2025 DBZero Software sp. z o.o. + +import dbzero as db0 +import pytest +from .conftest import DB0_DIR +from .memo_test_types import MemoTestSingleton, MemoTestClass +import multiprocessing +import os +import time + + +def writer_process(prefix, obj_count = 50, commit_count = 50): + db0.init(DB0_DIR) + db0.open(prefix, "rw") + # create new or open an existing root object + root = MemoTestSingleton([]) + if (len(root.value) > 0): + print(f"Writer process: opened existing prefix with {len(root.value)} objects") + for i in range(commit_count): + for _ in range(obj_count): + root.value.append(MemoTestClass("b" * 1024)) # 1 KB string + db0.commit() + + db0.commit() + db0.close() + + +@pytest.mark.parametrize("db0_fixture", [{"autocommit": False}], indirect=True) +def test_copy_prefix_issue1(db0_fixture): + """ + Issue: test failing with RuntimeError: Diff block not found + """ + px_name = db0.get_current_prefix().name + px_path = os.path.join(DB0_DIR, px_name + ".db0") + + def validate_current_prefix(expected_len = None, expected_min_len = None): + root = db0.fetch(MemoTestSingleton) + assert not expected_min_len or len(root.value) > expected_min_len + assert not expected_len or len(root.value) == expected_len + for item in root.value: + assert item.value == "b" * 1024 + return len(root.value) + + def validate_copy(copy_id, expected_len = None, expected_min_len = None): + file_name = f"./test-copy-{copy_id}.db0" + os.remove(px_path) + # restore the copy + os.rename(file_name, px_path) + + db0.init(DB0_DIR, prefix=px_name, read_write=False) + result = validate_current_prefix(expected_len, expected_min_len) + db0.close() + return result + + db0.close() + + # in each 'epoch' we modify prefix while making copies + # then drop the original prefix and restore if from the last copy + epoch_count = 2 + total_len = 0 + for _ in range(epoch_count): + obj_count = 30 + commit_count = 3 + writer_process(px_name, obj_count, commit_count) + total_len += obj_count * commit_count + + db0.init(DB0_DIR) + db0.open(px_name, "r") + + # make final stale copy (i.e. without active modifications) + final_copy = f"./test-copy-final.db0" + if os.path.exists(final_copy): + os.remove(final_copy) + db0.copy_prefix(final_copy, prefix=px_name) + db0.close() + + validate_copy("final", expected_len = total_len) diff --git a/src/dbzero/core/memory/config.cpp b/src/dbzero/core/memory/config.cpp index 1bb14888..24bfacaa 100644 --- a/src/dbzero/core/memory/config.cpp +++ b/src/dbzero/core/memory/config.cpp @@ -2,6 +2,7 @@ // Copyright (c) 2025 DBZero Software sp. z o.o. #include "config.hpp" +#include namespace db0 @@ -12,4 +13,8 @@ namespace db0 bool Settings::__storage_validation = false; #endif + std::function Settings::m_decode_error = []() { + THROWF(db0::IOException) << "Data decoding error: corrupt data detected"; + }; + } diff --git a/src/dbzero/core/memory/config.hpp b/src/dbzero/core/memory/config.hpp index 14827b77..52e7d942 100644 --- a/src/dbzero/core/memory/config.hpp +++ b/src/dbzero/core/memory/config.hpp @@ -5,6 +5,7 @@ #include #include +#include namespace db0 @@ -27,6 +28,8 @@ namespace db0 // performs storage full read / write validation (with in-memory mirroring) static bool __storage_validation; #endif + // Function to throw the data decoding error (i.e. corrupt data detected) + static std::function m_decode_error; }; } diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index 5ee0ad9a..45aac5ac 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -78,6 +78,17 @@ namespace db0 if (m_access_type == AccessType::READ_ONLY && !m_flags.test(StorageOptions::NO_LOAD)) { refresh(); } + + // Validate state consistency + // The state number reported by DRAM IO must match the one in the DP changelog IO + if (auto chunk_ptr = m_dp_changelog_io.getLastChangeLogChunk()) { + auto dp_state_num = chunk_ptr->m_state_num; + auto dram_state_num = m_sparse_pair.getMaxStateNum(); + if (dram_state_num != dp_state_num) { + THROWF(db0::IOException) << "Inconsistent state: DRAM IO max state number " << dram_state_num + << " does not match DP changelog last state number " << dp_state_num; + } + } } BDevStorage::~BDevStorage() diff --git a/src/dbzero/core/storage/BlockIOStream.hpp b/src/dbzero/core/storage/BlockIOStream.hpp index 9a4092ee..ed6fbe73 100644 --- a/src/dbzero/core/storage/BlockIOStream.hpp +++ b/src/dbzero/core/storage/BlockIOStream.hpp @@ -96,7 +96,7 @@ DB0_PACKED_END BlockIOStream(const BlockIOStream &) = delete; - ~BlockIOStream(); + virtual ~BlockIOStream(); /** * Add a new chunk with a specific header @@ -123,6 +123,10 @@ DB0_PACKED_END */ std::size_t readChunk(std::vector &buffer, std::size_t expected_size = 0, std::uint64_t *address = nullptr); + // Reach the next chunk into the internal buffer (where available) + // The default implementation throws + virtual std::size_t readChunk(); + /** * Refresh method re-reads the tail block from disk. * It needs to be called to be able to retrieve file modifications done by other process/instance diff --git a/src/dbzero/core/storage/ChangeLogIOStream.cpp b/src/dbzero/core/storage/ChangeLogIOStream.cpp index 363f96a7..9317819b 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.cpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.cpp @@ -25,14 +25,28 @@ namespace db0 template const o_change_log_t *ChangeLogIOStream::readChangeLogChunk(std::vector &buffer) { - if (BlockIOStream::readChunk(buffer)) { - m_last_change_log_ptr = &o_change_log_t::__const_ref(buffer.data()); + if (this->readChunk(buffer)) { return m_last_change_log_ptr; } else { return nullptr; } } - + + template + std::size_t ChangeLogIOStream::readChunk(std::vector &buffer, std::size_t expected_size, + std::uint64_t *address) + { + auto result = BlockIOStream::readChunk(buffer, expected_size, address); + if (result) { + // reference with bounds validation + const_bounded_buf_t const_buf(Settings::m_decode_error, reinterpret_cast(buffer.data()), + reinterpret_cast(buffer.data() + buffer.size()) + ); + m_last_change_log_ptr = &o_change_log_t::__safe_const_ref(const_buf); + } + return result; + } + template const o_change_log_t *ChangeLogIOStream::readChangeLogChunk() { return readChangeLogChunk(m_buffer); diff --git a/src/dbzero/core/storage/ChangeLogIOStream.hpp b/src/dbzero/core/storage/ChangeLogIOStream.hpp index 6b3fc5b2..e8cf5416 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.hpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.hpp @@ -46,7 +46,9 @@ namespace db0 const o_change_log_t *readChangeLogChunk(); // Read chunk, bring your own buffer - const o_change_log_t *readChangeLogChunk(std::vector &buffer); + // @return pointer to the change-log chunk or nullptr if EOF (from an internal buffer) + const o_change_log_t *readChangeLogChunk(std::vector &buffer); + std::size_t readChunk() override; /** * Get last read or written change log chunk diff --git a/src/dbzero/core/storage/Diff_IO.cpp b/src/dbzero/core/storage/Diff_IO.cpp index ab73dab5..8ac92867 100644 --- a/src/dbzero/core/storage/Diff_IO.cpp +++ b/src/dbzero/core/storage/Diff_IO.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace db0 @@ -65,8 +66,7 @@ DB0_PACKED_END { public: // buffer is 2 pages long - DiffReader(Page_IO &, std::uint64_t page_num, std::byte *begin, std::byte *end, - const std::function &decode_fault); + DiffReader(Page_IO &, std::uint64_t page_num, std::byte *begin, std::byte *end); // appy diffs from a specific page / state number into a provided data buffer // if underflow occurs then next page needs to be fetched and apply repeated @@ -84,8 +84,7 @@ DB0_PACKED_END const std::byte *m_current; std::byte const *m_end; // the number of objects remaining to be read - unsigned int m_size = 0; - const std::function &m_decode_fault; + unsigned int m_size = 0; }; DiffWriter::DiffWriter(Page_IO &page_io, std::byte *begin, std::byte *end) @@ -168,22 +167,20 @@ DB0_PACKED_END return m_header.m_size == 0 && m_header.m_offset == 0; } - DiffReader::DiffReader(Page_IO &page_io, std::uint64_t page_num, std::byte *begin, std::byte *end, - const std::function &decode_fault) + DiffReader::DiffReader(Page_IO &page_io, std::uint64_t page_num, std::byte *begin, std::byte *end) : m_page_io(page_io) , m_page_size(page_io.getPageSize()) , m_page_num(page_num) , m_begin(begin) , m_current(begin + m_page_size) - , m_end(end) - , m_decode_fault(decode_fault) + , m_end(end) { page_io.read(page_num, m_begin + m_page_size); m_size = o_diff_header::__const_ref(m_current).m_size; // position at the first diff block m_current += o_diff_header::sizeOf() + o_diff_header::__const_ref(m_current).m_offset; if (m_current > m_end) { - m_decode_fault(); + Settings::m_decode_error(); } } @@ -206,7 +203,7 @@ DB0_PACKED_END } auto &diff_buf = o_diff_buffer::__safe_const_ref( - const_bounded_buf_t(m_decode_fault, m_current, m_end) + const_bounded_buf_t(Settings::m_decode_error, m_current, m_end) ); diff_buf.apply(dp_data, dp_data + m_page_size); m_current += diff_buf_size; @@ -244,18 +241,12 @@ DB0_PACKED_END , m_writer(std::make_unique( reinterpret_cast(*this), m_write_buf.data(), m_write_buf.data() + m_write_buf.size()) ) - , m_decode_fault([]() { - THROWF(db0::IOException) << "Diff_IO: decode fault - corrupt diff data"; - }) { } Diff_IO::Diff_IO(std::size_t header_size, CFile &file, std::uint32_t page_size) : Page_IO(header_size, file, page_size) , m_read_buf(page_size * 2) - , m_decode_fault([]() { - THROWF(db0::IOException) << "Diff_IO: decode fault - corrupt diff data"; - }) { } @@ -310,7 +301,7 @@ DB0_PACKED_END { // must lock because the read-buffer is shared std::unique_lock lock(m_mx_read); - DiffReader reader((Page_IO&)*this, page_num, m_read_buf.data(), m_read_buf.data() + m_read_buf.size(), m_decode_fault); + DiffReader reader((Page_IO&)*this, page_num, m_read_buf.data(), m_read_buf.data() + m_read_buf.size()); for (;;) { bool underflow = false; if (reader.apply((std::byte*)buffer, page_and_state, underflow)) { diff --git a/src/dbzero/core/storage/Diff_IO.hpp b/src/dbzero/core/storage/Diff_IO.hpp index 73c97b60..ca658f73 100644 --- a/src/dbzero/core/storage/Diff_IO.hpp +++ b/src/dbzero/core/storage/Diff_IO.hpp @@ -67,8 +67,6 @@ namespace db0 std::size_t m_full_dp_bytes_written = 0; // total bytes written using the diff mechanism std::size_t m_diff_bytes_written = 0; - // function throwing an exception on decode fault (corrupt diff data) - std::function m_decode_fault; }; } \ No newline at end of file diff --git a/src/dbzero/core/storage/ExtSpace.hpp b/src/dbzero/core/storage/ExtSpace.hpp index 8732de7b..6266976c 100644 --- a/src/dbzero/core/storage/ExtSpace.hpp +++ b/src/dbzero/core/storage/ExtSpace.hpp @@ -75,12 +75,12 @@ DB0_PACKED_END assert(m_rel_index); return m_rel_index->getRelative(storage_page_num); } - + // Registers a new mapping rel_page_num -> storage_page_num // exception raised if unable to add the mapping - void addMapping(std::uint64_t storage_page_num, std::uint64_t rel_page_num) { + void addMapping(std::uint64_t storage_page_num, std::uint64_t rel_page_num, std::uint32_t count) { assert(m_rel_index); - m_rel_index->addMapping(storage_page_num, rel_page_num); + m_rel_index->addMapping(storage_page_num, rel_page_num, count); } // Begins the iterator over sorted elements (on condition that ExtSpace is valid) diff --git a/src/dbzero/core/storage/MetaIOStream.cpp b/src/dbzero/core/storage/MetaIOStream.cpp index 2aa9f868..9620e0d3 100644 --- a/src/dbzero/core/storage/MetaIOStream.cpp +++ b/src/dbzero/core/storage/MetaIOStream.cpp @@ -101,7 +101,7 @@ namespace db0 return m_last_meta_log; } - void MetaIOStream::setTailAll() + void MetaIOStream::setTailAll() { auto tail_log = tailMetaLog(); if (tail_log) { @@ -112,20 +112,17 @@ namespace db0 } m_managed_streams[i]->setStreamPos(it->m_address, it->m_stream_pos); // exhaust the stream after setting the position - std::vector tmp_buf; - // exhaust the stream - while (m_managed_streams[i]->readChunk(tmp_buf)); + while (m_managed_streams[i]->readChunk()); } } else { // no tail item available, in such case simple exhaust all streams - for (std::size_t i = 0; i < m_managed_streams.size(); ++i) { - std::vector tmp_buf; + for (std::size_t i = 0; i < m_managed_streams.size(); ++i) { // exhaust the stream - while (m_managed_streams[i]->readChunk(tmp_buf)); + while (m_managed_streams[i]->readChunk()); } } } - + const o_meta_log *MetaIOStream::lowerBound(StateNumType state_num, std::vector &buf) const { State state; diff --git a/src/dbzero/core/storage/REL_Index.cpp b/src/dbzero/core/storage/REL_Index.cpp index 782335a5..e1a135e4 100644 --- a/src/dbzero/core/storage/REL_Index.cpp +++ b/src/dbzero/core/storage/REL_Index.cpp @@ -162,7 +162,7 @@ namespace db0 REL_Index::REL_Index(mptr ptr, std::size_t node_capacity, AccessType access_type) : super_t(ptr, node_capacity, access_type) - , m_last_storage_page_num(this->treeHeader().m_last_storage_page_num) + , m_storage_page_num(this->treeHeader().m_storage_page_num) , m_rel_page_num(this->treeHeader().m_rel_page_num) , m_max_rel_page_num(this->treeHeader().m_max_rel_page_num) { @@ -180,7 +180,7 @@ namespace db0 { // flush locally cached value auto &self = const_cast(*this); - self.modifyTreeHeader().m_last_storage_page_num = m_last_storage_page_num; + self.modifyTreeHeader().m_storage_page_num = m_storage_page_num; self.modifyTreeHeader().m_rel_page_num = m_rel_page_num; self.modifyTreeHeader().m_max_rel_page_num = m_max_rel_page_num; super_t::commit(); @@ -188,16 +188,16 @@ namespace db0 std::uint64_t REL_Index::assignRelative(std::uint64_t storage_page_num, bool is_first_in_step) { - assert(storage_page_num >= m_last_storage_page_num); - // prevent adding duplicate mapping (e.g. might be called multiple times after appendDiff) - if (is_first_in_step && (storage_page_num != m_last_storage_page_num)) { + assert(storage_page_num >= m_storage_page_num); + // prevent adding a duplicate mapping (e.g. might be called multiple times after appendDiff) + if (is_first_in_step && (storage_page_num != m_storage_page_num)) { super_t::insert({ ++m_max_rel_page_num, storage_page_num }); - assert(storage_page_num > m_last_storage_page_num); - m_last_storage_page_num = storage_page_num; + assert(storage_page_num > m_storage_page_num); + m_storage_page_num = storage_page_num; m_rel_page_num = m_max_rel_page_num; } - auto result = m_rel_page_num + (storage_page_num - m_last_storage_page_num); + auto result = m_rel_page_num + (storage_page_num - m_storage_page_num); if (result > m_max_rel_page_num) { m_max_rel_page_num = result; } @@ -205,17 +205,17 @@ namespace db0 return result; } - void REL_Index::addMapping(std::uint64_t storage_page_num, std::uint64_t rel_page_num) + void REL_Index::addMapping(std::uint64_t storage_page_num, std::uint64_t rel_page_num, std::uint32_t count) { - assert(storage_page_num >= m_last_storage_page_num); - assert(rel_page_num >= m_rel_page_num); + assert(count > 0); + assert(storage_page_num >= m_storage_page_num); + assert(rel_page_num >= m_max_rel_page_num); - m_max_rel_page_num = rel_page_num; - m_last_storage_page_num = storage_page_num; + m_max_rel_page_num = rel_page_num + count - 1; if (!this->empty()) { // check if the mapping is already valid - if (storage_page_num - m_last_storage_page_num == rel_page_num - m_rel_page_num) { - // mapping already valid + if (storage_page_num - m_storage_page_num == rel_page_num - m_rel_page_num) { + // mapping already valid, no need for registering another entry return; } } @@ -223,12 +223,13 @@ namespace db0 // register the new mapping super_t::insert({ rel_page_num, storage_page_num }); m_rel_page_num = rel_page_num; + m_storage_page_num = storage_page_num; } void REL_Index::refresh() { detach(); - m_last_storage_page_num = this->treeHeader().m_last_storage_page_num; + m_storage_page_num = this->treeHeader().m_storage_page_num; m_rel_page_num = this->treeHeader().m_rel_page_num; m_max_rel_page_num = this->treeHeader().m_max_rel_page_num; } diff --git a/src/dbzero/core/storage/REL_Index.hpp b/src/dbzero/core/storage/REL_Index.hpp index b3e3d6e8..87e74acd 100644 --- a/src/dbzero/core/storage/REL_Index.hpp +++ b/src/dbzero/core/storage/REL_Index.hpp @@ -124,7 +124,7 @@ DB0_PACKED_END struct DB0_PACKED_ATTR o_rel_index_header: o_fixed_versioned { // the largest registered mapping from absolute page number - std::uint64_t m_last_storage_page_num = 0; + std::uint64_t m_storage_page_num = 0; // relative page number associated with the std::uint64_t m_rel_page_num = 0; // the maximum assigned relative page number @@ -191,6 +191,7 @@ DB0_PACKED_END // Assign (append) a mapping from an absolute to relative page number // NOTE: the mapping needs to be persisted for each "first_in_step" page + // This member is called for EACH newly written data page std::uint64_t assignRelative(std::uint64_t storage_page_num, bool is_first_in_step); // Retrieve storage (absolute) page num for a given relative page num @@ -203,7 +204,8 @@ DB0_PACKED_END // Registers a new mapping rel_page_num -> storage_page_num // exception raised if unable to add the mapping // the method is used by copy_prefix - void addMapping(std::uint64_t storage_page_num, std::uint64_t rel_page_num); + // @param count the number of consecutive pages mapped from rel_page_num + void addMapping(std::uint64_t storage_page_num, std::uint64_t rel_page_num, std::uint32_t count); void detach() const; void commit() const; @@ -216,7 +218,7 @@ DB0_PACKED_END private: // values maintained in-sync with the tree - std::uint64_t m_last_storage_page_num = 0; + std::uint64_t m_storage_page_num = 0; // key of the last inserted item std::uint64_t m_rel_page_num = 0; // key of the last inserted item std::uint64_t m_max_rel_page_num = 0; }; diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index 35d5c45e..1110d545 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -111,7 +111,7 @@ namespace db0 // note start_page_num must be registered as relative to storage_page_num // note each step might require its own mapping (unless stored as consecutive pages) // the de-duplication logic is handled by ExtSpace - ext_space.addMapping(storage_page_num, start_page_num); + ext_space.addMapping(storage_page_num, start_page_num, count); page_count -= count; start_page_num += count; } From 129f6df51b230ab15fa03ffd48e3724979c27974 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Tue, 9 Dec 2025 20:50:48 +0100 Subject: [PATCH 2/3] copy prefix fixes --- python_tests/test_copy_prefix.py | 2 -- src/dbzero/core/storage/BlockIOStream.cpp | 4 ++++ src/dbzero/core/storage/BlockIOStream.hpp | 3 ++- src/dbzero/core/storage/ChangeLogIOStream.cpp | 20 ++++++++++++++----- src/dbzero/core/storage/ChangeLogIOStream.hpp | 4 ++++ 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index 977bb502..5bd25c63 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -193,8 +193,6 @@ def test_copy_prefix_without_opening_it(db0_fixture): @pytest.mark.stress_test -# FIXME: log -@pytest.mark.parametrize("db0_fixture", [{"autocommit": False}], indirect=True) def test_copy_prefix_continuous_process(db0_fixture): px_name = db0.get_current_prefix().name px_path = os.path.join(DB0_DIR, px_name + ".db0") diff --git a/src/dbzero/core/storage/BlockIOStream.cpp b/src/dbzero/core/storage/BlockIOStream.cpp index f7c1fe01..7942a63e 100644 --- a/src/dbzero/core/storage/BlockIOStream.cpp +++ b/src/dbzero/core/storage/BlockIOStream.cpp @@ -557,4 +557,8 @@ namespace db0 } } + std::size_t BlockIOStream::readChunk() { + THROWF(db0::InternalException) << "BlockIOStream::readChunk() operation not supported" << THROWF_END; + } + } \ No newline at end of file diff --git a/src/dbzero/core/storage/BlockIOStream.hpp b/src/dbzero/core/storage/BlockIOStream.hpp index ed6fbe73..61167403 100644 --- a/src/dbzero/core/storage/BlockIOStream.hpp +++ b/src/dbzero/core/storage/BlockIOStream.hpp @@ -121,7 +121,8 @@ DB0_PACKED_END * @param address if not null, the absolute address of the chunk is stored here * @return the number of bytes read or 0 if EOF */ - std::size_t readChunk(std::vector &buffer, std::size_t expected_size = 0, std::uint64_t *address = nullptr); + virtual std::size_t readChunk(std::vector &buffer, std::size_t expected_size = 0, + std::uint64_t *address = nullptr); // Reach the next chunk into the internal buffer (where available) // The default implementation throws diff --git a/src/dbzero/core/storage/ChangeLogIOStream.cpp b/src/dbzero/core/storage/ChangeLogIOStream.cpp index 9317819b..8c467e71 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.cpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.cpp @@ -25,11 +25,10 @@ namespace db0 template const o_change_log_t *ChangeLogIOStream::readChangeLogChunk(std::vector &buffer) { - if (this->readChunk(buffer)) { - return m_last_change_log_ptr; - } else { + if (!this->readChunk(buffer)) { return nullptr; } + return m_last_change_log_ptr; } template @@ -38,15 +37,26 @@ namespace db0 { auto result = BlockIOStream::readChunk(buffer, expected_size, address); if (result) { + // map to a local buffer + if (buffer.data() != m_buffer.data()) { + m_buffer.resize(buffer.size()); + std::copy(buffer.data(), buffer.data() + buffer.size(), m_buffer.data()); + } + // reference with bounds validation - const_bounded_buf_t const_buf(Settings::m_decode_error, reinterpret_cast(buffer.data()), - reinterpret_cast(buffer.data() + buffer.size()) + const_bounded_buf_t const_buf(Settings::m_decode_error, reinterpret_cast(m_buffer.data()), + reinterpret_cast(m_buffer.data() + m_buffer.size()) ); m_last_change_log_ptr = &o_change_log_t::__safe_const_ref(const_buf); } return result; } + template + std::size_t ChangeLogIOStream::readChunk() { + return this->readChunk(m_buffer); + } + template const o_change_log_t *ChangeLogIOStream::readChangeLogChunk() { return readChangeLogChunk(m_buffer); diff --git a/src/dbzero/core/storage/ChangeLogIOStream.hpp b/src/dbzero/core/storage/ChangeLogIOStream.hpp index e8cf5416..18bacca5 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.hpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.hpp @@ -48,6 +48,10 @@ namespace db0 // Read chunk, bring your own buffer // @return pointer to the change-log chunk or nullptr if EOF (from an internal buffer) const o_change_log_t *readChangeLogChunk(std::vector &buffer); + + std::size_t readChunk(std::vector &buffer, std::size_t expected_size = 0, + std::uint64_t *address = nullptr) override; + std::size_t readChunk() override; /** From 0b38aa643ab27159d386d8d78288396939c2d96e Mon Sep 17 00:00:00 2001 From: Wojtek Date: Tue, 9 Dec 2025 21:13:46 +0100 Subject: [PATCH 3/3] test fix --- tests/unit_tests/REL_IndexTest.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/unit_tests/REL_IndexTest.cpp b/tests/unit_tests/REL_IndexTest.cpp index 1191cbb6..3ed310d9 100644 --- a/tests/unit_tests/REL_IndexTest.cpp +++ b/tests/unit_tests/REL_IndexTest.cpp @@ -36,7 +36,7 @@ namespace tests }; for (auto &item: items) { - cut.addMapping(item.m_storage_page_num, item.m_rel_page_num); + cut.addMapping(item.m_storage_page_num, item.m_rel_page_num, 50); } // relative -> absolute queries @@ -63,7 +63,7 @@ namespace tests }; for (auto &item: items) { - cut.addMapping(item.m_storage_page_num, item.m_rel_page_num); + cut.addMapping(item.m_storage_page_num, item.m_rel_page_num, 50); } // absolute -> relative queries @@ -90,7 +90,7 @@ namespace tests }; for (auto &item: items) { - cut.addMapping(item.m_storage_page_num, item.m_rel_page_num); + cut.addMapping(item.m_storage_page_num, item.m_rel_page_num, 50); } std::vector rel_page_nums; @@ -106,8 +106,9 @@ namespace tests auto node_size = 16u << 10; auto memspace = DRAMSpace::create(node_size); REL_Index cut(memspace, 16u << 10, AccessType::READ_WRITE); - cut.addMapping(32, 0); - cut.addMapping(64, 14); + // storage page num, relative page num, count + cut.addMapping(32, 0, 14); + cut.addMapping(64, 14, 64); cut.assignRelative(128, true); cut.assignRelative(144, true);