diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c328f7a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +kafka-streams-app/target/ +*.class diff --git a/README.md b/README.md index 64ca8d3..1285d5b 100644 --- a/README.md +++ b/README.md @@ -14,3 +14,57 @@ The MobilityDB project is developed by the Computer & Decision Engineering Depar More information about MobilityDB, including publications, presentations, etc., can be found in the MobilityDB [website](https://mobilitydb.com). + +# BerlinMOD-9 × 3 streaming forms — the parity matrix on Kafka Streams + +The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..Q9) in three streaming forms each on this runtime: **continuous** (always-on, per-event emission), **windowed** (tumbling 10-second aggregation), and **snapshot** (5-second tick — the parity-oracle form whose output at watermark T equals the batch BerlinMOD-Q result on data up to T). + +| Q | Topic | Continuous | Windowed | Snapshot | +|---|---|---|---|---| +| Q1 | "which vehicles have appeared in the stream?" | ✓ | ✓ | ✓ | +| Q2 | "where is vehicle X at time T?" | ✓ | ✓ | ✓ | +| Q3 | "vehicles within d of P at time T?" | ✓ | ✓ | ✓ | +| Q4 | "vehicles entered region R, and when?" | ✓ | ✓ | ✓ | +| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ | +| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ | +| Q7 | "first passage of vehicles through POIs" | ✓ | ✓ | ✓ | +| Q8 | "vehicles close to a road segment" | ✓ | ✓ | ✓ | +| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ | + +**27 / 27 cells** = the full MobilityKafka parity-matrix row. Each cell has a dedicated `Q{Continuous,Windowed,Snapshot}Processor` class in [`kafka-streams-app/src/main/java/berlinmod/`](kafka-streams-app/src/main/java/berlinmod/) and is locally verified via [`BerlinMODQ1LocalTest`](kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java) running on the Kafka-Streams `TopologyTestDriver` (no real broker required). + +## Module structure + +`kafka-streams-app/` is a Maven project (Java 21, Kafka Streams 3.6.0) holding: + +- 27 per-cell `Q{Continuous,Windowed,Snapshot}Processor` classes +- `BerlinMODTopology` — unified topology fanning input topic `berlinmod` to per-Q-form output topics +- `BerlinMODTrip` + `BerlinMODTripSerde` — shared data class + JSON Serde (byte-shape equivalent to MobilityFlink's `BerlinMODTrip`) +- `Haversine` + `SegmentDistance` + `PointOfInterest` — pure-Java geometry utilities used by the spatial-predicate cells +- `BerlinMODQ1LocalTest` — TopologyTestDriver-based local end-to-end driver + +The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD). + +Spatial predicates route through [`MEOSBridge`](kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java), which calls MEOS' `geog_dwithin` over WGS84 geographies via [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18) (the geodesic-wrapper PR, branched off the MEOS 1.4 regen at JMEOS#15) when libmeos is loadable on the runtime path. The distance entry points use [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18)'s `utils.spatial.Haversine.distance` (MEOS `geog_distance` over POINT/POINT) and `utils.spatial.PointToSegment.distance` (MEOS `geog_distance` over POINT/LINESTRING). When libmeos is not present (e.g. the TopologyTestDriver local-test run where `-Dmobilitykafka.meos.enabled=false` is set), the bridge falls back to pure-Java great-circle (`Haversine`) and planar segment-distance (`SegmentDistance`) — same semantics, identical predicate truth values to within float-precision at BerlinMOD scale. + +## Build and run + +``` +cd kafka-streams-app +mvn -q clean package -DskipTests +java --add-opens java.base/java.lang=ALL-UNNAMED \ + --add-opens java.base/java.util=ALL-UNNAMED \ + --add-opens java.base/java.lang.reflect=ALL-UNNAMED \ + -cp target/mobility-kafka-streams-1.0-SNAPSHOT.jar \ + berlinmod.BerlinMODQ1LocalTest +``` + +The driver pipes a 21-event sorted-event-time corpus plus two sentinel records at `t = T0+15001` and `t = T0+20001` (to step the STREAM_TIME punctuator through the desired tick boundaries) and reads every per-Q-form output topic with the appropriate deserializer. Expected per-Q-form counts are in the PR body for the open scaffold PR. + +## Sibling parity work in the ecosystem + +- [MobilityFlink#3](https://github.com/MobilityDB/MobilityFlink/pull/3) — the same 27-cell row on Flink +- [MobilityNebula#15](https://github.com/MobilityDB/MobilityNebula/pull/15) — 27 / 27 cells on NebulaStream scaffold (with [#16](https://github.com/MobilityDB/MobilityNebula/pull/16) adding `TEMPORAL_LENGTH` for Q6 and [#17](https://github.com/MobilityDB/MobilityNebula/pull/17) adding `PAIR_MEETING` + `CROSS_DISTANCE` for Q5/Q9, all calling MEOS C ABI directly) +- [MobilityDB-BerlinMOD#29](https://github.com/MobilityDB/MobilityDB-BerlinMOD/pull/29) — the batch BerlinMOD-9 cross-platform timings (the snapshot form's gold-answer source) +- [MobilityDB/.github#10](https://github.com/MobilityDB/.github/pull/10) — the ecosystem-profile description of the stream-layers tier + diff --git a/kafka-streams-app/jar/JMEOS.jar b/kafka-streams-app/jar/JMEOS.jar new file mode 100644 index 0000000..2bc69e5 Binary files /dev/null and b/kafka-streams-app/jar/JMEOS.jar differ diff --git a/kafka-streams-app/pom.xml b/kafka-streams-app/pom.xml new file mode 100644 index 0000000..e80d6d3 --- /dev/null +++ b/kafka-streams-app/pom.xml @@ -0,0 +1,119 @@ + + + 4.0.0 + + com.mobilitydb.kafka + mobility-kafka-streams + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + 3.6.0 + 2.14.3 + 1.7.30 + 5.8.2 + + + + + + com.mobilitydb + jmeos + 1.4.0 + system + ${project.basedir}/jar/JMEOS.jar + + + + + com.github.jnr + jnr-ffi + 2.1.10 + + + + org.apache.kafka + kafka-streams + ${kafka.version} + + + + org.apache.kafka + kafka-streams-test-utils + ${kafka.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-simple + ${slf4j.version} + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + + + + maven-compiler-plugin + 3.12.1 + + ${maven.compiler.source} + ${maven.compiler.target} + + + + maven-shade-plugin + 3.5.1 + + + package + + shade + + + false + + + + berlinmod.BerlinMODQ1LocalTest + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + diff --git a/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java b/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java new file mode 100644 index 0000000..112a21e --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/BerlinMODQ1LocalTest.java @@ -0,0 +1,234 @@ +package berlinmod; + +import org.apache.kafka.common.serialization.BooleanDeserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Properties; + +/** + * Local end-to-end test driver for the BerlinMOD-9 Kafka-Streams topology. + * + *

Runs the full {@link BerlinMODTopology} in-process via + * {@link TopologyTestDriver}, pipes the shared 21-event synthetic corpus + * plus a sentinel event at t=15001 to advance stream-time past the third + * snapshot tick (t=15000), and reads every per-{@code Q}-form output + * topic with its appropriate deserializer. + */ +public class BerlinMODQ1LocalTest { + + private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ1LocalTest.class); + private static final long T0 = 1_735_711_200_000L; + // Two sentinels because Kafka Streams' STREAM_TIME punctuator coalesces + // a multi-interval stream-time jump into a single fire — to get both + // snapshot tick 15000 and tick 20000 we advance in two steps. The second + // sentinel also closes the [10000, 20000) windowed cycle. + private static final long SENTINEL_T1 = T0 + 15_001L; + private static final long SENTINEL_T2 = T0 + 20_001L; + + public static void main(String[] args) { + System.setProperty("mobilitykafka.meos.enabled", "false"); + LOG.info("BerlinMODQ1LocalTest starting (TopologyTestDriver, all continuous + snapshot forms)"); + + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "berlinmod-test"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass()); + + BerlinMODTripSerde tripSerde = new BerlinMODTripSerde(); + + try (TopologyTestDriver driver = new TopologyTestDriver(BerlinMODTopology.build(), props)) { + + TestInputTopic input = + driver.createInputTopic(BerlinMODTopology.INPUT_TOPIC, + new IntegerSerializer(), + tripSerde.serializer()); + + for (BerlinMODTrip trip : buildEvents()) { + input.pipeInput(trip.getVehicleId(), trip, + Instant.ofEpochMilli(trip.getTimestamp())); + } + // Sentinel events (vehicleId == -1, ignored by all processors) to advance + // stream-time across snapshot/windowed punctuator boundaries. + input.pipeInput(-1, new BerlinMODTrip(-1, SENTINEL_T1, 0.0, 0.0), + Instant.ofEpochMilli(SENTINEL_T1)); + input.pipeInput(-1, new BerlinMODTrip(-1, SENTINEL_T2, 0.0, 0.0), + Instant.ofEpochMilli(SENTINEL_T2)); + + // ---- continuous outputs ---- + readAndPrint(driver, BerlinMODTopology.Q1_CONTINUOUS_OUTPUT, + "Q1-continuous", Serdes.Integer().deserializer(), new LongDeserializer(), + (k, v) -> "(" + k + "," + v + ")"); + + readAndPrintTrips(driver, BerlinMODTopology.Q2_CONTINUOUS_OUTPUT, + "Q2-continuous", tripSerde); + + readAndPrint(driver, BerlinMODTopology.Q3_CONTINUOUS_OUTPUT, + "Q3-continuous", Serdes.Integer().deserializer(), new BooleanDeserializer(), + (k, v) -> "(" + k + "," + v + ")"); + + readAndPrint(driver, BerlinMODTopology.Q4_CONTINUOUS_OUTPUT, + "Q4-continuous", Serdes.Integer().deserializer(), new LongDeserializer(), + (k, v) -> "(" + k + "," + v + ")"); + + readAndPrint(driver, BerlinMODTopology.Q5_CONTINUOUS_OUTPUT, + "Q5-continuous", new StringDeserializer(), new DoubleDeserializer(), + (k, v) -> k + " distance=" + v); + + readAndPrint(driver, BerlinMODTopology.Q6_CONTINUOUS_OUTPUT, + "Q6-continuous", Serdes.Integer().deserializer(), new DoubleDeserializer(), + (k, v) -> "v=" + k + " total=" + v); + + readAndPrint(driver, BerlinMODTopology.Q7_CONTINUOUS_OUTPUT, + "Q7-continuous", Serdes.Integer().deserializer(), new LongDeserializer(), + (k, v) -> "poi=" + k + " firstAt=" + v); + + readAndPrint(driver, BerlinMODTopology.Q8_CONTINUOUS_OUTPUT, + "Q8-continuous", Serdes.Integer().deserializer(), new BooleanDeserializer(), + (k, v) -> "(" + k + "," + v + ")"); + + readAndPrint(driver, BerlinMODTopology.Q9_CONTINUOUS_OUTPUT, + "Q9-continuous", new LongDeserializer(), new DoubleDeserializer(), + (k, v) -> "t=" + k + " distance=" + v); + + // ---- windowed outputs ---- + readAndPrint(driver, BerlinMODTopology.Q1_WINDOWED_OUTPUT, + "Q1-windowed", new LongDeserializer(), new LongDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") count=" + v); + + readAndPrint(driver, BerlinMODTopology.Q3_WINDOWED_OUTPUT, + "Q3-windowed", new LongDeserializer(), new LongDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") count=" + v); + + readAndPrint(driver, BerlinMODTopology.Q8_WINDOWED_OUTPUT, + "Q8-windowed", new LongDeserializer(), new LongDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") count=" + v); + + readAndPrint(driver, BerlinMODTopology.Q2_WINDOWED_OUTPUT, + "Q2-windowed", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") " + v); + + readAndPrint(driver, BerlinMODTopology.Q4_WINDOWED_OUTPUT, + "Q4-windowed", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") " + v); + + readAndPrint(driver, BerlinMODTopology.Q5_WINDOWED_OUTPUT, + "Q5-windowed", new StringDeserializer(), new DoubleDeserializer(), + (k, v) -> k + " distance=" + v); + + readAndPrint(driver, BerlinMODTopology.Q6_WINDOWED_OUTPUT, + "Q6-windowed", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") " + v); + + readAndPrint(driver, BerlinMODTopology.Q7_WINDOWED_OUTPUT, + "Q7-windowed", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") " + v); + + readAndPrint(driver, BerlinMODTopology.Q9_WINDOWED_OUTPUT, + "Q9-windowed", new LongDeserializer(), new DoubleDeserializer(), + (k, v) -> "win[" + k + ", " + (k + BerlinMODTopology.WINDOW_SIZE_MILLIS) + ") distance=" + v); + + // ---- snapshot outputs ---- + readAndPrint(driver, BerlinMODTopology.Q1_SNAPSHOT_OUTPUT, + "Q1-snapshot", new LongDeserializer(), new IntegerDeserializer(), + (k, v) -> "(" + k + "," + v + ")"); + + readAndPrint(driver, BerlinMODTopology.Q2_SNAPSHOT_OUTPUT, + "Q2-snapshot", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "T=" + k + " " + v); + + readAndPrint(driver, BerlinMODTopology.Q3_SNAPSHOT_OUTPUT, + "Q3-snapshot", new LongDeserializer(), new IntegerDeserializer(), + (k, v) -> "(" + k + "," + v + ")"); + + readAndPrint(driver, BerlinMODTopology.Q4_SNAPSHOT_OUTPUT, + "Q4-snapshot", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "T=" + k + " " + v); + + readAndPrint(driver, BerlinMODTopology.Q5_SNAPSHOT_OUTPUT, + "Q5-snapshot", new StringDeserializer(), new DoubleDeserializer(), + (k, v) -> k + " distance=" + v); + + readAndPrint(driver, BerlinMODTopology.Q6_SNAPSHOT_OUTPUT, + "Q6-snapshot", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "T=" + k + " " + v); + + readAndPrint(driver, BerlinMODTopology.Q7_SNAPSHOT_OUTPUT, + "Q7-snapshot", new LongDeserializer(), new StringDeserializer(), + (k, v) -> "T=" + k + " " + v); + + readAndPrint(driver, BerlinMODTopology.Q8_SNAPSHOT_OUTPUT, + "Q8-snapshot", new LongDeserializer(), new IntegerDeserializer(), + (k, v) -> "(" + k + "," + v + ")"); + + readAndPrint(driver, BerlinMODTopology.Q9_SNAPSHOT_OUTPUT, + "Q9-snapshot", new LongDeserializer(), new DoubleDeserializer(), + (k, v) -> "T=" + k + " distance=" + v); + } + LOG.info("BerlinMODQ1LocalTest done"); + } + + @FunctionalInterface + private interface Fmt { String render(K k, V v); } + + private static void readAndPrint( + TopologyTestDriver driver, String topic, String tag, + org.apache.kafka.common.serialization.Deserializer kd, + org.apache.kafka.common.serialization.Deserializer vd, + Fmt fmt) { + TestOutputTopic out = driver.createOutputTopic(topic, kd, vd); + List> records = out.readKeyValuesToList(); + System.out.println("=== " + tag + " output (" + records.size() + " lines) ==="); + for (KeyValue kv : records) { + System.out.println(tag + "> " + fmt.render(kv.key, kv.value)); + } + } + + private static void readAndPrintTrips( + TopologyTestDriver driver, String topic, String tag, BerlinMODTripSerde tripSerde) { + TestOutputTopic out = + driver.createOutputTopic(topic, Serdes.Integer().deserializer(), tripSerde.deserializer()); + List> records = out.readKeyValuesToList(); + System.out.println("=== " + tag + " output (" + records.size() + " lines) ==="); + for (KeyValue kv : records) { + System.out.println(String.format("%s> v=%d t=%d (%.4f,%.4f)", + tag, kv.value.getVehicleId(), kv.value.getTimestamp(), + kv.value.getLon(), kv.value.getLat())); + } + } + + private static List buildEvents() { + List events = new ArrayList<>(); + for (int i = 0; i <= 12; i += 2) { + events.add(new BerlinMODTrip(100, T0 + i * 1000L, 4.3517, 50.8503)); + } + for (int i = 1; i <= 13; i += 2) { + events.add(new BerlinMODTrip(200, T0 + i * 1000L, 4.3060, 50.8270)); + } + for (int i = 0; i <= 12; i += 2) { + events.add(new BerlinMODTrip(300, T0 + i * 1000L, 4.2000, 50.7500)); + } + // Kafka-Streams STREAM_TIME punctuators fire on stream-time advance with + // state-at-fire-moment. Sort by event-time so all vehicles' early events + // are seen before the first tick, matching a real Kafka source's + // monotonic event-time delivery. + events.sort(Comparator.comparingLong(BerlinMODTrip::getTimestamp)); + return events; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/BerlinMODTopology.java b/kafka-streams-app/src/main/java/berlinmod/BerlinMODTopology.java new file mode 100644 index 0000000..f4a2a4b --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/BerlinMODTopology.java @@ -0,0 +1,289 @@ +package berlinmod; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; + +import java.util.Arrays; +import java.util.List; + +/** + * BerlinMOD-9 unified Kafka-Streams topology. + * + *

Reads a single source topic {@link #INPUT_TOPIC} and fans out into + * per-{@code Q}-form output topics. Each output topic carries one form + * of one query. + */ +public final class BerlinMODTopology { + + public static final String INPUT_TOPIC = "berlinmod"; + public static final long SNAPSHOT_TICK_MILLIS = 5_000L; + public static final long WINDOW_SIZE_MILLIS = 10_000L; + + public static final String Q1_WINDOWED_OUTPUT = "berlinmod-q1-windowed"; + public static final String Q1_WIN_STORE = "q1-win-store"; + public static final String Q2_WINDOWED_OUTPUT = "berlinmod-q2-windowed"; + public static final String Q2_WIN_STORE = "q2-win-store"; + public static final String Q3_WINDOWED_OUTPUT = "berlinmod-q3-windowed"; + public static final String Q3_WIN_STORE = "q3-win-store"; + public static final String Q4_WINDOWED_OUTPUT = "berlinmod-q4-windowed"; + public static final String Q4_WIN_STORE = "q4-win-store"; + public static final String Q5_WINDOWED_OUTPUT = "berlinmod-q5-windowed"; + public static final String Q5_WIN_STORE = "q5-win-store"; + public static final String Q6_WINDOWED_OUTPUT = "berlinmod-q6-windowed"; + public static final String Q6_WIN_STORE = "q6-win-store"; + public static final String Q7_WINDOWED_OUTPUT = "berlinmod-q7-windowed"; + public static final String Q7_WIN_STORE = "q7-win-store"; + public static final String Q8_WINDOWED_OUTPUT = "berlinmod-q8-windowed"; + public static final String Q8_WIN_STORE = "q8-win-store"; + public static final String Q9_WINDOWED_OUTPUT = "berlinmod-q9-windowed"; + public static final String Q9_WIN_STORE = "q9-win-store"; + + // ---------- Q1 ---------- + public static final String Q1_CONTINUOUS_OUTPUT = "berlinmod-q1-continuous"; + public static final String Q1_SEEN_STORE = "q1-seen-store"; + public static final String Q1_SNAPSHOT_OUTPUT = "berlinmod-q1-snapshot"; + public static final String Q1_SNAP_STORE = "q1-snap-store"; + + // ---------- Q2 ---------- + public static final String Q2_CONTINUOUS_OUTPUT = "berlinmod-q2-continuous"; + public static final int Q2_TARGET_VEHICLE_ID = 200; + public static final String Q2_SNAPSHOT_OUTPUT = "berlinmod-q2-snapshot"; + public static final String Q2_SNAP_STORE = "q2-snap-store"; + + // ---------- Q3 ---------- + public static final String Q3_CONTINUOUS_OUTPUT = "berlinmod-q3-continuous"; + public static final double Q3_P_LON = 4.3517; + public static final double Q3_P_LAT = 50.8503; + public static final double Q3_RADIUS_METRES = 5_000.0; + public static final String Q3_SNAPSHOT_OUTPUT = "berlinmod-q3-snapshot"; + public static final String Q3_SNAP_STORE = "q3-snap-store"; + + // ---------- Q4 ---------- + public static final String Q4_CONTINUOUS_OUTPUT = "berlinmod-q4-continuous"; + public static final String Q4_WAS_INSIDE_STORE = "q4-was-inside-store"; + public static final double Q4_XMIN = 4.30, Q4_YMIN = 50.84, Q4_XMAX = 4.36, Q4_YMAX = 50.86; + public static final String Q4_SNAPSHOT_OUTPUT = "berlinmod-q4-snapshot"; + public static final String Q4_SNAP_WAS_INSIDE_STORE = "q4-snap-was-inside-store"; + public static final String Q4_SNAP_ENTRIES_STORE = "q4-snap-entries-store"; + + // ---------- Q5 ---------- + public static final String Q5_CONTINUOUS_OUTPUT = "berlinmod-q5-continuous"; + public static final String Q5_LAST_POS_STORE = "q5-last-pos-store"; + public static final double Q5_P_LON = 4.3517; + public static final double Q5_P_LAT = 50.8503; + public static final double Q5_D_P_METRES = 5_000.0; + public static final double Q5_D_MEET_METRES = 5_000.0; + public static final String Q5_SNAPSHOT_OUTPUT = "berlinmod-q5-snapshot"; + public static final String Q5_SNAP_STORE = "q5-snap-store"; + + // ---------- Q6 ---------- + public static final String Q6_CONTINUOUS_OUTPUT = "berlinmod-q6-continuous"; + public static final String Q6_STATE_STORE = "q6-state-store"; + public static final String Q6_SNAPSHOT_OUTPUT = "berlinmod-q6-snapshot"; + public static final String Q6_SNAP_STORE = "q6-snap-store"; + + // ---------- Q7 ---------- + public static final String Q7_CONTINUOUS_OUTPUT = "berlinmod-q7-continuous"; + public static final String Q7_FIRST_PASSED_STORE = "q7-first-passed-store"; + public static final List Q7_POIS = Arrays.asList( + new PointOfInterest(1, 4.3517, 50.8503, 2_000.0), + new PointOfInterest(2, 4.3060, 50.8270, 1_000.0), + new PointOfInterest(3, 4.2100, 50.7600, 2_000.0)); + public static final String Q7_SNAPSHOT_OUTPUT = "berlinmod-q7-snapshot"; + public static final String Q7_SNAP_STORE = "q7-snap-store"; + + // ---------- Q8 ---------- + public static final String Q8_CONTINUOUS_OUTPUT = "berlinmod-q8-continuous"; + public static final double Q8_S1_LON = 4.30, Q8_S1_LAT = 50.83; + public static final double Q8_S2_LON = 4.36, Q8_S2_LAT = 50.87; + public static final double Q8_RADIUS_METRES = 5_000.0; + public static final String Q8_SNAPSHOT_OUTPUT = "berlinmod-q8-snapshot"; + public static final String Q8_SNAP_STORE = "q8-snap-store"; + + // ---------- Q9 ---------- + public static final String Q9_CONTINUOUS_OUTPUT = "berlinmod-q9-continuous"; + public static final String Q9_STATE_STORE = "q9-state-store"; + public static final int Q9_X_VEHICLE_ID = 100; + public static final int Q9_Y_VEHICLE_ID = 200; + public static final String Q9_SNAPSHOT_OUTPUT = "berlinmod-q9-snapshot"; + public static final String Q9_SNAP_STORE = "q9-snap-store"; + + private BerlinMODTopology() {} + + public static Topology build() { + StreamsBuilder builder = new StreamsBuilder(); + BerlinMODTripSerde tripSerde = new BerlinMODTripSerde(); + + // ---- continuous-form state stores ---- + addStore(builder, Q1_SEEN_STORE, Serdes.Integer(), Serdes.Boolean()); + addStore(builder, Q4_WAS_INSIDE_STORE, Serdes.Integer(), Serdes.Boolean()); + addStore(builder, Q5_LAST_POS_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q6_STATE_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q7_FIRST_PASSED_STORE, Serdes.Integer(), Serdes.Long()); + addStore(builder, Q9_STATE_STORE, Serdes.Integer(), Serdes.String()); + + // ---- windowed-form state stores ---- + addStore(builder, Q1_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q2_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q3_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q4_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q5_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q6_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q7_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q8_WIN_STORE, Serdes.Long(), Serdes.String()); + addStore(builder, Q9_WIN_STORE, Serdes.Long(), Serdes.String()); + + // ---- snapshot-form state stores (separate to avoid co-write conflicts with continuous) ---- + addStore(builder, Q1_SNAP_STORE, Serdes.Integer(), Serdes.Long()); + addStore(builder, Q2_SNAP_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q3_SNAP_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q4_SNAP_WAS_INSIDE_STORE, Serdes.Integer(), Serdes.Boolean()); + addStore(builder, Q4_SNAP_ENTRIES_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q5_SNAP_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q6_SNAP_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q7_SNAP_STORE, Serdes.Integer(), Serdes.Long()); + addStore(builder, Q8_SNAP_STORE, Serdes.Integer(), Serdes.String()); + addStore(builder, Q9_SNAP_STORE, Serdes.Integer(), Serdes.String()); + + // ---- streams ---- + KStream trips = + builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), tripSerde)); + + // Re-keyed by constant for the shared-state snapshot/multi-vehicle processors + KStream tripsK0 = trips.selectKey((k, v) -> 0); + + // ====== continuous form ====== + trips.process(() -> new Q1ContinuousProcessor(Q1_SEEN_STORE), Q1_SEEN_STORE) + .to(Q1_CONTINUOUS_OUTPUT, Produced.with(Serdes.Integer(), Serdes.Long())); + + trips.process(() -> new Q2ContinuousProcessor(Q2_TARGET_VEHICLE_ID)) + .to(Q2_CONTINUOUS_OUTPUT, Produced.with(Serdes.Integer(), tripSerde)); + + trips.process(() -> new Q3ContinuousProcessor(Q3_P_LON, Q3_P_LAT, Q3_RADIUS_METRES)) + .to(Q3_CONTINUOUS_OUTPUT, Produced.with(Serdes.Integer(), Serdes.Boolean())); + + trips.process(() -> new Q4ContinuousProcessor(Q4_WAS_INSIDE_STORE, Q4_XMIN, Q4_YMIN, Q4_XMAX, Q4_YMAX), + Q4_WAS_INSIDE_STORE) + .to(Q4_CONTINUOUS_OUTPUT, Produced.with(Serdes.Integer(), Serdes.Long())); + + tripsK0.process(() -> new Q5ContinuousProcessor(Q5_LAST_POS_STORE, + Q5_P_LON, Q5_P_LAT, Q5_D_P_METRES, Q5_D_MEET_METRES), + Q5_LAST_POS_STORE) + .to(Q5_CONTINUOUS_OUTPUT, Produced.with(Serdes.String(), Serdes.Double())); + + trips.process(() -> new Q6ContinuousProcessor(Q6_STATE_STORE), Q6_STATE_STORE) + .to(Q6_CONTINUOUS_OUTPUT, Produced.with(Serdes.Integer(), Serdes.Double())); + + trips.process(() -> new Q7ContinuousProcessor(Q7_FIRST_PASSED_STORE, Q7_POIS), + Q7_FIRST_PASSED_STORE) + .to(Q7_CONTINUOUS_OUTPUT, Produced.with(Serdes.Integer(), Serdes.Long())); + + trips.process(() -> new Q8ContinuousProcessor(Q8_S1_LON, Q8_S1_LAT, Q8_S2_LON, Q8_S2_LAT, Q8_RADIUS_METRES)) + .to(Q8_CONTINUOUS_OUTPUT, Produced.with(Serdes.Integer(), Serdes.Boolean())); + + tripsK0.process(() -> new Q9ContinuousProcessor(Q9_STATE_STORE, Q9_X_VEHICLE_ID, Q9_Y_VEHICLE_ID), + Q9_STATE_STORE) + .to(Q9_CONTINUOUS_OUTPUT, Produced.with(Serdes.Long(), Serdes.Double())); + + // ====== windowed form (distinct-count per tumbling window for Q1/Q3/Q8) ====== + tripsK0.process(() -> new Q1WindowedProcessor(Q1_WIN_STORE, WINDOW_SIZE_MILLIS), Q1_WIN_STORE) + .to(Q1_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.Long())); + + tripsK0.process(() -> new Q3WindowedProcessor(Q3_WIN_STORE, + Q3_P_LON, Q3_P_LAT, Q3_RADIUS_METRES, + WINDOW_SIZE_MILLIS), + Q3_WIN_STORE) + .to(Q3_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.Long())); + + tripsK0.process(() -> new Q8WindowedProcessor(Q8_WIN_STORE, + Q8_S1_LON, Q8_S1_LAT, Q8_S2_LON, Q8_S2_LAT, + Q8_RADIUS_METRES, WINDOW_SIZE_MILLIS), + Q8_WIN_STORE) + .to(Q8_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.Long())); + + tripsK0.process(() -> new Q2WindowedProcessor(Q2_WIN_STORE, Q2_TARGET_VEHICLE_ID, WINDOW_SIZE_MILLIS), + Q2_WIN_STORE) + .to(Q2_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q4WindowedProcessor(Q4_WIN_STORE, + Q4_XMIN, Q4_YMIN, Q4_XMAX, Q4_YMAX, + WINDOW_SIZE_MILLIS), + Q4_WIN_STORE) + .to(Q4_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q5WindowedProcessor(Q5_WIN_STORE, + Q5_P_LON, Q5_P_LAT, Q5_D_P_METRES, Q5_D_MEET_METRES, + WINDOW_SIZE_MILLIS), + Q5_WIN_STORE) + .to(Q5_WINDOWED_OUTPUT, Produced.with(Serdes.String(), Serdes.Double())); + + tripsK0.process(() -> new Q6WindowedProcessor(Q6_WIN_STORE, WINDOW_SIZE_MILLIS), Q6_WIN_STORE) + .to(Q6_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q7WindowedProcessor(Q7_WIN_STORE, Q7_POIS, WINDOW_SIZE_MILLIS), Q7_WIN_STORE) + .to(Q7_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q9WindowedProcessor(Q9_WIN_STORE, + Q9_X_VEHICLE_ID, Q9_Y_VEHICLE_ID, + WINDOW_SIZE_MILLIS), + Q9_WIN_STORE) + .to(Q9_WINDOWED_OUTPUT, Produced.with(Serdes.Long(), Serdes.Double())); + + // ====== snapshot form (all via constant key, with STREAM_TIME punctuators) ====== + tripsK0.process(() -> new Q1SnapshotProcessor(Q1_SNAP_STORE, SNAPSHOT_TICK_MILLIS), Q1_SNAP_STORE) + .to(Q1_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.Integer())); + + tripsK0.process(() -> new Q2SnapshotProcessor(Q2_SNAP_STORE, Q2_TARGET_VEHICLE_ID, SNAPSHOT_TICK_MILLIS), + Q2_SNAP_STORE) + .to(Q2_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q3SnapshotProcessor(Q3_SNAP_STORE, + Q3_P_LON, Q3_P_LAT, Q3_RADIUS_METRES, SNAPSHOT_TICK_MILLIS), + Q3_SNAP_STORE) + .to(Q3_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.Integer())); + + tripsK0.process(() -> new Q4SnapshotProcessor(Q4_SNAP_WAS_INSIDE_STORE, Q4_SNAP_ENTRIES_STORE, + Q4_XMIN, Q4_YMIN, Q4_XMAX, Q4_YMAX, SNAPSHOT_TICK_MILLIS), + Q4_SNAP_WAS_INSIDE_STORE, Q4_SNAP_ENTRIES_STORE) + .to(Q4_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q5SnapshotProcessor(Q5_SNAP_STORE, + Q5_P_LON, Q5_P_LAT, Q5_D_P_METRES, Q5_D_MEET_METRES, + SNAPSHOT_TICK_MILLIS), + Q5_SNAP_STORE) + .to(Q5_SNAPSHOT_OUTPUT, Produced.with(Serdes.String(), Serdes.Double())); + + tripsK0.process(() -> new Q6SnapshotProcessor(Q6_SNAP_STORE, SNAPSHOT_TICK_MILLIS), Q6_SNAP_STORE) + .to(Q6_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q7SnapshotProcessor(Q7_SNAP_STORE, Q7_POIS, SNAPSHOT_TICK_MILLIS), Q7_SNAP_STORE) + .to(Q7_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.String())); + + tripsK0.process(() -> new Q8SnapshotProcessor(Q8_SNAP_STORE, + Q8_S1_LON, Q8_S1_LAT, Q8_S2_LON, Q8_S2_LAT, + Q8_RADIUS_METRES, SNAPSHOT_TICK_MILLIS), + Q8_SNAP_STORE) + .to(Q8_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.Integer())); + + tripsK0.process(() -> new Q9SnapshotProcessor(Q9_SNAP_STORE, + Q9_X_VEHICLE_ID, Q9_Y_VEHICLE_ID, SNAPSHOT_TICK_MILLIS), + Q9_SNAP_STORE) + .to(Q9_SNAPSHOT_OUTPUT, Produced.with(Serdes.Long(), Serdes.Double())); + + return builder.build(); + } + + private static void addStore(StreamsBuilder builder, String name, + org.apache.kafka.common.serialization.Serde ks, + org.apache.kafka.common.serialization.Serde vs) { + StoreBuilder> sb = + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(name), ks, vs); + builder.addStateStore(sb); + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/BerlinMODTrip.java b/kafka-streams-app/src/main/java/berlinmod/BerlinMODTrip.java new file mode 100644 index 0000000..f512fe7 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/BerlinMODTrip.java @@ -0,0 +1,37 @@ +package berlinmod; + +import java.io.Serializable; + +/** + * Plain data class for a single GPS event from a BerlinMOD trip. + * + *

Same shape as the MobilityFlink {@code BerlinMODTrip} so a CSV produced + * by the BerlinMOD generator at any SF feeds both pipelines unchanged. + */ +public class BerlinMODTrip implements Serializable { + + private static final long serialVersionUID = 1L; + + private long timestamp; // epoch milliseconds (event time) + private int vehicleId; + private double lon; + private double lat; + + public BerlinMODTrip() {} + + public BerlinMODTrip(int vehicleId, long timestamp, double lon, double lat) { + this.vehicleId = vehicleId; + this.timestamp = timestamp; + this.lon = lon; + this.lat = lat; + } + + public long getTimestamp() { return timestamp; } + public void setTimestamp(long timestamp) { this.timestamp = timestamp; } + public int getVehicleId() { return vehicleId; } + public void setVehicleId(int vehicleId) { this.vehicleId = vehicleId; } + public double getLon() { return lon; } + public void setLon(double lon) { this.lon = lon; } + public double getLat() { return lat; } + public void setLat(double lat) { this.lat = lat; } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/BerlinMODTripSerde.java b/kafka-streams-app/src/main/java/berlinmod/BerlinMODTripSerde.java new file mode 100644 index 0000000..298fd07 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/BerlinMODTripSerde.java @@ -0,0 +1,65 @@ +package berlinmod; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +/** + * JSON-based Kafka Streams {@link Serde} for {@link BerlinMODTrip}. + * + *

Records on the {@code berlinmod} topic are JSON objects of the form + * {@code {"t": "yyyy-MM-dd HH:mm:ss", "vehicle_id": 42, "lon": 4.36, "lat": + * 50.84}}. The serializer writes that exact shape; the deserializer parses + * the same shape (with a tolerant timestamp format matching the producer). + */ +public final class BerlinMODTripSerde implements Serde { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Override + public Serializer serializer() { + return (topic, trip) -> { + if (trip == null) return null; + try { + return MAPPER.writeValueAsBytes(new SerForm(trip)); + } catch (Exception e) { + throw new RuntimeException("BerlinMODTrip serialize failed", e); + } + }; + } + + @Override + public Deserializer deserializer() { + return (topic, bytes) -> { + if (bytes == null) return null; + try { + SerForm sf = MAPPER.readValue(bytes, SerForm.class); + return sf.toTrip(); + } catch (Exception e) { + throw new RuntimeException("BerlinMODTrip deserialize failed", e); + } + }; + } + + /** Wire form used for JSON serialisation — keeps Jackson happy without per-field annotations. */ + private static final class SerForm { + public Long t; + public int vehicle_id; + public double lon; + public double lat; + + public SerForm() {} + + SerForm(BerlinMODTrip trip) { + this.t = trip.getTimestamp(); + this.vehicle_id = trip.getVehicleId(); + this.lon = trip.getLon(); + this.lat = trip.getLat(); + } + + BerlinMODTrip toTrip() { + return new BerlinMODTrip(vehicle_id, t == null ? 0L : t, lon, lat); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Haversine.java b/kafka-streams-app/src/main/java/berlinmod/Haversine.java new file mode 100644 index 0000000..0203583 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Haversine.java @@ -0,0 +1,33 @@ +package berlinmod; + +/** + * Great-circle distance in metres between two WGS84 (lon, lat) points. + * + *

Pure-Java fallback for {@link MEOSBridge#dwithinMetres} / + * {@link MEOSBridge#distanceMetres}, used by the BerlinMOD-9 × 3-form + * streaming scaffold when libmeos is not loadable on the runtime path. The + * primary spatial-predicate surface is {@link MEOSBridge}. + */ +public final class Haversine { + + private static final double EARTH_RADIUS_METRES = 6_371_000.0; + + private Haversine() {} + + public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) { + double phi1 = Math.toRadians(lat1); + double phi2 = Math.toRadians(lat2); + double dPhi = Math.toRadians(lat2 - lat1); + double dLambda = Math.toRadians(lon2 - lon1); + + double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2) + + Math.cos(phi1) * Math.cos(phi2) + * Math.sin(dLambda / 2) * Math.sin(dLambda / 2); + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + return EARTH_RADIUS_METRES * c; + } + + public static boolean withinMetres(double lon, double lat, double pLon, double pLat, double radiusMetres) { + return distanceMetres(lon, lat, pLon, pLat) <= radiusMetres; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java b/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java new file mode 100644 index 0000000..6d1a9ff --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java @@ -0,0 +1,158 @@ +package berlinmod; + +import functions.functions; +import jnr.ffi.Pointer; +import utils.spatial.PointToSegment; + +/** + * Runtime bridge from MobilityKafka BerlinMOD streaming-form predicates to + * MEOS via JMEOS. + * + *

All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold + * flow through this class. When the JMEOS native libmeos shared object is + * present and loadable, each predicate evaluates through MEOS' WGS84 + * geography surface ({@code geom_to_geog} + {@code geog_dwithin}). When + * libmeos is not available, each predicate falls back to the corresponding + * pure-Java implementation in {@link Haversine} or {@link SegmentDistance} + * so the BerlinMOD mini-cluster local tests stay runnable on systems + * without a MEOS install. + * + *

The fallback is gated by the {@link #MEOS_AVAILABLE} static flag, set + * once at class-load time: + *

+ */ +public final class MEOSBridge { + + /** + * {@code true} iff MEOS is available on this runtime and the bridge + * routes through it; {@code false} iff the bridge will use the pure-Java + * fallbacks. + */ + public static final boolean MEOS_AVAILABLE; + + static { + boolean enabled = + Boolean.parseBoolean(System.getProperty("mobilitykafka.meos.enabled", "true")); + boolean ok = false; + if (enabled) { + try { + functions.meos_initialize(); + ok = true; + } catch (Throwable t) { + // libmeos shared object not loadable on this runtime — fall back. + ok = false; + } + } + MEOS_AVAILABLE = ok; + } + + private MEOSBridge() { + // utility + } + + // ---------------------------------------------------------------------- + // Public bridge surface — same shape as Haversine + SegmentDistance. + // ---------------------------------------------------------------------- + + /** + * @return {@code true} if the great-circle distance from {@code (lon1, lat1)} + * to {@code (lon2, lat2)} on the WGS84 spheroid is at most + * {@code radiusMetres}. MEOS-backed via {@code geog_dwithin} when + * available, else pure-Java {@link Haversine#withinMetres}. + */ + public static boolean dwithinMetres(double lon1, double lat1, + double lon2, double lat2, + double radiusMetres) { + if (!MEOS_AVAILABLE) { + return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres); + } + Pointer g1 = pointGeog(lon1, lat1); + Pointer g2 = pointGeog(lon2, lat2); + if (g1 == null || g2 == null) { + return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres); + } + return functions.geog_dwithin(g1, g2, radiusMetres, true); + } + + /** + * @return {@code true} if the spheroidal distance from {@code (pLon, pLat)} + * to the LineString {@code (s1, s2)} is at most {@code radiusMetres}. + * MEOS-backed via {@code geog_dwithin} on geographies built from + * the point and line WKTs, else pure-Java + * {@link SegmentDistance#withinMetres}. + */ + public static boolean dwithinSegmentMetres(double pLon, double pLat, + double s1Lon, double s1Lat, + double s2Lon, double s2Lat, + double radiusMetres) { + if (!MEOS_AVAILABLE) { + return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres); + } + Pointer pg = pointGeog(pLon, pLat); + Pointer lg = lineGeog(s1Lon, s1Lat, s2Lon, s2Lat); + if (pg == null || lg == null) { + return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres); + } + return functions.geog_dwithin(pg, lg, radiusMetres, true); + } + + /** + * @return the spheroidal distance in metres between two WGS84 points. + * MEOS-backed via {@code utils.spatial.Haversine.distance} + * (which calls MEOS' {@code geog_distance} over two POINT + * geographies) when libmeos is loadable, else pure-Java + * {@link Haversine#distanceMetres}. + */ + public static double distanceMetres(double lon1, double lat1, + double lon2, double lat2) { + if (!MEOS_AVAILABLE) { + return Haversine.distanceMetres(lon1, lat1, lon2, lat2); + } + return utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2); + } + + /** + * @return the spheroidal distance in metres from {@code (pLon, pLat)} to + * the LineString {@code (s1, s2)}. MEOS-backed via + * {@code utils.spatial.PointToSegment.distance} when libmeos is + * loadable, else pure-Java + * {@link SegmentDistance#distanceMetres}. + */ + public static double distanceSegmentMetres(double pLon, double pLat, + double s1Lon, double s1Lat, + double s2Lon, double s2Lat) { + if (!MEOS_AVAILABLE) { + return SegmentDistance.distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat); + } + return PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat); + } + + // ---------------------------------------------------------------------- + // Internal helpers — WKT → geometry → geography in one MEOS-side step. + // ---------------------------------------------------------------------- + + private static Pointer pointGeog(double lon, double lat) { + String wkt = String.format("SRID=4326;Point(%.7f %.7f)", lon, lat); + Pointer g = functions.geom_in(wkt, -1); + if (g == null) { + return null; + } + return functions.geom_to_geog(g); + } + + private static Pointer lineGeog(double s1Lon, double s1Lat, + double s2Lon, double s2Lat) { + String wkt = String.format("SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", + s1Lon, s1Lat, s2Lon, s2Lat); + Pointer g = functions.geom_in(wkt, -1); + if (g == null) { + return null; + } + return functions.geom_to_geog(g); + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/PointOfInterest.java b/kafka-streams-app/src/main/java/berlinmod/PointOfInterest.java new file mode 100644 index 0000000..067f804 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/PointOfInterest.java @@ -0,0 +1,23 @@ +package berlinmod; + +import java.io.Serializable; + +/** + * Point-of-interest record used by BerlinMOD-Q7: an integer id, a (lon, lat) + * location, and a proximity radius in metres. + */ +public final class PointOfInterest implements Serializable { + private static final long serialVersionUID = 1L; + + public final int id; + public final double lon; + public final double lat; + public final double radiusMetres; + + public PointOfInterest(int id, double lon, double lat, double radiusMetres) { + this.id = id; + this.lon = lon; + this.lat = lat; + this.radiusMetres = radiusMetres; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q1ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q1ContinuousProcessor.java new file mode 100644 index 0000000..08ce438 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q1ContinuousProcessor.java @@ -0,0 +1,56 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BerlinMOD-Q1 — continuous form, Kafka-Streams Processor API. + * + *

"Which vehicles have appeared in the stream?" + * + *

Uses a {@link KeyValueStore} keyed by {@code vehicleId} (named in the + * caller) to dedupe — emits {@code (vehicleId, firstSeenTimestamp)} the + * first time each vehicle is seen and ignores subsequent events. + * + *

Same semantic as the MobilityFlink {@code Q1ContinuousFunction}; the + * differences are purely in the runtime API (Kafka Streams Processor vs + * Flink {@code KeyedProcessFunction}). + */ +public class Q1ContinuousProcessor implements Processor { + + private static final Logger LOG = LoggerFactory.getLogger(Q1ContinuousProcessor.class); + + private final String storeName; + private KeyValueStore seen; + private ProcessorContext ctx; + + public Q1ContinuousProcessor(String storeName) { + this.storeName = storeName; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.seen = context.getStateStore(storeName); + } + + @Override + public void process(Record record) { + Integer vehicleId = record.key(); + BerlinMODTrip trip = record.value(); + if (vehicleId == null || trip == null || vehicleId == -1) return; + + Boolean alreadySeen = seen.get(vehicleId); + if (alreadySeen == null || !alreadySeen) { + seen.put(vehicleId, true); + ctx.forward(new Record<>(vehicleId, trip.getTimestamp(), trip.getTimestamp())); + if (LOG.isDebugEnabled()) { + LOG.debug("Q1-continuous first-sighting: vehicle={} t={}", vehicleId, trip.getTimestamp()); + } + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q1SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q1SnapshotProcessor.java new file mode 100644 index 0000000..56ed8ea --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q1SnapshotProcessor.java @@ -0,0 +1,74 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q1 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, which vehicles have appeared in the stream up to T?" + * + *

The parity-oracle form: streaming output at watermark T equals the + * batch BerlinMOD-Q1 result on data up to T. + * + *

Caller keys the input by a constant so the shared cross-vehicle "seen" + * map lives in a single subtask. On each event (sentinel vehicleId == -1 + * is ignored), record vehicleId → firstSeenTime if not already present. + * On each STREAM_TIME punctuator fire (every {@code snapshotTickMillis}), + * walk the map and forward {@code (currentStreamTime, vehicleId)} per + * recorded vehicle, sorted by vehicleId for deterministic output. + */ +public class Q1SnapshotProcessor implements Processor { + + private final String storeName; + private final long snapshotTickMillis; + private KeyValueStore seen; + private ProcessorContext ctx; + + public Q1SnapshotProcessor(String storeName, long snapshotTickMillis) { + this.storeName = storeName; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.seen = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + if (seen.get(trip.getVehicleId()) == null) { + seen.put(trip.getVehicleId(), trip.getTimestamp()); + } + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + List ids = new ArrayList<>(); + try (KeyValueIterator it = seen.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + ids.add(kv.key); + } + } + ids.sort(Comparator.naturalOrder()); + for (Integer vehicleId : ids) { + ctx.forward(new Record<>(tick, vehicleId, tick)); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q1WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q1WindowedProcessor.java new file mode 100644 index 0000000..ea12251 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q1WindowedProcessor.java @@ -0,0 +1,95 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * BerlinMOD-Q1 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, how many distinct vehicles appeared + * in the window?" + * + *

State value is a comma-separated set of vehicleIds seen in each + * window start. STREAM_TIME punctuator at {@code windowSizeMs} interval + * emits closed windows and removes them from the store. + */ +public class Q1WindowedProcessor implements Processor { + + private final String storeName; + private final long windowSizeMs; + private KeyValueStore winState; // winStart -> "vid1,vid2,..." + private ProcessorContext ctx; + + public Q1WindowedProcessor(String storeName, long windowSizeMs) { + this.storeName = storeName; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + String prior = winState.get(winStart); + String sv = Integer.toString(trip.getVehicleId()); + if (prior == null) { + winState.put(winStart, sv); + } else if (!containsId(prior, sv)) { + winState.put(winStart, prior + "," + sv); + } + } + + private void punctuate(long currentStreamTime) { + // Emit closed windows (winEnd <= currentStreamTime) and remove + List toEmit = new ArrayList<>(); + List counts = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + toEmit.add(kv.key); + Set distinct = new HashSet<>(); + for (String s : kv.value.split(",")) distinct.add(s); + counts.add((long) distinct.size()); + } + } + } + // Sort by winStart for deterministic order + Integer[] idx = new Integer[toEmit.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(toEmit::get)); + for (Integer i : idx) { + long winStart = toEmit.get(i); + ctx.forward(new Record<>(winStart, counts.get(i), winStart + windowSizeMs - 1)); + winState.delete(winStart); + } + } + + private boolean containsId(String csv, String id) { + if (csv == null) return false; + for (String s : csv.split(",")) { + if (s.equals(id)) return true; + } + return false; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q2ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q2ContinuousProcessor.java new file mode 100644 index 0000000..e5881af --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q2ContinuousProcessor.java @@ -0,0 +1,36 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; + +/** + * BerlinMOD-Q2 — continuous form, Kafka-Streams Processor API. + * + *

"Where is vehicle X right now?" + * + *

Pure stateless filter: forward records whose key matches the queried + * {@code targetVehicleId}, drop the rest. Matches the MobilityFlink + * {@code Q2ContinuousFunction} pattern. + */ +public class Q2ContinuousProcessor implements Processor { + + private final int targetVehicleId; + private ProcessorContext ctx; + + public Q2ContinuousProcessor(int targetVehicleId) { + this.targetVehicleId = targetVehicleId; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + } + + @Override + public void process(Record record) { + if (record.key() != null && record.key() == targetVehicleId) { + ctx.forward(record); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q2SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q2SnapshotProcessor.java new file mode 100644 index 0000000..bdca33d --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q2SnapshotProcessor.java @@ -0,0 +1,60 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; + +/** + * BerlinMOD-Q2 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, where is vehicle X?" using X's most-recent-known + * position on or before T. + * + *

Single-key state (key=0) value "lon,lat,t" updated only when an event + * arrives for the queried {@code targetVehicleId}. STREAM_TIME punctuator + * every {@code snapshotTickMillis} emits {@code (currentTick, lon, lat, + * lastEventT)} when state is set. + */ +public class Q2SnapshotProcessor implements Processor { + + private final String storeName; + private final int targetVehicleId; + private final long snapshotTickMillis; + private KeyValueStore state; + private ProcessorContext ctx; + + public Q2SnapshotProcessor(String storeName, int targetVehicleId, long snapshotTickMillis) { + this.storeName = storeName; + this.targetVehicleId = targetVehicleId; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.state = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + if (trip.getVehicleId() == targetVehicleId) { + state.put(0, trip.getLon() + "," + trip.getLat() + "," + trip.getTimestamp()); + } + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + String v = state.get(0); + if (v != null) { + ctx.forward(new Record<>(tick, v, tick)); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q2WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q2WindowedProcessor.java new file mode 100644 index 0000000..c5fb102 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q2WindowedProcessor.java @@ -0,0 +1,78 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q2 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, last-known (lon, lat) of vehicle X + * seen in the window." + * + *

State value "lon,lat,t" overwritten on every X event whose timestamp + * falls in {@code winStart}. STREAM_TIME punctuator at {@code windowSizeMs} + * emits closed windows. + */ +public class Q2WindowedProcessor implements Processor { + + private final String storeName; + private final int targetVehicleId; + private final long windowSizeMs; + private KeyValueStore winState; + private ProcessorContext ctx; + + public Q2WindowedProcessor(String storeName, int targetVehicleId, long windowSizeMs) { + this.storeName = storeName; + this.targetVehicleId = targetVehicleId; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + if (trip.getVehicleId() != targetVehicleId) return; + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + winState.put(winStart, trip.getLon() + "," + trip.getLat() + "," + trip.getTimestamp()); + } + + private void punctuate(long currentStreamTime) { + List toEmit = new ArrayList<>(); + List values = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + toEmit.add(kv.key); + values.add(kv.value); + } + } + } + Integer[] idx = new Integer[toEmit.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(toEmit::get)); + for (Integer i : idx) { + long winStart = toEmit.get(i); + ctx.forward(new Record<>(winStart, values.get(i), winStart + windowSizeMs - 1)); + winState.delete(winStart); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java new file mode 100644 index 0000000..abd5755 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q3ContinuousProcessor.java @@ -0,0 +1,46 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; + +/** + * BerlinMOD-Q3 — continuous form, Kafka-Streams Processor API. + * + *

"Is this vehicle currently within {@code d} metres of point P?" + * + *

Stateless per-event predicate: forward {@code (vehicleId, + * eventTime, near)} per incoming GPS event. Same predicate semantics + * as MobilityFlink's {@code Q3ContinuousFunction}. + * + *

Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin} + * over WGS84 geographies when libmeos is loadable, {@link Haversine} + * fallback otherwise. + */ +public class Q3ContinuousProcessor implements Processor { + + private final double pLon; + private final double pLat; + private final double radiusMetres; + private ProcessorContext ctx; + + public Q3ContinuousProcessor(double pLon, double pLat, double radiusMetres) { + this.pLon = pLon; + this.pLat = pLat; + this.radiusMetres = radiusMetres; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + boolean near = MEOSBridge.dwithinMetres( + trip.getLon(), trip.getLat(), pLon, pLat, radiusMetres); + ctx.forward(new Record<>(trip.getVehicleId(), near, trip.getTimestamp())); + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java new file mode 100644 index 0000000..e75cc85 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q3SnapshotProcessor.java @@ -0,0 +1,79 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q3 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, which vehicles are within {@code d} metres of point P?" + * using each vehicle's most-recent-known position on or before T. + * + *

Caller keys the input by a constant so the shared cross-vehicle + * last-known store lives in one subtask. Per event: update last-known. + * Per STREAM_TIME punctuator fire: iterate last-known, evaluate the + * Haversine radius predicate, forward {@code (currentTick, vehicleId)} + * for every near vehicle (sorted by vehicleId). + */ +public class Q3SnapshotProcessor implements Processor { + + private final String storeName; + private final double pLon, pLat, radiusMetres; + private final long snapshotTickMillis; + private KeyValueStore lastPos; // vehicleId -> "lon,lat" + private ProcessorContext ctx; + + public Q3SnapshotProcessor(String storeName, double pLon, double pLat, + double radiusMetres, long snapshotTickMillis) { + this.storeName = storeName; + this.pLon = pLon; + this.pLat = pLat; + this.radiusMetres = radiusMetres; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.lastPos = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + lastPos.put(trip.getVehicleId(), trip.getLon() + "," + trip.getLat()); + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + List nearIds = new ArrayList<>(); + try (KeyValueIterator it = lastPos.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + String[] ll = kv.value.split(",", 2); + double lon = Double.parseDouble(ll[0]); + double lat = Double.parseDouble(ll[1]); + if (MEOSBridge.dwithinMetres(lon, lat, pLon, pLat, radiusMetres)) { + nearIds.add(kv.key); + } + } + } + nearIds.sort(Comparator.naturalOrder()); + for (Integer vid : nearIds) { + ctx.forward(new Record<>(tick, vid, tick)); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q3WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q3WindowedProcessor.java new file mode 100644 index 0000000..1e01aaf --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q3WindowedProcessor.java @@ -0,0 +1,97 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * BerlinMOD-Q3 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, how many distinct vehicles were + * within {@code d} metres of point P at any time during the window?" + * + *

Same shape as {@link Q1WindowedProcessor} but only records vehicles + * whose event satisfies the radius predicate. + */ +public class Q3WindowedProcessor implements Processor { + + private final String storeName; + private final double pLon, pLat, radiusMetres; + private final long windowSizeMs; + private KeyValueStore winState; + private ProcessorContext ctx; + + public Q3WindowedProcessor(String storeName, double pLon, double pLat, + double radiusMetres, long windowSizeMs) { + this.storeName = storeName; + this.pLon = pLon; + this.pLat = pLat; + this.radiusMetres = radiusMetres; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + if (!MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), pLon, pLat, radiusMetres)) { + return; + } + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + String prior = winState.get(winStart); + String sv = Integer.toString(trip.getVehicleId()); + if (prior == null) { + winState.put(winStart, sv); + } else if (!Q1WindowedProcessor.class.getName().isEmpty() && !contains(prior, sv)) { + winState.put(winStart, prior + "," + sv); + } + } + + private void punctuate(long currentStreamTime) { + List toEmit = new ArrayList<>(); + List counts = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + toEmit.add(kv.key); + Set distinct = new HashSet<>(); + for (String s : kv.value.split(",")) distinct.add(s); + counts.add((long) distinct.size()); + } + } + } + Integer[] idx = new Integer[toEmit.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(toEmit::get)); + for (Integer i : idx) { + long winStart = toEmit.get(i); + ctx.forward(new Record<>(winStart, counts.get(i), winStart + windowSizeMs - 1)); + winState.delete(winStart); + } + } + + private static boolean contains(String csv, String id) { + for (String s : csv.split(",")) if (s.equals(id)) return true; + return false; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java new file mode 100644 index 0000000..95f069f --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q4ContinuousProcessor.java @@ -0,0 +1,59 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; + +/** + * BerlinMOD-Q4 — continuous form, Kafka-Streams Processor API. + * + *

"Which vehicles entered region R (transition outside → inside)?" + * + *

Keyed by vehicleId. Per-vehicle state tracks the last seen + * inside-or-outside flag for R; on each event, detect outside → inside + * transition and emit {@code (vehicleId, entryTime)}. + * + *

Predicate: pure-Java axis-aligned point-in-box. The rectangular region + * is degenerate as a geographic predicate (no projection needed); a generic + * polygon-R variant would route through {@link MEOSBridge} for MEOS + * {@code eintersects_tgeo_geo}. + */ +public class Q4ContinuousProcessor implements Processor { + + private final String storeName; + private final double xmin, ymin, xmax, ymax; + private KeyValueStore wasInside; + private ProcessorContext ctx; + + public Q4ContinuousProcessor(String storeName, double xmin, double ymin, double xmax, double ymax) { + this.storeName = storeName; + this.xmin = xmin; + this.ymin = ymin; + this.xmax = xmax; + this.ymax = ymax; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.wasInside = context.getStateStore(storeName); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + boolean isInside = inBox(trip.getLon(), trip.getLat()); + Boolean prev = wasInside.get(trip.getVehicleId()); + boolean prevInside = prev != null && prev; + if (isInside && !prevInside) { + ctx.forward(new Record<>(trip.getVehicleId(), trip.getTimestamp(), trip.getTimestamp())); + } + wasInside.put(trip.getVehicleId(), isInside); + } + + private boolean inBox(double lon, double lat) { + return lon >= xmin && lon <= xmax && lat >= ymin && lat <= ymax; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java new file mode 100644 index 0000000..27a1383 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q4SnapshotProcessor.java @@ -0,0 +1,99 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q4 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, what is the list of (vehicleId, entryTime) pairs for all + * vehicles that entered region R at or before T?" + * + *

Caller keys the input by a constant so the shared cross-vehicle state + * lives in one subtask. Per-vehicle two stores: {@code wasInside} (Boolean) + * and {@code entries} (semicolon-separated entry times). Per event: detect + * outside→inside transition and append the entry time. Per STREAM_TIME + * punctuator fire: walk every vehicle, emit one + * {@code (currentTick, vehicleId, entryTime)} per recorded entry time + * ≤ currentTick. + */ +public class Q4SnapshotProcessor implements Processor { + + private final String wasInsideStoreName; + private final String entriesStoreName; + private final double xmin, ymin, xmax, ymax; + private final long snapshotTickMillis; + private KeyValueStore wasInside; + private KeyValueStore entries; // vehicleId -> "t1;t2;..." + private ProcessorContext ctx; + + public Q4SnapshotProcessor(String wasInsideStoreName, String entriesStoreName, + double xmin, double ymin, double xmax, double ymax, + long snapshotTickMillis) { + this.wasInsideStoreName = wasInsideStoreName; + this.entriesStoreName = entriesStoreName; + this.xmin = xmin; + this.ymin = ymin; + this.xmax = xmax; + this.ymax = ymax; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.wasInside = context.getStateStore(wasInsideStoreName); + this.entries = context.getStateStore(entriesStoreName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax + && trip.getLat() >= ymin && trip.getLat() <= ymax; + Boolean prev = wasInside.get(trip.getVehicleId()); + boolean prevInside = prev != null && prev; + if (curr && !prevInside) { + String prior = entries.get(trip.getVehicleId()); + String updated = (prior == null || prior.isEmpty()) + ? Long.toString(trip.getTimestamp()) + : prior + ";" + trip.getTimestamp(); + entries.put(trip.getVehicleId(), updated); + } + wasInside.put(trip.getVehicleId(), curr); + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + List ids = new ArrayList<>(); + try (KeyValueIterator it = entries.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.value != null && !kv.value.isEmpty()) ids.add(kv.key); + } + } + ids.sort(Comparator.naturalOrder()); + for (Integer vid : ids) { + String list = entries.get(vid); + for (String s : list.split(";")) { + long entryTime = Long.parseLong(s); + if (entryTime <= tick) { + ctx.forward(new Record<>(tick, vid + "@" + entryTime, tick)); + } + } + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java new file mode 100644 index 0000000..346ec34 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q4WindowedProcessor.java @@ -0,0 +1,143 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q4 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, which vehicles entered region R during + * the window?" Intra-window scoping: a vehicle's first event in the + * window with {@code inBox(...) == true} counts as an entry (no cross-window + * memory of prior inside-state). + * + *

State value encodes the entries already recorded for the window plus + * a "last seen inside" flag per vehicle: comma-separated list of + * {@code "vid:wasInside:entryTime"} triples. Tracks per-(window, vehicle) + * to detect intra-window outside→inside transitions. + */ +public class Q4WindowedProcessor implements Processor { + + private final String storeName; + private final double xmin, ymin, xmax, ymax; + private final long windowSizeMs; + private KeyValueStore winState; // winStart -> "vid:wasInside:entries|..." + private ProcessorContext ctx; + + public Q4WindowedProcessor(String storeName, double xmin, double ymin, double xmax, double ymax, + long windowSizeMs) { + this.storeName = storeName; + this.xmin = xmin; + this.ymin = ymin; + this.xmax = xmax; + this.ymax = ymax; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + boolean curr = trip.getLon() >= xmin && trip.getLon() <= xmax + && trip.getLat() >= ymin && trip.getLat() <= ymax; + String s = winState.get(winStart); + // Parse per-vehicle records separated by '|' + StringBuilder rebuilt = new StringBuilder(); + boolean foundVehicle = false; + if (s != null && !s.isEmpty()) { + for (String chunk : s.split("\\|")) { + String[] f = chunk.split(":", 3); + int vid = Integer.parseInt(f[0]); + boolean wasInside = Boolean.parseBoolean(f[1]); + String entries = f.length > 2 ? f[2] : ""; + if (vid == trip.getVehicleId()) { + foundVehicle = true; + String newEntries = entries; + if (curr && !wasInside) { + newEntries = entries.isEmpty() + ? Long.toString(trip.getTimestamp()) + : entries + "," + trip.getTimestamp(); + } + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(vid).append(":").append(curr).append(":").append(newEntries); + } else { + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(chunk); + } + } + } + if (!foundVehicle) { + // First event for this vehicle in this window — intra-window scoping + // treats first-seen-inside as an entry. + String entries = curr ? Long.toString(trip.getTimestamp()) : ""; + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(trip.getVehicleId()).append(":").append(curr).append(":").append(entries); + } + winState.put(winStart, rebuilt.toString()); + } + + private void punctuate(long currentStreamTime) { + List closedStarts = new ArrayList<>(); + List closedStates = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + closedStarts.add(kv.key); + closedStates.add(kv.value); + } + } + } + Integer[] idx = new Integer[closedStarts.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(closedStarts::get)); + for (Integer i : idx) { + long winStart = closedStarts.get(i); + // Emit one "vid:entryTime" per recorded entry, sorted by (vid, entryTime) + List vidIdx = new ArrayList<>(); + List times = new ArrayList<>(); + for (String chunk : closedStates.get(i).split("\\|")) { + if (chunk.isEmpty()) continue; + String[] f = chunk.split(":", 3); + int vid = Integer.parseInt(f[0]); + String entries = f.length > 2 ? f[2] : ""; + if (entries.isEmpty()) continue; + for (String s : entries.split(",")) { + vidIdx.add(new int[]{vid}); + times.add(new long[]{Long.parseLong(s)}); + } + } + // Stable sort by vid then time + Integer[] sortedIdx = new Integer[vidIdx.size()]; + for (int k = 0; k < sortedIdx.length; k++) sortedIdx[k] = k; + java.util.Arrays.sort(sortedIdx, Comparator + .comparingInt((Integer k) -> vidIdx.get(k)[0]) + .thenComparingLong(k -> times.get(k)[0])); + for (Integer k : sortedIdx) { + ctx.forward(new Record<>(winStart, + vidIdx.get(k)[0] + ":" + times.get(k)[0], + winStart + windowSizeMs - 1)); + } + winState.delete(winStart); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q5ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q5ContinuousProcessor.java new file mode 100644 index 0000000..a40048c --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q5ContinuousProcessor.java @@ -0,0 +1,94 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q5 — continuous form, Kafka-Streams Processor API. + * + *

"Which pairs of vehicles are currently meeting near point P?" + * + *

Caller should key the input stream by a constant so the shared + * cross-vehicle last-known state lives in one subtask. Per-event: + * update last-known of the event's vehicle, then enumerate all known + * pairs and forward {@code (a, b, eventTime, distanceMetres)} encoded + * as a string key + Double value for every currently-meeting pair + * (a < b for stable identity). + * + *

State value encoded as "lon,lat" string; the encoding is private + * to this processor (avoids declaring a tuple SerDe for the scaffold). + */ +public class Q5ContinuousProcessor implements Processor { + + private final String storeName; + private final double pLon, pLat, dPMetres, dMeetMetres; + private KeyValueStore lastPos; + private ProcessorContext ctx; + + public Q5ContinuousProcessor(String storeName, double pLon, double pLat, + double dPMetres, double dMeetMetres) { + this.storeName = storeName; + this.pLon = pLon; + this.pLat = pLat; + this.dPMetres = dPMetres; + this.dMeetMetres = dMeetMetres; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.lastPos = context.getStateStore(storeName); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + lastPos.put(trip.getVehicleId(), trip.getLon() + "," + trip.getLat()); + + // Snapshot near-P vehicles (sorted by id for stable pair iteration) + List ids = new ArrayList<>(); + List positions = new ArrayList<>(); + try (KeyValueIterator it = lastPos.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + String[] ll = kv.value.split(",", 2); + double lon = Double.parseDouble(ll[0]); + double lat = Double.parseDouble(ll[1]); + if (MEOSBridge.dwithinMetres(lon, lat, pLon, pLat, dPMetres)) { + ids.add(new int[]{kv.key}); + positions.add(new double[]{lon, lat}); + } + } + } + // Sort by id for stable output (small N — bubble is fine) + int n = ids.size(); + for (int i = 0; i < n - 1; i++) { + for (int j = i + 1; j < n; j++) { + if (ids.get(i)[0] > ids.get(j)[0]) { + int[] ti = ids.get(i); ids.set(i, ids.get(j)); ids.set(j, ti); + double[] tp = positions.get(i); positions.set(i, positions.get(j)); positions.set(j, tp); + } + } + } + for (int i = 0; i < n; i++) { + for (int j = i + 1; j < n; j++) { + double d = MEOSBridge.distanceMetres( + positions.get(i)[0], positions.get(i)[1], + positions.get(j)[0], positions.get(j)[1]); + if (d <= dMeetMetres) { + String pairKey = ids.get(i)[0] + "_" + ids.get(j)[0] + "@" + trip.getTimestamp(); + ctx.forward(new Record<>(pairKey, d, trip.getTimestamp())); + } + } + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q5SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q5SnapshotProcessor.java new file mode 100644 index 0000000..20f5f9a --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q5SnapshotProcessor.java @@ -0,0 +1,99 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q5 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, which pairs of vehicles are meeting near point P + * (using each vehicle's most-recent-known position on or before T)?" + * + *

Caller keys the input by a constant so the shared cross-vehicle + * last-known store lives in one subtask. Per event: update last-known. + * Per STREAM_TIME punctuator fire: snapshot the map, filter to near-P, + * enumerate all pairs (a { + + private final String storeName; + private final double pLon, pLat, dPMetres, dMeetMetres; + private final long snapshotTickMillis; + private KeyValueStore lastPos; // vehicleId -> "lon,lat" + private ProcessorContext ctx; + + public Q5SnapshotProcessor(String storeName, double pLon, double pLat, + double dPMetres, double dMeetMetres, + long snapshotTickMillis) { + this.storeName = storeName; + this.pLon = pLon; + this.pLat = pLat; + this.dPMetres = dPMetres; + this.dMeetMetres = dMeetMetres; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.lastPos = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + lastPos.put(trip.getVehicleId(), trip.getLon() + "," + trip.getLat()); + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + List ids = new ArrayList<>(); + List positions = new ArrayList<>(); + try (KeyValueIterator it = lastPos.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + String[] ll = kv.value.split(",", 2); + double lon = Double.parseDouble(ll[0]); + double lat = Double.parseDouble(ll[1]); + if (MEOSBridge.dwithinMetres(lon, lat, pLon, pLat, dPMetres)) { + ids.add(new int[]{kv.key}); + positions.add(new double[]{lon, lat}); + } + } + } + int n = ids.size(); + for (int i = 0; i < n - 1; i++) { + for (int j = i + 1; j < n; j++) { + if (ids.get(i)[0] > ids.get(j)[0]) { + int[] ti = ids.get(i); ids.set(i, ids.get(j)); ids.set(j, ti); + double[] tp = positions.get(i); positions.set(i, positions.get(j)); positions.set(j, tp); + } + } + } + for (int i = 0; i < n; i++) { + for (int j = i + 1; j < n; j++) { + double d = MEOSBridge.distanceMetres( + positions.get(i)[0], positions.get(i)[1], + positions.get(j)[0], positions.get(j)[1]); + if (d <= dMeetMetres) { + String pairKey = ids.get(i)[0] + "_" + ids.get(j)[0] + "@" + tick; + ctx.forward(new Record<>(pairKey, d, tick)); + } + } + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q5WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q5WindowedProcessor.java new file mode 100644 index 0000000..c8cb013 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q5WindowedProcessor.java @@ -0,0 +1,137 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q5 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, which pairs of vehicles met near P + * (using each vehicle's last-seen-in-window position)?" + * + *

State value encodes per-window per-vehicle last position as + * {@code "vid:lon,lat|vid:lon,lat|..."}. On the STREAM_TIME punctuator + * for each closed window, filter to near-P, enumerate sorted pairs, + * forward meeting pairs. + */ +public class Q5WindowedProcessor implements Processor { + + private final String storeName; + private final double pLon, pLat, dPMetres, dMeetMetres; + private final long windowSizeMs; + private KeyValueStore winState; + private ProcessorContext ctx; + + public Q5WindowedProcessor(String storeName, double pLon, double pLat, + double dPMetres, double dMeetMetres, long windowSizeMs) { + this.storeName = storeName; + this.pLon = pLon; + this.pLat = pLat; + this.dPMetres = dPMetres; + this.dMeetMetres = dMeetMetres; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + String s = winState.get(winStart); + StringBuilder rebuilt = new StringBuilder(); + boolean replaced = false; + if (s != null && !s.isEmpty()) { + for (String chunk : s.split("\\|")) { + int colon = chunk.indexOf(':'); + int vid = Integer.parseInt(chunk.substring(0, colon)); + if (vid == trip.getVehicleId()) { + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(vid).append(":").append(trip.getLon()).append(",").append(trip.getLat()); + replaced = true; + } else { + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(chunk); + } + } + } + if (!replaced) { + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(trip.getVehicleId()).append(":").append(trip.getLon()).append(",").append(trip.getLat()); + } + winState.put(winStart, rebuilt.toString()); + } + + private void punctuate(long currentStreamTime) { + List closedStarts = new ArrayList<>(); + List closedStates = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + closedStarts.add(kv.key); + closedStates.add(kv.value); + } + } + } + Integer[] idx = new Integer[closedStarts.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(closedStarts::get)); + for (Integer i : idx) { + long winStart = closedStarts.get(i); + // Parse positions, filter near-P + List nearIds = new ArrayList<>(); + List positions = new ArrayList<>(); + for (String chunk : closedStates.get(i).split("\\|")) { + int colon = chunk.indexOf(':'); + if (colon < 0) continue; + int vid = Integer.parseInt(chunk.substring(0, colon)); + String[] ll = chunk.substring(colon + 1).split(",", 2); + double lon = Double.parseDouble(ll[0]); + double lat = Double.parseDouble(ll[1]); + if (MEOSBridge.dwithinMetres(lon, lat, pLon, pLat, dPMetres)) { + nearIds.add(new int[]{vid}); + positions.add(new double[]{lon, lat}); + } + } + int n = nearIds.size(); + for (int a = 0; a < n - 1; a++) { + for (int b = a + 1; b < n; b++) { + if (nearIds.get(a)[0] > nearIds.get(b)[0]) { + int[] ti = nearIds.get(a); nearIds.set(a, nearIds.get(b)); nearIds.set(b, ti); + double[] tp = positions.get(a); positions.set(a, positions.get(b)); positions.set(b, tp); + } + } + } + for (int a = 0; a < n; a++) { + for (int b = a + 1; b < n; b++) { + double d = MEOSBridge.distanceMetres( + positions.get(a)[0], positions.get(a)[1], + positions.get(b)[0], positions.get(b)[1]); + if (d <= dMeetMetres) { + String pairKey = nearIds.get(a)[0] + "_" + nearIds.get(b)[0] + "@win" + winStart; + ctx.forward(new Record<>(pairKey, d, winStart + windowSizeMs - 1)); + } + } + } + winState.delete(winStart); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java new file mode 100644 index 0000000..f5f7f4c --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q6ContinuousProcessor.java @@ -0,0 +1,60 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; + +/** + * BerlinMOD-Q6 — continuous form, Kafka-Streams Processor API. + * + *

"What is each vehicle's cumulative distance travelled so far?" + * + *

Keyed by vehicleId. Per-vehicle state holds the last-known (lon, lat) + * and the running total in metres. On each event, accumulate the Haversine + * delta and emit the cumulative total. + * + *

State value uses a small string encoding "lon,lat,total" since the + * scaffold avoids declaring a dedicated tuple SerDe; the encoding is + * private to this processor. + * + *

Cumulative distance: per consecutive position-pair via + * {@link MEOSBridge#distanceMetres}. The future "full" path uses MEOS' + * {@code tpoint_length} over an aggregated trajectory; the per-event + * cumulative form is the same numeric quantity either way. + */ +public class Q6ContinuousProcessor implements Processor { + + private final String storeName; + private KeyValueStore state; // "lastLon,lastLat,total" + private ProcessorContext ctx; + + public Q6ContinuousProcessor(String storeName) { + this.storeName = storeName; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.state = context.getStateStore(storeName); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + String prev = state.get(trip.getVehicleId()); + double total; + if (prev == null) { + total = 0.0; + } else { + String[] parts = prev.split(",", 3); + double lastLon = Double.parseDouble(parts[0]); + double lastLat = Double.parseDouble(parts[1]); + double prevTotal = Double.parseDouble(parts[2]); + total = prevTotal + MEOSBridge.distanceMetres(lastLon, lastLat, trip.getLon(), trip.getLat()); + } + state.put(trip.getVehicleId(), trip.getLon() + "," + trip.getLat() + "," + total); + ctx.forward(new Record<>(trip.getVehicleId(), total, trip.getTimestamp())); + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java new file mode 100644 index 0000000..7fe87cc --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q6SnapshotProcessor.java @@ -0,0 +1,81 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q6 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, what is each vehicle's total distance travelled + * up to T?" + * + *

Caller keys the input by a constant. State value "lon,lat,total" + * per vehicleId. Per event: accumulate Haversine delta. Per STREAM_TIME + * punctuator fire: emit {@code (currentTick, vehicleId, total)} for + * every vehicle (sorted by vehicleId), encoded as "vid:total". + */ +public class Q6SnapshotProcessor implements Processor { + + private final String storeName; + private final long snapshotTickMillis; + private KeyValueStore state; + private ProcessorContext ctx; + + public Q6SnapshotProcessor(String storeName, long snapshotTickMillis) { + this.storeName = storeName; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.state = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + String prev = state.get(trip.getVehicleId()); + double total; + if (prev == null) { + total = 0.0; + } else { + String[] parts = prev.split(",", 3); + double lastLon = Double.parseDouble(parts[0]); + double lastLat = Double.parseDouble(parts[1]); + double prevTotal = Double.parseDouble(parts[2]); + total = prevTotal + MEOSBridge.distanceMetres(lastLon, lastLat, trip.getLon(), trip.getLat()); + } + state.put(trip.getVehicleId(), trip.getLon() + "," + trip.getLat() + "," + total); + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + List ids = new ArrayList<>(); + try (KeyValueIterator it = state.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + ids.add(kv.key); + } + } + ids.sort(Comparator.naturalOrder()); + for (Integer vid : ids) { + String[] parts = state.get(vid).split(",", 3); + double total = Double.parseDouble(parts[2]); + ctx.forward(new Record<>(tick, vid + ":" + total, tick)); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java new file mode 100644 index 0000000..c540ad4 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q6WindowedProcessor.java @@ -0,0 +1,126 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q6 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, per vehicle, distance travelled + * during the window." + * + *

State encodes per-window per-vehicle {@code "vid:lastLon,lastLat,total|..."}. + * On each event, accumulate Haversine delta from the previous in-window + * position. On punctuator: emit per-vehicle totals for closed windows. + */ +public class Q6WindowedProcessor implements Processor { + + private final String storeName; + private final long windowSizeMs; + private KeyValueStore winState; + private ProcessorContext ctx; + + public Q6WindowedProcessor(String storeName, long windowSizeMs) { + this.storeName = storeName; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + String s = winState.get(winStart); + StringBuilder rebuilt = new StringBuilder(); + boolean replaced = false; + if (s != null && !s.isEmpty()) { + for (String chunk : s.split("\\|")) { + int colon = chunk.indexOf(':'); + int vid = Integer.parseInt(chunk.substring(0, colon)); + String body = chunk.substring(colon + 1); + if (vid == trip.getVehicleId()) { + String[] f = body.split(",", 3); + double lastLon = Double.parseDouble(f[0]); + double lastLat = Double.parseDouble(f[1]); + double prevTotal = Double.parseDouble(f[2]); + double newTotal = prevTotal + MEOSBridge.distanceMetres( + lastLon, lastLat, trip.getLon(), trip.getLat()); + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(vid).append(":") + .append(trip.getLon()).append(",") + .append(trip.getLat()).append(",") + .append(newTotal); + replaced = true; + } else { + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(chunk); + } + } + } + if (!replaced) { + if (rebuilt.length() > 0) rebuilt.append("|"); + rebuilt.append(trip.getVehicleId()).append(":") + .append(trip.getLon()).append(",") + .append(trip.getLat()).append(",0.0"); + } + winState.put(winStart, rebuilt.toString()); + } + + private void punctuate(long currentStreamTime) { + List closedStarts = new ArrayList<>(); + List closedStates = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + closedStarts.add(kv.key); + closedStates.add(kv.value); + } + } + } + Integer[] idx = new Integer[closedStarts.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(closedStarts::get)); + for (Integer i : idx) { + long winStart = closedStarts.get(i); + List ids = new ArrayList<>(); + List totals = new ArrayList<>(); + for (String chunk : closedStates.get(i).split("\\|")) { + int colon = chunk.indexOf(':'); + if (colon < 0) continue; + int vid = Integer.parseInt(chunk.substring(0, colon)); + String[] f = chunk.substring(colon + 1).split(",", 3); + double total = Double.parseDouble(f[2]); + ids.add(new int[]{vid}); + totals.add(new double[]{total}); + } + Integer[] sortedIdx = new Integer[ids.size()]; + for (int k = 0; k < sortedIdx.length; k++) sortedIdx[k] = k; + java.util.Arrays.sort(sortedIdx, Comparator.comparingInt(k -> ids.get(k)[0])); + for (Integer k : sortedIdx) { + ctx.forward(new Record<>(winStart, + ids.get(k)[0] + ":" + totals.get(k)[0], + winStart + windowSizeMs - 1)); + } + winState.delete(winStart); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q7ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q7ContinuousProcessor.java new file mode 100644 index 0000000..6610e38 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q7ContinuousProcessor.java @@ -0,0 +1,60 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.util.List; + +/** + * BerlinMOD-Q7 — continuous form, Kafka-Streams Processor API. + * + *

"For each (vehicle, POI) pair, when did the vehicle first come + * within the POI's radius?" + * + *

Keyed by vehicleId. State is a {@link KeyValueStore} whose composite + * key is the POI id, scoped per-vehicle by Kafka Streams' implicit + * per-key state partitioning. On each event, scan the POI list; for each + * POI not yet recorded as passed and within radius, record the time and + * emit {@code (poiId, firstPassageTime)} (the key carries the vehicleId + * implicitly via the upstream keying). + */ +public class Q7ContinuousProcessor implements Processor { + + private final String storeName; + private final List pois; + private KeyValueStore firstPassed; // poiId -> firstPassageTime, per-vehicle by keying + private ProcessorContext ctx; + + public Q7ContinuousProcessor(String storeName, List pois) { + this.storeName = storeName; + this.pois = pois; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.firstPassed = context.getStateStore(storeName); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + Integer vehicleId = record.key(); + for (PointOfInterest poi : pois) { + int compositeKey = compositeKey(vehicleId, poi.id); + if (firstPassed.get(compositeKey) != null) continue; + if (MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), poi.lon, poi.lat, poi.radiusMetres)) { + firstPassed.put(compositeKey, trip.getTimestamp()); + ctx.forward(new Record<>(poi.id, trip.getTimestamp(), trip.getTimestamp())); + } + } + } + + /** Pack (vehicleId, poiId) into a single Integer for the store key. */ + private static int compositeKey(int vehicleId, int poiId) { + return (vehicleId * 1000) + poiId; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q7SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q7SnapshotProcessor.java new file mode 100644 index 0000000..e9f2e35 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q7SnapshotProcessor.java @@ -0,0 +1,91 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q7 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, for each (vehicle, POI), the first time the vehicle + * came within the POI's radius on or before T." + * + *

Caller keys the input by a constant. Store key is the composite + * {@code vehicleId * 1000 + poiId} integer with first-passage timestamp + * as value. Per event: detect any (vehicle, POI) first-passages. Per + * STREAM_TIME punctuator fire: walk store, emit each + * {@code (currentTick, "vid:poiId:firstPassageTime")} for entries + * with firstPassageTime ≤ currentTick. + */ +public class Q7SnapshotProcessor implements Processor { + + private final String storeName; + private final List pois; + private final long snapshotTickMillis; + private KeyValueStore firstPassed; // compositeKey -> firstPassageTime + private ProcessorContext ctx; + + public Q7SnapshotProcessor(String storeName, List pois, long snapshotTickMillis) { + this.storeName = storeName; + this.pois = pois; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.firstPassed = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + for (PointOfInterest poi : pois) { + int composite = trip.getVehicleId() * 1000 + poi.id; + if (firstPassed.get(composite) != null) continue; + if (MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), poi.lon, poi.lat, poi.radiusMetres)) { + firstPassed.put(composite, trip.getTimestamp()); + } + } + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + List rows = new ArrayList<>(); // {vehicleId, poiId, firstPassageTime as int-of-long? — keep long via array of longs} + // separate list for longs since arrays of ints lose precision + List firstPassages = new ArrayList<>(); + try (KeyValueIterator it = firstPassed.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.value <= tick) { + int vid = kv.key / 1000; + int poiId = kv.key % 1000; + rows.add(new int[]{vid, poiId}); + firstPassages.add(kv.value); + } + } + } + // Sort by (vehicleId, poiId) for deterministic output via parallel sort of two lists + Integer[] idx = new Integer[rows.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator + .comparingInt((Integer i) -> rows.get(i)[0]) + .thenComparingInt(i -> rows.get(i)[1])); + for (Integer i : idx) { + int[] r = rows.get(i); + ctx.forward(new Record<>(tick, r[0] + ":" + r[1] + ":" + firstPassages.get(i), tick)); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q7WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q7WindowedProcessor.java new file mode 100644 index 0000000..7114a26 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q7WindowedProcessor.java @@ -0,0 +1,115 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q7 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, for each (vehicle, POI), the first + * event in the window where the vehicle is inside the POI." Intra- + * window scoping (no cross-window first-passage state). + * + *

State encodes per-window the recorded (vehicle, POI, time) triples + * as {@code "vid:poiId:t,vid:poiId:t,..."}. On each event scan POIs; if + * vehicle inside and this (vehicle, POI) not yet recorded for this + * window, append. Punctuator emits each recorded triple for closed + * windows. + */ +public class Q7WindowedProcessor implements Processor { + + private final String storeName; + private final List pois; + private final long windowSizeMs; + private KeyValueStore winState; + private ProcessorContext ctx; + + public Q7WindowedProcessor(String storeName, List pois, long windowSizeMs) { + this.storeName = storeName; + this.pois = pois; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + String s = winState.get(winStart); + if (s == null) s = ""; + StringBuilder appended = new StringBuilder(s); + for (PointOfInterest poi : pois) { + String marker = trip.getVehicleId() + ":" + poi.id + ":"; + if (s.contains(marker)) continue; + if (MEOSBridge.dwithinMetres(trip.getLon(), trip.getLat(), poi.lon, poi.lat, poi.radiusMetres)) { + if (appended.length() > 0) appended.append(","); + appended.append(trip.getVehicleId()).append(":").append(poi.id).append(":").append(trip.getTimestamp()); + } + } + if (appended.length() > s.length()) { + winState.put(winStart, appended.toString()); + } else if (s.isEmpty()) { + winState.put(winStart, ""); + } + } + + private void punctuate(long currentStreamTime) { + List closedStarts = new ArrayList<>(); + List closedStates = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + closedStarts.add(kv.key); + closedStates.add(kv.value); + } + } + } + Integer[] idx = new Integer[closedStarts.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(closedStarts::get)); + for (Integer i : idx) { + long winStart = closedStarts.get(i); + String s = closedStates.get(i); + if (s != null && !s.isEmpty()) { + List vps = new ArrayList<>(); + List times = new ArrayList<>(); + for (String chunk : s.split(",")) { + String[] f = chunk.split(":", 3); + vps.add(new int[]{Integer.parseInt(f[0]), Integer.parseInt(f[1])}); + times.add(new long[]{Long.parseLong(f[2])}); + } + Integer[] sortedIdx = new Integer[vps.size()]; + for (int k = 0; k < sortedIdx.length; k++) sortedIdx[k] = k; + java.util.Arrays.sort(sortedIdx, Comparator + .comparingInt((Integer k) -> vps.get(k)[0]) + .thenComparingInt(k -> vps.get(k)[1])); + for (Integer k : sortedIdx) { + int[] vp = vps.get(k); + ctx.forward(new Record<>(winStart, + vp[0] + ":" + vp[1] + ":" + times.get(k)[0], + winStart + windowSizeMs - 1)); + } + } + winState.delete(winStart); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q8ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q8ContinuousProcessor.java new file mode 100644 index 0000000..34a0f7f --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q8ContinuousProcessor.java @@ -0,0 +1,45 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; + +/** + * BerlinMOD-Q8 — continuous form, Kafka-Streams Processor API. + * + *

"Is this vehicle currently within {@code d} metres of the road + * segment?" + * + *

Stateless per-event predicate using planar point-to-segment distance. + * Same shape as {@link Q3ContinuousProcessor} but with a segment-distance + * predicate instead of a point-radius one. + */ +public class Q8ContinuousProcessor implements Processor { + + private final double s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres; + private ProcessorContext ctx; + + public Q8ContinuousProcessor(double s1Lon, double s1Lat, double s2Lon, double s2Lat, double radiusMetres) { + this.s1Lon = s1Lon; + this.s1Lat = s1Lat; + this.s2Lon = s2Lon; + this.s2Lat = s2Lat; + this.radiusMetres = radiusMetres; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + boolean near = MEOSBridge.dwithinSegmentMetres( + trip.getLon(), trip.getLat(), + s1Lon, s1Lat, s2Lon, s2Lat, + radiusMetres); + ctx.forward(new Record<>(trip.getVehicleId(), near, trip.getTimestamp())); + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q8SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q8SnapshotProcessor.java new file mode 100644 index 0000000..8f9b268 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q8SnapshotProcessor.java @@ -0,0 +1,80 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q8 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, which vehicles are within {@code d} metres of the road + * segment (using each vehicle's most-recent-known position on or before + * T)?" + * + *

Same shape as {@link Q3SnapshotProcessor} with the point-to-segment + * predicate substituted for the point-radius one. + */ +public class Q8SnapshotProcessor implements Processor { + + private final String storeName; + private final double s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres; + private final long snapshotTickMillis; + private KeyValueStore lastPos; + private ProcessorContext ctx; + + public Q8SnapshotProcessor(String storeName, double s1Lon, double s1Lat, + double s2Lon, double s2Lat, double radiusMetres, + long snapshotTickMillis) { + this.storeName = storeName; + this.s1Lon = s1Lon; + this.s1Lat = s1Lat; + this.s2Lon = s2Lon; + this.s2Lat = s2Lat; + this.radiusMetres = radiusMetres; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.lastPos = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + lastPos.put(trip.getVehicleId(), trip.getLon() + "," + trip.getLat()); + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + List nearIds = new ArrayList<>(); + try (KeyValueIterator it = lastPos.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + String[] ll = kv.value.split(",", 2); + double lon = Double.parseDouble(ll[0]); + double lat = Double.parseDouble(ll[1]); + if (MEOSBridge.dwithinSegmentMetres(lon, lat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres)) { + nearIds.add(kv.key); + } + } + } + nearIds.sort(Comparator.naturalOrder()); + for (Integer vid : nearIds) { + ctx.forward(new Record<>(tick, vid, tick)); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q8WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q8WindowedProcessor.java new file mode 100644 index 0000000..a0a6932 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q8WindowedProcessor.java @@ -0,0 +1,103 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * BerlinMOD-Q8 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, how many distinct vehicles were + * within {@code d} metres of the road segment at any time during the + * window?" + * + *

Same shape as {@link Q3WindowedProcessor} with the segment-distance + * predicate substituted for the point-radius one. + */ +public class Q8WindowedProcessor implements Processor { + + private final String storeName; + private final double s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres; + private final long windowSizeMs; + private KeyValueStore winState; + private ProcessorContext ctx; + + public Q8WindowedProcessor(String storeName, double s1Lon, double s1Lat, + double s2Lon, double s2Lat, double radiusMetres, + long windowSizeMs) { + this.storeName = storeName; + this.s1Lon = s1Lon; + this.s1Lat = s1Lat; + this.s2Lon = s2Lon; + this.s2Lat = s2Lat; + this.radiusMetres = radiusMetres; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + if (!MEOSBridge.dwithinSegmentMetres(trip.getLon(), trip.getLat(), + s1Lon, s1Lat, s2Lon, s2Lat, + radiusMetres)) { + return; + } + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + String prior = winState.get(winStart); + String sv = Integer.toString(trip.getVehicleId()); + if (prior == null) { + winState.put(winStart, sv); + } else if (!contains(prior, sv)) { + winState.put(winStart, prior + "," + sv); + } + } + + private void punctuate(long currentStreamTime) { + List toEmit = new ArrayList<>(); + List counts = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + toEmit.add(kv.key); + Set distinct = new HashSet<>(); + for (String s : kv.value.split(",")) distinct.add(s); + counts.add((long) distinct.size()); + } + } + } + Integer[] idx = new Integer[toEmit.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(toEmit::get)); + for (Integer i : idx) { + long winStart = toEmit.get(i); + ctx.forward(new Record<>(winStart, counts.get(i), winStart + windowSizeMs - 1)); + winState.delete(winStart); + } + } + + private static boolean contains(String csv, String id) { + for (String s : csv.split(",")) if (s.equals(id)) return true; + return false; + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q9ContinuousProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q9ContinuousProcessor.java new file mode 100644 index 0000000..bc92a85 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q9ContinuousProcessor.java @@ -0,0 +1,69 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; + +/** + * BerlinMOD-Q9 — continuous form, Kafka-Streams Processor API. + * + *

"What is the current distance between vehicles X and Y?" + * + *

Caller should pre-filter the stream to {@code vehicleId ∈ {X, Y}} and + * key by a constant so the shared X+Y state lives in a single subtask. + * State encoded as a "xLon,xLat|yLon,yLat" string with NaN sentinels for + * unseen slots; per-event update the X or Y slot then forward + * {@code (eventTime, distanceMetres)} if both slots are known. + */ +public class Q9ContinuousProcessor implements Processor { + + private static final String UNSET = "NaN,NaN"; + + private final String storeName; + private final int xVehicleId; + private final int yVehicleId; + private KeyValueStore state; // single-key (0) -> "xLon,xLat|yLon,yLat" + private ProcessorContext ctx; + + public Q9ContinuousProcessor(String storeName, int xVehicleId, int yVehicleId) { + this.storeName = storeName; + this.xVehicleId = xVehicleId; + this.yVehicleId = yVehicleId; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.state = context.getStateStore(storeName); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + String s = state.get(0); + if (s == null) { + s = UNSET + "|" + UNSET; + } + String[] parts = s.split("\\|", 2); + String xSlot = parts[0]; + String ySlot = parts[1]; + if (trip.getVehicleId() == xVehicleId) { + xSlot = trip.getLon() + "," + trip.getLat(); + } else if (trip.getVehicleId() == yVehicleId) { + ySlot = trip.getLon() + "," + trip.getLat(); + } else { + return; + } + state.put(0, xSlot + "|" + ySlot); + if (!xSlot.startsWith("NaN") && !ySlot.startsWith("NaN")) { + String[] x = xSlot.split(",", 2); + String[] y = ySlot.split(",", 2); + double d = MEOSBridge.distanceMetres( + Double.parseDouble(x[0]), Double.parseDouble(x[1]), + Double.parseDouble(y[0]), Double.parseDouble(y[1])); + ctx.forward(new Record<>(trip.getTimestamp(), d, trip.getTimestamp())); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q9SnapshotProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q9SnapshotProcessor.java new file mode 100644 index 0000000..cbd0cad --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q9SnapshotProcessor.java @@ -0,0 +1,79 @@ +package berlinmod; + +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; + +/** + * BerlinMOD-Q9 — snapshot form, Kafka-Streams Processor API. + * + *

"At time T, what is the distance between vehicles X and Y (using + * their most-recent-known positions on or before T)?" + * + *

Single-key state holds the X+Y position pair. Per event for X or Y: + * update the slot. Per STREAM_TIME punctuator fire: if both slots are + * known, emit {@code (currentTick, distanceMetres)}. + */ +public class Q9SnapshotProcessor implements Processor { + + private static final String UNSET = "NaN,NaN"; + + private final String storeName; + private final int xVehicleId; + private final int yVehicleId; + private final long snapshotTickMillis; + private KeyValueStore state; // key=0 -> "xLon,xLat|yLon,yLat" + private ProcessorContext ctx; + + public Q9SnapshotProcessor(String storeName, int xVehicleId, int yVehicleId, long snapshotTickMillis) { + this.storeName = storeName; + this.xVehicleId = xVehicleId; + this.yVehicleId = yVehicleId; + this.snapshotTickMillis = snapshotTickMillis; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.state = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(snapshotTickMillis), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + String s = state.get(0); + if (s == null) s = UNSET + "|" + UNSET; + String[] parts = s.split("\\|", 2); + String xSlot = parts[0]; + String ySlot = parts[1]; + if (trip.getVehicleId() == xVehicleId) { + xSlot = trip.getLon() + "," + trip.getLat(); + } else if (trip.getVehicleId() == yVehicleId) { + ySlot = trip.getLon() + "," + trip.getLat(); + } else { + return; + } + state.put(0, xSlot + "|" + ySlot); + } + + private void punctuate(long currentStreamTime) { + long tick = (currentStreamTime / snapshotTickMillis) * snapshotTickMillis; + String s = state.get(0); + if (s == null) return; + String[] parts = s.split("\\|", 2); + if (parts[0].startsWith("NaN") || parts[1].startsWith("NaN")) return; + String[] x = parts[0].split(",", 2); + String[] y = parts[1].split(",", 2); + double d = MEOSBridge.distanceMetres( + Double.parseDouble(x[0]), Double.parseDouble(x[1]), + Double.parseDouble(y[0]), Double.parseDouble(y[1])); + ctx.forward(new Record<>(tick, d, tick)); + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/Q9WindowedProcessor.java b/kafka-streams-app/src/main/java/berlinmod/Q9WindowedProcessor.java new file mode 100644 index 0000000..ca3b0ee --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/Q9WindowedProcessor.java @@ -0,0 +1,102 @@ +package berlinmod; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * BerlinMOD-Q9 — windowed form, Kafka-Streams Processor API. + * + *

"Per N-second tumbling window, X-Y distance at window-end using + * each vehicle's last position in the window." + * + *

State value encodes per-window the latest X and Y positions seen + * as {@code "xLon,xLat|yLon,yLat"} (with {@code NaN,NaN} for unseen). + * Punctuator emits distance for each closed window where both X and Y + * are known. + */ +public class Q9WindowedProcessor implements Processor { + + private static final String UNSET = "NaN,NaN"; + + private final String storeName; + private final int xVehicleId, yVehicleId; + private final long windowSizeMs; + private KeyValueStore winState; + private ProcessorContext ctx; + + public Q9WindowedProcessor(String storeName, int xVehicleId, int yVehicleId, long windowSizeMs) { + this.storeName = storeName; + this.xVehicleId = xVehicleId; + this.yVehicleId = yVehicleId; + this.windowSizeMs = windowSizeMs; + } + + @Override + public void init(ProcessorContext context) { + this.ctx = context; + this.winState = context.getStateStore(storeName); + context.schedule(Duration.ofMillis(windowSizeMs), + PunctuationType.STREAM_TIME, this::punctuate); + } + + @Override + public void process(Record record) { + BerlinMODTrip trip = record.value(); + if (trip == null || trip.getVehicleId() == -1) return; + long winStart = (trip.getTimestamp() / windowSizeMs) * windowSizeMs; + String s = winState.get(winStart); + if (s == null) s = UNSET + "|" + UNSET; + String[] parts = s.split("\\|", 2); + String xSlot = parts[0]; + String ySlot = parts[1]; + if (trip.getVehicleId() == xVehicleId) { + xSlot = trip.getLon() + "," + trip.getLat(); + } else if (trip.getVehicleId() == yVehicleId) { + ySlot = trip.getLon() + "," + trip.getLat(); + } else { + return; + } + winState.put(winStart, xSlot + "|" + ySlot); + } + + private void punctuate(long currentStreamTime) { + List closedStarts = new ArrayList<>(); + List closedStates = new ArrayList<>(); + try (KeyValueIterator it = winState.all()) { + while (it.hasNext()) { + KeyValue kv = it.next(); + if (kv.key + windowSizeMs <= currentStreamTime) { + closedStarts.add(kv.key); + closedStates.add(kv.value); + } + } + } + Integer[] idx = new Integer[closedStarts.size()]; + for (int i = 0; i < idx.length; i++) idx[i] = i; + java.util.Arrays.sort(idx, Comparator.comparingLong(closedStarts::get)); + for (Integer i : idx) { + long winStart = closedStarts.get(i); + String s = closedStates.get(i); + String[] parts = s.split("\\|", 2); + if (!parts[0].startsWith("NaN") && !parts[1].startsWith("NaN")) { + String[] x = parts[0].split(",", 2); + String[] y = parts[1].split(",", 2); + double d = MEOSBridge.distanceMetres( + Double.parseDouble(x[0]), Double.parseDouble(x[1]), + Double.parseDouble(y[0]), Double.parseDouble(y[1])); + ctx.forward(new Record<>(winStart, d, winStart + windowSizeMs - 1)); + } + winState.delete(winStart); + } + } +} diff --git a/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java b/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java new file mode 100644 index 0000000..da4efc9 --- /dev/null +++ b/kafka-streams-app/src/main/java/berlinmod/SegmentDistance.java @@ -0,0 +1,55 @@ +package berlinmod; + +/** + * Distance from a (lon, lat) point to a (lon, lat) line segment, in metres, + * via a local equirectangular projection centred on the segment midpoint. + * + *

Pure-Java fallback for {@link MEOSBridge#dwithinSegmentMetres}, used + * by the BerlinMOD-Q8 streaming scaffold when libmeos is not loadable on + * the runtime path. The primary point-to-line spatial predicate is + * {@link MEOSBridge#dwithinSegmentMetres}, which routes through MEOS' + * {@code geog_dwithin} on a LineString geography when available. + */ +public final class SegmentDistance { + + private static final double EARTH_RADIUS_METRES = 6_371_000.0; + + private SegmentDistance() {} + + public static double distanceMetres( + double pLon, double pLat, + double s1Lon, double s1Lat, + double s2Lon, double s2Lat) { + double midLat = (s1Lat + s2Lat) / 2.0; + double mPerDegLat = Math.toRadians(1.0) * EARTH_RADIUS_METRES; + double mPerDegLon = mPerDegLat * Math.cos(Math.toRadians(midLat)); + + double px = pLon * mPerDegLon; + double py = pLat * mPerDegLat; + double s1x = s1Lon * mPerDegLon; + double s1y = s1Lat * mPerDegLat; + double s2x = s2Lon * mPerDegLon; + double s2y = s2Lat * mPerDegLat; + + double dx = s2x - s1x; + double dy = s2y - s1y; + double lenSq = dx * dx + dy * dy; + if (lenSq == 0.0) { + return Math.hypot(px - s1x, py - s1y); + } + double t = ((px - s1x) * dx + (py - s1y) * dy) / lenSq; + if (t < 0.0) t = 0.0; + else if (t > 1.0) t = 1.0; + double cx = s1x + t * dx; + double cy = s1y + t * dy; + return Math.hypot(px - cx, py - cy); + } + + public static boolean withinMetres( + double pLon, double pLat, + double s1Lon, double s1Lat, + double s2Lon, double s2Lat, + double radiusMetres) { + return distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) <= radiusMetres; + } +} diff --git a/kafka-streams-app/src/main/resources/log4j.properties b/kafka-streams-app/src/main/resources/log4j.properties new file mode 100644 index 0000000..641e984 --- /dev/null +++ b/kafka-streams-app/src/main/resources/log4j.properties @@ -0,0 +1,13 @@ +# Minimal log4j config for the MobilityKafka scaffold + +log4j.rootLogger=WARN, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%t] %-5p %c{1} - %m%n + +log4j.logger.berlinmod=INFO + +# Kafka-Streams internals at WARN to keep test output readable +log4j.logger.org.apache.kafka=WARN +log4j.logger.org.apache.kafka.streams=WARN