Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions python_tests/test_copy_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,13 @@ def writer_process(prefix, obj_count = 50, commit_count = 50, long_run = False):
root.value.append(MemoTestClass("b" * 1024)) # 1 KB string
db0.commit()
if long_run:
print(f"Writer process: committed {i * obj_count} objects")
print(f"Writer process: committed {(i + 1) * obj_count} objects", flush=True)
else:
time.sleep(0.1)

if long_run:
print(db0.get_storage_stats())
db0.commit()
db0.close()
print(db0.get_storage_stats(), flush=True)
db0.close()


def test_copy_prefix_being_actively_modified(db0_fixture):
Expand Down Expand Up @@ -200,7 +199,7 @@ def test_copy_prefix_continuous_process(db0_fixture):

def validate_current_prefix(expected_len = None, expected_min_len = None):
root = db0.fetch(MemoTestSingleton)
assert not expected_min_len or len(root.value) > expected_min_len
assert not expected_min_len or len(root.value) >= expected_min_len
assert not expected_len or len(root.value) == expected_len
for item in root.value:
assert item.value == "b" * 1024
Expand All @@ -223,6 +222,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
# in each 'epoch' we modify prefix while making copies
# then drop the original prefix and restore it from the last copy
epoch_count = 2
total_len = 0
for epoch in range(epoch_count):
print(f"=== Epoch {epoch} ===")
obj_count = 5000
Expand Down Expand Up @@ -253,7 +253,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
if os.path.exists(file_name):
os.remove(file_name)
# copy prefix without opening it, use default step size
print("--- Copying prefix iteration", copy_id)
print("--- Copying prefix iteration", copy_id)
db0.copy_prefix(file_name, prefix=px_name)
print("--- copy finished")
copy_id += 1
Expand All @@ -262,11 +262,11 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
time.sleep(2.5) # wait a bit before next copy

p.join()
total_len += obj_count * commit_count

# validate original prefix (no copy yet)
print("Validating final prefix ...")
db0.open(px_name, "r")
validate_current_prefix(expected_len = obj_count * commit_count)
# print("Validating final prefix ...", flush=True)
# validate_current_prefix(expected_len = total_len)

# make final stale copy (i.e. without active modifications)
final_copy = f"./test-copy-final.db0"
Expand All @@ -276,10 +276,12 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
db0.close()

print("Validating all copies")
validate_copy("final", expected_len = obj_count * commit_count)
validate_copy("final", expected_len = total_len)
for i in range(copy_id):
last_len = validate_copy(i, expected_min_len = last_len)
print(f"--- Copy {i} valid with {last_len} objects")
# this is the restored version
total_len = last_len

# now, continue modifications starting from the last restored copy (making new copies)

Expand Down Expand Up @@ -364,9 +366,9 @@ def validate(expected_len):
os.rename(file_name, px_path)

# open recovered prefix for update
db0.init(DB0_DIR, prefix=px_name, read_write=True)
db0.init(DB0_DIR, prefix=px_name, read_write=True)
total_len += modify_prefix(1350)

db0.close()
db0.init(DB0_DIR, prefix=px_name, read_write=True)
validate(total_len)
Expand Down
78 changes: 78 additions & 0 deletions python_tests/test_issues_14.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (c) 2025 DBZero Software sp. z o.o.

import dbzero as db0
import pytest
from .conftest import DB0_DIR
from .memo_test_types import MemoTestSingleton, MemoTestClass
import multiprocessing
import os
import time


def writer_process(prefix, obj_count = 50, commit_count = 50):
    """
    Populate the root singleton of `prefix` with 1 KB test objects.

    Opens `prefix` in "rw" mode, appends `commit_count` batches of `obj_count`
    objects to the root list (one db0.commit() per batch), then issues a final
    commit and closes db0.
    """
    db0.init(DB0_DIR)
    db0.open(prefix, "rw")
    # the root object is either created now or resolved to an already existing one
    root = MemoTestSingleton([])
    already_populated = len(root.value) > 0
    if already_populated:
        print(f"Writer process: opened existing prefix with {len(root.value)} objects")

    for _batch in range(commit_count):
        remaining = obj_count
        while remaining > 0:
            # each appended object carries a 1 KB string
            root.value.append(MemoTestClass("b" * 1024))
            remaining -= 1
        # one commit per batch
        db0.commit()

    db0.commit()
    db0.close()


