From 4c7318a405aff7994ab2c1a06188ae77a5bd9968 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 19:31:03 -0700 Subject: [PATCH 01/23] [EXPERIMENTAL] Split Iceberg FileScanTask serialization for reduced network transfer This PR implements split serialization for Iceberg native scans to reduce network transfer and deserialization overhead when scanning tables with many partitions. **Problem:** Currently, ALL partition metadata is serialized into a single IcebergScan message sent to every executor task. Each task only uses one partition's data but deserializes all partitions. For 10,000 partitions, this means each task receives ~10,000x more file_partitions data than needed. **Solution:** Split serialization into: - Common data (IcebergScanCommon): pools, metadata, catalog properties - serialized once, captured in RDD closure - Per-partition data (IcebergFilePartition[]): file scan tasks - one per partition, stored in Partition objects Each task now receives only: commonData + its own partitionBytes **Changes:** - Proto: Added IcebergScanCommon message and split_mode fields to IcebergScan - Rust: Handle split_mode in planner, added parse_file_scan_tasks_from_common() - Scala: New CometIcebergSplitRDD with custom Partition holding per-partition bytes - Scala: Thread-local to pass split data from convert() to createExec() Co-Authored-By: Claude Opus 4.5 --- native/core/src/execution/planner.rs | 261 ++++++++++++++++-- native/proto/src/proto/operator.proto | 32 +++ .../operator/CometIcebergNativeScan.scala | 53 +++- .../comet/CometIcebergNativeScanExec.scala | 53 +++- .../sql/comet/CometIcebergSplitRDD.scala | 126 +++++++++ 5 files changed, 504 insertions(+), 21 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 44ff20a44f..e458fc6c11 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1132,26 +1132,59 @@ impl PhysicalPlanner { )) } OpStruct::IcebergScan(scan) => { - let required_schema: SchemaRef = - convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); - - let catalog_properties: HashMap = scan - .catalog_properties - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); + // Determine if we're in split mode or legacy mode + let (required_schema, catalog_properties, metadata_location, tasks) = + if scan.split_mode.unwrap_or(false) { + // Split mode: extract from embedded common + single partition + let common = scan.common.as_ref().ok_or_else(|| { + GeneralError( + "split_mode=true but common data missing".into(), + ) + })?; + let partition = scan.partition.as_ref().ok_or_else(|| { + GeneralError( + "split_mode=true but partition data missing".into(), + ) + })?; + + let schema = + convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); + let catalog_props: HashMap = common + .catalog_properties + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let metadata_loc = common.metadata_location.clone(); + let tasks = parse_file_scan_tasks_from_common( + common, + &partition.file_scan_tasks, + )?; - let metadata_location = scan.metadata_location.clone(); + (schema, catalog_props, metadata_loc, tasks) + } else { + // Legacy mode: all data in single message + let schema = + convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); + let catalog_props: HashMap = scan + .catalog_properties + .iter() + .map(|(k, v)| (k.clone(), 
v.clone())) + .collect(); + let metadata_loc = scan.metadata_location.clone(); + + debug_assert!( + !scan.file_partitions.is_empty(), + "IcebergScan must have at least one file partition." + ); + + let tasks = parse_file_scan_tasks( + scan, + &scan.file_partitions[self.partition as usize].file_scan_tasks, + )?; - debug_assert!( - !scan.file_partitions.is_empty(), - "IcebergScan must have at least one file partition. This indicates a bug in Scala serialization." - ); + (schema, catalog_props, metadata_loc, tasks) + }; - let tasks = parse_file_scan_tasks( - scan, - &scan.file_partitions[self.partition as usize].file_scan_tasks, - )?; let file_task_groups = vec![tasks]; let iceberg_scan = IcebergScanExec::new( @@ -2941,6 +2974,200 @@ fn parse_file_scan_tasks( results } +/// Parse FileScanTasks from IcebergScanCommon (split mode). +/// Similar to parse_file_scan_tasks but reads pools from IcebergScanCommon. +fn parse_file_scan_tasks_from_common( + proto_common: &spark_operator::IcebergScanCommon, + proto_tasks: &[spark_operator::IcebergFileScanTask], +) -> Result, ExecutionError> { + // Build caches upfront from common data + let schema_cache: Vec> = proto_common + .schema_pool + .iter() + .map(|json| { + serde_json::from_str(json).map(Arc::new).map_err(|e| { + ExecutionError::GeneralError(format!( + "Failed to parse schema JSON from pool: {}", + e + )) + }) + }) + .collect::, _>>()?; + + let partition_spec_cache: Vec>> = proto_common + .partition_spec_pool + .iter() + .map(|json| { + serde_json::from_str::(json) + .ok() + .map(Arc::new) + }) + .collect(); + + let name_mapping_cache: Vec>> = proto_common + .name_mapping_pool + .iter() + .map(|json| { + serde_json::from_str::(json) + .ok() + .map(Arc::new) + }) + .collect(); + + let delete_files_cache: Vec> = proto_common + .delete_files_pool + .iter() + .map(|list| { + list.delete_files + .iter() + .map(|del| { + let file_type = match del.content_type.as_str() { + "POSITION_DELETES" => iceberg::spec::DataContentType::PositionDeletes, + "EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes, + other => { + return Err(GeneralError(format!( + "Invalid delete content type '{}'", + other + ))) + } + }; + + Ok(iceberg::scan::FileScanTaskDeleteFile { + file_path: del.file_path.clone(), + file_type, + partition_spec_id: del.partition_spec_id, + equality_ids: if del.equality_ids.is_empty() { + None + } else { + Some(del.equality_ids.clone()) + }, + }) + }) + .collect::, ExecutionError>>() + }) + .collect::, _>>()?; + + let results: Result, _> = proto_tasks + .iter() + .map(|proto_task| { + let schema_ref = Arc::clone( + schema_cache + .get(proto_task.schema_idx as usize) + .ok_or_else(|| { + ExecutionError::GeneralError(format!( + "Invalid schema_idx: {} (pool size: {})", + proto_task.schema_idx, + schema_cache.len() + )) + })?, + ); + + let data_file_format = iceberg::spec::DataFileFormat::Parquet; + + let deletes = if let Some(idx) = proto_task.delete_files_idx { + delete_files_cache + .get(idx as usize) + .ok_or_else(|| { + ExecutionError::GeneralError(format!( + "Invalid delete_files_idx: {} (pool size: {})", + idx, + delete_files_cache.len() + )) + })? 
+ .clone() + } else { + vec![] + }; + + let bound_predicate = if let Some(idx) = proto_task.residual_idx { + proto_common + .residual_pool + .get(idx as usize) + .and_then(convert_spark_expr_to_predicate) + .map( + |pred| -> Result { + pred.bind(Arc::clone(&schema_ref), true).map_err(|e| { + ExecutionError::GeneralError(format!( + "Failed to bind predicate to schema: {}", + e + )) + }) + }, + ) + .transpose()? + } else { + None + }; + + let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { + let partition_data_proto = proto_common + .partition_data_pool + .get(partition_data_idx as usize) + .ok_or_else(|| { + ExecutionError::GeneralError(format!( + "Invalid partition_data_idx: {} (pool size: {})", + partition_data_idx, + proto_common.partition_data_pool.len() + )) + })?; + + match partition_data_to_struct(partition_data_proto) { + Ok(s) => Some(s), + Err(e) => { + return Err(ExecutionError::GeneralError(format!( + "Failed to deserialize partition data: {}", + e + ))) + } + } + } else { + None + }; + + let partition_spec = proto_task + .partition_spec_idx + .and_then(|idx| partition_spec_cache.get(idx as usize)) + .and_then(|opt| opt.clone()); + + let name_mapping = proto_task + .name_mapping_idx + .and_then(|idx| name_mapping_cache.get(idx as usize)) + .and_then(|opt| opt.clone()); + + let project_field_ids = proto_common + .project_field_ids_pool + .get(proto_task.project_field_ids_idx as usize) + .ok_or_else(|| { + ExecutionError::GeneralError(format!( + "Invalid project_field_ids_idx: {} (pool size: {})", + proto_task.project_field_ids_idx, + proto_common.project_field_ids_pool.len() + )) + })? + .field_ids + .clone(); + + Ok(iceberg::scan::FileScanTask { + data_file_path: proto_task.data_file_path.clone(), + start: proto_task.start, + length: proto_task.length, + record_count: proto_task.record_count, + data_file_format, + schema: schema_ref, + project_field_ids, + predicate: bound_predicate, + deletes, + partition, + partition_spec, + name_mapping, + case_sensitive: false, + }) + }) + .collect(); + + results +} + /// Create CASE WHEN expression and add casting as needed fn create_case_expr( when_then_pairs: Vec<(Arc, Arc)>, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 73c087cf36..43a32c935e 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -156,6 +156,28 @@ message PartitionData { repeated PartitionValue values = 1; } +// Common data shared by all partitions in split mode (sent once, captured in closure) +message IcebergScanCommon { + // Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.) 
+ map catalog_properties = 1; + + // Table metadata file path for FileIO initialization + string metadata_location = 2; + + // Schema to read + repeated SparkStructField required_schema = 3; + + // Deduplication pools (must contain ALL entries for cross-partition deduplication) + repeated string schema_pool = 4; + repeated string partition_type_pool = 5; + repeated string partition_spec_pool = 6; + repeated string name_mapping_pool = 7; + repeated ProjectFieldIdList project_field_ids_pool = 8; + repeated PartitionData partition_data_pool = 9; + repeated DeleteFileList delete_files_pool = 10; + repeated spark.spark_expression.Expr residual_pool = 11; +} + message IcebergScan { // Schema to read repeated SparkStructField required_schema = 1; @@ -178,6 +200,16 @@ message IcebergScan { repeated PartitionData partition_data_pool = 10; repeated DeleteFileList delete_files_pool = 11; repeated spark.spark_expression.Expr residual_pool = 12; + + // Split mode: when true, common data and single partition are embedded + // instead of all partitions in file_partitions + optional bool split_mode = 20; + + // Embedded common data for split mode (pools, metadata, catalog props) + optional IcebergScanCommon common = 21; + + // Single partition's file tasks for split mode + optional IcebergFilePartition partition = 22; } // Helper message for deduplicating field ID lists diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 7238f8ae8c..80bd6ea46d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec} import org.apache.spark.sql.types._ import org.apache.comet.ConfigEntry -import org.apache.comet.iceberg.IcebergReflection +import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection} import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField} @@ -41,6 +41,12 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit override def enabledConfig: Option[ConfigEntry[Boolean]] = None + /** Thread-local storage for split serialization data. */ + private case class SplitData(commonBytes: Array[Byte], perPartitionBytes: Array[Array[Byte]]) + private val splitDataThreadLocal = new java.lang.ThreadLocal[Option[SplitData]] { + override def initialValue(): Option[SplitData] = None + } + /** * Constants specific to Iceberg expression conversion (not in shared IcebergReflection). */ @@ -1022,10 +1028,36 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit s"$partitionDataPoolBytes bytes (protobuf)") } + // Build split serialization data for efficient per-partition transfer + val builtScan = icebergScanBuilder.build() + buildAndStoreSplitData(builtScan) + builder.clearChildren() Some(builder.setIcebergScan(icebergScanBuilder).build()) } + /** Builds split data (IcebergScanCommon + per-partition bytes) and stores in thread-local. 
*/ + private def buildAndStoreSplitData(builtScan: OperatorOuterClass.IcebergScan): Unit = { + val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() + + commonBuilder.setMetadataLocation(builtScan.getMetadataLocation) + commonBuilder.putAllCatalogProperties(builtScan.getCatalogPropertiesMap) + builtScan.getRequiredSchemaList.forEach(f => commonBuilder.addRequiredSchema(f)) + builtScan.getSchemaPoolList.forEach(s => commonBuilder.addSchemaPool(s)) + builtScan.getPartitionTypePoolList.forEach(s => commonBuilder.addPartitionTypePool(s)) + builtScan.getPartitionSpecPoolList.forEach(s => commonBuilder.addPartitionSpecPool(s)) + builtScan.getNameMappingPoolList.forEach(s => commonBuilder.addNameMappingPool(s)) + builtScan.getProjectFieldIdsPoolList.forEach(p => commonBuilder.addProjectFieldIdsPool(p)) + builtScan.getPartitionDataPoolList.forEach(p => commonBuilder.addPartitionDataPool(p)) + builtScan.getDeleteFilesPoolList.forEach(d => commonBuilder.addDeleteFilesPool(d)) + builtScan.getResidualPoolList.forEach(r => commonBuilder.addResidualPool(r)) + + val commonBytes = commonBuilder.build().toByteArray + val perPartitionBytes = builtScan.getFilePartitionsList.asScala.map(_.toByteArray).toArray + + splitDataThreadLocal.set(Some(SplitData(commonBytes, perPartitionBytes))) + } + override def createExec(nativeOp: Operator, op: CometBatchScanExec): CometNativeExec = { import org.apache.spark.sql.comet.CometIcebergNativeScanExec @@ -1039,7 +1071,22 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Extract metadataLocation from the native operator val metadataLocation = nativeOp.getIcebergScan.getMetadataLocation - // Create the CometIcebergNativeScanExec using the companion object's apply method - CometIcebergNativeScanExec(nativeOp, op.wrapped, op.session, metadataLocation, metadata) + // Retrieve split data from thread-local (set during convert()) + val splitData = splitDataThreadLocal.get() + splitDataThreadLocal.remove() + + splitData match { + case Some(data) => + CometIcebergNativeScanExec( + nativeOp, + op.wrapped, + op.session, + metadataLocation, + metadata, + data.commonBytes, + data.perPartitionBytes) + case None => + CometIcebergNativeScanExec(nativeOp, op.wrapped, op.session, metadataLocation, metadata) + } } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 223ae4fbb7..6817611085 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -21,12 +21,14 @@ package org.apache.spark.sql.comet import scala.jdk.CollectionConverters._ +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.AccumulatorV2 import com.google.common.base.Objects @@ -49,7 +51,11 @@ case class CometIcebergNativeScanExec( override val serializedPlanOpt: SerializedPlan, metadataLocation: String, numPartitions: Int, - @transient 
nativeIcebergScanMetadata: CometIcebergNativeScanMetadata) + @transient nativeIcebergScanMetadata: CometIcebergNativeScanMetadata, + // Split mode: serialized IcebergScanCommon (captured in closure, sent with task) + commonData: Array[Byte] = Array.empty, + // Split mode: serialized IcebergFilePartition per partition (transient) + @transient perPartitionData: Array[Array[Byte]] = Array.empty) extends CometLeafExec { override val supportsColumnar: Boolean = true @@ -146,6 +152,21 @@ case class CometIcebergNativeScanExec( baseMetrics ++ icebergMetrics + ("num_splits" -> numSplitsMetric) } + /** Uses split mode RDD when split data is available, otherwise falls back to legacy mode. */ + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + if (commonData.nonEmpty && perPartitionData.nonEmpty) { + val nativeMetrics = CometMetricNode.fromCometPlan(this) + CometIcebergSplitRDD( + sparkContext, + commonData, + perPartitionData, + output.length, + nativeMetrics) + } else { + super.doExecuteColumnar() + } + } + override protected def doCanonicalize(): CometIcebergNativeScanExec = { CometIcebergNativeScanExec( nativeOp, @@ -229,4 +250,34 @@ object CometIcebergNativeScanExec { scanExec.logicalLink.foreach(exec.setLogicalLink) exec } + + /** Creates a CometIcebergNativeScanExec with split serialization data. */ + def apply( + nativeOp: Operator, + scanExec: BatchScanExec, + session: SparkSession, + metadataLocation: String, + nativeIcebergScanMetadata: CometIcebergNativeScanMetadata, + commonData: Array[Byte], + perPartitionData: Array[Array[Byte]]): CometIcebergNativeScanExec = { + + val numParts = scanExec.outputPartitioning match { + case p: KeyGroupedPartitioning => p.numPartitions + case _ => scanExec.inputRDD.getNumPartitions + } + + val exec = CometIcebergNativeScanExec( + nativeOp, + scanExec.output, + scanExec, + SerializedPlan(None), + metadataLocation, + numParts, + nativeIcebergScanMetadata, + commonData, + perPartitionData) + + scanExec.logicalLink.foreach(exec.setLogicalLink) + exec + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala new file mode 100644 index 0000000000..f5131bbc50 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.comet + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.comet.CometExecIterator +import org.apache.comet.serde.OperatorOuterClass +import org.apache.comet.serde.OperatorOuterClass.IcebergFilePartition + +/** + * Custom partition for split Iceberg serialization. Holds only bytes for this partition's file + * scan tasks. + */ +private[spark] class CometIcebergSplitPartition( + override val index: Int, + val partitionBytes: Array[Byte]) + extends Partition + +/** + * RDD for split Iceberg scan serialization that avoids sending all partition data to every task. + * + * With split serialization: + * - commonData: serialized IcebergScanCommon (pools, metadata) - captured in closure + * - perPartitionData: Array of serialized IcebergFilePartition - populates Partition objects + * + * Each task receives commonData (via closure) + partitionBytes (via Partition), combines them + * into an IcebergScan with split_mode=true, and passes to native execution. + */ +private[spark] class CometIcebergSplitRDD( + sc: SparkContext, + commonData: Array[Byte], + @transient perPartitionData: Array[Array[Byte]], + numParts: Int, + var computeFunc: (Array[Byte], CometMetricNode, Int, Int) => Iterator[ColumnarBatch]) + extends RDD[ColumnarBatch](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { + perPartitionData.zipWithIndex.map { case (bytes, idx) => + new CometIcebergSplitPartition(idx, bytes) + } + } + + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + val partition = split.asInstanceOf[CometIcebergSplitPartition] + + val combinedPlan = + CometIcebergSplitRDD.buildCombinedPlan(commonData, partition.partitionBytes) + + // Use cached numParts to avoid triggering getPartitions() on executor + val it = computeFunc(combinedPlan, null, numParts, partition.index) + + Option(context).foreach { ctx => + ctx.addTaskCompletionListener[Unit] { _ => + it.asInstanceOf[CometExecIterator].close() + } + } + + it + } +} + +object CometIcebergSplitRDD { + + def apply( + sc: SparkContext, + commonData: Array[Byte], + perPartitionData: Array[Array[Byte]], + numOutputCols: Int, + nativeMetrics: CometMetricNode): CometIcebergSplitRDD = { + + // Create compute function that captures nativeMetrics in its closure + val computeFunc = + (combinedPlan: Array[Byte], _: CometMetricNode, numParts: Int, partIndex: Int) => { + new CometExecIterator( + CometExec.newIterId, + Seq.empty, + numOutputCols, + combinedPlan, + nativeMetrics, + numParts, + partIndex, + None, + Seq.empty) + } + + val numParts = perPartitionData.length + new CometIcebergSplitRDD(sc, commonData, perPartitionData, numParts, computeFunc) + } + + private[comet] def buildCombinedPlan( + commonBytes: Array[Byte], + partitionBytes: Array[Byte]): Array[Byte] = { + val common = OperatorOuterClass.IcebergScanCommon.parseFrom(commonBytes) + val partition = IcebergFilePartition.parseFrom(partitionBytes) + + val scanBuilder = OperatorOuterClass.IcebergScan.newBuilder() + scanBuilder.setSplitMode(true) + scanBuilder.setCommon(common) + scanBuilder.setPartition(partition) + + val opBuilder = OperatorOuterClass.Operator.newBuilder() + opBuilder.setIcebergScan(scanBuilder) + + opBuilder.build().toByteArray + } +} From 183a74a287e2b09aab67f111a856ed584ad364ca Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 19:34:02 -0700 Subject: [PATCH 02/23] Change 
iceberg-rust CI jobs to trigger on [iceberg-rust] tag Update the three rust-specific Iceberg test jobs to only run when the PR title contains [iceberg-rust] rather than [iceberg]. This allows running Java-based Iceberg tests separately from Rust-based ones. Also applies cargo fmt to planner.rs. Co-Authored-By: Claude Opus 4.5 --- .github/workflows/iceberg_spark_test.yml | 6 +++--- native/core/src/execution/planner.rs | 14 ++++---------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml index 74badcda5f..54ef48d786 100644 --- a/.github/workflows/iceberg_spark_test.yml +++ b/.github/workflows/iceberg_spark_test.yml @@ -164,7 +164,7 @@ jobs: -Pquick=true -x javadoc iceberg-spark-rust: - if: contains(github.event.pull_request.title, '[iceberg]') + if: contains(github.event.pull_request.title, '[iceberg-rust]') strategy: matrix: os: [ubuntu-24.04] @@ -203,7 +203,7 @@ jobs: -Pquick=true -x javadoc iceberg-spark-extensions-rust: - if: contains(github.event.pull_request.title, '[iceberg]') + if: contains(github.event.pull_request.title, '[iceberg-rust]') strategy: matrix: os: [ubuntu-24.04] @@ -242,7 +242,7 @@ jobs: -Pquick=true -x javadoc iceberg-spark-runtime-rust: - if: contains(github.event.pull_request.title, '[iceberg]') + if: contains(github.event.pull_request.title, '[iceberg-rust]') strategy: matrix: os: [ubuntu-24.04] diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index e458fc6c11..eecd542b16 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1137,14 +1137,10 @@ impl PhysicalPlanner { if scan.split_mode.unwrap_or(false) { // Split mode: extract from embedded common + single partition let common = scan.common.as_ref().ok_or_else(|| { - GeneralError( - "split_mode=true but common data missing".into(), - ) + GeneralError("split_mode=true but common data missing".into()) })?; let partition = scan.partition.as_ref().ok_or_else(|| { - GeneralError( - "split_mode=true but partition data missing".into(), - ) + GeneralError("split_mode=true but partition data missing".into()) })?; let schema = @@ -1155,10 +1151,8 @@ impl PhysicalPlanner { .map(|(k, v)| (k.clone(), v.clone())) .collect(); let metadata_loc = common.metadata_location.clone(); - let tasks = parse_file_scan_tasks_from_common( - common, - &partition.file_scan_tasks, - )?; + let tasks = + parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; (schema, catalog_props, metadata_loc, tasks) } else { From 4492d67ff64b1e5347a4ddc99ae93eb138504b23 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 19:40:11 -0700 Subject: [PATCH 03/23] Allow clippy::large_enum_variant for spark_operator module The IcebergScan variant is larger after adding split mode fields. This is expected for generated protobuf code. Co-Authored-By: Claude Opus 4.5 --- native/proto/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/native/proto/src/lib.rs b/native/proto/src/lib.rs index 6dfe546ac8..a55657b7af 100644 --- a/native/proto/src/lib.rs +++ b/native/proto/src/lib.rs @@ -34,6 +34,7 @@ pub mod spark_partitioning { // Include generated modules from .proto files. 
#[allow(missing_docs)] +#[allow(clippy::large_enum_variant)] pub mod spark_operator { include!(concat!("generated", "/spark.spark_operator.rs")); } From 537f45050266b454c1d59c29e7e578d914dcbbb3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 19:42:41 -0700 Subject: [PATCH 04/23] trigger CI From 0af529d8fb7490622340dba35a9766abc78a3d4e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 19:53:10 -0700 Subject: [PATCH 05/23] Remove unused import Co-Authored-By: Claude Opus 4.5 --- .../apache/comet/serde/operator/CometIcebergNativeScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 80bd6ea46d..0690e2b7d7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec} import org.apache.spark.sql.types._ import org.apache.comet.ConfigEntry -import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection} +import org.apache.comet.iceberg.IcebergReflection import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField} From d78d9dd49530556554f2d70f15e26b2539d4e586 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 06:26:58 -0700 Subject: [PATCH 06/23] Simplify split mode detection for Iceberg scan serialization Remove the explicit split_mode boolean flag. Instead, detect split mode by checking if both common and partition fields are present. This simplifies the proto and code while maintaining compatibility with native shuffle which uses the serialized plan directly. 
- Remove split_mode field from IcebergScan proto - Keep both legacy fields (1-12) and split mode fields (20-21) - Detect split mode in Rust by checking common/partition presence - Simplify Scala code to always use CometIcebergSplitRDD Co-Authored-By: Claude Opus 4.5 --- native/core/src/execution/planner.rs | 83 +++++++++---------- native/proto/src/proto/operator.proto | 13 +-- .../operator/CometIcebergNativeScan.scala | 35 ++++---- .../comet/CometIcebergNativeScanExec.scala | 59 +++---------- .../sql/comet/CometIcebergSplitRDD.scala | 3 +- 5 files changed, 74 insertions(+), 119 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index eecd542b16..1b28ef6f8d 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1132,17 +1132,12 @@ impl PhysicalPlanner { )) } OpStruct::IcebergScan(scan) => { - // Determine if we're in split mode or legacy mode + // Determine if we're in split mode (common + partition present) or legacy mode let (required_schema, catalog_properties, metadata_location, tasks) = - if scan.split_mode.unwrap_or(false) { - // Split mode: extract from embedded common + single partition - let common = scan.common.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but common data missing".into()) - })?; - let partition = scan.partition.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but partition data missing".into()) - })?; - + if let (Some(common), Some(partition)) = + (scan.common.as_ref(), scan.partition.as_ref()) + { + // Split mode: read from embedded common + single partition let schema = convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); let catalog_props: HashMap = common @@ -1151,12 +1146,11 @@ impl PhysicalPlanner { .map(|(k, v)| (k.clone(), v.clone())) .collect(); let metadata_loc = common.metadata_location.clone(); - let tasks = - parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; + let tasks = parse_file_scan_tasks(common, &partition.file_scan_tasks)?; (schema, catalog_props, metadata_loc, tasks) } else { - // Legacy mode: all data in single message + // Legacy mode: all data in top-level IcebergScan fields let schema = convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); let catalog_props: HashMap = scan @@ -1171,7 +1165,7 @@ impl PhysicalPlanner { "IcebergScan must have at least one file partition." ); - let tasks = parse_file_scan_tasks( + let tasks = parse_file_scan_tasks_legacy( scan, &scan.file_partitions[self.partition as usize].file_scan_tasks, )?; @@ -2767,18 +2761,19 @@ fn partition_data_to_struct( /// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects. /// +/// Parses Iceberg FileScanTasks from protobuf messages into iceberg-rust's FileScanTask format. +/// /// Each task contains a residual predicate that is used for row-group level filtering /// during Parquet scanning. /// -/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing +/// This function uses deduplication pools from IcebergScanCommon to avoid redundant parsing /// of schemas, partition specs, partition types, name mappings, and other repeated data. 
fn parse_file_scan_tasks( - proto_scan: &spark_operator::IcebergScan, + proto_common: &spark_operator::IcebergScanCommon, proto_tasks: &[spark_operator::IcebergFileScanTask], ) -> Result, ExecutionError> { - // Build caches upfront: for 10K tasks with 1 schema, this parses the schema - // once instead of 10K times, eliminating redundant JSON deserialization - let schema_cache: Vec> = proto_scan + // Build caches upfront from common data + let schema_cache: Vec> = proto_common .schema_pool .iter() .map(|json| { @@ -2791,7 +2786,7 @@ fn parse_file_scan_tasks( }) .collect::, _>>()?; - let partition_spec_cache: Vec>> = proto_scan + let partition_spec_cache: Vec>> = proto_common .partition_spec_pool .iter() .map(|json| { @@ -2801,7 +2796,7 @@ fn parse_file_scan_tasks( }) .collect(); - let name_mapping_cache: Vec>> = proto_scan + let name_mapping_cache: Vec>> = proto_common .name_mapping_pool .iter() .map(|json| { @@ -2811,7 +2806,7 @@ fn parse_file_scan_tasks( }) .collect(); - let delete_files_cache: Vec> = proto_scan + let delete_files_cache: Vec> = proto_common .delete_files_pool .iter() .map(|list| { @@ -2823,7 +2818,7 @@ fn parse_file_scan_tasks( "EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes, other => { return Err(GeneralError(format!( - "Invalid delete content type '{}'. This indicates a bug in Scala serialization.", + "Invalid delete content type '{}'", other ))) } @@ -2844,7 +2839,6 @@ fn parse_file_scan_tasks( }) .collect::, _>>()?; - // Partition data pool is in protobuf messages let results: Result, _> = proto_tasks .iter() .map(|proto_task| { @@ -2878,7 +2872,7 @@ fn parse_file_scan_tasks( }; let bound_predicate = if let Some(idx) = proto_task.residual_idx { - proto_scan + proto_common .residual_pool .get(idx as usize) .and_then(convert_spark_expr_to_predicate) @@ -2898,24 +2892,22 @@ fn parse_file_scan_tasks( }; let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { - // Get partition data from protobuf pool - let partition_data_proto = proto_scan + let partition_data_proto = proto_common .partition_data_pool .get(partition_data_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid partition_data_idx: {} (pool size: {})", partition_data_idx, - proto_scan.partition_data_pool.len() + proto_common.partition_data_pool.len() )) })?; - // Convert protobuf PartitionData to iceberg Struct match partition_data_to_struct(partition_data_proto) { Ok(s) => Some(s), Err(e) => { return Err(ExecutionError::GeneralError(format!( - "Failed to deserialize partition data from protobuf: {}", + "Failed to deserialize partition data: {}", e ))) } @@ -2934,14 +2926,14 @@ fn parse_file_scan_tasks( .and_then(|idx| name_mapping_cache.get(idx as usize)) .and_then(|opt| opt.clone()); - let project_field_ids = proto_scan + let project_field_ids = proto_common .project_field_ids_pool .get(proto_task.project_field_ids_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid project_field_ids_idx: {} (pool size: {})", proto_task.project_field_ids_idx, - proto_scan.project_field_ids_pool.len() + proto_common.project_field_ids_pool.len() )) })? .field_ids @@ -2968,14 +2960,13 @@ fn parse_file_scan_tasks( results } -/// Parse FileScanTasks from IcebergScanCommon (split mode). -/// Similar to parse_file_scan_tasks but reads pools from IcebergScanCommon. 
-fn parse_file_scan_tasks_from_common( - proto_common: &spark_operator::IcebergScanCommon, +/// Parse FileScanTasks from IcebergScan (legacy mode - pools in top-level fields). +fn parse_file_scan_tasks_legacy( + proto_scan: &spark_operator::IcebergScan, proto_tasks: &[spark_operator::IcebergFileScanTask], ) -> Result, ExecutionError> { - // Build caches upfront from common data - let schema_cache: Vec> = proto_common + // Build caches upfront from top-level IcebergScan fields + let schema_cache: Vec> = proto_scan .schema_pool .iter() .map(|json| { @@ -2988,7 +2979,7 @@ fn parse_file_scan_tasks_from_common( }) .collect::, _>>()?; - let partition_spec_cache: Vec>> = proto_common + let partition_spec_cache: Vec>> = proto_scan .partition_spec_pool .iter() .map(|json| { @@ -2998,7 +2989,7 @@ fn parse_file_scan_tasks_from_common( }) .collect(); - let name_mapping_cache: Vec>> = proto_common + let name_mapping_cache: Vec>> = proto_scan .name_mapping_pool .iter() .map(|json| { @@ -3008,7 +2999,7 @@ fn parse_file_scan_tasks_from_common( }) .collect(); - let delete_files_cache: Vec> = proto_common + let delete_files_cache: Vec> = proto_scan .delete_files_pool .iter() .map(|list| { @@ -3074,7 +3065,7 @@ fn parse_file_scan_tasks_from_common( }; let bound_predicate = if let Some(idx) = proto_task.residual_idx { - proto_common + proto_scan .residual_pool .get(idx as usize) .and_then(convert_spark_expr_to_predicate) @@ -3094,14 +3085,14 @@ fn parse_file_scan_tasks_from_common( }; let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { - let partition_data_proto = proto_common + let partition_data_proto = proto_scan .partition_data_pool .get(partition_data_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid partition_data_idx: {} (pool size: {})", partition_data_idx, - proto_common.partition_data_pool.len() + proto_scan.partition_data_pool.len() )) })?; @@ -3128,14 +3119,14 @@ fn parse_file_scan_tasks_from_common( .and_then(|idx| name_mapping_cache.get(idx as usize)) .and_then(|opt| opt.clone()); - let project_field_ids = proto_common + let project_field_ids = proto_scan .project_field_ids_pool .get(proto_task.project_field_ids_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid project_field_ids_idx: {} (pool size: {})", proto_task.project_field_ids_idx, - proto_common.project_field_ids_pool.len() + proto_scan.project_field_ids_pool.len() )) })? 
.field_ids diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 43a32c935e..86f89c92da 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -201,15 +201,10 @@ message IcebergScan { repeated DeleteFileList delete_files_pool = 11; repeated spark.spark_expression.Expr residual_pool = 12; - // Split mode: when true, common data and single partition are embedded - // instead of all partitions in file_partitions - optional bool split_mode = 20; - - // Embedded common data for split mode (pools, metadata, catalog props) - optional IcebergScanCommon common = 21; - - // Single partition's file tasks for split mode - optional IcebergFilePartition partition = 22; + // Split mode: common data and single partition embedded for per-task serialization + // When set, pools are read from common instead of the above fields + optional IcebergScanCommon common = 20; + optional IcebergFilePartition partition = 21; } // Helper message for deduplicating field ID lists diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 0690e2b7d7..396ec67c29 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -687,6 +687,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit scan: CometBatchScanExec, builder: Operator.Builder, childOp: Operator*): Option[OperatorOuterClass.Operator] = { + // Build IcebergScan for serialized plan (native shuffle uses this) val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder() // Deduplication structures - map unique values to pool indices @@ -1028,18 +1029,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit s"$partitionDataPoolBytes bytes (protobuf)") } - // Build split serialization data for efficient per-partition transfer + // Build the IcebergScan (used for serialized plan in native shuffle) val builtScan = icebergScanBuilder.build() + + // Build split serialization data for efficient per-partition transfer via CometIcebergSplitRDD buildAndStoreSplitData(builtScan) builder.clearChildren() Some(builder.setIcebergScan(icebergScanBuilder).build()) } - /** Builds split data (IcebergScanCommon + per-partition bytes) and stores in thread-local. */ + /** Builds IcebergScanCommon from IcebergScan and stores split data in thread-local. */ private def buildAndStoreSplitData(builtScan: OperatorOuterClass.IcebergScan): Unit = { val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() + // Copy pools and metadata from IcebergScan to IcebergScanCommon commonBuilder.setMetadataLocation(builtScan.getMetadataLocation) commonBuilder.putAllCatalogProperties(builtScan.getCatalogPropertiesMap) builtScan.getRequiredSchemaList.forEach(f => commonBuilder.addRequiredSchema(f)) @@ -1072,21 +1076,20 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val metadataLocation = nativeOp.getIcebergScan.getMetadataLocation // Retrieve split data from thread-local (set during convert()) - val splitData = splitDataThreadLocal.get() + val splitData = splitDataThreadLocal.get().getOrElse { + throw new IllegalStateException( + "Programming error: Split data not found in thread-local. 
" + + "buildAndStoreSplitData should have been called during convert().") + } splitDataThreadLocal.remove() - splitData match { - case Some(data) => - CometIcebergNativeScanExec( - nativeOp, - op.wrapped, - op.session, - metadataLocation, - metadata, - data.commonBytes, - data.perPartitionBytes) - case None => - CometIcebergNativeScanExec(nativeOp, op.wrapped, op.session, metadataLocation, metadata) - } + CometIcebergNativeScanExec( + nativeOp, + op.wrapped, + op.session, + metadataLocation, + metadata, + splitData.commonBytes, + splitData.perPartitionBytes) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 6817611085..de1354cb51 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -52,10 +52,10 @@ case class CometIcebergNativeScanExec( metadataLocation: String, numPartitions: Int, @transient nativeIcebergScanMetadata: CometIcebergNativeScanMetadata, - // Split mode: serialized IcebergScanCommon (captured in closure, sent with task) - commonData: Array[Byte] = Array.empty, - // Split mode: serialized IcebergFilePartition per partition (transient) - @transient perPartitionData: Array[Array[Byte]] = Array.empty) + // Serialized IcebergScanCommon (captured in closure, sent with task) + commonData: Array[Byte], + // Serialized IcebergFilePartition per partition (transient, goes into Partition objects) + @transient perPartitionData: Array[Array[Byte]]) extends CometLeafExec { override val supportsColumnar: Boolean = true @@ -152,19 +152,9 @@ case class CometIcebergNativeScanExec( baseMetrics ++ icebergMetrics + ("num_splits" -> numSplitsMetric) } - /** Uses split mode RDD when split data is available, otherwise falls back to legacy mode. 
*/ override def doExecuteColumnar(): RDD[ColumnarBatch] = { - if (commonData.nonEmpty && perPartitionData.nonEmpty) { - val nativeMetrics = CometMetricNode.fromCometPlan(this) - CometIcebergSplitRDD( - sparkContext, - commonData, - perPartitionData, - output.length, - nativeMetrics) - } else { - super.doExecuteColumnar() - } + val nativeMetrics = CometMetricNode.fromCometPlan(this) + CometIcebergSplitRDD(sparkContext, commonData, perPartitionData, output.length, nativeMetrics) } override protected def doCanonicalize(): CometIcebergNativeScanExec = { @@ -175,7 +165,9 @@ case class CometIcebergNativeScanExec( SerializedPlan(None), metadataLocation, numPartitions, - nativeIcebergScanMetadata) + nativeIcebergScanMetadata, + commonData, + perPartitionData) } override def stringArgs: Iterator[Any] = @@ -220,38 +212,13 @@ object CometIcebergNativeScanExec { * Path to table metadata file * @param nativeIcebergScanMetadata * Pre-extracted Iceberg metadata from planning phase + * @param commonData + * Serialized IcebergScanCommon (pools, metadata, catalog props) + * @param perPartitionData + * Serialized IcebergFilePartition per partition * @return * A new CometIcebergNativeScanExec */ - def apply( - nativeOp: Operator, - scanExec: BatchScanExec, - session: SparkSession, - metadataLocation: String, - nativeIcebergScanMetadata: CometIcebergNativeScanMetadata): CometIcebergNativeScanExec = { - - // Determine number of partitions from Iceberg's output partitioning - val numParts = scanExec.outputPartitioning match { - case p: KeyGroupedPartitioning => - p.numPartitions - case _ => - scanExec.inputRDD.getNumPartitions - } - - val exec = CometIcebergNativeScanExec( - nativeOp, - scanExec.output, - scanExec, - SerializedPlan(None), - metadataLocation, - numParts, - nativeIcebergScanMetadata) - - scanExec.logicalLink.foreach(exec.setLogicalLink) - exec - } - - /** Creates a CometIcebergNativeScanExec with split serialization data. */ def apply( nativeOp: Operator, scanExec: BatchScanExec, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala index f5131bbc50..4b24fdf96e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala @@ -44,7 +44,7 @@ private[spark] class CometIcebergSplitPartition( * - perPartitionData: Array of serialized IcebergFilePartition - populates Partition objects * * Each task receives commonData (via closure) + partitionBytes (via Partition), combines them - * into an IcebergScan with split_mode=true, and passes to native execution. + * into an IcebergScan, and passes to native execution. */ private[spark] class CometIcebergSplitRDD( sc: SparkContext, @@ -114,7 +114,6 @@ object CometIcebergSplitRDD { val partition = IcebergFilePartition.parseFrom(partitionBytes) val scanBuilder = OperatorOuterClass.IcebergScan.newBuilder() - scanBuilder.setSplitMode(true) scanBuilder.setCommon(common) scanBuilder.setPartition(partition) From 5066cc9909dae310f7a8ebb939a0029f762bb200 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 06:56:30 -0700 Subject: [PATCH 07/23] Revert "Simplify split mode detection for Iceberg scan serialization" This reverts commit d78d9dd49530556554f2d70f15e26b2539d4e586. 
--- native/core/src/execution/planner.rs | 83 ++++++++++--------- native/proto/src/proto/operator.proto | 13 ++- .../operator/CometIcebergNativeScan.scala | 35 ++++---- .../comet/CometIcebergNativeScanExec.scala | 59 ++++++++++--- .../sql/comet/CometIcebergSplitRDD.scala | 3 +- 5 files changed, 119 insertions(+), 74 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 1b28ef6f8d..eecd542b16 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1132,12 +1132,17 @@ impl PhysicalPlanner { )) } OpStruct::IcebergScan(scan) => { - // Determine if we're in split mode (common + partition present) or legacy mode + // Determine if we're in split mode or legacy mode let (required_schema, catalog_properties, metadata_location, tasks) = - if let (Some(common), Some(partition)) = - (scan.common.as_ref(), scan.partition.as_ref()) - { - // Split mode: read from embedded common + single partition + if scan.split_mode.unwrap_or(false) { + // Split mode: extract from embedded common + single partition + let common = scan.common.as_ref().ok_or_else(|| { + GeneralError("split_mode=true but common data missing".into()) + })?; + let partition = scan.partition.as_ref().ok_or_else(|| { + GeneralError("split_mode=true but partition data missing".into()) + })?; + let schema = convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); let catalog_props: HashMap = common @@ -1146,11 +1151,12 @@ impl PhysicalPlanner { .map(|(k, v)| (k.clone(), v.clone())) .collect(); let metadata_loc = common.metadata_location.clone(); - let tasks = parse_file_scan_tasks(common, &partition.file_scan_tasks)?; + let tasks = + parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; (schema, catalog_props, metadata_loc, tasks) } else { - // Legacy mode: all data in top-level IcebergScan fields + // Legacy mode: all data in single message let schema = convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); let catalog_props: HashMap = scan @@ -1165,7 +1171,7 @@ impl PhysicalPlanner { "IcebergScan must have at least one file partition." ); - let tasks = parse_file_scan_tasks_legacy( + let tasks = parse_file_scan_tasks( scan, &scan.file_partitions[self.partition as usize].file_scan_tasks, )?; @@ -2761,19 +2767,18 @@ fn partition_data_to_struct( /// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects. /// -/// Parses Iceberg FileScanTasks from protobuf messages into iceberg-rust's FileScanTask format. -/// /// Each task contains a residual predicate that is used for row-group level filtering /// during Parquet scanning. /// -/// This function uses deduplication pools from IcebergScanCommon to avoid redundant parsing +/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing /// of schemas, partition specs, partition types, name mappings, and other repeated data. 
fn parse_file_scan_tasks( - proto_common: &spark_operator::IcebergScanCommon, + proto_scan: &spark_operator::IcebergScan, proto_tasks: &[spark_operator::IcebergFileScanTask], ) -> Result, ExecutionError> { - // Build caches upfront from common data - let schema_cache: Vec> = proto_common + // Build caches upfront: for 10K tasks with 1 schema, this parses the schema + // once instead of 10K times, eliminating redundant JSON deserialization + let schema_cache: Vec> = proto_scan .schema_pool .iter() .map(|json| { @@ -2786,7 +2791,7 @@ fn parse_file_scan_tasks( }) .collect::, _>>()?; - let partition_spec_cache: Vec>> = proto_common + let partition_spec_cache: Vec>> = proto_scan .partition_spec_pool .iter() .map(|json| { @@ -2796,7 +2801,7 @@ fn parse_file_scan_tasks( }) .collect(); - let name_mapping_cache: Vec>> = proto_common + let name_mapping_cache: Vec>> = proto_scan .name_mapping_pool .iter() .map(|json| { @@ -2806,7 +2811,7 @@ fn parse_file_scan_tasks( }) .collect(); - let delete_files_cache: Vec> = proto_common + let delete_files_cache: Vec> = proto_scan .delete_files_pool .iter() .map(|list| { @@ -2818,7 +2823,7 @@ fn parse_file_scan_tasks( "EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes, other => { return Err(GeneralError(format!( - "Invalid delete content type '{}'", + "Invalid delete content type '{}'. This indicates a bug in Scala serialization.", other ))) } @@ -2839,6 +2844,7 @@ fn parse_file_scan_tasks( }) .collect::, _>>()?; + // Partition data pool is in protobuf messages let results: Result, _> = proto_tasks .iter() .map(|proto_task| { @@ -2872,7 +2878,7 @@ fn parse_file_scan_tasks( }; let bound_predicate = if let Some(idx) = proto_task.residual_idx { - proto_common + proto_scan .residual_pool .get(idx as usize) .and_then(convert_spark_expr_to_predicate) @@ -2892,22 +2898,24 @@ fn parse_file_scan_tasks( }; let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { - let partition_data_proto = proto_common + // Get partition data from protobuf pool + let partition_data_proto = proto_scan .partition_data_pool .get(partition_data_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid partition_data_idx: {} (pool size: {})", partition_data_idx, - proto_common.partition_data_pool.len() + proto_scan.partition_data_pool.len() )) })?; + // Convert protobuf PartitionData to iceberg Struct match partition_data_to_struct(partition_data_proto) { Ok(s) => Some(s), Err(e) => { return Err(ExecutionError::GeneralError(format!( - "Failed to deserialize partition data: {}", + "Failed to deserialize partition data from protobuf: {}", e ))) } @@ -2926,14 +2934,14 @@ fn parse_file_scan_tasks( .and_then(|idx| name_mapping_cache.get(idx as usize)) .and_then(|opt| opt.clone()); - let project_field_ids = proto_common + let project_field_ids = proto_scan .project_field_ids_pool .get(proto_task.project_field_ids_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid project_field_ids_idx: {} (pool size: {})", proto_task.project_field_ids_idx, - proto_common.project_field_ids_pool.len() + proto_scan.project_field_ids_pool.len() )) })? .field_ids @@ -2960,13 +2968,14 @@ fn parse_file_scan_tasks( results } -/// Parse FileScanTasks from IcebergScan (legacy mode - pools in top-level fields). -fn parse_file_scan_tasks_legacy( - proto_scan: &spark_operator::IcebergScan, +/// Parse FileScanTasks from IcebergScanCommon (split mode). +/// Similar to parse_file_scan_tasks but reads pools from IcebergScanCommon. 
+fn parse_file_scan_tasks_from_common( + proto_common: &spark_operator::IcebergScanCommon, proto_tasks: &[spark_operator::IcebergFileScanTask], ) -> Result, ExecutionError> { - // Build caches upfront from top-level IcebergScan fields - let schema_cache: Vec> = proto_scan + // Build caches upfront from common data + let schema_cache: Vec> = proto_common .schema_pool .iter() .map(|json| { @@ -2979,7 +2988,7 @@ fn parse_file_scan_tasks_legacy( }) .collect::, _>>()?; - let partition_spec_cache: Vec>> = proto_scan + let partition_spec_cache: Vec>> = proto_common .partition_spec_pool .iter() .map(|json| { @@ -2989,7 +2998,7 @@ fn parse_file_scan_tasks_legacy( }) .collect(); - let name_mapping_cache: Vec>> = proto_scan + let name_mapping_cache: Vec>> = proto_common .name_mapping_pool .iter() .map(|json| { @@ -2999,7 +3008,7 @@ fn parse_file_scan_tasks_legacy( }) .collect(); - let delete_files_cache: Vec> = proto_scan + let delete_files_cache: Vec> = proto_common .delete_files_pool .iter() .map(|list| { @@ -3065,7 +3074,7 @@ fn parse_file_scan_tasks_legacy( }; let bound_predicate = if let Some(idx) = proto_task.residual_idx { - proto_scan + proto_common .residual_pool .get(idx as usize) .and_then(convert_spark_expr_to_predicate) @@ -3085,14 +3094,14 @@ fn parse_file_scan_tasks_legacy( }; let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { - let partition_data_proto = proto_scan + let partition_data_proto = proto_common .partition_data_pool .get(partition_data_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid partition_data_idx: {} (pool size: {})", partition_data_idx, - proto_scan.partition_data_pool.len() + proto_common.partition_data_pool.len() )) })?; @@ -3119,14 +3128,14 @@ fn parse_file_scan_tasks_legacy( .and_then(|idx| name_mapping_cache.get(idx as usize)) .and_then(|opt| opt.clone()); - let project_field_ids = proto_scan + let project_field_ids = proto_common .project_field_ids_pool .get(proto_task.project_field_ids_idx as usize) .ok_or_else(|| { ExecutionError::GeneralError(format!( "Invalid project_field_ids_idx: {} (pool size: {})", proto_task.project_field_ids_idx, - proto_scan.project_field_ids_pool.len() + proto_common.project_field_ids_pool.len() )) })? 
.field_ids diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 86f89c92da..43a32c935e 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -201,10 +201,15 @@ message IcebergScan { repeated DeleteFileList delete_files_pool = 11; repeated spark.spark_expression.Expr residual_pool = 12; - // Split mode: common data and single partition embedded for per-task serialization - // When set, pools are read from common instead of the above fields - optional IcebergScanCommon common = 20; - optional IcebergFilePartition partition = 21; + // Split mode: when true, common data and single partition are embedded + // instead of all partitions in file_partitions + optional bool split_mode = 20; + + // Embedded common data for split mode (pools, metadata, catalog props) + optional IcebergScanCommon common = 21; + + // Single partition's file tasks for split mode + optional IcebergFilePartition partition = 22; } // Helper message for deduplicating field ID lists diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 396ec67c29..0690e2b7d7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -687,7 +687,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit scan: CometBatchScanExec, builder: Operator.Builder, childOp: Operator*): Option[OperatorOuterClass.Operator] = { - // Build IcebergScan for serialized plan (native shuffle uses this) val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder() // Deduplication structures - map unique values to pool indices @@ -1029,21 +1028,18 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit s"$partitionDataPoolBytes bytes (protobuf)") } - // Build the IcebergScan (used for serialized plan in native shuffle) + // Build split serialization data for efficient per-partition transfer val builtScan = icebergScanBuilder.build() - - // Build split serialization data for efficient per-partition transfer via CometIcebergSplitRDD buildAndStoreSplitData(builtScan) builder.clearChildren() Some(builder.setIcebergScan(icebergScanBuilder).build()) } - /** Builds IcebergScanCommon from IcebergScan and stores split data in thread-local. */ + /** Builds split data (IcebergScanCommon + per-partition bytes) and stores in thread-local. */ private def buildAndStoreSplitData(builtScan: OperatorOuterClass.IcebergScan): Unit = { val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() - // Copy pools and metadata from IcebergScan to IcebergScanCommon commonBuilder.setMetadataLocation(builtScan.getMetadataLocation) commonBuilder.putAllCatalogProperties(builtScan.getCatalogPropertiesMap) builtScan.getRequiredSchemaList.forEach(f => commonBuilder.addRequiredSchema(f)) @@ -1076,20 +1072,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val metadataLocation = nativeOp.getIcebergScan.getMetadataLocation // Retrieve split data from thread-local (set during convert()) - val splitData = splitDataThreadLocal.get().getOrElse { - throw new IllegalStateException( - "Programming error: Split data not found in thread-local. 
" + - "buildAndStoreSplitData should have been called during convert().") - } + val splitData = splitDataThreadLocal.get() splitDataThreadLocal.remove() - CometIcebergNativeScanExec( - nativeOp, - op.wrapped, - op.session, - metadataLocation, - metadata, - splitData.commonBytes, - splitData.perPartitionBytes) + splitData match { + case Some(data) => + CometIcebergNativeScanExec( + nativeOp, + op.wrapped, + op.session, + metadataLocation, + metadata, + data.commonBytes, + data.perPartitionBytes) + case None => + CometIcebergNativeScanExec(nativeOp, op.wrapped, op.session, metadataLocation, metadata) + } } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index de1354cb51..6817611085 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -52,10 +52,10 @@ case class CometIcebergNativeScanExec( metadataLocation: String, numPartitions: Int, @transient nativeIcebergScanMetadata: CometIcebergNativeScanMetadata, - // Serialized IcebergScanCommon (captured in closure, sent with task) - commonData: Array[Byte], - // Serialized IcebergFilePartition per partition (transient, goes into Partition objects) - @transient perPartitionData: Array[Array[Byte]]) + // Split mode: serialized IcebergScanCommon (captured in closure, sent with task) + commonData: Array[Byte] = Array.empty, + // Split mode: serialized IcebergFilePartition per partition (transient) + @transient perPartitionData: Array[Array[Byte]] = Array.empty) extends CometLeafExec { override val supportsColumnar: Boolean = true @@ -152,9 +152,19 @@ case class CometIcebergNativeScanExec( baseMetrics ++ icebergMetrics + ("num_splits" -> numSplitsMetric) } + /** Uses split mode RDD when split data is available, otherwise falls back to legacy mode. 
*/ override def doExecuteColumnar(): RDD[ColumnarBatch] = { - val nativeMetrics = CometMetricNode.fromCometPlan(this) - CometIcebergSplitRDD(sparkContext, commonData, perPartitionData, output.length, nativeMetrics) + if (commonData.nonEmpty && perPartitionData.nonEmpty) { + val nativeMetrics = CometMetricNode.fromCometPlan(this) + CometIcebergSplitRDD( + sparkContext, + commonData, + perPartitionData, + output.length, + nativeMetrics) + } else { + super.doExecuteColumnar() + } } override protected def doCanonicalize(): CometIcebergNativeScanExec = { @@ -165,9 +175,7 @@ case class CometIcebergNativeScanExec( SerializedPlan(None), metadataLocation, numPartitions, - nativeIcebergScanMetadata, - commonData, - perPartitionData) + nativeIcebergScanMetadata) } override def stringArgs: Iterator[Any] = @@ -212,13 +220,38 @@ object CometIcebergNativeScanExec { * Path to table metadata file * @param nativeIcebergScanMetadata * Pre-extracted Iceberg metadata from planning phase - * @param commonData - * Serialized IcebergScanCommon (pools, metadata, catalog props) - * @param perPartitionData - * Serialized IcebergFilePartition per partition * @return * A new CometIcebergNativeScanExec */ + def apply( + nativeOp: Operator, + scanExec: BatchScanExec, + session: SparkSession, + metadataLocation: String, + nativeIcebergScanMetadata: CometIcebergNativeScanMetadata): CometIcebergNativeScanExec = { + + // Determine number of partitions from Iceberg's output partitioning + val numParts = scanExec.outputPartitioning match { + case p: KeyGroupedPartitioning => + p.numPartitions + case _ => + scanExec.inputRDD.getNumPartitions + } + + val exec = CometIcebergNativeScanExec( + nativeOp, + scanExec.output, + scanExec, + SerializedPlan(None), + metadataLocation, + numParts, + nativeIcebergScanMetadata) + + scanExec.logicalLink.foreach(exec.setLogicalLink) + exec + } + + /** Creates a CometIcebergNativeScanExec with split serialization data. */ def apply( nativeOp: Operator, scanExec: BatchScanExec, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala index 4b24fdf96e..f5131bbc50 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala @@ -44,7 +44,7 @@ private[spark] class CometIcebergSplitPartition( * - perPartitionData: Array of serialized IcebergFilePartition - populates Partition objects * * Each task receives commonData (via closure) + partitionBytes (via Partition), combines them - * into an IcebergScan, and passes to native execution. + * into an IcebergScan with split_mode=true, and passes to native execution. */ private[spark] class CometIcebergSplitRDD( sc: SparkContext, @@ -114,6 +114,7 @@ object CometIcebergSplitRDD { val partition = IcebergFilePartition.parseFrom(partitionBytes) val scanBuilder = OperatorOuterClass.IcebergScan.newBuilder() + scanBuilder.setSplitMode(true) scanBuilder.setCommon(common) scanBuilder.setPartition(partition) From 074b88c95755afd70a4b04f6bc0a49bb7ad33f5b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 07:45:23 -0700 Subject: [PATCH 08/23] Fix partition index handling in IcebergScanExec for split mode When IcebergScanExec is a child of another operator (e.g., FilterExec), the parent calls execute(partition=N) but in split mode there's only 1 task group containing the correct data for partition N. 
This fix checks if there's only 1 task group and uses effective partition 0 regardless of the requested partition index. This allows parent operators to correctly execute their partition N while the IcebergScanExec returns data from its single task group. Also removed debug eprintln statement and improved error messages for legacy mode. Co-Authored-By: Claude Opus 4.5 --- .../src/execution/operators/iceberg_scan.rs | 14 ++- native/core/src/execution/planner.rs | 102 ++++++++++-------- 2 files changed, 71 insertions(+), 45 deletions(-) diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index 2f639e9f70..e23aa49e07 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -130,8 +130,18 @@ impl ExecutionPlan for IcebergScanExec { partition: usize, context: Arc, ) -> DFResult { - if partition < self.file_task_groups.len() { - let tasks = &self.file_task_groups[partition]; + // In split mode (single task group), always use index 0 regardless of requested partition. + // This is because in Comet's per-partition execution model, each task builds its own plan + // with only its partition's data. The parent operator may request partition N, but this + // IcebergScanExec already contains the correct data for partition N in task_groups[0]. + let effective_partition = if self.file_task_groups.len() == 1 { + 0 + } else { + partition + }; + + if effective_partition < self.file_task_groups.len() { + let tasks = &self.file_task_groups[effective_partition]; self.execute_with_tasks(tasks.clone(), partition, context) } else { Err(DataFusionError::Execution(format!( diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index eecd542b16..c89d189d92 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1133,51 +1133,67 @@ impl PhysicalPlanner { } OpStruct::IcebergScan(scan) => { // Determine if we're in split mode or legacy mode - let (required_schema, catalog_properties, metadata_location, tasks) = - if scan.split_mode.unwrap_or(false) { - // Split mode: extract from embedded common + single partition - let common = scan.common.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but common data missing".into()) - })?; - let partition = scan.partition.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but partition data missing".into()) - })?; - - let schema = - convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); - let catalog_props: HashMap = common - .catalog_properties - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let metadata_loc = common.metadata_location.clone(); - let tasks = - parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; + let (required_schema, catalog_properties, metadata_location, tasks) = if scan + .split_mode + .unwrap_or(false) + { + // Split mode: extract from embedded common + single partition + let common = scan.common.as_ref().ok_or_else(|| { + GeneralError("split_mode=true but common data missing".into()) + })?; + let partition = scan.partition.as_ref().ok_or_else(|| { + GeneralError("split_mode=true but partition data missing".into()) + })?; - (schema, catalog_props, metadata_loc, tasks) - } else { - // Legacy mode: all data in single message - let schema = - convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); - let catalog_props: HashMap = scan - .catalog_properties - .iter() - 
.map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let metadata_loc = scan.metadata_location.clone(); - - debug_assert!( - !scan.file_partitions.is_empty(), - "IcebergScan must have at least one file partition." - ); - - let tasks = parse_file_scan_tasks( - scan, - &scan.file_partitions[self.partition as usize].file_scan_tasks, - )?; + let schema = + convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); + let catalog_props: HashMap = common + .catalog_properties + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let metadata_loc = common.metadata_location.clone(); + let tasks = + parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; - (schema, catalog_props, metadata_loc, tasks) - }; + (schema, catalog_props, metadata_loc, tasks) + } else { + // Legacy mode: all data in single message + let schema = + convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); + let catalog_props: HashMap = scan + .catalog_properties + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let metadata_loc = scan.metadata_location.clone(); + + // Check file_partitions before accessing + if scan.file_partitions.is_empty() { + return Err(GeneralError(format!( + "IcebergScan in legacy mode (split_mode={:?}) but file_partitions is empty. \ + Partition index: {}, has_common: {}, has_partition: {}", + scan.split_mode, + self.partition, + scan.common.is_some(), + scan.partition.is_some() + ))); + } + if self.partition as usize >= scan.file_partitions.len() { + return Err(GeneralError(format!( + "IcebergScan partition index {} out of range (file_partitions.len={})", + self.partition, + scan.file_partitions.len() + ))); + } + + let tasks = parse_file_scan_tasks( + scan, + &scan.file_partitions[self.partition as usize].file_scan_tasks, + )?; + + (schema, catalog_props, metadata_loc, tasks) + }; let file_task_groups = vec![tasks]; From 894e02d12465521dbfc3e4dbe6f993893937a805 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 08:01:52 -0700 Subject: [PATCH 09/23] Keep legacy code path for IcebergScan as child of other operators Split mode is used when IcebergScan executes directly via CometIcebergSplitRDD. Legacy mode is used when IcebergScan is a child of another native operator (like FilterExec), where the parent's nativeOp contains the child IcebergScan with all partitions in file_partitions. 
- Restore legacy code path in planner.rs that uses file_partitions - Add parse_file_scan_tasks() for legacy mode - Require split data in CometIcebergNativeScanExec.doExecuteColumnar() - Require split data in CometIcebergNativeScan.createExec() Co-Authored-By: Claude Opus 4.5 --- native/core/src/execution/planner.rs | 126 ++++++++---------- .../operator/CometIcebergNativeScan.scala | 27 ++-- .../comet/CometIcebergNativeScanExec.scala | 19 +-- 3 files changed, 78 insertions(+), 94 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index c89d189d92..d23b760a99 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1133,67 +1133,59 @@ impl PhysicalPlanner { } OpStruct::IcebergScan(scan) => { // Determine if we're in split mode or legacy mode - let (required_schema, catalog_properties, metadata_location, tasks) = if scan - .split_mode - .unwrap_or(false) - { - // Split mode: extract from embedded common + single partition - let common = scan.common.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but common data missing".into()) - })?; - let partition = scan.partition.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but partition data missing".into()) - })?; + let (required_schema, catalog_properties, metadata_location, tasks) = + if scan.split_mode.unwrap_or(false) { + // Split mode: extract from embedded common + single partition + let common = scan.common.as_ref().ok_or_else(|| { + GeneralError("split_mode=true but common data missing".into()) + })?; + let partition = scan.partition.as_ref().ok_or_else(|| { + GeneralError("split_mode=true but partition data missing".into()) + })?; + + let schema = + convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); + let catalog_props: HashMap = common + .catalog_properties + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let metadata_loc = common.metadata_location.clone(); + let tasks = + parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; - let schema = - convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); - let catalog_props: HashMap = common - .catalog_properties - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let metadata_loc = common.metadata_location.clone(); - let tasks = - parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; + (schema, catalog_props, metadata_loc, tasks) + } else { + // Legacy mode: all data in single message (used when IcebergScan is child of another operator) + let schema = + convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); + let catalog_props: HashMap = scan + .catalog_properties + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let metadata_loc = scan.metadata_location.clone(); - (schema, catalog_props, metadata_loc, tasks) - } else { - // Legacy mode: all data in single message - let schema = - convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); - let catalog_props: HashMap = scan - .catalog_properties - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let metadata_loc = scan.metadata_location.clone(); - - // Check file_partitions before accessing - if scan.file_partitions.is_empty() { - return Err(GeneralError(format!( - "IcebergScan in legacy mode (split_mode={:?}) but file_partitions is empty. 
\ - Partition index: {}, has_common: {}, has_partition: {}", - scan.split_mode, - self.partition, - scan.common.is_some(), - scan.partition.is_some() - ))); - } - if self.partition as usize >= scan.file_partitions.len() { - return Err(GeneralError(format!( + if scan.file_partitions.is_empty() { + return Err(GeneralError( + "IcebergScan in legacy mode but file_partitions is empty".into(), + )); + } + if self.partition as usize >= scan.file_partitions.len() { + return Err(GeneralError(format!( "IcebergScan partition index {} out of range (file_partitions.len={})", self.partition, scan.file_partitions.len() ))); - } + } - let tasks = parse_file_scan_tasks( - scan, - &scan.file_partitions[self.partition as usize].file_scan_tasks, - )?; + let tasks = parse_file_scan_tasks( + scan, + &scan.file_partitions[self.partition as usize].file_scan_tasks, + )?; - (schema, catalog_props, metadata_loc, tasks) - }; + (schema, catalog_props, metadata_loc, tasks) + }; let file_task_groups = vec![tasks]; @@ -2781,19 +2773,15 @@ fn partition_data_to_struct( Ok(iceberg::spec::Struct::from_iter(literals)) } -/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects. +/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects (legacy mode). /// -/// Each task contains a residual predicate that is used for row-group level filtering -/// during Parquet scanning. -/// -/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing -/// of schemas, partition specs, partition types, name mappings, and other repeated data. +/// This function is used when IcebergScan is a child of another operator and the full +/// IcebergScan message (with file_partitions) is serialized as part of the parent's plan. fn parse_file_scan_tasks( proto_scan: &spark_operator::IcebergScan, proto_tasks: &[spark_operator::IcebergFileScanTask], ) -> Result, ExecutionError> { - // Build caches upfront: for 10K tasks with 1 schema, this parses the schema - // once instead of 10K times, eliminating redundant JSON deserialization + // Build caches upfront from IcebergScan pools let schema_cache: Vec> = proto_scan .schema_pool .iter() @@ -2839,7 +2827,7 @@ fn parse_file_scan_tasks( "EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes, other => { return Err(GeneralError(format!( - "Invalid delete content type '{}'. This indicates a bug in Scala serialization.", + "Invalid delete content type '{}'", other ))) } @@ -2860,7 +2848,6 @@ fn parse_file_scan_tasks( }) .collect::, _>>()?; - // Partition data pool is in protobuf messages let results: Result, _> = proto_tasks .iter() .map(|proto_task| { @@ -2914,7 +2901,6 @@ fn parse_file_scan_tasks( }; let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { - // Get partition data from protobuf pool let partition_data_proto = proto_scan .partition_data_pool .get(partition_data_idx as usize) @@ -2926,12 +2912,11 @@ fn parse_file_scan_tasks( )) })?; - // Convert protobuf PartitionData to iceberg Struct match partition_data_to_struct(partition_data_proto) { Ok(s) => Some(s), Err(e) => { return Err(ExecutionError::GeneralError(format!( - "Failed to deserialize partition data from protobuf: {}", + "Failed to deserialize partition data: {}", e ))) } @@ -2984,8 +2969,13 @@ fn parse_file_scan_tasks( results } -/// Parse FileScanTasks from IcebergScanCommon (split mode). -/// Similar to parse_file_scan_tasks but reads pools from IcebergScanCommon. 
+/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects (split mode). +/// +/// Each task contains a residual predicate that is used for row-group level filtering +/// during Parquet scanning. +/// +/// This function uses deduplication pools from the IcebergScanCommon to avoid redundant +/// parsing of schemas, partition specs, partition types, name mappings, and other repeated data. fn parse_file_scan_tasks_from_common( proto_common: &spark_operator::IcebergScanCommon, proto_tasks: &[spark_operator::IcebergFileScanTask], diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 0690e2b7d7..93701a7262 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -1072,21 +1072,20 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val metadataLocation = nativeOp.getIcebergScan.getMetadataLocation // Retrieve split data from thread-local (set during convert()) - val splitData = splitDataThreadLocal.get() + val splitData = splitDataThreadLocal.get().getOrElse { + throw new IllegalStateException( + "Programming error: split data not available. " + + "buildAndStoreSplitData() should have been called during convert().") + } splitDataThreadLocal.remove() - splitData match { - case Some(data) => - CometIcebergNativeScanExec( - nativeOp, - op.wrapped, - op.session, - metadataLocation, - metadata, - data.commonBytes, - data.perPartitionBytes) - case None => - CometIcebergNativeScanExec(nativeOp, op.wrapped, op.session, metadataLocation, metadata) - } + CometIcebergNativeScanExec( + nativeOp, + op.wrapped, + op.session, + metadataLocation, + metadata, + splitData.commonBytes, + splitData.perPartitionBytes) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 6817611085..2483abbb50 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -152,19 +152,14 @@ case class CometIcebergNativeScanExec( baseMetrics ++ icebergMetrics + ("num_splits" -> numSplitsMetric) } - /** Uses split mode RDD when split data is available, otherwise falls back to legacy mode. */ + /** Executes using split mode RDD - split data must be available. 
*/ override def doExecuteColumnar(): RDD[ColumnarBatch] = { - if (commonData.nonEmpty && perPartitionData.nonEmpty) { - val nativeMetrics = CometMetricNode.fromCometPlan(this) - CometIcebergSplitRDD( - sparkContext, - commonData, - perPartitionData, - output.length, - nativeMetrics) - } else { - super.doExecuteColumnar() - } + require( + commonData.nonEmpty && perPartitionData.nonEmpty, + "IcebergScan requires split serialization data (commonData and perPartitionData)") + + val nativeMetrics = CometMetricNode.fromCometPlan(this) + CometIcebergSplitRDD(sparkContext, commonData, perPartitionData, output.length, nativeMetrics) } override protected def doCanonicalize(): CometIcebergNativeScanExec = { From c76ddf396c7d82eed90808c5e8aee212ebb0f9f8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 08:58:05 -0700 Subject: [PATCH 10/23] Implement per-partition plan injection for Iceberg split serialization When IcebergScan is a child of another operator (e.g., FilterExec), the parent's nativeOp tree contains the IcebergScan. Previously, each task would receive all partitions' data even though it only needed its own. This change implements per-partition plan injection: - Add IcebergPartitionInjector to traverse operator trees and inject partition data into IcebergScan nodes with split_mode=true - Add findIcebergSplitData() to locate CometIcebergNativeScanExec descendants with split mode data - Modify doExecuteColumnar() to detect Iceberg split data and create per-partition plans by injecting partition data before serialization - Remove legacy code path from Rust planner (all Iceberg scans now require split_mode=true with common + partition data) Co-Authored-By: Claude Opus 4.5 --- native/core/src/execution/planner.rs | 276 ++---------------- .../operator/CometIcebergNativeScan.scala | 56 ++-- .../apache/spark/sql/comet/operators.scala | 87 +++++- 3 files changed, 144 insertions(+), 275 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index d23b760a99..06a03236b5 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1132,60 +1132,34 @@ impl PhysicalPlanner { )) } OpStruct::IcebergScan(scan) => { - // Determine if we're in split mode or legacy mode - let (required_schema, catalog_properties, metadata_location, tasks) = - if scan.split_mode.unwrap_or(false) { - // Split mode: extract from embedded common + single partition - let common = scan.common.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but common data missing".into()) - })?; - let partition = scan.partition.as_ref().ok_or_else(|| { - GeneralError("split_mode=true but partition data missing".into()) - })?; - - let schema = - convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); - let catalog_props: HashMap = common - .catalog_properties - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let metadata_loc = common.metadata_location.clone(); - let tasks = - parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; - - (schema, catalog_props, metadata_loc, tasks) - } else { - // Legacy mode: all data in single message (used when IcebergScan is child of another operator) - let schema = - convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); - let catalog_props: HashMap = scan - .catalog_properties - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let metadata_loc = scan.metadata_location.clone(); - - if scan.file_partitions.is_empty() { - return 
Err(GeneralError( - "IcebergScan in legacy mode but file_partitions is empty".into(), - )); - } - if self.partition as usize >= scan.file_partitions.len() { - return Err(GeneralError(format!( - "IcebergScan partition index {} out of range (file_partitions.len={})", - self.partition, - scan.file_partitions.len() - ))); - } + // Split mode: extract from embedded common + single partition + // Per-partition injection happens in Scala before sending to native + if !scan.split_mode.unwrap_or(false) { + return Err(GeneralError( + "IcebergScan requires split_mode=true (partition data injected at \ + execution time)" + .into(), + )); + } - let tasks = parse_file_scan_tasks( - scan, - &scan.file_partitions[self.partition as usize].file_scan_tasks, - )?; + let common = scan + .common + .as_ref() + .ok_or_else(|| GeneralError("IcebergScan missing common data".into()))?; + let partition = scan + .partition + .as_ref() + .ok_or_else(|| GeneralError("IcebergScan missing partition data".into()))?; - (schema, catalog_props, metadata_loc, tasks) - }; + let required_schema = + convert_spark_types_to_arrow_schema(common.required_schema.as_slice()); + let catalog_properties: HashMap = common + .catalog_properties + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let metadata_location = common.metadata_location.clone(); + let tasks = parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?; let file_task_groups = vec![tasks]; @@ -2773,203 +2747,7 @@ fn partition_data_to_struct( Ok(iceberg::spec::Struct::from_iter(literals)) } -/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects (legacy mode). -/// -/// This function is used when IcebergScan is a child of another operator and the full -/// IcebergScan message (with file_partitions) is serialized as part of the parent's plan. 
-fn parse_file_scan_tasks( - proto_scan: &spark_operator::IcebergScan, - proto_tasks: &[spark_operator::IcebergFileScanTask], -) -> Result, ExecutionError> { - // Build caches upfront from IcebergScan pools - let schema_cache: Vec> = proto_scan - .schema_pool - .iter() - .map(|json| { - serde_json::from_str(json).map(Arc::new).map_err(|e| { - ExecutionError::GeneralError(format!( - "Failed to parse schema JSON from pool: {}", - e - )) - }) - }) - .collect::, _>>()?; - - let partition_spec_cache: Vec>> = proto_scan - .partition_spec_pool - .iter() - .map(|json| { - serde_json::from_str::(json) - .ok() - .map(Arc::new) - }) - .collect(); - - let name_mapping_cache: Vec>> = proto_scan - .name_mapping_pool - .iter() - .map(|json| { - serde_json::from_str::(json) - .ok() - .map(Arc::new) - }) - .collect(); - - let delete_files_cache: Vec> = proto_scan - .delete_files_pool - .iter() - .map(|list| { - list.delete_files - .iter() - .map(|del| { - let file_type = match del.content_type.as_str() { - "POSITION_DELETES" => iceberg::spec::DataContentType::PositionDeletes, - "EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes, - other => { - return Err(GeneralError(format!( - "Invalid delete content type '{}'", - other - ))) - } - }; - - Ok(iceberg::scan::FileScanTaskDeleteFile { - file_path: del.file_path.clone(), - file_type, - partition_spec_id: del.partition_spec_id, - equality_ids: if del.equality_ids.is_empty() { - None - } else { - Some(del.equality_ids.clone()) - }, - }) - }) - .collect::, ExecutionError>>() - }) - .collect::, _>>()?; - - let results: Result, _> = proto_tasks - .iter() - .map(|proto_task| { - let schema_ref = Arc::clone( - schema_cache - .get(proto_task.schema_idx as usize) - .ok_or_else(|| { - ExecutionError::GeneralError(format!( - "Invalid schema_idx: {} (pool size: {})", - proto_task.schema_idx, - schema_cache.len() - )) - })?, - ); - - let data_file_format = iceberg::spec::DataFileFormat::Parquet; - - let deletes = if let Some(idx) = proto_task.delete_files_idx { - delete_files_cache - .get(idx as usize) - .ok_or_else(|| { - ExecutionError::GeneralError(format!( - "Invalid delete_files_idx: {} (pool size: {})", - idx, - delete_files_cache.len() - )) - })? - .clone() - } else { - vec![] - }; - - let bound_predicate = if let Some(idx) = proto_task.residual_idx { - proto_scan - .residual_pool - .get(idx as usize) - .and_then(convert_spark_expr_to_predicate) - .map( - |pred| -> Result { - pred.bind(Arc::clone(&schema_ref), true).map_err(|e| { - ExecutionError::GeneralError(format!( - "Failed to bind predicate to schema: {}", - e - )) - }) - }, - ) - .transpose()? 
- } else { - None - }; - - let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { - let partition_data_proto = proto_scan - .partition_data_pool - .get(partition_data_idx as usize) - .ok_or_else(|| { - ExecutionError::GeneralError(format!( - "Invalid partition_data_idx: {} (pool size: {})", - partition_data_idx, - proto_scan.partition_data_pool.len() - )) - })?; - - match partition_data_to_struct(partition_data_proto) { - Ok(s) => Some(s), - Err(e) => { - return Err(ExecutionError::GeneralError(format!( - "Failed to deserialize partition data: {}", - e - ))) - } - } - } else { - None - }; - - let partition_spec = proto_task - .partition_spec_idx - .and_then(|idx| partition_spec_cache.get(idx as usize)) - .and_then(|opt| opt.clone()); - - let name_mapping = proto_task - .name_mapping_idx - .and_then(|idx| name_mapping_cache.get(idx as usize)) - .and_then(|opt| opt.clone()); - - let project_field_ids = proto_scan - .project_field_ids_pool - .get(proto_task.project_field_ids_idx as usize) - .ok_or_else(|| { - ExecutionError::GeneralError(format!( - "Invalid project_field_ids_idx: {} (pool size: {})", - proto_task.project_field_ids_idx, - proto_scan.project_field_ids_pool.len() - )) - })? - .field_ids - .clone(); - - Ok(iceberg::scan::FileScanTask { - data_file_path: proto_task.data_file_path.clone(), - start: proto_task.start, - length: proto_task.length, - record_count: proto_task.record_count, - data_file_format, - schema: schema_ref, - project_field_ids, - predicate: bound_predicate, - deletes, - partition, - partition_spec, - name_mapping, - case_sensitive: false, - }) - }) - .collect(); - - results -} - -/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects (split mode). +/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects. /// /// Each task contains a residual predicate that is used for row-group level filtering /// during Parquet scanning. 
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 93701a7262..ce84a26fc7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -700,6 +700,9 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]() val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]() + // Per-partition file tasks (for split serialization - injected at execution time) + val perPartitionBuilders = mutable.ArrayBuffer[OperatorOuterClass.IcebergFilePartition]() + var totalTasks = 0 // Get pre-extracted metadata from planning phase @@ -978,8 +981,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } } + // Collect partition data for later per-partition injection + // Do NOT add to file_partitions (legacy format) val builtPartition = partitionBuilder.build() - icebergScanBuilder.addFilePartitions(builtPartition) + perPartitionBuilders += builtPartition } case _ => } @@ -1028,34 +1033,37 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit s"$partitionDataPoolBytes bytes (protobuf)") } - // Build split serialization data for efficient per-partition transfer - val builtScan = icebergScanBuilder.build() - buildAndStoreSplitData(builtScan) - - builder.clearChildren() - Some(builder.setIcebergScan(icebergScanBuilder).build()) - } - - /** Builds split data (IcebergScanCommon + per-partition bytes) and stores in thread-local. */ - private def buildAndStoreSplitData(builtScan: OperatorOuterClass.IcebergScan): Unit = { + // Build common data (pools, metadata) for split mode val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() + commonBuilder.setMetadataLocation(icebergScanBuilder.getMetadataLocation) + commonBuilder.putAllCatalogProperties(icebergScanBuilder.getCatalogPropertiesMap) + icebergScanBuilder.getRequiredSchemaList.forEach(f => commonBuilder.addRequiredSchema(f)) + icebergScanBuilder.getSchemaPoolList.forEach(s => commonBuilder.addSchemaPool(s)) + icebergScanBuilder.getPartitionTypePoolList.forEach(s => + commonBuilder.addPartitionTypePool(s)) + icebergScanBuilder.getPartitionSpecPoolList.forEach(s => + commonBuilder.addPartitionSpecPool(s)) + icebergScanBuilder.getNameMappingPoolList.forEach(s => commonBuilder.addNameMappingPool(s)) + icebergScanBuilder.getProjectFieldIdsPoolList.forEach { p => + commonBuilder.addProjectFieldIdsPool(p) + } + icebergScanBuilder.getPartitionDataPoolList.forEach(p => + commonBuilder.addPartitionDataPool(p)) + icebergScanBuilder.getDeleteFilesPoolList.forEach(d => commonBuilder.addDeleteFilesPool(d)) + icebergScanBuilder.getResidualPoolList.forEach(r => commonBuilder.addResidualPool(r)) - commonBuilder.setMetadataLocation(builtScan.getMetadataLocation) - commonBuilder.putAllCatalogProperties(builtScan.getCatalogPropertiesMap) - builtScan.getRequiredSchemaList.forEach(f => commonBuilder.addRequiredSchema(f)) - builtScan.getSchemaPoolList.forEach(s => commonBuilder.addSchemaPool(s)) - builtScan.getPartitionTypePoolList.forEach(s => commonBuilder.addPartitionTypePool(s)) - builtScan.getPartitionSpecPoolList.forEach(s => commonBuilder.addPartitionSpecPool(s)) - builtScan.getNameMappingPoolList.forEach(s => commonBuilder.addNameMappingPool(s)) - 
builtScan.getProjectFieldIdsPoolList.forEach(p => commonBuilder.addProjectFieldIdsPool(p)) - builtScan.getPartitionDataPoolList.forEach(p => commonBuilder.addPartitionDataPool(p)) - builtScan.getDeleteFilesPoolList.forEach(d => commonBuilder.addDeleteFilesPool(d)) - builtScan.getResidualPoolList.forEach(r => commonBuilder.addResidualPool(r)) + // Set split mode and embed common data (partition injected at execution time) + icebergScanBuilder.setSplitMode(true) + icebergScanBuilder.setCommon(commonBuilder.build()) + // Note: partition is NOT set here - it will be injected at execution time + // Store per-partition data for injection at execution time val commonBytes = commonBuilder.build().toByteArray - val perPartitionBytes = builtScan.getFilePartitionsList.asScala.map(_.toByteArray).toArray - + val perPartitionBytes = perPartitionBuilders.map(_.toByteArray).toArray splitDataThreadLocal.set(Some(SplitData(commonBytes, perPartitionBytes))) + + builder.clearChildren() + Some(builder.setIcebergScan(icebergScanBuilder).build()) } override def createExec(nativeOp: Operator, op: CometBatchScanExec): CometNativeExec = { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 6f33467efe..c3824f1215 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -55,10 +55,62 @@ import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, Co import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo} import org.apache.comet.parquet.CometParquetUtils import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel, Unsupported} -import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator} +import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, IcebergFilePartition, Operator} import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType} import org.apache.comet.serde.operator.CometSink +/** + * Helper object for building per-partition native plans with injected IcebergScan partition data. + */ +private[comet] object IcebergPartitionInjector { + + /** + * Injects partition data into an Operator tree by finding IcebergScan nodes with + * split_mode=true and empty partition, then setting the partition field. + * + * @param op + * The operator tree to modify + * @param partitionBytes + * Serialized IcebergFilePartition bytes to inject + * @return + * New operator tree with partition data injected + */ + def injectPartitionData(op: Operator, partitionBytes: Array[Byte]): Operator = { + val builder = op.toBuilder + + // If this is an IcebergScan with split_mode=true and no partition, inject it + if (op.hasIcebergScan) { + val scan = op.getIcebergScan + if (scan.getSplitMode && !scan.hasPartition) { + val partition = IcebergFilePartition.parseFrom(partitionBytes) + val scanBuilder = scan.toBuilder + scanBuilder.setPartition(partition) + builder.setIcebergScan(scanBuilder) + } + } + + // Recursively process children + builder.clearChildren() + op.getChildrenList.asScala.foreach { child => + builder.addChildren(injectPartitionData(child, partitionBytes)) + } + + builder.build() + } + + /** + * Serializes an operator to bytes. 
+ */ + def serializeOperator(op: Operator): Array[Byte] = { + val size = op.getSerializedSize + val bytes = new Array[Byte](size) + val codedOutput = CodedOutputStream.newInstance(bytes) + op.writeTo(codedOutput) + codedOutput.checkNoSpaceLeft() + bytes + } +} + /** * A Comet physical operator */ @@ -250,6 +302,10 @@ abstract class CometNativeExec extends CometExec { // TODO: support native metrics for all operators. val nativeMetrics = CometMetricNode.fromCometPlan(this) + // Check for IcebergScan with split mode data that needs per-partition injection + val icebergSplitData: Option[Array[Array[Byte]]] = + findIcebergSplitData(this) + // Go over all the native scans, in order to see if they need encryption options. // For each relation in a CometNativeScan generate a hadoopConf, // for each file path in a relation associate with hadoopConf @@ -294,11 +350,24 @@ abstract class CometNativeExec extends CometExec { inputs: Seq[Iterator[ColumnarBatch]], numParts: Int, partitionIndex: Int): CometExecIterator = { + // Get the actual serialized plan - either shared or per-partition + val actualPlan = icebergSplitData match { + case Some(perPartitionData) => + // Parse the base plan, inject partition data, and re-serialize + val basePlan = OperatorOuterClass.Operator.parseFrom(serializedPlanCopy) + val injected = IcebergPartitionInjector.injectPartitionData( + basePlan, + perPartitionData(partitionIndex)) + IcebergPartitionInjector.serializeOperator(injected) + case None => + serializedPlanCopy + } + val it = new CometExecIterator( CometExec.newIterId, inputs, output.length, - serializedPlanCopy, + actualPlan, nativeMetrics, numParts, partitionIndex, @@ -440,6 +509,20 @@ abstract class CometNativeExec extends CometExec { } } + /** + * Find CometIcebergNativeScanExec with split mode data in the plan tree. Returns the + * per-partition data array if found, None otherwise. + */ + private def findIcebergSplitData(plan: SparkPlan): Option[Array[Array[Byte]]] = { + plan match { + case iceberg: CometIcebergNativeScanExec + if iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty => + Some(iceberg.perPartitionData) + case _ => + plan.children.flatMap(findIcebergSplitData).headOption + } + } + /** * Converts this native Comet operator and its children into a native block which can be * executed as a whole (i.e., in a single JNI call) from the native side. From 2617d6c26c731810c2bba658037b6ae857b78a6e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 09:05:38 -0700 Subject: [PATCH 11/23] Remove unused legacy apply method from CometIcebergNativeScanExec Co-Authored-By: Claude Opus 4.5 --- .../comet/CometIcebergNativeScanExec.scala | 48 ------------------- 1 file changed, 48 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 2483abbb50..d5c6417b2d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -198,54 +198,6 @@ case class CometIcebergNativeScanExec( object CometIcebergNativeScanExec { - /** - * Creates a CometIcebergNativeScanExec from a Spark BatchScanExec. 
- * - * Determines the number of partitions from Iceberg's output partitioning: - * - KeyGroupedPartitioning: Use Iceberg's partition count - * - Other cases: Use the number of InputPartitions from Iceberg's planning - * - * @param nativeOp - * The serialized native operator - * @param scanExec - * The original Spark BatchScanExec - * @param session - * The SparkSession - * @param metadataLocation - * Path to table metadata file - * @param nativeIcebergScanMetadata - * Pre-extracted Iceberg metadata from planning phase - * @return - * A new CometIcebergNativeScanExec - */ - def apply( - nativeOp: Operator, - scanExec: BatchScanExec, - session: SparkSession, - metadataLocation: String, - nativeIcebergScanMetadata: CometIcebergNativeScanMetadata): CometIcebergNativeScanExec = { - - // Determine number of partitions from Iceberg's output partitioning - val numParts = scanExec.outputPartitioning match { - case p: KeyGroupedPartitioning => - p.numPartitions - case _ => - scanExec.inputRDD.getNumPartitions - } - - val exec = CometIcebergNativeScanExec( - nativeOp, - scanExec.output, - scanExec, - SerializedPlan(None), - metadataLocation, - numParts, - nativeIcebergScanMetadata) - - scanExec.logicalLink.foreach(exec.setLogicalLink) - exec - } - /** Creates a CometIcebergNativeScanExec with split serialization data. */ def apply( nativeOp: Operator, From 89cd3f93d0ceb7a85796a80b7c4c3797a1171866 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 09:44:09 -0700 Subject: [PATCH 12/23] Remove split_mode flag and simplify IcebergScan protobuf IcebergScan now only has two fields: - common: IcebergScanCommon (shared pools, metadata, catalog props) - partition: IcebergFilePartition (single partition's file tasks) The legacy fields (file_partitions, separate pools, etc.) and split_mode flag are removed since all Iceberg scans now use per-partition plan injection. Also build pools directly into commonBuilder during convert() instead of building into icebergScanBuilder and copying. Co-Authored-By: Claude Opus 4.5 --- PR_DESCRIPTION.md | 60 +++++++++++++++++++ native/core/src/execution/planner.rs | 10 +--- native/proto/src/proto/operator.proto | 34 ++--------- .../operator/CometIcebergNativeScan.scala | 58 +++++++----------- .../sql/comet/CometIcebergSplitRDD.scala | 1 - .../apache/spark/sql/comet/operators.scala | 4 +- 6 files changed, 87 insertions(+), 80 deletions(-) create mode 100644 PR_DESCRIPTION.md diff --git a/PR_DESCRIPTION.md b/PR_DESCRIPTION.md new file mode 100644 index 0000000000..1cb849e02d --- /dev/null +++ b/PR_DESCRIPTION.md @@ -0,0 +1,60 @@ +# Per-Partition Plan Building for Native Iceberg Scans + +## Summary + +This PR ensures that when Iceberg scans are executed natively, each Spark task's native plan only contains file scan tasks for that specific partition, rather than all partitions. + +## Problem + +Previously, when `CometIcebergNativeScanExec` was a child of another operator (e.g., `CometFilterExec`), the parent operator's serialized native plan (`nativeOp`) contained the entire `IcebergScan` message with data for all partitions. 
This meant: + +- Each task received metadata for all partitions when it only needed one +- The serialized plan size grew linearly with partition count +- Every task deserialized unnecessary partition data + +## Solution + +This PR implements per-partition plan building that injects only the relevant partition's data at execution time: + +### New Components + +**`CometIcebergSplitRDD`** - A custom RDD that: +- Holds common data (deduplication pools, catalog properties) in the closure +- Stores each partition's file scan tasks in its `Partition` objects +- Combines common + partition data at compute time to build partition-specific native plans + +**`IcebergPartitionInjector`** - A helper that traverses an `Operator` tree and injects partition data into `IcebergScan` nodes that are missing it + +**`findIcebergSplitData()`** - Locates `CometIcebergNativeScanExec` descendants in the plan tree and retrieves their per-partition data + +### Modified Execution Flow + +In `CometNativeExec.doExecuteColumnar()`: +1. Check if the plan tree contains an `IcebergScan` with per-partition data available +2. If so, for each partition: + - Parse the base operator tree from the serialized plan + - Inject that partition's file scan task data into the `IcebergScan` node + - Re-serialize and pass to native execution +3. Each task's native plan now only contains its own partition's data + +### Protobuf Changes + +Added `IcebergScanCommon` message to hold shared data (pools, metadata) separately from per-partition file scan tasks. The `IcebergScan` message now has: +- `common` field for shared deduplication pools +- `partition` field for a single partition's file tasks + +### Rust Changes + +Simplified the Iceberg scan handling in `planner.rs` to expect common + partition data, removing the code path that read from a list of all partitions. + +## Test Plan + +- [x] All 62 existing Iceberg tests pass +- [x] Filter pushdown tests verify parent operators work correctly +- [x] MOR (Merge-On-Read) tests with positional and equality deletes +- [x] Schema evolution, complex types, partition pruning tests +- [x] REST catalog integration test + +--- + +Generated with [Claude Code](https://claude.ai/code) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 06a03236b5..57d3fc1a49 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1132,16 +1132,8 @@ impl PhysicalPlanner { )) } OpStruct::IcebergScan(scan) => { - // Split mode: extract from embedded common + single partition + // Extract common data and single partition's file tasks // Per-partition injection happens in Scala before sending to native - if !scan.split_mode.unwrap_or(false) { - return Err(GeneralError( - "IcebergScan requires split_mode=true (partition data injected at \ - execution time)" - .into(), - )); - } - let common = scan .common .as_ref() diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 43a32c935e..3e89628d46 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -179,37 +179,11 @@ message IcebergScanCommon { } message IcebergScan { - // Schema to read - repeated SparkStructField required_schema = 1; - - // Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.) 
- map catalog_properties = 2; + // Common data shared across partitions (pools, metadata, catalog props) + IcebergScanCommon common = 1; - // Pre-planned file scan tasks grouped by Spark partition - repeated IcebergFilePartition file_partitions = 3; - - // Table metadata file path for FileIO initialization - string metadata_location = 4; - - // Deduplication pools - shared data referenced by index from tasks - repeated string schema_pool = 5; - repeated string partition_type_pool = 6; - repeated string partition_spec_pool = 7; - repeated string name_mapping_pool = 8; - repeated ProjectFieldIdList project_field_ids_pool = 9; - repeated PartitionData partition_data_pool = 10; - repeated DeleteFileList delete_files_pool = 11; - repeated spark.spark_expression.Expr residual_pool = 12; - - // Split mode: when true, common data and single partition are embedded - // instead of all partitions in file_partitions - optional bool split_mode = 20; - - // Embedded common data for split mode (pools, metadata, catalog props) - optional IcebergScanCommon common = 21; - - // Single partition's file tasks for split mode - optional IcebergFilePartition partition = 22; + // Single partition's file scan tasks + IcebergFilePartition partition = 2; } // Helper message for deduplicating field ID lists diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index ce84a26fc7..e44263bc30 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -315,7 +315,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit contentScanTaskClass: Class[_], fileScanTaskClass: Class[_], taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder, - icebergScanBuilder: OperatorOuterClass.IcebergScan.Builder, + commonBuilder: OperatorOuterClass.IcebergScanCommon.Builder, partitionTypeToPoolIndex: mutable.HashMap[String, Int], partitionSpecToPoolIndex: mutable.HashMap[String, Int], partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = { @@ -340,7 +340,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val specIdx = partitionSpecToPoolIndex.getOrElseUpdate( partitionSpecJson, { val idx = partitionSpecToPoolIndex.size - icebergScanBuilder.addPartitionSpecPool(partitionSpecJson) + commonBuilder.addPartitionSpecPool(partitionSpecJson) idx }) taskBuilder.setPartitionSpecIdx(specIdx) @@ -421,7 +421,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val typeIdx = partitionTypeToPoolIndex.getOrElseUpdate( partitionTypeJson, { val idx = partitionTypeToPoolIndex.size - icebergScanBuilder.addPartitionTypePool(partitionTypeJson) + commonBuilder.addPartitionTypePool(partitionTypeJson) idx }) taskBuilder.setPartitionTypeIdx(typeIdx) @@ -476,7 +476,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val partitionDataIdx = partitionDataToPoolIndex.getOrElseUpdate( partitionDataKey, { val idx = partitionDataToPoolIndex.size - icebergScanBuilder.addPartitionDataPool(partitionDataProto) + commonBuilder.addPartitionDataPool(partitionDataProto) idx }) taskBuilder.setPartitionDataIdx(partitionDataIdx) @@ -688,6 +688,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit builder: Operator.Builder, childOp: Operator*): 
Option[OperatorOuterClass.Operator] = { val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder() + // commonBuilder holds shared data (pools, metadata) - built throughout this method + val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() // Deduplication structures - map unique values to pool indices val schemaToPoolIndex = mutable.HashMap[AnyRef, Int]() @@ -716,10 +718,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } // Use pre-extracted metadata (no reflection needed) - icebergScanBuilder.setMetadataLocation(metadata.metadataLocation) + commonBuilder.setMetadataLocation(metadata.metadataLocation) metadata.catalogProperties.foreach { case (key, value) => - icebergScanBuilder.putCatalogProperties(key, value) + commonBuilder.putCatalogProperties(key, value) } // Set required_schema from output @@ -729,7 +731,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit .setName(attr.name) .setNullable(attr.nullable) serializeDataType(attr.dataType).foreach(field.setDataType) - icebergScanBuilder.addRequiredSchema(field.build()) + commonBuilder.addRequiredSchema(field.build()) } // Extract FileScanTasks from the InputPartitions in the RDD @@ -866,7 +868,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit schema, { val idx = schemaToPoolIndex.size val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String] - icebergScanBuilder.addSchemaPool(schemaJson) + commonBuilder.addSchemaPool(schemaJson) idx }) taskBuilder.setSchemaIdx(schemaIdx) @@ -895,7 +897,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val idx = projectFieldIdsToPoolIndex.size val listBuilder = OperatorOuterClass.ProjectFieldIdList.newBuilder() projectFieldIds.foreach(id => listBuilder.addFieldIds(id)) - icebergScanBuilder.addProjectFieldIdsPool(listBuilder.build()) + commonBuilder.addProjectFieldIdsPool(listBuilder.build()) idx }) taskBuilder.setProjectFieldIdsIdx(projectFieldIdsIdx) @@ -918,7 +920,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val idx = deleteFilesToPoolIndex.size val listBuilder = OperatorOuterClass.DeleteFileList.newBuilder() deleteFilesList.foreach(df => listBuilder.addDeleteFiles(df)) - icebergScanBuilder.addDeleteFilesPool(listBuilder.build()) + commonBuilder.addDeleteFilesPool(listBuilder.build()) idx }) taskBuilder.setDeleteFilesIdx(deleteFilesIdx) @@ -947,7 +949,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val residualIdx = residualToPoolIndex.getOrElseUpdate( Some(residualExpr), { val idx = residualToPoolIndex.size - icebergScanBuilder.addResidualPool(residualExpr) + commonBuilder.addResidualPool(residualExpr) idx }) taskBuilder.setResidualIdx(residualIdx) @@ -959,7 +961,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit contentScanTaskClass, fileScanTaskClass, taskBuilder, - icebergScanBuilder, + commonBuilder, partitionTypeToPoolIndex, partitionSpecToPoolIndex, partitionDataToPoolIndex) @@ -969,7 +971,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val nmIdx = nameMappingToPoolIndex.getOrElseUpdate( nm, { val idx = nameMappingToPoolIndex.size - icebergScanBuilder.addNameMappingPool(nm) + commonBuilder.addNameMappingPool(nm) idx }) taskBuilder.setNameMappingIdx(nmIdx) @@ -1022,7 +1024,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } // 
Calculate partition data pool size in bytes (protobuf format) - val partitionDataPoolBytes = icebergScanBuilder.getPartitionDataPoolList.asScala + val partitionDataPoolBytes = commonBuilder.getPartitionDataPoolList.asScala .map(_.getSerializedSize) .sum @@ -1033,29 +1035,9 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit s"$partitionDataPoolBytes bytes (protobuf)") } - // Build common data (pools, metadata) for split mode - val commonBuilder = OperatorOuterClass.IcebergScanCommon.newBuilder() - commonBuilder.setMetadataLocation(icebergScanBuilder.getMetadataLocation) - commonBuilder.putAllCatalogProperties(icebergScanBuilder.getCatalogPropertiesMap) - icebergScanBuilder.getRequiredSchemaList.forEach(f => commonBuilder.addRequiredSchema(f)) - icebergScanBuilder.getSchemaPoolList.forEach(s => commonBuilder.addSchemaPool(s)) - icebergScanBuilder.getPartitionTypePoolList.forEach(s => - commonBuilder.addPartitionTypePool(s)) - icebergScanBuilder.getPartitionSpecPoolList.forEach(s => - commonBuilder.addPartitionSpecPool(s)) - icebergScanBuilder.getNameMappingPoolList.forEach(s => commonBuilder.addNameMappingPool(s)) - icebergScanBuilder.getProjectFieldIdsPoolList.forEach { p => - commonBuilder.addProjectFieldIdsPool(p) - } - icebergScanBuilder.getPartitionDataPoolList.forEach(p => - commonBuilder.addPartitionDataPool(p)) - icebergScanBuilder.getDeleteFilesPoolList.forEach(d => commonBuilder.addDeleteFilesPool(d)) - icebergScanBuilder.getResidualPoolList.forEach(r => commonBuilder.addResidualPool(r)) - - // Set split mode and embed common data (partition injected at execution time) - icebergScanBuilder.setSplitMode(true) + // Embed common data into IcebergScan (partition is injected at execution time) icebergScanBuilder.setCommon(commonBuilder.build()) - // Note: partition is NOT set here - it will be injected at execution time + // Note: partition is NOT set here - it gets injected per-partition at execution time // Store per-partition data for injection at execution time val commonBytes = commonBuilder.build().toByteArray @@ -1076,8 +1058,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit "Metadata should have been extracted in CometScanRule.") } - // Extract metadataLocation from the native operator - val metadataLocation = nativeOp.getIcebergScan.getMetadataLocation + // Extract metadataLocation from the native operator's common data + val metadataLocation = nativeOp.getIcebergScan.getCommon.getMetadataLocation // Retrieve split data from thread-local (set during convert()) val splitData = splitDataThreadLocal.get().getOrElse { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala index f5131bbc50..badc65893d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala @@ -114,7 +114,6 @@ object CometIcebergSplitRDD { val partition = IcebergFilePartition.parseFrom(partitionBytes) val scanBuilder = OperatorOuterClass.IcebergScan.newBuilder() - scanBuilder.setSplitMode(true) scanBuilder.setCommon(common) scanBuilder.setPartition(partition) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index c3824f1215..cb6bcfc48a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -78,10 +78,10 @@ private[comet] object IcebergPartitionInjector { def injectPartitionData(op: Operator, partitionBytes: Array[Byte]): Operator = { val builder = op.toBuilder - // If this is an IcebergScan with split_mode=true and no partition, inject it + // If this is an IcebergScan without partition data, inject it if (op.hasIcebergScan) { val scan = op.getIcebergScan - if (scan.getSplitMode && !scan.hasPartition) { + if (!scan.hasPartition) { val partition = IcebergFilePartition.parseFrom(partitionBytes) val scanBuilder = scan.toBuilder scanBuilder.setPartition(partition) From a5b03d7a89728275fc4b6d6639f2676721910648 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 09:49:32 -0700 Subject: [PATCH 13/23] Remove PR_DESCRIPTION.md Co-Authored-By: Claude Opus 4.5 --- PR_DESCRIPTION.md | 60 ----------------------------------------------- 1 file changed, 60 deletions(-) delete mode 100644 PR_DESCRIPTION.md diff --git a/PR_DESCRIPTION.md b/PR_DESCRIPTION.md deleted file mode 100644 index 1cb849e02d..0000000000 --- a/PR_DESCRIPTION.md +++ /dev/null @@ -1,60 +0,0 @@ -# Per-Partition Plan Building for Native Iceberg Scans - -## Summary - -This PR ensures that when Iceberg scans are executed natively, each Spark task's native plan only contains file scan tasks for that specific partition, rather than all partitions. - -## Problem - -Previously, when `CometIcebergNativeScanExec` was a child of another operator (e.g., `CometFilterExec`), the parent operator's serialized native plan (`nativeOp`) contained the entire `IcebergScan` message with data for all partitions. This meant: - -- Each task received metadata for all partitions when it only needed one -- The serialized plan size grew linearly with partition count -- Every task deserialized unnecessary partition data - -## Solution - -This PR implements per-partition plan building that injects only the relevant partition's data at execution time: - -### New Components - -**`CometIcebergSplitRDD`** - A custom RDD that: -- Holds common data (deduplication pools, catalog properties) in the closure -- Stores each partition's file scan tasks in its `Partition` objects -- Combines common + partition data at compute time to build partition-specific native plans - -**`IcebergPartitionInjector`** - A helper that traverses an `Operator` tree and injects partition data into `IcebergScan` nodes that are missing it - -**`findIcebergSplitData()`** - Locates `CometIcebergNativeScanExec` descendants in the plan tree and retrieves their per-partition data - -### Modified Execution Flow - -In `CometNativeExec.doExecuteColumnar()`: -1. Check if the plan tree contains an `IcebergScan` with per-partition data available -2. If so, for each partition: - - Parse the base operator tree from the serialized plan - - Inject that partition's file scan task data into the `IcebergScan` node - - Re-serialize and pass to native execution -3. Each task's native plan now only contains its own partition's data - -### Protobuf Changes - -Added `IcebergScanCommon` message to hold shared data (pools, metadata) separately from per-partition file scan tasks. The `IcebergScan` message now has: -- `common` field for shared deduplication pools -- `partition` field for a single partition's file tasks - -### Rust Changes - -Simplified the Iceberg scan handling in `planner.rs` to expect common + partition data, removing the code path that read from a list of all partitions. 
- -## Test Plan - -- [x] All 62 existing Iceberg tests pass -- [x] Filter pushdown tests verify parent operators work correctly -- [x] MOR (Merge-On-Read) tests with positional and equality deletes -- [x] Schema evolution, complex types, partition pruning tests -- [x] REST catalog integration test - ---- - -Generated with [Claude Code](https://claude.ai/code) From 3624aafe894c5cf640231f63404b114596834b0f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 10:50:00 -0700 Subject: [PATCH 14/23] fix: Use perPartitionData.length for Iceberg partition count When outputPartitioning is KeyGroupedPartitioning, the reported partition count can differ from the actual physical partition count in the RDD. This caused ArrayIndexOutOfBoundsException when partitionIndex exceeded perPartitionData.length. Fix by using perPartitionData.length as the source of truth for partition count, ensuring consistency between the serialized per-partition data and the number of partitions reported by the operator. Co-Authored-By: Claude Opus 4.5 --- .../spark/sql/comet/CometIcebergNativeScanExec.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index d5c6417b2d..e04f39682e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -208,10 +208,12 @@ object CometIcebergNativeScanExec { commonData: Array[Byte], perPartitionData: Array[Array[Byte]]): CometIcebergNativeScanExec = { - val numParts = scanExec.outputPartitioning match { - case p: KeyGroupedPartitioning => p.numPartitions - case _ => scanExec.inputRDD.getNumPartitions - } + // Use perPartitionData.length as the source of truth for partition count. + // This ensures consistency between the serialized per-partition data and + // the number of partitions reported by this operator. + // Note: scanExec.outputPartitioning (KeyGroupedPartitioning) may report + // a different count due to Iceberg's logical partitioning scheme. + val numParts = perPartitionData.length val exec = CometIcebergNativeScanExec( nativeOp, From 3d53a56b25aca7676d8fc2785e04d100ded16c75 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 11:56:08 -0700 Subject: [PATCH 15/23] fix: Override convertBlock() to preserve transient fields The parent CometNativeExec.convertBlock() uses makeCopy() which loses @transient fields. Override it in CometIcebergNativeScanExec to preserve commonData and perPartitionData fields which are needed for split-mode partition injection. This fixes ArrayIndexOutOfBoundsException when the plan transformation in CometExecRule.scala created a copy without the split serialization data. 
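A related, easy-to-reproduce facet of the same pitfall (plain Scala, not Comet code; Holder and TransientFieldDemo are hypothetical names): fields marked @transient also do not survive Java serialization, so any copy or rebuild path has to re-pass them explicitly, which is what the convertBlock() override below does for commonData and perPartitionData.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    // Hypothetical class for illustration only - not part of this patch.
    case class Holder(name: String) {
      @transient var payload: Array[Byte] = Array[Byte](1, 2, 3)
    }

    object TransientFieldDemo extends App {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(Holder("scan"))
      oos.close()

      val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        .readObject()
        .asInstanceOf[Holder]

      // The transient field was dropped during the round trip, so any copy/rebuild path
      // that still needs it must carry it forward explicitly.
      assert(restored.payload == null)
    }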
Co-Authored-By: Claude Opus 4.5 --- .../comet/CometIcebergNativeScanExec.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index e04f39682e..2d4892c6be 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -162,6 +162,32 @@ case class CometIcebergNativeScanExec( CometIcebergSplitRDD(sparkContext, commonData, perPartitionData, output.length, nativeMetrics) } + /** + * Override convertBlock to preserve @transient fields (commonData, perPartitionData). The + * parent implementation uses makeCopy() which loses transient fields. + */ + override def convertBlock(): CometIcebergNativeScanExec = { + // Serialize the native plan if not already done + val newSerializedPlan = if (serializedPlanOpt.isEmpty) { + val bytes = CometExec.serializeNativePlan(nativeOp) + SerializedPlan(Some(bytes)) + } else { + serializedPlanOpt + } + + // Create new instance preserving transient fields + CometIcebergNativeScanExec( + nativeOp, + output, + originalPlan, + newSerializedPlan, + metadataLocation, + numPartitions, + nativeIcebergScanMetadata, + commonData, + perPartitionData) + } + override protected def doCanonicalize(): CometIcebergNativeScanExec = { CometIcebergNativeScanExec( nativeOp, From 1b78f7524d38e53a0c06794e9bd0c3c3b743586f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 12:56:12 -0700 Subject: [PATCH 16/23] style: Remove unused KeyGroupedPartitioning import --- .../org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 2d4892c6be..5a3a6b8494 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch From 63e3ad6dfc9c51985de5990ec9a390fe6029f246 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 13:16:43 -0700 Subject: [PATCH 17/23] trigger CI From 76b60479d833ec9b4c2e64ce3653fa466b363b44 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 15:24:31 -0700 Subject: [PATCH 18/23] Handle partition index out of bounds for joins/multi-input scenarios When using CometIcebergNativeScanExec in join operations with ZippedPartitionsRDD, the partition index may exceed perPartitionData bounds since tables can have different partition counts. This adds bounds checking to inject an empty IcebergFilePartition for out-of-bounds indices, preventing ArrayIndexOutOfBoundsException. 
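The selection logic described above, pulled out as a standalone sketch (partitionBytesFor is a hypothetical helper, not part of the patch; it assumes the generated OperatorOuterClass protobuf classes are on the classpath):

    // For partition indices beyond this scan's own partition count (possible when joins or
    // unions zip RDDs with differing partition counts), fall back to an empty
    // IcebergFilePartition so the native scan simply reads no files for that task.
    def partitionBytesFor(index: Int, perPartitionData: Array[Array[Byte]]): Array[Byte] =
      if (index < perPartitionData.length) perPartitionData(index)
      else OperatorOuterClass.IcebergFilePartition.newBuilder().build().toByteArray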
Co-Authored-By: Claude Opus 4.5 --- .../org/apache/spark/sql/comet/operators.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index cb6bcfc48a..c0d4120276 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -355,9 +355,16 @@ abstract class CometNativeExec extends CometExec { case Some(perPartitionData) => // Parse the base plan, inject partition data, and re-serialize val basePlan = OperatorOuterClass.Operator.parseFrom(serializedPlanCopy) - val injected = IcebergPartitionInjector.injectPartitionData( - basePlan, - perPartitionData(partitionIndex)) + // For joins/unions, partition indices may exceed perPartitionData bounds. + // In that case, inject an empty partition (no file tasks to read). + val partitionBytes = if (partitionIndex < perPartitionData.length) { + perPartitionData(partitionIndex) + } else { + // Create empty IcebergFilePartition + OperatorOuterClass.IcebergFilePartition.newBuilder().build().toByteArray + } + val injected = + IcebergPartitionInjector.injectPartitionData(basePlan, partitionBytes) IcebergPartitionInjector.serializeOperator(injected) case None => serializedPlanCopy From e20b09ebde697568b03590eedbb4bd09e7eda7e4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 16:29:52 -0700 Subject: [PATCH 19/23] Revert "Handle partition index out of bounds for joins/multi-input scenarios" This reverts commit 76b60479d833ec9b4c2e64ce3653fa466b363b44. --- .../org/apache/spark/sql/comet/operators.scala | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index c0d4120276..cb6bcfc48a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -355,16 +355,9 @@ abstract class CometNativeExec extends CometExec { case Some(perPartitionData) => // Parse the base plan, inject partition data, and re-serialize val basePlan = OperatorOuterClass.Operator.parseFrom(serializedPlanCopy) - // For joins/unions, partition indices may exceed perPartitionData bounds. - // In that case, inject an empty partition (no file tasks to read). - val partitionBytes = if (partitionIndex < perPartitionData.length) { - perPartitionData(partitionIndex) - } else { - // Create empty IcebergFilePartition - OperatorOuterClass.IcebergFilePartition.newBuilder().build().toByteArray - } - val injected = - IcebergPartitionInjector.injectPartitionData(basePlan, partitionBytes) + val injected = IcebergPartitionInjector.injectPartitionData( + basePlan, + perPartitionData(partitionIndex)) IcebergPartitionInjector.serializeOperator(injected) case None => serializedPlanCopy From 6372c1cc7ebcfb2df18f710bfa5a171995595f05 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 18:40:51 -0700 Subject: [PATCH 20/23] fix: Handle joins over multiple Iceberg tables in split serialization The previous implementation of split serialization only collected partition data from one Iceberg scan, causing joins over multiple Iceberg tables to use incorrect partition data for the second table. 
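In sketch form, the fix selects each table's bytes for a given partition index and keys them by metadata_location, which is unique per table (simplified; the real code in the diff below also bounds-checks the index):

    // icebergSplitDataByLocation: Map[metadataLocation -> array of per-partition bytes],
    // collected from every CometIcebergNativeScanExec found in the current stage.
    val partitionDataByLocation: Map[String, Array[Byte]] =
      icebergSplitDataByLocation.map { case (metadataLocation, perPartitionData) =>
        metadataLocation -> perPartitionData(partitionIndex)
      }
    // Each IcebergScan node is then matched to an entry via scan.getCommon.getMetadataLocation
    // before its partition field is injected.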
Changes: - Rename findIcebergSplitData to findAllIcebergSplitData - Return Map[String, Array[Array[Byte]]] keyed by metadataLocation instead of Option[Array[Array[Byte]]] to collect data from ALL Iceberg scans - Update IcebergPartitionInjector.injectPartitionData to take the map and match each IcebergScan by its metadata_location from common data - Add stage boundary checks to prevent partition index misalignment This fixes TestStoragePartitionedJoins.testJoinsWithMultipleTransformTypes which was returning 13 rows instead of 26 expected. Co-Authored-By: Claude Opus 4.5 --- .../operator/CometIcebergNativeScan.scala | 1 + .../sql/comet/CometIcebergSplitRDD.scala | 28 ++++- .../apache/spark/sql/comet/operators.scala | 113 +++++++++++++----- 3 files changed, 108 insertions(+), 34 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index e44263bc30..84297a244a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -1042,6 +1042,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Store per-partition data for injection at execution time val commonBytes = commonBuilder.build().toByteArray val perPartitionBytes = perPartitionBuilders.map(_.toByteArray).toArray + splitDataThreadLocal.set(Some(SplitData(commonBytes, perPartitionBytes))) builder.clearChildren() diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala index badc65893d..a49738fc36 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.comet import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.vectorized.ColumnarBatch @@ -52,10 +53,16 @@ private[spark] class CometIcebergSplitRDD( @transient perPartitionData: Array[Array[Byte]], numParts: Int, var computeFunc: (Array[Byte], CometMetricNode, Int, Int) => Iterator[ColumnarBatch]) - extends RDD[ColumnarBatch](sc, Nil) { + extends RDD[ColumnarBatch](sc, Nil) + with Logging { override protected def getPartitions: Array[Partition] = { + logInfo( + s"[ICEBERG-DEBUG] getPartitions called: numParts=$numParts, " + + s"perPartitionData.length=${perPartitionData.length}") perPartitionData.zipWithIndex.map { case (bytes, idx) => + val partition = IcebergFilePartition.parseFrom(bytes) + logInfo(s"[ICEBERG-DEBUG] Partition $idx has ${partition.getFileScanTasksCount} file tasks") new CometIcebergSplitPartition(idx, bytes) } } @@ -63,6 +70,10 @@ private[spark] class CometIcebergSplitRDD( override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { val partition = split.asInstanceOf[CometIcebergSplitPartition] + logInfo( + s"[ICEBERG-DEBUG] compute called for partition ${partition.index}, " + + s"numParts=$numParts, partitionBytes.length=${partition.partitionBytes.length}") + val combinedPlan = CometIcebergSplitRDD.buildCombinedPlan(commonData, partition.partitionBytes) @@ -79,7 +90,7 @@ private[spark] class CometIcebergSplitRDD( } } -object CometIcebergSplitRDD { +object CometIcebergSplitRDD extends 
Logging { def apply( sc: SparkContext, @@ -88,6 +99,19 @@ object CometIcebergSplitRDD { numOutputCols: Int, nativeMetrics: CometMetricNode): CometIcebergSplitRDD = { + logInfo( + s"[ICEBERG-DEBUG] CometIcebergSplitRDD.apply: " + + s"perPartitionData.length=${perPartitionData.length}, " + + s"commonData.length=${commonData.length}, numOutputCols=$numOutputCols") + + // Log details about each partition's file tasks + perPartitionData.zipWithIndex.foreach { case (bytes, idx) => + val partition = IcebergFilePartition.parseFrom(bytes) + logInfo( + s"[ICEBERG-DEBUG] apply: partition $idx has " + + s"${partition.getFileScanTasksCount} file scan tasks") + } + // Create compute function that captures nativeMetrics in its closure val computeFunc = (combinedPlan: Array[Byte], _: CometMetricNode, numParts: Int, partIndex: Int) => { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index cb6bcfc48a..e18f3f46ea 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -65,34 +65,47 @@ import org.apache.comet.serde.operator.CometSink private[comet] object IcebergPartitionInjector { /** - * Injects partition data into an Operator tree by finding IcebergScan nodes with - * split_mode=true and empty partition, then setting the partition field. + * Injects partition data into an Operator tree by finding IcebergScan nodes without partition + * data and setting them using the provided map keyed by metadata_location. + * + * This handles joins over multiple Iceberg tables by matching each IcebergScan with its + * corresponding partition data based on the table's metadata_location. * * @param op * The operator tree to modify - * @param partitionBytes - * Serialized IcebergFilePartition bytes to inject + * @param partitionDataByLocation + * Map of metadataLocation -> partition bytes for this partition index * @return * New operator tree with partition data injected */ - def injectPartitionData(op: Operator, partitionBytes: Array[Byte]): Operator = { + def injectPartitionData( + op: Operator, + partitionDataByLocation: Map[String, Array[Byte]]): Operator = { val builder = op.toBuilder - // If this is an IcebergScan without partition data, inject it + // If this is an IcebergScan without partition data, inject it based on metadata_location if (op.hasIcebergScan) { val scan = op.getIcebergScan - if (!scan.hasPartition) { - val partition = IcebergFilePartition.parseFrom(partitionBytes) - val scanBuilder = scan.toBuilder - scanBuilder.setPartition(partition) - builder.setIcebergScan(scanBuilder) + if (!scan.hasPartition && scan.hasCommon) { + val metadataLocation = scan.getCommon.getMetadataLocation + partitionDataByLocation.get(metadataLocation) match { + case Some(partitionBytes) => + val partition = IcebergFilePartition.parseFrom(partitionBytes) + val scanBuilder = scan.toBuilder + scanBuilder.setPartition(partition) + builder.setIcebergScan(scanBuilder) + case None => + // No partition data for this scan - this shouldn't happen in split mode + throw new CometRuntimeException( + s"No partition data found for Iceberg scan with metadata_location: $metadataLocation") + } } } // Recursively process children builder.clearChildren() op.getChildrenList.asScala.foreach { child => - builder.addChildren(injectPartitionData(child, partitionBytes)) + builder.addChildren(injectPartitionData(child, partitionDataByLocation)) } builder.build() @@ 
-302,10 +315,6 @@ abstract class CometNativeExec extends CometExec { // TODO: support native metrics for all operators. val nativeMetrics = CometMetricNode.fromCometPlan(this) - // Check for IcebergScan with split mode data that needs per-partition injection - val icebergSplitData: Option[Array[Array[Byte]]] = - findIcebergSplitData(this) - // Go over all the native scans, in order to see if they need encryption options. // For each relation in a CometNativeScan generate a hadoopConf, // for each file path in a relation associate with hadoopConf @@ -346,21 +355,41 @@ abstract class CometNativeExec extends CometExec { case None => (None, Seq.empty) } + // Check for IcebergScan with split mode data that needs per-partition injection. + // Only look within the current stage (stop at shuffle boundaries). + // Returns a map of metadataLocation -> perPartitionData to handle joins over + // multiple Iceberg tables. + val icebergSplitDataByLocation: Map[String, Array[Array[Byte]]] = + findAllIcebergSplitData(this) + def createCometExecIter( inputs: Seq[Iterator[ColumnarBatch]], numParts: Int, partitionIndex: Int): CometExecIterator = { - // Get the actual serialized plan - either shared or per-partition - val actualPlan = icebergSplitData match { - case Some(perPartitionData) => - // Parse the base plan, inject partition data, and re-serialize - val basePlan = OperatorOuterClass.Operator.parseFrom(serializedPlanCopy) - val injected = IcebergPartitionInjector.injectPartitionData( - basePlan, - perPartitionData(partitionIndex)) - IcebergPartitionInjector.serializeOperator(injected) - case None => - serializedPlanCopy + // Get the actual serialized plan - either shared or per-partition injected + // Only inject partition data when there are NO inputs - if there are inputs, + // the Iceberg scans already executed via their own RDDs with per-partition data + val actualPlan = if (icebergSplitDataByLocation.nonEmpty && inputs.isEmpty) { + // Build a map of metadataLocation -> partitionBytes for this partition index + val partitionDataByLocation = icebergSplitDataByLocation.map { + case (metadataLocation, perPartitionData) => + if (partitionIndex < perPartitionData.length) { + metadataLocation -> perPartitionData(partitionIndex) + } else { + throw new CometRuntimeException( + s"Partition index $partitionIndex out of bounds for Iceberg scan " + + s"with metadata_location $metadataLocation " + + s"(${perPartitionData.length} partitions)") + } + } + // No inputs = Iceberg scan is part of native plan, inject partition data + val basePlan = OperatorOuterClass.Operator.parseFrom(serializedPlanCopy) + val injected = + IcebergPartitionInjector.injectPartitionData(basePlan, partitionDataByLocation) + IcebergPartitionInjector.serializeOperator(injected) + } else { + // Has inputs or no split data - use plan as-is + serializedPlanCopy } val it = new CometExecIterator( @@ -510,16 +539,36 @@ abstract class CometNativeExec extends CometExec { } /** - * Find CometIcebergNativeScanExec with split mode data in the plan tree. Returns the - * per-partition data array if found, None otherwise. + * Find ALL CometIcebergNativeScanExec nodes with split mode data in the plan tree. Returns a + * map of metadataLocation -> perPartitionData for all Iceberg scans found. + * + * This supports joins over multiple Iceberg tables by collecting partition data from each scan + * and keying by metadata_location (which is unique per table). + * + * NOTE: This is only used when Iceberg scans are NOT executed via their own RDD. 
When Iceberg + * scans execute via CometIcebergSplitRDD, the partition data is handled there and this function + * returns an empty map. + * + * Stops at stage boundaries (shuffle exchanges, etc.) because partition indices are only valid + * within the same stage. */ - private def findIcebergSplitData(plan: SparkPlan): Option[Array[Array[Byte]]] = { + private def findAllIcebergSplitData(plan: SparkPlan): Map[String, Array[Array[Byte]]] = { plan match { + // Found an Iceberg scan with split data case iceberg: CometIcebergNativeScanExec if iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty => - Some(iceberg.perPartitionData) + Map(iceberg.metadataLocation -> iceberg.perPartitionData) + + // Stage boundaries - stop searching (partition indices won't align after these) + case _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | + _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | + _: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | + _: CometSparkToColumnarExec => + Map.empty + + // Continue searching through other operators, combining results from all children case _ => - plan.children.flatMap(findIcebergSplitData).headOption + plan.children.flatMap(c => findAllIcebergSplitData(c)).toMap } } From c2698fd2f90045edc87e3806dd852673186005a9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 19:16:46 -0700 Subject: [PATCH 21/23] scalastyle --- .../scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala index a49738fc36..7b185ee614 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala @@ -100,7 +100,7 @@ object CometIcebergSplitRDD extends Logging { nativeMetrics: CometMetricNode): CometIcebergSplitRDD = { logInfo( - s"[ICEBERG-DEBUG] CometIcebergSplitRDD.apply: " + + "[ICEBERG-DEBUG] CometIcebergSplitRDD.apply: " + s"perPartitionData.length=${perPartitionData.length}, " + s"commonData.length=${commonData.length}, numOutputCols=$numOutputCols") From faafb156ef300478a296e972ce1bc3c1ffdd5f1f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 19:20:35 -0700 Subject: [PATCH 22/23] chore: Remove debug logging from CometIcebergSplitRDD Co-Authored-By: Claude Opus 4.5 --- .../sql/comet/CometIcebergSplitRDD.scala | 28 ++----------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala index 7b185ee614..badc65893d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergSplitRDD.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.comet import org.apache.spark.{Partition, SparkContext, TaskContext} -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.vectorized.ColumnarBatch @@ -53,16 +52,10 @@ private[spark] class CometIcebergSplitRDD( @transient perPartitionData: Array[Array[Byte]], numParts: Int, var computeFunc: (Array[Byte], CometMetricNode, Int, Int) => Iterator[ColumnarBatch]) - extends RDD[ColumnarBatch](sc, Nil) - with Logging { + 
extends RDD[ColumnarBatch](sc, Nil) { override protected def getPartitions: Array[Partition] = { - logInfo( - s"[ICEBERG-DEBUG] getPartitions called: numParts=$numParts, " + - s"perPartitionData.length=${perPartitionData.length}") perPartitionData.zipWithIndex.map { case (bytes, idx) => - val partition = IcebergFilePartition.parseFrom(bytes) - logInfo(s"[ICEBERG-DEBUG] Partition $idx has ${partition.getFileScanTasksCount} file tasks") new CometIcebergSplitPartition(idx, bytes) } } @@ -70,10 +63,6 @@ private[spark] class CometIcebergSplitRDD( override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { val partition = split.asInstanceOf[CometIcebergSplitPartition] - logInfo( - s"[ICEBERG-DEBUG] compute called for partition ${partition.index}, " + - s"numParts=$numParts, partitionBytes.length=${partition.partitionBytes.length}") - val combinedPlan = CometIcebergSplitRDD.buildCombinedPlan(commonData, partition.partitionBytes) @@ -90,7 +79,7 @@ private[spark] class CometIcebergSplitRDD( } } -object CometIcebergSplitRDD extends Logging { +object CometIcebergSplitRDD { def apply( sc: SparkContext, @@ -99,19 +88,6 @@ object CometIcebergSplitRDD extends Logging { numOutputCols: Int, nativeMetrics: CometMetricNode): CometIcebergSplitRDD = { - logInfo( - "[ICEBERG-DEBUG] CometIcebergSplitRDD.apply: " + - s"perPartitionData.length=${perPartitionData.length}, " + - s"commonData.length=${commonData.length}, numOutputCols=$numOutputCols") - - // Log details about each partition's file tasks - perPartitionData.zipWithIndex.foreach { case (bytes, idx) => - val partition = IcebergFilePartition.parseFrom(bytes) - logInfo( - s"[ICEBERG-DEBUG] apply: partition $idx has " + - s"${partition.getFileScanTasksCount} file scan tasks") - } - // Create compute function that captures nativeMetrics in its closure val computeFunc = (combinedPlan: Array[Byte], _: CometMetricNode, numParts: Int, partIndex: Int) => { From 0a02bb7e001c096a7ac2692a7403616c53784deb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 20:29:42 -0700 Subject: [PATCH 23/23] fix: Handle broadcast joins over Iceberg tables in split serialization Two fixes for split serialization with Iceberg tables: 1. Look inside BroadcastQueryStageExec and CometBroadcastExchangeExec to find Iceberg scan partition data. For broadcast joins, the partition indices still align because broadcast data is replicated to all partitions. 2. Remove the inputs.isEmpty check when deciding to inject partition data. For broadcast joins, there are inputs (the broadcast data), but the native plan still contains IcebergScan nodes that need partition data. This fixes CometFuzzIcebergSuite "join" test which was failing with "IcebergScan missing partition data" for broadcast joins. 
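Condensed from the diff below, the traversal in findAllIcebergSplitData now treats broadcast stages as transparent while still stopping at shuffle-style boundaries:

    // Broadcast output is replicated to every partition, so partition indices still line up
    // with any Iceberg scan underneath; across shuffles they do not, so the search stops there.
    plan match {
      case iceberg: CometIcebergNativeScanExec
          if iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty =>
        Map(iceberg.metadataLocation -> iceberg.perPartitionData)
      case bqs: BroadcastQueryStageExec => findAllIcebergSplitData(bqs.plan)
      case cbe: CometBroadcastExchangeExec =>
        cbe.children.flatMap(c => findAllIcebergSplitData(c)).toMap
      case _: CometShuffleExchangeExec => Map.empty // and the other stage boundaries
      case _ => plan.children.flatMap(c => findAllIcebergSplitData(c)).toMap
    }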
Co-Authored-By: Claude Opus 4.5 --- .../apache/spark/sql/comet/operators.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index e18f3f46ea..5fb6638c5a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -367,9 +367,8 @@ abstract class CometNativeExec extends CometExec { numParts: Int, partitionIndex: Int): CometExecIterator = { // Get the actual serialized plan - either shared or per-partition injected - // Only inject partition data when there are NO inputs - if there are inputs, - // the Iceberg scans already executed via their own RDDs with per-partition data - val actualPlan = if (icebergSplitDataByLocation.nonEmpty && inputs.isEmpty) { + // Inject partition data if we have any IcebergScans with split data + val actualPlan = if (icebergSplitDataByLocation.nonEmpty) { // Build a map of metadataLocation -> partitionBytes for this partition index val partitionDataByLocation = icebergSplitDataByLocation.map { case (metadataLocation, perPartitionData) => @@ -382,13 +381,13 @@ abstract class CometNativeExec extends CometExec { s"(${perPartitionData.length} partitions)") } } - // No inputs = Iceberg scan is part of native plan, inject partition data + // Inject partition data into IcebergScan nodes in the native plan val basePlan = OperatorOuterClass.Operator.parseFrom(serializedPlanCopy) val injected = IcebergPartitionInjector.injectPartitionData(basePlan, partitionDataByLocation) IcebergPartitionInjector.serializeOperator(injected) } else { - // Has inputs or no split data - use plan as-is + // No split data - use plan as-is serializedPlanCopy } @@ -559,11 +558,18 @@ abstract class CometNativeExec extends CometExec { if iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty => Map(iceberg.metadataLocation -> iceberg.perPartitionData) + // For broadcast stages, we CAN look inside because broadcast data is replicated + // to all partitions, so partition indices align. This handles broadcast joins + // over Iceberg tables. + case bqs: BroadcastQueryStageExec => + findAllIcebergSplitData(bqs.plan) + case cbe: CometBroadcastExchangeExec => + cbe.children.flatMap(c => findAllIcebergSplitData(c)).toMap + // Stage boundaries - stop searching (partition indices won't align after these) case _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | - _: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | - _: CometSparkToColumnarExec => + _: ReusedExchangeExec | _: CometSparkToColumnarExec => Map.empty // Continue searching through other operators, combining results from all children