Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,20 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer
)
}

test("SPARK-57103: Min/Max accept nanosecond-precision timestamp types and preserve them") {
// Min/Max gate only on orderability (TypeUtils.checkForOrderingExpr), and the nanosecond
// timestamp types are orderable AtomicTypes (SPARK-57103), so the analysis gate passes and the
// result type preserves the input precision (dataType = child.dataType). No inputTypes / type
// matcher is involved, so no production change to Min/Max is needed.
Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(7)).foreach { dt =>
val a = AttributeReference("c", dt)()
assert(Max(a).checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
assert(Min(a).checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
assert(Max(a).dataType == dt)
assert(Min(a).dataType == dt)
}
}

test("check types for aggregates") {
// We use AggregateFunction directly at here because the error will be thrown from it
// instead of from AggregateExpression, which is the wrapper of an AggregateFunction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,30 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
SELECT max(c), min(c) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC'),
(CAST(NULL AS timestamp_ltz(9))) AS t(c)
-- !query analysis
Aggregate [max(c#x) AS max(c)#x, min(c#x) AS min(c)#x]
+- SubqueryAlias t
+- LocalRelation [c#x]


-- !query
SELECT c, count(*) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC') AS t(c)
GROUP BY c ORDER BY c
-- !query analysis
Sort [c#x ASC NULLS FIRST], true
+- Aggregate [c#x], [c#x, count(1) AS count(1)#xL]
+- SubqueryAlias t
+- LocalRelation [c#x]


-- !query
SELECT unix_timestamp(TIMESTAMP_LTZ '2020-01-01 13:24:35.123456789')
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,30 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
SELECT max(c), min(c) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999'),
(CAST(NULL AS timestamp_ntz(9))) AS t(c)
-- !query analysis
Aggregate [max(c#x) AS max(c)#x, min(c#x) AS min(c)#x]
+- SubqueryAlias t
+- LocalRelation [c#x]


-- !query
SELECT c, count(*) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001') AS t(c)
GROUP BY c ORDER BY c
-- !query analysis
Sort [c#x ASC NULLS FIRST], true
+- Aggregate [c#x], [c#x, count(1) AS count(1)#xL]
+- SubqueryAlias t
+- LocalRelation [c#x]


-- !query
SELECT unix_timestamp(TIMESTAMP_NTZ '2020-01-01 13:24:35.123456789')
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ SELECT TIMESTAMP_LTZ '1960-01-02 03:04:05.123456789 UTC' +
SELECT TIMESTAMP_LTZ '2020-01-02 03:04:05.123456789 UTC' + make_interval(0, 1, 0, 2, 0, 0, 0);
SELECT TIMESTAMP_LTZ '2020-01-02 03:04:05.123456789 UTC' + INTERVAL '1' MONTH;

-- SPARK-57103: MAX / MIN over nanosecond-precision TIMESTAMP_LTZ. The aggregate preserves the
-- nanosecond type and orders by the sub-microsecond remainder; NULLs are ignored. Values are
-- rendered in the session time zone (America/Los_Angeles).
SELECT max(c), min(c) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC'),
(CAST(NULL AS timestamp_ltz(9))) AS t(c);
-- GROUP BY a nanosecond key: two keys that share epochMicros but differ within the microsecond
-- must not collapse into one group.
SELECT c, count(*) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC') AS t(c)
GROUP BY c ORDER BY c;

-- SPARK-57528: unix_timestamp / to_unix_timestamp over nanosecond-precision values. The result is
-- whole-second BIGINT; the sub-second digits are dropped. A literal without an explicit zone is
-- read in the session time zone (America/Los_Angeles, UTC-08:00); an explicit-zone literal fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ SELECT TIMESTAMP_NTZ '1960-01-02 03:04:05.123456789' + INTERVAL '0 00:00:00.0000
SELECT TIMESTAMP_NTZ '2020-01-02 03:04:05.123456789' + make_interval(0, 1, 0, 2, 0, 0, 0);
SELECT TIMESTAMP_NTZ '2020-01-02 03:04:05.123456789' + INTERVAL '1' MONTH;

-- SPARK-57103: MAX / MIN over nanosecond-precision TIMESTAMP_NTZ. The aggregate preserves the
-- nanosecond type and orders by the sub-microsecond remainder (two values share the same
-- microsecond and differ only within it); NULLs are ignored.
SELECT max(c), min(c) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999'),
(CAST(NULL AS timestamp_ntz(9))) AS t(c);
-- GROUP BY a nanosecond key: two keys that share epochMicros but differ within the microsecond
-- must not collapse into one group.
SELECT c, count(*) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001') AS t(c)
GROUP BY c ORDER BY c;

-- SPARK-57528: unix_timestamp / to_unix_timestamp over nanosecond-precision values. The result is
-- whole-second BIGINT; the sub-second digits are dropped and NTZ applies no zone shift, so the
-- wall-clock value is read as the epoch instant.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,30 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
SELECT max(c), min(c) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC'),
(CAST(NULL AS timestamp_ltz(9))) AS t(c)
-- !query schema
struct<max(c):timestamp_ltz(9),min(c):timestamp_ltz(9)>
-- !query output
2019-12-31 16:00:00.000000999 2019-12-31 16:00:00.000000001


-- !query
SELECT c, count(*) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC'),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC') AS t(c)
GROUP BY c ORDER BY c
-- !query schema
struct<c:timestamp_ltz(9),count(1):bigint>
-- !query output
2019-12-31 16:00:00.000000001 2
2019-12-31 16:00:00.000000999 1


-- !query
SELECT unix_timestamp(TIMESTAMP_LTZ '2020-01-01 13:24:35.123456789')
-- !query schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,30 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
SELECT max(c), min(c) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999'),
(CAST(NULL AS timestamp_ntz(9))) AS t(c)
-- !query schema
struct<max(c):timestamp_ntz(9),min(c):timestamp_ntz(9)>
-- !query output
2020-01-01 00:00:00.000000999 2020-01-01 00:00:00.000000001


-- !query
SELECT c, count(*) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999'),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001') AS t(c)
GROUP BY c ORDER BY c
-- !query schema
struct<c:timestamp_ntz(9),count(1):bigint>
-- !query output
2020-01-01 00:00:00.000000001 2
2020-01-01 00:00:00.000000999 1


-- !query
SELECT unix_timestamp(TIMESTAMP_NTZ '2020-01-01 13:24:35.123456789')
-- !query schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._

/**
* End-to-end tests for the `hour`, `minute` and `second` functions over the nanosecond-precision
* timestamp types `TIMESTAMP_NTZ(p)` / `TIMESTAMP_LTZ(p)` (`p` in `[7, 9]`), part of the
* nanosecond timestamp preview (SPARK-56822). Each test exercises both the SQL path
* (`selectExpr`) and the Scala `Column` API (`functions.hour` / `minute` / `second`).
* End-to-end tests over the nanosecond-precision timestamp types `TIMESTAMP_NTZ(p)` /
* `TIMESTAMP_LTZ(p)` (`p` in `[7, 9]`), part of the nanosecond timestamp preview (SPARK-56822).
* Covers the datetime functions (`hour`/`minute`/`second`, `EXTRACT`/`date_part`, the date-field
* functions) and the `MIN`/`MAX` aggregates (plus `min_by`/`max_by`/`greatest`/`least`). Most
* tests use the SQL path (`selectExpr`); several also cross-check the Scala `Column` API. The two
* subclasses run every test with ANSI mode on and off.
*/
abstract class TimestampNanosFunctionsSuiteBase extends SharedSparkSession {

Expand Down Expand Up @@ -240,6 +242,166 @@ abstract class TimestampNanosFunctionsSuiteBase extends SharedSparkSession {
}
}

// ===== MIN / MAX aggregates over nanosecond-precision timestamps (SPARK-56822) =====

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are MIN/MAX aggregate tests, but they land in a suite whose class Scaladoc (lines 28-33) scopes it to "the hour, minute and second functions". The TimeType precedent this PR explicitly mirrors put its analogous aggregate tests in DataFrameAggregateSuite (SPARK-52626 group-by at :3713, SPARK-52660 codegen-split max/min at :3724) — the dedicated aggregate suite. Consider co-locating these there (or in a dedicated nanos-aggregate suite) so aggregate-over-temporal-type coverage stays discoverable in one place.

There's a legitimate pull toward keeping them here — this suite gives the ANSI on/off matrix, the nanos preview-flag session, and the fixed session timezone for free — so this is a judgment call, not a defect.

Minor, regardless of where they end up: the class Scaladoc is now stale. It still says the suite tests only hour/minute/second and that "Each test exercises both the SQL path (selectExpr) and the Scala Column API (functions.hour / minute / second)" — no longer true now that aggregate tests are present (several use only selectExpr). Please update it (or move the tests).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept them here for the free ANSI on/off matrix, preview-flag session, and fixed timezone. Updated the class Scaladoc.

// `Min`/`Max` are type-agnostic `DeclarativeAggregate`s gated only on orderability
// (`TypeUtils.checkForOrderingExpr`); the nanosecond timestamp types became orderable in
// SPARK-57103, so MIN/MAX (and `min_by`/`max_by`/`greatest`/`least`, which ride the same gate)
// work without any change to the aggregates themselves. These end-to-end tests lock that in,
// mirroring the TimeType precedent (SPARK-52626 group-by, SPARK-52660 codegen split). The result
// type preserves the input precision (`dataType = child.dataType`). Mixed-precision inputs route
// through `findWiderDateTimeType`, which has no nanos arm yet, so they are out of scope here
// (SPARK-57454); every column below is strictly same-precision.

test("SPARK-57103: max/min over nanosecond-precision timestamps preserve the input type") {
Seq(7, 8, 9).foreach { p =>
val schema = new StructType()
.add("ntz", TimestampNTZNanosType(p))
.add("ltz", TimestampLTZNanosType(p))
val data = Seq(
Row(LocalDateTime.parse("2020-01-01T00:00:01.100000000"),
Instant.parse("2020-01-01T00:00:01.100000000Z")),
Row(LocalDateTime.parse("2020-01-01T00:00:02.200000000"),
Instant.parse("2020-01-01T00:00:02.200000000Z")),
Row(LocalDateTime.parse("2020-01-01T00:00:00.300000000"),
Instant.parse("2020-01-01T00:00:00.300000000Z")),
Row(null, null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
val sqlRes = df.selectExpr("max(ntz)", "min(ntz)", "max(ltz)", "min(ltz)")
val colRes = df.select(
max(col("ntz")), min(col("ntz")), max(col("ltz")), min(col("ltz")))
// The SQL and the Scala Column API agree.
checkAnswer(sqlRes, colRes)
// Absolute values (NTZ collects to LocalDateTime, LTZ to Instant; SPARK-57033).
checkAnswer(sqlRes, Row(
LocalDateTime.parse("2020-01-01T00:00:02.200000000"),
LocalDateTime.parse("2020-01-01T00:00:00.300000000"),
Instant.parse("2020-01-01T00:00:02.200000000Z"),
Instant.parse("2020-01-01T00:00:00.300000000Z")))
// The result keeps both the family (NTZ/LTZ) and the precision of the input.
assert(sqlRes.schema.map(_.dataType) === Seq(
TimestampNTZNanosType(p), TimestampNTZNanosType(p),
TimestampLTZNanosType(p), TimestampLTZNanosType(p)))
}
}

test("SPARK-57103: max/min over nanos order by the sub-microsecond remainder") {
// Two values share the same epochMicros and differ only within the microsecond, so a correct
// result must use the full `TimestampNanosVal` comparison and never truncate to micros.
// Run on both the codegen (`CodeGenerator.genComp` AnyTimestampNanoType arm) and the
// interpreted (`Ordering[TimestampNanosVal]`) paths.
Seq(
Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY"),
Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.CODEGEN_FACTORY_MODE.key -> "NO_CODEGEN")
).foreach { conf =>
withSQLConf(conf: _*) {
val ntz = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row(LocalDateTime.parse("2020-01-01T00:00:00.000000001")),
Row(LocalDateTime.parse("2020-01-01T00:00:00.000000999")),
Row(null))),
new StructType().add("c", TimestampNTZNanosType(9)))
checkAnswer(ntz.selectExpr("max(c)", "min(c)"),
Row(LocalDateTime.parse("2020-01-01T00:00:00.000000999"),
LocalDateTime.parse("2020-01-01T00:00:00.000000001")))

val ltz = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row(Instant.parse("2020-01-01T00:00:00.000000001Z")),
Row(Instant.parse("2020-01-01T00:00:00.000000999Z")),
Row(null))),
new StructType().add("c", TimestampLTZNanosType(9)))
checkAnswer(ltz.selectExpr("max(c)", "min(c)"),
Row(Instant.parse("2020-01-01T00:00:00.000000999Z"),
Instant.parse("2020-01-01T00:00:00.000000001Z")))
}
}
}

test("SPARK-57103: max/min over all-NULL or empty nanos input return NULL") {
Seq(7, 8, 9).foreach { p =>
val ntz = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(null), Row(null))),
new StructType().add("c", TimestampNTZNanosType(p)))
checkAnswer(ntz.selectExpr("max(c)", "min(c)"), Row(null, null))
// Global aggregate over zero rows still produces one all-NULL row.
checkAnswer(ntz.filter(lit(false)).selectExpr("max(c)", "min(c)"), Row(null, null))

val ltz = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(null), Row(null))),
new StructType().add("c", TimestampLTZNanosType(p)))
checkAnswer(ltz.selectExpr("max(c)", "min(c)"), Row(null, null))
}
}

test("SPARK-57103: group by a nanosecond key with per-group max/min") {
// The grouping keys k1/k2 share their epochMicros but differ within the microsecond, so
// hashing/grouping (SPARK-57103) must distinguish sub-microsecond keys; the per-group max/min
// then order by the remainder.
val schema = new StructType()
.add("k", TimestampNTZNanosType(9))
.add("v", TimestampLTZNanosType(9))
val k1 = "2020-01-01T00:00:00.000000001"
val k2 = "2020-01-01T00:00:00.000000002"
val data = Seq(
Row(LocalDateTime.parse(k1), Instant.parse("2020-01-01T10:00:00.000000111Z")),
Row(LocalDateTime.parse(k1), Instant.parse("2020-01-01T10:00:00.000000999Z")),
Row(LocalDateTime.parse(k2), Instant.parse("2020-01-01T10:00:00.000000500Z")))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
val res = df.groupBy("k").agg(max("v").as("mx"), min("v").as("mn")).orderBy("k")
checkAnswer(res, Seq(
Row(LocalDateTime.parse(k1),
Instant.parse("2020-01-01T10:00:00.000000999Z"),
Instant.parse("2020-01-01T10:00:00.000000111Z")),
Row(LocalDateTime.parse(k2),
Instant.parse("2020-01-01T10:00:00.000000500Z"),
Instant.parse("2020-01-01T10:00:00.000000500Z"))))
// The two sub-microsecond-distinct keys do not collapse into one group.
assert(res.count() === 2)
assert(res.schema("k").dataType === TimestampNTZNanosType(9))
}

test("SPARK-57103: min_by/max_by and greatest/least over same-precision nanos") {
val df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row("early", LocalDateTime.parse("2020-01-01T00:00:00.000000001")),
Row("late", LocalDateTime.parse("2020-01-01T00:00:00.000000999")))),
new StructType().add("label", StringType).add("ts", TimestampNTZNanosType(9)))
checkAnswer(df.selectExpr("max_by(label, ts)", "min_by(label, ts)"), Row("late", "early"))

