Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmake_modules/IcebergThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ function(resolve_arrow_dependency)
set(ARROW_S3 ${ICEBERG_S3})
set(ARROW_JSON ON)
set(ARROW_PARQUET ON)
set(ARROW_ENABLE_THREADING ON)
set(ARROW_SIMD_LEVEL "NONE")
set(ARROW_RUNTIME_SIMD_LEVEL "NONE")
set(ARROW_POSITION_INDEPENDENT_CODE ON)
Expand Down Expand Up @@ -167,8 +168,8 @@ function(resolve_arrow_dependency)
endif()

# Arrow's exported static target interface may reference system libraries
# (e.g. OpenSSL, CURL, ZLIB) that consumers need to find.
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES ZLIB)
# (e.g. Threads, OpenSSL, CURL, ZLIB) that consumers need to find.
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Threads ZLIB)
if(ARROW_S3)
list(APPEND ICEBERG_SYSTEM_DEPENDENCIES OpenSSL CURL)
endif()
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ set(ICEBERG_SOURCES
util/snapshot_util.cc
util/string_util.cc
util/struct_like_set.cc
util/task_group.cc
util/temporal_util.cc
util/timepoint.cc
util/transform_util.cc
Expand Down
215 changes: 135 additions & 80 deletions src/iceberg/delete_file_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <mutex>
#include <ranges>
#include <shared_mutex>
#include <vector>

#include "iceberg/expression/expression.h"
Expand All @@ -38,6 +40,7 @@
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/content_file_util.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/task_group.h"

namespace iceberg {

Expand Down Expand Up @@ -528,107 +531,159 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() {
return *this;
}

DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor executor) {
executor_ = executor;
return *this;
}

Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
// Build expression caches per spec ID
std::unordered_map<int32_t, std::shared_ptr<Expression>> part_expr_cache;
// TODO(zehua): Replace with a thread-safe LRU cache.
std::shared_mutex projected_expr_cache_mutex;
std::unordered_map<int32_t, std::shared_ptr<Expression>> projected_expr_cache;
std::shared_mutex eval_cache_mutex;
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;

auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_;

// Filter and read manifests into manifest entries
std::vector<ManifestEntry> files;
for (const auto& manifest : delete_manifests_) {
if (manifest.content != ManifestContent::kDeletes) {
continue;
auto and_filters =
[](std::shared_ptr<Expression> left,
std::shared_ptr<Expression> right) -> Result<std::shared_ptr<Expression>> {
if (left && right) {
return And::MakeFolded(std::move(left), std::move(right));
}
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
continue;
if (right) {
return right;
}
return left;
};

auto get_projected_expr = [&](int32_t spec_id,
const std::shared_ptr<PartitionSpec>& spec)
-> Result<std::shared_ptr<Expression>> {
if (!data_filter_) {
return std::shared_ptr<Expression>();
}

const int32_t spec_id = manifest.partition_spec_id;
auto spec_iter = specs_by_id_.find(spec_id);
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
"Partition spec ID {} not found when loading delete files", spec_id);
{
std::shared_lock lock(projected_expr_cache_mutex);
auto iter = projected_expr_cache.find(spec_id);
if (iter != projected_expr_cache.end()) {
return iter->second;
}
}

const auto& spec = spec_iter->second;
std::lock_guard lock(projected_expr_cache_mutex);
auto iter = projected_expr_cache.find(spec_id);
if (iter != projected_expr_cache.end()) {
return iter->second;
}

// Get or compute projected partition expression
if (!part_expr_cache.contains(spec_id) && data_filter_) {
auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
part_expr_cache[spec_id] = std::move(projected);
auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
auto [inserted_iter, _] = projected_expr_cache.emplace(spec_id, std::move(projected));
return inserted_iter->second;
};

auto get_manifest_evaluator =
[&](int32_t spec_id, const std::shared_ptr<PartitionSpec>& spec,
const std::shared_ptr<Expression>& filter) -> Result<ManifestEvaluator*> {
if (!filter) {
return nullptr;
}

// Get or create manifest evaluator
if (!eval_cache.contains(spec_id)) {
auto filter = partition_filter_;
if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
if (filter) {
ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second));
} else {
filter = it->second;
}
}
if (filter) {
ICEBERG_ASSIGN_OR_RAISE(auto evaluator,
ManifestEvaluator::MakePartitionFilter(
std::move(filter), spec, *schema_, case_sensitive_));
eval_cache[spec_id] = std::move(evaluator);
{
std::shared_lock lock(eval_cache_mutex);
auto iter = eval_cache.find(spec_id);
if (iter != eval_cache.end()) {
return iter->second.get();
}
}

// Evaluate manifest against filter
if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) {
ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest));
if (!should_match) {
continue; // Manifest doesn't match filter
}
std::lock_guard lock(eval_cache_mutex);
auto iter = eval_cache.find(spec_id);
if (iter != eval_cache.end()) {
return iter->second.get();
}

// Read manifest entries
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, spec));

