Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion builders/server/core/service/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import structlog

from core.service.scheduler import schedule_build
from core.service.store import PostgresStore, Store
from core.service.worker import execute_job
from core.utils.semver import SemVer

Expand All @@ -32,6 +33,7 @@ def run_build(
dataset_version: SemVer,
start: datetime,
end: datetime,
store: Store | None = None,
) -> None:
"""Orchestrate a full build: schedule, then execute level by level.

Expand All @@ -50,12 +52,16 @@ def run_build(
dataset_version: version of the root dataset
start: requested build start time
end: requested build end time
store: data backend threaded down to each worker. defaults to
``PostgresStore`` for real builds, ``MemoryStore`` for dry runs.

Raises:
ValueError: if end < start_date for any dataset (from scheduler)
RuntimeError: if any job fails during execution
NoValidTimestampsError: if a job has no valid calendar timestamps
"""
if store is None:
store = PostgresStore()
plan = schedule_build(dataset_name, dataset_version, start, end)
cancelled = threading.Event()

Expand All @@ -75,7 +81,7 @@ def run_build(
)

for job in jobs:
result = execute_job(job, cancelled)
result = execute_job(job, cancelled, store)

if not result.success:
cancelled.set()
Expand Down
176 changes: 176 additions & 0 deletions builders/server/core/service/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Data store abstraction for the build path.

``Store`` puts the build path's data operations behind an ABC with two implementations:

- ``PostgresStore``: a thin shell over real build operations.
- ``MemoryStore``: an in-process dict, unrelated to the DB.

Workers hold a ``store`` instead of calling the DB directly,
"""

import json
from abc import ABC, abstractmethod
from collections import defaultdict
from contextlib import AbstractContextManager, nullcontext
from datetime import datetime

import core.db.datasets
from core.service.locks import get_build_lock
from core.utils.semver import SemVer


class Store(ABC):
"""Interface for the build path's data operations."""

