Skip to content

Commit 7ebf4d3

Browse files
Add bulk ingest of a fleet feed (POST /collections/{cid}/bulk)
POST /collections/{collectionId}/bulk accepts a batch of (vehicleId, position, time) observations and appends each as one instant to the matching moving feature's tgeompoint trajectory in temporal_geometries, creating the feature on first sight. The batch may be GeoJSON (a FeatureCollection of Point features) or GeoParquet (one row per observation), and may be gzip/deflate/br/zstd-compressed via Content-Encoding; the whole batch commits atomically and an out-of-order instant returns 409. Geometry and temporal work run in MobilityDB (ST_MakePoint / ST_GeomFromWKB, tgeompoint, appendInstant); the parsing and decompression are covered by unit tests.
1 parent 726470b commit 7ebf4d3

6 files changed

Lines changed: 301 additions & 0 deletions

File tree

requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,6 @@ widgetsnbextension==4.0.15
169169
xyzservices==2025.10.0
170170
yarl==1.22.0
171171
zipp==3.23.0
172+
Brotli==1.1.0
173+
pyarrow==24.0.0
174+
zstandard==0.23.0

resource/bulk/Create.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Bulk ingestion of a real-time fleet feed (extension, not in conformsTo):
2+
# POST /collections/{collectionId}/bulk
3+
#
4+
# The body is a batch of (vehicleId, position, time) observations encoded as
5+
# GeoJSON (a FeatureCollection of Point features) or GeoParquet (one row per
6+
# observation), optionally compressed via Content-Encoding (gzip, deflate, br,
7+
# zstd). Each observation is appended as one instant to the matching moving
8+
# feature's trajectory, creating the feature on first sight. The whole batch
9+
# commits atomically.
10+
import json
11+
12+
from resource.bulk.bulk_helper import (
13+
decompress, parse_geojson_points, parse_geoparquet, ensure_tables, append_observations)
14+
15+
16+
def post_bulk(self, collection_id, connection, cursor):
    """Handle POST /collections/{collectionId}/bulk.

    Reads the request body, undoes its Content-Encoding, parses it as
    GeoParquet (when the Content-Type mentions "parquet") or as GeoJSON
    (otherwise), appends every observation through ``cursor`` and commits
    once, so the batch is stored entirely or not at all.

    Args:
        self: the request handler; its ``headers``, ``rfile``, ``wfile``,
            ``handle_error`` and response methods are used.
        collection_id: id of the target collection, taken from the path.
        connection: database connection used to commit or roll back.
        cursor: cursor on ``connection``.

    Responses:
        201: JSON summary of the ingest.
        400: malformed request (bad Content-Length, body that cannot be
            decompressed or parsed).
        404: the collection does not exist.
        409: the database rejected an instant as out of order.
        415: unsupported Content-Encoding, or its optional library is absent.
        500: any other failure.
    """
    try:
        cursor.execute("SELECT id FROM collections WHERE id = %s", (collection_id,))
        if cursor.fetchone() is None:
            self.handle_error(404, f"Collection '{collection_id}' not found")
            return

        length = int(self.headers.get("Content-Length", 0))
        if length < 0:
            # rfile.read(-1) would block until the client closes the socket.
            raise ValueError("Content-Length must not be negative")
        raw = self.rfile.read(length)
        try:
            body = decompress(raw, self.headers.get("Content-Encoding"))
        except ValueError as e:
            self.handle_error(415, str(e))
            return
        except ImportError as e:
            self.handle_error(415, f"Content-Encoding needs an optional library: {e}")
            return
        except Exception as e:
            # Each codec reports corrupt input with its own exception type
            # (OSError, EOFError, zlib.error, ...). That is a client error,
            # not a server fault, so it must not fall through to the 500 path.
            self.handle_error(400, f"request body could not be decompressed: {e}")
            return

        ctype = (self.headers.get("Content-Type") or "").lower()
        if "parquet" in ctype:
            observations, srid = parse_geoparquet(body)
            fmt = "geoparquet"
        else:
            observations, srid = parse_geojson_points(body)
            fmt = "geojson"

        ensure_tables(cursor)
        summary = append_observations(cursor, collection_id, observations, srid)
        connection.commit()

        summary.update({"collection": collection_id, "format": fmt, "srid": srid})
        self.send_response(201)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(summary).encode("utf-8"))

    except ValueError as e:
        connection.rollback()
        self.handle_error(400, str(e))
    except Exception as e:
        connection.rollback()
        msg = str(e)
        # The ordering violation is recognised from the database error text;
        # compare case-insensitively so every marker is treated alike.
        lowered = msg.lower()
        if any(marker in lowered for marker in ("increasing", "overlap", "ordered")):
            self.handle_error(409, f"an observation is not strictly after the feature's last instant: {msg}")
        else:
            self.handle_error(500, f"Internal server error: {msg}")

