diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 6f33467efe..43b7a9e2f7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -1904,6 +1904,30 @@ case class CometSortMergeJoinExec(
 }
 
 object CometScanWrapper extends CometSink[SparkPlan] {
+  override def convert(
+      op: SparkPlan,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
+    val result = super.convert(op, builder, childOp: _*)
+    result.map { operator =>
+      op match {
+        case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+          val hasPartitionColumns = scan.relation.partitionSchema.nonEmpty
+          val hasMissingColumns = scan.requiredSchema.fields.exists { field =>
+            !scan.relation.dataSchema.fieldNames.contains(field.name)
+          }
+          val ffiSafe = !hasPartitionColumns && !hasMissingColumns
+          if (ffiSafe) {
+            val scanProto = operator.getScan.toBuilder.setArrowFfiSafe(true).build()
+            operator.toBuilder.setScan(scanProto).build()
+          } else {
+            operator
+          }
+        case _ => operator
+      }
+    }
+  }
+
   override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec = {
     CometScanWrapper(nativeOp, op)
   }
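
For review purposes, here is a minimal standalone sketch of the FFI-safety predicate the patch introduces, extracted as a pure function over the three schemas involved. `FfiSafeCheckSketch` and `isFfiSafe` are hypothetical names for illustration only and are not part of the patch; the rationale in the comments is inferred from the predicate itself, not from Comet internals.

```scala
import org.apache.spark.sql.types._

// Standalone sketch (not part of the patch) of the predicate guarding
// setArrowFfiSafe(true) in CometScanWrapper.convert above.
object FfiSafeCheckSketch {

  // Mirrors the patch's logic: a SCAN_NATIVE_ICEBERG_COMPAT scan is flagged
  // as Arrow-FFI safe only when it has no partition columns and every
  // required column is present in the file's data schema.
  def isFfiSafe(
      partitionSchema: StructType,
      dataSchema: StructType,
      requiredSchema: StructType): Boolean = {
    val hasPartitionColumns = partitionSchema.nonEmpty
    val hasMissingColumns =
      requiredSchema.fields.exists(f => !dataSchema.fieldNames.contains(f.name))
    !hasPartitionColumns && !hasMissingColumns
  }

  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // Unpartitioned scan reading only columns that exist in the file: safe.
    assert(isFfiSafe(StructType(Nil), dataSchema, dataSchema))

    // Partitioned table: not flagged as FFI-safe.
    val partitionSchema = StructType(Seq(StructField("dt", StringType)))
    assert(!isFfiSafe(partitionSchema, dataSchema, dataSchema))

    // Required column absent from the data schema (e.g. a column added after
    // the file was written): also not flagged as FFI-safe.
    val required = StructType(Seq(StructField("new_col", IntegerType)))
    assert(!isFfiSafe(StructType(Nil), dataSchema, required))
  }
}
```

Note also how the patch applies the flag: it rewrites the serialized Scan operator after `super.convert` returns, rather than threading a flag through `CometScanExec` itself, which keeps the check entirely local to `CometScanWrapper`.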