Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ def upgrade() -> None:
["SurfaceID"],
ondelete="CASCADE",
)
op.execute(
"""
op.execute("""
DELETE FROM "NMA_SurfaceWaterPhotos" p
WHERE p."SurfaceID" IS NULL
OR NOT EXISTS (
SELECT 1
FROM "NMA_SurfaceWaterData" d
WHERE d."SurfaceID" = p."SurfaceID"
)
"""
)
""")
op.alter_column(
"NMA_SurfaceWaterPhotos",
"SurfaceID",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""link weather photos to weather data

Revision ID: b7c8d9e0f1a2
Revises: a1b2c3d4e5f6
Create Date: 2026-02-05 11:20:00.000000

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "b7c8d9e0f1a2"
down_revision: Union[str, Sequence[str], None] = "a1b2c3d4e5f6"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema.

    Links NMA_WeatherPhotos to NMA_WeatherData via WeatherID:
    removes orphan photo rows, adds a unique constraint on the parent
    key, adds an ON DELETE CASCADE foreign key, then makes the child
    column NOT NULL.
    """
    # Purge orphans FIRST: PostgreSQL validates existing rows when a
    # foreign key constraint is created, so running create_foreign_key
    # before this DELETE would abort the migration if any photo row
    # lacked a matching WeatherData parent.
    op.execute("""
    DELETE FROM "NMA_WeatherPhotos" p
    WHERE p."WeatherID" IS NULL
    OR NOT EXISTS (
    SELECT 1
    FROM "NMA_WeatherData" d
    WHERE d."WeatherID" = p."WeatherID"
    )
    """)
    # The FK target column must be covered by a unique constraint
    # before a foreign key can reference it.
    op.create_unique_constraint(
        "uq_weather_data_weather_id",
        "NMA_WeatherData",
        ["WeatherID"],
    )
    op.create_foreign_key(
        "fk_weather_photos_weather_id",
        "NMA_WeatherPhotos",
        "NMA_WeatherData",
        ["WeatherID"],
        ["WeatherID"],
        ondelete="CASCADE",
    )
    # Safe now: every remaining row has a non-NULL, valid parent id.
    op.alter_column(
        "NMA_WeatherPhotos",
        "WeatherID",
        existing_type=sa.UUID(),
        nullable=False,
    )


def downgrade() -> None:
    """Downgrade schema.

    Reverses the upgrade: relaxes the NOT NULL on the child column,
    then drops the foreign-key and unique constraints in reverse
    order of creation.
    """
    op.alter_column(
        "NMA_WeatherPhotos",
        "WeatherID",
        existing_type=sa.UUID(),
        nullable=True,
    )
    for constraint_name, table_name, constraint_type in (
        ("fk_weather_photos_weather_id", "NMA_WeatherPhotos", "foreignkey"),
        ("uq_weather_data_weather_id", "NMA_WeatherData", "unique"),
    ):
        op.drop_constraint(constraint_name, table_name, type_=constraint_type)
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,14 @@ def upgrade() -> None:
ondelete="CASCADE",
)
# Backfill thing_id based on LocationId -> Thing.nma_pk_location
op.execute(
"""
op.execute("""
UPDATE "NMA_WeatherData" wd
SET thing_id = t.id
FROM thing t
WHERE t.nma_pk_location IS NOT NULL
AND wd."LocationId" IS NOT NULL
AND t.nma_pk_location = wd."LocationId"::text
"""
)
""")
# Remove any rows that cannot be linked to a Thing, then enforce NOT NULL
op.execute('DELETE FROM "NMA_WeatherData" WHERE thing_id IS NULL')
op.alter_column(
Expand Down
36 changes: 30 additions & 6 deletions db/nma_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ class NMA_WeatherData(Base):

# Legacy PK (for audit)
weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
"WeatherID", UUID(as_uuid=True)
"WeatherID", UUID(as_uuid=True), unique=True
)

# Legacy FK (for audit)
Expand All @@ -715,6 +715,12 @@ class NMA_WeatherData(Base):

# Relationships
thing: Mapped["Thing"] = relationship("Thing", back_populates="weather_data")
weather_photos: Mapped[list["NMA_WeatherPhotos"]] = relationship(
"NMA_WeatherPhotos",
back_populates="weather_data",
cascade="all, delete-orphan",
passive_deletes=True,
)

@validates("thing_id")
def validate_thing_id(self, key, value):
Expand All @@ -730,7 +736,7 @@ class NMA_WeatherPhotos(Base):
"""
Legacy WeatherPhotos table from NM_Aquifer.

Note: This table is OUT OF SCOPE for refactoring (not a Thing child).
Note: This table is a child of NMA_WeatherData via WeatherID.
"""

__tablename__ = "NMA_WeatherPhotos"
Expand All @@ -741,21 +747,39 @@ class NMA_WeatherPhotos(Base):
)

# FK:
# FK not assigned.
weather_id: Mapped[uuid.UUID] = mapped_column(
"WeatherID",
UUID(as_uuid=True),
ForeignKey("NMA_WeatherData.WeatherID", ondelete="CASCADE"),
nullable=False,
)

# Legacy PK (for audit):
# Current `global_id` is also the original PK in the legacy DB

# Legacy FK (for audit):
weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
"WeatherID", UUID(as_uuid=True)
)
# weather_id is also the legacy FK in the source table.

# Additional columns
point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False)
ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50))
object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True)

# Relationships
weather_data: Mapped["NMA_WeatherData"] = relationship(
"NMA_WeatherData", back_populates="weather_photos"
)

@validates("weather_id")
def validate_weather_id(self, key, value):
    """Reject None so every photo row keeps a parent NMA_WeatherData."""
    if value is not None:
        return value
    raise ValueError(
        "NMA_WeatherPhotos requires a parent NMA_WeatherData "
        "(weather_id cannot be None)"
    )


class NMA_Soil_Rock_Results(Base):
"""
Expand Down
43 changes: 38 additions & 5 deletions tests/test_weather_photos_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,43 @@
from uuid import uuid4

from db.engine import session_ctx
from db.nma_legacy import NMA_WeatherPhotos
from db.nma_legacy import NMA_WeatherData, NMA_WeatherPhotos
from db.thing import Thing


def test_create_weather_photos_all_fields():
def _next_object_id() -> int:
# Use a negative value to avoid collisions with existing legacy OBJECTIDs.
return -(uuid4().int % 2_000_000_000)


def _attach_thing_with_location(session, water_well_thing):
    """Assign a fresh legacy location id to the fixture Thing.

    Returns the persisted Thing and the generated location UUID.
    """
    new_location_id = uuid4()
    persisted_thing = session.get(Thing, water_well_thing.id)
    persisted_thing.nma_pk_location = str(new_location_id)
    session.commit()
    return persisted_thing, new_location_id


def _create_weather_data(session, water_well_thing):
    """Persist and return an NMA_WeatherData row linked to the fixture Thing."""
    parent_thing, parent_location_id = _attach_thing_with_location(
        session, water_well_thing
    )
    weather_row = NMA_WeatherData(
        object_id=_next_object_id(),
        location_id=parent_location_id,
        point_id="WX-1000",
        weather_id=uuid4(),
        thing_id=parent_thing.id,
    )
    session.add(weather_row)
    session.commit()
    return weather_row


def test_create_weather_photos_all_fields(water_well_thing):
"""Test creating a weather photos record with all fields."""
with session_ctx() as session:
parent = _create_weather_data(session, water_well_thing)
record = NMA_WeatherPhotos(
weather_id=uuid4(),
weather_id=parent.weather_id,
point_id="WP-0001",
ole_path="weather.jpg",
object_id=321,
Expand All @@ -52,14 +81,17 @@ def test_create_weather_photos_all_fields():
assert record.object_id == 321

session.delete(record)
session.delete(parent)
session.commit()


def test_create_weather_photos_minimal():
def test_create_weather_photos_minimal(water_well_thing):
"""Test creating a weather photos record with required fields only."""
with session_ctx() as session:
parent = _create_weather_data(session, water_well_thing)
record = NMA_WeatherPhotos(
point_id="WP-0002",
weather_id=parent.weather_id,
global_id=uuid4(),
)
session.add(record)
Expand All @@ -68,11 +100,12 @@ def test_create_weather_photos_minimal():

assert record.global_id is not None
assert record.point_id == "WP-0002"
assert record.weather_id is None
assert record.weather_id is not None
assert record.ole_path is None
assert record.object_id is None

session.delete(record)
session.delete(parent)
session.commit()


Expand Down
9 changes: 5 additions & 4 deletions transfers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,6 @@ def _transfer_parallel(
parallel_tasks_1.append(("Groups", ProjectGroupTransferer))
if opts.transfer_soil_rock_results:
parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer))
if opts.transfer_weather_photos:
parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer))
if opts.transfer_assets:
parallel_tasks_1.append(("Assets", AssetTransferer))
if opts.transfer_associated_data:
Expand Down Expand Up @@ -622,14 +620,17 @@ def _transfer_parallel(
)
if "WeatherData" in results_map and results_map["WeatherData"]:
metrics.weather_data_metrics(*results_map["WeatherData"])
if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]:
metrics.weather_photos_metrics(*results_map["WeatherPhotos"])

