Skip to content

Streaming MEOS-parity harness and NebulaStream adapter#44

Draft
estebanzimanyi wants to merge 286 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/nebula-streaming-parity-harness
Draft

Streaming MEOS-parity harness and NebulaStream adapter#44
estebanzimanyi wants to merge 286 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/nebula-streaming-parity-harness

Conversation

@estebanzimanyi
Copy link
Copy Markdown
Member

@estebanzimanyi estebanzimanyi commented May 22, 2026

The streaming-platform counterpart of MobilityDB's cross-type parity methodology (doc/methodology/cross_type_parity.md) and audit harness (tools/parity_audit/). It measures, per streaming platform (NebulaStream, Flink, Kafka), how many streamable MEOS functions are backed by a passing test.

A function counts only at the proven layer: exported in the pinned libmeos (nm -D), wired by an operator that calls it, and exercised by a passing test. The reference surface is the streamable MEOS public API derived from meos-idl.json; io-meta, sequence-only, ambiguous, and internal functions are reason-marked non-streamable.

This branch is the integration-evidence and benchmark vehicle for the topical series #15#43: it folds them together so the systest suite and the BerlinMOD benchmark run on the combined state before the individual PRs land. It is not itself a merge target; the individual PRs are the deliverable.

@estebanzimanyi estebanzimanyi force-pushed the feat/nebula-streaming-parity-harness branch 19 times, most recently from e19e976 to 69dde26 Compare May 23, 2026 15:48
…245->249)

Add the four single-field windowed extent aggregates that fold a scalar field
directly through a MEOS extent transition fn (no trajectory string, no parse)
and serialize via the external typed span wrappers:

  - FLOAT_EXTENT      float_extent_transfn       -> FLOATSPAN  -> floatspan_out
  - INT_EXTENT        int_extent_transfn         -> INTSPAN    -> intspan_out
  - BIGINT_EXTENT     bigint_extent_transfn      -> BIGINTSPAN -> bigintspan_out
  - TIMESTAMPTZ_EXTENT timestamptz_extent_transfn -> TSTZSPAN   -> tstzspan_out

New PHYSICAL_CPP_SCALARFOLD template reuses the tnumber (value, ts) HPP / ctor /
lift / combine / reset / cleanup verbatim; only lower() differs — the Span state
threads across events as an opaque pointer (NULL initial -> span_make on first,
span_expand in place after; one allocation, freed after serialize). Logical
layer + 2-arg parser glue + optimizer lowering reused unchanged
(final_stamp_type=VARSIZED). TIMESTAMPTZ_EXTENT converts the epoch field to
TimestampTz arithmetically.

Per [internal vs external API] the typed *span_out wrappers are used, not the
Datum-generic span_out (meos_internal.h); the operators include only meos.h.

Locally compile-verified (build_local.sh, EXIT=0). A systest per operator
exercises it end-to-end (rides CI's sanitizer/leak matrix); expected span text
captured from a faithful MEOS probe — note MEOS canonicalizes integer spans to
half-open upper+1 ([9,18]->[9,19), [9e8,1.8e9]->[...,1800000001)).

Feed: float/int/bigint/timestamptz_extent_transfn now wired (249/1945).
…249->304)

Wire the position/topological predicate family between a temporal and an
STBox/TBox query literal, both argument orders:
  - 21 temporal-first  left_tspatial_stbox(temp, box), overlaps_tnumber_tbox, …
  - 34 box-first        above_stbox_tspatial(box, temp), after_tbox_tnumber, …
Operations: left/right/above/below/front/back, before/after, the over*
half-predicates, adjacent/contains/contained/overlaps/same — all bool, over
tgeompoint (tspatial) and tfloat (tnumber).

Three surgical generator changes, reusing the proven per-event box-literal
assembler (no new templates):
  - build_descriptor: map the abstract `tspatial` token -> tgeompoint builder
    (unblocks the 21 tspatial-first; tnumber-first were already wired in W26).
  - build_descriptor.temporal_x_box: accept the box-first form (box, Temporal*),
    taking the box parser from the C arg type (STBox->stbox_in / TBox->tbox_in;
    bare Span* stays suffix-resolved to avoid tstzspan/numspan ambiguity), and
    flag box_first.
  - codegen_nebula.assemble_generic_physical: when box_first, emit the literal
    before the temporal in the MEOS call.

Locally compile-verified (build_local.sh, EXIT=0, 711 targets). Call order
inspection-verified: above_stbox_tspatial(arg0B, temp) vs
above_tspatial_stbox(temp, arg0B). Parser glue reused from W26's box-literal
path. Feed: +55 (304/1945).
…>309)

FLOAT_UNION / INT_UNION / BIGINT_UNION / TIMESTAMPTZ_UNION: collect a window's
values into a deduplicated, sorted Set. Same scalar-fold mechanism as W28 but
the per-event *_union_transfn accumulates a Set state (not a Span), finalized
by set_union_finalfn into the canonical Set, serialized via the external typed
wrappers floatset_out / intset_out / bigintset_out / tstzset_out.

