Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,13 @@ private TableProperties() {}
public static final String SPARK_WRITE_ACCEPT_ANY_SCHEMA = "write.spark.accept-any-schema";
public static final boolean SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT = false;

// Table property key for the Spark write-abort "suppress failure" switch; defaults to true.
// NOTE(review): presumably decides whether failures while deleting files during an aborted Spark
// write are swallowed or surfaced; the code that reads this property is not visible here -- confirm.
public static final String SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED =
"write.spark.abort.suppress-failure.enabled";
public static final boolean SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED_DEFAULT = true;

// Table property key for the Spark write-abort "retry" switch; defaults to true.
// NOTE(review): presumably decides whether individual file deletions are retried while cleaning up
// an aborted Spark write; the code that reads this property is not visible here -- confirm.
public static final String SPARK_WRITE_ABORT_RETRY_ENABLED = "write.spark.abort.retry.enabled";
public static final boolean SPARK_WRITE_ABORT_RETRY_ENABLED_DEFAULT = true;

public static final String SNAPSHOT_ID_INHERITANCE_ENABLED =
"compatibility.snapshot-id-inheritance.enabled";
public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
Expand All @@ -38,7 +39,11 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -80,9 +85,16 @@ public TestWriteAborts(String catalogName, String implementation, Map<String, St
super(catalogName, implementation, config);
}

@Before
public void resetCustomIOState() {
  // CustomFileIO keeps its fault-injection state in static fields, so each test has to start
  // from a known baseline: injected delete failures off, attempt counter at zero.
  CustomFileIO.failDeletes = false;
  CustomFileIO.deleteAttempts.set(0);
}

/**
 * Cleans up after each test: turns off the injected delete failures and drops the test table.
 *
 * <p>The failure flag is cleared before the drop so that (a) the drop itself never runs with
 * injected FileIO delete failures still armed, and (b) the static flag is reset even when the
 * DROP statement throws.
 */
@After
public void removeTables() {
  CustomFileIO.failDeletes = false;
  sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
Expand Down Expand Up @@ -127,8 +139,131 @@ public void testBatchAppend() throws Exception {
catalogName, tableName, System.currentTimeMillis() + 5000, dataLocation));
}

/**
 * Verifies that, without any abort-related table properties set, failed file deletions are
 * attempted repeatedly while an aborted write is cleaned up.
 */
@Test
public void testAbortRetriesByDefault() throws Exception {
  // The bulk FileIO gets all paths in one deleteFiles call and is not retried per file, so the
  // attempt-count assertion below only makes sense for the non-bulk FileIO.
  Assume.assumeFalse(catalogName.equals("testhivebulk"));

  String dataLocation = temp.newFolder().toString();

  // from here on every delete issued through CustomFileIO fails
  CustomFileIO.failDeletes = true;

  sql(
      "CREATE TABLE %s (id INT, data STRING) USING iceberg "
          + "PARTITIONED BY (data)"
          + "TBLPROPERTIES ('%s' '%s')",
      tableName,
      TableProperties.WRITE_DATA_LOCATION,
      dataLocation);

  triggerFailingAppend();

  // One initial attempt plus the retries: the expectation of 3 retries (4 attempts per path) is
  // carried over from the original test comment.
  int attempts = CustomFileIO.deleteAttempts.get();
  String failureMessage =
      "Expected at least 4 delete attempts when retry is enabled by default, but got " + attempts;
  Assert.assertTrue(failureMessage, attempts >= 4);
}

/**
 * Verifies that setting {@link TableProperties#SPARK_WRITE_ABORT_RETRY_ENABLED} to false on the
 * table keeps the number of delete attempts during abort cleanup below the retrying case.
 */
@Test
public void testAbortRetryDisabledByTableProperty() throws Exception {
  // The retry flag has no effect on the bulk delete path, so skip the bulk catalog.
  Assume.assumeFalse(catalogName.equals("testhivebulk"));

  String dataLocation = temp.newFolder().toString();

  // from here on every delete issued through CustomFileIO fails
  CustomFileIO.failDeletes = true;

  sql(
      "CREATE TABLE %s (id INT, data STRING) USING iceberg "
          + "PARTITIONED BY (data)"
          + "TBLPROPERTIES ('%s' '%s', '%s' '%s')",
      tableName,
      TableProperties.WRITE_DATA_LOCATION,
      dataLocation,
      TableProperties.SPARK_WRITE_ABORT_RETRY_ENABLED,
      "false");

  triggerFailingAppend();

  // Without retries each path is tried once. The exact number of files written before the task
  // failed is unknown, but it is expected to be small (coalesce(1) over 4 input rows), whereas
  // the default retrying behavior would yield at least 4 attempts per file.
  int attempts = CustomFileIO.deleteAttempts.get();

  String tooFew = "Expected at least one delete attempt, got " + attempts;
  Assert.assertTrue(tooFew, attempts >= 1);

  String tooMany =
      "Expected fewer than 4 delete attempts when retry is disabled, but got " + attempts;
  Assert.assertTrue(tooMany, attempts < 4);
}

