From 98e7de507ea17c8af8deb7a46e0e32d0168e798e Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Tue, 16 Jun 2026 07:34:15 +0200 Subject: [PATCH] feat(reader): lazy DateTimeParts reassembly via LazyDateTimePartsLongArray MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pre-existing DateTimePartsEncodingDecoder returned a generic GenericArray wrapping the three children (days, seconds, subseconds) but no consumer in the extension-decode path (ExtensionStorage, TimestampExtensionDecoder, DateExtensionDecoder) knew how to reassemble that shape back into the epoch count their accessors expect. The path was effectively dead at scan time — the encoder tests round-tripped the children individually but never reconstructed an epoch. Add LazyDateTimePartsLongArray (record, implements LongArray) that holds the three children plus the precomputed unitsPerDay / unitsPerSecond multipliers and reassembles on demand: getLong(i) = days[i] * unitsPerDay + seconds[i] * unitsPerSecond + subseconds[i] DateTimePartsArrays (package-private) centralises the per-row read so each child can use whichever signed-integer ptype the encoder picked (Byte / Short / Int / Long Array, optionally wrapped in MaskedArray). DateTimePartsEncodingDecoder parses the parent Extension dtype's TimeUnit metadata byte, computes unitsPerSecond = TimeUnit.divisor() (falling back to 1 for the Days unit, whose seconds and subseconds children are zero) and unitsPerDay = 86_400 × unitsPerSecond, then constructs the lazy record. No buffer allocation, no per-row copy. Now the extension-decode pipeline composes correctly: scanning a vortex.datetimeparts-encoded column under a vortex.timestamp extension produces a LongArray of reassembled epoch counts, which feeds into TimestampExtensionDecoder.instant exactly like a Materialized child. Updated DateTimePartsEncodingEncoderTest to assert the reassembled epoch value instead of the (now hidden) per-child structure — the behaviour the encoder is actually guaranteeing. 3 new unit tests in LazyDateTimePartsLongArrayTest cover the millisecond reassembly, widening from narrower child ptypes, and the fold reduction. ./mvnw verify green (13 modules, integration suite 40s). Co-Authored-By: Claude Opus 4.7 --- .../reader/array/DateTimePartsArrays.java | 40 ++++++++ .../array/LazyDateTimePartsLongArray.java | 67 +++++++++++++ .../decode/DateTimePartsEncodingDecoder.java | 37 +++++++- .../array/LazyDateTimePartsLongArrayTest.java | 95 +++++++++++++++++++ .../DateTimePartsEncodingEncoderTest.java | 56 +++-------- 5 files changed, 250 insertions(+), 45 deletions(-) create mode 100644 reader/src/main/java/io/github/dfa1/vortex/reader/array/DateTimePartsArrays.java create mode 100644 reader/src/main/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArray.java create mode 100644 reader/src/test/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArrayTest.java diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/array/DateTimePartsArrays.java b/reader/src/main/java/io/github/dfa1/vortex/reader/array/DateTimePartsArrays.java new file mode 100644 index 00000000..d48380f6 --- /dev/null +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/array/DateTimePartsArrays.java @@ -0,0 +1,40 @@ +package io.github.dfa1.vortex.reader.array; + +import io.github.dfa1.vortex.core.VortexException; + +/// Package-private helper for the {@link LazyDateTimePartsLongArray} record. +/// +/// {@code days}, {@code seconds} and {@code subseconds} children can each be one of +/// the four signed-integer typed array interfaces; the writer picks the narrowest +/// ptype that fits. {@link #readLong(Array, long)} centralises the per-row read so +/// the record itself stays compact. +final class DateTimePartsArrays { + + private DateTimePartsArrays() { + } + + /// Reads {@code arr[i]} as a signed long. Recurses through {@link MaskedArray}; + /// throws on null cells so callers don't silently get garbage for nullable + /// columns. + /// + /// @param arr source typed Array + /// @param i row index + /// @return cell value as long + /// @throws VortexException for null cells or unsupported array types + static long readLong(Array arr, long i) { + return switch (arr) { + case ByteArray a -> a.getByte(i); + case ShortArray a -> a.getShort(i); + case IntArray a -> a.getInt(i); + case LongArray a -> a.getLong(i); + case MaskedArray a -> { + if (!a.isValid(i)) { + throw new VortexException("DateTimeParts: null cell at index " + i); + } + yield readLong(a.inner(), i); + } + default -> throw new VortexException( + "DateTimeParts: unsupported child array type: " + arr.getClass().getSimpleName()); + }; + } +} diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArray.java b/reader/src/main/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArray.java new file mode 100644 index 00000000..afca7850 --- /dev/null +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArray.java @@ -0,0 +1,67 @@ +package io.github.dfa1.vortex.reader.array; + +import io.github.dfa1.vortex.core.DType; + +import java.util.function.LongBinaryOperator; +import java.util.function.LongConsumer; + +/// Lazy {@code vortex.datetimeparts} reassembly as a {@link LongArray}. +/// +/// The encoding splits each raw epoch count into three children — {@code days}, +/// {@code seconds} (within the day) and {@code subseconds} (within the second). +/// Reconstruction is +/// +/// ``` +/// raw = days * unitsPerDay + seconds * unitsPerSecond + subseconds +/// ``` +/// +/// where {@code unitsPerSecond} = `TimeUnit.divisor()` and +/// {@code unitsPerDay} = `86_400 * unitsPerSecond`. The reassembled long carries the +/// same epoch count the downstream extension decoder +/// ({@code TimestampExtensionDecoder}, {@code DateExtensionDecoder}, etc.) expects; +/// no buffer materialisation occurs at construction time. +/// +/// The record's {@link #dtype()} is the parent Extension dtype (e.g. +/// {@code vortex.timestamp}) so it slots transparently into the extension-decode +/// pipeline. Children may be any signed integer typed Array +/// ({@link ByteArray}/{@link ShortArray}/{@link IntArray}/{@link LongArray}); the +/// per-row {@link DateTimePartsArrays#readLong} switch handles widening. +/// +/// @param dtype logical element type (typically a {@code DType.Extension}) +/// @param length total logical row count +/// @param daysArr per-row signed days +/// @param secondsArr per-row signed seconds within the day +/// @param subsecondsArr per-row signed sub-second count +/// @param unitsPerDay multiplier for the days component (= 86_400 × unitsPerSecond) +/// @param unitsPerSecond multiplier for the seconds component (= unit divisor) +public record LazyDateTimePartsLongArray( + DType dtype, long length, + Array daysArr, Array secondsArr, Array subsecondsArr, + long unitsPerDay, long unitsPerSecond) + implements LongArray { + + @Override + public long getLong(long i) { + return DateTimePartsArrays.readLong(daysArr, i) * unitsPerDay + + DateTimePartsArrays.readLong(secondsArr, i) * unitsPerSecond + + DateTimePartsArrays.readLong(subsecondsArr, i); + } + + @Override + public void forEachLong(LongConsumer c) { + long n = length; + for (long i = 0; i < n; i++) { + c.accept(getLong(i)); + } + } + + @Override + public long fold(long identity, LongBinaryOperator op) { + long acc = identity; + long n = length; + for (long i = 0; i < n; i++) { + acc = op.applyAsLong(acc, getLong(i)); + } + return acc; + } +} diff --git a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/DateTimePartsEncodingDecoder.java b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/DateTimePartsEncodingDecoder.java index 92337462..ad6ee3c4 100644 --- a/reader/src/main/java/io/github/dfa1/vortex/reader/decode/DateTimePartsEncodingDecoder.java +++ b/reader/src/main/java/io/github/dfa1/vortex/reader/decode/DateTimePartsEncodingDecoder.java @@ -3,18 +3,27 @@ import io.github.dfa1.vortex.core.DType; import io.github.dfa1.vortex.core.PType; import io.github.dfa1.vortex.core.VortexException; -import io.github.dfa1.vortex.reader.array.Array; -import io.github.dfa1.vortex.reader.array.GenericArray; import io.github.dfa1.vortex.encoding.EncodingId; +import io.github.dfa1.vortex.encoding.TimeUnit; import io.github.dfa1.vortex.proto.DateTimePartsMetadata; +import io.github.dfa1.vortex.reader.array.Array; +import io.github.dfa1.vortex.reader.array.LazyDateTimePartsLongArray; import java.io.IOException; import java.lang.foreign.MemorySegment; import java.nio.ByteBuffer; /// Read-only decoder for {@code vortex.datetimeparts}. +/// +/// Reassembles the three children (days, seconds, subseconds) into a +/// {@link LazyDateTimePartsLongArray} of epoch counts in the extension's +/// {@link TimeUnit}. No per-row materialisation happens at decode time — +/// the downstream extension decoder reads the reassembled long via the +/// lazy {@code getLong} accessor. public final class DateTimePartsEncodingDecoder implements EncodingDecoder { + private static final long SECONDS_PER_DAY = 86_400L; + /// Public no-arg constructor required by {@link java.util.ServiceLoader}. public DateTimePartsEncodingDecoder() { } @@ -52,7 +61,27 @@ public Array decode(DecodeContext ctx) { Array seconds = ctx.decodeChild(1, new DType.Primitive(secondsPtype, false), ctx.rowCount()); Array subseconds = ctx.decodeChild(2, new DType.Primitive(subsecondsPtype, false), ctx.rowCount()); - return new GenericArray(ctx.dtype(), ctx.rowCount(), new MemorySegment[0], - new Array[]{days, seconds, subseconds}); + if (!(ctx.dtype() instanceof DType.Extension ext)) { + throw new VortexException(EncodingId.VORTEX_DATETIMEPARTS, + "expected Extension dtype, got " + ctx.dtype()); + } + long unitsPerSecond = readUnitsPerSecond(ext); + long unitsPerDay = SECONDS_PER_DAY * unitsPerSecond; + + return new LazyDateTimePartsLongArray(ctx.dtype(), ctx.rowCount(), + days, seconds, subseconds, unitsPerDay, unitsPerSecond); + } + + /// Returns {@code TimeUnit.divisor()} for the extension's declared time unit, or + /// {@code 1} when the unit is {@link TimeUnit#Days} (days carry no sub-second + /// component; seconds and subseconds children are expected to be zero). + private static long readUnitsPerSecond(DType.Extension ext) { + ByteBuffer extMeta = ext.metadata(); + if (extMeta == null || !extMeta.hasRemaining()) { + throw new VortexException(EncodingId.VORTEX_DATETIMEPARTS, + "extension " + ext.extensionId() + " missing TimeUnit metadata byte"); + } + TimeUnit unit = TimeUnit.fromTag(extMeta.get(extMeta.position())); + return unit == TimeUnit.Days ? 1L : unit.divisor(); } } diff --git a/reader/src/test/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArrayTest.java b/reader/src/test/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArrayTest.java new file mode 100644 index 00000000..6b851812 --- /dev/null +++ b/reader/src/test/java/io/github/dfa1/vortex/reader/array/LazyDateTimePartsLongArrayTest.java @@ -0,0 +1,95 @@ +package io.github.dfa1.vortex.reader.array; + +import io.github.dfa1.vortex.core.DType; +import io.github.dfa1.vortex.core.PType; +import org.junit.jupiter.api.Test; + +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; + +import static org.assertj.core.api.Assertions.assertThat; + +/// Unit tests for {@link LazyDateTimePartsLongArray}. Verifies the +/// {@code days * unitsPerDay + seconds * unitsPerSecond + subseconds} +/// reassembly across the supported time units, and the widening read path +/// that lets each child use whatever signed-integer ptype the encoder picked. +class LazyDateTimePartsLongArrayTest { + + private static final DType I64 = new DType.Primitive(PType.I64, false); + private static final DType I32 = new DType.Primitive(PType.I32, false); + // Decoder constructs records carrying the parent Extension dtype; use I64 here + // as a stand-in since the record never inspects dtype semantics directly. + + @Test + void millisecondsReassembly() { + // Given 2 rows of arbitrary (days, seconds_in_day, subseconds) for ms unit. + // unitsPerSecond = 1_000; unitsPerDay = 86_400_000. + // Row 0: days=20_000 -> 2024-12-31 area, seconds=12345, subseconds=678 -> 1_728_012_345_678 + try (Arena arena = Arena.ofConfined()) { + LongArray days = longArray(arena, 20_000L, 0L); + LongArray seconds = longArray(arena, 12_345L, 0L); + LongArray subseconds = longArray(arena, 678L, 0L); + long unitsPerSecond = 1_000L; + long unitsPerDay = 86_400L * unitsPerSecond; + + var sut = new LazyDateTimePartsLongArray(I64, 2, + days, seconds, subseconds, unitsPerDay, unitsPerSecond); + + assertThat(sut.getLong(0)).isEqualTo( + 20_000L * unitsPerDay + 12_345L * unitsPerSecond + 678L); + assertThat(sut.getLong(1)).isZero(); + } + } + + @Test + void widensFromNarrowerChildPtypes() { + // Days as I32, seconds as I32, subseconds as I64 — encoder is free to pick. + try (Arena arena = Arena.ofConfined()) { + IntArray days = intArray(arena, 1); + IntArray seconds = intArray(arena, 2); + LongArray subseconds = longArray(arena, 3L); + long ups = 1_000_000_000L; // nanos + long upd = 86_400L * ups; + + var sut = new LazyDateTimePartsLongArray(I64, 1, + days, seconds, subseconds, upd, ups); + + assertThat(sut.getLong(0)).isEqualTo(1L * upd + 2L * ups + 3L); + } + } + + @Test + void foldSumsAllRows() { + try (Arena arena = Arena.ofConfined()) { + LongArray days = longArray(arena, 1L, 2L, 3L); + LongArray seconds = longArray(arena, 0L, 0L, 0L); + LongArray subseconds = longArray(arena, 0L, 0L, 0L); + long ups = 1L; + long upd = 86_400L; + + var sut = new LazyDateTimePartsLongArray(I64, 3, + days, seconds, subseconds, upd, ups); + + long sum = sut.fold(0L, Long::sum); + // 1*86400 + 2*86400 + 3*86400 = 6*86400 + assertThat(sum).isEqualTo(6L * upd); + } + } + + private static LongArray longArray(Arena arena, long... vs) { + MemorySegment seg = arena.allocate(vs.length * 8L, 8); + for (int i = 0; i < vs.length; i++) { + seg.setAtIndex(ValueLayout.JAVA_LONG, i, vs[i]); + } + return new MaterializedLongArray(I64, vs.length, seg.asReadOnly()); + } + + private static IntArray intArray(Arena arena, int... vs) { + MemorySegment seg = arena.allocate(vs.length * 4L, 4); + for (int i = 0; i < vs.length; i++) { + seg.setAtIndex(ValueLayout.JAVA_INT, i, vs[i]); + } + return new MaterializedIntArray(I32, vs.length, seg.asReadOnly()); + } +} diff --git a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/DateTimePartsEncodingEncoderTest.java b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/DateTimePartsEncodingEncoderTest.java index 93fd0804..bb38a293 100644 --- a/writer/src/test/java/io/github/dfa1/vortex/writer/encode/DateTimePartsEncodingEncoderTest.java +++ b/writer/src/test/java/io/github/dfa1/vortex/writer/encode/DateTimePartsEncodingEncoderTest.java @@ -2,7 +2,6 @@ import io.github.dfa1.vortex.core.DType; import io.github.dfa1.vortex.core.PType; -import io.github.dfa1.vortex.reader.array.GenericArray; import io.github.dfa1.vortex.reader.array.LongArray; import io.github.dfa1.vortex.reader.decode.ArrayNode; import io.github.dfa1.vortex.encoding.DTypes; @@ -104,15 +103,10 @@ void roundTrip_milliseconds_preservesDaysSecondsSubseconds() { MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new); DecodeContext ctx = new DecodeContext( toArrayNode(result.rootNode()), EXT_TIMESTAMP_MS, 1, bufs, REGISTRY, Arena.global()); - GenericArray decoded = (GenericArray) DECODER.decode(ctx); + LongArray decoded = (LongArray) DECODER.decode(ctx); assertThat(decoded.length()).isEqualTo(1); - LongArray days = (LongArray) decoded.child(0); - LongArray seconds = (LongArray) decoded.child(1); - LongArray subseconds = (LongArray) decoded.child(2); - assertThat(days.getLong(0)).isEqualTo(1L); - assertThat(seconds.getLong(0)).isEqualTo(3723L); - assertThat(subseconds.getLong(0)).isEqualTo(456L); + assertThat(decoded.getLong(0)).isEqualTo(ts); } @Test @@ -126,14 +120,9 @@ void roundTrip_nanoseconds_preservesSubsecondPrecision() { MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new); DecodeContext ctx = new DecodeContext( toArrayNode(result.rootNode()), EXT_TIMESTAMP_NS, 1, bufs, REGISTRY, Arena.global()); - GenericArray decoded = (GenericArray) DECODER.decode(ctx); - - LongArray days = (LongArray) decoded.child(0); - LongArray seconds = (LongArray) decoded.child(1); - LongArray subseconds = (LongArray) decoded.child(2); - assertThat(days.getLong(0)).isEqualTo(1L); - assertThat(seconds.getLong(0)).isEqualTo(3723L); - assertThat(subseconds.getLong(0)).isEqualTo(456_789_123L); + LongArray decoded = (LongArray) DECODER.decode(ctx); + + assertThat(decoded.getLong(0)).isEqualTo(ts); } @Test @@ -145,14 +134,9 @@ void roundTrip_epoch_allZero() { MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new); DecodeContext ctx = new DecodeContext( toArrayNode(result.rootNode()), EXT_TIMESTAMP_MS, 1, bufs, REGISTRY, Arena.global()); - GenericArray decoded = (GenericArray) DECODER.decode(ctx); - - LongArray days = (LongArray) decoded.child(0); - LongArray seconds = (LongArray) decoded.child(1); - LongArray subseconds = (LongArray) decoded.child(2); - assertThat(days.getLong(0)).isEqualTo(0L); - assertThat(seconds.getLong(0)).isEqualTo(0L); - assertThat(subseconds.getLong(0)).isEqualTo(0L); + LongArray decoded = (LongArray) DECODER.decode(ctx); + + assertThat(decoded.getLong(0)).isZero(); } @Test @@ -165,17 +149,12 @@ void roundTrip_multipleTimestamps_allRowsPreserved() { MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new); DecodeContext ctx = new DecodeContext( toArrayNode(result.rootNode()), EXT_TIMESTAMP_MS, 4, bufs, REGISTRY, Arena.global()); - GenericArray decoded = (GenericArray) DECODER.decode(ctx); + LongArray decoded = (LongArray) DECODER.decode(ctx); assertThat(decoded.length()).isEqualTo(4); - LongArray days = (LongArray) decoded.child(0); - assertThat(days.getLong(0)).isEqualTo(0L); - assertThat(days.getLong(1)).isEqualTo(1L); - assertThat(days.getLong(2)).isEqualTo(1L); - assertThat(days.getLong(3)).isEqualTo(1L); - LongArray subseconds = (LongArray) decoded.child(2); - assertThat(subseconds.getLong(2)).isEqualTo(0L); - assertThat(subseconds.getLong(3)).isEqualTo(1L); + for (int i = 0; i < timestamps.length; i++) { + assertThat(decoded.getLong(i)).as("row %d", i).isEqualTo(timestamps[i]); + } } @ParameterizedTest @@ -189,14 +168,9 @@ void roundTrip_allUnits_epochIsZero(TimeUnit unit) { MemorySegment[] bufs = result.buffers().toArray(MemorySegment[]::new); DecodeContext ctx = new DecodeContext( toArrayNode(result.rootNode()), dtype, 1, bufs, REGISTRY, Arena.global()); - GenericArray decoded = (GenericArray) DECODER.decode(ctx); - - LongArray days = (LongArray) decoded.child(0); - LongArray seconds = (LongArray) decoded.child(1); - LongArray subseconds = (LongArray) decoded.child(2); - assertThat(days.getLong(0)).isZero(); - assertThat(seconds.getLong(0)).isZero(); - assertThat(subseconds.getLong(0)).isZero(); + LongArray decoded = (LongArray) DECODER.decode(ctx); + + assertThat(decoded.getLong(0)).isZero(); } @Test