14 commits
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.csv
.DS_Store
*.jar
flink-processor/target/
32 changes: 32 additions & 0 deletions README.md
@@ -62,3 +62,35 @@ Kafka producer
Flink Processor
<img src="doc/images/flink-processor.png" width="700" alt="Flink Processor" />


# BerlinMOD-9 × 3 streaming forms — the parity matrix on Flink

The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..Q9) in three streaming forms each on this runtime: **continuous** (always-on, per-event emission), **windowed** (tumbling 10-second aggregation), and **snapshot** (5-second tick — the parity-oracle form whose output at watermark T equals the batch BerlinMOD-Q result on data up to T).

| Q | Topic | Continuous | Windowed | Snapshot |
|---|---|---|---|---|
| Q1 | "which vehicles have appeared in the stream?" | ✓ | ✓ | ✓ |
| Q2 | "where is vehicle X at time T?" | ✓ | ✓ | ✓ |
| Q3 | "vehicles within d of P at time T?" | ✓ | ✓ | ✓ |
| Q4 | "vehicles entered region R, and when?" | ✓ | ✓ | ✓ |
| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ |
| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ |
| Q7 | "first passage of vehicles through POIs" | ✓ | ✓ | ✓ |
| Q8 | "vehicles close to a road segment" | ✓ | ✓ | ✓ |
| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ |

**27 / 27 cells** = the full MobilityFlink parity-matrix row. Each cell has a dedicated `Q<N>{Continuous,Windowed,Snapshot}Function` class in [`flink-processor/src/main/java/berlinmod/`](flink-processor/src/main/java/berlinmod/) and is locally verified via the companion `BerlinMODQ<N>LocalTest` driver running on a Flink mini-cluster.

The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD).

Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java). When libmeos is loadable on the runtime path, the bridge calls MEOS `geog_dwithin` over WGS84 geographies via [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18) (the geodesic-wrapper PR, branched off the MEOS 1.4 regen at JMEOS#15). The distance entry points use the same PR's `utils.spatial.Haversine.distance` (MEOS `geog_distance` over POINT/POINT) and `utils.spatial.PointToSegment.distance` (MEOS `geog_distance` over POINT/LINESTRING). When libmeos is not present (e.g. on the mini-cluster local-test runs where `-Dmobilityflink.meos.enabled=false` is set), the bridge falls back to pure-Java great-circle (`Haversine`) and planar segment-distance (`SegmentDistance`) implementations, which keep the same semantics and yield identical predicate truth values to within float precision at BerlinMOD scale.
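
As an illustration of the planar segment-distance fallback, here is a minimal sketch of a clamped point-to-segment projection. It is not the repository's `SegmentDistance` class: the class name `SegmentDistanceSketch`, the method signature, and the assumption that all coordinates are already in one planar unit (for example metres after projection) are hypothetical.

```java
/** Hypothetical sketch of a planar point-to-segment distance; not the repository's SegmentDistance. */
final class SegmentDistanceSketch {
    private SegmentDistanceSketch() {}

    /**
     * Distance from point (px, py) to the segment (ax, ay)-(bx, by).
     * Assumes planar coordinates that all share one unit.
     */
    static double distance(double px, double py,
                           double ax, double ay,
                           double bx, double by) {
        double dx = bx - ax;
        double dy = by - ay;
        double lengthSquared = dx * dx + dy * dy;
        if (lengthSquared == 0.0) {
            // Degenerate segment: both endpoints coincide.
            return Math.hypot(px - ax, py - ay);
        }
        // Projection parameter of the point onto line AB, clamped to the segment.
        double t = ((px - ax) * dx + (py - ay) * dy) / lengthSquared;
        t = Math.max(0.0, Math.min(1.0, t));
        return Math.hypot(px - (ax + t * dx), py - (ay + t * dy));
    }
}
```

Clamping `t` to `[0, 1]` is what distinguishes a segment distance from a distance to the infinite line: a point beyond either endpoint is measured to that endpoint.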

The Kafka-source entry points for Q2 and Q3 are [`BerlinMODQ2Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java) and [`BerlinMODQ3Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java); the companion producer is [`python-producer-berlinmod.py`](kafka-producer/python-producer-berlinmod.py). Generate a BerlinMOD CSV with the upstream generator (`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in [`doc/berlinmod-q3-streaming-forms.md`](doc/berlinmod-q3-streaming-forms.md).

### Sibling parity work in the ecosystem

- [MobilityKafka#1](https://github.com/MobilityDB/MobilityKafka/pull/1) — the same 27-cell row on Kafka Streams
- [MobilityNebula#15](https://github.com/MobilityDB/MobilityNebula/pull/15) — 27 / 27 cells on NebulaStream scaffold (with [#16](https://github.com/MobilityDB/MobilityNebula/pull/16) adding `TEMPORAL_LENGTH` for Q6 and [#17](https://github.com/MobilityDB/MobilityNebula/pull/17) adding `PAIR_MEETING` + `CROSS_DISTANCE` for Q5/Q9, all calling MEOS C ABI directly)
- [MobilityDB-BerlinMOD#29](https://github.com/MobilityDB/MobilityDB-BerlinMOD/pull/29) — the batch BerlinMOD-9 cross-platform timings (the snapshot form's gold-answer source)
- [MobilityDB/.github#10](https://github.com/MobilityDB/.github/pull/10) — the ecosystem-profile description of the stream-layers tier

107 changes: 107 additions & 0 deletions doc/berlinmod-q3-streaming-forms.md
@@ -0,0 +1,107 @@
# BerlinMOD-Q3 streaming forms

This document defines what **BerlinMOD-Q3** means in each of the three
streaming forms the parity contract specifies for the MobilityFlink /
MobilityKafka / MobilityNebula trio (see the planned-tier section of the
[ecosystem profile](https://github.com/MobilityDB/.github)).

## The batch query

> *Which vehicles were within distance `d` of point `P` at time `T`?*

Parameters: a point `P = (lon, lat)`, a radius `d` in metres, and a time `T`.
Returns: the set of `vehicle_id`s whose trajectory passed within `d` of `P` at `T`.

The batch reference implementation lives in
[MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD) and
runs against the three SQL surfaces (MobilityDB / MobilityDuck / MobilitySpark)
with byte-identical results. It serves as the batch oracle for the snapshot
streaming form below.

## The three streaming forms

### 1. Continuous form

> *"At every moment, which vehicles are currently within `d` of `P`?"*

For each incoming GPS event `(vehicle_id, t, lon, lat)`:

- Evaluate the radius predicate `distance((lon, lat), P) ≤ d`.
- Emit `(vehicle_id, t, near)` per event.

No window; output updates per event. Watermark-independent.
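
The per-event behaviour described above can be sketched without any Flink dependency. This is a hedged illustration, not the code of `Q3ContinuousFunction`: the class `Q3ContinuousSketch`, its nested `Event` and `RadiusPredicate` types, and the comma-separated output format are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the continuous form; not the repository's Q3ContinuousFunction. */
final class Q3ContinuousSketch {
    /** Pluggable radius predicate: true when (lon, lat) lies within d of P. */
    interface RadiusPredicate {
        boolean near(double lon, double lat);
    }

    /** One GPS event (vehicle_id, t, lon, lat). */
    static final class Event {
        final int vehicleId;
        final long t;
        final double lon;
        final double lat;

        Event(int vehicleId, long t, double lon, double lat) {
            this.vehicleId = vehicleId;
            this.t = t;
            this.lon = lon;
            this.lat = lat;
        }
    }

    /** Emits exactly one "vehicleId,t,near" line per input event, in input order. */
    static List<String> run(List<Event> events, RadiusPredicate predicate) {
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            boolean near = predicate.near(e.lon, e.lat);
            out.add(e.vehicleId + "," + e.t + "," + near);
        }
        return out;
    }
}
```

Because no state is kept between events, the output for an event does not depend on arrival order or on any watermark.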

Use case: real-time geofence alerting where each event matters.

Implemented by [`Q3ContinuousFunction`](../flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java).

### 2. Windowed form

> *"Per `W`-second tumbling window, how many distinct vehicles were
> within `d` of `P` at any time during the window?"*

Tumbling event-time window of size `W` (default `W = 10s`). For each window:

- Collect all events whose timestamp falls in the window.
- Compute the distinct set `{vehicle_id : ∃ event in window with distance ≤ d}`.
- Emit `(window_start, window_end, distinct_count)`.
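
The three steps above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the code of `Q3WindowedFunction`: the class and nested type names are hypothetical, windows are taken as `[start, start + W)` aligned to multiples of `W`, and a window that received events but none near `P` is reported with a count of 0.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of the windowed form; not the repository's Q3WindowedFunction. */
final class Q3WindowedSketch {
    interface RadiusPredicate {
        boolean near(double lon, double lat);
    }

    static final class Event {
        final int vehicleId;
        final long t;
        final double lon;
        final double lat;

        Event(int vehicleId, long t, double lon, double lat) {
            this.vehicleId = vehicleId;
            this.t = t;
            this.lon = lon;
            this.lat = lat;
        }
    }

    /** Returns one "windowStart,windowEnd,distinctCount" line per window that received events. */
    static List<String> run(List<Event> events, long windowMillis, RadiusPredicate predicate) {
        // Window start -> distinct vehicle ids observed near P inside that window.
        Map<Long, Set<Integer>> nearByWindow = new TreeMap<>();
        for (Event e : events) {
            long start = Math.floorDiv(e.t, windowMillis) * windowMillis;
            Set<Integer> ids = nearByWindow.computeIfAbsent(start, k -> new HashSet<>());
            if (predicate.near(e.lon, e.lat)) {
                ids.add(e.vehicleId);
            }
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Set<Integer>> w : nearByWindow.entrySet()) {
            long start = w.getKey();
            out.add(start + "," + (start + windowMillis) + "," + w.getValue().size());
        }
        return out;
    }
}
```

Keying the map by window start makes the result independent of event arrival order, which is the property an event-time window provides.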

Use case: time-bucketed dashboards, near-real-time aggregates.

Implemented by [`Q3WindowedFunction`](../flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java).

### 3. Snapshot form — **the parity oracle**

> *"At time `T`, which vehicles are within `d` of `P`?"*

Watermark-driven. Per vehicle, maintain `lastKnownPosition` state. At each
snapshot tick (event-time timer at multiples of `snapshotTickMillis`,
default `5000 ms`):

- For each vehicle's most recent `(lon, lat)`, evaluate the radius predicate.
- Emit `(T, vehicle_id)` for every vehicle satisfying the predicate at `T`.
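
A minimal simulation of the tick logic, assuming the events arrive already sorted by timestamp so that the loop itself plays the role of the watermark. It is not the code of `Q3SnapshotFunction`: the class and nested type names are hypothetical, and an event stamped exactly at a tick `T` is assumed to count as "data up to `T`".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the snapshot form; not the repository's Q3SnapshotFunction. */
final class Q3SnapshotSketch {
    interface RadiusPredicate {
        boolean near(double lon, double lat);
    }

    static final class Event {
        final int vehicleId;
        final long t;
        final double lon;
        final double lat;

        Event(int vehicleId, long t, double lon, double lat) {
            this.vehicleId = vehicleId;
            this.t = t;
            this.lon = lon;
            this.lat = lat;
        }
    }

    /** Returns "T,vehicleId" lines for every tick T up to the last event time. */
    static List<String> run(List<Event> eventsInTimeOrder, long tickMillis, RadiusPredicate predicate) {
        List<String> out = new ArrayList<>();
        if (eventsInTimeOrder.isEmpty()) {
            return out;
        }
        // Per-vehicle state: the most recent (lon, lat) seen so far.
        Map<Integer, double[]> lastKnownPosition = new TreeMap<>();
        long firstT = eventsInTimeOrder.get(0).t;
        long lastT = eventsInTimeOrder.get(eventsInTimeOrder.size() - 1).t;
        // First tick at or after the first event (assumes non-negative timestamps).
        long nextTick = Math.floorDiv(firstT + tickMillis - 1, tickMillis) * tickMillis;
        for (Event e : eventsInTimeOrder) {
            // Fire every tick that lies strictly before this event.
            while (nextTick < e.t) {
                emit(nextTick, lastKnownPosition, predicate, out);
                nextTick += tickMillis;
            }
            lastKnownPosition.put(e.vehicleId, new double[] {e.lon, e.lat});
        }
        // Fire a final tick if the last event falls exactly on one.
        while (nextTick <= lastT) {
            emit(nextTick, lastKnownPosition, predicate, out);
            nextTick += tickMillis;
        }
        return out;
    }

    private static void emit(long tick, Map<Integer, double[]> positions,
                             RadiusPredicate predicate, List<String> out) {
        for (Map.Entry<Integer, double[]> v : positions.entrySet()) {
            if (predicate.near(v.getValue()[0], v.getValue()[1])) {
                out.add(tick + "," + v.getKey());
            }
        }
    }
}
```

In a real Flink job the ticks would be driven by event-time timers rather than by a loop over sorted input; the sketch only shows which state is read at each tick.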

As the watermark advances to `T = max(event_times)`, the streaming snapshot
output **equals the batch BerlinMOD-Q3 result** on the same scale-factor
corpus. This is the parity property the contract enforces:

```
streaming-Q3-snapshot(T) ≡ batch-BerlinMOD-Q3 on data up to T
(same SF, same P, same d)
```

Use case: lambda-architecture-style verification, where the streaming
pipeline's output must converge to the batch reference.

Implemented by [`Q3SnapshotFunction`](../flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java).

## Default parameters

The `BerlinMODQ3Main` entry point uses:

| Parameter | Value | Source |
|---|---|---|
| `P` (lon, lat) | (4.3517, 50.8503) — Brussels city centre | Default centre for the BerlinMOD-Brussels corpus |
| `d` (radius) | 5 000 m | Within-city-centre scale |
| `W` (window size) | 10 s | Same as the AIS example for consistency |
| Snapshot tick | 5 s | Half the window for finer parity-oracle granularity |
| Topic | `berlinmod` | Single shared topic across the three forms |

## Predicate implementation

The scaffold today uses a pure-Java great-circle (Haversine) distance check in
[`Haversine`](../flink-processor/src/main/java/berlinmod/Haversine.java). This
matches the predicate semantics of the MEOS `edwithin_tgeo_geo` operator (the
same call used by `MobilityNebula/Queries/Query1.yaml`). Swapping the predicate
body for a JMEOS-bridged `edwithin_tgeo_geo` call is therefore a one-line change
once the JMEOS surface for that operator is verified. The swap point is marked
`TODO(meos)` in each form's class.
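
For reference, a great-circle radius check of this kind can be sketched as follows. This is not the contents of the repository's `Haversine.java`: the class name `HaversineSketch`, the method names, and the choice of 6 371 008.8 m as the mean Earth radius are assumptions for illustration.

```java
/** Hypothetical sketch of a Haversine radius predicate; not the repository's Haversine class. */
final class HaversineSketch {
    /** Mean Earth radius in metres (assumed constant for this sketch). */
    static final double EARTH_RADIUS_M = 6_371_008.8;

    private HaversineSketch() {}

    /** Great-circle distance in metres between two (lon, lat) points given in degrees. */
    static double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double halfDPhi = (phi2 - phi1) / 2.0;
        double halfDLambda = Math.toRadians(lon2 - lon1) / 2.0;
        double a = Math.sin(halfDPhi) * Math.sin(halfDPhi)
                + Math.cos(phi1) * Math.cos(phi2) * Math.sin(halfDLambda) * Math.sin(halfDLambda);
        // Clamp guards against rounding pushing sqrt(a) slightly above 1.
        return 2.0 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    /** Radius predicate: distance((lon, lat), P) <= d. */
    static boolean within(double lon, double lat, double pLon, double pLat, double dMetres) {
        return distanceMetres(lon, lat, pLon, pLat) <= dMetres;
    }
}
```

With the default parameters from the table above, a call would look like `HaversineSketch.within(lon, lat, 4.3517, 50.8503, 5000.0)`.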

## Companion producer

The BerlinMOD CSV → Kafka producer lives at
[`kafka-producer/python-producer-berlinmod.py`](../kafka-producer/python-producer-berlinmod.py).
Generate a BerlinMOD CSV at scale factor SF with the upstream generator
(`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB), name the
columns `(t, vehicle_id, lon, lat)`, and the producer streams it to the
`berlinmod` topic.
45 changes: 45 additions & 0 deletions flink-processor/docs/parity-status.md
@@ -0,0 +1,45 @@
# MobilityFlink parity status — MEOS surface audit

Generated 2026-05-29 by `tools/parity/parity_audit.py`.

The MobilityFlink MEOS facade (`org.mobilitydb.flink.meos.MeosOps*`) exposes MEOS C functions to Flink through JMEOS. This audit measures, per type family, the share of the **MEOS public C API** that the facade exposes and that JMEOS binds.

**Headline.** The facade exposes **2153 of the 2153 public MEOS functions that JMEOS binds (100.0%)**. The MEOS public surface (`meos/include/meos*.h`, excluding internal headers) is 2158 functions, of which JMEOS binds 2153, leaving 5 public functions unbound by JMEOS. No bindable function is left unexposed (see §3).

Coverage is **static**: a function counts as covered when the facade declares a method of the same name and arity that delegates to a JMEOS export.
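
The name-and-arity criterion can be illustrated with a small reflection sketch. It is not how `tools/parity/parity_audit.py` works internally: the class `CoverageSketch` and its nested `Exports` and `Facade` stand-ins are hypothetical, and the sketch checks only name and parameter count, not whether the facade method actually delegates.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of a static name/arity coverage check; not the audit tool itself. */
final class CoverageSketch {
    /** Stand-in for a binding class with exported static methods. */
    static final class Exports {
        public static int alpha(int x) { return x; }
        public static int beta(int x, int y) { return x + y; }
    }

    /** Stand-in for a facade; beta has a different arity, so it does not count as covered. */
    static final class Facade {
        public static int alpha(int x) { return Exports.alpha(x); }
        public static int beta(int x) { return x; }
    }

    /** Collects "name/arity" keys for every public static method declared on the type. */
    static Set<String> signatures(Class<?> type) {
        Set<String> out = new TreeSet<>();
        for (Method m : type.getDeclaredMethods()) {
            int mod = m.getModifiers();
            if (Modifier.isPublic(mod) && Modifier.isStatic(mod) && !m.isSynthetic()) {
                out.add(m.getName() + "/" + m.getParameterCount());
            }
        }
        return out;
    }

    /** Exported methods for which the facade declares a same-name, same-arity method. */
    static Set<String> covered(Class<?> facade, Class<?> exports) {
        Set<String> result = signatures(exports);
        result.retainAll(signatures(facade));
        return result;
    }
}
```

Because a check like this is purely structural, it says nothing about runtime behaviour, which is why the smoke test described next is needed alongside it.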

Per-family runtime behaviour is asserted by `src/test/java/org/mobilitydb/flink/meos/MeosFacadeSmokeTest.java`, which constructs and reads back a value in the core, geo, cbuffer, npoint and pose families through the facade against libmeos. The cbuffer, npoint and pose families require a libmeos built with the extended modules (`-DCBUFFER=ON -DNPOINT=ON -DPOSE=ON -DRGEO=ON`); the stock library carries the core and geo surfaces only.

## 1. Reference surface and method

- **Denominator**: distinct function names declared `extern` in the MEOS public headers `meos.h`, `meos_geo.h`, `meos_cbuffer.h`, `meos_npoint.h`, `meos_pose.h`, `meos_rgeo.h`. Internal headers (`meos_internal*.h`) are excluded.

- **Numerator**: `public static` methods on the generated `MeosOps*` facade whose name is also a `functions.GeneratedFunctions` export in the bundled JMEOS jar.

- **JMEOS jar**: `jar/JMEOS.jar` exports 2699 static methods.

## 2. Per-family coverage of the public MEOS surface

| Family (header) | Public ∩ JMEOS | Exposed | Missing | Coverage |
|---|---:|---:|---:|---:|
| core temporal / set / span / spanset / tbox (`meos.h`) | 1274 | 1274 | 0 | 100.0% |
| geo (tgeo / tpoint / stbox) (`meos_geo.h`) | 419 | 419 | 0 | 100.0% |
| cbuffer (`meos_cbuffer.h`) | 173 | 173 | 0 | 100.0% |
| npoint (`meos_npoint.h`) | 118 | 118 | 0 | 100.0% |
| pose (`meos_pose.h`) | 101 | 101 | 0 | 100.0% |
| rgeo (`meos_rgeo.h`) | 68 | 68 | 0 | 100.0% |
| **total** | **2153** | **2153** | **0** | **100.0%** |

## 3. Bindable MEOS functions not exposed by the facade

No functions are present in the public MEOS headers and bound by JMEOS but missing from the facade.


## 4. MobilityDB SQL-surface cross-check

The facade is also matched against the underlying MEOS C symbol of each addressable `CREATE FUNCTION` in `mobilitydb/sql/**/*.in.sql` (PG-only sections and helper symbols bucketed out; 876 out-of-scope, 113 SQL/plpgsql-composed functions with no single C symbol). Functions the SQL layer implements through the internal MEOS headers (`meos_internal*.h`) are exposed via `MeosOpsSqlSurface`.

- Addressable distinct C symbols: **1336**; bound by JMEOS: **1065**; exposed by the facade: **1065** (100.0% of the JMEOS-bindable SQL surface).

- The remaining **271** addressable C symbols are not exported by JMEOS under the name the SQL layer's extension wrapper uses; the wrapper names differ from the MEOS function names they call.

Binary file modified flink-processor/jar/JMEOS.jar
Binary file not shown.