if opts.transfer_surface_water_photos:
message("TRANSFERRING SURFACE WATER PHOTOS")
results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags)
metrics.surface_water_photos_metrics(*results)

if opts.transfer_weather_photos:
message("TRANSFERRING WEATHER PHOTOS")
results = _execute_transfer(WeatherPhotosTransferer, flags=flags)
metrics.weather_photos_metrics(*results)

if opts.transfer_major_chemistry:
message("TRANSFERRING MAJOR CHEMISTRY")
results = _execute_transfer(MajorChemistryTransferer, flags=flags)
Expand Down
48 changes: 44 additions & 4 deletions transfers/weather_photos.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from db import NMA_WeatherPhotos
from db import NMA_WeatherData, NMA_WeatherPhotos
from db.engine import session_ctx
from transfers.logger import logger
from transfers.transferer import Transferer
from transfers.util import replace_nans
Expand All @@ -37,19 +38,44 @@ class WeatherPhotosTransferer(Transferer):
def __init__(self, *args, batch_size: int = 1000, **kwargs):
    """Set up the transferer and eagerly cache known WeatherData ids.

    batch_size controls how many rows are upserted per statement.
    """
    super().__init__(*args, **kwargs)
    # The empty set must exist before _build_weather_id_cache fills it.
    self._weather_id_cache: set[str] = set()
    self.batch_size = batch_size
    self._build_weather_id_cache()