resource/bulk/__init__.py

Whitespace-only changes.

resource/bulk/bulk_helper.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
"""Bulk ingestion helpers for a real-time fleet feed.
2+
3+
A city feed posts, on every tick, one (vehicleId, position, time) observation
4+
per vehicle as a GeoJSON Point feature or a GeoParquet row. Each observation is
5+
appended as one instant to that vehicle's moving feature, extending its
6+
`tgeompoint` trajectory in `temporal_geometries`. The geometry and temporal work
7+
run inside MobilityDB (ST_MakePoint / ST_GeomFromWKB, tgeompoint, appendInstant).
8+
"""
9+
import gzip
10+
import io
11+
import json
12+
import re
13+
import zlib
14+
15+
16+
def decompress(body, content_encoding):
    """Decode a request body according to its Content-Encoding.

    gzip and deflate use the standard library; br and zstd import their
    third-party library on demand.

    Args:
        body: the raw request bytes.
        content_encoding: value of the Content-Encoding header, or None.

    Returns:
        The decoded bytes (``body`` itself for no encoding or "identity").

    Raises:
        ValueError: the encoding is not one of the supported names.
        ImportError: the encoding is br/zstd and its library is not installed.
        Exception: the codec's own error type when the payload is corrupt
            (e.g. ``gzip.BadGzipFile``, ``zlib.error``, ``zstandard.ZstdError``).
    """
    enc = (content_encoding or "").lower().strip()
    if not enc or enc == "identity":
        return body
    if enc in ("gzip", "x-gzip"):
        return gzip.decompress(body)
    if enc == "deflate":
        try:
            return zlib.decompress(body)
        except zlib.error:
            return zlib.decompress(body, -zlib.MAX_WBITS)  # raw deflate stream
    if enc == "br":
        import brotli
        return brotli.decompress(body)
    if enc == "zstd":
        import zstandard
        # The one-shot ZstdDecompressor.decompress() only works when the frame
        # header carries the content size, which streaming encoders omit. A
        # decompression object handles both kinds of frame.
        stream = zstandard.ZstdDecompressor().decompressobj()
        data = stream.decompress(body)
        if not stream.eof:
            # Keep truncated input an error instead of returning partial data.
            raise zstandard.ZstdError("zstd frame is truncated or incomplete")
        return data
    raise ValueError(f"unsupported Content-Encoding: {enc}")
38+
39+
40+
def srid_from_crs(crs, default=4326):
    """Extract an EPSG code from an OGC CRS object or string.

    Recognised spellings include ``EPSG:25832``, ``urn:ogc:def:crs:EPSG::25832``,
    ``urn:ogc:def:crs:EPSG:6.18:3857`` and the OGC URI form
    ``http://www.opengis.net/def/crs/EPSG/0/25832``.

    Args:
        crs: a CRS string, a JSON-serialisable CRS object, or a falsy value.
        default: SRID returned when ``crs`` is empty or names no EPSG code.

    Returns:
        The EPSG code as an int, or ``default``.
    """
    if not crs:
        return default
    text = crs if isinstance(crs, str) else json.dumps(crs)
    # Authority, separator, optional version segment ("", "0", "6.18"), then
    # the code. Without the version part the OGC URI form ".../EPSG/0/25832"
    # would not match at all and silently fall back to the default.
    m = re.search(r"EPSG[:/](?:\d{0,3}(?:\.\d+)*[:/])?(\d{4,5})", text, re.IGNORECASE)
    if m is None:
        # Looser historical form, e.g. {"type": "EPSG", "properties": {"code": 4326}}.
        m = re.search(r"EPSG\D*?(\d{4,5})", text)
    return int(m.group(1)) if m else default
