From 05ec659d3a69beeb06680981f2a0afe0edebe9f0 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 5 Feb 2026 09:59:52 -0500 Subject: [PATCH 1/3] support expression years --- .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../org/apache/comet/serde/datetime.scala | 37 ++++++++++++++++-- .../apache/comet/CometExpressionSuite.scala | 38 ++++++++++++++++++- 3 files changed, 73 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e25d7fb4eb..5e4abb48c9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -206,7 +206,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[WeekDay] -> CometWeekDay, classOf[DayOfYear] -> CometDayOfYear, classOf[WeekOfYear] -> CometWeekOfYear, - classOf[Quarter] -> CometQuarter) + classOf[Quarter] -> CometQuarter, + classOf[Years] -> CometYears) private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Cast] -> CometCast) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index a623146916..fd01ea25be 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -20,9 +20,8 @@ package org.apache.comet.serde import java.util.Locale - -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} -import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year, Years} +import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -537,3 +536,35 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { } } } + +object CometYears extends CometExpressionSerde[Years] { + + override def getSupportLevel(expr: Years): SupportLevel = { + expr.child.dataType match { + case DateType | TimestampType | TimestampNTZType => Compatible() + case _ => Unsupported(Some(s"Years does not support type: ${expr.child.dataType}")) + } + } + + override def convert(expr: Years, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val periodType = exprToProtoInternal(Literal("year"), inputs, binding) + val childExpr = exprToProtoInternal(expr.child, inputs, binding) + val optExpr = scalarFunctionExprToProto("datepart", Seq(periodType, childExpr): _*) + .map(e => { + Expr + .newBuilder() + .setCast( + ExprOuterClass.Cast + .newBuilder() + .setChild(e) + .setDatatype(serializeDataType(IntegerType).get) + .setEvalMode(ExprOuterClass.EvalMode.LEGACY) + .setAllowIncompat(false) + .build()) + .build() + }) + optExprWithInfo(optExpr, expr, expr.child) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 5a22583ae0..717672979e 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CometTestBase, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp} +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp, Years} import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps import org.apache.spark.sql.comet.CometProjectExec import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} @@ -39,6 +39,8 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.sql.types._ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus +import org.apache.comet.serde.{CometYears, Compatible, Unsupported} +import org.apache.comet.serde.QueryPlanSerde.exprToProto import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -3162,4 +3164,38 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("support Years partition transform (serialization only)") { + val input = Seq(java.sql.Date.valueOf("2024-01-15")).toDF("col") + val inputAttrs = input.queryExecution.analyzed.output + val yearsExpr = Years(input.col("col").expr) + val proto = exprToProto(yearsExpr, inputAttrs, binding = false) + + assert(proto.isDefined, "Comet failed to serialize the Years expression!") + + val expr = proto.get + assert(expr.hasCast, "Expected the result to be a Cast (to Integer)") + assert(expr.getCast.getChild.hasScalarFunc, "Expected Cast child to be a Scalar Function") + assert(expr.getCast.getChild.getScalarFunc.getFunc == "datepart", "Expected function to be 'datepart'") + } + + test("Years support level") { + val supportedTypes = Seq(DateType, TimestampType, TimestampNTZType) + val unsupportedTypes = Seq(StringType, IntegerType, LongType) + + supportedTypes.foreach { dt => + val child = Literal.default(dt) + val expr = Years(child) + val result = CometYears.getSupportLevel(expr) + + assert(result.isInstanceOf[Compatible], s"Expected $dt to be Compatible") + } + + unsupportedTypes.foreach { dt => + val child = Literal.default(dt) + val expr = Years(child) + val result = CometYears.getSupportLevel(expr) + + assert(result.isInstanceOf[Unsupported], s"Expected $dt to be Unsupported") + } + } } From 379180e72a77eb2bd237750fc8ca17ae0eee98f2 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Thu, 5 Feb 2026 11:44:00 -0500 Subject: [PATCH 2/3] style: fix Scala style --- .../src/main/scala/org/apache/comet/serde/datetime.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index fd01ea25be..9f618d8bf2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -20,6 +20,7 @@ package org.apache.comet.serde import java.util.Locale + import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year, Years} import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -546,9 +547,10 @@ object CometYears extends CometExpressionSerde[Years] { } } - override def convert(expr: Years, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { + override def convert( + expr: Years, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { val periodType = exprToProtoInternal(Literal("year"), inputs, binding) val childExpr = exprToProtoInternal(expr.child, inputs, binding) val optExpr = scalarFunctionExprToProto("datepart", Seq(periodType, childExpr): _*) From 064db2bf2636e7f4e745ba3e0423301419b20e52 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Fri, 6 Feb 2026 11:39:19 -0500 Subject: [PATCH 3/3] style: spotless apply --- .../test/scala/org/apache/comet/CometExpressionSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 717672979e..150d2ecfa7 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -3175,7 +3175,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val expr = proto.get assert(expr.hasCast, "Expected the result to be a Cast (to Integer)") assert(expr.getCast.getChild.hasScalarFunc, "Expected Cast child to be a Scalar Function") - assert(expr.getCast.getChild.getScalarFunc.getFunc == "datepart", "Expected function to be 'datepart'") + assert( + expr.getCast.getChild.getScalarFunc.getFunc == "datepart", + "Expected function to be 'datepart'") } test("Years support level") {