diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a41d3ddfb8..f70b4b4d97 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1018,6 +1018,16 @@ acceptedBreaks: old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::(org.apache.iceberg.Table,\ \ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)" justification: "Removing deprecated code" + "1.5.2.2": + com.linkedin.iceberg:iceberg-core: + - code: "java.method.addedToInterface" + new: "method org.apache.hadoop.conf.Configuration org.apache.hadoop.conf.Configurable::getConf()\ + \ @ org.apache.iceberg.hadoop.HadoopConfigurable" + justification: "This break is ok" + - code: "java.method.addedToInterface" + new: "method void org.apache.hadoop.conf.Configurable::setConf(org.apache.hadoop.conf.Configuration)\ + \ @ org.apache.iceberg.hadoop.HadoopConfigurable" + justification: "This break is ok" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index a521cbf79d..7b69866115 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -73,6 +73,21 @@ default InputFile newInputFile(ManifestFile manifest) { /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ OutputFile newOutputFile(String path); + /** + * Get a {@link OutputFile} instance to write bytes to the file at the given path with specific + * properties. + * + * @param path the file path + * @param properties the properties for the output file + * @return an OutputFile for writing data + */ + default OutputFile newOutputFile(String path, Map properties) { + throw new UnsupportedOperationException( + String.format( + "Creating output file at: %s with properties: %s is not " + "supported", + path, properties)); + } + /** Delete the file at the given path. */ void deleteFile(String path); diff --git a/baseline.gradle b/baseline.gradle index 7c1ceb7874..bfaaf15862 100644 --- a/baseline.gradle +++ b/baseline.gradle @@ -120,4 +120,9 @@ subprojects { quiet = false } } + + // Apply revapi plugin to specific projects for API compatibility checking + if (project.name in ['iceberg-api', 'iceberg-core', 'iceberg-parquet', 'iceberg-orc', 'iceberg-common', 'iceberg-data']) { + apply plugin: 'com.palantir.revapi' + } } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 7aaa2b6a75..8286099e8e 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -97,6 +97,11 @@ public OutputFile newOutputFile(String path) { return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get()); } + @Override + public OutputFile newOutputFile(String path, Map fileProperties) { + return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get(), fileProperties); + } + @Override public void deleteFile(String path) { Path toDelete = new Path(path); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java index 9453ee5720..9224c4ef5c 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java @@ -19,7 +19,9 @@ package org.apache.iceberg.hadoop; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,14 +31,20 @@ import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PositionOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** {@link OutputFile} implementation using the Hadoop {@link FileSystem} API. */ public class HadoopOutputFile implements OutputFile, NativelyEncryptedFile { + private static final Logger LOG = LoggerFactory.getLogger(HadoopOutputFile.class); + private static final short DEFAULT_REPLICATION_FACTOR = 3; private final FileSystem fs; private final Path path; private final Configuration conf; + private final short replication; private NativeFileCryptoParameters nativeEncryptionParameters; public static OutputFile fromLocation(CharSequence location, Configuration conf) { @@ -54,24 +62,61 @@ public static OutputFile fromPath(Path path, Configuration conf) { return fromPath(path, fs, conf); } + public static OutputFile fromPath(Path path, Configuration conf, Map properties) { + short replicationFactor = DEFAULT_REPLICATION_FACTOR; + if (properties != null) { + String replicationFactorAsString = properties.get(OutputFileFactory.FILE_REPLICATION_FACTOR); + if (replicationFactorAsString != null) { + try { + replicationFactor = Short.parseShort(replicationFactorAsString); + } catch (NumberFormatException e) { + LOG.warn( + "Failed to parse replication factor: {}, defaulting to {}", + replicationFactorAsString, + DEFAULT_REPLICATION_FACTOR, + e); + } + } + } + return fromPath(path, conf, replicationFactor); + } + + public static OutputFile fromPath(Path path, Configuration conf, short replication) { + FileSystem fs = Util.getFs(path, conf); + return new HadoopOutputFile(fs, path, conf, replication); + } + public static OutputFile fromPath(Path path, FileSystem fs) { return fromPath(path, fs, fs.getConf()); } public static OutputFile fromPath(Path path, FileSystem fs, Configuration conf) { - return new HadoopOutputFile(fs, path, conf); + return new HadoopOutputFile(fs, path, conf, (short) -1); } - private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) { + private HadoopOutputFile(FileSystem fs, Path path, Configuration conf, short replication) { this.fs = fs; this.path = path; this.conf = conf; + this.replication = replication; } @Override public PositionOutputStream create() { try { - return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */)); + if (replication > 0) { + return HadoopStreams.wrap( + fs.create( + path, + false /* overwrite */, + conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT), + replication, + fs.getDefaultBlockSize(path))); + } else { + return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */)); + } } catch (FileAlreadyExistsException e) { throw new AlreadyExistsException(e, "Path already exists: %s", path); } catch (IOException e) { @@ -82,7 +127,19 @@ public PositionOutputStream create() { @Override public PositionOutputStream createOrOverwrite() { try { - return HadoopStreams.wrap(fs.create(path, true /* createOrOverwrite */)); + if (replication > 0) { + return HadoopStreams.wrap( + fs.create( + path, + true /* overwrite */, + conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT), + replication, + fs.getDefaultBlockSize(path))); + } else { + return HadoopStreams.wrap(fs.create(path, true /* createOrOverwrite */)); + } } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create file: %s", path); } diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index c46ca132ed..e54cac37f9 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -21,6 +21,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -30,9 +32,11 @@ import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** Factory responsible for generating unique but recognizable data/delete file names. */ public class OutputFileFactory { + public static final String FILE_REPLICATION_FACTOR = "file-replication-factor"; private final PartitionSpec defaultSpec; private final FileFormat format; private final LocationProvider locations; @@ -48,6 +52,7 @@ public class OutputFileFactory { private final String operationId; private final AtomicInteger fileCount = new AtomicInteger(0); private final String suffix; + private final Optional replicationFactor; /** * Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be @@ -63,6 +68,7 @@ public class OutputFileFactory { * @param taskId Second part of the file name * @param operationId Third part of the file name * @param suffix Suffix part of the file name + * @param replicationFactor the replication factor of output file */ private OutputFileFactory( PartitionSpec spec, @@ -73,7 +79,8 @@ private OutputFileFactory( int partitionId, long taskId, String operationId, - String suffix) { + String suffix, + Optional replicationFactor) { this.defaultSpec = spec; this.format = format; this.locations = locations; @@ -83,6 +90,7 @@ private OutputFileFactory( this.taskId = taskId; this.operationId = operationId; this.suffix = suffix; + this.replicationFactor = replicationFactor; } public static Builder builderFor(Table table, int partitionId, long taskId) { @@ -102,10 +110,25 @@ private String generateFilename() { /** Generates an {@link EncryptedOutputFile} for unpartitioned writes. */ public EncryptedOutputFile newOutputFile() { - OutputFile file = ioSupplier.get().newOutputFile(locations.newDataLocation(generateFilename())); + OutputFile file; + if (replicationFactor.isPresent()) { + file = + ioSupplier + .get() + .newOutputFile(locations.newDataLocation(generateFilename()), getProperties()); + } else { + file = ioSupplier.get().newOutputFile(locations.newDataLocation(generateFilename())); + } return encryptionManager.encrypt(file); } + private Map getProperties() { + Map properties = Maps.newHashMap(); + replicationFactor.ifPresent( + replication -> properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replication))); + return properties; + } + /** Generates an {@link EncryptedOutputFile} for partitioned writes in the default spec. */ public EncryptedOutputFile newOutputFile(StructLike partition) { return newOutputFile(defaultSpec, partition); @@ -114,7 +137,12 @@ public EncryptedOutputFile newOutputFile(StructLike partition) { /** Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec. */ public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) { String newDataLocation = locations.newDataLocation(spec, partition, generateFilename()); - OutputFile rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation); + OutputFile rawOutputFile; + if (replicationFactor.isPresent()) { + rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation, getProperties()); + } else { + rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation); + } return encryptionManager.encrypt(rawOutputFile); } @@ -127,6 +155,7 @@ public static class Builder { private FileFormat format; private String suffix; private Supplier ioSupplier; + private Optional replicationFactorOptional = Optional.empty(); private Builder(Table table, int partitionId, long taskId) { this.table = table; @@ -151,6 +180,13 @@ public Builder operationId(String newOperationId) { return this; } + public Builder replicationFactor(short replicationFactor) { + if (replicationFactor > 0) { + this.replicationFactorOptional = Optional.of(replicationFactor); + } + return this; + } + public Builder format(FileFormat newFormat) { this.format = newFormat; return this; @@ -185,7 +221,8 @@ public OutputFileFactory build() { partitionId, taskId, operationId, - suffix); + suffix, + replicationFactorOptional); } } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopOutputFileReplication.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopOutputFileReplication.java new file mode 100644 index 0000000000..711fbe9e2f --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopOutputFileReplication.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestHadoopOutputFileReplication { + + @TempDir private File tempDir; + private Configuration conf; + private FileSystem fs; + + @BeforeEach + public void before() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + } + + @Test + public void testOutputFileWithDefaultReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-default-replication.txt"); + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, fs, conf); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + assertThat(fs.getFileStatus(testPath).getLen()).isGreaterThan(0); + } + + @Test + public void testOutputFileWithCustomReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-custom-replication.txt"); + short replicationFactor = 2; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data with custom replication".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + assertThat(fs.getFileStatus(testPath).getLen()).isGreaterThan(0); + // Note: Local filesystem doesn't support replication, but the API should work + } + + @Test + public void testOutputFileWithZeroReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-zero-replication.txt"); + short replicationFactor = 0; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data with zero replication".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + } + + @Test + public void testOutputFileWithNegativeReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-negative-replication.txt"); + short replicationFactor = -1; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data with negative replication".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + } + + @Test + public void testCreateOrOverwriteWithCustomReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-overwrite-replication.txt"); + short replicationFactor = 3; + OutputFile outputFile = HadoopOutputFile.fromPath(testPath, conf, replicationFactor); + + // Create initial file + try (PositionOutputStream stream = outputFile.createOrOverwrite()) { + stream.write("initial data".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + long firstSize = fs.getFileStatus(testPath).getLen(); + + // Overwrite the file + try (PositionOutputStream stream = outputFile.createOrOverwrite()) { + stream.write("overwritten data with more content".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + long secondSize = fs.getFileStatus(testPath).getLen(); + assertThat(secondSize).isGreaterThan(firstSize); + } + + @Test + public void testFileIONewOutputFileWithReplication() throws IOException { + HadoopFileIO fileIO = new HadoopFileIO(conf); + String location = + new Path(tempDir.toURI().toString(), "test-fileio-replication.txt").toString(); + short replicationFactor = 2; + + OutputFile outputFile = + fileIO.newOutputFile( + location, + ImmutableMap.of( + OutputFileFactory.FILE_REPLICATION_FACTOR, String.valueOf(replicationFactor))); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data from FileIO".getBytes()); + } + + assertThat(fileIO.newInputFile(location).exists()).isTrue(); + } + + @Test + public void testFileIONewOutputFileWithoutReplication() throws IOException { + HadoopFileIO fileIO = new HadoopFileIO(conf); + String location = + new Path(tempDir.toURI().toString(), "test-fileio-no-replication.txt").toString(); + + // Use default method without replication + OutputFile outputFile = fileIO.newOutputFile(location); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data without explicit replication".getBytes()); + } + + assertThat(fileIO.newInputFile(location).exists()).isTrue(); + } + + @Test + public void testOutputFileWithInvalidReplication() throws IOException { + Path testPath = new Path(tempDir.toURI().toString(), "test-invalid-replication.txt"); + String invalidReplication = "invalid"; + OutputFile outputFile = + HadoopOutputFile.fromPath( + testPath, + conf, + ImmutableMap.of(OutputFileFactory.FILE_REPLICATION_FACTOR, invalidReplication)); + + try (PositionOutputStream stream = outputFile.create()) { + stream.write("test data".getBytes()); + } + + assertThat(fs.exists(testPath)).isTrue(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactoryReplication.java b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactoryReplication.java new file mode 100644 index 0000000000..e570c67025 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactoryReplication.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Tests for OutputFileFactory's replication factor support. + * + *

