diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 2f732aef427f..6ada3012e3a0 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -29,6 +29,12 @@
public interface ManifestFile {
int PARTITION_SUMMARIES_ELEMENT_ID = 508;
+ /**
+ * Value of {@link #formatVersion()} for v4+ manifest files; versions greater than or equal to
+ * this constant identify v4+ manifests.
+ */
+ int V4_FORMAT_VERSION = 4;
+
+ /**
+ * Value of {@link #formatVersion()} for pre-v4 manifest files (v1, v2, and v3). This is also the
+ * value returned by the default {@link #formatVersion()} implementation.
+ */
+ int LEGACY_FORMAT_VERSION = 0;
+
Types.NestedField PATH =
required(500, "manifest_path", Types.StringType.get(), "Location URI with FS scheme");
Types.NestedField LENGTH =
@@ -186,6 +192,26 @@ default boolean hasDeletedFiles() {
/** Returns the total number of rows in all files with status DELETED in the manifest file. */
Long deletedRowsCount();
+ /**
+ * Returns the number of files with status REPLACED in the manifest file, or null if not tracked.
+ *
+ * <p>REPLACED files are the prior-state entries of v4 REPLACED/MODIFIED pairs and are not live.
+ * Returns null for manifest files written by pre-v4 writers. The default implementation always
+ * returns null.
+ */
+ default Integer replacedFilesCount() {
+ return null;
+ }
+
+ /**
+ * Returns the total number of rows in all files with status REPLACED in the manifest file, or
+ * null if not tracked.
+ *
+ * <p>Returns null for manifest files written by pre-v4 writers. The default implementation
+ * always returns null.
+ */
+ default Long replacedRowsCount() {
+ return null;
+ }
+
/**
* Returns a list of {@link PartitionFieldSummary partition field summaries}.
*
@@ -210,6 +236,19 @@ default Long firstRowId() {
return null;
}
+ /**
+ * Returns the number of records in the manifest file, or {@code null} for pre-v4 manifests.
+ *
+ * <p>The default implementation always returns {@code null}.
+ */
+ default Long recordCount() {
+ return null;
+ }
+
+ /**
+ * Returns the format version of the manifest file, or {@link #LEGACY_FORMAT_VERSION} for pre-v4
+ * manifests.
+ *
+ * <p>The default implementation always returns {@link #LEGACY_FORMAT_VERSION}.
+ */
+ default int formatVersion() {
+ return LEGACY_FORMAT_VERSION;
+ }
+
/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
diff --git a/core/src/main/java/org/apache/iceberg/ContentEntryAdapters.java b/core/src/main/java/org/apache/iceberg/ContentEntryAdapters.java
new file mode 100644
index 000000000000..41a716176841
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ContentEntryAdapters.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Builds {@link TrackedFile} instances for v4+ content_entry rows from legacy {@link ManifestEntry}
+ * and {@link ManifestFile} inputs.
+ */
+class ContentEntryAdapters {
+ // Cache of primitive-field types keyed by table schema. The originalTypes map a Metrics
+ // instance carries is read-only inside MetricsUtil.fromMetrics, so a single per-schema map
+ // can be shared across every adapter call for a writer's lifetime instead of being rebuilt
+ // per row. WeakHashMap keys release when callers stop referencing the schema.
+ private static final Map> PRIMITIVE_TYPES_BY_SCHEMA =
+ Collections.synchronizedMap(new WeakHashMap<>());
+
+ private ContentEntryAdapters() {}
+
+  /**
+   * Builds a v4+ content_entry for a data file from a legacy manifest entry.
+   *
+   * @param entry the legacy manifest entry; its file must have content {@code DATA}
+   * @param tableSchema the table schema used to convert the file's metrics into content stats
+   * @param unionPartitionType the unified partition type the file's partition is projected into
+   * @param statusOverride the status to write; must be ADDED, EXISTING, or DELETED
+   * @return the tracked file for the entry
+   * @throws IllegalArgumentException if the entry, its file, or its snapshot id is null, if the
+   *     file is not a data file, or if the status is not supported
+   */
+  static TrackedFile fromDataFile(
+      ManifestEntry<DataFile> entry,
+      Schema tableSchema,
+      Types.StructType unionPartitionType,
+      EntryStatus statusOverride) {
+    Preconditions.checkArgument(entry != null, "Invalid manifest entry: null");
+    DataFile file = entry.file();
+    Preconditions.checkArgument(file != null, "Invalid data file: null");
+    Preconditions.checkArgument(
+        file.content() == FileContent.DATA, "Invalid content for data file: %s", file.content());
+    // buildContentFileEntry takes a primitive long; validate before unboxing so a missing
+    // snapshot id fails with a descriptive message rather than a bare NullPointerException.
+    Long snapshotId = entry.snapshotId();
+    Preconditions.checkArgument(snapshotId != null, "Invalid snapshot id: null");
+    return buildContentFileEntry(
+        file,
+        statusOverride,
+        snapshotId,
+        entry.dataSequenceNumber(),
+        entry.fileSequenceNumber(),
+        tableSchema,
+        unionPartitionType);
+  }
+
+  /**
+   * Builds a v4+ content_entry for an equality delete file from a legacy manifest entry.
+   *
+   * @param entry the legacy manifest entry; its file must have content {@code EQUALITY_DELETES}
+   * @param tableSchema the table schema used to convert the file's metrics into content stats
+   * @param unionPartitionType the unified partition type the file's partition is projected into
+   * @param statusOverride the status to write; must be ADDED, EXISTING, or DELETED
+   * @return the tracked file for the entry
+   * @throws IllegalArgumentException if the entry, its file, or its snapshot id is null, if the
+   *     file is not an equality delete file, or if the status is not supported
+   */
+  static TrackedFile fromDeleteFile(
+      ManifestEntry<DeleteFile> entry,
+      Schema tableSchema,
+      Types.StructType unionPartitionType,
+      EntryStatus statusOverride) {
+    Preconditions.checkArgument(entry != null, "Invalid manifest entry: null");
+    DeleteFile file = entry.file();
+    Preconditions.checkArgument(file != null, "Invalid delete file: null");
+    // v4+ leaf delete manifests must only contain content_type=EQUALITY_DELETES (per spec PR
+    // #16025). POSITION_DELETES has no v4+ leaf representation:
+    // - v3 delete vectors (POSITION_DELETES stored as a Puffin blob) must be colocated on the
+    //   data file's content_entry via TrackedFileBuilder.deletionVector(...) — see
+    //   MergingSnapshotProducer's row-delta path.
+    // - v2 standalone position delete files (POSITION_DELETES stored in Parquet/Avro/ORC) have
+    //   no v4+ representation; they can only live in pre-v4 legacy manifests carried over via a
+    //   format_version=0 manifest reference.
+    Preconditions.checkArgument(
+        file.content() == FileContent.EQUALITY_DELETES,
+        "Invalid content for delete file: %s",
+        file.content());
+    // buildContentFileEntry takes a primitive long; validate before unboxing so a missing
+    // snapshot id fails with a descriptive message rather than a bare NullPointerException.
+    Long snapshotId = entry.snapshotId();
+    Preconditions.checkArgument(snapshotId != null, "Invalid snapshot id: null");
+    return buildContentFileEntry(
+        file,
+        statusOverride,
+        snapshotId,
+        entry.dataSequenceNumber(),
+        entry.fileSequenceNumber(),
+        tableSchema,
+        unionPartitionType);
+  }
+
+ /**
+ * Builds a manifest reference content_entry for the v4+ root manifest.
+ *
+ * <p>The on-disk {@code format_version} field is sourced from {@link
+ * ManifestFile#formatVersion()} (0 for pre-v4 leaves carried over during a v3-to-v4 upgrade, 4
+ * for v4+ leaves). For v4+ leaves the source manifest must carry a non-null {@link
+ * ManifestFile#recordCount()} populated by the v4+ writer or root-manifest reader; for pre-v4
+ * leaves the record_count is summed from the per-status file counts.
+ *
+ * <p>The manifest's sequence number is used as both the data and the file sequence number of the
+ * reference. The file format is derived from the manifest path, and key metadata is copied only
+ * when the manifest has it.
+ *
+ * @param manifest the manifest to reference; must have a non-null snapshot id, an assigned
+ * sequence number and an assigned min sequence number
+ * @param status entry status (typically ADDED for a newly written leaf, EXISTING for a
+ * carried-over reference)
+ * @param firstRowId the resolved first-row-id to write for this reference, or null for delete
+ * manifests. Callers are responsible for resolving the value (either carrying over {@link
+ * ManifestFile#firstRowId()} or assigning from a writer-side counter); the adapter does not
+ * decide between the two.
+ * @return the tracked file describing the manifest reference
+ * @throws IllegalArgumentException if {@code manifest} or {@code status} is null, the format
+ * version is neither the legacy value nor v4+, the snapshot id is null, {@code firstRowId}
+ * is set for a non-DATA manifest, a sequence number is unassigned, or a v4+ manifest has no
+ * record count
+ */
+ static TrackedFile fromManifestFile(ManifestFile manifest, EntryStatus status, Long firstRowId) {
+ Preconditions.checkArgument(manifest != null, "Invalid manifest file: null");
+ Preconditions.checkArgument(status != null, "Invalid status: null");
+ int formatVersion = manifest.formatVersion();
+ Preconditions.checkArgument(
+ formatVersion == ManifestFile.LEGACY_FORMAT_VERSION
+ || formatVersion >= ManifestFile.V4_FORMAT_VERSION,
+ "Invalid manifest format_version: %s (must be %s for pre-v4 or >= %s for v4+)",
+ formatVersion,
+ ManifestFile.LEGACY_FORMAT_VERSION,
+ ManifestFile.V4_FORMAT_VERSION);
+ Long manifestSnapshotId = manifest.snapshotId();
+ Preconditions.checkArgument(manifestSnapshotId != null, "Invalid manifest snapshot id: null");
+ Preconditions.checkArgument(
+ firstRowId == null || manifest.content() == ManifestContent.DATA,
+ "firstRowId is only valid for DATA manifests, but content is %s",
+ manifest.content());
+
+ long manifestSeq = manifest.sequenceNumber();
+ Preconditions.checkArgument(
+ manifestSeq != ManifestWriter.UNASSIGNED_SEQ,
+ "Cannot build content_entry for manifest reference %s: sequence_number is unassigned. "
+ + "Resolve via RootManifestWriter.assignSequenceNumber before writing.",
+ manifest.path());
+ ManifestInfo info = manifestInfo(manifest);
+
+ FileContent contentType =
+ manifest.content() == ManifestContent.DATA
+ ? FileContent.DATA_MANIFEST
+ : FileContent.DELETE_MANIFEST;
+ long recordCount = resolveRecordCount(manifest);
+
+ TrackedFileBuilder builder =
+ TrackedFileBuilder.explicitTracking(
+ contentType,
+ TrackingBuilder.forContentEntry(
+ status, manifestSnapshotId, manifestSeq, manifestSeq, firstRowId));
+ builder
+ .formatVersion(formatVersion)
+ .location(manifest.path())
+ .fileFormat(FileFormat.fromFileName(manifest.path()))
+ .recordCount(recordCount)
+ .fileSizeInBytes(manifest.length())
+ .specId(manifest.partitionSpecId())
+ .manifestInfo(info);
+
+ if (manifest.keyMetadata() != null) {
+ builder.keyMetadata(manifest.keyMetadata());
+ }
+
+ return builder.build();
+ }
+
+  /**
+   * Builds the content_entry for a data or equality delete file.
+   *
+   * <p>ADDED entries use the builder's added-file tracking for {@code snapshotId}; EXISTING and
+   * DELETED entries carry over the supplied sequence numbers and, for data files, the file's
+   * first row id.
+   *
+   * @param file the content file to project
+   * @param status the status to write; must be ADDED, EXISTING, or DELETED
+   * @param snapshotId the snapshot id recorded in the entry's tracking
+   * @param dataSequenceNumber the data sequence number; required unless status is ADDED
+   * @param fileSequenceNumber the file sequence number; required unless status is ADDED
+   * @param tableSchema the table schema used to convert the file's metrics into content stats
+   * @param unionPartitionType the unified partition type the file's partition is projected into
+   * @return the tracked file for the content file
+   * @throws IllegalArgumentException if the status is null or unsupported, the file format is
+   *     null, or a sequence number is missing for a non-ADDED entry
+   */
+  private static TrackedFile buildContentFileEntry(
+      ContentFile<?> file,
+      EntryStatus status,
+      long snapshotId,
+      Long dataSequenceNumber,
+      Long fileSequenceNumber,
+      Schema tableSchema,
+      Types.StructType unionPartitionType) {
+    Preconditions.checkArgument(status != null, "Invalid status: null");
+    // fromDataFile / fromDeleteFile project legacy ManifestEntry rows, whose status is ADDED,
+    // EXISTING, or DELETED. MODIFIED and REPLACED have no legacy representation — they're written
+    // directly by V4Writer.prepareWithStatus via
+    // TrackedFileBuilder.from(source, sid).deletionVector(dv).build() (MODIFIED) and
+    // TrackedFileBuilder.replaced(source, sid) (REPLACED).
+    Preconditions.checkArgument(
+        status == EntryStatus.ADDED
+            || status == EntryStatus.EXISTING
+            || status == EntryStatus.DELETED,
+        "Unsupported status for content file entry: %s (use V4Writer.prepareWithStatus for "
+            + "MODIFIED/REPLACED transitions)",
+        status);
+    PartitionData partition = toPartitionData(file, unionPartitionType);
+    FileFormat format = file.format();
+    Preconditions.checkArgument(
+        format != null, "Invalid file format: null for %s", file.location());
+    ContentStats stats = MetricsUtil.fromMetrics(tableSchema, toMetrics(file, tableSchema));
+    boolean isDataFile = file.content() == FileContent.DATA;
+
+    TrackedFileBuilder builder;
+    if (status == EntryStatus.ADDED) {
+      builder =
+          isDataFile
+              ? TrackedFileBuilder.data(snapshotId)
+              : TrackedFileBuilder.equalityDelete(snapshotId);
+    } else {
+      Preconditions.checkArgument(
+          dataSequenceNumber != null, "Invalid data sequence number: null for non-ADDED entry");
+      Preconditions.checkArgument(
+          fileSequenceNumber != null, "Invalid file sequence number: null for non-ADDED entry");
+      // Only data files carry a first row id; delete files always write null.
+      Long firstRowId = isDataFile ? ((DataFile) file).firstRowId() : null;
+      FileContent contentType = isDataFile ? FileContent.DATA : FileContent.EQUALITY_DELETES;
+      builder =
+          TrackedFileBuilder.explicitTracking(
+              contentType,
+              TrackingBuilder.forContentEntry(
+                  status, snapshotId, dataSequenceNumber, fileSequenceNumber, firstRowId));
+    }
+
+    builder
+        .formatVersion(ManifestFile.V4_FORMAT_VERSION)
+        .location(file.location())
+        .fileFormat(format)
+        .partition(partition)
+        .recordCount(file.recordCount())
+        .fileSizeInBytes(file.fileSizeInBytes())
+        .specId(file.specId());
+
+    populateOptionalFields(builder, file, stats, isDataFile);
+
+    return builder.build();
+  }
+
+  /**
+   * Copies the optional attributes of {@code file} onto {@code builder}, skipping any that are
+   * null.
+   *
+   * @param builder the builder to populate
+   * @param file the source content file
+   * @param stats the converted content stats, or null if there are none
+   * @param isDataFile whether {@code file} is a data file; the sort order id is copied only for
+   *     data files and equality field ids only for non-data files
+   */
+  private static void populateOptionalFields(
+      TrackedFileBuilder builder, ContentFile<?> file, ContentStats stats, boolean isDataFile) {
+    if (stats != null) {
+      builder.contentStats(stats);
+    }
+
+    if (file.sortOrderId() != null && isDataFile) {
+      builder.sortOrderId(file.sortOrderId());
+    }
+
+    if (file.keyMetadata() != null) {
+      builder.keyMetadata(file.keyMetadata());
+    }
+
+    if (file.splitOffsets() != null) {
+      builder.splitOffsets(file.splitOffsets());
+    }
+
+    if (!isDataFile && file.equalityFieldIds() != null) {
+      builder.equalityIds(file.equalityFieldIds());
+    }
+  }
+
+  /**
+   * Resolves the record_count written on a manifest-reference content_entry row.
+   *
+   * <p>A v4+ manifest must already carry its persisted record_count. For a pre-v4 manifest the
+   * value is the sum of the added, existing, and deleted file counts, with missing counts treated
+   * as zero. REPLACED is left out of the sum because it has no pre-v4 representation.
+   *
+   * @param manifest the manifest being referenced
+   * @return the record count for the reference
+   * @throws IllegalArgumentException if a v4+ manifest has a null record count
+   */
+  private static long resolveRecordCount(ManifestFile manifest) {
+    boolean isLegacy = manifest.formatVersion() < ManifestFile.V4_FORMAT_VERSION;
+    if (!isLegacy) {
+      Long storedCount = manifest.recordCount();
+      Preconditions.checkArgument(
+          storedCount != null,
+          "Invalid v4 manifest reference for %s: record_count must be set by the writer",
+          manifest.path());
+      return storedCount;
+    }
+
+    Integer added = manifest.addedFilesCount();
+    Integer existing = manifest.existingFilesCount();
+    Integer deleted = manifest.deletedFilesCount();
+
+    // Accumulate in a long so the sum of three int counts cannot overflow.
+    long sum = 0L;
+    sum += added == null ? 0 : added;
+    sum += existing == null ? 0 : existing;
+    sum += deleted == null ? 0 : deleted;
+    return sum;
+  }
+
+  /**
+   * Builds the manifest_info struct for a manifest reference, writing zero for any per-status
+   * file or row count the manifest does not track.
+   *
+   * @param manifest the manifest being referenced
+   * @return the populated manifest info
+   * @throws IllegalArgumentException if the manifest's min sequence number is unassigned
+   */
+  private static ManifestInfo manifestInfo(ManifestFile manifest) {
+    long minSequenceNumber = manifest.minSequenceNumber();
+    boolean assigned = minSequenceNumber != ManifestWriter.UNASSIGNED_SEQ;
+    Preconditions.checkArgument(
+        assigned,
+        "Cannot build manifest_info for %s: min_sequence_number is unassigned. "
+            + "Resolve via RootManifestWriter.assignSequenceNumber before writing.",
+        manifest.path());
+
+    int addedFiles = zeroIfNull(manifest.addedFilesCount());
+    int existingFiles = zeroIfNull(manifest.existingFilesCount());
+    int deletedFiles = zeroIfNull(manifest.deletedFilesCount());
+    int replacedFiles = zeroIfNull(manifest.replacedFilesCount());
+    long addedRows = zeroIfNull(manifest.addedRowsCount());
+    long existingRows = zeroIfNull(manifest.existingRowsCount());
+    long deletedRows = zeroIfNull(manifest.deletedRowsCount());
+    long replacedRows = zeroIfNull(manifest.replacedRowsCount());
+
+    return ManifestInfoStruct.builder()
+        .addedFilesCount(addedFiles)
+        .existingFilesCount(existingFiles)
+        .deletedFilesCount(deletedFiles)
+        .replacedFilesCount(replacedFiles)
+        .addedRowsCount(addedRows)
+        .existingRowsCount(existingRows)
+        .deletedRowsCount(deletedRows)
+        .replacedRowsCount(replacedRows)
+        .minSequenceNumber(minSequenceNumber)
+        .build();
+  }
+
+  /** Unboxes {@code value}, substituting 0 when it is null. */
+  private static int zeroIfNull(Integer value) {
+    if (value == null) {
+      return 0;
+    }
+
+    return value;
+  }
+
+  /** Unboxes {@code value}, substituting 0 when it is null. */
+  private static long zeroIfNull(Long value) {
+    if (value == null) {
+      return 0L;
+    }
+
+    return value;
+  }
+
+  /**
+   * Projects the file's per-spec partition tuple into the unified partition schema (union of all
+   * live specs) used by every v4+ content_entry row. Result fields are sourced by field ID via
+   * {@link StructProjection#createAllowMissing}; fields not present in the writer spec land as
+   * null.
+   *
+   * <p>A null partition, or one whose source type has no fields, yields a tuple with no values
+   * set.
+   *
+   * @param file the content file whose partition is projected
+   * @param unionPartitionType the unified partition type to project into
+   * @return a new partition tuple of type {@code unionPartitionType}
+   * @throws IllegalArgumentException if {@code unionPartitionType} is null, or if the partition is
+   *     non-empty but is not a {@link PartitionData} (so its field types are unknown)
+   */
+  private static PartitionData toPartitionData(
+      ContentFile<?> file, Types.StructType unionPartitionType) {
+    Preconditions.checkArgument(
+        unionPartitionType != null, "Invalid union partition type: null for %s", file.location());
+    PartitionData result = new PartitionData(unionPartitionType);
+    StructLike partition = file.partition();
+    if (partition == null) {
+      return result;
+    }
+
+    if (!(partition instanceof PartitionData)) {
+      // Without a backing PartitionData the partition's element types are unavailable, so an empty
+      // struct is the only safe materialization. Reject any non-empty case rather than silently
+      // dropping fields.
+      Preconditions.checkArgument(
+          partition.size() == 0,
+          "Cannot convert partition for %s: type information is unavailable for %s",
+          file.location(),
+          partition);
+      return result;
+    }
+
+    Types.StructType sourceType = ((PartitionData) partition).getPartitionType();
+    if (sourceType.fields().isEmpty()) {
+      return result;
+    }
+
+    // Build a per-call StructProjection (not cached: the writer spec varies across files in
+    // multi-spec carry-over flows). Field-ID lookup means partition values land in the correct
+    // union-schema position regardless of the writer spec's ordering.
+    StructProjection projection =
+        StructProjection.createAllowMissing(sourceType, unionPartitionType);
+    projection.wrap(partition);
+    for (int pos = 0; pos < unionPartitionType.fields().size(); pos += 1) {
+      result.set(pos, projection.get(pos, Object.class));
+    }
+
+    return result;
+  }
+
+  /**
+   * Repackages the file's column-level statistics as a {@link Metrics} instance.
+   *
+   * <p>The primitive-type map is attached only when the file has at least one lower or upper
+   * bound; otherwise the original types are passed as null.
+   *
+   * @param file the content file whose statistics are copied
+   * @param tableSchema the table schema used to resolve primitive field types
+   * @return the metrics for the file
+   */
+  private static Metrics toMetrics(ContentFile<?> file, Schema tableSchema) {
+    Map<Integer, ByteBuffer> lowerBounds = file.lowerBounds();
+    Map<Integer, ByteBuffer> upperBounds = file.upperBounds();
+    boolean hasBounds =
+        (lowerBounds != null && !lowerBounds.isEmpty())
+            || (upperBounds != null && !upperBounds.isEmpty());
+    Map<Integer, Type> originalTypes = hasBounds ? primitiveTypesFor(tableSchema) : null;
+
+    return new Metrics(
+        file.recordCount(),
+        file.columnSizes(),
+        file.valueCounts(),
+        file.nullValueCounts(),
+        file.nanValueCounts(),
+        lowerBounds,
+        upperBounds,
+        originalTypes);
+  }
+
+  /**
+   * Returns an unmodifiable map from field ID to type for every primitive field reachable from
+   * the schema's columns, or null when {@code schema} is null.
+   *
+   * <p>Results are cached per schema instance. The lookup and insert happen in a single
+   * {@code computeIfAbsent} call on the synchronized cache, so concurrent callers for the same
+   * schema share one map instead of each building their own.
+   *
+   * @param schema the table schema, possibly null
+   * @return the cached field-ID-to-type map, or null for a null schema
+   */
+  private static Map<Integer, Type> primitiveTypesFor(Schema schema) {
+    if (schema == null) {
+      return null;
+    }
+
+    return PRIMITIVE_TYPES_BY_SCHEMA.computeIfAbsent(
+        schema,
+        key -> {
+          Map<Integer, Type> types = Maps.newHashMap();
+          for (Types.NestedField field : key.columns()) {
+            collectPrimitiveTypes(field, types);
+          }
+
+          return Collections.unmodifiableMap(types);
+        });
+  }
+
+  /**
+   * Recursively records the type of {@code field}, and of every primitive field nested inside
+   * it, into {@code types} keyed by field ID.
+   *
+   * <p>Structs contribute all of their fields, lists their first (element) field, and maps all of
+   * their fields. Types that are none of primitive, struct, list, or map are ignored.
+   *
+   * @param field the field to inspect
+   * @param types the accumulator that receives field-ID-to-type entries
+   */
+  private static void collectPrimitiveTypes(Types.NestedField field, Map<Integer, Type> types) {
+    Type type = field.type();
+    if (type.isPrimitiveType()) {
+      types.put(field.fieldId(), type);
+      return;
+    }
+
+    if (type.isStructType()) {
+      for (Types.NestedField nested : type.asStructType().fields()) {
+        collectPrimitiveTypes(nested, types);
+      }
+      return;
+    }
+
+    if (type.isListType()) {
+      collectPrimitiveTypes(type.asListType().fields().get(0), types);
+      return;
+    }
+
+    if (type.isMapType()) {
+      for (Types.NestedField nested : type.asMapType().fields()) {
+        collectPrimitiveTypes(nested, types);
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
index 9624484ffe0c..d288835a9438 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
@@ -60,6 +60,16 @@ public class GenericManifestFile extends SupportsIndexProjection
private PartitionFieldSummary[] partitions = null;
private byte[] keyMetadata = null;
private Long firstRowId = null;
+ // v4+: total number of entries persisted at the root-manifest content_entry level. Null for
+ // pre-v4 manifest list entries where the count is not directly tracked at the manifest reference
+ // level. Populated via the v4+ constructor variant by V4Writer.toManifestFile and by
+ // RootManifestReader when projecting a content_entry row.
+ private Long recordCount = null;
+ // v4+: format version of the manifest file. LEGACY_FORMAT_VERSION (0) for pre-v4 manifests;
+ // V4_FORMAT_VERSION (4) for v4+ manifests. Populated via the v4+ constructor variant by v4+
+ // writers/readers; pre-v4 manifest list serialization does not carry this field, so it stays at
+ // the default for any manifest read from a v1-v3 manifest list.
+ private int formatVersion = LEGACY_FORMAT_VERSION;
/** Used by Avro reflection to instantiate this class when reading manifest files. */
public GenericManifestFile(Schema avroSchema) {
@@ -112,6 +122,51 @@ public GenericManifestFile(Schema avroSchema) {
Integer deletedFilesCount,
Long deletedRowsCount,
Long firstRowId) {
+ this(
+ path,
+ length,
+ specId,
+ content,
+ sequenceNumber,
+ minSequenceNumber,
+ snapshotId,
+ partitions,
+ keyMetadata,
+ addedFilesCount,
+ addedRowsCount,
+ existingFilesCount,
+ existingRowsCount,
+ deletedFilesCount,
+ deletedRowsCount,
+ firstRowId,
+ null /* recordCount */,
+ LEGACY_FORMAT_VERSION);
+ }
+
+ /**
+ * v4+ constructor variant that accepts the v4+ {@code recordCount} and {@code formatVersion}
+ * fields. Pre-v4 callers should use the constructor without these parameters (which defaults
+ * recordCount to null and formatVersion to {@link #LEGACY_FORMAT_VERSION}).
+ */
+ GenericManifestFile(
+ String path,
+ long length,
+ int specId,
+ ManifestContent content,
+ long sequenceNumber,
+ long minSequenceNumber,
+ Long snapshotId,
+ List partitions,
+ ByteBuffer keyMetadata,
+ Integer addedFilesCount,
+ Long addedRowsCount,
+ Integer existingFilesCount,
+ Long existingRowsCount,
+ Integer deletedFilesCount,
+ Long deletedRowsCount,
+ Long firstRowId,
+ Long recordCount,
+ int formatVersion) {
super(ManifestFile.schema().columns().size());
this.avroSchema = AVRO_SCHEMA;
this.manifestPath = path;
@@ -130,6 +185,8 @@ public GenericManifestFile(Schema avroSchema) {
this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]);
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.firstRowId = firstRowId;
+ this.recordCount = recordCount;
+ this.formatVersion = formatVersion;
}
/**
@@ -172,6 +229,8 @@ private GenericManifestFile(GenericManifestFile toCopy) {
? null
: Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
this.firstRowId = toCopy.firstRowId;
+ this.recordCount = toCopy.recordCount;
+ this.formatVersion = toCopy.formatVersion;
}
/** Constructor for Java serialization. */
@@ -272,6 +331,16 @@ public Long firstRowId() {
return firstRowId;
}
+  /** Returns the record count stored on this manifest file, which may be null. */
+  @Override
+  public Long recordCount() {
+    return this.recordCount;
+  }
+
+  /** Returns the format version stored on this manifest file. */
+  @Override
+  public int formatVersion() {
+    return this.formatVersion;
+  }
+
@Override
public int size() {
return ManifestFile.schema().columns().size();
diff --git a/core/src/main/java/org/apache/iceberg/TrackedFile.java b/core/src/main/java/org/apache/iceberg/TrackedFile.java
index 5b9cdd9ab46c..aaab0ef1081f 100644
--- a/core/src/main/java/org/apache/iceberg/TrackedFile.java
+++ b/core/src/main/java/org/apache/iceberg/TrackedFile.java
@@ -106,7 +106,7 @@ static Types.StructType schemaWithContentStats(
RECORD_COUNT,
FILE_SIZE_IN_BYTES,
SPEC_ID,
- Types.NestedField.required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
+ Types.NestedField.optional(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
Types.NestedField.optional(
CONTENT_STATS_ID, CONTENT_STATS_NAME, contentStatsType, CONTENT_STATS_DOC),
SORT_ORDER_ID,
diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java b/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java
index d7aa28c3290f..62f0f3ae2ecd 100644
--- a/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java
+++ b/core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java
@@ -23,8 +23,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
class TrackedFileBuilder {
- private final long snapshotId;
private final FileContent contentType;
+ private final TrackingBuilder trackingBuilder;
// Required fields
private Integer formatVersion = null;
@@ -44,8 +44,7 @@ class TrackedFileBuilder {
private List splitOffsets = null;
private List equalityIds = null;
- // tracking-related fields
- private Tracking sourceTracking = null;
+ // tracking-related deferred mutators applied at build() time
private boolean dvUpdated = false;
private ByteBuffer deletedPositions = null;
private ByteBuffer replacedPositions = null;
@@ -97,6 +96,18 @@ static TrackedFileBuilder from(TrackedFile source, long newSnapshotId) {
return new TrackedFileBuilder(source, newSnapshotId);
}
+ /**
+ * Creates a builder with an externally-constructed TrackingBuilder. Use for content_entry rows
+ * whose tracking values come from outside the writer (v4+ manifest references, projections from
+ * legacy ManifestEntry).
+ *
+ * @param contentType the content type of the tracked file; must not be null
+ * @param trackingBuilder the tracking builder the new builder will use; must not be null
+ * @return a new builder for the given content type and tracking
+ * @throws IllegalArgumentException if either argument is null
+ */
+ static TrackedFileBuilder explicitTracking(
+ FileContent contentType, TrackingBuilder trackingBuilder) {
+ Preconditions.checkArgument(contentType != null, "Invalid content type: null");
+ Preconditions.checkArgument(trackingBuilder != null, "Invalid tracking builder: null");
+ return new TrackedFileBuilder(contentType, trackingBuilder);
+ }
+
/**
* Returns a DELETED tracked file derived from {@code source}.
*
@@ -147,12 +158,11 @@ private static TrackedFile terminal(TrackedFile source, Tracking tracking) {
private TrackedFileBuilder(FileContent contentType, long snapshotId) {
this.contentType = contentType;
- this.snapshotId = snapshotId;
+ this.trackingBuilder = TrackingBuilder.added(snapshotId);
}
private TrackedFileBuilder(TrackedFile source, long snapshotId) {
this.contentType = source.contentType();
- this.snapshotId = snapshotId;
this.formatVersion = source.formatVersion();
this.location = source.location();
this.fileFormat = source.fileFormat();
@@ -167,7 +177,12 @@ private TrackedFileBuilder(TrackedFile source, long snapshotId) {
this.keyMetadata = source.keyMetadata();
this.splitOffsets = source.splitOffsets();
this.equalityIds = source.equalityIds();
- this.sourceTracking = source.tracking();
+ this.trackingBuilder = TrackingBuilder.from(source.tracking(), snapshotId);
+ }
+
+ private TrackedFileBuilder(FileContent contentType, TrackingBuilder trackingBuilder) {
+ this.contentType = contentType;
+ this.trackingBuilder = trackingBuilder;
}
TrackedFileBuilder formatVersion(int newFormatVersion) {
@@ -315,7 +330,10 @@ TrackedFile build() {
Preconditions.checkArgument(recordCount != null, "Missing required field: record count");
Preconditions.checkArgument(
fileSizeInBytes != null, "Missing required field: file size in bytes");
- Preconditions.checkArgument(partitionData != null, "Missing required field: partition data");
+ Preconditions.checkArgument(
+ partitionData != null || isLeafManifest(contentType),
+ "Partition is required for content type %s",
+ contentType);
Preconditions.checkArgument(
!isLeafManifest(contentType) || manifestInfo != null,
"Missing required field: manifest info");
@@ -323,11 +341,6 @@ TrackedFile build() {
contentType != FileContent.EQUALITY_DELETES || equalityIds != null,
"Missing required field: equality IDs");
- TrackingBuilder trackingBuilder =
- sourceTracking == null
- ? TrackingBuilder.added(snapshotId)
- : TrackingBuilder.from(sourceTracking, snapshotId);
-
if (dvUpdated) {
trackingBuilder.dvUpdated();
}
@@ -340,8 +353,10 @@ TrackedFile build() {
trackingBuilder.replacedPositions(replacedPositions);
}
+ Tracking trackingResult = trackingBuilder.build();
+
return new TrackedFileStruct(
- trackingBuilder.build(),
+ trackingResult,
contentType,
formatVersion,
location,
diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java
index 958ddfbbc4cf..ccb97c8e29c3 100644
--- a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java
+++ b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java
@@ -151,7 +151,7 @@ private TrackedFileStruct(TrackedFileStruct toCopy, boolean withStats, Set
+ ContentEntryAdapters.fromDataFile(
+ existingDataEntry(), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.REPLACED))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported status for content file entry: REPLACED");
+ }
+
+ @Test
+ void fromDataFileRejectsModified() {
+ // MODIFIED has no legacy ManifestEntry representation; the adapter only handles
+ // ADDED/EXISTING/DELETED.
+ assertThatThrownBy(
+ () ->
+ ContentEntryAdapters.fromDataFile(
+ existingDataEntry(), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.MODIFIED))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported status for content file entry: MODIFIED");
+ }
+
+ @Test
+ void fromDataFileWithPartitionSpec() {
+ // Multi-spec carry-over: the writer spec has one field but the union partition type carries an
+ // additional field from another live spec. The adapter must place the writer's value at the
+ // union-schema position keyed by field ID and leave the extra union field null.
+ PartitionSpec writerSpec = PartitionSpec.builderFor(SCHEMA).bucket("id", 16).build();
+ Types.NestedField writerField = writerSpec.partitionType().fields().get(0);
+ Types.NestedField extraField =
+ Types.NestedField.optional(2000, "extra", Types.IntegerType.get());
+ Types.StructType unionType = Types.StructType.of(writerField, extraField);
+
+ PartitionData partition = new PartitionData(writerSpec.partitionType());
+ partition.set(0, 7);
+ DataFile file =
+ new GenericDataFile(
+ writerSpec.specId(),
+ DATA_PATH,
+ FileFormat.PARQUET,
+ partition,
+ 1024L,
+ new Metrics(100L, null, null, null, null),
+ null,
+ ImmutableList.of(0L),
+ null,
+ null);
+
+ TrackedFile result =
+ ContentEntryAdapters.fromDataFile(wrapAdded(file), SCHEMA, unionType, EntryStatus.ADDED);
+
+ assertThat(result.partition()).isInstanceOf(PartitionData.class);
+ PartitionData projected = (PartitionData) result.partition();
+ assertThat(projected.getPartitionType()).isEqualTo(unionType);
+ assertThat(projected.size()).isEqualTo(2);
+
+ int writerPos = unionType.fields().indexOf(writerField);
+ int extraPos = unionType.fields().indexOf(extraField);
+ assertThat(projected.get(writerPos, Integer.class)).isEqualTo(7);
+ assertThat(projected.get(extraPos, Integer.class)).isNull();
+ }
+
+ @Test
+ void fromDeleteFilePopulatesEqualityIds() {
+ TrackedFile result =
+ ContentEntryAdapters.fromDeleteFile(
+ addedEqualityDeleteEntry(), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED);
+
+ assertThat(result.contentType()).isEqualTo(FileContent.EQUALITY_DELETES);
+ assertThat(result.location()).isEqualTo(DELETE_PATH);
+ assertThat(result.equalityIds()).containsExactly(1);
+ assertThat(result.tracking().status()).isEqualTo(EntryStatus.ADDED);
+ }
+
+ @Test
+ void fromDataFilePopulatesContentStatsBounds() {
+ // The adapted entry must expose per-field stats for field IDs 1 and 2; the value count and
+ // bounds of field 1 are checked against the metrics attached to the source data file.
+ DataFile file = dataFileWithMetrics();
+ TrackedFile result =
+ ContentEntryAdapters.fromDataFile(
+ wrapAdded(file), SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED);
+
+ ContentStats stats = result.contentStats();
+ assertThat(stats).isNotNull();
+ assertThat(stats.fieldStats()).extracting(FieldStats::fieldId).containsExactlyInAnyOrder(1, 2);
+
+ FieldStats<?> idStats =
+ stats.fieldStats().stream().filter(s -> s.fieldId() == 1).findFirst().orElseThrow();
+ assertThat(idStats.valueCount()).isEqualTo(100L);
+ assertThat(idStats.lowerBound()).isEqualTo(1);
+ assertThat(idStats.upperBound()).isEqualTo(1000);
+ }
+
+ @Test
+ void fromDataFilePopulatesContentStatsForListAndMapElements() {
+ // primitiveTypesFor must recurse into list element and map key/value types so that bounds
+ // keyed by the inner field IDs are preserved. Without the list/map branches in
+ // collectPrimitiveTypes the inner types would be absent from originalTypes and
+ // MetricsUtil.fromMetrics would silently drop these field stats.
+ Schema schema =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(
+ 2, "tags", Types.ListType.ofRequired(10 /* elementId */, Types.IntegerType.get())),
+ required(
+ 3,
+ "props",
+ Types.MapType.ofRequired(
+ 11 /* keyId */,
+ 12 /* valueId */,
+ Types.StringType.get(),
+ Types.IntegerType.get())));
+
+ Metrics metrics =
+ new Metrics(
+ 100L,
+ null /* columnSizes */,
+ ImmutableMap.of(1, 100L, 10, 100L, 11, 100L, 12, 100L),
+ ImmutableMap.of(1, 0L, 10, 0L, 11, 0L, 12, 0L),
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ 10, Conversions.toByteBuffer(Types.IntegerType.get(), 1),
+ 11, Conversions.toByteBuffer(Types.StringType.get(), "a"),
+ 12, Conversions.toByteBuffer(Types.IntegerType.get(), 5)),
+ ImmutableMap.of(
+ 10, Conversions.toByteBuffer(Types.IntegerType.get(), 999),
+ 11, Conversions.toByteBuffer(Types.StringType.get(), "z"),
+ 12, Conversions.toByteBuffer(Types.IntegerType.get(), 50)));
+
+ DataFile file =
+ new GenericDataFile(
+ UNPARTITIONED.specId(),
+ DATA_PATH,
+ FileFormat.PARQUET,
+ EMPTY_PARTITION,
+ 1024L,
+ metrics,
+ null,
+ ImmutableList.of(0L),
+ null,
+ null);
+
+ TrackedFile result =
+ ContentEntryAdapters.fromDataFile(
+ wrapAdded(file), schema, UNION_PARTITION_TYPE, EntryStatus.ADDED);
+
+ ContentStats stats = result.contentStats();
+ assertThat(stats).isNotNull();
+ assertThat(stats.fieldStats()).extracting(FieldStats::fieldId).contains(10, 11, 12);
+
+ FieldStats<?> elementStats =
+ stats.fieldStats().stream().filter(s -> s.fieldId() == 10).findFirst().orElseThrow();
+ assertThat(elementStats.lowerBound()).isEqualTo(1);
+ assertThat(elementStats.upperBound()).isEqualTo(999);
+
+ FieldStats<?> keyStats =
+ stats.fieldStats().stream().filter(s -> s.fieldId() == 11).findFirst().orElseThrow();
+ assertThat(keyStats.lowerBound()).isEqualTo("a");
+ assertThat(keyStats.upperBound()).isEqualTo("z");
+
+ FieldStats<?> valueStats =
+ stats.fieldStats().stream().filter(s -> s.fieldId() == 12).findFirst().orElseThrow();
+ assertThat(valueStats.lowerBound()).isEqualTo(5);
+ assertThat(valueStats.upperBound()).isEqualTo(50);
+ }
+
+ @Test
+ void fromManifestFileForDataManifest() {
+ GenericManifestFile manifest = v4ManifestFile(ManifestContent.DATA, 4, 6L);
+ TrackedFile result = ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, 1000L);
+
+ assertThat(result.contentType()).isEqualTo(FileContent.DATA_MANIFEST);
+ assertThat(result.formatVersion()).isEqualTo(4);
+ assertThat(result.location()).isEqualTo(MANIFEST_PATH);
+ assertThat(result.tracking().firstRowId()).isEqualTo(1000L);
+ // record_count for a manifest-reference content_entry is sourced from the manifest's
+ // persisted record_count (v4) or summed from per-status file counts (pre-v4). Data-row totals
+ // live in manifest_info, not in this field.
+ assertThat(result.recordCount()).isEqualTo(6L);
+ assertThat(result.manifestInfo()).isNotNull();
+ assertThat(result.manifestInfo().addedFilesCount()).isEqualTo(2);
+ assertThat(result.manifestInfo().existingFilesCount()).isEqualTo(3);
+ assertThat(result.manifestInfo().deletedFilesCount()).isEqualTo(1);
+ assertThat(result.manifestInfo().addedRowsCount()).isEqualTo(200L);
+ assertThat(result.manifestInfo().existingRowsCount()).isEqualTo(300L);
+ assertThat(result.manifestInfo().deletedRowsCount()).isEqualTo(100L);
+ // replaced/modified counts default to 0 when the source manifest does not track them
+ assertThat(result.manifestInfo().replacedFilesCount()).isEqualTo(0);
+ assertThat(result.manifestInfo().replacedRowsCount()).isEqualTo(0L);
+ }
+
+ @Test
+ void fromManifestFileUsesPersistedRecordCount() {
+ // When the source manifest carries a persisted record_count (e.g., a v4+ leaf writer that
+ // tracked MODIFIED entries, or a row projected from a content_entry on read), the adapter
+ // must propagate it instead of recomputing from per-status file counts. The persisted value
+ // here (99) deliberately differs from the per-status sum (2 + 3 + 1 = 6) to prove which
+ // path is taken.
+ GenericManifestFile manifest = v4ManifestFile(ManifestContent.DATA, 4, 99L);
+
+ TrackedFile result = ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null);
+
+ assertThat(result.recordCount()).isEqualTo(99L);
+ }
+
+ @Test
+ void fromManifestFileRejectsV4ManifestWithoutRecordCount() {
+ // v4+ manifests must carry a persisted record_count: the per-status sum cannot reflect
+ // MODIFIED entries, so v4+ writers and root-manifest readers populate it directly.
+ GenericManifestFile manifest =
+ v4ManifestFile(ManifestContent.DATA, 4, null /* recordCount intentionally null */);
+ assertThatThrownBy(
+ () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("record_count must be set");
+ }
+
+ @Test
+ void fromManifestFileForDeleteManifestPreV4() {
+ // Pre-v4 manifests have formatVersion=0 by default (not tracked in the v1-v3 manifest list
+ // schema, so reads land on the interface default). The adapter dispatches to the legacy sum.
+ ManifestFile manifest = manifestFile(ManifestContent.DELETES);
+ TrackedFile result =
+ ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.EXISTING, null);
+
+ assertThat(result.contentType()).isEqualTo(FileContent.DELETE_MANIFEST);
+ assertThat(result.formatVersion()).isEqualTo(0);
+ assertThat(result.recordCount()).isEqualTo(6L);
+ assertThat(result.tracking().status()).isEqualTo(EntryStatus.EXISTING);
+ assertThat(result.tracking().firstRowId()).isNull();
+ }
+
+ @Test
+ void fromManifestFileRejectsFirstRowIdOnDeleteManifest() {
+ ManifestFile manifest = manifestFile(ManifestContent.DELETES);
+
+ assertThatThrownBy(
+ () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, 100L))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("firstRowId is only valid for DATA manifests");
+ }
+
+ @Test
+ void fromManifestFileRejectsInvalidFormatVersion() {
+ // The manifest's intrinsic format_version is the source of truth. Values strictly between 0
+ // and 4 (1, 2, 3) and negative values have no defined meaning at the v4+ content_entry layer
+ // and are rejected.
+ GenericManifestFile manifest = v4ManifestFile(ManifestContent.DATA, 2, null);
+ assertThatThrownBy(
+ () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid manifest format_version: 2");
+ }
+
+ @Test
+ void fromManifestFileRejectsUnassignedMinSequenceNumber() {
+ ManifestFile manifest = manifestFile(ManifestContent.DATA, ManifestWriter.UNASSIGNED_SEQ);
+ assertThatThrownBy(
+ () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("min_sequence_number is unassigned");
+ }
+
+ @Test
+ void fromManifestFileRejectsUnassignedSequenceNumber() {
+ ManifestFile manifest =
+ manifestFileWithSequenceNumbers(
+ ManifestContent.DATA, ManifestWriter.UNASSIGNED_SEQ, 4L /* minSeq, valid */);
+ assertThatThrownBy(
+ () -> ContentEntryAdapters.fromManifestFile(manifest, EntryStatus.ADDED, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("sequence_number is unassigned");
+ }
+
+ @Test
+ void fromManifestFileRejectsNullManifest() {
+ assertThatThrownBy(() -> ContentEntryAdapters.fromManifestFile(null, EntryStatus.ADDED, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid manifest file: null");
+ }
+
+ @Test
+ void fromDataFileRejectsNullEntry() {
+ assertThatThrownBy(
+ () ->
+ ContentEntryAdapters.fromDataFile(
+ null, SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid manifest entry: null");
+ }
+
+ @Test
+ void fromDataFileRejectsNullStatus() {
+ assertThatThrownBy(
+ () ->
+ ContentEntryAdapters.fromDataFile(
+ addedDataEntry(), SCHEMA, UNION_PARTITION_TYPE, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid status: null");
+ }
+
+ @Test
+ void fromDeleteFileRejectsV3DeleteVector() {
+ // A v3 delete vector is shaped as POSITION_DELETES stored in a Puffin file. v4+ colocates DVs
+ // on the data file's content_entry, so this should be rejected at the delete-manifest writer
+ // boundary.
+ DeleteFile dv =
+ new GenericDeleteFile(
+ UNPARTITIONED.specId(),
+ FileContent.POSITION_DELETES,
+ DELETE_PATH,
+ FileFormat.PUFFIN,
+ EMPTY_PARTITION,
+ 512L,
+ new Metrics(10L, null, null, null, null),
+ null,
+ null,
+ null,
+ null,
+ DATA_PATH,
+ 0L,
+ 512L);
+ GenericManifestEntry<DeleteFile> entry =
+ new GenericManifestEntry<>(
+ ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct());
+ entry.wrapAppend(SNAPSHOT_ID, dv);
+
+ assertThatThrownBy(
+ () ->
+ ContentEntryAdapters.fromDeleteFile(
+ entry, SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid content for delete file: POSITION_DELETES");
+ }
+
+ @Test
+ void fromDeleteFileRejectsV2PositionDeleteFile() {
+ // A v2 standalone position delete file is shaped as POSITION_DELETES stored in Parquet/Avro/ORC
+ // (anything other than Puffin). It has no v4+ representation; carry it over only via a legacy
+ // v3 manifest with format_version=0.
+ DeleteFile positionDelete =
+ new GenericDeleteFile(
+ UNPARTITIONED.specId(),
+ FileContent.POSITION_DELETES,
+ DELETE_PATH,
+ FileFormat.PARQUET,
+ EMPTY_PARTITION,
+ 512L,
+ new Metrics(10L, null, null, null, null),
+ null,
+ null,
+ null,
+ null,
+ DATA_PATH,
+ null /* contentOffset */,
+ null /* contentSizeInBytes */);
+ GenericManifestEntry<DeleteFile> entry =
+ new GenericManifestEntry<>(
+ ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct());
+ entry.wrapAppend(SNAPSHOT_ID, positionDelete);
+
+ assertThatThrownBy(
+ () ->
+ ContentEntryAdapters.fromDeleteFile(
+ entry, SCHEMA, UNION_PARTITION_TYPE, EntryStatus.ADDED))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid content for delete file: POSITION_DELETES");
+ }
+
+ private static void assertAddedTracking(TrackedFile result, long snapshotId) {
+ assertThat(result.tracking().status()).isEqualTo(EntryStatus.ADDED);
+ assertThat(result.tracking().snapshotId()).isEqualTo(snapshotId);
+ assertThat(result.tracking().dataSequenceNumber()).isNull();
+ assertThat(result.tracking().fileSequenceNumber()).isNull();
+ }
+
+ private static void assertExistingTracking(
+ TrackedFile result, long snapshotId, long dataSeq, long fileSeq) {
+ assertThat(result.tracking().status()).isEqualTo(EntryStatus.EXISTING);
+ assertThat(result.tracking().snapshotId()).isEqualTo(snapshotId);
+ assertThat(result.tracking().dataSequenceNumber()).isEqualTo(dataSeq);
+ assertThat(result.tracking().fileSequenceNumber()).isEqualTo(fileSeq);
+ }
+
+ private static void assertDeletedTracking(
+ TrackedFile result, long snapshotId, long dataSeq, long fileSeq) {
+ assertThat(result.tracking().status()).isEqualTo(EntryStatus.DELETED);
+ assertThat(result.tracking().snapshotId()).isEqualTo(snapshotId);
+ assertThat(result.tracking().dataSequenceNumber()).isEqualTo(dataSeq);
+ assertThat(result.tracking().fileSequenceNumber()).isEqualTo(fileSeq);
+ }
+
+ private static void assertCommonDataFields(TrackedFile result, DataFile file) {
+ assertThat(result.contentType()).isEqualTo(FileContent.DATA);
+ assertThat(result.formatVersion()).isEqualTo(4);
+ assertThat(result.location()).isEqualTo(file.location());
+ assertThat(result.fileFormat()).isEqualTo(file.format());
+ assertThat(result.recordCount()).isEqualTo(file.recordCount());
+ assertThat(result.fileSizeInBytes()).isEqualTo(file.fileSizeInBytes());
+ assertThat(result.specId()).isEqualTo(file.specId());
+ assertThat(result.partition().size()).isEqualTo(0);
+ assertThat(result.splitOffsets()).isEqualTo(file.splitOffsets());
+ }
+
+ private static void assertCommonDeleteFields(TrackedFile result, DeleteFile file) {
+ assertThat(result.contentType()).isEqualTo(FileContent.EQUALITY_DELETES);
+ assertThat(result.formatVersion()).isEqualTo(4);
+ assertThat(result.location()).isEqualTo(file.location());
+ assertThat(result.fileFormat()).isEqualTo(file.format());
+ assertThat(result.recordCount()).isEqualTo(file.recordCount());
+ assertThat(result.fileSizeInBytes()).isEqualTo(file.fileSizeInBytes());
+ assertThat(result.specId()).isEqualTo(file.specId());
+ assertThat(result.partition().size()).isEqualTo(0);
+ assertThat(result.equalityIds())
+ .containsExactlyElementsOf(ImmutableList.copyOf(file.equalityFieldIds()));
+ }
+
+ private static DataFile dataFile() {
+ return new GenericDataFile(
+ UNPARTITIONED.specId(),
+ DATA_PATH,
+ FileFormat.PARQUET,
+ EMPTY_PARTITION,
+ 1024L,
+ new Metrics(100L, null, null, null, null),
+ null,
+ ImmutableList.of(0L),
+ null,
+ null);
+ }
+
+ private static DataFile dataFileWithMetrics() {
+ return new GenericDataFile(
+ UNPARTITIONED.specId(),
+ DATA_PATH,
+ FileFormat.PARQUET,
+ EMPTY_PARTITION,
+ 1024L,
+ METRICS_WITH_BOUNDS,
+ null,
+ ImmutableList.of(0L),
+ null,
+ null);
+ }
+
+ private static DeleteFile equalityDeleteFile() {
+ return new GenericDeleteFile(
+ UNPARTITIONED.specId(),
+ FileContent.EQUALITY_DELETES,
+ DELETE_PATH,
+ FileFormat.PARQUET,
+ EMPTY_PARTITION,
+ 512L,
+ new Metrics(50L, null, null, null, null),
+ new int[] {1},
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
+
+ // Returns the default data file wrapped by wrapAdded.
+ private static ManifestEntry<DataFile> addedDataEntry() {
+ return wrapAdded(dataFile());
+ }
+
+ // Wraps the default data file via wrapExisting with SNAPSHOT_ID, DATA_SEQ and FILE_SEQ.
+ private static ManifestEntry<DataFile> existingDataEntry() {
+ GenericManifestEntry<DataFile> entry =
+ new GenericManifestEntry<>(
+ ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct());
+ entry.wrapExisting(SNAPSHOT_ID, DATA_SEQ, FILE_SEQ, dataFile());
+ return entry;
+ }
+
+ // Wraps the default equality delete file via wrapAppend with SNAPSHOT_ID.
+ private static ManifestEntry<DeleteFile> addedEqualityDeleteEntry() {
+ GenericManifestEntry<DeleteFile> entry =
+ new GenericManifestEntry<>(
+ ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct());
+ entry.wrapAppend(SNAPSHOT_ID, equalityDeleteFile());
+ return entry;
+ }
+
+ // Wraps the default equality delete file via wrapExisting with SNAPSHOT_ID, DATA_SEQ, FILE_SEQ.
+ private static ManifestEntry<DeleteFile> existingEqualityDeleteEntry() {
+ GenericManifestEntry<DeleteFile> entry =
+ new GenericManifestEntry<>(
+ ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct());
+ entry.wrapExisting(SNAPSHOT_ID, DATA_SEQ, FILE_SEQ, equalityDeleteFile());
+ return entry;
+ }
+
+ // Wraps the given data file via wrapAppend with SNAPSHOT_ID, using the unpartitioned schema.
+ private static ManifestEntry<DataFile> wrapAdded(DataFile file) {
+ GenericManifestEntry<DataFile> entry =
+ new GenericManifestEntry<>(
+ ManifestEntry.getSchema(UNPARTITIONED.partitionType()).asStruct());
+ entry.wrapAppend(SNAPSHOT_ID, file);
+ return entry;
+ }
+
+ private static ManifestFile manifestFile(ManifestContent content) {
+ return manifestFile(content, 5L /* sequenceNumber */, 4L /* minSequenceNumber */);
+ }
+
+ private static ManifestFile manifestFile(ManifestContent content, long minSequenceNumber) {
+ return manifestFile(content, 5L /* sequenceNumber */, minSequenceNumber);
+ }
+
+ private static ManifestFile manifestFileWithSequenceNumbers(
+ ManifestContent content, long sequenceNumber, long minSequenceNumber) {
+ return manifestFile(content, sequenceNumber, minSequenceNumber);
+ }
+
+ // Builds a manifest with fixed per-status counts and the given format version / record count.
+ private static GenericManifestFile v4ManifestFile(
+ ManifestContent content, int formatVersion, Long recordCount) {
+ List<ManifestFile.PartitionFieldSummary> partitions = ImmutableList.of();
+ return new GenericManifestFile(
+ MANIFEST_PATH,
+ 2048L,
+ UNPARTITIONED.specId(),
+ content,
+ 5L /* sequenceNumber */,
+ 4L /* minSequenceNumber */,
+ SNAPSHOT_ID,
+ partitions,
+ null,
+ 2 /* addedFilesCount */,
+ 200L /* addedRowsCount */,
+ 3 /* existingFilesCount */,
+ 300L /* existingRowsCount */,
+ 1 /* deletedFilesCount */,
+ 100L /* deletedRowsCount */,
+ null /* firstRowId */,
+ recordCount,
+ formatVersion);
+ }
+
+ // Builds a manifest with fixed per-status counts and the given sequence numbers; no record
+ // count or format version is passed to the constructor.
+ private static ManifestFile manifestFile(
+ ManifestContent content, long sequenceNumber, long minSequenceNumber) {
+ List<ManifestFile.PartitionFieldSummary> partitions = ImmutableList.of();
+ return new GenericManifestFile(
+ MANIFEST_PATH,
+ 2048L,
+ UNPARTITIONED.specId(),
+ content,
+ sequenceNumber,
+ minSequenceNumber,
+ SNAPSHOT_ID,
+ partitions,
+ null,
+ 2 /* addedFilesCount */,
+ 200L /* addedRowsCount */,
+ 3 /* existingFilesCount */,
+ 300L /* existingRowsCount */,
+ 1 /* deletedFilesCount */,
+ 100L /* deletedRowsCount */,
+ null);
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFile.java b/core/src/test/java/org/apache/iceberg/TestTrackedFile.java
index 170c01ef7dc4..e5c3ba8a247c 100644
--- a/core/src/test/java/org/apache/iceberg/TestTrackedFile.java
+++ b/core/src/test/java/org/apache/iceberg/TestTrackedFile.java
@@ -109,11 +109,11 @@ public void schemaWithContentStatsReflectsInput() {
}
@Test
- public void schemaWithContentStatsPartitionIsRequired() {
+ public void schemaWithContentStatsPartitionIsOptional() {
Types.StructType type = TrackedFile.schemaWithContentStats(PARTITION_TYPE, CONTENT_STATS_TYPE);
Types.NestedField partitionField = type.field(TrackedFile.PARTITION_ID);
- assertThat(partitionField.isRequired()).isTrue();
+ assertThat(partitionField.isOptional()).isTrue();
assertThat(partitionField.name()).isEqualTo(TrackedFile.PARTITION_NAME);
assertThat(partitionField.doc()).isEqualTo(TrackedFile.PARTITION_DOC);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java b/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java
index d6b0701fdfd7..91ac83ca0dc7 100644
--- a/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java
+++ b/core/src/test/java/org/apache/iceberg/TestTrackedFileBuilder.java
@@ -91,8 +91,7 @@ private static Stream missingRequiredFieldCases() {
Arguments.of("location", "Missing required field: location"),
Arguments.of("fileFormat", "Missing required field: file format"),
Arguments.of("recordCount", "Missing required field: record count"),
- Arguments.of("fileSizeInBytes", "Missing required field: file size in bytes"),
- Arguments.of("partition", "Missing required field: partition data"));
+ Arguments.of("fileSizeInBytes", "Missing required field: file size in bytes"));
}
@ParameterizedTest
@@ -121,6 +120,37 @@ public void missingRequiredFields(String missingField, String expectedMessage) {
.hasMessage(expectedMessage);
}
+ @Test
+ public void missingPartitionFailsOnlyForDataAndEqualityDelete() {
+ // Partition is optional on content_entry rows; the builder rejects a missing partition only
+ // when the content type is DATA or EQUALITY_DELETES. Manifest references (DATA_MANIFEST /
+ // DELETE_MANIFEST) may be built without one.
+ TrackedFileBuilder dataBuilder =
+ builderWithMissingRequiredField(TrackedFileBuilder.data(50L), "partition");
+ TrackedFileBuilder equalityDeleteBuilder =
+ builderWithMissingRequiredField(TrackedFileBuilder.equalityDelete(50L), "partition");
+ assertThatThrownBy(dataBuilder::build)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Partition is required for content type DATA");
+ assertThatThrownBy(equalityDeleteBuilder::build)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Partition is required for content type EQUALITY_DELETES");
+
+ // Manifest builders must not complain about a missing partition: once manifest info is
+ // supplied, build() succeeds. The built manifest reference rows expose a partition struct
+ // with zero fields, which is what the size assertions below verify.
+ TrackedFile dataManifest =
+ builderWithMissingRequiredField(TrackedFileBuilder.dataManifest(50L), "partition")
+ .manifestInfo(MANIFEST_INFO)
+ .build();
+ TrackedFile deleteManifest =
+ builderWithMissingRequiredField(TrackedFileBuilder.deleteManifest(50L), "partition")
+ .manifestInfo(MANIFEST_INFO)
+ .build();
+ assertThat(dataManifest.partition().size()).isEqualTo(0);
+ assertThat(deleteManifest.partition().size()).isEqualTo(0);
+ }
+
private TrackedFileBuilder builderWithMissingRequiredField(
TrackedFileBuilder builder, String missingField) {
if (!"formatVersion".equals(missingField)) {
@@ -182,11 +212,13 @@ private static Stream nonEqualityDeleteBuilders() {
Arguments.of(TrackedFileBuilder.data(10L), FileContent.DATA),
Arguments.of(TrackedFileBuilder.dataManifest(10L), FileContent.DATA_MANIFEST),
Arguments.of(TrackedFileBuilder.deleteManifest(10L), FileContent.DELETE_MANIFEST),
- Arguments.of(TrackedFileBuilder.from(sourceData(12L), 20L), FileContent.DATA),
+ Arguments.of(TrackedFileBuilder.from(inheritedSourceData(12L, 5L), 20L), FileContent.DATA),
Arguments.of(
- TrackedFileBuilder.from(sourceDataManifest(21L), 25L), FileContent.DATA_MANIFEST),
+ TrackedFileBuilder.from(inheritedSourceDataManifest(21L, 5L), 25L),
+ FileContent.DATA_MANIFEST),
Arguments.of(
- TrackedFileBuilder.from(sourceDeleteManifest(12L), 20L), FileContent.DELETE_MANIFEST));
+ TrackedFileBuilder.from(inheritedSourceDeleteManifest(12L, 5L), 20L),
+ FileContent.DELETE_MANIFEST));
}
@ParameterizedTest
@@ -206,11 +238,14 @@ private static Stream nonDataBuilders() {
Arguments.of(TrackedFileBuilder.dataManifest(10L), FileContent.DATA_MANIFEST),
Arguments.of(TrackedFileBuilder.deleteManifest(10L), FileContent.DELETE_MANIFEST),
Arguments.of(
- TrackedFileBuilder.from(sourceEqualityDelete(12L), 20L), FileContent.EQUALITY_DELETES),
+ TrackedFileBuilder.from(inheritedSourceEqualityDelete(12L, 5L), 20L),
+ FileContent.EQUALITY_DELETES),
Arguments.of(
- TrackedFileBuilder.from(sourceDataManifest(21L), 25L), FileContent.DATA_MANIFEST),
+ TrackedFileBuilder.from(inheritedSourceDataManifest(21L, 5L), 25L),
+ FileContent.DATA_MANIFEST),
Arguments.of(
- TrackedFileBuilder.from(sourceDeleteManifest(12L), 20L), FileContent.DELETE_MANIFEST));
+ TrackedFileBuilder.from(inheritedSourceDeleteManifest(12L, 5L), 20L),
+ FileContent.DELETE_MANIFEST));
}
@ParameterizedTest
@@ -247,9 +282,10 @@ private static Stream nonManifestBuilders() {
return Stream.of(
Arguments.of(TrackedFileBuilder.data(10L), FileContent.DATA),
Arguments.of(TrackedFileBuilder.equalityDelete(10L), FileContent.EQUALITY_DELETES),
- Arguments.of(TrackedFileBuilder.from(sourceData(12L), 20L), FileContent.DATA),
+ Arguments.of(TrackedFileBuilder.from(inheritedSourceData(12L, 5L), 20L), FileContent.DATA),
Arguments.of(
- TrackedFileBuilder.from(sourceEqualityDelete(12L), 20L), FileContent.EQUALITY_DELETES));
+ TrackedFileBuilder.from(inheritedSourceEqualityDelete(12L, 5L), 20L),
+ FileContent.EQUALITY_DELETES));
}
@ParameterizedTest
@@ -721,6 +757,44 @@ public void addingSameDeletionVectorFails() {
.hasMessage("The same deletion vector already added");
}
+ @Test
+ public void buildWithExplicitTrackingFields() {
+ // Build a tracking row via the explicitTracking factory (e.g., for a manifest reference with
+ // explicit data/file seq numbers), bypassing the added()/from(source) tracking-derivation
+ // path. build() resolves the supplied TrackingBuilder.
+ TrackedFile result =
+ TrackedFileBuilder.explicitTracking(
+ FileContent.DATA_MANIFEST,
+ TrackingBuilder.forContentEntry(EntryStatus.EXISTING, 100L, 7L, 9L, 1000L))
+ .formatVersion(FORMAT_VERSION_V4)
+ .location("s3://bucket/data/manifest.avro")
+ .fileFormat(FileFormat.AVRO)
+ .recordCount(420L)
+ .fileSizeInBytes(556L)
+ .partition(PARTITION_DATA)
+ .manifestInfo(MANIFEST_INFO)
+ .build();
+
+ assertThat(result.tracking().status()).isEqualTo(EntryStatus.EXISTING);
+ assertThat(result.tracking().snapshotId()).isEqualTo(100L);
+ assertThat(result.tracking().dataSequenceNumber()).isEqualTo(7L);
+ assertThat(result.tracking().fileSequenceNumber()).isEqualTo(9L);
+ assertThat(result.tracking().firstRowId()).isEqualTo(1000L);
+ assertThat(result.tracking().dvSnapshotId()).isNull();
+ assertThat(result.tracking().deletedPositions()).isNull();
+ assertThat(result.tracking().replacedPositions()).isNull();
+ }
+
+ @Test
+ public void explicitTrackingRejectsNull() {
+ assertThatThrownBy(() -> TrackedFileBuilder.explicitTracking(FileContent.DATA, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid tracking builder: null");
+ assertThatThrownBy(() -> TrackedFileBuilder.explicitTracking(null, TrackingBuilder.added(100L)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid content type: null");
+ }
+
private static Stream nonManifestSources() {
return Stream.of(
Arguments.of(sourceData(10L), FileContent.DATA),
@@ -830,6 +904,22 @@ private static TrackedFile entryWithInheritedSeqNums(TrackedFile entry, long seq
return entry;
}
+ private static TrackedFile inheritedSourceData(long snapshotId, long sequenceNumber) {
+ return entryWithInheritedSeqNums(sourceData(snapshotId), sequenceNumber);
+ }
+
+ private static TrackedFile inheritedSourceEqualityDelete(long snapshotId, long sequenceNumber) {
+ return entryWithInheritedSeqNums(sourceEqualityDelete(snapshotId), sequenceNumber);
+ }
+
+ private static TrackedFile inheritedSourceDataManifest(long snapshotId, long sequenceNumber) {
+ return entryWithInheritedSeqNums(sourceDataManifest(snapshotId), sequenceNumber);
+ }
+
+ private static TrackedFile inheritedSourceDeleteManifest(long snapshotId, long sequenceNumber) {
+ return entryWithInheritedSeqNums(sourceDeleteManifest(snapshotId), sequenceNumber);
+ }
+
/**
* Verifies that fields in entry are the same as in source. Note, snapshot ID can't be verified
* here, because based on the entry's status it is either carried over or not.
diff --git a/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java b/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java
index 0d1803022a23..32995b3dc61c 100644
--- a/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java
+++ b/core/src/test/java/org/apache/iceberg/TestTrackingStruct.java
@@ -296,6 +296,66 @@ void testSourceDvPositionsAreNotCarriedForward() {
assertThat(replaced.replacedPositions()).isNull();
}
+ @Test
+ void testForContentEntryManifestReference() {
+ // Manifest references carry explicit (not inherited) data/file sequence numbers and an
+ // optional first-row-id. DV and position-bitmap fields are always null for a manifest
+ // reference.
+ Tracking tracking =
+ TrackingBuilder.forContentEntry(EntryStatus.ADDED, 42L, 7L, 9L, 1000L).build();
+
+ assertThat(tracking.status()).isEqualTo(EntryStatus.ADDED);
+ assertThat(tracking.snapshotId()).isEqualTo(42L);
+ assertThat(tracking.dataSequenceNumber()).isEqualTo(7L);
+ assertThat(tracking.fileSequenceNumber()).isEqualTo(9L);
+ assertThat(tracking.firstRowId()).isEqualTo(1000L);
+ assertThat(tracking.dvSnapshotId()).isNull();
+ assertThat(tracking.deletedPositions()).isNull();
+ assertThat(tracking.replacedPositions()).isNull();
+ }
+
+ @Test
+ void testForContentEntryManifestReferenceNullFirstRowId() {
+ // Delete manifests carry null first-row-id (row lineage is data-only).
+ Tracking tracking =
+ TrackingBuilder.forContentEntry(EntryStatus.ADDED, 42L, 7L, 9L, null).build();
+
+ assertThat(tracking.firstRowId()).isNull();
+ }
+
+ @Test
+ void testForContentEntryLegacyProjection() {
+ // Content-file rows projected from a legacy ManifestEntry carry sequence numbers from the
+ // legacy entry. DV and position-bitmap fields are always null for the projected row.
+ Tracking tracking =
+ TrackingBuilder.forContentEntry(EntryStatus.EXISTING, 42L, 7L, 9L, 1000L).build();
+
+ assertThat(tracking.status()).isEqualTo(EntryStatus.EXISTING);
+ assertThat(tracking.snapshotId()).isEqualTo(42L);
+ assertThat(tracking.dataSequenceNumber()).isEqualTo(7L);
+ assertThat(tracking.fileSequenceNumber()).isEqualTo(9L);
+ assertThat(tracking.firstRowId()).isEqualTo(1000L);
+ assertThat(tracking.dvSnapshotId()).isNull();
+ assertThat(tracking.deletedPositions()).isNull();
+ assertThat(tracking.replacedPositions()).isNull();
+ }
+
+ @Test
+ void testForContentEntryLegacyProjectionNullFirstRowId() {
+ // Delete files and entries that inherit from the manifest counter carry null first-row-id.
+ Tracking tracking =
+ TrackingBuilder.forContentEntry(EntryStatus.EXISTING, 42L, 7L, 9L, null).build();
+
+ assertThat(tracking.firstRowId()).isNull();
+ }
+
+ @Test
+ void testForContentEntryRejectsNullStatus() {
+ assertThatThrownBy(() -> TrackingBuilder.forContentEntry(null, 42L, 7L, 9L, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid status: null");
+ }
+
@Test
void testDvUpdatedProducesModifiedAndAdvancesDvSnapshotId() {
Tracking source = sourceTracking();