From ab3fe64be39f640df84e1839e19eb95ee10cf049 Mon Sep 17 00:00:00 2001 From: JandyTenedora Date: Mon, 22 Jun 2026 17:54:24 +0100 Subject: [PATCH] Core: Stream DV Puffin rewrite in RewriteTablePathUtil rewriteDVFile collected all rewritten blobs into an in-memory list before writing them out, creating unnecessary memory pressure for large DV files. Open the PuffinReader and PuffinWriter together and stream each blob directly to the output as it is read, keeping memory bounded to a single blob instead of the full file contents. --- .../apache/iceberg/RewriteTablePathUtil.java | 20 +-- .../iceberg/TestRewriteTablePathUtil.java | 150 ++++++++++++++++++ 2 files changed, 157 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index 69f82931833e..cd6604e7995d 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -42,6 +42,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; import org.apache.iceberg.puffin.Puffin; import org.apache.iceberg.puffin.PuffinCompressionCodec; import org.apache.iceberg.puffin.PuffinReader; @@ -676,15 +677,14 @@ private static void rewriteDVFile( String sourcePrefix, String targetPrefix) throws IOException { - List rewrittenBlobs = Lists.newArrayList(); - try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build()) { - // Read all blobs and rewrite them with updated referenced data file paths - for (Pair blobPair : + try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build(); + PuffinWriter writer = + Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) { + for (Pair blobPair : reader.readAll(reader.fileMetadata().blobs())) { - org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first(); + BlobMetadata blobMetadata = blobPair.first(); ByteBuffer blobData = blobPair.second(); - // Get the original properties and update the referenced data file path Map properties = Maps.newHashMap(blobMetadata.properties()); String referencedDataFile = properties.get("referenced-data-file"); if (referencedDataFile != null && referencedDataFile.startsWith(sourcePrefix)) { @@ -692,8 +692,7 @@ private static void rewriteDVFile( properties.put("referenced-data-file", newReferencedDataFile); } - // Create a new blob with updated properties - rewrittenBlobs.add( + writer.write( new Blob( blobMetadata.type(), blobMetadata.inputFields(), @@ -704,11 +703,6 @@ private static void rewriteDVFile( properties)); } } - - try (PuffinWriter writer = - Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) { - rewrittenBlobs.forEach(writer::write); - } } private static PositionDelete newPositionDeleteRecord( diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java index bedd8dd66d71..be20ab38c2f3 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java @@ -23,13 +23,31 @@ import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; import java.util.Set; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinReader; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.puffin.StandardBlobTypes; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.Pair; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(ParameterizedTestExtension.class) public class TestRewriteTablePathUtil extends TestBase { + private static final String REFERENCED_DATA_FILE = "referenced-data-file"; + private static final String CARDINALITY = "cardinality"; @Test public void testStagingPathPreservesDirectoryStructure() { @@ -281,4 +299,136 @@ public void testRewritingMultiplePositionDeleteEntriesWithinManifestFile() throw assertThat(deleteFileRewriteResult.toRewrite()).hasSize(2); } + + @TestTemplate + void rewriteDVFileRewritesReferencedDataFileInBlobMetadata() throws IOException { + assumeThat(formatVersion).as("DVs require format version 3+").isGreaterThanOrEqualTo(3); + + String sourcePrefix = temp.resolve("source").toString(); + String targetPrefix = temp.resolve("target").toString(); + String sourceDataFile = sourcePrefix + "/data/file-a.parquet"; + String externalDataFile = temp.resolve("external/data/file-b.parquet").toString(); + + PositionDeleteIndex sourceDeletes = positionDeleteIndex(1L, 4L); + PositionDeleteIndex externalDeletes = positionDeleteIndex(2L, 5L, 8L); + byte[] sourcePayload = serializedDV(sourceDeletes); + byte[] externalPayload = serializedDV(externalDeletes); + + OutputFile sourceDVFile = + Files.localOutput( + temp.resolve("source/metadata/dv-" + System.nanoTime() + ".puffin").toString()); + try (PuffinWriter writer = Puffin.write(sourceDVFile).createdBy("test").build()) { + writer.write(newDVBlob(sourceDeletes, sourcePayload, sourceDataFile)); + writer.write(newDVBlob(externalDeletes, externalPayload, externalDataFile)); + } + + List sourceBlobMetadata; + try (PuffinReader reader = Puffin.read(sourceDVFile.toInputFile()).build()) { + sourceBlobMetadata = reader.fileMetadata().blobs(); + } + assertThat(sourceBlobMetadata).hasSize(2); + + DeleteFile dvDeleteFile = + FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withFormat(FileFormat.PUFFIN) + .withPath(sourceDVFile.location()) + .withFileSizeInBytes(sourceDVFile.toInputFile().getLength()) + .withPartition(FILE_A.partition()) + .withRecordCount(sourceDeletes.cardinality()) + .withReferencedDataFile(sourceDataFile) + .withContentOffset(sourceBlobMetadata.get(0).offset()) + .withContentSizeInBytes(sourceBlobMetadata.get(0).length()) + .build(); + + OutputFile rewrittenDVFile = + Files.localOutput( + temp.resolve("target/metadata/dv-rewritten-" + System.nanoTime() + ".puffin") + .toString()); + RewriteTablePathUtil.rewritePositionDeleteFile( + dvDeleteFile, rewrittenDVFile, table.io(), table.spec(), sourcePrefix, targetPrefix, null); + + try (PuffinReader reader = Puffin.read(rewrittenDVFile.toInputFile()).build()) { + List rewrittenBlobMetadata = reader.fileMetadata().blobs(); + assertThat(rewrittenBlobMetadata).hasSize(2); + + BlobMetadata rewrittenSourceBlob = rewrittenBlobMetadata.get(0); + BlobMetadata rewrittenExternalBlob = rewrittenBlobMetadata.get(1); + assertDVBlobMetadata( + rewrittenSourceBlob, targetPrefix + "/data/file-a.parquet", sourceDeletes.cardinality()); + assertDVBlobMetadata(rewrittenExternalBlob, externalDataFile, externalDeletes.cardinality()); + + assertThat(rewrittenSourceBlob.offset()).isEqualTo(dvDeleteFile.contentOffset()); + assertThat(rewrittenSourceBlob.length()).isEqualTo(dvDeleteFile.contentSizeInBytes()); + assertThat(rewrittenExternalBlob.offset()) + .isEqualTo(rewrittenSourceBlob.offset() + rewrittenSourceBlob.length()); + + List> blobs = + ImmutableList.copyOf(reader.readAll(rewrittenBlobMetadata)); + assertThat(ByteBuffers.toByteArray(blobs.get(0).second())).isEqualTo(sourcePayload); + assertThat(ByteBuffers.toByteArray(blobs.get(1).second())).isEqualTo(externalPayload); + } + + DeleteFile rewrittenDVDeleteFile = + FileMetadata.deleteFileBuilder(table.spec()) + .copy(dvDeleteFile) + .withPath(rewrittenDVFile.location()) + .withFileSizeInBytes(rewrittenDVFile.toInputFile().getLength()) + .withReferencedDataFile(targetPrefix + "/data/file-a.parquet") + .build(); + assertDeletedPositions(DVUtil.readDV(rewrittenDVDeleteFile, table.io()), 1L, 4L); + } + + private static Blob newDVBlob( + PositionDeleteIndex deletes, byte[] payload, String referencedDataFile) { + return new Blob( + StandardBlobTypes.DV_V1, + ImmutableList.of(MetadataColumns.ROW_POSITION.fieldId()), + -1L, + -1L, + ByteBuffer.wrap(payload), + null, + ImmutableMap.of( + REFERENCED_DATA_FILE, + referencedDataFile, + CARDINALITY, + String.valueOf(deletes.cardinality()))); + } + + private static PositionDeleteIndex positionDeleteIndex(long... positions) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (long position : positions) { + builder.add(position); + } + + return Deletes.toPositionIndex(CloseableIterable.withNoopClose(builder.build())); + } + + private static byte[] serializedDV(PositionDeleteIndex deletes) { + return ByteBuffers.toByteArray(deletes.serialize()); + } + + private static void assertDVBlobMetadata( + BlobMetadata blobMetadata, String referencedDataFile, long cardinality) { + assertThat(blobMetadata.type()).isEqualTo(StandardBlobTypes.DV_V1); + assertThat(blobMetadata.inputFields()).containsExactly(MetadataColumns.ROW_POSITION.fieldId()); + assertThat(blobMetadata.snapshotId()).isEqualTo(-1L); + assertThat(blobMetadata.sequenceNumber()).isEqualTo(-1L); + assertThat(blobMetadata.compressionCodec()).isNull(); + assertThat(blobMetadata.offset()).isPositive(); + assertThat(blobMetadata.length()).isPositive(); + assertThat(blobMetadata.properties()) + .containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + REFERENCED_DATA_FILE, + referencedDataFile, + CARDINALITY, + String.valueOf(cardinality))); + } + + private static void assertDeletedPositions(PositionDeleteIndex deletes, long... positions) { + for (long position : positions) { + assertThat(deletes.isDeleted(position)).isTrue(); + } + } }