Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class BerlinMODQ1LocalTest {
private static final long SENTINEL_T2 = T0 + 20_001L;

public static void main(String[] args) {
System.setProperty("mobilitykafka.meos.enabled", "false");
LOG.info("BerlinMODQ1LocalTest starting (TopologyTestDriver, all continuous + snapshot forms)");

Properties props = new Properties();
Expand Down
33 changes: 0 additions & 33 deletions kafka-streams-app/src/main/java/berlinmod/Haversine.java

This file was deleted.

195 changes: 70 additions & 125 deletions kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java
Original file line number Diff line number Diff line change
@@ -1,158 +1,103 @@
package berlinmod;

import functions.functions;
import functions.GeneratedFunctions;
import jnr.ffi.Pointer;
import utils.spatial.PointToSegment;

/**
* Runtime bridge from MobilityKafka BerlinMOD streaming-form predicates to
* MEOS via JMEOS.
* Thin wiring from the BerlinMOD streaming-form predicates to MEOS via JMEOS.
*
* <p>All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold
* flow through this class. When the JMEOS native libmeos shared object is
* present and loadable, each predicate evaluates through MEOS' WGS84
* geography surface ({@code geom_to_geog} + {@code geog_dwithin}). When
* libmeos is not available, each predicate falls back to the corresponding
* pure-Java implementation in {@link Haversine} or {@link SegmentDistance}
* so the BerlinMOD mini-cluster local tests stay runnable on systems
* without a MEOS install.
*
* <p>The fallback is gated by the {@link #MEOS_AVAILABLE} static flag, set
* once at class-load time:
* <ul>
* <li>{@code -Dmobilitykafka.meos.enabled=false} forces the pure-Java path
* even when libmeos is present (used by {@code BerlinMODQ1LocalTest}).
* <li>Otherwise {@code MEOS_AVAILABLE} is {@code true} iff
* {@code functions.meos_initialize()} returns without throwing.
* </ul>
* <p>Every spatial predicate evaluates through MEOS: the within-distance
* predicate is the canonical temporal operator {@code edwithin_tgeo_geo} —
* ever-within between the vehicle's {@code tgeogpoint} instant and the query
* geography, in metres on the WGS84 spheroid; region containment is
* {@code eintersects_tgeo_geo} between the point's {@code tgeompoint} instant
* and the region polygon; distances are {@code geog_distance}. This class holds
* no spatial mathematics of its own: it constructs the MEOS inputs and delegates
* the computation to libmeos, initialising MEOS once per stream thread.
*/
public final class MEOSBridge {

/**
* {@code true} iff MEOS is available on this runtime and the bridge
* routes through it; {@code false} iff the bridge will use the pure-Java
* fallbacks.
*/
public static final boolean MEOS_AVAILABLE;

static {
boolean enabled =
Boolean.parseBoolean(System.getProperty("mobilitykafka.meos.enabled", "true"));
boolean ok = false;
if (enabled) {
try {
functions.meos_initialize();
ok = true;
} catch (Throwable t) {
// libmeos shared object not loadable on this runtime — fall back.
ok = false;
}
}
MEOS_AVAILABLE = ok;
}
private static final ThreadLocal<Boolean> INITIALIZED =
ThreadLocal.withInitial(() -> Boolean.FALSE);

private MEOSBridge() {
// utility
}

// ----------------------------------------------------------------------
// Public bridge surface — same shape as Haversine + SegmentDistance.
// ----------------------------------------------------------------------
private static void ensureInitializedOnThread() {
if (!INITIALIZED.get()) {
GeneratedFunctions.meos_initialize_error_handler((level, code, message) -> { });
GeneratedFunctions.meos_initialize();
INITIALIZED.set(Boolean.TRUE);
}
}

/**
* @return {@code true} if the great-circle distance from {@code (lon1, lat1)}
* to {@code (lon2, lat2)} on the WGS84 spheroid is at most
* {@code radiusMetres}. MEOS-backed via {@code geog_dwithin} when
* available, else pure-Java {@link Haversine#withinMetres}.
*/
/** @return {@code true} iff {@code (lon1, lat1)} is within {@code radiusMetres}
* of {@code (lon2, lat2)} on the WGS84 spheroid, via MEOS {@code edwithin_tgeo_geo}. */
public static boolean dwithinMetres(double lon1, double lat1,
double lon2, double lat2,
double radiusMetres) {
if (!MEOS_AVAILABLE) {
return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
}
Pointer g1 = pointGeog(lon1, lat1);
Pointer g2 = pointGeog(lon2, lat2);
if (g1 == null || g2 == null) {
return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
}
return functions.geog_dwithin(g1, g2, radiusMetres, true);
double lon2, double lat2, double radiusMetres) {
ensureInitializedOnThread();
return GeneratedFunctions.edwithin_tgeo_geo(
tgeogInst(lon1, lat1), pointGeog(lon2, lat2), radiusMetres) == 1;
}

/**
* @return {@code true} if the spheroidal distance from {@code (pLon, pLat)}
* to the LineString {@code (s1, s2)} is at most {@code radiusMetres}.
* MEOS-backed via {@code geog_dwithin} on geographies built from
* the point and line WKTs, else pure-Java
* {@link SegmentDistance#withinMetres}.
*/
/** @return {@code true} iff {@code (pLon, pLat)} is within {@code radiusMetres}
* of the LineString {@code (s1, s2)}, via MEOS {@code edwithin_tgeo_geo}. */
public static boolean dwithinSegmentMetres(double pLon, double pLat,
double s1Lon, double s1Lat,
double s2Lon, double s2Lat,
double radiusMetres) {
if (!MEOS_AVAILABLE) {
return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
}
Pointer pg = pointGeog(pLon, pLat);
Pointer lg = lineGeog(s1Lon, s1Lat, s2Lon, s2Lat);
if (pg == null || lg == null) {
return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
}
return functions.geog_dwithin(pg, lg, radiusMetres, true);
double s2Lon, double s2Lat, double radiusMetres) {
ensureInitializedOnThread();
return GeneratedFunctions.edwithin_tgeo_geo(
tgeogInst(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat), radiusMetres) == 1;
}

/**
* @return the spheroidal distance in metres between two WGS84 points.
* MEOS-backed via {@code utils.spatial.Haversine.distance}
* (which calls MEOS' {@code geog_distance} over two POINT
* geographies) when libmeos is loadable, else pure-Java
* {@link Haversine#distanceMetres}.
*/
public static double distanceMetres(double lon1, double lat1,
double lon2, double lat2) {
if (!MEOS_AVAILABLE) {
return Haversine.distanceMetres(lon1, lat1, lon2, lat2);
}
return utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2);
/** @return {@code true} iff {@code (lon, lat)} lies in the axis-aligned box, via
* MEOS {@code eintersects_tgeo_geo} against the box polygon (planar, SRID 4326). */
public static boolean intersectsBox(double lon, double lat,
double xmin, double ymin, double xmax, double ymax) {
ensureInitializedOnThread();
return GeneratedFunctions.eintersects_tgeo_geo(
tgeomInst(lon, lat), boxPolygon(xmin, ymin, xmax, ymax)) == 1;
}

/**
* @return the spheroidal distance in metres from {@code (pLon, pLat)} to
* the LineString {@code (s1, s2)}. MEOS-backed via
* {@code utils.spatial.PointToSegment.distance} when libmeos is
* loadable, else pure-Java
* {@link SegmentDistance#distanceMetres}.
*/
/** @return the WGS84 spheroidal distance in metres between two points, via MEOS {@code geog_distance}. */
public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
ensureInitializedOnThread();
return GeneratedFunctions.geog_distance(pointGeog(lon1, lat1), pointGeog(lon2, lat2));
}

/** @return the WGS84 spheroidal distance in metres from a point to the LineString,
* via MEOS {@code geog_distance}. */
public static double distanceSegmentMetres(double pLon, double pLat,
double s1Lon, double s1Lat,
double s2Lon, double s2Lat) {
if (!MEOS_AVAILABLE) {
return SegmentDistance.distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
}
return PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
double s1Lon, double s1Lat, double s2Lon, double s2Lat) {
ensureInitializedOnThread();
return GeneratedFunctions.geog_distance(
pointGeog(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat));
}

// ----------------------------------------------------------------------
// Internal helpers — WKT → geometry → geography in one MEOS-side step.
// ----------------------------------------------------------------------
private static Pointer tgeogInst(double lon, double lat) {
return GeneratedFunctions.tgeogpoint_in(
String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat));
}

private static Pointer tgeomInst(double lon, double lat) {
return GeneratedFunctions.tgeompoint_in(
String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat));
}

