Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
val parquetMetadataValidationResult =
ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
parquetMetadataValidationResult.map(
reason => s"Detected unsupported metadata in parquet files: $reason")
ParquetMetadataUtils
.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
.map(reason => s"Detected unsupported metadata in parquet files: $reason")
}

def validateDataSchema(): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ object VeloxValidatorApi {
case map: MapType =>
validateSchema(map.keyType).orElse(validateSchema(map.valueType))
case struct: StructType =>
// Detect variant shredded struct produced by Spark's PushVariantIntoScan.
// These structs have all fields annotated with __VARIANT_METADATA_KEY metadata.
// Velox cannot read the variant shredding encoding in Parquet files.
if (
struct.fields.nonEmpty &&
struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY"))
) {
return Some(s"Variant shredded struct is not supported: $struct")
}
struct.foreach {
field =>
val reason = validateSchema(field.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,53 +24,38 @@ import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.parquet.crypto.ParquetCryptoRuntimeException
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.metadata.ParquetMetadata

object ParquetMetadataUtils extends Logging {

/**
* Validates whether Parquet metadata is unsupported for the given paths.
* Validates Parquet file metadata for unsupported features. Iterates files once, reads each
* footer once, and runs all applicable checks against it.
*
* - If there is at least one Parquet file with encryption enabled, fail the validation.
* Checks always performed (correctness):
* - Variant annotation detection (Spark 4.1+)
*
* @param rootPaths
* List of file paths to scan
* @param hadoopConf
* Hadoop configuration
* @return
* [[Option[String]]] Empty if the Parquet metadata is supported. Fallback reason otherwise.
* Checks gated by parquetMetadataValidationEnabled:
* - Encrypted footer / encrypted file
* - Unsupported codec
* - Legacy timezone metadata
*/
def validateMetadata(
rootPaths: Seq[String],
hadoopConf: Configuration,
parquetOptions: ParquetOptions,
fileLimit: Int
): Option[String] = {
if (!GlutenConfig.get.parquetMetadataValidationEnabled) {
None
val enabled = GlutenConfig.get.parquetMetadataValidationEnabled
if (enabled || SparkShimLoader.getSparkShims.needsVariantAnnotationCheck) {
parquetFooters(rootPaths, hadoopConf, fileLimit)
.map(isUnsupportedMetadata(_, parquetOptions, enabled))
.find(_.isDefined)
.flatten
} else {
rootPaths.foreach {
rootPath =>
val fs = new Path(rootPath).getFileSystem(hadoopConf)
try {
val maybeReason =
checkForUnexpectedMetadataWithLimit(
fs,
new Path(rootPath),
hadoopConf,
parquetOptions,
fileLimit = fileLimit)
if (maybeReason.isDefined) {
return maybeReason
}
} catch {
case e: Exception =>
logWarning("Catch exception when validating parquet file metadata", e)
}
}
None
}
}
Expand All @@ -89,77 +74,71 @@ object ParquetMetadataUtils extends Logging {
}

/**
* Check whether any Parquet file under the given path has unexpected metadata, using a recursive
* iterator. Only the first `fileLimit` files are processed for efficiency.
*
* @param fs
* FileSystem to use
* @param path
* Root path to check
* @param conf
* Hadoop configuration
* @param fileLimit
* Maximum number of files to inspect
* @return
(String, Int) if unsupported metadata is detected, empty otherwise, along with the number of
checked files
* Iterates over Parquet files under rootPaths, reads footer once per file. Returns an iterator of
* Either[Exception, ParquetMetadata] where Left represents a readFooter failure.
*/
private def checkForUnexpectedMetadataWithLimit(
fs: FileSystem,
path: Path,
conf: Configuration,
parquetOptions: ParquetOptions,
private def parquetFooters(
rootPaths: Seq[String],
hadoopConf: Configuration,
fileLimit: Int
): Option[String] = {
val filesIterator = fs.listFiles(path, true)
var checkedFileCount = 0
while (filesIterator.hasNext && checkedFileCount < fileLimit) {
val fileStatus = filesIterator.next()
checkedFileCount += 1
val metadataUnsupported = isUnsupportedMetadata(fileStatus, conf, parquetOptions)
if (metadataUnsupported.isDefined) {
return metadataUnsupported
}
): Iterator[Either[Exception, ParquetMetadata]] = {
rootPaths.iterator.flatMap {
rootPath =>
val fs = new Path(rootPath).getFileSystem(hadoopConf)
try {
val filesIterator = fs.listFiles(new Path(rootPath), true)
new Iterator[LocatedFileStatus] {
def hasNext: Boolean = filesIterator.hasNext
def next(): LocatedFileStatus = filesIterator.next()
}.take(fileLimit)
.map {
fileStatus =>
try {
Right(
ParquetFooterReaderShim
.readFooter(hadoopConf, fileStatus, ParquetMetadataConverter.NO_FILTER))
} catch {
case e: Exception => Left(e)
}
}
} catch {
case e: Exception =>
logWarning("Catch exception when validating parquet file metadata", e)
Iterator.empty
}
}
None
}

/**
* Checks whether there are timezones set with Spark key SPARK_TIMEZONE_METADATA_KEY in the
* Parquet metadata. In this case, the Parquet scan should fall back to vanilla Spark since Velox
* doesn't yet support Spark legacy datetime.
*/
private def isUnsupportedMetadata(
fileStatus: LocatedFileStatus,
conf: Configuration,
parquetOptions: ParquetOptions): Option[String] = {
val footer =
try {
ParquetFooterReaderShim.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
} catch {
case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
return Some("Encrypted Parquet footer detected.")
case _: RuntimeException =>
// Ignored as it could be a "Not a Parquet file" exception.
return None
}
val validationChecks = Seq(
validateCodec(footer),
isTimezoneFoundInMetadata(footer, parquetOptions)
)

for (check <- validationChecks) {
if (check.isDefined) {
return check
}
}

// Earlier Spark 3.4 versions use toString to check whether the data is encrypted,
// so this check is placed at the end
if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer)) {
return Some("Encrypted Parquet file detected.")
footerOrError: Either[Exception, ParquetMetadata],
parquetOptions: ParquetOptions,
metadataValidationEnabled: Boolean): Option[String] = {
footerOrError match {
case Left(e)
if metadataValidationEnabled &&
ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
Some("Encrypted Parquet footer detected.")
case Left(_: RuntimeException) =>
// Ignored as it could be a "Not a Parquet file" exception.
None
case Left(e) =>
logWarning("Catch exception when validating parquet file metadata", e)
None
case Right(footer) =>
// Always-on check (correctness).
if (SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)) {
Some("Variant annotation detected in Parquet file.")
} else if (metadataValidationEnabled) {
// Earlier Spark 3.4 versions use toString to check whether the data is encrypted,
// so this check is placed at the end
validateCodec(footer)
.orElse(isTimezoneFoundInMetadata(footer, parquetOptions))
.orElse(Option.when(SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer))(
"Encrypted Parquet file detected."))
} else {
None
}
}
None
}

private def isTimezoneFoundInMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
enableSuite[GlutenUserDefinedTypeSuite]
// TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
// TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
enableSuite[GlutenVariantEndToEndSuite]
enableSuite[GlutenVariantShreddingSuite]
enableSuite[GlutenVariantSuite]
enableSuite[GlutenVariantWriteShreddingSuite]
enableSuite[GlutenXmlFunctionsSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
.exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
.exclude("parquet widening conversion ShortType -> DoubleType")
// TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure
enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
// TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
// TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
Expand Down Expand Up @@ -822,8 +822,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
enableSuite[GlutenUserDefinedTypeSuite]
// TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
// TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
enableSuite[GlutenVariantEndToEndSuite]
enableSuite[GlutenVariantShreddingSuite]
enableSuite[GlutenVariantSuite]
enableSuite[GlutenVariantWriteShreddingSuite]
enableSuite[GlutenXmlFunctionsSuite]
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true
-Dfile.encoding=UTF-8
</extraJavaTestArgs>
<log4j.conf>file:src/test/resources/log4j2.properties</log4j.conf>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ trait SparkShims {

def isParquetFileEncrypted(footer: ParquetMetadata): Boolean

/**
 * Whether scans of the Parquet file described by `footer` must fall back to vanilla Spark
 * because its schema carries a VARIANT logical-type annotation. Default is `false`; shims for
 * Spark versions that support variant (e.g. the Spark 4.1 shim visible in this file's
 * counterpart) override this to inspect the footer schema.
 */
def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false

/**
 * Whether the variant-annotation footer check needs to run at all for this Spark version.
 * Default is `false`, so metadata validation skips the check entirely on shims that do not
 * override it.
 */
def needsVariantAnnotationCheck: Boolean = false

def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType}

import java.time.ZoneOffset
import java.util.{Map => JMap}
Expand Down Expand Up @@ -571,6 +571,21 @@ class Spark41Shims extends SparkShims {
}
}

/**
 * The variant-annotation footer check is required unless the session explicitly opts out
 * through SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.
 */
override def needsVariantAnnotationCheck: Boolean = {
  val ignoreVariantAnnotation = SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)
  !ignoreVariantAnnotation
}

/**
 * Falls back for the given Parquet file when the variant check is enabled and the footer's
 * schema contains a VARIANT logical-type annotation anywhere in its tree.
 */
override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean =
  if (needsVariantAnnotationCheck) {
    containsVariantAnnotation(footer.getFileMetaData.getSchema)
  } else {
    false
  }

/**
 * Recursively scans the fields of a Parquet group type and reports whether any field (at any
 * nesting depth) is annotated with the VARIANT logical type. A field with no logical-type
 * annotation (null from parquet-mr) is treated as non-variant; the type-pattern match below
 * handles null safely because `null` never matches a type pattern.
 */
private def containsVariantAnnotation(groupType: GroupType): Boolean =
  groupType.getFields.asScala.exists { field =>
    val variantHere = field.getLogicalTypeAnnotation match {
      case _: LogicalTypeAnnotation.VariantLogicalTypeAnnotation => true
      case _ => false
    }
    variantHere || (!field.isPrimitive && containsVariantAnnotation(field.asGroupType()))
  }

override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]

Expand Down
Loading