From a12b7ada7356a6cdf83d5f1176db662375f3092a Mon Sep 17 00:00:00 2001 From: linkmyth Date: Thu, 15 Oct 2020 13:34:41 +0800 Subject: [PATCH 1/4] alpha version --- dbms/src/Server/Server.cpp | 3 - dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 43 ++++- dbms/src/Storages/DeltaMerge/File/DMFile.h | 17 ++ dbms/src/Storages/Transaction/ProxyFFI.cpp | 184 ++++++++++++++----- dbms/src/Storages/Transaction/ProxyFFIType.h | 5 + 5 files changed, 198 insertions(+), 54 deletions(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 6f3ded31e23..2a830bfd083 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -9,19 +9,16 @@ #include #include #include -#include #include #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 6ade7f5340a..e1a1bdcb060 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -53,7 +53,7 @@ String getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Statu String DMFile::path() const { - return getPathByStatus(parent_path, file_id, status); + return !file_path.empty() ? file_path : getPathByStatus(parent_path, file_id, status); } String DMFile::ngcPath() const @@ -103,6 +103,29 @@ DMFilePtr DMFile::restore(const FileProviderPtr & file_provider, UInt64 file_id, return dmfile; } +DMFilePtr DMFile::create(const String & path) +{ + Logger * log = &Logger::get("DMFile"); + + DMFilePtr new_dmfile(new DMFile(path, Mode::SINGLE_FILE, Status::WRITABLE, log)); + Poco::File file(path); + if (file.exists()) + { + file.remove(true); + LOG_WARNING(log, "Existing dmfile, removed :" << path); + } + PageUtil::touchFile(path); + + return new_dmfile; +} + +DMFilePtr DMFile::restore(const FileProviderPtr & file_provider, const String & path) +{ + DMFilePtr dmfile(new DMFile(path, Mode::SINGLE_FILE, Status::READABLE, &Logger::get("DMFile"))); + dmfile->readMeta(file_provider); + return dmfile; +} + String DMFile::colIndexCacheKey(const FileNameBase & file_name_base) const { if (isSingleFileMode()) @@ -144,7 +167,7 @@ bool DMFile::isColIndexExist(const ColId & col_id) const const String DMFile::encryptionBasePath() const { - return parent_path + "/" + FOLDER_PREFIX_READABLE + DB::toString(file_id); + return !file_path.empty() ? file_path : parent_path + "/" + FOLDER_PREFIX_READABLE + DB::toString(file_id); } @@ -357,6 +380,7 @@ void DMFile::finalize(WriteBuffer & buffer) writeIntBinary(footer.sub_file_stat_offset, buffer); writeIntBinary(footer.sub_file_num, buffer); writeIntBinary(static_cast>(footer.file_format_version), buffer); + writeIntBinary(footer.magic_number, buffer); buffer.next(); if (status != Status::WRITING) throw Exception("Expected WRITING status, now " + statusString(status)); @@ -479,5 +503,20 @@ void DMFile::remove(const FileProviderPtr & file_provider) } } +bool DMFile::isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider, const String & path) +{ + Poco::File file(path); + if (!file.isFile()) + return false; + + MagicNumber magic_number; + ReadBufferFromFileProvider buf(file_provider, path, EncryptionPath(path, "")); + buf.seek(file.getSize() - sizeof(MagicNumber), SEEK_SET); + DB::readIntBinary(magic_number, buf); + return magic_number == DMFile::Footer::magic_number; +} + +const DMFile::MagicNumber DMFile::Footer::magic_number = 0x13579BDF13579BDF; + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index cbba8a1a19f..bba40c7a2a3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -82,6 +82,7 @@ class DMFile : private boost::noncopyable UInt64 pack_stat_size; }; + using MagicNumber = UInt64; struct Footer { MetaPackInfo meta_pack_info; @@ -89,6 +90,7 @@ class DMFile : private boost::noncopyable UInt32 sub_file_num; DMSingleFileFormatVersion file_format_version; + static const MagicNumber magic_number; }; using PackStats = PaddedPODArray; @@ -98,6 +100,10 @@ class DMFile : private boost::noncopyable static DMFilePtr restore(const FileProviderPtr & file_provider, UInt64 file_id, UInt64 ref_id, const String & parent_path, bool read_meta = true); + // used for create and read from snapshot file + static DMFilePtr create(const String & path); + static DMFilePtr restore(const FileProviderPtr & file_provider, const String & path); + static std::set listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, bool can_gc); bool canGC(); @@ -108,6 +114,7 @@ class DMFile : private boost::noncopyable UInt64 refId() const { return ref_id; } String path() const; + String metaPath() const { return subFilePath(metaFileName()); } String packStatPath() const { return subFilePath(packStatFileName()); } @@ -184,12 +191,19 @@ class DMFile : private boost::noncopyable return IDataType::getFileNameForStream(DB::toString(col_id), substream); } + static bool isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider, const String & path); + private: DMFile(UInt64 file_id_, UInt64 ref_id_, const String & parent_path_, Mode mode_, Status status_, Logger * log_) : file_id(file_id_), ref_id(ref_id_), parent_path(parent_path_), mode(mode_), status(status_), log(log_) { } + DMFile(const String & path_, Mode mode_, Status status_, Logger * log_) + : file_path(path_), mode(mode_), status(status_), log(log_) + { + } + bool isSingleFileMode() const { return mode == Mode::SINGLE_FILE; } bool isFolderMode() const { return mode == Mode::FOLDER; } @@ -238,6 +252,9 @@ class DMFile : private boost::noncopyable PackStats pack_stats; ColumnStats column_stats; + // used for snapshot file + String file_path; + Mode mode; Status status; diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 5f83fb4f955..d60328202f4 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -1,7 +1,13 @@ #include #include #include +#include +#include +#include +#include +#include #include +#include #include #include #include @@ -199,6 +205,7 @@ struct PreHandledTiFlashSnapshot { ~PreHandledTiFlashSnapshot(); RegionPtr region; + std::string path; }; PreHandledTiFlashSnapshot::~PreHandledTiFlashSnapshot() @@ -210,8 +217,36 @@ PreHandledTiFlashSnapshot::~PreHandledTiFlashSnapshot() void ApplyPreHandledTiFlashSnapshot(TiFlashServer * server, PreHandledTiFlashSnapshot * snap) { std::cerr << "ApplyPreHandledTiFlashSnapshot: " << snap->region->toString() << "\n"; - auto & kvstore = server->tmt->getKVStore(); - kvstore->handleApplySnapshot(snap->region, *server->tmt); + auto & tmt = server->tmt; + auto & kvstore = tmt->getKVStore(); + kvstore->handleApplySnapshot(snap->region, *tmt); + + // TODO: check storage is not nullptr + auto table_id = snap->region->getMappedTableID(); + auto storage = tmt->getStorages().get(table_id); + auto dm_storage = std::dynamic_pointer_cast(storage); + + auto snapshot_file = DM::DMFile::restore(tmt->getContext().getFileProvider(), snap->path); + auto column_cache = std::make_shared(); + DM::DMFileBlockInputStream stream(server->tmt->getContext(), + DM::MAX_UINT64, + false, + 0, + snapshot_file, + dm_storage->getStore()->getTableColumns(), + // TODO: how to deal with is_common_handle + DM::RowKeyRange::newAll(false, 1), + DM::EMPTY_FILTER, + column_cache, + DM::IdSetPtr{}); + + auto settings = tmt->getContext().getSettingsRef(); + stream.readPrefix(); + while (auto block = stream.read()) + { + dm_storage->write(std::move(block), settings); + } + stream.readSuffix(); } void ApplyPreHandledSnapshot(TiFlashServer * server, void * res, RawCppPtrType type) @@ -286,13 +321,51 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) try { - auto & kvstore = server->tmt->getKVStore(); + auto & tmt = server->tmt; + auto & kvstore = tmt->getKVStore(); // flush all data of region and persist - if (!kvstore->preGenTiFlashSnapshot(header.region_id, header.index, *server->tmt)) + if (!kvstore->preGenTiFlashSnapshot(header.region_id, header.index, *tmt)) return RawCppPtr(nullptr, RawCppPtrType::None); // generate snapshot struct; - // TODO - return RawCppPtr(new TiFlashSnapshot(), RawCppPtrType::TiFlashSnapshot); + + // TODO: check RegionPtr is not nullptr + const RegionPtr region = kvstore->getRegion(header.region_id); + auto region_range = region->getRange(); + + // TODO: check storage is not nullptr + auto table_id = region->getMappedTableID(); + auto storage = tmt->getStorages().get(table_id); + auto dm_storage = std::dynamic_pointer_cast(storage); + + auto * snapshot = new TiFlashSnapshot(dm_storage->getStore()->getTableColumns()); + + const Settings & settings = tmt->getContext().getSettingsRef(); + auto mvcc_query_info = std::make_unique(); + mvcc_query_info->resolve_locks = true; + mvcc_query_info->read_tso = settings.read_tso; + RegionQueryInfo info; + { + info.region_id = header.region_id; + info.version = region->version(); + info.conf_version = region->confVer(); + info.range_in_table = region_range->rawKeys(); + } + mvcc_query_info->regions_query_info.emplace_back(std::move(info)); + + SelectQueryInfo query_info; + String query_str = "SELECT * FROM " + storage->getDatabaseName() + "." + storage->getTableName(); + SQLQuerySource query_src(query_str.data(), query_str.data() + query_str.size()); + std::tie(std::ignore, query_info.query) = query_src.parse(0); + + query_info.mvcc_query_info = std::move(mvcc_query_info); + + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + + auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + Names required_columns = storage->getColumns().getNamesOfPhysical(); + snapshot->pipeline.streams = storage->read(required_columns, query_info, tmt->getContext(), from_stage, settings.max_block_size, 1); + snapshot->pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); }); + return RawCppPtr(snapshot, RawCppPtrType::TiFlashSnapshot); } catch (...) { @@ -301,33 +374,44 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) } } -SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, TiFlashSnapshot *, BaseBuffView path) +SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, TiFlashSnapshot *snapshot, BaseBuffView path) { std::string real_path(path.data, path.len); std::cerr << "serializeInto TiFlashSnapshot into path " << real_path << "\n"; - auto encryption_info = server->proxy_helper->newFile(real_path); - char buffer[TiFlashSnapshot::flag.size() + 10]; - std::memset(buffer, 0, sizeof(buffer)); - auto file = fopen(real_path.data(), "w"); - if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) - { - std::cerr << "start to write encryption data" - << "\n"; - BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); - memcpy(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()); - cipher_stream->encrypt(0, buffer, TiFlashSnapshot::flag.size()); - fputs(buffer, file); - } - else +// auto encryption_info = server->proxy_helper->newFile(real_path); +// char buffer[TiFlashSnapshot::flag.size() + 10]; +// std::memset(buffer, 0, sizeof(buffer)); +// auto file = fopen(real_path.data(), "w"); +// if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) +// { +// std::cerr << "start to write encryption data" +// << "\n"; +// BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); +// memcpy(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()); +// cipher_stream->encrypt(0, buffer, TiFlashSnapshot::flag.size()); +// fputs(buffer, file); +// } +// else +// { +// fputs(TiFlashSnapshot::flag.data(), file); +// std::cerr << "start to write data" +// << "\n"; +// } +// fclose(file); + auto snapshot_file = DM::DMFile::create(real_path); + DM::DMFileBlockOutputStream dst_stream(server->tmt->getContext(), snapshot_file, snapshot->write_columns); + auto & src_stream = snapshot->pipeline.firstStream(); + src_stream->readPrefix(); + dst_stream.writePrefix(); + while (auto block = src_stream->read()) { - fputs(TiFlashSnapshot::flag.data(), file); - std::cerr << "start to write data" - << "\n"; + dst_stream.write(block, 0); } - fclose(file); + src_stream->readSuffix(); + dst_stream.writeSuffix(); std::cerr << "finish write " << TiFlashSnapshot::flag.size() << " bytes " << "\n"; - // is key_count is 0, file will be deleted + // if key_count is 0, file will be deleted return {1, 6, TiFlashSnapshot::flag.size()}; } @@ -336,27 +420,28 @@ uint8_t IsTiFlashSnapshot(TiFlashServer * server, BaseBuffView path) std::string real_path(path.data, path.len); std::cerr << "IsTiFlashSnapshot of path " << real_path << "\n"; bool res = false; - char buffer[TiFlashSnapshot::flag.size() + 10]; - std::memset(buffer, 0, sizeof(buffer)); - auto encryption_info = server->proxy_helper->getFile(path); - auto file = fopen(real_path.data(), "rb"); - size_t bytes_read = 0; - if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) - { - std::cerr << "try to decrypt file" - << "\n"; - - BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); - bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); - cipher_stream->decrypt(0, buffer, bytes_read); - } - else - { - bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); - } - fclose(file); - if (bytes_read == TiFlashSnapshot::flag.size() && memcmp(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()) == 0) - res = true; +// char buffer[TiFlashSnapshot::flag.size() + 10]; +// std::memset(buffer, 0, sizeof(buffer)); +// auto encryption_info = server->proxy_helper->getFile(path); +// auto file = fopen(real_path.data(), "rb"); +// size_t bytes_read = 0; +// if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) +// { +// std::cerr << "try to decrypt file" +// << "\n"; +// +// BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); +// bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); +// cipher_stream->decrypt(0, buffer, bytes_read); +// } +// else +// { +// bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); +// } +// fclose(file); +// if (bytes_read == TiFlashSnapshot::flag.size() && memcmp(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()) == 0) +// res = true; + res = DM::DMFile::isValidDMFileInSingleFileMode(server->tmt->getContext().getFileProvider(), real_path); std::cerr << "start to check IsTiFlashSnapshot, res " << res << "\n"; return res; } @@ -370,10 +455,11 @@ RawCppPtr PreHandleTiFlashSnapshot( region.ParseFromArray(region_buff.data, (int)region_buff.len); auto & tmt = *server->tmt; auto new_region = GenRegionPtr(std::move(region), peer_id, index, term, tmt); + std::string real_path(path.data, path.len); - std::cerr << "PreHandleTiFlashSnapshot from path " << std::string_view(path) << " region " << region.id() << " peer " << peer_id + std::cerr << "PreHandleTiFlashSnapshot from path " << real_path << " region " << region.id() << " peer " << peer_id << " index " << index << " term " << term << "\n"; - return RawCppPtr(new PreHandledTiFlashSnapshot{new_region}, RawCppPtrType::PreHandledTiFlashSnapshot); + return RawCppPtr(new PreHandledTiFlashSnapshot{new_region, real_path}, RawCppPtrType::PreHandledTiFlashSnapshot); } catch (...) { diff --git a/dbms/src/Storages/Transaction/ProxyFFIType.h b/dbms/src/Storages/Transaction/ProxyFFIType.h index b258ec22861..7977e7d23b1 100644 --- a/dbms/src/Storages/Transaction/ProxyFFIType.h +++ b/dbms/src/Storages/Transaction/ProxyFFIType.h @@ -1,7 +1,9 @@ #pragma once +#include #include #include +#include #include #include @@ -157,6 +159,9 @@ struct SerializeTiFlashSnapshotRes struct TiFlashSnapshot { + TiFlashSnapshot(const DM::ColumnDefines & write_columns_) : write_columns{write_columns_} {} + Pipeline pipeline; + const DM::ColumnDefines & write_columns; ~TiFlashSnapshot(); static const std::string flag; }; From 0ec3c2385785716fe292b9a57f6fd5e3b00075e2 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Sun, 18 Oct 2020 11:54:49 +0800 Subject: [PATCH 2/4] almost complete --- dbms/src/Flash/TiFlashSnapshotHandler.cpp | 172 ++++++++++++++++++ dbms/src/Flash/TiFlashSnapshotHandler.h | 30 +++ dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 22 ++- dbms/src/Storages/DeltaMerge/File/DMFile.h | 4 +- .../DeltaMerge/File/DMFileBlockOutputStream.h | 3 +- .../Storages/DeltaMerge/File/DMFileWriter.cpp | 13 +- .../Storages/DeltaMerge/File/DMFileWriter.h | 6 +- dbms/src/Storages/Transaction/ProxyFFI.cpp | 168 +---------------- dbms/src/Storages/Transaction/ProxyFFIType.h | 19 +- 9 files changed, 242 insertions(+), 195 deletions(-) create mode 100644 dbms/src/Flash/TiFlashSnapshotHandler.cpp create mode 100644 dbms/src/Flash/TiFlashSnapshotHandler.h diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.cpp b/dbms/src/Flash/TiFlashSnapshotHandler.cpp new file mode 100644 index 00000000000..69b2e2829f6 --- /dev/null +++ b/dbms/src/Flash/TiFlashSnapshotHandler.cpp @@ -0,0 +1,172 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +struct PreHandledTiFlashSnapshot +{ + ~PreHandledTiFlashSnapshot(); + RegionPtr region; + std::string path; +}; + +PreHandledTiFlashSnapshot::~PreHandledTiFlashSnapshot() +{ + std::cerr << "GC PreHandledTiFlashSnapshot success" + << "\n"; +} + +struct TiFlashSnapshot +{ + TiFlashSnapshot(const DM::ColumnDefines & write_columns_) : write_columns{write_columns_} {} + Pipeline pipeline; + const DM::ColumnDefines & write_columns; + ~TiFlashSnapshot(); +}; + +TiFlashSnapshot::~TiFlashSnapshot() +{ + std::cerr << "GC TiFlashSnapshot success" + << "\n"; +} + +PreHandledTiFlashSnapshot *preHandleTiFlashSnapshot(RegionPtr region, const String & path) +{ + return new PreHandledTiFlashSnapshot{std::move(region), path}; +} + +void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap) +{ + std::cerr << "applyPreHandledTiFlashSnapshot: " << snap->region->toString() << "\n"; + auto & kvstore = tmt->getKVStore(); + kvstore->handleApplySnapshot(snap->region, *tmt); + + // TODO: check storage is not nullptr + auto table_id = snap->region->getMappedTableID(); + auto storage = tmt->getStorages().get(table_id); + auto dm_storage = std::dynamic_pointer_cast(storage); + + auto snapshot_file = DM::DMFile::restore(tmt->getContext().getFileProvider(), snap->path); + auto column_cache = std::make_shared(); + DM::DMFileBlockInputStream stream(tmt->getContext(), + DM::MAX_UINT64, + false, + 0, + snapshot_file, + dm_storage->getStore()->getTableColumns(), + DM::RowKeyRange::newAll(dm_storage->isCommonHandle(), 1), + DM::EMPTY_FILTER, + column_cache, + DM::IdSetPtr{}); + + auto settings = tmt->getContext().getSettingsRef(); + stream.readPrefix(); + while (auto block = stream.read()) + { + dm_storage->write(std::move(block), settings); + } + stream.readSuffix(); +} + +TiFlashSnapshot *genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id) +{ + auto & kvstore = tmt->getKVStore(); + // generate snapshot struct; + // TODO: check RegionPtr is not nullptr + const RegionPtr region = kvstore->getRegion(region_id); + auto region_range = region->getRange(); + // TODO: check storage is not nullptr + auto table_id = region->getMappedTableID(); + auto storage = tmt->getStorages().get(table_id); + auto dm_storage = std::dynamic_pointer_cast(storage); + + auto * snapshot = new TiFlashSnapshot(dm_storage->getStore()->getTableColumns()); + const Settings & settings = tmt->getContext().getSettingsRef(); + + SelectQueryInfo query_info; + // query_info.query is just a placeholder + String query_str = "SELECT 1"; + SQLQuerySource query_src(query_str.data(), query_str.data() + query_str.size()); + std::tie(std::ignore, query_info.query) = query_src.parse(0); + const ASTSelectWithUnionQuery & ast = typeid_cast(*query_info.query); + query_info.query = ast.list_of_selects->children[0]; + + auto mvcc_query_info = std::make_unique(); + mvcc_query_info->resolve_locks = true; + mvcc_query_info->read_tso = settings.read_tso; + RegionQueryInfo info; + { + info.region_id = region_id; + info.version = region->version(); + info.conf_version = region->confVer(); + info.range_in_table = region_range->rawKeys(); + } + mvcc_query_info->regions_query_info.emplace_back(std::move(info)); + query_info.mvcc_query_info = std::move(mvcc_query_info); + + DAGPreparedSets dag_sets{}; + query_info.dag_query = std::make_unique(std::vector{}, dag_sets, std::vector{}); + + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + Names required_columns = storage->getColumns().getNamesOfPhysical(); + snapshot->pipeline.streams = storage->read(required_columns, query_info, tmt->getContext(), from_stage, settings.max_block_size, 1); + snapshot->pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); }); + return snapshot; +} + +SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path) +{ + auto snapshot_file = DM::DMFile::create(path); + Poco::File file1(path); + std::cerr << "after create dmfile, file exists: " << file1.exists() << std::endl; + uint64_t key_count = 0; + DM::DMFileBlockOutputStream dst_stream(tmt->getContext(), snapshot_file, snapshot->write_columns); + auto & src_stream = snapshot->pipeline.firstStream(); + Poco::File file2(path); + std::cerr << "after get src stream, file exists: " << file2.exists() << std::endl; + src_stream->readPrefix(); + dst_stream.writePrefix(); + Poco::File file3(path); + std::cerr << "after write prewrite, file exists: " << file3.exists() << std::endl; + while (auto block = src_stream->read()) + { + key_count += block.rows(); + dst_stream.write(block, 0); + Poco::File file4(path); + std::cerr << "after write block, file exists: " << file4.exists() << std::endl; + } + src_stream->readSuffix(); + dst_stream.writeSuffix(); + Poco::File file5(path); + std::cerr << "after write suffix, file exists: " << file5.exists() << std::endl; + Poco::File file(path); + uint64_t total_size = file.getSize(); + // if key_count is 0, file will be deleted + return {1, key_count, total_size}; +} + +bool isTiFlashSnapshot(TMTContext * tmt, const String & path) +{ + return DM::DMFile::isValidDMFileInSingleFileMode(tmt->getContext().getFileProvider(), path); +} + +void deleteTiFlashSnapshot(TiFlashSnapshot * snap) { delete snap; } + +void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap) { delete snap; } + +} \ No newline at end of file diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.h b/dbms/src/Flash/TiFlashSnapshotHandler.h new file mode 100644 index 00000000000..5d82bed000b --- /dev/null +++ b/dbms/src/Flash/TiFlashSnapshotHandler.h @@ -0,0 +1,30 @@ +#include + +#include +#include + +namespace DB +{ +class TMTContext; + +struct PreHandledTiFlashSnapshot; +struct TiFlashSnapshot; + +struct SerializeTiFlashSnapshotRes +{ + uint8_t ok; + uint64_t key_count; + uint64_t total_size; +}; +using String = std::string; + +PreHandledTiFlashSnapshot *preHandleTiFlashSnapshot(RegionPtr region, const String & path); +void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap); +TiFlashSnapshot *genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id); +SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path); +bool isTiFlashSnapshot(TMTContext * tmt, const String & path); +void deleteTiFlashSnapshot(TiFlashSnapshot * snap); +void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap); + +} + diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index e1a1bdcb060..1d33c39002b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -361,7 +361,9 @@ void DMFile::finalize(const FileProviderPtr & file_provider) void DMFile::finalize(WriteBuffer & buffer) { + std::cerr << "DMFile::finalize" << std::endl; Footer footer; + footer.magic_number = DMFile::magic_number; std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMeta(buffer); std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePack(buffer); footer.sub_file_stat_offset = buffer.count(); @@ -384,11 +386,17 @@ void DMFile::finalize(WriteBuffer & buffer) buffer.next(); if (status != Status::WRITING) throw Exception("Expected WRITING status, now " + statusString(status)); - Poco::File old_file(path()); - Poco::File old_ngc_file(ngcPath()); - status = Status::READABLE; + auto old_path = path(); + auto old_ngc_path = ngcPath(); + status = Status::READABLE; auto new_path = path(); + // If the path is same, then this is a snapshot file, no need to rename the file + if (old_path == new_path) + return; + std::cerr << "old_path " << old_path << " new_path " << new_path << std::endl; + Poco::File old_file(old_path); + Poco::File old_ngc_file(old_ngc_path); Poco::File file(new_path); if (file.exists()) file.remove(); @@ -509,14 +517,14 @@ bool DMFile::isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider if (!file.isFile()) return false; - MagicNumber magic_number; + MagicNumber number; ReadBufferFromFileProvider buf(file_provider, path, EncryptionPath(path, "")); buf.seek(file.getSize() - sizeof(MagicNumber), SEEK_SET); - DB::readIntBinary(magic_number, buf); - return magic_number == DMFile::Footer::magic_number; + DB::readIntBinary(number, buf); + return number == DMFile::magic_number; } -const DMFile::MagicNumber DMFile::Footer::magic_number = 0x13579BDF13579BDF; +const DMFile::MagicNumber DMFile::magic_number = 0x13579BDF13579BDF; } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index bba40c7a2a3..dec3ddc0b26 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -90,7 +90,7 @@ class DMFile : private boost::noncopyable UInt32 sub_file_num; DMSingleFileFormatVersion file_format_version; - static const MagicNumber magic_number; + MagicNumber magic_number; }; using PackStats = PaddedPODArray; @@ -261,6 +261,8 @@ class DMFile : private boost::noncopyable mutable std::mutex mutex; SubFileStats sub_file_stats; + static const MagicNumber magic_number; + Logger * log; friend class DMFileWriter; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h index 100075aed47..0d4ea2843b4 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockOutputStream.h @@ -18,8 +18,7 @@ class DMFileBlockOutputStream context.getSettingsRef().max_compress_block_size, // context.chooseCompressionSettings(0, 0), TODO: should enable this, and make unit testes work. CompressionSettings(CompressionMethod::LZ4), - context.getFileProvider(), - context.getSettingsRef().dt_enable_single_file_mode_dmfile) + context.getFileProvider()) { } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 66559b787a4..26dd1e545ac 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -13,24 +13,23 @@ DMFileWriter::DMFileWriter(const DMFilePtr & dmfile_, size_t min_compress_block_size_, size_t max_compress_block_size_, const CompressionSettings & compression_settings_, - const FileProviderPtr & file_provider_, - bool single_file_mode_) + const FileProviderPtr & file_provider_) : dmfile(dmfile_), write_columns(write_columns_), min_compress_block_size(min_compress_block_size_), max_compress_block_size(max_compress_block_size_), compression_settings(compression_settings_), + single_file_mode(dmfile_->isSingleFileMode()), // assume pack_stat_file is the first file created inside DMFile // it will create encryption info for the whole DMFile pack_stat_file( - single_file_mode_ + single_file_mode ? nullptr : createWriteBufferFromFileBaseByFileProvider( file_provider_, dmfile->packStatPath(), dmfile->encryptionPackStatPath(), true, 0, 0, max_compress_block_size)), single_file_stream( - !single_file_mode_ ? nullptr : new SingleFileStream(dmfile_, compression_settings_, max_compress_block_size_, file_provider_)), - file_provider(file_provider_), - single_file_mode(single_file_mode_) + !single_file_mode ? nullptr : new SingleFileStream(dmfile_, compression_settings_, max_compress_block_size_, file_provider_)), + file_provider(file_provider_) { dmfile->setStatus(DMFile::Status::WRITING); for (auto & cd : write_columns) @@ -113,10 +112,12 @@ void DMFileWriter::write(const Block & block, size_t not_clean_rows) void DMFileWriter::finalize() { + std::cerr << "finalize\n"; for (auto & cd : write_columns) { finalizeColumn(cd.id, cd.type); } + std::cerr << "after finalizeColumn\n"; if (single_file_mode) { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index e37e18e6741..8460361f988 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -95,6 +95,7 @@ class DMFileWriter void flush() { + std::cerr << "flush plain_hashing\n"; plain_hashing.next(); plain_file->next(); @@ -125,8 +126,7 @@ class DMFileWriter size_t min_compress_block_size_, size_t max_compress_block_size_, const CompressionSettings & compression_settings_, - const FileProviderPtr & file_provider_, - bool single_file_mode_ = false); + const FileProviderPtr & file_provider_); void write(const Block & block, size_t not_clean_rows); void finalize(); @@ -146,6 +146,7 @@ class DMFileWriter size_t min_compress_block_size; size_t max_compress_block_size; CompressionSettings compression_settings; + bool single_file_mode; ColumnStreams column_streams; @@ -154,7 +155,6 @@ class DMFileWriter SingleFileStreamPtr single_file_stream; FileProviderPtr file_provider; - bool single_file_mode; }; } // namespace DM diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index d60328202f4..09239a0fee3 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -1,13 +1,7 @@ #include #include #include -#include -#include -#include -#include -#include #include -#include #include #include #include @@ -201,52 +195,9 @@ void ApplyPreHandledTiKVSnapshot(TiFlashServer * server, PreHandledTiKVSnapshot } } -struct PreHandledTiFlashSnapshot -{ - ~PreHandledTiFlashSnapshot(); - RegionPtr region; - std::string path; -}; - -PreHandledTiFlashSnapshot::~PreHandledTiFlashSnapshot() -{ - std::cerr << "GC PreHandledTiFlashSnapshot success" - << "\n"; -} - void ApplyPreHandledTiFlashSnapshot(TiFlashServer * server, PreHandledTiFlashSnapshot * snap) { - std::cerr << "ApplyPreHandledTiFlashSnapshot: " << snap->region->toString() << "\n"; - auto & tmt = server->tmt; - auto & kvstore = tmt->getKVStore(); - kvstore->handleApplySnapshot(snap->region, *tmt); - - // TODO: check storage is not nullptr - auto table_id = snap->region->getMappedTableID(); - auto storage = tmt->getStorages().get(table_id); - auto dm_storage = std::dynamic_pointer_cast(storage); - - auto snapshot_file = DM::DMFile::restore(tmt->getContext().getFileProvider(), snap->path); - auto column_cache = std::make_shared(); - DM::DMFileBlockInputStream stream(server->tmt->getContext(), - DM::MAX_UINT64, - false, - 0, - snapshot_file, - dm_storage->getStore()->getTableColumns(), - // TODO: how to deal with is_common_handle - DM::RowKeyRange::newAll(false, 1), - DM::EMPTY_FILTER, - column_cache, - DM::IdSetPtr{}); - - auto settings = tmt->getContext().getSettingsRef(); - stream.readPrefix(); - while (auto block = stream.read()) - { - dm_storage->write(std::move(block), settings); - } - stream.readSuffix(); + applyPreHandledTiFlashSnapshot(server->tmt, snap); } void ApplyPreHandledSnapshot(TiFlashServer * server, void * res, RawCppPtrType type) @@ -288,10 +239,10 @@ void GcRawCppPtr(TiFlashServer *, RawCppPtr p) delete reinterpret_cast(ptr); break; case RawCppPtrType::TiFlashSnapshot: - delete reinterpret_cast(ptr); + deleteTiFlashSnapshot(reinterpret_cast(ptr)); break; case RawCppPtrType::PreHandledTiFlashSnapshot: - delete reinterpret_cast(ptr); + deletePreHandledTiFlashSnapshot(reinterpret_cast(ptr)); break; case RawCppPtrType::SplitKeys: delete reinterpret_cast(ptr); @@ -326,46 +277,8 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) // flush all data of region and persist if (!kvstore->preGenTiFlashSnapshot(header.region_id, header.index, *tmt)) return RawCppPtr(nullptr, RawCppPtrType::None); - // generate snapshot struct; - - // TODO: check RegionPtr is not nullptr - const RegionPtr region = kvstore->getRegion(header.region_id); - auto region_range = region->getRange(); - // TODO: check storage is not nullptr - auto table_id = region->getMappedTableID(); - auto storage = tmt->getStorages().get(table_id); - auto dm_storage = std::dynamic_pointer_cast(storage); - - auto * snapshot = new TiFlashSnapshot(dm_storage->getStore()->getTableColumns()); - - const Settings & settings = tmt->getContext().getSettingsRef(); - auto mvcc_query_info = std::make_unique(); - mvcc_query_info->resolve_locks = true; - mvcc_query_info->read_tso = settings.read_tso; - RegionQueryInfo info; - { - info.region_id = header.region_id; - info.version = region->version(); - info.conf_version = region->confVer(); - info.range_in_table = region_range->rawKeys(); - } - mvcc_query_info->regions_query_info.emplace_back(std::move(info)); - - SelectQueryInfo query_info; - String query_str = "SELECT * FROM " + storage->getDatabaseName() + "." + storage->getTableName(); - SQLQuerySource query_src(query_str.data(), query_str.data() + query_str.size()); - std::tie(std::ignore, query_info.query) = query_src.parse(0); - - query_info.mvcc_query_info = std::move(mvcc_query_info); - - QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; - - auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); - Names required_columns = storage->getColumns().getNamesOfPhysical(); - snapshot->pipeline.streams = storage->read(required_columns, query_info, tmt->getContext(), from_stage, settings.max_block_size, 1); - snapshot->pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); }); - return RawCppPtr(snapshot, RawCppPtrType::TiFlashSnapshot); + return RawCppPtr(genTiFlashSnapshot(tmt, header.region_id), RawCppPtrType::TiFlashSnapshot); } catch (...) { @@ -377,71 +290,18 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, TiFlashSnapshot *snapshot, BaseBuffView path) { std::string real_path(path.data, path.len); - std::cerr << "serializeInto TiFlashSnapshot into path " << real_path << "\n"; -// auto encryption_info = server->proxy_helper->newFile(real_path); -// char buffer[TiFlashSnapshot::flag.size() + 10]; -// std::memset(buffer, 0, sizeof(buffer)); -// auto file = fopen(real_path.data(), "w"); -// if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) -// { -// std::cerr << "start to write encryption data" -// << "\n"; -// BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); -// memcpy(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()); -// cipher_stream->encrypt(0, buffer, TiFlashSnapshot::flag.size()); -// fputs(buffer, file); -// } -// else -// { -// fputs(TiFlashSnapshot::flag.data(), file); -// std::cerr << "start to write data" -// << "\n"; -// } -// fclose(file); - auto snapshot_file = DM::DMFile::create(real_path); - DM::DMFileBlockOutputStream dst_stream(server->tmt->getContext(), snapshot_file, snapshot->write_columns); - auto & src_stream = snapshot->pipeline.firstStream(); - src_stream->readPrefix(); - dst_stream.writePrefix(); - while (auto block = src_stream->read()) - { - dst_stream.write(block, 0); - } - src_stream->readSuffix(); - dst_stream.writeSuffix(); - std::cerr << "finish write " << TiFlashSnapshot::flag.size() << " bytes " + std::cerr << "serialize TiFlashSnapshot into path " << real_path << "\n"; + auto res = serializeTiFlashSnapshotInto(server->tmt, snapshot, real_path); + std::cerr << "finish write " << res.total_size << " bytes " << "\n"; - // if key_count is 0, file will be deleted - return {1, 6, TiFlashSnapshot::flag.size()}; + return res; } uint8_t IsTiFlashSnapshot(TiFlashServer * server, BaseBuffView path) { std::string real_path(path.data, path.len); std::cerr << "IsTiFlashSnapshot of path " << real_path << "\n"; - bool res = false; -// char buffer[TiFlashSnapshot::flag.size() + 10]; -// std::memset(buffer, 0, sizeof(buffer)); -// auto encryption_info = server->proxy_helper->getFile(path); -// auto file = fopen(real_path.data(), "rb"); -// size_t bytes_read = 0; -// if (encryption_info.res == FileEncryptionRes::Ok && encryption_info.method != EncryptionMethod::Plaintext) -// { -// std::cerr << "try to decrypt file" -// << "\n"; -// -// BlockAccessCipherStreamPtr cipher_stream = AESCTRCipherStream::createCipherStream(encryption_info, EncryptionPath(real_path, "")); -// bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); -// cipher_stream->decrypt(0, buffer, bytes_read); -// } -// else -// { -// bytes_read = fread(buffer, 1, TiFlashSnapshot::flag.size(), file); -// } -// fclose(file); -// if (bytes_read == TiFlashSnapshot::flag.size() && memcmp(buffer, TiFlashSnapshot::flag.data(), TiFlashSnapshot::flag.size()) == 0) -// res = true; - res = DM::DMFile::isValidDMFileInSingleFileMode(server->tmt->getContext().getFileProvider(), real_path); + auto res = isTiFlashSnapshot(server->tmt, real_path); std::cerr << "start to check IsTiFlashSnapshot, res " << res << "\n"; return res; } @@ -459,7 +319,7 @@ RawCppPtr PreHandleTiFlashSnapshot( std::cerr << "PreHandleTiFlashSnapshot from path " << real_path << " region " << region.id() << " peer " << peer_id << " index " << index << " term " << term << "\n"; - return RawCppPtr(new PreHandledTiFlashSnapshot{new_region, real_path}, RawCppPtrType::PreHandledTiFlashSnapshot); + return RawCppPtr(preHandleTiFlashSnapshot(new_region, real_path), RawCppPtrType::PreHandledTiFlashSnapshot); } catch (...) { @@ -468,14 +328,6 @@ RawCppPtr PreHandleTiFlashSnapshot( } } -TiFlashSnapshot::~TiFlashSnapshot() -{ - std::cerr << "GC TiFlashSnapshot success" - << "\n"; -} - -const std::string TiFlashSnapshot::flag = "this is tiflash snapshot"; - GetRegionApproximateSizeKeysRes GetRegionApproximateSizeKeys( TiFlashServer *, uint64_t region_id, BaseBuffView start_key, BaseBuffView end_key) { diff --git a/dbms/src/Storages/Transaction/ProxyFFIType.h b/dbms/src/Storages/Transaction/ProxyFFIType.h index 7977e7d23b1..4485076b6e7 100644 --- a/dbms/src/Storages/Transaction/ProxyFFIType.h +++ b/dbms/src/Storages/Transaction/ProxyFFIType.h @@ -1,9 +1,8 @@ #pragma once -#include +#include #include #include -#include #include #include @@ -150,22 +149,6 @@ struct CppStrWithView {} }; -struct SerializeTiFlashSnapshotRes -{ - uint8_t ok; - uint64_t key_count; - uint64_t total_size; -}; - -struct TiFlashSnapshot -{ - TiFlashSnapshot(const DM::ColumnDefines & write_columns_) : write_columns{write_columns_} {} - Pipeline pipeline; - const DM::ColumnDefines & write_columns; - ~TiFlashSnapshot(); - static const std::string flag; -}; - struct GetRegionApproximateSizeKeysRes { uint8_t ok; From 387a3cf74509d67266f0e782fa39457e45aca586 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Sun, 18 Oct 2020 12:32:23 +0800 Subject: [PATCH 3/4] remove unnecessary log --- dbms/src/Flash/TiFlashSnapshotHandler.cpp | 42 +++++++++---------- dbms/src/Flash/TiFlashSnapshotHandler.h | 25 +++++++---- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 4 +- .../Storages/DeltaMerge/File/DMFileWriter.cpp | 2 - .../Storages/DeltaMerge/File/DMFileWriter.h | 1 - dbms/src/Storages/Transaction/ProxyFFI.cpp | 14 +++---- 6 files changed, 45 insertions(+), 43 deletions(-) diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.cpp b/dbms/src/Flash/TiFlashSnapshotHandler.cpp index 69b2e2829f6..4f58ef30d1e 100644 --- a/dbms/src/Flash/TiFlashSnapshotHandler.cpp +++ b/dbms/src/Flash/TiFlashSnapshotHandler.cpp @@ -44,12 +44,12 @@ TiFlashSnapshot::~TiFlashSnapshot() << "\n"; } -PreHandledTiFlashSnapshot *preHandleTiFlashSnapshot(RegionPtr region, const String & path) +PreHandledTiFlashSnapshot *TiFlashSnapshotHandler::preHandleTiFlashSnapshot(RegionPtr region, const String & path) { return new PreHandledTiFlashSnapshot{std::move(region), path}; } -void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap) +void TiFlashSnapshotHandler::applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap) { std::cerr << "applyPreHandledTiFlashSnapshot: " << snap->region->toString() << "\n"; auto & kvstore = tmt->getKVStore(); @@ -76,22 +76,28 @@ void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot auto settings = tmt->getContext().getSettingsRef(); stream.readPrefix(); while (auto block = stream.read()) - { dm_storage->write(std::move(block), settings); - } + stream.readSuffix(); } -TiFlashSnapshot *genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id) +TiFlashSnapshot *TiFlashSnapshotHandler::genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id) { auto & kvstore = tmt->getKVStore(); // generate snapshot struct; - // TODO: check RegionPtr is not nullptr const RegionPtr region = kvstore->getRegion(region_id); auto region_range = region->getRange(); - // TODO: check storage is not nullptr auto table_id = region->getMappedTableID(); auto storage = tmt->getStorages().get(table_id); + Logger * log = &Logger::get("TiFlashSnapshotHandler"); + if (storage == nullptr) + { + LOG_WARNING(log, + "genTiFlashSnapshot can not get table for region:" + region->toString() + + " with table id: " + DB::toString(table_id) + ", ignored"); + return new TiFlashSnapshot(DM::ColumnDefines{}); + } + auto dm_storage = std::dynamic_pointer_cast(storage); auto * snapshot = new TiFlashSnapshot(dm_storage->getStore()->getTableColumns()); @@ -129,44 +135,36 @@ TiFlashSnapshot *genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id) return snapshot; } -SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path) +SerializeTiFlashSnapshotRes TiFlashSnapshotHandler::serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path) { + if (snapshot->write_columns.empty()) + return {0, 0, 0}; auto snapshot_file = DM::DMFile::create(path); - Poco::File file1(path); - std::cerr << "after create dmfile, file exists: " << file1.exists() << std::endl; uint64_t key_count = 0; DM::DMFileBlockOutputStream dst_stream(tmt->getContext(), snapshot_file, snapshot->write_columns); auto & src_stream = snapshot->pipeline.firstStream(); - Poco::File file2(path); - std::cerr << "after get src stream, file exists: " << file2.exists() << std::endl; src_stream->readPrefix(); dst_stream.writePrefix(); - Poco::File file3(path); - std::cerr << "after write prewrite, file exists: " << file3.exists() << std::endl; while (auto block = src_stream->read()) { key_count += block.rows(); dst_stream.write(block, 0); - Poco::File file4(path); - std::cerr << "after write block, file exists: " << file4.exists() << std::endl; } src_stream->readSuffix(); dst_stream.writeSuffix(); - Poco::File file5(path); - std::cerr << "after write suffix, file exists: " << file5.exists() << std::endl; Poco::File file(path); uint64_t total_size = file.getSize(); // if key_count is 0, file will be deleted return {1, key_count, total_size}; } -bool isTiFlashSnapshot(TMTContext * tmt, const String & path) +bool TiFlashSnapshotHandler::isTiFlashSnapshot(TMTContext * tmt, const String & path) { return DM::DMFile::isValidDMFileInSingleFileMode(tmt->getContext().getFileProvider(), path); } -void deleteTiFlashSnapshot(TiFlashSnapshot * snap) { delete snap; } +void TiFlashSnapshotHandler::deleteTiFlashSnapshot(TiFlashSnapshot * snap) { delete snap; } -void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap) { delete snap; } +void TiFlashSnapshotHandler::deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap) { delete snap; } -} \ No newline at end of file +} diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.h b/dbms/src/Flash/TiFlashSnapshotHandler.h index 5d82bed000b..d8676ffd7e1 100644 --- a/dbms/src/Flash/TiFlashSnapshotHandler.h +++ b/dbms/src/Flash/TiFlashSnapshotHandler.h @@ -18,13 +18,22 @@ struct SerializeTiFlashSnapshotRes }; using String = std::string; -PreHandledTiFlashSnapshot *preHandleTiFlashSnapshot(RegionPtr region, const String & path); -void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap); -TiFlashSnapshot *genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id); -SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path); -bool isTiFlashSnapshot(TMTContext * tmt, const String & path); -void deleteTiFlashSnapshot(TiFlashSnapshot * snap); -void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap); +class TiFlashSnapshotHandler +{ +public: + static PreHandledTiFlashSnapshot *preHandleTiFlashSnapshot(RegionPtr region, const String & path); -} + static void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap); + + static TiFlashSnapshot *genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id); + + static SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path); + + static bool isTiFlashSnapshot(TMTContext * tmt, const String & path); + static void deleteTiFlashSnapshot(TiFlashSnapshot * snap); + + static void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap); +}; + +} diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 1d33c39002b..6588a0f023d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -361,7 +361,6 @@ void DMFile::finalize(const FileProviderPtr & file_provider) void DMFile::finalize(WriteBuffer & buffer) { - std::cerr << "DMFile::finalize" << std::endl; Footer footer; footer.magic_number = DMFile::magic_number; std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMeta(buffer); @@ -394,7 +393,6 @@ void DMFile::finalize(WriteBuffer & buffer) // If the path is same, then this is a snapshot file, no need to rename the file if (old_path == new_path) return; - std::cerr << "old_path " << old_path << " new_path " << new_path << std::endl; Poco::File old_file(old_path); Poco::File old_ngc_file(old_ngc_path); Poco::File file(new_path); @@ -524,7 +522,7 @@ bool DMFile::isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider return number == DMFile::magic_number; } -const DMFile::MagicNumber DMFile::magic_number = 0x13579BDF13579BDF; +const DMFile::MagicNumber DMFile::magic_number = 0x23579BDF48799ADE; } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 26dd1e545ac..6df8fb9a1bc 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -112,12 +112,10 @@ void DMFileWriter::write(const Block & block, size_t not_clean_rows) void DMFileWriter::finalize() { - std::cerr << "finalize\n"; for (auto & cd : write_columns) { finalizeColumn(cd.id, cd.type); } - std::cerr << "after finalizeColumn\n"; if (single_file_mode) { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index 8460361f988..31b95f5471d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -95,7 +95,6 @@ class DMFileWriter void flush() { - std::cerr << "flush plain_hashing\n"; plain_hashing.next(); plain_file->next(); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 09239a0fee3..e1f0ca402b0 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -197,7 +197,7 @@ void ApplyPreHandledTiKVSnapshot(TiFlashServer * server, PreHandledTiKVSnapshot void ApplyPreHandledTiFlashSnapshot(TiFlashServer * server, PreHandledTiFlashSnapshot * snap) { - applyPreHandledTiFlashSnapshot(server->tmt, snap); + TiFlashSnapshotHandler::applyPreHandledTiFlashSnapshot(server->tmt, snap); } void ApplyPreHandledSnapshot(TiFlashServer * server, void * res, RawCppPtrType type) @@ -239,10 +239,10 @@ void GcRawCppPtr(TiFlashServer *, RawCppPtr p) delete reinterpret_cast(ptr); break; case RawCppPtrType::TiFlashSnapshot: - deleteTiFlashSnapshot(reinterpret_cast(ptr)); + TiFlashSnapshotHandler::deleteTiFlashSnapshot(reinterpret_cast(ptr)); break; case RawCppPtrType::PreHandledTiFlashSnapshot: - deletePreHandledTiFlashSnapshot(reinterpret_cast(ptr)); + TiFlashSnapshotHandler::deletePreHandledTiFlashSnapshot(reinterpret_cast(ptr)); break; case RawCppPtrType::SplitKeys: delete reinterpret_cast(ptr); @@ -278,7 +278,7 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) if (!kvstore->preGenTiFlashSnapshot(header.region_id, header.index, *tmt)) return RawCppPtr(nullptr, RawCppPtrType::None); - return RawCppPtr(genTiFlashSnapshot(tmt, header.region_id), RawCppPtrType::TiFlashSnapshot); + return RawCppPtr(TiFlashSnapshotHandler::genTiFlashSnapshot(tmt, header.region_id), RawCppPtrType::TiFlashSnapshot); } catch (...) { @@ -291,7 +291,7 @@ SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, { std::string real_path(path.data, path.len); std::cerr << "serialize TiFlashSnapshot into path " << real_path << "\n"; - auto res = serializeTiFlashSnapshotInto(server->tmt, snapshot, real_path); + auto res = TiFlashSnapshotHandler::serializeTiFlashSnapshotInto(server->tmt, snapshot, real_path); std::cerr << "finish write " << res.total_size << " bytes " << "\n"; return res; @@ -301,7 +301,7 @@ uint8_t IsTiFlashSnapshot(TiFlashServer * server, BaseBuffView path) { std::string real_path(path.data, path.len); std::cerr << "IsTiFlashSnapshot of path " << real_path << "\n"; - auto res = isTiFlashSnapshot(server->tmt, real_path); + auto res = TiFlashSnapshotHandler::isTiFlashSnapshot(server->tmt, real_path); std::cerr << "start to check IsTiFlashSnapshot, res " << res << "\n"; return res; } @@ -319,7 +319,7 @@ RawCppPtr PreHandleTiFlashSnapshot( std::cerr << "PreHandleTiFlashSnapshot from path " << real_path << " region " << region.id() << " peer " << peer_id << " index " << index << " term " << term << "\n"; - return RawCppPtr(preHandleTiFlashSnapshot(new_region, real_path), RawCppPtrType::PreHandledTiFlashSnapshot); + return RawCppPtr(TiFlashSnapshotHandler::preHandleTiFlashSnapshot(new_region, real_path), RawCppPtrType::PreHandledTiFlashSnapshot); } catch (...) { From 16fc96b1b254f97e276d5a7dc0c466c75bec9c94 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Sun, 18 Oct 2020 12:34:47 +0800 Subject: [PATCH 4/4] format code --- dbms/src/Flash/TiFlashSnapshotHandler.cpp | 25 ++++++++++---------- dbms/src/Flash/TiFlashSnapshotHandler.h | 8 +++---- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 12 +++++----- dbms/src/Storages/DeltaMerge/File/DMFile.h | 7 ++---- dbms/src/Storages/Transaction/ProxyFFI.cpp | 6 ++--- 5 files changed, 28 insertions(+), 30 deletions(-) diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.cpp b/dbms/src/Flash/TiFlashSnapshotHandler.cpp index 4f58ef30d1e..5ba7b8b9c0d 100644 --- a/dbms/src/Flash/TiFlashSnapshotHandler.cpp +++ b/dbms/src/Flash/TiFlashSnapshotHandler.cpp @@ -1,16 +1,16 @@ +#include +#include #include #include #include +#include #include -#include -#include -#include #include #include #include -#include -#include -#include +#include +#include +#include #include @@ -44,7 +44,7 @@ TiFlashSnapshot::~TiFlashSnapshot() << "\n"; } -PreHandledTiFlashSnapshot *TiFlashSnapshotHandler::preHandleTiFlashSnapshot(RegionPtr region, const String & path) +PreHandledTiFlashSnapshot * TiFlashSnapshotHandler::preHandleTiFlashSnapshot(RegionPtr region, const String & path) { return new PreHandledTiFlashSnapshot{std::move(region), path}; } @@ -81,7 +81,7 @@ void TiFlashSnapshotHandler::applyPreHandledTiFlashSnapshot(TMTContext * tmt, Pr stream.readSuffix(); } -TiFlashSnapshot *TiFlashSnapshotHandler::genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id) +TiFlashSnapshot * TiFlashSnapshotHandler::genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id) { auto & kvstore = tmt->getKVStore(); // generate snapshot struct; @@ -93,8 +93,8 @@ TiFlashSnapshot *TiFlashSnapshotHandler::genTiFlashSnapshot(TMTContext * tmt, ui if (storage == nullptr) { LOG_WARNING(log, - "genTiFlashSnapshot can not get table for region:" + region->toString() - + " with table id: " + DB::toString(table_id) + ", ignored"); + "genTiFlashSnapshot can not get table for region:" + region->toString() + " with table id: " + DB::toString(table_id) + + ", ignored"); return new TiFlashSnapshot(DM::ColumnDefines{}); } @@ -135,7 +135,8 @@ TiFlashSnapshot *TiFlashSnapshotHandler::genTiFlashSnapshot(TMTContext * tmt, ui return snapshot; } -SerializeTiFlashSnapshotRes TiFlashSnapshotHandler::serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path) +SerializeTiFlashSnapshotRes TiFlashSnapshotHandler::serializeTiFlashSnapshotInto( + TMTContext * tmt, TiFlashSnapshot * snapshot, const String & path) { if (snapshot->write_columns.empty()) return {0, 0, 0}; @@ -167,4 +168,4 @@ void TiFlashSnapshotHandler::deleteTiFlashSnapshot(TiFlashSnapshot * snap) { del void TiFlashSnapshotHandler::deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap) { delete snap; } -} +} // namespace DB diff --git a/dbms/src/Flash/TiFlashSnapshotHandler.h b/dbms/src/Flash/TiFlashSnapshotHandler.h index d8676ffd7e1..f595c19909e 100644 --- a/dbms/src/Flash/TiFlashSnapshotHandler.h +++ b/dbms/src/Flash/TiFlashSnapshotHandler.h @@ -21,13 +21,13 @@ using String = std::string; class TiFlashSnapshotHandler { public: - static PreHandledTiFlashSnapshot *preHandleTiFlashSnapshot(RegionPtr region, const String & path); + static PreHandledTiFlashSnapshot * preHandleTiFlashSnapshot(RegionPtr region, const String & path); static void applyPreHandledTiFlashSnapshot(TMTContext * tmt, PreHandledTiFlashSnapshot * snap); - static TiFlashSnapshot *genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id); + static TiFlashSnapshot * genTiFlashSnapshot(TMTContext * tmt, uint64_t region_id); - static SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot *snapshot, const String & path); + static SerializeTiFlashSnapshotRes serializeTiFlashSnapshotInto(TMTContext * tmt, TiFlashSnapshot * snapshot, const String & path); static bool isTiFlashSnapshot(TMTContext * tmt, const String & path); @@ -36,4 +36,4 @@ class TiFlashSnapshotHandler static void deletePreHandledTiFlashSnapshot(PreHandledTiFlashSnapshot * snap); }; -} +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 6588a0f023d..dcd8b339f62 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -107,7 +107,7 @@ DMFilePtr DMFile::create(const String & path) { Logger * log = &Logger::get("DMFile"); - DMFilePtr new_dmfile(new DMFile(path, Mode::SINGLE_FILE, Status::WRITABLE, log)); + DMFilePtr new_dmfile(new DMFile(path, Mode::SINGLE_FILE, Status::WRITABLE, log)); Poco::File file(path); if (file.exists()) { @@ -362,7 +362,7 @@ void DMFile::finalize(const FileProviderPtr & file_provider) void DMFile::finalize(WriteBuffer & buffer) { Footer footer; - footer.magic_number = DMFile::magic_number; + footer.magic_number = DMFile::magic_number; std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMeta(buffer); std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePack(buffer); footer.sub_file_stat_offset = buffer.count(); @@ -386,10 +386,10 @@ void DMFile::finalize(WriteBuffer & buffer) if (status != Status::WRITING) throw Exception("Expected WRITING status, now " + statusString(status)); - auto old_path = path(); + auto old_path = path(); auto old_ngc_path = ngcPath(); - status = Status::READABLE; - auto new_path = path(); + status = Status::READABLE; + auto new_path = path(); // If the path is same, then this is a snapshot file, no need to rename the file if (old_path == new_path) return; @@ -515,7 +515,7 @@ bool DMFile::isValidDMFileInSingleFileMode(const FileProviderPtr & file_provider if (!file.isFile()) return false; - MagicNumber number; + MagicNumber number; ReadBufferFromFileProvider buf(file_provider, path, EncryptionPath(path, "")); buf.seek(file.getSize() - sizeof(MagicNumber), SEEK_SET); DB::readIntBinary(number, buf); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index dec3ddc0b26..f9748952b7d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -90,7 +90,7 @@ class DMFile : private boost::noncopyable UInt32 sub_file_num; DMSingleFileFormatVersion file_format_version; - MagicNumber magic_number; + MagicNumber magic_number; }; using PackStats = PaddedPODArray; @@ -199,10 +199,7 @@ class DMFile : private boost::noncopyable { } - DMFile(const String & path_, Mode mode_, Status status_, Logger * log_) - : file_path(path_), mode(mode_), status(status_), log(log_) - { - } + DMFile(const String & path_, Mode mode_, Status status_, Logger * log_) : file_path(path_), mode(mode_), status(status_), log(log_) {} bool isSingleFileMode() const { return mode == Mode::SINGLE_FILE; } bool isFolderMode() const { return mode == Mode::FOLDER; } diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index e1f0ca402b0..8b2e31115ee 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -287,7 +287,7 @@ RawCppPtr GenTiFlashSnapshot(TiFlashServer * server, RaftCmdHeader header) } } -SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, TiFlashSnapshot *snapshot, BaseBuffView path) +SerializeTiFlashSnapshotRes SerializeTiFlashSnapshotInto(TiFlashServer * server, TiFlashSnapshot * snapshot, BaseBuffView path) { std::string real_path(path.data, path.len); std::cerr << "serialize TiFlashSnapshot into path " << real_path << "\n"; @@ -317,8 +317,8 @@ RawCppPtr PreHandleTiFlashSnapshot( auto new_region = GenRegionPtr(std::move(region), peer_id, index, term, tmt); std::string real_path(path.data, path.len); - std::cerr << "PreHandleTiFlashSnapshot from path " << real_path << " region " << region.id() << " peer " << peer_id - << " index " << index << " term " << term << "\n"; + std::cerr << "PreHandleTiFlashSnapshot from path " << real_path << " region " << region.id() << " peer " << peer_id << " index " + << index << " term " << term << "\n"; return RawCppPtr(TiFlashSnapshotHandler::preHandleTiFlashSnapshot(new_region, real_path), RawCppPtrType::PreHandledTiFlashSnapshot); } catch (...)