Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,11 @@ namespace db0
{
assert(m_access_type == AccessType::READ_ONLY);
std::uint64_t result = 0;
// NOTE: in some situations (e.g. very slow reader) we might not be able
// to grab the consistent snapshot of the DRAM prefix, in such case the operation
// needs to be retried until successful
// WARNING: if the reader is much slower that the writer (~100x slower) then this loop might not terminate
bool is_consistent = true;
// continue refreshing until all updates are retrieved to guarantee a consistent state
do {
// safe stream positions for rollback on file read failure
Expand All @@ -678,7 +683,7 @@ namespace db0
assert(m_ext_dram_changelog_io);
ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos();
}
auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos();
auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos();
// reverts streams to previous positions
auto revert_streams = [&]() {
m_dram_changelog_io.setStreamPos(dram_changelog_io_pos);
Expand All @@ -693,7 +698,7 @@ namespace db0
auto dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io);
if (!dram_state_num) {
// no updates to process
break;
break;
}
dram_changelog_io_pos = m_dram_changelog_io.getStreamPos();
// NOTE: ext DRAM updates have incremental nature so they might preceed DRAM updates
Expand All @@ -707,14 +712,21 @@ namespace db0
}

assert(dram_state_num);
if (m_dram_io.completeApplyChanges(*dram_state_num)) {
// refresh underlying sparse index / diff index after DRAM update
m_sparse_pair.refresh();
}
if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) {
is_consistent = m_dram_io.completeApplyChanges(*dram_state_num);
if (!!m_ext_space && ext_dram_state_num) {
is_consistent &= m_ext_dram_io->completeApplyChanges(*ext_dram_state_num);
m_ext_space.refresh();
}

if (!is_consistent) {
// must continue with the refresh until getting a consistent state
m_dram_changelog_io.refresh();
continue;
}

// refresh underlying sparse index / diff index after DRAM update
m_sparse_pair.refresh();

// this is the state number to sync-up to (which must be identical as dram_state_num)
auto max_state_num = m_sparse_pair.getMaxStateNum();
if (dram_state_num != max_state_num) {
Expand Down Expand Up @@ -775,7 +787,7 @@ namespace db0
// refresh cycle complete
m_refresh_pending = false;
}
while (beginRefresh());
while (beginRefresh() || !is_consistent);
return result;
}

Expand Down
29 changes: 20 additions & 9 deletions src/dbzero/core/storage/DRAM_IOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ namespace db0
}

void *DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
const o_dram_chunk_header &header, StateNumType max_state_num)
const o_dram_chunk_header &header, StateNumType max_state_num, bool *is_consistent)
{
// NOTE: header may be invalid (i.e. copied chunk marked as invalid on copy post-processing)
// NOTE: ignore changes beyond the last known consistent state number
if (!!header && header.m_state_num <= max_state_num) {
if (is_consistent) {
*is_consistent = true;
}
// page map = page_num / state_num
auto dram_page = m_page_map.find(header.m_page_num);
// NOTE: even if the same state number is encountered, the page is updated
Expand Down Expand Up @@ -87,17 +90,23 @@ namespace db0
}
}

if (is_consistent) {
// NOTE: null header is assumed as not violating consistency
// NOTE: we allow up to +1 state number to differ (from the transaction being synced to)
*is_consistent = !header || header.m_state_num <= max_state_num + 1;
}

// mark block as reusable (read/write mode only)
if (m_access_type == AccessType::READ_WRITE) {
m_reusable_chunks.insert(address);
}
}
return nullptr;
}

void DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num)
const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num, bool *is_consistent)
{
auto result = updateDRAMPage(address, allocs_ptr, header, max_state_num);
auto result = updateDRAMPage(address, allocs_ptr, header, max_state_num, is_consistent);
if (result) {
std::memcpy(result, bytes, m_dram_page_size);
}
Expand Down Expand Up @@ -298,20 +307,23 @@ namespace db0

bool DRAM_IOStream::completeApplyChanges(StateNumType max_state_num)
{
bool result = false;
bool is_consistent = true;
for (const auto &item: m_read_ahead_chunks) {
auto address = item.first;
const auto &buffer = item.second;
const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf());
// NOTE: ignore invalid or incomplete DRAM chunks (too fresh to be included)
if (!isDRAM_ChunkValid(m_dram_page_size, header, header.getData(), buffer.data() + buffer.size())) {
// NOTE: since we don't know chunk's actual status, must assume inconsistency
is_consistent = false;
continue;
}
updateDRAMPage(address, nullptr, header, header.getData(), max_state_num);
result = true;
bool consistent_update;
updateDRAMPage(address, nullptr, header, header.getData(), max_state_num, &consistent_update);
is_consistent &= consistent_update;
}
m_read_ahead_chunks.clear();
return result;
return is_consistent;
}

