diff --git a/common/config/src/main/resources/default.conf b/common/config/src/main/resources/default.conf index 0ce2a2e38b6..b08db4134f0 100644 --- a/common/config/src/main/resources/default.conf +++ b/common/config/src/main/resources/default.conf @@ -91,3 +91,8 @@ dataset { multipart_upload_chunk_size_mib = 50 multipart_upload_chunk_size_mib = ${?DATASET_MULTIPART_UPLOAD_CHUNK_SIZE_MIB} } + +csv { + csv_parser_max_columns = 512 + csv_parser_max_columns = ${?CSV_PARSER_MAX_COLUMNS} +} \ No newline at end of file diff --git a/common/dao/src/main/scala/org/apache/texera/dao/SiteSettings.scala b/common/dao/src/main/scala/org/apache/texera/dao/SiteSettings.scala new file mode 100644 index 00000000000..f93772c406d --- /dev/null +++ b/common/dao/src/main/scala/org/apache/texera/dao/SiteSettings.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.texera.dao + +import org.jooq.impl.DSL + +import scala.util.Try + +/** + * Read-side accessor for the `site_settings` key/value table that admin pages + * write through. 
/**
 * Read-side accessor for the `site_settings` key/value table that admin pages
 * write through. Centralises the "look up by key, parse, fall back on any
 * failure" pattern that previously lived inline in ConfigResource,
 * CSVScanSourceOpExec, and DatasetResource.
 *
 * Failures swallowed by the outer Try include: SqlServer not initialised
 * (e.g. on workers in distributed mode), no row for the key, and a value that
 * can't be parsed. In all of these cases the caller's default takes over.
 */
object SiteSettings {

  /** Looks up `key` and parses it as an Int; falls back to `default` on any failure. */
  def getInt(key: String, default: => Int): Int =
    readAndParse(key, default)(_.toInt)

  /** Looks up `key` and parses it as a Long; falls back to `default` on any failure. */
  def getLong(key: String, default: => Long): Long =
    readAndParse(key, default)(_.toLong)

