diff --git a/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py
index a8b60040..daffdccb 100644
--- a/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py
+++ b/alembic/versions/a1b2c3d4e5f6_link_surface_water_photos_to_surface_water_data.py
@@ -33,8 +33,7 @@ def upgrade() -> None:
         ["SurfaceID"],
         ondelete="CASCADE",
     )
-    op.execute(
-        """
+    op.execute("""
         DELETE FROM "NMA_SurfaceWaterPhotos" p
         WHERE p."SurfaceID" IS NULL
         OR NOT EXISTS (
@@ -42,8 +41,7 @@ def upgrade() -> None:
             FROM "NMA_SurfaceWaterData" d
             WHERE d."SurfaceID" = p."SurfaceID"
         )
-        """
-    )
+        """)
     op.alter_column(
         "NMA_SurfaceWaterPhotos",
         "SurfaceID",
diff --git a/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py b/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py
new file mode 100644
index 00000000..b3e17b21
--- /dev/null
+++ b/alembic/versions/b7c8d9e0f1a2_link_weather_photos_to_weather_data.py
@@ -0,0 +1,82 @@
+"""link weather photos to weather data
+
+Revision ID: b7c8d9e0f1a2
+Revises: a1b2c3d4e5f6
+Create Date: 2026-02-05 11:20:00.000000
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "b7c8d9e0f1a2"
+down_revision: Union[str, Sequence[str], None] = "a1b2c3d4e5f6"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Link NMA_WeatherPhotos to NMA_WeatherData through WeatherID.
+
+    Makes NMA_WeatherData.WeatherID unique, deletes photo rows that have no
+    matching parent, adds a cascading foreign key, and finally makes
+    NMA_WeatherPhotos.WeatherID NOT NULL.
+    """
+    op.create_unique_constraint(
+        "uq_weather_data_weather_id",
+        "NMA_WeatherData",
+        ["WeatherID"],
+    )
+    # Purge orphans BEFORE adding the foreign key: the database validates
+    # existing rows when the constraint is created, so a photo pointing at a
+    # missing WeatherID would abort the migration.
+    op.execute("""
+        DELETE FROM "NMA_WeatherPhotos" p
+        WHERE p."WeatherID" IS NULL
+        OR NOT EXISTS (
+            SELECT 1
+            FROM "NMA_WeatherData" d
+            WHERE d."WeatherID" = p."WeatherID"
+        )
+    """)
+    op.create_foreign_key(
+        "fk_weather_photos_weather_id",
+        "NMA_WeatherPhotos",
+        "NMA_WeatherData",
+        ["WeatherID"],
+        ["WeatherID"],
+        ondelete="CASCADE",
+    )
+    op.alter_column(
+        "NMA_WeatherPhotos",
+        "WeatherID",
+        existing_type=sa.UUID(),
+        nullable=False,
+    )
+
+
+def downgrade() -> None:
+    """Remove the WeatherID link between weather photos and weather data.
+
+    The foreign key is dropped before the unique constraint it references.
+    Photo rows deleted by upgrade() are not restored.
+    """
+    op.alter_column(
+        "NMA_WeatherPhotos",
+        "WeatherID",
+        existing_type=sa.UUID(),
+        nullable=True,
+    )
+    op.drop_constraint(
+        "fk_weather_photos_weather_id",
+        "NMA_WeatherPhotos",
+        type_="foreignkey",
+    )
+    op.drop_constraint(
+        "uq_weather_data_weather_id",
+        "NMA_WeatherData",
+        type_="unique",
+    )
diff --git a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py
index 3efcbbfb..53bede05 100644
--- a/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py
+++ b/alembic/versions/f6e5d4c3b2a1_add_thing_id_to_nma_weather_data.py
@@ -33,16 +33,14 @@ def upgrade() -> None:
         ondelete="CASCADE",
     )
     # Backfill thing_id based on LocationId -> Thing.nma_pk_location