# Regression test: populate the current prefix with writer_process, take a copy of it with
# db0.copy_prefix, swap the copy in place of the original prefix file and validate its contents.
# The db0_fixture is parametrized indirectly with {"autocommit": False}.
# NOTE(review): leading indentation is not visible in this view, so the nesting of the statements
# below (in particular whether the copy / validate steps belong inside the epoch loop) must be
# confirmed against the real file before restructuring this test.
@pytest.mark.parametrize("db0_fixture", [{"autocommit": False}], indirect=True)
def test_copy_prefix_issue1(db0_fixture):
"""
Issue: test failing with RuntimeError: Diff block not found
"""
px_name = db0.get_current_prefix().name
# location of the prefix file under DB0_DIR (prefix name + ".db0")
px_path = os.path.join(DB0_DIR, px_name + ".db0")

# Validates the root singleton of the currently opened prefix and returns the length of its list.
# A falsy expected_len / expected_min_len (None or 0) disables the corresponding length check.
# NOTE(review): the minimum-length check is strict (">"); the equivalent helper in
# test_copy_prefix.py uses ">=" - confirm which one is intended here.
def validate_current_prefix(expected_len = None, expected_min_len = None):
root = db0.fetch(MemoTestSingleton)
assert not expected_min_len or len(root.value) > expected_min_len
assert not expected_len or len(root.value) == expected_len
# every stored object must still hold the 1 KB payload
for item in root.value:
assert item.value == "b" * 1024
return len(root.value)

# Replaces the prefix file with ./test-copy-{copy_id}.db0, opens it with read_write=False,
# validates it and returns the number of objects found.
def validate_copy(copy_id, expected_len = None, expected_min_len = None):
file_name = f"./test-copy-{copy_id}.db0"
os.remove(px_path)
# restore the copy
os.rename(file_name, px_path)

db0.init(DB0_DIR, prefix=px_name, read_write=False)
result = validate_current_prefix(expected_len, expected_min_len)
db0.close()
return result

# close db0 before writer_process initializes it again (presumably it was opened by db0_fixture)
db0.close()

# in each 'epoch' we modify prefix while making copies
# then drop the original prefix and restore it from the last copy
epoch_count = 2
# number of objects expected in the prefix after all writes so far
total_len = 0
for _ in range(epoch_count):
obj_count = 30
commit_count = 3
# called directly (synchronously), so the writer has finished before the copy is taken
writer_process(px_name, obj_count, commit_count)
total_len += obj_count * commit_count

db0.init(DB0_DIR)
db0.open(px_name, "r")

# make final stale copy (i.e. without active modifications)
final_copy = f"./test-copy-final.db0"
# remove a leftover copy file, if any, before copying
if os.path.exists(final_copy):
os.remove(final_copy)
db0.copy_prefix(final_copy, prefix=px_name)
db0.close()

# the copy must contain exactly the objects written so far
validate_copy("final", expected_len = total_len)
5 changes: 5 additions & 0 deletions src/dbzero/core/memory/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) 2025 DBZero Software sp. z o.o.

#include "config.hpp"
#include <dbzero/core/exception/Exceptions.hpp>

namespace db0

Expand All @@ -12,4 +13,8 @@ namespace db0
bool Settings::__storage_validation = false;
#endif

// Initial value of the shared decode-error callback: raises db0::IOException
// (via THROWF) reporting that corrupt data was detected while decoding.
std::function<void()> Settings::m_decode_error = []() {
THROWF(db0::IOException) << "Data decoding error: corrupt data detected";
};

}
3 changes: 3 additions & 0 deletions src/dbzero/core/memory/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <cstdint>
#include <cstddef>
#include <functional>

namespace db0

Expand All @@ -27,6 +28,8 @@ namespace db0
// performs storage full read / write validation (with in-memory mirroring)
static bool __storage_validation;
#endif
// Function to throw the data decoding error (i.e. corrupt data detected)
static std::function<void()> m_decode_error;
};

}
11 changes: 11 additions & 0 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ namespace db0
if (m_access_type == AccessType::READ_ONLY && !m_flags.test(StorageOptions::NO_LOAD)) {
refresh();
}

