Skip to content

Add the eDwithinPairs, tDwithinPairs and aDisjointPairs set-set spatial-join UDFs#20

Open
estebanzimanyi wants to merge 150 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/setset-spatial-join-udfs
Open

Add the eDwithinPairs, tDwithinPairs and aDisjointPairs set-set spatial-join UDFs#20
estebanzimanyi wants to merge 150 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/setset-spatial-join-udfs

Conversation

@estebanzimanyi
Copy link
Copy Markdown
Member

MobilitySpark exposes the MEOS set-set spatial-join family as thin set-returning UDFs: eDwithinPairs and aDisjointPairs return array<struct<i,j>> and tDwithinPairs returns array<struct<i,j,periods>>, each marshalling both tgeompoint sides into a Temporal*[] like minDistance with a reachabilityFence around the native call, reading the kernel's flattened 0-based index pairs, and freeing the returned buffers. The BerlinMOD Q6/Q10/Q16 cross-join queries express the natural set-set form — array_agg of the trips with a parallel array_agg of the identity, joined only on the grouping keys — and preprocessForSpark maps the portable LATERAL set-returning-function form onto Spark's LATERAL VIEW inline; the vendored libmeos carries the family. The Q6/Q10/Q16 set-set forms run at 661/30979/2034 ms over 1620 trips returning 1/115/42 rows. Consumer of MobilityDB #1148 on the settled pin 8569019b7b; the single new commit builds on the unified-jar and bench foundation, so the diff against main is cumulative until that foundation lands.

Luis Alfredo Leon Villapun and others added 30 commits August 7, 2023 12:05
…ing families

The cbuffer, npoint, pose and rgeo sequence/sequence-set constructors call
temporal_to_tsequence and temporal_to_tsequenceset with the LINEAR
interpolation expressed as the integer interpType constant (3) from meos.h,
matching the regenerated JMEOS 1.4 signature used across the temporal surface.
…ily flags

Each optional extended temporal-type family (cbuffer, npoint, pose, rgeo, h3)
is included or excluded at build time through a Maven flag whose name mirrors
the MEOS/MobilityDB CMake options: -DCBUFFER=OFF, -DNPOINT=OFF, -DPOSE=OFF,
-DRGEO=OFF and -DH3=OFF drop that family's package from compilation. RGEO
depends on POSE so disabling POSE also drops rgeo, and disabling H3 also drops
the BerlinMOD demo and examples that materialise the th3index trip column.
MobilitySparkSession registers the families reflectively, so an excluded
package's absent registrar class is skipped with zero residue while the
remaining families stay intact. The CI workflow builds the fully excluded
variant and asserts the dropped packages produce no classes.
…acc/all

# Conflicts:
#	src/test/java/org/mobilitydb/spark/temporal/ConstructorUDFsExtTest.java
…/all

# Conflicts:
#	.github/workflows/maven.yml
#	src/main/java/org/mobilitydb/spark/MobilitySparkSession.java
#	src/test/java/org/mobilitydb/spark/temporal/MathUDFsExtTest.java
… into acc/all

