From 69d1156e2f29575929582a21b06e10aa1031f975 Mon Sep 17 00:00:00 2001 From: Kai-Hsiang Hsu Date: Fri, 26 Jun 2026 17:59:01 +0800 Subject: [PATCH 1/3] Core, Spark: Fix incorrect partial commit failure count in rewrite data files The rewrite data files procedure reported partial commit failures even when all commits succeeded, because it derived the total commit count as min(totalGroupCount, maxCommits). When groups are batched into groupsPerCommit = ceil(totalGroupCount / maxCommits) per commit, the actual number of commits is ceil(totalGroupCount / groupsPerCommit), which can be smaller. The mismatch made totalCommits - succeededCommits positive even with zero failures. Compute totalCommits as ceil(totalGroupCount / groupsPerCommit) so the failed commit count is correct. Also make succeededCommits an AtomicInteger since it is incremented from multiple rewrite threads. Co-Authored-By: Claude Opus 4.8 --- .../java/org/apache/iceberg/actions/BaseCommitService.java | 7 ++++--- .../iceberg/spark/actions/RewriteDataFilesSparkAction.java | 3 ++- .../iceberg/spark/actions/RewriteDataFilesSparkAction.java | 3 ++- .../iceberg/spark/actions/RewriteDataFilesSparkAction.java | 3 ++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java index d45bbfb4a667..af7a11f5a5d9 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java +++ b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -60,7 +61,7 @@ abstract class BaseCommitService implements Closeable { private final int rewritesPerCommit; private final AtomicBoolean running = new AtomicBoolean(false); private final long timeoutInMS; - private int succeededCommits = 0; + private final AtomicInteger succeededCommits = new AtomicInteger(0); /** * Constructs a {@link BaseCommitService} @@ -228,7 +229,7 @@ private void commitReadyCommitGroups() { try { commitOrClean(batch); committedRewrites.addAll(batch); - succeededCommits++; + succeededCommits.incrementAndGet(); } catch (Exception e) { LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e); } @@ -237,7 +238,7 @@ private void commitReadyCommitGroups() { } public int succeededCommits() { - return succeededCommits; + return succeededCommits.get(); } @VisibleForTesting diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 2f5cc25074ea..419aa000f059 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -342,7 +342,8 @@ private Builder doExecuteWithPartialProgress( // stop commit service commitService.close(); - int totalCommits = Math.min(plan.totalGroupCount(), maxCommits); + int totalCommits = + IntMath.divide(plan.totalGroupCount(), groupsPerCommit, RoundingMode.CEILING); int failedCommits = totalCommits - commitService.succeededCommits(); if (failedCommits > 0 && failedCommits <= maxFailedCommits) { LOG.warn( diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 3415b6a551ae..5a3091245ce2 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -342,7 +342,8 @@ private Builder doExecuteWithPartialProgress( // stop commit service commitService.close(); - int totalCommits = Math.min(plan.totalGroupCount(), maxCommits); + int totalCommits = + IntMath.divide(plan.totalGroupCount(), groupsPerCommit, RoundingMode.CEILING); int failedCommits = totalCommits - commitService.succeededCommits(); if (failedCommits > 0 && failedCommits <= maxFailedCommits) { LOG.warn( diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 3415b6a551ae..5a3091245ce2 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -342,7 +342,8 @@ private Builder doExecuteWithPartialProgress( // stop commit service commitService.close(); - int totalCommits = Math.min(plan.totalGroupCount(), maxCommits); + int totalCommits = + IntMath.divide(plan.totalGroupCount(), groupsPerCommit, RoundingMode.CEILING); int failedCommits = totalCommits - commitService.succeededCommits(); if (failedCommits > 0 && failedCommits <= maxFailedCommits) { LOG.warn( From c4c6538b4b54759b4b0e342de625b6d74971a515 Mon Sep 17 00:00:00 2001 From: Kai-Hsiang Hsu Date: Fri, 26 Jun 2026 20:16:26 +0800 Subject: [PATCH 2/3] add integration tests --- .../actions/TestRewriteDataFilesAction.java | 34 +++++++++++++++++++ .../actions/TestRewriteDataFilesAction.java | 34 +++++++++++++++++++ .../actions/TestRewriteDataFilesAction.java | 34 +++++++++++++++++++ 3 files changed, 102 insertions(+) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index d74d8a29f994..a43d8addd53d 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1441,6 +1441,40 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount() shouldHaveACleanCache(table); } + @TestTemplate + public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + table.updateProperties().set(COMMIT_NUM_RETRIES, "10").commit(); + + List originalData = currentData(); + + RewriteDataFilesSparkAction rewrite = + basicRewrite(table) + .option( + RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "2") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + // Since we can have at most one commit per 2 file groups and there + // are only 10 file groups, actual number of commits is 5. + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "7") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0"); + + RewriteDataFiles.Result result = rewrite.execute(); + assertThat(result.rewriteResults()).hasSize(10); + assertThat(result.rewriteFailures()).hasSize(0); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and max commits of 7, we have 2 groups per commit. + // Hence, there will be 1 + 5 commits in total + shouldHaveSnapshots(table, 6); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + @TestTemplate public void testInvalidOptions() { Table table = createTable(20); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 38ddefd26a45..972b731f2476 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1442,6 +1442,40 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount() shouldHaveACleanCache(table); } + @TestTemplate + public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + table.updateProperties().set(COMMIT_NUM_RETRIES, "10").commit(); + + List originalData = currentData(); + + RewriteDataFilesSparkAction rewrite = + basicRewrite(table) + .option( + RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "2") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + // Since we can have at most one commit per 2 file groups and there + // are only 10 file groups, actual number of commits is 5. + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "7") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0"); + + RewriteDataFiles.Result result = rewrite.execute(); + assertThat(result.rewriteResults()).hasSize(10); + assertThat(result.rewriteFailures()).hasSize(0); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and max commits of 7, we have 2 groups per commit. + // Hence, there will be 1 + 5 commits in total + shouldHaveSnapshots(table, 6); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + @TestTemplate public void testInvalidOptions() { Table table = createTable(20); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 110e43ede1f9..ddbb20ba640b 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1442,6 +1442,40 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount() shouldHaveACleanCache(table); } + @TestTemplate + public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + table.updateProperties().set(COMMIT_NUM_RETRIES, "10").commit(); + + List originalData = currentData(); + + RewriteDataFilesSparkAction rewrite = + basicRewrite(table) + .option( + RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "2") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + // Since we can have at most one commit per 2 file groups and there + // are only 10 file groups, actual number of commits is 5. + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "7") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0"); + + RewriteDataFiles.Result result = rewrite.execute(); + assertThat(result.rewriteResults()).hasSize(10); + assertThat(result.rewriteFailures()).hasSize(0); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and max commits of 7, we have 2 groups per commit. + // Hence, there will be 1 + 5 commits in total + shouldHaveSnapshots(table, 6); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + @TestTemplate public void testInvalidOptions() { Table table = createTable(20); From f3805c2d3c892084d143332817dd9613fae55b2f Mon Sep 17 00:00:00 2001 From: Kai-Hsiang Hsu Date: Tue, 30 Jun 2026 21:03:33 +0800 Subject: [PATCH 3/3] Update comment for clarity Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../iceberg/spark/actions/TestRewriteDataFilesAction.java | 2 +- .../iceberg/spark/actions/TestRewriteDataFilesAction.java | 2 +- .../iceberg/spark/actions/TestRewriteDataFilesAction.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index a43d8addd53d..8c678872b0de 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1469,7 +1469,7 @@ public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { assertEquals("We shouldn't have changed the data", originalData, postRewriteData); // With 10 original groups and max commits of 7, we have 2 groups per commit. - // Hence, there will be 1 + 5 commits in total + // That produces 5 rewrite commits plus the initial snapshot (6 snapshots total). shouldHaveSnapshots(table, 6); shouldHaveNoOrphans(table); shouldHaveACleanCache(table); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 972b731f2476..e45ddaa6f351 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1470,7 +1470,7 @@ public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { assertEquals("We shouldn't have changed the data", originalData, postRewriteData); // With 10 original groups and max commits of 7, we have 2 groups per commit. - // Hence, there will be 1 + 5 commits in total + // That produces 5 rewrite commits plus the initial snapshot (6 snapshots total). shouldHaveSnapshots(table, 6); shouldHaveNoOrphans(table); shouldHaveACleanCache(table); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index ddbb20ba640b..a06ccfb8f910 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1470,7 +1470,7 @@ public void testParallelPartialProgressWithTotalCommitsLessThanMaxCommits() { assertEquals("We shouldn't have changed the data", originalData, postRewriteData); // With 10 original groups and max commits of 7, we have 2 groups per commit. - // Hence, there will be 1 + 5 commits in total + // That produces 5 rewrite commits plus the initial snapshot (6 snapshots total). shouldHaveSnapshots(table, 6); shouldHaveNoOrphans(table); shouldHaveACleanCache(table);