From 8cd2a37e61c3bec6b70c7e74b1aac30de2800022 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Wed, 10 Dec 2025 14:22:56 +0100 Subject: [PATCH] fixes --- dbzero/dbzero/dbzero.py | 2 +- python_tests/test_copy_prefix.py | 3 +- src/dbzero/core/storage/BDevStorage.cpp | 43 +++++++++------ src/dbzero/core/storage/BDevStorage.hpp | 2 +- src/dbzero/core/storage/BaseStorage.hpp | 2 +- src/dbzero/core/storage/ChangeLog.cpp | 3 +- src/dbzero/core/storage/ChangeLog.hpp | 4 +- src/dbzero/core/storage/ChangeLogIOStream.cpp | 1 + src/dbzero/core/storage/ChangeLogIOStream.hpp | 1 + src/dbzero/core/storage/ChangeLogTypes.hpp | 20 ++++++- src/dbzero/core/storage/DRAM_IOStream.cpp | 55 +++++++++++++------ src/dbzero/core/storage/DRAM_IOStream.hpp | 25 ++++++--- src/dbzero/core/storage/copy_prefix.cpp | 9 ++- src/dbzero/object_model/LangCache.cpp | 2 +- tests/unit_tests/REL_IndexTest.cpp | 4 +- 15 files changed, 122 insertions(+), 54 deletions(-) diff --git a/dbzero/dbzero/dbzero.py b/dbzero/dbzero/dbzero.py index c9e4f4dc..21899e3d 100644 --- a/dbzero/dbzero/dbzero.py +++ b/dbzero/dbzero/dbzero.py @@ -10,7 +10,7 @@ def load_dynamic(name, path): def __bootstrap__(): global __bootstrap__, __loader__, __file__ - paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"] + paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"] __file__ = None for path in paths: if os.path.isdir(path): diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index 5bd25c63..25815f39 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -381,4 +381,5 @@ def validate(expected_len): # open prefix from recovered and modified copy of a copy db0.init(DB0_DIR, prefix=px_name, read_write=False) - validate(total_len) \ No newline at end of file + validate(total_len) + \ No newline at end of file diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index 45aac5ac..8897d0a4 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -80,13 +80,13 @@ namespace db0 } // Validate state consistency - // The state number reported by DRAM IO must match the one in the DP changelog IO + // The state number reported by DRAM IO must NOT superseed the last state number recorded in DP changelog if (auto chunk_ptr = m_dp_changelog_io.getLastChangeLogChunk()) { auto dp_state_num = chunk_ptr->m_state_num; auto dram_state_num = m_sparse_pair.getMaxStateNum(); - if (dram_state_num != dp_state_num) { - THROWF(db0::IOException) << "Inconsistent state: DRAM IO max state number " << dram_state_num - << " does not match DP changelog last state number " << dp_state_num; + if (dram_state_num > dp_state_num) { + THROWF(db0::IOException) << "Inconsistent state: DRAM state number " << dram_state_num + << " exceeds DP changelog state number " << dp_state_num; } } } @@ -676,14 +676,30 @@ namespace db0 ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos(); + std::optional dram_state_num; + // NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates + // without breaking the consistency + std::optional ext_dram_state_num; + + // reverts streams to previous positions + auto revert_streams = [&]() { + m_dram_changelog_io.setStreamPos(dram_changelog_io_pos); + m_dp_changelog_io.setStreamPos(dp_changelog_io_pos); + if (!!m_ext_space) { + assert(m_ext_dram_changelog_io); + m_ext_dram_changelog_io->setStreamPos(ext_dram_changelog_io_pos); + } + }; + try { - m_dram_io.beginApplyChanges(m_dram_changelog_io); + dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io); dram_changelog_io_pos = m_dram_changelog_io.getStreamPos(); if (!!m_ext_space) { assert(m_ext_dram_changelog_io); - m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io); + ext_dram_state_num = m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io); ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } + // send all page-update notifications to the provided handler if (on_page_updated) { StateNumType updated_state_num = 0; @@ -712,15 +728,8 @@ namespace db0 } } catch (db0::IOException &) { - // NOTE: this exception may appear on distributed filesystems - // where changes are not guaranteed to be written sequentially - // need to revert the refresh operation to the point where it originally started - m_dram_changelog_io.setStreamPos(dram_changelog_io_pos); - m_dp_changelog_io.setStreamPos(dp_changelog_io_pos); - if (!!m_ext_space) { - assert(m_ext_dram_changelog_io); - m_ext_dram_changelog_io->setStreamPos(ext_dram_changelog_io_pos); - } + revert_streams(); + // NOTE: this may be a temporary problem, refresh needs repeating break; } @@ -728,11 +737,11 @@ namespace db0 result = m_file.getLastModifiedTime(); } - if (m_dram_io.completeApplyChanges()) { + if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) { // refresh underlying sparse index / diff index after DRAM update m_sparse_pair.refresh(); } - if (!!m_ext_space && m_ext_dram_io->completeApplyChanges()) { + if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) { m_ext_space.refresh(); } m_meta_io.refresh(); diff --git a/src/dbzero/core/storage/BDevStorage.hpp b/src/dbzero/core/storage/BDevStorage.hpp index ffbd299c..f545285e 100644 --- a/src/dbzero/core/storage/BDevStorage.hpp +++ b/src/dbzero/core/storage/BDevStorage.hpp @@ -71,7 +71,7 @@ DB0_PACKED_END public: static constexpr std::uint32_t DEFAULT_PAGE_SIZE = 4096; static constexpr std::size_t DEFAULT_META_IO_STEP_SIZE = 16 << 20; - using DRAM_ChangeLogStreamT = ChangeLogIOStream<>; + using DRAM_ChangeLogStreamT = ChangeLogIOStream; using DP_ChangeLogStreamT = ChangeLogIOStream; /** diff --git a/src/dbzero/core/storage/BaseStorage.hpp b/src/dbzero/core/storage/BaseStorage.hpp index 927cdf8c..fc1aabdc 100644 --- a/src/dbzero/core/storage/BaseStorage.hpp +++ b/src/dbzero/core/storage/BaseStorage.hpp @@ -27,7 +27,7 @@ namespace db0 class BaseStorage { public: - using DRAM_ChangeLogT = db0::o_change_log; + using DRAM_ChangeLogT = db0::o_change_log; using DP_ChangeLogT = db0::o_change_log; BaseStorage(AccessType, StorageFlags = {}); diff --git a/src/dbzero/core/storage/ChangeLog.cpp b/src/dbzero/core/storage/ChangeLog.cpp index c81230e2..8584455b 100644 --- a/src/dbzero/core/storage/ChangeLog.cpp +++ b/src/dbzero/core/storage/ChangeLog.cpp @@ -108,7 +108,8 @@ namespace db0 return rleCompressed().value(); } - template class o_change_log; + template class o_change_log<>; + template class o_change_log; template class o_change_log; } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLog.hpp b/src/dbzero/core/storage/ChangeLog.hpp index fdee6aae..93425131 100644 --- a/src/dbzero/core/storage/ChangeLog.hpp +++ b/src/dbzero/core/storage/ChangeLog.hpp @@ -137,7 +137,7 @@ DB0_PACKED_END (o_list >::type(), data.m_change_log); } } - - extern template class o_change_log; + + extern template class o_change_log<>; } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLogIOStream.cpp b/src/dbzero/core/storage/ChangeLogIOStream.cpp index 8c467e71..b39f2ee5 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.cpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.cpp @@ -154,6 +154,7 @@ namespace db0 } template class ChangeLogIOStream<>; + template class ChangeLogIOStream >; template class ChangeLogIOStream >; } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLogIOStream.hpp b/src/dbzero/core/storage/ChangeLogIOStream.hpp index 18bacca5..b307e0ee 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.hpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.hpp @@ -126,6 +126,7 @@ namespace db0 } extern template class ChangeLogIOStream<>; + extern template class ChangeLogIOStream >; extern template class ChangeLogIOStream >; } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLogTypes.hpp b/src/dbzero/core/storage/ChangeLogTypes.hpp index 04f5b246..da6b6d95 100644 --- a/src/dbzero/core/storage/ChangeLogTypes.hpp +++ b/src/dbzero/core/storage/ChangeLogTypes.hpp @@ -17,6 +17,8 @@ DB0_PACKED_BEGIN // sentinel storage page number for this transaction (see Page_IO::getEndPageNum()) // NOTE: this value might be relative if the mapping is active std::uint64_t m_end_storage_page_num; + // reserved for future use + std::array m_reserved = { 0, 0 }; o_dp_changelog_header(StateNumType state_num, std::uint64_t end_storage_page_num) : m_state_num(state_num) @@ -26,7 +28,23 @@ DB0_PACKED_BEGIN }; DB0_PACKED_END - extern template class o_change_log; +DB0_PACKED_BEGIN + struct DB0_PACKED_ATTR o_dram_changelog_header: o_fixed + { + // state number this change log corresponds to + StateNumType m_state_num; + // reserved for future use + std::array m_reserved = { 0, 0 }; + + o_dram_changelog_header(StateNumType state_num) + : m_state_num(state_num) + { + } + }; +DB0_PACKED_END + + extern template class o_change_log<>; + extern template class o_change_log; extern template class o_change_log; } diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp b/src/dbzero/core/storage/DRAM_IOStream.cpp index b45c6e09..9c469b19 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -14,7 +14,7 @@ namespace db0 { - + DRAM_IOStream::DRAM_IOStream(CFile &m_file, std::uint64_t begin, std::uint32_t block_size, std::function tail_function, AccessType access_type, std::uint32_t dram_page_size) : BlockIOStream(m_file, begin, block_size, tail_function, access_type, DRAM_IOStream::ENABLE_CHECKSUMS) @@ -37,10 +37,15 @@ namespace db0 } void *DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const o_dram_chunk_header &header) + const o_dram_chunk_header &header, StateNumType max_state_num) { + if (header.m_state_num > max_state_num) { + // ignore changes beyond the last known consistent state number + return nullptr; + } + // page map = page_num / state_num - auto dram_page = m_page_map.find(header.m_page_num); + auto dram_page = m_page_map.find(header.m_page_num); if (dram_page == m_page_map.end() || dram_page->second.m_state_num < header.m_state_num) { // update DRAM to most recent page version, page not marked as dirty auto result = m_prefix->update(header.m_page_num, false); @@ -76,9 +81,9 @@ namespace db0 } void DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const o_dram_chunk_header &header, const void *bytes) + const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num) { - auto result = updateDRAMPage(address, allocs_ptr, header); + auto result = updateDRAMPage(address, allocs_ptr, header, max_state_num); if (result) { std::memcpy(result, bytes, m_dram_page_size); } @@ -86,7 +91,7 @@ namespace db0 void DRAM_IOStream::load(DRAM_ChangeLogStreamT &changelog_io) { - // simply exhaust the change-log stream + // Exhaust the change-log stream first and retrieve the last valid state number // its position marks the synchronization point while (changelog_io.readChangeLogChunk()); @@ -94,8 +99,14 @@ namespace db0 const auto &header = o_dram_chunk_header::__ref(buffer.data()); auto bytes = buffer.data() + header.sizeOf(); - // maximum known state number by page - // this is required to only select the maximum state per page (discard older mutations) + auto last_chunk_ptr = changelog_io.getLastChangeLogChunk(); + if (!last_chunk_ptr) { + // no changes to load + return; + } + + // The last known consistent state number + auto max_state_num = last_chunk_ptr->m_state_num; std::unordered_set allocs; for (;;) { auto block_id = tellBlock(); @@ -110,12 +121,12 @@ namespace db0 THROWF(db0::IOException) << "DRAM_IOStream::load error: unaligned block"; } - updateDRAMPage(chunk_addr, &allocs, header, bytes); + updateDRAMPage(chunk_addr, &allocs, header, bytes, max_state_num); } m_allocator->update(allocs); } - void DRAM_IOStream::flushUpdates(std::uint64_t state_num, DRAM_ChangeLogStreamT &dram_changelog_io) + void DRAM_IOStream::flushUpdates(StateNumType state_num, DRAM_ChangeLogStreamT &dram_changelog_io) { if (m_access_type == AccessType::READ_ONLY) { THROWF(db0::IOException) << "DRAM_IOStream::flushUpdates error: read-only stream"; @@ -198,7 +209,7 @@ namespace db0 BlockIOStream::flush(); // output changelog, no RLE encoding, no duplicates ChangeLogData cl_data(std::move(dram_changelog), false, false, false); - dram_changelog_io.appendChangeLog(std::move(cl_data)); + dram_changelog_io.appendChangeLog(std::move(cl_data), state_num); } #ifndef NDEBUG @@ -222,24 +233,24 @@ namespace db0 return { m_prefix, m_allocator }; } - void DRAM_IOStream::beginApplyChanges(DRAM_ChangeLogStreamT &changelog_io) const + std::optional DRAM_IOStream::beginApplyChanges(DRAM_ChangeLogStreamT &changelog_io) const { assert(m_read_ahead_chunks.empty()); if (m_access_type == AccessType::READ_WRITE) { THROWF(db0::InternalException) << "DRAM_IOStream::applyChanges require read-only stream"; } - fetchDRAM_IOChanges(*this, changelog_io, m_read_ahead_chunks); + return fetchDRAM_IOChanges(*this, changelog_io, m_read_ahead_chunks); } - bool DRAM_IOStream::completeApplyChanges() + bool DRAM_IOStream::completeApplyChanges(StateNumType max_state_num) { bool result = false; for (const auto &item: m_read_ahead_chunks) { auto address = item.first; const auto &buffer = item.second; const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf()); - updateDRAMPage(address, nullptr, header, header.getData()); + updateDRAMPage(address, nullptr, header, header.getData(), max_state_num); result = true; } m_read_ahead_chunks.clear(); @@ -293,8 +304,10 @@ namespace db0 } #endif - void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io, - std::unordered_map > &chunks_buf) + std::optional fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, + DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io, + std::unordered_map > &chunks_buf, + std::function callback) { auto create_read_ahead_buffer = [&](std::uint64_t address, std::size_t size) -> std::vector & { @@ -306,6 +319,7 @@ namespace db0 }; auto stream_pos = changelog_io.getStreamPos(); + std::optional max_state_num; try { // Must continue until exhausting the change-log for (;;) { @@ -322,6 +336,11 @@ namespace db0 // this is because: a) file writes are NOT atomic, b) DP might be modified while we process the log // NOTE: this might be optimized when modifiaction timestamps are introduced while (change_log_ptr) { + if (callback) { + callback(*change_log_ptr); + } + + max_state_num = change_log_ptr->m_state_num; for (auto address: *change_log_ptr) { // buffer must include BlockIOStream's chunk header and data auto &buffer = create_read_ahead_buffer(address, dram_io.getChunkSize() + o_block_io_chunk_header::sizeOf()); @@ -334,6 +353,8 @@ namespace db0 } } + return max_state_num; + } catch (db0::IOException &) { changelog_io.setStreamPos(stream_pos); chunks_buf.clear(); diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index 7fef70ba..ce41b115 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include #include #include "BaseStorage.hpp" #include "ChangeLogIOStream.hpp" @@ -79,15 +81,16 @@ DB0_PACKED_END * @param state_num the state number under which the modifications are to be stored * @param dram_changelog_io the stream to receive DRAM IO "changelog" chunks */ - void flushUpdates(std::uint64_t state_num, DRAM_ChangeLogStreamT &); + void flushUpdates(StateNumType state_num, DRAM_ChangeLogStreamT &); // The purpose of this operation is allowing atomic application of changes // this call may end with an IOException without affecting internal state (except populating temporary buffers) // @return the latest state number of available changes - void beginApplyChanges(DRAM_ChangeLogStreamT &) const; + std::optional beginApplyChanges(DRAM_ChangeLogStreamT &) const; // Apply buffered changes (allowed on condition beginApplyChanges succeeded) - bool completeApplyChanges(); + // @param max_state_num the last known consistent state number + bool completeApplyChanges(StateNumType max_state_num); /** * Get the underlying DRAM pair (prefix and allocator) @@ -159,19 +162,25 @@ DB0_PACKED_END std::shared_ptr m_allocator; // chunks buffer for the beginApplyChanges / completeApplyChanges operations mutable std::unordered_map > m_read_ahead_chunks; - + + // @param max_state_num the last known consistent state number + // data pages with higher state numbers are ignored void *updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const o_dram_chunk_header &header); + const o_dram_chunk_header &header, StateNumType max_state_num); void updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const o_dram_chunk_header &header, const void *bytes); + const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num); // the number of random write operations performed while flushing updates std::uint64_t m_rand_ops = 0; }; // Pre-fetch changes into the chunks buffer - void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io, - std::unordered_map > &chunks_buf); + // @param callback optional function to be called for each changelog chunk read + // @return the latest state number of available changes + std::optional fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, + DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io, + std::unordered_map > &chunks_buf, + std::function callback = {}); // Flush changes from the buffer void flushDRAM_IOChanges(DRAM_IOStream &dram_io, diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index 1110d545..02667b6f 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -11,6 +11,8 @@ namespace db0 void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog) { + using DRAM_ChangeLogT = DRAM_IOStream::DRAM_ChangeLogT; + // Exhaust the input_dram_changelog first input_dram_changelog.setStreamPosHead(); auto change_log_ptr = input_dram_changelog.readChangeLogChunk(); @@ -23,10 +25,15 @@ namespace db0 // Copy the entire DRAM_IO stream next (possibly inconsistent state) copyStream(input_io, output_io); + // Callback to copy additional changelog chunks retrieved on post-sync + auto copy_callback = [&](const DRAM_ChangeLogT &change_log) { + output_dram_changelog.appendChangeLog(change_log); + }; + // Chunks loaded during the sync step // NOTE: in this step we prefetch to memory to be able to catch up with changes std::unordered_map > chunk_buf; - fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf); + fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf, copy_callback); flushDRAM_IOChanges(output_io, chunk_buf); } diff --git a/src/dbzero/object_model/LangCache.cpp b/src/dbzero/object_model/LangCache.cpp index a894fea3..771bac3f 100644 --- a/src/dbzero/object_model/LangCache.cpp +++ b/src/dbzero/object_model/LangCache.cpp @@ -325,5 +325,5 @@ namespace db0 m_objects.clear(); } } - + } \ No newline at end of file diff --git a/tests/unit_tests/REL_IndexTest.cpp b/tests/unit_tests/REL_IndexTest.cpp index 3ed310d9..60c5b66c 100644 --- a/tests/unit_tests/REL_IndexTest.cpp +++ b/tests/unit_tests/REL_IndexTest.cpp @@ -88,9 +88,9 @@ namespace tests // relative page number, absolute page number { 0, 100 }, { 50, 200 }, { 60, 210 }, { 100, 300 }, { 150, 400 }, { 160, 410 }, { 200, 500 } }; - + for (auto &item: items) { - cut.addMapping(item.m_storage_page_num, item.m_rel_page_num, 50); + cut.addMapping(item.m_storage_page_num, item.m_rel_page_num, 10); } std::vector rel_page_nums;