Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
cur_segment = task->segment;
if (is_raw)
{
cur_stream = cur_segment->getInputStreamRaw(*dm_context, columns_to_read, task->read_snapshot, do_range_filter_for_raw);
cur_stream = cur_segment->getInputStreamRaw(*dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, do_range_filter_for_raw);
}
else
{
Expand Down
15 changes: 11 additions & 4 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ void ColumnFilePersistedSet::updateColumnFileStats()
size_t new_rows = 0;
size_t new_bytes = 0;
size_t new_deletes = 0;
for (auto & file_level : persisted_files_levels)
rows_and_deletes_offsets_per_level.clear();
rows_and_deletes_offsets_per_level.resize(persisted_files_levels.size());
for (int i = persisted_files_levels.size() - 1; i >= 0; i--)
{
rows_and_deletes_offsets_per_level[i] = std::make_pair(new_rows, new_deletes);
auto & file_level = persisted_files_levels[i];
new_persisted_files_count += file_level.size();
for (auto & file : file_level)
{
Expand Down Expand Up @@ -343,18 +347,19 @@ MinorCompactionPtr ColumnFilePersistedSet::pickUpMinorCompaction(DMContext & con

auto compaction = std::make_shared<MinorCompaction>(next_compaction_level, minor_compaction_version);
auto & level = persisted_files_levels[next_compaction_level];
auto cur_rows_offset = rows_and_deletes_offsets_per_level[next_compaction_level].first;
auto cur_deletes_offset = rows_and_deletes_offsets_per_level[next_compaction_level].second;
if (!level.empty())
{
bool is_all_trivial_move = true;
MinorCompaction::Task cur_task;
MinorCompaction::Task cur_task{cur_rows_offset, cur_deletes_offset};
for (auto & file : level)
{
auto pack_up_cur_task = [&]() {
bool is_trivial_move = compaction->packUpTask(std::move(cur_task));
is_all_trivial_move = is_all_trivial_move && is_trivial_move;
cur_task = {};
cur_task = MinorCompaction::Task{cur_rows_offset, cur_deletes_offset};
};

if (auto * t_file = file->tryToTinyFile(); t_file)
{
bool cur_task_full = cur_task.total_rows >= context.delta_small_column_file_rows;
Expand All @@ -377,6 +382,8 @@ MinorCompactionPtr ColumnFilePersistedSet::pickUpMinorCompaction(DMContext & con
pack_up_cur_task();
cur_task.addColumnFile(file);
}
cur_rows_offset += file->getRows();
cur_deletes_offset += file->getDeletes();
}
bool is_trivial_move = compaction->packUpTask(std::move(cur_task));
is_all_trivial_move = is_all_trivial_move && is_trivial_move;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
private:
PageId metadata_id;
ColumnFilePersistedLevels persisted_files_levels;
// Per-level starting offsets, indexed the same way as `persisted_files_levels`:
// `.first` is a row count and `.second` is a delete count.
// updateColumnFileStats() rebuilds this vector: it visits the levels from the
// highest index down to 0 and stores the values of its running
// `new_rows` / `new_deletes` counters at the moment each level is reached.
// pickUpMinorCompaction() reads the entry of the level being compacted to seed
// the offsets of the compaction tasks it creates.
using RowsAndDeletesOffsets = std::vector<std::pair<size_t, size_t>>;
RowsAndDeletesOffsets rows_and_deletes_offsets_per_level;

// TODO: check the proper memory_order when use this atomic variable
std::atomic<size_t> persisted_files_count = 0;
std::atomic<size_t> persisted_files_level_count = 0;
Expand Down
107 changes: 79 additions & 28 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ bool DeltaValueSpace::flush(DMContext & context)

ColumnFileFlushTaskPtr flush_task;
WriteBatches wbs(context.storage_pool, context.getWriteLimiter());
DeltaIndexPtr cur_delta_index;
{
/// Prepare data which will be written to disk.
std::scoped_lock lock(mutex);
Expand All @@ -159,7 +158,6 @@ bool DeltaValueSpace::flush(DMContext & context)
return false;
}
flush_task = mem_table_set->buildFlushTask(context, persisted_file_set->getRows(), persisted_file_set->getDeletes(), persisted_file_set->getCurrentFlushVersion());
cur_delta_index = delta_index;
}

// No update, return successfully.
Expand All @@ -174,36 +172,56 @@ bool DeltaValueSpace::flush(DMContext & context)
DeltaIndexPtr new_delta_index;
if (!delta_index_updates.empty())
{
// Get current delta index and mark the delta index is updating
DeltaIndexPtr cur_delta_index;
{
std::unique_lock lock(mutex);
while (is_delta_index_updating)
{
delta_index_update_cv.wait(lock);
}
is_delta_index_updating = true;
cur_delta_index = delta_index;
}
LOG_FMT_DEBUG(log, "{} Update index start", simpleInfo());
new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates);
LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo());
}

bool success = false;
{
/// If this instance is still valid, then commit.
std::scoped_lock lock(mutex);
if (abandoned.load(std::memory_order_relaxed))
{
// Delete written data.
wbs.setRollback();
LOG_FMT_DEBUG(log, "{} Flush stop because abandoned", simpleInfo());
return false;
}

if (!flush_task->commit(persisted_file_set, wbs))
else if (!flush_task->commit(persisted_file_set, wbs))
{
wbs.rollbackWrittenLogAndData();
LOG_FMT_DEBUG(log, "{} Stop flush because structure got updated", simpleInfo());
return false;
}

/// Update delta tree
if (new_delta_index)
delta_index = new_delta_index;

LOG_FMT_DEBUG(log, "{} Flush end. Flushed {} column files, {} rows and {} deletes.", info(), flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushDeletes());
else
{
success = true;
}
if (success)
{
/// Update delta tree
if (new_delta_index)
{
delta_index = new_delta_index;
}
LOG_FMT_DEBUG(log, "{} Flush end. Flushed {} column files, {} rows and {} deletes.", info(), flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushDeletes());
}
is_delta_index_updating = false;
}
return true;
delta_index_update_cv.notify_all();
if (!success)
{
// Delete written data.
wbs.rollbackWrittenLogAndData();
}
return success;
}

bool DeltaValueSpace::compact(DMContext & context)
Expand Down Expand Up @@ -242,31 +260,64 @@ bool DeltaValueSpace::compact(DMContext & context)
// do compaction task
WriteBatches wbs(context.storage_pool, context.getWriteLimiter());
const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap);
compaction_task->prepare(context, wbs, reader);

auto delta_index_updates = compaction_task->prepare(context, wbs, reader);
DeltaIndexPtr new_delta_index;
if (!delta_index_updates.empty())
{
// Get current delta index and mark the delta index is updating
DeltaIndexPtr cur_delta_index;
{
std::unique_lock lock(mutex);
while (is_delta_index_updating)
{
delta_index_update_cv.wait(lock);
}
is_delta_index_updating = true;
cur_delta_index = delta_index;
}
LOG_FMT_DEBUG(log, "{} Update index start", simpleInfo());
new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates);
LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo());
}
bool success = false;
{
std::scoped_lock lock(mutex);

/// Check before commit.
if (abandoned.load(std::memory_order_relaxed))
{
wbs.rollbackWrittenLogAndData();
LOG_FMT_DEBUG(log, "{} Stop compact because abandoned", simpleInfo());
return false;
}
if (!compaction_task->commit(persisted_file_set, wbs))
else if (!compaction_task->commit(persisted_file_set, wbs))
{
LOG_FMT_WARNING(log, "Structure has been updated during compact");
wbs.rollbackWrittenLogAndData();
LOG_FMT_DEBUG(log, "{} Compact stop because structure got updated", simpleInfo());
return false;
}

LOG_FMT_DEBUG(log, "{} {}", simpleInfo(), compaction_task->info());
else
{
success = true;
}
if (success)
{
/// Update delta tree
if (new_delta_index)
{
delta_index = new_delta_index;
}
LOG_FMT_DEBUG(log, "{} {}", simpleInfo(), compaction_task->info());
}
is_delta_index_updating = false;
}
wbs.writeRemoves();