val df2 = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(
LocalDateTime.parse("2020-01-01T00:00:00.000000001"),
LocalDateTime.parse("2020-01-01T00:00:00.000000999")))),
new StructType()
.add("a", TimestampNTZNanosType(9))
.add("b", TimestampNTZNanosType(9)))
checkAnswer(df2.selectExpr("greatest(a, b)", "least(a, b)"),
Row(LocalDateTime.parse("2020-01-01T00:00:00.000000999"),
LocalDateTime.parse("2020-01-01T00:00:00.000000001")))
}

test("SPARK-57103: max/min over nanos agree with the micros path when sub-micro digits are 0") {
Seq(7, 8, 9).foreach { p =>
val ldts = Seq(
"2020-01-01T00:00:01.100000000",
"2020-01-01T00:00:02.200000000",
"2020-01-01T00:00:00.300000000")
val nanos = spark.createDataFrame(
spark.sparkContext.parallelize(ldts.map(s => Row(LocalDateTime.parse(s)))),
new StructType().add("c", TimestampNTZNanosType(p)))
val micro = spark.createDataFrame(
spark.sparkContext.parallelize(ldts.map(s => Row(LocalDateTime.parse(s)))),
new StructType().add("c", TimestampNTZType))
// Compare via the string rendering so the differing result types (nanos vs micros) do not
// matter; the sub-microsecond digits are all zero, so the values agree.
checkAnswer(
nanos.selectExpr("cast(max(c) as string)", "cast(min(c) as string)"),
micro.selectExpr("cast(max(c) as string)", "cast(min(c) as string)"))
}
}

test("SPARK-57528: unix_timestamp / to_unix_timestamp over nanosecond-precision timestamps") {
// unix_timestamp returns whole-second BIGINT and applies no zone shift to a timestamp
// argument, so the sub-second digits are dropped and the nanos result equals the
Expand Down