-    op.execute(
-        """
+    op.execute("""
         UPDATE "NMA_WeatherData" wd
         SET thing_id = t.id
         FROM thing t
         WHERE t.nma_pk_location IS NOT NULL
         AND wd."LocationId" IS NOT NULL
         AND t.nma_pk_location = wd."LocationId"::text
-        """
-    )
+        """)
     # Remove any rows that cannot be linked to a Thing, then enforce NOT NULL
     op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL')
     op.alter_column(
diff --git a/db/nma_legacy.py b/db/nma_legacy.py
index d943b255..3c80eb7a 100644
--- a/db/nma_legacy.py
+++ b/db/nma_legacy.py
@@ -702,7 +702,7 @@ class NMA_WeatherData(Base):
 
     # Legacy PK (for audit)
     weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
-        "WeatherID", UUID(as_uuid=True)
+        "WeatherID", UUID(as_uuid=True), unique=True
     )
 
     # Legacy FK (for audit)
@@ -715,6 +715,12 @@ class NMA_WeatherData(Base):
 
     # Relationships
     thing: Mapped["Thing"] = relationship("Thing", back_populates="weather_data")
+    weather_photos: Mapped[list["NMA_WeatherPhotos"]] = relationship(
+        "NMA_WeatherPhotos",
+        back_populates="weather_data",
+        cascade="all, delete-orphan",
+        passive_deletes=True,
+    )
 
     @validates("thing_id")
     def validate_thing_id(self, key, value):
@@ -730,7 +736,7 @@ class NMA_WeatherPhotos(Base):
     """
     Legacy WeatherPhotos table from NM_Aquifer.
 
-    Note: This table is OUT OF SCOPE for refactoring (not a Thing child).
+    Note: This table is a child of NMA_WeatherData via WeatherID.
     """
 
     __tablename__ = "NMA_WeatherPhotos"
@@ -741,21 +747,39 @@ class NMA_WeatherPhotos(Base):
     )
 
     # FK:
-    # FK not assigned.
+    weather_id: Mapped[uuid.UUID] = mapped_column(
+        "WeatherID",
+        UUID(as_uuid=True),
+        ForeignKey("NMA_WeatherData.WeatherID", ondelete="CASCADE"),
+        nullable=False,
+    )
 
     # Legacy PK (for audit):
     # Current `global_id` is also the original PK in the legacy DB
 
     # Legacy FK (for audit):
-    weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
-        "WeatherID", UUID(as_uuid=True)
-    )
+    # weather_id is also the legacy FK in the source table.
 
     # Additional columns
     point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False)
     ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50))
     object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True)
 
