diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 377bb901e..f799157ac 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -316,7 +316,7 @@ private DistributionMode defaultWriteDistributionMode() { if (table.sortOrder().isSorted()) { return RANGE; } else if (table.spec().isPartitioned()) { - return HASH; + return NONE; } else { return NONE; } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java index 39ef72c6b..96e67b0aa 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java @@ -125,8 +125,11 @@ public void dropTable() { // // PARTITIONED BY date, days(ts) UNORDERED // ------------------------------------------------------------------------- - // write mode is NOT SET -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts) - // write mode is NOT SET (fanout) -> CLUSTER BY date, days(ts) + empty ordering + // NOTE: in this fork the default write distribution mode for partitioned tables is NONE + // (not HASH), so "write mode is NOT SET" behaves like "write mode is NONE" below + // (see SparkWriteConf#defaultWriteDistributionMode). + // write mode is NOT SET -> unspecified distribution + LOCALLY ORDER BY date, days(ts) + // write mode is NOT SET (fanout) -> unspecified distribution + empty ordering // write mode is NONE -> unspecified distribution + LOCALLY ORDERED BY date, days(ts) // write mode is NONE (fanout) -> unspecified distribution + empty ordering // write mode is HASH -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts) @@ -243,21 +246,19 @@ public void testDefaultWritePartitionedUnsortedTable() { disableFanoutWriters(table); - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); - + // The default distribution mode for partitioned tables is NONE in this fork, so no clustering + // is requested; only the local ordering on the partition columns is retained. SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING) }; - checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering); + checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, expectedOrdering); enableFanoutWriters(table); - checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING); + checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING); } @TestTemplate diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index e1a6a7cb6..24348acf6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -160,6 +160,11 @@ public void testDeleteGranularityInvalidValue() { public void testAdvisoryPartitionSize() { Table table = validationCatalog.loadTable(tableIdent); + // An advisory partition size is only reported when a distribution is requested (Spark prohibits + // requesting a size without distribution). Since the default for partitioned tables is NONE in + // this fork, explicitly request HASH so the advisory size resolution can be exercised. + table.updateProperties().set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); long value1 = writeConf.writeRequirements().advisoryPartitionSize(); @@ -180,7 +185,17 @@ public void testSparkWriteConfDistributionDefault() { SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); - checkMode(DistributionMode.HASH, writeConf); + // In this fork, normal writes to a partitioned (unsorted) table default to NONE rather than + // HASH to avoid an Iceberg-injected shuffle that interacts poorly with Celeborn (see + // SparkWriteConf#defaultWriteDistributionMode). Row-level operations (DELETE/UPDATE/MERGE) + // are unaffected and still default to HASH. + assertThat(writeConf.distributionMode()).isEqualTo(DistributionMode.NONE); + assertThat(writeConf.copyOnWriteDistributionMode(DELETE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.positionDeltaDistributionMode(DELETE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.copyOnWriteDistributionMode(UPDATE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.positionDeltaDistributionMode(UPDATE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.copyOnWriteDistributionMode(MERGE)).isEqualTo(DistributionMode.HASH); + assertThat(writeConf.positionDeltaDistributionMode(MERGE)).isEqualTo(DistributionMode.HASH); } @TestTemplate diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java index 89c44dbfc..9df0daa2a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -676,7 +676,12 @@ private Map tableProperties() { TableProperties.FORMAT_VERSION, "2", TableProperties.DEFAULT_FILE_FORMAT, - format.toString()); + format.toString(), + // This fork defaults partitioned writes to NONE distribution; these tests expect data to be + // clustered into one file per partition, so request HASH explicitly to keep file counts + // deterministic regardless of input parallelism. + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.WRITE_DISTRIBUTION_MODE_HASH); } private void writeRecords(Table table, int files, int numRecords) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index d37d6a861..fff27d23c 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1600,6 +1600,12 @@ public void testPartitionsTableLastUpdatedSnapshot() { public void testPartitionsTableDeleteStats() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test"); Table table = createTable(tableIdentifier, SCHEMA, SPEC); + // This fork defaults partitioned writes to NONE distribution; request HASH so each partition's + // rows are clustered into a single data file, as this test's file_count assertions expect. + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_HASH) + .commit(); Table partitionsTable = loadTable(tableIdentifier, "partitions"); Dataset df1 = spark.createDataFrame(