diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.cpp b/dbms/src/Flash/TiFlashSnapshotHandler.cpp new file mode 100644 index 00000000000..5ba7b8b9c0d --- /dev/null +++ b/dbms/src/Flash/TiFlashSnapshotHandler.cpp @@ -0,0 +1,171 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +struct PreHandledTiFlashSnapshot +{ + ~PreHandledTiFlashSnapshot(); + RegionPtr region; + std::string path; +}; + +PreHandledTiFlashSnapshot::~PreHandledTiFlashSnapshot() +{ + std::cerr << "GC PreHandledTiFlashSnapshot success" + << "\n"; +} + +struct TiFlashSnapshot +{ + TiFlashSnapshot(const DM::ColumnDefines & write_columns_) : write_columns{write_columns_} {} + Pipeline pipeline; + const DM::ColumnDefines & write_columns; + ~TiFlashSnapshot(); +}; + +TiFlashSnapshot::~TiFlashSnapshot() +{ + std::cerr << "GC TiFlashSnapshot success" + << "\n"; +} + +PreHandledTiFlashSnapshot * TiFlashSnapshotHandler::preHandleTiFlashSnapshot(RegionPtr region, const String & path) +{ + return new PreHandledTiFlashSnapshot{std::move(region), path}; +} + +void TiFlashSnapshotHandler::applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap) +{ + std::cerr << "applyPreHandledTiFlashSnapshot: " << snap->region->toString() << "\n"; + auto & kvstore = tmt->getKVStore(); + kvstore->handleApplySnapshot(snap->region, *tmt); + + // TODO: check storage is not nullptr + auto table_id = snap->region->getMappedTableID(); + auto storage = tmt->getStorages().get(table_id); + auto dm_storage = std::dynamic_pointer_cast(storage); + + auto snapshot_file = DM::DMFile::restore(tmt->getContext().getFileProvider(), snap->path); + auto column_cache = std::make_shared(); + DM::DMFileBlockInputStream stream(tmt->getContext(), + DM::MAX_UINT64, + false, + 0, + snapshot_file, + dm_storage->getStore()->getTableColumns(), + DM::RowKeyRange::newAll(dm_storage->isCommonHandle(), 1), + DM::EMPTY_FILTER, + column_cache, + DM::IdSetPtr{}); + + auto settings = tmt->getContext().getSettingsRef(); + stream.readPrefix(); + while (auto block = stream.read()) + dm_storage->write(std::move(block), settings); + + stream.readSuffix(); +} + +TiFlashSnapshot * TiFlashSnapshotHandler::genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id) +{ + auto & kvstore = tmt->getKVStore(); + // generate snapshot struct; + const RegionPtr region = kvstore->getRegion(region_id); + auto region_range = region->getRange(); + auto table_id = region->getMappedTableID(); + auto storage = tmt->getStorages().get(table_id); + Logger * log = &Logger::get("TiFlashSnapshotHandler"); + if (storage == nullptr) + { + LOG_WARNING(log, + "genTiFlashSnapshot can not get table for region:" + region->toString() + " with table id: " + DB::toString(table_id) + + ", ignored"); + return new TiFlashSnapshot(DM::ColumnDefines{}); + } + + auto dm_storage = std::dynamic_pointer_cast(storage); + + auto * snapshot = new TiFlashSnapshot(dm_storage->getStore()->getTableColumns()); + const Settings & settings = tmt->getContext().getSettingsRef(); + + SelectQueryInfo query_info; + // query_info.query is just a placeholder + String query_str = "SELECT 1"; + SQLQuerySource query_src(query_str.data(), query_str.data() + query_str.size()); + std::tie(std::ignore, query_info.query) = query_src.parse(0); + const ASTSelectWithUnionQuery & ast = typeid_cast(*query_info.query); + query_info.query = ast.list_of_selects->children[0]; + + auto mvcc_query_info = std::make_unique(); + mvcc_query_info->resolve_locks = true; + mvcc_query_info->read_tso = settings.read_tso; + RegionQueryInfo info; + { + info.region_id = region_id; + info.version = region->version(); + info.conf_version = region->confVer(); + info.range_in_table = region_range->rawKeys(); + } + mvcc_query_info->regions_query_info.emplace_back(std::move(info)); + query_info.mvcc_query_info = std::move(mvcc_query_info); + + DAGPreparedSets dag_sets{}; + query_info.dag_query = std::make_unique(std::vector{}, dag_sets, std::vector{}); + + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + Names required_columns = storage->getColumns().getNamesOfPhysical(); + snapshot->pipeline.streams = storage->read(required_columns, query_info, tmt->getContext(), from_stage, settings.max_block_size, 1); + snapshot->pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); }); + return snapshot; +} + +SerializeTiFlashSnapshotRes TiFlashSnapshotHandler::serializeTiFlashSnapshotInto( + TMTContext * tmt, TiFlashSnapshot * snapshot, const String & path) +{ + if (snapshot->write_columns.empty()) + return {0, 0, 0}; + auto snapshot_file = DM::DMFile::create(path); + uint64_t key_count = 0; + DM::DMFileBlockOutputStream dst_stream(tmt->getContext(), snapshot_file, snapshot->write_columns); + auto & src_stream = snapshot->pipeline.firstStream(); + src_stream->readPrefix(); + dst_stream.writePrefix(); + while (auto block = src_stream->read()) + { + key_count += block.rows(); + dst_stream.write(block, 0); + } + src_stream->readSuffix(); + dst_stream.writeSuffix(); + Poco::File file(path); + uint64_t total_size = file.getSize(); + // if key_count is 0, file will be deleted + return {1, key_count, total_size}; +} + +bool TiFlashSnapshotHandler::isTiFlashSnapshot(TMTContext * tmt, const String & path) +{ + return DM::DMFile::isValidDMFileInSingleFileMode(tmt->getContext().getFileProvider(), path); +} + +void TiFlashSnapshotHandler::deleteTiFlashSnapshot(TiFlashSnapshot * snap) { delete snap; } + +void TiFlashSnapshotHandler::deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap) { delete snap; } + +} // namespace DB diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.h b/dbms/src/Flash/TiFlashSnapshotHandler.h new file mode 100644 index 00000000000..f595c19909e --- /dev/null +++ b/dbms/src/Flash/TiFlashSnapshotHandler.h @@ -0,0 +1,39 @@ +#include + +#include +#include + +namespace DB +{ +class TMTContext; + +struct PreHandledTiFlashSnapshot; +struct TiFlashSnapshot; + +struct SerializeTiFlashSnapshotRes +{ + uint8_t ok; + uint64_t key_count; + uint64_t total_size; +}; +using String = std::string; + +class TiFlashSnapshotHandler +{ +public: + static PreHandledTiFlashSnapshot * preHandleTiFlashSnapshot(RegionPtr region, const String & path); + + static void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap); + + static TiFlashSnapshot * genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id); + + static SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot * snapshot, const String & path); + + static bool isTiFlashSnapshot(TMTContext * tmt, const String & path); + + static void deleteTiFlashSnapshot(TiFlashSnapshot * snap); + + static void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap); +}; + +} // namespace DB diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 6f3ded31e23..2a830bfd083 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -9,19 +9,16 @@ #include #include #include -#include #include #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 6ade7f5340a..dcd8b339f62 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -53,7 +53,7 @@ String getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Statu String DMFile::path() const { - return getPathByStatus(parent_path, file_id, status); + return !file_path.empty() ? file_path : getPathByStatus(parent_path, file_id, status); } String DMFile::ngcPath() const @@ -103,6 +103,29 @@ DMFilePtr DMFile::restore(const FileProviderPtr & file_provider, UInt64 file_id, return dmfile; } +DMFilePtr DMFile::create(const String & path) +{ + Logger * log = &Logger::get("DMFile"); + + DMFilePtr new_dmfile(new DMFile(path, Mode::SINGLE_FILE, Status::WRITABLE, log)); + Poco::File file(path); + if (file.exists()) + { + file.remove(true); + LOG_WARNING(log, "Existing dmfile, removed :" << path); + } + PageUtil::touchFile(path); + + return new_dmfile; +} + +DMFilePtr DMFile::restore(const FileProviderPtr & file_provider, const String & path) +{ + DMFilePtr dmfile(new DMFile(path, Mode::SINGLE_FILE, Status::READABLE, &Logger::get("DMFile"))); + dmfile->readMeta(file_provider); + return dmfile; +} + String DMFile::colIndexCacheKey(const FileNameBase & file_name_base) const { if (isSingleFileMode()) @@ -144,7 +167,7 @@ bool DMFile::isColIndexExist(const ColId & col_id) const const String DMFile::encryptionBasePath() const { - return parent_path + "/" + FOLDER_PREFIX_READABLE + DB::toString(file_id); + return !file_path.empty() ? file_path : parent_path + "/" + FOLDER_PREFIX_READABLE + DB::toString(file_id); } @@ -339,6 +362,7 @@ void DMFile::finalize(const FileProviderPtr & file_provider) void DMFile::finalize(WriteBuffer & buffer) { Footer footer; + footer.magic_number = DMFile::magic_number; std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMeta(buffer); std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePack(buffer); footer.sub_file_stat_offset = buffer.count(); @@ -357,14 +381,20 @@ void DMFile::finalize(WriteBuffer & buffer) writeIntBinary(footer.sub_file_stat_offset, buffer); writeIntBinary(footer.sub_file_num, buffer); writeIntBinary(static_cast>(footer.file_format_version), buffer); + writeIntBinary(footer.magic_number, buffer); buffer.next(); if (status != Status::WRITING) throw Exception("Expected WRITING status, now " + statusString(status)); - Poco::File old_file(path()); - Poco::File old_ngc_file(ngcPath()); - status = Status::READABLE; - auto new_path = path(); + auto old_path = path(); + auto old_ngc_path = ngcPath(); + status = Status::READABLE; + auto new_path = path(); + // If the path is same, then this is a snapshot file, no need to rename the file + if (old_path == new_path) + return; + Poco::File old_file(old_path); + Poco::File old_ngc_file(old_ngc_path); Poco::File file(new_path); if (file.exists()) file.remove(); @@ -479,5 +509,20 @@ void DMFile::remove(const FileProviderPtr & file_provider) } } +bool DMFile::isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider, const String & path) +{ + Poco::File file(path); + if (!file.isFile()) + return false; + + MagicNumber number; + ReadBufferFromFileProvider buf(file_provider, path, EncryptionPath(path, "")); + buf.seek(file.getSize() - sizeof(MagicNumber), SEEK_SET); + DB::readIntBinary(number, buf); + return number == DMFile::magic_number; +} + +const DMFile::MagicNumber DMFile::magic_number = 0x23579BDF48799ADE; + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index cbba8a1a19f..f9748952b7d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -82,6 +82,7 @@ class DMFile : private boost::noncopyable UInt64 pack_stat_size; }; + using MagicNumber = UInt64; struct Footer { MetaPackInfo meta_pack_info; @@ -89,6 +90,7 @@ class DMFile : private boost::noncopyable UInt32 sub_file_num; DMSingleFileFormatVersion file_format_version; + MagicNumber magic_number; }; using PackStats = PaddedPODArray; @@ -98,6 +100,10 @@ class DMFile : private boost::noncopyable static DMFilePtr restore(const FileProviderPtr & file_provider, UInt64 file_id, UInt64 ref_id, const String & parent_path, bool read_meta = true); + // used for create and read from snapshot file + static DMFilePtr create(const String & path); + static DMFilePtr restore(const FileProviderPtr & file_provider, const String & path); + static std::set listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, bool can_gc); bool canGC(); @@ -108,6 +114,7 @@ class DMFile : private boost::noncopyable UInt64 refId() const { return ref_id; } String path() const; + String metaPath() const { return subFilePath(metaFileName()); } String packStatPath() const { return subFilePath(packStatFileName()); } @@ -184,12 +191,16 @@ class DMFile : private boost::noncopyable return IDataType::getFileNameForStream(DB::toString(col_id), substream); } + static bool isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider, const String & path); + private: DMFile(UInt64 file_id_, UInt64 ref_id_, const String & parent_path_, Mode mode_, Status status_, Logger * log_) : file_id(file_id_), ref_id(ref_id_), parent_path(parent_path_), mode(mode_), status(status_), log(log_) { } + DMFile(const String & path_, Mode mode_, Status status_, Logger * log_) : file_path(path_), mode(mode_), status(status_), log(log_) {} + bool isSingleFileMode() const { return mode == Mode::SINGLE_FILE; } bool isFolderMode() const { return mode == Mode::FOLDER; } @@ -238,12 +249,17 @@ class DMFile : private boost::noncopyable PackStats pack_stats; ColumnStats column_stats; + // used for snapshot file + String file_path; + Mode mode; Status status; mutable std::mutex mutex; SubFileStats sub_file_stats; + static const MagicNumber magic_number; + Logger * log; friend class DMFileWriter; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h index 100075aed47..0d4ea2843b4 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h @@ -18,8 +18,7 @@ class DMFileBlockOutputStream context.getSettingsRef().max_compress_block_size, // context.chooseCompressionSettings(0, 0), TODO: should enable this, and make unit testes work. CompressionSettings(CompressionMethod::LZ4), - context.getFileProvider(), - context.getSettingsRef().dt_enable_single_file_mode_dmfile) + context.getFileProvider()) { } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 66559b787a4..6df8fb9a1bc 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -13,24 +13,23 @@ DMFileWriter::DMFileWriter(const DMFilePtr & dmfile_, size_t min_compress_block_size_, size_t max_compress_block_size_, const CompressionSettings & compression_settings_, - const FileProviderPtr & file_provider_, - bool single_file_mode_) + const FileProviderPtr & file_provider_) : dmfile(dmfile_), write_columns(write_columns_), min_compress_block_size(min_compress_block_size_), max_compress_block_size(max_compress_block_size_), compression_settings(compression_settings_), + single_file_mode(dmfile_->isSingleFileMode()), // assume pack_stat_file is the first file created inside DMFile // it will create encryption info for the whole DMFile pack_stat_file( - single_file_mode_ + single_file_mode ? nullptr : createWriteBufferFromFileBaseByFileProvider( file_provider_, dmfile->packStatPath(), dmfile->encryptionPackStatPath(), true, 0, 0, max_compress_block_size)), single_file_stream( - !single_file_mode_ ? nullptr : new SingleFileStream(dmfile_, compression_settings_, max_compress_block_size_, file_provider_)), - file_provider(file_provider_), - single_file_mode(single_file_mode_) + !single_file_mode ? nullptr : new SingleFileStream(dmfile_, compression_settings_, max_compress_block_size_, file_provider_)), + file_provider(file_provider_) { dmfile->setStatus(DMFile::Status::WRITING); for (auto & cd : write_columns) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index e37e18e6741..31b95f5471d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -125,8 +125,7 @@ class DMFileWriter size_t min_compress_block_size_, size_t max_compress_block_size_, const CompressionSettings & compression_settings_, - const FileProviderPtr & file_provider_, - bool single_file_mode_ = false); + const FileProviderPtr & file_provider_); void write(const Block & block, size_t not_clean_rows); void finalize(); @@ -146,6 +145,7 @@ class DMFileWriter size_t min_compress_block_size; size_t max_compress_block_size; CompressionSettings compression_settings; + bool single_file_mode; ColumnStreams column_streams; @@ -154,7 +154,6 @@ class DMFileWriter SingleFileStreamPtr single_file_stream; FileProviderPtr file_provider; - bool single_file_mode; }; } // namespace DM diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 5f83fb4f955..8b2e31115ee 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -195,23 +195,9 @@ void ApplyPreHandledTiKVSnapshot(TiFlashServer * server, PreHandledTiKVSnapshot } } -struct PreHandledTiFlashSnapshot -{ - ~PreHandledTiFlashSnapshot(); - RegionPtr region; -}; - -PreHandledTiFlashSnapshot::~PreHandledTiFlashSnapshot() -{ - std::cerr << "GC PreHandledTiFlashSnapshot success" - << "\n"; -} - void ApplyPreHandledTiFlashSnapshot(TiFlashServer * server, PreHandledTiFlashSnapshot * snap) { - std::cerr << "ApplyPreHandledTiFlashSnapshot: " << snap->region->toString() << "\n"; - auto & kvstore = server->tmt->getKVStore(); - kvstore->handleApplySnapshot(snap->region, *server->tmt); + TiFlashSnapshotHandler::applyPreHandledTiFlashSnapshot(server->tmt, snap); } void ApplyPreHandledSnapshot(TiFlashServer * server, void * res, RawCppPtrType type) @@ -253,10 +239,10 @@ void GcRawCppPtr(TiFlashServer *, RawCppPtr p) delete reinterpret_cast(ptr); break; case RawCppPtrType::TiFlashSnapshot: - delete reinterpret_cast(ptr); + TiFlashSnapshotHandler::deleteTiFlashSnapshot(reinterpret_cast(ptr)); break; case RawCppPtrType::PreHandledTiFlashSnapshot: - delete reinterpret_cast(ptr); + TiFlashSnapshotHandler::deletePreHandledTiFlashSnapshot(reinterpret_cast(ptr)); break; case RawCppPtrType::SplitKeys: delete reinterpret_cast(ptr); @@ -286,13 +272,13 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) try { - auto & kvstore = server->tmt->getKVStore(); + auto & tmt = server->tmt; + auto & kvstore = tmt->getKVStore(); // flush all data of region and persist - if (!kvstore->preGenTiFlashSnapshot(header.region_id, header.index, *server->tmt)) + if (!kvstore->preGenTiFlashSnapshot(header.region_id, header.index, *tmt)) return RawCppPtr(nullptr, RawCppPtrType::None); - // generate snapshot struct; - // TODO - return RawCppPtr(new TiFlashSnapshot(), RawCppPtrType::TiFlashSnapshot); + + return RawCppPtr(TiFlashSnapshotHandler::genTiFlashSnapshot(tmt, header.region_id), RawCppPtrType::TiFlashSnapshot); } catch (...) { @@ -301,62 +287,21 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) } } -SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, TiFlashSnapshot *, BaseBuffView path) +SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, TiFlashSnapshot * snapshot, BaseBuffView path) { std::string real_path(path.data, path.len); - std::cerr << "serializeInto TiFlashSnapshot into path " << real_path << "\n"; - auto encryption_info = server->proxy_helper->newFile(real_path); - char buffer[TiFlashSnapshot::flag.size() + 10]; - std::memset(buffer, 0, sizeof(buffer)); - auto file = fopen(real_path.data(), "w"); - if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) - { - std::cerr << "start to write encryption data" - << "\n"; - BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); - memcpy(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()); - cipher_stream->encrypt(0, buffer, TiFlashSnapshot::flag.size()); - fputs(buffer, file); - } - else - { - fputs(TiFlashSnapshot::flag.data(), file); - std::cerr << "start to write data" - << "\n"; - } - fclose(file); - std::cerr << "finish write " << TiFlashSnapshot::flag.size() << " bytes " + std::cerr << "serialize TiFlashSnapshot into path " << real_path << "\n"; + auto res = TiFlashSnapshotHandler::serializeTiFlashSnapshotInto(server->tmt, snapshot, real_path); + std::cerr << "finish write " << res.total_size << " bytes " << "\n"; - // is key_count is 0, file will be deleted - return {1, 6, TiFlashSnapshot::flag.size()}; + return res; } uint8_t IsTiFlashSnapshot(TiFlashServer * server, BaseBuffView path) { std::string real_path(path.data, path.len); std::cerr << "IsTiFlashSnapshot of path " << real_path << "\n"; - bool res = false; - char buffer[TiFlashSnapshot::flag.size() + 10]; - std::memset(buffer, 0, sizeof(buffer)); - auto encryption_info = server->proxy_helper->getFile(path); - auto file = fopen(real_path.data(), "rb"); - size_t bytes_read = 0; - if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) - { - std::cerr << "try to decrypt file" - << "\n"; - - BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); - bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); - cipher_stream->decrypt(0, buffer, bytes_read); - } - else - { - bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); - } - fclose(file); - if (bytes_read == TiFlashSnapshot::flag.size() && memcmp(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()) == 0) - res = true; + auto res = TiFlashSnapshotHandler::isTiFlashSnapshot(server->tmt, real_path); std::cerr << "start to check IsTiFlashSnapshot, res " << res << "\n"; return res; } @@ -370,10 +315,11 @@ RawCppPtr PreHandleTiFlashSnapshot( region.ParseFromArray(region_buff.data, (int)region_buff.len); auto & tmt = *server->tmt; auto new_region = GenRegionPtr(std::move(region), peer_id, index, term, tmt); + std::string real_path(path.data, path.len); - std::cerr << "PreHandleTiFlashSnapshot from path " << std::string_view(path) << " region " << region.id() << " peer " << peer_id - << " index " << index << " term " << term << "\n"; - return RawCppPtr(new PreHandledTiFlashSnapshot{new_region}, RawCppPtrType::PreHandledTiFlashSnapshot); + std::cerr << "PreHandleTiFlashSnapshot from path " << real_path << " region " << region.id() << " peer " << peer_id << " index " + << index << " term " << term << "\n"; + return RawCppPtr(TiFlashSnapshotHandler::preHandleTiFlashSnapshot(new_region, real_path), RawCppPtrType::PreHandledTiFlashSnapshot); } catch (...) { @@ -382,14 +328,6 @@ RawCppPtr PreHandleTiFlashSnapshot( } } -TiFlashSnapshot::~TiFlashSnapshot() -{ - std::cerr << "GC TiFlashSnapshot success" - << "\n"; -} - -const std::string TiFlashSnapshot::flag = "this is tiflash snapshot"; - GetRegionApproximateSizeKeysRes GetRegionApproximateSizeKeys( TiFlashServer *, uint64_t region_id, BaseBuffView start_key, BaseBuffView end_key) { diff --git a/dbms/src/Storages/Transaction/ProxyFFIType.h b/dbms/src/Storages/Transaction/ProxyFFIType.h index b258ec22861..4485076b6e7 100644 --- a/dbms/src/Storages/Transaction/ProxyFFIType.h +++ b/dbms/src/Storages/Transaction/ProxyFFIType.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -148,19 +149,6 @@ struct CppStrWithView {} }; -struct SerializeTiFlashSnapshotRes -{ - uint8_t ok; - uint64_t key_count; - uint64_t total_size; -}; - -struct TiFlashSnapshot -{ - ~TiFlashSnapshot(); - static const std::string flag; -}; - struct GetRegionApproximateSizeKeysRes { uint8_t ok;