diff --git a/backend/app/routers/datalakes.py b/backend/app/routers/datalakes.py index 9142ce6..bbd812a 100644 --- a/backend/app/routers/datalakes.py +++ b/backend/app/routers/datalakes.py @@ -8,9 +8,9 @@ from typing import Dict, Any, Optional, List, Union from pydantic import BaseModel, Field import httpx -import re +import time from datetime import datetime, timedelta -from urllib.parse import unquote +from urllib.parse import quote from ..core.database import get_db_session from ..core.duckdb_client import get_duckdb_client from ..models.datalakes import Datalake, EntityMapping @@ -25,8 +25,8 @@ def get_parquet_paths(datalake, data_table_name: str): """Construct parquet file paths for data table and adapter table. - The paths stored in tables_metadata are relative paths from the ducklake data directory. - We prepend data_location to make them absolute. + For parquet_hive format: paths in tables_metadata are absolute; adapter comes from entity_mapping.path. + For ducklake format: paths are relative filenames; prepend data_location + /main/ + table_name/. """ if not datalake.tables_metadata: raise HTTPException( @@ -35,25 +35,40 @@ def get_parquet_paths(datalake, data_table_name: str): ) data_fnames = datalake.tables_metadata.get(data_table_name) - adapter_fnames = datalake.tables_metadata.get("adapter") - if not data_fnames or not adapter_fnames: + if not data_fnames: raise HTTPException( status_code=500, - detail=f"Missing {data_table_name} or adapter file paths. Required: tables_metadata.{data_table_name} and tables_metadata.adapter" + detail=f"Missing {data_table_name} file paths. Required: tables_metadata.{data_table_name}" ) - # Paths in tables_metadata are relative paths like "geo=US/date=2024-11-01/file.parquet" - # We need to construct: {data_location}/main/{table_name}/{relative_path} - # NOTE: Don't URL-decode - the filesystem actually has %20 in directory names - data_path = [ - f"{datalake.data_location}/main/{data_table_name}/{fname}" - for fname in data_fnames - ] - adapter_path = [ - f"{datalake.data_location}/main/adapter/{fname}" - for fname in adapter_fnames - ] + if datalake.data_format == "parquet_hive": + # Paths are already absolute + data_path = data_fnames + # Adapter path comes from entity_mapping.path + if not datalake.entity_mapping or not datalake.entity_mapping.get("path"): + raise HTTPException( + status_code=500, + detail="Missing entity_mapping.path for parquet_hive format. Please re-register with entity_mapping." + ) + adapter_path = [datalake.entity_mapping["path"]] + else: + # ducklake format: relative filenames, prepend data_location + # NOTE: Don't URL-decode - the filesystem actually has %20 in directory names + adapter_fnames = datalake.tables_metadata.get("adapter") + if not adapter_fnames: + raise HTTPException( + status_code=500, + detail="Missing adapter file paths. Required: tables_metadata.adapter" + ) + data_path = [ + f"{datalake.data_location}/main/{data_table_name}/{fname}" + for fname in data_fnames + ] + adapter_path = [ + f"{datalake.data_location}/main/adapter/{fname}" + for fname in adapter_fnames + ] return data_path, adapter_path @@ -70,122 +85,60 @@ def format_results(query_results, key: str): # Comparative query - return under key return {key: formatted_results}, False # False = continue aggregating -def parse_partition_values(file_paths: List[str], partition_key: str) -> List[str]: - """Extract unique partition values from Hive-style file paths. - - Args: - file_paths: List of Hive-partitioned paths like ["geo=US/week=2025-03-03/data.parquet"] - partition_key: Partition key to extract (e.g., "week", "month", "date") - - Returns: - Sorted list of unique partition values +def compute_partition_starts(start_date: str, end_date: str, granularity: str) -> List[str]: + """Compute partition start dates covering [start_date, end_date] using date arithmetic. - Example: - >>> parse_partition_values(["geo=US/week=2025-03-03/data.parquet"], "week") - ["2025-03-03"] - """ - pattern = re.compile(f"{partition_key}=([^/]+)") - values = set() - for path in file_paths: - match = pattern.search(path) - if match: - values.add(match.group(1)) - return sorted(values) - -def filter_paths_by_partitions( - file_paths: List[str], - partition_values: List[str], - partition_key: str -) -> List[str]: - """Filter file paths to only include those matching the given partition values. + Assumes no gaps in the data. For weekly/monthly granularities, snaps start_date + back to the nearest partition boundary so that partial periods at the edges are included. Args: - file_paths: List of Hive-partitioned file paths - partition_values: List of partition values to keep (e.g., ["2024-11-01", "2024-11-02"]) - partition_key: Partition key to match (e.g., "date", "week", "month") - - Returns: - Filtered list of file paths - - Example: - >>> filter_paths_by_partitions( - ... ["geo=US/date=2024-11-01/data.parquet", "geo=US/date=2024-10-01/data.parquet"], - ... ["2024-11-01"], - ... "date" - ... ) - ["geo=US/date=2024-11-01/data.parquet"] - """ - filtered = [] - pattern = re.compile(f"{partition_key}=([^/]+)") - - for path in file_paths: - match = pattern.search(path) - if match and match.group(1) in partition_values: - filtered.append(path) - - return filtered - - -def find_overlapping_partitions( - user_start_date: str, - user_end_date: str, - partition_values: List[str], - granularity: str -) -> List[str]: - """Find partition values that overlap with user's date range. - - Args: - user_start_date: Start date in ISO format (YYYY-MM-DD) - user_end_date: End date in ISO format (YYYY-MM-DD) - partition_values: Available partition values from parse_partition_values() + start_date: Start of the date range (YYYY-MM-DD). Should be a partition boundary + for top-ngrams queries; will be snapped for server-computed windows. + end_date: End of the date range (YYYY-MM-DD) granularity: One of "daily", "weekly", "monthly" Returns: - List of partition values that overlap with the date range + Sorted list of partition start dates Example: - >>> find_overlapping_partitions("2025-03-05", "2025-03-12", - ... ["2025-03-03", "2025-03-10"], "weekly") - ["2025-03-03", "2025-03-10"] + >>> compute_partition_starts("2024-10-07", "2024-10-27", "weekly") + ["2024-10-07", "2024-10-14", "2024-10-21"] """ - overlapping = [] + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + if granularity == "daily": + current = start + elif granularity == "weekly": + # Snap back to the Monday of the week containing start_date + current = start - timedelta(days=start.weekday()) + elif granularity == "monthly": + # Snap back to the first of the month containing start_date + current = start.replace(day=1) + else: + raise ValueError(f"Unknown granularity: {granularity}") - for partition_value in partition_values: + partitions = [] + while current <= end: + partitions.append(current.strftime("%Y-%m-%d")) if granularity == "daily": - # Partition value is a date like "2025-03-05" - if user_start_date <= partition_value <= user_end_date: - overlapping.append(partition_value) - + current += timedelta(days=1) elif granularity == "weekly": - # Partition value is week start (Monday) like "2025-03-03" - week_start = partition_value - week_end = (datetime.strptime(week_start, "%Y-%m-%d") + timedelta(days=6)).strftime("%Y-%m-%d") - # Include week if it overlaps: week_start <= user_end AND week_end >= user_start - if week_start <= user_end_date and week_end >= user_start_date: - overlapping.append(partition_value) - + current += timedelta(weeks=1) elif granularity == "monthly": - # Partition value is month start like "2025-03-01" - month_start = partition_value - # Calculate last day of month - month_dt = datetime.strptime(month_start, "%Y-%m-%d") - if month_dt.month == 12: - next_month = month_dt.replace(year=month_dt.year + 1, month=1, day=1) + if current.month == 12: + current = current.replace(year=current.year + 1, month=1, day=1) else: - next_month = month_dt.replace(month=month_dt.month + 1, day=1) - month_end = (next_month - timedelta(days=1)).strftime("%Y-%m-%d") - # Include month if it overlaps - if month_start <= user_end_date and month_end >= user_start_date: - overlapping.append(partition_value) + current = current.replace(month=current.month + 1, day=1) - return overlapping + return partitions class EntityMappingConfig(BaseModel): - """Configuration for entity mapping table""" - table: str = Field(..., description="Name of the adapter table containing entity mappings") - local_id_column: str = Field(..., description="Column name for local identifiers") - entity_id_column: str = Field(..., description="Column name for standardized entity identifiers") + table: Optional[str] = Field(None, description="DuckLake table name (ducklake format)") + path: Optional[str] = Field(None, description="Parquet file path (parquet_hive format)") + local_id_column: str = Field(...) + entity_id_column: str = Field(...) class DatalakeCreate(BaseModel): @@ -195,7 +148,8 @@ class DatalakeCreate(BaseModel): description: Optional[str] = None tables_metadata: Optional[Dict] = None ducklake_data_path: Optional[str] = None - schema: Optional[Dict[str, str]] = None # Column name -> type mapping + data_schema: Optional[Dict[str, str]] = None # Column name -> type mapping + partitioning: Optional[Dict] = None # Partitioning metadata entity_mapping: Optional[EntityMappingConfig] = None sources: Optional[Dict[str, Dict[str, Union[str, List[str]]]]] = None @@ -243,7 +197,8 @@ async def register_datalake( existing_datalake.description = datalake.description existing_datalake.tables_metadata = datalake.tables_metadata existing_datalake.ducklake_data_path = datalake.ducklake_data_path - existing_datalake.schema = datalake.schema + existing_datalake.data_schema = datalake.data_schema + existing_datalake.partitioning = datalake.partitioning existing_datalake.entity_mapping = datalake.entity_mapping.model_dump() if datalake.entity_mapping else None existing_datalake.sources = datalake.sources @@ -257,12 +212,6 @@ async def register_datalake( # Create new datalake entry datalake_data = datalake.model_dump() - # Convert Pydantic models to dicts for JSON storage - if datalake_data.get('entity_mapping'): - datalake_data['entity_mapping'] = datalake_data['entity_mapping'] - if datalake_data.get('dimensions'): - datalake_data['dimensions'] = datalake_data['dimensions'] - db_datalake = Datalake(**datalake_data) db.add(db_datalake) await db.commit() @@ -275,7 +224,7 @@ async def register_datalake( "data_location": db_datalake.data_location, "data_format": db_datalake.data_format, "description": db_datalake.description, - "schema": db_datalake.schema, + "data_schema": db_datalake.data_schema, "ducklake_data_path": db_datalake.ducklake_data_path } } @@ -306,7 +255,8 @@ async def get_datalake_info( "description": datalake.description, # "tables_metadata": datalake.tables_metadata, "ducklake_data_path": datalake.ducklake_data_path, - "schema": datalake.schema, + "data_schema": datalake.data_schema, + "partitioning": datalake.partitioning, "entity_mapping": datalake.entity_mapping, "sources": datalake.sources, "created_at": datalake.created_at, @@ -375,11 +325,6 @@ async def get_adapter_info( except Exception as e: raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") - finally: - try: - duckdb_client.close() - except: - pass @router.get("/babynames/top-ngrams") async def get_babynames_top_ngrams( @@ -474,8 +419,8 @@ async def get_babynames_top_ngrams( results = {} # Query for each combination of date ranges and locations - for i, date_range in enumerate(date_ranges): - for j, location in enumerate(location_list): + for date_range in date_ranges: + for location in location_list: # Create key for result structure if len(date_ranges) > 1: # Temporal comparison: use readable date format @@ -518,13 +463,13 @@ async def get_babynames_top_ngrams( if key == "data": # Simple single query - return flat array for backwards compatibility formatted_results = [] - for i, row in enumerate(query_results): + for row in query_results: formatted_results.append({"types": row[0], "counts": row[1]}) return formatted_results else: # Comparative query - return array directly under the key formatted_results = [] - for i, row in enumerate(query_results): + for row in query_results: formatted_results.append({"types": row[0], "counts": row[1]}) results[key] = formatted_results except Exception as format_error: @@ -537,11 +482,6 @@ async def get_babynames_top_ngrams( except Exception as e: raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") - finally: - try: - duckdb_client.close() - except: - pass @router.get("/wikigrams/top-ngrams") async def get_wikigrams_top_ngrams( @@ -631,67 +571,41 @@ async def get_wikigrams_top_ngrams( # Get file paths using helper function wikigrams_path, adapter_path = get_parquet_paths(datalake, table_name) - # Parse available partition values from file paths - available_partitions = parse_partition_values( - datalake.tables_metadata[table_name], - time_column - ) - - print(f"🔍 DEBUG: Available partitions ({len(available_partitions)} total): {available_partitions[:10] if len(available_partitions) > 10 else available_partitions}...") - # Execute comparative queries results = {} queried_partitions_metadata = [] # Query for each combination of date ranges and locations - for i, date_range in enumerate(date_ranges): - for j, location in enumerate(location_list): - print(f"🔍 DEBUG: User requested date range: {date_range[0]} to {date_range[1]}") - - # Find partitions that overlap with this date range - overlapping_partitions = find_overlapping_partitions( - date_range[0], - date_range[1], - available_partitions, - granularity - ) - - print(f"🔍 DEBUG: Overlapping partitions found: {overlapping_partitions}") - - if not overlapping_partitions: + for date_range in date_ranges: + for location in location_list: + # Resolve entity_id → local_id (geo name) via adapter + adapter_row = conn.execute( + "SELECT local_id FROM read_parquet(?) WHERE entity_id = ? LIMIT 1", + [adapter_path, location] + ).fetchone() + if not adapter_row: + raise HTTPException(status_code=400, detail=f"Location '{location}' not found in adapter") + local_geo = quote(adapter_row[0], safe='') + + # Compute partition directories by stepping through the date range + partition_starts = compute_partition_starts(date_range[0], date_range[1], granularity) + + # Filter paths to partition directories and geo + filtered_wikigrams_path = [ + p for p in wikigrams_path + if any(f"{time_column}={ps}" in p for ps in partition_starts) + and f"geo={local_geo}" in p + ] + + if not filtered_wikigrams_path: raise HTTPException( status_code=400, - detail=f"No {granularity} data available for date range {date_range[0]} to {date_range[1]}" + detail=f"No {granularity} data found for {date_range[0]} to {date_range[1]}" ) - # Filter file paths to only include the overlapping partitions - filtered_wikigrams_path = filter_paths_by_partitions( - wikigrams_path, - overlapping_partitions, - time_column - ) - - # Map entity_id to local geo name for file path filtering - # This is a hardcoded mapping - ideally we'd query the adapter table - entity_to_geo = { - "wikidata:Q30": "United%20States", - "wikidata:Q145": "United%20Kingdom", - "wikidata:Q16": "Canada", - "wikidata:Q408": "Australia" - } - - local_geo = entity_to_geo.get(location) - if local_geo: - # Further filter by geo partition - filtered_wikigrams_path = [ - path for path in filtered_wikigrams_path - if f"geo={local_geo}" in path - ] - print(f"🔍 DEBUG: Filtered to {len(filtered_wikigrams_path)} files for geo={local_geo}") - queried_partitions_metadata.append({ "date_range": date_range, - "partitions": overlapping_partitions + "partitions": partition_starts }) # Create key for result structure @@ -707,24 +621,20 @@ async def get_wikigrams_top_ngrams( else: key = "data" # Single query, return simple format - # Build IN clause for partition filtering - partition_placeholders = ",".join(["?" for _ in overlapping_partitions]) - sql_query = f""" SELECT w.types, SUM(w.counts) as counts FROM read_parquet(?) w LEFT JOIN read_parquet(?) a ON w.geo = a.local_id - WHERE w.{time_column} IN ({partition_placeholders}) + WHERE w.{time_column} BETWEEN ? AND ? AND a.entity_id = ? GROUP BY w.types ORDER BY counts DESC LIMIT ? """ - # Build parameter list: [filtered_wikigrams_path, adapter_path, ...partition_values, location, limit] - params = [filtered_wikigrams_path, adapter_path] + overlapping_partitions + [location, limit] + params = [filtered_wikigrams_path, adapter_path, date_range[0], date_range[1], location, limit] cursor = conn.execute(sql_query, params) query_results = cursor.fetchall() @@ -735,7 +645,7 @@ async def get_wikigrams_top_ngrams( if key == "data": # Simple single query - return with metadata formatted_results = [] - for i, row in enumerate(query_results): + for row in query_results: formatted_results.append({"types": row[0], "counts": row[1]}) return { "data": formatted_results, @@ -743,13 +653,13 @@ async def get_wikigrams_top_ngrams( "granularity": granularity, "table_used": table_name, "time_column": time_column, - "queried_partitions": overlapping_partitions + "queried_partitions": partition_starts } } else: # Comparative query - return array directly under the key formatted_results = [] - for i, row in enumerate(query_results): + for row in query_results: formatted_results.append({"types": row[0], "counts": row[1]}) results[key] = formatted_results except Exception as format_error: @@ -770,11 +680,170 @@ async def get_wikigrams_top_ngrams( except Exception as e: raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") - finally: + +@router.get("/search-term/{term}") +async def search_term( + term: str, + location: str = Query("wikidata:Q30", description="Location entity ID (e.g., wikidata:Q30 for United States)"), + date: Optional[str] = Query(None, description="Optional date filter (YYYY-MM-DD)"), + granularity: str = Query("daily", description="Granularity: daily, weekly, monthly"), + window_size: int = Query(7, description="Number of granularity periods before/after the chosen date for spark plot data (e.g. 7 = 7 days for daily, 7 weeks for weekly, 7 months for monthly)"), + db: AsyncSession = Depends(get_db_session) +): + """ + Search for a specific ngram term in the wikigrams datalake. + + Args: + term: The ngram term to search for + location: Location entity ID (default: wikidata:Q30 for United States) + date: Optional date filter in YYYY-MM-DD format. If omitted, returns all dates. + granularity: Partition granularity - daily, weekly, or monthly (default: daily) + window_days: Days before/after the chosen date for spark plot rank data (default: 7) + + Returns: + Dictionary containing termData, sparkData (rank over window), and query duration + """ + if granularity not in ["daily", "weekly", "monthly"]: + raise HTTPException(status_code=400, detail="granularity must be one of: daily, weekly, monthly") + + if date: try: - duckdb_client.close() - except: - pass + datetime.fromisoformat(date) + except ValueError as e: + raise HTTPException(status_code=400, detail=f"Invalid date format. Use YYYY-MM-DD: {e}") + + # Look up wikigrams datalake + query = select(Datalake).where(Datalake.dataset_id == "wikigrams") + result = await db.execute(query) + datalake = result.scalar_one_or_none() + + if not datalake: + raise HTTPException(status_code=404, detail="Wikigrams datalake not found") + + granularity_mapping = { + "daily": ("wikigrams", "date"), + "weekly": ("wikigrams_weekly", "week"), + "monthly": ("wikigrams_monthly", "month") + } + table_name, time_column = granularity_mapping[granularity] + + try: + duckdb_client = get_duckdb_client() + conn = duckdb_client.connect() + + if not datalake.tables_metadata: + raise HTTPException(status_code=500, detail="Datalake metadata is missing.") + + if table_name not in datalake.tables_metadata: + available = [k for k in datalake.tables_metadata.keys() if k.startswith("wikigrams")] + raise HTTPException( + status_code=400, + detail=f"Table '{table_name}' not found. Available: {available}." + ) + + wikigrams_path, adapter_path = get_parquet_paths(datalake, table_name) + + # Resolve entity_id → local_id (geo name) via adapter + adapter_row = conn.execute( + "SELECT local_id FROM read_parquet(?) WHERE entity_id = ? LIMIT 1", + [adapter_path, location] + ).fetchone() + if not adapter_row: + raise HTTPException(status_code=400, detail=f"Location '{location}' not found in adapter") + local_geo = quote(adapter_row[0], safe='') + wikigrams_path = [p for p in wikigrams_path if f"geo={local_geo}" in p] + + # When a date is given, compute the window partition directories arithmetically. + # Use the wider window paths for both queries so we only open parquet files once. + window_start = window_end = None + partition_date = None # date snapped to the partition boundary for time_column filter + if date: + focus_date = datetime.fromisoformat(date) + # Scale window to match the granularity unit + window_unit_days = {"daily": 1, "weekly": 7, "monthly": 30}[granularity] + effective_window = window_size * window_unit_days + window_start = (focus_date - timedelta(days=effective_window)).strftime("%Y-%m-%d") + window_end = (focus_date + timedelta(days=effective_window)).strftime("%Y-%m-%d") + window_partitions = compute_partition_starts(window_start, window_end, granularity) + # Snap focus date to partition boundary (e.g. Tuesday → Monday for weekly) + partition_date = compute_partition_starts(date, date, granularity)[0] + query_paths = [ + p for p in wikigrams_path + if any(f"{time_column}={ps}" in p for ps in window_partitions) + ] + else: + query_paths = wikigrams_path + + if not query_paths: + raise HTTPException(status_code=404, detail="No data files found for the given filters") + + start_time = time.time() + + # --- Main query: counts for the specific date (or all dates) --- + sql_query = f""" + SELECT + w.types, + w.{time_column}, + w.geo, + SUM(w.counts) as counts + FROM read_parquet(?) w + LEFT JOIN read_parquet(?) a ON w.geo = a.local_id + WHERE w.types = ? + AND a.entity_id = ? + """ + params = [query_paths, adapter_path, term, location] + + if partition_date: + sql_query += f" AND w.{time_column} = ?" + params.append(partition_date) + + sql_query += f" GROUP BY w.types, w.{time_column}, w.geo ORDER BY w.{time_column}" + + cursor = conn.execute(sql_query, params) + query_results = cursor.fetchall() + columns = [desc[0] for desc in cursor.description] + + if not query_results: + raise HTTPException(status_code=404, detail="Search term not found") + + term_data = [dict(zip(columns, row)) for row in query_results] + + # --- Spark query: rank of this term for each date in the window --- + spark_data = [] + if partition_date and window_start and window_end: + # query_paths is already filtered to the correct geo (lines above filter by local_geo), + # so no adapter JOIN is needed here — avoids a full adapter parquet scan per row. + spark_sql = f""" + SELECT + w.{time_column}, + MIN(w.rank) AS rank, + SUM(w.counts) AS counts + FROM read_parquet(?) w + WHERE w.{time_column} BETWEEN ? AND ? + AND w.types = ? + GROUP BY w.{time_column} + ORDER BY w.{time_column} + """ + spark_params = [query_paths, window_partitions[0], window_partitions[-1], term] + spark_cursor = conn.execute(spark_sql, spark_params) + spark_results = spark_cursor.fetchall() + spark_cols = [desc[0] for desc in spark_cursor.description] + spark_data = [dict(zip(spark_cols, row)) for row in spark_results] + + duration = (time.time() - start_time) * 1000 + print(f"searchTerm query took {duration:.2f}ms") + + return { + "termData": term_data, + "sparkData": spark_data, + "duration": duration + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}") + @router.get("/{dataset_id}/validate-sources") async def validate_datalake_sources(