PHYSICAL_CPP_SETFOLD is derived from PHYSICAL_CPP_SCALARFOLD by an asserted
swap of only the serialize lambda (Set state + finalfn); the fold loop / lift /
combine / reset / cleanup stay byte-identical. Descriptor adds fold:"set" +
finalfn. TIMESTAMPTZ_UNION converts the epoch field to TimestampTz.

Locally compile-verified (build_local.sh, EXIT=0). Systest per operator with
probe-captured expected text (dedup+sort confirmed: float {12.5,18,9.25,12.5}
-> {9.25, 12.5, 18}). adapters/nebula.py token regex recognizes *_UNION.
Feed: +5 (4 union transfns + set_union_finalfn) = 309/1945.
…egates

Add a `repeated string literals` slot to SerializableAggregationFunction so an
aggregate's query-literal constants (a windowed box/span/set predicate's
threshold operand; a meeting distance; a vid pair) survive plan serialization
instead of falling back to a hard-coded default.

- grpc/SerializableVariantDescriptor.proto: literals field (tag 5).
- AggregationLogicalFunctionRegistryArguments: std::vector<std::string> literals.
- FunctionSerializationUtil: populate args.literals from the proto in the
  TemporalSequence-shaped deserialize path (every MEOS aggregate uses it).

Additive and behavior-preserving for existing field-only aggregates (no
literals). Foundation for the windowed-extent predicate family (box/span/set
op against a query literal) and a follow-up that closes the PAIR_MEETING.dMeet /
CROSS_DISTANCE (vidA,vidB) round-trip gaps. Locally compile-verified
(build_local.sh, EXIT=0; proto regenerates the literals accessors).
Establish the efficient mechanism for windowed trajectory operators: instead of
a bespoke aggregate per MEOS function (each re-materializing the trajectory),
the per-group mini-trip trajectory is a first-class VARSIZED value (hex-WKB),
and the MEOS function library composes over it as stateless scalar operators —
the trajectory analogue of "all scalar functions compose on a float in the
window". One materialization, free composition; mirrors how the Flink/Kafka
JVM facade exposes the library over MEOS values.

codegen_nebula gains a `wkb_temporal` generic input: the operand is an upstream
VARSIZED hex-WKB MEOS value parsed via temporal_from_hexwkb, not a temporal
rebuilt from per-event scalar fields. Proof operator TPOINT_LENGTH_WKB applies
tpoint_length to such a value; trajectory functions like length are meaningful
only over the full trajectory the materialization provides, not a per-event
instant.

Locally compile-verified (build_local.sh, EXIT=0). Systest feeds a known
trajectory's hex-WKB and asserts its length (probe-confirmed lossless round-trip
temporal_from_hexwkb -> tpoint_length = 0.0131538303422). Feed unchanged
(tpoint_length already wired by the W16 aggregate); this is the composable
realization + the foundation for rolling the library over WKB values, plus a
windowing aggregate that emits the trajectory WKB.
…-WKB value

The value-producing half of the compose-over-values mechanism: a windowed
aggregate that materializes the per-group mini-trip as a SEQUENCE ([...], linear
interpolation) and emits its hex-WKB. This is the trajectory value the MEOS
function library composes over — its output is exactly the input that
TPOINT_LENGTH_WKB (and any wkb_temporal-input operator) consumes.

codegen_aggregations gains return_mode "wkb": derived from the tgeo scalar
template by swapping the empty-window write, the instant-set braces for sequence
brackets, and the finalize (temporal_as_hexwkb instead of extent+serialize) —
same asserted-swap pattern as the box-output mode.

Locally compile-verified (build_local.sh, EXIT=0). Systest asserts the exact
hex-WKB of a 3-point windowed trajectory (probe-matched), which is byte-identical
to the TPOINT_LENGTH_WKB systest input -> the two operators compose end-to-end
(TRAJECTORY_WKB -> tpoint_length = 0.0131538303422). Feed unchanged (value
producer via io-meta temporal_as_hexwkb; the composability substrate, not a
coverage symbol).
…->311)

The MEOS-native streaming aggregation, per the converged design: the aggregate
STATE is a live expandable Temporal* (a mini-trip trajectory) grown in place per
event via appendInstant; lower() applies the invariant MEOS scalar fn DIRECTLY
to the live trajectory — no per-event string build, no parse-the-window, no WKB.
This is how PG MobilityDB aggregates run; WKB is needed only when a value
crosses an operator boundary (the Flink/Kafka checkpointed-state form).

codegen_aggregations gains return_mode "expand" (PHYSICAL_CPP_TGEO_EXPAND): state
= Temporal* slot; lift builds an instant (tgeompoint_in, public) and
temporal_append_tinstant(..., expand=true) — doubles maxcount on append, so
amortized-O(1) without the internal pre-allocator (tsequence_make_exp /
tgeompointinst_in are internal and warrant promotion to the public API — a
MobilityDB MEOS-C follow-up); combine merges via temporal_merge; lower applies f;
cleanup frees. Double-pointer params are non-const in MEOS — cast (TInstant**),
never const.