return true;
delta_index_update_cv.notify_all();
if (success)
{
wbs.writeRemoves();
}
else
{
wbs.rollbackWrittenLogAndData();
}
return success;
}
} // namespace DM
} // namespace DB
22 changes: 14 additions & 8 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class DeltaValueSpace
// Protects the operations in this instance.
mutable std::mutex mutex;

// Set to true by flush()/compact() while they build a replacement delta index
// from a copy of `delta_index`; the clone itself runs after the lock is released.
// Read and written with `mutex` held. A caller that finds the flag set waits on
// `delta_index_update_cv` until it is cleared.
// NOTE(review): flush()/compact() appear to clear this flag at commit time even
// when they did not set it (i.e. when there were no index updates) — confirm
// this cannot end another caller's in-progress update early.
bool is_delta_index_updating = false;
// Signalled with notify_all() by flush()/compact() after they clear the flag above.
std::condition_variable delta_index_update_cv;

Poco::Logger * log;

public:
Expand Down Expand Up @@ -376,8 +379,8 @@ class DeltaValueReader
class DeltaValueInputStream : public IBlockInputStream
{
private:
ColumnFileSetInputStream mem_table_input_stream;
ColumnFileSetInputStream persisted_files_input_stream;
BlockInputStreamPtr mem_table_input_stream;
BlockInputStreamPtr persisted_files_input_stream;

bool persisted_files_done = false;

Expand All @@ -386,25 +389,28 @@ class DeltaValueInputStream : public IBlockInputStream
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)
, persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)
: mem_table_input_stream{std::make_shared<ColumnFileSetInputStream>(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)}
, persisted_files_input_stream{std::make_shared<ColumnFileSetInputStream>(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)}
{}

