diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java index d45bbfb4a667..af7a11f5a5d9 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java +++ b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -60,7 +61,7 @@ abstract class BaseCommitService implements Closeable { private final int rewritesPerCommit; private final AtomicBoolean running = new AtomicBoolean(false); private final long timeoutInMS; - private int succeededCommits = 0; + private final AtomicInteger succeededCommits = new AtomicInteger(0); /** * Constructs a {@link BaseCommitService} @@ -228,7 +229,7 @@ private void commitReadyCommitGroups() { try { commitOrClean(batch); committedRewrites.addAll(batch); - succeededCommits++; + succeededCommits.incrementAndGet(); } catch (Exception e) { LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e); } @@ -237,7 +238,7 @@ private void commitReadyCommitGroups() { } public int succeededCommits() { - return succeededCommits; + return succeededCommits.get(); } @VisibleForTesting diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 2f5cc25074ea..419aa000f059 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -342,7 +342,8 @@ private Builder doExecuteWithPartialProgress( // stop commit service commitService.close(); - int totalCommits = Math.min(plan.totalGroupCount(), maxCommits); + int totalCommits = + IntMath.divide(plan.totalGroupCount(), groupsPerCommit, RoundingMode.CEILING); int failedCommits = totalCommits - commitService.succeededCommits(); if (failedCommits > 0 && failedCommits <= maxFailedCommits) { LOG.warn( diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index d74d8a29f994..8c678872b0de 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1441,6 +1441,40 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount() shouldHaveACleanCache(table); } + @TestTemplate + public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + table.updateProperties().set(COMMIT_NUM_RETRIES, "10").commit(); + + List originalData = currentData(); + + RewriteDataFilesSparkAction rewrite = + basicRewrite(table) + .option( + RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "2") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + // Since we can have at most one commit per 2 file groups and there + // are only 10 file groups, actual number of commits is 5. + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "7") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0"); + + RewriteDataFiles.Result result = rewrite.execute(); + assertThat(result.rewriteResults()).hasSize(10); + assertThat(result.rewriteFailures()).hasSize(0); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and max commits of 7, we have 2 groups per commit. + // That produces 5 rewrite commits plus the initial snapshot (6 snapshots total). + shouldHaveSnapshots(table, 6); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + @TestTemplate public void testInvalidOptions() { Table table = createTable(20); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 3415b6a551ae..5a3091245ce2 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -342,7 +342,8 @@ private Builder doExecuteWithPartialProgress( // stop commit service commitService.close(); - int totalCommits = Math.min(plan.totalGroupCount(), maxCommits); + int totalCommits = + IntMath.divide(plan.totalGroupCount(), groupsPerCommit, RoundingMode.CEILING); int failedCommits = totalCommits - commitService.succeededCommits(); if (failedCommits > 0 && failedCommits <= maxFailedCommits) { LOG.warn( diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 38ddefd26a45..e45ddaa6f351 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1442,6 +1442,40 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount() shouldHaveACleanCache(table); } + @TestTemplate + public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + table.updateProperties().set(COMMIT_NUM_RETRIES, "10").commit(); + + List originalData = currentData(); + + RewriteDataFilesSparkAction rewrite = + basicRewrite(table) + .option( + RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "2") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + // Since we can have at most one commit per 2 file groups and there + // are only 10 file groups, actual number of commits is 5. + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "7") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0"); + + RewriteDataFiles.Result result = rewrite.execute(); + assertThat(result.rewriteResults()).hasSize(10); + assertThat(result.rewriteFailures()).hasSize(0); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and max commits of 7, we have 2 groups per commit. + // That produces 5 rewrite commits plus the initial snapshot (6 snapshots total). + shouldHaveSnapshots(table, 6); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + @TestTemplate public void testInvalidOptions() { Table table = createTable(20); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 3415b6a551ae..5a3091245ce2 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -342,7 +342,8 @@ private Builder doExecuteWithPartialProgress( // stop commit service commitService.close(); - int totalCommits = Math.min(plan.totalGroupCount(), maxCommits); + int totalCommits = + IntMath.divide(plan.totalGroupCount(), groupsPerCommit, RoundingMode.CEILING); int failedCommits = totalCommits - commitService.succeededCommits(); if (failedCommits > 0 && failedCommits <= maxFailedCommits) { LOG.warn( diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 110e43ede1f9..a06ccfb8f910 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1442,6 +1442,40 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount() shouldHaveACleanCache(table); } + @TestTemplate + public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + table.updateProperties().set(COMMIT_NUM_RETRIES, "10").commit(); + + List originalData = currentData(); + + RewriteDataFilesSparkAction rewrite = + basicRewrite(table) + .option( + RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "2") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + // Since we can have at most one commit per 2 file groups and there + // are only 10 file groups, actual number of commits is 5. + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "7") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0"); + + RewriteDataFiles.Result result = rewrite.execute(); + assertThat(result.rewriteResults()).hasSize(10); + assertThat(result.rewriteFailures()).hasSize(0); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and max commits of 7, we have 2 groups per commit. + // That produces 5 rewrite commits plus the initial snapshot (6 snapshots total). + shouldHaveSnapshots(table, 6); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + @TestTemplate public void testInvalidOptions() { Table table = createTable(20);