/**
 * Verifies that setting {@link TableProperties#SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED} to
 * false makes the injected FileIO delete failure visible in the exception thrown by the write.
 */
@Test
public void testAbortSuppressFailureDisabledByTableProperty() throws Exception {
  String dataLocation = temp.newFolder().toString();

  // from here on every delete issued through CustomFileIO fails
  CustomFileIO.failDeletes = true;

  // Retries are switched off as well to keep the failure surface small; whether a failure is
  // suppressed or thrown does not depend on the retry behavior.
  sql(
      "CREATE TABLE %s (id INT, data STRING) USING iceberg "
          + "PARTITIONED BY (data)"
          + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
      tableName,
      TableProperties.WRITE_DATA_LOCATION,
      dataLocation,
      TableProperties.SPARK_WRITE_ABORT_SUPPRESS_FAILURE_ENABLED,
      "false",
      TableProperties.SPARK_WRITE_ABORT_RETRY_ENABLED,
      "false");

  Dataset<Row> input =
      spark.createDataFrame(
          ImmutableList.of(
              new SimpleRecord(1, "a"),
              new SimpleRecord(2, "b"),
              new SimpleRecord(3, "a"),
              new SimpleRecord(4, "b")),
          SimpleRecord.class);

  // With suppression off, the simulated delete failure must show up somewhere in the stack trace
  // of the thrown exception instead of being swallowed during cleanup.
  Assertions.assertThatThrownBy(
          () -> {
            try {
              input.coalesce(1).sortWithinPartitions("id").writeTo(tableName).append();
            } catch (NoSuchTableException missingTable) {
              throw new RuntimeException(missingTable);
            }
          })
      .hasStackTraceContaining("simulated FileIO delete failure");
}

/**
 * Appends four rows to {@code tableName} in a way that is expected to abort the write job, and
 * asserts that the append fails with a {@link SparkException} mentioning "Writing job aborted".
 */
private void triggerFailingAppend() {
  Dataset<Row> input =
      spark.createDataFrame(
          ImmutableList.of(
              new SimpleRecord(1, "a"),
              new SimpleRecord(2, "b"),
              new SimpleRecord(3, "a"),
              new SimpleRecord(4, "b")),
          SimpleRecord.class);

  AssertHelpers.assertThrows(
      "Write must fail",
      SparkException.class,
      "Writing job aborted",
      () -> {
        try {
          // rows are sorted by id, not grouped by the partition column, so the job must fail
          input.coalesce(1).sortWithinPartitions("id").writeTo(tableName).append();
        } catch (NoSuchTableException missingTable) {
          throw new RuntimeException(missingTable);
        }
      });
}

