Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions builders/server/core/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ def build(
dataset_version: str,
start: str = Query(...),
end: str = Query(...),
dry_run: bool = Query(False, alias="dry-run"),
):
"""Build missing data for a dataset in the given time range."""
"""Build missing data for a dataset in the given time range.

With ``dry-run=true``, builders write to an in-memory store, not the database.
"""
try:
version = SemVer.parse(dataset_version)
except ValueError as exc:
Expand All @@ -59,18 +63,32 @@ def build(
) from exc

structlog.contextvars.bind_contextvars(
dataset_name=dataset_name, version=str(version)
dataset_name=dataset_name, version=str(version), dry_run=dry_run
)

try:
build_dataset(dataset_name, version, start_ts, end_ts)
produced = build_dataset(
dataset_name, version, start_ts, end_ts, dry_run=dry_run
)
except NoValidTimestampsError as e:
raise HTTPException(status_code=422, detail=str(e)) from e
except Exception as e:
logger.exception("build failed")
raise HTTPException(status_code=500, detail=str(e)) from e

return {"status": "ok"}
if not dry_run:
return {"status": "ok"}

rows = [
{"timestamp": ts.isoformat(), "data": data_list}
for ts, data_list in sorted((produced or {}).items())
]
return {
"dataset_name": dataset_name,
"dataset_version": str(version),
"dry_run": True,
"rows": rows,
}


@router.get("/data/{dataset_name}/{dataset_version}")
Expand Down
13 changes: 11 additions & 2 deletions builders/server/core/service/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import core.db.datasets
from core.runtime import registry
from core.service.orchestrator import run_build
from core.service.store import MemoryStore, PostgresStore
from core.service.timestamps import NoValidTimestampsError, generate_timestamps
from core.utils.semver import SemVer

Expand Down Expand Up @@ -35,9 +36,17 @@ def build_dataset(
dataset_version: SemVer,
start: datetime,
end: datetime,
) -> None:
*,
dry_run: bool = False,
) -> dict[datetime, list[dict]] | None:
"""Public entrypoint for building a dataset and its dependencies."""
run_build(dataset_name, dataset_version, start, end)
if not dry_run:
run_build(dataset_name, dataset_version, start, end, store=PostgresStore())
return None

store = MemoryStore()
run_build(dataset_name, dataset_version, start, end, store=store)
return store.get_rows_range(dataset_name, dataset_version, start, end)


def get_data(
Expand Down
8 changes: 7 additions & 1 deletion builders/server/core/service/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import structlog

from core.service.scheduler import schedule_build
from core.service.store import PostgresStore, Store
from core.service.worker import execute_job
from core.utils.semver import SemVer

Expand All @@ -32,6 +33,7 @@ def run_build(
dataset_version: SemVer,
start: datetime,
end: datetime,
store: Store | None = None,
) -> None:
"""Orchestrate a full build: schedule, then execute level by level.

Expand All @@ -50,12 +52,16 @@ def run_build(
dataset_version: version of the root dataset
start: requested build start time
end: requested build end time
store: data backend threaded down to each worker. defaults to
``PostgresStore`` for real builds, ``MemoryStore`` for dry runs.

Raises:
ValueError: if end < start_date for any dataset (from scheduler)
RuntimeError: if any job fails during execution
NoValidTimestampsError: if a job has no valid calendar timestamps
"""
if store is None:
store = PostgresStore()
plan = schedule_build(dataset_name, dataset_version, start, end)
cancelled = threading.Event()

Expand All @@ -75,7 +81,7 @@ def run_build(
)

for job in jobs:
result = execute_job(job, cancelled)
result = execute_job(job, cancelled, store)

if not result.success:
cancelled.set()
Expand Down
176 changes: 176 additions & 0 deletions builders/server/core/service/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Data store abstraction for the build path.

``Store`` puts the build path's data operations behind an ABC with two implementations:

- ``PostgresStore``: a thin shell over real build operations.
- ``MemoryStore``: an in-process dict, unrelated to the DB.

Workers hold a ``store`` instead of calling the DB directly,
"""

import json
from abc import ABC, abstractmethod
from collections import defaultdict
from contextlib import AbstractContextManager, nullcontext
from datetime import datetime

import core.db.datasets
from core.service.locks import get_build_lock
from core.utils.semver import SemVer


class Store(ABC):
"""Interface for the build path's data operations."""

@abstractmethod
def get_existing_timestamps(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> list[datetime]:
"""Return distinct timestamps in [start, end] that already have rows."""
...

@abstractmethod
def get_rows_range(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> dict[datetime, list[dict]]:
"""Return rows for [start, end], keyed by timestamp."""
...

@abstractmethod
def get_rows_timestamps(
self,
name: str,
version: SemVer,
timestamps: list[datetime],
) -> dict[datetime, list[dict]]:
"""Return rows for specific timestamps, keyed by timestamp."""
...

@abstractmethod
def insert_rows(
self,
name: str,
version: SemVer,
rows: list[tuple[datetime, list[dict]]],
) -> None:
"""Insert (timestamp, list[dict]) rows for a dataset."""
...

@abstractmethod
def build_lock(self, name: str, version: SemVer) -> AbstractContextManager:
"""Return the critical-section lock for this dataset's build."""
...


class PostgresStore(Store):
"""Real-build implementation that hits the DB"""

def get_existing_timestamps(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> list[datetime]:
return core.db.datasets.get_existing_timestamps(name, version, start, end)

def get_rows_range(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> dict[datetime, list[dict]]:
return core.db.datasets.get_rows_range(name, version, start, end)

def get_rows_timestamps(
self,
name: str,
version: SemVer,
timestamps: list[datetime],
) -> dict[datetime, list[dict]]:
return core.db.datasets.get_rows_timestamps(name, version, timestamps)

def insert_rows(
self,
name: str,
version: SemVer,
rows: list[tuple[datetime, list[dict]]],
) -> None:
core.db.datasets.insert_rows(name, version, rows)

def build_lock(self, name: str, version: SemVer) -> AbstractContextManager:
return get_build_lock(name, str(version))


class MemoryStore(Store):
"""Dry-run implementation: holds produced rows in a dict, never hits the DB."""

def __init__(self) -> None:
self._data: dict[tuple[str, str], dict[datetime, list[dict]]] = defaultdict(
lambda: defaultdict(list)
)

def get_existing_timestamps(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> list[datetime]:
table = self._data.get((name, str(version)), {})
return sorted(ts for ts, rows in table.items() if rows and start <= ts <= end)

def get_rows_range(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> dict[datetime, list[dict]]:
table = self._data.get((name, str(version)), {})
return {
ts: list(table[ts])
for ts in sorted(table)
if table[ts] and start <= ts <= end
}

def get_rows_timestamps(
self,
name: str,
version: SemVer,
timestamps: list[datetime],
) -> dict[datetime, list[dict]]:
if not timestamps:
return {}
table = self._data.get((name, str(version)), {})
wanted = set(timestamps)
return {
ts: list(table[ts]) for ts in sorted(table) if table[ts] and ts in wanted
}

def insert_rows(
self,
name: str,
version: SemVer,
rows: list[tuple[datetime, list[dict]]],
) -> None:
if not rows:
return
table = self._data[(name, str(version))]
for ts, data_list in rows:
for data in data_list:
# json round-trip to mirror Postgres Jsonb serialization
table[ts].append(json.loads(json.dumps(data)))

def build_lock(self, name: str, version: SemVer) -> AbstractContextManager:
"""Stub to prevent interference with real build lock"""
return nullcontext()
Loading