Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private DistributionMode defaultWriteDistributionMode() {
if (table.sortOrder().isSorted()) {
return RANGE;
} else if (table.spec().isPartitioned()) {
return HASH;
return NONE;
} else {
return NONE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ public void dropTable() {
//
// PARTITIONED BY date, days(ts) UNORDERED
// -------------------------------------------------------------------------
// write mode is NOT SET -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts)
// write mode is NOT SET (fanout) -> CLUSTER BY date, days(ts) + empty ordering
// NOTE: in this fork the default write distribution mode for partitioned tables is NONE
// (not HASH), so "write mode is NOT SET" behaves like "write mode is NONE" below
// (see SparkWriteConf#defaultWriteDistributionMode).
// write mode is NOT SET -> unspecified distribution + LOCALLY ORDER BY date, days(ts)
// write mode is NOT SET (fanout) -> unspecified distribution + empty ordering
// write mode is NONE -> unspecified distribution + LOCALLY ORDER BY date, days(ts)
// write mode is NONE (fanout) -> unspecified distribution + empty ordering
// write mode is HASH -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts)
Expand Down Expand Up @@ -243,21 +246,19 @@ public void testDefaultWritePartitionedUnsortedTable() {

disableFanoutWriters(table);

Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
Distribution expectedDistribution = Distributions.clustered(expectedClustering);

// The default distribution mode for partitioned tables is NONE in this fork, so no clustering
// is requested; only the local ordering on the partition columns is retained.
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
};

checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, expectedOrdering);

enableFanoutWriters(table);

checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING);
checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ public void testDeleteGranularityInvalidValue() {
public void testAdvisoryPartitionSize() {
Table table = validationCatalog.loadTable(tableIdent);

// An advisory partition size is only reported when a distribution is requested (Spark prohibits
// requesting a size without distribution). Since the default for partitioned tables is NONE in
// this fork, explicitly request HASH so the advisory size resolution can be exercised.
table.updateProperties().set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();

SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());

long value1 = writeConf.writeRequirements().advisoryPartitionSize();
Expand All @@ -180,7 +185,17 @@ public void testSparkWriteConfDistributionDefault() {

SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());

checkMode(DistributionMode.HASH, writeConf);
// In this fork, normal writes to a partitioned (unsorted) table default to NONE rather than
// HASH to avoid an Iceberg-injected shuffle that interacts poorly with Celeborn (see
// SparkWriteConf#defaultWriteDistributionMode). Row-level operations (DELETE/UPDATE/MERGE)
// are unaffected and still default to HASH.
assertThat(writeConf.distributionMode()).isEqualTo(DistributionMode.NONE);
assertThat(writeConf.copyOnWriteDistributionMode(DELETE)).isEqualTo(DistributionMode.HASH);
assertThat(writeConf.positionDeltaDistributionMode(DELETE)).isEqualTo(DistributionMode.HASH);
assertThat(writeConf.copyOnWriteDistributionMode(UPDATE)).isEqualTo(DistributionMode.HASH);
assertThat(writeConf.positionDeltaDistributionMode(UPDATE)).isEqualTo(DistributionMode.HASH);
assertThat(writeConf.copyOnWriteDistributionMode(MERGE)).isEqualTo(DistributionMode.HASH);
assertThat(writeConf.positionDeltaDistributionMode(MERGE)).isEqualTo(DistributionMode.HASH);
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,12 @@ private Map<String, String> tableProperties() {
TableProperties.FORMAT_VERSION,
"2",
TableProperties.DEFAULT_FILE_FORMAT,
format.toString());
format.toString(),
// This fork defaults partitioned writes to NONE distribution; these tests expect data to be
// clustered into one file per partition, so request HASH explicitly to keep file counts
// deterministic regardless of input parallelism.
TableProperties.WRITE_DISTRIBUTION_MODE,
TableProperties.WRITE_DISTRIBUTION_MODE_HASH);
}

private void writeRecords(Table table, int files, int numRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,12 @@ public void testPartitionsTableLastUpdatedSnapshot() {
public void testPartitionsTableDeleteStats() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
// This fork defaults partitioned writes to NONE distribution; request HASH so each partition's
// rows are clustered into a single data file, as this test's file_count assertions expect.
table
.updateProperties()
.set(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_HASH)
.commit();
Table partitionsTable = loadTable(tableIdentifier, "partitions");
Dataset<Row> df1 =
spark.createDataFrame(
Expand Down
Loading