diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java b/core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java index e0feafeda246..24b3c013dc40 100644 --- a/core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java +++ b/core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java @@ -24,7 +24,10 @@ import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -/** Adapts {@link TrackedFile} entries to the {@link DataFile} and {@link DeleteFile} APIs. */ +/** + * Adapts {@link TrackedFile} entries to the {@link DataFile}, {@link DeleteFile}, and {@link + * ManifestFile} APIs. + */ class TrackedFileAdapters { private TrackedFileAdapters() {} @@ -53,6 +56,15 @@ static DeleteFile asEqualityDeleteFile(TrackedFile file, Map> implements ContentFile { @@ -405,6 +417,120 @@ public DeleteFile copyWithStats(Set requestedColumnIds) { } } + /** + * Adapts a TrackedFile to {@link ManifestFile} + */ + private static class TrackedManifestFile implements ManifestFile { + private final TrackedFile file; + + private TrackedManifestFile(TrackedFile file) { + Preconditions.checkArgument( + file.tracking().dataSequenceNumber() != null, "Invalid data sequence number: null"); + Preconditions.checkArgument( + file.tracking().snapshotId() != null, "Invalid snapshot ID: null"); + Preconditions.checkArgument( + file.manifestInfo().dv() == null, + "Cannot adapt manifest with a deletion vector to ManifestFile: %s", + file.location()); + this.file = file; + } + + @Override + public String path() { + return file.location(); + } + + @Override + public long length() { + return file.fileSizeInBytes(); + } + + @Override + public int partitionSpecId() { + throw new UnsupportedOperationException( + "v4 manifests are not bound to a single partition spec"); + } + + @Override + public ManifestContent content() { + switch (file.contentType()) { + case DATA_MANIFEST: + return ManifestContent.DATA; + case DELETE_MANIFEST: + return ManifestContent.DELETES; + default: + throw new UnsupportedOperationException( + "Unsupported content type for manifests: " + file.contentType()); + } + } + + @Override + public long sequenceNumber() { + return file.tracking().dataSequenceNumber(); + } + + @Override + public long minSequenceNumber() { + return file.manifestInfo().minSequenceNumber(); + } + + @Override + public Long snapshotId() { + return file.tracking().snapshotId(); + } + + @Override + public Integer addedFilesCount() { + return file.manifestInfo().addedFilesCount(); + } + + @Override + public Long addedRowsCount() { + return file.manifestInfo().addedRowsCount(); + } + + @Override + public Integer existingFilesCount() { + return file.manifestInfo().existingFilesCount(); + } + + @Override + public Long existingRowsCount() { + return file.manifestInfo().existingRowsCount(); + } + + @Override + public Integer deletedFilesCount() { + return file.manifestInfo().deletedFilesCount(); + } + + @Override + public Long deletedRowsCount() { + return file.manifestInfo().deletedRowsCount(); + } + + // v4 does not store partition summaries on manifests, so return null. + @Override + public List partitions() { + return null; + } + + @Override + public ByteBuffer keyMetadata() { + return file.keyMetadata(); + } + + @Override + public Long firstRowId() { + return file.tracking().firstRowId(); + } + + @Override + public ManifestFile copy() { + return new TrackedManifestFile(file.copy()); + } + } + private static PartitionSpec resolveSpec( TrackedFile file, Map specsById) { Integer specId = file.specId(); diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFileAdapters.java b/core/src/test/java/org/apache/iceberg/TestTrackedFileAdapters.java index f4d5675ee94a..394f387fd6a5 100644 --- a/core/src/test/java/org/apache/iceberg/TestTrackedFileAdapters.java +++ b/core/src/test/java/org/apache/iceberg/TestTrackedFileAdapters.java @@ -38,12 +38,16 @@ class TestTrackedFileAdapters { private static final String MANIFEST_LOCATION = "s3://bucket/table/manifest.parquet"; private static final String DATA_FILE_LOCATION = "s3://bucket/data/file.parquet"; private static final String DV_LOCATION = "s3://bucket/puffin/dv-file.bin"; + private static final String MANIFEST_FILE_LOCATION = "s3://bucket/table/manifests/m0.parquet"; // Tracking values that the delegation tests validate. private static final long MANIFEST_POS = 3L; private static final long DATA_SEQUENCE_NUMBER = 10L; private static final long FILE_SEQUENCE_NUMBER = 11L; private static final long FIRST_ROW_ID = 1000L; + private static final long SNAPSHOT_ID = 42L; + private static final long MANIFEST_RECORD_COUNT = 8L; + private static final long MANIFEST_FILE_SIZE = 2048L; private static final int UNPARTITIONED_SPEC_ID = PartitionSpec.unpartitioned().specId(); private static final Map UNPARTITIONED = @@ -58,8 +62,10 @@ class TestTrackedFileAdapters { .withSpecId(PARTITIONED_SPEC_ID) .build(); private static final PartitionData PARTITION = partition("books"); + private static final PartitionData EMPTY_PARTITION = new PartitionData(Types.StructType.of()); // Tracking field ordinals, looked up from the schema so the tests do not hard-code offsets. + private static final int SNAPSHOT_ID_ORDINAL = ordinalOf(Tracking.schema(), "snapshot_id"); private static final int DATA_SEQUENCE_NUMBER_ORDINAL = ordinalOf(Tracking.schema(), "sequence_number"); private static final int FILE_SEQUENCE_NUMBER_ORDINAL = @@ -276,6 +282,154 @@ void testDVDeleteFileAdapterRejectsNullDeletionVector() { .hasMessage("Cannot create DV delete file: no deletion vector"); } + @ParameterizedTest + @EnumSource( + value = FileContent.class, + names = {"DATA_MANIFEST", "DELETE_MANIFEST"}) + void testManifestFileAdapterDelegation(FileContent contentType) { + ManifestInfo manifestInfo = createManifestInfo(); + TrackedFile file = + manifestBuilder(contentType) + .manifestInfo(manifestInfo) + .keyMetadata(ByteBuffer.wrap(new byte[] {7, 8, 9})) + .build(); + populateTrackingFields(file); + + ManifestFile manifest = TrackedFileAdapters.asManifestFile(file); + + ManifestContent expectedContent = + contentType == FileContent.DATA_MANIFEST ? ManifestContent.DATA : ManifestContent.DELETES; + assertThat(manifest.path()).isEqualTo(MANIFEST_FILE_LOCATION); + assertThat(manifest.length()).isEqualTo(MANIFEST_FILE_SIZE); + assertThat(manifest.content()).isEqualTo(expectedContent); + assertThat(manifest.copy().content()).isEqualTo(expectedContent); + assertThat(manifest.sequenceNumber()).isEqualTo(DATA_SEQUENCE_NUMBER); + assertThat(manifest.minSequenceNumber()).isEqualTo(manifestInfo.minSequenceNumber()); + assertThat(manifest.snapshotId()).isEqualTo(SNAPSHOT_ID); + assertThat(manifest.addedFilesCount()).isEqualTo(manifestInfo.addedFilesCount()); + assertThat(manifest.addedRowsCount()).isEqualTo(manifestInfo.addedRowsCount()); + assertThat(manifest.existingFilesCount()).isEqualTo(manifestInfo.existingFilesCount()); + assertThat(manifest.existingRowsCount()).isEqualTo(manifestInfo.existingRowsCount()); + assertThat(manifest.deletedFilesCount()).isEqualTo(manifestInfo.deletedFilesCount()); + assertThat(manifest.deletedRowsCount()).isEqualTo(manifestInfo.deletedRowsCount()); + assertThat(manifest.firstRowId()).isEqualTo(FIRST_ROW_ID); + assertThat(manifest.keyMetadata()).isEqualTo(ByteBuffer.wrap(new byte[] {7, 8, 9})); + assertThat(manifest.partitions()).isNull(); + } + + @Test + void testManifestFileAdapterPartitionSpecIdUnsupported() { + TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, createManifestInfo()); + + ManifestFile manifest = TrackedFileAdapters.asManifestFile(file); + + assertThatThrownBy(manifest::partitionSpecId) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("v4 manifests are not bound to a single partition spec"); + } + + @Test + void testManifestFileAdapterRejectsManifestWithDeletionVector() { + ManifestInfo manifestInfo = + ManifestInfoStruct.builder() + .addedFilesCount(3) + .existingFilesCount(5) + .deletedFilesCount(2) + .replacedFilesCount(0) + .addedRowsCount(300L) + .existingRowsCount(500L) + .deletedRowsCount(200L) + .replacedRowsCount(0L) + .minSequenceNumber(7L) + .dv(ByteBuffer.wrap(new byte[] {1, 2, 3})) + .dvCardinality(4L) + .build(); + TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, manifestInfo); + + assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Cannot adapt manifest with a deletion vector to ManifestFile: %s", + MANIFEST_FILE_LOCATION); + } + + @Test + void testManifestFileAdapterReturnsNullForUnsetNullableFields() { + TrackedFile file = + manifestBuilder(FileContent.DATA_MANIFEST).manifestInfo(createManifestInfo()).build(); + // Inheritance fills the data sequence number, which the adapter requires; first row ID and + // key metadata stay unset. + ((TrackingStruct) file.tracking()).set(DATA_SEQUENCE_NUMBER_ORDINAL, DATA_SEQUENCE_NUMBER); + + ManifestFile manifest = TrackedFileAdapters.asManifestFile(file); + + assertThat(manifest.firstRowId()).isNull(); + assertThat(manifest.keyMetadata()).isNull(); + } + + @Test + void testManifestFileAdapterCopy() { + TrackedFile file = + manifestBuilder(FileContent.DATA_MANIFEST) + .manifestInfo(createManifestInfo()) + .keyMetadata(ByteBuffer.wrap(new byte[] {7, 8, 9})) + .build(); + populateTrackingFields(file); + + ManifestFile original = TrackedFileAdapters.asManifestFile(file); + ManifestFile copy = original.copy(); + + assertThat(copy.path()).isEqualTo(original.path()); + assertThat(copy.length()).isEqualTo(original.length()); + assertThat(copy.content()).isEqualTo(original.content()); + assertThat(copy.sequenceNumber()).isEqualTo(original.sequenceNumber()); + assertThat(copy.minSequenceNumber()).isEqualTo(original.minSequenceNumber()); + assertThat(copy.snapshotId()).isEqualTo(original.snapshotId()); + assertThat(copy.addedFilesCount()).isEqualTo(original.addedFilesCount()); + assertThat(copy.addedRowsCount()).isEqualTo(original.addedRowsCount()); + assertThat(copy.existingFilesCount()).isEqualTo(original.existingFilesCount()); + assertThat(copy.existingRowsCount()).isEqualTo(original.existingRowsCount()); + assertThat(copy.deletedFilesCount()).isEqualTo(original.deletedFilesCount()); + assertThat(copy.deletedRowsCount()).isEqualTo(original.deletedRowsCount()); + assertThat(copy.firstRowId()).isEqualTo(original.firstRowId()); + assertThat(copy.keyMetadata()).isEqualTo(original.keyMetadata()); + assertThat(copy.partitions()).isNull(); + assertThat(copy.keyMetadata().array()).isNotSameAs(original.keyMetadata().array()); + } + + @ParameterizedTest + @EnumSource( + value = FileContent.class, + mode = EnumSource.Mode.EXCLUDE, + names = {"DATA_MANIFEST", "DELETE_MANIFEST"}) + void testManifestFileAdapterRejectsNonManifestContent(FileContent contentType) { + TrackedFileStruct file = dummyTrackedFile(contentType); + + assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid content type for ManifestFile: %s", contentType); + } + + @Test + void testManifestFileAdapterRejectsNullDataSequenceNumber() { + TrackedFile file = + manifestBuilder(FileContent.DATA_MANIFEST).manifestInfo(createManifestInfo()).build(); + + assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid data sequence number: null"); + } + + @Test + void testManifestFileAdapterRejectsNullSnapshotId() { + TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, createManifestInfo()); + ((TrackingStruct) file.tracking()).set(SNAPSHOT_ID_ORDINAL, null); + + assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid snapshot ID: null"); + } + @Test void testNullContentStatsReturnsNullStats() { TrackedFileStruct file = dummyTrackedFile(FileContent.DATA); @@ -392,6 +546,30 @@ private static TrackedFileStruct dummyTrackedFile(FileContent contentType) { return file; } + // Builder for a manifest entry with the required non-tracking fields set. Callers add the + // manifest info and any optional fields before building. + private static TrackedFileBuilder manifestBuilder(FileContent contentType) { + TrackedFileBuilder builder = + contentType == FileContent.DATA_MANIFEST + ? TrackedFileBuilder.dataManifest(SNAPSHOT_ID) + : TrackedFileBuilder.deleteManifest(SNAPSHOT_ID); + return builder + .formatVersion(FORMAT_VERSION_V4) + .location(MANIFEST_FILE_LOCATION) + .fileFormat(FileFormat.PARQUET) + .partition(EMPTY_PARTITION) + .recordCount(MANIFEST_RECORD_COUNT) + .fileSizeInBytes(MANIFEST_FILE_SIZE); + } + + // Builds a manifest entry and simulates inheritance so the tracking fields the adapter requires + // are populated. + private static TrackedFile manifestFile(FileContent contentType, ManifestInfo manifestInfo) { + TrackedFile file = manifestBuilder(contentType).manifestInfo(manifestInfo).build(); + populateTrackingFields(file); + return file; + } + private static void populateTrackingFields(TrackedFile file) { TrackingStruct tracking = (TrackingStruct) file.tracking(); tracking.set(DATA_SEQUENCE_NUMBER_ORDINAL, DATA_SEQUENCE_NUMBER); @@ -401,6 +579,20 @@ private static void populateTrackingFields(TrackedFile file) { tracking.set(MANIFEST_POS_ORDINAL, MANIFEST_POS); } + private static ManifestInfo createManifestInfo() { + return ManifestInfoStruct.builder() + .addedFilesCount(3) + .existingFilesCount(5) + .deletedFilesCount(2) + .replacedFilesCount(0) + .addedRowsCount(300L) + .existingRowsCount(500L) + .deletedRowsCount(200L) + .replacedRowsCount(0L) + .minSequenceNumber(7L) + .build(); + } + private static DeletionVector deletionVector() { return DeletionVectorStruct.builder() .location(DV_LOCATION)