Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ static bool has_file_cache_statistics(const io::FileCacheStatistics& stats) {
stats.inverted_index_bytes_read_from_remote != 0 ||
stats.inverted_index_bytes_read_from_peer != 0 ||
stats.inverted_index_local_io_timer != 0 || stats.inverted_index_remote_io_timer != 0 ||
stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0;
stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0 ||
stats.inverted_index_request_bytes != 0 || stats.inverted_index_read_bytes != 0 ||
stats.inverted_index_range_read_count != 0 ||
stats.inverted_index_serial_read_rounds != 0;
}

Status OlapScanner::_prepare_impl() {
Expand Down
169 changes: 169 additions & 0 deletions be/src/io/cache/block_file_cache_profile.cpp

Large diffs are not rendered by default.

37 changes: 36 additions & 1 deletion be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <gen_cpp/Metrics_types.h>

#include <array>
#include <atomic>
#include <cstdint>
#include <memory>
Expand Down Expand Up @@ -58,7 +59,6 @@ class FileCacheMetrics {
void register_entity();
void update_metrics_callback();

private:
std::mutex _mtx;
// use shared_ptr for concurrent
std::shared_ptr<AtomicStatistics> _statistics;
Expand All @@ -75,11 +75,20 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_peer = nullptr;
RuntimeProfile::Counter* remote_physical_read_bytes = nullptr;
RuntimeProfile::Counter* remote_physical_read_count = nullptr;
RuntimeProfile::Counter* peer_physical_read_bytes = nullptr;
RuntimeProfile::Counter* peer_physical_read_count = nullptr;
RuntimeProfile::Counter* remote_io_timer = nullptr;
RuntimeProfile::Counter* peer_io_timer = nullptr;
RuntimeProfile::Counter* remote_wait_timer = nullptr;
RuntimeProfile::Counter* write_cache_io_timer = nullptr;
RuntimeProfile::Counter* bytes_write_into_cache = nullptr;
RuntimeProfile::Counter* file_cache_blocks_total = nullptr;
RuntimeProfile::Counter* file_cache_blocks_hit = nullptr;
RuntimeProfile::Counter* file_cache_blocks_miss = nullptr;
RuntimeProfile::Counter* file_cache_blocks_skip = nullptr;
RuntimeProfile::Counter* file_cache_blocks_downloading = nullptr;
RuntimeProfile::Counter* num_skip_cache_io_total = nullptr;
RuntimeProfile::Counter* read_cache_file_directly_timer = nullptr;
RuntimeProfile::Counter* cache_get_or_set_timer = nullptr;
Expand All @@ -93,10 +102,36 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_remote = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_peer = nullptr;
RuntimeProfile::Counter* inverted_index_remote_physical_read_bytes = nullptr;
RuntimeProfile::Counter* inverted_index_remote_physical_read_count = nullptr;
RuntimeProfile::Counter* inverted_index_peer_physical_read_bytes = nullptr;
RuntimeProfile::Counter* inverted_index_peer_physical_read_count = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_write_into_cache = nullptr;
RuntimeProfile::Counter* inverted_index_file_cache_blocks_total = nullptr;
RuntimeProfile::Counter* inverted_index_file_cache_blocks_hit = nullptr;
RuntimeProfile::Counter* inverted_index_file_cache_blocks_miss = nullptr;
RuntimeProfile::Counter* inverted_index_file_cache_blocks_skip = nullptr;
RuntimeProfile::Counter* inverted_index_file_cache_blocks_downloading = nullptr;
RuntimeProfile::Counter* inverted_index_local_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_peer_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_request_bytes = nullptr;
RuntimeProfile::Counter* inverted_index_read_bytes = nullptr;
RuntimeProfile::Counter* inverted_index_range_read_count = nullptr;
RuntimeProfile::Counter* inverted_index_serial_read_rounds = nullptr;
std::array<RuntimeProfile::Counter*, SNII_SECTION_COUNT>
inverted_index_snii_section_read_bytes {};
std::array<RuntimeProfile::Counter*, SNII_SECTION_COUNT>
inverted_index_snii_section_remote_physical_read_bytes {};
std::array<RuntimeProfile::Counter*, SNII_SECTION_COUNT>
inverted_index_snii_section_bytes_write_into_cache {};
std::array<RuntimeProfile::Counter*, SNII_SECTION_COUNT>
inverted_index_snii_section_file_cache_blocks_total {};
std::array<RuntimeProfile::Counter*, SNII_SECTION_COUNT>
inverted_index_snii_section_file_cache_blocks_hit {};
std::array<RuntimeProfile::Counter*, SNII_SECTION_COUNT>
inverted_index_snii_section_file_cache_blocks_miss {};

FileCacheProfileReporter(RuntimeProfile* profile);
void update(const FileCacheStatistics* statistics) const;
Expand Down
81 changes: 76 additions & 5 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>
s3_read_counter << 1;
SCOPED_RAW_TIMER(&stats.remote_read_timer);
stats.from_peer_cache = false;
return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx);
auto st = remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx);
if (st.ok()) {
++stats.remote_physical_read_count;
stats.remote_physical_read_bytes += size;
}
return st;
}

