From 15a6e834623ecd27b43694d379bd4d4cc4d2f90a Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sun, 14 Dec 2025 19:47:31 +0100 Subject: [PATCH 1/2] refresh related fixes --- python_tests/test_copy_prefix.py | 4 +-- src/dbzero/core/storage/BDevStorage.cpp | 46 +++++++++++++++++-------- src/dbzero/core/storage/ExtSpace.cpp | 4 +-- src/dbzero/core/storage/SparsePair.cpp | 2 +- src/dbzero/core/storage/copy_prefix.cpp | 4 ++- 5 files changed, 40 insertions(+), 20 deletions(-) diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index dc352354..6cfbf9f1 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -263,7 +263,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): p.join() total_len += obj_count * commit_count - + # make final stale copy (i.e. without active modifications) final_copy = f"./test-copy-final.db0" if os.path.exists(final_copy): @@ -272,7 +272,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): db0.close() print("Validating all copies", flush=True) - validate_copy("final", expected_len = total_len) + validate_copy("final", expected_len = total_len) for i in range(copy_id): last_len = validate_copy(i, expected_min_len = last_len) print(f"--- Copy {i} valid with {last_len} objects", flush=True) diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index dc1b9b90..e2025233 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -699,10 +699,22 @@ namespace db0 dram_changelog_io_pos = m_dram_changelog_io.getStreamPos(); if (!!m_ext_space) { assert(m_ext_dram_changelog_io); + // FIXME: log + // m_ext_dram_changelog_io->refresh(); ext_dram_state_num = m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io); ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); } + if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) { + // refresh 
underlying sparse index / diff index after DRAM update + m_sparse_pair.refresh(); + } + if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) { + m_ext_space.refresh(); + } + + // this is the state number to sync-up to + auto max_state_num = m_sparse_pair.getMaxStateNum(); // send all page-update notifications to the provided handler if (on_page_updated) { StateNumType updated_state_num = 0; @@ -711,22 +723,33 @@ namespace db0 auto reader = m_dp_changelog_io.getStreamReader(); // feed the reader with all available chunks, in case of IOException the stream is getting reverted // this is to make the operation atomic - while (reader.readChangeLogChunk()); + while (auto chunk_ptr = reader.readChangeLogChunk()) { + if (chunk_ptr->m_state_num == max_state_num) { + // stop at the max known state number + break; + } + if (chunk_ptr->m_state_num > max_state_num) { + // NOTE: this critical and irrecoverable error indicates corruption of the DP changelog stream + THROWF(db0::InternalException) << "Inconsistent state: DP changelog state number " + << chunk_ptr->m_state_num << " exceeds max known state number " << max_state_num; + } + } // reset to read all updates again reader.reset(); for (;;) { auto dp_change_log_ptr = reader.readChangeLogChunk(); - if (!dp_change_log_ptr) { + if (!dp_change_log_ptr || dp_change_log_ptr->m_state_num > max_state_num) { + // end of the stream or the max known state number reached break; } assert(dp_change_log_ptr->m_state_num != updated_state_num); updated_state_num = dp_change_log_ptr->m_state_num; - // Elements are storage page numbers (mutated in that transaction) - for (auto storage_page_num: *dp_change_log_ptr) { - on_page_updated(storage_page_num, updated_state_num); - } + // Elements are logical page numbers (mutated in that transaction) + for (auto page_num: *dp_change_log_ptr) { + on_page_updated(page_num, updated_state_num); + } } } @@ -740,13 +763,6 @@ namespace db0 result = 
m_file.getLastModifiedTime(); } - if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) { - // refresh underlying sparse index / diff index after DRAM update - m_sparse_pair.refresh(); - } - if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) { - m_ext_space.refresh(); - } m_meta_io.refresh(); // refresh cycle complete m_refresh_pending = false; @@ -888,7 +904,9 @@ namespace db0 // assure copied streams are consistent if (dp_header->m_state_num != max_state_num) { - THROWF(db0::IOException) << "BDevStorage::copyTo: inconsistent max_state_num in DP changelog"; + THROWF(db0::IOException) + << "BDevStorage::copyTo: inconsistent max_state_num in DP changelog: " + << (StateNumType)(dp_header->m_state_num) << " != " << max_state_num; } std::uint64_t end_page_num = dp_header->m_end_storage_page_num; // NOTE: end_page_num may be relative, need to translate to absolute diff --git a/src/dbzero/core/storage/ExtSpace.cpp b/src/dbzero/core/storage/ExtSpace.cpp index fbfddce6..8028cf57 100644 --- a/src/dbzero/core/storage/ExtSpace.cpp +++ b/src/dbzero/core/storage/ExtSpace.cpp @@ -50,13 +50,13 @@ namespace db0 } void ExtSpace::refresh() - { + { m_ext_space_root.detach(); if (m_rel_index) { m_rel_index->refresh(); } } - + void ExtSpace::commit() { if (!!m_ext_space_root) { diff --git a/src/dbzero/core/storage/SparsePair.cpp b/src/dbzero/core/storage/SparsePair.cpp index 05dfff06..c7fa7e8c 100644 --- a/src/dbzero/core/storage/SparsePair.cpp +++ b/src/dbzero/core/storage/SparsePair.cpp @@ -59,7 +59,7 @@ namespace db0 { std::sort(m_change_log.begin(), m_change_log.end()); ChangeLogData cl_data; - // add page numbers with deduplication + // add page numbers (logical) with deduplication for (auto page_num : m_change_log) { cl_data.m_rle_builder.append(page_num, false); } diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index e866ae0c..49f40f5e 100644 --- 
a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -136,12 +136,14 @@ namespace db0 std::vector buffer; std::size_t chunk_size = 0; std::uint64_t in_addr, out_addr; - for (;;) { + bool stop_copying = false; + while (!stop_copying) { while ((chunk_size = in.readChunk(buffer, 0, &in_addr)) > 0) { // NOTE: this buffer does NOT include the block IO header at the beginning if (filter && !filter(buffer, buffer.data() + chunk_size)) { // stop copying entirely if (!copy_all) { + stop_copying = true; break; } // skip this chunk only From ca3e2bd09d4666edd5ef7b8a2d199f496d7fd62a Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sun, 14 Dec 2025 19:54:26 +0100 Subject: [PATCH 2/2] BDevStorage: refresh ext DRAM changelog before applying changes (drop FIXME) --- src/dbzero/core/storage/BDevStorage.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index e2025233..68fbd388 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -699,8 +699,7 @@ namespace db0 dram_changelog_io_pos = m_dram_changelog_io.getStreamPos(); if (!!m_ext_space) { assert(m_ext_dram_changelog_io); - // FIXME: log - // m_ext_dram_changelog_io->refresh(); + m_ext_dram_changelog_io->refresh(); ext_dram_state_num = m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io); ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos(); }