Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetFormatModel<D, S, R>
extends BaseFormatModel<D, S, ParquetValueWriter<?>, R, MessageType> {
private static final Logger LOG = LoggerFactory.getLogger(ParquetFormatModel.class);

private final boolean isBatchReader;
private final VariantShreddingAnalyzer<D, S> variantAnalyzer;
private final Function<S, UnaryOperator<D>> copyFuncFactory;
Expand Down Expand Up @@ -292,12 +296,17 @@ private FileAppender<D> buildShreddedAppender() {
Preconditions.checkState(copyFuncFactory != null, "copyFuncFactory must not be null");
UnaryOperator<D> copyFunc = copyFuncFactory.apply(engineSchema);
Preconditions.checkState(copyFunc != null, "copyFunc must not return null");
LOG.debug("Building shredded variant appender with bufferSize={}", bufferSize);

return new BufferedFileAppender<>(
bufferSize,
bufferedRows -> {
Map<Integer, Type> shreddedTypes =
variantAnalyzer.analyzeVariantColumns(bufferedRows, schema, engineSchema);
LOG.debug(
"Variant inference: rows={}, shredded fields={}",
bufferedRows.size(),
shreddedTypes.size());

if (!shreddedTypes.isEmpty()) {
internal.variantShreddingFunc((fieldId, name) -> shreddedTypes.get(fieldId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.iceberg.parquet;

import java.math.BigDecimal;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand All @@ -31,7 +33,6 @@
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.VariantArray;
import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantPrimitive;
import org.apache.iceberg.variants.VariantValue;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
Expand All @@ -48,9 +49,9 @@
* determinism is not guaranteed.
*
* <ul>
* <li>Object fields use a TreeMap, so field ordering is alphabetical and deterministic.
* <li>Type selection picks the most common type with explicit tie-break priority (see
* TIE_BREAK_PRIORITY), not enum ordinal.
* <li>Object fields are emitted in alphabetical order in the shredded schema.
* <li>Type selection picks the most common type with explicit tie-break priority (see {@link
* FieldInfo#TIE_BREAK_PRIORITY}), not enum ordinal.
* <li>Integer types (INT8/16/32/64) and decimal types (DECIMAL4/8/16) are each promoted to the
* widest observed before competing with other types.
* <li>Fields below {@code MIN_FIELD_FREQUENCY} are pruned. Above {@code MAX_SHREDDED_FIELDS}, the
Expand Down Expand Up @@ -168,17 +169,23 @@ private static void pruneInfrequentFields(PathNode node, int totalRows) {

// Cap at MAX_SHREDDED_FIELDS, keep the most frequently observed
if (node.objectChildren.size() > MAX_SHREDDED_FIELDS) {
List<Map.Entry<String, PathNode>> sorted = Lists.newArrayList(node.objectChildren.entrySet());
sorted.sort(
(a, b) -> {
int cmp =
Integer.compare(
b.getValue().info.observationCount, a.getValue().info.observationCount);
return cmp != 0 ? cmp : a.getKey().compareTo(b.getKey());
});
Set<String> keep = Sets.newHashSet();
for (int i = 0; i < MAX_SHREDDED_FIELDS; i++) {
keep.add(sorted.get(i).getKey());
Comparator<Map.Entry<String, PathNode>> worstFirst =
Comparator.<Map.Entry<String, PathNode>>comparingInt(
e -> e.getValue().info.observationCount)
.thenComparing(Map.Entry::getKey, Comparator.reverseOrder());
PriorityQueue<Map.Entry<String, PathNode>> topK =
new PriorityQueue<>(MAX_SHREDDED_FIELDS, worstFirst);
for (Map.Entry<String, PathNode> entry : node.objectChildren.entrySet()) {
if (topK.size() < MAX_SHREDDED_FIELDS) {
topK.offer(entry);
} else if (worstFirst.compare(entry, topK.peek()) > 0) {
topK.poll();
topK.offer(entry);
}
}
Set<String> keep = Sets.newHashSetWithExpectedSize(MAX_SHREDDED_FIELDS);
for (Map.Entry<String, PathNode> entry : topK) {
keep.add(entry.getKey());
}
node.objectChildren.entrySet().removeIf(entry -> !keep.contains(entry.getKey()));
}
Expand Down Expand Up @@ -275,8 +282,12 @@ private static Type createObjectTypedValue(PathNode node) {

Types.GroupBuilder<GroupType> builder = Types.buildGroup(Type.Repetition.OPTIONAL);
boolean hasFields = false;
for (PathNode child : node.objectChildren.values()) {
Type fieldType = buildFieldGroup(child);
// Sort by field name so the emitted schema field order is deterministic.
List<Map.Entry<String, PathNode>> sortedChildren =
Lists.newArrayList(node.objectChildren.entrySet());
sortedChildren.sort(Map.Entry.comparingByKey());
for (Map.Entry<String, PathNode> entry : sortedChildren) {
Type fieldType = buildFieldGroup(entry.getValue());
if (fieldType != null) {
builder.addField(fieldType);
hasFields = true;
Expand Down Expand Up @@ -312,7 +323,7 @@ private static Type createArrayTypedValue(PathNode node) {

private static class PathNode {
private final String fieldName;
private final Map<String, PathNode> objectChildren = Maps.newTreeMap();
private final Map<String, PathNode> objectChildren = Maps.newHashMap();
private PathNode arrayElement = null;
private FieldInfo info = null;

Expand Down Expand Up @@ -407,10 +418,14 @@ private static Type createPrimitiveTypedValue(FieldInfo info, PhysicalType primi

/** Tracks occurrence count and types for a single field. */
private static class FieldInfo {
private final Map<PhysicalType, Integer> typeCounts = Maps.newHashMap();
private static final PhysicalType[] PHYSICAL_TYPES = PhysicalType.values();

private final int[] typeCounts = new int[PHYSICAL_TYPES.length];
private int maxDecimalScale = 0;
private int maxDecimalIntegerDigits = 0;
private int observationCount = 0;
private boolean mostCommonComputed = false;
private PhysicalType mostCommonCached = null;
Comment thread
nssalian marked this conversation as resolved.

private static final Map<PhysicalType, Integer> INTEGER_PRIORITY =
ImmutableMap.of(
Expand All @@ -425,6 +440,7 @@ private static class FieldInfo {
PhysicalType.DECIMAL8, 1,
PhysicalType.DECIMAL16, 2);

/** Tie-break ordering when two physical types have equal counts. Higher value wins. */
private static final Map<PhysicalType, Integer> TIE_BREAK_PRIORITY =
ImmutableMap.<PhysicalType, Integer>builder()
.put(PhysicalType.BOOLEAN_TRUE, 0)
Expand All @@ -449,27 +465,28 @@ private static class FieldInfo {
.buildOrThrow();

void observe(VariantValue value) {
mostCommonComputed = false;
observationCount++;
// Use BOOLEAN_TRUE for both TRUE/FALSE values
PhysicalType type =
value.type() == PhysicalType.BOOLEAN_FALSE ? PhysicalType.BOOLEAN_TRUE : value.type();

typeCounts.compute(type, (k, v) -> (v == null) ? 1 : v + 1);
typeCounts[type.ordinal()]++;

// Track max precision and scale for decimal types
if (type == PhysicalType.DECIMAL4
|| type == PhysicalType.DECIMAL8
|| type == PhysicalType.DECIMAL16) {
VariantPrimitive<?> primitive = value.asPrimitive();
Object decimalValue = primitive.get();
if (decimalValue instanceof BigDecimal bd) {
if (isDecimalType(type)) {
if (value.asPrimitive().get() instanceof BigDecimal bd) {
maxDecimalIntegerDigits = Math.max(maxDecimalIntegerDigits, bd.precision() - bd.scale());
maxDecimalScale = Math.max(maxDecimalScale, bd.scale());
}
}
}

PhysicalType getMostCommonType() {
if (mostCommonComputed) {
return mostCommonCached;
}

Map<PhysicalType, Integer> combinedCounts = Maps.newHashMap();

int integerTotalCount = 0;
Expand All @@ -478,9 +495,12 @@ PhysicalType getMostCommonType() {
int decimalTotalCount = 0;
PhysicalType mostCapableDecimal = null;

for (Map.Entry<PhysicalType, Integer> entry : typeCounts.entrySet()) {
PhysicalType type = entry.getKey();
int count = entry.getValue();
for (int i = 0; i < typeCounts.length; i++) {
int count = typeCounts[i];
if (count == 0) {
continue;
}
PhysicalType type = PHYSICAL_TYPES[i];

if (isIntegerType(type)) {
integerTotalCount += count;
Expand Down Expand Up @@ -508,12 +528,16 @@ PhysicalType getMostCommonType() {
}

// Pick the most common type with tie-breaking
return combinedCounts.entrySet().stream()
.max(
Map.Entry.<PhysicalType, Integer>comparingByValue()
.thenComparingInt(entry -> TIE_BREAK_PRIORITY.getOrDefault(entry.getKey(), -1)))
.map(Map.Entry::getKey)
.orElse(null);
mostCommonCached =
combinedCounts.entrySet().stream()
.max(
Map.Entry.<PhysicalType, Integer>comparingByValue()
.thenComparingInt(
entry -> TIE_BREAK_PRIORITY.getOrDefault(entry.getKey(), -1)))
.map(Map.Entry::getKey)
.orElse(null);
mostCommonComputed = true;
return mostCommonCached;
}

private static boolean isIntegerType(PhysicalType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.math.BigDecimal;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.variants.ShreddedObject;
Expand Down Expand Up @@ -133,6 +134,9 @@ public void testIntermediateFieldCapLimitsTrackedFields() {
assertThat(schema).isInstanceOf(GroupType.class);
GroupType typedValue = (GroupType) schema;
assertThat(typedValue.getFieldCount()).isLessThanOrEqualTo(300).isGreaterThan(0);
assertThat(typedValue.containsField("field_0000")).isTrue();
assertThat(typedValue.containsField("field_0299")).isTrue();
assertThat(typedValue.containsField("field_0300")).isFalse();
}

@Test
Expand Down Expand Up @@ -408,6 +412,28 @@ public void testLongArrayInFewRowsSurvivesPruning() {
assertThat(elementFields.containsField("key")).isTrue();
}

/**
 * Feeds the analyzer 100 objects whose only field, "id", holds a random UUID, and checks that
 * the resulting schema shreds "id" with a UUID-annotated {@code typed_value}.
 */
@Test
public void testUuidFieldIsTrackedAndShredded() {
  VariantMetadata metadata = Variants.metadata("id");
  List<VariantValue> variants = Lists.newArrayList();
  for (int row = 0; row < 100; row++) {
    ShreddedObject record = Variants.object(metadata);
    record.put("id", Variants.ofUUID(UUID.randomUUID()));
    variants.add(record);
  }

  Type inferred = new DirectAnalyzer().analyzeAndCreateSchema(variants, 0);
  assertThat(inferred).isNotNull();

  GroupType shredded = (GroupType) inferred;
  assertThat(shredded.containsField("id")).isTrue();

  // Drill into id -> typed_value and inspect its logical annotation.
  GroupType idField = shredded.getType("id").asGroupType();
  PrimitiveType idValue = idField.getType("typed_value").asPrimitiveType();
  assertThat(idValue.getLogicalTypeAnnotation())
      .isInstanceOf(LogicalTypeAnnotation.UUIDLogicalTypeAnnotation.class);
}

/**
* Builds 100 variant rows where "common" appears in every row and "rare" appears in only {@code
* rareCount} rows (below MIN_FIELD_FREQUENCY = 0.10 when rareCount < 10).
Expand Down