From 0ea82068a6232777bdc29099e310cd7fd792b7a9 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Mon, 6 Apr 2026 10:28:23 -0700 Subject: [PATCH 1/2] Add Phase 1: daily incremental assembly version tracking --- flows/parsers/backfill_missing_versions.py | 232 +++++++ .../parsers/update_historical_incremental.py | 401 +++++++++++ tests/test_incremental_update.py | 644 ++++++++++++++++++ 3 files changed, 1277 insertions(+) create mode 100644 flows/parsers/backfill_missing_versions.py create mode 100644 flows/parsers/update_historical_incremental.py create mode 100644 tests/test_incremental_update.py diff --git a/flows/parsers/backfill_missing_versions.py b/flows/parsers/backfill_missing_versions.py new file mode 100644 index 0000000..d153097 --- /dev/null +++ b/flows/parsers/backfill_missing_versions.py @@ -0,0 +1,232 @@ +"""Targeted backfill for assembly versions missing from historical records. + +Run this when the incremental updater reports assemblies whose previous +version was absent from the previous parsed TSV. Fetches only the specified +missing versions from NCBI, parses them, and merges the result into the +existing assembly_historical.tsv. + +Usage: + python -m flows.parsers.backfill_missing_versions \\ + --missing_json tmp/missing_versions.json \\ + --yaml_path configs/assembly_historical.types.yaml \\ + --work_dir tmp +""" + +import csv +import json +import os +from pathlib import Path +from typing import Optional + +from flows.lib import utils +from flows.lib.conditional_import import flow +from flows.lib.shared_args import WORK_DIR, YAML_PATH +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required +from flows.lib.utils import Parser +from flows.parsers.parse_backfill_historical_versions import ( + find_all_assembly_versions, + parse_historical_version, + parse_version, + setup_cache_directories, +) +from flows.parsers.parse_ncbi_assemblies import write_to_tsv + +MISSING_JSON = { + "flags": ["-m", "--missing_json"], + "keys": { + "help": "Path to the missing_versions.json produced by the incremental updater.", + "type": str, + }, +} + + +def load_missing_versions(missing_json: str) -> list[dict]: + """Load the list of missing versions from a JSON file. + + The file is written by run_incremental_historical_update when it + encounters assemblies with no matching previous version in the parsed TSV. + + Args: + missing_json (str): Path to missing_versions.json. + + Returns: + list: Missing-version records, each with base_accession, missing_version, + new_version, and new_accession keys. + """ + with open(missing_json, encoding="utf-8") as f: + return json.load(f) + + +def load_existing_historical(historical_tsv: str) -> dict[str, dict]: + """Load an existing assembly_historical.tsv keyed by genbankAccession. + + Args: + historical_tsv (str): Path to the existing historical TSV file. + + Returns: + dict: Rows keyed by genbankAccession, or empty dict if the file is absent. + """ + existing: dict[str, dict] = {} + if not Path(historical_tsv).exists(): + return existing + + with open(historical_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + acc = row.get("genbankAccession", "") + if acc: + existing[acc] = dict(row) + + return existing + + +@flow(log_prints=True) +def backfill_missing_versions( + missing_json: str, + yaml_path: str, + work_dir: str = ".", +) -> None: + """Fetch and parse assembly versions missing from the historical TSV. 
+ + For each entry in missing_json, discovers all versions of that assembly + via NCBI FTP, fetches metadata for the specific missing version, parses it + through the standard GenomeHubs pipeline, and merges the result into the + existing assembly_historical.tsv. + + Args: + missing_json (str): Path to missing_versions.json from the incremental + updater. + yaml_path (str): Path to assembly_historical.types.yaml. + work_dir (str): Working directory for caches and output. + """ + setup_cache_directories(work_dir) + config = utils.load_config(config_file=yaml_path) + + missing = load_missing_versions(missing_json) + if not missing: + print("No missing versions to backfill.") + return + + historical_tsv = config.meta["file_name"] + existing = load_existing_historical(historical_tsv) + parsed = dict(existing) + + total = len(missing) + succeeded = 0 + failed = 0 + + separator = "=" * 80 + print(f"\n{separator}") + print("MISSING VERSION BACKFILL") + print(f"{separator}") + print(f" Missing entries to process: {total}") + print(f" Merging into: {historical_tsv}") + print(f" Existing records: {len(existing)}") + print(f"{separator}\n") + + for i, entry in enumerate(missing): + base_acc = entry["base_accession"] + missing_version = entry["missing_version"] + new_accession = entry["new_accession"] + target_acc = f"{base_acc}.{missing_version}" + + print(f"[{i + 1}/{total}] {target_acc}") + + all_versions = find_all_assembly_versions(new_accession, work_dir) + if not all_versions: + print(" Warning: No versions found via FTP — skipping.") + failed += 1 + continue + + version_data = next( + (v for v in all_versions if parse_version(v.get("accession", "")) == missing_version), + None, + ) + if version_data is None: + print(f" Warning: v{missing_version} not found in FTP listing — skipping.") + failed += 1 + continue + + try: + print(f" Parsing v{missing_version}...", end=" ", flush=True) + row = parse_historical_version( + version_data=version_data, + config=config, + base_accession=base_acc, + version_num=missing_version, + current_accession=new_accession, + ) + genbank_acc = row.get("genbankAccession", target_acc) + parsed[genbank_acc] = row + succeeded += 1 + print("done") + except Exception as e: + print(f"failed ({e})") + failed += 1 + continue + + if succeeded > 0: + print(f"\nWriting {len(parsed)} records to {historical_tsv}...") + write_to_tsv(parsed, config) + + print(f"\n{separator}") + print("MISSING VERSION BACKFILL COMPLETE") + print(f"{separator}") + print(f" Succeeded: {succeeded}/{total}") + if failed > 0: + print(f" Failed: {failed}/{total}") + print(f" Total records in {historical_tsv}: {len(parsed)}") + print(f"{separator}\n") + + +def backfill_missing_versions_wrapper( + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, +) -> None: + """Wrapper matching the fetch_parse_validate parser signature. + + Locates missing_versions.json in work_dir and delegates to + backfill_missing_versions. + + Args: + working_yaml (str): Path to the working YAML file. + work_dir (str): Path to the working directory containing + missing_versions.json. + append (bool): Unused; accepted for pipeline compatibility. + data_freeze_path (str, optional): Unused; accepted for pipeline + compatibility. + **kwargs: Additional keyword arguments passed by the pipeline. 
+ """ + missing_json_path = os.path.join(work_dir, "missing_versions.json") + if not Path(missing_json_path).exists(): + raise FileNotFoundError(f"No missing_versions.json found in {work_dir}") + + backfill_missing_versions( + missing_json=missing_json_path, + yaml_path=working_yaml, + work_dir=work_dir, + ) + + +def plugin() -> Parser: + """Register the flow.""" + return Parser( + name="BACKFILL_MISSING_VERSIONS", + func=backfill_missing_versions_wrapper, + description="Targeted backfill for assembly versions missing from historical records.", + ) + + +if __name__ == "__main__": + args = _parse_args( + [required(MISSING_JSON), required(YAML_PATH), WORK_DIR], + description="Targeted backfill for assembly versions missing from historical records", + ) + backfill_missing_versions( + missing_json=args.missing_json, + yaml_path=args.yaml_path, + work_dir=args.work_dir, + ) diff --git a/flows/parsers/update_historical_incremental.py b/flows/parsers/update_historical_incremental.py new file mode 100644 index 0000000..3de0253 --- /dev/null +++ b/flows/parsers/update_historical_incremental.py @@ -0,0 +1,401 @@ +"""Daily incremental updates to historical assembly records. + +Identifies assembly versions newly superseded since the last run and appends them to assembly_historical.tsv. No NCBI fetches are required — data is copied directly from the previous assembly_current.tsv parse output. + +Usage: + python -m flows.parsers.update_historical_incremental \\ + --input_path assembly_data_report.jsonl \\ + --previous_tsv assembly_current.tsv.previous \\ + --historical_tsv outputs/assembly_historical.tsv +""" + +import csv +import json +import os +from glob import glob +from pathlib import Path +from typing import Optional + +from flows.lib.conditional_import import flow +from flows.lib.shared_args import INPUT_PATH +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required +from flows.lib.utils import Parser +from flows.parsers.parse_backfill_historical_versions import parse_accession + +PREVIOUS_TSV = { + "flags": ["-p", "--previous_tsv"], + "keys": { + "help": "Path to assembly_current.tsv from the previous run.", + "type": str, + }, +} + +HISTORICAL_TSV = { + "flags": ["-H", "--historical_tsv"], + "keys": { + "help": "Path to assembly_historical.tsv to update.", + "type": str, + }, +} + + +def load_previous_parsed_by_base(previous_tsv: str) -> dict[str, dict[int, dict]]: + """Load previous parsed results indexed by base accession and version. + + Args: + previous_tsv (str): Path to assembly_current.tsv from the previous run. + + Returns: + dict: Nested mapping of base_accession -> version -> row data. + Returns an empty dict if the file is not found, which is expected + on the first run after the Phase 0 backfill. 
+ """ + previous_by_base: dict[str, dict[int, dict]] = {} + + try: + with open(previous_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + accession = row["accession"] + base_acc, version = parse_accession(accession) + if base_acc not in previous_by_base: + previous_by_base[base_acc] = {} + previous_by_base[base_acc][version] = dict(row) + except FileNotFoundError: + print(f"Warning: Previous TSV not found: {previous_tsv}") + print(" This is expected for the first run after the Phase 0 backfill.") + return {} + + total = sum(len(v) for v in previous_by_base.values()) + print(f"Loaded {total} assemblies from previous parsed results.") + print(f" Unique base accessions: {len(previous_by_base)}") + + return previous_by_base + + +def build_superseded_row( + previous_row: dict, + previous_version: int, + new_accession: str, + new_version: int, + release_date: str, +) -> dict: + """Build a superseded row from a previous row with updated metadata. + + Args: + previous_row (dict): Row data copied from the previous parsed TSV. + previous_version (int): Version number of the assembly being superseded. + new_accession (str): Accession of the assembly that supersedes this one. + new_version (int): Version number of the superseding assembly. + release_date (str): Release date of the superseding assembly. + + Returns: + dict: Updated row with version_status, assembly_id, and superseded_by fields. + """ + base_acc, _ = parse_accession(previous_row["accession"]) + row = previous_row.copy() + row["version_status"] = "superseded" + row["assembly_id"] = f"{base_acc}_{previous_version}" + row["superseded_by"] = new_accession + row["superseded_by_version"] = new_version + row["superseded_date"] = release_date + return row + + +def build_missing_version_record( + base_acc: str, + missing_version: int, + new_version: int, + new_accession: str, + is_new_series: bool = False, +) -> dict: + """Build a record describing a version missing from the previous parsed TSV. + + Args: + base_acc (str): Base accession without version suffix. + missing_version (int): The version number that could not be found. + new_version (int): The new version number that triggered this check. + new_accession (str): Full accession of the new assembly. + is_new_series (bool): True if this is a new assembly series with no + prior history at all. + + Returns: + dict: Record suitable for writing to a missing-versions JSON file. + """ + record = { + "base_accession": base_acc, + "missing_version": missing_version, + "new_version": new_version, + "new_accession": new_accession, + } + if is_new_series: + record["note"] = "New assembly series — prior versions may need backfill" + return record + + +def identify_newly_superseded( + new_jsonl: str, + previous_by_base: dict[str, dict[int, dict]], +) -> tuple[list[dict], list[dict]]: + """Identify versions that became superseded in the current JSONL update. + + For each assembly with version > 1 in the new JSONL, checks whether the + immediately preceding version exists in the previous parsed TSV. Assemblies + whose predecessor is found are added to the superseded list; those missing a + predecessor are recorded for optional backfill. + + Args: + new_jsonl (str): Path to the current assembly_data_report.jsonl. + previous_by_base (dict): Indexed previous parsed results from + load_previous_parsed_by_base. + + Returns: + tuple: (newly_superseded, missing_versions) where each is a list of dicts. 
+ """ + newly_superseded: list[dict] = [] + missing_versions: list[dict] = [] + + with open(new_jsonl) as f: + for line in f: + assembly = json.loads(line) + accession = assembly["accession"] + base_acc, new_version = parse_accession(accession) + + if new_version <= 1: + continue + + previous_version = new_version - 1 + + if base_acc not in previous_by_base: + missing_versions.append(build_missing_version_record( + base_acc, previous_version, new_version, accession, + is_new_series=True, + )) + continue + + if previous_version not in previous_by_base[base_acc]: + missing_versions.append(build_missing_version_record( + base_acc, previous_version, new_version, accession, + )) + continue + + previous_row = previous_by_base[base_acc][previous_version] + release_date = assembly.get("releaseDate") or "" + newly_superseded.append(build_superseded_row( + previous_row, previous_version, accession, new_version, release_date, + )) + + return newly_superseded, missing_versions + + +def append_superseded_to_tsv( + newly_superseded: list[dict], historical_tsv: str +) -> None: + """Append newly superseded rows to the historical TSV, deduplicating by assembly_id. + + Reads the existing file if present, merges new rows (new rows take + precedence on duplicate assembly_id), and writes the combined result back. + + Args: + newly_superseded (list): Row dicts from identify_newly_superseded. + historical_tsv (str): Path to assembly_historical.tsv. + """ + if not newly_superseded: + print(" No newly superseded versions to add.") + return + + existing: dict[str, dict] = {} + historical_path = Path(historical_tsv) + + if historical_path.exists(): + with open(historical_tsv, encoding="utf-8") as f: + for row in csv.DictReader(f, delimiter="\t"): + existing[row["assembly_id"]] = dict(row) + + for row in newly_superseded: + existing[row["assembly_id"]] = row + + fieldnames = list(next(iter(existing.values())).keys()) + with open(historical_tsv, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, fieldnames=fieldnames, delimiter="\t", extrasaction="ignore" + ) + writer.writeheader() + writer.writerows(existing.values()) + + print(f" Added {len(newly_superseded)} newly superseded versions.") + print(f" Total records in {historical_tsv}: {len(existing)}") + + +def print_superseded_summary(newly_superseded: list[dict]) -> None: + """Print a short summary of the newly superseded versions. + + Args: + newly_superseded (list): Row dicts from identify_newly_superseded. + """ + if not newly_superseded: + print(" Found: 0 newly superseded versions.") + return + + print(f" Found: {len(newly_superseded)} newly superseded versions.") + print(" Examples:") + for row in newly_superseded[:5]: + print(f" {row['accession']} -> superseded by v{row['superseded_by_version']}") + if len(newly_superseded) > 5: + print(f" ... and {len(newly_superseded) - 5} more") + + +def print_missing_versions_warning(missing: list[dict]) -> None: + """Print a warning listing versions absent from the previous parsed TSV. + + Args: + missing (list): Missing-version records from identify_newly_superseded. + """ + if not missing: + return + + print(f"\n Warning: {len(missing)} assemblies have missing previous versions.") + print(" These may need manual backfill:") + for m in missing[:5]: + print( + f" {m['base_accession']}: " + f"need v{m['missing_version']}, have v{m['new_version']}" + ) + if len(missing) > 5: + print(f" ... 
and {len(missing) - 5} more") + print("\n To backfill missing versions, run:") + print(" python -m flows.parsers.backfill_missing_versions") + + +@flow(log_prints=True) +def run_incremental_historical_update( + new_jsonl: str, + previous_tsv: str, + historical_tsv: str, +) -> dict: + """Daily incremental update of the historical assembly TSV. + + Called after parse_ncbi_assemblies completes. Uses the previous parsed + TSV — no NCBI fetches are made. + + Args: + new_jsonl (str): Path to the current assembly_data_report.jsonl. + previous_tsv (str): Path to assembly_current.tsv from the previous run. + historical_tsv (str): Path to assembly_historical.tsv to update. + + Returns: + dict: Summary with keys newly_superseded_count, missing_versions_count, + and missing_versions (list of records). + """ + separator = "=" * 80 + print(f"\n{separator}") + print("INCREMENTAL HISTORICAL UPDATE") + print(f"{separator}\n") + + print("[1/3] Loading previous parsed results...") + previous_by_base = load_previous_parsed_by_base(previous_tsv) + + if not previous_by_base: + print(" No previous parsed data available — skipping incremental update.") + print(" This is expected for the first run after the Phase 0 backfill.\n") + return { + "newly_superseded_count": 0, + "missing_versions_count": 0, + "missing_versions": [], + } + + print("\n[2/3] Identifying newly superseded versions...") + newly_superseded, missing = identify_newly_superseded(new_jsonl, previous_by_base) + print_superseded_summary(newly_superseded) + print_missing_versions_warning(missing) + + print("\n[3/3] Updating historical TSV...") + append_superseded_to_tsv(newly_superseded, historical_tsv) + + print(f"\n{separator}") + print( + f"INCREMENTAL UPDATE COMPLETE " + f"Superseded: {len(newly_superseded)} " + f"Missing: {len(missing)}" + ) + print(f"{separator}\n") + + return { + "newly_superseded_count": len(newly_superseded), + "missing_versions_count": len(missing), + "missing_versions": missing, + } + + +def incremental_update_wrapper( + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, +) -> None: + """Wrapper matching the fetch_parse_validate parser signature. + + Derives the previous TSV and historical TSV paths from work_dir and + delegates to run_incremental_historical_update. + + Args: + working_yaml (str): Path to the working YAML file (unused; accepted + for pipeline compatibility). + work_dir (str): Path to the working directory containing the JSONL, + the previous TSV, and the historical TSV. + append (bool): Unused; accepted for pipeline compatibility. + data_freeze_path (str, optional): Unused; accepted for pipeline + compatibility. + **kwargs: Additional keyword arguments passed by the pipeline. 
+ """ + glob_path = os.path.join(work_dir, "*.jsonl") + paths = glob(glob_path) + if not paths: + raise FileNotFoundError(f"No jsonl file found in {work_dir}") + if len(paths) > 1: + raise ValueError(f"More than one jsonl file found in {work_dir}") + + results = run_incremental_historical_update( + new_jsonl=paths[0], + previous_tsv=os.path.join(work_dir, "assembly_current.tsv.previous"), + historical_tsv=os.path.join(work_dir, "assembly_historical.tsv"), + ) + + if results["missing_versions_count"] > 0: + missing_json_path = os.path.join(work_dir, "missing_versions.json") + with open(missing_json_path, "w", encoding="utf-8") as f: + json.dump(results["missing_versions"], f, indent=2) + print(f" Missing versions written to: {missing_json_path}") + + +def plugin() -> Parser: + """Register the flow.""" + return Parser( + name="UPDATE_HISTORICAL_INCREMENTAL", + func=incremental_update_wrapper, + description="Daily incremental update of historical assembly records.", + ) + + +if __name__ == "__main__": + args = _parse_args( + [required(INPUT_PATH), required(PREVIOUS_TSV), required(HISTORICAL_TSV)], + description="Daily incremental update of historical assembly records", + ) + results = run_incremental_historical_update( + new_jsonl=args.input_path, + previous_tsv=args.previous_tsv, + historical_tsv=args.historical_tsv, + ) + print(f"Summary: superseded={results['newly_superseded_count']}, " + f"missing={results['missing_versions_count']}") + if results["missing_versions_count"] > 0: + missing_json_path = Path(args.historical_tsv).parent / "missing_versions.json" + with open(missing_json_path, "w", encoding="utf-8") as f: + json.dump(results["missing_versions"], f, indent=2) + print( + f" Action needed: {results['missing_versions_count']} missing versions." 
+ ) + print(f" Written to: {missing_json_path}") + print(" Run: python -m flows.parsers.backfill_missing_versions") diff --git a/tests/test_incremental_update.py b/tests/test_incremental_update.py new file mode 100644 index 0000000..9d05642 --- /dev/null +++ b/tests/test_incremental_update.py @@ -0,0 +1,644 @@ +"""Tests for update_historical_incremental.py and backfill_missing_versions.py + +Covers: +- Loading and indexing previous parsed TSV results +- Building superseded and missing-version records +- Core supersession detection logic (superseded, missing-with-gap, new-series, v1-skip) +- Appending to historical TSV with deduplication +- Incremental orchestrator flow behaviour +- Loading existing historical TSV (backfill helper) +- Backfill flow: version selection and TSV merge +""" + +import csv +import json +import os +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +os.environ["SKIP_PREFECT"] = "true" + +from flows.parsers import ( # noqa: E402 + backfill_missing_versions as backfill_module, + update_historical_incremental as incremental_module, +) +from flows.lib.utils import Parser # noqa: E402 +from flows.parsers.backfill_missing_versions import ( # noqa: E402 + backfill_missing_versions, + backfill_missing_versions_wrapper, + load_existing_historical, +) +from flows.parsers.update_historical_incremental import ( # noqa: E402 + append_superseded_to_tsv, + build_missing_version_record, + build_superseded_row, + identify_newly_superseded, + load_previous_parsed_by_base, + run_incremental_historical_update, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def write_tsv(path: Path, rows: list[dict]) -> None: + """Write a list of dicts to a tab-separated file.""" + if not rows: + return + with open(path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()), delimiter="\t") + writer.writeheader() + writer.writerows(rows) + + +def read_tsv(path: Path) -> list[dict]: + """Read a tab-separated file into a list of dicts.""" + with open(path, encoding="utf-8") as f: + return list(csv.DictReader(f, delimiter="\t")) + + +def write_jsonl(path: Path, records: list[dict]) -> None: + """Write a list of dicts as newline-delimited JSON.""" + with open(path, "w", encoding="utf-8") as f: + for record in records: + f.write(json.dumps(record) + "\n") + + +# --------------------------------------------------------------------------- +# TestLoadPreviousParsed +# --------------------------------------------------------------------------- + +class TestLoadPreviousParsed: + """load_previous_parsed_by_base indexes rows by base accession and version.""" + + def test_missing_file_returns_empty(self, tmp_path): + result = load_previous_parsed_by_base(str(tmp_path / "nope.tsv")) + assert result == {} + + def test_single_version_indexed(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [{"accession": "GCA_000222935.1", "taxon_id": "12345"}]) + result = load_previous_parsed_by_base(str(tsv)) + assert "GCA_000222935" in result + assert 1 in result["GCA_000222935"] + assert result["GCA_000222935"][1]["taxon_id"] == "12345" + + def test_multi_version_same_base(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"}, + {"accession": "GCA_000222935.2", 
"taxon_id": "1"}, + ]) + result = load_previous_parsed_by_base(str(tsv)) + assert len(result["GCA_000222935"]) == 2 + assert 1 in result["GCA_000222935"] + assert 2 in result["GCA_000222935"] + + def test_multiple_base_accessions(self, tmp_path): + tsv = tmp_path / "current.tsv" + write_tsv(tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"}, + {"accession": "GCA_000412225.1", "taxon_id": "2"}, + ]) + result = load_previous_parsed_by_base(str(tsv)) + assert len(result) == 2 + assert "GCA_000222935" in result + assert "GCA_000412225" in result + + +# --------------------------------------------------------------------------- +# TestBuildSupersededRow +# --------------------------------------------------------------------------- + +class TestBuildSupersededRow: + """build_superseded_row stamps the correct metadata onto a copied row.""" + + def _base_row(self): + return { + "accession": "GCA_000222935.1", + "taxon_id": "12345", + "assembly_level": "Chromosome", + } + + def test_version_status_set(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["version_status"] == "superseded" + + def test_assembly_id_format(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["assembly_id"] == "GCA_000222935_1" + + def test_superseded_by_fields(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["superseded_by"] == "GCA_000222935.2" + assert row["superseded_by_version"] == 2 + assert row["superseded_date"] == "2024-01-15" + + def test_original_row_not_mutated(self): + original = self._base_row() + build_superseded_row(original, 1, "GCA_000222935.2", 2, "2024-01-15") + assert "version_status" not in original + + def test_existing_fields_preserved(self): + row = build_superseded_row(self._base_row(), 1, "GCA_000222935.2", 2, "2024-01-15") + assert row["taxon_id"] == "12345" + assert row["assembly_level"] == "Chromosome" + + +# --------------------------------------------------------------------------- +# TestBuildMissingVersionRecord +# --------------------------------------------------------------------------- + +class TestBuildMissingVersionRecord: + """build_missing_version_record captures the gap details.""" + + def test_required_fields(self): + rec = build_missing_version_record("GCA_000222935", 2, 3, "GCA_000222935.3") + assert rec["base_accession"] == "GCA_000222935" + assert rec["missing_version"] == 2 + assert rec["new_version"] == 3 + assert rec["new_accession"] == "GCA_000222935.3" + + def test_no_note_by_default(self): + rec = build_missing_version_record("GCA_000222935", 1, 2, "GCA_000222935.2") + assert "note" not in rec + + def test_note_present_for_new_series(self): + rec = build_missing_version_record( + "GCA_000222935", 1, 2, "GCA_000222935.2", is_new_series=True + ) + assert "note" in rec + + +# --------------------------------------------------------------------------- +# TestIdentifyNewlySuperseded +# --------------------------------------------------------------------------- + +class TestIdentifyNewlySuperseded: + """identify_newly_superseded covers all branching cases.""" + + def _write_jsonl(self, tmp_path, records): + path = tmp_path / "new.jsonl" + write_jsonl(path, records) + return str(path) + + def test_v1_assembly_skipped(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_000222935.1"}]) + superseded, missing = identify_newly_superseded(jsonl, {}) + assert superseded == [] + assert 
missing == [] + + def test_superseded_found_when_previous_version_present(self, tmp_path): + jsonl = self._write_jsonl( + tmp_path, [{"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"}] + ) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert len(superseded) == 1 + assert superseded[0]["superseded_by"] == "GCA_000222935.2" + assert missing == [] + + def test_missing_with_version_gap(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_000222935.3"}]) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert superseded == [] + assert len(missing) == 1 + assert missing[0]["missing_version"] == 2 + + def test_new_series_no_prior_base(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [{"accession": "GCA_999999999.2"}]) + superseded, missing = identify_newly_superseded(jsonl, {}) + assert superseded == [] + assert len(missing) == 1 + assert missing[0]["note"] + + def test_mixed_batch(self, tmp_path): + jsonl = self._write_jsonl(tmp_path, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-01"}, + {"accession": "GCA_000412225.1"}, + {"accession": "GCA_999999999.2"}, + ]) + previous = { + "GCA_000222935": {1: {"accession": "GCA_000222935.1", "taxon_id": "1"}} + } + superseded, missing = identify_newly_superseded(jsonl, previous) + assert len(superseded) == 1 + assert len(missing) == 1 + + +# --------------------------------------------------------------------------- +# TestAppendSupersededToTsv +# --------------------------------------------------------------------------- + +class TestAppendSupersededToTsv: + """append_superseded_to_tsv correctly creates, appends, and deduplicates.""" + + def _make_row(self, acc, assembly_id, status="superseded"): + return { + "accession": acc, + "assembly_id": assembly_id, + "version_status": status, + } + + def test_creates_new_file(self, tmp_path): + tsv = tmp_path / "historical.tsv" + rows = [self._make_row("GCA_000222935.1", "GCA_000222935_1")] + append_superseded_to_tsv(rows, str(tsv)) + assert tsv.exists() + result = read_tsv(tsv) + assert len(result) == 1 + assert result[0]["accession"] == "GCA_000222935.1" + + def test_appends_to_existing(self, tmp_path): + tsv = tmp_path / "historical.tsv" + write_tsv(tsv, [self._make_row("GCA_000412225.1", "GCA_000412225_1")]) + append_superseded_to_tsv( + [self._make_row("GCA_000222935.1", "GCA_000222935_1")], str(tsv) + ) + result = read_tsv(tsv) + assert len(result) == 2 + + def test_dedup_on_assembly_id_keeps_new(self, tmp_path): + tsv = tmp_path / "historical.tsv" + old_row = { + "accession": "GCA_000222935.1", + "assembly_id": "GCA_000222935_1", + "version_status": "superseded", + "superseded_by": "GCA_000222935.2", + } + write_tsv(tsv, [old_row]) + new_row = dict(old_row) + new_row["superseded_by"] = "GCA_000222935.3" + append_superseded_to_tsv([new_row], str(tsv)) + result = read_tsv(tsv) + assert len(result) == 1 + assert result[0]["superseded_by"] == "GCA_000222935.3" + + def test_no_op_when_empty_list(self, tmp_path): + tsv = tmp_path / "historical.tsv" + append_superseded_to_tsv([], str(tsv)) + assert not tsv.exists() + + +# --------------------------------------------------------------------------- +# TestIncrementalOrchestrator +# --------------------------------------------------------------------------- + +class 
TestIncrementalOrchestrator: + """run_incremental_historical_update orchestrator behaviour.""" + + def test_no_previous_tsv_returns_empty_result(self, tmp_path): + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [{"accession": "GCA_000222935.2"}]) + result = run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(tmp_path / "nope.tsv"), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["newly_superseded_count"] == 0 + assert result["missing_versions_count"] == 0 + assert result["missing_versions"] == [] + + def test_one_superseded_produces_correct_counts(self, tmp_path): + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} + ]) + result = run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["newly_superseded_count"] == 1 + assert result["missing_versions_count"] == 0 + + def test_missing_version_detected_in_orchestrator_result(self, tmp_path): + """v3 present, v2 missing → missing_versions_count == 1.""" + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.3", "releaseDate": "2024-06-01"} + ]) + result = run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(tmp_path / "historical.tsv"), + ) + assert result["missing_versions_count"] == 1 + assert result["missing_versions"][0]["base_accession"] == "GCA_000222935" + assert result["missing_versions"][0]["missing_version"] == 2 + + def test_historical_tsv_written(self, tmp_path): + previous_tsv = tmp_path / "previous.tsv" + write_tsv(previous_tsv, [ + {"accession": "GCA_000222935.1", "taxon_id": "1"} + ]) + jsonl = tmp_path / "new.jsonl" + write_jsonl(jsonl, [ + {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} + ]) + historical_tsv = tmp_path / "historical.tsv" + run_incremental_historical_update( + new_jsonl=str(jsonl), + previous_tsv=str(previous_tsv), + historical_tsv=str(historical_tsv), + ) + assert historical_tsv.exists() + rows = read_tsv(historical_tsv) + assert len(rows) == 1 + assert rows[0]["version_status"] == "superseded" + + +# --------------------------------------------------------------------------- +# TestLoadExistingHistorical +# --------------------------------------------------------------------------- + +class TestLoadExistingHistorical: + """load_existing_historical indexes rows by genbankAccession.""" + + def test_missing_file_returns_empty(self, tmp_path): + result = load_existing_historical(str(tmp_path / "nope.tsv")) + assert result == {} + + def test_rows_keyed_by_genbank_accession(self, tmp_path): + tsv = tmp_path / "historical.tsv" + write_tsv(tsv, [ + {"genbankAccession": "GCA_000222935.1", "version_status": "superseded"}, + {"genbankAccession": "GCA_000412225.1", "version_status": "superseded"}, + ]) + result = load_existing_historical(str(tsv)) + assert "GCA_000222935.1" in result + assert "GCA_000412225.1" in result + assert len(result) == 2 + + def test_row_data_preserved(self, tmp_path): + tsv = tmp_path / "historical.tsv" + write_tsv(tsv, [ + {"genbankAccession": "GCA_000222935.1", "version_status": "superseded"} + ]) + result = 
load_existing_historical(str(tsv)) + assert result["GCA_000222935.1"]["version_status"] == "superseded" + + +# --------------------------------------------------------------------------- +# TestBackfillMissingVersionsFlow +# --------------------------------------------------------------------------- + +class TestBackfillMissingVersionsFlow: + """backfill_missing_versions selects the right version and merges into TSV.""" + + def _write_missing_json(self, tmp_path, entries): + path = tmp_path / "missing.json" + with open(path, "w") as f: + json.dump(entries, f) + return str(path) + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_correct_version_selected( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """Only the requested missing version should be parsed, not all versions.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.return_value = [ + {"accession": "GCA_000222935.1"}, + {"accession": "GCA_000222935.2"}, + ] + mock_parse.return_value = {"genbankAccession": "GCA_000222935.1"} + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_parse.assert_called_once() + parsed_version_data = mock_parse.call_args[1]["version_data"] + assert parsed_version_data["accession"] == "GCA_000222935.1" + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_merges_with_existing_historical( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """New rows must be merged with existing historical rows before writing.""" + historical_tsv = tmp_path / "historical.tsv" + write_tsv(historical_tsv, [ + {"genbankAccession": "GCA_000412225.1", "version_status": "superseded"} + ]) + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(historical_tsv)} + ) + mock_find.return_value = [{"accession": "GCA_000222935.1"}] + mock_parse.return_value = {"genbankAccession": "GCA_000222935.1"} + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_called_once() + written_parsed = mock_write.call_args[0][0] + assert "GCA_000412225.1" in written_parsed + assert "GCA_000222935.1" in written_parsed + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "utils") + def test_no_write_when_version_not_found_in_ftp( + self, mock_utils, mock_find, mock_write, tmp_path + ): + """If the FTP listing does not include the missing version, skip silently.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.return_value = [{"accession": "GCA_000222935.2"}] + + missing_json = 
self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_not_called() + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "utils") + def test_no_versions_returned_from_ftp( + self, mock_utils, mock_find, mock_parse, mock_write, tmp_path + ): + """If FTP returns an empty list, skip gracefully without parsing or writing.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.return_value = [] + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + } + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_parse.assert_not_called() + mock_write.assert_not_called() + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_partial_parse_failure_writes_successful_rows( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """If one entry fails to parse, the successfully parsed ones are still written.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + mock_find.side_effect = [ + [{"accession": "GCA_000222935.1"}, {"accession": "GCA_000222935.2"}], + [{"accession": "GCA_000412225.1"}, {"accession": "GCA_000412225.2"}], + ] + mock_parse.side_effect = [ + {"genbankAccession": "GCA_000222935.1"}, + ValueError("simulated parse failure"), + ] + + missing_json = self._write_missing_json(tmp_path, [ + { + "base_accession": "GCA_000222935", + "missing_version": 1, + "new_accession": "GCA_000222935.2", + }, + { + "base_accession": "GCA_000412225", + "missing_version": 1, + "new_accession": "GCA_000412225.2", + }, + ]) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_called_once() + written = mock_write.call_args[0][0] + assert "GCA_000222935.1" in written + assert "GCA_000412225.1" not in written + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "utils") + def test_empty_missing_json_no_op( + self, mock_utils, mock_find, mock_write, tmp_path + ): + """An empty missing_versions.json should not call write_to_tsv.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": str(tmp_path / "historical.tsv")} + ) + missing_json = self._write_missing_json(tmp_path, []) + backfill_missing_versions( + missing_json=missing_json, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + mock_write.assert_not_called() + mock_find.assert_not_called() + + +# --------------------------------------------------------------------------- +# TestBackfillMissingVersionsWrapper +# --------------------------------------------------------------------------- + +class TestBackfillMissingVersionsWrapper: + 
"""backfill_missing_versions_wrapper delegates to the flow correctly.""" + + @patch.object(backfill_module, "backfill_missing_versions") + def test_delegates_to_flow(self, mock_flow, tmp_path): + """Wrapper locates missing_versions.json and passes it to the flow.""" + missing_json = tmp_path / "missing_versions.json" + missing_json.write_text("[]", encoding="utf-8") + + backfill_missing_versions_wrapper( + working_yaml=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + append=False, + ) + + mock_flow.assert_called_once_with( + missing_json=str(missing_json), + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + + def test_raises_when_missing_json_absent(self, tmp_path): + """Wrapper raises FileNotFoundError if missing_versions.json does not exist.""" + with pytest.raises(FileNotFoundError): + backfill_missing_versions_wrapper( + working_yaml=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + append=False, + ) + + +# --------------------------------------------------------------------------- +# TestBackfillMissingVersionsPlugin +# --------------------------------------------------------------------------- + +class TestBackfillMissingVersionsPlugin: + """plugin() returns a correctly configured Parser.""" + + def test_plugin_returns_parser(self): + result = backfill_module.plugin() + assert isinstance(result, Parser) + assert result.name == "BACKFILL_MISSING_VERSIONS" + assert result.func is backfill_missing_versions_wrapper From 5f79605f85bd24b42db881a4cf39c1e437171ae3 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 24 Apr 2026 09:41:38 -0700 Subject: [PATCH 2/2] Rename and reorganise Phase 1 files per Rich's review feedback --- ...remental.py => parse_assembly_versions.py} | 28 +++--- .../update_assembly_versions.py} | 86 ++++++----------- ...al_update.py => test_assembly_versions.py} | 96 ++++++------------- 3 files changed, 74 insertions(+), 136 deletions(-) rename flows/parsers/{update_historical_incremental.py => parse_assembly_versions.py} (94%) rename flows/{parsers/backfill_missing_versions.py => updaters/update_assembly_versions.py} (69%) rename tests/{test_incremental_update.py => test_assembly_versions.py} (88%) diff --git a/flows/parsers/update_historical_incremental.py b/flows/parsers/parse_assembly_versions.py similarity index 94% rename from flows/parsers/update_historical_incremental.py rename to flows/parsers/parse_assembly_versions.py index 3de0253..01243fd 100644 --- a/flows/parsers/update_historical_incremental.py +++ b/flows/parsers/parse_assembly_versions.py @@ -1,9 +1,11 @@ """Daily incremental updates to historical assembly records. -Identifies assembly versions newly superseded since the last run and appends them to assembly_historical.tsv. No NCBI fetches are required — data is copied directly from the previous assembly_current.tsv parse output. +Identifies assembly versions newly superseded since the last run and appends +them to assembly_historical.tsv. No NCBI fetches are required — data is +copied directly from the previous assembly_current.tsv parse output. Usage: - python -m flows.parsers.update_historical_incremental \\ + python -m flows.parsers.parse_assembly_versions \\ --input_path assembly_data_report.jsonl \\ --previous_tsv assembly_current.tsv.previous \\ --historical_tsv outputs/assembly_historical.tsv @@ -264,11 +266,11 @@ def print_missing_versions_warning(missing: list[dict]) -> None: if len(missing) > 5: print(f" ... 
and {len(missing) - 5} more") print("\n To backfill missing versions, run:") - print(" python -m flows.parsers.backfill_missing_versions") + print(" python -m flows.updaters.update_assembly_versions") @flow(log_prints=True) -def run_incremental_historical_update( +def parse_assembly_versions( new_jsonl: str, previous_tsv: str, historical_tsv: str, @@ -289,7 +291,7 @@ def run_incremental_historical_update( """ separator = "=" * 80 print(f"\n{separator}") - print("INCREMENTAL HISTORICAL UPDATE") + print("ASSEMBLY VERSION PARSE") print(f"{separator}\n") print("[1/3] Loading previous parsed results...") @@ -314,7 +316,7 @@ def run_incremental_historical_update( print(f"\n{separator}") print( - f"INCREMENTAL UPDATE COMPLETE " + f"ASSEMBLY VERSION PARSE COMPLETE " f"Superseded: {len(newly_superseded)} " f"Missing: {len(missing)}" ) @@ -327,7 +329,7 @@ def run_incremental_historical_update( } -def incremental_update_wrapper( +def parse_assembly_versions_wrapper( working_yaml: str, work_dir: str, append: bool, @@ -337,7 +339,7 @@ def incremental_update_wrapper( """Wrapper matching the fetch_parse_validate parser signature. Derives the previous TSV and historical TSV paths from work_dir and - delegates to run_incremental_historical_update. + delegates to parse_assembly_versions. Args: working_yaml (str): Path to the working YAML file (unused; accepted @@ -356,7 +358,7 @@ def incremental_update_wrapper( if len(paths) > 1: raise ValueError(f"More than one jsonl file found in {work_dir}") - results = run_incremental_historical_update( + results = parse_assembly_versions( new_jsonl=paths[0], previous_tsv=os.path.join(work_dir, "assembly_current.tsv.previous"), historical_tsv=os.path.join(work_dir, "assembly_historical.tsv"), @@ -372,8 +374,8 @@ def incremental_update_wrapper( def plugin() -> Parser: """Register the flow.""" return Parser( - name="UPDATE_HISTORICAL_INCREMENTAL", - func=incremental_update_wrapper, + name="PARSE_ASSEMBLY_VERSIONS", + func=parse_assembly_versions_wrapper, description="Daily incremental update of historical assembly records.", ) @@ -383,7 +385,7 @@ def plugin() -> Parser: [required(INPUT_PATH), required(PREVIOUS_TSV), required(HISTORICAL_TSV)], description="Daily incremental update of historical assembly records", ) - results = run_incremental_historical_update( + results = parse_assembly_versions( new_jsonl=args.input_path, previous_tsv=args.previous_tsv, historical_tsv=args.historical_tsv, @@ -398,4 +400,4 @@ def plugin() -> Parser: f" Action needed: {results['missing_versions_count']} missing versions." ) print(f" Written to: {missing_json_path}") - print(" Run: python -m flows.parsers.backfill_missing_versions") + print(" Run: python -m flows.updaters.update_assembly_versions") diff --git a/flows/parsers/backfill_missing_versions.py b/flows/updaters/update_assembly_versions.py similarity index 69% rename from flows/parsers/backfill_missing_versions.py rename to flows/updaters/update_assembly_versions.py index d153097..fb86f9d 100644 --- a/flows/parsers/backfill_missing_versions.py +++ b/flows/updaters/update_assembly_versions.py @@ -1,12 +1,12 @@ -"""Targeted backfill for assembly versions missing from historical records. +"""Fetch assembly versions missing from historical records. -Run this when the incremental updater reports assemblies whose previous +Run this when parse_assembly_versions reports assemblies whose previous version was absent from the previous parsed TSV. 
Fetches only the specified -missing versions from NCBI, parses them, and merges the result into the +missing versions from NCBI FTP, parses them, and merges the result into the existing assembly_historical.tsv. Usage: - python -m flows.parsers.backfill_missing_versions \\ + python -m flows.updaters.update_assembly_versions \\ --missing_json tmp/missing_versions.json \\ --yaml_path configs/assembly_historical.types.yaml \\ --work_dir tmp @@ -19,11 +19,10 @@ from typing import Optional from flows.lib import utils -from flows.lib.conditional_import import flow +from flows.lib.conditional_import import emit_event, flow from flows.lib.shared_args import WORK_DIR, YAML_PATH from flows.lib.shared_args import parse_args as _parse_args from flows.lib.shared_args import required -from flows.lib.utils import Parser from flows.parsers.parse_backfill_historical_versions import ( find_all_assembly_versions, parse_historical_version, @@ -35,7 +34,7 @@ MISSING_JSON = { "flags": ["-m", "--missing_json"], "keys": { - "help": "Path to the missing_versions.json produced by the incremental updater.", + "help": "Path to the missing_versions.json produced by parse_assembly_versions.", "type": str, }, } @@ -44,8 +43,8 @@ def load_missing_versions(missing_json: str) -> list[dict]: """Load the list of missing versions from a JSON file. - The file is written by run_incremental_historical_update when it - encounters assemblies with no matching previous version in the parsed TSV. + The file is written by parse_assembly_versions when it encounters + assemblies with no matching previous version in the parsed TSV. Args: missing_json (str): Path to missing_versions.json. @@ -81,7 +80,7 @@ def load_existing_historical(historical_tsv: str) -> dict[str, dict]: @flow(log_prints=True) -def backfill_missing_versions( +def update_assembly_versions( missing_json: str, yaml_path: str, work_dir: str = ".", @@ -91,11 +90,10 @@ def backfill_missing_versions( For each entry in missing_json, discovers all versions of that assembly via NCBI FTP, fetches metadata for the specific missing version, parses it through the standard GenomeHubs pipeline, and merges the result into the - existing assembly_historical.tsv. + existing assembly_historical.tsv. Emits a completion event on finish. Args: - missing_json (str): Path to missing_versions.json from the incremental - updater. + missing_json (str): Path to missing_versions.json from parse_assembly_versions. yaml_path (str): Path to assembly_historical.types.yaml. work_dir (str): Working directory for caches and output. 
""" @@ -105,6 +103,14 @@ def backfill_missing_versions( missing = load_missing_versions(missing_json) if not missing: print("No missing versions to backfill.") + emit_event( + event="update.assembly_versions.completed", + resource={ + "prefect.resource.id": f"update.assembly_versions.{work_dir}", + "prefect.resource.type": "assembly.versions", + }, + payload={"succeeded": 0, "failed": 0, "status": "no_op"}, + ) return historical_tsv = config.meta["file_name"] @@ -117,7 +123,7 @@ def backfill_missing_versions( separator = "=" * 80 print(f"\n{separator}") - print("MISSING VERSION BACKFILL") + print("ASSEMBLY VERSION UPDATE") print(f"{separator}") print(f" Missing entries to process: {total}") print(f" Merging into: {historical_tsv}") @@ -170,7 +176,7 @@ def backfill_missing_versions( write_to_tsv(parsed, config) print(f"\n{separator}") - print("MISSING VERSION BACKFILL COMPLETE") + print("ASSEMBLY VERSION UPDATE COMPLETE") print(f"{separator}") print(f" Succeeded: {succeeded}/{total}") if failed > 0: @@ -178,54 +184,22 @@ def backfill_missing_versions( print(f" Total records in {historical_tsv}: {len(parsed)}") print(f"{separator}\n") - -def backfill_missing_versions_wrapper( - working_yaml: str, - work_dir: str, - append: bool, - data_freeze_path: Optional[str] = None, - **kwargs, -) -> None: - """Wrapper matching the fetch_parse_validate parser signature. - - Locates missing_versions.json in work_dir and delegates to - backfill_missing_versions. - - Args: - working_yaml (str): Path to the working YAML file. - work_dir (str): Path to the working directory containing - missing_versions.json. - append (bool): Unused; accepted for pipeline compatibility. - data_freeze_path (str, optional): Unused; accepted for pipeline - compatibility. - **kwargs: Additional keyword arguments passed by the pipeline. 
- """ - missing_json_path = os.path.join(work_dir, "missing_versions.json") - if not Path(missing_json_path).exists(): - raise FileNotFoundError(f"No missing_versions.json found in {work_dir}") - - backfill_missing_versions( - missing_json=missing_json_path, - yaml_path=working_yaml, - work_dir=work_dir, - ) - - -def plugin() -> Parser: - """Register the flow.""" - return Parser( - name="BACKFILL_MISSING_VERSIONS", - func=backfill_missing_versions_wrapper, - description="Targeted backfill for assembly versions missing from historical records.", + emit_event( + event="update.assembly_versions.completed", + resource={ + "prefect.resource.id": f"update.assembly_versions.{work_dir}", + "prefect.resource.type": "assembly.versions", + }, + payload={"succeeded": succeeded, "failed": failed, "status": "success"}, ) if __name__ == "__main__": args = _parse_args( [required(MISSING_JSON), required(YAML_PATH), WORK_DIR], - description="Targeted backfill for assembly versions missing from historical records", + description="Fetch assembly versions missing from historical records", ) - backfill_missing_versions( + update_assembly_versions( missing_json=args.missing_json, yaml_path=args.yaml_path, work_dir=args.work_dir, diff --git a/tests/test_incremental_update.py b/tests/test_assembly_versions.py similarity index 88% rename from tests/test_incremental_update.py rename to tests/test_assembly_versions.py index 9d05642..8dbd37e 100644 --- a/tests/test_incremental_update.py +++ b/tests/test_assembly_versions.py @@ -1,13 +1,13 @@ -"""Tests for update_historical_incremental.py and backfill_missing_versions.py +"""Tests for parse_assembly_versions.py and update_assembly_versions.py Covers: - Loading and indexing previous parsed TSV results - Building superseded and missing-version records - Core supersession detection logic (superseded, missing-with-gap, new-series, v1-skip) - Appending to historical TSV with deduplication -- Incremental orchestrator flow behaviour -- Loading existing historical TSV (backfill helper) -- Backfill flow: version selection and TSV merge +- Parser orchestrator flow behaviour +- Loading existing historical TSV (updater helper) +- Updater flow: version selection and TSV merge """ import csv @@ -23,23 +23,20 @@ os.environ["SKIP_PREFECT"] = "true" -from flows.parsers import ( # noqa: E402 - backfill_missing_versions as backfill_module, - update_historical_incremental as incremental_module, -) from flows.lib.utils import Parser # noqa: E402 -from flows.parsers.backfill_missing_versions import ( # noqa: E402 - backfill_missing_versions, - backfill_missing_versions_wrapper, - load_existing_historical, -) -from flows.parsers.update_historical_incremental import ( # noqa: E402 +from flows.parsers import parse_assembly_versions as incremental_module # noqa: E402 +from flows.parsers.parse_assembly_versions import ( # noqa: E402 append_superseded_to_tsv, build_missing_version_record, build_superseded_row, identify_newly_superseded, load_previous_parsed_by_base, - run_incremental_historical_update, + parse_assembly_versions, +) +from flows.updaters import update_assembly_versions as backfill_module # noqa: E402 +from flows.updaters.update_assembly_versions import ( # noqa: E402 + load_existing_historical, + update_assembly_versions, ) @@ -296,12 +293,12 @@ def test_no_op_when_empty_list(self, tmp_path): # --------------------------------------------------------------------------- class TestIncrementalOrchestrator: - """run_incremental_historical_update orchestrator behaviour.""" + 
"""parse_assembly_versions orchestrator behaviour.""" def test_no_previous_tsv_returns_empty_result(self, tmp_path): jsonl = tmp_path / "new.jsonl" write_jsonl(jsonl, [{"accession": "GCA_000222935.2"}]) - result = run_incremental_historical_update( + result = parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(tmp_path / "nope.tsv"), historical_tsv=str(tmp_path / "historical.tsv"), @@ -319,7 +316,7 @@ def test_one_superseded_produces_correct_counts(self, tmp_path): write_jsonl(jsonl, [ {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} ]) - result = run_incremental_historical_update( + result = parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(previous_tsv), historical_tsv=str(tmp_path / "historical.tsv"), @@ -337,7 +334,7 @@ def test_missing_version_detected_in_orchestrator_result(self, tmp_path): write_jsonl(jsonl, [ {"accession": "GCA_000222935.3", "releaseDate": "2024-06-01"} ]) - result = run_incremental_historical_update( + result = parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(previous_tsv), historical_tsv=str(tmp_path / "historical.tsv"), @@ -356,7 +353,7 @@ def test_historical_tsv_written(self, tmp_path): {"accession": "GCA_000222935.2", "releaseDate": "2024-01-15"} ]) historical_tsv = tmp_path / "historical.tsv" - run_incremental_historical_update( + parse_assembly_versions( new_jsonl=str(jsonl), previous_tsv=str(previous_tsv), historical_tsv=str(historical_tsv), @@ -403,7 +400,7 @@ def test_row_data_preserved(self, tmp_path): # --------------------------------------------------------------------------- class TestBackfillMissingVersionsFlow: - """backfill_missing_versions selects the right version and merges into TSV.""" + """update_assembly_versions selects the right version and merges into TSV.""" def _write_missing_json(self, tmp_path, entries): path = tmp_path / "missing.json" @@ -435,7 +432,7 @@ def test_correct_version_selected( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -469,7 +466,7 @@ def test_merges_with_existing_historical( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -498,7 +495,7 @@ def test_no_write_when_version_not_found_in_ftp( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -525,7 +522,7 @@ def test_no_versions_returned_from_ftp( "new_accession": "GCA_000222935.2", } ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -565,7 +562,7 @@ def test_partial_parse_failure_writes_successful_rows( "new_accession": "GCA_000412225.2", }, ]) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -586,7 +583,7 @@ def test_empty_missing_json_no_op( meta={"file_name": str(tmp_path / "historical.tsv")} ) missing_json = self._write_missing_json(tmp_path, []) - backfill_missing_versions( + update_assembly_versions( missing_json=missing_json, yaml_path=str(tmp_path / "config.yaml"), work_dir=str(tmp_path), @@ -596,49 +593,14 @@ def test_empty_missing_json_no_op( # 
--------------------------------------------------------------------------- -# TestBackfillMissingVersionsWrapper -# --------------------------------------------------------------------------- - -class TestBackfillMissingVersionsWrapper: - """backfill_missing_versions_wrapper delegates to the flow correctly.""" - - @patch.object(backfill_module, "backfill_missing_versions") - def test_delegates_to_flow(self, mock_flow, tmp_path): - """Wrapper locates missing_versions.json and passes it to the flow.""" - missing_json = tmp_path / "missing_versions.json" - missing_json.write_text("[]", encoding="utf-8") - - backfill_missing_versions_wrapper( - working_yaml=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - append=False, - ) - - mock_flow.assert_called_once_with( - missing_json=str(missing_json), - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - - def test_raises_when_missing_json_absent(self, tmp_path): - """Wrapper raises FileNotFoundError if missing_versions.json does not exist.""" - with pytest.raises(FileNotFoundError): - backfill_missing_versions_wrapper( - working_yaml=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - append=False, - ) - - -# --------------------------------------------------------------------------- -# TestBackfillMissingVersionsPlugin +# TestParseAssemblyVersionsPlugin # --------------------------------------------------------------------------- -class TestBackfillMissingVersionsPlugin: +class TestParseAssemblyVersionsPlugin: """plugin() returns a correctly configured Parser.""" def test_plugin_returns_parser(self): - result = backfill_module.plugin() + result = incremental_module.plugin() assert isinstance(result, Parser) - assert result.name == "BACKFILL_MISSING_VERSIONS" - assert result.func is backfill_missing_versions_wrapper + assert result.name == "PARSE_ASSEMBLY_VERSIONS" + assert result.func is incremental_module.parse_assembly_versions_wrapper
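
-- 
Note (trailing text; ignored by git am): with both patches applied, the
end-to-end sequence follows the Usage strings in the two modules above.
This is a sketch only; the input and output paths are illustrative, while
the module paths and flags are taken verbatim from the docstrings:

    # Daily parse step (flows/parsers/parse_assembly_versions.py)
    python -m flows.parsers.parse_assembly_versions \
        --input_path assembly_data_report.jsonl \
        --previous_tsv assembly_current.tsv.previous \
        --historical_tsv outputs/assembly_historical.tsv

    # Only if the parse step reports missing versions (it writes
    # missing_versions.json next to the historical TSV)
    python -m flows.updaters.update_assembly_versions \
        --missing_json outputs/missing_versions.json \
        --yaml_path configs/assembly_historical.types.yaml \
        --work_dir tmp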