# Conflicts:
#	src/main/java/org/mobilitydb/spark/cbuffer/CbufferUDFs.java
cbuffer_cmp is not a stable total order under standalone MEOS: the embedded
geometry carries uninitialized padding (the same reason cbuffer_hash and
cbuffer_hash_extended are unbound), so a pairwise gt/cmp scalar invariant on
two values is non-deterministic across calls. MobilityDB exercises cbuffer
ordering through 151_cbufferset_tbl as numValues(set(array_agg(DISTINCT cb
ORDER BY cb))) — an order-independent distinct count where cbuffer_cmp only
feeds the aggregate's ORDER BY. The Spark test mirrors that form: it dedups and
orders the values (TreeSet) and asserts the count of distinct cbuffers, which is
deterministic because equal cbuffers serialize to identical hex-WKB.
The MobilitySpark UDF layer resolves the MEOS native surface through the unified
functions.GeneratedFunctions facade (the MEOS-API/meos-idl.json codegen output
shared with MobilityFlink), bundled as libs/JMEOS-1.4.jar, and the bundled
lib/libmeos.so the CI copies into place carries the matching surface: th3index,
the mul_ temporal multiplication naming, the always-covers spatial relations,
the JVM-safe noexit error handler, and reentrant GEOS. Owned char* returns are
freed in the facade, so String-returning UDFs leave no native allocation behind.
The per-worktree isolated Maven repository is dropped from tracking and ignored.
The native-leak tests sample VmRSS after System.gc(), which never returns
glibc's freed malloc arenas to the OS, so a UDF that allocates and frees a
large transient buffer (the merged trajectory geometry in trajectory()) leaves
the freed chunks resident in the arena. That retention is glibc-version
dependent, making the RSS proxy unstable across environments. forceGc() now
calls malloc_trim(0) through a minimal libc binding before sampling, so VmRSS
reflects genuinely-retained native memory; a real leak survives the trim and
still trips the assert. glibc-only — the binding is null and skipped elsewhere.
minDistance(tgeompoint[], tgeompoint[]) is registered under the SQL name
minDistance, backed by GeneratedFunctions.mindistance_tgeoarr_tgeoarr. The UDF
marshals each array of hex trips into a native Temporal*[] and keeps the
buffers strongly reachable across the native call with reachabilityFence, so
the kernel never reads memory the JVM reclaimed mid-call. BerlinMOD Q5 uses the
set-set form minDistance(array_agg(trips), array_agg(trips)) over the licence
pairs -- identical SQL across MobilityDB, MobilityDuck and MobilitySpark, the
N-by-N resolved inside the aggregate by the STBox prune -- so the Spark-specific
q05_spark variant is removed. The bundled jar and libmeos carry the
mindistance_tgeoarr_tgeoarr name and reentrant GEOS.
queries.sql is the one `-- @query`-delimited source that all three
runners (PostgreSQL, DuckDB, MobilitySpark) split, so the benchmark SQL
cannot drift between platforms; the Spark runner applies preprocessForSpark
as a dialect transform on each section. The bench measures each query with
the noop sink, which forces full materialisation of projection-only
expressions such as the set-set minDistance(tgeompoint[], tgeompoint[]) in
Q5. Query geometries parse with the configurable GEOM_SRID
(-Dberlinmod.srid) so they match the SRID the trips carry, avoiding
mixed-SRID spatial operators in Q11/Q12/Q15. The trip_h3 column
materialises through the registered tgeompointToTh3Index UDF under the
berlinmod.bench.th3index.enable flag.
Spark registers a name as exclusively scalar or aggregate, so a name
backing both fails to load. The bare merge stays the scalar form
(AccessorUDFs) and the column aggregate is mergeAgg, tracking the
upstream Agg-suffix rename of the aggregate forms.
…e type

