Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions backend/app/routers/datalakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,158 @@ async def search_term(
raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}")


# ── Wiki Revisions endpoints ─────────────────────────────────────────────────
# Registered as "wiki_revisions" datalake; Hive-partitioned by identifier.


async def _get_revisions_path(db: AsyncSession) -> str:
    """Return the data location registered for the "wiki_revisions" datalake.

    Args:
        db: Async database session used to query the ``Datalake`` table.

    Returns:
        The ``data_location`` attribute of the matching ``Datalake`` row.

    Raises:
        HTTPException: 404 when no row has ``dataset_id == "wiki_revisions"``.
    """
    lookup = await db.execute(
        select(Datalake).where(Datalake.dataset_id == "wiki_revisions")
    )
    record = lookup.scalar_one_or_none()
    if record:
        return record.data_location
    raise HTTPException(status_code=404, detail="wiki_revisions datalake not found")


@router.get("/wikigrams/revisions")
async def list_revision_articles(
    min_revisions: int = Query(default=1, description="Minimum revision count filter"),
    limit: int = Query(default=100, description="Max articles to return"),
    db: AsyncSession = Depends(get_db_session),
):
    """List articles with extracted revision histories.

    Aggregates every parquet file under the registered wiki_revisions data
    location, grouped by ``identifier``, and returns the articles with the
    most revisions first.

    Args:
        min_revisions: Only articles with at least this many rows are returned.
        limit: Maximum number of articles in the response.
        db: Async database session used to resolve the data location.

    Returns:
        A dict with ``articles`` (one entry per identifier carrying
        ``identifier``, ``name``, ``revision_count``, ``first_edit`` and
        ``last_edit``) and ``duration`` (query time in milliseconds).

    Raises:
        HTTPException: 404 when the wiki_revisions datalake is not registered,
            500 when the query itself fails.
    """
    try:
        revisions_path = await _get_revisions_path(db)
        duckdb_client = get_duckdb_client()
        conn = duckdb_client.connect()

        start_time = time.time()

        # The glob is bound as a parameter rather than spliced into the SQL
        # text, so a quote in the stored path cannot break or alter the query.
        parquet_glob = f"{revisions_path}/**/*.parquet"
        rows = conn.execute(
            """
            SELECT identifier,
                   ANY_VALUE(name) as name,
                   COUNT(*) as revision_count,
                   MIN(date_modified) as first_edit,
                   MAX(date_modified) as last_edit
            FROM read_parquet(?, hive_partitioning=true)
            GROUP BY identifier
            HAVING COUNT(*) >= ?
            ORDER BY revision_count DESC
            LIMIT ?
            """,
            [parquet_glob, min_revisions, limit],
        ).fetchall()

        duration = (time.time() - start_time) * 1000

        columns = ("identifier", "name", "revision_count", "first_edit", "last_edit")
        return {
            "articles": [dict(zip(columns, row)) for row in rows],
            "duration": duration,
        }

    except HTTPException:
        # Let the 404 from _get_revisions_path through instead of masking it
        # as a 500 in the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") from e


@router.get("/wikigrams/revisions/{identifier}")
async def get_revision_deltas(
    identifier: str,
    db: AsyncSession = Depends(get_db_session),
):
    """Delta-encoded revision history for one article.

    Returns one entry per revision, ordered by ``revision_id`` (cast to
    BIGINT). The first revision in that order carries the full token map.
    Each later revision carries only the tokens whose count differs from the
    previous revision (value 0 = token removed).

    Args:
        identifier: Article identifier; selects the ``identifier=<value>``
            partition directory under the wiki_revisions data location.
        db: Async database session used to resolve the data location.

    Returns:
        A dict with ``revisions`` (entries carrying ``revision_id``, ``name``,
        ``date_modified``, ``revision_comment``, ``categories`` and
        ``token_diff``, the latter being the JSON object built by
        ``json_group_object``) and ``duration`` (query time in milliseconds).

    Raises:
        HTTPException: 400 when ``identifier`` contains path separators or
            glob metacharacters, 404 when the datalake is not registered or
            the query returns no rows, 500 when the query itself fails.
    """
    # The identifier becomes part of a filesystem glob. Reject anything that
    # could escape the intended partition directory ("/" and "\") or widen the
    # match to other partitions ("*", "?", "[", "]").
    if not identifier or any(ch in "/\\*?[]" for ch in identifier):
        raise HTTPException(status_code=400, detail="Invalid identifier")

    try:
        revisions_path = await _get_revisions_path(db)
        duckdb_client = get_duckdb_client()
        conn = duckdb_client.connect()

        start_time = time.time()

        # Bound as a parameter (not interpolated into the SQL text) so that
        # caller-supplied characters such as quotes cannot alter the query.
        partition_glob = f"{revisions_path}/identifier={identifier}/*.parquet"

        # Query outline:
        #   ordered   - rows numbered 0..n-1 by revision_id, ngram_counts as a MAP
        #   curr      - one (rev_seq, token, count) row per token per revision
        #   prev      - the same rows shifted forward by one revision
        #   diffs     - tokens that are new, removed (count 0) or changed
        #   delta_agg - diffs folded into one JSON object per revision
        rows = conn.execute(
            """
            WITH ordered AS (
                SELECT *,
                       ROW_NUMBER() OVER (ORDER BY revision_id::BIGINT) - 1 AS rev_seq,
                       json(ngram_counts)::MAP(VARCHAR, INTEGER) AS m
                FROM read_parquet(?)
            ),
            curr AS (
                SELECT rev_seq,
                       unnest(map_keys(m)) AS token,
                       unnest(map_values(m)) AS curr_count
                FROM ordered
            ),
            prev AS (
                SELECT rev_seq + 1 AS rev_seq,
                       unnest(map_keys(m)) AS token,
                       unnest(map_values(m)) AS prev_count
                FROM ordered
            ),
            diffs AS (
                SELECT COALESCE(c.rev_seq, p.rev_seq) AS rev_seq,
                       COALESCE(c.token, p.token) AS token,
                       COALESCE(c.curr_count, 0) AS new_count
                FROM curr c
                FULL OUTER JOIN prev p
                    ON c.rev_seq = p.rev_seq AND c.token = p.token
                WHERE prev_count IS NULL
                   OR curr_count IS NULL
                   OR curr_count != prev_count
            ),
            delta_agg AS (
                SELECT rev_seq,
                       json_group_object(token, new_count) AS delta
                FROM diffs
                GROUP BY rev_seq
            )
            SELECT o.revision_id,
                   o.name,
                   o.date_modified,
                   o.revision_comment,
                   o.categories,
                   COALESCE(d.delta, '{}') AS token_diff
            FROM ordered o
            LEFT JOIN delta_agg d ON o.rev_seq = d.rev_seq
            ORDER BY o.rev_seq
            """,
            [partition_glob],
        ).fetchall()

        if not rows:
            raise HTTPException(status_code=404, detail=f"No revisions found for identifier {identifier}")

        duration = (time.time() - start_time) * 1000

        columns = (
            "revision_id",
            "name",
            "date_modified",
            "revision_comment",
            "categories",
            "token_diff",
        )
        return {
            "revisions": [dict(zip(columns, row)) for row in rows],
            "duration": duration,
        }

    except HTTPException:
        # Preserve deliberate 4xx responses raised above.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") from e


@router.get("/{dataset_id}/validate-sources")
async def validate_datalake_sources(
dataset_id: str,
Expand Down