From 1509bffc40f2501a0bafd95d5801e2ac819ce026 Mon Sep 17 00:00:00 2001 From: Levi Jiang Date: Mon, 4 May 2026 16:22:41 -0700 Subject: [PATCH 1/3] Add flag to spark write abort to disable error suppress and retry --- .../org/apache/iceberg/TableProperties.java | 7 + .../spark/extensions/TestWriteAborts.java | 152 +++++++++++++++++- .../spark/source/SparkCleanupUtil.java | 85 +++++++--- .../iceberg/spark/source/SparkWrite.java | 69 ++++++-- 4 files changed, 277 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index c94376817b..a2d260dffd 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -296,6 +296,13 @@ private TableProperties() {} public static final String SPARK_WRITE_ACCEPT_ANY_SCHEMA = "write.spark.accept-any-schema"; public static final boolean SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT = false; + public static final String SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED = + "write.spark.abort.suppress-failure.enabled"; + public static final boolean SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED_DEFAULT = true; + + public static final String SPARK_WRITE_ABORT_RETRY_ENABLED = "write.spark.abort.retry.enabled"; + public static final boolean SPARK_WRITE_ABORT_RETRY_ENABLED_DEFAULT = true; + public static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled"; public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false; diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java index 4af9cee77e..8a4b7829bb 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.CatalogProperties; @@ -38,7 +39,11 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.assertj.core.api.Assertions; import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -80,9 +85,16 @@ public TestWriteAborts(String catalogName, String implementation, Map= 4); + } + + @Test + public void testAbortRetryDisabledByTableProperty() throws Exception { + // Bulk path is not affected by the retry flag. + Assume.assumeFalse(catalogName.equals("testhivebulk")); + + String dataLocation = temp.newFolder().toString(); + CustomFileIO.failDeletes = true; + + sql( + "CREATE TABLE %s (id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data)" + + "TBLPROPERTIES ('%s' '%s', '%s' '%s')", + tableName, + TableProperties.WRITE_DATA_LOCATION, + dataLocation, + TableProperties.SPARK_WRITE_ABORT_RETRY_ENABLED, + "false"); + + triggerFailingAppend(); + + // With retry disabled, each path should be attempted exactly once. We don't know the exact + // number of files written before the task failed, but it should be small (coalesce(1) on + // 4 input rows). The default retry would produce >= 4 attempts per file. + int attempts = CustomFileIO.deleteAttempts.get(); + Assert.assertTrue( + "Expected at least one delete attempt, got " + attempts, attempts >= 1); + Assert.assertTrue( + "Expected fewer than 4 delete attempts when retry is disabled, but got " + attempts, + attempts < 4); + } + + @Test + public void testAbortSuppressFailureDisabledByTableProperty() throws Exception { + String dataLocation = temp.newFolder().toString(); + CustomFileIO.failDeletes = true; + + sql( + "CREATE TABLE %s (id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data)" + + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')", + tableName, + TableProperties.WRITE_DATA_LOCATION, + dataLocation, + TableProperties.SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED, + "false", + // Disable retries to keep the failure surface small; the suppress-vs-throw decision is + // independent of retry behavior. + TableProperties.SPARK_WRITE_ABORT_RETRY_ENABLED, + "false"); + + List records = + ImmutableList.of( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b"), + new SimpleRecord(3, "a"), + new SimpleRecord(4, "b")); + Dataset inputDF = spark.createDataFrame(records, SimpleRecord.class); + + // With suppress disabled, the simulated FileIO failure should surface in the exception chain + // rather than being silently swallowed by the cleanup utility. + Assertions.assertThatThrownBy( + () -> { + try { + inputDF.coalesce(1).sortWithinPartitions("id").writeTo(tableName).append(); + } catch (NoSuchTableException e) { + throw new RuntimeException(e); + } + }) + .hasStackTraceContaining("simulated FileIO delete failure"); + } + + private void triggerFailingAppend() { + List records = + ImmutableList.of( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b"), + new SimpleRecord(3, "a"), + new SimpleRecord(4, "b")); + Dataset inputDF = spark.createDataFrame(records, SimpleRecord.class); + + AssertHelpers.assertThrows( + "Write must fail", + SparkException.class, + "Writing job aborted", + () -> { + try { + // incoming records are not ordered by partitions so the job must fail + inputDF.coalesce(1).sortWithinPartitions("id").writeTo(tableName).append(); + } catch (NoSuchTableException e) { + throw new RuntimeException(e); + } + }); + } + public static class CustomFileIO implements FileIO { + static final AtomicInteger deleteAttempts = new AtomicInteger(0); + static volatile boolean failDeletes = false; + private final FileIO delegate = new HadoopFileIO(new Configuration()); public CustomFileIO() {} @@ -149,6 +285,10 @@ public OutputFile newOutputFile(String path) { @Override public void deleteFile(String path) { + deleteAttempts.incrementAndGet(); + if (failDeletes) { + throw new RuntimeException("simulated FileIO delete failure"); + } delegate.deleteFile(path); } @@ -179,8 +319,18 @@ public void deleteFile(String path) { @Override public void deleteFiles(Iterable paths) throws BulkDeletionFailureException { + int count = 0; for (String path : paths) { - delegate().deleteFile(path); + count++; + deleteAttempts.incrementAndGet(); + if (!failDeletes) { + delegate().deleteFile(path); + } + } + if (failDeletes) { + BulkDeletionFailureException ex = new BulkDeletionFailureException(count); + ex.initCause(new RuntimeException("simulated FileIO delete failure")); + throw ex; } } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index a103a50032..e1fe293ae0 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -48,13 +48,20 @@ private SparkCleanupUtil() {} * Attempts to delete as many files produced by a task as possible. * *

