From bb7ddc81c72e0e68f13248e549bbe38bf9b7658c Mon Sep 17 00:00:00 2001
From: Vignesh <149236000+vigneshsiva11@users.noreply.github.com>
Date: Mon, 26 Jan 2026 04:59:10 +0000
Subject: [PATCH 1/4] Fix datediff array length mismatch for dictionary-backed
 timestamps

---
 .../src/datetime_funcs/date_diff.rs           | 24 +++++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/native/spark-expr/src/datetime_funcs/date_diff.rs b/native/spark-expr/src/datetime_funcs/date_diff.rs
index 6a593f0f87..8a7350116d 100644
--- a/native/spark-expr/src/datetime_funcs/date_diff.rs
+++ b/native/spark-expr/src/datetime_funcs/date_diff.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use arrow::array::{Array, Date32Array, Int32Array};
+use arrow::compute::cast;
 use arrow::compute::kernels::arity::binary;
 use arrow::datatypes::DataType;
 use datafusion::common::{utils::take_function_args, DataFusionError, Result};
@@ -71,9 +72,22 @@ impl ScalarUDFImpl for SparkDateDiff {
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
         let [end_date, start_date] = take_function_args(self.name(), args.args)?;
-        // Convert scalars to arrays for uniform processing
-        let end_arr = end_date.into_array(1)?;
-        let start_arr = start_date.into_array(1)?;
+        // Determine target length (broadcast scalars to column length)
+        let len = match (&end_date, &start_date) {
+            (ColumnarValue::Array(a), _) => a.len(),
+            (_, ColumnarValue::Array(a)) => a.len(),
+            _ => 1,
+        };
+
+        // Convert both arguments to arrays of the same length
+        let end_arr = end_date.into_array(len)?;
+        let start_arr = start_date.into_array(len)?;
+
+        // Normalize dictionary arrays (important for Iceberg)
+        let end_arr = arrow::compute::cast(&end_arr, &DataType::Date32)
+            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+        let start_arr = arrow::compute::cast(&start_arr, &DataType::Date32)
+            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
 
         let end_date_array = end_arr
             .as_any()
@@ -97,8 +111,4 @@ impl ScalarUDFImpl for SparkDateDiff {
 
         Ok(ColumnarValue::Array(Arc::new(result)))
     }
-
-    fn aliases(&self) -> &[String] {
-        &self.aliases
-    }
 }
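
Aside (not part of the patch series): a minimal standalone sketch of the
normalization step above, assuming only the arrow crate; the key/value
contents are made up for illustration. It checks that arrow's cast kernel
flattens a Dictionary(Int32, Date32) column into a plain Date32Array, which
is what lets the downcast_ref::<Date32Array>() later in invoke_with_args
succeed for dictionary-backed input.

    use std::sync::Arc;

    use arrow::array::{Array, Date32Array, DictionaryArray, Int32Array};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Int32Type};

    fn main() -> Result<(), arrow::error::ArrowError> {
        // Dictionary-encoded dates, as a dictionary-backed Parquet column
        // might decode them: keys index into a small values array.
        let keys = Int32Array::from(vec![0, 1, 0]);
        let values = Date32Array::from(vec![19724, 19725]); // days since epoch
        let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
        assert!(matches!(dict.data_type(), DataType::Dictionary(_, _)));

        // cast flattens the dictionary to its value type, so a plain
        // Date32Array downcast succeeds afterwards.
        let flat = cast(&dict, &DataType::Date32)?;
        let flat = flat.as_any().downcast_ref::<Date32Array>().unwrap();
        assert_eq!(flat.len(), 3);
        assert_eq!(flat.value(2), 19724); // keys[2] == 0 -> values[0]
        Ok(())
    }
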
From 3783308e4817542be135871f24b08218ae669796 Mon Sep 17 00:00:00 2001
From: Vignesh <149236000+vigneshsiva11@users.noreply.github.com>
Date: Mon, 26 Jan 2026 14:20:52 +0000
Subject: [PATCH 2/4] fix(spark-expr): remove unused import in datediff

---
 native/spark-expr/src/datetime_funcs/date_diff.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/native/spark-expr/src/datetime_funcs/date_diff.rs b/native/spark-expr/src/datetime_funcs/date_diff.rs
index 8a7350116d..c39d2095e0 100644
--- a/native/spark-expr/src/datetime_funcs/date_diff.rs
+++ b/native/spark-expr/src/datetime_funcs/date_diff.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use arrow::array::{Array, Date32Array, Int32Array};
-use arrow::compute::cast;
 use arrow::compute::kernels::arity::binary;
 use arrow::datatypes::DataType;
 use datafusion::common::{utils::take_function_args, DataFusionError, Result};

From 5da8d7716571a4017529ae3c73a40c76a20d4ad5 Mon Sep 17 00:00:00 2001
From: Vignesh <149236000+vigneshsiva11@users.noreply.github.com>
Date: Thu, 29 Jan 2026 15:38:35 +0000
Subject: [PATCH 3/4] test(spark): add regression test for datediff on
 dictionary-encoded timestamps

---
 .../comet/ParquetDatetimeRebaseSuite.scala    | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
index bdb4a9d4b1..f991aaf451 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
@@ -116,6 +116,33 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
     }
   }
 
+  test("COMET-XXXX: datediff works with dictionary-encoded timestamp columns") {
+    withTempPath { path =>
+      withSQLConf(
+        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
+        CometConf.COMET_ENABLED.key -> "true",
+        "spark.sql.parquet.enableDictionary" -> "true") {
+        val df = spark
+          .createDataFrame(
+            Seq(
+              ("a", java.sql.Timestamp.valueOf("2024-01-02 10:00:00")),
+              ("b", java.sql.Timestamp.valueOf("2024-01-03 11:00:00"))))
+          .toDF("id", "ts")
+
+        df.write.mode("overwrite").parquet(path.getAbsolutePath)
+
+        val readDf = spark.read.parquet(path.getAbsolutePath)
+
+        val result = readDf
+          .selectExpr("datediff(current_date(), ts) as diff")
+          .collect()
+
+        // Just verify it executes correctly (no CometNativeException)
+        assert(result.length == 2)
+      }
+    }
+  }
+
   private def checkSparkNoRebaseAnswer(df: => DataFrame): Unit = {
     var expected: Array[Row] = Array.empty

From 1d0fab5be195091c21208b9003c487a2e8758019 Mon Sep 17 00:00:00 2001
From: Vignesh <149236000+vigneshsiva11@users.noreply.github.com>
Date: Tue, 3 Feb 2026 13:02:55 +0000
Subject: [PATCH 4/4] test(spark): drop placeholder ticket id from datediff
 test name

---
 .../org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
index f991aaf451..50ed6844ce 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
@@ -116,7 +116,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
     }
   }
 
-  test("COMET-XXXX: datediff works with dictionary-encoded timestamp columns") {
+  test("datediff works with dictionary-encoded timestamp columns") {
     withTempPath { path =>
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
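
Aside (not part of the patch series): a minimal standalone sketch of the
broadcast logic PATCH 1/4 introduces, assuming the arrow and datafusion
crates; the date values are illustrative. It mirrors the shape produced by
datediff(current_date(), ts) in the regression test — a scalar end date and
a multi-row start column. With the old into_array(1), the two operands came
back with lengths 1 and 3, which is the mismatch the binary kernel rejected.

    use std::sync::Arc;

    use arrow::array::{Array, Date32Array};
    use datafusion::common::{Result, ScalarValue};
    use datafusion::logical_expr::ColumnarValue;

    fn main() -> Result<()> {
        // Scalar end date plus a 3-row start column, as produced by
        // datediff(current_date(), ts).
        let end = ColumnarValue::Scalar(ScalarValue::Date32(Some(19724)));
        let start = ColumnarValue::Array(Arc::new(Date32Array::from(vec![
            19720, 19721, 19722,
        ])));

        // Same length computation as the patch: take the first array
        // operand's length, falling back to 1 when both are scalars.
        let len = match (&end, &start) {
            (ColumnarValue::Array(a), _) => a.len(),
            (_, ColumnarValue::Array(a)) => a.len(),
            _ => 1,
        };

        // into_array(len) leaves arrays untouched and broadcasts the
        // scalar to len rows, so both sides now line up.
        let end_arr = end.into_array(len)?;
        let start_arr = start.into_array(len)?;
        assert_eq!(end_arr.len(), start_arr.len());
        Ok(())
    }
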