Skip to content

Commit ea20ee3

Browse files
committed
fix: align manifest partition reads with reassigned IDs
Use unknown types for missing partition source fields when building manifest partition schemas. Add Java-style schema ID reassignment maps and use them to map raw manifest partition field IDs back to their original IDs. Also add transform result type support so unbound partition specs can derive partition field types consistently with Java.
1 parent ae29c3d commit ea20ee3

13 files changed

Lines changed: 328 additions & 35 deletions

src/iceberg/manifest/manifest_reader.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::LiveEntries() {
861861
}
862862

863863
Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool only_live) {
864-
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->PartitionType(*schema_));
864+
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->RawPartitionType(*schema_));
865865
auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema();
866866

867867
std::shared_ptr<Schema> projected_data_file_schema;

src/iceberg/partition_spec.cc

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,24 +74,16 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
7474

7575
std::vector<SchemaField> partition_fields;
7676
for (const auto& partition_field : fields_) {
77-
// Get the source field from the original schema by source_id
7877
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
7978
schema.FindFieldById(partition_field.source_id()));
80-
if (!source_field.has_value()) {
81-
// TODO(xiao.dong) when source field is missing,
82-
// should return an error or just use UNKNOWN type
83-
return InvalidSchema("Cannot find source field for partition field:{}",
84-
partition_field.field_id());
79+
std::shared_ptr<Type> result_type;
80+
if (source_field.has_value()) {
81+
auto source_field_type = source_field.value().get().type();
82+
result_type = partition_field.transform()->ResultType(std::move(source_field_type));
83+
} else {
84+
result_type = unknown();
8585
}
86-
auto source_field_type = source_field.value().get().type();
87-
// Bind the transform to the source field type to get the result type
88-
ICEBERG_ASSIGN_OR_RAISE(auto transform_function,
89-
partition_field.transform()->Bind(source_field_type));
90-
91-
auto result_type = transform_function->ResultType();
9286

93-
// Create the partition field with the transform result type
94-
// Partition fields are always optional (can be null)
9587
partition_fields.emplace_back(partition_field.field_id(),
9688
std::string(partition_field.name()),
9789
std::move(result_type),
@@ -101,6 +93,29 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
10193
return std::make_unique<StructType>(std::move(partition_fields));
10294
}
10395

96+
Result<std::unique_ptr<StructType>> PartitionSpec::RawPartitionType(
97+
const Schema& schema) const {
98+
const auto& ids_to_original = schema.IdsToOriginal();
99+
if (ids_to_original.empty()) {
100+
return PartitionType(schema);
101+
}
102+
103+
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, PartitionType(schema));
104+
std::vector<SchemaField> raw_partition_fields;
105+
raw_partition_fields.reserve(partition_type->fields().size());
106+
for (const auto& field : partition_type->fields()) {
107+
auto original_id = ids_to_original.find(field.field_id());
108+
if (original_id == ids_to_original.end()) {
109+
return InvalidSchema("Cannot find original field ID for reassigned field ID: {}",
110+
field.field_id());
111+
}
112+
raw_partition_fields.emplace_back(original_id->second, std::string(field.name()),
113+
field.type(), field.optional(),
114+
std::string(field.doc()));
115+
}
116+
return std::make_unique<StructType>(std::move(raw_partition_fields));
117+
}
118+
104119
Result<std::string> PartitionSpec::PartitionPath(const PartitionValues& data) const {
105120
ICEBERG_PRECHECK(fields_.size() == data.num_fields(),
106121
"Partition spec and data mismatch, expected field num {}, got {}",

src/iceberg/partition_spec.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
6464
/// \brief Get the partition type binding to the input schema.
6565
Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) const;
6666

67+
/// \brief Get the partition type as physically written in manifest files.
68+
Result<std::unique_ptr<StructType>> RawPartitionType(const Schema& schema) const;
69+
6770
/// \brief Get the partition path for the given partition data.
6871
Result<std::string> PartitionPath(const PartitionValues& data) const;
6972

src/iceberg/schema.cc

Lines changed: 115 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,103 @@
3535

3636
namespace iceberg {
3737

38+
struct SchemaReassignIdContext {
39+
Schema::IdMap ids_to_reassigned;
40+
Schema::IdMap ids_to_original;
41+
};
42+
3843
namespace {
3944

45+
const Schema::IdMap& EmptyIdMap() {
46+
static const Schema::IdMap kEmpty;
47+
return kEmpty;
48+
}
49+
50+
void RecordIdReassignment(int32_t old_id, int32_t new_id,
51+
Schema::IdMap& ids_to_reassigned,
52+
Schema::IdMap& ids_to_original) {
53+
if (new_id != old_id) {
54+
ids_to_reassigned[old_id] = new_id;
55+
ids_to_original[new_id] = old_id;
56+
}
57+
}
58+
59+
SchemaField ReassignField(const SchemaField& field, int32_t new_id,
60+
const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned,
61+
Schema::IdMap& ids_to_original);
62+
63+
std::shared_ptr<Type> ReassignTypeIds(const std::shared_ptr<Type>& type,
64+
const Schema::GetId& get_id,
65+
Schema::IdMap& ids_to_reassigned,
66+
Schema::IdMap& ids_to_original) {
67+
switch (type->type_id()) {
68+
case TypeId::kStruct: {
69+
const auto& struct_type = static_cast<const StructType&>(*type);
70+
const auto& fields = struct_type.fields();
71+
std::vector<int32_t> new_ids;
72+
new_ids.reserve(fields.size());
73+
for (const auto& field : fields) {
74+
const auto new_id = get_id(field.field_id());
75+
RecordIdReassignment(field.field_id(), new_id, ids_to_reassigned,
76+
ids_to_original);
77+
new_ids.push_back(new_id);
78+
}
79+
80+
std::vector<SchemaField> reassigned_fields;
81+
reassigned_fields.reserve(fields.size());
82+
for (size_t i = 0; i < fields.size(); ++i) {
83+
reassigned_fields.emplace_back(ReassignField(fields[i], new_ids[i], get_id,
84+
ids_to_reassigned, ids_to_original));
85+
}
86+
return std::make_shared<StructType>(std::move(reassigned_fields));
87+
}
88+
case TypeId::kList: {
89+
const auto& list_type = static_cast<const ListType&>(*type);
90+
const auto& element = list_type.element();
91+
const auto new_id = get_id(element.field_id());
92+
RecordIdReassignment(element.field_id(), new_id, ids_to_reassigned,
93+
ids_to_original);
94+
return std::make_shared<ListType>(
95+
ReassignField(element, new_id, get_id, ids_to_reassigned, ids_to_original));
96+
}
97+
case TypeId::kMap: {
98+
const auto& map_type = static_cast<const MapType&>(*type);
99+
const auto& key = map_type.key();
100+
const auto& value = map_type.value();
101+
const auto new_key_id = get_id(key.field_id());
102+
const auto new_value_id = get_id(value.field_id());
103+
RecordIdReassignment(key.field_id(), new_key_id, ids_to_reassigned,
104+
ids_to_original);
105+
RecordIdReassignment(value.field_id(), new_value_id, ids_to_reassigned,
106+
ids_to_original);
107+
return std::make_shared<MapType>(
108+
ReassignField(key, new_key_id, get_id, ids_to_reassigned, ids_to_original),
109+
ReassignField(value, new_value_id, get_id, ids_to_reassigned, ids_to_original));
110+
}
111+
default:
112+
return type;
113+
}
114+
}
115+
116+
SchemaField ReassignField(const SchemaField& field, int32_t new_id,
117+
const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned,
118+
Schema::IdMap& ids_to_original) {
119+
return {new_id, std::string(field.name()),
120+
ReassignTypeIds(field.type(), get_id, ids_to_reassigned, ids_to_original),
121+
field.optional(), std::string(field.doc())};
122+
}
123+
124+
std::vector<SchemaField> ReassignIds(std::vector<SchemaField> fields,
125+
const Schema::GetId& get_id,
126+
SchemaReassignIdContext& reassign_id_context) {
127+
auto reassigned_type = ReassignTypeIds(std::make_shared<StructType>(std::move(fields)),
128+
get_id, reassign_id_context.ids_to_reassigned,
129+
reassign_id_context.ids_to_original);
130+
const auto& reassigned_fields =
131+
internal::checked_cast<const StructType&>(*reassigned_type).fields();
132+
return {reassigned_fields.begin(), reassigned_fields.end()};
133+
}
134+
40135
Status ValidateFieldNullability(const Type& type) {
41136
auto validate_field = [&](const SchemaField& field) -> Status {
42137
ICEBERG_PRECHECK(field.optional() || field.type()->type_id() != TypeId::kUnknown,
@@ -73,17 +168,23 @@ Status ValidateFieldNullability(const Type& type) {
73168

74169
} // namespace
75170

76-
Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id)
171+
Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id, GetId get_id)
77172
: StructType(std::move(fields)),
78173
schema_id_(schema_id),
79-
cache_(std::make_unique<SchemaCache>(this)) {}
174+
cache_(std::make_unique<SchemaCache>(this)) {
175+
if (get_id) {
176+
reassign_id_context_ = std::make_unique<SchemaReassignIdContext>();
177+
fields_ = ReassignIds(std::move(fields_), get_id, *reassign_id_context_);
178+
}
179+
}
80180

81181
Schema::~Schema() = default;
82182

83183
Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
84184
int32_t schema_id,
85-
std::vector<int32_t> identifier_field_ids) {
86-
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
185+
std::vector<int32_t> identifier_field_ids,
186+
GetId get_id) {
187+
auto schema = std::make_unique<Schema>(std::move(fields), schema_id, std::move(get_id));
87188

88189
if (!identifier_field_ids.empty()) {
89190
auto id_to_parent = IndexParents(*schema);
@@ -99,8 +200,8 @@ Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
99200

100201
Result<std::unique_ptr<Schema>> Schema::Make(
101202
std::vector<SchemaField> fields, int32_t schema_id,
102-
const std::vector<std::string>& identifier_field_names) {
103-
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
203+
const std::vector<std::string>& identifier_field_names, GetId get_id) {
204+
auto schema = std::make_unique<Schema>(std::move(fields), schema_id, std::move(get_id));
104205

105206
std::vector<int32_t> fresh_identifier_ids;
106207
for (const auto& name : identifier_field_names) {
@@ -181,6 +282,14 @@ const std::shared_ptr<Schema>& Schema::EmptySchema() {
181282

182283
int32_t Schema::schema_id() const { return schema_id_; }
183284

285+
const Schema::IdMap& Schema::IdsToReassigned() const {
286+
return reassign_id_context_ ? reassign_id_context_->ids_to_reassigned : EmptyIdMap();
287+
}
288+
289+
const Schema::IdMap& Schema::IdsToOriginal() const {
290+
return reassign_id_context_ ? reassign_id_context_->ids_to_original : EmptyIdMap();
291+
}
292+
184293
std::string Schema::ToString() const {
185294
std::string repr = "schema<";
186295
for (const auto& field : fields_) {

src/iceberg/schema.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
/// and any utility functions. See iceberg/type.h and iceberg/field.h as well.
2525

2626
#include <cstdint>
27+
#include <functional>
2728
#include <optional>
2829
#include <string>
2930
#include <unordered_map>
@@ -40,6 +41,7 @@
4041
namespace iceberg {
4142

4243
class SchemaCache;
44+
struct SchemaReassignIdContext;
4345

4446
/// \brief A schema for a Table.
4547
///
@@ -55,7 +57,14 @@ class ICEBERG_EXPORT Schema : public StructType {
5557
/// \brief Special value to select all columns from manifest files.
5658
static constexpr std::string_view kAllColumns = "*";
5759

58-
explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId);
60+
/// \brief Maps an original field ID to its reassigned ID.
61+
///
62+
/// The mapping is total: return the original ID when no reassignment is needed.
63+
using GetId = std::function<int32_t(int32_t)>;
64+
using IdMap = std::unordered_map<int32_t, int32_t>;
65+
66+
explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
67+
GetId get_id = {});
5968

6069
~Schema() override;
6170

@@ -64,21 +73,24 @@ class ICEBERG_EXPORT Schema : public StructType {
6473
/// \param fields The fields that make up the schema.
6574
/// \param schema_id The unique identifier for this schema (default:kInitialSchemaId).
6675
/// \param identifier_field_ids Field IDs that uniquely identify rows in the table.
76+
/// \param get_id Function mapping each original field ID to its reassigned ID.
6777
/// \return A new Schema instance or Status if failed.
6878
static Result<std::unique_ptr<Schema>> Make(std::vector<SchemaField> fields,
6979
int32_t schema_id,
70-
std::vector<int32_t> identifier_field_ids);
80+
std::vector<int32_t> identifier_field_ids,
81+
GetId get_id = {});
7182

7283
/// \brief Create a schema.
7384
///
7485
/// \param fields The fields that make up the schema.
7586
/// \param schema_id The unique identifier for this schema (default: kInitialSchemaId).
7687
/// \param identifier_field_names Canonical names of fields that uniquely identify rows
7788
/// in the table.
89+
/// \param get_id Function mapping each original field ID to its reassigned ID.
7890
/// \return A new Schema instance or Status if failed.
7991
static Result<std::unique_ptr<Schema>> Make(
8092
std::vector<SchemaField> fields, int32_t schema_id,
81-
const std::vector<std::string>& identifier_field_names);
93+
const std::vector<std::string>& identifier_field_names, GetId get_id = {});
8294

8395
/// \brief Validate that the identifier field with the given ID is valid for the schema
8496
///
@@ -166,6 +178,12 @@ class ICEBERG_EXPORT Schema : public StructType {
166178
/// \brief Return the field IDs of the identifier fields.
167179
const std::vector<int32_t>& IdentifierFieldIds() const;
168180

181+
/// \brief Return a map of original field IDs to reassigned field IDs.
182+
const IdMap& IdsToReassigned() const;
183+
184+
/// \brief Return a map of reassigned field IDs to original field IDs.
185+
const IdMap& IdsToOriginal() const;
186+
169187
/// \brief Return the canonical field names of the identifier fields.
170188
Result<std::vector<std::string>> IdentifierFieldNames() const;
171189

@@ -196,6 +214,7 @@ class ICEBERG_EXPORT Schema : public StructType {
196214
const int32_t schema_id_;
197215
// Field IDs that uniquely identify rows in the table.
198216
std::vector<int32_t> identifier_field_ids_;
217+
std::unique_ptr<SchemaReassignIdContext> reassign_id_context_;
199218
// Cache for schema mappings to facilitate fast lookups.
200219
std::unique_ptr<SchemaCache> cache_;
201220
};

src/iceberg/test/assign_id_visitor_test.cc

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) {
106106
},
107107
Schema::kInitialSchemaId),
108108
*fresh_schema);
109+
EXPECT_TRUE(fresh_schema->IdsToReassigned().empty());
110+
EXPECT_TRUE(fresh_schema->IdsToOriginal().empty());
109111
}
110112

111113
TEST(AssignFreshIdVisitorTest, NestedSchema) {
@@ -169,6 +171,55 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) {
169171
EXPECT_EQ(*expect_nested_struct_type, *nested_struct_type);
170172
}
171173

174+
TEST(AssignFreshIdVisitorTest, GetIdMaps) {
175+
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema());
176+
std::vector<SchemaField> fields(schema->fields().begin(), schema->fields().end());
177+
auto reassign_id = [](int32_t old_id) { return old_id + 1000; };
178+
179+
Schema reassigned_schema(std::move(fields), Schema::kInitialSchemaId, reassign_id);
180+
181+
EXPECT_EQ(reassigned_schema.fields()[0].field_id(), 1010);
182+
EXPECT_EQ(reassigned_schema.fields()[1].field_id(), 1020);
183+
auto list_type =
184+
std::dynamic_pointer_cast<ListType>(reassigned_schema.fields()[1].type());
185+
ASSERT_TRUE(list_type);
186+
EXPECT_EQ(list_type->element().field_id(), 1101);
187+
188+
EXPECT_EQ(reassigned_schema.IdsToReassigned().size(), 15U);
189+
EXPECT_THAT(
190+
reassigned_schema.IdsToReassigned(),
191+
testing::UnorderedElementsAre(
192+
testing::Pair(10, 1010), testing::Pair(20, 1020), testing::Pair(30, 1030),
193+
testing::Pair(40, 1040), testing::Pair(101, 1101), testing::Pair(102, 1102),
194+
testing::Pair(103, 1103), testing::Pair(201, 1201), testing::Pair(202, 1202),
195+
testing::Pair(203, 1203), testing::Pair(204, 1204), testing::Pair(301, 1301),
196+
testing::Pair(302, 1302), testing::Pair(303, 1303), testing::Pair(304, 1304)));
197+
EXPECT_THAT(
198+
reassigned_schema.IdsToOriginal(),
199+
testing::UnorderedElementsAre(
200+
testing::Pair(1010, 10), testing::Pair(1020, 20), testing::Pair(1030, 30),
201+
testing::Pair(1040, 40), testing::Pair(1101, 101), testing::Pair(1102, 102),
202+
testing::Pair(1103, 103), testing::Pair(1201, 201), testing::Pair(1202, 202),
203+
testing::Pair(1203, 203), testing::Pair(1204, 204), testing::Pair(1301, 301),
204+
testing::Pair(1302, 302), testing::Pair(1303, 303), testing::Pair(1304, 304)));
205+
}
206+
207+
TEST(AssignFreshIdVisitorTest, GetIdIdentifierNames) {
208+
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema());
209+
std::vector<SchemaField> fields(schema->fields().begin(), schema->fields().end());
210+
auto reassign_id = [](int32_t old_id) { return old_id + 1000; };
211+
212+
ICEBERG_UNWRAP_OR_FAIL(
213+
auto reassigned_schema,
214+
Schema::Make(std::move(fields), Schema::kInitialSchemaId,
215+
std::vector<std::string>{"id", "struct.outer_id"}, reassign_id));
216+
217+
EXPECT_THAT(reassigned_schema->IdentifierFieldIds(), testing::ElementsAre(1010, 1301));
218+
ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names,
219+
reassigned_schema->IdentifierFieldNames());
220+
EXPECT_THAT(identifier_field_names, testing::ElementsAre("id", "struct.outer_id"));
221+
}
222+
172223
TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) {
173224
int32_t id = 0;
174225
auto next_id = [&id]() { return ++id; };

0 commit comments

Comments
 (0)