The canonical overloaded aggregates tmin/tmax (renamed tminAgg/tmaxAgg by
MobilityDB #828) and tsum span tint, tfloat and ttext signatures. Spark
registers a UDAF name as exclusively one aggregate and cannot overload by
signature, so each becomes a single UDAF that dispatches on the base type
returned by temporal_basetype_name (#1139) — int4, float8 or text — to the
matching per-type transfn. This replaces the invented per-type tIntMin /
tFloatMin / tTextMin … surface with the canonical names; tsum keeps the
bare name since no box accessor collides with it. The vendored libmeos.so
and JMEOS jar export temporal_basetype_name.
VmRSS-based leak detection bounds native-heap growth, but glibc 2.39 on
the ubuntu-noble runner retains ~15-21 MB of freed malloc arena across the
5 000-call probe even after malloc_trim(0), so a 10 MB bound flags
freed-but-unreturned memory rather than a leak. The 50 MB bound clears
that arena floor while still catching real Temporal* leaks (≥100 KB/call →
≥500 MB) with a 10x margin.
Bundle the libmeos and the regenerated functions.GeneratedFunctions facade
built from the consolidated MobilityDB tip a2625869f9. The facade carries the
full exported MEOS C API at this tip: ecovers_geo_tcbuffer completes the
circular-buffer ever-covers family, tdistance_tpose_geo and
tdistance_tnpoint_geo regularize the GSERIALIZED distance argument token,
the rigid-geometry surface is on the canonical trgeometry_* prefix
(geo_tpose_to_trgeometry, trgeometry_to_tpose, accessors and restrictions),
the h3 cell I/O uses h3index_parse and h3index_to_string, and the errno-only
meos_initialize_noexit_error_handler keeps a MEOS error on a worker thread
from tearing down the JVM. tpose_to_tpoint stays a supertype cast that derives
tgeompoint or tgeogpoint from the geodetic flag.

MeosApiCoverageTest proves every one of the 3029 exported public MEOS symbols
has a facade binding and every public-header declaration is generated, and
SparkUdfParityTest proves every addressable MobilityDB SQL function is either a
same-name Spark UDF or a categorized exclusion. The MathGapUDFs, AccessorGapUDFs
and SetSpanGapUDFs registrars close the remaining same-name SQL parity gaps.
The contract resources in src/test/resources/meos are derived from this tip.
The Trips x Trips and Trips x QueryPoints x QueryInstants cross-join queries
re-parsed each multi-thousand-instant trip hex-WKB on every candidate pair,
so the spatial `&&` bounding-box prefilter cost as much as the exact predicate
it was meant to guard. Materialise the whole-trip STBox once per trip as the
trip_bbox column and route the spatial prefilter through it: preprocessForSpark
rewrites `trip && <stbox>` to stboxOverlaps(trip_bbox, <stbox>) and feeds
expandSpace the trip_bbox directly, so the prefilter parses a ~50-byte STBox
instead of the full trip. stboxOverlaps and an STBox-aware expandSpace are the
two supporting UDFs; trip_bbox is materialised next to trip_h3 at load.

Also un-shadow Spark's built-in numeric ROUND by not registering the temporal
round UDF under the bare name (temporal-float rounding stays available as
tfloatRound), and normalise the th3index set-membership prefilter spelling to
the registered everIntersectsH3IndexSetTh3Index.
… in Spark

The th3index cell prefilter is an indexed cell-membership seek on PostgreSQL
and DuckDB, but in Spark's string-storage model the trip_h3 sequence is
re-parsed per candidate pair and the H3 cell of a projected (non lon/lat)
BerlinMOD coordinate is undefined, so the prefilter neither prunes nor runs
cheaply. preprocessForSpark resolves the three th3index prefilter forms
(everIntersectsH3IndexSetTh3Index, everEqH3IndexTh3Index, everEqTh3IndexTh3Index)
to the materialised trip_bbox STBox overlap that the same dialect already uses
for the `&&` operator. Each is a sound spatial superset filter computed on a
~50-byte STBox, so the exact eIntersects / nearestApproachDistance still runs on
the surviving pairs and results are unchanged while the Trips x QueryRegions and
Trips x QueryPoints scans drop from tens of seconds to sub-second.
…al-join UDFs

MobilitySpark exposes the MEOS set-set spatial-join family as thin set-returning
UDFs: eDwithinPairs and aDisjointPairs return array<struct<i,j>> and tDwithinPairs
returns array<struct<i,j,periods>>, each marshalling both tgeompoint sides into a
Temporal*[] like minDistance with a reachabilityFence around the native call,
reading the kernel's flattened 0-based index pairs, and freeing the returned
buffers.  The BerlinMOD Q6/Q10/Q16 cross-join queries express the natural set-set
form — array_agg of the trips with a parallel array_agg of the identity, joined
only on the grouping keys — and preprocessForSpark maps the portable LATERAL
set-returning-function form onto Spark's LATERAL VIEW inline.  The vendored libmeos
carries the family.  Consumer of MobilityDB #1148; it builds on the unified-jar and
bench foundation, so the diff against main is cumulative until that foundation lands.
@estebanzimanyi estebanzimanyi force-pushed the feat/setset-spatial-join-udfs branch 2 times, most recently from e8aefe6 to 1a0c2de Compare June 5, 2026 17:34
The bundled libmeos and the JMEOS jar carry the 54a9d4bc54 public surface: the
per-thread PROJ context for thread-safe SRID transforms and the supertype
tpose_to_tpoint conversion. The windowed-aggregate interval parser binds the
public pg_interval_in wrapper, and the MeosApiCoverageTest contract lists the
exported symbols, the declared-not-exported set, and the undeclared internal
helpers.
@estebanzimanyi estebanzimanyi force-pushed the feat/setset-spatial-join-udfs branch from 1a0c2de to d7c4965 Compare June 5, 2026 19:59
The trip_h3 temporal H3-cell index is produced point-by-point at trip-assembly
time in the source data, so the benchmark reuses it when the loaded Trips table
already carries the column instead of re-deriving it from the assembled
trajectory. Only the trip_bbox prefilter column is materialised; trip_h3 is
materialised solely when the dataset does not provide it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants