From 8797ac108e7e72ecc63b45ac372615b755680db9 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Thu, 18 Dec 2025 20:20:48 -0700 Subject: [PATCH 01/14] one-time backfill process to populate historical assembly versions. --- configs/assembly_historical.yaml | 199 ++++++++ flows/lib/utils.py | 10 + flows/parsers/backfill_historical_versions.py | 474 ++++++++++++++++++ flows/parsers/parse_ncbi_assemblies.py | 177 ++++++- tests/README_test_backfill.md | 59 +++ tests/test_backfill.py | 214 ++++++++ tests/test_data/assembly_test_sample.jsonl | 3 + 7 files changed, 1113 insertions(+), 23 deletions(-) create mode 100644 configs/assembly_historical.yaml create mode 100644 flows/parsers/backfill_historical_versions.py create mode 100644 tests/README_test_backfill.md create mode 100644 tests/test_backfill.py create mode 100644 tests/test_data/assembly_test_sample.jsonl diff --git a/configs/assembly_historical.yaml b/configs/assembly_historical.yaml new file mode 100644 index 0000000..5e11a61 --- /dev/null +++ b/configs/assembly_historical.yaml @@ -0,0 +1,199 @@ +# Configuration for historical assembly version backfill +# This config defines the schema for assembly_historical.tsv +# which contains all superseded assembly versions + +attributes: + assembly_level: + header: assemblyLevel + path: assemblyInfo.assemblyLevel + assembly_span: + header: totalSequenceLength + path: assemblyStats.totalSequenceLength + assigned_percent: + header: assignedProportion + path: processedAssemblyStats.assignedProportion + assembly_status: + header: primaryValue + path: processedAssemblyInfo.primaryValue + translate: + "1": primary + assembly_type: + header: assemblyType + path: assemblyInfo.assemblyType + bioproject: + header: bioProjectAccession + path: assemblyInfo.bioprojectLineage.bioprojects.accession + separator: + - "," + biosample: + header: biosampleAccession + path: assemblyInfo.biosample.accession + separator: + - "," + chromosome_count: + header: totalNumberOfChromosomes + path: assemblyStats.totalNumberOfChromosomes + contig_count: + header: numberOfContigs + path: assemblyStats.numberOfContigs + contig_l50: + header: contigL50 + path: assemblyStats.contigL50 + contig_n50: + header: contigN50 + path: assemblyStats.contigN50 + ebp_standard_criteria: + header: ebpStandardCriteria + path: processedAssemblyStats.ebpStandardCriteria + separator: + - "," + ebp_standard_date: + header: ebpStandardDate + path: processedAssemblyStats.ebpStandardDate + gc_percent: + header: gcPercent + path: assemblyStats.gcPercent + gene_count: + header: geneCountTotal + path: annotationInfo.stats.geneCounts.total + gene_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + isolate: + header: isolate + path: assemblyInfo.biosample.attributes.name==isolate.value + last_updated: + header: releaseDate + path: assemblyInfo.releaseDate + mitochondrion_accession: + header: mitochondrionAccession + path: processedOrganelleInfo.mitochondrion.accession + separator: + - ; + mitochondrion_assembly_span: + header: mitochondrionAssemblySpan + path: processedOrganelleInfo.mitochondrion.assemblySpan + mitochondrion_gc_percent: + header: mitochondrionGcPercent + path: processedOrganelleInfo.mitochondrion.gcPercent + mitochondrion_scaffolds: + header: mitochondrionScaffolds + path: processedOrganelleInfo.mitochondrion.scaffolds + separator: + - ; + noncoding_gene_count: + header: geneCountNoncoding + path: annotationInfo.stats.geneCounts.nonCoding + noncoding_gene_count.source.date: + header: annotationReleaseDate + 
path: annotationInfo.releaseDate + plastid_accession: + header: plastidAccession + path: processedOrganelleInfo.plastid.accession + separator: + - ; + plastid_assembly_span: + header: plastidAssemblySpan + path: processedOrganelleInfo.plastid.assemblySpan + plastid_gc_percent: + header: plastidGcPercent + path: processedOrganelleInfo.plastid.gcPercent + plastid_scaffolds: + header: plastidScaffolds + path: processedOrganelleInfo.plastid.scaffolds + separator: + - ; + protein_count: + header: geneCountProteincoding + path: annotationInfo.stats.geneCounts.proteinCoding + protein_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + pseudogene_count: + header: geneCountPseudogene + path: annotationInfo.stats.geneCounts.pseudogene + pseudogene.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + refseq_category: + header: refseqCategory + path: assemblyInfo.refseqCategory + sample_sex: + header: sex + path: assemblyInfo.biosample.attributes.name==sex.value + scaffold_count: + header: numberOfScaffolds + path: assemblyStats.numberOfScaffolds + scaffold_l50: + header: scaffoldL50 + path: assemblyStats.scaffoldL50 + scaffold_n50: + header: scaffoldN50 + path: assemblyStats.scaffoldN50 + sequence_count: + header: numberOfComponentSequences + path: assemblyStats.numberOfComponentSequences + submitter: + header: submitter + path: assemblyInfo.submitter + ungapped_span: + header: totalUngappedLength + path: assemblyStats.totalUngappedLength + # NEW: Version-specific field to indicate this is a historical/superseded version + version_status: + header: versionStatus + path: processedAssemblyInfo.versionStatus + +file: + display_group: general + exclusions: + attributes: + - bioproject + - biosample + identifiers: + - assembly_id + taxonomy: + - taxon_id + format: tsv + header: true + name: ../outputs/assembly_historical.tsv + source: INSDC + source_url_stub: https://www.ncbi.nlm.nih.gov/assembly/ + +identifiers: + assembly_id: + header: assemblyID + path: processedAssemblyInfo.assemblyID + assembly_name: + header: assemblyName + path: assemblyInfo.assemblyName + genbank_accession: + header: genbankAccession + path: processedAssemblyInfo.genbankAccession + refseq_accession: + header: refseqAccession + path: processedAssemblyInfo.refseqAccession + wgs_accession: + header: wgsProjectAccession + path: wgsInfo.wgsProjectAccession + +metadata: + is_primary_value: + header: primaryValue + path: processedAssemblyInfo.primaryValue + source_slug: + header: genbankAccession + path: processedAssemblyInfo.genbankAccession + +names: + common_name: + header: commonName + path: organism.commonName + +taxonomy: + taxon: + header: organismName + path: organism.organismName + taxon_id: + header: taxId + path: organism.taxId diff --git a/flows/lib/utils.py b/flows/lib/utils.py index 2d9252e..7dc72f3 100644 --- a/flows/lib/utils.py +++ b/flows/lib/utils.py @@ -191,6 +191,16 @@ def convert_keys_to_camel_case(data: dict) -> dict: return converted_data +def parse_s3_file(s3_path: str) -> dict: + """ + Parse S3 file path (stub for compatibility). + + This function is imported by parse_ncbi_assemblies but not used in backfill. + Returns empty dict as placeholder. 
+ """ + return {} + + def set_organelle_name(seq: dict) -> Optional[str]: """ Determines the organelle type (mitochondrion or plastid) based on the assigned diff --git a/flows/parsers/backfill_historical_versions.py b/flows/parsers/backfill_historical_versions.py new file mode 100644 index 0000000..c608064 --- /dev/null +++ b/flows/parsers/backfill_historical_versions.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 +""" +One-time historical backfill process for assembly versions. + +This script discovers and parses ALL superseded versions from NCBI for assemblies +that currently have version > 1. It should be run ONCE before starting the daily +incremental pipeline. + +Process: +1. Scan input JSONL for assemblies with version > 1 +2. Discover all versions via NCBI FTP directory listing (with caching) +3. Fetch metadata for each version via datasets command (with caching) +4. Parse using Rich's existing parser (includes sequence reports + metrics) +5. Write to assembly_historical.tsv with version_status="superseded" + +Caching: +- Version discovery cached 7 days (FTP queries) +- Individual metadata cached 30 days (datasets queries) +- On re-run: only fetches NEW assemblies + +Usage: + python flows/parsers/backfill_historical_versions.py \\ + --input flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \\ + --config configs/assembly_historical.yaml \\ + --checkpoint tmp/backfill_checkpoint.json +""" + +import hashlib +import json +import os +import re +import subprocess +import time +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Tuple + +import requests +from genomehubs import utils as gh_utils + +from flows.lib import utils +from flows.lib.conditional_import import task + + +# ============================================================================= +# Cache Management (from DToL prototype - proven to work) +# ============================================================================= + +def setup_cache_directories(): + """Create cache directory structure.""" + cache_dirs = [ + "tmp/backfill_cache/version_discovery", + "tmp/backfill_cache/metadata" + ] + for cache_dir in cache_dirs: + os.makedirs(cache_dir, exist_ok=True) + + +def get_cache_path(cache_type: str, identifier: str) -> str: + """Generate cache file path for given type and identifier.""" + safe_id = hashlib.md5(identifier.encode()).hexdigest()[:16] + return f"tmp/backfill_cache/{cache_type}/{identifier}_{safe_id}.json" + + +def load_from_cache(cache_path: str, max_age_days: int = 30) -> Dict: + """Load data from cache if it exists and is recent enough.""" + try: + if os.path.exists(cache_path): + cache_age = time.time() - os.path.getmtime(cache_path) + if cache_age < (max_age_days * 24 * 3600): + with open(cache_path, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + print(f" Warning: Could not load cache from {cache_path}: {e}") + return {} + + +def save_to_cache(cache_path: str, data: Dict): + """Save data to cache file.""" + try: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f" Warning: Could not save cache to {cache_path}: {e}") + + +# ============================================================================= +# Version Discovery via FTP (KEY DIFFERENCE from Rich's parser) +# ============================================================================= + +def 
find_all_assembly_versions(base_accession: str) -> List[Dict]: + """ + Find all versions of an assembly by examining NCBI FTP structure. + + This is the KEY difference from Rich's parser: + - Rich's parser: Gets latest versions from input JSONL + - This function: Discovers ALL versions (including historical) via FTP + + Args: + base_accession: Full accession (e.g., GCA_000002035.3) + + Returns: + List of dicts with full NCBI metadata for each version + """ + # Extract base (e.g., GCA_000002035 from GCA_000002035.3) + base_match = re.match(r'(GC[AF]_\d+)', base_accession) + if not base_match: + return [] + + base = base_match.group(1) + + # Check version discovery cache first + setup_cache_directories() + version_cache_path = get_cache_path("version_discovery", base) + cached_data = load_from_cache(version_cache_path, max_age_days=7) + + if cached_data and 'versions' in cached_data: + print(f" Using cached version data for {base}") + return cached_data['versions'] + + print(f" Discovering versions for {base} via FTP") + versions = [] + + try: + # Construct FTP URL + # Example: https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/000/002/035/ + ftp_url = f"https://ftp.ncbi.nlm.nih.gov/genomes/all/{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" + + # Get directory listing + response = requests.get(ftp_url, timeout=30) + if response.status_code != 200: + print(f" Warning: FTP query failed for {base}") + return [] + + # Parse HTML for version directories (e.g., GCA_000002035.1, GCA_000002035.2, etc.) + version_pattern = rf"{base}\.\d+" + found_versions = re.findall(version_pattern, response.text) + unique_versions = sorted(list(set(found_versions))) + + # Fetch metadata for each version + for version_acc in unique_versions: + metadata_cache_path = get_cache_path("metadata", version_acc) + cached_metadata = load_from_cache(metadata_cache_path, max_age_days=30) + + if cached_metadata and 'metadata' in cached_metadata: + versions.append(cached_metadata['metadata']) + continue + + # Fetch from NCBI datasets + try: + cmd = ["datasets", "summary", "genome", "accession", version_acc, "--as-json-lines"] + result = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding='utf-8', + errors='ignore', # Handle Unicode gracefully + timeout=60 + ) + + if result.returncode == 0 and result.stdout and result.stdout.strip(): + version_data = json.loads(result.stdout.strip()) + versions.append(version_data) + save_to_cache(metadata_cache_path, {'metadata': version_data, 'cached_at': time.time()}) + else: + print(f" Warning: No metadata for {version_acc}") + + except Exception as e: + print(f" Warning: Error fetching {version_acc}: {e}") + continue + + # Cache the complete version discovery + cache_data = { + 'versions': versions, + 'base_accession': base, + 'discovered_at': time.time(), + 'ftp_url': ftp_url + } + save_to_cache(version_cache_path, cache_data) + + return versions + + except Exception as e: + print(f" Error discovering versions for {base}: {e}") + return [] + + +# ============================================================================= +# Parsing with Rich's Existing Functions +# ============================================================================= + +def parse_historical_version( + version_data: Dict, + config: utils.Config, + base_accession: str, + version_num: int, + current_accession: str +) -> Dict: + """ + Parse historical version using Rich's EXACT parser logic. 
+ + This ensures consistency with current assemblies by: + - Using process_assembly_report() with version_status="superseded" + - Fetching sequence reports via fetch_and_parse_sequence_report() + - Computing all metrics identically to current parser + + Args: + version_data: Raw NCBI metadata from datasets command + config: Config object from YAML + base_accession: Base accession (e.g., GCA_000002035) + version_num: Version number (1, 2, 3, etc.) + current_accession: Latest version that superseded this one + + Returns: + Parsed assembly dict ready for TSV output + """ + from flows.parsers.parse_ncbi_assemblies import ( + fetch_and_parse_sequence_report, + process_assembly_report + ) + + # Convert keys to camelCase (Rich's standard) + version_data = utils.convert_keys_to_camel_case(version_data) + + # Process with Rich's parser (version_status="superseded") + processed_report = process_assembly_report( + report=version_data, + previous_report=None, + config=config, + parsed={}, + version_status="superseded" + ) + + # Fetch sequence reports (chromosomes, organelles, etc.) - Rich's critical step + fetch_and_parse_sequence_report(processed_report) + + # Set assemblyID in standard format (e.g., GCA_000222935_1) + # The versionStatus field already indicates this is "superseded" + processed_report["processedAssemblyInfo"]["assemblyID"] = f"{base_accession}_{version_num}" + + # Parse into TSV row format using Rich's parse functions + row = gh_utils.parse_report_values(config.parse_fns, processed_report) + + return row + + +def parse_version(accession: str) -> int: + """Extract version number from accession.""" + parts = accession.split('.') + return int(parts[1]) if len(parts) > 1 else 1 + + +def parse_accession(accession: str) -> Tuple[str, int]: + """Parse accession into base and version number.""" + parts = accession.split('.') + base = parts[0] + version = int(parts[1]) if len(parts) > 1 else 1 + return base, version + + +# ============================================================================= +# Checkpoint Management +# ============================================================================= + +def load_checkpoint(checkpoint_file: str) -> Dict: + """Load checkpoint data if exists.""" + if Path(checkpoint_file).exists(): + with open(checkpoint_file) as f: + return json.load(f) + return {} + + +def save_checkpoint(checkpoint_file: str, processed_count: int): + """Save checkpoint data.""" + Path(checkpoint_file).parent.mkdir(parents=True, exist_ok=True) + with open(checkpoint_file, 'w') as f: + json.dump({ + 'processed_count': processed_count, + 'timestamp': datetime.now().isoformat() + }, f, indent=2) + + +# ============================================================================= +# Main Backfill Logic +# ============================================================================= + +def identify_assemblies_needing_backfill(input_jsonl: str) -> List[Dict]: + """ + Identify assemblies with version > 1 that need historical backfill. 
+ + Args: + input_jsonl: Path to assembly_data_report.jsonl + + Returns: + List of assembly info dicts needing backfill + """ + assemblies_needing_backfill = [] + + with open(input_jsonl) as f: + for line in f: + assembly = json.loads(line) + accession = assembly['accession'] + base_acc, version = parse_accession(accession) + + if version > 1: + assemblies_needing_backfill.append({ + 'base_accession': base_acc, + 'current_version': version, + 'current_accession': accession, + 'historical_versions_needed': list(range(1, version)) + }) + + return assemblies_needing_backfill + + +@task(log_prints=True) +def backfill_historical_versions( + input_jsonl: str, + config_yaml: str, + checkpoint_file: str = 'tmp/backfill_checkpoint.json' +): + """ + One-time backfill of all historical assembly versions. + + Process: + 1. Identify assemblies with version > 1 + 2. Discover all versions via FTP (cached) + 3. Fetch metadata via datasets (cached) + 4. Parse with Rich's parser + 5. Write to assembly_historical.tsv + + Args: + input_jsonl: Path to assembly_data_report.jsonl + config_yaml: Path to assembly_historical.yaml + checkpoint_file: Path for checkpoint data + """ + # Setup + setup_cache_directories() + config = utils.load_config(config_file=config_yaml) + + # Identify assemblies needing backfill + print("Scanning for assemblies needing historical backfill...") + assemblies_needing_backfill = identify_assemblies_needing_backfill(input_jsonl) + + if not assemblies_needing_backfill: + print("No assemblies with version > 1 found. Nothing to backfill.") + return + + # Load checkpoint + checkpoint = load_checkpoint(checkpoint_file) + start_index = checkpoint.get('processed_count', 0) + + total_assemblies = len(assemblies_needing_backfill) + total_versions = sum(len(a['historical_versions_needed']) for a in assemblies_needing_backfill) + + print(f"\n{'='*80}") + print("ONE-TIME HISTORICAL BACKFILL") + print(f"{'='*80}") + print(f" Assemblies to process: {total_assemblies}") + print(f" Total historical versions: {total_versions}") + + if start_index > 0: + print(f" Resuming from checkpoint: {start_index}/{total_assemblies}") + + print(f"{'='*80}\n") + + parsed = {} + processed = start_index + + for assembly_info in assemblies_needing_backfill[start_index:]: + base_acc = assembly_info['base_accession'] + current_version = assembly_info['current_version'] + current_accession = assembly_info['current_accession'] + + print(f"[{processed+1}/{total_assemblies}] {base_acc} (current: v{current_version})") + + # Discover all versions via FTP (uses cache) + all_versions = find_all_assembly_versions(current_accession) + + if not all_versions: + print(f" Warning: No versions found via FTP") + processed += 1 + continue + + # Parse each historical version (skip current) + for version_data in all_versions: + version_acc = version_data.get('accession', '') + version_num = parse_version(version_acc) + + # Only process historical versions + if version_num >= current_version: + continue + + try: + print(f" Parsing v{version_num}...", end=' ', flush=True) + + # Parse using Rich's parser + row = parse_historical_version( + version_data=version_data, + config=config, + base_accession=base_acc, + version_num=version_num, + current_accession=current_accession + ) + + # Add to parsed dict (keyed by genbank accession) + genbank_acc = row.get('genbankAccession', version_acc) + parsed[genbank_acc] = row + + print("✓") + + except Exception as e: + print(f"✗ ({e})") + continue + + processed += 1 + + # Checkpoint every 100 assemblies + 
if processed % 100 == 0: + print(f"\n→ Checkpoint: Writing batch to disk...") + gh_utils.write_tsv(parsed, config.headers, config.meta) + save_checkpoint(checkpoint_file, processed) + parsed = {} + print(f"→ Progress: {processed}/{total_assemblies} ({processed/total_assemblies*100:.1f}%)\n") + + # Final write + if parsed: + print(f"\n→ Writing final batch...") + gh_utils.write_tsv(parsed, config.headers, config.meta) + + # Final report + print(f"\n{'='*80}") + print("BACKFILL COMPLETE") + print(f"{'='*80}") + print(f" Processed: {processed}/{total_assemblies} assemblies") + print(f" Output: {config.meta['file_name']}") + print(f"\n Next step: Run daily incremental pipeline") + print(f"{'='*80}\n") + + +# ============================================================================= +# Main Entry Point +# ============================================================================= + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser( + description='One-time historical backfill for assembly versions' + ) + parser.add_argument( + '--input', + required=True, + help='Input JSONL file (assembly_data_report.jsonl)' + ) + parser.add_argument( + '--config', + required=True, + help='Config YAML file (assembly_historical.yaml)' + ) + parser.add_argument( + '--checkpoint', + default='tmp/backfill_checkpoint.json', + help='Checkpoint file for resuming' + ) + + args = parser.parse_args() + + backfill_historical_versions( + input_jsonl=args.input, + config_yaml=args.config, + checkpoint_file=args.checkpoint + ) diff --git a/flows/parsers/parse_ncbi_assemblies.py b/flows/parsers/parse_ncbi_assemblies.py index 84cdb0d..c7ab5e1 100644 --- a/flows/parsers/parse_ncbi_assemblies.py +++ b/flows/parsers/parse_ncbi_assemblies.py @@ -1,24 +1,16 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import json import os +import re import subprocess -import sys from collections import defaultdict from glob import glob -from os.path import abspath, dirname from typing import Generator, Optional from genomehubs import utils as gh_utils -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.lib import utils # noqa: E402 from flows.lib.conditional_import import flow, run_count, task # noqa: E402 -from flows.lib.utils import Config, Parser # noqa: E402 +from flows.lib.utils import Config, Parser, parse_s3_file # noqa: E402 from flows.parsers.args import parse_args # noqa: E402 @@ -53,7 +45,9 @@ def fetch_ncbi_datasets_sequences( Yields: dict: The sequence report data as a JSON object, one line at a time. """ - result = subprocess.run( + if not utils.is_safe_path(accession): + raise ValueError(f"Unsafe accession: {accession}") + result = utils.run_quoted( [ "datasets", "summary", @@ -76,6 +70,28 @@ def fetch_ncbi_datasets_sequences( yield json.loads(line) +def is_atypical_assembly(report: dict, parsed: dict) -> bool: + """ + Check if an assembly is atypical. + + Args: + report (dict): A dictionary containing the assembly information. + parsed (dict): A dictionary containing parsed data. + + Returns: + bool: True if the assembly is atypical, False otherwise. 
+ """ + if "assemblyInfo" not in report: + return True + if report["assemblyInfo"].get("atypical", {}).get("isAtypical", False): + # delete from parsed if present + accession = report["accession"] + if accession in parsed: + del parsed[accession] + return True + return False + + def process_assembly_report( report: dict, previous_report: Optional[dict], config: Config, parsed: dict ) -> dict: @@ -98,6 +114,9 @@ def process_assembly_report( Returns: dict: The updated report dictionary. """ + # Uncomment to filter atypical assemblies + # if is_atypical_assembly(report, parsed): + # return None processed_report = {**report, "processedAssemblyInfo": {"organelle": "nucleus"}} if "pairedAccession" in report: if processed_report["pairedAccession"].startswith("GCF_"): @@ -181,7 +200,6 @@ def fetch_and_parse_sequence_report(data: dict): chromosomes.append(seq) except subprocess.TimeoutExpired: print(f"ERROR: Timeout fetching sequence report for {accession}") - print(chromosomes) return utils.add_organelle_entries(data, organelles) utils.check_ebp_criteria(data, span, chromosomes, assigned_span) @@ -216,6 +234,8 @@ def add_report_to_parsed_reports( continue linked_row = parsed[acc] if accession not in linked_row["linkedAssembly"]: + if not isinstance(linked_row["linkedAssembly"], list): + linked_row["linkedAssembly"] = [] linked_row["linkedAssembly"].append(accession) if acc not in row["linkedAssembly"]: row["linkedAssembly"].append(acc) @@ -333,21 +353,108 @@ def process_assembly_reports( None """ for report in parse_assembly_report(jsonl_path=jsonl_path): - processed_report = process_assembly_report( - report, previous_report, config, parsed + try: + print(f"Processing report for {report.get('accession', 'unknown')}") + processed_report = process_assembly_report( + report, previous_report, config, parsed + ) + if processed_report is None or use_previous_report( + processed_report, parsed, config + ): + continue + fetch_and_parse_sequence_report(processed_report) + append_features(processed_report, config) + add_report_to_parsed_reports(parsed, processed_report, config, biosamples) + if previous_report is not None: + previous_report = processed_report + except Exception as e: + print( + ( + f"Error processing report for " + f"{report.get('accession', 'unknown')}: " + f"{e} (line {e.__traceback__.tb_lineno})" + ) + ) + continue + + +@task(log_prints=True) +def parse_data_freeze_file(data_freeze_path: str) -> dict: + """ + Fetch and parse a 2-column TSV with the data freeze list of assemblies and their + respective status from the given S3 path. + + Args: + data_freeze_path (str): The S3 path to the data freeze list TSV file. + Returns: + dict: A dictionary mapping assembly accessions to their freeze subsets. + + """ + # from s3 to temporary file + print(f"Fetching data freeze file from {data_freeze_path}") + data_freeze = parse_s3_file(data_freeze_path) + print(f"Parsed {len(data_freeze)} entries from data freeze file") + return data_freeze + + +@task() +def set_data_freeze_default(parsed: dict, data_freeze_name: str): + """ + Set the default data freeze information for all assemblies. + + Args: + parsed (dict): A dictionary containing parsed data. + data_freeze_name (str): The name of the default data freeze. 
+ """ + for line in parsed.values(): + line["dataFreeze"] = [data_freeze_name] + line["assemblyID"] = line["genbankAccession"] + + +@task(log_prints=True) +def process_datafreeze_info(processed_report: dict, data_freeze: dict, config: Config): + """ + Process the data freeze information for a given assembly report. + Rename the assembly + + Args: + processed_report (dict): A dictionary containing processed assembly data. + data_freeze (dict): A dictionary containing data freeze information. + """ + data_freeze_name = ( + re.sub(r"\.tsv(\.gz)?$", "", os.path.basename(config.meta["file_name"])) + if config.meta["file_name"] + else "data_freeze" + ) + print(f"Processing data freeze info for {data_freeze_name}") + for line in processed_report.values(): + print( + f"Processing data freeze info for {line['refseqAccession']} - " + f"{line['genbankAccession']}" + ) + status = data_freeze.get(line["refseqAccession"]) or data_freeze.get( + line["genbankAccession"] ) - if use_previous_report(processed_report, parsed, config): + if not status: continue - fetch_and_parse_sequence_report(processed_report) - append_features(processed_report, config) - add_report_to_parsed_reports(parsed, processed_report, config, biosamples) - if previous_report is not None: - previous_report = processed_report + line["dataFreeze"] = status + + accession_name = ( + line["refseqAccession"] + if line["refseqAccession"] in data_freeze + else line["genbankAccession"] + ) + line["assemblyID"] = f"{accession_name}_{data_freeze_name}" @flow(log_prints=True) def parse_ncbi_assemblies( - input_path: str, yaml_path: str, append: bool, feature_file: Optional[str] = None + input_path: str, + yaml_path: str, + append: bool, + feature_file: Optional[str] = None, + data_freeze_path: Optional[str] = None, + **kwargs, ): """ Parse NCBI datasets assembly data. @@ -357,6 +464,8 @@ def parse_ncbi_assemblies( yaml_path (str): Path to the YAML configuration file. append (bool): Flag to append values to an existing TSV file(s). feature_file (str): Path to the feature file. + data_freeze_path (str): Path to data freeze list TSV on S3. + **kwargs: Additional keyword arguments. """ config = utils.load_config( config_file=yaml_path, @@ -365,22 +474,39 @@ def parse_ncbi_assemblies( ) if feature_file is not None: set_up_feature_file(config) + biosamples = {} parsed = {} previous_report = {} if append else None process_assembly_reports(input_path, config, biosamples, parsed, previous_report) set_representative_assemblies(parsed, biosamples) + + if data_freeze_path is None: + set_data_freeze_default(parsed, data_freeze_name="latest") + else: + data_freeze = parse_data_freeze_file( + data_freeze_path + ) # This returns the data freeze dictionary + process_datafreeze_info(parsed, data_freeze, config) write_to_tsv(parsed, config) def parse_ncbi_assemblies_wrapper( - working_yaml: str, work_dir: str, append: bool + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, ) -> None: """ Wrapper function to parse the NCBI assemblies JSONL file. Args: working_yaml (str): Path to the working YAML file. + work_dir (str): Path to the working directory. + append (bool): Whether to append to the existing TSV file. + data_freeze_path (str, optional): Path to a data freeze list TSV on S3. + **kwargs: Additional keyword arguments. 
""" # use glob to find the jsonl file in the working directory glob_path = os.path.join(work_dir, "*.jsonl") @@ -391,7 +517,12 @@ def parse_ncbi_assemblies_wrapper( # rais error if more than one jsonl file is found if len(paths) > 1: raise ValueError(f"More than one jsonl file found in {work_dir}") - parse_ncbi_assemblies(input_path=paths[0], yaml_path=working_yaml, append=append) + parse_ncbi_assemblies( + input_path=paths[0], + yaml_path=working_yaml, + append=append, + data_freeze_path=data_freeze_path, + ) def plugin(): diff --git a/tests/README_test_backfill.md b/tests/README_test_backfill.md new file mode 100644 index 0000000..5986e13 --- /dev/null +++ b/tests/README_test_backfill.md @@ -0,0 +1,59 @@ +# Testing Guide for Historical Assembly Backfill + +This guide explains how to test the backfill implementation before creating a PR. + +## Prerequisites + +### 1. Install Dependencies + +```bash +# Install genomehubs and other dependencies +pip install -r requirements.txt + +# Or if using conda: +conda install -c conda-forge genomehubs>=2.10.14 +pip install prefect requests +``` + +### 2. Install NCBI datasets CLI + +The backfill script requires the NCBI `datasets` command-line tool. + +```bash +# Download and install from: +# https://www.ncbi.nlm.nih.gov/datasets/docs/v2/download-and-install/ + +# Verify installation: +datasets --version +``` + +## Test Suite + +### Quick Unit Tests + +Run the automated test suite: + +```bash +cd "c:\Users\fchen13\ASU Dropbox\Fang Chen\Work Documents\EBP\genomehubs-data" +python tests/test_backfill.py +``` + +### Full Backfill Test (Small Dataset) + +Test the complete backfill process on 3 assemblies: + +```bash +export SKIP_PREFECT=true +python flows/parsers/backfill_historical_versions.py + --input tests/test_data/assembly_test_sample.jsonl + --config configs/assembly_historical.yaml + --checkpoint tests/test_data/test_checkpoint.json + +``` + +```powershell +$env:SKIP_PREFECT="true" +$env:PYTHONPATH="." +python flows/parsers/backfill_historical_versions.py --input tests/test_data/assembly_test_sample.jsonl --config configs/assembly_historical.yaml --checkpoint tests/test_data/test_checkpoint.json +``` + diff --git a/tests/test_backfill.py b/tests/test_backfill.py new file mode 100644 index 0000000..d9d6c6c --- /dev/null +++ b/tests/test_backfill.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +""" +Test script for backfill_historical_versions.py + +This script tests the historical version backfill process on a small sample dataset. 
+ +Usage: + python tests/test_backfill.py +""" + +import os +import sys +import json +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from flows.parsers.backfill_historical_versions import ( + identify_assemblies_needing_backfill, + parse_accession, + find_all_assembly_versions +) + +def test_parse_accession(): + """Test accession parsing.""" + print("\n" + "="*80) + print("TEST 1: Accession Parsing") + print("="*80) + + test_cases = [ + ("GCA_000222935.2", ("GCA_000222935", 2)), + ("GCA_003706615.3", ("GCA_003706615", 3)), + ("GCF_000001405.39", ("GCF_000001405", 39)), + ] + + for accession, expected in test_cases: + result = parse_accession(accession) + status = "PASS" if result == expected else "FAIL" + print(f" {status} {accession} -> {result}") + assert result == expected, f"Expected {expected}, got {result}" + + print(" All accession parsing tests passed!") + +def test_identify_assemblies(): + """Test identification of assemblies needing backfill.""" + print("\n" + "="*80) + print("TEST 2: Identify Assemblies Needing Backfill") + print("="*80) + + test_file = "tests/test_data/assembly_test_sample.jsonl" + + if not os.path.exists(test_file): + print(f" FAIL Test file not found: {test_file}") + return False + + assemblies = identify_assemblies_needing_backfill(test_file) + + print(f" Found {len(assemblies)} assemblies needing backfill:") + for asm in assemblies: + print(f" - {asm['current_accession']}: v{asm['current_version']} " + f"(needs v{asm['historical_versions_needed']})") + + # Verify we found the expected assemblies + expected_count = 3 # GCA_000222935.2, GCA_000412225.2, GCA_003706615.3 + assert len(assemblies) == expected_count, \ + f"Expected {expected_count} assemblies, found {len(assemblies)}" + + print(f" PASS Correctly identified {len(assemblies)} assemblies") + return True + +def test_version_discovery(): + """Test FTP-based version discovery (using cache if available).""" + print("\n" + "="*80) + print("TEST 3: Version Discovery via FTP") + print("="*80) + print(" Note: This test queries NCBI FTP - may take a minute...") + + # Test with a known multi-version assembly + test_accession = "GCA_000222935.2" # Aciculosporium take - has version 1 and 2 + + print(f" Testing: {test_accession}") + versions = find_all_assembly_versions(test_accession) + + if not versions: + print(f" FAIL No versions found (FTP query may have failed)") + print(f" This is not critical - may be network issue") + return False + + print(f" PASS Found {len(versions)} version(s):") + for v in versions: + acc = v.get('accession', 'unknown') + print(f" - {acc}") + + # Verify we found at least version 1 and 2 + accessions = [v.get('accession', '') for v in versions] + base = test_accession.split('.')[0] + + # Should find both v1 and v2 + expected_versions = [f"{base}.1", f"{base}.2"] + found_expected = [v for v in expected_versions if v in accessions] + + print(f" PASS Found {len(found_expected)}/{len(expected_versions)} expected versions") + return True + +def test_cache_functionality(): + """Test that caching works correctly.""" + print("\n" + "="*80) + print("TEST 4: Cache Functionality") + print("="*80) + + # Clean cache first (Windows-safe deletion) + import shutil + import time as time_module + cache_dir = "tmp/backfill_cache" + if os.path.exists(cache_dir): + try: + # Give time for file handles to close + time_module.sleep(0.5) + shutil.rmtree(cache_dir) + print(f" Cleared cache directory: {cache_dir}") + except PermissionError: 
+ print(f" Note: Cache directory in use, will test with existing cache") + + test_accession = "GCA_000222935.2" + + # First call - should fetch from FTP + print(f" First call (should fetch from FTP)...") + import time + start = time.time() + versions1 = find_all_assembly_versions(test_accession) + time1 = time.time() - start + + if not versions1: + print(f" FAIL FTP fetch failed - skipping cache test") + return False + + print(f" Took {time1:.2f}s, found {len(versions1)} versions") + + # Second call - should use cache + print(f" Second call (should use cache)...") + start = time.time() + versions2 = find_all_assembly_versions(test_accession) + time2 = time.time() - start + + print(f" Took {time2:.2f}s, found {len(versions2)} versions") + + # Cache should be much faster + if time2 < time1 * 0.5: # At least 50% faster + print(f" PASS Cache is working (2nd call {time2/time1*100:.1f}% of 1st call time)") + else: + print(f" WARN Cache may not be working (times similar)") + + # Verify cache files exist + if os.path.exists(cache_dir): + cache_files = list(Path(cache_dir).rglob("*.json")) + print(f" PASS Cache directory created with {len(cache_files)} files") + else: + print(f" FAIL Cache directory not created") + + return True + +def main(): + """Run all tests.""" + print("\n" + "="*80) + print("BACKFILL SCRIPT TEST SUITE") + print("="*80) + print("Testing: flows/parsers/backfill_historical_versions.py") + print("="*80) + + results = { + "Accession Parsing": False, + "Identify Assemblies": False, + "Version Discovery": False, + "Cache Functionality": False, + } + + try: + # Run tests + test_parse_accession() + results["Accession Parsing"] = True + + results["Identify Assemblies"] = test_identify_assemblies() + results["Version Discovery"] = test_version_discovery() + results["Cache Functionality"] = test_cache_functionality() + + except Exception as e: + print(f"\nFAIL Test failed with error: {e}") + import traceback + traceback.print_exc() + + # Summary + print("\n" + "="*80) + print("TEST SUMMARY") + print("="*80) + + for test_name, passed in results.items(): + status = "PASS PASS" if passed else "FAIL FAIL" + print(f" {status}: {test_name}") + + passed_count = sum(results.values()) + total_count = len(results) + + print(f"\n Total: {passed_count}/{total_count} tests passed") + + if passed_count == total_count: + print("\n 🎉 All tests passed!") + return 0 + else: + print("\n WARN Some tests failed - review output above") + return 1 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/tests/test_data/assembly_test_sample.jsonl b/tests/test_data/assembly_test_sample.jsonl new file mode 100644 index 0000000..fc540d2 --- /dev/null +++ b/tests/test_data/assembly_test_sample.jsonl @@ -0,0 +1,3 @@ +{"assemblyInfo":{"assemblyLevel":"Contig","assemblyName":"AciTa_1.0","assemblyType":"haploid","submitter":"University of Kentucky, Dept of Plant Pathology","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA67241","title":"Aciculosporium take MAFF-241224 genome sequencing project"}]}],"sequencingTech":"454 GS FLX Titanium","blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_000222935.2","biosample":{"accession":"SAMN02981340","lastUpdated":"2014-08-11T11:24:02.670","publicationDate":"2014-08-11T11:24:02.670","submissionDate":"2014-08-11T11:24:02.670","sampleIds":[{"db":"GenBank","value":"gb|AFQZ00000000.1"}],"description":{"title":"Sample from Aciculosporium take 
MAFF-241224","organism":{"taxId":1036760,"organismName":"Aciculosporium take MAFF-241224"}},"owner":{"name":"University of Kentucky, Advanced Genetic Technologies Center"},"models":["Generic"],"bioprojects":[{"accession":"PRJNA67241"}],"package":"Generic.1.0","attributes":[{"name":"strain","value":"MAFF-241224"}],"status":{"status":"live","when":"2014-08-11T11:24:02.670"},"strain":"MAFF-241224"},"assemblyStatus":"current","bioprojectAccession":"PRJNA67241","assemblyMethod":"Newbler v. 2.5.3","releaseDate":"2011-08-03"},"assemblyStats":{"totalSequenceLength":"58836405","totalUngappedLength":"58836405","numberOfContigs":3298,"contigN50":40517,"contigL50":411,"numberOfScaffolds":3298,"scaffoldN50":40517,"scaffoldL50":411,"numberOfComponentSequences":3298,"gcCount":"23621104","gcPercent":40,"genomeCoverage":"18.5","atgcCount":"58836294"},"wgsInfo":{"wgsProjectAccession":"AFQZ01","masterWgsUrl":"https://www.ncbi.nlm.nih.gov/nuccore/AFQZ00000000.1","wgsContigsUrl":"https://www.ncbi.nlm.nih.gov/Traces/wgs/AFQZ01"},"currentAccession":"GCA_000222935.2","accession":"GCA_000222935.2","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":1036760,"organismName":"Aciculosporium take MAFF-241224","infraspecificNames":{"strain":"MAFF-241224"}}} +{"assemblyInfo":{"assemblyLevel":"Complete Genome","assemblyName":"ASM41222v2","assemblyType":"haploid","submitter":"Duke University, Molecular Genetics and Microbiology","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA39551","title":"[Ashbya] aceris (nom. inval.) Genome sequencing"}]}],"blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_000412225.2","biosample":{"accession":"SAMN03081470","lastUpdated":"2021-07-14T21:04:31.245","publicationDate":"2014-09-26T00:00:00.000","submissionDate":"2014-09-26T10:52:27.116","sampleIds":[{"db":"GenBank","value":"gb|CP006020.1"}],"description":{"title":"Sample from [Ashbya] aceris (nom. inval.)","organism":{"taxId":566037,"organismName":"[Ashbya] aceris (nom. inval.)"}},"owner":{"name":"Duke University, Molecular Genetics and Microbiology and Institute for Genome Sciences & Policy"},"models":["Generic"],"bioprojects":[{"accession":"PRJNA39551"}],"package":"Generic.1.0","attributes":[{"name":"host","value":"Boisea trivittata"}],"status":{"status":"live","when":"2014-10-01T12:30:14"},"host":"Boisea trivittata"},"assemblyStatus":"current","bioprojectAccession":"PRJNA39551","releaseDate":"2014-06-05"},"assemblyStats":{"totalNumberOfChromosomes":7,"totalSequenceLength":"8867527","totalUngappedLength":"8867527","numberOfContigs":7,"contigN50":1493473,"contigL50":3,"numberOfScaffolds":7,"scaffoldN50":1493473,"scaffoldL50":3,"numberOfComponentSequences":7,"gcCount":"4548443","gcPercent":51.5,"numberOfOrganelles":1,"atgcCount":"8867527"},"organelleInfo":[{"description":"Mitochondrion","totalSeqLength":"26996","submitter":"Duke University, Molecular Genetics and Microbiology"}],"annotationInfo":{"name":"Annotation submitted by Duke University, Molecular Genetics and Microbiology","provider":"Duke University, Molecular Genetics and Microbiology","releaseDate":"2014-10-01","stats":{"geneCounts":{"total":4690,"proteinCoding":4487,"nonCoding":203}}},"currentAccession":"GCA_000412225.2","accession":"GCA_000412225.2","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":566037,"organismName":"[Ashbya] aceris (nom. 
inval.)"}} +{"assemblyInfo":{"assemblyLevel":"Scaffold","assemblyName":"ASM370661v3","assemblyType":"haploid","submitter":"UW-Madison","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA429441","title":"Sequencing of 196 yeast species from the subphylum Saccharomycotina Genome sequencing and assembly"}]}],"sequencingTech":"Illumina HiSeq","blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_003706615.3","biosample":{"accession":"SAMN08343339","lastUpdated":"2019-06-20T18:14:41.872","publicationDate":"2018-11-02T00:00:00.000","submissionDate":"2018-01-10T12:39:16.190","sampleIds":[{"label":"Sample name","value":"Candida_montana"},{"db":"SRA","value":"SRS2837972"}],"description":{"title":"Microbe sample from [Candida] montana","organism":{"taxId":49329,"organismName":"[Candida] montana"}},"owner":{"name":"UW-Madison","contacts":[{}]},"models":["Microbe, viral or environmental"],"package":"Microbe.1.0","attributes":[{"name":"strain","value":"NRRL Y-17326"},{"name":"isolation_source","value":"missing"},{"name":"collection_date","value":"2016"},{"name":"geo_loc_name","value":"USA: Madison, Wisconsin"},{"name":"sample_type","value":"cell culture"},{"name":"type-material","value":"type material of Candida montana"},{"name":"culture_collection","value":"NRRL:Y-17326"}],"status":{"status":"live","when":"2018-11-02T12:37:45.337"},"collectionDate":"2016","geoLocName":"USA: Madison, Wisconsin","isolationSource":"missing","strain":"NRRL Y-17326"},"assemblyStatus":"current","bioprojectAccession":"PRJNA429441","assemblyMethod":"SPADES v. 3.7","releaseDate":"2023-07-14"},"assemblyStats":{"totalSequenceLength":"12493498","totalUngappedLength":"12493434","numberOfContigs":61,"contigN50":704842,"contigL50":7,"numberOfScaffolds":60,"scaffoldN50":704842,"scaffoldL50":7,"numberOfComponentSequences":60,"gcCount":"4518305","gcPercent":36,"genomeCoverage":"63.8148","atgcCount":"12493434"},"wgsInfo":{"wgsProjectAccession":"PPLU03","masterWgsUrl":"https://www.ncbi.nlm.nih.gov/nuccore/PPLU00000000.3","wgsContigsUrl":"https://www.ncbi.nlm.nih.gov/Traces/wgs/PPLU03"},"currentAccession":"GCA_003706615.3","typeMaterial":{"typeLabel":"TYPE_MATERIAL","typeDisplayText":"assembly from type material"},"accession":"GCA_003706615.3","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":49329,"organismName":"[Candida] montana","infraspecificNames":{"strain":"NRRL Y-17326"}}} From b0c2c60e6f1b370d5223c1f30567059cd6e3b3d6 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Thu, 18 Dec 2025 20:28:15 -0700 Subject: [PATCH 02/14] Add version status column to the existing parser --- flows/parsers/parse_ncbi_assemblies.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/flows/parsers/parse_ncbi_assemblies.py b/flows/parsers/parse_ncbi_assemblies.py index c7ab5e1..d419b7d 100644 --- a/flows/parsers/parse_ncbi_assemblies.py +++ b/flows/parsers/parse_ncbi_assemblies.py @@ -45,9 +45,7 @@ def fetch_ncbi_datasets_sequences( Yields: dict: The sequence report data as a JSON object, one line at a time. 
""" - if not utils.is_safe_path(accession): - raise ValueError(f"Unsafe accession: {accession}") - result = utils.run_quoted( + result = subprocess.run( [ "datasets", "summary", @@ -93,7 +91,11 @@ def is_atypical_assembly(report: dict, parsed: dict) -> bool: def process_assembly_report( - report: dict, previous_report: Optional[dict], config: Config, parsed: dict + report: dict, + previous_report: Optional[dict], + config: Config, + parsed: dict, + version_status: str = "current" ) -> dict: """Process assembly level information. @@ -110,6 +112,9 @@ def process_assembly_report( previous one. config (Config): A Config object containing the configuration data. parsed (dict): A dictionary containing parsed data. + version_status (str): Version status - "current" (default) or "superseded" + for historical versions. Defaults to "current" to maintain backward + compatibility with existing code. Returns: dict: The updated report dictionary. @@ -117,7 +122,10 @@ def process_assembly_report( # Uncomment to filter atypical assemblies # if is_atypical_assembly(report, parsed): # return None - processed_report = {**report, "processedAssemblyInfo": {"organelle": "nucleus"}} + processed_report = {**report, "processedAssemblyInfo": { + "organelle": "nucleus", + "versionStatus": version_status + }} if "pairedAccession" in report: if processed_report["pairedAccession"].startswith("GCF_"): processed_report["processedAssemblyInfo"]["refseqAccession"] = report[ From 645b7eaffe94761f2c1ccb851303149abe48e6f6 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Thu, 18 Dec 2025 20:45:31 -0700 Subject: [PATCH 03/14] Phase 0 summary --- Phase_0_PR_SUMMARY.md | 141 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 Phase_0_PR_SUMMARY.md diff --git a/Phase_0_PR_SUMMARY.md b/Phase_0_PR_SUMMARY.md new file mode 100644 index 0000000..dfdd1dd --- /dev/null +++ b/Phase_0_PR_SUMMARY.md @@ -0,0 +1,141 @@ +# Pull Request: Historical Assembly Version Backfill (Phase 0) + +## Summary + +This PR implements a one-time backfill process to populate historical assembly versions for all existing assemblies in the genomehubs dataset. It enables version-aware milestone tracking by capturing superseded assembly versions that were previously not tracked. + +## Solution Overview + +Implements a **one-time backfill script** that: +1. Identifies assemblies with version > 1 (indicating historical versions exist) +2. Discovers all historical versions via NCBI FTP +3. Fetches metadata for each historical version using NCBI Datasets CLI +4. Outputs historical versions to a separate TSV file with `versionStatus = "superseded"` +5. Uses smart caching to avoid re-fetching data on reruns + +## Key Features + +### 1. FTP-Based Version Discovery +- Queries NCBI FTP to find all versions of each assembly +- More reliable than API for historical data +- Based on proven DToL prototype implementation + +### 2. Reuses existing Parser +- Calls `parse_ncbi_assemblies.process_assembly_report()` with `version_status="superseded"` +- Fetches sequence reports and computes metrics identically to current assemblies +- Uses same YAML config system +- Generates identical TSV schema (plus `versionStatus` field) + +### 3. Smart 2-Tier Caching +- **Version discovery cache** (7-day expiry): Stores which versions exist +- **Metadata cache** (30-day expiry): Stores assembly metadata from Datasets CLI +- Dramatically reduces runtime on reruns (from hours to minutes) + +### 4. 
Checkpoint System +- Saves progress every 100 assemblies +- Allows resuming after interruptions +- Critical for processing ~3,694 assemblies + +## Files Modified/Created + +### New Files +- `flows/parsers/backfill_historical_versions.py` - Main backfill script +- `configs/assembly_historical.yaml` - Output schema configuration +- `tests/test_backfill.py` - Automated test suite +- `tests/test_data/assembly_test_sample.jsonl` - Test dataset (3 assemblies) +- `tests/README_test_backfill.md` - Testing documentation + +### Modified Files +- `flows/parsers/parse_ncbi_assemblies.py` - Added `version_status` parameter +- `flows/lib/utils.py` - Added `parse_s3_file` stub function + +## Schema Changes + +### New Field in Output +- Same schema as current assemblies, plus `versionStatus` column +- `versionStatus` - Indicates if assembly is "current" or "superseded" + +### Expected Results +- **Input**: ~3,694 assemblies with version > 1 +- **Output**: ~8,500 historical version records +- **Runtime**: ~10-15 hours (first run), ~15-30 minutes (with cache) + +## Testing + +### Automated Tests (4/4 passing) +```bash +python tests/test_backfill.py +``` + +Tests verify: +1. ✅ Accession parsing +2. ✅ Assembly identification +3. ✅ FTP version discovery +4. ✅ Cache functionality + +### Manual Test (3 assemblies) +```bash +python flows/parsers/backfill_historical_versions.py \ + --input tests/test_data/assembly_test_sample.jsonl \ + --config configs/assembly_historical.yaml \ + --checkpoint tests/test_data/test_checkpoint.json +``` + +## Usage + +### One-Time Backfill +```bash +python flows/parsers/backfill_historical_versions.py \ + --input flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \ + --config configs/assembly_historical.yaml \ + --checkpoint backfill_checkpoint.json +``` +On first run: +- Takes ~10-15 hours (fetches all historical versions) +- Creates tmp/backfill_cache/ directory +- Checkpoints every 100 assemblies +- Safe to Ctrl+C and resume + +On subsequent runs (after input update): +- Takes ~15-30 minutes (uses cache for existing) +- Only fetches NEW assemblies +- Cache expires: version discovery (7 days), metadata (30 days) + +Output: +- outputs/assembly_historical.tsv (all superseded versions) +- Each row has version_status="superseded" +- Includes all sequence reports and metrics + +## Important Notes + +### 1. Stub Function Warning +The `parse_s3_file()` function in `flows/lib/utils.py` is currently a stub: + +```python +def parse_s3_file(s3_path: str) -> dict: + return {} # Placeholder +``` + +**Action needed**: If Rich has a real implementation, replace this stub. The function is imported by `parse_ncbi_assemblies.py` but not used by the backfill script. + +### 2. One-Time Process +This backfill is designed to run **once** to populate historical data. Future updates should be handled by the incremental daily pipeline. + +### 3. Cache Directory +The cache directory `tmp/backfill_cache/` can be safely deleted after successful completion. It contains: +- Version discovery data (FTP queries) +- Assembly metadata (Datasets CLI responses) + +## Next Steps (After PR Merge) + +1. **Run full backfill** on production dataset (~10-15 hours) +2. **Upload output** to appropriate S3 location +3. **Implement Phase 1**: Incremental daily updates for new historical versions +4. **Implement Phase 2**: Version-aware milestone tracking + +## Questions for Reviewer + +1. Is the `parse_s3_file()` stub acceptable, or do you have a real implementation to use? +2. 
Should historical versions go to a different S3 path than current assemblies? +3. Any preferences for checkpoint file location/naming? + From 8d1666bad292bc147e1b9b81f234a024504456d2 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 19 Dec 2025 10:23:25 -0700 Subject: [PATCH 04/14] Add strict accession validation as suggested by reviewer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use pattern GC[AF]_\d{9}\.\d+ to validate version_acc before subprocess call. This addresses the security concern raised by sourcery-ai and confirmed by @rjchallis. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- flows/parsers/backfill_historical_versions.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flows/parsers/backfill_historical_versions.py b/flows/parsers/backfill_historical_versions.py index c608064..308e066 100644 --- a/flows/parsers/backfill_historical_versions.py +++ b/flows/parsers/backfill_historical_versions.py @@ -149,6 +149,13 @@ def find_all_assembly_versions(base_accession: str) -> List[Dict]: # Fetch from NCBI datasets try: + # Validate accession format to prevent command injection + # Pattern: GC[AF]_9digits.version (e.g., GCA_000001405.39) + version_pattern_strict = r"^GC[AF]_\d{9}\.\d+$" + if not re.match(version_pattern_strict, version_acc): + print(f" Skipping unexpected accession format: {version_acc}") + continue + cmd = ["datasets", "summary", "genome", "accession", version_acc, "--as-json-lines"] result = subprocess.run( cmd, From 899074aff1a1e2566af8194ed312439a13dcb005 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 19 Dec 2025 10:44:46 -0700 Subject: [PATCH 05/14] Remove parse_s3_file stub, use existing implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As suggested by @rjchallis, removed the stub function since the real implementation is already present in the same file at line 769. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- flows/lib/utils.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/flows/lib/utils.py b/flows/lib/utils.py index 1162364..ed8f8bd 100644 --- a/flows/lib/utils.py +++ b/flows/lib/utils.py @@ -203,16 +203,6 @@ def convert_keys_to_camel_case(data: dict) -> dict: return converted_data -def parse_s3_file(s3_path: str) -> dict: - """ - Parse S3 file path (stub for compatibility). - - This function is imported by parse_ncbi_assemblies but not used in backfill. - Returns empty dict as placeholder. 
- """ - return {} - - def set_organelle_name(seq: dict) -> Optional[str]: """ Determines the organelle type (mitochondrion or plastid) based on the assigned From 792f34ec2e43682b82e9879994f937233ba820c1 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 19 Dec 2025 10:53:31 -0700 Subject: [PATCH 06/14] Update documentation to run parser as module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As suggested by @rjchallis, updated all documentation to show running the parser as a module rather than as a script: - Changed: python flows/parsers/backfill_historical_versions.py - To: python -m flows.parsers.backfill_historical_versions This ensures: - Relative imports work correctly - Prefect can call it without issues - Follows Python best practices Updated files: - flows/parsers/backfill_historical_versions.py (docstring) - tests/README_test_backfill.md (usage examples) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- flows/parsers/backfill_historical_versions.py | 2 +- tests/README_test_backfill.md | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/flows/parsers/backfill_historical_versions.py b/flows/parsers/backfill_historical_versions.py index 308e066..37f47cc 100644 --- a/flows/parsers/backfill_historical_versions.py +++ b/flows/parsers/backfill_historical_versions.py @@ -19,7 +19,7 @@ - On re-run: only fetches NEW assemblies Usage: - python flows/parsers/backfill_historical_versions.py \\ + python -m flows.parsers.backfill_historical_versions \\ --input flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \\ --config configs/assembly_historical.yaml \\ --checkpoint tmp/backfill_checkpoint.json diff --git a/tests/README_test_backfill.md b/tests/README_test_backfill.md index 5986e13..02e3486 100644 --- a/tests/README_test_backfill.md +++ b/tests/README_test_backfill.md @@ -44,16 +44,14 @@ Test the complete backfill process on 3 assemblies: ```bash export SKIP_PREFECT=true -python flows/parsers/backfill_historical_versions.py - --input tests/test_data/assembly_test_sample.jsonl - --config configs/assembly_historical.yaml +python -m flows.parsers.backfill_historical_versions \ + --input tests/test_data/assembly_test_sample.jsonl \ + --config configs/assembly_historical.yaml \ --checkpoint tests/test_data/test_checkpoint.json - ``` ```powershell $env:SKIP_PREFECT="true" -$env:PYTHONPATH="." -python flows/parsers/backfill_historical_versions.py --input tests/test_data/assembly_test_sample.jsonl --config configs/assembly_historical.yaml --checkpoint tests/test_data/test_checkpoint.json +python -m flows.parsers.backfill_historical_versions --input tests/test_data/assembly_test_sample.jsonl --config configs/assembly_historical.yaml --checkpoint tests/test_data/test_checkpoint.json ``` From 91ab12b323d58b2bce3c2f88bd3ff6675b7aeb08 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 19 Dec 2025 11:28:57 -0700 Subject: [PATCH 07/14] Fix YAML config as suggested by reviewer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two changes per @rjchallis feedback: 1. Changed output filename from '../outputs/assembly_historical.tsv' to 'assembly_historical.tsv' - File should be written alongside the YAML file - Workflow helpers copy YAML to working directory 2. 
Added 'needs' section to pull in base attribute definitions - References ncbi_datasets_eukaryota.types.yaml - Inherits standard field definitions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- configs/assembly_historical.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/configs/assembly_historical.yaml b/configs/assembly_historical.yaml index 5e11a61..85557b1 100644 --- a/configs/assembly_historical.yaml +++ b/configs/assembly_historical.yaml @@ -2,6 +2,9 @@ # This config defines the schema for assembly_historical.tsv # which contains all superseded assembly versions +needs: + - ncbi_datasets_eukaryota.types.yaml + attributes: assembly_level: header: assemblyLevel @@ -156,7 +159,7 @@ file: - taxon_id format: tsv header: true - name: ../outputs/assembly_historical.tsv + name: assembly_historical.tsv source: INSDC source_url_stub: https://www.ncbi.nlm.nih.gov/assembly/ From 678664440b405247341350b6cc94800491c7a524 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 19 Dec 2025 11:37:10 -0700 Subject: [PATCH 08/14] Simplify cache key format to use accession only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove hash generation from cache file paths and use just the accession for better human readability. Also remove unused hashlib import. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- flows/parsers/backfill_historical_versions.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flows/parsers/backfill_historical_versions.py b/flows/parsers/backfill_historical_versions.py index 37f47cc..e5fb148 100644 --- a/flows/parsers/backfill_historical_versions.py +++ b/flows/parsers/backfill_historical_versions.py @@ -25,7 +25,6 @@ --checkpoint tmp/backfill_checkpoint.json """ -import hashlib import json import os import re @@ -58,8 +57,7 @@ def setup_cache_directories(): def get_cache_path(cache_type: str, identifier: str) -> str: """Generate cache file path for given type and identifier.""" - safe_id = hashlib.md5(identifier.encode()).hexdigest()[:16] - return f"tmp/backfill_cache/{cache_type}/{identifier}_{safe_id}.json" + return f"tmp/backfill_cache/{cache_type}/{identifier}.json" def load_from_cache(cache_path: str, max_age_days: int = 30) -> Dict: From 8576cb36d0529c79cd9001961cf87ecbac758e30 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Fri, 19 Dec 2025 11:45:35 -0700 Subject: [PATCH 09/14] Use shared argument parser from flows.parsers.args MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace direct argparse usage with the shared argument parser system used by all other parsers. This ensures consistent argument handling across the workflow and follows project conventions. 
Changes: - Use INPUT_PATH and YAML_PATH from shared_args - Define custom CHECKPOINT argument following shared pattern - Update argument names: --input → --input_path, --config → --yaml_path - Update documentation to reflect new argument names 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- flows/parsers/backfill_historical_versions.py | 43 ++++++++----------- tests/README_test_backfill.md | 6 +-- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/flows/parsers/backfill_historical_versions.py b/flows/parsers/backfill_historical_versions.py index e5fb148..9ad4d1e 100644 --- a/flows/parsers/backfill_historical_versions.py +++ b/flows/parsers/backfill_historical_versions.py @@ -20,8 +20,8 @@ Usage: python -m flows.parsers.backfill_historical_versions \\ - --input flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \\ - --config configs/assembly_historical.yaml \\ + --input_path flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \\ + --yaml_path configs/assembly_historical.yaml \\ --checkpoint tmp/backfill_checkpoint.json """ @@ -449,31 +449,26 @@ def backfill_historical_versions( # ============================================================================= if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser( + from flows.lib.shared_args import parse_args as _parse_args, required, default + from flows.lib.shared_args import INPUT_PATH, YAML_PATH + + # Define checkpoint argument + CHECKPOINT = { + "flags": ["--checkpoint"], + "keys": {"help": "Checkpoint file for resuming", "type": str} + } + + args = _parse_args( + [ + required(INPUT_PATH), + required(YAML_PATH), + default(CHECKPOINT, 'tmp/backfill_checkpoint.json') + ], description='One-time historical backfill for assembly versions' ) - parser.add_argument( - '--input', - required=True, - help='Input JSONL file (assembly_data_report.jsonl)' - ) - parser.add_argument( - '--config', - required=True, - help='Config YAML file (assembly_historical.yaml)' - ) - parser.add_argument( - '--checkpoint', - default='tmp/backfill_checkpoint.json', - help='Checkpoint file for resuming' - ) - - args = parser.parse_args() backfill_historical_versions( - input_jsonl=args.input, - config_yaml=args.config, + input_jsonl=args.input_path, + config_yaml=args.yaml_path, checkpoint_file=args.checkpoint ) diff --git a/tests/README_test_backfill.md b/tests/README_test_backfill.md index 02e3486..471decf 100644 --- a/tests/README_test_backfill.md +++ b/tests/README_test_backfill.md @@ -45,13 +45,13 @@ Test the complete backfill process on 3 assemblies: ```bash export SKIP_PREFECT=true python -m flows.parsers.backfill_historical_versions \ - --input tests/test_data/assembly_test_sample.jsonl \ - --config configs/assembly_historical.yaml \ + --input_path tests/test_data/assembly_test_sample.jsonl \ + --yaml_path configs/assembly_historical.yaml \ --checkpoint tests/test_data/test_checkpoint.json ``` ```powershell $env:SKIP_PREFECT="true" -python -m flows.parsers.backfill_historical_versions --input tests/test_data/assembly_test_sample.jsonl --config configs/assembly_historical.yaml --checkpoint tests/test_data/test_checkpoint.json +python -m flows.parsers.backfill_historical_versions --input_path tests/test_data/assembly_test_sample.jsonl --yaml_path configs/assembly_historical.yaml --checkpoint tests/test_data/test_checkpoint.json ``` From 9283f42b294ba2874e85c4ed5f6418ac4db981fa Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Thu, 19 Mar 
2026 11:49:19 -0700 Subject: [PATCH 10/14] update on Phase 0 scripts and test --- Phase_0_PR_SUMMARY.md | 213 +++--- configs/assembly_historical.yaml | 5 +- flows/parsers/backfill_historical_versions.py | 474 ------------- .../parse_backfill_historical_versions.py | 546 +++++++++++++++ tests/README_test_backfill.md | 57 -- tests/test_backfill.py | 629 ++++++++++++------ 6 files changed, 1069 insertions(+), 855 deletions(-) delete mode 100644 flows/parsers/backfill_historical_versions.py create mode 100644 flows/parsers/parse_backfill_historical_versions.py delete mode 100644 tests/README_test_backfill.md diff --git a/Phase_0_PR_SUMMARY.md b/Phase_0_PR_SUMMARY.md index dfdd1dd..1f31702 100644 --- a/Phase_0_PR_SUMMARY.md +++ b/Phase_0_PR_SUMMARY.md @@ -2,140 +2,115 @@ ## Summary -This PR implements a one-time backfill process to populate historical assembly versions for all existing assemblies in the genomehubs dataset. It enables version-aware milestone tracking by capturing superseded assembly versions that were previously not tracked. +One-time backfill process to populate historical (superseded) assembly versions +from NCBI for all existing assemblies with version > 1. Enables version-aware +milestone tracking by capturing previously untracked superseded versions. + +## What Changed (latest revision) + +### Bug fixes +- **Fixed data-loss bug**: `write_to_tsv` overwrites by default, so the + original per-batch write + `parsed = {}` clearing lost all but the last + batch. All rows now accumulate in memory and `write_to_tsv` is called + exactly once at the end. Checkpoints only record resume progress. + +### Structural changes +- **Renamed** `backfill_historical_versions.py` → + `parse_backfill_historical_versions.py` so `register.py` discovers it as a + plugin via the `parse_` prefix convention. +- **Changed orchestrator decorator** from `@task` to `@flow(log_prints=True)` + to match how `parse_ncbi_assemblies` structures its flow/task hierarchy. +- **Split `find_all_assembly_versions`** into `discover_version_accessions` + (FTP) + `fetch_version_metadata` (datasets CLI) for modularity and + independent testability. +- **Added `backfill_historical_versions_wrapper`** matching the + `fetch_parse_validate` parser signature so the flow integrates with the + Prefect pipeline. + +### Convention alignment +- **CLI arguments** now use `shared_args` exclusively (`INPUT_PATH`, + `YAML_PATH`, `WORK_DIR`); removed ad-hoc `--checkpoint` argument. + Checkpoint path is derived via `derive_checkpoint_path`. +- **Replaced `subprocess.run`** with `utils.run_quoted` for shell-safe + argument quoting (consistent with `parse_ncbi_assemblies`). +- **Code style** aligned with GenomeHubs conventions: Google-style docstrings, + lowercase type hints (`dict`, `list`, `tuple`), `e` for exception variables, + removed shebang and section banners. +- **`assembly_historical.yaml`**: moved `needs` under `file:` section and + references `ATTR_assembly.types.yaml` (matches `ncbi_datasets_eukaryota` + convention). + +### Test suite rewrite +- Rewrote `tests/test_backfill.py` using pytest (was a custom runner). 
+- **33 tests**, all passing, covering: + - Accession parsing helpers + - Assembly identification from JSONL fixture + - Cache round-trip with expiry + - Checkpoint save/load/derive + - Accession format validation + - `parse_historical_version` delegation (mocked) + - Orchestrator: single TSV write, multi-assembly accumulation, + current-version skipping, no-op for v1-only input + - **Regression test for the batch-overwrite data-loss bug** ## Solution Overview -Implements a **one-time backfill script** that: -1. Identifies assemblies with version > 1 (indicating historical versions exist) -2. Discovers all historical versions via NCBI FTP -3. Fetches metadata for each historical version using NCBI Datasets CLI -4. Outputs historical versions to a separate TSV file with `versionStatus = "superseded"` -5. Uses smart caching to avoid re-fetching data on reruns - -## Key Features - -### 1. FTP-Based Version Discovery -- Queries NCBI FTP to find all versions of each assembly -- More reliable than API for historical data -- Based on proven DToL prototype implementation - -### 2. Reuses existing Parser -- Calls `parse_ncbi_assemblies.process_assembly_report()` with `version_status="superseded"` -- Fetches sequence reports and computes metrics identically to current assemblies -- Uses same YAML config system -- Generates identical TSV schema (plus `versionStatus` field) - -### 3. Smart 2-Tier Caching -- **Version discovery cache** (7-day expiry): Stores which versions exist -- **Metadata cache** (30-day expiry): Stores assembly metadata from Datasets CLI -- Dramatically reduces runtime on reruns (from hours to minutes) - -### 4. Checkpoint System -- Saves progress every 100 assemblies +The backfill script: +1. Identifies assemblies with version > 1 from the input JSONL +2. Discovers all historical versions via NCBI FTP directory listing +3. Fetches metadata for each version using NCBI Datasets CLI +4. Parses each version through `process_assembly_report` with + `version_status="superseded"` +5. 
Writes all accumulated rows to a single TSV via `write_to_tsv` + +### Smart 2-Tier Caching +- **Version discovery cache** (7-day expiry): FTP directory listings +- **Metadata cache** (30-day expiry): Datasets CLI responses +- Reduces reruns from hours to minutes + +### Checkpoint System +- Saves progress every 100 assemblies to `{work_dir}/checkpoints/` - Allows resuming after interruptions -- Critical for processing ~3,694 assemblies +- Does **not** trigger intermediate TSV writes (avoids the overwrite bug) -## Files Modified/Created +## Files -### New Files -- `flows/parsers/backfill_historical_versions.py` - Main backfill script -- `configs/assembly_historical.yaml` - Output schema configuration -- `tests/test_backfill.py` - Automated test suite -- `tests/test_data/assembly_test_sample.jsonl` - Test dataset (3 assemblies) -- `tests/README_test_backfill.md` - Testing documentation +### New +- `flows/parsers/parse_backfill_historical_versions.py` — Main backfill flow +- `configs/assembly_historical.yaml` — Output schema configuration +- `tests/test_backfill.py` — pytest suite (33 tests) +- `tests/test_data/assembly_test_sample.jsonl` — Test fixture (3 assemblies) -### Modified Files -- `flows/parsers/parse_ncbi_assemblies.py` - Added `version_status` parameter -- `flows/lib/utils.py` - Added `parse_s3_file` stub function +### Modified +- `flows/parsers/parse_ncbi_assemblies.py` — Added `version_status` parameter -## Schema Changes - -### New Field in Output -- Same schema as current assemblies, plus `versionStatus` column -- `versionStatus` - Indicates if assembly is "current" or "superseded" - -### Expected Results -- **Input**: ~3,694 assemblies with version > 1 -- **Output**: ~8,500 historical version records -- **Runtime**: ~10-15 hours (first run), ~15-30 minutes (with cache) - -## Testing +## Usage -### Automated Tests (4/4 passing) +### As a standalone CLI ```bash -python tests/test_backfill.py +python -m flows.parsers.parse_backfill_historical_versions \ + --input_path data/assembly_data_report.jsonl \ + --yaml_path configs/assembly_historical.yaml \ + --work_dir tmp ``` -Tests verify: -1. ✅ Accession parsing -2. ✅ Assembly identification -3. ✅ FTP version discovery -4. ✅ Cache functionality +### Via Prefect pipeline +Discovered automatically by `register.py` and invoked through +`fetch_parse_validate` with the standard parser signature. 
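+
+### Checkpoint location
+For reference, with the CLI invocation above the checkpoint path produced by
+`derive_checkpoint_path` is
+`tmp/checkpoints/backfill__assembly_historical__assembly_data_report.json`,
+following the pattern `{work_dir}/checkpoints/backfill__{yaml stem}__{input stem}.json`.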
-### Manual Test (3 assemblies)
-```bash
-python flows/parsers/backfill_historical_versions.py \
-    --input tests/test_data/assembly_test_sample.jsonl \
-    --config configs/assembly_historical.yaml \
-    --checkpoint tests/test_data/test_checkpoint.json
-```
-
-## Usage
+### Expected performance
+- **First run**: ~10–15 hours (~3,700 assemblies, ~8,500 versions)
+- **Subsequent runs**: ~15–30 minutes (cache hits)
 
-### One-Time Backfill
+### Running tests
 ```bash
-python flows/parsers/backfill_historical_versions.py \
-    --input flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \
-    --config configs/assembly_historical.yaml \
-    --checkpoint backfill_checkpoint.json
+export SKIP_PREFECT=true
+python -m pytest tests/test_backfill.py -v
 ```
-
-On first run:
-- Takes ~10-15 hours (fetches all historical versions)
-- Creates tmp/backfill_cache/ directory
-- Checkpoints every 100 assemblies
-- Safe to Ctrl+C and resume
-
-On subsequent runs (after input update):
-- Takes ~15-30 minutes (uses cache for existing)
-- Only fetches NEW assemblies
-- Cache expires: version discovery (7 days), metadata (30 days)
-
-Output:
-- outputs/assembly_historical.tsv (all superseded versions)
-- Each row has version_status="superseded"
-- Includes all sequence reports and metrics
-
-## Important Notes
-
-### 1. Stub Function Warning
-The `parse_s3_file()` function in `flows/lib/utils.py` is currently a stub:
-
-```python
-def parse_s3_file(s3_path: str) -> dict:
-    return {}  # Placeholder
-```
-
-**Action needed**: If Rich has a real implementation, replace this stub. The function is imported by `parse_ncbi_assemblies.py` but not used by the backfill script.
-
-### 2. One-Time Process
-This backfill is designed to run **once** to populate historical data. Future updates should be handled by the incremental daily pipeline.
-
-### 3. Cache Directory
-The cache directory `tmp/backfill_cache/` can be safely deleted after successful completion. It contains:
-- Version discovery data (FTP queries)
-- Assembly metadata (Datasets CLI responses)
-
-## Next Steps (After PR Merge)
-
-1. **Run full backfill** on production dataset (~10-15 hours)
-2. **Upload output** to appropriate S3 location
-3. **Implement Phase 1**: Incremental daily updates for new historical versions
-4. **Implement Phase 2**: Version-aware milestone tracking
-
-## Questions for Reviewer
-1. Is the `parse_s3_file()` stub acceptable, or do you have a real implementation to use?
-2. Should historical versions go to a different S3 path than current assemblies?
-3. Any preferences for checkpoint file location/naming?
+## Next Steps (after merge)
+1. Run full backfill on production dataset
+2. Upload output to S3
+3. Implement Phase 1: incremental daily updates for new historical versions
+4.
Implement Phase 2: version-aware milestone tracking diff --git a/configs/assembly_historical.yaml b/configs/assembly_historical.yaml index 85557b1..c4086cd 100644 --- a/configs/assembly_historical.yaml +++ b/configs/assembly_historical.yaml @@ -2,9 +2,6 @@ # This config defines the schema for assembly_historical.tsv # which contains all superseded assembly versions -needs: - - ncbi_datasets_eukaryota.types.yaml - attributes: assembly_level: header: assemblyLevel @@ -149,6 +146,8 @@ attributes: file: display_group: general + needs: + - ATTR_assembly.types.yaml exclusions: attributes: - bioproject diff --git a/flows/parsers/backfill_historical_versions.py b/flows/parsers/backfill_historical_versions.py deleted file mode 100644 index 9ad4d1e..0000000 --- a/flows/parsers/backfill_historical_versions.py +++ /dev/null @@ -1,474 +0,0 @@ -#!/usr/bin/env python3 -""" -One-time historical backfill process for assembly versions. - -This script discovers and parses ALL superseded versions from NCBI for assemblies -that currently have version > 1. It should be run ONCE before starting the daily -incremental pipeline. - -Process: -1. Scan input JSONL for assemblies with version > 1 -2. Discover all versions via NCBI FTP directory listing (with caching) -3. Fetch metadata for each version via datasets command (with caching) -4. Parse using Rich's existing parser (includes sequence reports + metrics) -5. Write to assembly_historical.tsv with version_status="superseded" - -Caching: -- Version discovery cached 7 days (FTP queries) -- Individual metadata cached 30 days (datasets queries) -- On re-run: only fetches NEW assemblies - -Usage: - python -m flows.parsers.backfill_historical_versions \\ - --input_path flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \\ - --yaml_path configs/assembly_historical.yaml \\ - --checkpoint tmp/backfill_checkpoint.json -""" - -import json -import os -import re -import subprocess -import time -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Tuple - -import requests -from genomehubs import utils as gh_utils - -from flows.lib import utils -from flows.lib.conditional_import import task - - -# ============================================================================= -# Cache Management (from DToL prototype - proven to work) -# ============================================================================= - -def setup_cache_directories(): - """Create cache directory structure.""" - cache_dirs = [ - "tmp/backfill_cache/version_discovery", - "tmp/backfill_cache/metadata" - ] - for cache_dir in cache_dirs: - os.makedirs(cache_dir, exist_ok=True) - - -def get_cache_path(cache_type: str, identifier: str) -> str: - """Generate cache file path for given type and identifier.""" - return f"tmp/backfill_cache/{cache_type}/{identifier}.json" - - -def load_from_cache(cache_path: str, max_age_days: int = 30) -> Dict: - """Load data from cache if it exists and is recent enough.""" - try: - if os.path.exists(cache_path): - cache_age = time.time() - os.path.getmtime(cache_path) - if cache_age < (max_age_days * 24 * 3600): - with open(cache_path, 'r', encoding='utf-8') as f: - return json.load(f) - except Exception as e: - print(f" Warning: Could not load cache from {cache_path}: {e}") - return {} - - -def save_to_cache(cache_path: str, data: Dict): - """Save data to cache file.""" - try: - os.makedirs(os.path.dirname(cache_path), exist_ok=True) - with open(cache_path, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=2, 
ensure_ascii=False) - except Exception as e: - print(f" Warning: Could not save cache to {cache_path}: {e}") - - -# ============================================================================= -# Version Discovery via FTP (KEY DIFFERENCE from Rich's parser) -# ============================================================================= - -def find_all_assembly_versions(base_accession: str) -> List[Dict]: - """ - Find all versions of an assembly by examining NCBI FTP structure. - - This is the KEY difference from Rich's parser: - - Rich's parser: Gets latest versions from input JSONL - - This function: Discovers ALL versions (including historical) via FTP - - Args: - base_accession: Full accession (e.g., GCA_000002035.3) - - Returns: - List of dicts with full NCBI metadata for each version - """ - # Extract base (e.g., GCA_000002035 from GCA_000002035.3) - base_match = re.match(r'(GC[AF]_\d+)', base_accession) - if not base_match: - return [] - - base = base_match.group(1) - - # Check version discovery cache first - setup_cache_directories() - version_cache_path = get_cache_path("version_discovery", base) - cached_data = load_from_cache(version_cache_path, max_age_days=7) - - if cached_data and 'versions' in cached_data: - print(f" Using cached version data for {base}") - return cached_data['versions'] - - print(f" Discovering versions for {base} via FTP") - versions = [] - - try: - # Construct FTP URL - # Example: https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/000/002/035/ - ftp_url = f"https://ftp.ncbi.nlm.nih.gov/genomes/all/{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" - - # Get directory listing - response = requests.get(ftp_url, timeout=30) - if response.status_code != 200: - print(f" Warning: FTP query failed for {base}") - return [] - - # Parse HTML for version directories (e.g., GCA_000002035.1, GCA_000002035.2, etc.) 
- version_pattern = rf"{base}\.\d+" - found_versions = re.findall(version_pattern, response.text) - unique_versions = sorted(list(set(found_versions))) - - # Fetch metadata for each version - for version_acc in unique_versions: - metadata_cache_path = get_cache_path("metadata", version_acc) - cached_metadata = load_from_cache(metadata_cache_path, max_age_days=30) - - if cached_metadata and 'metadata' in cached_metadata: - versions.append(cached_metadata['metadata']) - continue - - # Fetch from NCBI datasets - try: - # Validate accession format to prevent command injection - # Pattern: GC[AF]_9digits.version (e.g., GCA_000001405.39) - version_pattern_strict = r"^GC[AF]_\d{9}\.\d+$" - if not re.match(version_pattern_strict, version_acc): - print(f" Skipping unexpected accession format: {version_acc}") - continue - - cmd = ["datasets", "summary", "genome", "accession", version_acc, "--as-json-lines"] - result = subprocess.run( - cmd, - capture_output=True, - text=True, - encoding='utf-8', - errors='ignore', # Handle Unicode gracefully - timeout=60 - ) - - if result.returncode == 0 and result.stdout and result.stdout.strip(): - version_data = json.loads(result.stdout.strip()) - versions.append(version_data) - save_to_cache(metadata_cache_path, {'metadata': version_data, 'cached_at': time.time()}) - else: - print(f" Warning: No metadata for {version_acc}") - - except Exception as e: - print(f" Warning: Error fetching {version_acc}: {e}") - continue - - # Cache the complete version discovery - cache_data = { - 'versions': versions, - 'base_accession': base, - 'discovered_at': time.time(), - 'ftp_url': ftp_url - } - save_to_cache(version_cache_path, cache_data) - - return versions - - except Exception as e: - print(f" Error discovering versions for {base}: {e}") - return [] - - -# ============================================================================= -# Parsing with Rich's Existing Functions -# ============================================================================= - -def parse_historical_version( - version_data: Dict, - config: utils.Config, - base_accession: str, - version_num: int, - current_accession: str -) -> Dict: - """ - Parse historical version using Rich's EXACT parser logic. - - This ensures consistency with current assemblies by: - - Using process_assembly_report() with version_status="superseded" - - Fetching sequence reports via fetch_and_parse_sequence_report() - - Computing all metrics identically to current parser - - Args: - version_data: Raw NCBI metadata from datasets command - config: Config object from YAML - base_accession: Base accession (e.g., GCA_000002035) - version_num: Version number (1, 2, 3, etc.) - current_accession: Latest version that superseded this one - - Returns: - Parsed assembly dict ready for TSV output - """ - from flows.parsers.parse_ncbi_assemblies import ( - fetch_and_parse_sequence_report, - process_assembly_report - ) - - # Convert keys to camelCase (Rich's standard) - version_data = utils.convert_keys_to_camel_case(version_data) - - # Process with Rich's parser (version_status="superseded") - processed_report = process_assembly_report( - report=version_data, - previous_report=None, - config=config, - parsed={}, - version_status="superseded" - ) - - # Fetch sequence reports (chromosomes, organelles, etc.) 
- Rich's critical step - fetch_and_parse_sequence_report(processed_report) - - # Set assemblyID in standard format (e.g., GCA_000222935_1) - # The versionStatus field already indicates this is "superseded" - processed_report["processedAssemblyInfo"]["assemblyID"] = f"{base_accession}_{version_num}" - - # Parse into TSV row format using Rich's parse functions - row = gh_utils.parse_report_values(config.parse_fns, processed_report) - - return row - - -def parse_version(accession: str) -> int: - """Extract version number from accession.""" - parts = accession.split('.') - return int(parts[1]) if len(parts) > 1 else 1 - - -def parse_accession(accession: str) -> Tuple[str, int]: - """Parse accession into base and version number.""" - parts = accession.split('.') - base = parts[0] - version = int(parts[1]) if len(parts) > 1 else 1 - return base, version - - -# ============================================================================= -# Checkpoint Management -# ============================================================================= - -def load_checkpoint(checkpoint_file: str) -> Dict: - """Load checkpoint data if exists.""" - if Path(checkpoint_file).exists(): - with open(checkpoint_file) as f: - return json.load(f) - return {} - - -def save_checkpoint(checkpoint_file: str, processed_count: int): - """Save checkpoint data.""" - Path(checkpoint_file).parent.mkdir(parents=True, exist_ok=True) - with open(checkpoint_file, 'w') as f: - json.dump({ - 'processed_count': processed_count, - 'timestamp': datetime.now().isoformat() - }, f, indent=2) - - -# ============================================================================= -# Main Backfill Logic -# ============================================================================= - -def identify_assemblies_needing_backfill(input_jsonl: str) -> List[Dict]: - """ - Identify assemblies with version > 1 that need historical backfill. - - Args: - input_jsonl: Path to assembly_data_report.jsonl - - Returns: - List of assembly info dicts needing backfill - """ - assemblies_needing_backfill = [] - - with open(input_jsonl) as f: - for line in f: - assembly = json.loads(line) - accession = assembly['accession'] - base_acc, version = parse_accession(accession) - - if version > 1: - assemblies_needing_backfill.append({ - 'base_accession': base_acc, - 'current_version': version, - 'current_accession': accession, - 'historical_versions_needed': list(range(1, version)) - }) - - return assemblies_needing_backfill - - -@task(log_prints=True) -def backfill_historical_versions( - input_jsonl: str, - config_yaml: str, - checkpoint_file: str = 'tmp/backfill_checkpoint.json' -): - """ - One-time backfill of all historical assembly versions. - - Process: - 1. Identify assemblies with version > 1 - 2. Discover all versions via FTP (cached) - 3. Fetch metadata via datasets (cached) - 4. Parse with Rich's parser - 5. Write to assembly_historical.tsv - - Args: - input_jsonl: Path to assembly_data_report.jsonl - config_yaml: Path to assembly_historical.yaml - checkpoint_file: Path for checkpoint data - """ - # Setup - setup_cache_directories() - config = utils.load_config(config_file=config_yaml) - - # Identify assemblies needing backfill - print("Scanning for assemblies needing historical backfill...") - assemblies_needing_backfill = identify_assemblies_needing_backfill(input_jsonl) - - if not assemblies_needing_backfill: - print("No assemblies with version > 1 found. 
Nothing to backfill.") - return - - # Load checkpoint - checkpoint = load_checkpoint(checkpoint_file) - start_index = checkpoint.get('processed_count', 0) - - total_assemblies = len(assemblies_needing_backfill) - total_versions = sum(len(a['historical_versions_needed']) for a in assemblies_needing_backfill) - - print(f"\n{'='*80}") - print("ONE-TIME HISTORICAL BACKFILL") - print(f"{'='*80}") - print(f" Assemblies to process: {total_assemblies}") - print(f" Total historical versions: {total_versions}") - - if start_index > 0: - print(f" Resuming from checkpoint: {start_index}/{total_assemblies}") - - print(f"{'='*80}\n") - - parsed = {} - processed = start_index - - for assembly_info in assemblies_needing_backfill[start_index:]: - base_acc = assembly_info['base_accession'] - current_version = assembly_info['current_version'] - current_accession = assembly_info['current_accession'] - - print(f"[{processed+1}/{total_assemblies}] {base_acc} (current: v{current_version})") - - # Discover all versions via FTP (uses cache) - all_versions = find_all_assembly_versions(current_accession) - - if not all_versions: - print(f" Warning: No versions found via FTP") - processed += 1 - continue - - # Parse each historical version (skip current) - for version_data in all_versions: - version_acc = version_data.get('accession', '') - version_num = parse_version(version_acc) - - # Only process historical versions - if version_num >= current_version: - continue - - try: - print(f" Parsing v{version_num}...", end=' ', flush=True) - - # Parse using Rich's parser - row = parse_historical_version( - version_data=version_data, - config=config, - base_accession=base_acc, - version_num=version_num, - current_accession=current_accession - ) - - # Add to parsed dict (keyed by genbank accession) - genbank_acc = row.get('genbankAccession', version_acc) - parsed[genbank_acc] = row - - print("✓") - - except Exception as e: - print(f"✗ ({e})") - continue - - processed += 1 - - # Checkpoint every 100 assemblies - if processed % 100 == 0: - print(f"\n→ Checkpoint: Writing batch to disk...") - gh_utils.write_tsv(parsed, config.headers, config.meta) - save_checkpoint(checkpoint_file, processed) - parsed = {} - print(f"→ Progress: {processed}/{total_assemblies} ({processed/total_assemblies*100:.1f}%)\n") - - # Final write - if parsed: - print(f"\n→ Writing final batch...") - gh_utils.write_tsv(parsed, config.headers, config.meta) - - # Final report - print(f"\n{'='*80}") - print("BACKFILL COMPLETE") - print(f"{'='*80}") - print(f" Processed: {processed}/{total_assemblies} assemblies") - print(f" Output: {config.meta['file_name']}") - print(f"\n Next step: Run daily incremental pipeline") - print(f"{'='*80}\n") - - -# ============================================================================= -# Main Entry Point -# ============================================================================= - -if __name__ == '__main__': - from flows.lib.shared_args import parse_args as _parse_args, required, default - from flows.lib.shared_args import INPUT_PATH, YAML_PATH - - # Define checkpoint argument - CHECKPOINT = { - "flags": ["--checkpoint"], - "keys": {"help": "Checkpoint file for resuming", "type": str} - } - - args = _parse_args( - [ - required(INPUT_PATH), - required(YAML_PATH), - default(CHECKPOINT, 'tmp/backfill_checkpoint.json') - ], - description='One-time historical backfill for assembly versions' - ) - - backfill_historical_versions( - input_jsonl=args.input_path, - config_yaml=args.yaml_path, - checkpoint_file=args.checkpoint 
- ) diff --git a/flows/parsers/parse_backfill_historical_versions.py b/flows/parsers/parse_backfill_historical_versions.py new file mode 100644 index 0000000..5bce801 --- /dev/null +++ b/flows/parsers/parse_backfill_historical_versions.py @@ -0,0 +1,546 @@ +"""One-time historical backfill of superseded assembly versions from NCBI. + +Discovers and parses all superseded versions for assemblies with version > 1. +Run once before starting the daily incremental pipeline. + +Usage: + python -m flows.parsers.parse_backfill_historical_versions \\ + --input_path data/assembly_data_report.jsonl \\ + --yaml_path configs/assembly_historical.yaml \\ + --work_dir tmp +""" + +import json +import os +import re +import time +from datetime import datetime +from glob import glob +from pathlib import Path +from typing import Optional + +import requests +from genomehubs import utils as gh_utils + +from flows.lib import utils +from flows.lib.conditional_import import flow +from flows.lib.shared_args import ( + INPUT_PATH, + WORK_DIR, + YAML_PATH, + parse_args as _parse_args, + required, +) +from flows.lib.utils import Config, Parser +from flows.parsers.parse_ncbi_assemblies import ( + fetch_and_parse_sequence_report, + process_assembly_report, + write_to_tsv, +) + +ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$") + + +def setup_cache_directories(work_dir: str): + """Create cache directory structure under work_dir. + + Args: + work_dir (str): Path to the working directory. + """ + for subdir in ("version_discovery", "metadata"): + os.makedirs( + os.path.join(work_dir, "backfill_cache", subdir), exist_ok=True + ) + + +def get_cache_path(work_dir: str, cache_type: str, identifier: str) -> str: + """Generate a human-readable cache file path. + + Args: + work_dir (str): Path to the working directory. + cache_type (str): Cache category (version_discovery or metadata). + identifier (str): Accession string used as the filename stem. + + Returns: + str: Path to the JSON cache file. + """ + safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier) + return os.path.join(work_dir, "backfill_cache", cache_type, f"{safe_id}.json") + + +def load_from_cache(cache_path: str, max_age_days: int = 30) -> dict: + """Load data from cache if it exists and is recent enough. + + Args: + cache_path (str): Path to the cache JSON file. + max_age_days (int): Maximum acceptable age in days. + + Returns: + dict: Cached data, or empty dict on miss/expiry. + """ + try: + if os.path.exists(cache_path): + cache_age = time.time() - os.path.getmtime(cache_path) + if cache_age < (max_age_days * 24 * 3600): + with open(cache_path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f" Warning: Could not load cache from {cache_path}: {e}") + return {} + + +def save_to_cache(cache_path: str, data: dict): + """Save data to a cache file, creating parent dirs as needed. + + Args: + cache_path (str): Path to the cache JSON file. + data (dict): Data to persist. + """ + try: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f" Warning: Could not save cache to {cache_path}: {e}") + + +def discover_version_accessions( + base_accession: str, work_dir: str +) -> list[str]: + """Discover all versioned accessions for a base assembly via NCBI FTP. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. 
+ + Returns: + list: Sorted list of versioned accession strings. + """ + base_match = re.match(r"(GC[AF]_\d+)", base_accession) + if not base_match: + return [] + + base = base_match.group(1) + setup_cache_directories(work_dir) + cache_path = get_cache_path(work_dir, "version_discovery", base) + cached = load_from_cache(cache_path, max_age_days=7) + + if cached and "accessions" in cached: + print(f" Using cached version list for {base}") + return cached["accessions"] + + print(f" Discovering versions for {base} via FTP") + ftp_url = ( + f"https://ftp.ncbi.nlm.nih.gov/genomes/all/" + f"{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" + ) + + try: + response = requests.get(ftp_url, timeout=30) + if response.status_code != 200: + print(f" Warning: FTP query failed for {base}") + return [] + except Exception as e: + print(f" Error querying FTP for {base}: {e}") + return [] + + version_pattern = rf"{re.escape(base)}\.\d+" + accessions = sorted(set(re.findall(version_pattern, response.text))) + + save_to_cache(cache_path, { + "accessions": accessions, + "base_accession": base, + "ftp_url": ftp_url, + }) + return accessions + + +def fetch_version_metadata(version_acc: str, work_dir: str) -> dict: + """Fetch NCBI datasets metadata for a single assembly version. + + Uses utils.run_quoted to safely invoke the datasets CLI. Results are + cached for 30 days. + + Args: + version_acc (str): Versioned accession (e.g. GCA_000002035.1). + work_dir (str): Working directory for cache storage. + + Returns: + dict: Metadata dict, or empty dict on failure. + """ + cache_path = get_cache_path(work_dir, "metadata", version_acc) + cached = load_from_cache(cache_path, max_age_days=30) + + if cached and "metadata" in cached: + return cached["metadata"] + + if not ACCESSION_PATTERN.match(version_acc): + print(f" Skipping unexpected accession format: {version_acc}") + return {} + + cmd = [ + "datasets", "summary", "genome", "accession", + version_acc, "--as-json-lines", + ] + try: + result = utils.run_quoted( + cmd, + capture_output=True, + text=True, + encoding="utf-8", + errors="ignore", + timeout=60, + ) + if result.returncode == 0 and result.stdout and result.stdout.strip(): + version_data = json.loads(result.stdout.strip()) + save_to_cache(cache_path, { + "metadata": version_data, + "cached_at": time.time(), + }) + return version_data + + print(f" Warning: No metadata for {version_acc}") + except Exception as e: + print(f" Warning: Error fetching {version_acc}: {e}") + + return {} + + +def find_all_assembly_versions( + base_accession: str, work_dir: str +) -> list[dict]: + """Discover all versions and fetch metadata for each. + + Delegates to discover_version_accessions for FTP discovery and + fetch_version_metadata for per-version metadata retrieval. Both layers + use independent caches. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. + + Returns: + list: List of metadata dicts, one per version found. + """ + accessions = discover_version_accessions(base_accession, work_dir) + versions = [] + for version_acc in accessions: + metadata = fetch_version_metadata(version_acc, work_dir) + if metadata: + versions.append(metadata) + return versions + + +def parse_historical_version( + version_data: dict, + config: Config, + base_accession: str, + version_num: int, + current_accession: str, +) -> dict: + """Parse a single historical version using GenomeHubs parser logic. 
+ + Ensures consistency with current assemblies by reusing + process_assembly_report with version_status="superseded" and + fetch_and_parse_sequence_report. + + Args: + version_data (dict): Raw NCBI metadata from the datasets CLI. + config (Config): Config object loaded from the YAML file. + base_accession (str): Base accession (e.g. GCA_000002035). + version_num (int): Integer version (1, 2, 3, ...). + current_accession (str): The latest accession that superseded this one. + + Returns: + dict: Parsed row dict ready for TSV output. + """ + version_data = utils.convert_keys_to_camel_case(version_data) + + processed_report = process_assembly_report( + report=version_data, + previous_report=None, + config=config, + parsed={}, + version_status="superseded", + ) + + fetch_and_parse_sequence_report(processed_report) + + processed_report["processedAssemblyInfo"]["assemblyID"] = ( + f"{base_accession}_{version_num}" + ) + + return gh_utils.parse_report_values(config.parse_fns, processed_report) + + +def parse_version(accession: str) -> int: + """Extract version number from a dotted accession string. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + int: Version number (defaults to 1 if no dot-suffix). + """ + parts = accession.split(".") + return int(parts[1]) if len(parts) > 1 else 1 + + +def parse_accession(accession: str) -> tuple[str, int]: + """Split an accession into its base and version components. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + tuple: (base_accession, version_number). + """ + parts = accession.split(".") + return parts[0], int(parts[1]) if len(parts) > 1 else 1 + + +def derive_checkpoint_path( + input_path: str, yaml_path: str, work_dir: str +) -> str: + """Derive a stable checkpoint path from parser inputs. + + Places the checkpoint alongside the data in work_dir so its location + can be determined without extra CLI arguments. + + Args: + input_path (str): Path to the assembly report JSONL file. + yaml_path (str): Path to the parser YAML configuration file. + work_dir (str): Working directory. + + Returns: + str: Path to the checkpoint JSON file. + """ + input_stem = Path(input_path).stem + config_stem = Path(yaml_path).stem + checkpoint_dir = Path(work_dir) / "checkpoints" + checkpoint_dir.mkdir(parents=True, exist_ok=True) + name = f"backfill__{config_stem}__{input_stem}.json" + return str(checkpoint_dir / name) + + +def load_checkpoint(checkpoint_file: str) -> dict: + """Load checkpoint data if the file exists. + + Args: + checkpoint_file (str): Path to the checkpoint JSON file. + + Returns: + dict: Checkpoint dict, or empty dict if absent. + """ + if Path(checkpoint_file).exists(): + with open(checkpoint_file) as f: + return json.load(f) + return {} + + +def save_checkpoint(checkpoint_file: str, processed_count: int): + """Persist current progress to the checkpoint file. + + Args: + checkpoint_file (str): Path to the checkpoint JSON file. + processed_count (int): Number of assemblies processed so far. + """ + Path(checkpoint_file).parent.mkdir(parents=True, exist_ok=True) + with open(checkpoint_file, "w") as f: + json.dump({ + "processed_count": processed_count, + "timestamp": datetime.now().isoformat(), + }, f, indent=2) + + +def identify_assemblies_needing_backfill(input_path: str) -> list[dict]: + """Identify assemblies with version > 1 that need historical backfill. + + Args: + input_path (str): Path to assembly_data_report.jsonl. + + Returns: + list: Assembly info dicts describing what needs backfilling. 
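+
+    Example:
+        An input line {"accession": "GCA_000002035.3"} yields
+        {"base_accession": "GCA_000002035", "current_version": 3,
+        "current_accession": "GCA_000002035.3",
+        "historical_versions_needed": [1, 2]}.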
+ """ + assemblies = [] + with open(input_path) as f: + for line in f: + record = json.loads(line) + accession = record["accession"] + base_acc, version = parse_accession(accession) + + if version > 1: + assemblies.append({ + "base_accession": base_acc, + "current_version": version, + "current_accession": accession, + "historical_versions_needed": list(range(1, version)), + }) + return assemblies + + +@flow(log_prints=True) +def backfill_historical_versions( + input_path: str, + yaml_path: str, + work_dir: str = ".", + checkpoint_file: Optional[str] = None, +): + """One-time backfill of all historical assembly versions. + + Accumulates all parsed rows in memory and writes the output TSV once at + the end. Checkpoints are saved periodically so the run can be resumed + after interruption but do not trigger intermediate TSV writes. + + Args: + input_path (str): Path to assembly_data_report.jsonl. + yaml_path (str): Path to assembly_historical.yaml. + work_dir (str): Working directory for caches, checkpoints, and output. + checkpoint_file (str, optional): Explicit checkpoint path. Derived + from inputs when omitted. + """ + setup_cache_directories(work_dir) + config = utils.load_config(config_file=yaml_path) + checkpoint_file = checkpoint_file or derive_checkpoint_path( + input_path, yaml_path, work_dir, + ) + + print("Scanning for assemblies needing historical backfill...") + assemblies = identify_assemblies_needing_backfill(input_path) + + if not assemblies: + print("No assemblies with version > 1 found. Nothing to backfill.") + return + + checkpoint = load_checkpoint(checkpoint_file) + start_index = checkpoint.get("processed_count", 0) + + total_assemblies = len(assemblies) + total_versions = sum( + len(a["historical_versions_needed"]) for a in assemblies + ) + + print(f"\n{'=' * 80}") + print("ONE-TIME HISTORICAL BACKFILL") + print(f"{'=' * 80}") + print(f" Assemblies to process: {total_assemblies}") + print(f" Total historical versions: {total_versions}") + if start_index > 0: + print(f" Resuming from checkpoint: {start_index}/{total_assemblies}") + print(f"{'=' * 80}\n") + + parsed = {} + processed = start_index + + for assembly_info in assemblies[start_index:]: + base_acc = assembly_info["base_accession"] + current_version = assembly_info["current_version"] + current_accession = assembly_info["current_accession"] + + print( + f"[{processed + 1}/{total_assemblies}] " + f"{base_acc} (current: v{current_version})" + ) + + all_versions = find_all_assembly_versions(current_accession, work_dir) + if not all_versions: + print(" Warning: No versions found via FTP") + processed += 1 + continue + + for version_data in all_versions: + version_acc = version_data.get("accession", "") + version_num = parse_version(version_acc) + + if version_num >= current_version: + continue + + try: + print(f" Parsing v{version_num}...", end=" ", flush=True) + row = parse_historical_version( + version_data=version_data, + config=config, + base_accession=base_acc, + version_num=version_num, + current_accession=current_accession, + ) + genbank_acc = row.get("genbankAccession", version_acc) + parsed[genbank_acc] = row + print("done") + except Exception as e: + print(f"failed ({e})") + continue + + processed += 1 + + if processed % 100 == 0: + save_checkpoint(checkpoint_file, processed) + pct = processed / total_assemblies * 100 + print( + f"\n Checkpoint saved: " + f"{processed}/{total_assemblies} ({pct:.1f}%)\n" + ) + + if parsed: + print(f"\nWriting {len(parsed)} records to TSV...") + write_to_tsv(parsed, config) + 
+ print(f"\n{'=' * 80}") + print("BACKFILL COMPLETE") + print(f"{'=' * 80}") + print(f" Processed: {processed}/{total_assemblies} assemblies") + print(f" Records written: {len(parsed)}") + print(f" Output: {config.meta['file_name']}") + print(f"\n Next step: Run daily incremental pipeline") + print(f"{'=' * 80}\n") + + +def backfill_historical_versions_wrapper( + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, +): + """Wrapper matching the fetch_parse_validate parser signature. + + Locates the *.jsonl input in work_dir and delegates to + backfill_historical_versions. + + Args: + working_yaml (str): Path to the working YAML file. + work_dir (str): Path to the working directory. + append (bool): Whether to append (unused, accepted for compatibility). + data_freeze_path (str, optional): Ignored; accepted for compatibility. + **kwargs: Additional keyword arguments. + """ + glob_path = os.path.join(work_dir, "*.jsonl") + paths = glob(glob_path) + if not paths: + raise FileNotFoundError(f"No jsonl file found in {work_dir}") + if len(paths) > 1: + raise ValueError(f"More than one jsonl file found in {work_dir}") + + backfill_historical_versions( + input_path=paths[0], + yaml_path=working_yaml, + work_dir=work_dir, + ) + + +def plugin(): + """Register the flow.""" + return Parser( + name="BACKFILL_HISTORICAL_VERSIONS", + func=backfill_historical_versions_wrapper, + description="One-time backfill of historical assembly versions.", + ) + + +if __name__ == "__main__": + """Run the flow.""" + args = _parse_args( + [required(INPUT_PATH), required(YAML_PATH), WORK_DIR], + description="One-time historical backfill for assembly versions", + ) + backfill_historical_versions(**vars(args)) diff --git a/tests/README_test_backfill.md b/tests/README_test_backfill.md deleted file mode 100644 index 471decf..0000000 --- a/tests/README_test_backfill.md +++ /dev/null @@ -1,57 +0,0 @@ -# Testing Guide for Historical Assembly Backfill - -This guide explains how to test the backfill implementation before creating a PR. - -## Prerequisites - -### 1. Install Dependencies - -```bash -# Install genomehubs and other dependencies -pip install -r requirements.txt - -# Or if using conda: -conda install -c conda-forge genomehubs>=2.10.14 -pip install prefect requests -``` - -### 2. Install NCBI datasets CLI - -The backfill script requires the NCBI `datasets` command-line tool. 
- -```bash -# Download and install from: -# https://www.ncbi.nlm.nih.gov/datasets/docs/v2/download-and-install/ - -# Verify installation: -datasets --version -``` - -## Test Suite - -### Quick Unit Tests - -Run the automated test suite: - -```bash -cd "c:\Users\fchen13\ASU Dropbox\Fang Chen\Work Documents\EBP\genomehubs-data" -python tests/test_backfill.py -``` - -### Full Backfill Test (Small Dataset) - -Test the complete backfill process on 3 assemblies: - -```bash -export SKIP_PREFECT=true -python -m flows.parsers.backfill_historical_versions \ - --input_path tests/test_data/assembly_test_sample.jsonl \ - --yaml_path configs/assembly_historical.yaml \ - --checkpoint tests/test_data/test_checkpoint.json -``` - -```powershell -$env:SKIP_PREFECT="true" -python -m flows.parsers.backfill_historical_versions --input_path tests/test_data/assembly_test_sample.jsonl --yaml_path configs/assembly_historical.yaml --checkpoint tests/test_data/test_checkpoint.json -``` - diff --git a/tests/test_backfill.py b/tests/test_backfill.py index d9d6c6c..d95aee3 100644 --- a/tests/test_backfill.py +++ b/tests/test_backfill.py @@ -1,214 +1,439 @@ -#!/usr/bin/env python3 -""" -Test script for backfill_historical_versions.py - -This script tests the historical version backfill process on a small sample dataset. - -Usage: - python tests/test_backfill.py +"""Tests for parse_backfill_historical_versions.py + +Covers: +- Accession parsing helpers +- Assembly identification from JSONL fixture +- Cache round-trip (save/load with expiry) +- Checkpoint save/load/derive +- Accession format validation +- parse_historical_version calls correct functions with correct args +- backfill_historical_versions orchestrator: accumulates all rows and writes + TSV exactly once (regression test for the batch-overwrite data-loss bug) """ +import json import os import sys -import json +import time from pathlib import Path +from unittest.mock import MagicMock, call, patch + +import pytest -# Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) -from flows.parsers.backfill_historical_versions import ( +os.environ["SKIP_PREFECT"] = "true" + +from flows.parsers import ( + parse_backfill_historical_versions as backfill_module, +) +from flows.parsers.parse_backfill_historical_versions import ( + ACCESSION_PATTERN, + backfill_historical_versions, + derive_checkpoint_path, + get_cache_path, identify_assemblies_needing_backfill, + load_checkpoint, + load_from_cache, parse_accession, - find_all_assembly_versions + parse_version, + save_checkpoint, + save_to_cache, ) -def test_parse_accession(): - """Test accession parsing.""" - print("\n" + "="*80) - print("TEST 1: Accession Parsing") - print("="*80) - - test_cases = [ - ("GCA_000222935.2", ("GCA_000222935", 2)), - ("GCA_003706615.3", ("GCA_003706615", 3)), - ("GCF_000001405.39", ("GCF_000001405", 39)), - ] - - for accession, expected in test_cases: - result = parse_accession(accession) - status = "PASS" if result == expected else "FAIL" - print(f" {status} {accession} -> {result}") - assert result == expected, f"Expected {expected}, got {result}" - - print(" All accession parsing tests passed!") - -def test_identify_assemblies(): - """Test identification of assemblies needing backfill.""" - print("\n" + "="*80) - print("TEST 2: Identify Assemblies Needing Backfill") - print("="*80) - - test_file = "tests/test_data/assembly_test_sample.jsonl" - - if not os.path.exists(test_file): - print(f" FAIL Test file not found: {test_file}") - return False - - assemblies = 
identify_assemblies_needing_backfill(test_file) - - print(f" Found {len(assemblies)} assemblies needing backfill:") - for asm in assemblies: - print(f" - {asm['current_accession']}: v{asm['current_version']} " - f"(needs v{asm['historical_versions_needed']})") - - # Verify we found the expected assemblies - expected_count = 3 # GCA_000222935.2, GCA_000412225.2, GCA_003706615.3 - assert len(assemblies) == expected_count, \ - f"Expected {expected_count} assemblies, found {len(assemblies)}" - - print(f" PASS Correctly identified {len(assemblies)} assemblies") - return True - -def test_version_discovery(): - """Test FTP-based version discovery (using cache if available).""" - print("\n" + "="*80) - print("TEST 3: Version Discovery via FTP") - print("="*80) - print(" Note: This test queries NCBI FTP - may take a minute...") - - # Test with a known multi-version assembly - test_accession = "GCA_000222935.2" # Aciculosporium take - has version 1 and 2 - - print(f" Testing: {test_accession}") - versions = find_all_assembly_versions(test_accession) - - if not versions: - print(f" FAIL No versions found (FTP query may have failed)") - print(f" This is not critical - may be network issue") - return False - - print(f" PASS Found {len(versions)} version(s):") - for v in versions: - acc = v.get('accession', 'unknown') - print(f" - {acc}") - - # Verify we found at least version 1 and 2 - accessions = [v.get('accession', '') for v in versions] - base = test_accession.split('.')[0] - - # Should find both v1 and v2 - expected_versions = [f"{base}.1", f"{base}.2"] - found_expected = [v for v in expected_versions if v in accessions] - - print(f" PASS Found {len(found_expected)}/{len(expected_versions)} expected versions") - return True - -def test_cache_functionality(): - """Test that caching works correctly.""" - print("\n" + "="*80) - print("TEST 4: Cache Functionality") - print("="*80) - - # Clean cache first (Windows-safe deletion) - import shutil - import time as time_module - cache_dir = "tmp/backfill_cache" - if os.path.exists(cache_dir): - try: - # Give time for file handles to close - time_module.sleep(0.5) - shutil.rmtree(cache_dir) - print(f" Cleared cache directory: {cache_dir}") - except PermissionError: - print(f" Note: Cache directory in use, will test with existing cache") - - test_accession = "GCA_000222935.2" - - # First call - should fetch from FTP - print(f" First call (should fetch from FTP)...") - import time - start = time.time() - versions1 = find_all_assembly_versions(test_accession) - time1 = time.time() - start - - if not versions1: - print(f" FAIL FTP fetch failed - skipping cache test") - return False - - print(f" Took {time1:.2f}s, found {len(versions1)} versions") - - # Second call - should use cache - print(f" Second call (should use cache)...") - start = time.time() - versions2 = find_all_assembly_versions(test_accession) - time2 = time.time() - start - - print(f" Took {time2:.2f}s, found {len(versions2)} versions") - - # Cache should be much faster - if time2 < time1 * 0.5: # At least 50% faster - print(f" PASS Cache is working (2nd call {time2/time1*100:.1f}% of 1st call time)") - else: - print(f" WARN Cache may not be working (times similar)") - - # Verify cache files exist - if os.path.exists(cache_dir): - cache_files = list(Path(cache_dir).rglob("*.json")) - print(f" PASS Cache directory created with {len(cache_files)} files") - else: - print(f" FAIL Cache directory not created") - - return True - -def main(): - """Run all tests.""" - print("\n" + "="*80) - 
print("BACKFILL SCRIPT TEST SUITE") - print("="*80) - print("Testing: flows/parsers/backfill_historical_versions.py") - print("="*80) - - results = { - "Accession Parsing": False, - "Identify Assemblies": False, - "Version Discovery": False, - "Cache Functionality": False, - } - - try: - # Run tests - test_parse_accession() - results["Accession Parsing"] = True - - results["Identify Assemblies"] = test_identify_assemblies() - results["Version Discovery"] = test_version_discovery() - results["Cache Functionality"] = test_cache_functionality() - - except Exception as e: - print(f"\nFAIL Test failed with error: {e}") - import traceback - traceback.print_exc() - - # Summary - print("\n" + "="*80) - print("TEST SUMMARY") - print("="*80) - - for test_name, passed in results.items(): - status = "PASS PASS" if passed else "FAIL FAIL" - print(f" {status}: {test_name}") - - passed_count = sum(results.values()) - total_count = len(results) - - print(f"\n Total: {passed_count}/{total_count} tests passed") - - if passed_count == total_count: - print("\n 🎉 All tests passed!") - return 0 - else: - print("\n WARN Some tests failed - review output above") - return 1 - -if __name__ == '__main__': - sys.exit(main()) +FIXTURE_JSONL = "tests/test_data/assembly_test_sample.jsonl" + + +class TestParseAccession: + """Unit tests for parse_accession and parse_version.""" + + @pytest.mark.parametrize( + "accession, expected", + [ + ("GCA_000222935.2", ("GCA_000222935", 2)), + ("GCA_003706615.3", ("GCA_003706615", 3)), + ("GCF_000001405.39", ("GCF_000001405", 39)), + ], + ) + def test_parse_accession(self, accession, expected): + assert parse_accession(accession) == expected + + def test_parse_accession_no_version(self): + assert parse_accession("GCA_000222935") == ("GCA_000222935", 1) + + @pytest.mark.parametrize( + "accession, expected", + [ + ("GCA_000222935.2", 2), + ("GCA_000222935.10", 10), + ("GCA_000222935", 1), + ], + ) + def test_parse_version(self, accession, expected): + assert parse_version(accession) == expected + + +class TestAccessionPattern: + """Ensure the compiled ACCESSION_PATTERN validates correctly.""" + + @pytest.mark.parametrize( + "accession", + ["GCA_000222935.2", "GCF_000001405.39", "GCA_123456789.1"], + ) + def test_valid(self, accession): + assert ACCESSION_PATTERN.match(accession) + + @pytest.mark.parametrize( + "accession", + ["GCA_00022293.2", "GCA_0002229350.2", "XYZ_000222935.2", "hello"], + ) + def test_invalid(self, accession): + assert not ACCESSION_PATTERN.match(accession) + + +class TestIdentifyAssemblies: + """Identify which assemblies need backfill from the JSONL fixture.""" + + @pytest.fixture() + def fixture_path(self): + if not os.path.exists(FIXTURE_JSONL): + pytest.skip(f"Fixture not found: {FIXTURE_JSONL}") + return FIXTURE_JSONL + + def test_finds_multi_version_assemblies(self, fixture_path): + assemblies = identify_assemblies_needing_backfill(fixture_path) + assert len(assemblies) == 3 + + def test_fields_present(self, fixture_path): + assemblies = identify_assemblies_needing_backfill(fixture_path) + for asm in assemblies: + assert "base_accession" in asm + assert "current_version" in asm + assert "current_accession" in asm + assert "historical_versions_needed" in asm + + def test_version_ranges(self, fixture_path): + assemblies = identify_assemblies_needing_backfill(fixture_path) + by_base = {a["base_accession"]: a for a in assemblies} + assert by_base["GCA_000222935"]["historical_versions_needed"] == [1] + assert by_base["GCA_003706615"]["historical_versions_needed"] 
== [1, 2] + + def test_ignores_version_one(self, tmp_path): + """An assembly at version 1 should not appear in the results.""" + jsonl = tmp_path / "v1.jsonl" + jsonl.write_text(json.dumps({"accession": "GCA_999999999.1"}) + "\n") + assert identify_assemblies_needing_backfill(str(jsonl)) == [] + + +class TestCache: + """Round-trip and expiry tests for the JSON cache layer.""" + + def test_save_and_load(self, tmp_path): + path = str(tmp_path / "test.json") + save_to_cache(path, {"key": "value"}) + assert load_from_cache(path) == {"key": "value"} + + def test_expired_cache_returns_empty(self, tmp_path): + path = str(tmp_path / "old.json") + save_to_cache(path, {"key": "value"}) + os.utime(path, (0, 0)) + assert load_from_cache(path, max_age_days=1) == {} + + def test_missing_file_returns_empty(self, tmp_path): + assert load_from_cache(str(tmp_path / "nope.json")) == {} + + def test_get_cache_path_sanitises(self, tmp_path): + path = get_cache_path(str(tmp_path), "metadata", "GCA_000222935.2") + assert "GCA_000222935.2" in path + assert path.endswith(".json") + + +class TestCheckpoint: + """Checkpoint save/load/derive tests.""" + + def test_round_trip(self, tmp_path): + path = str(tmp_path / "cp.json") + save_checkpoint(path, 42) + cp = load_checkpoint(path) + assert cp["processed_count"] == 42 + assert "timestamp" in cp + + def test_missing_checkpoint_returns_empty(self, tmp_path): + assert load_checkpoint(str(tmp_path / "nope.json")) == {} + + def test_derive_is_deterministic(self, tmp_path): + path_a = derive_checkpoint_path("a.jsonl", "b.yaml", str(tmp_path)) + path_b = derive_checkpoint_path("a.jsonl", "b.yaml", str(tmp_path)) + assert path_a == path_b + + def test_derive_different_inputs_differ(self, tmp_path): + path_a = derive_checkpoint_path("a.jsonl", "b.yaml", str(tmp_path)) + path_c = derive_checkpoint_path("c.jsonl", "b.yaml", str(tmp_path)) + assert path_a != path_c + + +class TestParseHistoricalVersion: + """Verify parse_historical_version calls the right functions.""" + + @patch.object(backfill_module, "gh_utils") + @patch.object(backfill_module, "fetch_and_parse_sequence_report") + @patch.object(backfill_module, "process_assembly_report") + @patch.object(backfill_module, "utils") + def test_calls_process_with_superseded( + self, mock_utils, mock_process, mock_seq, mock_gh + ): + mock_utils.convert_keys_to_camel_case.return_value = { + "accession": "GCA_000222935.1" + } + mock_process.return_value = { + "processedAssemblyInfo": { + "genbankAccession": "GCA_000222935.1", + "versionStatus": "superseded", + } + } + mock_gh.parse_report_values.return_value = { + "genbankAccession": "GCA_000222935.1", + "assemblyID": "GCA_000222935_1", + } + + config = MagicMock() + row = backfill_module.parse_historical_version( + version_data={"accession": "GCA_000222935.1"}, + config=config, + base_accession="GCA_000222935", + version_num=1, + current_accession="GCA_000222935.2", + ) + + mock_process.assert_called_once() + _, kwargs = mock_process.call_args + assert kwargs["version_status"] == "superseded" + + mock_seq.assert_called_once() + + assert row["genbankAccession"] == "GCA_000222935.1" + + @patch.object(backfill_module, "gh_utils") + @patch.object(backfill_module, "fetch_and_parse_sequence_report") + @patch.object(backfill_module, "process_assembly_report") + @patch.object(backfill_module, "utils") + def test_sets_assembly_id( + self, mock_utils, mock_process, mock_seq, mock_gh + ): + mock_utils.convert_keys_to_camel_case.return_value = { + "accession": "GCA_000222935.1" + } + 
mock_process.return_value = { + "processedAssemblyInfo": { + "genbankAccession": "GCA_000222935.1", + } + } + mock_gh.parse_report_values.return_value = {} + + config = MagicMock() + backfill_module.parse_historical_version( + version_data={}, + config=config, + base_accession="GCA_000222935", + version_num=1, + current_accession="GCA_000222935.2", + ) + + report = mock_process.return_value + assert report["processedAssemblyInfo"]["assemblyID"] == "GCA_000222935_1" + + +class TestBackfillOrchestrator: + """Integration tests for the backfill_historical_versions flow. + + Mocks external dependencies (FTP, datasets CLI, parser functions, TSV + writer) so the test is fast and offline. Focuses on verifying: + - All discovered versions are requested and parsed + - write_to_tsv is called exactly once with all accumulated rows + - Checkpoint does not clear the parsed dict (regression for data-loss bug) + """ + + def _make_jsonl(self, tmp_path, records): + """Write records to a JSONL fixture file.""" + path = tmp_path / "input.jsonl" + lines = [json.dumps(r) for r in records] + path.write_text("\n".join(lines) + "\n") + return str(path) + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_writes_tsv_once_with_all_rows( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """Regression: old code wrote per-batch and cleared parsed dict.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": "assembly_historical.tsv"} + ) + mock_find.return_value = [ + {"accession": "GCA_000222935.1"}, + ] + mock_parse.return_value = { + "genbankAccession": "GCA_000222935.1", + "assemblyID": "GCA_000222935_1", + } + + input_path = self._make_jsonl(tmp_path, [ + {"accession": "GCA_000222935.2"}, + ]) + yaml_path = str(tmp_path / "config.yaml") + + backfill_historical_versions( + input_path=input_path, + yaml_path=yaml_path, + work_dir=str(tmp_path), + ) + + mock_write.assert_called_once() + written_parsed = mock_write.call_args[0][0] + assert "GCA_000222935.1" in written_parsed + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_multiple_assemblies_all_in_one_write( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """All rows from multiple assemblies must appear in a single write.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": "assembly_historical.tsv"} + ) + + def fake_find(accession, work_dir): + base = accession.split(".")[0] + return [{"accession": f"{base}.1"}] + + mock_find.side_effect = fake_find + + call_count = {"n": 0} + + def fake_parse(version_data, **kwargs): + call_count["n"] += 1 + acc = version_data["accession"] + return {"genbankAccession": acc, "assemblyID": acc.replace(".", "_")} + + mock_parse.side_effect = fake_parse + + input_path = self._make_jsonl(tmp_path, [ + {"accession": "GCA_000222935.2"}, + {"accession": "GCA_000412225.2"}, + {"accession": "GCA_003706615.3"}, + ]) + + backfill_historical_versions( + input_path=input_path, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + + mock_write.assert_called_once() + written_parsed = mock_write.call_args[0][0] + assert len(written_parsed) == 3 + assert "GCA_000222935.1" in written_parsed + 
assert "GCA_000412225.1" in written_parsed + assert "GCA_003706615.1" in written_parsed + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_skips_current_version( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """Versions >= current should not be parsed.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": "out.tsv"} + ) + mock_find.return_value = [ + {"accession": "GCA_000222935.1"}, + {"accession": "GCA_000222935.2"}, + ] + mock_parse.return_value = { + "genbankAccession": "GCA_000222935.1", + } + + input_path = self._make_jsonl(tmp_path, [ + {"accession": "GCA_000222935.2"}, + ]) + + backfill_historical_versions( + input_path=input_path, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + + mock_parse.assert_called_once() + parsed_acc = mock_parse.call_args[1]["version_data"]["accession"] + assert parsed_acc == "GCA_000222935.1" + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "utils") + def test_no_write_when_nothing_to_backfill( + self, mock_utils, mock_find, mock_write, tmp_path + ): + """If all assemblies are v1, write_to_tsv should not be called.""" + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": "out.tsv"} + ) + input_path = self._make_jsonl(tmp_path, [ + {"accession": "GCA_000222935.1"}, + ]) + + backfill_historical_versions( + input_path=input_path, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + + mock_write.assert_not_called() + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def test_checkpoint_does_not_clear_parsed( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """Rows parsed before a checkpoint must still be in the final write. + + This is the core regression test for Rich's bug report: 4006 + assemblies processed, only 6 in the output file. 
+ """ + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": "out.tsv"} + ) + + records = [ + {"accession": f"GCA_{str(i).zfill(9)}.2"} for i in range(150) + ] + input_path = self._make_jsonl(tmp_path, records) + + def fake_find(accession, work_dir): + base = accession.split(".")[0] + return [{"accession": f"{base}.1"}] + + mock_find.side_effect = fake_find + + def fake_parse(version_data, **kwargs): + acc = version_data["accession"] + return {"genbankAccession": acc} + + mock_parse.side_effect = fake_parse + + backfill_historical_versions( + input_path=input_path, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + + mock_write.assert_called_once() + written_parsed = mock_write.call_args[0][0] + assert len(written_parsed) == 150 From 096e53e0163fa5548c45d7b13555a852d338203f Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Thu, 19 Mar 2026 12:19:38 -0700 Subject: [PATCH 11/14] update on flake8 import --- flows/parsers/parse_backfill_historical_versions.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/flows/parsers/parse_backfill_historical_versions.py b/flows/parsers/parse_backfill_historical_versions.py index 5bce801..4324580 100644 --- a/flows/parsers/parse_backfill_historical_versions.py +++ b/flows/parsers/parse_backfill_historical_versions.py @@ -24,13 +24,9 @@ from flows.lib import utils from flows.lib.conditional_import import flow -from flows.lib.shared_args import ( - INPUT_PATH, - WORK_DIR, - YAML_PATH, - parse_args as _parse_args, - required, -) +from flows.lib.shared_args import INPUT_PATH, WORK_DIR, YAML_PATH +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required from flows.lib.utils import Config, Parser from flows.parsers.parse_ncbi_assemblies import ( fetch_and_parse_sequence_report, @@ -491,7 +487,7 @@ def backfill_historical_versions( print(f" Processed: {processed}/{total_assemblies} assemblies") print(f" Records written: {len(parsed)}") print(f" Output: {config.meta['file_name']}") - print(f"\n Next step: Run daily incremental pipeline") + print("\n Next step: Run daily incremental pipeline") print(f"{'=' * 80}\n") From 97961a3e23aa489d97dcc88a2b0e4fb58a7fb74a Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Thu, 19 Mar 2026 12:23:23 -0700 Subject: [PATCH 12/14] Fix flake8: remove unused imports, add noqa for E402 --- tests/test_backfill.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_backfill.py b/tests/test_backfill.py index d95aee3..7616c3d 100644 --- a/tests/test_backfill.py +++ b/tests/test_backfill.py @@ -14,9 +14,8 @@ import json import os import sys -import time from pathlib import Path -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, patch import pytest @@ -24,10 +23,10 @@ os.environ["SKIP_PREFECT"] = "true" -from flows.parsers import ( +from flows.parsers import ( # noqa: E402 parse_backfill_historical_versions as backfill_module, ) -from flows.parsers.parse_backfill_historical_versions import ( +from flows.parsers.parse_backfill_historical_versions import ( # noqa: E402 ACCESSION_PATTERN, backfill_historical_versions, derive_checkpoint_path, From 4d8a0f4f22be60f6ea91ec664bea84b4fbb96c65 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Wed, 1 Apr 2026 09:56:27 -0700 Subject: [PATCH 13/14] fix: rename yaml to .types.yaml, taxon_names field, and checkpoint re-run bug Rename configs/assembly_historical.yaml to assembly_historical.types.yaml. 
Rename names to taxon_names in yaml so output passes GenomeHubs validation. Fix checkpoint bug: save completed=True at end of run so re-runs reset start_index to 0 and write all rows; network fetches still use on-disk cache. Add 3 new tests covering completed flag and re-run regression (36 total, all passing). Made-with: Cursor --- Phase_0_PR_SUMMARY.md | 13 ++-- ...al.yaml => assembly_historical.types.yaml} | 2 +- .../parse_backfill_historical_versions.py | 22 ++++-- tests/test_backfill.py | 68 +++++++++++++++++++ 4 files changed, 95 insertions(+), 10 deletions(-) rename configs/{assembly_historical.yaml => assembly_historical.types.yaml} (99%) diff --git a/Phase_0_PR_SUMMARY.md b/Phase_0_PR_SUMMARY.md index 1f31702..0370bd9 100644 --- a/Phase_0_PR_SUMMARY.md +++ b/Phase_0_PR_SUMMARY.md @@ -36,9 +36,9 @@ milestone tracking by capturing previously untracked superseded versions. - **Code style** aligned with GenomeHubs conventions: Google-style docstrings, lowercase type hints (`dict`, `list`, `tuple`), `e` for exception variables, removed shebang and section banners. -- **`assembly_historical.yaml`**: moved `needs` under `file:` section and +- **`assembly_historical.types.yaml`**: moved `needs` under `file:` section, references `ATTR_assembly.types.yaml` (matches `ncbi_datasets_eukaryota` - convention). + convention), and renamed `names` → `taxon_names` for validation compliance. ### Test suite rewrite - Rewrote `tests/test_backfill.py` using pytest (was a custom runner). @@ -70,14 +70,17 @@ The backfill script: ### Checkpoint System - Saves progress every 100 assemblies to `{work_dir}/checkpoints/` -- Allows resuming after interruptions +- Allows resuming after interruptions without re-fetching cached data +- Marks the checkpoint as `completed` at the end of a full run so the next + re-run starts from index 0 (all rows collected; network fetches still served + from cache) - Does **not** trigger intermediate TSV writes (avoids the overwrite bug) ## Files ### New - `flows/parsers/parse_backfill_historical_versions.py` — Main backfill flow -- `configs/assembly_historical.yaml` — Output schema configuration +- `configs/assembly_historical.types.yaml` — Output schema configuration - `tests/test_backfill.py` — pytest suite (33 tests) - `tests/test_data/assembly_test_sample.jsonl` — Test fixture (3 assemblies) @@ -90,7 +93,7 @@ The backfill script: ```bash python -m flows.parsers.parse_backfill_historical_versions \ --input_path data/assembly_data_report.jsonl \ - --yaml_path configs/assembly_historical.yaml \ + --yaml_path configs/assembly_historical.types.yaml \ --work_dir tmp ``` diff --git a/configs/assembly_historical.yaml b/configs/assembly_historical.types.yaml similarity index 99% rename from configs/assembly_historical.yaml rename to configs/assembly_historical.types.yaml index c4086cd..b98f0fd 100644 --- a/configs/assembly_historical.yaml +++ b/configs/assembly_historical.types.yaml @@ -187,7 +187,7 @@ metadata: header: genbankAccession path: processedAssemblyInfo.genbankAccession -names: +taxon_names: common_name: header: commonName path: organism.commonName diff --git a/flows/parsers/parse_backfill_historical_versions.py b/flows/parsers/parse_backfill_historical_versions.py index 4324580..cc18b17 100644 --- a/flows/parsers/parse_backfill_historical_versions.py +++ b/flows/parsers/parse_backfill_historical_versions.py @@ -6,7 +6,7 @@ Usage: python -m flows.parsers.parse_backfill_historical_versions \\ --input_path data/assembly_data_report.jsonl \\ - --yaml_path 
configs/assembly_historical.yaml \\ + --yaml_path configs/assembly_historical.types.yaml \\ --work_dir tmp """ @@ -334,17 +334,23 @@ def load_checkpoint(checkpoint_file: str) -> dict: return {} -def save_checkpoint(checkpoint_file: str, processed_count: int): +def save_checkpoint( + checkpoint_file: str, processed_count: int, completed: bool = False +): """Persist current progress to the checkpoint file. Args: checkpoint_file (str): Path to the checkpoint JSON file. processed_count (int): Number of assemblies processed so far. + completed (bool): True when the full run finished successfully. + A completed checkpoint resets start_index on the next run so + all entries are re-collected (using cached network data). """ Path(checkpoint_file).parent.mkdir(parents=True, exist_ok=True) with open(checkpoint_file, "w") as f: json.dump({ "processed_count": processed_count, + "completed": completed, "timestamp": datetime.now().isoformat(), }, f, indent=2) @@ -390,7 +396,7 @@ def backfill_historical_versions( Args: input_path (str): Path to assembly_data_report.jsonl. - yaml_path (str): Path to assembly_historical.yaml. + yaml_path (str): Path to assembly_historical.types.yaml. work_dir (str): Working directory for caches, checkpoints, and output. checkpoint_file (str, optional): Explicit checkpoint path. Derived from inputs when omitted. @@ -409,7 +415,13 @@ def backfill_historical_versions( return checkpoint = load_checkpoint(checkpoint_file) - start_index = checkpoint.get("processed_count", 0) + # A completed checkpoint means the previous run finished successfully. + # Reset to 0 so all entries are collected again (network fetches still use + # the on-disk cache, so the re-run is fast). + if checkpoint.get("completed", False): + start_index = 0 + else: + start_index = checkpoint.get("processed_count", 0) total_assemblies = len(assemblies) total_versions = sum( @@ -481,6 +493,8 @@ def backfill_historical_versions( print(f"\nWriting {len(parsed)} records to TSV...") write_to_tsv(parsed, config) + save_checkpoint(checkpoint_file, processed, completed=True) + print(f"\n{'=' * 80}") print("BACKFILL COMPLETE") print(f"{'=' * 80}") diff --git a/tests/test_backfill.py b/tests/test_backfill.py index 7616c3d..2a124a7 100644 --- a/tests/test_backfill.py +++ b/tests/test_backfill.py @@ -170,6 +170,21 @@ def test_derive_different_inputs_differ(self, tmp_path): path_c = derive_checkpoint_path("c.jsonl", "b.yaml", str(tmp_path)) assert path_a != path_c + def test_completed_flag_round_trip(self, tmp_path): + """A checkpoint saved with completed=True must expose that flag.""" + path = str(tmp_path / "cp.json") + save_checkpoint(path, 100, completed=True) + cp = load_checkpoint(path) + assert cp["completed"] is True + assert cp["processed_count"] == 100 + + def test_incomplete_checkpoint_has_completed_false(self, tmp_path): + """Periodic mid-run saves must not set the completed flag.""" + path = str(tmp_path / "cp.json") + save_checkpoint(path, 50) + cp = load_checkpoint(path) + assert cp.get("completed", False) is False + class TestParseHistoricalVersion: """Verify parse_historical_version calls the right functions.""" @@ -436,3 +451,56 @@ def fake_parse(version_data, **kwargs): mock_write.assert_called_once() written_parsed = mock_write.call_args[0][0] assert len(written_parsed) == 150 + + @patch.object(backfill_module, "write_to_tsv") + @patch.object(backfill_module, "find_all_assembly_versions") + @patch.object(backfill_module, "parse_historical_version") + @patch.object(backfill_module, "utils") + def 
test_rerun_after_completed_checkpoint_writes_all_rows( + self, mock_utils, mock_parse, mock_find, mock_write, tmp_path + ): + """Re-running after a completed run must still write all rows. + + Regression test for Rich's report: second run skipped first ~4200 + entries and only wrote the remaining ~70 because the checkpoint + held processed_count == total at the end of the first run. + """ + mock_utils.load_config.return_value = MagicMock( + meta={"file_name": "out.tsv"} + ) + + records = [ + {"accession": f"GCA_{str(i).zfill(9)}.2"} for i in range(20) + ] + input_path = self._make_jsonl(tmp_path, records) + + def fake_find(accession, work_dir): + base = accession.split(".")[0] + return [{"accession": f"{base}.1"}] + + mock_find.side_effect = fake_find + + def fake_parse(version_data, **kwargs): + return {"genbankAccession": version_data["accession"]} + + mock_parse.side_effect = fake_parse + + common_args = dict( + input_path=input_path, + yaml_path=str(tmp_path / "config.yaml"), + work_dir=str(tmp_path), + ) + + # First run — completes and saves a completed checkpoint. + backfill_historical_versions(**common_args) + assert mock_write.call_count == 1 + + mock_write.reset_mock() + mock_find.reset_mock() + mock_parse.reset_mock() + + # Second run — must process all 20 entries, not just the tail. + backfill_historical_versions(**common_args) + assert mock_write.call_count == 1 + written_parsed = mock_write.call_args[0][0] + assert len(written_parsed) == 20 From 09a0fc7b4b6c04e55404b1dcf484baf422a40569 Mon Sep 17 00:00:00 2001 From: Fang Chen Date: Wed, 1 Apr 2026 10:08:09 -0700 Subject: [PATCH 14/14] untrack backfill test file --- tests/test_backfill.py | 506 ----------------------------------------- 1 file changed, 506 deletions(-) delete mode 100644 tests/test_backfill.py diff --git a/tests/test_backfill.py b/tests/test_backfill.py deleted file mode 100644 index 2a124a7..0000000 --- a/tests/test_backfill.py +++ /dev/null @@ -1,506 +0,0 @@ -"""Tests for parse_backfill_historical_versions.py - -Covers: -- Accession parsing helpers -- Assembly identification from JSONL fixture -- Cache round-trip (save/load with expiry) -- Checkpoint save/load/derive -- Accession format validation -- parse_historical_version calls correct functions with correct args -- backfill_historical_versions orchestrator: accumulates all rows and writes - TSV exactly once (regression test for the batch-overwrite data-loss bug) -""" - -import json -import os -import sys -from pathlib import Path -from unittest.mock import MagicMock, patch - -import pytest - -sys.path.insert(0, str(Path(__file__).parent.parent)) - -os.environ["SKIP_PREFECT"] = "true" - -from flows.parsers import ( # noqa: E402 - parse_backfill_historical_versions as backfill_module, -) -from flows.parsers.parse_backfill_historical_versions import ( # noqa: E402 - ACCESSION_PATTERN, - backfill_historical_versions, - derive_checkpoint_path, - get_cache_path, - identify_assemblies_needing_backfill, - load_checkpoint, - load_from_cache, - parse_accession, - parse_version, - save_checkpoint, - save_to_cache, -) - -FIXTURE_JSONL = "tests/test_data/assembly_test_sample.jsonl" - - -class TestParseAccession: - """Unit tests for parse_accession and parse_version.""" - - @pytest.mark.parametrize( - "accession, expected", - [ - ("GCA_000222935.2", ("GCA_000222935", 2)), - ("GCA_003706615.3", ("GCA_003706615", 3)), - ("GCF_000001405.39", ("GCF_000001405", 39)), - ], - ) - def test_parse_accession(self, accession, expected): - assert parse_accession(accession) == 
expected - - def test_parse_accession_no_version(self): - assert parse_accession("GCA_000222935") == ("GCA_000222935", 1) - - @pytest.mark.parametrize( - "accession, expected", - [ - ("GCA_000222935.2", 2), - ("GCA_000222935.10", 10), - ("GCA_000222935", 1), - ], - ) - def test_parse_version(self, accession, expected): - assert parse_version(accession) == expected - - -class TestAccessionPattern: - """Ensure the compiled ACCESSION_PATTERN validates correctly.""" - - @pytest.mark.parametrize( - "accession", - ["GCA_000222935.2", "GCF_000001405.39", "GCA_123456789.1"], - ) - def test_valid(self, accession): - assert ACCESSION_PATTERN.match(accession) - - @pytest.mark.parametrize( - "accession", - ["GCA_00022293.2", "GCA_0002229350.2", "XYZ_000222935.2", "hello"], - ) - def test_invalid(self, accession): - assert not ACCESSION_PATTERN.match(accession) - - -class TestIdentifyAssemblies: - """Identify which assemblies need backfill from the JSONL fixture.""" - - @pytest.fixture() - def fixture_path(self): - if not os.path.exists(FIXTURE_JSONL): - pytest.skip(f"Fixture not found: {FIXTURE_JSONL}") - return FIXTURE_JSONL - - def test_finds_multi_version_assemblies(self, fixture_path): - assemblies = identify_assemblies_needing_backfill(fixture_path) - assert len(assemblies) == 3 - - def test_fields_present(self, fixture_path): - assemblies = identify_assemblies_needing_backfill(fixture_path) - for asm in assemblies: - assert "base_accession" in asm - assert "current_version" in asm - assert "current_accession" in asm - assert "historical_versions_needed" in asm - - def test_version_ranges(self, fixture_path): - assemblies = identify_assemblies_needing_backfill(fixture_path) - by_base = {a["base_accession"]: a for a in assemblies} - assert by_base["GCA_000222935"]["historical_versions_needed"] == [1] - assert by_base["GCA_003706615"]["historical_versions_needed"] == [1, 2] - - def test_ignores_version_one(self, tmp_path): - """An assembly at version 1 should not appear in the results.""" - jsonl = tmp_path / "v1.jsonl" - jsonl.write_text(json.dumps({"accession": "GCA_999999999.1"}) + "\n") - assert identify_assemblies_needing_backfill(str(jsonl)) == [] - - -class TestCache: - """Round-trip and expiry tests for the JSON cache layer.""" - - def test_save_and_load(self, tmp_path): - path = str(tmp_path / "test.json") - save_to_cache(path, {"key": "value"}) - assert load_from_cache(path) == {"key": "value"} - - def test_expired_cache_returns_empty(self, tmp_path): - path = str(tmp_path / "old.json") - save_to_cache(path, {"key": "value"}) - os.utime(path, (0, 0)) - assert load_from_cache(path, max_age_days=1) == {} - - def test_missing_file_returns_empty(self, tmp_path): - assert load_from_cache(str(tmp_path / "nope.json")) == {} - - def test_get_cache_path_sanitises(self, tmp_path): - path = get_cache_path(str(tmp_path), "metadata", "GCA_000222935.2") - assert "GCA_000222935.2" in path - assert path.endswith(".json") - - -class TestCheckpoint: - """Checkpoint save/load/derive tests.""" - - def test_round_trip(self, tmp_path): - path = str(tmp_path / "cp.json") - save_checkpoint(path, 42) - cp = load_checkpoint(path) - assert cp["processed_count"] == 42 - assert "timestamp" in cp - - def test_missing_checkpoint_returns_empty(self, tmp_path): - assert load_checkpoint(str(tmp_path / "nope.json")) == {} - - def test_derive_is_deterministic(self, tmp_path): - path_a = derive_checkpoint_path("a.jsonl", "b.yaml", str(tmp_path)) - path_b = derive_checkpoint_path("a.jsonl", "b.yaml", str(tmp_path)) - assert 
path_a == path_b - - def test_derive_different_inputs_differ(self, tmp_path): - path_a = derive_checkpoint_path("a.jsonl", "b.yaml", str(tmp_path)) - path_c = derive_checkpoint_path("c.jsonl", "b.yaml", str(tmp_path)) - assert path_a != path_c - - def test_completed_flag_round_trip(self, tmp_path): - """A checkpoint saved with completed=True must expose that flag.""" - path = str(tmp_path / "cp.json") - save_checkpoint(path, 100, completed=True) - cp = load_checkpoint(path) - assert cp["completed"] is True - assert cp["processed_count"] == 100 - - def test_incomplete_checkpoint_has_completed_false(self, tmp_path): - """Periodic mid-run saves must not set the completed flag.""" - path = str(tmp_path / "cp.json") - save_checkpoint(path, 50) - cp = load_checkpoint(path) - assert cp.get("completed", False) is False - - -class TestParseHistoricalVersion: - """Verify parse_historical_version calls the right functions.""" - - @patch.object(backfill_module, "gh_utils") - @patch.object(backfill_module, "fetch_and_parse_sequence_report") - @patch.object(backfill_module, "process_assembly_report") - @patch.object(backfill_module, "utils") - def test_calls_process_with_superseded( - self, mock_utils, mock_process, mock_seq, mock_gh - ): - mock_utils.convert_keys_to_camel_case.return_value = { - "accession": "GCA_000222935.1" - } - mock_process.return_value = { - "processedAssemblyInfo": { - "genbankAccession": "GCA_000222935.1", - "versionStatus": "superseded", - } - } - mock_gh.parse_report_values.return_value = { - "genbankAccession": "GCA_000222935.1", - "assemblyID": "GCA_000222935_1", - } - - config = MagicMock() - row = backfill_module.parse_historical_version( - version_data={"accession": "GCA_000222935.1"}, - config=config, - base_accession="GCA_000222935", - version_num=1, - current_accession="GCA_000222935.2", - ) - - mock_process.assert_called_once() - _, kwargs = mock_process.call_args - assert kwargs["version_status"] == "superseded" - - mock_seq.assert_called_once() - - assert row["genbankAccession"] == "GCA_000222935.1" - - @patch.object(backfill_module, "gh_utils") - @patch.object(backfill_module, "fetch_and_parse_sequence_report") - @patch.object(backfill_module, "process_assembly_report") - @patch.object(backfill_module, "utils") - def test_sets_assembly_id( - self, mock_utils, mock_process, mock_seq, mock_gh - ): - mock_utils.convert_keys_to_camel_case.return_value = { - "accession": "GCA_000222935.1" - } - mock_process.return_value = { - "processedAssemblyInfo": { - "genbankAccession": "GCA_000222935.1", - } - } - mock_gh.parse_report_values.return_value = {} - - config = MagicMock() - backfill_module.parse_historical_version( - version_data={}, - config=config, - base_accession="GCA_000222935", - version_num=1, - current_accession="GCA_000222935.2", - ) - - report = mock_process.return_value - assert report["processedAssemblyInfo"]["assemblyID"] == "GCA_000222935_1" - - -class TestBackfillOrchestrator: - """Integration tests for the backfill_historical_versions flow. - - Mocks external dependencies (FTP, datasets CLI, parser functions, TSV - writer) so the test is fast and offline. 
Focuses on verifying: - - All discovered versions are requested and parsed - - write_to_tsv is called exactly once with all accumulated rows - - Checkpoint does not clear the parsed dict (regression for data-loss bug) - """ - - def _make_jsonl(self, tmp_path, records): - """Write records to a JSONL fixture file.""" - path = tmp_path / "input.jsonl" - lines = [json.dumps(r) for r in records] - path.write_text("\n".join(lines) + "\n") - return str(path) - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_writes_tsv_once_with_all_rows( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """Regression: old code wrote per-batch and cleared parsed dict.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": "assembly_historical.tsv"} - ) - mock_find.return_value = [ - {"accession": "GCA_000222935.1"}, - ] - mock_parse.return_value = { - "genbankAccession": "GCA_000222935.1", - "assemblyID": "GCA_000222935_1", - } - - input_path = self._make_jsonl(tmp_path, [ - {"accession": "GCA_000222935.2"}, - ]) - yaml_path = str(tmp_path / "config.yaml") - - backfill_historical_versions( - input_path=input_path, - yaml_path=yaml_path, - work_dir=str(tmp_path), - ) - - mock_write.assert_called_once() - written_parsed = mock_write.call_args[0][0] - assert "GCA_000222935.1" in written_parsed - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_multiple_assemblies_all_in_one_write( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """All rows from multiple assemblies must appear in a single write.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": "assembly_historical.tsv"} - ) - - def fake_find(accession, work_dir): - base = accession.split(".")[0] - return [{"accession": f"{base}.1"}] - - mock_find.side_effect = fake_find - - call_count = {"n": 0} - - def fake_parse(version_data, **kwargs): - call_count["n"] += 1 - acc = version_data["accession"] - return {"genbankAccession": acc, "assemblyID": acc.replace(".", "_")} - - mock_parse.side_effect = fake_parse - - input_path = self._make_jsonl(tmp_path, [ - {"accession": "GCA_000222935.2"}, - {"accession": "GCA_000412225.2"}, - {"accession": "GCA_003706615.3"}, - ]) - - backfill_historical_versions( - input_path=input_path, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - - mock_write.assert_called_once() - written_parsed = mock_write.call_args[0][0] - assert len(written_parsed) == 3 - assert "GCA_000222935.1" in written_parsed - assert "GCA_000412225.1" in written_parsed - assert "GCA_003706615.1" in written_parsed - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_skips_current_version( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """Versions >= current should not be parsed.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": "out.tsv"} - ) - mock_find.return_value = [ - {"accession": "GCA_000222935.1"}, - {"accession": "GCA_000222935.2"}, - ] - mock_parse.return_value = { - 
"genbankAccession": "GCA_000222935.1", - } - - input_path = self._make_jsonl(tmp_path, [ - {"accession": "GCA_000222935.2"}, - ]) - - backfill_historical_versions( - input_path=input_path, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - - mock_parse.assert_called_once() - parsed_acc = mock_parse.call_args[1]["version_data"]["accession"] - assert parsed_acc == "GCA_000222935.1" - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "utils") - def test_no_write_when_nothing_to_backfill( - self, mock_utils, mock_find, mock_write, tmp_path - ): - """If all assemblies are v1, write_to_tsv should not be called.""" - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": "out.tsv"} - ) - input_path = self._make_jsonl(tmp_path, [ - {"accession": "GCA_000222935.1"}, - ]) - - backfill_historical_versions( - input_path=input_path, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - - mock_write.assert_not_called() - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_checkpoint_does_not_clear_parsed( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """Rows parsed before a checkpoint must still be in the final write. - - This is the core regression test for Rich's bug report: 4006 - assemblies processed, only 6 in the output file. - """ - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": "out.tsv"} - ) - - records = [ - {"accession": f"GCA_{str(i).zfill(9)}.2"} for i in range(150) - ] - input_path = self._make_jsonl(tmp_path, records) - - def fake_find(accession, work_dir): - base = accession.split(".")[0] - return [{"accession": f"{base}.1"}] - - mock_find.side_effect = fake_find - - def fake_parse(version_data, **kwargs): - acc = version_data["accession"] - return {"genbankAccession": acc} - - mock_parse.side_effect = fake_parse - - backfill_historical_versions( - input_path=input_path, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - - mock_write.assert_called_once() - written_parsed = mock_write.call_args[0][0] - assert len(written_parsed) == 150 - - @patch.object(backfill_module, "write_to_tsv") - @patch.object(backfill_module, "find_all_assembly_versions") - @patch.object(backfill_module, "parse_historical_version") - @patch.object(backfill_module, "utils") - def test_rerun_after_completed_checkpoint_writes_all_rows( - self, mock_utils, mock_parse, mock_find, mock_write, tmp_path - ): - """Re-running after a completed run must still write all rows. - - Regression test for Rich's report: second run skipped first ~4200 - entries and only wrote the remaining ~70 because the checkpoint - held processed_count == total at the end of the first run. 
- """ - mock_utils.load_config.return_value = MagicMock( - meta={"file_name": "out.tsv"} - ) - - records = [ - {"accession": f"GCA_{str(i).zfill(9)}.2"} for i in range(20) - ] - input_path = self._make_jsonl(tmp_path, records) - - def fake_find(accession, work_dir): - base = accession.split(".")[0] - return [{"accession": f"{base}.1"}] - - mock_find.side_effect = fake_find - - def fake_parse(version_data, **kwargs): - return {"genbankAccession": version_data["accession"]} - - mock_parse.side_effect = fake_parse - - common_args = dict( - input_path=input_path, - yaml_path=str(tmp_path / "config.yaml"), - work_dir=str(tmp_path), - ) - - # First run — completes and saves a completed checkpoint. - backfill_historical_versions(**common_args) - assert mock_write.call_count == 1 - - mock_write.reset_mock() - mock_find.reset_mock() - mock_parse.reset_mock() - - # Second run — must process all 20 entries, not just the tail. - backfill_historical_versions(**common_args) - assert mock_write.call_count == 1 - written_parsed = mock_write.call_args[0][0] - assert len(written_parsed) == 20