
Spark 3.5: Default partitioned write distribution mode to NONE #249

Merged
cbb330 merged 2 commits into openhouse-1.5.2 from smahadik/spark35-partitioned-write-distribution-none
Jun 17, 2026

Conversation

@shardulm94

Contributor

Summary

Iceberg 1.5 implements RequiresDistributionAndOrdering, so every Spark 3.5 write to a partitioned table now requests a HASH distribution by default (SparkWriteConf.defaultWriteDistributionMode). Spark plans this as a RebalancePartitions / REBALANCE shuffle on the partition columns.

REBALANCE is poorly handled by Celeborn 0.4 and has already caused production incidents (notably Kyoto CDC writes overloading Celeborn and impacting other tenants). Celeborn 0.6 fixes this but is still a couple of months out, while Spark 3.5 traffic is about to ramp significantly with the Python 3.12 migration this half.

This PR flips the default write distribution mode for partitioned, unsorted tables from HASH to NONE, restoring the Spark 3.1 status quo:

  • Scope is intentionally narrow. Only normal (batch/append) writes are affected. DELETE/UPDATE/MERGE keep their existing HASH defaults, which covers the CDC merge-into case separately.
  • Ordering is unchanged. We keep the local sort, so users don't need to add .sortWithinPartitions() in their code.
  • Explicit requests still work. Users who want a distribution can still request it via the write option, session conf, or table property.
  • Any small files that result are handled by Open House's post-write compaction. We can revisit re-enabling the default distribution once Celeborn 0.6 is rolled out.

This also avoids the Iceberg-injected shuffle silently overriding user intent (e.g. DaliSpark.OUTPUT_PARALLELISM) and the extra resource usage it adds.
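
The resolution order and the new default described above can be sketched roughly as follows. This is a simplified Python model, not the Iceberg code itself: the function and parameter names are hypothetical, the three config keys are the upstream Iceberg names as I understand them and should be verified against this fork, and the RANGE default for sorted tables is assumed to be unchanged from upstream. It covers normal (append) writes only and ignores any adjustment Iceberg applies to an explicitly requested mode.

```python
from enum import Enum
from typing import Mapping, Optional


class DistributionMode(Enum):
    NONE = "none"
    HASH = "hash"
    RANGE = "range"


# Assumed config keys (upstream Iceberg names; verify against your build).
WRITE_OPTION_KEY = "distribution-mode"
SESSION_CONF_KEY = "spark.sql.iceberg.distribution-mode"
TABLE_PROPERTY_KEY = "write.distribution-mode"


def resolve_write_distribution_mode(
    is_sorted: bool,
    write_options: Optional[Mapping[str, str]] = None,
    session_conf: Optional[Mapping[str, str]] = None,
    table_properties: Optional[Mapping[str, str]] = None,
) -> DistributionMode:
    """Hypothetical model of how a normal (append) write picks its mode."""
    # An explicit request wins, checked in order:
    # write option, then session conf, then table property.
    for source, key in (
        (write_options, WRITE_OPTION_KEY),
        (session_conf, SESSION_CONF_KEY),
        (table_properties, TABLE_PROPERTY_KEY),
    ):
        if source and key in source:
            return DistributionMode(source[key].lower())

    # No explicit request: fall back to the default.
    if is_sorted:
        # Assumption: sorted tables keep the upstream RANGE default.
        return DistributionMode.RANGE
    # After this PR, unsorted tables default to NONE whether or not they
    # are partitioned (partitioned ones previously defaulted to HASH).
    return DistributionMode.NONE
```

Under this model, a partitioned, unsorted table with no overrides resolves to NONE, and setting the table property to `hash` restores the previous behavior for that table only.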

Testing Done

  • Local code review completed
  • Unit tests updated to reflect the new default
  • Targeted test suites pass on JDK 11

Updated tests:

  • TestSparkWriteConf#testSparkWriteConfDistributionDefault — now asserts NONE for normal writes and HASH for row-level operations (DELETE/UPDATE/MERGE).
  • TestSparkWriteConf#testAdvisoryPartitionSize — explicitly requests HASH so an advisory partition size is reported (Spark only reports one when a distribution is requested).
  • TestSparkDistributionAndOrderingUtil#testDefaultWritePartitionedUnsortedTable — now expects an unspecified distribution with the local ordering retained.

Ran TestSparkWriteConf, TestSparkDistributionAndOrderingUtil, and TestRequiredDistributionAndOrdering (both spark and spark-extensions modules) — all green.

🤖 Generated with Claude Code

Iceberg 1.5 implements RequiresDistributionAndOrdering, so every Spark
write to a partitioned table now requests a HASH distribution by default
(SparkWriteConf.defaultWriteDistributionMode). Spark plans this as a
RebalancePartitions / REBALANCE shuffle on the partition columns.

REBALANCE is poorly handled by Celeborn 0.4 and has already caused
production incidents (notably Kyoto CDC writes overloading Celeborn and
impacting other tenants). Celeborn 0.6 addresses this but is still a
couple of months out, while Spark 3.5 traffic is about to ramp with the
Python 3.12 migration.

This change flips the default write distribution mode for partitioned,
unsorted tables from HASH to NONE, restoring the Spark 3.1 status quo:

- Only normal (batch/append) writes are affected. DELETE/UPDATE/MERGE
  keep their existing HASH defaults.
- The sort/ordering behavior is unchanged, so users do not need to add
  .sortWithinPartitions() in their code.
- Users who explicitly want a distribution can still request it via the
  write option, session conf, or table property.

Small files that may result are handled by Open House's post-write
compaction. We can revisit re-enabling the default distribution once
Celeborn 0.6 is rolled out.

Tests updated to reflect the new default:
- TestSparkWriteConf#testSparkWriteConfDistributionDefault now asserts
  NONE for normal writes and HASH for row-level operations.
- TestSparkWriteConf#testAdvisoryPartitionSize explicitly requests HASH
  so an advisory partition size is reported (Spark only reports one when
  a distribution is requested).
- TestSparkDistributionAndOrderingUtil#testDefaultWritePartitionedUnsortedTable
  now expects an unspecified distribution with local ordering retained.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added the SPARK label Jun 16, 2026
@shardulm94 shardulm94 marked this pull request as ready for review June 16, 2026 19:19
Changing the default partitioned write distribution mode from HASH to
NONE means writes no longer cluster each partition's rows into a single
file; the number of output files now depends on input parallelism. Three
functional tests asserted exact file counts that implicitly relied on the
old one-file-per-partition layout and failed on multi-core CI runners.

Restore deterministic layout by requesting HASH explicitly where the test
intent is about file/partition layout rather than the default:
- TestRewritePositionDeleteFilesAction: set WRITE_DISTRIBUTION_MODE=hash
  in the shared tableProperties() helper (partitioned tables only; the
  unpartitioned cases control file count via repartition()).
- TestIcebergSourceTablesBase#testPartitionsTableDeleteStats: set
  WRITE_DISTRIBUTION_MODE=hash on the table so each partition's rows land
  in a single data file, matching the file_count assertions.
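
To illustrate why the file counts became non-deterministic, here is a toy Python model of the two layouts. It is only a sketch under simplifying assumptions: every write task emits exactly one file per distinct partition value it holds, HASH routes all rows of a partition value to a single task, and file-size rollover and AQE partition splitting are ignored. The function name and inputs are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def count_files_per_partition(
    task_inputs: Sequence[Sequence[str]], hash_distribute: bool
) -> Dict[str, int]:
    """Count output files per partition value in a toy write model.

    task_inputs[i] holds the partition value of each row in input task i.
    """
    if hash_distribute:
        # HASH (toy version): regroup rows so each partition value is
        # handled by exactly one write task.
        grouped: Dict[str, List[str]] = defaultdict(list)
        for rows in task_inputs:
            for part in rows:
                grouped[part].append(part)
        write_tasks = list(grouped.values())
    else:
        # NONE: no shuffle, so write tasks mirror the input tasks.
        write_tasks = [list(rows) for rows in task_inputs]

    files: Dict[str, int] = defaultdict(int)
    for rows in write_tasks:
        # Assumption: one file per distinct partition value per task.
        for part in set(rows):
            files[part] += 1
    return dict(files)


# Three input tasks; partition "a" appears in all three, "b" in two.
inputs = [["a", "b"], ["a", "b"], ["a"]]
print(count_files_per_partition(inputs, hash_distribute=False))  # a: 3 files, b: 2 files
print(count_files_per_partition(inputs, hash_distribute=True))   # a: 1 file, b: 1 file
```

With NONE, the counts track how many input tasks happen to contain each partition value, which is what varies with the number of cores on a CI runner.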

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@cbb330

cbb330 commented Jun 17, 2026

Collaborator

Thanks for the change. We discussed the implications offline: it will not impact our largest customers, and it keeps parity with Spark 3.1.

@cbb330 cbb330 merged commit d69c1fd into openhouse-1.5.2 Jun 17, 2026
13 checks passed