diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala index bec52747dea7c..9f4eb66775ce4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.csv -import com.univocity.parsers.common.AbstractParser +import com.univocity.parsers.common.{AbstractParser, TextParsingException} import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} import org.apache.spark.SparkIllegalArgumentException @@ -122,7 +122,7 @@ class CSVHeaderChecker( def checkHeaderColumnNames(line: String): Unit = { if (options.headerFlag) { val parser = new CsvParser(options.asParserSettings) - checkHeaderColumnNames(parser.parseLine(line)) + checkHeaderColumnNames(UnivocityParser.parseLine(parser, line)) } } @@ -130,7 +130,16 @@ class CSVHeaderChecker( private[csv] def checkHeaderColumnNames(tokenizer: AbstractParser[CsvParserSettings]): Unit = { assert(options.multiLine, "This method should be executed with multiLine.") if (options.headerFlag) { - val firstRecord = tokenizer.parseNext() + val firstRecord = try { + tokenizer.parseNext() + } catch { + // scalastyle:off line.size.limit + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + // scalastyle:on line.size.limit + throw UnivocityParser.malformedCsvRecord(e, Option(e.getParsedContent).getOrElse("")) + case e: ArrayIndexOutOfBoundsException => + throw UnivocityParser.malformedCsvRecord(e, "") + } checkHeaderColumnNames(firstRecord) } setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames)) @@ -146,9 +155,20 @@ class CSVHeaderChecker( // be not extracted. if (options.headerFlag && isStartOfFile) { CSVExprUtils.extractHeader(lines, options).foreach { header => - checkHeaderColumnNames(tokenizer.parseLine(header)) + val tokens = try { + tokenizer.parseLine(header) + } catch { + // scalastyle:off line.size.limit + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + // scalastyle:on line.size.limit + throw UnivocityParser.malformedCsvRecord(e, header) + case e: ArrayIndexOutOfBoundsException => + throw UnivocityParser.malformedCsvRecord(e, header) + } + checkHeaderColumnNames(tokens) } } setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames)) } + } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 113f9b088738b..a028f77495a48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -654,7 +654,8 @@ private[sql] object UnivocityParser { * is bounded to CSVOptions.MAX_ERROR_CONTENT_LENGTH so an oversized value cannot produce a huge * error message (SPARK-28431). */ - private def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = { + private[csv] def malformedCsvRecord( + cause: Throwable, badRecord: String): SparkRuntimeException = { val boundedRecord = if (badRecord.length > CSVOptions.MAX_ERROR_CONTENT_LENGTH) { badRecord.take(CSVOptions.MAX_ERROR_CONTENT_LENGTH) + "..." } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 7424d43341b26..5f7f046ad49ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -352,7 +352,7 @@ object TextInputCSVDataSource extends CSVDataSource { maybeFirstLine: Option[String], parsedOptions: CSVOptions): StructType = { val csvParser = new CsvParser(parsedOptions.asParserSettings) - maybeFirstLine.map(csvParser.parseLine(_)) match { + maybeFirstLine.map(UnivocityParser.parseLine(csvParser, _)) match { case Some(firstRow) if firstRow != null => val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions) @@ -362,9 +362,6 @@ object TextInputCSVDataSource extends CSVDataSource { val linesWithoutHeader = CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions) val parser = new CsvParser(parsedOptions.asParserSettings) - // Route data rows through UnivocityParser.parseLine so a too-many-columns row surfaces as - // MALFORMED_CSV_RECORD, not a raw ArrayIndexOutOfBoundsException (SPARK-57195). The - // first-line parse above stays raw to keep SPARK-28431's bounded TextParsingException. linesWithoutHeader.map(UnivocityParser.parseLine(parser, _)) } SQLExecution.withSQLConfPropagated(csv.sparkSession) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e48b453309aa8..ee775f8e28605 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3588,6 +3588,79 @@ abstract class CSVSuite matchPVals = true) } + test("SPARK-57515: non-multiLine CSV read with header exceeding maxColumns surfaces " + + "MALFORMED_CSV_RECORD") { + // inferFromDataset called csvParser.parseLine(header) directly without the AIOOBE guard + // that UnivocityParser.parseLine wraps. A header line wider than maxColumns must surface + // as MALFORMED_CSV_RECORD, not a raw TextParsingException. + withTempPath { path => + Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkRuntimeException] { + spark.read + .option("header", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + .collect() + } + checkError( + exception = e, + condition = "MALFORMED_CSV_RECORD", + sqlState = Some("KD000"), + parameters = Map("badRecord" -> "a,b,c"), + matchPVals = false) + } + } + + test("SPARK-57515: multiLine CSV read with header exceeding maxColumns surfaces " + + "MALFORMED_CSV_RECORD") { + // For multiLine reads, schema inference runs inside an RDD task, so the + // SparkRuntimeException(MALFORMED_CSV_RECORD) is wrapped in SparkException(FAILED_READ_FILE). + // Verify the cause chain surfaces the MALFORMED_CSV_RECORD condition. + withTempPath { path => + Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkException] { + spark.read + .option("header", "true") + .option("multiLine", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + .collect() + } + checkErrorMatchPVals( + exception = e, + condition = "FAILED_READ_FILE.NO_HINT", + parameters = Map("path" -> ".*")) + val cause = e.getCause + assert(cause.isInstanceOf[SparkRuntimeException]) + checkError( + exception = cause.asInstanceOf[SparkRuntimeException], + condition = "MALFORMED_CSV_RECORD", + sqlState = Some("KD000"), + parameters = Map("badRecord" -> ".*"), + matchPVals = true) + } + } + + test("SPARK-57515: Dataset[String] CSV read with header exceeding maxColumns surfaces " + + "MALFORMED_CSV_RECORD") { + // inferFromDataset called csvParser.parseLine(header) directly without the AIOOBE guard + // that UnivocityParser.parseLine wraps. + val lines = spark.createDataset(Seq("a,b,c", "1,2,3")) + val e = intercept[SparkRuntimeException] { + spark.read + .option("header", "true") + .option("maxColumns", "2") + .csv(lines) + .collect() + } + checkError( + exception = e, + condition = "MALFORMED_CSV_RECORD", + sqlState = Some("KD000"), + parameters = Map("badRecord" -> "a,b,c"), + matchPVals = false) + } + test("csv with variant") { withTempPath { path => val data =