Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
768b3e9
impl map_from_entries
Dec 14, 2025
c68c342
Revert "impl map_from_entries"
Dec 16, 2025
d887555
Merge branch 'apache:main' into main
kazantsev-maksim Dec 16, 2025
231aa90
Merge branch 'apache:main' into main
kazantsev-maksim Dec 17, 2025
9500bbb
Merge branch 'apache:main' into main
kazantsev-maksim Dec 24, 2025
9577481
Merge branch 'apache:main' into main
kazantsev-maksim Dec 28, 2025
3791557
Merge branch 'apache:main' into main
kazantsev-maksim Jan 2, 2026
7c2f082
Merge branch 'apache:main' into main
kazantsev-maksim Jan 3, 2026
609a605
Merge branch 'apache:main' into main
kazantsev-maksim Jan 6, 2026
a151b2c
Merge branch 'apache:main' into main
kazantsev-maksim Jan 7, 2026
ad3e7f5
Merge branch 'apache:main' into main
kazantsev-maksim Jan 10, 2026
ea92e4b
Merge branch 'apache:main' into main
kazantsev-maksim Jan 14, 2026
8dfeca3
Merge branch 'apache:main' into main
kazantsev-maksim Jan 17, 2026
559741e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 20, 2026
ebda14e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 21, 2026
408152e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 23, 2026
d7857b2
Merge branch 'apache:main' into main
kazantsev-maksim Jan 24, 2026
63fb715
Feat: impl elt function
Jan 25, 2026
4b07cc4
Added micro benchmark test
Jan 25, 2026
0f518f6
Fix fmt
Jan 27, 2026
1604dc6
Fix PR issues
Jan 27, 2026
fc22ee7
Fix PR issues
Jan 29, 2026
aef41be
Merge branch 'apache:main' into main
kazantsev-maksim Jan 29, 2026
70c400b
Merge remote-tracking branch 'origin/main' into elt_func
Jan 29, 2026
8e3f482
Fix PR issues
Jan 29, 2026
2e66014
Fix PR issues
Jan 31, 2026
ce7a5c5
Fix fmt
Feb 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion_spark::function::math::hex::SparkHex;
use datafusion_spark::function::math::width_bucket::SparkWidthBucket;
use datafusion_spark::function::string::char::CharFunc;
use datafusion_spark::function::string::concat::SparkConcat;
use datafusion_spark::function::string::elt::SparkElt;
use futures::poll;
use futures::stream::StreamExt;
use jni::objects::JByteBuffer;
Expand Down Expand Up @@ -353,6 +354,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkHex::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkWidthBucket::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkElt::default()));
}

/// Prepares arrow arrays for output.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[StringTrimRight] -> CometScalarFunction("rtrim"),
classOf[Left] -> CometLeft,
classOf[Substring] -> CometSubstring,
classOf[Upper] -> CometUpper)
classOf[Upper] -> CometUpper,
classOf[Elt] -> CometElt)

private val bitwiseExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[BitwiseAnd] -> CometBitwiseAnd,
Expand Down
14 changes: 12 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Expression, InitCap, Left, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper}
import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Elt, Expression, InitCap, Left, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper}
import org.apache.spark.sql.types._

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.withInfo
Expand Down Expand Up @@ -289,6 +289,16 @@ object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
}
}

/**
 * Serde support for Spark's `elt(n, input1, input2, ...)` string function, which selects the
 * n-th input expression (1-based). Per Spark semantics an out-of-range index yields NULL in
 * non-ANSI mode — TODO confirm the native implementation matches this for negative indexes.
 *
 * Execution is delegated to the native `elt` scalar UDF registered with the DataFusion
 * session context.
 */
object CometElt extends CometScalarFunction[Elt]("elt") {

