58 changes: 58 additions & 0 deletions alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py
@@ -0,0 +1,58 @@
"""add thing_id to NMA_WeatherData

Revision ID: f6e5d4c3b2a1
Revises: c7f8a9b0c1d2
Create Date: 2026-02-05 10:40:00.000000

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "f6e5d4c3b2a1"
down_revision: Union[str, Sequence[str], None] = "c7f8a9b0c1d2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
op.add_column(
"NMA_WeatherData",
sa.Column("thing_id", sa.Integer(), nullable=True),
)
op.create_foreign_key(
"fk_weather_data_thing_id",
"NMA_WeatherData",
"thing",
["thing_id"],
["id"],
ondelete="CASCADE",
)
# Backfill thing_id based on LocationId -> Thing.nma_pk_location
op.execute("""
UPDATE "NMA_WeatherData" wd
SET thing_id = t.id
FROM thing t
WHERE t.nma_pk_location IS NOT NULL
AND wd."LocationId" IS NOT NULL
AND t.nma_pk_location = wd."LocationId"::text
""")
# Remove any rows that cannot be linked to a Thing, then enforce NOT NULL
op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL')
op.alter_column(
"NMA_WeatherData", "thing_id", existing_type=sa.Integer(), nullable=False
)


def downgrade() -> None:
"""Downgrade schema."""
op.drop_constraint(
"fk_weather_data_thing_id",
"NMA_WeatherData",
type_="foreignkey",
)
op.drop_column("NMA_WeatherData", "thing_id")
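
A minimal sanity check that could be run after applying this revision with `alembic upgrade head` (a sketch, not part of the change; it assumes `session_ctx` from db.engine points at the migrated database):

from sqlalchemy import func

from db.engine import session_ctx
from db.nma_legacy import NMA_WeatherData

with session_ctx() as session:
    # After the backfill and cleanup above, every surviving row must carry a thing_id.
    unlinked = (
        session.query(func.count(NMA_WeatherData.object_id))
        .filter(NMA_WeatherData.thing_id.is_(None))
        .scalar()
    )
    assert unlinked == 0, f"{unlinked} NMA_WeatherData rows are missing thing_id"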
18 changes: 16 additions & 2 deletions db/nma_legacy.py
@@ -663,7 +663,7 @@ class NMA_WeatherData(Base):
"""
Legacy WeatherData table from AMPAPI.

Note: This table is OUT OF SCOPE for refactoring (not a Thing child).
Note: This table is a Thing child and must link to a parent Thing.
"""

__tablename__ = "NMA_WeatherData"
@@ -672,7 +672,9 @@ class NMA_WeatherData(Base):
object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True)

# FK
# FK not assigned.
thing_id: Mapped[int] = mapped_column(
Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False
)

# Legacy PK (for audit)
weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
@@ -687,6 +689,18 @@
# Additional columns
point_id: Mapped[str] = mapped_column("PointID", String(10))

# Relationships
thing: Mapped["Thing"] = relationship("Thing", back_populates="weather_data")

@validates("thing_id")
def validate_thing_id(self, key, value):
"""Prevent orphan NMA_WeatherData - must have a parent Thing."""
if value is None:
raise ValueError(
"NMA_WeatherData requires a parent Thing (thing_id cannot be None)"
)
return value


class NMA_WeatherPhotos(Base):
"""
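
A small sketch of what the new validates hook does at the ORM level (illustrative values only; the check fires on assignment, before any flush or database constraint is involved):

from db.nma_legacy import NMA_WeatherData

record = NMA_WeatherData(object_id=-2, point_id="WX-DEMO")  # hypothetical demo values
try:
    record.thing_id = None  # rejected by validate_thing_id
except ValueError as exc:
    print(exc)  # NMA_WeatherData requires a parent Thing (thing_id cannot be None)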
7 changes: 7 additions & 0 deletions db/thing.py
@@ -55,6 +55,7 @@
NMA_Stratigraphy,
NMA_SurfaceWaterData,
NMA_WaterLevelsContinuous_Pressure_Daily,
NMA_WeatherData,
)


@@ -368,6 +369,12 @@ class Thing(
cascade="all, delete-orphan",
passive_deletes=True,
)
weather_data: Mapped[List["NMA_WeatherData"]] = relationship(
"NMA_WeatherData",
back_populates="thing",
cascade="all, delete-orphan",
passive_deletes=True,
)

# --- Association Proxies ---
assets: AssociationProxy[list["Asset"]] = association_proxy(
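
A usage sketch for the new two-sided relationship (assumes at least one Thing exists; the negative OBJECTID and point id are placeholders, and the session is rolled back so nothing is persisted):

from uuid import uuid4

from db.engine import session_ctx
from db.nma_legacy import NMA_WeatherData
from db.thing import Thing

with session_ctx() as session:
    thing = session.query(Thing).first()
    record = NMA_WeatherData(
        object_id=-1,            # placeholder OBJECTID
        point_id="WX-DEMO",      # placeholder point id
        location_id=uuid4(),
        thing_id=thing.id,
    )
    session.add(record)
    session.flush()

    # back_populates makes the link navigable from both sides;
    # deleting the Thing would cascade to its weather_data rows.
    assert record.thing is thing
    assert record in thing.weather_data

    session.rollback()  # demo only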
45 changes: 37 additions & 8 deletions tests/test_weather_data_legacy.py
@@ -28,22 +28,33 @@

from db.engine import session_ctx
from db.nma_legacy import NMA_WeatherData
from db.thing import Thing


def _next_object_id() -> int:
# Use a negative value to avoid collisions with existing legacy OBJECTIDs.
return -(uuid4().int % 2_000_000_000)


def _attach_thing_with_location(session, water_well_thing):
location_id = uuid4()
thing = session.get(Thing, water_well_thing.id)
thing.nma_pk_location = str(location_id)
session.commit()
return thing, location_id


# ===================== CREATE tests ==========================
def test_create_weather_data_all_fields():
def test_create_weather_data_all_fields(water_well_thing):
"""Test creating a weather data record with all migrated fields."""
with session_ctx() as session:
thing, location_id = _attach_thing_with_location(session, water_well_thing)
record = NMA_WeatherData(
object_id=_next_object_id(),
location_id=uuid4(),
location_id=location_id,
point_id="WX-1001",
weather_id=uuid4(),
thing_id=thing.id,
)
session.add(record)
session.commit()
@@ -58,33 +69,39 @@ def test_create_weather_data_all_fields():
session.commit()


def test_create_weather_data_minimal():
def test_create_weather_data_minimal(water_well_thing):
"""Test creating a weather data record with minimal fields."""
with session_ctx() as session:
thing, location_id = _attach_thing_with_location(session, water_well_thing)
record = NMA_WeatherData(
object_id=_next_object_id(),
point_id="WX-1002",
location_id=location_id,
thing_id=thing.id,
)
session.add(record)
session.commit()
session.refresh(record)

assert record.object_id is not None
assert record.point_id == "WX-1002"
assert record.location_id is None
assert record.location_id is not None
assert record.weather_id is None

session.delete(record)
session.commit()


# ===================== READ tests ==========================
def test_read_weather_data_by_object_id():
def test_read_weather_data_by_object_id(water_well_thing):
"""Test reading a specific weather data record by OBJECTID."""
with session_ctx() as session:
thing, location_id = _attach_thing_with_location(session, water_well_thing)
record = NMA_WeatherData(
object_id=_next_object_id(),
point_id="WX-1003",
location_id=location_id,
thing_id=thing.id,
)
session.add(record)
session.commit()
@@ -98,16 +115,21 @@ def test_read_weather_data_by_object_id():
session.commit()


def test_query_weather_data_by_point_id():
def test_query_weather_data_by_point_id(water_well_thing):
"""Test querying weather data by point_id."""
with session_ctx() as session:
thing, location_id = _attach_thing_with_location(session, water_well_thing)
record1 = NMA_WeatherData(
object_id=_next_object_id(),
point_id="WX-1004",
location_id=location_id,
thing_id=thing.id,
)
record2 = NMA_WeatherData(
object_id=_next_object_id(),
point_id="WX-1005",
location_id=location_id,
thing_id=thing.id,
)
session.add_all([record1, record2])
session.commit()
@@ -126,12 +148,15 @@ def test_query_weather_data_by_point_id():


# ===================== UPDATE tests ==========================
def test_update_weather_data():
def test_update_weather_data(water_well_thing):
"""Test updating a weather data record."""
with session_ctx() as session:
thing, location_id = _attach_thing_with_location(session, water_well_thing)
record = NMA_WeatherData(
object_id=_next_object_id(),
point_id="WX-1006",
location_id=location_id,
thing_id=thing.id,
)
session.add(record)
session.commit()
@@ -151,12 +176,15 @@ def test_update_weather_data():


# ===================== DELETE tests ==========================
def test_delete_weather_data():
def test_delete_weather_data(water_well_thing):
"""Test deleting a weather data record."""
with session_ctx() as session:
thing, location_id = _attach_thing_with_location(session, water_well_thing)
record = NMA_WeatherData(
object_id=_next_object_id(),
point_id="WX-1007",
location_id=location_id,
thing_id=thing.id,
)
session.add(record)
session.commit()
@@ -178,6 +206,7 @@ def test_weather_data_has_all_migrated_columns():
"point_id",
"weather_id",
"object_id",
"thing_id",
]

for column in expected_columns:
63 changes: 56 additions & 7 deletions transfers/weather_data.py
@@ -23,7 +23,8 @@
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from db import NMA_WeatherData
from db import NMA_WeatherData, Thing
from db.engine import session_ctx
from transfers.logger import logger
from transfers.transferer import Transferer
from transfers.util import read_csv
@@ -39,16 +40,43 @@ class WeatherDataTransferer(Transferer):
def __init__(self, *args, batch_size: int = 1000, **kwargs):
super().__init__(*args, **kwargs)
self.batch_size = batch_size
self._thing_id_by_location_id: dict[str, int] = {}
self._build_thing_id_cache()

def _build_thing_id_cache(self) -> None:
with session_ctx() as session:
things = session.query(Thing.id, Thing.nma_pk_location).all()
for thing_id, nma_pk_location in things:
if nma_pk_location:
key = self._normalize_location_id(nma_pk_location)
if key:
self._thing_id_by_location_id[key] = thing_id
logger.info(
"Built Thing cache with %s location ids",
len(self._thing_id_by_location_id),
)

def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
df = read_csv(self.source_table)
return df, df

def _transfer_hook(self, session: Session) -> None:
rows = self._dedupe_rows(
[self._row_dict(row) for row in self.cleaned_df.to_dict("records")],
key="OBJECTID",
)
rows: list[dict[str, Any]] = []
skipped_missing_thing = 0
for raw in self.cleaned_df.to_dict("records"):
record = self._row_dict(raw)
if record is None:
skipped_missing_thing += 1
continue
rows.append(record)

rows = self._dedupe_rows(rows, key="OBJECTID")

if skipped_missing_thing:
logger.warning(
"Skipped %s WeatherData rows without matching Thing",
skipped_missing_thing,
)

insert_stmt = insert(NMA_WeatherData)
excluded = insert_stmt.excluded
@@ -61,6 +89,7 @@ def _transfer_hook(self, session: Session) -> None:
stmt = insert_stmt.values(chunk).on_conflict_do_update(
index_elements=["OBJECTID"],
set_={
"thing_id": excluded["thing_id"],
"LocationId": excluded.LocationId,
"PointID": excluded.PointID,
"WeatherID": excluded.WeatherID,
@@ -71,7 +100,7 @@ def _transfer_hook(self, session: Session) -> None:
session.commit()
session.expunge_all()

def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]:
def val(key: str) -> Optional[Any]:
v = row.get(key)
if pd.isna(v):
@@ -87,11 +116,21 @@ def to_uuid(v: Any) -> Optional[uuid.UUID]:
return uuid.UUID(v)
return None

location_id = to_uuid(val("LocationId"))
thing_id = self._resolve_thing_id(location_id)
if thing_id is None:
logger.warning(
"Skipping WeatherData LocationId=%s - Thing not found",
location_id,
)
return None

return {
"LocationId": to_uuid(val("LocationId")),
"LocationId": location_id,
"PointID": val("PointID"),
"WeatherID": to_uuid(val("WeatherID")),
"OBJECTID": val("OBJECTID"),
"thing_id": thing_id,
}

def _dedupe_rows(
@@ -111,6 +150,16 @@ def _dedupe_rows(
deduped[row_key] = row
return list(deduped.values()) + passthrough

def _resolve_thing_id(self, location_id: Optional[uuid.UUID]) -> Optional[int]:
if location_id is None:
return None
key = self._normalize_location_id(str(location_id))
return self._thing_id_by_location_id.get(key)

@staticmethod
def _normalize_location_id(value: str) -> str:
return value.strip().lower()


def run(batch_size: int = 1000) -> None:
"""Entrypoint to execute the transfer."""
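
A sketch of how the transfer might be kicked off; run() is the entrypoint shown above, and batch_size is its only documented knob:

from transfers.weather_data import run

# Source rows whose LocationId has no matching Thing.nma_pk_location are
# skipped (with a warning); the rest are upserted on OBJECTID in batches.
run(batch_size=500)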