+    # Relationships
+    weather_data: Mapped["NMA_WeatherData"] = relationship(
+        "NMA_WeatherData", back_populates="weather_photos"
+    )
+
+    @validates("weather_id")
+    def validate_weather_id(self, key, value):
+        """Prevent orphan NMA_WeatherPhotos - must have a parent NMA_WeatherData."""
+        if value is None:
+            raise ValueError(
+                "NMA_WeatherPhotos requires a parent NMA_WeatherData "
+                "(weather_id cannot be None)"
+            )
+        return value
+
 
 class NMA_Soil_Rock_Results(Base):
     """
diff --git a/tests/test_weather_photos_legacy.py b/tests/test_weather_photos_legacy.py
index f808dd87..b76fd5a5 100644
--- a/tests/test_weather_photos_legacy.py
+++ b/tests/test_weather_photos_legacy.py
@@ -28,14 +28,50 @@
 from uuid import uuid4
 
 from db.engine import session_ctx
-from db.nma_legacy import NMA_WeatherPhotos
+from db.nma_legacy import NMA_WeatherData, NMA_WeatherPhotos
+from db.thing import Thing
 
 
-def test_create_weather_photos_all_fields():
+def _next_object_id() -> int:
+    """Return a random, strictly negative OBJECTID for test rows."""
+    # Negative values avoid collisions with existing legacy OBJECTIDs; the
+    # trailing "- 1" keeps the result from ever being 0.
+    return -(uuid4().int % 2_000_000_000) - 1
+
+
+def _attach_thing_with_location(session, water_well_thing):
+    """Give the fixture Thing a fresh nma_pk_location and commit the change.
+
+    Returns a ``(thing, location_id)`` tuple.
+    """
+    location_id = uuid4()
+    thing = session.get(Thing, water_well_thing.id)
+    thing.nma_pk_location = str(location_id)
+    session.commit()
+    return thing, location_id
+
+
+def _create_weather_data(session, water_well_thing):
+    """Insert and commit a parent NMA_WeatherData row for the fixture Thing."""
+    thing, location_id = _attach_thing_with_location(session, water_well_thing)
+    record = NMA_WeatherData(
+        object_id=_next_object_id(),
+        location_id=location_id,
+        point_id="WX-1000",
+        weather_id=uuid4(),
+        thing_id=thing.id,
+    )
+    session.add(record)
+    session.commit()
+    return record
+
+
+def test_create_weather_photos_all_fields(water_well_thing):
     """Test creating a weather photos record with all fields."""
     with session_ctx() as session:
+        parent = _create_weather_data(session, water_well_thing)
         record = NMA_WeatherPhotos(
-            weather_id=uuid4(),
+            weather_id=parent.weather_id,
             point_id="WP-0001",
             ole_path="weather.jpg",
             object_id=321,
@@ -52,14 +88,17 @@ def test_create_weather_photos_all_fields():
         assert record.object_id == 321
 
         session.delete(record)
+        session.delete(parent)
         session.commit()
 
 
-def test_create_weather_photos_minimal():
+def test_create_weather_photos_minimal(water_well_thing):
     """Test creating a weather photos record with required fields only."""
     with session_ctx() as session:
+        parent = _create_weather_data(session, water_well_thing)
         record = NMA_WeatherPhotos(
             point_id="WP-0002",
+            weather_id=parent.weather_id,
             global_id=uuid4(),
         )
         session.add(record)
@@ -68,11 +107,12 @@ def test_create_weather_photos_minimal():
 
         assert record.global_id is not None
         assert record.point_id == "WP-0002"
-        assert record.weather_id is None
+        assert record.weather_id == parent.weather_id
         assert record.ole_path is None
         assert record.object_id is None
 
         session.delete(record)
+        session.delete(parent)
         session.commit()
 
 
diff --git a/transfers/transfer.py b/transfers/transfer.py
index 1a44e3c3..fbee46f3 100644
--- a/transfers/transfer.py
+++ b/transfers/transfer.py
@@ -515,8 +515,6 @@ def _transfer_parallel(
         parallel_tasks_1.append(("Groups", ProjectGroupTransferer))
     if opts.transfer_soil_rock_results:
         parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer))
-    if opts.transfer_weather_photos:
-        parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer))
     if opts.transfer_assets:
         parallel_tasks_1.append(("Assets", AssetTransferer))
     if opts.transfer_associated_data:
@@ -622,14 +620,17 @@ def _transfer_parallel(
         )
     if "WeatherData" in results_map and results_map["WeatherData"]:
         metrics.weather_data_metrics(*results_map["WeatherData"])
-    if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]:
-        metrics.weather_photos_metrics(*results_map["WeatherPhotos"])
 
     if opts.transfer_surface_water_photos:
         message("TRANSFERRING SURFACE WATER PHOTOS")
         results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags)
         metrics.surface_water_photos_metrics(*results)
 
+    if opts.transfer_weather_photos:
+        message("TRANSFERRING WEATHER PHOTOS")
+        results = _execute_transfer(WeatherPhotosTransferer, flags=flags)
+        metrics.weather_photos_metrics(*results)
+
     if opts.transfer_major_chemistry:
         message("TRANSFERRING MAJOR CHEMISTRY")
         results = _execute_transfer(MajorChemistryTransferer, flags=flags)
diff --git a/transfers/weather_photos.py b/transfers/weather_photos.py
index a223c42a..6d5e804c 100644
--- a/transfers/weather_photos.py
+++ b/transfers/weather_photos.py
@@ -23,7 +23,8 @@
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.orm import Session
 
-from db import NMA_WeatherPhotos
+from db import NMA_WeatherData, NMA_WeatherPhotos
+from db.engine import session_ctx
 from transfers.logger import logger
 from transfers.transferer import Transferer
 from transfers.util import replace_nans
@@ -37,6 +38,21 @@ class WeatherPhotosTransferer(Transferer):
     def __init__(self, *args, batch_size: int = 1000, **kwargs):
         super().__init__(*args, **kwargs)
         self.batch_size = batch_size
+        # Normalized WeatherID strings of every existing NMA_WeatherData row.
+        self._weather_id_cache: set[str] = set()
+        self._build_weather_id_cache()
+
+    def _build_weather_id_cache(self) -> None:
+        """Load all NMA_WeatherData WeatherIDs into the in-memory lookup set."""
+        with session_ctx() as session:
+            weather_ids = session.query(NMA_WeatherData.weather_id).all()
+            for (weather_id,) in weather_ids:
+                if weather_id:
+                    self._weather_id_cache.add(self._normalize_weather_id(weather_id))
+        logger.info(
+            "Built WeatherData cache with %s weather ids",
+            len(self._weather_id_cache),
+        )
 
     def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
         df = self._read_csv(self.source_table)
@@ -44,12 +60,26 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
         return df, cleaned_df
 
     def _transfer_hook(self, session: Session) -> None:
-        rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")]
+        rows: list[dict[str, Any]] = []
+        skipped_missing_parent = 0
+        for raw in self.cleaned_df.to_dict("records"):
+            record = self._row_dict(raw)
+            if record is None:
+                skipped_missing_parent += 1
+                continue
+            rows.append(record)
         rows = self._dedupe_rows(rows, key="GlobalID")
 
+        # Report skipped rows before the early return so the count is logged
+        # even when every row was skipped.
+        if skipped_missing_parent:
+            logger.warning(
+                "Skipped %s WeatherPhotos rows without matching WeatherData",
+                skipped_missing_parent,
+            )
         if not rows:
             logger.info("No WeatherPhotos rows to transfer")
             return
 
         insert_stmt = insert(NMA_WeatherPhotos)
         excluded = insert_stmt.excluded
@@ -74,9 +104,17 @@ def _transfer_hook(self, session: Session) -> None:
             session.execute(stmt)
             session.commit()
 
-    def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
+    def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]:
+        """Map a source row to insert values, or None if its parent is missing."""
+        weather_id = self._uuid_val(row.get("WeatherID"))
+        if weather_id is None or not self._has_weather_id(weather_id):
+            logger.warning(
+                "Skipping WeatherPhotos WeatherID=%s - WeatherData not found",
+                row.get("WeatherID"),
+            )
+            return None
         return {
-            "WeatherID": self._uuid_val(row.get("WeatherID")),
+            "WeatherID": weather_id,
             "PointID": row.get("PointID"),
             "OLEPath": row.get("OLEPath"),
             "OBJECTID": row.get("OBJECTID"),
@@ -107,6 +145,15 @@ def _uuid_val(self, value: Any) -> Optional[UUID]:
                 return None
         return None
 
+    def _has_weather_id(self, weather_id: UUID) -> bool:
+        """Return True if weather_id matches a cached NMA_WeatherData row."""
+        return self._normalize_weather_id(weather_id) in self._weather_id_cache
+
+    @staticmethod
+    def _normalize_weather_id(value: UUID) -> str:
+        """Return the stripped, lowercase string form of a WeatherID."""
+        return str(value).strip().lower()
+
 
 def run(batch_size: int = 1000) -> None:
     """Entrypoint to execute the transfer."""