// Get peer connection info from tablet_id
Expand Down Expand Up @@ -220,6 +225,9 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t
if (!st.ok()) {
VLOG_DEBUG << "PeerFileCacheReader read from peer failed"
<< ", host=" << host << ", port=" << port << ", error=" << st.msg();
} else {
++stats.peer_physical_read_count;
stats.peer_physical_read_bytes += size;
}
stats.from_peer_cache = true;
return st;
Expand Down Expand Up @@ -322,15 +330,34 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
if (!io_ctx->is_warmup) {
// update stats increment in this reading procedure for file cache metrics
FileCacheStatistics fcache_stats_increment;
_update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index);
_update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index,
io_ctx->snii_section_type);
io::FileCacheMetrics::instance().update(&fcache_stats_increment);
}
if (io_ctx->file_cache_stats) {
// update stats in io_ctx, for query profile
_update_stats(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
_update_stats(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index,
io_ctx->snii_section_type);
}
};
std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));
if (!io_ctx->read_file_cache) {
SCOPED_RAW_TIMER(&stats.remote_read_timer);
size_t direct_bytes_read = 0;
RETURN_IF_ERROR(_remote_file_reader->read_at(offset, result, &direct_bytes_read, io_ctx));
if (direct_bytes_read != bytes_req) {
return Status::IOError("short remote read at offset {}, expect {}, got {}", offset,
bytes_req, direct_bytes_read);
}
*bytes_read = direct_bytes_read;
stats.hit_cache = false;
stats.skip_cache = true;
stats.bytes_read_from_remote += direct_bytes_read;
++stats.remote_physical_read_count;
stats.remote_physical_read_bytes += direct_bytes_read;
read_success = true;
return Status::OK();
}
if (_is_doris_table && config::enable_read_cache_file_directly) {
// read directly
SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
Expand Down Expand Up @@ -363,6 +390,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}
stats.bytes_read_from_local += reserve_bytes;
}
++stats.file_cache_blocks_total;
++stats.file_cache_blocks_hit;
_cache->add_need_update_lru_block(iter->second);
need_read_size -= reserve_bytes;
cur_offset += reserve_bytes;
Expand Down Expand Up @@ -403,6 +432,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
for (auto& block : holder.file_blocks) {
switch (block->state()) {
case FileBlock::State::EMPTY:
++stats.file_cache_blocks_total;
++stats.file_cache_blocks_miss;
VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}",
path().native(), _cache_hash.to_string(), _cache_hash.high(),
_cache_hash.low(), block->offset(), block->get_cache_file());
Expand All @@ -414,6 +445,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
stats.hit_cache = false;
break;
case FileBlock::State::SKIP_CACHE:
++stats.file_cache_blocks_total;
++stats.file_cache_blocks_skip;
VLOG_DEBUG << fmt::format(
"Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}",
path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(),
Expand All @@ -423,9 +456,13 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
stats.skip_cache = true;
break;
case FileBlock::State::DOWNLOADING:
++stats.file_cache_blocks_total;
++stats.file_cache_blocks_downloading;
stats.hit_cache = false;
break;
case FileBlock::State::DOWNLOADED:
++stats.file_cache_blocks_total;
++stats.file_cache_blocks_hit;
_insert_file_reader(block);
break;
}
Expand Down Expand Up @@ -608,8 +645,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}

