From e008b58d192f2fbcb42245f8665108eba4cc3752 Mon Sep 17 00:00:00 2001 From: Scr4tch587 Date: Mon, 22 Jun 2026 19:03:21 -0400 Subject: [PATCH] feat: add Store abstraction for build data access Co-Authored-By: Claude Opus 4.8 --- builders/server/core/service/orchestrator.py | 8 +- builders/server/core/service/store.py | 176 ++++++++++++++++ builders/server/core/service/worker.py | 29 +-- .../tests/core/service/test_orchestrator.py | 2 +- .../server/tests/core/service/test_store.py | 197 ++++++++++++++++++ 5 files changed, 398 insertions(+), 14 deletions(-) create mode 100644 builders/server/core/service/store.py create mode 100644 builders/server/tests/core/service/test_store.py diff --git a/builders/server/core/service/orchestrator.py b/builders/server/core/service/orchestrator.py index 57e8b7d..659f093 100644 --- a/builders/server/core/service/orchestrator.py +++ b/builders/server/core/service/orchestrator.py @@ -21,6 +21,7 @@ import structlog from core.service.scheduler import schedule_build +from core.service.store import PostgresStore, Store from core.service.worker import execute_job from core.utils.semver import SemVer @@ -32,6 +33,7 @@ def run_build( dataset_version: SemVer, start: datetime, end: datetime, + store: Store | None = None, ) -> None: """Orchestrate a full build: schedule, then execute level by level. @@ -50,12 +52,16 @@ def run_build( dataset_version: version of the root dataset start: requested build start time end: requested build end time + store: data backend threaded down to each worker. defaults to + ``PostgresStore`` for real builds, ``MemoryStore`` for dry runs. Raises: ValueError: if end < start_date for any dataset (from scheduler) RuntimeError: if any job fails during execution NoValidTimestampsError: if a job has no valid calendar timestamps """ + if store is None: + store = PostgresStore() plan = schedule_build(dataset_name, dataset_version, start, end) cancelled = threading.Event() @@ -75,7 +81,7 @@ def run_build( ) for job in jobs: - result = execute_job(job, cancelled) + result = execute_job(job, cancelled, store) if not result.success: cancelled.set() diff --git a/builders/server/core/service/store.py b/builders/server/core/service/store.py new file mode 100644 index 0000000..7f69702 --- /dev/null +++ b/builders/server/core/service/store.py @@ -0,0 +1,176 @@ +"""Data store abstraction for the build path. + +``Store`` puts the build path's data operations behind an ABC with two implementations: + +- ``PostgresStore``: a thin shell over real build operations. +- ``MemoryStore``: an in-process dict, unrelated to the DB. + +Workers hold a ``store`` instead of calling the DB directly, +""" + +import json +from abc import ABC, abstractmethod +from collections import defaultdict +from contextlib import AbstractContextManager, nullcontext +from datetime import datetime + +import core.db.datasets +from core.service.locks import get_build_lock +from core.utils.semver import SemVer + + +class Store(ABC): + """Interface for the build path's data operations.""" + + @abstractmethod + def get_existing_timestamps( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> list[datetime]: + """Return distinct timestamps in [start, end] that already have rows.""" + ... + + @abstractmethod + def get_rows_range( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> dict[datetime, list[dict]]: + """Return rows for [start, end], keyed by timestamp.""" + ... + + @abstractmethod + def get_rows_timestamps( + self, + name: str, + version: SemVer, + timestamps: list[datetime], + ) -> dict[datetime, list[dict]]: + """Return rows for specific timestamps, keyed by timestamp.""" + ... + + @abstractmethod + def insert_rows( + self, + name: str, + version: SemVer, + rows: list[tuple[datetime, list[dict]]], + ) -> None: + """Insert (timestamp, list[dict]) rows for a dataset.""" + ... + + @abstractmethod + def build_lock(self, name: str, version: SemVer) -> AbstractContextManager: + """Return the critical-section lock for this dataset's build.""" + ... + + +class PostgresStore(Store): + """Real-build implementation that hits the DB""" + + def get_existing_timestamps( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> list[datetime]: + return core.db.datasets.get_existing_timestamps(name, version, start, end) + + def get_rows_range( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> dict[datetime, list[dict]]: + return core.db.datasets.get_rows_range(name, version, start, end) + + def get_rows_timestamps( + self, + name: str, + version: SemVer, + timestamps: list[datetime], + ) -> dict[datetime, list[dict]]: + return core.db.datasets.get_rows_timestamps(name, version, timestamps) + + def insert_rows( + self, + name: str, + version: SemVer, + rows: list[tuple[datetime, list[dict]]], + ) -> None: + core.db.datasets.insert_rows(name, version, rows) + + def build_lock(self, name: str, version: SemVer) -> AbstractContextManager: + return get_build_lock(name, str(version)) + + +class MemoryStore(Store): + """Dry-run implementation: holds produced rows in a dict, never hits the DB.""" + + def __init__(self) -> None: + self._data: dict[tuple[str, str], dict[datetime, list[dict]]] = defaultdict( + lambda: defaultdict(list) + ) + + def get_existing_timestamps( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> list[datetime]: + table = self._data.get((name, str(version)), {}) + return sorted(ts for ts, rows in table.items() if rows and start <= ts <= end) + + def get_rows_range( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> dict[datetime, list[dict]]: + table = self._data.get((name, str(version)), {}) + return { + ts: list(table[ts]) + for ts in sorted(table) + if table[ts] and start <= ts <= end + } + + def get_rows_timestamps( + self, + name: str, + version: SemVer, + timestamps: list[datetime], + ) -> dict[datetime, list[dict]]: + if not timestamps: + return {} + table = self._data.get((name, str(version)), {}) + wanted = set(timestamps) + return { + ts: list(table[ts]) for ts in sorted(table) if table[ts] and ts in wanted + } + + def insert_rows( + self, + name: str, + version: SemVer, + rows: list[tuple[datetime, list[dict]]], + ) -> None: + if not rows: + return + table = self._data[(name, str(version))] + for ts, data_list in rows: + for data in data_list: + # json round-trip to mirror Postgres Jsonb serialization + table[ts].append(json.loads(json.dumps(data))) + + def build_lock(self, name: str, version: SemVer) -> AbstractContextManager: + """Stub to prevent interference with real build lock""" + return nullcontext() diff --git a/builders/server/core/service/worker.py b/builders/server/core/service/worker.py index 556f694..8bd3730 100644 --- a/builders/server/core/service/worker.py +++ b/builders/server/core/service/worker.py @@ -21,10 +21,9 @@ import structlog -import core.db.datasets from core.runtime import config, registry, runner, validator -from core.service.locks import get_build_lock from core.service.models import JobDescriptor, JobResult +from core.service.store import PostgresStore, Store from core.service.timestamps import NoValidTimestampsError, generate_timestamps logger = structlog.get_logger() @@ -33,6 +32,7 @@ def execute_job( job: JobDescriptor, cancelled: threading.Event, + store: Store | None = None, ) -> JobResult: """Execute a single build job: build missing timestamps for one dataset. @@ -49,12 +49,16 @@ def execute_job( job: describes which dataset to build and over what time range. cancelled: shared event that signals early termination. checked between timestamps so a failed sibling job can stop peers. + store: data interface for reads/writes and the build lock. + ``PostgresStore`` for real builds, ``MemoryStore`` for dry runs. Returns: JobResult indicating success or failure with error detail. """ + if store is None: + store = PostgresStore() try: - _execute(job, cancelled) + _execute(job, cancelled, store) return JobResult(job=job, success=True) except NoValidTimestampsError: # let NoValidTimestampsError propagate so routes.py can return 422 @@ -66,6 +70,7 @@ def execute_job( def _execute( job: JobDescriptor, cancelled: threading.Event, + store: Store, ) -> None: """Inner execution logic. Raises on any failure. @@ -87,10 +92,11 @@ def _execute( ) # acquire per-dataset lock to prevent concurrent builds from racing - # between the "check missing" read and "insert rows" write - with get_build_lock(job.dataset_name, str(job.dataset_version)): + # between the "check missing" read and "insert rows" write. + # dry runs skip the lock entirely + with store.build_lock(job.dataset_name, job.dataset_version): existing = set( - core.db.datasets.get_existing_timestamps( + store.get_existing_timestamps( job.dataset_name, job.dataset_version, job.start, job.end ) ) @@ -134,7 +140,7 @@ def _execute( "build cancelled by failed sibling job" ) - dep_data = _fetch_dep_data(cfg, ts) + dep_data = _fetch_dep_data(cfg, ts, store) result = runner.run_builder( script_dir, cfg.builder, dep_data, ts, env_file=env_file @@ -143,7 +149,7 @@ def _execute( rows.append((ts, result)) # bulk insert -- only reached if all timestamps succeeded - core.db.datasets.insert_rows(job.dataset_name, job.dataset_version, rows) + store.insert_rows(job.dataset_name, job.dataset_version, rows) logger.info( "inserted rows", dataset=job.dataset_name, @@ -155,6 +161,7 @@ def _execute( def _fetch_dep_data( cfg: config.DatasetConfig, ts: datetime, + store: Store, ) -> dict[str, dict[datetime, list[dict]]]: """Fetch dependency data for a single timestamp. @@ -166,16 +173,14 @@ def _fetch_dep_data( for dep_name, dep_info in cfg.dependencies.items(): if dep_info.lookback_subtract is not None: - dep_rows = core.db.datasets.get_rows_range( + dep_rows = store.get_rows_range( dep_name, dep_info.version, ts - dep_info.lookback_subtract, ts, ) else: - dep_rows = core.db.datasets.get_rows_timestamps( - dep_name, dep_info.version, [ts] - ) + dep_rows = store.get_rows_timestamps(dep_name, dep_info.version, [ts]) if not dep_rows: raise RuntimeError( diff --git a/builders/server/tests/core/service/test_orchestrator.py b/builders/server/tests/core/service/test_orchestrator.py index 802af65..7972a13 100644 --- a/builders/server/tests/core/service/test_orchestrator.py +++ b/builders/server/tests/core/service/test_orchestrator.py @@ -66,7 +66,7 @@ def test_failure_stops_subsequent_levels(mock_registry, mock_execute) -> None: mock_registry.get_config.side_effect = lambda name, version: configs[name] # C succeeds, B fails - def mock_exec(job, cancelled): + def mock_exec(job, cancelled, store): if job.dataset_name == "B": return MagicMock(success=False, error="B crashed") return MagicMock(success=True) diff --git a/builders/server/tests/core/service/test_store.py b/builders/server/tests/core/service/test_store.py new file mode 100644 index 0000000..b62f5ff --- /dev/null +++ b/builders/server/tests/core/service/test_store.py @@ -0,0 +1,197 @@ +import threading +from contextlib import nullcontext +from datetime import datetime +from unittest.mock import patch + +import pytest +from core.service.store import MemoryStore, PostgresStore + +from .conftest import V010 + +JAN1 = datetime(2024, 1, 1) +JAN2 = datetime(2024, 1, 2) +JAN3 = datetime(2024, 1, 3) +JAN4 = datetime(2024, 1, 4) + + +# --- MemoryStore: insert + read round-trip --- + + +def test_memory_insert_and_get_rows_range() -> None: + """Inserted rows are read back by range, keyed by timestamp.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}]), (JAN2, [{"v": 2}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN2) + + assert result == {JAN1: [{"v": 1}], JAN2: [{"v": 2}]} + + +def test_memory_get_rows_range_filters_outside_window() -> None: + """get_rows_range excludes timestamps outside [start, end].""" + store = MemoryStore() + store.insert_rows( + "ds", V010, [(JAN1, [{"v": 1}]), (JAN2, [{"v": 2}]), (JAN3, [{"v": 3}])] + ) + + result = store.get_rows_range("ds", V010, JAN2, JAN3) + + assert result == {JAN2: [{"v": 2}], JAN3: [{"v": 3}]} + + +def test_memory_get_rows_range_is_sorted() -> None: + """get_rows_range returns timestamps ascending regardless of insert order.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN3, [{"v": 3}])]) + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + store.insert_rows("ds", V010, [(JAN2, [{"v": 2}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN3) + + assert list(result.keys()) == [JAN1, JAN2, JAN3] + + +def test_memory_multi_row_timestamp() -> None: + """Multiple rows sharing a timestamp accumulate into a list.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"t": "AAPL"}, {"t": "MSFT"}])]) + store.insert_rows("ds", V010, [(JAN1, [{"t": "GOOG"}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN1) + + assert result == {JAN1: [{"t": "AAPL"}, {"t": "MSFT"}, {"t": "GOOG"}]} + + +def test_memory_get_existing_timestamps() -> None: + """get_existing_timestamps returns distinct sorted timestamps in range with data.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}]), (JAN3, [{"v": 3}])]) + + assert store.get_existing_timestamps("ds", V010, JAN1, JAN4) == [JAN1, JAN3] + # out-of-range start clips JAN1 + assert store.get_existing_timestamps("ds", V010, JAN2, JAN4) == [JAN3] + + +def test_memory_get_rows_timestamps_selects_specific() -> None: + """get_rows_timestamps returns only the requested timestamps that have data.""" + store = MemoryStore() + store.insert_rows( + "ds", V010, [(JAN1, [{"v": 1}]), (JAN2, [{"v": 2}]), (JAN3, [{"v": 3}])] + ) + + result = store.get_rows_timestamps("ds", V010, [JAN1, JAN3]) + + assert result == {JAN1: [{"v": 1}], JAN3: [{"v": 3}]} + + +def test_memory_get_rows_timestamps_empty_input() -> None: + """Empty timestamp list returns empty dict (matches Postgres behavior).""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + + assert store.get_rows_timestamps("ds", V010, []) == {} + + +def test_memory_reads_unknown_dataset_return_empty() -> None: + """Reads for a dataset never inserted return empty results, not errors.""" + store = MemoryStore() + + assert store.get_existing_timestamps("ghost", V010, JAN1, JAN4) == [] + assert store.get_rows_range("ghost", V010, JAN1, JAN4) == {} + assert store.get_rows_timestamps("ghost", V010, [JAN1]) == {} + + +def test_memory_isolated_by_name_and_version() -> None: + """Different (name, version) pairs do not bleed into each other.""" + from core.utils.semver import SemVer + + v020 = SemVer.parse("0.2.0") + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + store.insert_rows("ds", v020, [(JAN1, [{"v": 99}])]) + store.insert_rows("other", V010, [(JAN1, [{"v": 7}])]) + + assert store.get_rows_range("ds", V010, JAN1, JAN1) == {JAN1: [{"v": 1}]} + assert store.get_rows_range("ds", v020, JAN1, JAN1) == {JAN1: [{"v": 99}]} + assert store.get_rows_range("other", V010, JAN1, JAN1) == {JAN1: [{"v": 7}]} + + +def test_memory_two_stores_are_independent() -> None: + """Each MemoryStore holds its own data (per-request isolation).""" + a = MemoryStore() + b = MemoryStore() + a.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + + assert a.get_rows_range("ds", V010, JAN1, JAN1) == {JAN1: [{"v": 1}]} + assert b.get_rows_range("ds", V010, JAN1, JAN1) == {} + + +def test_memory_insert_empty_is_noop() -> None: + """Inserting an empty row list does nothing.""" + store = MemoryStore() + store.insert_rows("ds", V010, []) + + assert store.get_rows_range("ds", V010, JAN1, JAN4) == {} + + +def test_memory_insert_rejects_non_serializable() -> None: + """json round-trip rejects non-serializable builder output (Jsonb parity).""" + store = MemoryStore() + + with pytest.raises(TypeError): + store.insert_rows("ds", V010, [(JAN1, [{"bad": {1, 2, 3}}])]) + + +def test_memory_reads_return_copies() -> None: + """Mutating a returned list does not corrupt the store's internal state.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN1) + result[JAN1].append({"v": 999}) + + assert store.get_rows_range("ds", V010, JAN1, JAN1) == {JAN1: [{"v": 1}]} + + +# --- build_lock behavior --- + + +def test_memory_build_lock_is_nullcontext() -> None: + """MemoryStore.build_lock never blocks -- a dry run takes no real lock.""" + store = MemoryStore() + lock = store.build_lock("ds", V010) + assert isinstance(lock, type(nullcontext())) + + +@patch("core.service.store.get_build_lock") +def test_postgres_build_lock_uses_shared_registry(mock_get_lock) -> None: + """PostgresStore.build_lock delegates to the shared per-dataset lock registry.""" + sentinel = threading.Lock() + mock_get_lock.return_value = sentinel + + result = PostgresStore().build_lock("ds", V010) + + mock_get_lock.assert_called_once_with("ds", "0.1.0") + assert result is sentinel + + +# --- PostgresStore forwarding --- + + +@patch("core.db.datasets") +def test_postgres_store_forwards_all_methods(mock_db) -> None: + """Every PostgresStore method forwards verbatim to core.db.datasets.""" + store = PostgresStore() + + store.get_existing_timestamps("ds", V010, JAN1, JAN2) + mock_db.get_existing_timestamps.assert_called_once_with("ds", V010, JAN1, JAN2) + + store.get_rows_range("ds", V010, JAN1, JAN2) + mock_db.get_rows_range.assert_called_once_with("ds", V010, JAN1, JAN2) + + store.get_rows_timestamps("ds", V010, [JAN1]) + mock_db.get_rows_timestamps.assert_called_once_with("ds", V010, [JAN1]) + + rows = [(JAN1, [{"v": 1}])] + store.insert_rows("ds", V010, rows) + mock_db.insert_rows.assert_called_once_with("ds", V010, rows)