Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbzero/dbzero/dbzero.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def load_dynamic(name, path):

def __bootstrap__():
global __bootstrap__, __loader__, __file__
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"]
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"]
__file__ = None
for path in paths:
if os.path.isdir(path):
Expand Down
3 changes: 2 additions & 1 deletion python_tests/test_copy_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,4 +381,5 @@ def validate(expected_len):

# open prefix from recovered and modified copy of a copy
db0.init(DB0_DIR, prefix=px_name, read_write=False)
validate(total_len)
validate(total_len)

43 changes: 26 additions & 17 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ namespace db0
}

// Validate state consistency
// The state number reported by DRAM IO must match the one in the DP changelog IO
// The state number reported by DRAM IO must NOT exceed the last state number recorded in the DP changelog
if (auto chunk_ptr = m_dp_changelog_io.getLastChangeLogChunk()) {
auto dp_state_num = chunk_ptr->m_state_num;
auto dram_state_num = m_sparse_pair.getMaxStateNum();
if (dram_state_num != dp_state_num) {
THROWF(db0::IOException) << "Inconsistent state: DRAM IO max state number " << dram_state_num
<< " does not match DP changelog last state number " << dp_state_num;
if (dram_state_num > dp_state_num) {
THROWF(db0::IOException) << "Inconsistent state: DRAM state number " << dram_state_num
<< " exceeds DP changelog state number " << dp_state_num;
}
}
}
Expand Down Expand Up @@ -676,14 +676,30 @@ namespace db0
ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos();
}
auto dp_changelog_io_pos = m_dp_changelog_io.getStreamPos();
std::optional<StateNumType> dram_state_num;
// NOTE: ext DRAM updates are incremental in nature, so they might precede DRAM updates
// without breaking consistency
std::optional<StateNumType> ext_dram_state_num;

// reverts streams to previous positions
auto revert_streams = [&]() {
m_dram_changelog_io.setStreamPos(dram_changelog_io_pos);
m_dp_changelog_io.setStreamPos(dp_changelog_io_pos);
if (!!m_ext_space) {
assert(m_ext_dram_changelog_io);
m_ext_dram_changelog_io->setStreamPos(ext_dram_changelog_io_pos);
}
};

try {
m_dram_io.beginApplyChanges(m_dram_changelog_io);
dram_state_num = m_dram_io.beginApplyChanges(m_dram_changelog_io);
dram_changelog_io_pos = m_dram_changelog_io.getStreamPos();
if (!!m_ext_space) {
assert(m_ext_dram_changelog_io);
m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io);
ext_dram_state_num = m_ext_dram_io->beginApplyChanges(*m_ext_dram_changelog_io);
ext_dram_changelog_io_pos = m_ext_dram_changelog_io->getStreamPos();
}

// send all page-update notifications to the provided handler
if (on_page_updated) {
StateNumType updated_state_num = 0;
Expand Down Expand Up @@ -712,27 +728,20 @@ namespace db0
}

} catch (db0::IOException &) {
// NOTE: this exception may appear on distributed filesystems
// where changes are not guaranteed to be written sequentially
// need to revert the refresh operation to the point where it originally started
m_dram_changelog_io.setStreamPos(dram_changelog_io_pos);
m_dp_changelog_io.setStreamPos(dp_changelog_io_pos);
if (!!m_ext_space) {
assert(m_ext_dram_changelog_io);
m_ext_dram_changelog_io->setStreamPos(ext_dram_changelog_io_pos);
}
revert_streams();
// NOTE: this may be a temporary problem, refresh needs repeating
break;
}

if (!result) {
result = m_file.getLastModifiedTime();
}

if (m_dram_io.completeApplyChanges()) {
if (dram_state_num && m_dram_io.completeApplyChanges(*dram_state_num)) {
// refresh underlying sparse index / diff index after DRAM update
m_sparse_pair.refresh();
}
if (!!m_ext_space && m_ext_dram_io->completeApplyChanges()) {
if (!!m_ext_space && ext_dram_state_num && m_ext_dram_io->completeApplyChanges(*ext_dram_state_num)) {
m_ext_space.refresh();
}
m_meta_io.refresh();
Expand Down
2 changes: 1 addition & 1 deletion src/dbzero/core/storage/BDevStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ DB0_PACKED_END
public:
static constexpr std::uint32_t DEFAULT_PAGE_SIZE = 4096;
static constexpr std::size_t DEFAULT_META_IO_STEP_SIZE = 16 << 20;
using DRAM_ChangeLogStreamT = ChangeLogIOStream<>;
using DRAM_ChangeLogStreamT = ChangeLogIOStream<DRAM_ChangeLogT>;
using DP_ChangeLogStreamT = ChangeLogIOStream<DP_ChangeLogT>;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/dbzero/core/storage/BaseStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace db0
class BaseStorage
{
public:
using DRAM_ChangeLogT = db0::o_change_log<db0::o_fixed_null>;
using DRAM_ChangeLogT = db0::o_change_log<db0::o_dram_changelog_header>;
using DP_ChangeLogT = db0::o_change_log<db0::o_dp_changelog_header>;

BaseStorage(AccessType, StorageFlags = {});
Expand Down
3 changes: 2 additions & 1 deletion src/dbzero/core/storage/ChangeLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ namespace db0
return rleCompressed().value();
}

template class o_change_log<db0::o_fixed_null>;
template class o_change_log<>;
template class o_change_log<o_dram_changelog_header>;
template class o_change_log<o_dp_changelog_header>;

}
4 changes: 2 additions & 2 deletions src/dbzero/core/storage/ChangeLog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ DB0_PACKED_END
(o_list<o_simple<std::uint64_t> >::type(), data.m_change_log);
}
}
extern template class o_change_log<db0::o_fixed_null>;

