From 480d2c874e837da950a93fd2df6d2a6bac7a5126 Mon Sep 17 00:00:00 2001 From: Carlos Ruiz Date: Wed, 17 Dec 2025 12:02:25 -0800 Subject: [PATCH 1/4] Addressed pressing comments --- multiqc/modules/bases2fastq/bases2fastq.py | 570 +++++++++++++-------- 1 file changed, 351 insertions(+), 219 deletions(-) diff --git a/multiqc/modules/bases2fastq/bases2fastq.py b/multiqc/modules/bases2fastq/bases2fastq.py index df4e29af77..9ba5dc3e48 100644 --- a/multiqc/modules/bases2fastq/bases2fastq.py +++ b/multiqc/modules/bases2fastq/bases2fastq.py @@ -8,6 +8,7 @@ import uuid from pathlib import Path +from multiqc import config from multiqc.base_module import BaseMultiqcModule, ModuleNoSamplesFound from multiqc.types import LoadedFileDict from multiqc.utils import mqc_colour @@ -33,10 +34,56 @@ log = logging.getLogger(__name__) -MIN_POLONIES = 10000 +# Default minimum polony threshold - samples below this are skipped +DEFAULT_MIN_POLONIES = 10000 + + +def _get_min_polonies() -> int: + """ + Get the minimum polonies threshold from config or use default. + + Can be configured in multiqc_config.yaml: + bases2fastq_config: + min_polonies: 5000 + """ + cfg = getattr(config, "bases2fastq_config", {}) + if not isinstance(cfg, dict): + return DEFAULT_MIN_POLONIES + + min_polonies = cfg.get("min_polonies", DEFAULT_MIN_POLONIES) + try: + min_polonies = int(min_polonies) + except (ValueError, TypeError): + log.warning(f"Invalid min_polonies value '{min_polonies}', using default {DEFAULT_MIN_POLONIES}") + min_polonies = DEFAULT_MIN_POLONIES + + if min_polonies != DEFAULT_MIN_POLONIES: + log.debug(f"Using custom min_polonies threshold: {min_polonies}") + + return min_polonies class MultiqcModule(BaseMultiqcModule): + """ + Bases2Fastq is Element Biosciences' secondary analysis software for demultiplexing + sequencing data from AVITI systems and converting base calls into FASTQ files. + + The module parses the following output files from Bases2Fastq: + + - `RunStats.json`: Contains run-level and sample-level QC metrics + - `RunManifest.json`: Contains sample sheet information including indexing and adapter settings + - Project-level `RunStats.json`: Contains project-specific metrics when demultiplexing by project + + The module supports both run-level analysis (single run) and project-level analysis + (aggregated metrics across projects), displaying metrics such as: + + - Polony counts and yields + - Base quality distributions + - Index assignment statistics + - Per-sample sequence content and GC distribution + - Adapter content analysis + """ + def __init__(self): super(MultiqcModule, self).__init__( name="Bases2Fastq", @@ -46,29 +93,63 @@ def __init__(self): doi="10.1038/s41587-023-01750-7", ) - # Initialize run, project and sample level structures - self.run_level_data = {} - self.run_level_samples = {} - self.run_level_samples_to_project = {} - self.project_level_data = {} - self.project_level_samples = {} - self.project_level_samples_to_project = {} - num_run_level_samples = 0 - num_project_level_samples = 0 - - # Initialize run and project groups - self.group_dict = dict() - self.group_lookup_dict = dict() - self.project_lookup_dict = dict() - - self.b2f_sample_data = dict() - self.b2f_run_data = dict() - self.b2f_run_project_data = dict() - self.b2f_run_project_sample_data = dict() - self.missing_runs = set() - self.sample_id_to_run = dict() - - # Define if call is project- or run-level + # Get configurable minimum polonies threshold + self.min_polonies = _get_min_polonies() + + # Initialize data structures + self._init_data_structures() + + # Parse and validate input data + summary_path = self._parse_and_validate_data() + + # Select data based on summary path and parse additional sources + run_data, sample_data, samples_to_projects, manifest_data, index_assignment_data, unassigned_sequences = ( + self._select_data_by_summary_path(summary_path) + ) + + # Set up color schemes for groups and samples + self._setup_colors(sample_data, samples_to_projects, summary_path) + + # Generate all plots and sections + self._generate_plots( + summary_path, run_data, sample_data, samples_to_projects, + manifest_data, index_assignment_data, unassigned_sequences + ) + + # Write main data file at the very end after all sections are added + self.write_data_file(sample_data, "bases2fastq") + + def _init_data_structures(self) -> None: + """Initialize all data structures used by the module.""" + # Run, project and sample level structures + self.run_level_data: Dict[str, Any] = {} + self.run_level_samples: Dict[str, Any] = {} + self.run_level_samples_to_project: Dict[str, str] = {} + self.project_level_data: Dict[str, Any] = {} + self.project_level_samples: Dict[str, Any] = {} + self.project_level_samples_to_project: Dict[str, str] = {} + + # Run and project groups + self.group_dict: Dict[str, Any] = {} + self.group_lookup_dict: Dict[str, Any] = {} + self.project_lookup_dict: Dict[str, Any] = {} + + # Additional data structures + self.b2f_sample_data: Dict[str, Any] = {} + self.b2f_run_data: Dict[str, Any] = {} + self.b2f_run_project_data: Dict[str, Any] = {} + self.b2f_run_project_sample_data: Dict[str, Any] = {} + self.missing_runs: set = set() + self.sample_id_to_run: Dict[str, str] = {} + + def _parse_and_validate_data(self) -> str: + """ + Parse input data and validate that samples were found. + + Returns: + summary_path: The determined summary path ('run_level', 'project_level', or 'combined_level') + """ + # Check for available log files run_level_log_files = len(list(self.find_log_files("bases2fastq/run"))) project_level_log_files = len(list(self.find_log_files("bases2fastq/project"))) @@ -77,7 +158,7 @@ def __init__(self): log.error(error_msg) raise ModuleNoSamplesFound(error_msg) - # Parse data + # Parse data from available sources if run_level_log_files > 0: (self.run_level_data, self.run_level_samples, self.run_level_samples_to_project) = ( self._parse_run_project_data("bases2fastq/run") @@ -87,33 +168,25 @@ def __init__(self): self._parse_run_project_data("bases2fastq/project") ) - # Get run- and project-level samples + # Count samples num_run_level_samples = len(self.run_level_samples) num_project_level_samples = len(self.project_level_samples) - # Ensure run/sample data found - if all( - [ - len(self.run_level_data) == 0, - num_run_level_samples == 0, - len(self.project_level_data) == 0, - num_project_level_samples == 0, - ] - ): + # Ensure at least some data was found + if all([ + len(self.run_level_data) == 0, + num_run_level_samples == 0, + len(self.project_level_data) == 0, + num_project_level_samples == 0, + ]): error_msg = "No run-, project- or sample-level data found" log.error(error_msg) raise ModuleNoSamplesFound(error_msg) - # Choose path to take, if project use only project-level data, otherwise use run- and project-level - summary_path = "" - if len(self.run_level_data) > 0 and len(self.project_level_data) == 0: - summary_path = "run_level" - if len(self.run_level_data) == 0 and len(self.project_level_data) > 0: - summary_path = "project_level" - elif len(self.run_level_data) > 0 and len(self.project_level_data) > 0: - summary_path = "combined_level" + # Determine summary path + summary_path = self._determine_summary_path() - # Log runs, projects and samples found + # Log what was found log.info(f"Found {len(self.run_level_data)} run(s) within the Bases2Fastq results.") log.info(f"Found {len(self.project_level_data)} project(s) within the Bases2Fastq results.") if summary_path == "run_level": @@ -121,65 +194,98 @@ def __init__(self): else: log.info(f"Found {num_project_level_samples} sample(s) within the Bases2Fastq results.") - # Superfluous function call to confirm that it is used in this module + # Required call to confirm module is used self.add_software_version(None) - # Warn user if run-level/project-level or sample-level metrics were not found + # Warn if no data found if len(self.run_level_data) == 0 and len(self.project_level_data) == 0: log.warning("No run/project stats found!") if num_run_level_samples == 0 and num_project_level_samples == 0: log.warning("No sample stats found!") - # Define data to use - run_data = {} - sample_data = {} - samples_to_projects = {} - manifest_data = {} - index_assigment_data = {} - unassigned_sequences = {} + return summary_path + + def _determine_summary_path(self) -> str: + """ + Determine which summary path to use based on available data. + + Returns: + 'run_level', 'project_level', or 'combined_level' + """ + has_run_data = len(self.run_level_data) > 0 + has_project_data = len(self.project_level_data) > 0 + + if has_run_data and not has_project_data: + return "run_level" + elif not has_run_data and has_project_data: + return "project_level" + elif has_run_data and has_project_data: + return "combined_level" + else: + error_msg = "No run- or project-level data was retained. No report will be generated." + log.error(error_msg) + raise ModuleNoSamplesFound(error_msg) + + def _select_data_by_summary_path(self, summary_path: str): + """ + Select the appropriate data sources based on the summary path. + + Returns: + Tuple of (run_data, sample_data, samples_to_projects, manifest_data, + index_assignment_data, unassigned_sequences) + """ if summary_path == "run_level": - run_data = self.run_level_data - sample_data = self.run_level_samples - samples_to_projects = self.run_level_samples_to_project - manifest_data = self._parse_run_manifest("bases2fastq/manifest") - index_assigment_data = self._parse_index_assignment("bases2fastq/manifest") - unassigned_sequences = self._parse_run_unassigned_sequences("bases2fastq/run") + return ( + self.run_level_data, + self.run_level_samples, + self.run_level_samples_to_project, + self._parse_run_manifest("bases2fastq/manifest"), + self._parse_index_assignment("bases2fastq/manifest"), + self._parse_run_unassigned_sequences("bases2fastq/run"), + ) elif summary_path == "project_level": - run_data = self.project_level_data - sample_data = self.project_level_samples - samples_to_projects = self.project_level_samples_to_project - manifest_data = self._parse_run_manifest_in_project("bases2fastq/project") - index_assigment_data = self._parse_index_assignment_in_project("bases2fastq/project") + return ( + self.project_level_data, + self.project_level_samples, + self.project_level_samples_to_project, + self._parse_run_manifest_in_project("bases2fastq/project"), + self._parse_index_assignment_in_project("bases2fastq/project"), + {}, # No unassigned sequences for project level + ) elif summary_path == "combined_level": - run_data = self.run_level_data - sample_data = self.project_level_samples - samples_to_projects = self.project_level_samples_to_project - manifest_data = self._parse_run_manifest("bases2fastq/manifest") - index_assigment_data = self._parse_index_assignment("bases2fastq/manifest") - unassigned_sequences = self._parse_run_unassigned_sequences("bases2fastq/run") + return ( + self.run_level_data, + self.project_level_samples, + self.project_level_samples_to_project, + self._parse_run_manifest("bases2fastq/manifest"), + self._parse_index_assignment("bases2fastq/manifest"), + self._parse_run_unassigned_sequences("bases2fastq/run"), + ) else: error_msg = "No run- or project-level data was retained. No report will be generated." log.error(error_msg) raise ModuleNoSamplesFound(error_msg) + def _setup_colors(self, sample_data: Dict, samples_to_projects: Dict, summary_path: str) -> None: + """Set up color schemes for groups and samples.""" # Create run and project groups - run_groups = defaultdict(list) - project_groups = defaultdict(list) - in_project_sample_groups = defaultdict(list) - ind_sample_groups = defaultdict(list) - sample_to_run_group = {} + run_groups: Dict[str, List] = defaultdict(list) + project_groups: Dict[str, List] = defaultdict(list) + in_project_sample_groups: Dict[str, List] = defaultdict(list) + ind_sample_groups: Dict[str, List] = defaultdict(list) + for sample in sample_data.keys(): - (_run_name, _) = sample.split("__") - run_groups[_run_name].append(sample) - sample_to_run_group[sample] = _run_name + run_name, _ = sample.split("__") + run_groups[run_name].append(sample) sample_project = samples_to_projects[sample] project_groups[sample_project].append(sample) ind_sample_groups[sample] = [sample] if summary_path == "project_level": in_project_sample_groups[sample].append(sample) + merged_groups = {**run_groups, **project_groups, **in_project_sample_groups, **ind_sample_groups} - # Assign color for each group + # Build color palette self.color_getter = mqc_colour.mqc_colour_scale() self.palette = sum( [ @@ -188,61 +294,71 @@ def __init__(self): ], [], ) + + # Add extra colors if needed if len(merged_groups) > len(self.palette): extra_colors = [ - "#{:06x}".format(random.randrange(0, 0xFFFFFF)) for _ in range(len(self.palette), len(merged_groups)) + f"#{random.randrange(0, 0xFFFFFF):06x}" for _ in range(len(self.palette), len(merged_groups)) ] self.palette = self.palette + extra_colors - self.group_color = {g: c for g, c in zip(merged_groups.keys(), self.palette[: len(merged_groups)])} - self.sample_color = dict() - for s_name in samples_to_projects.keys(): - s_color = ( - self.group_color[s_name] - if (summary_path == "project_level" or len(project_groups) == 1) - else self.group_color[samples_to_projects[s_name]] - ) - self.sample_color.update({s_name: s_color}) - self.run_color = copy.deepcopy(self.group_color) # Make sure that run colors and group colors match - self.palette = self.palette[len(merged_groups) :] - # Plot metrics + # Assign colors to groups + self.group_color = { + group: color for group, color in zip(merged_groups.keys(), self.palette[:len(merged_groups)]) + } + + # Assign colors to samples + self.sample_color: Dict[str, str] = {} + for sample_name in samples_to_projects.keys(): + if summary_path == "project_level" or len(project_groups) == 1: + sample_color = self.group_color[sample_name] + else: + sample_color = self.group_color[samples_to_projects[sample_name]] + self.sample_color[sample_name] = sample_color + + # Copy group colors to run colors + self.run_color = copy.deepcopy(self.group_color) + self.palette = self.palette[len(merged_groups):] + + def _generate_plots( + self, + summary_path: str, + run_data: Dict, + sample_data: Dict, + samples_to_projects: Dict, + manifest_data: Dict, + index_assignment_data: Dict, + unassigned_sequences: Dict, + ) -> None: + """Generate all plots and add sections to the report.""" + # QC metrics table qc_metrics_function = ( tabulate_run_stats if summary_path in ["run_level", "combined_level"] else tabulate_project_stats ) self.add_run_plots(data=run_data, plot_functions=[qc_metrics_function]) - self.add_run_plots( - data=manifest_data, - plot_functions=[ - tabulate_manifest_stats, - ], - ) + + # Manifest stats + self.add_run_plots(data=manifest_data, plot_functions=[tabulate_manifest_stats]) + + # Index assignment stats + self.add_run_plots(data=index_assignment_data, plot_functions=[tabulate_index_assignment_stats]) + + # Unassigned sequences (only for run_level and combined_level) if summary_path in ["run_level", "combined_level"]: - self.add_run_plots( - data=index_assigment_data, - plot_functions=[ - tabulate_index_assignment_stats, - ], - ) - self.add_run_plots( - data=unassigned_sequences, - plot_functions=[ - tabulate_unassigned_index_stats, - ], - ) - else: - self.add_run_plots( - data=index_assigment_data, - plot_functions=[ - tabulate_index_assignment_stats, - ], - ) + self.add_run_plots(data=unassigned_sequences, plot_functions=[tabulate_unassigned_index_stats]) + # Run-level plots self.add_run_plots( data=run_data, plot_functions=[plot_run_stats, plot_base_quality_hist, plot_base_quality_by_cycle], ) - self.add_sample_plots(data=sample_data, group_lookup=samples_to_projects, project_lookup=samples_to_projects) + # Sample-level plots + self.add_sample_plots( + data=sample_data, + group_lookup=samples_to_projects, + project_lookup=samples_to_projects, + ) def get_uuid(self): return str(uuid.uuid4()).replace("-", "").lower() @@ -263,7 +379,7 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: # get run + analysis run_name = data.get("RunName", None) - analysis_id = data.get("AnalysisID", None)[0:4] + analysis_id = data.get("AnalysisID", None) if not run_name or not analysis_id: log.error( @@ -273,6 +389,8 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: ) continue + analysis_id = analysis_id[0:4] + run_analysis_name = "-".join([run_name, analysis_id]) run_analysis_name = self.clean_s_name(run_analysis_name, f) @@ -295,10 +413,10 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: run_analysis_sample_name = "__".join([run_analysis_name, sample_name]) num_polonies = sample_data["NumPolonies"] - if num_polonies < MIN_POLONIES: + if num_polonies < self.min_polonies: log.warning( - f"Skipping {run_analysis_sample_name} because it has" - f" <{MIN_POLONIES} assigned reads [n={num_polonies}]." + f"Skipping {run_analysis_sample_name} because it has " + f"<{self.min_polonies} assigned reads [n={num_polonies}]." ) continue @@ -339,19 +457,23 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]: continue run_analysis_name = None - with open(run_stats_path) as _infile: - run_stats = json.load(_infile) - run_name = run_stats.get("RunName", None) - analysis_id = run_stats.get("AnalysisID", None) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - continue + try: + with open(run_stats_path) as _infile: + run_stats = json.load(_infile) + run_name = run_stats.get("RunName", None) + analysis_id = run_stats.get("AnalysisID", None) + if run_name and analysis_id: + run_analysis_name = "-".join([run_name, analysis_id[0:4]]) + else: + log.error( + "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" + "Please visit Elembio online documentation for more information - " + "https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + continue + except (json.JSONDecodeError, OSError) as e: + log.error(f"Error reading {run_stats_path}: {e}") + continue run_manifest = json.loads(f["f"]) if "Settings" not in run_manifest: @@ -422,12 +544,11 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: run_analysis_name = "-".join([run_name, analysis_id[0:4]]) else: log.error( - "Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" + f"Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" + f"File: {f['fn']}, RunName: {run_name}, AnalysisID: {analysis_id}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" ) - log.debug(f"Error in RunStats.json: {f['fn']}") - log.debug(f"Missing: RunName: {run_name} or AnalysisID: {analysis_id}") continue # skip run if in user provider ignore list @@ -436,8 +557,12 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: continue run_manifest_data = None - with open(run_manifest) as _infile: - run_manifest_data = json.load(_infile) + try: + with open(run_manifest) as _infile: + run_manifest_data = json.load(_infile) + except (json.JSONDecodeError, OSError) as e: + log.error(f"Error reading {run_manifest}: {e}") + continue if "Settings" not in run_manifest_data: log.warning(f" section not found in {run_manifest}.\nSkipping RunManifest metrics.") @@ -491,7 +616,7 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: # Get RunName and AnalysisID run_name = data.get("RunName", None) - analysis_id = data.get("AnalysisID", None)[0:4] + analysis_id = data.get("AnalysisID", None) if not run_name or not analysis_id: log.error( "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" @@ -499,6 +624,7 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: "https://docs.elembio.io/docs/bases2fastq/introduction/" ) continue + analysis_id = analysis_id[0:4] run_analysis_name = "-".join([run_name, analysis_id]) run_analysis_name = self.clean_s_name(run_analysis_name, f) @@ -560,78 +686,81 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: run_analysis_name = None total_polonies = 0 - with open(run_stats_path) as _infile: - run_stats = json.load(_infile) - - # Get run name information - run_name = run_stats.get("RunName", None) - analysis_id = run_stats.get("AnalysisID", None) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - log.debug(f"Error in RunStats.json: {run_stats_path}") - log.debug(f"Missing: RunName: {run_name} or AnalysisID: {analysis_id}") - continue + try: + with open(run_stats_path) as _infile: + run_stats = json.load(_infile) + except (json.JSONDecodeError, OSError) as e: + log.error(f"Error reading {run_stats_path}: {e}") + continue - # skip run if in user provider ignore list - if self.is_ignore_sample(run_analysis_name): - log.info(f"Skipping <{run_analysis_name}> because it is present in ignore list.") - continue + # Get run name information + run_name = run_stats.get("RunName", None) + analysis_id = run_stats.get("AnalysisID", None) + if run_name and analysis_id: + run_analysis_name = "-".join([run_name, analysis_id[0:4]]) + else: + log.error( + f"Error with RunStats.json. Either RunName or AnalysisID is absent.\n" + f"File: {run_stats_path}, RunName: {run_name}, AnalysisID: {analysis_id}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + continue - # Ensure sample stats are present - if "SampleStats" not in run_stats: - log.error( - "Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - log.debug(f"Missing SampleStats in RunStats.json. Available keys: {list(run_stats.keys())}.") - continue + # skip run if in user provider ignore list + if self.is_ignore_sample(run_analysis_name): + log.info(f"Skipping <{run_analysis_name}> because it is present in ignore list.") + continue - # Extract per sample polony counts and overall total counts - total_polonies = run_stats.get("NumPoloniesBeforeTrimming", 0) - for sample_data in run_stats["SampleStats"]: - sample_name = sample_data.get("SampleName") - sample_id = None - if run_analysis_name and sample_name: - sample_id = "__".join([run_analysis_name, sample_name]) + # Ensure sample stats are present + if "SampleStats" not in run_stats: + log.error( + f"Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" + f"Available keys: {list(run_stats.keys())}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + continue - if "Occurrences" not in sample_data: - log.error(f"Missing data needed to extract index assignment for sample {sample_id}. Skipping.") - continue + # Extract per sample polony counts and overall total counts + total_polonies = run_stats.get("NumPoloniesBeforeTrimming", 0) + for sample_data in run_stats["SampleStats"]: + sample_name = sample_data.get("SampleName") + sample_id = None + if run_analysis_name and sample_name: + sample_id = "__".join([run_analysis_name, sample_name]) - for occurrence in sample_data["Occurrences"]: - sample_expected_seq = occurrence.get("ExpectedSequence") - sample_counts = occurrence.get("NumPoloniesBeforeTrimming") - if any([element is None for element in [sample_expected_seq, sample_counts, sample_id]]): - log.error( - f"Missing data needed to extract index assignment for sample {sample_id}. Skipping." - ) - continue - if run_analysis_name not in sample_to_index_assignment: - sample_to_index_assignment[run_analysis_name] = {} - if sample_expected_seq not in sample_to_index_assignment[run_analysis_name]: - sample_to_index_assignment[run_analysis_name][sample_expected_seq] = { - "SampleID": sample_id, - "SamplePolonyCounts": 0, - "PercentOfPolonies": float("nan"), - "Index1": "", - "Index2": "", - } - sample_to_index_assignment[run_analysis_name][sample_expected_seq]["SamplePolonyCounts"] += ( - sample_counts - ) + if "Occurrences" not in sample_data: + log.error(f"Missing data needed to extract index assignment for sample {sample_id}. Skipping.") + continue - for sample_data in sample_to_index_assignment[run_analysis_name].values(): - if total_polonies > 0: - sample_data["PercentOfPolonies"] = round( - sample_data["SamplePolonyCounts"] / total_polonies * 100, 2 + for occurrence in sample_data["Occurrences"]: + sample_expected_seq = occurrence.get("ExpectedSequence") + sample_counts = occurrence.get("NumPoloniesBeforeTrimming") + if any([element is None for element in [sample_expected_seq, sample_counts, sample_id]]): + log.error( + f"Missing data needed to extract index assignment for sample {sample_id}. Skipping." ) + continue + if run_analysis_name not in sample_to_index_assignment: + sample_to_index_assignment[run_analysis_name] = {} + if sample_expected_seq not in sample_to_index_assignment[run_analysis_name]: + sample_to_index_assignment[run_analysis_name][sample_expected_seq] = { + "SampleID": sample_id, + "SamplePolonyCounts": 0, + "PercentOfPolonies": float("nan"), + "Index1": "", + "Index2": "", + } + sample_to_index_assignment[run_analysis_name][sample_expected_seq]["SamplePolonyCounts"] += ( + sample_counts + ) + + for sample_data in sample_to_index_assignment[run_analysis_name].values(): + if total_polonies > 0: + sample_data["PercentOfPolonies"] = round( + sample_data["SamplePolonyCounts"] / total_polonies * 100, 2 + ) run_manifest = json.loads(f["f"]) if "Samples" not in run_manifest: @@ -698,12 +827,11 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] run_analysis_name = "-".join([run_name, analysis_id[0:4]]) else: log.error( - "Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" + f"Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" + f"File: {f['fn']}, RunName: {run_name}, AnalysisID: {analysis_id}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" ) - log.debug(f"Error in RunStats.json: {f['fn']}") - log.debug(f"Missing: RunName: {run_name} or AnalysisID: {analysis_id}") continue # skip run if in user provider ignore list @@ -714,11 +842,11 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] # Ensure sample stats are present if "SampleStats" not in project_stats: log.error( - "Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" + f"Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n" + f"Available keys: {list(project_stats.keys())}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" ) - log.debug(f"Missing SampleStats in RunStats.json. Available keys: {list(project_stats.keys())}.") continue # Extract per sample polony counts and overall total counts @@ -762,8 +890,12 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] ) run_manifest_data = None - with open(run_manifest) as _infile: - run_manifest_data = json.load(_infile) + try: + with open(run_manifest) as _infile: + run_manifest_data = json.load(_infile) + except (json.JSONDecodeError, OSError) as e: + log.error(f"Error reading {run_manifest}: {e}") + continue if "Samples" not in run_manifest_data: log.warning( From 348c14f050f984117972a8621becde530bd03d4e Mon Sep 17 00:00:00 2001 From: Carlos Ruiz Date: Wed, 17 Dec 2025 12:29:02 -0800 Subject: [PATCH 2/4] Address performance comments --- multiqc/modules/bases2fastq/bases2fastq.py | 229 +++++++++------------ 1 file changed, 97 insertions(+), 132 deletions(-) diff --git a/multiqc/modules/bases2fastq/bases2fastq.py b/multiqc/modules/bases2fastq/bases2fastq.py index 9ba5dc3e48..f884565e69 100644 --- a/multiqc/modules/bases2fastq/bases2fastq.py +++ b/multiqc/modules/bases2fastq/bases2fastq.py @@ -1,10 +1,11 @@ from collections import defaultdict import copy +from itertools import chain import re import json import logging import random -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import uuid from pathlib import Path @@ -121,6 +122,9 @@ def __init__(self): def _init_data_structures(self) -> None: """Initialize all data structures used by the module.""" + # File cache to avoid reading the same JSON files multiple times + self._file_cache: Dict[str, Any] = {} + # Run, project and sample level structures self.run_level_data: Dict[str, Any] = {} self.run_level_samples: Dict[str, Any] = {} @@ -142,6 +146,38 @@ def _init_data_structures(self) -> None: self.missing_runs: set = set() self.sample_id_to_run: Dict[str, str] = {} + def _read_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]: + """ + Read and parse a JSON file with caching. + + Args: + file_path: Path to the JSON file + + Returns: + Parsed JSON data or None if reading failed + """ + cache_key = str(file_path.resolve()) + + if cache_key in self._file_cache: + return self._file_cache[cache_key] + + if not file_path.exists(): + log.error( + f"{file_path.name} does not exist at {file_path}.\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + return None + + try: + with open(file_path) as _infile: + data = json.load(_infile) + self._file_cache[cache_key] = data + return data + except (json.JSONDecodeError, OSError) as e: + log.error(f"Error reading {file_path}: {e}") + return None + def _parse_and_validate_data(self) -> str: """ Parse input data and validate that samples were found. @@ -287,13 +323,10 @@ def _setup_colors(self, sample_data: Dict, samples_to_projects: Dict, summary_pa # Build color palette self.color_getter = mqc_colour.mqc_colour_scale() - self.palette = sum( - [ - self.color_getter.get_colours(hue) - for hue in ["Set2", "Pastel1", "Accent", "Set1", "Set3", "Dark2", "Paired", "Pastel2"] - ], - [], - ) + self.palette = list(chain.from_iterable( + self.color_getter.get_colours(hue) + for hue in ["Set2", "Pastel1", "Accent", "Set1", "Set3", "Dark2", "Paired", "Pastel2"] + )) # Add extra colors if needed if len(merged_groups) > len(self.palette): @@ -363,6 +396,35 @@ def _generate_plots( def get_uuid(self): return str(uuid.uuid4()).replace("-", "").lower() + def _extract_run_analysis_name( + self, + data: Dict[str, Any], + source_info: str = "RunStats.json", + ) -> Optional[str]: + """ + Extract and validate run_analysis_name from data dict. + + Args: + data: Dictionary containing RunName and AnalysisID keys + source_info: Description of the data source for error messages + + Returns: + The run_analysis_name (RunName-AnalysisID[0:4]) or None if extraction failed + """ + run_name = data.get("RunName") + analysis_id = data.get("AnalysisID") + + if not run_name or not analysis_id: + log.error( + f"Error with {source_info}. Either RunName or AnalysisID is absent.\n" + f"RunName: {run_name}, AnalysisID: {analysis_id}\n" + f"Please visit Elembio online documentation for more information - " + f"https://docs.elembio.io/docs/bases2fastq/introduction/" + ) + return None + + return f"{run_name}-{analysis_id[0:4]}" + def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: runs_global_data = {} runs_sample_data = {} @@ -378,20 +440,10 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: data_to_return["SampleStats"] = [] # get run + analysis - run_name = data.get("RunName", None) - analysis_id = data.get("AnalysisID", None) - - if not run_name or not analysis_id: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_name = data.get("RunName") + run_analysis_name = self._extract_run_analysis_name(data, source_info=f"RunStats.json ({f['fn']})") + if run_analysis_name is None: continue - - analysis_id = analysis_id[0:4] - - run_analysis_name = "-".join([run_name, analysis_id]) run_analysis_name = self.clean_s_name(run_analysis_name, f) # skip run if in user provider ignore list @@ -448,31 +500,12 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]: # Get RunName and RunID from RunStats.json run_stats_path = Path(directory) / "RunStats.json" - if not run_stats_path.exists(): - log.error( - f"RunStats.json does not exist in the Bases2Fastq output directory {directory}.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_stats = self._read_json_file(run_stats_path) + if run_stats is None: continue - run_analysis_name = None - try: - with open(run_stats_path) as _infile: - run_stats = json.load(_infile) - run_name = run_stats.get("RunName", None) - analysis_id = run_stats.get("AnalysisID", None) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - continue - except (json.JSONDecodeError, OSError) as e: - log.error(f"Error reading {run_stats_path}: {e}") + run_analysis_name = self._extract_run_analysis_name(run_stats, source_info=str(run_stats_path)) + if run_analysis_name is None: continue run_manifest = json.loads(f["f"]) @@ -527,28 +560,11 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: # Get RunName and RunID from RunParameters.json run_manifest = Path(directory) / "../../RunManifest.json" - if not run_manifest.exists(): - log.error( - f"RunManifest.json could not be found in {run_manifest}. Skipping index assignment.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - continue - project_stats = json.loads(f["f"]) - run_analysis_name = None - run_name = project_stats.get("RunName", None) - analysis_id = project_stats.get("AnalysisID", None) - - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - f"Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" - f"File: {f['fn']}, RunName: {run_name}, AnalysisID: {analysis_id}\n" - f"Please visit Elembio online documentation for more information - " - f"https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_analysis_name = self._extract_run_analysis_name( + project_stats, source_info=f"project RunStats.json ({f['fn']})" + ) + if run_analysis_name is None: continue # skip run if in user provider ignore list @@ -556,12 +572,8 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: log.info(f"Skipping <{run_analysis_name}> because it is present in ignore list.") continue - run_manifest_data = None - try: - with open(run_manifest) as _infile: - run_manifest_data = json.load(_infile) - except (json.JSONDecodeError, OSError) as e: - log.error(f"Error reading {run_manifest}: {e}") + run_manifest_data = self._read_json_file(run_manifest) + if run_manifest_data is None: continue if "Settings" not in run_manifest_data: @@ -615,17 +627,9 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: data = json.loads(f["f"]) # Get RunName and AnalysisID - run_name = data.get("RunName", None) - analysis_id = data.get("AnalysisID", None) - if not run_name or not analysis_id: - log.error( - "Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_analysis_name = self._extract_run_analysis_name(data, source_info=f"RunStats.json ({f['fn']})") + if run_analysis_name is None: continue - analysis_id = analysis_id[0:4] - run_analysis_name = "-".join([run_name, analysis_id]) run_analysis_name = self.clean_s_name(run_analysis_name, f) # skip run if in user provider ignore list @@ -674,37 +678,17 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: if not directory: continue - # Get RunName and RunID from RunParameters.json + # Get RunName and RunID from RunStats.json run_stats_path = Path(directory) / "RunStats.json" - if not run_stats_path.exists(): - log.error( - f"RunStats.json does not exist in the Bases2Fastq output directory {directory}.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_stats = self._read_json_file(run_stats_path) + if run_stats is None: continue - run_analysis_name = None total_polonies = 0 - try: - with open(run_stats_path) as _infile: - run_stats = json.load(_infile) - except (json.JSONDecodeError, OSError) as e: - log.error(f"Error reading {run_stats_path}: {e}") - continue # Get run name information - run_name = run_stats.get("RunName", None) - analysis_id = run_stats.get("AnalysisID", None) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - f"Error with RunStats.json. Either RunName or AnalysisID is absent.\n" - f"File: {run_stats_path}, RunName: {run_name}, AnalysisID: {analysis_id}\n" - f"Please visit Elembio online documentation for more information - " - f"https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_analysis_name = self._extract_run_analysis_name(run_stats, source_info=str(run_stats_path)) + if run_analysis_name is None: continue # skip run if in user provider ignore list @@ -807,31 +791,16 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] if not directory: continue - # Get RunName and RunID from RunParameters.json + # Get RunManifest.json path for later use run_manifest = Path(directory) / "../../RunManifest.json" - if not run_manifest.exists(): - log.error( - f"RunManifest.json could not be found in {run_manifest}. Skipping index assignment.\n" - "Please visit Elembio online documentation for more information - " - "https://docs.elembio.io/docs/bases2fastq/introduction/" - ) - continue project_stats = json.loads(f["f"]) - run_analysis_name = None - run_name = project_stats.get("RunName", None) - analysis_id = project_stats.get("AnalysisID", None) project = self.clean_s_name(project_stats.get("Project", "DefaultProject"), f) - if run_name and analysis_id: - run_analysis_name = "-".join([run_name, analysis_id[0:4]]) - else: - log.error( - f"Error with project's RunStats.json. Either RunName or AnalysisID is absent.\n" - f"File: {f['fn']}, RunName: {run_name}, AnalysisID: {analysis_id}\n" - f"Please visit Elembio online documentation for more information - " - f"https://docs.elembio.io/docs/bases2fastq/introduction/" - ) + run_analysis_name = self._extract_run_analysis_name( + project_stats, source_info=f"project RunStats.json ({f['fn']})" + ) + if run_analysis_name is None: continue # skip run if in user provider ignore list @@ -889,12 +858,8 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] sample_data["SamplePolonyCounts"] / total_polonies * 100, 2 ) - run_manifest_data = None - try: - with open(run_manifest) as _infile: - run_manifest_data = json.load(_infile) - except (json.JSONDecodeError, OSError) as e: - log.error(f"Error reading {run_manifest}: {e}") + run_manifest_data = self._read_json_file(run_manifest) + if run_manifest_data is None: continue if "Samples" not in run_manifest_data: From e2f347e3b51de5d3288a2e5ad3f9495e6c39e6ed Mon Sep 17 00:00:00 2001 From: Carlos Ruiz Date: Wed, 17 Dec 2025 19:46:09 -0800 Subject: [PATCH 3/4] Added documentation and better path handling --- multiqc/modules/bases2fastq/bases2fastq.py | 249 ++++++++++++++++++--- 1 file changed, 216 insertions(+), 33 deletions(-) diff --git a/multiqc/modules/bases2fastq/bases2fastq.py b/multiqc/modules/bases2fastq/bases2fastq.py index f884565e69..a986dcf1e6 100644 --- a/multiqc/modules/bases2fastq/bases2fastq.py +++ b/multiqc/modules/bases2fastq/bases2fastq.py @@ -69,20 +69,82 @@ class MultiqcModule(BaseMultiqcModule): Bases2Fastq is Element Biosciences' secondary analysis software for demultiplexing sequencing data from AVITI systems and converting base calls into FASTQ files. - The module parses the following output files from Bases2Fastq: - - - `RunStats.json`: Contains run-level and sample-level QC metrics - - `RunManifest.json`: Contains sample sheet information including indexing and adapter settings - - Project-level `RunStats.json`: Contains project-specific metrics when demultiplexing by project - - The module supports both run-level analysis (single run) and project-level analysis - (aggregated metrics across projects), displaying metrics such as: - + Data Flow Overview + ------------------ + The module handles three distinct data hierarchy levels: + + 1. **Run Level**: Single sequencing run with all samples in one output + - Directory: `/` + - Files: `RunStats.json`, `RunManifest.json` + - Samples identified by: `{RunName}-{AnalysisID}__{SampleName}` + + 2. **Project Level**: Demultiplexing by project, samples split into project subdirectories + - Directory: `/Samples//` + - Files: Project-specific `RunStats.json` + - Run-level `RunManifest.json` accessed via `../../RunManifest.json` + - Samples identified by: `{RunName}-{AnalysisID}__{SampleName}` + + 3. **Combined Level**: Both run and project data present (merged view) + + Parsing Flow + ------------ + ``` + __init__() + │ + ├─> _init_data_structures() # Initialize empty dicts for all data levels + │ + ├─> _parse_and_validate_data() # Main parsing entry point + │ │ + │ ├─> _parse_run_project_data("bases2fastq/run") # Parse run-level RunStats.json + │ │ └─> Populates: run_level_data, run_level_samples, run_level_samples_to_project + │ │ + │ ├─> _parse_run_project_data("bases2fastq/project") # Parse project-level RunStats.json + │ │ └─> Populates: project_level_data, project_level_samples, project_level_samples_to_project + │ │ + │ └─> _determine_summary_path() # Returns: "run_level" | "project_level" | "combined_level" + │ + ├─> _select_data_by_summary_path() # Route to appropriate data sources + │ │ + │ ├─> _parse_run_manifest() or _parse_run_manifest_in_project() + │ │ └─> Returns: manifest_data (lane settings, adapter info) + │ │ + │ ├─> _parse_index_assignment() or _parse_index_assignment_in_project() + │ │ └─> Returns: index_assignment_data (per-sample index stats) + │ │ + │ └─> _parse_run_unassigned_sequences() (run_level only) + │ └─> Returns: unassigned_sequences (unknown barcodes) + │ + ├─> _setup_colors() # Assign colors to runs/projects/samples + │ + └─> _generate_plots() # Create all report sections and plots + ``` + + Data Structures + --------------- + - `run_level_data`: Dict[run_name, run_stats] - Run-level QC metrics + - `run_level_samples`: Dict[sample_id, sample_stats] - Sample metrics from run-level + - `project_level_data`: Dict[project_name, project_stats] - Project-level QC metrics + - `project_level_samples`: Dict[sample_id, sample_stats] - Sample metrics from project-level + - `*_samples_to_project`: Dict[sample_id, project_name] - Maps samples to their projects + + Sample Naming Convention + ------------------------ + Samples are uniquely identified as: `{RunName}-{AnalysisID[0:4]}__{SampleName}` + This ensures uniqueness across multiple runs while keeping names readable. + + Files Parsed + ------------ + - `RunStats.json`: Run/project QC metrics, sample statistics, lane data + - `RunManifest.json`: Sample sheet info, index sequences, adapter settings + + Metrics Displayed + ----------------- - Polony counts and yields - - Base quality distributions + - Base quality distributions (histogram and by-cycle) - Index assignment statistics - Per-sample sequence content and GC distribution - Adapter content analysis + - Unassigned/unknown barcode sequences (run-level only) """ def __init__(self): @@ -121,41 +183,85 @@ def __init__(self): self.write_data_file(sample_data, "bases2fastq") def _init_data_structures(self) -> None: - """Initialize all data structures used by the module.""" + """ + Initialize all data structures used by the module. + + Data structures are organized by hierarchy level: + - Run level: Data from single-run Bases2Fastq output (no project splitting) + - Project level: Data from project-split Bases2Fastq output + - Combined: Merged data when both levels are present + """ # File cache to avoid reading the same JSON files multiple times + # Key: resolved file path, Value: parsed JSON data self._file_cache: Dict[str, Any] = {} - # Run, project and sample level structures - self.run_level_data: Dict[str, Any] = {} - self.run_level_samples: Dict[str, Any] = {} - self.run_level_samples_to_project: Dict[str, str] = {} - self.project_level_data: Dict[str, Any] = {} - self.project_level_samples: Dict[str, Any] = {} - self.project_level_samples_to_project: Dict[str, str] = {} + # === Run-level data structures === + # Populated from /RunStats.json + self.run_level_data: Dict[str, Any] = {} # run_name -> full run stats + self.run_level_samples: Dict[str, Any] = {} # sample_id -> sample stats + self.run_level_samples_to_project: Dict[str, str] = {} # sample_id -> project name + + # === Project-level data structures === + # Populated from /Samples//RunStats.json + self.project_level_data: Dict[str, Any] = {} # project_name -> project stats + self.project_level_samples: Dict[str, Any] = {} # sample_id -> sample stats + self.project_level_samples_to_project: Dict[str, str] = {} # sample_id -> project name - # Run and project groups - self.group_dict: Dict[str, Any] = {} - self.group_lookup_dict: Dict[str, Any] = {} - self.project_lookup_dict: Dict[str, Any] = {} + # === Grouping structures for color assignment === + self.group_dict: Dict[str, Any] = {} # group_name -> list of members + self.group_lookup_dict: Dict[str, Any] = {} # item -> group it belongs to + self.project_lookup_dict: Dict[str, Any] = {} # sample -> project mapping - # Additional data structures + # === Legacy/auxiliary data structures === self.b2f_sample_data: Dict[str, Any] = {} self.b2f_run_data: Dict[str, Any] = {} self.b2f_run_project_data: Dict[str, Any] = {} self.b2f_run_project_sample_data: Dict[str, Any] = {} - self.missing_runs: set = set() - self.sample_id_to_run: Dict[str, str] = {} + self.missing_runs: set = set() # Runs referenced but not found + self.sample_id_to_run: Dict[str, str] = {} # sample_id -> run_analysis_name - def _read_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]: + def _validate_path(self, file_path: Path, base_directory: Path) -> bool: + """ + Validate that a file path doesn't escape outside the expected directory hierarchy. + + Args: + file_path: Path to validate + base_directory: The base directory that the path should stay within + + Returns: + True if path is valid, False if it escapes the base directory + """ + try: + resolved_path = file_path.resolve() + resolved_base = base_directory.resolve() + # Check if the resolved path is within the base directory tree + resolved_path.relative_to(resolved_base) + return True + except ValueError: + # relative_to raises ValueError if path is not relative to base + log.warning( + f"Path {file_path} resolves outside expected directory {base_directory}. " + f"Skipping for security reasons." + ) + return False + + def _read_json_file( + self, file_path: Path, base_directory: Optional[Path] = None + ) -> Optional[Dict[str, Any]]: """ Read and parse a JSON file with caching. Args: file_path: Path to the JSON file + base_directory: Optional base directory to validate path against Returns: Parsed JSON data or None if reading failed """ + # Validate path doesn't escape expected directory if base is provided + if base_directory is not None and not self._validate_path(file_path, base_directory): + return None + cache_key = str(file_path.resolve()) if cache_key in self._file_cache: @@ -426,6 +532,24 @@ def _extract_run_analysis_name( return f"{run_name}-{analysis_id[0:4]}" def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: + """ + Parse RunStats.json files to extract run/project and sample-level data. + + This is the primary parsing method that populates the core data structures. + It handles both run-level and project-level RunStats.json files. + + Args: + data_source: Search pattern key ("bases2fastq/run" or "bases2fastq/project") + + Returns: + List containing: + - runs_global_data: Dict[run_name, run_stats] - Run/project level metrics + - runs_sample_data: Dict[sample_id, sample_stats] - Per-sample metrics + - sample_to_project: Dict[sample_id, project_name] - Sample-to-project mapping + + Data Flow: + RunStats.json -> parse -> filter samples by min_polonies -> populate dicts + """ runs_global_data = {} runs_sample_data = {} sample_to_project = {} @@ -488,6 +612,20 @@ def _parse_run_project_data(self, data_source: str) -> List[Dict[str, Any]]: return [runs_global_data, runs_sample_data, sample_to_project] def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]: + """ + Parse RunManifest.json for run-level analysis to extract lane and adapter settings. + + Data Flow: + RunManifest.json (via data_source pattern) + + RunStats.json (for run name) from same directory + -> Extract per-lane: index masks, adapter settings, trim lengths + + Args: + data_source: Search pattern key for RunManifest.json files + + Returns: + Dict[run_lane, settings] where run_lane = "{run_name} | L{lane_id}" + """ runs_manifest_data = {} if data_source == "": @@ -548,6 +686,17 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]: return runs_manifest_data def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: + """ + Parse RunManifest.json for project-level analysis. + + Similar to _parse_run_manifest but navigates up from project directories + to find the run-level RunManifest.json (via ../../RunManifest.json). + + Data Flow: + Project RunStats.json (for run name) + + ../../RunManifest.json (run-level manifest) + -> Extract per-lane settings + """ project_manifest_data = {} if data_source == "": @@ -558,8 +707,9 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: if not directory: continue - # Get RunName and RunID from RunParameters.json - run_manifest = Path(directory) / "../../RunManifest.json" + # Get RunManifest.json from run output root (two levels up from project directory) + base_directory = Path(directory).parent.parent + run_manifest = base_directory / "RunManifest.json" project_stats = json.loads(f["f"]) run_analysis_name = self._extract_run_analysis_name( project_stats, source_info=f"project RunStats.json ({f['fn']})" @@ -572,7 +722,7 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: log.info(f"Skipping <{run_analysis_name}> because it is present in ignore list.") continue - run_manifest_data = self._read_json_file(run_manifest) + run_manifest_data = self._read_json_file(run_manifest, base_directory=base_directory) if run_manifest_data is None: continue @@ -619,6 +769,16 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]: return project_manifest_data def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: + """ + Parse unassigned/unknown barcode sequences from run-level data. + + Only available for run-level analysis. Extracts sequences that could not + be assigned to any sample, useful for troubleshooting index issues. + + Data Flow: + RunStats.json -> Lanes -> UnassignedSequences + -> Extract: sequence, count, percentage of total polonies + """ run_unassigned_sequences = {} if data_source == "": return run_unassigned_sequences @@ -668,6 +828,17 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[str, Any]: return run_unassigned_sequences def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: + """ + Parse index assignment statistics for run-level analysis. + + Combines data from RunStats.json (polony counts) and RunManifest.json + (index sequences) to show how well each sample's index performed. + + Data Flow: + RunStats.json -> SampleStats -> per-sample polony counts + + RunManifest.json -> Samples -> index sequences (Index1, Index2) + -> Combined index assignment table + """ sample_to_index_assignment = {} if manifest_data_source == "": @@ -781,6 +952,17 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: return sample_to_index_assignment def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any]: + """ + Parse index assignment statistics for project-level analysis. + + Similar to _parse_index_assignment but works with project-split output, + navigating up to find the run-level RunManifest.json. + + Data Flow: + Project RunStats.json -> SampleStats -> polony counts + + ../../RunManifest.json -> Samples -> index sequences + -> Combined index assignment table + """ sample_to_index_assignment = {} if data_source == "": @@ -791,8 +973,9 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] if not directory: continue - # Get RunManifest.json path for later use - run_manifest = Path(directory) / "../../RunManifest.json" + # Get RunManifest.json from run output root (two levels up from project directory) + base_directory = Path(directory).parent.parent + run_manifest = base_directory / "RunManifest.json" project_stats = json.loads(f["f"]) project = self.clean_s_name(project_stats.get("Project", "DefaultProject"), f) @@ -858,13 +1041,13 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] sample_data["SamplePolonyCounts"] / total_polonies * 100, 2 ) - run_manifest_data = self._read_json_file(run_manifest) + run_manifest_data = self._read_json_file(run_manifest, base_directory=base_directory) if run_manifest_data is None: continue if "Samples" not in run_manifest_data: log.warning( - f" section not found in {directory}/RunManifest.json.\n" + f" section not found in {run_manifest}.\n" f"Skipping RunManifest sample index assignment metrics." ) elif len(sample_to_index_assignment) == 0: From 16cdc326f8ec181c2d18e0fed549943153a968e7 Mon Sep 17 00:00:00 2001 From: Carlos Ruiz Date: Wed, 17 Dec 2025 19:54:33 -0800 Subject: [PATCH 4/4] Linting --- multiqc/modules/bases2fastq/bases2fastq.py | 80 ++++++++++++---------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/multiqc/modules/bases2fastq/bases2fastq.py b/multiqc/modules/bases2fastq/bases2fastq.py index a986dcf1e6..3c8aae9d18 100644 --- a/multiqc/modules/bases2fastq/bases2fastq.py +++ b/multiqc/modules/bases2fastq/bases2fastq.py @@ -5,7 +5,7 @@ import json import logging import random -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Tuple import uuid from pathlib import Path @@ -175,8 +175,13 @@ def __init__(self): # Generate all plots and sections self._generate_plots( - summary_path, run_data, sample_data, samples_to_projects, - manifest_data, index_assignment_data, unassigned_sequences + summary_path, + run_data, + sample_data, + samples_to_projects, + manifest_data, + index_assignment_data, + unassigned_sequences, ) # Write main data file at the very end after all sections are added @@ -240,14 +245,11 @@ def _validate_path(self, file_path: Path, base_directory: Path) -> bool: except ValueError: # relative_to raises ValueError if path is not relative to base log.warning( - f"Path {file_path} resolves outside expected directory {base_directory}. " - f"Skipping for security reasons." + f"Path {file_path} resolves outside expected directory {base_directory}. Skipping for security reasons." ) return False - def _read_json_file( - self, file_path: Path, base_directory: Optional[Path] = None - ) -> Optional[Dict[str, Any]]: + def _read_json_file(self, file_path: Path, base_directory: Optional[Path] = None) -> Optional[Dict[str, Any]]: """ Read and parse a JSON file with caching. @@ -315,12 +317,14 @@ def _parse_and_validate_data(self) -> str: num_project_level_samples = len(self.project_level_samples) # Ensure at least some data was found - if all([ - len(self.run_level_data) == 0, - num_run_level_samples == 0, - len(self.project_level_data) == 0, - num_project_level_samples == 0, - ]): + if all( + [ + len(self.run_level_data) == 0, + num_run_level_samples == 0, + len(self.project_level_data) == 0, + num_project_level_samples == 0, + ] + ): error_msg = "No run-, project- or sample-level data found" log.error(error_msg) raise ModuleNoSamplesFound(error_msg) @@ -368,7 +372,9 @@ def _determine_summary_path(self) -> str: log.error(error_msg) raise ModuleNoSamplesFound(error_msg) - def _select_data_by_summary_path(self, summary_path: str): + def _select_data_by_summary_path( + self, summary_path: str + ) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, str], Dict[str, Any], Dict[str, Any], Dict[str, Any]]: """ Select the appropriate data sources based on the summary path. @@ -408,7 +414,9 @@ def _select_data_by_summary_path(self, summary_path: str): log.error(error_msg) raise ModuleNoSamplesFound(error_msg) - def _setup_colors(self, sample_data: Dict, samples_to_projects: Dict, summary_path: str) -> None: + def _setup_colors( + self, sample_data: Dict[str, Any], samples_to_projects: Dict[str, str], summary_path: str + ) -> None: """Set up color schemes for groups and samples.""" # Create run and project groups run_groups: Dict[str, List] = defaultdict(list) @@ -429,10 +437,12 @@ def _setup_colors(self, sample_data: Dict, samples_to_projects: Dict, summary_pa # Build color palette self.color_getter = mqc_colour.mqc_colour_scale() - self.palette = list(chain.from_iterable( - self.color_getter.get_colours(hue) - for hue in ["Set2", "Pastel1", "Accent", "Set1", "Set3", "Dark2", "Paired", "Pastel2"] - )) + self.palette = list( + chain.from_iterable( + self.color_getter.get_colours(hue) + for hue in ["Set2", "Pastel1", "Accent", "Set1", "Set3", "Dark2", "Paired", "Pastel2"] + ) + ) # Add extra colors if needed if len(merged_groups) > len(self.palette): @@ -443,7 +453,7 @@ def _setup_colors(self, sample_data: Dict, samples_to_projects: Dict, summary_pa # Assign colors to groups self.group_color = { - group: color for group, color in zip(merged_groups.keys(), self.palette[:len(merged_groups)]) + group: color for group, color in zip(merged_groups.keys(), self.palette[: len(merged_groups)]) } # Assign colors to samples @@ -457,17 +467,17 @@ def _setup_colors(self, sample_data: Dict, samples_to_projects: Dict, summary_pa # Copy group colors to run colors self.run_color = copy.deepcopy(self.group_color) - self.palette = self.palette[len(merged_groups):] + self.palette = self.palette[len(merged_groups) :] def _generate_plots( self, summary_path: str, - run_data: Dict, - sample_data: Dict, - samples_to_projects: Dict, - manifest_data: Dict, - index_assignment_data: Dict, - unassigned_sequences: Dict, + run_data: Dict[str, Any], + sample_data: Dict[str, Any], + samples_to_projects: Dict[str, str], + manifest_data: Dict[str, Any], + index_assignment_data: Dict[str, Any], + unassigned_sequences: Dict[str, Any], ) -> None: """Generate all plots and add sections to the report.""" # QC metrics table @@ -499,7 +509,7 @@ def _generate_plots( project_lookup=samples_to_projects, ) - def get_uuid(self): + def get_uuid(self) -> str: return str(uuid.uuid4()).replace("-", "").lower() def _extract_run_analysis_name( @@ -893,9 +903,7 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]: sample_expected_seq = occurrence.get("ExpectedSequence") sample_counts = occurrence.get("NumPoloniesBeforeTrimming") if any([element is None for element in [sample_expected_seq, sample_counts, sample_id]]): - log.error( - f"Missing data needed to extract index assignment for sample {sample_id}. Skipping." - ) + log.error(f"Missing data needed to extract index assignment for sample {sample_id}. Skipping.") continue if run_analysis_name not in sample_to_index_assignment: sample_to_index_assignment[run_analysis_name] = {} @@ -1077,14 +1085,16 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any] return sample_to_index_assignment - def add_run_plots(self, data, plot_functions): + def add_run_plots(self, data: Dict[str, Any], plot_functions: List[Callable]) -> None: for func in plot_functions: plot_html, plot_name, anchor, description, helptext, plot_data = func(data, self.run_color) self.add_section(name=plot_name, plot=plot_html, anchor=anchor, description=description, helptext=helptext) self.write_data_file(plot_data, f"base2fastq:{plot_name}") - def add_sample_plots(self, data, group_lookup, project_lookup): - plot_functions = [ + def add_sample_plots( + self, data: Dict[str, Any], group_lookup: Dict[str, str], project_lookup: Dict[str, str] + ) -> None: + plot_functions: List[Callable] = [ tabulate_sample_stats, sequence_content_plot, plot_per_cycle_N_content,