From d8e5425f48adce95fce61b75d202b85a97cd67b8 Mon Sep 17 00:00:00 2001 From: CI Builder Date: Mon, 1 Dec 2025 10:15:57 -0800 Subject: [PATCH 01/10] Adding replication as an output file argument --- .../java/org/apache/iceberg/io/FileIO.java | 12 ++ .../apache/iceberg/hadoop/HadoopFileIO.java | 5 + .../iceberg/hadoop/HadoopOutputFile.java | 53 ++++- .../apache/iceberg/io/OutputFileFactory.java | 36 +++- .../TestHadoopOutputFileReplication.java | 158 +++++++++++++++ .../io/TestOutputFileFactoryReplication.java | 183 ++++++++++++++++++ .../source/SparkPositionDeletesRewrite.java | 17 +- .../spark/source/SparkPositionDeltaWrite.java | 20 +- .../source/SparkPositionDeletesRewrite.java | 11 +- .../spark/source/SparkPositionDeltaWrite.java | 10 +- .../source/IcebergSourceDeleteBenchmark.java | 8 +- .../apache/iceberg/spark/SparkWriteConf.java | 9 + .../iceberg/spark/SparkWriteOptions.java | 3 + .../source/SparkPositionDeletesRewrite.java | 11 +- .../spark/source/SparkPositionDeltaWrite.java | 11 +- .../iceberg/spark/source/SparkWrite.java | 2 + .../iceberg/spark/TestSparkWriteConf.java | 42 ++++ 17 files changed, 569 insertions(+), 22 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/hadoop/TestHadoopOutputFileReplication.java create mode 100644 core/src/test/java/org/apache/iceberg/io/TestOutputFileFactoryReplication.java diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index a521cbf79d..8640ed4d82 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -73,6 +73,18 @@ default InputFile newInputFile(ManifestFile manifest) { /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ OutputFile newOutputFile(String path); + /** + * Get a {@link OutputFile} instance to write bytes to the file at the given path with specific + * properties. 
+ * + * @param path the file path + * @param properties the properties for the output file + * @return an OutputFile for writing data + */ + default OutputFile newOutputFile(String path, Map properties) { + return newOutputFile(path); + } + /** Delete the file at the given path. */ void deleteFile(String path); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 7aaa2b6a75..61e45228a6 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -97,6 +97,11 @@ public OutputFile newOutputFile(String path) { return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get()); } + @Override + public OutputFile newOutputFile(String path, Map properties) { + return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get(), properties); + } + @Override public void deleteFile(String path) { Path toDelete = new Path(path); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java index 9453ee5720..7584668baf 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java @@ -19,7 +19,9 @@ package org.apache.iceberg.hadoop; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,6 +31,7 @@ import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PositionOutputStream; /** {@link OutputFile} implementation using the Hadoop {@link 
FileSystem} API. */ @@ -37,6 +40,7 @@ public class HadoopOutputFile implements OutputFile, NativelyEncryptedFile { private final FileSystem fs; private final Path path; private final Configuration conf; + private final short replication; private NativeFileCryptoParameters nativeEncryptionParameters; public static OutputFile fromLocation(CharSequence location, Configuration conf) { @@ -54,24 +58,53 @@ public static OutputFile fromPath(Path path, Configuration conf) { return fromPath(path, fs, conf); } + public static OutputFile fromPath(Path path, Configuration conf, Map properties) { + short replicationFactor = -1; + if (properties != null) { + String replicationFactorAsString = properties.get(OutputFileFactory.FILE_REPLICATION_FACTOR); + if (replicationFactorAsString != null) { + replicationFactor = Short.parseShort(replicationFactorAsString); + } + } + return fromPath(path, conf, replicationFactor); + } + + public static OutputFile fromPath(Path path, Configuration conf, short replication) { + FileSystem fs = Util.getFs(path, conf); + return new HadoopOutputFile(fs, path, conf, replication); + } + public static OutputFile fromPath(Path path, FileSystem fs) { return fromPath(path, fs, fs.getConf()); } public static OutputFile fromPath(Path path, FileSystem fs, Configuration conf) { - return new HadoopOutputFile(fs, path, conf); + return new HadoopOutputFile(fs, path, conf, (short) -1); } - private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) { + private HadoopOutputFile(FileSystem fs, Path path, Configuration conf, short replication) { this.fs = fs; this.path = path; this.conf = conf; + this.replication = replication; } @Override public PositionOutputStream create() { try { - return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */)); + if (replication > 0) { + return HadoopStreams.wrap( + fs.create( + path, + false /* overwrite */, + conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + 
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT), + replication, + fs.getDefaultBlockSize(path))); + } else { + return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */)); + } } catch (FileAlreadyExistsException e) { throw new AlreadyExistsException(e, "Path already exists: %s", path); } catch (IOException e) { @@ -82,7 +115,19 @@ public PositionOutputStream create() { @Override public PositionOutputStream createOrOverwrite() { try { - return HadoopStreams.wrap(fs.create(path, true /* createOrOverwrite */)); + if (replication > 0) { + return HadoopStreams.wrap( + fs.create( + path, + true /* overwrite */, + conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT), + replication, + fs.getDefaultBlockSize(path))); + } else { + return HadoopStreams.wrap(fs.create(path, true /* createOrOverwrite */)); + } } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create file: %s", path); } diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index c46ca132ed..02ff17d64b 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -21,6 +21,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -30,9 +32,11 @@ import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** Factory responsible for generating unique but recognizable data/delete file names. 
*/ public class OutputFileFactory { + public static final String FILE_REPLICATION_FACTOR = "file-replication-factor"; private final PartitionSpec defaultSpec; private final FileFormat format; private final LocationProvider locations; @@ -48,6 +52,7 @@ public class OutputFileFactory { private final String operationId; private final AtomicInteger fileCount = new AtomicInteger(0); private final String suffix; + private final Optional replicationFactorOptional; /** * Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be @@ -63,6 +68,7 @@ public class OutputFileFactory { * @param taskId Second part of the file name * @param operationId Third part of the file name * @param suffix Suffix part of the file name + * @param replicationFactorOptional the replication factor of output file */ private OutputFileFactory( PartitionSpec spec, @@ -73,7 +79,8 @@ private OutputFileFactory( int partitionId, long taskId, String operationId, - String suffix) { + String suffix, + Optional replicationFactorOptional) { this.defaultSpec = spec; this.format = format; this.locations = locations; @@ -83,6 +90,7 @@ private OutputFileFactory( this.taskId = taskId; this.operationId = operationId; this.suffix = suffix; + this.replicationFactorOptional = replicationFactorOptional; } public static Builder builderFor(Table table, int partitionId, long taskId) { @@ -102,10 +110,21 @@ private String generateFilename() { /** Generates an {@link EncryptedOutputFile} for unpartitioned writes. 
*/ public EncryptedOutputFile newOutputFile() { - OutputFile file = ioSupplier.get().newOutputFile(locations.newDataLocation(generateFilename())); + OutputFile file = + ioSupplier + .get() + .newOutputFile(locations.newDataLocation(generateFilename()), getProperties()); return encryptionManager.encrypt(file); } + private Map getProperties() { + Map properties = Maps.newHashMap(); + if (replicationFactorOptional.isPresent()) { + properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replicationFactorOptional.get())); + } + return properties; + } + /** Generates an {@link EncryptedOutputFile} for partitioned writes in the default spec. */ public EncryptedOutputFile newOutputFile(StructLike partition) { return newOutputFile(defaultSpec, partition); @@ -114,7 +133,7 @@ public EncryptedOutputFile newOutputFile(StructLike partition) { /** Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec. */ public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) { String newDataLocation = locations.newDataLocation(spec, partition, generateFilename()); - OutputFile rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation); + OutputFile rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation, getProperties()); return encryptionManager.encrypt(rawOutputFile); } @@ -127,6 +146,7 @@ public static class Builder { private FileFormat format; private String suffix; private Supplier ioSupplier; + private Optional replicationFactorOptional = Optional.empty(); private Builder(Table table, int partitionId, long taskId) { this.table = table; @@ -151,6 +171,13 @@ public Builder operationId(String newOperationId) { return this; } + public Builder replicationFactor(short replicationFactor) { + if (replicationFactor > 0) { + this.replicationFactorOptional = Optional.of(replicationFactor); + } + return this; + } + public Builder format(FileFormat newFormat) { this.format = newFormat; return this; @@ -185,7 +212,8 @@ public OutputFileFactory 
build() { partitionId, taskId, operationId, - suffix); + suffix, + replicationFactorOptional); } } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopOutputFileReplication.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopOutputFileReplication.java new file mode 100644 index 0000000000..835d1d6cea --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopOutputFileReplication.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hadoop; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestHadoopOutputFileReplication { + + @TempDir private File tempDir; + private Configuration conf; + private FileSystem fs; + + @BeforeEach + public void before() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + } + + @Test + public void testOutputFileWithDefaultReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-default-replication.txt"); + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, fs, conf); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + assertThat(fs.getFileStatus(testPath).getLen()).isGreaterThan(0); + } + + @Test + public void testOutputFileWithCustomReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-custom-replication.txt"); + short replicationFactor = 2; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data with custom replication".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + assertThat(fs.getFileStatus(testPath).getLen()).isGreaterThan(0); + // Note: Local filesystem doesn't support replication, but the API should work + } + + @Test + public void testOutputFileWithZeroReplication() throws IOException 
{ + Path testPath = new Path(tempDir.toURI().toString(), "test-zero-replication.txt"); + short replicationFactor = 0; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data with zero replication".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + } + + @Test + public void testOutputFileWithNegativeReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-negative-replication.txt"); + short replicationFactor = -1; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data with negative replication".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + } + + @Test + public void testCreateOrOverwriteWithCustomReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-overwrite-replication.txt"); + short replicationFactor = 3; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + // Create initial file + try (PositionOutputStream stream = outputFile.createOrOverwrite()) { + stream.write("initial data".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + long firstSize = fs.getFileStatus(testPath).getLen(); + + // Overwrite the file + try (PositionOutputStream stream = outputFile.createOrOverwrite()) { + stream.write("overwritten data with more content".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + long secondSize = fs.getFileStatus(testPath).getLen(); + assertThat(secondSize).isGreaterThan(firstSize); + } + + @Test + public void testFileIONewOutputFileWithReplication() throws IOException { + HadoopFileIO fileIO = new HadoopFileIO(conf); + String location = + new Path(tempDir.toURI().toString(), "test-fileio-replication.txt").toString(); + short replicationFactor = 2; + 
+ + OutputFile outputFile = + fileIO.newOutputFile( + location, ImmutableMap.of("file-replication-factor", String.valueOf(replicationFactor))); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data from FileIO".getBytes()); + } + + assertThat(fileIO.newInputFile(location).exists()).isTrue(); + } + + @Test + public void testFileIONewOutputFileWithoutReplication() throws IOException { + HadoopFileIO fileIO = new HadoopFileIO(conf); + String location = + new Path(tempDir.toURI().toString(), "test-fileio-no-replication.txt").toString(); + + // Use default method without replication + OutputFile outputFile = fileIO.newOutputFile(location); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data without explicit replication".getBytes()); + } + + assertThat(fileIO.newInputFile(location).exists()).isTrue(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactoryReplication.java b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactoryReplication.java new file mode 100644 index 0000000000..e570c67025 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactoryReplication.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Tests for OutputFileFactory's replication factor support. + * + *

Note: These tests verify that the builder pattern accepts replication factor parameters and + * constructs factories correctly. The actual file creation with replication is tested at the + * HadoopOutputFile level. + */ +public class TestOutputFileFactoryReplication { + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get())); + + @TempDir private File tempDir; + + @Test + public void testBuilderWithReplicationFactor() { + Configuration conf = new Configuration(); + HadoopFileIO fileIO = new HadoopFileIO(conf); + PartitionSpec spec = PartitionSpec.unpartitioned(); + + short replicationFactor = 3; + int partitionId = 1; + long taskId = 100L; + + // Verify that the builder accepts replication factor parameter + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + OutputFileFactory.Builder builder = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor(replicationFactor); + + assertThat(builder).isNotNull(); + + OutputFileFactory factory = builder.build(); + assertThat(factory).isNotNull(); + } + + @Test + public void testBuilderWithoutReplicationFactor() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + int partitionId = 1; + long taskId = 100L; + + // Verify that the builder works without replication factor (default) + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table2"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + OutputFileFactory.Builder builder = + OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET); + + 
assertThat(builder).isNotNull(); + + OutputFileFactory factory = builder.build(); + assertThat(factory).isNotNull(); + } + + @Test + public void testBuilderWithDifferentReplicationFactors() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + int partitionId = 1; + long taskId = 100L; + + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table3"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + // Test with replication factor 1 + OutputFileFactory factory1 = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor((short) 1) + .build(); + assertThat(factory1).isNotNull(); + + // Test with replication factor 5 + OutputFileFactory factory2 = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor((short) 5) + .build(); + assertThat(factory2).isNotNull(); + + // Test with maximum replication factor + OutputFileFactory factory3 = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor(Short.MAX_VALUE) + .build(); + assertThat(factory3).isNotNull(); + } + + @Test + public void testBuilderWithReplicationFactorAndSuffix() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + short replicationFactor = 3; + int partitionId = 1; + long taskId = 100L; + + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table4"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + // Verify that replication factor works with suffix option + OutputFileFactory factory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + 
.replicationFactor(replicationFactor) + .suffix("deletes") + .build(); + + assertThat(factory).isNotNull(); + } + + @Test + public void testBuilderWithAllOptions() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + short replicationFactor = 2; + int partitionId = 1; + long taskId = 100L; + + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table5"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + // Test builder with all options including replication factor + OutputFileFactory factory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.AVRO) + .replicationFactor(replicationFactor) + .operationId("test-operation-id") + .suffix("test-suffix") + .build(); + + assertThat(factory).isNotNull(); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 0aebb6bdb2..0efb1fe937 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -74,6 +74,8 @@ public class SparkPositionDeletesRewrite implements Write { private final String fileSetId; private final int specId; private final StructLike partition; + private final Map writeProperties; + private final short replicationFactor; /** * Constructs a {@link SparkPositionDeletesRewrite}. 
@@ -106,6 +108,8 @@ public class SparkPositionDeletesRewrite implements Write { this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; + this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -129,7 +133,9 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { writeSchema, dsSchema, specId, - partition); + partition, + writeProperties, + replicationFactor); } @Override @@ -174,6 +180,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final StructType dsSchema; private final int specId; private final StructLike partition; + private final Map writeProperties; + private final short replicationFactor; PositionDeletesWriterFactory( Broadcast tableBroadcast, @@ -183,7 +191,9 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { Schema writeSchema, StructType dsSchema, int specId, - StructLike partition) { + StructLike partition, + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -192,6 +202,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.dsSchema = dsSchema; this.specId = specId; this.partition = partition; + this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -203,6 +215,7 @@ public DataWriter createWriter(int partitionId, long taskId) { .format(format) .operationId(queryId) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); Schema positionDeleteRowSchema = positionDeleteRowSchema(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 577e94a823..7e2385f76c 100644 --- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -102,6 +102,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final Map extraSnapshotMetadata; private final Distribution requiredDistribution; private final SortOrder[] requiredOrdering; + private final Map writeProperties; + private final short replicationFactor; private boolean cleanupOnAbort = true; @@ -130,6 +132,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.requiredDistribution = requiredDistribution; this.requiredOrdering = requiredOrdering; + this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -154,7 +158,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context); + return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties, replicationFactor); } @Override @@ -331,11 +335,20 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Broadcast
tableBroadcast; private final Command command; private final Context context; - - PositionDeltaWriteFactory(Broadcast
tableBroadcast, Command command, Context context) { + private final Map writeProperties; + private final short replicationFactor; + + PositionDeltaWriteFactory( + Broadcast
tableBroadcast, + Command command, + Context context, + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; + this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -352,6 +365,7 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .format(context.deleteFileFormat()) .operationId(context.queryId()) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); SparkFileWriterFactory writerFactory = diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 08c06e85b5..d25e254faa 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -78,6 +78,7 @@ public class SparkPositionDeletesRewrite implements Write { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -112,6 +113,7 @@ public class SparkPositionDeletesRewrite implements Write { this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -137,7 +139,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { dsSchema, specId, partition, - writeProperties); + writeProperties, + replicationFactor); } @Override @@ -189,6 +192,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; PositionDeletesWriterFactory( Broadcast
tableBroadcast, @@ -200,7 +204,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { StructType dsSchema, int specId, StructLike partition, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -211,6 +216,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.specId = specId; this.partition = partition; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -222,6 +228,7 @@ public DataWriter createWriter(int partitionId, long taskId) { .format(format) .operationId(queryId) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); Schema positionDeleteRowSchema = positionDeleteRowSchema(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index e0cd06de17..67d749780b 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -105,6 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final SparkWriteRequirements writeRequirements; private final Context context; private final Map writeProperties; + private final short replicationFactor; private boolean cleanupOnAbort = true; @@ -131,6 +132,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -160,7 +162,7 @@ public DeltaWriterFactory 
createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties); + return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties, replicationFactor); } @Override @@ -341,16 +343,19 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Command command; private final Context context; private final Map writeProperties; + private final short replicationFactor; PositionDeltaWriteFactory( Broadcast
tableBroadcast, Command command, Context context, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -367,6 +372,7 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .format(context.deleteFileFormat()) .operationId(context.queryId()) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); SparkFileWriterFactory writerFactory = diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java index e42707bf10..87ce1c883b 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -63,9 +63,11 @@ public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark { private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class); private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; + private static final short DELETE_FILE_REPLICATION = (short) 3; protected static final int NUM_FILES = 1; protected static final int NUM_ROWS = 10 * 1000 * 1000; + private static final short DEFAULT_REPLICATION_FACTOR = 3; @Setup public void setupBenchmark() throws IOException { @@ -320,7 +322,11 @@ private void writeEqDeletes(List rows) throws IOException { } private OutputFileFactory newFileFactory() { - return OutputFileFactory.builderFor(table(), 1, 1).format(fileFormat()).build(); + // OutputFileFactory is used only for creating delete files. 
+ return OutputFileFactory.builderFor(table(), 1, 1) + .replicationFactor(DEFAULT_REPLICATION_FACTOR) + .format(fileFormat()) + .build(); } private CharSequence noisePath(CharSequence path) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 07393a67fe..e5b04525c1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -132,6 +132,15 @@ public boolean wapEnabled() { .parse(); } + public short fileReplication() { + return (short) + confParser + .intConf() + .option(SparkWriteOptions.DELETE_FILE_REPLICATION) + .defaultValue(SparkWriteOptions.DEFAULT_DELETE_FILE_REPLICATION) + .parse(); + } + public String wapId() { return sessionConf.get(SparkSQLProperties.WAP_ID, null); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index d9c4f66b19..98147b7bfd 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -21,6 +21,9 @@ /** Spark DF write options */ public class SparkWriteOptions { + public static final String DELETE_FILE_REPLICATION = "delete-file-replication"; + public static final short DEFAULT_DELETE_FILE_REPLICATION = 3; + private SparkWriteOptions() {} // Fileformat for write operations(default: Table write.format.default ) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index d917794758..09e43da695 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -78,6 +78,7 @@ public class SparkPositionDeletesRewrite implements Write { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -112,6 +113,7 @@ public class SparkPositionDeletesRewrite implements Write { this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.fileReplication(); } @Override @@ -137,7 +139,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { dsSchema, specId, partition, - writeProperties); + writeProperties, + replicationFactor); } @Override @@ -189,6 +192,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; PositionDeletesWriterFactory( Broadcast
tableBroadcast, @@ -200,7 +204,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { StructType dsSchema, int specId, StructLike partition, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -211,6 +216,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.specId = specId; this.partition = partition; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -221,6 +227,7 @@ public DataWriter createWriter(int partitionId, long taskId) { OutputFileFactory.builderFor(table, partitionId, taskId) .format(format) .operationId(queryId) + .replicationFactor(replicationFactor) .suffix("deletes") .build(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 3f0f9c843b..437fd018ba 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -105,6 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final SparkWriteRequirements writeRequirements; private final Context context; private final Map writeProperties; + private final short replicationFactor; private boolean cleanupOnAbort = true; @@ -131,6 +132,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.fileReplication(); } @Override @@ -171,7 +173,8 @@ public DeltaWriterFactory 
createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties); + return new PositionDeltaWriteFactory( + tableBroadcast, command, context, writeProperties, replicationFactor); } @Override @@ -352,16 +355,19 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Command command; private final Context context; private final Map writeProperties; + private final short replicationFactor; PositionDeltaWriteFactory( Broadcast
tableBroadcast, Command command, Context context, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -378,6 +384,7 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .format(context.deleteFileFormat()) .operationId(context.queryId()) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); SparkFileWriterFactory writerFactory = diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 908a3d3d3c..668dc21900 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -102,6 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { private final boolean useFanoutWriter; private final SparkWriteRequirements writeRequirements; private final Map writeProperties; + private final short deleteFileReplication; private boolean cleanupOnAbort = true; @@ -133,6 +134,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); this.writeProperties = writeConf.writeProperties(); + this.deleteFileReplication = writeConf.fileReplication(); } @Override diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 9f4a4f47bf..01e7048d17 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -545,4 +545,46 @@ private void 
checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) assertThat(writeConf.copyOnWriteDistributionMode(MERGE)).isEqualTo(expectedMode); assertThat(writeConf.positionDeltaDistributionMode(MERGE)).isEqualTo(expectedMode); } + + @TestTemplate + public void testFileReplicationDefault() { + Table table = validationCatalog.loadTable(tableIdent); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + // Default replication factor should be 3 as per DEFAULT_DELETE_FILE_REPLICATION + assertThat(writeConf.fileReplication()).isEqualTo((short) 3); + } + + @TestTemplate + public void testFileReplicationFromWriteOption() { + Table table = validationCatalog.loadTable(tableIdent); + + Map writeOptions = + ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, "5"); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + assertThat(writeConf.fileReplication()).isEqualTo((short) 5); + } + + @TestTemplate + public void testFileReplicationWithOne() { + Table table = validationCatalog.loadTable(tableIdent); + + Map writeOptions = + ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, "1"); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + assertThat(writeConf.fileReplication()).isEqualTo((short) 1); + } + + @TestTemplate + public void testFileReplicationWithMaxValue() { + Table table = validationCatalog.loadTable(tableIdent); + + Map writeOptions = + ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, String.valueOf(Short.MAX_VALUE)); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + assertThat(writeConf.fileReplication()).isEqualTo(Short.MAX_VALUE); + } } From 364e96a9f9590856e39d19ecb11d9b2062b8b47a Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 2 Dec 2025 10:10:05 -0800 Subject: [PATCH 02/10] Fix spotless apply-build failure. 
--- .../apache/iceberg/spark/source/SparkPositionDeltaWrite.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 7e2385f76c..f0684a953e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -158,7 +158,8 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties, replicationFactor); + return new PositionDeltaWriteFactory( + tableBroadcast, command, context, writeProperties, replicationFactor); } @Override From 9e078c8a8f524a49995250ae380ce6911bb44396 Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 2 Dec 2025 11:09:27 -0800 Subject: [PATCH 03/10] Fix checkstyle issues. --- .../apache/iceberg/hadoop/HadoopFileIO.java | 4 +- .../apache/iceberg/io/OutputFileFactory.java | 4 +- .../iceberg/spark/SparkSQLProperties.java | 12 + .../apache/iceberg/spark/SparkWriteConf.java | 214 ++++++++++++++++++ .../iceberg/spark/SparkWriteOptions.java | 14 ++ .../source/SparkPositionDeletesRewrite.java | 1 + .../apache/iceberg/spark/SparkWriteConf.java | 9 + .../iceberg/spark/SparkWriteOptions.java | 4 + .../spark/source/SparkPositionDeltaWrite.java | 3 +- .../apache/iceberg/spark/SparkWriteConf.java | 2 +- .../iceberg/spark/SparkWriteOptions.java | 3 +- .../source/SparkPositionDeletesRewrite.java | 2 +- .../spark/source/SparkPositionDeltaWrite.java | 2 +- .../iceberg/spark/source/SparkWrite.java | 2 +- .../iceberg/spark/TestSparkWriteConf.java | 16 +- 15 files changed, 272 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 61e45228a6..8286099e8e 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -98,8 +98,8 @@ public OutputFile newOutputFile(String path) { } @Override - public OutputFile newOutputFile(String path, Map properties) { - return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get(), properties); + public OutputFile newOutputFile(String path, Map fileProperties) { + return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get(), 
fileProperties); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index 02ff17d64b..57ff41d26a 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -119,9 +119,7 @@ public EncryptedOutputFile newOutputFile() { private Map getProperties() { Map properties = Maps.newHashMap(); - if (replicationFactorOptional.isPresent()) { - properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replicationFactorOptional)); - } + replicationFactorOptional.ifPresent(replicationFactor -> properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replicationFactor))); return properties; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 1d1fa885bb..cd4c541d99 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -66,4 +66,16 @@ private SparkSQLProperties() {} // Controls whether to report locality information to Spark while allocating input partitions public static final String LOCALITY = "spark.sql.iceberg.locality.enabled"; + + // Controls compression codec for write operations + public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec"; + + // Controls compression level for write operations + public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level"; + + // Controls compression strategy for write operations + public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy"; + + // Controls advisory partition size for write operations + public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size"; } diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 41777c5155..95705b1f42 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -21,6 +21,18 @@ import static org.apache.iceberg.DistributionMode.HASH; import static org.apache.iceberg.DistributionMode.NONE; import static org.apache.iceberg.DistributionMode.RANGE; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import java.util.Locale; import java.util.Map; @@ -133,6 +145,15 @@ public String wapId() { return sessionConf.get(SparkSQLProperties.WAP_ID, null); } + public short deleteFileReplication() { + return (short) + confParser + .intConf() + .option(SparkWriteOptions.DELETE_FILE_REPLICATION) + .defaultValue(SparkWriteOptions.DEFAULT_DELETE_FILE_REPLICATION) + .parse(); + } + public boolean mergeSchema() { return confParser .booleanConf() @@ -373,4 +394,197 @@ public String branch() { return branch; } + + public Map writeProperties() { + Map 
writeProperties = Maps.newHashMap(); + writeProperties.putAll(dataWriteProperties()); + writeProperties.putAll(deleteWriteProperties()); + return writeProperties; + } + + private Map dataWriteProperties() { + Map writeProperties = Maps.newHashMap(); + FileFormat dataFormat = dataFileFormat(); + + switch (dataFormat) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); + String parquetCompressionLevel = parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + break; + + case AVRO: + writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); + String avroCompressionLevel = avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); + } + break; + + case ORC: + writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + break; + + default: + // skip + } + + return writeProperties; + } + + private Map deleteWriteProperties() { + Map writeProperties = Maps.newHashMap(); + FileFormat deleteFormat = deleteFileFormat(); + + switch (deleteFormat) { + case PARQUET: + writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec()); + String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); + if (deleteParquetCompressionLevel != null) { + writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); + } + break; + + case AVRO: + writeProperties.put(DELETE_AVRO_COMPRESSION, deleteAvroCompressionCodec()); + String deleteAvroCompressionLevel = deleteAvroCompressionLevel(); + if (deleteAvroCompressionLevel != null) { + writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel); + } + break; + + case ORC: + writeProperties.put(DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec()); + 
writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy()); + break; + + default: + // skip + } + + return writeProperties; + } + + private String parquetCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.PARQUET_COMPRESSION) + .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT) + .parse(); + } + + private String deleteParquetCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(DELETE_PARQUET_COMPRESSION) + .defaultValue(parquetCompressionCodec()) + .parse(); + } + + private String parquetCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL) + .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + private String deleteParquetCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL) + .defaultValue(parquetCompressionLevel()) + .parseOptional(); + } + + private String avroCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.AVRO_COMPRESSION) + .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT) + .parse(); + } + + private String deleteAvroCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(DELETE_AVRO_COMPRESSION) + .defaultValue(avroCompressionCodec()) + .parse(); + } + + 
private String avroCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL) + .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + private String deleteAvroCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL) + .defaultValue(avroCompressionLevel()) + .parseOptional(); + } + + private String orcCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.ORC_COMPRESSION) + .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT) + .parse(); + } + + private String deleteOrcCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(DELETE_ORC_COMPRESSION) + .defaultValue(orcCompressionCodec()) + .parse(); + } + + private String orcCompressionStrategy() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_STRATEGY) + .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) + .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY) + .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) + .parse(); + } + + private String deleteOrcCompressionStrategy() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_STRATEGY) + .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) + .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY) + .defaultValue(orcCompressionStrategy()) + .parse(); + } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index c4eacb7b98..c49cba87bb 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -23,6 +23,9 @@ public class SparkWriteOptions { private SparkWriteOptions() {} + public static final String DELETE_FILE_REPLICATION = "delete-file-replication"; + public static final short DEFAULT_DELETE_FILE_REPLICATION = 3; + // Fileformat for write operations(default: Table write.format.default ) public static final String WRITE_FORMAT = "write-format"; @@ -79,4 +82,15 @@ private SparkWriteOptions() {} // Isolation Level for DataFrame calls. Currently supported by overwritePartitions public static final String ISOLATION_LEVEL = "isolation-level"; + + // Controls write compress options + public static final String COMPRESSION_CODEC = "compression-codec"; + public static final String COMPRESSION_LEVEL = "compression-level"; + public static final String COMPRESSION_STRATEGY = "compression-strategy"; + + // Overrides the advisory partition size + public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size"; + + // Overrides the delete granularity + public static final String DELETE_GRANULARITY = "delete-granularity"; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 0efb1fe937..aaaf99b42f 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import 
org.apache.iceberg.MetadataColumns; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 824e3aca9a..c37c09c95d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -132,6 +132,15 @@ public String wapId() { return sessionConf.get(SparkSQLProperties.WAP_ID, null); } + public short deleteFileReplication() { + return (short) + confParser + .intConf() + .option(SparkWriteOptions.DELETE_FILE_REPLICATION) + .defaultValue(SparkWriteOptions.DEFAULT_DELETE_FILE_REPLICATION) + .parse(); + } + public boolean mergeSchema() { return confParser .booleanConf() diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 391cb6bae3..f5f3a4259b 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -23,6 +23,10 @@ public class SparkWriteOptions { private SparkWriteOptions() {} + public static final String DELETE_FILE_REPLICATION = "delete-file-replication"; + + public static final short DEFAULT_DELETE_FILE_REPLICATION = 3; + // Fileformat for write operations(default: Table write.format.default ) public static final String WRITE_FORMAT = "write-format"; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 67d749780b..4674c2c340 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -162,7 +162,8 @@ public 
DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties, replicationFactor); + return new PositionDeltaWriteFactory( + tableBroadcast, command, context, writeProperties, replicationFactor); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index e5b04525c1..2bef37a8e2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -132,7 +132,7 @@ public boolean wapEnabled() { .parse(); } - public short fileReplication() { + public short deleteFileReplication() { return (short) confParser .intConf() diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 98147b7bfd..17f6250a49 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -20,12 +20,11 @@ /** Spark DF write options */ public class SparkWriteOptions { + private SparkWriteOptions() {} public static final String DELETE_FILE_REPLICATION = "delete-file-replication"; public static final short DEFAULT_DELETE_FILE_REPLICATION = 3; - private SparkWriteOptions() {} - // Fileformat for write operations(default: Table write.format.default ) public static final String WRITE_FORMAT = "write-format"; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 09e43da695..00297b4099 100644 --- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -113,7 +113,7 @@ public class SparkPositionDeletesRewrite implements Write { this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); - this.replicationFactor = writeConf.fileReplication(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 437fd018ba..833af8376a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -132,7 +132,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); this.writeProperties = writeConf.writeProperties(); - this.replicationFactor = writeConf.fileReplication(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 668dc21900..551e802103 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -134,7 +134,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); this.writeProperties = writeConf.writeProperties(); - 
this.deleteFileReplication = writeConf.fileReplication(); + this.deleteFileReplication = writeConf.deleteFileReplication(); } @Override diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 01e7048d17..e1a6a7cb67 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -547,44 +547,44 @@ private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) } @TestTemplate - public void testFileReplicationDefault() { + public void testDeleteFileReplicationDefault() { Table table = validationCatalog.loadTable(tableIdent); SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); // Default replication factor should be 3 as per DEFAULT_DELETE_FILE_REPLICATION - assertThat(writeConf.fileReplication()).isEqualTo((short) 3); + assertThat(writeConf.deleteFileReplication()).isEqualTo((short) 3); } @TestTemplate - public void testFileReplicationFromWriteOption() { + public void testDeleteFileReplicationFromWriteOption() { Table table = validationCatalog.loadTable(tableIdent); Map writeOptions = ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, "5"); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); - assertThat(writeConf.fileReplication()).isEqualTo((short) 5); + assertThat(writeConf.deleteFileReplication()).isEqualTo((short) 5); } @TestTemplate - public void testFileReplicationWithOne() { + public void testDeleteFileReplicationWithOne() { Table table = validationCatalog.loadTable(tableIdent); Map writeOptions = ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, "1"); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); - assertThat(writeConf.fileReplication()).isEqualTo((short) 1); + 
assertThat(writeConf.deleteFileReplication()).isEqualTo((short) 1); } @TestTemplate - public void testFileReplicationWithMaxValue() { + public void testDeleteFileReplicationWithMaxValue() { Table table = validationCatalog.loadTable(tableIdent); Map writeOptions = ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, String.valueOf(Short.MAX_VALUE)); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); - assertThat(writeConf.fileReplication()).isEqualTo(Short.MAX_VALUE); + assertThat(writeConf.deleteFileReplication()).isEqualTo(Short.MAX_VALUE); } } From 4f60693dcd1f456a3f4e33cd0a0e008bc10f9a9e Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 2 Dec 2025 13:36:23 -0800 Subject: [PATCH 04/10] Fix checkstyle issues. --- .../main/java/org/apache/iceberg/io/OutputFileFactory.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index 57ff41d26a..030eed97b7 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -119,7 +119,9 @@ public EncryptedOutputFile newOutputFile() { private Map getProperties() { Map properties = Maps.newHashMap(); - replicationFactorOptional.ifPresent(replicationFactor -> properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replicationFactor))); + replicationFactorOptional.ifPresent( + replicationFactor -> + properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replicationFactor))); return properties; } From 9d55eee05699512fe4b3db9a00783395faf37e62 Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 2 Dec 2025 14:35:24 -0800 Subject: [PATCH 05/10] Fix rev-api errors. 
--- baseline.gradle | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/baseline.gradle b/baseline.gradle index 7c1ceb7874..bfaaf15862 100644 --- a/baseline.gradle +++ b/baseline.gradle @@ -120,4 +120,9 @@ subprojects { quiet = false } } + + // Apply revapi plugin to specific projects for API compatibility checking + if (project.name in ['iceberg-api', 'iceberg-core', 'iceberg-parquet', 'iceberg-orc', 'iceberg-common', 'iceberg-data']) { + apply plugin: 'com.palantir.revapi' + } } From 3596e30e707a9e0586c6ea662681478aa8456699 Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 2 Dec 2025 14:39:09 -0800 Subject: [PATCH 06/10] Fix rev-api errors. --- .palantir/revapi.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a41d3ddfb8..f70b4b4d97 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1018,6 +1018,16 @@ acceptedBreaks: old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::(org.apache.iceberg.Table,\ \ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)" justification: "Removing deprecated code" + "1.5.2.2": + com.linkedin.iceberg:iceberg-core: + - code: "java.method.addedToInterface" + new: "method org.apache.hadoop.conf.Configuration org.apache.hadoop.conf.Configurable::getConf()\ + \ @ org.apache.iceberg.hadoop.HadoopConfigurable" + justification: "This break is ok" + - code: "java.method.addedToInterface" + new: "method void org.apache.hadoop.conf.Configurable::setConf(org.apache.hadoop.conf.Configuration)\ + \ @ org.apache.iceberg.hadoop.HadoopConfigurable" + justification: "This break is ok" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" From 16e2847e94ea047c231a0e985b690ffd5e79d101 Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 9 Dec 2025 11:49:49 -0800 Subject: [PATCH 07/10] Address review comments. 
--- .../iceberg/hadoop/HadoopOutputFile.java | 3 +- .../apache/iceberg/io/OutputFileFactory.java | 20 +- gradle.properties | 2 +- .../iceberg/spark/SparkSQLProperties.java | 12 - .../apache/iceberg/spark/SparkWriteConf.java | 214 ------------------ .../iceberg/spark/SparkWriteOptions.java | 14 -- .../source/SparkPositionDeletesRewrite.java | 18 +- .../spark/source/SparkPositionDeltaWrite.java | 21 +- 8 files changed, 18 insertions(+), 286 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java index 7584668baf..209d0d94cb 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java @@ -37,6 +37,7 @@ /** {@link OutputFile} implementation using the Hadoop {@link FileSystem} API. */ public class HadoopOutputFile implements OutputFile, NativelyEncryptedFile { + private static final short DEFAULT_REPLICATION_FACTOR = 3; private final FileSystem fs; private final Path path; private final Configuration conf; @@ -59,7 +60,7 @@ public static OutputFile fromPath(Path path, Configuration conf) { } public static OutputFile fromPath(Path path, Configuration conf, Map properties) { - short replicationFactor = -1; + short replicationFactor = DEFAULT_REPLICATION_FACTOR; if (properties != null) { String replicationFactorAsString = properties.get(OutputFileFactory.FILE_REPLICATION_FACTOR); if (replicationFactorAsString != null) { diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index 030eed97b7..fb73ce8465 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg.io; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static 
org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -34,6 +31,9 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + /** Factory responsible for generating unique but recognizable data/delete file names. */ public class OutputFileFactory { public static final String FILE_REPLICATION_FACTOR = "file-replication-factor"; @@ -52,7 +52,7 @@ public class OutputFileFactory { private final String operationId; private final AtomicInteger fileCount = new AtomicInteger(0); private final String suffix; - private final Optional replicationFactorOptional; + private final Optional replicationFactor; /** * Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be @@ -68,7 +68,7 @@ public class OutputFileFactory { * @param taskId Second part of the file name * @param operationId Third part of the file name * @param suffix Suffix part of the file name - * @param replicationFactorOptional the replication factor of output file + * @param replicationFactor the replication factor of output file */ private OutputFileFactory( PartitionSpec spec, @@ -80,7 +80,7 @@ private OutputFileFactory( long taskId, String operationId, String suffix, - Optional replicationFactorOptional) { + Optional replicationFactor) { this.defaultSpec = spec; this.format = format; this.locations = locations; @@ -90,7 +90,7 @@ private OutputFileFactory( this.taskId = taskId; this.operationId = operationId; this.suffix = suffix; - this.replicationFactorOptional = replicationFactorOptional; + this.replicationFactor = replicationFactor; } public static Builder builderFor(Table table, int partitionId, long taskId) { @@ -119,9 +119,9 @@ public EncryptedOutputFile 
newOutputFile() { private Map getProperties() { Map properties = Maps.newHashMap(); - replicationFactorOptional.ifPresent( - replicationFactor -> - properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replicationFactor))); + replicationFactor.ifPresent( + replication -> + properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replication))); return properties; } diff --git a/gradle.properties b/gradle.properties index ea857e7f27..5b85c446ea 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,7 +21,7 @@ systemProp.knownFlinkVersions=1.16,1.17,1.18 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 systemProp.defaultSparkVersions=3.5 -systemProp.knownSparkVersions=3.3,3.4,3.5 +systemProp.knownSparkVersions=3.5 systemProp.defaultScalaVersion=2.12 systemProp.knownScalaVersions=2.12,2.13 org.gradle.parallel=true diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index cd4c541d99..1d1fa885bb 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -66,16 +66,4 @@ private SparkSQLProperties() {} // Controls whether to report locality information to Spark while allocating input partitions public static final String LOCALITY = "spark.sql.iceberg.locality.enabled"; - - // Controls compression codec for write operations - public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec"; - - // Controls compression level for write operations - public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level"; - - // Controls compression strategy for write operations - public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy"; - - // Controls advisory partition size for write operations - public static final String 
ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size"; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 95705b1f42..41777c5155 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -21,18 +21,6 @@ import static org.apache.iceberg.DistributionMode.HASH; import static org.apache.iceberg.DistributionMode.NONE; import static org.apache.iceberg.DistributionMode.RANGE; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import java.util.Locale; import java.util.Map; @@ -145,15 +133,6 @@ public String wapId() { return sessionConf.get(SparkSQLProperties.WAP_ID, null); } - public short deleteFileReplication() { - return (short) - confParser - .intConf() - .option(SparkWriteOptions.DELETE_FILE_REPLICATION) - .defaultValue(SparkWriteOptions.DEFAULT_DELETE_FILE_REPLICATION) - .parse(); - } - public boolean mergeSchema() { return confParser .booleanConf() @@ -394,197 +373,4 @@ 
public String branch() { return branch; } - - public Map writeProperties() { - Map writeProperties = Maps.newHashMap(); - writeProperties.putAll(dataWriteProperties()); - writeProperties.putAll(deleteWriteProperties()); - return writeProperties; - } - - private Map dataWriteProperties() { - Map writeProperties = Maps.newHashMap(); - FileFormat dataFormat = dataFileFormat(); - - switch (dataFormat) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); - String parquetCompressionLevel = parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - break; - - case AVRO: - writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); - String avroCompressionLevel = avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); - } - break; - - case ORC: - writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); - break; - - default: - // skip - } - - return writeProperties; - } - - private Map deleteWriteProperties() { - Map writeProperties = Maps.newHashMap(); - FileFormat deleteFormat = deleteFileFormat(); - - switch (deleteFormat) { - case PARQUET: - writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec()); - String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); - if (deleteParquetCompressionLevel != null) { - writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); - } - break; - - case AVRO: - writeProperties.put(DELETE_AVRO_COMPRESSION, deleteAvroCompressionCodec()); - String deleteAvroCompressionLevel = deleteAvroCompressionLevel(); - if (deleteAvroCompressionLevel != null) { - writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel); - } - break; - - case ORC: - writeProperties.put(DELETE_ORC_COMPRESSION, 
deleteOrcCompressionCodec()); - writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy()); - break; - - default: - // skip - } - - return writeProperties; - } - - private String parquetCompressionCodec() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_CODEC) - .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) - .tableProperty(TableProperties.PARQUET_COMPRESSION) - .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT) - .parse(); - } - - private String deleteParquetCompressionCodec() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_CODEC) - .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) - .tableProperty(DELETE_PARQUET_COMPRESSION) - .defaultValue(parquetCompressionCodec()) - .parse(); - } - - private String parquetCompressionLevel() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_LEVEL) - .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) - .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL) - .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT) - .parseOptional(); - } - - private String deleteParquetCompressionLevel() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_LEVEL) - .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) - .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL) - .defaultValue(parquetCompressionLevel()) - .parseOptional(); - } - - private String avroCompressionCodec() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_CODEC) - .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) - .tableProperty(TableProperties.AVRO_COMPRESSION) - .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT) - .parse(); - } - - private String deleteAvroCompressionCodec() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_CODEC) - .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) - .tableProperty(DELETE_AVRO_COMPRESSION) - 
.defaultValue(avroCompressionCodec()) - .parse(); - } - - private String avroCompressionLevel() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_LEVEL) - .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) - .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL) - .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT) - .parseOptional(); - } - - private String deleteAvroCompressionLevel() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_LEVEL) - .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) - .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL) - .defaultValue(avroCompressionLevel()) - .parseOptional(); - } - - private String orcCompressionCodec() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_CODEC) - .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) - .tableProperty(TableProperties.ORC_COMPRESSION) - .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT) - .parse(); - } - - private String deleteOrcCompressionCodec() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_CODEC) - .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) - .tableProperty(DELETE_ORC_COMPRESSION) - .defaultValue(orcCompressionCodec()) - .parse(); - } - - private String orcCompressionStrategy() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_STRATEGY) - .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) - .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY) - .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) - .parse(); - } - - private String deleteOrcCompressionStrategy() { - return confParser - .stringConf() - .option(SparkWriteOptions.COMPRESSION_STRATEGY) - .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) - .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY) - .defaultValue(orcCompressionStrategy()) - .parse(); - } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index c49cba87bb..c4eacb7b98 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -23,9 +23,6 @@ public class SparkWriteOptions { private SparkWriteOptions() {} - public static final String DELETE_FILE_REPLICATION = "delete-file-replication"; - public static final short DEFAULT_DELETE_FILE_REPLICATION = 3; - // Fileformat for write operations(default: Table write.format.default ) public static final String WRITE_FORMAT = "write-format"; @@ -82,15 +79,4 @@ private SparkWriteOptions() {} // Isolation Level for DataFrame calls. Currently supported by overwritePartitions public static final String ISOLATION_LEVEL = "isolation-level"; - - // Controls write compress options - public static final String COMPRESSION_CODEC = "compression-codec"; - public static final String COMPRESSION_LEVEL = "compression-level"; - public static final String COMPRESSION_STRATEGY = "compression-strategy"; - - // Overrides the advisory partition size - public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size"; - - // Overrides the delete granularity - public static final String DELETE_GRANULARITY = "delete-granularity"; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index aaaf99b42f..0aebb6bdb2 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Map; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import 
org.apache.iceberg.MetadataColumns; @@ -75,8 +74,6 @@ public class SparkPositionDeletesRewrite implements Write { private final String fileSetId; private final int specId; private final StructLike partition; - private final Map writeProperties; - private final short replicationFactor; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -109,8 +106,6 @@ public class SparkPositionDeletesRewrite implements Write { this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; - this.writeProperties = writeConf.writeProperties(); - this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -134,9 +129,7 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { writeSchema, dsSchema, specId, - partition, - writeProperties, - replicationFactor); + partition); } @Override @@ -181,8 +174,6 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final StructType dsSchema; private final int specId; private final StructLike partition; - private final Map writeProperties; - private final short replicationFactor; PositionDeletesWriterFactory( Broadcast
tableBroadcast, @@ -192,9 +183,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { Schema writeSchema, StructType dsSchema, int specId, - StructLike partition, - Map writeProperties, - short replicationFactor) { + StructLike partition) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -203,8 +192,6 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.dsSchema = dsSchema; this.specId = specId; this.partition = partition; - this.writeProperties = writeProperties; - this.replicationFactor = replicationFactor; } @Override @@ -216,7 +203,6 @@ public DataWriter createWriter(int partitionId, long taskId) { .format(format) .operationId(queryId) .suffix("deletes") - .replicationFactor(replicationFactor) .build(); Schema positionDeleteRowSchema = positionDeleteRowSchema(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index f0684a953e..577e94a823 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -102,8 +102,6 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final Map extraSnapshotMetadata; private final Distribution requiredDistribution; private final SortOrder[] requiredOrdering; - private final Map writeProperties; - private final short replicationFactor; private boolean cleanupOnAbort = true; @@ -132,8 +130,6 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.requiredDistribution = requiredDistribution; this.requiredOrdering = requiredOrdering; - this.writeProperties = writeConf.writeProperties(); - this.replicationFactor = 
writeConf.deleteFileReplication(); } @Override @@ -158,8 +154,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory( - tableBroadcast, command, context, writeProperties, replicationFactor); + return new PositionDeltaWriteFactory(tableBroadcast, command, context); } @Override @@ -336,20 +331,11 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Broadcast
tableBroadcast; private final Command command; private final Context context; - private final Map writeProperties; - private final short replicationFactor; - - PositionDeltaWriteFactory( - Broadcast
tableBroadcast, - Command command, - Context context, - Map writeProperties, - short replicationFactor) { + + PositionDeltaWriteFactory(Broadcast
tableBroadcast, Command command, Context context) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; - this.writeProperties = writeProperties; - this.replicationFactor = replicationFactor; } @Override @@ -366,7 +352,6 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .format(context.deleteFileFormat()) .operationId(context.queryId()) .suffix("deletes") - .replicationFactor(replicationFactor) .build(); SparkFileWriterFactory writerFactory = From d3f411e4d88a39d66bac44d1e3d07b674172dea8 Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 9 Dec 2025 11:56:58 -0800 Subject: [PATCH 08/10] Address review comments. --- .../java/org/apache/iceberg/io/FileIO.java | 5 +++- .../apache/iceberg/io/OutputFileFactory.java | 29 ++++++++++++------- gradle.properties | 2 +- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index 8640ed4d82..7b69866115 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -82,7 +82,10 @@ default InputFile newInputFile(ManifestFile manifest) { * @return an OutputFile for writing data */ default OutputFile newOutputFile(String path, Map properties) { - return newOutputFile(path); + throw new UnsupportedOperationException( + String.format( + "Creating output file at: %s with properties: %s is not " + "supported", + path, properties)); } /** Delete the file at the given path. 
*/ diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index fb73ce8465..e54cac37f9 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.io; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -31,9 +34,6 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - /** Factory responsible for generating unique but recognizable data/delete file names. */ public class OutputFileFactory { public static final String FILE_REPLICATION_FACTOR = "file-replication-factor"; @@ -110,18 +110,22 @@ private String generateFilename() { /** Generates an {@link EncryptedOutputFile} for unpartitioned writes. 
*/ public EncryptedOutputFile newOutputFile() { - OutputFile file = - ioSupplier - .get() - .newOutputFile(locations.newDataLocation(generateFilename()), getProperties()); + OutputFile file; + if (replicationFactor.isPresent()) { + file = + ioSupplier + .get() + .newOutputFile(locations.newDataLocation(generateFilename()), getProperties()); + } else { + file = ioSupplier.get().newOutputFile(locations.newDataLocation(generateFilename())); + } return encryptionManager.encrypt(file); } private Map getProperties() { Map properties = Maps.newHashMap(); replicationFactor.ifPresent( - replication -> - properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replication))); + replication -> properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replication))); return properties; } @@ -133,7 +137,12 @@ public EncryptedOutputFile newOutputFile(StructLike partition) { /** Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec. */ public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) { String newDataLocation = locations.newDataLocation(spec, partition, generateFilename()); - OutputFile rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation, getProperties()); + OutputFile rawOutputFile; + if (replicationFactor.isPresent()) { + rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation, getProperties()); + } else { + rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation); + } return encryptionManager.encrypt(rawOutputFile); } diff --git a/gradle.properties b/gradle.properties index 5b85c446ea..ea857e7f27 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,7 +21,7 @@ systemProp.knownFlinkVersions=1.16,1.17,1.18 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 systemProp.defaultSparkVersions=3.5 -systemProp.knownSparkVersions=3.5 +systemProp.knownSparkVersions=3.3,3.4,3.5 systemProp.defaultScalaVersion=2.12 systemProp.knownScalaVersions=2.12,2.13 org.gradle.parallel=true From 
3eb8f71467a9a704a630121ba94e41f1e472254d Mon Sep 17 00:00:00 2001 From: CI Builder Date: Tue, 9 Dec 2025 18:02:27 -0800 Subject: [PATCH 09/10] Fix failing tests. --- .../iceberg/spark/extensions/TestSparkExecutorCache.java | 5 +++++ .../org/apache/iceberg/spark/TestSparkExecutorCache.java | 5 +++++ .../org/apache/iceberg/spark/TestSparkExecutorCache.java | 5 +++++ 3 files changed, 15 insertions(+) diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java index 3d995cc4f0..0d202af0c8 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java @@ -320,6 +320,11 @@ public OutputFile newOutputFile(String path) { return Files.localOutput(path); } + @Override + public OutputFile newOutputFile(String path, Map properties) { + return Files.localOutput(path); + } + @Override public void deleteFile(String path) { File file = new File(path); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java index 35dfb55d5b..8bdc3d6eae 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java @@ -398,6 +398,11 @@ public OutputFile newOutputFile(String path) { return Files.localOutput(path); } + @Override + public OutputFile newOutputFile(String path, Map properties) { + return newOutputFile(path); + } + @Override public void deleteFile(String path) { File file = new File(path); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java index 189c125b4e..cdf8fb1f94 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java @@ -464,6 +464,11 @@ public OutputFile newOutputFile(String path) { return Files.localOutput(path); } + @Override + public OutputFile newOutputFile(String path, Map properties) { + return Files.localOutput(path); + } + @Override public void deleteFile(String path) { File file = new File(path); From 84243fadac7a66a3d88e4f5cb9bbdf4c86ac705a Mon Sep 17 00:00:00 2001 From: CI Builder Date: Mon, 5 Jan 2026 12:44:12 -0800 Subject: [PATCH 10/10] Address review comments. --- .../iceberg/hadoop/HadoopOutputFile.java | 13 ++++++++++- .../TestHadoopOutputFileReplication.java | 22 ++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java index 209d0d94cb..9224c4ef5c 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java @@ -33,10 +33,13 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PositionOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** {@link OutputFile} implementation using the Hadoop {@link FileSystem} API. */ public class HadoopOutputFile implements OutputFile, NativelyEncryptedFile { + private static final Logger LOG = LoggerFactory.getLogger(HadoopOutputFile.class); private static final short DEFAULT_REPLICATION_FACTOR = 3; private final FileSystem fs; private final Path path; @@ -64,7 +67,15 @@ public static OutputFile fromPath(Path path, Configuration conf, Map