Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/config/src/main/resources/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ dataset {
multipart_upload_chunk_size_mib = 50
multipart_upload_chunk_size_mib = ${?DATASET_MULTIPART_UPLOAD_CHUNK_SIZE_MIB}
}

csv {
csv_parser_max_columns = 512
csv_parser_max_columns = ${?CSV_PARSER_MAX_COLUMNS}
}
54 changes: 54 additions & 0 deletions common/dao/src/main/scala/org/apache/texera/dao/SiteSettings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.texera.dao

import org.jooq.impl.DSL

import scala.util.Try

/**
 * Read-only helper over the `site_settings` key/value table (written through
 * the admin pages). Encapsulates the recurring pattern of "fetch the value
 * for a key, parse it, and fall back to a caller-supplied default on any
 * failure", which previously lived inline in ConfigResource,
 * CSVScanSourceOpExec, and DatasetResource.
 *
 * Every failure is absorbed by the surrounding Try: an uninitialised
 * SqlServer (e.g. on workers in distributed mode), a missing row for the
 * key, or a value that does not parse. In all such cases the caller's
 * default wins. The default is by-name, so it is evaluated only when needed.
 */
object SiteSettings {

  /** Integer-valued setting lookup; `default` is used on any read/parse failure. */
  def getInt(key: String, default: => Int): Int =
    readAndParse(key, default)(_.toInt)

  /** Long-valued setting lookup; `default` is used on any read/parse failure. */
  def getLong(key: String, default: => Long): Long =
    readAndParse(key, default)(_.toLong)

