From b105c5b0dcd47d1e6f074a09ade3647d8162e869 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Mon, 15 Dec 2025 10:04:03 +0100 Subject: [PATCH 1/2] refresh fixes --- src/dbzero/core/memory/PrefixImpl.cpp | 7 ++++--- src/dbzero/core/storage/BDevStorage.cpp | 27 ++++++++++++++++--------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/dbzero/core/memory/PrefixImpl.cpp b/src/dbzero/core/memory/PrefixImpl.cpp index f6356442..fee92bae 100644 --- a/src/dbzero/core/memory/PrefixImpl.cpp +++ b/src/dbzero/core/memory/PrefixImpl.cpp @@ -339,9 +339,10 @@ namespace db0 m_cache.markAsMissing(updated_page_num, state_num); }); - assert(result); - // retrieve & sync to the refreshed state number - m_head_state_num = m_storage_ptr->getMaxStateNum(); + if (result) { + // retrieve & sync to the refreshed state number + m_head_state_num = m_storage_ptr->getMaxStateNum(); + } return result; } diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index 68fbd388..b7145cff 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -678,12 +678,7 @@ namespace db0 assert(m_ext_dram_changelog_io); ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } - auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos(); - std::optional dram_state_num; - // NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates - // without breaking the consistency - std::optional ext_dram_state_num; - + auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos(); // reverts streams to previous positions auto revert_streams = [&]() { m_dram_changelog_io.setStreamPos(dram_changelog_io_pos); @@ -695,8 +690,15 @@ namespace db0 }; try { - dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io); + auto dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io); + if (!dram_state_num) { + // no updates to process + break; + } dram_changelog_io_pos = 
m_dram_changelog_io.getStreamPos(); + // NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates + // without breaking the consistency + std::optional ext_dram_state_num; if (!!m_ext_space) { assert(m_ext_dram_changelog_io); m_ext_dram_changelog_io->refresh(); @@ -704,7 +706,8 @@ namespace db0 ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } - if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) { + assert(dram_state_num); + if (m_dram_io.completeApplyChanges(*dram_state_num)) { // refresh underlying sparse index / diff index after DRAM update m_sparse_pair.refresh(); } @@ -712,8 +715,14 @@ namespace db0 m_ext_space.refresh(); } - // this is the state number to sync-up to + // this is the state number to sync-up to (which must be identical as dram_state_num) auto max_state_num = m_sparse_pair.getMaxStateNum(); + if (dram_state_num != max_state_num) { + // NOTE: this critical and irrecoverable error indicates corruption of the DRAM changelog stream + THROWF(db0::InternalException) << "Inconsistent state: DRAM changelog state number " + << *dram_state_num << " does not match max known state number " << max_state_num; + } + // send all page-update notifications to the provided handler if (on_page_updated) { StateNumType updated_state_num = 0; From e5ebbce88a244ab37fc9c9ea5f64bb773dd41ad9 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Mon, 15 Dec 2025 18:26:12 +0100 Subject: [PATCH 2/2] refresh / copy_prefix fixes --- src/dbzero/core/storage/BDevStorage.cpp | 28 +++++++--- src/dbzero/core/storage/DRAM_IOStream.cpp | 29 ++++++---- src/dbzero/core/storage/DRAM_IOStream.hpp | 8 +-- src/dbzero/core/storage/copy_prefix.cpp | 66 +++++++++++++---------- 4 files changed, 83 insertions(+), 48 deletions(-) diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index b7145cff..57593081 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ 
b/src/dbzero/core/storage/BDevStorage.cpp @@ -669,6 +669,11 @@ namespace db0 { assert(m_access_type == AccessType::READ_ONLY); std::uint64_t result = 0; + // NOTE: in some situations (e.g. very slow reader) we might not be able + // to grab the consistent snapshot of the DRAM prefix, in such case the operation + // needs to be retried until successful + // WARNING: if the reader is much slower than the writer (~100x slower) then this loop might not terminate + bool is_consistent = true; // continue refreshing until all updates are retrieved to guarantee a consistent state do { // safe stream positions for rollback on file read failure @@ -678,7 +683,7 @@ namespace db0 assert(m_ext_dram_changelog_io); ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } - auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos(); + auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos(); // reverts streams to previous positions auto revert_streams = [&]() { m_dram_changelog_io.setStreamPos(dram_changelog_io_pos); @@ -693,7 +698,7 @@ namespace db0 auto dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io); if (!dram_state_num) { // no updates to process - break; + break; } dram_changelog_io_pos = m_dram_changelog_io.getStreamPos(); // NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates @@ -707,14 +712,21 @@ namespace db0 } assert(dram_state_num); - if (m_dram_io.completeApplyChanges(*dram_state_num)) { - // refresh underlying sparse index / diff index after DRAM update - m_sparse_pair.refresh(); - } - if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) { + is_consistent = m_dram_io.completeApplyChanges(*dram_state_num); + if (!!m_ext_space && ext_dram_state_num) { + is_consistent &= m_ext_dram_io->completeApplyChanges(*ext_dram_state_num); m_ext_space.refresh(); } + if (!is_consistent) { + // must continue with the refresh until getting a consistent state + 
m_dram_changelog_io.refresh(); + continue; + } + + // refresh underlying sparse index / diff index after DRAM update + m_sparse_pair.refresh(); + // this is the state number to sync-up to (which must be identical as dram_state_num) auto max_state_num = m_sparse_pair.getMaxStateNum(); if (dram_state_num != max_state_num) { @@ -775,7 +787,7 @@ namespace db0 // refresh cycle complete m_refresh_pending = false; } - while (beginRefresh()); + while (beginRefresh() || !is_consistent); return result; } diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp b/src/dbzero/core/storage/DRAM_IOStream.cpp index cae3ede0..510f01b8 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -50,11 +50,14 @@ namespace db0 } void *DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const o_dram_chunk_header &header, StateNumType max_state_num) + const o_dram_chunk_header &header, StateNumType max_state_num, bool *is_consistent) { // NOTE: header may be invalid (i.e. 
copied chunk marked as invalid on copy post-processing) // NOTE: ignore changes beyond the last known consistent state number if (!!header && header.m_state_num <= max_state_num) { + if (is_consistent) { + *is_consistent = true; + } // page map = page_num / state_num auto dram_page = m_page_map.find(header.m_page_num); // NOTE: even if the same state number is encountered, the page is updated @@ -87,17 +90,23 @@ namespace db0 } } + if (is_consistent) { + // NOTE: null header is assumed as not violating consistency + // NOTE: we allow up to +1 state number to differ (from the transaction being synced to) + *is_consistent = !header || header.m_state_num <= max_state_num + 1; + } + // mark block as reusable (read/write mode only) if (m_access_type == AccessType::READ_WRITE) { m_reusable_chunks.insert(address); - } + } return nullptr; } void DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num) + const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num, bool *is_consistent) { - auto result = updateDRAMPage(address, allocs_ptr, header, max_state_num); + auto result = updateDRAMPage(address, allocs_ptr, header, max_state_num, is_consistent); if (result) { std::memcpy(result, bytes, m_dram_page_size); } @@ -298,20 +307,23 @@ namespace db0 bool DRAM_IOStream::completeApplyChanges(StateNumType max_state_num) { - bool result = false; + bool is_consistent = true; for (const auto &item: m_read_ahead_chunks) { auto address = item.first; const auto &buffer = item.second; const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf()); // NOTE: ignore invalid or incomplete DRAM chunks (too fresh to be included) if (!isDRAM_ChunkValid(m_dram_page_size, header, header.getData(), buffer.data() + buffer.size())) { + // NOTE: since we don't know chunk's actual status, must assume inconsistency + 
is_consistent = false; continue; } - updateDRAMPage(address, nullptr, header, header.getData(), max_state_num); - result = true; + bool consistent_update; + updateDRAMPage(address, nullptr, header, header.getData(), max_state_num, &consistent_update); + is_consistent &= consistent_update; } m_read_ahead_chunks.clear(); - return result; + return is_consistent; } void DRAM_IOStream::flush() { @@ -414,7 +426,6 @@ namespace db0 // it may come from a more recent update as well (and potentially may only be partially written) // therefore chunk-level checksum validation is necessary dram_io.readFromChunk(address, buffer.data(), buffer.size()); - #ifndef NDEBUG // Optional sleep for time-sensitive tests (e.g. copy_prefix) if (db0::Settings::__sleep_interval > 0) { diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index 994f0456..2db5fc5e 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -102,8 +102,9 @@ DB0_PACKED_END // Apply buffered changes (allowed on condition beginApplyChanges succeeded) // @param max_state_num the last known consistent state number + // @return false if the applied changes were INCONSISTENT (refresh must be repeated) bool completeApplyChanges(StateNumType max_state_num); - + /** * Get the underlying DRAM pair (prefix and allocator) */ @@ -181,11 +182,12 @@ DB0_PACKED_END mutable std::unordered_map > m_read_ahead_chunks; // @param max_state_num the last known consistent state number + // @param is_consistent flag set to false if the resulting state cannot be assumed consistent // data pages with higher state numbers are ignored void *updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const o_dram_chunk_header &header, StateNumType max_state_num); + const o_dram_chunk_header &header, StateNumType max_state_num, bool *is_consistent = nullptr); void updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, - const 
o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num); + const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num, bool *is_consistent = nullptr); // Overwrite invalid or corrupted DRAM page with null data void trashDRAMPage(std::uint64_t address); diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index 49f40f5e..cf8b29b7 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -49,7 +49,7 @@ namespace db0 return chunk_buf; } - + std::optional copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog) { @@ -87,43 +87,53 @@ namespace db0 return false; } // reject chunks beyond the last consistent state number - if (header.m_state_num > state_num) { - return false; - } - return true; + return header.m_state_num <= state_num; }; - + auto chunk_filter = [&](const std::vector &buffer, const void *data_end) -> bool { const auto &header = o_dram_chunk_header::__const_ref(buffer.data()); return dram_filter(header, data_end); }; - - copyStream(input_io, output_io, &chunk_addr_map, chunk_filter); - // Chunks loaded during the sync step - // NOTE: in this step we prefetch to memory to be able to catch up with changes - std::unordered_map > chunk_buf; - while (input_dram_changelog.refresh()) { - fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf); - } + copyStream(input_io, output_io, &chunk_addr_map, chunk_filter); - last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk(); - assert(last_chunk_ptr); + // NOTE: the operation might need to be repeated multiple times + // if unable to reach a consistent state in one pass (this might be due to a very slow reader process) + for (;;) { + // Chunks loaded during the sync step + // NOTE: in this step we prefetch to memory to be able to catch up with changes + std::unordered_map > 
chunk_buf; + while (input_dram_changelog.refresh()) { + fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf); + } + + last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk(); + assert(last_chunk_ptr); + + // this is the actually copied last consistent state number + state_num = last_chunk_ptr->m_state_num; + + // NOTE: at this stage we might also encounter incomplete + // or new chunks beyond the copied stream which need to be discarded + chunk_buf = filterDRAM_Chunks(std::move(chunk_buf), dram_filter); - // this is the actually copied last consistent state number - state_num = last_chunk_ptr->m_state_num; + // NOTE: flush must be done under translated addresses (or appended to stream if translation not present) + auto bufs_pair = translateDRAM_Chunks(std::move(chunk_buf), chunk_addr_map); + flushDRAM_IOChanges(output_io, bufs_pair.first); + // append new chunks which were not present during the initial copy + appendDRAM_IOChunks(output_io, bufs_pair.second); + // append the sentinel entry with state number only (i.e. empty changelog) + output_dram_changelog.appendChangeLog({}, state_num); + + // this operation needs to be continued until exhausting the entire changelog + if (input_dram_changelog.refresh()) { + continue; + } else { + break; + } + } - // NOTE: at this stage we might also encounter incomplete - // or new chunks beyond the copied stream which needs to be discarded - chunk_buf = filterDRAM_Chunks(std::move(chunk_buf), dram_filter); - // NOTE: flush must be done under translated addresses (or appended to stream if translation not present) - auto bufs_pair = translateDRAM_Chunks(std::move(chunk_buf), chunk_addr_map); - flushDRAM_IOChanges(output_io, bufs_pair.first); - // append new chuks which were not present during the initial copy - appendDRAM_IOChunks(output_io, bufs_pair.second); - // append the sentinel entry with state number only (i.e. 
empty changelog) - output_dram_changelog.appendChangeLog({}, state_num); output_io.close(); return state_num; }