Skip to content
128 changes: 127 additions & 1 deletion core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** Adapts {@link TrackedFile} entries to the {@link DataFile} and {@link DeleteFile} APIs. */
/**
* Adapts {@link TrackedFile} entries to the {@link DataFile}, {@link DeleteFile}, and {@link
* ManifestFile} APIs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If comments require regular maintenance when touching the file, I prefer to rewrite them to minimize churn. This doesn't need to list everything that can be produced. Maybe edit this so that it say for example, {@link DataFile}?

*/
class TrackedFileAdapters {

private TrackedFileAdapters() {}
Expand Down Expand Up @@ -53,6 +56,15 @@ static DeleteFile asEqualityDeleteFile(TrackedFile file, Map<Integer, PartitionS
return new TrackedEqualityDeleteFile(file, resolveSpec(file, specsById));
}

static ManifestFile asManifestFile(TrackedFile file) {
Preconditions.checkArgument(
file.contentType() == FileContent.DATA_MANIFEST
|| file.contentType() == FileContent.DELETE_MANIFEST,
"Invalid content type for ManifestFile: %s",
file.contentType());
return new TrackedManifestFile(file);
}

/** Shared base for all tracked file adapters. */

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer true. How about "Shared base for data and delete file adapters"?

private abstract static class TrackedFileAdapter<F extends ContentFile<F>>
implements ContentFile<F> {
Expand Down Expand Up @@ -405,6 +417,120 @@ public DeleteFile copyWithStats(Set<Integer> requestedColumnIds) {
}
}

/**
* Adapts a TrackedFile to {@link ManifestFile}
*/
private static class TrackedManifestFile implements ManifestFile {
private final TrackedFile file;

private TrackedManifestFile(TrackedFile file) {
Comment thread
stevenzwu marked this conversation as resolved.
Preconditions.checkArgument(
file.tracking().dataSequenceNumber() != null, "Invalid data sequence number: null");
Preconditions.checkArgument(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the spec proposal data seq num is 'Optional for leaf manifests, required for root.'. I think it's valid to have it as null here. Seems also optional in ManifestFile

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is for root-level manifest tracking where null is truly an invalid state. We can either validate or just have it fail with an NPE.

Seems also optional in ManifestFile

It is required for a manifest file: ManifestFile.sequenceNumber() returns a primitive

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to add on top of what Anoop said. this is for leaf manifest file entry (TrackedFile) in the root manifest file. Hence dataSequenceNumber should be required

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation @anoopj and @stevenzwu !

file.tracking().snapshotId() != null, "Invalid snapshot ID: null");
Preconditions.checkArgument(
file.manifestInfo().dv() == null,
"Cannot adapt manifest with a deletion vector to ManifestFile: %s",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct and calls into question whether this adapter is going to be useful. We expect manifest DVs to be common and if we can't adapt a manifest with a DV back to ManifestFile, then what is the purpose of this adapter?

ManifestFile is mostly below the API level. It is exposed in some places (accessed primarily from Snapshot), but reading and writing for correctness (scan planning and commit operations) are all done within Table APIs and even snapshot change access is moving to SnapshotChanges. You can read manifests directly using utils in core, but I don't think that is very common.

I think we may want to avoid adding this for now, even though it seems like a reasonable addition. In order for this to be valuable, we would need to continue using ManifestFile and expose a DV through that interface. Since we don't know if that's going to be helpful, I think we should keep this as a draft for now.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I will convert this into a draft and we can revisit this if it's needed as we build the reader and writer. cc @stevenzwu

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the write path, I also proposed new additions to the ManifestFile interface. I added 3 new methods in my next PR #16936 . We can add more (including manifest DV) as we go.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add the DV, let me know and I can revive this PR.

file.location());
this.file = file;
}

@Override
public String path() {
return file.location();
}

@Override
public long length() {
return file.fileSizeInBytes();
}

@Override
public int partitionSpecId() {
throw new UnsupportedOperationException(
"v4 manifests are not bound to a single partition spec");
}

@Override
public ManifestContent content() {
switch (file.contentType()) {
case DATA_MANIFEST:
return ManifestContent.DATA;
case DELETE_MANIFEST:
return ManifestContent.DELETES;
default:
throw new UnsupportedOperationException(
"Unsupported content type for manifests: " + file.contentType());
}
}

@Override
public long sequenceNumber() {
return file.tracking().dataSequenceNumber();
}

@Override
public long minSequenceNumber() {
return file.manifestInfo().minSequenceNumber();
}

@Override
public Long snapshotId() {
return file.tracking().snapshotId();
}

@Override
public Integer addedFilesCount() {
return file.manifestInfo().addedFilesCount();
}

@Override
public Long addedRowsCount() {
return file.manifestInfo().addedRowsCount();
}

@Override
public Integer existingFilesCount() {
return file.manifestInfo().existingFilesCount();
}

@Override
public Long existingRowsCount() {
return file.manifestInfo().existingRowsCount();
}

@Override
public Integer deletedFilesCount() {
return file.manifestInfo().deletedFilesCount();
}

@Override
public Long deletedRowsCount() {
return file.manifestInfo().deletedRowsCount();
}

// v4 does not store partition summaries on manifests, so return null.
@Override
public List<PartitionFieldSummary> partitions() {
Comment thread
stevenzwu marked this conversation as resolved.
return null;
}

@Override
public ByteBuffer keyMetadata() {
return file.keyMetadata();
}

@Override
public Long firstRowId() {
return file.tracking().firstRowId();
}

@Override
public ManifestFile copy() {
return new TrackedManifestFile(file.copy());
}
}

private static PartitionSpec resolveSpec(
TrackedFile file, Map<Integer, PartitionSpec> specsById) {
Integer specId = file.specId();
Expand Down
192 changes: 192 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTrackedFileAdapters.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ class TestTrackedFileAdapters {
private static final String MANIFEST_LOCATION = "s3://bucket/table/manifest.parquet";
private static final String DATA_FILE_LOCATION = "s3://bucket/data/file.parquet";
private static final String DV_LOCATION = "s3://bucket/puffin/dv-file.bin";
private static final String MANIFEST_FILE_LOCATION = "s3://bucket/table/manifests/m0.parquet";

// Tracking values that the delegation tests validate.
private static final long MANIFEST_POS = 3L;
private static final long DATA_SEQUENCE_NUMBER = 10L;
private static final long FILE_SEQUENCE_NUMBER = 11L;
private static final long FIRST_ROW_ID = 1000L;
private static final long SNAPSHOT_ID = 42L;
private static final long MANIFEST_RECORD_COUNT = 8L;
private static final long MANIFEST_FILE_SIZE = 2048L;

private static final int UNPARTITIONED_SPEC_ID = PartitionSpec.unpartitioned().specId();
private static final Map<Integer, PartitionSpec> UNPARTITIONED =
Expand All @@ -58,8 +62,10 @@ class TestTrackedFileAdapters {
.withSpecId(PARTITIONED_SPEC_ID)
.build();
private static final PartitionData PARTITION = partition("books");
private static final PartitionData EMPTY_PARTITION = new PartitionData(Types.StructType.of());

// Tracking field ordinals, looked up from the schema so the tests do not hard-code offsets.
private static final int SNAPSHOT_ID_ORDINAL = ordinalOf(Tracking.schema(), "snapshot_id");
private static final int DATA_SEQUENCE_NUMBER_ORDINAL =
ordinalOf(Tracking.schema(), "sequence_number");
private static final int FILE_SEQUENCE_NUMBER_ORDINAL =
Expand Down Expand Up @@ -276,6 +282,154 @@ void testDVDeleteFileAdapterRejectsNullDeletionVector() {
.hasMessage("Cannot create DV delete file: no deletion vector");
}

@ParameterizedTest
@EnumSource(
value = FileContent.class,
names = {"DATA_MANIFEST", "DELETE_MANIFEST"})
void testManifestFileAdapterDelegation(FileContent contentType) {
ManifestInfo manifestInfo = createManifestInfo();
TrackedFile file =
manifestBuilder(contentType)
.manifestInfo(manifestInfo)
.keyMetadata(ByteBuffer.wrap(new byte[] {7, 8, 9}))
.build();
populateTrackingFields(file);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI: in the follow-up for the TrackedFileBuilder PR, we decided to drop populateTrackingFields and went to call the constructor of TrackedFile instead using the createTracking (revived in the follow-up PR) method.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I can rebase when the followup PR is merged.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this and either use a Tracking constant or create one inline. This indirection makes the tests unreadable.


ManifestFile manifest = TrackedFileAdapters.asManifestFile(file);

ManifestContent expectedContent =
contentType == FileContent.DATA_MANIFEST ? ManifestContent.DATA : ManifestContent.DELETES;
assertThat(manifest.path()).isEqualTo(MANIFEST_FILE_LOCATION);
assertThat(manifest.length()).isEqualTo(MANIFEST_FILE_SIZE);
assertThat(manifest.content()).isEqualTo(expectedContent);
assertThat(manifest.copy().content()).isEqualTo(expectedContent);
assertThat(manifest.sequenceNumber()).isEqualTo(DATA_SEQUENCE_NUMBER);
assertThat(manifest.minSequenceNumber()).isEqualTo(manifestInfo.minSequenceNumber());
assertThat(manifest.snapshotId()).isEqualTo(SNAPSHOT_ID);
assertThat(manifest.addedFilesCount()).isEqualTo(manifestInfo.addedFilesCount());
assertThat(manifest.addedRowsCount()).isEqualTo(manifestInfo.addedRowsCount());
assertThat(manifest.existingFilesCount()).isEqualTo(manifestInfo.existingFilesCount());
assertThat(manifest.existingRowsCount()).isEqualTo(manifestInfo.existingRowsCount());
assertThat(manifest.deletedFilesCount()).isEqualTo(manifestInfo.deletedFilesCount());
assertThat(manifest.deletedRowsCount()).isEqualTo(manifestInfo.deletedRowsCount());
assertThat(manifest.firstRowId()).isEqualTo(FIRST_ROW_ID);
assertThat(manifest.keyMetadata()).isEqualTo(ByteBuffer.wrap(new byte[] {7, 8, 9}));
assertThat(manifest.partitions()).isNull();
}

@Test
void testManifestFileAdapterPartitionSpecIdUnsupported() {
TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, createManifestInfo());

ManifestFile manifest = TrackedFileAdapters.asManifestFile(file);

assertThatThrownBy(manifest::partitionSpecId)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("v4 manifests are not bound to a single partition spec");
}

@Test
void testManifestFileAdapterRejectsManifestWithDeletionVector() {
ManifestInfo manifestInfo =
ManifestInfoStruct.builder()
.addedFilesCount(3)
.existingFilesCount(5)
.deletedFilesCount(2)
.replacedFilesCount(0)
.addedRowsCount(300L)
.existingRowsCount(500L)
.deletedRowsCount(200L)
.replacedRowsCount(0L)
.minSequenceNumber(7L)
.dv(ByteBuffer.wrap(new byte[] {1, 2, 3}))
.dvCardinality(4L)
.build();
TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, manifestInfo);

assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Cannot adapt manifest with a deletion vector to ManifestFile: %s",
MANIFEST_FILE_LOCATION);
}

@Test
void testManifestFileAdapterReturnsNullForUnsetNullableFields() {
TrackedFile file =
manifestBuilder(FileContent.DATA_MANIFEST).manifestInfo(createManifestInfo()).build();
// Inheritance fills the data sequence number, which the adapter requires; first row ID and
// key metadata stay unset.
((TrackingStruct) file.tracking()).set(DATA_SEQUENCE_NUMBER_ORDINAL, DATA_SEQUENCE_NUMBER);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of using the ordinal, we can use the inheritFrom(some_manifest) method to do that.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Routing through inheritFrom() will require building a parent Tzracking with several nulls. I prefer to keep it since this is simpler.


ManifestFile manifest = TrackedFileAdapters.asManifestFile(file);

assertThat(manifest.firstRowId()).isNull();
assertThat(manifest.keyMetadata()).isNull();
}

@Test
void testManifestFileAdapterCopy() {
TrackedFile file =
manifestBuilder(FileContent.DATA_MANIFEST)
.manifestInfo(createManifestInfo())
.keyMetadata(ByteBuffer.wrap(new byte[] {7, 8, 9}))
.build();
populateTrackingFields(file);

ManifestFile original = TrackedFileAdapters.asManifestFile(file);
ManifestFile copy = original.copy();

assertThat(copy.path()).isEqualTo(original.path());
assertThat(copy.length()).isEqualTo(original.length());
assertThat(copy.content()).isEqualTo(original.content());
assertThat(copy.sequenceNumber()).isEqualTo(original.sequenceNumber());
assertThat(copy.minSequenceNumber()).isEqualTo(original.minSequenceNumber());
assertThat(copy.snapshotId()).isEqualTo(original.snapshotId());
assertThat(copy.addedFilesCount()).isEqualTo(original.addedFilesCount());
assertThat(copy.addedRowsCount()).isEqualTo(original.addedRowsCount());
assertThat(copy.existingFilesCount()).isEqualTo(original.existingFilesCount());
assertThat(copy.existingRowsCount()).isEqualTo(original.existingRowsCount());
assertThat(copy.deletedFilesCount()).isEqualTo(original.deletedFilesCount());
assertThat(copy.deletedRowsCount()).isEqualTo(original.deletedRowsCount());
assertThat(copy.firstRowId()).isEqualTo(original.firstRowId());
assertThat(copy.keyMetadata()).isEqualTo(original.keyMetadata());
assertThat(copy.partitions()).isNull();
assertThat(copy.keyMetadata().array()).isNotSameAs(original.keyMetadata().array());
}

@ParameterizedTest
@EnumSource(
value = FileContent.class,
mode = EnumSource.Mode.EXCLUDE,
names = {"DATA_MANIFEST", "DELETE_MANIFEST"})
void testManifestFileAdapterRejectsNonManifestContent(FileContent contentType) {
TrackedFileStruct file = dummyTrackedFile(contentType);

assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid content type for ManifestFile: %s", contentType);
}

@Test
void testManifestFileAdapterRejectsNullDataSequenceNumber() {
TrackedFile file =
manifestBuilder(FileContent.DATA_MANIFEST).manifestInfo(createManifestInfo()).build();

assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid data sequence number: null");
}

@Test
void testManifestFileAdapterRejectsNullSnapshotId() {
TrackedFile file = manifestFile(FileContent.DATA_MANIFEST, createManifestInfo());
((TrackingStruct) file.tracking()).set(SNAPSHOT_ID_ORDINAL, null);

assertThatThrownBy(() -> TrackedFileAdapters.asManifestFile(file))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid snapshot ID: null");
}

@Test
void testNullContentStatsReturnsNullStats() {
TrackedFileStruct file = dummyTrackedFile(FileContent.DATA);
Expand Down Expand Up @@ -392,6 +546,30 @@ private static TrackedFileStruct dummyTrackedFile(FileContent contentType) {
return file;
}

// Builder for a manifest entry with the required non-tracking fields set. Callers add the
// manifest info and any optional fields before building.
private static TrackedFileBuilder manifestBuilder(FileContent contentType) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that the complexity here to use the builder is a good thing.

This creates indirection and makes tests harder to read, while gaining almost nothing. There is hardly any readability benefit to using the builder when you're already setting constants like FORMAT_VERSION_V4. All this does is move the initialization here, so people have to find this method to validate that the assertions above are correct, like assertThat(manifest.path()).isEqualTo(MANIFEST_FILE_LOCATION). I would rather see MANIFEST_FILE_LOCATION passed in and then validated.

I think this should be removed and the constructor should be used instead. All these tests need to verify is that these values are passed through correctly.

TrackedFileBuilder builder =
contentType == FileContent.DATA_MANIFEST
? TrackedFileBuilder.dataManifest(SNAPSHOT_ID)
: TrackedFileBuilder.deleteManifest(SNAPSHOT_ID);
return builder
.formatVersion(FORMAT_VERSION_V4)
.location(MANIFEST_FILE_LOCATION)
.fileFormat(FileFormat.PARQUET)
.partition(EMPTY_PARTITION)
.recordCount(MANIFEST_RECORD_COUNT)
.fileSizeInBytes(MANIFEST_FILE_SIZE);
}

// Builds a manifest entry and simulates inheritance so the tracking fields the adapter requires
// are populated.
private static TrackedFile manifestFile(FileContent contentType, ManifestInfo manifestInfo) {
TrackedFile file = manifestBuilder(contentType).manifestInfo(manifestInfo).build();
populateTrackingFields(file);
return file;
}

private static void populateTrackingFields(TrackedFile file) {
TrackingStruct tracking = (TrackingStruct) file.tracking();
tracking.set(DATA_SEQUENCE_NUMBER_ORDINAL, DATA_SEQUENCE_NUMBER);
Expand All @@ -401,6 +579,20 @@ private static void populateTrackingFields(TrackedFile file) {
tracking.set(MANIFEST_POS_ORDINAL, MANIFEST_POS);
}

private static ManifestInfo createManifestInfo() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be a constant, not a method. We don't need to create a manifest info struct every time, and a method gives no guarantees about the values produced. It could create random values every time.

If we use a constant, then you can write assertions like assertThat(manifest.existingFilesCount()).isEqualTo(manifestInfo.existingFilesCount()).

return ManifestInfoStruct.builder()
.addedFilesCount(3)
.existingFilesCount(5)
.deletedFilesCount(2)
.replacedFilesCount(0)
.addedRowsCount(300L)
.existingRowsCount(500L)
.deletedRowsCount(200L)
.replacedRowsCount(0L)
.minSequenceNumber(7L)
.build();
}

private static DeletionVector deletionVector() {
return DeletionVectorStruct.builder()
.location(DV_LOCATION)
Expand Down
Loading