From a81ab4f3eea335e6cc9d28ab28892af81063720b Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Thu, 15 Jan 2026 15:57:24 +0400 Subject: [PATCH 1/9] feat(spark): add trunc, date_trunc and time_trunc functions --- .../spark/src/function/datetime/date_trunc.rs | 131 ++++++++++++++++ datafusion/spark/src/function/datetime/mod.rs | 24 +++ .../spark/src/function/datetime/time_trunc.rs | 122 +++++++++++++++ .../spark/src/function/datetime/trunc.rs | 145 +++++++++++++++++ .../test_files/spark/datetime/date_trunc.slt | 147 ++++++++++++++---- .../test_files/spark/datetime/time_trunc.slt | 74 +++++++++ .../test_files/spark/datetime/trunc.slt | 101 +++++++++--- 7 files changed, 689 insertions(+), 55 deletions(-) create mode 100644 datafusion/spark/src/function/datetime/date_trunc.rs create mode 100644 datafusion/spark/src/function/datetime/time_trunc.rs create mode 100644 datafusion/spark/src/function/datetime/trunc.rs create mode 100644 datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt diff --git a/datafusion/spark/src/function/datetime/date_trunc.rs b/datafusion/spark/src/function/datetime/date_trunc.rs new file mode 100644 index 0000000000000..8626967eeabc9 --- /dev/null +++ b/datafusion/spark/src/function/datetime/date_trunc.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; +use datafusion_common::types::{NativeType, logical_string}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, ScalarValue, internal_err}; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; +use datafusion_expr::{ + Coercion, ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, + Signature, TypeSignatureClass, Volatility, +}; + +/// Spark date_trunc supports extra format aliases. +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkDateTrunc { + signature: Signature, +} + +impl Default for SparkDateTrunc { + fn default() -> Self { + Self::new() + } +} + +impl SparkDateTrunc { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_implicit( + TypeSignatureClass::Timestamp, + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Timestamp(TimeUnit::Microsecond, None), + ), + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkDateTrunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "date_trunc" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + + Ok(Arc::new(Field::new( + self.name(), + args.arg_fields[1].data_type().clone(), + nullable, + ))) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + internal_err!( + "spark date_trunc should have been simplified to standard date_trunc" + ) + } + + fn simplify( + &self, + args: Vec, + _info: &SimplifyContext, + ) -> Result { + let [fmt_expr, ts_expr] = take_function_args(self.name(), args)?; + + let fmt = match fmt_expr.as_literal() { + Some(ScalarValue::Utf8(Some(v))) + | Some(ScalarValue::Utf8View(Some(v))) + | Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(), + _ => { + return internal_err!( + "First argument of `DATE_TRUNC` must be non-null scalar Utf8" + ); + } + }; + + // Map Spark-specific fmt aliases to datafusion ones + let fmt = match fmt.as_str() { + "yy" | "yyyy" => "year", + "mm" | "mon" => "month", + "dd" => "day", + other => other, + }; + + let fmt_expr = Expr::Literal(ScalarValue::new_utf8(fmt), None); + + Ok(ExprSimplifyResult::Simplified(Expr::ScalarFunction( + ScalarFunction::new_udf( + datafusion_functions::datetime::date_trunc(), + vec![fmt_expr, ts_expr], + ), + ))) + } +} diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index 849aa20895990..e20dfef45ef24 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -17,11 +17,14 @@ pub mod date_add; pub mod date_sub; +pub mod date_trunc; pub mod extract; pub mod last_day; pub mod make_dt_interval; pub mod make_interval; pub mod next_day; +pub mod time_trunc; +pub mod trunc; use datafusion_expr::ScalarUDF; use datafusion_functions::make_udf_function; @@ -36,6 +39,9 @@ make_udf_function!(last_day::SparkLastDay, last_day); make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval); make_udf_function!(make_interval::SparkMakeInterval, make_interval); make_udf_function!(next_day::SparkNextDay, next_day); +make_udf_function!(date_trunc::SparkDateTrunc, date_trunc); +make_udf_function!(time_trunc::SparkTimeTrunc, time_trunc); +make_udf_function!(trunc::SparkTrunc, trunc); pub mod expr_fn { use datafusion_functions::export_functions; @@ -83,6 +89,21 @@ pub mod expr_fn { "Returns the first date which is later than start_date and named as indicated. The function returns NULL if at least one of the input parameters is NULL.", arg1 arg2 )); + export_functions!(( + date_trunc, + "Truncates a timestamp `ts` to the unit specified by the format `fmt`.", + fmt ts + )); + export_functions!(( + time_trunc, + "Truncates a time `t` to the unit specified by the format `fmt`.", + fmt t + )); + export_functions!(( + trunc, + "Truncates a date `dt` to the unit specified by the format `fmt`.", + dt fmt + )); } pub fn functions() -> Vec> { @@ -96,5 +117,8 @@ pub fn functions() -> Vec> { make_dt_interval(), make_interval(), next_day(), + date_trunc(), + time_trunc(), + trunc(), ] } diff --git a/datafusion/spark/src/function/datetime/time_trunc.rs b/datafusion/spark/src/function/datetime/time_trunc.rs new file mode 100644 index 0000000000000..5c2900fcfef4e --- /dev/null +++ b/datafusion/spark/src/function/datetime/time_trunc.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::types::logical_string; +use datafusion_common::{Result, ScalarValue, internal_err}; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; +use datafusion_expr::{ + Coercion, ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, + Signature, TypeSignatureClass, Volatility, +}; + +/// Spark time_trunc function only handles time inputs. +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkTimeTrunc { + signature: Signature, +} + +impl Default for SparkTimeTrunc { + fn default() -> Self { + Self::new() + } +} + +impl SparkTimeTrunc { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Time), + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkTimeTrunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "time_trunc" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + + Ok(Arc::new(Field::new( + self.name(), + args.arg_fields[1].data_type().clone(), + nullable, + ))) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + internal_err!( + "spark time_trunc should have been simplified to standard date_trunc" + ) + } + + fn simplify( + &self, + args: Vec, + _info: &SimplifyContext, + ) -> Result { + let fmt_expr = &args[0]; + + let fmt = match fmt_expr.as_literal() { + Some(ScalarValue::Utf8(Some(v))) + | Some(ScalarValue::Utf8View(Some(v))) + | Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(), + _ => { + return internal_err!( + "First argument of `TIME_TRUNC` must be non-null scalar Utf8" + ); + } + }; + + if !matches!( + fmt.as_str(), + "hour" | "minute" | "second" | "millisecond" | "microsecond" + ) { + return internal_err!( + "The format argument of `TIME_TRUNC` must be one of: hour, minute, second, millisecond, microsecond" + ); + } + + Ok(ExprSimplifyResult::Simplified(Expr::ScalarFunction( + ScalarFunction::new_udf(datafusion_functions::datetime::date_trunc(), args), + ))) + } +} diff --git a/datafusion/spark/src/function/datetime/trunc.rs b/datafusion/spark/src/function/datetime/trunc.rs new file mode 100644 index 0000000000000..d908a57a56f38 --- /dev/null +++ b/datafusion/spark/src/function/datetime/trunc.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; +use datafusion_common::types::{NativeType, logical_date, logical_string}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, ScalarValue, internal_err}; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; +use datafusion_expr::{ + Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs, + ScalarUDFImpl, Signature, TypeSignatureClass, Volatility, +}; + +/// Spark trunc supports date inputs only and extra format aliases. +/// Also spark trunc's argument order is (date, format). +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkTrunc { + signature: Signature, +} + +impl Default for SparkTrunc { + fn default() -> Self { + Self::new() + } +} + +impl SparkTrunc { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![ + Coercion::new_implicit( + TypeSignatureClass::Native(logical_date()), + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Date, + ), + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkTrunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "trunc" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + + Ok(Arc::new(Field::new( + self.name(), + args.arg_fields[0].data_type().clone(), + nullable, + ))) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + internal_err!( + "spark date_trunc should have been simplified to standard date_trunc" + ) + } + + fn simplify( + &self, + args: Vec, + info: &SimplifyContext, + ) -> Result { + let [dt_expr, fmt_expr] = take_function_args(self.name(), args)?; + + let fmt = match fmt_expr.as_literal() { + Some(ScalarValue::Utf8(Some(v))) + | Some(ScalarValue::Utf8View(Some(v))) + | Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(), + _ => { + return internal_err!( + "First argument of `TRUNC` must be non-null scalar Utf8" + ); + } + }; + + // Map Spark-specific fmt aliases to datafusion ones + let fmt = match fmt.as_str() { + "yy" | "yyyy" => "year", + "mm" | "mon" => "month", + "year" | "month" | "day" | "week" | "quarter" => fmt.as_str(), + _ => { + return internal_err!( + "The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter." + ); + } + }; + let return_type = dt_expr.get_type(info.schema())?; + + let fmt_expr = Expr::Literal(ScalarValue::new_utf8(fmt), None); + + // Spark uses Dates so we need to cast to timestamp and back to work with datafusion's date_trunc + Ok(ExprSimplifyResult::Simplified( + Expr::ScalarFunction(ScalarFunction::new_udf( + datafusion_functions::datetime::date_trunc(), + vec![ + fmt_expr, + dt_expr.cast_to( + &DataType::Timestamp(TimeUnit::Nanosecond, None), + info.schema(), + )?, + ], + )) + .cast_to(&return_type, info.schema())?, + )) + } +} diff --git a/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt index 8a15254e6795e..a66ca755093d4 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt @@ -15,33 +15,120 @@ # specific language governing permissions and limitations # under the License. -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. -# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT date_trunc('DD', '2015-03-05T09:32:05.359'); -## PySpark 3.5.5 Result: {'date_trunc(DD, 2015-03-05T09:32:05.359)': datetime.datetime(2015, 3, 5, 0, 0), 'typeof(date_trunc(DD, 2015-03-05T09:32:05.359))': 'timestamp', 'typeof(DD)': 'string', 'typeof(2015-03-05T09:32:05.359)': 'string'} -#query -#SELECT date_trunc('DD'::string, '2015-03-05T09:32:05.359'::string); - -## Original Query: SELECT date_trunc('HOUR', '2015-03-05T09:32:05.359'); -## PySpark 3.5.5 Result: {'date_trunc(HOUR, 2015-03-05T09:32:05.359)': datetime.datetime(2015, 3, 5, 9, 0), 'typeof(date_trunc(HOUR, 2015-03-05T09:32:05.359))': 'timestamp', 'typeof(HOUR)': 'string', 'typeof(2015-03-05T09:32:05.359)': 'string'} -#query -#SELECT date_trunc('HOUR'::string, '2015-03-05T09:32:05.359'::string); - -## Original Query: SELECT date_trunc('MILLISECOND', '2015-03-05T09:32:05.123456'); -## PySpark 3.5.5 Result: {'date_trunc(MILLISECOND, 2015-03-05T09:32:05.123456)': datetime.datetime(2015, 3, 5, 9, 32, 5, 123000), 'typeof(date_trunc(MILLISECOND, 2015-03-05T09:32:05.123456))': 'timestamp', 'typeof(MILLISECOND)': 'string', 'typeof(2015-03-05T09:32:05.123456)': 'string'} -#query -#SELECT date_trunc('MILLISECOND'::string, '2015-03-05T09:32:05.123456'::string); - -## Original Query: SELECT date_trunc('MM', '2015-03-05T09:32:05.359'); -## PySpark 3.5.5 Result: {'date_trunc(MM, 2015-03-05T09:32:05.359)': datetime.datetime(2015, 3, 1, 0, 0), 'typeof(date_trunc(MM, 2015-03-05T09:32:05.359))': 'timestamp', 'typeof(MM)': 'string', 'typeof(2015-03-05T09:32:05.359)': 'string'} -#query -#SELECT date_trunc('MM'::string, '2015-03-05T09:32:05.359'::string); - -## Original Query: SELECT date_trunc('YEAR', '2015-03-05T09:32:05.359'); -## PySpark 3.5.5 Result: {'date_trunc(YEAR, 2015-03-05T09:32:05.359)': datetime.datetime(2015, 1, 1, 0, 0), 'typeof(date_trunc(YEAR, 2015-03-05T09:32:05.359))': 'timestamp', 'typeof(YEAR)': 'string', 'typeof(2015-03-05T09:32:05.359)': 'string'} -#query -#SELECT date_trunc('YEAR'::string, '2015-03-05T09:32:05.359'::string); +# YEAR - truncate to first date of year, time zeroed +query P +SELECT date_trunc('YEAR', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-01-01T00:00:00 + +query P +SELECT date_trunc('YYYY', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-01-01T00:00:00 + +query P +SELECT date_trunc('YY', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-01-01T00:00:00 + +# QUARTER - truncate to first date of quarter, time zeroed +query P +SELECT date_trunc('QUARTER', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-01-01T00:00:00 + +# MONTH - truncate to first date of month, time zeroed +query P +SELECT date_trunc('MONTH', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-01T00:00:00 + +query P +SELECT date_trunc('MM', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-01T00:00:00 + +query P +SELECT date_trunc('MON', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-01T00:00:00 + +# WEEK - truncate to Monday of the week, time zeroed +query P +SELECT date_trunc('WEEK', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-02T00:00:00 + +# DAY - zero out time part +query P +SELECT date_trunc('DAY', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-05T00:00:00 + +query P +SELECT date_trunc('DD', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-05T00:00:00 + +# HOUR - zero out minute and second with fraction +query P +SELECT date_trunc('HOUR', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-05T09:00:00 + +# MINUTE - zero out second with fraction +query P +SELECT date_trunc('MINUTE', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-05T09:32:00 + +# SECOND - zero out fraction +query P +SELECT date_trunc('SECOND', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-05T09:32:05 + +# MILLISECOND - zero out microseconds +query P +SELECT date_trunc('MILLISECOND', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-05T09:32:05.123 + +# MICROSECOND - everything remains +query P +SELECT date_trunc('MICROSECOND', '2015-03-05T09:32:05.123456'::timestamp); +---- +2015-03-05T09:32:05.123456 + +query P +SELECT date_trunc('YEAR', column1) +FROM VALUES +('2015-03-05T09:32:05.123456'::timestamp), +('2020-11-15T22:45:30.654321'::timestamp), +('1999-07-20T14:20:10.000001'::timestamp), +(NULL::timestamp); +---- +2015-01-01T00:00:00 +2020-01-01T00:00:00 +1999-01-01T00:00:00 +NULL + +# String input +query P +SELECT date_trunc('YEAR', '2015-03-05T09:32:05.123456'); +---- +2015-01-01T00:00:00 + +# Null handling +query error Internal error: First argument of `DATE_TRUNC` must be non-null scalar Utf8. +SELECT date_trunc(NULL, '2015-03-05T09:32:05.123456'); + +query P +SELECT date_trunc('YEAR', NULL::timestamp); +---- +NULL + +# incorrect format +query error DataFusion error: Execution error: Unsupported date_trunc granularity: 'test'. Supported values are: microsecond, millisecond, second, minute, hour, day, week, month, quarter, year +SELECT date_trunc('test', '2015-03-05T09:32:05.123456'); + diff --git a/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt new file mode 100644 index 0000000000000..e879172b0b09e --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# HOUR - zero out minute and second with fraction +query D +SELECT time_trunc('HOUR', '09:32:05.123456'::time); +---- +09:00:00 + +# MINUTE - zero out second with fraction +query D +SELECT time_trunc('MINUTE', '09:32:05.123456'::time); +---- +09:32:00 + +# SECOND - zero out fraction +query D +SELECT time_trunc('SECOND', '09:32:05.123456'::time); +---- +09:32:05 + +# MILLISECOND - zero out microseconds +query D +SELECT time_trunc('MILLISECOND', '09:32:05.123456'::time); +---- +09:32:05.123 + +# MICROSECOND - everything remains +query D +SELECT time_trunc('MICROSECOND', '09:32:05.123456'::time); +---- +09:32:05.123456 + +query D +SELECT time_trunc('HOUR', column1) +FROM VALUES +('09:32:05.123456'::time), +('22:45:30.654321'::time), +('14:20:10.000001'::time), +(NULL::time); +---- +09:00:00 +22:00:00 +14:00:00 +NULL + + +# Null handling +query error Internal error: First argument of `TIME_TRUNC` must be non-null scalar Utf8. +SELECT time_trunc(NULL, '09:32:05.123456'::time); + +query D +SELECT time_trunc('HOUR', NULL::time); +---- +NULL + +# incorrect format +query error nternal error: The format argument of `TIME_TRUNC` must be one of: hour, minute, second, millisecond, microsecond. +SELECT time_trunc('test', '09:32:05.123456'::time); + diff --git a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt index a502e2f7f7b00..0ed8665fd5c13 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt @@ -15,28 +15,79 @@ # specific language governing permissions and limitations # under the License. -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. -# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT trunc('2009-02-12', 'MM'); -## PySpark 3.5.5 Result: {'trunc(2009-02-12, MM)': datetime.date(2009, 2, 1), 'typeof(trunc(2009-02-12, MM))': 'date', 'typeof(2009-02-12)': 'string', 'typeof(MM)': 'string'} -#query -#SELECT trunc('2009-02-12'::string, 'MM'::string); - -## Original Query: SELECT trunc('2015-10-27', 'YEAR'); -## PySpark 3.5.5 Result: {'trunc(2015-10-27, YEAR)': datetime.date(2015, 1, 1), 'typeof(trunc(2015-10-27, YEAR))': 'date', 'typeof(2015-10-27)': 'string', 'typeof(YEAR)': 'string'} -#query -#SELECT trunc('2015-10-27'::string, 'YEAR'::string); - -## Original Query: SELECT trunc('2019-08-04', 'quarter'); -## PySpark 3.5.5 Result: {'trunc(2019-08-04, quarter)': datetime.date(2019, 7, 1), 'typeof(trunc(2019-08-04, quarter))': 'date', 'typeof(2019-08-04)': 'string', 'typeof(quarter)': 'string'} -#query -#SELECT trunc('2019-08-04'::string, 'quarter'::string); - -## Original Query: SELECT trunc('2019-08-04', 'week'); -## PySpark 3.5.5 Result: {'trunc(2019-08-04, week)': datetime.date(2019, 7, 29), 'typeof(trunc(2019-08-04, week))': 'date', 'typeof(2019-08-04)': 'string', 'typeof(week)': 'string'} -#query -#SELECT trunc('2019-08-04'::string, 'week'::string); +# YEAR - truncate to first date of year +query D +SELECT trunc('2009-02-12'::date, 'YEAR'::string); +---- +2009-01-01 + +query D +SELECT trunc('2009-02-12'::date, 'YYYY'::string); +---- +2009-01-01 + +query D +SELECT trunc('2009-02-12'::date, 'YY'::string); +---- +2009-01-01 + +# QUARTER - truncate to first date of quarter +query D +SELECT trunc('2009-02-12'::date, 'QUARTER'::string); +---- +2009-01-01 + +# MONTH - truncate to first date of month +query D +SELECT trunc('2009-02-12'::date, 'MONTH'::string); +---- +2009-02-01 + +query D +SELECT trunc('2009-02-12'::date, 'MM'::string); +---- +2009-02-01 + +query D +SELECT trunc('2009-02-12'::date, 'MON'::string); +---- +2009-02-01 + +# WEEK - truncate to Monday of the week +query D +SELECT trunc('2009-02-12'::date, 'WEEK'::string); +---- +2009-02-09 + +# string input +query D +SELECT trunc('2009-02-12'::string, 'YEAR'::string); +---- +2009-01-01 + +query D +SELECT trunc(column1, 'YEAR'::string) +FROM VALUES +('2009-02-12'::date), +('2000-02-12'::date), +('2042-02-12'::date), +(NULL::date); +---- +2009-01-01 +2000-01-01 +2042-01-01 +NULL + +# Null handling +query D +SELECT trunc(NULL::date, 'YEAR'::string); +---- +NULL + +query error Internal error: First argument of `TRUNC` must be non-null scalar Utf8. +SELECT trunc('2009-02-12'::date, NULL::string); + +# incorrect format +query error nternal error: The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter. +SELECT trunc('2009-02-12'::date, 'test'::string); + From e214d6551065139369444a3dc798632f0975800f Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Thu, 15 Jan 2026 16:01:45 +0400 Subject: [PATCH 2/9] fix: typo in test --- datafusion/sqllogictest/test_files/spark/datetime/trunc.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt index 0ed8665fd5c13..32893937ab060 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt @@ -88,6 +88,6 @@ query error Internal error: First argument of `TRUNC` must be non-null scalar Ut SELECT trunc('2009-02-12'::date, NULL::string); # incorrect format -query error nternal error: The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter. +query error Internal error: The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter. SELECT trunc('2009-02-12'::date, 'test'::string); From 9d7addf6a82d3c2d36bb53317c3a53fdf0e04283 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Thu, 15 Jan 2026 21:56:07 +0400 Subject: [PATCH 3/9] fix: replace internal_err with plan_err in date and time trunc functions --- datafusion/spark/src/function/datetime/date_trunc.rs | 4 ++-- datafusion/spark/src/function/datetime/time_trunc.rs | 7 ++++--- datafusion/spark/src/function/datetime/trunc.rs | 12 +++++------- .../test_files/spark/datetime/time_trunc.slt | 4 ++-- .../sqllogictest/test_files/spark/datetime/trunc.slt | 4 ++-- 5 files changed, 15 insertions(+), 16 deletions(-) diff --git a/datafusion/spark/src/function/datetime/date_trunc.rs b/datafusion/spark/src/function/datetime/date_trunc.rs index 8626967eeabc9..7107cec6dd84d 100644 --- a/datafusion/spark/src/function/datetime/date_trunc.rs +++ b/datafusion/spark/src/function/datetime/date_trunc.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; use datafusion_common::types::{NativeType, logical_string}; use datafusion_common::utils::take_function_args; -use datafusion_common::{Result, ScalarValue, internal_err}; +use datafusion_common::{Result, ScalarValue, internal_err, plan_err}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; use datafusion_expr::{ @@ -105,7 +105,7 @@ impl ScalarUDFImpl for SparkDateTrunc { | Some(ScalarValue::Utf8View(Some(v))) | Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(), _ => { - return internal_err!( + return plan_err!( "First argument of `DATE_TRUNC` must be non-null scalar Utf8" ); } diff --git a/datafusion/spark/src/function/datetime/time_trunc.rs b/datafusion/spark/src/function/datetime/time_trunc.rs index 5c2900fcfef4e..de85134948181 100644 --- a/datafusion/spark/src/function/datetime/time_trunc.rs +++ b/datafusion/spark/src/function/datetime/time_trunc.rs @@ -20,7 +20,8 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::types::logical_string; -use datafusion_common::{Result, ScalarValue, internal_err}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, ScalarValue, internal_err, plan_err}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; use datafusion_expr::{ @@ -100,7 +101,7 @@ impl ScalarUDFImpl for SparkTimeTrunc { | Some(ScalarValue::Utf8View(Some(v))) | Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(), _ => { - return internal_err!( + return plan_err!( "First argument of `TIME_TRUNC` must be non-null scalar Utf8" ); } @@ -110,7 +111,7 @@ impl ScalarUDFImpl for SparkTimeTrunc { fmt.as_str(), "hour" | "minute" | "second" | "millisecond" | "microsecond" ) { - return internal_err!( + return plan_err!( "The format argument of `TIME_TRUNC` must be one of: hour, minute, second, millisecond, microsecond" ); } diff --git a/datafusion/spark/src/function/datetime/trunc.rs b/datafusion/spark/src/function/datetime/trunc.rs index d908a57a56f38..b584cc9a70d44 100644 --- a/datafusion/spark/src/function/datetime/trunc.rs +++ b/datafusion/spark/src/function/datetime/trunc.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; use datafusion_common::types::{NativeType, logical_date, logical_string}; use datafusion_common::utils::take_function_args; -use datafusion_common::{Result, ScalarValue, internal_err}; +use datafusion_common::{Result, ScalarValue, internal_err, plan_err}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; use datafusion_expr::{ @@ -89,9 +89,7 @@ impl ScalarUDFImpl for SparkTrunc { } fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { - internal_err!( - "spark date_trunc should have been simplified to standard date_trunc" - ) + internal_err!("spark trunc should have been simplified to standard date_trunc") } fn simplify( @@ -106,8 +104,8 @@ impl ScalarUDFImpl for SparkTrunc { | Some(ScalarValue::Utf8View(Some(v))) | Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(), _ => { - return internal_err!( - "First argument of `TRUNC` must be non-null scalar Utf8" + return plan_err!( + "Second argument of `TRUNC` must be non-null scalar Utf8" ); } }; @@ -118,7 +116,7 @@ impl ScalarUDFImpl for SparkTrunc { "mm" | "mon" => "month", "year" | "month" | "day" | "week" | "quarter" => fmt.as_str(), _ => { - return internal_err!( + return plan_err!( "The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter." ); } diff --git a/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt index e879172b0b09e..f00c40f0a9371 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/time_trunc.slt @@ -60,7 +60,7 @@ NULL # Null handling -query error Internal error: First argument of `TIME_TRUNC` must be non-null scalar Utf8. +query error DataFusion error: Optimizer rule 'simplify_expressions' failed\ncaused by\nError during planning: First argument of `TIME_TRUNC` must be non-null scalar Utf8 SELECT time_trunc(NULL, '09:32:05.123456'::time); query D @@ -69,6 +69,6 @@ SELECT time_trunc('HOUR', NULL::time); NULL # incorrect format -query error nternal error: The format argument of `TIME_TRUNC` must be one of: hour, minute, second, millisecond, microsecond. +query error DataFusion error: Optimizer rule 'simplify_expressions' failed\ncaused by\nError during planning: The format argument of `TIME_TRUNC` must be one of: hour, minute, second, millisecond, microsecond SELECT time_trunc('test', '09:32:05.123456'::time); diff --git a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt index 32893937ab060..f6bf6b5751ed2 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/trunc.slt @@ -84,10 +84,10 @@ SELECT trunc(NULL::date, 'YEAR'::string); ---- NULL -query error Internal error: First argument of `TRUNC` must be non-null scalar Utf8. +query error DataFusion error: Optimizer rule 'simplify_expressions' failed\ncaused by\nError during planning: Second argument of `TRUNC` must be non-null scalar Utf8 SELECT trunc('2009-02-12'::date, NULL::string); # incorrect format -query error Internal error: The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter. +query error DataFusion error: Optimizer rule 'simplify_expressions' failed\ncaused by\nError during planning: The format argument of `TRUNC` must be one of: year, yy, yyyy, month, mm, mon, day, week, quarter. SELECT trunc('2009-02-12'::date, 'test'::string); From e4025577bd0133703f28b8a36ca92882396f29c0 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Thu, 15 Jan 2026 21:59:25 +0400 Subject: [PATCH 4/9] fix --- datafusion/spark/src/function/datetime/time_trunc.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/spark/src/function/datetime/time_trunc.rs b/datafusion/spark/src/function/datetime/time_trunc.rs index de85134948181..718502a05ee6d 100644 --- a/datafusion/spark/src/function/datetime/time_trunc.rs +++ b/datafusion/spark/src/function/datetime/time_trunc.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::types::logical_string; -use datafusion_common::utils::take_function_args; use datafusion_common::{Result, ScalarValue, internal_err, plan_err}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; From 120b9e979b420699b9a1fe70de901084fdcef4dd Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Thu, 15 Jan 2026 23:10:03 +0400 Subject: [PATCH 5/9] feat(spark): enhance date_trunc to handle session timezone for truncation --- .../spark/src/function/datetime/date_trunc.rs | 44 +++++++++++++++++-- .../test_files/spark/datetime/date_trunc.slt | 36 +++++++++++++-- 2 files changed, 73 insertions(+), 7 deletions(-) diff --git a/datafusion/spark/src/function/datetime/date_trunc.rs b/datafusion/spark/src/function/datetime/date_trunc.rs index 7107cec6dd84d..880eb9a1a4786 100644 --- a/datafusion/spark/src/function/datetime/date_trunc.rs +++ b/datafusion/spark/src/function/datetime/date_trunc.rs @@ -25,11 +25,12 @@ use datafusion_common::{Result, ScalarValue, internal_err, plan_err}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; use datafusion_expr::{ - Coercion, ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, - Signature, TypeSignatureClass, Volatility, + Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs, + ScalarUDFImpl, Signature, TypeSignatureClass, Volatility, }; /// Spark date_trunc supports extra format aliases. +/// It also handles timestamps with timezones by converting to session timezone first. /// #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkDateTrunc { @@ -82,7 +83,7 @@ impl ScalarUDFImpl for SparkDateTrunc { Ok(Arc::new(Field::new( self.name(), - args.arg_fields[1].data_type().clone(), + DataType::Timestamp(TimeUnit::Microsecond, None), nullable, ))) } @@ -96,7 +97,7 @@ impl ScalarUDFImpl for SparkDateTrunc { fn simplify( &self, args: Vec, - _info: &SimplifyContext, + info: &SimplifyContext, ) -> Result { let [fmt_expr, ts_expr] = take_function_args(self.name(), args)?; @@ -119,6 +120,41 @@ impl ScalarUDFImpl for SparkDateTrunc { other => other, }; + let session_tz = info.config_options().execution.time_zone.clone(); + let ts_type = ts_expr.get_type(info.schema())?; + + // Spark interprets timestamps in the session timezone before truncating, + // then returns a timestamp without timezone at microsecond precision. + // See: https://github.com/apache/spark/blob/f310f4fcc95580a6824bc7d22b76006f79b8804a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L492 + // + // For sub-second truncations (second, millisecond, microsecond), timezone + // adjustment is unnecessary since timezone offsets are whole seconds. + let ts_expr = match (&ts_type, fmt) { + // Sub-second truncations don't need timezone adjustment + (_, "second" | "millisecond" | "microsecond") => ts_expr, + + // Timestamp with timezone: convert to local time (applying session tz if needed) + (DataType::Timestamp(_, Some(_)), _) => { + let ts_with_tz = match &session_tz { + Some(tz) => ts_expr.cast_to( + &DataType::Timestamp( + TimeUnit::Microsecond, + Some(Arc::from(tz.as_str())), + ), + info.schema(), + )?, + None => ts_expr, + }; + Expr::ScalarFunction(ScalarFunction::new_udf( + datafusion_functions::datetime::to_local_time(), + vec![ts_with_tz], + )) + } + + // Timestamp without timezone: use as-is + _ => ts_expr, + }; + let fmt_expr = Expr::Literal(ScalarValue::new_utf8(fmt), None); Ok(ExprSimplifyResult::Simplified(Expr::ScalarFunction( diff --git a/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt index a66ca755093d4..705c320934fe8 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt @@ -33,9 +33,9 @@ SELECT date_trunc('YY', '2015-03-05T09:32:05.123456'::timestamp); # QUARTER - truncate to first date of quarter, time zeroed query P -SELECT date_trunc('QUARTER', '2015-03-05T09:32:05.123456'::timestamp); +SELECT date_trunc('QUARTER', '2015-05-05T09:32:05.123456'::timestamp); ---- -2015-01-01T00:00:00 +2015-04-01T00:00:00 # MONTH - truncate to first date of month, time zeroed query P @@ -120,7 +120,7 @@ SELECT date_trunc('YEAR', '2015-03-05T09:32:05.123456'); 2015-01-01T00:00:00 # Null handling -query error Internal error: First argument of `DATE_TRUNC` must be non-null scalar Utf8. +query error DataFusion error: Optimizer rule 'simplify_expressions' failed\ncaused by\nError during planning: First argument of `DATE_TRUNC` must be non-null scalar Utf8 SELECT date_trunc(NULL, '2015-03-05T09:32:05.123456'); query P @@ -132,3 +132,33 @@ NULL query error DataFusion error: Execution error: Unsupported date_trunc granularity: 'test'. Supported values are: microsecond, millisecond, second, minute, hour, day, week, month, quarter, year SELECT date_trunc('test', '2015-03-05T09:32:05.123456'); +# Timezone handling - Spark-compatible behavior +# Spark converts timestamps to session timezone before truncating for coarse granularities + +query P +SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, Some("UTC"))')); +---- +2024-07-15T00:00:00 + +query P +SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, None)')); +---- +2024-07-15T00:00:00 + +statement ok +SET datafusion.execution.time_zone = 'America/New_York'; + +# This timestamp is 03:30 UTC = 23:30 EDT (previous day) on July 14 +# With session timezone, truncation happens in America/New_York timezone +query P +SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, Some("UTC"))')); +---- +2024-07-14T00:00:00 + +query P +SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, None)')); +---- +2024-07-15T00:00:00 + +statement ok +RESET datafusion.execution.time_zone; From 3b6579ed9843343762c0411dacf5b9d1acef163e Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Fri, 16 Jan 2026 08:10:08 +0400 Subject: [PATCH 6/9] refactor: add return_field_from_args for DateTruncFunc --- .../functions/src/datetime/date_trunc.rs | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 8c8a4a1c1b771..0e961464baaf5 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -34,14 +34,16 @@ use arrow::array::types::{ use arrow::array::{Array, ArrayRef, PrimitiveArray}; use arrow::datatypes::DataType::{self, Time32, Time64, Timestamp}; use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second}; +use arrow::datatypes::{Field, FieldRef}; use datafusion_common::cast::as_primitive_array; use datafusion_common::types::{NativeType, logical_date, logical_string}; use datafusion_common::{ - DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, + DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err, }; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ - ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, + ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, }; use datafusion_expr_common::signature::{Coercion, TypeSignatureClass}; use datafusion_macros::user_doc; @@ -221,12 +223,21 @@ impl ScalarUDFImpl for DateTruncFunc { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types[1].is_null() { - Ok(Timestamp(Nanosecond, None)) + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be called instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let return_type = if args.arg_fields[1].data_type().is_null() { + Timestamp(Nanosecond, None) } else { - Ok(arg_types[1].clone()) - } + args.arg_fields[1].data_type().clone() + }; + Ok(Arc::new(Field::new( + self.name(), + return_type, + args.arg_fields[1].is_nullable(), + ))) } fn invoke_with_args( From 48488eb323569e5acaf046c2cda9d3ca34155227 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Fri, 16 Jan 2026 08:49:32 +0400 Subject: [PATCH 7/9] fix: fix information schema generation --- .../functions/src/datetime/date_trunc.rs | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 0e961464baaf5..951ce7e882936 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -38,7 +38,7 @@ use arrow::datatypes::{Field, FieldRef}; use datafusion_common::cast::as_primitive_array; use datafusion_common::types::{NativeType, logical_date, logical_string}; use datafusion_common::{ - DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err, + DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, }; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ @@ -223,16 +223,23 @@ impl ScalarUDFImpl for DateTruncFunc { &self.signature } - fn return_type(&self, _arg_types: &[DataType]) -> Result { - internal_err!("return_field_from_args should be called instead") + // keep return_type implementation for information schema generation + fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types[1].is_null() { + Ok(Timestamp(Nanosecond, None)) + } else { + Ok(arg_types[1].clone()) + } } fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { - let return_type = if args.arg_fields[1].data_type().is_null() { - Timestamp(Nanosecond, None) - } else { - args.arg_fields[1].data_type().clone() - }; + let data_types = args + .arg_fields + .iter() + .map(|f| f.data_type()) + .cloned() + .collect::>(); + let return_type = self.return_type(&data_types)?; Ok(Arc::new(Field::new( self.name(), return_type, From fb1f92aceb101afcf6f7b197108cd0a0862fc3e3 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Fri, 16 Jan 2026 11:03:48 +0400 Subject: [PATCH 8/9] fix: update date_trunc to return UTC timestamps and improve timezone handling --- .../spark/src/function/datetime/date_trunc.rs | 17 +++++++++-------- .../test_files/spark/datetime/date_trunc.slt | 4 ++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/datafusion/spark/src/function/datetime/date_trunc.rs b/datafusion/spark/src/function/datetime/date_trunc.rs index 880eb9a1a4786..81e58a4e1c78f 100644 --- a/datafusion/spark/src/function/datetime/date_trunc.rs +++ b/datafusion/spark/src/function/datetime/date_trunc.rs @@ -83,7 +83,7 @@ impl ScalarUDFImpl for SparkDateTrunc { Ok(Arc::new(Field::new( self.name(), - DataType::Timestamp(TimeUnit::Microsecond, None), + args.arg_fields[1].data_type().clone(), nullable, ))) } @@ -124,7 +124,7 @@ impl ScalarUDFImpl for SparkDateTrunc { let ts_type = ts_expr.get_type(info.schema())?; // Spark interprets timestamps in the session timezone before truncating, - // then returns a timestamp without timezone at microsecond precision. + // then returns a timestamp at microsecond precision. // See: https://github.com/apache/spark/blob/f310f4fcc95580a6824bc7d22b76006f79b8804a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L492 // // For sub-second truncations (second, millisecond, microsecond), timezone @@ -133,13 +133,13 @@ impl ScalarUDFImpl for SparkDateTrunc { // Sub-second truncations don't need timezone adjustment (_, "second" | "millisecond" | "microsecond") => ts_expr, - // Timestamp with timezone: convert to local time (applying session tz if needed) - (DataType::Timestamp(_, Some(_)), _) => { - let ts_with_tz = match &session_tz { - Some(tz) => ts_expr.cast_to( + // Timestamp with timezone: convert to session timezone, strip timezone and convert back to original timezone + (DataType::Timestamp(unit, tz), _) => { + let ts_expr = match &session_tz { + Some(session_tz) => ts_expr.cast_to( &DataType::Timestamp( TimeUnit::Microsecond, - Some(Arc::from(tz.as_str())), + Some(Arc::from(session_tz.as_str())), ), info.schema(), )?, @@ -147,8 +147,9 @@ impl ScalarUDFImpl for SparkDateTrunc { }; Expr::ScalarFunction(ScalarFunction::new_udf( datafusion_functions::datetime::to_local_time(), - vec![ts_with_tz], + vec![ts_expr], )) + .cast_to(&DataType::Timestamp(*unit, tz.clone()), info.schema())? } // Timestamp without timezone: use as-is diff --git a/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt b/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt index 705c320934fe8..7fc1583bb9310 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/date_trunc.slt @@ -138,7 +138,7 @@ SELECT date_trunc('test', '2015-03-05T09:32:05.123456'); query P SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, Some("UTC"))')); ---- -2024-07-15T00:00:00 +2024-07-15T00:00:00Z query P SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, None)')); @@ -153,7 +153,7 @@ SET datafusion.execution.time_zone = 'America/New_York'; query P SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, Some("UTC"))')); ---- -2024-07-14T00:00:00 +2024-07-14T00:00:00Z query P SELECT date_trunc('DAY', arrow_cast(timestamp '2024-07-15T03:30:00', 'Timestamp(Microsecond, None)')); From a6cbe76203ac6c3a9b35e06082a05f3a67367d6c Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 17 Jan 2026 11:51:33 +0400 Subject: [PATCH 9/9] fix: improve error handling for DATE_TRUNC function when second argument is not a Timestamp --- datafusion/spark/src/function/datetime/date_trunc.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/datetime/date_trunc.rs b/datafusion/spark/src/function/datetime/date_trunc.rs index 81e58a4e1c78f..2199c90703b38 100644 --- a/datafusion/spark/src/function/datetime/date_trunc.rs +++ b/datafusion/spark/src/function/datetime/date_trunc.rs @@ -133,7 +133,7 @@ impl ScalarUDFImpl for SparkDateTrunc { // Sub-second truncations don't need timezone adjustment (_, "second" | "millisecond" | "microsecond") => ts_expr, - // Timestamp with timezone: convert to session timezone, strip timezone and convert back to original timezone + // convert to session timezone, strip timezone and convert back to original timezone (DataType::Timestamp(unit, tz), _) => { let ts_expr = match &session_tz { Some(session_tz) => ts_expr.cast_to( @@ -152,8 +152,12 @@ impl ScalarUDFImpl for SparkDateTrunc { .cast_to(&DataType::Timestamp(*unit, tz.clone()), info.schema())? } - // Timestamp without timezone: use as-is - _ => ts_expr, + _ => { + return plan_err!( + "Second argument of `DATE_TRUNC` must be Timestamp, got {}", + ts_type + ); + } }; let fmt_expr = Expr::Literal(ScalarValue::new_utf8(fmt), None);