// Validate state consistency
// The state number reported by DRAM IO must match the one in the DP changelog IO
if (auto chunk_ptr = m_dp_changelog_io.getLastChangeLogChunk()) {
auto dp_state_num = chunk_ptr->m_state_num;
auto dram_state_num = m_sparse_pair.getMaxStateNum();
if (dram_state_num != dp_state_num) {
THROWF(db0::IOException) << "Inconsistent state: DRAM IO max state number " << dram_state_num
<< " does not match DP changelog last state number " << dp_state_num;
}
}
}

BDevStorage::~BDevStorage()
Expand Down
4 changes: 4 additions & 0 deletions src/dbzero/core/storage/BlockIOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,4 +557,8 @@ namespace db0
}
}

// Buffer-less readChunk() overload: the base stream does not support it
// and always raises db0::InternalException.
std::size_t BlockIOStream::readChunk() {
THROWF(db0::InternalException) << "BlockIOStream::readChunk() operation not supported" << THROWF_END;
}

}
9 changes: 7 additions & 2 deletions src/dbzero/core/storage/BlockIOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ DB0_PACKED_END

BlockIOStream(const BlockIOStream &) = delete;

~BlockIOStream();
virtual ~BlockIOStream();

/**
* Add a new chunk with a specific header
Expand All @@ -121,7 +121,12 @@ DB0_PACKED_END
* @param address if not null, the absolute address of the chunk is stored here
* @return the number of bytes read or 0 if EOF
*/
std::size_t readChunk(std::vector<char> &buffer, std::size_t expected_size = 0, std::uint64_t *address = nullptr);
virtual std::size_t readChunk(std::vector<char> &buffer, std::size_t expected_size = 0,
std::uint64_t *address = nullptr);

// Read the next chunk into the internal buffer (where available)
// The default implementation throws
virtual std::size_t readChunk();

