From b33fd7de3207ed4ce88e033de7b11616669b3d69 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Tue, 3 Feb 2026 12:07:53 +0100 Subject: [PATCH 01/11] implement to_timestamp --- native/Cargo.lock | 1 + native/Cargo.toml | 1 + native/spark-expr/Cargo.toml | 1 + native/spark-expr/src/comet_scalar_funcs.rs | 5 + native/spark-expr/src/datetime_funcs/mod.rs | 2 + .../src/datetime_funcs/to_timestamp.rs | 119 ++++++++++++++++++ .../apache/comet/serde/QueryPlanSerde.scala | 4 +- .../org/apache/comet/serde/datetime.scala | 92 +++++++++++++- .../comet/CometTemporalExpressionSuite.scala | 49 ++++++++ 9 files changed, 271 insertions(+), 3 deletions(-) create mode 100644 native/spark-expr/src/datetime_funcs/to_timestamp.rs diff --git a/native/Cargo.lock b/native/Cargo.lock index 4adf9ed06e..c315c579c5 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1918,6 +1918,7 @@ dependencies = [ "chrono-tz", "criterion", "datafusion", + "datafusion-common", "futures", "hex", "num", diff --git a/native/Cargo.toml b/native/Cargo.toml index 216057f9bd..8cdad19230 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -39,6 +39,7 @@ async-trait = { version = "0.1" } bytes = { version = "1.10.0" } parquet = { version = "57.2.0", default-features = false, features = ["experimental"] } datafusion = { version = "51.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] } +datafusion-common = { version = "51.0.0"} datafusion-datasource = { version = "51.0.0" } datafusion-spark = { version = "51.0.0" } datafusion-comet-spark-expr = { path = "spark-expr" } diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index 94653d8864..6c259e7c2b 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -30,6 +30,7 @@ edition = { workspace = true } arrow = { workspace = true } chrono = { workspace = true } datafusion = { workspace = true } +datafusion-common = { workspace = true } chrono-tz = { workspace = true } num = { workspace = true } regex = { workspace = true } diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 760dc3570f..66d39e5b44 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use crate::datetime_funcs::custom_to_timestamp; use crate::hash_funcs::*; use crate::math_funcs::abs::abs; use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub}; @@ -177,6 +178,10 @@ pub fn create_comet_physical_fun_with_eval_mode( let func = Arc::new(spark_modulo); make_comet_scalar_udf!("spark_modulo", func, without data_type, fail_on_error) } + "custom_to_timestamp" => { + let func = Arc::new(custom_to_timestamp); + make_comet_scalar_udf!("custom_to_timestamp", func, without data_type) + } "abs" => { let func = Arc::new(abs); make_comet_scalar_udf!("abs", func, without data_type) diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index 1832711479..45d5db2a2e 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -19,6 +19,7 @@ mod date_diff; mod date_trunc; mod extract_date_part; mod timestamp_trunc; +mod to_timestamp; mod unix_timestamp; pub use date_diff::SparkDateDiff; @@ -27,4 +28,5 @@ pub use extract_date_part::SparkHour; pub use extract_date_part::SparkMinute; pub use extract_date_part::SparkSecond; pub use timestamp_trunc::TimestampTruncExpr; +pub use to_timestamp::custom_to_timestamp; pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/datetime_funcs/to_timestamp.rs b/native/spark-expr/src/datetime_funcs/to_timestamp.rs new file mode 100644 index 0000000000..e04b8ab301 --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/to_timestamp.rs @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Define JNI APIs which can be called from Java/Scala. 
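+//!
+//! This module provides a minimal `to_timestamp` scalar function: it maps a
+//! small subset of Spark datetime format patterns onto chrono patterns and
+//! parses string values into microsecond timestamps.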
+ +use arrow::array::{ArrayRef, StringArray}; +use arrow::datatypes::DataType::Timestamp; +use arrow::datatypes::TimeUnit::Microsecond; +use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone}; +use chrono_tz::Tz; +use datafusion::common::{DataFusionError, Result}; +use datafusion::functions::downcast_named_arg; +use datafusion::functions::utils::make_scalar_function; +use datafusion::logical_expr::ColumnarValue; +use datafusion::scalar::ScalarValue; +use datafusion_common; +use datafusion_common::internal_err; +use std::str::FromStr; +use std::sync::Arc; + +const TO_TIMESTAMP: &str = "custom_to_timestamp"; + +/// VERY SMALL subset Spark → chrono +fn spark_to_chrono(fmt: &str) -> String { + fmt.replace("yyyy", "%Y") + .replace("MM", "%m") + .replace("dd", "%d") + .replace("HH", "%H") + .replace("mm", "%M") + .replace("ss", "%S") +} + +fn format_has_time(fmt: &str) -> bool { + fmt.contains("%H") || fmt.contains("%M") || fmt.contains("%S") +} + +fn parse_date_or_timestamp(value: &str, format: &str) -> Result { + if format_has_time(format) { + NaiveDateTime::parse_from_str(value, format) + } else { + let date: NaiveDate = NaiveDate::parse_from_str(value, format)?; + Ok(date.and_hms_opt(0, 0, 0).unwrap()) + } +} + +fn to_utc_micros(naive: NaiveDateTime, tz: Tz) -> Option { + let local: DateTime = tz.from_local_datetime(&naive).single()?; + Some(local.with_timezone(&tz).timestamp_micros()) +} + +fn spark_to_timestamp_parse(value: &str, format: &str, tz: Tz) -> Result { + let result: NaiveDateTime = parse_date_or_timestamp(value, format).map_err(|_| { + DataFusionError::Plan(format!("Error parsing '{value}' with format'{format}'.")) + })?; + to_utc_micros(result, tz) + .ok_or_else(|| DataFusionError::Plan(format!("Error using the timezone {tz}."))) +} + +pub fn custom_to_timestamp(args: &[ColumnarValue]) -> Result { + make_scalar_function(spark_custom_to_timestamp, vec![])(&args) +} + +pub fn spark_custom_to_timestamp(args: &[ArrayRef]) -> Result { + if args.len() < 2 || args.len() > 3 { + return internal_err!( + "`{}` function requires 2 or 3 arguments, got {} arguments", + TO_TIMESTAMP, + args.len() + ); + } + let dates: &StringArray = downcast_named_arg!(&args[0], "date", StringArray); + let format: &str = downcast_named_arg!(&args[1], "format", StringArray).value(0); + let format: String = spark_to_chrono(format); + let tz: Tz = opt_downcast_arg!(&args[2], StringArray) + .and_then(|v| Tz::from_str(v.value(0)).ok()) + .unwrap_or(Tz::UTC); + + let utc_tz: String = chrono_tz::UTC.to_string(); + let utc_tz: Arc = Arc::from(utc_tz); + let values: Result> = dates + .iter() + .map(|value| match value { + None => ScalarValue::Int64(None).cast_to(&Timestamp(Microsecond, Some(utc_tz.clone()))), + Some(date_raw) => { + let parsed_value: Result = spark_to_timestamp_parse(date_raw, &format, tz); + + ScalarValue::Int64(Some(parsed_value?)) + .cast_to(&Timestamp(Microsecond, Some(utc_tz.clone()))) + } + }) + .collect::>>(); + + let scalar_values: Vec = values?; + let decimal_array: ArrayRef = ScalarValue::iter_to_array(scalar_values)?; + + Ok(decimal_array) +} + +macro_rules! 
opt_downcast_arg { + ($ARG:expr, $ARRAY_TYPE:ident) => {{ + $ARG.as_any().downcast_ref::<$ARRAY_TYPE>() + }}; +} + +pub(crate) use opt_downcast_arg; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 066680456e..17b63e2df7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -205,7 +205,9 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[WeekDay] -> CometWeekDay, classOf[DayOfYear] -> CometDayOfYear, classOf[WeekOfYear] -> CometWeekOfYear, - classOf[Quarter] -> CometQuarter) + classOf[Quarter] -> CometQuarter, + classOf[GetTimestamp] -> CometGetTimestamp, + classOf[ParseToDate] -> CometParseToDate) private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Cast] -> CometCast) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index a623146916..c3204ccbbf 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -19,10 +19,13 @@ package org.apache.comet.serde +import java.time.ZoneId import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} -import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} +import scala.util.Try + +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, GetTimestamp, Hour, LastDay, Literal, Minute, Month, ParseToDate, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -176,6 +179,91 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF } } +object CometGetTimestamp extends CometExpressionSerde[GetTimestamp] { + + /** + * Convert a Spark expression into a protocol buffer representation that can be passed into + * native code. + * + * @param expr + * The Spark expression. + * @param inputs + * The input attributes. + * @param binding + * Whether the attributes are bound (this is only relevant in aggregate expressions). + * @return + * Protocol buffer representation, or None if the expression could not be converted. In this + * case it is expected that the input expression will have been tagged with reasons why it + * could not be converted. 
+ */ + override def convert( + expr: GetTimestamp, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val leftExpr: Option[Expr] = + exprToProtoInternal(expr.left, inputs, binding) // timestamp or date + val rightExpr: Option[Expr] = exprToProtoInternal(expr.right, inputs, binding) // format + val tZ: Option[Expr] = + expr.timeZoneId.flatMap(tz => exprToProtoInternal(Literal(tz), inputs, binding)) + scalarFunctionExprToProtoWithReturnType( + "custom_to_timestamp", + expr.dataType, + failOnError = expr.failOnError, + args = leftExpr, + rightExpr, + tZ) + } +} + +object CometParseToDate extends CometExpressionSerde[ParseToDate] { + + /** + * Convert a Spark expression into a protocol buffer representation that can be passed into + * native code. + * + * @param expr + * The Spark expression. + * @param inputs + * The input attributes. + * @param binding + * Whether the attributes are bound (this is only relevant in aggregate expressions). + * @return + * Protocol buffer representation, or None if the expression could not be converted. In this + * case it is expected that the input expression will have been tagged with reasons why it + * could not be converted. + */ + override def convert( + expr: ParseToDate, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val childExpr: Option[Expr] = exprToProtoInternal(expr.left, inputs, binding) + val failOnErrorExpr: Option[Expr] = + exprToProtoInternal(Literal(expr.ansiEnabled), inputs, binding) + scalarFunctionExprToProtoWithReturnType( + "to_date", + expr.dataType, + expr.ansiEnabled, + childExpr, + failOnErrorExpr) + } + +// private val isValidTz: String => Boolean = tz => Try(ZoneId.of(tz)).isSuccess +// +// override def getSupportLevel(expr: ParseToDate): SupportLevel = { +// val formatDataType: Option[DataType] = expr.format.map(e => e.dataType) +// (expr.dataType, formatDataType, expr.timeZoneId) match { +// case (StringType, Some(StringType) | None, Some(timeZone)) => +// if (isValidTz(timeZone)) Compatible() +// else { +// Incompatible( +// Option("Invalid time zone. Use a valid time zone (e.g. 
'UTC', 'Europe/Madrid')")) +// } +// case (StringType, Some(StringType) | None, None) => Compatible() +// case _ => Incompatible(Option("Only string types are supported")) +// } +// } +} + object CometHour extends CometExpressionSerde[Hour] { override def convert( expr: Hour, diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 1ae6926e05..edd56e1a9e 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -173,6 +173,55 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } + test("to_date - string input") { + withTempView("string_tbl") { + // Create test data with timestamp strings + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, true))) + val data = Seq(Row("2020-01-01"), Row("2021-06-15"), Row("2022-12-31"), Row(null)) + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + spark.sql("SELECT dt_str, to_date(dt_str, 'yyyy-MM-dd') from string_tbl").show(20, false) + // String input with custom format should also fall back + checkSparkAnswer("SELECT dt_str, to_date(dt_str, 'yyyy-MM-dd') from string_tbl") + } + } + + test("to_date - string input - 2") { + withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withTempView("string_tbl") { + // Create test data with timestamp strings + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, true))) + val data = Seq(Row("2020-01-01"), Row("2021-06-15"), Row("2022-12-31"), Row(null)) + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + // String input with custom format should also fall back + checkSparkAnswer("SELECT to_date('2020-01-01', 'yyyy-MM-dd') from string_tbl") + } + } + } + + test("to_date - string input - 3") { + withTempView("string_tbl") { + // Create test data with timestamp strings + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, true))) + val data = Seq(Row("2020/01/01"), Row("2021/06/15"), Row("2022/12/31"), Row(null)) + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + spark.sql("SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') from string_tbl").show(20, false) + // String input with custom format should also fall back + checkSparkAnswer("SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') from string_tbl") + } + } + + + private def createTimestampTestData = { val r = new Random(42) val schema = StructType( From 1e41cb4ab3824a94a70e47042706e95a795ab62b Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 09:32:19 +0100 Subject: [PATCH 02/11] add to_date tests --- .../comet/CometTemporalExpressionSuite.scala | 101 ++++++++++++++---- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index edd56e1a9e..de95a0e7f3 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -173,55 +173,110 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - test("to_date 
- string input") { + test("to_date parses date literal") { + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + checkSparkAnswer( + "SELECT to_date('2026-01-30')" + ) + } + } + + test("to_date parses date literal with explicit format") { + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + checkSparkAnswer( + "SELECT to_date('2026/01/30', 'yyyy/MM/dd')" + ) + } + } + + test("to_date parses date string column") { withTempView("string_tbl") { - // Create test data with timestamp strings - val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, true))) - val data = Seq(Row("2020-01-01"), Row("2021-06-15"), Row("2022-12-31"), Row(null)) + val schema = StructType( + Seq(StructField("dt_str", DataTypes.StringType, nullable = true)) + ) + + val data = Seq( + Row("2026-01-30"), + Row("2026-03-10"), + Row("2026-10-10"), + Row(null) + ) + spark .createDataFrame(spark.sparkContext.parallelize(data), schema) .createOrReplaceTempView("string_tbl") - spark.sql("SELECT dt_str, to_date(dt_str, 'yyyy-MM-dd') from string_tbl").show(20, false) - // String input with custom format should also fall back - checkSparkAnswer("SELECT dt_str, to_date(dt_str, 'yyyy-MM-dd') from string_tbl") + checkSparkAnswer( + "SELECT dt_str, to_date(dt_str) FROM string_tbl" + ) } } - test("to_date - string input - 2") { - withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + test("to_date parses date string column with explicit format") { + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { withTempView("string_tbl") { - // Create test data with timestamp strings - val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, true))) - val data = Seq(Row("2020-01-01"), Row("2021-06-15"), Row("2022-12-31"), Row(null)) + val schema = StructType( + Seq(StructField("dt_str", DataTypes.StringType, nullable = true)) + ) + + val data = Seq( + Row("2026/01/30"), + Row("2026/03/10"), + Row("2026/10/10"), + Row(null) + ) + spark .createDataFrame(spark.sparkContext.parallelize(data), schema) .createOrReplaceTempView("string_tbl") - // String input with custom format should also fall back - checkSparkAnswer("SELECT to_date('2020-01-01', 'yyyy-MM-dd') from string_tbl") + checkSparkAnswer( + "SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') FROM string_tbl" + ) } } } - test("to_date - string input - 3") { + test("to_date parses timestamp literal string") { + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + checkSparkAnswer( + "SELECT to_date('2026-01-30 04:17:52')" + ) + } + } + + test("to_date parses timestamp string column") { withTempView("string_tbl") { - // Create test data with timestamp strings - val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, true))) - val data = Seq(Row("2020/01/01"), Row("2021/06/15"), Row("2022/12/31"), Row(null)) + val schema = StructType( + Seq(StructField("dt_str", DataTypes.StringType, nullable = true)) + ) + + val data = Seq( + Row("2026-01-30 04:17:52"), + Row("2026-03-10 04:17:52"), + Row("2026-10-10 04:17:52"), + Row(null) + ) + spark .createDataFrame(spark.sparkContext.parallelize(data), schema) .createOrReplaceTempView("string_tbl") - spark.sql("SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') from string_tbl").show(20, false) - // 
String input with custom format should also fall back - checkSparkAnswer("SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') from string_tbl") + checkSparkAnswer( + "SELECT dt_str, to_date(dt_str) FROM string_tbl" + ) } } - private def createTimestampTestData = { val r = new Random(42) val schema = StructType( From 808e462d93d6d64e28e47c78b3c88c475ff5dd63 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 09:47:07 +0100 Subject: [PATCH 03/11] add to_timestamp tests --- .../comet/CometTemporalExpressionSuite.scala | 189 +++++++++++++----- 1 file changed, 137 insertions(+), 52 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index de95a0e7f3..a7f5259d88 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -174,108 +174,188 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } test("to_date parses date literal") { - withSQLConf( - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - checkSparkAnswer( - "SELECT to_date('2026-01-30')" - ) + withoutConstantFolding { + checkSparkAnswer("SELECT to_date('2026-01-30')") } } test("to_date parses date literal with explicit format") { - withSQLConf( - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - checkSparkAnswer( - "SELECT to_date('2026/01/30', 'yyyy/MM/dd')" - ) + withoutConstantFolding { + checkSparkAnswer("SELECT to_date('2026/01/30', 'yyyy/MM/dd')") } } test("to_date parses date string column") { withTempView("string_tbl") { - val schema = StructType( - Seq(StructField("dt_str", DataTypes.StringType, nullable = true)) - ) + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true))) - val data = Seq( - Row("2026-01-30"), - Row("2026-03-10"), - Row("2026-10-10"), - Row(null) - ) + val data = Seq(Row("2026-01-30"), Row("2026-03-10"), Row("2026-10-10"), Row(null)) spark .createDataFrame(spark.sparkContext.parallelize(data), schema) .createOrReplaceTempView("string_tbl") - checkSparkAnswer( - "SELECT dt_str, to_date(dt_str) FROM string_tbl" - ) + checkSparkAnswer("SELECT dt_str, to_date(dt_str) FROM string_tbl") } } test("to_date parses date string column with explicit format") { - withSQLConf( - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withoutConstantFolding { withTempView("string_tbl") { - val schema = StructType( - Seq(StructField("dt_str", DataTypes.StringType, nullable = true)) - ) + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true))) - val data = Seq( - Row("2026/01/30"), - Row("2026/03/10"), - Row("2026/10/10"), - Row(null) - ) + val data = Seq(Row("2026/01/30"), Row("2026/03/10"), Row("2026/10/10"), Row(null)) spark .createDataFrame(spark.sparkContext.parallelize(data), schema) .createOrReplaceTempView("string_tbl") - checkSparkAnswer( - "SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') FROM string_tbl" - ) + checkSparkAnswer("SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') FROM string_tbl") } } } test("to_date parses timestamp literal string") { - withSQLConf( - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - checkSparkAnswer( - "SELECT to_date('2026-01-30 04:17:52')" - ) + 
withoutConstantFolding { + checkSparkAnswer("SELECT to_date('2026-01-30 04:17:52')") } } test("to_date parses timestamp string column") { withTempView("string_tbl") { - val schema = StructType( - Seq(StructField("dt_str", DataTypes.StringType, nullable = true)) - ) + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true))) + + val data = Seq( + Row("2026-01-30 04:17:52"), + Row("2026-03-10 04:17:52"), + Row("2026-10-10 04:17:52"), + Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswer("SELECT dt_str, to_date(dt_str) FROM string_tbl") + } + } + + test("to_timestamp parses date literal as midnight timestamp") { + withoutConstantFolding { + checkSparkAnswer("SELECT to_timestamp('2026-01-30')") + } + } + + test("to_timestamp parses timestamp literal with default format") { + withoutConstantFolding { + checkSparkAnswer("SELECT to_timestamp('2026-01-30 10:30:52')") + } + } + + test("to_timestamp parses date literal using explicit date format") { + withoutConstantFolding { + checkSparkAnswer("SELECT to_timestamp('2026/01/30', 'yyyy/MM/dd')") + } + } + + test("to_timestamp parses timestamp literal using explicit timestamp format") { + withoutConstantFolding { + checkSparkAnswer("SELECT to_timestamp('2026/01/30 10:30:52', 'yyyy/MM/dd HH:mm:ss')") + } + } + + test("to_timestamp parses date string column as midnight timestamp") { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true))) + + val data = Seq(Row("2026-01-30"), Row("2026-03-10"), Row("2026-10-10"), Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswer("SELECT ts_str, to_timestamp(ts_str) FROM string_tbl") + } + } + + test("to_timestamp parses date string column using explicit date format") { + withoutConstantFolding { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true))) + + val data = Seq(Row("2026/01/30"), Row("2026/03/10"), Row("2026/10/10"), Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswer("SELECT ts_str, to_timestamp(ts_str, 'yyyy/MM/dd') FROM string_tbl") + } + } + } + + test("to_timestamp parses timestamp string column with default format") { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true))) val data = Seq( Row("2026-01-30 04:17:52"), Row("2026-03-10 04:17:52"), Row("2026-10-10 04:17:52"), - Row(null) - ) + Row(null)) spark .createDataFrame(spark.sparkContext.parallelize(data), schema) .createOrReplaceTempView("string_tbl") + checkSparkAnswer("SELECT ts_str, to_timestamp(ts_str) FROM string_tbl") + } + } + + test("to_timestamp parses timestamp string column using explicit timestamp format") { + withoutConstantFolding { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true))) + + val data = Seq( + Row("2026/01/30 04:17:52"), + Row("2026/03/10 04:17:52"), + Row("2026/10/10 04:17:52"), + Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswer( + "SELECT ts_str, to_timestamp(ts_str, 'yyyy/MM/dd HH:mm:ss') FROM string_tbl") + } + } + } + test("to_timestamp 
parses timestamp literal with milliseconds") { + withoutConstantFolding { checkSparkAnswer( - "SELECT dt_str, to_date(dt_str) FROM string_tbl" - ) + "SELECT to_timestamp('2026-01-30 10:30:52.123', 'yyyy-MM-dd HH:mm:ss.SSS')") } } + test("to_timestamp parses timestamp string column with milliseconds") { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true))) + + val data = Seq( + Row("2026-01-30 10:30:52.123"), + Row("2026-03-10 10:30:52.999"), + Row("2026-10-10 10:30:52.000"), + Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswer( + "SELECT ts_str, to_timestamp(ts_str, 'yyyy-MM-dd HH:mm:ss.SSS') FROM string_tbl") + } + } private def createTimestampTestData = { val r = new Random(42) @@ -499,4 +579,9 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Test null handling checkSparkAnswerAndOperator("SELECT unix_date(NULL)") } + + private def withoutConstantFolding[A](f: => A): Unit = + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding")(f) } From 105e76c768f3f01887383da43f59b0583ba1ee77 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 09:47:15 +0100 Subject: [PATCH 04/11] add to_timestamp expression --- .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../org/apache/comet/serde/datetime.scala | 54 ++++++++++++------- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 17b63e2df7..654c9a7359 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -207,7 +207,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[WeekOfYear] -> CometWeekOfYear, classOf[Quarter] -> CometQuarter, classOf[GetTimestamp] -> CometGetTimestamp, - classOf[ParseToDate] -> CometParseToDate) + classOf[ParseToDate] -> CometParseToDate, + classOf[ParseToTimestamp] -> CometParseToTimestamp) private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Cast] -> CometCast) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index c3204ccbbf..6c45e64a81 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -19,13 +19,10 @@ package org.apache.comet.serde -import java.time.ZoneId import java.util.Locale -import scala.util.Try - -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, GetTimestamp, Hour, LastDay, Literal, Minute, Month, ParseToDate, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} -import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, GetTimestamp, Hour, LastDay, Literal, Minute, Month, ParseToDate, ParseToTimestamp, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import 
org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -246,22 +243,39 @@ object CometParseToDate extends CometExpressionSerde[ParseToDate] { childExpr, failOnErrorExpr) } +} -// private val isValidTz: String => Boolean = tz => Try(ZoneId.of(tz)).isSuccess -// -// override def getSupportLevel(expr: ParseToDate): SupportLevel = { -// val formatDataType: Option[DataType] = expr.format.map(e => e.dataType) -// (expr.dataType, formatDataType, expr.timeZoneId) match { -// case (StringType, Some(StringType) | None, Some(timeZone)) => -// if (isValidTz(timeZone)) Compatible() -// else { -// Incompatible( -// Option("Invalid time zone. Use a valid time zone (e.g. 'UTC', 'Europe/Madrid')")) -// } -// case (StringType, Some(StringType) | None, None) => Compatible() -// case _ => Incompatible(Option("Only string types are supported")) -// } -// } +object CometParseToTimestamp extends CometExpressionSerde[ParseToTimestamp] { + + /** + * Convert a Spark expression into a protocol buffer representation that can be passed into + * native code. + * + * @param expr + * The Spark expression. + * @param inputs + * The input attributes. + * @param binding + * Whether the attributes are bound (this is only relevant in aggregate expressions). + * @return + * Protocol buffer representation, or None if the expression could not be converted. In this + * case it is expected that the input expression will have been tagged with reasons why it + * could not be converted. + */ + override def convert( + expr: ParseToTimestamp, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val childExpr: Option[Expr] = exprToProtoInternal(expr.left, inputs, binding) + val failOnErrorExpr: Option[Expr] = + exprToProtoInternal(Literal(expr.failOnError), inputs, binding) + scalarFunctionExprToProtoWithReturnType( + "to_timestamp", + expr.dataType, + expr.failOnError, + childExpr, + failOnErrorExpr) + } } object CometHour extends CometExpressionSerde[Hour] { From 50b85526226dc8f5233b1c36d138331990a19a92 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 10:35:05 +0100 Subject: [PATCH 05/11] fix milliseconds --- .../src/datetime_funcs/to_timestamp.rs | 326 ++++++++++++++++-- 1 file changed, 300 insertions(+), 26 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/to_timestamp.rs b/native/spark-expr/src/datetime_funcs/to_timestamp.rs index e04b8ab301..97ced949fc 100644 --- a/native/spark-expr/src/datetime_funcs/to_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/to_timestamp.rs @@ -17,10 +17,10 @@ //! Define JNI APIs which can be called from Java/Scala. 
-use arrow::array::{ArrayRef, StringArray}; +use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::Microsecond; -use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone}; +use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Timelike}; use chrono_tz::Tz; use datafusion::common::{DataFusionError, Result}; use datafusion::functions::downcast_named_arg; @@ -34,40 +34,112 @@ use std::sync::Arc; const TO_TIMESTAMP: &str = "custom_to_timestamp"; -/// VERY SMALL subset Spark → chrono -fn spark_to_chrono(fmt: &str) -> String { - fmt.replace("yyyy", "%Y") - .replace("MM", "%m") - .replace("dd", "%d") - .replace("HH", "%H") - .replace("mm", "%M") - .replace("ss", "%S") +#[derive(Debug, Clone, Copy)] +#[derive(PartialEq)] +enum FractionPrecision { + Millis, + Micros, + Nanos, } -fn format_has_time(fmt: &str) -> bool { - fmt.contains("%H") || fmt.contains("%M") || fmt.contains("%S") +/// Detect Spark fractional precision +fn detect_fraction_precision(fmt: &str) -> Option { + let count = fmt.chars().filter(|&c| c == 'S').count(); + match count { + 0 => None, + 1..=3 => Some(FractionPrecision::Millis), + 4..=6 => Some(FractionPrecision::Micros), + _ => Some(FractionPrecision::Nanos), + } +} + +/// Convert Spark → Chrono format +fn spark_to_chrono(fmt: &str) -> (String, Option) { + let precision = detect_fraction_precision(fmt); + + let mut out = fmt.to_string(); + + // Date + out = out.replace("yyyy", "%Y"); + out = out.replace("MM", "%m"); + out = out.replace("dd", "%d"); + + // Time + out = out.replace("HH", "%H"); + out = out.replace("mm", "%M"); + out = out.replace("ss", "%S"); + + // Fractions + out = out + .replace(".SSSSSSSSS", "%.f") + .replace(".SSSSSS", "%.f") + .replace(".SSS", "%.f"); + + // Timezones + out = out.replace("XXX", "%:z"); + out = out.replace("Z", "%z"); + + (out, precision) } -fn parse_date_or_timestamp(value: &str, format: &str) -> Result { - if format_has_time(format) { - NaiveDateTime::parse_from_str(value, format) +/// Detect if Spark format contains time +fn spark_format_has_time(fmt: &str) -> bool { + fmt.contains("HH") || fmt.contains("mm") || fmt.contains("ss") +} + +/// Parse Spark date or timestamp +fn parse_spark_naive( + value: &str, + spark_fmt: &str, +) -> Result<(NaiveDateTime, Option), chrono::ParseError> { + let (chrono_fmt, precision) = spark_to_chrono(spark_fmt); + + if spark_format_has_time(spark_fmt) { + let ts = NaiveDateTime::parse_from_str(value, &chrono_fmt)?; + Ok((ts, precision)) } else { - let date: NaiveDate = NaiveDate::parse_from_str(value, format)?; - Ok(date.and_hms_opt(0, 0, 0).unwrap()) + let date = NaiveDate::parse_from_str(value, &chrono_fmt)?; + Ok((date.and_hms_opt(0, 0, 0).unwrap(), precision)) } } -fn to_utc_micros(naive: NaiveDateTime, tz: Tz) -> Option { - let local: DateTime = tz.from_local_datetime(&naive).single()?; - Some(local.with_timezone(&tz).timestamp_micros()) +/// Normalize fractional seconds +fn normalize_fraction( + mut ts: NaiveDateTime, + precision: Option, +) -> Option { + match precision { + Some(FractionPrecision::Millis) => { + let ms = ts.and_utc().timestamp_subsec_millis(); + ts = ts.with_nanosecond(ms * 1_000_000)?; + } + Some(FractionPrecision::Micros) => { + let us = ts.and_utc().timestamp_subsec_micros(); + ts = ts.with_nanosecond(us * 1_000)?; + } + Some(FractionPrecision::Nanos) | None => {} + } + Some(ts) } -fn spark_to_timestamp_parse(value: &str, format: &str, tz: Tz) -> Result { - let result: NaiveDateTime = 
parse_date_or_timestamp(value, format).map_err(|_| { - DataFusionError::Plan(format!("Error parsing '{value}' with format'{format}'.")) +/// Final Spark-like timestamp parse → UTC micros +pub fn spark_to_timestamp_parse( + value: &str, + spark_fmt: &str, + tz: Tz, +) -> Result { + let (naive, precision) = parse_spark_naive(value, spark_fmt).map_err(|_| { + DataFusionError::Plan(format!("Error parsing '{value}' with format '{spark_fmt}'")) })?; - to_utc_micros(result, tz) - .ok_or_else(|| DataFusionError::Plan(format!("Error using the timezone {tz}."))) + + let naive = normalize_fraction(naive, precision) + .ok_or_else(|| DataFusionError::Plan("Invalid fractional timestamp".into()))?; + + let local: DateTime = tz.from_local_datetime(&naive).single().ok_or_else(|| { + DataFusionError::Plan(format!("Ambiguous or invalid datetime in timezone {tz}")) + })?; + + Ok(local.timestamp_micros()) } pub fn custom_to_timestamp(args: &[ColumnarValue]) -> Result { @@ -84,7 +156,6 @@ pub fn spark_custom_to_timestamp(args: &[ArrayRef]) -> Result { } let dates: &StringArray = downcast_named_arg!(&args[0], "date", StringArray); let format: &str = downcast_named_arg!(&args[1], "format", StringArray).value(0); - let format: String = spark_to_chrono(format); let tz: Tz = opt_downcast_arg!(&args[2], StringArray) .and_then(|v| Tz::from_str(v.value(0)).ok()) .unwrap_or(Tz::UTC); @@ -117,3 +188,206 @@ macro_rules! opt_downcast_arg { } pub(crate) use opt_downcast_arg; + + + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{NaiveDate, NaiveDateTime}; + use chrono_tz::UTC; + + // ---------------------------- + // detect_fraction_precision + // ---------------------------- + + #[test] + fn detects_no_fraction() { + assert_eq!( + detect_fraction_precision("yyyy-MM-dd HH:mm:ss"), + None + ); + } + + #[test] + fn detects_millis_precision() { + assert_eq!( + detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSS"), + Some(FractionPrecision::Millis) + ); + } + + #[test] + fn detects_micros_precision() { + assert_eq!( + detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSSSSS"), + Some(FractionPrecision::Micros) + ); + } + + #[test] + fn detects_nanos_precision() { + assert_eq!( + detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"), + Some(FractionPrecision::Nanos) + ); + } + + // ---------------------------- + // spark_to_chrono + // ---------------------------- + + #[test] + fn converts_basic_date_format() { + let (fmt, precision) = spark_to_chrono("yyyy-MM-dd"); + assert_eq!(fmt, "%Y-%m-%d"); + assert_eq!(precision, None); + } + + #[test] + fn converts_timestamp_with_millis() { + let (fmt, precision) = + spark_to_chrono("yyyy-MM-dd HH:mm:ss.SSS"); + + assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%.f"); + assert_eq!(precision, Some(FractionPrecision::Millis)); + } + + #[test] + fn converts_timestamp_with_timezone() { + let (fmt, _) = + spark_to_chrono("yyyy-MM-dd HH:mm:ssXXX"); + + assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%:z"); + } + + // ---------------------------- + // spark_format_has_time + // ---------------------------- + + #[test] + fn detects_date_only_format() { + assert!(!spark_format_has_time("yyyy-MM-dd")); + } + + #[test] + fn detects_timestamp_format() { + assert!(spark_format_has_time("yyyy-MM-dd HH:mm:ss")); + } + + // ---------------------------- + // parse_spark_naive + // ---------------------------- + + #[test] + fn parses_date_as_midnight_timestamp() { + let (ts, _) = parse_spark_naive( + "2026-01-30", + "yyyy-MM-dd", + ) + .unwrap(); + + let expected = + NaiveDate::from_ymd_opt(2026, 1, 30) + .unwrap() + 
.and_hms_opt(0, 0, 0) + .unwrap(); + + assert_eq!(ts, expected); + } + + #[test] + fn parses_timestamp_with_millis() { + let (ts, precision) = parse_spark_naive( + "2026-01-30 10:30:52.123", + "yyyy-MM-dd HH:mm:ss.SSS", + ) + .unwrap(); + + assert_eq!(precision, Some(FractionPrecision::Millis)); + assert_eq!(ts.and_utc().timestamp_subsec_millis(), 123); + } + + // ---------------------------- + // normalize_fraction + // ---------------------------- + + #[test] + fn normalizes_millis_precision() { + let ts = NaiveDateTime::parse_from_str( + "2026-01-30 10:30:52.123456", + "%Y-%m-%d %H:%M:%S%.6f", + ) + .unwrap(); + + let normalized = + normalize_fraction(ts, Some(FractionPrecision::Millis)) + .unwrap(); + + assert_eq!( + normalized.and_utc().timestamp_subsec_nanos(), + 123_000_000 + ); + } + + #[test] + fn normalizes_micros_precision() { + let ts = NaiveDateTime::parse_from_str( + "2026-01-30 10:30:52.123456", + "%Y-%m-%d %H:%M:%S%.6f", + ) + .unwrap(); + + let normalized = + normalize_fraction(ts, Some(FractionPrecision::Micros)) + .unwrap(); + + assert_eq!( + normalized.and_utc().timestamp_subsec_nanos(), + 123_456_000 + ); + } + + // ---------------------------- + // spark_to_timestamp_parse (end-to-end) + // ---------------------------- + + #[test] + fn parses_timestamp_and_preserves_millis() { + let micros = spark_to_timestamp_parse( + "2026-01-30 10:30:52.123", + "yyyy-MM-dd HH:mm:ss.SSS", + UTC, + ) + .unwrap(); + + let expected = NaiveDateTime::parse_from_str( + "2026-01-30 10:30:52.123", + "%Y-%m-%d %H:%M:%S%.3f", + ) + .unwrap() + .and_utc() + .timestamp_micros(); + + assert_eq!(micros, expected); + } + + #[test] + fn parses_date_literal_as_midnight() { + let micros = spark_to_timestamp_parse( + "2026-01-30", + "yyyy-MM-dd", + UTC, + ) + .unwrap(); + + let expected = NaiveDate::from_ymd_opt(2026, 1, 30) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap() + .and_utc() + .timestamp_micros(); + + assert_eq!(micros, expected); + } +} From 44389db3d1e04fc0580daf2cac63eef74655d0c4 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 10:38:44 +0100 Subject: [PATCH 06/11] edit test --- .../scala/org/apache/comet/CometTemporalExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index a7f5259d88..075eb56043 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -239,7 +239,7 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - test("to_timestamp parses date literal as midnight timestamp") { + test("to_timestamp parses date literal as 00:00:00 timestamp") { withoutConstantFolding { checkSparkAnswer("SELECT to_timestamp('2026-01-30')") } From e779d52a4974b4a64b1a15520374b4bfb4933669 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 10:44:02 +0100 Subject: [PATCH 07/11] format --- .../src/datetime_funcs/to_timestamp.rs | 102 ++++++------------ 1 file changed, 31 insertions(+), 71 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/to_timestamp.rs b/native/spark-expr/src/datetime_funcs/to_timestamp.rs index 97ced949fc..48c806fcca 100644 --- a/native/spark-expr/src/datetime_funcs/to_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/to_timestamp.rs @@ -17,7 +17,7 @@ //! 
Define JNI APIs which can be called from Java/Scala. -use arrow::array::{ArrayRef, Int32Array, StringArray}; +use arrow::array::{ArrayRef, StringArray}; use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::Microsecond; use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Timelike}; @@ -34,8 +34,7 @@ use std::sync::Arc; const TO_TIMESTAMP: &str = "custom_to_timestamp"; -#[derive(Debug, Clone, Copy)] -#[derive(PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] enum FractionPrecision { Millis, Micros, @@ -189,8 +188,6 @@ macro_rules! opt_downcast_arg { pub(crate) use opt_downcast_arg; - - #[cfg(test)] mod tests { use super::*; @@ -203,10 +200,7 @@ mod tests { #[test] fn detects_no_fraction() { - assert_eq!( - detect_fraction_precision("yyyy-MM-dd HH:mm:ss"), - None - ); + assert_eq!(detect_fraction_precision("yyyy-MM-dd HH:mm:ss"), None); } #[test] @@ -246,8 +240,7 @@ mod tests { #[test] fn converts_timestamp_with_millis() { - let (fmt, precision) = - spark_to_chrono("yyyy-MM-dd HH:mm:ss.SSS"); + let (fmt, precision) = spark_to_chrono("yyyy-MM-dd HH:mm:ss.SSS"); assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%.f"); assert_eq!(precision, Some(FractionPrecision::Millis)); @@ -255,8 +248,7 @@ mod tests { #[test] fn converts_timestamp_with_timezone() { - let (fmt, _) = - spark_to_chrono("yyyy-MM-dd HH:mm:ssXXX"); + let (fmt, _) = spark_to_chrono("yyyy-MM-dd HH:mm:ssXXX"); assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%:z"); } @@ -281,28 +273,20 @@ mod tests { #[test] fn parses_date_as_midnight_timestamp() { - let (ts, _) = parse_spark_naive( - "2026-01-30", - "yyyy-MM-dd", - ) - .unwrap(); + let (ts, _) = parse_spark_naive("2026-01-30", "yyyy-MM-dd").unwrap(); - let expected = - NaiveDate::from_ymd_opt(2026, 1, 30) - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap(); + let expected = NaiveDate::from_ymd_opt(2026, 1, 30) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); assert_eq!(ts, expected); } #[test] fn parses_timestamp_with_millis() { - let (ts, precision) = parse_spark_naive( - "2026-01-30 10:30:52.123", - "yyyy-MM-dd HH:mm:ss.SSS", - ) - .unwrap(); + let (ts, precision) = + parse_spark_naive("2026-01-30 10:30:52.123", "yyyy-MM-dd HH:mm:ss.SSS").unwrap(); assert_eq!(precision, Some(FractionPrecision::Millis)); assert_eq!(ts.and_utc().timestamp_subsec_millis(), 123); @@ -314,38 +298,24 @@ mod tests { #[test] fn normalizes_millis_precision() { - let ts = NaiveDateTime::parse_from_str( - "2026-01-30 10:30:52.123456", - "%Y-%m-%d %H:%M:%S%.6f", - ) - .unwrap(); - - let normalized = - normalize_fraction(ts, Some(FractionPrecision::Millis)) + let ts = + NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123456", "%Y-%m-%d %H:%M:%S%.6f") .unwrap(); - assert_eq!( - normalized.and_utc().timestamp_subsec_nanos(), - 123_000_000 - ); + let normalized = normalize_fraction(ts, Some(FractionPrecision::Millis)).unwrap(); + + assert_eq!(normalized.and_utc().timestamp_subsec_nanos(), 123_000_000); } #[test] fn normalizes_micros_precision() { - let ts = NaiveDateTime::parse_from_str( - "2026-01-30 10:30:52.123456", - "%Y-%m-%d %H:%M:%S%.6f", - ) - .unwrap(); - - let normalized = - normalize_fraction(ts, Some(FractionPrecision::Micros)) + let ts = + NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123456", "%Y-%m-%d %H:%M:%S%.6f") .unwrap(); - assert_eq!( - normalized.and_utc().timestamp_subsec_nanos(), - 123_456_000 - ); + let normalized = normalize_fraction(ts, Some(FractionPrecision::Micros)).unwrap(); + + assert_eq!(normalized.and_utc().timestamp_subsec_nanos(), 123_456_000); } // 
---------------------------- @@ -354,32 +324,22 @@ mod tests { #[test] fn parses_timestamp_and_preserves_millis() { - let micros = spark_to_timestamp_parse( - "2026-01-30 10:30:52.123", - "yyyy-MM-dd HH:mm:ss.SSS", - UTC, - ) - .unwrap(); + let micros = + spark_to_timestamp_parse("2026-01-30 10:30:52.123", "yyyy-MM-dd HH:mm:ss.SSS", UTC) + .unwrap(); - let expected = NaiveDateTime::parse_from_str( - "2026-01-30 10:30:52.123", - "%Y-%m-%d %H:%M:%S%.3f", - ) - .unwrap() - .and_utc() - .timestamp_micros(); + let expected = + NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123", "%Y-%m-%d %H:%M:%S%.3f") + .unwrap() + .and_utc() + .timestamp_micros(); assert_eq!(micros, expected); } #[test] fn parses_date_literal_as_midnight() { - let micros = spark_to_timestamp_parse( - "2026-01-30", - "yyyy-MM-dd", - UTC, - ) - .unwrap(); + let micros = spark_to_timestamp_parse("2026-01-30", "yyyy-MM-dd", UTC).unwrap(); let expected = NaiveDate::from_ymd_opt(2026, 1, 30) .unwrap() From fd7f038b5a1adadc02937967ab69b527eedc86e0 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 11:01:41 +0100 Subject: [PATCH 08/11] rename function --- native/spark-expr/src/comet_scalar_funcs.rs | 8 ++++---- native/spark-expr/src/datetime_funcs/mod.rs | 2 +- native/spark-expr/src/datetime_funcs/to_timestamp.rs | 8 ++++---- .../src/main/scala/org/apache/comet/serde/datetime.scala | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 66d39e5b44..8afbb9156a 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::datetime_funcs::custom_to_timestamp; +use crate::datetime_funcs::to_timestamp; use crate::hash_funcs::*; use crate::math_funcs::abs::abs; use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub}; @@ -178,9 +178,9 @@ pub fn create_comet_physical_fun_with_eval_mode( let func = Arc::new(spark_modulo); make_comet_scalar_udf!("spark_modulo", func, without data_type, fail_on_error) } - "custom_to_timestamp" => { - let func = Arc::new(custom_to_timestamp); - make_comet_scalar_udf!("custom_to_timestamp", func, without data_type) + "to_timestamp" => { + let func = Arc::new(to_timestamp); + make_comet_scalar_udf!("to_timestamp", func, without data_type) } "abs" => { let func = Arc::new(abs); diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index 45d5db2a2e..032731432e 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -28,5 +28,5 @@ pub use extract_date_part::SparkHour; pub use extract_date_part::SparkMinute; pub use extract_date_part::SparkSecond; pub use timestamp_trunc::TimestampTruncExpr; -pub use to_timestamp::custom_to_timestamp; +pub use to_timestamp::to_timestamp; pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/datetime_funcs/to_timestamp.rs b/native/spark-expr/src/datetime_funcs/to_timestamp.rs index 48c806fcca..81b84ca180 100644 --- a/native/spark-expr/src/datetime_funcs/to_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/to_timestamp.rs @@ -32,7 +32,7 @@ use datafusion_common::internal_err; use std::str::FromStr; use std::sync::Arc; -const TO_TIMESTAMP: &str = "custom_to_timestamp"; +const TO_TIMESTAMP: &str = "to_timestamp"; #[derive(Debug, Clone, Copy, PartialEq)] enum FractionPrecision { @@ -141,11 +141,11 @@ pub fn spark_to_timestamp_parse( Ok(local.timestamp_micros()) } -pub fn custom_to_timestamp(args: &[ColumnarValue]) -> Result { - make_scalar_function(spark_custom_to_timestamp, vec![])(&args) +pub fn to_timestamp(args: &[ColumnarValue]) -> Result { + make_scalar_function(spark_to_timestamp, vec![])(&args) } -pub fn spark_custom_to_timestamp(args: &[ArrayRef]) -> Result { +pub fn spark_to_timestamp(args: &[ArrayRef]) -> Result { if args.len() < 2 || args.len() > 3 { return internal_err!( "`{}` function requires 2 or 3 arguments, got {} arguments", diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 6c45e64a81..f85465e90e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -203,7 +203,7 @@ object CometGetTimestamp extends CometExpressionSerde[GetTimestamp] { val tZ: Option[Expr] = expr.timeZoneId.flatMap(tz => exprToProtoInternal(Literal(tz), inputs, binding)) scalarFunctionExprToProtoWithReturnType( - "custom_to_timestamp", + "to_timestamp", expr.dataType, failOnError = expr.failOnError, args = leftExpr, From 92e7394ff73de30ae6884e59c936af2106a71803 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 7 Feb 2026 11:09:10 +0100 Subject: [PATCH 09/11] add to_date spark-sql tests --- .../expressions/datetime/to_date.sql | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql 
b/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql
new file mode 100644
index 0000000000..ec5a9916c2
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql
@@ -0,0 +1,46 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+-- to_date function
+statement
+CREATE TABLE test_to_date(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_date VALUES ('2026-01-30'), ('2026-03-10'), (NULL)
+
+query
+SELECT col, to_date(col) FROM test_to_date
+
+statement
+CREATE TABLE test_to_date_fmt(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_date_fmt VALUES ('2026/01/30'), ('2026/03/10'), (NULL)
+
+query
+SELECT col, to_date(col, 'yyyy/MM/dd') FROM test_to_date_fmt
+
+query
+SELECT to_date('2026-01-30')
+
+query
+SELECT to_date('2026/01/30', 'yyyy/MM/dd')
+
+query
+SELECT to_date('2026-01-30 10:30:00')

From fd7f038b5a1adadc02937967ab69b527eedc86e0 Mon Sep 17 00:00:00 2001
From: Rafael Fernandez
Date: Sat, 7 Feb 2026 11:12:53 +0100
Subject: [PATCH 10/11] cargo clippy suggestions

---
 native/spark-expr/src/datetime_funcs/to_timestamp.rs | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/native/spark-expr/src/datetime_funcs/to_timestamp.rs b/native/spark-expr/src/datetime_funcs/to_timestamp.rs
index 81b84ca180..b5a1265086 100644
--- a/native/spark-expr/src/datetime_funcs/to_timestamp.rs
+++ b/native/spark-expr/src/datetime_funcs/to_timestamp.rs
@@ -27,7 +27,6 @@ use datafusion::functions::downcast_named_arg;
 use datafusion::functions::utils::make_scalar_function;
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::scalar::ScalarValue;
-use datafusion_common;
 use datafusion_common::internal_err;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -142,7 +141,7 @@ pub fn spark_to_timestamp_parse(
 }
 
 pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    make_scalar_function(spark_to_timestamp, vec![])(&args)
+    make_scalar_function(spark_to_timestamp, vec![])(args)
 }
 
 pub fn spark_to_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
@@ -164,12 +163,14 @@ pub fn spark_to_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
     let values: Result<Vec<ScalarValue>> = dates
         .iter()
         .map(|value| match value {
-            None => ScalarValue::Int64(None).cast_to(&Timestamp(Microsecond, Some(utc_tz.clone()))),
+            None => {
+                ScalarValue::Int64(None).cast_to(&Timestamp(Microsecond, Some(Arc::clone(&utc_tz))))
+            }
             Some(date_raw) => {
-                let parsed_value: Result<i64> = spark_to_timestamp_parse(date_raw, &format, tz);
+                let parsed_value: Result<i64> = spark_to_timestamp_parse(date_raw, format, tz);
 
                 ScalarValue::Int64(Some(parsed_value?))
-                    .cast_to(&Timestamp(Microsecond, Some(utc_tz.clone())))
+                    .cast_to(&Timestamp(Microsecond, Some(Arc::clone(&utc_tz))))
             }
         })
         .collect::<Result<Vec<ScalarValue>>>();

From 8b016ea77992bd26fc6924f097bda47c97391f92 Mon Sep 17 00:00:00 2001
From: Rafael Fernandez
Date: Sat, 7 Feb 2026 11:31:54 +0100
Subject: [PATCH 11/11] add to_timestamp spark-sql tests

---
 .../expressions/datetime/to_timestamp.sql | 47 +++++++++++++++++++
 1 file changed, 47 insertions(+)
 create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/to_timestamp.sql

diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_timestamp.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_timestamp.sql
new file mode 100644
index 0000000000..9ba8e4d770
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_timestamp.sql
@@ -0,0 +1,47 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Config: spark.comet.expression.Cast.allowIncompatible=true
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+-- to_timestamp function
+statement
+CREATE TABLE test_to_timestamp(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_timestamp VALUES ('2026-01-30'), ('2026-03-10'), (NULL)
+
+query
+SELECT col, to_timestamp(col) FROM test_to_timestamp
+
+statement
+CREATE TABLE test_to_timestamp_fmt(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_timestamp_fmt VALUES ('2026/01/30 10:30:52'), ('2026/03/10 10:30:52'), (NULL)
+
+query
+SELECT col, to_timestamp(col, 'yyyy/MM/dd HH:mm:ss') FROM test_to_timestamp_fmt
+
+query
+SELECT to_timestamp('2026-01-30')
+
+query
+SELECT to_timestamp('2026/01/30 10:30:52', 'yyyy/MM/dd HH:mm:ss')
+
+query
+SELECT to_timestamp('2026-01-30 10:30:52')
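+
+-- Illustrative extra case: fractional seconds with an explicit millisecond
+-- pattern, matching the case covered in the Scala suite
+query
+SELECT to_timestamp('2026-01-30 10:30:52.123', 'yyyy-MM-dd HH:mm:ss.SSS')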