private static Pointer pointGeog(double lon, double lat) {
String wkt = String.format("SRID=4326;Point(%.7f %.7f)", lon, lat);
Pointer g = functions.geom_in(wkt, -1);
if (g == null) {
return null;
}
return functions.geom_to_geog(g);
return GeneratedFunctions.geom_to_geog(
GeneratedFunctions.geom_in(String.format("SRID=4326;Point(%.7f %.7f)", lon, lat), -1));
}

private static Pointer lineGeog(double s1Lon, double s1Lat,
double s2Lon, double s2Lat) {
String wkt = String.format("SRID=4326;LineString(%.7f %.7f, %.7f %.7f)",
s1Lon, s1Lat, s2Lon, s2Lat);
Pointer g = functions.geom_in(wkt, -1);
if (g == null) {
return null;
}
return functions.geom_to_geog(g);
private static Pointer lineGeog(double s1Lon, double s1Lat, double s2Lon, double s2Lat) {
return GeneratedFunctions.geom_to_geog(GeneratedFunctions.geom_in(String.format(
"SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", s1Lon, s1Lat, s2Lon, s2Lat), -1));
}

private static Pointer boxPolygon(double xmin, double ymin, double xmax, double ymax) {
return GeneratedFunctions.geom_in(String.format(
"SRID=4326;Polygon((%.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f))",
xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax, xmin, ymin), -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
* eventTime, near)} per incoming GPS event. Same predicate semantics
* as MobilityFlink's {@code Q3ContinuousFunction}.
*
* <p>Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
* over WGS84 geographies when libmeos is loadable, {@link Haversine}
* fallback otherwise.
* <p>Predicate: {@link MEOS {@code edwithin_tgeo_geo} over WGS84 geographies.
*/
public class Q3ContinuousProcessor implements Processor<Integer, BerlinMODTrip, Integer, Boolean> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* <p>Caller keys the input by a constant so the shared cross-vehicle
* last-known store lives in one subtask. Per event: update last-known.
* Per STREAM_TIME punctuator fire: iterate last-known, evaluate the
* Haversine radius predicate, forward {@code (currentTick, vehicleId)}
* MEOS edwithin_tgeo_geo radius predicate, forward {@code (currentTick, vehicleId)}
* for every near vehicle (sorted by vehicleId).
*/
public class Q3SnapshotProcessor implements Processor<Integer, BerlinMODTrip, Long, Integer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ public void process(Record<Integer, BerlinMODTrip> record) {
}

private boolean inBox(double lon, double lat) {
return lon >= xmin && lon <= xmax && lat >= ymin && lat <= ymax;
return MEOSBridge.intersectsBox(lon, lat, xmin, ymin, xmax, ymax);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ public void init(ProcessorContext<Long, String> context) {
public void process(Record<Integer, BerlinMODTrip> record) {
BerlinMODTrip trip = record.value();
if (trip == null || trip.getVehicleId() == -1) return;
boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax
&& trip.getLat() >= ymin && trip.getLat() <= ymax;
boolean curr = MEOSBridge.intersectsBox(trip.getLon(), trip.getLat(), xmin, ymin, xmax, ymax);
Boolean prev = wasInside.get(trip.getVehicleId());
boolean prevInside = prev != null && prev;
if (curr && !prevInside) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public void process(Record<Integer, BerlinMODTrip> record) {
BerlinMODTrip trip = record.value();
if (trip == null || trip.getVehicleId() == -1) return;
long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs;
boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax
&& trip.getLat() >= ymin && trip.getLat() <= ymax;
boolean curr = MEOSBridge.intersectsBox(trip.getLon(), trip.getLat(), xmin, ymin, xmax, ymax);
String s = winState.get(winStart);
// Parse per-vehicle records separated by '|'
StringBuilder rebuilt = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* <p><i>"What is each vehicle's cumulative distance travelled so far?"</i>
*
* <p>Keyed by vehicleId. Per-vehicle state holds the last-known (lon, lat)
* and the running total in metres. On each event, accumulate the Haversine
* and the running total in metres. On each event, accumulate the MEOS geog_distance
* delta and emit the cumulative total.
*
* <p>State value uses a small string encoding "lon,lat,total" since the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* up to T?"</i>
*
* <p>Caller keys the input by a constant. State value "lon,lat,total"
* per vehicleId. Per event: accumulate Haversine delta. Per STREAM_TIME
* per vehicleId. Per event: accumulate the MEOS geog_distance delta. Per STREAM_TIME
* punctuator fire: emit {@code (currentTick, vehicleId, total)} for
* every vehicle (sorted by vehicleId), encoded as "vid:total".
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* during the window."</i>
*
* <p>State encodes per-window per-vehicle {@code "vid:lastLon,lastLat,total|..."}.
* On each event, accumulate Haversine delta from the previous in-window
* On each event, accumulate the MEOS geog_distance delta from the previous in-window
* position. On punctuator: emit per-vehicle totals for closed windows.
*/
public class Q6WindowedProcessor implements Processor<Integer, BerlinMODTrip, Long, String> {
Expand Down
Loading