From 1b32f3ddc86b3ad11f3d96f11f5d8eea084ec6d0 Mon Sep 17 00:00:00 2001 From: Innocent Date: Sat, 6 Jun 2026 14:08:27 -0700 Subject: [PATCH] feat:metrics reporting integration --- .gitignore | 3 + .../catalog/memory/in_memory_catalog.cc | 22 +- .../catalog/memory/in_memory_catalog.h | 4 + src/iceberg/catalog/rest/CMakeLists.txt | 1 + src/iceberg/catalog/rest/catalog_properties.h | 6 + src/iceberg/catalog/rest/meson.build | 1 + src/iceberg/catalog/rest/rest_catalog.cc | 50 +- src/iceberg/catalog/rest/rest_catalog.h | 18 +- .../catalog/rest/rest_metrics_reporter.cc | 69 +++ .../catalog/rest/rest_metrics_reporter.h | 62 +++ src/iceberg/catalog/sql/sql_catalog.cc | 23 +- src/iceberg/catalog/sql/sql_catalog.h | 5 + src/iceberg/delete_file_index.cc | 16 + src/iceberg/delete_file_index.h | 8 + src/iceberg/manifest/manifest_group.cc | 54 +++ src/iceberg/manifest/manifest_group.h | 6 + src/iceberg/manifest/manifest_reader.cc | 8 + src/iceberg/manifest/manifest_reader.h | 9 + .../manifest/manifest_reader_internal.h | 3 + src/iceberg/table.cc | 36 +- src/iceberg/table.h | 29 +- src/iceberg/table_scan.cc | 59 ++- src/iceberg/table_scan.h | 14 + src/iceberg/test/CMakeLists.txt | 2 + src/iceberg/test/fast_append_test.cc | 153 ++++++ .../test/rest_metrics_reporter_test.cc | 74 +++ .../test/scan_planning_metrics_test.cc | 451 ++++++++++++++++++ src/iceberg/transaction.cc | 40 +- src/iceberg/update/snapshot_update.h | 14 + 29 files changed, 1195 insertions(+), 45 deletions(-) create mode 100644 src/iceberg/catalog/rest/rest_metrics_reporter.cc create mode 100644 src/iceberg/catalog/rest/rest_metrics_reporter.h create mode 100644 src/iceberg/test/rest_metrics_reporter_test.cc create mode 100644 src/iceberg/test/scan_planning_metrics_test.cc diff --git a/.gitignore b/.gitignore index 458f876f7..6fb9e41ed 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,9 @@ # under the License. 
build/ +build_bundle/ +build_core/ +builddir/ cmake-build/ cmake-build-debug/ cmake-build-release/ diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index ede316add..686d258db 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -22,6 +22,7 @@ #include #include +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" @@ -351,7 +352,15 @@ InMemoryCatalog::InMemoryCatalog( properties_(std::move(properties)), file_io_(std::move(file_io)), warehouse_location_(std::move(warehouse_location)), - root_namespace_(std::make_unique()) {} + root_namespace_(std::make_unique()) { + auto it = properties_.find(std::string(kMetricsReporterImpl)); + if (it != properties_.end() && !it->second.empty() && + it->second != kMetricsReporterTypeNoop) { + if (auto r = MetricsReporters::Load(properties_); r.has_value()) { + reporter_ = std::shared_ptr(std::move(r.value())); + } + } +} InMemoryCatalog::~InMemoryCatalog() = default; @@ -428,7 +437,8 @@ Result> InMemoryCatalog::CreateTable( ICEBERG_RETURN_UNEXPECTED( root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location)); return Table::Make(identifier, std::move(table_metadata), - std::move(metadata_file_location), file_io_, shared_from_this()); + std::move(metadata_file_location), file_io_, shared_from_this(), + reporter_); } Result> InMemoryCatalog::UpdateTable( @@ -479,7 +489,7 @@ Result> InMemoryCatalog::UpdateTable( TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated); return Table::Make(identifier, std::move(updated), std::move(new_metadata_location), - file_io_, shared_from_this()); + file_io_, shared_from_this(), reporter_); } Result> InMemoryCatalog::StageCreateTable( @@ -500,7 +510,7 @@ Result> InMemoryCatalog::StageCreateTable( TableMetadata::Make(*schema, *spec, 
*order, base_location, properties)); ICEBERG_ASSIGN_OR_RAISE( auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_, - shared_from_this())); + shared_from_this(), reporter_)); return Transaction::Make(std::move(table), TransactionKind::kCreate); } @@ -537,7 +547,7 @@ Result> InMemoryCatalog::LoadTable( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location)); return Table::Make(identifier, std::move(metadata), std::move(metadata_location), - file_io_, shared_from_this()); + file_io_, shared_from_this(), reporter_); } Result> InMemoryCatalog::RegisterTable( @@ -557,7 +567,7 @@ Result> InMemoryCatalog::RegisterTable( return UnknownError("The registry failed."); } return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } } // namespace iceberg diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index 22a596c10..8a6cce417 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -19,12 +19,15 @@ #pragma once +#include #include #include "iceberg/catalog.h" namespace iceberg { +class MetricsReporter; + /** * @brief An in-memory implementation of the Iceberg Catalog interface. 
* @@ -106,6 +109,7 @@ class ICEBERG_EXPORT InMemoryCatalog std::string warehouse_location_; std::unique_ptr root_namespace_; mutable std::shared_mutex mutex_; + std::shared_ptr reporter_; }; } // namespace iceberg diff --git a/src/iceberg/catalog/rest/CMakeLists.txt b/src/iceberg/catalog/rest/CMakeLists.txt index 8fb2e93c0..5afe34885 100644 --- a/src/iceberg/catalog/rest/CMakeLists.txt +++ b/src/iceberg/catalog/rest/CMakeLists.txt @@ -32,6 +32,7 @@ set(ICEBERG_REST_SOURCES resource_paths.cc rest_catalog.cc rest_file_io.cc + rest_metrics_reporter.cc rest_util.cc types.cc) diff --git a/src/iceberg/catalog/rest/catalog_properties.h b/src/iceberg/catalog/rest/catalog_properties.h index 0515926c7..4711d2d13 100644 --- a/src/iceberg/catalog/rest/catalog_properties.h +++ b/src/iceberg/catalog/rest/catalog_properties.h @@ -55,6 +55,12 @@ class ICEBERG_REST_EXPORT RestCatalogProperties inline static Entry kNamespaceSeparator{"namespace-separator", "%1F"}; /// \brief The snapshot loading mode (ALL or REFS). inline static Entry kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"}; + /// \brief Whether to report metrics to the REST catalog server (default: true). + /// + /// When true and the server advertises the ReportMetrics endpoint, RestCatalog + /// automatically POSTs scan and commit reports to the per-table metrics endpoint. + inline static Entry kMetricsReportingEnabled{ + "rest-metrics-reporting-enabled", "true"}; /// \brief The prefix for HTTP headers. 
inline static constexpr std::string_view kHeaderPrefix = "header."; diff --git a/src/iceberg/catalog/rest/meson.build b/src/iceberg/catalog/rest/meson.build index f3eae6d45..82b3b30a4 100644 --- a/src/iceberg/catalog/rest/meson.build +++ b/src/iceberg/catalog/rest/meson.build @@ -30,6 +30,7 @@ iceberg_rest_sources = files( 'resource_paths.cc', 'rest_catalog.cc', 'rest_file_io.cc', + 'rest_metrics_reporter.cc', 'rest_util.cc', 'types.cc', ) diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index f04f5fb55..d5567da14 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -36,9 +36,11 @@ #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/resource_paths.h" #include "iceberg/catalog/rest/rest_file_io.h" +#include "iceberg/catalog/rest/rest_metrics_reporter.h" #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/catalog/rest/types.h" #include "iceberg/json_serde_internal.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -48,6 +50,7 @@ #include "iceberg/table_update.h" #include "iceberg/transaction.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" namespace iceberg::rest { @@ -171,7 +174,7 @@ Result> RestCatalog::Make( // Get snapshot loading mode ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode()); - auto client = std::make_unique(final_config.ExtractHeaders()); + auto client = std::make_shared(final_config.ExtractHeaders()); ICEBERG_ASSIGN_OR_RAISE(auto catalog_session, auth_manager->CatalogSession(*client, final_config.configs())); @@ -185,7 +188,7 @@ Result> RestCatalog::Make( } RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, - std::unique_ptr client, + std::shared_ptr client, std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr 
auth_manager, @@ -201,10 +204,33 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr f catalog_session_(std::move(catalog_session)), snapshot_mode_(snapshot_mode) { ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null"); + const auto& props = config_.configs(); + auto it = props.find(std::string(kMetricsReporterImpl)); + if (it != props.end() && !it->second.empty() && + it->second != kMetricsReporterTypeNoop) { + if (auto r = MetricsReporters::Load(props); r.has_value()) { + reporter_ = std::shared_ptr(std::move(r.value())); + } + } } std::string_view RestCatalog::name() const { return name_; } +std::shared_ptr RestCatalog::MakeTableReporter( + const TableIdentifier& identifier) const { + auto enabled = config_.Get(RestCatalogProperties::kMetricsReportingEnabled); + if (StringUtils::ToLower(enabled) == "true" && + supported_endpoints_.contains(Endpoint::ReportMetrics())) { + auto path = paths_->Metrics(identifier); + if (path.has_value()) { + auto rest_reporter = + std::make_shared(client_, *path, catalog_session_); + return MetricsReporters::Combine(reporter_, rest_reporter); + } + } + return reporter_; +} + Result> RestCatalog::ListNamespaces(const Namespace& ns) const { ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces()); @@ -367,7 +393,8 @@ Result> RestCatalog::CreateTable( CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/false)); return Table::Make(identifier, std::move(result.metadata), - std::move(result.metadata_location), file_io_, shared_from_this()); + std::move(result.metadata_location), file_io_, shared_from_this(), + MakeTableReporter(identifier)); } Result> RestCatalog::UpdateTable( @@ -398,7 +425,7 @@ Result> RestCatalog::UpdateTable( return Table::Make(identifier, std::move(commit_response.metadata), std::move(commit_response.metadata_location), file_io_, - shared_from_this()); + 
shared_from_this(), MakeTableReporter(identifier)); } Result> RestCatalog::StageCreateTable( @@ -409,10 +436,11 @@ Result> RestCatalog::StageCreateTable( ICEBERG_ASSIGN_OR_RAISE(auto result, CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/true)); - ICEBERG_ASSIGN_OR_RAISE(auto staged_table, - StagedTable::Make(identifier, std::move(result.metadata), - std::move(result.metadata_location), file_io_, - shared_from_this())); + ICEBERG_ASSIGN_OR_RAISE( + auto staged_table, + StagedTable::Make(identifier, std::move(result.metadata), + std::move(result.metadata_location), file_io_, shared_from_this(), + MakeTableReporter(identifier))); return Transaction::Make(std::move(staged_table), TransactionKind::kCreate); } @@ -480,9 +508,11 @@ Result> RestCatalog::LoadTable(const TableIdentifier& ide ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); /// FIXME: support per-table FileIO creation + /// FIXME: support per-table auth session (currently uses catalog-level + /// catalog_session_) return Table::Make(identifier, std::move(load_result.metadata), std::move(load_result.metadata_location), file_io_, - shared_from_this()); + shared_from_this(), MakeTableReporter(identifier)); } Result> RestCatalog::RegisterTable( @@ -505,7 +535,7 @@ Result> RestCatalog::RegisterTable( ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); return Table::Make(identifier, std::move(load_result.metadata), std::move(load_result.metadata_location), file_io_, - shared_from_this()); + shared_from_this(), MakeTableReporter(identifier)); } } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 4fd4db5b8..57dda83ca 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -30,6 +30,10 @@ #include "iceberg/catalog/rest/type_fwd.h" #include 
"iceberg/result.h" +namespace iceberg { +class MetricsReporter; +} // namespace iceberg + /// \file iceberg/catalog/rest/rest_catalog.h /// RestCatalog implementation for Iceberg REST API. @@ -101,7 +105,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, private: RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, - std::unique_ptr client, std::unique_ptr paths, + std::shared_ptr client, std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr auth_manager, std::shared_ptr catalog_session, @@ -109,6 +113,15 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, Result LoadTableInternal(const TableIdentifier& identifier) const; + /// \brief Build the per-table metrics reporter. + /// + /// When rest-metrics-reporting-enabled is true and the server advertises the + /// ReportMetrics endpoint, returns a CompositeMetricsReporter combining configured + /// reporter with a RestMetricsReporter targeting this table. Otherwise returns the + /// configured reporter. 
+ std::shared_ptr MakeTableReporter( + const TableIdentifier& identifier) const; + Result CreateTableInternal( const TableIdentifier& identifier, const std::shared_ptr& schema, const std::shared_ptr& spec, const std::shared_ptr& order, @@ -117,13 +130,14 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, RestCatalogProperties config_; std::shared_ptr file_io_; - std::unique_ptr client_; + std::shared_ptr client_; std::unique_ptr paths_; std::string name_; std::unordered_set supported_endpoints_; std::unique_ptr auth_manager_; std::shared_ptr catalog_session_; SnapshotMode snapshot_mode_; + std::shared_ptr reporter_; }; } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_metrics_reporter.cc b/src/iceberg/catalog/rest/rest_metrics_reporter.cc new file mode 100644 index 000000000..bd70c823c --- /dev/null +++ b/src/iceberg/catalog/rest/rest_metrics_reporter.cc @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/catalog/rest/rest_metrics_reporter.h" + +#include +#include + +#include + +#include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/error_handlers.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/metrics/json_serde_internal.h" +#include "iceberg/metrics/metrics_reporter.h" + +namespace iceberg::rest { + +namespace { + +constexpr std::string_view kReportType = "report-type"; +constexpr std::string_view kScanReportType = "scan-report"; +constexpr std::string_view kCommitReportType = "commit-report"; + +} // namespace + +RestMetricsReporter::RestMetricsReporter(std::shared_ptr client, + std::string metrics_endpoint, + std::shared_ptr session) + : client_(std::move(client)), + metrics_endpoint_(std::move(metrics_endpoint)), + session_(std::move(session)) {} + +Status RestMetricsReporter::Report(const MetricsReport& report) { + // Serialize the report variant to JSON. + Result json_result = std::visit( + [](const auto& r) -> Result { return ToJson(r); }, report); + if (!json_result) { + return {}; + } + + // Inject "report-type" required by the REST spec (not included in core ToJson). + auto& json = json_result.value(); + json[kReportType] = + std::holds_alternative(report) ? kScanReportType : kCommitReportType; + + // POST to the metrics endpoint; suppress errors to match Java fire-and-forget behavior. + std::ignore = client_->Post(metrics_endpoint_, json.dump(), /*headers=*/{}, + *DefaultErrorHandler::Instance(), *session_); + return {}; +} + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_metrics_reporter.h b/src/iceberg/catalog/rest/rest_metrics_reporter.h new file mode 100644 index 000000000..9efe46720 --- /dev/null +++ b/src/iceberg/catalog/rest/rest_metrics_reporter.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/metrics/metrics_reporter.h" + +/// \file iceberg/catalog/rest/rest_metrics_reporter.h +/// \brief MetricsReporter that POSTs reports to the Iceberg REST metrics endpoint. + +namespace iceberg::rest { + +/// \brief Reports scan and commit metrics to the Iceberg REST catalog metrics endpoint. +/// +/// This is the default metrics reporter wired automatically by RestCatalog for each +/// table, mirroring Java's RESTMetricsReporter. It POSTs the serialized report to +/// POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics. +/// This C++ implementation calls HttpClient::Post() synchronously. +/// A future improvement would be to introduce a thread pool. +/// +/// This implementation uses the catalog-level AuthSession. Per-table auth is a future +/// improvement consistent with the existing FIXME for per-table FileIO in +/// RestCatalog::LoadTable. +class ICEBERG_REST_EXPORT RestMetricsReporter : public MetricsReporter { + public: + /// \param client Shared ownership of the HTTP client; must not be null. + /// \param metrics_endpoint Pre-built path from ResourcePaths::Metrics(). 
+ /// \param session Auth session used to authenticate the POST request. + RestMetricsReporter(std::shared_ptr client, std::string metrics_endpoint, + std::shared_ptr session); + + /// \brief POST the report to the metrics endpoint, suppressing all errors. + Status Report(const MetricsReport& report) override; + + private: + std::shared_ptr client_; + std::string metrics_endpoint_; + std::shared_ptr session_; +}; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/sql/sql_catalog.cc b/src/iceberg/catalog/sql/sql_catalog.cc index eb066bacb..0c1638a22 100644 --- a/src/iceberg/catalog/sql/sql_catalog.cc +++ b/src/iceberg/catalog/sql/sql_catalog.cc @@ -24,6 +24,7 @@ #include #include "iceberg/catalog/sql/config.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" @@ -128,9 +129,15 @@ Result ResolveTableLocation( SqlCatalog::SqlCatalog(SqlCatalogConfig config, std::shared_ptr file_io, std::shared_ptr store) - : config_(std::move(config)), - file_io_(std::move(file_io)), - store_(std::move(store)) {} + : config_(std::move(config)), file_io_(std::move(file_io)), store_(std::move(store)) { + auto it = config_.props.find(std::string(kMetricsReporterImpl)); + if (it != config_.props.end() && !it->second.empty() && + it->second != kMetricsReporterTypeNoop) { + if (auto r = MetricsReporters::Load(config_.props); r.has_value()) { + reporter_ = std::shared_ptr(std::move(r.value())); + } + } +} SqlCatalog::~SqlCatalog() = default; @@ -371,7 +378,7 @@ Result> SqlCatalog::LoadTableFrom( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location)); return Table::Make(identifier, std::move(metadata), metadata_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> SqlCatalog::LoadTable(const TableIdentifier& identifier) { @@ -409,7 +416,7 @@ Result> SqlCatalog::CreateTable( store_->InsertTable(ns_str, 
identifier.name, metadata_location)); return Table::Make(identifier, std::move(metadata), metadata_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> SqlCatalog::UpdateTable( @@ -474,7 +481,7 @@ Result> SqlCatalog::UpdateTable( } return Table::Make(identifier, std::move(updated), new_metadata_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> SqlCatalog::StageCreateTable( @@ -501,7 +508,7 @@ Result> SqlCatalog::StageCreateTable( base_location, properties)); ICEBERG_ASSIGN_OR_RAISE(auto table, StagedTable::Make(identifier, std::move(metadata), "", file_io_, - shared_from_this())); + shared_from_this(), reporter_)); return Transaction::Make(std::move(table), TransactionKind::kCreate); } @@ -567,7 +574,7 @@ Result> SqlCatalog::RegisterTable( store_->InsertTable(ns_str, identifier.name, metadata_file_location)); return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } // -------------------------------------------------------------------------- diff --git a/src/iceberg/catalog/sql/sql_catalog.h b/src/iceberg/catalog/sql/sql_catalog.h index 35ef107b1..6146af1ad 100644 --- a/src/iceberg/catalog/sql/sql_catalog.h +++ b/src/iceberg/catalog/sql/sql_catalog.h @@ -40,6 +40,10 @@ #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" +namespace iceberg { +class MetricsReporter; +} // namespace iceberg + namespace iceberg::sql { /// \brief Default maximum number of database connections for built-in SQL stores. 
@@ -184,6 +188,7 @@ class ICEBERG_SQL_CATALOG_EXPORT SqlCatalog SqlCatalogConfig config_; std::shared_ptr file_io_; std::shared_ptr store_; + std::shared_ptr reporter_; }; } // namespace iceberg::sql diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc index 7c8c35032..2b226618e 100644 --- a/src/iceberg/delete_file_index.cc +++ b/src/iceberg/delete_file_index.cc @@ -33,6 +33,7 @@ #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" #include "iceberg/metadata_columns.h" +#include "iceberg/metrics/scan_report.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/util/checked_cast.h" @@ -528,6 +529,12 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() { return *this; } +DeleteFileIndex::Builder& DeleteFileIndex::Builder::ScanMetrics( + class iceberg::ScanMetrics* scan_metrics) { + scan_metrics_ = scan_metrics; + return *this; +} + Result> DeleteFileIndex::Builder::LoadDeleteFiles() { // Build expression caches per spec ID std::unordered_map> part_expr_cache; @@ -542,6 +549,7 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { continue; } if (!manifest.has_added_files() && !manifest.has_existing_files()) { + if (scan_metrics_) scan_metrics_->skipped_delete_manifests->Increment(1); continue; } @@ -581,10 +589,13 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) { ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest)); if (!should_match) { + if (scan_metrics_) scan_metrics_->skipped_delete_manifests->Increment(1); continue; // Manifest doesn't match filter } } + if (scan_metrics_) scan_metrics_->scanned_delete_manifests->Increment(1); + // Read manifest entries ICEBERG_ASSIGN_OR_RAISE(auto reader, ManifestReader::Make(manifest, io_, schema_, spec)); @@ -605,6 +616,9 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { 
reader->FilterPartitions(partition_set_); } reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); + if (scan_metrics_) { + reader->SkipCounter(scan_metrics_->skipped_delete_files); + } ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); files.reserve(files.size() + entries.size()); @@ -624,6 +638,8 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { file.equality_ids.end()); ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); files.emplace_back(std::move(entry)); + } else { + if (scan_metrics_) scan_metrics_->skipped_delete_files->Increment(1); } } } diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h index 5444281a0..70cdfcc5a 100644 --- a/src/iceberg/delete_file_index.h +++ b/src/iceberg/delete_file_index.h @@ -39,6 +39,8 @@ namespace iceberg { +class ScanMetrics; + namespace internal { /// \brief Wrapper for equality delete files that caches converted bounds. @@ -356,6 +358,11 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { /// \brief Ignore residual expressions after partition filtering. Builder& IgnoreResiduals(); + /// \brief Attach scan metrics for counting scanned/skipped delete manifests. + /// + /// Non-owning pointer; the pointed-to ScanMetrics must outlive the Build() call. + Builder& ScanMetrics(class iceberg::ScanMetrics* scan_metrics); + /// \brief Build the DeleteFileIndex. 
Result> Build(); @@ -390,6 +397,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { std::shared_ptr partition_set_; bool case_sensitive_ = true; bool ignore_residuals_ = false; + class iceberg::ScanMetrics* scan_metrics_ = nullptr; }; } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 61bb57da2..43b0d5595 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -34,6 +34,7 @@ #include "iceberg/expression/residual_evaluator.h" #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_reader.h" +#include "iceberg/metrics/scan_report.h" #include "iceberg/partition_spec.h" #include "iceberg/row/manifest_wrapper.h" #include "iceberg/schema.h" @@ -189,6 +190,12 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set col return *this; } +ManifestGroup& ManifestGroup::ScanMetrics( + std::shared_ptr scan_metrics) { + scan_metrics_ = std::move(scan_metrics); + return *this; +} + Result>> ManifestGroup::PlanFiles() { auto create_file_scan_tasks = [this](std::vector&& entries, @@ -220,6 +227,24 @@ Result>> ManifestGroup::PlanFiles() { for (auto& task : tasks) { file_tasks.push_back(internal::checked_pointer_cast(task)); } + + if (scan_metrics_) { + for (const auto& task : file_tasks) { + for (const auto& df : task->delete_files()) { + scan_metrics_->result_delete_files->Increment(1); + scan_metrics_->indexed_delete_files->Increment(1); + scan_metrics_->total_delete_file_size_in_bytes->Increment(df->file_size_in_bytes); + if (ContentFileUtil::IsDV(*df)) { + scan_metrics_->dvs->Increment(1); + } else if (df->content == DataFile::Content::kEqualityDeletes) { + scan_metrics_->equality_delete_files->Increment(1); + } else { + scan_metrics_->positional_delete_files->Increment(1); + } + } + } + } + return file_tasks; } @@ -245,6 +270,9 @@ Result>> ManifestGroup::Plan( return residual_cache[spec_id].get(); }; + if 
(scan_metrics_) { + delete_index_builder_.ScanMetrics(scan_metrics_.get()); + } ICEBERG_ASSIGN_OR_RAISE(auto delete_index, delete_index_builder_.Build()); bool drop_stats = ManifestReader::ShouldDropStats(columns_); @@ -338,6 +366,10 @@ Result> ManifestGroup::MakeReader( .CaseSensitive(case_sensitive_) .Select(std::move(columns)); + if (scan_metrics_) { + reader->SkipCounter(scan_metrics_->skipped_data_files); + } + return reader; } @@ -386,12 +418,18 @@ ManifestGroup::ReadEntries() { ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); if (!should_match) { // Skip this manifest because it doesn't match partition filter + if (scan_metrics_) { + scan_metrics_->skipped_data_manifests->Increment(1); + } continue; } if (ignore_deleted_) { // only scan manifests that have entries other than deletes if (!manifest.has_added_files() && !manifest.has_existing_files()) { + if (scan_metrics_) { + scan_metrics_->skipped_data_manifests->Increment(1); + } continue; } } @@ -399,17 +437,24 @@ ManifestGroup::ReadEntries() { if (ignore_existing_) { // only scan manifests that have entries other than existing if (!manifest.has_added_files() && !manifest.has_deleted_files()) { + if (scan_metrics_) { + scan_metrics_->skipped_data_manifests->Increment(1); + } continue; } } // Read manifest entries + if (scan_metrics_) { + scan_metrics_->scanned_data_manifests->Increment(1); + } ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); ICEBERG_ASSIGN_OR_RAISE(auto entries, ignore_deleted_ ? 
reader->LiveEntries() : reader->Entries()); for (auto& entry : entries) { if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { + if (scan_metrics_) scan_metrics_->skipped_data_files->Increment(1); continue; } @@ -418,14 +463,23 @@ ManifestGroup::ReadEntries() { ICEBERG_ASSIGN_OR_RAISE(bool should_match, data_file_evaluator->Evaluate(data_file)); if (!should_match) { + if (scan_metrics_) { + scan_metrics_->skipped_data_files->Increment(1); + } continue; } } if (!manifest_entry_predicate_(entry)) { + if (scan_metrics_) scan_metrics_->skipped_data_files->Increment(1); continue; } + if (scan_metrics_ && entry.data_file) { + scan_metrics_->result_data_files->Increment(1); + scan_metrics_->total_file_size_in_bytes->Increment( + entry.data_file->file_size_in_bytes); + } result[spec_id].push_back(std::move(entry)); } } diff --git a/src/iceberg/manifest/manifest_group.h b/src/iceberg/manifest/manifest_group.h index 10b552786..b46ef2526 100644 --- a/src/iceberg/manifest/manifest_group.h +++ b/src/iceberg/manifest/manifest_group.h @@ -39,6 +39,8 @@ namespace iceberg { +class ScanMetrics; + /// \brief Context passed to task creation functions. struct ICEBERG_EXPORT TaskContext { public: @@ -120,6 +122,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { /// \param column_ids Field IDs of columns whose statistics should be preserved. ManifestGroup& ColumnsToKeepStats(std::unordered_set column_ids); + /// \brief Attach scan metrics to receive per-manifest and per-file counters. + ManifestGroup& ScanMetrics(std::shared_ptr scan_metrics); + /// \brief Plan scan tasks for all matching data files. 
Result>> PlanFiles(); @@ -162,6 +167,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { bool ignore_deleted_ = false; bool ignore_existing_ = false; bool ignore_residuals_ = false; + std::shared_ptr scan_metrics_; }; } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 6c166df53..f26a19892 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -770,6 +770,11 @@ ManifestReader& ManifestReaderImpl::TryDropStats() { return *this; } +ManifestReader& ManifestReaderImpl::SkipCounter(std::shared_ptr counter) { + skip_counter_ = std::move(counter); + return *this; +} + bool ManifestReaderImpl::HasPartitionFilter() const { ICEBERG_DCHECK(part_filter_, "Partition filter is not set"); return part_filter_->op() != Expression::Operation::kTrue; @@ -926,6 +931,7 @@ Result> ManifestReaderImpl::ReadEntries(bool only_liv ICEBERG_ASSIGN_OR_RAISE(bool partition_match, evaluator->Evaluate(entry.data_file->partition)); if (!partition_match) { + if (skip_counter_) skip_counter_->Increment(1); continue; } } @@ -933,11 +939,13 @@ Result> ManifestReaderImpl::ReadEntries(bool only_liv ICEBERG_ASSIGN_OR_RAISE(bool metrics_match, metrics_evaluator->Evaluate(*entry.data_file)); if (!metrics_match) { + if (skip_counter_) skip_counter_->Increment(1); continue; } } ICEBERG_ASSIGN_OR_RAISE(bool in_partition_set, InPartitionSet(*entry.data_file)); if (!in_partition_set) { + if (skip_counter_) skip_counter_->Increment(1); continue; } } diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index b2d1c6505..d952c01c4 100644 --- a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -30,6 +30,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/counter.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -76,6 +77,14 @@ class ICEBERG_EXPORT ManifestReader { /// 
\brief Try to drop stats from returned DataFile objects. virtual ManifestReader& TryDropStats() = 0; + /// \brief Set a counter to increment for each entry skipped by per-entry filters. + /// + /// Mirrors Java's ManifestReader.scanMetrics() skip-counting behaviour. Entries + /// dropped by the partition filter, row metrics filter, or partition-set filter + /// inside ReadEntries() will each increment this counter by one. Entries dropped + /// by the live-only filter (deleted status) are NOT counted, matching Java. + virtual ManifestReader& SkipCounter(std::shared_ptr counter) = 0; + /// \brief Determine whether stats should be dropped based on selected columns. /// /// Returns true if the selected columns do not include any stats columns, or only diff --git a/src/iceberg/manifest/manifest_reader_internal.h b/src/iceberg/manifest/manifest_reader_internal.h index 53ce2fcb5..15847ecf8 100644 --- a/src/iceberg/manifest/manifest_reader_internal.h +++ b/src/iceberg/manifest/manifest_reader_internal.h @@ -77,6 +77,8 @@ class ManifestReaderImpl : public ManifestReader { ManifestReader& TryDropStats() override; + ManifestReader& SkipCounter(std::shared_ptr counter) override; + private: /// \brief Read entries with optional live-only filtering. 
Result> ReadEntries(bool only_live); @@ -114,6 +116,7 @@ class ManifestReaderImpl : public ManifestReader { std::shared_ptr part_filter_{True::Instance()}; std::shared_ptr row_filter_{True::Instance()}; std::shared_ptr partition_set_; + std::shared_ptr skip_counter_; bool case_sensitive_{true}; bool drop_stats_{false}; diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 1255871c3..20d6a41fd 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -23,6 +23,7 @@ #include "iceberg/catalog.h" #include "iceberg/location_provider.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -51,7 +52,8 @@ Result> Table::Make(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) { + std::shared_ptr catalog, + std::shared_ptr reporter) { if (metadata == nullptr) [[unlikely]] { return InvalidArgument("Metadata cannot be null"); } @@ -66,19 +68,20 @@ Result> Table::Make(TableIdentifier identifier, } return std::shared_ptr(new Table(std::move(identifier), std::move(metadata), std::move(metadata_location), std::move(io), - std::move(catalog))); + std::move(catalog), std::move(reporter))); } Table::~Table() = default; Table::Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) + std::shared_ptr catalog, std::shared_ptr reporter) : identifier_(std::move(identifier)), metadata_(std::move(metadata)), metadata_location_(std::move(metadata_location)), io_(std::move(io)), catalog_(std::move(catalog)), + reporter_(std::move(reporter)), metadata_cache_(std::make_unique(metadata_.get())) {} const std::string& Table::uuid() const { return metadata_->table_uuid; } @@ -151,12 +154,23 @@ const std::shared_ptr& Table::metadata() const { return metadata_ const std::shared_ptr& Table::catalog() const { return catalog_; } +const 
std::shared_ptr& Table::reporter() const { return reporter_; } + +void Table::CombineReporter(std::shared_ptr additional) { + reporter_ = MetricsReporters::Combine(reporter_, std::move(additional)); +} + Result> Table::location_provider() const { return LocationProvider::Make(metadata_->location, metadata_->properties); } Result> Table::NewScan() const { - return DataTableScanBuilder::Make(metadata_, io_); + ICEBERG_ASSIGN_OR_RAISE(auto builder, DataTableScanBuilder::Make(metadata_, io_)); + builder->TableName(identifier_.ToString()); + if (reporter_) { + builder->MetricsReporter(reporter_); + } + return builder; } Result> Table::NewIncrementalAppendScan() @@ -214,7 +228,11 @@ Result> Table::NewUpdateLocation() { Result> Table::NewFastAppend() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); - return FastAppend::Make(name().name, std::move(ctx)); + ICEBERG_ASSIGN_OR_RAISE(auto op, FastAppend::Make(name().name, std::move(ctx))); + if (reporter_) { + op->ReportWith(reporter_); + } + return op; } Result> Table::NewUpdateStatistics() { @@ -236,7 +254,7 @@ Result> Table::NewSnapshotManager() { Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) { + std::shared_ptr catalog, std::shared_ptr reporter) { if (metadata == nullptr) [[unlikely]] { return InvalidArgument("Metadata cannot be null"); } @@ -246,9 +264,9 @@ Result> StagedTable::Make( if (catalog == nullptr) [[unlikely]] { return InvalidArgument("Catalog cannot be null"); } - return std::shared_ptr( - new StagedTable(std::move(identifier), std::move(metadata), - std::move(metadata_location), std::move(io), std::move(catalog))); + return std::shared_ptr(new StagedTable( + std::move(identifier), std::move(metadata), std::move(metadata_location), + std::move(io), std::move(catalog), std::move(reporter))); } StagedTable::~StagedTable() = default; diff 
--git a/src/iceberg/table.h b/src/iceberg/table.h index 8d8849f37..32165ea61 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_reporter.h" #include "iceberg/snapshot.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" @@ -43,11 +44,13 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \param[in] metadata_location The location of the table metadata file. /// \param[in] io The FileIO to read and write table data and metadata files. /// \param[in] catalog The catalog that this table belongs to. - static Result> Make(TableIdentifier identifier, - std::shared_ptr metadata, - std::string metadata_location, - std::shared_ptr io, - std::shared_ptr catalog); + /// \param[in] reporter Optional metrics reporter for this table. Defaults to nullptr + /// (noop). + static Result> Make( + TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io, + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); virtual ~Table(); @@ -117,6 +120,15 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Returns the catalog that this table belongs to const std::shared_ptr& catalog() const; + /// \brief Returns the metrics reporter for this table. + const std::shared_ptr& reporter() const; + + /// \brief Add an additional metrics reporter, combining with any existing one. + /// + /// If a reporter is already set, + /// the new reporter is combined into a CompositeMetricsReporter. + void CombineReporter(std::shared_ptr additional); + /// \brief Returns a LocationProvider for this table Result> location_provider() const; @@ -182,13 +194,15 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); const TableIdentifier identifier_; std::shared_ptr metadata_; std::string metadata_location_; std::shared_ptr io_; std::shared_ptr catalog_; + std::shared_ptr reporter_; std::unique_ptr metadata_cache_; }; @@ -198,7 +212,8 @@ class ICEBERG_EXPORT StagedTable final : public Table { static Result> Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); ~StagedTable() override; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 6881d34fb..775e0c069 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -27,6 +27,9 @@ #include "iceberg/expression/residual_evaluator.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_group.h" +#include "iceberg/metrics/metrics_context.h" +#include "iceberg/metrics/metrics_reporters.h" +#include "iceberg/metrics/scan_report.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" @@ -407,6 +410,21 @@ TableScanBuilder::ResolveSnapshotSchema() { return snapshot_schema_; } +template +TableScanBuilder& TableScanBuilder::MetricsReporter( + std::shared_ptr reporter) { + context_.metrics_reporter = + MetricsReporters::Combine(context_.metrics_reporter, std::move(reporter)); + return *this; +} + +template +TableScanBuilder& TableScanBuilder::TableName( + std::string table_name) { + context_.table_name = std::move(table_name); + return *this; +} + template Result> TableScanBuilder::Build() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); @@ -522,6 +540,10 @@ Result>> DataTableScan::PlanFiles() co return std::vector>{}; } + auto metrics_context = MetricsContext::Default(); + 
std::shared_ptr scan_metrics = ScanMetrics::Make(*metrics_context); + auto timed = scan_metrics->total_planning_duration->Start(); + TableMetadataCache metadata_cache(metadata_.get()); ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById()); @@ -529,6 +551,11 @@ Result>> DataTableScan::PlanFiles() co ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, snapshot_cache.DataManifests(io_)); ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests, snapshot_cache.DeleteManifests(io_)); + scan_metrics->total_data_manifests->Increment( + static_cast(data_manifests.size())); + scan_metrics->total_delete_manifests->Increment( + static_cast(delete_manifests.size())); + ICEBERG_ASSIGN_OR_RAISE( auto manifest_group, ManifestGroup::Make(io_, schema_, specs_by_id, @@ -538,11 +565,39 @@ Result>> DataTableScan::PlanFiles() co .Select(ScanColumns()) .FilterData(filter()) .IgnoreDeleted() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .ScanMetrics(scan_metrics); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); } - return manifest_group->PlanFiles(); + ICEBERG_ASSIGN_OR_RAISE(auto tasks, manifest_group->PlanFiles()); + + timed.Stop(); + + if (context_.metrics_reporter) { + ICEBERG_ASSIGN_OR_RAISE(auto projected_schema, ResolveProjectedSchema()); + const auto& schema_ptr = projected_schema.get(); + std::vector projected_field_ids; + std::vector projected_field_names; + for (const auto& field : schema_ptr->fields()) { + projected_field_ids.push_back(field.field_id()); + projected_field_names.emplace_back(field.name()); + } + + ScanReport report{ + .table_name = context_.table_name, + .snapshot_id = snapshot->snapshot_id, + .filter = context_.filter, + .schema_id = schema_ptr->schema_id(), + .projected_field_ids = std::move(projected_field_ids), + .projected_field_names = std::move(projected_field_names), + .scan_metrics = scan_metrics->ToResult(), + .metadata = context_.options, + }; + 
(void)context_.metrics_reporter->Report(report); + } + + return tasks; } // Friend function template for IncrementalScan that implements the shared PlanFiles diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 64fb3ffd1..c74c80325 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -28,6 +28,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_reporter.h" #include "iceberg/result.h" #include "iceberg/table_metadata.h" #include "iceberg/type_fwd.h" @@ -228,6 +229,10 @@ struct TableScanContext { std::optional to_snapshot_id; std::string branch{}; std::optional min_rows_requested; + /// \brief Fully-qualified table name for metrics reporting. + std::string table_name; + /// \brief Reporter to receive ScanReport after PlanFiles() completes. + std::shared_ptr metrics_reporter; // Validate the context parameters to see if they have conflicts. [[nodiscard]] Status Validate() const; @@ -375,6 +380,15 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { TableScanBuilder& UseBranch(const std::string& branch) requires IsIncrementalScan; + /// \brief Add a metrics reporter for this scan. + /// + /// May be called multiple times; each call combines with the previous reporter + /// via MetricsReporters::Combine(). Mirrors Java TableScan.metricsReporter(). + TableScanBuilder& MetricsReporter(std::shared_ptr reporter); + + /// \brief Set the table name for metrics reporting. + TableScanBuilder& TableName(std::string table_name); + /// \brief Builds and returns a TableScan instance. /// \return A Result containing the TableScan or an error. 
Result> Build(); diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index e18b63d5c..7cff5b150 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -210,6 +210,7 @@ if(ICEBERG_BUILD_BUNDLE) file_scan_task_test.cc incremental_append_scan_test.cc incremental_changelog_scan_test.cc + scan_planning_metrics_test.cc table_scan_test.cc) add_iceberg_test(table_update_test @@ -284,6 +285,7 @@ if(ICEBERG_BUILD_REST) endpoint_test.cc rest_file_io_test.cc rest_json_serde_test.cc + rest_metrics_reporter_test.cc rest_util_test.cc) if(ICEBERG_BUILD_REST_INTEGRATION_TESTS) diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 98956ba7c..60869f6cd 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -20,17 +20,23 @@ #include "iceberg/update/fast_append.h" #include +#include +#include #include #include #include "iceberg/avro/avro_register.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" #include "iceberg/test/test_resource.h" #include "iceberg/test/update_test_base.h" +#include "iceberg/update/update_properties.h" #include "iceberg/util/uuid.h" namespace iceberg { @@ -199,4 +205,151 @@ TEST_F(FastAppendTest, SetSnapshotProperty) { EXPECT_EQ(snapshot->summary.at("custom-property"), "custom-value"); } +// --------------------------------------------------------------------------- +// Metrics integration tests +// --------------------------------------------------------------------------- + +namespace { + +class CapturingReporter final : public MetricsReporter { + public: + Status Report(const MetricsReport& report) override { + reports_.push_back(report); + return {}; + } + const std::vector& reports() const { return 
reports_; } + void clear() { reports_.clear(); } + + private: + std::vector reports_; +}; + +CapturingReporter* g_capturing_reporter = nullptr; + +void RegisterCapturingReporter() { + static std::once_flag flag; + std::call_once(flag, [] { + (void)MetricsReporters::Register( + "fast.append.test.reporter", + [](const auto&) -> Result> { + auto ptr = std::make_unique(); + g_capturing_reporter = ptr.get(); + return ptr; + }); + }); +} + +} // namespace + +// Test fixture that creates an InMemoryCatalog with a CapturingReporter so +// CommitReports emitted by Transaction::Commit() are observable. +class FastAppendMetricsTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + avro::RegisterAll(); + RegisterCapturingReporter(); + } + + void SetUp() override { + table_ident_ = TableIdentifier{.name = "metrics_test_table"}; + table_location_ = "/warehouse/metrics_test_table"; + + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + catalog_ = InMemoryCatalog::Make( + "metrics_test_catalog", file_io_, "/warehouse/", + {{std::string(kMetricsReporterImpl), "fast.append.test.reporter"}}); + + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok()); + + auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", + table_location_, Uuid::GenerateV7().ToString()); + ICEBERG_UNWRAP_OR_FAIL( + auto metadata, ReadTableMetadataFromResource("TableMetadataV2ValidMinimal.json")); + metadata->location = table_location_; + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), + IsOk()); + ICEBERG_UNWRAP_OR_FAIL(table_, + catalog_->RegisterTable(table_ident_, metadata_location)); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t 
record_count, + int64_t size, int64_t partition_value = 0) { + auto data_file = std::make_shared(); + data_file->content = DataFile::Content::kData; + data_file->file_path = table_location_ + path; + data_file->file_format = FileFormatType::kParquet; + data_file->partition = + PartitionValues(std::vector{Literal::Long(partition_value)}); + data_file->file_size_in_bytes = size; + data_file->record_count = record_count; + data_file->partition_spec_id = spec_->spec_id(); + return data_file; + } + + TableIdentifier table_ident_; + std::string table_location_; + std::shared_ptr file_io_; + std::shared_ptr catalog_; + std::shared_ptr
table_; + std::shared_ptr spec_; + std::shared_ptr schema_; +}; + +// A CommitReport must be emitted once for each FastAppend commit that creates a +// new snapshot. Validate table_name, snapshot_id, operation, and attempt count. +TEST_F(FastAppendMetricsTest, CommitReportFiredAfterFastAppend) { + ASSERT_NE(g_capturing_reporter, nullptr); + + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(MakeDataFile("/data/file_a.parquet", 100, 1024, 1024)); + ASSERT_THAT(fast_append->Commit(), IsOk()); + + ASSERT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + + const auto& reports = g_capturing_reporter->reports(); + ASSERT_EQ(reports.size(), 1u); + ASSERT_TRUE(std::holds_alternative(reports[0])); + + const auto& report = std::get(reports[0]); + EXPECT_EQ(report.table_name, table_ident_.ToString()); + EXPECT_EQ(report.snapshot_id, snapshot->snapshot_id); + EXPECT_EQ(report.operation, "append"); + ASSERT_TRUE(report.commit_metrics.attempts.has_value()); + EXPECT_EQ(report.commit_metrics.attempts->value, 1); +} + +// A property-only commit must NOT emit a CommitReport because it does not +// create a new snapshot. This covers the original bug where comparing a +// pre-commit snapshot ID of -1 against the existing snapshot ID would be +// skipped by the has_value() guard. +TEST_F(FastAppendMetricsTest, CommitReportNotFiredForPropertyOnlyCommit) { + ASSERT_NE(g_capturing_reporter, nullptr); + + // First do a FastAppend to create a snapshot, then clear the recorder. + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(MakeDataFile("/data/file_a.parquet", 100, 1024, 1024)); + ASSERT_THAT(fast_append->Commit(), IsOk()); + ASSERT_EQ(g_capturing_reporter->reports().size(), 1u); + g_capturing_reporter->clear(); + + // Property-only commit on a table that already has a snapshot. 
+ ASSERT_THAT(table_->Refresh(), IsOk()); + std::shared_ptr update_props; + ICEBERG_UNWRAP_OR_FAIL(update_props, table_->NewUpdateProperties()); + update_props->Set("test-key", "test-value"); + ASSERT_THAT(update_props->Commit(), IsOk()); + + // No new snapshot was created, so no CommitReport must be emitted. + EXPECT_TRUE(g_capturing_reporter->reports().empty()); +} + } // namespace iceberg diff --git a/src/iceberg/test/rest_metrics_reporter_test.cc b/src/iceberg/test/rest_metrics_reporter_test.cc new file mode 100644 index 000000000..08f0613e4 --- /dev/null +++ b/src/iceberg/test/rest_metrics_reporter_test.cc @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/catalog/rest/rest_metrics_reporter.h" + +#include + +#include + +#include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/metrics/scan_report.h" +#include "iceberg/test/matchers.h" + +namespace iceberg::rest { + +class RestMetricsReporterTest : public ::testing::Test { + protected: + void SetUp() override { + client_ = std::make_shared(); + session_ = auth::AuthSession::MakeDefault({}); + } + + std::shared_ptr client_; + std::shared_ptr session_; +}; + +// Report() must return OK even when the HTTP call fails (connection refused). +// This validates the fire-and-forget error-suppression contract matching Java behavior. +TEST_F(RestMetricsReporterTest, ReportSuppressesHttpErrorsForScanReport) { + RestMetricsReporter reporter(client_, "http://localhost:0/v1/ns/tables/tbl/metrics", + session_); + + ScanReport report; + report.table_name = "ns.tbl"; + report.snapshot_id = 42; + report.schema_id = 0; + // Leave filter/metrics as default; serialization should still succeed. + + EXPECT_THAT(reporter.Report(report), IsOk()); +} + +TEST_F(RestMetricsReporterTest, ReportSuppressesHttpErrorsForCommitReport) { + RestMetricsReporter reporter(client_, "http://localhost:0/v1/ns/tables/tbl/metrics", + session_); + + CommitReport report; + report.table_name = "ns.tbl"; + report.snapshot_id = 99; + report.sequence_number = 1; + report.operation = "append"; + + EXPECT_THAT(reporter.Report(report), IsOk()); +} + +} // namespace iceberg::rest diff --git a/src/iceberg/test/scan_planning_metrics_test.cc b/src/iceberg/test/scan_planning_metrics_test.cc new file mode 100644 index 000000000..fca314e83 --- /dev/null +++ b/src/iceberg/test/scan_planning_metrics_test.cc @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/// \file scan_planning_metrics_test.cc +/// End-to-end tests for scan planning metrics, mirroring Java's +/// ScanPlanningAndReportingTestBase. + +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/metrics/scan_report.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_scan.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/scan_test_base.h" +#include "iceberg/transform.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +namespace { + +/// Reporter that captures the most recent ScanReport for assertions. 
+class CapturingReporter final : public MetricsReporter { + public: + Status Report(const MetricsReport& report) override { + if (std::holds_alternative(report)) { + last_ = std::get(report); + } + return {}; + } + + const std::optional& last() const { return last_; } + void clear() { last_.reset(); } + + private: + std::optional last_; +}; + +} // namespace + +class ScanPlanningMetricsTest : public ScanTestBase { + protected: + void SetUp() override { + ScanTestBase::SetUp(); + reporter_ = std::make_shared(); + + ICEBERG_UNWRAP_OR_FAIL( + id_identity_spec_, + PartitionSpec::Make(/*spec_id=*/2, + {PartitionField(/*source_id=*/1, /*field_id=*/1001, "id", + Transform::Identity())})); + } + + /// \brief Build a DataFile with optional lower/upper bounds on the "id" field. + std::shared_ptr MakeDataFile(const std::string& path, + const PartitionValues& partition, + int32_t spec_id, int64_t record_count = 1, + std::optional lower_id = std::nullopt, + std::optional upper_id = std::nullopt) { + auto file = std::make_shared(DataFile{ + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = record_count, + .file_size_in_bytes = 10, + .sort_order_id = 0, + .partition_spec_id = spec_id, + }); + if (lower_id.has_value()) { + file->lower_bounds[1] = Literal::Int(lower_id.value()).Serialize().value(); + } + if (upper_id.has_value()) { + file->upper_bounds[1] = Literal::Int(upper_id.value()).Serialize().value(); + } + return file; + } + + /// \brief Build a positional-delete DataFile. 
+ std::shared_ptr MakePositionDeleteFile( + const std::string& path, const PartitionValues& partition, int32_t spec_id, + std::optional referenced_file = std::nullopt) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .referenced_data_file = std::move(referenced_file), + .partition_spec_id = spec_id, + }); + } + + /// \brief Build a single-snapshot TableMetadata from a manifest list path. + std::shared_ptr BuildMetadata( + int64_t snapshot_id, const std::string& manifest_list_path, + std::shared_ptr spec = nullptr) { + if (!spec) spec = partitioned_spec_; + const auto ts = TimePointMsFromUnixMs(1609459200000L); + auto snapshot = + std::make_shared(Snapshot{.snapshot_id = snapshot_id, + .parent_snapshot_id = std::nullopt, + .sequence_number = 1L, + .timestamp_ms = ts, + .manifest_list = manifest_list_path, + .schema_id = schema_->schema_id()}); + return std::make_shared( + TableMetadata{.format_version = 2, + .table_uuid = "test-table-uuid", + .location = "/tmp/table", + .last_sequence_number = 1L, + .last_updated_ms = ts, + .last_column_id = 2, + .schemas = {schema_}, + .current_schema_id = schema_->schema_id(), + .partition_specs = {spec, unpartitioned_spec_}, + .default_spec_id = spec->spec_id(), + .last_partition_id = 1001, + .current_snapshot_id = snapshot_id, + .snapshots = {snapshot}, + .snapshot_log = {SnapshotLogEntry{.timestamp_ms = ts, + .snapshot_id = snapshot_id}}, + .default_sort_order_id = 0, + .refs = {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = snapshot_id, + .retention = SnapshotRef::Branch{}})}}}); + } + + /// \brief Wrapper matching WriteManifestList(format_version, snap_id, seq, manifests). 
+ std::string WriteManifestList(int8_t format_version, int64_t snapshot_id, + int64_t sequence_number, + const std::vector& manifests) { + return ScanTestBase::WriteManifestList(format_version, snapshot_id, + /*parent_snapshot_id=*/0L, sequence_number, + manifests); + } + + std::shared_ptr reporter_; + std::shared_ptr id_identity_spec_; +}; + +// --------------------------------------------------------------------------- +// Test 1: Verify a ScanReport is fired and contains basic accurate fields. +// Mirrors Java's scanningWithMultipleReporters(). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanReportFiredAfterPlanFiles) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2000L; + const auto part = PartitionValues({Literal::Int(0)}); + + auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id(), + /*record_count=*/100, + /*lower_id=*/1, /*upper_id=*/50))}, + partitioned_spec_); + auto manifest_list = + WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, {data_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& report = *reporter_->last(); + EXPECT_EQ(report.table_name, "test.table"); + EXPECT_EQ(report.snapshot_id, kSnapshotId); + + const auto& m = report.scan_metrics; + ASSERT_TRUE(m.total_planning_duration.has_value()); + EXPECT_EQ(m.total_planning_duration->count, 1); + ASSERT_TRUE(m.result_data_files.has_value()); + 
EXPECT_EQ(m.result_data_files->value, 1); +} + +// --------------------------------------------------------------------------- +// Test 2: Two manifests, 3 total data files — verify all 12 counters. +// Mirrors Java's scanningWithMultipleDataManifests() (unfiltered sub-scan). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithMultipleDataManifests) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2001L; + const auto part = PartitionValues({Literal::Int(0)}); + + auto manifest1 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id())), + MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part, partitioned_spec_->spec_id()))}, + partitioned_spec_); + + auto manifest2 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_c.parquet", part, partitioned_spec_->spec_id()))}, + partitioned_spec_); + + auto manifest_list = WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, + {manifest1, manifest2}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 3); + ASSERT_TRUE(m.result_delete_files.has_value()); + EXPECT_EQ(m.result_delete_files->value, 0); + 
ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 2); + ASSERT_TRUE(m.scanned_delete_manifests.has_value()); + EXPECT_EQ(m.scanned_delete_manifests->value, 0); + ASSERT_TRUE(m.skipped_data_manifests.has_value()); + EXPECT_EQ(m.skipped_data_manifests->value, 0); + ASSERT_TRUE(m.skipped_delete_manifests.has_value()); + EXPECT_EQ(m.skipped_delete_manifests->value, 0); + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 2); + ASSERT_TRUE(m.total_delete_manifests.has_value()); + EXPECT_EQ(m.total_delete_manifests->value, 0); + ASSERT_TRUE(m.skipped_data_files.has_value()); + EXPECT_EQ(m.skipped_data_files->value, 0); + ASSERT_TRUE(m.skipped_delete_files.has_value()); + EXPECT_EQ(m.skipped_delete_files->value, 0); +} + +// --------------------------------------------------------------------------- +// Test 3: Partition filter prunes one of two manifests. +// Uses an identity(id) partition so the manifest evaluator can prune by +// the id range recorded in each manifest's partition field summary. +// Mirrors Java's scanningWithMultipleDataManifests() (filtered sub-scan). 
+// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithManifestPruning) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2002L; + + // Manifest 1: id partition = 1 (holds only file_a) + const auto part1 = PartitionValues({Literal::Int(1)}); + auto manifest1 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part1, id_identity_spec_->spec_id()))}, + id_identity_spec_); + + // Manifest 2: id partition = 2 (holds only file_b) + const auto part2 = PartitionValues({Literal::Int(2)}); + auto manifest2 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part2, id_identity_spec_->spec_id()))}, + id_identity_spec_); + + auto manifest_list = WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, + {manifest1, manifest2}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list, id_identity_spec_); + + // Filter id = 1: only manifest 1 survives the manifest-level evaluator. 
+ reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_) + .TableName("test.table") + .Filter(Expressions::Equal("id", Literal::Int(1))); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1u); + EXPECT_EQ(tasks[0]->data_file()->file_path, "/data/file_a.parquet"); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 2); + ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 1); + ASSERT_TRUE(m.skipped_data_manifests.has_value()); + EXPECT_EQ(m.skipped_data_manifests->value, 1); + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 1); + ASSERT_TRUE(m.skipped_data_files.has_value()); + EXPECT_EQ(m.skipped_data_files->value, 0); +} + +// --------------------------------------------------------------------------- +// Test 4: Row-stats filter skips one entry inside a scanned manifest. +// Both files live in the same manifest; only the inclusive metrics evaluator +// (lower/upper bounds on "id") can distinguish them. +// Mirrors Java's scanningWithSkippedDataFiles(). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithSkippedDataFiles) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2003L; + const auto part = PartitionValues({Literal::Int(0)}); + + // Both files share the same bucket partition so the manifest is not pruned. + // file_a covers id [1, 50]; file_b covers id [51, 100]. 
+ auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id(), + /*record_count=*/50, /*lower_id=*/1, /*upper_id=*/50)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part, partitioned_spec_->spec_id(), + /*record_count=*/50, /*lower_id=*/51, /*upper_id=*/100))}, + partitioned_spec_); + auto manifest_list = + WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, {data_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + // Filter id = 25: within file_a's range, outside file_b's range. + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_) + .TableName("test.table") + .Filter(Expressions::Equal("id", Literal::Int(25))); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1u); + EXPECT_EQ(tasks[0]->data_file()->file_path, "/data/file_a.parquet"); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 1); + ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 1); + ASSERT_TRUE(m.skipped_data_manifests.has_value()); + EXPECT_EQ(m.skipped_data_manifests->value, 0); + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 1); + ASSERT_TRUE(m.skipped_data_files.has_value()); + EXPECT_EQ(m.skipped_data_files->value, 1); +} + +// --------------------------------------------------------------------------- +// Test 5: Scan with positional delete files — verify delete file counters. +// Mirrors Java's scanningWithDeletes(). 
+// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithDeleteFiles) { + auto version = GetParam(); + if (version < 2) { + GTEST_SKIP() << "Delete files are only supported in format version 2+"; + } + constexpr int64_t kSnapshotId = 2004L; + const auto part = PartitionValues({Literal::Int(0)}); + + auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id())), + MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part, partitioned_spec_->spec_id()))}, + partitioned_spec_); + + // One positional-delete file covering file_a. + auto delete_manifest = WriteDeleteManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/2, + MakePositionDeleteFile("/data/pos_delete.parquet", part, + partitioned_spec_->spec_id(), "/data/file_a.parquet"))}, + partitioned_spec_); + + auto manifest_list = WriteManifestList(version, kSnapshotId, /*sequence_number=*/2, + {data_manifest, delete_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 2); + ASSERT_TRUE(m.result_delete_files.has_value()); + EXPECT_EQ(m.result_delete_files->value, 1); + ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 1); + 
ASSERT_TRUE(m.scanned_delete_manifests.has_value()); + EXPECT_EQ(m.scanned_delete_manifests->value, 1); + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 1); + ASSERT_TRUE(m.total_delete_manifests.has_value()); + EXPECT_EQ(m.total_delete_manifests->value, 1); + ASSERT_TRUE(m.indexed_delete_files.has_value()); + EXPECT_EQ(m.indexed_delete_files->value, 1); + ASSERT_TRUE(m.positional_delete_files.has_value()); + EXPECT_EQ(m.positional_delete_files->value, 1); + ASSERT_TRUE(m.equality_delete_files.has_value()); + EXPECT_EQ(m.equality_delete_files->value, 0); + ASSERT_TRUE(m.dvs.has_value()); + EXPECT_EQ(m.dvs->value, 0); +} + +INSTANTIATE_TEST_SUITE_P(ScanPlanningMetricsVersions, ScanPlanningMetricsTest, + testing::Values(2, 3)); + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 049b0f49d..8810fc97a 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -23,6 +23,8 @@ #include #include "iceberg/catalog.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/metrics_context.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/statistics_file.h" @@ -353,16 +355,28 @@ Result> Transaction::Commit() { int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs); int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs); int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs); + int64_t pre_commit_snapshot_id = -1; + if (auto pre = ctx_->table->metadata()->Snapshot(); pre.has_value() && pre.value()) { + pre_commit_snapshot_id = pre.value()->snapshot_id; + } + + auto metrics_context = MetricsContext::Default(); + auto commit_metrics = CommitMetrics::Make(*metrics_context); + auto timed = commit_metrics->total_duration->Start(); bool is_first_attempt = true; auto commit_result = MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms) - .Run([this, 
&is_first_attempt]() -> Result> { + .Run([this, &is_first_attempt, + &commit_metrics]() -> Result> { + commit_metrics->attempts->Increment(1); auto result = CommitOnce(is_first_attempt); is_first_attempt = false; return result; }); + timed.Stop(); + Result finalize_result = commit_result.has_value() ? Result(commit_result.value()->metadata().get()) @@ -378,6 +392,27 @@ Result> Transaction::Commit() { committed_ = true; ctx_->table = std::move(commit_result.value()); + // Fire CommitReport only when the snapshot ID changed, i.e. a new snapshot was + // produced (not for property-only commits). + const auto& reporter = ctx_->table->reporter(); + if (reporter) { + auto snapshot_result = ctx_->table->metadata()->Snapshot(); + if (snapshot_result.has_value() && snapshot_result.value() && + snapshot_result.value()->snapshot_id != pre_commit_snapshot_id) { + const auto& snapshot = snapshot_result.value(); + const auto op = snapshot->Operation(); + CommitReport report{ + .table_name = ctx_->table->name().ToString(), + .snapshot_id = snapshot->snapshot_id, + .sequence_number = snapshot->sequence_number, + .operation = op.has_value() ? 
std::string(op.value()) : "", + .commit_metrics = CommitMetricsResult::From(*commit_metrics, snapshot->summary), + .metadata = {}, + }; + (void)reporter->Report(report); + } + } + return ctx_->table; } @@ -475,6 +510,9 @@ Result> Transaction::NewFastAppend() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr fast_append, FastAppend::Make(ctx_->table->name().name, ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append)); + if (const auto& r = ctx_->table->reporter()) { + fast_append->ReportWith(r); + } return fast_append; } diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 03a74e788..0ed7dfa70 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -29,6 +29,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_reporter.h" #include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" @@ -54,6 +55,15 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { Kind kind() const override { return Kind::kUpdateSnapshot; } bool IsRetryable() const override { return true; } + /// \brief Set the metrics reporter for this snapshot update. + /// + /// \param reporter The metrics reporter to use. + /// \return Reference to this for method chaining. + auto& ReportWith(this auto& self, std::shared_ptr reporter) { + static_cast(self).reporter_ = std::move(reporter); + return self; + } + /// \brief Set a callback to delete files instead of the table's default. /// /// \param delete_func A function used to delete file locations @@ -217,6 +227,10 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { std::string ManifestListPath(); SnapshotSummaryBuilder& summary_builder() { return summary_; } + protected: + /// \brief Reporter to receive CommitReport after a successful commit. + std::shared_ptr reporter_; + private: /// \brief Returns the snapshot summary from the implementation and updates totals. 
Result> ComputeSummary(