diff --git a/dbzero/dbzero/dbzero.py b/dbzero/dbzero/dbzero.py index 21899e3d..c9e4f4dc 100644 --- a/dbzero/dbzero/dbzero.py +++ b/dbzero/dbzero/dbzero.py @@ -10,7 +10,7 @@ def load_dynamic(name, path): def __bootstrap__(): global __bootstrap__, __loader__, __file__ - paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"] + paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"] __file__ = None for path in paths: if os.path.isdir(path): diff --git a/python_tests/test_copy_prefix.py b/python_tests/test_copy_prefix.py index 25815f39..10b26c33 100644 --- a/python_tests/test_copy_prefix.py +++ b/python_tests/test_copy_prefix.py @@ -211,7 +211,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): # restore the copy os.rename(file_name, px_path) - print(f"--- Validating copy {copy_id}") + print(f"--- Validating copy {copy_id}", flush=True) db0.init(DB0_DIR, prefix=px_name, read_write=False) result = validate_current_prefix(expected_len, expected_min_len) db0.close() @@ -224,7 +224,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): epoch_count = 2 total_len = 0 for epoch in range(epoch_count): - print(f"=== Epoch {epoch} ===") + print(f"=== Epoch {epoch} ===", flush=True) obj_count = 5000 commit_count = 100 # start the writer process for a long run @@ -246,16 +246,16 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): copy_id = 0 # copy the prefix multiple times while it is being modified - while True: + while True: if not p.is_alive(): break file_name = f"./test-copy-{copy_id}.db0" if os.path.exists(file_name): os.remove(file_name) # copy prefix without opening it, use default step size - print("--- Copying prefix iteration", copy_id) + print("--- Copying prefix iteration", copy_id, flush=True) db0.copy_prefix(file_name, prefix=px_name) - print("--- copy finished") + print("--- copy finished", flush=True) copy_id += 1 if not p.is_alive(): break @@ -265,8 +265,8 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): total_len += obj_count * commit_count # validate original prefix (no copy yet) - # print("Validating final prefix ...", flush=True) - # validate_current_prefix(expected_len = total_len) + print("Validating final prefix ...", flush=True) + validate_current_prefix(expected_len = total_len) # make final stale copy (i.e. without active modifications) final_copy = f"./test-copy-final.db0" @@ -275,11 +275,11 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None): db0.copy_prefix(final_copy, prefix=px_name) db0.close() - print("Validating all copies") + print("Validating all copies", flush=True) validate_copy("final", expected_len = total_len) for i in range(copy_id): last_len = validate_copy(i, expected_min_len = last_len) - print(f"--- Copy {i} valid with {last_len} objects") + print(f"--- Copy {i} valid with {last_len} objects", flush=True) # this is the restored version total_len = last_len diff --git a/python_tests/test_refresh.py b/python_tests/test_refresh.py index 6150a47d..6978c338 100644 --- a/python_tests/test_refresh.py +++ b/python_tests/test_refresh.py @@ -6,6 +6,7 @@ import time import asyncio import dbzero as db0 +import os from .conftest import DB0_DIR from .memo_test_types import DynamicDataClass, DynamicDataSingleton, MemoTestClass, MemoTestSingleton @@ -552,3 +553,143 @@ async def test_async_wait_for_updates(db0_fixture): p.terminate() p.join() + + +def append_to_prefix(prefix, obj_count = 50, commit_count = 50, long_run = False): + db0.init(DB0_DIR) + db0.open(prefix, "rw") + # create new or open an existing root object + root = MemoTestSingleton([]) + if (len(root.value) > 0): + print(f"Writer process: opened existing prefix with {len(root.value)} objects") + for i in range(commit_count): + for _ in range(obj_count): + root.value.append(MemoTestClass("b" * 1024)) # 1 KB string + db0.commit() + if long_run: + print(f"Writer process: committed {(i + 1) * obj_count} objects", flush=True) + else: + time.sleep(0.1) + + if long_run: + print(db0.get_storage_stats(), flush=True) + db0.close() + + +@pytest.mark.stress_test +def test_refresh_prefix_continuous_process_with_snapshot(db0_fixture): + px_name = db0.get_current_prefix().name + + def validate_current_prefix(expected_len = None, expected_min_len = None): + snap = db0.snapshot() + root = snap.fetch(MemoTestSingleton) + assert not expected_min_len or len(root.value) >= expected_min_len + assert not expected_len or len(root.value) == expected_len + for item in root.value: + assert item.value == "b" * 1024 + return len(root.value) + + db0.close() + + # in each 'epoch' we modify prefix while making copies + # then drop the original prefix and restore if from the last copy + epoch_count = 2 + total_len = 0 + for epoch in range(epoch_count): + print(f"=== Epoch {epoch} ===") + obj_count = 5000 + commit_count = 100 + # start the writer process for a long run + p = multiprocessing.Process(target=append_to_prefix, args=(px_name, obj_count, commit_count, True)) + p.start() + + db0.init(DB0_DIR) + db0.open(px_name, "r") + last_len = 0 + while True: + try: + root = db0.fetch(MemoTestSingleton) + if len(root.value) > 1: + last_len = len(root.value) + break + except Exception: + pass + time.sleep(0.1) + + # validate prefix while writer is actively modifying it + while True: + if not p.is_alive(): + break + print("--- Validate prefix iteration", flush=True) + last_len = validate_current_prefix(expected_min_len = last_len) + print(f"--- Prefix valid with {last_len} objects", flush=True) + if not p.is_alive(): + break + time.sleep(2.5) # wait a bit before next copy + + p.join() + total_len += obj_count * commit_count + + print("Validating final prefix ...", flush=True) + validate_current_prefix(expected_len = total_len) + db0.close() + + +@pytest.mark.stress_test +@pytest.mark.skip(reason="Test disabled due to issue #605") +# test failing due to issue: https://github.com/dbzero-software/dbzero/issues/605 +def test_refresh_prefix_continuous_process(db0_fixture): + px_name = db0.get_current_prefix().name + + def validate_current_prefix(expected_len = None, expected_min_len = None): + root = db0.fetch(MemoTestSingleton) + assert not expected_min_len or len(root.value) >= expected_min_len + assert not expected_len or len(root.value) == expected_len + for item in root.value: + assert item.value == "b" * 1024 + return len(root.value) + + db0.close() + + # in each 'epoch' we modify prefix while making copies + # then drop the original prefix and restore if from the last copy + epoch_count = 2 + total_len = 0 + for epoch in range(epoch_count): + print(f"=== Epoch {epoch} ===") + obj_count = 5000 + commit_count = 100 + # start the writer process for a long run + p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True)) + p.start() + + db0.init(DB0_DIR) + db0.open(px_name, "r") + last_len = 0 + while True: + try: + root = db0.fetch(MemoTestSingleton) + if len(root.value) > 1: + last_len = len(root.value) + break + except Exception: + pass + time.sleep(0.1) + + # validate prefix while writer is actively modifying it + while True: + if not p.is_alive(): + break + print("--- Validate prefix iteration", flush=True) + last_len = validate_current_prefix(expected_min_len = last_len) + print(f"--- Prefix valid with {last_len} objects", flush=True) + if not p.is_alive(): + break + time.sleep(2.5) # wait a bit before next copy + + p.join() + total_len += obj_count * commit_count + + print("Validating final prefix ...", flush=True) + validate_current_prefix(expected_len = total_len) + db0.close() diff --git a/src/dbzero/core/collections/full_text/FT_IndexIterator.hpp b/src/dbzero/core/collections/full_text/FT_IndexIterator.hpp index 2979aa9f..c5e39f38 100644 --- a/src/dbzero/core/collections/full_text/FT_IndexIterator.hpp +++ b/src/dbzero/core/collections/full_text/FT_IndexIterator.hpp @@ -243,7 +243,7 @@ namespace db0 */ } - template + template typename FT_IndexIterator::iterator &FT_IndexIterator::getIterator() { if (m_is_detached) { diff --git a/src/dbzero/core/storage/ChangeLogIOStream.hpp b/src/dbzero/core/storage/ChangeLogIOStream.hpp index b307e0ee..3b88d6b3 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.hpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.hpp @@ -86,6 +86,16 @@ namespace db0 Writer(ChangeLogIOStream &); void appendChangeLog(const o_change_log_t &); + + template + void appendChangeLog(const ChangeLogData &data, Args&&... args) + { + auto size_of = o_change_log_t::measure(data, std::forward(args)...); + std::vector buffer(size_of); + auto &change_log = o_change_log_t::__new(buffer.data(), data, std::forward(args)...); + appendChangeLog(change_log); + } + void flush(); private: diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp b/src/dbzero/core/storage/DRAM_IOStream.cpp index 9c469b19..d1a26133 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -10,6 +10,7 @@ #include #include #include "ChangeLogIOStream.hpp" +#include namespace db0 @@ -43,10 +44,12 @@ namespace db0 // ignore changes beyond the last known consistent state number return nullptr; } - + // page map = page_num / state_num auto dram_page = m_page_map.find(header.m_page_num); - if (dram_page == m_page_map.end() || dram_page->second.m_state_num < header.m_state_num) { + // NOTE: even if the same state number is encountered, the page is updated + // (the previous version might've been incomplete!!) + if (dram_page == m_page_map.end() || header.m_state_num >= dram_page->second.m_state_num) { // update DRAM to most recent page version, page not marked as dirty auto result = m_prefix->update(header.m_page_num, false); if (dram_page == m_page_map.end()) { @@ -121,11 +124,29 @@ namespace db0 THROWF(db0::IOException) << "DRAM_IOStream::load error: unaligned block"; } + // NOTE: ignore invalid or incomplete DRAM chunks + // this is fine since filesystem writes are not assumed atomic - this chunk is too fresh to be included + if (!isDRAM_ChunkValid(m_dram_page_size, header, bytes, buffer.data() + buffer.size())) { + continue; + } updateDRAMPage(chunk_addr, &allocs, header, bytes, max_state_num); } m_allocator->update(allocs); } + std::ostream &DRAM_IOStream::dumpPageMap(std::ostream &os) const + { + std::vector > sorted_pages(m_page_map.begin(), m_page_map.end()); + std::sort(sorted_pages.begin(), sorted_pages.end(), [](const auto &a, const auto &b) { + return a.first < b.first; + }); + for (const auto &item: sorted_pages) { + std::cout << "(" << item.first << ": state_num=" << item.second.m_state_num + << ", address=" << item.second.m_address << "),"; + } + return os; + } + void DRAM_IOStream::flushUpdates(StateNumType state_num, DRAM_ChangeLogStreamT &dram_changelog_io) { if (m_access_type == AccessType::READ_ONLY) { @@ -172,7 +193,7 @@ namespace db0 // add the old page location to reusable addresses m_reusable_chunks.insert(dram_page->second.m_address); } - // update to most recent location + // update to most recent location (and state number) m_page_map[page_num] = { state_num, address }; }; @@ -184,6 +205,7 @@ namespace db0 if (reusable_addr) { reusable_header.m_page_num = page_num; std::memcpy(reusable_header.getData(), page_buffer, m_dram_page_size); + reusable_header.setHash(page_buffer, m_dram_page_size); // overwrite chunk in the reusable block writeToChunk(*reusable_addr, raw_block.data(), raw_block.size()); ++m_rand_ops; @@ -197,6 +219,7 @@ namespace db0 // append data into a new chunk / block addChunk(m_chunk_size, &chunk_addr); o_dram_chunk_header header(state_num, page_num); + header.setHash(page_buffer, m_dram_page_size); appendToChunk(&header, sizeof(header)); appendToChunk(page_buffer, m_dram_page_size); dram_changelog.push_back(chunk_addr); @@ -209,7 +232,7 @@ namespace db0 BlockIOStream::flush(); // output changelog, no RLE encoding, no duplicates ChangeLogData cl_data(std::move(dram_changelog), false, false, false); - dram_changelog_io.appendChangeLog(std::move(cl_data), state_num); + dram_changelog_io.appendChangeLog(std::move(cl_data), state_num); } #ifndef NDEBUG @@ -250,10 +273,14 @@ namespace db0 auto address = item.first; const auto &buffer = item.second; const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf()); + // NOTE: ignore invalid or incomplete DRAM chunks (too fresh to be included) + if (!isDRAM_ChunkValid(m_dram_page_size, header, header.getData(), buffer.data() + buffer.size())) { + continue; + } updateDRAMPage(address, nullptr, header, header.getData(), max_state_num); result = true; } - m_read_ahead_chunks.clear(); + m_read_ahead_chunks.clear(); return result; } @@ -350,7 +377,7 @@ namespace db0 dram_io.readFromChunk(address, buffer.data(), buffer.size()); } change_log_ptr = changelog_io.readChangeLogChunk(); - } + } } return max_state_num; @@ -361,15 +388,53 @@ namespace db0 throw; } } + + bool isDRAM_ChunkValid(std::uint32_t dram_page_size, const o_dram_chunk_header &header, + const void *data_begin, const void *data_end) + { + if (static_cast(data_begin) + dram_page_size > static_cast(data_end)) { + THROWF(db0::IOException) << "isDRAM_ChunkValid: invalid chunk size"; + } + // determine if valid by comparing header hash values (calculated vs stored) + return header.calculateHash(data_begin, dram_page_size) == header.m_hash; + } + + bool isDRAM_ChunkValid(std::uint32_t dram_page_size, const std::vector &buffer) + { + // NOTE: the buffer already includes chunk header + const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf()); + return isDRAM_ChunkValid(dram_page_size, header, header.getData(), buffer.data() + buffer.size()); + } void flushDRAM_IOChanges(DRAM_IOStream &dram_io, - std::unordered_map > &chunks_buf) + const std::unordered_map > &chunks_buf) { + auto dram_page_size = dram_io.getDRAMPrefix().getPageSize(); for (const auto &item: chunks_buf) { auto address = item.first; const auto &buffer = item.second; - dram_io.writeToChunk(address, buffer.data(), buffer.size()); + // NOTE: we don't flush inconsistent / incomplete chunks + if (!isDRAM_ChunkValid(dram_page_size, buffer)) { + continue; + } + dram_io.writeToChunk(address, buffer.data(), buffer.size()); + } + } + + void appendDRAM_IOChunks(DRAM_IOStream &dram_io, const std::vector > &chunks_buf) + { + auto dram_page_size = dram_io.getDRAMPrefix().getPageSize(); + for (const auto &buffer : chunks_buf) { + if (!isDRAM_ChunkValid(dram_page_size, buffer)) { + continue; + } + dram_io.addChunk(buffer.size()); + dram_io.appendToChunk(buffer.data(), buffer.size()); } } + std::uint64_t o_dram_chunk_header::calculateHash(const void *data, std::size_t data_size) const { + return db0::murmurhash64A(data, data_size, m_page_num + m_state_num); + } + } \ No newline at end of file diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index ce41b115..b58a1805 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -30,16 +30,21 @@ namespace db0 DB0_PACKED_BEGIN struct DB0_PACKED_ATTR o_dram_chunk_header: public o_fixed { - std::uint64_t m_state_num = 0; - std::uint64_t m_page_num = 0; - + // hash from the DRAM page contents (header included) + std::uint64_t m_hash = 0; + StateNumType m_state_num = 0; + std::uint64_t m_page_num = 0; + o_dram_chunk_header() = default; - o_dram_chunk_header(std::uint64_t state_num, std::uint64_t page_num = 0) + o_dram_chunk_header(StateNumType state_num, std::uint64_t page_num = 0) : m_state_num(state_num) , m_page_num(page_num) { } + // Calculate hash from the entire block's data (including header) + std::uint64_t calculateHash(const void *data, std::size_t data_size) const; + // calculate data pointer immediately following the header char *getData() { return (char*)this + sizeOf(); @@ -48,6 +53,10 @@ DB0_PACKED_BEGIN const char *getData() const { return (const char*)this + sizeOf(); } + + void setHash(const void *data, std::size_t data_size) { + m_hash = calculateHash(data, data_size); + } }; DB0_PACKED_END @@ -151,6 +160,10 @@ DB0_PACKED_END return m_chunk_size; } + std::uint32_t getDRAMPageSize() const { + return m_dram_page_size; + } + private: const std::uint32_t m_dram_page_size; const std::size_t m_chunk_size; @@ -171,9 +184,16 @@ DB0_PACKED_END const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num); // the number of random write operations performed while flushing updates - std::uint64_t m_rand_ops = 0; + std::uint64_t m_rand_ops = 0; + + std::ostream &dumpPageMap(std::ostream &os) const; }; + // Calculate hash to determine if the dram chunk is valid + bool isDRAM_ChunkValid(std::uint32_t dram_page_size, const std::vector &chunk_data); + bool isDRAM_ChunkValid(std::uint32_t dram_page_size, const o_dram_chunk_header &header, const void *data_begin, + const void *data_end); + // Pre-fetch changes into the chunks buffer // @param callback optional function to be called for each changelog chunk read // @return the latest state number of available changes @@ -184,6 +204,10 @@ DB0_PACKED_END // Flush changes from the buffer void flushDRAM_IOChanges(DRAM_IOStream &dram_io, - std::unordered_map > &chunks_buf); + const std::unordered_map > &chunks_buf); + + // Append all chunks from the buffer + void appendDRAM_IOChunks(DRAM_IOStream &dram_io, + const std::vector > &chunks_buf); } \ No newline at end of file diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index 02667b6f..e36c3e8a 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -8,44 +8,95 @@ namespace db0 { + // chunk buffer + append buffer + using ChunkBufPair = std::pair< + std::unordered_map >, + std::vector > >; + + ChunkBufPair translateDRAM_Chunks( + const std::unordered_map > &&chunk_buf, + const std::unordered_map &addr_map) + { + ChunkBufPair result; + for (const auto &pair : chunk_buf) { + auto it_addr = addr_map.find(pair.first); + if (it_addr == addr_map.end()) { + // not present in the copied stream, must be appended + result.second.emplace_back(std::move(pair.second)); + } else { + // register under translated address + result.first.emplace(it_addr->second, std::move(pair.second)); + } + } + return result; + } + void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog) { using DRAM_ChangeLogT = DRAM_IOStream::DRAM_ChangeLogT; // Exhaust the input_dram_changelog first - input_dram_changelog.setStreamPosHead(); - auto change_log_ptr = input_dram_changelog.readChangeLogChunk(); - - while (change_log_ptr) { - output_dram_changelog.appendChangeLog(*change_log_ptr); - change_log_ptr = input_dram_changelog.readChangeLogChunk(); + // NOTE: we don't need to copy the changelog, just insert an empty item with the latest state number + input_dram_changelog.setStreamPosHead(); + for (;;) { + while (input_dram_changelog.readChangeLogChunk()); + // continue refreshing until reaching the most recent state + if (!input_dram_changelog.refresh()) { + break; + } } // Copy the entire DRAM_IO stream next (possibly inconsistent state) - copyStream(input_io, output_io); + // collecting the mapping of chunk addresses + std::unordered_map chunk_addr_map; + copyStream(input_io, output_io, &chunk_addr_map); - // Callback to copy additional changelog chunks retrieved on post-sync - auto copy_callback = [&](const DRAM_ChangeLogT &change_log) { - output_dram_changelog.appendChangeLog(change_log); - }; - // Chunks loaded during the sync step // NOTE: in this step we prefetch to memory to be able to catch up with changes std::unordered_map > chunk_buf; - fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf, copy_callback); - flushDRAM_IOChanges(output_io, chunk_buf); + while (input_dram_changelog.refresh()) { + fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf); + } + auto last_chunk_ptr = input_dram_changelog.getLastChangeLogChunk(); + if (!last_chunk_ptr) { + // looks like the DRAM IO is empty + return; + } + + // this is the actually copied last consistent state number + auto state_num = last_chunk_ptr->m_state_num; + + // NOTE: flush must be done under translated addresses (or appended to stream if translation not present) + auto bufs_pair = translateDRAM_Chunks(std::move(chunk_buf), chunk_addr_map); + flushDRAM_IOChanges(output_io, bufs_pair.first); + // append new chuks which were not present during the initial copy + appendDRAM_IOChunks(output_io, bufs_pair.second); + // append the sentinel entry with state number only (i.e. empty changelog) + output_dram_changelog.appendChangeLog({}, state_num); } - std::vector copyStream(BlockIOStream &in, BlockIOStream &out) + std::vector copyStream(BlockIOStream &in, BlockIOStream &out, + std::unordered_map *addr_map) { // position at the beginning of the stream in.setStreamPosHead(); std::vector buffer; std::size_t chunk_size = 0; - while ((chunk_size = in.readChunk(buffer)) > 0) { - out.addChunk(chunk_size); - out.appendToChunk(buffer.data(), chunk_size); + std::uint64_t in_addr, out_addr; + for (;;) { + while ((chunk_size = in.readChunk(buffer, 0, &in_addr)) > 0) { + out.addChunk(chunk_size, &out_addr); + out.appendToChunk(buffer.data(), chunk_size); + // register the mapping + if (addr_map) { + addr_map->emplace(in_addr, out_addr); + } + } + if (!in.refresh()) { + // continue refreshing until reaching the most recent state + break; + } } out.flush(); return buffer; diff --git a/src/dbzero/core/storage/copy_prefix.hpp b/src/dbzero/core/storage/copy_prefix.hpp index 8c0dd8f3..a79866df 100644 --- a/src/dbzero/core/storage/copy_prefix.hpp +++ b/src/dbzero/core/storage/copy_prefix.hpp @@ -26,8 +26,10 @@ namespace db0 DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog); // Copy entire contents from one BlockIOStream to another (type agnostic) + // @param addr_map optional map to receive address translation (from source to destination) // @return the last copied chunk data - std::vector copyStream(BlockIOStream &in, BlockIOStream &out); + std::vector copyStream(BlockIOStream &in, BlockIOStream &out, + std::unordered_map *addr_map = nullptr); // DP-changelog specialization // @return the last chunk's header (if anything copied) diff --git a/src/dbzero/workspace/Fixture.cpp b/src/dbzero/workspace/Fixture.cpp index b546a30e..e95657ef 100644 --- a/src/dbzero/workspace/Fixture.cpp +++ b/src/dbzero/workspace/Fixture.cpp @@ -184,9 +184,9 @@ namespace db0 if (timer_ptr) { timer = std::make_unique("Fixture::close", timer_ptr); } - + // clear cache to destroy object instances supported by the cache - // this has to be done before commit (to not commit unrefereced objects) + // this has to be done before commit (to not commit unrefereced objects) m_lang_cache.clear(true, as_defunct); // auto-commit before closing @@ -200,7 +200,7 @@ namespace db0 getGC0().flushAllOf(Memspace::getForFlush()); } - // clear lang cache again since flush might've released some Python instances + // clear lang cache again since flush might've released some Python instances m_lang_cache.clear(true); // lock for exclusive access @@ -387,7 +387,7 @@ namespace db0 // Flush using registered flush handlers for (auto &handler: m_flush_handlers) { handler(); - } + } m_lang_cache.clear(true); // lock for exclusive access {