Skip to content

Commit fc39adf

Browse files
manuzhang and codex committed
feat: support Iceberg v3 unknown type
Add an Iceberg unknown primitive type and JSON, Arrow, Avro, Parquet, projection, and data path support for null-only unknown fields. Enforce optionality invariants so required projections cannot be materialized from unknown/null-only fields.
Co-authored-by: Codex <codex@openai.com>
test: cover forbidden nested type promotions
Assert that promotion helpers reject nested type targets for unknown and regular primitive source types.
Co-authored-by: Codex <codex@openai.com>
1 parent 100bbe3 commit fc39adf

37 files changed

Lines changed: 2251 additions & 127 deletions

src/iceberg/avro/avro_data_util.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
457457
const SchemaField& projected_field,
458458
const arrow::MetadataColumnContext& metadata_context,
459459
::arrow::ArrayBuilder* array_builder) {
460+
if (projection.kind == FieldProjection::Kind::kNull) {
461+
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
462+
return {};
463+
}
464+
460465
if (avro_node->type() == ::avro::AVRO_UNION) {
461466
size_t branch = avro_datum.unionBranch();
462467
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
@@ -507,6 +512,9 @@ Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
507512
}
508513

509514
if (array.IsNull(index)) {
515+
if (datum->type() == ::avro::AVRO_NULL) {
516+
return {};
517+
}
510518
if (!datum->isUnion()) [[unlikely]] {
511519
return InvalidSchema("Cannot extract null to non-union type: {}",
512520
::avro::toString(datum->type()));

src/iceberg/avro/avro_direct_decoder.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,12 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d
588588
const SchemaField& projected_field,
589589
const arrow::MetadataColumnContext& metadata_context,
590590
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
591+
if (projection.kind == FieldProjection::Kind::kNull) {
592+
ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node, decoder));
593+
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
594+
return {};
595+
}
596+
591597
if (avro_node->type() == ::avro::AVRO_UNION) {
592598
const size_t branch_index = decoder.decodeUnionIndex();
593599

src/iceberg/avro/avro_direct_encoder.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco
8080
return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx);
8181
}
8282

83-
if (is_null) {
83+
if (is_null && avro_node->type() != ::avro::AVRO_NULL) {
8484
return InvalidArgument("Null value in non-nullable field");
8585
}
8686

src/iceberg/avro/avro_schema_util.cc

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
237237
return {};
238238
}
239239

240+
// Converts an Iceberg UnknownType into an Avro node: writes a primitive
// AVRO_NULL node to `*node` and returns an OK status.
Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) {
241+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL);
242+
return {};
243+
}
244+
240245
Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
241246
*node = std::make_shared<::avro::NodeRecord>();
242247

@@ -338,7 +343,7 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node)
338343
field_ids_.push(field.field_id());
339344
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node));
340345

341-
if (field.optional()) {
346+
if (field.optional() && (*node)->type() != ::avro::AVRO_NULL) {
342347
::avro::MultiLeaves union_types;
343348
union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL));
344349
union_types.add(std::move(*node));
@@ -383,8 +388,8 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
383388
case ::avro::AVRO_STRING:
384389
case ::avro::AVRO_BYTES:
385390
case ::avro::AVRO_FIXED:
386-
return {};
387391
case ::avro::AVRO_NULL:
392+
return {};
388393
case ::avro::AVRO_ENUM:
389394
default:
390395
return InvalidSchema("Unsupported Avro type: {}", static_cast<int>(node->type()));
@@ -512,6 +517,10 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
512517

513518
Status ValidateAvroSchemaEvolution(const Type& expected_type,
514519
const ::avro::NodePtr& avro_node) {
520+
if (avro_node->type() == ::avro::AVRO_NULL) {
521+
return {};
522+
}
523+
515524
switch (expected_type.type_id()) {
516525
case TypeId::kBoolean:
517526
if (avro_node->type() == ::avro::AVRO_BOOL) {
@@ -615,6 +624,8 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
615624
return {};
616625
}
617626
break;
627+
case TypeId::kUnknown:
628+
return {};
618629
default:
619630
break;
620631
}
@@ -650,6 +661,35 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
650661
const ::avro::NodePtr& avro_node,
651662
bool prune_source);
652663

