diff --git a/README.md b/README.md index cc63fab0..84d82d92 100644 --- a/README.md +++ b/README.md @@ -292,11 +292,11 @@ Configure dbzero during initialization: ```python db0.init( - storage_dir="/path/to/data", + dbzero_root="/path/to/data", config={ 'autocommit': True, - 'autocommit_interval': 1000, # milliseconds - 'cache_size': 1024 * 1024 * 100 # 100MB + 'autocommit_interval': 367, # milliseconds + 'cache_size': 8 << 30 # 8GiB } ) ``` diff --git a/dbzero/dbzero/initialization.py b/dbzero/dbzero/initialization.py index c285e8c3..1cc16ac9 100644 --- a/dbzero/dbzero/initialization.py +++ b/dbzero/dbzero/initialization.py @@ -24,7 +24,7 @@ def init(dbzero_root: str, **kwargs) -> None: Configure global dbzero behavior: * autocommit (bool, default True) to enable automatic commits - * autocommit_interval (int, default 250) for commit interval in milliseconds + * autocommit_interval (int, default 367) for commit interval in milliseconds * cache_size (int, default 2 GiB) for main object cache size in bytes * lang_cache_size (int, default 1024) for language model data cache size * lock_flags (dict) to configure locking behavior when opening the prefix in read-write mode diff --git a/python_tests/test_autocommit.py b/python_tests/test_autocommit.py index 12a93cab..53a68d28 100644 --- a/python_tests/test_autocommit.py +++ b/python_tests/test_autocommit.py @@ -12,9 +12,10 @@ def test_db0_starts_autocommit_by_default(db0_fixture): object_1 = MemoTestClass(951) + commit_interval = db0.get_config()['autocommit_interval'] state_1 = db0.get_state_num() - # auto-commit should happen no later than within 250ms - time.sleep(0.3) + # wait as long as autocommit interval + 100ms margin + time.sleep(commit_interval / 1000.0 + 0.1) state_2 = db0.get_state_num() # state changed due to autocommit assert state_2 > state_1 @@ -136,7 +137,7 @@ def test_list_items_append(db0_autocommit_fixture): def test_autocommit_config(db0_fixture): cfg = db0.get_config() assert cfg['autocommit'] == True 
- assert cfg['autocommit_interval'] == 250 + default_interval = cfg['autocommit_interval'] db0.close() db0.init(DB0_DIR, autocommit=False, autocommit_interval=1000) @@ -148,7 +149,7 @@ def test_autocommit_config(db0_fixture): db0.init(DB0_DIR, autocommit=False) cfg = db0.get_config() assert cfg['autocommit'] == False - assert cfg['autocommit_interval'] == 250 + assert cfg['autocommit_interval'] == default_interval db0.close() with pytest.raises(Exception): diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index ec34dcda..dc352354 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -221,7 +221,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): # in each 'epoch' we modify prefix while making copies # then drop the original prefix and restore if from the last copy - epoch_count = 1 + epoch_count = 3 total_len = 0 for epoch in range(epoch_count): print(f"=== Epoch {epoch} ===", flush=True) diff --git a/python_tests/test_crash_recovery.py b/python_tests/test_crash_recovery.py index fc7e99a6..057cfc37 100644 --- a/python_tests/test_crash_recovery.py +++ b/python_tests/test_crash_recovery.py @@ -26,14 +26,18 @@ def generator_process(op_size, op_count, crash_after): db0.init(DB0_DIR, autocommit=False) db0.open(px_name, "rw") root = MemoTestSingleton() - next_id = len(root.value) + next_id = len(root.value) for i in range(op_count): for _ in range(op_size): root.value.append(MemoTestClass(next_id)) root.value_2.append(expected_values[next_id]) next_id += 1 - if crash_after is not None and i == crash_after: - db0.dbg_crash_from_commit(3) + if crash_after is not None and i == crash_after[0]: + # activate the write poison to simulate a crash + if crash_after[1]: + db0.set_test_params(dram_io_flush_poison = crash_after[1]) + else: + db0.set_test_params(write_poison = 3) db0.commit() # NOTE: db0.close is not called if the process crashes db0.close() @@ -43,18 +47,20 @@ def 
generator_process(op_size, op_count, crash_after): db0.close() # NOTE: the 2nd process will crash during the 3rd commit - crash_after = [None, 3, None] + # the 3rd process will be killed during DRAM IO flush + crash_after = [None, (3, None), (2, 2), None] op_size = 50 op_count = 5 - for i in range(3): - p = multiprocessing.Process(target=generator_process, args=(op_size, op_count, crash_after[i])) + for poison in crash_after: + p = multiprocessing.Process(target=generator_process, args=(op_size, op_count, poison)) p.start() p.join() db0.init(DB0_DIR) db0.open(px_name, "r") - # NOTE: we expect the 2 transactions to be discarded - expected_len = op_size * op_count * 3 - (2 * op_size) + # NOTE: we expect the 2 cycles to be fully completed + # and 2 to be partially completed + expected_len = op_size * op_count * 2 + (5 * op_size) assert len(MemoTestSingleton().value) == expected_len assert len(MemoTestSingleton().value_2) == expected_len diff --git a/python_tests/test_durability.py b/python_tests/test_durability.py index 03aa9adf..b7c7fbb7 100644 --- a/python_tests/test_durability.py +++ b/python_tests/test_durability.py @@ -80,8 +80,8 @@ def open_prefix_then_crash(): # end process with exception before commit / close raise Exception("Crash!") -def test_opening_prefix_of_crashed_process(db0_no_default_fixture): +def test_opening_prefix_of_crashed_process(db0_no_default_fixture): p = multiprocessing.Process(target=open_prefix_then_crash) p.start() p.join() @@ -92,7 +92,6 @@ def test_opening_prefix_of_crashed_process(db0_no_default_fixture): def test_modify_prefix_of_crashed_process(db0_no_default_fixture): - p = multiprocessing.Process(target=open_prefix_then_crash) p.start() p.join() @@ -102,12 +101,14 @@ def test_modify_prefix_of_crashed_process(db0_no_default_fixture): db0.tags(MemoTestClass(123)).add("tag1", "tag2") db0.commit() + def rand_string(max_len): import random import string actual_len = random.randint(1, max_len) return 
''.join(random.choice(string.ascii_letters) for i in range(actual_len)) + def create_objects(append_count=1000): db0.open("new-prefix-1") buf = db0.list() @@ -123,23 +124,6 @@ def create_objects(append_count=1000): db0.commit() db0.close() -# def test_durability_of_random_objects_issue1(db0_no_default_fixture): -# """ -# This test was failing with an exception when opening the prefix: -# BDevStorage::findMutation: page_num 0 not found, state: 10 -# Resolution: the problem was related to DirtyCache assuming page_size as pow 2 while DRAM_Prefix page size is not -# """ -# append_count = 1000 - -# p = multiprocessing.Process(target=create_objects, args=(append_count,)) -# p.start() -# p.join() - -# # read the created objects -# db0.open("new-prefix-1", "r") -# root = MemoTestSingleton() -# assert len(root.value) == append_count - def test_dump_dram_io_map(db0_fixture): if 'D' in db0.build_flags(): diff --git a/src/dbzero/bindings/python/PyAPI.cpp b/src/dbzero/bindings/python/PyAPI.cpp index ae083662..a188db63 100644 --- a/src/dbzero/bindings/python/PyAPI.cpp +++ b/src/dbzero/bindings/python/PyAPI.cpp @@ -1424,30 +1424,7 @@ namespace db0::python db0::Settings::__dbg_logs = true; Py_RETURN_NONE; } - - PyObject *tryCrashFromCommit(unsigned int op_count) - { - PyToolkit::getPyWorkspace().getWorkspace().setCrashFromCommit(op_count); - Py_RETURN_NONE; - } - - PyObject *PyAPI_crashFromCommit(PyObject *self, PyObject *const *args, Py_ssize_t nargs) - { - PY_API_FUNC - if (nargs != 1) { - PyErr_SetString(PyExc_TypeError, "Function requires exactly 1 argument"); - return NULL; - } - - if (!PyLong_Check(args[0])) { - PyErr_SetString(PyExc_TypeError, "Argument must be an integer"); - return NULL; - } - - auto op_count = PyLong_AsUnsignedLong(args[0]); - return runSafe(tryCrashFromCommit, op_count); - } - + PyObject *PyAPI_breakpoint(PyObject *self, PyObject *const *args, Py_ssize_t nargs) { PY_API_FUNC diff --git a/src/dbzero/bindings/python/PyAPI.hpp 
b/src/dbzero/bindings/python/PyAPI.hpp index 88b6587a..9bd28229 100644 --- a/src/dbzero/bindings/python/PyAPI.hpp +++ b/src/dbzero/bindings/python/PyAPI.hpp @@ -183,9 +183,7 @@ namespace db0::python // For a specific prefix, extract page num -> state num mapping related with its DRAM_Prefix PyObject *getDRAM_IOMap(PyObject *, PyObject *args, PyObject *kwargs); - - PyObject *PyAPI_crashFromCommit(PyObject *self, PyObject *const *args, Py_ssize_t nargs); - + PyObject *PyAPI_breakpoint(PyObject *self, PyObject *const *args, Py_ssize_t nargs); PyObject *PyAPI_enableStorageValidation(PyObject *, PyObject *args, PyObject *kwargs); PyObject *PyAPI_setTestParams(PyObject *, PyObject *args, PyObject *kwargs); diff --git a/src/dbzero/bindings/python/PyInternalAPI.cpp b/src/dbzero/bindings/python/PyInternalAPI.cpp index fbbddac8..c52a9e7c 100644 --- a/src/dbzero/bindings/python/PyInternalAPI.cpp +++ b/src/dbzero/bindings/python/PyInternalAPI.cpp @@ -1053,10 +1053,18 @@ namespace db0::python PyObject *trySetTestParams(PyObject *py_dict) { db0::Config config(py_dict); - db0::Settings::__sleep_interval = config.get("sleep_interval", 0); + if (config.hasKey("sleep_interval")) { + db0::Settings::__sleep_interval = config.get("sleep_interval", 0); + } + if (config.hasKey("write_poison")) { + db0::Settings::__write_poison = config.get("write_poison", 0); + } + if (config.hasKey("dram_io_flush_poison")) { + db0::Settings::__dram_io_flush_poison = config.get("dram_io_flush_poison", 0); + } Py_RETURN_NONE; } - + PyObject *tryResetTestParams() { db0::Settings::reset(); diff --git a/src/dbzero/bindings/python/PyToolkit.cpp b/src/dbzero/bindings/python/PyToolkit.cpp index e4d4b5bc..46fb1e22 100644 --- a/src/dbzero/bindings/python/PyToolkit.cpp +++ b/src/dbzero/bindings/python/PyToolkit.cpp @@ -671,6 +671,20 @@ namespace db0::python } return PyLong_AsUnsignedLongLong(*py_value); } + + std::optional PyToolkit::getUnsignedInt(ObjectPtr py_object, const std::string &key) + { + auto py_value = 
Py_OWN(getValue(py_object, key)); + if (!py_value) { + return std::nullopt; + } + + if (!PyLong_Check(*py_value)) { + THROWF(db0::InputException) << "Invalid type of: " << key << ". Integer expected but got: " + << Py_TYPE(*py_value)->tp_name << THROWF_END; + } + return PyLong_AsUnsignedLong(*py_value); + } std::optional PyToolkit::getBool(ObjectPtr py_object, const std::string &key) { @@ -696,7 +710,14 @@ namespace db0::python return std::string(PyUnicode_AsUTF8(*py_value)); } - bool PyToolkit::compare(ObjectPtr py_object1, ObjectPtr py_object2) { + bool PyToolkit::hasKey(ObjectPtr py_object, const std::string &key) + { + auto py_value = Py_OWN(getValue(py_object, key)); + return py_value.get() != nullptr; + } + + bool PyToolkit::compare(ObjectPtr py_object1, ObjectPtr py_object2) + { auto result = PyObject_RichCompareBool(py_object1, py_object2, Py_EQ); if (result < 0) { // comparison failed diff --git a/src/dbzero/bindings/python/PyToolkit.hpp b/src/dbzero/bindings/python/PyToolkit.hpp index c7a891cc..5c4fb65d 100644 --- a/src/dbzero/bindings/python/PyToolkit.hpp +++ b/src/dbzero/bindings/python/PyToolkit.hpp @@ -234,8 +234,11 @@ namespace db0::python // Extract keys (if present) from a Python dict object static std::optional getLong(ObjectPtr py_object, const std::string &key); static std::optional getUnsignedLongLong(ObjectPtr py_object, const std::string &key); + static std::optional getUnsignedInt(ObjectPtr py_object, const std::string &key); static std::optional getBool(ObjectPtr py_object, const std::string &key); static std::optional getString(ObjectPtr py_object, const std::string &key); + // Check if key exists in a Python dict object + static bool hasKey(ObjectPtr py_object, const std::string &key); // Blocks until lock acquired static SafeRLock lockApi(); diff --git a/src/dbzero/bindings/python/dbzero.cpp b/src/dbzero/bindings/python/dbzero.cpp index 6284d695..c2b98c9c 100644 --- a/src/dbzero/bindings/python/dbzero.cpp +++ 
b/src/dbzero/bindings/python/dbzero.cpp @@ -114,8 +114,7 @@ static PyMethodDef dbzero_methods[] = {"dbg_write_bytes", &py::writeBytes, METH_VARARGS, "Debug function"}, {"dbg_free_bytes", &py::freeBytes, METH_VARARGS, "Debug function"}, {"dbg_read_bytes", &py::readBytes, METH_VARARGS, "Debug function"}, - {"dbg_start_logs", &py::PyAPI_startDebugLogs, METH_VARARGS, "Enable dbzeo debug logs"}, - {"dbg_crash_from_commit", (PyCFunction)&py::PyAPI_crashFromCommit, METH_FASTCALL, "The function activates abrupt process terminate from storage::commit after specific number of DP writes (for testing purposes)"}, + {"dbg_start_logs", &py::PyAPI_startDebugLogs, METH_VARARGS, "Enable dbzeo debug logs"}, {"get_base_lock_usage", &py::getResourceLockUsage, METH_VARARGS, "Debug function, retrieves total memory occupied by ResourceLocks"}, {"get_dram_io_map", (PyCFunction)&py::getDRAM_IOMap, METH_VARARGS | METH_KEYWORDS, "Get page_num -> state_num mapping related with a specific DRAM_Prefix"}, {"breakpoint", (PyCFunction)&py::PyAPI_breakpoint, METH_FASTCALL, "Testing & debugging function "}, diff --git a/src/dbzero/core/memory/config.cpp b/src/dbzero/core/memory/config.cpp index 1a8bc739..48465a84 100644 --- a/src/dbzero/core/memory/config.cpp +++ b/src/dbzero/core/memory/config.cpp @@ -12,6 +12,8 @@ namespace db0 bool Settings::__dbg_logs = false; bool Settings::__storage_validation = false; unsigned long long Settings::__sleep_interval = 0; + unsigned int Settings::__write_poison = 0; + unsigned int Settings::__dram_io_flush_poison = 0; #endif std::function Settings::m_decode_error = []() { @@ -24,6 +26,8 @@ namespace db0 __dbg_logs = false; __storage_validation = false; __sleep_interval = 0; + __write_poison = 0; + __dram_io_flush_poison = 0; #endif } diff --git a/src/dbzero/core/memory/config.hpp b/src/dbzero/core/memory/config.hpp index 2e489d35..903e50c3 100644 --- a/src/dbzero/core/memory/config.hpp +++ b/src/dbzero/core/memory/config.hpp @@ -29,6 +29,10 @@ namespace db0 
static bool __storage_validation; // sleep interval for time-sensitive tests (e.g. copy_prefix) in milliseconds static unsigned long long __sleep_interval; + // the number of allowed writes before std::abort (or 0 = disabled) + static unsigned int __write_poison; + // the number of allowed DRAM_IO flush operations before std::abort (or 0 = disabled) + static unsigned int __dram_io_flush_poison; #endif // Function to throw the data decoding error (i.e. corrupt data detected) static std::function m_decode_error; diff --git a/src/dbzero/core/memory/utils.cpp b/src/dbzero/core/memory/utils.cpp index 5d22a0ed..0d174723 100644 --- a/src/dbzero/core/memory/utils.cpp +++ b/src/dbzero/core/memory/utils.cpp @@ -48,5 +48,14 @@ namespace db0 } std::cout << std::endl; } + + void checkPoisonedOp(unsigned int &counter) + { + if (counter > 0) { + if (--counter == 0) { + std::abort(); + } + } + } } \ No newline at end of file diff --git a/src/dbzero/core/memory/utils.hpp b/src/dbzero/core/memory/utils.hpp index e3cdac15..264979ca 100644 --- a/src/dbzero/core/memory/utils.hpp +++ b/src/dbzero/core/memory/utils.hpp @@ -88,4 +88,7 @@ namespace db0 return {}; } + // std::abort or decrement the counter (nothing if counter is 0) + void checkPoisonedOp(unsigned int &counter); + } diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index dfeee65a..dc1b9b90 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include "copy_prefix.hpp" namespace db0 @@ -57,11 +58,13 @@ namespace db0 , m_ext_dram_changelog_io(tryGetChangeLogIOStream( m_config.m_ext_dram_changelog_io_offset, access_type) ) - , m_ext_dram_io(init(tryGetDRAMIOStream( + , m_ext_dram_io(initExt(tryGetDRAMIOStream( m_config.m_ext_dram_io_offset, m_config.m_ext_dram_page_size, access_type), m_ext_dram_changelog_io.get(), // NOTE: the NO_LOAD flag is not applicable to ext DRAM IO since it's 
created on-demand - flags & ~ StorageFlags {StorageOptions::NO_LOAD }) + flags & ~ StorageFlags {StorageOptions::NO_LOAD }, + // NOTE: we synchronize up to the maximum state number from DRAM IO (in read/write mode) + this->getMaxExtStateNum()) ) , m_ext_space(tryGetDRAMPair(m_ext_dram_io.get()), access_type) , m_page_io(getPage_IO(getNextStoragePageNum(), m_config.m_page_io_step_size)) @@ -103,12 +106,12 @@ namespace db0 return std::move(dram_io); } - std::unique_ptr BDevStorage::init(std::unique_ptr &&dram_io, - DRAM_ChangeLogStreamT *dram_change_log, StorageFlags flags) + std::unique_ptr BDevStorage::initExt(std::unique_ptr &&dram_io, + DRAM_ChangeLogStreamT *dram_change_log, StorageFlags flags, std::optional max_state_num) { if (dram_io && !flags[StorageOptions::NO_LOAD]) { assert(dram_change_log); - dram_io->load(*dram_change_log); + dram_io->load(*dram_change_log, max_state_num); } return std::move(dram_io); } @@ -395,7 +398,7 @@ namespace db0 m_sparse_index.emplace(page_num, state_num, page_io_id); #ifndef NDEBUG m_page_io_raw_bytes += m_config.m_page_size; - checkCrashFromCommit(); + checkPoisonedOp(Settings::__write_poison); #endif } } @@ -435,7 +438,7 @@ namespace db0 #ifndef NDEBUG m_page_io_raw_bytes += m_config.m_page_size; - checkCrashFromCommit(); + checkPoisonedOp(Settings::__write_poison); #endif #ifndef NDEBUG @@ -791,24 +794,10 @@ namespace db0 void BDevStorage::getDRAM_IOMap(std::unordered_map &io_map) const { m_dram_io.getDRAM_IOMap(io_map); } - + void BDevStorage::dramIOCheck(std::vector &check_result) const { m_dram_io.dramIOCheck(check_result); } - - void BDevStorage::setCrashFromCommit(unsigned int *throw_op_count_ptr) { - this->m_throw_op_count_ptr = throw_op_count_ptr; - } - - void BDevStorage::checkCrashFromCommit() - { - if (m_throw_op_count_ptr && *m_throw_op_count_ptr > 0) { - if (!--(*m_throw_op_count_ptr)) { - // terminate the process abruptly - std::terminate(); - } - } - } #endif void BDevStorage::fetchDP_ChangeLogs(StateNumType 
begin_state, std::optional end_state, @@ -884,20 +873,34 @@ namespace db0 } auto writer = out.m_dram_changelog_io.getStreamWriter(); - copyDRAM_IO(m_dram_io, m_dram_changelog_io, out.m_dram_io, writer); - auto dp_header = copyDPStream(m_dp_changelog_io, out.m_dp_changelog_io); - StateNumType max_state_num = 0; - if (dp_header) { - max_state_num = dp_header->m_state_num; - std::uint64_t end_page_num = dp_header->m_end_storage_page_num; - // NOTE: end_page_num may be relative, need to translate to absolute - if (!!m_ext_space) { - end_page_num = m_ext_space.getAbsolute(end_page_num); - } - copyPageIO(m_page_io, m_ext_space, out.m_page_io, end_page_num, out.m_ext_space); + auto maybe_max_state_num = copyDRAM_IO(m_dram_io, m_dram_changelog_io, out.m_dram_io, writer); + if (!maybe_max_state_num) { + // nothing to copy + return; } + + auto max_state_num = *maybe_max_state_num; + // copy up to the max_state_num (inclusive) + auto dp_header = copyDPStream(m_dp_changelog_io, out.m_dp_changelog_io, max_state_num); + if (!dp_header) { + THROWF(db0::IOException) << "BDevStorage::copyTo: failed to copy DP changelog"; + } + + // assure copied streams are consistent + if (dp_header->m_state_num != max_state_num) { + THROWF(db0::IOException) << "BDevStorage::copyTo: inconsistent max_state_num in DP changelog"; + } + std::uint64_t end_page_num = dp_header->m_end_storage_page_num; + // NOTE: end_page_num may be relative, need to translate to absolute + if (!!m_ext_space) { + end_page_num = m_ext_space.getAbsolute(end_page_num); + } + copyPageIO(m_page_io, m_ext_space, out.m_page_io, end_page_num, out.m_ext_space); - copyStream(m_meta_io, out.m_meta_io); + // NOTE: meta_io stream can't be copied since its structure depends on the managed streams + // NOTE: for simplicity we don't generate the entire meta-io, just save the last checkpoint + out.m_meta_io.checkAndAppend(max_state_num); + // flush ext-space only, the other streams are already flushed by copy operators // NOTE: we need 
to use max state num from the source storage since the desination // did not load the sparse index (it was only copied) @@ -928,4 +931,14 @@ namespace db0 return page_io_id; } + std::optional BDevStorage::getMaxExtStateNum() const + { + if (m_access_type == AccessType::READ_ONLY) { + // no synchronization required in read-only mode + return std::nullopt; + } + // synchronize to the same state as the DRAM IO + return getMaxStateNum(); + } + } \ No newline at end of file diff --git a/src/dbzero/core/storage/BDevStorage.hpp b/src/dbzero/core/storage/BDevStorage.hpp index f545285e..d0106056 100644 --- a/src/dbzero/core/storage/BDevStorage.hpp +++ b/src/dbzero/core/storage/BDevStorage.hpp @@ -153,13 +153,11 @@ DB0_PACKED_END // Copy a read-only prefix to an empty BDevStorage void copyTo(BDevStorage &); - + #ifndef NDEBUG void getDRAM_IOMap(std::unordered_map &) const override; - void dramIOCheck(std::vector &) const override; - void setCrashFromCommit(unsigned int *throw_op_count_ptr) override; - - void checkCrashFromCommit(); + void dramIOCheck(std::vector &) const override; + // write into the validation buffer only void writeForValidation(std::uint64_t address, StateNumType state_num, std::size_t size, void *buffer); #endif @@ -206,7 +204,8 @@ DB0_PACKED_END #endif static DRAM_IOStream init(DRAM_IOStream &&, DRAM_ChangeLogStreamT &, StorageFlags); - static std::unique_ptr init(std::unique_ptr &&, DRAM_ChangeLogStreamT *, StorageFlags); + static std::unique_ptr initExt(std::unique_ptr &&, DRAM_ChangeLogStreamT *, + StorageFlags, std::optional max_state_num); static MetaIOStream init(MetaIOStream &&, StorageFlags); @@ -269,6 +268,9 @@ DB0_PACKED_END // Flush ext-space streams only (if existing) bool flushExt(StateNumType max_state_num); void fsync(); + + // Synchronization state number for ext-space + std::optional getMaxExtStateNum() const; }; } \ No newline at end of file diff --git a/src/dbzero/core/storage/BaseStorage.cpp b/src/dbzero/core/storage/BaseStorage.cpp 
index 9696ba65..027f6567 100644 --- a/src/dbzero/core/storage/BaseStorage.cpp +++ b/src/dbzero/core/storage/BaseStorage.cpp @@ -51,10 +51,6 @@ namespace db0 void BaseStorage::dramIOCheck(std::vector &) const { } - - void BaseStorage::setCrashFromCommit(unsigned int *) - { - } #endif void BaseStorage::beginCommit() { diff --git a/src/dbzero/core/storage/BaseStorage.hpp b/src/dbzero/core/storage/BaseStorage.hpp index fc1aabdc..2d2afbf3 100644 --- a/src/dbzero/core/storage/BaseStorage.hpp +++ b/src/dbzero/core/storage/BaseStorage.hpp @@ -150,10 +150,7 @@ namespace db0 }; virtual void getDRAM_IOMap(std::unordered_map &) const; - virtual void dramIOCheck(std::vector &) const; - - // Activate throw from commit after specific number of operations (for testing purposes) - virtual void setCrashFromCommit(unsigned int *op_count_ref); + virtual void dramIOCheck(std::vector &) const; #endif protected: diff --git a/src/dbzero/core/storage/BlockIOStream.cpp b/src/dbzero/core/storage/BlockIOStream.cpp index 7942a63e..7dfa28df 100644 --- a/src/dbzero/core/storage/BlockIOStream.cpp +++ b/src/dbzero/core/storage/BlockIOStream.cpp @@ -376,12 +376,15 @@ namespace db0 return false; } - void BlockIOStream::flush() + void BlockIOStream::flush(bool no_fsync) { // flush modified block to disk if (flushModified()) { assert(m_access_type == AccessType::READ_WRITE); m_file.flush(); + if (!no_fsync) { + m_file.fsync(); + } } } diff --git a/src/dbzero/core/storage/BlockIOStream.hpp b/src/dbzero/core/storage/BlockIOStream.hpp index 61167403..0ba500ac 100644 --- a/src/dbzero/core/storage/BlockIOStream.hpp +++ b/src/dbzero/core/storage/BlockIOStream.hpp @@ -135,9 +135,10 @@ DB0_PACKED_END * @return true if the tail block's contents changed */ bool refresh(); - - void flush(); - + + // @param no_fsync if true then skip fsync after flush (required only in rare cases) + void flush(bool no_fsync = true); + void close(); /** diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp 
b/src/dbzero/core/storage/DRAM_IOStream.cpp index 4b261031..cae3ede0 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -12,6 +12,7 @@ #include "ChangeLogIOStream.hpp" #include #include +#include namespace db0 @@ -23,7 +24,7 @@ namespace db0 , m_dram_page_size(dram_page_size) , m_chunk_size(dram_page_size + o_dram_chunk_header::sizeOf()) , m_prefix(std::make_shared(m_dram_page_size)) - , m_allocator(std::make_shared(m_dram_page_size)) + , m_allocator(std::make_shared(m_dram_page_size)) { } @@ -34,10 +35,20 @@ namespace db0 , m_reusable_chunks(std::move(other.m_reusable_chunks)) , m_page_map(std::move(other.m_page_map)) , m_prefix(other.m_prefix) - , m_allocator(other.m_allocator) + , m_allocator(other.m_allocator) { } + void DRAM_IOStream::trashDRAMPage(std::uint64_t address) + { + assert(m_access_type == AccessType::READ_WRITE); + // mark as reusable + m_reusable_chunks.insert(address); + auto raw_block = getTrashDRAMPage(); + writeToChunk(address, raw_block.data(), raw_block.size()); + ++m_rand_ops; + } + void *DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, const o_dram_chunk_header &header, StateNumType max_state_num) { @@ -92,7 +103,7 @@ namespace db0 } } - void DRAM_IOStream::load(DRAM_ChangeLogStreamT &changelog_io) + void DRAM_IOStream::load(DRAM_ChangeLogStreamT &changelog_io, std::optional max_state_num) { // Exhaust the change-log stream first and retrieve the last valid state number // its position marks the synchronization point @@ -104,12 +115,14 @@ namespace db0 auto last_chunk_ptr = changelog_io.getLastChangeLogChunk(); if (!last_chunk_ptr) { - // no changes to load + // no data to load return; } - - // The last known consistent state number - auto max_state_num = last_chunk_ptr->m_state_num; + + // The last known consistent state number (unless explicitly provided) + if (!max_state_num) { + max_state_num = last_chunk_ptr->m_state_num; + } std::unordered_set 
allocs; for (;;) { auto block_id = tellBlock(); @@ -125,11 +138,21 @@ namespace db0 } // NOTE: ignore invalid or incomplete DRAM chunks - // this is fine since filesystem writes are not assumed atomic - this chunk is too fresh to be included - if (!isDRAM_ChunkValid(m_dram_page_size, header, bytes, buffer.data() + buffer.size())) { + // this does not automatically indicate any error - it may occur since filesystem writes are not assumed atomic + // - this chunk might simply be too fresh to be included + // NOTE: also pages from future (abruptly terminated) transactions are reverted + if (!isDRAM_ChunkValid(m_dram_page_size, header, bytes, buffer.data() + buffer.size()) + || header.m_state_num > *max_state_num) + { + // overwrite the page to prevent it from being included in the future + // this is only permitted in read/write mode !! + if (m_access_type == AccessType::READ_WRITE) { + trashDRAMPage(chunk_addr); + } continue; } - updateDRAMPage(chunk_addr, &allocs, header, bytes, max_state_num); + + updateDRAMPage(chunk_addr, &allocs, header, bytes, *max_state_num); } m_allocator->update(allocs); } @@ -226,6 +249,13 @@ namespace db0 // update to the last known page location, collect previous location as reusable update_page_location(page_num, chunk_addr); } +#ifndef NDEBUG + if (Settings::__dram_io_flush_poison == 1) { + // flush / fsync before poisoned op (to purposefully corrupt data) + BlockIOStream::flush(false); + } + checkPoisonedOp(Settings::__dram_io_flush_poison); +#endif }); // flush all DRAM data updates before changelog updates @@ -322,6 +352,15 @@ namespace db0 BlockIOStream::close(); } + std::vector DRAM_IOStream::getTrashDRAMPage() const + { + std::vector raw_block; + auto buffer = prepareChunk(m_chunk_size, raw_block); + // initialize header as invalid + o_dram_chunk_header::__new(buffer); + return raw_block; + } + #ifndef NDEBUG void DRAM_IOStream::getDRAM_IOMap(std::unordered_map > &io_map) const { @@ -404,7 +443,7 @@ namespace db0 THROWF(db0::IOException) 
<< "isDRAM_ChunkValid: invalid chunk size"; } // determine if valid by comparing header hash values (calculated vs stored) - return header.calculateHash(data_begin, dram_page_size) == header.m_hash; + return !!header && header.calculateHash(data_begin, dram_page_size) == header.m_hash; } bool isDRAM_ChunkValid(std::uint32_t dram_page_size, const std::vector &buffer) @@ -457,5 +496,5 @@ namespace db0 bool o_dram_chunk_header::operator!() const { return m_state_num == 0 && m_page_num == 0 && m_hash == 0; } - + } \ No newline at end of file diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index 8d45627b..994f0456 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -156,8 +156,9 @@ DB0_PACKED_END /** * Exhaust the entire change-log (to mark synchronization point) * then load entire contents from stream into the DRAM Storage + * @param max_state_num optional state number to sync up to */ - void load(DRAM_ChangeLogStreamT &); + void load(DRAM_ChangeLogStreamT &, std::optional max_state_num = std::nullopt); std::size_t getChunkSize() const { return m_chunk_size; @@ -186,9 +187,13 @@ DB0_PACKED_END void updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num); + // Overwrite invalid or corrupted DRAM page with null data + void trashDRAMPage(std::uint64_t address); + // the number of random write operations performed while flushing updates std::uint64_t m_rand_ops = 0; + std::vector getTrashDRAMPage() const; std::ostream &dumpPageMap(std::ostream &os) const; }; diff --git a/src/dbzero/core/storage/MetaIOStream.hpp b/src/dbzero/core/storage/MetaIOStream.hpp index ae241ef8..7d4a1cda 100644 --- a/src/dbzero/core/storage/MetaIOStream.hpp +++ b/src/dbzero/core/storage/MetaIOStream.hpp @@ -64,7 +64,7 @@ DB0_PACKED_END std::uint32_t block_size, std::function tail_function = {}, 
AccessType = AccessType::READ_WRITE, bool maintain_checksums = false, std::size_t step_size = 16 << 20); - // Check the underlying managed streams and append the meta log if needed (step size is reached) + // Check the underlying managed streams and append the meta log if needed (i.e. if step size is reached) void checkAndAppend(StateNumType state_num); /** diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index a1312cd8..e866ae0c 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -50,7 +50,7 @@ namespace db0 return chunk_buf; } - void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, + std::optional copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog) { using DRAM_ChangeLogT = DRAM_IOStream::DRAM_ChangeLogT; @@ -69,7 +69,7 @@ namespace db0 auto last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk(); if (!last_chunk_ptr) { // looks like the DRAM IO is empty - return; + return {}; } // retrieve the state number candidate @@ -81,7 +81,7 @@ namespace db0 // they will be processed later when following up with the changelog std::unordered_map chunk_addr_map; auto dram_page_size = input_io.getDRAMPageSize(); - auto dram_filter = [&](const o_dram_chunk_header &header, const void *data_end) + auto dram_filter = [&](const o_dram_chunk_header &header, const void *data_end) -> bool { if (!isDRAM_ChunkValid(dram_page_size, header, header.getData(), data_end)) { return false; @@ -93,7 +93,13 @@ namespace db0 return true; }; - copyStream(input_io, output_io, &chunk_addr_map, dram_filter); + auto chunk_filter = [&](const std::vector &buffer, const void *data_end) -> bool + { + const auto &header = o_dram_chunk_header::__const_ref(buffer.data()); + return dram_filter(header, data_end); + }; + + copyStream(input_io, output_io, 
&chunk_addr_map, chunk_filter); // Chunks loaded during the sync step // NOTE: in this step we prefetch to memory to be able to catch up with changes @@ -119,10 +125,11 @@ namespace db0 // append the sentinel entry with state number only (i.e. empty changelog) output_dram_changelog.appendChangeLog({}, state_num); output_io.close(); + return state_num; } std::vector copyStream(BlockIOStream &in, BlockIOStream &out, - std::unordered_map *addr_map, DRAM_FilterT filter) + std::unordered_map *addr_map, ChunkFilterT filter, bool copy_all) { // position at the beginning of the stream in.setStreamPosHead(); @@ -131,10 +138,13 @@ namespace db0 std::uint64_t in_addr, out_addr; for (;;) { while ((chunk_size = in.readChunk(buffer, 0, &in_addr)) > 0) { - // NOTE: this buffer does NOT include the block IO header at the beginning - const auto &header = o_dram_chunk_header::__const_ref(buffer.data()); - if (filter && !filter(header, buffer.data() + buffer.size())) { - // skip this chunk + // NOTE: this buffer does NOT include the block IO header at the beginning + if (filter && !filter(buffer, buffer.data() + chunk_size)) { + // stop copying entirely + if (!copy_all) { + break; + } + // skip this chunk only continue; } @@ -149,8 +159,8 @@ namespace db0 if (db0::Settings::__sleep_interval > 0) { std::this_thread::sleep_for( std::chrono::milliseconds(db0::Settings::__sleep_interval)); - } -#endif + } +#endif } if (!in.refresh()) { // continue refreshing until reaching the most recent state @@ -161,9 +171,28 @@ namespace db0 return buffer; } - std::optional copyDPStream(DP_ChangeLogStreamT &in, DP_ChangeLogStreamT &out) + std::optional copyDPStream(DP_ChangeLogStreamT &in, DP_ChangeLogStreamT &out, + StateNumType max_state_num) { - auto last_chunk_buf = copyStream(in, out); + using DP_ChangeLogT = DP_ChangeLogStreamT::ChangeLogT; + auto chunk_filter = [&](const std::vector &buffer, const void *data_end) -> bool + { + const auto &header = DP_ChangeLogT::__const_ref(buffer.data()); + 
// only include chunks up to max_state_num + if (header.m_state_num == max_state_num) { + // NOTE: this is the last chunk, we include it and stop further copying + auto chunk_size = (char*)data_end - buffer.data(); + out.addChunk(chunk_size); + out.appendToChunk(buffer.data(), chunk_size); + return false; + } + + return header.m_state_num < max_state_num; + }; + + // NOTE: we use copy_all = false to stop on the first non-matching chunk + // since chunks are ordered by state number + auto last_chunk_buf = copyStream(in, out, nullptr, chunk_filter, false); // we can retrieve the end page number from the last appended chunk if (last_chunk_buf.empty()) { // nothing copied diff --git a/src/dbzero/core/storage/copy_prefix.hpp b/src/dbzero/core/storage/copy_prefix.hpp index 53171944..173acf4c 100644 --- a/src/dbzero/core/storage/copy_prefix.hpp +++ b/src/dbzero/core/storage/copy_prefix.hpp @@ -15,29 +15,35 @@ namespace db0 { - class ExtSpace; + class ExtSpace; using DRAM_ChangeLogStreamT = db0::ChangeLogIOStream; using DP_ChangeLogStreamT = db0::ChangeLogIOStream; // This routine copies the entire DRAM_IO stream (from begin) in a manner // synchronized with the correspoding changelog stream // NOTE: output_changelog is NOT flushed (see the design) - void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, + // @return the final copied state number (unless nothing was copied - then std::nullopt) + std::optional copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog); + using ChunkFilterT = std::function &chunk_buffer, const void *data_end)>; using DRAM_FilterT = std::function; // Copy entire contents from one BlockIOStream to another (type agnostic) // @param addr_map optional map to receive address translation (from source to destination) // @param chunk_filter optional filter to decide whether a specific chunk is to be copied + // 
@param copy_all if true then a copy is attempted for every chunk, otherwise copying stops at the first non-matching chunk; + // this parameter only makes sense when chunk_filter is provided // @return the last copied chunk data - std::vector copyStream(BlockIOStream &in, BlockIOStream &out, + std::vector copyStream(BlockIOStream &in, BlockIOStream &out, std::unordered_map *addr_map = nullptr, - DRAM_FilterT chunk_filter = {}); + ChunkFilterT chunk_filter = {}, bool copy_all = true); // DP-changelog specialization + // @param max_state_num the maximum state number to be copied (inclusive) // @return the last chunk's header (if anything copied) - std::optional copyDPStream(DP_ChangeLogStreamT &in, DP_ChangeLogStreamT &out); + std::optional copyDPStream(DP_ChangeLogStreamT &in, DP_ChangeLogStreamT &out, + StateNumType max_state_num); // Copy raw contents of a specific Page_IO up to a specific storage page number // @param in the input (source) Page_IO (must NOT define ext-space - i.e. absolute / relative mapping) diff --git a/src/dbzero/workspace/Config.cpp b/src/dbzero/workspace/Config.cpp index da5f0b35..03c2b238 100644 --- a/src/dbzero/workspace/Config.cpp +++ b/src/dbzero/workspace/Config.cpp @@ -25,6 +25,10 @@ namespace db0 return m_lang_config; } + bool Config::hasKey(const std::string &key) const { + return LangToolkit::hasKey(m_lang_config.get(), key); + } + // long specialization template <> std::optional get(typename LangToolkit::ObjectPtr lang_dict, const std::string &key) { @@ -43,6 +47,16 @@ namespace db0 } return LangToolkit::getUnsignedLongLong(lang_dict, key); } + + // unsigned int specialization + template <> std::optional get( + typename LangToolkit::ObjectPtr lang_dict, const std::string &key) + { + if (!lang_dict) { + return std::nullopt; + } + return LangToolkit::getUnsignedInt(lang_dict, key); + } // bool specialization template <> std::optional get(typename LangToolkit::ObjectPtr lang_dict, const std::string &key) diff --git a/src/dbzero/workspace/Config.hpp 
b/src/dbzero/workspace/Config.hpp index eb519bea..a3590b3e 100644 --- a/src/dbzero/workspace/Config.hpp +++ b/src/dbzero/workspace/Config.hpp @@ -33,7 +33,9 @@ namespace db0 auto value = get(key); return value ? *value : default_value; } - + + bool hasKey(const std::string &key) const; + private: ObjectSharedPtr m_lang_config; }; diff --git a/src/dbzero/workspace/Workspace.cpp b/src/dbzero/workspace/Workspace.cpp index 9d7537bb..18ff797b 100644 --- a/src/dbzero/workspace/Workspace.cpp +++ b/src/dbzero/workspace/Workspace.cpp @@ -801,17 +801,7 @@ namespace db0 bool Workspace::isMutable() const { return true; } - -#ifndef NDEBUG - void Workspace::setCrashFromCommit(unsigned int op_count) - { - m_throw_op_count = op_count; - for (auto &item: m_fixtures) { - item.second->getPrefix().getStorage().setCrashFromCommit(&m_throw_op_count); - } - } -#endif - + std::size_t Workspace::size() const { return m_fixtures.size(); } diff --git a/src/dbzero/workspace/Workspace.hpp b/src/dbzero/workspace/Workspace.hpp index f4746d99..b6e27ebd 100644 --- a/src/dbzero/workspace/Workspace.hpp +++ b/src/dbzero/workspace/Workspace.hpp @@ -146,9 +146,10 @@ namespace db0 class Workspace: protected BaseWorkspace, public Snapshot { public: - static constexpr std::uint32_t DEFAULT_AUTOCOMMIT_INTERVAL_MS = 250; + // NOTE: the default autocommit interval (367ms) allows 50 years of continuous operation with 32-bit state numbers + static constexpr std::uint32_t DEFAULT_AUTOCOMMIT_INTERVAL_MS = 367; static constexpr std::size_t DEFAULT_VOBJECT_CACHE_SIZE = 16384; - + Workspace(const std::string &root_path = "", std::optional cache_size = {}, std::optional slab_cache_size = {}, std::optional flush_size = {}, std::optional vobject_cache_size = {}, @@ -300,12 +301,7 @@ namespace db0 // End a specific locked section, callback will be notified with all mutated fixtures void endLocked(unsigned int, std::function callback); - -#ifndef NDEBUG - // Activate throw from Storage::commit after specific number of 
operations (for testing purposes) - void setCrashFromCommit(unsigned int op_count); -#endif - + protected: friend class WorkspaceView; @@ -337,10 +333,6 @@ namespace db0 std::unordered_map > > m_locked_section_log; // this is to prevent recursive cleanups (which might result in a deadlock) mutable std::atomic m_cleanup_pending = false; -#ifndef NDEBUG - // see setCrashFromCommit - unsigned int m_throw_op_count = 0; -#endif void forEachMemspace(std::function callback) override;