Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 85 additions & 50 deletions multiqc/modules/bases2fastq/bases2fastq.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
from collections import defaultdict
import copy
from itertools import chain
import re
import json
import logging
import random
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import re
import uuid
from collections import defaultdict
from itertools import chain
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

from natsort import natsorted

from multiqc import config
from multiqc.base_module import BaseMultiqcModule, ModuleNoSamplesFound
from multiqc.types import LoadedFileDict
from multiqc.utils import mqc_colour

from multiqc.modules.bases2fastq.plot_runs import (
plot_base_quality_by_cycle,
plot_base_quality_hist,
plot_run_stats,
tabulate_manifest_stats,
tabulate_index_assignment_stats,
tabulate_unassigned_index_stats,
tabulate_run_stats,
tabulate_manifest_stats,
tabulate_project_stats,
plot_base_quality_hist,
plot_base_quality_by_cycle,
tabulate_run_stats,
tabulate_unassigned_index_stats,
)
from multiqc.modules.bases2fastq.plot_samples import (
tabulate_sample_stats,
sequence_content_plot,
plot_per_cycle_N_content,
plot_adapter_content,
plot_per_cycle_N_content,
plot_per_read_gc_hist,
sequence_content_plot,
tabulate_sample_stats,
)
from multiqc.types import LoadedFileDict
from multiqc.utils import mqc_colour

log = logging.getLogger(__name__)

ELEMBIO_DOCS_URL = "https://docs.elembio.io/docs/bases2fastq/introduction/"

