diff --git a/admin/config.py b/admin/config.py index 5aec1a5f4..1c3bb14f0 100644 --- a/admin/config.py +++ b/admin/config.py @@ -53,6 +53,7 @@ SurfaceWaterPhotosAdmin, ThingAdmin, TransducerObservationAdmin, + WaterLevelsContinuousPressureDailyAdmin, WeatherPhotosAdmin, WeatherDataAdmin, FieldParametersAdmin, @@ -80,6 +81,7 @@ NMA_Soil_Rock_Results, NMA_Stratigraphy, NMA_SurfaceWaterData, + NMA_WaterLevelsContinuous_Pressure_Daily, NMA_WeatherPhotos, NMA_SurfaceWaterPhotos, NMA_WeatherData, @@ -192,6 +194,13 @@ def create_admin(app): # Transducer observations admin.add_view(TransducerObservationAdmin(TransducerObservation)) + # Water Levels - Continuous (legacy) + admin.add_view( + WaterLevelsContinuousPressureDailyAdmin( + NMA_WaterLevelsContinuous_Pressure_Daily + ) + ) + # Weather admin.add_view(WeatherPhotosAdmin(NMA_WeatherPhotos)) diff --git a/admin/views/__init__.py b/admin/views/__init__.py index 33920b856..285d5ef5f 100644 --- a/admin/views/__init__.py +++ b/admin/views/__init__.py @@ -52,6 +52,9 @@ from admin.views.surface_water_photos import SurfaceWaterPhotosAdmin from admin.views.thing import ThingAdmin from admin.views.transducer_observation import TransducerObservationAdmin +from admin.views.waterlevelscontinuous_pressure_daily import ( + WaterLevelsContinuousPressureDailyAdmin, +) from admin.views.weather_photos import WeatherPhotosAdmin from admin.views.weather_data import WeatherDataAdmin @@ -88,6 +91,7 @@ "SurfaceWaterPhotosAdmin", "ThingAdmin", "TransducerObservationAdmin", + "WaterLevelsContinuousPressureDailyAdmin", "WeatherPhotosAdmin", "WeatherDataAdmin", ] diff --git a/admin/views/waterlevelscontinuous_pressure_daily.py b/admin/views/waterlevelscontinuous_pressure_daily.py new file mode 100644 index 000000000..ac2afb020 --- /dev/null +++ b/admin/views/waterlevelscontinuous_pressure_daily.py @@ -0,0 +1,148 @@ +# =============================================================================== +# Copyright 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +WaterLevelsContinuousPressureDailyAdmin view for legacy NMA_WaterLevelsContinuous_Pressure_Daily. +""" + +from starlette.requests import Request + +from admin.views.base import OcotilloModelView + + +class WaterLevelsContinuousPressureDailyAdmin(OcotilloModelView): + """ + Admin view for NMA_WaterLevelsContinuous_Pressure_Daily model. 
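+
+    Read-only: can_create/can_edit/can_delete below all return False because
+    this legacy table is populated by the transfer pipeline
+    (transfers/waterlevelscontinuous_pressure_daily.py) rather than edited
+    by hand.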
+ """ + + # ========== Basic Configuration ========== + name = "NMA Water Levels Continuous Pressure Daily" + label = "NMA Water Levels Continuous Pressure Daily" + icon = "fa fa-tachometer-alt" + + def can_create(self, request: Request) -> bool: + return False + + def can_edit(self, request: Request) -> bool: + return False + + def can_delete(self, request: Request) -> bool: + return False + + # ========== List View ========== + list_fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "temperature_water", + "water_head", + "water_head_adjusted", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "notes", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + sortable_fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "water_head", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + fields_default_sort = [("date_measured", True)] + + searchable_fields = [ + "global_id", + "well_id", + "point_id", + "date_measured", + "measurement_method", + "data_source", + "measuring_agency", + "notes", + ] + + page_size = 50 + page_size_options = [25, 50, 100, 200] + + # ========== Detail View ========== + fields = [ + "global_id", + "object_id", + "well_id", + "point_id", + "date_measured", + "temperature_water", + "water_head", + "water_head_adjusted", + "depth_to_water_bgs", + "measurement_method", + "data_source", + "measuring_agency", + "qced", + "notes", + "created", + "updated", + "processed_by", + "checked_by", + "cond_dl_ms_cm", + ] + + field_labels = { + "global_id": "GlobalID", + "object_id": "OBJECTID", + "well_id": "WellID", + "point_id": "PointID", + "date_measured": "Date Measured", + "temperature_water": "Temperature Water", + "water_head": "Water Head", + "water_head_adjusted": "Water Head Adjusted", + "depth_to_water_bgs": "Depth To Water (BGS)", + "measurement_method": "Measurement Method", + "data_source": "Data Source", + "measuring_agency": "Measuring Agency", + "qced": "QCed", + "notes": "Notes", + "created": "Created", + "updated": "Updated", + "processed_by": "Processed By", + "checked_by": "Checked By", + "cond_dl_ms_cm": "CONDDL (mS/cm)", + } + + +# ============= EOF ============================================= diff --git a/alembic/env.py b/alembic/env.py index 526711ae9..62deed2df 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -5,9 +5,10 @@ from alembic import context from dotenv import load_dotenv -from services.util import get_bool_env from sqlalchemy import create_engine, engine_from_config, pool, text +from services.util import get_bool_env + # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config @@ -15,8 +16,16 @@ # Interpret the config file for Python logging. # This line sets up loggers basically. 
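+# File-based logging config is opt-in via ALEMBIC_USE_FILE_CONFIG: unless it is
+# set to a truthy value, fileConfig() is skipped and the "alembic" logger
+# mirrors the root logger's handlers and level instead (see the else branch).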
-if config.config_file_name is not None: +if config.config_file_name is not None and os.environ.get( + "ALEMBIC_USE_FILE_CONFIG", "0" +) not in {"0", "false", "False"}: fileConfig(config.config_file_name, disable_existing_loggers=False) +else: + root_logger = logging.getLogger() + alembic_logger = logging.getLogger("alembic") + alembic_logger.handlers = root_logger.handlers[:] + alembic_logger.setLevel(root_logger.level) + alembic_logger.propagate = False # add your model's MetaData object here # for 'autogenerate' support diff --git a/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py b/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py index fdfb8c55e..a0a7edb8b 100644 --- a/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py +++ b/alembic/versions/3cb924ca51fd_refactor_nma_tables_to_integer_pks.py @@ -8,10 +8,8 @@ from typing import Sequence, Union -from alembic import op -import geoalchemy2 import sqlalchemy as sa -import sqlalchemy_utils +from alembic import op from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. @@ -509,7 +507,6 @@ def upgrade() -> None: existing_type=sa.VARCHAR(), comment="To audit the original NM_Aquifer LocationID if it was transferred over", existing_nullable=True, - autoincrement=False, ) op.alter_column( "thing_version", @@ -517,7 +514,6 @@ def upgrade() -> None: existing_type=sa.VARCHAR(length=25), comment="Raw FormationZone value from legacy WellData (NM_Aquifer).", existing_nullable=True, - autoincrement=False, ) op.alter_column( "transducer_observation", @@ -558,7 +554,6 @@ def downgrade() -> None: comment=None, existing_comment="Raw FormationZone value from legacy WellData (NM_Aquifer).", existing_nullable=True, - autoincrement=False, ) op.alter_column( "thing_version", diff --git a/alembic/versions/43bc34504ee6_merge_migrations_after_staging_merge.py b/alembic/versions/43bc34504ee6_merge_migrations_after_staging_merge.py index 82f93b47a..86943385a 100644 --- a/alembic/versions/43bc34504ee6_merge_migrations_after_staging_merge.py +++ b/alembic/versions/43bc34504ee6_merge_migrations_after_staging_merge.py @@ -1,21 +1,16 @@ """merge_migrations_after_staging_merge Revision ID: 43bc34504ee6 -Revises: 3cb924ca51fd, e123456789ab +Revises: 3cb924ca51fd Create Date: 2026-01-30 11:52:41.932306 """ from typing import Sequence, Union -from alembic import op -import geoalchemy2 -import sqlalchemy as sa -import sqlalchemy_utils - # revision identifiers, used by Alembic. revision: str = "43bc34504ee6" -down_revision: Union[str, Sequence[str], None] = ("3cb924ca51fd", "e123456789ab") +down_revision: Union[str, Sequence[str], None] = "3cb924ca51fd" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/alembic/versions/50d1c2a3b4c5_add_unique_index_ngwmn_wellconstruction.py b/alembic/versions/50d1c2a3b4c5_add_unique_index_ngwmn_wellconstruction.py new file mode 100644 index 000000000..ceffbdaad --- /dev/null +++ b/alembic/versions/50d1c2a3b4c5_add_unique_index_ngwmn_wellconstruction.py @@ -0,0 +1,34 @@ +"""Add unique index for NGWMN well construction + +Revision ID: 50d1c2a3b4c5 +Revises: 43bc34504ee6 +Create Date: 2026-01-31 00:27:12.204176 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "50d1c2a3b4c5" +down_revision: Union[str, Sequence[str], None] = "43bc34504ee6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +INDEX_NAME = "uq_ngwmn_wc_point_casing_screen" +TABLE_NAME = "NMA_view_NGWMN_WellConstruction" + + +def upgrade() -> None: + op.create_index( + INDEX_NAME, + TABLE_NAME, + ["PointID", "CasingTop", "ScreenTop"], + unique=True, + ) + + +def downgrade() -> None: + op.drop_index(INDEX_NAME, table_name=TABLE_NAME) diff --git a/alembic/versions/71a4c6b3d2e8_add_nma_wclab_id_to_minor_trace.py b/alembic/versions/71a4c6b3d2e8_add_nma_wclab_id_to_minor_trace.py new file mode 100644 index 000000000..bebaf5dff --- /dev/null +++ b/alembic/versions/71a4c6b3d2e8_add_nma_wclab_id_to_minor_trace.py @@ -0,0 +1,29 @@ +"""Add nma_WCLab_ID column to NMA_MinorTraceChemistry + +Revision ID: 71a4c6b3d2e8 +Revises: 50d1c2a3b4c5 +Create Date: 2026-01-31 01:05:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "71a4c6b3d2e8" +down_revision: Union[str, Sequence[str], None] = "50d1c2a3b4c5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "NMA_MinorTraceChemistry", + sa.Column("nma_WCLab_ID", sa.String(length=25), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column("NMA_MinorTraceChemistry", "nma_WCLab_ID") diff --git a/alembic/versions/76e3ae8b99cb_enforce_thing_fk_for_nma_legacy_models.py b/alembic/versions/76e3ae8b99cb_enforce_thing_fk_for_nma_legacy_models.py index 33784c7e6..9f07be417 100644 --- a/alembic/versions/76e3ae8b99cb_enforce_thing_fk_for_nma_legacy_models.py +++ b/alembic/versions/76e3ae8b99cb_enforce_thing_fk_for_nma_legacy_models.py @@ -1,7 +1,7 @@ """enforce_thing_fk_for_nma_legacy_models Revision ID: 76e3ae8b99cb -Revises: c1d2e3f4a5b6 +Revises: e123456789ab Create Date: 2026-01-26 11:56:28.744603 Issue: #363 @@ -17,12 +17,12 @@ from typing import Sequence, Union -from alembic import op import sqlalchemy as sa +from alembic import op # revision identifiers, used by Alembic. revision: str = "76e3ae8b99cb" -down_revision: Union[str, Sequence[str], None] = "c1d2e3f4a5b6" +down_revision: Union[str, Sequence[str], None] = "e123456789ab" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/alembic/versions/e123456789ab_add_observation_data_quality.py b/alembic/versions/e123456789ab_add_observation_data_quality.py index 717a0c82e..0068fbf3e 100644 --- a/alembic/versions/e123456789ab_add_observation_data_quality.py +++ b/alembic/versions/e123456789ab_add_observation_data_quality.py @@ -1,7 +1,7 @@ """add nma_data_quality to observation Revision ID: e123456789ab -Revises: b12e3919077e +Revises: f0c9d8e7b6a5 Create Date: 2026-02-05 12:00:00.000000 """ @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. 
revision: str = "e123456789ab" -down_revision: Union[str, Sequence[str], None] = "b12e3919077e" +down_revision: Union[str, Sequence[str], None] = "f0c9d8e7b6a5" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py b/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py new file mode 100644 index 000000000..f825e81ae --- /dev/null +++ b/alembic/versions/e8a7c6b5d4f3_add_thing_id_to_nma_waterlevelscontinuous_pressure_daily.py @@ -0,0 +1,92 @@ +"""Add thing_id FK to NMA_WaterLevelsContinuous_Pressure_Daily. + +Revision ID: e8a7c6b5d4f3 +Revises: b12e3919077e +Create Date: 2026-01-29 12:45:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "e8a7c6b5d4f3" +down_revision: Union[str, Sequence[str], None] = "b12e3919077e" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add thing_id and FK to legacy pressure daily table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"] + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + if "thing_id" not in columns: + op.add_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + sa.Column("thing_id", sa.Integer(), nullable=True), + ) + + existing_fks = { + fk["name"] + for fk in inspector.get_foreign_keys("NMA_WaterLevelsContinuous_Pressure_Daily") + if fk.get("name") + } + if "fk_pressure_daily_thing" not in existing_fks: + op.create_foreign_key( + "fk_pressure_daily_thing", + "NMA_WaterLevelsContinuous_Pressure_Daily", + "thing", + ["thing_id"], + ["id"], + ondelete="CASCADE", + ) + + null_count = bind.execute( + sa.text( + 'SELECT COUNT(*) FROM "NMA_WaterLevelsContinuous_Pressure_Daily" ' + 'WHERE "thing_id" IS NULL' + ) + ).scalar() + if null_count == 0: + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "thing_id", + existing_type=sa.Integer(), + nullable=False, + ) + + +def downgrade() -> None: + """Remove thing_id FK from legacy pressure daily table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + existing_fks = { + fk["name"] + for fk in inspector.get_foreign_keys("NMA_WaterLevelsContinuous_Pressure_Daily") + if fk.get("name") + } + if "fk_pressure_daily_thing" in existing_fks: + op.drop_constraint( + "fk_pressure_daily_thing", + "NMA_WaterLevelsContinuous_Pressure_Daily", + type_="foreignkey", + ) + + columns = { + col["name"] + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + if "thing_id" in columns: + op.drop_column("NMA_WaterLevelsContinuous_Pressure_Daily", "thing_id") diff --git a/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py b/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py new file mode 100644 index 000000000..38d113068 --- /dev/null +++ b/alembic/versions/f0c9d8e7b6a5_align_pressure_daily_uuid_columns.py @@ -0,0 +1,85 @@ +"""Align UUID column types on NMA_WaterLevelsContinuous_Pressure_Daily. 
+ +Revision ID: f0c9d8e7b6a5 +Revises: e8a7c6b5d4f3 +Create Date: 2026-01-29 12:55:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "f0c9d8e7b6a5" +down_revision: Union[str, Sequence[str], None] = "e8a7c6b5d4f3" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def _column_is_uuid(col) -> bool: + return isinstance(col.get("type"), postgresql.UUID) + + +def upgrade() -> None: + """Alter UUID columns to proper UUID types.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"]: col + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + + global_id_col = columns.get("GlobalID") + if global_id_col is not None and not _column_is_uuid(global_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "GlobalID", + type_=postgresql.UUID(as_uuid=True), + postgresql_using='"GlobalID"::uuid', + ) + + well_id_col = columns.get("WellID") + if well_id_col is not None and not _column_is_uuid(well_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "WellID", + type_=postgresql.UUID(as_uuid=True), + postgresql_using='"WellID"::uuid', + ) + + +def downgrade() -> None: + """Revert UUID columns back to strings.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + return + + columns = { + col["name"]: col + for col in inspector.get_columns("NMA_WaterLevelsContinuous_Pressure_Daily") + } + + global_id_col = columns.get("GlobalID") + if global_id_col is not None and _column_is_uuid(global_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "GlobalID", + type_=sa.String(length=40), + postgresql_using='"GlobalID"::text', + ) + + well_id_col = columns.get("WellID") + if well_id_col is not None and _column_is_uuid(well_id_col): + op.alter_column( + "NMA_WaterLevelsContinuous_Pressure_Daily", + "WellID", + type_=sa.String(length=40), + postgresql_using='"WellID"::text', + ) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 8717448bc..4b32fd064 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -69,7 +69,6 @@ from db.base import Base if TYPE_CHECKING: - from db.location import Location from db.thing import Thing @@ -87,11 +86,17 @@ class NMA_WaterLevelsContinuous_Pressure_Daily(Base): __tablename__ = "NMA_WaterLevelsContinuous_Pressure_Daily" - global_id: Mapped[str] = mapped_column("GlobalID", String(40), primary_key=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) object_id: Mapped[Optional[int]] = mapped_column( "OBJECTID", Integer, autoincrement=True ) - well_id: Mapped[Optional[str]] = mapped_column("WellID", String(40)) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) + # FK to Thing table - required for all WaterLevelsContinuous_Pressure_Daily records + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) date_measured: Mapped[datetime] = mapped_column( "DateMeasured", DateTime, nullable=False @@ -121,6 +126,10 @@ class 
NMA_WaterLevelsContinuous_Pressure_Daily(Base): checked_by: Mapped[Optional[str]] = mapped_column("CheckedBy", String(4)) cond_dl_ms_cm: Mapped[Optional[float]] = mapped_column("CONDDL (mS/cm)", Float) + thing: Mapped["Thing"] = relationship( + "Thing", back_populates="pressure_daily_levels" + ) + class NMA_view_NGWMN_WellConstruction(Base): """ @@ -648,6 +657,7 @@ class NMA_MinorTraceChemistry(Base): - nma_global_id: Original UUID PK, now UNIQUE for audit - chemistry_sample_info_id: Integer FK to NMA_Chemistry_SampleInfo.id - nma_chemistry_sample_info_uuid: Legacy UUID FK for audit + - nma_wclab_id: Legacy WCLab_ID string (audit) """ __tablename__ = "NMA_MinorTraceChemistry" @@ -695,6 +705,7 @@ class NMA_MinorTraceChemistry(Base): analyses_agency: Mapped[Optional[str]] = mapped_column( "analyses_agency", String(100) ) + nma_wclab_id: Mapped[Optional[str]] = mapped_column("nma_WCLab_ID", String(25)) # --- Relationships --- chemistry_sample_info: Mapped["NMA_Chemistry_SampleInfo"] = relationship( @@ -775,6 +786,7 @@ class NMA_Radionuclides(Base): chemistry_sample_info: Mapped["NMA_Chemistry_SampleInfo"] = relationship( "NMA_Chemistry_SampleInfo", back_populates="radionuclides" ) + thing: Mapped["Thing"] = relationship("Thing") @validates("thing_id") def validate_thing_id(self, key, value): diff --git a/db/thing.py b/db/thing.py index 96fb55361..bdfff8e58 100644 --- a/db/thing.py +++ b/db/thing.py @@ -54,6 +54,7 @@ NMA_Radionuclides, NMA_Soil_Rock_Results, NMA_Stratigraphy, + NMA_WaterLevelsContinuous_Pressure_Daily, ) @@ -361,6 +362,14 @@ class Thing( cascade="all, delete-orphan", passive_deletes=True, ) + pressure_daily_levels: Mapped[List["NMA_WaterLevelsContinuous_Pressure_Daily"]] = ( + relationship( + "NMA_WaterLevelsContinuous_Pressure_Daily", + back_populates="thing", + cascade="all, delete-orphan", + passive_deletes=True, + ) + ) # --- Association Proxies --- assets: AssociationProxy[list["Asset"]] = association_proxy( diff --git a/tests/integration/test_admin_minor_trace_chemistry.py b/tests/integration/test_admin_minor_trace_chemistry.py index 01fbe2ce6..fcdcd539a 100644 --- a/tests/integration/test_admin_minor_trace_chemistry.py +++ b/tests/integration/test_admin_minor_trace_chemistry.py @@ -31,8 +31,8 @@ from admin.views.minor_trace_chemistry import MinorTraceChemistryAdmin from db.engine import session_ctx from db.location import Location, LocationThingAssociation -from db.thing import Thing from db.nma_legacy import NMA_MinorTraceChemistry, NMA_Chemistry_SampleInfo +from db.thing import Thing ADMIN_IDENTITY = MinorTraceChemistryAdmin.identity ADMIN_BASE_URL = f"/admin/{ADMIN_IDENTITY}" diff --git a/tests/test_minor_trace_chemistry_transfer.py b/tests/test_minor_trace_chemistry_transfer.py new file mode 100644 index 000000000..2d38e1a19 --- /dev/null +++ b/tests/test_minor_trace_chemistry_transfer.py @@ -0,0 +1,38 @@ +import uuid + +import pandas as pd + +from transfers.minor_trace_chemistry_transfer import MinorTraceChemistryTransferer + + +def test_row_to_dict_includes_wclab_id(): + # Bypass __init__ so we can stub the cache without hitting the DB. 
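+    # The stubbed attributes stand in for what __init__ would normally load:
+    # _sample_pt_ids is the set of known SamplePtIDs, and _sample_info_cache
+    # maps the legacy SamplePtID UUID to the new integer sample-info id.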
+ transfer = MinorTraceChemistryTransferer.__new__(MinorTraceChemistryTransferer) + sample_pt_id = uuid.uuid4() + transfer._sample_pt_ids = {sample_pt_id} + transfer._sample_info_cache = {sample_pt_id: 1} + transfer.flags = {} + transfer.errors = [] + + row = pd.Series( + { + "SamplePtID": str(sample_pt_id), + "GlobalID": str(uuid.uuid4()), + "SamplePointID": "POINT-1", + "Analyte": "Ca", + "SampleValue": 10.5, + "Units": "mg/L", + "Symbol": None, + "AnalysisMethod": "ICP", + "AnalysisDate": "2024-01-01 00:00:00.000", + "Notes": "note", + "AnalysesAgency": "Lab", + "Uncertainty": 0.1, + "Volume": "2", + "VolumeUnit": "L", + "WCLab_ID": "LAB-123", + } + ) + + row_dict = transfer._row_to_dict(row) + assert row_dict["nma_WCLab_ID"] == "LAB-123" diff --git a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py index 7328e4059..9b6a55dac 100644 --- a/tests/test_waterlevelscontinuous_pressure_daily_legacy.py +++ b/tests/test_waterlevelscontinuous_pressure_daily_legacy.py @@ -21,14 +21,17 @@ """ from datetime import datetime -from uuid import uuid4 +from uuid import UUID, uuid4 + +import pytest +from sqlalchemy.exc import IntegrityError, ProgrammingError from db.engine import session_ctx from db.nma_legacy import NMA_WaterLevelsContinuous_Pressure_Daily -def _next_global_id() -> str: - return str(uuid4()) +def _next_global_id() -> UUID: + return uuid4() def _next_object_id() -> int: @@ -37,15 +40,15 @@ def _next_object_id() -> int: # ===================== CREATE tests ========================== -def test_create_pressure_daily_all_fields(): +def test_create_pressure_daily_all_fields(water_well_thing): """Test creating a pressure daily record with required fields.""" with session_ctx() as session: now = datetime(2024, 1, 1, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), object_id=_next_object_id(), - well_id="WELL-1", - point_id="PD-1001", + well_id=uuid4(), + point_id=water_well_thing.name, date_measured=now, temperature_water=12.3, water_head=4.5, @@ -61,52 +64,55 @@ def test_create_pressure_daily_all_fields(): processed_by="AB", checked_by="CD", cond_dl_ms_cm=0.2, + thing_id=water_well_thing.id, ) session.add(record) session.commit() session.refresh(record) assert record.global_id is not None - assert record.point_id == "PD-1001" + assert record.point_id == water_well_thing.name assert record.date_measured == now session.delete(record) session.commit() -def test_create_pressure_daily_minimal(): +def test_create_pressure_daily_minimal(water_well_thing): """Test creating a pressure daily record with minimal fields.""" with session_ctx() as session: now = datetime(2024, 1, 2, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1002", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() session.refresh(record) assert record.global_id is not None - assert record.point_id == "PD-1002" + assert record.point_id == water_well_thing.name session.delete(record) session.commit() # ===================== READ tests ========================== -def test_read_pressure_daily_by_global_id(): +def test_read_pressure_daily_by_global_id(water_well_thing): """Test reading a pressure daily record by GlobalID.""" with session_ctx() as session: now = datetime(2024, 1, 3, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - 
point_id="PD-1003", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -116,23 +122,24 @@ def test_read_pressure_daily_by_global_id(): ) assert fetched is not None assert fetched.global_id == record.global_id - assert fetched.point_id == "PD-1003" + assert fetched.point_id == water_well_thing.name session.delete(record) session.commit() # ===================== UPDATE tests ========================== -def test_update_pressure_daily(): +def test_update_pressure_daily(water_well_thing): """Test updating a pressure daily record.""" with session_ctx() as session: now = datetime(2024, 1, 4, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1004", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -150,16 +157,17 @@ def test_update_pressure_daily(): # ===================== DELETE tests ========================== -def test_delete_pressure_daily(): +def test_delete_pressure_daily(water_well_thing): """Test deleting a pressure daily record.""" with session_ctx() as session: now = datetime(2024, 1, 5, 12, 0, 0) record = NMA_WaterLevelsContinuous_Pressure_Daily( global_id=_next_global_id(), - point_id="PD-1005", + point_id=water_well_thing.name, date_measured=now, created=now, updated=now, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -180,6 +188,7 @@ def test_pressure_daily_has_all_migrated_columns(): "global_id", "object_id", "well_id", + "thing_id", "point_id", "date_measured", "temperature_water", @@ -212,4 +221,52 @@ def test_pressure_daily_table_name(): ) +# ===================== Relational Integrity Tests ====================== + + +def test_pressure_daily_thing_id_required(): + """ + VERIFIES: 'thing_id IS NOT NULL' and Foreign Key presence. + Ensures the DB rejects records without a Thing linkage. + """ + with session_ctx() as session: + now = datetime(2024, 1, 6, 12, 0, 0) + record = NMA_WaterLevelsContinuous_Pressure_Daily( + global_id=_next_global_id(), + point_id="PD-1006", + date_measured=now, + created=now, + updated=now, + ) + session.add(record) + + with pytest.raises((IntegrityError, ProgrammingError)): + session.flush() + session.rollback() + + +def test_pressure_daily_invalid_thing_id_rejected(water_well_thing): + """ + VERIFIES: foreign key integrity on thing_id. + Ensures the DB rejects updates to a non-existent Thing. 
+ """ + with session_ctx() as session: + now = datetime(2024, 1, 7, 12, 0, 0) + record = NMA_WaterLevelsContinuous_Pressure_Daily( + global_id=_next_global_id(), + point_id=water_well_thing.name, + date_measured=now, + created=now, + updated=now, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + + with pytest.raises((IntegrityError, ProgrammingError)): + record.thing_id = 999999 + session.flush() + session.rollback() + + # ============= EOF ============================================= diff --git a/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py b/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py new file mode 100644 index 000000000..a5616f81b --- /dev/null +++ b/tests/transfers/test_waterlevelscontinuous_pressure_daily_transfer.py @@ -0,0 +1,47 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +import pandas as pd + +from transfers.waterlevelscontinuous_pressure_daily import ( + NMA_WaterLevelsContinuous_Pressure_DailyTransferer, +) + + +def test_pressure_daily_transfer_filters_orphans(water_well_thing): + transferer = NMA_WaterLevelsContinuous_Pressure_DailyTransferer(batch_size=1) + df = pd.DataFrame( + [ + {"PointID": water_well_thing.name, "GlobalID": "gid-1"}, + {"PointID": "MISSING-THING", "GlobalID": "gid-2"}, + ] + ) + + filtered = transferer._filter_to_valid_things(df) + + assert list(filtered["PointID"]) == [water_well_thing.name] + + +def test_pressure_daily_row_dict_sets_thing_id(water_well_thing): + transferer = NMA_WaterLevelsContinuous_Pressure_DailyTransferer(batch_size=1) + row = {"PointID": water_well_thing.name, "GlobalID": "gid-3"} + + mapped = transferer._row_dict(row) + + assert mapped["thing_id"] == water_well_thing.id + + +# ============= EOF ============================================= diff --git a/transfers/associated_data.py b/transfers/associated_data.py index ca9195b06..6c667acaf 100644 --- a/transfers/associated_data.py +++ b/transfers/associated_data.py @@ -48,14 +48,27 @@ class AssociatedDataTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size - self._thing_id_cache: dict[str, int] = {} + self._thing_id_by_point_id: dict[str, int] = {} + self._thing_id_by_location_id: dict[str, int] = {} self._build_thing_id_cache() def _build_thing_id_cache(self) -> None: with session_ctx() as session: - things = session.query(Thing.name, Thing.id).all() - self._thing_id_cache = {name: thing_id for name, thing_id in things} - logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + things = session.query(Thing.id, Thing.name, Thing.nma_pk_location).all() + for thing_id, name, nma_pk_location in things: + if name: + point_key = self._normalize_point_id(name) + if point_key: + 
self._thing_id_by_point_id[point_key] = thing_id + if nma_pk_location: + key = self._normalize_location_id(nma_pk_location) + if key: + self._thing_id_by_location_id[key] = thing_id + logger.info( + "Built Thing caches with %s point ids and %s location ids", + len(self._thing_id_by_point_id), + len(self._thing_id_by_location_id), + ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -63,13 +76,27 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return df, cleaned_df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows: list[dict[str, Any]] = [] + skipped_missing_thing = 0 + for raw in self.cleaned_df.to_dict("records"): + record = self._row_dict(raw) + if record is None: + skipped_missing_thing += 1 + continue + rows.append(record) + rows = self._dedupe_rows(rows, key="nma_AssocID") if not rows: logger.info("No AssociatedData rows to transfer") return + if skipped_missing_thing: + logger.warning( + "Skipped %s AssociatedData rows without matching Thing", + skipped_missing_thing, + ) + insert_stmt = insert(NMA_AssociatedData) excluded = insert_stmt.excluded @@ -96,22 +123,52 @@ def _transfer_hook(self, session: Session) -> None: session.execute(stmt) session.commit() - def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: point_id = row.get("PointID") + location_id = self._uuid_val(row.get("LocationId")) + thing_id = self._resolve_thing_id(point_id, location_id) + if thing_id is None: + logger.warning( + "Skipping AssociatedData PointID=%s LocationId=%s - Thing not found", + point_id, + location_id, + ) + return None + return { # Legacy UUID PK -> nma_assoc_id (unique audit column) "nma_AssocID": self._uuid_val(row.get("AssocID")), # Legacy ID columns (renamed with nma_ prefix) - "nma_LocationId": self._uuid_val(row.get("LocationId")), + "nma_LocationId": location_id, "nma_PointID": point_id, "nma_OBJECTID": row.get("OBJECTID"), # Data columns "Notes": row.get("Notes"), "Formation": row.get("Formation"), # FK to Thing - "thing_id": self._thing_id_cache.get(point_id), + "thing_id": thing_id, } + def _resolve_thing_id( + self, point_id: Optional[str], location_id: Optional[UUID] + ) -> Optional[int]: + if location_id is not None: + key = self._normalize_location_id(str(location_id)) + thing_id = self._thing_id_by_location_id.get(key) + if thing_id is not None: + return thing_id + if point_id: + return self._thing_id_by_point_id.get(self._normalize_point_id(point_id)) + return None + + @staticmethod + def _normalize_point_id(value: str) -> str: + return value.strip().upper() + + @staticmethod + def _normalize_location_id(value: str) -> str: + return value.strip().lower() + def _dedupe_rows( self, rows: list[dict[str, Any]], key: str ) -> list[dict[str, Any]]: diff --git a/transfers/logger.py b/transfers/logger.py index a5fd62414..decf34d0c 100644 --- a/transfers/logger.py +++ b/transfers/logger.py @@ -21,18 +21,6 @@ from services.gcs_helper import get_storage_bucket -# class StreamToLogger: -# def __init__(self, logger_, level): -# self.logger = logger_ -# self.level = level -# self.linebuf = "" -# -# def write(self, buf): -# for line in buf.rstrip().splitlines(): -# self.logger.log(self.level, line.rstrip()) -# -# def flush(self): -# pass root = Path("logs") if not os.getcwd().endswith("transfers"): root = Path("transfers") / root @@ -40,7 +28,8 @@ if not 
os.path.exists(root): os.mkdir(root) -log_filename = root / f"transfer_{datetime.now():%Y-%m-%dT%H_%M_%S}.log" +log_filename = f"transfer_{datetime.now():%Y-%m-%dT%H_%M_%S}.log" +log_path = root / log_filename logging.basicConfig( @@ -48,7 +37,7 @@ format="%(asctime)s [%(levelname)-8s] %(message)s", handlers=[ logging.StreamHandler(sys.stdout), - logging.FileHandler(log_filename, mode="w", encoding="utf-8"), + logging.FileHandler(log_path, mode="w", encoding="utf-8"), ], force=True, ) @@ -61,14 +50,11 @@ # workaround to not redirect httpx logging logging.getLogger("httpx").setLevel(logging.WARNING) -# redirect stderr to the logger -# sys.stderr = StreamToLogger(logger, logging.ERROR) - def save_log_to_bucket(): bucket = get_storage_bucket() blob = bucket.blob(f"transfer_logs/{log_filename}") - blob.upload_from_filename(log_filename) + blob.upload_from_filename(log_path) logger.info(f"Uploaded log to gs://{bucket.name}/transfer_logs/{log_filename}") diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index daeef7923..5f84bfda6 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -157,6 +157,7 @@ def _transfer_hook(self, session: Session) -> None: "uncertainty": excluded.uncertainty, "volume": excluded.volume, "volume_unit": excluded.volume_unit, + "nma_WCLab_ID": excluded.nma_WCLab_ID, }, ) session.execute(stmt) @@ -194,7 +195,8 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: ) return None - return { + wclab_id = self._safe_str(row, "WCLab_ID") + row_dict = { # Legacy UUID PK -> nma_global_id (unique audit column) "nma_GlobalID": nma_global_id, # New Integer FK to ChemistrySampleInfo @@ -213,7 +215,9 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: "uncertainty": self._safe_float(row, "Uncertainty"), "volume": self._safe_int(row, "Volume"), "volume_unit": self._safe_str(row, "VolumeUnit"), + "nma_WCLab_ID": wclab_id, } + return row_dict def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: """Dedupe rows by unique key to avoid ON CONFLICT loops. 
Later rows win.""" diff --git a/transfers/soil_rock_results.py b/transfers/soil_rock_results.py index 1aae4e3ad..fd3894e52 100644 --- a/transfers/soil_rock_results.py +++ b/transfers/soil_rock_results.py @@ -42,14 +42,27 @@ class SoilRockResultsTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size - self._thing_id_cache: dict[str, int] = {} + self._thing_id_by_point_id: dict[str, int] = {} + self._thing_id_by_location_id: dict[str, int] = {} self._build_thing_id_cache() def _build_thing_id_cache(self) -> None: with session_ctx() as session: - things = session.query(Thing.name, Thing.id).all() - self._thing_id_cache = {name: thing_id for name, thing_id in things} - logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + things = session.query(Thing.id, Thing.name, Thing.nma_pk_location).all() + for thing_id, name, nma_pk_location in things: + if name: + point_key = self._normalize_point_id(name) + if point_key: + self._thing_id_by_point_id[point_key] = thing_id + if nma_pk_location: + loc_key = self._normalize_location_id(nma_pk_location) + if loc_key: + self._thing_id_by_location_id[loc_key] = thing_id + logger.info( + "Built Thing caches with %s point ids and %s location ids", + len(self._thing_id_by_point_id), + len(self._thing_id_by_location_id), + ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -57,12 +70,25 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return df, cleaned_df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows: list[dict[str, Any]] = [] + skipped_missing_thing = 0 + for raw in self.cleaned_df.to_dict("records"): + record = self._row_dict(raw) + if record is None: + skipped_missing_thing += 1 + continue + rows.append(record) if not rows: logger.info("No Soil_Rock_Results rows to transfer") return + if skipped_missing_thing: + logger.warning( + "Skipped %s Soil_Rock_Results rows without matching Thing", + skipped_missing_thing, + ) + for i in range(0, len(rows), self.batch_size): chunk = rows[i : i + self.batch_size] logger.info( @@ -74,8 +100,16 @@ def _transfer_hook(self, session: Session) -> None: session.bulk_insert_mappings(NMA_Soil_Rock_Results, chunk) session.commit() - def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: point_id = row.get("Point_ID") + thing_id = self._resolve_thing_id(point_id) + if thing_id is None: + logger.warning( + "Skipping Soil_Rock_Results Point_ID=%s - Thing not found", + point_id, + ) + return None + return { # Legacy ID column (use Python attribute name for bulk_insert_mappings) "nma_point_id": point_id, @@ -86,9 +120,28 @@ def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: "d18o": self._float_val(row.get("d18O")), "sampled_by": row.get("Sampled by"), # FK to Thing - "thing_id": self._thing_id_cache.get(point_id), + "thing_id": thing_id, } + def _resolve_thing_id(self, point_id: Optional[str]) -> Optional[int]: + if point_id is None: + return None + + key = self._normalize_location_id(point_id) + thing_id = self._thing_id_by_location_id.get(key) + if thing_id is not None: + return thing_id + + return self._thing_id_by_point_id.get(self._normalize_point_id(point_id)) + + @staticmethod + def _normalize_point_id(value: str) -> str: + return str(value).strip().upper() + + @staticmethod + 
def _normalize_location_id(value: str) -> str: + return str(value).strip().lower() + def _float_val(self, value: Any) -> Optional[float]: if value is None or pd.isna(value): return None diff --git a/transfers/stratigraphy_legacy.py b/transfers/stratigraphy_legacy.py index 82bf8a3a5..79803d7a6 100644 --- a/transfers/stratigraphy_legacy.py +++ b/transfers/stratigraphy_legacy.py @@ -128,8 +128,8 @@ def _row_dict(self, row: pd.Series) -> Dict[str, Any] | None: # FK to Thing "thing_id": thing_id, # Data columns - "StratTop": self._float_value(getattr(row, "StratTop", None)), - "StratBottom": self._float_value(getattr(row, "StratBottom", None)), + "StratTop": self._int_value(getattr(row, "StratTop", None)), + "StratBottom": self._int_value(getattr(row, "StratBottom", None)), "UnitIdentifier": self._string_value(getattr(row, "UnitIdentifier", None)), "Lithology": self._string_value(getattr(row, "Lithology", None)), "LithologicModifier": self._string_value( @@ -164,7 +164,7 @@ def _int_value(self, value: Any) -> int | None: if value in (None, ""): return None try: - return int(value) + return int(float(value)) except (TypeError, ValueError): return None diff --git a/transfers/transfer.py b/transfers/transfer.py index caaa97945..13c0a1673 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -43,7 +43,6 @@ from transfers.metrics import Metrics from transfers.profiling import ( - TransferProfiler, ProfileArtifact, upload_profile_artifacts, ) @@ -205,7 +204,11 @@ def _execute_transfer_with_timing(name: str, klass, flags: dict = None): """Execute transfer and return timing info.""" start = time.time() logger.info(f"Starting parallel transfer: {name}") - result = _execute_transfer(klass, flags) + effective_flags = dict(flags or {}) + yield_transfer_limit = effective_flags.get("LIMIT", 0) + if yield_transfer_limit: + effective_flags["LIMIT"] = max(1, yield_transfer_limit // 10) + result = _execute_transfer(klass, effective_flags) elapsed = time.time() - start logger.info(f"Completed parallel transfer: {name} in {elapsed:.2f}s") return name, result, elapsed @@ -216,7 +219,8 @@ def _execute_session_transfer_with_timing(name: str, transfer_func, limit: int): start = time.time() logger.info(f"Starting parallel transfer: {name}") with session_ctx() as session: - result = transfer_func(session, limit=limit) + effective_limit = max(1, limit // 10) if limit else 0 + result = transfer_func(session, limit=effective_limit) elapsed = time.time() - start logger.info(f"Completed parallel transfer: {name} in {elapsed:.2f}s") return name, result, elapsed @@ -256,6 +260,7 @@ def _drop_and_rebuild_db() -> None: with session_ctx() as session: recreate_public_schema(session) logger.info("Running Alembic migrations") + try: command.upgrade(_alembic_config(), "head") except SystemExit as exc: @@ -285,15 +290,28 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): logger.info("Erase and rebuilding database") erase_and_rebuild_db() + # Get transfer flags + message("TRANSFER OPTIONS") + transfer_options = load_transfer_options() + logger.info( + "Transfer options: %s", + { + field: getattr(transfer_options, field) + for field in transfer_options.__dataclass_fields__ + }, + ) + flags = {"TRANSFER_ALL_WELLS": True, "LIMIT": limit} + message("TRANSFER_FLAGS") + logger.info(flags) profile_artifacts: list[ProfileArtifact] = [] - water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) + continuous_water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) # 
=========================================================================
     # PHASE 1: Foundation (Parallel - these are independent of each other)
     # =========================================================================
-    if water_levels_only:
+    if continuous_water_levels_only:
         logger.info("CONTINUOUS_WATER_LEVELS set; running only continuous transfers")
-        _run_continuous_water_level_transfers(
-            metrics, flags, profile_waterlevels, profile_artifacts
-        )
+        _run_continuous_water_level_transfers(metrics, flags)
@@ -372,71 +390,49 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True):
             )
         except Exception as e:
             logger.critical(f"Non-well transfer {name} failed: {e}")
-    use_parallel = get_bool_env("TRANSFER_PARALLEL", True)
-    if use_parallel:
-        _transfer_parallel(
-            metrics,
-            flags,
-            limit,
-            transfer_options,
-            profile_waterlevels,
-            profile_artifacts,
-        )
-    else:
-        _transfer_sequential(
-            metrics,
-            flags,
-            limit,
-            transfer_options,
-            profile_waterlevels,
-            profile_artifacts,
-        )
+    _transfer_parallel(
+        metrics,
+        flags,
+        limit,
+        transfer_options,
+    )
 
     return profile_artifacts
 
 
-def _run_water_level_transfers(
-    metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact]
-):
-    message("WATER LEVEL TRANSFERS ONLY")
-
-    results = _execute_transfer(WaterLevelTransferer, flags=flags)
-    metrics.water_level_metrics(*results)
-
-    _run_continuous_water_level_transfers(
-        metrics, flags, profile_waterlevels, profile_artifacts
-    )
+def _run_continuous_water_level_transfers(metrics, flags):
+    message("CONTINUOUS WATER LEVEL TRANSFERS")
 
+    # =========================================================================
+    # PHASE 4: Parallel Group 2 (Continuous water levels - after sensors)
+    # =========================================================================
+    message("PARALLEL TRANSFER GROUP 2 (Continuous Water Levels)")
 
-def _run_continuous_water_level_transfers(
-    metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact]
-):
-    message("CONTINUOUS WATER LEVEL TRANSFERS")
+    parallel_tasks = [
+        ("Pressure", WaterLevelsContinuousPressureTransferer),
+        ("Acoustic", WaterLevelsContinuousAcousticTransferer),
+    ]
 
+    results_map = {}
+    with ThreadPoolExecutor(max_workers=2) as executor:
+        futures = {}
+        for name, klass in parallel_tasks:
+            future = executor.submit(_execute_transfer_with_timing, name, klass, flags)
+            futures[future] = name
 
-    if profile_waterlevels:
-        profiler = TransferProfiler("waterlevels_continuous_pressure")
-        results, artifact = profiler.run(
-            _execute_transfer, WaterLevelsContinuousPressureTransferer, flags
-        )
-        profile_artifacts.append(artifact)
-    else:
-        results = _execute_transfer(
-            WaterLevelsContinuousPressureTransferer, flags=flags
-        )
-    metrics.pressure_metrics(*results)
+        for future in as_completed(futures):
+            name = futures[future]
+            try:
+                result_name, result, elapsed = future.result()
+                results_map[result_name] = result
+                logger.info(f"Parallel task {result_name} completed in {elapsed:.2f}s")
+            except Exception as e:
+                logger.critical(f"Parallel task {name} failed: {e}")
 
-    if profile_waterlevels:
-        profiler = TransferProfiler("waterlevels_continuous_acoustic")
-        results, artifact = profiler.run(
-            _execute_transfer, WaterLevelsContinuousAcousticTransferer, flags
-        )
-        profile_artifacts.append(artifact)
-    else:
-        results = _execute_transfer(
-            WaterLevelsContinuousAcousticTransferer, flags=flags
-        )
-    metrics.acoustic_metrics(*results)
+    if "Pressure" in results_map and results_map["Pressure"]:
+        metrics.pressure_metrics(*results_map["Pressure"])
+    if "Acoustic" 
in results_map and results_map["Acoustic"]: + metrics.acoustic_metrics(*results_map["Acoustic"]) def _transfer_parallel( @@ -444,8 +440,6 @@ def _transfer_parallel( flags, limit, transfer_options: TransferOptions, - profile_waterlevels: bool, - profile_artifacts, ): """Execute transfers in parallel where possible.""" message("PARALLEL TRANSFER GROUP 1") @@ -457,54 +451,49 @@ def _transfer_parallel( parallel_tasks_1 = [] if opts.transfer_screens: - parallel_tasks_1.append(("WellScreens", WellScreenTransferer, flags)) + parallel_tasks_1.append(("WellScreens", WellScreenTransferer)) if opts.transfer_contacts: - parallel_tasks_1.append(("Contacts", ContactTransfer, flags)) + parallel_tasks_1.append(("Contacts", ContactTransfer)) if opts.transfer_waterlevels: - parallel_tasks_1.append(("WaterLevels", WaterLevelTransferer, flags)) + parallel_tasks_1.append(("WaterLevels", WaterLevelTransferer)) if opts.transfer_link_ids: - parallel_tasks_1.append(("LinkIdsWellData", LinkIdsWellDataTransferer, flags)) - parallel_tasks_1.append( - ("LinkIdsLocation", LinkIdsLocationDataTransferer, flags) - ) + parallel_tasks_1.append(("LinkIdsWellData", LinkIdsWellDataTransferer)) + parallel_tasks_1.append(("LinkIdsLocation", LinkIdsLocationDataTransferer)) if opts.transfer_groups: - parallel_tasks_1.append(("Groups", ProjectGroupTransferer, flags)) + parallel_tasks_1.append(("Groups", ProjectGroupTransferer)) if opts.transfer_surface_water_photos: - parallel_tasks_1.append( - ("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer, flags) - ) + parallel_tasks_1.append(("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer)) if opts.transfer_soil_rock_results: - parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer, flags)) + parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer)) if opts.transfer_weather_photos: - parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer, flags)) + parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer)) if opts.transfer_assets: - parallel_tasks_1.append(("Assets", AssetTransferer, flags)) + parallel_tasks_1.append(("Assets", AssetTransferer)) if opts.transfer_associated_data: - parallel_tasks_1.append(("AssociatedData", AssociatedDataTransferer, flags)) + parallel_tasks_1.append(("AssociatedData", AssociatedDataTransferer)) if opts.transfer_surface_water_data: - parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer, flags)) + parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer)) if opts.transfer_hydraulics_data: - parallel_tasks_1.append(("HydraulicsData", HydraulicsDataTransferer, flags)) + parallel_tasks_1.append(("HydraulicsData", HydraulicsDataTransferer)) if opts.transfer_chemistry_sampleinfo: - parallel_tasks_1.append( - ("ChemistrySampleInfo", ChemistrySampleInfoTransferer, flags) - ) + parallel_tasks_1.append(("ChemistrySampleInfo", ChemistrySampleInfoTransferer)) if opts.transfer_ngwmn_views: parallel_tasks_1.append( - ("NGWMNWellConstruction", NGWMNWellConstructionTransferer, flags) + ("NGWMNWellConstruction", NGWMNWellConstructionTransferer) ) - parallel_tasks_1.append(("NGWMNWaterLevels", NGWMNWaterLevelsTransferer, flags)) - parallel_tasks_1.append(("NGWMNLithology", NGWMNLithologyTransferer, flags)) + parallel_tasks_1.append(("NGWMNWaterLevels", NGWMNWaterLevelsTransferer)) + parallel_tasks_1.append(("NGWMNLithology", NGWMNLithologyTransferer)) if opts.transfer_pressure_daily: parallel_tasks_1.append( ( "WaterLevelsPressureDaily", NMA_WaterLevelsContinuous_Pressure_DailyTransferer, - 
flags, ) ) if opts.transfer_weather_data: - parallel_tasks_1.append(("WeatherData", WeatherDataTransferer, flags)) + parallel_tasks_1.append(("WeatherData", WeatherDataTransferer)) + if opts.transfer_nma_stratigraphy: + parallel_tasks_1.append(("StratigraphyLegacy", StratigraphyLegacyTransferer)) # Track results for metrics results_map = {} @@ -514,29 +503,17 @@ def _transfer_parallel( futures = {} # Submit class-based transfers - for name, klass, task_flags in parallel_tasks_1: - future = executor.submit( - _execute_transfer_with_timing, name, klass, task_flags - ) + for name, klass in parallel_tasks_1: + future = executor.submit(_execute_transfer_with_timing, name, klass, flags) futures[future] = name - # Submit session-based transfers - if opts.transfer_nma_stratigraphy: - future = executor.submit( - _execute_transfer_with_timing, - "StratigraphyLegacy", - StratigraphyLegacyTransferer, - flags, - ) - futures[future] = "StratigraphyLegacy" - future = executor.submit( _execute_session_transfer_with_timing, - "Stratigraphy", + "StratigraphyNew", transfer_stratigraphy, limit, ) - futures[future] = "Stratigraphy" + futures[future] = "StratigraphyNew" future = executor.submit(_execute_permissions_with_timing, "Permissions") futures[future] = "Permissions" @@ -556,8 +533,8 @@ def _transfer_parallel( metrics.well_screen_metrics(*results_map["WellScreens"]) if "Contacts" in results_map and results_map["Contacts"]: metrics.contact_metrics(*results_map["Contacts"]) - if "Stratigraphy" in results_map and results_map["Stratigraphy"]: - metrics.stratigraphy_metrics(*results_map["Stratigraphy"]) + if "StratigraphyNew" in results_map and results_map["StratigraphyNew"]: + metrics.stratigraphy_metrics(*results_map["StratigraphyNew"]) if "StratigraphyLegacy" in results_map and results_map["StratigraphyLegacy"]: metrics.nma_stratigraphy_metrics(*results_map["StratigraphyLegacy"]) if "AssociatedData" in results_map and results_map["AssociatedData"]: @@ -599,6 +576,7 @@ def _transfer_parallel( metrics.weather_data_metrics(*results_map["WeatherData"]) if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) + if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) @@ -627,222 +605,12 @@ def _transfer_parallel( results = _execute_transfer(SensorTransferer, flags=flags) metrics.sensor_metrics(*results) - # ========================================================================= - # PHASE 4: Parallel Group 2 (Continuous water levels - after sensors) - # ========================================================================= - if opts.transfer_pressure or opts.transfer_acoustic: - message("PARALLEL TRANSFER GROUP 2 (Continuous Water Levels)") - - parallel_tasks_2 = [] - if opts.transfer_pressure: - parallel_tasks_2.append( - ("Pressure", WaterLevelsContinuousPressureTransferer, flags) - ) - if opts.transfer_acoustic: - parallel_tasks_2.append( - ("Acoustic", WaterLevelsContinuousAcousticTransferer, flags) - ) - - if profile_waterlevels: - for name, klass, task_flags in parallel_tasks_2: - profiler = TransferProfiler(f"waterlevels_continuous_{name.lower()}") - results, artifact = profiler.run(_execute_transfer, klass, task_flags) - profile_artifacts.append(artifact) - results_map[name] = results - else: - with ThreadPoolExecutor(max_workers=2) as executor: - futures = {} - for name, klass, task_flags in parallel_tasks_2: - future = executor.submit( - 
_execute_transfer_with_timing, name, klass, task_flags - ) - futures[future] = name - - for future in as_completed(futures): - name = futures[future] - try: - result_name, result, elapsed = future.result() - results_map[result_name] = result - logger.info( - f"Parallel task {result_name} completed in {elapsed:.2f}s" - ) - except Exception as e: - logger.critical(f"Parallel task {name} failed: {e}") - - if "Pressure" in results_map and results_map["Pressure"]: - metrics.pressure_metrics(*results_map["Pressure"]) - if "Acoustic" in results_map and results_map["Acoustic"]: - metrics.acoustic_metrics(*results_map["Acoustic"]) - - -def _transfer_sequential( - metrics, - flags, - limit, - transfer_options: TransferOptions, - profile_waterlevels: bool, - profile_artifacts, -): - """Original sequential transfer logic.""" - opts = transfer_options - if opts.transfer_screens: - with transfer_context("WELL SCREENS"): - results = _execute_transfer(WellScreenTransferer, flags=flags) - metrics.well_screen_metrics(*results) - - if opts.transfer_sensors: - with transfer_context("SENSORS"): - results = _execute_transfer(SensorTransferer, flags=flags) - metrics.sensor_metrics(*results) - - if opts.transfer_contacts: - with transfer_context("CONTACTS"): - results = _execute_transfer(ContactTransfer, flags=flags) - metrics.contact_metrics(*results) - - with transfer_context("PERMISSIONS"): - with session_ctx() as session: - transfer_permissions(session) - - if opts.transfer_nma_stratigraphy: - with transfer_context("NMA STRATIGRAPHY"): - results = _execute_transfer(StratigraphyLegacyTransferer, flags=flags) - metrics.nma_stratigraphy_metrics(*results) - - with transfer_context("STRATIGRAPHY"): - with session_ctx() as session: - results = transfer_stratigraphy(session, limit=limit) - metrics.stratigraphy_metrics(*results) - - if opts.transfer_waterlevels: - with transfer_context("WATER LEVELS"): - results = _execute_transfer(WaterLevelTransferer, flags=flags) - metrics.water_level_metrics(*results) - - if opts.transfer_link_ids: - message("TRANSFERRING LINK IDS") - results = _execute_transfer(LinkIdsWellDataTransferer, flags=flags) - metrics.welldata_link_ids_metrics(*results) - results = _execute_transfer(LinkIdsLocationDataTransferer, flags=flags) - metrics.location_link_ids_metrics(*results) - - if opts.transfer_groups: - message("TRANSFERRING GROUPS") - results = _execute_transfer(ProjectGroupTransferer, flags=flags) - metrics.group_metrics(*results) - - if opts.transfer_surface_water_photos: - message("TRANSFERRING SURFACE WATER PHOTOS") - results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) - metrics.surface_water_photos_metrics(*results) - - if opts.transfer_soil_rock_results: - message("TRANSFERRING SOIL ROCK RESULTS") - results = _execute_transfer(SoilRockResultsTransferer, flags=flags) - metrics.soil_rock_results_metrics(*results) - - if opts.transfer_weather_photos: - message("TRANSFERRING WEATHER PHOTOS") - results = _execute_transfer(WeatherPhotosTransferer, flags=flags) - metrics.weather_photos_metrics(*results) - - if opts.transfer_assets: - message("TRANSFERRING ASSETS") - results = _execute_transfer(AssetTransferer, flags=flags) - metrics.asset_metrics(*results) - - if opts.transfer_associated_data: - message("TRANSFERRING ASSOCIATED DATA") - results = _execute_transfer(AssociatedDataTransferer, flags=flags) - metrics.associated_data_metrics(*results) - - if opts.transfer_surface_water_data: - message("TRANSFERRING SURFACE WATER DATA") - results = 
-        metrics.surface_water_data_metrics(*results)
-
-    if opts.transfer_hydraulics_data:
-        message("TRANSFERRING HYDRAULICS DATA")
-        results = _execute_transfer(HydraulicsDataTransferer, flags=flags)
-        metrics.hydraulics_data_metrics(*results)
-
-    if opts.transfer_chemistry_sampleinfo:
-        message("TRANSFERRING CHEMISTRY SAMPLEINFO")
-        results = _execute_transfer(ChemistrySampleInfoTransferer, flags=flags)
-        metrics.chemistry_sampleinfo_metrics(*results)
-
-    if opts.transfer_field_parameters:
-        message("TRANSFERRING FIELD PARAMETERS")
-        results = _execute_transfer(FieldParametersTransferer, flags=flags)
-        metrics.field_parameters_metrics(*results)
-
-    if opts.transfer_major_chemistry:
-        message("TRANSFERRING MAJOR CHEMISTRY")
-        results = _execute_transfer(MajorChemistryTransferer, flags=flags)
-        metrics.major_chemistry_metrics(*results)
-
-    if opts.transfer_radionuclides:
-        message("TRANSFERRING RADIONUCLIDES")
-        results = _execute_transfer(RadionuclidesTransferer, flags=flags)
-        metrics.radionuclides_metrics(*results)
-
-    if opts.transfer_ngwmn_views:
-        message("TRANSFERRING NGWMN WELL CONSTRUCTION")
-        results = _execute_transfer(NGWMNWellConstructionTransferer, flags=flags)
-        metrics.ngwmn_well_construction_metrics(*results)
-        message("TRANSFERRING NGWMN WATER LEVELS")
-        results = _execute_transfer(NGWMNWaterLevelsTransferer, flags=flags)
-        metrics.ngwmn_water_levels_metrics(*results)
-        message("TRANSFERRING NGWMN LITHOLOGY")
-        results = _execute_transfer(NGWMNLithologyTransferer, flags=flags)
-        metrics.ngwmn_lithology_metrics(*results)
-
-    if opts.transfer_pressure_daily:
-        message("TRANSFERRING WATER LEVELS PRESSURE DAILY")
-        results = _execute_transfer(
-            NMA_WaterLevelsContinuous_Pressure_DailyTransferer, flags=flags
-        )
-        metrics.waterlevels_pressure_daily_metrics(*results)
-
-    if opts.transfer_weather_data:
-        message("TRANSFERRING WEATHER DATA")
-        results = _execute_transfer(WeatherDataTransferer, flags=flags)
-        metrics.weather_data_metrics(*results)
-
-    if opts.transfer_minor_trace_chemistry:
-        message("TRANSFERRING MINOR TRACE CHEMISTRY")
-        results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags)
-        metrics.minor_trace_chemistry_metrics(*results)
-
-    if opts.transfer_pressure:
-        message("TRANSFERRING WATER LEVELS PRESSURE")
-        if profile_waterlevels:
-            profiler = TransferProfiler("waterlevels_continuous_pressure")
-            results, artifact = profiler.run(
-                _execute_transfer, WaterLevelsContinuousPressureTransferer, flags
-            )
-            profile_artifacts.append(artifact)
-        else:
-            results = _execute_transfer(
-                WaterLevelsContinuousPressureTransferer, flags=flags
-            )
-        metrics.pressure_metrics(*results)
-
-    if opts.transfer_acoustic:
-        message("TRANSFERRING WATER LEVELS ACOUSTIC")
-        if profile_waterlevels:
-            profiler = TransferProfiler("waterlevels_continuous_acoustic")
-            results, artifact = profiler.run(
-                _execute_transfer, WaterLevelsContinuousAcousticTransferer, flags
-            )
-            profile_artifacts.append(artifact)
-        else:
-            results = _execute_transfer(
-                WaterLevelsContinuousAcousticTransferer, flags=flags
-            )
-        metrics.acoustic_metrics(*results)
-
-    return profile_artifacts
+# =========================================================================
+# PHASE 4: Parallel Group 2 (Continuous water levels - after sensors)
+# =========================================================================
+# Continuous water levels are now handled separately in
+# _run_continuous_water_level_transfers(). The transfer process is split in
+# two because the continuous water level transfers are very time consuming
+# and we want to run them alone in their own phase.


 def main():
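`_run_continuous_water_level_transfers()` is referenced by the comment but not included in this diff. A rough outline of what that dedicated phase could look like, reusing the transferer and metrics names that appear elsewhere in this file (the function signature and structure are assumptions, not the actual implementation):

```python
def _run_continuous_water_level_transfers(metrics, flags, transfer_options):
    # Hypothetical outline: the continuous water level transfers get their
    # own phase so the rest of the pipeline never waits on them.
    opts = transfer_options
    if opts.transfer_pressure:
        message("TRANSFERRING WATER LEVELS PRESSURE")
        results = _execute_transfer(WaterLevelsContinuousPressureTransferer, flags=flags)
        metrics.pressure_metrics(*results)
    if opts.transfer_acoustic:
        message("TRANSFERRING WATER LEVELS ACOUSTIC")
        results = _execute_transfer(WaterLevelsContinuousAcousticTransferer, flags=flags)
        metrics.acoustic_metrics(*results)
```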
@@ -871,9 +639,10 @@ def main():
        metrics, limit=limit, profile_waterlevels=profile_waterlevels
    )

-    message("CLEANING UP LOCATIONS")
-    with session_ctx() as session:
-        cleanup_locations(session)
+    if get_bool_env("CLEANUP_LOCATIONS", True):
+        message("CLEANING UP LOCATIONS")
+        with session_ctx() as session:
+            cleanup_locations(session)

    metrics.close()
    metrics.save_to_storage_bucket()
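The new gate depends on `get_bool_env` from `services.util`, whose implementation is not shown in this diff. A minimal sketch of the behavior the call site assumes (the exact truthy spellings are a guess):

```python
import os


def get_bool_env(name: str, default: bool = False) -> bool:
    # Hypothetical sketch: read an environment variable and interpret common
    # truthy spellings; fall back to the default when the variable is unset.
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")
```

Under these assumptions, setting `CLEANUP_LOCATIONS=false` skips the cleanup pass, while leaving it unset preserves the previous always-run behavior.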
diff --git a/transfers/waterlevelscontinuous_pressure_daily.py b/transfers/waterlevelscontinuous_pressure_daily.py
index c41423f78..6caa348c3 100644
--- a/transfers/waterlevelscontinuous_pressure_daily.py
+++ b/transfers/waterlevelscontinuous_pressure_daily.py
@@ -22,7 +22,8 @@
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.orm import Session

-from db import NMA_WaterLevelsContinuous_Pressure_Daily
+from db import NMA_WaterLevelsContinuous_Pressure_Daily, Thing
+from db.engine import session_ctx
 from transfers.logger import logger
 from transfers.transferer import Transferer
 from transfers.util import read_csv
@@ -41,6 +42,30 @@ class NMA_WaterLevelsContinuous_Pressure_DailyTransferer(Transferer):
     def __init__(self, *args, batch_size: int = 1000, **kwargs):
         super().__init__(*args, **kwargs)
         self.batch_size = batch_size
+        self._thing_id_cache: dict[str, int] = {}
+        self._build_thing_id_cache()
+
+    def _build_thing_id_cache(self) -> None:
+        with session_ctx() as session:
+            things = session.query(Thing.name, Thing.id).all()
+            self._thing_id_cache = {name: thing_id for name, thing_id in things}
+        logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries")
+
+    def _filter_to_valid_things(self, df: pd.DataFrame) -> pd.DataFrame:
+        valid_point_ids = set(self._thing_id_cache.keys())
+        before_count = len(df)
+        filtered_df = df[df["PointID"].isin(valid_point_ids)].copy()
+        after_count = len(filtered_df)
+        if before_count > after_count:
+            skipped = before_count - after_count
+            logger.warning(
+                "Filtered out %s WaterLevelsContinuous_Pressure_Daily records without matching Things "
+                "(%s valid, %s orphan records prevented)",
+                skipped,
+                after_count,
+                skipped,
+            )
+        return filtered_df

     def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
         # Parse key datetime columns eagerly to avoid per-row parsing later.
@@ -48,8 +73,8 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
         input_df = read_csv(
             self.source_table,
             parse_dates=["DateMeasured", "Created", "Updated"],
         )
-        # No special cleaning/validation beyond raw import; keep identical copy.
-        return input_df, input_df
+        cleaned_df = self._filter_to_valid_things(input_df)
+        return input_df, cleaned_df

     def _transfer_hook(self, session: Session) -> None:
         rows = self._dedupe_rows(
@@ -71,6 +96,7 @@ def _transfer_hook(self, session: Session) -> None:
                 "OBJECTID": excluded.OBJECTID,
                 "WellID": excluded.WellID,
                 "PointID": excluded.PointID,
+                "thing_id": excluded.thing_id,
                 "DateMeasured": excluded.DateMeasured,
                 "TemperatureWater": excluded.TemperatureWater,
                 "WaterHead": excluded.WaterHead,
@@ -104,6 +130,7 @@ def val(key: str) -> Optional[Any]:
             "OBJECTID": val("OBJECTID"),
             "WellID": val("WellID"),
             "PointID": val("PointID"),
+            "thing_id": self._thing_id_cache.get(val("PointID")),
             "DateMeasured": val("DateMeasured"),
             "TemperatureWater": val("TemperatureWater"),
             "WaterHead": val("WaterHead"),
diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py
index 154be399b..680615cb7 100644
--- a/transfers/well_transfer.py
+++ b/transfers/well_transfer.py
@@ -862,6 +862,10 @@ def transfer_parallel(self, num_workers: int = None) -> None:
         # Load dataframes
         self.input_df, self.cleaned_df = self._get_dfs()
         df = self.cleaned_df
+        limit = self.flags.get("LIMIT", 0)
+        if limit > 0:
+            df = df.head(limit)
+            self.cleaned_df = df

         n = len(df)
         if n == 0:
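The `LIMIT` flag gives `transfer_parallel` a cheap smoke-test mode: only the first N cleaned rows are transferred. Usage could look roughly like this (the class name and constructor are hypothetical; only the flag handling comes from the diff above):

```python
# Hypothetical smoke test: cap the cleaned dataframe at 100 rows before the
# parallel transfer fans out. A LIMIT of 0, or an absent key, disables the
# cap, since only limit > 0 triggers df.head(limit).
transferer = WellTransferer(flags={"LIMIT": 100})  # class name assumed
transferer.transfer_parallel(num_workers=4)
```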