Proof operator TLENGTH_EXP (tpoint_length over the live mini-trip). Locally
compile-verified (EXIT=0); systest asserts the length, probe-confirmed identical
across the expandable, string-parse, and WKB constructions (0.0131538303422).
Feed +2: temporal_append_tinstant + temporal_merge now wired (the streaming
primitives the expandable accumulator uses, which the PagedVector path did not).

Doc: methodology gains a "Running-aggregation realization" section contrasting
Flink/Kafka (WKB-serialized checkpointed state + JMEOS facade) with NebulaStream
(in-process expandable Temporal* + direct API) — same scope, different state model.
…11->316)

Complete the mechanism toolkit: a value-OUTPUT finalize for the expandable
substrate — f(live mini-trip) -> Temporal* result, serialized to hex-WKB as
VARSIZED (the proven box-output VARSIZED tail). The MEOS library's
Temporal-returning single-temporal transforms become windowed aggregates over
the expandable trajectory, the per-event path could not emit (it only returned
scalars).

codegen_aggregations gains return_mode "expand_wkb" (PHYSICAL_CPP_TGEO_EXPAND_WKB,
derived from the expand template by an asserted swap of only the lower()).
Wires TGEO_CENTROID / TPOINT_AZIMUTH / TPOINT_ANGULAR_DIFFERENCE /
TGEOMPOINT_TO_TGEOMETRY / TEMPORAL_COPY over the windowed mini-trip.

Locally compile-verified (EXIT=0). Systest TEMPORAL_COPY_EXP asserts the
result hex-WKB (probe-confirmed: the expandable tsequence_make+appendInstant
sequence serializes byte-identically to TRAJECTORY_WKB). Feed +5
(tgeo_centroid, tpoint_azimuth, tpoint_angular_difference,
tgeompoint_to_tgeometry, temporal_copy) = 316/1945.
Extend the expandable value-output substrate to the tnumber input shape: the
per-event instant is a tfloat ("value@ts" via tfloat_in), accumulated by
appendInstant into the expandable Temporal*; lower() applies the invariant fn
and serializes the result temporal to hex-WKB. Derived from the tgeo expand-wkb
template by swapping only the ctor + lift (the Temporal*-slot lower / reset /
cleanup / value-output finalize are input-shape-independent).

Wires the tnumber Temporal-returning transforms over the windowed tfloat series:
TNUMBER_ABS / TNUMBER_DELTA_VALUE / TNUMBER_ANGULAR_DIFFERENCE /
TEMPORAL_DERIVATIVE / TEMPORAL_AT_MAX / TEMPORAL_AT_MIN / TEMPORAL_MINUS_MAX /
TEMPORAL_MINUS_MIN.

Locally compile-verified (EXIT=0). Systest TNUMBER_ABS_EXP asserts the
result hex-WKB (probe-confirmed). Feed +8 = 324/1945.
Every generated MEOS windowed-aggregate operator failed at query-plan
deserialization on the worker, so none of their systests actually executed
end-to-end. Three root causes, each surfaced by running the systests against a
locally-built single-node worker:

1. Serialized type vs registry key mismatch. The logical function serialized
   set_type(NAME) with NAME = the SQL token (e.g. "TLENGTH_EXP"), but the
   registry key is the add_plugin target = the PascalCase operator name
   ("TLengthExp"). deserializeWindowAggregationFunction therefore called
   create("TLENGTH_EXP") against a registry keyed by "TLengthExp" and threw
   UnknownLogicalOperator. The optimizer-lowering match (name == "...") had the
   same SQL-token spelling, so even a fixed NAME would not have lowered.
   Fix: NAME and the optimizer match now use the PascalCase name (matching the
   built-in Count/TemporalLength convention); the SQL spelling stays in the
   lexer/parser. codegen normalizes class_name_token = nebula_name so a spec
   value cannot reintroduce the divergence.

2. Two-field (value, timestamp) arity. serializeTemporalSequence only has a
   four-field (lon, lat, ts, as) form, so the tnumber shape packs the value
   field twice [value, ts, value, as]; the registrar required exactly three and
   threw CannotDeserialize ("...got 4"). Fix: the registrar reads four fields
   and uses [0]=value, [1]=ts, [3]=alias (the duplicate [2] is ignored).

3. Union double free. set_union_finalfn pfree()s its state internally and
   returns a new Set; the SETFOLD finalize also called free(state) afterwards,
   crashing the worker with "double free or corruption". Fix: drop the extra
   free; only the finalfn result is freed.

All 14 pre-existing windowed-aggregate systests (extent/union/value-output
families across the tgeo and tnumber shapes) now pass against a local worker.
Three windowed value-output aggregates over a network-constrained mini-trip
grown on the in-process expandable Temporal* (appendInstant), each resolving
npoint route+fraction against the loaded ways network and emitting the result
as hex-WKB:

  - TNPOINT_CUMULATIVE_LENGTH_EXP  tnpoint_cumulative_length
  - TNPOINT_SPEED_EXP              tnpoint_speed
  - TNPOINT_TO_TGEOMPOINT_EXP      tnpoint_to_tgeompoint (network-resolved
                                   spatial trajectory)

