diff --git a/python_tests/conftest.py b/python_tests/conftest.py index 2d1d82f8..4683e49b 100644 --- a/python_tests/conftest.py +++ b/python_tests/conftest.py @@ -24,8 +24,9 @@ def __extract_param(request, key, default): @pytest.fixture() def db0_fixture(request): if 'D' in db0.build_flags(): - db0.enable_storage_validation(__extract_param(request, "storage_validation", False)) - + db0.reset_test_params() # reset to defaults + db0.enable_storage_validation(__extract_param(request, "storage_validation", False)) + if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) os.mkdir(DB0_DIR) @@ -41,14 +42,15 @@ def db0_fixture(request): yield db0 gc.collect() db0.close() - if 'D' in db0.build_flags(): - db0.enable_storage_validation(False) if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) @pytest.fixture() -def db0_no_default_fixture(): +def db0_no_default_fixture(): + if 'D' in db0.build_flags(): + db0.reset_test_params() # reset to defaults + if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) # create empty directory @@ -63,9 +65,9 @@ def db0_no_default_fixture(): @pytest.fixture def db0_slab_size(request): - """ - DB0 scope with a very short autocommit interval - """ + if 'D' in db0.build_flags(): + db0.reset_test_params() # reset to defaults + if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) # create empty directory @@ -86,8 +88,11 @@ def db0_slab_size(request): @pytest.fixture def db0_autocommit_fixture(request): """ - DB0 scope with a very short autocommit interval + dbzero scope with a very short autocommit interval """ + if 'D' in db0.build_flags(): + db0.reset_test_params() # reset to defaults + if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) # create empty directory @@ -103,9 +108,9 @@ def db0_autocommit_fixture(request): @pytest.fixture() def db0_no_autocommit(): - """ - DB0 scope with a very short autocommit interval - """ + if 'D' in db0.build_flags(): + db0.reset_test_params() # reset to defaults + if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) # create empty directory @@ -174,8 +179,8 @@ def memo_scoped_enum_tags(): @pytest.fixture() def db0_metaio_fixture(): """ - DB0 scope for testing metaio (very small step size) - """ + dbzero scope for testing metaio (very small step size) + """ if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) # create empty directory @@ -192,8 +197,11 @@ def db0_metaio_fixture(): @pytest.fixture() def db0_large_lang_cache_no_autocommit(): """ - DB0 scope for testing large language cache (no autocommit) - """ + dbzero scope for testing large language cache (no autocommit) + """ + if 'D' in db0.build_flags(): + db0.reset_test_params() # reset to defaults + if os.path.exists(DB0_DIR): shutil.rmtree(DB0_DIR) # create empty directory diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index 10b26c33..ec34dcda 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -220,8 +220,8 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): db0.close() # in each 'epoch' we modify prefix while making copies - # then drop the original prefix and restore if from the last copy - epoch_count = 2 + # then drop the original prefix and restore if from the last copy + epoch_count = 1 total_len = 0 for epoch in range(epoch_count): print(f"=== Epoch {epoch} ===", flush=True) @@ -264,10 +264,6 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): p.join() total_len += obj_count * commit_count - # validate original prefix (no copy yet) - print("Validating final prefix ...", flush=True) - validate_current_prefix(expected_len = total_len) - # make final stale copy (i.e. without active modifications) final_copy = f"./test-copy-final.db0" if os.path.exists(final_copy): @@ -382,4 +378,75 @@ def validate(expected_len): # open prefix from recovered and modified copy of a copy db0.init(DB0_DIR, prefix=px_name, read_write=False) validate(total_len) + + +def test_slow_copy(db0_fixture): + """ + Test simulating fast writer and slow copy/reader process (debug mode only) + """ + if 'D' in db0.build_flags(): + px_name = db0.get_current_prefix().name + px_path = os.path.join(DB0_DIR, px_name + ".db0") + + def validate_current_prefix(expected_len = None, expected_min_len = None): + root = db0.fetch(MemoTestSingleton) + assert not expected_min_len or len(root.value) >= expected_min_len + assert not expected_len or len(root.value) == expected_len + for item in root.value: + assert item.value == "b" * 1024 + return len(root.value) + + def validate_copy(copy_id, expected_len = None, expected_min_len = None): + file_name = f"./test-copy-{copy_id}.db0" + os.remove(px_path) + # restore the copy + os.rename(file_name, px_path) + db0.init(DB0_DIR, prefix=px_name, read_write=False) + result = validate_current_prefix(expected_len, expected_min_len) + db0.close() + return result + + db0.close() + + obj_count = 250 + commit_count = 15 + # start the writer process for a long run + p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True)) + p.start() + + db0.init(DB0_DIR) + db0.open(px_name, "r") + last_len = 0 + while True: + try: + if not db0.exists(MemoTestSingleton): + time.sleep(0.1) + continue + root = db0.fetch(MemoTestSingleton) + if len(root.value) > 1: + last_len = len(root.value) + break + except Exception: + pass + time.sleep(0.1) + + copy_id = 0 + # copy the prefix multiple times while it is being modified + db0.set_test_params(sleep_interval = 50) + while True: + if not p.is_alive(): + break + file_name = f"./test-copy-{copy_id}.db0" + if os.path.exists(file_name): + os.remove(file_name) + db0.copy_prefix(file_name, prefix=px_name) + copy_id += 1 + if not p.is_alive(): + break + + p.join() + db0.close() + + for i in range(copy_id): + last_len = validate_copy(i, expected_min_len = last_len) \ No newline at end of file diff --git a/src/dbzero/bindings/python/PyAPI.cpp b/src/dbzero/bindings/python/PyAPI.cpp index 5da082db..ed2366cb 100644 --- a/src/dbzero/bindings/python/PyAPI.cpp +++ b/src/dbzero/bindings/python/PyAPI.cpp @@ -1473,6 +1473,18 @@ namespace db0::python db0::Settings::__storage_validation = enable; Py_RETURN_NONE; } + + PyObject *PyAPI_setTestParams(PyObject *, PyObject *, PyObject *kwargs) + { + PY_API_FUNC + return runSafe(trySetTestParams, kwargs); + } + + PyObject *PyAPI_resetTestParams(PyObject *, PyObject *) + { + PY_API_FUNC + return runSafe(tryResetTestParams); + } #endif PyObject *PyAPI_assign(PyObject *, PyObject *args, PyObject *kwargs) diff --git a/src/dbzero/bindings/python/PyAPI.hpp b/src/dbzero/bindings/python/PyAPI.hpp index 618cd20a..88b6587a 100644 --- a/src/dbzero/bindings/python/PyAPI.hpp +++ b/src/dbzero/bindings/python/PyAPI.hpp @@ -185,9 +185,11 @@ namespace db0::python PyObject *getDRAM_IOMap(PyObject *, PyObject *args, PyObject *kwargs); PyObject *PyAPI_crashFromCommit(PyObject *self, PyObject *const *args, Py_ssize_t nargs); - + PyObject *PyAPI_breakpoint(PyObject *self, PyObject *const *args, Py_ssize_t nargs); PyObject *PyAPI_enableStorageValidation(PyObject *, PyObject *args, PyObject *kwargs); + PyObject *PyAPI_setTestParams(PyObject *, PyObject *args, PyObject *kwargs); + PyObject *PyAPI_resetTestParams(PyObject *, PyObject *); #endif template db0::object_model::StorageClass getStorageClass(); diff --git a/src/dbzero/bindings/python/PyInternalAPI.cpp b/src/dbzero/bindings/python/PyInternalAPI.cpp index c4b29c69..fbbddac8 100644 --- a/src/dbzero/bindings/python/PyInternalAPI.cpp +++ b/src/dbzero/bindings/python/PyInternalAPI.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -231,11 +232,11 @@ namespace db0::python { auto &class_factory = fixture->get(); // find type associated class with the ClassFactory - auto type = class_factory.getExistingType(py_type); - if (!type->isSingleton()) { + auto type = class_factory.tryGetExistingType(py_type); + if (!type || !type->isSingleton()) { return false; } - + return type->isExistingSingleton(); } @@ -1048,6 +1049,21 @@ namespace db0::python } } +#ifndef NDEBUG + PyObject *trySetTestParams(PyObject *py_dict) + { + db0::Config config(py_dict); + db0::Settings::__sleep_interval = config.get("sleep_interval", 0); + Py_RETURN_NONE; + } + + PyObject *tryResetTestParams() + { + db0::Settings::reset(); + Py_RETURN_NONE; + } +#endif + template PyObject *getMaterializedMemoObject(MemoObject *); template PyObject *getMaterializedMemoObject(MemoImmutableObject *); diff --git a/src/dbzero/bindings/python/PyInternalAPI.hpp b/src/dbzero/bindings/python/PyInternalAPI.hpp index b22390e9..cbc15e1a 100644 --- a/src/dbzero/bindings/python/PyInternalAPI.hpp +++ b/src/dbzero/bindings/python/PyInternalAPI.hpp @@ -235,6 +235,8 @@ namespace db0::python PyObject *tryGetDRAM_IOMap(const Fixture &); // opens BDevStorage and reads DRAM_IO map directly, without opening the prefix PyObject *tryGetDRAM_IOMapFromFile(const char *file_name); + PyObject *trySetTestParams(PyObject *py_dict); + PyObject *tryResetTestParams(); #endif PyObject *tryAssign(PyObject *targets, PyObject *key_values); diff --git a/src/dbzero/bindings/python/dbzero.cpp b/src/dbzero/bindings/python/dbzero.cpp index ed9b1048..6284d695 100644 --- a/src/dbzero/bindings/python/dbzero.cpp +++ b/src/dbzero/bindings/python/dbzero.cpp @@ -120,6 +120,8 @@ static PyMethodDef dbzero_methods[] = {"get_dram_io_map", (PyCFunction)&py::getDRAM_IOMap, METH_VARARGS | METH_KEYWORDS, "Get page_num -> state_num mapping related with a specific DRAM_Prefix"}, {"breakpoint", (PyCFunction)&py::PyAPI_breakpoint, METH_FASTCALL, "Testing & debugging function "}, {"enable_storage_validation", (PyCFunction)&py::PyAPI_enableStorageValidation, METH_VARARGS | METH_KEYWORDS, "Enable full storage validation for testing"}, + {"set_test_params", (PyCFunction)&py::PyAPI_setTestParams, METH_VARARGS | METH_KEYWORDS, "Test keyword parameters"}, + {"reset_test_params", (PyCFunction)&py::PyAPI_resetTestParams, METH_NOARGS, "Restore default test parameters"}, #endif {NULL} // Sentinel }; diff --git a/src/dbzero/core/memory/config.cpp b/src/dbzero/core/memory/config.cpp index 24bfacaa..1a8bc739 100644 --- a/src/dbzero/core/memory/config.cpp +++ b/src/dbzero/core/memory/config.cpp @@ -11,10 +11,20 @@ namespace db0 #ifndef NDEBUG bool Settings::__dbg_logs = false; bool Settings::__storage_validation = false; + unsigned long long Settings::__sleep_interval = 0; #endif std::function Settings::m_decode_error = []() { THROWF(db0::IOException) << "Data decoding error: corrupt data detected"; }; + + void Settings::reset() + { +#ifndef NDEBUG + __dbg_logs = false; + __storage_validation = false; + __sleep_interval = 0; +#endif + } } diff --git a/src/dbzero/core/memory/config.hpp b/src/dbzero/core/memory/config.hpp index 52e7d942..2e489d35 100644 --- a/src/dbzero/core/memory/config.hpp +++ b/src/dbzero/core/memory/config.hpp @@ -27,9 +27,14 @@ namespace db0 static bool __dbg_logs; // performs storage full read / write validation (with in-memory mirroring) static bool __storage_validation; + // sleep interval for time-sensitive tests (e.g. copy_prefix) in milliseconds + static unsigned long long __sleep_interval; #endif // Function to throw the data decoding error (i.e. corrupt data detected) static std::function m_decode_error; + + // reset all settings to default values + static void reset(); }; } diff --git a/src/dbzero/core/serialization/bounded_buf_t.hpp b/src/dbzero/core/serialization/bounded_buf_t.hpp index bfed2c1c..3f0f1709 100644 --- a/src/dbzero/core/serialization/bounded_buf_t.hpp +++ b/src/dbzero/core/serialization/bounded_buf_t.hpp @@ -32,11 +32,7 @@ namespace db0 void init(const std::vector &); - inline const std::byte *get() const - { - if (begin >= end) { - m_throw_func(); - } + inline const std::byte *get() const { return begin; } diff --git a/src/dbzero/core/serialization/packed_int.hpp b/src/dbzero/core/serialization/packed_int.hpp index 1eea0767..a6b1f29c 100644 --- a/src/dbzero/core/serialization/packed_int.hpp +++ b/src/dbzero/core/serialization/packed_int.hpp @@ -7,12 +7,14 @@ #include "Types.hpp" #include #include +#include +#include namespace db0 { -DB0_PACKED_BEGIN +DB0_PACKED_BEGIN /** * @tparam IntT - underlying encoded unsigned integer type * @tparam is_nullable flag indicating if this type can be null-ed @@ -64,6 +66,21 @@ DB0_PACKED_BEGIN } } + // read with bounds validation + static IntT read(const std::byte *&at, const std::byte *end) + { + const_bounded_buf_t safe_buf(Settings::m_decode_error, at, end); + if constexpr (is_nullable) { + auto result = decodeNullable(safe_buf); + at = safe_buf; + return result; + } else { + auto result = decode(safe_buf); + at = safe_buf; + return result; + } + } + static void write(std::byte *&at, IntT value) { auto size_of = measure(value); @@ -191,7 +208,7 @@ DB0_PACKED_BEGIN return value; } - template static IntT decodeNullable(buf_t &buf) + template static IntT decodeNullable(buf_t &buf) { // test for null value if (static_cast(*buf) == 0x7f) { @@ -201,11 +218,11 @@ DB0_PACKED_BEGIN return decode(buf); } }; - +DB0_PACKED_END + using packed_int32 = o_packed_int; using packed_int64 = o_packed_int; using nullable_packed_int32 = o_packed_int; using nullable_packed_int64 = o_packed_int; - -DB0_PACKED_END + } diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index 8897d0a4..dfeee65a 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -384,7 +384,7 @@ namespace db0 } m_page_io.write(page_io_id, write_buf); } else { - // append as new page + // append as new page bool is_first_page; auto page_io_id = m_page_io.append(write_buf, &is_first_page); if (!!m_ext_space) { diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp b/src/dbzero/core/storage/DRAM_IOStream.cpp index d1a26133..4b261031 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -11,6 +11,7 @@ #include #include "ChangeLogIOStream.hpp" #include +#include namespace db0 @@ -40,46 +41,45 @@ namespace db0 void *DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, const o_dram_chunk_header &header, StateNumType max_state_num) { - if (header.m_state_num > max_state_num) { - // ignore changes beyond the last known consistent state number - return nullptr; - } - - // page map = page_num / state_num - auto dram_page = m_page_map.find(header.m_page_num); - // NOTE: even if the same state number is encountered, the page is updated - // (the previous version might've been incomplete!!) - if (dram_page == m_page_map.end() || header.m_state_num >= dram_page->second.m_state_num) { - // update DRAM to most recent page version, page not marked as dirty - auto result = m_prefix->update(header.m_page_num, false); - if (dram_page == m_page_map.end()) { - // mark address as taken - if (allocs_ptr) { - allocs_ptr->insert(header.m_page_num * m_dram_page_size); - } - } else { - // mark previously occupied block as reusable (read/write mode only) - if (m_access_type == AccessType::READ_WRITE) { - m_reusable_chunks.insert(dram_page->second.m_address); + // NOTE: header may be invalid (i.e. copied chunk marked as invalid on copy post-processing) + // NOTE: ignore changes beyond the last known consistent state number + if (!!header && header.m_state_num <= max_state_num) { + // page map = page_num / state_num + auto dram_page = m_page_map.find(header.m_page_num); + // NOTE: even if the same state number is encountered, the page is updated + // (the previous version might've been incomplete!!) + if (dram_page == m_page_map.end() || header.m_state_num >= dram_page->second.m_state_num) { + // update DRAM to most recent page version, page not marked as dirty + auto result = m_prefix->update(header.m_page_num, false); + if (dram_page == m_page_map.end()) { + // mark address as taken + if (allocs_ptr) { + allocs_ptr->insert(header.m_page_num * m_dram_page_size); + } + } else { + // mark previously occupied block as reusable (read/write mode only) + if (m_access_type == AccessType::READ_WRITE) { + m_reusable_chunks.insert(dram_page->second.m_address); + } } - } - - // update DRAM page info - m_page_map[header.m_page_num] = { header.m_state_num, address }; - // remove address from reusables - { - auto it = m_reusable_chunks.find(address); - if (it != m_reusable_chunks.end()) { - m_reusable_chunks.erase(it); + + // update DRAM page info + m_page_map[header.m_page_num] = { header.m_state_num, address }; + // remove address from reusables + { + auto it = m_reusable_chunks.find(address); + if (it != m_reusable_chunks.end()) { + m_reusable_chunks.erase(it); + } } - } - return result; - } else { - // mark block as reusable (read/write mode only) - if (m_access_type == AccessType::READ_WRITE) { - m_reusable_chunks.insert(address); + return result; } } + + // mark block as reusable (read/write mode only) + if (m_access_type == AccessType::READ_WRITE) { + m_reusable_chunks.insert(address); + } return nullptr; } @@ -232,7 +232,7 @@ namespace db0 BlockIOStream::flush(); // output changelog, no RLE encoding, no duplicates ChangeLogData cl_data(std::move(dram_changelog), false, false, false); - dram_changelog_io.appendChangeLog(std::move(cl_data), state_num); + dram_changelog_io.appendChangeLog(std::move(cl_data), state_num); } #ifndef NDEBUG @@ -375,6 +375,14 @@ namespace db0 // it may come from a more recent update as well (and potentially may only be partially written) // therefore chunk-level checksum validation is necessary dram_io.readFromChunk(address, buffer.data(), buffer.size()); + +#ifndef NDEBUG + // Optional sleep for time-sensitive tests (e.g. copy_prefix) + if (db0::Settings::__sleep_interval > 0) { + std::this_thread::sleep_for( + std::chrono::milliseconds(db0::Settings::__sleep_interval)); + } +#endif } change_log_ptr = changelog_io.readChangeLogChunk(); } @@ -405,19 +413,25 @@ namespace db0 const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf()); return isDRAM_ChunkValid(dram_page_size, header, header.getData(), buffer.data() + buffer.size()); } - + + StateNumType getDRAM_ChunkStateNum(const std::vector &chunk_data) + { + const auto &header = o_dram_chunk_header::__const_ref(chunk_data.data() + o_block_io_chunk_header::sizeOf()); + return header.m_state_num; + } + void flushDRAM_IOChanges(DRAM_IOStream &dram_io, const std::unordered_map > &chunks_buf) { auto dram_page_size = dram_io.getDRAMPrefix().getPageSize(); - for (const auto &item: chunks_buf) { + for (const auto &item: chunks_buf) { auto address = item.first; const auto &buffer = item.second; // NOTE: we don't flush inconsistent / incomplete chunks if (!isDRAM_ChunkValid(dram_page_size, buffer)) { continue; } - dram_io.writeToChunk(address, buffer.data(), buffer.size()); + dram_io.writeToChunk(address, buffer.data(), buffer.size()); } } @@ -428,13 +442,20 @@ namespace db0 if (!isDRAM_ChunkValid(dram_page_size, buffer)) { continue; } - dram_io.addChunk(buffer.size()); - dram_io.appendToChunk(buffer.data(), buffer.size()); + // NOTE: buffer already includes BlockIOStream's chunk header + auto chunk_size = buffer.size() - o_block_io_chunk_header::sizeOf(); + auto chunk_data = buffer.data() + o_block_io_chunk_header::sizeOf(); + dram_io.addChunk(chunk_size); + dram_io.appendToChunk(chunk_data, chunk_size); } } - + std::uint64_t o_dram_chunk_header::calculateHash(const void *data, std::size_t data_size) const { return db0::murmurhash64A(data, data_size, m_page_num + m_state_num); } + bool o_dram_chunk_header::operator!() const { + return m_state_num == 0 && m_page_num == 0 && m_hash == 0; + } + } \ No newline at end of file diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index b58a1805..8d45627b 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -35,6 +35,7 @@ DB0_PACKED_BEGIN StateNumType m_state_num = 0; std::uint64_t m_page_num = 0; + // as invalid o_dram_chunk_header() = default; o_dram_chunk_header(StateNumType state_num, std::uint64_t page_num = 0) : m_state_num(state_num) @@ -42,6 +43,8 @@ DB0_PACKED_BEGIN { } + bool operator!() const; + // Calculate hash from the entire block's data (including header) std::uint64_t calculateHash(const void *data, std::size_t data_size) const; @@ -194,6 +197,9 @@ DB0_PACKED_END bool isDRAM_ChunkValid(std::uint32_t dram_page_size, const o_dram_chunk_header &header, const void *data_begin, const void *data_end); + // Extract state number from a valid DRAM chunk + StateNumType getDRAM_ChunkStateNum(const std::vector &chunk_data); + // Pre-fetch changes into the chunks buffer // @param callback optional function to be called for each changelog chunk read // @return the latest state number of available changes diff --git a/src/dbzero/core/storage/Page_IO.cpp b/src/dbzero/core/storage/Page_IO.cpp index d7d44b44..2d468b1f 100644 --- a/src/dbzero/core/storage/Page_IO.cpp +++ b/src/dbzero/core/storage/Page_IO.cpp @@ -207,7 +207,7 @@ namespace db0 std::optional end_page_num) : m_page_io(page_io) , m_step_it(ext_space) - , m_end_page_num(std::min(end_page_num.value_or(std::numeric_limits::max()), page_io.getEndPageNum())) + , m_end_page_num(std::min(end_page_num.value_or(std::numeric_limits::max()), endPageNum())) , m_current_page_num(getFirstPageNum(ext_space)) { } @@ -258,6 +258,7 @@ namespace db0 std::uint64_t Page_IO::Reader::endPageNum() const { // calculate end page number from actual file size + m_page_io.m_file.refresh(); auto file_size = m_page_io.m_file.size(); if (file_size < m_page_io.m_header_size) { return 0; diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index e36c3e8a..a1312cd8 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -31,6 +31,25 @@ namespace db0 return result; } + std::unordered_map > filterDRAM_Chunks( + std::unordered_map > &&chunk_buf, DRAM_FilterT filter) + { + auto it = chunk_buf.begin(); + while (it != chunk_buf.end()) { + // NOTE: this buffer also contains the block IO header at the beginning + const auto &buffer = it->second; + const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf()); + if (!filter(header, buffer.data() + buffer.size())) { + // discard this chunk + it = chunk_buf.erase(it); + } else { + ++it; + } + } + + return chunk_buf; + } + void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog) { @@ -38,7 +57,7 @@ namespace db0 // Exhaust the input_dram_changelog first // NOTE: we don't need to copy the changelog, just insert an empty item with the latest state number - input_dram_changelog.setStreamPosHead(); + input_dram_changelog.setStreamPosHead(); for (;;) { while (input_dram_changelog.readChangeLogChunk()); // continue refreshing until reaching the most recent state @@ -46,11 +65,35 @@ namespace db0 break; } } - + + auto last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk(); + if (!last_chunk_ptr) { + // looks like the DRAM IO is empty + return; + } + + // retrieve the state number candidate + auto state_num = last_chunk_ptr->m_state_num; + // Copy the entire DRAM_IO stream next (possibly inconsistent state) // collecting the mapping of chunk addresses + // NOTE: when copying we ignore: a) incomplete pages (hash mismatch), b) pages beyond the last consistent state number + // they will be processed later when following up with the changelog std::unordered_map chunk_addr_map; - copyStream(input_io, output_io, &chunk_addr_map); + auto dram_page_size = input_io.getDRAMPageSize(); + auto dram_filter = [&](const o_dram_chunk_header &header, const void *data_end) + { + if (!isDRAM_ChunkValid(dram_page_size, header, header.getData(), data_end)) { + return false; + } + // reject chunks beyond the last consistent state number + if (header.m_state_num > state_num) { + return false; + } + return true; + }; + + copyStream(input_io, output_io, &chunk_addr_map, dram_filter); // Chunks loaded during the sync step // NOTE: in this step we prefetch to memory to be able to catch up with changes @@ -58,26 +101,28 @@ namespace db0 while (input_dram_changelog.refresh()) { fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf); } - auto last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk(); - if (!last_chunk_ptr) { - // looks like the DRAM IO is empty - return; - } - + + last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk(); + assert(last_chunk_ptr); + // this is the actually copied last consistent state number - auto state_num = last_chunk_ptr->m_state_num; - + state_num = last_chunk_ptr->m_state_num; + + // NOTE: at this stage we might also encounter incomplete + // or new chunks beyond the copied stream which needs to be discarded + chunk_buf = filterDRAM_Chunks(std::move(chunk_buf), dram_filter); // NOTE: flush must be done under translated addresses (or appended to stream if translation not present) auto bufs_pair = translateDRAM_Chunks(std::move(chunk_buf), chunk_addr_map); flushDRAM_IOChanges(output_io, bufs_pair.first); // append new chuks which were not present during the initial copy - appendDRAM_IOChunks(output_io, bufs_pair.second); + appendDRAM_IOChunks(output_io, bufs_pair.second); // append the sentinel entry with state number only (i.e. empty changelog) output_dram_changelog.appendChangeLog({}, state_num); + output_io.close(); } - std::vector copyStream(BlockIOStream &in, BlockIOStream &out, - std::unordered_map *addr_map) + std::vector copyStream(BlockIOStream &in, BlockIOStream &out, + std::unordered_map *addr_map, DRAM_FilterT filter) { // position at the beginning of the stream in.setStreamPosHead(); @@ -86,12 +131,26 @@ namespace db0 std::uint64_t in_addr, out_addr; for (;;) { while ((chunk_size = in.readChunk(buffer, 0, &in_addr)) > 0) { + // NOTE: this buffer does NOT include the block IO header at the beginning + const auto &header = o_dram_chunk_header::__const_ref(buffer.data()); + if (filter && !filter(header, buffer.data() + buffer.size())) { + // skip this chunk + continue; + } + out.addChunk(chunk_size, &out_addr); out.appendToChunk(buffer.data(), chunk_size); // register the mapping if (addr_map) { addr_map->emplace(in_addr, out_addr); } + +#ifndef NDEBUG + if (db0::Settings::__sleep_interval > 0) { + std::this_thread::sleep_for( + std::chrono::milliseconds(db0::Settings::__sleep_interval)); + } +#endif } if (!in.refresh()) { // continue refreshing until reaching the most recent state @@ -150,7 +209,7 @@ namespace db0 THROWF(db0::IOException) << "copyPageIO: page size mismatch between input and output streams"; } - Page_IO::Reader reader(in, src_ext_space, end_page_num); + Page_IO::Reader reader(in, src_ext_space, end_page_num); std::vector buffer; std::uint64_t start_page_num = 0; while (auto page_count = reader.next(buffer, start_page_num)) { @@ -163,7 +222,7 @@ namespace db0 // page number (absolute) in the output stream auto storage_page_num = out.getNextPageNum().first; auto count = std::min(page_count, out.getCurrentStepRemainingPages()); - // append as many pages as possible in the current "step" + // append as many pages as possible in the current "step" out.append(buf_ptr, count); buf_ptr += page_size * count; // note start_page_num must be registered as relative to storage_page_num @@ -173,7 +232,7 @@ namespace db0 page_count -= count; start_page_num += count; } - } + } } } \ No newline at end of file diff --git a/src/dbzero/core/storage/copy_prefix.hpp b/src/dbzero/core/storage/copy_prefix.hpp index a79866df..53171944 100644 --- a/src/dbzero/core/storage/copy_prefix.hpp +++ b/src/dbzero/core/storage/copy_prefix.hpp @@ -25,11 +25,15 @@ namespace db0 void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog); + using DRAM_FilterT = std::function; + // Copy entire contents from one BlockIOStream to another (type agnostic) // @param addr_map optional map to receive address translation (from source to destination) + // @param chunk_filter optional filter to decide whether a specific chunk is to be copied // @return the last copied chunk data std::vector copyStream(BlockIOStream &in, BlockIOStream &out, - std::unordered_map *addr_map = nullptr); + std::unordered_map *addr_map = nullptr, + DRAM_FilterT chunk_filter = {}); // DP-changelog specialization // @return the last chunk's header (if anything copied) diff --git a/src/dbzero/core/storage/diff_buffer.cpp b/src/dbzero/core/storage/diff_buffer.cpp index 2c002eab..0a649f61 100644 --- a/src/dbzero/core/storage/diff_buffer.cpp +++ b/src/dbzero/core/storage/diff_buffer.cpp @@ -58,7 +58,7 @@ namespace db0 const std::byte *at = (std::byte*)this + sizeof(o_diff_buffer); auto end = (std::byte*)this + m_size; while (at < end) { - auto diff_size = o_packed_int::read(at); + auto diff_size = o_packed_int::read(at, end); if (diff_size > 0) { assert(dp_result + diff_size <= dp_end); // this check prevents processing of corrupt diff data @@ -70,7 +70,7 @@ namespace db0 at += diff_size; } if (at < end) { - auto identical_size = o_packed_int::read(at); + auto identical_size = o_packed_int::read(at, end); dp_result += identical_size; if (dp_result > dp_end) { THROWF(db0::IOException) << "o_diff_buffer::apply: corrupt diff data"; diff --git a/src/dbzero/workspace/Config.cpp b/src/dbzero/workspace/Config.cpp index d267e11f..da5f0b35 100644 --- a/src/dbzero/workspace/Config.cpp +++ b/src/dbzero/workspace/Config.cpp @@ -33,8 +33,8 @@ namespace db0 } return LangToolkit::getLong(lang_dict, key); } - - // long specialization + + // unsigned long long specialization template <> std::optional get( typename LangToolkit::ObjectPtr lang_dict, const std::string &key) {