extern template class o_change_log<>;

}
1 change: 1 addition & 0 deletions src/dbzero/core/storage/ChangeLogIOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ namespace db0
}

template class ChangeLogIOStream<>;
template class ChangeLogIOStream<db0::o_change_log<db0::o_dram_changelog_header> >;
template class ChangeLogIOStream<db0::o_change_log<db0::o_dp_changelog_header> >;

}
1 change: 1 addition & 0 deletions src/dbzero/core/storage/ChangeLogIOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ namespace db0
}

extern template class ChangeLogIOStream<>;
extern template class ChangeLogIOStream<db0::o_change_log<db0::o_dram_changelog_header> >;
extern template class ChangeLogIOStream<db0::o_change_log<db0::o_dp_changelog_header> >;

}
20 changes: 19 additions & 1 deletion src/dbzero/core/storage/ChangeLogTypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ DB0_PACKED_BEGIN
// sentinel storage page number for this transaction (see Page_IO::getEndPageNum())
// NOTE: this value might be relative if the mapping is active
std::uint64_t m_end_storage_page_num;
// reserved for future use
std::array<std::uint64_t, 2> m_reserved = { 0, 0 };

o_dp_changelog_header(StateNumType state_num, std::uint64_t end_storage_page_num)
: m_state_num(state_num)
Expand All @@ -26,7 +28,23 @@ DB0_PACKED_BEGIN
};
DB0_PACKED_END

extern template class o_change_log<db0::o_fixed_null>;
// Header type for the DRAM change log (used as o_change_log<o_dram_changelog_header>).
// Tags each change-log chunk with the state number it was written for.
// NOTE(review): derives from o_fixed<>, so the layout is presumably fixed-size and
// persisted as-is; confirm before adding or reordering members.
DB0_PACKED_BEGIN
struct DB0_PACKED_ATTR o_dram_changelog_header: o_fixed<o_dram_changelog_header>
{
// state number this change log corresponds to
StateNumType m_state_num;
// reserved for future use (two 64-bit words, zero-initialized by default)
std::array<std::uint64_t, 2> m_reserved = { 0, 0 };

// Constructs a header for the given state number; m_reserved keeps its zero defaults.
o_dram_changelog_header(StateNumType state_num)
: m_state_num(state_num)
{
}
};
DB0_PACKED_END

extern template class o_change_log<>;
extern template class o_change_log<o_dram_changelog_header>;
extern template class o_change_log<o_dp_changelog_header>;

}
55 changes: 38 additions & 17 deletions src/dbzero/core/storage/DRAM_IOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace db0

{

DRAM_IOStream::DRAM_IOStream(CFile &m_file, std::uint64_t begin, std::uint32_t block_size,
std::function<std::uint64_t()> tail_function, AccessType access_type, std::uint32_t dram_page_size)
: BlockIOStream(m_file, begin, block_size, tail_function, access_type, DRAM_IOStream::ENABLE_CHECKSUMS)
Expand All @@ -37,10 +37,15 @@ namespace db0
}

void *DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
const o_dram_chunk_header &header)
const o_dram_chunk_header &header, StateNumType max_state_num)
{
if (header.m_state_num > max_state_num) {
// ignore changes beyond the last known consistent state number
return nullptr;
}

// page map = page_num / state_num
auto dram_page = m_page_map.find(header.m_page_num);
auto dram_page = m_page_map.find(header.m_page_num);
if (dram_page == m_page_map.end() || dram_page->second.m_state_num < header.m_state_num) {
// update DRAM to most recent page version, page not marked as dirty
auto result = m_prefix->update(header.m_page_num, false);
Expand Down Expand Up @@ -76,26 +81,32 @@ namespace db0
}

void DRAM_IOStream::updateDRAMPage(std::uint64_t address, std::unordered_set<std::size_t> *allocs_ptr,
const o_dram_chunk_header &header, const void *bytes)
const o_dram_chunk_header &header, const void *bytes, StateNumType max_state_num)
{
auto result = updateDRAMPage(address, allocs_ptr, header);
auto result = updateDRAMPage(address, allocs_ptr, header, max_state_num);
if (result) {
std::memcpy(result, bytes, m_dram_page_size);
}
}