The lift reuses the three-field tgeo glue (the three args are rid, frac, ts)
and builds each instant with tnpoint_in; codegen gains an expand_wkb_tnpoint
return mode (PHYSICAL_CPP_TNPOINT_EXPAND_WKB) that swaps the tgeo lift for the
tnpoint lift and adds the meos_npoint.h include. The operators link the same
libmeos as the rest of the engine, whose default ways CSV resolves the routes
at runtime; a systest per operator runs end-to-end against a local worker with
the network loaded, with the exact hex captured from a faithful MEOS probe.

The parity adapter counts a conversion helper (tnpoint_to_tgeompoint) as the
wired op when an operator is named for it (a dedicated conversion), keeping it
plumbing-excluded only when it co-occurs with another streamable call.

Feed: tnpoint_cumulative_length / tnpoint_speed / tnpoint_to_tgeompoint now
wired (327/1945).
parse_actual_rows dropped any result row containing parentheses, which
discarded legitimate MEOS value literals (NPoint(1 0.5), STBOX(...),
{NSegment(...)}) — only English explanatory notes should be skipped, so
gate the paren filter on note-like keywords. Also preserve the spaces in
a trailing "@<timestamp>" (e.g. 2021-01-01 00:00:00+00) that the systest
comparison keeps, and skip the 'Actual Results(Sorted)' header line.

Repoint the default image to mobilitynebula-v12, the canonical pin (v11
span-adjacency + quoted text, plus the tpose_to_tpoint rename).
Generate four restriction operators that intersect/subtract a temporal
network point against a static geometry or an STBox, closing the
tnpoint_at_geom / tnpoint_at_stbox / tnpoint_minus_geom /
tnpoint_minus_stbox parity gaps. Adds the logical and physical
operators, the SQL grammar tokens and dispatch glue, and the recorded
systests.
Regenerate the NebulaStream feed from the green v12 systest run, which
credits the wave-13 tnpoint at/minus geom/stbox operators and the
tpose_to_tpoint conversion as confirmed-callable. NebulaStream L3 rises
1,803 / 1,939 (93.0%), 15 wired-only, 121 not-wired gap.
Ten distance/spatial operators were already generated and wired but
lacked an exercising systest, so they measured as wired-only. Add a
recorded systest for each — nad/nai/shortestline against a static
geometry or another temporal for tnpoint and trgeometry, plus
nad_tnpoint_stbox and tdistance_trgeometry_geo — exercising the existing
operators end-to-end on a local worker against v12 libmeos.

Regenerate the NebulaStream feed: L3 rises to 1,813 / 1,939 (93.5%),
wired-only falls 15 -> 5 (the remainder are sequence-subtype and
windowed-aggregation accessors that a single per-event instant cannot
satisfy).
Generate nine per-event operators over text/textset/tbool/tcbuffer/stbox/
tbox primaries via the generic codegen path: textcat_text_text,
textcat_textset_text, text_to_set, tbool_when_true, tcbuffer_expand,
stbox_expand_space, tfloatbox_expand and tintbox_expand are exercised by
recorded systests; tcbuffer_radius is wired (its return type is corrected
from a set to a tfloat in the next wave). Suite 1807/1807 green under v12.

Regenerate the NebulaStream feed: L3 rises to 1,821 / 1,939 (93.9%),
not-wired gap 121 -> 112.
…e 15)

Generate six per-event operators: nad_trgeometry_stbox, tspatial_to_stbox,
spatialset_srid, and the tgeo/trgeometry/tcbuffer traversed-area family.
The traversed-area operators take MEOS's bool unary_union argument, which
the generic codegen path already supports via a scalar extra-arg of cpp
type bool — no template change needed. Each is exercised by a recorded
systest; suite green under v12 (text_cmp is an intermittent pre-existing
flake, passes on re-run).

Regenerate the NebulaStream feed: L3 rises to 1,827 / 1,939 (94.2%),
not-wired gap 112 -> 106.
Introduce an "arraymake" aggregation fold in codegen_aggregations.py: it
collects a window's scalar values into a malloc'd C array threaded across
the Nautilus invoke ABI (the proven CrossDistance scratch pattern) and
calls the MEOS array constructor once at finalize, serializing the result
via the typed *_out wrapper. Generate floatset_make, intset_make,
bigintset_make, dateset_make and tstzset_make as windowed aggregations,
each exercised by a GROUP BY ... WINDOW systest producing the real set
({1.5,2.5,3.5}, {2020-01-01,...}, etc.). Suite 1818/1818 green under v12.

Regenerate the NebulaStream feed: L3 rises to 1,832 / 1,939 (94.5%),
not-wired gap 106 -> 101.
…ox (wave 17)

Wire the forward STBox conversions to the PostGIS box types: STBOX_TO_BOX3D
(stbox_to_box3d -> BOX3D) and STBOX_TO_GBOX (stbox_to_gbox -> GBOX). The
operand is a VARSIZED stbox text literal (parsed via stbox_in over the
stbox_text generic input); the heap BOX3D*/GBOX* result is serialized via
box3d_out/gbox_out and freed like any other VARSIZED return.

