From eb012e681ee5e76da85e2b719e95f3ee2f233150 Mon Sep 17 00:00:00 2001 From: Gabor Kaszab Date: Mon, 11 May 2026 15:47:30 +0200 Subject: [PATCH] Core: Basic fields and schemas for column files This change introduces the basic structs for column files and also integrates them to the schema for TrackedFile and Tracking. --- .../java/org/apache/iceberg/ColumnFile.java | 54 +++++ .../org/apache/iceberg/ColumnFileStruct.java | 171 +++++++++++++++ .../java/org/apache/iceberg/TrackedFile.java | 12 +- .../apache/iceberg/TrackedFileBuilder.java | 65 +++++- .../org/apache/iceberg/TrackedFileStruct.java | 27 ++- .../java/org/apache/iceberg/Tracking.java | 15 +- .../org/apache/iceberg/TrackingBuilder.java | 22 +- .../org/apache/iceberg/TrackingStruct.java | 92 ++++---- .../apache/iceberg/TestColumnFileStruct.java | 205 ++++++++++++++++++ .../org/apache/iceberg/TestTrackedFile.java | 5 +- .../iceberg/TestTrackedFileBuilder.java | 164 +++++++++++++- .../apache/iceberg/TestTrackedFileStruct.java | 113 +++++++++- .../apache/iceberg/TestTrackingStruct.java | 106 ++++++++- 13 files changed, 981 insertions(+), 70 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/ColumnFile.java create mode 100644 core/src/main/java/org/apache/iceberg/ColumnFileStruct.java create mode 100644 core/src/test/java/org/apache/iceberg/TestColumnFileStruct.java diff --git a/core/src/main/java/org/apache/iceberg/ColumnFile.java b/core/src/main/java/org/apache/iceberg/ColumnFile.java new file mode 100644 index 000000000000..f3b119b4a714 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ColumnFile.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import org.apache.iceberg.types.Types; + +/** Information about a column file. */ +interface ColumnFile { + Types.NestedField FIELD_IDS = + Types.NestedField.required( + 161, + "field_ids", + Types.ListType.ofRequired(162, Types.IntegerType.get()), + "Field IDs this column file contains"); + Types.NestedField LOCATION = + Types.NestedField.required( + 163, "location", Types.StringType.get(), "Location of the column file"); + Types.NestedField FILE_SIZE_IN_BYTES = + Types.NestedField.required( + 164, "file_size_in_bytes", Types.LongType.get(), "Total column file size in bytes"); + + static Types.StructType schema() { + return Types.StructType.of(FIELD_IDS, LOCATION, FILE_SIZE_IN_BYTES); + } + + /** Returns the field IDs contained in this column file. */ + List fieldIds(); + + /** Returns the location of the column file. */ + String location(); + + /** Returns the total size of the column file in bytes. */ + long fileSizeInBytes(); + + /** Copies this column file. */ + ColumnFile copy(); +} diff --git a/core/src/main/java/org/apache/iceberg/ColumnFileStruct.java b/core/src/main/java/org/apache/iceberg/ColumnFileStruct.java new file mode 100644 index 000000000000..c73aa21afec9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ColumnFileStruct.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.avro.SupportsIndexProjection; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; + +/** Mutable {@link StructLike} implementation of {@link ColumnFile}. */ +class ColumnFileStruct extends SupportsIndexProjection implements ColumnFile, Serializable { + private static final Types.StructType BASE_TYPE = + Types.StructType.of(ColumnFile.FIELD_IDS, ColumnFile.LOCATION, ColumnFile.FILE_SIZE_IN_BYTES); + + private int[] fieldIds = null; + private String location = null; + private long fileSizeInBytes = -1L; + + /** Used by internal readers to instantiate this class with a projection schema. */ + ColumnFileStruct(Types.StructType projection) { + super(BASE_TYPE, projection); + } + + private ColumnFileStruct(int[] fieldIds, String location, long fileSizeInBytes) { + super(BASE_TYPE.fields().size()); + this.fieldIds = fieldIds; + this.location = location; + this.fileSizeInBytes = fileSizeInBytes; + } + + /** Copy constructor. */ + private ColumnFileStruct(ColumnFileStruct toCopy) { + super(toCopy); + this.fieldIds = + toCopy.fieldIds != null ? Arrays.copyOf(toCopy.fieldIds, toCopy.fieldIds.length) : null; + this.location = toCopy.location; + this.fileSizeInBytes = toCopy.fileSizeInBytes; + } + + /** Constructor for Java serialization. */ + ColumnFileStruct() { + super(BASE_TYPE.fields().size()); + } + + @Override + public List fieldIds() { + return fieldIds != null ? ArrayUtil.toUnmodifiableIntList(fieldIds) : null; + } + + @Override + public String location() { + return location; + } + + @Override + public long fileSizeInBytes() { + return fileSizeInBytes; + } + + @Override + public ColumnFile copy() { + return new ColumnFileStruct(this); + } + + @Override + protected T internalGet(int pos, Class javaClass) { + return javaClass.cast(getByPos(pos)); + } + + private Object getByPos(int pos) { + switch (pos) { + case 0: + return fieldIds(); + case 1: + return location; + case 2: + return fileSizeInBytes; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + @SuppressWarnings("unchecked") + protected void internalSet(int pos, T value) { + switch (pos) { + case 0: + this.fieldIds = ArrayUtil.toIntArray((List) value); + break; + case 1: + // always coerce to String for Serializable + this.location = value.toString(); + break; + case 2: + this.fileSizeInBytes = (long) value; + break; + default: + // ignore the object, it must be from a newer version of the format + } + } + + static Builder builder() { + return new Builder(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("field_ids", fieldIds) + .add("location", location) + .add("file_size_in_bytes", fileSizeInBytes) + .toString(); + } + + static class Builder { + private int[] fieldIds = null; + private String location = null; + private Long fileSizeInBytes = null; + + Builder fieldIds(List newFieldIds) { + Preconditions.checkArgument(newFieldIds != null, "Invalid field IDs: null"); + Preconditions.checkArgument(!newFieldIds.isEmpty(), "Invalid field IDs: empty"); + this.fieldIds = ArrayUtil.toIntArray(newFieldIds); + return this; + } + + Builder location(String newLocation) { + Preconditions.checkArgument(newLocation != null, "Invalid location: null"); + Preconditions.checkArgument(!newLocation.isEmpty(), "Invalid location: empty"); + this.location = newLocation; + return this; + } + + Builder fileSizeInBytes(long newFileSizeInBytes) { + Preconditions.checkArgument( + newFileSizeInBytes >= 0, + "Invalid file size in bytes: %s (must be >= 0)", + newFileSizeInBytes); + this.fileSizeInBytes = newFileSizeInBytes; + return this; + } + + ColumnFile build() { + Preconditions.checkArgument(fieldIds != null, "Missing required value: fieldIds"); + Preconditions.checkArgument(location != null, "Missing required value: location"); + Preconditions.checkArgument( + fileSizeInBytes != null, "Missing required value: fileSizeInBytes"); + return new ColumnFileStruct(fieldIds, location, fileSizeInBytes); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/TrackedFile.java b/core/src/main/java/org/apache/iceberg/TrackedFile.java index 5b9cdd9ab46c..195d8001bf0b 100644 --- a/core/src/main/java/org/apache/iceberg/TrackedFile.java +++ b/core/src/main/java/org/apache/iceberg/TrackedFile.java @@ -94,6 +94,12 @@ interface TrackedFile { "equality_ids", Types.ListType.ofRequired(136, Types.IntegerType.get()), "Field ids used to determine row equality in equality delete files"); + Types.NestedField COLUMN_FILES = + Types.NestedField.optional( + 158, + "column_files", + Types.ListType.ofRequired(159, ColumnFile.schema()), + "Column update files"); static Types.StructType schemaWithContentStats( Types.StructType partitionType, Types.StructType contentStatsType) { @@ -114,7 +120,8 @@ static Types.StructType schemaWithContentStats( MANIFEST_INFO, KEY_METADATA, SPLIT_OFFSETS, - EQUALITY_IDS); + EQUALITY_IDS, + COLUMN_FILES); } /** Returns the tracking information for this entry. */ @@ -165,6 +172,9 @@ static Types.StructType schemaWithContentStats( /** Returns the set of field IDs used for equality comparison in equality delete files. */ List equalityIds(); + /** Returns the column files for this file. */ + List columnFiles(); + /** Copies this tracked file. */ TrackedFile copy(); diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java b/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java index d7aa28c3290f..6a86c5264dee 100644 --- a/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java +++ b/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java @@ -19,8 +19,12 @@ package org.apache.iceberg; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; class TrackedFileBuilder { private final long snapshotId; @@ -43,12 +47,14 @@ class TrackedFileBuilder { private ByteBuffer keyMetadata = null; private List splitOffsets = null; private List equalityIds = null; + private List columnFiles = null; // tracking-related fields private Tracking sourceTracking = null; private boolean dvUpdated = false; private ByteBuffer deletedPositions = null; private ByteBuffer replacedPositions = null; + private boolean columnFilesUpdated = false; /** * Creates a builder for a newly added data file entry. @@ -142,7 +148,8 @@ private static TrackedFile terminal(TrackedFile source, Tracking tracking) { source.manifestInfo(), source.keyMetadata(), source.splitOffsets(), - source.equalityIds()); + source.equalityIds(), + source.columnFiles()); } private TrackedFileBuilder(FileContent contentType, long snapshotId) { @@ -168,6 +175,7 @@ private TrackedFileBuilder(TrackedFile source, long snapshotId) { this.splitOffsets = source.splitOffsets(); this.equalityIds = source.equalityIds(); this.sourceTracking = source.tracking(); + this.columnFiles = source.columnFiles(); } TrackedFileBuilder formatVersion(int newFormatVersion) { @@ -304,6 +312,54 @@ TrackedFileBuilder replacedPositions(ByteBuffer newReplacedPositions) { return this; } + TrackedFileBuilder columnFiles(List newColumnFiles) { + Preconditions.checkArgument(newColumnFiles != null, "Invalid column files: null"); + Preconditions.checkArgument(!newColumnFiles.isEmpty(), "Invalid column files: empty"); + Preconditions.checkArgument( + contentType == FileContent.DATA || contentType == FileContent.DATA_MANIFEST, + "Cannot add column files for file with content: %s", + contentType); + validateColumnFiles(newColumnFiles); + Preconditions.checkArgument( + !addingSameColumnFiles(newColumnFiles), "The same column files already added"); + + this.columnFiles = newColumnFiles; + this.columnFilesUpdated = true; + return this; + } + + private boolean addingSameColumnFiles(List newColumnFiles) { + if (columnFiles == null || columnFiles.size() != newColumnFiles.size()) { + return false; + } + + Set existingLocations = + columnFiles.stream().map(ColumnFile::location).collect(Collectors.toSet()); + return newColumnFiles.stream().map(ColumnFile::location).allMatch(existingLocations::contains); + } + + private void validateColumnFiles(List newColumnFiles) { + Set locations = Sets.newHashSet(); + Set allFieldIds = Sets.newHashSet(); + for (ColumnFile columnFile : newColumnFiles) { + Preconditions.checkArgument(columnFile != null, "Invalid column file: null"); + + String columnFileLocation = columnFile.location(); + List fieldIds = columnFile.fieldIds(); + + Preconditions.checkArgument( + !locations.contains(columnFileLocation), + "Invalid column files: duplicate column file, location: %s", + columnFileLocation); + Preconditions.checkArgument( + Collections.disjoint(allFieldIds, fieldIds), + "Invalid column files: overlapping field IDs across column files"); + + allFieldIds.addAll(fieldIds); + locations.add(columnFileLocation); + } + } + private static boolean isLeafManifest(FileContent contentType) { return contentType == FileContent.DATA_MANIFEST || contentType == FileContent.DELETE_MANIFEST; } @@ -340,6 +396,10 @@ TrackedFile build() { trackingBuilder.replacedPositions(replacedPositions); } + if (columnFilesUpdated) { + trackingBuilder.columnFilesUpdated(); + } + return new TrackedFileStruct( trackingBuilder.build(), contentType, @@ -356,6 +416,7 @@ TrackedFile build() { manifestInfo, keyMetadata, splitOffsets, - equalityIds); + equalityIds, + columnFiles); } } diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java index 958ddfbbc4cf..546a3bcae42d 100644 --- a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java +++ b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java @@ -21,10 +21,14 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.avro.SupportsIndexProjection; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; @@ -66,7 +70,8 @@ public PartitionData copy() { TrackedFile.MANIFEST_INFO, TrackedFile.KEY_METADATA, TrackedFile.SPLIT_OFFSETS, - TrackedFile.EQUALITY_IDS); + TrackedFile.EQUALITY_IDS, + TrackedFile.COLUMN_FILES); private FileContent contentType = null; private int formatVersion = -1; @@ -86,6 +91,7 @@ public PartitionData copy() { private byte[] keyMetadata = null; private long[] splitOffsets = null; private int[] equalityIds = null; + private List columnFiles = null; /** Used by internal readers to instantiate this class with a projection schema. */ TrackedFileStruct(Types.StructType projection) { @@ -118,7 +124,8 @@ public PartitionData copy() { ManifestInfo manifestInfo, ByteBuffer keyMetadata, List splitOffsets, - List equalityIds) { + List equalityIds, + List columnFiles) { super(BASE_TYPE.fields().size()); this.tracking = tracking; this.contentType = contentType; @@ -139,6 +146,7 @@ public PartitionData copy() { this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); this.splitOffsets = ArrayUtil.toLongArray(splitOffsets); this.equalityIds = ArrayUtil.toIntArray(equalityIds); + this.columnFiles = columnFiles != null ? Lists.newArrayList(columnFiles) : null; } /** Copy constructor. */ @@ -176,6 +184,13 @@ private TrackedFileStruct(TrackedFileStruct toCopy, boolean withStats, Set equalityIds() { return equalityIds != null ? ArrayUtil.toUnmodifiableIntList(equalityIds) : null; } + @Override + public List columnFiles() { + return columnFiles != null ? Collections.unmodifiableList(columnFiles) : null; + } + @Override public TrackedFile copy() { return new TrackedFileStruct(this, true, null); @@ -291,6 +311,7 @@ private Object getByPos(int pos) { case 13 -> keyMetadata(); case 14 -> splitOffsets(); case 15 -> equalityIds(); + case 16 -> columnFiles(); default -> throw new UnsupportedOperationException("Unknown field ordinal: " + pos); }; } @@ -316,6 +337,7 @@ protected void internalSet(int pos, T value) { case 13 -> this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value); case 14 -> this.splitOffsets = ArrayUtil.toLongArray((List) value); case 15 -> this.equalityIds = ArrayUtil.toIntArray((List) value); + case 16 -> this.columnFiles = (List) value; default -> { // ignore the object, it must be from a newer version of the format } @@ -341,6 +363,7 @@ public String toString() { .add("key_metadata", keyMetadata == null ? "null" : "(redacted)") .add("split_offsets", splitOffsets == null ? "null" : splitOffsets()) .add("equality_ids", equalityIds == null ? "null" : equalityIds()) + .add("column_files", columnFiles) .toString(); } } diff --git a/core/src/main/java/org/apache/iceberg/Tracking.java b/core/src/main/java/org/apache/iceberg/Tracking.java index fcdc4e50b236..2d586e4bb66e 100644 --- a/core/src/main/java/org/apache/iceberg/Tracking.java +++ b/core/src/main/java/org/apache/iceberg/Tracking.java @@ -65,6 +65,12 @@ interface Tracking { "replaced_positions", Types.BinaryType.get(), "Bitmap of positions replaced in this snapshot"); + Types.NestedField LATEST_COLUMN_FILE_SNAPSHOT_ID = + Types.NestedField.optional( + 160, + "latest_column_file_snapshot_id", + Types.LongType.get(), + "Snapshot ID where the latest column file was added; null if there are no column files"); static Types.StructType schema() { return Types.StructType.of( @@ -75,7 +81,8 @@ static Types.StructType schema() { DV_SNAPSHOT_ID, FIRST_ROW_ID, DELETED_POSITIONS, - REPLACED_POSITIONS); + REPLACED_POSITIONS, + LATEST_COLUMN_FILE_SNAPSHOT_ID); } /** Returns the status of the entry. */ @@ -107,6 +114,12 @@ default boolean isLive() { /** Returns the bitmap of positions replaced in this snapshot. */ ByteBuffer replacedPositions(); + /** + * Returns the snapshot ID where the latest column file was added; null if there are no column + * files. + */ + Long latestColumnFileSnapshotId(); + /** Returns the manifest location this entry was read from, or null. */ String manifestLocation(); diff --git a/core/src/main/java/org/apache/iceberg/TrackingBuilder.java b/core/src/main/java/org/apache/iceberg/TrackingBuilder.java index c5a11bc53cee..1a830e12b244 100644 --- a/core/src/main/java/org/apache/iceberg/TrackingBuilder.java +++ b/core/src/main/java/org/apache/iceberg/TrackingBuilder.java @@ -25,13 +25,14 @@ class TrackingBuilder { private final long newSnapshotId; private final Long snapshotId; - private final Long dataSequenceNumber; private final Long fileSequenceNumber; private final Long firstRowId; private EntryStatus status; + private Long dataSequenceNumber; private Long dvSnapshotId; private byte[] deletedPositions; private byte[] replacedPositions; + private Long latestColumnFileSnapshotId; /** * Creates a builder for a newly added file. @@ -82,6 +83,7 @@ private TrackingBuilder(long newSnapshotId) { this.dvSnapshotId = null; this.deletedPositions = null; this.replacedPositions = null; + this.latestColumnFileSnapshotId = null; } private TrackingBuilder(Tracking source, long newSnapshotId) { @@ -96,6 +98,7 @@ private TrackingBuilder(Tracking source, long newSnapshotId) { this.dvSnapshotId = source.dvSnapshotId(); this.deletedPositions = null; this.replacedPositions = null; + this.latestColumnFileSnapshotId = source.latestColumnFileSnapshotId(); } /** Indicates that the DV has been updated for the new Tracking. */ @@ -111,6 +114,17 @@ TrackingBuilder dvUpdated() { return this; } + /** Indicates that the column files list has been updated for the new Tracking. */ + TrackingBuilder columnFilesUpdated() { + this.latestColumnFileSnapshotId = newSnapshotId; + if (status == EntryStatus.EXISTING) { + this.status = EntryStatus.MODIFIED; + } + // Bumping 'dataSequenceNumber' to avoid having both equality deletes and column files. + this.dataSequenceNumber = null; + return this; + } + /** Sets the positions deleted by this commit for a manifest entry. */ TrackingBuilder deletedPositions(ByteBuffer positions) { Preconditions.checkState( @@ -140,7 +154,8 @@ Tracking build() { dvSnapshotId, firstRowId, deletedPositions, - replacedPositions); + replacedPositions, + latestColumnFileSnapshotId); } private static Tracking terminal(EntryStatus to, Tracking source, long newSnapshotId) { @@ -154,7 +169,8 @@ private static Tracking terminal(EntryStatus to, Tracking source, long newSnapsh source.dvSnapshotId(), source.firstRowId(), null, - null); + null, + source.latestColumnFileSnapshotId()); } private static void validateSource(Tracking source) { diff --git a/core/src/main/java/org/apache/iceberg/TrackingStruct.java b/core/src/main/java/org/apache/iceberg/TrackingStruct.java index 8ae4b7e4ce88..975def87deab 100644 --- a/core/src/main/java/org/apache/iceberg/TrackingStruct.java +++ b/core/src/main/java/org/apache/iceberg/TrackingStruct.java @@ -40,6 +40,7 @@ class TrackingStruct extends SupportsIndexProjection implements Tracking, Serial Tracking.FIRST_ROW_ID, Tracking.DELETED_POSITIONS, Tracking.REPLACED_POSITIONS, + Tracking.LATEST_COLUMN_FILE_SNAPSHOT_ID, MetadataColumns.ROW_POSITION); private EntryStatus status = null; @@ -50,6 +51,7 @@ class TrackingStruct extends SupportsIndexProjection implements Tracking, Serial private Long firstRowId = null; private byte[] deletedPositions = null; private byte[] replacedPositions = null; + private Long latestColumnFileSnapshotId = null; // set by manifest readers, not written to manifests private String manifestLocation = null; @@ -80,6 +82,7 @@ private TrackingStruct(TrackingStruct toCopy) { toCopy.replacedPositions != null ? Arrays.copyOf(toCopy.replacedPositions, toCopy.replacedPositions.length) : null; + this.latestColumnFileSnapshotId = toCopy.latestColumnFileSnapshotId; this.manifestLocation = toCopy.manifestLocation; this.manifestPos = toCopy.manifestPos; } @@ -92,7 +95,8 @@ private TrackingStruct(TrackingStruct toCopy) { Long dvSnapshotId, Long firstRowId, byte[] deletedPositions, - byte[] replacedPositions) { + byte[] replacedPositions, + Long latestColumnFileSnapshotId) { super(BASE_TYPE.fields().size()); this.status = status; this.snapshotId = snapshotId; @@ -102,6 +106,7 @@ private TrackingStruct(TrackingStruct toCopy) { this.firstRowId = firstRowId; this.deletedPositions = deletedPositions; this.replacedPositions = replacedPositions; + this.latestColumnFileSnapshotId = latestColumnFileSnapshotId; } void inheritFrom(Tracking manifestTracking) { @@ -118,11 +123,13 @@ void inheritFrom(Tracking manifestTracking) { manifestTracking.dataSequenceNumber(), manifestTracking.fileSequenceNumber()); - if (status == EntryStatus.ADDED) { + if (status == EntryStatus.ADDED || status == EntryStatus.MODIFIED) { if (dataSequenceNumber == null) { this.dataSequenceNumber = manifestTracking.fileSequenceNumber(); } + } + if (status == EntryStatus.ADDED) { if (fileSequenceNumber == null) { this.fileSequenceNumber = manifestTracking.fileSequenceNumber(); } @@ -174,6 +181,11 @@ public ByteBuffer replacedPositions() { return replacedPositions != null ? ByteBuffer.wrap(replacedPositions) : null; } + @Override + public Long latestColumnFileSnapshotId() { + return latestColumnFileSnapshotId; + } + @Override public String manifestLocation() { return manifestLocation; @@ -195,62 +207,37 @@ protected T internalGet(int pos, Class javaClass) { } private Object getByPos(int pos) { - switch (pos) { - case 0: - return status != null ? status.id() : null; - case 1: - return snapshotId(); - case 2: - return dataSequenceNumber(); - case 3: - return fileSequenceNumber(); - case 4: - return dvSnapshotId; - case 5: - return firstRowId; - case 6: - return deletedPositions(); - case 7: - return replacedPositions(); - case 8: - return manifestPos; - default: - throw new UnsupportedOperationException("Unknown field ordinal: " + pos); - } + return switch (pos) { + case 0 -> status != null ? status.id() : null; + case 1 -> snapshotId(); + case 2 -> dataSequenceNumber(); + case 3 -> fileSequenceNumber(); + case 4 -> dvSnapshotId; + case 5 -> firstRowId; + case 6 -> deletedPositions(); + case 7 -> replacedPositions(); + case 8 -> latestColumnFileSnapshotId; + case 9 -> manifestPos; + default -> throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + }; } @Override protected void internalSet(int pos, T value) { switch (pos) { - case 0: - this.status = EntryStatus.fromId((Integer) value); - break; - case 1: - this.snapshotId = (Long) value; - break; - case 2: - this.dataSequenceNumber = (Long) value; - break; - case 3: - this.fileSequenceNumber = (Long) value; - break; - case 4: - this.dvSnapshotId = (Long) value; - break; - case 5: - this.firstRowId = (Long) value; - break; - case 6: - this.deletedPositions = ByteBuffers.toByteArray((ByteBuffer) value); - break; - case 7: - this.replacedPositions = ByteBuffers.toByteArray((ByteBuffer) value); - break; - case 8: - this.manifestPos = (long) value; - break; - default: + case 0 -> this.status = EntryStatus.fromId((Integer) value); + case 1 -> this.snapshotId = (Long) value; + case 2 -> this.dataSequenceNumber = (Long) value; + case 3 -> this.fileSequenceNumber = (Long) value; + case 4 -> this.dvSnapshotId = (Long) value; + case 5 -> this.firstRowId = (Long) value; + case 6 -> this.deletedPositions = ByteBuffers.toByteArray((ByteBuffer) value); + case 7 -> this.replacedPositions = ByteBuffers.toByteArray((ByteBuffer) value); + case 8 -> this.latestColumnFileSnapshotId = (Long) value; + case 9 -> this.manifestPos = (long) value; + default -> { // ignore the object, it must be from a newer version of the format + } } } @@ -265,6 +252,7 @@ public String toString() { .add("first_row_id", firstRowId) .add("deleted_positions", deletedPositions == null ? "null" : "(binary)") .add("replaced_positions", replacedPositions == null ? "null" : "(binary)") + .add("latest_column_file_snapshot_id", latestColumnFileSnapshotId) .toString(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestColumnFileStruct.java b/core/src/test/java/org/apache/iceberg/TestColumnFileStruct.java new file mode 100644 index 000000000000..96b06992cf8e --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestColumnFileStruct.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +class TestColumnFileStruct { + + private static final List FIELD_IDS = Lists.newArrayList(1, 2, 3); + private static final String LOCATION = "s3://bucket/data/column.parquet"; + + @Test + void testFieldAccess() { + ColumnFile columnFile = + ColumnFileStruct.builder() + .fieldIds(FIELD_IDS) + .location(LOCATION) + .fileSizeInBytes(1024L) + .build(); + + assertThat(columnFile.fieldIds()).containsExactlyElementsOf(FIELD_IDS); + assertThat(columnFile.location()).isEqualTo(LOCATION); + assertThat(columnFile.fileSizeInBytes()).isEqualTo(1024L); + } + + @Test + void testCopy() { + ColumnFile columnFile = + ColumnFileStruct.builder() + .fieldIds(FIELD_IDS) + .location(LOCATION) + .fileSizeInBytes(2048L) + .build(); + + ColumnFile copy = columnFile.copy(); + + assertThat(copy.fieldIds()).containsExactlyElementsOf(FIELD_IDS); + assertThat(copy.location()).isEqualTo(LOCATION); + assertThat(copy.fileSizeInBytes()).isEqualTo(2048L); + } + + @Test + void testStructLikeSize() { + ColumnFileStruct columnFile = new ColumnFileStruct(ColumnFile.schema()); + assertThat(columnFile.size()).isEqualTo(3); + } + + @Test + void testStructLikeGetSet() { + ColumnFileStruct columnFile = new ColumnFileStruct(ColumnFile.schema()); + + columnFile.set(0, Lists.newArrayList(1, 2, 3, 4)); + columnFile.set(1, LOCATION); + columnFile.set(2, 128L); + + assertThat(columnFile.get(0, List.class)).containsExactly(1, 2, 3, 4); + assertThat(columnFile.get(1, String.class)).isEqualTo(LOCATION); + assertThat(columnFile.get(2, Long.class)).isEqualTo(128L); + } + + @Test + void testProjectedStructLike() { + Types.StructType projection = + Types.StructType.of(ColumnFile.LOCATION, ColumnFile.FILE_SIZE_IN_BYTES); + + ColumnFileStruct columnFile = new ColumnFileStruct(projection); + assertThat(columnFile.size()).isEqualTo(2); + + // projected position 0 maps to internal position 1 (location) + // projected position 1 maps to internal position 2 (file_size_in_bytes) + columnFile.set(0, LOCATION); + columnFile.set(1, 1024L); + + assertThat(columnFile.location()).isEqualTo(LOCATION); + assertThat(columnFile.fileSizeInBytes()).isEqualTo(1024L); + assertThat(columnFile.get(0, String.class)).isEqualTo(LOCATION); + assertThat(columnFile.get(1, Long.class)).isEqualTo(1024L); + } + + @Test + void testInvalidBuilderValues() { + assertThatThrownBy( + () -> + ColumnFileStruct.builder() + .fieldIds(null) + .location(LOCATION) + .fileSizeInBytes(1024L) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid field IDs: null"); + + assertThatThrownBy( + () -> + ColumnFileStruct.builder() + .fieldIds(Lists.newArrayList()) + .location(LOCATION) + .fileSizeInBytes(1024L) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid field IDs: empty"); + + assertThatThrownBy( + () -> + ColumnFileStruct.builder() + .fieldIds(FIELD_IDS) + .location(null) + .fileSizeInBytes(1024L) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid location: null"); + + assertThatThrownBy( + () -> + ColumnFileStruct.builder() + .fieldIds(FIELD_IDS) + .location("") + .fileSizeInBytes(1024L) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid location: empty"); + + assertThatThrownBy( + () -> + ColumnFileStruct.builder() + .fieldIds(FIELD_IDS) + .location(LOCATION) + .fileSizeInBytes(-1) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid file size in bytes: -1 (must be >= 0)"); + } + + @Test + void testMissingBuilderValues() { + assertThatThrownBy( + () -> ColumnFileStruct.builder().location(LOCATION).fileSizeInBytes(1024L).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Missing required value: fieldIds"); + + assertThatThrownBy( + () -> ColumnFileStruct.builder().fieldIds(FIELD_IDS).fileSizeInBytes(1024L).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Missing required value: location"); + + assertThatThrownBy( + () -> ColumnFileStruct.builder().fieldIds(FIELD_IDS).location(LOCATION).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Missing required value: fileSizeInBytes"); + } + + @Test + void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException { + ColumnFile columnFile = + ColumnFileStruct.builder() + .fieldIds(FIELD_IDS) + .location(LOCATION) + .fileSizeInBytes(1024L) + .build(); + + ColumnFile deserialized = TestHelpers.roundTripSerialize(columnFile); + + assertThat(deserialized.fieldIds()).containsExactlyElementsOf(FIELD_IDS); + assertThat(deserialized.location()).isEqualTo(LOCATION); + assertThat(deserialized.fileSizeInBytes()).isEqualTo(1024L); + } + + @Test + void testKryoSerializationRoundTrip() throws IOException { + ColumnFile columnFile = + ColumnFileStruct.builder() + .fieldIds(FIELD_IDS) + .location(LOCATION) + .fileSizeInBytes(1024L) + .build(); + + ColumnFile deserialized = TestHelpers.KryoHelpers.roundTripSerialize(columnFile); + + assertThat(deserialized.fieldIds()).containsExactlyElementsOf(FIELD_IDS); + assertThat(deserialized.location()).isEqualTo(LOCATION); + assertThat(deserialized.fileSizeInBytes()).isEqualTo(1024L); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFile.java b/core/src/test/java/org/apache/iceberg/TestTrackedFile.java index 170c01ef7dc4..e0372b3a6da5 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackedFile.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackedFile.java @@ -60,7 +60,8 @@ public void schemaWithContentStatsFieldOrder() { "manifest_info", "key_metadata", "split_offsets", - "equality_ids"); + "equality_ids", + "column_files"); } @Test @@ -71,7 +72,7 @@ public void schemaWithContentStatsFieldIds() { assertThat(fields) .extracting(Types.NestedField::fieldId) .containsExactly( - 147, 134, 157, 100, 101, 103, 104, 141, 102, 146, 140, 148, 150, 131, 132, 135); + 147, 134, 157, 100, 101, 103, 104, 141, 102, 146, 140, 148, 150, 131, 132, 135, 158); } @Test diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java b/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java index d6b0701fdfd7..ceb655a4e25e 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java @@ -23,6 +23,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; import java.util.stream.Stream; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.types.Types; @@ -84,6 +86,12 @@ public class TestTrackedFileBuilder { private static final ImmutableList SPLIT_OFFSETS = ImmutableList.of(0L, 4096L, 8192L); private static final ByteBuffer DELETED_POSITIONS = ByteBuffer.wrap(new byte[] {10, 11, 12}); private static final ByteBuffer REPLACED_POSITIONS = ByteBuffer.wrap(new byte[] {20, 21, 22}); + private static final ColumnFile COLUMN_FILE = + ColumnFileStruct.builder() + .fieldIds(ImmutableList.of(1, 2)) + .location("s3://bucket/data/col1.parquet") + .fileSizeInBytes(100L) + .build(); private static Stream missingRequiredFieldCases() { return Stream.of( @@ -284,6 +292,75 @@ public void invalidReplacedPositionsForContentType( + contentType); } + private static Stream nonDataOrDataManifestBuilders() { + return Stream.of( + Arguments.of(TrackedFileBuilder.equalityDelete(10L), FileContent.EQUALITY_DELETES), + Arguments.of(TrackedFileBuilder.deleteManifest(10L), FileContent.DELETE_MANIFEST), + Arguments.of( + TrackedFileBuilder.from(sourceEqualityDelete(12L), 20L), FileContent.EQUALITY_DELETES), + Arguments.of( + TrackedFileBuilder.from(sourceDeleteManifest(12L), 20L), FileContent.DELETE_MANIFEST)); + } + + @ParameterizedTest + @MethodSource("nonDataOrDataManifestBuilders") + public void invalidColumnFilesForContentType( + TrackedFileBuilder builder, FileContent contentType) { + assertThatThrownBy(() -> builder.columnFiles(ImmutableList.of(COLUMN_FILE))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot add column files for file with content: " + contentType); + } + + @Test + public void invalidEmptyColumnFiles() { + assertThatThrownBy(() -> TrackedFileBuilder.data(50L).columnFiles(ImmutableList.of())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid column files: empty"); + } + + @Test + public void invalidColumnFilesWithNullEntry() { + assertThatThrownBy( + () -> TrackedFileBuilder.data(50L).columnFiles(Arrays.asList(COLUMN_FILE, null))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid column file: null"); + } + + @Test + public void invalidColumnFilesWithDuplicateLocation() { + ColumnFile columnFileWithSameLocation = + ColumnFileStruct.builder() + .fieldIds(ImmutableList.of(3)) + .location("s3://bucket/data/col1.parquet") + .fileSizeInBytes(200L) + .build(); + + assertThatThrownBy( + () -> + TrackedFileBuilder.data(50L) + .columnFiles(ImmutableList.of(COLUMN_FILE, columnFileWithSameLocation))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Invalid column files: duplicate column file, location: s3://bucket/data/col1.parquet"); + } + + @Test + public void invalidColumnFilesWithOverlappingFieldIds() { + ColumnFile columnFileWithOverlappingFieldIds = + ColumnFileStruct.builder() + .fieldIds(ImmutableList.of(2, 3)) + .location("s3://bucket/data/col2.parquet") + .fileSizeInBytes(200L) + .build(); + + assertThatThrownBy( + () -> + TrackedFileBuilder.data(50L) + .columnFiles(ImmutableList.of(COLUMN_FILE, columnFileWithOverlappingFieldIds))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid column files: overlapping field IDs across column files"); + } + @Test public void invalidNullInputs() { assertThatThrownBy(() -> TrackedFileBuilder.data(30L).location(null)) @@ -322,6 +399,10 @@ public void invalidNullInputs() { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid equality IDs: null"); + assertThatThrownBy(() -> TrackedFileBuilder.data(30L).columnFiles(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid column files: null"); + assertThatThrownBy(() -> TrackedFileBuilder.from(null, 20L)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid source: null"); @@ -406,6 +487,7 @@ public void buildDataFileWithRequiredFieldsOnly() { assertThat(trackedFile.keyMetadata()).isNull(); assertThat(trackedFile.splitOffsets()).isNull(); assertThat(trackedFile.equalityIds()).isNull(); + assertThat(trackedFile.columnFiles()).isNull(); } @Test @@ -424,6 +506,7 @@ public void buildDataFileWithAllFields() { .deletionVector(DELETION_VECTOR) .keyMetadata(KEY_METADATA) .splitOffsets(SPLIT_OFFSETS) + .columnFiles(ImmutableList.of(COLUMN_FILE)) .build(); assertThat(trackedFile.formatVersion()).isEqualTo(FORMAT_VERSION_V4); @@ -439,10 +522,13 @@ public void buildDataFileWithAllFields() { assertThat(trackedFile.deletionVector()).isSameAs(DELETION_VECTOR); assertThat(trackedFile.keyMetadata()).isEqualTo(KEY_METADATA); assertThat(trackedFile.splitOffsets()).isEqualTo(SPLIT_OFFSETS); + assertThat(trackedFile.columnFiles()).hasSize(1); + verifyColumnFile(COLUMN_FILE, trackedFile.columnFiles().get(0)); assertThat(trackedFile.tracking().status()).isEqualTo(EntryStatus.ADDED); assertThat(trackedFile.tracking().snapshotId()).isEqualTo(50L); assertThat(trackedFile.tracking().dvSnapshotId()).isEqualTo(50L); + assertThat(trackedFile.tracking().latestColumnFileSnapshotId()).isEqualTo(50L); // Unsupported fields for data files assertThat(trackedFile.manifestInfo()).isNull(); @@ -481,6 +567,7 @@ public void buildEqualityDeleteFileWithRequiredFieldsOnly() { assertThat(trackedFile.manifestInfo()).isNull(); assertThat(trackedFile.keyMetadata()).isNull(); assertThat(trackedFile.splitOffsets()).isNull(); + assertThat(trackedFile.columnFiles()).isNull(); } @Test @@ -521,6 +608,7 @@ public void buildEqualityDeleteFileWithAllFields() { // Unsupported fields for equality delete files assertThat(trackedFile.deletionVector()).isNull(); assertThat(trackedFile.manifestInfo()).isNull(); + assertThat(trackedFile.columnFiles()).isNull(); } private static Stream manifestBuilders() { @@ -563,6 +651,7 @@ public void buildManifestWithRequiredFieldsOnly( assertThat(trackedFile.keyMetadata()).isNull(); assertThat(trackedFile.splitOffsets()).isNull(); assertThat(trackedFile.equalityIds()).isNull(); + assertThat(trackedFile.columnFiles()).isNull(); } @ParameterizedTest @@ -602,6 +691,7 @@ public void buildManifestWithAllFields(TrackedFileBuilder builder, FileContent c assertThat(trackedFile.deletionVector()).isNull(); assertThat(trackedFile.splitOffsets()).isNull(); assertThat(trackedFile.equalityIds()).isNull(); + assertThat(trackedFile.columnFiles()).isNull(); } private static Stream manifestSources() { @@ -721,6 +811,59 @@ public void addingSameDeletionVectorFails() { .hasMessage("The same deletion vector already added"); } + @Test + public void updateColumnFilesWhenBuildingDataFileFromSource() { + TrackedFile source = entryWithInheritedSeqNums(sourceData(10L), 45L); + + ColumnFile columnFile = + ColumnFileStruct.builder() + .fieldIds(ImmutableList.of(4, 5)) + .location("s3://bucket/data/new_col_file.parquet") + .fileSizeInBytes(1234L) + .build(); + List columnFiles = ImmutableList.of(COLUMN_FILE, columnFile); + + TrackedFile trackedFile = TrackedFileBuilder.from(source, 20L).columnFiles(columnFiles).build(); + + assertThat(trackedFile.columnFiles()).hasSize(2); + verifyColumnFile(COLUMN_FILE, trackedFile.columnFiles().get(0)); + verifyColumnFile(columnFile, trackedFile.columnFiles().get(1)); + assertThat(trackedFile.tracking().status()).isEqualTo(EntryStatus.MODIFIED); + assertThat(trackedFile.tracking().snapshotId()).isEqualTo(10L); + // data sequence number is bumped + assertThat(trackedFile.tracking().dataSequenceNumber()).isNull(); + assertThat(trackedFile.tracking().fileSequenceNumber()).isEqualTo(45L); + assertThat(trackedFile.tracking().latestColumnFileSnapshotId()).isEqualTo(20L); + } + + @Test + public void addingSameColumnFilesFails() { + TrackedFile source = entryWithInheritedSeqNums(sourceData(10L), 45L); + + ColumnFile columnFile = + ColumnFileStruct.builder() + .fieldIds(ImmutableList.of(4, 5)) + .location("s3://bucket/data/new_col_file.parquet") + .fileSizeInBytes(1234L) + .build(); + List columnFiles = ImmutableList.of(COLUMN_FILE, columnFile); + List columnFilesCopy = ImmutableList.of(COLUMN_FILE.copy(), columnFile.copy()); + List columnFilesReordered = ImmutableList.of(columnFile, COLUMN_FILE); + + TrackedFile trackedFile = TrackedFileBuilder.from(source, 20L).columnFiles(columnFiles).build(); + + assertThatThrownBy(() -> TrackedFileBuilder.from(trackedFile, 30L).columnFiles(columnFiles)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The same column files already added"); + assertThatThrownBy(() -> TrackedFileBuilder.from(trackedFile, 30L).columnFiles(columnFilesCopy)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The same column files already added"); + assertThatThrownBy( + () -> TrackedFileBuilder.from(trackedFile, 30L).columnFiles(columnFilesReordered)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The same column files already added"); + } + private static Stream nonManifestSources() { return Stream.of( Arguments.of(sourceData(10L), FileContent.DATA), @@ -782,6 +925,7 @@ private static TrackedFile sourceData(long snapshotId) { .deletionVector(DELETION_VECTOR) .keyMetadata(KEY_METADATA) .splitOffsets(SPLIT_OFFSETS) + .columnFiles(ImmutableList.of(COLUMN_FILE)) .build(); } @@ -806,6 +950,7 @@ private static TrackedFile sourceDataManifest(long snapshotId) { .fileSizeInBytes(556L) .partition(PARTITION_DATA) .manifestInfo(MANIFEST_INFO) + .columnFiles(ImmutableList.of(COLUMN_FILE)) .build(); } @@ -824,7 +969,15 @@ private static TrackedFile sourceDeleteManifest(long snapshotId) { private static TrackedFile entryWithInheritedSeqNums(TrackedFile entry, long sequenceNumber) { Tracking manifestTrackingToInheritFrom = new TrackingStruct( - EntryStatus.EXISTING, 123L, sequenceNumber, sequenceNumber, null, null, null, null); + EntryStatus.EXISTING, + 123L, + sequenceNumber, + sequenceNumber, + null, + null, + null, + null, + null); ((TrackingStruct) entry.tracking()).inheritFrom(manifestTrackingToInheritFrom); return entry; @@ -849,6 +1002,7 @@ private static void verifyFieldsAreFromSource(TrackedFile entry, TrackedFile sou assertThat(entry.splitOffsets()).isEqualTo(source.splitOffsets()); assertThat(entry.manifestInfo()).isSameAs(source.manifestInfo()); assertThat(entry.equalityIds()).isEqualTo(source.equalityIds()); + assertThat(entry.columnFiles()).isEqualTo(source.columnFiles()); assertThat(entry.tracking().dataSequenceNumber()) .isEqualTo(source.tracking().dataSequenceNumber()); @@ -859,5 +1013,13 @@ private static void verifyFieldsAreFromSource(TrackedFile entry, TrackedFile sou assertThat(entry.tracking().deletedPositions()).isEqualTo(source.tracking().deletedPositions()); assertThat(entry.tracking().replacedPositions()) .isEqualTo(source.tracking().replacedPositions()); + assertThat(entry.tracking().latestColumnFileSnapshotId()) + .isEqualTo(source.tracking().latestColumnFileSnapshotId()); + } + + private static void verifyColumnFile(ColumnFile expected, ColumnFile actual) { + assertThat(actual.fieldIds()).containsExactlyElementsOf(expected.fieldIds()); + assertThat(actual.location()).isEqualTo(expected.location()); + assertThat(actual.fileSizeInBytes()).isEqualTo(expected.fileSizeInBytes()); } } diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFileStruct.java b/core/src/test/java/org/apache/iceberg/TestTrackedFileStruct.java index 8e0d5c06824b..bb50127f27d9 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackedFileStruct.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackedFileStruct.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -35,6 +36,18 @@ class TestTrackedFileStruct { Types.StructType.of( Types.NestedField.optional(1000, "id_bucket", Types.IntegerType.get()), Types.NestedField.optional(1001, "category", Types.StringType.get())); + private static final ColumnFile COLUMN_FILE_1 = + ColumnFileStruct.builder() + .fieldIds(ImmutableList.of(1, 2)) + .location("s3://bucket/data/col-1.parquet") + .fileSizeInBytes(256L) + .build(); + private static final ColumnFile COLUMN_FILE_2 = + ColumnFileStruct.builder() + .fieldIds(ImmutableList.of(3)) + .location("s3://bucket/data/col-2.parquet") + .fileSizeInBytes(128L) + .build(); // Ordinals looked up from the TrackedFile schema so tests don't hard-code positions. private static final List SCHEMA_FIELDS = @@ -60,6 +73,7 @@ class TestTrackedFileStruct { private static final int KEY_METADATA_ORDINAL = SCHEMA_FIELDS.indexOf(TrackedFile.KEY_METADATA); private static final int SPLIT_OFFSETS_ORDINAL = SCHEMA_FIELDS.indexOf(TrackedFile.SPLIT_OFFSETS); private static final int EQUALITY_IDS_ORDINAL = SCHEMA_FIELDS.indexOf(TrackedFile.EQUALITY_IDS); + private static final int COLUMN_FILES_ORDINAL = SCHEMA_FIELDS.indexOf(TrackedFile.COLUMN_FILES); // Ordinal of MetadataColumns.ROW_POSITION within TrackingStruct's BASE_TYPE, // which appends ROW_POSITION after the Tracking schema fields. @@ -112,6 +126,7 @@ void testFieldAccess() { file.set(KEY_METADATA_ORDINAL, ByteBuffer.wrap(new byte[] {1, 2, 3})); file.set(SPLIT_OFFSETS_ORDINAL, ImmutableList.of(100L, 200L)); file.set(EQUALITY_IDS_ORDINAL, ImmutableList.of(1, 2, 3)); + file.set(COLUMN_FILES_ORDINAL, ImmutableList.of(COLUMN_FILE_1, COLUMN_FILE_2)); assertThat(file.tracking()).isNotNull(); assertThat(file.tracking().status()).isEqualTo(EntryStatus.ADDED); @@ -129,6 +144,10 @@ void testFieldAccess() { assertThat(file.keyMetadata()).isEqualTo(ByteBuffer.wrap(new byte[] {1, 2, 3})); assertThat(file.splitOffsets()).containsExactly(100L, 200L); assertThat(file.equalityIds()).containsExactly(1, 2, 3); + assertThat(file.columnFiles()).hasSize(2); + verifyColumnFile(COLUMN_FILE_1, file.columnFiles().get(0)); + verifyColumnFile(COLUMN_FILE_2, file.columnFiles().get(1)); + // should return EMPTY_PARTITION_DATA assertThat(file.partition()).isNotNull(); assertThat(file.partition().size()).isEqualTo(0); @@ -215,7 +234,64 @@ void testCopy() { assertThat(copy.equalityIds()).isNull(); assertThat(copy.tracking().manifestLocation()).isEqualTo("s3://bucket/manifest.avro"); assertThat(copy.tracking().manifestPos()).isEqualTo(3L); + assertThat(copy.tracking().latestColumnFileSnapshotId()).isEqualTo(42L); assertThat(copy.partition()).isEqualTo(newPartition(7, "music")); + assertThat(copy.columnFiles()).hasSize(2); + verifyColumnFile(COLUMN_FILE_1, copy.columnFiles().get(0)); + verifyColumnFile(COLUMN_FILE_2, copy.columnFiles().get(1)); + } + + @Test + void testCopyWithOnlyRequiredFields() { + TrackedFileStruct file = + (TrackedFileStruct) + TrackedFileBuilder.data(42L) + .formatVersion(FORMAT_VERSION_V4) + .location("s3://bucket/data/file.parquet") + .fileFormat(FileFormat.PARQUET) + .partition(newPartition(7, "music")) + .recordCount(100L) + .fileSizeInBytes(1024L) + .build(); + + TrackedFile copy = file.copy(); + assertThat(copy).isInstanceOf(TrackedFileStruct.class); + + // required fields are copied + assertThat(copy.contentType()).isEqualTo(FileContent.DATA); + assertThat(copy.formatVersion()).isEqualTo(FORMAT_VERSION_V4); + assertThat(copy.location()).isEqualTo("s3://bucket/data/file.parquet"); + assertThat(copy.fileFormat()).isEqualTo(FileFormat.PARQUET); + assertThat(copy.recordCount()).isEqualTo(100L); + assertThat(copy.fileSizeInBytes()).isEqualTo(1024L); + assertThat(copy.tracking()).isNotNull(); + assertThat(copy.tracking().status()).isEqualTo(EntryStatus.ADDED); + assertThat(copy.tracking().snapshotId()).isEqualTo(42L); + assertThat(copy.tracking().latestColumnFileSnapshotId()).isNull(); + assertThat(copy.partition()).isEqualTo(newPartition(7, "music")); + + // optional fields are null + assertThat(copy.specId()).isNull(); + assertThat(copy.sortOrderId()).isNull(); + assertThat(copy.deletionVector()).isNull(); + assertThat(copy.manifestInfo()).isNull(); + assertThat(copy.contentStats()).isNull(); + assertThat(copy.keyMetadata()).isNull(); + assertThat(copy.splitOffsets()).isNull(); + assertThat(copy.equalityIds()).isNull(); + assertThat(copy.columnFiles()).isNull(); + } + + @Test + void testCopyWithNullColumnFile() { + TrackedFileStruct file = createFullTrackedFile(); + file.set(COLUMN_FILES_ORDINAL, Arrays.asList(COLUMN_FILE_1, null, COLUMN_FILE_2)); + + TrackedFile copy = file.copy(); + + assertThat(copy.columnFiles()).hasSize(2); + verifyColumnFile(COLUMN_FILE_1, copy.columnFiles().get(0)); + verifyColumnFile(COLUMN_FILE_2, copy.columnFiles().get(1)); } @Test @@ -249,14 +325,18 @@ void testCopyIsDeep() { TrackedFile copy = file.copy(); - // keyMetadata should be a deep copy assertThat(copy.keyMetadata()).isNotSameAs(file.keyMetadata()); + assertThat(copy.columnFiles()).isNotSameAs(file.columnFiles()); + assertThat(copy.columnFiles()).hasSize(file.columnFiles().size()); + for (int i = 0; i < file.columnFiles().size(); ++i) { + assertThat(copy.columnFiles().get(i)).isNotSameAs(file.columnFiles().get(i)); + } } @Test void testStructLikeSize() { TrackedFileStruct file = new TrackedFileStruct(); - assertThat(file.size()).isEqualTo(16); + assertThat(file.size()).isEqualTo(17); } @Test @@ -274,6 +354,13 @@ void testStructLikeGetSet() { file.set(SORT_ORDER_ID_ORDINAL, 3); assertThat(file.get(SORT_ORDER_ID_ORDINAL, Integer.class)).isEqualTo(3); + + file.set(COLUMN_FILES_ORDINAL, ImmutableList.of(COLUMN_FILE_1, COLUMN_FILE_2)); + @SuppressWarnings("unchecked") + List roundTrippedColumnFiles = file.get(COLUMN_FILES_ORDINAL, List.class); + assertThat(roundTrippedColumnFiles).hasSize(2); + verifyColumnFile(COLUMN_FILE_1, roundTrippedColumnFiles.get(0)); + verifyColumnFile(COLUMN_FILE_2, roundTrippedColumnFiles.get(1)); } @Test @@ -316,6 +403,12 @@ void testContentStatsNullWhenNotSet() { assertThat(file.contentStats()).isNull(); } + @Test + void testColumnFilesNullWhenNotSet() { + TrackedFileStruct file = new TrackedFileStruct(); + assertThat(file.columnFiles()).isNull(); + } + @Test void testAllFileContentTypesSupported() { for (FileContent content : FileContent.values()) { @@ -346,7 +439,11 @@ void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException assertThat(deserialized.splitOffsets()).containsExactly(50L); assertThat(deserialized.tracking().manifestPos()).isEqualTo(3L); assertThat(deserialized.tracking().manifestLocation()).isEqualTo("s3://bucket/manifest.avro"); + assertThat(deserialized.tracking().latestColumnFileSnapshotId()).isEqualTo(42L); assertThat(deserialized.partition()).isEqualTo(newPartition(7, "music")); + assertThat(deserialized.columnFiles()).hasSize(2); + verifyColumnFile(COLUMN_FILE_1, deserialized.columnFiles().get(0)); + verifyColumnFile(COLUMN_FILE_2, deserialized.columnFiles().get(1)); } @Test @@ -370,7 +467,11 @@ void testKryoSerializationRoundTrip() throws IOException { assertThat(deserialized.splitOffsets()).containsExactly(50L); assertThat(deserialized.tracking().manifestPos()).isEqualTo(3L); assertThat(deserialized.tracking().manifestLocation()).isEqualTo("s3://bucket/manifest.avro"); + assertThat(deserialized.tracking().latestColumnFileSnapshotId()).isEqualTo(42L); assertThat(deserialized.partition()).isEqualTo(newPartition(7, "music")); + assertThat(deserialized.columnFiles()).hasSize(2); + verifyColumnFile(COLUMN_FILE_1, deserialized.columnFiles().get(0)); + verifyColumnFile(COLUMN_FILE_2, deserialized.columnFiles().get(1)); } static TrackedFileStruct createFullTrackedFile() { @@ -396,6 +497,7 @@ static TrackedFileStruct createFullTrackedFile() { .deletionVector(dv) .keyMetadata(ByteBuffer.wrap(new byte[] {1, 2, 3})) .splitOffsets(ImmutableList.of(50L)) + .columnFiles(ImmutableList.of(COLUMN_FILE_1, COLUMN_FILE_2)) .build(); TrackingStruct tracking = (TrackingStruct) file.tracking(); @@ -470,6 +572,13 @@ static TrackedFileStruct createTrackedFileWithStats() { .fileSizeInBytes(1024L) .specId(0) .contentStats(stats) + .columnFiles(ImmutableList.of(COLUMN_FILE_1, COLUMN_FILE_2)) .build(); } + + private static void verifyColumnFile(ColumnFile expected, ColumnFile actual) { + assertThat(actual.fieldIds()).containsExactlyElementsOf(expected.fieldIds()); + assertThat(actual.location()).isEqualTo(expected.location()); + assertThat(actual.fileSizeInBytes()).isEqualTo(expected.fileSizeInBytes()); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java b/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java index 0d1803022a23..cc6798f34d77 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java @@ -50,6 +50,8 @@ class TestTrackingStruct { TRACKING_FIELDS.indexOf(Tracking.DELETED_POSITIONS); private static final int REPLACED_POSITIONS_ORDINAL = TRACKING_FIELDS.indexOf(Tracking.REPLACED_POSITIONS); + private static final int LATEST_COLUMN_FILE_SNAPSHOT_ID_ORDINAL = + TRACKING_FIELDS.indexOf(Tracking.LATEST_COLUMN_FILE_SNAPSHOT_ID); @Test void testFieldAccess() { @@ -61,6 +63,7 @@ void testFieldAccess() { tracking.set(FILE_SEQUENCE_NUMBER_ORDINAL, 11L); tracking.set(DV_SNAPSHOT_ID_ORDINAL, 43L); tracking.set(FIRST_ROW_ID_ORDINAL, 1000L); + tracking.set(LATEST_COLUMN_FILE_SNAPSHOT_ID_ORDINAL, 15L); assertThat(tracking.status()).isEqualTo(EntryStatus.ADDED); assertThat(tracking.snapshotId()).isEqualTo(42L); @@ -70,6 +73,7 @@ void testFieldAccess() { assertThat(tracking.firstRowId()).isEqualTo(1000L); assertThat(tracking.deletedPositions()).isNull(); assertThat(tracking.replacedPositions()).isNull(); + assertThat(tracking.latestColumnFileSnapshotId()).isEqualTo(15L); } @Test @@ -78,6 +82,7 @@ void testCopy() { TrackingBuilder.from(manifestSourceTracking(), 1L) .deletedPositions(ByteBuffer.wrap(new byte[] {1, 2})) .replacedPositions(ByteBuffer.wrap(new byte[] {3, 4})) + .columnFilesUpdated() .build(); Tracking copy = tracking.copy(); @@ -90,6 +95,7 @@ void testCopy() { assertThat(copy.firstRowId()).isEqualTo(tracking.firstRowId()); assertThat(copy.deletedPositions()).isEqualTo(tracking.deletedPositions()); assertThat(copy.replacedPositions()).isEqualTo(tracking.replacedPositions()); + assertThat(copy.latestColumnFileSnapshotId()).isEqualTo(1L); // verify deep copy of ByteBuffer backing arrays assertThat(copy.deletedPositions().array()).isNotSameAs(tracking.deletedPositions().array()); @@ -142,7 +148,7 @@ void testDoNotInheritSequenceNumberForExistingEntries() { } @Test - void testDoNotInheritSequenceNumberForModifiedEntries() { + void testDoNotInheritSequenceNumberForModifiedIfAlreadySet() { TrackingStruct tracking = new TrackingStruct(Tracking.schema()); tracking.set(STATUS_ORDINAL, EntryStatus.MODIFIED.id()); tracking.set(DATA_SEQUENCE_NUMBER_ORDINAL, 5L); @@ -155,6 +161,24 @@ void testDoNotInheritSequenceNumberForModifiedEntries() { assertThat(tracking.fileSequenceNumber()).isEqualTo(6L); } + @Test + void testInheritDataSequenceNumberAfterColumnFilesChange() { + TrackingStruct source = new TrackingStruct(Tracking.schema()); + source.set(STATUS_ORDINAL, EntryStatus.MODIFIED.id()); + source.set(DATA_SEQUENCE_NUMBER_ORDINAL, 5L); + source.set(FILE_SEQUENCE_NUMBER_ORDINAL, 6L); + + Tracking tracking = TrackingBuilder.from(source, 10L).columnFilesUpdated().build(); + + assertThat(tracking.dataSequenceNumber()).isNull(); + assertThat(tracking.fileSequenceNumber()).isEqualTo(6); + + ((TrackingStruct) tracking).inheritFrom(createManifestTracking(100L, 60L)); + + assertThat(tracking.dataSequenceNumber()).isEqualTo(60L); + assertThat(tracking.fileSequenceNumber()).isEqualTo(6); + } + @Test void testExplicitValuesOverrideInheritance() { TrackingStruct tracking = new TrackingStruct(Tracking.schema()); @@ -222,13 +246,14 @@ private static Tracking createManifestTracking(long snapshotId, long sequenceNum @Test void testAddedWithSameCommitDvStaysAdded() { - Tracking tracking = TrackingBuilder.added(42L).dvUpdated().build(); + Tracking tracking = TrackingBuilder.added(42L).dvUpdated().columnFilesUpdated().build(); assertThat(tracking.status()).isEqualTo(EntryStatus.ADDED); assertThat(tracking.snapshotId()).isEqualTo(42L); assertThat(tracking.dvSnapshotId()).isEqualTo(42L); assertThat(tracking.deletedPositions()).isNull(); assertThat(tracking.replacedPositions()).isNull(); + assertThat(tracking.latestColumnFileSnapshotId()).isEqualTo(42L); // sequence numbers and firstRowId remain null; populated by inheritance assertThat(tracking.dataSequenceNumber()).isNull(); assertThat(tracking.fileSequenceNumber()).isNull(); @@ -247,6 +272,8 @@ void testExistingBuilderPreservesSourceFields() { assertThat(existing.fileSequenceNumber()).isEqualTo(source.fileSequenceNumber()); assertThat(existing.dvSnapshotId()).isEqualTo(source.dvSnapshotId()); assertThat(existing.firstRowId()).isEqualTo(source.firstRowId()); + assertThat(existing.latestColumnFileSnapshotId()) + .isEqualTo(source.latestColumnFileSnapshotId()); } @Test @@ -261,6 +288,7 @@ void testDeleteUpdatesSnapshotIdAndPreservesRest() { assertThat(deleted.fileSequenceNumber()).isEqualTo(source.fileSequenceNumber()); assertThat(deleted.dvSnapshotId()).isEqualTo(source.dvSnapshotId()); assertThat(deleted.firstRowId()).isEqualTo(source.firstRowId()); + assertThat(deleted.latestColumnFileSnapshotId()).isEqualTo(source.latestColumnFileSnapshotId()); } @Test @@ -275,6 +303,8 @@ void testReplaceUpdatesSnapshotIdAndPreservesRest() { assertThat(replaced.fileSequenceNumber()).isEqualTo(source.fileSequenceNumber()); assertThat(replaced.dvSnapshotId()).isEqualTo(source.dvSnapshotId()); assertThat(replaced.firstRowId()).isEqualTo(source.firstRowId()); + assertThat(replaced.latestColumnFileSnapshotId()) + .isEqualTo(source.latestColumnFileSnapshotId()); } @Test @@ -308,6 +338,12 @@ void testDvUpdatedProducesModifiedAndAdvancesDvSnapshotId() { assertThat(modified.dvSnapshotId()).isEqualTo(999L); } + @Test + void testExistingBuilderAllowsColumnFileMutation() { + Tracking existing = TrackingBuilder.from(sourceTracking(), 900L).columnFilesUpdated().build(); + assertThat(existing.latestColumnFileSnapshotId()).isEqualTo(900L); + } + @Test void testManifestDVMutatorsRejectedOnAdded() { assertThatThrownBy( @@ -463,6 +499,60 @@ void testManifestDVPositionsProduceModified() { assertThat(modified.deletedPositions()).isEqualTo(deletedBytes); } + @Test + void testManifestPositionsWithColumnFilesUpdated() { + ByteBuffer deletedBytes = ByteBuffer.wrap(new byte[] {1}); + Tracking withDeletedPositions = + TrackingBuilder.from(manifestSourceTracking(), 999L) + .columnFilesUpdated() + .deletedPositions(deletedBytes) + .build(); + + assertThat(withDeletedPositions.status()).isEqualTo(EntryStatus.MODIFIED); + assertThat(withDeletedPositions.latestColumnFileSnapshotId()).isEqualTo(999L); + assertThat(withDeletedPositions.dvSnapshotId()).isEqualTo(999L); + assertThat(withDeletedPositions.deletedPositions()).isEqualTo(deletedBytes); + + ByteBuffer replacedBytes = ByteBuffer.wrap(new byte[] {2}); + Tracking withReplacedPositions = + TrackingBuilder.from(manifestSourceTracking(), 999L) + .columnFilesUpdated() + .replacedPositions(replacedBytes) + .build(); + + assertThat(withReplacedPositions.status()).isEqualTo(EntryStatus.MODIFIED); + assertThat(withReplacedPositions.latestColumnFileSnapshotId()).isEqualTo(999L); + assertThat(withReplacedPositions.dvSnapshotId()).isEqualTo(999L); + assertThat(withReplacedPositions.replacedPositions()).isEqualTo(replacedBytes); + } + + @Test + void testColumnFilesUpdatedWithManifestPositions() { + ByteBuffer deletedBytes = ByteBuffer.wrap(new byte[] {1}); + Tracking withDeletedPositions = + TrackingBuilder.from(manifestSourceTracking(), 999L) + .deletedPositions(deletedBytes) + .columnFilesUpdated() + .build(); + + assertThat(withDeletedPositions.status()).isEqualTo(EntryStatus.MODIFIED); + assertThat(withDeletedPositions.latestColumnFileSnapshotId()).isEqualTo(999L); + assertThat(withDeletedPositions.dvSnapshotId()).isEqualTo(999L); + assertThat(withDeletedPositions.deletedPositions()).isEqualTo(deletedBytes); + + ByteBuffer replacedBytes = ByteBuffer.wrap(new byte[] {2}); + Tracking withReplacedPositions = + TrackingBuilder.from(manifestSourceTracking(), 999L) + .replacedPositions(replacedBytes) + .columnFilesUpdated() + .build(); + + assertThat(withReplacedPositions.status()).isEqualTo(EntryStatus.MODIFIED); + assertThat(withReplacedPositions.latestColumnFileSnapshotId()).isEqualTo(999L); + assertThat(withReplacedPositions.dvSnapshotId()).isEqualTo(999L); + assertThat(withReplacedPositions.replacedPositions()).isEqualTo(replacedBytes); + } + @Test void testIsLiveDelegatesToStatus() { assertThat(sourceTrackingWithStatus(EntryStatus.ADDED).isLive()).isTrue(); @@ -480,6 +570,7 @@ void testInternalSetIgnoresUnknownOrdinal() { tracking.set(FIRST_ROW_ID_ORDINAL, 1000L); tracking.set(DELETED_POSITIONS_ORDINAL, ByteBuffer.wrap(new byte[] {1, 2})); tracking.set(REPLACED_POSITIONS_ORDINAL, ByteBuffer.wrap(new byte[] {3, 4})); + tracking.set(LATEST_COLUMN_FILE_SNAPSHOT_ID_ORDINAL, 49L); // unknown ordinals from a newer format version are silently ignored tracking.internalSet(99, "value from a newer format"); @@ -493,6 +584,7 @@ void testInternalSetIgnoresUnknownOrdinal() { assertThat(tracking.firstRowId()).isEqualTo(1000L); assertThat(tracking.deletedPositions()).isEqualTo(ByteBuffer.wrap(new byte[] {1, 2})); assertThat(tracking.replacedPositions()).isEqualTo(ByteBuffer.wrap(new byte[] {3, 4})); + assertThat(tracking.latestColumnFileSnapshotId()).isEqualTo(49L); } @Test @@ -516,7 +608,7 @@ void testProjectedStructLike() { @Test void testJavaSerializationRoundTripForDataFile() throws IOException, ClassNotFoundException { - Tracking tracking = TrackingBuilder.added(42L).dvUpdated().build(); + Tracking tracking = TrackingBuilder.added(42L).dvUpdated().columnFilesUpdated().build(); Tracking deserialized = TestHelpers.roundTripSerialize(tracking); @@ -525,6 +617,7 @@ void testJavaSerializationRoundTripForDataFile() throws IOException, ClassNotFou assertThat(deserialized.dvSnapshotId()).isEqualTo(42L); assertThat(deserialized.deletedPositions()).isNull(); assertThat(deserialized.replacedPositions()).isNull(); + assertThat(deserialized.latestColumnFileSnapshotId()).isEqualTo(42L); } @Test @@ -533,6 +626,7 @@ void testJavaSerializationRoundTripForManifest() throws IOException, ClassNotFou TrackingBuilder.from(manifestSourceTracking(), 1L) .deletedPositions(ByteBuffer.wrap(new byte[] {1, 2})) .replacedPositions(ByteBuffer.wrap(new byte[] {3, 4})) + .columnFilesUpdated() .build(); Tracking deserialized = TestHelpers.roundTripSerialize(tracking); @@ -541,11 +635,12 @@ void testJavaSerializationRoundTripForManifest() throws IOException, ClassNotFou assertThat(deserialized.dvSnapshotId()).isEqualTo(1L); assertThat(deserialized.deletedPositions()).isEqualTo(ByteBuffer.wrap(new byte[] {1, 2})); assertThat(deserialized.replacedPositions()).isEqualTo(ByteBuffer.wrap(new byte[] {3, 4})); + assertThat(deserialized.latestColumnFileSnapshotId()).isEqualTo(1L); } @Test void testKryoSerializationRoundTripForDataFile() throws IOException { - Tracking tracking = TrackingBuilder.added(42L).dvUpdated().build(); + Tracking tracking = TrackingBuilder.added(42L).dvUpdated().columnFilesUpdated().build(); Tracking deserialized = TestHelpers.KryoHelpers.roundTripSerialize(tracking); @@ -554,6 +649,7 @@ void testKryoSerializationRoundTripForDataFile() throws IOException { assertThat(deserialized.dvSnapshotId()).isEqualTo(42L); assertThat(deserialized.deletedPositions()).isNull(); assertThat(deserialized.replacedPositions()).isNull(); + assertThat(deserialized.latestColumnFileSnapshotId()).isEqualTo(42L); } @Test @@ -562,6 +658,7 @@ void testKryoSerializationRoundTripForManifest() throws IOException { TrackingBuilder.from(manifestSourceTracking(), 1L) .deletedPositions(ByteBuffer.wrap(new byte[] {1, 2})) .replacedPositions(ByteBuffer.wrap(new byte[] {3, 4})) + .columnFilesUpdated() .build(); Tracking deserialized = TestHelpers.KryoHelpers.roundTripSerialize(tracking); @@ -570,6 +667,7 @@ void testKryoSerializationRoundTripForManifest() throws IOException { assertThat(deserialized.dvSnapshotId()).isEqualTo(1L); assertThat(deserialized.deletedPositions()).isEqualTo(ByteBuffer.wrap(new byte[] {1, 2})); assertThat(deserialized.replacedPositions()).isEqualTo(ByteBuffer.wrap(new byte[] {3, 4})); + assertThat(deserialized.latestColumnFileSnapshotId()).isEqualTo(1L); } private static TrackingStruct sourceTracking() {