public static class CustomFileIO implements FileIO {

static final AtomicInteger deleteAttempts = new AtomicInteger(0);
static volatile boolean failDeletes = false;

private final FileIO delegate = new HadoopFileIO(new Configuration());

public CustomFileIO() {}
Expand All @@ -149,6 +284,10 @@ public OutputFile newOutputFile(String path) {

/**
 * Counts the delete attempt and then either delegates the delete or, when {@code failDeletes}
 * is set, throws the simulated failure without touching the file.
 */
@Override
public void deleteFile(String path) {
  // count first so that injected failures are recorded as attempts too
  deleteAttempts.incrementAndGet();

  if (!failDeletes) {
    delegate.deleteFile(path);
    return;
  }

  throw new RuntimeException("simulated FileIO delete failure");
}

Expand Down Expand Up @@ -179,8 +318,18 @@ public void deleteFile(String path) {

/**
 * Bulk delete used by the test FileIO: walks {@code paths}, records one delete attempt per path
 * in {@code deleteAttempts}, and, when {@code failDeletes} is set after the loop, throws a
 * {@link BulkDeletionFailureException} whose cause is the simulated failure.
 *
 * @param paths the paths to delete
 * @throws BulkDeletionFailureException when {@code failDeletes} is set once all paths were seen
 */
@Override
public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException {
int count = 0;
for (String path : paths) {
// NOTE(review): this unconditional delete duplicates the guarded delete below; it looks like the
// pre-change line retained by the diff rendering rather than intended code -- confirm.
delegate().deleteFile(path);
count++;
deleteAttempts.incrementAndGet();
// only touch the underlying storage when failures are not being simulated
if (!failDeletes) {
delegate().deleteFile(path);
}
}
if (failDeletes) {
// count equals the number of paths iterated; presumably interpreted by the exception as the
// number of failed deletions -- confirm against BulkDeletionFailureException
BulkDeletionFailureException ex = new BulkDeletionFailureException(count);
ex.initCause(new RuntimeException("simulated FileIO delete failure"));
throw ex;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,20 @@ private SparkCleanupUtil() {}
* Attempts to delete as many files produced by a task as possible.
*
* <p>Note this method will log Spark task info and is supposed to be called only on executors.
* Use {@link #deleteFiles(String, FileIO, List)} to delete files on the driver.
* Use {@link #deleteFiles(String, FileIO, List, boolean, boolean)} to delete files on the driver.
*
* @param io a {@link FileIO} instance used for deleting files
* @param files a list of files to delete
* @param suppressFailure whether to suppress per-file failures and continue; when false, the
* first failure is rethrown after the run completes
* @param enableRetry whether to retry individual file deletions on transient failures
*/
public static void deleteTaskFiles(FileIO io, List<? extends ContentFile<?>> files) {
deleteFiles(taskInfo(), io, files);
public static void deleteTaskFiles(
FileIO io,
List<? extends ContentFile<?>> files,
boolean suppressFailure,
boolean enableRetry) {
deleteFiles(taskInfo(), io, files, suppressFailure, enableRetry);
}

// the format matches what Spark uses for internal logging
Expand All @@ -79,22 +86,32 @@ private static String taskInfo() {
* @param context a helpful description of the operation invoking this method
* @param io a {@link FileIO} instance used for deleting files
* @param files a list of files to delete
* @param suppressFailure whether to suppress per-file failures and continue; when false, the
* first failure is rethrown after the run completes
* @param enableRetry whether to retry individual file deletions on transient failures
*/
public static void deleteFiles(String context, FileIO io, List<? extends ContentFile<?>> files) {
public static void deleteFiles(
String context,
FileIO io,
List<? extends ContentFile<?>> files,
boolean suppressFailure,
boolean enableRetry) {
List<String> paths = Lists.transform(files, file -> file.path().toString());
deletePaths(context, io, paths);
deletePaths(context, io, paths, suppressFailure, enableRetry);
}

private static void deletePaths(String context, FileIO io, List<String> paths) {
private static void deletePaths(
String context, FileIO io, List<String> paths, boolean suppressFailure, boolean enableRetry) {
if (io instanceof SupportsBulkOperations) {
SupportsBulkOperations bulkIO = (SupportsBulkOperations) io;
bulkDelete(context, bulkIO, paths);
bulkDelete(context, bulkIO, paths, suppressFailure);
} else {
delete(context, io, paths);
delete(context, io, paths, suppressFailure, enableRetry);
}
}

private static void bulkDelete(String context, SupportsBulkOperations io, List<String> paths) {
private static void bulkDelete(
String context, SupportsBulkOperations io, List<String> paths, boolean suppressFailure) {
try {
io.deleteFiles(paths);
LOG.info("Deleted {} file(s) using bulk deletes ({})", paths.size(), context);
Expand All @@ -106,28 +123,40 @@ private static void bulkDelete(String context, SupportsBulkOperations io, List<S
deletedFilesCount,
paths.size(),
context);
if (!suppressFailure) {
throw e;
}
}
}

private static void delete(String context, FileIO io, List<String> paths) {
private static void delete(
String context, FileIO io, List<String> paths, boolean suppressFailure, boolean enableRetry) {
AtomicInteger deletedFilesCount = new AtomicInteger(0);

Tasks.foreach(paths)
.executeWith(ThreadPools.getWorkerPool())
.stopRetryOn(NotFoundException.class)
.suppressFailureWhenFinished()
.onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc))
.retry(DELETE_NUM_RETRIES)
.exponentialBackoff(
DELETE_MIN_RETRY_WAIT_MS,
DELETE_MAX_RETRY_WAIT_MS,
DELETE_TOTAL_RETRY_TIME_MS,
2 /* exponential */)
.run(
path -> {
io.deleteFile(path);
deletedFilesCount.incrementAndGet();
});
Tasks.Builder<String> builder =
Tasks.foreach(paths)
.executeWith(ThreadPools.getWorkerPool())
.stopRetryOn(NotFoundException.class)
.throwFailureWhenFinished(!suppressFailure)
.onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc));

if (enableRetry) {
builder
.retry(DELETE_NUM_RETRIES)
.exponentialBackoff(
DELETE_MIN_RETRY_WAIT_MS,
DELETE_MAX_RETRY_WAIT_MS,
DELETE_TOTAL_RETRY_TIME_MS,
2 /* exponential */);
} else {
builder.noRetry();
}

builder.run(
path -> {
io.deleteFile(path);
deletedFilesCount.incrementAndGet();
});

if (deletedFilesCount.get() < paths.size()) {
LOG.warn("Deleted only {} of {} file(s) ({})", deletedFilesCount, paths.size(), context);
Expand Down
Loading
Loading