  /**
   * Fetches the raw string stored under `key` and applies `parse` to its
   * trimmed value. Every failure mode — DB unavailable, missing row, parse
   * error — resolves to `default`, which is by-name so it is only evaluated
   * when actually needed.
   */
  private def readAndParse[T](key: String, default: => T)(parse: String => T): T = {
    val attempt = Try {
      val stored = SqlServer
        .getInstance()
        .createDSLContext()
        .select(DSL.field("value", classOf[String]))
        .from(DSL.table(DSL.name("texera_db", "site_settings")))
        .where(DSL.field("key", classOf[String]).eq(key))
        .fetchOneInto(classOf[String])
      stored match {
        // No row stored for this key: the caller's default wins.
        case null  => default
        case value => parse(value.trim)
      }
    }
    attempt.getOrElse(default)
  }
}
org.apache.texera.amber.util.JSONUtils.objectMapper import java.io.{IOException, InputStreamReader} @@ -89,6 +90,8 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { csvFormat.setLineSeparator("\n") val csvSetting = new CsvParserSettings() csvSetting.setMaxCharsPerColumn(-1) + val maxColumns = CSVScanSourceOpExec.getMaxColumns + csvSetting.setMaxColumns(maxColumns) csvSetting.setFormat(csvFormat) csvSetting.setHeaderExtractionEnabled(hasHeader) csvSetting.setNullValue("") @@ -97,8 +100,8 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { var data: Array[Array[String]] = Array() val readLimit = limit.getOrElse(INFER_READ_LIMIT).min(INFER_READ_LIMIT) - for (i <- 0 until readLimit) { - val row = parser.parseNext() + for (_ <- 0 until readLimit) { + val row = CSVScanSourceOpExec.parseNextRow(parser, maxColumns) if (row != null) { data = data :+ row } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala index c3fbbe9bb55..13227573fd0 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala @@ -19,15 +19,18 @@ package org.apache.texera.amber.operator.source.scan.csv +import com.univocity.parsers.common.TextParsingException import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings} import org.apache.texera.amber.core.executor.SourceOperatorExecutor import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike} import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.apache.texera.dao.SiteSettings import java.io.InputStreamReader import java.net.URI import 
object CSVScanSourceOpExec {

  /** Fallback column limit used when the site setting cannot be read. */
  val DEFAULT_MAX_COLUMNS = 512

  /** Effective column limit: the `csv_parser_max_columns` site setting, or the default. */
  def getMaxColumns: Int =
    SiteSettings.getInt("csv_parser_max_columns", DEFAULT_MAX_COLUMNS)

  /**
   * Wraps `parser.parseNext()` so that a column-count overflow surfaces as a
   * single actionable sentence rather than a deep Univocity stack trace. Any
   * other parser failure is rethrown unchanged.
   *
   * The thrown RuntimeException's message bubbles up through
   * DataProcessor.handleExecutorException and becomes the title of the console
   * message that drives the top-of-page toast.
   */
  def parseNextRow(parser: CsvParser, maxColumns: Int): Array[String] =
    try parser.parseNext()
    catch {
      case e: TextParsingException if isColumnOverflow(e, maxColumns) =>
        throw new RuntimeException(columnOverflowMessage(maxColumns), e)
    }

  /**
   * A TextParsingException counts as a column overflow when its cause is an
   * ArrayIndexOutOfBoundsException whose index equals `maxColumns` — or whose
   * index cannot be determined at all (lenient on purpose: an AIOOBE with an
   * unparseable message is still blamed on the column limit).
   */
  private[csv] def isColumnOverflow(e: TextParsingException, maxColumns: Int): Boolean =
    e.getCause match {
      // A null cause fails the type pattern and falls through to `case _`.
      case aioobe: ArrayIndexOutOfBoundsException =>
        aioobeIndex(aioobe).forall(_ == maxColumns)
      case _ =>
        false
    }

  /**
   * Extracts the out-of-bounds index from the AIOOBE message. Java 8 emits the
   * bare integer ("5"); Java 9+ emits "Index 5 out of bounds for length 5".
   * Returns None when neither form matches (including a null message).
   */
  private def aioobeIndex(aioobe: ArrayIndexOutOfBoundsException): Option[Int] = {
    val msg = Option(aioobe.getMessage).getOrElse("")
    val bareInteger = Try(msg.trim.toInt).toOption
    bareInteger.orElse {
      raw"Index (\d+) out of bounds".r.findFirstMatchIn(msg).map(_.group(1).toInt)
    }
  }

  /** Single-sentence user-facing message; must mention the configured limit. */
  private[csv] def columnOverflowMessage(maxColumns: Int): String =
    s"Max columns of $maxColumns exceeded."
}
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.texera.amber.operator.source.scan.csv

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import org.scalatest.flatspec.AnyFlatSpec

import java.io.StringReader

/**
 * Verifies the column-overflow translation in [[CSVScanSourceOpExec.parseNextRow]]
 * — the path that turns a deep Univocity stack trace into a single-sentence message
 * the workflow user can act on.
 */
class CSVScanSourceOpExecSpec extends AnyFlatSpec {

  // Builds a real Univocity parser capped at `max` columns, mirroring the
  // settings the operator uses in production (unlimited chars per column).
  private def parserWithMaxColumns(max: Int): CsvParser = {
    val settings = new CsvParserSettings()
    settings.setMaxColumns(max)
    settings.setMaxCharsPerColumn(-1)
    new CsvParser(settings)
  }

  "parseNextRow" should "return the parsed row when the input is within the column limit" in {
    val parser = parserWithMaxColumns(10)
    parser.beginParsing(new StringReader("a,b,c\n"))

    val row = CSVScanSourceOpExec.parseNextRow(parser, 10)

    assert(row.toSeq == Seq("a", "b", "c"))
  }

  it should "return null at end of input (so the iterator can terminate cleanly)" in {
    val parser = parserWithMaxColumns(10)
    parser.beginParsing(new StringReader(""))

    assert(CSVScanSourceOpExec.parseNextRow(parser, 10) == null)
  }

  it should "translate a column-overflow TextParsingException into a clear user message" in {
    val maxColumns = 2
    val parser = parserWithMaxColumns(maxColumns)
    parser.beginParsing(new StringReader("a,b,c,d,e\n"))

    val ex = intercept[RuntimeException] {
      CSVScanSourceOpExec.parseNextRow(parser, maxColumns)
    }

    // The message must mention the configured limit so the user knows what was hit.
    assert(ex.getMessage.contains(maxColumns.toString))
    assert(ex.getMessage.toLowerCase.contains("max columns"))
    assert(ex.getMessage.toLowerCase.contains("exceeded"))
    // The original Univocity exception is preserved as the cause so developers
    // can still inspect the underlying parser state if needed.
    assert(ex.getCause.isInstanceOf[TextParsingException])
  }

  "isColumnOverflow" should "detect AIOOBE causes from Java 8's plain-integer message" in {
    val cause = new ArrayIndexOutOfBoundsException("5")
    val ex = new TextParsingException(null, "wrapper", cause)
    assert(CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 5))
    assert(!CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 6))
  }

  it should "detect AIOOBE causes from Java 9+'s 'Index N out of bounds for length M' message" in {
    val cause = new ArrayIndexOutOfBoundsException("Index 5 out of bounds for length 5")
    val ex = new TextParsingException(null, "wrapper", cause)
    assert(CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 5))
    assert(!CSVScanSourceOpExec.isColumnOverflow(ex, maxColumns = 6))
  }

  it should "ignore TextParsingExceptions whose cause is unrelated" in {
    val unrelated = new TextParsingException(null, "Some other parsing problem")
    val withDifferentCause =
      new TextParsingException(null, "wrapper", new IllegalStateException("nope"))
    assert(!CSVScanSourceOpExec.isColumnOverflow(unrelated, maxColumns = 5))
    assert(!CSVScanSourceOpExec.isColumnOverflow(withDifferentCause, maxColumns = 5))
  }

  "columnOverflowMessage" should "include the configured maximum so the user knows the current limit" in {
    val msg = CSVScanSourceOpExec.columnOverflowMessage(750)
    assert(msg.contains("750"))
    assert(msg.toLowerCase.contains("max columns"))
    assert(msg.toLowerCase.contains("exceeded"))
  }
}
a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala index b7517d81eb7..d0c112ce098 100644 --- a/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala +++ b/config-service/src/main/scala/org/apache/texera/service/resource/ConfigResource.scala @@ -23,6 +23,7 @@ import jakarta.annotation.security.RolesAllowed import jakarta.ws.rs.core.MediaType import jakarta.ws.rs.{GET, Path, Produces} import org.apache.texera.config.{AuthConfig, ComputingUnitConfig, GuiConfig, UserSystemConfig} +import org.apache.texera.dao.SiteSettings @Path("/config") @Produces(Array(MediaType.APPLICATION_JSON)) @@ -57,7 +58,10 @@ class ConfigResource { ), "activeTimeInMinutes" -> GuiConfig.guiWorkflowWorkspaceActiveTimeInMinutes, "copilotEnabled" -> GuiConfig.guiWorkflowWorkspaceCopilotEnabled, - "limitColumns" -> GuiConfig.guiWorkflowWorkspaceLimitColumns, + "limitColumns" -> SiteSettings.getInt( + "result_table_columns_per_batch", + GuiConfig.guiWorkflowWorkspaceLimitColumns + ), // flags from the auth.conf if needed "expirationTimeInMinutes" -> AuthConfig.jwtExpirationMinutes ) diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index 7bcd3bb77c3..46457c9454e 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -28,6 +28,7 @@ import org.apache.texera.amber.core.storage.model.OnDataset import org.apache.texera.amber.core.storage.util.LakeFSStorageClient import org.apache.texera.amber.core.storage.{DocumentFactory, FileResolver} import org.apache.texera.auth.SessionUser +import org.apache.texera.dao.SiteSettings import org.apache.texera.dao.SqlServer import 
org.apache.texera.dao.SqlServer.withTransaction import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum @@ -87,15 +88,8 @@ object DatasetResource { .getInstance() .createDSLContext() - private def singleFileUploadMaxBytes(ctx: DSLContext, defaultMiB: Long = 20L): Long = { - val limit = ctx - .select(DSL.field("value", classOf[String])) - .from(DSL.table(DSL.name("texera_db", "site_settings"))) - .where(DSL.field("key", classOf[String]).eq("single_file_upload_max_size_mib")) - .fetchOneInto(classOf[String]) - Try(Option(limit).getOrElse(defaultMiB.toString).trim.toLong) - .getOrElse(defaultMiB) * 1024L * 1024L - } + private def singleFileUploadMaxBytes(defaultMiB: Long = 20L): Long = + SiteSettings.getLong("single_file_upload_max_size_mib", defaultMiB) * 1024L * 1024L /** * Helper function to get the dataset from DB using did @@ -1577,7 +1571,7 @@ class DatasetResource { if (fileSizeBytesValue <= 0L) throw new BadRequestException("fileSizeBytes must be > 0") if (partSizeBytesValue <= 0L) throw new BadRequestException("partSizeBytes must be > 0") - val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx) + val totalMaxBytes: Long = singleFileUploadMaxBytes() if (totalMaxBytes <= 0L) { throw new WebApplicationException( "singleFileUploadMaxBytes must be > 0", @@ -1969,7 +1963,7 @@ class DatasetResource { ) } - val maxBytes = singleFileUploadMaxBytes(ctx) + val maxBytes = singleFileUploadMaxBytes() val tooLarge = actualSizeBytes > maxBytes if (tooLarge) { diff --git a/frontend/src/app/dashboard/component/admin/settings/admin-settings.component.html b/frontend/src/app/dashboard/component/admin/settings/admin-settings.component.html index a0ea29e77b5..bc5bc88649e 100644 --- a/frontend/src/app/dashboard/component/admin/settings/admin-settings.component.html +++ b/frontend/src/app/dashboard/component/admin/settings/admin-settings.component.html @@ -340,3 +340,36 @@