From 18098243f854afe237ad0363b1805803b6631a93 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 5 Feb 2026 10:37:25 -0700
Subject: [PATCH] feat: Set ffi_safe flag conditionally for
 native_iceberg_compat scans

For native_iceberg_compat scans that have no partition columns and no
missing columns, set arrow_ffi_safe=true on the Scan protobuf. In this
case all Arrow arrays come from parquet file data with non-reused
buffers, so a cheap clone suffices instead of a deep copy on the native
side.

Co-Authored-By: Claude Opus 4.5
---
 .../apache/spark/sql/comet/operators.scala    | 24 +++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 6f33467efe..43b7a9e2f7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -1904,6 +1904,30 @@ case class CometSortMergeJoinExec(
 }
 
 object CometScanWrapper extends CometSink[SparkPlan] {
+  override def convert(
+      op: SparkPlan,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
+    val result = super.convert(op, builder, childOp: _*)
+    result.map { operator =>
+      op match {
+        case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+          val hasPartitionColumns = scan.relation.partitionSchema.nonEmpty
+          val hasMissingColumns = scan.requiredSchema.fields.exists { field =>
+            !scan.relation.dataSchema.fieldNames.contains(field.name)
+          }
+          val ffiSafe = !hasPartitionColumns && !hasMissingColumns
+          if (ffiSafe) {
+            val scanProto = operator.getScan.toBuilder.setArrowFfiSafe(true).build()
+            operator.toBuilder.setScan(scanProto).build()
+          } else {
+            operator
+          }
+        case _ => operator
+      }
+    }
+  }
+
   override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec = {
     CometScanWrapper(nativeOp, op)
   }