+```
+
+## Results — real BerlinMOD instants (216 075 events)
+
+| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) |
+|---|---:|---:|---:|---:|
+| Q1-continuous | 216075 | 5 | 2508 | 86,154 |
+| Q1-windowed | 216075 | 86 | 1294 | 166,982 |
+| Q1-snapshot | 216075 | 274 | 1056 | 204,616 |
+| Q2-continuous | 216075 | 61170 | 1074 | 201,187 |
+| Q2-windowed | 216075 | 50 | 1027 | 210,394 |
+| Q2-snapshot | 216075 | 71 | 985 | 219,365 |
+| Q3-continuous | 216075 | 216075 | 2928 | 73,796 |
+| Q3-windowed | 216075 | 86 | 2507 | 86,189 |
+| Q3-snapshot | 216075 | 0 | 926 | 233,342 |
+| Q4-continuous | 216075 | 62 | 3254 | 66,403 |
+| Q4-windowed | 216075 | 98 | 3234 | 66,814 |
+| Q4-snapshot | 216075 | 1944 | 3223 | 67,042 |
+| Q5-continuous | 216075 | 73063 | 9161 | 23,586 |
+| Q5-windowed | 216075 | 6 | 954 | 226,494 |
+| Q5-snapshot | 216075 | 0 | 915 | 236,148 |
+| Q6-continuous | 216075 | 216075 | 2382 | 90,712 |
+| Q6-windowed | 216075 | 203 | 2637 | 81,940 |
+| Q6-snapshot | 216075 | 274 | 2214 | 97,595 |
+| Q7-continuous | 216075 | 5 | 3973 | 54,386 |
+| Q7-windowed | 216075 | 53 | 5004 | 43,180 |
+| Q7-snapshot | 216075 | 288 | 3931 | 54,967 |
+| Q8-continuous | 216075 | 216075 | 2883 | 74,948 |
+| Q8-windowed | 216075 | 86 | 2864 | 75,445 |
+| Q8-snapshot | 216075 | 126 | 928 | 232,839 |
+| Q9-continuous | 216075 | 107870 | 1858 | 116,294 |
+| Q9-windowed | 216075 | 22 | 924 | 233,847 |
+| Q9-snapshot | 216075 | 95 | 992 | 217,818 |
+
+## Parity — streaming continuous form ≡ batch MEOS predicate
+
+The continuous form emits `predicate(event)` for every event, so it is checked
+event-for-event against a batch pass over the same corpus through the same
+`MEOSBridge` call ([`BerlinMODParity`](../src/main/java/berlinmod/BerlinMODParity.java)).
+Both spatial-membership queries match exactly.
+
+| Query | Events | Streaming-true | Batch-true | Mismatches | Parity |
+|---|---:|---:|---:|---:|---|
+| Q3 (within `d` of `P`) | 216075 | 56086 | 56086 | 0 | exact |
+| Q8 (within `d` of segment) | 216075 | 118498 | 118498 | 0 | exact |
+
+## Characteristics
+
+Q5-continuous enumerates every meeting pair across all vehicles on each event
+(O(V²) per event, keyed to a single subtask), which makes it the lowest-throughput
+cell of the matrix (23,586 ev/s). The snapshot form is sampled: it evaluates each
+vehicle's last-known position at tick instants only. A near-`P` snapshot can
+therefore be empty when no vehicle is within `d` of `P` at a tick boundary, even
+though the continuous form reports near-`P` events between boundaries.
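Review note on the Characteristics paragraph: the sampling effect can be reproduced in a few lines. The sketch below is not the benchmark's code. It assumes a single vehicle, planar coordinates in metres, and a hypothetical `near` predicate standing in for the MEOS call; `Event`, `drivePast` and the tick values are illustrative names and numbers only. It compares a per-event (continuous) evaluation with an evaluation of the last-known position at tick instants.

```java
import java.util.ArrayList;
import java.util.List;

final class SnapshotSamplingSketch {

    /** One position report: time in ms, planar x/y in metres (assumed units). */
    static final class Event {
        final long t;
        final double x;
        final double y;

        Event(long t, double x, double y) {
            this.t = t;
            this.x = x;
            this.y = y;
        }
    }

    /** Hypothetical stand-in for the MEOS within-distance predicate. */
    static boolean near(Event e, double px, double py, double d) {
        return Math.hypot(e.x - px, e.y - py) <= d;
    }

    /** Continuous form: one verdict per event, counted when true. */
    static long continuousHits(List<Event> events, double px, double py, double d) {
        long hits = 0;
        for (Event e : events) {
            if (near(e, px, py, d)) {
                hits++;
            }
        }
        return hits;
    }

    /** Snapshot form: test only the last-known position at each tick instant. */
    static long snapshotHits(List<Event> events, long tickMillis, long endMillis,
                             double px, double py, double d) {
        long hits = 0;
        int i = 0;
        Event last = null;
        for (long tick = tickMillis; tick <= endMillis; tick += tickMillis) {
            // Advance to the most recent event at or before this tick.
            while (i < events.size() && events.get(i).t <= tick) {
                last = events.get(i++);
            }
            if (last != null && near(last, px, py, d)) {
                hits++;
            }
        }
        return hits;
    }

    /** A vehicle that passes close to P = (0, 0) only between tick boundaries. */
    static List<Event> drivePast() {
        List<Event> events = new ArrayList<>();
        events.add(new Event(100, 500, 0));
        events.add(new Event(400, 50, 0));
        events.add(new Event(600, 20, 0));
        events.add(new Event(900, 500, 0));
        events.add(new Event(1400, 30, 0));
        events.add(new Event(1900, 800, 0));
        return events;
    }

    public static void main(String[] args) {
        List<Event> events = drivePast();
        System.out.println("continuous hits: " + continuousHits(events, 0, 0, 100));
        System.out.println("snapshot hits:   " + snapshotHits(events, 1000, 2000, 0, 0, 100));
    }
}
```

With a 1000 ms tick the vehicle is far from `P` at both boundaries, so the snapshot count is zero while the continuous count is not. A finer tick would narrow the gap at the cost of more evaluations.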
diff --git a/flink-processor/docs/parity-status.md b/flink-processor/docs/parity-status.md
new file mode 100644
index 0000000..dd519ee
--- /dev/null
+++ b/flink-processor/docs/parity-status.md
@@ -0,0 +1,46 @@
+# MobilityFlink parity status — MEOS surface audit
+
+Generated 2026-05-29 by `tools/parity/parity_audit.py`.
+
+The MobilityFlink MEOS facade (`org.mobilitydb.flink.meos.MeosOps*`) exposes MEOS C functions to Flink through JMEOS. This audit measures, per type family, the share of the **MEOS public C API** that the facade exposes and that JMEOS binds.
+
+**Headline.** The facade exposes **2296 of the 2296 JMEOS-bound public MEOS functions (100.0%)**. The MEOS public surface (`meos/include/meos*.h`, excluding internal headers) is 2297 functions, of which JMEOS binds 2296. No bindable function is missing from the facade (see §3).
+
+Coverage is **static**: a function counts as covered when the facade declares a method of the same name and arity that delegates to a JMEOS export.
+
+Per-family runtime behaviour is asserted by `src/test/java/org/mobilitydb/flink/meos/MeosFacadeSmokeTest.java`, which constructs and reads back a value in the core, geo, cbuffer, npoint and pose families through the facade against libmeos. The cbuffer, npoint and pose families require a libmeos built with the extended modules (`-DCBUFFER=ON -DNPOINT=ON -DPOSE=ON -DRGEO=ON`); the stock library carries the core and geo surfaces only.
+
+## 1. Reference surface and method
+
+- **Denominator**: distinct function names declared `extern` in the MEOS public headers `meos.h`, `meos_geo.h`, `meos_cbuffer.h`, `meos_npoint.h`, `meos_pose.h`, `meos_rgeo.h`, `meos_h3.h`. Internal headers (`meos_internal*.h`) are excluded.
+
+- **Numerator**: `public static` methods on the generated `MeosOps*` facade whose name is also a `functions.GeneratedFunctions` export in the bundled JMEOS jar.
+
+- **JMEOS jar**: `jar/JMEOS.jar` exports 2916 static methods.
+
+## 2. Per-family coverage of the public MEOS surface
+
+| Family (header) | Public ∩ JMEOS | Exposed | Missing | Coverage |
+|---|---:|---:|---:|---:|
+| core temporal / set / span / spanset / tbox (`meos.h`) | 1343 | 1343 | 0 | 100.0% |
+| geo (tgeo / tpoint / stbox) (`meos_geo.h`) | 421 | 421 | 0 | 100.0% |
+| cbuffer (`meos_cbuffer.h`) | 175 | 175 | 0 | 100.0% |
+| npoint (`meos_npoint.h`) | 119 | 119 | 0 | 100.0% |
+| pose (`meos_pose.h`) | 101 | 101 | 0 | 100.0% |
+| rgeo (`meos_rgeo.h`) | 68 | 68 | 0 | 100.0% |
+| h3 / th3index (`meos_h3.h`) | 69 | 69 | 0 | 100.0% |
+| **total** | **2296** | **2296** | **0** | **100.0%** |
+
+## 3. Bindable MEOS functions not exposed by the facade
+
+No function is present in the public MEOS headers and bound by JMEOS but missing from the generated facade.
+
+## 4. MobilityDB SQL-surface cross-check
+
+The facade is also matched against the underlying MEOS C symbol of each addressable `CREATE FUNCTION` in `mobilitydb/sql/**/*.in.sql` (PG-only sections and helper symbols bucketed out; 876 out-of-scope, 113 SQL/plpgsql-composed functions with no single C symbol). Functions the SQL layer implements through the internal MEOS headers (`meos_internal*.h`) are exposed via `MeosOpsSqlSurface`.
+
+- Addressable distinct C symbols: **1336**; bound by JMEOS: **1068**; exposed by the facade: **1068** (100.0% of the JMEOS-bindable SQL surface).
+
+- The remaining **268** addressable C symbols are not exported by JMEOS under the name the SQL layer's extension wrapper uses; the wrapper names differ from the MEOS function names they call.
+
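Review note on the coverage rule in parity-status.md: "same name and arity" is easy to picture with reflection. The sketch below is an illustration only, not `tools/parity/parity_audit.py`. `Exports` and `Facade` are hypothetical stand-ins for the JMEOS export class and a generated `MeosOps*` class, and the check covers name and arity but not whether the facade method actually delegates.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

final class StaticCoverageSketch {

    /** Hypothetical stand-in for the JMEOS export class. */
    static final class Exports {
        public static int alpha(int a) { return a; }
        public static int beta(int a, int b) { return a + b; }
        public static int gamma() { return 0; }
    }

    /** Hypothetical stand-in for a generated facade class. */
    static final class Facade {
        public static int alpha(int a) { return Exports.alpha(a); }
        public static int beta(int a, int b) { return Exports.beta(a, b); }
    }

    /** Collects "name/arity" keys for every public static method of a class. */
    static Set<String> signatures(Class<?> type) {
        Set<String> out = new TreeSet<>();
        for (Method m : type.getDeclaredMethods()) {
            int mod = m.getModifiers();
            if (Modifier.isPublic(mod) && Modifier.isStatic(mod) && !m.isSynthetic()) {
                out.add(m.getName() + "/" + m.getParameterCount());
            }
        }
        return out;
    }

    /** Exported signatures that the facade does not declare. */
    static Set<String> missing(Class<?> exports, Class<?> facade) {
        Set<String> gap = new TreeSet<>(signatures(exports));
        gap.removeAll(signatures(facade));
        return gap;
    }

    /** Share of exported signatures that the facade declares, in percent. */
    static double coveragePercent(Class<?> exports, Class<?> facade) {
        Set<String> all = signatures(exports);
        Set<String> covered = new TreeSet<>(all);
        covered.retainAll(signatures(facade));
        return all.isEmpty() ? 100.0 : 100.0 * covered.size() / all.size();
    }

    public static void main(String[] args) {
        System.out.println("missing:  " + missing(Exports.class, Facade.class));
        System.out.printf("coverage: %.1f%%%n", coveragePercent(Exports.class, Facade.class));
    }
}
```

Because the comparison is static, it cannot tell whether a covered method works at runtime. The audit leaves that to the smoke test it cites.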
diff --git a/flink-processor/jar/JMEOS.jar b/flink-processor/jar/JMEOS.jar
index 3c22044..b970140 100644
Binary files a/flink-processor/jar/JMEOS.jar and b/flink-processor/jar/JMEOS.jar differ
diff --git a/flink-processor/pom.xml b/flink-processor/pom.xml
index f4c288f..99ae9d0 100755
--- a/flink-processor/pom.xml
+++ b/flink-processor/pom.xml
@@ -216,4 +216,318 @@
+
+
+
+
+ cbuffer-exclude-unset
+
+ !CBUFFER
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTCbuffer.java
+ **/meos/MeosOpsFreeCbuffer.java
+ **/meos/MeosOpsCbufferSet.java
+
+
+ **/MeosCbufferSmokeTest.java
+
+
+
+
+
+
+
+ cbuffer-exclude-off
+
+ CBUFFER OFF
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTCbuffer.java
+ **/meos/MeosOpsFreeCbuffer.java
+ **/meos/MeosOpsCbufferSet.java
+
+
+ **/MeosCbufferSmokeTest.java
+
+
+
+
+
+
+
+ cbuffer-exclude-zero
+
+ CBUFFER 0
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTCbuffer.java
+ **/meos/MeosOpsFreeCbuffer.java
+ **/meos/MeosOpsCbufferSet.java
+
+
+ **/MeosCbufferSmokeTest.java
+
+
+
+
+
+
+
+ pose-exclude-unset
+
+ !POSE
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTPose.java
+ **/meos/MeosOpsFreePose.java
+ **/meos/MeosOpsPoseSet.java
+
+
+ **/MeosPoseSmokeTest.java
+
+
+
+
+
+
+
+ pose-exclude-off
+
+ POSE OFF
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTPose.java
+ **/meos/MeosOpsFreePose.java
+ **/meos/MeosOpsPoseSet.java
+
+
+ **/MeosPoseSmokeTest.java
+
+
+
+
+
+
+
+ pose-exclude-zero
+
+ POSE 0
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTPose.java
+ **/meos/MeosOpsFreePose.java
+ **/meos/MeosOpsPoseSet.java
+
+
+ **/MeosPoseSmokeTest.java
+
+
+
+
+
+
+
+ rgeo-exclude-unset
+
+ !RGEO
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTRGeometry.java
+ **/meos/MeosOpsTRGeometryInst.java
+ **/meos/MeosOpsFreeRgeo.java
+
+
+
+
+
+
+
+ rgeo-exclude-off
+
+ RGEO OFF
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTRGeometry.java
+ **/meos/MeosOpsTRGeometryInst.java
+ **/meos/MeosOpsFreeRgeo.java
+
+
+
+
+
+
+
+ rgeo-exclude-zero
+
+ RGEO 0
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTRGeometry.java
+ **/meos/MeosOpsTRGeometryInst.java
+ **/meos/MeosOpsFreeRgeo.java
+
+
+
+
+
+
+
+ h3-exclude-unset
+
+ !H3
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTh3index.java
+ **/meos/MeosOpsFreeH3.java
+
+
+
+
+
+
+
+ h3-exclude-off
+
+ H3 OFF
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTh3index.java
+ **/meos/MeosOpsFreeH3.java
+
+
+
+
+
+
+
+ h3-exclude-zero
+
+ H3 0
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTh3index.java
+ **/meos/MeosOpsFreeH3.java
+
+
+
+
+
+
+
+ npoint-exclude-off
+
+ NPOINT OFF
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTNpoint.java
+ **/meos/MeosOpsTNpointInst.java
+ **/meos/MeosOpsFreeNpoint.java
+ **/meos/MeosOpsNpointSet.java
+
+
+ **/MeosNpointSmokeTest.java
+
+
+
+
+
+
+
+ npoint-exclude-zero
+
+ NPOINT 0
+
+
+
+
+ maven-compiler-plugin
+
+
+ **/meos/MeosOpsTNpoint.java
+ **/meos/MeosOpsTNpointInst.java
+ **/meos/MeosOpsFreeNpoint.java
+ **/meos/MeosOpsNpointSet.java
+
+
+ **/MeosNpointSmokeTest.java
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/flink-processor/src/main/java/aisdata/Main.java b/flink-processor/src/main/java/aisdata/Main.java
index 148caa0..5a62273 100644
--- a/flink-processor/src/main/java/aisdata/Main.java
+++ b/flink-processor/src/main/java/aisdata/Main.java
@@ -56,7 +56,9 @@ public static void main(String[] args) throws Exception {
// Initialize MEOS with proper error handling
try {
logger.info("Initializing MEOS library");
- functions.meos_initialize("UTC", errorHandler);
+ // JMEOS 1.4 split: no-arg meos_initialize() + separate tz + error-handler entry points
+ functions.meos_initialize();
+ functions.meos_initialize_timezone("UTC");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java b/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java
index e047143..d5e1adb 100644
--- a/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java
+++ b/flink-processor/src/main/java/aisdata/TrajectoryWindowFunction.java
@@ -31,7 +31,9 @@ public class TrajectoryWindowFunction extends
public void open(Configuration parameters) throws Exception {
super.open(parameters);
errorHandler = new error_handler(); // Initialize error handler here
- functions.meos_initialize("UTC", errorHandler);
+ // JMEOS 1.4 split: no-arg meos_initialize() + separate tz + error-handler entry points
+ functions.meos_initialize();
+ functions.meos_initialize_timezone("UTC");
logger.info("MEOS initialized in TrajectoryWindowFunction.open()");
}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODBenchmark.java b/flink-processor/src/main/java/berlinmod/BerlinMODBenchmark.java
new file mode 100644
index 0000000..3bd3a3a
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODBenchmark.java
@@ -0,0 +1,153 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
+
+/**
+ * Throughput benchmark for the BerlinMOD-9 × 3-form streaming matrix.
+ *
+ * Runs all 27 cells (9 queries × {continuous, windowed, snapshot})
+ * on the Flink local mini-cluster, with the spatial predicates evaluating
+ * through MEOS (see {@link MEOSBridge}). Each cell runs its own job terminated by
+ * a counting sink; the harness records input events, output rows, wall-clock,
+ * and throughput (events per second), then prints a Markdown results table.
+ *
+ * <p>The corpus is either the real BerlinMOD instants ({@code --csv}) or a
+ * deterministic synthetic corpus ({@code --vehicles}/{@code --events}); the
+ * per-query parameters and the window/tick granularity are derived from the
+ * corpus by {@link BerlinMODCorpus}.
+ *
+ * Usage (from {@code flink-processor/}, with an extended libmeos on the
+ * loader path and the Flink-on-Java-21 {@code --add-opens} flags):
+ *
+ * java … berlinmod.BerlinMODBenchmark --csv <berlinmod_instants.csv> [--max N] [--only Q3]
+ * java … berlinmod.BerlinMODBenchmark --vehicles 50 --events 600 [--only continuous]
+ *
+ */
+public final class BerlinMODBenchmark {
+
+ private static final ConcurrentHashMap<String, LongAdder> COUNTS = new ConcurrentHashMap<>();
+
+ private BerlinMODBenchmark() { /* utility */ }
+
+ public static void main(String[] args) throws Exception {
+ String csv = null, only = null;
+ int maxRows = 0, vehicles = 50, events = 600;
+ for (int i = 0; i < args.length; i++) {
+ switch (args[i]) {
+ case "--csv": csv = args[++i]; break;
+ case "--max": maxRows = Integer.parseInt(args[++i]); break;
+ case "--vehicles": vehicles = Integer.parseInt(args[++i]); break;
+ case "--events": events = Integer.parseInt(args[++i]); break;
+ case "--only": only = args[++i]; break;
+ default: break;
+ }
+ }
+
+ List<BerlinMODTrip> corpus = csv != null
+ ? BerlinMODCorpus.fromInstantsCsv(csv, maxRows)
+ : BerlinMODCorpus.synthetic(vehicles, events);
+ int n = corpus.size();
+ BerlinMODCorpus.Params p = BerlinMODCorpus.derive(corpus);
+ System.out.printf("Corpus: %s, %d events; window=%ds tick=%dms; P=(%.5f,%.5f) targets=%d/%d/%d%n",
+ csv != null ? "real BerlinMOD instants" : "synthetic",
+ n, p.windowSeconds, p.snapshotTickMillis, p.pLon, p.pLat, p.targetId, p.xId, p.yId);
+
+ Map, DataStream>>> cells = new LinkedHashMap<>();
+ cells.put("Q1-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q1ContinuousFunction()));
+ cells.put("Q1-windowed", t -> t.windowAll(tumble(p)).process(new Q1WindowedFunction()));
+ cells.put("Q1-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q1SnapshotFunction(p.snapshotTickMillis)));
+ cells.put("Q2-continuous", t -> t.process(new Q2ContinuousFunction(p.targetId)));
+ cells.put("Q2-windowed", t -> t.windowAll(tumble(p)).process(new Q2WindowedFunction(p.targetId)));
+ cells.put("Q2-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q2SnapshotFunction(p.targetId, p.snapshotTickMillis)));
+ cells.put("Q3-continuous", t -> t.process(new Q3ContinuousFunction(p.pLon, p.pLat, p.radiusMetres)));
+ cells.put("Q3-windowed", t -> t.windowAll(tumble(p)).process(new Q3WindowedFunction(p.pLon, p.pLat, p.radiusMetres)));
+ cells.put("Q3-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q3SnapshotFunction(p.pLon, p.pLat, p.radiusMetres, p.snapshotTickMillis)));
+ cells.put("Q4-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q4ContinuousFunction(p.xmin, p.ymin, p.xmax, p.ymax)));
+ cells.put("Q4-windowed", t -> t.windowAll(tumble(p)).process(new Q4WindowedFunction(p.xmin, p.ymin, p.xmax, p.ymax)));
+ cells.put("Q4-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q4SnapshotFunction(p.xmin, p.ymin, p.xmax, p.ymax, p.snapshotTickMillis)));
+ cells.put("Q5-continuous", t -> t.keyBy(x -> 0).process(new Q5ContinuousFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres)));
+ cells.put("Q5-windowed", t -> t.windowAll(tumble(p)).process(new Q5WindowedFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres)));
+ cells.put("Q5-snapshot", t -> t.keyBy(x -> 0).process(new Q5SnapshotFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres, p.snapshotTickMillis)));
+ cells.put("Q6-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q6ContinuousFunction()));
+ cells.put("Q6-windowed", t -> t.keyBy(BerlinMODTrip::getVehicleId).window(tumble(p)).process(new Q6WindowedFunction()));
+ cells.put("Q6-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q6SnapshotFunction(p.snapshotTickMillis)));
+ cells.put("Q7-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q7ContinuousFunction(p.pois)));
+ cells.put("Q7-windowed", t -> t.windowAll(tumble(p)).process(new Q7WindowedFunction(p.pois)));
+ cells.put("Q7-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q7SnapshotFunction(p.pois, p.snapshotTickMillis)));
+ cells.put("Q8-continuous", t -> t.process(new Q8ContinuousFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres)));
+ cells.put("Q8-windowed", t -> t.windowAll(tumble(p)).process(new Q8WindowedFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres)));
+ cells.put("Q8-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q8SnapshotFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres, p.snapshotTickMillis)));
+ cells.put("Q9-continuous", t -> t.keyBy(x -> 0).process(new Q9ContinuousFunction(p.xId, p.yId)));
+ cells.put("Q9-windowed", t -> t.windowAll(tumble(p)).process(new Q9WindowedFunction(p.xId, p.yId)));
+ cells.put("Q9-snapshot", t -> t.keyBy(x -> 0).process(new Q9SnapshotFunction(p.xId, p.yId, p.snapshotTickMillis)));
+
+ List<String[]> rows = new ArrayList<>();
+ for (Map.Entry, DataStream>>> cell : cells.entrySet()) {
+ if (only != null && !cell.getKey().contains(only)) {
+ continue;
+ }
+ long[] r = runCell(cell.getKey(), cell.getValue(), corpus);
+ double secs = r[1] / 1000.0;
+ double tput = secs > 0 ? n / secs : 0;
+ rows.add(new String[]{cell.getKey(), String.valueOf(n), String.valueOf(r[0]),
+ String.valueOf(r[1]), String.format("%,.0f", tput)});
+ System.out.printf(" %-14s out=%-8d %6d ms %,.0f ev/s%n", cell.getKey(), r[0], r[1], tput);
+ }
+
+ System.out.println();
+ System.out.println("| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) |");
+ System.out.println("|---|---:|---:|---:|---:|");
+ for (String[] r : rows) {
+ System.out.printf("| %s | %s | %s | %s | %s |%n", r[0], r[1], r[2], r[3], r[4]);
+ }
+ }
+
+ /** @return {outputRows, wallMillis} for one cell. */
+ private static long[] runCell(String name,
+ Function, DataStream>> wiring,
+ List<BerlinMODTrip> corpus) throws Exception {
+ COUNTS.put(name, new LongAdder());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ DataStream<BerlinMODTrip> trips = env.fromCollection(corpus)
+ .assignTimestampsAndWatermarks(WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, ts) -> e.getTimestamp()));
+ @SuppressWarnings("unchecked")
+ DataStream out = (DataStream) wiring.apply(trips);
+ out.addSink(new CountingSink(name));
+ long t0 = System.nanoTime();
+ env.execute(name);
+ long wall = (System.nanoTime() - t0) / 1_000_000L;
+ return new long[]{COUNTS.get(name).sum(), wall};
+ }
+
+ private static TumblingEventTimeWindows tumble(BerlinMODCorpus.Params p) {
+ return TumblingEventTimeWindows.of(Time.seconds(p.windowSeconds));
+ }
+
+ /** Counts records into the shared per-cell {@link LongAdder}. */
+ private static final class CountingSink extends RichSinkFunction<Object> {
+ private final String cell;
+ CountingSink(String cell) { this.cell = cell; }
+ @Override public void open(Configuration cfg) { }
+ @Override public void invoke(Object value, Context context) {
+ COUNTS.get(cell).increment();
+ }
+ }
+}
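Review note on the throughput column: the harness divides the input event count by wall-clock seconds and prints the result with `%,.0f`. The sketch below isolates that arithmetic so the published rows can be re-derived. `ThroughputSketch` and its method names are illustrative, and pinning `Locale.US` is an assumption added here so the grouping separator is always a comma (the harness itself uses the default locale).

```java
import java.util.Locale;

final class ThroughputSketch {

    /** Events per second for a cell; 0 when the wall-clock time is not positive. */
    static double eventsPerSecond(long events, long wallMillis) {
        double secs = wallMillis / 1000.0;
        return secs > 0 ? events / secs : 0;
    }

    /** Formats a throughput the way the results table shows it, e.g. 86,154. */
    static String format(double tput) {
        return String.format(Locale.US, "%,.0f", tput);
    }

    public static void main(String[] args) {
        // Values taken from the Q1-continuous and Q5-continuous rows.
        System.out.println(format(eventsPerSecond(216075, 2508)));
        System.out.println(format(eventsPerSecond(216075, 9161)));
    }
}
```

Note that the wall-clock figure wraps `env.execute`, so it includes job start-up on the mini-cluster. Short cells are therefore dominated by fixed overhead rather than per-event cost.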
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODCorpus.java b/flink-processor/src/main/java/berlinmod/BerlinMODCorpus.java
new file mode 100644
index 0000000..f2f9f24
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODCorpus.java
@@ -0,0 +1,156 @@
+package berlinmod;
+
+import functions.GeneratedFunctions;
+import jnr.ffi.Pointer;
+import org.mobilitydb.flink.meos.wirings.MeosWiringRuntime;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.stream.Stream;
+
+/**
+ * Corpus loader and query-parameter derivation for the BerlinMOD streaming
+ * benchmark.
+ *
+ * Supplies either a deterministic synthetic corpus or the real BerlinMOD
+ * instants corpus read from the {@code berlinmod_instants.csv} produced by the
+ * BerlinMOD generator. Real instants are stored in EPSG:3857; they are
+ * reprojected to EPSG:4326 through MEOS {@code geo_transform} at load — the
+ * loader holds no projection mathematics of its own.
+ *
+ * <p>{@link Params} fixes the per-query parameters from the corpus itself (its
+ * centroid, bounding box, vehicle ids, and time span) so every spatial cell is
+ * selective and the windowing granularity yields a comparable number of windows
+ * regardless of the corpus time span.
+ */
+public final class BerlinMODCorpus {
+
+ private static final DateTimeFormatter TS = new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd HH:mm:ss")
+ .optionalStart().appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true).optionalEnd()
+ .appendOffset("+HH", "Z")
+ .toFormatter();
+
+ private BerlinMODCorpus() { /* utility */ }
+
+ /** Query parameters derived from a corpus. */
+ public static final class Params {
+ public final double pLon, pLat, radiusMetres, dMeetMetres;
+ public final double xmin, ymin, xmax, ymax;
+ public final double s1Lon, s1Lat, s2Lon, s2Lat;
+ public final List<PointOfInterest> pois;
+ public final int targetId, xId, yId;
+ public final long windowSeconds, snapshotTickMillis;
+
+ Params(double pLon, double pLat, double radiusMetres, double dMeetMetres,
+ double xmin, double ymin, double xmax, double ymax,
+ double s1Lon, double s1Lat, double s2Lon, double s2Lat,
+ List<PointOfInterest> pois, int targetId, int xId, int yId,
+ long windowSeconds, long snapshotTickMillis) {
+ this.pLon = pLon; this.pLat = pLat; this.radiusMetres = radiusMetres; this.dMeetMetres = dMeetMetres;
+ this.xmin = xmin; this.ymin = ymin; this.xmax = xmax; this.ymax = ymax;
+ this.s1Lon = s1Lon; this.s1Lat = s1Lat; this.s2Lon = s2Lon; this.s2Lat = s2Lat;
+ this.pois = pois; this.targetId = targetId; this.xId = xId; this.yId = yId;
+ this.windowSeconds = windowSeconds; this.snapshotTickMillis = snapshotTickMillis;
+ }
+ }
+
+ /** Deterministic synthetic corpus: vehicles on a disc around Brussels centre,
+ * drifting per event, with monotonically increasing timestamps. */
+ public static List<BerlinMODTrip> synthetic(int vehicles, int perVehicle) {
+ final double centreLon = 4.3517, centreLat = 50.8503, spread = 0.12;
+ final long t0 = 1_735_711_200_000L, spanMillis = 600_000L;
+ int total = vehicles * perVehicle;
+ long step = Math.max(1L, spanMillis / total);
+ List<BerlinMODTrip> events = new ArrayList<>(total);
+ long g = 0;
+ for (int e = 0; e < perVehicle; e++) {
+ for (int v = 0; v < vehicles; v++) {
+ double ang = (v * 2.399963) % (2 * Math.PI);
+ double rad = spread * ((v % 17) / 17.0);
+ double drift = 0.0005 * Math.sin((e + v) * 0.13);
+ events.add(make(100 + v, t0 + g * step,
+ centreLon + rad * Math.cos(ang) + drift,
+ centreLat + rad * Math.sin(ang) + drift));
+ g++;
+ }
+ }
+ return events;
+ }
+
+ /** Real BerlinMOD instants from {@code berlinmod_instants.csv}
+ * (columns {@code tripid,vehid,day,seqno,geom,t}), reprojected 3857→4326
+ * through MEOS, sorted by timestamp. {@code maxRows <= 0} loads all rows. */
+ public static List<BerlinMODTrip> fromInstantsCsv(String path, int maxRows) throws Exception {
+ MeosWiringRuntime.ensureInitializedOnThread();
+ List<BerlinMODTrip> events = new ArrayList<>();
+ try (Stream<String> lines = Files.lines(Paths.get(path))) {
+ java.util.Iterator<String> it = lines.iterator();
+ if (it.hasNext()) {
+ it.next(); // header
+ }
+ while (it.hasNext() && (maxRows <= 0 || events.size() < maxRows)) {
+ String[] f = it.next().split(",");
+ int vid = Integer.parseInt(f[1].trim());
+ long ms = OffsetDateTime.parse(f[5].trim(), TS).toInstant().toEpochMilli();
+ Pointer g4326 = GeneratedFunctions.geo_transform(
+ GeneratedFunctions.geom_in(f[4].trim(), -1), 4326);
+ String txt = GeneratedFunctions.geo_as_text(g4326, 7); // POINT(lon lat)
+ String[] xy = txt.substring(txt.indexOf('(') + 1, txt.indexOf(')')).trim().split("\\s+");
+ events.add(make(vid, ms, Double.parseDouble(xy[0]), Double.parseDouble(xy[1])));
+ }
+ }
+ events.sort((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp()));
+ return events;
+ }
+
+ /** Derive selective per-query parameters and a window/tick granularity that
+ * yields ~200 windows over the corpus time span. */
+ public static Params derive(List<BerlinMODTrip> corpus) {
+ double sumLon = 0, sumLat = 0, minLon = Double.MAX_VALUE, minLat = Double.MAX_VALUE,
+ maxLon = -Double.MAX_VALUE, maxLat = -Double.MAX_VALUE, minT = Double.MAX_VALUE, maxT = -Double.MAX_VALUE;
+ TreeSet<Integer> ids = new TreeSet<>();
+ for (BerlinMODTrip t : corpus) {
+ sumLon += t.getLon(); sumLat += t.getLat();
+ minLon = Math.min(minLon, t.getLon()); maxLon = Math.max(maxLon, t.getLon());
+ minLat = Math.min(minLat, t.getLat()); maxLat = Math.max(maxLat, t.getLat());
+ minT = Math.min(minT, t.getTimestamp()); maxT = Math.max(maxT, t.getTimestamp());
+ ids.add(t.getVehicleId());
+ }
+ int n = corpus.size();
+ double cLon = sumLon / n, cLat = sumLat / n;
+ double exLon = maxLon - minLon, exLat = maxLat - minLat;
+ List<Integer> idList = new ArrayList<>(ids);
+ int targetId = idList.get(idList.size() / 2);
+ int xId = idList.get(0);
+ int yId = idList.get(Math.min(idList.size() - 1, idList.size() / 2));
+ long span = (long) (maxT - minT);
+ long windowSeconds = Math.max(1L, span / 1000 / 200);
+ long tickMillis = Math.max(1000L, windowSeconds * 1000L / 2);
+ List<PointOfInterest> pois = Arrays.asList(
+ new PointOfInterest(1, cLon, cLat, 2_000.0),
+ new PointOfInterest(2, cLon + 0.1 * exLon, cLat + 0.1 * exLat, 1_000.0),
+ new PointOfInterest(3, cLon - 0.1 * exLon, cLat - 0.1 * exLat, 2_000.0));
+ return new Params(cLon, cLat, 5_000.0, 5_000.0,
+ cLon - 0.25 * exLon, cLat - 0.25 * exLat, cLon + 0.25 * exLon, cLat + 0.25 * exLat,
+ minLon + 0.25 * exLon, cLat, maxLon - 0.25 * exLon, cLat,
+ pois, targetId, xId, yId, windowSeconds, tickMillis);
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
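Review note on `derive`: the window and tick granularity come from two integer expressions, and the truncation matters for short corpora. The sketch below copies only those two expressions into a hypothetical `GranularitySketch` class so they can be evaluated for a few time spans. The spans used are illustrative, apart from 599,980 ms, which is what the default synthetic corpus (50 vehicles × 600 events, 20 ms step) spans.

```java
final class GranularitySketch {

    /** Window length in seconds, aiming at roughly 200 windows over the span. */
    static long windowSeconds(long spanMillis) {
        return Math.max(1L, spanMillis / 1000 / 200);
    }

    /** Snapshot tick in milliseconds: half a window, but never below one second. */
    static long tickMillis(long windowSeconds) {
        return Math.max(1000L, windowSeconds * 1000L / 2);
    }

    public static void main(String[] args) {
        long[] spans = {50_000L, 599_980L, 86_400_000L};
        for (long span : spans) {
            long w = windowSeconds(span);
            System.out.printf("span=%d ms window=%d s tick=%d ms%n", span, w, tickMillis(w));
        }
    }
}
```

Because both divisions round down, a 599,980 ms span yields a 2 s window and therefore about 300 windows rather than 200. Spans under 200 s fall back to the 1 s floor, where the tick equals the window instead of half of it.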
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODDeserializationSchema.java b/flink-processor/src/main/java/berlinmod/BerlinMODDeserializationSchema.java
new file mode 100644
index 0000000..a45b6d1
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODDeserializationSchema.java
@@ -0,0 +1,64 @@
+package berlinmod;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * JSON → {@link BerlinMODTrip} deserializer for the Kafka {@code berlinmod} topic.
+ *
+ * Expected JSON shape per record:
+ *
+ * { "t": "2007-05-28 06:00:00", "vehicle_id": 42, "lon": 4.36, "lat": 50.84 }
+ *
+ *
+ * The timestamp format is the same {@code yyyy-MM-dd HH:mm:ss} the BerlinMOD
+ * generator emits in {@code generate_berlinmod_trips.sql}; we parse it as UTC
+ * to match the AIS pipeline's event-time convention.
+ */
+public class BerlinMODDeserializationSchema implements DeserializationSchema<BerlinMODTrip> {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final DateTimeFormatter TS_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ public BerlinMODDeserializationSchema() {
+ OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ @Override
+ public BerlinMODTrip deserialize(byte[] message) throws IOException {
+ JsonNode node = OBJECT_MAPPER.readTree(message);
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setTimestamp(parseTimestamp(node.get("t").asText()));
+ trip.setVehicleId(node.get("vehicle_id").asInt());
+ trip.setLon(node.get("lon").asDouble());
+ trip.setLat(node.get("lat").asDouble());
+ return trip;
+ }
+
+ private long parseTimestamp(String s) {
+ LocalDateTime dt = LocalDateTime.parse(s, TS_FORMATTER);
+ return dt.atZone(ZoneId.of("UTC")).toInstant().toEpochMilli();
+ }
+
+ @Override
+ public boolean isEndOfStream(BerlinMODTrip nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<BerlinMODTrip> getProducedType() {
+ return TypeExtractor.getForClass(BerlinMODTrip.class);
+ }
+}
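Review note on `parseTimestamp`: the input string carries no zone, so the deserializer's choice of UTC fully determines the event time. The sketch below shows the same conversion using only `java.time`. `UtcTimestampSketch` is a hypothetical name, and `ZoneOffset.UTC` is used in place of `ZoneId.of("UTC")`, which gives the same instant for this purpose.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class UtcTimestampSketch {

    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses a zone-less "yyyy-MM-dd HH:mm:ss" string and interprets it as UTC. */
    static long toEpochMillis(String s) {
        return LocalDateTime.parse(s, TS).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2007-05-28 06:00:00"));
    }
}
```

If the producer ever writes local Berlin time instead, every event shifts by one or two hours and the event-time windows shift with it, so the UTC convention is worth stating on the producer side as well.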
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODParity.java b/flink-processor/src/main/java/berlinmod/BerlinMODParity.java
new file mode 100644
index 0000000..60ac61f
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODParity.java
@@ -0,0 +1,129 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiPredicate;
+
+/**
+ * Continuous-form parity check for the BerlinMOD streaming benchmark.
+ *
+ * The streaming parity contract is that a streaming query computes the same
+ * result as a batch evaluation of the same MEOS predicate. This driver verifies
+ * it on the continuous form, which is timing-independent: the continuous form
+ * emits {@code predicate(event)} for every event, and a batch pass over the same
+ * corpus computes {@code predicate(event)} directly through the same
+ * {@link MEOSBridge} call. The two must agree event-for-event.
+ *
+ * <p>Checked queries: Q3 (within {@code d} of point {@code P}, MEOS
+ * {@code edwithin_tgeo_geo}) and Q8 (within {@code d} of a road segment, MEOS
+ * {@code edwithin_tgeo_geo} against a line). The corpus and parameters are the
+ * same as {@link BerlinMODBenchmark}.
+ *
+ *
+ * java … berlinmod.BerlinMODParity --csv <berlinmod_instants.csv> [--max N]
+ * java … berlinmod.BerlinMODParity --vehicles 50 --events 600
+ *
+ */
+public final class BerlinMODParity {
+
+ // Continuous-form outputs in arrival order (parallelism 1, no keyBy → stream
+ // order equals corpus order), so element i corresponds to corpus event i.
+ private static final ConcurrentLinkedQueue<Boolean> STREAMED = new ConcurrentLinkedQueue<>();
+
+ private BerlinMODParity() { /* utility */ }
+
+ public static void main(String[] args) throws Exception {
+ String csv = null;
+ int maxRows = 0, vehicles = 50, events = 600;
+ for (int i = 0; i < args.length; i++) {
+ switch (args[i]) {
+ case "--csv": csv = args[++i]; break;
+ case "--max": maxRows = Integer.parseInt(args[++i]); break;
+ case "--vehicles": vehicles = Integer.parseInt(args[++i]); break;
+ case "--events": events = Integer.parseInt(args[++i]); break;
+ default: break;
+ }
+ }
+ List<BerlinMODTrip> corpus = csv != null
+ ? BerlinMODCorpus.fromInstantsCsv(csv, maxRows)
+ : BerlinMODCorpus.synthetic(vehicles, events);
+ BerlinMODCorpus.Params p = BerlinMODCorpus.derive(corpus);
+ System.out.printf("Corpus: %s, %d events; P=(%.5f,%.5f) r=%.0fm%n",
+ csv != null ? "real BerlinMOD instants" : "synthetic", corpus.size(),
+ p.pLon, p.pLat, p.radiusMetres);
+
+ MeosWiringInit();
+ BiPredicate<Double, Double> q3 = (lon, lat) ->
+ MEOSBridge.dwithinMetres(lon, lat, p.pLon, p.pLat, p.radiusMetres);
+ BiPredicate<Double, Double> q8 = (lon, lat) ->
+ MEOSBridge.dwithinSegmentMetres(lon, lat, p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres);
+
+ List<String[]> rows = new ArrayList<>();
+ rows.add(check("Q3", corpus,
+ t -> t.process(new Q3ContinuousFunction(p.pLon, p.pLat, p.radiusMetres)), q3));
+ rows.add(check("Q8", corpus,
+ t -> t.process(new Q8ContinuousFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres)), q8));
+
+ System.out.println();
+ System.out.println("| Query | Events | Streaming-true | Batch-true | Mismatches | Parity |");
+ System.out.println("|---|---:|---:|---:|---:|---|");
+ for (String[] r : rows) {
+ System.out.printf("| %s | %s | %s | %s | %s | %s |%n", r[0], r[1], r[2], r[3], r[4], r[5]);
+ }
+ }
+
+ private static String[] check(String query, List<BerlinMODTrip> corpus,
+ java.util.function.Function, DataStream>> wiring,
+ BiPredicate<Double, Double> batch) throws Exception {
+ STREAMED.clear();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ DataStream<BerlinMODTrip> trips = env.fromCollection(corpus)
+ .assignTimestampsAndWatermarks(WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, ts) -> e.getTimestamp()));
+ wiring.apply(trips).addSink(new CollectSink());
+ env.execute("parity-" + query);
+
+ Boolean[] streamed = STREAMED.toArray(new Boolean[0]);
+ long streamingTrue = 0, batchTrue = 0, mismatches = 0;
+ for (int i = 0; i < corpus.size(); i++) {
+ boolean expected = batch.test(corpus.get(i).getLon(), corpus.get(i).getLat());
+ if (expected) {
+ batchTrue++;
+ }
+ if (i < streamed.length && streamed[i]) {
+ streamingTrue++;
+ }
+ if (i >= streamed.length || streamed[i].booleanValue() != expected) {
+ mismatches++;
+ }
+ }
+ boolean parity = mismatches == 0 && streamed.length == corpus.size();
+ System.out.printf(" %s: events=%d streaming-out=%d streaming-true=%d batch-true=%d mismatches=%d parity=%s%n",
+ query, corpus.size(), streamed.length, streamingTrue, batchTrue, mismatches, parity ? "YES" : "NO");
+ return new String[]{query, String.valueOf(corpus.size()), String.valueOf(streamingTrue),
+ String.valueOf(batchTrue), String.valueOf(mismatches), parity ? "exact" : "MISMATCH"};
+ }
+
+ private static void MeosWiringInit() {
+ org.mobilitydb.flink.meos.wirings.MeosWiringRuntime.ensureInitializedOnThread();
+ }
+
+ /** Records each continuous-form output's {@code near} flag in arrival order. */
+ private static final class CollectSink extends RichSinkFunction> {
+ @Override public void open(Configuration cfg) { }
+ @Override public void invoke(Tuple3 v, Context context) {
+ STREAMED.add(v.f2);
+ }
+ }
+}
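The comparison loop in `check` can be exercised in isolation. The following is a minimal sketch, not part of the patch: `ParityCountSketch`, its `compare` helper, and the toy predicate `lon >= 0` are hypothetical stand-ins for the Flink job and the `MEOSBridge` call. It illustrates that a streamed output shorter than the corpus is counted as a mismatch rather than silently ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical stand-alone version of the parity counting loop.
final class ParityCountSketch {

    /** Returns {streamingTrue, batchTrue, mismatches} for corpus rows of {lon, lat}. */
    static long[] compare(List<double[]> corpus, List<Boolean> streamed,
                          BiPredicate<Double, Double> batch) {
        long streamingTrue = 0, batchTrue = 0, mismatches = 0;
        for (int i = 0; i < corpus.size(); i++) {
            boolean expected = batch.test(corpus.get(i)[0], corpus.get(i)[1]);
            if (expected) {
                batchTrue++;
            }
            boolean present = i < streamed.size();
            if (present && streamed.get(i)) {
                streamingTrue++;
            }
            // A missing streamed element counts as a mismatch.
            if (!present || streamed.get(i) != expected) {
                mismatches++;
            }
        }
        return new long[]{streamingTrue, batchTrue, mismatches};
    }

    public static void main(String[] args) {
        List<double[]> corpus = List.of(
                new double[]{1, 0}, new double[]{-1, 0}, new double[]{2, 0});
        // The streamed side is one element short on purpose.
        List<Boolean> streamed = List.of(true, false);
        long[] r = compare(corpus, streamed, (lon, lat) -> lon >= 0);
        System.out.println(Arrays.toString(r)); // prints [1, 2, 1]
    }
}
```

Parity in the driver additionally requires the streamed length to equal the corpus size, which this sketch covers through the mismatch count.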
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
new file mode 100644
index 0000000..9218e16
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
@@ -0,0 +1,95 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q1 three streaming forms.
+ *
+ * Same 21-event synthetic corpus as Q2/Q3 local tests. Q1 has no spatial
+ * predicate and no per-event filter parameter — it simply enumerates vehicles
+ * seen.
+ *
+ *
+ * Expected output:
+ *
+ * Q1-continuous : 3 lines, one per distinct vehicle (firstSeenTime)
+ * Q1-windowed : 2 windows, each with distinctCount=3
+ * Q1-snapshot : 9 lines (3 ticks × 3 vehicles all seen by source-close)
+ *
+ */
+public class BerlinMODQ1LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ1LocalTest.class);
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ1LocalTest starting; window={}s tick={}ms",
+ WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q1ContinuousFunction());
+ cont.print("Q1-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q1WindowedFunction());
+ win.print("Q1-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q1SnapshotFunction(SNAPSHOT_TICK_MILLIS));
+ snap.print("Q1-snapshot");
+
+ env.execute("BerlinMODQ1LocalTest");
+ LOG.info("BerlinMODQ1LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
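The expected `Q1-windowed` shape (2 windows, 3 distinct vehicles each) follows from how tumbling windows bucket the 21 timestamps. The sketch below is illustrative only and does not use Flink: `TumblingWindowSketch` and its helpers are hypothetical names, and the window start is computed as `ts - (ts % size)`, which assumes non-negative timestamps and a zero window offset.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java model of tumbling-window bucketing for the synthetic corpus.
final class TumblingWindowSketch {

    static final long T0 = 1_735_711_200_000L; // same base timestamp as the local tests

    /** Start of the tumbling window containing ts (non-negative ts, zero offset assumed). */
    static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    /** Distinct vehicle ids per window for the 3-vehicle, 21-event corpus. */
    static Map<Long, Set<Integer>> distinctPerWindow(long sizeMillis) {
        Map<Long, Set<Integer>> windows = new TreeMap<>();
        for (int i = 0; i <= 13; i++) {
            long ts = T0 + i * 1000L;
            // Even offsets carry vehicles 100 and 300; odd offsets carry vehicle 200.
            int[] vehicles = (i % 2 == 0) ? new int[]{100, 300} : new int[]{200};
            for (int vid : vehicles) {
                windows.computeIfAbsent(windowStart(ts, sizeMillis), k -> new TreeSet<>())
                       .add(vid);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        distinctPerWindow(10_000L).forEach((start, vids) ->
                System.out.println((start - T0) / 1000 + "s -> " + vids.size() + " distinct"));
    }
}
```

Because `T0` is a multiple of 10 000 ms, the windows fall on `[T0, T0+10s)` and `[T0+10s, T0+20s)`, and each vehicle has at least one event in both.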
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java
new file mode 100644
index 0000000..84950c9
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java
@@ -0,0 +1,104 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q2 three streaming forms.
+ *
+ * Same structural shape as {@link BerlinMODQ3LocalTest} but exercises Q2 ("where is vehicle X
+ * at time T?") with {@code X = 200} (the Anderlecht vehicle). Same 3-vehicle /
+ * 21-event synthetic corpus.
+ *
+ *
+ * Expected output shape (with {@code X = 200}):
+ *
+ * Q2-continuous : 7 events (the 7 vehicle-200 events; vehicles 100 and 300 filtered out)
+ * Q2-windowed : 2 windows of size 10 s, each emitting the last vehicle-200 position seen in the window
+ * Q2-snapshot : 3 ticks × 1 emission each = 3 lines (vehicle 200's last-known position at each 5 s tick)
+ *
+ *
+ * Run after {@code mvn package} with:
+ *
+ * java -cp target/flink-kafka2postgres-1.0-SNAPSHOT.jar berlinmod.BerlinMODQ2LocalTest
+ *
+ */
+public class BerlinMODQ2LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ2LocalTest.class);
+
+ private static final int TARGET_VEHICLE_ID = 200;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ2LocalTest starting; X={} window={}s tick={}ms",
+ TARGET_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1); // deterministic output ordering
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream<BerlinMODTrip> cont = trips
+ .process(new Q2ContinuousFunction(TARGET_VEHICLE_ID));
+ cont.map(t -> String.format("v=%d t=%d (%.4f,%.4f)",
+ t.getVehicleId(), t.getTimestamp(), t.getLon(), t.getLat()))
+ .print("Q2-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q2WindowedFunction(TARGET_VEHICLE_ID));
+ win.print("Q2-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q2SnapshotFunction(TARGET_VEHICLE_ID, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q2-snapshot");
+
+ env.execute("BerlinMODQ2LocalTest");
+ LOG.info("BerlinMODQ2LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ // Same synthetic corpus as Q3LocalTest, so any user can run both and
+ // see them work over identical inputs.
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java
new file mode 100644
index 0000000..c5b9220
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java
@@ -0,0 +1,108 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/**
+ * Entry point for the BerlinMOD-Q2 scaffold on MobilityFlink.
+ *
+ * Runs the three streaming forms of BerlinMOD-Q2 ("where is vehicle X at
+ * time T?") side by side over the same Kafka input topic {@code berlinmod}:
+ *
+ * {@link Q2ContinuousFunction} — emit every event of vehicle X as it arrives
+ * {@link Q2WindowedFunction} — last-known (lon, lat) of vehicle X per N-second tumbling window
+ * {@link Q2SnapshotFunction} — vehicle X's last-known (lon, lat) at each watermark tick;
+ * the parity-oracle form (≡ batch BerlinMOD-Q2 at the same scale factor)
+ *
+ *
+ * The queried vehicle id and other defaults match
+ * {@code doc/berlinmod-q3-streaming-forms.md}. The companion local test driver
+ * is {@link BerlinMODQ2LocalTest}.
+ */
+public class BerlinMODQ2Main {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ2Main.class);
+
+ // Default Q2 parameters — query vehicle 200 (Anderlecht), 10 s windows,
+ // 5 s snapshot tick. Matches the synthetic-corpus defaults.
+ private static final int TARGET_VEHICLE_ID = 200;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final String KAFKA_TOPIC = "berlinmod";
+ private static final String KAFKA_BOOTSTRAP = "kafka:29092";
+ private static final String CONSUMER_GROUP = "flink_berlinmod_q2";
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ2Main starting; X={} window={}s tick={}ms",
+ TARGET_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
+ .setBootstrapServers(KAFKA_BOOTSTRAP)
+ .setGroupId(CONSUMER_GROUP)
+ .setTopics(KAFKA_TOPIC)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+ DataStream<String> raw = env.fromSource(
+ kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source (berlinmod)");
+
+ DataStream<BerlinMODTrip> trips = raw
+ .map(new DeserializeBerlinMODMapFunction())
+ .assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
+ .withTimestampAssigner(new BerlinMODTimestampAssigner())
+ .withIdleness(Duration.ofMinutes(1))
+ );
+
+ // Continuous form — per-event pass-through for the queried vehicle
+ DataStream<BerlinMODTrip> continuous = trips
+ .process(new Q2ContinuousFunction(TARGET_VEHICLE_ID));
+ continuous.print("Q2-continuous");
+
+ // Windowed form — last-known (lon, lat) per tumbling window
+ DataStream> windowed = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q2WindowedFunction(TARGET_VEHICLE_ID));
+ windowed.print("Q2-windowed");
+
+ // Snapshot form — keyed by vehicle, emits queried vehicle's last position at each tick
+ DataStream> snapshot = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q2SnapshotFunction(TARGET_VEHICLE_ID, SNAPSHOT_TICK_MILLIS));
+ snapshot.print("Q2-snapshot");
+
+ env.execute("BerlinMOD-Q2 (continuous / windowed / snapshot)");
+ LOG.info("BerlinMODQ2Main done");
+ }
+
+ public static class DeserializeBerlinMODMapFunction implements MapFunction<String, BerlinMODTrip> {
+ @Override
+ public BerlinMODTrip map(String value) throws Exception {
+ return new BerlinMODDeserializationSchema().deserialize(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+ }
+
+ public static class BerlinMODTimestampAssigner implements SerializableTimestampAssigner<BerlinMODTrip> {
+ @Override
+ public long extractTimestamp(BerlinMODTrip element, long recordTimestamp) {
+ return element.getTimestamp();
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java
new file mode 100644
index 0000000..69e2022
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java
@@ -0,0 +1,114 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q3 three streaming forms.
+ *
+ * Runs the same three form functions {@link BerlinMODQ3Main} runs (continuous,
+ * windowed, snapshot) but reads from a hardcoded synthetic event list via
+ * {@code env.fromCollection(...)} instead of from Kafka. This lets the scaffold
+ * be verified on any machine with Java + Maven, without Docker, a Kafka broker,
+ * the MEOS native lib, or any JMEOS call.
+ *
+ *
+ * Synthetic corpus: 3 vehicles, 21 events over 14 simulated seconds:
+ *
+ * Vehicle 100 — sits on Brussels city centre {@code P}, distance 0 m, near
+ * Vehicle 200 — Anderlecht, ~4.1 km from {@code P}, near (within the 5 km radius)
+ * Vehicle 300 — Forest, ~15.4 km from {@code P}, not near (outside the 5 km radius)
+ *
+ *
+ * Expected output shape:
+ *
+ * Q3-continuous : 21 lines, {@code near=true} for vehicles 100 and 200, {@code false} for 300
+ * Q3-windowed : 2 windows of size 10 s, each with {@code distinctCount=2} (vehicles 100 and 200)
+ * Q3-snapshot : 3 ticks × 2 near vehicles = 6 lines (vehicles 100 and 200 at each of the three 5 s ticks)
+ *
+ *
+ * Run after {@code mvn package} with:
+ *
+ * java -cp target/flink-kafka2postgres-1.0-SNAPSHOT.jar berlinmod.BerlinMODQ3LocalTest
+ *
+ */
+public class BerlinMODQ3LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ3LocalTest.class);
+
+ private static final double P_LON = 4.3517;
+ private static final double P_LAT = 50.8503;
+ private static final double RADIUS_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ3LocalTest starting; P=({}, {}) radius={}m window={}s tick={}ms",
+ P_LON, P_LAT, RADIUS_METRES, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1); // deterministic output ordering for the test
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .process(new Q3ContinuousFunction(P_LON, P_LAT, RADIUS_METRES));
+ cont.print("Q3-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q3WindowedFunction(P_LON, P_LAT, RADIUS_METRES));
+ win.print("Q3-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q3SnapshotFunction(P_LON, P_LAT, RADIUS_METRES, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q3-snapshot");
+
+ env.execute("BerlinMODQ3LocalTest");
+ LOG.info("BerlinMODQ3LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ // Vehicle 100 — Brussels city centre (= P), 7 events at t0, t0+2s, …, t0+12s
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ // Vehicle 200 — Anderlecht ~4.1 km from P, 7 events at t0+1s, t0+3s, …, t0+13s
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ // Vehicle 300 — Forest ~15.4 km from P, 7 events at t0, t0+2s, …, t0+12s
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
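The distances quoted in the `BerlinMODQ3LocalTest` Javadoc (about 4.1 km and 15.4 km from `P`) can be sanity-checked with a spherical haversine formula. This is a rough sketch under stated assumptions: `HaversineSketch` is a hypothetical helper, the mean Earth radius of 6 371 008.8 m is an assumption, and the result may differ slightly from whatever geodetic model the MEOS predicate uses.

```java
// Hypothetical stand-alone distance check; not the MEOS implementation.
final class HaversineSketch {

    static final double EARTH_RADIUS_METRES = 6_371_008.8; // assumed mean radius

    /** Great-circle distance in metres between two lon/lat points given in degrees. */
    static double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dPhi = Math.toRadians(lat2 - lat1);
        double dLambda = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
                + Math.cos(phi1) * Math.cos(phi2)
                * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
        return 2 * EARTH_RADIUS_METRES * Math.asin(Math.sqrt(a));
    }

    public static void main(String[] args) {
        double pLon = 4.3517, pLat = 50.8503; // reference point P
        System.out.printf("vehicle 200: %.0f m%n", distanceMetres(pLon, pLat, 4.3060, 50.8270));
        System.out.printf("vehicle 300: %.0f m%n", distanceMetres(pLon, pLat, 4.2000, 50.7500));
    }
}
```

With the 5 km radius used by the test, vehicle 200 falls inside and vehicle 300 falls outside, which matches the `near` flags listed in the expected output.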
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java
new file mode 100644
index 0000000..29bc518
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java
@@ -0,0 +1,111 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/**
+ * Entry point for the BerlinMOD-Q3 scaffold on MobilityFlink.
+ *
+ * Runs the three streaming forms of BerlinMOD-Q3 side by side over the same
+ * Kafka input topic {@code berlinmod}:
+ *
+ * {@link Q3ContinuousFunction} — per-event near/not-near
+ * {@link Q3WindowedFunction} — distinct-count per N-second tumbling window
+ * {@link Q3SnapshotFunction} — set of vehicles near P at each watermark tick
+ * (the parity-oracle form; ≡ batch BerlinMOD-Q3 at the same scale factor)
+ *
+ *
+ * Reference point P, radius {@code d}, window size and snapshot tick are
+ * the hardcoded defaults from the BerlinMOD-Q3 streaming-forms spec (see
+ * {@code doc/berlinmod-q3-streaming-forms.md}). The Kafka producer is the
+ * companion {@code kafka-producer/python-producer-berlinmod.py}.
+ */
+public class BerlinMODQ3Main {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ3Main.class);
+
+ // Default Q3 parameters — Brussels city centre, 5 km radius, 10 s windows,
+ // 5 s snapshot tick. Matches the defaults in the spec doc.
+ private static final double P_LON = 4.3517;
+ private static final double P_LAT = 50.8503;
+ private static final double RADIUS_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final String KAFKA_TOPIC = "berlinmod";
+ private static final String KAFKA_BOOTSTRAP = "kafka:29092";
+ private static final String CONSUMER_GROUP = "flink_berlinmod_q3";
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ3Main starting; P=({}, {}) radius={}m window={}s tick={}ms",
+ P_LON, P_LAT, RADIUS_METRES, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
+ .setBootstrapServers(KAFKA_BOOTSTRAP)
+ .setGroupId(CONSUMER_GROUP)
+ .setTopics(KAFKA_TOPIC)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+ DataStream<String> raw = env.fromSource(
+ kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source (berlinmod)");
+
+ DataStream<BerlinMODTrip> trips = raw
+ .map(new DeserializeBerlinMODMapFunction())
+ .assignTimestampsAndWatermarks(
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
+ .withTimestampAssigner(new BerlinMODTimestampAssigner())
+ .withIdleness(Duration.ofMinutes(1))
+ );
+
+ // Continuous form — per-event near/not-near
+ DataStream> continuous = trips
+ .process(new Q3ContinuousFunction(P_LON, P_LAT, RADIUS_METRES));
+ continuous.print("Q3-continuous");
+
+ // Windowed form — distinct count per tumbling window
+ DataStream> windowed = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q3WindowedFunction(P_LON, P_LAT, RADIUS_METRES));
+ windowed.print("Q3-windowed");
+
+ // Snapshot form — keyed by vehicle, emits set of vehicles near P at each tick
+ DataStream> snapshot = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q3SnapshotFunction(P_LON, P_LAT, RADIUS_METRES, SNAPSHOT_TICK_MILLIS));
+ snapshot.print("Q3-snapshot");
+
+ env.execute("BerlinMOD-Q3 (continuous / windowed / snapshot)");
+ LOG.info("BerlinMODQ3Main done");
+ }
+
+ public static class DeserializeBerlinMODMapFunction implements MapFunction<String, BerlinMODTrip> {
+ @Override
+ public BerlinMODTrip map(String value) throws Exception {
+ return new BerlinMODDeserializationSchema().deserialize(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+ }
+
+ public static class BerlinMODTimestampAssigner implements SerializableTimestampAssigner<BerlinMODTrip> {
+ @Override
+ public long extractTimestamp(BerlinMODTrip element, long recordTimestamp) {
+ return element.getTimestamp();
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java
new file mode 100644
index 0000000..428e7fc
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java
@@ -0,0 +1,130 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q4 three streaming forms.
+ *
+ * Region R = bounding box {@code (4.30, 50.84, 4.36, 50.86)} — a rectangle
+ * around Brussels city centre. The synthetic corpus is designed to produce
+ * multiple outside → inside transitions so the entry-detection logic
+ * is exercised non-trivially:
+ *
+ *
+ * Vehicle 100 sits inside R for all 7 events (no transitions).
+ * Vehicle 200 oscillates: outside at t=1, inside at t=3 (entry),
+ * outside at t=5, inside at t=7 (entry), outside at t=9, inside at
+ * t=11 (entry), outside at t=13 → three entries.
+ * Vehicle 300 stays in Forest (outside R) for all 7 events.
+ *
+ *
+ * Expected output:
+ *
+ * Q4-continuous : 3 entries (v200's three outside → inside transitions)
+ * Q4-windowed : per the intra-window scoping convention — window
+ * [0, 10 s) contains v100's first-seen-inside event AND v200's two entries
+ * (t=3, t=7); window [10, 20 s) contains v100's first-event-in-window
+ * AND v200's third entry (t=11). 5 emissions total.
+ * Q4-snapshot : cumulative entries up to each tick. Tick 5: 1
+ * (v200 t=3). Tick 10: 2 (v200 t=3, t=7). Tick 15: 3 (v200 t=3, t=7,
+ * t=11). v100 contributes 0 (always inside, no transition). v300
+ * contributes 0. 6 emissions total (1+2+3).
+ *
+ */
+public class BerlinMODQ4LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ4LocalTest.class);
+
+ // Region R — Brussels centre rectangle
+ private static final double XMIN = 4.30;
+ private static final double YMIN = 50.84;
+ private static final double XMAX = 4.36;
+ private static final double YMAX = 50.86;
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ4LocalTest starting; R=({},{},{},{}) window={}s tick={}ms",
+ XMIN, YMIN, XMAX, YMAX, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q4ContinuousFunction(XMIN, YMIN, XMAX, YMAX));
+ cont.print("Q4-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q4WindowedFunction(XMIN, YMIN, XMAX, YMAX));
+ win.print("Q4-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q4SnapshotFunction(XMIN, YMIN, XMAX, YMAX, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q4-snapshot");
+
+ env.execute("BerlinMODQ4LocalTest");
+ LOG.info("BerlinMODQ4LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ // v100 always inside R
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ // v200 oscillates in/out: out, IN, out, IN, out, IN, out
+ double[][] v200Path = {
+ {4.3060, 50.8270}, // t=1 out (lat<50.84)
+ {4.3060, 50.8500}, // t=3 IN
+ {4.3060, 50.8300}, // t=5 out
+ {4.3060, 50.8500}, // t=7 IN
+ {4.3060, 50.8100}, // t=9 out
+ {4.3060, 50.8500}, // t=11 IN
+ {4.3060, 50.8300}, // t=13 out
+ };
+ int idx = 0;
+ for (int i = 1; i <= 13; i += 2, idx++) {
+ events.add(make(200, T0 + i * 1000L, v200Path[idx][0], v200Path[idx][1]));
+ }
+ // v300 always outside R
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
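The "three entries" expectation for vehicle 200 can be reproduced with a small outside-to-inside transition counter. This is only a sketch of the idea, not the logic of `Q4ContinuousFunction`, whose source is not shown here: `EntryDetectionSketch` is a hypothetical name, the box test treats the boundary as inside (an assumption), and a first observation that is already inside is deliberately not counted, in line with the Javadoc's "v100 contributes 0".

```java
// Hypothetical per-vehicle entry counter for region R; illustrative only.
final class EntryDetectionSketch {

    static final double XMIN = 4.30, YMIN = 50.84, XMAX = 4.36, YMAX = 50.86;

    /** Bounding-box membership; boundary points are treated as inside (assumption). */
    static boolean inside(double lon, double lat) {
        return lon >= XMIN && lon <= XMAX && lat >= YMIN && lat <= YMAX;
    }

    /** Counts outside-to-inside transitions along a path of {lon, lat} points. */
    static int countEntries(double[][] path) {
        int entries = 0;
        Boolean previous = null; // unknown before the first event
        for (double[] point : path) {
            boolean now = inside(point[0], point[1]);
            if (previous != null && !previous && now) {
                entries++;
            }
            previous = now;
        }
        return entries;
    }

    public static void main(String[] args) {
        double[][] v200Path = {
            {4.3060, 50.8270}, {4.3060, 50.8500}, {4.3060, 50.8300}, {4.3060, 50.8500},
            {4.3060, 50.8100}, {4.3060, 50.8500}, {4.3060, 50.8300},
        };
        System.out.println("v200 entries: " + countEntries(v200Path)); // prints v200 entries: 3
    }
}
```

A vehicle that never leaves R, such as vehicle 100 in this corpus, yields zero entries under this convention.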
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java
new file mode 100644
index 0000000..394932d
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java
@@ -0,0 +1,109 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q5 three streaming forms.
+ *
+ * Same stationary-vehicle corpus as Q1/Q2/Q3/Q9. Reference point P =
+ * Brussels city centre (4.3517, 50.8503); {@code dP = 5 km} (vehicle near P);
+ * {@code dMeet = 5 km} (pair-meeting threshold).
+ *
+ *
+ * Pairs:
+ *
+ * (100, 200) — both near P; distance 4.1 km ≤ dMeet → MEET
+ * (100, 300) — v300 not near P → don't qualify
+ * (200, 300) — v300 not near P → don't qualify
+ *
+ *
+ * Expected output (only the (100, 200) pair qualifies):
+ *
+ * Q5-continuous : pair (100, 200) emits on every event from t=1
+ * onward (the first t=0 events of v100 and v300 happen before v200 is
+ * known, so no pair exists yet). 21 - 2 = 19 emissions.
+ * Q5-windowed : each of the two 10-second windows contains
+ * events for v100 and v200 — both qualify, the pair meets. 2 emissions.
+ * Q5-snapshot : 3 ticks × 1 meeting pair = 3 emissions.
+ *
+ */
+public class BerlinMODQ5LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ5LocalTest.class);
+
+ private static final double P_LON = 4.3517;
+ private static final double P_LAT = 50.8503;
+ private static final double D_P_METRES = 5_000.0;
+ private static final double D_MEET_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ5LocalTest starting; P=({}, {}) dP={}m dMeet={}m",
+ P_LON, P_LAT, D_P_METRES, D_MEET_METRES);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(t -> 0)
+ .process(new Q5ContinuousFunction(P_LON, P_LAT, D_P_METRES, D_MEET_METRES));
+ cont.print("Q5-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q5WindowedFunction(P_LON, P_LAT, D_P_METRES, D_MEET_METRES));
+ win.print("Q5-windowed");
+
+ DataStream> snap = trips
+ .keyBy(t -> 0)
+ .process(new Q5SnapshotFunction(P_LON, P_LAT, D_P_METRES, D_MEET_METRES, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q5-snapshot");
+
+ env.execute("BerlinMODQ5LocalTest");
+ LOG.info("BerlinMODQ5LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java
new file mode 100644
index 0000000..6a6fd2b
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java
@@ -0,0 +1,121 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q6 three streaming forms.
+ *
+ * Unlike Q1/Q2/Q3, which use a stationary-vehicle corpus, Q6 needs vehicles
+ * that actually move so the cumulative-distance arithmetic produces non-zero
+ * output. The synthetic corpus here drifts each vehicle by a fixed bearing
+ * per event:
+ *
+ *
+ * Vehicle 100 drifts east ~100 m per 2 s event (0.001423° lon at lat 50.85)
+ * Vehicle 200 drifts south ~50 m per 2 s event (0.000450° lat)
+ * Vehicle 300 drifts west ~200 m per 2 s event (0.002846° lon)
+ *
+ *
+ * With 7 events per vehicle (6 inter-event steps), per-vehicle totals are
+ * approximately:
+ *
+ *
+ * v100: 6 × 100 m = 600 m
+ * v200: 6 × 50 m = 300 m
+ * v300: 6 × 200 m = 1200 m
+ *
+ *
+ * Expected output:
+ *
+ *
+ * Q6-continuous : 21 lines, cumulative metres rising monotonically per vehicle
+ * Q6-windowed : 6 windowed emissions (2 windows × 3 vehicles)
+ * Q6-snapshot : 9 emissions (3 ticks × 3 vehicles, all-source-closed)
+ *
+ */
+public class BerlinMODQ6LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ6LocalTest.class);
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ // Drift per event step
+ private static final double V100_DLON = 100.0 / (111_000.0 * Math.cos(Math.toRadians(50.85)));
+ private static final double V200_DLAT = -50.0 / 111_000.0;
+ private static final double V300_DLON = -200.0 / (111_000.0 * Math.cos(Math.toRadians(50.85)));
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ6LocalTest starting; window={}s tick={}ms",
+ WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q6ContinuousFunction());
+ cont.print("Q6-continuous");
+
+ DataStream> win = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q6WindowedFunction());
+ win.print("Q6-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q6SnapshotFunction(SNAPSHOT_TICK_MILLIS));
+ snap.print("Q6-snapshot");
+
+ env.execute("BerlinMODQ6LocalTest");
+ LOG.info("BerlinMODQ6LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ int step = 0;
+ for (int i = 0; i <= 12; i += 2, step++) {
+ events.add(make(100, T0 + i * 1000L, 4.3517 + step * V100_DLON, 50.8503));
+ }
+ step = 0;
+ for (int i = 1; i <= 13; i += 2, step++) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270 + step * V200_DLAT));
+ }
+ step = 0;
+ for (int i = 0; i <= 12; i += 2, step++) {
+ events.add(make(300, T0 + i * 1000L, 4.2000 + step * V300_DLON, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
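Reviewer note: one way to sanity-check the drift constants in `BerlinMODQ6LocalTest` is to convert each per-step offset back to metres. The sketch below does that with a spherical haversine formula. That choice is an assumption made for illustration only: the patch measures distance through MEOS on the WGS84 spheroid, so its figures will differ slightly. `DriftCheckSketch`, `haversineMetres`, and the `v*Step` helpers are hypothetical names and are not part of this change.

```java
public class DriftCheckSketch {

    // Mean Earth radius in metres (spherical approximation, not the WGS84 spheroid).
    private static final double EARTH_RADIUS_METRES = 6_371_008.8;

    // Great-circle distance between two lon/lat points (degrees), in metres.
    static double haversineMetres(double lon1, double lat1, double lon2, double lat2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METRES * Math.asin(Math.sqrt(a));
    }

    // Same per-step drift expressions as the Q6 test driver.
    static double v100DLon() { return 100.0 / (111_000.0 * Math.cos(Math.toRadians(50.85))); }
    static double v200DLat() { return -50.0 / 111_000.0; }
    static double v300DLon() { return -200.0 / (111_000.0 * Math.cos(Math.toRadians(50.85))); }

    // Length of one step for each vehicle, starting from its first position.
    static double v100Step() { return haversineMetres(4.3517, 50.8503, 4.3517 + v100DLon(), 50.8503); }
    static double v200Step() { return haversineMetres(4.3060, 50.8270, 4.3060, 50.8270 + v200DLat()); }
    static double v300Step() { return haversineMetres(4.2000, 50.7500, 4.2000 + v300DLon(), 50.7500); }

    public static void main(String[] args) {
        System.out.printf("v100 step: %.1f m%n", v100Step());
        System.out.printf("v200 step: %.1f m%n", v200Step());
        System.out.printf("v300 step: %.1f m%n", v300Step());
    }
}
```

Each step should land within a metre or two of the nominal 100 m, 50 m, and 200 m. The flat 111 000 m-per-degree constant and the fixed 50.85° latitude account for the small residual.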
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java
new file mode 100644
index 0000000..e7560c8
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java
@@ -0,0 +1,116 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q7 three streaming forms.
+ *
+ * Same stationary-vehicle corpus as Q1/Q2/Q3/Q5/Q9. POI list:
+ *
+ * POI 1 = Brussels city centre (4.3517, 50.8503), radius 2000 m
+ * POI 2 = Anderlecht (4.3060, 50.8270), radius 1000 m
+ * POI 3 = south of Brussels (4.2100, 50.7600), radius 2000 m
+ *
+ *
+ * Per (vehicle, POI) match-up:
+ *
+ * v100 is inside POI 1 (0 m), outside POI 2 (~4.1 km) and POI 3 (~14 km)
+ * v200 is inside POI 2 (0 m), outside POI 1 and POI 3
+ * v300 is inside POI 3 (~1.3 km), outside POI 1 and POI 2
+ *
+ *
+ * Expected output:
+ *
+ * Q7-continuous : 3 emissions — first-passages on each vehicle's
+ * very first event (v100 t=0 → POI 1; v200 t=1 → POI 2; v300 t=0 →
+ * POI 3)
+ * Q7-windowed : per-window intra-window first-passages —
+ * window [0, 10 s) sees all 3 first-passages; window [10, 20 s) sees
+ * all 3 again (intra-window scoping has no cross-window memory). 6 lines.
+ * Q7-snapshot : 3 ticks × 3 cumulative (vehicle, POI) first-passages = 9 lines
+ *
+ */
+public class BerlinMODQ7LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ7LocalTest.class);
+
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ private static final List<PointOfInterest> POIS = Arrays.asList(
+ new PointOfInterest(1, 4.3517, 50.8503, 2_000.0),
+ new PointOfInterest(2, 4.3060, 50.8270, 1_000.0),
+ new PointOfInterest(3, 4.2100, 50.7600, 2_000.0));
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ7LocalTest starting; #POIs={} window={}s tick={}ms",
+ POIS.size(), WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q7ContinuousFunction(POIS));
+ cont.print("Q7-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q7WindowedFunction(POIS));
+ win.print("Q7-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q7SnapshotFunction(POIS, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q7-snapshot");
+
+ env.execute("BerlinMODQ7LocalTest");
+ LOG.info("BerlinMODQ7LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java
new file mode 100644
index 0000000..9dc6709
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java
@@ -0,0 +1,107 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q8 three streaming forms.
+ *
+ * Same stationary-vehicle corpus as the other Qs. Road segment runs
+ * from (4.30, 50.83) to (4.36, 50.87) — a diagonal across the Brussels-
+ * centre region. With a {@code d = 5 km} proximity threshold:
+ *
+ *
+ * v100 at (4.3517, 50.8503): ~1.1 km from segment → near
+ * v200 at (4.3060, 50.8270): ~0.5 km from segment → near
+ * v300 at (4.2000, 50.7500): ~11 km from segment → not near
+ *
+ *
+ * Expected output shape:
+ *
+ * Q8-continuous : 21 events (14 near=true for v100/v200, 7 near=false for v300)
+ * Q8-windowed : 2 windows, each with {@code distinctCount=2} (vehicles 100 and 200)
+ * Q8-snapshot : 3 ticks × 2 near vehicles = 6 emissions
+ *
+ *
+ * Same shape as Q3 with a segment-distance predicate substituted for the
+ * point-radius one; the algebraic-pattern parity is intentional.
+ */
+public class BerlinMODQ8LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ8LocalTest.class);
+
+ // Road segment endpoints
+ private static final double S1_LON = 4.30, S1_LAT = 50.83;
+ private static final double S2_LON = 4.36, S2_LAT = 50.87;
+ private static final double RADIUS_METRES = 5_000.0;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ8LocalTest starting; segment=({},{}) → ({},{}) d={}m",
+ S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ DataStream> cont = trips
+ .process(new Q8ContinuousFunction(S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES));
+ cont.print("Q8-continuous");
+
+ DataStream> win = trips
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q8WindowedFunction(S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES));
+ win.print("Q8-windowed");
+
+ DataStream> snap = trips
+ .keyBy(BerlinMODTrip::getVehicleId)
+ .process(new Q8SnapshotFunction(S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q8-snapshot");
+
+ env.execute("BerlinMODQ8LocalTest");
+ LOG.info("BerlinMODQ8LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java
new file mode 100644
index 0000000..f990031
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java
@@ -0,0 +1,107 @@
+package berlinmod;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Local end-to-end test driver for the BerlinMOD-Q9 three streaming forms.
+ *
+ * Same stationary-vehicle synthetic corpus as Q1/Q2/Q3 (3 vehicles, 21
+ * events). Queried pair X = 100 (Brussels city centre), Y = 200 (Anderlecht);
+ * their actual distance is ~4.1 km — the expected output for every emission.
+ *
+ * <p>Expected output:
+ *
+ * Q9-continuous : 13 lines — emitted whenever either X or Y has
+ * a new event AND the other has been seen at least once. v100 fires
+ * first at t=0; v200's first event at t=1 produces the first paired
+ * emission; subsequent 12 events (alternating) each produce one
+ * emission.
+ * Q9-windowed : 2 windows — both contain X and Y events, each
+ * emits the X-Y distance using last seen-in-window positions.
+ * Q9-snapshot : 3 ticks × 1 emission each = 3 lines.
+ *
+ */
+public class BerlinMODQ9LocalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ9LocalTest.class);
+
+ private static final int X_VEHICLE_ID = 100;
+ private static final int Y_VEHICLE_ID = 200;
+ private static final long WINDOW_SIZE_SECONDS = 10L;
+ private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
+ private static final long T0 = 1_735_711_200_000L;
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("BerlinMODQ9LocalTest starting; X={} Y={} window={}s tick={}ms",
+ X_VEHICLE_ID, Y_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ List<BerlinMODTrip> events = buildEvents();
+ DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
+ DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(1))
+ .withTimestampAssigner((e, t) -> e.getTimestamp()));
+
+ // Pre-filter to {X, Y} and key by a constant so the shared X+Y state
+ // lives in a single subtask.
+ DataStream<BerlinMODTrip> xy = trips
+ .filter(t -> t.getVehicleId() == X_VEHICLE_ID || t.getVehicleId() == Y_VEHICLE_ID);
+
+ DataStream> cont = xy
+ .keyBy(t -> 0)
+ .process(new Q9ContinuousFunction(X_VEHICLE_ID, Y_VEHICLE_ID));
+ cont.print("Q9-continuous");
+
+ DataStream> win = xy
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
+ .process(new Q9WindowedFunction(X_VEHICLE_ID, Y_VEHICLE_ID));
+ win.print("Q9-windowed");
+
+ DataStream> snap = xy
+ .keyBy(t -> 0)
+ .process(new Q9SnapshotFunction(X_VEHICLE_ID, Y_VEHICLE_ID, SNAPSHOT_TICK_MILLIS));
+ snap.print("Q9-snapshot");
+
+ env.execute("BerlinMODQ9LocalTest");
+ LOG.info("BerlinMODQ9LocalTest done");
+ }
+
+ private static List<BerlinMODTrip> buildEvents() {
+ List<BerlinMODTrip> events = new ArrayList<>();
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
+ }
+ for (int i = 1; i <= 13; i += 2) {
+ events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
+ }
+ for (int i = 0; i <= 12; i += 2) {
+ events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
+ }
+ return events;
+ }
+
+ private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
+ BerlinMODTrip trip = new BerlinMODTrip();
+ trip.setVehicleId(vid);
+ trip.setTimestamp(t);
+ trip.setLon(lon);
+ trip.setLat(lat);
+ return trip;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODTrip.java b/flink-processor/src/main/java/berlinmod/BerlinMODTrip.java
new file mode 100644
index 0000000..6eb8e80
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/BerlinMODTrip.java
@@ -0,0 +1,48 @@
+package berlinmod;
+
+/**
+ * Plain data class for a single GPS event from a BerlinMOD trip.
+ *
+ * Matches the {@code aisdata.AISData} field set but uses the BerlinMOD vehicle
+ * identifier {@code vehicleId} instead of an AIS {@code mmsi} and drops the
+ * AIS-specific {@code speed}/{@code course} channels (BerlinMOD's generator
+ * does not export those for the streaming form).
+ */
+public class BerlinMODTrip {
+ private long timestamp; // epoch milliseconds (event time)
+ private int vehicleId;
+ private double lon;
+ private double lat;
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public int getVehicleId() {
+ return vehicleId;
+ }
+
+ public void setVehicleId(int vehicleId) {
+ this.vehicleId = vehicleId;
+ }
+
+ public double getLon() {
+ return lon;
+ }
+
+ public void setLon(double lon) {
+ this.lon = lon;
+ }
+
+ public double getLat() {
+ return lat;
+ }
+
+ public void setLat(double lat) {
+ this.lat = lat;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/MEOSBridge.java b/flink-processor/src/main/java/berlinmod/MEOSBridge.java
new file mode 100644
index 0000000..6b29952
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/MEOSBridge.java
@@ -0,0 +1,133 @@
+package berlinmod;
+
+import functions.GeneratedFunctions;
+import jnr.ffi.Pointer;
+import org.mobilitydb.flink.meos.wirings.MeosWiringRuntime;
+
+/**
+ * Thin wiring from the BerlinMOD streaming-form predicates to MEOS via JMEOS.
+ *
+ * <p>Every spatial predicate exercised by the BerlinMOD-9 × 3-form scaffold
+ * flows through this class, and every predicate evaluates through MEOS. The
+ * within-distance predicate is the canonical temporal operator
+ * {@code edwithin_tgeo_geo} — ever-within between the vehicle's {@code tgeogpoint}
+ * instant and the query geography, in metres on the WGS84 spheroid — the same
+ * MEOS operator the streaming-form parity contract names. Distances are
+ * {@code geog_distance} over WGS84 geographies. This class holds no spatial
+ * mathematics of its own: it constructs the MEOS inputs and delegates the
+ * computation to libmeos.
+ *
+ * <p>{@link MeosWiringRuntime#ensureInitializedOnThread()} initialises MEOS on
+ * the calling task thread (idempotent per thread) before the first call, since
+ * MEOS keeps its session state per OS thread.
+ */
+public final class MEOSBridge {
+
+ private MEOSBridge() {
+ // utility
+ }
+
+ // ----------------------------------------------------------------------
+ // Public predicate surface — all evaluation delegated to MEOS.
+ // ----------------------------------------------------------------------
+
+ /**
+ * @return {@code true} iff the WGS84 spheroidal distance from
+ * {@code (lon1, lat1)} to {@code (lon2, lat2)} is at most
+ * {@code radiusMetres}, via MEOS {@code edwithin_tgeo_geo} between
+ * the {@code (lon1, lat1)} {@code tgeogpoint} instant and the
+ * {@code (lon2, lat2)} point geography.
+ */
+ public static boolean dwithinMetres(double lon1, double lat1,
+ double lon2, double lat2,
+ double radiusMetres) {
+ MeosWiringRuntime.ensureInitializedOnThread();
+ return GeneratedFunctions.edwithin_tgeo_geo(
+ tgeogInst(lon1, lat1), pointGeog(lon2, lat2), radiusMetres) == 1;
+ }
+
+ /**
+ * @return {@code true} iff the WGS84 spheroidal distance from
+ * {@code (pLon, pLat)} to the LineString {@code (s1, s2)} is at most
+ * {@code radiusMetres}, via MEOS {@code edwithin_tgeo_geo} between
+ * the point {@code tgeogpoint} instant and the line geography.
+ */
+ public static boolean dwithinSegmentMetres(double pLon, double pLat,
+ double s1Lon, double s1Lat,
+ double s2Lon, double s2Lat,
+ double radiusMetres) {
+ MeosWiringRuntime.ensureInitializedOnThread();
+ return GeneratedFunctions.edwithin_tgeo_geo(
+ tgeogInst(pLon, pLat),
+ lineGeog(s1Lon, s1Lat, s2Lon, s2Lat), radiusMetres) == 1;
+ }
+
+ /**
+ * @return {@code true} iff {@code (lon, lat)} lies in the axis-aligned box
+ * {@code [xmin, xmax] × [ymin, ymax]}, via MEOS
+ * {@code eintersects_tgeo_geo} between the point's {@code tgeompoint}
+ * instant and the box polygon (planar, SRID 4326).
+ */
+ public static boolean intersectsBox(double lon, double lat,
+ double xmin, double ymin,
+ double xmax, double ymax) {
+ MeosWiringRuntime.ensureInitializedOnThread();
+ return GeneratedFunctions.eintersects_tgeo_geo(
+ tgeomInst(lon, lat), boxPolygon(xmin, ymin, xmax, ymax)) == 1;
+ }
+
+ /**
+ * @return the WGS84 spheroidal distance in metres between two points, via
+ * MEOS {@code geog_distance}.
+ */
+ public static double distanceMetres(double lon1, double lat1,
+ double lon2, double lat2) {
+ MeosWiringRuntime.ensureInitializedOnThread();
+ return GeneratedFunctions.geog_distance(pointGeog(lon1, lat1), pointGeog(lon2, lat2));
+ }
+
+ /**
+ * @return the WGS84 spheroidal distance in metres from {@code (pLon, pLat)}
+ * to the LineString {@code (s1, s2)}, via MEOS {@code geog_distance}.
+ */
+ public static double distanceSegmentMetres(double pLon, double pLat,
+ double s1Lon, double s1Lat,
+ double s2Lon, double s2Lat) {
+ MeosWiringRuntime.ensureInitializedOnThread();
+ return GeneratedFunctions.geog_distance(
+ pointGeog(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat));
+ }
+
+ // ----------------------------------------------------------------------
+ // Internal helpers — construct the MEOS temporal / geography inputs.
+ // ----------------------------------------------------------------------
+
+ private static Pointer tgeogInst(double lon, double lat) {
+ return GeneratedFunctions.tgeogpoint_in(
+ String.format(java.util.Locale.ROOT, "SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat));
+ }
+
+ private static Pointer tgeomInst(double lon, double lat) {
+ return GeneratedFunctions.tgeompoint_in(
+ String.format(java.util.Locale.ROOT, "SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat));
+ }
+
+ private static Pointer boxPolygon(double xmin, double ymin,
+ double xmax, double ymax) {
+ return GeneratedFunctions.geom_in(String.format(java.util.Locale.ROOT,
+ "SRID=4326;Polygon((%.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f))",
+ xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax, xmin, ymin), -1);
+ }
+
+ private static Pointer pointGeog(double lon, double lat) {
+ return GeneratedFunctions.geom_to_geog(
+ GeneratedFunctions.geom_in(String.format(java.util.Locale.ROOT, "SRID=4326;Point(%.7f %.7f)", lon, lat), -1));
+ }
+
+ private static Pointer lineGeog(double s1Lon, double s1Lat,
+ double s2Lon, double s2Lat) {
+ return GeneratedFunctions.geom_to_geog(
+ GeneratedFunctions.geom_in(String.format(java.util.Locale.ROOT,
+ "SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", s1Lon, s1Lat, s2Lon, s2Lat), -1));
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/PointOfInterest.java b/flink-processor/src/main/java/berlinmod/PointOfInterest.java
new file mode 100644
index 0000000..0dc3ac5
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/PointOfInterest.java
@@ -0,0 +1,24 @@
+package berlinmod;
+
+import java.io.Serializable;
+
+/**
+ * Simple point-of-interest record for BerlinMOD-Q7 — a (lon, lat) plus a
+ * proximity radius in metres and an integer id. Serializable for use in
+ * Flink operator state and configuration.
+ */
+public final class PointOfInterest implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public final int id;
+ public final double lon;
+ public final double lat;
+ public final double radiusMetres;
+
+ public PointOfInterest(int id, double lon, double lat, double radiusMetres) {
+ this.id = id;
+ this.lon = lon;
+ this.lat = lat;
+ this.radiusMetres = radiusMetres;
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q1ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q1ContinuousFunction.java
new file mode 100644
index 0000000..dcaa383
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q1ContinuousFunction.java
@@ -0,0 +1,41 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q1: continuous form.
+ *
+ * <p>"Which vehicles have appeared in the stream?"
+ *
+ * <p>Emits {@code (vehicleId, firstSeenTimestamp)} the first time each vehicle
+ * is seen; subsequent events for the same vehicle are deduplicated via keyed
+ * state. Keyed by vehicleId.
+ */
+public class Q1ContinuousFunction
+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple2<Integer, Long>> {
+
+ private transient ValueState<Boolean> seen;
+
+ @Override
+ public void open(Configuration parameters) {
+ seen = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q1SeenVehicle", Boolean.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple2<Integer, Long>> out) throws Exception {
+ Boolean s = seen.value();
+ if (s == null || !s) {
+ out.collect(new Tuple2<>(trip.getVehicleId(), trip.getTimestamp()));
+ seen.update(true);
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q1SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q1SnapshotFunction.java
new file mode 100644
index 0000000..03c171a
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q1SnapshotFunction.java
@@ -0,0 +1,59 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q1: snapshot form.
+ *
+ * "At time T, which vehicles have appeared in the stream up to T?"
+ *
+ * <p>Keyed by vehicleId. On each event, mark the vehicle as seen and register
+ * an event-time timer at the next snapshot tick. When the timer fires at time
+ * T, emit {@code (T, vehicleId)} for each vehicle that has been seen by T.
+ *
+ * <p>This is the parity-oracle form: at watermark T, the streaming output is
+ * the set of vehicleIds whose first event occurred at or before T, which
+ * equals the batch BerlinMOD-Q1 result on data up to T.
+ */
+public class Q1SnapshotFunction
+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple2<Long, Integer>> {
+
+ private final long snapshotTickMillis;
+ private transient ValueState<Boolean> seen;
+
+ public Q1SnapshotFunction(long snapshotTickMillis) {
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ seen = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q1SnapshotSeen", Boolean.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple2<Long, Integer>> out) throws Exception {
+ seen.update(true);
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Tuple2<Long, Integer>> out) throws Exception {
+ Boolean s = seen.value();
+ if (Boolean.TRUE.equals(s)) {
+ out.collect(new Tuple2<>(timestamp, ctx.getCurrentKey()));
+ }
+ }
+}
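Reviewer note: the tick rounding in `Q1SnapshotFunction.processElement` is easiest to check in isolation. The sketch below lifts the same integer arithmetic into a standalone class; `SnapshotTickSketch` and `nextTick` are hypothetical names, not part of this patch. It assumes non-negative epoch-millisecond timestamps, since Java integer division truncates toward zero.

```java
public class SnapshotTickSketch {

    // Same expression as processElement: the first tick boundary strictly
    // after the event time (assumes timestampMillis >= 0).
    static long nextTick(long timestampMillis, long tickMillis) {
        return ((timestampMillis / tickMillis) + 1) * tickMillis;
    }

    public static void main(String[] args) {
        long t0 = 1_735_711_200_000L; // T0 used by the local test drivers
        long tick = 5_000L;           // SNAPSHOT_TICK_MILLIS in the drivers

        // Offsets are printed relative to t0 to keep the numbers readable.
        System.out.println(nextTick(t0, tick) - t0);           // event exactly on a boundary
        System.out.println(nextTick(t0 + 4_999L, tick) - t0);  // event just before a boundary
        System.out.println(nextTick(t0 + 13_000L, tick) - t0); // last v200 event in the corpus
    }
}
```

Because `T0` is a multiple of the tick, an event stamped exactly on a boundary is scheduled for the following tick rather than the one it sits on. That is worth keeping in mind when reading the "seen by T" wording in the Javadoc.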
diff --git a/flink-processor/src/main/java/berlinmod/Q1WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q1WindowedFunction.java
new file mode 100644
index 0000000..4581422
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q1WindowedFunction.java
@@ -0,0 +1,33 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * BerlinMOD-Q1: windowed form.
+ *
+ * "Per N-second tumbling window, how many distinct vehicles appeared
+ * in the window?"
+ *
+ * <p>Emits {@code (windowStart, windowEnd, distinctCount)} per window.
+ */
+public class Q1WindowedFunction
+ extends ProcessAllWindowFunction<BerlinMODTrip, Tuple3<Long, Long, Long>, TimeWindow> {
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<BerlinMODTrip> elements,
+ Collector<Tuple3<Long, Long, Long>> out) {
+ Set<Integer> distinct = new HashSet<>();
+ for (BerlinMODTrip trip : elements) {
+ distinct.add(trip.getVehicleId());
+ }
+ out.collect(new Tuple3<>(ctx.window().getStart(), ctx.window().getEnd(), (long) distinct.size()));
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q2ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q2ContinuousFunction.java
new file mode 100644
index 0000000..9d87dcb
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q2ContinuousFunction.java
@@ -0,0 +1,40 @@
+package berlinmod;
+
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q2 — continuous form.
+ *
+ * "Where is vehicle X right now?"
+ *
+ * <p>For each incoming GPS event {@link BerlinMODTrip}, emit it unchanged if it
+ * belongs to the queried vehicle, otherwise drop. No windowing, no state —
+ * a per-event filter against {@code targetVehicleId}.
+ */
+public class Q2ContinuousFunction extends ProcessFunction<BerlinMODTrip, BerlinMODTrip> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q2ContinuousFunction.class);
+
+ private final int targetVehicleId;
+
+ public Q2ContinuousFunction(int targetVehicleId) {
+ this.targetVehicleId = targetVehicleId;
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<BerlinMODTrip> out) {
+ if (trip.getVehicleId() == targetVehicleId) {
+ out.collect(trip);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q2-continuous: vehicle={} t={} ({}, {})",
+ trip.getVehicleId(), trip.getTimestamp(), trip.getLon(), trip.getLat());
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q2SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q2SnapshotFunction.java
new file mode 100644
index 0000000..4468d82
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q2SnapshotFunction.java
@@ -0,0 +1,83 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q2 — snapshot form.
+ *
+ * "At time T, where is vehicle X?"
+ *
+ * <p>This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q2 result on the same data up to T (the most
+ * recent known position of vehicle X on or before T).
+ *
+ * <p>Keyed by vehicleId (so the operator scales naturally if the queried
+ * vehicle changes, and so reuse across multiple queried vehicles is a
+ * fan-out keying choice rather than a code change). For events whose key
+ * matches {@code targetVehicleId}, update last-known state and register an
+ * event-time timer for the next snapshot tick. When the timer fires, emit
+ * {@code (T, lon, lat, t_of_last_event)}.
+ */
+public class Q2SnapshotFunction
+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple4<Long, Double, Double, Long>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q2SnapshotFunction.class);
+
+ private final int targetVehicleId;
+ private final long snapshotTickMillis;
+
+ private transient ValueState<Tuple3<Double, Double, Long>> lastKnown; // (lon, lat, ts)
+
+ public Q2SnapshotFunction(int targetVehicleId, long snapshotTickMillis) {
+ this.targetVehicleId = targetVehicleId;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation<Tuple3<Double, Double, Long>> tInfo =
+ TypeInformation.of(new TypeHint<Tuple3<Double, Double, Long>>() {});
+ ValueStateDescriptor<Tuple3<Double, Double, Long>> desc =
+ new ValueStateDescriptor<>("q2LastKnownPosition", tInfo);
+ lastKnown = getRuntimeContext().getState(desc);
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple4<Long, Double, Double, Long>> out) throws Exception {
+ if (trip.getVehicleId() != targetVehicleId) {
+ return;
+ }
+ lastKnown.update(new Tuple3<>(trip.getLon(), trip.getLat(), trip.getTimestamp()));
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Tuple4<Long, Double, Double, Long>> out) throws Exception {
+ Tuple3<Double, Double, Long> p = lastKnown.value();
+ if (p == null) {
+ return;
+ }
+ out.collect(new Tuple4<>(timestamp, p.f0, p.f1, p.f2));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q2-snapshot: T={} vehicle={} ({}, {}) at t={}",
+ timestamp, ctx.getCurrentKey(), p.f0, p.f1, p.f2);
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q2WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q2WindowedFunction.java
new file mode 100644
index 0000000..38ee0ff
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q2WindowedFunction.java
@@ -0,0 +1,58 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q2 — windowed form.
+ *
+ * "Per N-second tumbling window, what is vehicle X's most recent
+ * position seen within the window?"
+ *
+ * <p>For each window, filter to events matching {@code targetVehicleId}, keep
+ * the event with the largest timestamp, and emit
+ * {@code (windowStart, windowEnd, vehicleId, lon, lat)}. If the vehicle had
+ * no events in the window, emit nothing.
+ */
+public class Q2WindowedFunction
+ extends ProcessAllWindowFunction<BerlinMODTrip, Tuple5<Long, Long, Integer, Double, Double>, TimeWindow> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q2WindowedFunction.class);
+
+ private final int targetVehicleId;
+
+ public Q2WindowedFunction(int targetVehicleId) {
+ this.targetVehicleId = targetVehicleId;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<BerlinMODTrip> elements,
+ Collector<Tuple5<Long, Long, Integer, Double, Double>> out) {
+ BerlinMODTrip latest = null;
+ for (BerlinMODTrip trip : elements) {
+ if (trip.getVehicleId() != targetVehicleId) {
+ continue;
+ }
+ if (latest == null || trip.getTimestamp() > latest.getTimestamp()) {
+ latest = trip;
+ }
+ }
+ if (latest != null) {
+ out.collect(new Tuple5<>(
+ ctx.window().getStart(),
+ ctx.window().getEnd(),
+ latest.getVehicleId(),
+ latest.getLon(),
+ latest.getLat()));
+ LOG.info("Q2-windowed: [{}, {}) vehicle={} last=({}, {})",
+ ctx.window().getStart(), ctx.window().getEnd(),
+ latest.getVehicleId(), latest.getLon(), latest.getLat());
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java
new file mode 100644
index 0000000..e28f825
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java
@@ -0,0 +1,47 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q3 — continuous form.
+ *
+ * "At every moment, which vehicles are currently within {@code d} metres
+ * of point P?"
+ *
+ * <p>For each incoming GPS event {@link BerlinMODTrip}, evaluate the radius
+ * predicate and emit {@code (vehicleId, eventTimeMillis, isNear)} per event.
+ * No windowing — output updates per event, watermark-independent.
+ *
+ * <p>Predicate: {@link MEOSBridge#dwithinMetres} — MEOS
+ * {@code edwithin_tgeo_geo} over WGS84 geographies.
+ */
+public class Q3ContinuousFunction extends ProcessFunction<BerlinMODTrip, Tuple3<Integer, Long, Boolean>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q3ContinuousFunction.class);
+
+ private final double pLon;
+ private final double pLat;
+ private final double radiusMetres;
+
+ public Q3ContinuousFunction(double pLon, double pLat, double radiusMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.radiusMetres = radiusMetres;
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple3<Integer, Long, Boolean>> out) {
+ boolean near = MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), pLon, pLat, radiusMetres);
+ out.collect(new Tuple3<>(trip.getVehicleId(), trip.getTimestamp(), near));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q3-continuous: vehicle={} ts={} near={}", trip.getVehicleId(), trip.getTimestamp(), near);
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java
new file mode 100644
index 0000000..467a19b
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java
@@ -0,0 +1,90 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BerlinMOD-Q3 — snapshot form.
+ *
+ * "At time T, which vehicles are within {@code d} metres of point P?"
+ *
+ * <p>This is the parity-oracle form: streaming output at watermark T
+ * must equal the batch BerlinMOD-Q3 result on the same data up to T.
+ *
+ * <p>Keyed by vehicleId. Maintains a per-vehicle {@code lastKnownPosition}
+ * state. On each event, update the state, then register an event-time timer
+ * for the snapshot tick. When the timer fires at time T, evaluate the radius
+ * predicate against the most recent known position and emit
+ * {@code (T, vehicleId)} if the vehicle is within {@code d} of P at that
+ * snapshot.
+ *
+ * <p>Predicate: {@link MEOSBridge#dwithinMetres} — MEOS
+ * {@code edwithin_tgeo_geo} over WGS84 geographies. The snapshot-form output
+ * at watermark T is equal to the batch BerlinMOD-Q3 result up to T.
+ */
+public class Q3SnapshotFunction
+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple2<Long, Integer>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q3SnapshotFunction.class);
+
+ private final double pLon;
+ private final double pLat;
+ private final double radiusMetres;
+ private final long snapshotTickMillis;
+
+ private transient ValueState<Tuple3<Double, Double, Long>> lastKnown; // (lon, lat, ts)
+
+ public Q3SnapshotFunction(
+ double pLon, double pLat, double radiusMetres, long snapshotTickMillis) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.radiusMetres = radiusMetres;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation<Tuple3<Double, Double, Long>> tInfo =
+ TypeInformation.of(new TypeHint<Tuple3<Double, Double, Long>>() {});
+ ValueStateDescriptor<Tuple3<Double, Double, Long>> desc =
+ new ValueStateDescriptor<>("lastKnownPosition", tInfo);
+ lastKnown = getRuntimeContext().getState(desc);
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple2<Long, Integer>> out) throws Exception {
+ lastKnown.update(new Tuple3<>(trip.getLon(), trip.getLat(), trip.getTimestamp()));
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Tuple2<Long, Integer>> out) throws Exception {
+ Tuple3<Double, Double, Long> p = lastKnown.value();
+ if (p == null) {
+ return;
+ }
+ if (MEOSBridge.dwithinMetres(p.f0, p.f1, pLon, pLat, radiusMetres)) {
+ Integer vehicleId = ctx.getCurrentKey();
+ out.collect(new Tuple2<>(timestamp, vehicleId));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Q3-snapshot: T={} vehicle={}", timestamp, vehicleId);
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java
new file mode 100644
index 0000000..8806791
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java
@@ -0,0 +1,58 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * BerlinMOD-Q3 — windowed form.
+ *
+ * "Per N-second window, how many distinct vehicles were within
+ * {@code d} metres of point P at any time during the window?"
+ *
+ * <p>Tumbling event-time window of configurable size. For each window, scan
+ * all events whose timestamp falls in the window, count distinct vehicleIds
+ * for which at least one event satisfies the radius predicate, and emit
+ * {@code (windowStart, windowEnd, distinctCount)}.
+ *
+ * <p>Predicate: {@link MEOSBridge#dwithinMetres} — MEOS
+ * {@code edwithin_tgeo_geo} over WGS84 geographies.
+ */
+public class Q3WindowedFunction
+ extends ProcessAllWindowFunction<BerlinMODTrip, Tuple3<Long, Long, Long>, TimeWindow> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Q3WindowedFunction.class);
+
+ private final double pLon;
+ private final double pLat;
+ private final double radiusMetres;
+
+ public Q3WindowedFunction(double pLon, double pLat, double radiusMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.radiusMetres = radiusMetres;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<BerlinMODTrip> elements,
+ Collector<Tuple3<Long, Long, Long>> out) {
+ Set<Integer> distinctNear = new HashSet<>();
+ for (BerlinMODTrip trip : elements) {
+ if (MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), pLon, pLat, radiusMetres)) {
+ distinctNear.add(trip.getVehicleId());
+ }
+ }
+ long count = distinctNear.size();
+ out.collect(new Tuple3<>(ctx.window().getStart(), ctx.window().getEnd(), count));
+ LOG.info("Q3-windowed: [{}, {}) distinct-near={}",
+ ctx.window().getStart(), ctx.window().getEnd(), count);
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q4ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q4ContinuousFunction.java
new file mode 100644
index 0000000..472d1f1
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q4ContinuousFunction.java
@@ -0,0 +1,61 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q4 — continuous form.
+ *
+ * "Which vehicles entered region R (transition outside → inside),
+ * and when?"
+ *
+ * <p>Keyed by vehicleId. Per-vehicle state tracks the last seen
+ * inside-or-outside flag for R. On each event, computes the current
+ * inside-or-outside, and if the transition is outside→inside, emits
+ * {@code (vehicleId, entryTime)}.
+ *
+ * <p>Predicate: {@link MEOSBridge#intersectsBox} — MEOS
+ * {@code eintersects_tgeo_geo} between the point's {@code tgeompoint} instant
+ * and the region polygon.
+ */
+public class Q4ContinuousFunction
+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple2<Integer, Long>> {
+
+ private final double xmin, ymin, xmax, ymax;
+ private transient ValueState<Boolean> wasInside;
+
+ public Q4ContinuousFunction(double xmin, double ymin, double xmax, double ymax) {
+ this.xmin = xmin;
+ this.ymin = ymin;
+ this.xmax = xmax;
+ this.ymax = ymax;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ wasInside = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q4WasInside", Boolean.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple2<Integer, Long>> out) throws Exception {
+ boolean isInside = inBox(trip.getLon(), trip.getLat());
+ Boolean prev = wasInside.value();
+ boolean prevInside = prev != null && prev;
+ if (isInside && !prevInside) {
+ out.collect(new Tuple2<>(trip.getVehicleId(), trip.getTimestamp()));
+ }
+ wasInside.update(isInside);
+ }
+
+ private boolean inBox(double lon, double lat) {
+ return MEOSBridge.intersectsBox(lon, lat, xmin, ymin, xmax, ymax);
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q4SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q4SnapshotFunction.java
new file mode 100644
index 0000000..45b05fa
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q4SnapshotFunction.java
@@ -0,0 +1,82 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * BerlinMOD-Q4 — snapshot form.
+ *
+ * "At time T, what is the list of (vehicleId, entryTime) pairs for all
+ * vehicles that entered region R at or before T?"
+ *
+ * <p>This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q4 result on the same data up to T.
+ *
+ * <p>Keyed by vehicleId. Per-vehicle state: a {@code wasInside} flag plus a
+ * {@code ListState} of recorded entry times. On each event, detect
+ * outside → inside transitions and append entry time. Register an event-time
+ * timer at the next snapshot tick. On timer fire at T, emit one
+ * {@code (T, vehicleId, entryTime)} per recorded entry.
+ */
+public class Q4SnapshotFunction
+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple3<Long, Integer, Long>> {
+
+ private final double xmin, ymin, xmax, ymax;
+ private final long snapshotTickMillis;
+ private transient ValueState<Boolean> wasInside;
+ private transient ListState<Long> entries;
+
+ public Q4SnapshotFunction(double xmin, double ymin, double xmax, double ymax, long snapshotTickMillis) {
+ this.xmin = xmin;
+ this.ymin = ymin;
+ this.xmax = xmax;
+ this.ymax = ymax;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ wasInside = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("q4SnapWasInside", Boolean.class));
+ entries = getRuntimeContext().getListState(
+ new ListStateDescriptor<>("q4SnapEntries", Long.class));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple3<Long, Integer, Long>> out) throws Exception {
+ boolean curr = inBox(trip.getLon(), trip.getLat());
+ Boolean prev = wasInside.value();
+ boolean prevInside = prev != null && prev;
+ if (curr && !prevInside) {
+ entries.add(trip.getTimestamp());
+ }
+ wasInside.update(curr);
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Tuple3<Long, Integer, Long>> out) throws Exception {
+ for (Long entry : entries.get()) {
+ if (entry <= timestamp) {
+ out.collect(new Tuple3<>(timestamp, ctx.getCurrentKey(), entry));
+ }
+ }
+ }
+
+ private boolean inBox(double lon, double lat) {
+ return MEOSBridge.intersectsBox(lon, lat, xmin, ymin, xmax, ymax);
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q4WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q4WindowedFunction.java
new file mode 100644
index 0000000..e38c364
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q4WindowedFunction.java
@@ -0,0 +1,74 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q4 — windowed form.
+ *
+ * "Per N-second tumbling window, which vehicles entered region R during
+ * the window, and at what time?"
+ *
+ * <p>Scans all events in the window, sorted per-vehicle by time, and detects
+ * outside → inside transitions within the window. Emits one
+ * {@code (windowStart, windowEnd, vehicleId, entryTime)} per detected entry.
+ *
+ * <p>Note: a vehicle's "outside" state at window start is inferred only from
+ * the window's first event (no cross-window state). This intra-window
+ * scoping matches BerlinMOD-Q4's "first N entries during a period" form.
+ */
+public class Q4WindowedFunction
+ extends ProcessAllWindowFunction<BerlinMODTrip, Tuple4<Long, Long, Integer, Long>, TimeWindow> {
+
+ private final double xmin, ymin, xmax, ymax;
+
+ public Q4WindowedFunction(double xmin, double ymin, double xmax, double ymax) {
+ this.xmin = xmin;
+ this.ymin = ymin;
+ this.xmax = xmax;
+ this.ymax = ymax;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<BerlinMODTrip> elements,
+ Collector<Tuple4<Long, Long, Integer, Long>> out) {
+ Map<Integer, List<BerlinMODTrip>> perVehicle = new HashMap<>();
+ for (BerlinMODTrip trip : elements) {
+ perVehicle.computeIfAbsent(trip.getVehicleId(), k -> new ArrayList<>()).add(trip);
+ }
+ for (Map.Entry<Integer, List<BerlinMODTrip>> e : perVehicle.entrySet()) {
+ List<BerlinMODTrip> sorted = e.getValue();
+ sorted.sort((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp()));
+ boolean prevInside = false; // intra-window only — treats first event as the prior
+ for (int i = 0; i < sorted.size(); i++) {
+ BerlinMODTrip t = sorted.get(i);
+ boolean curr = inBox(t.getLon(), t.getLat());
+ if (i == 0) {
+ if (curr) {
+ // first event already inside — count as entry per the
+ // intra-window scoping convention (no prior visibility)
+ out.collect(new Tuple4<>(ctx.window().getStart(), ctx.window().getEnd(),
+ e.getKey(), t.getTimestamp()));
+ }
+ } else if (curr && !prevInside) {
+ out.collect(new Tuple4<>(ctx.window().getStart(), ctx.window().getEnd(),
+ e.getKey(), t.getTimestamp()));
+ }
+ prevInside = curr;
+ }
+ }
+ }
+
+ private boolean inBox(double lon, double lat) {
+ return MEOSBridge.intersectsBox(lon, lat, xmin, ymin, xmax, ymax);
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q5ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q5ContinuousFunction.java
new file mode 100644
index 0000000..51037b8
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q5ContinuousFunction.java
@@ -0,0 +1,94 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q5 — continuous form.
+ *
+ * "Which pairs of vehicles are currently meeting near point P?"
+ *
+ * <p>A pair {@code (a, b)} meets near P when both vehicles are within
+ * {@code dP} metres of {@code P} and the distance between them is at most
+ * {@code dMeet} metres.
+ *
+ * <p>Caller is expected to key the input stream by a constant so the shared
+ * cross-vehicle last-known state lives in a single subtask. Per-event:
+ * update the last-known position of the event's vehicle, then enumerate all
+ * known pairs and emit {@code (a, b, eventTime, distanceMetres)} for every
+ * currently-meeting pair (with {@code a < b} for stable identity).
+ *
+ * <p>Predicate: {@link MEOSBridge#dwithinMetres} (MEOS
+ * {@code edwithin_tgeo_geo}) for the near-P filter and
+ * {@link MEOSBridge#distanceMetres} (MEOS {@code geog_distance}) for the
+ * pairwise meeting distance.
+ */
+public class Q5ContinuousFunction
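+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple4<Integer, Integer, Long, Double>> { // key type Integer is assumed (constant key)
+
+ private final double pLon, pLat, dPMetres, dMeetMetres;
+ private transient MapState<Integer, Tuple2<Double, Double>> lastPos;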
+
+ public Q5ContinuousFunction(double pLon, double pLat, double dPMetres, double dMeetMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.dPMetres = dPMetres;
+ this.dMeetMetres = dMeetMetres;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation<Tuple2<Double, Double>> vInfo =
+ TypeInformation.of(new TypeHint<Tuple2<Double, Double>>() {});
+ lastPos = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>("q5LastPos", TypeInformation.of(Integer.class), vInfo));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple4<Integer, Integer, Long, Double>> out) throws Exception {
+ lastPos.put(trip.getVehicleId(), new Tuple2<>(trip.getLon(), trip.getLat()));
+
+ // Snapshot the map and pick pairs near P
+ Map<Integer, Tuple2<Double, Double>> snap = new HashMap<>();
+ for (Map.Entry<Integer, Tuple2<Double, Double>> e : lastPos.entries()) {
+ snap.put(e.getKey(), e.getValue());
+ }
+ List<Map.Entry<Integer, Tuple2<Double, Double>>> nearP = new ArrayList<>();
+ for (Map.Entry<Integer, Tuple2<Double, Double>> e : snap.entrySet()) {
+ Tuple2<Double, Double> p = e.getValue();
+ if (MEOSBridge.dwithinMetres(p.f0, p.f1, pLon, pLat, dPMetres)) {
+ nearP.add(e);
+ }
+ }
+ nearP.sort(Comparator.comparingInt(Map.Entry::getKey));
+
+ for (int i = 0; i < nearP.size(); i++) {
+ for (int j = i + 1; j < nearP.size(); j++) {
+ Tuple2<Double, Double> a = nearP.get(i).getValue();
+ Tuple2<Double, Double> b = nearP.get(j).getValue();
+ double d = MEOSBridge.distanceMetres(a.f0, a.f1, b.f0, b.f1);
+ if (d <= dMeetMetres) {
+ out.collect(new Tuple4<>(
+ nearP.get(i).getKey(), nearP.get(j).getKey(),
+ trip.getTimestamp(), d));
+ }
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q5SnapshotFunction.java b/flink-processor/src/main/java/berlinmod/Q5SnapshotFunction.java
new file mode 100644
index 0000000..b2f2758
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q5SnapshotFunction.java
@@ -0,0 +1,97 @@
+package berlinmod;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q5 — snapshot form.
+ *
+ * "At time T, which pairs of vehicles are meeting near P (using each
+ * vehicle's most-recent-known position on or before T)?"
+ *
+ * <p>This is the parity-oracle form: streaming output at watermark T must
+ * equal the batch BerlinMOD-Q5 result on the same data up to T.
+ *
+ * <p>Caller is expected to key the stream by a constant; the
+ * cross-vehicle last-known state is a {@link MapState}. On each event,
+ * update last-known and register an event-time timer at the next snapshot
+ * tick. On timer fire at T, snapshot the map and emit all meeting pairs.
+ */
+public class Q5SnapshotFunction
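+ extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple4<Long, Integer, Integer, Double>> { // key type Integer is assumed (constant key)
+
+ private final double pLon, pLat, dPMetres, dMeetMetres;
+ private final long snapshotTickMillis;
+ private transient MapState<Integer, Tuple2<Double, Double>> lastPos;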
+
+ public Q5SnapshotFunction(double pLon, double pLat, double dPMetres,
+ double dMeetMetres, long snapshotTickMillis) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.dPMetres = dPMetres;
+ this.dMeetMetres = dMeetMetres;
+ this.snapshotTickMillis = snapshotTickMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ TypeInformation<Tuple2<Double, Double>> vInfo =
+ TypeInformation.of(new TypeHint<Tuple2<Double, Double>>() {});
+ lastPos = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>("q5SnapLastPos", TypeInformation.of(Integer.class), vInfo));
+ }
+
+ @Override
+ public void processElement(
+ BerlinMODTrip trip,
+ Context ctx,
+ Collector<Tuple4<Long, Integer, Integer, Double>> out) throws Exception {
+ lastPos.put(trip.getVehicleId(), new Tuple2<>(trip.getLon(), trip.getLat()));
+ long nextTick = ((trip.getTimestamp() / snapshotTickMillis) + 1) * snapshotTickMillis;
+ ctx.timerService().registerEventTimeTimer(nextTick);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
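+ Collector<Tuple4<Long, Integer, Integer, Double>> out) throws Exception {
+ Map<Integer, Tuple2<Double, Double>> snap = new HashMap<>();
+ for (Map.Entry<Integer, Tuple2<Double, Double>> e : lastPos.entries()) {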
+ snap.put(e.getKey(), e.getValue());
+ }
+ List<Map.Entry<Integer, Tuple2<Double, Double>>> nearP = new ArrayList<>();
+ for (Map.Entry<Integer, Tuple2<Double, Double>> e : snap.entrySet()) {
+ Tuple2<Double, Double> p = e.getValue();
+ if (MEOSBridge.dwithinMetres(p.f0, p.f1, pLon, pLat, dPMetres)) {
+ nearP.add(e);
+ }
+ }
+ nearP.sort(Comparator.comparingInt(Map.Entry::getKey));
+
+ for (int i = 0; i < nearP.size(); i++) {
+ for (int j = i + 1; j < nearP.size(); j++) {
+ Tuple2<Double, Double> a = nearP.get(i).getValue();
+ Tuple2<Double, Double> b = nearP.get(j).getValue();
+ double d = MEOSBridge.distanceMetres(a.f0, a.f1, b.f0, b.f1);
+ if (d <= dMeetMetres) {
+ out.collect(new Tuple4<>(timestamp,
+ nearP.get(i).getKey(), nearP.get(j).getKey(), d));
+ }
+ }
+ }
+ }
+}
diff --git a/flink-processor/src/main/java/berlinmod/Q5WindowedFunction.java b/flink-processor/src/main/java/berlinmod/Q5WindowedFunction.java
new file mode 100644
index 0000000..8140d0f
--- /dev/null
+++ b/flink-processor/src/main/java/berlinmod/Q5WindowedFunction.java
@@ -0,0 +1,75 @@
+package berlinmod;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BerlinMOD-Q5 — windowed form.
+ *
+ * "Per N-second tumbling window, which pairs of vehicles met near P
+ * (using each vehicle's last-seen-in-window position)?"
+ *
+ * <p>Within each window, take each vehicle's latest position from the
+ * window's events. Run the same near-P-and-within-meet-distance pair check
+ * as the continuous form. Emit {@code (windowStart, windowEnd, a, b,
+ * distanceMetres)} per meeting pair.
+ */
+public class Q5WindowedFunction
+ extends ProcessAllWindowFunction<BerlinMODTrip, Tuple5<Long, Long, Integer, Integer, Double>, TimeWindow> {
+
+ private final double pLon, pLat, dPMetres, dMeetMetres;
+
+ public Q5WindowedFunction(double pLon, double pLat, double dPMetres, double dMeetMetres) {
+ this.pLon = pLon;
+ this.pLat = pLat;
+ this.dPMetres = dPMetres;
+ this.dMeetMetres = dMeetMetres;
+ }
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<BerlinMODTrip> elements,
+ Collector<Tuple5<Long, Long, Integer, Integer, Double>> out) {
+ // Last position per vehicle within the window
+ Map<Integer, BerlinMODTrip> latest = new HashMap<>();
+ for (BerlinMODTrip trip : elements) {
+ BerlinMODTrip prev = latest.get(trip.getVehicleId());
+ if (prev == null || trip.getTimestamp() > prev.getTimestamp()) {
+ latest.put(trip.getVehicleId(), trip);
+ }
+ }
+
+ // Filter to vehicles near P
+ List<Map.Entry<Integer, Tuple2<Double, Double>>> nearP = new ArrayList<>();
+ for (Map.Entry<Integer, BerlinMODTrip> e : latest.entrySet()) {
+ BerlinMODTrip t = e.getValue();
+ if (MEOSBridge.dwithinMetres(t.getLon(), t.getLat(), pLon, pLat, dPMetres)) {
+ nearP.add(new HashMap.SimpleEntry<>(e.getKey(), new Tuple2<>(t.getLon(), t.getLat())));
+ }
+ }
+ nearP.sort(Comparator.comparingInt(Map.Entry::getKey));
+
+ for (int i = 0; i < nearP.size(); i++) {
+ for (int j = i + 1; j < nearP.size(); j++) {
+ Tuple2<Double, Double> a = nearP.get(i).getValue();
+ Tuple2