diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index 2d772063e4..490eae345d 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -29,6 +29,151 @@ import org.apache.spark.internal.Logging
  */
 object IcebergReflection extends Logging {
 
+  /**
+   * Cache of reflection classes and methods to avoid repeated lookups. Reflection lookups are
+   * expensive, so we resolve them once and reuse them for every task.
+   */
+  case class ReflectionCache(
+      // Iceberg classes
+      contentScanTaskClass: Class[_],
+      fileScanTaskClass: Class[_],
+      contentFileClass: Class[_],
+      deleteFileClass: Class[_],
+      schemaParserClass: Class[_],
+      schemaClass: Class[_],
+      partitionSpecParserClass: Class[_],
+      partitionSpecClass: Class[_],
+      structLikeClass: Class[_],
+      nestedFieldClass: Class[_],
+      // ContentScanTask methods
+      fileMethod: java.lang.reflect.Method,
+      startMethod: java.lang.reflect.Method,
+      lengthMethod: java.lang.reflect.Method,
+      partitionMethod: java.lang.reflect.Method,
+      residualMethod: java.lang.reflect.Method,
+      // FileScanTask methods
+      taskSchemaMethod: java.lang.reflect.Method,
+      deletesMethod: java.lang.reflect.Method,
+      specMethod: java.lang.reflect.Method,
+      // ContentFile methods
+      fileLocationMethod: java.lang.reflect.Method,
+      // DeleteFile methods
+      deleteContentMethod: java.lang.reflect.Method,
+      deleteSpecIdMethod: java.lang.reflect.Method,
+      deleteEqualityIdsMethod: java.lang.reflect.Method,
+      // Schema methods
+      schemaToJsonMethod: java.lang.reflect.Method,
+      // PartitionSpec methods
+      partitionSpecToJsonMethod: java.lang.reflect.Method,
+      partitionTypeMethod: java.lang.reflect.Method,
+      // StructType methods
+      structTypeFieldsMethod: java.lang.reflect.Method,
+      // NestedField methods
+      nestedFieldTypeMethod: java.lang.reflect.Method,
+      nestedFieldIdMethod: java.lang.reflect.Method,
+      nestedFieldNameMethod: java.lang.reflect.Method,
+      nestedFieldIsOptionalMethod: java.lang.reflect.Method,
+      // StructLike methods
+      structLikeGetMethod: java.lang.reflect.Method)
+
+  /**
+   * Creates a ReflectionCache by loading all classes and methods once.
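+   *
+   * A minimal usage sketch (`fileScanTasks` stands in for the tasks extracted from
+   * the scan and is not a real API):
+   * {{{
+   * val cache = IcebergReflection.createReflectionCache()
+   * fileScanTasks.foreach { task =>
+   *   val dataFile = cache.fileMethod.invoke(task)
+   *   val start = cache.startMethod.invoke(task).asInstanceOf[Long]
+   * }
+   * }}}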
+   */
+  def createReflectionCache(): ReflectionCache = {
+    // scalastyle:off classforname
+    val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
+    val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
+    val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
+    val deleteFileClass = Class.forName(ClassNames.DELETE_FILE)
+    val schemaParserClass = Class.forName(ClassNames.SCHEMA_PARSER)
+    val schemaClass = Class.forName(ClassNames.SCHEMA)
+    val partitionSpecParserClass = Class.forName(ClassNames.PARTITION_SPEC_PARSER)
+    val partitionSpecClass = Class.forName(ClassNames.PARTITION_SPEC)
+    val structTypeClass = Class.forName(ClassNames.STRUCT_TYPE)
+    val nestedFieldClass = Class.forName(ClassNames.NESTED_FIELD)
+    val structLikeClass = Class.forName(ClassNames.STRUCT_LIKE)
+    // scalastyle:on classforname
+
+    // ContentScanTask methods
+    val fileMethod = contentScanTaskClass.getMethod("file")
+    val startMethod = contentScanTaskClass.getMethod("start")
+    val lengthMethod = contentScanTaskClass.getMethod("length")
+    val partitionMethod = contentScanTaskClass.getMethod("partition")
+    val residualMethod = contentScanTaskClass.getMethod("residual")
+
+    // FileScanTask methods
+    val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
+    val deletesMethod = fileScanTaskClass.getMethod("deletes")
+    val specMethod = fileScanTaskClass.getMethod("spec")
+
+    // ContentFile methods - try location() first, fall back to path()
+    val fileLocationMethod =
+      try {
+        contentFileClass.getMethod("location")
+      } catch {
+        case _: NoSuchMethodException => contentFileClass.getMethod("path")
+      }
+
+    // DeleteFile methods
+    val deleteContentMethod = deleteFileClass.getMethod("content")
+    val deleteSpecIdMethod = deleteFileClass.getMethod("specId")
+    val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds")
+
+    // Schema serialization
+    val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
+    schemaToJsonMethod.setAccessible(true)
+
+    // PartitionSpec methods
+    val partitionSpecToJsonMethod =
+      partitionSpecParserClass.getMethod("toJson", partitionSpecClass)
+    val partitionTypeMethod = partitionSpecClass.getMethod("partitionType")
+
+    // StructType methods
+    val structTypeFieldsMethod = structTypeClass.getMethod("fields")
+
+    // NestedField methods
+    val nestedFieldTypeMethod = nestedFieldClass.getMethod("type")
+    val nestedFieldIdMethod = nestedFieldClass.getMethod("fieldId")
+    val nestedFieldNameMethod = nestedFieldClass.getMethod("name")
+    val nestedFieldIsOptionalMethod = nestedFieldClass.getMethod("isOptional")
+
+    // StructLike methods
+    val structLikeGetMethod = structLikeClass.getMethod("get", classOf[Int], classOf[Class[_]])
+
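+    // All lookups above are resolved eagerly, so an incompatible Iceberg version
+    // fails fast here (with NoSuchMethodException) instead of partway through
+    // serializing tasks.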
+    ReflectionCache(
+      contentScanTaskClass = contentScanTaskClass,
+      fileScanTaskClass = fileScanTaskClass,
+      contentFileClass = contentFileClass,
+      deleteFileClass = deleteFileClass,
+      schemaParserClass = schemaParserClass,
+      schemaClass = schemaClass,
+      partitionSpecParserClass = partitionSpecParserClass,
+      partitionSpecClass = partitionSpecClass,
+      structLikeClass = structLikeClass,
+      nestedFieldClass = nestedFieldClass,
+      fileMethod = fileMethod,
+      startMethod = startMethod,
+      lengthMethod = lengthMethod,
+      partitionMethod = partitionMethod,
+      residualMethod = residualMethod,
+      taskSchemaMethod = taskSchemaMethod,
+      deletesMethod = deletesMethod,
+      specMethod = specMethod,
+      fileLocationMethod = fileLocationMethod,
+      deleteContentMethod = deleteContentMethod,
+      deleteSpecIdMethod = deleteSpecIdMethod,
+      deleteEqualityIdsMethod = deleteEqualityIdsMethod,
+      schemaToJsonMethod = schemaToJsonMethod,
+      partitionSpecToJsonMethod = partitionSpecToJsonMethod,
+      partitionTypeMethod = partitionTypeMethod,
+      structTypeFieldsMethod = structTypeFieldsMethod,
+      nestedFieldTypeMethod = nestedFieldTypeMethod,
+      nestedFieldIdMethod = nestedFieldIdMethod,
+      nestedFieldNameMethod = nestedFieldNameMethod,
+      nestedFieldIsOptionalMethod = nestedFieldIsOptionalMethod,
+      structLikeGetMethod = structLikeGetMethod)
+  }
+
   /**
    * Iceberg class names used throughout Comet.
    */
@@ -46,6 +191,8 @@ object IcebergReflection extends Logging {
     val PARTITION_SPEC = "org.apache.iceberg.PartitionSpec"
     val PARTITION_FIELD = "org.apache.iceberg.PartitionField"
     val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate"
+    val STRUCT_TYPE = "org.apache.iceberg.types.Types$StructType"
+    val NESTED_FIELD = "org.apache.iceberg.types.Types$NestedField"
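+    // The '$' separator is required by Class.forName for nested classes:
+    // StructType and NestedField are nested in org.apache.iceberg.types.Types.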
   }
 
   /**
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index 7238f8ae8c..c68a2d61d5 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -218,68 +218,63 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
   }
 
   /**
-   * Extracts delete files from an Iceberg FileScanTask as a list (for deduplication).
+   * Extracts delete files from an Iceberg FileScanTask as a list (for deduplication). Uses
+   * cached reflection methods for performance.
    */
   private def extractDeleteFilesList(
-      task: Any,
-      contentFileClass: Class[_],
-      fileScanTaskClass: Class[_]): Seq[OperatorOuterClass.IcebergDeleteFile] = {
+      deletes: java.util.List[_],
+      cache: IcebergReflection.ReflectionCache): Seq[OperatorOuterClass.IcebergDeleteFile] = {
     try {
-      // scalastyle:off classforname
-      val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
-      // scalastyle:on classforname
-
-      val deletes = IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
-
       deletes.asScala.flatMap { deleteFile =>
         try {
-          IcebergReflection
-            .extractFileLocation(contentFileClass, deleteFile)
-            .map { deletePath =>
-              val deleteBuilder =
-                OperatorOuterClass.IcebergDeleteFile.newBuilder()
-              deleteBuilder.setFilePath(deletePath)
-
-              val contentType =
-                try {
-                  val contentMethod = deleteFileClass.getMethod("content")
-                  val content = contentMethod.invoke(deleteFile)
-                  content.toString match {
-                    case IcebergReflection.ContentTypes.POSITION_DELETES =>
-                      IcebergReflection.ContentTypes.POSITION_DELETES
-                    case IcebergReflection.ContentTypes.EQUALITY_DELETES =>
-                      IcebergReflection.ContentTypes.EQUALITY_DELETES
-                    case other => other
-                  }
-                } catch {
-                  case _: Exception =>
-                    IcebergReflection.ContentTypes.POSITION_DELETES
-                }
-              deleteBuilder.setContentType(contentType)
-
-              val specId =
-                try {
-                  val specIdMethod = deleteFileClass.getMethod("specId")
-                  specIdMethod.invoke(deleteFile).asInstanceOf[Int]
-                } catch {
-                  case _: Exception =>
-                    0
-                }
-              deleteBuilder.setPartitionSpecId(specId)
+          // Use cached fileLocationMethod
+          val deletePath = cache.fileLocationMethod.invoke(deleteFile) match {
+            case s: String => s
+            case cs: CharSequence => cs.toString
+            case _ => return Seq.empty
+          }
 
-              try {
-                val equalityIdsMethod =
-                  deleteFileClass.getMethod("equalityFieldIds")
-                val equalityIds = equalityIdsMethod
-                  .invoke(deleteFile)
-                  .asInstanceOf[java.util.List[Integer]]
-                equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
-              } catch {
-                case _: Exception =>
-              }
+          val deleteBuilder = OperatorOuterClass.IcebergDeleteFile.newBuilder()
+          deleteBuilder.setFilePath(deletePath)
+
+          // Use cached deleteContentMethod
+          val contentType =
+            try {
+              val content = cache.deleteContentMethod.invoke(deleteFile)
+              content.toString match {
+                case IcebergReflection.ContentTypes.POSITION_DELETES =>
+                  IcebergReflection.ContentTypes.POSITION_DELETES
+                case IcebergReflection.ContentTypes.EQUALITY_DELETES =>
+                  IcebergReflection.ContentTypes.EQUALITY_DELETES
+                case other => other
+              }
+            } catch {
+              case _: Exception => IcebergReflection.ContentTypes.POSITION_DELETES
+            }
+          deleteBuilder.setContentType(contentType)
 
-              deleteBuilder.build()
-            }
+          // Use cached deleteSpecIdMethod
+          val specId =
+            try {
+              cache.deleteSpecIdMethod.invoke(deleteFile).asInstanceOf[Int]
+            } catch {
+              case _: Exception => 0
+            }
+          deleteBuilder.setPartitionSpecId(specId)
+
+          // Use cached deleteEqualityIdsMethod
+          try {
+            val equalityIds = cache.deleteEqualityIdsMethod
+              .invoke(deleteFile)
+              .asInstanceOf[java.util.List[Integer]]
+            if (equalityIds != null) {
+              equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
+            }
+          } catch {
+            case _: Exception =>
+          }
+
+          Some(deleteBuilder.build())
         } catch {
           case e: Exception =>
             logWarning(s"Failed to serialize delete file: ${e.getMessage}")
@@ -288,9 +283,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
       }.toSeq
     } catch {
       case e: Exception =>
-        val msg =
-          "Iceberg reflection failure: Failed to extract deletes from FileScanTask: " +
-            s"${e.getMessage}"
+        val msg = "Iceberg reflection failure: " +
+          s"Failed to extract deletes from FileScanTask: ${e.getMessage}"
         logError(msg)
         throw new RuntimeException(msg, e)
     }
@@ -306,33 +300,23 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
    */
   private def serializePartitionData(
       task: Any,
-      contentScanTaskClass: Class[_],
-      fileScanTaskClass: Class[_],
+      cache: IcebergReflection.ReflectionCache,
       taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder,
       icebergScanBuilder: OperatorOuterClass.IcebergScan.Builder,
-      partitionTypeToPoolIndex: mutable.HashMap[String, Int],
-      partitionSpecToPoolIndex: mutable.HashMap[String, Int],
+      partitionTypeToPoolIndex: mutable.HashMap[AnyRef, Int],
+      partitionSpecToPoolIndex: mutable.HashMap[AnyRef, Int],
      partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = {
     try {
-      val specMethod = fileScanTaskClass.getMethod("spec")
-      val spec = specMethod.invoke(task)
+      val spec = cache.specMethod.invoke(task)
 
       if (spec != null) {
-        // Deduplicate partition spec
+        // Deduplicate partition spec using object identity - only call toJson for new specs
         try {
-          // scalastyle:off classforname
-          val partitionSpecParserClass =
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
-          val toJsonMethod = partitionSpecParserClass.getMethod(
-            "toJson",
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC))
-          // scalastyle:on classforname
-          val partitionSpecJson = toJsonMethod
-            .invoke(null, spec)
-            .asInstanceOf[String]
           val specIdx = partitionSpecToPoolIndex.getOrElseUpdate(
-            partitionSpecJson, {
+            spec.asInstanceOf[AnyRef], {
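+              // Keying by object identity relies on Iceberg reusing immutable
+              // PartitionSpec instances across a scan's tasks (the same rationale
+              // as the identity-keyed schema pool used for task schemas).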
+              val partitionSpecJson = cache.partitionSpecToJsonMethod
+                .invoke(null, spec)
+                .asInstanceOf[String]
               val idx = partitionSpecToPoolIndex.size
               icebergScanBuilder.addPartitionSpecPool(partitionSpecJson)
               idx
@@ -344,80 +328,76 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
       }
 
       // Get partition data from the task (via file().partition())
-      val partitionMethod = contentScanTaskClass.getMethod("partition")
-      val partitionData = partitionMethod.invoke(task)
+      val partitionData = cache.partitionMethod.invoke(task)
 
       if (partitionData != null) {
-        // Get the partition type/schema from the spec
-        val partitionTypeMethod = spec.getClass.getMethod("partitionType")
-        val partitionType = partitionTypeMethod.invoke(spec)
+        // Get the partition type/schema from the spec (using cached method)
+        val partitionType = cache.partitionTypeMethod.invoke(spec)
 
-        // Check if partition type has any fields before serializing
-        val fieldsMethod = partitionType.getClass.getMethod("fields")
-        val fields = fieldsMethod
+        // Check if partition type has any fields before serializing (using cached method)
+        val fields = cache.structTypeFieldsMethod
           .invoke(partitionType)
           .asInstanceOf[java.util.List[_]]
 
         // Helper to get field type string (shared by both type and data serialization)
         def getFieldType(field: Any): String = {
-          val typeMethod = field.getClass.getMethod("type")
-          typeMethod.invoke(field).toString
+          cache.nestedFieldTypeMethod.invoke(field).toString
        }
 
         // Only serialize partition type if there are actual partition fields
         if (!fields.isEmpty) {
           try {
-            // Manually build StructType JSON to match iceberg-rust expectations.
-            // Using Iceberg's SchemaParser.toJson() would include schema-level
-            // metadata (e.g., "schema-id") that iceberg-rust's StructType
-            // deserializer rejects. We need pure StructType format:
-            // {"type":"struct","fields":[...]}
-
-            // Filter out fields with unknown types (dropped partition fields).
-            // Unknown type fields represent partition columns that have been dropped
-            // from the schema. Per the Iceberg spec, unknown type fields are not
-            // stored in data files and iceberg-rust doesn't support deserializing
-            // them. Since these columns are dropped, we don't need to expose their
-            // partition values when reading.
-            val fieldsJson = fields.asScala.flatMap { field =>
-              val fieldTypeStr = getFieldType(field)
-
-              // Skip fields with unknown type (dropped partition columns)
-              if (fieldTypeStr == IcebergReflection.TypeNames.UNKNOWN) {
-                None
-              } else {
-                val fieldIdMethod = field.getClass.getMethod("fieldId")
-                val fieldId = fieldIdMethod.invoke(field).asInstanceOf[Int]
-
-                val nameMethod = field.getClass.getMethod("name")
-                val fieldName = nameMethod.invoke(field).asInstanceOf[String]
-
-                val isOptionalMethod = field.getClass.getMethod("isOptional")
-                val isOptional =
-                  isOptionalMethod.invoke(field).asInstanceOf[Boolean]
-                val required = !isOptional
-
-                Some(
-                  ("id" -> fieldId) ~
-                    ("name" -> fieldName) ~
-                    ("required" -> required) ~
-                    ("type" -> fieldTypeStr))
-              }
-            }.toList
+            // Use spec identity for deduplication - same spec = same partition type.
+            // Only build JSON for new specs.
+            val typeIdxOpt = partitionTypeToPoolIndex.get(spec.asInstanceOf[AnyRef])
+            val typeIdx = typeIdxOpt.getOrElse {
+              // Manually build StructType JSON to match iceberg-rust expectations.
+              // Using Iceberg's SchemaParser.toJson() would include schema-level
+              // metadata (e.g., "schema-id") that iceberg-rust's StructType
+              // deserializer rejects. We need pure StructType format:
+              // {"type":"struct","fields":[...]}
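+              // Illustrative example for a table partitioned by one long column "id"
+              // (partition field IDs start at 1000 per the Iceberg spec; "required"
+              // depends on the field's optionality):
+              //   {"type":"struct","fields":[
+              //     {"id":1000,"name":"id","required":false,"type":"long"}]}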
+
+              // Filter out fields with unknown types (dropped partition fields).
+              // Unknown type fields represent partition columns that have been dropped
+              // from the schema. Per the Iceberg spec, unknown type fields are not
+              // stored in data files and iceberg-rust doesn't support deserializing
+              // them. Since these columns are dropped, we don't need to expose their
+              // partition values when reading.
+              val fieldsJson = fields.asScala.flatMap { field =>
+                val fieldTypeStr = getFieldType(field)
+
+                // Skip fields with unknown type (dropped partition columns)
+                if (fieldTypeStr == IcebergReflection.TypeNames.UNKNOWN) {
+                  None
+                } else {
+                  val fieldId = cache.nestedFieldIdMethod.invoke(field).asInstanceOf[Int]
+                  val fieldName = cache.nestedFieldNameMethod.invoke(field).asInstanceOf[String]
+                  val isOptional =
+                    cache.nestedFieldIsOptionalMethod.invoke(field).asInstanceOf[Boolean]
+                  val required = !isOptional
+
+                  Some(
+                    ("id" -> fieldId) ~
+                      ("name" -> fieldName) ~
+                      ("required" -> required) ~
+                      ("type" -> fieldTypeStr))
+                }
+              }.toList
 
-            // Only serialize if we have non-unknown fields
-            if (fieldsJson.nonEmpty) {
-              val partitionTypeJson = compact(
-                render(
-                  ("type" -> "struct") ~
+              // Only add to pool if we have non-unknown fields
+              if (fieldsJson.nonEmpty) {
+                val partitionTypeJson = compact(
+                  render(("type" -> "struct") ~
                     ("fields" -> fieldsJson)))
-
-              val typeIdx = partitionTypeToPoolIndex.getOrElseUpdate(
-                partitionTypeJson, {
-                  val idx = partitionTypeToPoolIndex.size
-                  icebergScanBuilder.addPartitionTypePool(partitionTypeJson)
-                  idx
-                })
+                val idx = partitionTypeToPoolIndex.size
+                icebergScanBuilder.addPartitionTypePool(partitionTypeJson)
+                partitionTypeToPoolIndex.put(spec.asInstanceOf[AnyRef], idx)
+                idx
+              } else {
+                -1 // No valid fields
+              }
+            }
 
+            if (typeIdx >= 0) {
               taskBuilder.setPartitionTypeIdx(typeIdx)
             }
           } catch {
@@ -445,12 +425,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
               None
             } else {
               // Use the partition type's field ID (same as in partition_type_json)
-              val fieldIdMethod = field.getClass.getMethod("fieldId")
-              val fieldId = fieldIdMethod.invoke(field).asInstanceOf[Int]
+              val fieldId = cache.nestedFieldIdMethod.invoke(field).asInstanceOf[Int]
 
-              val getMethod =
-                partitionData.getClass.getMethod("get", classOf[Int], classOf[Class[_]])
-              val value = getMethod.invoke(partitionData, Integer.valueOf(idx), classOf[Object])
+              val value = cache.structLikeGetMethod
+                .invoke(partitionData, Integer.valueOf(idx), classOf[Object])
 
               Some(partitionValueToProto(fieldId, fieldTypeStr, value))
             }
@@ -685,14 +663,15 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
     // Deduplication structures - map unique values to pool indices
     val schemaToPoolIndex = mutable.HashMap[AnyRef, Int]()
-    val partitionTypeToPoolIndex = mutable.HashMap[String, Int]()
-    val partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
+    val partitionTypeToPoolIndex = mutable.HashMap[AnyRef, Int]() // Use spec identity
+    val partitionSpecToPoolIndex = mutable.HashMap[AnyRef, Int]() // Use object identity
     val nameMappingToPoolIndex = mutable.HashMap[String, Int]()
     val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]()
     val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64 bytes -> pool index
     val deleteFilesToPoolIndex =
       mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]()
     val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]()
+    val fieldIdMappingCache = mutable.HashMap[AnyRef, Map[String, Int]]() // Cache per schema
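+    // Each pool maps a deduplicated value to its index; tasks reference pool
+    // entries by index, which keeps the serialized plan compact for large scans.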
 
     var totalTasks = 0
 
@@ -725,6 +704,9 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
     // Extract FileScanTasks from the InputPartitions in the RDD
     try {
+      // Create reflection cache once for all tasks
+      val cache = IcebergReflection.createReflectionCache()
+
       scan.wrapped.inputRDD match {
         case rdd: org.apache.spark.sql.execution.datasources.v2.DataSourceRDD =>
           val partitions = rdd.partitions
@@ -754,51 +736,39 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
               try {
                 val taskBuilder = OperatorOuterClass.IcebergFileScanTask.newBuilder()
 
-                // scalastyle:off classforname
-                val contentScanTaskClass =
-                  Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
-                val fileScanTaskClass =
-                  Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
-                val contentFileClass =
-                  Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
-                // scalastyle:on classforname
-
-                val fileMethod = contentScanTaskClass.getMethod("file")
-                val dataFile = fileMethod.invoke(task)
-
-                val filePathOpt =
-                  IcebergReflection.extractFileLocation(contentFileClass, dataFile)
-
-                filePathOpt match {
-                  case Some(filePath) =>
-                    taskBuilder.setDataFilePath(filePath)
-                  case None =>
-                    val msg =
-                      "Iceberg reflection failure: Cannot extract file path from data file"
+                val dataFile = cache.fileMethod.invoke(task)
+
+                // Use cached fileLocationMethod for performance
+                val filePath = cache.fileLocationMethod.invoke(dataFile) match {
+                  case s: String => s
+                  case cs: CharSequence => cs.toString
+                  case other =>
+                    val msg = "Iceberg reflection failure: " +
+                      s"Unexpected file path type: ${other.getClass}"
                     logError(msg)
                     throw new RuntimeException(msg)
                 }
+                taskBuilder.setDataFilePath(filePath)
 
-                val startMethod = contentScanTaskClass.getMethod("start")
-                val start = startMethod.invoke(task).asInstanceOf[Long]
+                val start = cache.startMethod.invoke(task).asInstanceOf[Long]
                 taskBuilder.setStart(start)
 
-                val lengthMethod = contentScanTaskClass.getMethod("length")
-                val length = lengthMethod.invoke(task).asInstanceOf[Long]
+                val length = cache.lengthMethod.invoke(task).asInstanceOf[Long]
                 taskBuilder.setLength(length)
 
+                // Retrieve deletes once for reuse in schema selection and
+                // delete file serialization
+                val deletes =
+                  cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+                val hasDeletes = deletes != null && !deletes.isEmpty
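+                // Guard against both a null and an empty deletes list; either means
+                // no delete files apply, and hasDeletes drives the schema choice below.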
+
                 try {
                   // Equality deletes require the full table schema to resolve field IDs,
                   // even for columns not in the projection. Schema evolution requires
                   // using the snapshot's schema to correctly read old data files.
                   // These requirements conflict, so we choose based on delete presence.
-                  val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
-                  val taskSchema = taskSchemaMethod.invoke(task)
-
-                  val deletes =
-                    IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
-                  val hasDeletes = !deletes.isEmpty
+                  val taskSchema = cache.taskSchemaMethod.invoke(task)
 
                   // Schema to pass to iceberg-rust's FileScanTask.
                   // This is used by RecordBatchTransformer for field type lookups (e.g., in
@@ -843,27 +813,23 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                   }
                 }
 
-                // scalastyle:off classforname
-                val schemaParserClass =
-                  Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
-                val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
-                // scalastyle:on classforname
-                val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
-                toJsonMethod.setAccessible(true)
-
                 // Use object identity for deduplication: Iceberg Schema objects are immutable
                 // and reused across tasks, making identity-based deduplication safe
+                // Use cached schemaToJsonMethod for performance
                 val schemaIdx = schemaToPoolIndex.getOrElseUpdate(
                   schema, {
                     val idx = schemaToPoolIndex.size
-                    val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String]
+                    val schemaJson =
+                      cache.schemaToJsonMethod.invoke(null, schema).asInstanceOf[String]
                     icebergScanBuilder.addSchemaPool(schemaJson)
                     idx
                   })
                 taskBuilder.setSchemaIdx(schemaIdx)
 
-                // Build field ID mapping from the schema we're using
-                val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema)
+                // Build field ID mapping from the schema we're using (cached by identity)
+                val nameToFieldId = fieldIdMappingCache.getOrElseUpdate(
+                  schema,
+                  IcebergReflection.buildFieldIdMapping(schema))
 
                 // Extract project_field_ids for scan.output columns.
                 // For schema evolution: try task schema first, then fall back to
@@ -900,9 +866,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                     throw new RuntimeException(msg, e)
                 }
 
-                // Deduplicate delete files
-                val deleteFilesList =
-                  extractDeleteFilesList(task, contentFileClass, fileScanTaskClass)
+                // Deduplicate delete files (using deletes already retrieved above)
+                val deleteFilesList = extractDeleteFilesList(deletes, cache)
                 if (deleteFilesList.nonEmpty) {
                   val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate(
                     deleteFilesList, {
@@ -916,10 +881,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                 }
 
                 // Extract and deduplicate residual expression
+                // Use cached residualMethod for performance
                 val residualExprOpt =
                   try {
-                    val residualMethod = contentScanTaskClass.getMethod("residual")
-                    val residualExpr = residualMethod.invoke(task)
+                    val residualExpr = cache.residualMethod.invoke(task)
 
                     val catalystExpr = convertIcebergExpression(residualExpr, scan.output)
 
@@ -947,8 +912,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                 // Serialize partition spec and data (field definitions, transforms, values)
                 serializePartitionData(
                   task,
-                  contentScanTaskClass,
-                  fileScanTaskClass,
+                  cache,
                   taskBuilder,
                   icebergScanBuilder,
                   partitionTypeToPoolIndex,