diff --git a/Phase_0_PR_SUMMARY.md b/Phase_0_PR_SUMMARY.md new file mode 100644 index 0000000..0370bd9 --- /dev/null +++ b/Phase_0_PR_SUMMARY.md @@ -0,0 +1,119 @@ +# Pull Request: Historical Assembly Version Backfill (Phase 0) + +## Summary + +One-time backfill process to populate historical (superseded) assembly versions +from NCBI for all existing assemblies with version > 1. Enables version-aware +milestone tracking by capturing previously untracked superseded versions. + +## What Changed (latest revision) + +### Bug fixes +- **Fixed data-loss bug**: `write_to_tsv` overwrites by default, so the + original per-batch write + `parsed = {}` clearing lost all but the last + batch. All rows now accumulate in memory and `write_to_tsv` is called + exactly once at the end. Checkpoints only record resume progress. + +### Structural changes +- **Renamed** `backfill_historical_versions.py` → + `parse_backfill_historical_versions.py` so `register.py` discovers it as a + plugin via the `parse_` prefix convention. +- **Changed orchestrator decorator** from `@task` to `@flow(log_prints=True)` + to match how `parse_ncbi_assemblies` structures its flow/task hierarchy. +- **Split `find_all_assembly_versions`** into `discover_version_accessions` + (FTP) + `fetch_version_metadata` (datasets CLI) for modularity and + independent testability. +- **Added `backfill_historical_versions_wrapper`** matching the + `fetch_parse_validate` parser signature so the flow integrates with the + Prefect pipeline. + +### Convention alignment +- **CLI arguments** now use `shared_args` exclusively (`INPUT_PATH`, + `YAML_PATH`, `WORK_DIR`); removed ad-hoc `--checkpoint` argument. + Checkpoint path is derived via `derive_checkpoint_path`. +- **Replaced `subprocess.run`** with `utils.run_quoted` for shell-safe + argument quoting (consistent with `parse_ncbi_assemblies`). 
+- **Code style** aligned with GenomeHubs conventions: Google-style docstrings, + lowercase type hints (`dict`, `list`, `tuple`), `e` for exception variables, + removed shebang and section banners. +- **`assembly_historical.types.yaml`**: moved `needs` under `file:` section, + references `ATTR_assembly.types.yaml` (matches `ncbi_datasets_eukaryota` + convention), and renamed `names` → `taxon_names` for validation compliance. + +### Test suite rewrite +- Rewrote `tests/test_backfill.py` using pytest (was a custom runner). +- **33 tests**, all passing, covering: + - Accession parsing helpers + - Assembly identification from JSONL fixture + - Cache round-trip with expiry + - Checkpoint save/load/derive + - Accession format validation + - `parse_historical_version` delegation (mocked) + - Orchestrator: single TSV write, multi-assembly accumulation, + current-version skipping, no-op for v1-only input + - **Regression test for the batch-overwrite data-loss bug** + +## Solution Overview + +The backfill script: +1. Identifies assemblies with version > 1 from the input JSONL +2. Discovers all historical versions via NCBI FTP directory listing +3. Fetches metadata for each version using NCBI Datasets CLI +4. Parses each version through `process_assembly_report` with + `version_status="superseded"` +5. 
Writes all accumulated rows to a single TSV via `write_to_tsv` + +### Smart 2-Tier Caching +- **Version discovery cache** (7-day expiry): FTP directory listings +- **Metadata cache** (30-day expiry): Datasets CLI responses +- Reduces reruns from hours to minutes + +### Checkpoint System +- Saves progress every 100 assemblies to `{work_dir}/checkpoints/` +- Allows resuming after interruptions without re-fetching cached data +- Marks the checkpoint as `completed` at the end of a full run so the next + re-run starts from index 0 (all rows collected; network fetches still served + from cache) +- Does **not** trigger intermediate TSV writes (avoids the overwrite bug) + +## Files + +### New +- `flows/parsers/parse_backfill_historical_versions.py` — Main backfill flow +- `configs/assembly_historical.types.yaml` — Output schema configuration +- `tests/test_backfill.py` — pytest suite (33 tests) +- `tests/test_data/assembly_test_sample.jsonl` — Test fixture (3 assemblies) + +### Modified +- `flows/parsers/parse_ncbi_assemblies.py` — Added `version_status` parameter + +## Usage + +### As a standalone CLI +```bash +python -m flows.parsers.parse_backfill_historical_versions \ + --input_path data/assembly_data_report.jsonl \ + --yaml_path configs/assembly_historical.types.yaml \ + --work_dir tmp +``` + +### Via Prefect pipeline +Discovered automatically by `register.py` and invoked through +`fetch_parse_validate` with the standard parser signature. + +### Expected performance +- **First run**: ~10–15 hours (~3,700 assemblies, ~8,500 versions) +- **Subsequent runs**: ~15–30 minutes (cache hits) + +### Running tests +```bash +SKIP_PREFECT=true python -m pytest tests/test_backfill.py -v +``` + +## Next Steps (after merge) + +1. Run full backfill on production dataset +2. Upload output to S3 +3. Implement Phase 1: incremental daily updates for new historical versions +4. 
Implement Phase 2: version-aware milestone tracking diff --git a/configs/assembly_historical.types.yaml b/configs/assembly_historical.types.yaml new file mode 100644 index 0000000..b98f0fd --- /dev/null +++ b/configs/assembly_historical.types.yaml @@ -0,0 +1,201 @@ +# Configuration for historical assembly version backfill +# This config defines the schema for assembly_historical.tsv +# which contains all superseded assembly versions + +attributes: + assembly_level: + header: assemblyLevel + path: assemblyInfo.assemblyLevel + assembly_span: + header: totalSequenceLength + path: assemblyStats.totalSequenceLength + assigned_percent: + header: assignedProportion + path: processedAssemblyStats.assignedProportion + assembly_status: + header: primaryValue + path: processedAssemblyInfo.primaryValue + translate: + "1": primary + assembly_type: + header: assemblyType + path: assemblyInfo.assemblyType + bioproject: + header: bioProjectAccession + path: assemblyInfo.bioprojectLineage.bioprojects.accession + separator: + - "," + biosample: + header: biosampleAccession + path: assemblyInfo.biosample.accession + separator: + - "," + chromosome_count: + header: totalNumberOfChromosomes + path: assemblyStats.totalNumberOfChromosomes + contig_count: + header: numberOfContigs + path: assemblyStats.numberOfContigs + contig_l50: + header: contigL50 + path: assemblyStats.contigL50 + contig_n50: + header: contigN50 + path: assemblyStats.contigN50 + ebp_standard_criteria: + header: ebpStandardCriteria + path: processedAssemblyStats.ebpStandardCriteria + separator: + - "," + ebp_standard_date: + header: ebpStandardDate + path: processedAssemblyStats.ebpStandardDate + gc_percent: + header: gcPercent + path: assemblyStats.gcPercent + gene_count: + header: geneCountTotal + path: annotationInfo.stats.geneCounts.total + gene_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + isolate: + header: isolate + path: 
assemblyInfo.biosample.attributes.name==isolate.value + last_updated: + header: releaseDate + path: assemblyInfo.releaseDate + mitochondrion_accession: + header: mitochondrionAccession + path: processedOrganelleInfo.mitochondrion.accession + separator: + - ; + mitochondrion_assembly_span: + header: mitochondrionAssemblySpan + path: processedOrganelleInfo.mitochondrion.assemblySpan + mitochondrion_gc_percent: + header: mitochondrionGcPercent + path: processedOrganelleInfo.mitochondrion.gcPercent + mitochondrion_scaffolds: + header: mitochondrionScaffolds + path: processedOrganelleInfo.mitochondrion.scaffolds + separator: + - ; + noncoding_gene_count: + header: geneCountNoncoding + path: annotationInfo.stats.geneCounts.nonCoding + noncoding_gene_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + plastid_accession: + header: plastidAccession + path: processedOrganelleInfo.plastid.accession + separator: + - ; + plastid_assembly_span: + header: plastidAssemblySpan + path: processedOrganelleInfo.plastid.assemblySpan + plastid_gc_percent: + header: plastidGcPercent + path: processedOrganelleInfo.plastid.gcPercent + plastid_scaffolds: + header: plastidScaffolds + path: processedOrganelleInfo.plastid.scaffolds + separator: + - ; + protein_count: + header: geneCountProteincoding + path: annotationInfo.stats.geneCounts.proteinCoding + protein_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + pseudogene_count: + header: geneCountPseudogene + path: annotationInfo.stats.geneCounts.pseudogene + pseudogene_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + refseq_category: + header: refseqCategory + path: assemblyInfo.refseqCategory + sample_sex: + header: sex + path: assemblyInfo.biosample.attributes.name==sex.value + scaffold_count: + header: numberOfScaffolds + path: assemblyStats.numberOfScaffolds + scaffold_l50: + header: scaffoldL50 + path: assemblyStats.scaffoldL50 + 
scaffold_n50: + header: scaffoldN50 + path: assemblyStats.scaffoldN50 + sequence_count: + header: numberOfComponentSequences + path: assemblyStats.numberOfComponentSequences + submitter: + header: submitter + path: assemblyInfo.submitter + ungapped_span: + header: totalUngappedLength + path: assemblyStats.totalUngappedLength + # NEW: Version-specific field to indicate this is a historical/superseded version + version_status: + header: versionStatus + path: processedAssemblyInfo.versionStatus + +file: + display_group: general + needs: + - ATTR_assembly.types.yaml + exclusions: + attributes: + - bioproject + - biosample + identifiers: + - assembly_id + taxonomy: + - taxon_id + format: tsv + header: true + name: assembly_historical.tsv + source: INSDC + source_url_stub: https://www.ncbi.nlm.nih.gov/assembly/ + +identifiers: + assembly_id: + header: assemblyID + path: processedAssemblyInfo.assemblyID + assembly_name: + header: assemblyName + path: assemblyInfo.assemblyName + genbank_accession: + header: genbankAccession + path: processedAssemblyInfo.genbankAccession + refseq_accession: + header: refseqAccession + path: processedAssemblyInfo.refseqAccession + wgs_accession: + header: wgsProjectAccession + path: wgsInfo.wgsProjectAccession + +metadata: + is_primary_value: + header: primaryValue + path: processedAssemblyInfo.primaryValue + source_slug: + header: genbankAccession + path: processedAssemblyInfo.genbankAccession + +taxon_names: + common_name: + header: commonName + path: organism.commonName + +taxonomy: + taxon: + header: organismName + path: organism.organismName + taxon_id: + header: taxId + path: organism.taxId diff --git a/flows/parsers/parse_backfill_historical_versions.py b/flows/parsers/parse_backfill_historical_versions.py new file mode 100644 index 0000000..cc18b17 --- /dev/null +++ b/flows/parsers/parse_backfill_historical_versions.py @@ -0,0 +1,556 @@ +"""One-time historical backfill of superseded assembly versions from NCBI. 
+ +Discovers and parses all superseded versions for assemblies with version > 1. +Run once before starting the daily incremental pipeline. + +Usage: + python -m flows.parsers.parse_backfill_historical_versions \\ + --input_path data/assembly_data_report.jsonl \\ + --yaml_path configs/assembly_historical.types.yaml \\ + --work_dir tmp +""" + +import json +import os +import re +import time +from datetime import datetime +from glob import glob +from pathlib import Path +from typing import Optional + +import requests +from genomehubs import utils as gh_utils + +from flows.lib import utils +from flows.lib.conditional_import import flow +from flows.lib.shared_args import INPUT_PATH, WORK_DIR, YAML_PATH +from flows.lib.shared_args import parse_args as _parse_args +from flows.lib.shared_args import required +from flows.lib.utils import Config, Parser +from flows.parsers.parse_ncbi_assemblies import ( + fetch_and_parse_sequence_report, + process_assembly_report, + write_to_tsv, +) + +ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$") + + +def setup_cache_directories(work_dir: str): + """Create cache directory structure under work_dir. + + Args: + work_dir (str): Path to the working directory. + """ + for subdir in ("version_discovery", "metadata"): + os.makedirs( + os.path.join(work_dir, "backfill_cache", subdir), exist_ok=True + ) + + +def get_cache_path(work_dir: str, cache_type: str, identifier: str) -> str: + """Generate a human-readable cache file path. + + Args: + work_dir (str): Path to the working directory. + cache_type (str): Cache category (version_discovery or metadata). + identifier (str): Accession string used as the filename stem. + + Returns: + str: Path to the JSON cache file. + """ + safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier) + return os.path.join(work_dir, "backfill_cache", cache_type, f"{safe_id}.json") + + +def load_from_cache(cache_path: str, max_age_days: int = 30) -> dict: + """Load data from cache if it exists and is recent enough. 
+ + Args: + cache_path (str): Path to the cache JSON file. + max_age_days (int): Maximum acceptable age in days. + + Returns: + dict: Cached data, or empty dict on miss/expiry. + """ + try: + if os.path.exists(cache_path): + cache_age = time.time() - os.path.getmtime(cache_path) + if cache_age < (max_age_days * 24 * 3600): + with open(cache_path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f" Warning: Could not load cache from {cache_path}: {e}") + return {} + + +def save_to_cache(cache_path: str, data: dict): + """Save data to a cache file, creating parent dirs as needed. + + Args: + cache_path (str): Path to the cache JSON file. + data (dict): Data to persist. + """ + try: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f" Warning: Could not save cache to {cache_path}: {e}") + + +def discover_version_accessions( + base_accession: str, work_dir: str +) -> list[str]: + """Discover all versioned accessions for a base assembly via NCBI FTP. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. + + Returns: + list: Sorted list of versioned accession strings. 
+ """ + base_match = re.match(r"(GC[AF]_\d+)", base_accession) + if not base_match: + return [] + + base = base_match.group(1) + setup_cache_directories(work_dir) + cache_path = get_cache_path(work_dir, "version_discovery", base) + cached = load_from_cache(cache_path, max_age_days=7) + + if cached and "accessions" in cached: + print(f" Using cached version list for {base}") + return cached["accessions"] + + print(f" Discovering versions for {base} via FTP") + ftp_url = ( + f"https://ftp.ncbi.nlm.nih.gov/genomes/all/" + f"{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" + ) + + try: + response = requests.get(ftp_url, timeout=30) + if response.status_code != 200: + print(f" Warning: FTP query failed for {base}") + return [] + except Exception as e: + print(f" Error querying FTP for {base}: {e}") + return [] + + version_pattern = rf"{re.escape(base)}\.\d+" + accessions = sorted(set(re.findall(version_pattern, response.text))) + + save_to_cache(cache_path, { + "accessions": accessions, + "base_accession": base, + "ftp_url": ftp_url, + }) + return accessions + + +def fetch_version_metadata(version_acc: str, work_dir: str) -> dict: + """Fetch NCBI datasets metadata for a single assembly version. + + Uses utils.run_quoted to safely invoke the datasets CLI. Results are + cached for 30 days. + + Args: + version_acc (str): Versioned accession (e.g. GCA_000002035.1). + work_dir (str): Working directory for cache storage. + + Returns: + dict: Metadata dict, or empty dict on failure. 
+ """ + cache_path = get_cache_path(work_dir, "metadata", version_acc) + cached = load_from_cache(cache_path, max_age_days=30) + + if cached and "metadata" in cached: + return cached["metadata"] + + if not ACCESSION_PATTERN.match(version_acc): + print(f" Skipping unexpected accession format: {version_acc}") + return {} + + cmd = [ + "datasets", "summary", "genome", "accession", + version_acc, "--as-json-lines", + ] + try: + result = utils.run_quoted( + cmd, + capture_output=True, + text=True, + encoding="utf-8", + errors="ignore", + timeout=60, + ) + if result.returncode == 0 and result.stdout and result.stdout.strip(): + version_data = json.loads(result.stdout.strip()) + save_to_cache(cache_path, { + "metadata": version_data, + "cached_at": time.time(), + }) + return version_data + + print(f" Warning: No metadata for {version_acc}") + except Exception as e: + print(f" Warning: Error fetching {version_acc}: {e}") + + return {} + + +def find_all_assembly_versions( + base_accession: str, work_dir: str +) -> list[dict]: + """Discover all versions and fetch metadata for each. + + Delegates to discover_version_accessions for FTP discovery and + fetch_version_metadata for per-version metadata retrieval. Both layers + use independent caches. + + Args: + base_accession (str): Full accession (e.g. GCA_000002035.3). + work_dir (str): Working directory for cache storage. + + Returns: + list: List of metadata dicts, one per version found. + """ + accessions = discover_version_accessions(base_accession, work_dir) + versions = [] + for version_acc in accessions: + metadata = fetch_version_metadata(version_acc, work_dir) + if metadata: + versions.append(metadata) + return versions + + +def parse_historical_version( + version_data: dict, + config: Config, + base_accession: str, + version_num: int, + current_accession: str, +) -> dict: + """Parse a single historical version using GenomeHubs parser logic. 
+ + Ensures consistency with current assemblies by reusing + process_assembly_report with version_status="superseded" and + fetch_and_parse_sequence_report. + + Args: + version_data (dict): Raw NCBI metadata from the datasets CLI. + config (Config): Config object loaded from the YAML file. + base_accession (str): Base accession (e.g. GCA_000002035). + version_num (int): Integer version (1, 2, 3, ...). + current_accession (str): The latest accession that superseded this one. + + Returns: + dict: Parsed row dict ready for TSV output. + """ + version_data = utils.convert_keys_to_camel_case(version_data) + + processed_report = process_assembly_report( + report=version_data, + previous_report=None, + config=config, + parsed={}, + version_status="superseded", + ) + + fetch_and_parse_sequence_report(processed_report) + + processed_report["processedAssemblyInfo"]["assemblyID"] = ( + f"{base_accession}_{version_num}" + ) + + return gh_utils.parse_report_values(config.parse_fns, processed_report) + + +def parse_version(accession: str) -> int: + """Extract version number from a dotted accession string. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + int: Version number (defaults to 1 if no dot-suffix). + """ + parts = accession.split(".") + return int(parts[1]) if len(parts) > 1 else 1 + + +def parse_accession(accession: str) -> tuple[str, int]: + """Split an accession into its base and version components. + + Args: + accession (str): e.g. GCA_000002035.3 + + Returns: + tuple: (base_accession, version_number). + """ + parts = accession.split(".") + return parts[0], int(parts[1]) if len(parts) > 1 else 1 + + +def derive_checkpoint_path( + input_path: str, yaml_path: str, work_dir: str +) -> str: + """Derive a stable checkpoint path from parser inputs. + + Places the checkpoint alongside the data in work_dir so its location + can be determined without extra CLI arguments. + + Args: + input_path (str): Path to the assembly report JSONL file. 
+ yaml_path (str): Path to the parser YAML configuration file. + work_dir (str): Working directory. + + Returns: + str: Path to the checkpoint JSON file. + """ + input_stem = Path(input_path).stem + config_stem = Path(yaml_path).stem + checkpoint_dir = Path(work_dir) / "checkpoints" + checkpoint_dir.mkdir(parents=True, exist_ok=True) + name = f"backfill__{config_stem}__{input_stem}.json" + return str(checkpoint_dir / name) + + +def load_checkpoint(checkpoint_file: str) -> dict: + """Load checkpoint data if the file exists. + + Args: + checkpoint_file (str): Path to the checkpoint JSON file. + + Returns: + dict: Checkpoint dict, or empty dict if absent. + """ + if Path(checkpoint_file).exists(): + with open(checkpoint_file) as f: + return json.load(f) + return {} + + +def save_checkpoint( + checkpoint_file: str, processed_count: int, completed: bool = False +): + """Persist current progress to the checkpoint file. + + Args: + checkpoint_file (str): Path to the checkpoint JSON file. + processed_count (int): Number of assemblies processed so far. + completed (bool): True when the full run finished successfully. + A completed checkpoint resets start_index on the next run so + all entries are re-collected (using cached network data). + """ + Path(checkpoint_file).parent.mkdir(parents=True, exist_ok=True) + with open(checkpoint_file, "w") as f: + json.dump({ + "processed_count": processed_count, + "completed": completed, + "timestamp": datetime.now().isoformat(), + }, f, indent=2) + + +def identify_assemblies_needing_backfill(input_path: str) -> list[dict]: + """Identify assemblies with version > 1 that need historical backfill. + + Args: + input_path (str): Path to assembly_data_report.jsonl. + + Returns: + list: Assembly info dicts describing what needs backfilling. 
+ """ + assemblies = [] + with open(input_path) as f: + for line in f: + record = json.loads(line) + accession = record["accession"] + base_acc, version = parse_accession(accession) + + if version > 1: + assemblies.append({ + "base_accession": base_acc, + "current_version": version, + "current_accession": accession, + "historical_versions_needed": list(range(1, version)), + }) + return assemblies + + +@flow(log_prints=True) +def backfill_historical_versions( + input_path: str, + yaml_path: str, + work_dir: str = ".", + checkpoint_file: Optional[str] = None, +): + """One-time backfill of all historical assembly versions. + + Accumulates all parsed rows in memory and writes the output TSV once at + the end. Checkpoints are saved periodically so the run can be resumed + after interruption but do not trigger intermediate TSV writes. + + Args: + input_path (str): Path to assembly_data_report.jsonl. + yaml_path (str): Path to assembly_historical.types.yaml. + work_dir (str): Working directory for caches, checkpoints, and output. + checkpoint_file (str, optional): Explicit checkpoint path. Derived + from inputs when omitted. + """ + setup_cache_directories(work_dir) + config = utils.load_config(config_file=yaml_path) + checkpoint_file = checkpoint_file or derive_checkpoint_path( + input_path, yaml_path, work_dir, + ) + + print("Scanning for assemblies needing historical backfill...") + assemblies = identify_assemblies_needing_backfill(input_path) + + if not assemblies: + print("No assemblies with version > 1 found. Nothing to backfill.") + return + + checkpoint = load_checkpoint(checkpoint_file) + # A completed checkpoint means the previous run finished successfully. + # Reset to 0 so all entries are collected again (network fetches still use + # the on-disk cache, so the re-run is fast). 
+ if checkpoint.get("completed", False): + start_index = 0 + else: + start_index = checkpoint.get("processed_count", 0) + + total_assemblies = len(assemblies) + total_versions = sum( + len(a["historical_versions_needed"]) for a in assemblies + ) + + print(f"\n{'=' * 80}") + print("ONE-TIME HISTORICAL BACKFILL") + print(f"{'=' * 80}") + print(f" Assemblies to process: {total_assemblies}") + print(f" Total historical versions: {total_versions}") + if start_index > 0: + print(f" Resuming from checkpoint: {start_index}/{total_assemblies}") + print(f"{'=' * 80}\n") + + parsed = {} + processed = start_index + + for assembly_info in assemblies[start_index:]: + base_acc = assembly_info["base_accession"] + current_version = assembly_info["current_version"] + current_accession = assembly_info["current_accession"] + + print( + f"[{processed + 1}/{total_assemblies}] " + f"{base_acc} (current: v{current_version})" + ) + + all_versions = find_all_assembly_versions(current_accession, work_dir) + if not all_versions: + print(" Warning: No versions found via FTP") + processed += 1 + continue + + for version_data in all_versions: + version_acc = version_data.get("accession", "") + version_num = parse_version(version_acc) + + if version_num >= current_version: + continue + + try: + print(f" Parsing v{version_num}...", end=" ", flush=True) + row = parse_historical_version( + version_data=version_data, + config=config, + base_accession=base_acc, + version_num=version_num, + current_accession=current_accession, + ) + genbank_acc = row.get("genbankAccession", version_acc) + parsed[genbank_acc] = row + print("done") + except Exception as e: + print(f"failed ({e})") + continue + + processed += 1 + + if processed % 100 == 0: + save_checkpoint(checkpoint_file, processed) + pct = processed / total_assemblies * 100 + print( + f"\n Checkpoint saved: " + f"{processed}/{total_assemblies} ({pct:.1f}%)\n" + ) + + if parsed: + print(f"\nWriting {len(parsed)} records to TSV...") + 
write_to_tsv(parsed, config) + + save_checkpoint(checkpoint_file, processed, completed=True) + + print(f"\n{'=' * 80}") + print("BACKFILL COMPLETE") + print(f"{'=' * 80}") + print(f" Processed: {processed}/{total_assemblies} assemblies") + print(f" Records written: {len(parsed)}") + print(f" Output: {config.meta['file_name']}") + print("\n Next step: Run daily incremental pipeline") + print(f"{'=' * 80}\n") + + +def backfill_historical_versions_wrapper( + working_yaml: str, + work_dir: str, + append: bool, + data_freeze_path: Optional[str] = None, + **kwargs, +): + """Wrapper matching the fetch_parse_validate parser signature. + + Locates the *.jsonl input in work_dir and delegates to + backfill_historical_versions. + + Args: + working_yaml (str): Path to the working YAML file. + work_dir (str): Path to the working directory. + append (bool): Whether to append (unused, accepted for compatibility). + data_freeze_path (str, optional): Ignored; accepted for compatibility. + **kwargs: Additional keyword arguments. 
+ """ + glob_path = os.path.join(work_dir, "*.jsonl") + paths = glob(glob_path) + if not paths: + raise FileNotFoundError(f"No jsonl file found in {work_dir}") + if len(paths) > 1: + raise ValueError(f"More than one jsonl file found in {work_dir}") + + backfill_historical_versions( + input_path=paths[0], + yaml_path=working_yaml, + work_dir=work_dir, + ) + + +def plugin(): + """Register the flow.""" + return Parser( + name="BACKFILL_HISTORICAL_VERSIONS", + func=backfill_historical_versions_wrapper, + description="One-time backfill of historical assembly versions.", + ) + + +if __name__ == "__main__": + """Run the flow.""" + args = _parse_args( + [required(INPUT_PATH), required(YAML_PATH), WORK_DIR], + description="One-time historical backfill for assembly versions", + ) + backfill_historical_versions(**vars(args)) diff --git a/flows/parsers/parse_ncbi_assemblies.py b/flows/parsers/parse_ncbi_assemblies.py index 4bc4dd4..c5a566c 100644 --- a/flows/parsers/parse_ncbi_assemblies.py +++ b/flows/parsers/parse_ncbi_assemblies.py @@ -98,7 +98,13 @@ def is_atypical_assembly(report: dict, parsed: dict) -> bool: return False -def process_assembly_report(report: dict, previous_report: Optional[dict], config: Config, parsed: dict) -> dict: +def process_assembly_report( + report: dict, + previous_report: Optional[dict], + config: Config, + parsed: dict, + version_status: str = "current", +) -> dict: """Process assembly level information. This function takes a data dictionary and an optional previous_report dictionary, @@ -114,6 +120,9 @@ def process_assembly_report(report: dict, previous_report: Optional[dict], confi previous one. config (Config): A Config object containing the configuration data. parsed (dict): A dictionary containing parsed data. + version_status (str): Version status - "current" (default) or "superseded" + for historical versions. Defaults to "current" to maintain backward + compatibility with existing code. Returns: dict: The updated report dictionary. 
@@ -121,7 +130,10 @@ def process_assembly_report(report: dict, previous_report: Optional[dict], confi # Uncomment to filter atypical assemblies # if is_atypical_assembly(report, parsed): # return None - processed_report = {**report, "processedAssemblyInfo": {"organelle": "nucleus"}} + processed_report = {**report, "processedAssemblyInfo": { + "organelle": "nucleus", + "versionStatus": version_status + }} if "pairedAccession" in report: if processed_report["pairedAccession"].startswith("GCF_"): processed_report["processedAssemblyInfo"]["refseqAccession"] = report["pairedAccession"] diff --git a/tests/test_data/assembly_test_sample.jsonl b/tests/test_data/assembly_test_sample.jsonl new file mode 100644 index 0000000..fc540d2 --- /dev/null +++ b/tests/test_data/assembly_test_sample.jsonl @@ -0,0 +1,3 @@ +{"assemblyInfo":{"assemblyLevel":"Contig","assemblyName":"AciTa_1.0","assemblyType":"haploid","submitter":"University of Kentucky, Dept of Plant Pathology","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA67241","title":"Aciculosporium take MAFF-241224 genome sequencing project"}]}],"sequencingTech":"454 GS FLX Titanium","blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_000222935.2","biosample":{"accession":"SAMN02981340","lastUpdated":"2014-08-11T11:24:02.670","publicationDate":"2014-08-11T11:24:02.670","submissionDate":"2014-08-11T11:24:02.670","sampleIds":[{"db":"GenBank","value":"gb|AFQZ00000000.1"}],"description":{"title":"Sample from Aciculosporium take MAFF-241224","organism":{"taxId":1036760,"organismName":"Aciculosporium take MAFF-241224"}},"owner":{"name":"University of Kentucky, Advanced Genetic Technologies 
Center"},"models":["Generic"],"bioprojects":[{"accession":"PRJNA67241"}],"package":"Generic.1.0","attributes":[{"name":"strain","value":"MAFF-241224"}],"status":{"status":"live","when":"2014-08-11T11:24:02.670"},"strain":"MAFF-241224"},"assemblyStatus":"current","bioprojectAccession":"PRJNA67241","assemblyMethod":"Newbler v. 2.5.3","releaseDate":"2011-08-03"},"assemblyStats":{"totalSequenceLength":"58836405","totalUngappedLength":"58836405","numberOfContigs":3298,"contigN50":40517,"contigL50":411,"numberOfScaffolds":3298,"scaffoldN50":40517,"scaffoldL50":411,"numberOfComponentSequences":3298,"gcCount":"23621104","gcPercent":40,"genomeCoverage":"18.5","atgcCount":"58836294"},"wgsInfo":{"wgsProjectAccession":"AFQZ01","masterWgsUrl":"https://www.ncbi.nlm.nih.gov/nuccore/AFQZ00000000.1","wgsContigsUrl":"https://www.ncbi.nlm.nih.gov/Traces/wgs/AFQZ01"},"currentAccession":"GCA_000222935.2","accession":"GCA_000222935.2","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":1036760,"organismName":"Aciculosporium take MAFF-241224","infraspecificNames":{"strain":"MAFF-241224"}}} +{"assemblyInfo":{"assemblyLevel":"Complete Genome","assemblyName":"ASM41222v2","assemblyType":"haploid","submitter":"Duke University, Molecular Genetics and Microbiology","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA39551","title":"[Ashbya] aceris (nom. inval.) Genome sequencing"}]}],"blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_000412225.2","biosample":{"accession":"SAMN03081470","lastUpdated":"2021-07-14T21:04:31.245","publicationDate":"2014-09-26T00:00:00.000","submissionDate":"2014-09-26T10:52:27.116","sampleIds":[{"db":"GenBank","value":"gb|CP006020.1"}],"description":{"title":"Sample from [Ashbya] aceris (nom. inval.)","organism":{"taxId":566037,"organismName":"[Ashbya] aceris (nom. 
inval.)"}},"owner":{"name":"Duke University, Molecular Genetics and Microbiology and Institute for Genome Sciences & Policy"},"models":["Generic"],"bioprojects":[{"accession":"PRJNA39551"}],"package":"Generic.1.0","attributes":[{"name":"host","value":"Boisea trivittata"}],"status":{"status":"live","when":"2014-10-01T12:30:14"},"host":"Boisea trivittata"},"assemblyStatus":"current","bioprojectAccession":"PRJNA39551","releaseDate":"2014-06-05"},"assemblyStats":{"totalNumberOfChromosomes":7,"totalSequenceLength":"8867527","totalUngappedLength":"8867527","numberOfContigs":7,"contigN50":1493473,"contigL50":3,"numberOfScaffolds":7,"scaffoldN50":1493473,"scaffoldL50":3,"numberOfComponentSequences":7,"gcCount":"4548443","gcPercent":51.5,"numberOfOrganelles":1,"atgcCount":"8867527"},"organelleInfo":[{"description":"Mitochondrion","totalSeqLength":"26996","submitter":"Duke University, Molecular Genetics and Microbiology"}],"annotationInfo":{"name":"Annotation submitted by Duke University, Molecular Genetics and Microbiology","provider":"Duke University, Molecular Genetics and Microbiology","releaseDate":"2014-10-01","stats":{"geneCounts":{"total":4690,"proteinCoding":4487,"nonCoding":203}}},"currentAccession":"GCA_000412225.2","accession":"GCA_000412225.2","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":566037,"organismName":"[Ashbya] aceris (nom. 
inval.)"}} +{"assemblyInfo":{"assemblyLevel":"Scaffold","assemblyName":"ASM370661v3","assemblyType":"haploid","submitter":"UW-Madison","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA429441","title":"Sequencing of 196 yeast species from the subphylum Saccharomycotina Genome sequencing and assembly"}]}],"sequencingTech":"Illumina HiSeq","blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_003706615.3","biosample":{"accession":"SAMN08343339","lastUpdated":"2019-06-20T18:14:41.872","publicationDate":"2018-11-02T00:00:00.000","submissionDate":"2018-01-10T12:39:16.190","sampleIds":[{"label":"Sample name","value":"Candida_montana"},{"db":"SRA","value":"SRS2837972"}],"description":{"title":"Microbe sample from [Candida] montana","organism":{"taxId":49329,"organismName":"[Candida] montana"}},"owner":{"name":"UW-Madison","contacts":[{}]},"models":["Microbe, viral or environmental"],"package":"Microbe.1.0","attributes":[{"name":"strain","value":"NRRL Y-17326"},{"name":"isolation_source","value":"missing"},{"name":"collection_date","value":"2016"},{"name":"geo_loc_name","value":"USA: Madison, Wisconsin"},{"name":"sample_type","value":"cell culture"},{"name":"type-material","value":"type material of Candida montana"},{"name":"culture_collection","value":"NRRL:Y-17326"}],"status":{"status":"live","when":"2018-11-02T12:37:45.337"},"collectionDate":"2016","geoLocName":"USA: Madison, Wisconsin","isolationSource":"missing","strain":"NRRL Y-17326"},"assemblyStatus":"current","bioprojectAccession":"PRJNA429441","assemblyMethod":"SPADES v. 
3.7","releaseDate":"2023-07-14"},"assemblyStats":{"totalSequenceLength":"12493498","totalUngappedLength":"12493434","numberOfContigs":61,"contigN50":704842,"contigL50":7,"numberOfScaffolds":60,"scaffoldN50":704842,"scaffoldL50":7,"numberOfComponentSequences":60,"gcCount":"4518305","gcPercent":36,"genomeCoverage":"63.8148","atgcCount":"12493434"},"wgsInfo":{"wgsProjectAccession":"PPLU03","masterWgsUrl":"https://www.ncbi.nlm.nih.gov/nuccore/PPLU00000000.3","wgsContigsUrl":"https://www.ncbi.nlm.nih.gov/Traces/wgs/PPLU03"},"currentAccession":"GCA_003706615.3","typeMaterial":{"typeLabel":"TYPE_MATERIAL","typeDisplayText":"assembly from type material"},"accession":"GCA_003706615.3","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":49329,"organismName":"[Candida] montana","infraspecificNames":{"strain":"NRRL Y-17326"}}}
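
---

The `tests/test_backfill.py` suite referenced in the summary is not included in this diff. As a minimal, self-contained sketch of the regression scenario it covers (the batch-overwrite data-loss bug), the following uses stand-in names — `write_tsv_overwrite`, `run_batched`, `run_accumulating` are illustrative only, not the repo's actual API:

```python
# Sketch of the regression scenario: a per-batch "write" that truncates the
# output on every call loses all but the last batch, while accumulating rows
# and writing exactly once at the end keeps them all. All names here are
# stand-ins for illustration, not the repository's real functions.

def write_tsv_overwrite(rows: dict, out: dict) -> None:
    """Stand-in for write_to_tsv's overwrite-by-default behaviour."""
    out.clear()
    out.update(rows)

def run_batched(batches: list[dict], out: dict) -> None:
    """The buggy pattern: write each batch, then clear the accumulator."""
    parsed = {}
    for batch in batches:
        parsed.update(batch)
        write_tsv_overwrite(parsed, out)
        parsed = {}  # clearing here is what discarded earlier batches

def run_accumulating(batches: list[dict], out: dict) -> None:
    """The fixed pattern: accumulate everything, write exactly once."""
    parsed = {}
    for batch in batches:
        parsed.update(batch)
    write_tsv_overwrite(parsed, out)

batches = [{"GCA_1.1": "row1"}, {"GCA_2.1": "row2"}, {"GCA_3.1": "row3"}]
buggy, fixed = {}, {}
run_batched(batches, buggy)
run_accumulating(batches, fixed)

assert len(buggy) == 1  # only the final batch survives repeated overwrites
assert len(fixed) == 3  # every row reaches the single final write
```

A real test against the flow would follow the same shape: monkeypatch the writer, run the orchestrator over the three-assembly fixture, and assert that the writer was called exactly once with all accumulated rows.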