void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
FileCacheStatistics* statis,
bool is_inverted_index) const {
FileCacheStatistics* statis, bool is_inverted_index,
uint8_t snii_section_type) const {
if (statis == nullptr) {
return;
}
Expand All @@ -627,10 +664,19 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
statis->bytes_read_from_peer += read_stats.bytes_read_from_peer;
statis->peer_io_timer += read_stats.peer_read_timer;
}
statis->remote_physical_read_count += read_stats.remote_physical_read_count;
statis->remote_physical_read_bytes += read_stats.remote_physical_read_bytes;
statis->peer_physical_read_count += read_stats.peer_physical_read_count;
statis->peer_physical_read_bytes += read_stats.peer_physical_read_bytes;
statis->remote_wait_timer += read_stats.remote_wait_timer;
statis->local_io_timer += read_stats.local_read_timer;
statis->num_skip_cache_io_total += read_stats.skip_cache;
statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache;
statis->file_cache_blocks_total += read_stats.file_cache_blocks_total;
statis->file_cache_blocks_hit += read_stats.file_cache_blocks_hit;
statis->file_cache_blocks_miss += read_stats.file_cache_blocks_miss;
statis->file_cache_blocks_skip += read_stats.file_cache_blocks_skip;
statis->file_cache_blocks_downloading += read_stats.file_cache_blocks_downloading;
statis->write_cache_io_timer += read_stats.local_write_timer;

statis->read_cache_file_directly_timer += read_stats.read_cache_file_directly_timer;
Expand All @@ -654,7 +700,32 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read_from_peer;
statis->inverted_index_peer_io_timer += read_stats.peer_read_timer;
}
statis->inverted_index_remote_physical_read_count += read_stats.remote_physical_read_count;
statis->inverted_index_remote_physical_read_bytes += read_stats.remote_physical_read_bytes;
statis->inverted_index_peer_physical_read_count += read_stats.peer_physical_read_count;
statis->inverted_index_peer_physical_read_bytes += read_stats.peer_physical_read_bytes;
statis->inverted_index_bytes_write_into_cache += read_stats.bytes_write_into_file_cache;
statis->inverted_index_file_cache_blocks_total += read_stats.file_cache_blocks_total;
statis->inverted_index_file_cache_blocks_hit += read_stats.file_cache_blocks_hit;
statis->inverted_index_file_cache_blocks_miss += read_stats.file_cache_blocks_miss;
statis->inverted_index_file_cache_blocks_skip += read_stats.file_cache_blocks_skip;
statis->inverted_index_file_cache_blocks_downloading +=
read_stats.file_cache_blocks_downloading;
statis->inverted_index_local_io_timer += read_stats.local_read_timer;
if (snii_section_type < SNII_SECTION_COUNT) {
statis->inverted_index_snii_section_read_bytes[snii_section_type] +=
read_stats.bytes_read;
statis->inverted_index_snii_section_remote_physical_read_bytes[snii_section_type] +=
read_stats.remote_physical_read_bytes;
statis->inverted_index_snii_section_bytes_write_into_cache[snii_section_type] +=
read_stats.bytes_write_into_file_cache;
statis->inverted_index_snii_section_file_cache_blocks_total[snii_section_type] +=
read_stats.file_cache_blocks_total;
statis->inverted_index_snii_section_file_cache_blocks_hit[snii_section_type] +=
read_stats.file_cache_blocks_hit;
statis->inverted_index_snii_section_file_cache_blocks_miss[snii_section_type] +=
read_stats.file_cache_blocks_miss;
}
}

g_skip_cache_sum << read_stats.skip_cache;
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class CachedRemoteFileReader final : public FileReader,
ReadStatistics& stats, const IOContext* io_ctx);

void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
bool is_inverted_index, uint8_t snii_section_type) const;

