Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/dbzero/core/serialization/Fixed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ DB0_PACKED_BEGIN
*/
template<typename buf_t> static T &__safe_ref(buf_t buf)
{
const std::byte *_buf = buf;
const std::byte *_buf = static_cast<std::byte*>(buf);
// validate bounds here
buf += true_size_of<T>();
return __ref(const_cast<std::byte*>(_buf));
}

template <typename buf_t> static const T &__safe_const_ref(buf_t buf)
{
const std::byte *_buf = buf;
const std::byte *_buf = static_cast<const std::byte*>(buf);
// validate bounds here
buf += true_size_of<T>();
return __const_ref(_buf);
Expand Down
20 changes: 16 additions & 4 deletions src/dbzero/core/serialization/Types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
namespace db0

{
DB0_PACKED_BEGIN

class Memspace;

Expand All @@ -18,6 +17,7 @@ DB0_PACKED_BEGIN
/**
* Overlaid simple type wrapper
*/
DB0_PACKED_BEGIN
template <class T> class DB0_PACKED_ATTR o_simple: public o_fixed<o_simple<T> >
{
public :
Expand Down Expand Up @@ -172,10 +172,12 @@ DB0_PACKED_BEGIN
std::uint32_t m_bytes = 0;
std::byte m_buf;
};
DB0_PACKED_END

/**
* Overlaid null type
* Overlaid null type (derived from o_base)
*/
DB0_PACKED_BEGIN
class DB0_PACKED_ATTR o_null: public o_base<o_null>
{
public :
Expand Down Expand Up @@ -207,14 +209,24 @@ DB0_PACKED_BEGIN
std::uint16_t getVersion() const;
static constexpr bool versionIsStored();
};

DB0_PACKED_END

DB0_PACKED_BEGIN
// NOTE: when used as a base class, the sizeof should be zero! (due to empty base optimization)
struct DB0_PACKED_ATTR o_fixed_null: public o_fixed<o_fixed_null>
{
static std::size_t sizeOf() {
return 0;
}
};
DB0_PACKED_END

/// some predefined simple overlaid types
using o_int = o_simple<int>;
using o_uint = o_simple<std::uint32_t>;
using o_float = o_simple<float>;
using o_double = o_simple<double>;

DB0_PACKED_END
}

namespace std
Expand Down
43 changes: 23 additions & 20 deletions src/dbzero/core/storage/BDevStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,18 @@ namespace db0
: BaseStorage(access_type)
, m_file(file_name, access_type, lock_flags)
, m_config(readConfig())
, m_dram_changelog_io(getChangeLogIOStream(m_config.m_dram_changelog_io_offset, access_type))
, m_dp_changelog_io(getChangeLogIOStream(m_config.m_dp_changelog_io_offset, access_type))
, m_meta_io(init(getMetaIOStream(m_config.m_meta_io_offset, meta_io_step_size.value_or(DEFAULT_META_IO_STEP_SIZE),
access_type)))
, m_dram_io(init(getDRAMIOStream(m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io))
, m_dram_changelog_io(getChangeLogIOStream<DRAM_ChangeLogStreamT>(
m_config.m_dram_changelog_io_offset, access_type)
)
, m_dp_changelog_io(getChangeLogIOStream<DP_ChangeLogStreamT>(
m_config.m_dp_changelog_io_offset, access_type)
)
, m_meta_io(init(getMetaIOStream(
m_config.m_meta_io_offset, meta_io_step_size.value_or(DEFAULT_META_IO_STEP_SIZE), access_type))
)
, m_dram_io(init(getDRAMIOStream(
m_config.m_dram_io_offset, m_config.m_dram_page_size, access_type), m_dram_changelog_io)
)
, m_sparse_pair(m_dram_io.getDRAMPair(), access_type)
, m_sparse_index(m_sparse_pair.getSparseIndex())
, m_diff_index(m_sparse_pair.getDiffIndex())
Expand All @@ -57,7 +64,7 @@ namespace db0
{
}

DRAM_IOStream BDevStorage::init(DRAM_IOStream &&dram_io, ChangeLogIOStream &dram_change_log)
DRAM_IOStream BDevStorage::init(DRAM_IOStream &&dram_io, DRAM_ChangeLogStreamT &dram_change_log)
{
dram_io.load(dram_change_log);
return std::move(dram_io);
Expand Down Expand Up @@ -118,15 +125,15 @@ namespace db0
// Create higher-order data structures
{
CFile file(file_name, AccessType::READ_WRITE);
ChangeLogIOStream *dram_changelog_io_ptr = nullptr;
DRAM_ChangeLogStreamT *dram_changelog_io_ptr = nullptr;
DRAM_IOStream *dram_io_ptr = nullptr;
auto tail_function = [&]() {
assert(dram_io_ptr && dram_changelog_io_ptr);
// take max from the underlying I/O streams
return std::max(offset, std::max(dram_io_ptr->tail(), dram_changelog_io_ptr->tail()));
};

auto dram_changelog_io = ChangeLogIOStream(file, config->m_dram_changelog_io_offset, config->m_block_size,
auto dram_changelog_io = DRAM_ChangeLogStreamT(file, config->m_dram_changelog_io_offset, config->m_block_size,
tail_function, AccessType::READ_WRITE);
dram_changelog_io_ptr = &dram_changelog_io;
auto dram_io = DRAM_IOStream(file, config->m_dram_io_offset, config->m_block_size, tail_function,
Expand Down Expand Up @@ -369,11 +376,7 @@ namespace db0
DRAM_IOStream BDevStorage::getDRAMIOStream(std::uint64_t first_block_pos, std::uint32_t dram_page_size, AccessType access_type) {
return { m_file, first_block_pos, m_config.m_block_size, getTailFunction(), access_type, dram_page_size };
}

ChangeLogIOStream BDevStorage::getChangeLogIOStream(std::uint64_t first_block_pos, AccessType access_type) {
return { m_file, first_block_pos, m_config.m_block_size, getTailFunction(), access_type };
}


std::uint64_t BDevStorage::tail() const
{
// take max from the 4 underlying I/O streams
Expand Down Expand Up @@ -577,17 +580,17 @@ namespace db0
}
#endif

void BDevStorage::fetchChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType state_num, const o_change_log &)> f) const
void BDevStorage::fetchDP_ChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType state_num, const DP_ChangeLogT &)> f) const
{
std::unique_lock<std::shared_mutex> lock(m_mutex);
if (m_dp_changelog_io.modified()) {
THROWF(db0::IOException) << "BDevStorage::fetchChangeLogs: dp-changelog is modified and needs to be flushed first";
}
auto &dp_changelog_io = const_cast<ChangeLogIOStream &>(m_dp_changelog_io);
ChangeLogIOStream::State dp_state;
auto &dp_changelog_io = const_cast<DP_ChangeLogStreamT &>(m_dp_changelog_io);
DP_ChangeLogStreamT::State dp_state;
dp_changelog_io.saveState(dp_state);

{
std::vector<char> buf;
// try locating the nearest meta-log entry to position the dp-changelog
Expand Down Expand Up @@ -626,7 +629,7 @@ namespace db0
dp_changelog_io.restoreState(dp_state);
}

void BDevStorage::beginCommit()
void BDevStorage::beginCommit()
{
#ifndef NDEBUG
m_commit_pending = true;
Expand Down
23 changes: 15 additions & 8 deletions src/dbzero/core/storage/BDevStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ DB0_PACKED_END
public:
static constexpr std::uint32_t DEFAULT_PAGE_SIZE = 4096;
static constexpr std::size_t DEFAULT_META_IO_STEP_SIZE = 16 << 20;
using DRAM_ChangeLogStreamT = ChangeLogIOStream<>;
using DP_ChangeLogStreamT = ChangeLogIOStream<DP_ChangeLogT>;

/**
* Opens BDevStorage over an existing file
Expand Down Expand Up @@ -115,8 +117,8 @@ DB0_PACKED_END
// @return total bytes written / diff bytes written
std::pair<std::size_t, std::size_t> getDiff_IOStats() const;

void fetchChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType, const o_change_log &)> f) const override;
void fetchDP_ChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType, const DP_ChangeLogT &)> f) const override;

#ifndef NDEBUG
void getDRAM_IOMap(std::unordered_map<std::uint64_t, DRAM_PageInfo> &) const override;
Expand All @@ -134,10 +136,11 @@ DB0_PACKED_END

// DRAM-changelog stream stores the sequence of updates to DRAM pages
// DRAM-changelog must be initialized before DRAM_IOStream
ChangeLogIOStream m_dram_changelog_io;
DRAM_ChangeLogStreamT m_dram_changelog_io;
// data-page change log, each chunk corresponds to a separate data transaction
// first element from each chunk represents the state number
ChangeLogIOStream m_dp_changelog_io;
// and the rest are the logical data page numbers mutated in that transaction
DP_ChangeLogStreamT m_dp_changelog_io;
// meta-stream keeps meta-data about the other streams
MetaIOStream m_meta_io;
// memory-mapped file I/O
Expand All @@ -159,7 +162,7 @@ DB0_PACKED_END
unsigned int *m_throw_op_count_ptr = nullptr;
#endif

static DRAM_IOStream init(DRAM_IOStream &&, ChangeLogIOStream &);
static DRAM_IOStream init(DRAM_IOStream &&, DRAM_ChangeLogStreamT &);

static MetaIOStream init(MetaIOStream &&);

Expand All @@ -172,8 +175,12 @@ DB0_PACKED_END
BlockIOStream getBlockIOStream(std::uint64_t first_block_pos, AccessType);

DRAM_IOStream getDRAMIOStream(std::uint64_t first_block_pos, std::uint32_t dram_page_size, AccessType);

ChangeLogIOStream getChangeLogIOStream(std::uint64_t first_block_pos, AccessType);

template<typename ChangeLogIOStreamT>
ChangeLogIOStreamT getChangeLogIOStream(std::uint64_t first_block_pos, AccessType access_type)
{
return { m_file, first_block_pos, m_config.m_block_size, getTailFunction(), access_type };
}

MetaIOStream getMetaIOStream(std::uint64_t first_block_pos, std::size_t step_size, AccessType);

Expand All @@ -198,5 +205,5 @@ DB0_PACKED_END
void _read(std::uint64_t address, StateNumType state_num, std::size_t size, void *buffer,
FlagSet<AccessOptions> = { AccessOptions::read, AccessOptions::write }, unsigned int *chain_len = nullptr) const;
};

}
6 changes: 3 additions & 3 deletions src/dbzero/core/storage/BaseStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ namespace db0
void BaseStorage::beginCommit() {
}

void BaseStorage::endCommit() {
void BaseStorage::endCommit() {
}

void BaseStorage::fetchChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType state_num, const o_change_log &)> f) const
void BaseStorage::fetchDP_ChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType state_num, const DP_ChangeLogT &)> f) const
{
THROWF(db0::InternalException) << "Operation not supported: fetchChangeLog";
}
Expand Down
20 changes: 12 additions & 8 deletions src/dbzero/core/storage/BaseStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <dbzero/core/utils/FlagSet.hpp>
#include <dbzero/core/memory/config.hpp>
#include <dbzero/core/memory/AccessOptions.hpp>
#include <dbzero/core/serialization/Types.hpp>
#include <functional>
#include <unordered_map>
#include <optional>
Expand All @@ -12,17 +13,20 @@ namespace db0
{

class ProcessTimer;
struct o_change_log;
template <typename BaseT> struct o_change_log;

/**
* Defines the file-oriented storage interface
*/
class BaseStorage
{
public:
public:
using DRAM_ChangeLogT = db0::o_change_log<db0::o_fixed_null>;
using DP_ChangeLogT = db0::o_change_log<db0::o_fixed_null>;

BaseStorage(AccessType);
virtual ~BaseStorage() = default;

/**
* Read data from a specific state into a user provided buffer
*
Expand Down Expand Up @@ -121,13 +125,13 @@ namespace db0
// @param end_state the first state number past the last state number to be included
// in the change log (or up to the last state number if not specified)
// @param f function to be called for each transaction's change log
virtual void fetchChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType state_num, const o_change_log &)> f) const;
virtual void fetchDP_ChangeLogs(StateNumType begin_state, std::optional<StateNumType> end_state,
std::function<void(StateNumType state_num, const DP_ChangeLogT &)> f) const;

#ifndef NDEBUG
// state number, file offset
using DRAM_PageInfo = std::pair<std::uint64_t, std::uint64_t>;

struct DRAM_CheckResult
{
std::uint64_t m_address;
Expand All @@ -140,8 +144,8 @@ namespace db0

// Activate throw from commit after specific number of operations (for testing purposes)
virtual void setCrashFromCommit(unsigned int *op_count_ref);
#endif

#endif
protected:
AccessType m_access_type;
};
Expand Down
44 changes: 22 additions & 22 deletions src/dbzero/core/storage/BlockIOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ DB0_PACKED_END
static constexpr std::size_t sizeOfHeaders(bool checksums_enabled) {
return sizeOfBlockHeader(checksums_enabled) + sizeOfChunkHeader();
}

AccessType getAccessType() const {
return m_access_type;
}
Expand All @@ -195,7 +195,7 @@ DB0_PACKED_END
bool m_eos;
};

// Temporarily save the stream's state, to be later restore with restoreState()
// Temporarily save the stream's state, to be later restored with restoreState()
// NOTE: no mutations between saveState() and restoreState() are allowed, or the behavior is undefined
void saveState(State &) const;

Expand All @@ -206,6 +206,26 @@ DB0_PACKED_END
return m_modified;
}

/**
* Overwrite existing chunk with arbitrary data.
* No validations are performed, needs to be used with caution.
* Since this operations affects checksum, it's not allowed for streams opened with maintain_checksums = true
* Cursor position is not affected by this operation.
* @param address absolute address of the chunk
* @param buffer data to be written
* @param size size of the buffer
*/
void writeToChunk(std::uint64_t address, const void *buffer, std::size_t size);

/**
* Read a chunk under a specific address without moving the within-stream cursor's position
* checksum validation is not performed
* @param address absolute address of the chunk
* @param buffer buffer to hold the chunk data
* @param chunk_size size of the chunk
*/
void readFromChunk(std::uint64_t address, void *buffer, std::size_t chunk_size) const;

protected:
CFile &m_file;
// the stream's starting address
Expand Down Expand Up @@ -264,26 +284,6 @@ DB0_PACKED_END
*/
bool readBlock(std::uint64_t address, void *buffer);

/**
* Overwrite existing chunk with arbitrary data.
* No validations are performed, needs to be used with caution.
* Since this operations affects checksum, it's not allowed for streams opened with maintain_checksums = true
* Cursor position is not affected by this operation.
* @param address absolute address of the chunk
* @param buffer data to be written
* @param size size of the buffer
*/
void writeToChunk(std::uint64_t address, const void *buffer, std::size_t size);

/**
* Read a chunk under a specific address without moving the within-stream cursor's position
* checksum validation is not performed
* @param address absolute address of the chunk
* @param buffer buffer to hold the chunk data
* @param chunk_size size of the chunk
*/
void readFromChunk(std::uint64_t address, void *buffer, std::size_t chunk_size) const;

std::uint64_t nextAddress() const;
};

Expand Down
Loading