diff --git a/admin/config.py b/admin/config.py index 5aec1a5f..1c3bb14f 100644 --- a/admin/config.py +++ b/admin/config.py @@ -53,6 +53,7 @@ SurfaceWaterPhotosAdmin, ThingAdmin, TransducerObservationAdmin, + WaterLevelsContinuousPressureDailyAdmin, WeatherPhotosAdmin, WeatherDataAdmin, FieldParametersAdmin, @@ -80,6 +81,7 @@ NMA_Soil_Rock_Results, NMA_Stratigraphy, NMA_SurfaceWaterData, + NMA_WaterLevelsContinuous_Pressure_Daily, NMA_WeatherPhotos, NMA_SurfaceWaterPhotos, NMA_WeatherData, @@ -192,6 +194,13 @@ def create_admin(app): # Transducer observations admin.add_view(TransducerObservationAdmin(TransducerObservation)) + # Water Levels - Continuous (legacy) + admin.add_view( + WaterLevelsContinuousPressureDailyAdmin( + NMA_WaterLevelsContinuous_Pressure_Daily + ) + ) + # Weather admin.add_view(WeatherPhotosAdmin(NMA_WeatherPhotos)) diff --git a/admin/views/__init__.py b/admin/views/__init__.py index 33920b85..285d5ef5 100644 --- a/admin/views/__init__.py +++ b/admin/views/__init__.py @@ -52,6 +52,9 @@ from admin.views.surface_water_photos import SurfaceWaterPhotosAdmin from admin.views.thing import ThingAdmin from admin.views.transducer_observation import TransducerObservationAdmin +from admin.views.waterlevelscontinuous_pressure_daily import ( + WaterLevelsContinuousPressureDailyAdmin, +) from admin.views.weather_photos import WeatherPhotosAdmin from admin.views.weather_data import WeatherDataAdmin @@ -88,6 +91,7 @@ "SurfaceWaterPhotosAdmin", "ThingAdmin", "TransducerObservationAdmin", + "WaterLevelsContinuousPressureDailyAdmin", "WeatherPhotosAdmin", "WeatherDataAdmin", ] diff --git a/admin/views/waterlevelscontinuous_pressure_daily.py b/admin/views/waterlevelscontinuous_pressure_daily.py new file mode 100644 index 00000000..ac2afb02 --- /dev/null +++ b/admin/views/waterlevelscontinuous_pressure_daily.py @@ -0,0 +1,148 @@ +# =============================================================================== +# Copyright 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +WaterLevelsContinuousPressureDailyAdmin view for legacy NMA_WaterLevelsContinuous_Pressure_Daily. +""" + +from starlette.requests import Request + +from admin.views.base import OcotilloModelView + + +class WaterLevelsContinuousPressureDailyAdmin(OcotilloModelView): + """ + Admin view for NMA_WaterLevelsContinuous_Pressure_Daily model. + """ + + # ========== Basic Configuration ========== + name = "NMA Water Levels Continuous Pressure Daily" + label = "NMA Water Levels Continuous Pressure Daily" + icon = "fa fa-tachometer-alt" + + def can_create(self, request: Request) -> bool: + return False + + def can_edit(self, request: Request) -> bool: + return False + + def can_delete(self, request: Request) -> bool: + return False + + # ========== List View ========== + list_fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "temperature_water", + "water_head", + "water_head_adjusted", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "notes", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + sortable_fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "water_head", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + fields_default_sort = [("date_measured", True)] + + searchable_fields = [ + "global_id", + "well_id", + "point_id", + "date_measured", + "measurement_method", + "data_source", + "measuring_agency", + "notes", + ] + + page_size = 50 + page_size_options = [25, 50, 100, 200] + + # ========== Detail View ========== + fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "temperature_water", + "water_head", + "water_head_adjusted", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "notes", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + field_labels = { + "global_id": "GlobalID", + "object_id": "OBJECTID", + "well_id": "WellID", + "point_id": "PointID", + "date_measured": "Date Measured", + "temperature_water": "Temperature Water", + "water_head": "Water Head", + "water_head_adjusted": "Water Head Adjusted", + "depth_to_water_bgs": "Depth To Water (BGS)", + "measurement_method": "Measurement Method", + "data_source": "Data Source", + "measuring_agency": "Measuring Agency", + "qced": "QCed", + "notes": "Notes", + "created": "Created", + "updated": "Updated", + "processed_by": "Processed By", + "checked_by": "Checked By", + "cond_dl_ms_cm": "CONDDL (mS/cm)", + } + + +# ============= EOF ============================================= diff --git a/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py b/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py new file mode 100644 index 00000000..f825e81a --- /dev/null +++ b/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py @@ -0,0 +1,92 @@ +"""Add thing_id FK to NMA_WaterLevelsContinuous_Pressure_Daily. + +Revision ID: e8a7c6b5d4f3 +Revises: b12e3919077e +Create Date: 2026-01-29 12:45:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "e8a7c6b5d4f3" +down_revision: Union[str, Sequence[str], None] = "b12e3919077e" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add thing_id and FK to legacy pressure daily table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"] + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + if "thing_id" not in columns: + op.add_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + sa.Column("thing_id", sa.Integer(), nullable=True), + ) + + existing_fks = { + fk["name"] + for fk in inspector.get_foreign_keys("NMA_WaterLevelsContinuous_Pressure_Daily") + if fk.get("name") + } + if "fk_pressure_daily_thing" not in existing_fks: + op.create_foreign_key( + "fk_pressure_daily_thing", + "NMA_WaterLevelsContinuous_Pressure_Daily", + "thing", + ["thing_id"], + ["id"], + ondelete="CASCADE", + ) + + null_count = bind.execute( + sa.text( + 'SELECT COUNT(*) FROM "NMA_WaterLevelsContinuous_Pressure_Daily" ' + 'WHERE "thing_id" IS NULL' + ) + ).scalar() + if null_count == 0: + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "thing_id", + existing_type=sa.Integer(), + nullable=False, + ) + + +def downgrade() -> None: + """Remove thing_id FK from legacy pressure daily table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + existing_fks = { + fk["name"] + for fk in inspector.get_foreign_keys("NMA_WaterLevelsContinuous_Pressure_Daily") + if fk.get("name") + } + if "fk_pressure_daily_thing" in existing_fks: + op.drop_constraint( + "fk_pressure_daily_thing", + "NMA_WaterLevelsContinuous_Pressure_Daily", + type_="foreignkey", + ) + + columns = { + col["name"] + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + if "thing_id" in columns: + op.drop_column("NMA_WaterLevelsContinuous_Pressure_Daily", "thing_id") diff --git a/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py b/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py new file mode 100644 index 00000000..38d11306 --- /dev/null +++ b/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py @@ -0,0 +1,85 @@ +"""Align UUID column types on NMA_WaterLevelsContinuous_Pressure_Daily. + +Revision ID: f0c9d8e7b6a5 +Revises: e8a7c6b5d4f3 +Create Date: 2026-01-29 12:55:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "f0c9d8e7b6a5" +down_revision: Union[str, Sequence[str], None] = "e8a7c6b5d4f3" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def _column_is_uuid(col) -> bool: + return isinstance(col.get("type"), postgresql.UUID) + + +def upgrade() -> None: + """Alter UUID columns to proper UUID types.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"]: col + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + + global_id_col = columns.get("GlobalID") + if global_id_col is not None and not _column_is_uuid(global_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "GlobalID", + type_=postgresql.UUID(as_uuid=True), + postgresql_using='"GlobalID"::uuid', + ) + + well_id_col = columns.get("WellID") + if well_id_col is not None and not _column_is_uuid(well_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "WellID", + type_=postgresql.UUID(as_uuid=True), + postgresql_using='"WellID"::uuid', + ) + + +def downgrade() -> None: + """Revert UUID columns back to strings.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"]: col + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + + global_id_col = columns.get("GlobalID") + if global_id_col is not None and _column_is_uuid(global_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "GlobalID", + type_=sa.String(length=40), + postgresql_using='"GlobalID"::text', + ) + + well_id_col = columns.get("WellID") + if well_id_col is not None and _column_is_uuid(well_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "WellID", + type_=sa.String(length=40), + postgresql_using='"WellID"::text', + ) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 4e2bb169..6f1954e7 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -56,11 +56,17 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): __tablename__ = "NMA_WaterLevelsContinuous_Pressure_Daily" - global_id: Mapped[str] = mapped_column("GlobalID", String(40), primary_key=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) object_id: Mapped[Optional[int]] = mapped_column( "OBJECTID", Integer, autoincrement=True ) - well_id: Mapped[Optional[str]] = mapped_column("WellID", String(40)) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) + # FK to Thing table - required for all WaterLevelsContinuous_Pressure_Daily records + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) date_measured: Mapped[datetime] = mapped_column( "DateMeasured", DateTime, nullable=False @@ -90,6 +96,10 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): checked_by: Mapped[Optional[str]] = mapped_column("CheckedBy", String(4)) cond_dl_ms_cm: Mapped[Optional[float]] = mapped_column("CONDDL (mS/cm)", Float) + thing: Mapped["Thing"] = relationship( + "Thing", back_populates="pressure_daily_levels" + ) + class NMA_view_NGWMN_WellConstruction(Base): """ diff --git a/db/thing.py b/db/thing.py index 8c3f4d31..66dc5524 100644 --- a/db/thing.py +++ b/db/thing.py @@ -47,7 +47,11 @@ from db.thing_geologic_formation_association import ( ThingGeologicFormationAssociation, ) - from db.nma_legacy import NMA_Chemistry_SampleInfo, NMA_Stratigraphy + from db.nma_legacy import ( + NMA_Chemistry_SampleInfo, + NMA_Stratigraphy, + NMA_WaterLevelsContinuous_Pressure_Daily, + ) class Thing( @@ -318,6 +322,14 @@ class Thing( cascade="all, delete-orphan", passive_deletes=True, ) + pressure_daily_levels: Mapped[List["NMA_WaterLevelsContinuous_Pressure_Daily"]] = ( + relationship( + "NMA_WaterLevelsContinuous_Pressure_Daily", + back_populates="thing", + cascade="all, delete-orphan", + passive_deletes=True, + ) + ) # --- Association Proxies --- assets: AssociationProxy[list["Asset"]] = association_proxy( diff --git a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py index 7328e405..9b6a55da 100644 --- a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py +++ b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py @@ -21,14 +21,17 @@ """ from datetime import datetime -from uuid import uuid4 +from uuid import UUID, uuid4 + +import pytest +from sqlalchemy.exc import IntegrityError, ProgrammingError from db.engine import session_ctx from db.nma_legacy import NMA_WaterLevelsContinuous_Pressure_Daily -def _next_global_id() -> str: - return str(uuid4()) +def _next_global_id() -> UUID: + return uuid4() def _next_object_id() -> int: @@ -37,15 +40,15 @@ def _next_object_id() -> int: # ===================== CREATE tests ========================== -def test_create_pressure_daily_all_fields(): +def test_create_pressure_daily_all_fields(water_well_thing): """Test creating a pressure daily record with required fields.""" with session_ctx() as session: now = datetime(2024, 1, 1, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), object_id=_next_object_id(), - well_id="WELL-1", - point_id="PD-1001", + well_id=uuid4(), + point_id=water_well_thing.name, date_measured=now, temperature_water=12.3, water_head=4.5, @@ -61,52 +64,55 @@ def test_create_pressure_daily_all_fields(): processed_by="AB", checked_by="CD", cond_dl_ms_cm=0.2, + thing_id=water_well_thing.id, ) session.add(record) session.commit() session.refresh(record) assert record.global_id is not None - assert record.point_id == "PD-1001" + assert record.point_id == water_well_thing.name assert record.date_measured == now session.delete(record) session.commit() -def test_create_pressure_daily_minimal(): +def test_create_pressure_daily_minimal(water_well_thing): """Test creating a pressure daily record with minimal fields.""" with session_ctx() as session: now = datetime(2024, 1, 2, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1002", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() session.refresh(record) assert record.global_id is not None - assert record.point_id == "PD-1002" + assert record.point_id == water_well_thing.name session.delete(record) session.commit() # ===================== READ tests ========================== -def test_read_pressure_daily_by_global_id(): +def test_read_pressure_daily_by_global_id(water_well_thing): """Test reading a pressure daily record by GlobalID.""" with session_ctx() as session: now = datetime(2024, 1, 3, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1003", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -116,23 +122,24 @@ def test_read_pressure_daily_by_global_id(): ) assert fetched is not None assert fetched.global_id == record.global_id - assert fetched.point_id == "PD-1003" + assert fetched.point_id == water_well_thing.name session.delete(record) session.commit() # ===================== UPDATE tests ========================== -def test_update_pressure_daily(): +def test_update_pressure_daily(water_well_thing): """Test updating a pressure daily record.""" with session_ctx() as session: now = datetime(2024, 1, 4, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1004", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -150,16 +157,17 @@ def test_update_pressure_daily(): # ===================== DELETE tests ========================== -def test_delete_pressure_daily(): +def test_delete_pressure_daily(water_well_thing): """Test deleting a pressure daily record.""" with session_ctx() as session: now = datetime(2024, 1, 5, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1005", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -180,6 +188,7 @@ def test_pressure_daily_has_all_migrated_columns(): "global_id", "object_id", "well_id", + "thing_id", "point_id", "date_measured", "temperature_water", @@ -212,4 +221,52 @@ def test_pressure_daily_table_name(): ) +# ===================== Relational Integrity Tests ====================== + + +def test_pressure_daily_thing_id_required(): + """ + VERIFIES: 'thing_id IS NOT NULL' and Foreign Key presence. + Ensures the DB rejects records without a Thing linkage. + """ + with session_ctx() as session: + now = datetime(2024, 1, 6, 12, 0, 0) + record = NMA_WaterLevelsContinuous_Pressure_Daily( + global_id=_next_global_id(), + point_id="PD-1006", + date_measured=now, + created=now, + updated=now, + ) + session.add(record) + + with pytest.raises((IntegrityError, ProgrammingError)): + session.flush() + session.rollback() + + +def test_pressure_daily_invalid_thing_id_rejected(water_well_thing): + """ + VERIFIES: foreign key integrity on thing_id. + Ensures the DB rejects updates to a non-existent Thing. + """ + with session_ctx() as session: + now = datetime(2024, 1, 7, 12, 0, 0) + record = NMA_WaterLevelsContinuous_Pressure_Daily( + global_id=_next_global_id(), + point_id=water_well_thing.name, + date_measured=now, + created=now, + updated=now, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + + with pytest.raises((IntegrityError, ProgrammingError)): + record.thing_id = 999999 + session.flush() + session.rollback() + + # ============= EOF ============================================= diff --git a/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py b/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py new file mode 100644 index 00000000..a5616f81 --- /dev/null +++ b/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py @@ -0,0 +1,47 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +import pandas as pd + +from transfers.waterlevelscontinuous_pressure_daily import ( + NMA_WaterLevelsContinuous_Pressure_DailyTransferer, +) + + +def test_pressure_daily_transfer_filters_orphans(water_well_thing): + transferer = NMA_WaterLevelsContinuous_Pressure_DailyTransferer(batch_size=1) + df = pd.DataFrame( + [ + {"PointID": water_well_thing.name, "GlobalID": "gid-1"}, + {"PointID": "MISSING-THING", "GlobalID": "gid-2"}, + ] + ) + + filtered = transferer._filter_to_valid_things(df) + + assert list(filtered["PointID"]) == [water_well_thing.name] + + +def test_pressure_daily_row_dict_sets_thing_id(water_well_thing): + transferer = NMA_WaterLevelsContinuous_Pressure_DailyTransferer(batch_size=1) + row = {"PointID": water_well_thing.name, "GlobalID": "gid-3"} + + mapped = transferer._row_dict(row) + + assert mapped["thing_id"] == water_well_thing.id + + +# ============= EOF ============================================= diff --git a/transfers/waterlevelscontinuous_pressure_daily.py b/transfers/waterlevelscontinuous_pressure_daily.py index c41423f7..6caa348c 100644 --- a/transfers/waterlevelscontinuous_pressure_daily.py +++ b/transfers/waterlevelscontinuous_pressure_daily.py @@ -22,7 +22,8 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMA_WaterLevelsContinuous_Pressure_Daily +from db import NMA_WaterLevelsContinuous_Pressure_Daily, Thing +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import read_csv @@ -41,6 +42,30 @@ class NMA_WaterLevelsContinuous_Pressure_DailyTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.name, Thing.id).all() + self._thing_id_cache = {name: thing_id for name, thing_id in things} + logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + + def _filter_to_valid_things(self, df: pd.DataFrame) -> pd.DataFrame: + valid_point_ids = set(self._thing_id_cache.keys()) + before_count = len(df) + filtered_df = df[df["PointID"].isin(valid_point_ids)].copy() + after_count = len(filtered_df) + if before_count > after_count: + skipped = before_count - after_count + logger.warning( + "Filtered out %s WaterLevelsContinuous_Pressure_Daily records without matching Things " + "(%s valid, %s orphan records prevented)", + skipped, + after_count, + skipped, + ) + return filtered_df def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: # Parse key datetime columns eagerly to avoid per-row parsing later. @@ -48,8 +73,8 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: self.source_table, parse_dates=["DateMeasured", "Created", "Updated"], ) - # No special cleaning/validation beyond raw import; keep identical copy. - return input_df, input_df + cleaned_df = self._filter_to_valid_things(input_df) + return input_df, cleaned_df def _transfer_hook(self, session: Session) -> None: rows = self._dedupe_rows( @@ -71,6 +96,7 @@ def _transfer_hook(self, session: Session) -> None: "OBJECTID": excluded.OBJECTID, "WellID": excluded.WellID, "PointID": excluded.PointID, + "thing_id": excluded.thing_id, "DateMeasured": excluded.DateMeasured, "TemperatureWater": excluded.TemperatureWater, "WaterHead": excluded.WaterHead, @@ -104,6 +130,7 @@ def val(key: str) -> Optional[Any]: "OBJECTID": val("OBJECTID"), "WellID": val("WellID"), "PointID": val("PointID"), + "thing_id": self._thing_id_cache.get(val("PointID")), "DateMeasured": val("DateMeasured"), "TemperatureWater": val("TemperatureWater"), "WaterHead": val("WaterHead"),