/**
* Refresh method re-reads the tail block from disk.
Expand Down
32 changes: 28 additions & 4 deletions src/dbzero/core/storage/ChangeLogIOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,36 @@ namespace db0
template <typename o_change_log_t>
const o_change_log_t *ChangeLogIOStream<o_change_log_t>::readChangeLogChunk(std::vector<char> &buffer)
{
if (BlockIOStream::readChunk(buffer)) {
m_last_change_log_ptr = &o_change_log_t::__const_ref(buffer.data());
return m_last_change_log_ptr;
} else {
if (!this->readChunk(buffer)) {
return nullptr;
}
return m_last_change_log_ptr;
}

// Read the next chunk through BlockIOStream::readChunk and, when something was read,
// keep a copy of it in m_buffer and point m_last_change_log_ptr at that copy.
// @return the value returned by BlockIOStream::readChunk (0 leaves the last change-log pointer untouched)
template <typename o_change_log_t>
std::size_t ChangeLogIOStream<o_change_log_t>::readChunk(std::vector<char> &buffer, std::size_t expected_size,
    std::uint64_t *address)
{
    const auto bytes_read = BlockIOStream::readChunk(buffer, expected_size, address);
    if (!bytes_read) {
        return bytes_read;
    }

    // mirror the caller's buffer locally, unless the caller handed us m_buffer itself
    if (buffer.data() != m_buffer.data()) {
        m_buffer.assign(buffer.begin(), buffer.end());
    }

    // take the reference through a bounds-validated view of the local buffer;
    // Settings::m_decode_error is passed as the fault callback
    auto *first = reinterpret_cast<std::byte*>(m_buffer.data());
    auto *last = reinterpret_cast<std::byte*>(m_buffer.data() + m_buffer.size());
    const_bounded_buf_t const_buf(Settings::m_decode_error, first, last);
    m_last_change_log_ptr = &o_change_log_t::__safe_const_ref(const_buf);
    return bytes_read;
}

// Buffer-less overload: reads the next chunk into the stream's own m_buffer by
// forwarding to the buffer-taking readChunk overload and returns its result.
template <typename o_change_log_t>
std::size_t ChangeLogIOStream<o_change_log_t>::readChunk() {
return this->readChunk(m_buffer);
}

template <typename o_change_log_t>
Expand Down
8 changes: 7 additions & 1 deletion src/dbzero/core/storage/ChangeLogIOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ namespace db0
const o_change_log_t *readChangeLogChunk();

// Read chunk, bring your own buffer
const o_change_log_t *readChangeLogChunk(std::vector<char> &buffer);
// @return pointer to the change-log chunk or nullptr if EOF (from an internal buffer)
const o_change_log_t *readChangeLogChunk(std::vector<char> &buffer);

std::size_t readChunk(std::vector<char> &buffer, std::size_t expected_size = 0,
std::uint64_t *address = nullptr) override;

std::size_t readChunk() override;

/**
* Get last read or written change log chunk
Expand Down
25 changes: 8 additions & 17 deletions src/dbzero/core/storage/Diff_IO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <dbzero/core/serialization/packed_int_pair.hpp>
#include <dbzero/core/exception/Exceptions.hpp>
#include <dbzero/core/compiler_attributes.hpp>
#include <dbzero/core/memory/config.hpp>

namespace db0

Expand Down Expand Up @@ -65,8 +66,7 @@ DB0_PACKED_END
{
public:
// buffer is 2 pages long
DiffReader(Page_IO &, std::uint64_t page_num, std::byte *begin, std::byte *end,
const std::function<void()> &decode_fault);
DiffReader(Page_IO &, std::uint64_t page_num, std::byte *begin, std::byte *end);

// apply diffs from a specific page / state number into a provided data buffer
// if underflow occurs then next page needs to be fetched and apply repeated
Expand All @@ -84,8 +84,7 @@ DB0_PACKED_END
const std::byte *m_current;
std::byte const *m_end;
// the number of objects remaining to be read
unsigned int m_size = 0;
const std::function<void()> &m_decode_fault;
unsigned int m_size = 0;
};

DiffWriter::DiffWriter(Page_IO &page_io, std::byte *begin, std::byte *end)
Expand Down Expand Up @@ -168,22 +167,20 @@ DB0_PACKED_END
return m_header.m_size == 0 && m_header.m_offset == 0;
}

DiffReader::DiffReader(Page_IO &page_io, std::uint64_t page_num, std::byte *begin, std::byte *end,
const std::function<void()> &decode_fault)
DiffReader::DiffReader(Page_IO &page_io, std::uint64_t page_num, std::byte *begin, std::byte *end)
: m_page_io(page_io)
, m_page_size(page_io.getPageSize())
, m_page_num(page_num)
, m_begin(begin)
, m_current(begin + m_page_size)
, m_end(end)
, m_decode_fault(decode_fault)
, m_end(end)
{
page_io.read(page_num, m_begin + m_page_size);
m_size = o_diff_header::__const_ref(m_current).m_size;
// position at the first diff block
m_current += o_diff_header::sizeOf() + o_diff_header::__const_ref(m_current).m_offset;
if (m_current > m_end) {
m_decode_fault();
Settings::m_decode_error();
}
}

Expand All @@ -206,7 +203,7 @@ DB0_PACKED_END
}

auto &diff_buf = o_diff_buffer::__safe_const_ref(
const_bounded_buf_t(m_decode_fault, m_current, m_end)
const_bounded_buf_t(Settings::m_decode_error, m_current, m_end)
);
diff_buf.apply(dp_data, dp_data + m_page_size);
m_current += diff_buf_size;
Expand Down Expand Up @@ -244,18 +241,12 @@ DB0_PACKED_END
, m_writer(std::make_unique<DiffWriter>(
reinterpret_cast<Page_IO&>(*this), m_write_buf.data(), m_write_buf.data() + m_write_buf.size())
)
, m_decode_fault([]() {
THROWF(db0::IOException) << "Diff_IO: decode fault - corrupt diff data";
})
{
}

Diff_IO::Diff_IO(std::size_t header_size, CFile &file, std::uint32_t page_size)
: Page_IO(header_size, file, page_size)
, m_read_buf(page_size * 2)
, m_decode_fault([]() {
THROWF(db0::IOException) << "Diff_IO: decode fault - corrupt diff data";
})
{
}

Expand Down Expand Up @@ -310,7 +301,7 @@ DB0_PACKED_END
{
// must lock because the read-buffer is shared
std::unique_lock<std::mutex> lock(m_mx_read);
DiffReader reader((Page_IO&)*this, page_num, m_read_buf.data(), m_read_buf.data() + m_read_buf.size(), m_decode_fault);
DiffReader reader((Page_IO&)*this, page_num, m_read_buf.data(), m_read_buf.data() + m_read_buf.size());
for (;;) {
bool underflow = false;
if (reader.apply((std::byte*)buffer, page_and_state, underflow)) {
Expand Down
Loading
Loading