Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions backend/app/core/parquet_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
General-purpose DuckDB / Hive-parquet utilities shared across routers.

These helpers are dataset-agnostic — they work with any parquet_hive or
ducklake-format dataset registered in the system.
"""

from datetime import datetime, timedelta
from typing import List, Tuple
from fastapi import HTTPException


def get_parquet_paths(dataset, data_table_name: str) -> Tuple[List[str], List[str]]:
    """Resolve the parquet file lists for a data table and its adapter table.

    Two layouts are handled:

    * ``parquet_hive``: the entries stored under
      ``tables_metadata[data_table_name]`` are returned unchanged, and the
      adapter list is the single value found at ``entity_mapping["path"]``.
    * any other ``data_format`` (treated as ducklake): the entries stored in
      ``tables_metadata`` are file names, so each one is prefixed with
      ``<data_location>/main/<table_name>/``; the adapter files come from
      ``tables_metadata["adapter"]``.

    Args:
        dataset: Registry record exposing ``tables_metadata``, ``data_format``,
            ``entity_mapping`` and ``data_location``.
        data_table_name: Key of the data table inside ``tables_metadata``.

    Returns:
        A ``(data_paths, adapter_paths)`` tuple.

    Raises:
        HTTPException: Status 500 when ``tables_metadata`` is empty, when the
            requested table has no files, or when the adapter location
            required by the active format is missing.
    """
    metadata = dataset.tables_metadata
    if not metadata:
        raise HTTPException(
            status_code=500,
            detail="Dataset metadata is missing. Please re-register the dataset with proper tables_metadata.",
        )

    table_files = metadata.get(data_table_name)
    if not table_files:
        raise HTTPException(
            status_code=500,
            detail=f"Missing {data_table_name} file paths. Required: tables_metadata.{data_table_name}",
        )

    if dataset.data_format == "parquet_hive":
        mapping = dataset.entity_mapping
        mapped_adapter = mapping.get("path") if mapping else None
        if not mapped_adapter:
            raise HTTPException(
                status_code=500,
                detail="Missing entity_mapping.path for parquet_hive format. Please re-register with entity_mapping.",
            )
        # Hive metadata is handed back as stored; no prefix is applied.
        return table_files, [mapped_adapter]

    # Everything else follows the ducklake layout.
    adapter_files = metadata.get("adapter")
    if not adapter_files:
        raise HTTPException(
            status_code=500,
            detail="Missing adapter file paths. Required: tables_metadata.adapter",
        )

    # NOTE: the location is deliberately NOT URL-decoded - the directories on
    # disk really do contain "%20" in their names.
    root = f"{dataset.data_location}/main"
    data_paths = [f"{root}/{data_table_name}/{fname}" for fname in table_files]
    adapter_paths = [f"{root}/adapter/{fname}" for fname in adapter_files]
    return data_paths, adapter_paths


def compute_partition_starts(start_date: str, end_date: str, granularity: str) -> List[str]:
    """List the partition start dates that cover ``[start_date, end_date]``.

    The result is produced purely by date arithmetic; nothing checks that a
    partition actually exists for each returned date. The first partition is
    aligned to the granularity so that a partial leading period is included:

    * ``"daily"``: no alignment, starts at ``start_date`` itself.
    * ``"weekly"``: moved back to the Monday of ``start_date``'s week.
    * ``"monthly"``: moved back to the first day of ``start_date``'s month.

    Partitions are emitted while their start is on or before ``end_date``.

    Args:
        start_date: First day of the range, formatted ``YYYY-MM-DD``.
        end_date: Last day of the range, formatted ``YYYY-MM-DD``.
        granularity: One of ``"daily"``, ``"weekly"`` or ``"monthly"``.

    Returns:
        Partition start dates as ``YYYY-MM-DD`` strings in ascending order;
        empty when the aligned start falls after ``end_date``.

    Raises:
        ValueError: If a date does not match ``YYYY-MM-DD`` or the
            granularity is not recognised.

    Example:
        >>> compute_partition_starts("2024-10-07", "2024-10-27", "weekly")
        ['2024-10-07', '2024-10-14', '2024-10-21']
    """
    date_format = "%Y-%m-%d"
    # Both dates are parsed before the granularity is validated.
    range_start = datetime.strptime(start_date, date_format)
    range_end = datetime.strptime(end_date, date_format)

    def _next_day(moment):
        return moment + timedelta(days=1)

    def _next_week(moment):
        return moment + timedelta(weeks=1)

    def _next_month(moment):
        # December rolls over into January of the following year.
        if moment.month == 12:
            return moment.replace(year=moment.year + 1, month=1, day=1)
        return moment.replace(month=moment.month + 1, day=1)

    # Pick the aligned first partition and the stepping rule once up front.
    if granularity == "daily":
        cursor, advance = range_start, _next_day
    elif granularity == "weekly":
        # weekday() is 0 for Monday, so this lands on the week's Monday.
        cursor, advance = range_start - timedelta(days=range_start.weekday()), _next_week
    elif granularity == "monthly":
        cursor, advance = range_start.replace(day=1), _next_month
    else:
        raise ValueError(f"Unknown granularity: {granularity}")

    starts = []
    while cursor <= range_end:
        starts.append(cursor.strftime(date_format))
        cursor = advance(cursor)
    return starts
6 changes: 4 additions & 2 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
limiter = Limiter(key_func=get_remote_address)

# Import routers after limiter is defined so they can use it
from .routers import open_academic_analytics, datasets, auth, wikimedia, annotations, dark_data_survey, scisciDB, datalakes, interdisciplinarity, storywrangler
from .routers import open_academic_analytics, datasets, auth, wikimedia, annotations, dark_data_survey, scisciDB, datalakes, interdisciplinarity, storywrangler, babynames, registry

app = FastAPI(
title=settings.app_name,
Expand Down Expand Up @@ -83,7 +83,9 @@ async def shutdown_event():
app.include_router(auth.router, prefix="/auth", tags=["authentication"], include_in_schema=False)
app.include_router(open_academic_analytics.router, prefix="/open-academic-analytics", tags=["academics"])
app.include_router(datasets.router, prefix="/datasets", tags=["datasets"])
app.include_router(registry.router, prefix="/registry", tags=["registry"])
app.include_router(datalakes.router, prefix="/datalakes", tags=["datalakes"])
app.include_router(babynames.router, prefix="/babynames", tags=["babynames"])
app.include_router(wikimedia.router, prefix="/wikimedia", tags=["wikimedia"])
app.include_router(annotations.router, prefix="/annotations", tags=["annotations"])
app.include_router(scisciDB.router, prefix="/scisciDB", tags=["scisciDB"])
Expand All @@ -95,8 +97,8 @@ async def shutdown_event():
# Admin endpoints (secured with admin authentication)
app.include_router(auth.admin_router, prefix="/admin/auth", tags=["admin"], include_in_schema=False)
app.include_router(datasets.admin_router, prefix="/admin/datasets", tags=["admin"], include_in_schema=False)
app.include_router(registry.admin_router, prefix="/admin/registry", tags=["admin"], include_in_schema=False)
app.include_router(scisciDB.admin_router, prefix="/admin/scisciDB", tags=["admin"], include_in_schema=False)
app.include_router(datalakes.admin_router, prefix="/admin/datalakes", tags=["admin"], include_in_schema=False)
app.include_router(open_academic_analytics.admin_router, prefix="/admin/open-academic-analytics", tags=["admin"], include_in_schema=False)
app.include_router(annotations.admin_router, prefix="/admin/annotations", tags=["admin"], include_in_schema=False)

Expand Down
4 changes: 2 additions & 2 deletions backend/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .annotation_datasets import AcademicResearchGroups
from .auth import User, AnnotationHistory
from .sciscidb import FieldYearCount
from .datalakes import Datalake
from .registry import Dataset
from .interdisciplinarity import PaperAnnotation
from .cached_papers import CachedPaper

Expand All @@ -21,4 +21,4 @@
"PaperAnnotation",
"CachedPaper",
"FieldYearCount",
"Datalake"]
"Dataset"]
46 changes: 0 additions & 46 deletions backend/app/models/datalakes.py

This file was deleted.

82 changes: 82 additions & 0 deletions backend/app/models/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
Generic dataset registry.

A Dataset is any external data backend registered with the platform.

Supported formats:
- ducklake : DuckLake catalog (tabular, versioned, schema-aware)
- parquet_hive : Hive-partitioned parquet tree (tabular, file-based)
- duckdb : Plain DuckDB database file (no tables/schema metadata needed)

Primary key is (domain, dataset_id) so the same logical name (e.g. "ngrams") can
exist in multiple domains without collision.

Common fields (all formats):
domain, dataset_id, data_location, data_format, description,
entity_mapping, sources, endpoint_schemas, created_at, updated_at.

Structured-data fields (ducklake, parquet_hive):
tables_metadata, partitioning.

DuckLake-specific:
ducklake_data_path, data_schema.
"""

