Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 71 additions & 29 deletions tutorial-stream/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
"id": "f17b3a7e",
"metadata": {},
"source": [
"# MobilityAPI Tutorial (streaming)\n",
"# MobilityAPI \u2014 Tutorial (streaming)\n",
"\n",
"This notebook walks through the [OGC API Moving Features Part 4](https://www.opengis.net/spec/ogcapi-movingfeatures-4/1.0)\n",
"This notebook walks through the [OGC API \u2013 Moving Features \u2013 Part 4](https://www.opengis.net/spec/ogcapi-movingfeatures-4/1.0)\n",
"(Stream Extension) endpoints exposed by **MobilityAPI**, the thin compiled (Go)\n",
"tier over **MobilityDB/MEOS**, using the same one day of AIS (Automatic\n",
"Identification System) data from the [Danish Maritime Authority](http://aisdata.ais.dk/).\n",
"\n",
"Each moving-feature quantity is delivered as Server-Sent Events from a continuous\n",
"query; its request–response counterpart,\n",
"query; its request\u2013response counterpart,\n",
"[`tutorial/tutorial.ipynb`](../tutorial/tutorial.ipynb), retrieves the same\n",
"quantities with a `GET`."
]
Expand All @@ -23,20 +23,20 @@
"id": "34fb92c8",
"metadata": {},
"source": [
"## Relationship to the request–response tutorial\n",
"## Relationship to the request\u2013response tutorial\n",
"\n",
"Both tutorials operate on the same `ships` collection and expose the same\n",
"moving-feature quantities a feature, its speed-over-ground property, derived\n",
"moving-feature quantities \u2014 a feature, its speed-over-ground property, derived\n",
"measures, and a windowed aggregate. They differ only in interaction model: the\n",
"[request–response tutorial](../tutorial/tutorial.ipynb) retrieves each value with a\n",
"[request\u2013response tutorial](../tutorial/tutorial.ipynb) retrieves each value with a\n",
"`GET`, whereas this notebook registers a continuous query and receives the values\n",
"over Server-Sent Events. The corresponding operations are:\n",
"\n",
"| Quantity | Request–response | Streaming |\n",
"| Quantity | Request\u2013response | Streaming |\n",
"|---|---|---|\n",
"| A moving feature | `GET /items/{id}` | the same vessel |\n",
"| Speed over ground | `GET /tproperties/speed` | `POST /queries` SSE |\n",
"| Derived speed | `GET /velocity` | a `transform` continuous query |\n",
"| A moving feature | `GET \u2026/items/{id}` | the same vessel |\n",
"| Speed over ground | `GET \u2026/tproperties/speed` | `POST \u2026/queries` \u2192 SSE |\n",
"| Derived speed | `GET \u2026/velocity` | a `transform` continuous query |\n",
"| Windowed aggregate | aggregate the returned series | a windowed continuous query |\n",
"| Visualization | a static plot | the live animated map |"
]
Expand Down Expand Up @@ -90,7 +90,7 @@
"source": [
"## A moving feature and its speed\n",
"\n",
"The streaming tier serves the same `ships` collection as the request–response\n",
"The streaming tier serves the same `ships` collection as the request\u2013response\n",
"tutorial: one day of Danish AIS, each vessel a `tgeompoint` trajectory with a\n",
"stored speed-over-ground `tfloat` (the AIS `SOG`, in knots). This section reads one\n",
"vessel's `speed` property; the sections below stream it."
Expand All @@ -107,7 +107,7 @@
"FID = int(os.environ.get('MFAPI_FID', '1')) # most-travelled vessel by default\n",
"\n",
"show(S.get(f'{HOST}/collections/{CID}'))\n",
"# the vessel's stored speed over ground (AIS SOG) a tfloat in knots\n",
"# the vessel's stored speed over ground (AIS SOG) \u2014 a tfloat in knots\n",
"show(S.get(f'{HOST}/collections/{CID}/items/{FID}/tproperties/{PROP}'))"
]
},
Expand All @@ -118,14 +118,14 @@
"source": [
"## Continuous transform of a property\n",
"\n",
"`POST /tproperties/{name}/queries` registers a continuous query. The stored speed\n",
"is in knots; this query converts it to km/h (`× 1.852`) a lifted scalar\n",
"`POST \u2026/tproperties/{name}/queries` registers a continuous query. The stored speed\n",
"is in knots; this query converts it to km/h (`\u00d7 1.852`) \u2014 a lifted scalar\n",
"multiplication applied to each streamed instant. The response is the OGC `cquery`\n",
"link object: a `queryId`, a `status`, and the `href` of the Server-Sent Events\n",
"stream.\n",
"\n",
"Supported operations are the unary `ln, exp, log10, ceil, floor, abs, degrees,\n",
"radians` and the scalar-argument `add, sub, mul, div`."
"radians, sin, cos, tan` and the scalar-argument `add, sub, mul, div`."
]
},
{
Expand All @@ -150,7 +150,7 @@
"### Consuming the stream\n",
"\n",
"The `href` is a Server-Sent Events endpoint. Each `instant` event carries the\n",
"transformed value at its timestamp speed in km/h as the vessel's track is\n",
"transformed value at its timestamp \u2014 speed in km/h \u2014 as the vessel's track is\n",
"replayed. The next cell reads a few events and stops."
]
},
Expand All @@ -174,14 +174,53 @@
"read_sse(STREAM, n=10)"
]
},
{
"cell_type": "markdown",
"id": "a1b2c3d4",
"metadata": {},
"source": [
"### Trigonometric transforms\n",
"\n",
"MEOS also lifts `sin`, `cos`, and `tan` over every streamed instant.\n",
"These are most useful on angular properties \u2014 compass headings, phase\n",
"angles, or any cyclic sensor datum. For illustration the next cell applies\n",
"`sin` to the speed-over-ground property; on an angular property the output\n",
"would be a unit-circle component of the bearing."
]
},
{
"cell_type": "code",
"id": "e5f6a7b8",
"metadata": {},
"source": [
"q_sin = S.post(f'{HOST}/collections/{CID}/items/{FID}/tproperties/{PROP}/queries',\n",
" json={'operation': 'sin', 'intervalMs': 300})\n",
"show(q_sin)\n",
"SIN_ID = q_sin.json()['queryId']\n",
"\n",
"def read_trig(url, n=5, timeout=30):\n",
" with S.get(url, stream=True, timeout=timeout) as r:\n",
" got = 0\n",
" for line in r.iter_lines(decode_unicode=True):\n",
" if line and line.startswith('data:'):\n",
" ev = _json.loads(line[5:].strip())\n",
" print(f\"{ev['datetime']} {ev['operation']}({ev['property']}) = {ev['value']:.6f}\")\n",
" got += 1\n",
" if got >= n: break\n",
"read_trig(q_sin.json()['href'], n=5)\n",
"S.delete(f'{HOST}/collections/{CID}/items/{FID}/tproperties/{PROP}/queries/{SIN_ID}')"
],
"outputs": [],
"execution_count": null
},
{
"cell_type": "markdown",
"id": "f2fb0517",
"metadata": {},
"source": [
"## Windowed aggregation\n",
"\n",
"A continuous query can also aggregate over a window (OGC MF Part 4): `AVG`, `SUM`,\n",
"A continuous query can also aggregate over a window (OGC MF \u2013 Part 4): `AVG`, `SUM`,\n",
"`MIN`, `MAX` or `COUNT` over a `COUNT`, `TUMBLING` or `HOPPING` window. Each result\n",
"carries the aggregate value together with its window bounds."
]
Expand Down Expand Up @@ -215,20 +254,20 @@
"id": "f0b86d5b",
"metadata": {},
"source": [
"## Visualization the animated fleet\n",
"## Visualization \u2014 the animated fleet\n",
"\n",
"The sections above retrieve streamed values through the API. The map below animates\n",
"the whole fleet of the same `ships` data on a single temporal clock. Its\n",
"distinctive property is that MEOS executes in the browser (WebAssembly): the tier\n",
"transmits each vessel's trajectory once MEOS-sampled to a visualization interval\n",
"and decoupled into a `path` and parallel `timestamps` and MEOS.js reconstructs a\n",
"transmits each vessel's trajectory once \u2014 MEOS-sampled to a visualization interval\n",
"and decoupled into a `path` and parallel `timestamps` \u2014 and MEOS.js reconstructs a\n",
"`TGeomPoint` per vessel and computes every vessel's position with `valueAtTimestamp`\n",
"on each animation frame. DeckGL renders the resulting positions over a MapLibre\n",
"basemap, with a play/pause, scrubber and pace controller.\n",
"\n",
"![Animated fleet MEOS.js computing every ship's position live, in the browser](map/fleet.gif)\n",
"![Animated fleet \u2014 MEOS.js computing every ship's position live, in the browser](map/fleet.gif)\n",
"\n",
" **[See the live, interactive animation](http://localhost:5174/fleet.html)** run\n",
"\u25b6 **[See the live, interactive animation](http://localhost:5174/fleet.html)** \u2014 run\n",
"`npm run build && npm run preview` in `tutorial-stream/map`, then open the URL to\n",
"zoom, pan, scrub the timeline and change the pace over the full fleet."
]
Expand All @@ -240,7 +279,7 @@
"metadata": {},
"outputs": [],
"source": [
"# the animated fleet map run `npm run build && npm run preview` in tutorial-stream/map first.\n",
"# the animated fleet map \u2014 run `npm run build && npm run preview` in tutorial-stream/map first.\n",
"# When this notebook is executed live, the cell renders the interactive map inline;\n",
"# GitHub's static view shows the GIF above instead.\n",
"from IPython.display import IFrame\n",
Expand All @@ -258,11 +297,14 @@
"- A stored `tfloat` (the vessel's AIS speed over ground) is transformed by a lifted\n",
" MEOS function in process, per record; the transform is exact because each streamed\n",
" record is an instant.\n",
"- Trigonometric transforms (`sin`, `cos`, `tan`) lift pointwise over every streamed\n",
" instant, enabling angular-property processing (headings, phase data) with no\n",
" client-side code.\n",
"- A windowed aggregation reduces the stream over `COUNT`, `TUMBLING` or `HOPPING`\n",
" windows.\n",
"- The request–response tutorial reads the same property and derived measures with\n",
"- The request\u2013response tutorial reads the same property and derived measures with\n",
" `GET`; this notebook receives them over Server-Sent Events. The data and the MEOS\n",
" computation are identical only the delivery differs."
" computation are identical \u2014 only the delivery differs."
]
},
{
Expand All @@ -272,8 +314,8 @@
"source": [
"## Query lifecycle\n",
"\n",
"The streaming API exposes a query lifecycle: `GET /queries/{queryId}` reports the\n",
"status and `DELETE` stops the query (`registered running stopped`), identically\n",
"The streaming API exposes a query lifecycle: `GET \u2026/queries/{queryId}` reports the\n",
"status and `DELETE` stops the query (`registered \u2192 running \u2192 stopped`), identically\n",
"across engines."
]
},
Expand Down Expand Up @@ -313,8 +355,8 @@
"```\n",
"\n",
"The `cquery` link object, the SSE stream, and the lifecycle are identical across\n",
"engines a Kafka Streams or Spark Structured Streaming engine plugs into the same\n",
"seam through `MFAPI_FLINK_CMD`. This mirrors the request–response tutorial, where\n",
"engines \u2014 a Kafka Streams or Spark Structured Streaming engine plugs into the same\n",
"seam through `MFAPI_FLINK_CMD`. This mirrors the request\u2013response tutorial, where\n",
"`MFAPI_DSN`'s scheme (`postgres://`, `duckdb:`, `spark:`) switches the database\n",
"backend with no change to the notebook."
]
Expand Down