Note this method will log Spark task info and is supposed to be called only on executors. - * Use {@link #deleteFiles(String, FileIO, List)} to delete files on the driver. + * Use {@link #deleteFiles(String, FileIO, List, boolean, boolean)} to delete files on the driver. * * @param io a {@link FileIO} instance used for deleting files * @param files a list of files to delete + * @param suppressFailure whether to suppress per-file failures and continue; when false, the + * first failure is rethrown after the run completes + * @param enableRetry whether to retry individual file deletions on transient failures */ - public static void deleteTaskFiles(FileIO io, List> files) { - deleteFiles(taskInfo(), io, files); + public static void deleteTaskFiles( + FileIO io, + List> files, + boolean suppressFailure, + boolean enableRetry) { + deleteFiles(taskInfo(), io, files, suppressFailure, enableRetry); } // the format matches what Spark uses for internal logging @@ -79,22 +86,36 @@ private static String taskInfo() { * @param context a helpful description of the operation invoking this method * @param io a {@link FileIO} instance used for deleting files * @param files a list of files to delete + * @param suppressFailure whether to suppress per-file failures and continue; when false, the + * first failure is rethrown after the run completes + * @param enableRetry whether to retry individual file deletions on transient failures */ - public static void deleteFiles(String context, FileIO io, List> files) { + public static void deleteFiles( + String context, + FileIO io, + List> files, + boolean suppressFailure, + boolean enableRetry) { List paths = Lists.transform(files, file -> file.path().toString()); - deletePaths(context, io, paths); + deletePaths(context, io, paths, suppressFailure, enableRetry); } - private static void deletePaths(String context, FileIO io, List paths) { + private static void deletePaths( + String context, + FileIO io, + List paths, + boolean suppressFailure, + boolean enableRetry) { if (io instanceof SupportsBulkOperations) { SupportsBulkOperations bulkIO = (SupportsBulkOperations) io; - bulkDelete(context, bulkIO, paths); + bulkDelete(context, bulkIO, paths, suppressFailure); } else { - delete(context, io, paths); + delete(context, io, paths, suppressFailure, enableRetry); } } - private static void bulkDelete(String context, SupportsBulkOperations io, List paths) { + private static void bulkDelete( + String context, SupportsBulkOperations io, List paths, boolean suppressFailure) { try { io.deleteFiles(paths); LOG.info("Deleted {} file(s) using bulk deletes ({})", paths.size(), context); @@ -106,28 +127,40 @@ private static void bulkDelete(String context, SupportsBulkOperations io, List paths) { + private static void delete( + String context, FileIO io, List paths, boolean suppressFailure, boolean enableRetry) { AtomicInteger deletedFilesCount = new AtomicInteger(0); - Tasks.foreach(paths) - .executeWith(ThreadPools.getWorkerPool()) - .stopRetryOn(NotFoundException.class) - .suppressFailureWhenFinished() - .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc)) - .retry(DELETE_NUM_RETRIES) - .exponentialBackoff( - DELETE_MIN_RETRY_WAIT_MS, - DELETE_MAX_RETRY_WAIT_MS, - DELETE_TOTAL_RETRY_TIME_MS, - 2 /* exponential */) - .run( - path -> { - io.deleteFile(path); - deletedFilesCount.incrementAndGet(); - }); + Tasks.Builder builder = + Tasks.foreach(paths) + .executeWith(ThreadPools.getWorkerPool()) + .stopRetryOn(NotFoundException.class) + .throwFailureWhenFinished(!suppressFailure) + .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc)); + + if (enableRetry) { + builder + .retry(DELETE_NUM_RETRIES) + .exponentialBackoff( + DELETE_MIN_RETRY_WAIT_MS, + DELETE_MAX_RETRY_WAIT_MS, + DELETE_TOTAL_RETRY_TIME_MS, + 2 /* exponential */); + } else { + builder.noRetry(); + } + + builder.run( + path -> { + io.deleteFile(path); + deletedFilesCount.incrementAndGet(); + }); if (deletedFilesCount.get() < paths.size()) { LOG.warn("Deleted only {} of {} file(s) ({})", deletedFilesCount, paths.size(), context); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 442c0cfe0d..dbe4c109b5 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -40,6 +40,7 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.expressions.Expression; @@ -57,6 +58,7 @@ import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; import org.apache.spark.api.java.JavaSparkContext; @@ -92,6 +94,8 @@ class SparkWrite { private final StructType dsSchema; private final Map extraSnapshotMetadata; private final boolean partitionedFanoutEnabled; + private final boolean suppressFailureOnAbort; + private final boolean retryOnAbort; /** Pending schema update when mergeSchema was used; committed in commit() before data commit. */ private final UpdateSchema pendingSchemaUpdate; @@ -143,6 +147,16 @@ class SparkWrite { this.dsSchema = dsSchema; this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled(); + this.suppressFailureOnAbort = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED, + TableProperties.SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED_DEFAULT); + this.retryOnAbort = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SPARK_WRITE_ABORT_RETRY_ENABLED, + TableProperties.SPARK_WRITE_ABORT_RETRY_ENABLED_DEFAULT); this.pendingSchemaUpdate = pendingSchemaUpdate; } @@ -189,7 +203,14 @@ private WriterFactory createWriterFactory() { Broadcast tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); return new WriterFactory( - tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled); + tableBroadcast, + format, + targetFileSize, + writeSchema, + dsSchema, + partitionedFanoutEnabled, + suppressFailureOnAbort, + retryOnAbort); } private void commitOperation(SnapshotUpdate operation, String description) { @@ -230,7 +251,8 @@ private void commitOperation(SnapshotUpdate operation, String description) { private void abort(WriterCommitMessage[] messages) { if (cleanupOnAbort) { - SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + SparkCleanupUtil.deleteFiles( + "job abort", table.io(), files(messages), suppressFailureOnAbort, retryOnAbort); } else { LOG.warn("Skipping cleanup of written files"); } @@ -577,6 +599,8 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr private final Schema writeSchema; private final StructType dsSchema; private final boolean partitionedFanoutEnabled; + private final boolean suppressFailureOnAbort; + private final boolean retryOnAbort; protected WriterFactory( Broadcast
tableBroadcast, @@ -584,13 +608,17 @@ protected WriterFactory( long targetFileSize, Schema writeSchema, StructType dsSchema, - boolean partitionedFanoutEnabled) { + boolean partitionedFanoutEnabled, + boolean suppressFailureOnAbort, + boolean retryOnAbort) { this.tableBroadcast = tableBroadcast; this.format = format; this.targetFileSize = targetFileSize; this.writeSchema = writeSchema; this.dsSchema = dsSchema; this.partitionedFanoutEnabled = partitionedFanoutEnabled; + this.suppressFailureOnAbort = suppressFailureOnAbort; + this.retryOnAbort = retryOnAbort; } @Override @@ -614,7 +642,14 @@ public DataWriter createWriter(int partitionId, long taskId, long e .build(); if (spec.isUnpartitioned()) { - return new UnpartitionedDataWriter(writerFactory, fileFactory, io, spec, targetFileSize); + return new UnpartitionedDataWriter( + writerFactory, + fileFactory, + io, + spec, + targetFileSize, + suppressFailureOnAbort, + retryOnAbort); } else { return new PartitionedDataWriter( @@ -625,7 +660,9 @@ public DataWriter createWriter(int partitionId, long taskId, long e writeSchema, dsSchema, targetFileSize, - partitionedFanoutEnabled); + partitionedFanoutEnabled, + suppressFailureOnAbort, + retryOnAbort); } } } @@ -633,16 +670,22 @@ public DataWriter createWriter(int partitionId, long taskId, long e private static class UnpartitionedDataWriter implements DataWriter { private final FileWriter delegate; private final FileIO io; + private final boolean suppressFailureOnAbort; + private final boolean retryOnAbort; private UnpartitionedDataWriter( SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory, FileIO io, PartitionSpec spec, - long targetFileSize) { + long targetFileSize, + boolean suppressFailureOnAbort, + boolean retryOnAbort) { this.delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null); this.io = io; + this.suppressFailureOnAbort = suppressFailureOnAbort; + this.retryOnAbort = retryOnAbort; } @Override @@ -665,7 +708,8 @@ public void abort() throws IOException { close(); DataWriteResult result = delegate.result(); - SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles()); + SparkCleanupUtil.deleteTaskFiles( + io, result.dataFiles(), suppressFailureOnAbort, retryOnAbort); } @Override @@ -680,6 +724,8 @@ private static class PartitionedDataWriter implements DataWriter { private final PartitionSpec spec; private final PartitionKey partitionKey; private final InternalRowWrapper internalRowWrapper; + private final boolean suppressFailureOnAbort; + private final boolean retryOnAbort; private PartitionedDataWriter( SparkFileWriterFactory writerFactory, @@ -689,7 +735,9 @@ private PartitionedDataWriter( Schema dataSchema, StructType dataSparkType, long targetFileSize, - boolean fanoutEnabled) { + boolean fanoutEnabled, + boolean suppressFailureOnAbort, + boolean retryOnAbort) { if (fanoutEnabled) { this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize); } else { @@ -699,6 +747,8 @@ private PartitionedDataWriter( this.spec = spec; this.partitionKey = new PartitionKey(spec, dataSchema); this.internalRowWrapper = new InternalRowWrapper(dataSparkType); + this.suppressFailureOnAbort = suppressFailureOnAbort; + this.retryOnAbort = retryOnAbort; } @Override @@ -722,7 +772,8 @@ public void abort() throws IOException { close(); DataWriteResult result = delegate.result(); - SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles()); + SparkCleanupUtil.deleteTaskFiles( + io, result.dataFiles(), suppressFailureOnAbort, retryOnAbort); } @Override From 379a035960023936f704cf76dbbb622ebb6cba3f Mon Sep 17 00:00:00 2001 From: Levi Jiang Date: Mon, 4 May 2026 16:23:37 -0700 Subject: [PATCH 2/3] fix --- .../org/apache/iceberg/spark/source/SparkCleanupUtil.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index e1fe293ae0..dc7b12a05b 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -101,11 +101,7 @@ public static void deleteFiles( } private static void deletePaths( - String context, - FileIO io, - List paths, - boolean suppressFailure, - boolean enableRetry) { + String context, FileIO io, List paths, boolean suppressFailure, boolean enableRetry) { if (io instanceof SupportsBulkOperations) { SupportsBulkOperations bulkIO = (SupportsBulkOperations) io; bulkDelete(context, bulkIO, paths, suppressFailure); From bfb37e5eba60c3e06160f803acd00d478c76e1ee Mon Sep 17 00:00:00 2001 From: Levi Jiang Date: Mon, 4 May 2026 16:27:45 -0700 Subject: [PATCH 3/3] fix style --- .../org/apache/iceberg/spark/extensions/TestWriteAborts.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java index 8a4b7829bb..978eded3d7 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java @@ -189,8 +189,7 @@ public void testAbortRetryDisabledByTableProperty() throws Exception { // number of files written before the task failed, but it should be small (coalesce(1) on // 4 input rows). The default retry would produce >= 4 attempts per file. int attempts = CustomFileIO.deleteAttempts.get(); - Assert.assertTrue( - "Expected at least one delete attempt, got " + attempts, attempts >= 1); + Assert.assertTrue("Expected at least one delete attempt, got " + attempts, attempts >= 1); Assert.assertTrue( "Expected fewer than 4 delete attempts when retry is disabled, but got " + attempts, attempts < 4);