Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,14 @@ namespace db0
m_sparse_pair.extractChangeLog(m_dp_changelog_io);
m_dram_io.flushUpdates(state_num, m_dram_changelog_io);
m_dp_changelog_io.flush();
// flush changelog AFTER all updates from dram_io have been flushed
m_dram_changelog_io.flush();
m_page_io.flush();
// NOTE: fsync has stronger guarantees than flush in a multi-process environments
m_file.fsync();
// flush changelog AFTER all updates from all other streams have been flushed
m_dram_changelog_io.flush();
// the last fsync finalizes the commit
m_file.fsync();

// commit to collect future updates correctly
m_sparse_pair.commit();
return true;
Expand Down Expand Up @@ -458,7 +461,7 @@ namespace db0
{
assert(m_access_type == AccessType::READ_ONLY);
std::uint64_t result = 0;
// continue refreshing until all updates retrieved to guarantee a consistent state
// continue refreshing until all updates are retrieved to guarantee a consistent state
do {
// safe stream positions for rollback on file read failure
auto dram_changelog_io_pos = m_dram_changelog_io.getStreamPos();
Expand Down
2 changes: 1 addition & 1 deletion src/dbzero/core/storage/BlockIOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ namespace db0
bool BlockIOStream::eos() const {
return m_eos;
}

bool BlockIOStream::readBlock(std::uint64_t address, void *buffer)
{
if (m_checksums_enabled) {
Expand Down
14 changes: 9 additions & 5 deletions src/dbzero/core/storage/BlockIOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
namespace db0

{
DB0_PACKED_BEGIN

/**
* Calculate a buffer's checksum (must be aligned to 8 bytes)
*/
std::uint64_t checksum(const void *begin, const void *end);

// block level header
DB0_PACKED_BEGIN
struct DB0_PACKED_ATTR o_block_io_block_header: public o_fixed<o_block_io_block_header>
{
std::uint64_t m_next_block_address = 0;
Expand All @@ -34,8 +34,10 @@ DB0_PACKED_BEGIN
m_next_block_address = address;
}
};

DB0_PACKED_END

// block level header with a checksum
DB0_PACKED_BEGIN
struct DB0_PACKED_ATTR o_block_io_cs_block_header:
public o_fixed_ext<o_block_io_cs_block_header, o_block_io_block_header>
{
Expand All @@ -47,9 +49,11 @@ DB0_PACKED_BEGIN
// calculate checksum of this object excluding the checksum field
return checksum((const char*)this, (const char*)this + sizeOf() - sizeof(m_block_checksum));
}
};

};
DB0_PACKED_END

// chunk level header
DB0_PACKED_BEGIN
struct DB0_PACKED_ATTR o_block_io_chunk_header: public o_fixed<o_block_io_chunk_header>
{
std::uint32_t m_chunk_size = 0;
Expand All @@ -64,6 +68,7 @@ DB0_PACKED_BEGIN
return m_chunk_size != 0;
}
};
DB0_PACKED_END

/**
* Stream of blocks embeddable into the .db0 file
Expand Down Expand Up @@ -282,5 +287,4 @@ DB0_PACKED_BEGIN
std::uint64_t nextAddress() const;
};

DB0_PACKED_END
}
4 changes: 2 additions & 2 deletions src/dbzero/core/storage/ChangeLogIOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ namespace db0

ChangeLogIOStream::ChangeLogIOStream(CFile &m_file, std::uint64_t begin, std::uint32_t block_size,
std::function<std::uint64_t()> tail_function, AccessType access_type)
// enable checksums
// enable checksums by default
: BlockIOStream(m_file, begin, block_size, tail_function, access_type, true)
{
}

ChangeLogIOStream::ChangeLogIOStream(BlockIOStream &&io_stream)
: BlockIOStream(std::move(io_stream))
{
Expand Down
2 changes: 1 addition & 1 deletion src/dbzero/workspace/Fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ namespace db0
if (timer_ptr) {
timer = std::make_unique<ProcessTimer>("Fixture::close", timer_ptr);
}

// clear cache to destroy object instances supported by the cache
// this has to be done before commit (to not commit unrefereced objects)
m_lang_cache.clear(true, as_defunct);
Expand Down