Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -310,4 +311,94 @@ public void testManifestLocationsInScanWithDeleteFiles() throws IOException {
.get(0);
assertThat(deletes.get(0).manifestLocation()).isEqualTo(deleteManifest.path());
}

@TestTemplate
public void testPlanFilesPositionDeletePathReferenceWithPartitionMismatch() {
assumeThat(formatVersion).as("Requires V2 position deletes").isEqualTo(2);

table.newAppend().appendFile(FILE_A).commit();

// Position delete whose metadata partition (data_bucket=1) does not match FILE_A's
// partition (data_bucket=0), but whose path reference points to FILE_A.
DeleteFile posDeleteWrongPartition =
FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/pos-delete-wrong-partition-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1")
.withRecordCount(1)
.withReferencedDataFile(FILE_A.location())
.build();

table.newRowDelta().addDeletes(posDeleteWrongPartition).commit();

// Without a partition filter: delete is associated via path reference despite partition
// mismatch.
List<T> unfiltered = Lists.newArrayList(newScan().planFiles());
assertThat(unfiltered).as("Should have one task").hasSize(1);
FileScanTask unfilteredTask = (FileScanTask) unfiltered.get(0);
assertThat(unfilteredTask.file().location()).isEqualTo(FILE_A.location());
assertThat(unfilteredTask.deletes())
.as("Delete is associated via path reference without a partition filter")
.hasSize(1);
assertThat(unfilteredTask.deletes().get(0).location())
.isEqualTo(posDeleteWrongPartition.location());

// With a partition filter targeting data_bucket=0: the delete manifest contains only
// data_bucket=1 entries and is pruned, so the path-referenced delete is lost.
ScanT filtered =
newScan().filter(Expressions.equal(Expressions.bucket("data", BUCKETS_NUMBER), 0));
List<T> filteredTasks = Lists.newArrayList(filtered.planFiles());
assertThat(filteredTasks).as("Should have one task").hasSize(1);
FileScanTask filteredTask = (FileScanTask) filteredTasks.get(0);
assertThat(filteredTask.file().location()).isEqualTo(FILE_A.location());
assertThat(filteredTask.deletes())
.as("Delete is dropped when delete manifest is pruned by partition filter")
.hasSize(0);
}

@TestTemplate
public void testPlanFilesDeletionVectorPathReferenceWithPartitionMismatch() {
assumeThat(formatVersion).as("Requires V3 deletion vectors").isGreaterThanOrEqualTo(3);

table.newAppend().appendFile(FILE_A).commit();

// DV whose metadata partition (data_bucket=1) does not match FILE_A's partition
// (data_bucket=0), but whose path reference points to FILE_A.
DeleteFile dvWrongPartition =
FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/dv-wrong-partition-" + UUID.randomUUID() + ".puffin")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1")
.withRecordCount(1)
.withReferencedDataFile(FILE_A.location())
.withContentOffset(4)
.withContentSizeInBytes(6)
.build();

table.newRowDelta().addDeletes(dvWrongPartition).commit();

// Without a partition filter: DV is associated via path reference despite partition mismatch.
List<T> unfiltered = Lists.newArrayList(newScan().planFiles());
assertThat(unfiltered).as("Should have one task").hasSize(1);
FileScanTask unfilteredTask = (FileScanTask) unfiltered.get(0);
assertThat(unfilteredTask.file().location()).isEqualTo(FILE_A.location());
assertThat(unfilteredTask.deletes())
.as("DV is associated via path reference without a partition filter")
.hasSize(1);
assertThat(unfilteredTask.deletes().get(0).location()).isEqualTo(dvWrongPartition.location());

// With a partition filter targeting data_bucket=0: the delete manifest contains only
// data_bucket=1 entries and is pruned, so the path-referenced DV is lost.
ScanT filtered =
newScan().filter(Expressions.equal(Expressions.bucket("data", BUCKETS_NUMBER), 0));
List<T> filteredTasks = Lists.newArrayList(filtered.planFiles());
assertThat(filteredTasks).as("Should have one task").hasSize(1);
FileScanTask filteredTask = (FileScanTask) filteredTasks.get(0);
assertThat(filteredTask.file().location()).isEqualTo(FILE_A.location());
assertThat(filteredTask.deletes())
.as("DV is dropped when delete manifest is pruned by partition filter")
.hasSize(0);
}
}
74 changes: 74 additions & 0 deletions core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,80 @@ public void testPartitionedTableWithPartitionPosDeletes() {
.isEqualTo(fileADeletes().location());
}

@TestTemplate
public void testPositionDeletePathReferenceOverridesPartitionMismatch() {
assumeThat(formatVersion).as("Requires V2 position deletes").isEqualTo(2);

table.newAppend().appendFile(FILE_A).commit();

// Position delete whose metadata partition (data_bucket=1) does not match FILE_A's
// partition (data_bucket=0), but whose path reference points to FILE_A.
// Should this commit be disallowed?
DeleteFile posDeleteWrongPartition =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath("/path/to/pos-delete-wrong-partition-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1")
.withRecordCount(1)
.withReferencedDataFile(FILE_A.location())
.build();

table.newRowDelta().addDeletes(posDeleteWrongPartition).commit();

List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
assertThat(tasks).as("Should have one task").hasSize(1);

FileScanTask task = (FileScanTask) tasks.get(0);
assertThat(task.file().location())
.as("Should have the correct data file path")
.isEqualTo(FILE_A.location());
assertThat(task.deletes())
.as("Path reference should override partition mismatch, the delete must be included")
.hasSize(1);
assertThat(task.deletes().get(0).location())
.as("Should have the path-scoped position delete file")
.isEqualTo(posDeleteWrongPartition.location());
}

@TestTemplate
public void testDeletionVectorPathReferenceOverridesPartitionMismatch() {
assumeThat(formatVersion).as("Requires V3 deletion vectors").isGreaterThanOrEqualTo(3);

table.newAppend().appendFile(FILE_A).commit();

// DV whose metadata partition (data_bucket=1) does not match FILE_A's partition
// (data_bucket=0), but whose path reference points to FILE_A.
// Should this commit be disallowed?
DeleteFile dvWrongPartition =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath("/path/to/dv-wrong-partition-" + UUID.randomUUID() + ".puffin")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1")
.withRecordCount(1)
.withReferencedDataFile(FILE_A.location())
.withContentOffset(4)
.withContentSizeInBytes(6)
.build();

table.newRowDelta().addDeletes(dvWrongPartition).commit();

List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
assertThat(tasks).as("Should have one task").hasSize(1);

FileScanTask task = (FileScanTask) tasks.get(0);
assertThat(task.file().location())
.as("Should have the correct data file path")
.isEqualTo(FILE_A.location());
assertThat(task.deletes())
.as("Path reference should override partition mismatch, the DV must be included")
.hasSize(1);
assertThat(task.deletes().get(0).location())
.as("Should have the deletion vector")
.isEqualTo(dvWrongPartition.location());
}

@TestTemplate
public void testPartitionedTableWithPartitionEqDeletes() {
table.newAppend().appendFile(FILE_A).commit();
Expand Down
Loading