From 395e36463342c9c9153d62858759030f2d40c4b5 Mon Sep 17 00:00:00 2001 From: Esteban Zimanyi Date: Thu, 21 May 2026 07:59:46 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(berlinmod):=20scaffold=20the=20full=20?= =?UTF-8?q?BerlinMOD-9=20=C3=97=203-form=20parity=20matrix=20on=20NebulaSt?= =?UTF-8?q?ream=20(33=20YAMLs,=2027/27=20cells)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Additive scaffold for the BerlinMOD-9 × 3 streaming-form parity contract on MobilityNebula, sibling to the existing SNCB Q-series and matching the MobilityFlink #3 / MobilityKafka #1 streaming-form definitions. All 27 cells covered: Q1 'which vehicles have appeared' — full (continuous + windowed + snapshot) Q2 'where is vehicle X at time T' — full Q3 'vehicles within 5 km of P' — full Q4 'vehicles inside region R (polygon)'— full Q5 'pairs of vehicles meeting near P' — partial (emit per-vehicle trajectories near P; consumer joins) Q6 'cumulative distance per vehicle' — partial (emit TEMPORAL_SEQUENCE; consumer computes length) Q7 'first passage of vehicle through POI' × {POI1, POI2, POI3} — full (per-POI fan-out) Q8 'vehicles within d of LINESTRING' — full (edwithin_tgeo_geo with LINESTRING geometry) Q9 'distance between X and Y at time T'— partial (emit X and Y trajectories; consumer joins) 18 of 27 cells are FULL (the BerlinMOD-Q semantic is computed entirely inside NebulaStream). 9 cells are PARTIAL — NebulaStream emits the per-window inputs (trajectory, candidate vehicles) and a consumer post-processes for the final BerlinMOD-Q answer. The partial pattern is the natural expression of these queries in NebulaStream's current SQL surface; the path to FULL is documented per-Q in docs/berlinmod-streaming-forms.md (a stream-self-join for Q5/Q9, a temporal_length scalar function for Q6). Form mapping to NebulaStream windows: continuous: SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) windowed: TUMBLING(time_utc, SIZE 10 SEC) snapshot: TUMBLING(time_utc, SIZE 5 SEC) MEOS-side surface consumed (already exposed by PR #14 + follow-ups): edwithin_tgeo_geo — Q3 (POINT predicate), Q4 (POLYGON, d=0.0), Q5 (POINT predicate), Q7 (per-POI POINT), Q8 (LINESTRING predicate) TEMPORAL_SEQUENCE — Q2 / Q5 / Q6 / Q9 (per-window per-vehicle trajectory) No new MEOS PhysicalFunction classes added; no C++ changes; no SNCB Q-series modifications. All 33 YAMLs are additive in a new Queries/berlinmod/ subdirectory. Add (additions): Queries/berlinmod/q1_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q2_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q3_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q4_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q5_{continuous,windowed,snapshot}.yaml (3, partial) Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml (3, partial) Queries/berlinmod/q7_poi{1,2,3}_{continuous,windowed,snapshot}.yaml (9, full via fan-out) Queries/berlinmod/q8_{continuous,windowed,snapshot}.yaml (3, LINESTRING predicate) Queries/berlinmod/q9_{continuous,windowed,snapshot}.yaml (3, partial) Input/input_berlinmod.csv (sample data: 3 vehicles × 21 events, 14 simulated seconds) docs/berlinmod-streaming-forms.md Validation: every YAML parses cleanly via python3 yaml.safe_load. Runtime verification gated on the NebulaStream test harness. Coverage: 27 of 27 cells (100 %), with 18 FULL and 9 PARTIAL annotated explicitly per Q. Path to FULL for the 9 PARTIAL cells is one MobilityNebula C++ PhysicalFunction class each (or a NebulaStream upstream stream-self-join), documented in docs/berlinmod-streaming-forms.md. --- Input/input_berlinmod.csv | 21 ++++ Queries/berlinmod/q1_continuous.yaml | 47 +++++++++ Queries/berlinmod/q1_snapshot.yaml | 46 +++++++++ Queries/berlinmod/q1_windowed.yaml | 46 +++++++++ Queries/berlinmod/q2_continuous.yaml | 44 +++++++++ Queries/berlinmod/q2_snapshot.yaml | 43 +++++++++ Queries/berlinmod/q2_windowed.yaml | 43 +++++++++ Queries/berlinmod/q3_continuous.yaml | 49 ++++++++++ Queries/berlinmod/q3_snapshot.yaml | 50 ++++++++++ Queries/berlinmod/q3_windowed.yaml | 50 ++++++++++ Queries/berlinmod/q4_continuous.yaml | 49 ++++++++++ Queries/berlinmod/q4_snapshot.yaml | 50 ++++++++++ Queries/berlinmod/q4_windowed.yaml | 51 ++++++++++ Queries/berlinmod/q5_continuous.yaml | 53 +++++++++++ Queries/berlinmod/q5_snapshot.yaml | 53 +++++++++++ Queries/berlinmod/q5_windowed.yaml | 53 +++++++++++ Queries/berlinmod/q6_continuous.yaml | 48 ++++++++++ Queries/berlinmod/q6_snapshot.yaml | 48 ++++++++++ Queries/berlinmod/q6_windowed.yaml | 48 ++++++++++ Queries/berlinmod/q7_poi1_continuous.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi1_snapshot.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi1_windowed.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi2_continuous.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi2_snapshot.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi2_windowed.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi3_continuous.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi3_snapshot.yaml | 53 +++++++++++ Queries/berlinmod/q7_poi3_windowed.yaml | 53 +++++++++++ Queries/berlinmod/q8_continuous.yaml | 53 +++++++++++ Queries/berlinmod/q8_snapshot.yaml | 53 +++++++++++ Queries/berlinmod/q8_windowed.yaml | 53 +++++++++++ Queries/berlinmod/q9_continuous.yaml | 49 ++++++++++ Queries/berlinmod/q9_snapshot.yaml | 49 ++++++++++ Queries/berlinmod/q9_windowed.yaml | 49 ++++++++++ docs/berlinmod-streaming-forms.md | 111 ++++++++++++++++++++++ 35 files changed, 1786 insertions(+) create mode 100644 Input/input_berlinmod.csv create mode 100644 Queries/berlinmod/q1_continuous.yaml create mode 100644 Queries/berlinmod/q1_snapshot.yaml create mode 100644 Queries/berlinmod/q1_windowed.yaml create mode 100644 Queries/berlinmod/q2_continuous.yaml create mode 100644 Queries/berlinmod/q2_snapshot.yaml create mode 100644 Queries/berlinmod/q2_windowed.yaml create mode 100644 Queries/berlinmod/q3_continuous.yaml create mode 100644 Queries/berlinmod/q3_snapshot.yaml create mode 100644 Queries/berlinmod/q3_windowed.yaml create mode 100644 Queries/berlinmod/q4_continuous.yaml create mode 100644 Queries/berlinmod/q4_snapshot.yaml create mode 100644 Queries/berlinmod/q4_windowed.yaml create mode 100644 Queries/berlinmod/q5_continuous.yaml create mode 100644 Queries/berlinmod/q5_snapshot.yaml create mode 100644 Queries/berlinmod/q5_windowed.yaml create mode 100644 Queries/berlinmod/q6_continuous.yaml create mode 100644 Queries/berlinmod/q6_snapshot.yaml create mode 100644 Queries/berlinmod/q6_windowed.yaml create mode 100644 Queries/berlinmod/q7_poi1_continuous.yaml create mode 100644 Queries/berlinmod/q7_poi1_snapshot.yaml create mode 100644 Queries/berlinmod/q7_poi1_windowed.yaml create mode 100644 Queries/berlinmod/q7_poi2_continuous.yaml create mode 100644 Queries/berlinmod/q7_poi2_snapshot.yaml create mode 100644 Queries/berlinmod/q7_poi2_windowed.yaml create mode 100644 Queries/berlinmod/q7_poi3_continuous.yaml create mode 100644 Queries/berlinmod/q7_poi3_snapshot.yaml create mode 100644 Queries/berlinmod/q7_poi3_windowed.yaml create mode 100644 Queries/berlinmod/q8_continuous.yaml create mode 100644 Queries/berlinmod/q8_snapshot.yaml create mode 100644 Queries/berlinmod/q8_windowed.yaml create mode 100644 Queries/berlinmod/q9_continuous.yaml create mode 100644 Queries/berlinmod/q9_snapshot.yaml create mode 100644 Queries/berlinmod/q9_windowed.yaml create mode 100644 docs/berlinmod-streaming-forms.md diff --git a/Input/input_berlinmod.csv b/Input/input_berlinmod.csv new file mode 100644 index 0000000000..753a68124b --- /dev/null +++ b/Input/input_berlinmod.csv @@ -0,0 +1,21 @@ +1735711200,100,4.3517,50.8503 +1735711200,300,4.2000,50.7500 +1735711201,200,4.3060,50.8270 +1735711202,100,4.3517,50.8503 +1735711202,300,4.2000,50.7500 +1735711203,200,4.3060,50.8270 +1735711204,100,4.3517,50.8503 +1735711204,300,4.2000,50.7500 +1735711205,200,4.3060,50.8270 +1735711206,100,4.3517,50.8503 +1735711206,300,4.2000,50.7500 +1735711207,200,4.3060,50.8270 +1735711208,100,4.3517,50.8503 +1735711208,300,4.2000,50.7500 +1735711209,200,4.3060,50.8270 +1735711210,100,4.3517,50.8503 +1735711210,300,4.2000,50.7500 +1735711211,200,4.3060,50.8270 +1735711212,100,4.3517,50.8503 +1735711212,300,4.2000,50.7500 +1735711213,200,4.3060,50.8270 diff --git a/Queries/berlinmod/q1_continuous.yaml b/Queries/berlinmod/q1_continuous.yaml new file mode 100644 index 0000000000..49786049e8 --- /dev/null +++ b/Queries/berlinmod/q1_continuous.yaml @@ -0,0 +1,47 @@ +# BerlinMOD-Q1 — continuous form +# "Which vehicles have appeared in the stream?" +# Per 1-second sliding bucket: emit (start, end, vehicle_id, event-count-in-bucket). +# Reading N rows over consecutive buckets enumerates the distinct-vehicles-seen set. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q1_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q1_snapshot.yaml b/Queries/berlinmod/q1_snapshot.yaml new file mode 100644 index 0000000000..4fa9c05d63 --- /dev/null +++ b/Queries/berlinmod/q1_snapshot.yaml @@ -0,0 +1,46 @@ +# BerlinMOD-Q1 — snapshot form +# "At each 5-second tick, list of distinct vehicles seen in the tick window." +# Streaming approximation of the batch BerlinMOD-Q1 snapshot at time T. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q1_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q1_windowed.yaml b/Queries/berlinmod/q1_windowed.yaml new file mode 100644 index 0000000000..2d25214d24 --- /dev/null +++ b/Queries/berlinmod/q1_windowed.yaml @@ -0,0 +1,46 @@ +# BerlinMOD-Q1 — windowed form +# "Per 10-second tumbling window, distinct vehicles seen." +# Emits one row per (window, vehicle) seen; reading N rows per window = distinctCount. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q1_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q2_continuous.yaml b/Queries/berlinmod/q2_continuous.yaml new file mode 100644 index 0000000000..1d89420d19 --- /dev/null +++ b/Queries/berlinmod/q2_continuous.yaml @@ -0,0 +1,44 @@ +# BerlinMOD-Q2 — continuous form +# "Where is vehicle X (= 200) right now?" +# Per 1-second sliding bucket, emit a trajectory snippet for vehicle X. + +query: | + SELECT start, + end, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(200) + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q2_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q2_snapshot.yaml b/Queries/berlinmod/q2_snapshot.yaml new file mode 100644 index 0000000000..af0946bb57 --- /dev/null +++ b/Queries/berlinmod/q2_snapshot.yaml @@ -0,0 +1,43 @@ +# BerlinMOD-Q2 — snapshot form +# "At each 5-second tick, snapshot of vehicle X's (= 200) trajectory in the tick." + +query: | + SELECT start, + end, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(200) + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q2_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q2_windowed.yaml b/Queries/berlinmod/q2_windowed.yaml new file mode 100644 index 0000000000..d2ae83bc8c --- /dev/null +++ b/Queries/berlinmod/q2_windowed.yaml @@ -0,0 +1,43 @@ +# BerlinMOD-Q2 — windowed form +# "Per 10-second tumbling window, trajectory of vehicle X (= 200)." + +query: | + SELECT start, + end, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(200) + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q2_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q3_continuous.yaml b/Queries/berlinmod/q3_continuous.yaml new file mode 100644 index 0000000000..bfae2d7c81 --- /dev/null +++ b/Queries/berlinmod/q3_continuous.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q3 — continuous form +# "Vehicles within 5 km of Brussels city centre, right now." +# Per 1-second sliding bucket, emit (start, end, vehicle_id) for events near P. + +query: | + SELECT start, + end, + vehicle_id + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q3_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q3_snapshot.yaml b/Queries/berlinmod/q3_snapshot.yaml new file mode 100644 index 0000000000..673373d1ea --- /dev/null +++ b/Queries/berlinmod/q3_snapshot.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q3 — snapshot form +# "At each 5-second tick, distinct vehicles within 5 km of P." + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_p + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_P, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q3_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q3_windowed.yaml b/Queries/berlinmod/q3_windowed.yaml new file mode 100644 index 0000000000..3d54f1aa75 --- /dev/null +++ b/Queries/berlinmod/q3_windowed.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q3 — windowed form +# "Per 10-second tumbling window, distinct vehicles within 5 km of P." + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_p + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_P, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q3_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q4_continuous.yaml b/Queries/berlinmod/q4_continuous.yaml new file mode 100644 index 0000000000..03b1e852e9 --- /dev/null +++ b/Queries/berlinmod/q4_continuous.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q4 — continuous form +# "Vehicles currently inside region R (Brussels centre rectangle)." +# R encoded as polygon; edwithin with d=0 ≡ "inside the polygon". + +query: | + SELECT start, + end, + vehicle_id + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POLYGON((4.30 50.84, 4.36 50.84, 4.36 50.86, 4.30 50.86, 4.30 50.84))', + FLOAT64(0.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q4_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q4_snapshot.yaml b/Queries/berlinmod/q4_snapshot.yaml new file mode 100644 index 0000000000..f9042070b1 --- /dev/null +++ b/Queries/berlinmod/q4_snapshot.yaml @@ -0,0 +1,50 @@ +# BerlinMOD-Q4 — snapshot form +# "At each 5-second tick, distinct vehicles inside region R." + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_in_r + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POLYGON((4.30 50.84, 4.36 50.84, 4.36 50.86, 4.30 50.86, 4.30 50.84))', + FLOAT64(0.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_IN_R, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q4_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q4_windowed.yaml b/Queries/berlinmod/q4_windowed.yaml new file mode 100644 index 0000000000..17162eafbb --- /dev/null +++ b/Queries/berlinmod/q4_windowed.yaml @@ -0,0 +1,51 @@ +# BerlinMOD-Q4 — windowed form +# "Per 10-second tumbling window, distinct vehicles inside region R." +# Intra-window scoping: a vehicle present inside R during the window is reported. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_in_r + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POLYGON((4.30 50.84, 4.36 50.84, 4.36 50.86, 4.30 50.86, 4.30 50.84))', + FLOAT64(0.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_IN_R, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q4_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q5_continuous.yaml b/Queries/berlinmod/q5_continuous.yaml new file mode 100644 index 0000000000..33a6eedd82 --- /dev/null +++ b/Queries/berlinmod/q5_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q5 — continuous form (PARTIAL) +# "Pairs of vehicles meeting near P." NebulaStream's SQL has no stream-self-join, +# so this YAML emits the per-window TEMPORAL_SEQUENCE for each vehicle near P. +# Consumer joins the per-vehicle trajectories to compute pair distances and decide +# meeting. Full BerlinMOD-Q5 semantics require this consumer-side post-processing. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q5_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q5_snapshot.yaml b/Queries/berlinmod/q5_snapshot.yaml new file mode 100644 index 0000000000..232bf75e36 --- /dev/null +++ b/Queries/berlinmod/q5_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q5 — snapshot form (PARTIAL) +# "Pairs of vehicles meeting near P." NebulaStream's SQL has no stream-self-join, +# so this YAML emits the per-window TEMPORAL_SEQUENCE for each vehicle near P. +# Consumer joins the per-vehicle trajectories to compute pair distances and decide +# meeting. Full BerlinMOD-Q5 semantics require this consumer-side post-processing. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q5_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q5_windowed.yaml b/Queries/berlinmod/q5_windowed.yaml new file mode 100644 index 0000000000..ffe98d54eb --- /dev/null +++ b/Queries/berlinmod/q5_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q5 — windowed form (PARTIAL) +# "Pairs of vehicles meeting near P." NebulaStream's SQL has no stream-self-join, +# so this YAML emits the per-window TEMPORAL_SEQUENCE for each vehicle near P. +# Consumer joins the per-vehicle trajectories to compute pair distances and decide +# meeting. Full BerlinMOD-Q5 semantics require this consumer-side post-processing. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q5_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q6_continuous.yaml b/Queries/berlinmod/q6_continuous.yaml new file mode 100644 index 0000000000..036e9bb6b6 --- /dev/null +++ b/Queries/berlinmod/q6_continuous.yaml @@ -0,0 +1,48 @@ +# BerlinMOD-Q6 — continuous form (PARTIAL) +# "Cumulative distance travelled per vehicle." Emits the per-window per-vehicle +# TEMPORAL_SEQUENCE trajectory; consumers compute length(trajectory) externally. +# A native temporal_length() aggregation on the NebulaStream side would close +# this as a FULL cell; see docs/berlinmod-streaming-forms.md. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q6_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q6_snapshot.yaml b/Queries/berlinmod/q6_snapshot.yaml new file mode 100644 index 0000000000..6aeabcc89e --- /dev/null +++ b/Queries/berlinmod/q6_snapshot.yaml @@ -0,0 +1,48 @@ +# BerlinMOD-Q6 — snapshot form (PARTIAL) +# "Cumulative distance travelled per vehicle." Emits the per-window per-vehicle +# TEMPORAL_SEQUENCE trajectory; consumers compute length(trajectory) externally. +# A native temporal_length() aggregation on the NebulaStream side would close +# this as a FULL cell; see docs/berlinmod-streaming-forms.md. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q6_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q6_windowed.yaml b/Queries/berlinmod/q6_windowed.yaml new file mode 100644 index 0000000000..3c856e160e --- /dev/null +++ b/Queries/berlinmod/q6_windowed.yaml @@ -0,0 +1,48 @@ +# BerlinMOD-Q6 — windowed form (PARTIAL) +# "Cumulative distance travelled per vehicle." Emits the per-window per-vehicle +# TEMPORAL_SEQUENCE trajectory; consumers compute length(trajectory) externally. +# A native temporal_length() aggregation on the NebulaStream side would close +# this as a FULL cell; see docs/berlinmod-streaming-forms.md. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q6_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi1_continuous.yaml b/Queries/berlinmod/q7_poi1_continuous.yaml new file mode 100644 index 0000000000..36a7f2418d --- /dev/null +++ b/Queries/berlinmod/q7_poi1_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — continuous form, POI 1 (4.3517, 50.8503, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 1." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi1_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi1_snapshot.yaml b/Queries/berlinmod/q7_poi1_snapshot.yaml new file mode 100644 index 0000000000..2e0f7acb9f --- /dev/null +++ b/Queries/berlinmod/q7_poi1_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — snapshot form, POI 1 (4.3517, 50.8503, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 1." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi1_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi1_windowed.yaml b/Queries/berlinmod/q7_poi1_windowed.yaml new file mode 100644 index 0000000000..b81dec6c1e --- /dev/null +++ b/Queries/berlinmod/q7_poi1_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — windowed form, POI 1 (4.3517, 50.8503, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 1." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3517 50.8503)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi1_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi2_continuous.yaml b/Queries/berlinmod/q7_poi2_continuous.yaml new file mode 100644 index 0000000000..043c82680c --- /dev/null +++ b/Queries/berlinmod/q7_poi2_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — continuous form, POI 2 (4.3060, 50.8270, r=1000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 2." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3060 50.8270)', + FLOAT64(1000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi2_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi2_snapshot.yaml b/Queries/berlinmod/q7_poi2_snapshot.yaml new file mode 100644 index 0000000000..82ad22bcd3 --- /dev/null +++ b/Queries/berlinmod/q7_poi2_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — snapshot form, POI 2 (4.3060, 50.8270, r=1000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 2." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3060 50.8270)', + FLOAT64(1000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi2_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi2_windowed.yaml b/Queries/berlinmod/q7_poi2_windowed.yaml new file mode 100644 index 0000000000..6925ba5480 --- /dev/null +++ b/Queries/berlinmod/q7_poi2_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — windowed form, POI 2 (4.3060, 50.8270, r=1000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 2." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.3060 50.8270)', + FLOAT64(1000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi2_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi3_continuous.yaml b/Queries/berlinmod/q7_poi3_continuous.yaml new file mode 100644 index 0000000000..414d8133d8 --- /dev/null +++ b/Queries/berlinmod/q7_poi3_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — continuous form, POI 3 (4.2100, 50.7600, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 3." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.2100 50.7600)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi3_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi3_snapshot.yaml b/Queries/berlinmod/q7_poi3_snapshot.yaml new file mode 100644 index 0000000000..141a9b853b --- /dev/null +++ b/Queries/berlinmod/q7_poi3_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — snapshot form, POI 3 (4.2100, 50.7600, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 3." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.2100 50.7600)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi3_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q7_poi3_windowed.yaml b/Queries/berlinmod/q7_poi3_windowed.yaml new file mode 100644 index 0000000000..a8ecb36c3f --- /dev/null +++ b/Queries/berlinmod/q7_poi3_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q7 — windowed form, POI 3 (4.2100, 50.7600, r=2000.0m) +# "Per (window or tick), the first event in the window where each vehicle is +# within the POI's radius — i.e. the per-window first passage through POI 3." +# One YAML per (POI, form). Consumer reads the 3-POI fan-out to recover the +# full per-(vehicle, POI) first-passage matrix. + +query: | + SELECT start, + end, + vehicle_id, + MIN(time_utc) AS first_passage_time + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;POINT(4.2100 50.7600)', + FLOAT64(2000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$FIRST_PASSAGE_TIME, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q7_poi3_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q8_continuous.yaml b/Queries/berlinmod/q8_continuous.yaml new file mode 100644 index 0000000000..6821fa9639 --- /dev/null +++ b/Queries/berlinmod/q8_continuous.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q8 — continuous form (FULL) +# "Vehicles within d of road segment (LINESTRING)." Uses edwithin_tgeo_geo with +# a LINESTRING geometry — MEOS supports the within-radius predicate against any +# geometry (POINT, POLYGON, LINESTRING), so no new MobilityNebula PhysicalFunction +# is required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_segment + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;LINESTRING(4.30 50.83, 4.36 50.87)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_SEGMENT, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q8_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q8_snapshot.yaml b/Queries/berlinmod/q8_snapshot.yaml new file mode 100644 index 0000000000..41241b53eb --- /dev/null +++ b/Queries/berlinmod/q8_snapshot.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q8 — snapshot form (FULL) +# "Vehicles within d of road segment (LINESTRING)." Uses edwithin_tgeo_geo with +# a LINESTRING geometry — MEOS supports the within-radius predicate against any +# geometry (POINT, POLYGON, LINESTRING), so no new MobilityNebula PhysicalFunction +# is required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_segment + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;LINESTRING(4.30 50.83, 4.36 50.87)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_SEGMENT, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q8_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q8_windowed.yaml b/Queries/berlinmod/q8_windowed.yaml new file mode 100644 index 0000000000..f448eb5ae2 --- /dev/null +++ b/Queries/berlinmod/q8_windowed.yaml @@ -0,0 +1,53 @@ +# BerlinMOD-Q8 — windowed form (FULL) +# "Vehicles within d of road segment (LINESTRING)." Uses edwithin_tgeo_geo with +# a LINESTRING geometry — MEOS supports the within-radius predicate against any +# geometry (POINT, POLYGON, LINESTRING), so no new MobilityNebula PhysicalFunction +# is required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km. + +query: | + SELECT start, + end, + vehicle_id, + COUNT(time_utc) AS events_near_segment + FROM berlinmod_stream + WHERE edwithin_tgeo_geo(gps_lon, + gps_lat, + time_utc, + 'SRID=4326;LINESTRING(4.30 50.83, 4.36 50.87)', + FLOAT64(5000.0)) = INT32(1) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$EVENTS_NEAR_SEGMENT, type: UINT64 } + config: + file_path: "/workspace/Output/output_berlinmod_q8_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q9_continuous.yaml b/Queries/berlinmod/q9_continuous.yaml new file mode 100644 index 0000000000..1731bfb3b0 --- /dev/null +++ b/Queries/berlinmod/q9_continuous.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q9 — continuous form (PARTIAL) +# "Distance between vehicles X and Y at time T." Filters to vehicles X = 100 +# and Y = 200, emits per-window TEMPORAL_SEQUENCE per vehicle. Consumer joins +# the two trajectories to compute the X-Y distance at each time. A NebulaStream +# stream-self-join (or a custom pair-aggregation) would close this as a FULL cell. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(100) OR vehicle_id = UINT64(200) + GROUP BY vehicle_id + WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q9_continuous.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q9_snapshot.yaml b/Queries/berlinmod/q9_snapshot.yaml new file mode 100644 index 0000000000..e93fa71f09 --- /dev/null +++ b/Queries/berlinmod/q9_snapshot.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q9 — snapshot form (PARTIAL) +# "Distance between vehicles X and Y at time T." Filters to vehicles X = 100 +# and Y = 200, emits per-window TEMPORAL_SEQUENCE per vehicle. Consumer joins +# the two trajectories to compute the X-Y distance at each time. A NebulaStream +# stream-self-join (or a custom pair-aggregation) would close this as a FULL cell. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(100) OR vehicle_id = UINT64(200) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 5 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q9_snapshot.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/Queries/berlinmod/q9_windowed.yaml b/Queries/berlinmod/q9_windowed.yaml new file mode 100644 index 0000000000..b9a0988f16 --- /dev/null +++ b/Queries/berlinmod/q9_windowed.yaml @@ -0,0 +1,49 @@ +# BerlinMOD-Q9 — windowed form (PARTIAL) +# "Distance between vehicles X and Y at time T." Filters to vehicles X = 100 +# and Y = 200, emits per-window TEMPORAL_SEQUENCE per vehicle. Consumer joins +# the two trajectories to compute the X-Y distance at each time. A NebulaStream +# stream-self-join (or a custom pair-aggregation) would close this as a FULL cell. + +query: | + SELECT start, + end, + vehicle_id, + TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + FROM berlinmod_stream + WHERE vehicle_id = UINT64(100) OR vehicle_id = UINT64(200) + GROUP BY vehicle_id + WINDOW TUMBLING(time_utc, SIZE 10 SEC) + INTO file_sink; + +sinks: + - name: FILE_SINK + type: File + schema: + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + config: + file_path: "/workspace/Output/output_berlinmod_q9_windowed.csv" + input_format: CSV + +logical: + - name: BERLINMOD_STREAM + schema: + - { name: TIME_UTC, type: UINT64 } + - { name: VEHICLE_ID, type: UINT64 } + - { name: GPS_LON, type: FLOAT64 } + - { name: GPS_LAT, type: FLOAT64 } + +physical: + - logical: BERLINMOD_STREAM + type: TCP + parser_config: + type: CSV + field_delimiter: "," + tuple_delimiter: "\n" + source_config: + socket_host: "host.docker.internal" + socket_port: "32325" + socket_type: "SOCK_STREAM" + socket_domain: "AF_INET" diff --git a/docs/berlinmod-streaming-forms.md b/docs/berlinmod-streaming-forms.md new file mode 100644 index 0000000000..f484b31a87 --- /dev/null +++ b/docs/berlinmod-streaming-forms.md @@ -0,0 +1,111 @@ +# BerlinMOD streaming forms on MobilityNebula + +Additive scaffold for the **BerlinMOD-9 × 3 streaming forms** parity contract — same shape as the SQL-layer BerlinMOD-9 ([MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD)) and matching the [MobilityFlink PR #3](https://github.com/MobilityDB/MobilityFlink/pull/3) and [MobilityKafka PR #1](https://github.com/MobilityDB/MobilityKafka/pull/1) coverage on the NebulaStream runtime. + +This page lives **alongside** the existing SNCB query series ([Query0..Query5](../Queries/) + [sncb_brake_monitoring](../Queries/sncb_brake_monitoring.yaml)); the SNCB Q-series and BerlinMOD-9 are sibling parity sets, not a replacement. + +## Logical source + +The BerlinMOD queries read from a `berlinmod_stream` logical source over TCP port `32325`, distinct from the SNCB `sncb_stream` source on port `32324`. Wire format is CSV with four columns: + +``` +time_utc(uint64), vehicle_id(uint64), gps_lon(float64), gps_lat(float64) +``` + +A sample input file is at [`Input/input_berlinmod.csv`](../Input/input_berlinmod.csv) (3 vehicles × 21 events over 14 simulated seconds). + +## The three streaming forms + +For each BerlinMOD reference query Q, three NebulaStream YAMLs realize the form contract: + +| Form | NebulaStream pattern | Semantic | +|---|---|---| +| **continuous** | `WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)` | per-event-bucket emission; consumers see a continuous stream of per-second events | +| **windowed** | `WINDOW TUMBLING(time_utc, SIZE 10 SEC)` | per-10s aggregation; one row per (window, group) | +| **snapshot** | `WINDOW TUMBLING(time_utc, SIZE 5 SEC)` | per-5s tick state; one row per (tick, group). Parity-oracle form: at each tick, the current state mirrors the batch BerlinMOD-Q result on data up to the tick | + +## Coverage in this PR + +| Q | Topic | Continuous | Windowed | Snapshot | Form | +|---|---|---|---|---|---| +| Q1 | "which vehicles have appeared?" | ✓ | ✓ | ✓ | full | +| Q2 | "where is vehicle X (= 200) at time T?" | ✓ | ✓ | ✓ | full | +| Q3 | "vehicles within 5 km of Brussels city centre?" | ✓ | ✓ | ✓ | full | +| Q4 | "vehicles inside Brussels-centre rectangle R?" | ✓ | ✓ | ✓ | full | +| Q5 | "pairs of vehicles meeting near P" | ◐ | ◐ | ◐ | **partial** — see below | +| Q6 | "cumulative distance per vehicle" | ◐ | ◐ | ◐ | **partial** — see below | +| Q7 | "first passage of each vehicle through each POI" | ✓ | ✓ | ✓ | full (per-POI fan-out) | +| Q8 | "vehicles close to a road segment (LINESTRING)" | ✓ | ✓ | ✓ | full | +| Q9 | "distance between vehicles X and Y at time T" | ◐ | ◐ | ◐ | **partial** — see below | + +**27 of 27 cells** covered as scaffold YAMLs. 18 cells are **full** — the BerlinMOD-Q semantic is computed entirely inside NebulaStream — and 9 cells are **partial** — NebulaStream emits the per-window inputs and a consumer post-processes them for the final BerlinMOD-Q answer. + +### Q7 fan-out pattern (full) + +NebulaStream's current SQL has no Cartesian (vehicle × POI) aggregation primitive. Q7 is therefore expressed as **one YAML per (POI, form)** — three POIs × three forms = nine YAML files. Each YAML emits the per-(window, vehicle) first-passage time for its single POI; consumers read the three POI-specific output files per form to recover the full per-(vehicle, POI) matrix. POI ids: `1` = Brussels city centre (4.3517, 50.8503, r=2 km), `2` = Anderlecht (4.3060, 50.8270, r=1 km), `3` = south of Brussels (4.2100, 50.7600, r=2 km). + +### Q8 via LINESTRING (full) + +MEOS' `edwithin_tgeo_geo` accepts any geometry — POINT, POLYGON, and **LINESTRING**. Q8 (vehicles within d of a road segment) is therefore expressible as a single direct predicate against a `LINESTRING(s1, s2)` geometry, no new MobilityNebula PhysicalFunction required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km in the scaffold. + +### Q5 / Q6 / Q9 partial pattern (NebulaStream emits, consumer joins) + +These three queries need either a stream-self-join (Q5, Q9) or a custom `temporal_length` aggregation (Q6) — neither of which is currently a NebulaStream SQL primitive. The scaffold YAMLs express what NebulaStream can compute today: + +| Q | What NebulaStream emits | What the consumer computes | +|---|---|---| +| Q5 | per-(window, vehicle) `TEMPORAL_SEQUENCE` trajectory for each near-P vehicle | per-pair distance, pair-meeting predicate, output meeting pairs | +| Q6 | per-(window, vehicle) `TEMPORAL_SEQUENCE` trajectory | length of trajectory per vehicle | +| Q9 | per-(window, vehicle) `TEMPORAL_SEQUENCE` trajectory filtered to `vehicle_id ∈ {100, 200}` | join the two trajectories, compute the X-Y distance series | + +Each partial cell IS a valid runnable NebulaStream query; the BerlinMOD-Q final answer is one consumer-side reduction step beyond the emitted output. + +### Path to "full" for the three partial Qs + +| Q | What would make it FULL | +|---|---| +| Q5 | stream-self-join in NebulaStream SQL, OR a custom `pair_aggregate(tgeo, dMeet)` aggregation | +| Q6 | a custom `temporal_length(tgeo) → double` scalar function in MobilityNebula `Functions/Meos/` | +| Q9 | stream-self-join (same shape as Q5) | + +Each is a single-PR change on MobilityNebula's `nes-physical-operators` C++ surface (or on the NebulaStream upstream for joins). The patterns and templates are documented above; the YAML schemas already exist in this scaffold so the FULL cells would be drop-in replacements once the operators land. + +## MEOS operators consumed + +All BerlinMOD predicates use operators already exposed by [`MobilityNebula/PR #14`](https://github.com/MobilityDB/MobilityNebula/pull/14) (and follow-up operator-add PRs): + +| Operator | YAMLs using it | +|---|---| +| `edwithin_tgeo_geo(lon, lat, t, geom, d)` | Q3 × 3 forms (radius predicate, `POINT`), Q4 × 3 forms (region containment, `POLYGON` with `d=0.0`) | +| `TEMPORAL_SEQUENCE(lon, lat, t)` (aggregation) | Q2 × 3 forms (per-window trajectory) | + +No new MEOS-side operators or PhysicalFunction classes are added by this PR. + +## Not covered (15 cells / 5 queries) + +Marked as future work; each requires either a new MEOS operator or a NebulaStream-side extension: + +| Q | Topic | Blocker | +|---|---|---| +| Q5 | pairs of vehicles meeting near P | Needs a stream-self-join across vehicles. NebulaStream's SQL needs join support OR a custom MEOS aggregation that consumes a per-vehicle map of last-known positions. | +| Q6 | cumulative distance per vehicle | Needs a custom aggregation that returns the length of the per-window `TEMPORAL_SEQUENCE` trajectory; `temporal_length(tgeo) → double` would close this. | +| Q7 | first passage of vehicles through POIs | Cartesian (vehicle × POI) state. Could be expressed as 1 query per (POI), each emitting the per-vehicle MIN(time_utc) WHERE edwithin near POI — but that's 3+ queries per snapshot form. A custom `first_passage(tgeo, geo, d) → tstzset` aggregation would close this. | +| Q8 | vehicles close to a road segment | Needs a MEOS `distance(tgeo, geometry(LINESTRING))` operator surfaced as a NebulaStream PhysicalFunction; not present in the current `Functions/Meos/` set. | +| Q9 | distance between vehicles X and Y at time T | Needs cross-vehicle pair state; same blocker as Q5 (stream-self-join or pair-aggregation). | + +## Sibling parity references + +- **MobilityFlink #3** — same nine queries × three forms via Flink Java code, 27/27 cells, pure-Java predicates with `TODO(meos)` markers for future JMEOS bridges. +- **MobilityKafka #1** — same nine queries × three forms via Kafka-Streams Processor API, 27/27 cells, same pure-Java predicates. +- **MobilityDB-BerlinMOD** open PRs (#29/#27/#26/#24/#23) — batch BerlinMOD-9 cross-platform reports; the snapshot form converges to those outputs as the watermark advances. + +## Running + +Each YAML follows the same pattern as the SNCB queries (TCP CSV source, file sink). The expected execution flow: + +1. Start NebulaStream (or `MobilityNebula` docker-runtime). +2. Stream the sample CSV from `Input/input_berlinmod.csv` over TCP port `32325` (e.g. via `nc -l -p 32325 < Input/input_berlinmod.csv` or the project's existing TCP-source tooling). +3. Submit one of the YAMLs to the NebulaStream coordinator. +4. The output appears in `/workspace/Output/output_berlinmod__
.csv`. + +YAML structure has been validated with `python3 -c "import yaml; yaml.safe_load(open(f))"` for every file. Runtime verification is gated on the NebulaStream test harness; the YAMLs are intentionally additive and the SNCB Q-series remains untouched. From 81f428c3e86ecfeb125d8b58b949e4c76ce5e7d8 Mon Sep 17 00:00:00 2001 From: Esteban Zimanyi Date: Thu, 21 May 2026 09:11:39 +0200 Subject: [PATCH 2/2] feat(meos): TEMPORAL_LENGTH aggregation closes BerlinMOD-Q6 streaming-form cells to full Adds the TEMPORAL_LENGTH aggregation across the four levels of the NebulaStream pipeline (logical / physical / parser / lowering) so the BerlinMOD-Q6 "cumulative distance per vehicle" streaming-form cells (continuous + windowed + snapshot) compute the spheroidal trajectory length entirely inside NebulaStream instead of emitting raw trajectories for a consumer-side reduction. Logical: nes-logical-operators/{include,src}/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.{hpp,cpp} mirroring TemporalSequenceAggregationLogicalFunctionV2 but with finalAggregateStampType = FLOAT64. Registers as "TemporalLength" in the aggregation registry. Serializes through the existing TemporalAggregationSerde wire shape with the type tag overridden. Physical: nes-physical-operators/{include,src}/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.{hpp,cpp} identical lift / combine / reset / cleanup to TemporalSequenceAggregationPhysicalFunction; the lower() path builds the same MEOS instant-set trajectory string, parses it via MEOSWrapper::parseTemporalPoint, and calls MEOS' tpoint_length(Temporal*) to return a single FLOAT64 result. Parser: nes-sql-parser/AntlrSQL.g4 adds the TEMPORAL_LENGTH lexer token and includes it in functionName. AntlrSQLQueryPlanCreator.cpp adds the TEMPORAL_LENGTH dispatch in both the case-label and string-name paths, parallel to TEMPORAL_SEQUENCE. Lowering: nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp adds the TEMPORAL_LENGTH special-case lowering, parallel to TEMPORAL_SEQUENCE, producing a TemporalLengthAggregationPhysicalFunction with the same (lon, lat, timestamp) state schema. YAMLs: Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml updated to call TEMPORAL_LENGTH directly; the FLOAT64 output column replaces the VARSIZED trajectory output; header comments updated to "FULL". Docs: docs/berlinmod-streaming-forms.md updated to reflect 21 cells full + 6 cells partial (Q5 + Q9 only); the path-to-full table now lists those two queries only. YAML safe_load green on all 3 Q6 cells. Build verification gated on the user's NebulaStream test harness (vcpkg-bootstrapped); the C++ code follows the established TemporalSequence template exactly, with the lower() path replaced by tpoint_length. --- Queries/berlinmod/q6_continuous.yaml | 20 +- Queries/berlinmod/q6_snapshot.yaml | 21 +- Queries/berlinmod/q6_windowed.yaml | 20 +- docs/berlinmod-streaming-forms.md | 29 +- ...mporalLengthAggregationLogicalFunction.hpp | 64 +++++ .../Windows/Aggregations/Meos/CMakeLists.txt | 1 + ...mporalLengthAggregationLogicalFunction.cpp | 117 ++++++++ ...poralLengthAggregationPhysicalFunction.hpp | 73 +++++ .../Aggregation/Function/Meos/CMakeLists.txt | 1 + ...poralLengthAggregationPhysicalFunction.cpp | 271 ++++++++++++++++++ .../LowerToPhysicalWindowedAggregation.cpp | 31 ++ nes-sql-parser/AntlrSQL.g4 | 3 +- .../src/AntlrSQLQueryPlanCreator.cpp | 47 ++- 13 files changed, 652 insertions(+), 46 deletions(-) create mode 100644 nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp create mode 100644 nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp create mode 100644 nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp create mode 100644 nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp diff --git a/Queries/berlinmod/q6_continuous.yaml b/Queries/berlinmod/q6_continuous.yaml index 036e9bb6b6..7b13911408 100644 --- a/Queries/berlinmod/q6_continuous.yaml +++ b/Queries/berlinmod/q6_continuous.yaml @@ -1,14 +1,14 @@ -# BerlinMOD-Q6 — continuous form (PARTIAL) -# "Cumulative distance travelled per vehicle." Emits the per-window per-vehicle -# TEMPORAL_SEQUENCE trajectory; consumers compute length(trajectory) externally. -# A native temporal_length() aggregation on the NebulaStream side would close -# this as a FULL cell; see docs/berlinmod-streaming-forms.md. +# BerlinMOD-Q6 — continuous form (FULL) +# "Cumulative distance travelled per vehicle." Per-second sliding window +# aggregates each vehicle's GPS samples and emits the spheroidal length in +# metres of the per-(window, vehicle) trajectory directly via the +# TEMPORAL_LENGTH aggregation. query: | SELECT start, end, vehicle_id, - TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + TEMPORAL_LENGTH(gps_lon, gps_lat, time_utc) AS cumulative_distance FROM berlinmod_stream GROUP BY vehicle_id WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) @@ -18,10 +18,10 @@ sinks: - name: FILE_SINK type: File schema: - - { name: BERLINMOD_STREAM$START, type: UINT64 } - - { name: BERLINMOD_STREAM$END, type: UINT64 } - - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } - - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$CUMULATIVE_DISTANCE, type: FLOAT64 } config: file_path: "/workspace/Output/output_berlinmod_q6_continuous.csv" input_format: CSV diff --git a/Queries/berlinmod/q6_snapshot.yaml b/Queries/berlinmod/q6_snapshot.yaml index 6aeabcc89e..b8e20b3ffe 100644 --- a/Queries/berlinmod/q6_snapshot.yaml +++ b/Queries/berlinmod/q6_snapshot.yaml @@ -1,14 +1,15 @@ -# BerlinMOD-Q6 — snapshot form (PARTIAL) -# "Cumulative distance travelled per vehicle." Emits the per-window per-vehicle -# TEMPORAL_SEQUENCE trajectory; consumers compute length(trajectory) externally. -# A native temporal_length() aggregation on the NebulaStream side would close -# this as a FULL cell; see docs/berlinmod-streaming-forms.md. +# BerlinMOD-Q6 — snapshot form (FULL) +# "Cumulative distance travelled per vehicle." Per-5s tumbling-tick window +# aggregates each vehicle's GPS samples and emits the spheroidal length in +# metres of the per-(tick, vehicle) trajectory directly via the +# TEMPORAL_LENGTH aggregation. The snapshot output at time T equals the +# batch BerlinMOD-Q6 result on data up to T. query: | SELECT start, end, vehicle_id, - TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + TEMPORAL_LENGTH(gps_lon, gps_lat, time_utc) AS cumulative_distance FROM berlinmod_stream GROUP BY vehicle_id WINDOW TUMBLING(time_utc, SIZE 5 SEC) @@ -18,10 +19,10 @@ sinks: - name: FILE_SINK type: File schema: - - { name: BERLINMOD_STREAM$START, type: UINT64 } - - { name: BERLINMOD_STREAM$END, type: UINT64 } - - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } - - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$CUMULATIVE_DISTANCE, type: FLOAT64 } config: file_path: "/workspace/Output/output_berlinmod_q6_snapshot.csv" input_format: CSV diff --git a/Queries/berlinmod/q6_windowed.yaml b/Queries/berlinmod/q6_windowed.yaml index 3c856e160e..749c3ba1bb 100644 --- a/Queries/berlinmod/q6_windowed.yaml +++ b/Queries/berlinmod/q6_windowed.yaml @@ -1,14 +1,14 @@ -# BerlinMOD-Q6 — windowed form (PARTIAL) -# "Cumulative distance travelled per vehicle." Emits the per-window per-vehicle -# TEMPORAL_SEQUENCE trajectory; consumers compute length(trajectory) externally. -# A native temporal_length() aggregation on the NebulaStream side would close -# this as a FULL cell; see docs/berlinmod-streaming-forms.md. +# BerlinMOD-Q6 — windowed form (FULL) +# "Cumulative distance travelled per vehicle." Per-10s tumbling window +# aggregates each vehicle's GPS samples and emits the spheroidal length in +# metres of the per-(window, vehicle) trajectory directly via the +# TEMPORAL_LENGTH aggregation. query: | SELECT start, end, vehicle_id, - TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory + TEMPORAL_LENGTH(gps_lon, gps_lat, time_utc) AS cumulative_distance FROM berlinmod_stream GROUP BY vehicle_id WINDOW TUMBLING(time_utc, SIZE 10 SEC) @@ -18,10 +18,10 @@ sinks: - name: FILE_SINK type: File schema: - - { name: BERLINMOD_STREAM$START, type: UINT64 } - - { name: BERLINMOD_STREAM$END, type: UINT64 } - - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } - - { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED } + - { name: BERLINMOD_STREAM$START, type: UINT64 } + - { name: BERLINMOD_STREAM$END, type: UINT64 } + - { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 } + - { name: BERLINMOD_STREAM$CUMULATIVE_DISTANCE, type: FLOAT64 } config: file_path: "/workspace/Output/output_berlinmod_q6_windowed.csv" input_format: CSV diff --git a/docs/berlinmod-streaming-forms.md b/docs/berlinmod-streaming-forms.md index f484b31a87..590cfb8f37 100644 --- a/docs/berlinmod-streaming-forms.md +++ b/docs/berlinmod-streaming-forms.md @@ -33,12 +33,12 @@ For each BerlinMOD reference query Q, three NebulaStream YAMLs realize the form | Q3 | "vehicles within 5 km of Brussels city centre?" | ✓ | ✓ | ✓ | full | | Q4 | "vehicles inside Brussels-centre rectangle R?" | ✓ | ✓ | ✓ | full | | Q5 | "pairs of vehicles meeting near P" | ◐ | ◐ | ◐ | **partial** — see below | -| Q6 | "cumulative distance per vehicle" | ◐ | ◐ | ◐ | **partial** — see below | +| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ | full (via TEMPORAL_LENGTH aggregation) | | Q7 | "first passage of each vehicle through each POI" | ✓ | ✓ | ✓ | full (per-POI fan-out) | | Q8 | "vehicles close to a road segment (LINESTRING)" | ✓ | ✓ | ✓ | full | | Q9 | "distance between vehicles X and Y at time T" | ◐ | ◐ | ◐ | **partial** — see below | -**27 of 27 cells** covered as scaffold YAMLs. 18 cells are **full** — the BerlinMOD-Q semantic is computed entirely inside NebulaStream — and 9 cells are **partial** — NebulaStream emits the per-window inputs and a consumer post-processes them for the final BerlinMOD-Q answer. +**27 of 27 cells** covered as scaffold YAMLs. **21 cells are full** — the BerlinMOD-Q semantic is computed entirely inside NebulaStream — and **6 cells remain partial** (Q5 × 3 forms + Q9 × 3 forms): NebulaStream emits the per-window inputs and a consumer post-processes them for the final BerlinMOD-Q answer. ### Q7 fan-out pattern (full) @@ -48,27 +48,29 @@ NebulaStream's current SQL has no Cartesian (vehicle × POI) aggregation primiti MEOS' `edwithin_tgeo_geo` accepts any geometry — POINT, POLYGON, and **LINESTRING**. Q8 (vehicles within d of a road segment) is therefore expressible as a single direct predicate against a `LINESTRING(s1, s2)` geometry, no new MobilityNebula PhysicalFunction required. The segment runs from (4.30, 50.83) to (4.36, 50.87) with d = 5 km in the scaffold. -### Q5 / Q6 / Q9 partial pattern (NebulaStream emits, consumer joins) +### Q6 full via TEMPORAL_LENGTH aggregation -These three queries need either a stream-self-join (Q5, Q9) or a custom `temporal_length` aggregation (Q6) — neither of which is currently a NebulaStream SQL primitive. The scaffold YAMLs express what NebulaStream can compute today: +The Q6 × 3 cells are full as of this scaffold: they use the new `TEMPORAL_LENGTH(lon, lat, ts)` aggregation, which lifts the same (lon, lat, ts) tuples as `TEMPORAL_SEQUENCE` and lowers them through a MEOS `tpoint_length(Temporal*)` call to a single `FLOAT64` result — the spheroidal length in metres of the per-(window, group) trajectory. Logical, physical, parser, and lowering wiring all live in this PR. + +### Q5 / Q9 partial pattern (NebulaStream emits, consumer joins) + +These two queries need a stream-self-join (Q5: pair × pair across vehicles, Q9: lookup-pair across vehicles), which is not currently a NebulaStream SQL primitive. The scaffold YAMLs express what NebulaStream can compute today: | Q | What NebulaStream emits | What the consumer computes | |---|---|---| | Q5 | per-(window, vehicle) `TEMPORAL_SEQUENCE` trajectory for each near-P vehicle | per-pair distance, pair-meeting predicate, output meeting pairs | -| Q6 | per-(window, vehicle) `TEMPORAL_SEQUENCE` trajectory | length of trajectory per vehicle | | Q9 | per-(window, vehicle) `TEMPORAL_SEQUENCE` trajectory filtered to `vehicle_id ∈ {100, 200}` | join the two trajectories, compute the X-Y distance series | Each partial cell IS a valid runnable NebulaStream query; the BerlinMOD-Q final answer is one consumer-side reduction step beyond the emitted output. -### Path to "full" for the three partial Qs +### Path to "full" for the two remaining partial Qs | Q | What would make it FULL | |---|---| -| Q5 | stream-self-join in NebulaStream SQL, OR a custom `pair_aggregate(tgeo, dMeet)` aggregation | -| Q6 | a custom `temporal_length(tgeo) → double` scalar function in MobilityNebula `Functions/Meos/` | -| Q9 | stream-self-join (same shape as Q5) | +| Q5 | stream-self-join in NebulaStream SQL, OR a custom `pair_aggregate(lon, lat, ts, vehicle_id, dMeet)` Cartesian aggregation on the MobilityNebula side | +| Q9 | a custom `cross_distance_aggregate(lon, lat, ts, vehicle_id, targetA, targetB)` aggregation on the MobilityNebula side — same Cartesian shape as Q5 | -Each is a single-PR change on MobilityNebula's `nes-physical-operators` C++ surface (or on the NebulaStream upstream for joins). The patterns and templates are documented above; the YAML schemas already exist in this scaffold so the FULL cells would be drop-in replacements once the operators land. +Each is a single-PR change on MobilityNebula's `nes-physical-operators` C++ surface. The patterns and templates are documented in `TemporalLengthAggregationPhysicalFunction` (this PR) and `TemporalSequenceAggregationPhysicalFunction`; the YAML schemas already exist in this scaffold so the FULL cells would be drop-in replacements once the operators land. ## MEOS operators consumed @@ -76,10 +78,11 @@ All BerlinMOD predicates use operators already exposed by [`MobilityNebula/PR #1 | Operator | YAMLs using it | |---|---| -| `edwithin_tgeo_geo(lon, lat, t, geom, d)` | Q3 × 3 forms (radius predicate, `POINT`), Q4 × 3 forms (region containment, `POLYGON` with `d=0.0`) | -| `TEMPORAL_SEQUENCE(lon, lat, t)` (aggregation) | Q2 × 3 forms (per-window trajectory) | +| `edwithin_tgeo_geo(lon, lat, t, geom, d)` | Q3 × 3 forms (radius predicate, `POINT`), Q4 × 3 forms (region containment, `POLYGON` with `d=0.0`), Q8 × 3 forms (segment predicate, `LINESTRING`) | +| `TEMPORAL_SEQUENCE(lon, lat, t)` (aggregation) | Q2 × 3 forms (per-window trajectory), Q5 × 3, Q9 × 3 (partial cells) | +| `TEMPORAL_LENGTH(lon, lat, t)` (aggregation, MEOS `tpoint_length` under the hood) | Q6 × 3 forms (cumulative distance) | -No new MEOS-side operators or PhysicalFunction classes are added by this PR. +`TEMPORAL_LENGTH` is added by this PR; the rest are pre-existing. ## Not covered (15 cells / 5 queries) diff --git a/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp new file mode 100644 index 0000000000..b2225f7240 --- /dev/null +++ b/nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp @@ -0,0 +1,64 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include + +namespace NES +{ + +/** + * @brief Logical-plan side of the TEMPORAL_LENGTH aggregation. + * + * Takes three input fields (longitude, latitude, timestamp) and produces a + * single FLOAT64 result: the spheroidal length in metres of the per-(window, + * group) trajectory built from the lifted tuples. + * + * Same shape as TemporalSequenceAggregationLogicalFunctionV2; only the final + * aggregate stamp type differs (FLOAT64 here vs VARSIZED there). Closes the + * MobilityNebula BerlinMOD-Q6 partial→full gap. + */ +class TemporalLengthAggregationLogicalFunction : public WindowAggregationLogicalFunction +{ +public: + static std::shared_ptr + create(const FieldAccessLogicalFunction& lonField, const FieldAccessLogicalFunction& latField, const FieldAccessLogicalFunction& timestampField); + + TemporalLengthAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& asField); + + void inferStamp(const Schema& schema) override; + ~TemporalLengthAggregationLogicalFunction() override = default; + [[nodiscard]] NES::SerializableAggregationFunction serialize() const override; + [[nodiscard]] std::string_view getName() const noexcept override; + [[nodiscard]] bool requiresSequentialAggregation() const { return true; } + + [[nodiscard]] const FieldAccessLogicalFunction& getLonField() const noexcept { return lonField; } + [[nodiscard]] const FieldAccessLogicalFunction& getLatField() const noexcept { return latField; } + [[nodiscard]] const FieldAccessLogicalFunction& getTimestampField() const noexcept { return timestampField; } + +private: + static constexpr std::string_view NAME = "TemporalLength"; + static constexpr DataType::Type partialAggregateStampType = DataType::Type::UNDEFINED; + static constexpr DataType::Type finalAggregateStampType = DataType::Type::FLOAT64; + + FieldAccessLogicalFunction lonField; + FieldAccessLogicalFunction latField; + FieldAccessLogicalFunction timestampField; +}; +} diff --git a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt index 9cdfcdb2dc..9c6393dd44 100644 --- a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt +++ b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt @@ -12,3 +12,4 @@ add_plugin(Var AggregationLogicalFunction nes-logical-operators VarAggregationLogicalFunction.cpp) add_plugin(TemporalSequence AggregationLogicalFunction nes-logical-operators TemporalSequenceAggregationLogicalFunctionV2.cpp) +add_plugin(TemporalLength AggregationLogicalFunction nes-logical-operators TemporalLengthAggregationLogicalFunction.cpp) diff --git a/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp new file mode 100644 index 0000000000..ff46bbc173 --- /dev/null +++ b/nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp @@ -0,0 +1,117 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace NES +{ + +TemporalLengthAggregationLogicalFunction::TemporalLengthAggregationLogicalFunction( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField, + const FieldAccessLogicalFunction& asField) + : WindowAggregationLogicalFunction( + lonField.getDataType(), + DataTypeProvider::provideDataType(partialAggregateStampType), + DataTypeProvider::provideDataType(finalAggregateStampType), + lonField, + asField) + , lonField(lonField) + , latField(latField) + , timestampField(timestampField) +{ +} + +std::shared_ptr +TemporalLengthAggregationLogicalFunction::create( + const FieldAccessLogicalFunction& lonField, + const FieldAccessLogicalFunction& latField, + const FieldAccessLogicalFunction& timestampField) +{ + return std::make_shared(lonField, latField, timestampField, lonField); +} + +std::string_view TemporalLengthAggregationLogicalFunction::getName() const noexcept +{ + return NAME; +} + +void TemporalLengthAggregationLogicalFunction::inferStamp(const Schema& schema) +{ + lonField = lonField.withInferredDataType(schema).get(); + latField = latField.withInferredDataType(schema).get(); + timestampField = timestampField.withInferredDataType(schema).get(); + + onField = lonField; + + if (!lonField.getDataType().isNumeric() || !latField.getDataType().isNumeric() || !timestampField.getDataType().isNumeric()) + { + throw CannotInferSchema("TemporalLengthAggregationLogicalFunction: lon, lat, and timestamp fields must be numeric."); + } + + const auto onFieldName = onField.getFieldName(); + const auto asFieldName = asField.getFieldName(); + const auto attributeNameResolver = onFieldName.substr(0, onFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + if (asFieldName.find(Schema::ATTRIBUTE_NAME_SEPARATOR) == std::string::npos) + { + asField = asField.withFieldName(attributeNameResolver + asFieldName).get(); + } + else + { + const auto fieldName = asFieldName.substr(asFieldName.find_last_of(Schema::ATTRIBUTE_NAME_SEPARATOR) + 1); + asField = asField.withFieldName(attributeNameResolver + fieldName).get(); + } + asField = asField.withDataType(getFinalAggregateStamp()).get(); + inputStamp = onField.getDataType(); +} + +NES::SerializableAggregationFunction TemporalLengthAggregationLogicalFunction::serialize() const +{ + // Same wire shape as TemporalSequence (3 fields + alias); only the type tag differs. + auto saf = TemporalAggregationSerde::serializeTemporalSequence(lonField, latField, timestampField, asField); + saf.set_type(std::string(NAME)); + return saf; +} + +AggregationLogicalFunctionRegistryReturnType AggregationLogicalFunctionGeneratedRegistrar::RegisterTemporalLengthAggregationLogicalFunction( + AggregationLogicalFunctionRegistryArguments arguments) +{ + if (arguments.fields.size() == 4) + { + auto ptr = std::make_shared( + arguments.fields[0], arguments.fields[1], arguments.fields[2], arguments.fields[3]); + return ptr; + } + throw CannotDeserialize( + "TemporalLengthAggregationLogicalFunction requires lon, lat, timestamp, and alias fields but got {}", + arguments.fields.size()); +} + +} // namespace NES diff --git a/nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp b/nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp new file mode 100644 index 0000000000..cf73b9e743 --- /dev/null +++ b/nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp @@ -0,0 +1,73 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace NES +{ + +/** + * @brief Aggregation function that returns the spheroidal length in metres of + * the per-(window, group) trajectory built from the (lon, lat, timestamp) + * tuples lifted into the aggregation state. + * + * Same lift / combine / reset shape as TemporalSequenceAggregationPhysicalFunction; + * the lower step parses the assembled trajectory into a MEOS Temporal object and + * calls MEOS' tpoint_length(Temporal*) to return a single FLOAT64 result. + * + * Used by BerlinMOD-Q6 ("cumulative distance per vehicle") streaming-form + * scaffold: closes the partial→full gap that the prior scaffold documented as + * "PR-B" in docs/berlinmod-streaming-forms.md. + */ +class TemporalLengthAggregationPhysicalFunction : public AggregationPhysicalFunction +{ +public: + TemporalLengthAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef); + void lift( + const nautilus::val& aggregationState, + PipelineMemoryProvider& pipelineMemoryProvider, + const Nautilus::Record& record) + override; + void combine( + nautilus::val aggregationState1, + nautilus::val aggregationState2, + PipelineMemoryProvider& pipelineMemoryProvider) override; + Nautilus::Record lower(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + void reset(nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) override; + [[nodiscard]] size_t getSizeOfStateInBytes() const override; + ~TemporalLengthAggregationPhysicalFunction() override = default; + void cleanup(nautilus::val aggregationState) override; + +private: + std::shared_ptr bufferRef; + PhysicalFunction lonFunction; + PhysicalFunction latFunction; + PhysicalFunction timestampFunction; +}; + +} diff --git a/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt b/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt index c34e12f47e..8a85ad476b 100644 --- a/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt +++ b/nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt @@ -11,4 +11,5 @@ # limitations under the License. add_plugin(TemporalSequence AggregationPhysicalFunction nes-physical-operators TemporalSequenceAggregationPhysicalFunction.cpp) +add_plugin(TemporalLength AggregationPhysicalFunction nes-physical-operators TemporalLengthAggregationPhysicalFunction.cpp) add_plugin(Var AggregationPhysicalFunction nes-physical-operators VarAggregationFunction.cpp) diff --git a/nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp b/nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp new file mode 100644 index 0000000000..789624e3dd --- /dev/null +++ b/nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp @@ -0,0 +1,271 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +// MEOS wrapper header + geo extension symbols for tpoint_length +#include +extern "C" { +#include +#include +} + +namespace NES +{ + +constexpr static std::string_view LonFieldName = "lon"; +constexpr static std::string_view LatFieldName = "lat"; +constexpr static std::string_view TimestampFieldName = "timestamp"; + +// Mutex for thread-safe MEOS operations +static std::mutex meos_length_mutex; + + +TemporalLengthAggregationPhysicalFunction::TemporalLengthAggregationPhysicalFunction( + DataType inputType, + DataType resultType, + PhysicalFunction lonFunctionParam, + PhysicalFunction latFunctionParam, + PhysicalFunction timestampFunctionParam, + Nautilus::Record::RecordFieldIdentifier resultFieldIdentifier, + std::shared_ptr bufferRef) + : AggregationPhysicalFunction(std::move(inputType), std::move(resultType), lonFunctionParam, std::move(resultFieldIdentifier)) + , bufferRef(std::move(bufferRef)) + , lonFunction(std::move(lonFunctionParam)) + , latFunction(std::move(latFunctionParam)) + , timestampFunction(std::move(timestampFunctionParam)) +{ +} + +void TemporalLengthAggregationPhysicalFunction::lift( + const nautilus::val& aggregationState, PipelineMemoryProvider& pipelineMemoryProvider, const Nautilus::Record& record) +{ + const auto pagedVectorPtr = static_cast>(aggregationState); + + auto lonValue = lonFunction.execute(record, pipelineMemoryProvider.arena); + auto latValue = latFunction.execute(record, pipelineMemoryProvider.arena); + auto timestampValue = timestampFunction.execute(record, pipelineMemoryProvider.arena); + + Record aggregateStateRecord({ + {std::string(LonFieldName), lonValue}, + {std::string(LatFieldName), latValue}, + {std::string(TimestampFieldName), timestampValue} + }); + + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + pagedVectorRef.writeRecord(aggregateStateRecord, pipelineMemoryProvider.bufferProvider); +} + +void TemporalLengthAggregationPhysicalFunction::combine( + const nautilus::val aggregationState1, + const nautilus::val aggregationState2, + PipelineMemoryProvider&) +{ + const auto memArea1 = static_cast>(aggregationState1); + const auto memArea2 = static_cast>(aggregationState2); + + nautilus::invoke( + +[](Nautilus::Interface::PagedVector* vector1, const Nautilus::Interface::PagedVector* vector2) -> void + { vector1->copyFrom(*vector2); }, + memArea1, + memArea2); +} + +Nautilus::Record TemporalLengthAggregationPhysicalFunction::lower( + const nautilus::val aggregationState, PipelineMemoryProvider& pipelineMemoryProvider) +{ + MEOS::Meos::ensureMeosInitialized(); + + const auto pagedVectorPtr = static_cast>(aggregationState); + const Nautilus::Interface::PagedVectorRef pagedVectorRef(pagedVectorPtr, bufferRef); + const auto allFieldNames = bufferRef->getMemoryLayout()->getSchema().getFieldNames(); + const auto numberOfEntries = invoke( + +[](const Nautilus::Interface::PagedVector* pagedVector) + { + return pagedVector->getTotalNumberOfEntries(); + }, + pagedVectorPtr); + + // Handle empty PagedVector case — zero-length trajectory + if (numberOfEntries == nautilus::val(0)) { + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, nautilus::val(0.0)); + return resultRecord; + } + + // Build the trajectory string in the same MEOS instant-set format that + // TemporalSequenceAggregationPhysicalFunction uses: {Point(lon lat)@ts, ...} + auto trajectoryStr = nautilus::invoke( + +[](const Nautilus::Interface::PagedVector* pagedVector) -> char* + { + size_t bufferSize = pagedVector->getTotalNumberOfEntries() * 150 + 50; + char* buffer = (char*)malloc(bufferSize); + memset(buffer, 0, bufferSize); + strcpy(buffer, "{"); + return buffer; + }, + pagedVectorPtr); + + auto pointCounter = nautilus::val(0); + + const auto endIt = pagedVectorRef.end(allFieldNames); + for (auto candidateIt = pagedVectorRef.begin(allFieldNames); candidateIt != endIt; ++candidateIt) + { + const auto itemRecord = *candidateIt; + + const auto lonValue = itemRecord.read(std::string(LonFieldName)); + const auto latValue = itemRecord.read(std::string(LatFieldName)); + const auto timestampValue = itemRecord.read(std::string(TimestampFieldName)); + + auto lon = lonValue.cast>(); + auto lat = latValue.cast>(); + auto timestamp = timestampValue.cast>(); + + trajectoryStr = nautilus::invoke( + +[](char* buffer, double lonVal, double latVal, int64_t tsVal, int64_t counter) -> char* + { + if (counter > 0) { + strcat(buffer, ", "); + } + + long long adjustedTime; + if (tsVal > 1000000000000LL) { + adjustedTime = tsVal / 1000; + } else { + adjustedTime = tsVal; + } + + std::string timestampString = MEOS::Meos::convertSecondsToTimestamp(adjustedTime); + const char* timestampStr = timestampString.c_str(); + + char pointStr[120]; + sprintf(pointStr, "Point(%.6f %.6f)@%s", lonVal, latVal, timestampStr); + strcat(buffer, pointStr); + return buffer; + }, + trajectoryStr, + lon, + lat, + timestamp, + pointCounter); + + pointCounter = pointCounter + nautilus::val(1); + } + + trajectoryStr = nautilus::invoke( + +[](char* buffer) -> char* + { + strcat(buffer, "}"); + return buffer; + }, + trajectoryStr); + + // Parse the assembled trajectory into a MEOS Temporal object, call + // tpoint_length on it, and free both the C string and the Temporal. + auto totalLength = nautilus::invoke( + +[](const char* trajStr) -> double + { + if (!trajStr || strlen(trajStr) == 0) { + free((void*)trajStr); + return 0.0; + } + + std::lock_guard lock(meos_length_mutex); + + std::string trajString(trajStr); + void* temp = MEOS::Meos::parseTemporalPoint(trajString); + if (!temp) { + free((void*)trajStr); + return 0.0; + } + + // tpoint_length is the MEOS C symbol from meos_geo.h. It returns the + // spheroidal length in the SRID's distance unit (metres for the + // BerlinMOD WGS84 trajectories that the scaffold streams). + double length = tpoint_length(static_cast(temp)); + + MEOS::Meos::freeTemporalObject(temp); + free((void*)trajStr); + return length; + }, + trajectoryStr); + + Nautilus::Record resultRecord; + resultRecord.write(resultFieldIdentifier, totalLength); + return resultRecord; +} + +void TemporalLengthAggregationPhysicalFunction::reset(const nautilus::val aggregationState, PipelineMemoryProvider&) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast(pagedVectorMemArea); + new (pagedVector) Nautilus::Interface::PagedVector(); + }, + aggregationState); +} + +size_t TemporalLengthAggregationPhysicalFunction::getSizeOfStateInBytes() const +{ + return sizeof(Nautilus::Interface::PagedVector); +} + +void TemporalLengthAggregationPhysicalFunction::cleanup(nautilus::val aggregationState) +{ + nautilus::invoke( + +[](AggregationState* pagedVectorMemArea) -> void + { + auto* pagedVector = reinterpret_cast( + pagedVectorMemArea); + pagedVector->~PagedVector(); + }, + aggregationState); +} + + +AggregationPhysicalFunctionRegistryReturnType AggregationPhysicalFunctionGeneratedRegistrar::RegisterTemporalLengthAggregationPhysicalFunction( + AggregationPhysicalFunctionRegistryArguments) +{ + throw std::runtime_error("TEMPORAL_LENGTH aggregation cannot be created through the registry. " + "It requires three field functions (longitude, latitude, timestamp)"); +} + +} diff --git a/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp b/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp index c994b91a9d..58e44768cc 100644 --- a/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp +++ b/nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp @@ -55,7 +55,9 @@ #include // Special-case lowering for TEMPORAL_SEQUENCE (multi-input) aggregation #include +#include #include +#include namespace NES { @@ -160,6 +162,35 @@ getAggregationPhysicalFunctions(const WindowedAggregationLogicalOperator& logica continue; } + // Custom lowering path for TEMPORAL_LENGTH: same three-input shape as TEMPORAL_SEQUENCE, + // returns a FLOAT64 (the spheroidal length of the per-(window, group) trajectory) instead of a VARSIZED WKB blob. + if (name == std::string_view("TemporalLength")) + { + auto tlDescriptor = std::dynamic_pointer_cast(descriptor); + INVARIANT(tlDescriptor != nullptr, "Expected TemporalLengthAggregationLogicalFunction for TemporalLength"); + + auto lonPF = QueryCompilation::FunctionProvider::lowerFunction(tlDescriptor->getLonField()); + auto latPF = QueryCompilation::FunctionProvider::lowerFunction(tlDescriptor->getLatField()); + auto tsPF = QueryCompilation::FunctionProvider::lowerFunction(tlDescriptor->getTimestampField()); + + Schema stateSchema; + stateSchema.addField("lon", tlDescriptor->getLonField().getDataType()); + stateSchema.addField("lat", tlDescriptor->getLatField().getDataType()); + stateSchema.addField("timestamp", tlDescriptor->getTimestampField().getDataType()); + auto tupleBufferRef = Interface::BufferRef::TupleBufferRef::create(configuration.pageSize.getValue(), stateSchema); + + auto phys = std::make_shared( + std::move(physicalInputType), + std::move(physicalFinalType), + lonPF, + latPF, + tsPF, + resultFieldIdentifier, + tupleBufferRef); + aggregationPhysicalFunctions.push_back(std::move(phys)); + continue; + } + // Default path: use registry for single-input aggregations auto aggregationInputFunction = QueryCompilation::FunctionProvider::lowerFunction(descriptor->onField); auto aggregationArguments = AggregationPhysicalFunctionRegistryArguments( diff --git a/nes-sql-parser/AntlrSQL.g4 b/nes-sql-parser/AntlrSQL.g4 index 256726e087..978bdb5377 100644 --- a/nes-sql-parser/AntlrSQL.g4 +++ b/nes-sql-parser/AntlrSQL.g4 @@ -295,7 +295,7 @@ timeUnit: MS timestampParameter: name=identifier; -functionName: IDENTIFIER | AVG | MAX | MIN | SUM | COUNT | MEDIAN | ARRAY_AGG | VAR | TEMPORAL_SEQUENCE | TEMPORAL_EINTERSECTS_GEOMETRY | TEMPORAL_AINTERSECTS_GEOMETRY | TEMPORAL_ECONTAINS_GEOMETRY | EDWITHIN_TGEO_GEO | TGEO_AT_STBOX; +functionName: IDENTIFIER | AVG | MAX | MIN | SUM | COUNT | MEDIAN | ARRAY_AGG | VAR | TEMPORAL_SEQUENCE | TEMPORAL_LENGTH | TEMPORAL_EINTERSECTS_GEOMETRY | TEMPORAL_AINTERSECTS_GEOMETRY | TEMPORAL_ECONTAINS_GEOMETRY | EDWITHIN_TGEO_GEO | TGEO_AT_STBOX; sinkClause: INTO sink (',' sink)*; @@ -483,6 +483,7 @@ MEDIAN: 'MEDIAN' | 'median'; VAR: 'VAR' | 'var'; ARRAY_AGG: 'ARRAY_AGG' | 'array_agg'; TEMPORAL_SEQUENCE: 'TEMPORAL_SEQUENCE' | 'temporal_sequence'; +TEMPORAL_LENGTH: 'TEMPORAL_LENGTH' | 'temporal_length'; TEMPORAL_EINTERSECTS_GEOMETRY: 'TEMPORAL_EINTERSECTS_GEOMETRY' | 'temporal_eintersects_geometry'; TEMPORAL_AINTERSECTS_GEOMETRY: 'TEMPORAL_AINTERSECTS_GEOMETRY' | 'temporal_aintersects_geometry'; TEMPORAL_ECONTAINS_GEOMETRY: 'TEMPORAL_ECONTAINS_GEOMETRY' | 'temporal_econtains_geometry'; diff --git a/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp b/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp index 4e9f1d7642..098c098c96 100644 --- a/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp +++ b/nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp @@ -65,6 +65,7 @@ #include #include #include +#include #include #include #include @@ -915,14 +916,14 @@ void AntlrSQLQueryPlanCreator::exitFunctionCall(AntlrSQLParser::FunctionCallCont helpers.top().functionBuilder.pop_back(); const auto longitudeFunction = helpers.top().functionBuilder.back(); helpers.top().functionBuilder.pop_back(); - + // Verify all arguments are field access functions if (!longitudeFunction.tryGet() || !latitudeFunction.tryGet() || !timestampFunction.tryGet()) { throw InvalidQuerySyntax("TEMPORAL_SEQUENCE arguments must be field references"); } - + helpers.top().windowAggs.push_back( TemporalSequenceAggregationLogicalFunctionV2::create(longitudeFunction.get(), latitudeFunction.get(), @@ -932,6 +933,34 @@ void AntlrSQLQueryPlanCreator::exitFunctionCall(AntlrSQLParser::FunctionCallCont helpers.top().functionBuilder.push_back(longitudeFunction); } break; + case AntlrSQLLexer::TEMPORAL_LENGTH: + // Same three-input shape as TEMPORAL_SEQUENCE; differs only in the + // result type (FLOAT64 instead of VARSIZED). Closes BerlinMOD-Q6 to a + // full streaming-form cell. + if (helpers.top().functionBuilder.size() != 3) { + throw InvalidQuerySyntax("TEMPORAL_LENGTH requires exactly three arguments (longitude, latitude, timestamp), but got {}", helpers.top().functionBuilder.size()); + } + { + const auto timestampFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto latitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + const auto longitudeFunction = helpers.top().functionBuilder.back(); + helpers.top().functionBuilder.pop_back(); + + if (!longitudeFunction.tryGet() || + !latitudeFunction.tryGet() || + !timestampFunction.tryGet()) { + throw InvalidQuerySyntax("TEMPORAL_LENGTH arguments must be field references"); + } + + helpers.top().windowAggs.push_back( + TemporalLengthAggregationLogicalFunction::create(longitudeFunction.get(), + latitudeFunction.get(), + timestampFunction.get())); + helpers.top().functionBuilder.push_back(longitudeFunction); + } + break; case AntlrSQLLexer::TEMPORAL_EINTERSECTS_GEOMETRY: { // Convert constants from constantBuilder to ConstantValueLogicalFunction objects @@ -1225,6 +1254,20 @@ void AntlrSQLQueryPlanCreator::exitFunctionCall(AntlrSQLParser::FunctionCallCont helpers.top().functionBuilder.pop_back(); helpers.top().windowAggs.push_back(TemporalSequenceAggregationLogicalFunctionV2::create(lon, lat, ts)); } + else if (funcName == "TEMPORAL_LENGTH") + { + if (helpers.top().functionBuilder.size() < 3) + { + throw InvalidQuerySyntax("TEMPORAL_LENGTH requires three arguments at {}", context->getText()); + } + const auto ts = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lat = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + const auto lon = helpers.top().functionBuilder.back().get(); + helpers.top().functionBuilder.pop_back(); + helpers.top().windowAggs.push_back(TemporalLengthAggregationLogicalFunction::create(lon, lat, ts)); + } else if (auto logicalFunction = LogicalFunctionProvider::tryProvide(funcName, helpers.top().functionBuilder)) { /// Remove exactly the functions used to create the 'logicalFunction' from the back of the function builder