diff --git a/builders/server/core/api/routes.py b/builders/server/core/api/routes.py index ad6a4f6..32a87d5 100644 --- a/builders/server/core/api/routes.py +++ b/builders/server/core/api/routes.py @@ -41,8 +41,12 @@ def build( dataset_version: str, start: str = Query(...), end: str = Query(...), + dry_run: bool = Query(False, alias="dry-run"), ): - """Build missing data for a dataset in the given time range.""" + """Build missing data for a dataset in the given time range. + + With ``dry-run=true``, builders write to an in-memory store, not the database. + """ try: version = SemVer.parse(dataset_version) except ValueError as exc: @@ -59,18 +63,32 @@ def build( ) from exc structlog.contextvars.bind_contextvars( - dataset_name=dataset_name, version=str(version) + dataset_name=dataset_name, version=str(version), dry_run=dry_run ) try: - build_dataset(dataset_name, version, start_ts, end_ts) + produced = build_dataset( + dataset_name, version, start_ts, end_ts, dry_run=dry_run + ) except NoValidTimestampsError as e: raise HTTPException(status_code=422, detail=str(e)) from e except Exception as e: logger.exception("build failed") raise HTTPException(status_code=500, detail=str(e)) from e - return {"status": "ok"} + if not dry_run: + return {"status": "ok"} + + rows = [ + {"timestamp": ts.isoformat(), "data": data_list} + for ts, data_list in sorted((produced or {}).items()) + ] + return { + "dataset_name": dataset_name, + "dataset_version": str(version), + "dry_run": True, + "rows": rows, + } @router.get("/data/{dataset_name}/{dataset_version}") diff --git a/builders/server/core/service/builder.py b/builders/server/core/service/builder.py index 4ab4a1a..69e0b6b 100644 --- a/builders/server/core/service/builder.py +++ b/builders/server/core/service/builder.py @@ -6,6 +6,7 @@ import core.db.datasets from core.runtime import registry from core.service.orchestrator import run_build +from core.service.store import MemoryStore, PostgresStore from core.service.timestamps import NoValidTimestampsError, generate_timestamps from core.utils.semver import SemVer @@ -35,9 +36,17 @@ def build_dataset( dataset_version: SemVer, start: datetime, end: datetime, -) -> None: + *, + dry_run: bool = False, +) -> dict[datetime, list[dict]] | None: """Public entrypoint for building a dataset and its dependencies.""" - run_build(dataset_name, dataset_version, start, end) + if not dry_run: + run_build(dataset_name, dataset_version, start, end, store=PostgresStore()) + return None + + store = MemoryStore() + run_build(dataset_name, dataset_version, start, end, store=store) + return store.get_rows_range(dataset_name, dataset_version, start, end) def get_data( diff --git a/builders/server/core/service/orchestrator.py b/builders/server/core/service/orchestrator.py index 57e8b7d..659f093 100644 --- a/builders/server/core/service/orchestrator.py +++ b/builders/server/core/service/orchestrator.py @@ -21,6 +21,7 @@ import structlog from core.service.scheduler import schedule_build +from core.service.store import PostgresStore, Store from core.service.worker import execute_job from core.utils.semver import SemVer @@ -32,6 +33,7 @@ def run_build( dataset_version: SemVer, start: datetime, end: datetime, + store: Store | None = None, ) -> None: """Orchestrate a full build: schedule, then execute level by level. @@ -50,12 +52,16 @@ def run_build( dataset_version: version of the root dataset start: requested build start time end: requested build end time + store: data backend threaded down to each worker. defaults to + ``PostgresStore`` for real builds, ``MemoryStore`` for dry runs. Raises: ValueError: if end < start_date for any dataset (from scheduler) RuntimeError: if any job fails during execution NoValidTimestampsError: if a job has no valid calendar timestamps """ + if store is None: + store = PostgresStore() plan = schedule_build(dataset_name, dataset_version, start, end) cancelled = threading.Event() @@ -75,7 +81,7 @@ def run_build( ) for job in jobs: - result = execute_job(job, cancelled) + result = execute_job(job, cancelled, store) if not result.success: cancelled.set() diff --git a/builders/server/core/service/store.py b/builders/server/core/service/store.py new file mode 100644 index 0000000..7f69702 --- /dev/null +++ b/builders/server/core/service/store.py @@ -0,0 +1,176 @@ +"""Data store abstraction for the build path. + +``Store`` puts the build path's data operations behind an ABC with two implementations: + +- ``PostgresStore``: a thin shell over real build operations. +- ``MemoryStore``: an in-process dict, unrelated to the DB. + +Workers hold a ``store`` instead of calling the DB directly, +""" + +import json +from abc import ABC, abstractmethod +from collections import defaultdict +from contextlib import AbstractContextManager, nullcontext +from datetime import datetime + +import core.db.datasets +from core.service.locks import get_build_lock +from core.utils.semver import SemVer + + +class Store(ABC): + """Interface for the build path's data operations.""" + + @abstractmethod + def get_existing_timestamps( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> list[datetime]: + """Return distinct timestamps in [start, end] that already have rows.""" + ... + + @abstractmethod + def get_rows_range( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> dict[datetime, list[dict]]: + """Return rows for [start, end], keyed by timestamp.""" + ... + + @abstractmethod + def get_rows_timestamps( + self, + name: str, + version: SemVer, + timestamps: list[datetime], + ) -> dict[datetime, list[dict]]: + """Return rows for specific timestamps, keyed by timestamp.""" + ... + + @abstractmethod + def insert_rows( + self, + name: str, + version: SemVer, + rows: list[tuple[datetime, list[dict]]], + ) -> None: + """Insert (timestamp, list[dict]) rows for a dataset.""" + ... + + @abstractmethod + def build_lock(self, name: str, version: SemVer) -> AbstractContextManager: + """Return the critical-section lock for this dataset's build.""" + ... + + +class PostgresStore(Store): + """Real-build implementation that hits the DB""" + + def get_existing_timestamps( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> list[datetime]: + return core.db.datasets.get_existing_timestamps(name, version, start, end) + + def get_rows_range( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> dict[datetime, list[dict]]: + return core.db.datasets.get_rows_range(name, version, start, end) + + def get_rows_timestamps( + self, + name: str, + version: SemVer, + timestamps: list[datetime], + ) -> dict[datetime, list[dict]]: + return core.db.datasets.get_rows_timestamps(name, version, timestamps) + + def insert_rows( + self, + name: str, + version: SemVer, + rows: list[tuple[datetime, list[dict]]], + ) -> None: + core.db.datasets.insert_rows(name, version, rows) + + def build_lock(self, name: str, version: SemVer) -> AbstractContextManager: + return get_build_lock(name, str(version)) + + +class MemoryStore(Store): + """Dry-run implementation: holds produced rows in a dict, never hits the DB.""" + + def __init__(self) -> None: + self._data: dict[tuple[str, str], dict[datetime, list[dict]]] = defaultdict( + lambda: defaultdict(list) + ) + + def get_existing_timestamps( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> list[datetime]: + table = self._data.get((name, str(version)), {}) + return sorted(ts for ts, rows in table.items() if rows and start <= ts <= end) + + def get_rows_range( + self, + name: str, + version: SemVer, + start: datetime, + end: datetime, + ) -> dict[datetime, list[dict]]: + table = self._data.get((name, str(version)), {}) + return { + ts: list(table[ts]) + for ts in sorted(table) + if table[ts] and start <= ts <= end + } + + def get_rows_timestamps( + self, + name: str, + version: SemVer, + timestamps: list[datetime], + ) -> dict[datetime, list[dict]]: + if not timestamps: + return {} + table = self._data.get((name, str(version)), {}) + wanted = set(timestamps) + return { + ts: list(table[ts]) for ts in sorted(table) if table[ts] and ts in wanted + } + + def insert_rows( + self, + name: str, + version: SemVer, + rows: list[tuple[datetime, list[dict]]], + ) -> None: + if not rows: + return + table = self._data[(name, str(version))] + for ts, data_list in rows: + for data in data_list: + # json round-trip to mirror Postgres Jsonb serialization + table[ts].append(json.loads(json.dumps(data))) + + def build_lock(self, name: str, version: SemVer) -> AbstractContextManager: + """Stub to prevent interference with real build lock""" + return nullcontext() diff --git a/builders/server/core/service/worker.py b/builders/server/core/service/worker.py index 556f694..8bd3730 100644 --- a/builders/server/core/service/worker.py +++ b/builders/server/core/service/worker.py @@ -21,10 +21,9 @@ import structlog -import core.db.datasets from core.runtime import config, registry, runner, validator -from core.service.locks import get_build_lock from core.service.models import JobDescriptor, JobResult +from core.service.store import PostgresStore, Store from core.service.timestamps import NoValidTimestampsError, generate_timestamps logger = structlog.get_logger() @@ -33,6 +32,7 @@ def execute_job( job: JobDescriptor, cancelled: threading.Event, + store: Store | None = None, ) -> JobResult: """Execute a single build job: build missing timestamps for one dataset. @@ -49,12 +49,16 @@ def execute_job( job: describes which dataset to build and over what time range. cancelled: shared event that signals early termination. checked between timestamps so a failed sibling job can stop peers. + store: data interface for reads/writes and the build lock. + ``PostgresStore`` for real builds, ``MemoryStore`` for dry runs. Returns: JobResult indicating success or failure with error detail. """ + if store is None: + store = PostgresStore() try: - _execute(job, cancelled) + _execute(job, cancelled, store) return JobResult(job=job, success=True) except NoValidTimestampsError: # let NoValidTimestampsError propagate so routes.py can return 422 @@ -66,6 +70,7 @@ def execute_job( def _execute( job: JobDescriptor, cancelled: threading.Event, + store: Store, ) -> None: """Inner execution logic. Raises on any failure. @@ -87,10 +92,11 @@ def _execute( ) # acquire per-dataset lock to prevent concurrent builds from racing - # between the "check missing" read and "insert rows" write - with get_build_lock(job.dataset_name, str(job.dataset_version)): + # between the "check missing" read and "insert rows" write. + # dry runs skip the lock entirely + with store.build_lock(job.dataset_name, job.dataset_version): existing = set( - core.db.datasets.get_existing_timestamps( + store.get_existing_timestamps( job.dataset_name, job.dataset_version, job.start, job.end ) ) @@ -134,7 +140,7 @@ def _execute( "build cancelled by failed sibling job" ) - dep_data = _fetch_dep_data(cfg, ts) + dep_data = _fetch_dep_data(cfg, ts, store) result = runner.run_builder( script_dir, cfg.builder, dep_data, ts, env_file=env_file @@ -143,7 +149,7 @@ def _execute( rows.append((ts, result)) # bulk insert -- only reached if all timestamps succeeded - core.db.datasets.insert_rows(job.dataset_name, job.dataset_version, rows) + store.insert_rows(job.dataset_name, job.dataset_version, rows) logger.info( "inserted rows", dataset=job.dataset_name, @@ -155,6 +161,7 @@ def _execute( def _fetch_dep_data( cfg: config.DatasetConfig, ts: datetime, + store: Store, ) -> dict[str, dict[datetime, list[dict]]]: """Fetch dependency data for a single timestamp. @@ -166,16 +173,14 @@ def _fetch_dep_data( for dep_name, dep_info in cfg.dependencies.items(): if dep_info.lookback_subtract is not None: - dep_rows = core.db.datasets.get_rows_range( + dep_rows = store.get_rows_range( dep_name, dep_info.version, ts - dep_info.lookback_subtract, ts, ) else: - dep_rows = core.db.datasets.get_rows_timestamps( - dep_name, dep_info.version, [ts] - ) + dep_rows = store.get_rows_timestamps(dep_name, dep_info.version, [ts]) if not dep_rows: raise RuntimeError( diff --git a/builders/server/tests/core/api/test_routes.py b/builders/server/tests/core/api/test_routes.py index e72d7bc..be521eb 100644 --- a/builders/server/tests/core/api/test_routes.py +++ b/builders/server/tests/core/api/test_routes.py @@ -51,6 +51,65 @@ def test_build_endpoint_no_valid_timestamps(mock_build: MagicMock) -> None: assert "no valid timestamps in range" in resp.json()["detail"] +@patch("core.api.routes.build_dataset") +def test_build_endpoint_dry_run_returns_rows(mock_build: MagicMock) -> None: + """POST with dry-run=true returns the produced rows, not just status ok.""" + ts = datetime(2024, 1, 2) + mock_build.return_value = {ts: [{"ticker": "AAPL", "close": 150}]} + + resp = client.post( + "/api/v1/build/ds/0.1.0?start=2024-01-02&end=2024-01-02&dry-run=true" + ) + + assert resp.status_code == 200 + body = resp.json() + assert body["dataset_name"] == "ds" + assert body["dataset_version"] == "0.1.0" + assert body["dry_run"] is True + assert body["rows"] == [ + {"timestamp": "2024-01-02T00:00:00", "data": [{"ticker": "AAPL", "close": 150}]} + ] + # dry_run=True threaded through to build_dataset + assert mock_build.call_args.kwargs["dry_run"] is True + + +@patch("core.api.routes.build_dataset") +def test_build_endpoint_default_is_not_dry_run(mock_build: MagicMock) -> None: + """POST without dry-run defaults to a real build (status ok, dry_run=False).""" + mock_build.return_value = None + + resp = client.post("/api/v1/build/ds/0.1.0?start=2024-01-02&end=2024-01-02") + + assert resp.status_code == 200 + assert resp.json() == {"status": "ok"} + assert mock_build.call_args.kwargs["dry_run"] is False + + +@patch("core.api.routes.build_dataset") +def test_build_endpoint_dry_run_empty_rows(mock_build: MagicMock) -> None: + """dry-run with no produced rows returns an empty rows list.""" + mock_build.return_value = {} + + resp = client.post( + "/api/v1/build/ds/0.1.0?start=2024-01-02&end=2024-01-02&dry-run=true" + ) + + assert resp.status_code == 200 + assert resp.json()["rows"] == [] + + +@patch( + "core.api.routes.build_dataset", + side_effect=NoValidTimestampsError("no valid timestamps in range"), +) +def test_build_endpoint_dry_run_no_valid_timestamps(mock_build: MagicMock) -> None: + """dry-run surfaces the same 422 as a real build on no valid timestamps.""" + resp = client.post( + "/api/v1/build/ds/0.1.0?start=2024-01-06&end=2024-01-07&dry-run=true" + ) + assert resp.status_code == 422 + + # --- GET /data tests --- diff --git a/builders/server/tests/core/service/test_builder.py b/builders/server/tests/core/service/test_builder.py index e955808..2d986b9 100644 --- a/builders/server/tests/core/service/test_builder.py +++ b/builders/server/tests/core/service/test_builder.py @@ -129,12 +129,38 @@ def test_generate_timestamps_start_on_closed_day_no_valid_range_returns_empty() @patch("core.service.builder.run_build") def test_build_dataset_delegates_to_orchestrator(mock_run_build: MagicMock) -> None: - """build_dataset delegates to run_build with the same args.""" - build_dataset("ds", V010, datetime(2024, 1, 1), datetime(2024, 1, 5)) + """build_dataset delegates to run_build with a PostgresStore for real builds.""" + from core.service.store import PostgresStore - mock_run_build.assert_called_once_with( - "ds", V010, datetime(2024, 1, 1), datetime(2024, 1, 5) - ) + result = build_dataset("ds", V010, datetime(2024, 1, 1), datetime(2024, 1, 5)) + + assert result is None + mock_run_build.assert_called_once() + args, kwargs = mock_run_build.call_args + assert args == ("ds", V010, datetime(2024, 1, 1), datetime(2024, 1, 5)) + assert isinstance(kwargs["store"], PostgresStore) + + +@patch("core.service.builder.run_build") +def test_build_dataset_dry_run_uses_memory_store_and_returns_rows( + mock_run_build: MagicMock, +) -> None: + """dry_run build uses a MemoryStore and returns the produced rows.""" + from core.service.store import MemoryStore + + ts = datetime(2024, 1, 1) + + # simulate the worker writing into the injected store during the build + def fake_run_build(name, version, start, end, store): + store.insert_rows(name, version, [(ts, [{"v": 1}])]) + + mock_run_build.side_effect = fake_run_build + + result = build_dataset("ds", V010, ts, ts, dry_run=True) + + assert result == {ts: [{"v": 1}]} + store = mock_run_build.call_args.kwargs["store"] + assert isinstance(store, MemoryStore) @patch("core.service.builder.run_build") diff --git a/builders/server/tests/core/service/test_orchestrator.py b/builders/server/tests/core/service/test_orchestrator.py index 802af65..7972a13 100644 --- a/builders/server/tests/core/service/test_orchestrator.py +++ b/builders/server/tests/core/service/test_orchestrator.py @@ -66,7 +66,7 @@ def test_failure_stops_subsequent_levels(mock_registry, mock_execute) -> None: mock_registry.get_config.side_effect = lambda name, version: configs[name] # C succeeds, B fails - def mock_exec(job, cancelled): + def mock_exec(job, cancelled, store): if job.dataset_name == "B": return MagicMock(success=False, error="B crashed") return MagicMock(success=True) diff --git a/builders/server/tests/core/service/test_store.py b/builders/server/tests/core/service/test_store.py new file mode 100644 index 0000000..b62f5ff --- /dev/null +++ b/builders/server/tests/core/service/test_store.py @@ -0,0 +1,197 @@ +import threading +from contextlib import nullcontext +from datetime import datetime +from unittest.mock import patch + +import pytest +from core.service.store import MemoryStore, PostgresStore + +from .conftest import V010 + +JAN1 = datetime(2024, 1, 1) +JAN2 = datetime(2024, 1, 2) +JAN3 = datetime(2024, 1, 3) +JAN4 = datetime(2024, 1, 4) + + +# --- MemoryStore: insert + read round-trip --- + + +def test_memory_insert_and_get_rows_range() -> None: + """Inserted rows are read back by range, keyed by timestamp.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}]), (JAN2, [{"v": 2}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN2) + + assert result == {JAN1: [{"v": 1}], JAN2: [{"v": 2}]} + + +def test_memory_get_rows_range_filters_outside_window() -> None: + """get_rows_range excludes timestamps outside [start, end].""" + store = MemoryStore() + store.insert_rows( + "ds", V010, [(JAN1, [{"v": 1}]), (JAN2, [{"v": 2}]), (JAN3, [{"v": 3}])] + ) + + result = store.get_rows_range("ds", V010, JAN2, JAN3) + + assert result == {JAN2: [{"v": 2}], JAN3: [{"v": 3}]} + + +def test_memory_get_rows_range_is_sorted() -> None: + """get_rows_range returns timestamps ascending regardless of insert order.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN3, [{"v": 3}])]) + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + store.insert_rows("ds", V010, [(JAN2, [{"v": 2}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN3) + + assert list(result.keys()) == [JAN1, JAN2, JAN3] + + +def test_memory_multi_row_timestamp() -> None: + """Multiple rows sharing a timestamp accumulate into a list.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"t": "AAPL"}, {"t": "MSFT"}])]) + store.insert_rows("ds", V010, [(JAN1, [{"t": "GOOG"}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN1) + + assert result == {JAN1: [{"t": "AAPL"}, {"t": "MSFT"}, {"t": "GOOG"}]} + + +def test_memory_get_existing_timestamps() -> None: + """get_existing_timestamps returns distinct sorted timestamps in range with data.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}]), (JAN3, [{"v": 3}])]) + + assert store.get_existing_timestamps("ds", V010, JAN1, JAN4) == [JAN1, JAN3] + # out-of-range start clips JAN1 + assert store.get_existing_timestamps("ds", V010, JAN2, JAN4) == [JAN3] + + +def test_memory_get_rows_timestamps_selects_specific() -> None: + """get_rows_timestamps returns only the requested timestamps that have data.""" + store = MemoryStore() + store.insert_rows( + "ds", V010, [(JAN1, [{"v": 1}]), (JAN2, [{"v": 2}]), (JAN3, [{"v": 3}])] + ) + + result = store.get_rows_timestamps("ds", V010, [JAN1, JAN3]) + + assert result == {JAN1: [{"v": 1}], JAN3: [{"v": 3}]} + + +def test_memory_get_rows_timestamps_empty_input() -> None: + """Empty timestamp list returns empty dict (matches Postgres behavior).""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + + assert store.get_rows_timestamps("ds", V010, []) == {} + + +def test_memory_reads_unknown_dataset_return_empty() -> None: + """Reads for a dataset never inserted return empty results, not errors.""" + store = MemoryStore() + + assert store.get_existing_timestamps("ghost", V010, JAN1, JAN4) == [] + assert store.get_rows_range("ghost", V010, JAN1, JAN4) == {} + assert store.get_rows_timestamps("ghost", V010, [JAN1]) == {} + + +def test_memory_isolated_by_name_and_version() -> None: + """Different (name, version) pairs do not bleed into each other.""" + from core.utils.semver import SemVer + + v020 = SemVer.parse("0.2.0") + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + store.insert_rows("ds", v020, [(JAN1, [{"v": 99}])]) + store.insert_rows("other", V010, [(JAN1, [{"v": 7}])]) + + assert store.get_rows_range("ds", V010, JAN1, JAN1) == {JAN1: [{"v": 1}]} + assert store.get_rows_range("ds", v020, JAN1, JAN1) == {JAN1: [{"v": 99}]} + assert store.get_rows_range("other", V010, JAN1, JAN1) == {JAN1: [{"v": 7}]} + + +def test_memory_two_stores_are_independent() -> None: + """Each MemoryStore holds its own data (per-request isolation).""" + a = MemoryStore() + b = MemoryStore() + a.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + + assert a.get_rows_range("ds", V010, JAN1, JAN1) == {JAN1: [{"v": 1}]} + assert b.get_rows_range("ds", V010, JAN1, JAN1) == {} + + +def test_memory_insert_empty_is_noop() -> None: + """Inserting an empty row list does nothing.""" + store = MemoryStore() + store.insert_rows("ds", V010, []) + + assert store.get_rows_range("ds", V010, JAN1, JAN4) == {} + + +def test_memory_insert_rejects_non_serializable() -> None: + """json round-trip rejects non-serializable builder output (Jsonb parity).""" + store = MemoryStore() + + with pytest.raises(TypeError): + store.insert_rows("ds", V010, [(JAN1, [{"bad": {1, 2, 3}}])]) + + +def test_memory_reads_return_copies() -> None: + """Mutating a returned list does not corrupt the store's internal state.""" + store = MemoryStore() + store.insert_rows("ds", V010, [(JAN1, [{"v": 1}])]) + + result = store.get_rows_range("ds", V010, JAN1, JAN1) + result[JAN1].append({"v": 999}) + + assert store.get_rows_range("ds", V010, JAN1, JAN1) == {JAN1: [{"v": 1}]} + + +# --- build_lock behavior --- + + +def test_memory_build_lock_is_nullcontext() -> None: + """MemoryStore.build_lock never blocks -- a dry run takes no real lock.""" + store = MemoryStore() + lock = store.build_lock("ds", V010) + assert isinstance(lock, type(nullcontext())) + + +@patch("core.service.store.get_build_lock") +def test_postgres_build_lock_uses_shared_registry(mock_get_lock) -> None: + """PostgresStore.build_lock delegates to the shared per-dataset lock registry.""" + sentinel = threading.Lock() + mock_get_lock.return_value = sentinel + + result = PostgresStore().build_lock("ds", V010) + + mock_get_lock.assert_called_once_with("ds", "0.1.0") + assert result is sentinel + + +# --- PostgresStore forwarding --- + + +@patch("core.db.datasets") +def test_postgres_store_forwards_all_methods(mock_db) -> None: + """Every PostgresStore method forwards verbatim to core.db.datasets.""" + store = PostgresStore() + + store.get_existing_timestamps("ds", V010, JAN1, JAN2) + mock_db.get_existing_timestamps.assert_called_once_with("ds", V010, JAN1, JAN2) + + store.get_rows_range("ds", V010, JAN1, JAN2) + mock_db.get_rows_range.assert_called_once_with("ds", V010, JAN1, JAN2) + + store.get_rows_timestamps("ds", V010, [JAN1]) + mock_db.get_rows_timestamps.assert_called_once_with("ds", V010, [JAN1]) + + rows = [(JAN1, [{"v": 1}])] + store.insert_rows("ds", V010, rows) + mock_db.insert_rows.assert_called_once_with("ds", V010, rows) diff --git a/builders/server/tests/integration/test_dry_run.py b/builders/server/tests/integration/test_dry_run.py new file mode 100644 index 0000000..7266315 --- /dev/null +++ b/builders/server/tests/integration/test_dry_run.py @@ -0,0 +1,167 @@ +import random +from datetime import datetime +from unittest.mock import patch + +import pytest +from core.service.store import MemoryStore, PostgresStore +from core.utils.semver import SemVer + +pytestmark = pytest.mark.integration + +V010 = SemVer.parse("0.1.0") + + +def _db_row_count(db_conn) -> int: + """Total number of rows in the datasets table.""" + with db_conn.cursor() as cur: + cur.execute("SELECT count(*) FROM datasets") + return cur.fetchone()[0] + + +def _expected_ohlc(timestamp: datetime) -> dict: + """Reproduce the deterministic mock-ohlc output for a given timestamp.""" + random.seed(str(timestamp)) + base = round(random.uniform(100, 300), 2) + return { + "ticker": "AAPL", + "open": base, + "high": round(base + random.uniform(0, 50), 2), + "low": round(base - random.uniform(0, 30), 2), + "close": round(base + random.uniform(-10, 20), 2), + } + + +# --- DB is never touched --- + + +def test_dry_run_does_not_disturb_existing_data(client, db_conn): + """A dry run over a range with committed data neither reads it nor writes more.""" + # commit one real row first + client.post( + "/api/v1/build/mock-ohlc/0.1.0", + params={"start": "2024-01-02", "end": "2024-01-02"}, + ) + before = _db_row_count(db_conn) + assert before == 1 + + # dry run over a wider range: rebuilds the whole window in isolation + resp = client.post( + "/api/v1/build/mock-ohlc/0.1.0", + params={"start": "2024-01-02", "end": "2024-01-04", "dry-run": "true"}, + ) + assert resp.status_code == 200 + assert len(resp.json()["rows"]) == 3 + # row count unchanged: dry run wrote nothing + assert _db_row_count(db_conn) == before + + +# --- builders actually run, output is correct --- + + +def test_dry_run_returns_correct_builder_output(client, db_conn): + """Returned rows match the deterministic builder output.""" + resp = client.post( + "/api/v1/build/mock-ohlc/0.1.0", + params={"start": "2024-01-02", "end": "2024-01-02", "dry-run": "true"}, + ) + assert resp.status_code == 200 + + rows = resp.json()["rows"] + assert len(rows) == 1 + assert rows[0]["timestamp"] == "2024-01-02T00:00:00" + assert rows[0]["data"] == [_expected_ohlc(datetime(2024, 1, 2))] + assert _db_row_count(db_conn) == 0 + + +def test_dry_run_spies_on_runner(client, db_conn): + """The builder subprocess is actually invoked during a dry run.""" + from core.runtime import runner + + real_run_builder = runner.run_builder + with patch.object(runner, "run_builder", side_effect=real_run_builder) as spy: + resp = client.post( + "/api/v1/build/mock-ohlc/0.1.0", + params={"start": "2024-01-02", "end": "2024-01-03", "dry-run": "true"}, + ) + assert resp.status_code == 200 + # one builder invocation per timestamp + assert spy.call_count == 2 + + +# --- dependency graphs build in isolation --- + + +def test_dry_run_dependency_chain(client, db_conn): + """Dry run of a dependent dataset rebuilds the whole chain in memory.""" + resp = client.post( + "/api/v1/build/mock-daily-close/0.1.0", + params={"start": "2024-01-02", "end": "2024-01-02", "dry-run": "true"}, + ) + assert resp.status_code == 200 + + rows = resp.json()["rows"] + assert len(rows) == 1 + close = rows[0]["data"][0] + # the close should equal the ohlc close the dependency produced in-memory + assert close["close"] == _expected_ohlc(datetime(2024, 1, 2))["close"] + + # neither the dataset nor its dependency was written + assert _db_row_count(db_conn) == 0 + + +def test_dry_run_lookback_chain(client, db_conn): + """Dry run of a lookback dataset builds the expanded dependency window in memory.""" + resp = client.post( + "/api/v1/build/mock-moving-avg/0.1.0", + params={"start": "2024-01-08", "end": "2024-01-08", "dry-run": "true"}, + ) + assert resp.status_code == 200 + + rows = resp.json()["rows"] + assert len(rows) == 1 + assert "average" in rows[0]["data"][0] + assert _db_row_count(db_conn) == 0 + + +# --- store contract: PostgresStore vs MemoryStore parity --- + + +def _seed(store, name, version): + """Insert a fixed multi-timestamp, multi-row scenario into a store.""" + store.insert_rows( + name, + version, + [ + (datetime(2024, 1, 1), [{"t": "AAPL", "v": 1}, {"t": "MSFT", "v": 2}]), + (datetime(2024, 1, 2), [{"t": "AAPL", "v": 3}]), + (datetime(2024, 1, 4), [{"t": "AAPL", "v": 4}]), + ], + ) + + +def test_store_parity_read_methods(db_conn): + """PostgresStore and MemoryStore return identical results for the same data.""" + pg = PostgresStore() + mem = MemoryStore() + _seed(pg, "parity", V010) + _seed(mem, "parity", V010) + + jan1, jan2, jan3, jan4 = (datetime(2024, 1, d) for d in (1, 2, 3, 4)) + + # get_existing_timestamps over several windows + for start, end in [(jan1, jan4), (jan2, jan3), (jan3, jan3)]: + assert pg.get_existing_timestamps( + "parity", V010, start, end + ) == mem.get_existing_timestamps("parity", V010, start, end) + + # get_rows_range over several windows + for start, end in [(jan1, jan4), (jan1, jan2), (jan3, jan4)]: + assert pg.get_rows_range("parity", V010, start, end) == mem.get_rows_range( + "parity", V010, start, end + ) + + # get_rows_timestamps for specific selections + for sel in [[jan1], [jan1, jan4], [jan3], []]: + assert pg.get_rows_timestamps("parity", V010, sel) == mem.get_rows_timestamps( + "parity", V010, sel + ) diff --git a/dev-docs/SPEC-backend.md b/dev-docs/SPEC-backend.md index f9afdb0..6b2a28e 100644 --- a/dev-docs/SPEC-backend.md +++ b/dev-docs/SPEC-backend.md @@ -17,7 +17,7 @@ GET /datasets ``` ``` -POST /build/{dataset_name}/{dataset_version}?start=&end= +POST /build/{dataset_name}/{dataset_version}?start=&end=&dry-run= ``` ``` @@ -104,8 +104,31 @@ Each entry in `rows` contains all data dicts for that timestamp (matching the DB - Builders **never** have direct access to the database -- all reads and writes are handled by the server. For now, this is enforced by convention. TODO: add a runtime guard to enforce this. - After builder scripts are run, we upload the data to the Postgres database (see below). - Builds use a scheduler/worker/orchestrator architecture (see below). The scheduler computes a topological build plan, the orchestrator executes it level by level, and the worker handles building a single dataset. +- All data access during a build (reads and writes) goes through a `Store` abstraction (see "Store abstraction and dry-run builds" below), so the same build logic runs against Postgres for real builds and an in-memory store for dry runs. - The service is the only public-facing security boundary (auth, rate limiting, input validation). +#### Dry-run builds + +`POST /build` accepts an optional `dry-run` query parameter (default `false`). When `true`, the entire build runs against an in-memory store and **nothing is written to the database** — the request rebuilds the whole dependency graph in isolation (the store starts empty, so it never reads real committed data) and returns the produced rows so the caller can validate builder logic. + +- **Store selection happens at one boundary.** `build_dataset(dry_run=...)` is the only place the flag is read: it constructs a `PostgresStore` (real build) or a fresh `MemoryStore` (dry run) and passes the ready-made store into `run_build`. Everything below (`run_build → execute_job → _fetch_dep_data`) takes a `store` and never sees the `dry_run` flag, so build logic is identical across real and dry runs. +- **No lock.** A dry run uses a request-private `MemoryStore`, so it cannot corrupt real data and must not take the shared per-dataset build lock (which would block production builds). The lock is owned by the store: `PostgresStore.build_lock` returns the shared lock from `service/locks.py`; `MemoryStore.build_lock` returns a `nullcontext`. +- **No cleanup.** The `MemoryStore` is garbage-collected when the request ends (even on crash). The real DB was never touched, so there is nothing to roll back. +- **Response.** A real build returns `{"status": "ok"}`. A dry run returns an envelope with the produced rows for the requested dataset: + + ```json + { + "dataset_name": "mock-ohlc", + "dataset_version": "0.1.0", + "dry_run": true, + "rows": [ + {"timestamp": "2024-01-02T00:00:00", "data": [{"ticker": "AAPL", "open": 100, "high": 150, "low": 90, "close": 130}]} + ] + } + ``` + + `rows` uses the same shape as `GET /data` (sorted by timestamp; empty list when nothing was produced). Builder and validation failures surface the same 400/422/500 semantics as a real build. + ### Build architecture: scheduler / worker / orchestrator Build execution is split into three layers: @@ -163,29 +186,51 @@ class BuildPlan: `JobDescriptor` is frozen and hashable for use as dict keys. `JobResult` is lightweight -- the worker handles its own DB insert, so no need to carry rows back. +#### Store abstraction and dry-run builds + +The build path reads and writes data through a `Store` interface (`service/store.py`), an `abc.ABC` with four data methods plus a `build_lock` (matching the existing `Calendar` ABC convention, not a `Protocol`): + +```python +class Store(ABC): + def get_existing_timestamps(self, name, version, start, end) -> list[datetime]: ... + def get_rows_range(self, name, version, start, end) -> dict[datetime, list[dict]]: ... + def get_rows_timestamps(self, name, version, timestamps) -> dict[datetime, list[dict]]: ... + def insert_rows(self, name, version, rows) -> None: ... + def build_lock(self, name, version) -> AbstractContextManager: ... +``` + +Two concrete backends: + +- **`PostgresStore`** (real builds): a thin shell forwarding each data method to the corresponding `core.db.datasets` function — same SQL, same behavior. `build_lock` returns the shared per-dataset lock from `service/locks.py`. +- **`MemoryStore`** (dry runs): the same methods backed by a plain dict `{(name, version): {timestamp: [rows]}}` held in process. `insert_rows` appends (round-tripping each row through `json.dumps`/`loads` to mirror Postgres `Jsonb` serialization, so non-serializable builder output still fails); the read methods filter the dict by timestamp range. It never opens a DB connection, and `build_lock` returns a `nullcontext`. + +The `store` is threaded through `run_build → execute_job → _fetch_dep_data`; the worker never calls `core.db.datasets` directly. `build_dataset` is the single boundary that reads the `dry_run` flag and constructs the appropriate store (see "Dry-run builds" above). + #### Worker: single-job execution -`execute_job(job, cancelled)` builds one dataset over one time range: +`execute_job(job, cancelled, store)` builds one dataset over one time range: 1. Generate valid calendar timestamps via `generate_timestamps()` -2. Acquire per-dataset lock (`get_build_lock`) -3. Check which timestamps already exist in the DB -4. For each missing timestamp: fetch dependency data, run builder subprocess, validate output against schema -5. Bulk-insert all rows on success (atomicity: no partial inserts) +2. Acquire the build lock via `store.build_lock(...)` (a real lock for `PostgresStore`, a no-op for `MemoryStore`) +3. Check which timestamps already exist via `store.get_existing_timestamps(...)` +4. For each missing timestamp: fetch dependency data (`store.get_rows_range` / `store.get_rows_timestamps`), run builder subprocess, validate output against schema +5. Bulk-insert all rows on success via `store.insert_rows(...)` (atomicity: no partial inserts) 6. Check `cancelled` event between timestamps for early termination -The worker does NOT handle dependency graph walking or start-date clamping -- those are the scheduler's responsibility. +`store` defaults to `PostgresStore()` when omitted. The worker does NOT handle dependency graph walking or start-date clamping -- those are the scheduler's responsibility. #### Orchestrator: level-by-level execution -`run_build(dataset_name, version, start, end)`: +`run_build(dataset_name, version, start, end, store)`: 1. Calls `schedule_build()` to get a `BuildPlan` 2. Iterates levels sequentially (barrier model: all level N jobs must complete before level N+1 starts) -3. Within each level, executes jobs sequentially via `execute_job()` (MVP single worker -- future: parallelize within levels) +3. Within each level, executes jobs sequentially via `execute_job()`, passing the `store` down (MVP single worker -- future: parallelize within levels) 4. On any job failure: sets `cancelled` event, raises `RuntimeError` +`store` defaults to `PostgresStore()` when omitted, so the orchestrator stays dry-run agnostic. + #### Commit model -Data is committed per-level. Each level's data is inserted to DB before the next level starts. Workers read dependency data from DB. If level N fails, levels 0 through N-1 remain committed. +Data is committed per-level. Each level's data is inserted to the store before the next level starts. Workers read dependency data back from the same store. For real builds the store is Postgres; for dry runs it is the request-private `MemoryStore`, so the dependency hand-off `A → store → B` works identically without touching the DB. If level N fails, levels 0 through N-1 remain committed (real builds) or simply retained in the discarded `MemoryStore` (dry runs). Within each job, atomicity is preserved: rows are accumulated in memory and bulk-inserted only if all timestamps succeed. If any timestamp fails, no rows are inserted for that job. @@ -212,6 +257,7 @@ builders/server/ │ ├── orchestrator.py # level-by-level plan execution via run_build() │ ├── scheduler.py # dependency graph collection + Kahn's algorithm │ ├── worker.py # single-job execution via execute_job() +│ ├── store.py # Store ABC + PostgresStore + MemoryStore (dry-run backend) │ ├── models.py # JobDescriptor, JobResult, BuildPlan data types │ ├── timestamps.py # generate_timestamps(), NoValidTimestampsError │ └── locks.py # per-dataset threading.Lock registry