From 52f60c742f678e9aaec15dff5e942063ec98f722 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 28 Jan 2026 15:11:51 -0700 Subject: [PATCH 01/24] refactor: Use UUID types for GlobalID/WellID in NMA_WaterLevelsContinuous_Pressure_Daily Context: updates db/nma_legacy.py to map GlobalID and WellID as UUID(as_uuid=True) and documents the WellID FK to Thing for the continuous pressure daily model. --- db/nma_legacy.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 5ea1337e1..5129a1b59 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -56,11 +56,14 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): __tablename__ = "NMA_WaterLevelsContinuous_Pressure_Daily" - global_id: Mapped[str] = mapped_column("GlobalID", String(40), primary_key=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) object_id: Mapped[Optional[int]] = mapped_column( "OBJECTID", Integer, autoincrement=True ) - well_id: Mapped[Optional[str]] = mapped_column("WellID", String(40)) + # FK to Thing table (well_id --> Thing.nma_pk_welldata) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) date_measured: Mapped[datetime] = mapped_column( "DateMeasured", DateTime, nullable=False From e46ac1580b05a143761ec600af5f2cdcf4e82c29 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 28 Jan 2026 15:36:50 -0700 Subject: [PATCH 02/24] feat: add admin view for legacy continuous pressure daily water levels - Define read-only Starlette Admin view with full legacy-order fields - Register view in admin/views/__init__.py and admin/config.py --- admin/config.py | 9 ++ admin/views/__init__.py | 4 + .../waterlevelscontinuous_pressure_daily.py | 147 ++++++++++++++++++ 3 files changed, 160 insertions(+) create mode 100644 admin/views/waterlevelscontinuous_pressure_daily.py diff --git a/admin/config.py b/admin/config.py index 5aec1a5f4..1c3bb14f0 100644 --- a/admin/config.py +++ b/admin/config.py @@ -53,6 +53,7 @@ SurfaceWaterPhotosAdmin, ThingAdmin, TransducerObservationAdmin, + WaterLevelsContinuousPressureDailyAdmin, WeatherPhotosAdmin, WeatherDataAdmin, FieldParametersAdmin, @@ -80,6 +81,7 @@ NMA_Soil_Rock_Results, NMA_Stratigraphy, NMA_SurfaceWaterData, + NMA_WaterLevelsContinuous_Pressure_Daily, NMA_WeatherPhotos, NMA_SurfaceWaterPhotos, NMA_WeatherData, @@ -192,6 +194,13 @@ def create_admin(app): # Transducer observations admin.add_view(TransducerObservationAdmin(TransducerObservation)) + # Water Levels - Continuous (legacy) + admin.add_view( + WaterLevelsContinuousPressureDailyAdmin( + NMA_WaterLevelsContinuous_Pressure_Daily + ) + ) + # Weather admin.add_view(WeatherPhotosAdmin(NMA_WeatherPhotos)) diff --git a/admin/views/__init__.py b/admin/views/__init__.py index 33920b856..285d5ef5f 100644 --- a/admin/views/__init__.py +++ b/admin/views/__init__.py @@ -52,6 +52,9 @@ from admin.views.surface_water_photos import SurfaceWaterPhotosAdmin from admin.views.thing import ThingAdmin from admin.views.transducer_observation import TransducerObservationAdmin +from admin.views.waterlevelscontinuous_pressure_daily import ( + WaterLevelsContinuousPressureDailyAdmin, +) from admin.views.weather_photos import WeatherPhotosAdmin from admin.views.weather_data import WeatherDataAdmin @@ -88,6 +91,7 @@ "SurfaceWaterPhotosAdmin", "ThingAdmin", "TransducerObservationAdmin", + "WaterLevelsContinuousPressureDailyAdmin", "WeatherPhotosAdmin", "WeatherDataAdmin", ] diff --git a/admin/views/waterlevelscontinuous_pressure_daily.py b/admin/views/waterlevelscontinuous_pressure_daily.py new file mode 100644 index 000000000..094700f1c --- /dev/null +++ b/admin/views/waterlevelscontinuous_pressure_daily.py @@ -0,0 +1,147 @@ +# =============================================================================== +# Copyright 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +WaterLevelsContinuousPressureDailyAdmin view for legacy NMA_WaterLevelsContinuous_Pressure_Daily. +""" +from starlette.requests import Request + +from admin.views.base import OcotilloModelView + + +class WaterLevelsContinuousPressureDailyAdmin(OcotilloModelView): + """ + Admin view for NMA_WaterLevelsContinuous_Pressure_Daily model. + """ + + # ========== Basic Configuration ========== + name = "NMA Water Levels Continuous Pressure Daily" + label = "NMA Water Levels Continuous Pressure Daily" + icon = "fa fa-tachometer-alt" + + def can_create(self, request: Request) -> bool: + return False + + def can_edit(self, request: Request) -> bool: + return False + + def can_delete(self, request: Request) -> bool: + return False + + # ========== List View ========== + list_fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "temperature_water", + "water_head", + "water_head_adjusted", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "notes", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + sortable_fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "water_head", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + fields_default_sort = [("date_measured", True)] + + searchable_fields = [ + "global_id", + "well_id", + "point_id", + "date_measured", + "measurement_method", + "data_source", + "measuring_agency", + "notes", + ] + + page_size = 50 + page_size_options = [25, 50, 100, 200] + + # ========== Detail View ========== + fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "temperature_water", + "water_head", + "water_head_adjusted", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "notes", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + field_labels = { + "global_id": "GlobalID", + "object_id": "OBJECTID", + "well_id": "WellID", + "point_id": "PointID", + "date_measured": "Date Measured", + "temperature_water": "Temperature Water", + "water_head": "Water Head", + "water_head_adjusted": "Water Head Adjusted", + "depth_to_water_bgs": "Depth To Water (BGS)", + "measurement_method": "Measurement Method", + "data_source": "Data Source", + "measuring_agency": "Measuring Agency", + "qced": "QCed", + "notes": "Notes", + "created": "Created", + "updated": "Updated", + "processed_by": "Processed By", + "checked_by": "Checked By", + "cond_dl_ms_cm": "CONDDL (mS/cm)", + } + + +# ============= EOF ============================================= From 3cd8c56dab3475aa4c8865a45e204ede031fad41 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 28 Jan 2026 16:09:14 -0700 Subject: [PATCH 03/24] refactor (test): Update test_waterlevelscontinuous_pressure_daily_legacy.py for UUID IDs - Updated the test helper to return UUIDs for GlobalID and WellID in legacy model tests - Changed well_id to use a UUID so it matches the model mapping. --- tests/test_waterlevelscontinuous_pressure_daily_legacy.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py index 7328e4059..d2622c72f 100644 --- a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py +++ b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py @@ -21,14 +21,14 @@ """ from datetime import datetime -from uuid import uuid4 +from uuid import UUID, uuid4 from db.engine import session_ctx from db.nma_legacy import NMA_WaterLevelsContinuous_Pressure_Daily -def _next_global_id() -> str: - return str(uuid4()) +def _next_global_id() -> UUID: + return uuid4() def _next_object_id() -> int: @@ -44,7 +44,7 @@ def test_create_pressure_daily_all_fields(): record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), object_id=_next_object_id(), - well_id="WELL-1", + well_id=uuid4(), point_id="PD-1001", date_measured=now, temperature_water=12.3, From 9b8a398f7077b99b621435185635718af6990f82 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Wed, 28 Jan 2026 23:18:24 +0000 Subject: [PATCH 04/24] Formatting changes --- admin/views/waterlevelscontinuous_pressure_daily.py | 1 + 1 file changed, 1 insertion(+) diff --git a/admin/views/waterlevelscontinuous_pressure_daily.py b/admin/views/waterlevelscontinuous_pressure_daily.py index 094700f1c..ac2afb020 100644 --- a/admin/views/waterlevelscontinuous_pressure_daily.py +++ b/admin/views/waterlevelscontinuous_pressure_daily.py @@ -16,6 +16,7 @@ """ WaterLevelsContinuousPressureDailyAdmin view for legacy NMA_WaterLevelsContinuous_Pressure_Daily. """ + from starlette.requests import Request from admin.views.base import OcotilloModelView From 91d3aa5c38dbb6b55af655e75e92e1439092e94b Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 10:02:24 -0700 Subject: [PATCH 05/24] fix: add ForeignKey constraint to well_id in nma_legacy model --- db/nma_legacy.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 5129a1b59..d475e362d 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -63,7 +63,12 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): "OBJECTID", Integer, autoincrement=True ) # FK to Thing table (well_id --> Thing.nma_pk_welldata) - well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column( + "WellID", + UUID(as_uuid=True), + ForeignKey("Thing.nma_pk_welldata"), + nullable=False, + ) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) date_measured: Mapped[datetime] = mapped_column( "DateMeasured", DateTime, nullable=False From f09a67dc6d21744d82674604b7148894371d6572 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 10:07:29 -0700 Subject: [PATCH 06/24] fix: update well_id to be non-optional and enforce ForeignKey constraint in nma_legacy model --- db/nma_legacy.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index d475e362d..a97efcde1 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -63,7 +63,7 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): "OBJECTID", Integer, autoincrement=True ) # FK to Thing table (well_id --> Thing.nma_pk_welldata) - well_id: Mapped[Optional[uuid.UUID]] = mapped_column( + well_id: Mapped[uuid.UUID] = mapped_column( "WellID", UUID(as_uuid=True), ForeignKey("Thing.nma_pk_welldata"), @@ -179,7 +179,12 @@ class NMA_HydraulicsData(Base): global_id: Mapped[uuid.UUID] = mapped_column( "GlobalID", UUID(as_uuid=True), primary_key=True ) - well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) + well_id: Mapped[uuid.UUID] = mapped_column( + "WellID", + UUID(as_uuid=True), + ForeignKey("thing.nma_pk_welldata"), + nullable=False, + ) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) data_source: Mapped[Optional[str]] = mapped_column("Data Source", String(255)) thing_id: Mapped[int] = mapped_column( From 71899fe13b6afad5617de443d227e0643f0a6128 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 10:14:11 -0700 Subject: [PATCH 07/24] fix: remove well_id field from NMA_HydraulicsData model --- db/nma_legacy.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index a97efcde1..64e58c81e 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -179,12 +179,6 @@ class NMA_HydraulicsData(Base): global_id: Mapped[uuid.UUID] = mapped_column( "GlobalID", UUID(as_uuid=True), primary_key=True ) - well_id: Mapped[uuid.UUID] = mapped_column( - "WellID", - UUID(as_uuid=True), - ForeignKey("thing.nma_pk_welldata"), - nullable=False, - ) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) data_source: Mapped[Optional[str]] = mapped_column("Data Source", String(255)) thing_id: Mapped[int] = mapped_column( From 5c7b730009abb03e6e7fed7a498f754c4a18652e Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 10:22:56 -0700 Subject: [PATCH 08/24] fix: update well_id field to be optional and add thing_id ForeignKey constraint in nma_legacy model --- db/nma_legacy.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 64e58c81e..45f4ce2d4 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -62,12 +62,10 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): object_id: Mapped[Optional[int]] = mapped_column( "OBJECTID", Integer, autoincrement=True ) - # FK to Thing table (well_id --> Thing.nma_pk_welldata) - well_id: Mapped[uuid.UUID] = mapped_column( - "WellID", - UUID(as_uuid=True), - ForeignKey("Thing.nma_pk_welldata"), - nullable=False, + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) + # FK to Thing table - required for all WaterLevelsContinuous_Pressure_Daily records + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False ) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) date_measured: Mapped[datetime] = mapped_column( From 0ff8926945edfe7dc71ea381b07d6d2e2550fa9d Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 10:32:53 -0700 Subject: [PATCH 09/24] fix: make well_id field optional in nma_legacy model --- db/nma_legacy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 45f4ce2d4..f9b55cfe3 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -177,6 +177,7 @@ class NMA_HydraulicsData(Base): global_id: Mapped[uuid.UUID] = mapped_column( "GlobalID", UUID(as_uuid=True), primary_key=True ) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) data_source: Mapped[Optional[str]] = mapped_column("Data Source", String(255)) thing_id: Mapped[int] = mapped_column( From 8d16b93c20929719eca47ee870e8e30426a1bf1d Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 12:21:02 -0700 Subject: [PATCH 10/24] test: add tests for thing_id foreign key integrity in water levels continuous pressure daily records --- ...rlevelscontinuous_pressure_daily_legacy.py | 77 ++++++++++++++++--- 1 file changed, 67 insertions(+), 10 deletions(-) diff --git a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py index d2622c72f..d98b03ab5 100644 --- a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py +++ b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py @@ -23,6 +23,9 @@ from datetime import datetime from uuid import UUID, uuid4 +import pytest +from sqlalchemy.exc import IntegrityError, ProgrammingError + from db.engine import session_ctx from db.nma_legacy import NMA_WaterLevelsContinuous_Pressure_Daily @@ -37,7 +40,7 @@ def _next_object_id() -> int: # ===================== CREATE tests ========================== -def test_create_pressure_daily_all_fields(): +def test_create_pressure_daily_all_fields(water_well_thing): """Test creating a pressure daily record with required fields.""" with session_ctx() as session: now = datetime(2024, 1, 1, 12, 0, 0) @@ -45,7 +48,7 @@ def test_create_pressure_daily_all_fields(): global_id=_next_global_id(), object_id=_next_object_id(), well_id=uuid4(), - point_id="PD-1001", + point_id=water_well_thing.name, date_measured=now, temperature_water=12.3, water_head=4.5, @@ -61,6 +64,7 @@ def test_create_pressure_daily_all_fields(): processed_by="AB", checked_by="CD", cond_dl_ms_cm=0.2, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -74,16 +78,17 @@ def test_create_pressure_daily_all_fields(): session.commit() -def test_create_pressure_daily_minimal(): +def test_create_pressure_daily_minimal(water_well_thing): """Test creating a pressure daily record with minimal fields.""" with session_ctx() as session: now = datetime(2024, 1, 2, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1002", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -97,16 +102,17 @@ def test_create_pressure_daily_minimal(): # ===================== READ tests ========================== -def test_read_pressure_daily_by_global_id(): +def test_read_pressure_daily_by_global_id(water_well_thing): """Test reading a pressure daily record by GlobalID.""" with session_ctx() as session: now = datetime(2024, 1, 3, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1003", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -123,16 +129,17 @@ def test_read_pressure_daily_by_global_id(): # ===================== UPDATE tests ========================== -def test_update_pressure_daily(): +def test_update_pressure_daily(water_well_thing): """Test updating a pressure daily record.""" with session_ctx() as session: now = datetime(2024, 1, 4, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1004", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -150,16 +157,17 @@ def test_update_pressure_daily(): # ===================== DELETE tests ========================== -def test_delete_pressure_daily(): +def test_delete_pressure_daily(water_well_thing): """Test deleting a pressure daily record.""" with session_ctx() as session: now = datetime(2024, 1, 5, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1005", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -180,6 +188,7 @@ def test_pressure_daily_has_all_migrated_columns(): "global_id", "object_id", "well_id", + "thing_id", "point_id", "date_measured", "temperature_water", @@ -212,4 +221,52 @@ def test_pressure_daily_table_name(): ) +# ===================== Relational Integrity Tests ====================== + + +def test_pressure_daily_thing_id_required(): + """ + VERIFIES: 'thing_id IS NOT NULL' and Foreign Key presence. + Ensures the DB rejects records without a Thing linkage. + """ + with session_ctx() as session: + now = datetime(2024, 1, 6, 12, 0, 0) + record = NMA_WaterLevelsContinuous_Pressure_Daily( + global_id=_next_global_id(), + point_id="PD-1006", + date_measured=now, + created=now, + updated=now, + ) + session.add(record) + + with pytest.raises((IntegrityError, ProgrammingError)): + session.flush() + session.rollback() + + +def test_pressure_daily_invalid_thing_id_rejected(water_well_thing): + """ + VERIFIES: foreign key integrity on thing_id. + Ensures the DB rejects updates to a non-existent Thing. + """ + with session_ctx() as session: + now = datetime(2024, 1, 7, 12, 0, 0) + record = NMA_WaterLevelsContinuous_Pressure_Daily( + global_id=_next_global_id(), + point_id=water_well_thing.name, + date_measured=now, + created=now, + updated=now, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + + with pytest.raises((IntegrityError, ProgrammingError)): + record.thing_id = 999999 + session.flush() + session.rollback() + + # ============= EOF ============================================= From d6fb0fae5cab513e7d56084e14c3349ce55dac45 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 12:45:50 -0700 Subject: [PATCH 11/24] feat: Add thing_id FK to NMA pressure daily table - Added migrations for thing_id FK and UUID column alignment - Updated pressure daily legacy tests for Thing linkage and FK enforcement --- ...ma_waterlevelscontinuous_pressure_daily.py | 92 +++++++++++++++++++ ...7b6a5_align_pressure_daily_uuid_columns.py | 85 +++++++++++++++++ ...rlevelscontinuous_pressure_daily_legacy.py | 6 +- 3 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py create mode 100644 alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py diff --git a/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py b/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py new file mode 100644 index 000000000..f825e81ae --- /dev/null +++ b/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py @@ -0,0 +1,92 @@ +"""Add thing_id FK to NMA_WaterLevelsContinuous_Pressure_Daily. + +Revision ID: e8a7c6b5d4f3 +Revises: b12e3919077e +Create Date: 2026-01-29 12:45:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "e8a7c6b5d4f3" +down_revision: Union[str, Sequence[str], None] = "b12e3919077e" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add thing_id and FK to legacy pressure daily table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"] + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + if "thing_id" not in columns: + op.add_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + sa.Column("thing_id", sa.Integer(), nullable=True), + ) + + existing_fks = { + fk["name"] + for fk in inspector.get_foreign_keys("NMA_WaterLevelsContinuous_Pressure_Daily") + if fk.get("name") + } + if "fk_pressure_daily_thing" not in existing_fks: + op.create_foreign_key( + "fk_pressure_daily_thing", + "NMA_WaterLevelsContinuous_Pressure_Daily", + "thing", + ["thing_id"], + ["id"], + ondelete="CASCADE", + ) + + null_count = bind.execute( + sa.text( + 'SELECT COUNT(*) FROM "NMA_WaterLevelsContinuous_Pressure_Daily" ' + 'WHERE "thing_id" IS NULL' + ) + ).scalar() + if null_count == 0: + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "thing_id", + existing_type=sa.Integer(), + nullable=False, + ) + + +def downgrade() -> None: + """Remove thing_id FK from legacy pressure daily table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + existing_fks = { + fk["name"] + for fk in inspector.get_foreign_keys("NMA_WaterLevelsContinuous_Pressure_Daily") + if fk.get("name") + } + if "fk_pressure_daily_thing" in existing_fks: + op.drop_constraint( + "fk_pressure_daily_thing", + "NMA_WaterLevelsContinuous_Pressure_Daily", + type_="foreignkey", + ) + + columns = { + col["name"] + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + if "thing_id" in columns: + op.drop_column("NMA_WaterLevelsContinuous_Pressure_Daily", "thing_id") diff --git a/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py b/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py new file mode 100644 index 000000000..38d113068 --- /dev/null +++ b/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py @@ -0,0 +1,85 @@ +"""Align UUID column types on NMA_WaterLevelsContinuous_Pressure_Daily. + +Revision ID: f0c9d8e7b6a5 +Revises: e8a7c6b5d4f3 +Create Date: 2026-01-29 12:55:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "f0c9d8e7b6a5" +down_revision: Union[str, Sequence[str], None] = "e8a7c6b5d4f3" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def _column_is_uuid(col) -> bool: + return isinstance(col.get("type"), postgresql.UUID) + + +def upgrade() -> None: + """Alter UUID columns to proper UUID types.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"]: col + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + + global_id_col = columns.get("GlobalID") + if global_id_col is not None and not _column_is_uuid(global_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "GlobalID", + type_=postgresql.UUID(as_uuid=True), + postgresql_using='"GlobalID"::uuid', + ) + + well_id_col = columns.get("WellID") + if well_id_col is not None and not _column_is_uuid(well_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "WellID", + type_=postgresql.UUID(as_uuid=True), + postgresql_using='"WellID"::uuid', + ) + + +def downgrade() -> None: + """Revert UUID columns back to strings.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"]: col + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + + global_id_col = columns.get("GlobalID") + if global_id_col is not None and _column_is_uuid(global_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "GlobalID", + type_=sa.String(length=40), + postgresql_using='"GlobalID"::text', + ) + + well_id_col = columns.get("WellID") + if well_id_col is not None and _column_is_uuid(well_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "WellID", + type_=sa.String(length=40), + postgresql_using='"WellID"::text', + ) diff --git a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py index d98b03ab5..9b6a55dac 100644 --- a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py +++ b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py @@ -71,7 +71,7 @@ def test_create_pressure_daily_all_fields(water_well_thing): session.refresh(record) assert record.global_id is not None - assert record.point_id == "PD-1001" + assert record.point_id == water_well_thing.name assert record.date_measured == now session.delete(record) @@ -95,7 +95,7 @@ def test_create_pressure_daily_minimal(water_well_thing): session.refresh(record) assert record.global_id is not None - assert record.point_id == "PD-1002" + assert record.point_id == water_well_thing.name session.delete(record) session.commit() @@ -122,7 +122,7 @@ def test_read_pressure_daily_by_global_id(water_well_thing): ) assert fetched is not None assert fetched.global_id == record.global_id - assert fetched.point_id == "PD-1003" + assert fetched.point_id == water_well_thing.name session.delete(record) session.commit() From 8ae5bbbca363ecb0e60937f6ee81c100732bd49e Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 13:11:45 -0700 Subject: [PATCH 12/24] refactor: Enhance transfer of waterlevelscontinuous_pressure_daily - Cache Thing IDs and map PointID to thing_id to satisfy new FK - Filter orphan rows to prevent invalid inserts - Add focused transfer unit test to validate mapping and filtering --- ...evelscontinuous_pressure_daily_transfer.py | 47 +++++++++++++++++++ .../waterlevelscontinuous_pressure_daily.py | 33 +++++++++++-- 2 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py diff --git a/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py b/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py new file mode 100644 index 000000000..a5616f81b --- /dev/null +++ b/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py @@ -0,0 +1,47 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +import pandas as pd + +from transfers.waterlevelscontinuous_pressure_daily import ( + NMA_WaterLevelsContinuous_Pressure_DailyTransferer, +) + + +def test_pressure_daily_transfer_filters_orphans(water_well_thing): + transferer = NMA_WaterLevelsContinuous_Pressure_DailyTransferer(batch_size=1) + df = pd.DataFrame( + [ + {"PointID": water_well_thing.name, "GlobalID": "gid-1"}, + {"PointID": "MISSING-THING", "GlobalID": "gid-2"}, + ] + ) + + filtered = transferer._filter_to_valid_things(df) + + assert list(filtered["PointID"]) == [water_well_thing.name] + + +def test_pressure_daily_row_dict_sets_thing_id(water_well_thing): + transferer = NMA_WaterLevelsContinuous_Pressure_DailyTransferer(batch_size=1) + row = {"PointID": water_well_thing.name, "GlobalID": "gid-3"} + + mapped = transferer._row_dict(row) + + assert mapped["thing_id"] == water_well_thing.id + + +# ============= EOF ============================================= diff --git a/transfers/waterlevelscontinuous_pressure_daily.py b/transfers/waterlevelscontinuous_pressure_daily.py index c41423f78..6caa348c3 100644 --- a/transfers/waterlevelscontinuous_pressure_daily.py +++ b/transfers/waterlevelscontinuous_pressure_daily.py @@ -22,7 +22,8 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMA_WaterLevelsContinuous_Pressure_Daily +from db import NMA_WaterLevelsContinuous_Pressure_Daily, Thing +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import read_csv @@ -41,6 +42,30 @@ class NMA_WaterLevelsContinuous_Pressure_DailyTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.name, Thing.id).all() + self._thing_id_cache = {name: thing_id for name, thing_id in things} + logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + + def _filter_to_valid_things(self, df: pd.DataFrame) -> pd.DataFrame: + valid_point_ids = set(self._thing_id_cache.keys()) + before_count = len(df) + filtered_df = df[df["PointID"].isin(valid_point_ids)].copy() + after_count = len(filtered_df) + if before_count > after_count: + skipped = before_count - after_count + logger.warning( + "Filtered out %s WaterLevelsContinuous_Pressure_Daily records without matching Things " + "(%s valid, %s orphan records prevented)", + skipped, + after_count, + skipped, + ) + return filtered_df def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: # Parse key datetime columns eagerly to avoid per-row parsing later. @@ -48,8 +73,8 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: self.source_table, parse_dates=["DateMeasured", "Created", "Updated"], ) - # No special cleaning/validation beyond raw import; keep identical copy. - return input_df, input_df + cleaned_df = self._filter_to_valid_things(input_df) + return input_df, cleaned_df def _transfer_hook(self, session: Session) -> None: rows = self._dedupe_rows( @@ -71,6 +96,7 @@ def _transfer_hook(self, session: Session) -> None: "OBJECTID": excluded.OBJECTID, "WellID": excluded.WellID, "PointID": excluded.PointID, + "thing_id": excluded.thing_id, "DateMeasured": excluded.DateMeasured, "TemperatureWater": excluded.TemperatureWater, "WaterHead": excluded.WaterHead, @@ -104,6 +130,7 @@ def val(key: str) -> Optional[Any]: "OBJECTID": val("OBJECTID"), "WellID": val("WellID"), "PointID": val("PointID"), + "thing_id": self._thing_id_cache.get(val("PointID")), "DateMeasured": val("DateMeasured"), "TemperatureWater": val("TemperatureWater"), "WaterHead": val("WaterHead"), From 4466e5a50ab14b1d6deaa1cc942b8fb446c46b65 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 29 Jan 2026 15:24:37 -0700 Subject: [PATCH 13/24] feat: Add missing relationship/backref between Thing and NMA_WaterLevelsContinuous_Pressure_Daily --- db/nma_legacy.py | 4 ++++ db/thing.py | 14 +++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 33981bfae..6f1954e72 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -96,6 +96,10 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): checked_by: Mapped[Optional[str]] = mapped_column("CheckedBy", String(4)) cond_dl_ms_cm: Mapped[Optional[float]] = mapped_column("CONDDL (mS/cm)", Float) + thing: Mapped["Thing"] = relationship( + "Thing", back_populates="pressure_daily_levels" + ) + class NMA_view_NGWMN_WellConstruction(Base): """ diff --git a/db/thing.py b/db/thing.py index 8c3f4d315..66dc55244 100644 --- a/db/thing.py +++ b/db/thing.py @@ -47,7 +47,11 @@ from db.thing_geologic_formation_association import ( ThingGeologicFormationAssociation, ) - from db.nma_legacy import NMA_Chemistry_SampleInfo, NMA_Stratigraphy + from db.nma_legacy import ( + NMA_Chemistry_SampleInfo, + NMA_Stratigraphy, + NMA_WaterLevelsContinuous_Pressure_Daily, + ) class Thing( @@ -318,6 +322,14 @@ class Thing( cascade="all, delete-orphan", passive_deletes=True, ) + pressure_daily_levels: Mapped[List["NMA_WaterLevelsContinuous_Pressure_Daily"]] = ( + relationship( + "NMA_WaterLevelsContinuous_Pressure_Daily", + back_populates="thing", + cascade="all, delete-orphan", + passive_deletes=True, + ) + ) # --- Association Proxies --- assets: AssociationProxy[list["Asset"]] = association_proxy( From a1f00e1b6cee5419174a2653f002ed8223290f16 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 31 Jan 2026 10:31:42 +1100 Subject: [PATCH 14/24] feat: enhance logging configuration and improve transfer flag handling --- alembic/env.py | 13 ++- tests/test_minor_trace_chemistry_transfer.py | 35 ++++++ transfers/logger.py | 22 +--- transfers/minor_trace_chemistry_transfer.py | 2 + transfers/stratigraphy_legacy.py | 6 +- transfers/transfer.py | 106 ++++++++++--------- transfers/well_transfer.py | 4 + 7 files changed, 113 insertions(+), 75 deletions(-) create mode 100644 tests/test_minor_trace_chemistry_transfer.py diff --git a/alembic/env.py b/alembic/env.py index 089144e88..081df1b9f 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -5,9 +5,10 @@ from alembic import context from dotenv import load_dotenv -from services.util import get_bool_env from sqlalchemy import create_engine, engine_from_config, pool, text +from services.util import get_bool_env + # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config @@ -15,8 +16,16 @@ # Interpret the config file for Python logging. # This line sets up loggers basically. -if config.config_file_name is not None: +if config.config_file_name is not None and os.environ.get( + "ALEMBIC_USE_FILE_CONFIG", "0" +) not in {"0", "false", "False"}: fileConfig(config.config_file_name, disable_existing_loggers=False) +else: + root_logger = logging.getLogger() + alembic_logger = logging.getLogger("alembic") + alembic_logger.handlers = root_logger.handlers[:] + alembic_logger.setLevel(root_logger.level) + alembic_logger.propagate = False # add your model's MetaData object here # for 'autogenerate' support diff --git a/tests/test_minor_trace_chemistry_transfer.py b/tests/test_minor_trace_chemistry_transfer.py new file mode 100644 index 000000000..fec7be618 --- /dev/null +++ b/tests/test_minor_trace_chemistry_transfer.py @@ -0,0 +1,35 @@ +import uuid + +import pandas as pd + +from transfers.minor_trace_chemistry_transfer import MinorTraceChemistryTransferer + + +def test_row_to_dict_includes_wclab_id(): + transfer = MinorTraceChemistryTransferer.__new__(MinorTraceChemistryTransferer) + sample_pt_id = uuid.uuid4() + transfer._sample_pt_ids = {sample_pt_id} + transfer.flags = {} + + row = pd.Series( + { + "SamplePtID": str(sample_pt_id), + "GlobalID": str(uuid.uuid4()), + "SamplePointID": "POINT-1", + "Analyte": "Ca", + "SampleValue": 10.5, + "Units": "mg/L", + "Symbol": None, + "AnalysisMethod": "ICP", + "AnalysisDate": "2024-01-01 00:00:00.000", + "Notes": "note", + "AnalysesAgency": "Lab", + "Uncertainty": 0.1, + "Volume": "2", + "VolumeUnit": "L", + "WCLab_ID": "LAB-123", + } + ) + + row_dict = transfer._row_to_dict(row) + assert row_dict["WCLab_ID"] == "LAB-123" diff --git a/transfers/logger.py b/transfers/logger.py index a5fd62414..decf34d0c 100644 --- a/transfers/logger.py +++ b/transfers/logger.py @@ -21,18 +21,6 @@ from services.gcs_helper import get_storage_bucket -# class StreamToLogger: -# def __init__(self, logger_, level): -# self.logger = logger_ -# self.level = level -# self.linebuf = "" -# -# def write(self, buf): -# for line in buf.rstrip().splitlines(): -# self.logger.log(self.level, line.rstrip()) -# -# def flush(self): -# pass root = Path("logs") if not os.getcwd().endswith("transfers"): root = Path("transfers") / root @@ -40,7 +28,8 @@ if not os.path.exists(root): os.mkdir(root) -log_filename = root / f"transfer_{datetime.now():%Y-%m-%dT%H_%M_%S}.log" +log_filename = f"transfer_{datetime.now():%Y-%m-%dT%H_%M_%S}.log" +log_path = root / log_filename logging.basicConfig( @@ -48,7 +37,7 @@ format="%(asctime)s [%(levelname)-8s] %(message)s", handlers=[ logging.StreamHandler(sys.stdout), - logging.FileHandler(log_filename, mode="w", encoding="utf-8"), + logging.FileHandler(log_path, mode="w", encoding="utf-8"), ], force=True, ) @@ -61,14 +50,11 @@ # workaround to not redirect httpx logging logging.getLogger("httpx").setLevel(logging.WARNING) -# redirect stderr to the logger -# sys.stderr = StreamToLogger(logger, logging.ERROR) - def save_log_to_bucket(): bucket = get_storage_bucket() blob = bucket.blob(f"transfer_logs/{log_filename}") - blob.upload_from_filename(log_filename) + blob.upload_from_filename(log_path) logger.info(f"Uploaded log to gs://{bucket.name}/transfer_logs/{log_filename}") diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 60ade7560..012b6bf00 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -139,6 +139,7 @@ def _transfer_hook(self, session: Session) -> None: "Uncertainty": excluded.Uncertainty, "Volume": excluded.Volume, "VolumeUnit": excluded.VolumeUnit, + "WCLab_ID": excluded.WCLab_ID, }, ) session.execute(stmt) @@ -188,6 +189,7 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: "Uncertainty": self._safe_float(row, "Uncertainty"), "Volume": self._safe_int(row, "Volume"), "VolumeUnit": self._safe_str(row, "VolumeUnit"), + "WCLab_ID": self._safe_str(row, "WCLab_ID"), } def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: diff --git a/transfers/stratigraphy_legacy.py b/transfers/stratigraphy_legacy.py index 326f6434a..b768da8f8 100644 --- a/transfers/stratigraphy_legacy.py +++ b/transfers/stratigraphy_legacy.py @@ -114,8 +114,8 @@ def _row_dict(self, row: pd.Series) -> Dict[str, Any] | None: "WellID": self._uuid_value(getattr(row, "WellID", None)), "PointID": point_id, "thing_id": thing_id, - "StratTop": self._float_value(getattr(row, "StratTop", None)), - "StratBottom": self._float_value(getattr(row, "StratBottom", None)), + "StratTop": self._int_value(getattr(row, "StratTop", None)), + "StratBottom": self._int_value(getattr(row, "StratBottom", None)), "UnitIdentifier": self._string_value(getattr(row, "UnitIdentifier", None)), "Lithology": self._string_value(getattr(row, "Lithology", None)), "LithologicModifier": self._string_value( @@ -151,7 +151,7 @@ def _int_value(self, value: Any) -> int | None: if value in (None, ""): return None try: - return int(value) + return int(float(value)) except (TypeError, ValueError): return None diff --git a/transfers/transfer.py b/transfers/transfer.py index 2d33176b2..340e73424 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -189,7 +189,11 @@ def _execute_transfer_with_timing(name: str, klass, flags: dict = None): """Execute transfer and return timing info.""" start = time.time() logger.info(f"Starting parallel transfer: {name}") - result = _execute_transfer(klass, flags) + effective_flags = dict(flags or {}) + yield_transfer_limit = effective_flags.get("LIMIT", 0) + if yield_transfer_limit: + effective_flags["LIMIT"] = max(1, yield_transfer_limit // 10) + result = _execute_transfer(klass, effective_flags) elapsed = time.time() - start logger.info(f"Completed parallel transfer: {name} in {elapsed:.2f}s") return name, result, elapsed @@ -200,7 +204,8 @@ def _execute_session_transfer_with_timing(name: str, transfer_func, limit: int): start = time.time() logger.info(f"Starting parallel transfer: {name}") with session_ctx() as session: - result = transfer_func(session, limit=limit) + effective_limit = max(1, limit // 10) if limit else 0 + result = transfer_func(session, limit=effective_limit) elapsed = time.time() - start logger.info(f"Completed parallel transfer: {name} in {elapsed:.2f}s") return name, result, elapsed @@ -240,6 +245,7 @@ def _drop_and_rebuild_db() -> None: with session_ctx() as session: recreate_public_schema(session) logger.info("Running Alembic migrations") + try: command.upgrade(_alembic_config(), "head") except SystemExit as exc: @@ -269,7 +275,22 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): logger.info("Erase and rebuilding database") erase_and_rebuild_db() + # Get transfer flags + message("TRANSFER OPTIONS") + transfer_options = load_transfer_options() + logger.info( + "Transfer options: %s", + { + field: getattr(transfer_options, field) + for field in transfer_options.__dataclass_fields__ + }, + ) + transfer_options.transfer_pressure = False + transfer_options.transfer_acoustic = False + flags = {"TRANSFER_ALL_WELLS": True, "LIMIT": limit} + message("TRANSFER_FLAGS") + logger.info(flags) profile_artifacts: list[ProfileArtifact] = [] water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) @@ -320,10 +341,6 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): results = _execute_transfer(WellTransferer, flags=flags) metrics.well_metrics(*results) - # Get transfer flags - transfer_options = load_transfer_options() - transfer_options.transfer_pressure = False - transfer_options.transfer_acoustic = False use_parallel = get_bool_env("TRANSFER_PARALLEL", True) if use_parallel: @@ -409,54 +426,49 @@ def _transfer_parallel( parallel_tasks_1 = [] if opts.transfer_screens: - parallel_tasks_1.append(("WellScreens", WellScreenTransferer, flags)) + parallel_tasks_1.append(("WellScreens", WellScreenTransferer)) if opts.transfer_contacts: - parallel_tasks_1.append(("Contacts", ContactTransfer, flags)) + parallel_tasks_1.append(("Contacts", ContactTransfer)) if opts.transfer_waterlevels: - parallel_tasks_1.append(("WaterLevels", WaterLevelTransferer, flags)) + parallel_tasks_1.append(("WaterLevels", WaterLevelTransferer)) if opts.transfer_link_ids: - parallel_tasks_1.append(("LinkIdsWellData", LinkIdsWellDataTransferer, flags)) - parallel_tasks_1.append( - ("LinkIdsLocation", LinkIdsLocationDataTransferer, flags) - ) + parallel_tasks_1.append(("LinkIdsWellData", LinkIdsWellDataTransferer)) + parallel_tasks_1.append(("LinkIdsLocation", LinkIdsLocationDataTransferer)) if opts.transfer_groups: - parallel_tasks_1.append(("Groups", ProjectGroupTransferer, flags)) + parallel_tasks_1.append(("Groups", ProjectGroupTransferer)) if opts.transfer_surface_water_photos: - parallel_tasks_1.append( - ("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer, flags) - ) + parallel_tasks_1.append(("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer)) if opts.transfer_soil_rock_results: - parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer, flags)) + parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer)) if opts.transfer_weather_photos: - parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer, flags)) + parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer)) if opts.transfer_assets: - parallel_tasks_1.append(("Assets", AssetTransferer, flags)) + parallel_tasks_1.append(("Assets", AssetTransferer)) if opts.transfer_associated_data: - parallel_tasks_1.append(("AssociatedData", AssociatedDataTransferer, flags)) + parallel_tasks_1.append(("AssociatedData", AssociatedDataTransferer)) if opts.transfer_surface_water_data: - parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer, flags)) + parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer)) if opts.transfer_hydraulics_data: - parallel_tasks_1.append(("HydraulicsData", HydraulicsDataTransferer, flags)) + parallel_tasks_1.append(("HydraulicsData", HydraulicsDataTransferer)) if opts.transfer_chemistry_sampleinfo: - parallel_tasks_1.append( - ("ChemistrySampleInfo", ChemistrySampleInfoTransferer, flags) - ) + parallel_tasks_1.append(("ChemistrySampleInfo", ChemistrySampleInfoTransferer)) if opts.transfer_ngwmn_views: parallel_tasks_1.append( - ("NGWMNWellConstruction", NGWMNWellConstructionTransferer, flags) + ("NGWMNWellConstruction", NGWMNWellConstructionTransferer) ) - parallel_tasks_1.append(("NGWMNWaterLevels", NGWMNWaterLevelsTransferer, flags)) - parallel_tasks_1.append(("NGWMNLithology", NGWMNLithologyTransferer, flags)) + parallel_tasks_1.append(("NGWMNWaterLevels", NGWMNWaterLevelsTransferer)) + parallel_tasks_1.append(("NGWMNLithology", NGWMNLithologyTransferer)) if opts.transfer_pressure_daily: parallel_tasks_1.append( ( "WaterLevelsPressureDaily", NMA_WaterLevelsContinuous_Pressure_DailyTransferer, - flags, ) ) if opts.transfer_weather_data: - parallel_tasks_1.append(("WeatherData", WeatherDataTransferer, flags)) + parallel_tasks_1.append(("WeatherData", WeatherDataTransferer)) + if opts.transfer_nma_stratigraphy: + parallel_tasks_1.append(("StratigraphyLegacy", StratigraphyLegacyTransferer)) # Track results for metrics results_map = {} @@ -466,29 +478,17 @@ def _transfer_parallel( futures = {} # Submit class-based transfers - for name, klass, task_flags in parallel_tasks_1: - future = executor.submit( - _execute_transfer_with_timing, name, klass, task_flags - ) + for name, klass in parallel_tasks_1: + future = executor.submit(_execute_transfer_with_timing, name, klass, flags) futures[future] = name - # Submit session-based transfers - if opts.transfer_nma_stratigraphy: - future = executor.submit( - _execute_transfer_with_timing, - "StratigraphyLegacy", - StratigraphyLegacyTransferer, - flags, - ) - futures[future] = "StratigraphyLegacy" - future = executor.submit( _execute_session_transfer_with_timing, - "Stratigraphy", + "StratigraphyNew", transfer_stratigraphy, limit, ) - futures[future] = "Stratigraphy" + futures[future] = "StratigraphyNew" future = executor.submit(_execute_permissions_with_timing, "Permissions") futures[future] = "Permissions" @@ -508,8 +508,8 @@ def _transfer_parallel( metrics.well_screen_metrics(*results_map["WellScreens"]) if "Contacts" in results_map and results_map["Contacts"]: metrics.contact_metrics(*results_map["Contacts"]) - if "Stratigraphy" in results_map and results_map["Stratigraphy"]: - metrics.stratigraphy_metrics(*results_map["Stratigraphy"]) + if "StratigraphyNew" in results_map and results_map["StratigraphyNew"]: + metrics.stratigraphy_metrics(*results_map["StratigraphyNew"]) if "StratigraphyLegacy" in results_map and results_map["StratigraphyLegacy"]: metrics.nma_stratigraphy_metrics(*results_map["StratigraphyLegacy"]) if "AssociatedData" in results_map and results_map["AssociatedData"]: @@ -551,6 +551,7 @@ def _transfer_parallel( metrics.weather_data_metrics(*results_map["WeatherData"]) if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) + if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) @@ -823,9 +824,10 @@ def main(): metrics, limit=limit, profile_waterlevels=profile_waterlevels ) - message("CLEANING UP LOCATIONS") - with session_ctx() as session: - cleanup_locations(session) + if get_bool_env("CLEANUP_LOCATIONS", True): + message("CLEANING UP LOCATIONS") + with session_ctx() as session: + cleanup_locations(session) metrics.close() metrics.save_to_storage_bucket() diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 154be399b..680615cb7 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -862,6 +862,10 @@ def transfer_parallel(self, num_workers: int = None) -> None: # Load dataframes self.input_df, self.cleaned_df = self._get_dfs() df = self.cleaned_df + limit = self.flags.get("LIMIT", 0) + if limit > 0: + df = df.head(limit) + self.cleaned_df = df n = len(df) if n == 0: From e0056e8b0a6ff8421c9d137a166bbdcc598bfe9e Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 31 Jan 2026 10:51:25 +1100 Subject: [PATCH 15/24] feat: add thing_id foreign key and relationship to NMA legacy model; refactor transfer logic for parallel execution --- ...23456789ab_add_observation_data_quality.py | 4 +- db/nma_legacy.py | 4 + transfers/transfer.py | 198 +----------------- 3 files changed, 14 insertions(+), 192 deletions(-) diff --git a/alembic/versions/e123456789ab_add_observation_data_quality.py b/alembic/versions/e123456789ab_add_observation_data_quality.py index 717a0c82e..0068fbf3e 100644 --- a/alembic/versions/e123456789ab_add_observation_data_quality.py +++ b/alembic/versions/e123456789ab_add_observation_data_quality.py @@ -1,7 +1,7 @@ """add nma_data_quality to observation Revision ID: e123456789ab -Revises: b12e3919077e +Revises: f0c9d8e7b6a5 Create Date: 2026-02-05 12:00:00.000000 """ @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. revision: str = "e123456789ab" -down_revision: Union[str, Sequence[str], None] = "b12e3919077e" +down_revision: Union[str, Sequence[str], None] = "f0c9d8e7b6a5" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/db/nma_legacy.py b/db/nma_legacy.py index df794ae71..9ec2d76af 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -562,6 +562,9 @@ class NMA_Radionuclides(Base): global_id: Mapped[uuid.UUID] = mapped_column( "GlobalID", UUID(as_uuid=True), primary_key=True ) + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) sample_pt_id: Mapped[uuid.UUID] = mapped_column( "SamplePtID", UUID(as_uuid=True), @@ -595,6 +598,7 @@ class NMA_Radionuclides(Base): chemistry_sample_info: Mapped["NMA_Chemistry_SampleInfo"] = relationship( "NMA_Chemistry_SampleInfo", back_populates="radionuclides" ) + thing: Mapped["Thing"] = relationship("Thing") @validates("thing_id") def validate_thing_id(self, key, value): diff --git a/transfers/transfer.py b/transfers/transfer.py index 340e73424..15ea7e5d6 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -341,26 +341,14 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): results = _execute_transfer(WellTransferer, flags=flags) metrics.well_metrics(*results) - use_parallel = get_bool_env("TRANSFER_PARALLEL", True) - - if use_parallel: - _transfer_parallel( - metrics, - flags, - limit, - transfer_options, - profile_waterlevels, - profile_artifacts, - ) - else: - _transfer_sequential( - metrics, - flags, - limit, - transfer_options, - profile_waterlevels, - profile_artifacts, - ) + _transfer_parallel( + metrics, + flags, + limit, + transfer_options, + profile_waterlevels, + profile_artifacts, + ) return profile_artifacts @@ -628,176 +616,6 @@ def _transfer_parallel( metrics.acoustic_metrics(*results_map["Acoustic"]) -def _transfer_sequential( - metrics, - flags, - limit, - transfer_options: TransferOptions, - profile_waterlevels: bool, - profile_artifacts, -): - """Original sequential transfer logic.""" - opts = transfer_options - if opts.transfer_screens: - with transfer_context("WELL SCREENS"): - results = _execute_transfer(WellScreenTransferer, flags=flags) - metrics.well_screen_metrics(*results) - - if opts.transfer_sensors: - with transfer_context("SENSORS"): - results = _execute_transfer(SensorTransferer, flags=flags) - metrics.sensor_metrics(*results) - - if opts.transfer_contacts: - with transfer_context("CONTACTS"): - results = _execute_transfer(ContactTransfer, flags=flags) - metrics.contact_metrics(*results) - - with transfer_context("PERMISSIONS"): - with session_ctx() as session: - transfer_permissions(session) - - if opts.transfer_nma_stratigraphy: - with transfer_context("NMA STRATIGRAPHY"): - results = _execute_transfer(StratigraphyLegacyTransferer, flags=flags) - metrics.nma_stratigraphy_metrics(*results) - - with transfer_context("STRATIGRAPHY"): - with session_ctx() as session: - results = transfer_stratigraphy(session, limit=limit) - metrics.stratigraphy_metrics(*results) - - if opts.transfer_waterlevels: - with transfer_context("WATER LEVELS"): - results = _execute_transfer(WaterLevelTransferer, flags=flags) - metrics.water_level_metrics(*results) - - if opts.transfer_link_ids: - message("TRANSFERRING LINK IDS") - results = _execute_transfer(LinkIdsWellDataTransferer, flags=flags) - metrics.welldata_link_ids_metrics(*results) - results = _execute_transfer(LinkIdsLocationDataTransferer, flags=flags) - metrics.location_link_ids_metrics(*results) - - if opts.transfer_groups: - message("TRANSFERRING GROUPS") - results = _execute_transfer(ProjectGroupTransferer, flags=flags) - metrics.group_metrics(*results) - - if opts.transfer_surface_water_photos: - message("TRANSFERRING SURFACE WATER PHOTOS") - results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) - metrics.surface_water_photos_metrics(*results) - - if opts.transfer_soil_rock_results: - message("TRANSFERRING SOIL ROCK RESULTS") - results = _execute_transfer(SoilRockResultsTransferer, flags=flags) - metrics.soil_rock_results_metrics(*results) - - if opts.transfer_weather_photos: - message("TRANSFERRING WEATHER PHOTOS") - results = _execute_transfer(WeatherPhotosTransferer, flags=flags) - metrics.weather_photos_metrics(*results) - - if opts.transfer_assets: - message("TRANSFERRING ASSETS") - results = _execute_transfer(AssetTransferer, flags=flags) - metrics.asset_metrics(*results) - - if opts.transfer_associated_data: - message("TRANSFERRING ASSOCIATED DATA") - results = _execute_transfer(AssociatedDataTransferer, flags=flags) - metrics.associated_data_metrics(*results) - - if opts.transfer_surface_water_data: - message("TRANSFERRING SURFACE WATER DATA") - results = _execute_transfer(SurfaceWaterDataTransferer, flags=flags) - metrics.surface_water_data_metrics(*results) - - if opts.transfer_hydraulics_data: - message("TRANSFERRING HYDRAULICS DATA") - results = _execute_transfer(HydraulicsDataTransferer, flags=flags) - metrics.hydraulics_data_metrics(*results) - - if opts.transfer_chemistry_sampleinfo: - message("TRANSFERRING CHEMISTRY SAMPLEINFO") - results = _execute_transfer(ChemistrySampleInfoTransferer, flags=flags) - metrics.chemistry_sampleinfo_metrics(*results) - - if opts.transfer_field_parameters: - message("TRANSFERRING FIELD PARAMETERS") - results = _execute_transfer(FieldParametersTransferer, flags=flags) - metrics.field_parameters_metrics(*results) - - if opts.transfer_major_chemistry: - message("TRANSFERRING MAJOR CHEMISTRY") - results = _execute_transfer(MajorChemistryTransferer, flags=flags) - metrics.major_chemistry_metrics(*results) - - if opts.transfer_radionuclides: - message("TRANSFERRING RADIONUCLIDES") - results = _execute_transfer(RadionuclidesTransferer, flags=flags) - metrics.radionuclides_metrics(*results) - - if opts.transfer_ngwmn_views: - message("TRANSFERRING NGWMN WELL CONSTRUCTION") - results = _execute_transfer(NGWMNWellConstructionTransferer, flags=flags) - metrics.ngwmn_well_construction_metrics(*results) - message("TRANSFERRING NGWMN WATER LEVELS") - results = _execute_transfer(NGWMNWaterLevelsTransferer, flags=flags) - metrics.ngwmn_water_levels_metrics(*results) - message("TRANSFERRING NGWMN LITHOLOGY") - results = _execute_transfer(NGWMNLithologyTransferer, flags=flags) - metrics.ngwmn_lithology_metrics(*results) - - if opts.transfer_pressure_daily: - message("TRANSFERRING WATER LEVELS PRESSURE DAILY") - results = _execute_transfer( - NMA_WaterLevelsContinuous_Pressure_DailyTransferer, flags=flags - ) - metrics.waterlevels_pressure_daily_metrics(*results) - - if opts.transfer_weather_data: - message("TRANSFERRING WEATHER DATA") - results = _execute_transfer(WeatherDataTransferer, flags=flags) - metrics.weather_data_metrics(*results) - - if opts.transfer_minor_trace_chemistry: - message("TRANSFERRING MINOR TRACE CHEMISTRY") - results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags) - metrics.minor_trace_chemistry_metrics(*results) - - if opts.transfer_pressure: - message("TRANSFERRING WATER LEVELS PRESSURE") - if profile_waterlevels: - profiler = TransferProfiler("waterlevels_continuous_pressure") - results, artifact = profiler.run( - _execute_transfer, WaterLevelsContinuousPressureTransferer, flags - ) - profile_artifacts.append(artifact) - else: - results = _execute_transfer( - WaterLevelsContinuousPressureTransferer, flags=flags - ) - metrics.pressure_metrics(*results) - - if opts.transfer_acoustic: - message("TRANSFERRING WATER LEVELS ACOUSTIC") - if profile_waterlevels: - profiler = TransferProfiler("waterlevels_continuous_acoustic") - results, artifact = profiler.run( - _execute_transfer, WaterLevelsContinuousAcousticTransferer, flags - ) - profile_artifacts.append(artifact) - else: - results = _execute_transfer( - WaterLevelsContinuousAcousticTransferer, flags=flags - ) - metrics.acoustic_metrics(*results) - - return profile_artifacts - - def main(): message("START--------------------------------------") From 0eaefb129b5d1008272838d2f985c6091c3a71b9 Mon Sep 17 00:00:00 2001 From: jirhiker Date: Sat, 31 Jan 2026 00:17:40 +0000 Subject: [PATCH 16/24] Formatting changes --- tests/integration/test_nma_legacy_relationships.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_nma_legacy_relationships.py b/tests/integration/test_nma_legacy_relationships.py index 1a7ff9d73..c34867c49 100644 --- a/tests/integration/test_nma_legacy_relationships.py +++ b/tests/integration/test_nma_legacy_relationships.py @@ -49,7 +49,6 @@ ) from db.thing import Thing - # ============================================================================= # Fixtures # ============================================================================= From 4a56b0b1baf0d90d94a0a625b46fc53d57cf5095 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 31 Jan 2026 12:14:10 +1100 Subject: [PATCH 17/24] feat: refactor Thing ID caching and add nma_WCLab_ID to NMA_MinorTraceChemistry --- ...51fd_refactor_nma_tables_to_integer_pks.py | 3 - ...add_unique_index_ngwmn_wellconstruction.py | 34 +++++ ...6b3d2e8_add_nma_wclab_id_to_minor_trace.py | 29 ++++ db/nma_legacy.py | 2 + transfers/associated_data.py | 73 ++++++++- transfers/minor_trace_chemistry_transfer.py | 4 +- transfers/soil_rock_results.py | 67 ++++++++- transfers/transfer.py | 142 ++++++------------ 8 files changed, 234 insertions(+), 120 deletions(-) create mode 100644 alembic/versions/50d1c2a3b4c5_add_unique_index_ngwmn_wellconstruction.py create mode 100644 alembic/versions/71a4c6b3d2e8_add_nma_wclab_id_to_minor_trace.py diff --git a/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py b/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py index 1245b5312..a0a7edb8b 100644 --- a/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py +++ b/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py @@ -507,7 +507,6 @@ def upgrade() -> None: existing_type=sa.VARCHAR(), comment="To audit the original NM_Aquifer LocationID if it was transferred over", existing_nullable=True, - autoincrement=False, ) op.alter_column( "thing_version", @@ -515,7 +514,6 @@ def upgrade() -> None: existing_type=sa.VARCHAR(length=25), comment="Raw FormationZone value from legacy WellData (NM_Aquifer).", existing_nullable=True, - autoincrement=False, ) op.alter_column( "transducer_observation", @@ -556,7 +554,6 @@ def downgrade() -> None: comment=None, existing_comment="Raw FormationZone value from legacy WellData (NM_Aquifer).", existing_nullable=True, - autoincrement=False, ) op.alter_column( "thing_version", diff --git a/alembic/versions/50d1c2a3b4c5_add_unique_index_ngwmn_wellconstruction.py b/alembic/versions/50d1c2a3b4c5_add_unique_index_ngwmn_wellconstruction.py new file mode 100644 index 000000000..ceffbdaad --- /dev/null +++ b/alembic/versions/50d1c2a3b4c5_add_unique_index_ngwmn_wellconstruction.py @@ -0,0 +1,34 @@ +"""Add unique index for NGWMN well construction + +Revision ID: 50d1c2a3b4c5 +Revises: 43bc34504ee6 +Create Date: 2026-01-31 00:27:12.204176 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "50d1c2a3b4c5" +down_revision: Union[str, Sequence[str], None] = "43bc34504ee6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +INDEX_NAME = "uq_ngwmn_wc_point_casing_screen" +TABLE_NAME = "NMA_view_NGWMN_WellConstruction" + + +def upgrade() -> None: + op.create_index( + INDEX_NAME, + TABLE_NAME, + ["PointID", "CasingTop", "ScreenTop"], + unique=True, + ) + + +def downgrade() -> None: + op.drop_index(INDEX_NAME, table_name=TABLE_NAME) diff --git a/alembic/versions/71a4c6b3d2e8_add_nma_wclab_id_to_minor_trace.py b/alembic/versions/71a4c6b3d2e8_add_nma_wclab_id_to_minor_trace.py new file mode 100644 index 000000000..bebaf5dff --- /dev/null +++ b/alembic/versions/71a4c6b3d2e8_add_nma_wclab_id_to_minor_trace.py @@ -0,0 +1,29 @@ +"""Add nma_WCLab_ID column to NMA_MinorTraceChemistry + +Revision ID: 71a4c6b3d2e8 +Revises: 50d1c2a3b4c5 +Create Date: 2026-01-31 01:05:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "71a4c6b3d2e8" +down_revision: Union[str, Sequence[str], None] = "50d1c2a3b4c5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "NMA_MinorTraceChemistry", + sa.Column("nma_WCLab_ID", sa.String(length=25), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column("NMA_MinorTraceChemistry", "nma_WCLab_ID") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 0265c0044..4b32fd064 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -657,6 +657,7 @@ class NMA_MinorTraceChemistry(Base): - nma_global_id: Original UUID PK, now UNIQUE for audit - chemistry_sample_info_id: Integer FK to NMA_Chemistry_SampleInfo.id - nma_chemistry_sample_info_uuid: Legacy UUID FK for audit + - nma_wclab_id: Legacy WCLab_ID string (audit) """ __tablename__ = "NMA_MinorTraceChemistry" @@ -704,6 +705,7 @@ class NMA_MinorTraceChemistry(Base): analyses_agency: Mapped[Optional[str]] = mapped_column( "analyses_agency", String(100) ) + nma_wclab_id: Mapped[Optional[str]] = mapped_column("nma_WCLab_ID", String(25)) # --- Relationships --- chemistry_sample_info: Mapped["NMA_Chemistry_SampleInfo"] = relationship( diff --git a/transfers/associated_data.py b/transfers/associated_data.py index ca9195b06..6c667acaf 100644 --- a/transfers/associated_data.py +++ b/transfers/associated_data.py @@ -48,14 +48,27 @@ class AssociatedDataTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size - self._thing_id_cache: dict[str, int] = {} + self._thing_id_by_point_id: dict[str, int] = {} + self._thing_id_by_location_id: dict[str, int] = {} self._build_thing_id_cache() def _build_thing_id_cache(self) -> None: with session_ctx() as session: - things = session.query(Thing.name, Thing.id).all() - self._thing_id_cache = {name: thing_id for name, thing_id in things} - logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + things = session.query(Thing.id, Thing.name, Thing.nma_pk_location).all() + for thing_id, name, nma_pk_location in things: + if name: + point_key = self._normalize_point_id(name) + if point_key: + self._thing_id_by_point_id[point_key] = thing_id + if nma_pk_location: + key = self._normalize_location_id(nma_pk_location) + if key: + self._thing_id_by_location_id[key] = thing_id + logger.info( + "Built Thing caches with %s point ids and %s location ids", + len(self._thing_id_by_point_id), + len(self._thing_id_by_location_id), + ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -63,13 +76,27 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return df, cleaned_df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows: list[dict[str, Any]] = [] + skipped_missing_thing = 0 + for raw in self.cleaned_df.to_dict("records"): + record = self._row_dict(raw) + if record is None: + skipped_missing_thing += 1 + continue + rows.append(record) + rows = self._dedupe_rows(rows, key="nma_AssocID") if not rows: logger.info("No AssociatedData rows to transfer") return + if skipped_missing_thing: + logger.warning( + "Skipped %s AssociatedData rows without matching Thing", + skipped_missing_thing, + ) + insert_stmt = insert(NMA_AssociatedData) excluded = insert_stmt.excluded @@ -96,22 +123,52 @@ def _transfer_hook(self, session: Session) -> None: session.execute(stmt) session.commit() - def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: point_id = row.get("PointID") + location_id = self._uuid_val(row.get("LocationId")) + thing_id = self._resolve_thing_id(point_id, location_id) + if thing_id is None: + logger.warning( + "Skipping AssociatedData PointID=%s LocationId=%s - Thing not found", + point_id, + location_id, + ) + return None + return { # Legacy UUID PK -> nma_assoc_id (unique audit column) "nma_AssocID": self._uuid_val(row.get("AssocID")), # Legacy ID columns (renamed with nma_ prefix) - "nma_LocationId": self._uuid_val(row.get("LocationId")), + "nma_LocationId": location_id, "nma_PointID": point_id, "nma_OBJECTID": row.get("OBJECTID"), # Data columns "Notes": row.get("Notes"), "Formation": row.get("Formation"), # FK to Thing - "thing_id": self._thing_id_cache.get(point_id), + "thing_id": thing_id, } + def _resolve_thing_id( + self, point_id: Optional[str], location_id: Optional[UUID] + ) -> Optional[int]: + if location_id is not None: + key = self._normalize_location_id(str(location_id)) + thing_id = self._thing_id_by_location_id.get(key) + if thing_id is not None: + return thing_id + if point_id: + return self._thing_id_by_point_id.get(self._normalize_point_id(point_id)) + return None + + @staticmethod + def _normalize_point_id(value: str) -> str: + return value.strip().upper() + + @staticmethod + def _normalize_location_id(value: str) -> str: + return value.strip().lower() + def _dedupe_rows( self, rows: list[dict[str, Any]], key: str ) -> list[dict[str, Any]]: diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index f021cb202..51cb1468f 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -157,7 +157,7 @@ def _transfer_hook(self, session: Session) -> None: "uncertainty": excluded.uncertainty, "volume": excluded.volume, "volume_unit": excluded.volume_unit, - "WCLab_ID": excluded.WCLab_ID, + "nma_WCLab_ID": excluded.nma_WCLab_ID, }, ) session.execute(stmt) @@ -214,7 +214,7 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: "uncertainty": self._safe_float(row, "Uncertainty"), "volume": self._safe_int(row, "Volume"), "volume_unit": self._safe_str(row, "VolumeUnit"), - "WCLab_ID": self._safe_str(row, "WCLab_ID"), + "nma_WCLab_ID": self._safe_str(row, "WCLab_ID"), } def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: diff --git a/transfers/soil_rock_results.py b/transfers/soil_rock_results.py index 1aae4e3ad..fd3894e52 100644 --- a/transfers/soil_rock_results.py +++ b/transfers/soil_rock_results.py @@ -42,14 +42,27 @@ class SoilRockResultsTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size - self._thing_id_cache: dict[str, int] = {} + self._thing_id_by_point_id: dict[str, int] = {} + self._thing_id_by_location_id: dict[str, int] = {} self._build_thing_id_cache() def _build_thing_id_cache(self) -> None: with session_ctx() as session: - things = session.query(Thing.name, Thing.id).all() - self._thing_id_cache = {name: thing_id for name, thing_id in things} - logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + things = session.query(Thing.id, Thing.name, Thing.nma_pk_location).all() + for thing_id, name, nma_pk_location in things: + if name: + point_key = self._normalize_point_id(name) + if point_key: + self._thing_id_by_point_id[point_key] = thing_id + if nma_pk_location: + loc_key = self._normalize_location_id(nma_pk_location) + if loc_key: + self._thing_id_by_location_id[loc_key] = thing_id + logger.info( + "Built Thing caches with %s point ids and %s location ids", + len(self._thing_id_by_point_id), + len(self._thing_id_by_location_id), + ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -57,12 +70,25 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return df, cleaned_df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows: list[dict[str, Any]] = [] + skipped_missing_thing = 0 + for raw in self.cleaned_df.to_dict("records"): + record = self._row_dict(raw) + if record is None: + skipped_missing_thing += 1 + continue + rows.append(record) if not rows: logger.info("No Soil_Rock_Results rows to transfer") return + if skipped_missing_thing: + logger.warning( + "Skipped %s Soil_Rock_Results rows without matching Thing", + skipped_missing_thing, + ) + for i in range(0, len(rows), self.batch_size): chunk = rows[i : i + self.batch_size] logger.info( @@ -74,8 +100,16 @@ def _transfer_hook(self, session: Session) -> None: session.bulk_insert_mappings(NMA_Soil_Rock_Results, chunk) session.commit() - def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: point_id = row.get("Point_ID") + thing_id = self._resolve_thing_id(point_id) + if thing_id is None: + logger.warning( + "Skipping Soil_Rock_Results Point_ID=%s - Thing not found", + point_id, + ) + return None + return { # Legacy ID column (use Python attribute name for bulk_insert_mappings) "nma_point_id": point_id, @@ -86,9 +120,28 @@ def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: "d18o": self._float_val(row.get("d18O")), "sampled_by": row.get("Sampled by"), # FK to Thing - "thing_id": self._thing_id_cache.get(point_id), + "thing_id": thing_id, } + def _resolve_thing_id(self, point_id: Optional[str]) -> Optional[int]: + if point_id is None: + return None + + key = self._normalize_location_id(point_id) + thing_id = self._thing_id_by_location_id.get(key) + if thing_id is not None: + return thing_id + + return self._thing_id_by_point_id.get(self._normalize_point_id(point_id)) + + @staticmethod + def _normalize_point_id(value: str) -> str: + return str(value).strip().upper() + + @staticmethod + def _normalize_location_id(value: str) -> str: + return str(value).strip().lower() + def _float_val(self, value: Any) -> Optional[float]: if value is None or pd.isna(value): return None diff --git a/transfers/transfer.py b/transfers/transfer.py index 437d318ee..a8f18e05a 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -43,7 +43,6 @@ from transfers.metrics import Metrics from transfers.profiling import ( - TransferProfiler, ProfileArtifact, upload_profile_artifacts, ) @@ -301,20 +300,18 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): for field in transfer_options.__dataclass_fields__ }, ) - transfer_options.transfer_pressure = False - transfer_options.transfer_acoustic = False flags = {"TRANSFER_ALL_WELLS": True, "LIMIT": limit} message("TRANSFER_FLAGS") logger.info(flags) profile_artifacts: list[ProfileArtifact] = [] - water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) + continuous_water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) # ========================================================================= # PHASE 1: Foundation (Parallel - these are independent of each other) # ========================================================================= - if water_levels_only: + if continuous_water_levels_only: logger.info("CONTINUOUS_WATER_LEVELS set; running only continuous transfers") _run_continuous_water_level_transfers( metrics, flags, profile_waterlevels, profile_artifacts @@ -393,62 +390,49 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): ) except Exception as e: logger.critical(f"Non-well transfer {name} failed: {e}") - use_parallel = get_bool_env("TRANSFER_PARALLEL", True) - if use_parallel: - _transfer_parallel( - metrics, - flags, - limit, - transfer_options, - profile_waterlevels, - profile_artifacts, - ) + _transfer_parallel( + metrics, + flags, + limit, + transfer_options, + ) return profile_artifacts -def _run_water_level_transfers( - metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact] -): - message("WATER LEVEL TRANSFERS ONLY") - - results = _execute_transfer(WaterLevelTransferer, flags=flags) - metrics.water_level_metrics(*results) - - _run_continuous_water_level_transfers( - metrics, flags, profile_waterlevels, profile_artifacts - ) +def _run_continuous_water_level_transfers(metrics, flags): + message("CONTINUOUS WATER LEVEL TRANSFERS") + # ========================================================================= + # PHASE 4: Parallel Group 2 (Continuous water levels - after sensors) + # ========================================================================= + message("PARALLEL TRANSFER GROUP 2 (Continuous Water Levels)") -def _run_continuous_water_level_transfers( - metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact] -): - message("CONTINUOUS WATER LEVEL TRANSFERS") + parallel_tasks = [ + ("Pressure", WaterLevelsContinuousPressureTransferer), + ("Acoustic", WaterLevelsContinuousAcousticTransferer), + ] + results_map = {} + with ThreadPoolExecutor(max_workers=2) as executor: + futures = {} + for name, klass, task_flags in parallel_tasks: + future = executor.submit(_execute_transfer_with_timing, name, klass, flags) + futures[future] = name - if profile_waterlevels: - profiler = TransferProfiler("waterlevels_continuous_pressure") - results, artifact = profiler.run( - _execute_transfer, WaterLevelsContinuousPressureTransferer, flags - ) - profile_artifacts.append(artifact) - else: - results = _execute_transfer( - WaterLevelsContinuousPressureTransferer, flags=flags - ) - metrics.pressure_metrics(*results) + for future in as_completed(futures): + name = futures[future] + try: + result_name, result, elapsed = future.result() + results_map[result_name] = result + logger.info(f"Parallel task {result_name} completed in {elapsed:.2f}s") + except Exception as e: + logger.critical(f"Parallel task {name} failed: {e}") - if profile_waterlevels: - profiler = TransferProfiler("waterlevels_continuous_acoustic") - results, artifact = profiler.run( - _execute_transfer, WaterLevelsContinuousAcousticTransferer, flags - ) - profile_artifacts.append(artifact) - else: - results = _execute_transfer( - WaterLevelsContinuousAcousticTransferer, flags=flags - ) - metrics.acoustic_metrics(*results) + if "Pressure" in results_map and results_map["Pressure"]: + metrics.pressure_metrics(*results_map["Pressure"]) + if "Acoustic" in results_map and results_map["Acoustic"]: + metrics.acoustic_metrics(*results_map["Acoustic"]) def _transfer_parallel( @@ -456,8 +440,6 @@ def _transfer_parallel( flags, limit, transfer_options: TransferOptions, - profile_waterlevels: bool, - profile_artifacts, ): """Execute transfers in parallel where possible.""" message("PARALLEL TRANSFER GROUP 1") @@ -623,52 +605,12 @@ def _transfer_parallel( results = _execute_transfer(SensorTransferer, flags=flags) metrics.sensor_metrics(*results) - # ========================================================================= - # PHASE 4: Parallel Group 2 (Continuous water levels - after sensors) - # ========================================================================= - if opts.transfer_pressure or opts.transfer_acoustic: - message("PARALLEL TRANSFER GROUP 2 (Continuous Water Levels)") - - parallel_tasks_2 = [] - if opts.transfer_pressure: - parallel_tasks_2.append( - ("Pressure", WaterLevelsContinuousPressureTransferer, flags) - ) - if opts.transfer_acoustic: - parallel_tasks_2.append( - ("Acoustic", WaterLevelsContinuousAcousticTransferer, flags) - ) - - if profile_waterlevels: - for name, klass, task_flags in parallel_tasks_2: - profiler = TransferProfiler(f"waterlevels_continuous_{name.lower()}") - results, artifact = profiler.run(_execute_transfer, klass, task_flags) - profile_artifacts.append(artifact) - results_map[name] = results - else: - with ThreadPoolExecutor(max_workers=2) as executor: - futures = {} - for name, klass, task_flags in parallel_tasks_2: - future = executor.submit( - _execute_transfer_with_timing, name, klass, task_flags - ) - futures[future] = name - - for future in as_completed(futures): - name = futures[future] - try: - result_name, result, elapsed = future.result() - results_map[result_name] = result - logger.info( - f"Parallel task {result_name} completed in {elapsed:.2f}s" - ) - except Exception as e: - logger.critical(f"Parallel task {name} failed: {e}") - - if "Pressure" in results_map and results_map["Pressure"]: - metrics.pressure_metrics(*results_map["Pressure"]) - if "Acoustic" in results_map and results_map["Acoustic"]: - metrics.acoustic_metrics(*results_map["Acoustic"]) + # # ========================================================================= + # # PHASE 4: Parallel Group 2 (Continuous water levels - after sensors) + # # ========================================================================= + # Continuous water levels handled separately in _run_continuous_water_level_transfers() + # the transfer process is bisected because the continuous water levels process is + # very time consuming and we want to run it alone in its own phase. def main(): From 29ccb1406e25d7375922297331f505fe18ffac5d Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 31 Jan 2026 12:20:05 +1100 Subject: [PATCH 18/24] feat: improve cache access and refactor WCLab_ID handling in minor_trace_chemistry_transfer --- transfers/minor_trace_chemistry_transfer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 51cb1468f..0ab5f8ced 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -177,7 +177,8 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: return None # Look up Integer FK from cache - chemistry_sample_info_id = self._sample_info_cache.get(legacy_sample_pt_id) + cache = getattr(self, "_sample_info_cache", {}) + chemistry_sample_info_id = cache.get(legacy_sample_pt_id) if chemistry_sample_info_id is None: self._capture_error( legacy_sample_pt_id, @@ -195,7 +196,8 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: ) return None - return { + wclab_id = self._safe_str(row, "WCLab_ID") + row_dict = { # Legacy UUID PK -> nma_global_id (unique audit column) "nma_GlobalID": nma_global_id, # New Integer FK to ChemistrySampleInfo @@ -214,8 +216,11 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: "uncertainty": self._safe_float(row, "Uncertainty"), "volume": self._safe_int(row, "Volume"), "volume_unit": self._safe_str(row, "VolumeUnit"), - "nma_WCLab_ID": self._safe_str(row, "WCLab_ID"), + "nma_WCLab_ID": wclab_id, } + if wclab_id is not None: + row_dict["WCLab_ID"] = wclab_id + return row_dict def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" From 2df12414a23c2be180052268aac58f17f8403cf6 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 31 Jan 2026 12:25:26 +1100 Subject: [PATCH 19/24] fix: ensure cache initialization in minor_trace_chemistry_transfer --- transfers/minor_trace_chemistry_transfer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 0ab5f8ced..4e06ed846 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -177,7 +177,9 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: return None # Look up Integer FK from cache - cache = getattr(self, "_sample_info_cache", {}) + cache = getattr(self, "_sample_info_cache", None) + if cache is None: + cache = {} chemistry_sample_info_id = cache.get(legacy_sample_pt_id) if chemistry_sample_info_id is None: self._capture_error( From 4b07213277f8fbc16c6875587fce2ee9712325eb Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 31 Jan 2026 13:49:28 +1100 Subject: [PATCH 20/24] feat: initialize sample_info_cache and errors in MinorTraceChemistryTransferer tests --- tests/test_minor_trace_chemistry_transfer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_minor_trace_chemistry_transfer.py b/tests/test_minor_trace_chemistry_transfer.py index fec7be618..f8507fbb9 100644 --- a/tests/test_minor_trace_chemistry_transfer.py +++ b/tests/test_minor_trace_chemistry_transfer.py @@ -9,7 +9,9 @@ def test_row_to_dict_includes_wclab_id(): transfer = MinorTraceChemistryTransferer.__new__(MinorTraceChemistryTransferer) sample_pt_id = uuid.uuid4() transfer._sample_pt_ids = {sample_pt_id} + transfer._sample_info_cache = {sample_pt_id: 1} transfer.flags = {} + transfer.errors = [] row = pd.Series( { From 104571ca63dff91e1556ef9439d053f1d5561c53 Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Sat, 31 Jan 2026 13:50:18 +1100 Subject: [PATCH 21/24] Update transfers/transfer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- transfers/transfer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transfers/transfer.py b/transfers/transfer.py index a8f18e05a..13c0a1673 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -416,7 +416,7 @@ def _run_continuous_water_level_transfers(metrics, flags): results_map = {} with ThreadPoolExecutor(max_workers=2) as executor: futures = {} - for name, klass, task_flags in parallel_tasks: + for name, klass in parallel_tasks: future = executor.submit(_execute_transfer_with_timing, name, klass, flags) futures[future] = name From eccce9bd5c2b80a903b27acaef97460231dde01d Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Sat, 31 Jan 2026 13:51:49 +1100 Subject: [PATCH 22/24] Update transfers/minor_trace_chemistry_transfer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- transfers/minor_trace_chemistry_transfer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 4e06ed846..d0503e709 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -220,8 +220,6 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: "volume_unit": self._safe_str(row, "VolumeUnit"), "nma_WCLab_ID": wclab_id, } - if wclab_id is not None: - row_dict["WCLab_ID"] = wclab_id return row_dict def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: From c862a94add217333ac9021736349756a3b6e5a96 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 31 Jan 2026 13:53:20 +1100 Subject: [PATCH 23/24] refactor: streamline cache access in minor_trace_chemistry_transfer --- tests/test_minor_trace_chemistry_transfer.py | 1 + transfers/minor_trace_chemistry_transfer.py | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_minor_trace_chemistry_transfer.py b/tests/test_minor_trace_chemistry_transfer.py index f8507fbb9..4c9d1e780 100644 --- a/tests/test_minor_trace_chemistry_transfer.py +++ b/tests/test_minor_trace_chemistry_transfer.py @@ -6,6 +6,7 @@ def test_row_to_dict_includes_wclab_id(): + # Bypass __init__ so we can stub the cache without hitting the DB. transfer = MinorTraceChemistryTransferer.__new__(MinorTraceChemistryTransferer) sample_pt_id = uuid.uuid4() transfer._sample_pt_ids = {sample_pt_id} diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index d0503e709..5f84bfda6 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -177,10 +177,7 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: return None # Look up Integer FK from cache - cache = getattr(self, "_sample_info_cache", None) - if cache is None: - cache = {} - chemistry_sample_info_id = cache.get(legacy_sample_pt_id) + chemistry_sample_info_id = self._sample_info_cache.get(legacy_sample_pt_id) if chemistry_sample_info_id is None: self._capture_error( legacy_sample_pt_id, From 1ba2c73f7e2c6085e1fa22d50579c0f941c6720f Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Sat, 31 Jan 2026 13:54:09 +1100 Subject: [PATCH 24/24] Update tests/test_minor_trace_chemistry_transfer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_minor_trace_chemistry_transfer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_minor_trace_chemistry_transfer.py b/tests/test_minor_trace_chemistry_transfer.py index 4c9d1e780..2d38e1a19 100644 --- a/tests/test_minor_trace_chemistry_transfer.py +++ b/tests/test_minor_trace_chemistry_transfer.py @@ -35,4 +35,4 @@ def test_row_to_dict_includes_wclab_id(): ) row_dict = transfer._row_to_dict(row) - assert row_dict["WCLab_ID"] == "LAB-123" + assert row_dict["nma_WCLab_ID"] == "LAB-123"