6 changes: 5 additions & 1 deletion AGENTS.MD
@@ -21,7 +21,11 @@ these transfers, keep the following rules in mind to avoid hour-long runs:
right instance before running destructive suites.
- When done, `deactivate` to exit the venv and avoid polluting other shells.

## 3. Data migrations must be idempotent
- Data migrations should be safe to re-run without creating duplicate rows or corrupting data.
- Use upserts or duplicate checks, and update source fields only after successful inserts; see the sketch below.
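
A minimal sketch of the duplicate-check pattern, assuming SQLAlchemy 2.x; `SourceRow` and `NoteRow` are illustrative stand-ins, not real tables in this repo:

```python
# Illustrative only: SourceRow/NoteRow are hypothetical models.
from sqlalchemy import ForeignKey, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class SourceRow(Base):
    __tablename__ = "source_row"
    id: Mapped[int] = mapped_column(primary_key=True)
    legacy_note: Mapped[str | None]


class NoteRow(Base):
    __tablename__ = "note_row"
    id: Mapped[int] = mapped_column(primary_key=True)
    source_id: Mapped[int] = mapped_column(ForeignKey("source_row.id"))
    content: Mapped[str]


def backfill(session: Session) -> None:
    rows = session.scalars(select(SourceRow).where(SourceRow.legacy_note.isnot(None)))
    for row in rows:
        # Duplicate check: a re-run finds the existing note and does nothing.
        exists = session.scalar(
            select(NoteRow.id).where(
                NoteRow.source_id == row.id,
                NoteRow.content == row.legacy_note,
            )
        )
        if exists is None:
            session.add(NoteRow(source_id=row.id, content=row.legacy_note))
        # Clear the source field only after the insert is staged; a crash
        # before commit rolls back both changes, so no data is lost.
        row.legacy_note = None
    session.commit()
```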

Following this playbook keeps ETL runs measured in seconds or minutes instead of hours.

## Activate python venv
Always use `source .venv/bin/activate` to activate the venv before running Python.
200 changes: 160 additions & 40 deletions cli/cli.py
@@ -13,42 +13,57 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
import click
from enum import Enum
from pathlib import Path

import typer
from dotenv import load_dotenv

load_dotenv()

cli = typer.Typer(help="Command line interface for managing the application.")
water_levels = typer.Typer(help="Water-level utilities")
data_migrations = typer.Typer(help="Data migration utilities")
cli.add_typer(water_levels, name="water-levels")
cli.add_typer(data_migrations, name="data-migrations")
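# Sub-apps are mounted under the names given here, so subcommands are
# invoked as e.g. `water-levels bulk-upload` or `data-migrations run`.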


@click.group()
def cli():
"""Command line interface for managing the application."""
pass
class OutputFormat(str, Enum):
json = "json"


@cli.command()
@cli.command("initialize-lexicon")
def initialize_lexicon():
from core.initializers import init_lexicon

init_lexicon()


@cli.command()
@click.argument(
"root_directory",
type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True),
)
def associate_assets_command(root_directory: str):
@cli.command("associate-assets")
def associate_assets_command(
root_directory: str = typer.Argument(
...,
exists=True,
file_okay=False,
dir_okay=True,
readable=True,
)
):
from cli.service_adapter import associate_assets

associate_assets(root_directory)


@cli.command()
@click.argument(
"file_path",
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
def well_inventory_csv(file_path: str):
@cli.command("well-inventory-csv")
def well_inventory_csv(
file_path: str = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
)
):
"""
Parse and upload a CSV to the database.
"""
@@ -58,38 +73,143 @@ def well_inventory_csv(file_path: str):
well_inventory_csv(file_path)


@cli.group()
def water_levels():
"""Water-level utilities"""
pass


@water_levels.command("bulk-upload")
@click.option(
"--file",
"file_path",
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
required=True,
help="Path to CSV file containing water level rows",
)
@click.option(
"--output",
"output_format",
type=click.Choice(["json"], case_sensitive=False),
default=None,
help="Optional output format",
)
def water_levels_bulk_upload(file_path: str, output_format: str | None):
def water_levels_bulk_upload(
file_path: str = typer.Option(
...,
"--file",
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
help="Path to CSV file containing water level rows",
),
output_format: OutputFormat | None = typer.Option(
None,
"--output",
help="Optional output format",
),
):
"""
Parse and upload a water-level CSV.
"""
# TODO: use the same helper function used by api to parse and upload a WL csv
from cli.service_adapter import water_levels_csv

pretty_json = (output_format or "").lower() == "json"
pretty_json = output_format == OutputFormat.json
water_levels_csv(file_path, pretty_json=pretty_json)


@data_migrations.command("list")
def data_migrations_list():
from data_migrations.registry import list_migrations

migrations = list_migrations()
if not migrations:
typer.echo("No data migrations registered.")
return
for migration in migrations:
repeatable = " (repeatable)" if migration.is_repeatable else ""
typer.echo(f"{migration.id}: {migration.name}{repeatable}")


@data_migrations.command("status")
def data_migrations_status():
from db.engine import session_ctx
from data_migrations.runner import get_status

with session_ctx() as session:
statuses = get_status(session)
if not statuses:
typer.echo("No data migrations registered.")
return
for status in statuses:
last_applied = (
status.last_applied_at.isoformat() if status.last_applied_at else "never"
)
typer.echo(
f"{status.id}: applied {status.applied_count} time(s), last={last_applied}"
)


@data_migrations.command("run")
def data_migrations_run(
migration_id: str = typer.Argument(...),
force: bool = typer.Option(
False, "--force", help="Re-run even if already applied."
),
):
from db.engine import session_ctx
from data_migrations.runner import run_migration_by_id

with session_ctx() as session:
ran = run_migration_by_id(session, migration_id, force=force)
typer.echo("applied" if ran else "skipped")


@data_migrations.command("run-all")
def data_migrations_run_all(
include_repeatable: bool = typer.Option(
False,
"--include-repeatable/--exclude-repeatable",
help="Whether to include repeatable migrations.",
),
force: bool = typer.Option(
False, "--force", help="Re-run non-repeatable migrations."
),
):
from db.engine import session_ctx
from data_migrations.runner import run_all

with session_ctx() as session:
ran = run_all(session, include_repeatable=include_repeatable, force=force)
typer.echo(f"applied {len(ran)} migration(s)")


@cli.command("alembic-upgrade-and-data")
def alembic_upgrade_and_data(
revision: str = typer.Argument("head"),
include_repeatable: bool = typer.Option(
False,
"--include-repeatable/--exclude-repeatable",
help="Whether to include repeatable migrations.",
),
force: bool = typer.Option(
False, "--force", help="Re-run non-repeatable migrations."
),
):
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from db.engine import engine, session_ctx
from data_migrations.runner import run_all

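    # Resolve the alembic config relative to the repository root so the
    # command behaves the same from any working directory.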
root = Path(__file__).resolve().parents[1]
cfg = Config(str(root / "alembic.ini"))
cfg.set_main_option("script_location", str(root / "alembic"))

command.upgrade(cfg, revision)

with engine.connect() as conn:
context = MigrationContext.configure(conn)
heads = context.get_current_heads()
script = ScriptDirectory.from_config(cfg)
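    # Walk from each current head back to base to collect every applied
    # revision; data migrations gated on an alembic revision then run
    # only once their schema change is actually in place.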
applied_revisions: set[str] = set()
for head in heads:
for rev in script.iterate_revisions(head, "base"):
applied_revisions.add(rev.revision)

with session_ctx() as session:
ran = run_all(
session,
include_repeatable=include_repeatable,
force=force,
allowed_alembic_revisions=applied_revisions,
)
typer.echo(f"applied {len(ran)} migration(s)")


if __name__ == "__main__":
cli()

1 change: 1 addition & 0 deletions data_migrations/__init__.py
@@ -0,0 +1 @@
# Data migrations package
29 changes: 29 additions & 0 deletions data_migrations/base.py
@@ -0,0 +1,29 @@
# ===============================================================================
# Copyright 2026 ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
from dataclasses import dataclass
from typing import Callable

from sqlalchemy.orm import Session


@dataclass(frozen=True)
class DataMigration:
id: str
alembic_revision: str
name: str
description: str
run: Callable[[Session], None]
is_repeatable: bool = False
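
The diff does not include `data_migrations/registry.py`, which the CLI imports; a minimal sketch of what it plausibly looks like (the module path and migration import below are assumptions, not shown in this PR):

```python
# Hypothetical sketch of data_migrations/registry.py; names are assumptions.
from data_migrations.base import DataMigration
from data_migrations.migrations.move_nma_location_notes import MIGRATION  # assumed path


def list_migrations() -> list[DataMigration]:
    # Return migrations in a stable order; the runner applies them in
    # sequence and records each application.
    return [MIGRATION]
```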
@@ -0,0 +1,94 @@
# ===============================================================================
# Copyright 2026 ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
from sqlalchemy import insert, select, update
from sqlalchemy.orm import Session

from data_migrations.base import DataMigration
from db.location import Location
from db.notes import Notes

NOTE_TYPE = "General"
BATCH_SIZE = 1000


def _iter_location_notes(session: Session):
stmt = select(
Location.id,
Location.nma_location_notes,
Location.release_status,
).where(Location.nma_location_notes.isnot(None))
for row in session.execute(stmt):
note = (row.nma_location_notes or "").strip()
if not note:
continue
yield row.id, note, row.release_status


def run(session: Session) -> None:
buffer: list[tuple[int, str, str]] = []
for item in _iter_location_notes(session):
buffer.append(item)
if len(buffer) >= BATCH_SIZE:
_flush_batch(session, buffer)
buffer.clear()
if buffer:
_flush_batch(session, buffer)


def _flush_batch(session: Session, batch: list[tuple[int, str, str]]) -> None:
location_ids = [row[0] for row in batch]
existing = session.execute(
select(Notes.target_id, Notes.content).where(
Notes.target_table == "location",
Notes.note_type == NOTE_TYPE,
Notes.target_id.in_(location_ids),
)
).all()
existing_set = {(row.target_id, row.content) for row in existing}

inserts = []
for location_id, note, release_status in batch:
if (location_id, note) in existing_set:
continue
inserts.append(
{
"target_id": location_id,
"target_table": "location",
"note_type": NOTE_TYPE,
"content": note,
"release_status": release_status or "draft",
}
)

if inserts:
session.execute(insert(Notes), inserts)

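    # Null the source column for every id in the batch (including rows
    # whose note already existed) so re-runs skip them; the commit below
    # runs per batch to bound transaction size.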
session.execute(
update(Location)
.where(Location.id.in_(location_ids))
.values(nma_location_notes=None)
)
session.commit()


MIGRATION = DataMigration(
id="20260205_0001_move_nma_location_notes",
alembic_revision="f0c9d8e7b6a5",
name="Move NMA location notes to Notes table",
description="Backfill polymorphic notes from Location.nma_location_notes.",
run=run,
is_repeatable=False,
)
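
With the migration registered, this backfill can be applied via the new CLI, e.g. `data-migrations run 20260205_0001_move_nma_location_notes`; a completed run reports `skipped` on re-execution unless `--force` is passed.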
1 change: 1 addition & 0 deletions data_migrations/migrations/__init__.py
@@ -0,0 +1 @@
# Data migrations live here.