Add the box3d_text_out / gbox_text_out return-kinds to codegen_nebula.py,
mirroring the existing stbox_text_out machinery. Each conversion carries a
systest over a 2D and a Z-dimensioned box, recorded under v12 (a 2D box
yields z=0; the Z box preserves its z range). Suite 1820/1820 green under v12.

Regenerate the NebulaStream feed: L3 rises to 1,834 / 1,939 (94.6%),
not-wired gap 101 -> 99.

The reverse box3d_to_stbox / gbox_to_stbox are not wired here: MEOS exposes
no box3d_in / gbox_in parser to read the operand literal, so they await a
MEOS-side parser before they can be generated.
…ox (wave 18)

Close the box-conversion family with the reverse pair: BOX3D_TO_STBOX
(box3d_to_stbox) and GBOX_TO_STBOX (gbox_to_stbox), both returning an STBox.
The PostGIS box operand is parsed internally by box3d_in / gbox_in -- the new
MEOS text parsers (pin b0742efeed, ecosystem-pin-2026-06-05b) that invert
box3d_out / gbox_out -- and converted to the catalog-registered, WKB-serializable
STBox at the boundary; the raw BOX3D / GBOX never surface to the user.

Add the box3d_text / gbox_text generic primary inputs to codegen_nebula.py
(parse via box3d_in / gbox_in, mirroring the stbox_text input); the result uses
the existing stbox_text_out return-kind. Each conversion carries a systest over
a 2D and a Z-dimensioned box, recorded under v13 (the dev image rebuilt on the
b0742efeed MEOS, which adds box3d_in / gbox_in). Suite 1822/1822 green.

Regenerate the NebulaStream feed: L3 rises to 1,836 / 1,939 (94.7%), not-wired
gap 99 -> 97. The box-conversion family (forward stbox_to_box3d / stbox_to_gbox
in wave 17, reverse here) is complete.
…_interp (wave 20)

Wire TEMPORAL_SUBTYPE (temporal_subtype) and TEMPORAL_INTERP (temporal_interp),
returning a temporal value's subtype (Instant/Sequence/...) and interpolation
(None/Discrete/Step/Linear) as their static catalog names.

Add a const_char_out return-kind to codegen_nebula.py: a const char* MEOS result
that is a STATIC catalog string is copied out (strdup) and NEVER freed, unlike
the heap-owned VARSIZED returns. This is the first of the now-public
meos_catalog.h type-reflection surface (the catalog enum publication the Spark
binding requested); the enum-argument forms (geo_basetype, *_name, ...) follow.

Each accessor carries a systest over a tfloat instant (subtype Instant, interp
None), recorded under v13. Suite 1824/1824 green; the NebulaStream feed L3 rises
to 1,838 / 1,939 (94.8%), not-wired gap 97 -> 95.
Wire the now-public meos_catalog.h type-reflection functions that the
Spark binding requested, as per-event scalar operators over an INT32
field cast to the catalog enum:

  five MeosType -> bool predicates
    GEO_BASETYPE, SPATIAL_BASETYPE, BASETYPE_BYVALUE,
    BASETYPE_VARLENGTH, TYPE_SPAN_BBOX
  four enum -> name accessors (const char *, static catalog string)
    MEOSOPER_NAME, GEO_TYPENAME, TEMPSUBTYPE_NAME, INTERPTYPE_NAME

codegen_nebula.py gains the catalog enum primary inputs (meostype_base /
meosoper_base / tempsubtype_base / interptype_base -> the MeosType /
MeosOper / tempSubtype / interpType enum cast); the const_char_out
return kind added in wave 20 carries the name accessors.

Each operator is exercised by a recorded systest on genuine catalog
values (MEOSOPER_NAME(EQ_OP)='=', MEOSOPER_NAME(ADJACENT_OP)='-|-',
GEO_TYPENAME Point/Polygon, INTERPTYPE_NAME Linear/Discrete,
TEMPSUBTYPE_NAME Instant/Sequence, TYPE_SPAN_BBOX over T_INTSPAN/
T_GEOMETRY). Full meos suite 1833/1833 green under the v13 image.

Streaming parity: NebulaStream 1,838 -> 1,847 / 1,939 confirmed
callable (94.8% -> 95.3%), not-wired gap 95 -> 86.
Close the windowed sequence accessors and three deferred per-event
operators in one pass.

Sequence accessors (tgeompoint windowed aggregations). The window's
point stream is built as a CONTINUOUS sequence ([...], linear interp)
rather than the discrete instant set ({...}) the other tgeo aggregates
build, because the MEOS sequence accessors reject a discrete sequence
("Input must be a temporal continuous sequence (set)"). A new
codegen_aggregations fold pair drives them:
  tgeoseq      TEMPORAL_NUM_SEQUENCES  (int; fixes the prior wired-only
               operator, which built a discrete set and could never pass)
  tgeoseqtext  TEMPORAL_START_SEQUENCE / TEMPORAL_END_SEQUENCE /
               TEMPORAL_SEQUENCE_N    (a TSequence* serialized via
               tspatial_out and emitted as VARSIZED)

