Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class DeleteFileIndex {
private final PartitionMap<PositionDeletes> posDeletesByPartition;
private final Map<String, PositionDeletes> posDeletesByPath;
private final Map<String, DeleteFile> dvByPath;
private final LoadingCache<Integer, Comparator<StructLike>> partitionComparatorsBySpecId;
private final boolean hasEqDeletes;
private final boolean hasPosDeletes;
private final boolean isEmpty;
Expand All @@ -84,12 +85,18 @@ private DeleteFileIndex(
PartitionMap<EqualityDeletes> eqDeletesByPartition,
PartitionMap<PositionDeletes> posDeletesByPartition,
Map<String, PositionDeletes> posDeletesByPath,
Map<String, DeleteFile> dvByPath) {
Map<String, DeleteFile> dvByPath,
Map<Integer, PartitionSpec> specsById) {
this.globalDeletes = globalDeletes;
this.eqDeletesByPartition = eqDeletesByPartition;
this.posDeletesByPartition = posDeletesByPartition;
this.posDeletesByPath = posDeletesByPath;
this.dvByPath = dvByPath;
this.partitionComparatorsBySpecId =
specsById == null
? null
: Caffeine.newBuilder()
.build(specId -> Comparators.forType(specsById.get(specId).partitionType()));
this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
this.hasPosDeletes =
posDeletesByPartition != null || posDeletesByPath != null || dvByPath != null;
Expand Down Expand Up @@ -196,7 +203,15 @@ private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) {
}

PositionDeletes deletes = posDeletesByPath.get(dataFile.location());
return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
if (deletes == null) {
return EMPTY_DELETES;
}

DeleteFile[] matchingDeletes = deletes.filter(seq);
for (DeleteFile deleteFile : matchingDeletes) {
validatePartitionMatch(deleteFile, dataFile);
}
return matchingDeletes;
}

private DeleteFile findDV(long seq, DataFile dataFile) {
Expand All @@ -211,10 +226,34 @@ private DeleteFile findDV(long seq, DataFile dataFile) {
"DV data sequence number (%s) must be greater than or equal to data file sequence number (%s)",
dv.dataSequenceNumber(),
seq);
validatePartitionMatch(dv, dataFile);
}
return dv;
}

private void validatePartitionMatch(DeleteFile deleteFile, DataFile dataFile) {
ValidationException.check(
deleteFile.specId() == dataFile.specId(),
"Mismatched partition specs (%s, %s) for delete file %s and data file %s:"
+ " metadata is corrupted",
deleteFile.specId(),
dataFile.specId(),
deleteFile.location(),
dataFile.location());
if (partitionComparatorsBySpecId != null) {
Comparator<StructLike> partitionComparator =
partitionComparatorsBySpecId.get(deleteFile.specId());
ValidationException.check(
partitionComparator.compare(deleteFile.partition(), dataFile.partition()) == 0,
"Mismatched partition tuples (%s, %s) for delete file %s and data file %s:"
+ " metadata is corrupted",
deleteFile.partition(),
dataFile.partition(),
deleteFile.location(),
dataFile.location());
}
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean canContainEqDeletesForFile(
DataFile dataFile, EqualityDeleteFile deleteFile) {
Expand Down Expand Up @@ -522,7 +561,8 @@ DeleteFileIndex build() {
eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
posDeletesByPath.isEmpty() ? null : posDeletesByPath,
dvByPath.isEmpty() ? null : dvByPath);
dvByPath.isEmpty() ? null : dvByPath,
specsById.isEmpty() ? null : specsById);
}

private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
Expand Down
61 changes: 61 additions & 0 deletions core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -310,4 +311,64 @@ public void testManifestLocationsInScanWithDeleteFiles() throws IOException {
.get(0);
assertThat(deletes.get(0).manifestLocation()).isEqualTo(deleteManifest.path());
}

@TestTemplate
public void testPlanFilesPositionDeletePathReferenceWithPartitionMismatchFails() {
assumeThat(formatVersion).as("Requires V2 position deletes").isEqualTo(2);

table.newAppend().appendFile(FILE_A).commit();

// Position delete whose metadata partition (data_bucket=1) does not match FILE_A's
// partition (data_bucket=0), but whose path reference points to FILE_A.
DeleteFile posDeleteWrongPartition =
FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/pos-delete-wrong-partition-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1")
.withRecordCount(1)
.withReferencedDataFile(FILE_A.location())
.build();

table.newRowDelta().addDeletes(posDeleteWrongPartition).commit();

assertThatThrownBy(() -> Lists.newArrayList(newScan().planFiles()))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Mismatched partition tuples (PartitionData{data_bucket=1},"
+ " PartitionData{data_bucket=0})")
.hasMessageContaining(FILE_A.location())
.hasMessageContaining("metadata is corrupted");
}

@TestTemplate
public void testPlanFilesDeletionVectorPathReferenceWithPartitionMismatchFails() {
assumeThat(formatVersion).as("Requires V3 deletion vectors").isGreaterThanOrEqualTo(3);

table.newAppend().appendFile(FILE_A).commit();

// DV whose metadata partition (data_bucket=1) does not match FILE_A's partition
// (data_bucket=0), but whose path reference points to FILE_A.
DeleteFile dvWrongPartition =
FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/dv-wrong-partition-" + UUID.randomUUID() + ".puffin")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1")
.withRecordCount(1)
.withReferencedDataFile(FILE_A.location())
.withContentOffset(4)
.withContentSizeInBytes(6)
.build();

table.newRowDelta().addDeletes(dvWrongPartition).commit();

assertThatThrownBy(() -> Lists.newArrayList(newScan().planFiles()))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Mismatched partition tuples (PartitionData{data_bucket=1},"
+ " PartitionData{data_bucket=0})")
.hasMessageContaining(FILE_A.location())
.hasMessageContaining("metadata is corrupted");
}
}