Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,32 @@ jobs:
- name: Run tests
# pytest honours only the last -m option on the command line, so both
# exclusions must be combined into a single marker expression; otherwise
# the 'not integration_test' filter is silently dropped.
run: |
python -m pytest -m 'not integration_test and not stress_test' -c pytest.ini --capture=no -vv
# Runs the tests marked 'stress_test' against a pre-built Linux wheel.
# Starts only after the build-linux job has completed (see 'needs').
stress-tests-wheels-linux:
runs-on: ubuntu-latest
# abort the job if it has not finished within 15 minutes
timeout-minutes: 15
needs: build-linux
strategy:
# do not cancel the remaining matrix entries when one of them fails
fail-fast: false
matrix:
python-version: ["3.12"]

steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
# fetch the wheel artifact for this Python version into ./wheels/
# (presumably uploaded by the build-linux job - confirm against that job)
- name: Download wheel artifact
uses: actions/download-artifact@v4
with:
name: wheels-linux-${{ matrix.python-version }}
path: ./wheels/
- name: Install wheel and dependencies
run: |
pip install pytest
pip install -r requirements.txt
pip install ./wheels/*.whl
# select only the tests carrying the 'stress_test' marker
- name: Run tests
run: |
python3 -m pytest -m 'stress_test' -c pytest.ini --capture=no -vv -s

17 changes: 6 additions & 11 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ namespace db0
, m_dp_changelog_io(getChangeLogIOStream(m_config.m_dp_changelog_io_offset, access_type))
, m_meta_io(init(getMetaIOStream(m_config.m_meta_io_offset, meta_io_step_size.value_or(DEFAULT_META_IO_STEP_SIZE),
access_type)))
, m_dram_io(init(getDRAMIOStream(m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io))
, m_dram_io(init(getDRAMIOStream(m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io))
, m_sparse_pair(m_dram_io.getDRAMPair(), access_type)
, m_sparse_index(m_sparse_pair.getSparseIndex())
, m_diff_index(m_sparse_pair.getDiffIndex())
, m_page_io(getPage_IO(m_sparse_pair.getNextStoragePageNum(), access_type))
{
// in read-only mode we need to refresh in order to retrieve a consistent DRAM state
// since other process might be actively modifying the underlying file
if (m_access_type == AccessType::READ_ONLY) {
refresh();
}
Expand All @@ -56,15 +58,8 @@ namespace db0
}

DRAM_IOStream BDevStorage::init(DRAM_IOStream &&dram_io, ChangeLogIOStream &dram_change_log)
{
if (dram_io.getAccessType() == AccessType::READ_WRITE) {
// simply exhaust the change-log stream
while (dram_change_log.readChangeLogChunk());
} else {
// apply all changes from the change-log
dram_io.beginApplyChanges(dram_change_log);
dram_io.completeApplyChanges();
}
{
dram_io.load(dram_change_log);
return std::move(dram_io);
}

Expand All @@ -81,7 +76,7 @@ namespace db0
m_file.read(0, buffer.size(), buffer.data());
auto &config = o_prefix_config::__const_ref(buffer.data());
if (config.m_magic != o_prefix_config::DB0_MAGIC) {
THROWF(db0::IOException) << "Invalid DB0 file: " << m_file.getName();
THROWF(db0::IOException) << "Not a dbzero file: " << m_file.getName();
}
return config;
}
Expand Down
30 changes: 21 additions & 9 deletions src/dbzero/core/storage/DRAM_IOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ namespace db0
, m_prefix(std::make_shared<DRAM_Prefix>(m_dram_page_size))
, m_allocator(std::make_shared<DRAM_Allocator>(m_dram_page_size))
{
// load only allowed in read/write mode because otherwise the reader might
// fetch inconsistent state
if (m_access_type == AccessType::READ_WRITE) {
load();
}
}

DRAM_IOStream::DRAM_IOStream(DRAM_IOStream &&other)
Expand Down Expand Up @@ -86,9 +81,14 @@ namespace db0
}
}

void DRAM_IOStream::load()
void DRAM_IOStream::load(ChangeLogIOStream &changelog_io)
{
assert(m_access_type == AccessType::READ_WRITE);

// simply exhaust the change-log stream
// its position marks the synchronization point
while (changelog_io.readChangeLogChunk());

std::vector<char> buffer(m_chunk_size, 0);
const auto &header = o_dram_chunk_header::__ref(buffer.data());
auto bytes = buffer.data() + header.sizeOf();
Expand Down Expand Up @@ -164,7 +164,7 @@ namespace db0
m_page_map[page_num] = { state_num, address };
};

// flush all changes done to DRAM Prerfix (append modified pages only)
// flush all changes done to DRAM Prefix (append modified pages only)
std::vector<std::uint64_t> dram_changelog;
m_prefix->flushDirty([&, this](std::uint64_t page_num, const void *page_buffer) {
// the last page must be stored in a new block to mark end of the sequence
Expand Down Expand Up @@ -235,8 +235,8 @@ namespace db0
// Note that change log and the data chunks may be updated by other process while we read it
// the consistent state is only guaranteed after reaching end of the stream
auto change_log_ptr = changelog_io.readChangeLogChunk();

// first collect the change log to only visit each address once
// First collect the change log to only visit each address once
while (change_log_ptr) {
for (auto address: *change_log_ptr) {
if (m_addr_set.find(address) == m_addr_set.end()) {
Expand All @@ -251,6 +251,18 @@ namespace db0
}
change_log_ptr = changelog_io.readChangeLogChunk();
}

// Visit the addresses next
// this is important because otherwise we might access outdated or inconsistent DPs
for (auto address: m_addr_set) {
// buffer must include BlockIOStream's chunk header and data
auto &buffer = createReadAheadBuffer(address, m_chunk_size + o_block_io_chunk_header::sizeOf());
// the address reported in changelog must already be available in the stream
// it may come from a more recent update as well (and potentially may only be partially written)
// therefore chunk-level checksum validation is necessary
BlockIOStream::readFromChunk(address, buffer.data(), buffer.size());
}

} catch (db0::IOException &) {
changelog_io.setStreamPos(stream_pos);
m_read_ahead_chunks.clear();
Expand Down
18 changes: 10 additions & 8 deletions src/dbzero/core/storage/DRAM_IOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ DB0_PACKED_BEGIN
std::size_t getAllocatedSize() const;

const DRAM_Prefix &getDRAMPrefix() const;

const DRAM_Allocator &getDRAMAllocator() const;

// get the number of random write operations performed while flushing updates
Expand All @@ -130,7 +130,13 @@ DB0_PACKED_BEGIN
// Read physical data block from file and detect discrepancies
void dramIOCheck(std::vector<DRAM_CheckResult> &) const;
#endif


/**
* Exhaust the entire change-log (to mark synchronization point)
* then load entire contents from stream into the DRAM Storage
*/
void load(ChangeLogIOStream &changelog_io);

private:
const std::uint32_t m_dram_page_size;
const std::size_t m_chunk_size;
Expand All @@ -142,12 +148,8 @@ DB0_PACKED_BEGIN
std::shared_ptr<DRAM_Allocator> m_allocator;
// chunks buffer for the beginApplyChanges / completeApplyChanges operations
mutable std::unordered_map<std::uint64_t, std::vector<char> > m_read_ahead_chunks;
mutable std::unordered_set<std::uint64_t> m_addr_set;

/**
* Load entire contents from stream into the DRAM Storage
*/
void load();
mutable std::unordered_set<std::uint64_t> m_addr_set;

void *updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
const o_dram_chunk_header &header);
void updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
Expand Down
Loading