@abstractmethod
def get_existing_timestamps(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> list[datetime]:
"""Return distinct timestamps in [start, end] that already have rows."""
...

@abstractmethod
def get_rows_range(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> dict[datetime, list[dict]]:
"""Return rows for [start, end], keyed by timestamp."""
...

@abstractmethod
def get_rows_timestamps(
self,
name: str,
version: SemVer,
timestamps: list[datetime],
) -> dict[datetime, list[dict]]:
"""Return rows for specific timestamps, keyed by timestamp."""
...

@abstractmethod
def insert_rows(
self,
name: str,
version: SemVer,
rows: list[tuple[datetime, list[dict]]],
) -> None:
"""Insert (timestamp, list[dict]) rows for a dataset."""
...

@abstractmethod
def build_lock(self, name: str, version: SemVer) -> AbstractContextManager:
"""Return the critical-section lock for this dataset's build."""
...


class PostgresStore(Store):
"""Real-build implementation that hits the DB"""

def get_existing_timestamps(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> list[datetime]:
return core.db.datasets.get_existing_timestamps(name, version, start, end)

def get_rows_range(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> dict[datetime, list[dict]]:
return core.db.datasets.get_rows_range(name, version, start, end)

def get_rows_timestamps(
self,
name: str,
version: SemVer,
timestamps: list[datetime],
) -> dict[datetime, list[dict]]:
return core.db.datasets.get_rows_timestamps(name, version, timestamps)

def insert_rows(
self,
name: str,
version: SemVer,
rows: list[tuple[datetime, list[dict]]],
) -> None:
core.db.datasets.insert_rows(name, version, rows)

def build_lock(self, name: str, version: SemVer) -> AbstractContextManager:
return get_build_lock(name, str(version))


class MemoryStore(Store):
"""Dry-run implementation: holds produced rows in a dict, never hits the DB."""

def __init__(self) -> None:
self._data: dict[tuple[str, str], dict[datetime, list[dict]]] = defaultdict(
lambda: defaultdict(list)
)

def get_existing_timestamps(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> list[datetime]:
table = self._data.get((name, str(version)), {})
return sorted(ts for ts, rows in table.items() if rows and start <= ts <= end)

def get_rows_range(
self,
name: str,
version: SemVer,
start: datetime,
end: datetime,
) -> dict[datetime, list[dict]]:
table = self._data.get((name, str(version)), {})
return {
ts: list(table[ts])
for ts in sorted(table)
if table[ts] and start <= ts <= end
}

def get_rows_timestamps(
self,
name: str,
version: SemVer,
timestamps: list[datetime],
) -> dict[datetime, list[dict]]:
if not timestamps:
return {}
table = self._data.get((name, str(version)), {})
wanted = set(timestamps)
return {
ts: list(table[ts]) for ts in sorted(table) if table[ts] and ts in wanted
}

def insert_rows(
self,
name: str,
version: SemVer,
rows: list[tuple[datetime, list[dict]]],
) -> None:
if not rows:
return
table = self._data[(name, str(version))]
for ts, data_list in rows:
for data in data_list:
# json round-trip to mirror Postgres Jsonb serialization
table[ts].append(json.loads(json.dumps(data)))

def build_lock(self, name: str, version: SemVer) -> AbstractContextManager:
"""Stub to prevent interference with real build lock"""
return nullcontext()
29 changes: 17 additions & 12 deletions builders/server/core/service/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@

import structlog

import core.db.datasets
from core.runtime import config, registry, runner, validator
from core.service.locks import get_build_lock
from core.service.models import JobDescriptor, JobResult
from core.service.store import PostgresStore, Store
from core.service.timestamps import NoValidTimestampsError, generate_timestamps

logger = structlog.get_logger()
Expand All @@ -33,6 +32,7 @@
def execute_job(
job: JobDescriptor,
cancelled: threading.Event,
store: Store | None = None,
) -> JobResult:
"""Execute a single build job: build missing timestamps for one dataset.

Expand All @@ -49,12 +49,16 @@ def execute_job(
job: describes which dataset to build and over what time range.
cancelled: shared event that signals early termination. checked
between timestamps so a failed sibling job can stop peers.
store: data interface for reads/writes and the build lock.
``PostgresStore`` for real builds, ``MemoryStore`` for dry runs.

Returns:
JobResult indicating success or failure with error detail.
"""
if store is None:
store = PostgresStore()
try:
_execute(job, cancelled)
_execute(job, cancelled, store)
return JobResult(job=job, success=True)
except NoValidTimestampsError:
# let NoValidTimestampsError propagate so routes.py can return 422
Expand All @@ -66,6 +70,7 @@ def execute_job(
def _execute(
job: JobDescriptor,
cancelled: threading.Event,
store: Store,
) -> None:
"""Inner execution logic. Raises on any failure.

Expand All @@ -87,10 +92,11 @@ def _execute(
)

# acquire per-dataset lock to prevent concurrent builds from racing
# between the "check missing" read and "insert rows" write
with get_build_lock(job.dataset_name, str(job.dataset_version)):
# between the "check missing" read and "insert rows" write.
# dry runs skip the lock entirely
with store.build_lock(job.dataset_name, job.dataset_version):
existing = set(
core.db.datasets.get_existing_timestamps(
store.get_existing_timestamps(
job.dataset_name, job.dataset_version, job.start, job.end
)
)
Expand Down Expand Up @@ -134,7 +140,7 @@ def _execute(
"build cancelled by failed sibling job"
)

dep_data = _fetch_dep_data(cfg, ts)
dep_data = _fetch_dep_data(cfg, ts, store)

result = runner.run_builder(
script_dir, cfg.builder, dep_data, ts, env_file=env_file
Expand All @@ -143,7 +149,7 @@ def _execute(
rows.append((ts, result))

# bulk insert -- only reached if all timestamps succeeded
core.db.datasets.insert_rows(job.dataset_name, job.dataset_version, rows)
store.insert_rows(job.dataset_name, job.dataset_version, rows)
logger.info(
"inserted rows",
dataset=job.dataset_name,
Expand All @@ -155,6 +161,7 @@ def _execute(
def _fetch_dep_data(
cfg: config.DatasetConfig,
ts: datetime,
store: Store,
) -> dict[str, dict[datetime, list[dict]]]:
"""Fetch dependency data for a single timestamp.

Expand All @@ -166,16 +173,14 @@ def _fetch_dep_data(

for dep_name, dep_info in cfg.dependencies.items():
if dep_info.lookback_subtract is not None:
dep_rows = core.db.datasets.get_rows_range(
dep_rows = store.get_rows_range(
dep_name,
dep_info.version,
ts - dep_info.lookback_subtract,
ts,
)
else:
dep_rows = core.db.datasets.get_rows_timestamps(
dep_name, dep_info.version, [ts]
)
dep_rows = store.get_rows_timestamps(dep_name, dep_info.version, [ts])

if not dep_rows:
raise RuntimeError(
Expand Down
2 changes: 1 addition & 1 deletion builders/server/tests/core/service/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_failure_stops_subsequent_levels(mock_registry, mock_execute) -> None:
mock_registry.get_config.side_effect = lambda name, version: configs[name]

# C succeeds, B fails
def mock_exec(job, cancelled):
def mock_exec(job, cancelled, store):
if job.dataset_name == "B":
return MagicMock(success=False, error="B crashed")
return MagicMock(success=True)
Expand Down
Loading
Loading