From 73a6ae317abb358108d6a35a96cbffaf2bae61db Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Tue, 16 Jun 2026 12:15:49 -0700 Subject: [PATCH 1/2] Spark 3.5: Default partitioned write distribution mode to NONE Iceberg 1.5 implements RequiresDistributionAndOrdering, so every Spark write to a partitioned table now requests a HASH distribution by default (SparkWriteConf.defaultWriteDistributionMode). Spark plans this as a RebalancePartitions / REBALANCE shuffle on the partition columns. REBALANCE is poorly handled by Celeborn 0.4 and has already caused production incidents (notably Kyoto CDC writes overloading Celeborn and impacting other tenants). Celeborn 0.6 addresses this but is still a couple of months out, while Spark 3.5 traffic is about to ramp with the Python 3.12 migration. This change flips the default write distribution mode for partitioned, unsorted tables from HASH to NONE, restoring the Spark 3.1 status quo: - Only normal (batch/append) writes are affected. DELETE/UPDATE/MERGE keep their existing HASH defaults. - The sort/ordering behavior is unchanged, so users do not need to add .sortWithinPartitions() in their code. - Users who explicitly want a distribution can still request it via the write option, session conf, or table property. Small files that may result are handled by Open House's post-write compaction. We can revisit re-enabling the default distribution once Celeborn 0.6 is rolled out. Tests updated to reflect the new default: - TestSparkWriteConf#testSparkWriteConfDistributionDefault now asserts NONE for normal writes and HASH for row-level operations. - TestSparkWriteConf#testAdvisoryPartitionSize explicitly requests HASH so an advisory partition size is reported (Spark only reports one when a distribution is requested). - TestSparkDistributionAndOrderingUtil#testDefaultWritePartitionedUnsortedTable now expects an unspecified distribution with local ordering retained. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../apache/iceberg/spark/SparkWriteConf.java | 2 +- .../TestSparkDistributionAndOrderingUtil.java | 17 +++++++++-------- .../iceberg/spark/TestSparkWriteConf.java | 17 ++++++++++++++++- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 377bb901e1..f799157acb 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -316,7 +316,7 @@ private DistributionMode defaultWriteDistributionMode() { if (table.sortOrder().isSorted()) { return RANGE; } else if (table.spec().isPartitioned()) { - return HASH; + return NONE; } else { return NONE; } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java index 39ef72c6bb..96e67b0aa6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java @@ -125,8 +125,11 @@ public void dropTable() { // // PARTITIONED BY date, days(ts) UNORDERED // ------------------------------------------------------------------------- - // write mode is NOT SET -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts) - // write mode is NOT SET (fanout) -> CLUSTER BY date, days(ts) + empty ordering + // NOTE: in this fork the default write distribution mode for partitioned tables is NONE + // (not HASH), so "write mode is NOT SET" behaves like "write mode is NONE" below + // (see SparkWriteConf#defaultWriteDistributionMode). 
+ // write mode is NOT SET -> unspecified distribution + LOCALLY ORDER BY date, days(ts) + // write mode is NOT SET (fanout) -> unspecified distribution + empty ordering // write mode is NONE -> unspecified distribution + LOCALLY ORDERED BY date, days(ts) // write mode is NONE (fanout) -> unspecified distribution + empty ordering // write mode is HASH -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts) @@ -243,21 +246,19 @@ public void testDefaultWritePartitionedUnsortedTable() { disableFanoutWriters(table); - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); - + // The default distribution mode for partitioned tables is NONE in this fork, so no clustering + // is requested; only the local ordering on the partition columns is retained. SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING) }; - checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering); + checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, expectedOrdering); enableFanoutWriters(table); - checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING); + checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING); } @TestTemplate diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index e1a6a7cb67..24348acf6f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -160,6 +160,11 @@ public void testDeleteGranularityInvalidValue() { public void testAdvisoryPartitionSize() { Table table = 
validationCatalog.loadTable(tableIdent); + // An advisory partition size is only reported when a distribution is requested (Spark prohibits + // requesting a size without distribution). Since the default for partitioned tables is NONE in + // this fork, explicitly request HASH so the advisory size resolution can be exercised. + table.updateProperties().set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); long value1 = writeConf.writeRequirements().advisoryPartitionSize(); @@ -180,7 +185,17 @@ public void testSparkWriteConfDistributionDefault() { SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); - checkMode(DistributionMode.HASH, writeConf); + // In this fork, normal writes to a partitioned (unsorted) table default to NONE rather than + // HASH to avoid an Iceberg-injected shuffle that interacts poorly with Celeborn (see + // SparkWriteConf#defaultWriteDistributionMode). Row-level operations (DELETE/UPDATE/MERGE) + // are unaffected and still default to HASH. 
+ assertThat(writeConf.distributionMode()).isEqualTo(DistributionMode.NONE); + assertThat(writeConf.copyOnWriteDistributionMode(DELETE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.positionDeltaDistributionMode(DELETE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.copyOnWriteDistributionMode(UPDATE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.positionDeltaDistributionMode(UPDATE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.copyOnWriteDistributionMode(MERGE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.positionDeltaDistributionMode(MERGE)).isEqualTo(DistributionMode.HASH); } @TestTemplate From 02794d007e945f19df1bf7e5a583392cafc37e82 Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Tue, 16 Jun 2026 13:45:27 -0700 Subject: [PATCH 2/2] Spark 3.5: Fix file-count assertions broken by NONE default distribution Changing the default partitioned write distribution mode from HASH to NONE means writes no longer cluster each partition's rows into a single file; the number of output files now depends on input parallelism. Three functional tests asserted exact file counts that implicitly relied on the old one-file-per-partition layout and failed on multi-core CI runners. Restore deterministic layout by requesting HASH explicitly where the test intent is about file/partition layout rather than the default: - TestRewritePositionDeleteFilesAction: set WRITE_DISTRIBUTION_MODE=hash in the shared tableProperties() helper (partitioned tables only; the unpartitioned cases control file count via repartition()). - TestIcebergSourceTablesBase#testPartitionsTableDeleteStats: set WRITE_DISTRIBUTION_MODE=hash on the table so each partition's rows land in a single data file, matching the file_count assertions. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../actions/TestRewritePositionDeleteFilesAction.java | 7 ++++++- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 ++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java index 89c44dbfcc..9df0daa2a6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -676,7 +676,12 @@ private Map<String, String> tableProperties() { TableProperties.FORMAT_VERSION, "2", TableProperties.DEFAULT_FILE_FORMAT, - format.toString()); + format.toString(), + // This fork defaults partitioned writes to NONE distribution; these tests expect data to be + // clustered into one file per partition, so request HASH explicitly to keep file counts + // deterministic regardless of input parallelism. 
+ TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.WRITE_DISTRIBUTION_MODE_HASH); } private void writeRecords(Table table, int files, int numRecords) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index d37d6a8616..fff27d23c8 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1600,6 +1600,12 @@ public void testPartitionsTableLastUpdatedSnapshot() { public void testPartitionsTableDeleteStats() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test"); Table table = createTable(tableIdentifier, SCHEMA, SPEC); + // This fork defaults partitioned writes to NONE distribution; request HASH so each partition's + // rows are clustered into a single data file, as this test's file_count assertions expect. + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_HASH) + .commit(); Table partitionsTable = loadTable(tableIdentifier, "partitions"); Dataset<Row> df1 = spark.createDataFrame(