Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions backend/app/features/ingestion/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- A run is "successful" only if all required metadata files are present.
- case_name (from timing files) is the identity for Case grouping.
- The first successful run per case is the reference simulation.
- CASE_HASH is stored as execution metadata and used for drift warnings.
- Each run creates a Simulation linked to a Case via case_id.
- Reference simulations have run_config_deltas = None.
- Non-reference runs store config differences vs the reference.
Expand Down Expand Up @@ -99,6 +100,7 @@ class SimulationCreateDraft:
last_updated_by: UUID | None
hpc_username: str | None
run_config_deltas: dict[str, dict[str, str | None]] | None = None
case_hash: str | None = None


def ingest_archive(
Expand Down Expand Up @@ -172,6 +174,8 @@ def ingest_archive(
errors: list[dict[str, str]] = []
reference_cache: dict[str, SimulationConfigSnapshot] = {}
persisted_reference_cache: dict[UUID, SimulationConfigSnapshot | None] = {}
case_hash_cache: dict[str, str] = {}
persisted_case_hash_cache: dict[UUID, str | None] = {}

for parsed_simulation in parsed_simulations:
try:
Expand All @@ -180,6 +184,8 @@ def ingest_archive(
db=db,
reference_cache=reference_cache,
persisted_reference_cache=persisted_reference_cache,
case_hash_cache=case_hash_cache,
persisted_case_hash_cache=persisted_case_hash_cache,
)

if is_duplicate:
Expand Down Expand Up @@ -221,6 +227,8 @@ def _process_simulation_for_ingest(
db: Session,
reference_cache: dict[str, SimulationConfigSnapshot],
persisted_reference_cache: dict[UUID, SimulationConfigSnapshot | None],
case_hash_cache: dict[str, str],
persisted_case_hash_cache: dict[UUID, str | None],
) -> tuple[SimulationCreate | None, bool]:
"""Process one parsed simulation entry.

Expand Down Expand Up @@ -254,6 +262,13 @@ def _process_simulation_for_ingest(

prevalidated_draft = _prevalidate_simulation_create(parsed_simulation, machine_id)
case = _resolve_case(parsed_simulation, case_name, db)
_track_case_hash_observation(
parsed_simulation=parsed_simulation,
case=case,
case_hash_cache=case_hash_cache,
persisted_case_hash_cache=persisted_case_hash_cache,
db=db,
)

simulation = _build_simulation_create(
parsed_simulation=parsed_simulation,
Expand All @@ -267,6 +282,70 @@ def _process_simulation_for_ingest(
return simulation, False


def _track_case_hash_observation(
parsed_simulation: ParsedSimulation,
case: Case,
case_hash_cache: dict[str, str],
persisted_case_hash_cache: dict[UUID, str | None],
db: Session,
) -> None:
"""Track CASE_HASH drift without changing case-name grouping."""
current_hash = parsed_simulation.case_hash
if not current_hash:
return

baseline_hash = _get_case_hash_baseline(
case=case,
case_hash_cache=case_hash_cache,
persisted_case_hash_cache=persisted_case_hash_cache,
db=db,
)
if baseline_hash is None:
case_hash_cache.setdefault(case.name, current_hash)
return

case_hash_cache.setdefault(case.name, baseline_hash)
if baseline_hash == current_hash:
return

logger.warning(
"Observed CASE_HASH drift for case '%s': baseline='%s', current='%s' "
"from %s. Retaining case-name grouping.",
case.name,
baseline_hash,
current_hash,
parsed_simulation.execution_dir,
)


def _get_case_hash_baseline(
case: Case,
case_hash_cache: dict[str, str],
persisted_case_hash_cache: dict[UUID, str | None],
db: Session,
) -> str | None:
"""Return the CASE_HASH baseline for drift detection."""
if case.reference_simulation_id is not None:
if case.id not in persisted_case_hash_cache:
reference_simulation = (
db.query(Simulation)
.filter(Simulation.id == case.reference_simulation_id)
.first()
)
baseline_hash = (
reference_simulation.case_hash if reference_simulation else None
)
persisted_case_hash_cache[case.id] = baseline_hash
if baseline_hash is not None:
case_hash_cache.setdefault(case.name, baseline_hash)

baseline_hash = persisted_case_hash_cache[case.id]
if baseline_hash is not None:
return baseline_hash

return case_hash_cache.get(case.name)
Comment thread
tomvothecoder marked this conversation as resolved.


def _require_case_name(parsed_simulation: ParsedSimulation) -> str:
"""Return case_name from metadata or raise a descriptive error."""
case_name = parsed_simulation.case_name
Expand Down Expand Up @@ -719,6 +798,7 @@ def _build_simulation_create_draft(
last_updated_by=None,
hpc_username=parsed_simulation.hpc_username,
run_config_deltas=run_config_deltas,
case_hash=parsed_simulation.case_hash,
)

return simulation_draft
Expand Down
3 changes: 3 additions & 0 deletions backend/app/features/ingestion/parsers/case_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def parse_env_case(env_case_path: str | Path) -> dict[str, str | None]:
Dictionary with case metadata (values are str or None), including:

- ``case_name``: Case name (``CASE``)
- ``case_hash``: Case hash (``CASE_HASH``)
- ``case_group``: Case group (``CASE_GROUP``)
- ``machine``: Machine name (``MACH``)
- ``user``: Real user (``REALUSER``)
Expand All @@ -34,6 +35,7 @@ def parse_env_case(env_case_path: str | Path) -> dict[str, str | None]:
env_case_path = Path(env_case_path)

case_name = _extract_value_from_file(env_case_path, "CASE")
case_hash = _extract_value_from_file(env_case_path, "CASE_HASH")
case_group = _extract_value_from_file(env_case_path, "CASE_GROUP")
machine = _extract_value_from_file(env_case_path, "MACH")
user = _extract_value_from_file(env_case_path, "REALUSER")
Expand All @@ -44,6 +46,7 @@ def parse_env_case(env_case_path: str | Path) -> dict[str, str | None]:

return {
"case_name": case_name,
"case_hash": case_hash,
"case_group": case_group,
"machine": machine,
"user": user,
Expand Down
1 change: 1 addition & 0 deletions backend/app/features/ingestion/parsers/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def _parse_all_files(exec_dir: str, files: dict[str, str | None]) -> ParsedSimul
git_tag=metadata.get("git_tag"),
git_commit_hash=metadata.get("git_commit_hash"),
status=metadata.get("status") or SimulationStatus.UNKNOWN.value,
case_hash=metadata.get("case_hash"),
)


Expand Down
1 change: 1 addition & 0 deletions backend/app/features/ingestion/parsers/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ class ParsedSimulation:
git_tag: str | None
git_commit_hash: str | None
status: str | None
case_hash: str | None = None
1 change: 1 addition & 0 deletions backend/app/features/simulation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class Simulation(Base, IDMixin, TimestampMixin):
execution_id: Mapped[str] = mapped_column(
Text, unique=True, index=True, nullable=False
)
case_hash: Mapped[str | None] = mapped_column(String(64), nullable=True)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
compset: Mapped[str] = mapped_column(String(120))
compset_alias: Mapped[str] = mapped_column(Text)
Expand Down
20 changes: 20 additions & 0 deletions backend/app/features/simulation/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ class SimulationCreate(CamelInBaseModel):
),
),
]
case_hash: Annotated[
str | None,
Field(
None,
description=(
"Optional CASE_HASH parsed from env_case.xml. Informational for "
"ingestion consistency checks; not currently the grouping key."
),
),
]
description: Annotated[
str | None, Field(None, description="Optional description of the simulation")
]
Expand Down Expand Up @@ -404,6 +414,16 @@ class SimulationOut(CamelOutBaseModel):
),
),
]
case_hash: Annotated[
str | None,
Field(
None,
description=(
"Optional CASE_HASH parsed from env_case.xml. Informational for "
"ingestion consistency checks; not currently the grouping key."
),
),
]
is_reference: Annotated[
bool,
Field(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Add case_hash column to simulations.

Revision ID: 20260414_000000
Revises: 20260331_000000
Create Date: 2026-03-30 00:00:00.000000
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "20260414_000000"
down_revision: Union[str, Sequence[str], None] = "20260331_000000"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Add nullable CASE_HASH storage to simulations."""
op.add_column(
"simulations",
sa.Column("case_hash", sa.String(length=64), nullable=True),
)


def downgrade() -> None:
"""Remove CASE_HASH storage from simulations."""
op.drop_column("simulations", "case_hash")
3 changes: 3 additions & 0 deletions backend/tests/features/ingestion/parsers/test_case_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def test_campaign_and_experiment_type_are_derived(self, tmp_path):
xml_case = """
<config>
<entry id="CASE" value="v3.LR.historical_0121" />
<entry id="CASE_HASH" value="abc123def456" />
</config>
"""
tmp_case = tmp_path / "env_case_campaign.xml"
Expand All @@ -68,6 +69,7 @@ def test_campaign_and_experiment_type_are_derived(self, tmp_path):

assert result["campaign"] == "v3.LR.historical"
assert result["experiment_type"] == "historical"
assert result["case_hash"] == "abc123def456"

def test_non_dot_case_name_does_not_set_campaign(self, tmp_path):
xml_case = """
Expand All @@ -91,6 +93,7 @@ def test_read_error_returns_none(self):
result = parse_env_case(Path("/tmp/missing.xml"))

assert result["case_name"] is None
assert result["case_hash"] is None
assert result["case_group"] is None


Expand Down
Loading
Loading