void DRAM_IOStream::flush() {
Expand Down Expand Up @@ -414,7 +426,6 @@ namespace db0
// it may come from a more recent update as well (and potentially may only be partially written)
// therefore chunk-level checksum validation is necessary
dram_io.readFromChunk(address, buffer.data(), buffer.size());

#ifndef NDEBUG
// Optional sleep for time-sensitive tests (e.g. copy_prefix)
if (db0::Settings::__sleep_interval > 0) {
Expand Down
8 changes: 5 additions & 3 deletions src/dbzero/core/storage/DRAM_IOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ DB0_PACKED_END

// Apply buffered changes (allowed on condition beginApplyChanges succeeded)
// @param max_state_num the last known consistent state number
// @return false if the applied changes were INCONSISTENT (refresh must be repeated)
bool completeApplyChanges(StateNumType max_state_num);

/**
* Get the underlying DRAM pair (prefix and allocator)
*/
Expand Down Expand Up @@ -181,11 +182,12 @@ DB0_PACKED_END
mutable std::unordered_map<std::uint64_t, std::vector<char> > m_read_ahead_chunks;

// @param max_state_num the last known consistent state number
// @param is_consistent flag set to false if the resulting state cannot be assumed consistent
// data pages with higher state numbers are ignored
void *updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
const o_dram_chunk_header &header, StateNumType max_state_num);
const o_dram_chunk_header &header, StateNumType max_state_num, bool *is_consistent = nullptr);
void updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num);
const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num, bool *is_consistent = nullptr);

// Overwrite invalid or corrupted DRAM page with null data
void trashDRAMPage(std::uint64_t address);
Expand Down
66 changes: 38 additions & 28 deletions src/dbzero/core/storage/copy_prefix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace db0

return chunk_buf;
}

std::optional<StateNumType> copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog,
DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog)
{
Expand Down Expand Up @@ -87,43 +87,53 @@ namespace db0
return false;
}
// reject chunks beyond the last consistent state number
if (header.m_state_num > state_num) {
return false;
}
return true;
return header.m_state_num <= state_num;
};

auto chunk_filter = [&](const std::vector<char> &buffer, const void *data_end) -> bool
{
const auto &header = o_dram_chunk_header::__const_ref(buffer.data());
return dram_filter(header, data_end);
};

copyStream(input_io, output_io, &chunk_addr_map, chunk_filter);

// Chunks loaded during the sync step
// NOTE: in this step we prefetch to memory to be able to catch up with changes
std::unordered_map<std::uint64_t, std::vector<char> > chunk_buf;
while (input_dram_changelog.refresh()) {
fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf);
}
copyStream(input_io, output_io, &chunk_addr_map, chunk_filter);

last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk();
assert(last_chunk_ptr);
// NOTE: the operation might need to be repeated multiple times
// if unable to reach a consistent state in one pass (this might be due to a very slow reader process)
for (;;) {
// Chunks loaded during the sync step
// NOTE: in this step we prefetch to memory to be able to catch up with changes
std::unordered_map<std::uint64_t, std::vector<char> > chunk_buf;
while (input_dram_changelog.refresh()) {
fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf);
}

last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk();
assert(last_chunk_ptr);

// this is the actually copied last consistent state number
state_num = last_chunk_ptr->m_state_num;

// NOTE: at this stage we might also encounter incomplete
// or new chunks beyond the copied stream which needs to be discarded
chunk_buf = filterDRAM_Chunks(std::move(chunk_buf), dram_filter);

// this is the actually copied last consistent state number
state_num = last_chunk_ptr->m_state_num;
// NOTE: flush must be done under translated addresses (or appended to stream if translation not present)
auto bufs_pair = translateDRAM_Chunks(std::move(chunk_buf), chunk_addr_map);
flushDRAM_IOChanges(output_io, bufs_pair.first);
// append new chuks which were not present during the initial copy
appendDRAM_IOChunks(output_io, bufs_pair.second);
// append the sentinel entry with state number only (i.e. empty changelog)
output_dram_changelog.appendChangeLog({}, state_num);

// this operation needs to be continued until exhausting the entire changelog
if (input_dram_changelog.refresh()) {
continue;
} else {
break;
}
}

// NOTE: at this stage we might also encounter incomplete
// or new chunks beyond the copied stream which needs to be discarded
chunk_buf = filterDRAM_Chunks(std::move(chunk_buf), dram_filter);
// NOTE: flush must be done under translated addresses (or appended to stream if translation not present)
auto bufs_pair = translateDRAM_Chunks(std::move(chunk_buf), chunk_addr_map);
flushDRAM_IOChanges(output_io, bufs_pair.first);
// append new chuks which were not present during the initial copy
appendDRAM_IOChunks(output_io, bufs_pair.second);
// append the sentinel entry with state number only (i.e. empty changelog)
output_dram_changelog.appendChangeLog({}, state_num);
output_io.close();
return state_num;
}
Expand Down