def _build_weather_id_cache(self) -> None:
    """Load every known WeatherData id into the in-memory lookup set.

    Falsy ids (None/empty) are skipped; the rest are normalized so
    lookups are case- and whitespace-insensitive.
    """
    with session_ctx() as session:
        id_rows = session.query(NMA_WeatherData.weather_id).all()
    self._weather_id_cache.update(
        self._normalize_weather_id(raw_id)
        for (raw_id,) in id_rows
        if raw_id
    )
    logger.info(
        "Built WeatherData cache with %s weather ids",
        len(self._weather_id_cache),
    )

def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return the raw source frame and its NaN-normalized copy."""
    raw_df = self._read_csv(self.source_table)
    return raw_df, replace_nans(raw_df)

def _transfer_hook(self, session: Session) -> None:
rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")]
rows: list[dict[str, Any]] = []
skipped_missing_parent = 0
for raw in self.cleaned_df.to_dict("records"):
record = self._row_dict(raw)
if record is None:
skipped_missing_parent += 1
continue
rows.append(record)
rows = self._dedupe_rows(rows, key="GlobalID")

if not rows:
logger.info("No WeatherPhotos rows to transfer")
return
if skipped_missing_parent:
logger.warning(
"Skipped %s WeatherPhotos rows without matching WeatherData",
skipped_missing_parent,
)

insert_stmt = insert(NMA_WeatherPhotos)
excluded = insert_stmt.excluded
Expand All @@ -74,9 +100,16 @@ def _transfer_hook(self, session: Session) -> None:
session.execute(stmt)
session.commit()

def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]:
weather_id = self._uuid_val(row.get("WeatherID"))
if weather_id is None or not self._has_weather_id(weather_id):
logger.warning(
"Skipping WeatherPhotos WeatherID=%s - WeatherData not found",
weather_id,
)
return None
return {
"WeatherID": self._uuid_val(row.get("WeatherID")),
"WeatherID": weather_id,
"PointID": row.get("PointID"),
"OLEPath": row.get("OLEPath"),
"OBJECTID": row.get("OBJECTID"),
Expand Down Expand Up @@ -107,6 +140,13 @@ def _uuid_val(self, value: Any) -> Optional[UUID]:
return None
return None

def _has_weather_id(self, weather_id: UUID) -> bool:
return self._normalize_weather_id(weather_id) in self._weather_id_cache

@staticmethod
def _normalize_weather_id(value: UUID) -> str:
return str(value).strip().lower()


def run(batch_size: int = 1000) -> None:
"""Entrypoint to execute the transfer."""
Expand Down
Loading