void DRAM_IOStream::load(DRAM_ChangeLogStreamT &changelog_io)
{
// simply exhaust the change-log stream
// Exhaust the change-log stream first and retrieve the last valid state number
// its position marks the synchronization point
while (changelog_io.readChangeLogChunk());

std::vector<char> buffer(m_chunk_size, 0);
const auto &header = o_dram_chunk_header::__ref(buffer.data());
auto bytes = buffer.data() + header.sizeOf();

// maximum known state number by page
// this is required to only select the maximum state per page (discard older mutations)
auto last_chunk_ptr = changelog_io.getLastChangeLogChunk();
if (!last_chunk_ptr) {
// no changes to load
return;
}

// The last known consistent state number
auto max_state_num = last_chunk_ptr->m_state_num;
std::unordered_set<std::size_t> allocs;
for (;;) {
auto block_id = tellBlock();
Expand All @@ -110,12 +121,12 @@ namespace db0
THROWF(db0::IOException) << "DRAM_IOStream::load error: unaligned block";
}

updateDRAMPage(chunk_addr, &allocs, header, bytes);
updateDRAMPage(chunk_addr, &allocs, header, bytes, max_state_num);
}
m_allocator->update(allocs);
}

void DRAM_IOStream::flushUpdates(std::uint64_t state_num, DRAM_ChangeLogStreamT &dram_changelog_io)
void DRAM_IOStream::flushUpdates(StateNumType state_num, DRAM_ChangeLogStreamT &dram_changelog_io)
{
if (m_access_type == AccessType::READ_ONLY) {
THROWF(db0::IOException) << "DRAM_IOStream::flushUpdates error: read-only stream";
Expand Down Expand Up @@ -198,7 +209,7 @@ namespace db0
BlockIOStream::flush();
// output changelog, no RLE encoding, no duplicates
ChangeLogData cl_data(std::move(dram_changelog), false, false, false);
dram_changelog_io.appendChangeLog(std::move(cl_data));
dram_changelog_io.appendChangeLog(std::move(cl_data), state_num);
}

#ifndef NDEBUG
Expand All @@ -222,24 +233,24 @@ namespace db0
return { m_prefix, m_allocator };
}

void DRAM_IOStream::beginApplyChanges(DRAM_ChangeLogStreamT &changelog_io) const
std::optional<StateNumType> DRAM_IOStream::beginApplyChanges(DRAM_ChangeLogStreamT &changelog_io) const
{
assert(m_read_ahead_chunks.empty());
if (m_access_type == AccessType::READ_WRITE) {
THROWF(db0::InternalException) << "DRAM_IOStream::applyChanges require read-only stream";
}

fetchDRAM_IOChanges(*this, changelog_io, m_read_ahead_chunks);
return fetchDRAM_IOChanges(*this, changelog_io, m_read_ahead_chunks);
}

bool DRAM_IOStream::completeApplyChanges()
bool DRAM_IOStream::completeApplyChanges(StateNumType max_state_num)
{
bool result = false;
for (const auto &item: m_read_ahead_chunks) {
auto address = item.first;
const auto &buffer = item.second;
const auto &header = o_dram_chunk_header::__const_ref(buffer.data() + o_block_io_chunk_header::sizeOf());
updateDRAMPage(address, nullptr, header, header.getData());
updateDRAMPage(address, nullptr, header, header.getData(), max_state_num);
result = true;
}
m_read_ahead_chunks.clear();
Expand Down Expand Up @@ -293,8 +304,10 @@ namespace db0
}
#endif

void fetchDRAM_IOChanges(const DRAM_IOStream &dram_io, DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io,
std::unordered_map<std::uint64_t, std::vector<char> > &chunks_buf)
std::optional<StateNumType> fetchDRAM_IOChanges(const DRAM_IOStream &dram_io,
DRAM_IOStream::DRAM_ChangeLogStreamT &changelog_io,
std::unordered_map<std::uint64_t, std::vector<char> > &chunks_buf,
std::function<void(const DRAM_IOStream::DRAM_ChangeLogT &)> callback)
{
auto create_read_ahead_buffer = [&](std::uint64_t address, std::size_t size) -> std::vector<char> &
{
Expand All @@ -306,6 +319,7 @@ namespace db0
};

auto stream_pos = changelog_io.getStreamPos();
std::optional<StateNumType> max_state_num;
try {
// Must continue until exhausting the change-log
for (;;) {
Expand All @@ -322,6 +336,11 @@ namespace db0
// this is because: a) file writes are NOT atomic, b) DP might be modified while we process the log
// NOTE: this might be optimized when modification timestamps are introduced
while (change_log_ptr) {
if (callback) {
callback(*change_log_ptr);
}

max_state_num = change_log_ptr->m_state_num;
for (auto address: *change_log_ptr) {
// buffer must include BlockIOStream's chunk header and data
auto &buffer = create_read_ahead_buffer(address, dram_io.getChunkSize() + o_block_io_chunk_header::sizeOf());
Expand All @@ -334,6 +353,8 @@ namespace db0
}
}

return max_state_num;

} catch (db0::IOException &) {
changelog_io.setStreamPos(stream_pos);
chunks_buf.clear();
Expand Down
Loading
Loading