bool _is_doris_table = false;
int64_t _tablet_id = -1;
Expand Down
9 changes: 9 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ struct ReadStatistics {
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
int64_t bytes_read_from_peer = 0;
int64_t remote_physical_read_count = 0;
int64_t remote_physical_read_bytes = 0;
int64_t peer_physical_read_count = 0;
int64_t peer_physical_read_bytes = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t file_cache_blocks_total = 0;
int64_t file_cache_blocks_hit = 0;
int64_t file_cache_blocks_miss = 0;
int64_t file_cache_blocks_skip = 0;
int64_t file_cache_blocks_downloading = 0;
int64_t remote_read_timer = 0;
int64_t peer_read_timer = 0;
int64_t remote_wait_timer = 0; // wait for other downloader
Expand Down
47 changes: 47 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

#include <gen_cpp/Types_types.h>

#include <array>
#include <cstddef>
#include <cstdint>

namespace doris {

enum class ReaderType : uint8_t {
Expand All @@ -37,6 +41,17 @@ enum class ReaderType : uint8_t {

namespace io {

enum SniiSectionType : uint8_t {
SNII_SECTION_UNKNOWN = 0,
SNII_SECTION_META = 1,
SNII_SECTION_DICT = 2,
SNII_SECTION_POSTING = 3,
SNII_SECTION_BSBF = 4,
SNII_SECTION_NORMS = 5,
SNII_SECTION_NULL_BITMAP = 6,
SNII_SECTION_COUNT = 7,
};

struct FileReaderStats {
size_t read_calls = 0;
size_t read_bytes = 0;
Expand All @@ -52,11 +67,20 @@ struct FileCacheStatistics {
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
int64_t bytes_read_from_peer = 0;
int64_t remote_physical_read_count = 0;
int64_t remote_physical_read_bytes = 0;
int64_t peer_physical_read_count = 0;
int64_t peer_physical_read_bytes = 0;
int64_t remote_io_timer = 0;
int64_t peer_io_timer = 0;
int64_t remote_wait_timer = 0;
int64_t write_cache_io_timer = 0;
int64_t bytes_write_into_cache = 0;
int64_t file_cache_blocks_total = 0;
int64_t file_cache_blocks_hit = 0;
int64_t file_cache_blocks_miss = 0;
int64_t file_cache_blocks_skip = 0;
int64_t file_cache_blocks_downloading = 0;
int64_t num_skip_cache_io_total = 0;
int64_t read_cache_file_directly_timer = 0;
int64_t cache_get_or_set_timer = 0;
Expand All @@ -70,10 +94,32 @@ struct FileCacheStatistics {
int64_t inverted_index_bytes_read_from_local = 0;
int64_t inverted_index_bytes_read_from_remote = 0;
int64_t inverted_index_bytes_read_from_peer = 0;
int64_t inverted_index_remote_physical_read_count = 0;
int64_t inverted_index_remote_physical_read_bytes = 0;
int64_t inverted_index_peer_physical_read_count = 0;
int64_t inverted_index_peer_physical_read_bytes = 0;
int64_t inverted_index_bytes_write_into_cache = 0;
int64_t inverted_index_file_cache_blocks_total = 0;
int64_t inverted_index_file_cache_blocks_hit = 0;
int64_t inverted_index_file_cache_blocks_miss = 0;
int64_t inverted_index_file_cache_blocks_skip = 0;
int64_t inverted_index_file_cache_blocks_downloading = 0;
int64_t inverted_index_local_io_timer = 0;
int64_t inverted_index_remote_io_timer = 0;
int64_t inverted_index_peer_io_timer = 0;
int64_t inverted_index_io_timer = 0;
int64_t inverted_index_request_bytes = 0;
int64_t inverted_index_read_bytes = 0;
int64_t inverted_index_range_read_count = 0;
int64_t inverted_index_serial_read_rounds = 0;

std::array<int64_t, SNII_SECTION_COUNT> inverted_index_snii_section_read_bytes {};
std::array<int64_t, SNII_SECTION_COUNT>
inverted_index_snii_section_remote_physical_read_bytes {};
std::array<int64_t, SNII_SECTION_COUNT> inverted_index_snii_section_bytes_write_into_cache {};
std::array<int64_t, SNII_SECTION_COUNT> inverted_index_snii_section_file_cache_blocks_total {};
std::array<int64_t, SNII_SECTION_COUNT> inverted_index_snii_section_file_cache_blocks_hit {};
std::array<int64_t, SNII_SECTION_COUNT> inverted_index_snii_section_file_cache_blocks_miss {};
};

struct IOContext {
Expand All @@ -91,6 +137,7 @@ struct IOContext {
FileCacheStatistics* file_cache_stats = nullptr; // Ref
FileReaderStats* file_reader_stats = nullptr; // Ref
bool is_inverted_index = false;
uint8_t snii_section_type = SNII_SECTION_UNKNOWN;
// if is_dryrun, read IO will download data to cache but return no data to reader
// useful to skip cache data read from local disk to accelarate warm up
bool is_dryrun = false;
Expand Down
Loading
Loading