  /**
   * Reports whether this expression can run natively.
   *
   * Falls back to Spark when `failOnError` is set (ANSI mode): under ANSI semantics an
   * out-of-range index must raise an error rather than return NULL, which the native
   * implementation does not support yet.
   */
  override def getSupportLevel(expr: Elt): SupportLevel =
    if (expr.failOnError) Unsupported(Some("ANSI mode not supported"))
    else Compatible(None)
}

trait CommonStringExprs {

def stringDecode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import scala.util.Random

import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.{CometTestBase, DataFrame}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}

import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}

Expand Down Expand Up @@ -391,4 +392,43 @@ class CometStringExpressionSuite extends CometTestBase {
}
}

/**
 * End-to-end check of the native `elt` implementation against Spark:
 *  - fuzzed string columns with both valid (1..9) and out-of-range (-100, -1, 0, 110) indexes,
 *  - a NULL index,
 *  - the WRONG_NUM_ARGS error for a single-argument call (both engines must raise it),
 *  - an all-literal call with constant folding disabled so the expression reaches Comet.
 */
test("elt") {
  val wrongNumArgsWithoutSuggestionExceptionMsg =
    "[WRONG_NUM_ARGS.WITHOUT_SUGGESTION] The `elt` requires > 1 parameters but the actual number is 1."
  withSQLConf(
    SQLConf.ANSI_ENABLED.key -> "false",
    // Keep literal-only calls from being folded by Spark before Comet sees them.
    SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
      "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
    val r = new Random(42)
    val fieldsCount = 10
    val indexes = Seq.range(1, fieldsCount)
    val edgeCasesIndexes = Seq(-1, 0, -100, fieldsCount + 100)
    val schema = indexes
      .foldLeft(new StructType())((schema, idx) =>
        schema.add(s"c$idx", StringType, nullable = true))
    val df = FuzzDataGenerator.generateDataFrame(
      r,
      spark,
      schema,
      100,
      DataGenOptions(maxStringLength = 6))
    // NOTE(review): Random.shuffle here uses the unseeded global Random, not `r`, so the
    // chosen index varies between runs — confirm this nondeterminism is intended.
    df.withColumn(
      "idx",
      lit(Random.shuffle(indexes ++ edgeCasesIndexes).headOption.getOrElse(-1)))
      .createOrReplaceTempView("t1")
    checkSparkAnswerAndOperator(
      sql(s"SELECT elt(idx, ${schema.fieldNames.mkString(",")}) FROM t1"))
    // A NULL index must produce NULL rather than an error.
    checkSparkAnswerAndOperator(
      sql(s"SELECT elt(cast(null as int), ${schema.fieldNames.mkString(",")}) FROM t1"))
    // Both Spark and Comet must reject a call with fewer than two arguments.
    checkSparkAnswerMaybeThrows(sql("SELECT elt(1) FROM t1")) match {
      case (Some(spark), Some(comet)) =>
        assert(spark.getMessage.contains(wrongNumArgsWithoutSuggestionExceptionMsg))
        assert(comet.getMessage.contains(wrongNumArgsWithoutSuggestionExceptionMsg))
      case (spark, comet) =>
        fail(
          s"Expected Spark and Comet to throw exception, but got\nSpark: $spark\nComet: $comet")
    }
    // All-literal call; reaches Comet only because ConstantFolding is excluded above.
    checkSparkAnswerAndOperator("SELECT elt(2, 'a', 'b', 'c')")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
StringExprConfig("substring", "select substring(c1, 1, 100) from parquetV1Table"),
StringExprConfig("translate", "select translate(c1, '123456', 'aBcDeF') from parquetV1Table"),
StringExprConfig("trim", "select trim(c1) from parquetV1Table"),
StringExprConfig("upper", "select upper(c1) from parquetV1Table"))
StringExprConfig("upper", "select upper(c1) from parquetV1Table"),
StringExprConfig("elt", "select elt(2, c1, c1) from parquetV1Table"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding this to the benchmark setup.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


override def runCometBenchmark(mainArgs: Array[String]): Unit = {
runBenchmarkWithTable("String expressions", 1024) { v =>
Expand Down
Loading