From 21cf8e49d14fe3f57d6227da2c184a26281a203d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 4 Feb 2026 12:36:39 -0700
Subject: [PATCH 1/3] chore: Remove dead code paths for deprecated native_comet scan

Remove code that handled the `native_comet` scan implementation, which
was deprecated in 0.9.0 and is no longer a valid config option.

Changes:

- Remove match arm and nativeCometScan method from CometScanRule
- Remove V2 ParquetScan handling for native_comet
- Remove unused selectScan method
- Simplify type checking in CometScanTypeChecker
- Remove native_comet mutable buffer check from EliminateRedundantTransitions

Co-Authored-By: Claude Opus 4.5
---
 .../apache/comet/rules/CometScanRule.scala    | 88 +------------------
 .../rules/EliminateRedundantTransitions.scala |  6 +-
 2 files changed, 3 insertions(+), 91 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 45faa4d940..a267af8544 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -172,8 +172,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
             nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
           case SCAN_NATIVE_ICEBERG_COMPAT =>
             nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
-          case SCAN_NATIVE_COMET =>
-            nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
         }
 
       case _ =>
@@ -214,47 +212,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
   }
 
-  private def nativeCometScan(
-      session: SparkSession,
-      scanExec: FileSourceScanExec,
-      r: HadoopFsRelation,
-      hadoopConf: Configuration): Option[SparkPlan] = {
-    if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
-      return None
-    }
-    Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
-  }
-
   private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
 
     scanExec.scan match {
-      case scan: ParquetScan if COMET_NATIVE_SCAN_IMPL.get() == SCAN_NATIVE_COMET =>
-        val fallbackReasons = new ListBuffer[String]()
-        val schemaSupported =
-          CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
-        if (!schemaSupported) {
-          fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
-        }
-
-        val partitionSchemaSupported =
-          CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
-        if (!partitionSchemaSupported) {
-          fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
-        }
-
-        if (scan.pushedAggregate.nonEmpty) {
-          fallbackReasons += "Comet does not support pushed aggregate"
-        }
-
-        if (schemaSupported && partitionSchemaSupported && scan.pushedAggregate.isEmpty) {
-          val cometScan = CometParquetScan(session, scanExec.scan.asInstanceOf[ParquetScan])
-          CometBatchScanExec(
-            scanExec.copy(scan = cometScan),
-            runtimeFilters = scanExec.runtimeFilters)
-        } else {
-          withInfos(scanExec, fallbackReasons.toSet)
-        }
-
       case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() =>
         val fallbackReasons = new ListBuffer[String]()
         val schemaSupported =
@@ -641,48 +601,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     }
   }
 
-  private def selectScan(
-      scanExec: FileSourceScanExec,
-      partitionSchema: StructType,
-      hadoopConf: Configuration): String = {
-
-    val fallbackReasons = new ListBuffer[String]()
-
-    // native_iceberg_compat only supports local filesystem and S3
-    if (scanExec.relation.inputFiles
-        .forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) {
-
-      val filePath = scanExec.relation.inputFiles.headOption
-      if (filePath.exists(_.startsWith("s3a://"))) {
-        validateObjectStoreConfig(filePath.get, hadoopConf, fallbackReasons)
-      }
-    } else {
-      fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3"
-    }
-
-    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
-    val schemaSupported =
-      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
-    val partitionSchemaSupported =
-      typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
-
-    val cometExecEnabled = COMET_EXEC_ENABLED.get()
-    if (!cometExecEnabled) {
-      fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
-    }
-
-    if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
-        fallbackReasons.isEmpty) {
-      logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
-      SCAN_NATIVE_ICEBERG_COMPAT
-    } else {
-      logInfo(
-        s"Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
-          s"${fallbackReasons.mkString(", ")}")
-      SCAN_NATIVE_COMET
-    }
-  }
-
   private def isDynamicPruningFilter(e: Expression): Boolean =
     e.exists(_.isInstanceOf[PlanExpression[_]])
 
@@ -725,16 +643,12 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case ShortType
-          if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+      case ShortType if CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
         fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
           s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
           "native execution if your data does not contain unsigned small integers. " +
           CometConf.COMPAT_GUIDE
         false
-      case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
-        false
       case dt if isStringCollationType(dt) =>
         // we don't need specific support for collation in scans, but this
        // is a convenient place to force the whole query to fall back to Spark for now
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index d1c3b07677..ec33363525 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -155,7 +155,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
    * with such scans because the buffers may be modified after C2R reads them.
    *
    * This includes:
-   *   - CometScanExec with native_comet scan implementation (V1 path) - uses BatchReader
    *   - CometScanExec with native_iceberg_compat and partition columns - uses
    *     ConstantColumnReader
    *   - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
@@ -167,9 +166,8 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
       case _ =>
         op.exists {
           case scan: CometScanExec =>
-            scan.scanImpl == CometConf.SCAN_NATIVE_COMET ||
-            (scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
-              scan.relation.partitionSchema.nonEmpty)
+            scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
+              scan.relation.partitionSchema.nonEmpty
          case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan]
          case _ => false
        }
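A note on the config surface this patch relies on: with `native_comet` removed, the V1 dispatch above matches only the two remaining native implementations. The sketch below is a minimal, self-contained illustration of rejecting the removed value at configuration time; the object and method names (ScanImplValidator, validate) and the inclusion of an "auto" mode are illustrative assumptions, not Comet's actual API.

    // Illustrative only: the scan implementations that remain valid after
    // this patch, per the commit message. "auto" is assumed to exist as a
    // selection mode; it is not shown in the diff above.
    object ScanImplValidator {
      val ScanNativeDataFusion = "native_datafusion"
      val ScanNativeIcebergCompat = "native_iceberg_compat"
      val ScanAuto = "auto"

      private val valid: Set[String] =
        Set(ScanNativeDataFusion, ScanNativeIcebergCompat, ScanAuto)

      // Fails fast on removed values such as "native_comet".
      def validate(impl: String): String = {
        require(
          valid.contains(impl),
          s"invalid scan impl '$impl'; valid values: ${valid.mkString(", ")}")
        impl
      }
    }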
From 375fee0eacab07f6c6475999b8a44fbdaa922d90 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 4 Feb 2026 15:44:09 -0700
Subject: [PATCH 2/3] remove unused import

---
 spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index a267af8544..da7de8f933 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -57,8 +57,6 @@ import org.apache.comet.shims.CometTypeShim
  */
 case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim {
 
-  import CometScanRule._
-
   private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
 
   override def apply(plan: SparkPlan): SparkPlan = {
From e3c2502a0db7960daa1771e3bd7c0fc80dd9cd6a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 5 Feb 2026 06:23:33 -0700
Subject: [PATCH 3/3] spotless

---
 .../src/main/scala/org/apache/comet/rules/CometScanRule.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index da7de8f933..82c6432e3d 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -47,7 +46,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, wi
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
 import org.apache.comet.objectstore.NativeConfig
-import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
+import org.apache.comet.parquet.{Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
 import org.apache.comet.serde.operator.CometNativeScan
 import org.apache.comet.shims.CometTypeShim
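A closing note on the EliminateRedundantTransitions change in patch 1: after this series, a V1 CometScanExec is treated as exposing mutable (reused) buffers only when it uses `native_iceberg_compat` and reads partition columns, while the V2 path still flags `CometParquetScan`. The sketch below models that predicate with stand-in types; ScanInfo and usesMutableBuffers are hypothetical substitutes for the real plan nodes, not Comet code.

    // Hypothetical, self-contained model of the simplified mutable-buffer check.
    final case class ScanInfo(
        scanImpl: String, // e.g. "native_iceberg_compat"
        hasPartitionColumns: Boolean, // relation.partitionSchema.nonEmpty
        isV2CometParquetScan: Boolean) // scan.isInstanceOf[CometParquetScan]

    def usesMutableBuffers(scan: ScanInfo): Boolean =
      (scan.scanImpl == "native_iceberg_compat" && scan.hasPartitionColumns) ||
        scan.isV2CometParquetScan

    // With the native_comet branch removed, an iceberg-compat scan without
    // partition columns no longer blocks eliminating the transition:
    assert(!usesMutableBuffers(ScanInfo("native_iceberg_compat", false, false)))
    assert(usesMutableBuffers(ScanInfo("native_iceberg_compat", true, false)))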