Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbzero/dbzero/dbzero.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def load_dynamic(name, path):

def __bootstrap__():
global __bootstrap__, __loader__, __file__
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"]
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"]
__file__ = None
for path in paths:
if os.path.isdir(path):
Expand Down
3 changes: 2 additions & 1 deletion python_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def db0_fixture(request):
db0.open(
"my-test-prefix",
# use custom page_io_step_size if specified in request.param
page_io_step_size=__extract_param(request, "page_io_step_size", None)
page_io_step_size=__extract_param(request, "page_io_step_size", None),
autocommit=__extract_param(request, "autocommit", True)
)
yield db0
gc.collect()
Expand Down
17 changes: 8 additions & 9 deletions python_tests/test_copy_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
epoch_count = 2
for epoch in range(epoch_count):
print(f"=== Epoch {epoch} ===")
# obj_count = 5000
# commit_count = 100
obj_count = 500
obj_count = 5000
commit_count = 100
# start the writer process for a long run
p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True))
Expand Down Expand Up @@ -327,12 +325,13 @@ def modify_prefix():
assert len(root.value) == total_len


@pytest.mark.parametrize("db0_fixture", [{"autocommit": False}], indirect=True)
def test_copy_prefix_of_recovered_copy(db0_fixture):
file_name = "./test-copy.db0"
# remove file if it exists
if os.path.exists(file_name):
os.remove(file_name)

px_name = db0.get_current_prefix().name
px_path = os.path.join(DB0_DIR, px_name + ".db0")
root = MemoTestSingleton([])
Expand All @@ -355,18 +354,18 @@ def validate(expected_len):
c = charset[i % len(charset)]
assert item.value == c * 1024
assert len(root.value) == expected_len

total_len += modify_prefix(150)
total_len += modify_prefix(5150)
db0.copy_prefix(file_name, page_io_step_size=64 << 10)
db0.close()

# drop original file and replace with copy
os.remove(px_path)
os.rename(file_name, px_path)

# open recovered prefix for update
# open recovered prefix for update
db0.init(DB0_DIR, prefix=px_name, read_write=True)
total_len += modify_prefix(100)
total_len += modify_prefix(1350)

db0.close()
db0.init(DB0_DIR, prefix=px_name, read_write=True)
Expand All @@ -377,7 +376,7 @@ def validate(expected_len):
# restore copy of a restored and modified copy
os.remove(px_path)
os.rename(file_name, px_path)

