diff --git a/backend/app/routers/datalakes.py b/backend/app/routers/datalakes.py index 1371c5a..15b4104 100644 --- a/backend/app/routers/datalakes.py +++ b/backend/app/routers/datalakes.py @@ -1147,6 +1147,158 @@ async def search_term( raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") +# ── Wiki Revisions endpoints ───────────────────────────────────────────────── +# Registered as "wiki_revisions" datalake; Hive-partitioned by identifier. + + +async def _get_revisions_path(db: AsyncSession) -> str: + """Look up revisions data path from datalake DB.""" + query = select(Datalake).where(Datalake.dataset_id == "wiki_revisions") + result = await db.execute(query) + datalake = result.scalar_one_or_none() + if not datalake: + raise HTTPException(status_code=404, detail="wiki_revisions datalake not found") + return datalake.data_location + + +@router.get("/wikigrams/revisions") +async def list_revision_articles( + min_revisions: int = Query(default=1, description="Minimum revision count filter"), + limit: int = Query(default=100, description="Max articles to return"), + db: AsyncSession = Depends(get_db_session), +): + """List articles with extracted revision histories.""" + try: + revisions_path = await _get_revisions_path(db) + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + start_time = time.time() + + rows = conn.execute(f""" + SELECT identifier, + ANY_VALUE(name) as name, + COUNT(*) as revision_count, + MIN(date_modified) as first_edit, + MAX(date_modified) as last_edit + FROM read_parquet('{revisions_path}/**/*.parquet', hive_partitioning=true) + GROUP BY identifier + HAVING COUNT(*) >= ? + ORDER BY revision_count DESC + LIMIT ? + """, [min_revisions, limit]).fetchall() + + duration = (time.time() - start_time) * 1000 + + return { + "articles": [ + { + "identifier": r[0], + "name": r[1], + "revision_count": r[2], + "first_edit": r[3], + "last_edit": r[4], + } + for r in rows + ], + "duration": duration, + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + +@router.get("/wikigrams/revisions/{identifier}") +async def get_revision_deltas( + identifier: str, + db: AsyncSession = Depends(get_db_session), +): + """Delta-encoded revision history for one article. + + Returns one entry per revision. The first revision (revision_idx=0) contains + the full token map. Subsequent revisions contain only changed tokens + (value 0 = token removed). + """ + try: + revisions_path = await _get_revisions_path(db) + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + start_time = time.time() + + rows = conn.execute(f""" + WITH ordered AS ( + SELECT *, + ROW_NUMBER() OVER (ORDER BY revision_id::BIGINT) - 1 AS rev_seq, + json(ngram_counts)::MAP(VARCHAR, INTEGER) AS m + FROM read_parquet('{revisions_path}/identifier={identifier}/*.parquet') + ), + curr AS ( + SELECT rev_seq, + unnest(map_keys(m)) AS token, + unnest(map_values(m)) AS curr_count + FROM ordered + ), + prev AS ( + SELECT rev_seq + 1 AS rev_seq, + unnest(map_keys(m)) AS token, + unnest(map_values(m)) AS prev_count + FROM ordered + ), + diffs AS ( + SELECT COALESCE(c.rev_seq, p.rev_seq) AS rev_seq, + COALESCE(c.token, p.token) AS token, + COALESCE(c.curr_count, 0) AS new_count + FROM curr c + FULL OUTER JOIN prev p + ON c.rev_seq = p.rev_seq AND c.token = p.token + WHERE prev_count IS NULL + OR curr_count IS NULL + OR curr_count != prev_count + ), + delta_agg AS ( + SELECT rev_seq, + json_group_object(token, new_count) AS delta + FROM diffs + GROUP BY rev_seq + ) + SELECT o.revision_id, + o.name, + o.date_modified, + o.revision_comment, + o.categories, + COALESCE(d.delta, '{{}}') AS token_diff + FROM ordered o + LEFT JOIN delta_agg d ON o.rev_seq = d.rev_seq + ORDER BY o.rev_seq + """).fetchall() + + if not rows: + raise HTTPException(status_code=404, detail=f"No revisions found for identifier {identifier}") + + duration = (time.time() - start_time) * 1000 + + return { + "revisions": [ + { + "revision_id": r[0], + "name": r[1], + "date_modified": r[2], + "revision_comment": r[3], + "categories": r[4], + "token_diff": r[5], + } + for r in rows + ], + "duration": duration, + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + @router.get("/{dataset_id}/validate-sources") async def validate_datalake_sources( dataset_id: str,