diff --git a/multiqc/modules/bases2fastq/bases2fastq.py b/multiqc/modules/bases2fastq/bases2fastq.py index df4e29af77..3c8aae9d18 100644 --- a/multiqc/modules/bases2fastq/bases2fastq.py +++ b/multiqc/modules/bases2fastq/bases2fastq.py @@ -1,13 +1,15 @@ from collections import defaultdict import copy +from itertools import chain import re import json import logging import random -from typing import Any, Dict, List +from typing import Any, Callable, Dict, List, Optional, Tuple import uuid from pathlib import Path +from multiqc import config from multiqc.base_module import BaseMultiqcModule, ModuleNoSamplesFound from multiqc.types import LoadedFileDict from multiqc.utils import mqc_colour @@ -33,10 +35,118 @@ log = logging.getLogger(__name__) -MIN_POLONIES = 10000 +# Default minimum polony threshold - samples below this are skipped +DEFAULT_MIN_POLONIES = 10000 + + +def _get_min_polonies() -> int: + """ + Get the minimum polonies threshold from config or use default. + + Can be configured in multiqc_config.yaml: + bases2fastq_config: + min_polonies: 5000 + """ + cfg = getattr(config, "bases2fastq_config", {}) + if not isinstance(cfg, dict): + return DEFAULT_MIN_POLONIES + + min_polonies = cfg.get("min_polonies", DEFAULT_MIN_POLONIES) + try: + min_polonies = int(min_polonies) + except (ValueError, TypeError): + log.warning(f"Invalid min_polonies value '{min_polonies}', using default {DEFAULT_MIN_POLONIES}") + min_polonies = DEFAULT_MIN_POLONIES + + if min_polonies != DEFAULT_MIN_POLONIES: + log.debug(f"Using custom min_polonies threshold: {min_polonies}") + + return min_polonies class MultiqcModule(BaseMultiqcModule): + """ + Bases2Fastq is Element Biosciences' secondary analysis software for demultiplexing + sequencing data from AVITI systems and converting base calls into FASTQ files. + + Data Flow Overview + ------------------ + The module handles three distinct data hierarchy levels: + + 1. **Run Level**: Single sequencing run with all samples in one output + - Directory: `/` + - Files: `RunStats.json`, `RunManifest.json` + - Samples identified by: `{RunName}-{AnalysisID}__{SampleName}` + + 2. **Project Level**: Demultiplexing by project, samples split into project subdirectories + - Directory: `/Samples//` + - Files: Project-specific `RunStats.json` + - Run-level `RunManifest.json` accessed via `../../RunManifest.json` + - Samples identified by: `{RunName}-{AnalysisID}__{SampleName}` + + 3. **Combined Level**: Both run and project data present (merged view) + + Parsing Flow + ------------ + ``` + __init__() + │ + ├─> _init_data_structures() # Initialize empty dicts for all data levels + │ + ├─> _parse_and_validate_data() # Main parsing entry point + │ │ + │ ├─> _parse_run_project_data("bases2fastq/run") # Parse run-level RunStats.json + │ │ └─> Populates: run_level_data, run_level_samples, run_level_samples_to_project + │ │ + │ ├─> _parse_run_project_data("bases2fastq/project") # Parse project-level RunStats.json + │ │ └─> Populates: project_level_data, project_level_samples, project_level_samples_to_project + │ │ + │ └─> _determine_summary_path() # Returns: "run_level" | "project_level" | "combined_level" + │ + ├─> _select_data_by_summary_path() # Route to appropriate data sources + │ │ + │ ├─> _parse_run_manifest() or _parse_run_manifest_in_project() + │ │ └─> Returns: manifest_data (lane settings, adapter info) + │ │ + │ ├─> _parse_index_assignment() or _parse_index_assignment_in_project() + │ │ └─> Returns: index_assignment_data (per-sample index stats) + │ │ + │ └─> _parse_run_unassigned_sequences() (run_level only) + │ └─> Returns: unassigned_sequences (unknown barcodes) + │ + ├─> _setup_colors() # Assign colors to runs/projects/samples + │ + └─> _generate_plots() # Create all report sections and plots + ``` + + Data Structures + --------------- + - `run_level_data`: Dict[run_name, run_stats] - Run-level QC metrics + - `run_level_samples`: Dict[sample_id, sample_stats] - Sample metrics from run-level + - `project_level_data`: Dict[project_name, project_stats] - Project-level QC metrics + - `project_level_samples`: Dict[sample_id, sample_stats] - Sample metrics from project-level + - `*_samples_to_project`: Dict[sample_id, project_name] - Maps samples to their projects + + Sample Naming Convention + ------------------------ + Samples are uniquely identified as: `{RunName}-{AnalysisID[0:4]}__{SampleName}` + This ensures uniqueness across multiple runs while keeping names readable. + + Files Parsed + ------------ + - `RunStats.json`: Run/project QC metrics, sample statistics, lane data + - `RunManifest.json`: Sample sheet info, index sequences, adapter settings + + Metrics Displayed + ----------------- + - Polony counts and yields + - Base quality distributions (histogram and by-cycle) + - Index assignment statistics + - Per-sample sequence content and GC distribution + - Adapter content analysis + - Unassigned/unknown barcode sequences (run-level only) + """ + def __init__(self): super(MultiqcModule, self).__init__( name="Bases2Fastq", @@ -46,29 +156,144 @@ def __init__(self): doi="10.1038/s41587-023-01750-7", ) - # Initialize run, project and sample level structures - self.run_level_data = {} - self.run_level_samples = {} - self.run_level_samples_to_project = {} - self.project_level_data = {} - self.project_level_samples = {} - self.project_level_samples_to_project = {} - num_run_level_samples = 0 - num_project_level_samples = 0 - - # Initialize run and project groups - self.group_dict = dict() - self.group_lookup_dict = dict() - self.project_lookup_dict = dict() - - self.b2f_sample_data = dict() - self.b2f_run_data = dict() - self.b2f_run_project_data = dict() - self.b2f_run_project_sample_data = dict() - self.missing_runs = set() - self.sample_id_to_run = dict() - - # Define if call is project- or run-level + # Get configurable minimum polonies threshold + self.min_polonies = _get_min_polonies() + + # Initialize data structures + self._init_data_structures() + + # Parse and validate input data + summary_path = self._parse_and_validate_data() + + # Select data based on summary path and parse additional sources + run_data, sample_data, samples_to_projects, manifest_data, index_assignment_data, unassigned_sequences = ( + self._select_data_by_summary_path(summary_path) + ) + + # Set up color schemes for groups and samples + self._setup_colors(sample_data, samples_to_projects, summary_path) + + # Generate all plots and sections + self._generate_plots( + summary_path, + run_data, + sample_data, + samples_to_projects, + manifest_data, + index_assignment_data, + unassigned_sequences, + ) + + # Write main data file at the very end after all sections are added + self.write_data_file(sample_data, "bases2fastq") + + def _init_data_structures(self) -> None: + """ + Initialize all data structures used by the module. + + Data structures are organized by hierarchy level: + - Run level: Data from single-run Bases2Fastq output (no project splitting) + - Project level: Data from project-split Bases2Fastq output + - Combined: Merged data when both levels are present + """ + # File cache to avoid reading the same JSON files multiple times + # Key: resolved file path, Value: parsed JSON data + self._file_cache: Dict[str, Any] = {} + + # === Run-level data structures === + # Populated from /RunStats.json + self.run_level_data: Dict[str, Any] = {} # run_name -> full run stats + self.run_level_samples: Dict[str, Any] = {} # sample_id -> sample stats + self.run_level_samples_to_project: Dict[str, str] = {} # sample_id -> project name + + # === Project-level data structures === + # Populated from /Samples//RunStats.json + self.project_level_data: Dict[str, Any] = {} # project_name -> project stats + self.project_level_samples: Dict[str, Any] = {} # sample_id -> sample stats + self.project_level_samples_to_project: Dict[str, str] = {} # sample_id -> project name + + # === Grouping structures for color assignment === + self.group_dict: Dict[str, Any] = {} # group_name -> list of members + self.group_lookup_dict: Dict[str, Any] = {} # item -> group it belongs to + self.project_lookup_dict: Dict[str, Any] = {} # sample -> project mapping + + # === Legacy/auxiliary data structures === + self.b2f_sample_data: Dict[str, Any] = {} + self.b2f_run_data: Dict[str, Any] = {} + self.b2f_run_project_data: Dict[str, Any] = {} + self.b2f_run_project_sample_data: Dict[str, Any] = {} + self.missing_runs: set = set() # Runs referenced but not found + self.sample_id_to_run: Dict[str, str] = {} # sample_id -> run_analysis_name + + def _validate_path(self, file_path: Path, base_directory: Path) -> bool: + """ + Validate that a file path doesn't escape outside the expected directory hierarchy. + + Args: + file_path: Path to validate + base_directory: The base directory that the path should stay within + + Returns: + True if path is valid, False if it escapes the base directory + """ + try: + resolved_path = file_path.resolve() + resolved_base = base_directory.resolve() + # Check if the resolved path is within the base directory tree + resolved_path.relative_to(resolved_base) + return True + except ValueError: + # relative_to raises ValueError if path is not relative to base + log.warning( + f"Path {file_path} resolves outside expected directory {base_directory}. Skipping for security reasons." + ) + return False + + def _read_json_file(self, file_path: Path, base_directory: Optional[Path] = None) -> Optional[Dict[str, Any]]: + """ + Read and parse a JSON file with caching. + + Args: + file_path: Path to the JSON file + base_directory: Optional base directory to validate path against + + Returns: + Parsed JSON data or None if reading failed + """ + # Validate path doesn't escape expected directory if base is provided + if base_directory is not None and not self._validate_path(file_path, base_directory): + return None + + cache_key = str(file_path.resolve()) + + if cache_key in self._file_cache: + return self._file_cache[cache_key] + + if not file_path.exists(): + log.error( + f"{file_path.name} does not exist at {file_path}.\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + return None + + try: + with open(file_path) as _infile: + data = json.load(_infile) + self._file_cache[cache_key] = data + return data + except (json.JSONDecodeError, OSError) as e: + log.error(f"Error reading {file_path}: {e}") + return None + + def _parse_and_validate_data(self) -> str: + """ + Parse input data and validate that samples were found. + + Returns: + summary_path: The determined summary path ('run_level', 'project_level', or 'combined_level') + """ + # Check for available log files run_level_log_files = len(list(self.find_log_files("bases2fastq/run"))) project_level_log_files = len(list(self.find_log_files("bases2fastq/project"))) @@ -77,7 +302,7 @@ def __init__(self): log.error(error_msg) raise ModuleNoSamplesFound(error_msg) - # Parse data + # Parse data from available sources if run_level_log_files > 0: (self.run_level_data, self.run_level_samples, self.run_level_samples_to_project) = ( self._parse_run_project_data("bases2fastq/run") @@ -87,11 +312,11 @@ def __init__(self): self._parse_run_project_data("bases2fastq/project") ) - # Get run- and project-level samples + # Count samples num_run_level_samples = len(self.run_level_samples) num_project_level_samples = len(self.project_level_samples) - # Ensure run/sample data found + # Ensure at least some data was found if all( [ len(self.run_level_data) == 0, @@ -104,16 +329,10 @@ def __init__(self): log.error(error_msg) raise ModuleNoSamplesFound(error_msg) - # Choose path to take, if project use only project-level data, otherwise use run- and project-level - summary_path = "" - if len(self.run_level_data) > 0 and len(self.project_level_data) == 0: - summary_path = "run_level" - if len(self.run_level_data) == 0 and len(self.project_level_data) > 0: - summary_path = "project_level" - elif len(self.run_level_data) > 0 and len(self.project_level_data) > 0: - summary_path = "combined_level" + # Determine summary path + summary_path = self._determine_summary_path() - # Log runs, projects and samples found + # Log what was found log.info(f"Found {len(self.run_level_data)} run(s) within the Bases2Fastq results.") log.info(f"Found {len(self.project_level_data)} project(s) within the Bases2Fastq results.") if summary_path == "run_level": @@ -121,133 +340,226 @@ def __init__(self): else: log.info(f"Found {num_project_level_samples} sample(s) within the Bases2Fastq results.") - # Superfluous function call to confirm that it is used in this module + # Required call to confirm module is used self.add_software_version(None) - # Warn user if run-level/project-level or sample-level metrics were not found + # Warn if no data found if len(self.run_level_data) == 0 and len(self.project_level_data) == 0: log.warning("No run/project stats found!") if num_run_level_samples == 0 and num_project_level_samples == 0: log.warning("No sample stats found!") - # Define data to use - run_data = {} - sample_data = {} - samples_to_projects = {} - manifest_data = {} - index_assigment_data = {} - unassigned_sequences = {} + return summary_path + + def _determine_summary_path(self) -> str: + """ + Determine which summary path to use based on available data. + + Returns: + 'run_level', 'project_level', or 'combined_level' + """ + has_run_data = len(self.run_level_data) > 0 + has_project_data = len(self.project_level_data) > 0 + + if has_run_data and not has_project_data: + return "run_level" + elif not has_run_data and has_project_data: + return "project_level" + elif has_run_data and has_project_data: + return "combined_level" + else: + error_msg = "No run- or project-level data was retained. No report will be generated." + log.error(error_msg) + raise ModuleNoSamplesFound(error_msg) + + def _select_data_by_summary_path( + self, summary_path: str + ) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, str], Dict[str, Any], Dict[str, Any], Dict[str, Any]]: + """ + Select the appropriate data sources based on the summary path. + + Returns: + Tuple of (run_data, sample_data, samples_to_projects, manifest_data, + index_assignment_data, unassigned_sequences) + """ if summary_path == "run_level": - run_data = self.run_level_data - sample_data = self.run_level_samples - samples_to_projects = self.run_level_samples_to_project - manifest_data = self._parse_run_manifest("bases2fastq/manifest") - index_assigment_data = self._parse_index_assignment("bases2fastq/manifest") - unassigned_sequences = self._parse_run_unassigned_sequences("bases2fastq/run") + return ( + self.run_level_data, + self.run_level_samples, + self.run_level_samples_to_project, + self._parse_run_manifest("bases2fastq/manifest"), + self._parse_index_assignment("bases2fastq/manifest"), + self._parse_run_unassigned_sequences("bases2fastq/run"), + ) elif summary_path == "project_level": - run_data = self.project_level_data - sample_data = self.project_level_samples - samples_to_projects = self.project_level_samples_to_project - manifest_data = self._parse_run_manifest_in_project("bases2fastq/project") - index_assigment_data = self._parse_index_assignment_in_project("bases2fastq/project") + return ( + self.project_level_data, + self.project_level_samples, + self.project_level_samples_to_project, + self._parse_run_manifest_in_project("bases2fastq/project"), + self._parse_index_assignment_in_project("bases2fastq/project"), + {}, # No unassigned sequences for project level + ) elif summary_path == "combined_level": - run_data = self.run_level_data - sample_data = self.project_level_samples - samples_to_projects = self.project_level_samples_to_project - manifest_data = self._parse_run_manifest("bases2fastq/manifest") - index_assigment_data = self._parse_index_assignment("bases2fastq/manifest") - unassigned_sequences = self._parse_run_unassigned_sequences("bases2fastq/run") + return ( + self.run_level_data, + self.project_level_samples, + self.project_level_samples_to_project, + self._parse_run_manifest("bases2fastq/manifest"), + self._parse_index_assignment("bases2fastq/manifest"), + self._parse_run_unassigned_sequences("bases2fastq/run"), + ) else: error_msg = "No run- or project-level data was retained. No report will be generated." log.error(error_msg) raise ModuleNoSamplesFound(error_msg) + def _setup_colors( + self, sample_data: Dict[str, Any], samples_to_projects: Dict[str, str], summary_path: str + ) -> None: + """Set up color schemes for groups and samples.""" # Create run and project groups - run_groups = defaultdict(list) - project_groups = defaultdict(list) - in_project_sample_groups = defaultdict(list) - ind_sample_groups = defaultdict(list) - sample_to_run_group = {} + run_groups: Dict[str, List] = defaultdict(list) + project_groups: Dict[str, List] = defaultdict(list) + in_project_sample_groups: Dict[str, List] = defaultdict(list) + ind_sample_groups: Dict[str, List] = defaultdict(list) + for sample in sample_data.keys(): - (_run_name, _) = sample.split("__") - run_groups[_run_name].append(sample) - sample_to_run_group[sample] = _run_name + run_name, _ = sample.split("__") + run_groups[run_name].append(sample) sample_project = samples_to_projects[sample] project_groups[sample_project].append(sample) ind_sample_groups[sample] = [sample] if summary_path == "project_level": in_project_sample_groups[sample].append(sample) + merged_groups = {**run_groups, **project_groups, **in_project_sample_groups, **ind_sample_groups} - # Assign color for each group + # Build color palette self.color_getter = mqc_colour.mqc_colour_scale() - self.palette = sum( - [ + self.palette = list( + chain.from_iterable( self.color_getter.get_colours(hue) for hue in ["Set2", "Pastel1", "Accent", "Set1", "Set3", "Dark2", "Paired", "Pastel2"] - ], - [], + ) ) + + # Add extra colors if needed if len(merged_groups) > len(self.palette): extra_colors = [ - "#{:06x}".format(random.randrange(0, 0xFFFFFF)) for _ in range(len(self.palette), len(merged_groups)) + f"#{random.randrange(0, 0xFFFFFF):06x}" for _ in range(len(self.palette), len(merged_groups)) ] self.palette = self.palette + extra_colors - self.group_color = {g: c for g, c in zip(merged_groups.keys(), self.palette[: len(merged_groups)])} - self.sample_color = dict() - for s_name in samples_to_projects.keys(): - s_color = ( - self.group_color[s_name] - if (summary_path == "project_level" or len(project_groups) == 1) - else self.group_color[samples_to_projects[s_name]] - ) - self.sample_color.update({s_name: s_color}) - self.run_color = copy.deepcopy(self.group_color) # Make sure that run colors and group colors match + + # Assign colors to groups + self.group_color = { + group: color for group, color in zip(merged_groups.keys(), self.palette[: len(merged_groups)]) + } + + # Assign colors to samples + self.sample_color: Dict[str, str] = {} + for sample_name in samples_to_projects.keys(): + if summary_path == "project_level" or len(project_groups) == 1: + sample_color = self.group_color[sample_name] + else: + sample_color = self.group_color[samples_to_projects[sample_name]] + self.sample_color[sample_name] = sample_color + + # Copy group colors to run colors + self.run_color = copy.deepcopy(self.group_color) self.palette = self.palette[len(merged_groups) :] - # Plot metrics + def _generate_plots( + self, + summary_path: str, + run_data: Dict[str, Any], + sample_data: Dict[str, Any], + samples_to_projects: Dict[str, str], + manifest_data: Dict[str, Any], + index_assignment_data: Dict[str, Any], + unassigned_sequences: Dict[str, Any], + ) -> None: + """Generate all plots and add sections to the report.""" + # QC metrics table qc_metrics_function = ( tabulate_run_stats if summary_path in ["run_level", "combined_level"] else tabulate_project_stats ) self.add_run_plots(data=run_data, plot_functions=[qc_metrics_function]) - self.add_run_plots( - data=manifest_data, - plot_functions=[ - tabulate_manifest_stats, - ], - ) + + # Manifest stats + self.add_run_plots(data=manifest_data, plot_functions=[tabulate_manifest_stats]) + + # Index assignment stats + self.add_run_plots(data=index_assignment_data, plot_functions=[tabulate_index_assignment_stats]) + + # Unassigned sequences (only for run_level and combined_level) if summary_path in ["run_level", "combined_level"]: - self.add_run_plots( - data=index_assigment_data, - plot_functions=[ - tabulate_index_assignment_stats, - ], - ) - self.add_run_plots( - data=unassigned_sequences, - plot_functions=[ - tabulate_unassigned_index_stats, - ], - ) - else: - self.add_run_plots( - data=index_assigment_data, - plot_functions=[ - tabulate_index_assignment_stats, - ], - ) + self.add_run_plots(data=unassigned_sequences, plot_functions=[tabulate_unassigned_index_stats]) + # Run-level plots self.add_run_plots( data=run_data, plot_functions=[plot_run_stats, plot_base_quality_hist, plot_base_quality_by_cycle], ) - self.add_sample_plots(data=sample_data, group_lookup=samples_to_projects, project_lookup=samples_to_projects) + # Sample-level plots + self.add_sample_plots( + data=sample_data, + group_lookup=samples_to_projects, + project_lookup=samples_to_projects, + ) - def get_uuid(self): + def get_uuid(self) -> str: return str(uuid.uuid4()).replace("-", "").lower() + def _extract_run_analysis_name( + self, + data: Dict[str, Any], + source_info: str = "RunStats.json", + ) -> Optional[str]: + """ + Extract and validate run_analysis_name from data dict. + + Args: + data: Dictionary containing RunName and AnalysisID keys + source_info: Description of the data source for error messages + + Returns: + The run_analysis_name (RunName-AnalysisID[0:4]) or None if extraction failed + """ + run_name = data.get("RunName") + analysis_id = data.get("AnalysisID") + + if not run_name or not analysis_id: + log.error( + f"Error with {source_info}. Either RunName or AnalysisID is absent.\n" + f"RunName: {run_name}, AnalysisID: {analysis_id}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + return None + + return f"{run_name}-{analysis_id[0:4]}" + def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: + """ + Parse RunStats.json files to extract run/project and sample-level data. + + This is the primary parsing method that populates the core data structures. + It handles both run-level and project-level RunStats.json files. + + Args: + data_source: Search pattern key ("bases2fastq/run" or "bases2fastq/project") + + Returns: + List containing: + - runs_global_data: Dict[run_name, run_stats] - Run/project level metrics + - runs_sample_data: Dict[sample_id, sample_stats] - Per-sample metrics + - sample_to_project: Dict[sample_id, project_name] - Sample-to-project mapping + + Data Flow: + RunStats.json -> parse -> filter samples by min_polonies -> populate dicts + """ runs_global_data = {} runs_sample_data = {} sample_to_project = {} @@ -262,18 +574,10 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: data_to_return["SampleStats"] = [] # get run + analysis - run_name = data.get("RunName", None) - analysis_id = data.get("AnalysisID", None)[0:4] - - if not run_name or not analysis_id: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_name = data.get("RunName") + run_analysis_name = self._extract_run_analysis_name(data, source_info=f"RunStats.json ({f['fn']})") + if run_analysis_name is None: continue - - run_analysis_name = "-".join([run_name, analysis_id]) run_analysis_name = self.clean_s_name(run_analysis_name, f) # skip run if in user provider ignore list @@ -295,10 +599,10 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: run_analysis_sample_name = "__".join([run_analysis_name, sample_name]) num_polonies = sample_data["NumPolonies"] - if num_polonies < MIN_POLONIES: + if num_polonies < self.min_polonies: log.warning( - f"Skipping {run_analysis_sample_name} because it has" - f" <{MIN_POLONIES} assigned reads [n={num_polonies}]." + f"Skipping {run_analysis_sample_name} because it has " + f"<{self.min_polonies} assigned reads [n={num_polonies}]." ) continue @@ -318,6 +622,20 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: return [runs_global_data, runs_sample_data, sample_to_project] def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]: + """ + Parse RunManifest.json for run-level analysis to extract lane and adapter settings. + + Data Flow: + RunManifest.json (via data_source pattern) + + RunStats.json (for run name) from same directory + -> Extract per-lane: index masks, adapter settings, trim lengths + + Args: + data_source: Search pattern key for RunManifest.json files + + Returns: + Dict[run_lane, settings] where run_lane = "{run_name} | L{lane_id}" + """ runs_manifest_data = {} if data_source == "": @@ -330,28 +648,13 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]: # Get RunName and RunID from RunStats.json run_stats_path = Path(directory) / "RunStats.json" - if not run_stats_path.exists(): - log.error( - f"RunStats.json does not exist in the Bases2Fastq output directory {directory}.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_stats = self._read_json_file(run_stats_path) + if run_stats is None: continue - run_analysis_name = None - with open(run_stats_path) as _infile: - run_stats = json.load(_infile) - run_name = run_stats.get("RunName", None) - analysis_id = run_stats.get("AnalysisID", None) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - continue + run_analysis_name = self._extract_run_analysis_name(run_stats, source_info=str(run_stats_path)) + if run_analysis_name is None: + continue run_manifest = json.loads(f["f"]) if "Settings" not in run_manifest: @@ -393,6 +696,17 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]: return runs_manifest_data def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: + """ + Parse RunManifest.json for project-level analysis. + + Similar to _parse_run_manifest but navigates up from project directories + to find the run-level RunManifest.json (via ../../RunManifest.json). + + Data Flow: + Project RunStats.json (for run name) + + ../../RunManifest.json (run-level manifest) + -> Extract per-lane settings + """ project_manifest_data = {} if data_source == "": @@ -403,31 +717,14 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: if not directory: continue - # Get RunName and RunID from RunParameters.json - run_manifest = Path(directory) / "../../RunManifest.json" - if not run_manifest.exists(): - log.error( - f"RunManifest.json could not be found in {run_manifest}. Skipping index assignment.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - continue - + # Get RunManifest.json from run output root (two levels up from project directory) + base_directory = Path(directory).parent.parent + run_manifest = base_directory / "RunManifest.json" project_stats = json.loads(f["f"]) - run_analysis_name = None - run_name = project_stats.get("RunName", None) - analysis_id = project_stats.get("AnalysisID", None) - - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - "Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - log.debug(f"Error in RunStats.json: {f['fn']}") - log.debug(f"Missing: RunName: {run_name} or AnalysisID: {analysis_id}") + run_analysis_name = self._extract_run_analysis_name( + project_stats, source_info=f"project RunStats.json ({f['fn']})" + ) + if run_analysis_name is None: continue # skip run if in user provider ignore list @@ -435,9 +732,9 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: log.info(f"Skipping <{run_analysis_name}> because it is present in ignore list.") continue - run_manifest_data = None - with open(run_manifest) as _infile: - run_manifest_data = json.load(_infile) + run_manifest_data = self._read_json_file(run_manifest, base_directory=base_directory) + if run_manifest_data is None: + continue if "Settings" not in run_manifest_data: log.warning(f" section not found in {run_manifest}.\nSkipping RunManifest metrics.") @@ -482,6 +779,16 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: return project_manifest_data def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: + """ + Parse unassigned/unknown barcode sequences from run-level data. + + Only available for run-level analysis. Extracts sequences that could not + be assigned to any sample, useful for troubleshooting index issues. + + Data Flow: + RunStats.json -> Lanes -> UnassignedSequences + -> Extract: sequence, count, percentage of total polonies + """ run_unassigned_sequences = {} if data_source == "": return run_unassigned_sequences @@ -490,16 +797,9 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: data = json.loads(f["f"]) # Get RunName and AnalysisID - run_name = data.get("RunName", None) - analysis_id = data.get("AnalysisID", None)[0:4] - if not run_name or not analysis_id: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_analysis_name = self._extract_run_analysis_name(data, source_info=f"RunStats.json ({f['fn']})") + if run_analysis_name is None: continue - run_analysis_name = "-".join([run_name, analysis_id]) run_analysis_name = self.clean_s_name(run_analysis_name, f) # skip run if in user provider ignore list @@ -538,6 +838,17 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: return run_unassigned_sequences def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: + """ + Parse index assignment statistics for run-level analysis. + + Combines data from RunStats.json (polony counts) and RunManifest.json + (index sequences) to show how well each sample's index performed. + + Data Flow: + RunStats.json -> SampleStats -> per-sample polony counts + + RunManifest.json -> Samples -> index sequences (Index1, Index2) + -> Combined index assignment table + """ sample_to_index_assignment = {} if manifest_data_source == "": @@ -548,90 +859,71 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: if not directory: continue - # Get RunName and RunID from RunParameters.json + # Get RunName and RunID from RunStats.json run_stats_path = Path(directory) / "RunStats.json" - if not run_stats_path.exists(): - log.error( - f"RunStats.json does not exist in the Bases2Fastq output directory {directory}.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_stats = self._read_json_file(run_stats_path) + if run_stats is None: continue - run_analysis_name = None total_polonies = 0 - with open(run_stats_path) as _infile: - run_stats = json.load(_infile) - - # Get run name information - run_name = run_stats.get("RunName", None) - analysis_id = run_stats.get("AnalysisID", None) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - log.debug(f"Error in RunStats.json: {run_stats_path}") - log.debug(f"Missing: RunName: {run_name} or AnalysisID: {analysis_id}") - continue - # skip run if in user provider ignore list - if self.is_ignore_sample(run_analysis_name): - log.info(f"Skipping <{run_analysis_name}> because it is present in ignore list.") - continue + # Get run name information + run_analysis_name = self._extract_run_analysis_name(run_stats, source_info=str(run_stats_path)) + if run_analysis_name is None: + continue - # Ensure sample stats are present - if "SampleStats" not in run_stats: - log.error( - "Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - log.debug(f"Missing SampleStats in RunStats.json. Available keys: {list(run_stats.keys())}.") - continue + # skip run if in user provider ignore list + if self.is_ignore_sample(run_analysis_name): + log.info(f"Skipping <{run_analysis_name}> because it is present in ignore list.") + continue - # Extract per sample polony counts and overall total counts - total_polonies = run_stats.get("NumPoloniesBeforeTrimming", 0) - for sample_data in run_stats["SampleStats"]: - sample_name = sample_data.get("SampleName") - sample_id = None - if run_analysis_name and sample_name: - sample_id = "__".join([run_analysis_name, sample_name]) + # Ensure sample stats are present + if "SampleStats" not in run_stats: + log.error( + f"Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" + f"Available keys: {list(run_stats.keys())}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + continue + + # Extract per sample polony counts and overall total counts + total_polonies = run_stats.get("NumPoloniesBeforeTrimming", 0) + for sample_data in run_stats["SampleStats"]: + sample_name = sample_data.get("SampleName") + sample_id = None + if run_analysis_name and sample_name: + sample_id = "__".join([run_analysis_name, sample_name]) + + if "Occurrences" not in sample_data: + log.error(f"Missing data needed to extract index assignment for sample {sample_id}. Skipping.") + continue - if "Occurrences" not in sample_data: + for occurrence in sample_data["Occurrences"]: + sample_expected_seq = occurrence.get("ExpectedSequence") + sample_counts = occurrence.get("NumPoloniesBeforeTrimming") + if any([element is None for element in [sample_expected_seq, sample_counts, sample_id]]): log.error(f"Missing data needed to extract index assignment for sample {sample_id}. Skipping.") continue + if run_analysis_name not in sample_to_index_assignment: + sample_to_index_assignment[run_analysis_name] = {} + if sample_expected_seq not in sample_to_index_assignment[run_analysis_name]: + sample_to_index_assignment[run_analysis_name][sample_expected_seq] = { + "SampleID": sample_id, + "SamplePolonyCounts": 0, + "PercentOfPolonies": float("nan"), + "Index1": "", + "Index2": "", + } + sample_to_index_assignment[run_analysis_name][sample_expected_seq]["SamplePolonyCounts"] += ( + sample_counts + ) - for occurrence in sample_data["Occurrences"]: - sample_expected_seq = occurrence.get("ExpectedSequence") - sample_counts = occurrence.get("NumPoloniesBeforeTrimming") - if any([element is None for element in [sample_expected_seq, sample_counts, sample_id]]): - log.error( - f"Missing data needed to extract index assignment for sample {sample_id}. Skipping." - ) - continue - if run_analysis_name not in sample_to_index_assignment: - sample_to_index_assignment[run_analysis_name] = {} - if sample_expected_seq not in sample_to_index_assignment[run_analysis_name]: - sample_to_index_assignment[run_analysis_name][sample_expected_seq] = { - "SampleID": sample_id, - "SamplePolonyCounts": 0, - "PercentOfPolonies": float("nan"), - "Index1": "", - "Index2": "", - } - sample_to_index_assignment[run_analysis_name][sample_expected_seq]["SamplePolonyCounts"] += ( - sample_counts - ) - - for sample_data in sample_to_index_assignment[run_analysis_name].values(): - if total_polonies > 0: - sample_data["PercentOfPolonies"] = round( - sample_data["SamplePolonyCounts"] / total_polonies * 100, 2 - ) + for sample_data in sample_to_index_assignment[run_analysis_name].values(): + if total_polonies > 0: + sample_data["PercentOfPolonies"] = round( + sample_data["SamplePolonyCounts"] / total_polonies * 100, 2 + ) run_manifest = json.loads(f["f"]) if "Samples" not in run_manifest: @@ -668,6 +960,17 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: return sample_to_index_assignment def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any]: + """ + Parse index assignment statistics for project-level analysis. + + Similar to _parse_index_assignment but works with project-split output, + navigating up to find the run-level RunManifest.json. + + Data Flow: + Project RunStats.json -> SampleStats -> polony counts + + ../../RunManifest.json -> Samples -> index sequences + -> Combined index assignment table + """ sample_to_index_assignment = {} if data_source == "": @@ -678,32 +981,17 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] if not directory: continue - # Get RunName and RunID from RunParameters.json - run_manifest = Path(directory) / "../../RunManifest.json" - if not run_manifest.exists(): - log.error( - f"RunManifest.json could not be found in {run_manifest}. Skipping index assignment.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - continue + # Get RunManifest.json from run output root (two levels up from project directory) + base_directory = Path(directory).parent.parent + run_manifest = base_directory / "RunManifest.json" project_stats = json.loads(f["f"]) - run_analysis_name = None - run_name = project_stats.get("RunName", None) - analysis_id = project_stats.get("AnalysisID", None) project = self.clean_s_name(project_stats.get("Project", "DefaultProject"), f) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - "Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - log.debug(f"Error in RunStats.json: {f['fn']}") - log.debug(f"Missing: RunName: {run_name} or AnalysisID: {analysis_id}") + run_analysis_name = self._extract_run_analysis_name( + project_stats, source_info=f"project RunStats.json ({f['fn']})" + ) + if run_analysis_name is None: continue # skip run if in user provider ignore list @@ -714,11 +1002,11 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] # Ensure sample stats are present if "SampleStats" not in project_stats: log.error( - "Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" + f"Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" + f"Available keys: {list(project_stats.keys())}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" ) - log.debug(f"Missing SampleStats in RunStats.json. Available keys: {list(project_stats.keys())}.") continue # Extract per sample polony counts and overall total counts @@ -761,13 +1049,13 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] sample_data["SamplePolonyCounts"] / total_polonies * 100, 2 ) - run_manifest_data = None - with open(run_manifest) as _infile: - run_manifest_data = json.load(_infile) + run_manifest_data = self._read_json_file(run_manifest, base_directory=base_directory) + if run_manifest_data is None: + continue if "Samples" not in run_manifest_data: log.warning( - f" section not found in {directory}/RunManifest.json.\n" + f" section not found in {run_manifest}.\n" f"Skipping RunManifest sample index assignment metrics." ) elif len(sample_to_index_assignment) == 0: @@ -797,14 +1085,16 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] return sample_to_index_assignment - def add_run_plots(self, data, plot_functions): + def add_run_plots(self, data: Dict[str, Any], plot_functions: List[Callable]) -> None: for func in plot_functions: plot_html, plot_name, anchor, description, helptext, plot_data = func(data, self.run_color) self.add_section(name=plot_name, plot=plot_html, anchor=anchor, description=description, helptext=helptext) self.write_data_file(plot_data, f"base2fastq:{plot_name}") - def add_sample_plots(self, data, group_lookup, project_lookup): - plot_functions = [ + def add_sample_plots( + self, data: Dict[str, Any], group_lookup: Dict[str, str], project_lookup: Dict[str, str] + ) -> None: + plot_functions: List[Callable] = [ tabulate_sample_stats, sequence_content_plot, plot_per_cycle_N_content,