# Default minimum polony threshold - samples below this are skipped
DEFAULT_MIN_POLONIES = 1000
Expand Down Expand Up @@ -151,7 +153,7 @@ def __init__(self):
super(MultiqcModule, self).__init__(
name="Bases2Fastq",
anchor="bases2fastq",
href="https://docs.elembio.io/docs/bases2fastq/introduction/",
href=ELEMBIO_DOCS_URL,
info="Demultiplexes and converts Element AVITI base calls into FASTQ files",
doi="10.1038/s41587-023-01750-7",
)
Expand Down Expand Up @@ -265,7 +267,7 @@ def _read_json_file(self, file_path: Path, base_directory: Optional[Path] = None
log.error(
f"{file_path.name} does not exist at {file_path}.\n"
f"Please visit Elembio online documentation for more information - "
f"https://docs.elembio.io/docs/bases2fastq/introduction/"
f"{ELEMBIO_DOCS_URL}"
)
return None

Expand All @@ -285,9 +287,12 @@ def _parse_and_validate_data(self) -> str:
Returns:
summary_path: The determined summary path ('run_level', 'project_level', or 'combined_level')
"""
# Collect log files once per pattern (find_log_files returns a generator)
run_level_log_files = list(self.find_log_files("bases2fastq/run"))
project_level_log_files = list(self.find_log_files("bases2fastq/project"))
# Collect log files once per pattern (find_log_files returns a generator).
# Stored as instance vars so downstream parsers can reuse them.
self._run_level_log_files = list(self.find_log_files("bases2fastq/run"))
self._project_level_log_files = list(self.find_log_files("bases2fastq/project"))
run_level_log_files = self._run_level_log_files
project_level_log_files = self._project_level_log_files

if len(run_level_log_files) == 0 and len(project_level_log_files) == 0:
error_msg = "No run- or project-level log files found within the Bases2Fastq results."
Expand Down Expand Up @@ -377,31 +382,35 @@ def _select_data_by_summary_path(
index_assignment_data, unassigned_sequences)
"""
if summary_path == "run_level":
manifest_log_files = list(self.find_log_files("bases2fastq/manifest"))
return (
self.run_level_data,
self.run_level_samples,
self.run_level_samples_to_project,
self._parse_run_manifest("bases2fastq/manifest"),
self._parse_index_assignment("bases2fastq/manifest"),
self._parse_run_unassigned_sequences("bases2fastq/run"),
self._parse_run_manifest("bases2fastq/manifest", log_files=manifest_log_files),
self._parse_index_assignment("bases2fastq/manifest", log_files=manifest_log_files),
self._parse_run_unassigned_sequences("bases2fastq/run", log_files=self._run_level_log_files),
)
elif summary_path == "project_level":
return (
self.project_level_data,
self.project_level_samples,
self.project_level_samples_to_project,
self._parse_run_manifest_in_project("bases2fastq/project"),
self._parse_index_assignment_in_project("bases2fastq/project"),
self._parse_run_manifest_in_project("bases2fastq/project", log_files=self._project_level_log_files),
self._parse_index_assignment_in_project("bases2fastq/project", log_files=self._project_level_log_files),
{}, # No unassigned sequences for project level
)
elif summary_path == "combined_level":
# Use run-level stats for the run table (more complete), but
# project-level samples for per-sample plots (properly split by project).
manifest_log_files = list(self.find_log_files("bases2fastq/manifest"))
return (
self.run_level_data,
self.project_level_samples,
self.project_level_samples_to_project,
self._parse_run_manifest("bases2fastq/manifest"),
self._parse_index_assignment("bases2fastq/manifest"),
self._parse_run_unassigned_sequences("bases2fastq/run"),
self._parse_run_manifest("bases2fastq/manifest", log_files=manifest_log_files),
self._parse_index_assignment("bases2fastq/manifest", log_files=manifest_log_files),
self._parse_run_unassigned_sequences("bases2fastq/run", log_files=self._run_level_log_files),
)
else:
error_msg = "No run- or project-level data was retained. No report will be generated."
Expand All @@ -418,7 +427,7 @@ def _setup_colors(
ind_sample_groups: Dict[str, List] = defaultdict(list)

for sample in natsorted(sample_data.keys()):
run_name, _ = sample.split("__")
run_name, _ = sample.split("__", maxsplit=1)
run_groups[run_name].append(sample)
sample_project = samples_to_projects.get(sample, "DefaultProject")
project_groups[sample_project].append(sample)
Expand Down Expand Up @@ -526,7 +535,7 @@ def _extract_run_analysis_name(
f"Error with {source_info}. Either RunName or AnalysisID is absent.\n"
f"RunName: {run_name}, AnalysisID: {analysis_id}\n"
f"Please visit Elembio online documentation for more information - "
f"https://docs.elembio.io/docs/bases2fastq/introduction/"
f"{ELEMBIO_DOCS_URL}"
)
return None

Expand Down Expand Up @@ -659,7 +668,9 @@ def _extract_manifest_lane_settings(
result[run_lane]["R2AdapterMinimumTrimmedLength"] = lane_data.get("R2AdapterMinimumTrimmedLength", "N/A")
return result

def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]:
def _parse_run_manifest(
self, data_source: str, log_files: Optional[List[LoadedFileDict[Any]]] = None
) -> Dict[str, Any]:
"""
Parse RunManifest.json for run-level analysis to extract lane and adapter settings.

Expand All @@ -670,6 +681,7 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]:

Args:
data_source: Search pattern key for RunManifest.json files
log_files: Optional pre-collected list of file dicts from find_log_files.

Returns:
Dict[run_lane, settings] where run_lane = "{run_name} | L{lane_id}"
Expand All @@ -679,7 +691,8 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]:
if data_source == "":
return runs_manifest_data

for f in self.find_log_files(data_source):
files_to_process = log_files if log_files is not None else list(self.find_log_files(data_source))
for f in files_to_process:
directory = f.get("root")
if not directory:
continue
Expand All @@ -706,7 +719,9 @@ def _parse_run_manifest(self, data_source: str) -> Dict[str, Any]:

return runs_manifest_data

def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]:
def _parse_run_manifest_in_project(
self, data_source: str, log_files: Optional[List[LoadedFileDict[Any]]] = None
) -> Dict[str, Any]:
"""
Parse RunManifest.json for project-level analysis.

Expand All @@ -723,16 +738,17 @@ def _parse_run_manifest_in_project(self, data_source: str) -> Dict[str, Any]:
if data_source == "":
return project_manifest_data

for f in self.find_log_files(data_source):
files_to_process = log_files if log_files is not None else list(self.find_log_files(data_source))
for f in files_to_process:
directory = f.get("root")
if not directory:
continue

# Get RunManifest.json from run output root (check if it exists in the same directory or try two levels up)
# Resolve base_directory to the run output root (not the project subdirectory),
# since RunManifest.json lives at the run root. Path validation in _read_json_file
# will check the manifest path against this run root directory.
base_directory = Path(directory).resolve()
if (base_directory / "RunManifest.json").exists():
base_directory = base_directory
else:
if not (base_directory / "RunManifest.json").exists():
base_directory = base_directory.parent.parent
run_manifest = base_directory / "RunManifest.json"
project_stats = json.loads(f["f"])
Expand Down Expand Up @@ -843,7 +859,9 @@ def _merge_manifest_index_sequences(
run_data[merged_indices]["Index1"] = index_1
run_data[merged_indices]["Index2"] = index_2

def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[int, Dict[str, Any]]:
def _parse_run_unassigned_sequences(
self, data_source: str, log_files: Optional[List[LoadedFileDict[Any]]] = None
) -> Dict[int, Dict[str, Any]]:
"""
Parse unassigned/unknown barcode sequences from run-level data.

Expand All @@ -858,7 +876,8 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[int, Dict[st
if data_source == "":
return run_unassigned_sequences

for f in self.find_log_files(data_source):
files_to_process = log_files if log_files is not None else list(self.find_log_files(data_source))
for f in files_to_process:
data = json.loads(f["f"])

# Get RunName and AnalysisID
Expand Down Expand Up @@ -902,7 +921,9 @@ def _parse_run_unassigned_sequences(self, data_source: str) -> Dict[int, Dict[st

return run_unassigned_sequences

def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]:
def _parse_index_assignment(
self, manifest_data_source: str, log_files: Optional[List[LoadedFileDict[Any]]] = None
) -> Dict[str, Any]:
"""
Parse index assignment statistics for run-level analysis.

Expand All @@ -919,7 +940,8 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]:
if manifest_data_source == "":
return sample_to_index_assignment

for f in self.find_log_files(manifest_data_source):
files_to_process = log_files if log_files is not None else list(self.find_log_files(manifest_data_source))
for f in files_to_process:
directory = f.get("root")
if not directory:
continue
Expand All @@ -944,7 +966,7 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]:
f"Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n"
f"Available keys: {list(run_stats.keys())}\n"
f"Please visit Elembio online documentation for more information - "
f"https://docs.elembio.io/docs/bases2fastq/introduction/"
f"{ELEMBIO_DOCS_URL}"
)
continue

