diff --git a/backend/app/core/parquet_utils.py b/backend/app/core/parquet_utils.py new file mode 100644 index 0000000..c91265a --- /dev/null +++ b/backend/app/core/parquet_utils.py @@ -0,0 +1,105 @@ +""" +General-purpose DuckDB / Hive-parquet utilities shared across routers. + +These helpers are dataset-agnostic — they work with any parquet_hive or +ducklake-format dataset registered in the system. +""" + +from datetime import datetime, timedelta +from typing import List, Tuple +from fastapi import HTTPException + + +def get_parquet_paths(dataset, data_table_name: str) -> Tuple[List[str], List[str]]: + """Construct parquet file paths for data table and adapter table. + + For parquet_hive format: paths in tables_metadata are absolute; adapter comes from entity_mapping.path. + For ducklake format: paths are relative filenames; prepend data_location + /main/ + table_name/. + """ + if not dataset.tables_metadata: + raise HTTPException( + status_code=500, + detail="Dataset metadata is missing. Please re-register the dataset with proper tables_metadata." + ) + + data_fnames = dataset.tables_metadata.get(data_table_name) + + if not data_fnames: + raise HTTPException( + status_code=500, + detail=f"Missing {data_table_name} file paths. Required: tables_metadata.{data_table_name}" + ) + + if dataset.data_format == "parquet_hive": + data_path = data_fnames + if not dataset.entity_mapping or not dataset.entity_mapping.get("path"): + raise HTTPException( + status_code=500, + detail="Missing entity_mapping.path for parquet_hive format. Please re-register with entity_mapping." + ) + adapter_path = [dataset.entity_mapping["path"]] + else: + # ducklake format: relative filenames, prepend data_location + # NOTE: Don't URL-decode - the filesystem actually has %20 in directory names + adapter_fnames = dataset.tables_metadata.get("adapter") + if not adapter_fnames: + raise HTTPException( + status_code=500, + detail="Missing adapter file paths. Required: tables_metadata.adapter" + ) + data_path = [ + f"{dataset.data_location}/main/{data_table_name}/{fname}" + for fname in data_fnames + ] + adapter_path = [ + f"{dataset.data_location}/main/adapter/{fname}" + for fname in adapter_fnames + ] + + return data_path, adapter_path + + +def compute_partition_starts(start_date: str, end_date: str, granularity: str) -> List[str]: + """Compute partition start dates covering [start_date, end_date] using date arithmetic. + + Assumes no gaps in the data. For weekly/monthly granularities, snaps start_date + back to the nearest partition boundary so that partial periods at the edges are included. + + Args: + start_date: Start of the date range (YYYY-MM-DD). + end_date: End of the date range (YYYY-MM-DD) + granularity: One of "daily", "weekly", "monthly" + + Returns: + Sorted list of partition start dates + + Example: + >>> compute_partition_starts("2024-10-07", "2024-10-27", "weekly") + ["2024-10-07", "2024-10-14", "2024-10-21"] + """ + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + if granularity == "daily": + current = start + elif granularity == "weekly": + current = start - timedelta(days=start.weekday()) + elif granularity == "monthly": + current = start.replace(day=1) + else: + raise ValueError(f"Unknown granularity: {granularity}") + + partitions = [] + while current <= end: + partitions.append(current.strftime("%Y-%m-%d")) + if granularity == "daily": + current += timedelta(days=1) + elif granularity == "weekly": + current += timedelta(weeks=1) + elif granularity == "monthly": + if current.month == 12: + current = current.replace(year=current.year + 1, month=1, day=1) + else: + current = current.replace(month=current.month + 1, day=1) + + return partitions diff --git a/backend/app/main.py b/backend/app/main.py index c3dd4b1..c77a65a 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -13,7 +13,7 @@ limiter = Limiter(key_func=get_remote_address) # Import routers after limiter is defined so they can use it -from .routers import open_academic_analytics, datasets, auth, wikimedia, annotations, dark_data_survey, scisciDB, datalakes, interdisciplinarity, storywrangler +from .routers import open_academic_analytics, datasets, auth, wikimedia, annotations, dark_data_survey, scisciDB, datalakes, interdisciplinarity, storywrangler, babynames, registry app = FastAPI( title=settings.app_name, @@ -83,7 +83,9 @@ async def shutdown_event(): app.include_router(auth.router, prefix="/auth", tags=["authentication"], include_in_schema=False) app.include_router(open_academic_analytics.router, prefix="/open-academic-analytics", tags=["academics"]) app.include_router(datasets.router, prefix="/datasets", tags=["datasets"]) +app.include_router(registry.router, prefix="/registry", tags=["registry"]) app.include_router(datalakes.router, prefix="/datalakes", tags=["datalakes"]) +app.include_router(babynames.router, prefix="/babynames", tags=["babynames"]) app.include_router(wikimedia.router, prefix="/wikimedia", tags=["wikimedia"]) app.include_router(annotations.router, prefix="/annotations", tags=["annotations"]) app.include_router(scisciDB.router, prefix="/scisciDB", tags=["scisciDB"]) @@ -95,8 +97,8 @@ async def shutdown_event(): # Admin endpoints (secured with admin authentication) app.include_router(auth.admin_router, prefix="/admin/auth", tags=["admin"], include_in_schema=False) app.include_router(datasets.admin_router, prefix="/admin/datasets", tags=["admin"], include_in_schema=False) +app.include_router(registry.admin_router, prefix="/admin/registry", tags=["admin"], include_in_schema=False) app.include_router(scisciDB.admin_router, prefix="/admin/scisciDB", tags=["admin"], include_in_schema=False) -app.include_router(datalakes.admin_router, prefix="/admin/datalakes", tags=["admin"], include_in_schema=False) app.include_router(open_academic_analytics.admin_router, prefix="/admin/open-academic-analytics", tags=["admin"], include_in_schema=False) app.include_router(annotations.admin_router, prefix="/admin/annotations", tags=["admin"], include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 2b24834..78f878a 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -7,7 +7,7 @@ from .annotation_datasets import AcademicResearchGroups from .auth import User, AnnotationHistory from .sciscidb import FieldYearCount -from .datalakes import Datalake +from .registry import Dataset from .interdisciplinarity import PaperAnnotation from .cached_papers import CachedPaper @@ -21,4 +21,4 @@ "PaperAnnotation", "CachedPaper", "FieldYearCount", - "Datalake"] \ No newline at end of file + "Dataset"] \ No newline at end of file diff --git a/backend/app/models/datalakes.py b/backend/app/models/datalakes.py deleted file mode 100644 index 7904f95..0000000 --- a/backend/app/models/datalakes.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -Datalake registry models for tracking available datalakes. -""" - -from sqlalchemy import Column, String, DateTime, Text, JSON, ForeignKey -from sqlalchemy.sql import func -from ..core.database import Base - - -class Datalake(Base): - """Registry of available datalakes.""" - - __tablename__ = "datalakes" - - dataset_id = Column(String, primary_key=True) - data_location = Column(String, nullable=False) - data_format = Column(String, nullable=False, default="ducklake") - description = Column(Text) - tables_metadata = Column(JSON) # Store ducklake table metadata for version control - ducklake_data_path = Column(String) # Path to ducklake data directory - data_schema = Column(JSON) # Table schema (column_name -> type) for query reference - partitioning = Column(JSON) # Partitioning metadata - entity_mapping = Column(JSON) # Entity mapping configuration - sources = Column(JSON) # Source URLs for validation - created_at = Column(DateTime(timezone=True), server_default=func.now()) - updated_at = Column(DateTime(timezone=True), onupdate=func.now()) - - def __repr__(self): - return f"" - - -class EntityMapping(Base): - """Entity mappings for datalakes.""" - - __tablename__ = "entity_mappings" - - id = Column(String, primary_key=True) # dataset_id:local_id - dataset_id = Column(String, ForeignKey("datalakes.dataset_id"), nullable=False) - local_id = Column(String, nullable=False) - entity_id = Column(String, nullable=False) # standardized identifier - entity_name = Column(String, nullable=False) - entity_ids = Column(JSON) # alternate identifiers - created_at = Column(DateTime(timezone=True), server_default=func.now()) - - def __repr__(self): - return f"" \ No newline at end of file diff --git a/backend/app/models/registry.py b/backend/app/models/registry.py new file mode 100644 index 0000000..6d252e6 --- /dev/null +++ b/backend/app/models/registry.py @@ -0,0 +1,82 @@ +""" +Generic dataset registry. + +A Dataset is any external data backend registered with the platform. + +Supported formats: + - ducklake : DuckLake catalog (tabular, versioned, schema-aware) + - parquet_hive : Hive-partitioned parquet tree (tabular, file-based) + - duckdb : Plain DuckDB database file (no tables/schema metadata needed) + +Primary key is (domain, dataset_id) so the same logical name (e.g. "ngrams") can +exist in multiple domains without collision. + +Common fields (all formats): + domain, dataset_id, data_location, data_format, description, + entity_mapping, sources, created_at, updated_at. + +Structured-data fields (ducklake, parquet_hive): + tables_metadata, partitioning. + +DuckLake-specific: + ducklake_data_path, data_schema. +""" + +from sqlalchemy import Column, String, DateTime, Text, JSON, ForeignKeyConstraint +from sqlalchemy.sql import func +from ..core.database import Base + + +class Dataset(Base): + """Registry of available datasets across all supported backends.""" + + __tablename__ = "datasets" + + domain = Column(String, primary_key=True) # owning domain: wikimedia | storywrangler | ... + dataset_id = Column(String, primary_key=True) # name within domain: ngrams | revisions | ... + data_location = Column(String, nullable=False) # base path or connection string + data_format = Column(String, nullable=False) # ducklake | parquet_hive | duckdb + description = Column(Text) + + # Structured/tabular formats (ducklake, parquet_hive) + tables_metadata = Column(JSON) # table_name -> [file_paths] or version info + partitioning = Column(JSON) # partitioning keys and scheme + + # DuckLake-specific + ducklake_data_path = Column(String) # path to ducklake catalog (.duckdb file) + data_schema = Column(JSON) # column_name -> type, for query reference + + # Shared optional metadata + entity_mapping = Column(JSON) # {path, local_id_column, entity_id_column} + sources = Column(JSON) # source URLs for validation + endpoint_schemas = Column(JSON) # [{type, time_dimension, entity_dimensions, filter_dimensions}] + + created_at = Column(DateTime(timezone=True), server_default=func.now()) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + def __repr__(self): + return f"" + + +class EntityMapping(Base): + """Entity mappings for datasets.""" + + __tablename__ = "entity_mappings" + __table_args__ = ( + ForeignKeyConstraint( + ["domain", "dataset_id"], + ["datasets.domain", "datasets.dataset_id"], + ), + ) + + id = Column(String, primary_key=True) # domain:dataset_id:local_id + domain = Column(String, nullable=False) + dataset_id = Column(String, nullable=False) + local_id = Column(String, nullable=False) + entity_id = Column(String, nullable=False) # standardized identifier + entity_name = Column(String, nullable=False) + entity_ids = Column(JSON) # alternate identifiers + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + def __repr__(self): + return f"" diff --git a/backend/app/routers/babynames.py b/backend/app/routers/babynames.py new file mode 100644 index 0000000..9738f58 --- /dev/null +++ b/backend/app/routers/babynames.py @@ -0,0 +1,159 @@ +""" +Babynames API endpoints. +""" + +from fastapi import APIRouter, HTTPException, Depends, Query +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from typing import Optional + +from ..core.database import get_db_session +from ..core.duckdb_client import get_duckdb_client +from ..models.registry import Dataset + +router = APIRouter() + +BabynamesDataset = select(Dataset).where(Dataset.domain == "babynames") + +@router.get("/top-ngrams") +async def get_babynames_top_ngrams( + dates: str = Query(default="1991,1993"), # First date range + dates2: Optional[str] = Query(default=None), # Optional second date range + locations: str = Query(default="wikidata:Q30"), # Single location + sex: Optional[str] = 'M', + limit: int = 100, + db: AsyncSession = Depends(get_db_session) +): + """Get top baby names with flexible comparative analysis. + + Supports: + - Single date range: dates=["1950,1952"] + - Dual date ranges: dates=["1950,1952", "1991,1993"] (requires single location) + - Single location: locations=["wikidata:Q30"] + - Multiple locations: locations=["wikidata:Q30", "wikidata:Q16"] (requires single date range) + + Examples: + - Temporal comparison: dates=["1950,1952", "1991,1993"]&locations=["wikidata:Q30"] + - Geographic comparison: dates=["1950,1952"]&locations=["wikidata:Q30", "wikidata:Q16"] + - Simple query: dates=["1950,1952"]&locations=["wikidata:Q30"] + + Returns structured data for comparison visualization. + """ + + # Parse dates parameters + # dates="1991,1993" + date_ranges = [] + + # Parse first date range + years1 = [int(y) for y in dates.split(',')] + if len(years1) == 1: + years1.append(years1[0]) # Single year becomes range [year, year] + date_ranges.append(years1) + + # Parse optional second date range + if dates2: + years2 = [int(y) for y in dates2.split(',')] + if len(years2) == 1: + years2.append(years2[0]) # Single year becomes range [year, year] + date_ranges.append(years2) + + # Single location (no longer a list) + # locations="wikidata:Q176" + location_list = [locations] + + # Look up babynames dataset + query = BabynamesDataset.where(Dataset.dataset_id == "babynames") + result = await db.execute(query) + dataset_obj = result.scalar_one_or_none() + + if not dataset_obj: + raise HTTPException(status_code=404, detail="Babynames dataset not found") + + try: + # Get DuckDB connection + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + # Use stored metadata to get exact file paths for current versions + if not dataset_obj.tables_metadata: + raise HTTPException( + status_code=500, + detail="Dataset metadata is missing. Please re-register the dataset with proper tables_metadata." + ) + + # babynames_fnames=["ducklake-019af69d-b42c-7877-9073-3f440e2ee162.parquet", ...] + babynames_fnames = dataset_obj.tables_metadata.get("babynames") + adapter_fnames = dataset_obj.tables_metadata.get("adapter") + + if not babynames_fnames or not adapter_fnames: + raise HTTPException( + status_code=500, + detail="Missing babynames or adapter file paths. Required: tables_metadata.babynames and tables_metadata.adapter" + ) + + # Construct full paths by combining data_location with the filenames + # For ducklake format, files are stored in metadata.ducklake.files/main/ subdirectories + babynames_path = [ + f"{dataset_obj.data_location}/{dataset_obj.ducklake_data_path}/main/babynames/{fname}" for fname in babynames_fnames + ] + adapter_path = [ + f"{dataset_obj.data_location}/{dataset_obj.ducklake_data_path}/main/adapter/{fname}" for fname in adapter_fnames + ] + + # Execute comparative queries + results = {} + + # Query for each combination of date ranges and locations + for date_range in date_ranges: + for location in location_list: + # Create key for result structure + if len(date_ranges) > 1: + # Temporal comparison: use readable date format + if date_range[0] == date_range[1]: + key = str(date_range[0]) # Single year: "1990" + else: + key = f"{date_range[0]}-{date_range[1]}" # Range: "2010-2015" + elif len(location_list) > 1: + # Geographic comparison: use location ID + key = location.replace(":", "_").replace("-", "_") + else: + key = "data" # Single query, return simple format + + sql_query = f""" + SELECT + b.types, + SUM(b.counts) as counts + FROM read_parquet(?) b + LEFT JOIN read_parquet(?) a ON b.geo = a.local_id + WHERE b.year BETWEEN ? AND ? + AND a.entity_id = ? + """ + + if sex: + sql_query += " AND b.sex = ?" + + sql_query += f""" + GROUP BY b.types + ORDER BY counts DESC + LIMIT ? + """ + + cursor = conn.execute(sql_query, [babynames_path, adapter_path, date_range[0], date_range[1], location, sex, limit]) + query_results = cursor.fetchall() + + # Structure results for comparison or simple format + try: + if key == "data": + # Simple single query - return flat array for backwards compatibility + return [{"types": row[0], "counts": row[1]} for row in query_results] + else: + # Comparative query - return array directly under the key + results[key] = [{"types": row[0], "counts": row[1]} for row in query_results] + except Exception as format_error: + print(f"❌ Error formatting results: {format_error}") + raise + + return results + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") diff --git a/backend/app/routers/datalakes.py b/backend/app/routers/datalakes.py index 15b4104..9038a68 100644 --- a/backend/app/routers/datalakes.py +++ b/backend/app/routers/datalakes.py @@ -5,231 +5,19 @@ from fastapi import APIRouter, HTTPException, Depends, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select -from typing import Dict, Any, Optional, List, Union -from pydantic import BaseModel, Field -import httpx +from typing import Dict, Any, Optional, List import time from datetime import datetime, timedelta from urllib.parse import quote from ..core.database import get_db_session from ..core.duckdb_client import get_duckdb_client -from ..models.datalakes import Datalake, EntityMapping -from ..routers.auth import get_admin_user -from ..models.auth import User +from ..core.parquet_utils import get_parquet_paths, compute_partition_starts +from ..models.registry import Dataset as Datalake router = APIRouter() -admin_router = APIRouter() -# Helper functions for top-ngrams endpoints -def get_parquet_paths(datalake, data_table_name: str): - """Construct parquet file paths for data table and adapter table. - - For parquet_hive format: paths in tables_metadata are absolute; adapter comes from entity_mapping.path. - For ducklake format: paths are relative filenames; prepend data_location + /main/ + table_name/. - """ - if not datalake.tables_metadata: - raise HTTPException( - status_code=500, - detail="Datalake metadata is missing. Please re-register the datalake with proper tables_metadata." - ) - - data_fnames = datalake.tables_metadata.get(data_table_name) - - if not data_fnames: - raise HTTPException( - status_code=500, - detail=f"Missing {data_table_name} file paths. Required: tables_metadata.{data_table_name}" - ) - - if datalake.data_format == "parquet_hive": - # Paths are already absolute - data_path = data_fnames - # Adapter path comes from entity_mapping.path - if not datalake.entity_mapping or not datalake.entity_mapping.get("path"): - raise HTTPException( - status_code=500, - detail="Missing entity_mapping.path for parquet_hive format. Please re-register with entity_mapping." - ) - adapter_path = [datalake.entity_mapping["path"]] - else: - # ducklake format: relative filenames, prepend data_location - # NOTE: Don't URL-decode - the filesystem actually has %20 in directory names - adapter_fnames = datalake.tables_metadata.get("adapter") - if not adapter_fnames: - raise HTTPException( - status_code=500, - detail="Missing adapter file paths. Required: tables_metadata.adapter" - ) - data_path = [ - f"{datalake.data_location}/main/{data_table_name}/{fname}" - for fname in data_fnames - ] - adapter_path = [ - f"{datalake.data_location}/main/adapter/{fname}" - for fname in adapter_fnames - ] - - return data_path, adapter_path - -def format_results(query_results, key: str): - """Format query results into structured response.""" - formatted_results = [] - for row in query_results: - formatted_results.append({"types": row[0], "counts": row[1]}) - - if key == "data": - # Simple single query - return flat array - return formatted_results, True # True = early return - else: - # Comparative query - return under key - return {key: formatted_results}, False # False = continue aggregating - -def compute_partition_starts(start_date: str, end_date: str, granularity: str) -> List[str]: - """Compute partition start dates covering [start_date, end_date] using date arithmetic. - - Assumes no gaps in the data. For weekly/monthly granularities, snaps start_date - back to the nearest partition boundary so that partial periods at the edges are included. - - Args: - start_date: Start of the date range (YYYY-MM-DD). Should be a partition boundary - for top-ngrams queries; will be snapped for server-computed windows. - end_date: End of the date range (YYYY-MM-DD) - granularity: One of "daily", "weekly", "monthly" - - Returns: - Sorted list of partition start dates - - Example: - >>> compute_partition_starts("2024-10-07", "2024-10-27", "weekly") - ["2024-10-07", "2024-10-14", "2024-10-21"] - """ - start = datetime.strptime(start_date, "%Y-%m-%d") - end = datetime.strptime(end_date, "%Y-%m-%d") - - if granularity == "daily": - current = start - elif granularity == "weekly": - # Snap back to the Monday of the week containing start_date - current = start - timedelta(days=start.weekday()) - elif granularity == "monthly": - # Snap back to the first of the month containing start_date - current = start.replace(day=1) - else: - raise ValueError(f"Unknown granularity: {granularity}") - - partitions = [] - while current <= end: - partitions.append(current.strftime("%Y-%m-%d")) - if granularity == "daily": - current += timedelta(days=1) - elif granularity == "weekly": - current += timedelta(weeks=1) - elif granularity == "monthly": - if current.month == 12: - current = current.replace(year=current.year + 1, month=1, day=1) - else: - current = current.replace(month=current.month + 1, day=1) - - return partitions - - -class EntityMappingConfig(BaseModel): - table: Optional[str] = Field(None, description="DuckLake table name (ducklake format)") - path: Optional[str] = Field(None, description="Parquet file path (parquet_hive format)") - local_id_column: str = Field(...) - entity_id_column: str = Field(...) - - -class DatalakeCreate(BaseModel): - dataset_id: str - data_location: str - data_format: str = "ducklake" - description: Optional[str] = None - tables_metadata: Optional[Dict] = None - ducklake_data_path: Optional[str] = None - data_schema: Optional[Dict[str, str]] = None # Column name -> type mapping - partitioning: Optional[Dict] = None # Partitioning metadata - entity_mapping: Optional[EntityMappingConfig] = None - sources: Optional[Dict[str, Dict[str, Union[str, List[str]]]]] = None - - -@router.get("/") -async def list_datalakes(db: AsyncSession = Depends(get_db_session)): - """List all registered datalakes.""" - query = select(Datalake).order_by(Datalake.dataset_id) - result = await db.execute(query) - datalakes = result.scalars().all() - - return { - "datalakes": [ - { - "dataset_id": dl.dataset_id, - "data_location": dl.data_location, - "data_format": dl.data_format, - "description": dl.description, - "created_at": dl.created_at, - "updated_at": dl.updated_at - } - for dl in datalakes - ], - "total": len(datalakes) - } - - -@admin_router.post("/") -async def register_datalake( - datalake: DatalakeCreate, - current_user: User = Depends(get_admin_user), - db: AsyncSession = Depends(get_db_session) -): - """Register a new datalake or update existing one.""" - # Check if dataset_id already exists - existing_result = await db.execute( - select(Datalake).where(Datalake.dataset_id == datalake.dataset_id) - ) - existing_datalake = existing_result.scalar_one_or_none() - - if existing_datalake: - # Update existing datalake - existing_datalake.data_location = datalake.data_location - existing_datalake.data_format = datalake.data_format - existing_datalake.description = datalake.description - existing_datalake.tables_metadata = datalake.tables_metadata - existing_datalake.ducklake_data_path = datalake.ducklake_data_path - existing_datalake.data_schema = datalake.data_schema - existing_datalake.partitioning = datalake.partitioning - existing_datalake.entity_mapping = datalake.entity_mapping.model_dump() if datalake.entity_mapping else None - existing_datalake.sources = datalake.sources - - await db.commit() - await db.refresh(existing_datalake) - - return { - "message": f"Datalake '{datalake.dataset_id}' updated successfully" - } - else: - # Create new datalake entry - datalake_data = datalake.model_dump() - - db_datalake = Datalake(**datalake_data) - db.add(db_datalake) - await db.commit() - await db.refresh(db_datalake) - - return { - "message": f"Datalake '{datalake.dataset_id}' registered successfully", - "datalake": { - "dataset_id": db_datalake.dataset_id, - "data_location": db_datalake.data_location, - "data_format": db_datalake.data_format, - "description": db_datalake.description, - "data_schema": db_datalake.data_schema, - "ducklake_data_path": db_datalake.ducklake_data_path - } - } - -@router.get("/search-terms2") +@router.get("/search-terms2", deprecated=True) async def search_terms_batch( types: str = Query(..., description="Comma-separated list of ngram terms"), date: Optional[str] = Query(None, description="First system focus date (YYYY-MM-DD)"), @@ -272,12 +60,12 @@ async def search_terms_batch( if not systems_input: raise HTTPException(status_code=400, detail="At least one of date or date2 must be provided") - query = select(Datalake).where(Datalake.dataset_id == "wikigrams") + query = select(Datalake).where(Datalake.domain == "wikimedia", Datalake.dataset_id == "ngrams") result = await db.execute(query) datalake = result.scalar_one_or_none() if not datalake: - raise HTTPException(status_code=404, detail="Wikigrams datalake not found") + raise HTTPException(status_code=404, detail="'ngrams' dataset not found") granularity_mapping = { "daily": ("wikigrams", "date"), @@ -527,267 +315,7 @@ async def search_terms_batch( raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") -@router.get("/{dataset_id}") -async def get_datalake_info( - dataset_id: str, - db: AsyncSession = Depends(get_db_session) -): - """Get metadata for a specific datalake.""" - - # Look up datalake in registry - query = select(Datalake).where(Datalake.dataset_id == dataset_id) - result = await db.execute(query) - datalake = result.scalar_one_or_none() - - if not datalake: - raise HTTPException( - status_code=404, - detail=f"Datalake '{dataset_id}' not found" - ) - - # Return all stored metadata - return { - "dataset_id": datalake.dataset_id, - "data_location": datalake.data_location, - "data_format": datalake.data_format, - "description": datalake.description, - # "tables_metadata": datalake.tables_metadata, - "ducklake_data_path": datalake.ducklake_data_path, - "data_schema": datalake.data_schema, - "partitioning": datalake.partitioning, - "entity_mapping": datalake.entity_mapping, - "sources": datalake.sources, - "created_at": datalake.created_at, - "updated_at": datalake.updated_at - } - -@router.get("/{dataset_id}/adapter") -async def get_adapter_info( - dataset_id: str, - db: AsyncSession = Depends(get_db_session) -): - """Get metadata for a specific datalake.""" - - # Look up datalake in registry - query = select(Datalake).where(Datalake.dataset_id == dataset_id) - result = await db.execute(query) - datalake = result.scalar_one_or_none() - - if not datalake: - raise HTTPException( - status_code=404, - detail=f"Datalake '{dataset_id}' not found" - ) - - try: - # Get DuckDB connection - duckdb_client = get_duckdb_client() - conn = duckdb_client.connect() - - print(datalake) - # Use stored metadata to get exact file paths for current versions - if not datalake.tables_metadata: - raise HTTPException( - status_code=500, - detail="Datalake metadata is missing. Please re-register the datalake with proper tables_metadata." - ) - - if datalake.data_format == "parquet_hive": - if not datalake.entity_mapping or not datalake.entity_mapping.get("path"): - raise HTTPException( - status_code=500, - detail="Missing entity_mapping.path for parquet_hive format." - ) - adapter_path = [datalake.entity_mapping["path"]] - else: - adapter_fnames = datalake.tables_metadata.get("adapter") - if not adapter_fnames: - raise HTTPException( - status_code=500, - detail="Missing adapter file paths. Required: tables_metadata.adapter" - ) - adapter_path = [ - f"{datalake.data_location}/{datalake.ducklake_data_path}/main/adapter/{fname}" for fname in adapter_fnames - ] - - # Execute comparative queries - results = {} - - # Query for each combination of date ranges and locations - - sql_query = f""" - SELECT - * - FROM read_parquet(?) - """ - - cursor = conn.execute(sql_query, [adapter_path]) - query_results = cursor.fetchall() - - print(f"🔍 Final results object: {query_results}") - return query_results - - except Exception as e: - raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") - -@router.get("/babynames/top-ngrams") -async def get_babynames_top_ngrams( - dates: str = Query(default="1991,1993"), # First date range - dates2: Optional[str] = Query(default=None), # Optional second date range - locations: str = Query(default="wikidata:Q30"), # Single location - sex: Optional[str] = 'M', - limit: int = 100, - db: AsyncSession = Depends(get_db_session) -): - """Get top baby names with flexible comparative analysis. - - Supports: - - Single date range: dates=["1950,1952"] - - Dual date ranges: dates=["1950,1952", "1991,1993"] (requires single location) - - Single location: locations=["wikidata:Q30"] - - Multiple locations: locations=["wikidata:Q30", "wikidata:Q16"] (requires single date range) - - Examples: - - Temporal comparison: dates=["1950,1952", "1991,1993"]&locations=["wikidata:Q30"] - - Geographic comparison: dates=["1950,1952"]&locations=["wikidata:Q30", "wikidata:Q16"] - - Simple query: dates=["1950,1952"]&locations=["wikidata:Q30"] - - Returns structured data for comparison visualization. - """ - - # Parse dates parameters - # dates="1991,1993" - date_ranges = [] - - # Parse first date range - years1 = [int(y) for y in dates.split(',')] - if len(years1) == 1: - years1.append(years1[0]) # Single year becomes range [year, year] - date_ranges.append(years1) - - # Parse optional second date range - if dates2: - years2 = [int(y) for y in dates2.split(',')] - if len(years2) == 1: - years2.append(years2[0]) # Single year becomes range [year, year] - date_ranges.append(years2) - - # Single location (no longer a list) - # locations="wikidata:Q176" - location_list = [locations] - - # Look up babynames datalake - query = select(Datalake).where(Datalake.dataset_id == "babynames") - result = await db.execute(query) - datalake = result.scalar_one_or_none() - - if not datalake: - raise HTTPException(status_code=404, detail="Babynames datalake not found") - - try: - # Get DuckDB connection - duckdb_client = get_duckdb_client() - conn = duckdb_client.connect() - - print(datalake) - # Use stored metadata to get exact file paths for current versions - if not datalake.tables_metadata: - raise HTTPException( - status_code=500, - detail="Datalake metadata is missing. Please re-register the datalake with proper tables_metadata." - ) - - # babynames_fnames=["ducklake-019af69d-b42c-7877-9073-3f440e2ee162.parquet", "ducklake-019af69d-d7d5-7c8d-b38e-6ef9eca5d606.parquet"] - babynames_fnames = datalake.tables_metadata.get("babynames") - - adapter_fnames = datalake.tables_metadata.get("adapter") - - if not babynames_fnames or not adapter_fnames: - raise HTTPException( - status_code=500, - detail="Missing babynames or adapter file paths. Required: tables_metadata.babynames and tables_metadata.adapter" - ) - - # Construct full paths by combining data_location with the filenames - # For ducklake format, files are stored in metadata.ducklake.files/main/ subdirectories - # babynames_path="/users/j/s/jstonge1/babynames/metadata.ducklake.files/main/babynames/*parquet" - babynames_path = [ - f"{datalake.data_location}/{datalake.ducklake_data_path}/main/babynames/{fname}" for fname in babynames_fnames - ] - # adapter_path= "/users/j/s/jstonge1/babynames/metadata.ducklake.files/main/adapter/*parquet" - adapter_path = [ - f"{datalake.data_location}/{datalake.ducklake_data_path}/main/adapter/{fname}" for fname in adapter_fnames - ] - - # Execute comparative queries - results = {} - - # Query for each combination of date ranges and locations - for date_range in date_ranges: - for location in location_list: - # Create key for result structure - if len(date_ranges) > 1: - # Temporal comparison: use readable date format - if date_range[0] == date_range[1]: - key = str(date_range[0]) # Single year: "1990" - else: - key = f"{date_range[0]}-{date_range[1]}" # Range: "2010-2015" - elif len(location_list) > 1: - # Geographic comparison: use location ID - key = location.replace(":", "_").replace("-", "_") - else: - key = "data" # Single query, return simple format - - sql_query = f""" - SELECT - b.types, - SUM(b.counts) as counts - FROM read_parquet(?) b - LEFT JOIN read_parquet(?) a ON b.geo = a.local_id - WHERE b.year BETWEEN ? AND ? - AND a.entity_id = ? - """ - - if sex: - sql_query += " AND b.sex = ?" - - sql_query += f""" - GROUP BY b.types - ORDER BY counts DESC - LIMIT ? - """ - - print(babynames_path) - cursor = conn.execute(sql_query, [babynames_path, adapter_path, date_range[0], date_range[1], location, sex, limit]) - query_results = cursor.fetchall() - - - # Structure results for comparison or simple format - try: - if key == "data": - # Simple single query - return flat array for backwards compatibility - formatted_results = [] - for row in query_results: - formatted_results.append({"types": row[0], "counts": row[1]}) - return formatted_results - else: - # Comparative query - return array directly under the key - formatted_results = [] - for row in query_results: - formatted_results.append({"types": row[0], "counts": row[1]}) - results[key] = formatted_results - except Exception as format_error: - print(f"❌ Error formatting results: {format_error}") - print(f"❌ Raw query_results: {query_results}") - raise - - print(f"🔍 Final results object: {results}") - return results - - except Exception as e: - raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") - -@router.get("/wikigrams/top-ngrams") +@router.get("/wikigrams/top-ngrams", deprecated=True) async def get_wikigrams_top_ngrams( dates: str = Query(default="2024-11-01,2024-11-07"), # First date range dates2: Optional[str] = Query(default=None), # Optional second date range @@ -835,13 +363,13 @@ async def get_wikigrams_top_ngrams( # Single location (no longer a list) location_list = [locations] - # Look up wikigrams datalake - query = select(Datalake).where(Datalake.dataset_id == "wikigrams") + # Look up ngrams dataset + query = select(Datalake).where(Datalake.domain == "wikimedia", Datalake.dataset_id == "ngrams") result = await db.execute(query) datalake = result.scalar_one_or_none() if not datalake: - raise HTTPException(status_code=404, detail="Wikigrams datalake not found") + raise HTTPException(status_code=404, detail="'ngrams' dataset not found") # Map granularity to table name and time column granularity_mapping = { @@ -983,7 +511,7 @@ async def get_wikigrams_top_ngrams( except Exception as e: raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") -@router.get("/search-term/{term}") +@router.get("/search-term/{term}", deprecated=True) async def search_term( term: str, location: str = Query("wikidata:Q30", description="Location entity ID (e.g., wikidata:Q30 for United States)"), @@ -1014,13 +542,13 @@ async def search_term( except ValueError as e: raise HTTPException(status_code=400, detail=f"Invalid date format. Use YYYY-MM-DD: {e}") - # Look up wikigrams datalake - query = select(Datalake).where(Datalake.dataset_id == "wikigrams") + # Look up ngrams dataset + query = select(Datalake).where(Datalake.domain == "wikimedia", Datalake.dataset_id == "ngrams") result = await db.execute(query) datalake = result.scalar_one_or_none() if not datalake: - raise HTTPException(status_code=404, detail="Wikigrams datalake not found") + raise HTTPException(status_code=404, detail="'ngrams' dataset not found") granularity_mapping = { "daily": ("wikigrams", "date"), @@ -1148,20 +676,20 @@ async def search_term( # ── Wiki Revisions endpoints ───────────────────────────────────────────────── -# Registered as "wiki_revisions" datalake; Hive-partitioned by identifier. +# Registered as "revisions" dataset (domain=wikimedia); Hive-partitioned by identifier. async def _get_revisions_path(db: AsyncSession) -> str: """Look up revisions data path from datalake DB.""" - query = select(Datalake).where(Datalake.dataset_id == "wiki_revisions") + query = select(Datalake).where(Datalake.domain == "wikimedia", Datalake.dataset_id == "revisions") result = await db.execute(query) datalake = result.scalar_one_or_none() if not datalake: - raise HTTPException(status_code=404, detail="wiki_revisions datalake not found") + raise HTTPException(status_code=404, detail="'revisions' dataset not found") return datalake.data_location -@router.get("/wikigrams/revisions") +@router.get("/wikigrams/revisions", deprecated=True) async def list_revision_articles( min_revisions: int = Query(default=1, description="Minimum revision count filter"), limit: int = Query(default=100, description="Max articles to return"), @@ -1208,7 +736,7 @@ async def list_revision_articles( raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") -@router.get("/wikigrams/revisions/{identifier}") +@router.get("/wikigrams/revisions/{identifier}", deprecated=True) async def get_revision_deltas( identifier: str, db: AsyncSession = Depends(get_db_session), @@ -1298,103 +826,3 @@ async def get_revision_deltas( except Exception as e: raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") - -@router.get("/{dataset_id}/validate-sources") -async def validate_datalake_sources( - dataset_id: str, - db: AsyncSession = Depends(get_db_session) -): - """Validate that all source URLs for a datalake are still accessible. - - Returns status for each source URL in the datalake's sources metadata. - """ - # Look up datalake - query = select(Datalake).where(Datalake.dataset_id == dataset_id) - result = await db.execute(query) - datalake = result.scalar_one_or_none() - - if not datalake: - raise HTTPException( - status_code=404, - detail=f"Datalake '{dataset_id}' not found" - ) - - if not datalake.sources: - return { - "dataset_id": dataset_id, - "message": "No sources configured for validation", - "sources": {} - } - - validation_results = {} - - # Iterate through dimensions and their sources - for dimension, locations in datalake.sources.items(): - validation_results[dimension] = {} - - for location, urls in locations.items(): - # Handle both single URL (string) and multiple URLs (list) - url_list = [urls] if isinstance(urls, str) else urls - location_results = [] - - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - } - - async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers) as client: - for url in url_list: - method_used = "HEAD" - try: - # Try HEAD first (faster) - response = await client.head(url) - - # If HEAD fails with 403/405, try GET with Range (some servers block HEAD for downloads) - if response.status_code in [403, 405]: - method_used = "GET" - response = await client.get(url, headers={"Range": "bytes=0-0"}) - - location_results.append({ - "url": url, - "status": "accessible" if response.status_code < 400 else "error", - "status_code": response.status_code, - "method": method_used - }) - except httpx.TimeoutException: - location_results.append({ - "url": url, - "status": "timeout", - "error": "Request timed out after 10 seconds" - }) - except httpx.RequestError as e: - location_results.append({ - "url": url, - "status": "error", - "error": str(e) - }) - - validation_results[dimension][location] = location_results - - # Calculate summary - total_urls = sum( - len(locations) - for dimension_results in validation_results.values() - for locations in dimension_results.values() - ) - accessible_urls = sum( - 1 - for dimension_results in validation_results.values() - for locations in dimension_results.values() - for result in locations - if result.get("status") == "accessible" - ) - - return { - "dataset_id": dataset_id, - "summary": { - "total_urls": total_urls, - "accessible": accessible_urls, - "inaccessible": total_urls - accessible_urls, - "all_accessible": accessible_urls == total_urls - }, - "sources": validation_results - } \ No newline at end of file diff --git a/backend/app/routers/datasets.py b/backend/app/routers/datasets.py index 8582bea..84e2544 100644 --- a/backend/app/routers/datasets.py +++ b/backend/app/routers/datasets.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, HTTPException, Depends +from fastapi import APIRouter, HTTPException, Depends, Query from fastapi.responses import StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select @@ -7,7 +7,6 @@ import io import json from ..core.database import get_db_session -from ..core.duckdb_client import get_duckdb_client from ..models.annotation_datasets import AcademicResearchGroups, AcademicResearchGroupCreate, GoogleScholarVenues from ..routers.auth import get_admin_user, get_current_active_user from ..models.auth import User @@ -72,7 +71,6 @@ async def list_datasets(): "total": len(datasets) } -# Label Studio functions removed - no longer needed @admin_router.post("/academic-research-groups") async def create_academic_research_group( @@ -467,7 +465,8 @@ def parse_int(value): except Exception as e: await db.rollback() raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}") - + + #!TODO: fix once we have storage # @router.get("/s2orc/arxiv/stream") # async def stream_arxiv_texts( diff --git a/backend/app/routers/registry.py b/backend/app/routers/registry.py new file mode 100644 index 0000000..c097088 --- /dev/null +++ b/backend/app/routers/registry.py @@ -0,0 +1,267 @@ +""" +Dataset registry endpoints — discover and inspect registered file-based datasets. +""" + +from fastapi import APIRouter, HTTPException, Depends +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from typing import Dict, Any, Optional, List, Union +from pydantic import BaseModel, Field +import httpx + +from ..core.database import get_db_session +from ..core.duckdb_client import get_duckdb_client +from ..models.registry import Dataset +from ..routers.auth import get_admin_user +from ..models.auth import User + +router = APIRouter() +admin_router = APIRouter() + +# Domains that have actual registered routers. Update when adding a new router to main.py. +VALID_DOMAINS = { + "wikimedia", + "storywrangler", + "babynames", + "datalakes", + "open-academic-analytics", + "scisciDB", + "annotations", +} + + +class EntityMappingConfig(BaseModel): + table: Optional[str] = Field(None, description="DuckLake table name (ducklake format only)") + path: Optional[str] = Field(None, description="Absolute path to adapter parquet file (parquet_hive format only)") + local_id_column: str = Field(..., description="Column name in the adapter that holds the dataset-local identifier") + entity_id_column: str = Field(..., description="Column name in the adapter that holds the canonical entity ID (e.g. 'wikidata:Q30')") + + +class DatasetCreate(BaseModel): + """Register a dataset backend so API endpoints can discover and query it. + + The platform supports three data formats: + - parquet_hive: Hive-partitioned parquet tree on disk (most common for large tabular data) + - duckdb: Plain DuckDB database file + - ducklake: DuckLake catalog (versioned, schema-aware DuckDB extension) + + The (domain, dataset_id) pair must be unique. Use domain to group datasets by the + router/service that owns them (e.g. 'wikimedia', 'storywrangler'). + """ + dataset_id: str = Field(..., description="Short identifier, unique within domain. e.g. 'ngrams', 'revisions', 'babynames'") + domain: str = Field(..., description="Owning service or router. Groups related datasets. e.g. 'wikimedia', 'storywrangler', 'babynames'") + data_location: str = Field(..., description="Absolute path to the root of the dataset on disk (parquet_hive/duckdb) or connection string (ducklake)") + data_format: str = Field("parquet_hive", description="Storage format. One of: parquet_hive | duckdb | ducklake") + description: Optional[str] = Field(None, description="Human-readable description of the dataset") + tables_metadata: Optional[Dict] = Field( + None, + description=( + "parquet_hive only. Maps logical table names to lists of representative file paths. " + "Keys must match physical Hive partition directory names under data_location. " + "Example: {\"events\": [\"date=2024-01-01/geo=Q30/part.parquet\"], \"adapter\": [\"adapter.parquet\"]}" + ) + ) + ducklake_data_path: Optional[str] = Field(None, description="ducklake only. Path to the DuckLake catalog .duckdb file") + data_schema: Optional[Dict[str, str]] = Field(None, description="ducklake only. Column name → DuckDB type, for query reference. e.g. {\"title\": \"VARCHAR\", \"views\": \"BIGINT\"}") + partitioning: Optional[Dict] = Field(None, description="Partitioning scheme description. e.g. {\"keys\": [\"date\", \"geo\"], \"granularity\": \"daily\"}") + entity_mapping: Optional[EntityMappingConfig] = Field(None, description="How to map canonical entity IDs (e.g. Wikidata QIDs) to dataset-local identifiers. Required for geo/entity filtering endpoints.") + sources: Optional[Dict[str, Dict[str, Union[str, List[str]]]]] = Field(None, description="Source URLs for provenance/validation. e.g. {\"main\": {\"url\": \"https://...\"}}") + endpoint_schemas: Optional[List[Dict]] = Field(None, description="Endpoint types this dataset supports. Each entry: {type, time_dimension?, entity_dimensions?, filter_dimensions?}. e.g. [{\"type\": \"types-counts\", \"time_dimension\": \"date\", \"entity_dimensions\": [\"country\"]}]") + + +@admin_router.post("/register") +async def register_dataset( + dataset: DatasetCreate, + _current_user: User = Depends(get_admin_user), + db: AsyncSession = Depends(get_db_session) +): + """Register a new dataset or update an existing one.""" + if dataset.domain not in VALID_DOMAINS: + raise HTTPException( + status_code=422, + detail=f"Unknown domain '{dataset.domain}'. Valid domains: {sorted(VALID_DOMAINS)}" + ) + + existing_result = await db.execute( + select(Dataset).where(Dataset.domain == dataset.domain, Dataset.dataset_id == dataset.dataset_id) + ) + existing = existing_result.scalar_one_or_none() + + if existing: + existing.domain = dataset.domain + existing.data_location = dataset.data_location + existing.data_format = dataset.data_format + existing.description = dataset.description + existing.tables_metadata = dataset.tables_metadata + existing.ducklake_data_path = dataset.ducklake_data_path + existing.data_schema = dataset.data_schema + existing.partitioning = dataset.partitioning + existing.entity_mapping = dataset.entity_mapping.model_dump() if dataset.entity_mapping else None + existing.sources = dataset.sources + existing.endpoint_schemas = dataset.endpoint_schemas + + await db.commit() + await db.refresh(existing) + return {"message": f"Dataset '{dataset.dataset_id}' updated successfully"} + else: + db_dataset = Dataset(**dataset.model_dump()) + db.add(db_dataset) + await db.commit() + await db.refresh(db_dataset) + return { + "message": f"Dataset '{dataset.dataset_id}' registered successfully", + "dataset": { + "dataset_id": db_dataset.dataset_id, + "data_location": db_dataset.data_location, + "data_format": db_dataset.data_format, + "description": db_dataset.description, + "data_schema": db_dataset.data_schema, + "ducklake_data_path": db_dataset.ducklake_data_path + } + } + + +@router.get("/") +async def list_registered_datasets(db: AsyncSession = Depends(get_db_session)): + """List all registered datasets.""" + query = select(Dataset).order_by(Dataset.domain, Dataset.dataset_id) + result = await db.execute(query) + datasets = result.scalars().all() + return { + "datasets": [ + { + "domain": ds.domain, + "dataset_id": ds.dataset_id, + "data_location": ds.data_location, + "data_format": ds.data_format, + "description": ds.description, + "created_at": ds.created_at, + "updated_at": ds.updated_at, + } + for ds in datasets + ], + "total": len(datasets), + } + + +@router.get("/{domain}/{dataset_id}") +async def get_dataset_info( + domain: str, + dataset_id: str, + db: AsyncSession = Depends(get_db_session), +): + """Get metadata for a specific registered dataset.""" + result = await db.execute( + select(Dataset).where(Dataset.domain == domain, Dataset.dataset_id == dataset_id) + ) + ds = result.scalar_one_or_none() + if not ds: + raise HTTPException(status_code=404, detail=f"Dataset '{domain}/{dataset_id}' not found") + return { + "domain": ds.domain, + "dataset_id": ds.dataset_id, + "data_location": ds.data_location, + "data_format": ds.data_format, + "description": ds.description, + "ducklake_data_path": ds.ducklake_data_path, + "data_schema": ds.data_schema, + "partitioning": ds.partitioning, + "entity_mapping": ds.entity_mapping, + "sources": ds.sources, + "endpoint_schemas": ds.endpoint_schemas, + "created_at": ds.created_at, + "updated_at": ds.updated_at, + } + + +@router.get("/{domain}/{dataset_id}/adapter") +async def get_adapter_info( + domain: str, + dataset_id: str, + db: AsyncSession = Depends(get_db_session), +): + """Read the adapter table for a dataset (entity_id ↔ local_id mapping).""" + result = await db.execute( + select(Dataset).where(Dataset.domain == domain, Dataset.dataset_id == dataset_id) + ) + ds = result.scalar_one_or_none() + if not ds: + raise HTTPException(status_code=404, detail=f"Dataset '{domain}/{dataset_id}' not found") + + if not ds.tables_metadata: + raise HTTPException(status_code=500, detail="Dataset metadata is missing.") + + if ds.data_format == "parquet_hive": + if not ds.entity_mapping or not ds.entity_mapping.get("path"): + raise HTTPException(status_code=500, detail="Missing entity_mapping.path for parquet_hive format.") + adapter_path = [ds.entity_mapping["path"]] + else: + adapter_fnames = ds.tables_metadata.get("adapter") + if not adapter_fnames: + raise HTTPException(status_code=500, detail="Missing adapter file paths. Required: tables_metadata.adapter") + base = f"{ds.data_location}/{ds.ducklake_data_path}" if ds.ducklake_data_path else ds.data_location + adapter_path = [ + f"{base}/main/adapter/{fname}" for fname in adapter_fnames + ] + + try: + conn = get_duckdb_client().connect() + rows = conn.execute("SELECT * FROM read_parquet(?)", [adapter_path]).fetchall() + return rows + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + +@router.get("/{domain}/{dataset_id}/validate-sources") +async def validate_dataset_sources( + domain: str, + dataset_id: str, + db: AsyncSession = Depends(get_db_session), +): + """Validate that all source URLs for a dataset are still accessible.""" + result = await db.execute( + select(Dataset).where(Dataset.domain == domain, Dataset.dataset_id == dataset_id) + ) + ds = result.scalar_one_or_none() + if not ds: + raise HTTPException(status_code=404, detail=f"Dataset '{domain}/{dataset_id}' not found") + + if not ds.sources: + return {"domain": domain, "dataset_id": dataset_id, "message": "No sources configured", "sources": {}} + + validation_results: Dict[str, Any] = {} + headers = {"User-Agent": "Mozilla/5.0"} + + for dimension, locations in ds.sources.items(): + validation_results[dimension] = {} + for location, urls in locations.items(): + url_list = [urls] if isinstance(urls, str) else urls + location_results = [] + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers) as client: + for url in url_list: + method_used = "HEAD" + try: + response = await client.head(url) + if response.status_code in [403, 405]: + method_used = "GET" + response = await client.get(url, headers={"Range": "bytes=0-0"}) + location_results.append({ + "url": url, + "status": "accessible" if response.status_code < 400 else "error", + "status_code": response.status_code, + "method": method_used, + }) + except httpx.TimeoutException: + location_results.append({"url": url, "status": "timeout", "error": "Request timed out after 10 seconds"}) + except httpx.RequestError as e: + location_results.append({"url": url, "status": "error", "error": str(e)}) + validation_results[dimension][location] = location_results + + total_urls = sum(len(locs) for dim in validation_results.values() for locs in dim.values()) + accessible = sum(1 for dim in validation_results.values() for locs in dim.values() for r in locs if r.get("status") == "accessible") + return { + "domain": domain, + "dataset_id": dataset_id, + "summary": {"total_urls": total_urls, "accessible": accessible, "inaccessible": total_urls - accessible, "all_accessible": accessible == total_urls}, + "sources": validation_results, + } diff --git a/backend/app/routers/storywrangler.py b/backend/app/routers/storywrangler.py index fd6360a..2e12786 100644 --- a/backend/app/routers/storywrangler.py +++ b/backend/app/routers/storywrangler.py @@ -13,63 +13,43 @@ from ..core.database import get_db_session from ..core.duckdb_client import get_duckdb_client -from ..models.datalakes import Datalake -from .datalakes import get_parquet_paths, compute_partition_starts + +from ..models.registry import Dataset router = APIRouter() -def _load_ngrams(conn, datalake, table_name: str, time_column: str, +def _load_ngrams(conn, dataset_obj, granularity: str, time_column: str, date_range: List[str], location: str, limit: int) -> dict: - """Load top ngrams for one (date_range, location) system from a datalake. + """Load top ngrams for one (date_range, location) system from a dataset. Returns a dict with "types" and "counts" lists ready for allotax input. """ - ngrams_path, adapter_path = get_parquet_paths(datalake, table_name) + if not dataset_obj.entity_mapping or not dataset_obj.entity_mapping.get("path"): + raise HTTPException(status_code=500, detail="Dataset missing entity_mapping.path.") + + adapter_path = dataset_obj.entity_mapping["path"] # Resolve entity_id → local_id via adapter adapter_row = conn.execute( "SELECT local_id FROM read_parquet(?) WHERE entity_id = ? LIMIT 1", - [adapter_path, location] + [[adapter_path], location] ).fetchone() if not adapter_row: raise HTTPException(status_code=400, detail=f"Location '{location}' not found in adapter") - local_geo = quote(adapter_row[0], safe='') - - # Filter to relevant partition directories - granularity = {v[1]: k for k, v in { - "daily": ("wikigrams", "date"), - "weekly": ("wikigrams_weekly", "week"), - "monthly": ("wikigrams_monthly", "month"), - }.items()}.get(time_column, "daily") - partition_starts = compute_partition_starts(date_range[0], date_range[1], granularity) - - filtered_path = [ - p for p in ngrams_path - if any(f"{time_column}={ps}" in p for ps in partition_starts) - and f"geo={local_geo}" in p - ] - - if not filtered_path: - raise HTTPException( - status_code=400, - detail=f"No data found for {date_range[0]} to {date_range[1]} / location {location}" - ) + encoded_country = quote(adapter_row[0], safe='') + + glob_path = f"{dataset_obj.data_location}/{granularity}/country={encoded_country}/{time_column}=*/data_0.parquet" - # Use snapped partition boundaries in SQL — raw input dates won't match stored - # week/month column values (e.g. input "2024-11-07" vs stored "2024-11-04"). sql = f""" - SELECT w.types, SUM(w.counts) AS counts - FROM read_parquet(?) w - LEFT JOIN read_parquet(?) a ON w.geo = a.local_id - WHERE w.{time_column} BETWEEN ? AND ? - AND a.entity_id = ? - GROUP BY w.types + SELECT ngram, SUM(pv_count) AS counts + FROM read_parquet('{glob_path}') + WHERE {time_column} BETWEEN ? AND ? + GROUP BY ngram ORDER BY counts DESC LIMIT ? """ - rows = conn.execute(sql, [filtered_path, adapter_path, - partition_starts[0], partition_starts[-1], location, limit]).fetchall() + rows = conn.execute(sql, [date_range[0], date_range[1], limit]).fetchall() types = [r[0] for r in rows] counts = [float(r[1]) for r in rows] @@ -85,7 +65,8 @@ async def allotax_endpoint( dates2: str = Query(..., description="Date range for system 2, e.g. '2024-02-01,2024-02-28'"), location2: str = Query(..., description="Location entity ID for system 2, e.g. 'wikidata:Q16'"), # Dataset - dataset: str = Query("wikigrams", description="Datalake dataset_id to query"), + domain: str = Query("wikimedia", description="Domain owning the dataset, e.g. 'wikimedia'"), + dataset: str = Query("ngrams", description="Dataset ID within the domain, e.g. 'ngrams'"), granularity: str = Query("daily", description="Partition granularity: daily, weekly, monthly"), # Allotax params alpha: float = Query(1.0, description="RTD alpha parameter"), @@ -97,7 +78,7 @@ async def allotax_endpoint( ): """Compute allotaxonometer (rank-turbulence divergence) between two ngram distributions. - Loads raw ngrams server-side from the specified datalake, runs the full allotax + Loads raw ngrams server-side from the specified dataset, runs the full allotax pipeline in Rust via PyO3, and returns lean visualization data (~30-50KB). Response shape (single alpha): @@ -113,12 +94,7 @@ async def allotax_endpoint( if granularity not in ("daily", "weekly", "monthly"): raise HTTPException(status_code=400, detail="granularity must be daily, weekly, or monthly") - granularity_map = { - "daily": ("wikigrams", "date"), - "weekly": ("wikigrams_weekly", "week"), - "monthly": ("wikigrams_monthly", "month"), - } - table_name, time_column = granularity_map[granularity] + time_column = {"daily": "date", "weekly": "week", "monthly": "month"}[granularity] def parse_range(s: str) -> List[str]: parts = s.split(",") @@ -127,18 +103,13 @@ def parse_range(s: str) -> List[str]: dr1 = parse_range(dates) dr2 = parse_range(dates2) - # Look up datalake - result = await db.execute(select(Datalake).where(Datalake.dataset_id == dataset)) - datalake = result.scalar_one_or_none() - if not datalake: - raise HTTPException(status_code=404, detail=f"Datalake '{dataset}' not found") - - if not datalake.tables_metadata or table_name not in datalake.tables_metadata: - available = list(datalake.tables_metadata.keys()) if datalake.tables_metadata else [] - raise HTTPException( - status_code=400, - detail=f"Table '{table_name}' not available. Found: {available}" - ) + # Look up dataset + result = await db.execute( + select(Dataset).where(Dataset.domain == domain, Dataset.dataset_id == dataset) + ) + dataset_obj = result.scalar_one_or_none() + if not dataset_obj: + raise HTTPException(status_code=404, detail=f"Dataset '{domain}/{dataset}' not found") try: import allotax @@ -152,8 +123,8 @@ def parse_range(s: str) -> List[str]: duckdb_client = get_duckdb_client() conn = duckdb_client.connect() - sys1 = _load_ngrams(conn, datalake, table_name, time_column, dr1, location, ngram_limit) - sys2 = _load_ngrams(conn, datalake, table_name, time_column, dr2, location2, ngram_limit) + sys1 = _load_ngrams(conn, dataset_obj, granularity, time_column, dr1, location, ngram_limit) + sys2 = _load_ngrams(conn, dataset_obj, granularity, time_column, dr2, location2, ngram_limit) except HTTPException: raise @@ -174,6 +145,7 @@ def parse_range(s: str) -> List[str]: "meta": { "system1": {"dates": dates, "location": location, "ngrams": len(sys1["types"])}, "system2": {"dates": dates2, "location": location2, "ngrams": len(sys2["types"])}, + "domain": domain, "dataset": dataset, "granularity": granularity, } diff --git a/backend/app/routers/test_datalakes.py b/backend/app/routers/test_datalakes.py deleted file mode 100644 index 40321c3..0000000 --- a/backend/app/routers/test_datalakes.py +++ /dev/null @@ -1,161 +0,0 @@ -""" -Tests for datalakes API endpoints and helper functions. -""" - -import pytest -from datetime import datetime, timedelta -from ..routers.datalakes import parse_partition_values, find_overlapping_partitions - - -class TestPartitionHelpers: - """Test helper functions for partition parsing and filtering.""" - - def test_parse_partition_values_weekly(self): - """Test parsing week partition values from Hive-style paths.""" - file_paths = [ - "geo=United States/week=2025-03-03/data_0.parquet", - "geo=United States/week=2025-03-10/data_0.parquet", - "geo=Canada/week=2025-03-03/data_0.parquet", - "geo=Canada/week=2025-03-10/data_0.parquet", - ] - - result = parse_partition_values(file_paths, "week") - - assert result == ["2025-03-03", "2025-03-10"] - assert len(result) == 2 # Should deduplicate - - def test_parse_partition_values_monthly(self): - """Test parsing month partition values.""" - file_paths = [ - "geo=United States/month=2025-01-01/data_0.parquet", - "geo=United States/month=2025-02-01/data_0.parquet", - "geo=United States/month=2025-03-01/data_0.parquet", - ] - - result = parse_partition_values(file_paths, "month") - - assert result == ["2025-01-01", "2025-02-01", "2025-03-01"] - - def test_parse_partition_values_daily(self): - """Test parsing date partition values.""" - file_paths = [ - "geo=United States/date=2025-03-05/data_0.parquet", - "geo=United States/date=2025-03-06/data_0.parquet", - ] - - result = parse_partition_values(file_paths, "date") - - assert result == ["2025-03-05", "2025-03-06"] - - def test_parse_partition_values_empty(self): - """Test with no matching partitions.""" - file_paths = ["geo=United States/data_0.parquet"] - - result = parse_partition_values(file_paths, "week") - - assert result == [] - - def test_find_overlapping_partitions_daily(self): - """Test finding overlapping daily partitions.""" - available = ["2025-03-05", "2025-03-06", "2025-03-07", "2025-03-10"] - - result = find_overlapping_partitions( - "2025-03-05", "2025-03-07", available, "daily" - ) - - assert result == ["2025-03-05", "2025-03-06", "2025-03-07"] - assert "2025-03-10" not in result - - def test_find_overlapping_partitions_weekly_exact_match(self): - """Test weekly partitions with exact week boundary match.""" - # Week starts on Monday 2025-03-03, ends Sunday 2025-03-09 - available = ["2025-03-03", "2025-03-10", "2025-03-17"] - - # User requests data from Wed to next Wed (spans 2 weeks) - result = find_overlapping_partitions( - "2025-03-05", "2025-03-12", available, "weekly" - ) - - assert result == ["2025-03-03", "2025-03-10"] - - def test_find_overlapping_partitions_weekly_partial(self): - """Test weekly partitions with partial week overlap.""" - available = ["2025-03-03", "2025-03-10", "2025-03-17"] - - # User requests just 2 days within first week - result = find_overlapping_partitions( - "2025-03-05", "2025-03-06", available, "weekly" - ) - - # Should still include the week since it overlaps - assert result == ["2025-03-03"] - - def test_find_overlapping_partitions_monthly(self): - """Test monthly partitions.""" - available = ["2025-01-01", "2025-02-01", "2025-03-01"] - - # Request spans February - result = find_overlapping_partitions( - "2025-02-10", "2025-02-20", available, "monthly" - ) - - assert result == ["2025-02-01"] - - def test_find_overlapping_partitions_monthly_span_multiple(self): - """Test monthly partitions spanning multiple months.""" - available = ["2025-01-01", "2025-02-01", "2025-03-01"] - - # Request spans Jan 15 to Mar 15 (3 months) - result = find_overlapping_partitions( - "2025-01-15", "2025-03-15", available, "monthly" - ) - - assert result == ["2025-01-01", "2025-02-01", "2025-03-01"] - - def test_find_overlapping_partitions_no_overlap(self): - """Test when no partitions overlap with date range.""" - available = ["2025-01-01", "2025-02-01"] - - result = find_overlapping_partitions( - "2025-03-10", "2025-03-20", available, "monthly" - ) - - assert result == [] - - def test_find_overlapping_partitions_december_boundary(self): - """Test monthly partition calculation across year boundary.""" - available = ["2024-12-01", "2025-01-01"] - - # Request spans December - result = find_overlapping_partitions( - "2024-12-15", "2024-12-25", available, "monthly" - ) - - assert result == ["2024-12-01"] - assert "2025-01-01" not in result - - -# FastAPI endpoint tests would go here -# These require TestClient and mocked database/duckdb connections -# Example structure: -# -# from fastapi.testclient import TestClient -# from ..main import app -# -# client = TestClient(app) -# -# class TestWikigramsEndpoint: -# def test_wikigrams_daily_granularity(self): -# """Test querying with daily granularity.""" -# response = client.get( -# "/wikigrams/top-ngrams", -# params={ -# "dates": "2025-03-05,2025-03-06", -# "granularity": "daily", -# "locations": "wikidata:Q30" -# } -# ) -# assert response.status_code == 200 -# data = response.json() -# assert "metadata" in data -# assert data["metadata"]["granularity"] == "daily" diff --git a/backend/app/routers/wikimedia.py b/backend/app/routers/wikimedia.py index 46567c5..8c89024 100644 --- a/backend/app/routers/wikimedia.py +++ b/backend/app/routers/wikimedia.py @@ -1,14 +1,21 @@ -from fastapi import APIRouter, HTTPException, Query -from typing import Dict, List, Optional -from datetime import datetime -from pydantic import BaseModel, Field +from fastapi import APIRouter, HTTPException, Query, Depends +from typing import Dict, List, Optional, Any +from datetime import datetime, timedelta +from pydantic import BaseModel import time import asyncio -from ..core.database import get_mongo_client +from urllib.parse import quote +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from ..core.database import get_mongo_client, get_db_session +from ..core.duckdb_client import get_duckdb_client +from ..core.parquet_utils import compute_partition_starts +from ..models.registry import Dataset from better_profanity import profanity router = APIRouter() +WikimediaDataset = select(Dataset).where(Dataset.domain == "wikimedia") class NgramResult(BaseModel): types: str @@ -16,6 +23,7 @@ class NgramResult(BaseModel): probs: Optional[float] = None totalunique: Optional[int] = None +# ── mongoDB endpoints ──────────────────────────────────────────────── @router.get("/top-ngrams", response_model_exclude_unset=True) async def get_top_ngrams( @@ -271,3 +279,509 @@ def execute_rank_divergence(): except Exception as e: raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/top-ngrams2") +async def get_wikigrams_top_ngrams( + dates: str = Query(default="2024-11-01,2024-11-07"), + dates2: Optional[str] = Query(default=None), + locations: str = Query(default="wikidata:Q30"), + granularity: str = Query(default="daily"), + limit: int = 100, + db: AsyncSession = Depends(get_db_session) +): + """Get top Wikipedia n-grams with flexible comparative analysis. + + Supports single or dual date ranges, single location, and granularity selection. + Replaces /datalakes/wikigrams/top-ngrams. + """ + if granularity not in ["daily", "weekly", "monthly"]: + raise HTTPException(status_code=400, detail="granularity must be one of: daily, weekly, monthly") + + date_ranges = [] + dates_str1 = dates.split(',') + if len(dates_str1) == 1: + dates_str1.append(dates_str1[0]) + date_ranges.append(dates_str1) + + if dates2: + dates_str2 = dates2.split(',') + if len(dates_str2) == 1: + dates_str2.append(dates_str2[0]) + date_ranges.append(dates_str2) + + location_list = [locations] + + query = WikimediaDataset.where(Dataset.dataset_id == "ngrams") + result = await db.execute(query) + dataset_obj = result.scalar_one_or_none() + if not dataset_obj: + raise HTTPException(status_code=404, detail="'ngrams' dataset not found") + + time_column = {"daily": "date", "weekly": "week", "monthly": "month"}[granularity] + + if not dataset_obj.entity_mapping or not dataset_obj.entity_mapping.get("path"): + raise HTTPException(status_code=500, detail="Dataset missing entity_mapping.path. Please re-register.") + + try: + conn = get_duckdb_client().connect() + adapter_path = dataset_obj.entity_mapping["path"] + + results = {} + queried_partitions_metadata = [] + + for date_range in date_ranges: + for location in location_list: + row = conn.execute( + "SELECT local_id FROM read_parquet(?) WHERE entity_id = ? LIMIT 1", + [[adapter_path], location] + ).fetchone() + if not row: + raise HTTPException(status_code=400, detail=f"Location '{location}' not found in adapter") + encoded_country = quote(row[0], safe='') + + partition_starts = compute_partition_starts(date_range[0], date_range[1], granularity) + queried_partitions_metadata.append({"date_range": date_range, "partitions": partition_starts}) + + if len(date_ranges) > 1: + key = date_range[0] if date_range[0] == date_range[1] else f"{date_range[0]}_{date_range[1]}" + elif len(location_list) > 1: + key = location.replace(":", "_").replace("-", "_") + else: + key = "data" + + glob_path = f"{dataset_obj.data_location}/{granularity}/country={encoded_country}/{time_column}=*/data_0.parquet" + + rows = conn.execute(f""" + SELECT ngram, SUM(pv_count) as counts + FROM read_parquet('{glob_path}') + WHERE {time_column} BETWEEN ? AND ? + GROUP BY ngram + ORDER BY counts DESC + LIMIT ? + """, [date_range[0], date_range[1], limit]).fetchall() + + formatted = [{"types": r[0], "counts": r[1]} for r in rows] + + if key == "data": + return { + "data": formatted, + "metadata": {"granularity": granularity, "time_column": time_column, "queried_partitions": partition_starts} + } + else: + results[key] = formatted + + return { + **results, + "metadata": {"granularity": granularity, "time_column": time_column, "queries": queried_partitions_metadata} + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") + + +# ── DuckDB endpoints ──────────────────────────────────────────────── + +async def _get_revisions_path(db: AsyncSession) -> str: + """Look up revisions data path from dataset registry.""" + query = WikimediaDataset.where(Dataset.dataset_id == "revisions") + result = await db.execute(query) + rev_dataset = result.scalar_one_or_none() + if not rev_dataset: + raise HTTPException(status_code=404, detail="'revisions' dataset not found") + return rev_dataset.data_location + + +@router.get("/search-terms2") +async def search_terms_batch( + types: str = Query(..., description="Comma-separated list of ngram terms"), + date: Optional[str] = Query(None, description="First system focus date (YYYY-MM-DD)"), + date2: Optional[str] = Query(None, description="Second system focus date (YYYY-MM-DD)"), + location: str = Query("wikidata:Q30", description="First system location entity ID"), + location2: Optional[str] = Query(None, description="Second system location entity ID (defaults to location)"), + granularity: str = Query("daily", description="Granularity: daily, weekly, monthly"), + window_size: int = Query(7, description="Number of granularity periods before/after each focus date"), + db: AsyncSession = Depends(get_db_session) +): + """ + Batch sparkline lookup for multiple ngram terms across one or two systems. + + Two comparison modes: + - Temporal (date + date2, same location): ONE DuckDB scan — both windows' paths merged. + - Geographic (date, location + location2): TWO DuckDB scans — paths live in separate geo dirs. + + Results are keyed as system1/system2 so the frontend can render both sides + without coordinating parallel calls. + """ + if granularity not in ["daily", "weekly", "monthly"]: + raise HTTPException(status_code=400, detail="granularity must be one of: daily, weekly, monthly") + + for d_str in [date, date2]: + if d_str: + try: + datetime.fromisoformat(d_str) + except ValueError as e: + raise HTTPException(status_code=400, detail=f"Invalid date format. Use YYYY-MM-DD: {e}") + + terms = [t.strip() for t in types.split(",") if t.strip()] + if not terms: + raise HTTPException(status_code=400, detail="At least one term is required") + + systems_input: Dict[str, Dict] = {} + if date: + systems_input["system1"] = {"date": date, "location": location} + if date2: + systems_input["system2"] = {"date": date2, "location": location2 or location} + if not systems_input: + raise HTTPException(status_code=400, detail="At least one of date or date2 must be provided") + + query = WikimediaDataset.where(Dataset.dataset_id == "ngrams") + result = await db.execute(query) + dataset_obj = result.scalar_one_or_none() + + if not dataset_obj: + raise HTTPException(status_code=404, detail="Wikigrams dataset not found") + + time_column = {"daily": "date", "weekly": "week", "monthly": "month"}[granularity] + + has_top_articles = granularity == "daily" + + try: + if not dataset_obj.entity_mapping or not dataset_obj.entity_mapping.get("path"): + raise HTTPException(status_code=500, detail="Dataset missing entity_mapping.path. Please re-register.") + + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + adapter_path = dataset_obj.entity_mapping["path"] + path_prefix_index: Dict[str, List[str]] = {} + + placeholders = ",".join(["?" for _ in terms]) + start_time = time.time() + + unique_locations = {s["location"] for s in systems_input.values()} + geo_map: Dict[str, str] = {} + t_adapter = time.time() + for loc in unique_locations: + row = conn.execute( + "SELECT local_id FROM read_parquet(?) WHERE entity_id = ? LIMIT 1", + [[adapter_path], loc] + ).fetchone() + if not row: + raise HTTPException(status_code=400, detail=f"Location '{loc}' not found in adapter") + encoded_country = quote(row[0], safe='') + geo_map[loc] = encoded_country + glob_pat = f"{dataset_obj.data_location}/{granularity}/country={encoded_country}/{time_column}=*/data_0.parquet" + for (p,) in conn.execute("SELECT * FROM glob(?)", [glob_pat]).fetchall(): + dir_path = p.rsplit("/", 1)[0] + path_prefix_index.setdefault(dir_path, []).append(p) + t_adapter_ms = (time.time() - t_adapter) * 1000 + + window_unit_days = {"daily": 1, "weekly": 7, "monthly": 30}[granularity] + effective_window = window_size * window_unit_days + + per_system: Dict[str, Dict] = {} + t_filter = time.time() + for sys_key, system in systems_input.items(): + loc = system["location"] + local_geo = geo_map[loc] + focus_date = datetime.fromisoformat(system["date"]) + w_start = (focus_date - timedelta(days=effective_window)).strftime("%Y-%m-%d") + w_end = (focus_date + timedelta(days=effective_window)).strftime("%Y-%m-%d") + window_partitions = compute_partition_starts(w_start, w_end, granularity) + focus_partition = compute_partition_starts(system["date"], system["date"], granularity)[0] + + base = f"{dataset_obj.data_location}/{granularity}/country={local_geo}" + query_paths = [] + for ps in window_partitions: + query_paths.extend(path_prefix_index.get(f"{base}/{time_column}={ps}", [])) + + if not query_paths: + raise HTTPException(status_code=404, detail=f"No data found for {sys_key} ({system['date']}, {loc})") + + focus_paths = path_prefix_index.get(f"{base}/{time_column}={focus_partition}", []) + + per_system[sys_key] = { + "loc": loc, + "focus_date_str": system["date"], + "window_partitions": window_partitions, + "window_set": set(window_partitions), + "focus_partition": focus_partition, + "query_paths": query_paths, + "focus_paths": focus_paths, + } + t_filter_ms = (time.time() - t_filter) * 1000 + + all_geos = {geo_map[s["location"]] for s in systems_input.values()} + temporal_comparison = len(systems_input) == 2 and len(all_geos) == 1 + total_paths = sum(len(v) for v in path_prefix_index.values()) + print(f" setup: adapter+paths={t_adapter_ms:.0f}ms, filter={t_filter_ms:.0f}ms | total_paths={total_paths}") + + system_results: Dict[str, Dict] = {} + + if temporal_comparison: + s1 = per_system["system1"] + s2 = per_system["system2"] + combined_paths = sorted(set(s1["query_paths"]) | set(s2["query_paths"])) + range_start = min(s1["window_partitions"][0], s2["window_partitions"][0]) + range_end = max(s1["window_partitions"][-1], s2["window_partitions"][-1]) + + spark_sql = f""" + SELECT + w.ngram, + w.{time_column}, + MIN(w.pv_rank) AS rank, + SUM(w.pv_count) AS counts + FROM read_parquet(?) w + WHERE w.{time_column} BETWEEN ? AND ? + AND w.ngram IN ({placeholders}) + GROUP BY w.ngram, w.{time_column} + ORDER BY w.ngram, w.{time_column} + """ + t_query = time.time() + cursor = conn.execute(spark_sql, [combined_paths, range_start, range_end] + terms) + t_spark_ms = (time.time() - t_query) * 1000 + + rows = cursor.fetchall() + cols = [desc[0] for desc in cursor.description] + + for sys_key, meta in per_system.items(): + system_results[sys_key] = { + "date": meta["focus_date_str"], + "location": meta["loc"], + "sparkData": {t: [] for t in terms}, + "topArticles": {}, + } + + for row in rows: + d = dict(zip(cols, row)) + term = d["ngram"] + date_val = str(d[time_column]) + point = {time_column: d[time_column], "rank": d["rank"], "counts": d["counts"]} + if date_val in s1["window_set"]: + system_results["system1"]["sparkData"][term].append(point) + if date_val in s2["window_set"]: + system_results["system2"]["sparkData"][term].append(point) + + t_articles = time.time() + if has_top_articles: + focus_paths = sorted(set(s1["focus_paths"]) | set(s2["focus_paths"])) + if focus_paths: + try: + art_cursor = conn.execute(f""" + SELECT + w.ngram, + ARG_MIN(w.top_articles, w.pv_rank) FILTER (WHERE w.{time_column} = ?) AS top_articles_s1, + ARG_MIN(w.top_articles, w.pv_rank) FILTER (WHERE w.{time_column} = ?) AS top_articles_s2 + FROM read_parquet(?) w + WHERE w.ngram IN ({placeholders}) + GROUP BY w.ngram + """, [s1["focus_partition"], s2["focus_partition"], focus_paths] + terms) + for row in art_cursor.fetchall(): + d = dict(zip([c[0] for c in art_cursor.description], row)) + if d.get("top_articles_s1") is not None: + system_results["system1"]["topArticles"][d["ngram"]] = d["top_articles_s1"] + if d.get("top_articles_s2") is not None: + system_results["system2"]["topArticles"][d["ngram"]] = d["top_articles_s2"] + except Exception: + pass + t_articles_ms = (time.time() - t_articles) * 1000 + + print(f" temporal: {len(combined_paths)} paths, spark={t_spark_ms:.0f}ms, articles={t_articles_ms:.0f}ms ({len(s1['focus_paths'])+len(s2['focus_paths'])} focus files)") + + else: + for sys_key, meta in per_system.items(): + query_paths = meta["query_paths"] + + t_query = time.time() + cursor = conn.execute(f""" + SELECT + w.ngram, + w.{time_column}, + MIN(w.pv_rank) AS rank, + SUM(w.pv_count) AS counts + FROM read_parquet(?) w + WHERE w.{time_column} BETWEEN ? AND ? + AND w.ngram IN ({placeholders}) + GROUP BY w.ngram, w.{time_column} + ORDER BY w.ngram, w.{time_column} + """, [query_paths, meta["window_partitions"][0], meta["window_partitions"][-1]] + terms) + t_query_ms = (time.time() - t_query) * 1000 + + rows = cursor.fetchall() + cols = [desc[0] for desc in cursor.description] + + spark_data: Dict[str, List[Dict]] = {t: [] for t in terms} + for row in rows: + d = dict(zip(cols, row)) + spark_data[d["ngram"]].append({ + time_column: d[time_column], + "rank": d["rank"], + "counts": d["counts"], + }) + + top_articles: Dict[str, Any] = {} + t_articles = time.time() + if has_top_articles and meta["focus_paths"]: + try: + art_cursor = conn.execute(f""" + SELECT + w.ngram, + ARG_MIN(w.top_articles, w.pv_rank) AS top_articles + FROM read_parquet(?) w + WHERE w.{time_column} = ? + AND w.ngram IN ({placeholders}) + GROUP BY w.ngram + """, [meta["focus_paths"], meta["focus_partition"]] + terms) + for row in art_cursor.fetchall(): + d = dict(zip([c[0] for c in art_cursor.description], row)) + if d.get("top_articles") is not None: + top_articles[d["ngram"]] = d["top_articles"] + except Exception: + pass + t_articles_ms = (time.time() - t_articles) * 1000 + + system_results[sys_key] = { + "date": meta["focus_date_str"], + "location": meta["loc"], + "sparkData": spark_data, + "topArticles": top_articles, + } + print(f" {sys_key}: {len(query_paths)} paths, spark={t_query_ms:.0f}ms, articles={t_articles_ms:.0f}ms ({len(meta['focus_paths'])} focus files)") + + duration = (time.time() - start_time) * 1000 + print(f"searchTermsBatch total={duration:.2f}ms — {'temporal' if temporal_comparison else 'geographic'} for {len(terms)} terms × {len(systems_input)} systems") + + return {**system_results, "duration": duration} + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") + + +@router.get("/revisions") +async def list_revision_articles( + min_revisions: int = Query(default=1, description="Minimum revision count filter"), + limit: int = Query(default=100, description="Max articles to return"), + db: AsyncSession = Depends(get_db_session), +): + """List articles with extracted revision histories.""" + try: + query = WikimediaDataset.where(Dataset.dataset_id == "revisions") + result = await db.execute(query) + rev_dataset = result.scalar_one_or_none() + if not rev_dataset: + raise HTTPException(status_code=404, detail="'revisions' dataset not found") + + adapter_path = rev_dataset.entity_mapping["path"] + start_time = time.time() + + conn = get_duckdb_client().connect() + rows = conn.execute( + "SELECT identifier, name, revision_count, first_edit, last_edit FROM read_parquet(?) WHERE revision_count >= ? ORDER BY revision_count DESC LIMIT ?", + [adapter_path, min_revisions, limit] + ).fetchall() + + duration = (time.time() - start_time) * 1000 + articles = [ + {"identifier": r[0], "name": r[1], "revision_count": r[2], "first_edit": r[3], "last_edit": r[4]} + for r in rows + ] + return {"articles": articles, "total": len(articles), "duration": duration} + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + +@router.get("/revisions/{identifier}") +async def get_revision_deltas( + identifier: str, + db: AsyncSession = Depends(get_db_session), +): + """Delta-encoded revision history for one article. + + Returns one entry per revision. The first revision (revision_idx=0) contains + the full token map. Subsequent revisions contain only changed tokens + (value 0 = token removed). + """ + try: + revisions_path = await _get_revisions_path(db) + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + start_time = time.time() + + rows = conn.execute(f""" + WITH ordered AS ( + SELECT *, + ROW_NUMBER() OVER (ORDER BY revision_id::BIGINT) - 1 AS rev_seq, + json(ngram_counts)::MAP(VARCHAR, INTEGER) AS m + FROM read_parquet('{revisions_path}/identifier={identifier}/*.parquet') + ), + curr AS ( + SELECT rev_seq, + unnest(map_keys(m)) AS token, + unnest(map_values(m)) AS curr_count + FROM ordered + ), + prev AS ( + SELECT rev_seq + 1 AS rev_seq, + unnest(map_keys(m)) AS token, + unnest(map_values(m)) AS prev_count + FROM ordered + ), + diffs AS ( + SELECT COALESCE(c.rev_seq, p.rev_seq) AS rev_seq, + COALESCE(c.token, p.token) AS token, + COALESCE(c.curr_count, 0) AS new_count + FROM curr c + FULL OUTER JOIN prev p + ON c.rev_seq = p.rev_seq AND c.token = p.token + WHERE prev_count IS NULL + OR curr_count IS NULL + OR curr_count != prev_count + ), + delta_agg AS ( + SELECT rev_seq, + json_group_object(token, new_count) AS delta + FROM diffs + GROUP BY rev_seq + ) + SELECT o.revision_id, + o.name, + o.date_modified, + o.revision_comment, + o.categories, + COALESCE(d.delta, '{{}}') AS token_diff + FROM ordered o + LEFT JOIN delta_agg d ON o.rev_seq = d.rev_seq + ORDER BY o.rev_seq + """).fetchall() + + if not rows: + raise HTTPException(status_code=404, detail=f"No revisions found for identifier {identifier}") + + duration = (time.time() - start_time) * 1000 + + return { + "revisions": [ + { + "revision_id": r[0], + "name": r[1], + "date_modified": r[2], + "revision_comment": r[3], + "categories": r[4], + "token_diff": r[5], + } + for r in rows + ], + "duration": duration, + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") diff --git a/frontend/src/lib/stories/allotaxonometry/allotax.remote.js b/frontend/src/lib/stories/allotaxonometry/allotax.remote.js index f9780ae..be447d0 100644 --- a/frontend/src/lib/stories/allotaxonometry/allotax.remote.js +++ b/frontend/src/lib/stories/allotaxonometry/allotax.remote.js @@ -8,7 +8,7 @@ const API_BASE_URL = API_BASE || 'http://localhost:8000' export const getAdapter = query(async () => { - const url = `${API_BASE_URL}/datalakes/babynames/adapter` + const url = `${API_BASE_URL}/registry/babynames/babynames/adapter` console.log('Fetching available locations:', url) const response = await fetch(url) @@ -40,7 +40,7 @@ export const getTopBabyNames = query( limit: limit, }) - const url = `${API_BASE_URL}/datalakes/babynames/top-ngrams?${params.toString()}` + const url = `${API_BASE_URL}/babynames/top-ngrams?${params.toString()}` const response = await fetch(url) if (!response.ok) {