# open prefix from recovered and modified copy of a copy
db0.init(DB0_DIR, prefix=px_name, read_write=False)
validate(total_len)
3 changes: 0 additions & 3 deletions src/dbzero/bindings/python/PyInternalAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,9 +1040,6 @@ namespace db0::python
auto result = Py_OWN(tryCopyPrefixImpl(*storage, output_file_name, page_io_step_size, meta_io_step_size));
storage->close();
return result.steal();
if (!result) {
return nullptr;
}
} catch (...) {
if (storage) {
storage->close();
Expand Down
8 changes: 4 additions & 4 deletions src/dbzero/core/memory/diff_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,16 +371,16 @@ namespace db0
std::byte *dp_result, const std::byte *dp_end)
{
const std::byte *dp_in = static_cast<const std::byte *>(in_buffer);
for (auto it = diffs.begin(); it != diffs.end(); ) {
for (auto it = diffs.begin(); it != diffs.end();) {
auto diff_size = *it;
++it;
if (diff_size > 0) {
assert(dp_result + diff_size <= dp_end);
std::memcpy(dp_result, dp_in, diff_size);
dp_result += diff_size;
if (dp_result > dp_end) {
if (dp_result + diff_size > dp_end) {
THROWF(db0::IOException) << "applyDiffs: diff application exceeds buffer size";
}
std::memcpy(dp_result, dp_in, diff_size);
dp_result += diff_size;
dp_in += diff_size;
}
if (it == diffs.end()) {
Expand Down
4 changes: 2 additions & 2 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,12 @@ namespace db0

// query.first yields the full-DP (if it exists)
std::uint64_t page_io_id = query.first();
if (page_io_id) {
if (page_io_id) {
if (!!m_ext_space) {
// convert relative page number back to absolute
page_io_id = m_ext_space.getAbsolute(page_io_id);
}
// read full DP
// read full DP
m_page_io.read(page_io_id, read_buf);
} else {
// requesting a diff-DP only encoded page, use zero buffer as a base
Expand Down
30 changes: 23 additions & 7 deletions src/dbzero/core/storage/Diff_IO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ DB0_PACKED_END
{
public:
// buffer is 2 pages long
DiffReader(Page_IO &, std::uint64_t page_num, std::byte *begin, std::byte *end);
DiffReader(Page_IO &, std::uint64_t page_num, std::byte *begin, std::byte *end,
const std::function<void()> &decode_fault);

// appy diffs from a specific page / state number into a provided data buffer
// if underflow occurs then next page needs to be fetched and apply repeated
Expand All @@ -84,6 +85,7 @@ DB0_PACKED_END
std::byte const *m_end;
// the number of objects remaining to be read
unsigned int m_size = 0;
const std::function<void()> &m_decode_fault;
};

DiffWriter::DiffWriter(Page_IO &page_io, std::byte *begin, std::byte *end)
Expand Down Expand Up @@ -166,18 +168,23 @@ DB0_PACKED_END
return m_header.m_size == 0 && m_header.m_offset == 0;
}

DiffReader::DiffReader(Page_IO &page_io, std::uint64_t page_num, std::byte *begin, std::byte *end)
DiffReader::DiffReader(Page_IO &page_io, std::uint64_t page_num, std::byte *begin, std::byte *end,
const std::function<void()> &decode_fault)
: m_page_io(page_io)
, m_page_size(page_io.getPageSize())
, m_page_num(page_num)
, m_begin(begin)
, m_current(begin + m_page_size)
, m_end(end)
, m_decode_fault(decode_fault)
{
page_io.read(page_num, m_begin + m_page_size);
m_size = o_diff_header::__const_ref(m_current).m_size;
// position at the first diff block
m_current += o_diff_header::sizeOf()+ o_diff_header::__const_ref(m_current).m_offset;
m_current += o_diff_header::sizeOf() + o_diff_header::__const_ref(m_current).m_offset;
if (m_current > m_end) {
m_decode_fault();
}
}

bool DiffReader::apply(std::byte *dp_data, std::pair<std::uint64_t, std::uint32_t> page_and_state,
Expand All @@ -197,8 +204,11 @@ DB0_PACKED_END
underflow = true;
return false;
}

o_diff_buffer::__const_ref(m_current).apply(dp_data, dp_data + m_page_size);

auto &diff_buf = o_diff_buffer::__safe_const_ref(
const_bounded_buf_t(m_decode_fault, m_current, m_end)
);
diff_buf.apply(dp_data, dp_data + m_page_size);
m_current += diff_buf_size;
--m_size;
return true;
Expand Down Expand Up @@ -234,12 +244,18 @@ DB0_PACKED_END
, m_writer(std::make_unique<DiffWriter>(
reinterpret_cast<Page_IO&>(*this), m_write_buf.data(), m_write_buf.data() + m_write_buf.size())
)
, m_decode_fault([]() {
THROWF(db0::IOException) << "Diff_IO: decode fault - corrupt diff data";
})
{
}

Diff_IO::Diff_IO(std::size_t header_size, CFile &file, std::uint32_t page_size)
: Page_IO(header_size, file, page_size)
, m_read_buf(page_size * 2)
, m_decode_fault([]() {
THROWF(db0::IOException) << "Diff_IO: decode fault - corrupt diff data";
})
{
}

Expand Down Expand Up @@ -294,7 +310,7 @@ DB0_PACKED_END
{
// must lock because the read-buffer is shared
std::unique_lock<std::mutex> lock(m_mx_read);
DiffReader reader((Page_IO&)*this, page_num, m_read_buf.data(), m_read_buf.data() + m_read_buf.size());
DiffReader reader((Page_IO&)*this, page_num, m_read_buf.data(), m_read_buf.data() + m_read_buf.size(), m_decode_fault);
for (;;) {
bool underflow = false;
if (reader.apply((std::byte*)buffer, page_and_state, underflow)) {
Expand All @@ -305,7 +321,7 @@ DB0_PACKED_END
reader.loadNext();
continue;
}
THROWF(db0::InternalException) << "Diff block not found";
THROWF(db0::InternalException) << "Diff block not found";
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/dbzero/core/storage/Diff_IO.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ namespace db0
std::size_t m_full_dp_bytes_written = 0;
// total bytes written using the diff mechanism
std::size_t m_diff_bytes_written = 0;
// function throwing an exception on decode fault (corrupt diff data)
std::function<void()> m_decode_fault;
};

}
10 changes: 6 additions & 4 deletions src/dbzero/core/storage/REL_Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,15 @@ namespace db0

std::uint64_t REL_Index::assignRelative(std::uint64_t storage_page_num, bool is_first_in_step)
{
if (is_first_in_step) {
assert(storage_page_num >= m_last_storage_page_num);
// prevent adding duplicate mapping (e.g. might be called multiple times after appendDiff)
if (is_first_in_step && (storage_page_num != m_last_storage_page_num)) {
super_t::insert({ ++m_max_rel_page_num, storage_page_num });
assert(storage_page_num > m_last_storage_page_num);
m_last_storage_page_num = storage_page_num;
m_rel_page_num = m_max_rel_page_num;
}

assert(storage_page_num >= m_last_storage_page_num);
auto result = m_rel_page_num + (storage_page_num - m_last_storage_page_num);
if (result > m_max_rel_page_num) {
m_max_rel_page_num = result;
Expand All @@ -208,6 +209,9 @@ namespace db0
{
assert(storage_page_num >= m_last_storage_page_num);
assert(rel_page_num >= m_rel_page_num);

m_max_rel_page_num = rel_page_num;
m_last_storage_page_num = storage_page_num;
if (!this->empty()) {
// check if the mapping is already valid
if (storage_page_num - m_last_storage_page_num == rel_page_num - m_rel_page_num) {
Expand All @@ -218,8 +222,6 @@ namespace db0

// register the new mapping
super_t::insert({ rel_page_num, storage_page_num });
m_max_rel_page_num = rel_page_num;
m_last_storage_page_num = storage_page_num;
m_rel_page_num = rel_page_num;
}

Expand Down
6 changes: 3 additions & 3 deletions src/dbzero/core/storage/REL_Index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ DB0_PACKED_END
std::uint64_t size() const;

const_iterator cbegin() const;

private:
// values maintained in-sync with the tree
std::uint64_t m_last_storage_page_num = 0;
std::uint64_t m_rel_page_num = 0;
std::uint64_t m_last_storage_page_num = 0;
std::uint64_t m_rel_page_num = 0; // key of the last inserted item
std::uint64_t m_max_rel_page_num = 0;
};

Expand Down
7 changes: 7 additions & 0 deletions src/dbzero/core/storage/diff_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,20 @@ namespace db0
auto diff_size = o_packed_int<std::uint16_t>::read(at);
if (diff_size > 0) {
assert(dp_result + diff_size <= dp_end);
// this check prevents processing of corrupt diff data
if (dp_result + diff_size > dp_end) {
THROWF(db0::IOException) << "o_diff_buffer::apply: corrupt diff data";
}
std::memcpy(dp_result, at, diff_size);
dp_result += diff_size;
at += diff_size;
}
if (at < end) {
auto identical_size = o_packed_int<std::uint16_t>::read(at);
dp_result += identical_size;
if (dp_result > dp_end) {
THROWF(db0::IOException) << "o_diff_buffer::apply: corrupt diff data";
}
// zero-fill when the indicator is present (special 0,0 indicator)
if (!diff_size && !identical_size) {
// make sure the indicator is only present at the beginning
Expand Down
6 changes: 3 additions & 3 deletions src/dbzero/core/storage/diff_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
namespace db0

{
DB0_PACKED_BEGIN

DB0_PACKED_BEGIN
class DB0_PACKED_ATTR o_diff_buffer: public o_base<o_diff_buffer, 0, false>
{
protected:
Expand Down Expand Up @@ -41,7 +41,7 @@ DB0_PACKED_BEGIN
static std::size_t sizeOfHeader() {
return sizeof(o_diff_buffer);
}
};

};
DB0_PACKED_END

}
Loading