From ed1f1887d46149f22676512817c15fe91ecaae83 Mon Sep 17 00:00:00 2001 From: Esteban Zimanyi Date: Sun, 31 May 2026 10:40:46 +0200 Subject: [PATCH] feat(berlinmod): route the Kafka streaming-form spatial predicates through MEOS Every BerlinMOD spatial predicate evaluates through MEOS via the thin MEOSBridge: within-distance through edwithin_tgeo_geo (the vehicle position as a tgeogpoint instant, metres on the WGS84 spheroid), region containment through eintersects_tgeo_geo, and distances through geog_distance. MEOSBridge holds no spatial mathematics of its own and initialises MEOS per stream thread; the pure-Java Haversine and SegmentDistance classes are removed. --- .../java/berlinmod/BerlinMODQ1LocalTest.java | 1 - .../src/main/java/berlinmod/Haversine.java | 33 --- .../src/main/java/berlinmod/MEOSBridge.java | 195 +++++++----------- .../java/berlinmod/Q3ContinuousProcessor.java | 4 +- .../java/berlinmod/Q3SnapshotProcessor.java | 2 +- .../java/berlinmod/Q4ContinuousProcessor.java | 2 +- .../java/berlinmod/Q4SnapshotProcessor.java | 3 +- .../java/berlinmod/Q4WindowedProcessor.java | 3 +- .../java/berlinmod/Q6ContinuousProcessor.java | 2 +- .../java/berlinmod/Q6SnapshotProcessor.java | 2 +- .../java/berlinmod/Q6WindowedProcessor.java | 2 +- .../main/java/berlinmod/SegmentDistance.java | 55 ----- 12 files changed, 78 insertions(+), 226 deletions(-) delete mode 100644 kafka-streams-app/src/main/java/berlinmod/Haversine.java delete mode 100644 kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java diff --git a/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java b/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java index 112a21e..591881f 100644 --- a/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java +++ b/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java @@ -42,7 +42,6 @@ public class BerlinMODQ1LocalTest { private static final long SENTINEL_T2 = T0 + 20_001L; public static void main(String[] args) { - System.setProperty("mobilitykafka.meos.enabled", "false"); LOG.info("BerlinMODQ1LocalTest starting (TopologyTestDriver, all continuous + snapshot forms)"); Properties props = new Properties(); diff --git a/kafka-streams-app/src/main/java/berlinmod/Haversine.java b/kafka-streams-app/src/main/java/berlinmod/Haversine.java deleted file mode 100644 index 0203583..0000000 --- a/kafka-streams-app/src/main/java/berlinmod/Haversine.java +++ /dev/null @@ -1,33 +0,0 @@ -package berlinmod; - -/** - * Great-circle distance in metres between two WGS84 (lon, lat) points. - * - *

Pure-Java fallback for {@link MEOSBridge#dwithinMetres} / - * {@link MEOSBridge#distanceMetres}, used by the BerlinMOD-9 × 3-form - * streaming scaffold when libmeos is not loadable on the runtime path. The - * primary spatial-predicate surface is {@link MEOSBridge}. - */ -public final class Haversine { - - private static final double EARTH_RADIUS_METRES = 6_371_000.0; - - private Haversine() {} - - public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) { - double phi1 = Math.toRadians(lat1); - double phi2 = Math.toRadians(lat2); - double dPhi = Math.toRadians(lat2 - lat1); - double dLambda = Math.toRadians(lon2 - lon1); - - double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2) - + Math.cos(phi1) * Math.cos(phi2) - * Math.sin(dLambda / 2) * Math.sin(dLambda / 2); - double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); - return EARTH_RADIUS_METRES * c; - } - - public static boolean withinMetres(double lon, double lat, double pLon, double pLat, double radiusMetres) { - return distanceMetres(lon, lat, pLon, pLat) <= radiusMetres; - } -} diff --git a/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java b/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java index 6d1a9ff..c9c835d 100644 --- a/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java +++ b/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java @@ -1,158 +1,103 @@ package berlinmod; -import functions.functions; +import functions.GeneratedFunctions; import jnr.ffi.Pointer; -import utils.spatial.PointToSegment; /** - * Runtime bridge from MobilityKafka BerlinMOD streaming-form predicates to - * MEOS via JMEOS. + * Thin wiring from the BerlinMOD streaming-form predicates to MEOS via JMEOS. * - *

All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold - * flow through this class. When the JMEOS native libmeos shared object is - * present and loadable, each predicate evaluates through MEOS' WGS84 - * geography surface ({@code geom_to_geog} + {@code geog_dwithin}). When - * libmeos is not available, each predicate falls back to the corresponding - * pure-Java implementation in {@link Haversine} or {@link SegmentDistance} - * so the BerlinMOD mini-cluster local tests stay runnable on systems - * without a MEOS install. - * - *

The fallback is gated by the {@link #MEOS_AVAILABLE} static flag, set - * once at class-load time: - *

+ *

Every spatial predicate evaluates through MEOS: the within-distance + * predicate is the canonical temporal operator {@code edwithin_tgeo_geo} — + * ever-within between the vehicle's {@code tgeogpoint} instant and the query + * geography, in metres on the WGS84 spheroid; region containment is + * {@code eintersects_tgeo_geo} between the point's {@code tgeompoint} instant + * and the region polygon; distances are {@code geog_distance}. This class holds + * no spatial mathematics of its own: it constructs the MEOS inputs and delegates + * the computation to libmeos, initialising MEOS once per stream thread. */ public final class MEOSBridge { - /** - * {@code true} iff MEOS is available on this runtime and the bridge - * routes through it; {@code false} iff the bridge will use the pure-Java - * fallbacks. - */ - public static final boolean MEOS_AVAILABLE; - - static { - boolean enabled = - Boolean.parseBoolean(System.getProperty("mobilitykafka.meos.enabled", "true")); - boolean ok = false; - if (enabled) { - try { - functions.meos_initialize(); - ok = true; - } catch (Throwable t) { - // libmeos shared object not loadable on this runtime — fall back. - ok = false; - } - } - MEOS_AVAILABLE = ok; - } + private static final ThreadLocal INITIALIZED = + ThreadLocal.withInitial(() -> Boolean.FALSE); private MEOSBridge() { // utility } - // ---------------------------------------------------------------------- - // Public bridge surface — same shape as Haversine + SegmentDistance. - // ---------------------------------------------------------------------- + private static void ensureInitializedOnThread() { + if (!INITIALIZED.get()) { + GeneratedFunctions.meos_initialize_error_handler((level, code, message) -> { }); + GeneratedFunctions.meos_initialize(); + INITIALIZED.set(Boolean.TRUE); + } + } - /** - * @return {@code true} if the great-circle distance from {@code (lon1, lat1)} - * to {@code (lon2, lat2)} on the WGS84 spheroid is at most - * {@code radiusMetres}. MEOS-backed via {@code geog_dwithin} when - * available, else pure-Java {@link Haversine#withinMetres}. - */ + /** @return {@code true} iff {@code (lon1, lat1)} is within {@code radiusMetres} + * of {@code (lon2, lat2)} on the WGS84 spheroid, via MEOS {@code edwithin_tgeo_geo}. */ public static boolean dwithinMetres(double lon1, double lat1, - double lon2, double lat2, - double radiusMetres) { - if (!MEOS_AVAILABLE) { - return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres); - } - Pointer g1 = pointGeog(lon1, lat1); - Pointer g2 = pointGeog(lon2, lat2); - if (g1 == null || g2 == null) { - return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres); - } - return functions.geog_dwithin(g1, g2, radiusMetres, true); + double lon2, double lat2, double radiusMetres) { + ensureInitializedOnThread(); + return GeneratedFunctions.edwithin_tgeo_geo( + tgeogInst(lon1, lat1), pointGeog(lon2, lat2), radiusMetres) == 1; } - /** - * @return {@code true} if the spheroidal distance from {@code (pLon, pLat)} - * to the LineString {@code (s1, s2)} is at most {@code radiusMetres}. - * MEOS-backed via {@code geog_dwithin} on geographies built from - * the point and line WKTs, else pure-Java - * {@link SegmentDistance#withinMetres}. - */ + /** @return {@code true} iff {@code (pLon, pLat)} is within {@code radiusMetres} + * of the LineString {@code (s1, s2)}, via MEOS {@code edwithin_tgeo_geo}. */ public static boolean dwithinSegmentMetres(double pLon, double pLat, double s1Lon, double s1Lat, - double s2Lon, double s2Lat, - double radiusMetres) { - if (!MEOS_AVAILABLE) { - return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres); - } - Pointer pg = pointGeog(pLon, pLat); - Pointer lg = lineGeog(s1Lon, s1Lat, s2Lon, s2Lat); - if (pg == null || lg == null) { - return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres); - } - return functions.geog_dwithin(pg, lg, radiusMetres, true); + double s2Lon, double s2Lat, double radiusMetres) { + ensureInitializedOnThread(); + return GeneratedFunctions.edwithin_tgeo_geo( + tgeogInst(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat), radiusMetres) == 1; } - /** - * @return the spheroidal distance in metres between two WGS84 points. - * MEOS-backed via {@code utils.spatial.Haversine.distance} - * (which calls MEOS' {@code geog_distance} over two POINT - * geographies) when libmeos is loadable, else pure-Java - * {@link Haversine#distanceMetres}. - */ - public static double distanceMetres(double lon1, double lat1, - double lon2, double lat2) { - if (!MEOS_AVAILABLE) { - return Haversine.distanceMetres(lon1, lat1, lon2, lat2); - } - return utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2); + /** @return {@code true} iff {@code (lon, lat)} lies in the axis-aligned box, via + * MEOS {@code eintersects_tgeo_geo} against the box polygon (planar, SRID 4326). */ + public static boolean intersectsBox(double lon, double lat, + double xmin, double ymin, double xmax, double ymax) { + ensureInitializedOnThread(); + return GeneratedFunctions.eintersects_tgeo_geo( + tgeomInst(lon, lat), boxPolygon(xmin, ymin, xmax, ymax)) == 1; } - /** - * @return the spheroidal distance in metres from {@code (pLon, pLat)} to - * the LineString {@code (s1, s2)}. MEOS-backed via - * {@code utils.spatial.PointToSegment.distance} when libmeos is - * loadable, else pure-Java - * {@link SegmentDistance#distanceMetres}. - */ + /** @return the WGS84 spheroidal distance in metres between two points, via MEOS {@code geog_distance}. */ + public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) { + ensureInitializedOnThread(); + return GeneratedFunctions.geog_distance(pointGeog(lon1, lat1), pointGeog(lon2, lat2)); + } + + /** @return the WGS84 spheroidal distance in metres from a point to the LineString, + * via MEOS {@code geog_distance}. */ public static double distanceSegmentMetres(double pLon, double pLat, - double s1Lon, double s1Lat, - double s2Lon, double s2Lat) { - if (!MEOS_AVAILABLE) { - return SegmentDistance.distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat); - } - return PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat); + double s1Lon, double s1Lat, double s2Lon, double s2Lat) { + ensureInitializedOnThread(); + return GeneratedFunctions.geog_distance( + pointGeog(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat)); } - // ---------------------------------------------------------------------- - // Internal helpers — WKT → geometry → geography in one MEOS-side step. - // ---------------------------------------------------------------------- + private static Pointer tgeogInst(double lon, double lat) { + return GeneratedFunctions.tgeogpoint_in( + String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat)); + } + + private static Pointer tgeomInst(double lon, double lat) { + return GeneratedFunctions.tgeompoint_in( + String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat)); + } private static Pointer pointGeog(double lon, double lat) { - String wkt = String.format("SRID=4326;Point(%.7f %.7f)", lon, lat); - Pointer g = functions.geom_in(wkt, -1); - if (g == null) { - return null; - } - return functions.geom_to_geog(g); + return GeneratedFunctions.geom_to_geog( + GeneratedFunctions.geom_in(String.format("SRID=4326;Point(%.7f %.7f)", lon, lat), -1)); } - private static Pointer lineGeog(double s1Lon, double s1Lat, - double s2Lon, double s2Lat) { - String wkt = String.format("SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", - s1Lon, s1Lat, s2Lon, s2Lat); - Pointer g = functions.geom_in(wkt, -1); - if (g == null) { - return null; - } - return functions.geom_to_geog(g); + private static Pointer lineGeog(double s1Lon, double s1Lat, double s2Lon, double s2Lat) { + return GeneratedFunctions.geom_to_geog(GeneratedFunctions.geom_in(String.format( + "SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", s1Lon, s1Lat, s2Lon, s2Lat), -1)); + } + + private static Pointer boxPolygon(double xmin, double ymin, double xmax, double ymax) { + return GeneratedFunctions.geom_in(String.format( + "SRID=4326;Polygon((%.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f))", + xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax, xmin, ymin), -1); } } diff --git a/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java index abd5755..dfdd987 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java @@ -13,9 +13,7 @@ * eventTime, near)} per incoming GPS event. Same predicate semantics * as MobilityFlink's {@code Q3ContinuousFunction}. * - *

Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin} - * over WGS84 geographies when libmeos is loadable, {@link Haversine} - * fallback otherwise. + *

Predicate: {@link MEOS {@code edwithin_tgeo_geo} over WGS84 geographies. */ public class Q3ContinuousProcessor implements Processor { diff --git a/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java index e75cc85..2f47c2c 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java @@ -22,7 +22,7 @@ *

Caller keys the input by a constant so the shared cross-vehicle * last-known store lives in one subtask. Per event: update last-known. * Per STREAM_TIME punctuator fire: iterate last-known, evaluate the - * Haversine radius predicate, forward {@code (currentTick, vehicleId)} + * MEOS edwithin_tgeo_geo radius predicate, forward {@code (currentTick, vehicleId)} * for every near vehicle (sorted by vehicleId). */ public class Q3SnapshotProcessor implements Processor { diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java index 95f069f..788d111 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java @@ -54,6 +54,6 @@ public void process(Record record) { } private boolean inBox(double lon, double lat) { - return lon >= xmin && lon <= xmax && lat >= ymin && lat <= ymax; + return MEOSBridge.intersectsBox(lon, lat, xmin, ymin, xmax, ymax); } } diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java index 27a1383..5de820d 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java @@ -62,8 +62,7 @@ public void init(ProcessorContext context) { public void process(Record record) { BerlinMODTrip trip = record.value(); if (trip == null || trip.getVehicleId() == -1) return; - boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax - && trip.getLat() >= ymin && trip.getLat() <= ymax; + boolean curr = MEOSBridge.intersectsBox(trip.getLon(), trip.getLat(), xmin, ymin, xmax, ymax); Boolean prev = wasInside.get(trip.getVehicleId()); boolean prevInside = prev != null && prev; if (curr && !prevInside) { diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java index 346ec34..17f6e7b 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java @@ -57,8 +57,7 @@ public void process(Record record) { BerlinMODTrip trip = record.value(); if (trip == null || trip.getVehicleId() == -1) return; long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; - boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax - && trip.getLat() >= ymin && trip.getLat() <= ymax; + boolean curr = MEOSBridge.intersectsBox(trip.getLon(), trip.getLat(), xmin, ymin, xmax, ymax); String s = winState.get(winStart); // Parse per-vehicle records separated by '|' StringBuilder rebuilt = new StringBuilder(); diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java index f5f7f4c..0f5a88a 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java @@ -11,7 +11,7 @@ *

"What is each vehicle's cumulative distance travelled so far?" * *

Keyed by vehicleId. Per-vehicle state holds the last-known (lon, lat) - * and the running total in metres. On each event, accumulate the Haversine + * and the running total in metres. On each event, accumulate the MEOS geog_distance * delta and emit the cumulative total. * *

State value uses a small string encoding "lon,lat,total" since the diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java index 7fe87cc..7c7b0b5 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java @@ -20,7 +20,7 @@ * up to T?" * *

Caller keys the input by a constant. State value "lon,lat,total" - * per vehicleId. Per event: accumulate Haversine delta. Per STREAM_TIME + * per vehicleId. Per event: accumulate the MEOS geog_distance delta. Per STREAM_TIME * punctuator fire: emit {@code (currentTick, vehicleId, total)} for * every vehicle (sorted by vehicleId), encoded as "vid:total". */ diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java index c540ad4..a148e0f 100644 --- a/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java +++ b/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java @@ -20,7 +20,7 @@ * during the window." * *

State encodes per-window per-vehicle {@code "vid:lastLon,lastLat,total|..."}. - * On each event, accumulate Haversine delta from the previous in-window + * On each event, accumulate the MEOS geog_distance delta from the previous in-window * position. On punctuator: emit per-vehicle totals for closed windows. */ public class Q6WindowedProcessor implements Processor { diff --git a/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java b/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java deleted file mode 100644 index da4efc9..0000000 --- a/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java +++ /dev/null @@ -1,55 +0,0 @@ -package berlinmod; - -/** - * Distance from a (lon, lat) point to a (lon, lat) line segment, in metres, - * via a local equirectangular projection centred on the segment midpoint. - * - *

Pure-Java fallback for {@link MEOSBridge#dwithinSegmentMetres}, used - * by the BerlinMOD-Q8 streaming scaffold when libmeos is not loadable on - * the runtime path. The primary point-to-line spatial predicate is - * {@link MEOSBridge#dwithinSegmentMetres}, which routes through MEOS' - * {@code geog_dwithin} on a LineString geography when available. - */ -public final class SegmentDistance { - - private static final double EARTH_RADIUS_METRES = 6_371_000.0; - - private SegmentDistance() {} - - public static double distanceMetres( - double pLon, double pLat, - double s1Lon, double s1Lat, - double s2Lon, double s2Lat) { - double midLat = (s1Lat + s2Lat) / 2.0; - double mPerDegLat = Math.toRadians(1.0) * EARTH_RADIUS_METRES; - double mPerDegLon = mPerDegLat * Math.cos(Math.toRadians(midLat)); - - double px = pLon * mPerDegLon; - double py = pLat * mPerDegLat; - double s1x = s1Lon * mPerDegLon; - double s1y = s1Lat * mPerDegLat; - double s2x = s2Lon * mPerDegLon; - double s2y = s2Lat * mPerDegLat; - - double dx = s2x - s1x; - double dy = s2y - s1y; - double lenSq = dx * dx + dy * dy; - if (lenSq == 0.0) { - return Math.hypot(px - s1x, py - s1y); - } - double t = ((px - s1x) * dx + (py - s1y) * dy) / lenSq; - if (t < 0.0) t = 0.0; - else if (t > 1.0) t = 1.0; - double cx = s1x + t * dx; - double cy = s1y + t * dy; - return Math.hypot(px - cx, py - cy); - } - - public static boolean withinMetres( - double pLon, double pLat, - double s1Lon, double s1Lat, - double s2Lon, double s2Lat, - double radiusMetres) { - return distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) <= radiusMetres; - } -}