Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,37 @@ trait SparkDateTimeUtils {
microsToInstant(v.epochMicros).plusNanos(v.nanosWithinMicro.toLong)
}

/**
* Converts a `TIMESTAMP_LTZ(p)` nanosecond value into the `TIMESTAMP_NTZ(precision)` wall-clock
* value observed in the time zone `zoneId`. The LTZ value denotes an absolute instant;
* rendering it as a local date-time at `zoneId` yields the NTZ representation. Time-zone
* offsets shift only whole seconds, so the sub-microsecond `nanosWithinMicro` component is
* preserved before being floored to the target `precision` (same flooring as same-family
* narrowing casts).
*/
def timestampLTZNanosToNTZNanos(
v: TimestampNanosVal,
zoneId: ZoneId,
precision: Int): TimestampNanosVal = {
val localDateTime = timestampNanosToInstant(v).atZone(zoneId).toLocalDateTime
localDateTimeToTimestampNanos(localDateTime, precision)
}

/**
* Converts a `TIMESTAMP_NTZ(q)` nanosecond value into the `TIMESTAMP_LTZ(precision)` instant
* obtained by interpreting its wall-clock local date-time in the time zone `zoneId`. This is
* the reverse of [[timestampLTZNanosToNTZNanos]]; the sub-microsecond `nanosWithinMicro`
* component is preserved across the (whole-second) offset shift before being floored to the
* target `precision`.
*/
def timestampNTZNanosToLTZNanos(
v: TimestampNanosVal,
zoneId: ZoneId,
precision: Int): TimestampNanosVal = {
val instant = timestampNanosToLocalDateTime(v).atZone(zoneId).toInstant
instantToTimestampNanos(instant, precision)
}

/**
* Converts the local date to the number of days since 1970-01-01.
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,23 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkNullCast(DateType, TimestampLTZNanosType(p))
checkNullCast(TimestampLTZNanosType(p), DateType)
}
// Same-family cross-precision nanos casts.
// Same-family and cross-family cross-precision nanos casts.
for {
p1 <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION
p2 <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION
} {
checkNullCast(TimestampNTZNanosType(p1), TimestampNTZNanosType(p2))
checkNullCast(TimestampLTZNanosType(p1), TimestampLTZNanosType(p2))
checkNullCast(TimestampLTZNanosType(p1), TimestampNTZNanosType(p2))
checkNullCast(TimestampNTZNanosType(p1), TimestampLTZNanosType(p2))
}
// Cross-family casts that involve the precision-6 micro family member (TIMESTAMP_LTZ(6) =
// TIMESTAMP and TIMESTAMP_NTZ(6) = TIMESTAMP_NTZ) and the other family's nanosecond member.
foreachNanosPrecision { p =>
checkNullCast(TimestampType, TimestampNTZNanosType(p))
checkNullCast(TimestampNTZNanosType(p), TimestampType)
checkNullCast(TimestampNTZType, TimestampLTZNanosType(p))
checkNullCast(TimestampLTZNanosType(p), TimestampNTZType)
}
checkNullCast(StringType, BinaryType)
checkNullCast(StringType, BooleanType)
Expand Down Expand Up @@ -791,6 +801,61 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("cross-family nanos cast: admissibility, store-assignment and up-cast contract") {
for {
p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION
q <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION
} {
val ltz = TimestampLTZNanosType(p)
val ntz = TimestampNTZNanosType(q)
// Explicit casts are allowed in both directions, both in ANSI and non-ANSI modes.
assert(Cast.canCast(ltz, ntz))
assert(Cast.canCast(ntz, ltz))
assert(Cast.canAnsiCast(ltz, ntz))
assert(Cast.canAnsiCast(ntz, ltz))
// The cross-family reinterpretation against the session zone is never a safe up-cast.
assert(!Cast.canUpCast(ltz, ntz))
assert(!Cast.canUpCast(ntz, ltz))
// They stay explicit-only: never silent store assignments (mirroring the other nanos casts).
assert(!Cast.canANSIStoreAssign(ltz, ntz))
assert(!Cast.canANSIStoreAssign(ntz, ltz))
// The conversion depends on the session time zone in both directions.
assert(Cast.needsTimeZone(ltz, ntz))
assert(Cast.needsTimeZone(ntz, ltz))
// Reinterpreting a non-null instant/local date-time never yields null, so the cast is
// null-safe in both directions (mirroring the micro TIMESTAMP <-> TIMESTAMP_NTZ pair).
assert(!Cast.forceNullable(ltz, ntz))
assert(!Cast.forceNullable(ntz, ltz))
}
}

test("cross-family nanos cast: micro boundary (precision 6) admissibility and store contract") {
// TIMESTAMP_LTZ(6) = TIMESTAMP and TIMESTAMP_NTZ(6) = TIMESTAMP_NTZ, so the precision-6
// cross-family casts are the mixed micro/nanos pairs covered here.
foreachNanosPrecision { p =>
val pairs = Seq(
(TimestampType: DataType, TimestampNTZNanosType(p): DataType), // LTZ(6) -> NTZ(p)
(TimestampNTZNanosType(p): DataType, TimestampType: DataType), // NTZ(p) -> LTZ(6)
(TimestampNTZType: DataType, TimestampLTZNanosType(p): DataType),// NTZ(6) -> LTZ(p)
(TimestampLTZNanosType(p): DataType, TimestampNTZType: DataType))// LTZ(p) -> NTZ(6)
pairs.foreach { case (from, to) =>
// Explicit casts are allowed (ANSI and non-ANSI), but are never safe up-casts and never
// silent store assignments, and they depend on the session time zone.
assert(Cast.canCast(from, to))
assert(Cast.canAnsiCast(from, to))
assert(!Cast.canUpCast(from, to))
assert(!Cast.canANSIStoreAssign(from, to))
assert(Cast.needsTimeZone(from, to))
// Null-safe like the micro TIMESTAMP <-> TIMESTAMP_NTZ pair.
assert(!Cast.forceNullable(from, to))
}
}
// Sanity: the all-micro TIMESTAMP <-> TIMESTAMP_NTZ pair (precision 6 <-> 6) stays a silent
// store assignment, unlike the mixed micro/nanos pairs above.
assert(Cast.canANSIStoreAssign(TimestampType, TimestampNTZType))
assert(Cast.canANSIStoreAssign(TimestampNTZType, TimestampType))
}

test("SPARK-40389: canUpCast: return false if casting decimal to integral types can cause" +
" overflow") {
Seq(ByteType, ShortType, IntegerType, LongType).foreach { integralType =>
Expand Down Expand Up @@ -1398,6 +1463,186 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("cross-family nanos cast: timestamp_ltz to timestamp_ntz") {
// LTZ(p) denotes an absolute instant; LTZ(p) -> NTZ(q) renders it as the wall-clock local
// date-time observed in the session zone (mirroring the micro TIMESTAMP -> TIMESTAMP_NTZ
// conversion on the epoch-micros part) and re-floors the sub-microsecond digits to q.
Seq(UTC, LA).foreach { zone =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LA cases are at non-transition instants. Since the NTZ→LTZ path resolves DST via LocalDateTime.atZone(zone) (same resolver as micro convertTz), consider adding a gap case (2020-03-08 02:30 America/Los_Angeles, a non-existent local time → java.time shifts forward) and a fall-back overlap (2020-11-01 01:30 → earlier offset) for NTZ↔LTZ, asserting the result matches the corresponding micro TIMESTAMP_NTZ↔TIMESTAMP cast. Optionally also assert(!Cast.forceNullable(from, to)) in the contract test, mirroring the null-safe micro pair.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both addressed in 1d2b271.

Added a dedicated test cross-family nanos cast: DST gap and overlap resolve like the micro cast covering the LA spring-forward gap (2020-03-08 02:30 -> shifted forward) and fall-back overlap (2020-11-01 01:30 -> earlier offset). For each p, q in [7, 9] it asserts the nanos NTZ(q) -> LTZ(p) and LTZ(p) -> NTZ(q) results match the production micro TIMESTAMP_NTZ <-> TIMESTAMP cast on the epoch-micros part (the expected micro values are obtained via evaluateWithoutCodegen, so the parity is against the real cast rather than a re-derivation).

Also added assert(!Cast.forceNullable(from, to)) to both cross-family contract tests (the nanos<->nanos one and the precision-6 micro-boundary one), mirroring the null-safe micro pair.

val zid = Option(zone.getId)
val instants = Seq(
timestampLTZ(2020, 1, 1, 12, 30, 15, 123456789),
// Pre-epoch: epochMicros is negative, the sub-microsecond part stays in [0, 999].
timestampLTZ(1969, 12, 31, 23, 59, 59, 999999789))
instants.foreach { instant =>
val srcMicros = instantToNanosVal(instant).epochMicros
val ntzMicros = convertTz(srcMicros, UTC, zone)
for {
p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION
q <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION
} {
// The source already floors to p; the cross-family cast then floors to q, i.e. to
// min(p, q). Only the wall-clock epoch-micros part shifts with the zone.
val srcNanos = floorNanosToPrecision(789, p)
val expectedNanos = floorNanosToPrecision(srcNanos, q)
checkEvaluation(
cast(Literal.create(nanosVal(srcMicros, srcNanos), TimestampLTZNanosType(p)),
TimestampNTZNanosType(q), zid),
nanosVal(ntzMicros, expectedNanos))
checkEvaluation(
cast(Literal.create(null, TimestampLTZNanosType(p)), TimestampNTZNanosType(q), zid),
null)
}
}
// Zone sensitivity (exercises needsTimeZone): in LA the wall-clock micros differ from UTC.
if (zone == LA) {
assert(convertTz(instantToNanosVal(instants.head).epochMicros, UTC, zone) !=
instantToNanosVal(instants.head).epochMicros)
}
}
}

test("cross-family nanos cast: timestamp_ntz to timestamp_ltz") {
// NTZ(q) denotes a wall-clock local date-time; NTZ(q) -> LTZ(p) interprets it in the session
// zone to obtain the absolute instant (mirroring micro TIMESTAMP_NTZ -> TIMESTAMP on the
// epoch-micros part) and re-floors the sub-microsecond digits to p.
Seq(UTC, LA).foreach { zone =>
val zid = Option(zone.getId)
val localDateTimes = Seq(
LocalDateTime.of(2020, 1, 1, 12, 30, 15, 123456789),
LocalDateTime.of(1969, 12, 31, 23, 59, 59, 999999789))
localDateTimes.foreach { localDateTime =>
val srcMicros = localDateTimeToNanosVal(localDateTime).epochMicros
val ltzMicros = convertTz(srcMicros, zone, UTC)
for {
p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION
q <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION
} {
val srcNanos = floorNanosToPrecision(789, q)
val expectedNanos = floorNanosToPrecision(srcNanos, p)
checkEvaluation(
cast(Literal.create(nanosVal(srcMicros, srcNanos), TimestampNTZNanosType(q)),
TimestampLTZNanosType(p), zid),
nanosVal(ltzMicros, expectedNanos))
checkEvaluation(
cast(Literal.create(null, TimestampNTZNanosType(q)), TimestampLTZNanosType(p), zid),
null)
}
}
if (zone == LA) {
assert(convertTz(localDateTimeToNanosVal(localDateTimes.head).epochMicros, zone, UTC) !=
localDateTimeToNanosVal(localDateTimes.head).epochMicros)
}
}
}

test("cross-family nanos cast: round-trip ltz -> ntz -> ltz preserves the instant") {
// Round-tripping through the wall-clock NTZ form and back to LTZ in the same zone is the
// identity at equal precision (no sub-microsecond digits are dropped).
Seq(UTC, LA).foreach { zone =>
val zid = Option(zone.getId)
foreachNanosPrecision { p =>
val src = nanosVal(
instantToNanosVal(timestampLTZ(2020, 7, 1, 6, 15, 30, 123456789)).epochMicros,
floorNanosToPrecision(789, p))
checkEvaluation(
cast(
cast(Literal.create(src, TimestampLTZNanosType(p)), TimestampNTZNanosType(p), zid),
TimestampLTZNanosType(p), zid),
src)
}
}
}

test("cross-family nanos cast: DST gap and overlap resolve like the micro cast") {
// The cross-family nanos casts reuse the same java.time resolver as the micro
// TIMESTAMP <-> TIMESTAMP_NTZ casts (NTZ -> LTZ interprets the wall clock via
// LocalDateTime.atZone; LTZ -> NTZ renders the instant via Instant.atZone). Exercise the LA
// spring-forward gap (02:30 does not exist -> java.time shifts forward) and the fall-back
// overlap (01:30 occurs twice -> earlier offset), asserting the nanos result matches the
// corresponding micro cast on the epoch-micros part. The existing zone tests above use
// non-transition instants, so this is the only place DST resolution is observable.
val la = Option(LA.getId)
Seq(
LocalDateTime.of(2020, 3, 8, 2, 30, 0, 123456789), // inside the PST -> PDT gap
LocalDateTime.of(2020, 11, 1, 1, 30, 0, 123456789)) // inside the PDT -> PST overlap
.foreach { ldt =>
// NTZ stores the wall-clock fields as epoch-micros-at-UTC, exactly like micro TIMESTAMP_NTZ.
val ntzMicros = localDateTimeToNanosVal(ldt).epochMicros
// The expected LTZ instant is whatever the micro TIMESTAMP_NTZ -> TIMESTAMP cast produces;
// for the gap this is the forward-shifted instant, for the overlap the earlier offset.
val microLtz = evaluateWithoutCodegen(
cast(Literal.create(ntzMicros, TimestampNTZType), TimestampType, la)).asInstanceOf[Long]
// The reverse micro TIMESTAMP -> TIMESTAMP_NTZ of that instant (for the gap, the wall clock
// is the shifted 03:30, not the original 02:30).
val microNtz = evaluateWithoutCodegen(
cast(Literal.create(microLtz, TimestampType), TimestampNTZType, la)).asInstanceOf[Long]
for {
p <- TimestampLTZNanosType.MIN_PRECISION to TimestampLTZNanosType.MAX_PRECISION
q <- TimestampNTZNanosType.MIN_PRECISION to TimestampNTZNanosType.MAX_PRECISION
} {
// NTZ(q) -> LTZ(p): the epoch-micros part resolves DST exactly like the micro cast.
val ntzToLtzSrc = floorNanosToPrecision(789, q)
checkEvaluation(
cast(Literal.create(nanosVal(ntzMicros, ntzToLtzSrc), TimestampNTZNanosType(q)),
TimestampLTZNanosType(p), la),
nanosVal(microLtz, floorNanosToPrecision(ntzToLtzSrc, p)))
// LTZ(p) -> NTZ(q): rendering the resolved instant matches the micro cast too.
val ltzToNtzSrc = floorNanosToPrecision(789, p)
checkEvaluation(
cast(Literal.create(nanosVal(microLtz, ltzToNtzSrc), TimestampLTZNanosType(p)),
TimestampNTZNanosType(q), la),
nanosVal(microNtz, floorNanosToPrecision(ltzToNtzSrc, q)))
}
}
}

test("cross-family nanos cast: micro family member (precision 6) to/from nanos") {
// The precision-6 micro family members are TIMESTAMP_LTZ(6) = TIMESTAMP and
// TIMESTAMP_NTZ(6) = TIMESTAMP_NTZ. Casting them across families reinterprets the value against
// the session zone (mirroring the all-micro TIMESTAMP <-> TIMESTAMP_NTZ conversion); micro
// targets carry a zero sub-microsecond part and micro sources contribute none.
Seq(UTC, LA).foreach { zone =>
val zid = Option(zone.getId)
// Micro LTZ instant (sub-micro digits are not representable in the micro type).
val ltzMicros = instantToMicros(timestampLTZ(2020, 1, 1, 12, 30, 15, 123456000))
val ntzMicrosForLtz = convertTz(ltzMicros, UTC, zone)
// Micro NTZ wall-clock value.
val ntzMicros =
localDateTimeToNanosVal(LocalDateTime.of(2020, 1, 1, 12, 30, 15, 123456000)).epochMicros
val ltzMicrosForNtz = convertTz(ntzMicros, zone, UTC)
foreachNanosPrecision { p =>
// TIMESTAMP (LTZ(6)) -> TIMESTAMP_NTZ(p): wall clock in the session zone, sub-micro = 0.
checkEvaluation(
cast(Literal.create(ltzMicros, TimestampType), TimestampNTZNanosType(p), zid),
nanosVal(ntzMicrosForLtz, 0))
// TIMESTAMP_NTZ (NTZ(6)) -> TIMESTAMP_LTZ(p): instant of the wall clock, sub-micro = 0.
checkEvaluation(
cast(Literal.create(ntzMicros, TimestampNTZType), TimestampLTZNanosType(p), zid),
nanosVal(ltzMicrosForNtz, 0))
// TIMESTAMP_NTZ(p) -> TIMESTAMP (LTZ(6)): drops the sub-microsecond digits before the zone
// reinterpretation. A non-zero nanosWithinMicro on the source proves the truncation.
checkEvaluation(
cast(Literal.create(nanosVal(ntzMicros, floorNanosToPrecision(789, p)),
TimestampNTZNanosType(p)), TimestampType, zid),
convertTz(ntzMicros, zone, UTC))
// TIMESTAMP_LTZ(p) -> TIMESTAMP_NTZ (NTZ(6)): drops the sub-microsecond digits likewise.
checkEvaluation(
cast(Literal.create(nanosVal(ltzMicros, floorNanosToPrecision(789, p)),
TimestampLTZNanosType(p)), TimestampNTZType, zid),
convertTz(ltzMicros, UTC, zone))
// Null input in all four directions.
checkEvaluation(
cast(Literal.create(null, TimestampType), TimestampNTZNanosType(p), zid), null)
checkEvaluation(
cast(Literal.create(null, TimestampNTZType), TimestampLTZNanosType(p), zid), null)
checkEvaluation(
cast(Literal.create(null, TimestampNTZNanosType(p)), TimestampType, zid), null)
checkEvaluation(
cast(Literal.create(null, TimestampLTZNanosType(p)), TimestampNTZType, zid), null)
}
}
}

test("SPARK-57323: cast between date and timestamp_ntz with nanosecond precision") {
// NTZ casts use a fixed UTC wall-clock grid, independent of the session time zone.
val date = LocalDate.of(2020, 1, 1)
Expand Down
Loading