diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 04a73ca4c..f3dfbc4d9 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -22,6 +22,7 @@ set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc delete_file_index.cc + deletes/roaring_position_bitmap.cc expression/aggregate.cc expression/binder.cc expression/evaluator.cc @@ -70,6 +71,11 @@ set(ICEBERG_SOURCES partition_field.cc partition_spec.cc partition_summary.cc + puffin/file_metadata.cc + puffin/json_serde.cc + puffin/puffin_format.cc + puffin/puffin_reader.cc + puffin/puffin_writer.cc row/arrow_array_wrapper.cc row/manifest_wrapper.cc row/partition_values.cc @@ -139,22 +145,24 @@ list(APPEND ICEBERG_STATIC_BUILD_INTERFACE_LIBS "$,nanoarrow::nanoarrow_static,$,nanoarrow::nanoarrow_static,nanoarrow::nanoarrow_shared>>" nlohmann_json::nlohmann_json + roaring::roaring ZLIB::ZLIB) list(APPEND ICEBERG_SHARED_BUILD_INTERFACE_LIBS "$,nanoarrow::nanoarrow_static,$,nanoarrow::nanoarrow_shared,nanoarrow::nanoarrow_static>>" nlohmann_json::nlohmann_json + roaring::roaring ZLIB::ZLIB) list(APPEND ICEBERG_STATIC_INSTALL_INTERFACE_LIBS "$,iceberg::nanoarrow_static,$,nanoarrow::nanoarrow_static,nanoarrow::nanoarrow_shared>>" "$,iceberg::nlohmann_json,$,nlohmann_json::nlohmann_json,nlohmann_json::nlohmann_json>>" -) + "$,iceberg::roaring,roaring::roaring>") list(APPEND ICEBERG_SHARED_INSTALL_INTERFACE_LIBS "$,iceberg::nanoarrow_static,$,nanoarrow::nanoarrow_shared,nanoarrow::nanoarrow_static>>" "$,iceberg::nlohmann_json,$,nlohmann_json::nlohmann_json,nlohmann_json::nlohmann_json>>" -) + "$,iceberg::roaring,roaring::roaring>") add_iceberg_lib(iceberg SOURCES @@ -181,13 +189,7 @@ set(ICEBERG_DATA_SOURCES data/position_delete_writer.cc data/writer.cc deletes/position_delete_index.cc - deletes/position_delete_range_consumer.cc - deletes/roaring_position_bitmap.cc - puffin/file_metadata.cc - puffin/json_serde.cc - puffin/puffin_format.cc - puffin/puffin_reader.cc - puffin/puffin_writer.cc) + deletes/position_delete_range_consumer.cc) set(ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS) @@ -195,19 +197,17 @@ set(ICEBERG_DATA_STATIC_INSTALL_INTERFACE_LIBS) set(ICEBERG_DATA_SHARED_INSTALL_INTERFACE_LIBS) list(APPEND ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS - "$,iceberg_static,iceberg_shared>" - roaring::roaring) + "$,iceberg_static,iceberg_shared>") list(APPEND ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS - "$,iceberg_shared,iceberg_static>" - roaring::roaring) + "$,iceberg_shared,iceberg_static>") list(APPEND ICEBERG_DATA_STATIC_INSTALL_INTERFACE_LIBS "$,iceberg::iceberg_static,iceberg::iceberg_shared>" - "$,iceberg::roaring,roaring::roaring>") +) list(APPEND ICEBERG_DATA_SHARED_INSTALL_INTERFACE_LIBS "$,iceberg::iceberg_shared,iceberg::iceberg_static>" - "$,iceberg::roaring,roaring::roaring>") +) add_iceberg_lib(iceberg_data SOURCES diff --git a/src/iceberg/deletes/roaring_position_bitmap.h b/src/iceberg/deletes/roaring_position_bitmap.h index bfb7d7c9e..61a8f5ddb 100644 --- a/src/iceberg/deletes/roaring_position_bitmap.h +++ b/src/iceberg/deletes/roaring_position_bitmap.h @@ -29,7 +29,7 @@ #include #include -#include "iceberg/iceberg_data_export.h" +#include "iceberg/iceberg_export.h" #include "iceberg/result.h" namespace iceberg { @@ -48,7 +48,7 @@ class PositionDeleteIndex; /// \note This class is used to represent deletion vectors. The Puffin reader/writer /// handle adding the additional required framing (length prefix, magic bytes, CRC-32) /// for `deletion-vector-v1` persistence. -class ICEBERG_DATA_EXPORT RoaringPositionBitmap { +class ICEBERG_EXPORT RoaringPositionBitmap { public: /// \brief Maximum supported position (aligned with the Java implementation). static constexpr int64_t kMaxPosition = 0x7FFFFFFE80000000LL; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 15fd5d79d..53cba260f 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -44,6 +44,7 @@ iceberg_sources = files( 'arrow_c_data_util.cc', 'catalog/memory/in_memory_catalog.cc', 'delete_file_index.cc', + 'deletes/roaring_position_bitmap.cc', 'expression/aggregate.cc', 'expression/binder.cc', 'expression/evaluator.cc', @@ -92,6 +93,11 @@ iceberg_sources = files( 'partition_field.cc', 'partition_spec.cc', 'partition_summary.cc', + 'puffin/file_metadata.cc', + 'puffin/json_serde.cc', + 'puffin/puffin_format.cc', + 'puffin/puffin_reader.cc', + 'puffin/puffin_writer.cc', 'row/arrow_array_wrapper.cc', 'row/manifest_wrapper.cc', 'row/partition_values.cc', @@ -163,12 +169,6 @@ iceberg_data_sources = files( 'data/writer.cc', 'deletes/position_delete_index.cc', 'deletes/position_delete_range_consumer.cc', - 'deletes/roaring_position_bitmap.cc', - 'puffin/file_metadata.cc', - 'puffin/json_serde.cc', - 'puffin/puffin_format.cc', - 'puffin/puffin_reader.cc', - 'puffin/puffin_writer.cc', ) # CRoaring does not export symbols, so on Windows it must @@ -182,7 +182,7 @@ nanoarrow_dep = dependency('nanoarrow') nlohmann_json_dep = dependency('nlohmann_json') zlib_dep = dependency('zlib') -iceberg_deps = [nanoarrow_dep, nlohmann_json_dep, zlib_dep] +iceberg_deps = [nanoarrow_dep, nlohmann_json_dep, croaring_dep, zlib_dep] iceberg_lib = library( 'iceberg', @@ -208,7 +208,7 @@ iceberg_dep = declare_dependency( ) meson.override_dependency('iceberg', iceberg_dep) -iceberg_data_deps = [iceberg_dep, croaring_dep] +iceberg_data_deps = [iceberg_dep] iceberg_data_lib = library( 'iceberg_data', sources: iceberg_data_sources, diff --git a/src/iceberg/puffin/file_metadata.h b/src/iceberg/puffin/file_metadata.h index 14eaae6c6..17ddad77d 100644 --- a/src/iceberg/puffin/file_metadata.h +++ b/src/iceberg/puffin/file_metadata.h @@ -29,7 +29,7 @@ #include #include -#include "iceberg/iceberg_data_export.h" +#include "iceberg/iceberg_export.h" #include "iceberg/result.h" namespace iceberg::puffin { @@ -41,12 +41,12 @@ enum class PuffinCompressionCodec { kZstd, }; -ICEBERG_DATA_EXPORT std::string_view CodecName(PuffinCompressionCodec codec); +ICEBERG_EXPORT std::string_view CodecName(PuffinCompressionCodec codec); -ICEBERG_DATA_EXPORT Result PuffinCompressionCodecFromName( +ICEBERG_EXPORT Result PuffinCompressionCodecFromName( std::string_view codec_name); -ICEBERG_DATA_EXPORT std::string ToString(PuffinCompressionCodec codec); +ICEBERG_EXPORT std::string ToString(PuffinCompressionCodec codec); /// \brief Standard blob types defined by the Iceberg specification. struct StandardBlobTypes { @@ -67,7 +67,7 @@ struct StandardPuffinProperties { }; /// \brief A blob in a Puffin file. -struct ICEBERG_DATA_EXPORT Blob { +struct ICEBERG_EXPORT Blob { /// See StandardBlobTypes for known types. std::string type; /// Ordered list of field IDs the blob was computed from. @@ -84,10 +84,10 @@ struct ICEBERG_DATA_EXPORT Blob { friend bool operator==(const Blob& lhs, const Blob& rhs) = default; }; -ICEBERG_DATA_EXPORT std::string ToString(const Blob& blob); +ICEBERG_EXPORT std::string ToString(const Blob& blob); /// \brief Metadata about a blob stored in a Puffin file footer. -struct ICEBERG_DATA_EXPORT BlobMetadata { +struct ICEBERG_EXPORT BlobMetadata { /// See StandardBlobTypes for known types. std::string type; /// Ordered list of field IDs the blob was computed from. @@ -105,16 +105,16 @@ struct ICEBERG_DATA_EXPORT BlobMetadata { friend bool operator==(const BlobMetadata& lhs, const BlobMetadata& rhs) = default; }; -ICEBERG_DATA_EXPORT std::string ToString(const BlobMetadata& blob_metadata); +ICEBERG_EXPORT std::string ToString(const BlobMetadata& blob_metadata); /// \brief Metadata about a Puffin file. -struct ICEBERG_DATA_EXPORT FileMetadata { +struct ICEBERG_EXPORT FileMetadata { std::vector blobs; std::unordered_map properties; friend bool operator==(const FileMetadata& lhs, const FileMetadata& rhs) = default; }; -ICEBERG_DATA_EXPORT std::string ToString(const FileMetadata& file_metadata); +ICEBERG_EXPORT std::string ToString(const FileMetadata& file_metadata); } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/json_serde_internal.h b/src/iceberg/puffin/json_serde_internal.h index 9e003e168..dbb5fbd09 100644 --- a/src/iceberg/puffin/json_serde_internal.h +++ b/src/iceberg/puffin/json_serde_internal.h @@ -27,30 +27,30 @@ #include -#include "iceberg/iceberg_data_export.h" +#include "iceberg/iceberg_export.h" #include "iceberg/puffin/type_fwd.h" #include "iceberg/result.h" namespace iceberg::puffin { /// \brief Serialize a BlobMetadata to JSON. -ICEBERG_DATA_EXPORT nlohmann::json ToJson(const BlobMetadata& blob_metadata); +ICEBERG_EXPORT nlohmann::json ToJson(const BlobMetadata& blob_metadata); /// \brief Deserialize a BlobMetadata from JSON. -ICEBERG_DATA_EXPORT Result BlobMetadataFromJson(const nlohmann::json& json); +ICEBERG_EXPORT Result BlobMetadataFromJson(const nlohmann::json& json); /// \brief Serialize a FileMetadata to JSON. -ICEBERG_DATA_EXPORT nlohmann::json ToJson(const FileMetadata& file_metadata); +ICEBERG_EXPORT nlohmann::json ToJson(const FileMetadata& file_metadata); /// \brief Deserialize a FileMetadata from JSON. -ICEBERG_DATA_EXPORT Result FileMetadataFromJson(const nlohmann::json& json); +ICEBERG_EXPORT Result FileMetadataFromJson(const nlohmann::json& json); /// \brief Serialize a FileMetadata to a JSON string. -ICEBERG_DATA_EXPORT std::string ToJsonString(const FileMetadata& file_metadata, - bool pretty = false); +ICEBERG_EXPORT std::string ToJsonString(const FileMetadata& file_metadata, + bool pretty = false); /// \brief Deserialize a FileMetadata from a JSON string. -ICEBERG_DATA_EXPORT Result FileMetadataFromJsonString( +ICEBERG_EXPORT Result FileMetadataFromJsonString( std::string_view json_string); } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_format.h b/src/iceberg/puffin/puffin_format.h index b3b5f10de..05a27145e 100644 --- a/src/iceberg/puffin/puffin_format.h +++ b/src/iceberg/puffin/puffin_format.h @@ -28,14 +28,14 @@ #include #include -#include "iceberg/iceberg_data_export.h" +#include "iceberg/iceberg_export.h" #include "iceberg/puffin/file_metadata.h" #include "iceberg/result.h" namespace iceberg::puffin { /// \brief Puffin file format constants. -struct ICEBERG_DATA_EXPORT PuffinFormat { +struct ICEBERG_EXPORT PuffinFormat { /// Magic bytes: "PFA1" (Puffin Fratercula arctica, version 1) static constexpr std::array kMagicV1 = {0x50, 0x46, 0x41, 0x31}; @@ -63,17 +63,17 @@ enum class PuffinFlag : uint8_t { }; /// \brief Check if a flag is set in the flags bytes. -ICEBERG_DATA_EXPORT bool IsFlagSet(std::span flags, PuffinFlag flag); +ICEBERG_EXPORT bool IsFlagSet(std::span flags, PuffinFlag flag); /// \brief Set a flag in the flags bytes. -ICEBERG_DATA_EXPORT void SetFlag(std::span flags, PuffinFlag flag); +ICEBERG_EXPORT void SetFlag(std::span flags, PuffinFlag flag); /// \brief Compress data using the specified codec. -ICEBERG_DATA_EXPORT Result> Compress( - PuffinCompressionCodec codec, std::span input); +ICEBERG_EXPORT Result> Compress(PuffinCompressionCodec codec, + std::span input); /// \brief Decompress data using the specified codec. -ICEBERG_DATA_EXPORT Result> Decompress( +ICEBERG_EXPORT Result> Decompress( PuffinCompressionCodec codec, std::span input); } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.h b/src/iceberg/puffin/puffin_reader.h index 3b805426b..d2cd92e17 100644 --- a/src/iceberg/puffin/puffin_reader.h +++ b/src/iceberg/puffin/puffin_reader.h @@ -29,7 +29,7 @@ #include #include -#include "iceberg/iceberg_data_export.h" +#include "iceberg/iceberg_export.h" #include "iceberg/puffin/file_metadata.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -39,7 +39,7 @@ namespace iceberg::puffin { /// \brief Reader for Puffin files. /// /// Reads from an InputFile with seek support for efficient blob access. -class ICEBERG_DATA_EXPORT PuffinReader { +class ICEBERG_EXPORT PuffinReader { public: /// \brief Create a PuffinReader for the given input file. /// \param input_file The input file to read from. diff --git a/src/iceberg/puffin/puffin_writer.h b/src/iceberg/puffin/puffin_writer.h index 2a7984091..b9d49bdcc 100644 --- a/src/iceberg/puffin/puffin_writer.h +++ b/src/iceberg/puffin/puffin_writer.h @@ -30,7 +30,7 @@ #include #include -#include "iceberg/iceberg_data_export.h" +#include "iceberg/iceberg_export.h" #include "iceberg/puffin/file_metadata.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -40,7 +40,7 @@ namespace iceberg::puffin { /// \brief Writer for Puffin files. /// /// Writes blobs and footer to an OutputFile stream. -class ICEBERG_DATA_EXPORT PuffinWriter { +class ICEBERG_EXPORT PuffinWriter { public: /// \brief Create a PuffinWriter for the given output file. /// \param output_file The output file to write to. diff --git a/src/iceberg/test/merging_snapshot_update_test.cc b/src/iceberg/test/merging_snapshot_update_test.cc index 69ee10b91..60de65971 100644 --- a/src/iceberg/test/merging_snapshot_update_test.cc +++ b/src/iceberg/test/merging_snapshot_update_test.cc @@ -29,11 +29,16 @@ #include "iceberg/avro/avro_register.h" #include "iceberg/constants.h" +#include "iceberg/deletes/roaring_position_bitmap.h" #include "iceberg/expression/expressions.h" +#include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_reader.h" #include "iceberg/manifest/manifest_writer.h" #include "iceberg/partition_spec.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_reader.h" +#include "iceberg/puffin/puffin_writer.h" #include "iceberg/row/partition_values.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" @@ -43,12 +48,15 @@ #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" #include "iceberg/transaction.h" -#include "iceberg/update/fast_append.h" -#include "iceberg/update/update_properties.h" +#include "iceberg/update/fast_append.h" // IWYU pragma: keep +#include "iceberg/update/update_properties.h" // IWYU pragma: keep #include "iceberg/util/macros.h" namespace iceberg { +static_assert(sizeof(FastAppend) > 0); +static_assert(sizeof(UpdateProperties) > 0); + /// \brief Concrete subclass of MergingSnapshotUpdate for testing. class TestMergeAppend : public MergingSnapshotUpdate { public: @@ -246,6 +254,60 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase { return f; } + Result> WriteDeletionVector( + const std::string& path, const std::vector& positions, + const std::shared_ptr& data_file) { + RoaringPositionBitmap bitmap; + for (auto position : positions) { + bitmap.Add(position); + } + ICEBERG_ASSIGN_OR_RAISE(auto serialized, bitmap.Serialize()); + + auto file_path = table_location_ + path; + ICEBERG_ASSIGN_OR_RAISE(auto output, file_io_->NewOutputFile(file_path)); + ICEBERG_ASSIGN_OR_RAISE(auto writer, puffin::PuffinWriter::Make(std::move(output))); + std::vector bytes(serialized.begin(), serialized.end()); + ICEBERG_ASSIGN_OR_RAISE( + auto blob_metadata, + writer->Write(puffin::Blob{ + .type = std::string(puffin::StandardBlobTypes::kDeletionVectorV1), + .input_fields = {}, + .snapshot_id = 1, + .sequence_number = 7, + .data = std::move(bytes), + })); + ICEBERG_RETURN_UNEXPECTED(writer->Finish()); + ICEBERG_ASSIGN_OR_RAISE(auto file_size, writer->FileSize()); + + auto delete_file = MakeDeleteFile(path, 1L); + delete_file->partition = data_file->partition; + delete_file->partition_spec_id = data_file->partition_spec_id; + delete_file->file_format = FileFormatType::kPuffin; + delete_file->record_count = static_cast(positions.size()); + delete_file->file_size_in_bytes = file_size; + delete_file->referenced_data_file = data_file->file_path; + delete_file->content_offset = blob_metadata.offset; + delete_file->content_size_in_bytes = blob_metadata.length; + return delete_file; + } + + Result ReadDeletionVector(const DataFile& delete_file) { + ICEBERG_ASSIGN_OR_RAISE(auto input, file_io_->NewInputFile(delete_file.file_path)); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + puffin::PuffinReader::Make(std::move(input), std::nullopt, + delete_file.file_size_in_bytes)); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, reader->ReadFileMetadata()); + auto blob_it = std::ranges::find_if(metadata.blobs, [&](const auto& blob) { + return blob.type == puffin::StandardBlobTypes::kDeletionVectorV1 && + blob.offset == delete_file.content_offset.value() && + blob.length == delete_file.content_size_in_bytes.value(); + }); + ICEBERG_PRECHECK(blob_it != metadata.blobs.end(), "DV blob not found"); + ICEBERG_ASSIGN_OR_RAISE(auto blob, reader->ReadBlob(*blob_it)); + return RoaringPositionBitmap::Deserialize(std::string_view( + reinterpret_cast(blob.second.data()), blob.second.size())); + } + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, int64_t partition_x) { auto f = MakeDeleteFile(path, partition_x); @@ -805,6 +867,68 @@ TEST_F(MergingSnapshotUpdateTest, ValidateNewDeleteFileV3AllowsDeletionVector) { EXPECT_THAT(op->AddDelete(del_file), IsOk()); } +TEST_F(MergingSnapshotUpdateTest, ApplyMergesDuplicateDeletionVectors) { + SetTableFormatVersion(3); + + ICEBERG_UNWRAP_OR_FAIL(auto first_dv, + WriteDeletionVector("/delete/dv_a.puffin", {1, 3}, file_a_)); + ICEBERG_UNWRAP_OR_FAIL(auto second_dv, + WriteDeletionVector("/delete/dv_b.puffin", {3, 5}, file_a_)); + ICEBERG_UNWRAP_OR_FAIL(auto third_dv, + WriteDeletionVector("/delete/dv_c.puffin", {7}, file_b_)); + ICEBERG_UNWRAP_OR_FAIL(auto fourth_dv, + WriteDeletionVector("/delete/dv_d.puffin", {9}, file_b_)); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend()); + EXPECT_THAT(op->AddDelete(first_dv, 7), IsOk()); + EXPECT_THAT(op->AddDelete(second_dv, 7), IsOk()); + EXPECT_THAT(op->AddDelete(third_dv, 7), IsOk()); + EXPECT_THAT(op->AddDelete(fourth_dv, 7), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto apply, static_cast(*op).Apply()); + SnapshotCache snapshot_cache(apply.snapshot.get()); + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, snapshot_cache.DeleteManifests(file_io_)); + std::vector delete_manifest_vector(delete_manifests.begin(), + delete_manifests.end()); + ICEBERG_UNWRAP_OR_FAIL(auto entries, + ReadAllEntries(delete_manifest_vector, *table_->metadata())); + + ASSERT_EQ(entries.size(), 2U); + ASSERT_NE(entries[0].data_file, nullptr); + ASSERT_NE(entries[1].data_file, nullptr); + const DataFile* merged_a = nullptr; + const DataFile* merged_b = nullptr; + for (const auto& entry : entries) { + const auto& merged_dv = *entry.data_file; + EXPECT_EQ(merged_dv.file_format, FileFormatType::kPuffin); + if (merged_dv.referenced_data_file == file_a_->file_path) { + merged_a = &merged_dv; + } else if (merged_dv.referenced_data_file == file_b_->file_path) { + merged_b = &merged_dv; + } + } + ASSERT_NE(merged_a, nullptr); + ASSERT_NE(merged_b, nullptr); + EXPECT_EQ(merged_a->file_path, merged_b->file_path); + EXPECT_NE(merged_a->content_offset, merged_b->content_offset); + EXPECT_NE(merged_a->file_path, first_dv->file_path); + EXPECT_NE(merged_a->file_path, second_dv->file_path); + EXPECT_NE(merged_b->file_path, third_dv->file_path); + EXPECT_NE(merged_b->file_path, fourth_dv->file_path); + EXPECT_EQ(merged_a->record_count, 3); + EXPECT_EQ(merged_b->record_count, 2); + + ICEBERG_UNWRAP_OR_FAIL(auto bitmap_a, ReadDeletionVector(*merged_a)); + EXPECT_EQ(bitmap_a.Cardinality(), 3U); + EXPECT_TRUE(bitmap_a.Contains(1)); + EXPECT_TRUE(bitmap_a.Contains(3)); + EXPECT_TRUE(bitmap_a.Contains(5)); + ICEBERG_UNWRAP_OR_FAIL(auto bitmap_b, ReadDeletionVector(*merged_b)); + EXPECT_EQ(bitmap_b.Cardinality(), 2U); + EXPECT_TRUE(bitmap_b.Contains(7)); + EXPECT_TRUE(bitmap_b.Contains(9)); +} + TEST_F(MergingSnapshotUpdateTest, ValidateNewDeleteFileRejectsUnsupportedVersion) { SetTableFormatVersion(TableMetadata::kSupportedTableFormatVersion + 1); diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 049b0f49d..1023f4f9d 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -22,10 +22,9 @@ #include #include -#include "iceberg/catalog.h" -#include "iceberg/schema.h" +#include "iceberg/catalog.h" // IWYU pragma: keep +#include "iceberg/location_provider.h" #include "iceberg/snapshot.h" -#include "iceberg/statistics_file.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -81,6 +80,11 @@ const TableMetadata& TransactionContext::current() const { return metadata_builder->current(); } +Result TransactionContext::NewDataLocation(std::string_view filename) const { + ICEBERG_ASSIGN_OR_RAISE(auto location_provider, table->location_provider()); + return location_provider->NewDataLocation(filename); +} + std::string TransactionContext::MetadataFileLocation(std::string_view filename) const { const auto metadata_location = current().properties.Get(TableProperties::kWriteMetadataLocation); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 60fe935f3..5b6c8a5c4 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -169,6 +169,7 @@ class ICEBERG_EXPORT TransactionContext { const TableMetadata* base() const; const TableMetadata& current() const; std::string MetadataFileLocation(std::string_view filename) const; + Result NewDataLocation(std::string_view filename) const; std::shared_ptr table; std::unique_ptr metadata_builder; diff --git a/src/iceberg/update/merging_snapshot_update.cc b/src/iceberg/update/merging_snapshot_update.cc index a0d882d14..4bd3c9a4b 100644 --- a/src/iceberg/update/merging_snapshot_update.cc +++ b/src/iceberg/update/merging_snapshot_update.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -29,9 +30,11 @@ #include "iceberg/constants.h" #include "iceberg/delete_file_index.h" +#include "iceberg/deletes/roaring_position_bitmap.h" #include "iceberg/expression/expressions.h" #include "iceberg/expression/manifest_evaluator.h" #include "iceberg/expression/projections.h" +#include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_group.h" #include "iceberg/manifest/manifest_list.h" @@ -39,9 +42,12 @@ #include "iceberg/manifest/manifest_util_internal.h" #include "iceberg/manifest/manifest_writer.h" #include "iceberg/partition_spec.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_reader.h" +#include "iceberg/puffin/puffin_writer.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" -#include "iceberg/table.h" +#include "iceberg/table.h" // IWYU pragma: keep #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/transaction.h" @@ -743,9 +749,12 @@ Result> MergingSnapshotUpdate::WriteNewDataManifests() } Result> -MergingSnapshotUpdate::MergeDVs() const { +MergingSnapshotUpdate::MergeDVs() { std::vector result; result.reserve(dvs_by_referenced_file_.size()); + std::unique_ptr writer; + std::string merged_dv_path; + std::vector> merged_files; for (const auto& entry : dvs_by_referenced_file_.entries()) { const auto& referenced_file = entry.referenced_file; @@ -754,20 +763,110 @@ MergingSnapshotUpdate::MergeDVs() const { continue; } if (dvs.size() > 1) { - // TODO(Guotao): Merge duplicate DVs for one referenced data file once C++ - // has DVUtil/Puffin DV rewriting; Java merges them before writing manifests. - return NotImplemented( - "Merging multiple deletion vectors is not supported yet for referenced " - "data file: {}", - referenced_file); + if (!writer) { + ICEBERG_ASSIGN_OR_RAISE(merged_dv_path, ctx_->NewDataLocation(std::format( + "merged-dvs-{}-{}.puffin", + SnapshotId(), merged_dv_count_++))); + ICEBERG_ASSIGN_OR_RAISE(auto output, + ctx_->table->io()->NewOutputFile(merged_dv_path)); + ICEBERG_ASSIGN_OR_RAISE(writer, puffin::PuffinWriter::Make(std::move(output))); + } + ICEBERG_ASSIGN_OR_RAISE( + auto merged, + MergeDVsForReferencedFile(referenced_file, dvs, *writer, merged_dv_path)); + merged_files.push_back(merged.file); + merged_dvs_.push_back(merged); + result.push_back(std::move(merged)); + continue; } result.push_back(dvs.front()); } + if (writer) { + ICEBERG_RETURN_UNEXPECTED(writer->Finish()); + ICEBERG_ASSIGN_OR_RAISE(auto file_size, writer->FileSize()); + for (const auto& file : merged_files) { + file->file_size_in_bytes = file_size; + } + } + return result; } +Result +MergingSnapshotUpdate::MergeDVsForReferencedFile( + const std::string& referenced_file, const std::vector& dvs, + puffin::PuffinWriter& writer, const std::string& path) { + const auto& first_file = dvs.front().file; + const auto first_data_sequence_number = dvs.front().data_sequence_number; + const auto first_spec_id = first_file->partition_spec_id; + RoaringPositionBitmap bitmap; + + for (const auto& dv : dvs) { + ICEBERG_PRECHECK(dv.data_sequence_number == first_data_sequence_number, + "Cannot merge DVs, mismatched sequence numbers for {}", + referenced_file); + ICEBERG_PRECHECK(dv.file->partition_spec_id == first_spec_id, + "Cannot merge DVs, mismatched partition specs for {}", + referenced_file); + ICEBERG_PRECHECK(dv.file->partition == first_file->partition, + "Cannot merge DVs, mismatched partition tuples for {}", + referenced_file); + ICEBERG_PRECHECK(dv.file->content_offset.has_value(), + "DV must have a content offset: {}", dv.file->file_path); + ICEBERG_PRECHECK(dv.file->content_size_in_bytes.has_value(), + "DV must have a content size: {}", dv.file->file_path); + + ICEBERG_ASSIGN_OR_RAISE(auto input, + ctx_->table->io()->NewInputFile(dv.file->file_path)); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + puffin::PuffinReader::Make(std::move(input), std::nullopt, + dv.file->file_size_in_bytes)); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, reader->ReadFileMetadata()); + auto blob_it = std::ranges::find_if(metadata.blobs, [&](const auto& blob) { + return blob.type == puffin::StandardBlobTypes::kDeletionVectorV1 && + blob.offset == dv.file->content_offset.value() && + blob.length == dv.file->content_size_in_bytes.value(); + }); + ICEBERG_PRECHECK(blob_it != metadata.blobs.end(), + "Cannot find DV blob at offset {} with length {} in {}", + dv.file->content_offset.value(), + dv.file->content_size_in_bytes.value(), dv.file->file_path); + ICEBERG_ASSIGN_OR_RAISE(auto blob, reader->ReadBlob(*blob_it)); + ICEBERG_ASSIGN_OR_RAISE( + auto dv_bitmap, + RoaringPositionBitmap::Deserialize(std::string_view( + reinterpret_cast(blob.second.data()), blob.second.size()))); + bitmap.Or(dv_bitmap); + } + + bitmap.Optimize(); + ICEBERG_ASSIGN_OR_RAISE(auto serialized, bitmap.Serialize()); + + ICEBERG_ASSIGN_OR_RAISE( + auto blob_metadata, + writer.Write(puffin::Blob{ + .type = std::string(puffin::StandardBlobTypes::kDeletionVectorV1), + .input_fields = {}, + .snapshot_id = SnapshotId(), + .sequence_number = first_data_sequence_number.value_or(0), + .data = {serialized.begin(), serialized.end()}, + })); + + auto merged_file = std::make_shared(*first_file); + merged_file->file_path = path; + merged_file->record_count = static_cast(bitmap.Cardinality()); + merged_file->referenced_data_file = referenced_file; + merged_file->content_offset = blob_metadata.offset; + merged_file->content_size_in_bytes = blob_metadata.length; + + return PendingDeleteFile{ + .file = std::move(merged_file), + .data_sequence_number = first_data_sequence_number, + }; +} + Result> MergingSnapshotUpdate::WriteNewDeleteManifests() { // If new files were staged after the cache was populated (commit retry), invalidate. if (has_new_delete_files_ && !cached_new_delete_manifests_.empty()) { @@ -775,6 +874,13 @@ Result> MergingSnapshotUpdate::WriteNewDeleteManifests std::ignore = DeleteFile(m.manifest_path); } cached_new_delete_manifests_.clear(); + std::unordered_set deleted_merged_dv_paths; + for (const auto& dv : merged_dvs_) { + if (deleted_merged_dv_paths.insert(dv.file->file_path).second) { + std::ignore = DeleteFile(dv.file->file_path); + } + } + merged_dvs_.clear(); added_delete_files_summary_.Clear(); } @@ -988,6 +1094,15 @@ Status MergingSnapshotUpdate::CleanUncommittedAppends( // rewritten_append_manifests_ are always owned by the table. ICEBERG_RETURN_UNEXPECTED( DeleteUncommitted(rewritten_append_manifests_, committed, /*clear=*/false)); + if (committed.empty()) { + std::unordered_set deleted_merged_dv_paths; + for (const auto& dv : merged_dvs_) { + if (deleted_merged_dv_paths.insert(dv.file->file_path).second) { + std::ignore = DeleteFile(dv.file->file_path); + } + } + } + merged_dvs_.clear(); // append_manifests_ are only owned by the table if the commit succeeded. if (!committed.empty()) { diff --git a/src/iceberg/update/merging_snapshot_update.h b/src/iceberg/update/merging_snapshot_update.h index 5d4e128e9..0b830e673 100644 --- a/src/iceberg/update/merging_snapshot_update.h +++ b/src/iceberg/update/merging_snapshot_update.h @@ -40,6 +40,10 @@ namespace iceberg { +namespace puffin { +class PuffinWriter; +} + /// \brief Abstract base class for all merge-based snapshot write operations. /// /// Provides the complete filter → write → merge pipeline that all merge-based @@ -333,7 +337,11 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { void SetSummaryProperty(const std::string& property, const std::string& value) override; - Result> MergeDVs() const; + Result> MergeDVs(); + + Result MergeDVsForReferencedFile( + const std::string& referenced_file, const std::vector& dvs, + puffin::PuffinWriter& writer, const std::string& path); /// \brief Write new data manifests for staged data files; caches the result. Result> WriteNewDataManifests(); @@ -380,6 +388,8 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { std::vector cached_new_data_manifests_; std::vector cached_new_delete_manifests_; + std::vector merged_dvs_; + int32_t merged_dv_count_ = 0; }; } // namespace iceberg