auto partition_filter = partition_filter_;
if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
if (partition_filter) {
ICEBERG_ASSIGN_OR_RAISE(partition_filter,
And::Make(partition_filter, it->second));
} else {
partition_filter = it->second;
ICEBERG_ASSIGN_OR_RAISE(auto evaluator, ManifestEvaluator::MakePartitionFilter(
filter, spec, *schema_, case_sensitive_));
auto [inserted_iter, _] = eval_cache.emplace(spec_id, std::move(evaluator));
return inserted_iter->second.get();
};

std::vector<std::vector<ManifestEntry>> manifest_results(delete_manifests_.size());
auto read_tasks = TaskGroup().SetExecutor(executor_);
for (auto&& [manifest, manifest_result] :
std::views::zip(delete_manifests_, manifest_results)) {
read_tasks.Submit([&]() -> Status {
Comment thread
HuaHuaY marked this conversation as resolved.
if (manifest.content != ManifestContent::kDeletes) {
return {};
}
}
if (partition_filter) {
reader->FilterPartitions(std::move(partition_filter));
}
if (partition_set_) {
reader->FilterPartitions(partition_set_);
}
reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();

ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
files.reserve(files.size() + entries.size());

for (auto& entry : entries) {
ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
ICEBERG_CHECK(entry.sequence_number.has_value(),
"Missing sequence number from delete file: {}",
entry.data_file->file_path);
if (entry.sequence_number.value() > min_sequence_number_) {
auto& file = *entry.data_file;
// keep minimum stats to avoid memory pressure
std::unordered_set<int32_t> columns =
file.content == DataFile::Content::kPositionDeletes
? std::unordered_set<int32_t>{MetadataColumns::kDeleteFilePathColumnId}
: std::unordered_set<int32_t>(file.equality_ids.begin(),
file.equality_ids.end());
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
files.emplace_back(std::move(entry));
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
return {};
}
}

const int32_t spec_id = manifest.partition_spec_id;
auto spec_iter = specs_by_id_.find(spec_id);
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
"Partition spec ID {} not found when loading delete files", spec_id);

const auto& spec = spec_iter->second;

ICEBERG_ASSIGN_OR_RAISE(auto projected_data_filter,
get_projected_expr(spec_id, spec));
ICEBERG_ASSIGN_OR_RAISE(auto delete_partition_filter,
and_filters(partition_filter_, projected_data_filter));
ICEBERG_ASSIGN_OR_RAISE(
auto manifest_evaluator,
get_manifest_evaluator(spec_id, spec, delete_partition_filter));
if (manifest_evaluator != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(auto should_match,
manifest_evaluator->Evaluate(manifest));
if (!should_match) {
return {};
}
}

// Read manifest entries
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, spec));

if (delete_partition_filter) {
reader->FilterPartitions(std::move(delete_partition_filter));
}
if (partition_set_) {
reader->FilterPartitions(partition_set_);
}
reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();

ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
manifest_result.reserve(entries.size());

for (auto& entry : entries) {
ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
ICEBERG_CHECK(entry.sequence_number.has_value(),
"Missing sequence number from delete file: {}",
entry.data_file->file_path);
if (entry.sequence_number.value() > min_sequence_number_) {
auto& file = *entry.data_file;
// keep minimum stats to avoid memory pressure
std::unordered_set<int32_t> columns =
file.content == DataFile::Content::kPositionDeletes
? std::unordered_set<int32_t>{MetadataColumns::kDeleteFilePathColumnId}
: std::unordered_set<int32_t>(file.equality_ids.begin(),
file.equality_ids.end());
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
manifest_result.emplace_back(std::move(entry));
}
}
return {};
});
}
ICEBERG_RETURN_UNEXPECTED(std::move(read_tasks).Run());

return files;
return manifest_results | std::views::join | std::views::as_rvalue |
std::ranges::to<std::vector<ManifestEntry>>();
}

Status DeleteFileIndex::Builder::AddDV(
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/delete_file_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/executor.h"
#include "iceberg/util/partition_value_util.h"

namespace iceberg {
Expand Down Expand Up @@ -356,6 +357,9 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
/// \brief Ignore residual expressions after partition filtering.
Builder& IgnoreResiduals();

/// \brief Configure an optional executor for reading delete manifests.
Builder& PlanWith(OptionalExecutor executor);

/// \brief Build the DeleteFileIndex.
Result<std::unique_ptr<DeleteFileIndex>> Build();

Expand Down Expand Up @@ -388,6 +392,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
std::shared_ptr<Expression> data_filter_;
std::shared_ptr<Expression> partition_filter_;
std::shared_ptr<PartitionSet> partition_set_;
OptionalExecutor executor_;
bool case_sensitive_ = true;
bool ignore_residuals_ = false;
};
Expand Down
Loading
Loading