Per-event algebra (gen_algebra_wave classifier extensions):
  TDISTANCE_TRGEOMETRY_TRGEOMETRY  two-temporal trgeometry -> tfloat
               (tdistance -> tfloat_out in the two-temporal classifier)
  TNPOINT_AT_NPOINTSET / TNPOINT_MINUS_NPOINTSET  tnpoint restricted by
               a network-point Set parsed via npointset_in

Each operator is exercised by a recorded systest on genuine MEOS
values; the four aggregations run as windowed GROUP BY ... WINDOW
TUMBLING. Full meos suite 1840/1840 green under the v13 image.

Streaming parity: NebulaStream 1,847 -> 1,854 / 1,939 confirmed
callable (95.3% -> 95.6%), wired-only 6 -> 5, gap 86 -> 80.
…wave 23)

Close the object-element set constructors textset_make, cbufferset_make,
npointset_make, poseset_make and geoset_make as windowed aggregations: each
collects a window of VARSIZED object literals, parses every element via the
type's *_in into a heap pointer, builds the set with *set_make over the
pointer array, and serializes via the type's *set_out (spatialset_as_text for
geometries). A new objarraymake fold in codegen_aggregations.py reuses the
numeric arraymake scaffold with an element-parse lift.

The five operators are confirmed callable end-to-end on a local worker
(1845/1845 systests green); the streaming-parity feed and assessment move
NebulaStream to 1,859 / 1,939 (95.9%).
…on (wave 24)

Close the geometry-returning array constructors geo_collect_garray,
geo_makeline_garray, geom_array_union and cbufferarr_to_geom as windowed
aggregations: each collects a window of VARSIZED geometry (or cbuffer)
literals, parses every element via geom_in / cbuffer_in into a heap pointer,
builds a single GSERIALIZED from the pointer array via the constructor, and
serializes via geo_as_text. These reuse the objarraymake fold unchanged — the
template already parameterizes the make return type, argument type and
serializer, so only a descriptor is added.

The four operators are confirmed callable end-to-end on a local worker
(1849/1849 systests green); the streaming-parity feed and assessment move
NebulaStream to 1,863 / 1,939 (96.1%).
The temporal-output path serialized results with temporal_as_hexwkb(res, 0,
...), whose variant 0 is plain WKB and drops the SRID: a tgeompoint round-trip
through temporal_from_hexwkb came back with SRID 0 instead of its original
(e.g. 4326). Switch the serializer to the WKB_EXTENDED variant (0x04), which
carries the SRID; temporal_from_hexwkb already reads EWKB transparently, so the
input side is unchanged.

The fix is applied at the source in codegen_nebula.py and
codegen_aggregations.py and across the 61 generated temporal-output operators.
Only spatial temporals change output (non-spatial tnumber/tbool have no SRID,
so EWKB equals WKB); the four affected systests (tgeo_centroid,
tnpoint_tcentroid_transfn, tnpoint_to_tgeompoint_exp, temporal_copy_exp) are
re-recorded and now carry the SRID (4326, 5676). Full suite 1849/1849 green.
Close six streamable MEOS functions, lifting NebulaStream parity from
1,863 to 1,869 / 1,939 (96.4%):

- minus_date_date / minus_timestamptz_timestamptz: date and timestamp
  difference, over the date_base / timestamptz_base scalar primaries.
- box3d_make / gbox_make: PostGIS BOX3D / GBOX constructors from their
  bounds, via the box3d_text_out / gbox_text_out return-kinds.
- tcbuffer_make / tpose_make: temporal circular-buffer and temporal pose
  constructors from a temporal point and a temporal float, over the two
  hex-WKB temporal operand shape (EWKB output preserves the SRID).

All six are generated by the existing build_generic machinery (no
generator change) and exercised by a passing systest each; the full
MEOS suite stays green.
… op (wave 26)

Close six more streamable MEOS functions, lifting NebulaStream parity
from 1,869 to 1,875 / 1,939 (96.7%):

- add_date_int / minus_date_int: shift a date by a number of days. The
  MEOS call returns a DateADT value (not a heap pointer), serialized via
  the new date_out value-return kind (VALUE_OUT_RETURNS) — a small,
  reusable generator addition for value-typed returns.
- set_spans / spanset_spans / spanset_spanarr: the span array of a set /
  span set, over the existing array_out machinery (span_val for the two
  contiguous-array returns, ptr for the pointer-array return).
- dwithin_cbuffer_cbuffer: whether two circular buffers are within a
  distance — the first operator over the new cbuffer object-literal
  primary (cbuffer_in), with the second Cbuffer as a box-kind extra. The
  same addition registers pose/npoint object-literal primaries, unlocking
  the remaining static-object operators.

These symbols were always present in libmeos (the CBUFFER/POSE/NPOINT
gates are on); the gap was a missing generator input-shape, now added.
Each op is exercised by a passing systest; the full MEOS suite is green.
Close three more streamable MEOS functions over the static-object
primaries added in wave 26, lifting NebulaStream parity from 1,875 to
1,878 / 1,939 (96.9%):

- cbuffer_set_srid / pose_set_srid: set the SRID of a circular buffer /
  pose. These are void in-place mutators, serialized via the new self_out
  return-kind: the mutated primary is rendered with its own *_out and then
  freed (the op supplies self_out_fn).