Note: These tests verify that the builder pattern accepts replication factor parameters and + * constructs factories correctly. The actual file creation with replication is tested at the + * HadoopOutputFile level. + */ +public class TestOutputFileFactoryReplication { + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get())); + + @TempDir private File tempDir; + + @Test + public void testBuilderWithReplicationFactor() { + Configuration conf = new Configuration(); + HadoopFileIO fileIO = new HadoopFileIO(conf); + PartitionSpec spec = PartitionSpec.unpartitioned(); + + short replicationFactor = 3; + int partitionId = 1; + long taskId = 100L; + + // Verify that the builder accepts replication factor parameter + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + OutputFileFactory.Builder builder = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor(replicationFactor); + + assertThat(builder).isNotNull(); + + OutputFileFactory factory = builder.build(); + assertThat(factory).isNotNull(); + } + + @Test + public void testBuilderWithoutReplicationFactor() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + int partitionId = 1; + long taskId = 100L; + + // Verify that the builder works without replication factor (default) + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table2"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + OutputFileFactory.Builder builder = + OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET); + + assertThat(builder).isNotNull(); + + OutputFileFactory factory = builder.build(); + assertThat(factory).isNotNull(); + } + + @Test + public void testBuilderWithDifferentReplicationFactors() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + int partitionId = 1; + long taskId = 100L; + + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table3"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + // Test with replication factor 1 + OutputFileFactory factory1 = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor((short) 1) + .build(); + assertThat(factory1).isNotNull(); + + // Test with replication factor 5 + OutputFileFactory factory2 = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor((short) 5) + .build(); + assertThat(factory2).isNotNull(); + + // Test with maximum replication factor + OutputFileFactory factory3 = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor(Short.MAX_VALUE) + .build(); + assertThat(factory3).isNotNull(); + } + + @Test + public void testBuilderWithReplicationFactorAndSuffix() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + short replicationFactor = 3; + int partitionId = 1; + long taskId = 100L; + + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table4"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + // Verify that replication factor works with suffix option + OutputFileFactory factory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.PARQUET) + .replicationFactor(replicationFactor) + .suffix("deletes") + .build(); + + assertThat(factory).isNotNull(); + } + + @Test + public void testBuilderWithAllOptions() { + Configuration conf = new Configuration(); + PartitionSpec spec = PartitionSpec.unpartitioned(); + short replicationFactor = 2; + int partitionId = 1; + long taskId = 100L; + + org.apache.iceberg.hadoop.HadoopTables tables = + new org.apache.iceberg.hadoop.HadoopTables(conf); + String location = tempDir.getAbsolutePath() + "/test_table5"; + org.apache.iceberg.Table table = tables.create(SCHEMA, spec, location); + + // Test builder with all options including replication factor + OutputFileFactory factory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(FileFormat.AVRO) + .replicationFactor(replicationFactor) + .operationId("test-operation-id") + .suffix("test-suffix") + .build(); + + assertThat(factory).isNotNull(); + } +} diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java index 3d995cc4f0..0d202af0c8 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java @@ -320,6 +320,11 @@ public OutputFile newOutputFile(String path) { return Files.localOutput(path); } + @Override + public OutputFile newOutputFile(String path, Map properties) { + return Files.localOutput(path); + } + @Override public void deleteFile(String path) { File file = new File(path); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 824e3aca9a..c37c09c95d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -132,6 +132,15 @@ public String wapId() { return sessionConf.get(SparkSQLProperties.WAP_ID, null); } + public short deleteFileReplication() { + return (short) + confParser + .intConf() + .option(SparkWriteOptions.DELETE_FILE_REPLICATION) + .defaultValue(SparkWriteOptions.DEFAULT_DELETE_FILE_REPLICATION) + .parse(); + } + public boolean mergeSchema() { return confParser .booleanConf() diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 391cb6bae3..f5f3a4259b 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -23,6 +23,10 @@ public class SparkWriteOptions { private SparkWriteOptions() {} + public static final String DELETE_FILE_REPLICATION = "delete-file-replication"; + + public static final short DEFAULT_DELETE_FILE_REPLICATION = 3; + // Fileformat for write operations(default: Table write.format.default ) public static final String WRITE_FORMAT = "write-format"; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 08c06e85b5..d25e254faa 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -78,6 +78,7 @@ public class SparkPositionDeletesRewrite implements Write { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -112,6 +113,7 @@ public class SparkPositionDeletesRewrite implements Write { this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -137,7 +139,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { dsSchema, specId, partition, - writeProperties); + writeProperties, + replicationFactor); } @Override @@ -189,6 +192,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; PositionDeletesWriterFactory( Broadcast tableBroadcast, @@ -200,7 +204,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { StructType dsSchema, int specId, StructLike partition, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -211,6 +216,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.specId = specId; this.partition = partition; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -222,6 +228,7 @@ public DataWriter createWriter(int partitionId, long taskId) { .format(format) .operationId(queryId) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); Schema positionDeleteRowSchema = positionDeleteRowSchema(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index e0cd06de17..4674c2c340 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -105,6 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final SparkWriteRequirements writeRequirements; private final Context context; private final Map writeProperties; + private final short replicationFactor; private boolean cleanupOnAbort = true; @@ -131,6 +132,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -160,7 +162,8 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties); + return new PositionDeltaWriteFactory( + tableBroadcast, command, context, writeProperties, replicationFactor); } @Override @@ -341,16 +344,19 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Command command; private final Context context; private final Map writeProperties; + private final short replicationFactor; PositionDeltaWriteFactory( Broadcast
tableBroadcast, Command command, Context context, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -367,6 +373,7 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .format(context.deleteFileFormat()) .operationId(context.queryId()) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); SparkFileWriterFactory writerFactory = diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java index 35dfb55d5b..8bdc3d6eae 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java @@ -398,6 +398,11 @@ public OutputFile newOutputFile(String path) { return Files.localOutput(path); } + @Override + public OutputFile newOutputFile(String path, Map properties) { + return newOutputFile(path); + } + @Override public void deleteFile(String path) { File file = new File(path); diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java index e42707bf10..87ce1c883b 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java @@ -63,9 +63,11 @@ public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark { private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class); private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024; + private static final short DELETE_FILE_REPLICATION = (short) 3; protected static final int NUM_FILES = 1; protected static final int NUM_ROWS = 10 * 1000 * 1000; + private static final short DEFAULT_REPLICATION_FACTOR = 3; @Setup public void setupBenchmark() throws IOException { @@ -320,7 +322,11 @@ private void writeEqDeletes(List rows) throws IOException { } private OutputFileFactory newFileFactory() { - return OutputFileFactory.builderFor(table(), 1, 1).format(fileFormat()).build(); + // OutputFileFactory is used only for creating delete files. + return OutputFileFactory.builderFor(table(), 1, 1) + .replicationFactor(DEFAULT_REPLICATION_FACTOR) + .format(fileFormat()) + .build(); } private CharSequence noisePath(CharSequence path) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 07393a67fe..2bef37a8e2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -132,6 +132,15 @@ public boolean wapEnabled() { .parse(); } + public short deleteFileReplication() { + return (short) + confParser + .intConf() + .option(SparkWriteOptions.DELETE_FILE_REPLICATION) + .defaultValue(SparkWriteOptions.DEFAULT_DELETE_FILE_REPLICATION) + .parse(); + } + public String wapId() { return sessionConf.get(SparkSQLProperties.WAP_ID, null); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index d9c4f66b19..17f6250a49 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -20,9 +20,11 @@ /** Spark DF write options */ public class SparkWriteOptions { - private SparkWriteOptions() {} + public static final String DELETE_FILE_REPLICATION = "delete-file-replication"; + public static final short DEFAULT_DELETE_FILE_REPLICATION = 3; + // Fileformat for write operations(default: Table write.format.default ) public static final String WRITE_FORMAT = "write-format"; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index d917794758..00297b4099 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -78,6 +78,7 @@ public class SparkPositionDeletesRewrite implements Write { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -112,6 +113,7 @@ public class SparkPositionDeletesRewrite implements Write { this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -137,7 +139,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { dsSchema, specId, partition, - writeProperties); + writeProperties, + replicationFactor); } @Override @@ -189,6 +192,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final int specId; private final StructLike partition; private final Map writeProperties; + private final short replicationFactor; PositionDeletesWriterFactory( Broadcast
tableBroadcast, @@ -200,7 +204,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { StructType dsSchema, int specId, StructLike partition, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -211,6 +216,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.specId = specId; this.partition = partition; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -221,6 +227,7 @@ public DataWriter createWriter(int partitionId, long taskId) { OutputFileFactory.builderFor(table, partitionId, taskId) .format(format) .operationId(queryId) + .replicationFactor(replicationFactor) .suffix("deletes") .build(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 3f0f9c843b..833af8376a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -105,6 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final SparkWriteRequirements writeRequirements; private final Context context; private final Map writeProperties; + private final short replicationFactor; private boolean cleanupOnAbort = true; @@ -131,6 +132,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); this.writeProperties = writeConf.writeProperties(); + this.replicationFactor = writeConf.deleteFileReplication(); } @Override @@ -171,7 +173,8 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties); + return new PositionDeltaWriteFactory( + tableBroadcast, command, context, writeProperties, replicationFactor); } @Override @@ -352,16 +355,19 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Command command; private final Context context; private final Map writeProperties; + private final short replicationFactor; PositionDeltaWriteFactory( Broadcast
tableBroadcast, Command command, Context context, - Map writeProperties) { + Map writeProperties, + short replicationFactor) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; this.writeProperties = writeProperties; + this.replicationFactor = replicationFactor; } @Override @@ -378,6 +384,7 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .format(context.deleteFileFormat()) .operationId(context.queryId()) .suffix("deletes") + .replicationFactor(replicationFactor) .build(); SparkFileWriterFactory writerFactory = diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 908a3d3d3c..551e802103 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -102,6 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { private final boolean useFanoutWriter; private final SparkWriteRequirements writeRequirements; private final Map writeProperties; + private final short deleteFileReplication; private boolean cleanupOnAbort = true; @@ -133,6 +134,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); this.writeProperties = writeConf.writeProperties(); + this.deleteFileReplication = writeConf.deleteFileReplication(); } @Override diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java index 189c125b4e..cdf8fb1f94 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java @@ -464,6 +464,11 @@ public OutputFile newOutputFile(String path) { return Files.localOutput(path); } + @Override + public OutputFile newOutputFile(String path, Map properties) { + return Files.localOutput(path); + } + @Override public void deleteFile(String path) { File file = new File(path); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 9f4a4f47bf..e1a6a7cb67 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -545,4 +545,46 @@ private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) assertThat(writeConf.copyOnWriteDistributionMode(MERGE)).isEqualTo(expectedMode); assertThat(writeConf.positionDeltaDistributionMode(MERGE)).isEqualTo(expectedMode); } + + @TestTemplate + public void testDeleteFileReplicationDefault() { + Table table = validationCatalog.loadTable(tableIdent); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + // Default replication factor should be 3 as per DEFAULT_DELETE_FILE_REPLICATION + assertThat(writeConf.deleteFileReplication()).isEqualTo((short) 3); + } + + @TestTemplate + public void testDeleteFileReplicationFromWriteOption() { + Table table = validationCatalog.loadTable(tableIdent); + + Map writeOptions = + ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, "5"); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + assertThat(writeConf.deleteFileReplication()).isEqualTo((short) 5); + } + + @TestTemplate + public void testDeleteFileReplicationWithOne() { + Table table = validationCatalog.loadTable(tableIdent); + + Map writeOptions = + ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, "1"); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + assertThat(writeConf.deleteFileReplication()).isEqualTo((short) 1); + } + + @TestTemplate + public void testDeleteFileReplicationWithMaxValue() { + Table table = validationCatalog.loadTable(tableIdent); + + Map writeOptions = + ImmutableMap.of(SparkWriteOptions.DELETE_FILE_REPLICATION, String.valueOf(Short.MAX_VALUE)); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + assertThat(writeConf.deleteFileReplication()).isEqualTo(Short.MAX_VALUE); + } }