664+
// Builds the FieldProjection for one expected field against its source Avro
// node. Shared by the struct, list and map projection paths.
//
// expected_field: the field requested by the reader schema.
// avro_node:      the source Avro node for that field; it is passed through
//                 UnwrapUnion before being inspected.
// source_index:   stored in `projection.from` when the field is projected.
// prune_source:   forwarded unchanged to ProjectNested.
//
// Returns a kNull projection when the expected type is unknown or the
// unwrapped Avro node is AVRO_NULL, and a kProjected projection otherwise.
// Returns InvalidSchema if a required field would have to be projected as
// null; errors from UnwrapUnion, ProjectNested and
// ValidateAvroSchemaEvolution are propagated.
Result<FieldProjection> ProjectField(const SchemaField& expected_field,
665+
const ::avro::NodePtr& avro_node,
666+
size_t source_index, bool prune_source) {
667+
const Type& expected_type = *expected_field.type();
668+
::avro::NodePtr field_node;
669+
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node, &field_node));
670+
671+
FieldProjection projection;
672+
// Either side being null-only means there is no value to read: the field can
// only be materialized as null, which is valid only for optional fields.
if (expected_type.type_id() == TypeId::kUnknown ||
673+
field_node->type() == ::avro::AVRO_NULL) {
674+
if (!expected_field.optional()) {
675+
return InvalidSchema("Cannot project required field with ID: {} as null",
676+
expected_field.field_id());
677+
}
678+
// Note: `projection.from` is left unset on this path.
projection.kind = FieldProjection::Kind::kNull;
679+
return projection;
680+
}
681+
682+
if (expected_type.is_nested()) {
683+
ICEBERG_ASSIGN_OR_RAISE(projection,
684+
ProjectNested(expected_type, field_node, prune_source));
685+
} else {
686+
ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(expected_type, field_node));
687+
}
688+
// Only `from` and `kind` are overwritten here; anything else ProjectNested
// filled in (presumably the child projections) is kept as returned.
projection.from = source_index;
689+
projection.kind = FieldProjection::Kind::kProjected;
690+
return projection;
691+
}
692+
653693
Result<FieldProjection> ProjectStruct(const StructType& struct_type,
654694
const ::avro::NodePtr& avro_node,
655695
bool prune_source) {
@@ -685,18 +725,9 @@ Result<FieldProjection> ProjectStruct(const StructType& struct_type,
685725
FieldProjection child_projection;
686726

687727
if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) {
688-
::avro::NodePtr field_node;
689-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node));
690-
if (expected_field.type()->is_nested()) {
691-
ICEBERG_ASSIGN_OR_RAISE(
692-
child_projection,
693-
ProjectNested(*expected_field.type(), field_node, prune_source));
694-
} else {
695-
ICEBERG_RETURN_UNEXPECTED(
696-
ValidateAvroSchemaEvolution(*expected_field.type(), field_node));
697-
}
698-
child_projection.from = iter->second.local_index;
699-
child_projection.kind = FieldProjection::Kind::kProjected;
728+
ICEBERG_ASSIGN_OR_RAISE(child_projection,
729+
ProjectField(expected_field, iter->second.field_node,
730+
iter->second.local_index, prune_source));
700731
} else if (MetadataColumns::IsMetadataColumn(field_id)) {
701732
child_projection.kind = FieldProjection::Kind::kMetadata;
702733
} else if (expected_field.optional()) {
@@ -733,20 +764,9 @@ Result<FieldProjection> ProjectList(const ListType& list_type,
733764
}
734765

735766
FieldProjection element_projection;
736-
::avro::NodePtr element_node;
737-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node));
738-
if (expected_element_field.type()->is_nested()) {
739-
ICEBERG_ASSIGN_OR_RAISE(
740-
element_projection,
741-
ProjectNested(*expected_element_field.type(), element_node, prune_source));
742-
} else {
743-
ICEBERG_RETURN_UNEXPECTED(
744-
ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node));
745-
}
746-
747-
// Set the element projection metadata but preserve its children
748-
element_projection.kind = FieldProjection::Kind::kProjected;
749-
element_projection.from = size_t{0};
767+
ICEBERG_ASSIGN_OR_RAISE(element_projection,
768+
ProjectField(expected_element_field, avro_node->leafAt(0),
769+
size_t{0}, prune_source));
750770

751771
FieldProjection result;
752772
result.children.emplace_back(std::move(element_projection));
@@ -802,18 +822,10 @@ Result<FieldProjection> ProjectMap(const MapType& map_type,
802822

803823
for (size_t i = 0; i < map_node->leaves(); ++i) {
804824
FieldProjection sub_projection;
805-
::avro::NodePtr sub_node;
806-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node));
807825
const auto& expected_sub_field = map_type.fields()[i];
808-
if (expected_sub_field.type()->is_nested()) {
809-
ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(),
810-
sub_node, prune_source));
811-
} else {
812-
ICEBERG_RETURN_UNEXPECTED(
813-
ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node));
814-
}
815-
sub_projection.kind = FieldProjection::Kind::kProjected;
816-
sub_projection.from = i;
826+
ICEBERG_ASSIGN_OR_RAISE(
827+
sub_projection,
828+
ProjectField(expected_sub_field, map_node->leafAt(i), i, prune_source));
817829
result.children.emplace_back(std::move(sub_projection));
818830
}
819831

@@ -1049,9 +1061,9 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original
10491061
case ::avro::AVRO_STRING:
10501062
case ::avro::AVRO_BYTES:
10511063
case ::avro::AVRO_FIXED:
1064+
case ::avro::AVRO_NULL:
10521065
// For primitive types, just return a copy
10531066
return original_node;
1054-
case ::avro::AVRO_NULL:
10551067
case ::avro::AVRO_ENUM:
10561068
default:
10571069
return InvalidSchema("Unsupported Avro type for field ID application: {}",

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class ToAvroNodeVisitor {
5858
Status Visit(const UuidType& type, ::avro::NodePtr* node);
5959
Status Visit(const FixedType& type, ::avro::NodePtr* node);
6060
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
61+
Status Visit(const UnknownType&, ::avro::NodePtr*);
6162
Status Visit(const StructType& type, ::avro::NodePtr* node);
6263
Status Visit(const ListType& type, ::avro::NodePtr* node);
6364
Status Visit(const MapType& type, ::avro::NodePtr* node);

0 commit comments

Comments
 (0)