17 commits
116bed1
feat(berlinmod): scaffold the full BerlinMOD-9 streaming-form parity …
estebanzimanyi May 20, 2026
56e5cbf
feat(berlinmod): route the streaming-form parity matrix through JMEOS…
estebanzimanyi May 21, 2026
b37ac88
codegen(meos): generate tier-aware MEOS facade for the full JMEOS 1.4…
estebanzimanyi May 21, 2026
1b18a8e
feat(wirings): stateless tier DataStream wirings for the generated ME…
estebanzimanyi May 21, 2026
49e7824
feat(wirings): bounded-state tier DataStream wiring + runnable demo
estebanzimanyi May 21, 2026
ad3cafe
feat(wirings): windowed tier DataStream wiring + runnable demo
estebanzimanyi May 21, 2026
d71d460
feat(wirings): cross-stream tier DataStream wiring + runnable demo (c…
estebanzimanyi May 21, 2026
173429b
feat(wirings): capstone demo composing all 4 tier wirings into one pi…
estebanzimanyi May 21, 2026
14b577f
test(parity): audit and verify the MEOS facade surface per family
estebanzimanyi May 27, 2026
df5c870
test(parity): resolve facade methods against libmeos and broaden the …
estebanzimanyi May 28, 2026
f2cab47
feat(parity): expose SQL-surface functions backed by internal MEOS he…
estebanzimanyi May 28, 2026
f8141d2
docs(parity): record the JMEOS-jar / MEOS-library resolution skew
estebanzimanyi May 28, 2026
caf44f5
feat(parity): expose the minDistance set-set distance functions in th…
estebanzimanyi May 29, 2026
1d47d3e
build(meos): select the extended temporal-type families via MobilityD…
estebanzimanyi May 29, 2026
80fc4dc
feat(meos): converge the MEOS facade onto the MEOS-API-generated JMEO…
estebanzimanyi May 29, 2026
4da3c2f
feat(meos): expose the H3 / th3index family in the facade
estebanzimanyi May 29, 2026
01100f0
feat(meos): expose the tbigint temporal family + always-covers, audit…
estebanzimanyi May 29, 2026
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
*.csv
.DS_Store
*.jar
flink-processor/target/
32 changes: 32 additions & 0 deletions README.md
@@ -62,3 +62,35 @@ Kafka producer
Flink Processor
<img src="doc/images/flink-processor.png" width="700" alt="Flink Processor" />


# BerlinMOD-9 × 3 streaming forms — the parity matrix on Flink

The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..Q9) in three streaming forms each on this runtime: **continuous** (always-on, per-event emission), **windowed** (tumbling 10-second aggregation), and **snapshot** (5-second tick — the parity-oracle form whose output at watermark T equals the batch BerlinMOD-Q result on data up to T).

| Q | Topic | Continuous | Windowed | Snapshot |
|---|---|---|---|---|
| Q1 | "which vehicles have appeared in the stream?" | ✓ | ✓ | ✓ |
| Q2 | "where is vehicle X at time T?" | ✓ | ✓ | ✓ |
| Q3 | "vehicles within d of P at time T?" | ✓ | ✓ | ✓ |
| Q4 | "vehicles entered region R, and when?" | ✓ | ✓ | ✓ |
| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ |
| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ |
| Q7 | "first passage of vehicles through POIs" | ✓ | ✓ | ✓ |
| Q8 | "vehicles close to a road segment" | ✓ | ✓ | ✓ |
| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ |

**27 / 27 cells** = the full MobilityFlink parity-matrix row. Each cell has a dedicated `Q<N>{Continuous,Windowed,Snapshot}Function` class in [`flink-processor/src/main/java/berlinmod/`](flink-processor/src/main/java/berlinmod/) and is locally verified via the companion `BerlinMODQ<N>LocalTest` driver running on a Flink mini-cluster.

The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD).

Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java), which calls MEOS' `geog_dwithin` over WGS84 geographies via [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18) (the geodesic-wrapper PR, branched off the MEOS 1.4 regen at JMEOS#15) when libmeos is loadable on the runtime path. The distance entry points use [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18)'s `utils.spatial.Haversine.distance` (MEOS `geog_distance` over POINT/POINT) and `utils.spatial.PointToSegment.distance` (MEOS `geog_distance` over POINT/LINESTRING). When libmeos is not present (e.g. on the mini-cluster local-test runs where `-Dmobilityflink.meos.enabled=false` is set), the bridge falls back to pure-Java great-circle (`Haversine`) and planar segment-distance (`SegmentDistance`) — same semantics, identical predicate truth values to within float-precision at BerlinMOD scale.
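
The libmeos-or-fallback selection described above can be pictured with a small sketch. This is not the repository's `MEOSBridge`: `BridgeFallbackSketch`, `DistanceBackend` and `select` are hypothetical names, treating an unset `mobilityflink.meos.enabled` property as `true` is an assumption, and a `null` native backend stands in for "libmeos could not be loaded".

```java
/** Hypothetical sketch of a native-or-fallback switch; not the repository's MEOSBridge. */
final class BridgeFallbackSketch {

    /** Any distance implementation over (lon, lat) in degrees, returning metres. */
    interface DistanceBackend {
        double distanceMetres(double lon1, double lat1, double lon2, double lat2);
    }

    /** Reads the flag named in the README; treating "unset" as enabled is an assumption. */
    static boolean meosEnabled() {
        return Boolean.parseBoolean(System.getProperty("mobilityflink.meos.enabled", "true"));
    }

    /** Picks the native backend only when the flag is on and the backend is available. */
    static DistanceBackend select(DistanceBackend nativeBackend, DistanceBackend javaFallback) {
        return (meosEnabled() && nativeBackend != null) ? nativeBackend : javaFallback;
    }

    public static void main(String[] args) {
        // Stand-in backends returning constants, so the selected one is visible.
        DistanceBackend fakeNative = (lon1, lat1, lon2, lat2) -> 1.0;
        DistanceBackend fallback = (lon1, lat1, lon2, lat2) -> 2.0;
        System.setProperty("mobilityflink.meos.enabled", "false");
        // With the flag off, the fallback backend is selected.
        System.out.println(select(fakeNative, fallback).distanceMetres(0, 0, 0, 0));
    }
}
```

Keeping both backends behind one interface means the query functions never need to know which implementation answered, which is what lets the mini-cluster tests run without the native library.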

The Kafka-source entry points for Q2 and Q3 are [`BerlinMODQ2Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java) and [`BerlinMODQ3Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java); the companion producer is [`python-producer-berlinmod.py`](kafka-producer/python-producer-berlinmod.py). Generate a BerlinMOD CSV with the upstream generator (`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in [`doc/berlinmod-q3-streaming-forms.md`](doc/berlinmod-q3-streaming-forms.md).

### Sibling parity work in the ecosystem

- [MobilityKafka#1](https://github.com/MobilityDB/MobilityKafka/pull/1) — the same 27-cell row on Kafka Streams
- [MobilityNebula#15](https://github.com/MobilityDB/MobilityNebula/pull/15) — 27 / 27 cells on NebulaStream scaffold (with [#16](https://github.com/MobilityDB/MobilityNebula/pull/16) adding `TEMPORAL_LENGTH` for Q6 and [#17](https://github.com/MobilityDB/MobilityNebula/pull/17) adding `PAIR_MEETING` + `CROSS_DISTANCE` for Q5/Q9, all calling MEOS C ABI directly)
- [MobilityDB-BerlinMOD#29](https://github.com/MobilityDB/MobilityDB-BerlinMOD/pull/29) — the batch BerlinMOD-9 cross-platform timings (the snapshot form's gold-answer source)
- [MobilityDB/.github#10](https://github.com/MobilityDB/.github/pull/10) — the ecosystem-profile description of the stream-layers tier

107 changes: 107 additions & 0 deletions doc/berlinmod-q3-streaming-forms.md
@@ -0,0 +1,107 @@
# BerlinMOD-Q3 streaming forms

This document defines what **BerlinMOD-Q3** means in each of the three
streaming forms the parity contract specifies for the MobilityFlink /
MobilityKafka / MobilityNebula trio (see the planned-tier section of the
[ecosystem profile](https://github.com/MobilityDB/.github)).

## The batch query

> *Which vehicles were within distance `d` of point `P` at time `T`?*

Parameters: a point `P = (lon, lat)`, a radius `d` in metres, and a time `T`.
Returns: the set of `vehicle_id`s whose trajectory passed within `d` of `P` at `T`.

The batch reference implementation lives in
[MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD) and
runs against the three SQL surfaces (MobilityDB / MobilityDuck / MobilitySpark)
with byte-identical results — the batch oracle for the snapshot streaming form
below.

## The three streaming forms

### 1. Continuous form

> *"At every moment, which vehicles are currently within `d` of `P`?"*

For each incoming GPS event `(vehicle_id, t, lon, lat)`:

- Evaluate the radius predicate `distance((lon, lat), P) ≤ d`.
- Emit `(vehicle_id, t, near)` per event.

No window; output updates per event. Watermark-independent.

Use case: real-time geofence alerting where each event matters.

Implemented by [`Q3ContinuousFunction`](../flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java).
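
As a rough illustration of the per-event evaluation above, the following plain-Java sketch computes a great-circle (haversine) distance and emits one `(vehicle_id, t, near)` row per event. It is not the repository's `Q3ContinuousFunction`: the class name, method names, the string output format, and the spherical Earth radius constant are all assumptions made for the example.

```java
/** Hypothetical stand-in for the continuous form; not the repository's Q3ContinuousFunction. */
final class ContinuousRadiusSketch {

    // Mean Earth radius in metres (assumed spherical model).
    static final double EARTH_RADIUS_M = 6_371_008.8;

    /** Great-circle distance in metres between two (lon, lat) points given in degrees. */
    static double haversineMetres(double lon1, double lat1, double lon2, double lat2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dPhi = Math.toRadians(lat2 - lat1);
        double dLambda = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
                 + Math.cos(phi1) * Math.cos(phi2)
                 * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
        // Clamp guards against rounding pushing sqrt(a) slightly above 1.
        return 2 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    /** One output row per input event, formatted as "vehicle_id,t,near". */
    static String onEvent(int vehicleId, long tMillis, double lon, double lat,
                          double pLon, double pLat, double dMetres) {
        boolean near = haversineMetres(lon, lat, pLon, pLat) <= dMetres;
        return vehicleId + "," + tMillis + "," + near;
    }

    public static void main(String[] args) {
        // Defaults from this document: P = Brussels city centre, d = 5000 m.
        double pLon = 4.3517, pLat = 50.8503, d = 5000.0;
        System.out.println(onEvent(7, 1000L, 4.3517, 50.8503, pLon, pLat, d));
        System.out.println(onEvent(8, 1000L, 4.3517, 51.8503, pLon, pLat, d));
    }
}
```

Because each row depends only on the event itself, no keyed state or timer is needed, which is why this form is watermark-independent.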

### 2. Windowed form

> *"Per `W`-second tumbling window, how many distinct vehicles were
> within `d` of `P` at any time during the window?"*

Tumbling event-time window of size `W` (default `W = 10s`). For each window:

- Collect all events whose timestamp falls in the window.
- Compute the distinct set `{vehicle_id : ∃ event in window with distance ≤ d}`.
- Emit `(window_start, window_end, distinct_count)`.

Use case: time-bucketed dashboards, near-real-time aggregates.

Implemented by [`Q3WindowedFunction`](../flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java).
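
The three steps above can be sketched without Flink as a pass over a finite list of events. This is a simplified model rather than the repository's `Q3WindowedFunction`: `TumblingDistinctSketch`, `Event` and the string row format are hypothetical, each event is assumed to carry the already-evaluated radius predicate as a `near` flag, and windows are aligned to multiples of `W` from epoch 0.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical batch model of the windowed form; not the repository's Q3WindowedFunction. */
final class TumblingDistinctSketch {

    /** An event with the radius predicate already evaluated (assumption for brevity). */
    static final class Event {
        final int vehicleId;
        final long tMillis;
        final boolean near;

        Event(int vehicleId, long tMillis, boolean near) {
            this.vehicleId = vehicleId;
            this.tMillis = tMillis;
            this.near = near;
        }
    }

    /** Start of the tumbling window of size wMillis that contains tMillis. */
    static long windowStart(long tMillis, long wMillis) {
        return tMillis - Math.floorMod(tMillis, wMillis);
    }

    /** Returns one "window_start,window_end,distinct_count" row per window that saw any event. */
    static List<String> aggregate(List<Event> events, long wMillis) {
        TreeMap<Long, Set<Integer>> nearByWindow = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.tMillis, wMillis);
            Set<Integer> near = nearByWindow.computeIfAbsent(start, k -> new HashSet<>());
            if (e.near) {
                near.add(e.vehicleId); // set semantics give the distinct count
            }
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Long, Set<Integer>> w : nearByWindow.entrySet()) {
            rows.add(w.getKey() + "," + (w.getKey() + wMillis) + "," + w.getValue().size());
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1, 1_000L, true));
        events.add(new Event(1, 2_000L, true));   // same vehicle, counted once
        events.add(new Event(2, 12_000L, true));
        for (String row : aggregate(events, 10_000L)) {
            System.out.println(row);
        }
    }
}
```

In a real streaming job the window would fire when the watermark passes `window_end`; the sketch sidesteps that by assuming all events are already available.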

### 3. Snapshot form — **the parity oracle**

> *"At time `T`, which vehicles are within `d` of `P`?"*

Watermark-driven. Per vehicle, maintain `lastKnownPosition` state. At each
snapshot tick (event-time timer at multiples of `snapshotTickMillis`,
default `5000 ms`):

- For each vehicle's most recent `(lon, lat)`, evaluate the radius predicate.
- Emit `(T, vehicle_id)` for every vehicle satisfying the predicate at `T`.

As the watermark advances to `T = max(event_times)`, the streaming snapshot
output **equals the batch BerlinMOD-Q3 result** on the same scale-factor
corpus. This is the parity property the contract enforces:

```
streaming-Q3-snapshot(T) ≡ batch-BerlinMOD-Q3 on data up to T
(same SF, same P, same d)
```

Use case: lambda-architecture-style verification, where the streaming
pipeline's output must converge to the batch reference.

Implemented by [`Q3SnapshotFunction`](../flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java).
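
A minimal plain-Java model of the snapshot mechanics described above (last-known-position state plus ticks at multiples of the tick interval) might look as follows. It is not the repository's `Q3SnapshotFunction`: the class and method names are hypothetical, the radius predicate is abstracted behind a `Near` interface, and events are assumed to arrive in non-decreasing event-time order so that the latest event time can act as the watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of the snapshot form; not the repository's Q3SnapshotFunction. */
final class SnapshotTickSketch {

    /** Stand-in for the radius predicate distance((lon, lat), P) <= d. */
    interface Near {
        boolean test(double lon, double lat);
    }

    private final long tickMillis;
    private final Near near;
    // Per-vehicle most recent (lon, lat); TreeMap keeps output ordered by vehicle id.
    private final Map<Integer, double[]> lastKnownPosition = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();
    private boolean started = false;
    private long nextTick;

    SnapshotTickSketch(long tickMillis, Near near) {
        this.tickMillis = tickMillis;
        this.near = near;
    }

    /** Feed events in non-decreasing event-time order (assumption: no out-of-order data). */
    void onEvent(int vehicleId, long tMillis, double lon, double lat) {
        if (!started) {
            // First tick: smallest multiple of tickMillis at or after the first event.
            nextTick = Math.floorDiv(tMillis + tickMillis - 1, tickMillis) * tickMillis;
            started = true;
        }
        // A tick T is complete once an event later than T arrives.
        while (nextTick < tMillis) {
            fire(nextTick);
            nextTick += tickMillis;
        }
        lastKnownPosition.put(vehicleId, new double[] {lon, lat});
    }

    /** Advance the simulated watermark and fire every pending tick at or before it. */
    void advanceWatermark(long watermarkMillis) {
        while (started && nextTick <= watermarkMillis) {
            fire(nextTick);
            nextTick += tickMillis;
        }
    }

    /** Emits a "T,vehicle_id" row for every vehicle whose last position satisfies the predicate. */
    private void fire(long tick) {
        for (Map.Entry<Integer, double[]> e : lastKnownPosition.entrySet()) {
            double[] p = e.getValue();
            if (near.test(p[0], p[1])) {
                emitted.add(tick + "," + e.getKey());
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // Toy predicate for the demo only: "near" means lon < 1.0.
        SnapshotTickSketch s = new SnapshotTickSketch(5_000L, (lon, lat) -> lon < 1.0);
        s.onEvent(1, 1_000L, 0.5, 0.0);
        s.onEvent(1, 6_000L, 3.0, 0.0); // arrival past T = 5000 fires that tick first
        s.advanceWatermark(10_000L);
        System.out.println(s.emitted());
    }
}
```

Updating state only after firing pending ticks is what makes each tick see positions as of `T` rather than as of the triggering event.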

## Default parameters

The `BerlinMODQ3Main` entry point uses:

| Parameter | Value | Source |
|---|---|---|
| `P` (lon, lat) | (4.3517, 50.8503) — Brussels city centre | Default centre for the BerlinMOD-Brussels corpus |
| `d` (radius) | 5 000 m | Within-city-centre scale |
| `W` (window size) | 10 s | Same as the AIS example for consistency |
| Snapshot tick | 5 s | Half the window for finer parity-oracle granularity |
| Topic | `berlinmod` | Single shared topic across the three forms |

## Predicate implementation

The scaffold today uses a pure-Java great-circle (Haversine) distance check in
[`Haversine`](../flink-processor/src/main/java/berlinmod/Haversine.java). This
matches the predicate semantics of the MEOS `edwithin_tgeo_geo` operator (the
same call used by `MobilityNebula/Queries/Query1.yaml`), so swapping the
predicate body for a JMEOS-bridged `edwithin_tgeo_geo` call is a one-line
change once the JMEOS surface for that operator is verified — it is marked
`TODO(meos)` in each form's class.

## Companion producer

The BerlinMOD CSV → Kafka producer lives at
[`kafka-producer/python-producer-berlinmod.py`](../kafka-producer/python-producer-berlinmod.py).
Generate a BerlinMOD CSV at scale factor SF with the upstream generator
(`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB), name the
columns `(t, vehicle_id, lon, lat)`, and the producer streams it to the
`berlinmod` topic.
46 changes: 46 additions & 0 deletions flink-processor/docs/parity-status.md
@@ -0,0 +1,46 @@
# MobilityFlink parity status — MEOS surface audit

Generated 2026-05-29 by `tools/parity/parity_audit.py`.

The MobilityFlink MEOS facade (`org.mobilitydb.flink.meos.MeosOps*`) exposes MEOS C functions to Flink through JMEOS. This audit measures, per type family, the share of the **MEOS public C API** that the facade exposes and that JMEOS binds.

**Headline.** The facade exposes **2296 of the 2296 public MEOS functions that JMEOS binds (100.0%)**. The MEOS public surface (`meos/include/meos*.h`, excluding internal headers) is 2297 functions; JMEOS binds 2296 of them. No bindable function is left unexposed (see §3).

Coverage is **static**: a function counts as covered when the facade declares a method of the same name and arity that delegates to a JMEOS export.

Per-family runtime behaviour is asserted by `src/test/java/org/mobilitydb/flink/meos/MeosFacadeSmokeTest.java`, which constructs and reads back a value in the core, geo, cbuffer, npoint and pose families through the facade against libmeos. The cbuffer, npoint and pose families require a libmeos built with the extended modules (`-DCBUFFER=ON -DNPOINT=ON -DPOSE=ON -DRGEO=ON`); the stock library carries the core and geo surfaces only.

## 1. Reference surface and method

- **Denominator**: distinct function names declared `extern` in the MEOS public headers `meos.h`, `meos_geo.h`, `meos_cbuffer.h`, `meos_npoint.h`, `meos_pose.h`, `meos_rgeo.h`, `meos_h3.h` (the headers tabulated in §2). Internal headers (`meos_internal*.h`) are excluded.

- **Numerator**: `public static` methods on the generated `MeosOps*` facade whose name is also a `functions.GeneratedFunctions` export in the bundled JMEOS jar.

- **JMEOS jar**: `jar/JMEOS.jar` exports 2916 static methods.
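
The denominator/numerator definition above reduces to set arithmetic over function names. The sketch below illustrates that arithmetic only; it is not `tools/parity/parity_audit.py`. `CoverageSketch`, `Row` and `audit` are hypothetical names, the function names in `main` are placeholders rather than real MEOS symbols, and the sketch matches by name alone and ignores arity.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of the coverage arithmetic; not the repository's audit tool. */
final class CoverageSketch {

    /** One row of a per-family coverage table. */
    static final class Row {
        final int bindable;        // "Public ∩ JMEOS"
        final int exposed;         // bindable functions the facade declares
        final Set<String> missing; // bindable functions the facade lacks

        Row(int bindable, int exposed, Set<String> missing) {
            this.bindable = bindable;
            this.exposed = exposed;
            this.missing = missing;
        }

        /** Exposed share of the bindable surface, in percent (0.0 for an empty surface). */
        double coveragePercent() {
            return bindable == 0 ? 0.0 : 100.0 * exposed / bindable;
        }
    }

    static Row audit(Set<String> publicHeaderNames, Set<String> jmeosExports,
                     Set<String> facadeMethods) {
        // Denominator: public header names that JMEOS also exports.
        Set<String> bindable = new TreeSet<>(publicHeaderNames);
        bindable.retainAll(jmeosExports);
        // Numerator: bindable names that the facade declares.
        Set<String> exposed = new TreeSet<>(bindable);
        exposed.retainAll(facadeMethods);
        // Remainder: bindable but not generated into the facade.
        Set<String> missing = new TreeSet<>(bindable);
        missing.removeAll(facadeMethods);
        return new Row(bindable.size(), exposed.size(), missing);
    }

    public static void main(String[] args) {
        // Placeholder names, not real MEOS functions.
        Set<String> headers = new TreeSet<>(Arrays.asList("fn_a", "fn_b", "fn_c"));
        Set<String> jmeos = new TreeSet<>(Arrays.asList("fn_a", "fn_b", "fn_x"));
        Set<String> facade = new TreeSet<>(Arrays.asList("fn_a"));
        Row row = audit(headers, jmeos, facade);
        System.out.println(row.bindable + " bindable, " + row.exposed + " exposed, missing "
                + row.missing + ", coverage " + row.coveragePercent() + "%");
    }
}
```

Note that a function present in the headers but absent from JMEOS (`fn_c` here) drops out of the denominator entirely, which is how a surface of 2297 functions can report 100.0% with 2296 exposed.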

## 2. Per-family coverage of the public MEOS surface

| Family (header) | Public ∩ JMEOS | Exposed | Missing | Coverage |
|---|---:|---:|---:|---:|
| core temporal / set / span / spanset / tbox (`meos.h`) | 1343 | 1343 | 0 | 100.0% |
| geo (tgeo / tpoint / stbox) (`meos_geo.h`) | 421 | 421 | 0 | 100.0% |
| cbuffer (`meos_cbuffer.h`) | 175 | 175 | 0 | 100.0% |
| npoint (`meos_npoint.h`) | 119 | 119 | 0 | 100.0% |
| pose (`meos_pose.h`) | 101 | 101 | 0 | 100.0% |
| rgeo (`meos_rgeo.h`) | 68 | 68 | 0 | 100.0% |
| h3 / th3index (`meos_h3.h`) | 69 | 69 | 0 | 100.0% |
| **total** | **2296** | **2296** | **0** | **100.0%** |

## 3. Bindable MEOS functions not exposed by the facade

None. Every function that is present in the public MEOS headers and bound by JMEOS is generated into the facade.


## 4. MobilityDB SQL-surface cross-check

The facade is also matched against the underlying MEOS C symbol of each addressable `CREATE FUNCTION` in `mobilitydb/sql/**/*.in.sql` (PG-only sections and helper symbols bucketed out; 876 out-of-scope, 113 SQL/plpgsql-composed functions with no single C symbol). Functions the SQL layer implements through the internal MEOS headers (`meos_internal*.h`) are exposed via `MeosOpsSqlSurface`.

- Addressable distinct C symbols: **1336**; bound by JMEOS: **1068**; exposed by the facade: **1068** (100.0% of the JMEOS-bindable SQL surface).

- The remaining **268** addressable C symbols are not exported by JMEOS under the name the SQL layer's extension wrapper uses; the wrapper names differ from the MEOS function names they call.

Binary file modified flink-processor/jar/JMEOS.jar
Binary file not shown.