From bdf36369b486521daca71d332732a93646bf20a0 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 13 Jun 2022 23:31:41 +0800 Subject: [PATCH 1/9] alpha version --- .../DeltaMerge/DMSegmentThreadInputStream.h | 2 +- .../Delta/ColumnFilePersistedSet.cpp | 13 ++++++-- .../DeltaMerge/Delta/ColumnFilePersistedSet.h | 3 ++ .../DeltaMerge/Delta/DeltaValueSpace.cpp | 15 ++++++++- .../DeltaMerge/Delta/DeltaValueSpace.h | 19 ++++++----- .../DeltaMerge/Delta/MinorCompaction.cpp | 9 +++++- .../DeltaMerge/Delta/MinorCompaction.h | 11 +++++-- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 3 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 1 + dbms/src/Storages/DeltaMerge/Segment.cpp | 32 +++++++++++-------- dbms/src/Storages/DeltaMerge/Segment.h | 2 ++ dbms/src/Storages/StorageDeltaMerge.cpp | 1 + 12 files changed, 79 insertions(+), 32 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index d47c3b105cc..325b069b32b 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -104,7 +104,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream cur_segment = task->segment; if (is_raw) { - cur_stream = cur_segment->getInputStreamRaw(*dm_context, columns_to_read, task->read_snapshot, do_range_filter_for_raw); + cur_stream = cur_segment->getInputStreamRaw(*dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, do_range_filter_for_raw); } else { diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index 33ef262d557..0886ade9b52 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -56,8 +56,11 @@ void ColumnFilePersistedSet::updateColumnFileStats() size_t new_rows = 0; size_t new_bytes = 0; size_t new_deletes = 0; - for (auto & file_level : persisted_files_levels) + rows_and_deletes_offsets_per_level.clear(); + for (size_t i = persisted_files_levels.size() - 1; i >= 0; i++) { + rows_and_deletes_offsets_per_level[i] = std::make_pair(new_rows, new_deletes); + auto & file_level = persisted_files_levels[i]; new_persisted_files_count += file_level.size(); for (auto & file : file_level) { @@ -343,17 +346,21 @@ MinorCompactionPtr ColumnFilePersistedSet::pickUpMinorCompaction(DMContext & con auto compaction = std::make_shared(next_compaction_level, minor_compaction_version); auto & level = persisted_files_levels[next_compaction_level]; + auto cur_rows_offset = rows_and_deletes_offsets_per_level[next_compaction_level].first; + auto cur_deletes_offset = rows_and_deletes_offsets_per_level[next_compaction_level].second; if (!level.empty()) { bool is_all_trivial_move = true; - MinorCompaction::Task cur_task; + MinorCompaction::Task cur_task{cur_rows_offset, cur_deletes_offset}; for (auto & file : level) { auto pack_up_cur_task = [&]() { bool is_trivial_move = compaction->packUpTask(std::move(cur_task)); is_all_trivial_move = is_all_trivial_move && is_trivial_move; - cur_task = {}; + cur_task = MinorCompaction::Task{cur_rows_offset, cur_deletes_offset}; }; + cur_rows_offset += file->getRows(); + cur_deletes_offset += file->getDeletes(); if (auto * t_file = file->tryToTinyFile(); t_file) { diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index 1580ae747da..4d39d6a4f84 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -56,6 +56,9 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this>; + RowsAndDeletesOffsets rows_and_deletes_offsets_per_level; + // TODO: check the proper memory_order when use this atomic variable std::atomic persisted_files_count = 0; std::atomic persisted_files_level_count = 0; diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 132732d6989..c82e279b7fe 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -223,6 +223,7 @@ bool DeltaValueSpace::compact(DMContext & context) MinorCompactionPtr compaction_task; PageStorage::SnapshotPtr log_storage_snap; + DeltaIndexPtr cur_delta_index; { std::scoped_lock lock(mutex); if (abandoned.load(std::memory_order_relaxed)) @@ -237,12 +238,20 @@ bool DeltaValueSpace::compact(DMContext & context) return true; } log_storage_snap = context.storage_pool.logReader()->getSnapshot(/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo())); + cur_delta_index = delta_index; } // do compaction task WriteBatches wbs(context.storage_pool, context.getWriteLimiter()); const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap); - compaction_task->prepare(context, wbs, reader); + auto delta_index_updates = compaction_task->prepare(context, wbs, reader); + DeltaIndexPtr new_delta_index; + if (!delta_index_updates.empty()) + { + LOG_FMT_DEBUG(log, "{} Update index start", simpleInfo()); + new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates); + LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo()); + } { std::scoped_lock lock(mutex); @@ -262,6 +271,10 @@ bool DeltaValueSpace::compact(DMContext & context) return false; } + /// Update delta tree + if (new_delta_index) + delta_index = new_delta_index; + LOG_FMT_DEBUG(log, "{} {}", simpleInfo(), compaction_task->info()); } wbs.writeRemoves(); diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 8f14682caa8..807a85f043a 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -376,8 +376,8 @@ class DeltaValueReader class DeltaValueInputStream : public IBlockInputStream { private: - ColumnFileSetInputStream mem_table_input_stream; - ColumnFileSetInputStream persisted_files_input_stream; + BlockInputStreamPtr mem_table_input_stream; + BlockInputStreamPtr persisted_files_input_stream; bool persisted_files_done = false; @@ -386,25 +386,28 @@ class DeltaValueInputStream : public IBlockInputStream const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, const RowKeyRange & segment_range_) - : mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_) - , persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_) + : mem_table_input_stream{std::make_shared(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)} + , persisted_files_input_stream{std::make_shared(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)} {} String getName() const override { return "DeltaValue"; } - Block getHeader() const override { return persisted_files_input_stream.getHeader(); } + Block getHeader() const override { return persisted_files_input_stream->getHeader(); } + + BlockInputStreamPtr getPersistedFilesInputStream() { return persisted_files_input_stream; } + BlockInputStreamPtr getMemTableInputStream() { return mem_table_input_stream; } Block read() override { if (persisted_files_done) - return mem_table_input_stream.read(); + return mem_table_input_stream->read(); - Block block = persisted_files_input_stream.read(); + Block block = persisted_files_input_stream->read(); if (block) return block; else { persisted_files_done = true; - return mem_table_input_stream.read(); + return mem_table_input_stream->read(); } } }; diff --git a/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp b/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp index 53e58c3bc23..61d579eae6d 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp @@ -29,8 +29,9 @@ MinorCompaction::MinorCompaction(size_t compaction_src_level_, size_t current_co , current_compaction_version{current_compaction_version_} {} -void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader) +DeltaIndex::Updates MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader) { + DeltaIndex::Updates delta_index_updates; for (auto & task : tasks) { if (task.is_trivial_move) @@ -55,6 +56,11 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag wbs.removed_log.delPage(t_file->getDataPageId()); } Block compact_block = schema.cloneWithColumns(std::move(compact_columns)); + IColumn::Permutation perm; + if (sortBlockByPk(getExtraHandleColumnDefine(context.is_common_handle), compact_block, perm)) + { + delta_index_updates.emplace_back(task.deletes_offset, task.rows_offset, perm); + } auto compact_rows = compact_block.rows(); auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs, task.to_compact.front()->tryToTinyFile()->getSchema()); wbs.writeLogAndData(); @@ -64,6 +70,7 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag total_compact_rows += compact_rows; result_compact_files += 1; } + return delta_index_updates; } bool MinorCompaction::commit(ColumnFilePersistedSetPtr & persisted_file_set, WriteBatches & wbs) diff --git a/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h b/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h index 1c727fe4357..4f4e902af07 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h @@ -33,7 +33,12 @@ class MinorCompaction : public std::enable_shared_from_this public: struct Task { - Task() = default; + Task(size_t rows_offset_, size_t deletes_offset_) + : rows_offset(rows_offset_), deletes_offset(deletes_offset_) {} + + + size_t rows_offset = 0; + size_t deletes_offset = 0; ColumnFilePersisteds to_compact; size_t total_rows = 0; @@ -73,7 +78,7 @@ class MinorCompaction : public std::enable_shared_from_this bool is_trivial_move = false; if (task.to_compact.size() == 1) { - // Maybe this column file is small, but it cannot be merged with other column files, so also remove it's cache if possible. + // Maybe this column file is small, but it cannot be merged with other column files, so also remove its cache if possible. for (auto & f : task.to_compact) { if (auto * t_file = f->tryToTinyFile(); t_file) @@ -94,7 +99,7 @@ class MinorCompaction : public std::enable_shared_from_this size_t getCompactionVersion() const { return current_compaction_version; } /// Create new column file by combining several small `ColumnFileTiny`s - void prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader); + DeltaIndex::Updates prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader); /// Add new column files and remove old column files in `ColumnFilePersistedSet` bool commit(ColumnFilePersistedSetPtr & persisted_file_set, WriteBatches & wbs); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index a74404f3dbb..423b3b6fc37 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1182,6 +1182,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, UInt64 max_version, const RSOperatorPtr & filter, const String & tracing_id, + bool is_raw, size_t expected_block_size, const SegmentIdSet & read_segments, size_t extra_table_id_index) @@ -1217,7 +1218,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - false, + is_raw, db_settings.dt_raw_filter_range, extra_table_id_index, physical_table_id, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 705481ca107..e3cfc20ad6f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -363,6 +363,7 @@ class DeltaMergeStore : private boost::noncopyable UInt64 max_version, const RSOperatorPtr & filter, const String & tracing_id, + bool is_raw = true, size_t expected_block_size = DEFAULT_BLOCK_SIZE, const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 8398fdcee40..ac1ac66a109 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -500,6 +500,8 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + const RSOperatorPtr & filter, bool do_range_filter, size_t expected_block_size) { @@ -520,28 +522,28 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, } } + DeltaValueInputStream delta_stream(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); + auto memtable_stream = delta_stream.getMemTableInputStream(); + auto persisted_files_stream = delta_stream.getPersistedFilesInputStream(); - BlockInputStreamPtr delta_stream = std::make_shared(dm_context, // - segment_snap->delta, - new_columns_to_read, - this->rowkey_range); - - RowKeyRanges rowkey_ranges{rowkey_range}; BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( dm_context, *new_columns_to_read, - rowkey_ranges, - EMPTY_FILTER, + data_ranges, + filter, std::numeric_limits::max(), expected_block_size, false); if (do_range_filter) { - delta_stream = std::make_shared>(delta_stream, rowkey_ranges, 0); - delta_stream = std::make_shared(delta_stream, columns_to_read); + memtable_stream = std::make_shared>(memtable_stream, data_ranges, 0); + memtable_stream = std::make_shared(memtable_stream, columns_to_read); + + persisted_files_stream = std::make_shared>(persisted_files_stream, data_ranges, 0); + persisted_files_stream = std::make_shared(persisted_files_stream, columns_to_read); - stable_stream = std::make_shared>(stable_stream, rowkey_ranges, 0); + stable_stream = std::make_shared>(stable_stream, data_ranges, 0); stable_stream = std::make_shared(stable_stream, columns_to_read); } @@ -549,7 +551,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, if (dm_context.read_delta_only) { - streams.push_back(delta_stream); + streams.push_back(memtable_stream); + streams.push_back(persisted_files_stream); } else if (dm_context.read_stable_only) { @@ -557,7 +560,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, } else { - streams.push_back(delta_stream); + streams.push_back(memtable_stream); + streams.push_back(persisted_files_stream); streams.push_back(stable_stream); } return std::make_shared(streams, /*req_id=*/""); @@ -568,7 +572,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, con auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw); if (!segment_snap) return {}; - return getInputStreamRaw(dm_context, columns_to_read, segment_snap, true); + return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}, EMPTY_FILTER, true); } SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index cccfc5091b9..27410924d1d 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -182,6 +182,8 @@ class Segment : private boost::noncopyable const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + const RSOperatorPtr & filter, bool do_range_filter, size_t expected_block_size = DEFAULT_BLOCK_SIZE); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 67d32c73a05..8cc3b6a994c 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -756,6 +756,7 @@ BlockInputStreams StorageDeltaMerge::read( /*max_version=*/mvcc_query_info.read_tso, rs_operator, query_info.req_id, + true, max_block_size, parseSegmentSet(select_query.segment_expression_list), extra_table_id_index); From b387f4aaccb531110af441806b0364843a0c8fc4 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 14 Jun 2022 15:24:03 +0800 Subject: [PATCH 2/9] fix segmentation fault --- dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index 0886ade9b52..b16f4cadc44 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -57,7 +57,8 @@ void ColumnFilePersistedSet::updateColumnFileStats() size_t new_bytes = 0; size_t new_deletes = 0; rows_and_deletes_offsets_per_level.clear(); - for (size_t i = persisted_files_levels.size() - 1; i >= 0; i++) + rows_and_deletes_offsets_per_level.resize(persisted_files_levels.size()); + for (int i = persisted_files_levels.size() - 1; i >= 0; i--) { rows_and_deletes_offsets_per_level[i] = std::make_pair(new_rows, new_deletes); auto & file_level = persisted_files_levels[i]; From bc137fd9366339efac1956a75e70cc6cc5328adc Mon Sep 17 00:00:00 2001 From: Wish Date: Mon, 13 Jun 2022 23:32:44 +0800 Subject: [PATCH 3/9] Handle fast mode DDL --- dbms/src/Storages/Transaction/TiDB.cpp | 38 +++++++++++++++++++++ dbms/src/Storages/Transaction/TiDB.h | 11 ++++++ dbms/src/TiDB/Schema/SchemaBuilder.cpp | 46 ++++++++++++++++++++++++++ dbms/src/TiDB/Schema/SchemaBuilder.h | 3 ++ dbms/src/TiDB/Schema/SchemaGetter.h | 4 ++- 5 files changed, 101 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index 15bf2a3fb58..55a9a552af9 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -772,6 +772,36 @@ catch (const Poco::Exception & e) DB::Exception(e)); } +std::string_view modeToString(TiFlashMode mode) +{ + switch (mode) + { + case TiFlashMode::Normal: + return ""; + case TiFlashMode::Fast: + return "fast"; + default: + return ""; + } +} + +TiFlashMode parseMode(std::string_view mode_str) +{ + if (mode_str == "") + { + return TiFlashMode::Normal; + } + else if (mode_str == "fast") + { + return TiFlashMode::Fast; + } + else + { + return TiFlashMode::Normal; + } +} + + /////////////////////// ////// TableInfo ////// /////////////////////// @@ -840,6 +870,8 @@ try json->set("tiflash_replica", replica_info.getJSONObject()); + json->set("tiflash_mode", std::string{modeToString(mode)}); + json->stringify(buf); return buf.str(); @@ -926,6 +958,11 @@ try replica_info.deserialize(replica_obj); } } + if (obj->has("tiflash_mode")) + { + auto mode_str = obj->getValue("tiflash_mode"); + mode = parseMode(mode_str); + } if (is_common_handle && index_infos.size() != 1) { throw DB::Exception( @@ -1128,4 +1165,5 @@ ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type) return ret; } + } // namespace TiDB diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index f67bfb332c7..5d2e6259281 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -323,6 +323,12 @@ struct IndexInfo bool is_global; }; +enum class TiFlashMode +{ + Normal, + Fast, +}; + struct TableInfo { TableInfo() = default; @@ -372,6 +378,8 @@ struct TableInfo // The TiFlash replica info persisted by TiDB TiFlashReplicaInfo replica_info; + TiFlashMode mode; + ::TiDB::StorageEngine engine_type = ::TiDB::StorageEngine::UNSPECIFIED; ColumnID getColumnID(const String & name) const; @@ -398,4 +406,7 @@ String genJsonNull(); tipb::FieldType columnInfoToFieldType(const ColumnInfo & ci); ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type); +std::string_view modeToString(TiFlashMode mode); +TiFlashMode parseMode(std::string_view mode_str); + } // namespace TiDB diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.cpp b/dbms/src/TiDB/Schema/SchemaBuilder.cpp index 99e540e6c95..c9add2c5e4b 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.cpp +++ b/dbms/src/TiDB/Schema/SchemaBuilder.cpp @@ -534,6 +534,11 @@ void SchemaBuilder::applyDiff(const SchemaDiff & diff) applySetTiFlashReplica(db_info, diff.table_id); break; } + case SchemaActionType::SetTiFlashMode: + { + applySetTiFlashMode(db_info, diff.table_id); + break; + } default: { if (diff.type < SchemaActionType::MaxRecognizedType) @@ -1231,6 +1236,46 @@ void SchemaBuilder::applySetTiFlashReplica( LOG_FMT_INFO(log, "Updated replica info for {}", name_mapper.debugCanonicalName(*db_info, table_info)); } +template +void SchemaBuilder::applySetTiFlashMode(TiDB::DBInfoPtr db_info, TableID table_id) +{ + auto latest_table_info = getter.getTableInfo(db_info->id, table_id); + if (unlikely(latest_table_info == nullptr)) + { + throw TiFlashException(fmt::format("miss table in TiKV : {}", table_id), Errors::DDL::StaleSchema); + } + auto & tmt_context = context.getTMTContext(); + auto storage = tmt_context.getStorages().get(latest_table_info->id); + if (unlikely(storage == nullptr)) + { + throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*db_info, *latest_table_info)), + Errors::DDL::MissingTable); + } + + auto managed_storage = std::dynamic_pointer_cast(storage); + if (unlikely(!managed_storage)) + throw Exception(fmt::format("{} is not a ManageableStorage", name_mapper.debugCanonicalName(*db_info, *latest_table_info))); + + applySetTiFlashMode(db_info, latest_table_info, managed_storage); +} + +template +void SchemaBuilder::applySetTiFlashMode( + TiDB::DBInfoPtr db_info, + TiDB::TableInfoPtr latest_table_info, + ManageableStoragePtr storage) +{ + if (storage->getTableInfo().mode == latest_table_info->mode) + return; + + TiDB::TableInfo table_info = storage->getTableInfo(); + table_info.mode = latest_table_info->mode; + + LOG_FMT_INFO(log, "Table mode for {} updated to {}", name_mapper.debugCanonicalName(*db_info, table_info), modeToString(table_info.mode)); + storage->setTableInfo(table_info); + // storage->alterMode(....); +} + template void SchemaBuilder::syncAllSchema() { @@ -1300,6 +1345,7 @@ void SchemaBuilder::syncAllSchema() applyRenameLogicalTable(db, table, storage); /// Update replica info if needed. applySetTiFlashReplica(db, table, storage); + applySetTiFlashMode(db, table, storage); /// Alter if needed. applyAlterLogicalTable(db, table, storage); LOG_FMT_DEBUG(log, "Table {} synced during sync all schemas", name_mapper.debugCanonicalName(*db, *table)); diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.h b/dbms/src/TiDB/Schema/SchemaBuilder.h index 8446765f74a..eefe15d360e 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.h +++ b/dbms/src/TiDB/Schema/SchemaBuilder.h @@ -88,6 +88,9 @@ struct SchemaBuilder void applySetTiFlashReplica(TiDB::DBInfoPtr db_info, TableID table_id); void applySetTiFlashReplica(TiDB::DBInfoPtr db_info, TiDB::TableInfoPtr table_info, ManageableStoragePtr storage); + + void applySetTiFlashMode(TiDB::DBInfoPtr db_info, TableID table_id); + void applySetTiFlashMode(TiDB::DBInfoPtr db_info, TiDB::TableInfoPtr table_info, ManageableStoragePtr storage); }; } // namespace DB diff --git a/dbms/src/TiDB/Schema/SchemaGetter.h b/dbms/src/TiDB/Schema/SchemaGetter.h index cfa5e1c6335..937f56eff0e 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.h +++ b/dbms/src/TiDB/Schema/SchemaGetter.h @@ -92,10 +92,12 @@ enum class SchemaActionType : Int8 AlterNoCacheTable = 59, CreateTables = 60, + SetTiFlashMode = 100, + // If we supporte new type from TiDB. // MaxRecognizedType also needs to be changed. // It should always be equal to the maximum supported type + 1 - MaxRecognizedType = 61, + MaxRecognizedType = 101, }; struct AffectedOption From 5ba01f73140964fea5a8b2731fbb4c3b04af69b0 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 14 Jun 2022 16:12:01 +0800 Subject: [PATCH 4/9] choose read mode according to storage mode --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 4 ++-- dbms/src/Storages/DeltaMerge/DeltaMergeStore.h | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 423b3b6fc37..e60e23f84ae 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1182,7 +1182,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, UInt64 max_version, const RSOperatorPtr & filter, const String & tracing_id, - bool is_raw, + bool is_raw_read, size_t expected_block_size, const SegmentIdSet & read_segments, size_t extra_table_id_index) @@ -1218,7 +1218,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - is_raw, + is_raw_read, db_settings.dt_raw_filter_range, extra_table_id_index, physical_table_id, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index e3cfc20ad6f..6c510655cd8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -363,7 +363,7 @@ class DeltaMergeStore : private boost::noncopyable UInt64 max_version, const RSOperatorPtr & filter, const String & tracing_id, - bool is_raw = true, + bool is_raw_read = true, size_t expected_block_size = DEFAULT_BLOCK_SIZE, const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 8cc3b6a994c..65067c23b60 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -756,7 +756,7 @@ BlockInputStreams StorageDeltaMerge::read( /*max_version=*/mvcc_query_info.read_tso, rs_operator, query_info.req_id, - true, + /* is_raw_read */tidb_table_info.mode == TiDB::TiFlashMode::Fast, max_block_size, parseSegmentSet(select_query.segment_expression_list), extra_table_id_index); From 1869afe153c656eed64f7eb43a9855dad2f0f0b2 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jun 2022 11:40:55 +0800 Subject: [PATCH 5/9] only preserve one version at major compaction --- .../DeltaMerge/Delta/DeltaValueSpace.cpp | 13 ++----- dbms/src/Storages/DeltaMerge/Segment.cpp | 34 +++++++------------ 2 files changed, 14 insertions(+), 33 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index c82e279b7fe..6cadd7e3c88 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -223,7 +223,6 @@ bool DeltaValueSpace::compact(DMContext & context) MinorCompactionPtr compaction_task; PageStorage::SnapshotPtr log_storage_snap; - DeltaIndexPtr cur_delta_index; { std::scoped_lock lock(mutex); if (abandoned.load(std::memory_order_relaxed)) @@ -238,20 +237,12 @@ bool DeltaValueSpace::compact(DMContext & context) return true; } log_storage_snap = context.storage_pool.logReader()->getSnapshot(/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo())); - cur_delta_index = delta_index; } // do compaction task WriteBatches wbs(context.storage_pool, context.getWriteLimiter()); const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap); auto delta_index_updates = compaction_task->prepare(context, wbs, reader); - DeltaIndexPtr new_delta_index; - if (!delta_index_updates.empty()) - { - LOG_FMT_DEBUG(log, "{} Update index start", simpleInfo()); - new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates); - LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo()); - } { std::scoped_lock lock(mutex); @@ -272,8 +263,8 @@ bool DeltaValueSpace::compact(DMContext & context) } /// Update delta tree - if (new_delta_index) - delta_index = new_delta_index; + if (!delta_index_updates.empty()) + delta_index = std::make_shared(); LOG_FMT_DEBUG(log, "{} {}", simpleInfo(), compaction_task->info()); } diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index ac1ac66a109..aa5c62845b5 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -491,7 +491,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co data_stream = std::make_shared>( data_stream, *read_info.read_columns, - dm_context.min_version, + UINT64_MAX, is_common_handle); return data_stream; @@ -506,20 +506,13 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, size_t expected_block_size) { auto new_columns_to_read = std::make_shared(); + (void)do_range_filter; + new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle)); - if (!do_range_filter) - { - (*new_columns_to_read) = columns_to_read; - } - else + for (const auto & c : columns_to_read) { - new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle)); - - for (const auto & c : columns_to_read) - { - if (c.id != EXTRA_HANDLE_COLUMN_ID) - new_columns_to_read->push_back(c); - } + if (c.id != EXTRA_HANDLE_COLUMN_ID) + new_columns_to_read->push_back(c); } DeltaValueInputStream delta_stream(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); @@ -535,17 +528,14 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, expected_block_size, false); - if (do_range_filter) - { - memtable_stream = std::make_shared>(memtable_stream, data_ranges, 0); - memtable_stream = std::make_shared(memtable_stream, columns_to_read); + memtable_stream = std::make_shared>(memtable_stream, data_ranges, 0); + memtable_stream = std::make_shared(memtable_stream, columns_to_read); - persisted_files_stream = std::make_shared>(persisted_files_stream, data_ranges, 0); - persisted_files_stream = std::make_shared(persisted_files_stream, columns_to_read); + persisted_files_stream = std::make_shared>(persisted_files_stream, data_ranges, 0); + persisted_files_stream = std::make_shared(persisted_files_stream, columns_to_read); - stable_stream = std::make_shared>(stable_stream, data_ranges, 0); - stable_stream = std::make_shared(stable_stream, columns_to_read); - } + stable_stream = std::make_shared>(stable_stream, data_ranges, 0); + stable_stream = std::make_shared(stable_stream, columns_to_read); BlockInputStreams streams; From d89c636a2e2bd1c548435727253629b05519eb39 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jun 2022 15:54:06 +0800 Subject: [PATCH 6/9] avoid race of updating delta index between flush and minor compaction task --- .../DeltaMerge/Delta/DeltaValueSpace.cpp | 100 +++++++++++++----- .../DeltaMerge/Delta/DeltaValueSpace.h | 3 + 2 files changed, 74 insertions(+), 29 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 6cadd7e3c88..1ad03b4b0de 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -149,7 +149,6 @@ bool DeltaValueSpace::flush(DMContext & context) ColumnFileFlushTaskPtr flush_task; WriteBatches wbs(context.storage_pool, context.getWriteLimiter()); - DeltaIndexPtr cur_delta_index; { /// Prepare data which will be written to disk. std::scoped_lock lock(mutex); @@ -159,7 +158,6 @@ bool DeltaValueSpace::flush(DMContext & context) return false; } flush_task = mem_table_set->buildFlushTask(context, persisted_file_set->getRows(), persisted_file_set->getDeletes(), persisted_file_set->getCurrentFlushVersion()); - cur_delta_index = delta_index; } // No update, return successfully. @@ -174,36 +172,53 @@ bool DeltaValueSpace::flush(DMContext & context) DeltaIndexPtr new_delta_index; if (!delta_index_updates.empty()) { + // Get current delta index and mark the delta index is updating + DeltaIndexPtr cur_delta_index; + { + std::unique_lock lock(mutex); + while (is_delta_index_updating) + { + delta_index_update_cv.wait(lock); + } + is_delta_index_updating = true; + cur_delta_index = delta_index; + } LOG_FMT_DEBUG(log, "{} Update index start", simpleInfo()); new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates); LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo()); } + bool success = false; { /// If this instance is still valid, then commit. std::scoped_lock lock(mutex); if (abandoned.load(std::memory_order_relaxed)) { - // Delete written data. - wbs.setRollback(); LOG_FMT_DEBUG(log, "{} Flush stop because abandoned", simpleInfo()); - return false; } - - if (!flush_task->commit(persisted_file_set, wbs)) + else if (!flush_task->commit(persisted_file_set, wbs)) { - wbs.rollbackWrittenLogAndData(); LOG_FMT_DEBUG(log, "{} Stop flush because structure got updated", simpleInfo()); - return false; } - - /// Update delta tree - if (new_delta_index) + else if (new_delta_index) + { + /// Update delta tree and release the update status of delta index delta_index = new_delta_index; - - LOG_FMT_DEBUG(log, "{} Flush end. Flushed {} column files, {} rows and {} deletes.", info(), flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushDeletes()); + success = true; + } + is_delta_index_updating = false; + if (success) + { + LOG_FMT_DEBUG(log, "{} Flush end. Flushed {} column files, {} rows and {} deletes.", info(), flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushDeletes()); + } } - return true; + delta_index_update_cv.notify_all(); + if (!success) + { + // Delete written data. + wbs.rollbackWrittenLogAndData(); + } + return success; } bool DeltaValueSpace::compact(DMContext & context) @@ -243,34 +258,61 @@ bool DeltaValueSpace::compact(DMContext & context) WriteBatches wbs(context.storage_pool, context.getWriteLimiter()); const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap); auto delta_index_updates = compaction_task->prepare(context, wbs, reader); - + DeltaIndexPtr new_delta_index; + if (!delta_index_updates.empty()) + { + // Get current delta index and mark the delta index is updating + DeltaIndexPtr cur_delta_index; + { + std::unique_lock lock(mutex); + while (is_delta_index_updating) + { + delta_index_update_cv.wait(lock); + } + is_delta_index_updating = true; + cur_delta_index = delta_index; + } + LOG_FMT_DEBUG(log, "{} Update index start", simpleInfo()); + new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates); + LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo()); + } + bool success = false; { std::scoped_lock lock(mutex); /// Check before commit. if (abandoned.load(std::memory_order_relaxed)) { - wbs.rollbackWrittenLogAndData(); LOG_FMT_DEBUG(log, "{} Stop compact because abandoned", simpleInfo()); - return false; } - if (!compaction_task->commit(persisted_file_set, wbs)) + else if (!compaction_task->commit(persisted_file_set, wbs)) { LOG_FMT_WARNING(log, "Structure has been updated during compact"); - wbs.rollbackWrittenLogAndData(); LOG_FMT_DEBUG(log, "{} Compact stop because structure got updated", simpleInfo()); return false; } - - /// Update delta tree - if (!delta_index_updates.empty()) - delta_index = std::make_shared(); - - LOG_FMT_DEBUG(log, "{} {}", simpleInfo(), compaction_task->info()); + else if (new_delta_index) + { + /// Update delta tree + delta_index = new_delta_index; + success = true; + } + is_delta_index_updating = false; + if (success) + { + LOG_FMT_DEBUG(log, "{} {}", simpleInfo(), compaction_task->info()); + } } - wbs.writeRemoves(); - - return true; + delta_index_update_cv.notify_all(); + if (success) + { + wbs.writeRemoves(); + } + else + { + wbs.rollbackWrittenLogAndData(); + } + return success; } } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 807a85f043a..fc99ae63c5d 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -91,6 +91,9 @@ class DeltaValueSpace // Protects the operations in this instance. mutable std::mutex mutex; + bool is_delta_index_updating = false; + std::condition_variable delta_index_update_cv; + Poco::Logger * log; public: From b6367f204d3906b6d8fc87e7cc8136a2723129fb Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jun 2022 17:59:37 +0800 Subject: [PATCH 7/9] Fix delta index place when do minor compaction --- .../src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index b16f4cadc44..82a7f4d1f3a 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -360,9 +360,6 @@ MinorCompactionPtr ColumnFilePersistedSet::pickUpMinorCompaction(DMContext & con is_all_trivial_move = is_all_trivial_move && is_trivial_move; cur_task = MinorCompaction::Task{cur_rows_offset, cur_deletes_offset}; }; - cur_rows_offset += file->getRows(); - cur_deletes_offset += file->getDeletes(); - if (auto * t_file = file->tryToTinyFile(); t_file) { bool cur_task_full = cur_task.total_rows >= context.delta_small_column_file_rows; @@ -385,6 +382,8 @@ MinorCompactionPtr ColumnFilePersistedSet::pickUpMinorCompaction(DMContext & con pack_up_cur_task(); cur_task.addColumnFile(file); } + cur_rows_offset += file->getRows(); + cur_deletes_offset += file->getDeletes(); } bool is_trivial_move = compaction->packUpTask(std::move(cur_task)); is_all_trivial_move = is_all_trivial_move && is_trivial_move; From d0062536a475e24d6efae552521643d71054c9ef Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 15 Jun 2022 18:31:36 +0800 Subject: [PATCH 8/9] Fix error in compact and flush process --- .../DeltaMerge/Delta/DeltaValueSpace.cpp | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 1ad03b4b0de..7a9b25e6c93 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -200,17 +200,20 @@ bool DeltaValueSpace::flush(DMContext & context) { LOG_FMT_DEBUG(log, "{} Stop flush because structure got updated", simpleInfo()); } - else if (new_delta_index) + else { - /// Update delta tree and release the update status of delta index - delta_index = new_delta_index; success = true; } - is_delta_index_updating = false; if (success) { + /// Update delta tree + if (new_delta_index) + { + delta_index = new_delta_index; + } LOG_FMT_DEBUG(log, "{} Flush end. Flushed {} column files, {} rows and {} deletes.", info(), flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushDeletes()); } + is_delta_index_updating = false; } delta_index_update_cv.notify_all(); if (!success) @@ -289,19 +292,21 @@ bool DeltaValueSpace::compact(DMContext & context) { LOG_FMT_WARNING(log, "Structure has been updated during compact"); LOG_FMT_DEBUG(log, "{} Compact stop because structure got updated", simpleInfo()); - return false; } - else if (new_delta_index) + else { - /// Update delta tree - delta_index = new_delta_index; success = true; } - is_delta_index_updating = false; if (success) { + /// Update delta tree + if (new_delta_index) + { + delta_index = new_delta_index; + } LOG_FMT_DEBUG(log, "{} {}", simpleInfo(), compaction_task->info()); } + is_delta_index_updating = false; } delta_index_update_cv.notify_all(); if (success) From bebcbfb9e6014e9d300f448a870b919b0c4f7151 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 19 Jun 2022 12:02:08 +0800 Subject: [PATCH 9/9] enable clean read optimization under fast mode --- dbms/src/Storages/DeltaMerge/Segment.cpp | 35 ++++++++++++++++++------ 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index aa5c62845b5..6f57524384b 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -519,14 +519,33 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, auto memtable_stream = delta_stream.getMemTableInputStream(); auto persisted_files_stream = delta_stream.getPersistedFilesInputStream(); - BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( - dm_context, - *new_columns_to_read, - data_ranges, - filter, - std::numeric_limits::max(), - expected_block_size, - false); + BlockInputStreamPtr stable_stream; + if (segment_snap->delta->getRows() == 0 && segment_snap->delta->getDeletes() == 0 // + && !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID) // + && !hasColumn(columns_to_read, VERSION_COLUMN_ID) // + && !hasColumn(columns_to_read, TAG_COLUMN_ID)) + { + // No delta, let's try some optimizations. + stable_stream = segment_snap->stable->getInputStream( + dm_context, + *new_columns_to_read, + data_ranges, + filter, + std::numeric_limits::max(), + expected_block_size, + true); + } + else + { + stable_stream = segment_snap->stable->getInputStream( + dm_context, + *new_columns_to_read, + data_ranges, + filter, + std::numeric_limits::max(), + expected_block_size, + false); + } memtable_stream = std::make_shared>(memtable_stream, data_ranges, 0); memtable_stream = std::make_shared(memtable_stream, columns_to_read);