-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Core: Add ManifestFile adapter for v4 tracked files #16867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
93368cb
9974e9b
67f64ae
e3be26b
ae41f2a
73b9bfa
ae9a18e
f98a534
74f4995
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,10 @@ | |
| import java.util.Set; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
|
|
||
| /** Adapts {@link TrackedFile} entries to the {@link DataFile} and {@link DeleteFile} APIs. */ | ||
| /** | ||
| * Adapts {@link TrackedFile} entries to the {@link DataFile}, {@link DeleteFile}, and {@link | ||
| * ManifestFile} APIs. | ||
| */ | ||
| class TrackedFileAdapters { | ||
|
|
||
| private TrackedFileAdapters() {} | ||
|
|
@@ -53,6 +56,15 @@ static DeleteFile asEqualityDeleteFile(TrackedFile file, Map<Integer, PartitionS | |
| return new TrackedEqualityDeleteFile(file, resolveSpec(file, specsById)); | ||
| } | ||
|
|
||
| static ManifestFile asManifestFile(TrackedFile file) { | ||
| Preconditions.checkArgument( | ||
| file.contentType() == FileContent.DATA_MANIFEST | ||
| || file.contentType() == FileContent.DELETE_MANIFEST, | ||
| "Invalid content type for ManifestFile: %s", | ||
| file.contentType()); | ||
| return new TrackedManifestFile(file); | ||
| } | ||
|
|
||
| /** Shared base for all tracked file adapters. */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is no longer true. How about "Shared base for data and delete file adapters"? |
||
| private abstract static class TrackedFileAdapter<F extends ContentFile<F>> | ||
| implements ContentFile<F> { | ||
|
|
@@ -405,6 +417,120 @@ public DeleteFile copyWithStats(Set<Integer> requestedColumnIds) { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Adapts a TrackedFile to {@link ManifestFile} | ||
| */ | ||
| private static class TrackedManifestFile implements ManifestFile { | ||
| private final TrackedFile file; | ||
|
|
||
| private TrackedManifestFile(TrackedFile file) { | ||
|
stevenzwu marked this conversation as resolved.
|
||
| Preconditions.checkArgument( | ||
| file.tracking().dataSequenceNumber() != null, "Invalid data sequence number: null"); | ||
| Preconditions.checkArgument( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per the spec proposal data seq num is 'Optional for leaf manifests, required for root.'. I think it's valid to have it as null here. Seems also optional in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR is for root-level manifest tracking where
It is required for a manifest file:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just to add on top of what Anoop said. this is for leaf manifest file entry (TrackedFile) in the root manifest file. Hence
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation @anoopj and @stevenzwu ! |
||
| file.tracking().snapshotId() != null, "Invalid snapshot ID: null"); | ||
| Preconditions.checkArgument( | ||
| file.manifestInfo().dv() == null, | ||
| "Cannot adapt manifest with a deletion vector to ManifestFile: %s", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is correct and calls into question whether this adapter is going to be useful. We expect manifest DVs to be common and if we can't adapt a manifest with a DV back to
I think we may want to avoid adding this for now, even though it seems like a reasonable addition. In order for this to be valuable, we would need to continue using
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. I will convert this into a draft and we can revisit this if it's needed as we build the reader and writer. cc @stevenzwu
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the write path, I also proposed new additions to the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we add the DV, let me know and I can revive this PR. |
||
| file.location()); | ||
| this.file = file; | ||
| } | ||
|
|
||
| @Override | ||
| public String path() { | ||
| return file.location(); | ||
| } | ||
|
|
||
| @Override | ||
| public long length() { | ||
| return file.fileSizeInBytes(); | ||
| } | ||
|
|
||
| @Override | ||
| public int partitionSpecId() { | ||
| throw new UnsupportedOperationException( | ||
| "v4 manifests are not bound to a single partition spec"); | ||
| } | ||
|
|
||
| @Override | ||
| public ManifestContent content() { | ||
| switch (file.contentType()) { | ||
| case DATA_MANIFEST: | ||
| return ManifestContent.DATA; | ||
| case DELETE_MANIFEST: | ||
| return ManifestContent.DELETES; | ||
| default: | ||
| throw new UnsupportedOperationException( | ||
| "Unsupported content type for manifests: " + file.contentType()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public long sequenceNumber() { | ||
| return file.tracking().dataSequenceNumber(); | ||
| } | ||
|
|
||
| @Override | ||
| public long minSequenceNumber() { | ||
| return file.manifestInfo().minSequenceNumber(); | ||
| } | ||
|
|
||
| @Override | ||
| public Long snapshotId() { | ||
| return file.tracking().snapshotId(); | ||
| } | ||
|
|
||
| @Override | ||
| public Integer addedFilesCount() { | ||
| return file.manifestInfo().addedFilesCount(); | ||
| } | ||
|
|
||
| @Override | ||
| public Long addedRowsCount() { | ||
| return file.manifestInfo().addedRowsCount(); | ||
| } | ||
|
|
||
| @Override | ||
| public Integer existingFilesCount() { | ||
| return file.manifestInfo().existingFilesCount(); | ||
| } | ||
|
|
||
| @Override | ||
| public Long existingRowsCount() { | ||
| return file.manifestInfo().existingRowsCount(); | ||
| } | ||
|
|
||
| @Override | ||
| public Integer deletedFilesCount() { | ||
| return file.manifestInfo().deletedFilesCount(); | ||
| } | ||
|
|
||
| @Override | ||
| public Long deletedRowsCount() { | ||
| return file.manifestInfo().deletedRowsCount(); | ||
| } | ||
|
|
||
| // v4 does not store partition summaries on manifests, so return null. | ||
| @Override | ||
| public List<PartitionFieldSummary> partitions() { | ||
|
stevenzwu marked this conversation as resolved.
|
||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public ByteBuffer keyMetadata() { | ||
| return file.keyMetadata(); | ||
| } | ||
|
|
||
| @Override | ||
| public Long firstRowId() { | ||
| return file.tracking().firstRowId(); | ||
| } | ||
|
|
||
| @Override | ||
| public ManifestFile copy() { | ||
| return new TrackedManifestFile(file.copy()); | ||
| } | ||
| } | ||
|
|
||
| private static PartitionSpec resolveSpec( | ||
| TrackedFile file, Map<Integer, PartitionSpec> specsById) { | ||
| Integer specId = file.specId(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,12 +38,16 @@ class TestTrackedFileAdapters { | |
| private static final String MANIFEST_LOCATION = "s3://bucket/table/manifest.parquet"; | ||
| private static final String DATA_FILE_LOCATION = "s3://bucket/data/file.parquet"; | ||
| private static final String DV_LOCATION = "s3://bucket/puffin/dv-file.bin"; | ||
| private static final String MANIFEST_FILE_LOCATION = "s3://bucket/table/manifests/m0.parquet"; | ||
|
|
||
| // Tracking values that the delegation tests validate. | ||
| private static final long MANIFEST_POS = 3L; | ||
| private static final long DATA_SEQUENCE_NUMBER = 10L; | ||
| private static final long FILE_SEQUENCE_NUMBER = 11L; | ||
| private static final long FIRST_ROW_ID = 1000L; | ||
| private static final long SNAPSHOT_ID = 42L; | ||
| private static final long MANIFEST_RECORD_COUNT = 8L; | ||
| private static final long MANIFEST_FILE_SIZE = 2048L; | ||
|
|
||
| private static final int UNPARTITIONED_SPEC_ID = PartitionSpec.unpartitioned().specId(); | ||
| private static final Map<Integer, PartitionSpec> UNPARTITIONED = | ||
|
|
@@ -58,8 +62,10 @@ class TestTrackedFileAdapters { | |
| .withSpecId(PARTITIONED_SPEC_ID) | ||
| .build(); | ||
| private static final PartitionData PARTITION = partition("books"); | ||
| private static final PartitionData EMPTY_PARTITION = new PartitionData(Types.StructType.of()); | ||
|
|
||
| // Tracking field ordinals, looked up from the schema so the tests do not hard-code offsets. | ||
| private static final int SNAPSHOT_ID_ORDINAL = ordinalOf(Tracking.schema(), "snapshot_id"); | ||
| private static final int DATA_SEQUENCE_NUMBER_ORDINAL = | ||
| ordinalOf(Tracking.schema(), "sequence_number"); | ||
| private static final int FILE_SEQUENCE_NUMBER_ORDINAL = | ||
|
|
@@ -276,6 +282,154 @@ void testDVDeleteFileAdapterRejectsNullDeletionVector() { | |
| .hasMessage("Cannot create DV delete file: no deletion vector"); | ||
| } | ||
|
|
||
| @ParameterizedTest | ||
| @EnumSource( | ||
| value = FileContent.class, | ||
| names = {"DATA_MANIFEST", "DELETE_MANIFEST"}) | ||
| void testManifestFileAdapterDelegation(FileContent contentType) { | ||
| ManifestInfo manifestInfo = createManifestInfo(); | ||
| TrackedFile file = | ||
| manifestBuilder(contentType) | ||
| .manifestInfo(manifestInfo) | ||
| .keyMetadata(ByteBuffer.wrap(new byte[] {7, 8, 9})) | ||
| .build(); | ||
| populateTrackingFields(file); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just FYI: in the follow-up for the TrackedFileBuilder PR, we decided to drop
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ack. I can rebase when the followup PR is merged.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this and either use a |
||
|
|
||
| ManifestFile manifest = TrackedFileAdapters.asManifestFile(file); | ||
|
|
||
| ManifestContent expectedContent = | ||
| contentType == FileContent.DATA_MANIFEST ? ManifestContent.DATA : ManifestContent.DELETES; | ||
| assertThat(manifest.path()).isEqualTo(MANIFEST_FILE_LOCATION); | ||
| assertThat(manifest.length()).isEqualTo(MANIFEST_FILE_SIZE); | ||
| assertThat(manifest.content()).isEqualTo(expectedContent); | ||
| assertThat(manifest.copy().content()).isEqualTo(expectedContent); | ||
| assertThat(manifest.sequenceNumber()).isEqualTo(DATA_SEQUENCE_NUMBER); | ||
| assertThat(manifest.minSequenceNumber()).isEqualTo(manifestInfo.minSequenceNumber()); | ||
| assertThat(manifest.snapshotId()).isEqualTo(SNAPSHOT_ID); | ||
| assertThat(manifest.addedFilesCount()).isEqualTo(manifestInfo.addedFilesCount()); | ||
| assertThat(manifest.addedRowsCount()).isEqualTo(manifestInfo.addedRowsCount()); | ||
| assertThat(manifest.existingFilesCount()).isEqualTo(manifestInfo.existingFilesCount()); | ||
| assertThat(manifest.existingRowsCount()).isEqualTo(manifestInfo.existingRowsCount()); | ||
| assertThat(manifest.deletedFilesCount()).isEqualTo(manifestInfo.deletedFilesCount()); | ||
| assertThat(manifest.deletedRowsCount()).isEqualTo(manifestInfo.deletedRowsCount()); | ||
| assertThat(manifest.firstRowId()).isEqualTo(FIRST_ROW_ID); | ||
| assertThat(manifest.keyMetadata()).isEqualTo(ByteBuffer.wrap(new byte[] {7, 8, 9})); | ||
| assertThat(manifest.partitions()).isNull(); | ||
| } | ||
|
|
||
| @Test | ||
| void testManifestFileAdapterPartitionSpecIdUnsupported() { | ||
| TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, createManifestInfo()); | ||
|
|
||
| ManifestFile manifest = TrackedFileAdapters.asManifestFile(file); | ||
|
|
||
| assertThatThrownBy(manifest::partitionSpecId) | ||
| .isInstanceOf(UnsupportedOperationException.class) | ||
| .hasMessage("v4 manifests are not bound to a single partition spec"); | ||
| } | ||
|
|
||
| @Test | ||
| void testManifestFileAdapterRejectsManifestWithDeletionVector() { | ||
| ManifestInfo manifestInfo = | ||
| ManifestInfoStruct.builder() | ||
| .addedFilesCount(3) | ||
| .existingFilesCount(5) | ||
| .deletedFilesCount(2) | ||
| .replacedFilesCount(0) | ||
| .addedRowsCount(300L) | ||
| .existingRowsCount(500L) | ||
| .deletedRowsCount(200L) | ||
| .replacedRowsCount(0L) | ||
| .minSequenceNumber(7L) | ||
| .dv(ByteBuffer.wrap(new byte[] {1, 2, 3})) | ||
| .dvCardinality(4L) | ||
| .build(); | ||
| TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, manifestInfo); | ||
|
|
||
| assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessage( | ||
| "Cannot adapt manifest with a deletion vector to ManifestFile: %s", | ||
| MANIFEST_FILE_LOCATION); | ||
| } | ||
|
|
||
| @Test | ||
| void testManifestFileAdapterReturnsNullForUnsetNullableFields() { | ||
| TrackedFile file = | ||
| manifestBuilder(FileContent.DATA_MANIFEST).manifestInfo(createManifestInfo()).build(); | ||
| // Inheritance fills the data sequence number, which the adapter requires; first row ID and | ||
| // key metadata stay unset. | ||
| ((TrackingStruct) file.tracking()).set(DATA_SEQUENCE_NUMBER_ORDINAL, DATA_SEQUENCE_NUMBER); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of using the ordinal, we can use the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Routing through inheritFrom() will require building a parent |
||
|
|
||
| ManifestFile manifest = TrackedFileAdapters.asManifestFile(file); | ||
|
|
||
| assertThat(manifest.firstRowId()).isNull(); | ||
| assertThat(manifest.keyMetadata()).isNull(); | ||
| } | ||
|
|
||
| @Test | ||
| void testManifestFileAdapterCopy() { | ||
| TrackedFile file = | ||
| manifestBuilder(FileContent.DATA_MANIFEST) | ||
| .manifestInfo(createManifestInfo()) | ||
| .keyMetadata(ByteBuffer.wrap(new byte[] {7, 8, 9})) | ||
| .build(); | ||
| populateTrackingFields(file); | ||
|
|
||
| ManifestFile original = TrackedFileAdapters.asManifestFile(file); | ||
| ManifestFile copy = original.copy(); | ||
|
|
||
| assertThat(copy.path()).isEqualTo(original.path()); | ||
| assertThat(copy.length()).isEqualTo(original.length()); | ||
| assertThat(copy.content()).isEqualTo(original.content()); | ||
| assertThat(copy.sequenceNumber()).isEqualTo(original.sequenceNumber()); | ||
| assertThat(copy.minSequenceNumber()).isEqualTo(original.minSequenceNumber()); | ||
| assertThat(copy.snapshotId()).isEqualTo(original.snapshotId()); | ||
| assertThat(copy.addedFilesCount()).isEqualTo(original.addedFilesCount()); | ||
| assertThat(copy.addedRowsCount()).isEqualTo(original.addedRowsCount()); | ||
| assertThat(copy.existingFilesCount()).isEqualTo(original.existingFilesCount()); | ||
| assertThat(copy.existingRowsCount()).isEqualTo(original.existingRowsCount()); | ||
| assertThat(copy.deletedFilesCount()).isEqualTo(original.deletedFilesCount()); | ||
| assertThat(copy.deletedRowsCount()).isEqualTo(original.deletedRowsCount()); | ||
| assertThat(copy.firstRowId()).isEqualTo(original.firstRowId()); | ||
| assertThat(copy.keyMetadata()).isEqualTo(original.keyMetadata()); | ||
| assertThat(copy.partitions()).isNull(); | ||
| assertThat(copy.keyMetadata().array()).isNotSameAs(original.keyMetadata().array()); | ||
| } | ||
|
|
||
| @ParameterizedTest | ||
| @EnumSource( | ||
| value = FileContent.class, | ||
| mode = EnumSource.Mode.EXCLUDE, | ||
| names = {"DATA_MANIFEST", "DELETE_MANIFEST"}) | ||
| void testManifestFileAdapterRejectsNonManifestContent(FileContent contentType) { | ||
| TrackedFileStruct file = dummyTrackedFile(contentType); | ||
|
|
||
| assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessage("Invalid content type for ManifestFile: %s", contentType); | ||
| } | ||
|
|
||
| @Test | ||
| void testManifestFileAdapterRejectsNullDataSequenceNumber() { | ||
| TrackedFile file = | ||
| manifestBuilder(FileContent.DATA_MANIFEST).manifestInfo(createManifestInfo()).build(); | ||
|
|
||
| assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessage("Invalid data sequence number: null"); | ||
| } | ||
|
|
||
| @Test | ||
| void testManifestFileAdapterRejectsNullSnapshotId() { | ||
| TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, createManifestInfo()); | ||
| ((TrackingStruct) file.tracking()).set(SNAPSHOT_ID_ORDINAL, null); | ||
|
|
||
| assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessage("Invalid snapshot ID: null"); | ||
| } | ||
|
|
||
| @Test | ||
| void testNullContentStatsReturnsNullStats() { | ||
| TrackedFileStruct file = dummyTrackedFile(FileContent.DATA); | ||
|
|
@@ -392,6 +546,30 @@ private static TrackedFileStruct dummyTrackedFile(FileContent contentType) { | |
| return file; | ||
| } | ||
|
|
||
| // Builder for a manifest entry with the required non-tracking fields set. Callers add the | ||
| // manifest info and any optional fields before building. | ||
| private static TrackedFileBuilder manifestBuilder(FileContent contentType) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that the complexity here to use the builder is a good thing. This creates indirection and makes tests harder to read, while gaining almost nothing. There is hardly any readability benefit to using the builder when you're already setting constants like I think this should be removed and the constructor should be used instead. All these tests need to verify is that these values are passed through correctly. |
||
| TrackedFileBuilder builder = | ||
| contentType == FileContent.DATA_MANIFEST | ||
| ? TrackedFileBuilder.dataManifest(SNAPSHOT_ID) | ||
| : TrackedFileBuilder.deleteManifest(SNAPSHOT_ID); | ||
| return builder | ||
| .formatVersion(FORMAT_VERSION_V4) | ||
| .location(MANIFEST_FILE_LOCATION) | ||
| .fileFormat(FileFormat.PARQUET) | ||
| .partition(EMPTY_PARTITION) | ||
| .recordCount(MANIFEST_RECORD_COUNT) | ||
| .fileSizeInBytes(MANIFEST_FILE_SIZE); | ||
| } | ||
|
|
||
| // Builds a manifest entry and simulates inheritance so the tracking fields the adapter requires | ||
| // are populated. | ||
| private static TrackedFile manifestFile(FileContent contentType, ManifestInfo manifestInfo) { | ||
| TrackedFile file = manifestBuilder(contentType).manifestInfo(manifestInfo).build(); | ||
| populateTrackingFields(file); | ||
| return file; | ||
| } | ||
|
|
||
| private static void populateTrackingFields(TrackedFile file) { | ||
| TrackingStruct tracking = (TrackingStruct) file.tracking(); | ||
| tracking.set(DATA_SEQUENCE_NUMBER_ORDINAL, DATA_SEQUENCE_NUMBER); | ||
|
|
@@ -401,6 +579,20 @@ private static void populateTrackingFields(TrackedFile file) { | |
| tracking.set(MANIFEST_POS_ORDINAL, MANIFEST_POS); | ||
| } | ||
|
|
||
| private static ManifestInfo createManifestInfo() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be a constant, not a method. We don't need to create a manifest info struct every time, and a method gives no guarantees about the values produced. It could create random values every time. If we use a constant, then you can write assertions like |
||
| return ManifestInfoStruct.builder() | ||
| .addedFilesCount(3) | ||
| .existingFilesCount(5) | ||
| .deletedFilesCount(2) | ||
| .replacedFilesCount(0) | ||
| .addedRowsCount(300L) | ||
| .existingRowsCount(500L) | ||
| .deletedRowsCount(200L) | ||
| .replacedRowsCount(0L) | ||
| .minSequenceNumber(7L) | ||
| .build(); | ||
| } | ||
|
|
||
| private static DeletionVector deletionVector() { | ||
| return DeletionVectorStruct.builder() | ||
| .location(DV_LOCATION) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If comments require regular maintenance when touching the file, I prefer to rewrite them to minimize churn. This doesn't need to list everything that can be produced. Maybe edit this so that it say
for example, {@link DataFile}?