Skip to content

Commit 67325e9

Browse files
committed
add new spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled config
1 parent cf295d8 commit 67325e9

2 files changed

Lines changed: 21 additions & 4 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2165,6 +2165,18 @@ object SQLConf {
21652165
.booleanConf
21662166
.createWithDefault(false)
21672167

2168+
// Controls whether a V2 scan that reports a KeyedPartitioning (but no explicit
// ordering) may advertise its partition key expressions as an output ordering.
// Within one partition every row shares the same key value, so the rows are
// trivially sorted by those expressions.
// NOTE(review): defaults to true, so the derived ordering is on unless the user
// opts out; only effective together with the V2 bucketing feature (see doc).
val V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED =
  buildConf("spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled")
    .doc("When enabled, Spark derives output ordering from the partition key expressions of " +
      "a V2 data source that reports a KeyedPartitioning but does not report explicit ordering " +
      "via SupportsReportOrdering. Within a single partition all rows share the same key " +
      // Only this fragment interpolates a value, so only it carries the `s` prefix;
      // the plain fragments above need no interpolator.
      "value, so the data is trivially sorted by those expressions. Requires " +
      s"${V2_BUCKETING_ENABLED.key} to be enabled.")
    .version("4.2.0")
    .withBindingPolicy(ConfigBindingPolicy.SESSION)
    .booleanConf
    .createWithDefault(true)
2179+
21682180
val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
21692181
.doc("The maximum number of buckets allowed.")
21702182
.version("2.4.0")
@@ -7731,6 +7743,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
77317743
def v2BucketingAllowSorting: Boolean =
77327744
getConf(SQLConf.V2_BUCKETING_SORTING_ENABLED)
77337745

7746+
/**
 * Whether output ordering may be derived from the partition key expressions of a
 * V2 scan reporting a KeyedPartitioning (see
 * [[SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED]]).
 */
def v2BucketingPartitionKeyOrderingEnabled: Boolean =
  getConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED)
7748+
77347749
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
77357750
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
77367751

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
106106
/**
 * Output ordering reported by this scan.
 *
 * If the source reports an explicit ordering via `SupportsReportOrdering`, that ordering
 * wins and is returned unchanged. Failing that, when the output partitioning is a
 * `KeyedPartitioning` and `spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled`
 * is on, the partition key expressions themselves form a valid ordering: every row in a
 * given partition evaluates the key expressions to one constant value, so the partition
 * is trivially sorted by them. Otherwise no ordering is reported.
 */
override def outputOrdering: Seq[SortOrder] = {
  ordering match {
    // Explicitly reported ordering takes precedence over anything derived.
    case Some(reported) => reported
    case None =>
      outputPartitioning match {
        case keyed: KeyedPartitioning if conf.v2BucketingPartitionKeyOrderingEnabled =>
          // Constant-per-partition keys: any direction is valid; Ascending is the convention.
          keyed.expressions.map(SortOrder(_, Ascending))
        case _ => Seq.empty
      }
  }
}

0 commit comments

Comments
 (0)