From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 01/11] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType 
as Map keys is not allowed in map_from_entries"
+  val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries"
+
+  private def containsBinary(dataType: DataType): Boolean = {
+    dataType match {
+      case BinaryType => true
+      case StructType(fields) => fields.exists(field => containsBinary(field.dataType))
+      case ArrayType(elementType, _) => containsBinary(elementType)
+      case _ => false
+    }
+  }
+
+  override def getSupportLevel(expr: MapFromEntries): SupportLevel = {
+    if (containsBinary(expr.dataType.keyType)) {
+      return Incompatible(Some(keyUnsupportedReason))
+    }
+    if (containsBinary(expr.dataType.valueType)) {
+      return Incompatible(Some(valueUnsupportedReason))
+    }
+    Compatible(None)
+  }
+}
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 88c13391a6..01b9744ed6 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.CometTestBase
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.BinaryType
 
+import org.apache.comet.serde.CometMapFromEntries
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
 
 class CometMapExpressionSuite extends CometTestBase {
@@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase {
     }
   }
 
+  test("map_from_entries") {
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, "test.parquet")
+      val filename = path.toString
+      val random = new Random(42)
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        val schemaGenOptions =
+          SchemaGenOptions(
+            generateArray = true,
+            generateStruct = true,
+            primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType))
+        val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false)
+        ParquetGenerator.makeParquetFile(
+          random,
+          spark,
+          filename,
+          100,
+          schemaGenOptions,
+          dataGenOptions)
+      }
+      val df = spark.read.parquet(filename)
+      df.createOrReplaceTempView("t1")
+      for (field <- df.schema.fieldNames) {
+        checkSparkAnswerAndOperator(
+          spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1"))
+      }
+    }
+  }
+
+  test("map_from_entries - fallback for binary type") {
+    val table = "t2"
+    withTable(table) {
+      sql(
+        s"create table $table using parquet as select cast(array() as array<binary>) as c1 from range(10)")
+      checkSparkAnswerAndFallbackReason(
+        sql(s"select map_from_entries(array(struct(c1, 0))) from $table"),
+        CometMapFromEntries.keyUnsupportedReason)
+      checkSparkAnswerAndFallbackReason(
+        sql(s"select map_from_entries(array(struct(0, c1))) from $table"),
+        CometMapFromEntries.valueUnsupportedReason)
+    }
+  }
+
 }

From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001
From: Kazantsev Maksim
Date: Tue, 16 Dec 2025 16:10:43 +0400
Subject: [PATCH 02/11] Revert "impl map_from_entries"

This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34.
--- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def 
containsBinary(dataType: DataType): Boolean = {
-    dataType match {
-      case BinaryType => true
-      case StructType(fields) => fields.exists(field => containsBinary(field.dataType))
-      case ArrayType(elementType, _) => containsBinary(elementType)
-      case _ => false
-    }
-  }
-
-  override def getSupportLevel(expr: MapFromEntries): SupportLevel = {
-    if (containsBinary(expr.dataType.keyType)) {
-      return Incompatible(Some(keyUnsupportedReason))
-    }
-    if (containsBinary(expr.dataType.valueType)) {
-      return Incompatible(Some(valueUnsupportedReason))
-    }
-    Compatible(None)
-  }
-}
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 01b9744ed6..88c13391a6 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.CometTestBase
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.BinaryType
 
-import org.apache.comet.serde.CometMapFromEntries
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
 
 class CometMapExpressionSuite extends CometTestBase {
@@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase {
     }
   }
 
-  test("map_from_entries") {
-    withTempDir { dir =>
-      val path = new Path(dir.toURI.toString, "test.parquet")
-      val filename = path.toString
-      val random = new Random(42)
-      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
-        val schemaGenOptions =
-          SchemaGenOptions(
-            generateArray = true,
-            generateStruct = true,
-            primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType))
-        val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false)
-        ParquetGenerator.makeParquetFile(
-          random,
-          spark,
-          filename,
-          100,
-          schemaGenOptions,
-          dataGenOptions)
-      }
-      val df = spark.read.parquet(filename)
-      df.createOrReplaceTempView("t1")
-      for (field <- df.schema.fieldNames) {
-        checkSparkAnswerAndOperator(
-          spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1"))
-      }
-    }
-  }
-
-  test("map_from_entries - fallback for binary type") {
-    val table = "t2"
-    withTable(table) {
-      sql(
-        s"create table $table using parquet as select cast(array() as array<binary>) as c1 from range(10)")
-      checkSparkAnswerAndFallbackReason(
-        sql(s"select map_from_entries(array(struct(c1, 0))) from $table"),
-        CometMapFromEntries.keyUnsupportedReason)
-      checkSparkAnswerAndFallbackReason(
-        sql(s"select map_from_entries(array(struct(0, c1))) from $table"),
-        CometMapFromEntries.valueUnsupportedReason)
-    }
-  }
-
 }

From 63fb715618a5c5a992477eb82491858bc89454ea Mon Sep 17 00:00:00 2001
From: Kazantsev Maksim
Date: Sun, 25 Jan 2026 17:45:21 +0400
Subject: [PATCH 03/11] Feat: impl elt function

---
 native/core/src/execution/jni_api.rs          |  2 +
 .../apache/comet/serde/QueryPlanSerde.scala   |  3 +-
 .../org/apache/comet/serde/strings.scala      | 12 +++++-
 .../comet/CometStringExpressionSuite.scala    | 39 ++++++++++++++++++-
 4 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 680cf80c75..998af0d91a 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -51,6 +51,7 @@ use datafusion_spark::function::math::expm1::SparkExpm1;
 use 
datafusion_spark::function::math::hex::SparkHex; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; +use datafusion_spark::function::string::elt::SparkElt; use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; @@ -351,6 +352,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkHex::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkElt::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 066680456e..05b0950f75 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -172,7 +172,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[StringTrimRight] -> CometScalarFunction("rtrim"), classOf[Left] -> CometLeft, classOf[Substring] -> CometSubstring, - classOf[Upper] -> CometUpper) + classOf[Upper] -> CometUpper, + classOf[Elt] -> CometElt) private val bitwiseExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[BitwiseAnd] -> CometBitwiseAnd, diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index ea42b245aa..5d95b5109b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Expression, InitCap, Left, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Elt, Expression, InitCap, Left, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper} import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} import org.apache.comet.CometConf @@ -289,6 +289,16 @@ object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] { } } +object CometElt extends CometScalarFunction[Elt]("elt") { + + override def getSupportLevel(expr: Elt): SupportLevel = { + if (expr.failOnError) { + return Unsupported(Some("failOnError=true is not supported")) + } + Compatible(None) + } +} + trait CommonStringExprs { def stringDecode( diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index f9882780c8..a7a7c5442b 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -22,9 +22,10 @@ package org.apache.comet import scala.util.Random import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.sql.{CometTestBase, DataFrame} +import org.apache.spark.sql.{functions, CometTestBase, DataFrame} +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import 
org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} @@ -391,4 +392,38 @@ class CometStringExpressionSuite extends CometTestBase { } } + test("elt") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + val r = new Random(42) + val fieldsCount = 10 + val indexes = Seq.range(1, fieldsCount) + val edgeCasesIndexes = Seq(-1, 0, -100, fieldsCount + 100) + val schema = indexes + .foldLeft(new StructType())((schema, idx) => + schema.add(s"c$idx", StringType, nullable = true)) + val df = FuzzDataGenerator.generateDataFrame( + r, + spark, + schema, + 100, + DataGenOptions(maxStringLength = 6)) + df.withColumn( + "idx", + lit(Random.shuffle(indexes ++ edgeCasesIndexes).headOption.getOrElse(-1))) + .createOrReplaceTempView("t1") + checkSparkAnswerAndOperator( + sql(s"SELECT elt(idx, ${schema.fieldNames.mkString(",")}) FROM t1")) + checkSparkAnswerAndOperator( + sql(s"SELECT elt(cast(null as int), ${schema.fieldNames.mkString(",")}) FROM t1")) + checkSparkAnswerMaybeThrows(sql(s"SELECT elt(1) FROM t1")) match { + case (Some(spark), Some(comet)) => + assert(spark.getMessage.contains("WRONG_NUM_ARGS.WITHOUT_SUGGESTION")) + assert(comet.getMessage.contains("WRONG_NUM_ARGS.WITHOUT_SUGGESTION")) + case (spark, comet) => + fail( + s"Expected Spark and Comet to throw exception, but got\nSpark: $spark\nComet: $comet") + } + } + } + } From 4b07cc4ea99a563de059d28359d6ffde62b411c8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 25 Jan 2026 18:06:56 +0400 Subject: [PATCH 04/11] Added micro benchmark test --- .../spark/sql/benchmark/CometStringExpressionBenchmark.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala index c7c750aed6..d219477d77 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala @@ -76,7 +76,8 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { StringExprConfig("substring", "select substring(c1, 1, 100) from parquetV1Table"), StringExprConfig("translate", "select translate(c1, '123456', 'aBcDeF') from parquetV1Table"), StringExprConfig("trim", "select trim(c1) from parquetV1Table"), - StringExprConfig("upper", "select upper(c1) from parquetV1Table")) + StringExprConfig("upper", "select upper(c1) from parquetV1Table"), + StringExprConfig("elt", "select elt(2, c1, c1) from parquetV1Table")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { runBenchmarkWithTable("String expressions", 1024) { v => From 0f518f6a6cf64504848f465523d904e4ff7a4b19 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 27 Jan 2026 10:38:47 +0400 Subject: [PATCH 05/11] Fix fmt --- .../scala/org/apache/comet/CometStringExpressionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index a7a7c5442b..286f26789b 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -22,7 +22,7 @@ package org.apache.comet import scala.util.Random import 
org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.sql.{functions, CometTestBase, DataFrame} +import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} @@ -415,7 +415,7 @@ class CometStringExpressionSuite extends CometTestBase { sql(s"SELECT elt(idx, ${schema.fieldNames.mkString(",")}) FROM t1")) checkSparkAnswerAndOperator( sql(s"SELECT elt(cast(null as int), ${schema.fieldNames.mkString(",")}) FROM t1")) - checkSparkAnswerMaybeThrows(sql(s"SELECT elt(1) FROM t1")) match { + checkSparkAnswerMaybeThrows(sql("SELECT elt(1) FROM t1")) match { case (Some(spark), Some(comet)) => assert(spark.getMessage.contains("WRONG_NUM_ARGS.WITHOUT_SUGGESTION")) assert(comet.getMessage.contains("WRONG_NUM_ARGS.WITHOUT_SUGGESTION")) From 1604dc63dde385440386ce470b0cf4c7c3ab33a2 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 27 Jan 2026 21:26:46 +0400 Subject: [PATCH 06/11] Fix PR issues --- spark/src/main/scala/org/apache/comet/serde/strings.scala | 2 +- .../org/apache/comet/CometStringExpressionSuite.scala | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 5d95b5109b..2f662c4170 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -293,7 +293,7 @@ object CometElt extends CometScalarFunction[Elt]("elt") { override def getSupportLevel(expr: Elt): SupportLevel = { if (expr.failOnError) { - return Unsupported(Some("failOnError=true is not supported")) + return Unsupported(Some("ANSI mode not supported")) } Compatible(None) } diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 286f26789b..f51014cd99 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -31,6 +31,9 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} class CometStringExpressionSuite extends CometTestBase { + private val WRONG_NUM_ARGS_WITHOUT_SUGGESTION_EXCEPTION_MSG = + "[WRONG_NUM_ARGS.WITHOUT_SUGGESTION] The `elt` requires > 1 parameters but the actual number is 1." 
+ test("lpad string") { testStringPadding("lpad") } @@ -417,8 +420,8 @@ class CometStringExpressionSuite extends CometTestBase { sql(s"SELECT elt(cast(null as int), ${schema.fieldNames.mkString(",")}) FROM t1")) checkSparkAnswerMaybeThrows(sql("SELECT elt(1) FROM t1")) match { case (Some(spark), Some(comet)) => - assert(spark.getMessage.contains("WRONG_NUM_ARGS.WITHOUT_SUGGESTION")) - assert(comet.getMessage.contains("WRONG_NUM_ARGS.WITHOUT_SUGGESTION")) + assert(spark.getMessage.contains(WRONG_NUM_ARGS_WITHOUT_SUGGESTION_EXCEPTION_MSG)) + assert(comet.getMessage.contains(WRONG_NUM_ARGS_WITHOUT_SUGGESTION_EXCEPTION_MSG)) case (spark, comet) => fail( s"Expected Spark and Comet to throw exception, but got\nSpark: $spark\nComet: $comet") From fc22ee738eb6c0befe1cba5506cfa0070e71c756 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Thu, 29 Jan 2026 21:55:07 +0400 Subject: [PATCH 07/11] Fix PR issues --- .../org/apache/comet/serde/strings.scala | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 2f662c4170..710c011330 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -19,16 +19,15 @@ package org.apache.comet.serde -import java.util.Locale - -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Elt, Expression, InitCap, Left, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper} -import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} - import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Elt, Expression, InitCap, Left, Length, Like, Literal, Lower, RLike, RegExpReplace, StringLPad, StringRPad, StringRepeat, Substring, Upper} +import org.apache.spark.sql.types._ + +import java.util.Locale object CometStringRepeat extends CometExpressionSerde[StringRepeat] { @@ -295,6 +294,24 @@ object CometElt extends CometScalarFunction[Elt]("elt") { if (expr.failOnError) { return Unsupported(Some("ANSI mode not supported")) } + if (expr.children.length < 2) { + return Unsupported(Some("The `elt` requires > 1 parameters but the actual number is 1")) + } + val idxDataType = expr.children.head.dataType + if (idxDataType != IntegerType) { + return Unsupported(Some(s"Parameter 1 requires the int type, but got: $idxDataType")) + } + if (!expr.children.tail.forall(_.dataType == StringType)) { + val unsupportedTypes = + expr.children.tail + .filter(_.dataType != StringType) + .map(_.dataType) + .distinct + .mkString(", ") + return Unsupported( + Some( + s"Parameters 2 and onwards require the string type, but contains: $unsupportedTypes")) + } Compatible(None) } } From 8e3f482e63b78c62f205eba1a5f79d693e76e667 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Thu, 29 Jan 2026 22:22:04 +0400 Subject: [PATCH 08/11] Fix PR issues --- .../src/main/scala/org/apache/comet/serde/strings.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git 
a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 710c011330..de11791a99 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -19,15 +19,16 @@ package org.apache.comet.serde +import java.util.Locale + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Elt, Expression, InitCap, Left, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper} +import org.apache.spark.sql.types._ + import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Elt, Expression, InitCap, Left, Length, Like, Literal, Lower, RLike, RegExpReplace, StringLPad, StringRPad, StringRepeat, Substring, Upper} -import org.apache.spark.sql.types._ - -import java.util.Locale object CometStringRepeat extends CometExpressionSerde[StringRepeat] { From 2e66014e4c917f41cfbea873d518ea5b87ce8fff Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 31 Jan 2026 20:02:27 +0400 Subject: [PATCH 09/11] Fix PR issues --- .../scala/org/apache/comet/serde/strings.scala | 18 ------------------ .../comet/CometStringExpressionSuite.scala | 18 +++++++++++------- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index de11791a99..94927a39f3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -295,24 +295,6 @@ object CometElt extends CometScalarFunction[Elt]("elt") { if (expr.failOnError) { return Unsupported(Some("ANSI mode not supported")) } - if (expr.children.length < 2) { - return Unsupported(Some("The `elt` requires > 1 parameters but the actual number is 1")) - } - val idxDataType = expr.children.head.dataType - if (idxDataType != IntegerType) { - return Unsupported(Some(s"Parameter 1 requires the int type, but got: $idxDataType")) - } - if (!expr.children.tail.forall(_.dataType == StringType)) { - val unsupportedTypes = - expr.children.tail - .filter(_.dataType != StringType) - .map(_.dataType) - .distinct - .mkString(", ") - return Unsupported( - Some( - s"Parameters 2 and onwards require the string type, but contains: $unsupportedTypes")) - } Compatible(None) } } diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index f51014cd99..71acc1d658 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -21,19 +21,18 @@ package org.apache.comet import scala.util.Random +import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} +import org.apache.comet.serde.CometElt import 
org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} class CometStringExpressionSuite extends CometTestBase { - private val WRONG_NUM_ARGS_WITHOUT_SUGGESTION_EXCEPTION_MSG = - "[WRONG_NUM_ARGS.WITHOUT_SUGGESTION] The `elt` requires > 1 parameters but the actual number is 1." - test("lpad string") { testStringPadding("lpad") } @@ -396,7 +395,12 @@ class CometStringExpressionSuite extends CometTestBase { } test("elt") { - withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + val wrongNumArgsWithoutSuggestionExceptionMsg = + "[WRONG_NUM_ARGS.WITHOUT_SUGGESTION] The `elt` requires > 1 parameters but the actual number is 1." + withSQLConf( + SQLConf.ANSI_ENABLED.key -> "false", + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { val r = new Random(42) val fieldsCount = 10 val indexes = Seq.range(1, fieldsCount) @@ -420,13 +424,13 @@ class CometStringExpressionSuite extends CometTestBase { sql(s"SELECT elt(cast(null as int), ${schema.fieldNames.mkString(",")}) FROM t1")) checkSparkAnswerMaybeThrows(sql("SELECT elt(1) FROM t1")) match { case (Some(spark), Some(comet)) => - assert(spark.getMessage.contains(WRONG_NUM_ARGS_WITHOUT_SUGGESTION_EXCEPTION_MSG)) - assert(comet.getMessage.contains(WRONG_NUM_ARGS_WITHOUT_SUGGESTION_EXCEPTION_MSG)) + assert(spark.getMessage.contains(wrongNumArgsWithoutSuggestionExceptionMsg)) + assert(comet.getMessage.contains(wrongNumArgsWithoutSuggestionExceptionMsg)) case (spark, comet) => fail( s"Expected Spark and Comet to throw exception, but got\nSpark: $spark\nComet: $comet") } + checkSparkAnswerAndOperator("SELECT elt(2, 'a', 'b', 'c')") } } - } From ce7a5c5780d3ce9c35b6e9c020a80201453b482e Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 2 Feb 2026 20:10:14 +0400 Subject: [PATCH 10/11] Fix fmt --- .../scala/org/apache/comet/CometStringExpressionSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 71acc1d658..726ecce688 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -21,14 +21,12 @@ package org.apache.comet import scala.util.Random -import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} -import org.apache.comet.serde.CometElt import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} class CometStringExpressionSuite extends CometTestBase { From 854768485959827c2afc339976f88ad4f31d57ba Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 8 Feb 2026 21:42:35 +0400 Subject: [PATCH 11/11] add first sql elt test --- .../sql-tests/expressions/string/elt.sql | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 spark/src/test/resources/sql-tests/expressions/string/elt.sql diff --git a/spark/src/test/resources/sql-tests/expressions/string/elt.sql b/spark/src/test/resources/sql-tests/expressions/string/elt.sql new file mode 100644 index 0000000000..fe49bd25fd --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/string/elt.sql @@ -0,0 +1,27 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor 
license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true + +statement +CREATE TABLE test_elt(a string, b string, c string, idx int) USING parquet + +statement +INSERT INTO test_elt VALUES ('a', 'b', 'c', 1), ('a', 'b', '', 2), (NULL, 'b', 'c', NULL), ('a', NULL, 'c', -100), (NULL, NULL, NULL, 0) + +query +SELECT elt(idx, a, b, c) FROM test_elt
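
Note on semantics: below is a minimal sketch of the elt() behaviour the suite and SQL test above rely on, per Spark's documented non-ANSI rules (spark.sql.ansi.enabled=false, the only mode CometElt supports, since getSupportLevel reports failOnError=true as Unsupported). The results in the comments are assumptions derived from those rules, not output captured from this patch series:

-- elt(n, str1, str2, ...) returns the n-th string argument, counting from 1.
SELECT elt(2, 'scala', 'java');          -- 'java'
-- A NULL, zero, negative, or out-of-range index yields NULL rather than an
-- error when ANSI mode is off; with ANSI mode on, Spark raises an error,
-- which is why CometElt falls back to Spark in that case.
SELECT elt(0, 'a', 'b');                 -- NULL
SELECT elt(4, 'a', 'b', 'c');            -- NULL
SELECT elt(CAST(NULL AS INT), 'a', 'b'); -- NULL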