47+
48+
49+
def _timestamp(feature, props):
    """Return the observation time of a feature.

    The feature's own ``when`` member wins; otherwise the properties
    ``datetime``, ``timestamp``, ``time`` and ``t`` are consulted in that
    order. The first truthy value is returned; when none is truthy the result
    is whatever ``props.get("t")`` yields.
    """
    found = feature.get("when")
    for key in ("datetime", "timestamp", "time", "t"):
        if found:
            break
        found = props.get(key)
    return found
52+
53+
54+
def parse_geojson_points(body):
    """Parse a GeoJSON FeatureCollection of Point features.

    Args:
        body: the document as ``bytes``/``bytearray`` (UTF-8) or ``str``.

    Returns:
        ``(observations, srid)`` where each observation is
        ``{"id": str, "x": float, "y": float, "t": str}`` and ``srid`` comes
        from the collection's ``crs`` member (4326 when absent).

    Raises:
        ValueError: the document is not valid JSON, is not a
            FeatureCollection, or a feature lacks a Point geometry, numeric
            coordinates, a timestamp or an id. Members that are not
            ``"type": "Feature"`` objects are skipped.
    """
    gj = json.loads(body.decode("utf-8") if isinstance(body, (bytes, bytearray)) else body)
    # Every structural problem is reported as ValueError so the caller can
    # answer 400; a bare AttributeError/TypeError would surface as a 500.
    if not isinstance(gj, dict) or gj.get("type") != "FeatureCollection":
        raise ValueError("bulk GeoJSON ingest expects a FeatureCollection")
    features = gj.get("features")
    if features is None:
        features = []
    if not isinstance(features, list):
        raise ValueError("a FeatureCollection's 'features' must be an array")
    srid = srid_from_crs(gj.get("crs"))
    observations = []
    for feat in features:
        if not isinstance(feat, dict):
            raise ValueError("each member of 'features' must be an object")
        if feat.get("type") != "Feature":
            continue
        geom = feat.get("geometry") or {}
        if not isinstance(geom, dict) or geom.get("type") != "Point":
            raise ValueError("bulk ingest expects Point geometries")
        coords = geom.get("coordinates") or []
        if not isinstance(coords, (list, tuple)) or len(coords) < 2:
            raise ValueError("a Point needs [x, y] coordinates")
        try:
            x, y = float(coords[0]), float(coords[1])
        except (TypeError, ValueError):
            raise ValueError("Point coordinates must be numbers") from None
        props = feat.get("properties") or {}
        if not isinstance(props, dict):
            raise ValueError("a feature's 'properties' must be an object")
        ts = _timestamp(feat, props)
        if ts is None:
            raise ValueError("each feature needs a timestamp (properties.datetime)")
        fid = feat.get("id") if feat.get("id") is not None else props.get("id")
        if fid is None:
            raise ValueError("each feature needs an id (the vehicle identifier)")
        observations.append({"id": str(fid), "x": x, "y": y, "t": str(ts)})
    return observations, srid
81+
82+
83+
def _geoparquet_srid(schema_metadata, geom_col, default=4326):
    """Return the EPSG code the file's "geo" metadata declares for geom_col."""
    raw = (schema_metadata or {}).get(b"geo")
    if not raw:
        return default
    try:
        ident = json.loads(raw)["columns"][geom_col]["crs"]["id"]
        authority, code = ident["authority"], ident["code"]
        return int(code) if str(authority).upper() == "EPSG" else default
    except (KeyError, TypeError, ValueError):
        # No CRS, a null CRS, or one without an EPSG identifier: keep the default.
        return default


def parse_geoparquet(body, geom_col="geometry", id_col="id", time_col="ts"):
    """Parse a GeoParquet payload with one observation per row.

    The WKB geometry is passed through untouched; PostGIS decodes it later.

    Args:
        body: the Parquet file as bytes.
        geom_col: name of the WKB geometry column.
        id_col: name of the vehicle identifier column.
        time_col: name of the timestamp column.

    Returns:
        ``(observations, srid)`` where each observation is
        ``{"id": str, "wkb": bytes, "t": str}``. ``srid`` is the EPSG code
        declared for ``geom_col`` in the file's "geo" metadata, or 4326 when
        none is declared.

    Raises:
        ValueError: a required column is missing, or a row has no geometry,
            a non-binary geometry, a null id or a null timestamp.
    """
    import pyarrow.parquet as pq
    table = pq.read_table(io.BytesIO(body))
    for col in (geom_col, id_col, time_col):
        if col not in table.column_names:
            raise ValueError(f"GeoParquet is missing the '{col}' column")
    geoms = table.column(geom_col).to_pylist()
    ids = table.column(id_col).to_pylist()
    times = table.column(time_col).to_pylist()
    observations = []
    for row, (g, fid, ts) in enumerate(zip(geoms, ids, times)):
        if not g:
            raise ValueError("GeoParquet row is missing the geometry")
        if not isinstance(g, (bytes, bytearray, memoryview)):
            raise ValueError(f"GeoParquet row {row}: the geometry must be WKB binary")
        # str(None) would turn a null into the literal vehicle id / time
        # "None"; the GeoJSON path rejects missing values, so do the same here.
        if fid is None:
            raise ValueError(f"GeoParquet row {row} is missing the id")
        if ts is None:
            raise ValueError(f"GeoParquet row {row} is missing the timestamp")
        observations.append({"id": str(fid), "wkb": bytes(g), "t": str(ts)})
    return observations, _geoparquet_srid(table.schema.metadata, geom_col)
101+
102+
103+
# SQL expressions that build the single tgeompoint instant appended to a
# trajectory. The point is made either from x/y bind parameters (GeoJSON) or
# from a WKB bind parameter that PostGIS decodes (GeoParquet).
_INST_XY = (
    "tgeompoint("
    "ST_SetSRID(ST_MakePoint(%s, %s), %s), "
    "%s::timestamptz)"
)
_INST_WKB = (
    "tgeompoint("
    "ST_SetSRID(ST_GeomFromWKB(%s), %s), "
    "%s::timestamptz)"
)
107+
108+
109+
def _instant(observation, srid):
    """Choose the instant SQL template and bind parameters for one observation.

    Observations carrying a ``"wkb"`` key use the WKB template with
    ``(wkb, srid, t)``; all others use the x/y template with
    ``(x, y, srid, t)``.
    """
    field = observation.__getitem__
    if "wkb" not in observation:
        params = (field("x"), field("y"), srid, field("t"))
        return _INST_XY, params
    params = (field("wkb"), srid, field("t"))
    return _INST_WKB, params
113+
114+
115+
def ensure_tables(cursor):
    """Create the moving_features and temporal_geometries tables if absent.

    Both statements use CREATE TABLE IF NOT EXISTS, so calling this when the
    tables already exist leaves them untouched. Both tables reference
    collections(id), and temporal_geometries references moving_features(id),
    hence the order of the two statements. Nothing is committed here; the
    statements run on the caller's cursor.
    """
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS moving_features (
            id TEXT PRIMARY KEY,
            collection_id TEXT REFERENCES collections(id) ON DELETE CASCADE,
            type TEXT DEFAULT 'Feature',
            geometry geometry, properties JSONB, bbox JSONB,
            time_range TSTZRANGE, crs JSONB, trs JSONB,
            created_at TIMESTAMP DEFAULT NOW())""")
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS temporal_geometries (
            id SERIAL PRIMARY KEY,
            feature_id TEXT REFERENCES moving_features(id) ON DELETE CASCADE,
            collection_id TEXT REFERENCES collections(id) ON DELETE CASCADE,
            geometry_type TEXT, geometry geometry, trajectory tgeompoint,
            interpolation TEXT, base JSONB,
            created_at TIMESTAMP DEFAULT NOW())""")
132+
133+
134+
def append_observations(cursor, collection_id, observations, srid):
    """Append each observation as one instant to its feature's trajectory.

    A feature is created the first time its id is seen; its trajectory row is
    created by the first instant and extended with appendInstant afterwards.
    Nothing is committed here, so the caller's transaction decides whether the
    whole batch is kept.

    Args:
        cursor: database cursor inside the caller's transaction.
        collection_id: collection the batch is posted to.
        observations: dicts with ``id``, ``t`` and either ``x``/``y`` or ``wkb``.
        srid: SRID applied to every point.

    Returns:
        ``{"observations": n, "featuresCreated": c, "featuresExtended": e}``
        where ``c`` and ``e`` count distinct feature ids: those created by
        this batch and those that already existed, respectively.

    Raises:
        ValueError: an id already belongs to a feature of another collection.
    """
    created, seen = set(), set()
    total = 0
    for o in observations:
        fid = o["id"]
        # `inst` is one of two constant SQL templates, never request data, so
        # interpolating it into the statement text is safe.
        inst, args = _instant(o, srid)
        if fid not in seen:
            cursor.execute(
                "INSERT INTO moving_features (id, collection_id, type) VALUES (%s, %s, 'Feature') "
                "ON CONFLICT (id) DO NOTHING RETURNING id", (fid, collection_id))
            if cursor.fetchone() is not None:
                created.add(fid)
            else:
                # Feature ids are unique across collections. Without this check
                # the UPDATE below would extend another collection's trajectory.
                cursor.execute(
                    "SELECT 1 FROM moving_features WHERE id = %s AND collection_id = %s",
                    (fid, collection_id))
                if cursor.fetchone() is None:
                    raise ValueError(
                        f"feature '{fid}' already exists outside collection '{collection_id}'")
            seen.add(fid)
        cursor.execute(
            f"UPDATE temporal_geometries SET trajectory = appendInstant(trajectory, {inst}) "
            "WHERE feature_id = %s RETURNING id", (*args, fid))
        if cursor.fetchone() is None:
            cursor.execute(
                "INSERT INTO temporal_geometries "
                "(feature_id, collection_id, geometry_type, trajectory, interpolation) "
                f"VALUES (%s, %s, 'MovingPoint', {inst}, 'Linear')",
                (fid, collection_id, *args))
        total += 1
    # Count features, not instants: several observations for one vehicle must
    # not inflate featuresExtended.
    return {"observations": total, "featuresCreated": len(created),
            "featuresExtended": len(seen) - len(created)}

server.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from resource.moving_feature.Delete import delete_single_moving_feature
1919
from resource.temporal_geom_seq.Retrieve import get_tgsequence
2020
from resource.temporal_geom_seq.Create import post_tgsequence
21+
from resource.bulk.Create import post_bulk
2122
from resource.temporal_prim_geom.Delete import delete_single_temporal_primitive_geo
2223
#
2324
from resource.temporal_properties.Retrieve import get_tproperties
@@ -154,6 +155,11 @@ def do_POST(self):
154155
elif self.path == '/collections':
155156
self.post_collections(connection, cursor)
156157

158+
# ============================================ BULK INGEST (extension) ====================================================
159+
elif self.path.startswith('/collections/') and self.path.endswith('/bulk'):
160+
collection_id = self.path.split('/')[2]
161+
self.post_bulk(collection_id, connection, cursor)
162+
157163
# ================================================ MOVING FEATURES ========================================================
158164
elif '/items' in self.path and self.path.startswith('/collections/'):
159165
collection_id = self.path.split('/')[2]
@@ -288,6 +294,9 @@ def get_tgsequence(self, connection, cursor):
288294
#Post:
289295
def post_tgsequence(self,connection, cursor):
290296
post_tgsequence(self, connection, cursor)
297+
298+
def post_bulk(self, collection_id, connection, cursor):
    # Handler method for the bulk-ingest route. Inside a method body the name
    # post_bulk resolves to the function imported from resource.bulk.Create at
    # module level, not to this method, so the call below delegates to it.
    post_bulk(self, collection_id, connection, cursor)
291300

292301
## Resource Temporal Geometry Query
293302
#Get

tests/test_bulk.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""Unit tests for the bulk-ingest parsing and decompression, with no server or
2+
database: the SQL append path is exercised by the integration suite and shares
3+
MobilityDB's appendInstant with the rest of the tier.
4+
"""
5+
import gzip
6+
import io
7+
import zlib
8+
9+
import pyarrow as pa
10+
import pyarrow.parquet as pq
11+
12+
from resource.bulk.bulk_helper import (
13+
decompress, parse_geojson_points, parse_geoparquet, _instant)
14+
15+
# Shared fixture: a FeatureCollection with two Point features. The first
# carries its id at feature level and its time in properties.datetime; the
# second carries both id and time inside properties.
GEOJSON = (
    b'{"type":"FeatureCollection","features":['
    b'{"type":"Feature","id":"bus_42","geometry":{"type":"Point","coordinates":[4.3517,50.8466]},'
    b'"properties":{"datetime":"2026-02-26T10:00:00Z"}},'
    b'{"type":"Feature","geometry":{"type":"Point","coordinates":[4.349,50.8501]},'
    b'"properties":{"id":"bus_57","time":"2026-02-26T10:00:00Z"}}]}'
)
22+
23+
24+
def test_decompress_gzip_deflate_identity():
    """Identity, gzip, zlib-wrapped deflate and raw deflate all round-trip."""
    cases = [
        (GEOJSON, None),
        (GEOJSON, "identity"),
        (gzip.compress(GEOJSON), "gzip"),
        (zlib.compress(GEOJSON), "deflate"),
    ]
    for encoded, encoding in cases:
        assert decompress(encoded, encoding) == GEOJSON
    # raw (headerless) deflate is accepted via the fallback
    compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS)
    headerless = compressor.compress(GEOJSON) + compressor.flush()
    assert decompress(headerless, "deflate") == GEOJSON
33+
34+
35+
def test_decompress_unsupported():
    """An unknown Content-Encoding is rejected with ValueError."""
    rejected = False
    try:
        decompress(GEOJSON, "lzma")
    except ValueError:
        rejected = True
    if not rejected:
        raise AssertionError("unsupported Content-Encoding should raise ValueError")
41+
42+
43+
def test_parse_geojson_points():
    """Both fixture features are parsed, with the default SRID."""
    observations, srid = parse_geojson_points(GEOJSON)
    assert srid == 4326
    assert len(observations) == 2
    first, second = observations[0], observations[1]
    expected_first = {"id": "bus_42", "x": 4.3517, "y": 50.8466, "t": "2026-02-26T10:00:00Z"}
    assert first == expected_first
    # id and time taken from properties
    assert second["id"] == "bus_57"
49+
50+
51+
def test_parse_geoparquet():
    """A two-row Parquet payload yields two {id, wkb, t} observations."""
    columns = {
        "geometry": pa.array([b"\x01\x02\x03", b"\x04\x05\x06"], type=pa.binary()),
        "id": ["bus_42", "bus_57"],
        "ts": ["2026-02-26T10:00:00Z", "2026-02-26T10:01:00Z"],
    }
    sink = io.BytesIO()
    pq.write_table(pa.table(columns), sink)
    observations, srid = parse_geoparquet(sink.getvalue())
    assert srid == 4326
    assert len(observations) == 2
    first, second = observations[0], observations[1]
    assert first["id"] == "bus_42"
    assert first["wkb"] == b"\x01\x02\x03"
    assert second["t"] == "2026-02-26T10:01:00Z"
64+
65+
66+
def test_instant_sql_fragment():
    """_instant picks the x/y or WKB template with matching bind parameters."""
    when = "2026-02-26T10:00:00Z"

    xy_sql, xy_args = _instant({"x": 4.35, "y": 50.84, "t": when}, 4326)
    assert "ST_MakePoint" in xy_sql
    assert xy_args == (4.35, 50.84, 4326, "2026-02-26T10:00:00Z")

    wkb_sql, wkb_args = _instant({"wkb": b"\x01", "t": when}, 4326)
    assert "ST_GeomFromWKB" in wkb_sql
    assert wkb_args == (b"\x01", 4326, "2026-02-26T10:00:00Z")

0 commit comments

Comments
 (0)