From 5c187c9d7233bd02d1e29a48b6451bed9b5c5bc0 Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Sun, 14 Jun 2026 12:21:51 -0700 Subject: [PATCH 1/3] Parquet: Variant shredding follow-ups from PR #14297 --- .../iceberg/parquet/ParquetFormatModel.java | 9 +++++++++ .../parquet/VariantShreddingAnalyzer.java | 17 +++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java index 9a4a62cae612..b097df8fc6c7 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java @@ -46,9 +46,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ParquetFormatModel extends BaseFormatModel, R, MessageType> { + private static final Logger LOG = LoggerFactory.getLogger(ParquetFormatModel.class); + private final boolean isBatchReader; private final VariantShreddingAnalyzer variantAnalyzer; private final Function> copyFuncFactory; @@ -292,12 +296,17 @@ private FileAppender buildShreddedAppender() { Preconditions.checkState(copyFuncFactory != null, "copyFuncFactory must not be null"); UnaryOperator copyFunc = copyFuncFactory.apply(engineSchema); Preconditions.checkState(copyFunc != null, "copyFunc must not return null"); + LOG.debug("Building shredded variant appender with bufferSize={}", bufferSize); return new BufferedFileAppender<>( bufferSize, bufferedRows -> { Map shreddedTypes = variantAnalyzer.analyzeVariantColumns(bufferedRows, schema, engineSchema); + LOG.debug( + "Variant inference: rows={}, shredded fields={}", + bufferedRows.size(), + shreddedTypes.size()); if (!shreddedTypes.isEmpty()) { internal.variantShreddingFunc((fieldId, name) -> shreddedTypes.get(fieldId)); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java index d2a058c1128a..664702fb83d2 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java @@ -48,9 +48,9 @@ * determinism is not guaranteed. * *
    - *
  • Object fields use a TreeMap, so field ordering is alphabetical and deterministic. - *
  • Type selection picks the most common type with explicit tie-break priority (see - * TIE_BREAK_PRIORITY), not enum ordinal. + *
  • Object fields are emitted in alphabetical order in the shredded schema. + *
  • Type selection picks the most common type with explicit tie-break priority (see {@link + * FieldInfo#TIE_BREAK_PRIORITY}), not enum ordinal. *
  • Integer types (INT8/16/32/64) and decimal types (DECIMAL4/8/16) are each promoted to the * widest observed before competing with other types. *
  • Fields below {@code MIN_FIELD_FREQUENCY} are pruned. Above {@code MAX_SHREDDED_FIELDS}, the @@ -275,8 +275,12 @@ private static Type createObjectTypedValue(PathNode node) { Types.GroupBuilder builder = Types.buildGroup(Type.Repetition.OPTIONAL); boolean hasFields = false; - for (PathNode child : node.objectChildren.values()) { - Type fieldType = buildFieldGroup(child); + // Sort by field name so the emitted schema field order is deterministic. + List> sortedChildren = + Lists.newArrayList(node.objectChildren.entrySet()); + sortedChildren.sort(Map.Entry.comparingByKey()); + for (Map.Entry entry : sortedChildren) { + Type fieldType = buildFieldGroup(entry.getValue()); if (fieldType != null) { builder.addField(fieldType); hasFields = true; @@ -312,7 +316,7 @@ private static Type createArrayTypedValue(PathNode node) { private static class PathNode { private final String fieldName; - private final Map objectChildren = Maps.newTreeMap(); + private final Map objectChildren = Maps.newHashMap(); private PathNode arrayElement = null; private FieldInfo info = null; @@ -425,6 +429,7 @@ private static class FieldInfo { PhysicalType.DECIMAL8, 1, PhysicalType.DECIMAL16, 2); + /** Tie-break ordering when two physical types have equal counts. Higher value wins. */ private static final Map TIE_BREAK_PRIORITY = ImmutableMap.builder() .put(PhysicalType.BOOLEAN_TRUE, 0) From d1709ca687a32bdea55cab1e9ace0409531973d3 Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Sun, 14 Jun 2026 19:38:32 -0700 Subject: [PATCH 2/3] inference cleanup --- .../parquet/VariantShreddingAnalyzer.java | 76 ++++++++++++------- .../parquet/TestVariantShreddingAnalyzer.java | 26 +++++++ 2 files changed, 73 insertions(+), 29 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java index 664702fb83d2..f4ffafc18207 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java @@ -19,8 +19,10 @@ package org.apache.iceberg.parquet; import java.math.BigDecimal; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -31,7 +33,6 @@ import org.apache.iceberg.variants.PhysicalType; import org.apache.iceberg.variants.VariantArray; import org.apache.iceberg.variants.VariantObject; -import org.apache.iceberg.variants.VariantPrimitive; import org.apache.iceberg.variants.VariantValue; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -168,17 +169,23 @@ private static void pruneInfrequentFields(PathNode node, int totalRows) { // Cap at MAX_SHREDDED_FIELDS, keep the most frequently observed if (node.objectChildren.size() > MAX_SHREDDED_FIELDS) { - List> sorted = Lists.newArrayList(node.objectChildren.entrySet()); - sorted.sort( - (a, b) -> { - int cmp = - Integer.compare( - b.getValue().info.observationCount, a.getValue().info.observationCount); - return cmp != 0 ? cmp : a.getKey().compareTo(b.getKey()); - }); - Set keep = Sets.newHashSet(); - for (int i = 0; i < MAX_SHREDDED_FIELDS; i++) { - keep.add(sorted.get(i).getKey()); + Comparator> worstFirst = + Comparator.>comparingInt( + e -> e.getValue().info.observationCount) + .thenComparing(Map.Entry::getKey, Comparator.reverseOrder()); + PriorityQueue> topK = + new PriorityQueue<>(MAX_SHREDDED_FIELDS, worstFirst); + for (Map.Entry entry : node.objectChildren.entrySet()) { + if (topK.size() < MAX_SHREDDED_FIELDS) { + topK.offer(entry); + } else if (worstFirst.compare(entry, topK.peek()) > 0) { + topK.poll(); + topK.offer(entry); + } + } + Set keep = Sets.newHashSetWithExpectedSize(MAX_SHREDDED_FIELDS); + for (Map.Entry entry : topK) { + keep.add(entry.getKey()); } node.objectChildren.entrySet().removeIf(entry -> !keep.contains(entry.getKey())); } @@ -411,10 +418,14 @@ private static Type createPrimitiveTypedValue(FieldInfo info, PhysicalType primi /** Tracks occurrence count and types for a single field. */ private static class FieldInfo { - private final Map typeCounts = Maps.newHashMap(); + private static final PhysicalType[] PHYSICAL_TYPES = PhysicalType.values(); + + private final int[] typeCounts = new int[PHYSICAL_TYPES.length]; private int maxDecimalScale = 0; private int maxDecimalIntegerDigits = 0; private int observationCount = 0; + private boolean mostCommonComputed = false; + private PhysicalType mostCommonCached = null; private static final Map INTEGER_PRIORITY = ImmutableMap.of( @@ -459,15 +470,11 @@ void observe(VariantValue value) { PhysicalType type = value.type() == PhysicalType.BOOLEAN_FALSE ? PhysicalType.BOOLEAN_TRUE : value.type(); - typeCounts.compute(type, (k, v) -> (v == null) ? 1 : v + 1); + typeCounts[type.ordinal()]++; // Track max precision and scale for decimal types - if (type == PhysicalType.DECIMAL4 - || type == PhysicalType.DECIMAL8 - || type == PhysicalType.DECIMAL16) { - VariantPrimitive primitive = value.asPrimitive(); - Object decimalValue = primitive.get(); - if (decimalValue instanceof BigDecimal bd) { + if (isDecimalType(type)) { + if (value.asPrimitive().get() instanceof BigDecimal bd) { maxDecimalIntegerDigits = Math.max(maxDecimalIntegerDigits, bd.precision() - bd.scale()); maxDecimalScale = Math.max(maxDecimalScale, bd.scale()); } @@ -475,6 +482,10 @@ void observe(VariantValue value) { } PhysicalType getMostCommonType() { + if (mostCommonComputed) { + return mostCommonCached; + } + Map combinedCounts = Maps.newHashMap(); int integerTotalCount = 0; @@ -483,9 +494,12 @@ PhysicalType getMostCommonType() { int decimalTotalCount = 0; PhysicalType mostCapableDecimal = null; - for (Map.Entry entry : typeCounts.entrySet()) { - PhysicalType type = entry.getKey(); - int count = entry.getValue(); + for (int i = 0; i < typeCounts.length; i++) { + int count = typeCounts[i]; + if (count == 0) { + continue; + } + PhysicalType type = PHYSICAL_TYPES[i]; if (isIntegerType(type)) { integerTotalCount += count; @@ -513,12 +527,16 @@ PhysicalType getMostCommonType() { } // Pick the most common type with tie-breaking - return combinedCounts.entrySet().stream() - .max( - Map.Entry.comparingByValue() - .thenComparingInt(entry -> TIE_BREAK_PRIORITY.getOrDefault(entry.getKey(), -1))) - .map(Map.Entry::getKey) - .orElse(null); + mostCommonCached = + combinedCounts.entrySet().stream() + .max( + Map.Entry.comparingByValue() + .thenComparingInt( + entry -> TIE_BREAK_PRIORITY.getOrDefault(entry.getKey(), -1))) + .map(Map.Entry::getKey) + .orElse(null); + mostCommonComputed = true; + return mostCommonCached; } private static boolean isIntegerType(PhysicalType type) { diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantShreddingAnalyzer.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantShreddingAnalyzer.java index 5ac10f74cc51..e13b9b27a6b1 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantShreddingAnalyzer.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantShreddingAnalyzer.java @@ -23,6 +23,7 @@ import java.math.BigDecimal; import java.util.List; import java.util.Locale; +import java.util.UUID; import java.util.function.Function; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.variants.ShreddedObject; @@ -133,6 +134,9 @@ public void testIntermediateFieldCapLimitsTrackedFields() { assertThat(schema).isInstanceOf(GroupType.class); GroupType typedValue = (GroupType) schema; assertThat(typedValue.getFieldCount()).isLessThanOrEqualTo(300).isGreaterThan(0); + assertThat(typedValue.containsField("field_0000")).isTrue(); + assertThat(typedValue.containsField("field_0299")).isTrue(); + assertThat(typedValue.containsField("field_0300")).isFalse(); } @Test @@ -408,6 +412,28 @@ public void testLongArrayInFewRowsSurvivesPruning() { assertThat(elementFields.containsField("key")).isTrue(); } + @Test + public void testUuidFieldIsTrackedAndShredded() { + VariantMetadata meta = Variants.metadata("id"); + List rows = Lists.newArrayList(); + for (int i = 0; i < 100; i++) { + ShreddedObject obj = Variants.object(meta); + obj.put("id", Variants.ofUUID(UUID.randomUUID())); + rows.add(obj); + } + + DirectAnalyzer analyzer = new DirectAnalyzer(); + Type schema = analyzer.analyzeAndCreateSchema(rows, 0); + + assertThat(schema).isNotNull(); + GroupType typedValue = (GroupType) schema; + assertThat(typedValue.containsField("id")).isTrue(); + GroupType idGroup = typedValue.getType("id").asGroupType(); + PrimitiveType idTyped = idGroup.getType("typed_value").asPrimitiveType(); + assertThat(idTyped.getLogicalTypeAnnotation()) + .isInstanceOf(LogicalTypeAnnotation.UUIDLogicalTypeAnnotation.class); + } + /** * Builds 100 variant rows where "common" appears in every row and "rare" appears in only {@code * rareCount} rows (below MIN_FIELD_FREQUENCY = 0.10 when rareCount < 10). From 3e01d17e17807db5026409d251d4f4957d4e3f83 Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Mon, 15 Jun 2026 22:44:22 -0700 Subject: [PATCH 3/3] PR comment, fix cache invalidation --- .../org/apache/iceberg/parquet/VariantShreddingAnalyzer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java index f4ffafc18207..b9f567d3efcb 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingAnalyzer.java @@ -465,6 +465,7 @@ private static class FieldInfo { .buildOrThrow(); void observe(VariantValue value) { + mostCommonComputed = false; observationCount++; // Use BOOLEAN_TRUE for both TRUE/FALSE values PhysicalType type =