From ff31977e0dcfe2fb2def09384113e015c89cc873 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Fri, 13 Mar 2026 14:09:11 +0000 Subject: [PATCH] [GLUTEN-11550][VL][UT] Enable Variant test suites Enable GlutenVariantEndToEndSuite, GlutenVariantShreddingSuite, and GlutenParquetVariantShreddingSuite for both spark40 and spark41. Changes: 1. VeloxValidatorApi: Detect variant shredded structs produced by Spark's PushVariantIntoScan (checking __VARIANT_METADATA_KEY metadata) to trigger fallback to Spark's native Parquet reader. 2. ParquetMetadataUtils: Refactor to single-pass architecture. - Extract parquetFooters() returning Iterator[Either[Exception, ParquetMetadata]] to read each footer once. - Merge validateVariantAnnotation into isUnsupportedMetadata with metadataValidationEnabled parameter. - Variant annotation check is always-on (correctness); codec, timezone, encryption checks are gated by config. - VeloxBackend simplified to single validateMetadata() call. 3. Spark41Shims: Add shouldFallbackForParquetVariantAnnotation to detect Parquet variant logical type annotations. Add needsVariantAnnotationCheck to avoid unnecessary I/O on Spark versions that don't support variant annotations. 4. pom.xml: Add -Dfile.encoding=UTF-8 to test JVM args. On JDK 17 with LANG=C (CI containers centos-8/9), the default charset is US-ASCII causing garbled output for multi-byte characters. JDK 18+ defaults to UTF-8 via JEP 400. 
See: https://github.com/apache/spark/blob/v4.0.1/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java#L508 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../backendsapi/velox/VeloxBackend.scala | 7 +- .../backendsapi/velox/VeloxValidatorApi.scala | 9 + .../gluten/utils/ParquetMetadataUtils.scala | 167 ++++++++---------- .../utils/velox/VeloxTestSettings.scala | 4 +- .../utils/velox/VeloxTestSettings.scala | 6 +- pom.xml | 1 + .../apache/gluten/sql/shims/SparkShims.scala | 4 + .../sql/shims/spark41/Spark41Shims.scala | 17 +- 8 files changed, 111 insertions(+), 104 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 2ab3af7ceaad..4b50317abb10 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -209,10 +209,9 @@ object VeloxBackendSettings extends BackendSettingsApi { } val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get) - val parquetMetadataValidationResult = - ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) - parquetMetadataValidationResult.map( - reason => s"Detected unsupported metadata in parquet files: $reason") + ParquetMetadataUtils + .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) + .map(reason => s"Detected unsupported metadata in parquet files: $reason") } def validateDataSchema(): Option[String] = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 8b2193b58042..0ba86c16c782 100644 --- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -121,6 +121,15 @@ object VeloxValidatorApi { case map: MapType => validateSchema(map.keyType).orElse(validateSchema(map.valueType)) case struct: StructType => + // Detect variant shredded struct produced by Spark's PushVariantIntoScan. + // These structs have all fields annotated with __VARIANT_METADATA_KEY metadata. + // Velox cannot read the variant shredding encoding in Parquet files. + if ( + struct.fields.nonEmpty && + struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) + ) { + return Some(s"Variant shredded struct is not supported: $struct") + } struct.foreach { field => val reason = validateSchema(field.dataType) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index ab76cba4aa5d..471474211ade 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} +import org.apache.hadoop.fs.{LocatedFileStatus, Path} import org.apache.parquet.crypto.ParquetCryptoRuntimeException import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.metadata.ParquetMetadata @@ -32,16 +32,16 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata object ParquetMetadataUtils extends Logging { /** - * Validates whether Parquet metadata is unsupported for the given paths. 
+ * Validates Parquet file metadata for unsupported features. Iterates files once, reads each + * footer once, and runs all applicable checks against it. * - * - If there is at least one Parquet file with encryption enabled, fail the validation. + * Checks always performed (correctness): + * - Variant annotation detection (Spark 4.1+) * - * @param rootPaths - * List of file paths to scan - * @param hadoopConf - * Hadoop configuration - * @return - * [[Option[String]]] Empty if the Parquet metadata is supported. Fallback reason otherwise. + * Checks gated by parquetMetadataValidationEnabled: + * - Encrypted footer / encrypted file + * - Unsupported codec + * - Legacy timezone metadata */ def validateMetadata( rootPaths: Seq[String], @@ -49,28 +49,13 @@ object ParquetMetadataUtils extends Logging { parquetOptions: ParquetOptions, fileLimit: Int ): Option[String] = { - if (!GlutenConfig.get.parquetMetadataValidationEnabled) { - None + val enabled = GlutenConfig.get.parquetMetadataValidationEnabled + if (enabled || SparkShimLoader.getSparkShims.needsVariantAnnotationCheck) { + parquetFooters(rootPaths, hadoopConf, fileLimit) + .map(isUnsupportedMetadata(_, parquetOptions, enabled)) + .find(_.isDefined) + .flatten } else { - rootPaths.foreach { - rootPath => - val fs = new Path(rootPath).getFileSystem(hadoopConf) - try { - val maybeReason = - checkForUnexpectedMetadataWithLimit( - fs, - new Path(rootPath), - hadoopConf, - parquetOptions, - fileLimit = fileLimit) - if (maybeReason.isDefined) { - return maybeReason - } - } catch { - case e: Exception => - logWarning("Catch exception when validating parquet file metadata", e) - } - } None } } @@ -89,77 +74,71 @@ object ParquetMetadataUtils extends Logging { } /** - * Check any Parquet file under the given path is with unexpected metadata using a recursive - * iterator. Only the first `fileLimit` files are processed for efficiency. 
- * - * @param fs - * FileSystem to use - * @param path - * Root path to check - * @param conf - * Hadoop configuration - * @param fileLimit - * Maximum number of files to inspect - * @return - * (String, Int) if an unsupported metadata is detected,empty otherwise and the number of - * checked files + * Iterates over Parquet files under rootPaths, reads footer once per file. Returns an iterator of + * Either[Exception, ParquetMetadata] where Left represents a readFooter failure. */ - private def checkForUnexpectedMetadataWithLimit( - fs: FileSystem, - path: Path, - conf: Configuration, - parquetOptions: ParquetOptions, + private def parquetFooters( + rootPaths: Seq[String], + hadoopConf: Configuration, fileLimit: Int - ): Option[String] = { - val filesIterator = fs.listFiles(path, true) - var checkedFileCount = 0 - while (filesIterator.hasNext && checkedFileCount < fileLimit) { - val fileStatus = filesIterator.next() - checkedFileCount += 1 - val metadataUnsupported = isUnsupportedMetadata(fileStatus, conf, parquetOptions) - if (metadataUnsupported.isDefined) { - return metadataUnsupported - } + ): Iterator[Either[Exception, ParquetMetadata]] = { + rootPaths.iterator.flatMap { + rootPath => + val fs = new Path(rootPath).getFileSystem(hadoopConf) + try { + val filesIterator = fs.listFiles(new Path(rootPath), true) + new Iterator[LocatedFileStatus] { + def hasNext: Boolean = filesIterator.hasNext + def next(): LocatedFileStatus = filesIterator.next() + }.take(fileLimit) + .map { + fileStatus => + try { + Right( + ParquetFooterReaderShim + .readFooter(hadoopConf, fileStatus, ParquetMetadataConverter.NO_FILTER)) + } catch { + case e: Exception => Left(e) + } + } + } catch { + case e: Exception => + logWarning("Catch exception when validating parquet file metadata", e) + Iterator.empty + } } - None } - /** - * Checks whether there are timezones set with Spark key SPARK_TIMEZONE_METADATA_KEY in the - * Parquet metadata. 
In this case, the Parquet scan should fall back to vanilla Spark since Velox - * doesn't yet support Spark legacy datetime. - */ private def isUnsupportedMetadata( - fileStatus: LocatedFileStatus, - conf: Configuration, - parquetOptions: ParquetOptions): Option[String] = { - val footer = - try { - ParquetFooterReaderShim.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER) - } catch { - case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => - return Some("Encrypted Parquet footer detected.") - case _: RuntimeException => - // Ignored as it's could be a "Not a Parquet file" exception. - return None - } - val validationChecks = Seq( - validateCodec(footer), - isTimezoneFoundInMetadata(footer, parquetOptions) - ) - - for (check <- validationChecks) { - if (check.isDefined) { - return check - } - } - - // Previous Spark3.4 version uses toString to check if the data is encrypted, - // so place the check to the end - if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer)) { - return Some("Encrypted Parquet file detected.") + footerOrError: Either[Exception, ParquetMetadata], + parquetOptions: ParquetOptions, + metadataValidationEnabled: Boolean): Option[String] = { + footerOrError match { + case Left(e) + if metadataValidationEnabled && + ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + Some("Encrypted Parquet footer detected.") + case Left(_: RuntimeException) => + // Ignored as it could be a "Not a Parquet file" exception. + None + case Left(e) => + logWarning("Catch exception when validating parquet file metadata", e) + None + case Right(footer) => + // Always-on check (correctness).
+ if (SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)) { + Some("Variant annotation detected in Parquet file.") + } else if (metadataValidationEnabled) { + // Previous Spark3.4 version uses toString to check if the data is encrypted, + // so place the check to the end + validateCodec(footer) + .orElse(isTimezoneFoundInMetadata(footer, parquetOptions)) + .orElse(Option.when(SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer))( + "Encrypted Parquet file detected.")) + } else { + None + } } - None } private def isTimezoneFoundInMetadata( diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 9964ed2c4d82..6238c54b33ca 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -857,8 +857,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] enableSuite[GlutenXmlFunctionsSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index fee8c067a323..ed6ffd537e1a 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -399,7 +399,7 @@ class VeloxTestSettings extends 
BackendTestSettings { .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") .exclude("parquet widening conversion ShortType -> DoubleType") - // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure + enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure @@ -822,8 +822,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] enableSuite[GlutenXmlFunctionsSuite] diff --git a/pom.xml b/pom.xml index df2c3a1a8eb4..d0c5398f27f1 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,7 @@ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true + -Dfile.encoding=UTF-8 file:src/test/resources/log4j2.properties diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index 1f6d015393f1..c257e40ec2c2 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -237,6 +237,10 @@ trait SparkShims { def isParquetFileEncrypted(footer: ParquetMetadata): Boolean + def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false + + def needsVariantAnnotationCheck: 
Boolean = false + def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]] diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala index 0e3e752f9970..32c6905f0597 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.types._ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata} import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType} import java.time.ZoneOffset import java.util.{Map => JMap} @@ -571,6 +571,21 @@ class Spark41Shims extends SparkShims { } } + override def needsVariantAnnotationCheck: Boolean = + !SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION) + + override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = + needsVariantAnnotationCheck && containsVariantAnnotation(footer.getFileMetaData.getSchema) + + private def containsVariantAnnotation(groupType: GroupType): Boolean = { + groupType.getFields.asScala.exists { + field => + Option(field.getLogicalTypeAnnotation) + .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) || + (!field.isPrimitive && containsVariantAnnotation(field.asGroupType())) + } + } + override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]