From ea20ee3e517c1f2a2d2d18afa0babb5e7268c082 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sat, 6 Jun 2026 21:05:15 +0800 Subject: [PATCH] fix: align manifest partition reads with reassigned IDs Use unknown types for missing partition source fields when building manifest partition schemas. Add Java-style schema ID reassignment maps and use them to map raw manifest partition field IDs back to their original IDs. Also add transform result type support so unbound partition specs can derive partition field types consistently with Java. --- src/iceberg/manifest/manifest_reader.cc | 2 +- src/iceberg/partition_spec.cc | 43 +++++--- src/iceberg/partition_spec.h | 3 + src/iceberg/schema.cc | 121 ++++++++++++++++++++- src/iceberg/schema.h | 25 ++++- src/iceberg/test/assign_id_visitor_test.cc | 51 +++++++++ src/iceberg/test/expire_snapshots_test.cc | 6 +- src/iceberg/test/manifest_reader_test.cc | 20 ++++ src/iceberg/test/partition_spec_test.cc | 53 +++++++++ src/iceberg/test/transform_test.cc | 12 +- src/iceberg/transform.cc | 20 ++++ src/iceberg/transform.h | 3 + src/iceberg/update/expire_snapshots.cc | 4 - 13 files changed, 328 insertions(+), 35 deletions(-) diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 6c166df53..bc7572591 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -861,7 +861,7 @@ Result> ManifestReaderImpl::LiveEntries() { } Result> ManifestReaderImpl::ReadEntries(bool only_live) { - ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->PartitionType(*schema_)); + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->RawPartitionType(*schema_)); auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema(); std::shared_ptr projected_data_file_schema; diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index c00eab7d2..ed5aa831a 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -74,24 +74,16 @@ Result> PartitionSpec::PartitionType( std::vector partition_fields; for (const auto& partition_field : fields_) { - // Get the source field from the original schema by source_id ICEBERG_ASSIGN_OR_RAISE(auto source_field, schema.FindFieldById(partition_field.source_id())); - if (!source_field.has_value()) { - // TODO(xiao.dong) when source field is missing, - // should return an error or just use UNKNOWN type - return InvalidSchema("Cannot find source field for partition field:{}", - partition_field.field_id()); + std::shared_ptr result_type; + if (source_field.has_value()) { + auto source_field_type = source_field.value().get().type(); + result_type = partition_field.transform()->ResultType(std::move(source_field_type)); + } else { + result_type = unknown(); } - auto source_field_type = source_field.value().get().type(); - // Bind the transform to the source field type to get the result type - ICEBERG_ASSIGN_OR_RAISE(auto transform_function, - partition_field.transform()->Bind(source_field_type)); - - auto result_type = transform_function->ResultType(); - // Create the partition field with the transform result type - // Partition fields are always optional (can be null) partition_fields.emplace_back(partition_field.field_id(), std::string(partition_field.name()), std::move(result_type), @@ -101,6 +93,29 @@ Result> PartitionSpec::PartitionType( return std::make_unique(std::move(partition_fields)); } +Result> PartitionSpec::RawPartitionType( + const Schema& schema) const { + const auto& ids_to_original = schema.IdsToOriginal(); + if (ids_to_original.empty()) { + return PartitionType(schema); + } + + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, PartitionType(schema)); + std::vector raw_partition_fields; + raw_partition_fields.reserve(partition_type->fields().size()); + for (const auto& field : partition_type->fields()) { + auto original_id = ids_to_original.find(field.field_id()); + if (original_id == ids_to_original.end()) { + return InvalidSchema("Cannot find original field ID for reassigned field ID: {}", + field.field_id()); + } + raw_partition_fields.emplace_back(original_id->second, std::string(field.name()), + field.type(), field.optional(), + std::string(field.doc())); + } + return std::make_unique(std::move(raw_partition_fields)); +} + Result PartitionSpec::PartitionPath(const PartitionValues& data) const { ICEBERG_PRECHECK(fields_.size() == data.num_fields(), "Partition spec and data mismatch, expected field num {}, got {}", diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index 0fb8814b8..2ed0ddc8a 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -64,6 +64,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Get the partition type binding to the input schema. Result> PartitionType(const Schema& schema) const; + /// \brief Get the partition type as physically written in manifest files. + Result> RawPartitionType(const Schema& schema) const; + /// \brief Get the partition path for the given partition data. Result PartitionPath(const PartitionValues& data) const; diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 5e60b551f..fcac43c78 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -35,8 +35,103 @@ namespace iceberg { +struct SchemaReassignIdContext { + Schema::IdMap ids_to_reassigned; + Schema::IdMap ids_to_original; +}; + namespace { +const Schema::IdMap& EmptyIdMap() { + static const Schema::IdMap kEmpty; + return kEmpty; +} + +void RecordIdReassignment(int32_t old_id, int32_t new_id, + Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original) { + if (new_id != old_id) { + ids_to_reassigned[old_id] = new_id; + ids_to_original[new_id] = old_id; + } +} + +SchemaField ReassignField(const SchemaField& field, int32_t new_id, + const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original); + +std::shared_ptr ReassignTypeIds(const std::shared_ptr& type, + const Schema::GetId& get_id, + Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original) { + switch (type->type_id()) { + case TypeId::kStruct: { + const auto& struct_type = static_cast(*type); + const auto& fields = struct_type.fields(); + std::vector new_ids; + new_ids.reserve(fields.size()); + for (const auto& field : fields) { + const auto new_id = get_id(field.field_id()); + RecordIdReassignment(field.field_id(), new_id, ids_to_reassigned, + ids_to_original); + new_ids.push_back(new_id); + } + + std::vector reassigned_fields; + reassigned_fields.reserve(fields.size()); + for (size_t i = 0; i < fields.size(); ++i) { + reassigned_fields.emplace_back(ReassignField(fields[i], new_ids[i], get_id, + ids_to_reassigned, ids_to_original)); + } + return std::make_shared(std::move(reassigned_fields)); + } + case TypeId::kList: { + const auto& list_type = static_cast(*type); + const auto& element = list_type.element(); + const auto new_id = get_id(element.field_id()); + RecordIdReassignment(element.field_id(), new_id, ids_to_reassigned, + ids_to_original); + return std::make_shared( + ReassignField(element, new_id, get_id, ids_to_reassigned, ids_to_original)); + } + case TypeId::kMap: { + const auto& map_type = static_cast(*type); + const auto& key = map_type.key(); + const auto& value = map_type.value(); + const auto new_key_id = get_id(key.field_id()); + const auto new_value_id = get_id(value.field_id()); + RecordIdReassignment(key.field_id(), new_key_id, ids_to_reassigned, + ids_to_original); + RecordIdReassignment(value.field_id(), new_value_id, ids_to_reassigned, + ids_to_original); + return std::make_shared( + ReassignField(key, new_key_id, get_id, ids_to_reassigned, ids_to_original), + ReassignField(value, new_value_id, get_id, ids_to_reassigned, ids_to_original)); + } + default: + return type; + } +} + +SchemaField ReassignField(const SchemaField& field, int32_t new_id, + const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned, + Schema::IdMap& ids_to_original) { + return {new_id, std::string(field.name()), + ReassignTypeIds(field.type(), get_id, ids_to_reassigned, ids_to_original), + field.optional(), std::string(field.doc())}; +} + +std::vector ReassignIds(std::vector fields, + const Schema::GetId& get_id, + SchemaReassignIdContext& reassign_id_context) { + auto reassigned_type = ReassignTypeIds(std::make_shared(std::move(fields)), + get_id, reassign_id_context.ids_to_reassigned, + reassign_id_context.ids_to_original); + const auto& reassigned_fields = + internal::checked_cast(*reassigned_type).fields(); + return {reassigned_fields.begin(), reassigned_fields.end()}; +} + Status ValidateFieldNullability(const Type& type) { auto validate_field = [&](const SchemaField& field) -> Status { ICEBERG_PRECHECK(field.optional() || field.type()->type_id() != TypeId::kUnknown, @@ -73,17 +168,23 @@ Status ValidateFieldNullability(const Type& type) { } // namespace -Schema::Schema(std::vector fields, int32_t schema_id) +Schema::Schema(std::vector fields, int32_t schema_id, GetId get_id) : StructType(std::move(fields)), schema_id_(schema_id), - cache_(std::make_unique(this)) {} + cache_(std::make_unique(this)) { + if (get_id) { + reassign_id_context_ = std::make_unique(); + fields_ = ReassignIds(std::move(fields_), get_id, *reassign_id_context_); + } +} Schema::~Schema() = default; Result> Schema::Make(std::vector fields, int32_t schema_id, - std::vector identifier_field_ids) { - auto schema = std::make_unique(std::move(fields), schema_id); + std::vector identifier_field_ids, + GetId get_id) { + auto schema = std::make_unique(std::move(fields), schema_id, std::move(get_id)); if (!identifier_field_ids.empty()) { auto id_to_parent = IndexParents(*schema); @@ -99,8 +200,8 @@ Result> Schema::Make(std::vector fields, Result> Schema::Make( std::vector fields, int32_t schema_id, - const std::vector& identifier_field_names) { - auto schema = std::make_unique(std::move(fields), schema_id); + const std::vector& identifier_field_names, GetId get_id) { + auto schema = std::make_unique(std::move(fields), schema_id, std::move(get_id)); std::vector fresh_identifier_ids; for (const auto& name : identifier_field_names) { @@ -181,6 +282,14 @@ const std::shared_ptr& Schema::EmptySchema() { int32_t Schema::schema_id() const { return schema_id_; } +const Schema::IdMap& Schema::IdsToReassigned() const { + return reassign_id_context_ ? reassign_id_context_->ids_to_reassigned : EmptyIdMap(); +} + +const Schema::IdMap& Schema::IdsToOriginal() const { + return reassign_id_context_ ? reassign_id_context_->ids_to_original : EmptyIdMap(); +} + std::string Schema::ToString() const { std::string repr = "schema<"; for (const auto& field : fields_) { diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h index 791ed5c8f..9245be02e 100644 --- a/src/iceberg/schema.h +++ b/src/iceberg/schema.h @@ -24,6 +24,7 @@ /// and any utility functions. See iceberg/type.h and iceberg/field.h as well. #include +#include #include #include #include @@ -40,6 +41,7 @@ namespace iceberg { class SchemaCache; +struct SchemaReassignIdContext; /// \brief A schema for a Table. /// @@ -55,7 +57,14 @@ class ICEBERG_EXPORT Schema : public StructType { /// \brief Special value to select all columns from manifest files. static constexpr std::string_view kAllColumns = "*"; - explicit Schema(std::vector fields, int32_t schema_id = kInitialSchemaId); + /// \brief Maps an original field ID to its reassigned ID. + /// + /// The mapping is total: return the original ID when no reassignment is needed. + using GetId = std::function; + using IdMap = std::unordered_map; + + explicit Schema(std::vector fields, int32_t schema_id = kInitialSchemaId, + GetId get_id = {}); ~Schema() override; @@ -64,10 +73,12 @@ class ICEBERG_EXPORT Schema : public StructType { /// \param fields The fields that make up the schema. /// \param schema_id The unique identifier for this schema (default:kInitialSchemaId). /// \param identifier_field_ids Field IDs that uniquely identify rows in the table. + /// \param get_id Function mapping each original field ID to its reassigned ID. /// \return A new Schema instance or Status if failed. static Result> Make(std::vector fields, int32_t schema_id, - std::vector identifier_field_ids); + std::vector identifier_field_ids, + GetId get_id = {}); /// \brief Create a schema. /// @@ -75,10 +86,11 @@ class ICEBERG_EXPORT Schema : public StructType { /// \param schema_id The unique identifier for this schema (default: kInitialSchemaId). /// \param identifier_field_names Canonical names of fields that uniquely identify rows /// in the table. + /// \param get_id Function mapping each original field ID to its reassigned ID. /// \return A new Schema instance or Status if failed. static Result> Make( std::vector fields, int32_t schema_id, - const std::vector& identifier_field_names); + const std::vector& identifier_field_names, GetId get_id = {}); /// \brief Validate that the identifier field with the given ID is valid for the schema /// @@ -166,6 +178,12 @@ class ICEBERG_EXPORT Schema : public StructType { /// \brief Return the field IDs of the identifier fields. const std::vector& IdentifierFieldIds() const; + /// \brief Return a map of original field IDs to reassigned field IDs. + const IdMap& IdsToReassigned() const; + + /// \brief Return a map of reassigned field IDs to original field IDs. + const IdMap& IdsToOriginal() const; + /// \brief Return the canonical field names of the identifier fields. Result> IdentifierFieldNames() const; @@ -196,6 +214,7 @@ class ICEBERG_EXPORT Schema : public StructType { const int32_t schema_id_; // Field IDs that uniquely identify rows in the table. std::vector identifier_field_ids_; + std::unique_ptr reassign_id_context_; // Cache for schema mappings to facilitate fast lookups. std::unique_ptr cache_; }; diff --git a/src/iceberg/test/assign_id_visitor_test.cc b/src/iceberg/test/assign_id_visitor_test.cc index 8bec0ad53..4830e4803 100644 --- a/src/iceberg/test/assign_id_visitor_test.cc +++ b/src/iceberg/test/assign_id_visitor_test.cc @@ -106,6 +106,8 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) { }, Schema::kInitialSchemaId), *fresh_schema); + EXPECT_TRUE(fresh_schema->IdsToReassigned().empty()); + EXPECT_TRUE(fresh_schema->IdsToOriginal().empty()); } TEST(AssignFreshIdVisitorTest, NestedSchema) { @@ -169,6 +171,55 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) { EXPECT_EQ(*expect_nested_struct_type, *nested_struct_type); } +TEST(AssignFreshIdVisitorTest, GetIdMaps) { + ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema()); + std::vector fields(schema->fields().begin(), schema->fields().end()); + auto reassign_id = [](int32_t old_id) { return old_id + 1000; }; + + Schema reassigned_schema(std::move(fields), Schema::kInitialSchemaId, reassign_id); + + EXPECT_EQ(reassigned_schema.fields()[0].field_id(), 1010); + EXPECT_EQ(reassigned_schema.fields()[1].field_id(), 1020); + auto list_type = + std::dynamic_pointer_cast(reassigned_schema.fields()[1].type()); + ASSERT_TRUE(list_type); + EXPECT_EQ(list_type->element().field_id(), 1101); + + EXPECT_EQ(reassigned_schema.IdsToReassigned().size(), 15U); + EXPECT_THAT( + reassigned_schema.IdsToReassigned(), + testing::UnorderedElementsAre( + testing::Pair(10, 1010), testing::Pair(20, 1020), testing::Pair(30, 1030), + testing::Pair(40, 1040), testing::Pair(101, 1101), testing::Pair(102, 1102), + testing::Pair(103, 1103), testing::Pair(201, 1201), testing::Pair(202, 1202), + testing::Pair(203, 1203), testing::Pair(204, 1204), testing::Pair(301, 1301), + testing::Pair(302, 1302), testing::Pair(303, 1303), testing::Pair(304, 1304))); + EXPECT_THAT( + reassigned_schema.IdsToOriginal(), + testing::UnorderedElementsAre( + testing::Pair(1010, 10), testing::Pair(1020, 20), testing::Pair(1030, 30), + testing::Pair(1040, 40), testing::Pair(1101, 101), testing::Pair(1102, 102), + testing::Pair(1103, 103), testing::Pair(1201, 201), testing::Pair(1202, 202), + testing::Pair(1203, 203), testing::Pair(1204, 204), testing::Pair(1301, 301), + testing::Pair(1302, 302), testing::Pair(1303, 303), testing::Pair(1304, 304))); +} + +TEST(AssignFreshIdVisitorTest, GetIdIdentifierNames) { + ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema()); + std::vector fields(schema->fields().begin(), schema->fields().end()); + auto reassign_id = [](int32_t old_id) { return old_id + 1000; }; + + ICEBERG_UNWRAP_OR_FAIL( + auto reassigned_schema, + Schema::Make(std::move(fields), Schema::kInitialSchemaId, + std::vector{"id", "struct.outer_id"}, reassign_id)); + + EXPECT_THAT(reassigned_schema->IdentifierFieldIds(), testing::ElementsAre(1010, 1301)); + ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names, + reassigned_schema->IdentifierFieldNames()); + EXPECT_THAT(identifier_field_names, testing::ElementsAre("id", "struct.outer_id")); +} + TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) { int32_t id = 0; auto next_id = [&id]() { return ++id; }; diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index 3a99b0009..274a8cb44 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -752,7 +752,7 @@ TEST_F(ExpireSnapshotsCleanupTest, IncrementalSkipsCherryPickedSnapshotCleanup) EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, kCurrentSnapshotId); } -TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpiredSpec) { +TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupReadsExpiredSpecWithMissingSource) { const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; const auto expired_manifest_list_path = @@ -796,9 +796,9 @@ TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpire [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); EXPECT_THAT(update->Commit(), IsOk()); - EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path, + EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, + expired_data_manifest_path, expired_manifest_list_path)); - EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path))); } TEST_F(ExpireSnapshotsCleanupTest, CommitIgnoresMalformedSourceSnapshotIdCleanup) { diff --git a/src/iceberg/test/manifest_reader_test.cc b/src/iceberg/test/manifest_reader_test.cc index f2c4ef406..4a17eacaa 100644 --- a/src/iceberg/test/manifest_reader_test.cc +++ b/src/iceberg/test/manifest_reader_test.cc @@ -288,6 +288,26 @@ TEST_P(TestManifestReader, TestManifestReaderWithPartitionMetadata) { EXPECT_EQ(read_entry.data_file->partition.values()[0], Literal::Int(0)); } +TEST_P(TestManifestReader, ReadsEntriesWhenPartitionSourceFieldIsMissing) { + auto version = GetParam(); + auto file = MakeDataFile("/path/to/historical-data.parquet", + PartitionValues({Literal::Int(7)})); + auto manifest = + WriteManifest(version, /*snapshot_id=*/1000L, + {MakeEntry(ManifestStatus::kAdded, 1000L, std::move(file))}); + + auto current_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(/*field_id=*/3, "id", int32())}); + + ICEBERG_UNWRAP_OR_FAIL(auto reader, + ManifestReader::Make(manifest, file_io_, current_schema, spec_)); + ICEBERG_UNWRAP_OR_FAIL(auto read_entries, reader->Entries()); + + ASSERT_EQ(read_entries.size(), 1U); + EXPECT_EQ(read_entries[0].data_file->file_path, "/path/to/historical-data.parquet"); + EXPECT_EQ(read_entries[0].data_file->record_count, 1); +} + TEST_P(TestManifestReader, TestDeleteFilesWithReferences) { auto version = GetParam(); if (version < 2) { diff --git a/src/iceberg/test/partition_spec_test.cc b/src/iceberg/test/partition_spec_test.cc index 89c4cdc83..e3b86e274 100644 --- a/src/iceberg/test/partition_spec_test.cc +++ b/src/iceberg/test/partition_spec_test.cc @@ -149,6 +149,59 @@ TEST(PartitionSpecTest, PartitionTypeTest) { EXPECT_EQ(pt_field3, partition_type->fields()[2]); } +TEST(PartitionSpecTest, PartitionTypeMissingSource) { + Schema schema({SchemaField::MakeRequired(2, "ts", timestamp())}, + Schema::kInitialSchemaId); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, + PartitionSpec::Make( + 1, {PartitionField(1, 1000, "dropped_identity", Transform::Identity()), + PartitionField(3, 1001, "dropped_bucket", Transform::Bucket(16)), + PartitionField(2, 1002, "ts_day", Transform::Day())})); + + ICEBERG_UNWRAP_OR_FAIL(auto partition_type, spec->PartitionType(schema)); + + ASSERT_EQ(partition_type->fields().size(), 3U); + EXPECT_EQ(partition_type->fields()[0], + SchemaField::MakeOptional(1000, "dropped_identity", unknown())); + EXPECT_EQ(partition_type->fields()[1], + SchemaField::MakeOptional(1001, "dropped_bucket", unknown())); + EXPECT_EQ(partition_type->fields()[2], + SchemaField::MakeOptional(1002, "ts_day", date())); +} + +TEST(PartitionSpecTest, RawPartitionTypeNoReassign) { + Schema schema({SchemaField::MakeRequired(2, "ts", timestamp())}, + Schema::kInitialSchemaId); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, PartitionSpec::Make( + 1, {PartitionField(1, 1000, "dropped", Transform::Identity())})); + + ICEBERG_UNWRAP_OR_FAIL(auto partition_type, spec->PartitionType(schema)); + ICEBERG_UNWRAP_OR_FAIL(auto raw_partition_type, spec->RawPartitionType(schema)); + + EXPECT_EQ(*raw_partition_type, *partition_type); +} + +TEST(PartitionSpecTest, RawPartitionTypeReassignIds) { + auto reassign_id = [](int32_t old_id) { return old_id == 1000 ? 2000 : old_id; }; + Schema schema({SchemaField::MakeOptional(1000, "partition_col", int32())}, + Schema::kInitialSchemaId, reassign_id); + ICEBERG_UNWRAP_OR_FAIL( + auto spec, PartitionSpec::Make(1, {PartitionField(2000, 2000, "partition_col", + Transform::Identity())})); + + ICEBERG_UNWRAP_OR_FAIL(auto partition_type, spec->PartitionType(schema)); + ICEBERG_UNWRAP_OR_FAIL(auto raw_partition_type, spec->RawPartitionType(schema)); + + ASSERT_EQ(partition_type->fields().size(), 1U); + EXPECT_EQ(partition_type->fields()[0], + SchemaField::MakeOptional(2000, "partition_col", int32())); + ASSERT_EQ(raw_partition_type->fields().size(), 1U); + EXPECT_EQ(raw_partition_type->fields()[0], + SchemaField::MakeOptional(1000, "partition_col", int32())); +} + TEST(PartitionSpecTest, InvalidTransformForType) { // Test Day transform on string type (should fail) auto field_string = SchemaField::MakeRequired(6, "s", string()); diff --git a/src/iceberg/test/transform_test.cc b/src/iceberg/test/transform_test.cc index 47a1e87e6..943b2fb52 100644 --- a/src/iceberg/test/transform_test.cc +++ b/src/iceberg/test/transform_test.cc @@ -159,12 +159,16 @@ TEST(TransformResultTypeTest, PositiveCases) { ASSERT_TRUE(result.has_value()) << "Failed to parse: " << c.str; const auto& transform = result.value(); - const auto transformPtr = transform->Bind(c.source_type); - ASSERT_TRUE(transformPtr.has_value()) << "Failed to bind: " << c.str; - - auto result_type = transformPtr.value()->ResultType(); + auto result_type = transform->ResultType(c.source_type); + ASSERT_NE(result_type, nullptr) << "Missing result type for: " << c.str; EXPECT_EQ(result_type->type_id(), c.expected_result_type->type_id()) << "Unexpected result type for: " << c.str; + + const auto transform_func = transform->Bind(c.source_type); + ASSERT_TRUE(transform_func.has_value()) << "Failed to bind: " << c.str; + EXPECT_EQ(transform_func.value()->ResultType()->type_id(), + c.expected_result_type->type_id()) + << "Unexpected bound result type for: " << c.str; } } diff --git a/src/iceberg/transform.cc b/src/iceberg/transform.cc index c915ec067..453941c95 100644 --- a/src/iceberg/transform.cc +++ b/src/iceberg/transform.cc @@ -135,6 +135,26 @@ Result> Transform::Bind( } } +std::shared_ptr Transform::ResultType( + const std::shared_ptr& source_type) const { + switch (transform_type_) { + case TransformType::kIdentity: + case TransformType::kTruncate: + case TransformType::kVoid: + return source_type; + case TransformType::kBucket: + case TransformType::kYear: + case TransformType::kMonth: + case TransformType::kHour: + return int32(); + case TransformType::kDay: + return date(); + case TransformType::kUnknown: + return string(); + } + std::unreachable(); +} + bool Transform::CanTransform(const Type& source_type) const { switch (transform_type_) { case TransformType::kIdentity: diff --git a/src/iceberg/transform.h b/src/iceberg/transform.h index 873b3ca6e..6b855a74d 100644 --- a/src/iceberg/transform.h +++ b/src/iceberg/transform.h @@ -151,6 +151,9 @@ class ICEBERG_EXPORT Transform : public util::Formattable { Result> Bind( const std::shared_ptr& source_type) const; + /// \brief Return the type produced by this transform for a source type. + std::shared_ptr ResultType(const std::shared_ptr& source_type) const; + /// \brief Checks whether this function can be applied to the given Type. /// \param source_type The source type to check. /// \return true if this transform can be applied to the type, false otherwise diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index c9ac9e4cd..7c056fab2 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -51,10 +51,6 @@ namespace { Result> MakeManifestReader( const ManifestFile& manifest, const std::shared_ptr& file_io, const TableMetadata& metadata) { - // TODO(gangwu): Build manifest file schemas from PartitionSpec::RawPartitionType - // with UnknownType for dropped source fields instead of requiring the table schema - // to bind every partition source field. Until then, cleanup fails closed when - // historical specs cannot bind to the metadata schema. ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); TableMetadataCache metadata_cache(&metadata); ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());