- pose_orientation: the orientation quaternion of a 3D pose, returned as a
  double array via the existing array_out machinery over the pose primary.

Each op is exercised by a passing systest; the full MEOS suite is green.
@estebanzimanyi estebanzimanyi force-pushed the feat/nebula-streaming-parity-harness branch from 75344de to 3cb5ff3 Compare June 6, 2026 05:12
…t_to_tgeometry (wave 28)

Lift NebulaStream parity from 1,878 to 1,884 / 1,939 (97.2%), and remove a
modeling irregularity.

Regularize tgeompoint_to_tgeometry. It had been modeled as a windowed
aggregation (TgeompointToTgeometryAggregation) reached through a bespoke
three-argument parser case — a per-event conversion masquerading as an
aggregate, which could never pass (building a linear sequence then
converting raises "tgeometry cannot have linear interpolation"), so it
stayed wired-only. Remove that operator, its bespoke parser glue, and its
optimizer-lowering branch; the token now dispatches through the uniform
codegen glue to the existing per-event TgeompointToTgeometry conversion
(tgeompoint_in -> tgeompoint_to_tgeometry -> tspatial_as_text), exactly
like its proven siblings trgeometry_to_tpoint/tpose/tinstant. It is now
exercised by a passing systest.

New single-temporal -> Temporal* operators via two generator additions:
- temporal_wkb return-kind: serialize a Temporal* result through
  temporal_as_hexwkb (WKB_EXTENDED, SRID kept) in the flexible varsized
  assembler, so the operand may be any input_type (a per-event temporal,
  a hex temporal, or a geometry primary with a temporal extra).
- wkb_temporal as a varsized extra-arg (a temporal operand carried as a
  hex field), for the geometry-first signatures.
Closes tgeometry_to_tcbuffer, tdwithin_geo_tgeo, tdwithin_geo_tcbuffer,
tpoint_at_elevation, tpoint_minus_elevation.

The full MEOS suite is green; the wired-only set is now purely the
structural sequence accessors.
…span_n (wave 29)

Close five more streamable MEOS functions, lifting NebulaStream parity
from 1,884 to 1,889 / 1,939 (97.4%):

- tpoint_from_base_temp / tgeo_from_base_temp: a temporal point / geo
  holding a fixed geometry value over a temporal's time frame — a geometry
  primary with the time-frame temporal as a hex (wkb_temporal) extra,
  returning a Temporal* via temporal_wkb.
- geo_pointarr / geo_stboxes: the point array of a (multi)point and the
  spatiotemporal-box array of a (multi)line, over the existing array_out
  machinery (ptr for GSERIALIZED**, span_val for the contiguous STBox*).
- spanset_span_n: the n-th span of a span set (intspan_text).

Each op is exercised by a passing systest; the full MEOS suite is green.

tcbuffer_radius stays wired-only: it returns a radius Set only for a
temporal sequence (a per-event instant raises "tfloat is not a set
type"), so it is sequence-only like the other structural accessors.
Close two more streamable MEOS functions, lifting NebulaStream parity
from 1,889 to 1,891 / 1,939 (97.5%):

- trgeometry_instant_n: the n-th instant of a temporal rigid geometry,
  over the per-event trgeometry primary, returned via temporal_wkb.
- textcat_text_textset: a text concatenated before each element of a text
  set — over the new text object-literal primary (text_in) with the text
  set as a box-kind extra (textset_in), returned via textset_text.

Each op is exercised by a passing systest; the full MEOS suite is green.
…l serializer (wave 31)

New `tgeoseqarray` aggregation fold: a windowed continuous sequence feeds a
`T**`-returning accessor whose elements are each serialized to SRID-preserving
EWKB hex (temporal_as_hexwkb, 0x04) and brace-joined. Closes temporal_segments
and tpoint_make_simple — 1893/1938 confirmed callable (97.7%).

Re-pin to ecosystem-pin-2026-06-06d (a37a23a672): the now-internal
trgeometry_restrict_value(Datum) leaves the streamable universe (1939->1938);
the bindable trgeometry_restrict_values(Set) stays.

Fix the aggregation parser-injector: it inserted the case-switch / funcName-chain
glue right after the last marker even when that marker was the final block inside
a `#if CBUFFER ... #endif` family guard, nesting generic glue in the guard and
orphaning the close (`#endif without #if`). It now skips past a trailing family
`#endif` so generic blocks land at switch scope, mirroring the function-op
injector.
… 32)

temporal_sequences returns the array of continuous sequences of a windowed
trajectory, so it is a windowed aggregation, not a per-event function: the
per-event form can never be satisfied by a single instant. Replace the
TemporalSequences per-event logical/physical function with a
TemporalSequencesAggregation built on the tgeoseqarray serializer fold
(collect the window's lon/lat/ts into a continuous point sequence, then
serialize each element via temporal_as_hexwkb as EWKB hex), mirroring
temporal_segments and tpoint_make_simple.

Parser dispatch moves from the function case to the windowed-aggregation
case-switch + funcName chain; the optimizer lowers TemporalSequences to the
new physical aggregation. The TEMPORAL_SEQUENCES g4 token is reused.

