diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc index 7c8c35032..c4a5b0ca1 100644 --- a/src/iceberg/delete_file_index.cc +++ b/src/iceberg/delete_file_index.cc @@ -22,7 +22,9 @@ #include #include #include +#include #include +#include #include #include "iceberg/expression/expression.h" @@ -38,6 +40,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -528,107 +531,159 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() { return *this; } +DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor executor) { + executor_ = executor; + return *this; +} + Result> DeleteFileIndex::Builder::LoadDeleteFiles() { - // Build expression caches per spec ID - std::unordered_map> part_expr_cache; + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex projected_expr_cache_mutex; + std::unordered_map> projected_expr_cache; + std::shared_mutex eval_cache_mutex; std::unordered_map> eval_cache; auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_; - // Filter and read manifests into manifest entries - std::vector files; - for (const auto& manifest : delete_manifests_) { - if (manifest.content != ManifestContent::kDeletes) { - continue; + auto and_filters = + [](std::shared_ptr left, + std::shared_ptr right) -> Result> { + if (left && right) { + return And::MakeFolded(std::move(left), std::move(right)); } - if (!manifest.has_added_files() && !manifest.has_existing_files()) { - continue; + if (right) { + return right; + } + return left; + }; + + auto get_projected_expr = [&](int32_t spec_id, + const std::shared_ptr& spec) + -> Result> { + if (!data_filter_) { + return std::shared_ptr(); } - const int32_t spec_id = manifest.partition_spec_id; - auto spec_iter = specs_by_id_.find(spec_id); - ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), - "Partition spec ID {} not found when loading delete files", spec_id); + { + std::shared_lock lock(projected_expr_cache_mutex); + auto iter = projected_expr_cache.find(spec_id); + if (iter != projected_expr_cache.end()) { + return iter->second; + } + } - const auto& spec = spec_iter->second; + std::lock_guard lock(projected_expr_cache_mutex); + auto iter = projected_expr_cache.find(spec_id); + if (iter != projected_expr_cache.end()) { + return iter->second; + } - // Get or compute projected partition expression - if (!part_expr_cache.contains(spec_id) && data_filter_) { - auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); - ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_)); - part_expr_cache[spec_id] = std::move(projected); + auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); + ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_)); + auto [inserted_iter, _] = projected_expr_cache.emplace(spec_id, std::move(projected)); + return inserted_iter->second; + }; + + auto get_manifest_evaluator = + [&](int32_t spec_id, const std::shared_ptr& spec, + const std::shared_ptr& filter) -> Result { + if (!filter) { + return nullptr; } - // Get or create manifest evaluator - if (!eval_cache.contains(spec_id)) { - auto filter = partition_filter_; - if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { - if (filter) { - ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second)); - } else { - filter = it->second; - } - } - if (filter) { - ICEBERG_ASSIGN_OR_RAISE(auto evaluator, - ManifestEvaluator::MakePartitionFilter( - std::move(filter), spec, *schema_, case_sensitive_)); - eval_cache[spec_id] = std::move(evaluator); + { + std::shared_lock lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } } - // Evaluate manifest against filter - if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) { - ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest)); - if (!should_match) { - continue; // Manifest doesn't match filter - } + std::lock_guard lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } - // Read manifest entries - ICEBERG_ASSIGN_OR_RAISE(auto reader, - ManifestReader::Make(manifest, io_, schema_, spec)); - - auto partition_filter = partition_filter_; - if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { - if (partition_filter) { - ICEBERG_ASSIGN_OR_RAISE(partition_filter, - And::Make(partition_filter, it->second)); - } else { - partition_filter = it->second; + ICEBERG_ASSIGN_OR_RAISE(auto evaluator, ManifestEvaluator::MakePartitionFilter( + filter, spec, *schema_, case_sensitive_)); + auto [inserted_iter, _] = eval_cache.emplace(spec_id, std::move(evaluator)); + return inserted_iter->second.get(); + }; + + std::vector> manifest_results(delete_manifests_.size()); + auto read_tasks = TaskGroup().SetExecutor(executor_); + for (auto&& [manifest, manifest_result] : + std::views::zip(delete_manifests_, manifest_results)) { + read_tasks.Submit([&]() -> Status { + if (manifest.content != ManifestContent::kDeletes) { + return {}; } - } - if (partition_filter) { - reader->FilterPartitions(std::move(partition_filter)); - } - if (partition_set_) { - reader->FilterPartitions(partition_set_); - } - reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); - - ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); - files.reserve(files.size() + entries.size()); - - for (auto& entry : entries) { - ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); - ICEBERG_CHECK(entry.sequence_number.has_value(), - "Missing sequence number from delete file: {}", - entry.data_file->file_path); - if (entry.sequence_number.value() > min_sequence_number_) { - auto& file = *entry.data_file; - // keep minimum stats to avoid memory pressure - std::unordered_set columns = - file.content == DataFile::Content::kPositionDeletes - ? std::unordered_set{MetadataColumns::kDeleteFilePathColumnId} - : std::unordered_set(file.equality_ids.begin(), - file.equality_ids.end()); - ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); - files.emplace_back(std::move(entry)); + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + return {}; } - } + + const int32_t spec_id = manifest.partition_spec_id; + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Partition spec ID {} not found when loading delete files", spec_id); + + const auto& spec = spec_iter->second; + + ICEBERG_ASSIGN_OR_RAISE(auto projected_data_filter, + get_projected_expr(spec_id, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto delete_partition_filter, + and_filters(partition_filter_, projected_data_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_evaluator, + get_manifest_evaluator(spec_id, spec, delete_partition_filter)); + if (manifest_evaluator != nullptr) { + ICEBERG_ASSIGN_OR_RAISE(auto should_match, + manifest_evaluator->Evaluate(manifest)); + if (!should_match) { + return {}; + } + } + + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, io_, schema_, spec)); + + if (delete_partition_filter) { + reader->FilterPartitions(std::move(delete_partition_filter)); + } + if (partition_set_) { + reader->FilterPartitions(partition_set_); + } + reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); + + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); + manifest_result.reserve(entries.size()); + + for (auto& entry : entries) { + ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); + ICEBERG_CHECK(entry.sequence_number.has_value(), + "Missing sequence number from delete file: {}", + entry.data_file->file_path); + if (entry.sequence_number.value() > min_sequence_number_) { + auto& file = *entry.data_file; + // keep minimum stats to avoid memory pressure + std::unordered_set columns = + file.content == DataFile::Content::kPositionDeletes + ? std::unordered_set{MetadataColumns::kDeleteFilePathColumnId} + : std::unordered_set(file.equality_ids.begin(), + file.equality_ids.end()); + ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); + manifest_result.emplace_back(std::move(entry)); + } + } + return {}; + }); } + ICEBERG_RETURN_UNEXPECTED(std::move(read_tasks).Run()); - return files; + return manifest_results | std::views::join | std::views::as_rvalue | + std::ranges::to>(); } Status DeleteFileIndex::Builder::AddDV( diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h index 5444281a0..9f3768edf 100644 --- a/src/iceberg/delete_file_index.h +++ b/src/iceberg/delete_file_index.h @@ -35,6 +35,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" #include "iceberg/util/partition_value_util.h" namespace iceberg { @@ -356,6 +357,9 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { /// \brief Ignore residual expressions after partition filtering. Builder& IgnoreResiduals(); + /// \brief Configure an optional executor for reading delete manifests. + Builder& PlanWith(OptionalExecutor executor); + /// \brief Build the DeleteFileIndex. Result> Build(); @@ -388,6 +392,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { std::shared_ptr data_filter_; std::shared_ptr partition_filter_; std::shared_ptr partition_set_; + OptionalExecutor executor_; bool case_sensitive_ = true; bool ignore_residuals_ = false; }; diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 61bb57da2..1850d1a50 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -21,6 +21,9 @@ #include #include +#include +#include +#include #include #include #include @@ -42,6 +45,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -189,6 +193,12 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set col return *this; } +ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) { + executor_ = executor; + delete_index_builder_.PlanWith(executor); + return *this; +} + Result>> ManifestGroup::PlanFiles() { auto create_file_scan_tasks = [this](std::vector&& entries, @@ -343,10 +353,23 @@ Result> ManifestGroup::MakeReader( Result>> ManifestGroup::ReadEntries() { + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex eval_cache_mutex; std::unordered_map> eval_cache; + auto get_manifest_evaluator = [&](int32_t spec_id) -> Result { - if (eval_cache.contains(spec_id)) { - return eval_cache[spec_id].get(); + { + std::shared_lock lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); + } + } + + std::lock_guard lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } auto spec_iter = specs_by_id_.find(spec_id); @@ -376,57 +399,73 @@ ManifestGroup::ReadEntries() { Evaluator::Make(*DataFileFilterSchema(), file_filter_, case_sensitive_)); } - std::unordered_map> result; + std::vector>> manifest_results( + data_manifests_.size()); - // TODO(gangwu): Parallelize reading manifests - for (const auto& manifest : data_manifests_) { - const int32_t spec_id = manifest.partition_spec_id; + auto read_tasks = TaskGroup().SetExecutor(executor_); + for (auto&& [manifest, manifest_result] : + std::views::zip(data_manifests_, manifest_results)) { + read_tasks.Submit([&]() -> Status { + const int32_t spec_id = manifest.partition_spec_id; - ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); - ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); - if (!should_match) { - // Skip this manifest because it doesn't match partition filter - continue; - } + ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); + if (!should_match) { + // Skip this manifest because it doesn't match partition filter + return {}; + } - if (ignore_deleted_) { - // only scan manifests that have entries other than deletes - if (!manifest.has_added_files() && !manifest.has_existing_files()) { - continue; + if (ignore_deleted_) { + // only scan manifests that have entries other than deletes + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + return {}; + } } - } - if (ignore_existing_) { - // only scan manifests that have entries other than existing - if (!manifest.has_added_files() && !manifest.has_deleted_files()) { - continue; + if (ignore_existing_) { + // only scan manifests that have entries other than existing + if (!manifest.has_added_files() && !manifest.has_deleted_files()) { + return {}; + } } - } - // Read manifest entries - ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); - ICEBERG_ASSIGN_OR_RAISE(auto entries, - ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); + ICEBERG_ASSIGN_OR_RAISE( + auto entries, ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); - for (auto& entry : entries) { - if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { - continue; - } + for (auto& entry : entries) { + if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { + continue; + } - if (data_file_evaluator != nullptr) { - DataFileStructLike data_file(*entry.data_file); - ICEBERG_ASSIGN_OR_RAISE(bool should_match, - data_file_evaluator->Evaluate(data_file)); - if (!should_match) { + if (data_file_evaluator != nullptr) { + DataFileStructLike data_file(*entry.data_file); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, + data_file_evaluator->Evaluate(data_file)); + if (!should_match) { + continue; + } + } + + if (!manifest_entry_predicate_(entry)) { continue; } - } - if (!manifest_entry_predicate_(entry)) { - continue; + manifest_result[spec_id].push_back(std::move(entry)); } + return {}; + }); + } + ICEBERG_RETURN_UNEXPECTED(std::move(read_tasks).Run()); - result[spec_id].push_back(std::move(entry)); + std::unordered_map> result; + for (auto& manifest_result : manifest_results) { + result.merge(manifest_result); + for (auto& [spec_id, entries] : manifest_result) { + auto& spec_entries = result[spec_id]; + spec_entries.insert(spec_entries.end(), std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); } } diff --git a/src/iceberg/manifest/manifest_group.h b/src/iceberg/manifest/manifest_group.h index 10b552786..cbc935047 100644 --- a/src/iceberg/manifest/manifest_group.h +++ b/src/iceberg/manifest/manifest_group.h @@ -36,6 +36,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -120,6 +121,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { /// \param column_ids Field IDs of columns whose statistics should be preserved. ManifestGroup& ColumnsToKeepStats(std::unordered_set column_ids); + /// \brief Configure an optional executor for manifest planning. + ManifestGroup& PlanWith(OptionalExecutor executor); + /// \brief Plan scan tasks for all matching data files. Result>> PlanFiles(); @@ -158,6 +162,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { std::function manifest_entry_predicate_; std::vector columns_; std::unordered_set columns_to_keep_stats_; + OptionalExecutor executor_; bool case_sensitive_ = true; bool ignore_deleted_ = false; bool ignore_existing_ = false; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 6881d34fb..fb4fbf3c5 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -288,6 +288,12 @@ TableScanBuilder& TableScanBuilder::MinRowsRequested( return *this; } +template +TableScanBuilder& TableScanBuilder::PlanWith(Executor& executor) { + context_.plan_executor = std::ref(executor); + return *this; +} + template TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) { ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(), @@ -538,7 +544,8 @@ Result>> DataTableScan::PlanFiles() co .Select(ScanColumns()) .FilterData(filter()) .IgnoreDeleted() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); } @@ -641,7 +648,8 @@ Result>> IncrementalAppendScan::PlanFi entry.status == ManifestStatus::kAdded; }) .IgnoreDeleted() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); @@ -737,7 +745,8 @@ IncrementalChangelogScan::PlanFiles(std::optional from_snapshot_id_excl snapshot_ids.contains(entry.snapshot_id.value()); }) .IgnoreExisting() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 64fb3ffd1..f0c1747b6 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -32,6 +32,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -228,6 +229,7 @@ struct TableScanContext { std::optional to_snapshot_id; std::string branch{}; std::optional min_rows_requested; + OptionalExecutor plan_executor; // Validate the context parameters to see if they have conflicts. [[nodiscard]] Status Validate() const; @@ -302,6 +304,9 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { /// \param num_rows The minimum number of rows requested TableScanBuilder& MinRowsRequested(int64_t num_rows); + /// \brief Configure an executor for manifest planning. + TableScanBuilder& PlanWith(Executor& executor); + /// \brief Request this scan to use the given snapshot by ID. /// \param snapshot_id a snapshot ID /// \note InvalidArgument will be returned if the snapshot cannot be found diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 98956ba7c..a7b083beb 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -20,6 +20,8 @@ #include "iceberg/update/fast_append.h" #include +#include +#include #include #include @@ -29,12 +31,31 @@ #include "iceberg/schema.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" -#include "iceberg/test/test_resource.h" #include "iceberg/test/update_test_base.h" -#include "iceberg/util/uuid.h" +#include "iceberg/transaction.h" namespace iceberg { +namespace { + +class TestSnapshotUpdate : public SnapshotUpdate { + public: + explicit TestSnapshotUpdate(std::shared_ptr ctx) + : SnapshotUpdate(std::move(ctx)) {} + + using SnapshotUpdate::ManifestPath; + + Status CleanUncommitted(const std::unordered_set&) override { return {}; } + std::string operation() override { return "test"; } + Result> Apply(const TableMetadata&, + const std::shared_ptr&) override { + return std::vector{}; + } + std::unordered_map Summary() override { return {}; } +}; + +} // namespace + class FastAppendTest : public UpdateTestBase { protected: static void SetUpTestSuite() { avro::RegisterAll(); } @@ -78,6 +99,8 @@ class FastAppendTest : public UpdateTestBase { std::shared_ptr file_b_; }; +class SnapshotUpdateTest : public UpdateTestBase {}; + TEST_F(FastAppendTest, AppendDataFile) { std::shared_ptr fast_append; ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); @@ -199,4 +222,35 @@ TEST_F(FastAppendTest, SetSnapshotProperty) { EXPECT_EQ(snapshot->summary.at("custom-property"), "custom-value"); } +TEST_F(SnapshotUpdateTest, ConcurrentManifestPaths) { + ICEBERG_UNWRAP_OR_FAIL(auto ctx, + TransactionContext::Make(table_, TransactionKind::kUpdate)); + TestSnapshotUpdate update(std::move(ctx)); + + constexpr int kThreadCount = 8; + constexpr int kPathsPerThread = 32; + std::vector paths(kThreadCount * kPathsPerThread); + std::vector threads; + threads.reserve(kThreadCount); + + for (int thread_index = 0; thread_index < kThreadCount; ++thread_index) { + threads.emplace_back([&, thread_index] { + for (int path_index = 0; path_index < kPathsPerThread; ++path_index) { + paths[thread_index * kPathsPerThread + path_index] = update.ManifestPath(); + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + std::unordered_set unique_paths(paths.begin(), paths.end()); + ASSERT_EQ(unique_paths.size(), paths.size()); + for (const auto& path : paths) { + EXPECT_THAT(path, ::testing::HasSubstr("/metadata/")); + EXPECT_THAT(path, ::testing::HasSubstr("-m")); + } +} + } // namespace iceberg diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 70e2cea99..b3026ab4a 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -39,6 +39,7 @@ #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/table_scan.h" +#include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" #include "iceberg/transform.h" #include "iceberg/type.h" @@ -329,6 +330,44 @@ TEST_P(ManifestGroupTest, IgnoreDeleted) { "/path/to/existing.parquet")); } +TEST_P(ManifestGroupTest, PlanWithExecutor) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + constexpr int64_t kSequenceNumber = 1L; + const auto partition_a = PartitionValues({Literal::Int(0)}); + const auto partition_b = PartitionValues({Literal::Int(1)}); + + auto manifest_a = + WriteDataManifest(version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber, + MakeDataFile("/path/to/data-a.parquet", partition_a, + partitioned_spec_->spec_id()))}, + partitioned_spec_); + auto manifest_b = + WriteDataManifest(version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber, + MakeDataFile("/path/to/data-b.parquet", partition_b, + partitioned_spec_->spec_id()))}, + partitioned_spec_); + + std::vector manifests = { + WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, manifest_a), + WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, manifest_b)}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + + test::ThreadExecutor executor; + group->PlanWith(std::ref(executor)); + + ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data-a.parquet", + "/path/to/data-b.parquet")); + EXPECT_EQ(executor.submit_count(), 2); +} + TEST_P(ManifestGroupTest, IgnoreExisting) { auto version = GetParam(); diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index 11905a870..3ac877fcc 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -30,6 +30,7 @@ #include "iceberg/expression/expressions.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" +#include "iceberg/test/executor.h" #include "iceberg/test/scan_test_base.h" namespace iceberg { @@ -401,6 +402,72 @@ TEST_P(TableScanTest, PlanFilesWithMultipleManifests) { "/path/to/data2.parquet")); } +TEST_P(TableScanTest, PlanWithExecutor) { + auto version = GetParam(); + + const auto partition_a = PartitionValues({Literal::Int(0)}); + const auto partition_b = PartitionValues({Literal::Int(1)}); + + std::vector data_entries_1{MakeEntry( + ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1, + MakeDataFile("/path/to/data1.parquet", partition_a, partitioned_spec_->spec_id()))}; + auto data_manifest_1 = WriteDataManifest(version, /*snapshot_id=*/1000L, + std::move(data_entries_1), partitioned_spec_); + + std::vector data_entries_2{MakeEntry( + ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1, + MakeDataFile("/path/to/data2.parquet", partition_b, partitioned_spec_->spec_id()))}; + auto data_manifest_2 = WriteDataManifest(version, /*snapshot_id=*/1000L, + std::move(data_entries_2), partitioned_spec_); + + std::string manifest_list_path = + WriteManifestList(version, /*snapshot_id=*/1000L, /*sequence_number=*/1, + {data_manifest_1, data_manifest_2}); + + auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L); + auto snapshot_with_manifests = + std::make_shared(Snapshot{.snapshot_id = 1000L, + .parent_snapshot_id = std::nullopt, + .sequence_number = 1L, + .timestamp_ms = timestamp_ms, + .manifest_list = manifest_list_path, + .summary = {}, + .schema_id = schema_->schema_id()}); + + auto metadata_with_manifests = std::make_shared( + TableMetadata{.format_version = 2, + .table_uuid = "test-table-uuid", + .location = "/tmp/table", + .last_sequence_number = 1L, + .last_updated_ms = timestamp_ms, + .last_column_id = 2, + .schemas = {schema_}, + .current_schema_id = schema_->schema_id(), + .partition_specs = {partitioned_spec_, unpartitioned_spec_}, + .default_spec_id = partitioned_spec_->spec_id(), + .last_partition_id = 1000, + .current_snapshot_id = 1000L, + .snapshots = {snapshot_with_manifests}, + .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms, + .snapshot_id = 1000L}}, + .default_sort_order_id = 0, + .refs = {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, + .retention = SnapshotRef::Branch{}, + })}}}); + + test::ThreadExecutor executor; + ICEBERG_UNWRAP_OR_FAIL(auto builder, + DataTableScanBuilder::Make(metadata_with_manifests, file_io_)); + builder->PlanWith(executor); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet", + "/path/to/data2.parquet")); + EXPECT_EQ(executor.submit_count(), 2); +} + TEST_P(TableScanTest, PlanFilesWithFilter) { auto version = GetParam(); diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index c9ac9e4cd..12107d872 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -693,6 +695,11 @@ ExpireSnapshots& ExpireSnapshots::DeleteWith( return *this; } +ExpireSnapshots& ExpireSnapshots::PlanWith(Executor& executor) { + plan_executor_ = std::ref(executor); + return *this; +} + ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) { cleanup_level_ = level; return *this; @@ -865,20 +872,34 @@ Result ExpireSnapshots::Apply() { }); if (clean_expired_metadata_) { + std::vector> reachable_spec_ids(ids_to_retain.size()); + std::vector> reachable_schema_ids(ids_to_retain.size()); + TaskGroup<> metadata_tasks; + metadata_tasks.SetExecutor(plan_executor_); + for (auto&& [snapshot_id, spec_ids, schema_id] : + std::views::zip(ids_to_retain, reachable_spec_ids, reachable_schema_ids)) { + metadata_tasks.Submit([&]() -> Status { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id)); + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, + snapshot_cache.Manifests(ctx_->table->io())); + for (const auto& manifest : manifests) { + spec_ids.insert(manifest.partition_spec_id); + } + schema_id = snapshot->schema_id; + return {}; + }); + } + ICEBERG_RETURN_UNEXPECTED(std::move(metadata_tasks).Run()); + std::unordered_set reachable_specs = {base.default_spec_id}; std::unordered_set reachable_schemas = {base.current_schema_id}; - // TODO(xiao.dong) parallel processing - for (int64_t snapshot_id : ids_to_retain) { - ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id)); - SnapshotCache snapshot_cache(snapshot.get()); - ICEBERG_ASSIGN_OR_RAISE(auto manifests, - snapshot_cache.Manifests(ctx_->table->io())); - for (const auto& manifest : manifests) { - reachable_specs.insert(manifest.partition_spec_id); - } - if (snapshot->schema_id.has_value()) { - reachable_schemas.insert(snapshot->schema_id.value()); + for (auto&& [spec_ids, schema_id] : + std::views::zip(reachable_spec_ids, reachable_schema_ids)) { + reachable_specs.merge(spec_ids); + if (schema_id.has_value()) { + reachable_schemas.insert(schema_id.value()); } } diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index a5b6e3b32..8ed9d5b3c 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -32,6 +32,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" +#include "iceberg/util/executor.h" #include "iceberg/util/timepoint.h" /// \file iceberg/update/expire_snapshots.h @@ -119,6 +120,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { /// \return Reference to this for method chaining. ExpireSnapshots& DeleteWith(std::function delete_func); + /// \brief Configure an executor for planning expired snapshot metadata. + ExpireSnapshots& PlanWith(Executor& executor); + /// \brief Configures the cleanup level for expired files. /// /// This method provides fine-grained control over which files are cleaned up during @@ -182,6 +186,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { std::function delete_func_; std::vector snapshot_ids_to_expire_; enum CleanupLevel cleanup_level_ { CleanupLevel::kAll }; + OptionalExecutor plan_executor_; bool clean_expired_metadata_{false}; bool specified_snapshot_id_{false}; diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index bb5376fa8..e71fe1cd3 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -30,11 +30,12 @@ #include "iceberg/manifest/manifest_writer.h" #include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/partition_summary_internal.h" -#include "iceberg/table.h" +#include "iceberg/table.h" // IWYU pragma: keep #include "iceberg/transaction.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" +#include "iceberg/util/task_group.h" #include "iceberg/util/uuid.h" namespace iceberg { @@ -174,7 +175,7 @@ void SnapshotUpdate::SetSummaryProperty(const std::string& property, summary_.Set(property, value); } -// TODO(xxx): write manifests in parallel +// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDataManifests( std::span> files, const std::shared_ptr& spec, @@ -200,7 +201,7 @@ Result> SnapshotUpdate::WriteDataManifests( return rolling_writer.ToManifestFiles(); } -// TODO(xxx): write manifests in parallel +// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDeleteManifests( std::span files, const std::shared_ptr& spec) { @@ -257,13 +258,17 @@ Result SnapshotUpdate::Apply() { ICEBERG_RETURN_UNEXPECTED(Validate(base(), parent_snapshot)); ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot)); + auto metadata_tasks = TaskGroup().SetExecutor(plan_executor_); for (auto& manifest : manifests) { if (manifest.added_snapshot_id != kInvalidSnapshotId) { continue; } - // TODO(xxx): read in parallel and cache enriched manifests for retries - ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), base())); + metadata_tasks.Submit([&manifest, this]() -> Status { + ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), base())); + return {}; + }); } + ICEBERG_RETURN_UNEXPECTED(std::move(metadata_tasks).Run()); std::string manifest_list_path = ManifestListPath(); manifest_lists_.push_back(manifest_list_path); @@ -419,15 +424,17 @@ std::string SnapshotUpdate::ManifestListPath() { // Generate manifest list path // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro int64_t snapshot_id = SnapshotId(); + auto attempt = attempt_.fetch_add(1, std::memory_order_relaxed) + 1; std::string filename = - std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_); + std::format("snap-{}-{}-{}.avro", snapshot_id, attempt, commit_uuid_); return ctx_->MetadataFileLocation(filename); } std::string SnapshotUpdate::ManifestPath() { // Generate manifest path // Format: {metadata_location}/{uuid}-m{manifest_count}.avro - std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count_++); + auto manifest_count = manifest_count_.fetch_add(1, std::memory_order_relaxed); + std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count); return ctx_->MetadataFileLocation(filename); } diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 03a74e788..54b2ae50a 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -33,6 +34,7 @@ #include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -77,6 +79,12 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Configure an executor for manifest planning work. + auto& ScanManifestsWith(this auto& self, Executor& executor) { + self.plan_executor_ = std::ref(executor); + return self; + } + /// \brief Perform operations on a particular branch /// /// \param branch Which is name of SnapshotRef of type branch @@ -147,8 +155,10 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { const std::string& target_branch() const { return target_branch_; } bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } const std::string& commit_uuid() const { return commit_uuid_; } - int32_t manifest_count() const { return manifest_count_; } - int32_t attempt() const { return attempt_; } + int32_t manifest_count() const { + return manifest_count_.load(std::memory_order_relaxed); + } + int32_t attempt() const { return attempt_.load(std::memory_order_relaxed); } int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; } /// \brief Clean up any uncommitted manifests that were created. @@ -231,11 +241,12 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { private: const bool can_inherit_snapshot_id_{true}; const std::string commit_uuid_; - int32_t manifest_count_{0}; - int32_t attempt_{0}; + std::atomic manifest_count_{0}; + std::atomic attempt_{0}; std::vector manifest_lists_; const int64_t target_manifest_size_bytes_; std::optional snapshot_id_; + OptionalExecutor plan_executor_; bool stage_only_{false}; std::function delete_func_; std::string target_branch_{SnapshotRef::kMainBranch};