Spark 3.5: Default partitioned write distribution mode to NONE#249
Merged
cbb330 merged 2 commits intoJun 17, 2026
Merged
Conversation
Iceberg 1.5 implements RequiresDistributionAndOrdering, so every Spark write to a partitioned table now requests a HASH distribution by default (SparkWriteConf.defaultWriteDistributionMode). Spark plans this as a RebalancePartitions / REBALANCE shuffle on the partition columns. REBALANCE is poorly handled by Celeborn 0.4 and has already caused production incidents (notably Kyoto CDC writes overloading Celeborn and impacting other tenants). Celeborn 0.6 addresses this but is still a couple of months out, while Spark 3.5 traffic is about to ramp with the Python 3.12 migration. This change flips the default write distribution mode for partitioned, unsorted tables from HASH to NONE, restoring the Spark 3.1 status quo: - Only normal (batch/append) writes are affected. DELETE/UPDATE/MERGE keep their existing HASH defaults. - The sort/ordering behavior is unchanged, so users do not need to add .sortWithinPartitions() in their code. - Users who explicitly want a distribution can still request it via the write option, session conf, or table property. Small files that may result are handled by Open House's post-write compaction. We can revisit re-enabling the default distribution once Celeborn 0.6 is rolled out. Tests updated to reflect the new default: - TestSparkWriteConf#testSparkWriteConfDistributionDefault now asserts NONE for normal writes and HASH for row-level operations. - TestSparkWriteConf#testAdvisoryPartitionSize explicitly requests HASH so an advisory partition size is reported (Spark only reports one when a distribution is requested). - TestSparkDistributionAndOrderingUtil#testDefaultWritePartitionedUnsortedTable now expects an unspecified distribution with local ordering retained. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Changing the default partitioned write distribution mode from HASH to NONE means writes no longer cluster each partition's rows into a single file; the number of output files now depends on input parallelism. Three functional tests asserted exact file counts that implicitly relied on the old one-file-per-partition layout and failed on multi-core CI runners. Restore deterministic layout by requesting HASH explicitly where the test intent is about file/partition layout rather than the default: - TestRewritePositionDeleteFilesAction: set WRITE_DISTRIBUTION_MODE=hash in the shared tableProperties() helper (partitioned tables only; the unpartitioned cases control file count via repartition()). - TestIcebergSourceTablesBase#testPartitionsTableDeleteStats: set WRITE_DISTRIBUTION_MODE=hash on the table so each partition's rows land in a single data file, matching the file_count assertions. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Collaborator
|
Thanks for the change. Discussed offline on the implications and it will not have impact to largest customers and keeps parity with spark 3.1. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Iceberg 1.5 implements
RequiresDistributionAndOrdering, so every Spark 3.5 write to a partitioned table now requests a HASH distribution by default (SparkWriteConf.defaultWriteDistributionMode). Spark plans this as aRebalancePartitions/ REBALANCE shuffle on the partition columns.REBALANCE is poorly handled by Celeborn 0.4 and has already caused production incidents (notably Kyoto CDC writes overloading Celeborn and impacting other tenants). Celeborn 0.6 fixes this but is still a couple of months out, while Spark 3.5 traffic is about to ramp significantly with the Python 3.12 migration this half.
This PR flips the default write distribution mode for partitioned, unsorted tables from
HASHtoNONE, restoring the Spark 3.1 status quo:DELETE/UPDATE/MERGEkeep their existingHASHdefaults, which covers the CDC merge-into case separately..sortWithinPartitions()in their code.This also avoids the Iceberg-injected shuffle silently overriding user intent (e.g.
DaliSpark.OUTPUT_PARALLELISM) and the extra resource usage it adds.Testing Done
Updated tests:
TestSparkWriteConf#testSparkWriteConfDistributionDefault— now assertsNONEfor normal writes andHASHfor row-level operations (DELETE/UPDATE/MERGE).TestSparkWriteConf#testAdvisoryPartitionSize— explicitly requestsHASHso an advisory partition size is reported (Spark only reports one when a distribution is requested).TestSparkDistributionAndOrderingUtil#testDefaultWritePartitionedUnsortedTable— now expects an unspecified distribution with the local ordering retained.Ran
TestSparkWriteConf,TestSparkDistributionAndOrderingUtil, andTestRequiredDistributionAndOrdering(bothsparkandspark-extensionsmodules) — all green.🤖 Generated with Claude Code