From bf83e7612a16e46425f8f5c98257a48f42f85974 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 08:24:59 -0700 Subject: [PATCH 1/4] perf: Cache reflection lookups in Iceberg serde for 52% faster serialization Cache reflection and computation results to avoid redundant work: 1. ReflectionCache: Cache Class.forName() and getMethod() calls once per convert() instead of per-task (30,000+ times) 2. Partition spec deduplication by object identity: Only call toJson() for new unique specs, not for every task 3. Partition type deduplication by spec identity: Same spec = same partition type, so skip JSON building for duplicate specs 4. Field ID mapping cache: Cache buildFieldIdMapping() results by schema identity to avoid repeated reflection per-column Benchmark results (30,000 tasks): - Original: 34,425 ms per 100 iterations - After caching: 16,618 ms per 100 iterations - Improvement: 52% faster Co-Authored-By: Claude Opus 4.5 --- .../operator/CometIcebergNativeScan.scala | 437 +++++++++++------- 1 file changed, 260 insertions(+), 177 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 7238f8ae8c..174dfe8acd 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -69,6 +69,115 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } } + /** + * Cache for reflection classes and methods to avoid repeated lookups. All Iceberg reflection + * operations are expensive, so we cache them once and reuse. + */ + private case class ReflectionCache( + // Iceberg classes + contentScanTaskClass: Class[_], + fileScanTaskClass: Class[_], + contentFileClass: Class[_], + deleteFileClass: Class[_], + schemaParserClass: Class[_], + schemaClass: Class[_], + partitionSpecParserClass: Class[_], + partitionSpecClass: Class[_], + // ContentScanTask methods + fileMethod: java.lang.reflect.Method, + startMethod: java.lang.reflect.Method, + lengthMethod: java.lang.reflect.Method, + partitionMethod: java.lang.reflect.Method, + residualMethod: java.lang.reflect.Method, + // FileScanTask methods + taskSchemaMethod: java.lang.reflect.Method, + deletesMethod: java.lang.reflect.Method, + specMethod: java.lang.reflect.Method, + // ContentFile methods + fileLocationMethod: java.lang.reflect.Method, + // DeleteFile methods + deleteContentMethod: java.lang.reflect.Method, + deleteSpecIdMethod: java.lang.reflect.Method, + deleteEqualityIdsMethod: java.lang.reflect.Method, + // Schema methods + schemaToJsonMethod: java.lang.reflect.Method, + // PartitionSpec methods + partitionSpecToJsonMethod: java.lang.reflect.Method) + + /** + * Creates a ReflectionCache by loading all classes and methods once. 
+ */ + private def createReflectionCache(): ReflectionCache = { + // scalastyle:off classforname + val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK) + val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK) + val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE) + val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE) + val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER) + val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA) + val partitionSpecParserClass = + Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER) + val partitionSpecClass = Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC) + // scalastyle:on classforname + + // ContentScanTask methods + val fileMethod = contentScanTaskClass.getMethod("file") + val startMethod = contentScanTaskClass.getMethod("start") + val lengthMethod = contentScanTaskClass.getMethod("length") + val partitionMethod = contentScanTaskClass.getMethod("partition") + val residualMethod = contentScanTaskClass.getMethod("residual") + + // FileScanTask methods + val taskSchemaMethod = fileScanTaskClass.getMethod("schema") + val deletesMethod = fileScanTaskClass.getMethod("deletes") + val specMethod = fileScanTaskClass.getMethod("spec") + + // ContentFile methods - try location() first, fall back to path() + val fileLocationMethod = + try { + contentFileClass.getMethod("location") + } catch { + case _: NoSuchMethodException => contentFileClass.getMethod("path") + } + + // DeleteFile methods + val deleteContentMethod = deleteFileClass.getMethod("content") + val deleteSpecIdMethod = deleteFileClass.getMethod("specId") + val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds") + + // Schema serialization + val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) + schemaToJsonMethod.setAccessible(true) + + // PartitionSpec serialization + val partitionSpecToJsonMethod = + partitionSpecParserClass.getMethod("toJson", partitionSpecClass) + + ReflectionCache( + contentScanTaskClass = contentScanTaskClass, + fileScanTaskClass = fileScanTaskClass, + contentFileClass = contentFileClass, + deleteFileClass = deleteFileClass, + schemaParserClass = schemaParserClass, + schemaClass = schemaClass, + partitionSpecParserClass = partitionSpecParserClass, + partitionSpecClass = partitionSpecClass, + fileMethod = fileMethod, + startMethod = startMethod, + lengthMethod = lengthMethod, + partitionMethod = partitionMethod, + residualMethod = residualMethod, + taskSchemaMethod = taskSchemaMethod, + deletesMethod = deletesMethod, + specMethod = specMethod, + fileLocationMethod = fileLocationMethod, + deleteContentMethod = deleteContentMethod, + deleteSpecIdMethod = deleteSpecIdMethod, + deleteEqualityIdsMethod = deleteEqualityIdsMethod, + schemaToJsonMethod = schemaToJsonMethod, + partitionSpecToJsonMethod = partitionSpecToJsonMethod) + } + /** * Converts an Iceberg partition value to protobuf format. Protobuf is less verbose than JSON. * The following types are also serialized as integer values instead of as strings - Timestamps, @@ -218,68 +327,63 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } /** - * Extracts delete files from an Iceberg FileScanTask as a list (for deduplication). + * Extracts delete files from an Iceberg FileScanTask as a list (for deduplication). Uses cached + * reflection methods for performance. 
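+ * Individual delete files that fail to serialize are logged and skipped; a reflection
+ * failure on the deletes list as a whole is rethrown as a RuntimeException.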
*/ private def extractDeleteFilesList( - task: Any, - contentFileClass: Class[_], - fileScanTaskClass: Class[_]): Seq[OperatorOuterClass.IcebergDeleteFile] = { + deletes: java.util.List[_], + cache: ReflectionCache): Seq[OperatorOuterClass.IcebergDeleteFile] = { try { - // scalastyle:off classforname - val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE) - // scalastyle:on classforname - - val deletes = IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass) - deletes.asScala.flatMap { deleteFile => try { - IcebergReflection - .extractFileLocation(contentFileClass, deleteFile) - .map { deletePath => - val deleteBuilder = - OperatorOuterClass.IcebergDeleteFile.newBuilder() - deleteBuilder.setFilePath(deletePath) - - val contentType = - try { - val contentMethod = deleteFileClass.getMethod("content") - val content = contentMethod.invoke(deleteFile) - content.toString match { - case IcebergReflection.ContentTypes.POSITION_DELETES => - IcebergReflection.ContentTypes.POSITION_DELETES - case IcebergReflection.ContentTypes.EQUALITY_DELETES => - IcebergReflection.ContentTypes.EQUALITY_DELETES - case other => other - } - } catch { - case _: Exception => - IcebergReflection.ContentTypes.POSITION_DELETES - } - deleteBuilder.setContentType(contentType) - - val specId = - try { - val specIdMethod = deleteFileClass.getMethod("specId") - specIdMethod.invoke(deleteFile).asInstanceOf[Int] - } catch { - case _: Exception => - 0 - } - deleteBuilder.setPartitionSpecId(specId) + // Use cached fileLocationMethod + val deletePath = cache.fileLocationMethod.invoke(deleteFile) match { + case s: String => s + case cs: CharSequence => cs.toString + case _ => return Seq.empty + } - try { - val equalityIdsMethod = - deleteFileClass.getMethod("equalityFieldIds") - val equalityIds = equalityIdsMethod - .invoke(deleteFile) - .asInstanceOf[java.util.List[Integer]] - equalityIds.forEach(id => deleteBuilder.addEqualityIds(id)) - } catch { - case _: Exception => + val deleteBuilder = OperatorOuterClass.IcebergDeleteFile.newBuilder() + deleteBuilder.setFilePath(deletePath) + + // Use cached deleteContentMethod + val contentType = + try { + val content = cache.deleteContentMethod.invoke(deleteFile) + content.toString match { + case IcebergReflection.ContentTypes.POSITION_DELETES => + IcebergReflection.ContentTypes.POSITION_DELETES + case IcebergReflection.ContentTypes.EQUALITY_DELETES => + IcebergReflection.ContentTypes.EQUALITY_DELETES + case other => other } + } catch { + case _: Exception => IcebergReflection.ContentTypes.POSITION_DELETES + } + deleteBuilder.setContentType(contentType) - deleteBuilder.build() + // Use cached deleteSpecIdMethod + val specId = + try { + cache.deleteSpecIdMethod.invoke(deleteFile).asInstanceOf[Int] + } catch { + case _: Exception => 0 } + deleteBuilder.setPartitionSpecId(specId) + + // Use cached deleteEqualityIdsMethod + try { + val equalityIds = cache.deleteEqualityIdsMethod + .invoke(deleteFile) + .asInstanceOf[java.util.List[Integer]] + if (equalityIds != null) { + equalityIds.forEach(id => deleteBuilder.addEqualityIds(id)) + } + } catch { + case _: Exception => + } + + Some(deleteBuilder.build()) } catch { case e: Exception => logWarning(s"Failed to serialize delete file: ${e.getMessage}") @@ -288,9 +392,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit }.toSeq } catch { case e: Exception => - val msg = - "Iceberg reflection failure: Failed to extract deletes from FileScanTask: " + - s"${e.getMessage}" + val msg = 
"Iceberg reflection failure: " + + s"Failed to extract deletes from FileScanTask: ${e.getMessage}" logError(msg) throw new RuntimeException(msg, e) } @@ -306,33 +409,23 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit */ private def serializePartitionData( task: Any, - contentScanTaskClass: Class[_], - fileScanTaskClass: Class[_], + cache: ReflectionCache, taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder, icebergScanBuilder: OperatorOuterClass.IcebergScan.Builder, - partitionTypeToPoolIndex: mutable.HashMap[String, Int], - partitionSpecToPoolIndex: mutable.HashMap[String, Int], + partitionTypeToPoolIndex: mutable.HashMap[AnyRef, Int], + partitionSpecToPoolIndex: mutable.HashMap[AnyRef, Int], partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = { try { - val specMethod = fileScanTaskClass.getMethod("spec") - val spec = specMethod.invoke(task) + val spec = cache.specMethod.invoke(task) if (spec != null) { - // Deduplicate partition spec + // Deduplicate partition spec using object identity - only call toJson for new specs try { - // scalastyle:off classforname - val partitionSpecParserClass = - Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER) - val toJsonMethod = partitionSpecParserClass.getMethod( - "toJson", - Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC)) - // scalastyle:on classforname - val partitionSpecJson = toJsonMethod - .invoke(null, spec) - .asInstanceOf[String] - val specIdx = partitionSpecToPoolIndex.getOrElseUpdate( - partitionSpecJson, { + spec.asInstanceOf[AnyRef], { + val partitionSpecJson = cache.partitionSpecToJsonMethod + .invoke(null, spec) + .asInstanceOf[String] val idx = partitionSpecToPoolIndex.size icebergScanBuilder.addPartitionSpecPool(partitionSpecJson) idx @@ -344,8 +437,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } // Get partition data from the task (via file().partition()) - val partitionMethod = contentScanTaskClass.getMethod("partition") - val partitionData = partitionMethod.invoke(task) + val partitionData = cache.partitionMethod.invoke(task) if (partitionData != null) { // Get the partition type/schema from the spec @@ -367,57 +459,62 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Only serialize partition type if there are actual partition fields if (!fields.isEmpty) { try { - // Manually build StructType JSON to match iceberg-rust expectations. - // Using Iceberg's SchemaParser.toJson() would include schema-level - // metadata (e.g., "schema-id") that iceberg-rust's StructType - // deserializer rejects. We need pure StructType format: - // {"type":"struct","fields":[...]} - - // Filter out fields with unknown types (dropped partition fields). - // Unknown type fields represent partition columns that have been dropped - // from the schema. Per the Iceberg spec, unknown type fields are not - // stored in data files and iceberg-rust doesn't support deserializing - // them. Since these columns are dropped, we don't need to expose their - // partition values when reading. 
- val fieldsJson = fields.asScala.flatMap { field => - val fieldTypeStr = getFieldType(field) - - // Skip fields with unknown type (dropped partition columns) - if (fieldTypeStr == IcebergReflection.TypeNames.UNKNOWN) { - None - } else { - val fieldIdMethod = field.getClass.getMethod("fieldId") - val fieldId = fieldIdMethod.invoke(field).asInstanceOf[Int] - - val nameMethod = field.getClass.getMethod("name") - val fieldName = nameMethod.invoke(field).asInstanceOf[String] - - val isOptionalMethod = field.getClass.getMethod("isOptional") - val isOptional = - isOptionalMethod.invoke(field).asInstanceOf[Boolean] - val required = !isOptional - - Some( - ("id" -> fieldId) ~ - ("name" -> fieldName) ~ - ("required" -> required) ~ - ("type" -> fieldTypeStr)) - } - }.toList + // Use spec identity for deduplication - same spec = same partition type + // Only build JSON for new specs + val typeIdxOpt = partitionTypeToPoolIndex.get(spec.asInstanceOf[AnyRef]) + val typeIdx = typeIdxOpt.getOrElse { + // Manually build StructType JSON to match iceberg-rust expectations. + // Using Iceberg's SchemaParser.toJson() would include schema-level + // metadata (e.g., "schema-id") that iceberg-rust's StructType + // deserializer rejects. We need pure StructType format: + // {"type":"struct","fields":[...]} + + // Filter out fields with unknown types (dropped partition fields). + // Unknown type fields represent partition columns that have been dropped + // from the schema. Per the Iceberg spec, unknown type fields are not + // stored in data files and iceberg-rust doesn't support deserializing + // them. Since these columns are dropped, we don't need to expose their + // partition values when reading. + val fieldsJson = fields.asScala.flatMap { field => + val fieldTypeStr = getFieldType(field) + + // Skip fields with unknown type (dropped partition columns) + if (fieldTypeStr == IcebergReflection.TypeNames.UNKNOWN) { + None + } else { + val fieldIdMethod = field.getClass.getMethod("fieldId") + val fieldId = fieldIdMethod.invoke(field).asInstanceOf[Int] + + val nameMethod = field.getClass.getMethod("name") + val fieldName = nameMethod.invoke(field).asInstanceOf[String] + + val isOptionalMethod = field.getClass.getMethod("isOptional") + val isOptional = + isOptionalMethod.invoke(field).asInstanceOf[Boolean] + val required = !isOptional + + Some( + ("id" -> fieldId) ~ + ("name" -> fieldName) ~ + ("required" -> required) ~ + ("type" -> fieldTypeStr)) + } + }.toList - // Only serialize if we have non-unknown fields - if (fieldsJson.nonEmpty) { - val partitionTypeJson = compact( - render( - ("type" -> "struct") ~ + // Only add to pool if we have non-unknown fields + if (fieldsJson.nonEmpty) { + val partitionTypeJson = compact( + render(("type" -> "struct") ~ ("fields" -> fieldsJson))) - - val typeIdx = partitionTypeToPoolIndex.getOrElseUpdate( - partitionTypeJson, { - val idx = partitionTypeToPoolIndex.size - icebergScanBuilder.addPartitionTypePool(partitionTypeJson) - idx - }) + val idx = partitionTypeToPoolIndex.size + icebergScanBuilder.addPartitionTypePool(partitionTypeJson) + partitionTypeToPoolIndex.put(spec.asInstanceOf[AnyRef], idx) + idx + } else { + -1 // No valid fields + } + } + if (typeIdx >= 0) { taskBuilder.setPartitionTypeIdx(typeIdx) } } catch { @@ -685,14 +782,15 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Deduplication structures - map unique values to pool indices val schemaToPoolIndex = mutable.HashMap[AnyRef, Int]() - val partitionTypeToPoolIndex = 
mutable.HashMap[String, Int]() - val partitionSpecToPoolIndex = mutable.HashMap[String, Int]() + val partitionTypeToPoolIndex = mutable.HashMap[AnyRef, Int]() // Use spec identity + val partitionSpecToPoolIndex = mutable.HashMap[AnyRef, Int]() // Use object identity val nameMappingToPoolIndex = mutable.HashMap[String, Int]() val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]() val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64 bytes -> pool index val deleteFilesToPoolIndex = mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]() val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]() + val fieldIdMappingCache = mutable.HashMap[AnyRef, Map[String, Int]]() // Cache per schema var totalTasks = 0 @@ -725,6 +823,9 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Extract FileScanTasks from the InputPartitions in the RDD try { + // Create reflection cache once for all tasks + val cache = createReflectionCache() + scan.wrapped.inputRDD match { case rdd: org.apache.spark.sql.execution.datasources.v2.DataSourceRDD => val partitions = rdd.partitions @@ -754,51 +855,39 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit try { val taskBuilder = OperatorOuterClass.IcebergFileScanTask.newBuilder() - // scalastyle:off classforname - val contentScanTaskClass = - Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK) - val fileScanTaskClass = - Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK) - val contentFileClass = - Class.forName(IcebergReflection.ClassNames.CONTENT_FILE) - // scalastyle:on classforname - - val fileMethod = contentScanTaskClass.getMethod("file") - val dataFile = fileMethod.invoke(task) - - val filePathOpt = - IcebergReflection.extractFileLocation(contentFileClass, dataFile) - - filePathOpt match { - case Some(filePath) => - taskBuilder.setDataFilePath(filePath) - case None => - val msg = - "Iceberg reflection failure: Cannot extract file path from data file" + val dataFile = cache.fileMethod.invoke(task) + + // Use cached fileLocationMethod for performance + val filePath = cache.fileLocationMethod.invoke(dataFile) match { + case s: String => s + case cs: CharSequence => cs.toString + case other => + val msg = "Iceberg reflection failure: " + + s"Unexpected file path type: ${other.getClass}" logError(msg) throw new RuntimeException(msg) } + taskBuilder.setDataFilePath(filePath) - val startMethod = contentScanTaskClass.getMethod("start") - val start = startMethod.invoke(task).asInstanceOf[Long] + val start = cache.startMethod.invoke(task).asInstanceOf[Long] taskBuilder.setStart(start) - val lengthMethod = contentScanTaskClass.getMethod("length") - val length = lengthMethod.invoke(task).asInstanceOf[Long] + val length = cache.lengthMethod.invoke(task).asInstanceOf[Long] taskBuilder.setLength(length) + // Retrieve deletes once for reuse in schema selection and + // delete file serialization + val deletes = + cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]] + val hasDeletes = deletes != null && !deletes.isEmpty + try { // Equality deletes require the full table schema to resolve field IDs, // even for columns not in the projection. Schema evolution requires // using the snapshot's schema to correctly read old data files. // These requirements conflict, so we choose based on delete presence. 
- val taskSchemaMethod = fileScanTaskClass.getMethod("schema") - val taskSchema = taskSchemaMethod.invoke(task) - - val deletes = - IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass) - val hasDeletes = !deletes.isEmpty + val taskSchema = cache.taskSchemaMethod.invoke(task) // Schema to pass to iceberg-rust's FileScanTask. // This is used by RecordBatchTransformer for field type lookups (e.g., in @@ -843,27 +932,23 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } } - // scalastyle:off classforname - val schemaParserClass = - Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER) - val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA) - // scalastyle:on classforname - val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) - toJsonMethod.setAccessible(true) - // Use object identity for deduplication: Iceberg Schema objects are immutable // and reused across tasks, making identity-based deduplication safe + // Use cached schemaToJsonMethod for performance val schemaIdx = schemaToPoolIndex.getOrElseUpdate( schema, { val idx = schemaToPoolIndex.size - val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String] + val schemaJson = + cache.schemaToJsonMethod.invoke(null, schema).asInstanceOf[String] icebergScanBuilder.addSchemaPool(schemaJson) idx }) taskBuilder.setSchemaIdx(schemaIdx) - // Build field ID mapping from the schema we're using - val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema) + // Build field ID mapping from the schema we're using (cached by identity) + val nameToFieldId = fieldIdMappingCache.getOrElseUpdate( + schema, + IcebergReflection.buildFieldIdMapping(schema)) // Extract project_field_ids for scan.output columns. // For schema evolution: try task schema first, then fall back to @@ -900,9 +985,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit throw new RuntimeException(msg, e) } - // Deduplicate delete files - val deleteFilesList = - extractDeleteFilesList(task, contentFileClass, fileScanTaskClass) + // Deduplicate delete files (using deletes already retrieved above) + val deleteFilesList = extractDeleteFilesList(deletes, cache) if (deleteFilesList.nonEmpty) { val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate( deleteFilesList, { @@ -916,10 +1000,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } // Extract and deduplicate residual expression + // Use cached residualMethod for performance val residualExprOpt = try { - val residualMethod = contentScanTaskClass.getMethod("residual") - val residualExpr = residualMethod.invoke(task) + val residualExpr = cache.residualMethod.invoke(task) val catalystExpr = convertIcebergExpression(residualExpr, scan.output) @@ -947,8 +1031,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Serialize partition spec and data (field definitions, transforms, values) serializePartitionData( task, - contentScanTaskClass, - fileScanTaskClass, + cache, taskBuilder, icebergScanBuilder, partitionTypeToPoolIndex, From a364384aba30800398958e6172888e476afe24e7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 09:10:10 -0700 Subject: [PATCH 2/4] Move ReflectionCache to IcebergReflection helper Move the ReflectionCache case class and createReflectionCache() method from CometIcebergNativeScan to the IcebergReflection helper class per code review feedback. 
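
Call sites now go through the shared helper, e.g. in the scan conversion path:

    val cache = IcebergReflection.createReflectionCache()
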
This encapsulates all Iceberg reflection caching logic in the shared reflection utilities. Co-Authored-By: Claude Opus 4.5 --- .../comet/iceberg/IcebergReflection.scala | 108 ++++++++++++++++ .../operator/CometIcebergNativeScan.scala | 115 +----------------- 2 files changed, 111 insertions(+), 112 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index 2d772063e4..de0db4b715 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -29,6 +29,114 @@ import org.apache.spark.internal.Logging */ object IcebergReflection extends Logging { + /** + * Cache for reflection classes and methods to avoid repeated lookups. All Iceberg reflection + * operations are expensive, so we cache them once and reuse. + */ + case class ReflectionCache( + // Iceberg classes + contentScanTaskClass: Class[_], + fileScanTaskClass: Class[_], + contentFileClass: Class[_], + deleteFileClass: Class[_], + schemaParserClass: Class[_], + schemaClass: Class[_], + partitionSpecParserClass: Class[_], + partitionSpecClass: Class[_], + // ContentScanTask methods + fileMethod: java.lang.reflect.Method, + startMethod: java.lang.reflect.Method, + lengthMethod: java.lang.reflect.Method, + partitionMethod: java.lang.reflect.Method, + residualMethod: java.lang.reflect.Method, + // FileScanTask methods + taskSchemaMethod: java.lang.reflect.Method, + deletesMethod: java.lang.reflect.Method, + specMethod: java.lang.reflect.Method, + // ContentFile methods + fileLocationMethod: java.lang.reflect.Method, + // DeleteFile methods + deleteContentMethod: java.lang.reflect.Method, + deleteSpecIdMethod: java.lang.reflect.Method, + deleteEqualityIdsMethod: java.lang.reflect.Method, + // Schema methods + schemaToJsonMethod: java.lang.reflect.Method, + // PartitionSpec methods + partitionSpecToJsonMethod: java.lang.reflect.Method) + + /** + * Creates a ReflectionCache by loading all classes and methods once. 
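+ *
+ * A sketch of the intended call pattern (one cache per scan conversion, reused for
+ * every task, given an in-scope FileScanTask value `task`):
+ * {{{
+ *   val cache = IcebergReflection.createReflectionCache() // one-time class/method lookup
+ *   val start = cache.startMethod.invoke(task).asInstanceOf[Long] // no per-task getMethod()
+ * }}}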
+ */ + def createReflectionCache(): ReflectionCache = { + // scalastyle:off classforname + val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK) + val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK) + val contentFileClass = Class.forName(ClassNames.CONTENT_FILE) + val deleteFileClass = Class.forName(ClassNames.DELETE_FILE) + val schemaParserClass = Class.forName(ClassNames.SCHEMA_PARSER) + val schemaClass = Class.forName(ClassNames.SCHEMA) + val partitionSpecParserClass = Class.forName(ClassNames.PARTITION_SPEC_PARSER) + val partitionSpecClass = Class.forName(ClassNames.PARTITION_SPEC) + // scalastyle:on classforname + + // ContentScanTask methods + val fileMethod = contentScanTaskClass.getMethod("file") + val startMethod = contentScanTaskClass.getMethod("start") + val lengthMethod = contentScanTaskClass.getMethod("length") + val partitionMethod = contentScanTaskClass.getMethod("partition") + val residualMethod = contentScanTaskClass.getMethod("residual") + + // FileScanTask methods + val taskSchemaMethod = fileScanTaskClass.getMethod("schema") + val deletesMethod = fileScanTaskClass.getMethod("deletes") + val specMethod = fileScanTaskClass.getMethod("spec") + + // ContentFile methods - try location() first, fall back to path() + val fileLocationMethod = + try { + contentFileClass.getMethod("location") + } catch { + case _: NoSuchMethodException => contentFileClass.getMethod("path") + } + + // DeleteFile methods + val deleteContentMethod = deleteFileClass.getMethod("content") + val deleteSpecIdMethod = deleteFileClass.getMethod("specId") + val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds") + + // Schema serialization + val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) + schemaToJsonMethod.setAccessible(true) + + // PartitionSpec serialization + val partitionSpecToJsonMethod = + partitionSpecParserClass.getMethod("toJson", partitionSpecClass) + + ReflectionCache( + contentScanTaskClass = contentScanTaskClass, + fileScanTaskClass = fileScanTaskClass, + contentFileClass = contentFileClass, + deleteFileClass = deleteFileClass, + schemaParserClass = schemaParserClass, + schemaClass = schemaClass, + partitionSpecParserClass = partitionSpecParserClass, + partitionSpecClass = partitionSpecClass, + fileMethod = fileMethod, + startMethod = startMethod, + lengthMethod = lengthMethod, + partitionMethod = partitionMethod, + residualMethod = residualMethod, + taskSchemaMethod = taskSchemaMethod, + deletesMethod = deletesMethod, + specMethod = specMethod, + fileLocationMethod = fileLocationMethod, + deleteContentMethod = deleteContentMethod, + deleteSpecIdMethod = deleteSpecIdMethod, + deleteEqualityIdsMethod = deleteEqualityIdsMethod, + schemaToJsonMethod = schemaToJsonMethod, + partitionSpecToJsonMethod = partitionSpecToJsonMethod) + } + /** * Iceberg class names used throughout Comet. */ diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 174dfe8acd..0b9acd3b78 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -69,115 +69,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } } - /** - * Cache for reflection classes and methods to avoid repeated lookups. 
All Iceberg reflection - * operations are expensive, so we cache them once and reuse. - */ - private case class ReflectionCache( - // Iceberg classes - contentScanTaskClass: Class[_], - fileScanTaskClass: Class[_], - contentFileClass: Class[_], - deleteFileClass: Class[_], - schemaParserClass: Class[_], - schemaClass: Class[_], - partitionSpecParserClass: Class[_], - partitionSpecClass: Class[_], - // ContentScanTask methods - fileMethod: java.lang.reflect.Method, - startMethod: java.lang.reflect.Method, - lengthMethod: java.lang.reflect.Method, - partitionMethod: java.lang.reflect.Method, - residualMethod: java.lang.reflect.Method, - // FileScanTask methods - taskSchemaMethod: java.lang.reflect.Method, - deletesMethod: java.lang.reflect.Method, - specMethod: java.lang.reflect.Method, - // ContentFile methods - fileLocationMethod: java.lang.reflect.Method, - // DeleteFile methods - deleteContentMethod: java.lang.reflect.Method, - deleteSpecIdMethod: java.lang.reflect.Method, - deleteEqualityIdsMethod: java.lang.reflect.Method, - // Schema methods - schemaToJsonMethod: java.lang.reflect.Method, - // PartitionSpec methods - partitionSpecToJsonMethod: java.lang.reflect.Method) - - /** - * Creates a ReflectionCache by loading all classes and methods once. - */ - private def createReflectionCache(): ReflectionCache = { - // scalastyle:off classforname - val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK) - val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK) - val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE) - val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE) - val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER) - val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA) - val partitionSpecParserClass = - Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER) - val partitionSpecClass = Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC) - // scalastyle:on classforname - - // ContentScanTask methods - val fileMethod = contentScanTaskClass.getMethod("file") - val startMethod = contentScanTaskClass.getMethod("start") - val lengthMethod = contentScanTaskClass.getMethod("length") - val partitionMethod = contentScanTaskClass.getMethod("partition") - val residualMethod = contentScanTaskClass.getMethod("residual") - - // FileScanTask methods - val taskSchemaMethod = fileScanTaskClass.getMethod("schema") - val deletesMethod = fileScanTaskClass.getMethod("deletes") - val specMethod = fileScanTaskClass.getMethod("spec") - - // ContentFile methods - try location() first, fall back to path() - val fileLocationMethod = - try { - contentFileClass.getMethod("location") - } catch { - case _: NoSuchMethodException => contentFileClass.getMethod("path") - } - - // DeleteFile methods - val deleteContentMethod = deleteFileClass.getMethod("content") - val deleteSpecIdMethod = deleteFileClass.getMethod("specId") - val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds") - - // Schema serialization - val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) - schemaToJsonMethod.setAccessible(true) - - // PartitionSpec serialization - val partitionSpecToJsonMethod = - partitionSpecParserClass.getMethod("toJson", partitionSpecClass) - - ReflectionCache( - contentScanTaskClass = contentScanTaskClass, - fileScanTaskClass = fileScanTaskClass, - contentFileClass = contentFileClass, - deleteFileClass = 
deleteFileClass, - schemaParserClass = schemaParserClass, - schemaClass = schemaClass, - partitionSpecParserClass = partitionSpecParserClass, - partitionSpecClass = partitionSpecClass, - fileMethod = fileMethod, - startMethod = startMethod, - lengthMethod = lengthMethod, - partitionMethod = partitionMethod, - residualMethod = residualMethod, - taskSchemaMethod = taskSchemaMethod, - deletesMethod = deletesMethod, - specMethod = specMethod, - fileLocationMethod = fileLocationMethod, - deleteContentMethod = deleteContentMethod, - deleteSpecIdMethod = deleteSpecIdMethod, - deleteEqualityIdsMethod = deleteEqualityIdsMethod, - schemaToJsonMethod = schemaToJsonMethod, - partitionSpecToJsonMethod = partitionSpecToJsonMethod) - } - /** * Converts an Iceberg partition value to protobuf format. Protobuf is less verbose than JSON. * The following types are also serialized as integer values instead of as strings - Timestamps, @@ -332,7 +223,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit */ private def extractDeleteFilesList( deletes: java.util.List[_], - cache: ReflectionCache): Seq[OperatorOuterClass.IcebergDeleteFile] = { + cache: IcebergReflection.ReflectionCache): Seq[OperatorOuterClass.IcebergDeleteFile] = { try { deletes.asScala.flatMap { deleteFile => try { @@ -409,7 +300,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit */ private def serializePartitionData( task: Any, - cache: ReflectionCache, + cache: IcebergReflection.ReflectionCache, taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder, icebergScanBuilder: OperatorOuterClass.IcebergScan.Builder, partitionTypeToPoolIndex: mutable.HashMap[AnyRef, Int], @@ -824,7 +715,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Extract FileScanTasks from the InputPartitions in the RDD try { // Create reflection cache once for all tasks - val cache = createReflectionCache() + val cache = IcebergReflection.createReflectionCache() scan.wrapped.inputRDD match { case rdd: org.apache.spark.sql.execution.datasources.v2.DataSourceRDD => From ab020dceaafca2c22cff4e7f39082396270c8302 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 09:35:06 -0700 Subject: [PATCH 3/4] chore: trigger CI From c168832ea7c04c8bdd9d5cb5dba542abc02b8186 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 13:20:34 -0700 Subject: [PATCH 4/4] Cache additional partition data reflection methods Add caching for partition data extraction methods that were still being looked up per-field: - PartitionSpec.partitionType() - StructType.fields() - NestedField.type(), fieldId(), name(), isOptional() - StructLike.get(int, Class) These methods are called for every partition field in every task, so caching them provides significant speedup. 
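
For illustration, the per-field hot path in serializePartitionData now reads roughly
as follows (a sketch using the loop variables from that method):

    val fieldId = cache.nestedFieldIdMethod.invoke(field).asInstanceOf[Int]
    val value = cache.structLikeGetMethod
      .invoke(partitionData, Integer.valueOf(idx), classOf[Object])

instead of resolving field.getClass.getMethod("fieldId") and
partitionData.getClass.getMethod("get", classOf[Int], classOf[Class[_]]) for every
field of every task.
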
Co-Authored-By: Claude Opus 4.5 --- .../comet/iceberg/IcebergReflection.scala | 45 +++++++++++++++++-- .../operator/CometIcebergNativeScan.scala | 32 +++++-------- 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index de0db4b715..490eae345d 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -43,6 +43,8 @@ object IcebergReflection extends Logging { schemaClass: Class[_], partitionSpecParserClass: Class[_], partitionSpecClass: Class[_], + structLikeClass: Class[_], + nestedFieldClass: Class[_], // ContentScanTask methods fileMethod: java.lang.reflect.Method, startMethod: java.lang.reflect.Method, @@ -62,7 +64,17 @@ object IcebergReflection extends Logging { // Schema methods schemaToJsonMethod: java.lang.reflect.Method, // PartitionSpec methods - partitionSpecToJsonMethod: java.lang.reflect.Method) + partitionSpecToJsonMethod: java.lang.reflect.Method, + partitionTypeMethod: java.lang.reflect.Method, + // StructType methods + structTypeFieldsMethod: java.lang.reflect.Method, + // NestedField methods + nestedFieldTypeMethod: java.lang.reflect.Method, + nestedFieldIdMethod: java.lang.reflect.Method, + nestedFieldNameMethod: java.lang.reflect.Method, + nestedFieldIsOptionalMethod: java.lang.reflect.Method, + // StructLike methods + structLikeGetMethod: java.lang.reflect.Method) /** * Creates a ReflectionCache by loading all classes and methods once. @@ -77,6 +89,9 @@ object IcebergReflection extends Logging { val schemaClass = Class.forName(ClassNames.SCHEMA) val partitionSpecParserClass = Class.forName(ClassNames.PARTITION_SPEC_PARSER) val partitionSpecClass = Class.forName(ClassNames.PARTITION_SPEC) + val structTypeClass = Class.forName(ClassNames.STRUCT_TYPE) + val nestedFieldClass = Class.forName(ClassNames.NESTED_FIELD) + val structLikeClass = Class.forName(ClassNames.STRUCT_LIKE) // scalastyle:on classforname // ContentScanTask methods @@ -108,9 +123,22 @@ object IcebergReflection extends Logging { val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) schemaToJsonMethod.setAccessible(true) - // PartitionSpec serialization + // PartitionSpec methods val partitionSpecToJsonMethod = partitionSpecParserClass.getMethod("toJson", partitionSpecClass) + val partitionTypeMethod = partitionSpecClass.getMethod("partitionType") + + // StructType methods + val structTypeFieldsMethod = structTypeClass.getMethod("fields") + + // NestedField methods + val nestedFieldTypeMethod = nestedFieldClass.getMethod("type") + val nestedFieldIdMethod = nestedFieldClass.getMethod("fieldId") + val nestedFieldNameMethod = nestedFieldClass.getMethod("name") + val nestedFieldIsOptionalMethod = nestedFieldClass.getMethod("isOptional") + + // StructLike methods + val structLikeGetMethod = structLikeClass.getMethod("get", classOf[Int], classOf[Class[_]]) ReflectionCache( contentScanTaskClass = contentScanTaskClass, @@ -121,6 +149,8 @@ object IcebergReflection extends Logging { schemaClass = schemaClass, partitionSpecParserClass = partitionSpecParserClass, partitionSpecClass = partitionSpecClass, + structLikeClass = structLikeClass, + nestedFieldClass = nestedFieldClass, fileMethod = fileMethod, startMethod = startMethod, lengthMethod = lengthMethod, @@ -134,7 +164,14 @@ object IcebergReflection extends Logging { deleteSpecIdMethod = 
deleteSpecIdMethod, deleteEqualityIdsMethod = deleteEqualityIdsMethod, schemaToJsonMethod = schemaToJsonMethod, - partitionSpecToJsonMethod = partitionSpecToJsonMethod) + partitionSpecToJsonMethod = partitionSpecToJsonMethod, + partitionTypeMethod = partitionTypeMethod, + structTypeFieldsMethod = structTypeFieldsMethod, + nestedFieldTypeMethod = nestedFieldTypeMethod, + nestedFieldIdMethod = nestedFieldIdMethod, + nestedFieldNameMethod = nestedFieldNameMethod, + nestedFieldIsOptionalMethod = nestedFieldIsOptionalMethod, + structLikeGetMethod = structLikeGetMethod) } /** @@ -154,6 +191,8 @@ object IcebergReflection extends Logging { val PARTITION_SPEC = "org.apache.iceberg.PartitionSpec" val PARTITION_FIELD = "org.apache.iceberg.PartitionField" val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate" + val STRUCT_TYPE = "org.apache.iceberg.types.Types$StructType" + val NESTED_FIELD = "org.apache.iceberg.types.Types$NestedField" } /** diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 0b9acd3b78..c68a2d61d5 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -331,20 +331,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val partitionData = cache.partitionMethod.invoke(task) if (partitionData != null) { - // Get the partition type/schema from the spec - val partitionTypeMethod = spec.getClass.getMethod("partitionType") - val partitionType = partitionTypeMethod.invoke(spec) + // Get the partition type/schema from the spec (using cached method) + val partitionType = cache.partitionTypeMethod.invoke(spec) - // Check if partition type has any fields before serializing - val fieldsMethod = partitionType.getClass.getMethod("fields") - val fields = fieldsMethod + // Check if partition type has any fields before serializing (using cached method) + val fields = cache.structTypeFieldsMethod .invoke(partitionType) .asInstanceOf[java.util.List[_]] // Helper to get field type string (shared by both type and data serialization) def getFieldType(field: Any): String = { - val typeMethod = field.getClass.getMethod("type") - typeMethod.invoke(field).toString + cache.nestedFieldTypeMethod.invoke(field).toString } // Only serialize partition type if there are actual partition fields @@ -373,15 +370,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit if (fieldTypeStr == IcebergReflection.TypeNames.UNKNOWN) { None } else { - val fieldIdMethod = field.getClass.getMethod("fieldId") - val fieldId = fieldIdMethod.invoke(field).asInstanceOf[Int] - - val nameMethod = field.getClass.getMethod("name") - val fieldName = nameMethod.invoke(field).asInstanceOf[String] - - val isOptionalMethod = field.getClass.getMethod("isOptional") + val fieldId = cache.nestedFieldIdMethod.invoke(field).asInstanceOf[Int] + val fieldName = cache.nestedFieldNameMethod.invoke(field).asInstanceOf[String] val isOptional = - isOptionalMethod.invoke(field).asInstanceOf[Boolean] + cache.nestedFieldIsOptionalMethod.invoke(field).asInstanceOf[Boolean] val required = !isOptional Some( @@ -433,12 +425,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit None } else { // Use the partition type's field ID (same as in partition_type_json) - val fieldIdMethod = 
field.getClass.getMethod("fieldId") - val fieldId = fieldIdMethod.invoke(field).asInstanceOf[Int] + val fieldId = cache.nestedFieldIdMethod.invoke(field).asInstanceOf[Int] - val getMethod = - partitionData.getClass.getMethod("get", classOf[Int], classOf[Class[_]]) - val value = getMethod.invoke(partitionData, Integer.valueOf(idx), classOf[Object]) + val value = cache.structLikeGetMethod + .invoke(partitionData, Integer.valueOf(idx), classOf[Object]) Some(partitionValueToProto(fieldId, fieldTypeStr, value)) }