from sqlalchemy import Column, String, DateTime, Text, JSON, ForeignKeyConstraint
from sqlalchemy.sql import func
from ..core.database import Base


class Dataset(Base):
    """A dataset registered with the platform, stored in the ``datasets`` table.

    Rows are identified by the composite primary key ``(domain, dataset_id)``.
    """

    __tablename__ = "datasets"

    # --- Identity and location -------------------------------------------
    # Owning domain, e.g. wikimedia | storywrangler | ...
    domain = Column(String, primary_key=True)
    # Name within the domain, e.g. ngrams | revisions | ...
    dataset_id = Column(String, primary_key=True)
    # Base path or connection string.
    data_location = Column(String, nullable=False)
    # One of: ducklake | parquet_hive | duckdb.
    data_format = Column(String, nullable=False)
    description = Column(Text)

    # --- Structured/tabular formats (ducklake, parquet_hive) --------------
    # table_name -> [file_paths], or version info.
    tables_metadata = Column(JSON)
    # Partitioning keys and scheme.
    partitioning = Column(JSON)

    # --- DuckLake only ----------------------------------------------------
    # Path to the ducklake catalog (.duckdb file).
    ducklake_data_path = Column(String)
    # column_name -> type, kept for query reference.
    data_schema = Column(JSON)

    # --- Optional metadata shared by every format -------------------------
    # {path, local_id_column, entity_id_column}
    entity_mapping = Column(JSON)
    # Source URLs used for validation.
    sources = Column(JSON)
    # [{type, time_dimension, entity_dimensions, filter_dimensions}]
    endpoint_schemas = Column(JSON)

    # --- Timestamps -------------------------------------------------------
    # created_at is filled by a server-side default; updated_at only has an
    # onupdate hook and no default.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def __repr__(self):
        """Return a short debug string with the domain, id and format."""
        return "<Dataset(domain='{}', id='{}', format='{}')>".format(
            self.domain, self.dataset_id, self.data_format
        )


class EntityMapping(Base):
    """A single entity-mapping row belonging to a registered dataset.

    Stored in the ``entity_mappings`` table. The ``(domain, dataset_id)`` pair
    is a composite foreign key into ``datasets``.
    """

    __tablename__ = "entity_mappings"
    __table_args__ = (
        # Composite foreign key to the owning row in the datasets table.
        ForeignKeyConstraint(
            ["domain", "dataset_id"],
            ["datasets.domain", "datasets.dataset_id"],
        ),
    )

    # Key of the form domain:dataset_id:local_id.
    id = Column(String, primary_key=True)
    domain = Column(String, nullable=False)
    dataset_id = Column(String, nullable=False)
    local_id = Column(String, nullable=False)
    # Standardized identifier.
    entity_id = Column(String, nullable=False)
    entity_name = Column(String, nullable=False)
    # Alternate identifiers.
    entity_ids = Column(JSON)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    def __repr__(self):
        """Return a short debug string with the local and entity ids."""
        return "<EntityMapping(local='{}', entity='{}')>".format(
            self.local_id, self.entity_id
        )
Loading
Loading