Streaming parity: temporal_sequences flips wired-only -> confirmed callable,
1,894 / 1,938 (97.7%). Full MEOS systest suite green.
…ave 34)

The trgeometry sequence accessors take a temporal rigid geometry, which a
streaming engine materializes as a windowed aggregation. Add a new `tpose`
input_shape to the aggregation generator: it collects a window of
(x, y, theta, ts) into a continuous tpose sequence, parses it via tpose_in,
converts it to a trgeometry with geo_tpose_to_trgeometry over a unit-square
reference geometry (the established per-event trgeometry path), then applies
the accessor and serializes each element as EWKB hex — the array-of-temporal
serializer used by temporal_segments/sequences.

The tpose shape mirrors the tgeo shape with four data fields instead of
three; serializeTemporalSequence gains a 5-field overload that packs
(y, theta, ts) as extras, which the count-agnostic parser round-trips.

Closes trgeometry_instants, trgeometry_segments and trgeometry_sequences
(wired -> confirmed callable): 1,897 / 1,938 (97.9%). Full MEOS systest
suite green. The single-element accessors (sequence_n/start/end_sequence)
reuse this builder with a one-element finalize variant in a follow-up.
… (wave 35)

trgeometry_start_sequence and trgeometry_end_sequence return the first/last
sequence of a temporal rigid geometry, which a streaming engine materializes
as a windowed aggregation — the per-event form can never be satisfied by a
single instant. Replace the per-event TrgeometryStart/EndSequence functions
with windowed aggregations on the wave-34 tpose builder, adding a one-element
finalize variant (fold tposeseq1): the accessor returns a single TSequence*,
serialized directly to EWKB hex (no array loop).

Parser dispatch for both reused tokens moves from the function case to the
windowed-aggregation glue, and the optimizer lowers them to the new physical
aggregations.

Closes trgeometry_start_sequence and trgeometry_end_sequence (wired-only ->
confirmed callable): 1,899 / 1,938 (98.0%), one wired-only remaining. Full
MEOS systest suite green.
… 36)

tpoint_direction(const Temporal*, double* out) returns the azimuth of a
temporal point through an out-parameter, gated by a validity bool. Add an
out_param_scalar value_compute to the windowed tgeo aggregation: seed the
result to zero and let the call fill it, so a false flag leaves zero. This
reuses the proven tgeo windowed builder with no new template, and the shape
generalizes to other bool-gated out-param scalar accessors.

Closes tpoint_direction: 1,900 / 1,938 (98.0%). Full MEOS systest suite green.
…min_dist (wave 37)

Several essential MEOS aggregations take constant (non-field) arguments
alongside the windowed sequence — a tolerance, an interval, an interpolation
mode. Add general constant-argument support to the windowed-aggregation
generator: each constant rides as a literal string carried through the
SerializableAggregationFunction literals (serializeConstArgs / parseConstArgs,
mirroring how the extra window fields are packed), reconstructed by the
registrar, threaded by the optimizer into the physical function, and parsed in
the finalize to its C type (double/int/bool/Interval/TimestampTz/interpType/Set)
before the MEOS call. The transform fold serializes the resulting Temporal* as
EWKB hex.

Closes temporal_simplify_min_dist (a windowed simplify with a distance
tolerance): 1,901 / 1,938 (98.1%). Full MEOS systest suite green. The capability
generalizes to temporal_tsample/stops/segm_duration and (with a tpose branch)
trgeometry_sequence_n/restrict_values/set_interp.
…ve 38)

Three windowed temporal transforms that take constant arguments, riding the
wave-37 constant-args capability with no new machinery — only a wave-input
spec listing each op's constants in signature order:
- temporal_tsample(interval, timestamptz, interptype)
- temporal_stops(double, interval)
- temporal_segm_duration(interval, bool, bool)

The finalize parses each literal (interval_in / timestamptz_in / (interpType)
atoi / atof / bool) and applies the MEOS call. Harden the transform finalize
to cast the result to Temporal*, since temporal_stops/segm_duration return a
TSequenceSet* (a layout-compatible Temporal subtype that needs an explicit
C++ cast).

1,904 / 1,938 (98.2%). Full MEOS systest suite green.
…st fold (wave 39)

Add a tposetransform fold combining the windowed tpose builder (4 fields
x/y/theta/ts -> tpose sequence -> geo_tpose_to_trgeometry) with the
constant-args mechanism, so trgeometry transforms that take a constant close
with a wave-input spec:
- trgeometry_sequence_n(int i)         -> the i-th sequence
- trgeometry_set_interp(interpType)    -> reinterpolated trgeometry
- trgeometry_restrict_values(Set, bool) -> generated and wired

trgeometry_restrict_values stays wired-only: MEOS rejects every constructible
set type for it (a geomset raises "mixed temporal and set types", a poseset
raises "must be of type geomset") — a contradictory/un-constructible set-type
validation relayed to MEOS. trgeometry_set_interp is exercised with linear
interpolation (step/discrete are not valid for a rigid geometry).

1,906 / 1,938 (98.3%), 2 wired-only. Full MEOS systest suite green.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant