Skip to content

Commit 14f1144

Browse files
Stream a collection's features as an NDJSON lakehouse feed at GET .../export
GET /collections/{collectionId}/export writes the collection's moving features as NDJSON — one Feature per line, the temporal geometry as MF-JSON — from a server-side cursor, so memory stays bounded for any collection size and a lake consumer (DuckDB / MobilityDuck / Spark) can ingest the stream directly.
1 parent ef707b4 commit 14f1144

2 files changed

Lines changed: 59 additions & 1 deletion

File tree

resource/collection/Export.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# GET /collections/{collectionId}/export
2+
# Lakehouse bulk feed: the collection's moving features streamed as NDJSON (one
3+
# Feature per line, the temporal geometry as MF-JSON) from a server-side cursor,
4+
# so memory is bounded regardless of collection size. A lake consumer (DuckDB /
5+
# MobilityDuck / Spark) ingests the stream directly.
6+
import json
7+
8+
from utils import handle_error
9+
10+
11+
def export_collection(self, collection_id, connection, cursor):
    """Stream a collection's moving features to the client as NDJSON.

    Serves GET /collections/{collectionId}/export. Each output line is one
    JSON object of the form
    ``{"type": "Feature", "id": ..., "properties": {...},
    "temporalGeometry": {...} | null}`` followed by a newline.

    Args:
        self: The HTTP request handler; must provide ``send_response``,
            ``send_header``, ``end_headers`` and a binary ``wfile``.
        collection_id: Identifier of the collection to export.
        connection: Database connection used to open the named
            (server-side) cursor that streams the rows.
        cursor: Ordinary cursor used only for the existence check.

    Returns:
        None. A 404 is reported through ``handle_error`` when the collection
        does not exist; otherwise the response is written to ``self.wfile``.

    Raises:
        Whatever the database driver or the socket write raises. The stream
        cursor is always closed, and a failure in ``execute`` propagates
        before any status line has been sent.
    """
    cursor.execute("SELECT id FROM collections WHERE id = %s", (collection_id,))
    if cursor.fetchone() is None:
        handle_error(self, 404, f"Collection '{collection_id}' not found")
        return

    # Server-side cursor: rows are fetched in batches, so a large collection
    # streams at bounded memory.
    stream = connection.cursor(name="mfapi_export")
    stream.itersize = 1000
    try:
        # Start the query BEFORE committing to a 200: if execute() fails, the
        # exception reaches the caller while the response is still unsent, so
        # a proper error status can be returned instead of a truncated 200.
        stream.execute(
            """
            SELECT mf.id,
                   coalesce(mf.properties, '{}'::jsonb)::text,
                   asMFJSON(tg.trajectory)
            FROM moving_features mf
            LEFT JOIN temporal_geometries tg
              ON mf.id = tg.feature_id AND mf.collection_id = tg.collection_id
            WHERE mf.collection_id = %s
            ORDER BY mf.id
            """,
            (collection_id,),
        )

        self.send_response(200)
        self.send_header("Content-Type", "application/x-ndjson")
        # The body length is unknown up front and no chunked encoding is
        # used, so the end of the stream is signalled by closing the
        # connection; say so explicitly for keep-alive clients.
        self.send_header("Connection", "close")
        self.end_headers()

        for fid, properties, tgeom in stream:
            feature = {
                "type": "Feature",
                "id": fid,
                "properties": json.loads(properties),
                # LEFT JOIN: a feature without a temporal geometry yields NULL.
                "temporalGeometry": json.loads(tgeom) if tgeom else None,
            }
            self.wfile.write((json.dumps(feature) + "\n").encode("utf-8"))
    finally:
        stream.close()

server.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from resource.collections.Create import post_collections
1010
from resource.collections.Retrieve import get_collections
1111
from resource.collection.Retrieve import get_collection_id
12+
from resource.collection.Export import export_collection
1213
from resource.collection.Delete import delete_collection
1314
from resource.collection.Replace import put_collection
1415
from resource.moving_features.Create import post_collection_items, insert_feature
@@ -143,7 +144,13 @@ def do_GET(self):
143144
elif self.path == '/collections':
144145
self.get_collections(connection, cursor)
145146
return
146-
147+
148+
# /collections/{collectionId}/export — lakehouse NDJSON feed
149+
elif self.path.startswith('/collections/') and urlparse(self.path).path.endswith('/export'):
150+
collection_id = urlparse(self.path).path.split('/')[2]
151+
self.export_collection(collection_id, connection, cursor)
152+
return
153+
147154
# /collections/{collectionId}
148155
elif self.path.startswith('/collections/'):
149156
path_only = urlparse(self.path).path
@@ -378,6 +385,9 @@ def delete_single_moving_feature(self, collectionId, mFeature_id, connection, cu
378385
def put_single_moving_feature(self, collectionId, mFeature_id, connection, cursor):
379386
put_single_moving_feature(self, collectionId, mFeature_id, connection, cursor)
380387

388+
def export_collection(self, collection_id, connection, cursor):
    """Serve GET /collections/{collectionId}/export for this handler.

    Thin delegation to the ``export_collection`` function imported from
    ``resource.collection.Export``; inside the method body the bare name
    resolves to that module-level function, not to this method.
    """
    return export_collection(
        self,
        collection_id=collection_id,
        connection=connection,
        cursor=cursor,
    )
390+
381391

382392
## Resource Temporal Geometry Sequence
383393
#Get

0 commit comments

Comments
 (0)