diff --git a/native/Cargo.lock b/native/Cargo.lock
index 4adf9ed06e..c315c579c5 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1918,6 +1918,7 @@ dependencies = [
  "chrono-tz",
  "criterion",
  "datafusion",
+ "datafusion-common",
  "futures",
  "hex",
  "num",
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 216057f9bd..8cdad19230 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -39,6 +39,7 @@ async-trait = { version = "0.1" }
 bytes = { version = "1.10.0" }
 parquet = { version = "57.2.0", default-features = false, features = ["experimental"] }
 datafusion = { version = "51.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
+datafusion-common = { version = "51.0.0" }
 datafusion-datasource = { version = "51.0.0" }
 datafusion-spark = { version = "51.0.0" }
 datafusion-comet-spark-expr = { path = "spark-expr" }
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 94653d8864..6c259e7c2b 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -30,6 +30,7 @@ edition = { workspace = true }
 arrow = { workspace = true }
 chrono = { workspace = true }
 datafusion = { workspace = true }
+datafusion-common = { workspace = true }
 chrono-tz = { workspace = true }
 num = { workspace = true }
 regex = { workspace = true }
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 760dc3570f..8afbb9156a 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::datetime_funcs::to_timestamp;
 use crate::hash_funcs::*;
 use crate::math_funcs::abs::abs;
 use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub};
@@ -177,6 +178,10 @@ pub fn create_comet_physical_fun_with_eval_mode(
             let func = Arc::new(spark_modulo);
             make_comet_scalar_udf!("spark_modulo", func, without data_type, fail_on_error)
         }
+        "to_timestamp" => {
+            let func = Arc::new(to_timestamp);
+            make_comet_scalar_udf!("to_timestamp", func, without data_type)
+        }
         "abs" => {
             let func = Arc::new(abs);
             make_comet_scalar_udf!("abs", func, without data_type)
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index 1832711479..032731432e 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -19,6 +19,7 @@ mod date_diff;
 mod date_trunc;
 mod extract_date_part;
 mod timestamp_trunc;
+mod to_timestamp;
 mod unix_timestamp;
 
 pub use date_diff::SparkDateDiff;
@@ -27,4 +28,5 @@ pub use extract_date_part::SparkHour;
 pub use extract_date_part::SparkMinute;
 pub use extract_date_part::SparkSecond;
 pub use timestamp_trunc::TimestampTruncExpr;
+pub use to_timestamp::to_timestamp;
 pub use unix_timestamp::SparkUnixTimestamp;
diff --git a/native/spark-expr/src/datetime_funcs/to_timestamp.rs b/native/spark-expr/src/datetime_funcs/to_timestamp.rs
new file mode 100644
index 0000000000..b5a1265086
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/to_timestamp.rs
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Spark-compatible `to_timestamp`: parse strings into UTC microsecond timestamps.
+
+use arrow::array::{ArrayRef, StringArray};
+use arrow::datatypes::DataType::Timestamp;
+use arrow::datatypes::TimeUnit::Microsecond;
+use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Timelike};
+use chrono_tz::Tz;
+use datafusion::common::{DataFusionError, Result};
+use datafusion::functions::downcast_named_arg;
+use datafusion::functions::utils::make_scalar_function;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::scalar::ScalarValue;
+use datafusion_common::internal_err;
+use std::str::FromStr;
+use std::sync::Arc;
+
+const TO_TIMESTAMP: &str = "to_timestamp";
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum FractionPrecision {
+    Millis,
+    Micros,
+    Nanos,
+}
+
+/// Detect the fractional-second precision implied by a Spark format string
+fn detect_fraction_precision(fmt: &str) -> Option<FractionPrecision> {
+    let count = fmt.chars().filter(|&c| c == 'S').count();
+    match count {
+        0 => None,
+        1..=3 => Some(FractionPrecision::Millis),
+        4..=6 => Some(FractionPrecision::Micros),
+        _ => Some(FractionPrecision::Nanos),
+    }
+}
+
+/// Convert a Spark datetime format string into a chrono format string.
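+///
+/// Only the Spark pattern letters exercised in this module are handled. Examples
+/// (mirroring the unit tests below):
+///
+/// * `"yyyy-MM-dd"` -> `"%Y-%m-%d"`
+/// * `"yyyy-MM-dd HH:mm:ss.SSS"` -> `"%Y-%m-%d %H:%M:%S%.f"` with millisecond precision
+/// * `"yyyy-MM-dd HH:mm:ssXXX"` -> `"%Y-%m-%d %H:%M:%S%:z"`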
+fn spark_to_chrono(fmt: &str) -> (String, Option<FractionPrecision>) {
+    let precision = detect_fraction_precision(fmt);
+
+    let mut out = fmt.to_string();
+
+    // Date
+    out = out.replace("yyyy", "%Y");
+    out = out.replace("MM", "%m");
+    out = out.replace("dd", "%d");
+
+    // Time
+    out = out.replace("HH", "%H");
+    out = out.replace("mm", "%M");
+    out = out.replace("ss", "%S");
+
+    // Fractions
+    out = out
+        .replace(".SSSSSSSSS", "%.f")
+        .replace(".SSSSSS", "%.f")
+        .replace(".SSS", "%.f");
+
+    // Timezones
+    out = out.replace("XXX", "%:z");
+    out = out.replace("Z", "%z");
+
+    (out, precision)
+}
+
+/// Detect whether a Spark format string contains a time component
+fn spark_format_has_time(fmt: &str) -> bool {
+    fmt.contains("HH") || fmt.contains("mm") || fmt.contains("ss")
+}
+
+/// Parse a Spark date or timestamp string into a naive (timezone-less) datetime
+fn parse_spark_naive(
+    value: &str,
+    spark_fmt: &str,
+) -> Result<(NaiveDateTime, Option<FractionPrecision>), chrono::ParseError> {
+    let (chrono_fmt, precision) = spark_to_chrono(spark_fmt);
+
+    if spark_format_has_time(spark_fmt) {
+        let ts = NaiveDateTime::parse_from_str(value, &chrono_fmt)?;
+        Ok((ts, precision))
+    } else {
+        // Date-only formats are interpreted as midnight, matching Spark's behaviour.
+        let date = NaiveDate::parse_from_str(value, &chrono_fmt)?;
+        Ok((date.and_hms_opt(0, 0, 0).unwrap(), precision))
+    }
+}
+
+/// Truncate fractional seconds to the precision implied by the format string
+fn normalize_fraction(
+    mut ts: NaiveDateTime,
+    precision: Option<FractionPrecision>,
+) -> Option<NaiveDateTime> {
+    match precision {
+        Some(FractionPrecision::Millis) => {
+            let ms = ts.and_utc().timestamp_subsec_millis();
+            ts = ts.with_nanosecond(ms * 1_000_000)?;
+        }
+        Some(FractionPrecision::Micros) => {
+            let us = ts.and_utc().timestamp_subsec_micros();
+            ts = ts.with_nanosecond(us * 1_000)?;
+        }
+        Some(FractionPrecision::Nanos) | None => {}
+    }
+    Some(ts)
+}
+
+/// Final Spark-like timestamp parse: local datetime in `tz` converted to UTC microseconds
+pub fn spark_to_timestamp_parse(value: &str, spark_fmt: &str, tz: Tz) -> Result<i64> {
+    let (naive, precision) = parse_spark_naive(value, spark_fmt).map_err(|_| {
+        DataFusionError::Plan(format!("Error parsing '{value}' with format '{spark_fmt}'"))
+    })?;
+
+    let naive = normalize_fraction(naive, precision)
+        .ok_or_else(|| DataFusionError::Plan("Invalid fractional timestamp".into()))?;
+
+    let local: DateTime<Tz> = tz.from_local_datetime(&naive).single().ok_or_else(|| {
+        DataFusionError::Plan(format!("Ambiguous or invalid datetime in timezone {tz}"))
+    })?;
+
+    Ok(local.timestamp_micros())
+}
+
+pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    make_scalar_function(spark_to_timestamp, vec![])(args)
+}
+
+pub fn spark_to_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() < 2 || args.len() > 3 {
+        return internal_err!(
+            "`{}` function requires 2 or 3 arguments, got {} arguments",
+            TO_TIMESTAMP,
+            args.len()
+        );
+    }
+    let dates: &StringArray = downcast_named_arg!(&args[0], "date", StringArray);
+    let format: &str = downcast_named_arg!(&args[1], "format", StringArray).value(0);
+    // The time-zone argument is optional; fall back to UTC when it is absent.
+    let tz: Tz = args
+        .get(2)
+        .and_then(|arg| opt_downcast_arg!(arg, StringArray))
+        .and_then(|v| Tz::from_str(v.value(0)).ok())
+        .unwrap_or(Tz::UTC);
+
+    let utc_tz: String = chrono_tz::UTC.to_string();
+    let utc_tz: Arc<str> = Arc::from(utc_tz);
+    let values: Result<Vec<ScalarValue>> = dates
+        .iter()
+        .map(|value| match value {
+            None => ScalarValue::Int64(None)
+                .cast_to(&Timestamp(Microsecond, Some(Arc::clone(&utc_tz)))),
+            Some(date_raw) => {
+                let parsed_value: Result<i64> = spark_to_timestamp_parse(date_raw, format, tz);
+
+                ScalarValue::Int64(Some(parsed_value?))
+                    .cast_to(&Timestamp(Microsecond, Some(Arc::clone(&utc_tz))))
+            }
+        })
+        .collect::<Result<Vec<_>>>();
+
+    let scalar_values: Vec<ScalarValue> = values?;
+    let timestamp_array: ArrayRef = ScalarValue::iter_to_array(scalar_values)?;
+
+    Ok(timestamp_array)
+}
+
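+/// Downcast an Arrow array argument to `$ARRAY_TYPE`, yielding `None` instead of an error
+/// when the array has a different type (used for the optional time-zone argument above).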
+macro_rules! opt_downcast_arg {
+    ($ARG:expr, $ARRAY_TYPE:ident) => {{
+        $ARG.as_any().downcast_ref::<$ARRAY_TYPE>()
+    }};
+}
+
+pub(crate) use opt_downcast_arg;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::{NaiveDate, NaiveDateTime};
+    use chrono_tz::UTC;
+
+    // ----------------------------
+    // detect_fraction_precision
+    // ----------------------------
+
+    #[test]
+    fn detects_no_fraction() {
+        assert_eq!(detect_fraction_precision("yyyy-MM-dd HH:mm:ss"), None);
+    }
+
+    #[test]
+    fn detects_millis_precision() {
+        assert_eq!(
+            detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSS"),
+            Some(FractionPrecision::Millis)
+        );
+    }
+
+    #[test]
+    fn detects_micros_precision() {
+        assert_eq!(
+            detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+            Some(FractionPrecision::Micros)
+        );
+    }
+
+    #[test]
+    fn detects_nanos_precision() {
+        assert_eq!(
+            detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"),
+            Some(FractionPrecision::Nanos)
+        );
+    }
+
+    // ----------------------------
+    // spark_to_chrono
+    // ----------------------------
+
+    #[test]
+    fn converts_basic_date_format() {
+        let (fmt, precision) = spark_to_chrono("yyyy-MM-dd");
+        assert_eq!(fmt, "%Y-%m-%d");
+        assert_eq!(precision, None);
+    }
+
+    #[test]
+    fn converts_timestamp_with_millis() {
+        let (fmt, precision) = spark_to_chrono("yyyy-MM-dd HH:mm:ss.SSS");
+
+        assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%.f");
+        assert_eq!(precision, Some(FractionPrecision::Millis));
+    }
+
+    #[test]
+    fn converts_timestamp_with_timezone() {
+        let (fmt, _) = spark_to_chrono("yyyy-MM-dd HH:mm:ssXXX");
+
+        assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%:z");
+    }
+
+    // ----------------------------
+    // spark_format_has_time
+    // ----------------------------
+
+    #[test]
+    fn detects_date_only_format() {
+        assert!(!spark_format_has_time("yyyy-MM-dd"));
+    }
+
+    #[test]
+    fn detects_timestamp_format() {
+        assert!(spark_format_has_time("yyyy-MM-dd HH:mm:ss"));
+    }
+
+    // ----------------------------
+    // parse_spark_naive
+    // ----------------------------
+
+    #[test]
+    fn parses_date_as_midnight_timestamp() {
+        let (ts, _) = parse_spark_naive("2026-01-30", "yyyy-MM-dd").unwrap();
+
+        let expected = NaiveDate::from_ymd_opt(2026, 1, 30)
+            .unwrap()
+            .and_hms_opt(0, 0, 0)
+            .unwrap();
+
+        assert_eq!(ts, expected);
+    }
+
+    #[test]
+    fn parses_timestamp_with_millis() {
+        let (ts, precision) =
+            parse_spark_naive("2026-01-30 10:30:52.123", "yyyy-MM-dd HH:mm:ss.SSS").unwrap();
+
+        assert_eq!(precision, Some(FractionPrecision::Millis));
+        assert_eq!(ts.and_utc().timestamp_subsec_millis(), 123);
+    }
+
+    // ----------------------------
+    // normalize_fraction
+    // ----------------------------
+
+    #[test]
+    fn normalizes_millis_precision() {
+        let ts =
+            NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123456", "%Y-%m-%d %H:%M:%S%.6f")
+                .unwrap();
+
+        let normalized = normalize_fraction(ts, Some(FractionPrecision::Millis)).unwrap();
+
+        assert_eq!(normalized.and_utc().timestamp_subsec_nanos(), 123_000_000);
+    }
+
+    #[test]
+    fn normalizes_micros_precision() {
+        let ts =
+            NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123456", "%Y-%m-%d %H:%M:%S%.6f")
+                .unwrap();
+
+        let normalized = normalize_fraction(ts, Some(FractionPrecision::Micros)).unwrap();
+
+        assert_eq!(normalized.and_utc().timestamp_subsec_nanos(), 123_456_000);
+    }
+
+    // ----------------------------
+    // spark_to_timestamp_parse (end-to-end)
+    // ----------------------------
+
+    #[test]
+    fn parses_timestamp_and_preserves_millis() {
+        let micros =
+            spark_to_timestamp_parse("2026-01-30 10:30:52.123", "yyyy-MM-dd HH:mm:ss.SSS", UTC)
+                .unwrap();
+
+        let expected =
+            NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123", "%Y-%m-%d %H:%M:%S%.3f")
+                .unwrap()
+                .and_utc()
+                .timestamp_micros();
+
+        assert_eq!(micros, expected);
+    }
+
+    #[test]
+    fn parses_date_literal_as_midnight() {
+        let micros = spark_to_timestamp_parse("2026-01-30", "yyyy-MM-dd", UTC).unwrap();
+
+        let expected = NaiveDate::from_ymd_opt(2026, 1, 30)
+            .unwrap()
+            .and_hms_opt(0, 0, 0)
+            .unwrap()
+            .and_utc()
+            .timestamp_micros();
+
+        assert_eq!(micros, expected);
+    }
+}
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 066680456e..654c9a7359 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -205,7 +205,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[WeekDay] -> CometWeekDay,
     classOf[DayOfYear] -> CometDayOfYear,
     classOf[WeekOfYear] -> CometWeekOfYear,
-    classOf[Quarter] -> CometQuarter)
+    classOf[Quarter] -> CometQuarter,
+    classOf[GetTimestamp] -> CometGetTimestamp,
+    classOf[ParseToDate] -> CometParseToDate,
+    classOf[ParseToTimestamp] -> CometParseToTimestamp)
 
   private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[Cast] -> CometCast)
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index a623146916..f85465e90e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, GetTimestamp, Hour, LastDay, Literal, Minute, Month, ParseToDate, ParseToTimestamp, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
 import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -176,6 +176,108 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF
   }
 }
 
+object CometGetTimestamp extends CometExpressionSerde[GetTimestamp] {
+
+  /**
+   * Convert a Spark expression into a protocol buffer representation that can be passed into
+   * native code.
+   *
+   * @param expr
+   *   The Spark expression.
+   * @param inputs
+   *   The input attributes.
+   * @param binding
+   *   Whether the attributes are bound (this is only relevant in aggregate expressions).
+   * @return
+   *   Protocol buffer representation, or None if the expression could not be converted. In this
+   *   case it is expected that the input expression will have been tagged with reasons why it
+   *   could not be converted.
+   */
+  override def convert(
+      expr: GetTimestamp,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[Expr] = {
+    val leftExpr: Option[Expr] =
+      exprToProtoInternal(expr.left, inputs, binding) // timestamp or date
+    val rightExpr: Option[Expr] = exprToProtoInternal(expr.right, inputs, binding) // format
+    val tZ: Option[Expr] =
+      expr.timeZoneId.flatMap(tz => exprToProtoInternal(Literal(tz), inputs, binding))
+    scalarFunctionExprToProtoWithReturnType(
+      "to_timestamp",
+      expr.dataType,
+      failOnError = expr.failOnError,
+      args = leftExpr,
+      rightExpr,
+      tZ)
+  }
+}
+
+object CometParseToDate extends CometExpressionSerde[ParseToDate] {
+
+  /**
+   * Convert a Spark expression into a protocol buffer representation that can be passed into
+   * native code.
+   *
+   * @param expr
+   *   The Spark expression.
+   * @param inputs
+   *   The input attributes.
+   * @param binding
+   *   Whether the attributes are bound (this is only relevant in aggregate expressions).
+   * @return
+   *   Protocol buffer representation, or None if the expression could not be converted. In this
+   *   case it is expected that the input expression will have been tagged with reasons why it
+   *   could not be converted.
+   */
+  override def convert(
+      expr: ParseToDate,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[Expr] = {
+    val childExpr: Option[Expr] = exprToProtoInternal(expr.left, inputs, binding)
+    val failOnErrorExpr: Option[Expr] =
+      exprToProtoInternal(Literal(expr.ansiEnabled), inputs, binding)
+    scalarFunctionExprToProtoWithReturnType(
+      "to_date",
+      expr.dataType,
+      expr.ansiEnabled,
+      childExpr,
+      failOnErrorExpr)
+  }
+}
+
+object CometParseToTimestamp extends CometExpressionSerde[ParseToTimestamp] {
+
+  /**
+   * Convert a Spark expression into a protocol buffer representation that can be passed into
+   * native code.
+   *
+   * @param expr
+   *   The Spark expression.
+   * @param inputs
+   *   The input attributes.
+   * @param binding
+   *   Whether the attributes are bound (this is only relevant in aggregate expressions).
+   * @return
+   *   Protocol buffer representation, or None if the expression could not be converted. In this
+   *   case it is expected that the input expression will have been tagged with reasons why it
+   *   could not be converted.
+   */
+  override def convert(
+      expr: ParseToTimestamp,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[Expr] = {
+    val childExpr: Option[Expr] = exprToProtoInternal(expr.left, inputs, binding)
+    val failOnErrorExpr: Option[Expr] =
+      exprToProtoInternal(Literal(expr.failOnError), inputs, binding)
+    scalarFunctionExprToProtoWithReturnType(
+      "to_timestamp",
+      expr.dataType,
+      expr.failOnError,
+      childExpr,
+      failOnErrorExpr)
+  }
+}
+
 object CometHour extends CometExpressionSerde[Hour] {
   override def convert(
       expr: Hour,
diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql
new file mode 100644
index 0000000000..ec5a9916c2
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql
@@ -0,0 +1,46 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+-- to_date function
+statement
+CREATE TABLE test_to_date(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_date VALUES ('2026-01-30'), ('2026-03-10'), (NULL)
+
+query
+SELECT col, to_date(col) FROM test_to_date
+
+statement
+CREATE TABLE test_to_date_fmt(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_date_fmt VALUES ('2026/01/30'), ('2026/03/10'), (NULL)
+
+query
+SELECT col, to_date(col, 'yyyy/MM/dd') FROM test_to_date_fmt
+
+query
+SELECT to_date('2026-01-30')
+
+query
+SELECT to_date('2026/01/30', 'yyyy/MM/dd')
+
+query
+SELECT to_date('2026-01-30 10:30:00')
diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_timestamp.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_timestamp.sql
new file mode 100644
index 0000000000..9ba8e4d770
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_timestamp.sql
@@ -0,0 +1,47 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
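+
+-- Exercises to_timestamp with the default format, an explicit 'yyyy/MM/dd HH:mm:ss' format and
+-- NULL inputs, both against a Parquet-backed table and as literals.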
+
+-- Config: spark.comet.expression.Cast.allowIncompatible=true
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+-- to_timestamp function
+statement
+CREATE TABLE test_to_timestamp(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_timestamp VALUES ('2026-01-30'), ('2026-03-10'), (NULL)
+
+query
+SELECT col, to_timestamp(col) FROM test_to_timestamp
+
+statement
+CREATE TABLE test_to_timestamp_fmt(col STRING) USING parquet
+
+statement
+INSERT INTO test_to_timestamp_fmt VALUES ('2026/01/30 10:30:52'), ('2026/03/10 10:30:52'), (NULL)
+
+query
+SELECT col, to_timestamp(col, 'yyyy/MM/dd HH:mm:ss') FROM test_to_timestamp_fmt
+
+query
+SELECT to_timestamp('2026-01-30')
+
+query
+SELECT to_timestamp('2026/01/30 10:30:52', 'yyyy/MM/dd HH:mm:ss')
+
+query
+SELECT to_timestamp('2026-01-30 10:30:52')
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 1ae6926e05..075eb56043 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -173,6 +173,190 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
     }
   }
 
+  test("to_date parses date literal") {
+    withoutConstantFolding {
+      checkSparkAnswer("SELECT to_date('2026-01-30')")
+    }
+  }
+
+  test("to_date parses date literal with explicit format") {
+    withoutConstantFolding {
+      checkSparkAnswer("SELECT to_date('2026/01/30', 'yyyy/MM/dd')")
+    }
+  }
+
+  test("to_date parses date string column") {
+    withTempView("string_tbl") {
+      val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true)))
+
+      val data = Seq(Row("2026-01-30"), Row("2026-03-10"), Row("2026-10-10"), Row(null))
+
+      spark
+        .createDataFrame(spark.sparkContext.parallelize(data), schema)
+        .createOrReplaceTempView("string_tbl")
+
+      checkSparkAnswer("SELECT dt_str, to_date(dt_str) FROM string_tbl")
+    }
+  }
+
+  test("to_date parses date string column with explicit format") {
+    withoutConstantFolding {
+      withTempView("string_tbl") {
+        val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true)))
+
+        val data = Seq(Row("2026/01/30"), Row("2026/03/10"), Row("2026/10/10"), Row(null))
+
+        spark
+          .createDataFrame(spark.sparkContext.parallelize(data), schema)
+          .createOrReplaceTempView("string_tbl")
+
+        checkSparkAnswer("SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') FROM string_tbl")
+      }
+    }
+  }
+
+  test("to_date parses timestamp literal string") {
+    withoutConstantFolding {
+      checkSparkAnswer("SELECT to_date('2026-01-30 04:17:52')")
+    }
+  }
+
+  test("to_date parses timestamp string column") {
+    withTempView("string_tbl") {
+      val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true)))
+
+      val data = Seq(
+        Row("2026-01-30 04:17:52"),
+        Row("2026-03-10 04:17:52"),
+        Row("2026-10-10 04:17:52"),
+        Row(null))
+
+      spark
+        .createDataFrame(spark.sparkContext.parallelize(data), schema)
+        .createOrReplaceTempView("string_tbl")
+
+      checkSparkAnswer("SELECT dt_str, to_date(dt_str) FROM string_tbl")
+    }
+  }
+
+  test("to_timestamp parses date literal as 00:00:00 timestamp") {
+    withoutConstantFolding {
+      checkSparkAnswer("SELECT to_timestamp('2026-01-30')")
+    }
+  }
+
+  test("to_timestamp parses timestamp literal with default format") {
+    withoutConstantFolding {
+      checkSparkAnswer("SELECT to_timestamp('2026-01-30 10:30:52')")
+    }
+  }
+
+  test("to_timestamp parses date literal using explicit date format") {
+    withoutConstantFolding {
+      checkSparkAnswer("SELECT to_timestamp('2026/01/30', 'yyyy/MM/dd')")
+    }
+  }
+
+  test("to_timestamp parses timestamp literal using explicit timestamp format") {
+    withoutConstantFolding {
+      checkSparkAnswer("SELECT to_timestamp('2026/01/30 10:30:52', 'yyyy/MM/dd HH:mm:ss')")
+    }
+  }
+
+  test("to_timestamp parses date string column as midnight timestamp") {
+    withTempView("string_tbl") {
+      val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true)))
+
+      val data = Seq(Row("2026-01-30"), Row("2026-03-10"), Row("2026-10-10"), Row(null))
+
+      spark
+        .createDataFrame(spark.sparkContext.parallelize(data), schema)
+        .createOrReplaceTempView("string_tbl")
+
+      checkSparkAnswer("SELECT ts_str, to_timestamp(ts_str) FROM string_tbl")
+    }
+  }
+
+  test("to_timestamp parses date string column using explicit date format") {
+    withoutConstantFolding {
+      withTempView("string_tbl") {
+        val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true)))
+
+        val data = Seq(Row("2026/01/30"), Row("2026/03/10"), Row("2026/10/10"), Row(null))
+
+        spark
+          .createDataFrame(spark.sparkContext.parallelize(data), schema)
+          .createOrReplaceTempView("string_tbl")
+
+        checkSparkAnswer("SELECT ts_str, to_timestamp(ts_str, 'yyyy/MM/dd') FROM string_tbl")
+      }
+    }
+  }
+
+  test("to_timestamp parses timestamp string column with default format") {
+    withTempView("string_tbl") {
+      val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true)))
+
+      val data = Seq(
+        Row("2026-01-30 04:17:52"),
+        Row("2026-03-10 04:17:52"),
+        Row("2026-10-10 04:17:52"),
+        Row(null))
+
+      spark
+        .createDataFrame(spark.sparkContext.parallelize(data), schema)
+        .createOrReplaceTempView("string_tbl")
+
+      checkSparkAnswer("SELECT ts_str, to_timestamp(ts_str) FROM string_tbl")
+    }
+  }
+
+  test("to_timestamp parses timestamp string column using explicit timestamp format") {
+    withoutConstantFolding {
+      withTempView("string_tbl") {
+        val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true)))
+
+        val data = Seq(
+          Row("2026/01/30 04:17:52"),
+          Row("2026/03/10 04:17:52"),
+          Row("2026/10/10 04:17:52"),
+          Row(null))
+
+        spark
+          .createDataFrame(spark.sparkContext.parallelize(data), schema)
+          .createOrReplaceTempView("string_tbl")
+
+        checkSparkAnswer(
+          "SELECT ts_str, to_timestamp(ts_str, 'yyyy/MM/dd HH:mm:ss') FROM string_tbl")
+      }
+    }
+  }
+
+  test("to_timestamp parses timestamp literal with milliseconds") {
+    withoutConstantFolding {
+      checkSparkAnswer(
+        "SELECT to_timestamp('2026-01-30 10:30:52.123', 'yyyy-MM-dd HH:mm:ss.SSS')")
+    }
+  }
+
+  test("to_timestamp parses timestamp string column with milliseconds") {
+    withTempView("string_tbl") {
+      val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, nullable = true)))
+
+      val data = Seq(
+        Row("2026-01-30 10:30:52.123"),
+        Row("2026-03-10 10:30:52.999"),
+        Row("2026-10-10 10:30:52.000"),
+        Row(null))
+
+      spark
+        .createDataFrame(spark.sparkContext.parallelize(data), schema)
+        .createOrReplaceTempView("string_tbl")
+
+      checkSparkAnswer(
+        "SELECT ts_str, to_timestamp(ts_str, 'yyyy-MM-dd HH:mm:ss.SSS') FROM string_tbl")
+    }
+  }
+
   private def createTimestampTestData = {
     val r = new Random(42)
     val schema = StructType(
@@ -395,4 +395,9 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
     // Test null handling
     checkSparkAnswerAndOperator("SELECT unix_date(NULL)")
   }
+
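+  // Excluding ConstantFolding keeps literal-only calls such as to_date('2026-01-30') from being
+  // evaluated during optimization, so the expressions still reach Comet's native path.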
+  private def withoutConstantFolding[A](f: => A): Unit =
+    withSQLConf(
+      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
+        "org.apache.spark.sql.catalyst.optimizer.ConstantFolding")(f)
 }