Expand All @@ -964,7 +986,9 @@ def _parse_index_assignment(self, manifest_data_source: str) -> Dict[str, Any]:

return sample_to_index_assignment

def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any]:
def _parse_index_assignment_in_project(
self, data_source: str, log_files: Optional[List[LoadedFileDict[Any]]] = None
) -> Dict[str, Any]:
"""
Parse index assignment statistics for project-level analysis.

Expand All @@ -981,7 +1005,8 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any]
if data_source == "":
return sample_to_index_assignment

for f in self.find_log_files(data_source):
files_to_process = log_files if log_files is not None else list(self.find_log_files(data_source))
for f in files_to_process:
directory = f.get("root")
if not directory:
continue
Expand Down Expand Up @@ -1009,7 +1034,7 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any]
f"Error, missing SampleStats in RunStats.json. Skipping index assignment metrics.\n"
f"Available keys: {list(project_stats.keys())}\n"
f"Please visit Elembio online documentation for more information - "
f"https://docs.elembio.io/docs/bases2fastq/introduction/"
f"{ELEMBIO_DOCS_URL}"
)
continue

Expand All @@ -1033,14 +1058,21 @@ def _parse_index_assignment_in_project(self, data_source: str) -> Dict[str, Any]
return sample_to_index_assignment

def add_run_plots(self, data: Dict[Any, Any], plot_functions: List[Callable]) -> None:
if not data:
return
for func in plot_functions:
plot_html, plot_name, anchor, description, helptext, plot_data = func(data, self.run_color)
self.add_section(name=plot_name, plot=plot_html, anchor=anchor, description=description, helptext=helptext)
self.write_data_file(plot_data, f"base2fastq:{plot_name}")
if plot_html is not None:
self.add_section(
name=plot_name, plot=plot_html, anchor=anchor, description=description, helptext=helptext
)
self.write_data_file(plot_data, f"base2fastq:{plot_name}")

def add_sample_plots(
self, data: Dict[str, Any], group_lookup: Dict[str, str], project_lookup: Dict[str, str]
) -> None:
if not data:
return
plot_functions: List[Callable] = [
tabulate_sample_stats,
sequence_content_plot,
Expand All @@ -1052,5 +1084,8 @@ def add_sample_plots(
plot_html, plot_name, anchor, description, helptext, plot_data = func(
data, group_lookup, project_lookup, self.sample_color
)
self.add_section(name=plot_name, plot=plot_html, anchor=anchor, description=description, helptext=helptext)
self.write_data_file(plot_data, f"base2fastq:{plot_name}")
if plot_html is not None:
self.add_section(
name=plot_name, plot=plot_html, anchor=anchor, description=description, helptext=helptext
)
self.write_data_file(plot_data, f"base2fastq:{plot_name}")
6 changes: 3 additions & 3 deletions multiqc/modules/bases2fastq/plot_runs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import math
from typing import Any, Dict, cast
from typing import Any, Dict, List, cast

from multiqc.plots import bargraph, linegraph, table
from multiqc.plots.table_object import ColumnDict, SectionT
Expand Down Expand Up @@ -429,7 +429,7 @@ def tabulate_index_assignment_stats(run_data, color_dict):
if "Project" in sample_data:
sample_index_stats.update({"project": sample_data["Project"]})
project_present = True
sample_index_stats.update({"sample_name": sample_data["SampleID"].split("__")[1]})
sample_index_stats.update({"sample_name": sample_data["SampleID"].split("__", maxsplit=1)[1]})
sample_index_stats.update({"index_1": sample_data["Index1"]})
sample_index_stats.update({"index_2": sample_data["Index2"]})
sample_index_stats.update({"assigned_polonies": sample_data["SamplePolonyCounts"]})
Expand Down Expand Up @@ -655,7 +655,7 @@ def plot_base_quality_by_cycle(run_data, color_dict):
# Prepare plot data for median BQ of each cycle (skip runs without Reads/Cycles)
runs_with_reads = [s for s in run_data if _run_has_reads(run_data[s]) and run_data[s]["Reads"][0].get("Cycles")]
if not runs_with_reads:
plot_content: list[Any] = []
plot_content: List[Any] = []
plot_html = linegraph.plot(
plot_content,
pconfig={"id": "bases2fastq_run_bq_by_cycle", "title": "bases2fastq: Run Base Quality by Cycle"},
Expand Down
Loading