19 commits
116bed1
feat(berlinmod): scaffold the full BerlinMOD-9 streaming-form parity …
estebanzimanyi May 20, 2026
56e5cbf
feat(berlinmod): route the streaming-form parity matrix through JMEOS…
estebanzimanyi May 21, 2026
b37ac88
codegen(meos): generate tier-aware MEOS facade for the full JMEOS 1.4…
estebanzimanyi May 21, 2026
1b18a8e
feat(wirings): stateless tier DataStream wirings for the generated ME…
estebanzimanyi May 21, 2026
49e7824
feat(wirings): bounded-state tier DataStream wiring + runnable demo
estebanzimanyi May 21, 2026
ad3cafe
feat(wirings): windowed tier DataStream wiring + runnable demo
estebanzimanyi May 21, 2026
d71d460
feat(wirings): cross-stream tier DataStream wiring + runnable demo (c…
estebanzimanyi May 21, 2026
173429b
feat(wirings): capstone demo composing all 4 tier wirings into one pi…
estebanzimanyi May 21, 2026
14b577f
test(parity): audit and verify the MEOS facade surface per family
estebanzimanyi May 27, 2026
df5c870
test(parity): resolve facade methods against libmeos and broaden the …
estebanzimanyi May 28, 2026
f2cab47
feat(parity): expose SQL-surface functions backed by internal MEOS he…
estebanzimanyi May 28, 2026
f8141d2
docs(parity): record the JMEOS-jar / MEOS-library resolution skew
estebanzimanyi May 28, 2026
caf44f5
feat(parity): expose the minDistance set-set distance functions in th…
estebanzimanyi May 29, 2026
1d47d3e
build(meos): select the extended temporal-type families via MobilityD…
estebanzimanyi May 29, 2026
80fc4dc
feat(meos): converge the MEOS facade onto the MEOS-API-generated JMEO…
estebanzimanyi May 29, 2026
4da3c2f
feat(meos): expose the H3 / th3index family in the facade
estebanzimanyi May 29, 2026
01100f0
feat(meos): expose the tbigint temporal family + always-covers, audit…
estebanzimanyi May 29, 2026
2bb4c19
feat(berlinmod): route the streaming-form spatial predicates through …
estebanzimanyi May 29, 2026
0d29a39
feat(berlinmod): run the streaming benchmark on real BerlinMOD data a…
estebanzimanyi May 29, 2026
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
*.csv
.DS_Store
*.jar
flink-processor/target/
32 changes: 32 additions & 0 deletions README.md
@@ -62,3 +62,35 @@ Kafka producer
Flink Processor
<img src="doc/images/flink-processor.png" width="700" alt="Flink Processor" />


# BerlinMOD-9 × 3 streaming forms — the parity matrix on Flink

The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..Q9) in three streaming forms each on this runtime: **continuous** (always-on, per-event emission), **windowed** (tumbling 10-second aggregation), and **snapshot** (5-second tick — the parity-oracle form whose output at watermark T equals the batch BerlinMOD-Q result on data up to T).

| Q | Topic | Continuous | Windowed | Snapshot |
|---|---|---|---|---|
| Q1 | "which vehicles have appeared in the stream?" | ✓ | ✓ | ✓ |
| Q2 | "where is vehicle X at time T?" | ✓ | ✓ | ✓ |
| Q3 | "vehicles within d of P at time T?" | ✓ | ✓ | ✓ |
| Q4 | "vehicles entered region R, and when?" | ✓ | ✓ | ✓ |
| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ |
| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ |
| Q7 | "first passage of vehicles through POIs" | ✓ | ✓ | ✓ |
| Q8 | "vehicles close to a road segment" | ✓ | ✓ | ✓ |
| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ |

**27 / 27 cells** are covered, which completes the MobilityFlink row of the parity matrix. Each cell has a dedicated `Q<N>{Continuous,Windowed,Snapshot}Function` class in [`flink-processor/src/main/java/berlinmod/`](flink-processor/src/main/java/berlinmod/) and is verified locally by the companion `BerlinMODQ<N>LocalTest` driver running on a Flink mini-cluster.

The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD).

Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java), which holds no spatial mathematics of its own: it builds the MEOS temporal instants and geographies and delegates the computation to libmeos. The within-distance predicate uses MEOS `edwithin_tgeo_geo` — the vehicle position as a `tgeogpoint` instant, tested against the query geography in metres on the WGS84 spheroid; region containment uses `eintersects_tgeo_geo` between the point's `tgeompoint` instant and the region polygon; and the pairwise and cumulative distances use `geog_distance`.

The Kafka-source entry points for Q2 and Q3 are [`BerlinMODQ2Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java) and [`BerlinMODQ3Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java); the companion producer is [`python-producer-berlinmod.py`](kafka-producer/python-producer-berlinmod.py). Generate a BerlinMOD CSV with the upstream generator (`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in [`doc/berlinmod-q3-streaming-forms.md`](doc/berlinmod-q3-streaming-forms.md).

### Sibling parity work in the ecosystem

- [MobilityKafka#1](https://github.com/MobilityDB/MobilityKafka/pull/1) — the same 27-cell row on Kafka Streams
- [MobilityNebula#15](https://github.com/MobilityDB/MobilityNebula/pull/15) — 27 / 27 cells on NebulaStream scaffold (with [#16](https://github.com/MobilityDB/MobilityNebula/pull/16) adding `TEMPORAL_LENGTH` for Q6 and [#17](https://github.com/MobilityDB/MobilityNebula/pull/17) adding `PAIR_MEETING` + `CROSS_DISTANCE` for Q5/Q9, all calling MEOS C ABI directly)
- [MobilityDB-BerlinMOD#29](https://github.com/MobilityDB/MobilityDB-BerlinMOD/pull/29) — the batch BerlinMOD-9 cross-platform timings (the snapshot form's gold-answer source)
- [MobilityDB/.github#10](https://github.com/MobilityDB/.github/pull/10) — the ecosystem-profile description of the stream-layers tier

107 changes: 107 additions & 0 deletions doc/berlinmod-q3-streaming-forms.md
@@ -0,0 +1,107 @@
# BerlinMOD-Q3 streaming forms

This document defines what **BerlinMOD-Q3** means in each of the three
streaming forms the parity contract specifies for the MobilityFlink /
MobilityKafka / MobilityNebula trio (see the planned-tier section of the
[ecosystem profile](https://github.com/MobilityDB/.github)).

## The batch query

> *Which vehicles were within distance `d` of point `P` at time `T`?*

Parameters: a point `P = (lon, lat)`, a radius `d` in metres, and a time `T`.
Returns: the set of `vehicle_id`s whose trajectory passed within `d` of `P` at `T`.

The batch reference implementation lives in
[MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD) and
runs against the three SQL surfaces (MobilityDB / MobilityDuck / MobilitySpark)
with byte-identical results — the batch oracle for the snapshot streaming form
below.

## The three streaming forms

### 1. Continuous form

> *"At every moment, which vehicles are currently within `d` of `P`?"*

For each incoming GPS event `(vehicle_id, t, lon, lat)`:

- Evaluate the radius predicate `distance((lon, lat), P) ≤ d`.
- Emit `(vehicle_id, t, near)` per event.

No window; output updates per event. Watermark-independent.

Use case: real-time geofence alerting where each event matters.

Implemented by [`Q3ContinuousFunction`](../flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java).
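
The per-event evaluation above can be sketched in a few lines. This is a minimal illustration, not the `Q3ContinuousFunction` code: it assumes a haversine distance on a sphere as a stand-in for the MEOS `edwithin_tgeo_geo` call on the WGS84 spheroid, and the names `GpsEvent`, `haversine_m` and `q3_continuous` are hypothetical.

```python
import math
from typing import Iterable, Iterator, NamedTuple, Tuple

# Assumption: mean Earth radius on a sphere; the real predicate uses the WGS84 spheroid via MEOS.
EARTH_RADIUS_M = 6_371_008.8


class GpsEvent(NamedTuple):
    vehicle_id: int
    t: int        # event time, epoch milliseconds
    lon: float    # degrees
    lat: float    # degrees


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in metres between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def q3_continuous(
    events: Iterable[GpsEvent], p: Tuple[float, float], d: float
) -> Iterator[Tuple[int, int, bool]]:
    """Emit (vehicle_id, t, near) for every event; no state, no window."""
    for e in events:
        near = haversine_m(e.lon, e.lat, p[0], p[1]) <= d
        yield (e.vehicle_id, e.t, near)
```

Because the function keeps no state between events, it emits exactly one output row per input row, which is why this form does not depend on the watermark.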

### 2. Windowed form

> *"Per `W`-second tumbling window, how many distinct vehicles were
> within `d` of `P` at any time during the window?"*

Tumbling event-time window of size `W` (default `W = 10s`). For each window:

- Collect all events whose timestamp falls in the window.
- Compute the distinct set `{vehicle_id : ∃ event in window with distance ≤ d}`.
- Emit `(window_start, window_end, distinct_count)`.

Use case: time-bucketed dashboards, near-real-time aggregates.

Implemented by [`Q3WindowedFunction`](../flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java).
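
The window assignment and distinct count can be sketched as a batch pass over already-flagged events, that is, `(vehicle_id, t, near)` tuples like the ones the continuous form emits. This is an illustrative sketch only: it ignores watermarks and late data, the name `q3_windowed` is hypothetical, and it assumes that a window containing events but no near vehicle still emits a row with count 0.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def q3_windowed(
    flagged_events: Iterable[Tuple[int, int, bool]], window_ms: int = 10_000
) -> List[Tuple[int, int, int]]:
    """Return (window_start, window_end, distinct_count) per tumbling window.

    flagged_events holds (vehicle_id, t_ms, near) tuples.
    """
    windows: Dict[int, Set[int]] = defaultdict(set)
    for vehicle_id, t, near in flagged_events:
        start = t - (t % window_ms)   # tumbling windows are aligned to multiples of W
        bucket = windows[start]       # assumption: any window with events emits a row
        if near:
            bucket.add(vehicle_id)    # a set keeps vehicle ids distinct
    return [(start, start + window_ms, len(ids)) for start, ids in sorted(windows.items())]
```

For example, two near events from the same vehicle inside one window contribute 1 to that window's count, not 2.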

### 3. Snapshot form — **the parity oracle**

> *"At time `T`, which vehicles are within `d` of `P`?"*

Watermark-driven. Per vehicle, maintain `lastKnownPosition` state. At each
snapshot tick (event-time timer at multiples of `snapshotTickMillis`,
default `5000 ms`):

- For each vehicle's most recent `(lon, lat)`, evaluate the radius predicate.
- Emit `(T, vehicle_id)` for every vehicle satisfying the predicate at `T`.

As the watermark advances to `T = max(event_times)`, the streaming snapshot
output **equals the batch BerlinMOD-Q3 result** on the same scale-factor
corpus. This is the parity property the contract enforces:

```
streaming-Q3-snapshot(T) ≡ batch-BerlinMOD-Q3 on data up to T
(same SF, same P, same d)
```

Use case: lambda-architecture-style verification, where the streaming
pipeline's output must converge to the batch reference.

Implemented by [`Q3SnapshotFunction`](../flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java).
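
The tick-driven evaluation can be sketched as follows. This is a simplified model rather than the `Q3SnapshotFunction` code: it assumes events arrive in event-time order (so each event's timestamp acts as the watermark), fires every tick the stream passes, and fires the one pending tick at end of input as a stand-in for a final watermark. The name `q3_snapshot` and the `is_near` callback are hypothetical.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

Event = Tuple[int, int, float, float]  # (vehicle_id, t_ms, lon, lat)


def q3_snapshot(
    events: Iterable[Event],
    is_near: Callable[[float, float], bool],
    tick_ms: int = 5_000,
) -> List[Tuple[int, int]]:
    """Return (T, vehicle_id) rows for every vehicle near P at each tick T."""
    last_known: Dict[int, Tuple[float, float]] = {}
    out: List[Tuple[int, int]] = []
    next_tick: Optional[int] = None

    def fire(tick: int) -> None:
        # Evaluate the predicate on each vehicle's most recent position.
        for vehicle_id in sorted(last_known):
            lon, lat = last_known[vehicle_id]
            if is_near(lon, lat):
                out.append((tick, vehicle_id))

    for vehicle_id, t, lon, lat in events:
        if next_tick is None:
            next_tick = -(-t // tick_ms) * tick_ms  # first multiple of tick_ms >= t
        while t > next_tick:                        # the stream has passed the tick
            fire(next_tick)
            next_tick += tick_ms
        last_known[vehicle_id] = (lon, lat)

    if next_tick is not None:
        fire(next_tick)  # assumption: a final watermark fires the pending tick
    return out
```

An event stamped exactly at a tick is applied before that tick fires, so the snapshot at `T` reflects data up to and including `T`.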

## Default parameters

The `BerlinMODQ3Main` entry point uses:

| Parameter | Value | Source |
|---|---|---|
| `P` (lon, lat) | (4.3517, 50.8503) — Brussels city centre | Default centre for the BerlinMOD-Brussels corpus |
| `d` (radius) | 5 000 m | Within-city-centre scale |
| `W` (window size) | 10 s | Same as the AIS example for consistency |
| Snapshot tick | 5 s | Half the window for finer parity-oracle granularity |
| Topic | `berlinmod` | Single shared topic across the three forms |

## Predicate implementation

The within-distance predicate evaluates through the MEOS `edwithin_tgeo_geo`
operator — the same call used by `MobilityNebula/Queries/Query1.yaml`. The
vehicle position is built as a `tgeogpoint` instant and tested against the
query geography in metres on the WGS84 spheroid. All spatial predicates route
through [`MEOSBridge`](../flink-processor/src/main/java/berlinmod/MEOSBridge.java),
which holds no spatial mathematics of its own: it constructs the MEOS inputs
(temporal instants and geographies) and delegates the computation to libmeos.

## Companion producer

The BerlinMOD CSV → Kafka producer lives at
[`kafka-producer/python-producer-berlinmod.py`](../kafka-producer/python-producer-berlinmod.py).
Generate a BerlinMOD CSV at scale factor SF with the upstream generator
(`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) and name the
columns `(t, vehicle_id, lon, lat)`; the producer then streams the rows to the
`berlinmod` topic.
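
As a rough illustration of the CSV-to-message step, the sketch below turns rows with the `(t, vehicle_id, lon, lat)` columns into key/value byte pairs of the kind a Kafka producer would send. The JSON encoding, the use of `vehicle_id` as the message key, and the name `csv_to_messages` are assumptions made for illustration; the actual wire format of `python-producer-berlinmod.py` is not shown in this document, and no Kafka client is used here.

```python
import csv
import io
import json
from typing import Iterator, Tuple


def csv_to_messages(csv_text: str) -> Iterator[Tuple[bytes, bytes]]:
    """Yield (key, value) byte pairs, one per CSV row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for row in reader:
        payload = {
            "t": row["t"],                         # timestamp kept as the CSV string
            "vehicle_id": int(row["vehicle_id"]),
            "lon": float(row["lon"]),
            "lat": float(row["lat"]),
        }
        # Assumption: keying by vehicle keeps one vehicle's events in a single partition.
        key = row["vehicle_id"].encode("utf-8")
        yield key, json.dumps(payload).encode("utf-8")
```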
89 changes: 89 additions & 0 deletions flink-processor/docs/benchmark-results.md
@@ -0,0 +1,89 @@
# BerlinMOD streaming-matrix throughput

Throughput of the BerlinMOD-9 × 3-form streaming matrix (9 queries ×
{continuous, windowed, snapshot} = 27 cells) on the Flink local mini-cluster
over the real BerlinMOD instants corpus. The spatial predicates evaluate through
MEOS: within-distance through `edwithin_tgeo_geo`, region containment through
`eintersects_tgeo_geo`, and distances through `geog_distance` (see
[`MEOSBridge`](../src/main/java/berlinmod/MEOSBridge.java)).

## Method

The corpus is the BerlinMOD `berlinmod_instants.csv` produced by the BerlinMOD
generator — 216 075 instants, 5 vehicles, over ~11 days. Instants are stored in
EPSG:3857 and reprojected to EPSG:4326 through MEOS `geo_transform` at load (see
[`BerlinMODCorpus`](../src/main/java/berlinmod/BerlinMODCorpus.java)); the
per-query parameters (point `P` = corpus centroid, region box, road segment,
points of interest, target vehicle ids) and the window/tick granularity are
derived from the corpus so each spatial cell is selective and the matrix
produces a comparable number of windows. Each cell runs as its own Flink job
terminated by a counting sink; throughput is input events ÷ wall-clock and
`output rows` is the sink cardinality. Parallelism 1, Flink 1.16, Java 21,
16-core x86-64 Linux; libmeos built `-DMEOS=ON -DCBUFFER=ON -DNPOINT=ON
-DPOSE=ON -DRGEO=ON`.
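
The throughput figure is plain arithmetic on the two measured quantities. A minimal sketch (the function name `throughput_ev_per_s` is hypothetical) that reproduces the reported values from the event count and wall-clock time:

```python
def throughput_ev_per_s(events_in: int, wall_ms: int) -> float:
    """Input events divided by wall-clock seconds."""
    return events_in * 1000 / wall_ms


# Q1-continuous: 216075 events in 2508 ms
print(round(throughput_ev_per_s(216075, 2508)))  # 86154
```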

Run from `flink-processor/`:

```
LD_LIBRARY_PATH=<libmeos-dir> java \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.time=ALL-UNNAMED \
-cp target/classes:jar/JMEOS.jar:<deps> \
berlinmod.BerlinMODBenchmark --csv <berlinmod_instants.csv>
```

## Results — real BerlinMOD instants (216 075 events)

| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) |
|---|---:|---:|---:|---:|
| Q1-continuous | 216075 | 5 | 2508 | 86,154 |
| Q1-windowed | 216075 | 86 | 1294 | 166,982 |
| Q1-snapshot | 216075 | 274 | 1056 | 204,616 |
| Q2-continuous | 216075 | 61170 | 1074 | 201,187 |
| Q2-windowed | 216075 | 50 | 1027 | 210,394 |
| Q2-snapshot | 216075 | 71 | 985 | 219,365 |
| Q3-continuous | 216075 | 216075 | 2928 | 73,796 |
| Q3-windowed | 216075 | 86 | 2507 | 86,189 |
| Q3-snapshot | 216075 | 0 | 926 | 233,342 |
| Q4-continuous | 216075 | 62 | 3254 | 66,403 |
| Q4-windowed | 216075 | 98 | 3234 | 66,814 |
| Q4-snapshot | 216075 | 1944 | 3223 | 67,042 |
| Q5-continuous | 216075 | 73063 | 9161 | 23,586 |
| Q5-windowed | 216075 | 6 | 954 | 226,494 |
| Q5-snapshot | 216075 | 0 | 915 | 236,148 |
| Q6-continuous | 216075 | 216075 | 2382 | 90,712 |
| Q6-windowed | 216075 | 203 | 2637 | 81,940 |
| Q6-snapshot | 216075 | 274 | 2214 | 97,595 |
| Q7-continuous | 216075 | 5 | 3973 | 54,386 |
| Q7-windowed | 216075 | 53 | 5004 | 43,180 |
| Q7-snapshot | 216075 | 288 | 3931 | 54,967 |
| Q8-continuous | 216075 | 216075 | 2883 | 74,948 |
| Q8-windowed | 216075 | 86 | 2864 | 75,445 |
| Q8-snapshot | 216075 | 126 | 928 | 232,839 |
| Q9-continuous | 216075 | 107870 | 1858 | 116,294 |
| Q9-windowed | 216075 | 22 | 924 | 233,847 |
| Q9-snapshot | 216075 | 95 | 992 | 217,818 |

## Parity — streaming continuous form ≡ batch MEOS predicate

The continuous form emits `predicate(event)` for every event, so it is checked
event-for-event against a batch pass over the same corpus through the same
`MEOSBridge` call ([`BerlinMODParity`](../src/main/java/berlinmod/BerlinMODParity.java)).
Both spatial-membership queries match exactly.

| Query | Events | Streaming-true | Batch-true | Mismatches | Parity |
|---|---:|---:|---:|---:|---|
| Q3 (within `d` of `P`) | 216075 | 56086 | 56086 | 0 | exact |
| Q8 (within `d` of segment) | 216075 | 118498 | 118498 | 0 | exact |
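
The event-for-event comparison behind this table can be sketched as below. This is an illustration of the check, not the `BerlinMODParity` code; `parity_report` and its `predicate` argument are hypothetical names, and the predicate stands in for the shared `MEOSBridge` call.

```python
from typing import Callable, Dict, Sequence, TypeVar

E = TypeVar("E")


def parity_report(
    events: Sequence[E],
    streaming_flags: Sequence[bool],
    predicate: Callable[[E], bool],
) -> Dict[str, object]:
    """Compare per-event streaming output against a batch pass over the same events."""
    if len(events) != len(streaming_flags):
        raise ValueError("continuous form must emit exactly one flag per event")
    batch_flags = [predicate(e) for e in events]
    mismatches = sum(1 for s, b in zip(streaming_flags, batch_flags) if s != b)
    return {
        "events": len(events),
        "streaming_true": sum(streaming_flags),
        "batch_true": sum(batch_flags),
        "mismatches": mismatches,
        "parity": "exact" if mismatches == 0 else "diverged",
    }
```

Equal true-counts alone would not prove parity, since two differing flags can cancel out; counting mismatches position by position is what makes the check event-for-event.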

## Characteristics

Q5-continuous enumerates every meeting pair across all vehicles on each event
(O(V²) per event, keyed to a single subtask); it has the lowest throughput in the
matrix (23,586 ev/s). The snapshot form is sampled: it evaluates each vehicle's
last-known position only at tick instants. A within-`d`-of-`P` snapshot can
therefore be empty when no vehicle is within `d` of `P` at any tick boundary, even
though the continuous form reports near-`P` events between boundaries. This is
consistent with the 0 output rows reported for Q3-snapshot above.
46 changes: 46 additions & 0 deletions flink-processor/docs/parity-status.md
@@ -0,0 +1,46 @@
# MobilityFlink parity status — MEOS surface audit

Generated 2026-05-29 by `tools/parity/parity_audit.py`.

The MobilityFlink MEOS facade (`org.mobilitydb.flink.meos.MeosOps*`) exposes MEOS C functions to Flink through JMEOS. This audit measures, per type family, the share of the **MEOS public C API** that the facade exposes and that JMEOS binds.

**Headline.** The facade exposes **2296 of the 2296 public MEOS functions that JMEOS binds (100.0%)**. The MEOS public surface (`meos/include/meos*.h`, excluding internal headers) is 2297 functions; JMEOS binds 2296 of them, so one public function has no JMEOS binding. No bindable function is missing from the facade (see §3).

Coverage is **static**: a function counts as covered when the facade declares a method of the same name and arity that delegates to a JMEOS export.

Per-family runtime behaviour is asserted by `src/test/java/org/mobilitydb/flink/meos/MeosFacadeSmokeTest.java`, which constructs and reads back a value in the core, geo, cbuffer, npoint and pose families through the facade against libmeos. The cbuffer, npoint and pose families require a libmeos built with the extended modules (`-DCBUFFER=ON -DNPOINT=ON -DPOSE=ON -DRGEO=ON`); the stock library carries the core and geo surfaces only.

## 1. Reference surface and method

- **Denominator**: distinct function names declared `extern` in the MEOS public headers `meos.h`, `meos_geo.h`, `meos_cbuffer.h`, `meos_npoint.h`, `meos_pose.h`, `meos_rgeo.h`, `meos_h3.h`. Internal headers (`meos_internal*.h`) are excluded.

- **Numerator**: `public static` methods on the generated `MeosOps*` facade whose name is also a `functions.GeneratedFunctions` export in the bundled JMEOS jar.

- **JMEOS jar**: jar/JMEOS.jar exports 2916 static methods.
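
The coverage figures reduce to set arithmetic over three name lists. The sketch below is a simplified model of that calculation, not the logic of `tools/parity/parity_audit.py`: it matches on function name only and ignores arity, and `coverage` is a hypothetical name.

```python
from typing import List, Set, Tuple


def coverage(
    public_names: Set[str], jmeos_exports: Set[str], facade_methods: Set[str]
) -> Tuple[int, int, List[str], float]:
    """Return (bindable, exposed, missing names, coverage percent)."""
    bindable = public_names & jmeos_exports   # "Public ∩ JMEOS" column
    exposed = bindable & facade_methods       # "Exposed" column
    missing = sorted(bindable - exposed)      # section 3 listing
    pct = 100.0 * len(exposed) / len(bindable) if bindable else 100.0
    return len(bindable), len(exposed), missing, pct
```

Under this model a public function that JMEOS does not bind drops out of the denominator, which is how a surface of 2297 public functions can still report 100.0% over 2296.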

## 2. Per-family coverage of the public MEOS surface

| Family (header) | Public ∩ JMEOS | Exposed | Missing | Coverage |
|---|---:|---:|---:|---:|
| core temporal / set / span / spanset / tbox (`meos.h`) | 1343 | 1343 | 0 | 100.0% |
| geo (tgeo / tpoint / stbox) (`meos_geo.h`) | 421 | 421 | 0 | 100.0% |
| cbuffer (`meos_cbuffer.h`) | 175 | 175 | 0 | 100.0% |
| npoint (`meos_npoint.h`) | 119 | 119 | 0 | 100.0% |
| pose (`meos_pose.h`) | 101 | 101 | 0 | 100.0% |
| rgeo (`meos_rgeo.h`) | 68 | 68 | 0 | 100.0% |
| h3 / th3index (`meos_h3.h`) | 69 | 69 | 0 | 100.0% |
| **total** | **2296** | **2296** | **0** | **100.0%** |

## 3. Bindable MEOS functions not exposed by the facade

None: every function that is present in the public MEOS headers and bound by JMEOS is generated into the facade.


## 4. MobilityDB SQL-surface cross-check

The facade is also matched against the underlying MEOS C symbol of each addressable `CREATE FUNCTION` in `mobilitydb/sql/**/*.in.sql` (PG-only sections and helper symbols bucketed out; 876 out-of-scope, 113 SQL/plpgsql-composed functions with no single C symbol). Functions the SQL layer implements through the internal MEOS headers (`meos_internal*.h`) are exposed via `MeosOpsSqlSurface`.

- Addressable distinct C symbols: **1336**; bound by JMEOS: **1068**; exposed by the facade: **1068** (100.0% of the JMEOS-bindable SQL surface).

- The remaining **268** addressable C symbols are not exported by JMEOS under the name the SQL layer's extension wrapper uses; the wrapper names differ from the MEOS function names they call.

Binary file modified flink-processor/jar/JMEOS.jar
Binary file not shown.