Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python_tests/test_copy_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):

p.join()
total_len += obj_count * commit_count

# make final stale copy (i.e. without active modifications)
final_copy = f"./test-copy-final.db0"
if os.path.exists(final_copy):
Expand All @@ -272,7 +272,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
db0.close()

print("Validating all copies", flush=True)
validate_copy("final", expected_len = total_len)
validate_copy("final", expected_len = total_len)
for i in range(copy_id):
last_len = validate_copy(i, expected_min_len = last_len)
print(f"--- Copy {i} valid with {last_len} objects", flush=True)
Expand Down
45 changes: 31 additions & 14 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,21 @@ namespace db0
dram_changelog_io_pos = m_dram_changelog_io.getStreamPos();
if (!!m_ext_space) {
assert(m_ext_dram_changelog_io);
m_ext_dram_changelog_io->refresh();
ext_dram_state_num = m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io);
ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos();
}

if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) {
// refresh underlying sparse index / diff index after DRAM update
m_sparse_pair.refresh();
}
if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) {
m_ext_space.refresh();
}

// this is the state number to sync-up to
auto max_state_num = m_sparse_pair.getMaxStateNum();
// send all page-update notifications to the provided handler
if (on_page_updated) {
StateNumType updated_state_num = 0;
Expand All @@ -711,22 +722,33 @@ namespace db0
auto reader = m_dp_changelog_io.getStreamReader();
// feed the reader with all available chunks, in case of IOException the stream is getting reverted
// this is to make the operation atomic
while (reader.readChangeLogChunk());
while (auto chunk_ptr = reader.readChangeLogChunk()) {
if (chunk_ptr->m_state_num == max_state_num) {
// stop at the max known state number
break;
}
if (chunk_ptr->m_state_num > max_state_num) {
// NOTE: this critical and irrecoverable error indicates corruption of the DP changelog stream
THROWF(db0::InternalException) << "Inconsistent state: DP changelog state number "
<< chunk_ptr->m_state_num << " exceeds max known state number " << max_state_num;
}
}

// reset to read all updates again
reader.reset();
for (;;) {
auto dp_change_log_ptr = reader.readChangeLogChunk();
if (!dp_change_log_ptr) {
if (!dp_change_log_ptr || dp_change_log_ptr->m_state_num > max_state_num) {
// end of the stream or the max known state number reached
break;
}

assert(dp_change_log_ptr->m_state_num != updated_state_num);
updated_state_num = dp_change_log_ptr->m_state_num;
// Elements are storage page numbers (mutated in that transaction)
for (auto storage_page_num: *dp_change_log_ptr) {
on_page_updated(storage_page_num, updated_state_num);
}
// Elements are logical page numbers (mutated in that transaction)
for (auto page_num: *dp_change_log_ptr) {
on_page_updated(page_num, updated_state_num);
}
}
}

Expand All @@ -740,13 +762,6 @@ namespace db0
result = m_file.getLastModifiedTime();
}

if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) {
// refresh underlying sparse index / diff index after DRAM update
m_sparse_pair.refresh();
}
if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) {
m_ext_space.refresh();
}
m_meta_io.refresh();
// refresh cycle complete
m_refresh_pending = false;
Expand Down Expand Up @@ -888,7 +903,9 @@ namespace db0

// assure copied streams are consistent
if (dp_header->m_state_num != max_state_num) {
THROWF(db0::IOException) << "BDevStorage::copyTo: inconsistent max_state_num in DP changelog";
THROWF(db0::IOException)
<< "BDevStorage::copyTo: inconsistent max_state_num in DP changelog: "
<< (StateNumType)(dp_header->m_state_num) << " != " << max_state_num;
}
std::uint64_t end_page_num = dp_header->m_end_storage_page_num;
// NOTE: end_page_num may be relative, need to translate to absolute
Expand Down
4 changes: 2 additions & 2 deletions src/dbzero/core/storage/ExtSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ namespace db0
}

void ExtSpace::refresh()
{
{
m_ext_space_root.detach();
if (m_rel_index) {
m_rel_index->refresh();
}
}

void ExtSpace::commit()
{
if (!!m_ext_space_root) {
Expand Down
2 changes: 1 addition & 1 deletion src/dbzero/core/storage/SparsePair.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace db0
{
std::sort(m_change_log.begin(), m_change_log.end());
ChangeLogData cl_data;
// add page numbers with deduplication
// add page numbers (logical) with deduplication
for (auto page_num : m_change_log) {
cl_data.m_rle_builder.append(page_num, false);
}
Expand Down
4 changes: 3 additions & 1 deletion src/dbzero/core/storage/copy_prefix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,14 @@ namespace db0
std::vector<char> buffer;
std::size_t chunk_size = 0;
std::uint64_t in_addr, out_addr;
for (;;) {
bool stop_copying = false;
while (!stop_copying) {
while ((chunk_size = in.readChunk(buffer, 0, &in_addr)) > 0) {
// NOTE: this buffer does NOT include the block IO header at the beginning
if (filter && !filter(buffer, buffer.data() + chunk_size)) {
// stop copying entirely
if (!copy_all) {
stop_copying = true;
break;
}
// skip this chunk only
Expand Down
Loading