From b20f35308c6d1f0770566f2b01c9d20826e55068 Mon Sep 17 00:00:00 2001 From: jstonge Date: Sat, 7 Mar 2026 13:31:12 -0500 Subject: [PATCH 1/4] revisions endpoints --- backend/app/routers/datalakes.py | 156 +++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/backend/app/routers/datalakes.py b/backend/app/routers/datalakes.py index 1371c5a..b3478ff 100644 --- a/backend/app/routers/datalakes.py +++ b/backend/app/routers/datalakes.py @@ -1147,6 +1147,162 @@ async def search_term( raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") +# ── Wiki Revisions endpoints ───────────────────────────────────────────────── +# Registered as "wiki_revisions" datalake; data lives under data_location/data/ + + +async def _get_revisions_path(db: AsyncSession) -> str: + """Look up revisions data path from datalake DB.""" + query = select(Datalake).where(Datalake.dataset_id == "wiki_revisions") + result = await db.execute(query) + datalake = result.scalar_one_or_none() + if not datalake: + raise HTTPException(status_code=404, detail="wiki_revisions datalake not found") + return f"{datalake.data_location}/data" + + +@router.get("/wikigrams/revisions") +async def list_revision_articles( + min_revisions: int = Query(default=1, description="Minimum revision count filter"), + limit: int = Query(default=100, description="Max articles to return"), + db: AsyncSession = Depends(get_db_session), +): + """List articles with extracted revision histories.""" + try: + revisions_path = await _get_revisions_path(db) + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + start_time = time.time() + + rows = conn.execute(f""" + SELECT identifier, + ANY_VALUE(name) as name, + ANY_VALUE(url) as url, + COUNT(*) as revision_count, + MIN(date_modified) as first_edit, + MAX(date_modified) as last_edit + FROM read_parquet('{revisions_path}/**/*.parquet', hive_partitioning=true) + GROUP BY identifier + HAVING COUNT(*) >= ? + ORDER BY revision_count DESC + LIMIT ? + """, [min_revisions, limit]).fetchall() + + duration = (time.time() - start_time) * 1000 + + return { + "articles": [ + { + "identifier": r[0], + "name": r[1], + "url": r[2], + "revision_count": r[3], + "first_edit": r[4], + "last_edit": r[5], + } + for r in rows + ], + "duration": duration, + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + +@router.get("/wikigrams/revisions/{identifier}") +async def get_revision_deltas( + identifier: str, + db: AsyncSession = Depends(get_db_session), +): + """Delta-encoded revision history for one article. + + Returns one entry per revision. The first revision (revision_idx=0) contains + the full token map. Subsequent revisions contain only changed tokens + (value 0 = token removed). + """ + try: + revisions_path = await _get_revisions_path(db) + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + start_time = time.time() + + rows = conn.execute(f""" + WITH ordered AS ( + SELECT *, + ROW_NUMBER() OVER (ORDER BY date_modified, revision_id) - 1 AS rev_seq, + json(ngram_counts)::MAP(VARCHAR, INTEGER) AS m + FROM read_parquet('{revisions_path}/identifier={identifier}/*.parquet') + ), + curr AS ( + SELECT rev_seq, + unnest(map_keys(m)) AS token, + unnest(map_values(m)) AS curr_count + FROM ordered + ), + prev AS ( + SELECT rev_seq + 1 AS rev_seq, + unnest(map_keys(m)) AS token, + unnest(map_values(m)) AS prev_count + FROM ordered + ), + diffs AS ( + SELECT COALESCE(c.rev_seq, p.rev_seq) AS rev_seq, + COALESCE(c.token, p.token) AS token, + COALESCE(c.curr_count, 0) AS new_count + FROM curr c + FULL OUTER JOIN prev p + ON c.rev_seq = p.rev_seq AND c.token = p.token + WHERE prev_count IS NULL + OR curr_count IS NULL + OR curr_count != prev_count + ), + delta_agg AS ( + SELECT rev_seq, + json_group_object(token, new_count) AS delta + FROM diffs + GROUP BY rev_seq + ) + SELECT o.rev_seq AS revision_idx, + o.revision_id, + o.name, + o.date_modified, + o.revision_comment, + o.total_tokens, + COALESCE(d.delta, '{{}}') AS delta + FROM ordered o + LEFT JOIN delta_agg d ON o.rev_seq = d.rev_seq + ORDER BY o.rev_seq + """).fetchall() + + if not rows: + raise HTTPException(status_code=404, detail=f"No revisions found for identifier {identifier}") + + duration = (time.time() - start_time) * 1000 + + return { + "revisions": [ + { + "revision_idx": r[0], + "revision_id": r[1], + "name": r[2], + "date_modified": r[3], + "comment": r[4], + "total_tokens": r[5], + "delta": r[6], + } + for r in rows + ], + "duration": duration, + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + @router.get("/{dataset_id}/validate-sources") async def validate_datalake_sources( dataset_id: str, From c6ebb06baccfe1784b84c745dde6bca9bb74ea5e Mon Sep 17 00:00:00 2001 From: jstonge Date: Sat, 7 Mar 2026 13:32:00 -0500 Subject: [PATCH 2/4] typo --- backend/app/routers/datalakes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/app/routers/datalakes.py b/backend/app/routers/datalakes.py index b3478ff..dd4b986 100644 --- a/backend/app/routers/datalakes.py +++ b/backend/app/routers/datalakes.py @@ -1148,7 +1148,7 @@ async def search_term( # ── Wiki Revisions endpoints ───────────────────────────────────────────────── -# Registered as "wiki_revisions" datalake; data lives under data_location/data/ +# Registered as "wiki_revisions" datalake; Hive-partitioned by identifier. async def _get_revisions_path(db: AsyncSession) -> str: @@ -1158,7 +1158,7 @@ async def _get_revisions_path(db: AsyncSession) -> str: datalake = result.scalar_one_or_none() if not datalake: raise HTTPException(status_code=404, detail="wiki_revisions datalake not found") - return f"{datalake.data_location}/data" + return datalake.data_location @router.get("/wikigrams/revisions") From 425828a3d467aa7c3c18b7032bc3c502e69a162e Mon Sep 17 00:00:00 2001 From: jstonge Date: Sat, 7 Mar 2026 14:40:32 -0500 Subject: [PATCH 3/4] adding some more metadata --- backend/app/routers/datalakes.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/backend/app/routers/datalakes.py b/backend/app/routers/datalakes.py index dd4b986..85a4d6e 100644 --- a/backend/app/routers/datalakes.py +++ b/backend/app/routers/datalakes.py @@ -1231,7 +1231,7 @@ async def get_revision_deltas( rows = conn.execute(f""" WITH ordered AS ( SELECT *, - ROW_NUMBER() OVER (ORDER BY date_modified, revision_id) - 1 AS rev_seq, + ROW_NUMBER() OVER (ORDER BY revision_id::BIGINT) - 1 AS rev_seq, json(ngram_counts)::MAP(VARCHAR, INTEGER) AS m FROM read_parquet('{revisions_path}/identifier={identifier}/*.parquet') ), @@ -1264,13 +1264,12 @@ async def get_revision_deltas( FROM diffs GROUP BY rev_seq ) - SELECT o.rev_seq AS revision_idx, - o.revision_id, + SELECT o.revision_id, o.name, o.date_modified, o.revision_comment, - o.total_tokens, - COALESCE(d.delta, '{{}}') AS delta + o.categories, + COALESCE(d.delta, '{{}}') AS token_diff FROM ordered o LEFT JOIN delta_agg d ON o.rev_seq = d.rev_seq ORDER BY o.rev_seq @@ -1284,13 +1283,12 @@ async def get_revision_deltas( return { "revisions": [ { - "revision_idx": r[0], - "revision_id": r[1], - "name": r[2], - "date_modified": r[3], - "comment": r[4], - "total_tokens": r[5], - "delta": r[6], + "revision_id": r[0], + "name": r[1], + "date_modified": r[2], + "revision_comment": r[3], + "categories": r[4], + "token_diff": r[5], } for r in rows ], From 1721c2ae2d13258a84e84f15d0559d4cfa34b100 Mon Sep 17 00:00:00 2001 From: jstonge Date: Sat, 7 Mar 2026 14:56:14 -0500 Subject: [PATCH 4/4] add back url --- backend/app/routers/datalakes.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/backend/app/routers/datalakes.py b/backend/app/routers/datalakes.py index 85a4d6e..15b4104 100644 --- a/backend/app/routers/datalakes.py +++ b/backend/app/routers/datalakes.py @@ -1178,7 +1178,6 @@ async def list_revision_articles( rows = conn.execute(f""" SELECT identifier, ANY_VALUE(name) as name, - ANY_VALUE(url) as url, COUNT(*) as revision_count, MIN(date_modified) as first_edit, MAX(date_modified) as last_edit @@ -1196,10 +1195,9 @@ async def list_revision_articles( { "identifier": r[0], "name": r[1], - "url": r[2], - "revision_count": r[3], - "first_edit": r[4], - "last_edit": r[5], + "revision_count": r[2], + "first_edit": r[3], + "last_edit": r[4], } for r in rows ],