From 6d94d2d169e1bb2d7e85ea65a64920e459f49ef6 Mon Sep 17 00:00:00 2001 From: Adrian Zawadzki Date: Fri, 21 Nov 2025 21:48:29 +0100 Subject: [PATCH 1/2] added stress tests for pipeline --- .github/workflows/build-and-test.yml | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 1e27a2f7..ec80ad75 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -69,4 +69,32 @@ jobs: - name: Run tests run: | python -m pytest -m 'not integration_test' -m 'not stress_test' -c pytest.ini --capture=no -vv + stress-tests-wheels-linux: + runs-on: ubuntu-latest + timeout-minutes: 15 + needs: build-linux + strategy: + fail-fast: false + matrix: + python-version: ["3.12"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Download wheel artifact + uses: actions/download-artifact@v4 + with: + name: wheels-linux-${{ matrix.python-version }} + path: ./wheels/ + - name: Install wheel and dependencies + run: | + pip install pytest + pip install -r requirements.txt + pip install ./wheels/*.whl + - name: Run tests + run: | + python3 -m pytest -m 'stress_test' -c pytest.ini --capture=no -vv -s From 87f0a6e81af29b669eb7a921cb19f63863409f34 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sat, 22 Nov 2025 20:55:49 +0100 Subject: [PATCH 2/2] DRAM IO initialization in read-only mode improvements --- src/dbzero/core/storage/BDevStorage.cpp | 17 +++++-------- src/dbzero/core/storage/DRAM_IOStream.cpp | 30 ++++++++++++++++------- src/dbzero/core/storage/DRAM_IOStream.hpp | 18 ++++++++------ 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index a2a7c01b..ef809c97 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ 
b/src/dbzero/core/storage/BDevStorage.cpp @@ -40,12 +40,14 @@ namespace db0 , m_dp_changelog_io(getChangeLogIOStream(m_config.m_dp_changelog_io_offset, access_type)) , m_meta_io(init(getMetaIOStream(m_config.m_meta_io_offset, meta_io_step_size.value_or(DEFAULT_META_IO_STEP_SIZE), access_type))) - , m_dram_io(init(getDRAMIOStream(m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io)) + , m_dram_io(init(getDRAMIOStream(m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io)) , m_sparse_pair(m_dram_io.getDRAMPair(), access_type) , m_sparse_index(m_sparse_pair.getSparseIndex()) , m_diff_index(m_sparse_pair.getDiffIndex()) , m_page_io(getPage_IO(m_sparse_pair.getNextStoragePageNum(), access_type)) { + // in read-only mode need to refresh in order to retrieve a consistent DRAM state + // since other process might be actively modifying the underlying file if (m_access_type == AccessType::READ_ONLY) { refresh(); } @@ -56,15 +58,8 @@ namespace db0 } DRAM_IOStream BDevStorage::init(DRAM_IOStream &&dram_io, ChangeLogIOStream &dram_change_log) - { - if (dram_io.getAccessType() == AccessType::READ_WRITE) { - // simply exhaust the change-log stream - while (dram_change_log.readChangeLogChunk()); - } else { - // apply all changes from the change-log - dram_io.beginApplyChanges(dram_change_log); - dram_io.completeApplyChanges(); - } + { + dram_io.load(dram_change_log); return std::move(dram_io); } @@ -81,7 +76,7 @@ namespace db0 m_file.read(0, buffer.size(), buffer.data()); auto &config = o_prefix_config::__const_ref(buffer.data()); if (config.m_magic != o_prefix_config::DB0_MAGIC) { - THROWF(db0::IOException) << "Invalid DB0 file: " << m_file.getName(); + THROWF(db0::IOException) << "Not a dbzero file: " << m_file.getName(); } return config; } diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp b/src/dbzero/core/storage/DRAM_IOStream.cpp index 6648d031..5d5bd600 100644 ---
a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -20,11 +20,6 @@ namespace db0 , m_prefix(std::make_shared(m_dram_page_size)) , m_allocator(std::make_shared(m_dram_page_size)) { - // load only allowed in read/write mode because otherwise the reader might - // fetch inconsistent state - if (m_access_type == AccessType::READ_WRITE) { - load(); - } } DRAM_IOStream::DRAM_IOStream(DRAM_IOStream &&other) @@ -86,9 +81,14 @@ namespace db0 } } - void DRAM_IOStream::load() + void DRAM_IOStream::load(ChangeLogIOStream &changelog_io) { assert(m_access_type == AccessType::READ_WRITE); + + // simply exhaust the change-log stream + // its position marks the synchronization point + while (changelog_io.readChangeLogChunk()); + std::vector buffer(m_chunk_size, 0); const auto &header = o_dram_chunk_header::__ref(buffer.data()); auto bytes = buffer.data() + header.sizeOf(); @@ -164,7 +164,7 @@ namespace db0 m_page_map[page_num] = { state_num, address }; }; - // flush all changes done to DRAM Prerfix (append modified pages only) + // flush all changes done to DRAM Prefix (append modified pages only) std::vector dram_changelog; m_prefix->flushDirty([&, this](std::uint64_t page_num, const void *page_buffer) { // the last page must be stored in a new block to mark end of the sequence @@ -235,8 +235,8 @@ namespace db0 // Note that change log and the data chunks may be updated by other process while we read it // the consistent state is only guaranteed after reaching end of the stream auto change_log_ptr = changelog_io.readChangeLogChunk(); - - // first collect the change log to only visit each address once + + // First collect the change log to only visit each address once while (change_log_ptr) { for (auto address: *change_log_ptr) { if (m_addr_set.find(address) == m_addr_set.end()) { @@ -251,6 +251,18 @@ namespace db0 } change_log_ptr = changelog_io.readChangeLogChunk(); } + + // Visit the addresses next + // this is important because
otherwise we might've been accessing outdated or inconsistent DPs + for (auto address: m_addr_set) { + // buffer must include BlockIOStream's chunk header and data + auto &buffer = createReadAheadBuffer(address, m_chunk_size + o_block_io_chunk_header::sizeOf()); + // the address reported in changelog must already be available in the stream + // it may come from a more recent update as well (and potentially may only be partially written) + // therefore chunk-level checksum validation is necessary + BlockIOStream::readFromChunk(address, buffer.data(), buffer.size()); + } + } catch (db0::IOException &) { changelog_io.setStreamPos(stream_pos); m_read_ahead_chunks.clear(); diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index 5706d1a6..6c6ccda2 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -117,7 +117,7 @@ DB0_PACKED_BEGIN std::size_t getAllocatedSize() const; const DRAM_Prefix &getDRAMPrefix() const; - + const DRAM_Allocator &getDRAMAllocator() const; // get the number of random write operations performed while flushing updates @@ -130,7 +130,13 @@ DB0_PACKED_BEGIN // Read physical data block from file and detect discrepancies void dramIOCheck(std::vector &) const; #endif - + + /** + * Exhaust the entire change-log (to mark synchronization point) + * then load entire contents from stream into the DRAM Storage + */ + void load(ChangeLogIOStream &changelog_io); + private: const std::uint32_t m_dram_page_size; const std::size_t m_chunk_size; @@ -142,12 +148,8 @@ DB0_PACKED_BEGIN std::shared_ptr m_allocator; // chunks buffer for the beginApplyChanges / completeApplyChanges operations mutable std::unordered_map > m_read_ahead_chunks; - mutable std::unordered_set m_addr_set; - - /** - * Load entire contents from stream into the DRAM Storage - */ - void load(); + mutable std::unordered_set m_addr_set; + void *updateDRAMPage(std::uint64_t address, std::unordered_set 
*allocs_ptr, const o_dram_chunk_header &header); void updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr,