Skip to content

Commit 42014d4

Browse files
wskozlowskiWojtek
andauthored
Issue/551 (#623)
* refresh related fixes * fix --------- Co-authored-by: Wojtek <wskozlowski@dbzero.io>
1 parent ec972c6 commit 42014d4

5 files changed

Lines changed: 39 additions & 20 deletions

File tree

python_tests/test_copy_prefix.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
263263

264264
p.join()
265265
total_len += obj_count * commit_count
266-
266+
267267
# make final stale copy (i.e. without active modifications)
268268
final_copy = f"./test-copy-final.db0"
269269
if os.path.exists(final_copy):
@@ -272,7 +272,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
272272
db0.close()
273273

274274
print("Validating all copies", flush=True)
275-
validate_copy("final", expected_len = total_len)
275+
validate_copy("final", expected_len = total_len)
276276
for i in range(copy_id):
277277
last_len = validate_copy(i, expected_min_len = last_len)
278278
print(f"--- Copy {i} valid with {last_len} objects", flush=True)

src/dbzero/core/storage/BDevStorage.cpp

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -699,10 +699,21 @@ namespace db0
699699
dram_changelog_io_pos = m_dram_changelog_io.getStreamPos();
700700
if (!!m_ext_space) {
701701
assert(m_ext_dram_changelog_io);
702+
m_ext_dram_changelog_io->refresh();
702703
ext_dram_state_num = m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io);
703704
ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos();
704705
}
705706

707+
if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) {
708+
// refresh underlying sparse index / diff index after DRAM update
709+
m_sparse_pair.refresh();
710+
}
711+
if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) {
712+
m_ext_space.refresh();
713+
}
714+
715+
// this is the state number to sync-up to
716+
auto max_state_num = m_sparse_pair.getMaxStateNum();
706717
// send all page-update notifications to the provided handler
707718
if (on_page_updated) {
708719
StateNumType updated_state_num = 0;
@@ -711,22 +722,33 @@ namespace db0
711722
auto reader = m_dp_changelog_io.getStreamReader();
712723
// feed the reader with all available chunks, in case of IOException the stream is getting reverted
713724
// this is to make the operation atomic
714-
while (reader.readChangeLogChunk());
725+
while (auto chunk_ptr = reader.readChangeLogChunk()) {
726+
if (chunk_ptr->m_state_num == max_state_num) {
727+
// stop at the max known state number
728+
break;
729+
}
730+
if (chunk_ptr->m_state_num > max_state_num) {
731+
// NOTE: this critical and irrecoverable error indicates corruption of the DP changelog stream
732+
THROWF(db0::InternalException) << "Inconsistent state: DP changelog state number "
733+
<< chunk_ptr->m_state_num << " exceeds max known state number " << max_state_num;
734+
}
735+
}
715736

716737
// reset to read all updates again
717738
reader.reset();
718739
for (;;) {
719740
auto dp_change_log_ptr = reader.readChangeLogChunk();
720-
if (!dp_change_log_ptr) {
741+
if (!dp_change_log_ptr || dp_change_log_ptr->m_state_num > max_state_num) {
742+
// end of the stream or the max known state number reached
721743
break;
722744
}
723745

724746
assert(dp_change_log_ptr->m_state_num != updated_state_num);
725747
updated_state_num = dp_change_log_ptr->m_state_num;
726-
// Elements are storage page numbers (mutated in that transaction)
727-
for (auto storage_page_num: *dp_change_log_ptr) {
728-
on_page_updated(storage_page_num, updated_state_num);
729-
}
748+
// Elements are logical page numbers (mutated in that transaction)
749+
for (auto page_num: *dp_change_log_ptr) {
750+
on_page_updated(page_num, updated_state_num);
751+
}
730752
}
731753
}
732754

@@ -740,13 +762,6 @@ namespace db0
740762
result = m_file.getLastModifiedTime();
741763
}
742764

743-
if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) {
744-
// refresh underlying sparse index / diff index after DRAM update
745-
m_sparse_pair.refresh();
746-
}
747-
if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) {
748-
m_ext_space.refresh();
749-
}
750765
m_meta_io.refresh();
751766
// refresh cycle complete
752767
m_refresh_pending = false;
@@ -888,7 +903,9 @@ namespace db0
888903

889904
// assure copied streams are consistent
890905
if (dp_header->m_state_num != max_state_num) {
891-
THROWF(db0::IOException) << "BDevStorage::copyTo: inconsistent max_state_num in DP changelog";
906+
THROWF(db0::IOException)
907+
<< "BDevStorage::copyTo: inconsistent max_state_num in DP changelog: "
908+
<< (StateNumType)(dp_header->m_state_num) << " != " << max_state_num;
892909
}
893910
std::uint64_t end_page_num = dp_header->m_end_storage_page_num;
894911
// NOTE: end_page_num may be relative, need to translate to absolute

src/dbzero/core/storage/ExtSpace.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ namespace db0
5050
}
5151

5252
void ExtSpace::refresh()
53-
{
53+
{
5454
m_ext_space_root.detach();
5555
if (m_rel_index) {
5656
m_rel_index->refresh();
5757
}
5858
}
59-
59+
6060
void ExtSpace::commit()
6161
{
6262
if (!!m_ext_space_root) {

src/dbzero/core/storage/SparsePair.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ namespace db0
5959
{
6060
std::sort(m_change_log.begin(), m_change_log.end());
6161
ChangeLogData cl_data;
62-
// add page numbers with deduplication
62+
// add page numbers (logical) with deduplication
6363
for (auto page_num : m_change_log) {
6464
cl_data.m_rle_builder.append(page_num, false);
6565
}

src/dbzero/core/storage/copy_prefix.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,14 @@ namespace db0
136136
std::vector<char> buffer;
137137
std::size_t chunk_size = 0;
138138
std::uint64_t in_addr, out_addr;
139-
for (;;) {
139+
bool stop_copying = false;
140+
while (!stop_copying) {
140141
while ((chunk_size = in.readChunk(buffer, 0, &in_addr)) > 0) {
141142
// NOTE: this buffer does NOT include the block IO header at the beginning
142143
if (filter && !filter(buffer, buffer.data() + chunk_size)) {
143144
// stop copying entirely
144145
if (!copy_all) {
146+
stop_copying = true;
145147
break;
146148
}
147149
// skip this chunk only

0 commit comments

Comments
 (0)