93 changes: 2 additions & 91 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -37,7 +36,6 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -47,7 +46,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
import org.apache.comet.parquet.{Native, SupportsComet}
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
import org.apache.comet.serde.operator.CometNativeScan
import org.apache.comet.shims.CometTypeShim
@@ -57,8 +56,6 @@ import org.apache.comet.shims.CometTypeShim
*/
case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim {

import CometScanRule._

private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

override def apply(plan: SparkPlan): SparkPlan = {
@@ -172,8 +169,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim {
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_ICEBERG_COMPAT =>
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_COMET =>
nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
}

case _ =>
@@ -214,47 +209,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim {
Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
}

private def nativeCometScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
}

private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {

scanExec.scan match {
case scan: ParquetScan if COMET_NATIVE_SCAN_IMPL.get() == SCAN_NATIVE_COMET =>
val fallbackReasons = new ListBuffer[String]()
val schemaSupported =
CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
if (!schemaSupported) {
fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
}

val partitionSchemaSupported =
CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
if (!partitionSchemaSupported) {
fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
}

if (scan.pushedAggregate.nonEmpty) {
fallbackReasons += "Comet does not support pushed aggregate"
}

if (schemaSupported && partitionSchemaSupported && scan.pushedAggregate.isEmpty) {
val cometScan = CometParquetScan(session, scanExec.scan.asInstanceOf[ParquetScan])
CometBatchScanExec(
scanExec.copy(scan = cometScan),
runtimeFilters = scanExec.runtimeFilters)
} else {
withInfos(scanExec, fallbackReasons.toSet)
}

case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() =>
val fallbackReasons = new ListBuffer[String]()
val schemaSupported =
@@ -641,48 +598,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim {
}
}

private def selectScan(
scanExec: FileSourceScanExec,
partitionSchema: StructType,
hadoopConf: Configuration): String = {

val fallbackReasons = new ListBuffer[String]()

// native_iceberg_compat only supports local filesystem and S3
if (scanExec.relation.inputFiles
.forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) {

val filePath = scanExec.relation.inputFiles.headOption
if (filePath.exists(_.startsWith("s3a://"))) {
validateObjectStoreConfig(filePath.get, hadoopConf, fallbackReasons)
}
} else {
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3"
}

val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
val schemaSupported =
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
val partitionSchemaSupported =
typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)

val cometExecEnabled = COMET_EXEC_ENABLED.get()
if (!cometExecEnabled) {
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
}

if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
fallbackReasons.isEmpty) {
logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
SCAN_NATIVE_ICEBERG_COMPAT
} else {
logInfo(
s"Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
s"${fallbackReasons.mkString(", ")}")
SCAN_NATIVE_COMET
}
}

private def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])

@@ -725,16 +640,12 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim {
name: String,
fallbackReasons: ListBuffer[String]): Boolean = {
dt match {
case ShortType
if scanImpl != CometConf.SCAN_NATIVE_COMET &&
CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
case ShortType if CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
"native execution if your data does not contain unsigned small integers. " +
CometConf.COMPAT_GUIDE
false
case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
false
case dt if isStringCollationType(dt) =>
// we don't need specific support for collation in scans, but this
// is a convenient place to force the whole query to fall back to Spark for now
@@ -155,7 +155,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {
* with such scans because the buffers may be modified after C2R reads them.
*
* This includes:
* - CometScanExec with native_comet scan implementation (V1 path) - uses BatchReader
* - CometScanExec with native_iceberg_compat and partition columns - uses
* ConstantColumnReader
* - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
@@ -167,9 +166,8 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {
case _ =>
op.exists {
case scan: CometScanExec =>
scan.scanImpl == CometConf.SCAN_NATIVE_COMET ||
(scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
scan.relation.partitionSchema.nonEmpty)
scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
scan.relation.partitionSchema.nonEmpty
case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan]
case _ => false
}
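
A minimal usage sketch, not part of this PR: the ShortType fallback message in CometScanTypeChecker above tells users to set COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK to false when their Parquet data contains no unsigned small integers. Assuming CometConf lives at org.apache.comet.CometConf and exposes the .key accessor used in the diff, a session-level opt-out could look roughly like this (the data path is hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.comet.CometConf

object UnsignedSmallIntOptOut {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("comet-scan-example").getOrCreate()
    // Only safe when the data being read contains no unsigned 8-/16-bit Parquet integers;
    // otherwise leave the check enabled and let Comet fall back to Spark for such scans.
    spark.conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
    spark.read.parquet("/path/to/data").show()
  }
}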