From b105c5b0dcd47d1e6f074a09ade3647d8162e869 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Mon, 15 Dec 2025 10:04:03 +0100 Subject: [PATCH] refresh fixes --- src/dbzero/core/memory/PrefixImpl.cpp | 7 ++++--- src/dbzero/core/storage/BDevStorage.cpp | 27 ++++++++++++++++--------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/dbzero/core/memory/PrefixImpl.cpp b/src/dbzero/core/memory/PrefixImpl.cpp index f6356442..fee92bae 100644 --- a/src/dbzero/core/memory/PrefixImpl.cpp +++ b/src/dbzero/core/memory/PrefixImpl.cpp @@ -339,9 +339,10 @@ namespace db0 m_cache.markAsMissing(updated_page_num, state_num); }); - assert(result); - // retrieve & sync to the refreshed state number - m_head_state_num = m_storage_ptr->getMaxStateNum(); + if (result) { + // retrieve & sync to the refreshed state number + m_head_state_num = m_storage_ptr->getMaxStateNum(); + } return result; } diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index 68fbd388..b7145cff 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -678,12 +678,7 @@ namespace db0 assert(m_ext_dram_changelog_io); ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } - auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos(); - std::optional dram_state_num; - // NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates - // without breaking the consistency - std::optional ext_dram_state_num; - + auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos(); // reverts streams to previous positions auto revert_streams = [&]() { m_dram_changelog_io.setStreamPos(dram_changelog_io_pos); @@ -695,8 +690,15 @@ namespace db0 }; try { - dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io); + auto dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io); + if (!dram_state_num) { + // no updates to process + break; + } dram_changelog_io_pos = m_dram_changelog_io.getStreamPos(); + // NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates + // without breaking the consistency + std::optional ext_dram_state_num; if (!!m_ext_space) { assert(m_ext_dram_changelog_io); m_ext_dram_changelog_io->refresh(); @@ -704,7 +706,8 @@ namespace db0 ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } - if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) { + assert(dram_state_num); + if (m_dram_io.completeApplyChanges(*dram_state_num)) { // refresh underlying sparse index / diff index after DRAM update m_sparse_pair.refresh(); } @@ -712,8 +715,14 @@ namespace db0 m_ext_space.refresh(); } - // this is the state number to sync-up to + // this is the state number to sync-up to (which must be identical as dram_state_num) auto max_state_num = m_sparse_pair.getMaxStateNum(); + if (dram_state_num != max_state_num) { + // NOTE: this critical and irrecoverable error indicates corruption of the DRAM changelog stream + THROWF(db0::InternalException) << "Inconsistent state: DRAM changelog state number " + << *dram_state_num << " does not match max known state number " << max_state_num; + } + // send all page-update notifications to the provided handler if (on_page_updated) { StateNumType updated_state_num = 0;