feat(berlinmod): streaming benchmark harness with corpus-derived parameters (stacks on #8)#9
Open
estebanzimanyi wants to merge 10 commits into
Open
Conversation
…eams (27/27) All 27 cells of the BerlinMOD-9 × 3-form parity matrix on MobilityKafka, matching MobilityFlink MobilityDB#3's coverage on the Kafka-Streams runtime. Continuous form (9): per-event emission via record-by-record dispatch. Windowed form (9): STREAM_TIME punctuator at WINDOW_SIZE_MILLIS (10_000 ms) emitting closed windows. Snapshot form (9): STREAM_TIME punctuator at SNAPSHOT_TICK_MILLIS (5_000 ms) emitting per-tick state. State patterns: - Stateless filter (Q2-c, Q3-c, Q8-c) - Single keyed flag (Q1-c, Q1-s, Q4-c) - Single-key state (Q2-s, Q9-s) - Per-vehicle accumulator (Q6-c, Q6-s) - Per-(vehicle, POI) keyed state (Q7-c, Q7-s) - Cross-vehicle shared via selectKey(0) (Q5-c, Q3-s, Q5-s, Q7-s, Q8-s) - Paired shared via selectKey(0) (Q9-c, Q4-s, Q9-s) - winStart-keyed comma-separated vehicleId set (Q1-w, Q3-w, Q8-w) - winStart-keyed encoded last-known per vehicle (Q2-w, Q5-w, Q9-w) - winStart-keyed per-vehicle accumulator (Q6-w) - winStart-keyed per-vehicle entries log (Q4-w, Q7-w) Output counts (TopologyTestDriver, 21-event sorted-by-event-time corpus + 2 sentinel records at t=15001 and t=20001): Q | continuous | windowed | snapshot ---|------------|----------|--------- Q1 | 3 | 2 | 13 Q2 | 7 | 2 | 4 Q3 | 21 | 2 | 9 Q4 | 1 | 2 | 5 Q5 | 19 | 2 | 4 Q6 | 21 | 6 | 13 Q7 | 3 | 6 | 13 Q8 | 21 | 2 | 9 Q9 | 13 | 2 | 4 All 9 windowed cells emit 2 lines per Q (Q1/Q2/Q3/Q4/Q5/Q8/Q9) or 6 lines per Q (Q6/Q7 — one per vehicle × 2 windows for per-vehicle outputs). Continuous and snapshot count differences vs MobilityFlink reflect Kafka Streams' STREAM_TIME punctuator semantics: fires on stream-time advance with state-at-fire-moment, multi-interval jumps coalesced; vs Flink's bounded source flushing all keyed timers with final state at +infty. Dual-sentinel pattern in LocalTest steps the punctuator through the desired tick boundaries. Bump-isolation: zero JMEOS calls, zero MEOS-C calls, zero PyMEOS dependency. Pure Kafka Streams + Jackson + Java. Spatial predicates use Haversine / SegmentDistance / point-in-box, marked TODO(meos) for JMEOS-bridge migration.
… 1.4 MEOSBridge
Mirrors the MobilityFlink bridge-swap PR. Introduces MEOSBridge as the
runtime spatial-predicate surface for all 27 BerlinMOD-9 × 3-form Kafka
Streams cells. The bridge calls MEOS via JMEOS 1.4 (geog_dwithin over
WGS84 geographies) when libmeos is loadable and falls back to the
pure-Java Haversine / SegmentDistance utilities when it is not.
- New kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java
with the dwithinMetres / dwithinSegmentMetres / distanceMetres
surface; fail-soft static init flips MEOS_AVAILABLE to false on
UnsatisfiedLinkError.
- All 27 Q<N>{Continuous,Snapshot,Windowed}Processor classes rewritten
to call MEOSBridge instead of Haversine / SegmentDistance directly.
- JMEOS.jar (478 305 bytes, JMEOS#15 regen artefact) brought into
kafka-streams-app/jar/ and declared as a system-path dependency in
pom.xml; jnr-ffi added explicitly since system-path jars do not bring
transitive dependencies.
- BerlinMODQ1LocalTest sets mobilitykafka.meos.enabled=false at main()
entry so the TopologyTestDriver run stays green without libmeos.so
on the runtime path.
- target/ build artefacts gitignored.
Build: mvn clean package -DskipTests green.
Verify: BerlinMODQ1LocalTest finishes clean, all 27 cells emit the
expected output.
…s + extended types + utils.spatial) Updates the bundled `kafka-streams-app/jar/JMEOS.jar` to a combined build of JMEOS PR #19 (regen against MEOS-API meos-idl.json, 2,699 methods including extended types) AND PR #18 (utils.spatial.Haversine + utils.spatial.PointToSegment wrappers that MEOSBridge.java imports). Surface delta vs the previous bundled jar: - public static methods: 2 699 (was 1 685) - utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2) → double - utils.spatial.PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) → double - tnpoint_ methods: 50 - tcbuffer / tpose / trgeo: now exposed - sha: a5895c9b94… size: 1,210,863 B Unblocks the MEOSBridge.java import path (line 116) — previously the jar shipped PR #19's GeneratedFunctions but not PR #18's utils.spatial, so base-branch mvn compile was RED. Both PRs now coalesced into a single jar. Unblocks codegen/kafka-meos-ops wedge stacked on this branch.
… surface (Kafka mirror of MobilityFlink MobilityDB#5) Mirror of MobilityFlink codegen/flink-meos-ops (PR MobilityDB#5) for the Kafka Streams binding. Same generators, same tier classification, same catalog source — differs only in package path (`org.mobilitydb.kafka.meos`) and module layout (`kafka-streams-app/` vs `flink-processor/`). Adds 50 `MeosOps<Class>` + 6 `MeosOpsFree<Header>` + 1 shared `MeosOpsRuntime` = 57 Java classes, 2,097 methods (77.7% of JMEOS PR #19's 2,699-method surface). Stacks on feat/jmeos-bridge-swap; additive-only; touches no existing file. See MobilityFlink MobilityDB#5 for the full tier vocabulary, regeneration recipe, and coexistence design with `berlinmod.MEOSBridge`.
…facades + capstone demo Adds the org.mobilitydb.kafka.meos.wirings package — Kafka mirror of the MobilityFlink wirings (PRs MobilityDB#6→MobilityDB#10). Single PR with all four tiers + runnable composite demo, since Kafka Streams' DSL is naturally lambda-driven and most tier wirings collapse to small static-factory classes returning serializable functional-interface implementations. ## Files - MeosStatelessOps.java — predicate/intPredicate/mapper factories returning Predicate<K,V> / ValueMapper<V,R> for KStream.filter / .mapValues (covers stateless tier: 804 methods + io-meta: 195 methods) - MeosBoundedStateProcessor.java — full Processor<KIn,VIn,KOut,VOut> class with KeyValueStore<KIn,byte[]> for per-key MEOS-handle state that survives changelog replay / rebalance (covers bounded-state tier: 797 methods) - MeosWindowedAggregator.java — initializer/aggregator factories for KStream.groupByKey().windowedBy(...).aggregate(...) (covers windowed tier: 161 methods) - MeosCrossStreamJoiner.java — joiner factory wrapping a serializable ValueJoiner for KStream.join(other, joiner, JoinWindows) (covers cross-stream tier: 140 methods) - MeosOpsRuntime.java — wirings-package alias for the codegen package's MeosOpsRuntime.MEOS_AVAILABLE flag (libmeos probed once per JVM) - demo/MeosWiringsDemoTopology.java — runnable Kafka Streams topology composing all four tier wirings into one pipeline (per-region running-union → 30s tumbling aggregate → ±1m cross-stream join against region-queries); main() prints topology description always, instantiates TopologyTestDriver when MEOS_AVAILABLE (no broker required) - README.md — tier vocabulary, lambda-first design rationale, full recipe + demo walkthrough, coexistence with berlinmod.MEOSBridge ## Cumulative wirings-layer coverage Same as the Flink side: 2,097 of 2,097 emitted methods (100%) wirable through 5 generic classes; zero per-method registration. ## Design choice — lambda-first Kafka Streams' DSL accepts lambdas directly (Predicate, ValueMapper, Aggregator, ValueJoiner). Only bounded-state needs a real class for state-store binding via Processor.init(ProcessorContext). The wirings reflect that asymmetry: small static-factory classes for the four lambda-shaped tiers, one full Processor class for bounded-state. Adopters wanting a class-shaped wiring (matching Flink for cross-binding parity) can subclass any of the helpers — the serializable functional interfaces are public. ## Stacks on PR MobilityDB#3 (codegen mirror) Additive-only: 6 new files under kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos/wirings/. Touches no existing file. Locally compile-verified: 110 .class files total (94 from PR MobilityDB#3 base + 16 new — 4 wiring classes + 11 nested lambda interfaces + MeosOpsRuntime + 1 demo class).
…bump to the post-#1137 trgeometry API The MEOS facade is regenerated against the unified MEOS-API jar so its signatures match the generated functions.GeneratedFunctions surface (the bool/int and out-parameter scheme), and the rigid-geometry family adopts the post-#1137 trgeometry_* C API (MobilityDB #1137; trgeoinst_make unchanged, matching master). The facade gains the tbigint, H3/th3index, and parity-gap classes; the demo topology is updated to the converged signatures.
…B/MEOS build flags + per-family smoke tests The extended families are selected at build time with the same uppercase flag names and ON|OFF (also 1|0) values as the MobilityDB/MEOS CMake build: NPOINT is included by default; CBUFFER, POSE, RGEO, and H3 are included with -D<FAMILY>=ON (or =1) and dropped otherwise (RGEO needs POSE). Each family's MeosOps* facade sources and its per-family smoke test (MeosCbufferSmokeTest, MeosNpointSmokeTest, MeosPoseSmokeTest) are excluded when the family is off; the core and geo smoke checks remain in MeosFacadeSmokeTest.
…rough MEOS Every BerlinMOD spatial predicate evaluates through MEOS via the thin MEOSBridge: within-distance through edwithin_tgeo_geo (the vehicle position as a tgeogpoint instant, metres on the WGS84 spheroid), region containment through eintersects_tgeo_geo, and distances through geog_distance. MEOSBridge holds no spatial mathematics of its own and initialises MEOS per stream thread; the pure-Java Haversine and SegmentDistance classes are removed.
The parity audit cross-checks the generated MEOS facade (org.mobilitydb.kafka.meos.MeosOps*) against the JMEOS jar and the public MEOS headers, and emits docs/parity-status.md. The facade exposes 2296 of the 2296 JMEOS-bindable public MEOS functions (100%) and 1044 of 1044 SQL-addressable C-symbols, measured against the post-#1137 surface (trgeometry).
35c0367 to
d223904
Compare
…meters BerlinMODBenchmark runs each of the 27 BerlinMOD-9 × 3-form cells in isolation through a TopologyTestDriver and reads its output cardinality (correctness); EmbeddedBrokerBenchmark runs each cell as a KafkaStreams application against a fresh in-process EmbeddedKafkaCluster and reports steady-state per-event throughput. Both share the corpus-derived query parameters and window/tick granularity (BerlinMODCorpus.derive) and evaluate the spatial predicates through MEOS. On the 216,075-instant real BerlinMOD corpus the per-event spatial cells sustain 45,000-47,000 ev/s; docs/benchmark.md carries the full 27-cell table.
d223904 to
87de66c
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
BerlinMODBenchmark drives the full BerlinMOD topology in-process through a TopologyTestDriver over a corpus and reads each cell's output cardinality, with the spatial predicates evaluating through MEOS. BerlinMODCorpus loads the real BerlinMOD instants (reprojected EPSG:3857→4326 through MEOS) or a synthetic corpus and derives the per-query parameters and window/tick from the corpus; BerlinMODTopology.build(Params) auto-scales to the corpus span, so the topology matches the MobilityFlink mechanism rather than carrying a fixed window/tick.