diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index dca3b03eeb4e7..66531397d2cc1 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -22,6 +22,10 @@ license: | * Table of contents {:toc} +## Upgrading from Spark SQL 4.2 to 4.3 + +- Since Spark 4.3, the configuration key `spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled` has been renamed to `spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled` to reflect that it now applies to storage-partitioned joins, aggregates, and windows. The old key continues to work as an alias. + ## Upgrading from Spark SQL 4.1 to 4.2 - Since Spark 4.2, Spark enables order-independent checksums for shuffle outputs by default to detect data inconsistencies during indeterminate shuffle stage retries. If a checksum mismatch is detected, Spark rolls back and re-executes all succeeding stages that depend on the shuffle output. If rolling back is not possible for some succeeding stages, the job will fail. To restore the previous behavior, set `spark.sql.shuffle.orderIndependentChecksum.enabled` and `spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch` to `false`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 6fb2ae3821e09..cc50da1f17fdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -353,16 +353,18 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa * `KeyedPartitioning` is used in two distinct forms: * * 1. '''As outputPartitioning''': When used as a node's output partitioning (e.g., in - * `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are always in sorted order. 
- * This is how leaf data source nodes produce partition keys originally, and this ordering is - * preserved through `GroupPartitionsExec`. The sorted order is critical for storage-partitioned - * join compatibility. + * `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are typically in sorted order + * because data sources produce them that way and `GroupPartitionsExec` sorts while grouping. + * Sorted order is not a hard requirement, but it is a useful property: when both sides of a + * storage-partitioned join report sorted keys, `EnsureRequirements` can often match them + * without inserting an additional `GroupPartitionsExec`. After a narrowing projection through + * `PartitioningPreservingUnaryExecNode`, the projected keys may no longer be sorted; this is + * acceptable because `EnsureRequirements` can always reconcile both sides via + * `GroupPartitionsExec` with `expectedPartitionKeys`. * - * 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not be - * in sorted order. This occurs because `KeyedShuffleSpec` can project the partition keys by join - * key positions. The `EnsureRequirements` rule ensures that either the unordered keys from both - * sides of a join match exactly, or it builds a common ordered set of keys and pushes them down - * to `GroupPartitionsExec` on both sides to establish a compatible ordering. + * 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not + * be in sorted order. `EnsureRequirements` handles this by building a common ordered set of + * keys and pushing them down to `GroupPartitionsExec` on both sides. * * == Partition Keys == * - `partitionKeys`: The partition keys, one per partition. May contain duplicates initially @@ -417,16 +419,23 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa * * @param expressions Partition transform expressions (e.g., `years(col)`, `bucket(10, col)`). 
* @param partitionKeys Partition keys wrapped in InternalRowComparableWrapper for efficient - * comparison and grouping. One per partition. When used as outputPartitioning, - * always in sorted order. When used in `KeyedShuffleSpec`, may be unsorted - * after projection. May contain duplicates when ungrouped. + * comparison and grouping. One per partition. Typically in sorted order when + * produced by a data source or `GroupPartitionsExec`, but this is not + * guaranteed after projection. May contain duplicates when ungrouped. * @param isGrouped Whether partition keys are unique (no duplicates). Computed on first * creation, then preserved through copy operations to avoid recomputation. + * @param isNarrowed Whether this partitioning was derived from a finer-grained one by dropping key + * positions (e.g. via `PartitioningPreservingUnaryExecNode`). When true, + * `GroupPartitionsExec` will merge partitions that held distinct keys in the + * original partitioning, carrying the same skew risk as + * `allowKeysSubsetOfPartitionKeys`. If the keys are also not grouped, it does + * not satisfy `ClusteredDistribution` unless that config is enabled. */ case class KeyedPartitioning( expressions: Seq[Expression], @transient partitionKeys: Seq[InternalRowComparableWrapper], - isGrouped: Boolean) extends Expression with Partitioning with Unevaluable { + isGrouped: Boolean, + isNarrowed: Boolean = false) extends Expression with Partitioning with Unevaluable { override val numPartitions = partitionKeys.length override def children: Seq[Expression] = expressions @@ -480,13 +489,19 @@ case class KeyedPartitioning( c.areAllClusterKeysMatched(expressions) } else { // We'll need to find leaf attributes from the partition expressions first. 
- val attributes = expressions.flatMap(_.collectLeaves()) + lazy val attributes = expressions.flatMap(_.collectLeaves()) - if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) { - // check that join keys (required clustering keys) + if (SQLConf.get.v2BucketingAllowKeysSubsetOfPartitionKeys) { + // check that operation keys (required clustering keys) // overlap with partition keys (KeyedPartitioning attributes) requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) && expressions.forall(_.collectLeaves().size == 1) + } else if (isNarrowed && !isGrouped) { + // A narrowed, non-grouped partitioning carries the same skew risk as using a subset of + // partition keys for a join: GroupPartitionsExec will merge partitions that held + // distinct keys in the original finer-grained partitioning. Require the same config to + // opt in. + false } else { attributes.forall(x => requiredClustering.exists(_.semanticEquals(x))) } @@ -502,9 +517,9 @@ case class KeyedPartitioning( override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = { val result = KeyedShuffleSpec(this, distribution) - if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) { - // If allowing join keys to be subset of clustering keys, we should create a new - // `KeyedPartitioning` here that is grouped on the join keys instead, and use that as + if (SQLConf.get.v2BucketingAllowKeysSubsetOfPartitionKeys) { + // If allowing operation keys to be a subset of partition keys, create a new + // `KeyedPartitioning` grouped on the operation keys, and use that as // the returned shuffle spec. 
val joinKeyPositions = result.keyPositions.map(_.nonEmpty).zipWithIndex.filter(_._1).map(_._2) val projectedExpressions = joinKeyPositions.map(expressions) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 83f1816c9727f..f94d695084b03 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2142,15 +2142,18 @@ object SQLConf { .booleanConf .createWithDefault(false) - val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS = - buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled") - .doc("Whether to allow storage-partition join in the case where join keys are " + - "a subset of the partition keys of the source tables. At planning time, " + - "Spark will group the partitions by only those keys that are in the join keys. " + + val V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS = + buildConf("spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled") + .withAlternative("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled") + .doc("Whether to allow storage-partitioned operations (joins, aggregates, and windows) in " + + "the case where the operation's keys are a subset of the partition keys of the source " + + "tables. At planning time, Spark will group the partitions by only those keys that are " + + "in the operation's keys. " + s"This is currently enabled only if ${REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key} " + "is false." 
) .version("4.0.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf .createWithDefault(false) @@ -7940,8 +7943,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingShuffleEnabled: Boolean = getConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED) - def v2BucketingAllowJoinKeysSubsetOfPartitionKeys: Boolean = - getConf(SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS) + def v2BucketingAllowKeysSubsetOfPartitionKeys: Boolean = + getConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS) def v2BucketingAllowCompatibleTransforms: Boolean = getConf(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 3b847b5852b13..910cbcf2210a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.execution import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet} import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.catalyst.trees.MultiTransform /** * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that @@ -29,8 +30,31 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC trait PartitioningPreservingUnaryExecNode extends 
UnaryExecNode with AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { - val partitionings: Seq[Partitioning] = if (hasAlias) { - flattenPartitioning(child.outputPartitioning).iterator.flatMap { + val (keyedPartitionings, otherPartitionings) = + flattenPartitioning(child.outputPartitioning).partition(_.isInstanceOf[KeyedPartitioning]) + + val projectedKPs = + projectKeyedPartitionings(keyedPartitionings.map(_.asInstanceOf[KeyedPartitioning])) + val projectedOthers = projectOtherPartitionings(otherPartitionings) + + (projectedKPs ++ projectedOthers).take(aliasCandidateLimit) match { + case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) + case Seq(p) => p + case ps => PartitioningCollection(ps) + } + } + + /** + * Projects non-[[KeyedPartitioning]] partitionings through the current node's output expressions. + * + * With aliases, each partitioning expression is substituted with all possible alias combinations; + * without aliases, partitionings whose expressions reference attributes outside the output are + * dropped. 
+ */ + private def projectOtherPartitionings( + partitionings: Seq[Partitioning]): LazyList[Partitioning] = { + if (hasAlias) { + partitionings.to(LazyList).flatMap { case e: Expression => // We need unique partitionings but if the input partitioning is // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after @@ -41,23 +65,85 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode val partitioningSet = mutable.Set.empty[Expression] projectExpression(e) .filter(e => partitioningSet.add(e.canonicalized)) - .take(aliasCandidateLimit) .asInstanceOf[LazyList[Partitioning]] - case o => Seq(o) - }.take(aliasCandidateLimit).toSeq + case o => LazyList(o) + } } else { - // Filter valid partitiongs (only reference output attributes of the current plan node) + // Filter valid partitionings (only reference output attributes of the current plan node) val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) - flattenPartitioning(child.outputPartitioning).filter { + partitionings.to(LazyList).filter { case e: Expression => e.references.subsetOf(outputSet) case _ => true } } - partitionings match { - case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) - case Seq(p) => p - case ps => PartitioningCollection(ps) - } + } + + /** + * Projects all input [[KeyedPartitioning]]s through the current node's output expressions. + * + * For each expression position (0..N-1), collects the unique expressions at that position across + * all input KPs, projects each through the output aliases, and unions the alternatives. + * Positions with at least one alternative are projectable; their count determines the maximum + * achievable granularity. Positions that cannot be expressed in the output are dropped. + * + * The resulting [[KeyedPartitioning]]s are the cross-product of the per-position alternatives + * restricted to the projectable positions. 
All share the same `partitionKeys` object (projected + * to the same subset of positions), preserving the invariant required by [[GroupPartitionsExec]]. + */ + private def projectKeyedPartitionings( + kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = { + if (kps.isEmpty) return LazyList.empty + val numPositions = kps.head.expressions.length + + val alternativesPerPosition: IndexedSeq[LazyList[Expression]] = + if (hasAlias) { + // For each position, gather unique expressions across all KPs (ExpressionSet deduplicates + // semantically equal expressions, e.g. the same join-key column shared by both KP sides) + // and project each through the output aliases. + (0 until numPositions).map { i => + val seen = mutable.Set.empty[Expression] + ExpressionSet(kps.map(_.expressions(i))).to(LazyList).flatMap { expr => + projectExpression(expr).filter(e => seen.add(e.canonicalized)) + } + } + } else { + // No aliases: filter out non-projectable expressions first, then deduplicate. + val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) + (0 until numPositions).map { i => + ExpressionSet(kps.collect { + case kp if kp.expressions(i).references.subsetOf(outputSet) => kp.expressions(i) + }).to(LazyList) + } + } + + // Non-empty positions define the maximum achievable granularity. + val projectablePositions = + (0 until numPositions).filter(i => alternativesPerPosition(i).nonEmpty) + + if (projectablePositions.isEmpty) return LazyList.empty + + // All input KPs share the same partitionKeys by invariant; use the first as the key source. + val keySource = kps.head + val sharedKeys = + if (projectablePositions.length == numPositions) keySource.partitionKeys + else keySource.projectKeys(projectablePositions)._2 + + val isGrouped = sharedKeys.distinct.size == sharedKeys.size + // A KP is narrowed if this node drops positions, or if the input KPs were already narrowed + // (i.e. came from a finer-grained partitioning). 
The flag must be sticky: a subsequent + // PartitioningPreservingUnaryExecNode that passes all positions through would otherwise + // recompute isNarrowed=false, silently dropping the protection. + val isNarrowed = projectablePositions.length < numPositions || kps.exists(_.isNarrowed) + + // Cross-product the per-position alternatives to produce all concrete KPs. + // Note: generateCartesianProduct expects thunks () => Seq[T], but wrapping LazyLists in thunks + // here is not strictly necessary since they are already lazy -- we do it only to match the API. + // No deduplication is needed here: per-position alternatives are already canonically distinct, + // so all cross-product combinations are distinct by construction. + MultiTransform.generateCartesianProduct( + projectablePositions.map(i => () => alternativesPerPosition(i))) + .map(projectedExprs => + new KeyedPartitioning(projectedExprs, sharedKeys, isGrouped, isNarrowed)) } private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 62a3a977162aa..c5a08e983e610 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -408,12 +408,12 @@ case class EnsureRequirements( reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys) .orElse(reorderJoinKeysRecursively( leftKeys, rightKeys, leftPartitioning, None)) - case (Some(KeyedPartitioning(clustering, _, _)), _) => + case (Some(KeyedPartitioning(clustering, _, _, _)), _) => val leafExprs = clustering.flatMap(_.collectLeaves()) reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, leftKeys) .orElse(reorderJoinKeysRecursively( leftKeys, rightKeys, None, rightPartitioning)) -
case (_, Some(KeyedPartitioning(clustering, _, _))) => + case (_, Some(KeyedPartitioning(clustering, _, _, _))) => val leafExprs = clustering.flatMap(_.collectLeaves()) reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leafExprs, rightKeys) .orElse(reorderJoinKeysRecursively( @@ -512,7 +512,7 @@ case class EnsureRequirements( leftSpec.isCompatibleWith(rightSpec) if ((!isCompatible || conf.v2BucketingPartiallyClusteredDistributionEnabled) && (conf.v2BucketingPushPartValuesEnabled || - conf.v2BucketingAllowJoinKeysSubsetOfPartitionKeys)) { + conf.v2BucketingAllowKeysSubsetOfPartitionKeys)) { logInfo("Pushing common partition values for storage-partitioned join") isCompatible = leftSpec.areKeysCompatible(rightSpec) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 7f757c651c560..7444384229162 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -407,7 +407,7 @@ object ShuffleExchangeExec { val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) row => projection(row) case SinglePartition => identity - case KeyedPartitioning(expressions, _, _) => + case KeyedPartitioning(expressions, _, _, _) => row => bindReferences(expressions, outputAttributes).map(_.eval(row)) case s: ShufflePartitionIdPassThrough => // For ShufflePartitionIdPassThrough, the expression directly evaluates to the partition ID diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala index d5825432ebbf1..0273a5d6dd494 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala 
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DistributionAndOrderingSuiteBase.scala @@ -50,7 +50,8 @@ abstract class DistributionAndOrderingSuiteBase plan: QueryPlan[T]): Partitioning = partitioning match { case HashPartitioning(exprs, numPartitions) => HashPartitioning(exprs.map(resolveAttrs(_, plan)), numPartitions) - case KeyedPartitioning(expressions, partitionKeys, isGrouped) => - KeyedPartitioning(expressions.map(resolveAttrs(_, plan)), partitionKeys, isGrouped) + case kp: KeyedPartitioning => + // Use copy so partitionKeys, isGrouped and isNarrowed all carry over unchanged. + kp.copy(expressions = kp.expressions.map(resolveAttrs(_, plan))) case PartitioningCollection(partitionings) => PartitioningCollection(partitionings.map(resolvePartitioning(_, plan))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 38de6b043bc2b..2a0ab52c36933 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1733,7 +1733,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with } } - test("SPARK-48065: SPJ: allowJoinKeysSubsetOfPartitionKeys is too strict") { + test("SPARK-48065: SPJ: allowKeysSubsetOfPartitionKeys is too strict") { val table1 = "tab1e1" val table2 = "table2" val partition = Array(identity("id")) @@ -1764,7 +1764,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> partiallyClustered.toString, - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { val df = sql( s""" |${selectWithMergeJoinHint("t1", "t2")} @@ -1821,15 +1821,15 @@ class KeyGroupedPartitioningSuite 
extends DistributionAndOrderingSuiteBase with Seq(true, false).foreach { pushDownValues => Seq(true, false).foreach { filter => Seq(true, false).foreach { partiallyClustered => - Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys => + Seq(true, false).foreach { allowKeysSubsetOfPartitionKeys => withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> partiallyClustered.toString, SQLConf.V2_BUCKETING_PARTITION_FILTER_ENABLED.key -> filter.toString, - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> - allowJoinKeysSubsetOfPartitionKeys.toString) { + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowKeysSubsetOfPartitionKeys.toString) { val df = sql( s""" |${selectWithMergeJoinHint("t1", "t2")} @@ -1838,7 +1838,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with |ON t1.id = t2.id ORDER BY t1.id, t1data, t2data |""".stripMargin) val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (allowJoinKeysSubsetOfPartitionKeys) { + if (allowKeysSubsetOfPartitionKeys) { assert(shuffles.isEmpty, "SPJ should be triggered") } else { assert(shuffles.nonEmpty, "SPJ should not be triggered") @@ -1846,7 +1846,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) .map(_.outputPartitioning.numPartitions) - (allowJoinKeysSubsetOfPartitionKeys, partiallyClustered, filter) match { + (allowKeysSubsetOfPartitionKeys, partiallyClustered, filter) match { // SPJ, partially-clustered, with filter case (true, true, true) => assert(groupPartitions == Seq(6, 6)) @@ -1963,13 +1963,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with sql(finalStr) } - Seq(true, false).foreach { 
allowJoinKeysSubsetOfPartitionKeys => + Seq(true, false).foreach { allowKeysSubsetOfPartitionKeys => withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> - allowJoinKeysSubsetOfPartitionKeys.toString, + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowKeysSubsetOfPartitionKeys.toString, SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { val df = sql( s""" @@ -2123,13 +2123,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with sql(finalStr) } - Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys => + Seq(true, false).foreach { allowKeysSubsetOfPartitionKeys => withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> - allowJoinKeysSubsetOfPartitionKeys.toString, + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowKeysSubsetOfPartitionKeys.toString, SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { val df = sql( s""" @@ -2237,13 +2237,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with sql(insertStr) } - Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys => + Seq(true, false).foreach { allowKeysSubsetOfPartitionKeys => withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> - allowJoinKeysSubsetOfPartitionKeys.toString, + 
SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowKeysSubsetOfPartitionKeys.toString, SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { val df = sql( s""" @@ -2306,7 +2306,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { val df = sql( s""" @@ -2372,7 +2372,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> allowPushDown.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> partiallyClustered.toString, - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { val df = sql( s""" @@ -2424,15 +2424,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with Seq(true, false).foreach { pushDownValues => Seq(true, false).foreach { partiallyClustered => - Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys => + Seq(true, false).foreach { allowKeysSubsetOfPartitionKeys => withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> partiallyClustered.toString, - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> - allowJoinKeysSubsetOfPartitionKeys.toString) { + 
SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowKeysSubsetOfPartitionKeys.toString) { val df = sql( s""" @@ -2445,7 +2445,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with checkAnswer(df, Seq(Row(1, 4, "aa"), Row(2, 5, "bb"), Row(3, 6, "cc"))) val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (allowJoinKeysSubsetOfPartitionKeys) { + if (allowKeysSubsetOfPartitionKeys) { assert(shuffles.isEmpty, "SPJ should be triggered") } else { assert(shuffles.nonEmpty, "SPJ should not be triggered") @@ -2453,7 +2453,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with val partitions = collectGroupPartitions(df.queryExecution.executedPlan) .map(_.outputPartitioning.numPartitions) - (pushDownValues, allowJoinKeysSubsetOfPartitionKeys, partiallyClustered) match { + (pushDownValues, allowKeysSubsetOfPartitionKeys, partiallyClustered) match { // SPJ and partially-clustered case (_, true, _) => assert(partitions == Seq(3, 3)) // non-SPJ or SPJ/partially-clustered @@ -2486,20 +2486,20 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with Seq(true, false).foreach { pushDownValues => Seq(true, false).foreach { partiallyClustered => - Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys => + Seq(true, false).foreach { allowKeysSubsetOfPartitionKeys => withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> partiallyClustered.toString, - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> - allowJoinKeysSubsetOfPartitionKeys.toString) { + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowKeysSubsetOfPartitionKeys.toString) { val df = createJoinTestDF(Seq("arrive_time" -> "time"), extraColumns = Seq("p.item_id")) // Currently SPJ for case 
where join key not same as partition key // only supported when push-part-values enabled val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (allowJoinKeysSubsetOfPartitionKeys) { + if (allowKeysSubsetOfPartitionKeys) { assert(shuffles.isEmpty, "SPJ should be triggered") } else { assert(shuffles.nonEmpty, "SPJ should not be triggered") @@ -2507,7 +2507,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with val partitions = collectGroupPartitions(df.queryExecution.executedPlan) .map(_.outputPartitioning.numPartitions) - (allowJoinKeysSubsetOfPartitionKeys, partiallyClustered) match { + (allowKeysSubsetOfPartitionKeys, partiallyClustered) match { // SPJ and partially-clustered case (true, true) => assert(partitions == Seq(5, 5)) // SPJ and not partially-clustered @@ -2558,7 +2558,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushdownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.size == 1, "SPJ should be triggered") @@ -2705,7 +2705,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = 
collectShuffles(df.queryExecution.executedPlan) assert(shuffles.size == 1, "SPJ should be triggered") @@ -3146,7 +3146,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", SQLConf.V2_BUCKETING_PRESERVE_ORDERING_ON_COALESCE_ENABLED.key -> "true") { val metrics = runAndFetchMetrics { val df = sql( @@ -3181,7 +3181,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { val customers_partitions = Array(identity("customer_name"), bucket(4, "customer_id")) createTable(customers, customersColumns, customers_partitions) @@ -3254,7 +3254,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with test("SPARK-55535: Multi table join granular partition grouping") { withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val items_partitions = Array(identity("id"), years("arrive_time")) createTable(items, itemsColumns, items_partitions) @@ -3450,7 +3450,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with sql(s"INSERT INTO testcat.ns.$purchases VALUES (2, 10.0, cast('2021-01-01' as timestamp))") withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> 
"false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { val df = sql( s""" @@ -3793,7 +3793,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with Seq(true, false).foreach { preserveOrdering => withSQLConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", - SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", SQLConf.V2_BUCKETING_PRESERVE_ORDERING_ON_COALESCE_ENABLED.key -> preserveOrdering.toString) { val df = sql( @@ -3988,4 +3988,199 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with } } } + + test("SPARK-46367: partition key alias in subquery projects KeyedPartitioning") { + // A subquery that renames a partition key (id -> pk) creates a ProjectExec between the scan and + // the join. This test verifies that KeyedPartitioning expressions are correctly projected + // through aliases so that SPJ still works without a shuffle. Both sides have the same partition + // key sequence so no GroupPartitionsExec is needed. 
+ val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(2, 11.0, cast('2020-01-01' as timestamp)), " + + s"(3, 19.5, cast('2020-02-01' as timestamp))") + + val df = sql( + s""" + |${selectWithMergeJoinHint("sub", "p")} + |sub.pk, p.price AS purchase_price + |FROM (SELECT id AS pk FROM testcat.ns.$items) sub + |JOIN testcat.ns.$purchases p + |ON sub.pk = p.item_id + |ORDER BY pk, purchase_price + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle when partition key is aliased in subquery") + + checkAnswer(df, Seq(Row(1, 42.0f), Row(2, 11.0f), Row(3, 19.5f))) + } + + test("SPARK-46367: narrowing projection requires allowKeysSubsetOfPartitionKeys") { + // items is partitioned by (id, name). The subquery projects away 'name', narrowing + // KeyedPartitioning([id, name]) -> KeyedPartitioning([id]) with isNarrowed=true. + // Because id=1 maps to two original partitions ("aa" and "bb"), isGrouped=false. + // GroupPartitionsExec would merge them, carrying the same skew risk as subset partition + // keys -- so SPJ requires allowKeysSubsetOfPartitionKeys to be enabled. 
+ val items_partitions = Array(identity("id"), identity("name")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'bb', 41.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'cc', 10.0, cast('2020-01-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(2, 11.0, cast('2020-01-01' as timestamp))") + + Seq(true, false).foreach { allowSubset => + withSQLConf( + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> + allowSubset.toString) { + + val df = sql( + s""" + |${selectWithMergeJoinHint("sub", "p")} + |sub.id, p.price AS purchase_price + |FROM (SELECT id FROM testcat.ns.$items WHERE name >= 'aa') sub + |JOIN testcat.ns.$purchases p + |ON sub.id = p.item_id + |ORDER BY sub.id, purchase_price + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + if (allowSubset) { + assert(shuffles.isEmpty, "SPJ should be triggered with config enabled") + } else { + assert(shuffles.nonEmpty, "SPJ should not be triggered without config") + } + + checkAnswer(df, Seq(Row(1, 42.0f), Row(1, 42.0f), Row(2, 11.0f))) + } + } + } + + test("SPARK-46367: narrowing projection with distinct projected keys does not require " + + "allowKeysSubsetOfPartitionKeys") { + // items is partitioned by (id, name) but each id value is unique, so projecting away 'name' + // produces KeyedPartitioning([id]) with isNarrowed=true but isGrouped=true. + // Because no two original partitions share the same projected key, GroupPartitionsExec does not + // merge any partitions -- no skew risk -- so SPJ works without config. 
+ val items_partitions = Array(identity("id"), identity("name")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(2, 11.0, cast('2020-01-01' as timestamp)), " + + s"(3, 19.5, cast('2020-02-01' as timestamp))") + + withSQLConf( + SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "false") { + val df = sql( + s""" + |${selectWithMergeJoinHint("sub", "p")} + |sub.id, p.price AS purchase_price + |FROM (SELECT id FROM testcat.ns.$items WHERE name >= 'aa') sub + |JOIN testcat.ns.$purchases p + |ON sub.id = p.item_id + |ORDER BY sub.id, purchase_price + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, + "should not add shuffle: narrowed KP remains grouped so no skew risk") + + checkAnswer(df, Seq(Row(1, 42.0f), Row(2, 11.0f), Row(3, 19.5f))) + } + } + + test("SPARK-46367: aggregate with GROUP BY subset of partition keys uses GroupPartitionsExec " + + "with allowKeysSubsetOfPartitionKeys") { + // Table partitioned by (id, name): id=1 maps to two distinct partition keys (1,'aa') and + // (1,'bb'). The partial HashAggregate (a PartitioningPreservingUnaryExecNode) projects away + // 'name', narrowing the KP from [id,name] to KP([id], isNarrowed=true, isGrouped=false). + // By default a shuffle is required; with allowKeysSubsetOfPartitionKeys enabled, + // EnsureRequirements inserts GroupPartitionsExec to coalesce both id=1 partitions so the final + // aggregate sees all id=1 partial results in one task -- correct and shuffle-free. 
+ val items_partitions = Array(identity("id"), identity("name")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'bb', 41.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'cc', 10.0, cast('2020-01-01' as timestamp))") + + // Use MAX(name) so that 'name' stays in the scan output and is not column-pruned away. + // Without it, V2ScanPartitioningAndOrdering drops KP([id,name]) when 'name' is absent + // from the output, making the scan report UnknownPartitioning and always shuffling for + // a different reason -- which would mask the narrowing behaviour we are testing here. + val query = + s"SELECT id, MAX(name) AS max_name, COUNT(*) AS cnt FROM testcat.ns.$items GROUP BY id" + val expected = Seq(Row(1L, "bb", 2L), Row(2L, "cc", 1L)) + + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + checkAnswer(sql(query), expected) + assert(collectAllShuffles(sql(query).queryExecution.executedPlan).nonEmpty, + "shuffle required: KP([id,name]) does not satisfy ClusteredDistribution([id])") + + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { + checkAnswer(sql(query), expected) + assert(collectAllGroupPartitions(sql(query).queryExecution.executedPlan).nonEmpty, + "GroupPartitionsExec expected to coalesce partitions sharing the narrowed key [id]") + } + } + } + + test("SPARK-46367: window with PARTITION BY subset of partition keys uses GroupPartitionsExec " + + "with allowKeysSubsetOfPartitionKeys") { + // Same narrowing mechanism as the aggregate test: the partial HashAggregate (a + // PartitioningPreservingUnaryExecNode) for the inner GROUP BY id, price projects away 'name', + // narrowing KP([id,name]) to KP([id], isNarrowed=true, isGrouped=false). With + // allowKeysSubsetOfPartitionKeys enabled, EnsureRequirements inserts GroupPartitionsExec to + // coalesce both id=1 partitions for the final aggregate. 
The window PARTITION BY id then sees + // KP([id], isGrouped=true) from the aggregate output and needs no further exchange. + // + // MAX(name) in the subquery keeps 'name' in the scan output so that + // V2ScanPartitioningAndOrdering does not drop the KP before PartitioningPreservingUnaryExecNode + // narrowing can happen. + val items_partitions = Array(identity("id"), identity("name")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 10.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'bb', 20.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'cc', 30.0, cast('2020-01-01' as timestamp))") + + val query = + s"""SELECT id, SUM(price) OVER (PARTITION BY id ORDER BY price) AS running_sum, max_name + |FROM (SELECT id, MAX(name) AS max_name, price FROM testcat.ns.$items GROUP BY id, price) + |""".stripMargin + val expected = Seq(Row(1L, 10.0, "aa"), Row(1L, 30.0, "bb"), Row(2L, 30.0, "cc")) + + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + checkAnswer(sql(query), expected) + assert(collectAllShuffles(sql(query).queryExecution.executedPlan).nonEmpty, + "shuffle required: KP([id,name]) does not satisfy ClusteredDistribution([id])") + + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { + checkAnswer(sql(query), expected) + assert(collectAllGroupPartitions(sql(query).queryExecution.executedPlan).nonEmpty, + "GroupPartitionsExec expected to coalesce partitions sharing the narrowed key [id]") + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index ec13d48d45f84..db664b04ef08b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -19,12 +19,13 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, TransformExpression} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, KeyedPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.connector.catalog.functions.{BucketFunction, YearsFunction} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.{IntegerType, StringType} class ProjectedOrderingAndPartitioningSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { @@ -210,6 +211,389 @@ class ProjectedOrderingAndPartitioningSuite assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a") assert(outputOrdering.head.sameOrderExpressions.size == 0) } + + test("SPARK-46367: KeyedPartitioning expressions are projected through " + + "PartitioningPreservingUnaryExecNode") { + val a = AttributeReference("a", IntegerType)() + val partitionKeys = Seq(InternalRow(1), InternalRow(2), InternalRow(3)) + val child = DummyLeafExecWithPartitioning( + output = Seq(a), + partitioning = KeyedPartitioning(Seq(a), partitionKeys)) + val b = Alias(a, "b")() + val project = ProjectExec(Seq(b), child) + + project.outputPartitioning match { + case kp: KeyedPartitioning => + assert(kp.expressions === Seq(b.toAttribute), + "expressions must reference the 
aliased attribute, not the original")
+        assert(kp.partitionKeys ===
+          child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys,
+          "partition keys must be preserved after projection")
+      case other =>
+        fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection on KeyedPartitioning produces projected partition keys") {
+    // KP([x, y], [(1,1),(1,2),(2,1),(2,2)]) through Project(x) should produce a narrowed,
+    // ungrouped KP([x], [(1),(1),(2),(2)]): y is dropped and the x values repeat.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+    val child = DummyLeafExecWithPartitioning(Seq(x, y), KeyedPartitioning(Seq(x, y), keys2d))
+    val project = ProjectExec(Seq(x), child)
+
+    project.outputPartitioning match {
+      case kp: KeyedPartitioning =>
+        assert(kp.expressions === Seq(x),
+          "narrowed partitioning must keep the projected expression")
+        assert(kp.numPartitions === 4, "partition count must be preserved")
+        assert(kp.partitionKeys.map(_.getInt(0)) === Seq(1, 1, 2, 2),
+          "each key must be reduced to its x component, in the original order")
+        assert(kp.isNarrowed, "dropping y must mark the KP as narrowed")
+        assert(!kp.isGrouped, "projected keys (1),(1),(2),(2) contain duplicates")
+      case other => fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection with alias shares partition keys across alternatives") {
+    // KP([x, y], ...) through Project(x, x as x_alias) should produce
+    // PC(KP([x], keys1d), KP([x_alias], keys1d)) where both KPs reference the same keys1d object.
+ val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2)) + val child = DummyLeafExecWithPartitioning( + output = Seq(x, y), + partitioning = KeyedPartitioning(Seq(x, y), keys2d)) + val xAlias = Alias(x, "x_alias")() + val project = ProjectExec(Seq(x, xAlias), child) + + project.outputPartitioning match { + case pc: PartitioningCollection => + val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning]) + assert(kps.forall(_.expressions.length == 1), + "all narrowed KPs must have 1 expression") + assert(kps.map(_.expressions.head.asInstanceOf[Attribute].name).toSet + === Set("x", "x_alias"), + "both the original and aliased attribute must appear") + // The invariant: all KPs in the collection must share the same partitionKeys object. + assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys), + "all KPs must share the same partitionKeys object") + case other => + fail(s"Expected PartitioningCollection, got $other") + } + } + + test("SPARK-46367: narrowing projection from 3 to 2 expressions with alias") { + // KP([x, y, z], keys3d) through Project(x, x as x_alias, y) -- z is dropped. + // Expected: PC(KP([x, y], keys2d), KP([x_alias, y], keys2d)) where both share keys2d. 
+ val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + val z = AttributeReference("z", IntegerType)() + val keys3d = Seq(InternalRow(1, 1, 1), InternalRow(1, 1, 2), InternalRow(1, 2, 1), + InternalRow(2, 1, 1), InternalRow(2, 2, 2)) + val child = DummyLeafExecWithPartitioning( + output = Seq(x, y, z), + partitioning = KeyedPartitioning(Seq(x, y, z), keys3d)) + val xAlias = Alias(x, "x_alias")() + val project = ProjectExec(Seq(x, xAlias, y), child) + + project.outputPartitioning match { + case pc: PartitioningCollection => + val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning]) + assert(kps.forall(_.expressions.length == 2), + "narrowed KPs must have 2 expressions (z dropped, x and y kept)") + assert(kps.map(_.expressions.map(_.asInstanceOf[Attribute].name)).toSet === + Set(Seq("x", "y"), Seq("x_alias", "y"))) + assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys), + "all narrowed KPs must share the same partitionKeys object") + case other => + fail(s"Expected PartitioningCollection, got $other") + } + } + + test("SPARK-46367: non-prefix narrowing projection preserves original KP expression order") { + // KP([x, y, z], keys3d) through Project(z, y) -- x is dropped (non-prefix). + // The output expression order is [z, y], but the projected KP expressions must follow the + // original position order [y, z] because the per-position algorithm iterates positions 0..N-1 + // and z is at position 2, y at position 1. + val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + val z = AttributeReference("z", IntegerType)() + // Projected to positions [1(y), 2(z)]: (1,1),(1,2),(2,1),(1,1),(2,2) -- (1,1) appears twice. 
+ val keys3d = Seq(InternalRow(1, 1, 1), InternalRow(1, 1, 2), InternalRow(1, 2, 1), + InternalRow(2, 1, 1), InternalRow(2, 2, 2)) + val child = DummyLeafExecWithPartitioning( + output = Seq(x, y, z), + partitioning = KeyedPartitioning(Seq(x, y, z), keys3d)) + val project = ProjectExec(Seq(z, y), child) + + project.outputPartitioning match { + case kp: KeyedPartitioning => + assert(kp.expressions.map(_.asInstanceOf[Attribute].name) === Seq("y", "z"), + "expressions must follow original KP position order [y, z], not output order [z, y]") + assert(kp.isNarrowed, "dropping x must mark the KP as narrowed") + assert(!kp.isGrouped, "projected keys have duplicate (1,1) entries") + case other => + fail(s"Expected KeyedPartitioning, got $other") + } + } + + test("SPARK-46367: non-prefix narrowing projection with alias produces cross-product " + + "in original KP expression order") { + // KP([x, y, z], keys3d) through Project(z, z as z_alias, y) -- x is dropped (non-prefix). + // Projectable positions: y (pos 1) -> [y], z (pos 2) -> [z, z_alias]. + // Cross-product: PC(KP([y, z], keys2d), KP([y, z_alias], keys2d)) -- expressions in + // original position order [y, z/z_alias], NOT in output expression order [z, z_alias, y]. 
+ val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + val z = AttributeReference("z", IntegerType)() + val keys3d = Seq(InternalRow(1, 1, 1), InternalRow(1, 1, 2), InternalRow(1, 2, 1), + InternalRow(2, 1, 1), InternalRow(2, 2, 2)) + val child = DummyLeafExecWithPartitioning( + output = Seq(x, y, z), + partitioning = KeyedPartitioning(Seq(x, y, z), keys3d)) + val zAlias = Alias(z, "z_alias")() + val project = ProjectExec(Seq(z, zAlias, y), child) + + project.outputPartitioning match { + case pc: PartitioningCollection => + val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning]) + assert(kps.forall(_.expressions.length == 2), + "narrowed KPs must have 2 expressions (x dropped)") + assert(kps.map(_.expressions.map(_.asInstanceOf[Attribute].name)).toSet === + Set(Seq("y", "z"), Seq("y", "z_alias")), + "expressions must follow original KP position order [y, z/z_alias], not output order") + assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys), + "all narrowed KPs must share the same partitionKeys object") + assert(kps.forall(_.isNarrowed), "all KPs must be marked as narrowed") + assert(kps.forall(!_.isGrouped), "projected keys have duplicate (1,1) entries") + case other => + fail(s"Expected PartitioningCollection, got $other") + } + } + + test("SPARK-46367: PartitioningCollection KPs with mixed projectability produce correct " + + "per-position cross-product") { + // PC(KP([x,y], keys2d), KP([x,y_alias], keys2d)) through Project(x, x as x_alias, y_alias): + // Per-position projection across both KPs: + // position 0: ExpressionSet({x, x}) = {x} => projectExpression(x) = [x, x_alias] + // position 1: ExpressionSet({y, y_alias}) => projectExpression(y) = [] (y not in output), + // projectExpression(y_alias) = [y_alias] + // => alternatives: [y_alias] + // Cross-product [x, x_alias] x [y_alias] => KP([x,y_alias], keys2d), KP([x_alias,y_alias], + // keys2d). Both share the same keys2d object. 
+ val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + val yAlias = AttributeReference("y_alias", IntegerType)() + val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2)) + val childPartitioning = PartitioningCollection(Seq( + KeyedPartitioning(Seq(x, y), keys2d), + KeyedPartitioning(Seq(x, yAlias), keys2d))) + val child = DummyLeafExecWithPartitioning( + output = Seq(x, y, yAlias), partitioning = childPartitioning) + val xAlias = Alias(x, "x_alias")() + val project = ProjectExec(Seq(x, xAlias, yAlias), child) + + project.outputPartitioning match { + case pc: PartitioningCollection => + val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning]) + assert(kps.forall(_.expressions.length == 2), + "only full-granularity (2-expr) results must be returned; narrowed ones are subsumed") + assert(kps.map(_.expressions.map(_.asInstanceOf[Attribute].name)).toSet === + Set(Seq("x", "y_alias"), Seq("x_alias", "y_alias")), + "both x/y_alias and x_alias/y_alias projections must appear") + // The invariant: all KPs must share the same partitionKeys object. + assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys), + "all KPs must share the same partitionKeys object") + case other => + fail(s"Expected PartitioningCollection, got $other") + } + } + + test("SPARK-46367: narrowing projection with duplicate keys requires " + + "allowKeysSubsetOfPartitionKeys to satisfy ClusteredDistribution") { + val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + + // Scenario 1: projected keys have duplicates (x-values: 1, 1, 2) -> isGrouped=false. + // GroupPartitionsExec would merge the two x=1 partitions, carrying the same skew risk as + // allowKeysSubsetOfPartitionKeys. EnsureRequirements calls groupedSatisfies() directly. 
+ val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1)) + val project = ProjectExec(Seq(x), + DummyLeafExecWithPartitioning(output = Seq(x, y), + partitioning = KeyedPartitioning(Seq(x, y), keys2d))) + + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "false") { + project.outputPartitioning match { + case kp: KeyedPartitioning => + assert(!kp.isGrouped, "narrowed keys must have duplicates (1 appears twice)") + assert(kp.isNarrowed, "projection must mark the KP as narrowed") + assert(!kp.groupedSatisfies(ClusteredDistribution(Seq(x))), + "narrowed ungrouped KP must not satisfy via groupedSatisfies without config") + case other => fail(s"Expected KeyedPartitioning, got $other") + } + } + + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { + project.outputPartitioning match { + case kp: KeyedPartitioning => + assert(kp.groupedSatisfies(ClusteredDistribution(Seq(x))), + "narrowed ungrouped KP must satisfy via groupedSatisfies when config is enabled") + case other => fail(s"Expected KeyedPartitioning, got $other") + } + } + + // Scenario 2: projected keys are distinct (x-values: 1, 2, 3) -> isGrouped=true. + // Each projected key maps to exactly one original partition so GroupPartitionsExec does not + // merge any partitions. No skew risk: must satisfy ClusteredDistribution regardless of config. 
+    val keys2dDistinct = Seq(InternalRow(1, 1), InternalRow(2, 2), InternalRow(3, 3))
+    val projectDistinct = ProjectExec(Seq(x),
+      DummyLeafExecWithPartitioning(output = Seq(x, y),
+        partitioning = KeyedPartitioning(Seq(x, y), keys2dDistinct)))
+
+    withSQLConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "false") {
+      projectDistinct.outputPartitioning match {
+        case kp: KeyedPartitioning =>
+          assert(kp.isGrouped, "distinct projected keys must be grouped")
+          assert(kp.isNarrowed, "projection must mark the KP as narrowed")
+          assert(kp.satisfies(ClusteredDistribution(Seq(x))),
+            "grouped narrowed KP must satisfy ClusteredDistribution without config (no merging)")
+        case other => fail(s"Expected KeyedPartitioning, got $other")
+      }
+    }
+  }
+
+  test("SPARK-46367: isNarrowed is sticky across chained PartitioningPreservingUnaryExecNodes") {
+    // Two stacked projections over KP([x, y]): the inner one drops y, the outer one keeps x
+    // unchanged. The narrowed flag set by the inner hop must still be visible on the outer node.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1))
+    val leaf = DummyLeafExecWithPartitioning(Seq(x, y), KeyedPartitioning(Seq(x, y), keys2d))
+    val outerProject = ProjectExec(Seq(x), ProjectExec(Seq(x), leaf))
+
+    withSQLConf(SQLConf.V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "false") {
+      val kp = outerProject.outputPartitioning match {
+        case keyed: KeyedPartitioning => keyed
+        case other => fail(s"Expected KeyedPartitioning, got $other")
+      }
+
+      assert(!kp.isGrouped, "duplicate keys must survive the second hop")
+      assert(kp.isNarrowed,
+        "isNarrowed must be sticky: a second hop that keeps all positions must not reset it")
+      assert(!kp.groupedSatisfies(ClusteredDistribution(Seq(x))),
+        "narrowed ungrouped KP must still not satisfy ClusteredDistribution without config " +
+          "after a second PartitioningPreservingUnaryExecNode hop")
+    }
+  }
+
+  test("SPARK-46367: alias substitution propagates through bucket transform expression") {
+    // KP([bucket(32, id)],
keys1d) through Project(id as pk) should produce + // KP([bucket(32, pk)], keys1d): the alias is pushed into the bucket's column argument. + val id = AttributeReference("id", IntegerType)() + val bucketExpr = TransformExpression(BucketFunction, Seq(id), Some(32)) + val keys1d = Seq(InternalRow(0), InternalRow(1), InternalRow(2)) + val child = DummyLeafExecWithPartitioning( + output = Seq(id), + partitioning = KeyedPartitioning(Seq(bucketExpr), keys1d)) + val pk = Alias(id, "pk")() + val project = ProjectExec(Seq(pk), child) + + project.outputPartitioning match { + case kp: KeyedPartitioning => + assert(kp.expressions.length === 1) + kp.expressions.head match { + case te: TransformExpression => + assert(te.isSameFunction(bucketExpr), + "bucket function and numBuckets must be preserved after alias substitution") + assert(te.children.head.asInstanceOf[Attribute].name === "pk", + "bucket's column argument must be rewritten to the aliased attribute") + case other => fail(s"Expected TransformExpression, got $other") + } + assert(kp.partitionKeys eq child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys, + "partition keys must be unchanged") + assert(!kp.isNarrowed, "same number of positions: not narrowed") + case other => fail(s"Expected KeyedPartitioning, got $other") + } + } + + test("SPARK-46367: narrowing projection drops transform when its column is absent") { + // KP([bucket(32, id), years(ts)], keys2d) through Project(id) -- ts is dropped. + // bucket(32, id) is projectable (id in output); years(ts) is not (ts absent). + // Result: KP([bucket(32, id)], keys1d, isNarrowed=true, isGrouped=false). + val id = AttributeReference("id", IntegerType)() + val ts = AttributeReference("ts", IntegerType)() + val bucketExpr = TransformExpression(BucketFunction, Seq(id), Some(32)) + val yearsExpr = TransformExpression(YearsFunction, Seq(ts)) + // Projected to position [0] (bucket): (0),(1),(0) -- bucket value 0 appears twice. 
+ val keys2d = Seq(InternalRow(0, 2020), InternalRow(1, 2020), InternalRow(0, 2021)) + val child = DummyLeafExecWithPartitioning( + output = Seq(id, ts), + partitioning = KeyedPartitioning(Seq(bucketExpr, yearsExpr), keys2d)) + val project = ProjectExec(Seq(id), child) + + project.outputPartitioning match { + case kp: KeyedPartitioning => + assert(kp.expressions.length === 1, "years(ts) must be dropped: ts not in output") + kp.expressions.head match { + case te: TransformExpression => + assert(te.isSameFunction(bucketExpr), "bucket must be the surviving expression") + assert(te.children.head.asInstanceOf[Attribute].name === "id") + case other => fail(s"Expected TransformExpression, got $other") + } + assert(kp.isNarrowed, "dropping years(ts) position must mark the KP as narrowed") + assert(!kp.isGrouped, "projected bucket keys (0,1,0) have duplicates") + case other => fail(s"Expected KeyedPartitioning, got $other") + } + } + + test("SPARK-46367: alias substitution rewrites years transform while preserving bucket") { + // KP([bucket(32, id), years(ts)], keys2d) through Project(id, ts as ts_alias). + // bucket(32, id) keeps id (no alias for id); years(ts) is rewritten to years(ts_alias). + // Result: KP([bucket(32, id), years(ts_alias)], keys2d) -- not narrowed. 
+    val id = AttributeReference("id", IntegerType)()
+    val ts = AttributeReference("ts", IntegerType)()
+    val bucketExpr = TransformExpression(BucketFunction, Seq(id), Some(32))
+    val yearsExpr = TransformExpression(YearsFunction, Seq(ts))
+    val keys2d = Seq(InternalRow(0, 2020), InternalRow(1, 2020), InternalRow(0, 2021))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(id, ts),
+      partitioning = KeyedPartitioning(Seq(bucketExpr, yearsExpr), keys2d))
+    val tsAlias = Alias(ts, "ts_alias")()
+    val project = ProjectExec(Seq(id, tsAlias), child)
+
+    project.outputPartitioning match {
+      case kp: KeyedPartitioning =>
+        assert(kp.expressions.length === 2, "both positions must survive")
+        kp.expressions(0) match {
+          case te: TransformExpression =>
+            assert(te.isSameFunction(bucketExpr))
+            assert(te.children.head.asInstanceOf[Attribute].name === "id",
+              "bucket's argument must remain id (no alias for id in this projection)")
+          case other => fail(s"Expected TransformExpression at pos 0, got $other")
+        }
+        kp.expressions(1) match {
+          case te: TransformExpression =>
+            assert(te.isSameFunction(yearsExpr))
+            assert(te.children.head.asInstanceOf[Attribute].name === "ts_alias",
+              "years() argument must be rewritten to ts_alias")
+          case other => fail(s"Expected TransformExpression at pos 1, got $other")
+        }
+        assert(kp.partitionKeys eq child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys,
+          "partition keys must be unchanged")
+        assert(!kp.isNarrowed, "both positions projected: not narrowed")
+      case other => fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+}
+
+private case class DummyLeafExecWithPartitioning( // leaf stub with a fixed partitioning
+    output: Seq[Attribute], // attributes the stub exposes to its parent
+    partitioning: Partitioning // value handed back by outputPartitioning
+  ) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = null // no RDD; visible tests never run it
+  override def outputPartitioning: Partitioning = partitioning // returned exactly as supplied
 }

 private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index 2ff4652646cca..1e35985f50491 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -1129,7 +1129,7 @@ class EnsureRequirementsSuite extends SharedSparkSession { EnsureRequirements.apply(smjExec) match { case ShuffledHashJoinExec(_, _, _, _, _, DummySparkPlan(_, _, left: KeyedPartitioning, _, _), - ShuffleExchangeExec(KeyedPartitioning(attrs, pks, _), + ShuffleExchangeExec(KeyedPartitioning(attrs, pks, _, _), DummySparkPlan(_, _, SinglePartition, _, _), _, _), _) => assert(left.expressions == a1 :: Nil) assert(attrs == a1 :: Nil)