String getName() const override { return "DeltaValue"; }
Block getHeader() const override { return persisted_files_input_stream.getHeader(); }
Block getHeader() const override { return persisted_files_input_stream->getHeader(); }

// Accessors that hand out the two underlying streams individually, so a caller
// can read the persisted-files part and the mem-table part separately instead
// of going through read(), which drains persisted files first and then the
// mem table. The returned pointers refer to the same stream objects this class
// reads from, so reading through them advances the streams used by read().
BlockInputStreamPtr getPersistedFilesInputStream() { return persisted_files_input_stream; }
BlockInputStreamPtr getMemTableInputStream() { return mem_table_input_stream; }

Block read() override
{
if (persisted_files_done)
return mem_table_input_stream.read();
return mem_table_input_stream->read();

Block block = persisted_files_input_stream.read();
Block block = persisted_files_input_stream->read();
if (block)
return block;
else
{
persisted_files_done = true;
return mem_table_input_stream.read();
return mem_table_input_stream->read();
}
}
};
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ MinorCompaction::MinorCompaction(size_t compaction_src_level_, size_t current_co
, current_compaction_version{current_compaction_version_}
{}

void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader)
DeltaIndex::Updates MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader)
{
DeltaIndex::Updates delta_index_updates;
for (auto & task : tasks)
{
if (task.is_trivial_move)
Expand All @@ -55,6 +56,11 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag
wbs.removed_log.delPage(t_file->getDataPageId());
}
Block compact_block = schema.cloneWithColumns(std::move(compact_columns));
IColumn::Permutation perm;
if (sortBlockByPk(getExtraHandleColumnDefine(context.is_common_handle), compact_block, perm))
{
delta_index_updates.emplace_back(task.deletes_offset, task.rows_offset, perm);
}
auto compact_rows = compact_block.rows();
auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs, task.to_compact.front()->tryToTinyFile()->getSchema());
wbs.writeLogAndData();
Expand All @@ -64,6 +70,7 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag
total_compact_rows += compact_rows;
result_compact_files += 1;
}
return delta_index_updates;
}

bool MinorCompaction::commit(ColumnFilePersistedSetPtr & persisted_file_set, WriteBatches & wbs)
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>
public:
struct Task
{
Task() = default;
Task(size_t rows_offset_, size_t deletes_offset_)
: rows_offset(rows_offset_), deletes_offset(deletes_offset_) {}


// Starting offsets of this task, supplied by whoever constructs it.
// ColumnFilePersistedSet::pickUpMinorCompaction() passes the base offsets of
// the level being compacted plus the rows/deletes of the files of that level
// it has already walked past. MinorCompaction::prepare() forwards both values
// into the delta index update it records when the compacted block had to be
// re-sorted by primary key.
size_t rows_offset = 0;
size_t deletes_offset = 0;

ColumnFilePersisteds to_compact;
size_t total_rows = 0;
Expand Down Expand Up @@ -73,7 +78,7 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>
bool is_trivial_move = false;
if (task.to_compact.size() == 1)
{
// Maybe this column file is small, but it cannot be merged with other column files, so also remove it's cache if possible.
// Maybe this column file is small, but it cannot be merged with other column files, so also remove its cache if possible.
for (auto & f : task.to_compact)
{
if (auto * t_file = f->tryToTinyFile(); t_file)
Expand All @@ -94,7 +99,7 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>
size_t getCompactionVersion() const { return current_compaction_version; }

/// Create new column file by combining several small `ColumnFileTiny`s
void prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader);
DeltaIndex::Updates prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader);

/// Add new column files and remove old column files in `ColumnFilePersistedSet`
bool commit(ColumnFilePersistedSetPtr & persisted_file_set, WriteBatches & wbs);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
UInt64 max_version,
const RSOperatorPtr & filter,
const String & tracing_id,
bool is_raw_read,
size_t expected_block_size,
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
Expand Down Expand Up @@ -1217,7 +1218,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
false,
is_raw_read,
db_settings.dt_raw_filter_range,
extra_table_id_index,
physical_table_id,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ class DeltaMergeStore : private boost::noncopyable
UInt64 max_version,
const RSOperatorPtr & filter,
const String & tracing_id,
bool is_raw_read = true,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);
Expand Down
Loading