Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ license: |
* Table of contents
{:toc}

## Upgrading from Spark SQL 4.2 to 4.3

- Since Spark 4.3, the configuration key `spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled` has been renamed to `spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled` to reflect that it now applies to storage-partitioned joins, aggregates, and windows. The old key continues to work as an alias.

## Upgrading from Spark SQL 4.1 to 4.2

- Since Spark 4.2, Spark enables order-independent checksums for shuffle outputs by default to detect data inconsistencies during indeterminate shuffle stage retries. If a checksum mismatch is detected, Spark rolls back and re-executes all succeeding stages that depend on the shuffle output. If rolling back is not possible for some succeeding stages, the job will fail. To restore the previous behavior, set `spark.sql.shuffle.orderIndependentChecksum.enabled` and `spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch` to `false`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,16 +353,18 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
* `KeyedPartitioning` is used in two distinct forms:
*
* 1. '''As outputPartitioning''': When used as a node's output partitioning (e.g., in
* `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are always in sorted order.
* This is how leaf data source nodes produce partition keys originally, and this ordering is
* preserved through `GroupPartitionsExec`. The sorted order is critical for storage-partitioned
* join compatibility.
* `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are typically in sorted order
* because data sources produce them that way and `GroupPartitionsExec` sorts while grouping.
* Sorted order is not a hard requirement, but it is a useful property: when both sides of a
* storage-partitioned join report sorted keys, `EnsureRequirements` can often match them
* without inserting an additional `GroupPartitionsExec`. After a narrowing projection through
* `PartitioningPreservingUnaryExecNode`, the projected keys may no longer be sorted; this is
* acceptable because `EnsureRequirements` can always reconcile both sides via
* `GroupPartitionsExec` with `expectedPartitionKeys`.
*
* 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not be
* in sorted order. This occurs because `KeyedShuffleSpec` can project the partition keys by join
* key positions. The `EnsureRequirements` rule ensures that either the unordered keys from both
* sides of a join match exactly, or it builds a common ordered set of keys and pushes them down
* to `GroupPartitionsExec` on both sides to establish a compatible ordering.
* 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not
* be in sorted order. `EnsureRequirements` handles this by building a common ordered set of
* keys and pushing them down to `GroupPartitionsExec` on both sides.
*
* == Partition Keys ==
* - `partitionKeys`: The partition keys, one per partition. May contain duplicates initially
Expand Down Expand Up @@ -417,16 +419,23 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
*
* @param expressions Partition transform expressions (e.g., `years(col)`, `bucket(10, col)`).
* @param partitionKeys Partition keys wrapped in InternalRowComparableWrapper for efficient
* comparison and grouping. One per partition. When used as outputPartitioning,
* always in sorted order. When used in `KeyedShuffleSpec`, may be unsorted
* after projection. May contain duplicates when ungrouped.
* comparison and grouping. One per partition. Typically in sorted order when
* produced by a data source or `GroupPartitionsExec`, but this is not
* guaranteed after projection. May contain duplicates when ungrouped.
* @param isGrouped Whether partition keys are unique (no duplicates). Computed on first
* creation, then preserved through copy operations to avoid recomputation.
* @param isNarrowed Whether this partitioning was derived from a finer-grained one by dropping key
* positions (e.g. via `PartitioningPreservingUnaryExecNode`). When true,
* `GroupPartitionsExec` will merge partitions that shared distinct keys in the
* original partitioning, carrying the same skew risk as
* `allowKeysSubsetOfPartitionKeys`. Such a partitioning will not satisfy
* `ClusteredDistribution` unless that config is enabled.
*/
case class KeyedPartitioning(
expressions: Seq[Expression],
@transient partitionKeys: Seq[InternalRowComparableWrapper],
isGrouped: Boolean) extends Expression with Partitioning with Unevaluable {
isGrouped: Boolean,
isNarrowed: Boolean = false) extends Expression with Partitioning with Unevaluable {
override val numPartitions = partitionKeys.length

override def children: Seq[Expression] = expressions
Expand Down Expand Up @@ -480,13 +489,19 @@ case class KeyedPartitioning(
c.areAllClusterKeysMatched(expressions)
} else {
// We'll need to find leaf attributes from the partition expressions first.
val attributes = expressions.flatMap(_.collectLeaves())
lazy val attributes = expressions.flatMap(_.collectLeaves())

if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {
// check that join keys (required clustering keys)
if (SQLConf.get.v2BucketingAllowKeysSubsetOfPartitionKeys) {
// check that operation keys (required clustering keys)
// overlap with partition keys (KeyedPartitioning attributes)
requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
expressions.forall(_.collectLeaves().size == 1)
} else if (isNarrowed && !isGrouped) {
// A narrowed, non-grouped partitioning carries the same skew risk as using a subset of
// partition keys for a join: GroupPartitionsExec will merge partitions that held
// distinct keys in the original finer-grained partitioning. Require the same config to
// opt in.
false
} else {
attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
Expand All @@ -502,9 +517,9 @@ case class KeyedPartitioning(

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = {
val result = KeyedShuffleSpec(this, distribution)
if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {
// If allowing join keys to be subset of clustering keys, we should create a new
// `KeyedPartitioning` here that is grouped on the join keys instead, and use that as
if (SQLConf.get.v2BucketingAllowKeysSubsetOfPartitionKeys) {
// If allowing operation keys to be a subset of partition keys, create a new
// `KeyedPartitioning` grouped on the operation keys, and use that as
// the returned shuffle spec.
val joinKeyPositions = result.keyPositions.map(_.nonEmpty).zipWithIndex.filter(_._1).map(_._2)
val projectedExpressions = joinKeyPositions.map(expressions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2142,15 +2142,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
.doc("Whether to allow storage-partition join in the case where join keys are " +
"a subset of the partition keys of the source tables. At planning time, " +
"Spark will group the partitions by only those keys that are in the join keys. " +
val V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS =
buildConf("spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled")
.withAlternative("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
.doc("Whether to allow storage-partitioned operations (joins, aggregates, and windows) in " +
"the case where the operation's keys are a subset of the partition keys of the source " +
"tables. At planning time, Spark will group the partitions by only those keys that are " +
"in the operation's keys. " +
s"This is currently enabled only if ${REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key} " +
"is false."
)
.version("4.0.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(false)

Expand Down Expand Up @@ -7940,8 +7943,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def v2BucketingShuffleEnabled: Boolean =
getConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED)

def v2BucketingAllowJoinKeysSubsetOfPartitionKeys: Boolean =
getConf(SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)
def v2BucketingAllowKeysSubsetOfPartitionKeys: Boolean =
getConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS)

def v2BucketingAllowCompatibleTransforms: Boolean =
getConf(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.apache.spark.sql.execution

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
import org.apache.spark.sql.catalyst.trees.MultiTransform

/**
* A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
Expand All @@ -29,8 +30,31 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
with AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
val partitionings: Seq[Partitioning] = if (hasAlias) {
flattenPartitioning(child.outputPartitioning).iterator.flatMap {
val (keyedPartitionings, otherPartitionings) =
flattenPartitioning(child.outputPartitioning).partition(_.isInstanceOf[KeyedPartitioning])

val projectedKPs =
projectKeyedPartitionings(keyedPartitionings.map(_.asInstanceOf[KeyedPartitioning]))
val projectedOthers = projectOtherPartitionings(otherPartitionings)

(projectedKPs ++ projectedOthers).take(aliasCandidateLimit) match {
case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
case Seq(p) => p
case ps => PartitioningCollection(ps)
}
}

/**
* Projects non-[[KeyedPartitioning]] partitionings through the current node's output expressions.
*
* With aliases, each partitioning expression is substituted with all possible alias combinations;
* without aliases, partitionings whose expressions reference attributes outside the output are
* dropped.
*/
private def projectOtherPartitionings(
partitionings: Seq[Partitioning]): LazyList[Partitioning] = {
if (hasAlias) {
partitionings.to(LazyList).flatMap {
case e: Expression =>
// We need unique partitionings but if the input partitioning is
// `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
Expand All @@ -41,23 +65,85 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
val partitioningSet = mutable.Set.empty[Expression]
projectExpression(e)
.filter(e => partitioningSet.add(e.canonicalized))
.take(aliasCandidateLimit)
.asInstanceOf[LazyList[Partitioning]]
case o => Seq(o)
}.take(aliasCandidateLimit).toSeq
case o => LazyList(o)
}
} else {
// Filter valid partitiongs (only reference output attributes of the current plan node)
// Filter valid partitionings (only reference output attributes of the current plan node)
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
flattenPartitioning(child.outputPartitioning).filter {
partitionings.to(LazyList).filter {
case e: Expression => e.references.subsetOf(outputSet)
case _ => true
}
}
partitionings match {
case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
case Seq(p) => p
case ps => PartitioningCollection(ps)
}
}

/**
* Projects all input [[KeyedPartitioning]]s through the current node's output expressions.
*
* For each expression position (0..N-1), collects the unique expressions at that position across
* all input KPs, projects each through the output aliases, and unions the alternatives.
* Positions with at least one alternative are projectable; their count determines the maximum
* achievable granularity. Positions that cannot be expressed in the output are dropped.
*
* The resulting [[KeyedPartitioning]]s are the cross-product of the per-position alternatives
* restricted to the projectable positions. All share the same `partitionKeys` object (projected
* to the same subset of positions), preserving the invariant required by [[GroupPartitionsExec]].
*/
private def projectKeyedPartitionings(
kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = {
if (kps.isEmpty) return LazyList.empty
val numPositions = kps.head.expressions.length

val alternativesPerPosition: IndexedSeq[LazyList[Expression]] =
if (hasAlias) {
// For each position, gather unique expressions across all KPs (ExpressionSet deduplicates
// semantically equal expressions, e.g. the same join-key column shared by both KP sides)
// and project each through the output aliases.
(0 until numPositions).map { i =>
val seen = mutable.Set.empty[Expression]
ExpressionSet(kps.map(_.expressions(i))).to(LazyList).flatMap { expr =>
projectExpression(expr).filter(e => seen.add(e.canonicalized))
}
}
} else {
// No aliases: filter out non-projectable expressions first, then deduplicate.
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
(0 until numPositions).map { i =>
ExpressionSet(kps.collect {
case kp if kp.expressions(i).references.subsetOf(outputSet) => kp.expressions(i)
}).to(LazyList)
}
}

// Non-empty positions define the maximum achievable granularity.
val projectablePositions =
(0 until numPositions).filter(i => alternativesPerPosition(i).nonEmpty)

if (projectablePositions.isEmpty) return LazyList.empty

// All input KPs share the same partitionKeys by invariant; use the first as the key source.
val keySource = kps.head
val sharedKeys =
if (projectablePositions.length == numPositions) keySource.partitionKeys
else keySource.projectKeys(projectablePositions)._2

val isGrouped = sharedKeys.distinct.size == sharedKeys.size
// A KP is narrowed if this node drops positions, or if the input KPs were already narrowed
// (i.e. came from a finer-grained partitioning). The flag must be sticky: a subsequent
// PartitioningPreservingUnaryExecNode that passes all positions through would otherwise
// recompute isNarrowed=false, silently dropping the protection.
val isNarrowed = projectablePositions.length < numPositions || keySource.isNarrowed

// Cross-product the per-position alternatives to produce all concrete KPs.
// Note: generateCartesianProduct expects thunks () => Seq[T], but wrapping LazyLists in thunks
// here is not strictly necessary since they are already lazy -- we do it only to match the API.
// No deduplication is needed here: per-position alternatives are already canonically distinct,
// so all cross-product combinations are distinct by construction.
MultiTransform.generateCartesianProduct(
projectablePositions.map(i => () => alternativesPerPosition(i)))
.map(projectedExprs =>
new KeyedPartitioning(projectedExprs, sharedKeys, isGrouped, isNarrowed))
}

private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,12 @@ case class EnsureRequirements(
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys)
.orElse(reorderJoinKeysRecursively(
leftKeys, rightKeys, leftPartitioning, None))
case (Some(KeyedPartitioning(clustering, _, _)), _) =>
case (Some(KeyedPartitioning(clustering, _, _, _)), _) =>
val leafExprs = clustering.flatMap(_.collectLeaves())
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, leftKeys)
.orElse(reorderJoinKeysRecursively(
leftKeys, rightKeys, None, rightPartitioning))
case (_, Some(KeyedPartitioning(clustering, _, _))) =>
case (_, Some(KeyedPartitioning(clustering, _, _, _))) =>
val leafExprs = clustering.flatMap(_.collectLeaves())
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, rightKeys)
.orElse(reorderJoinKeysRecursively(
Expand Down Expand Up @@ -512,7 +512,7 @@ case class EnsureRequirements(
leftSpec.isCompatibleWith(rightSpec)
if ((!isCompatible || conf.v2BucketingPartiallyClusteredDistributionEnabled) &&
(conf.v2BucketingPushPartValuesEnabled ||
conf.v2BucketingAllowJoinKeysSubsetOfPartitionKeys)) {
conf.v2BucketingAllowKeysSubsetOfPartitionKeys)) {
logInfo("Pushing common partition values for storage-partitioned join")
isCompatible = leftSpec.areKeysCompatible(rightSpec)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ object ShuffleExchangeExec {
val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
row => projection(row)
case SinglePartition => identity
case KeyedPartitioning(expressions, _, _) =>
case KeyedPartitioning(expressions, _, _, _) =>
row => bindReferences(expressions, outputAttributes).map(_.eval(row))
case s: ShufflePartitionIdPassThrough =>
// For ShufflePartitionIdPassThrough, the expression directly evaluates to the partition ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ abstract class DistributionAndOrderingSuiteBase
plan: QueryPlan[T]): Partitioning = partitioning match {
case HashPartitioning(exprs, numPartitions) =>
HashPartitioning(exprs.map(resolveAttrs(_, plan)), numPartitions)
case KeyedPartitioning(expressions, partitionKeys, isGrouped) =>
case KeyedPartitioning(expressions, partitionKeys, isGrouped, _) =>
KeyedPartitioning(expressions.map(resolveAttrs(_, plan)), partitionKeys, isGrouped)
case PartitioningCollection(partitionings) =>
PartitioningCollection(partitionings.map(resolvePartitioning(_, plan)))
Expand Down
Loading