diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index 69f82931833e..cd6604e7995d 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -42,6 +42,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; import org.apache.iceberg.puffin.Puffin; import org.apache.iceberg.puffin.PuffinCompressionCodec; import org.apache.iceberg.puffin.PuffinReader; @@ -676,15 +677,14 @@ private static void rewriteDVFile( String sourcePrefix, String targetPrefix) throws IOException { - List rewrittenBlobs = Lists.newArrayList(); - try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build()) { - // Read all blobs and rewrite them with updated referenced data file paths - for (Pair blobPair : + try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build(); + PuffinWriter writer = + Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) { + for (Pair blobPair : reader.readAll(reader.fileMetadata().blobs())) { - org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first(); + BlobMetadata blobMetadata = blobPair.first(); ByteBuffer blobData = blobPair.second(); - // Get the original properties and update the referenced data file path Map properties = Maps.newHashMap(blobMetadata.properties()); String referencedDataFile = properties.get("referenced-data-file"); if (referencedDataFile != null && referencedDataFile.startsWith(sourcePrefix)) { @@ -692,8 +692,7 @@ private static void rewriteDVFile( properties.put("referenced-data-file", newReferencedDataFile); } - // Create a new blob with updated properties - rewrittenBlobs.add( + writer.write( new Blob( blobMetadata.type(), blobMetadata.inputFields(), @@ -704,11 +703,6 @@ private static void rewriteDVFile( properties)); } } - - try (PuffinWriter writer = - Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) { - rewrittenBlobs.forEach(writer::write); - } } private static PositionDelete newPositionDeleteRecord( diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java index bedd8dd66d71..be20ab38c2f3 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java @@ -23,13 +23,31 @@ import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; import java.util.Set; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinReader; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.Pair; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(ParameterizedTestExtension.class) public class TestRewriteTablePathUtil extends TestBase { + private static final String REFERENCED_DATA_FILE = "referenced-data-file"; + private static final String CARDINALITY = "cardinality"; @Test public void testStagingPathPreservesDirectoryStructure() { @@ -281,4 +299,136 @@ public void testRewritingMultiplePositionDeleteEntriesWithinManifestFile() throw assertThat(deleteFileRewriteResult.toRewrite()).hasSize(2); } + + @TestTemplate + void rewriteDVFileRewritesReferencedDataFileInBlobMetadata() throws IOException { + assumeThat(formatVersion).as("DVs require format version 3+").isGreaterThanOrEqualTo(3); + + String sourcePrefix = temp.resolve("source").toString(); + String targetPrefix = temp.resolve("target").toString(); + String sourceDataFile = sourcePrefix + "/data/file-a.parquet"; + String externalDataFile = temp.resolve("external/data/file-b.parquet").toString(); + + PositionDeleteIndex sourceDeletes = positionDeleteIndex(1L, 4L); + PositionDeleteIndex externalDeletes = positionDeleteIndex(2L, 5L, 8L); + byte[] sourcePayload = serializedDV(sourceDeletes); + byte[] externalPayload = serializedDV(externalDeletes); + + OutputFile sourceDVFile = + Files.localOutput( + temp.resolve("source/metadata/dv-" + System.nanoTime() + ".puffin").toString()); + try (PuffinWriter writer = Puffin.write(sourceDVFile).createdBy("test").build()) { + writer.write(newDVBlob(sourceDeletes, sourcePayload, sourceDataFile)); + writer.write(newDVBlob(externalDeletes, externalPayload, externalDataFile)); + } + + List sourceBlobMetadata; + try (PuffinReader reader = Puffin.read(sourceDVFile.toInputFile()).build()) { + sourceBlobMetadata = reader.fileMetadata().blobs(); + } + assertThat(sourceBlobMetadata).hasSize(2); + + DeleteFile dvDeleteFile = + FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withFormat(FileFormat.PUFFIN) + .withPath(sourceDVFile.location()) + .withFileSizeInBytes(sourceDVFile.toInputFile().getLength()) + .withPartition(FILE_A.partition()) + .withRecordCount(sourceDeletes.cardinality()) + .withReferencedDataFile(sourceDataFile) + .withContentOffset(sourceBlobMetadata.get(0).offset()) + .withContentSizeInBytes(sourceBlobMetadata.get(0).length()) + .build(); + + OutputFile rewrittenDVFile = + Files.localOutput( + temp.resolve("target/metadata/dv-rewritten-" + System.nanoTime() + ".puffin") + .toString()); + RewriteTablePathUtil.rewritePositionDeleteFile( + dvDeleteFile, rewrittenDVFile, table.io(), table.spec(), sourcePrefix, targetPrefix, null); + + try (PuffinReader reader = Puffin.read(rewrittenDVFile.toInputFile()).build()) { + List rewrittenBlobMetadata = reader.fileMetadata().blobs(); + assertThat(rewrittenBlobMetadata).hasSize(2); + + BlobMetadata rewrittenSourceBlob = rewrittenBlobMetadata.get(0); + BlobMetadata rewrittenExternalBlob = rewrittenBlobMetadata.get(1); + assertDVBlobMetadata( + rewrittenSourceBlob, targetPrefix + "/data/file-a.parquet", sourceDeletes.cardinality()); + assertDVBlobMetadata(rewrittenExternalBlob, externalDataFile, externalDeletes.cardinality()); + + assertThat(rewrittenSourceBlob.offset()).isEqualTo(dvDeleteFile.contentOffset()); + assertThat(rewrittenSourceBlob.length()).isEqualTo(dvDeleteFile.contentSizeInBytes()); + assertThat(rewrittenExternalBlob.offset()) + .isEqualTo(rewrittenSourceBlob.offset() + rewrittenSourceBlob.length()); + + List> blobs = + ImmutableList.copyOf(reader.readAll(rewrittenBlobMetadata)); + assertThat(ByteBuffers.toByteArray(blobs.get(0).second())).isEqualTo(sourcePayload); + assertThat(ByteBuffers.toByteArray(blobs.get(1).second())).isEqualTo(externalPayload); + } + + DeleteFile rewrittenDVDeleteFile = + FileMetadata.deleteFileBuilder(table.spec()) + .copy(dvDeleteFile) + .withPath(rewrittenDVFile.location()) + .withFileSizeInBytes(rewrittenDVFile.toInputFile().getLength()) + .withReferencedDataFile(targetPrefix + "/data/file-a.parquet") + .build(); + assertDeletedPositions(DVUtil.readDV(rewrittenDVDeleteFile, table.io()), 1L, 4L); + } + + private static Blob newDVBlob( + PositionDeleteIndex deletes, byte[] payload, String referencedDataFile) { + return new Blob( + StandardBlobTypes.DV_V1, + ImmutableList.of(MetadataColumns.ROW_POSITION.fieldId()), + -1L, + -1L, + ByteBuffer.wrap(payload), + null, + ImmutableMap.of( + REFERENCED_DATA_FILE, + referencedDataFile, + CARDINALITY, + String.valueOf(deletes.cardinality()))); + } + + private static PositionDeleteIndex positionDeleteIndex(long... positions) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (long position : positions) { + builder.add(position); + } + + return Deletes.toPositionIndex(CloseableIterable.withNoopClose(builder.build())); + } + + private static byte[] serializedDV(PositionDeleteIndex deletes) { + return ByteBuffers.toByteArray(deletes.serialize()); + } + + private static void assertDVBlobMetadata( + BlobMetadata blobMetadata, String referencedDataFile, long cardinality) { + assertThat(blobMetadata.type()).isEqualTo(StandardBlobTypes.DV_V1); + assertThat(blobMetadata.inputFields()).containsExactly(MetadataColumns.ROW_POSITION.fieldId()); + assertThat(blobMetadata.snapshotId()).isEqualTo(-1L); + assertThat(blobMetadata.sequenceNumber()).isEqualTo(-1L); + assertThat(blobMetadata.compressionCodec()).isNull(); + assertThat(blobMetadata.offset()).isPositive(); + assertThat(blobMetadata.length()).isPositive(); + assertThat(blobMetadata.properties()) + .containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + REFERENCED_DATA_FILE, + referencedDataFile, + CARDINALITY, + String.valueOf(cardinality))); + } + + private static void assertDeletedPositions(PositionDeleteIndex deletes, long... positions) { + for (long position : positions) { + assertThat(deletes.isDeleted(position)).isTrue(); + } + } }