From bfac303c34272d8d162c10d0b9df40762f02e7e5 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 5 Feb 2026 12:14:36 -0700 Subject: [PATCH 1/6] feat(nma_legacy): add thing_id foreign key to NMA_WeatherData and establish relationship with Thing - add thing_id FK + relationship on NMA_WeatherData with validation - cache Thing.nma_pk_location and resolve thing_id in WeatherData transfer - skip unlinked WeatherData rows to prevent orphaned children - add alembic migration to backfill thing_id and enforce NOT NULL - update WeatherData legacy tests to include Thing linkage --- ...c3b2a1_add_thing_id_to_nma_weather_data.py | 60 ++++++++++++++++++ db/nma_legacy.py | 18 +++++- db/thing.py | 7 +++ tests/test_weather_data_legacy.py | 45 ++++++++++--- transfers/weather_data.py | 63 ++++++++++++++++--- 5 files changed, 176 insertions(+), 17 deletions(-) create mode 100644 alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py diff --git a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py new file mode 100644 index 00000000..3efcbbfb --- /dev/null +++ b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py @@ -0,0 +1,60 @@ +"""add thing_id to NMA_WeatherData + +Revision ID: f6e5d4c3b2a1 +Revises: c7f8a9b0c1d2 +Create Date: 2026-02-05 10:40:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "f6e5d4c3b2a1" +down_revision: Union[str, Sequence[str], None] = "c7f8a9b0c1d2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.add_column( + "NMA_WeatherData", + sa.Column("thing_id", sa.Integer(), nullable=True), + ) + op.create_foreign_key( + "fk_weather_data_thing_id", + "NMA_WeatherData", + "thing", + ["thing_id"], + ["id"], + ondelete="CASCADE", + ) + # Backfill thing_id based on LocationId -> Thing.nma_pk_location + op.execute( + """ + UPDATE "NMA_WeatherData" wd + SET thing_id = t.id + FROM thing t + WHERE t.nma_pk_location IS NOT NULL + AND wd."LocationId" IS NOT NULL + AND t.nma_pk_location = wd."LocationId"::text + """ + ) + # Remove any rows that cannot be linked to a Thing, then enforce NOT NULL + op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL') + op.alter_column( + "NMA_WeatherData", "thing_id", existing_type=sa.Integer(), nullable=False + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_constraint( + "fk_weather_data_thing_id", + "NMA_WeatherData", + type_="foreignkey", + ) + op.drop_column("NMA_WeatherData", "thing_id") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 557c415a..00ff0214 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -663,7 +663,7 @@ class NMA_WeatherData(Base): """ Legacy WeatherData table from AMPAPI. - Note: This table is OUT OF SCOPE for refactoring (not a Thing child). + Note: This table is a Thing child and must link to a parent Thing. """ __tablename__ = "NMA_WeatherData" @@ -672,7 +672,9 @@ class NMA_WeatherData(Base): object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True) # FK - # FK not assigned. + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) # Legacy PK (for audit) weather_id: Mapped[Optional[uuid.UUID]] = mapped_column( @@ -687,6 +689,18 @@ class NMA_WeatherData(Base): # Additional columns point_id: Mapped[str] = mapped_column("PointID", String(10)) + # Relationships + thing: Mapped["Thing"] = relationship("Thing", back_populates="weather_data") + + @validates("thing_id") + def validate_thing_id(self, key, value): + """Prevent orphan NMA_WeatherData - must have a parent Thing.""" + if value is None: + raise ValueError( + "NMA_WeatherData requires a parent Thing (thing_id cannot be None)" + ) + return value + class NMA_WeatherPhotos(Base): """ diff --git a/db/thing.py b/db/thing.py index fb046d3e..26ae6200 100644 --- a/db/thing.py +++ b/db/thing.py @@ -55,6 +55,7 @@ NMA_Stratigraphy, NMA_SurfaceWaterData, NMA_WaterLevelsContinuous_Pressure_Daily, + NMA_WeatherData, ) @@ -368,6 +369,12 @@ class Thing( cascade="all, delete-orphan", passive_deletes=True, ) + weather_data: Mapped[List["NMA_WeatherData"]] = relationship( + "NMA_WeatherData", + back_populates="thing", + cascade="all, delete-orphan", + passive_deletes=True, + ) # --- Association Proxies --- assets: AssociationProxy[list["Asset"]] = association_proxy( diff --git a/tests/test_weather_data_legacy.py b/tests/test_weather_data_legacy.py index cce28e66..711c828a 100644 --- a/tests/test_weather_data_legacy.py +++ b/tests/test_weather_data_legacy.py @@ -28,6 +28,7 @@ from db.engine import session_ctx from db.nma_legacy import NMA_WeatherData +from db.thing import Thing def _next_object_id() -> int: @@ -35,15 +36,25 @@ def _next_object_id() -> int: return -(uuid4().int % 2_000_000_000) +def _attach_thing_with_location(session, water_well_thing): + location_id = uuid4() + thing = session.get(Thing, water_well_thing.id) + thing.nma_pk_location = str(location_id) + session.commit() + return thing, location_id + + # ===================== CREATE tests ========================== -def test_create_weather_data_all_fields(): +def test_create_weather_data_all_fields(water_well_thing): """Test creating a weather data record with all migrated fields.""" with session_ctx() as session: + thing, location_id = _attach_thing_with_location(session, water_well_thing) record = NMA_WeatherData( object_id=_next_object_id(), - location_id=uuid4(), + location_id=location_id, point_id="WX-1001", weather_id=uuid4(), + thing_id=thing.id, ) session.add(record) session.commit() @@ -58,12 +69,15 @@ def test_create_weather_data_all_fields(): session.commit() -def test_create_weather_data_minimal(): +def test_create_weather_data_minimal(water_well_thing): """Test creating a weather data record with minimal fields.""" with session_ctx() as session: + thing, location_id = _attach_thing_with_location(session, water_well_thing) record = NMA_WeatherData( object_id=_next_object_id(), point_id="WX-1002", + location_id=location_id, + thing_id=thing.id, ) session.add(record) session.commit() @@ -71,7 +85,7 @@ def test_create_weather_data_minimal(): assert record.object_id is not None assert record.point_id == "WX-1002" - assert record.location_id is None + assert record.location_id is not None assert record.weather_id is None session.delete(record) @@ -79,12 +93,15 @@ def test_create_weather_data_minimal(): # ===================== READ tests ========================== -def test_read_weather_data_by_object_id(): +def test_read_weather_data_by_object_id(water_well_thing): """Test reading a specific weather data record by OBJECTID.""" with session_ctx() as session: + thing, location_id = _attach_thing_with_location(session, water_well_thing) record = NMA_WeatherData( object_id=_next_object_id(), point_id="WX-1003", + location_id=location_id, + thing_id=thing.id, ) session.add(record) session.commit() @@ -98,16 +115,21 @@ def test_read_weather_data_by_object_id(): session.commit() -def test_query_weather_data_by_point_id(): +def test_query_weather_data_by_point_id(water_well_thing): """Test querying weather data by point_id.""" with session_ctx() as session: + thing, location_id = _attach_thing_with_location(session, water_well_thing) record1 = NMA_WeatherData( object_id=_next_object_id(), point_id="WX-1004", + location_id=location_id, + thing_id=thing.id, ) record2 = NMA_WeatherData( object_id=_next_object_id(), point_id="WX-1005", + location_id=location_id, + thing_id=thing.id, ) session.add_all([record1, record2]) session.commit() @@ -126,12 +148,15 @@ def test_query_weather_data_by_point_id(): # ===================== UPDATE tests ========================== -def test_update_weather_data(): +def test_update_weather_data(water_well_thing): """Test updating a weather data record.""" with session_ctx() as session: + thing, location_id = _attach_thing_with_location(session, water_well_thing) record = NMA_WeatherData( object_id=_next_object_id(), point_id="WX-1006", + location_id=location_id, + thing_id=thing.id, ) session.add(record) session.commit() @@ -151,12 +176,15 @@ def test_update_weather_data(): # ===================== DELETE tests ========================== -def test_delete_weather_data(): +def test_delete_weather_data(water_well_thing): """Test deleting a weather data record.""" with session_ctx() as session: + thing, location_id = _attach_thing_with_location(session, water_well_thing) record = NMA_WeatherData( object_id=_next_object_id(), point_id="WX-1007", + location_id=location_id, + thing_id=thing.id, ) session.add(record) session.commit() @@ -178,6 +206,7 @@ def test_weather_data_has_all_migrated_columns(): "point_id", "weather_id", "object_id", + "thing_id", ] for column in expected_columns: diff --git a/transfers/weather_data.py b/transfers/weather_data.py index 4d75d1b4..da0dbd47 100644 --- a/transfers/weather_data.py +++ b/transfers/weather_data.py @@ -23,7 +23,8 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMA_WeatherData +from db import NMA_WeatherData, Thing +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import read_csv @@ -39,16 +40,43 @@ class WeatherDataTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._thing_id_by_location_id: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.id, Thing.nma_pk_location).all() + for thing_id, nma_pk_location in things: + if nma_pk_location: + key = self._normalize_location_id(nma_pk_location) + if key: + self._thing_id_by_location_id[key] = thing_id + logger.info( + "Built Thing cache with %s location ids", + len(self._thing_id_by_location_id), + ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = read_csv(self.source_table) return df, df def _transfer_hook(self, session: Session) -> None: - rows = self._dedupe_rows( - [self._row_dict(row) for row in self.cleaned_df.to_dict("records")], - key="OBJECTID", - ) + rows: list[dict[str, Any]] = [] + skipped_missing_thing = 0 + for raw in self.cleaned_df.to_dict("records"): + record = self._row_dict(raw) + if record is None: + skipped_missing_thing += 1 + continue + rows.append(record) + + rows = self._dedupe_rows(rows, key="OBJECTID") + + if skipped_missing_thing: + logger.warning( + "Skipped %s WeatherData rows without matching Thing", + skipped_missing_thing, + ) insert_stmt = insert(NMA_WeatherData) excluded = insert_stmt.excluded @@ -61,6 +89,7 @@ def _transfer_hook(self, session: Session) -> None: stmt = insert_stmt.values(chunk).on_conflict_do_update( index_elements=["OBJECTID"], set_={ + "thing_id": excluded["thing_id"], "LocationId": excluded.LocationId, "PointID": excluded.PointID, "WeatherID": excluded.WeatherID, @@ -71,7 +100,7 @@ def _transfer_hook(self, session: Session) -> None: session.commit() session.expunge_all() - def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: def val(key: str) -> Optional[Any]: v = row.get(key) if pd.isna(v): @@ -87,11 +116,21 @@ def to_uuid(v: Any) -> Optional[uuid.UUID]: return uuid.UUID(v) return None + location_id = to_uuid(val("LocationId")) + thing_id = self._resolve_thing_id(location_id) + if thing_id is None: + logger.warning( + "Skipping WeatherData LocationId=%s - Thing not found", + location_id, + ) + return None + return { - "LocationId": to_uuid(val("LocationId")), + "LocationId": location_id, "PointID": val("PointID"), "WeatherID": to_uuid(val("WeatherID")), "OBJECTID": val("OBJECTID"), + "thing_id": thing_id, } def _dedupe_rows( @@ -111,6 +150,16 @@ def _dedupe_rows( deduped[row_key] = row return list(deduped.values()) + passthrough + def _resolve_thing_id(self, location_id: Optional[uuid.UUID]) -> Optional[int]: + if location_id is None: + return None + key = self._normalize_location_id(str(location_id)) + return self._thing_id_by_location_id.get(key) + + @staticmethod + def _normalize_location_id(value: str) -> str: + return value.strip().lower() + def run(batch_size: int = 1000) -> None: """Entrypoint to execute the transfer.""" From 5f31f62da99ccfce7810d4ec0a8113cfda4e90c9 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Thu, 5 Feb 2026 23:15:54 +0000 Subject: [PATCH 2/6] Formatting changes --- .../f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py index 3efcbbfb..53bede05 100644 --- a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py +++ b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py @@ -33,16 +33,14 @@ def upgrade() -> None: ondelete="CASCADE", ) # Backfill thing_id based on LocationId -> Thing.nma_pk_location - op.execute( - """ + op.execute(""" UPDATE "NMA_WeatherData" wd SET thing_id = t.id FROM thing t WHERE t.nma_pk_location IS NOT NULL AND wd."LocationId" IS NOT NULL AND t.nma_pk_location = wd."LocationId"::text - """ - ) + """) # Remove any rows that cannot be linked to a Thing, then enforce NOT NULL op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL') op.alter_column( From d3de737aa9b68a08f59c4829d3f7bedbc6addabc Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 5 Feb 2026 13:30:56 -0700 Subject: [PATCH 3/6] feat(nma_legacy): establish relationship between NMA_SurfaceWaterPhotos and NMA_SurfaceWaterData via SurfaceID - make NMA_SurfaceWaterData.surface_id unique to support FK target - add FK + relationship on NMA_SurfaceWaterPhotos.surface_id with validation - enforce parent-first ordering by running SurfaceWaterPhotos after SurfaceWaterData - skip orphan photo rows in transfer using cached SurfaceIDs - add migration to backfill/cleanup and enforce NOT NULL + FK - update legacy tests to create photos with real parent SurfaceWaterData --- ...face_water_photos_to_surface_water_data.py | 72 +++++++++++++++++++ db/nma_legacy.py | 36 ++++++++-- tests/test_surface_water_photos_legacy.py | 43 +++++++++-- transfers/surface_water_photos.py | 48 +++++++++++-- transfers/transfer.py | 9 +-- 5 files changed, 189 insertions(+), 19 deletions(-) create mode 100644 alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py diff --git a/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py new file mode 100644 index 00000000..a8b60040 --- /dev/null +++ b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py @@ -0,0 +1,72 @@ +"""link surface water photos to surface water data + +Revision ID: a1b2c3d4e5f6 +Revises: f6e5d4c3b2a1 +Create Date: 2026-02-05 11:10:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "a1b2c3d4e5f6" +down_revision: Union[str, Sequence[str], None] = "f6e5d4c3b2a1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.create_unique_constraint( + "uq_surface_water_data_surface_id", + "NMA_SurfaceWaterData", + ["SurfaceID"], + ) + op.create_foreign_key( + "fk_surface_water_photos_surface_id", + "NMA_SurfaceWaterPhotos", + "NMA_SurfaceWaterData", + ["SurfaceID"], + ["SurfaceID"], + ondelete="CASCADE", + ) + op.execute( + """ + DELETE FROM "NMA_SurfaceWaterPhotos" p + WHERE p."SurfaceID" IS NULL + OR NOT EXISTS ( + SELECT 1 + FROM "NMA_SurfaceWaterData" d + WHERE d."SurfaceID" = p."SurfaceID" + ) + """ + ) + op.alter_column( + "NMA_SurfaceWaterPhotos", + "SurfaceID", + existing_type=sa.UUID(), + nullable=False, + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.alter_column( + "NMA_SurfaceWaterPhotos", + "SurfaceID", + existing_type=sa.UUID(), + nullable=True, + ) + op.drop_constraint( + "fk_surface_water_photos_surface_id", + "NMA_SurfaceWaterPhotos", + type_="foreignkey", + ) + op.drop_constraint( + "uq_surface_water_data_surface_id", + "NMA_SurfaceWaterData", + type_="unique", + ) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 00ff0214..d943b255 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -586,7 +586,7 @@ class NMA_SurfaceWaterData(Base): # Legacy PK (for audit) surface_id: Mapped[uuid.UUID] = mapped_column( - "SurfaceID", UUID(as_uuid=True), nullable=False + "SurfaceID", UUID(as_uuid=True), nullable=False, unique=True ) # Legacy FK (for audit) @@ -617,6 +617,12 @@ class NMA_SurfaceWaterData(Base): # Relationships thing: Mapped["Thing"] = relationship("Thing", back_populates="surface_water_data") + surface_water_photos: Mapped[list["NMA_SurfaceWaterPhotos"]] = relationship( + "NMA_SurfaceWaterPhotos", + back_populates="surface_water_data", + cascade="all, delete-orphan", + passive_deletes=True, + ) @validates("thing_id") def validate_thing_id(self, key, value): @@ -632,7 +638,7 @@ class NMA_SurfaceWaterPhotos(Base): """ Legacy SurfaceWaterPhotos table from NM_Aquifer. - Note: This table is OUT OF SCOPE for refactoring (not a Thing child). + Note: This table is a child of NMA_SurfaceWaterData via SurfaceID. """ __tablename__ = "NMA_SurfaceWaterPhotos" @@ -643,21 +649,39 @@ class NMA_SurfaceWaterPhotos(Base): ) # FK - # FK not assigned. + surface_id: Mapped[uuid.UUID] = mapped_column( + "SurfaceID", + UUID(as_uuid=True), + ForeignKey("NMA_SurfaceWaterData.SurfaceID", ondelete="CASCADE"), + nullable=False, + ) # Legacy PK (for audit) # Current `global_id` is also the original PK in the legacy DB # Legacy FK (for audit) - surface_id: Mapped[Optional[uuid.UUID]] = mapped_column( - "SurfaceID", UUID(as_uuid=True) - ) + # surface_id is also the legacy FK in the source table. # Additional columns point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False) ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50)) object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + # Relationships + surface_water_data: Mapped["NMA_SurfaceWaterData"] = relationship( + "NMA_SurfaceWaterData", back_populates="surface_water_photos" + ) + + @validates("surface_id") + def validate_surface_id(self, key, value): + """Prevent orphan NMA_SurfaceWaterPhotos - must have a parent NMA_SurfaceWaterData.""" + if value is None: + raise ValueError( + "NMA_SurfaceWaterPhotos requires a parent NMA_SurfaceWaterData " + "(surface_id cannot be None)" + ) + return value + class NMA_WeatherData(Base): """ diff --git a/tests/test_surface_water_photos_legacy.py b/tests/test_surface_water_photos_legacy.py index 7f6416b5..d8021194 100644 --- a/tests/test_surface_water_photos_legacy.py +++ b/tests/test_surface_water_photos_legacy.py @@ -28,14 +28,43 @@ from uuid import uuid4 from db.engine import session_ctx -from db.nma_legacy import NMA_SurfaceWaterPhotos +from db.nma_legacy import NMA_SurfaceWaterData, NMA_SurfaceWaterPhotos +from db.thing import Thing -def test_create_surface_water_photos_all_fields(): +def _next_object_id() -> int: + # Use a negative value to avoid collisions with existing legacy OBJECTIDs. + return -(uuid4().int % 2_000_000_000) + + +def _attach_thing_with_location(session, water_well_thing): + location_id = uuid4() + thing = session.get(Thing, water_well_thing.id) + thing.nma_pk_location = str(location_id) + session.commit() + return thing, location_id + + +def _create_surface_water_data(session, water_well_thing): + thing, location_id = _attach_thing_with_location(session, water_well_thing) + record = NMA_SurfaceWaterData( + location_id=location_id, + thing_id=thing.id, + surface_id=uuid4(), + point_id="SW-1000", + object_id=_next_object_id(), + ) + session.add(record) + session.commit() + return record + + +def test_create_surface_water_photos_all_fields(water_well_thing): """Test creating a surface water photos record with all fields.""" with session_ctx() as session: + parent = _create_surface_water_data(session, water_well_thing) record = NMA_SurfaceWaterPhotos( - surface_id=uuid4(), + surface_id=parent.surface_id, point_id="SW-0001", ole_path="photo.jpg", object_id=123, @@ -52,14 +81,17 @@ def test_create_surface_water_photos_all_fields(): assert record.object_id == 123 session.delete(record) + session.delete(parent) session.commit() -def test_create_surface_water_photos_minimal(): +def test_create_surface_water_photos_minimal(water_well_thing): """Test creating a surface water photos record with required fields only.""" with session_ctx() as session: + parent = _create_surface_water_data(session, water_well_thing) record = NMA_SurfaceWaterPhotos( point_id="SW-0002", + surface_id=parent.surface_id, global_id=uuid4(), ) session.add(record) @@ -68,11 +100,12 @@ def test_create_surface_water_photos_minimal(): assert record.global_id is not None assert record.point_id == "SW-0002" - assert record.surface_id is None + assert record.surface_id is not None assert record.ole_path is None assert record.object_id is None session.delete(record) + session.delete(parent) session.commit() diff --git a/transfers/surface_water_photos.py b/transfers/surface_water_photos.py index 43f11581..11ea7822 100644 --- a/transfers/surface_water_photos.py +++ b/transfers/surface_water_photos.py @@ -23,7 +23,8 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMA_SurfaceWaterPhotos +from db import NMA_SurfaceWaterData, NMA_SurfaceWaterPhotos +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import replace_nans @@ -37,6 +38,19 @@ class SurfaceWaterPhotosTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._surface_id_cache: set[str] = set() + self._build_surface_id_cache() + + def _build_surface_id_cache(self) -> None: + with session_ctx() as session: + surface_ids = session.query(NMA_SurfaceWaterData.surface_id).all() + for (surface_id,) in surface_ids: + if surface_id: + self._surface_id_cache.add(self._normalize_surface_id(surface_id)) + logger.info( + "Built SurfaceWaterData cache with %s surface ids", + len(self._surface_id_cache), + ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -44,12 +58,24 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return df, cleaned_df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows: list[dict[str, Any]] = [] + skipped_missing_parent = 0 + for raw in self.cleaned_df.to_dict("records"): + record = self._row_dict(raw) + if record is None: + skipped_missing_parent += 1 + continue + rows.append(record) rows = self._dedupe_rows(rows, key="GlobalID") if not rows: logger.info("No SurfaceWaterPhotos rows to transfer") return + if skipped_missing_parent: + logger.warning( + "Skipped %s SurfaceWaterPhotos rows without matching SurfaceWaterData", + skipped_missing_parent, + ) insert_stmt = insert(NMA_SurfaceWaterPhotos) excluded = insert_stmt.excluded @@ -74,9 +100,16 @@ def _transfer_hook(self, session: Session) -> None: session.execute(stmt) session.commit() - def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: + surface_id = self._uuid_val(row.get("SurfaceID")) + if surface_id is None or not self._has_surface_id(surface_id): + logger.warning( + "Skipping SurfaceWaterPhotos SurfaceID=%s - SurfaceWaterData not found", + surface_id, + ) + return None return { - "SurfaceID": self._uuid_val(row.get("SurfaceID")), + "SurfaceID": surface_id, "PointID": row.get("PointID"), "OLEPath": row.get("OLEPath"), "OBJECTID": row.get("OBJECTID"), @@ -107,6 +140,13 @@ def _uuid_val(self, value: Any) -> Optional[UUID]: return None return None + def _has_surface_id(self, surface_id: UUID) -> bool: + return self._normalize_surface_id(surface_id) in self._surface_id_cache + + @staticmethod + def _normalize_surface_id(value: UUID) -> str: + return str(value).strip().lower() + def run(batch_size: int = 1000) -> None: """Entrypoint to execute the transfer.""" diff --git a/transfers/transfer.py b/transfers/transfer.py index 5bca4378..1a44e3c3 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -513,8 +513,6 @@ def _transfer_parallel( parallel_tasks_1.append(("LinkIdsLocation", LinkIdsLocationDataTransferer)) if opts.transfer_groups: parallel_tasks_1.append(("Groups", ProjectGroupTransferer)) - if opts.transfer_surface_water_photos: - parallel_tasks_1.append(("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer)) if opts.transfer_soil_rock_results: parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer)) if opts.transfer_weather_photos: @@ -599,8 +597,6 @@ def _transfer_parallel( metrics.location_link_ids_metrics(*results_map["LinkIdsLocation"]) if "Groups" in results_map and results_map["Groups"]: metrics.group_metrics(*results_map["Groups"]) - if "SurfaceWaterPhotos" in results_map and results_map["SurfaceWaterPhotos"]: - metrics.surface_water_photos_metrics(*results_map["SurfaceWaterPhotos"]) if "SoilRockResults" in results_map and results_map["SoilRockResults"]: metrics.soil_rock_results_metrics(*results_map["SoilRockResults"]) if "Assets" in results_map and results_map["Assets"]: @@ -629,6 +625,11 @@ def _transfer_parallel( if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) + if opts.transfer_surface_water_photos: + message("TRANSFERRING SURFACE WATER PHOTOS") + results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) + metrics.surface_water_photos_metrics(*results) + if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) From f174966f382c49d443d5d5afbb6309d22c9656ba Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Thu, 5 Feb 2026 23:37:59 +0000 Subject: [PATCH 4/6] Formatting changes --- ...4e5f6_link_surface_water_photos_to_surface_water_data.py | 6 ++---- .../f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py index a8b60040..daffdccb 100644 --- a/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py +++ b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py @@ -33,8 +33,7 @@ def upgrade() -> None: ["SurfaceID"], ondelete="CASCADE", ) - op.execute( - """ + op.execute(""" DELETE FROM "NMA_SurfaceWaterPhotos" p WHERE p."SurfaceID" IS NULL OR NOT EXISTS ( @@ -42,8 +41,7 @@ def upgrade() -> None: FROM "NMA_SurfaceWaterData" d WHERE d."SurfaceID" = p."SurfaceID" ) - """ - ) + """) op.alter_column( "NMA_SurfaceWaterPhotos", "SurfaceID", diff --git a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py index 3efcbbfb..53bede05 100644 --- a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py +++ b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py @@ -33,16 +33,14 @@ def upgrade() -> None: ondelete="CASCADE", ) # Backfill thing_id based on LocationId -> Thing.nma_pk_location - op.execute( - """ + op.execute(""" UPDATE "NMA_WeatherData" wd SET thing_id = t.id FROM thing t WHERE t.nma_pk_location IS NOT NULL AND wd."LocationId" IS NOT NULL AND t.nma_pk_location = wd."LocationId"::text - """ - ) + """) # Remove any rows that cannot be linked to a Thing, then enforce NOT NULL op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL') op.alter_column( From 87bc8ca4047106c088c59d5e0e2f9390c852c96d Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 5 Feb 2026 13:55:49 -0700 Subject: [PATCH 5/6] feat(nma_legacy): establish relationship between NMA_WeatherData and NMA_WeatherPhotos via WeatherID - move weather_id under FK section and enforce NOT NULL - add FK + relationship + validator on NMA_WeatherPhotos - mark NMA_WeatherData.weather_id unique for FK target - run WeatherPhotos transfer after WeatherData and skip orphan photos - add migration to backfill/cleanup and enforce FK/NOT NULL - update legacy tests to create WeatherPhotos with a real parent WeatherData --- ...1a2_link_weather_photos_to_weather_data.py | 72 +++++++++++++++++++ db/nma_legacy.py | 36 ++++++++-- tests/test_weather_photos_legacy.py | 43 +++++++++-- transfers/transfer.py | 9 +-- transfers/weather_photos.py | 48 +++++++++++-- 5 files changed, 189 insertions(+), 19 deletions(-) create mode 100644 alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py diff --git a/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py b/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py new file mode 100644 index 00000000..77cabb05 --- /dev/null +++ b/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py @@ -0,0 +1,72 @@ +"""link weather photos to weather data + +Revision ID: b7c8d9e0f1a2 +Revises: a1b2c3d4e5f6 +Create Date: 2026-02-05 11:20:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "b7c8d9e0f1a2" +down_revision: Union[str, Sequence[str], None] = "a1b2c3d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.create_unique_constraint( + "uq_weather_data_weather_id", + "NMA_WeatherData", + ["WeatherID"], + ) + op.create_foreign_key( + "fk_weather_photos_weather_id", + "NMA_WeatherPhotos", + "NMA_WeatherData", + ["WeatherID"], + ["WeatherID"], + ondelete="CASCADE", + ) + op.execute( + """ + DELETE FROM "NMA_WeatherPhotos" p + WHERE p."WeatherID" IS NULL + OR NOT EXISTS ( + SELECT 1 + FROM "NMA_WeatherData" d + WHERE d."WeatherID" = p."WeatherID" + ) + """ + ) + op.alter_column( + "NMA_WeatherPhotos", + "WeatherID", + existing_type=sa.UUID(), + nullable=False, + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.alter_column( + "NMA_WeatherPhotos", + "WeatherID", + existing_type=sa.UUID(), + nullable=True, + ) + op.drop_constraint( + "fk_weather_photos_weather_id", + "NMA_WeatherPhotos", + type_="foreignkey", + ) + op.drop_constraint( + "uq_weather_data_weather_id", + "NMA_WeatherData", + type_="unique", + ) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index d943b255..3c80eb7a 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -702,7 +702,7 @@ class NMA_WeatherData(Base): # Legacy PK (for audit) weather_id: Mapped[Optional[uuid.UUID]] = mapped_column( - "WeatherID", UUID(as_uuid=True) + "WeatherID", UUID(as_uuid=True), unique=True ) # Legacy FK (for audit) @@ -715,6 +715,12 @@ class NMA_WeatherData(Base): # Relationships thing: Mapped["Thing"] = relationship("Thing", back_populates="weather_data") + weather_photos: Mapped[list["NMA_WeatherPhotos"]] = relationship( + "NMA_WeatherPhotos", + back_populates="weather_data", + cascade="all, delete-orphan", + passive_deletes=True, + ) @validates("thing_id") def validate_thing_id(self, key, value): @@ -730,7 +736,7 @@ class NMA_WeatherPhotos(Base): """ Legacy WeatherPhotos table from NM_Aquifer. - Note: This table is OUT OF SCOPE for refactoring (not a Thing child). + Note: This table is a child of NMA_WeatherData via WeatherID. """ __tablename__ = "NMA_WeatherPhotos" @@ -741,21 +747,39 @@ class NMA_WeatherPhotos(Base): ) # FK: - # FK not assigned. + weather_id: Mapped[uuid.UUID] = mapped_column( + "WeatherID", + UUID(as_uuid=True), + ForeignKey("NMA_WeatherData.WeatherID", ondelete="CASCADE"), + nullable=False, + ) # Legacy PK (for audit): # Current `global_id` is also the original PK in the legacy DB # Legacy FK (for audit): - weather_id: Mapped[Optional[uuid.UUID]] = mapped_column( - "WeatherID", UUID(as_uuid=True) - ) + # weather_id is also the legacy FK in the source table. # Additional columns point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False) ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50)) object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + # Relationships + weather_data: Mapped["NMA_WeatherData"] = relationship( + "NMA_WeatherData", back_populates="weather_photos" + ) + + @validates("weather_id") + def validate_weather_id(self, key, value): + """Prevent orphan NMA_WeatherPhotos - must have a parent NMA_WeatherData.""" + if value is None: + raise ValueError( + "NMA_WeatherPhotos requires a parent NMA_WeatherData " + "(weather_id cannot be None)" + ) + return value + class NMA_Soil_Rock_Results(Base): """ diff --git a/tests/test_weather_photos_legacy.py b/tests/test_weather_photos_legacy.py index f808dd87..b76fd5a5 100644 --- a/tests/test_weather_photos_legacy.py +++ b/tests/test_weather_photos_legacy.py @@ -28,14 +28,43 @@ from uuid import uuid4 from db.engine import session_ctx -from db.nma_legacy import NMA_WeatherPhotos +from db.nma_legacy import NMA_WeatherData, NMA_WeatherPhotos +from db.thing import Thing -def test_create_weather_photos_all_fields(): +def _next_object_id() -> int: + # Use a negative value to avoid collisions with existing legacy OBJECTIDs. + return -(uuid4().int % 2_000_000_000) + + +def _attach_thing_with_location(session, water_well_thing): + location_id = uuid4() + thing = session.get(Thing, water_well_thing.id) + thing.nma_pk_location = str(location_id) + session.commit() + return thing, location_id + + +def _create_weather_data(session, water_well_thing): + thing, location_id = _attach_thing_with_location(session, water_well_thing) + record = NMA_WeatherData( + object_id=_next_object_id(), + location_id=location_id, + point_id="WX-1000", + weather_id=uuid4(), + thing_id=thing.id, + ) + session.add(record) + session.commit() + return record + + +def test_create_weather_photos_all_fields(water_well_thing): """Test creating a weather photos record with all fields.""" with session_ctx() as session: + parent = _create_weather_data(session, water_well_thing) record = NMA_WeatherPhotos( - weather_id=uuid4(), + weather_id=parent.weather_id, point_id="WP-0001", ole_path="weather.jpg", object_id=321, @@ -52,14 +81,17 @@ def test_create_weather_photos_all_fields(): assert record.object_id == 321 session.delete(record) + session.delete(parent) session.commit() -def test_create_weather_photos_minimal(): +def test_create_weather_photos_minimal(water_well_thing): """Test creating a weather photos record with required fields only.""" with session_ctx() as session: + parent = _create_weather_data(session, water_well_thing) record = NMA_WeatherPhotos( point_id="WP-0002", + weather_id=parent.weather_id, global_id=uuid4(), ) session.add(record) @@ -68,11 +100,12 @@ def test_create_weather_photos_minimal(): assert record.global_id is not None assert record.point_id == "WP-0002" - assert record.weather_id is None + assert record.weather_id is not None assert record.ole_path is None assert record.object_id is None session.delete(record) + session.delete(parent) session.commit() diff --git a/transfers/transfer.py b/transfers/transfer.py index 1a44e3c3..fbee46f3 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -515,8 +515,6 @@ def _transfer_parallel( parallel_tasks_1.append(("Groups", ProjectGroupTransferer)) if opts.transfer_soil_rock_results: parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer)) - if opts.transfer_weather_photos: - parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer)) if opts.transfer_assets: parallel_tasks_1.append(("Assets", AssetTransferer)) if opts.transfer_associated_data: @@ -622,14 +620,17 @@ def _transfer_parallel( ) if "WeatherData" in results_map and results_map["WeatherData"]: metrics.weather_data_metrics(*results_map["WeatherData"]) - if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: - metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) if opts.transfer_surface_water_photos: message("TRANSFERRING SURFACE WATER PHOTOS") results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) metrics.surface_water_photos_metrics(*results) + if opts.transfer_weather_photos: + message("TRANSFERRING WEATHER PHOTOS") + results = _execute_transfer(WeatherPhotosTransferer, flags=flags) + metrics.weather_photos_metrics(*results) + if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) diff --git a/transfers/weather_photos.py b/transfers/weather_photos.py index a223c42a..6d5e804c 100644 --- a/transfers/weather_photos.py +++ b/transfers/weather_photos.py @@ -23,7 +23,8 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMA_WeatherPhotos +from db import NMA_WeatherData, NMA_WeatherPhotos +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import replace_nans @@ -37,6 +38,19 @@ class WeatherPhotosTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._weather_id_cache: set[str] = set() + self._build_weather_id_cache() + + def _build_weather_id_cache(self) -> None: + with session_ctx() as session: + weather_ids = session.query(NMA_WeatherData.weather_id).all() + for (weather_id,) in weather_ids: + if weather_id: + self._weather_id_cache.add(self._normalize_weather_id(weather_id)) + logger.info( + "Built WeatherData cache with %s weather ids", + len(self._weather_id_cache), + ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -44,12 +58,24 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return df, cleaned_df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows: list[dict[str, Any]] = [] + skipped_missing_parent = 0 + for raw in self.cleaned_df.to_dict("records"): + record = self._row_dict(raw) + if record is None: + skipped_missing_parent += 1 + continue + rows.append(record) rows = self._dedupe_rows(rows, key="GlobalID") if not rows: logger.info("No WeatherPhotos rows to transfer") return + if skipped_missing_parent: + logger.warning( + "Skipped %s WeatherPhotos rows without matching WeatherData", + skipped_missing_parent, + ) insert_stmt = insert(NMA_WeatherPhotos) excluded = insert_stmt.excluded @@ -74,9 +100,16 @@ def _transfer_hook(self, session: Session) -> None: session.execute(stmt) session.commit() - def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: + weather_id = self._uuid_val(row.get("WeatherID")) + if weather_id is None or not self._has_weather_id(weather_id): + logger.warning( + "Skipping WeatherPhotos WeatherID=%s - WeatherData not found", + weather_id, + ) + return None return { - "WeatherID": self._uuid_val(row.get("WeatherID")), + "WeatherID": weather_id, "PointID": row.get("PointID"), "OLEPath": row.get("OLEPath"), "OBJECTID": row.get("OBJECTID"), @@ -107,6 +140,13 @@ def _uuid_val(self, value: Any) -> Optional[UUID]: return None return None + def _has_weather_id(self, weather_id: UUID) -> bool: + return self._normalize_weather_id(weather_id) in self._weather_id_cache + + @staticmethod + def _normalize_weather_id(value: UUID) -> str: + return str(value).strip().lower() + def run(batch_size: int = 1000) -> None: """Entrypoint to execute the transfer.""" From 438ef7d526a45063b5b22e4c916db2137009ee69 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Thu, 5 Feb 2026 23:42:30 +0000 Subject: [PATCH 6/6] Formatting changes --- ...4e5f6_link_surface_water_photos_to_surface_water_data.py | 6 ++---- .../b7c8d9e0f1a2_link_weather_photos_to_weather_data.py | 6 ++---- .../f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py | 6 ++---- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py index a8b60040..daffdccb 100644 --- a/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py +++ b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py @@ -33,8 +33,7 @@ def upgrade() -> None: ["SurfaceID"], ondelete="CASCADE", ) - op.execute( - """ + op.execute(""" DELETE FROM "NMA_SurfaceWaterPhotos" p WHERE p."SurfaceID" IS NULL OR NOT EXISTS ( @@ -42,8 +41,7 @@ def upgrade() -> None: FROM "NMA_SurfaceWaterData" d WHERE d."SurfaceID" = p."SurfaceID" ) - """ - ) + """) op.alter_column( "NMA_SurfaceWaterPhotos", "SurfaceID", diff --git a/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py b/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py index 77cabb05..b3e17b21 100644 --- a/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py +++ b/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py @@ -33,8 +33,7 @@ def upgrade() -> None: ["WeatherID"], ondelete="CASCADE", ) - op.execute( - """ + op.execute(""" DELETE FROM "NMA_WeatherPhotos" p WHERE p."WeatherID" IS NULL OR NOT EXISTS ( @@ -42,8 +41,7 @@ def upgrade() -> None: FROM "NMA_WeatherData" d WHERE d."WeatherID" = p."WeatherID" ) - """ - ) + """) op.alter_column( "NMA_WeatherPhotos", "WeatherID", diff --git a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py index 3efcbbfb..53bede05 100644 --- a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py +++ b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py @@ -33,16 +33,14 @@ def upgrade() -> None: ondelete="CASCADE", ) # Backfill thing_id based on LocationId -> Thing.nma_pk_location - op.execute( - """ + op.execute(""" UPDATE "NMA_WeatherData" wd SET thing_id = t.id FROM thing t WHERE t.nma_pk_location IS NOT NULL AND wd."LocationId" IS NOT NULL AND t.nma_pk_location = wd."LocationId"::text - """ - ) + """) # Remove any rows that cannot be linked to a Thing, then enforce NOT NULL op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL') op.alter_column(