From 3ade8665e8f856270adcf1c5f912e0295680c4bb Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sun, 23 Nov 2025 15:56:20 +0100 Subject: [PATCH 1/3] WIP: copy_prefix light --- src/dbzero/core/storage/BDevStorage.hpp | 1 + src/dbzero/core/storage/BlockIOStream.hpp | 44 +++--- src/dbzero/core/storage/ChangeLog.hpp | 4 +- src/dbzero/core/storage/ChangeLogIOStream.hpp | 11 ++ src/dbzero/core/storage/DRAM_IOStream.cpp | 126 ++++++++++-------- src/dbzero/core/storage/DRAM_IOStream.hpp | 26 ++-- src/dbzero/core/storage/MetaIOStream.hpp | 8 +- src/dbzero/core/storage/Page_IO.hpp | 4 +- src/dbzero/core/storage/copy_prefix.cpp | 41 ++++++ src/dbzero/core/storage/copy_prefix.hpp | 25 ++++ 10 files changed, 194 insertions(+), 96 deletions(-) create mode 100644 src/dbzero/core/storage/copy_prefix.cpp create mode 100644 src/dbzero/core/storage/copy_prefix.hpp diff --git a/src/dbzero/core/storage/BDevStorage.hpp b/src/dbzero/core/storage/BDevStorage.hpp index 77ddb77e..8ea40045 100644 --- a/src/dbzero/core/storage/BDevStorage.hpp +++ b/src/dbzero/core/storage/BDevStorage.hpp @@ -137,6 +137,7 @@ DB0_PACKED_END ChangeLogIOStream m_dram_changelog_io; // data-page change log, each chunk corresponds to a separate data transaction // first element from each chunk represents the state number + // and the rest are the logical data page numbers mutated in that transaction ChangeLogIOStream m_dp_changelog_io; // meta-stream keeps meta-data about the other streams MetaIOStream m_meta_io; diff --git a/src/dbzero/core/storage/BlockIOStream.hpp b/src/dbzero/core/storage/BlockIOStream.hpp index 6729c5ea..457e3617 100644 --- a/src/dbzero/core/storage/BlockIOStream.hpp +++ b/src/dbzero/core/storage/BlockIOStream.hpp @@ -170,7 +170,7 @@ DB0_PACKED_END static constexpr std::size_t sizeOfHeaders(bool checksums_enabled) { return sizeOfBlockHeader(checksums_enabled) + sizeOfChunkHeader(); } - + AccessType getAccessType() const { return m_access_type; } @@ -195,7 +195,7 @@ DB0_PACKED_END bool m_eos; }; - // Temporarily save the stream's state, to be later restore with restoreState() + // Temporarily save the stream's state, to be later restored with restoreState() // NOTE: no mutations between saveState() and restoreState() are allowed, or the behavior is undefined void saveState(State &) const; @@ -206,6 +206,26 @@ DB0_PACKED_END return m_modified; } + /** + * Overwrite existing chunk with arbitrary data. + * No validations are performed, needs to be used with caution. + * Since this operations affects checksum, it's not allowed for streams opened with maintain_checksums = true + * Cursor position is not affected by this operation. + * @param address absolute address of the chunk + * @param buffer data to be written + * @param size size of the buffer + */ + void writeToChunk(std::uint64_t address, const void *buffer, std::size_t size); + + /** + * Read a chunk under a specific address without moving the within-stream cursor's position + * checksum validation is not performed + * @param address absolute address of the chunk + * @param buffer buffer to hold the chunk data + * @param chunk_size size of the chunk + */ + void readFromChunk(std::uint64_t address, void *buffer, std::size_t chunk_size) const; + protected: CFile &m_file; // the stream's starting address @@ -264,26 +284,6 @@ DB0_PACKED_END */ bool readBlock(std::uint64_t address, void *buffer); - /** - * Overwrite existing chunk with arbitrary data. - * No validations are performed, needs to be used with caution. - * Since this operations affects checksum, it's not allowed for streams opened with maintain_checksums = true - * Cursor position is not affected by this operation. - * @param address absolute address of the chunk - * @param buffer data to be written - * @param size size of the buffer - */ - void writeToChunk(std::uint64_t address, const void *buffer, std::size_t size); - - /** - * Read a chunk under a specific address without moving the within-stream cursor's position - * checksum validation is not performed - * @param address absolute address of the chunk - * @param buffer buffer to hold the chunk data - * @param chunk_size size of the chunk - */ - void readFromChunk(std::uint64_t address, void *buffer, std::size_t chunk_size) const; - std::uint64_t nextAddress() const; }; diff --git a/src/dbzero/core/storage/ChangeLog.hpp b/src/dbzero/core/storage/ChangeLog.hpp index 4ddc8e72..1774443f 100644 --- a/src/dbzero/core/storage/ChangeLog.hpp +++ b/src/dbzero/core/storage/ChangeLog.hpp @@ -8,7 +8,6 @@ namespace db0 { -DB0_PACKED_BEGIN class ChangeLogData { @@ -31,6 +30,7 @@ DB0_PACKED_BEGIN void initRLECompress(bool is_sorted, bool add_duplicates); }; +DB0_PACKED_BEGIN struct DB0_PACKED_ATTR o_change_log: public o_base { protected: @@ -91,6 +91,6 @@ DB0_PACKED_BEGIN return buf - _buf; } }; - DB0_PACKED_END + } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLogIOStream.hpp b/src/dbzero/core/storage/ChangeLogIOStream.hpp index 3b40560d..1436c5cc 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.hpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.hpp @@ -60,6 +60,17 @@ namespace db0 std::list >::const_iterator m_it_next_buffer; }; + // The buffering proxy for write operations + // changes are only reflected with the underlying stream on "flush" + class Writer + { + public: + void appendChangeLog(const o_change_log &); + void flush(); + + private: + }; + // Retrieves a caching reaader, which allows multiple scan over the same data Reader getStreamReader(); diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp b/src/dbzero/core/storage/DRAM_IOStream.cpp index 5d5bd600..61c1bcce 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -224,63 +224,23 @@ namespace db0 void DRAM_IOStream::beginApplyChanges(ChangeLogIOStream &changelog_io) const { assert(m_read_ahead_chunks.empty()); - assert(m_addr_set.empty()); - if (m_access_type == AccessType::READ_WRITE) { THROWF(db0::InternalException) << "DRAM_IOStream::applyChanges require read-only stream"; } - auto stream_pos = changelog_io.getStreamPos(); - try { - // Note that change log and the data chunks may be updated by other process while we read it - // the consistent state is only guaranteed after reaching end of the stream - auto change_log_ptr = changelog_io.readChangeLogChunk(); - - // First collect the change log to only visit each address once - while (change_log_ptr) { - for (auto address: *change_log_ptr) { - if (m_addr_set.find(address) == m_addr_set.end()) { - m_addr_set.insert(address); - // buffer must include BlockIOStream's chunk header and data - auto &buffer = createReadAheadBuffer(address, m_chunk_size + o_block_io_chunk_header::sizeOf()); - // the address reported in changelog must already be available in the stream - // it may come from a more recent update as well (and potentially may only be partially written) - // therefore chunk-level checksum validation is necessary - BlockIOStream::readFromChunk(address, buffer.data(), buffer.size()); - } - } - change_log_ptr = changelog_io.readChangeLogChunk(); - } - - // Visit the addresses next - // this is important becase otherwise we might've been accessing outdated or inconsistent DPs - for (auto address: m_addr_set) { - // buffer must include BlockIOStream's chunk header and data - auto &buffer = createReadAheadBuffer(address, m_chunk_size + o_block_io_chunk_header::sizeOf()); - // the address reported in changelog must already be available in the stream - // it may come from a more recent update as well (and potentially may only be partially written) - // therefore chunk-level checksum validation is necessary - BlockIOStream::readFromChunk(address, buffer.data(), buffer.size()); - } - - } catch (db0::IOException &) { - changelog_io.setStreamPos(stream_pos); - m_read_ahead_chunks.clear(); - m_addr_set.clear(); - throw; - } + fetchDRAM_IOChanges(*this, changelog_io, m_read_ahead_chunks); } bool DRAM_IOStream::completeApplyChanges() { bool result = false; - for (auto address: m_addr_set) { - auto &buffer = getReadAheadBuffer(address); + for (const auto &item: m_read_ahead_chunks) { + auto address = item.first; + const auto &buffer = item.second; const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf()); updateDRAMPage(address, nullptr, header, header.getData()); result = true; } - m_addr_set.clear(); m_read_ahead_chunks.clear(); return result; } @@ -323,18 +283,6 @@ namespace db0 BlockIOStream::close(); } - std::vector &DRAM_IOStream::createReadAheadBuffer(std::uint64_t address, std::size_t size) const - { - assert(m_read_ahead_chunks.find(address) == m_read_ahead_chunks.end()); - return m_read_ahead_chunks.emplace(address, size).first->second; - } - - const std::vector &DRAM_IOStream::getReadAheadBuffer(std::uint64_t address) const - { - assert(m_read_ahead_chunks.find(address) != m_read_ahead_chunks.end()); - return m_read_ahead_chunks.at(address); - } - #ifndef NDEBUG void DRAM_IOStream::getDRAM_IOMap(std::unordered_map > &io_map) const { @@ -342,6 +290,70 @@ namespace db0 io_map[entry.first] = { entry.second.m_state_num, entry.second.m_address }; } } -#endif +#endif + + void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, ChangeLogIOStream &changelog_io, + std::unordered_map > &chunks_buf) + { + auto create_read_ahead_buffer = [&](std::uint64_t address, std::size_t size) -> std::vector & + { + auto it = chunks_buf.find(address); + if (it != chunks_buf.end()) { + return it->second; + } + return chunks_buf.emplace(address, size).first->second; + }; + + auto stream_pos = changelog_io.getStreamPos(); + try { + // Must continue until exhausting the change-log + for (;;) { + // Note that change log and the data chunks may be updated by other process while we read it + // the consistent state is only guaranteed after reaching end of the stream + auto change_log_ptr = changelog_io.readChangeLogChunk(); + if (!change_log_ptr) { + // change-log exhausted + break; + } + + // First collect the change log to only visit each address once + std::unordered_set addr_set; + while (change_log_ptr) { + for (auto address: *change_log_ptr) { + if (addr_set.find(address) == addr_set.end()) { + addr_set.insert(address); + } + } + change_log_ptr = changelog_io.readChangeLogChunk(); + } + + // Visit the addresses next + // this is important becase otherwise we might've been accessing outdated or inconsistent DPs + for (auto address: addr_set) { + // buffer must include BlockIOStream's chunk header and data + auto &buffer = create_read_ahead_buffer(address, dram_io.getChunkSize() + o_block_io_chunk_header::sizeOf()); + // the address reported in changelog must already be available in the stream + // it may come from a more recent update as well (and potentially may only be partially written) + // therefore chunk-level checksum validation is necessary + dram_io.readFromChunk(address, buffer.data(), buffer.size()); + } + } + + } catch (db0::IOException &) { + changelog_io.setStreamPos(stream_pos); + chunks_buf.clear(); + throw; + } + } + + void flushDRAM_IOChanges(DRAM_IOStream &dram_io, + std::unordered_map > &chunks_buf) + { + for (const auto &item: chunks_buf) { + auto address = item.first; + const auto &buffer = item.second; + dram_io.writeToChunk(address, buffer.data(), buffer.size()); + } + } } \ No newline at end of file diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index 6c6ccda2..ad9bd074 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -15,13 +15,13 @@ namespace db0 { -DB0_PACKED_BEGIN class DRAM_Prefix; class DRAM_Allocator; class CFile; class ChangeLogIOStream; +DB0_PACKED_BEGIN struct DB0_PACKED_ATTR o_dram_chunk_header: public o_fixed { std::uint64_t m_state_num = 0; @@ -43,6 +43,7 @@ DB0_PACKED_BEGIN return (const char*)this + sizeOf(); } }; +DB0_PACKED_END struct DRAM_PageInfo { @@ -55,7 +56,7 @@ DB0_PACKED_BEGIN /** * BlockIOStream wrapper with specialization for reading/writing DRAMSpace contents */ - class DRAM_IOStream: protected BlockIOStream + class DRAM_IOStream: public BlockIOStream { public: // checksums disabled in this type of stream @@ -137,6 +138,10 @@ DB0_PACKED_BEGIN */ void load(ChangeLogIOStream &changelog_io); + std::size_t getChunkSize() const { + return m_chunk_size; + } + private: const std::uint32_t m_dram_page_size; const std::size_t m_chunk_size; @@ -148,7 +153,6 @@ DB0_PACKED_BEGIN std::shared_ptr m_allocator; // chunks buffer for the beginApplyChanges / completeApplyChanges operations mutable std::unordered_map > m_read_ahead_chunks; - mutable std::unordered_set m_addr_set; void *updateDRAMPage(std::uint64_t address, std::unordered_set *allocs_ptr, const o_dram_chunk_header &header); @@ -156,13 +160,15 @@ DB0_PACKED_BEGIN const o_dram_chunk_header &header, const void *bytes); // the number of random write operations performed while flushing updates - std::uint64_t m_rand_ops = 0; - - // Create new read-ahead buffer - std::vector &createReadAheadBuffer(std::uint64_t address, std::size_t size) const; - // Retrieve existing read-ahead buffer - const std::vector &getReadAheadBuffer(std::uint64_t address) const; + std::uint64_t m_rand_ops = 0; }; -DB0_PACKED_END + // Pre-fetch changes into the chunks buffer + void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, ChangeLogIOStream &changelog_io, + std::unordered_map > &chunks_buf); + + // Flush changes from the buffer + void flushDRAM_IOChanges(DRAM_IOStream &dram_io, + std::unordered_map > &chunks_buf); + } \ No newline at end of file diff --git a/src/dbzero/core/storage/MetaIOStream.hpp b/src/dbzero/core/storage/MetaIOStream.hpp index c211c1d1..d59cdda0 100644 --- a/src/dbzero/core/storage/MetaIOStream.hpp +++ b/src/dbzero/core/storage/MetaIOStream.hpp @@ -8,8 +8,8 @@ namespace db0 { -DB0_PACKED_BEGIN +DB0_PACKED_BEGIN // Single managed-stream associated item struct DB0_PACKED_ATTR o_meta_item: public o_fixed { @@ -20,7 +20,9 @@ DB0_PACKED_BEGIN o_meta_item(std::pair stream_pos); }; +DB0_PACKED_END +DB0_PACKED_BEGIN // The single log item, possibly associated with multiple managed streams class DB0_PACKED_ATTR o_meta_log: public o_base { @@ -43,6 +45,7 @@ DB0_PACKED_BEGIN (o_list::type()); } }; +DB0_PACKED_END // The MetaIOStream is used to annotate data (i.e. state numbers and the corresponding file positions) // in the underlying managed ChangeLogIOStream-s @@ -93,6 +96,5 @@ DB0_PACKED_BEGIN bool checkAppend() const; void appendMetaLog(StateNumType state_num, const std::vector &meta_items); }; - -DB0_PACKED_END + } diff --git a/src/dbzero/core/storage/Page_IO.hpp b/src/dbzero/core/storage/Page_IO.hpp index 784c7b07..9581ed92 100644 --- a/src/dbzero/core/storage/Page_IO.hpp +++ b/src/dbzero/core/storage/Page_IO.hpp @@ -29,9 +29,9 @@ namespace db0 Page_IO(std::size_t header_size, CFile &file, std::uint32_t page_size); ~Page_IO(); - + // Appends a new page to the stream - // @return page number (aka storage page number) + // @return ever increasing page number (aka storage page number) std::uint64_t append(const void *buffer); void read(std::uint64_t page_num, void *buffer) const; diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp new file mode 100644 index 00000000..437ca406 --- /dev/null +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -0,0 +1,41 @@ +#include "copy_prefix.hpp" + +namespace db0 + +{ + + void copyDRAM_IO(DRAM_IOStream &input_io, ChangeLogIOStream &input_dram_changelog, + DRAM_IOStream &output_io, ChangeLogIOStream::Writer &output_dram_changelog) + { + // Exhaust the input_dram_changelog first + input_dram_changelog.setStreamPosHead(); + auto change_log_ptr = input_dram_changelog.readChangeLogChunk(); + + while (change_log_ptr) { + output_dram_changelog.appendChangeLog(*change_log_ptr); + change_log_ptr = input_dram_changelog.readChangeLogChunk(); + } + + // Copy the entire DRAM_IO stream next + copyStream(input_io, output_io); + + // Chunks loaded during the sync step + // NOTE: in this step we prefetch to memory to be able to catch up with changes + std::unordered_map > chunk_buf; + fetchDRAM_IOChanges(input_io, input_dram_changelog, chunk_buf); + flushDRAM_IOChanges(output_io, chunk_buf); + } + + void copyStream(BlockIOStream &in, BlockIOStream &out) + { + // position at the beginning of the stream + in.setStreamPosHead(); + std::vector buffer; + std::size_t chunk_size = 0; + while ((chunk_size = in.readChunk(buffer)) > 0) { + out.addChunk(chunk_size); + out.appendToChunk(buffer.data(), chunk_size); + } + } + +} \ No newline at end of file diff --git a/src/dbzero/core/storage/copy_prefix.hpp b/src/dbzero/core/storage/copy_prefix.hpp new file mode 100644 index 00000000..7e5f7f22 --- /dev/null +++ b/src/dbzero/core/storage/copy_prefix.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include "BlockIOStream.hpp" +#include "Page_IO.hpp" +#include "Diff_IO.hpp" +#include "DRAM_IOStream.hpp" +#include "ChangeLogIOStream.hpp" + +namespace db0 + +{ + + // This routine copies the entire DRAM_IO stream (from begin) in a manner + // synchronized with the correspoding changelog stream + // NOTE: output_changelog is NOT flushed (see the design) + void copyDRAM_IO(DRAM_IOStream &input_io, ChangeLogIOStream &input_dram_changelog, + DRAM_IOStream &output_io, ChangeLogIOStream::Writer &output_dram_changelog); + + // Copy entire contents from one BlockIOStream to another + void copyStream(BlockIOStream &in, BlockIOStream &out); + + void copyPageIO(const Page_IO &input_io, const ChangeLogIOStream &input_dp_changelog, + Page_IO &output_io, ChangeLogIOStream::Writer &output_dp_changelog); + +} \ No newline at end of file From 0aa5b4dc3c006620788699b36fdec42a57d15fd3 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Mon, 24 Nov 2025 14:40:32 +0100 Subject: [PATCH 2/3] WIP: save work, refactor of change log streams to define flex headers --- src/dbzero/core/serialization/Types.hpp | 20 ++++-- src/dbzero/core/storage/BDevStorage.cpp | 43 +++++------ src/dbzero/core/storage/BDevStorage.hpp | 22 +++--- src/dbzero/core/storage/BaseStorage.cpp | 6 +- src/dbzero/core/storage/BaseStorage.hpp | 20 +++--- src/dbzero/core/storage/ChangeLog.cpp | 58 ++++++--------- src/dbzero/core/storage/ChangeLog.hpp | 71 +++++++++++++++---- src/dbzero/core/storage/ChangeLogIOStream.cpp | 41 +++++++---- src/dbzero/core/storage/ChangeLogIOStream.hpp | 25 ++++--- src/dbzero/core/storage/DRAM_IOStream.cpp | 10 +-- src/dbzero/core/storage/DRAM_IOStream.hpp | 15 ++-- src/dbzero/core/storage/Page_IO.cpp | 6 ++ src/dbzero/core/storage/Page_IO.hpp | 7 +- src/dbzero/core/storage/SparseIndexBase.hpp | 4 +- src/dbzero/core/storage/SparsePair.cpp | 4 +- src/dbzero/core/storage/SparsePair.hpp | 12 ++-- src/dbzero/core/storage/copy_prefix.cpp | 6 +- src/dbzero/core/storage/copy_prefix.hpp | 14 ++-- .../object_model/tags/SelectModified.cpp | 3 +- tests/unit_tests/ChangeLogTest.cpp | 65 +++++++++++++++++ 20 files changed, 304 insertions(+), 148 deletions(-) create mode 100644 tests/unit_tests/ChangeLogTest.cpp diff --git a/src/dbzero/core/serialization/Types.hpp b/src/dbzero/core/serialization/Types.hpp index 3e709804..0a39cbb6 100644 --- a/src/dbzero/core/serialization/Types.hpp +++ b/src/dbzero/core/serialization/Types.hpp @@ -8,7 +8,6 @@ namespace db0 { -DB0_PACKED_BEGIN class Memspace; @@ -18,6 +17,7 @@ DB0_PACKED_BEGIN /** * Overlaid simple type wrapper */ +DB0_PACKED_BEGIN template class DB0_PACKED_ATTR o_simple: public o_fixed > { public : @@ -172,10 +172,12 @@ DB0_PACKED_BEGIN std::uint32_t m_bytes = 0; std::byte m_buf; }; +DB0_PACKED_END /** - * Overlaid null type + * Overlaid null type (derived from o_base) */ +DB0_PACKED_BEGIN class DB0_PACKED_ATTR o_null: public o_base { public : @@ -207,14 +209,24 @@ DB0_PACKED_BEGIN std::uint16_t getVersion() const; static constexpr bool versionIsStored(); }; - +DB0_PACKED_END + +DB0_PACKED_BEGIN + // NOTE: when used as a base class, the sizeof should be zero! (due to empty base optimization) + struct DB0_PACKED_ATTR o_fixed_null: public o_fixed + { + static std::size_t sizeOf() { + return 0; + } + }; +DB0_PACKED_END + /// some predefined simple overlaid types using o_int = o_simple; using o_uint = o_simple; using o_float = o_simple; using o_double = o_simple; -DB0_PACKED_END } namespace std diff --git a/src/dbzero/core/storage/BDevStorage.cpp b/src/dbzero/core/storage/BDevStorage.cpp index ef809c97..04f8e083 100644 --- a/src/dbzero/core/storage/BDevStorage.cpp +++ b/src/dbzero/core/storage/BDevStorage.cpp @@ -36,11 +36,18 @@ namespace db0 : BaseStorage(access_type) , m_file(file_name, access_type, lock_flags) , m_config(readConfig()) - , m_dram_changelog_io(getChangeLogIOStream(m_config.m_dram_changelog_io_offset, access_type)) - , m_dp_changelog_io(getChangeLogIOStream(m_config.m_dp_changelog_io_offset, access_type)) - , m_meta_io(init(getMetaIOStream(m_config.m_meta_io_offset, meta_io_step_size.value_or(DEFAULT_META_IO_STEP_SIZE), - access_type))) - , m_dram_io(init(getDRAMIOStream(m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io)) + , m_dram_changelog_io(getChangeLogIOStream( + m_config.m_dram_changelog_io_offset, access_type) + ) + , m_dp_changelog_io(getChangeLogIOStream( + m_config.m_dp_changelog_io_offset, access_type) + ) + , m_meta_io(init(getMetaIOStream( + m_config.m_meta_io_offset, meta_io_step_size.value_or(DEFAULT_META_IO_STEP_SIZE), access_type)) + ) + , m_dram_io(init(getDRAMIOStream( + m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io) + ) , m_sparse_pair(m_dram_io.getDRAMPair(), access_type) , m_sparse_index(m_sparse_pair.getSparseIndex()) , m_diff_index(m_sparse_pair.getDiffIndex()) @@ -57,7 +64,7 @@ namespace db0 { } - DRAM_IOStream BDevStorage::init(DRAM_IOStream &&dram_io, ChangeLogIOStream &dram_change_log) + DRAM_IOStream BDevStorage::init(DRAM_IOStream &&dram_io, DRAM_ChangeLogStreamT &dram_change_log) { dram_io.load(dram_change_log); return std::move(dram_io); @@ -118,15 +125,15 @@ namespace db0 // Create higher-order data structures { CFile file(file_name, AccessType::READ_WRITE); - ChangeLogIOStream *dram_changelog_io_ptr = nullptr; + DRAM_ChangeLogStreamT *dram_changelog_io_ptr = nullptr; DRAM_IOStream *dram_io_ptr = nullptr; auto tail_function = [&]() { assert(dram_io_ptr && dram_changelog_io_ptr); // take max from the underlying I/O streams return std::max(offset, std::max(dram_io_ptr->tail(), dram_changelog_io_ptr->tail())); }; - - auto dram_changelog_io = ChangeLogIOStream(file, config->m_dram_changelog_io_offset, config->m_block_size, + + auto dram_changelog_io = DRAM_ChangeLogStreamT(file, config->m_dram_changelog_io_offset, config->m_block_size, tail_function, AccessType::READ_WRITE); dram_changelog_io_ptr = &dram_changelog_io; auto dram_io = DRAM_IOStream(file, config->m_dram_io_offset, config->m_block_size, tail_function, @@ -369,11 +376,7 @@ namespace db0 DRAM_IOStream BDevStorage::getDRAMIOStream(std::uint64_t first_block_pos, std::uint32_t dram_page_size, AccessType access_type) { return { m_file, first_block_pos, m_config.m_block_size, getTailFunction(), access_type, dram_page_size }; } - - ChangeLogIOStream BDevStorage::getChangeLogIOStream(std::uint64_t first_block_pos, AccessType access_type) { - return { m_file, first_block_pos, m_config.m_block_size, getTailFunction(), access_type }; - } - + std::uint64_t BDevStorage::tail() const { // take max from the 4 underlying I/O streams @@ -577,17 +580,17 @@ namespace db0 } #endif - void BDevStorage::fetchChangeLogs(StateNumType begin_state, std::optional end_state, - std::function f) const + void BDevStorage::fetchDP_ChangeLogs(StateNumType begin_state, std::optional end_state, + std::function f) const { std::unique_lock lock(m_mutex); if (m_dp_changelog_io.modified()) { THROWF(db0::IOException) << "BDevStorage::fetchChangeLogs: dp-changelog is modified and needs to be flushed first"; } - auto &dp_changelog_io = const_cast(m_dp_changelog_io); - ChangeLogIOStream::State dp_state; + auto &dp_changelog_io = const_cast(m_dp_changelog_io); + DP_ChangeLogStreamT::State dp_state; dp_changelog_io.saveState(dp_state); - + { std::vector buf; // try locating the nearest meta-log entry to position the dp-changelog @@ -626,7 +629,7 @@ namespace db0 dp_changelog_io.restoreState(dp_state); } - void BDevStorage::beginCommit() + void BDevStorage::beginCommit() { #ifndef NDEBUG m_commit_pending = true; diff --git a/src/dbzero/core/storage/BDevStorage.hpp b/src/dbzero/core/storage/BDevStorage.hpp index 8ea40045..38a16247 100644 --- a/src/dbzero/core/storage/BDevStorage.hpp +++ b/src/dbzero/core/storage/BDevStorage.hpp @@ -56,6 +56,8 @@ DB0_PACKED_END public: static constexpr std::uint32_t DEFAULT_PAGE_SIZE = 4096; static constexpr std::size_t DEFAULT_META_IO_STEP_SIZE = 16 << 20; + using DRAM_ChangeLogStreamT = ChangeLogIOStream<>; + using DP_ChangeLogStreamT = ChangeLogIOStream; /** * Opens BDevStorage over an existing file @@ -115,8 +117,8 @@ DB0_PACKED_END // @return total bytes written / diff bytes written std::pair getDiff_IOStats() const; - void fetchChangeLogs(StateNumType begin_state, std::optional end_state, - std::function f) const override; + void fetchDP_ChangeLogs(StateNumType begin_state, std::optional end_state, + std::function f) const override; #ifndef NDEBUG void getDRAM_IOMap(std::unordered_map &) const override; @@ -134,11 +136,11 @@ DB0_PACKED_END // DRAM-changelog stream stores the sequence of updates to DRAM pages // DRAM-changelog must be initialized before DRAM_IOStream - ChangeLogIOStream m_dram_changelog_io; + DRAM_ChangeLogStreamT m_dram_changelog_io; // data-page change log, each chunk corresponds to a separate data transaction // first element from each chunk represents the state number // and the rest are the logical data page numbers mutated in that transaction - ChangeLogIOStream m_dp_changelog_io; + DP_ChangeLogStreamT m_dp_changelog_io; // meta-stream keeps meta-data about the other streams MetaIOStream m_meta_io; // memory-mapped file I/O @@ -160,7 +162,7 @@ DB0_PACKED_END unsigned int *m_throw_op_count_ptr = nullptr; #endif - static DRAM_IOStream init(DRAM_IOStream &&, ChangeLogIOStream &); + static DRAM_IOStream init(DRAM_IOStream &&, DRAM_ChangeLogStreamT &); static MetaIOStream init(MetaIOStream &&); @@ -173,8 +175,12 @@ DB0_PACKED_END BlockIOStream getBlockIOStream(std::uint64_t first_block_pos, AccessType); DRAM_IOStream getDRAMIOStream(std::uint64_t first_block_pos, std::uint32_t dram_page_size, AccessType); - - ChangeLogIOStream getChangeLogIOStream(std::uint64_t first_block_pos, AccessType); + + template + ChangeLogIOStreamT getChangeLogIOStream(std::uint64_t first_block_pos, AccessType access_type) + { + return { m_file, first_block_pos, m_config.m_block_size, getTailFunction(), access_type }; + } MetaIOStream getMetaIOStream(std::uint64_t first_block_pos, std::size_t step_size, AccessType); @@ -199,5 +205,5 @@ DB0_PACKED_END void _read(std::uint64_t address, StateNumType state_num, std::size_t size, void *buffer, FlagSet = { AccessOptions::read, AccessOptions::write }, unsigned int *chain_len = nullptr) const; }; - + } \ No newline at end of file diff --git a/src/dbzero/core/storage/BaseStorage.cpp b/src/dbzero/core/storage/BaseStorage.cpp index e3785081..81ea7982 100644 --- a/src/dbzero/core/storage/BaseStorage.cpp +++ b/src/dbzero/core/storage/BaseStorage.cpp @@ -56,11 +56,11 @@ namespace db0 void BaseStorage::beginCommit() { } - void BaseStorage::endCommit() { + void BaseStorage::endCommit() { } - void BaseStorage::fetchChangeLogs(StateNumType begin_state, std::optional end_state, - std::function f) const + void BaseStorage::fetchDP_ChangeLogs(StateNumType begin_state, std::optional end_state, + std::function f) const { THROWF(db0::InternalException) << "Operation not supported: fetchChangeLog"; } diff --git a/src/dbzero/core/storage/BaseStorage.hpp b/src/dbzero/core/storage/BaseStorage.hpp index 4308826e..58c26e18 100644 --- a/src/dbzero/core/storage/BaseStorage.hpp +++ b/src/dbzero/core/storage/BaseStorage.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -12,17 +13,20 @@ namespace db0 { class ProcessTimer; - struct o_change_log; + template struct o_change_log; /** * Defines the file-oriented storage interface */ class BaseStorage { - public: + public: + using DRAM_ChangeLogT = db0::o_change_log; + using DP_ChangeLogT = db0::o_change_log; + BaseStorage(AccessType); virtual ~BaseStorage() = default; - + /** * Read data from a specific state into a user provided buffer * @@ -121,13 +125,13 @@ namespace db0 // @param end_state the first state number past the last state number to be included // in the change log (or up to the last state number if not specified) // @param f function to be called for each transaction's change log - virtual void fetchChangeLogs(StateNumType begin_state, std::optional end_state, - std::function f) const; + virtual void fetchDP_ChangeLogs(StateNumType begin_state, std::optional end_state, + std::function f) const; #ifndef NDEBUG // state number, file offset using DRAM_PageInfo = std::pair; - + struct DRAM_CheckResult { std::uint64_t m_address; @@ -140,8 +144,8 @@ namespace db0 // Activate throw from commit after specific number of operations (for testing purposes) virtual void setCrashFromCommit(unsigned int *op_count_ref); -#endif - +#endif + protected: AccessType m_access_type; }; diff --git a/src/dbzero/core/storage/ChangeLog.cpp b/src/dbzero/core/storage/ChangeLog.cpp index abb76671..bc1360dd 100644 --- a/src/dbzero/core/storage/ChangeLog.cpp +++ b/src/dbzero/core/storage/ChangeLog.cpp @@ -12,7 +12,7 @@ namespace db0 initRLECompress(is_sorted, add_duplicates); } } - + ChangeLogData::ChangeLogData(std::vector &&change_log, bool rle_compress, bool add_duplicates, bool is_sorted) : m_change_log(std::move(change_log)) { @@ -30,44 +30,23 @@ namespace db0 m_rle_builder.append(value, add_duplicates); } } - - o_change_log::o_change_log(const ChangeLogData &data) - : m_rle_compressed(!data.m_rle_builder.empty()) - { - if (m_rle_compressed) { - arrangeMembers() - (o_rle_sequence::type(), data.m_rle_builder.getData()); - } else { - arrangeMembers() - (o_list >::type(), data.m_change_log); - } - } - std::size_t o_change_log::measure(const ChangeLogData &data) - { - bool rle_compressed = !data.m_rle_builder.empty(); - if (rle_compressed) { - return measureMembers() - (o_rle_sequence::type(), data.m_rle_builder.getData()); - } else { - return measureMembers() - (o_list >::type(), data.m_change_log); - } - } - - o_change_log::ConstIterator::ConstIterator(o_list >::const_iterator it) + template + o_change_log::ConstIterator::ConstIterator(o_list >::const_iterator it) : m_rle(false) , m_list_it(it) { } - o_change_log::ConstIterator::ConstIterator(o_rle_sequence::ConstIterator it) + template + o_change_log::ConstIterator::ConstIterator(o_rle_sequence::ConstIterator it) : m_rle(true) , m_rle_it(it) { } - o_change_log::ConstIterator &o_change_log::ConstIterator::operator++() + template + typename o_change_log::ConstIterator &o_change_log::ConstIterator::operator++() { if (m_rle) { ++m_rle_it; @@ -77,7 +56,8 @@ namespace db0 return *this; } - std::uint64_t o_change_log::ConstIterator::operator*() const + template + std::uint64_t o_change_log::ConstIterator::operator*() const { if (m_rle) { return *m_rle_it; @@ -86,7 +66,8 @@ namespace db0 } } - bool o_change_log::ConstIterator::operator!=(const ConstIterator &other) const + template + bool o_change_log::ConstIterator::operator!=(const ConstIterator &other) const { if (m_rle != other.m_rle) { return true; @@ -98,26 +79,29 @@ namespace db0 } } - o_change_log::ConstIterator o_change_log::begin() const + template + typename o_change_log::ConstIterator o_change_log::begin() const { - if (m_rle_compressed) { + if (this->isRLECompressed()) { return ConstIterator(rle_sequence().begin()); } else { return ConstIterator(changle_log().begin()); } } - o_change_log::ConstIterator o_change_log::end() const + template + typename o_change_log::ConstIterator o_change_log::end() const { - if (m_rle_compressed) { + if (this->isRLECompressed()) { return ConstIterator(rle_sequence().end()); } else { return ConstIterator(changle_log().end()); } } - bool o_change_log::isRLECompressed() const { - return m_rle_compressed; + template + bool o_change_log::isRLECompressed() const { + return rleCompressed().value(); } - + } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLog.hpp b/src/dbzero/core/storage/ChangeLog.hpp index 1774443f..103322ff 100644 --- a/src/dbzero/core/storage/ChangeLog.hpp +++ b/src/dbzero/core/storage/ChangeLog.hpp @@ -4,11 +4,13 @@ #include #include #include +#include +#include namespace db0 { - + class ChangeLogData { public: @@ -30,29 +32,38 @@ namespace db0 void initRLECompress(bool is_sorted, bool add_duplicates); }; + // @tparam BaseT base class type (for storing common header info if needed) DB0_PACKED_BEGIN - struct DB0_PACKED_ATTR o_change_log: public o_base + template + struct DB0_PACKED_ATTR o_change_log: public o_ext, BaseT, 0, false> { protected: - friend struct o_base; - bool m_rle_compressed; + using super_t = o_ext, BaseT, 0, false>; + friend super_t; - o_change_log(const ChangeLogData &); + template + o_change_log(const ChangeLogData &, Args&&... args); + + const o_simple &rleCompressed() const { + return this->getDynFirst(o_simple::type()); + } // change log as RLE sequence type const o_rle_sequence &rle_sequence() const { - return reinterpret_cast &>(this->getDynFirst( - o_rle_sequence::type())); + return reinterpret_cast &>( + this->getDynAfter(rleCompressed(), o_rle_sequence::type()) + ); } // uncompressed change log const o_list > &changle_log() const { - return this->getDynFirst(o_list >::type()); + return this->getDynAfter(rleCompressed(), o_list >::type()); } public: - static std::size_t measure(const ChangeLogData &); + template + static std::size_t measure(const ChangeLogData &, Args... args); class ConstIterator { @@ -70,17 +81,18 @@ DB0_PACKED_BEGIN o_rle_sequence::ConstIterator m_rle_it; }; + bool isRLECompressed() const; + ConstIterator begin() const; ConstIterator end() const; // Decode the last element from the log (possibly a sentinel) std::uint64_t last() const; - - bool isRLECompressed() const; - + template static std::size_t safeSizeOf(T buf) { auto _buf = buf; + buf += super_t::safeBaseSize(buf); auto is_rle_compressed = o_simple::__const_ref(buf); buf += is_rle_compressed.sizeOf(); if (is_rle_compressed.value()) { @@ -92,5 +104,40 @@ DB0_PACKED_BEGIN } }; DB0_PACKED_END + + template + template + o_change_log::o_change_log(const ChangeLogData &data, Args&&... args) + : super_t(std::forward(args)...) + { + bool rle_compressed = !data.m_rle_builder.empty(); + if (rle_compressed) { + this->arrangeMembers() + (o_simple::type(), true) + (o_rle_sequence::type(), data.m_rle_builder.getData()); + } else { + this->arrangeMembers() + (o_simple::type(), false) + (o_list >::type(), data.m_change_log); + } + } + + template + template + std::size_t o_change_log::measure(const ChangeLogData &data, Args... args) + { + bool rle_compressed = !data.m_rle_builder.empty(); + if (rle_compressed) { + return measureBaseMembers(std::forward(args)...) + (o_simple::type()) + (o_rle_sequence::type(), data.m_rle_builder.getData()); + } else { + return measureBaseMembers(std::forward(args)...) + (o_simple::type()) + (o_list >::type(), data.m_change_log); + } + } + + extern template class o_change_log; } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLogIOStream.cpp b/src/dbzero/core/storage/ChangeLogIOStream.cpp index 3f8475ef..c05ff2c6 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.cpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.cpp @@ -5,64 +5,74 @@ namespace db0 { - ChangeLogIOStream::ChangeLogIOStream(CFile &m_file, std::uint64_t begin, std::uint32_t block_size, + template + ChangeLogIOStream::ChangeLogIOStream(CFile &m_file, std::uint64_t begin, std::uint32_t block_size, std::function tail_function, AccessType access_type) // enable checksums by default : BlockIOStream(m_file, begin, block_size, tail_function, access_type, true) { } - ChangeLogIOStream::ChangeLogIOStream(BlockIOStream &&io_stream) + template + ChangeLogIOStream::ChangeLogIOStream(BlockIOStream &&io_stream) : BlockIOStream(std::move(io_stream)) { } - const o_change_log &ChangeLogIOStream::appendChangeLog(ChangeLogData &&data) + template + template + const o_change_log_t &ChangeLogIOStream::appendChangeLog(ChangeLogData &&data, Args&&... args) { - auto size_of = o_change_log::measure(data); + auto size_of = o_change_log_t::measure(data, std::forward(args)...); if (m_buffer.size() < size_of) { m_buffer.resize(size_of); } - o_change_log::__new(m_buffer.data(), data); + o_change_log_t::__new(m_buffer.data(), data, std::forward(args)...); // append change log as a separate chunk BlockIOStream::addChunk(size_of); BlockIOStream::appendToChunk(m_buffer.data(), size_of); - m_last_change_log_ptr = &o_change_log::__const_ref(m_buffer.data()); + m_last_change_log_ptr = &o_change_log_t::__const_ref(m_buffer.data()); assert(m_last_change_log_ptr->sizeOf() == size_of); return *m_last_change_log_ptr; } - const o_change_log *ChangeLogIOStream::readChangeLogChunk(std::vector &buffer) + template + const o_change_log_t *ChangeLogIOStream::readChangeLogChunk(std::vector &buffer) { if (BlockIOStream::readChunk(buffer)) { - m_last_change_log_ptr = &o_change_log::__const_ref(buffer.data()); + m_last_change_log_ptr = &o_change_log_t::__const_ref(buffer.data()); return m_last_change_log_ptr; } else { return nullptr; } } - const o_change_log *ChangeLogIOStream::readChangeLogChunk() { + template + const o_change_log_t *ChangeLogIOStream::readChangeLogChunk() { return readChangeLogChunk(m_buffer); } - const o_change_log *ChangeLogIOStream::getLastChangeLogChunk() const { + template + const o_change_log_t *ChangeLogIOStream::getLastChangeLogChunk() const { return m_last_change_log_ptr; } - ChangeLogIOStream::Reader::Reader(ChangeLogIOStream &stream) + template + ChangeLogIOStream::Reader::Reader(ChangeLogIOStream &stream) : m_stream(stream) , m_it_next_buffer(m_buffers.end()) { } - ChangeLogIOStream::Reader ChangeLogIOStream::getStreamReader() { + template + typename ChangeLogIOStream::Reader ChangeLogIOStream::getStreamReader() { return *this; } - const o_change_log *ChangeLogIOStream::Reader::readChangeLogChunk() + template + const o_change_log_t *ChangeLogIOStream::Reader::readChangeLogChunk() { if (m_it_next_buffer == m_buffers.end()) { // read and cache the result @@ -76,13 +86,14 @@ namespace db0 return result; } else { // read from a cached buffer - auto result = &o_change_log::__const_ref(m_it_next_buffer->data()); + auto result = &o_change_log_t::__const_ref(m_it_next_buffer->data()); ++m_it_next_buffer; return result; } } - void ChangeLogIOStream::Reader::reset() { + template + void ChangeLogIOStream::Reader::reset() { m_it_next_buffer = m_buffers.begin(); } diff --git a/src/dbzero/core/storage/ChangeLogIOStream.hpp b/src/dbzero/core/storage/ChangeLogIOStream.hpp index 1436c5cc..01107fc0 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.hpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.hpp @@ -8,10 +8,12 @@ namespace db0 { - + /** * The BlockIOStream specialization to collect change-logs as separate chunks + * @tparam HeaderT the optional header type of change-log chunks */ + template > class ChangeLogIOStream: public BlockIOStream { public: @@ -26,30 +28,31 @@ namespace db0 * This method encodes the provided change log vector and appends it as a separate chunk. * The operation replaces the last stored change log chunk. * @param data the change log data to be appended - */ - const o_change_log &appendChangeLog(ChangeLogData &&data); + */ + template + const o_change_log_t &appendChangeLog(ChangeLogData &&data, Args&&... args); /** * Read a single change-log chunk from the stream. * The operation overwrites result of the previous read (unless nullptr is returned) * @return the change-log sequence or nullptr if end of the stream reached */ - const o_change_log *readChangeLogChunk(); + const o_change_log_t *readChangeLogChunk(); // Read chunk, bring your own buffer - const o_change_log *readChangeLogChunk(std::vector &buffer); + const o_change_log_t *readChangeLogChunk(std::vector &buffer); /** * Get last read or written change log chunk */ - const o_change_log *getLastChangeLogChunk() const; + const o_change_log_t *getLastChangeLogChunk() const; class Reader { public: Reader(ChangeLogIOStream &); - const o_change_log *readChangeLogChunk(); + const o_change_log_t *readChangeLogChunk(); // initialize reading from the beginning void reset(); @@ -65,18 +68,20 @@ namespace db0 class Writer { public: - void appendChangeLog(const o_change_log &); + void appendChangeLog(const o_change_log_t &); void flush(); - private: + private: }; // Retrieves a caching reaader, which allows multiple scan over the same data Reader getStreamReader(); private: - const o_change_log *m_last_change_log_ptr = nullptr; + const o_change_log_t *m_last_change_log_ptr = nullptr; std::vector m_buffer; }; + extern template class ChangeLogIOStream<>; + } \ No newline at end of file diff --git a/src/dbzero/core/storage/DRAM_IOStream.cpp b/src/dbzero/core/storage/DRAM_IOStream.cpp index 61c1bcce..d9930c4c 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.cpp +++ b/src/dbzero/core/storage/DRAM_IOStream.cpp @@ -81,7 +81,7 @@ namespace db0 } } - void DRAM_IOStream::load(ChangeLogIOStream &changelog_io) + void DRAM_IOStream::load(DRAM_ChangeLogStreamT &changelog_io) { assert(m_access_type == AccessType::READ_WRITE); @@ -114,7 +114,7 @@ namespace db0 m_allocator->update(allocs); } - void DRAM_IOStream::flushUpdates(std::uint64_t state_num, ChangeLogIOStream &dram_changelog_io) + void DRAM_IOStream::flushUpdates(std::uint64_t state_num, DRAM_ChangeLogStreamT &dram_changelog_io) { if (m_access_type == AccessType::READ_ONLY) { THROWF(db0::IOException) << "DRAM_IOStream::flushUpdates error: read-only stream"; @@ -221,7 +221,7 @@ namespace db0 return { m_prefix, m_allocator }; } - void DRAM_IOStream::beginApplyChanges(ChangeLogIOStream &changelog_io) const + void DRAM_IOStream::beginApplyChanges(DRAM_ChangeLogStreamT &changelog_io) const { assert(m_read_ahead_chunks.empty()); if (m_access_type == AccessType::READ_WRITE) { @@ -292,7 +292,7 @@ namespace db0 } #endif - void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, ChangeLogIOStream &changelog_io, + void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io, std::unordered_map > &chunks_buf) { auto create_read_ahead_buffer = [&](std::uint64_t address, std::size_t size) -> std::vector & @@ -345,7 +345,7 @@ namespace db0 throw; } } - + void flushDRAM_IOChanges(DRAM_IOStream &dram_io, std::unordered_map > &chunks_buf) { diff --git a/src/dbzero/core/storage/DRAM_IOStream.hpp b/src/dbzero/core/storage/DRAM_IOStream.hpp index ad9bd074..cda59b01 100644 --- a/src/dbzero/core/storage/DRAM_IOStream.hpp +++ b/src/dbzero/core/storage/DRAM_IOStream.hpp @@ -11,6 +11,8 @@ #include #include #include +#include "BaseStorage.hpp" +#include "ChangeLogIOStream.hpp" namespace db0 @@ -19,7 +21,6 @@ namespace db0 class DRAM_Prefix; class DRAM_Allocator; class CFile; - class ChangeLogIOStream; DB0_PACKED_BEGIN struct DB0_PACKED_ATTR o_dram_chunk_header: public o_fixed @@ -61,6 +62,8 @@ DB0_PACKED_END public: // checksums disabled in this type of stream static constexpr bool ENABLE_CHECKSUMS = false; + using DRAM_ChangeLogT = BaseStorage::DRAM_ChangeLogT; + using DRAM_ChangeLogStreamT = db0::ChangeLogIOStream; DRAM_IOStream(CFile &m_file, std::uint64_t begin, std::uint32_t block_size, std::function tail_function, AccessType access_type, std::uint32_t dram_page_size); @@ -73,12 +76,12 @@ DB0_PACKED_END * @param state_num the state number under which the modifications are to be stored * @param dram_changelog_io the stream to receive DRAM IO "changelog" chunks */ - void flushUpdates(std::uint64_t state_num, ChangeLogIOStream &dram_changelog_io); + void flushUpdates(std::uint64_t state_num, DRAM_ChangeLogStreamT &); // The purpose of this operation is allowing atomic application of changes // this call may end with an IOException without affecting internal state (except populating temporary buffers) // @return the latest state number of available changes - void beginApplyChanges(ChangeLogIOStream &changelog_io) const; + void beginApplyChanges(DRAM_ChangeLogStreamT &) const; // Apply buffered changes (allowed on condition beginApplyChanges succeeded) bool completeApplyChanges(); @@ -136,12 +139,12 @@ DB0_PACKED_END * Exhaust the entire change-log (to mark synchronization point) * then load entire contents from stream into the DRAM Storage */ - void load(ChangeLogIOStream &changelog_io); + void load(DRAM_ChangeLogStreamT &); std::size_t getChunkSize() const { return m_chunk_size; } - + private: const std::uint32_t m_dram_page_size; const std::size_t m_chunk_size; @@ -164,7 +167,7 @@ DB0_PACKED_END }; // Pre-fetch changes into the chunks buffer - void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, ChangeLogIOStream &changelog_io, + void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io, std::unordered_map > &chunks_buf); // Flush changes from the buffer diff --git a/src/dbzero/core/storage/Page_IO.cpp b/src/dbzero/core/storage/Page_IO.cpp index c277c51d..0776f12e 100644 --- a/src/dbzero/core/storage/Page_IO.cpp +++ b/src/dbzero/core/storage/Page_IO.cpp @@ -84,4 +84,10 @@ namespace db0 return { m_first_page_num + m_page_count, m_block_capacity - m_page_count }; } + std::uint64_t Page_IO::getEndPageNum() const + { + assert(m_access_type == AccessType::READ_WRITE); + return m_first_page_num + m_page_count; + } + } \ No newline at end of file diff --git a/src/dbzero/core/storage/Page_IO.hpp b/src/dbzero/core/storage/Page_IO.hpp index 9581ed92..e1465fad 100644 --- a/src/dbzero/core/storage/Page_IO.hpp +++ b/src/dbzero/core/storage/Page_IO.hpp @@ -44,7 +44,12 @@ namespace db0 std::uint64_t tail() const; std::uint32_t getPageSize() const; - + + // Get the page number which is > all pages currently stored + // This value can act as a "sentinel" for end-of-stream (at the moment of the call) + // NOTE: the member is only available in read/write mode + std::uint64_t getEndPageNum() const; + protected: const std::size_t m_header_size; const std::uint32_t m_page_size; diff --git a/src/dbzero/core/storage/SparseIndexBase.hpp b/src/dbzero/core/storage/SparseIndexBase.hpp index 3d0d9994..434c9fd2 100644 --- a/src/dbzero/core/storage/SparseIndexBase.hpp +++ b/src/dbzero/core/storage/SparseIndexBase.hpp @@ -16,7 +16,6 @@ namespace db0 #include #include #include -#include "ChangeLogIOStream.hpp" #include namespace db0 @@ -24,8 +23,7 @@ namespace db0 { class DRAM_Prefix; - class DRAM_Allocator; - class ChangeLogIOStream; + class DRAM_Allocator; /** * The in-memory sparse index implementation diff --git a/src/dbzero/core/storage/SparsePair.cpp b/src/dbzero/core/storage/SparsePair.cpp index 993f3b28..e21cdb95 100644 --- a/src/dbzero/core/storage/SparsePair.cpp +++ b/src/dbzero/core/storage/SparsePair.cpp @@ -27,7 +27,7 @@ namespace db0 SparsePair::~SparsePair() { } - + typename SparsePair::PageNumT SparsePair::getNextStoragePageNum() const { return std::max(m_sparse_index.getNextStoragePageNum(), m_diff_index.getNextStoragePageNum()); } @@ -46,7 +46,7 @@ namespace db0 return m_sparse_index.size() + m_diff_index.size(); } - const o_change_log &SparsePair::extractChangeLog(ChangeLogIOStream &changelog_io) + const SparsePair::DRAM_ChangeLogT &SparsePair::extractChangeLog(DRAM_ChangeLogStreamT &changelog_io) { assert(!m_change_log.empty()); // sort change log but keep the 1st item (the state number) at its place diff --git a/src/dbzero/core/storage/SparsePair.hpp b/src/dbzero/core/storage/SparsePair.hpp index 50c8d507..69c9cebe 100644 --- a/src/dbzero/core/storage/SparsePair.hpp +++ b/src/dbzero/core/storage/SparsePair.hpp @@ -3,13 +3,13 @@ #include #include "SparseIndex.hpp" #include "DiffIndex.hpp" +#include "BaseStorage.hpp" +#include "ChangeLogIOStream.hpp" namespace db0 { - - class ChangeLogIOStream; - + // The SparsePair combines SparseIndex and DiffIndex class SparsePair { @@ -17,6 +17,8 @@ namespace db0 using PageNumT = SparseIndex::PageNumT; using StateNumT = SparseIndex::StateNumT; using tag_create = SparseIndex::tag_create; + using DRAM_ChangeLogT = BaseStorage::DRAM_ChangeLogT; + using DRAM_ChangeLogStreamT = db0::ChangeLogIOStream; SparsePair(std::size_t node_size); SparsePair(DRAM_Pair, AccessType); @@ -54,10 +56,10 @@ namespace db0 * Write internally managed change log into a specific stream * and then clean the internal change log */ - const o_change_log &extractChangeLog(ChangeLogIOStream &); + const DRAM_ChangeLogT &extractChangeLog(DRAM_ChangeLogStreamT &); std::size_t getChangeLogSize() const; - + void commit(); private: diff --git a/src/dbzero/core/storage/copy_prefix.cpp b/src/dbzero/core/storage/copy_prefix.cpp index 437ca406..b9ce68f1 100644 --- a/src/dbzero/core/storage/copy_prefix.cpp +++ b/src/dbzero/core/storage/copy_prefix.cpp @@ -4,8 +4,8 @@ namespace db0 { - void copyDRAM_IO(DRAM_IOStream &input_io, ChangeLogIOStream &input_dram_changelog, - DRAM_IOStream &output_io, ChangeLogIOStream::Writer &output_dram_changelog) + void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, + DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog) { // Exhaust the input_dram_changelog first input_dram_changelog.setStreamPosHead(); @@ -37,5 +37,5 @@ namespace db0 out.appendToChunk(buffer.data(), chunk_size); } } - + } \ No newline at end of file diff --git a/src/dbzero/core/storage/copy_prefix.hpp b/src/dbzero/core/storage/copy_prefix.hpp index 7e5f7f22..20e29229 100644 --- a/src/dbzero/core/storage/copy_prefix.hpp +++ b/src/dbzero/core/storage/copy_prefix.hpp @@ -5,21 +5,25 @@ #include "Diff_IO.hpp" #include "DRAM_IOStream.hpp" #include "ChangeLogIOStream.hpp" +#include "BaseStorage.hpp" namespace db0 { + using DRAM_ChangeLogStreamT = db0::ChangeLogIOStream; + using DP_ChangeLogStreamT = db0::ChangeLogIOStream; + // This routine copies the entire DRAM_IO stream (from begin) in a manner // synchronized with the correspoding changelog stream // NOTE: output_changelog is NOT flushed (see the design) - void copyDRAM_IO(DRAM_IOStream &input_io, ChangeLogIOStream &input_dram_changelog, - DRAM_IOStream &output_io, ChangeLogIOStream::Writer &output_dram_changelog); + void copyDRAM_IO(DRAM_IOStream &input_io, DRAM_ChangeLogStreamT &input_dram_changelog, + DRAM_IOStream &output_io, DRAM_ChangeLogStreamT::Writer &output_dram_changelog); - // Copy entire contents from one BlockIOStream to another + // Copy entire contents from one BlockIOStream to another (type agnostic) void copyStream(BlockIOStream &in, BlockIOStream &out); - void copyPageIO(const Page_IO &input_io, const ChangeLogIOStream &input_dp_changelog, - Page_IO &output_io, ChangeLogIOStream::Writer &output_dp_changelog); + void copyPageIO(const Page_IO &input_io, const DP_ChangeLogStreamT &input_dp_changelog, + Page_IO &output_io, DP_ChangeLogStreamT::Writer &output_dp_changelog); } \ No newline at end of file diff --git a/src/dbzero/object_model/tags/SelectModified.cpp b/src/dbzero/object_model/tags/SelectModified.cpp index 98a049c1..2f44d24f 100644 --- a/src/dbzero/object_model/tags/SelectModified.cpp +++ b/src/dbzero/object_model/tags/SelectModified.cpp @@ -21,6 +21,7 @@ namespace db0::object_model std::unique_ptr selectModCandidates(std::unique_ptr &&query, const db0::BaseStorage &storage, StateNumType from_state, StateNumType to_state) { + using DP_ChangeLogT = db0::BaseStorage::DP_ChangeLogT; auto dp_size = storage.getPageSize(); auto dp_shift = db0::getPageShift(dp_size); @@ -31,7 +32,7 @@ namespace db0::object_model // 4. refine results (lazy filter) by binary comparison of pre-scope and post-scope objects to identify actual mutations std::unordered_set mutated_dps; - storage.fetchChangeLogs(from_state, to_state + 1, [&](StateNumType, const db0::o_change_log &change_log) { + storage.fetchDP_ChangeLogs(from_state, to_state + 1, [&](StateNumType, const DP_ChangeLogT &change_log) { auto it = change_log.begin(), end = change_log.end(); if (it != end) { // first element holds the state number and should be ignored diff --git a/tests/unit_tests/ChangeLogTest.cpp b/tests/unit_tests/ChangeLogTest.cpp new file mode 100644 index 00000000..5ffe8280 --- /dev/null +++ b/tests/unit_tests/ChangeLogTest.cpp @@ -0,0 +1,65 @@ +#include +#include +#include +#include + +using namespace std; +using namespace db0; +using namespace db0::tests; + +namespace tests + +{ + + class ChangeLogTest: public testing::Test + { + public: + static constexpr const char *file_name = "my-test-prefix_1.db0"; + + virtual void SetUp() override { + drop(file_name); + } + + virtual void TearDown() override { + drop(file_name); + } + }; + + TEST_F( ChangeLogTest , testChangeLogMeasureAndSizeOf ) + { + std::vector buf; + // create default change log (i.e. null header) + using ChangeLogT = o_change_log<>; + + // Test empty + { + auto measured_size = ChangeLogT::measure(ChangeLogData()); + buf.resize(measured_size); + ChangeLogT::__new(buf.data(), ChangeLogData()); + auto safe_size = ChangeLogT::safeSizeOf(buf.data()); + ASSERT_EQ(measured_size, safe_size); + } + + // Test RLE compressed + { + std::vector change_log = { 1, 2, 3, 4, 5 }; + ChangeLogData data(std::move(change_log), true, false, false); + auto measured_size = ChangeLogT::measure(data); + buf.resize(measured_size); + ChangeLogT::__new(buf.data(), data); + auto safe_size = ChangeLogT::safeSizeOf(buf.data()); + } + + // Test uncompressed + { + std::vector change_log = { 3, 4, 8 }; + ChangeLogData data(std::move(change_log), false, false, false); + auto measured_size = ChangeLogT::measure(data); + buf.resize(measured_size); + ChangeLogT::__new(buf.data(), data); + auto safe_size = ChangeLogT::safeSizeOf(buf.data()); + ASSERT_EQ(measured_size, safe_size); + } + } + +} From 5e73bd70842a650fc3eb58d6db92174dedfd33d6 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Mon, 24 Nov 2025 16:33:13 +0100 Subject: [PATCH 3/3] change-log types refactor --- src/dbzero/core/serialization/Fixed.hpp | 4 +-- src/dbzero/core/storage/ChangeLog.cpp | 4 ++- src/dbzero/core/storage/ChangeLog.hpp | 17 ++++------ src/dbzero/core/storage/ChangeLogIOStream.cpp | 33 ++++++++----------- src/dbzero/core/storage/ChangeLogIOStream.hpp | 19 +++++++++++ src/dbzero/workspace/Workspace.cpp | 4 +-- tests/unit_tests/BDevStorageTest.cpp | 6 ++-- tests/unit_tests/ChangeLogTest.cpp | 31 +++++++++++++++-- 8 files changed, 79 insertions(+), 39 deletions(-) diff --git a/src/dbzero/core/serialization/Fixed.hpp b/src/dbzero/core/serialization/Fixed.hpp index f467cc42..4683f6fc 100644 --- a/src/dbzero/core/serialization/Fixed.hpp +++ b/src/dbzero/core/serialization/Fixed.hpp @@ -76,7 +76,7 @@ DB0_PACKED_BEGIN */ template static T &__safe_ref(buf_t buf) { - const std::byte *_buf = buf; + const std::byte *_buf = static_cast(buf); // validate bounds here buf += true_size_of(); return __ref(const_cast(_buf)); @@ -84,7 +84,7 @@ DB0_PACKED_BEGIN template static const T &__safe_const_ref(buf_t buf) { - const std::byte *_buf = buf; + const std::byte *_buf = static_cast(buf); // validate bounds here buf += true_size_of(); return __const_ref(_buf); diff --git a/src/dbzero/core/storage/ChangeLog.cpp b/src/dbzero/core/storage/ChangeLog.cpp index bc1360dd..b655dda4 100644 --- a/src/dbzero/core/storage/ChangeLog.cpp +++ b/src/dbzero/core/storage/ChangeLog.cpp @@ -103,5 +103,7 @@ namespace db0 bool o_change_log::isRLECompressed() const { return rleCompressed().value(); } - + + template class o_change_log; + } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLog.hpp b/src/dbzero/core/storage/ChangeLog.hpp index 103322ff..531ece82 100644 --- a/src/dbzero/core/storage/ChangeLog.hpp +++ b/src/dbzero/core/storage/ChangeLog.hpp @@ -44,10 +44,6 @@ DB0_PACKED_BEGIN template o_change_log(const ChangeLogData &, Args&&... args); - const o_simple &rleCompressed() const { - return this->getDynFirst(o_simple::type()); - } - // change log as RLE sequence type const o_rle_sequence &rle_sequence() const { @@ -65,6 +61,10 @@ DB0_PACKED_BEGIN template static std::size_t measure(const ChangeLogData &, Args... args); + const o_simple &rleCompressed() const { + return this->getDynFirst(o_simple::type()); + } + class ConstIterator { public: @@ -82,12 +82,9 @@ DB0_PACKED_BEGIN }; bool isRLECompressed() const; - + ConstIterator begin() const; ConstIterator end() const; - - // Decode the last element from the log (possibly a sentinel) - std::uint64_t last() const; template static std::size_t safeSizeOf(T buf) { @@ -128,11 +125,11 @@ DB0_PACKED_END { bool rle_compressed = !data.m_rle_builder.empty(); if (rle_compressed) { - return measureBaseMembers(std::forward(args)...) + return super_t::measureMembersFromBase(std::forward(args)...) (o_simple::type()) (o_rle_sequence::type(), data.m_rle_builder.getData()); } else { - return measureBaseMembers(std::forward(args)...) + return super_t::measureMembersFromBase(std::forward(args)...) (o_simple::type()) (o_list >::type(), data.m_change_log); } diff --git a/src/dbzero/core/storage/ChangeLogIOStream.cpp b/src/dbzero/core/storage/ChangeLogIOStream.cpp index c05ff2c6..c9c1f0d6 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.cpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.cpp @@ -18,26 +18,7 @@ namespace db0 : BlockIOStream(std::move(io_stream)) { } - - template - template - const o_change_log_t &ChangeLogIOStream::appendChangeLog(ChangeLogData &&data, Args&&... args) - { - auto size_of = o_change_log_t::measure(data, std::forward(args)...); - if (m_buffer.size() < size_of) { - m_buffer.resize(size_of); - } - o_change_log_t::__new(m_buffer.data(), data, std::forward(args)...); - // append change log as a separate chunk - BlockIOStream::addChunk(size_of); - BlockIOStream::appendToChunk(m_buffer.data(), size_of); - m_last_change_log_ptr = &o_change_log_t::__const_ref(m_buffer.data()); - assert(m_last_change_log_ptr->sizeOf() == size_of); - - return *m_last_change_log_ptr; - } - template const o_change_log_t *ChangeLogIOStream::readChangeLogChunk(std::vector &buffer) { @@ -97,4 +78,18 @@ namespace db0 m_it_next_buffer = m_buffers.begin(); } + template + void ChangeLogIOStream::Writer::appendChangeLog(const o_change_log_t &) + { + throw std::runtime_error("not implemented"); + } + + template + void ChangeLogIOStream::Writer::flush() + { + throw std::runtime_error("not implemented"); + } + + template class ChangeLogIOStream<>; + } \ No newline at end of file diff --git a/src/dbzero/core/storage/ChangeLogIOStream.hpp b/src/dbzero/core/storage/ChangeLogIOStream.hpp index 01107fc0..3c145c55 100644 --- a/src/dbzero/core/storage/ChangeLogIOStream.hpp +++ b/src/dbzero/core/storage/ChangeLogIOStream.hpp @@ -82,6 +82,25 @@ namespace db0 std::vector m_buffer; }; + template + template + const o_change_log_t &ChangeLogIOStream::appendChangeLog(ChangeLogData &&data, Args&&... args) + { + auto size_of = o_change_log_t::measure(data, std::forward(args)...); + if (m_buffer.size() < size_of) { + m_buffer.resize(size_of); + } + + o_change_log_t::__new(m_buffer.data(), data, std::forward(args)...); + // append change log as a separate chunk + BlockIOStream::addChunk(size_of); + BlockIOStream::appendToChunk(m_buffer.data(), size_of); + m_last_change_log_ptr = &o_change_log_t::__const_ref(m_buffer.data()); + assert(m_last_change_log_ptr->sizeOf() == size_of); + + return *m_last_change_log_ptr; + } + extern template class ChangeLogIOStream<>; } \ No newline at end of file diff --git a/src/dbzero/workspace/Workspace.cpp b/src/dbzero/workspace/Workspace.cpp index c590191b..3094324c 100644 --- a/src/dbzero/workspace/Workspace.cpp +++ b/src/dbzero/workspace/Workspace.cpp @@ -230,8 +230,8 @@ namespace db0 std::optional slab_cache_size, std::optional vobject_cache_size, std::optional flush_size, std::function &, bool, bool, bool)> fixture_initializer, std::shared_ptr config, std::optional default_lock_flags) - : BaseWorkspace(root_path, cache_size, slab_cache_size, flush_size, - default_lock_flags, m_config ? config->get("suppress_dist_overflow_error") : false) + : BaseWorkspace(root_path, cache_size, slab_cache_size, flush_size, + default_lock_flags, config ? config->get("suppress_dist_overflow_error") : false) , m_config(config) , m_fixture_catalog(m_prefix_catalog) , m_fixture_initializer(fixture_initializer) diff --git a/tests/unit_tests/BDevStorageTest.cpp b/tests/unit_tests/BDevStorageTest.cpp index d62378f4..ded721ed 100644 --- a/tests/unit_tests/BDevStorageTest.cpp +++ b/tests/unit_tests/BDevStorageTest.cpp @@ -495,6 +495,8 @@ namespace tests TEST_F( BDevStorageTest , testBDevStorageFetchChangeLogs ) { + using DP_ChangeLogT = db0::BaseStorage::DP_ChangeLogT; + srand(9142424u); BDevStorage::create(file_name); BDevStorage cut(file_name, AccessType::READ_WRITE, {}, 16); @@ -536,12 +538,12 @@ namespace tests { 8, 9, 10 }, { 3, 4, 5, 6 } }; - + unsigned int range_id = 0; for (auto range: state_ranges) { // collect and validate change-logs std::vector state_nums; - cut.fetchChangeLogs(range.first, range.second, [&](StateNumType fetched_state_num, const o_change_log &cl) { + cut.fetchDP_ChangeLogs(range.first, range.second, [&](StateNumType fetched_state_num, const DP_ChangeLogT &cl) { state_nums.push_back(fetched_state_num); std::vector page_nums; auto it = cl.begin(); diff --git a/tests/unit_tests/ChangeLogTest.cpp b/tests/unit_tests/ChangeLogTest.cpp index 5ffe8280..79a202be 100644 --- a/tests/unit_tests/ChangeLogTest.cpp +++ b/tests/unit_tests/ChangeLogTest.cpp @@ -27,7 +27,7 @@ namespace tests TEST_F( ChangeLogTest , testChangeLogMeasureAndSizeOf ) { - std::vector buf; + std::vector buf; // create default change log (i.e. null header) using ChangeLogT = o_change_log<>; @@ -39,7 +39,7 @@ namespace tests auto safe_size = ChangeLogT::safeSizeOf(buf.data()); ASSERT_EQ(measured_size, safe_size); } - + // Test RLE compressed { std::vector change_log = { 1, 2, 3, 4, 5 }; @@ -47,7 +47,8 @@ namespace tests auto measured_size = ChangeLogT::measure(data); buf.resize(measured_size); ChangeLogT::__new(buf.data(), data); - auto safe_size = ChangeLogT::safeSizeOf(buf.data()); + auto safe_size = ChangeLogT::safeSizeOf(buf.data()); + ASSERT_EQ(measured_size, safe_size); } // Test uncompressed @@ -61,5 +62,29 @@ namespace tests ASSERT_EQ(measured_size, safe_size); } } + + TEST_F( ChangeLogTest , testChangeLogNullHeaderHasNoOverhead ) + { + std::vector buf; + // create default change log (i.e. null header) + using ChangeLogT = o_change_log; + + std::vector change_log = { 1, 2, 3, 4, 5 }; + ChangeLogData data(std::move(change_log), true, false, false); + auto measured_size = ChangeLogT::measure(data); + buf.resize(measured_size); + auto &cut = ChangeLogT::__new(buf.data(), data); + ASSERT_TRUE(cut.isRLECompressed()); + + auto diff = (std::byte*)&cut.rleCompressed() - (std::byte*)&cut; + ASSERT_EQ(0, diff); + + unsigned int count = 0; + for (auto addr: cut) { + ASSERT_EQ(addr, count + 1); + ++count; + } + ASSERT_EQ(count, 5u); + } }