Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,18 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec
* @return the constructed unified partition type
*/
public static StructType partitionType(Table table) {
Collection<PartitionSpec> specs = table.specs().values();
return buildPartitionProjectionType(
"table partition", specs, allActiveFieldIds(table.schema(), specs));
return partitionType(table.schema(), table.specs().values());
}

/**
* Builds a unified partition type from a schema and its specs, unioning every partition field
* whose source column is present in the schema.
*
* <p>This overload takes the schema and specs directly, so it can be used when no {@code Table}
* instance is available.
*
* @param schema the schema used to determine which partition fields are active
* @param specs the partition specs to unify
* @return the constructed unified partition type
*/
static StructType partitionType(Schema schema, Collection<PartitionSpec> specs) {
return buildPartitionProjectionType("table partition", specs, allActiveFieldIds(schema, specs));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/TrackingStruct.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

/** Mutable {@link StructLike} implementation of {@link Tracking}. */
class TrackingStruct extends SupportsIndexProjection implements Tracking, Serializable {
private static final Types.StructType BASE_TYPE =
static final Types.StructType BASE_TYPE =
Types.StructType.of(
Tracking.STATUS,
Tracking.SNAPSHOT_ID,
Expand Down
276 changes: 276 additions & 0 deletions core/src/main/java/org/apache/iceberg/V4ManifestReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.metrics.ScanMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

/** Reader that reads a v4 manifest file as {@link TrackedFile}s. */

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: v4 -> v4+

class V4ManifestReader extends CloseableGroup implements CloseableIterable<TrackedFile> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name V4ManifestReader would become stale when V5 rolls in. Maybe TrackedFileManifestReader?

private final InputFile file;
private final Types.StructType partitionType;
private final Schema fileProjection;
private final ScanMetrics scanMetrics;

// partition pruning state, keyed by spec ID; empty when no filtering is required
private final Map<Integer, Evaluator> partitionEvaluators;
private final Map<Integer, StructProjection> partitionProjections;

/**
 * Creates a reader over a single manifest file. Instances are assembled by the {@code Builder},
 * which derives the partition type and the per-spec pruning state before calling this.
 */
private V4ManifestReader(
    InputFile file,
    Types.StructType partitionType,
    Map<Integer, Evaluator> partitionEvaluators,
    Map<Integer, StructProjection> partitionProjections,
    Schema fileProjection,
    ScanMetrics scanMetrics) {
  // what to read and how to shape it
  this.file = file;
  this.fileProjection = fileProjection;
  this.partitionType = partitionType;

  // per-spec partition pruning state; both maps are empty when no filtering is required
  this.partitionProjections = partitionProjections;
  this.partitionEvaluators = partitionEvaluators;

  // reporting
  this.scanMetrics = scanMetrics;
}

/** Starts a builder for a reader over {@code file}, using the given table schema and specs. */
static Builder builder(
    InputFile file, Schema tableSchema, Map<Integer, PartitionSpec> specsById) {
  Builder readerBuilder = new Builder(file, tableSchema, specsById);
  return readerBuilder;
}

/** Returns every tracked file in this manifest, whatever its tracking status. */
CloseableIterable<TrackedFile> allFiles() {
  boolean onlyLive = false;
  return files(onlyLive);
}

/** Returns only the tracked files whose tracking {@link Tracking#isLive() is live}. */
CloseableIterable<TrackedFile> liveFiles() {
  boolean onlyLive = true;
  return files(onlyLive);
}

/**
 * Iterates over the live tracked files. Each element is passed through {@code TrackedFile::copy}
 * so callers receive an independent copy of every entry.
 */
@Override
public CloseableIterator<TrackedFile> iterator() {
  CloseableIterable<TrackedFile> copies =
      CloseableIterable.transform(liveFiles(), TrackedFile::copy);
  return copies.iterator();
}

/**
 * Opens the manifest and returns its prepared entries.
 *
 * <p>Entries are pruned by partition when partition evaluators are configured, and then
 * restricted to live entries when {@code onlyLive} is set.
 *
 * @param onlyLive whether to drop entries whose tracking is not live
 * @return the lazily filtered entries
 */
private CloseableIterable<TrackedFile> files(boolean onlyLive) {
  CloseableIterable<TrackedFile> result = CloseableIterable.transform(open(), this::prepare);

  boolean pruneByPartition = !partitionEvaluators.isEmpty();
  if (pruneByPartition) {
    result = CloseableIterable.filter(result, this::matchesPartition);
  }

  if (!onlyLive) {
    return result;
  }

  return CloseableIterable.filter(result, trackedFile -> trackedFile.tracking().isLive());
}

private boolean matchesPartition(TrackedFile trackedFile) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we want to drop tuple-based pruning entirely and push predicates against partition-transform-expression columns in content_stats. Under that model the partition tuple is only needed to match data files to equality-delete files.

I am planning to bring up the discussion in Monday's sync.

FileContent content = trackedFile.contentType();
if (content == FileContent.DATA_MANIFEST || content == FileContent.DELETE_MANIFEST) {
// manifest references are expanded later and are not pruned by the partition filter
return true;
}

Integer specId = trackedFile.specId();
Evaluator evaluator = specId != null ? partitionEvaluators.get(specId) : null;
StructProjection projection = specId != null ? partitionProjections.get(specId) : null;
Preconditions.checkState(
evaluator != null && projection != null,
"Cannot apply partition filter: file %s has spec ID %s, not one of the known specs %s "
+ "in manifest %s",
trackedFile.location(),
specId,
partitionEvaluators.keySet(),
file.location());

boolean matches = evaluator.eval(projection.wrap(trackedFile.partition()));
if (!matches) {
if (content == FileContent.DATA) {
scanMetrics.skippedDataFiles().increment();
} else {
scanMetrics.skippedDeleteFiles().increment();
}
}

return matches;
}

/**
 * Opens the manifest file for reading with the projected read schema.
 *
 * <p>The file format is derived from the manifest location. The returned iterable is registered
 * with this group through {@code addCloseable}.
 *
 * @return the entries of the manifest, read with container reuse enabled
 * @throws IllegalArgumentException if the format cannot be determined from the file location
 */
private CloseableIterable<TrackedFile> open() {
  String location = file.location();
  FileFormat manifestFormat = FileFormat.fromFileName(location);
  Preconditions.checkArgument(
      manifestFormat != null, "Unable to determine format of manifest: %s", location);

  CloseableIterable<TrackedFile> entries =
      InternalData.read(manifestFormat, file)
          .project(readSchema())
          // bind each struct in the read schema to its concrete implementation class
          .setRootType(TrackedFileStruct.class)
          .setCustomType(TrackedFile.TRACKING.fieldId(), TrackingStruct.class)
          .setCustomType(TrackedFile.DELETION_VECTOR.fieldId(), DeletionVectorStruct.class)
          .setCustomType(TrackedFile.MANIFEST_INFO.fieldId(), ManifestInfoStruct.class)
          .setCustomType(TrackedFile.PARTITION_ID, PartitionData.class)
          .reuseContainers()
          .build();

  addCloseable(entries);
  return entries;
}

private TrackedFile prepare(TrackedFile trackedFile) {
Tracking tracking = trackedFile.tracking();
// manifestLocation is not stored in the manifest; the reader fills it from the file location.
// manifestPos is filled from ROW_POSITION while reading the tracking struct.
if (tracking instanceof TrackingStruct) {
((TrackingStruct) tracking).setManifestLocation(file.location());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We populate Tracking.manifestPos using a metadata column ROW_POSITION, while we populate Tracking.manifestLocation manually here. For the latter, is there a reason we can't use MetadataColumns.FILE_PATH to be consistent?
We could also get rid of the custom setter TrackingStruct.setManifestLocation and let the value flow through the internalSet/getByPos methods.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They look symmetric, but the underlying mechanism differs. ROW_POSITION is synthesized by the reader itself. FILE_PATH is only populated when the caller injects it. I'd lean toward keeping the manual set for now.

}

return trackedFile;
}

/**
 * Builds the schema used to read entries from the manifest.
 *
 * <p>When no file projection is set, every field of the tracked-file type is read. Otherwise only
 * the projected top-level fields are read, plus the fields the reader itself depends on:
 *
 * <ul>
 *   <li>{@code tracking}, which is always read and carries the status used to filter live files
 *   <li>{@code spec_id} and the partition tuple, when partition pruning is active
 * </ul>
 *
 * <p>{@code content_stats} is never read, and the partition field is dropped when the partition
 * type has no fields.
 *
 * @return the schema to project when opening the manifest
 */
private Schema readSchema() {
  // content_stats is not projected yet, so build the schema with an empty stats type
  Types.StructType fullType =
      TrackedFile.schemaWithContentStats(partitionType, Types.StructType.of());
  boolean unpartitioned = partitionType.fields().isEmpty();

  // null means "no projection requested": keep every field of the full type
  Set<Integer> projectedIds = null;
  if (fileProjection != null) {
    projectedIds = Sets.newHashSet();
    for (Types.NestedField field : fileProjection.asStruct().fields()) {
      projectedIds.add(field.fieldId());
    }

    // tracking carries the status used to filter live files and is always projected
    projectedIds.add(TrackedFile.TRACKING.fieldId());

    if (!partitionEvaluators.isEmpty()) {
      // spec_id is required to resolve each entry's partition spec when pruning
      projectedIds.add(TrackedFile.SPEC_ID.fieldId());

      // the partition filter evaluates each entry's partition tuple, so the tuple must be read
      // even when the caller's projection leaves it out
      projectedIds.add(TrackedFile.PARTITION_ID);

      // NOTE(review): the partition filter also reads the entry's content type and location;
      // confirm that projections passed to this reader always include them.
    }
  }

  List<Types.NestedField> fields = Lists.newArrayList();
  for (Types.NestedField field : fullType.fields()) {
    if (projectedIds != null && !projectedIds.contains(field.fieldId())) {
      continue;
    }

    if (field.fieldId() == TrackedFile.TRACKING.fieldId()) {
      // swap in the tracking struct that also carries the row position
      fields.add(trackingWithRowPosition());
    } else if (field.fieldId() == TrackedFile.CONTENT_STATS_ID) {
      // content_stats are omitted for now
    } else if (field.fieldId() == TrackedFile.PARTITION_ID && unpartitioned) {
      // unpartitioned manifests omit the partition field
    } else {
      fields.add(field);
    }
  }

  return new Schema(fields);
}

/**
* Builds the tracking field from the read schema, which includes {@code ROW_POSITION} so the
* reader populates the manifest position of each entry.
*/
private static Types.NestedField trackingWithRowPosition() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These Tracking schemas are kind of confusing. Let me know if I get it wrong!
So Tracking.schema() is the physical representation of the data, what we expect to be written, or in other words, this is the write schema.
TrackingStruct.BASE_TYPE is the logical representation including field(s) that are physically not persisted, or in other words this is the read schema.
Naively asking, since we are on the read path, can't we simply use the read schema from TrackingStruct instead of constructing it here from the physical schema?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your understanding is correct! Tracking.schema() is the on-disk schema. TrackingStruct.BASE_TYPE is the same schema plus row_position. We can reuse it. I just needed to change the visibility of TrackingStruct.BASE_TYPE from private to package-private.

return Types.NestedField.required(
TrackedFile.TRACKING.fieldId(),
TrackedFile.TRACKING.name(),
TrackingStruct.BASE_TYPE,
TrackedFile.TRACKING.doc());
}

/**
 * Builder for {@link V4ManifestReader}.
 *
 * <p>By default the reader applies no row filter, is case sensitive, reads without a file
 * projection, and reports to no-op scan metrics.
 */
static class Builder {
  private final InputFile file;
  private final Schema tableSchema;
  private final Map<Integer, PartitionSpec> specsById;

  private Schema fileProjection = null;
  private Expression rowFilter = Expressions.alwaysTrue();
  private boolean caseSensitive = true;
  private ScanMetrics scanMetrics = ScanMetrics.noop();

  private Builder(InputFile file, Schema tableSchema, Map<Integer, PartitionSpec> specsById) {
    this.file = file;
    this.tableSchema = tableSchema;
    this.specsById = specsById;
  }

  /**
   * Sets the row filter used to skip files that cannot contain matching rows.
   *
   * @param expr a non-null filter expression
   * @return this builder
   */
  Builder filterRows(Expression expr) {
    this.rowFilter = Preconditions.checkNotNull(expr, "Row filter cannot be null");
    return this;
  }

  /** Sets whether the row filter is handled case sensitively. */
  Builder caseSensitive(boolean isCaseSensitive) {
    this.caseSensitive = isCaseSensitive;
    return this;
  }

  /** Sets the schema of tracked-file fields to read; {@code null} applies no projection. */
  Builder project(Schema newFileProjection) {
    this.fileProjection = newFileProjection;
    return this;
  }

  /**
   * Sets the metrics that skipped files are reported to.
   *
   * @param newScanMetrics non-null scan metrics
   * @return this builder
   */
  Builder scanMetrics(ScanMetrics newScanMetrics) {
    this.scanMetrics = Preconditions.checkNotNull(newScanMetrics, "Scan metrics cannot be null");
    return this;
  }

  /**
   * Creates the reader.
   *
   * <p>Partition pruning state is only built when a row filter other than {@code alwaysTrue} was
   * set and the unified partition type has at least one field. In that case every known spec
   * gets an evaluator for the inclusive projection of the row filter, together with a projection
   * from the unified partition type to that spec's own partition type.
   */
  V4ManifestReader build() {
    Types.StructType unifiedType = Partitioning.partitionType(tableSchema, specsById.values());
    Map<Integer, Evaluator> evaluators = Maps.newHashMap();
    Map<Integer, StructProjection> projections = Maps.newHashMap();

    if (rowFilter != Expressions.alwaysTrue()) {
      if (!unifiedType.fields().isEmpty()) {
        for (PartitionSpec spec : specsById.values()) {
          int specId = spec.specId();
          Expression projected = Projections.inclusive(spec, caseSensitive).project(rowFilter);

          Evaluator evaluator = new Evaluator(spec.partitionType(), projected, caseSensitive);
          evaluators.put(specId, evaluator);

          StructProjection projection =
              StructProjection.create(unifiedType, spec.partitionType());
          projections.put(specId, projection);
        }
      }
    }

    return new V4ManifestReader(
        file, unifiedType, evaluators, projections, fileProjection, scanMetrics);
  }
}
}
Loading