  /**
   * Fetches the raw string stored under `key` and applies `parse` to its
   * trimmed value. Any exception along the way (DB access, absent row,
   * parse error) yields `default` instead.
   */
  private def readAndParse[T](key: String, default: => T)(parse: String => T): T = {
    val attempt = Try {
      val dsl = SqlServer.getInstance().createDSLContext()
      val stored = dsl
        .select(DSL.field("value", classOf[String]))
        .from(DSL.table(DSL.name("texera_db", "site_settings")))
        .where(DSL.field("key", classOf[String]).eq(key))
        .fetchOneInto(classOf[String])
      stored match {
        case null  => default // no row for this key
        case value => parse(value.trim)
      }
    }
    attempt.getOrElse(default)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import org.apache.texera.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc}
import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpExec
import org.apache.texera.amber.util.JSONUtils.objectMapper

import java.io.{IOException, InputStreamReader}
Expand Down Expand Up @@ -89,6 +90,8 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {
csvFormat.setLineSeparator("\n")
val csvSetting = new CsvParserSettings()
csvSetting.setMaxCharsPerColumn(-1)
val maxColumns = CSVScanSourceOpExec.getMaxColumns
csvSetting.setMaxColumns(maxColumns)
csvSetting.setFormat(csvFormat)
csvSetting.setHeaderExtractionEnabled(hasHeader)
csvSetting.setNullValue("")
Expand All @@ -97,8 +100,8 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {

var data: Array[Array[String]] = Array()
val readLimit = limit.getOrElse(INFER_READ_LIMIT).min(INFER_READ_LIMIT)
for (i <- 0 until readLimit) {
val row = parser.parseNext()
for (_ <- 0 until readLimit) {
val row = CSVScanSourceOpExec.parseNextRow(parser, maxColumns)
if (row != null) {
data = data :+ row
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,26 @@

package org.apache.texera.amber.operator.source.scan.csv

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.texera.dao.SiteSettings

import java.io.InputStreamReader
import java.net.URI
import scala.collection.immutable.ArraySeq
import scala.util.Try

class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperatorExecutor {
val desc: CSVScanSourceOpDesc = objectMapper.readValue(descString, classOf[CSVScanSourceOpDesc])
var inputReader: InputStreamReader = _
var parser: CsvParser = _
var nextRow: Array[String] = _
var numRowGenerated = 0
private var maxColumns: Int = CSVScanSourceOpExec.DEFAULT_MAX_COLUMNS
private val schema: Schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] = {
Expand All @@ -44,7 +48,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
if (nextRow != null) {
return true
}
nextRow = parser.parseNext()
nextRow = CSVScanSourceOpExec.parseNextRow(parser, maxColumns)
nextRow != null
}

Expand Down Expand Up @@ -90,6 +94,8 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
) // disable skipping lines starting with # (default comment character)
val csvSetting = new CsvParserSettings()
csvSetting.setMaxCharsPerColumn(-1)
maxColumns = CSVScanSourceOpExec.getMaxColumns
csvSetting.setMaxColumns(maxColumns)
csvSetting.setFormat(csvFormat)
csvSetting.setHeaderExtractionEnabled(desc.hasHeader)

Expand All @@ -106,3 +112,41 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
}
}
}

object CSVScanSourceOpExec {

  /** Fallback column cap when no site setting is available (Univocity's own default). */
  val DEFAULT_MAX_COLUMNS = 512

  /** Current column cap: the `csv_parser_max_columns` site setting, or the default. */
  def getMaxColumns: Int =
    SiteSettings.getInt("csv_parser_max_columns", DEFAULT_MAX_COLUMNS)

  /**
   * Wrapper around `parser.parseNext()` that reports a column-count overflow
   * as a short, actionable instruction instead of a deep Univocity stack
   * trace. Every other parser failure propagates unchanged.
   *
   * The thrown RuntimeException's message bubbles up through
   * DataProcessor.handleExecutorException and becomes the title of the
   * console message that drives the top-of-page toast.
   */
  def parseNextRow(parser: CsvParser, maxColumns: Int): Array[String] =
    try parser.parseNext()
    catch {
      case e: TextParsingException if isColumnOverflow(e, maxColumns) =>
        throw new RuntimeException(columnOverflowMessage(maxColumns), e)
    }

  /**
   * A TextParsingException counts as a column overflow when its cause is an
   * ArrayIndexOutOfBoundsException whose index equals `maxColumns` — or whose
   * index cannot be recovered from the message at all, in which case overflow
   * is assumed (mirrors the original `forall` semantics).
   */
  private[csv] def isColumnOverflow(e: TextParsingException, maxColumns: Int): Boolean =
    e.getCause match {
      case oob: ArrayIndexOutOfBoundsException =>
        aioobeIndex(oob) match {
          case Some(index) => index == maxColumns
          case None        => true // index not recoverable: assume overflow
        }
      case _ => false // null cause or an unrelated cause: not an overflow
    }

  /**
   * Extracts the out-of-bounds index from the AIOOBE message. Java 8 reports
   * a bare integer ("5"); Java 9+ reports "Index 5 out of bounds for length 5".
   */
  private def aioobeIndex(oob: ArrayIndexOutOfBoundsException): Option[Int] = {
    val message = Option(oob.getMessage).getOrElse("")
    Try(message.trim.toInt).toOption
      .orElse(raw"Index (\d+) out of bounds".r.findFirstMatchIn(message).map(_.group(1).toInt))
  }

  /** User-facing overflow message; must mention the configured limit. */
  private[csv] def columnOverflowMessage(maxColumns: Int): String =
    s"Max columns of $maxColumns exceeded."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.source.scan.csv

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import org.scalatest.flatspec.AnyFlatSpec

import java.io.StringReader

/**
 * Covers the column-overflow translation in [[CSVScanSourceOpExec.parseNextRow]]:
 * the path that replaces a deep Univocity stack trace with a single-sentence
 * message the workflow user can act on.
 */
class CSVScanSourceOpExecSpec extends AnyFlatSpec {

  /** Builds a parser capped at `max` columns, with unlimited chars per column. */
  private def makeParser(max: Int): CsvParser = {
    val settings = new CsvParserSettings()
    settings.setMaxColumns(max)
    settings.setMaxCharsPerColumn(-1)
    new CsvParser(settings)
  }

  "parseNextRow" should "return the parsed row when the input is within the column limit" in {
    val parser = makeParser(10)
    parser.beginParsing(new StringReader("a,b,c\n"))

    val parsed = CSVScanSourceOpExec.parseNextRow(parser, 10)

    assert(parsed.toSeq == Seq("a", "b", "c"))
  }

  it should "return null at end of input (so the iterator can terminate cleanly)" in {
    val parser = makeParser(10)
    parser.beginParsing(new StringReader(""))

    assert(CSVScanSourceOpExec.parseNextRow(parser, 10) == null)
  }

  it should "translate a column-overflow TextParsingException into a clear user message" in {
    val maxColumns = 2
    val parser = makeParser(maxColumns)
    parser.beginParsing(new StringReader("a,b,c,d,e\n"))

    val thrown = intercept[RuntimeException] {
      CSVScanSourceOpExec.parseNextRow(parser, maxColumns)
    }

    val message = thrown.getMessage
    // The configured limit must appear so the user knows which cap was hit.
    assert(message.contains(maxColumns.toString))
    assert(message.toLowerCase.contains("max columns"))
    assert(message.toLowerCase.contains("exceeded"))
    // The Univocity exception stays attached as the cause so developers can
    // still inspect the underlying parser state if needed.
    assert(thrown.getCause.isInstanceOf[TextParsingException])
  }

  "isColumnOverflow" should "detect AIOOBE causes from Java 8's plain-integer message" in {
    val cause = new ArrayIndexOutOfBoundsException("5")
    val wrapped = new TextParsingException(null, "wrapper", cause)
    assert(CSVScanSourceOpExec.isColumnOverflow(wrapped, maxColumns = 5))
    assert(!CSVScanSourceOpExec.isColumnOverflow(wrapped, maxColumns = 6))
  }

  it should "detect AIOOBE causes from Java 9+'s 'Index N out of bounds for length M' message" in {
    val cause = new ArrayIndexOutOfBoundsException("Index 5 out of bounds for length 5")
    val wrapped = new TextParsingException(null, "wrapper", cause)
    assert(CSVScanSourceOpExec.isColumnOverflow(wrapped, maxColumns = 5))
    assert(!CSVScanSourceOpExec.isColumnOverflow(wrapped, maxColumns = 6))
  }

  it should "ignore TextParsingExceptions whose cause is unrelated" in {
    val noCause = new TextParsingException(null, "Some other parsing problem")
    val otherCause =
      new TextParsingException(null, "wrapper", new IllegalStateException("nope"))
    assert(!CSVScanSourceOpExec.isColumnOverflow(noCause, maxColumns = 5))
    assert(!CSVScanSourceOpExec.isColumnOverflow(otherCause, maxColumns = 5))
  }

  "columnOverflowMessage" should "include the configured maximum so the user knows the current limit" in {
    val message = CSVScanSourceOpExec.columnOverflowMessage(750)
    assert(message.contains("750"))
    assert(message.toLowerCase.contains("max columns"))
    assert(message.toLowerCase.contains("exceeded"))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import jakarta.annotation.security.RolesAllowed
import jakarta.ws.rs.core.MediaType
import jakarta.ws.rs.{GET, Path, Produces}
import org.apache.texera.config.{AuthConfig, ComputingUnitConfig, GuiConfig, UserSystemConfig}
import org.apache.texera.dao.SiteSettings

@Path("/config")
@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down Expand Up @@ -57,7 +58,10 @@ class ConfigResource {
),
"activeTimeInMinutes" -> GuiConfig.guiWorkflowWorkspaceActiveTimeInMinutes,
"copilotEnabled" -> GuiConfig.guiWorkflowWorkspaceCopilotEnabled,
"limitColumns" -> GuiConfig.guiWorkflowWorkspaceLimitColumns,
"limitColumns" -> SiteSettings.getInt(
"result_table_columns_per_batch",
GuiConfig.guiWorkflowWorkspaceLimitColumns
),
// flags from the auth.conf if needed
"expirationTimeInMinutes" -> AuthConfig.jwtExpirationMinutes
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.texera.amber.core.storage.model.OnDataset
import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
import org.apache.texera.amber.core.storage.{DocumentFactory, FileResolver}
import org.apache.texera.auth.SessionUser
import org.apache.texera.dao.SiteSettings
import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.SqlServer.withTransaction
import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum
Expand Down Expand Up @@ -87,15 +88,8 @@ object DatasetResource {
.getInstance()
.createDSLContext()

private def singleFileUploadMaxBytes(ctx: DSLContext, defaultMiB: Long = 20L): Long = {
val limit = ctx
.select(DSL.field("value", classOf[String]))
.from(DSL.table(DSL.name("texera_db", "site_settings")))
.where(DSL.field("key", classOf[String]).eq("single_file_upload_max_size_mib"))
.fetchOneInto(classOf[String])
Try(Option(limit).getOrElse(defaultMiB.toString).trim.toLong)
.getOrElse(defaultMiB) * 1024L * 1024L
}
private def singleFileUploadMaxBytes(defaultMiB: Long = 20L): Long =
SiteSettings.getLong("single_file_upload_max_size_mib", defaultMiB) * 1024L * 1024L

/**
* Helper function to get the dataset from DB using did
Expand Down Expand Up @@ -1577,7 +1571,7 @@ class DatasetResource {
if (fileSizeBytesValue <= 0L) throw new BadRequestException("fileSizeBytes must be > 0")
if (partSizeBytesValue <= 0L) throw new BadRequestException("partSizeBytes must be > 0")

val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx)
val totalMaxBytes: Long = singleFileUploadMaxBytes()
if (totalMaxBytes <= 0L) {
throw new WebApplicationException(
"singleFileUploadMaxBytes must be > 0",
Expand Down Expand Up @@ -1969,7 +1963,7 @@ class DatasetResource {
)
}

val maxBytes = singleFileUploadMaxBytes(ctx)
val maxBytes = singleFileUploadMaxBytes()
val tooLarge = actualSizeBytes > maxBytes

if (tooLarge) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,36 @@ <h2 class="page-title">General Settings</h2>
</button>
</div>
</nz-card>

<!-- CSV parser settings: csvMaxColumns backs the csv_parser_max_columns site
     setting and is persisted via saveCsvSettings(). The card was previously
     titled "Result Panel", which did not match its content. -->
<nz-card nzTitle="CSV Parser">
  <div class="settings-row">
    <span>Max Columns:</span>
    <nz-input-number
      [(ngModel)]="csvMaxColumns"
      [nzMin]="MIN_CSV_MAX_COLUMNS"
      [nzMax]="MAX_CSV_MAX_COLUMNS"
      [nzStep]="100"
      [nzPrecision]="0">
    </nz-input-number>
  </div>
  <div class="help-text-number">
    Maximum number of columns the CSV parser will accept per row. The Univocity parser default is 512. Increase this
    value if your CSV files have more than 512 columns. (Range: {{ MIN_CSV_MAX_COLUMNS }} - {{ MAX_CSV_MAX_COLUMNS |
    number }})
  </div>

  <div class="button-row">
    <button
      nz-button
      nzType="primary"
      (click)="saveCsvSettings()">
      Save
    </button>
    <button
      nz-button
      nzType="default"
      (click)="resetCsvSettings()">
      Reset
    </button>
  </div>
</nz-card>
Loading
Loading