diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java index 2f732aef427f..6ada3012e3a0 100644 --- a/api/src/main/java/org/apache/iceberg/ManifestFile.java +++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java @@ -29,6 +29,12 @@ public interface ManifestFile { int PARTITION_SUMMARIES_ELEMENT_ID = 508; + /** Value of {@link #formatVersion()} for v4+ manifest files. */ + int V4_FORMAT_VERSION = 4; + + /** Value of {@link #formatVersion()} for pre-v4 manifest files (v1, v2, and v3). */ + int LEGACY_FORMAT_VERSION = 0; + Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get(), "Location URI with FS scheme"); Types.NestedField LENGTH = @@ -186,6 +192,26 @@ default boolean hasDeletedFiles() { /** Returns the total number of rows in all files with status DELETED in the manifest file. */ Long deletedRowsCount(); + /** + * Returns the number of files with status REPLACED in the manifest file, or null if not tracked. + * + *

REPLACED files are the prior-state entries of v4 REPLACED/MODIFIED pairs and are not live. + * Returns null for manifest files written by pre-v4 writers. + */ + default Integer replacedFilesCount() { + return null; + } + + /** + * Returns the total number of rows in all files with status REPLACED in the manifest file, or + * null if not tracked. + * + *

Returns null for manifest files written by pre-v4 writers. + */ + default Long replacedRowsCount() { + return null; + } + /** * Returns a list of {@link PartitionFieldSummary partition field summaries}. * @@ -210,6 +236,19 @@ default Long firstRowId() { return null; } + /** Returns the number of records in the manifest file, or {@code null} for pre-v4 manifests. */ + default Long recordCount() { + return null; + } + + /** + * Returns the format version of the manifest file, or {@link #LEGACY_FORMAT_VERSION} for pre-v4 + * manifests. + */ + default int formatVersion() { + return LEGACY_FORMAT_VERSION; + } + /** * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use * this method to make defensive copies. diff --git a/core/src/main/java/org/apache/iceberg/ContentEntryAdapters.java b/core/src/main/java/org/apache/iceberg/ContentEntryAdapters.java new file mode 100644 index 000000000000..41a716176841 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ContentEntryAdapters.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; + +/** + * Builds {@link TrackedFile} instances for v4+ content_entry rows from legacy {@link ManifestEntry} + * and {@link ManifestFile} inputs. + */ +class ContentEntryAdapters { + // Cache of primitive-field types keyed by table schema. The originalTypes map a Metrics + // instance carries is read-only inside MetricsUtil.fromMetrics, so a single per-schema map + // can be shared across every adapter call for a writer's lifetime instead of being rebuilt + // per row. WeakHashMap keys release when callers stop referencing the schema. + private static final Map> PRIMITIVE_TYPES_BY_SCHEMA = + Collections.synchronizedMap(new WeakHashMap<>()); + + private ContentEntryAdapters() {} + + static TrackedFile fromDataFile( + ManifestEntry entry, + Schema tableSchema, + Types.StructType unionPartitionType, + EntryStatus statusOverride) { + Preconditions.checkArgument(entry != null, "Invalid manifest entry: null"); + DataFile file = entry.file(); + Preconditions.checkArgument(file != null, "Invalid data file: null"); + Preconditions.checkArgument( + file.content() == FileContent.DATA, "Invalid content for data file: %s", file.content()); + return buildContentFileEntry( + file, + statusOverride, + entry.snapshotId(), + entry.dataSequenceNumber(), + entry.fileSequenceNumber(), + tableSchema, + unionPartitionType); + } + + static TrackedFile fromDeleteFile( + ManifestEntry entry, + Schema tableSchema, + Types.StructType unionPartitionType, + EntryStatus statusOverride) { + Preconditions.checkArgument(entry != null, "Invalid manifest entry: null"); + DeleteFile file = entry.file(); + Preconditions.checkArgument(file != null, "Invalid delete file: null"); + // v4+ leaf delete manifests must only contain content_type=EQUALITY_DELETES (per spec PR + // #16025). POSITION_DELETES has no v4+ leaf representation: + // - v3 delete vectors (POSITION_DELETES stored as a Puffin blob) must be colocated on the + // data file's content_entry via TrackedFileBuilder.deletionVector(...) — see + // MergingSnapshotProducer's row-delta path. + // - v2 standalone position delete files (POSITION_DELETES stored in Parquet/Avro/ORC) have + // no v4+ representation; they can only live in pre-v4 legacy manifests carried over via a + // format_version=0 manifest reference. + Preconditions.checkArgument( + file.content() == FileContent.EQUALITY_DELETES, + "Invalid content for delete file: %s", + file.content()); + return buildContentFileEntry( + file, + statusOverride, + entry.snapshotId(), + entry.dataSequenceNumber(), + entry.fileSequenceNumber(), + tableSchema, + unionPartitionType); + } + + /** + * Builds a manifest reference content_entry for the v4+ root manifest. + * + *

The on-disk {@code format_version} field is sourced from {@link + * ManifestFile#formatVersion()} (0 for pre-v4 leaves carried over during a v3-to-v4 upgrade, 4 + * for v4+ leaves). For v4+ leaves the source manifest must carry a non-null {@link + * ManifestFile#recordCount()} populated by the v4+ writer or root-manifest reader; for pre-v4 + * leaves the record_count is summed from the per-status file counts. + * + * @param status entry status (typically ADDED for a newly written leaf, EXISTING for a + * carried-over reference) + * @param firstRowId the resolved first-row-id to write for this reference, or null for delete + * manifests. Callers are responsible for resolving the value (either carrying over {@link + * ManifestFile#firstRowId()} or assigning from a writer-side counter); the adapter does not + * decide between the two. + */ + static TrackedFile fromManifestFile(ManifestFile manifest, EntryStatus status, Long firstRowId) { + Preconditions.checkArgument(manifest != null, "Invalid manifest file: null"); + Preconditions.checkArgument(status != null, "Invalid status: null"); + int formatVersion = manifest.formatVersion(); + Preconditions.checkArgument( + formatVersion == ManifestFile.LEGACY_FORMAT_VERSION + || formatVersion >= ManifestFile.V4_FORMAT_VERSION, + "Invalid manifest format_version: %s (must be %s for pre-v4 or >= %s for v4+)", + formatVersion, + ManifestFile.LEGACY_FORMAT_VERSION, + ManifestFile.V4_FORMAT_VERSION); + Long manifestSnapshotId = manifest.snapshotId(); + Preconditions.checkArgument(manifestSnapshotId != null, "Invalid manifest snapshot id: null"); + Preconditions.checkArgument( + firstRowId == null || manifest.content() == ManifestContent.DATA, + "firstRowId is only valid for DATA manifests, but content is %s", + manifest.content()); + + long manifestSeq = manifest.sequenceNumber(); + Preconditions.checkArgument( + manifestSeq != ManifestWriter.UNASSIGNED_SEQ, + "Cannot build content_entry for manifest reference %s: sequence_number is unassigned. " + + "Resolve via RootManifestWriter.assignSequenceNumber before writing.", + manifest.path()); + ManifestInfo info = manifestInfo(manifest); + + FileContent contentType = + manifest.content() == ManifestContent.DATA + ? FileContent.DATA_MANIFEST + : FileContent.DELETE_MANIFEST; + long recordCount = resolveRecordCount(manifest); + + TrackedFileBuilder builder = + TrackedFileBuilder.explicitTracking( + contentType, + TrackingBuilder.forContentEntry( + status, manifestSnapshotId, manifestSeq, manifestSeq, firstRowId)); + builder + .formatVersion(formatVersion) + .location(manifest.path()) + .fileFormat(FileFormat.fromFileName(manifest.path())) + .recordCount(recordCount) + .fileSizeInBytes(manifest.length()) + .specId(manifest.partitionSpecId()) + .manifestInfo(info); + + if (manifest.keyMetadata() != null) { + builder.keyMetadata(manifest.keyMetadata()); + } + + return builder.build(); + } + + private static TrackedFile buildContentFileEntry( + ContentFile file, + EntryStatus status, + long snapshotId, + Long dataSequenceNumber, + Long fileSequenceNumber, + Schema tableSchema, + Types.StructType unionPartitionType) { + Preconditions.checkArgument(status != null, "Invalid status: null"); + // fromDataFile / fromDeleteFile project legacy ManifestEntry rows, whose status is ADDED, + // EXISTING, or DELETED. MODIFIED and REPLACED have no legacy representation — they're written + // directly by V4Writer.prepareWithStatus via TrackedFileBuilder.from(source, sid). + // deletionVector(dv).build() (MODIFIED) and TrackedFileBuilder.replaced(source, sid) + // (REPLACED). + Preconditions.checkArgument( + status == EntryStatus.ADDED + || status == EntryStatus.EXISTING + || status == EntryStatus.DELETED, + "Unsupported status for content file entry: %s (use V4Writer.prepareWithStatus for " + + "MODIFIED/REPLACED transitions)", + status); + PartitionData partition = toPartitionData(file, unionPartitionType); + FileFormat format = file.format(); + Preconditions.checkArgument( + format != null, "Invalid file format: null for %s", file.location()); + ContentStats stats = MetricsUtil.fromMetrics(tableSchema, toMetrics(file, tableSchema)); + boolean isDataFile = file.content() == FileContent.DATA; + + TrackedFileBuilder builder; + if (status == EntryStatus.ADDED) { + builder = + isDataFile + ? TrackedFileBuilder.data(snapshotId) + : TrackedFileBuilder.equalityDelete(snapshotId); + } else { + Preconditions.checkArgument( + dataSequenceNumber != null, "Invalid data sequence number: null for non-ADDED entry"); + Preconditions.checkArgument( + fileSequenceNumber != null, "Invalid file sequence number: null for non-ADDED entry"); + Long firstRowId = isDataFile ? ((DataFile) file).firstRowId() : null; + FileContent contentType = isDataFile ? FileContent.DATA : FileContent.EQUALITY_DELETES; + builder = + TrackedFileBuilder.explicitTracking( + contentType, + TrackingBuilder.forContentEntry( + status, snapshotId, dataSequenceNumber, fileSequenceNumber, firstRowId)); + } + + builder + .formatVersion(ManifestFile.V4_FORMAT_VERSION) + .location(file.location()) + .fileFormat(format) + .partition(partition) + .recordCount(file.recordCount()) + .fileSizeInBytes(file.fileSizeInBytes()) + .specId(file.specId()); + + populateOptionalFields(builder, file, stats, isDataFile); + + return builder.build(); + } + + private static void populateOptionalFields( + TrackedFileBuilder builder, ContentFile file, ContentStats stats, boolean isDataFile) { + if (stats != null) { + builder.contentStats(stats); + } + + if (file.sortOrderId() != null && isDataFile) { + builder.sortOrderId(file.sortOrderId()); + } + + if (file.keyMetadata() != null) { + builder.keyMetadata(file.keyMetadata()); + } + + if (file.splitOffsets() != null) { + builder.splitOffsets(file.splitOffsets()); + } + + if (!isDataFile && file.equalityFieldIds() != null) { + builder.equalityIds(file.equalityFieldIds()); + } + } + + /** + * Returns the record_count for a manifest-reference content_entry row. For v4+ manifests this is + * the persisted record_count populated by the writer or reader; for pre-v4 manifests it is summed + * from the per-status file counts. REPLACED is not summed because it has no pre-v4 + * representation. + */ + private static long resolveRecordCount(ManifestFile manifest) { + if (manifest.formatVersion() >= ManifestFile.V4_FORMAT_VERSION) { + Long persisted = manifest.recordCount(); + Preconditions.checkArgument( + persisted != null, + "Invalid v4 manifest reference for %s: record_count must be set by the writer", + manifest.path()); + return persisted; + } + + long total = 0L; + if (manifest.addedFilesCount() != null) { + total += manifest.addedFilesCount(); + } + + if (manifest.existingFilesCount() != null) { + total += manifest.existingFilesCount(); + } + + if (manifest.deletedFilesCount() != null) { + total += manifest.deletedFilesCount(); + } + + return total; + } + + private static ManifestInfo manifestInfo(ManifestFile manifest) { + long minSeq = manifest.minSequenceNumber(); + Preconditions.checkArgument( + minSeq != ManifestWriter.UNASSIGNED_SEQ, + "Cannot build manifest_info for %s: min_sequence_number is unassigned. " + + "Resolve via RootManifestWriter.assignSequenceNumber before writing.", + manifest.path()); + + return ManifestInfoStruct.builder() + .addedFilesCount(zeroIfNull(manifest.addedFilesCount())) + .existingFilesCount(zeroIfNull(manifest.existingFilesCount())) + .deletedFilesCount(zeroIfNull(manifest.deletedFilesCount())) + .replacedFilesCount(zeroIfNull(manifest.replacedFilesCount())) + .addedRowsCount(zeroIfNull(manifest.addedRowsCount())) + .existingRowsCount(zeroIfNull(manifest.existingRowsCount())) + .deletedRowsCount(zeroIfNull(manifest.deletedRowsCount())) + .replacedRowsCount(zeroIfNull(manifest.replacedRowsCount())) + .minSequenceNumber(minSeq) + .build(); + } + + private static int zeroIfNull(Integer value) { + return value != null ? value : 0; + } + + private static long zeroIfNull(Long value) { + return value != null ? value : 0L; + } + + /** + * Projects the file's per-spec partition tuple into the unified partition schema (union of all + * live specs) used by every v4+ content_entry row. Result fields are sourced by field ID via + * {@link StructProjection#createAllowMissing}; fields not present in the writer spec land as + * null. + */ + private static PartitionData toPartitionData( + ContentFile file, Types.StructType unionPartitionType) { + Preconditions.checkArgument( + unionPartitionType != null, "Invalid union partition type: null for %s", file.location()); + PartitionData result = new PartitionData(unionPartitionType); + StructLike partition = file.partition(); + if (partition == null) { + return result; + } + + Types.StructType sourceType; + if (partition instanceof PartitionData) { + sourceType = ((PartitionData) partition).getPartitionType(); + } else { + // Without a backing PartitionData the partition's element types are unavailable, so an empty + // struct is the only safe materialization. Reject any non-empty case rather than silently + // dropping fields. + Preconditions.checkArgument( + partition.size() == 0, + "Cannot convert partition for %s: type information is unavailable for %s", + file.location(), + partition); + return result; + } + + if (sourceType.fields().isEmpty()) { + return result; + } + + // Build a per-call StructProjection (not cached: the writer spec varies across files in + // multi-spec carry-over flows). Field-ID lookup means partition values land in the correct + // union-schema position regardless of the writer spec's ordering. + StructProjection projection = + StructProjection.createAllowMissing(sourceType, unionPartitionType); + projection.wrap(partition); + for (int pos = 0; pos < unionPartitionType.fields().size(); pos += 1) { + result.set(pos, projection.get(pos, Object.class)); + } + return result; + } + + private static Metrics toMetrics(ContentFile file, Schema tableSchema) { + Map lowerBounds = file.lowerBounds(); + Map upperBounds = file.upperBounds(); + boolean hasBounds = + (lowerBounds != null && !lowerBounds.isEmpty()) + || (upperBounds != null && !upperBounds.isEmpty()); + Map originalTypes = hasBounds ? primitiveTypesFor(tableSchema) : null; + + return new Metrics( + file.recordCount(), + file.columnSizes(), + file.valueCounts(), + file.nullValueCounts(), + file.nanValueCounts(), + lowerBounds, + upperBounds, + originalTypes); + } + + private static Map primitiveTypesFor(Schema schema) { + if (schema == null) { + return null; + } + + Map cached = PRIMITIVE_TYPES_BY_SCHEMA.get(schema); + if (cached != null) { + return cached; + } + + Map types = Maps.newHashMap(); + for (Types.NestedField field : schema.columns()) { + collectPrimitiveTypes(field, types); + } + + Map result = Collections.unmodifiableMap(types); + PRIMITIVE_TYPES_BY_SCHEMA.put(schema, result); + return result; + } + + private static void collectPrimitiveTypes(Types.NestedField field, Map types) { + Type type = field.type(); + if (type.isPrimitiveType()) { + types.put(field.fieldId(), type); + return; + } + + if (type.isStructType()) { + for (Types.NestedField nested : type.asStructType().fields()) { + collectPrimitiveTypes(nested, types); + } + return; + } + + if (type.isListType()) { + collectPrimitiveTypes(type.asListType().fields().get(0), types); + return; + } + + if (type.isMapType()) { + for (Types.NestedField nested : type.asMapType().fields()) { + collectPrimitiveTypes(nested, types); + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 9624484ffe0c..d288835a9438 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -60,6 +60,16 @@ public class GenericManifestFile extends SupportsIndexProjection private PartitionFieldSummary[] partitions = null; private byte[] keyMetadata = null; private Long firstRowId = null; + // v4+: total number of entries persisted at the root-manifest content_entry level. Null for + // pre-v4 manifest list entries where the count is not directly tracked at the manifest reference + // level. Populated via the v4+ constructor variant by V4Writer.toManifestFile and by + // RootManifestReader when projecting a content_entry row. + private Long recordCount = null; + // v4+: format version of the manifest file. LEGACY_FORMAT_VERSION (0) for pre-v4 manifests; + // V4_FORMAT_VERSION (4) for v4+ manifests. Populated via the v4+ constructor variant by v4+ + // writers/readers; pre-v4 manifest list serialization does not carry this field, so it stays at + // the default for any manifest read from a v1-v3 manifest list. + private int formatVersion = LEGACY_FORMAT_VERSION; /** Used by Avro reflection to instantiate this class when reading manifest files. */ public GenericManifestFile(Schema avroSchema) { @@ -112,6 +122,51 @@ public GenericManifestFile(Schema avroSchema) { Integer deletedFilesCount, Long deletedRowsCount, Long firstRowId) { + this( + path, + length, + specId, + content, + sequenceNumber, + minSequenceNumber, + snapshotId, + partitions, + keyMetadata, + addedFilesCount, + addedRowsCount, + existingFilesCount, + existingRowsCount, + deletedFilesCount, + deletedRowsCount, + firstRowId, + null /* recordCount */, + LEGACY_FORMAT_VERSION); + } + + /** + * v4+ constructor variant that accepts the v4+ {@code recordCount} and {@code formatVersion} + * fields. Pre-v4 callers should use the constructor without these parameters (which defaults + * recordCount to null and formatVersion to {@link #LEGACY_FORMAT_VERSION}). + */ + GenericManifestFile( + String path, + long length, + int specId, + ManifestContent content, + long sequenceNumber, + long minSequenceNumber, + Long snapshotId, + List partitions, + ByteBuffer keyMetadata, + Integer addedFilesCount, + Long addedRowsCount, + Integer existingFilesCount, + Long existingRowsCount, + Integer deletedFilesCount, + Long deletedRowsCount, + Long firstRowId, + Long recordCount, + int formatVersion) { super(ManifestFile.schema().columns().size()); this.avroSchema = AVRO_SCHEMA; this.manifestPath = path; @@ -130,6 +185,8 @@ public GenericManifestFile(Schema avroSchema) { this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); this.firstRowId = firstRowId; + this.recordCount = recordCount; + this.formatVersion = formatVersion; } /** @@ -172,6 +229,8 @@ private GenericManifestFile(GenericManifestFile toCopy) { ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length); this.firstRowId = toCopy.firstRowId; + this.recordCount = toCopy.recordCount; + this.formatVersion = toCopy.formatVersion; } /** Constructor for Java serialization. */ @@ -272,6 +331,16 @@ public Long firstRowId() { return firstRowId; } + @Override + public Long recordCount() { + return recordCount; + } + + @Override + public int formatVersion() { + return formatVersion; + } + @Override public int size() { return ManifestFile.schema().columns().size(); diff --git a/core/src/main/java/org/apache/iceberg/TrackedFile.java b/core/src/main/java/org/apache/iceberg/TrackedFile.java index 5b9cdd9ab46c..aaab0ef1081f 100644 --- a/core/src/main/java/org/apache/iceberg/TrackedFile.java +++ b/core/src/main/java/org/apache/iceberg/TrackedFile.java @@ -106,7 +106,7 @@ static Types.StructType schemaWithContentStats( RECORD_COUNT, FILE_SIZE_IN_BYTES, SPEC_ID, - Types.NestedField.required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC), + Types.NestedField.optional(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC), Types.NestedField.optional( CONTENT_STATS_ID, CONTENT_STATS_NAME, contentStatsType, CONTENT_STATS_DOC), SORT_ORDER_ID, diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java b/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java index d7aa28c3290f..62f0f3ae2ecd 100644 --- a/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java +++ b/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java @@ -23,8 +23,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; class TrackedFileBuilder { - private final long snapshotId; private final FileContent contentType; + private final TrackingBuilder trackingBuilder; // Required fields private Integer formatVersion = null; @@ -44,8 +44,7 @@ class TrackedFileBuilder { private List splitOffsets = null; private List equalityIds = null; - // tracking-related fields - private Tracking sourceTracking = null; + // tracking-related deferred mutators applied at build() time private boolean dvUpdated = false; private ByteBuffer deletedPositions = null; private ByteBuffer replacedPositions = null; @@ -97,6 +96,18 @@ static TrackedFileBuilder from(TrackedFile source, long newSnapshotId) { return new TrackedFileBuilder(source, newSnapshotId); } + /** + * Creates a builder with an externally-constructed TrackingBuilder. Use for content_entry rows + * whose tracking values come from outside the writer (v4+ manifest references, projections from + * legacy ManifestEntry). + */ + static TrackedFileBuilder explicitTracking( + FileContent contentType, TrackingBuilder trackingBuilder) { + Preconditions.checkArgument(contentType != null, "Invalid content type: null"); + Preconditions.checkArgument(trackingBuilder != null, "Invalid tracking builder: null"); + return new TrackedFileBuilder(contentType, trackingBuilder); + } + /** * Returns a DELETED tracked file derived from {@code source}. * @@ -147,12 +158,11 @@ private static TrackedFile terminal(TrackedFile source, Tracking tracking) { private TrackedFileBuilder(FileContent contentType, long snapshotId) { this.contentType = contentType; - this.snapshotId = snapshotId; + this.trackingBuilder = TrackingBuilder.added(snapshotId); } private TrackedFileBuilder(TrackedFile source, long snapshotId) { this.contentType = source.contentType(); - this.snapshotId = snapshotId; this.formatVersion = source.formatVersion(); this.location = source.location(); this.fileFormat = source.fileFormat(); @@ -167,7 +177,12 @@ private TrackedFileBuilder(TrackedFile source, long snapshotId) { this.keyMetadata = source.keyMetadata(); this.splitOffsets = source.splitOffsets(); this.equalityIds = source.equalityIds(); - this.sourceTracking = source.tracking(); + this.trackingBuilder = TrackingBuilder.from(source.tracking(), snapshotId); + } + + private TrackedFileBuilder(FileContent contentType, TrackingBuilder trackingBuilder) { + this.contentType = contentType; + this.trackingBuilder = trackingBuilder; } TrackedFileBuilder formatVersion(int newFormatVersion) { @@ -315,7 +330,10 @@ TrackedFile build() { Preconditions.checkArgument(recordCount != null, "Missing required field: record count"); Preconditions.checkArgument( fileSizeInBytes != null, "Missing required field: file size in bytes"); - Preconditions.checkArgument(partitionData != null, "Missing required field: partition data"); + Preconditions.checkArgument( + partitionData != null || isLeafManifest(contentType), + "Partition is required for content type %s", + contentType); Preconditions.checkArgument( !isLeafManifest(contentType) || manifestInfo != null, "Missing required field: manifest info"); @@ -323,11 +341,6 @@ TrackedFile build() { contentType != FileContent.EQUALITY_DELETES || equalityIds != null, "Missing required field: equality IDs"); - TrackingBuilder trackingBuilder = - sourceTracking == null - ? TrackingBuilder.added(snapshotId) - : TrackingBuilder.from(sourceTracking, snapshotId); - if (dvUpdated) { trackingBuilder.dvUpdated(); } @@ -340,8 +353,10 @@ TrackedFile build() { trackingBuilder.replacedPositions(replacedPositions); } + Tracking trackingResult = trackingBuilder.build(); + return new TrackedFileStruct( - trackingBuilder.build(), + trackingResult, contentType, formatVersion, location, diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java index 958ddfbbc4cf..ccb97c8e29c3 100644 --- a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java +++ b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java @@ -151,7 +151,7 @@ private TrackedFileStruct(TrackedFileStruct toCopy, boolean withStats, Set + ContentEntryAdapters.fromDataFile( + existingDataEntry(), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.REPLACED)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported status for content file entry: REPLACED"); + } + + @Test + void fromDataFileRejectsModified() { + // MODIFIED has no legacy ManifestEntry representation; the adapter only handles + // ADDED/EXISTING/DELETED. + assertThatThrownBy( + () -> + ContentEntryAdapters.fromDataFile( + existingDataEntry(), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.MODIFIED)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported status for content file entry: MODIFIED"); + } + + @Test + void fromDataFileWithPartitionSpec() { + // Multi-spec carry-over: the writer spec has one field but the union partition type carries an + // additional field from another live spec. The adapter must place the writer's value at the + // union-schema position keyed by field ID and leave the extra union field null. + PartitionSpec writerSpec = PartitionSpec.builderFor(SCHEMA).bucket("id", 16).build(); + Types.NestedField writerField = writerSpec.partitionType().fields().get(0); + Types.NestedField extraField = + Types.NestedField.optional(2000, "extra", Types.IntegerType.get()); + Types.StructType unionType = Types.StructType.of(writerField, extraField); + + PartitionData partition = new PartitionData(writerSpec.partitionType()); + partition.set(0, 7); + DataFile file = + new GenericDataFile( + writerSpec.specId(), + DATA_PATH, + FileFormat.PARQUET, + partition, + 1024L, + new Metrics(100L, null, null, null, null), + null, + ImmutableList.of(0L), + null, + null); + + TrackedFile result = + ContentEntryAdapters.fromDataFile(wrapAdded(file), SCHEMA, unionType, EntryStatus.ADDED); + + assertThat(result.partition()).isInstanceOf(PartitionData.class); + PartitionData projected = (PartitionData) result.partition(); + assertThat(projected.getPartitionType()).isEqualTo(unionType); + assertThat(projected.size()).isEqualTo(2); + + int writerPos = unionType.fields().indexOf(writerField); + int extraPos = unionType.fields().indexOf(extraField); + assertThat(projected.get(writerPos, Integer.class)).isEqualTo(7); + assertThat(projected.get(extraPos, Integer.class)).isNull(); + } + + @Test + void fromDeleteFilePopulatesEqualityIds() { + TrackedFile result = + ContentEntryAdapters.fromDeleteFile( + addedEqualityDeleteEntry(), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED); + + assertThat(result.contentType()).isEqualTo(FileContent.EQUALITY_DELETES); + assertThat(result.location()).isEqualTo(DELETE_PATH); + assertThat(result.equalityIds()).containsExactly(1); + assertThat(result.tracking().status()).isEqualTo(EntryStatus.ADDED); + } + + @Test + void fromDataFilePopulatesContentStatsBounds() { + DataFile file = dataFileWithMetrics(); + TrackedFile result = + ContentEntryAdapters.fromDataFile( + wrapAdded(file), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED); + + ContentStats stats = result.contentStats(); + assertThat(stats).isNotNull(); + assertThat(stats.fieldStats()).extracting(FieldStats::fieldId).containsExactlyInAnyOrder(1, 2); + + FieldStats idStats = + stats.fieldStats().stream().filter(s -> s.fieldId() == 1).findFirst().orElseThrow(); + assertThat(idStats.valueCount()).isEqualTo(100L); + assertThat(idStats.lowerBound()).isEqualTo(1); + assertThat(idStats.upperBound()).isEqualTo(1000); + } + + @Test + void fromDataFilePopulatesContentStatsForListAndMapElements() { + // primitiveTypesFor must recurse into list element and map key/value types so that bounds + // keyed by the inner field IDs are preserved. Without the list/map branches in + // collectPrimitiveTypes the inner types would be absent from originalTypes and + // MetricsUtil.fromMetrics would silently drop these field stats. + Schema schema = + new Schema( + required(1, "id", Types.IntegerType.get()), + required( + 2, "tags", Types.ListType.ofRequired(10 /* elementId */, Types.IntegerType.get())), + required( + 3, + "props", + Types.MapType.ofRequired( + 11 /* keyId */, + 12 /* valueId */, + Types.StringType.get(), + Types.IntegerType.get()))); + + Metrics metrics = + new Metrics( + 100L, + null /* columnSizes */, + ImmutableMap.of(1, 100L, 10, 100L, 11, 100L, 12, 100L), + ImmutableMap.of(1, 0L, 10, 0L, 11, 0L, 12, 0L), + ImmutableMap.of(), + ImmutableMap.of( + 10, Conversions.toByteBuffer(Types.IntegerType.get(), 1), + 11, Conversions.toByteBuffer(Types.StringType.get(), "a"), + 12, Conversions.toByteBuffer(Types.IntegerType.get(), 5)), + ImmutableMap.of( + 10, Conversions.toByteBuffer(Types.IntegerType.get(), 999), + 11, Conversions.toByteBuffer(Types.StringType.get(), "z"), + 12, Conversions.toByteBuffer(Types.IntegerType.get(), 50))); + + DataFile file = + new GenericDataFile( + UNPARTITIONED.specId(), + DATA_PATH, + FileFormat.PARQUET, + EMPTY_PARTITION, + 1024L, + metrics, + null, + ImmutableList.of(0L), + null, + null); + + TrackedFile result = + ContentEntryAdapters.fromDataFile( + wrapAdded(file), schema, UNION_PARTITION_TYPE, EntryStatus.ADDED); + + ContentStats stats = result.contentStats(); + assertThat(stats).isNotNull(); + assertThat(stats.fieldStats()).extracting(FieldStats::fieldId).contains(10, 11, 12); + + FieldStats elementStats = + stats.fieldStats().stream().filter(s -> s.fieldId() == 10).findFirst().orElseThrow(); + assertThat(elementStats.lowerBound()).isEqualTo(1); + assertThat(elementStats.upperBound()).isEqualTo(999); + + FieldStats keyStats = + stats.fieldStats().stream().filter(s -> s.fieldId() == 11).findFirst().orElseThrow(); + assertThat(keyStats.lowerBound()).isEqualTo("a"); + assertThat(keyStats.upperBound()).isEqualTo("z"); + + FieldStats valueStats = + stats.fieldStats().stream().filter(s -> s.fieldId() == 12).findFirst().orElseThrow(); + assertThat(valueStats.lowerBound()).isEqualTo(5); + assertThat(valueStats.upperBound()).isEqualTo(50); + } + + @Test + void fromManifestFileForDataManifest() { + GenericManifestFile manifest = v4ManifestFile(ManifestContent.DATA, 4, 6L); + TrackedFile result = ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, 1000L); + + assertThat(result.contentType()).isEqualTo(FileContent.DATA_MANIFEST); + assertThat(result.formatVersion()).isEqualTo(4); + assertThat(result.location()).isEqualTo(MANIFEST_PATH); + assertThat(result.tracking().firstRowId()).isEqualTo(1000L); + // record_count for a manifest-reference content_entry is sourced from the manifest's + // persisted record_count (v4) or summed from per-status file counts (pre-v4). Data-row totals + // live in manifest_info, not in this field. + assertThat(result.recordCount()).isEqualTo(6L); + assertThat(result.manifestInfo()).isNotNull(); + assertThat(result.manifestInfo().addedFilesCount()).isEqualTo(2); + assertThat(result.manifestInfo().existingFilesCount()).isEqualTo(3); + assertThat(result.manifestInfo().deletedFilesCount()).isEqualTo(1); + assertThat(result.manifestInfo().addedRowsCount()).isEqualTo(200L); + assertThat(result.manifestInfo().existingRowsCount()).isEqualTo(300L); + assertThat(result.manifestInfo().deletedRowsCount()).isEqualTo(100L); + // replaced/modified counts default to 0 when the source manifest does not track them + assertThat(result.manifestInfo().replacedFilesCount()).isEqualTo(0); + assertThat(result.manifestInfo().replacedRowsCount()).isEqualTo(0L); + } + + @Test + void fromManifestFileUsesPersistedRecordCount() { + // When the source manifest carries a persisted record_count (e.g., a v4+ leaf writer that + // tracked MODIFIED entries, or a row projected from a content_entry on read), the adapter + // must propagate it instead of recomputing from per-status file counts. The persisted value + // here (99) deliberately differs from the per-status sum (2 + 3 + 1 = 6) to prove which + // path is taken. + GenericManifestFile manifest = v4ManifestFile(ManifestContent.DATA, 4, 99L); + + TrackedFile result = ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null); + + assertThat(result.recordCount()).isEqualTo(99L); + } + + @Test + void fromManifestFileRejectsV4ManifestWithoutRecordCount() { + // v4+ manifests must carry a persisted record_count: the per-status sum cannot reflect + // MODIFIED entries, so v4+ writers and root-manifest readers populate it directly. + GenericManifestFile manifest = + v4ManifestFile(ManifestContent.DATA, 4, null /* recordCount intentionally null */); + assertThatThrownBy( + () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("record_count must be set"); + } + + @Test + void fromManifestFileForDeleteManifestPreV4() { + // Pre-v4 manifests have formatVersion=0 by default (not tracked in the v1-v3 manifest list + // schema, so reads land on the interface default). The adapter dispatches to the legacy sum. + ManifestFile manifest = manifestFile(ManifestContent.DELETES); + TrackedFile result = + ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.EXISTING, null); + + assertThat(result.contentType()).isEqualTo(FileContent.DELETE_MANIFEST); + assertThat(result.formatVersion()).isEqualTo(0); + assertThat(result.recordCount()).isEqualTo(6L); + assertThat(result.tracking().status()).isEqualTo(EntryStatus.EXISTING); + assertThat(result.tracking().firstRowId()).isNull(); + } + + @Test + void fromManifestFileRejectsFirstRowIdOnDeleteManifest() { + ManifestFile manifest = manifestFile(ManifestContent.DELETES); + + assertThatThrownBy( + () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, 100L)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("firstRowId is only valid for DATA manifests"); + } + + @Test + void fromManifestFileRejectsInvalidFormatVersion() { + // The manifest's intrinsic format_version is the source of truth. Values strictly between 0 + // and 4 (1, 2, 3) and negative values have no defined meaning at the v4+ content_entry layer + // and are rejected. + GenericManifestFile manifest = v4ManifestFile(ManifestContent.DATA, 2, null); + assertThatThrownBy( + () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid manifest format_version: 2"); + } + + @Test + void fromManifestFileRejectsUnassignedMinSequenceNumber() { + ManifestFile manifest = manifestFile(ManifestContent.DATA, ManifestWriter.UNASSIGNED_SEQ); + assertThatThrownBy( + () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("min_sequence_number is unassigned"); + } + + @Test + void fromManifestFileRejectsUnassignedSequenceNumber() { + ManifestFile manifest = + manifestFileWithSequenceNumbers( + ManifestContent.DATA, ManifestWriter.UNASSIGNED_SEQ, 4L /* minSeq, valid */); + assertThatThrownBy( + () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("sequence_number is unassigned"); + } + + @Test + void fromManifestFileRejectsNullManifest() { + assertThatThrownBy(() -> ContentEntryAdapters.fromManifestFile(null, EntryStatus.ADDED, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid manifest file: null"); + } + + @Test + void fromDataFileRejectsNullEntry() { + assertThatThrownBy( + () -> + ContentEntryAdapters.fromDataFile( + null, SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid manifest entry: null"); + } + + @Test + void fromDataFileRejectsNullStatus() { + assertThatThrownBy( + () -> + ContentEntryAdapters.fromDataFile( + addedDataEntry(), SCHEMA, UNION_PARTITION_TYPE, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid status: null"); + } + + @Test + void fromDeleteFileRejectsV3DeleteVector() { + // A v3 delete vector is shaped as POSITION_DELETES stored in a Puffin file. v4+ colocates DVs + // on the data file's content_entry, so this should be rejected at the delete-manifest writer + // boundary. + DeleteFile dv = + new GenericDeleteFile( + UNPARTITIONED.specId(), + FileContent.POSITION_DELETES, + DELETE_PATH, + FileFormat.PUFFIN, + EMPTY_PARTITION, + 512L, + new Metrics(10L, null, null, null, null), + null, + null, + null, + null, + DATA_PATH, + 0L, + 512L); + GenericManifestEntry entry = + new GenericManifestEntry<>( + ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct()); + entry.wrapAppend(SNAPSHOT_ID, dv); + + assertThatThrownBy( + () -> + ContentEntryAdapters.fromDeleteFile( + entry, SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid content for delete file: POSITION_DELETES"); + } + + @Test + void fromDeleteFileRejectsV2PositionDeleteFile() { + // A v2 standalone position delete file is shaped as POSITION_DELETES stored in Parquet/Avro/ORC + // (anything other than Puffin). It has no v4+ representation; carry it over only via a legacy + // v3 manifest with format_version=0. + DeleteFile positionDelete = + new GenericDeleteFile( + UNPARTITIONED.specId(), + FileContent.POSITION_DELETES, + DELETE_PATH, + FileFormat.PARQUET, + EMPTY_PARTITION, + 512L, + new Metrics(10L, null, null, null, null), + null, + null, + null, + null, + DATA_PATH, + null /* contentOffset */, + null /* contentSizeInBytes */); + GenericManifestEntry entry = + new GenericManifestEntry<>( + ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct()); + entry.wrapAppend(SNAPSHOT_ID, positionDelete); + + assertThatThrownBy( + () -> + ContentEntryAdapters.fromDeleteFile( + entry, SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid content for delete file: POSITION_DELETES"); + } + + private static void assertAddedTracking(TrackedFile result, long snapshotId) { + assertThat(result.tracking().status()).isEqualTo(EntryStatus.ADDED); + assertThat(result.tracking().snapshotId()).isEqualTo(snapshotId); + assertThat(result.tracking().dataSequenceNumber()).isNull(); + assertThat(result.tracking().fileSequenceNumber()).isNull(); + } + + private static void assertExistingTracking( + TrackedFile result, long snapshotId, long dataSeq, long fileSeq) { + assertThat(result.tracking().status()).isEqualTo(EntryStatus.EXISTING); + assertThat(result.tracking().snapshotId()).isEqualTo(snapshotId); + assertThat(result.tracking().dataSequenceNumber()).isEqualTo(dataSeq); + assertThat(result.tracking().fileSequenceNumber()).isEqualTo(fileSeq); + } + + private static void assertDeletedTracking( + TrackedFile result, long snapshotId, long dataSeq, long fileSeq) { + assertThat(result.tracking().status()).isEqualTo(EntryStatus.DELETED); + assertThat(result.tracking().snapshotId()).isEqualTo(snapshotId); + assertThat(result.tracking().dataSequenceNumber()).isEqualTo(dataSeq); + assertThat(result.tracking().fileSequenceNumber()).isEqualTo(fileSeq); + } + + private static void assertCommonDataFields(TrackedFile result, DataFile file) { + assertThat(result.contentType()).isEqualTo(FileContent.DATA); + assertThat(result.formatVersion()).isEqualTo(4); + assertThat(result.location()).isEqualTo(file.location()); + assertThat(result.fileFormat()).isEqualTo(file.format()); + assertThat(result.recordCount()).isEqualTo(file.recordCount()); + assertThat(result.fileSizeInBytes()).isEqualTo(file.fileSizeInBytes()); + assertThat(result.specId()).isEqualTo(file.specId()); + assertThat(result.partition().size()).isEqualTo(0); + assertThat(result.splitOffsets()).isEqualTo(file.splitOffsets()); + } + + private static void assertCommonDeleteFields(TrackedFile result, DeleteFile file) { + assertThat(result.contentType()).isEqualTo(FileContent.EQUALITY_DELETES); + assertThat(result.formatVersion()).isEqualTo(4); + assertThat(result.location()).isEqualTo(file.location()); + assertThat(result.fileFormat()).isEqualTo(file.format()); + assertThat(result.recordCount()).isEqualTo(file.recordCount()); + assertThat(result.fileSizeInBytes()).isEqualTo(file.fileSizeInBytes()); + assertThat(result.specId()).isEqualTo(file.specId()); + assertThat(result.partition().size()).isEqualTo(0); + assertThat(result.equalityIds()) + .containsExactlyElementsOf(ImmutableList.copyOf(file.equalityFieldIds())); + } + + private static DataFile dataFile() { + return new GenericDataFile( + UNPARTITIONED.specId(), + DATA_PATH, + FileFormat.PARQUET, + EMPTY_PARTITION, + 1024L, + new Metrics(100L, null, null, null, null), + null, + ImmutableList.of(0L), + null, + null); + } + + private static DataFile dataFileWithMetrics() { + return new GenericDataFile( + UNPARTITIONED.specId(), + DATA_PATH, + FileFormat.PARQUET, + EMPTY_PARTITION, + 1024L, + METRICS_WITH_BOUNDS, + null, + ImmutableList.of(0L), + null, + null); + } + + private static DeleteFile equalityDeleteFile() { + return new GenericDeleteFile( + UNPARTITIONED.specId(), + FileContent.EQUALITY_DELETES, + DELETE_PATH, + FileFormat.PARQUET, + EMPTY_PARTITION, + 512L, + new Metrics(50L, null, null, null, null), + new int[] {1}, + null, + null, + null, + null, + null, + null); + } + + private static ManifestEntry addedDataEntry() { + return wrapAdded(dataFile()); + } + + private static ManifestEntry existingDataEntry() { + GenericManifestEntry entry = + new GenericManifestEntry<>( + ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct()); + entry.wrapExisting(SNAPSHOT_ID, DATA_SEQ, FILE_SEQ, dataFile()); + return entry; + } + + private static ManifestEntry addedEqualityDeleteEntry() { + GenericManifestEntry entry = + new GenericManifestEntry<>( + ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct()); + entry.wrapAppend(SNAPSHOT_ID, equalityDeleteFile()); + return entry; + } + + private static ManifestEntry existingEqualityDeleteEntry() { + GenericManifestEntry entry = + new GenericManifestEntry<>( + ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct()); + entry.wrapExisting(SNAPSHOT_ID, DATA_SEQ, FILE_SEQ, equalityDeleteFile()); + return entry; + } + + private static ManifestEntry wrapAdded(DataFile file) { + GenericManifestEntry entry = + new GenericManifestEntry<>( + ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct()); + entry.wrapAppend(SNAPSHOT_ID, file); + return entry; + } + + private static ManifestFile manifestFile(ManifestContent content) { + return manifestFile(content, 5L /* sequenceNumber */, 4L /* minSequenceNumber */); + } + + private static ManifestFile manifestFile(ManifestContent content, long minSequenceNumber) { + return manifestFile(content, 5L /* sequenceNumber */, minSequenceNumber); + } + + private static ManifestFile manifestFileWithSequenceNumbers( + ManifestContent content, long sequenceNumber, long minSequenceNumber) { + return manifestFile(content, sequenceNumber, minSequenceNumber); + } + + private static GenericManifestFile v4ManifestFile( + ManifestContent content, int formatVersion, Long recordCount) { + List partitions = ImmutableList.of(); + return new GenericManifestFile( + MANIFEST_PATH, + 2048L, + UNPARTITIONED.specId(), + content, + 5L /* sequenceNumber */, + 4L /* minSequenceNumber */, + SNAPSHOT_ID, + partitions, + null, + 2 /* addedFilesCount */, + 200L /* addedRowsCount */, + 3 /* existingFilesCount */, + 300L /* existingRowsCount */, + 1 /* deletedFilesCount */, + 100L /* deletedRowsCount */, + null /* firstRowId */, + recordCount, + formatVersion); + } + + private static ManifestFile manifestFile( + ManifestContent content, long sequenceNumber, long minSequenceNumber) { + List partitions = ImmutableList.of(); + return new GenericManifestFile( + MANIFEST_PATH, + 2048L, + UNPARTITIONED.specId(), + content, + sequenceNumber, + minSequenceNumber, + SNAPSHOT_ID, + partitions, + null, + 2 /* addedFilesCount */, + 200L /* addedRowsCount */, + 3 /* existingFilesCount */, + 300L /* existingRowsCount */, + 1 /* deletedFilesCount */, + 100L /* deletedRowsCount */, + null); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFile.java b/core/src/test/java/org/apache/iceberg/TestTrackedFile.java index 170c01ef7dc4..e5c3ba8a247c 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackedFile.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackedFile.java @@ -109,11 +109,11 @@ public void schemaWithContentStatsReflectsInput() { } @Test - public void schemaWithContentStatsPartitionIsRequired() { + public void schemaWithContentStatsPartitionIsOptional() { Types.StructType type = TrackedFile.schemaWithContentStats(PARTITION_TYPE, CONTENT_STATS_TYPE); Types.NestedField partitionField = type.field(TrackedFile.PARTITION_ID); - assertThat(partitionField.isRequired()).isTrue(); + assertThat(partitionField.isOptional()).isTrue(); assertThat(partitionField.name()).isEqualTo(TrackedFile.PARTITION_NAME); assertThat(partitionField.doc()).isEqualTo(TrackedFile.PARTITION_DOC); } diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java b/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java index d6b0701fdfd7..91ac83ca0dc7 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java @@ -91,8 +91,7 @@ private static Stream missingRequiredFieldCases() { Arguments.of("location", "Missing required field: location"), Arguments.of("fileFormat", "Missing required field: file format"), Arguments.of("recordCount", "Missing required field: record count"), - Arguments.of("fileSizeInBytes", "Missing required field: file size in bytes"), - Arguments.of("partition", "Missing required field: partition data")); + Arguments.of("fileSizeInBytes", "Missing required field: file size in bytes")); } @ParameterizedTest @@ -121,6 +120,37 @@ public void missingRequiredFields(String missingField, String expectedMessage) { .hasMessage(expectedMessage); } + @Test + public void missingPartitionFailsOnlyForDataAndEqualityDelete() { + // Partition is optional on content_entry rows; the builder rejects a null partition only when + // the content type is DATA or EQUALITY_DELETES. Manifest references (DATA_MANIFEST / + // DELETE_MANIFEST) leave partition null. + TrackedFileBuilder dataBuilder = + builderWithMissingRequiredField(TrackedFileBuilder.data(50L), "partition"); + TrackedFileBuilder equalityDeleteBuilder = + builderWithMissingRequiredField(TrackedFileBuilder.equalityDelete(50L), "partition"); + assertThatThrownBy(dataBuilder::build) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Partition is required for content type DATA"); + assertThatThrownBy(equalityDeleteBuilder::build) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Partition is required for content type EQUALITY_DELETES"); + + // Manifest builders should not complain about a missing partition; they fail with the next + // missing required field (manifest info). Manifest reference rows leave the partition as the + // builder's empty-struct default (interpreted by readers as null per the optional schema). + TrackedFile dataManifest = + builderWithMissingRequiredField(TrackedFileBuilder.dataManifest(50L), "partition") + .manifestInfo(MANIFEST_INFO) + .build(); + TrackedFile deleteManifest = + builderWithMissingRequiredField(TrackedFileBuilder.deleteManifest(50L), "partition") + .manifestInfo(MANIFEST_INFO) + .build(); + assertThat(dataManifest.partition().size()).isEqualTo(0); + assertThat(deleteManifest.partition().size()).isEqualTo(0); + } + private TrackedFileBuilder builderWithMissingRequiredField( TrackedFileBuilder builder, String missingField) { if (!"formatVersion".equals(missingField)) { @@ -182,11 +212,13 @@ private static Stream nonEqualityDeleteBuilders() { Arguments.of(TrackedFileBuilder.data(10L), FileContent.DATA), Arguments.of(TrackedFileBuilder.dataManifest(10L), FileContent.DATA_MANIFEST), Arguments.of(TrackedFileBuilder.deleteManifest(10L), FileContent.DELETE_MANIFEST), - Arguments.of(TrackedFileBuilder.from(sourceData(12L), 20L), FileContent.DATA), + Arguments.of(TrackedFileBuilder.from(inheritedSourceData(12L, 5L), 20L), FileContent.DATA), Arguments.of( - TrackedFileBuilder.from(sourceDataManifest(21L), 25L), FileContent.DATA_MANIFEST), + TrackedFileBuilder.from(inheritedSourceDataManifest(21L, 5L), 25L), + FileContent.DATA_MANIFEST), Arguments.of( - TrackedFileBuilder.from(sourceDeleteManifest(12L), 20L), FileContent.DELETE_MANIFEST)); + TrackedFileBuilder.from(inheritedSourceDeleteManifest(12L, 5L), 20L), + FileContent.DELETE_MANIFEST)); } @ParameterizedTest @@ -206,11 +238,14 @@ private static Stream nonDataBuilders() { Arguments.of(TrackedFileBuilder.dataManifest(10L), FileContent.DATA_MANIFEST), Arguments.of(TrackedFileBuilder.deleteManifest(10L), FileContent.DELETE_MANIFEST), Arguments.of( - TrackedFileBuilder.from(sourceEqualityDelete(12L), 20L), FileContent.EQUALITY_DELETES), + TrackedFileBuilder.from(inheritedSourceEqualityDelete(12L, 5L), 20L), + FileContent.EQUALITY_DELETES), Arguments.of( - TrackedFileBuilder.from(sourceDataManifest(21L), 25L), FileContent.DATA_MANIFEST), + TrackedFileBuilder.from(inheritedSourceDataManifest(21L, 5L), 25L), + FileContent.DATA_MANIFEST), Arguments.of( - TrackedFileBuilder.from(sourceDeleteManifest(12L), 20L), FileContent.DELETE_MANIFEST)); + TrackedFileBuilder.from(inheritedSourceDeleteManifest(12L, 5L), 20L), + FileContent.DELETE_MANIFEST)); } @ParameterizedTest @@ -247,9 +282,10 @@ private static Stream nonManifestBuilders() { return Stream.of( Arguments.of(TrackedFileBuilder.data(10L), FileContent.DATA), Arguments.of(TrackedFileBuilder.equalityDelete(10L), FileContent.EQUALITY_DELETES), - Arguments.of(TrackedFileBuilder.from(sourceData(12L), 20L), FileContent.DATA), + Arguments.of(TrackedFileBuilder.from(inheritedSourceData(12L, 5L), 20L), FileContent.DATA), Arguments.of( - TrackedFileBuilder.from(sourceEqualityDelete(12L), 20L), FileContent.EQUALITY_DELETES)); + TrackedFileBuilder.from(inheritedSourceEqualityDelete(12L, 5L), 20L), + FileContent.EQUALITY_DELETES)); } @ParameterizedTest @@ -721,6 +757,44 @@ public void addingSameDeletionVectorFails() { .hasMessage("The same deletion vector already added"); } + @Test + public void buildWithExplicitTrackingFields() { + // Build a tracking row via the explicitTracking factory (e.g., for a manifest reference with + // explicit data/file seq numbers), bypassing the added()/from(source) tracking-derivation + // path. build() resolves the supplied TrackingBuilder. + TrackedFile result = + TrackedFileBuilder.explicitTracking( + FileContent.DATA_MANIFEST, + TrackingBuilder.forContentEntry(EntryStatus.EXISTING, 100L, 7L, 9L, 1000L)) + .formatVersion(FORMAT_VERSION_V4) + .location("s3://bucket/data/manifest.avro") + .fileFormat(FileFormat.AVRO) + .recordCount(420L) + .fileSizeInBytes(556L) + .partition(PARTITION_DATA) + .manifestInfo(MANIFEST_INFO) + .build(); + + assertThat(result.tracking().status()).isEqualTo(EntryStatus.EXISTING); + assertThat(result.tracking().snapshotId()).isEqualTo(100L); + assertThat(result.tracking().dataSequenceNumber()).isEqualTo(7L); + assertThat(result.tracking().fileSequenceNumber()).isEqualTo(9L); + assertThat(result.tracking().firstRowId()).isEqualTo(1000L); + assertThat(result.tracking().dvSnapshotId()).isNull(); + assertThat(result.tracking().deletedPositions()).isNull(); + assertThat(result.tracking().replacedPositions()).isNull(); + } + + @Test + public void explicitTrackingRejectsNull() { + assertThatThrownBy(() -> TrackedFileBuilder.explicitTracking(FileContent.DATA, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid tracking builder: null"); + assertThatThrownBy(() -> TrackedFileBuilder.explicitTracking(null, TrackingBuilder.added(100L))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid content type: null"); + } + private static Stream nonManifestSources() { return Stream.of( Arguments.of(sourceData(10L), FileContent.DATA), @@ -830,6 +904,22 @@ private static TrackedFile entryWithInheritedSeqNums(TrackedFile entry, long seq return entry; } + private static TrackedFile inheritedSourceData(long snapshotId, long sequenceNumber) { + return entryWithInheritedSeqNums(sourceData(snapshotId), sequenceNumber); + } + + private static TrackedFile inheritedSourceEqualityDelete(long snapshotId, long sequenceNumber) { + return entryWithInheritedSeqNums(sourceEqualityDelete(snapshotId), sequenceNumber); + } + + private static TrackedFile inheritedSourceDataManifest(long snapshotId, long sequenceNumber) { + return entryWithInheritedSeqNums(sourceDataManifest(snapshotId), sequenceNumber); + } + + private static TrackedFile inheritedSourceDeleteManifest(long snapshotId, long sequenceNumber) { + return entryWithInheritedSeqNums(sourceDeleteManifest(snapshotId), sequenceNumber); + } + /** * Verifies that fields in entry are the same as in source. Note, snapshot ID can't be verified * here, because based on the entry's status it is either carried over or not. diff --git a/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java b/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java index 0d1803022a23..32995b3dc61c 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java @@ -296,6 +296,66 @@ void testSourceDvPositionsAreNotCarriedForward() { assertThat(replaced.replacedPositions()).isNull(); } + @Test + void testForContentEntryManifestReference() { + // Manifest references carry explicit (not inherited) data/file sequence numbers and an + // optional first-row-id. DV and position-bitmap fields are always null for a manifest + // reference. + Tracking tracking = + TrackingBuilder.forContentEntry(EntryStatus.ADDED, 42L, 7L, 9L, 1000L).build(); + + assertThat(tracking.status()).isEqualTo(EntryStatus.ADDED); + assertThat(tracking.snapshotId()).isEqualTo(42L); + assertThat(tracking.dataSequenceNumber()).isEqualTo(7L); + assertThat(tracking.fileSequenceNumber()).isEqualTo(9L); + assertThat(tracking.firstRowId()).isEqualTo(1000L); + assertThat(tracking.dvSnapshotId()).isNull(); + assertThat(tracking.deletedPositions()).isNull(); + assertThat(tracking.replacedPositions()).isNull(); + } + + @Test + void testForContentEntryManifestReferenceNullFirstRowId() { + // Delete manifests carry null first-row-id (row lineage is data-only). + Tracking tracking = + TrackingBuilder.forContentEntry(EntryStatus.ADDED, 42L, 7L, 9L, null).build(); + + assertThat(tracking.firstRowId()).isNull(); + } + + @Test + void testForContentEntryLegacyProjection() { + // Content-file rows projected from a legacy ManifestEntry carry sequence numbers from the + // legacy entry. DV and position-bitmap fields are always null for the projected row. + Tracking tracking = + TrackingBuilder.forContentEntry(EntryStatus.EXISTING, 42L, 7L, 9L, 1000L).build(); + + assertThat(tracking.status()).isEqualTo(EntryStatus.EXISTING); + assertThat(tracking.snapshotId()).isEqualTo(42L); + assertThat(tracking.dataSequenceNumber()).isEqualTo(7L); + assertThat(tracking.fileSequenceNumber()).isEqualTo(9L); + assertThat(tracking.firstRowId()).isEqualTo(1000L); + assertThat(tracking.dvSnapshotId()).isNull(); + assertThat(tracking.deletedPositions()).isNull(); + assertThat(tracking.replacedPositions()).isNull(); + } + + @Test + void testForContentEntryLegacyProjectionNullFirstRowId() { + // Delete files and entries that inherit from the manifest counter carry null first-row-id. + Tracking tracking = + TrackingBuilder.forContentEntry(EntryStatus.EXISTING, 42L, 7L, 9L, null).build(); + + assertThat(tracking.firstRowId()).isNull(); + } + + @Test + void testForContentEntryRejectsNullStatus() { + assertThatThrownBy(() -> TrackingBuilder.forContentEntry(null, 42L, 7L, 9L, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid status: null"); + } + @Test void testDvUpdatedProducesModifiedAndAdvancesDvSnapshotId() { Tracking source = sourceTracking();