Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.csv

import com.univocity.parsers.common.AbstractParser
import com.univocity.parsers.common.{AbstractParser, TextParsingException}
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

import org.apache.spark.SparkIllegalArgumentException
Expand Down Expand Up @@ -122,15 +122,24 @@ class CSVHeaderChecker(
// Checks a raw header line against the expected column names. Does nothing unless the `header`
// option is enabled. The line is tokenized with a fresh CsvParser built from the current
// options and the resulting tokens are handed to the token-based overload.
def checkHeaderColumnNames(line: String): Unit = {
  if (options.headerFlag) {
    val parser = new CsvParser(options.asParserSettings)
    // Tokenize exactly once, and only through UnivocityParser.parseLine: it carries the
    // ArrayIndexOutOfBoundsException guard, so a header wider than maxColumns surfaces as
    // MALFORMED_CSV_RECORD. A bare parser.parseLine(line) call placed before it would throw the
    // raw parser exception first and would also validate the header a second time.
    checkHeaderColumnNames(UnivocityParser.parseLine(parser, line))
  }
}

// This is currently only used to parse CSV with multiLine mode.
private[csv] def checkHeaderColumnNames(tokenizer: AbstractParser[CsvParserSettings]): Unit = {
assert(options.multiLine, "This method should be executed with multiLine.")
if (options.headerFlag) {
val firstRecord = tokenizer.parseNext()
val firstRecord = try {
tokenizer.parseNext()
} catch {
// scalastyle:off line.size.limit
case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
// scalastyle:on line.size.limit
throw UnivocityParser.malformedCsvRecord(e, Option(e.getParsedContent).getOrElse(""))
case e: ArrayIndexOutOfBoundsException =>
throw UnivocityParser.malformedCsvRecord(e, "")
}
checkHeaderColumnNames(firstRecord)
}
setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames))
Expand All @@ -146,9 +155,20 @@ class CSVHeaderChecker(
// be not extracted.
if (options.headerFlag && isStartOfFile) {
CSVExprUtils.extractHeader(lines, options).foreach { header =>
checkHeaderColumnNames(tokenizer.parseLine(header))
val tokens = try {
tokenizer.parseLine(header)
} catch {
// scalastyle:off line.size.limit
case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
// scalastyle:on line.size.limit
throw UnivocityParser.malformedCsvRecord(e, header)
case e: ArrayIndexOutOfBoundsException =>
throw UnivocityParser.malformedCsvRecord(e, header)
}
checkHeaderColumnNames(tokens)
}
}
setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,8 @@ private[sql] object UnivocityParser {
* is bounded to CSVOptions.MAX_ERROR_CONTENT_LENGTH so an oversized value cannot produce a huge
* error message (SPARK-28431).
*/
private def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = {
private[csv] def malformedCsvRecord(
cause: Throwable, badRecord: String): SparkRuntimeException = {
val boundedRecord = if (badRecord.length > CSVOptions.MAX_ERROR_CONTENT_LENGTH) {
badRecord.take(CSVOptions.MAX_ERROR_CONTENT_LENGTH) + "..."
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ object TextInputCSVDataSource extends CSVDataSource {
maybeFirstLine: Option[String],
parsedOptions: CSVOptions): StructType = {
val csvParser = new CsvParser(parsedOptions.asParserSettings)
maybeFirstLine.map(csvParser.parseLine(_)) match {
maybeFirstLine.map(UnivocityParser.parseLine(csvParser, _)) match {
case Some(firstRow) if firstRow != null =>
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
Expand All @@ -362,9 +362,6 @@ object TextInputCSVDataSource extends CSVDataSource {
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
// Route data rows through UnivocityParser.parseLine so a too-many-columns row surfaces as
// MALFORMED_CSV_RECORD, not a raw ArrayIndexOutOfBoundsException (SPARK-57195). The
// first-line parse above stays raw to keep SPARK-28431's bounded TextParsingException.
linesWithoutHeader.map(UnivocityParser.parseLine(parser, _))
}
SQLExecution.withSQLConfPropagated(csv.sparkSession) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3588,6 +3588,79 @@ abstract class CSVSuite
matchPVals = true)
}

test("SPARK-57515: non-multiLine CSV read with header exceeding maxColumns surfaces " +
  "MALFORMED_CSV_RECORD") {
  // Regression test: inferFromDataset used to tokenize the header with a bare
  // csvParser.parseLine(header), bypassing the AIOOBE guard that UnivocityParser.parseLine
  // provides. The header below has three columns while maxColumns is 2, so the read has to
  // fail with MALFORMED_CSV_RECORD rather than a raw TextParsingException.
  withTempPath { csvFile =>
    val content = "a,b,c\n1,2,3\n"
    Files.write(csvFile.toPath, content.getBytes(StandardCharsets.UTF_8))

    val thrown = intercept[SparkRuntimeException] {
      val df = spark.read
        .option("header", "true")
        .option("maxColumns", "2")
        .csv(csvFile.getAbsolutePath)
      df.collect()
    }

    // The offending header line is reported verbatim, so compare it literally.
    val expectedParams = Map("badRecord" -> "a,b,c")
    checkError(
      exception = thrown,
      condition = "MALFORMED_CSV_RECORD",
      sqlState = Some("KD000"),
      parameters = expectedParams,
      matchPVals = false)
  }
}

test("SPARK-57515: multiLine CSV read with header exceeding maxColumns surfaces " +
  "MALFORMED_CSV_RECORD") {
  // In multiLine mode schema inference runs inside an RDD task, so the
  // SparkRuntimeException(MALFORMED_CSV_RECORD) arrives wrapped in a
  // SparkException(FAILED_READ_FILE). Check the wrapper first, then its direct cause.
  withTempPath { csvFile =>
    val content = "a,b,c\n1,2,3\n"
    Files.write(csvFile.toPath, content.getBytes(StandardCharsets.UTF_8))

    val wrapper = intercept[SparkException] {
      val df = spark.read
        .option("header", "true")
        .option("multiLine", "true")
        .option("maxColumns", "2")
        .csv(csvFile.getAbsolutePath)
      df.collect()
    }

    // Outer layer: the file-read failure; the path is matched as a regex.
    checkErrorMatchPVals(
      exception = wrapper,
      condition = "FAILED_READ_FILE.NO_HINT",
      parameters = Map("path" -> ".*"))

    // Inner layer: the malformed-record error; badRecord is matched as a regex.
    val rootCause = wrapper.getCause
    assert(rootCause.isInstanceOf[SparkRuntimeException])
    checkError(
      exception = rootCause.asInstanceOf[SparkRuntimeException],
      condition = "MALFORMED_CSV_RECORD",
      sqlState = Some("KD000"),
      parameters = Map("badRecord" -> ".*"),
      matchPVals = true)
  }
}

test("SPARK-57515: Dataset[String] CSV read with header exceeding maxColumns surfaces " +
  "MALFORMED_CSV_RECORD") {
  // Same regression as the file-based read, but fed from an in-memory Dataset[String]:
  // inferFromDataset used to call csvParser.parseLine(header) directly, without the AIOOBE
  // guard that UnivocityParser.parseLine provides.
  val csvLines = Seq("a,b,c", "1,2,3")
  val lines = spark.createDataset(csvLines)

  val thrown = intercept[SparkRuntimeException] {
    val df = spark.read
      .option("header", "true")
      .option("maxColumns", "2")
      .csv(lines)
    df.collect()
  }

  // The three-column header is reported verbatim as the bad record.
  val expectedParams = Map("badRecord" -> "a,b,c")
  checkError(
    exception = thrown,
    condition = "MALFORMED_CSV_RECORD",
    sqlState = Some("KD000"),
    parameters = expectedParams,
    matchPVals = false)
}

test("csv with variant") {
withTempPath { path =>
val data =
Expand Down