diff --git a/config_template/template_neat_config.yml b/config_template/template_neat_config.yml index 4bdc8fb1..a0db4968 100644 --- a/config_template/template_neat_config.yml +++ b/config_template/template_neat_config.yml @@ -133,8 +133,7 @@ rng_seed: . # type: int | required = no min_mutations: . -# Overwrite the output files, if they are named the same as the current run. -# Default is to quit if files already exist to avoid data destruction +# Overwrite output files if they already exist # type: bool | required = no | default = false overwrite_output: . @@ -153,12 +152,12 @@ parallel_block_size: . threads: . # Delete the 'splits' directory after stitching completes -# Note if threads == 1, this option has no effect. +# Note: If threads == 1, this option has no effect. # type = bool | required: no | default = true cleanup_splits: . # Reuse existing files in '/splits' and skip the split step. # The directory must contain NEAT-generated files and must be in the output directory within "splits" -# Note if threads == 1, this option has no effect. +# Note: If threads == 1, this option has no effect. # type = bool | required: no | default = False -reuse_splits: . \ No newline at end of file +reuse_splits: . 
diff --git a/neat/cli/commands/__init__.py b/neat/cli/commands/__init__.py index f55b4b86..a3086440 100644 --- a/neat/cli/commands/__init__.py +++ b/neat/cli/commands/__init__.py @@ -1,2 +1,3 @@ -"""Modules related to the subcommands' interfaces""" +"""Modules related to the subcommands""" + from .base import * \ No newline at end of file diff --git a/neat/cli/commands/model_qual_score.py b/neat/cli/commands/model_qual_score.py new file mode 100644 index 00000000..aea15edf --- /dev/null +++ b/neat/cli/commands/model_qual_score.py @@ -0,0 +1,87 @@ +import argparse + +from ...model_quality_score import model_qual_score_runner +from .base import BaseCommand +from .options import output_group + + +class Command(BaseCommand): + """ + Generate a quality score model (traditional or Markov) from FASTQ. + """ + + name = "model-qual-score" + description = "Generate quality score model from FASTQ (optional Markov chain)." + + def add_arguments(self, parser: argparse.ArgumentParser): + + parser.add_argument( + "-i", + dest="input_files", + metavar="FILE", + nargs="+", + required=True, + help="Input FASTQ file(s) (gzipped or plain).", + ) + + parser.add_argument( + "-q", + dest="quality_offset", + type=int, + default=33, + help="Quality score offset [33].", + ) + + parser.add_argument( + "-Q", + dest="quality_scores", + type=int, + nargs="+", + default=[42], + help="Max quality or explicit list of quality scores [42].", + ) + + parser.add_argument( + "-m", + dest="max_num", + type=int, + default=-1, + help="Max number of reads to process [-1 = all].", + ) + + parser.add_argument( + "--markov", + dest="use_markov", + action="store_true", + default=False, + help="Use Markov quality model instead of the traditional model.", + ) + + parser.add_argument( + "--overwrite", + dest="overwrite", + action="store_true", + default=False, + help="Overwrite existing output file if present.", + ) + + output_group.add_to_parser(parser) + + def execute(self, arguments: argparse.Namespace): + + 
if len(arguments.quality_scores) == 1: + qual_scores: int | list[int] = arguments.quality_scores[0] + + else: + qual_scores = arguments.quality_scores + + model_qual_score_runner( + files=arguments.input_files, + offset=arguments.quality_offset, + qual_scores=qual_scores, + max_reads=arguments.max_num, + overwrite=arguments.overwrite, + output_dir=arguments.output_dir, + output_prefix=arguments.prefix, + use_markov=arguments.use_markov, + ) diff --git a/neat/model_quality_score/__init__.py b/neat/model_quality_score/__init__.py new file mode 100644 index 00000000..a6b92eb9 --- /dev/null +++ b/neat/model_quality_score/__init__.py @@ -0,0 +1,9 @@ +""" +Submodule to build quality score models for NEAT that wraps NEAT’s +existing sequencing error model and traditional quality model, with the +option to construct a Markov chain–based quality model instead +""" + +__all__ = ["model_qual_score_runner"] + +from .runner import model_qual_score_runner \ No newline at end of file diff --git a/neat/model_quality_score/runner.py b/neat/model_quality_score/runner.py new file mode 100644 index 00000000..2c355c7b --- /dev/null +++ b/neat/model_quality_score/runner.py @@ -0,0 +1,197 @@ +""" +Runner for creating quality score models. + +This module implements the core logic for generating quality score models +from input FASTQ files. 
+""" + +import gzip +import pickle +import logging +from pathlib import Path +from typing import Iterable, List, Optional, Tuple + +import numpy as np + +from ..common import validate_input_path, validate_output_path +from ..model_sequencing_error.utils import parse_file +from ..models import SequencingErrorModel, TraditionalQualityModel +from ..models.markov_quality_model import MarkovQualityModel +from ..quality_score_modeling.markov_utils import build_markov_model + +__all__ = ["model_qual_score_runner"] + +_LOG = logging.getLogger(__name__) + + +def _prepare_quality_scores_argument( + qual_scores: int | Iterable[int | float], +) -> Tuple[List[int], Optional[List[int]]]: + """ + Returns: + - full_range_scores: required by parse_file indexing + - allowed_bins: None (no binning) or sorted unique user bins (for Markov binning) + """ + + if isinstance(qual_scores, int): + max_q = int(qual_scores) + return list(range(0, max_q + 1)), None + + bins = sorted({int(x) for x in qual_scores}) + + if not bins: + raise ValueError("quality_scores list must not be empty.") + + max_q = max(bins) + + # parse_file requires indices up to max_q (it indexes by raw Q value) + return list(range(0, max_q + 1)), bins + + +def model_qual_score_runner( + files: List[str], + offset: int, + qual_scores: int | Iterable[int | float], + max_reads: int, + use_markov: bool, + overwrite: bool, + output_dir: str, + output_prefix: str, +) -> None: + """Create and save a quality score model from FASTQ data.""" + + if len(files) > 2: + _LOG.info("Only processing the first two input files") + files = files[:2] + + # Validate input paths + for file in files: + validate_input_path(file) + + _LOG.debug("Input files: %s", ", ".join(str(x) for x in files)) + _LOG.debug("Quality offset: %d", offset) + + final_quality_scores, allowed_bins = _prepare_quality_scores_argument(qual_scores) + _LOG.debug("Quality scores range: %s", final_quality_scores) + + if allowed_bins is not None: + _LOG.debug("Markov binning 
enabled with bins: %s", allowed_bins) + + if max_reads in (-1, None): + num_records_to_process = float("inf") + + else: + num_records_to_process = max_reads + + # Validate output directory and file + validate_output_path(output_dir, is_file=False) + output_path = Path(output_dir) + output_file = output_path / f"{output_prefix}.p.gz" + validate_output_path(output_file, overwrite=overwrite) + + _LOG.info("Writing output to: %s", output_file) + + # Containers for per-file quality model parameters + read_parameters: List[np.ndarray] = [] + average_errors: List[float] = [] + read_length = 0 + + # Traditional model parameters (existing NEAT utility) + for idx_file, file in enumerate(files, start=1): + + _LOG.info("Reading file %d of %d", idx_file, len(files)) + + params_by_position, file_avg_error, read_length = parse_file( + file, + final_quality_scores, + num_records_to_process, + offset, + read_length, + ) + + read_parameters.append(params_by_position) + average_errors.append(file_avg_error) + + _LOG.info("Finished reading file %d", idx_file) + + if not read_parameters: + raise RuntimeError("No quality score parameters were computed. 
Check input FASTQ files.") + + average_error = float(np.average(average_errors)) if average_errors else 0.0 + _LOG.info("Average sequencing error across files: %f", average_error) + + # Prepare models for each input file + models: List[Tuple[SequencingErrorModel, TraditionalQualityModel, Optional[MarkovQualityModel]]] = [] + + for idx in range(len(read_parameters)): + + # Sequencing error model (always produced) + seq_err_model = SequencingErrorModel(avg_seq_error=average_error, read_length=read_length) + + # Traditional quality model (always produced) + trad_model = TraditionalQualityModel( + average_error=average_error, + quality_scores=np.array(final_quality_scores), + qual_score_probs=read_parameters[idx], + ) + + markov_model: Optional[MarkovQualityModel] = None + + # Optionally build Markov quality model + + if use_markov: + + # Position-specific transition matrices + init_dist, pos_dists, trans_dists, max_quality, train_read_length = build_markov_model( + [files[idx]], + num_records_to_process, + offset, + allowed_quality_scores=allowed_bins, + ) + + markov_model = MarkovQualityModel( + initial_distribution=init_dist, + position_distributions=pos_dists, + max_quality=max_quality, + read_length=train_read_length, + transition_distributions=trans_dists, + ) + + models.append((seq_err_model, trad_model, markov_model)) + + # Write out the models + + with gzip.open(output_file, "wb") as out_model: + + if len(models) == 1: + + seq_err1, trad1, markov1 = models[0] + pickle.dump( + { + "error_model1": seq_err1, + "error_model2": None, + "qual_score_model1": markov1 if use_markov else trad1, + "qual_score_model2": None, + }, + out_model, + ) + + elif len(models) == 2: + + (seq_err1, trad1, markov1), (seq_err2, trad2, markov2) = models + pickle.dump( + { + "error_model1": seq_err1, + "error_model2": seq_err2, + "qual_score_model1": markov1 if use_markov else trad1, + "qual_score_model2": markov2 if use_markov else trad2, + }, + out_model, + ) + + else: + + # NEAT's 
read simulator only understands one or two models + raise RuntimeError(f"Expected at most two quality models, but constructed {len(models)}.") + + _LOG.info("Quality score model saved to %s", output_file) diff --git a/neat/models/__init__.py b/neat/models/__init__.py index 5a63fe16..7fb80f1b 100644 --- a/neat/models/__init__.py +++ b/neat/models/__init__.py @@ -1,3 +1,4 @@ from .error_models import * from .mutation_model import * from .fragment_model import * +from .markov_quality_model import * \ No newline at end of file diff --git a/neat/models/markov_quality_model.py b/neat/models/markov_quality_model.py new file mode 100644 index 00000000..15e16dc4 --- /dev/null +++ b/neat/models/markov_quality_model.py @@ -0,0 +1,212 @@ +""" +Position-specific quality score model for NEAT. + +The model consists of an initial distribution giving the probability of observing each +quality score at the first position of a read, per-position marginals, and per-position +transition matrices. + +If transition_distributions is None, the model independently samples from +per-position marginals. +""" + +from dataclasses import dataclass +from typing import Dict, List, Optional + +import numpy as np + +__all__ = ["MarkovQualityModel"] + +@dataclass +class MarkovQualityModel: + """ + Parameters + ---------- + initial_distribution: + A mapping from integer quality scores to their observed + probabilities at position 0. + position_distributions: + A list of mappings from integer quality scores to their observed + probabilities at each position. Element ``i`` in the list + corresponds to position ``i`` in the read. + max_quality: + The maximum quality observed in the training data. Generated + qualities will be clipped to not exceed the maximum quality. + read_length: + The length of reads used during training determines how many + position-specific distributions exist. + transition_distributions: + Optional list trans[i][q_prev][q_next] = count/prob. 
+ """ + + initial_distribution: Dict[int, float] + position_distributions: List[Dict[int, float]] + max_quality: int + read_length: int + transition_distributions: Optional[List[Dict[int, Dict[int, float]]]] = None + + def __post_init__(self) -> None: + + # Normalize the initial distribution + total_init = float(sum(self.initial_distribution.values())) + + if total_init <= 0: + raise ValueError( + "Initial distribution must have positive probability mass." + ) + + self.initial_distribution = { + int(k): float(v) / total_init for k, v in self.initial_distribution.items() + } + + # Normalize per-position marginals + norm_positions: List[Dict[int, float]] = [] + + for dist in self.position_distributions: + total = float(sum(dist.values())) + + if total <= 0: + norm_positions.append({0: 1.0}) + continue + + norm_positions.append({int(k): float(v) / total for k, v in dist.items()}) + + self.position_distributions = norm_positions + + # Ensure max_quality and read_length are integers + self.max_quality = int(self.max_quality) + self.read_length = int(self.read_length) + + # Precompute numpy arrays for fast sampling + init_keys = list(self.initial_distribution.keys()) + init_vals = [self.initial_distribution[k] for k in init_keys] + self._init_scores = np.asarray(init_keys, dtype=int) + self._init_probs = np.asarray(init_vals, dtype=float) + + self._values_by_pos: List[np.ndarray] = [] + self._probs_by_pos: List[np.ndarray] = [] + + for dist in self.position_distributions: + keys = list(dist.keys()) + vals = [dist[k] for k in keys] + self._values_by_pos.append(np.asarray(keys, dtype=int)) + self._probs_by_pos.append(np.asarray(vals, dtype=float)) + + # Normalize and cache transitions + self._has_transitions = False + self._trans_values_by_pos: List[Dict[int, np.ndarray]] = [] + self._trans_probs_by_pos: List[Dict[int, np.ndarray]] = [] + + if self.transition_distributions is not None: + if len(self.transition_distributions) not in (0, max(0, self.read_length - 1)): + 
raise ValueError( + "transition_distributions must have length read_length-1 " + f"(expected {max(0, self.read_length - 1)}, got {len(self.transition_distributions)})." + ) + + # Normalize each row q_prev -> distribution over q_next + self._has_transitions = len(self.transition_distributions) > 0 + + for pos_trans in self.transition_distributions: + cache_vals: Dict[int, np.ndarray] = {} + cache_probs: Dict[int, np.ndarray] = {} + + for q_prev, next_map in pos_trans.items(): + total = float(sum(next_map.values())) + if total <= 0: + continue + + keys = [] + probs = [] + + for qn, c in next_map.items(): + keys.append(int(qn)) + probs.append(float(c) / total) + + cache_vals[int(q_prev)] = np.asarray(keys, dtype=int) + cache_probs[int(q_prev)] = np.asarray(probs, dtype=float) + + self._trans_values_by_pos.append(cache_vals) + self._trans_probs_by_pos.append(cache_probs) + + @property + def quality_scores(self) -> List[int]: + """List of supported quality scores.""" + + return list(range(0, self.max_quality + 1)) + + def _position_index_for_length(self, pos: int, length: int) -> int: + """Map a position in a generated read onto a training position.""" + + if length <= 1 or self.read_length <= 1: + return 0 + + idx = int(round((pos / max(1, length - 1)) * (self.read_length - 1))) + + if idx < 0: + idx = 0 + + elif idx > self.read_length - 1: + idx = self.read_length - 1 + + return idx + + def get_quality_scores( + self, + model_read_length: int, + length: int, + rng: np.random.Generator, + ) -> np.ndarray: + """Generate a synthetic quality score array. + + If transitions are provided, use the Markov-based model. + + If transitions are not provided, use an empirical version of the model. 
+ """ + + _ = model_read_length + + if length <= 0: + return np.zeros(0, dtype=int) + + qualities = np.zeros(length, dtype=int) + + # Sample initial quality from the starting distribution + qualities[0] = int(rng.choice(self._init_scores, p=self._init_probs)) + + for i in range(1, length): + p_cur = self._position_index_for_length(i, length) + + q = None + + if self._has_transitions and self._trans_values_by_pos: + + p_prev = self._position_index_for_length(i - 1, length) + + # Transitions are defined for positions + p_prev = min(p_prev, len(self._trans_values_by_pos) - 1) + + q_prev = int(qualities[i - 1]) + vals_map = self._trans_values_by_pos[p_prev] + probs_map = self._trans_probs_by_pos[p_prev] + + if q_prev in vals_map: + vals = vals_map[q_prev] + probs = probs_map[q_prev] + q = int(rng.choice(vals, p=probs)) + + # Use marginal method if no transition row + if q is None: + p_idx = min(p_cur, len(self._values_by_pos) - 1) + vals = self._values_by_pos[p_idx] + probs = self._probs_by_pos[p_idx] + q = int(rng.choice(vals, p=probs)) + + # Clip to valid range + if q < 0: + q = 0 + elif q > self.max_quality: + q = self.max_quality + + qualities[i] = q + + return qualities diff --git a/neat/models/quality_score_model.py b/neat/models/quality_score_model.py deleted file mode 100644 index 2c52987b..00000000 --- a/neat/models/quality_score_model.py +++ /dev/null @@ -1,426 +0,0 @@ -import pysam -import numpy as np -import pandas as pd -import matplotlib.pyplot as plt -from matplotlib.patches import Patch -import seaborn as sns -from scipy.stats import ttest_ind, ttest_rel, f_oneway, norm, levene, shapiro -from sklearn.utils import resample -import pathlib -import pickle - - -def make_qual_score_list(bam_file): - """Takes an input BAM file and creates lists of quality scores. 
This becomes a data frame, which will be - pre-processed for Markov chain analysis.""" - - index = f"{bam_file}.bai" - - if not pathlib.Path(index).exists(): - print("No index found, creating one.") - - pysam.index(bam_file) - - file_to_parse = pysam.AlignmentFile(bam_file, "rb", check_sq=False) - num_recs = file_to_parse.count() - print(f"{num_recs} records to parse") - - modulo = round(num_recs / 9) - - qual_list = [] - i = 0 - j = 0 - - def print_update(number, factor, percent): - - if number % factor == 0: - percent += 10 - print(f"{percent}% complete", end="\r") - - return percent - - print("Parsing file") - - for item in file_to_parse.fetch(): - - if item.is_unmapped or "S" in item.cigarstring: - i += 1 - j = print_update(i, modulo, j) - - continue - - # mapping quality scores - - align_qual = item.query_alignment_qualities - - # append to master lists - - qual_list.append(align_qual) - i += 1 - j = print_update(i, modulo, j) - - print(f"100% complete") - file_to_parse.close() - - quality_df = pd.DataFrame(qual_list) # turn list of lists into a dataframe - quality_df = quality_df.fillna(0) # pre-processing - fill in missing data - - # filters to process outliers - - quality_df[quality_df > 40] = 40 - quality_df[quality_df < 0] = 0 - - return quality_df - - -def estimate_transition_probabilities(std_dev): - """Takes a standard deviation as an input and generates the transition probabilities with a normal - distribution that can be used to represent a Markov process.""" - - # define the probabilities for transition states based on a normal distribution - - transition_probs = { - -3: norm.pdf(-3, 0, std_dev), - -2: norm.pdf(-2, 0, std_dev), - -1: norm.pdf(-1, 0, std_dev), - 0: norm.pdf(0, 0, std_dev), - 1: norm.pdf(1, 0, std_dev), - 2: norm.pdf(2, 0, std_dev), - 3: norm.pdf(3, 0, std_dev), - } - - # normalize the probabilities to sum to 1 - - total_prob = sum(transition_probs.values()) - - for k in transition_probs: - transition_probs[k] /= total_prob - - return 
transition_probs - - -def apply_markov_chain(quality_df, noise_level=10, std_dev=2): - """Takes a data frame representing quality scores by position along a read and parameters to increase - variability in the Markov process as inputs and generates predictions based on an ergodic Markov chain. - Generates a data frame with simulated reads.""" - - transition_probs = estimate_transition_probabilities(std_dev) - num_rows, num_cols = quality_df.shape - - count = 0 - markov_preds = [] - - for row in quality_df.iterrows(): - - qualities = row[1].values - pred_qualities = np.zeros_like(qualities) - pred_qualities[0] = qualities[0] # initial state - - print(count, ":", qualities) - row_mean = np.mean(qualities) - row_median = np.median(qualities) - row_std = np.std(qualities) - - for i in range(1, len(quality_df.columns)): - prev_quality = pred_qualities[i - 1] - transitions = list(transition_probs.keys()) - probabilities = list(transition_probs.values()) - next_quality = np.random.choice(transitions, p=probabilities) - - pred_qualities[i] = max(0, prev_quality + next_quality) # ensuring no negative qualities - - # the noise parameter prevents long stretches of the predicted quality scores being very similar - - pred_qualities[i] += np.random.normal(0, noise_level) # add some noise to the predictions - pred_qualities[i] = min(max(pred_qualities[i], 0), 40) # finalize range - - print(count, "mean:", row_mean, "median:", row_median, "st dev:", row_std) - count += 1 - - for i in range(1, len(quality_df.columns)): - - if pred_qualities[i] < row_mean - 2 * row_std: - - if np.random.rand() < 0.95: # 95% chance to substitute abnormally low quality scores - - # uses median and standard deviation from read (not the mean because of outliers) - - new_quality = np.random.normal(row_median, row_std) - pred_qualities[i] = min(max(new_quality, 0), 40) - - # the maximum predicted quality score should be derived from the data - - max_quality = np.max(qualities) - pred_qualities = 
np.clip(pred_qualities, 0, max_quality) - - markov_preds.append(pred_qualities) - - # randomly sample 30% of the total quality scores for a given read to have the maximum value - - num_samples = int(0.3 * len(quality_df.columns)) # TO DO: make this a parameter - sample_indices = np.random.choice(len(quality_df.columns), num_samples, replace=False) - pred_qualities[sample_indices] = max_quality - - markov_preds_df = pd.DataFrame(markov_preds) - - # apply final linear transformations - - edge_len = int(len(quality_df.columns) * 0.01) - # mid_start = int(len(quality_df.columns) * 0.40) - # mid_end = int(len(quality_df.columns) * 0.60) - - markov_preds_df.iloc[:, :edge_len] -= 5 - markov_preds_df.iloc[:, :edge_len] = markov_preds_df.iloc[:, :edge_len].clip(lower=0) - - markov_preds_df.iloc[:, -edge_len:] -= 5 - markov_preds_df.iloc[:, -edge_len:] = markov_preds_df.iloc[:, -edge_len:].clip(lower=0) - - # markov_preds_df.iloc[:, mid_start:mid_end] += 2 - - return markov_preds_df - - -def plot_heatmap(y_preds_df, file_path): - """Takes a dataframe of predicted quality scores and plots a seaborn heatmap to visualize them.""" - - sns.heatmap(y_preds_df, vmin=0, vmax=max(markov_preds_df.max()), cmap="viridis") - plt.savefig(file_path) - print("Heatmap plotted") - - -def save_file(df, csv_file_path, pickle_file_path): - """Saves the dataframe to a CSV file and a pickle file.""" - - df.to_csv(csv_file_path, index=False) - - with open(pickle_file_path, "wb") as f: - pickle.dump(df, f) - - print(f"Data saved to {csv_file_path} and {pickle_file_path}") - - -def compare_quality_scores(quality_df, markov_preds_df): - """Compares the means and variances of quality scores within each row of the original and predicted data frames.""" - - mean_p_values = [] - variance_p_values = [] - - for i in range(len(quality_df)): - original = quality_df.iloc[i].values - predicted = markov_preds_df.iloc[i].values - - original_mean = np.mean(original) - predicted_mean = np.mean(predicted) - - # 
print the pairs of means - - print(f'Row {i}: Original Mean = {original_mean}, Predicted Mean = {predicted_mean}') - - # test for equality of means - - t_stat, mean_p_value = ttest_ind(original, predicted, equal_var=False) - mean_p_values.append(mean_p_value) - - # test for equality of variances - - stat, variance_p_value = levene(original, predicted) - variance_p_values.append(variance_p_value) - - return mean_p_values, variance_p_values - - -def plot_comparison_results(mean_p_values, variance_p_values): - """Plots the comparison results for means and variances of quality scores.""" - - rows = range(len(mean_p_values)) - - fig, ax = plt.subplots(1, 2, figsize=(12, 6)) - - ax[0].scatter(rows, mean_p_values, marker='o') - ax[0].set_title('P-values for Equality of Means') - ax[0].set_xlabel('Row Index') - ax[0].set_ylabel('P-value') - - ax[1].scatter(rows, variance_p_values, marker='o') - ax[1].set_title('P-values for Equality of Variances') - ax[1].set_xlabel('Row Index') - ax[1].set_ylabel('P-value') - - plt.tight_layout() - plt.show() - - -def calculate_row_means(df): - return df.mean(axis=1) - - -def calculate_row_variances(df): - return df.var(axis=1) - - -def bootstrap_p_values(list1, list2, n_bootstrap=1000): - bootstrapped_p_values = [] - - for _ in range(n_bootstrap): - sample1 = resample(list1, replace=True) - sample2 = resample(list2, replace=True) - - t_stat, p_value = ttest_ind(sample1, sample2, equal_var=False) - bootstrapped_p_values.append(p_value) - - return bootstrapped_p_values - - -def permutation_test(list1, list2, n_permutations=10000): - observed_diff = abs(np.mean(list1) - np.mean(list2)) - combined = np.concatenate([list1, list2]) - perm_diffs = np.zeros(n_permutations) - - for i in range(n_permutations): - np.random.shuffle(combined) - perm_list1 = combined[:len(list1)] - perm_list2 = combined[len(list1):] - perm_diffs[i] = abs(np.mean(perm_list1) - np.mean(perm_list2)) - - p_value = np.sum(perm_diffs >= observed_diff) / n_permutations - 
return p_value - - -def test_normality(quality_df, markov_preds_df): - """Tests normality of quality scores within each row using the Shapiro-Wilk test.""" - - quality_df_normality = [] - markov_preds_df_normality = [] - - for i in range(len(quality_df)): - original = quality_df.iloc[i].values - predicted = markov_preds_df.iloc[i].values - - # test normality for the original data - - stat, p_value = shapiro(original) - quality_df_normality.append(p_value > 0.05) - - # test normality for the predicted data - - stat, p_value = shapiro(predicted) - markov_preds_df_normality.append(p_value > 0.05) - - return quality_df_normality, markov_preds_df_normality - - -def plot_normality_results(quality_df_normality, markov_preds_df_normality): - """Plots the normality results for quality scores in side by side heatmaps.""" - - fig, ax = plt.subplots(1, 2, figsize=(12, 6)) - - quality_df_normality = np.array(quality_df_normality).reshape(-1, 1) - markov_preds_df_normality = np.array(markov_preds_df_normality).reshape(-1, 1) - - # create a color map for the heatmap - - cmap = sns.color_palette(["orange", "blue"]) - - sns.heatmap(quality_df_normality, ax=ax[0], cbar=False, cmap=cmap) - ax[0].set_title('Normality of Original Quality Scores') - ax[0].set_xlabel('Quality Scores') - ax[0].set_ylabel('Row Index') - - sns.heatmap(markov_preds_df_normality, ax=ax[1], cbar=False, cmap=cmap) - ax[1].set_title('Normality of Predicted Quality Scores') - ax[1].set_xlabel('Quality Scores') - - legend_elements = [Patch(facecolor='orange', edgecolor='black', label='Non-normal'), - Patch(facecolor='blue', edgecolor='black', label='Normal')] - fig.legend(handles=legend_elements, loc='lower right', title='Distribution') - - plt.tight_layout() - plt.show() - - -# example usage - -# bam_file = "/Users/keshavgandhi/Downloads/H1N1.bam" - -bam_file = "/Users/keshavgandhi/Downloads/subsample_3.125.bam" -quality_df = make_qual_score_list(bam_file) - -markov_preds_df = apply_markov_chain(quality_df) - -# 
plot_heatmap(markov_preds_df, 'markov_chain_heatmap.svg') -# save_to_csv_and_pickle(markov_preds_df, 'markov_preds.csv', 'markov_preds.pickle') - -sns.heatmap(quality_df, vmin=0, vmax=max(quality_df.max()), cmap='viridis') -sns.heatmap(markov_preds_df, vmin=0, vmax=max(markov_preds_df.max()), cmap='viridis') - -# markov_preds_df - -# for i in range (1, max(markov_preds_df)): -# print(max(markov_preds_df[i])) - -# quality_df.iloc[100][25:75] -# markov_preds_df.iloc[100][25:75] - -bam_file = "/Users/keshavgandhi/Downloads/H1N1.bam" -test_df = make_qual_score_list(bam_file) -markov_preds_df = apply_markov_chain(test_df) - -# compare quality scores - -mean_p_values, variance_p_values = compare_quality_scores(test_df, markov_preds_df) - -markov_means = calculate_row_means(markov_preds_df).tolist() -quality_means = calculate_row_means(test_df).tolist() - -markov_variances = calculate_row_variances(markov_preds_df).tolist() -quality_variances = calculate_row_variances(test_df).tolist() - -# perform permutation test - -permutation_p_value_means = permutation_test(markov_means, quality_means) - -# perform two-sample t-test - -t_stat_means, ttest_p_value_means = ttest_ind(markov_means, quality_means, equal_var=False) - -# bootstrap analysis for means - -bootstrapped_p_values_means = bootstrap_p_values(markov_means, quality_means) -mean_bootstrap_p_value_means = np.mean(bootstrapped_p_values_means) -std_bootstrap_p_value_means = np.std(bootstrapped_p_values_means) - -print(f'Permutation test p-value (means): {permutation_p_value_means}') -print(f'Two-sample t-test p-value (means): {ttest_p_value_means}') -print(f'Bootstrap mean p-value (means): {mean_bootstrap_p_value_means}') -print(f'Bootstrap p-value standard deviation (means): {std_bootstrap_p_value_means}') - -# perform permutation test for variances - -permutation_p_value_variances = permutation_test(markov_variances, quality_variances) - -# perform two-sample t-test for variances - -t_stat_variances, 
ttest_p_value_variances = ttest_ind(markov_variances, quality_variances, equal_var=False) - -# perform bootstrap analysis for variances - -bootstrapped_p_values_variances = bootstrap_p_values(markov_variances, quality_variances) -mean_bootstrap_p_value_variances = np.mean(bootstrapped_p_values_variances) -std_bootstrap_p_value_variances = np.std(bootstrapped_p_values_variances) - -print(f'Permutation test p-value (variances): {permutation_p_value_variances}') -print(f'Two-sample t-test p-value (variances): {ttest_p_value_variances}') -print(f'Bootstrap mean p-value (variances): {mean_bootstrap_p_value_variances}') -print(f'Bootstrap p-value standard deviation (variances): {std_bootstrap_p_value_variances}') - -# plot comparison results - -plot_comparison_results(mean_p_values, variance_p_values) - -# test normality - -quality_df_normality_test, markov_preds_df_normality_test = test_normality(test_df, markov_preds_df) - -# plot normality results - -plot_normality_results(quality_df_normality_test, markov_preds_df_normality_test) diff --git a/neat/quality_score_modeling/__init__.py b/neat/quality_score_modeling/__init__.py new file mode 100644 index 00000000..3e110be4 --- /dev/null +++ b/neat/quality_score_modeling/__init__.py @@ -0,0 +1,7 @@ +""" +Load quality score modeling utilities for NEAT +""" + +__all__ = ["build_markov_model"] + +from .markov_utils import build_markov_model \ No newline at end of file diff --git a/neat/quality_score_modeling/markov_utils.py b/neat/quality_score_modeling/markov_utils.py new file mode 100644 index 00000000..d75c7a47 --- /dev/null +++ b/neat/quality_score_modeling/markov_utils.py @@ -0,0 +1,234 @@ +""" +Utility functions for building a position-specific Markov quality score model. + +The functions in this module extract per-read quality scores from FASTQ files +and compute the information required by the ``MarkovQualityModel``. 
+""" + +import logging +from bisect import bisect_right +from collections import defaultdict +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Tuple + +from ..common import open_input +from ..model_sequencing_error.utils import convert_quality_string + +_LOG = logging.getLogger(__name__) + +__all__ = [ + "read_quality_lists", + "compute_initial_distribution", + "compute_position_distributions", + "compute_transition_distributions", + "build_markov_model", +] + + +def _down_bin_quality(q: int, allowed: List[int]) -> int: + """ + Map q to the greatest allowed value <= q (down-binning). + If q is below the smallest allowed, map to allowed[0]. + """ + + if not allowed: + return int(q) + + q = int(q) + i = bisect_right(allowed, q) - 1 + + if i < 0: + return int(allowed[0]) + + return int(allowed[i]) + + +def read_quality_lists( + files: Iterable[str], + max_reads: int | float, + offset: int, + allowed_quality_scores: Optional[Iterable[int]] = None, +) -> Tuple[List[List[int]], int]: + """Read per-read quality scores from one or more FASTQ files. + + Returns (qualities, read_length). If no reads are found, returns ([], 0). 
+    """
+
+    allowed_sorted: Optional[List[int]] = None
+
+    if allowed_quality_scores is not None:
+        allowed_sorted = sorted({int(x) for x in allowed_quality_scores})
+
+        if not allowed_sorted:
+            allowed_sorted = None
+
+    qualities: List[List[int]] = []
+    read_length = 0
+
+    # NOTE: this limit applies to the total number of reads parsed across
+    # all input files (-1 or None disables the cap).
+    reads_to_parse = max_reads
+    if reads_to_parse in (None, -1):
+        reads_to_parse = float("inf")
+    reads_read = 0
+
+    for file in files:
+
+        path = Path(file)
+        if not path.exists():
+            raise FileNotFoundError(f"Input FASTQ file not found: {file}")
+
+        with open_input(file) as fq_in:
+
+            while reads_read < reads_to_parse:
+
+                # FASTQ format: four lines per record
+                header = fq_in.readline()
+
+                if not header:
+                    break  # end of file
+
+                _ = fq_in.readline()  # seq
+                _ = fq_in.readline()  # plus
+                qual = fq_in.readline()
+
+                if not qual:
+                    break
+
+                qlist = convert_quality_string(qual.strip(), offset)
+                if not qlist:
+                    continue  # empty quality string; would poison read_length
+                if not read_length:
+                    read_length = len(qlist)
+                # Skip reads that do not match the inferred read length
+                if len(qlist) != read_length:
+                    _LOG.debug(
+                        "Skipping record of length %d (expected %d)",
+                        len(qlist),
+                        read_length,
+                    )
+                    continue
+
+                if allowed_sorted is not None:
+                    qlist = [_down_bin_quality(q, allowed_sorted) for q in qlist]
+
+                qualities.append(qlist)
+                reads_read += 1
+
+    return qualities, read_length
+
+
+def compute_initial_distribution(qualities: Iterable[List[int]]) -> Dict[int, float]:
+    """Histogram of the quality score observed at read position 0."""
+
+    counts: Dict[int, float] = defaultdict(float)
+
+    for qlist in qualities:
+        if qlist:
+            counts[int(qlist[0])] += 1.0
+
+    return dict(counts)
+
+
+def compute_position_distributions(
+    qualities: Iterable[List[int]],
+    read_length: int,
+) -> List[Dict[int, float]]:
+    """Per-position histograms: result[i][q] = count of reads with quality q at i."""
+
+    if read_length <= 0:
+        return []
+
+    histograms: List[Dict[int, float]] = [defaultdict(float) for _ in range(read_length)]
+
+    for qlist in qualities:
+        if len(qlist) != read_length:
+            continue
+
+        for i, q in enumerate(qlist):
+            histograms[i][int(q)] += 1.0
+
+    return [dict(h) for h in histograms]
+
+
+def compute_transition_distributions(
+    qualities: Iterable[List[int]],
+    read_length: int,
+) -> List[Dict[int, Dict[int, float]]]:
+    """
+    Transition counts per position i (0..read_length-2):
+    trans[i][q_prev][q_next] += 1
+    """
+
+    if read_length <= 1:
+        return []
+
+    trans: List[Dict[int, Dict[int, float]]] = [
+        defaultdict(lambda: defaultdict(float)) for _ in range(read_length - 1)
+    ]
+
+    for qlist in qualities:
+        if len(qlist) != read_length:
+            continue
+
+        for i in range(read_length - 1):
+            q_prev = int(qlist[i])
+            q_next = int(qlist[i + 1])
+            trans[i][q_prev][q_next] += 1.0
+
+    # Convert nested defaultdicts to plain dicts
+    out: List[Dict[int, Dict[int, float]]] = []
+
+    for i in range(read_length - 1):
+
+        pos_dict: Dict[int, Dict[int, float]] = {}
+
+        for q_prev, nexts in trans[i].items():
+            pos_dict[int(q_prev)] = {int(qn): float(c) for qn, c in nexts.items()}
+
+        out.append(pos_dict)
+
+    return out
+
+
+def build_markov_model(
+    files: Iterable[str],
+    max_reads: int,
+    offset: int,
+    allowed_quality_scores: Optional[Iterable[int]] = None,
+) -> Tuple[
+    Dict[int, float],
+    List[Dict[int, float]],
+    List[Dict[int, Dict[int, float]]],
+    int,
+    int,
+]:
+    """Collect initial, per-position and transition quality counts from FASTQ input."""
+    # Materialize: consumed again below when capping max_quality (generator-safe).
+    if allowed_quality_scores is not None:
+        allowed_quality_scores = list(allowed_quality_scores)
+    qualities, read_length = read_quality_lists(
+        files, max_reads, offset, allowed_quality_scores=allowed_quality_scores
+    )
+
+    if not qualities:
+        raise ValueError("No quality scores could be read from the input files.")
+
+    init_dist = compute_initial_distribution(qualities)
+    pos_dists = compute_position_distributions(qualities, read_length)
+    trans_dists = compute_transition_distributions(qualities, read_length)
+
+    # Determine maximum quality (post-binning, if applied)
+    max_quality = 0
+    for qlist in qualities:
+        if qlist:
+            max_quality = max(max_quality, max(qlist))
+
+    # If user supplied bins, max_quality should not exceed the max bin.
+ if allowed_quality_scores is not None: + bins = sorted({int(x) for x in allowed_quality_scores}) + + if bins: + max_quality = min(max_quality, max(bins)) + + return init_dist, pos_dists, trans_dists, int(max_quality), int(read_length) diff --git a/tests/test_models/test_error_and_mut_models.py b/tests/test_models/test_error_and_mut_models.py index a255ac96..5c3d4c80 100644 --- a/tests/test_models/test_error_and_mut_models.py +++ b/tests/test_models/test_error_and_mut_models.py @@ -189,4 +189,4 @@ def test_sequencing_error_model_nonzero_error_introduces_in_bounds_errors(): for e in introduced: assert 0 <= e.location < len(ref) assert e.ref in ["A", "C", "G", "T"] - assert e.alt in ["A", "C", "G", "T"] \ No newline at end of file + assert e.alt in ["A", "C", "G", "T"] diff --git a/tests/test_models/test_qual_score_models.py b/tests/test_models/test_qual_score_models.py new file mode 100644 index 00000000..9a8485eb --- /dev/null +++ b/tests/test_models/test_qual_score_models.py @@ -0,0 +1,80 @@ +""" +Unit tests for MarkovQualityModel +""" + +import pytest +import numpy as np +from numpy.random import default_rng + +from neat.models.markov_quality_model import MarkovQualityModel + + +def test_markov_quality_model_shapes_and_range(): + """ + Basic sanity check for MarkovQualityModel: output array shape and bounds. 
+ """ + rng = default_rng(11) + # Simple symmetric distributions around a high-quality region + init_dist = {30: 1.0, 31: 1.0, 32: 1.0} + single_pos_dist = {-1: 1.0, 0: 2.0, 1: 1.0} + max_q = 42 + read_length = 151 + # Position-specific distributions that reuse the same shape at every position + position_distributions = [single_pos_dist] * read_length + qm = MarkovQualityModel(init_dist, position_distributions, max_q, read_length) + qs = qm.get_quality_scores(model_read_length=read_length, length=75, rng=rng) + assert isinstance(qs, np.ndarray) + assert len(qs) == 75 + # Ensure we stay within the valid range + assert qs.min() >= 0 + assert qs.max() <= max_q + + +def test_markov_quality_model_quality_scores_property_matches_range(): + """ + quality_scores should expose the full discrete range [0, max_quality]. + """ + init_dist = {35: 1.0} + single_pos_dist = {0: 1.0} + max_q = 40 + read_length = 151 + position_distributions = [single_pos_dist] * read_length + qm = MarkovQualityModel(init_dist, position_distributions, max_q, read_length) + scores = qm.quality_scores + assert isinstance(scores, list) + assert scores[0] == 0 + assert scores[-1] == max_q + # The range should be contiguous + assert scores == list(range(0, max_q + 1)) + + +def test_markov_quality_model_reproducible_with_seed(): + """Markov quality model should be deterministic for a fixed RNG state.""" + init_dist = {30: 1.0, 31: 1.0} + single_pos_dist = {-1: 1.0, 0: 2.0, 1: 1.0} + max_q = 42 + read_length = 151 + position_distributions = [single_pos_dist] * read_length + qm = MarkovQualityModel(init_dist, position_distributions, max_q, read_length) + rng1 = default_rng(12) + rng2 = default_rng(12) + qs1 = qm.get_quality_scores(model_read_length=read_length, length=100, rng=rng1) + qs2 = qm.get_quality_scores(model_read_length=read_length, length=100, rng=rng2) + assert isinstance(qs1, np.ndarray) + assert isinstance(qs2, np.ndarray) + assert np.array_equal(qs1, qs2) + + +def 
test_markov_quality_model_invalid_initial_distribution_raises(): + """ + The model should reject empty or zero-mass initial distributions. + """ + read_length = 151 + single_pos_dist = {0: 1.0} + position_distributions = [single_pos_dist] * read_length + # Empty initial distribution + with pytest.raises(ValueError): + MarkovQualityModel({}, position_distributions, 40, read_length) + # Zero total mass in initial distribution + with pytest.raises(ValueError): + MarkovQualityModel({30: 0.0}, position_distributions, 40, read_length) diff --git a/tests/test_read_simulator/test_options.py b/tests/test_read_simulator/test_options.py index dad1cbfe..8f9c1e8a 100644 --- a/tests/test_read_simulator/test_options.py +++ b/tests/test_read_simulator/test_options.py @@ -207,4 +207,4 @@ def test_from_cli_reuse_splits_missing_dir_raises(tmp_path: _PathAlias): outdir.mkdir(parents=True, exist_ok=True) with _pytest.raises(FileNotFoundError, match=r"reuse_splits=True"): - Options.from_cli(outdir, "reuse", yml_path) \ No newline at end of file + Options.from_cli(outdir, "reuse", yml_path)