Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/dbzero/core/memory/PrefixImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,10 @@ namespace db0
m_cache.markAsMissing(updated_page_num, state_num);
});

assert(result);
// retrieve & sync to the refreshed state number
m_head_state_num = m_storage_ptr->getMaxStateNum();
if (result) {
// retrieve & sync to the refreshed state number
m_head_state_num = m_storage_ptr->getMaxStateNum();
}
return result;
}

Expand Down
27 changes: 18 additions & 9 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,12 +678,7 @@ namespace db0
assert(m_ext_dram_changelog_io);
ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos();
}
auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos();
std::optional<StateNumType> dram_state_num;
// NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates
// without breaking the consistency
std::optional<StateNumType> ext_dram_state_num;

auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos();
// reverts streams to previous positions
auto revert_streams = [&]() {
m_dram_changelog_io.setStreamPos(dram_changelog_io_pos);
Expand All @@ -695,25 +690,39 @@ namespace db0
};

try {
dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io);
auto dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io);
if (!dram_state_num) {
// no updates to process
break;
}
dram_changelog_io_pos = m_dram_changelog_io.getStreamPos();
// NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates
// without breaking the consistency
std::optional<StateNumType> ext_dram_state_num;
if (!!m_ext_space) {
assert(m_ext_dram_changelog_io);
m_ext_dram_changelog_io->refresh();
ext_dram_state_num = m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io);
ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos();
}

if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) {
assert(dram_state_num);
if (m_dram_io.completeApplyChanges(*dram_state_num)) {
// refresh underlying sparse index / diff index after DRAM update
m_sparse_pair.refresh();
}
if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) {
m_ext_space.refresh();
}

// this is the state number to sync-up to
// this is the state number to sync-up to (which must be identical as dram_state_num)
auto max_state_num = m_sparse_pair.getMaxStateNum();
if (dram_state_num != max_state_num) {
// NOTE: this critical and irrecoverable error indicates corruption of the DRAM changelog stream
THROWF(db0::InternalException) << "Inconsistent state: DRAM changelog state number "
<< *dram_state_num << " does not match max known state number " << max_state_num;
}

// send all page-update notifications to the provided handler
if (on_page_updated) {
StateNumType updated_state_num = 0;
Expand Down