diff --git a/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java b/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
index 112a21e..591881f 100644
--- a/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
+++ b/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
@@ -42,7 +42,6 @@ public class BerlinMODQ1LocalTest {
private static final long SENTINEL_T2 = T0 + 20_001L;
public static void main(String[] args) {
- System.setProperty("mobilitykafka.meos.enabled", "false");
LOG.info("BerlinMODQ1LocalTest starting (TopologyTestDriver, all continuous + snapshot forms)");
Properties props = new Properties();
diff --git a/kafka-streams-app/src/main/java/berlinmod/Haversine.java b/kafka-streams-app/src/main/java/berlinmod/Haversine.java
deleted file mode 100644
index 0203583..0000000
--- a/kafka-streams-app/src/main/java/berlinmod/Haversine.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package berlinmod;
-
-/**
- * Great-circle distance in metres between two WGS84 (lon, lat) points.
- *
- *
Pure-Java fallback for {@link MEOSBridge#dwithinMetres} /
- * {@link MEOSBridge#distanceMetres}, used by the BerlinMOD-9 × 3-form
- * streaming scaffold when libmeos is not loadable on the runtime path. The
- * primary spatial-predicate surface is {@link MEOSBridge}.
- */
-public final class Haversine {
-
- private static final double EARTH_RADIUS_METRES = 6_371_000.0;
-
- private Haversine() {}
-
- public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
- double phi1 = Math.toRadians(lat1);
- double phi2 = Math.toRadians(lat2);
- double dPhi = Math.toRadians(lat2 - lat1);
- double dLambda = Math.toRadians(lon2 - lon1);
-
- double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
- + Math.cos(phi1) * Math.cos(phi2)
- * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
- double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
- return EARTH_RADIUS_METRES * c;
- }
-
- public static boolean withinMetres(double lon, double lat, double pLon, double pLat, double radiusMetres) {
- return distanceMetres(lon, lat, pLon, pLat) <= radiusMetres;
- }
-}
diff --git a/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java b/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java
index 6d1a9ff..c9c835d 100644
--- a/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java
+++ b/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java
@@ -1,158 +1,103 @@
package berlinmod;
-import functions.functions;
+import functions.GeneratedFunctions;
import jnr.ffi.Pointer;
-import utils.spatial.PointToSegment;
/**
- * Runtime bridge from MobilityKafka BerlinMOD streaming-form predicates to
- * MEOS via JMEOS.
+ * Thin wiring from the BerlinMOD streaming-form predicates to MEOS via JMEOS.
*
- *
All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold
- * flow through this class. When the JMEOS native libmeos shared object is
- * present and loadable, each predicate evaluates through MEOS' WGS84
- * geography surface ({@code geom_to_geog} + {@code geog_dwithin}). When
- * libmeos is not available, each predicate falls back to the corresponding
- * pure-Java implementation in {@link Haversine} or {@link SegmentDistance}
- * so the BerlinMOD mini-cluster local tests stay runnable on systems
- * without a MEOS install.
- *
- *
The fallback is gated by the {@link #MEOS_AVAILABLE} static flag, set
- * once at class-load time:
- *
- * - {@code -Dmobilitykafka.meos.enabled=false} forces the pure-Java path
- * even when libmeos is present (used by {@code BerlinMODQ1LocalTest}).
- *
- Otherwise {@code MEOS_AVAILABLE} is {@code true} iff
- * {@code functions.meos_initialize()} returns without throwing.
- *
+ * Every spatial predicate evaluates through MEOS: the within-distance
+ * predicate is the canonical temporal operator {@code edwithin_tgeo_geo} —
+ * ever-within between the vehicle's {@code tgeogpoint} instant and the query
+ * geography, in metres on the WGS84 spheroid; region containment is
+ * {@code eintersects_tgeo_geo} between the point's {@code tgeompoint} instant
+ * and the region polygon; distances are {@code geog_distance}. This class holds
+ * no spatial mathematics of its own: it constructs the MEOS inputs and delegates
+ * the computation to libmeos, initialising MEOS once per stream thread.
*/
public final class MEOSBridge {
- /**
- * {@code true} iff MEOS is available on this runtime and the bridge
- * routes through it; {@code false} iff the bridge will use the pure-Java
- * fallbacks.
- */
- public static final boolean MEOS_AVAILABLE;
-
- static {
- boolean enabled =
- Boolean.parseBoolean(System.getProperty("mobilitykafka.meos.enabled", "true"));
- boolean ok = false;
- if (enabled) {
- try {
- functions.meos_initialize();
- ok = true;
- } catch (Throwable t) {
- // libmeos shared object not loadable on this runtime — fall back.
- ok = false;
- }
- }
- MEOS_AVAILABLE = ok;
- }
+ private static final ThreadLocal INITIALIZED =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
private MEOSBridge() {
// utility
}
- // ----------------------------------------------------------------------
- // Public bridge surface — same shape as Haversine + SegmentDistance.
- // ----------------------------------------------------------------------
+ private static void ensureInitializedOnThread() {
+ if (!INITIALIZED.get()) {
+ GeneratedFunctions.meos_initialize_error_handler((level, code, message) -> { });
+ GeneratedFunctions.meos_initialize();
+ INITIALIZED.set(Boolean.TRUE);
+ }
+ }
- /**
- * @return {@code true} if the great-circle distance from {@code (lon1, lat1)}
- * to {@code (lon2, lat2)} on the WGS84 spheroid is at most
- * {@code radiusMetres}. MEOS-backed via {@code geog_dwithin} when
- * available, else pure-Java {@link Haversine#withinMetres}.
- */
+ /** @return {@code true} iff {@code (lon1, lat1)} is within {@code radiusMetres}
+ * of {@code (lon2, lat2)} on the WGS84 spheroid, via MEOS {@code edwithin_tgeo_geo}. */
public static boolean dwithinMetres(double lon1, double lat1,
- double lon2, double lat2,
- double radiusMetres) {
- if (!MEOS_AVAILABLE) {
- return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
- }
- Pointer g1 = pointGeog(lon1, lat1);
- Pointer g2 = pointGeog(lon2, lat2);
- if (g1 == null || g2 == null) {
- return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
- }
- return functions.geog_dwithin(g1, g2, radiusMetres, true);
+ double lon2, double lat2, double radiusMetres) {
+ ensureInitializedOnThread();
+ return GeneratedFunctions.edwithin_tgeo_geo(
+ tgeogInst(lon1, lat1), pointGeog(lon2, lat2), radiusMetres) == 1;
}
- /**
- * @return {@code true} if the spheroidal distance from {@code (pLon, pLat)}
- * to the LineString {@code (s1, s2)} is at most {@code radiusMetres}.
- * MEOS-backed via {@code geog_dwithin} on geographies built from
- * the point and line WKTs, else pure-Java
- * {@link SegmentDistance#withinMetres}.
- */
+ /** @return {@code true} iff {@code (pLon, pLat)} is within {@code radiusMetres}
+ * of the LineString {@code (s1, s2)}, via MEOS {@code edwithin_tgeo_geo}. */
public static boolean dwithinSegmentMetres(double pLon, double pLat,
double s1Lon, double s1Lat,
- double s2Lon, double s2Lat,
- double radiusMetres) {
- if (!MEOS_AVAILABLE) {
- return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
- }
- Pointer pg = pointGeog(pLon, pLat);
- Pointer lg = lineGeog(s1Lon, s1Lat, s2Lon, s2Lat);
- if (pg == null || lg == null) {
- return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
- }
- return functions.geog_dwithin(pg, lg, radiusMetres, true);
+ double s2Lon, double s2Lat, double radiusMetres) {
+ ensureInitializedOnThread();
+ return GeneratedFunctions.edwithin_tgeo_geo(
+ tgeogInst(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat), radiusMetres) == 1;
}
- /**
- * @return the spheroidal distance in metres between two WGS84 points.
- * MEOS-backed via {@code utils.spatial.Haversine.distance}
- * (which calls MEOS' {@code geog_distance} over two POINT
- * geographies) when libmeos is loadable, else pure-Java
- * {@link Haversine#distanceMetres}.
- */
- public static double distanceMetres(double lon1, double lat1,
- double lon2, double lat2) {
- if (!MEOS_AVAILABLE) {
- return Haversine.distanceMetres(lon1, lat1, lon2, lat2);
- }
- return utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2);
+ /** @return {@code true} iff {@code (lon, lat)} lies in the axis-aligned box, via
+ * MEOS {@code eintersects_tgeo_geo} against the box polygon (planar, SRID 4326). */
+ public static boolean intersectsBox(double lon, double lat,
+ double xmin, double ymin, double xmax, double ymax) {
+ ensureInitializedOnThread();
+ return GeneratedFunctions.eintersects_tgeo_geo(
+ tgeomInst(lon, lat), boxPolygon(xmin, ymin, xmax, ymax)) == 1;
}
- /**
- * @return the spheroidal distance in metres from {@code (pLon, pLat)} to
- * the LineString {@code (s1, s2)}. MEOS-backed via
- * {@code utils.spatial.PointToSegment.distance} when libmeos is
- * loadable, else pure-Java
- * {@link SegmentDistance#distanceMetres}.
- */
+ /** @return the WGS84 spheroidal distance in metres between two points, via MEOS {@code geog_distance}. */
+ public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
+ ensureInitializedOnThread();
+ return GeneratedFunctions.geog_distance(pointGeog(lon1, lat1), pointGeog(lon2, lat2));
+ }
+
+ /** @return the WGS84 spheroidal distance in metres from a point to the LineString,
+ * via MEOS {@code geog_distance}. */
public static double distanceSegmentMetres(double pLon, double pLat,
- double s1Lon, double s1Lat,
- double s2Lon, double s2Lat) {
- if (!MEOS_AVAILABLE) {
- return SegmentDistance.distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
- }
- return PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
+ double s1Lon, double s1Lat, double s2Lon, double s2Lat) {
+ ensureInitializedOnThread();
+ return GeneratedFunctions.geog_distance(
+ pointGeog(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat));
}
- // ----------------------------------------------------------------------
- // Internal helpers — WKT → geometry → geography in one MEOS-side step.
- // ----------------------------------------------------------------------
+ private static Pointer tgeogInst(double lon, double lat) {
+ return GeneratedFunctions.tgeogpoint_in(
+ String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat));
+ }
+
+ private static Pointer tgeomInst(double lon, double lat) {
+ return GeneratedFunctions.tgeompoint_in(
+ String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat));
+ }
private static Pointer pointGeog(double lon, double lat) {
- String wkt = String.format("SRID=4326;Point(%.7f %.7f)", lon, lat);
- Pointer g = functions.geom_in(wkt, -1);
- if (g == null) {
- return null;
- }
- return functions.geom_to_geog(g);
+ return GeneratedFunctions.geom_to_geog(
+ GeneratedFunctions.geom_in(String.format("SRID=4326;Point(%.7f %.7f)", lon, lat), -1));
}
- private static Pointer lineGeog(double s1Lon, double s1Lat,
- double s2Lon, double s2Lat) {
- String wkt = String.format("SRID=4326;LineString(%.7f %.7f, %.7f %.7f)",
- s1Lon, s1Lat, s2Lon, s2Lat);
- Pointer g = functions.geom_in(wkt, -1);
- if (g == null) {
- return null;
- }
- return functions.geom_to_geog(g);
+ private static Pointer lineGeog(double s1Lon, double s1Lat, double s2Lon, double s2Lat) {
+ return GeneratedFunctions.geom_to_geog(GeneratedFunctions.geom_in(String.format(
+ "SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", s1Lon, s1Lat, s2Lon, s2Lat), -1));
+ }
+
+ private static Pointer boxPolygon(double xmin, double ymin, double xmax, double ymax) {
+ return GeneratedFunctions.geom_in(String.format(
+ "SRID=4326;Polygon((%.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f))",
+ xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax, xmin, ymin), -1);
}
}
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java
index abd5755..dfdd987 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java
@@ -13,9 +13,7 @@
* eventTime, near)} per incoming GPS event. Same predicate semantics
* as MobilityFlink's {@code Q3ContinuousFunction}.
*
- * Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
- * over WGS84 geographies when libmeos is loadable, {@link Haversine}
- * fallback otherwise.
+ *
Predicate: {@link MEOS {@code edwithin_tgeo_geo} over WGS84 geographies.
*/
public class Q3ContinuousProcessor implements Processor {
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java
index e75cc85..2f47c2c 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java
@@ -22,7 +22,7 @@
* Caller keys the input by a constant so the shared cross-vehicle
* last-known store lives in one subtask. Per event: update last-known.
* Per STREAM_TIME punctuator fire: iterate last-known, evaluate the
- * Haversine radius predicate, forward {@code (currentTick, vehicleId)}
+ * MEOS edwithin_tgeo_geo radius predicate, forward {@code (currentTick, vehicleId)}
* for every near vehicle (sorted by vehicleId).
*/
public class Q3SnapshotProcessor implements Processor {
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java
index 95f069f..788d111 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java
@@ -54,6 +54,6 @@ public void process(Record record) {
}
private boolean inBox(double lon, double lat) {
- return lon >= xmin && lon <= xmax && lat >= ymin && lat <= ymax;
+ return MEOSBridge.intersectsBox(lon, lat, xmin, ymin, xmax, ymax);
}
}
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java
index 27a1383..5de820d 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java
@@ -62,8 +62,7 @@ public void init(ProcessorContext context) {
public void process(Record record) {
BerlinMODTrip trip = record.value();
if (trip == null || trip.getVehicleId() == -1) return;
- boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax
- && trip.getLat() >= ymin && trip.getLat() <= ymax;
+ boolean curr = MEOSBridge.intersectsBox(trip.getLon(), trip.getLat(), xmin, ymin, xmax, ymax);
Boolean prev = wasInside.get(trip.getVehicleId());
boolean prevInside = prev != null && prev;
if (curr && !prevInside) {
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java
index 346ec34..17f6e7b 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java
@@ -57,8 +57,7 @@ public void process(Record record) {
BerlinMODTrip trip = record.value();
if (trip == null || trip.getVehicleId() == -1) return;
long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs;
- boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax
- && trip.getLat() >= ymin && trip.getLat() <= ymax;
+ boolean curr = MEOSBridge.intersectsBox(trip.getLon(), trip.getLat(), xmin, ymin, xmax, ymax);
String s = winState.get(winStart);
// Parse per-vehicle records separated by '|'
StringBuilder rebuilt = new StringBuilder();
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java
index f5f7f4c..0f5a88a 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java
@@ -11,7 +11,7 @@
* "What is each vehicle's cumulative distance travelled so far?"
*
*
Keyed by vehicleId. Per-vehicle state holds the last-known (lon, lat)
- * and the running total in metres. On each event, accumulate the Haversine
+ * and the running total in metres. On each event, accumulate the MEOS geog_distance
* delta and emit the cumulative total.
*
*
State value uses a small string encoding "lon,lat,total" since the
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java
index 7fe87cc..7c7b0b5 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java
@@ -20,7 +20,7 @@
* up to T?"
*
*
Caller keys the input by a constant. State value "lon,lat,total"
- * per vehicleId. Per event: accumulate Haversine delta. Per STREAM_TIME
+ * per vehicleId. Per event: accumulate the MEOS geog_distance delta. Per STREAM_TIME
* punctuator fire: emit {@code (currentTick, vehicleId, total)} for
* every vehicle (sorted by vehicleId), encoded as "vid:total".
*/
diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java
index c540ad4..a148e0f 100644
--- a/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java
+++ b/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java
@@ -20,7 +20,7 @@
* during the window."
*
*
State encodes per-window per-vehicle {@code "vid:lastLon,lastLat,total|..."}.
- * On each event, accumulate Haversine delta from the previous in-window
+ * On each event, accumulate the MEOS geog_distance delta from the previous in-window
* position. On punctuator: emit per-vehicle totals for closed windows.
*/
public class Q6WindowedProcessor implements Processor {
diff --git a/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java b/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java
deleted file mode 100644
index da4efc9..0000000
--- a/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package berlinmod;
-
-/**
- * Distance from a (lon, lat) point to a (lon, lat) line segment, in metres,
- * via a local equirectangular projection centred on the segment midpoint.
- *
- * Pure-Java fallback for {@link MEOSBridge#dwithinSegmentMetres}, used
- * by the BerlinMOD-Q8 streaming scaffold when libmeos is not loadable on
- * the runtime path. The primary point-to-line spatial predicate is
- * {@link MEOSBridge#dwithinSegmentMetres}, which routes through MEOS'
- * {@code geog_dwithin} on a LineString geography when available.
- */
-public final class SegmentDistance {
-
- private static final double EARTH_RADIUS_METRES = 6_371_000.0;
-
- private SegmentDistance() {}
-
- public static double distanceMetres(
- double pLon, double pLat,
- double s1Lon, double s1Lat,
- double s2Lon, double s2Lat) {
- double midLat = (s1Lat + s2Lat) / 2.0;
- double mPerDegLat = Math.toRadians(1.0) * EARTH_RADIUS_METRES;
- double mPerDegLon = mPerDegLat * Math.cos(Math.toRadians(midLat));
-
- double px = pLon * mPerDegLon;
- double py = pLat * mPerDegLat;
- double s1x = s1Lon * mPerDegLon;
- double s1y = s1Lat * mPerDegLat;
- double s2x = s2Lon * mPerDegLon;
- double s2y = s2Lat * mPerDegLat;
-
- double dx = s2x - s1x;
- double dy = s2y - s1y;
- double lenSq = dx * dx + dy * dy;
- if (lenSq == 0.0) {
- return Math.hypot(px - s1x, py - s1y);
- }
- double t = ((px - s1x) * dx + (py - s1y) * dy) / lenSq;
- if (t < 0.0) t = 0.0;
- else if (t > 1.0) t = 1.0;
- double cx = s1x + t * dx;
- double cy = s1y + t * dy;
- return Math.hypot(px - cx, py - cy);
- }
-
- public static boolean withinMetres(
- double pLon, double pLat,
- double s1Lon, double s1Lat,
- double s2Lon, double s2Lat,
- double radiusMetres) {
- return distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) <= radiusMetres;
- }
-}