From 53fee60f0316f15ac97d52cda5471dd26bd842b1 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 8 Sep 2025 21:21:42 -0300 Subject: [PATCH 01/11] Separate writing to stdout from read filtering using a MPSC channel - Use a `Rayon::scope` to spawn two tasks: one for filtering reads, one for printing. - Introduce `WritableRecord` struct to hold data needed for printing. - Move the `write_record` function into the `WritableRecord` implementation. --- src/main.rs | 117 ++++++++++++++++++++++++++----------------------- src/records.rs | 45 +++++++++++++++++++ 2 files changed, 108 insertions(+), 54 deletions(-) create mode 100644 src/records.rs diff --git a/src/main.rs b/src/main.rs index b9d98d6..d523e04 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,11 +6,14 @@ use std::error::Error; use std::io::Read; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::mpsc::Sender; +use std::sync::{mpsc, Arc}; +use records::WritableRecord; use trimmers::*; use utils::file_reader; +mod records; mod trimmers; mod utils; @@ -192,69 +195,75 @@ where let total_reads_ = Arc::new(AtomicUsize::new(0)); let output_reads_ = Arc::new(AtomicUsize::new(0)); - fastq::Reader::new(input) - .records() - .par_bridge() - .for_each(|record| { - let record = record.expect("ERROR: problem parsing fastq record"); - total_reads_.fetch_add(1, Ordering::SeqCst); + let (sender, receiver): (Sender, _) = mpsc::channel(); - if record.is_empty() { - return; + rayon::scope(|s| { + s.spawn(move |_| { + while let Ok(writable_record) = receiver.recv() { + writable_record.write_record(); } + }); - let valid_qual = is_valid_quality(&record, args.minqual, args.maxqual); - let valid_len = is_valid_length(&record, args.minlength, args.maxlength); - - // If a GC content filter is set, validate the GC content; otherwise, assume it is valid. - let valid_gc_p = if args.mingc.is_some() || args.maxgc.is_some() { - is_valid_gc_percent(&record, args.mingc.unwrap_or(0.0), args.maxgc.unwrap_or(1.0)) - } else { true }; - - // If a contaminants filter is set, validate the reads; otherwise, assume it is valid - let is_not_contam = if let Some(ref aligner) = aligner_option { - !is_contamination(&record.seq(), aligner) - } else { true }; - - if (valid_gc_p && valid_len && valid_qual && is_not_contam) ^ args.inverse { - let pos_option = if let Some(ref trimmer) = trimmer_strategy { - trimmer.trim(&record) - } else { - Some((0, record.seq().len())) - }; - - if let Some((start, end)) = pos_option { - write_record(&record, start, end); - output_reads_.fetch_add(1, Ordering::SeqCst); - } - } + s.spawn(|_| { + fastq::Reader::new(input).records() + .par_bridge() + .for_each(|record| { + let record = record.expect("ERROR: problem parsing fastq record"); + total_reads_.fetch_add(1, Ordering::SeqCst); + + if record.is_empty() { + return; + } + + let valid_qual = is_valid_quality(&record, args.minqual, args.maxqual); + let valid_len = is_valid_length(&record, args.minlength, args.maxlength); + + // If a GC content filter is set, validate the GC content; otherwise, assume it is valid. + let valid_gc_p = if args.mingc.is_some() || args.maxgc.is_some() { + is_valid_gc_percent(&record, args.mingc.unwrap_or(0.0), args.maxgc.unwrap_or(1.0)) + } else { true }; + + // If a contaminants filter is set, validate the reads; otherwise, assume it is valid + let is_not_contam = if let Some(ref aligner) = aligner_option { + !is_contamination(&record.seq(), aligner) + } else { true }; + + if (valid_gc_p && valid_len && valid_qual && is_not_contam) ^ args.inverse { + let pos_option = if let Some(ref trimmer) = trimmer_strategy { + trimmer.trim(&record) + } else { + Some((0, record.seq().len())) + }; + + if let Some((start, end)) = pos_option { + let writable_record = WritableRecord::new( + record, + start, + end, + ); + + // It's not necessary to handle this, because the receiver remains pending + // until the last record has been processed. + let _ = sender.send(writable_record); + + // write_record(&record, start, end); + output_reads_.fetch_add(1, Ordering::SeqCst); + } + } + }); + + // Drop the sender to signal that no more fastq::Records will be sent + // and allow the receiver to finish processing the last record. + drop(sender); }); + }); + let output_reads = output_reads_.load(Ordering::SeqCst); let total_reads = total_reads_.load(Ordering::SeqCst); eprintln!("Kept {output_reads} reads out of {total_reads} reads"); } -/// Write a record to stdout -fn write_record(record: &fastq::Record, start_pos: usize, end_pos: usize) { - // Use a single formatted string with one allocation for the header - let header = match record.desc() { - Some(d) => format!("@{} {}", record.id(), d), - None => format!("@{}", record.id()), - }; - - // Apply the trimming to both sequence and quality data - let seq_slice = &record.seq()[start_pos..end_pos]; - let qual_slice = &record.qual()[start_pos..end_pos]; - - // Use a single print to minimize syscalls - println!( - "{}\n{}\n+\n{}", - header, - unsafe { std::str::from_utf8_unchecked(seq_slice) }, - unsafe { std::str::from_utf8_unchecked(qual_slice) } - ); -} /// This function calculates the average quality of a read, and does this correctly /// First the Phred scores are converted to probabilities using `phred_quality_to_probability` and summed diff --git a/src/records.rs b/src/records.rs new file mode 100644 index 0000000..fa1e160 --- /dev/null +++ b/src/records.rs @@ -0,0 +1,45 @@ +use bio::io::fastq; + +pub struct WritableRecord { + record: fastq::Record, + start: usize, + end: usize, +} + +impl WritableRecord { + /// Creates a `WritableRecord` from a FASTQ `Record`, restricting it + /// to the subsequence defined by `start..end`. + /// + /// # Arguments + /// * `record` - Original FASTQ record. + /// * `start` - Start index (inclusive). + /// * `end` - End index (exclusive). + pub fn new(record: fastq::Record, start: usize, end: usize) -> Self { + WritableRecord { + record, + start, + end, + } + } + + /// Write a record to stdout + pub fn write_record(&self) { + // Use a single formatted string with one allocation for the header + let header = match self.record.desc() { + Some(d) => format!("@{} {}", self.record.id(), d), + None => format!("@{}", self.record.id()), + }; + + // Apply the trimming to both sequence and quality data + let seq_slice = &self.record.seq()[self.start..self.end]; + let qual_slice = &self.record.qual()[self.start..self.end]; + + // Use a single print to minimize syscalls + println!( + "{}\n{}\n+\n{}", + header, + unsafe { std::str::from_utf8_unchecked(seq_slice) }, + unsafe { std::str::from_utf8_unchecked(qual_slice) } + ); + } +} From e34c7c01b574f17bc148099f8d32ffba9326a5c8 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 9 Sep 2025 21:49:21 -0300 Subject: [PATCH 02/11] Improve stdout writing performance with BufWriter and refactors - Use BufWriter to reduce syscall overhead when writing to stdout. - Move `output_reads_` counting to the writer thread to avoid parallel writes. - Relax atomic ordering from `SeqCst` to `Relaxed` for read counting. - Store pre-formatted FASTQ strings instead of `fastq::Record` to leverage parallel processing before sending through the channel. --- src/main.rs | 19 +++++++++++++------ src/records.rs | 36 ++++++++++++++++++++---------------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/main.rs b/src/main.rs index d523e04..bd7d9b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ use clap::{ Parser, ValueEnum}; use minimap2::*; use rayon::prelude::*; use std::error::Error; -use std::io::Read; +use std::io::{BufWriter, Read, Write}; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::Sender; @@ -194,14 +194,24 @@ where let total_reads_ = Arc::new(AtomicUsize::new(0)); let output_reads_ = Arc::new(AtomicUsize::new(0)); + let output_reads_2 = Arc::clone(&output_reads_); let (sender, receiver): (Sender, _) = mpsc::channel(); rayon::scope(|s| { s.spawn(move |_| { + let mut read_counter: usize = 0; + let stdout = std::io::stdout(); + let mut writter = BufWriter::new(stdout.lock()); + while let Ok(writable_record) = receiver.recv() { - writable_record.write_record(); + let _ = writable_record.write_on_buffer(&mut writter); + + read_counter += 1; } + + writter.flush().unwrap(); + output_reads_2.fetch_add(read_counter, Ordering::Relaxed); }); s.spawn(|_| { @@ -209,7 +219,7 @@ where .par_bridge() .for_each(|record| { let record = record.expect("ERROR: problem parsing fastq record"); - total_reads_.fetch_add(1, Ordering::SeqCst); + total_reads_.fetch_add(1, Ordering::Relaxed); if record.is_empty() { return; @@ -245,9 +255,6 @@ where // It's not necessary to handle this, because the receiver remains pending // until the last record has been processed. let _ = sender.send(writable_record); - - // write_record(&record, start, end); - output_reads_.fetch_add(1, Ordering::SeqCst); } } }); diff --git a/src/records.rs b/src/records.rs index fa1e160..b83529b 100644 --- a/src/records.rs +++ b/src/records.rs @@ -1,9 +1,9 @@ +use std::io::{BufWriter, StdoutLock, Write}; + use bio::io::fastq; pub struct WritableRecord { - record: fastq::Record, - start: usize, - end: usize, + record: String, } impl WritableRecord { @@ -15,31 +15,35 @@ impl WritableRecord { /// * `start` - Start index (inclusive). /// * `end` - End index (exclusive). pub fn new(record: fastq::Record, start: usize, end: usize) -> Self { + let record = WritableRecord::record_to_string(&record, start, end); + WritableRecord { record, - start, - end, } } - - /// Write a record to stdout - pub fn write_record(&self) { + + /// Writes the record to the provided buffer for stdout output. + pub fn write_on_buffer(&self, buf: &mut BufWriter) -> Result { + buf.write(self.record.as_bytes()) + } + + /// Converts a `fastq::record` into a valid FASTQ string within the range `[start..end]`. + fn record_to_string(record: &fastq::Record, start: usize, end: usize) -> String { // Use a single formatted string with one allocation for the header - let header = match self.record.desc() { - Some(d) => format!("@{} {}", self.record.id(), d), - None => format!("@{}", self.record.id()), + let header = match record.desc() { + Some(d) => format!("@{} {}", record.id(), d), + None => format!("@{}", record.id()), }; // Apply the trimming to both sequence and quality data - let seq_slice = &self.record.seq()[self.start..self.end]; - let qual_slice = &self.record.qual()[self.start..self.end]; + let seq_slice = &record.seq()[start..end]; + let qual_slice = &record.qual()[start..end]; - // Use a single print to minimize syscalls - println!( + format!( "{}\n{}\n+\n{}", header, unsafe { std::str::from_utf8_unchecked(seq_slice) }, unsafe { std::str::from_utf8_unchecked(qual_slice) } - ); + ) } } From 83ef52b2681985f7bf91b845bc3d6e94c6e8b726 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 12 Sep 2025 20:58:46 -0300 Subject: [PATCH 03/11] fix: add missing newline at end of generated FASTQ record string --- src/records.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/records.rs b/src/records.rs index b83529b..b47aa98 100644 --- a/src/records.rs +++ b/src/records.rs @@ -40,7 +40,7 @@ impl WritableRecord { let qual_slice = &record.qual()[start..end]; format!( - "{}\n{}\n+\n{}", + "{}\n{}\n+\n{}\n", header, unsafe { std::str::from_utf8_unchecked(seq_slice) }, unsafe { std::str::from_utf8_unchecked(qual_slice) } From 7b4c4a988719f478e50d599f4ad95394367d7b0a Mon Sep 17 00:00:00 2001 From: diego Date: Sat, 20 Sep 2025 17:03:25 -0300 Subject: [PATCH 04/11] Create a dedicated channel per worker to avoid bottlenecks when sending reads - Add crossbeam crate to provide unbounded channels - Use Select struct to manage multiple receivers --- Cargo.lock | 34 +++++++++++++++++++++++- Cargo.toml | 1 + src/main.rs | 76 +++++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 96 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 409a788..8046655 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "adler2" @@ -240,6 +240,7 @@ dependencies = [ "bio", "bzip2", "clap", + "crossbeam", "flate2", "minimap2", "rayon", @@ -310,6 +311,28 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -329,6 +352,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" diff --git a/Cargo.toml b/Cargo.toml index 67ec685..65b0e97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ description = "A tool to filter fastq files" [dependencies] bio = "2.2.0" clap = { version = "4.5.37", features = ["derive"] } +crossbeam = "0.8" rayon = "1.7.0" approx = "0.5.1" minimap2 = "0.1.23+minimap2.2.28" diff --git a/src/main.rs b/src/main.rs index bd7d9b2..003f94b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,13 @@ use bio::io::fastq; use clap::{ Parser, ValueEnum}; +use crossbeam::channel::{Receiver, Select, Sender}; use minimap2::*; use rayon::prelude::*; use std::error::Error; use std::io::{BufWriter, Read, Write}; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::Sender; -use std::sync::{mpsc, Arc}; +use std::sync::{Arc, Mutex}; use records::WritableRecord; use trimmers::*; @@ -196,7 +196,8 @@ where let output_reads_ = Arc::new(AtomicUsize::new(0)); let output_reads_2 = Arc::clone(&output_reads_); - let (sender, receiver): (Sender, _) = mpsc::channel(); + let (senders, receivers) = create_channel_pool(args.threads - 1); + let senders = Arc::new(Mutex::new(senders)); rayon::scope(|s| { s.spawn(move |_| { @@ -204,10 +205,33 @@ where let stdout = std::io::stdout(); let mut writter = BufWriter::new(stdout.lock()); - while let Ok(writable_record) = receiver.recv() { - let _ = writable_record.write_on_buffer(&mut writter); + let mut sel = Select::new(); + for r in &receivers { + sel.recv(&r); + } - read_counter += 1; + let mut current_active_channels = receivers.len(); + + while 0 < current_active_channels { + // Wait until a receive operation becomes ready and try executing it. + let index = sel.ready(); + let res = receivers[index].try_recv(); + + match res { + Ok(writable_record) => { + let _ = writable_record.write_on_buffer(&mut writter); + read_counter += 1; + }, + Err(e) => { + if e.is_empty() { + // No messages available in the channel + continue; + } + // Remove channel because its sender is disconnected + sel.remove(index); + current_active_channels -= 1; + }, + } } writter.flush().unwrap(); @@ -217,7 +241,16 @@ where s.spawn(|_| { fastq::Reader::new(input).records() .par_bridge() - .for_each(|record| { + .for_each_init(|| { + // Return an Option with a Sender that will be used only by one worker + if let Ok(mut senders_guard) = senders.lock() { + senders_guard.pop() + } else { + None + } + }, + + |sender, record| { let record = record.expect("ERROR: problem parsing fastq record"); total_reads_.fetch_add(1, Ordering::Relaxed); @@ -252,16 +285,17 @@ where end, ); - // It's not necessary to handle this, because the receiver remains pending - // until the last record has been processed. - let _ = sender.send(writable_record); + if let Some(ref s) = sender { + // It's not necessary to handle this, because the receiver remains pending + // until the last record has been processed. + let _ = s.send(writable_record); + } else { + // This print is for debugging purposes (should not occur) + eprintln!("Error: failed to send the read for writting"); + } } } }); - - // Drop the sender to signal that no more fastq::Records will be sent - // and allow the receiver to finish processing the last record. - drop(sender); }); }); @@ -271,6 +305,20 @@ where eprintln!("Kept {output_reads} reads out of {total_reads} reads"); } +/// Returns a pair of vectors containing the senders and receivers of `n_channels` +/// unbounded channels. +fn create_channel_pool(n_channels: usize) -> (Vec>, Vec>) { + let mut senders = Vec::with_capacity(n_channels); + let mut receivers = Vec::with_capacity(n_channels); + + for _ in 0..n_channels { + let (tx, rx) = crossbeam::channel::unbounded(); + senders.push(tx); + receivers.push(rx); + } + + (senders, receivers) +} /// This function calculates the average quality of a read, and does this correctly /// First the Phred scores are converted to probabilities using `phred_quality_to_probability` and summed From 3e7ac16f7e968a26a6497d3f86768526f2526f68 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 1 Oct 2025 21:28:39 -0300 Subject: [PATCH 05/11] Separate sequential and parallel execution of the filter function - Introduced `get_valid_segment` to encapsulate record validation logic. - Added `sequential_filter` and `parallel_filter` for more efficient execution. - Refactored `filter` to delegate to the appropriate function. - Fixed typo: renamed variable `writter` to `writer`. --- src/main.rs | 162 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 118 insertions(+), 44 deletions(-) diff --git a/src/main.rs b/src/main.rs index 003f94b..3fd5bed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -191,7 +191,70 @@ where } else { None }; let trimmer_strategy = build_trimming_approach(&args); - + + match args.threads { + 1 => sequential_filter(input, &args, &aligner_option, &trimmer_strategy), + _ => parallel_filter(input, &args, &aligner_option, &trimmer_strategy) + } +} + +/// Applies sequential filtering to the FASTQ records from the given `input`. +/// +/// Each record is validated against filtering criteria such as: +/// - Length +/// - Quality +/// - GC content +/// - Contamination (if an `aligner_option` is provided) +/// +/// If the record passes the filters, an optional trimming strategy can be applied +/// (`trimmer_strategy`). Valid records are then written to the standard output. +fn sequential_filter(input: &mut T, args: &Cli, aligner_option: &Option>>, trimmer_strategy: &Option>) +where + T: Read + std::marker::Send, +{ + let mut total_reads: usize = 0; + let mut output_reads: usize = 0; + + let stdout = std::io::stdout(); + let mut writer = BufWriter::new(stdout.lock()); + + fastq::Reader::new(input).records() + .into_iter() + .for_each(|record| { + let record = record.expect("ERROR: problem parsing fastq record"); + total_reads = total_reads.saturating_add(1); + + if let Some((start, end)) = get_valid_segment(&record, &args, &aligner_option, &trimmer_strategy) { + output_reads = output_reads.saturating_add(1); + + let writable_record = WritableRecord::new( + record, + start, + end, + ); + + let _ = writable_record.write_on_buffer(&mut writer); + } + }); + + writer.flush().unwrap(); + eprintln!("Kept {output_reads} reads out of {total_reads} reads"); +} + +/// Applies parallel filtering to the FASTQ records from the given `input`. +/// +/// Each record is validated against filtering criteria such as: +/// - Length +/// - Quality +/// - GC content +/// - Contamination (if an `aligner_option` is provided) +/// +/// If the record passes the filters, an optional trimming strategy can be applied +/// (`trimmer_strategy`). Valid records are then written to the standard output. +fn parallel_filter(input: &mut T, args: &Cli, aligner_option: &Option>>, trimmer_strategy: &Option>) +where + T: Read + std::marker::Send, +{ let total_reads_ = Arc::new(AtomicUsize::new(0)); let output_reads_ = Arc::new(AtomicUsize::new(0)); let output_reads_2 = Arc::clone(&output_reads_); @@ -203,7 +266,7 @@ where s.spawn(move |_| { let mut read_counter: usize = 0; let stdout = std::io::stdout(); - let mut writter = BufWriter::new(stdout.lock()); + let mut writer = BufWriter::new(stdout.lock()); let mut sel = Select::new(); for r in &receivers { @@ -219,7 +282,7 @@ where match res { Ok(writable_record) => { - let _ = writable_record.write_on_buffer(&mut writter); + let _ = writable_record.write_on_buffer(&mut writer); read_counter += 1; }, Err(e) => { @@ -234,7 +297,7 @@ where } } - writter.flush().unwrap(); + writer.flush().unwrap(); output_reads_2.fetch_add(read_counter, Ordering::Relaxed); }); @@ -253,50 +316,26 @@ where |sender, record| { let record = record.expect("ERROR: problem parsing fastq record"); total_reads_.fetch_add(1, Ordering::Relaxed); - - if record.is_empty() { - return; - } - - let valid_qual = is_valid_quality(&record, args.minqual, args.maxqual); - let valid_len = is_valid_length(&record, args.minlength, args.maxlength); - - // If a GC content filter is set, validate the GC content; otherwise, assume it is valid. - let valid_gc_p = if args.mingc.is_some() || args.maxgc.is_some() { - is_valid_gc_percent(&record, args.mingc.unwrap_or(0.0), args.maxgc.unwrap_or(1.0)) - } else { true }; - - // If a contaminants filter is set, validate the reads; otherwise, assume it is valid - let is_not_contam = if let Some(ref aligner) = aligner_option { - !is_contamination(&record.seq(), aligner) - } else { true }; - - if (valid_gc_p && valid_len && valid_qual && is_not_contam) ^ args.inverse { - let pos_option = if let Some(ref trimmer) = trimmer_strategy { - trimmer.trim(&record) + + if let Some((start, end)) = get_valid_segment(&record, &args, &aligner_option, &trimmer_strategy) { + let writable_record = WritableRecord::new( + record, + start, + end, + ); + + if let Some(ref s) = sender { + // It's not necessary to handle this, because the receiver remains pending + // until the last record has been processed. + let _ = s.send(writable_record); } else { - Some((0, record.seq().len())) - }; - - if let Some((start, end)) = pos_option { - let writable_record = WritableRecord::new( - record, - start, - end, - ); - - if let Some(ref s) = sender { - // It's not necessary to handle this, because the receiver remains pending - // until the last record has been processed. - let _ = s.send(writable_record); - } else { - // This print is for debugging purposes (should not occur) - eprintln!("Error: failed to send the read for writting"); - } + // This case should not happen unless the number of receivers is set to be less than the number of threads. + // (see: https://users.rust-lang.org/t/what-is-the-expected-behavior-of-for-each-init-with-par-bridge-in-rayon/134136/5?u=millarcd) + eprintln!("Error: failed to send the read for writing"); } } }); - }); + }); }); @@ -305,6 +344,41 @@ where eprintln!("Kept {output_reads} reads out of {total_reads} reads"); } +/// Analyzes the quality of a FASTQ record and determines whether it meets the filtering +/// criteria defined by the input parameters. +/// +/// # Returns +/// - `Some((usize, usize))`: The valid segment of the read (start, end indices) if the record passes all filters. +/// - `None`: If the record does not meet the filtering criteria. +fn get_valid_segment(record: &fastq::Record, args: &Cli, aligner_option: &Option>>, trimmer_strategy: &Option>) -> Option<(usize, usize)> { + if record.is_empty() { + return None; + } + + let valid_qual = is_valid_quality(&record, args.minqual, args.maxqual); + let valid_len = is_valid_length(&record, args.minlength, args.maxlength); + + // If a GC content filter is set, validate the GC content; otherwise, assume it is valid. + let valid_gc_p = if args.mingc.is_some() || args.maxgc.is_some() { + is_valid_gc_percent(&record, args.mingc.unwrap_or(0.0), args.maxgc.unwrap_or(1.0)) + } else { true }; + + // If a contaminants filter is set, validate the reads; otherwise, assume it is valid + let is_not_contam = if let Some(ref aligner) = aligner_option { + !is_contamination(&record.seq(), aligner) + } else { true }; + + if (valid_gc_p && valid_len && valid_qual && is_not_contam) ^ args.inverse { + if let Some(ref trimmer) = trimmer_strategy { + trimmer.trim(&record) + } else { + Some((0, record.seq().len())) + } + } else { + None + } +} + /// Returns a pair of vectors containing the senders and receivers of `n_channels` /// unbounded channels. fn create_channel_pool(n_channels: usize) -> (Vec>, Vec>) { From 6e7ba0c213617f58e076dcf16d9f3ed07068c02f Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 1 Oct 2025 21:44:16 -0300 Subject: [PATCH 06/11] Replace `crossbeam` with `crossbeam-channel` to reduce unused dependencies --- Cargo.lock | 24 +----------------------- Cargo.toml | 2 +- src/main.rs | 5 +++-- 3 files changed, 5 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8046655..4657b86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,7 +240,7 @@ dependencies = [ "bio", "bzip2", "clap", - "crossbeam", + "crossbeam-channel", "flate2", "minimap2", "rayon", @@ -311,19 +311,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" -dependencies = [ - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -352,15 +339,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" diff --git a/Cargo.toml b/Cargo.toml index 65b0e97..3e034b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ description = "A tool to filter fastq files" [dependencies] bio = "2.2.0" clap = { version = "4.5.37", features = ["derive"] } -crossbeam = "0.8" +crossbeam-channel = "0.5" rayon = "1.7.0" approx = "0.5.1" minimap2 = "0.1.23+minimap2.2.28" diff --git a/src/main.rs b/src/main.rs index 3fd5bed..1a77790 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use bio::io::fastq; use clap::{ Parser, ValueEnum}; -use crossbeam::channel::{Receiver, Select, Sender}; +use crossbeam_channel::{Receiver, Sender, Select}; use minimap2::*; use rayon::prelude::*; use std::error::Error; @@ -268,6 +268,7 @@ where let stdout = std::io::stdout(); let mut writer = BufWriter::new(stdout.lock()); + let mut sel = Select::new(); for r in &receivers { sel.recv(&r); @@ -386,7 +387,7 @@ fn create_channel_pool(n_channels: usize) -> (Vec>, Vec Date: Sat, 4 Oct 2025 15:32:39 -0300 Subject: [PATCH 07/11] Fix bug in trimming and add check for minimum segment length --- src/main.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 6e15996..41c5e18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -381,7 +381,11 @@ fn get_valid_segment(record: &fastq::Record, args: &Cli, aligner_option: &Option if (valid_gc_p && valid_len && valid_qual && is_not_contam) ^ args.inverse { if let Some(ref trimmer) = trimmer_strategy { - trimmer.trim(&record) + trimmer.trim(&record).into_iter() + // Verify minimum length for each segment + .filter(|&(start, end)| { + args.minlength <= end - start + }).collect() } else { vec![(0, record.seq().len())] } From f91534309cddf2c27a489887b6136a7425e0264a Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 9 Oct 2025 21:02:02 -0300 Subject: [PATCH 08/11] Add unit test for `record_to_string` function - Move `record_to_string` outside `WritableRecord` - Refactor `WritableRecord::write_on_buffer to accept` `BufWriter` --- src/records.rs | 170 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 140 insertions(+), 30 deletions(-) diff --git a/src/records.rs b/src/records.rs index 774ed5c..80c528f 100644 --- a/src/records.rs +++ b/src/records.rs @@ -1,4 +1,4 @@ -use std::io::{BufWriter, StdoutLock, Write}; +use std::io::{BufWriter, Write}; use bio::io::fastq; @@ -15,7 +15,7 @@ impl WritableRecord { /// * `start` - Start index (inclusive). /// * `end` - End index (exclusive). pub fn new(record: &fastq::Record, start: usize, end: usize, total_segments: usize, segment_idx: usize) -> Self { - let record = WritableRecord::record_to_string(&record, start, end, total_segments, segment_idx); + let record = record_to_string(&record, start, end, total_segments, segment_idx); WritableRecord { record, @@ -23,36 +23,146 @@ impl WritableRecord { } /// Writes the record to the provided buffer for stdout output. - pub fn write_on_buffer(&self, buf: &mut BufWriter) -> Result { + pub fn write_on_buffer(&self, buf: &mut BufWriter) -> Result { buf.write(self.record.as_bytes()) } - /// Converts a `fastq::record` into a valid FASTQ string within the range `[start..end]`. - fn record_to_string(record: &fastq::Record, start: usize, end: usize, total_segments: usize, segment_idx: usize) -> String { - // Use a single formatted string with one allocation for the header - let header = if total_segments > 1 { - // Add suffix for multiple segments - match record.desc() { - Some(d) => format!("@{}_segment_{} {}", record.id(), segment_idx + 1, d), - None => format!("@{}_segment_{}", record.id(), segment_idx + 1), - } - } else { - // Single segment, use original header - match record.desc() { - Some(d) => format!("@{} {}", record.id(), d), - None => format!("@{}", record.id()), - } - }; - - // Apply the trimming to both sequence and quality data - let seq_slice = &record.seq()[start..end]; - let qual_slice = &record.qual()[start..end]; - - format!( - "{}\n{}\n+\n{}\n", - header, - unsafe { std::str::from_utf8_unchecked(seq_slice) }, - unsafe { std::str::from_utf8_unchecked(qual_slice) } - ) +} + +/// Converts a `fastq::record` into a valid FASTQ string within the range `[start..end]`. +fn record_to_string(record: &fastq::Record, start: usize, end: usize, total_segments: usize, segment_idx: usize) -> String { + // Use a single formatted string with one allocation for the header + let header = if total_segments > 1 { + // Add suffix for multiple segments + match record.desc() { + Some(d) => format!("@{}_segment_{} {}", record.id(), segment_idx + 1, d), + None => format!("@{}_segment_{}", record.id(), segment_idx + 1), + } + } else { + // Single segment, use original header + match record.desc() { + Some(d) => format!("@{} {}", record.id(), d), + None => format!("@{}", record.id()), + } + }; + + // Apply the trimming to both sequence and quality data + let seq_slice = &record.seq()[start..end]; + let qual_slice = &record.qual()[start..end]; + + format!( + "{}\n{}\n+\n{}\n", + header, + unsafe { std::str::from_utf8_unchecked(seq_slice) }, + unsafe { std::str::from_utf8_unchecked(qual_slice) } + ) +} + +#[cfg(test)] +mod tests { + use bio::io::fastq; + + use crate::records::record_to_string; + + #[test] + fn test_completed_record_to_string() { + let record = fastq::Record::with_attrs( + "10-bases", + None, + b"AAAAAAAAAA", + b"IIIIIIIIII"); + + let start = 0; + let end = 10; + let total_segments = 1; + let segment_idx = 0; + + let expected = String::from( + "@10-bases\nAAAAAAAAAA\n+\nIIIIIIIIII\n"); + + let actual = record_to_string(&record, start, end, total_segments, segment_idx); + + assert_eq!(expected, actual); + } + + #[test] + fn test_record_to_string_one_segment() { + let record = fastq::Record::with_attrs( + "10-bases", + None, + b"TTAAAAAATT", + b"KKIIIIIIKK"); + + let start = 2; + let end = 8; + let total_segments = 1; + let segment_idx = 0; + + let expected = String::from( + "@10-bases\nAAAAAA\n+\nIIIIII\n"); + + let actual = record_to_string(&record, start, end, total_segments, segment_idx); + + assert_eq!(expected, actual); + } + + #[test] + #[should_panic] + fn test_record_to_string_with_no_valid_segment() { + let record = fastq::Record::with_attrs( + "10-bases", + None, + b"TTAAAAAATT", + b"KKIIIIIIKK"); + + let start = 8; + let end = 2; + let total_segments = 1; + let segment_idx = 0; + + + let _ = record_to_string(&record, start, end, total_segments, segment_idx); + } + + #[test] + fn test_record_to_string_multiple_segments() { + let record = fastq::Record::with_attrs( + "10-bases", + None, + b"TTAAAAAATT", + b"KKIIIIIIKK"); + + let start = 2; + let end = 8; + let total_segments = 2; + let segment_idx = 1; + + let expected = String::from( + "@10-bases_segment_2\nAAAAAA\n+\nIIIIII\n"); + + let actual = record_to_string(&record, start, end, total_segments, segment_idx); + + assert_eq!(expected, actual); + } + + #[test] + fn test_record_to_string_multiple_segments_with_desc() { + let record = fastq::Record::with_attrs( + "10-bases", + Some("description"), + b"TTAAAAAATT", + b"KKIIIIIIKK"); + + let start = 2; + let end = 8; + let total_segments = 2; + let segment_idx = 1; + + let expected = String::from( + "@10-bases_segment_2 description\nAAAAAA\n+\nIIIIII\n"); + + let actual = record_to_string(&record, start, end, total_segments, segment_idx); + + assert_eq!(expected, actual); } } From 03810241e53dcce0be635236557a208d9014510c Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 10 Oct 2025 21:32:07 -0300 Subject: [PATCH 09/11] Add `tests` module following Rust's conventions --- src/main.rs | 377 ++++++++++++++++++++++++++-------------------------- 1 file changed, 191 insertions(+), 186 deletions(-) diff --git a/src/main.rs b/src/main.rs index 41c5e18..7a9a179 100644 --- a/src/main.rs +++ b/src/main.rs @@ -488,200 +488,205 @@ fn cal_gc(readseq: &[u8]) -> f64 { (gc_count as f64) / (readseq.len() as f64) } -#[test] -fn test_ave_qual() { - // Original test values need to be adjusted by adding 33 to each value - assert_eq!(ave_qual(&[10+33]), 10.0); - assert!((ave_qual(&[10+33, 11+33, 12+33]) - 10.923583702678473) < 0.00001); - assert!((ave_qual(&[10+33, 11+33, 12+33, 20+33, 30+33, 40+33, 50+33]) - 14.408827647036087) < 0.00001); - assert!( - (ave_qual(&[ - 17+33, 19+33, 11+33, 5+33, 3+33, 19+33, 22+33, 24+33, 20+33, 22+33, 30+33, 31+33, 32+33, 20+33, 21+33, 30+33, 28+33, 10+33, 13+33, 12+33, 18+33, 18+33, - 18+33, 19+33, 24+33, 25+33, 35+33, 33+33, 34+33, 35+33, 34+33, 27+33, 29+33, 25+33, 21+33, 18+33, 19+33, 12+33, 14+33, 15+33, 24+33, 26+33, 24+33, 7+33, - 12+33, 17+33, 17+33, 19+33, 17+33, 8+33, 14+33, 15+33, 13+33, 15+33, 9+33, 3+33, 4+33, 23+33, 23+33, 29+33, 23+33, 10+33, 29+33, 30+33, 31+33, 27+33, 25+33, - 14+33, 2+33, 13+33, 19+33, 14+33, 13+33, 13+33, 3+33, 2+33, 10+33, 17+33, 19+33, 25+33, 27+33, 20+33, 19+33, 11+33, 5+33, 7+33, 8+33, 8+33, 5+33, 2+33, 10+33, - 12+33, 16+33, 18+33, 16+33, 14+33, 12+33, 15+33, 2+33, 3+33, 11+33, 10+33, 15+33, 17+33, 17+33, 16+33, 13+33, 18+33, 26+33, 26+33, 23+33, 25+33, 23+33, - 18+33, 16+33, 33+33, 30+33, 26+33, 26+33, 21+33, 23+33, 8+33, 8+33, 11+33, 11+33, 6+33, 14+33, 19+33, 22+33, 20+33, 20+33, 18+33, 17+33, 20+33, 23+33, - 24+33, 28+33, 28+33, 28+33, 21+33, 20+33, 25+33, 27+33, 37+33, 28+33, 36+33, 29+33, 24+33, 27+33, 16+33, 18+33, 12+33, 8+33, 5+33, 3+33, 4+33, 6+33, 5+33, - 4+33, 4+33, 2+33, 10+33, 12+33, 6+33, 9+33, 9+33, 15+33, 16+33, 11+33, 10+33, 8+33, 8+33, 4+33, 3+33, 5+33, 4+33, 6+33, 15+33, 10+33, 9+33, 8+33, 7+33, 12+33, 4+33, - 5+33, 11+33, 12+33, 17+33, 13+33, 11+33, 17+33, 16+33, 4+33, 4+33, 5+33, 5+33, 12+33, 18+33, 17+33, 21+33 - ]) - 10.017407548271677) - < 0.00001 - ) -} +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ave_qual() { + // Original test values need to be adjusted by adding 33 to each value + assert_eq!(ave_qual(&[10+33]), 10.0); + assert!((ave_qual(&[10+33, 11+33, 12+33]) - 10.923583702678473) < 0.00001); + assert!((ave_qual(&[10+33, 11+33, 12+33, 20+33, 30+33, 40+33, 50+33]) - 14.408827647036087) < 0.00001); + assert!( + (ave_qual(&[ + 17+33, 19+33, 11+33, 5+33, 3+33, 19+33, 22+33, 24+33, 20+33, 22+33, 30+33, 31+33, 32+33, 20+33, 21+33, 30+33, 28+33, 10+33, 13+33, 12+33, 18+33, 18+33, + 18+33, 19+33, 24+33, 25+33, 35+33, 33+33, 34+33, 35+33, 34+33, 27+33, 29+33, 25+33, 21+33, 18+33, 19+33, 12+33, 14+33, 15+33, 24+33, 26+33, 24+33, 7+33, + 12+33, 17+33, 17+33, 19+33, 17+33, 8+33, 14+33, 15+33, 13+33, 15+33, 9+33, 3+33, 4+33, 23+33, 23+33, 29+33, 23+33, 10+33, 29+33, 30+33, 31+33, 27+33, 25+33, + 14+33, 2+33, 13+33, 19+33, 14+33, 13+33, 13+33, 3+33, 2+33, 10+33, 17+33, 19+33, 25+33, 27+33, 20+33, 19+33, 11+33, 5+33, 7+33, 8+33, 8+33, 5+33, 2+33, 10+33, + 12+33, 16+33, 18+33, 16+33, 14+33, 12+33, 15+33, 2+33, 3+33, 11+33, 10+33, 15+33, 17+33, 17+33, 16+33, 13+33, 18+33, 26+33, 26+33, 23+33, 25+33, 23+33, + 18+33, 16+33, 33+33, 30+33, 26+33, 26+33, 21+33, 23+33, 8+33, 8+33, 11+33, 11+33, 6+33, 14+33, 19+33, 22+33, 20+33, 20+33, 18+33, 17+33, 20+33, 23+33, + 24+33, 28+33, 28+33, 28+33, 21+33, 20+33, 25+33, 27+33, 37+33, 28+33, 36+33, 29+33, 24+33, 27+33, 16+33, 18+33, 12+33, 8+33, 5+33, 3+33, 4+33, 6+33, 5+33, + 4+33, 4+33, 2+33, 10+33, 12+33, 6+33, 9+33, 9+33, 15+33, 16+33, 11+33, 10+33, 8+33, 8+33, 4+33, 3+33, 5+33, 4+33, 6+33, 15+33, 10+33, 9+33, 8+33, 7+33, 12+33, 4+33, + 5+33, 11+33, 12+33, 17+33, 13+33, 11+33, 17+33, 16+33, 4+33, 4+33, 5+33, 5+33, 12+33, 18+33, 17+33, 21+33 + ]) - 10.017407548271677) + < 0.00001 + ) + } -#[ignore] -#[test] -fn test_filter() { - filter( - &mut std::fs::File::open("test-data/test.fastq").unwrap(), - Cli { - minlength: 100, - maxlength: 100000, - minqual: 5.0, - maxqual: 200.0, - trim_approach: Some(TrimApproach::FixedCrop), - cutoff: None, - headcrop: 10, - tailcrop: 10, - threads: 1, - contam: None, - inverse: false, - input: None, - mingc: Some(0.0), - maxgc: Some(1.0), - }, - ); -} + #[ignore] + #[test] + fn test_filter() { + filter( + &mut std::fs::File::open("test-data/test.fastq").unwrap(), + Cli { + minlength: 100, + maxlength: 100000, + minqual: 5.0, + maxqual: 200.0, + trim_approach: Some(TrimApproach::FixedCrop), + cutoff: None, + headcrop: 10, + tailcrop: 10, + threads: 1, + contam: None, + inverse: false, + input: None, + mingc: Some(0.0), + maxgc: Some(1.0), + }, + ); + } -#[ignore] -#[test] -fn test_filter_with_trim_by_quality_approach() { - filter( - &mut std::fs::File::open("test-data/test.fastq").unwrap(), - Cli { - minlength: 100, - maxlength: 100000, - minqual: 5.0, - maxqual: 200.0, - trim_approach: Some(TrimApproach::TrimByQuality), - cutoff: Some(10), - headcrop: 0, - tailcrop: 0, - threads: 1, - contam: None, - inverse: false, - input: None, - mingc: Some(0.0), - maxgc: Some(1.0), - }, - ); -} + #[ignore] + #[test] + fn test_filter_with_trim_by_quality_approach() { + filter( + &mut std::fs::File::open("test-data/test.fastq").unwrap(), + Cli { + minlength: 100, + maxlength: 100000, + minqual: 5.0, + maxqual: 200.0, + trim_approach: Some(TrimApproach::TrimByQuality), + cutoff: Some(10), + headcrop: 0, + tailcrop: 0, + threads: 1, + contam: None, + inverse: false, + input: None, + mingc: Some(0.0), + maxgc: Some(1.0), + }, + ); + } -#[ignore] -#[test] -fn test_filter_with_best_read_segment_approach() { - filter( - &mut std::fs::File::open("test-data/test.fastq").unwrap(), - Cli { - minlength: 100, - maxlength: 100000, - minqual: 5.0, - maxqual: 200.0, - trim_approach: Some(TrimApproach::BestReadSegment), - cutoff: Some(10), - headcrop: 0, - tailcrop: 0, - threads: 1, - contam: None, - inverse: false, - input: None, - mingc: Some(0.0), - maxgc: Some(1.0), - }, - ); -} + #[ignore] + #[test] + fn test_filter_with_best_read_segment_approach() { + filter( + &mut std::fs::File::open("test-data/test.fastq").unwrap(), + Cli { + minlength: 100, + maxlength: 100000, + minqual: 5.0, + maxqual: 200.0, + trim_approach: Some(TrimApproach::BestReadSegment), + cutoff: Some(10), + headcrop: 0, + tailcrop: 0, + threads: 1, + contam: None, + inverse: false, + input: None, + mingc: Some(0.0), + maxgc: Some(1.0), + }, + ); + } -#[test] -fn test_contam() { - let t: usize = 8; - let aligner = setup_contamination_filter("test-data/random_contam.fa", &t); - let rec = fastq::Reader::new(std::fs::File::open("test-data/test.fastq").unwrap()) - .records() - .next() - .unwrap() - .unwrap(); - assert!(is_contamination(&rec.seq(), &aligner)); -} + #[test] + fn test_contam() { + let t: usize = 8; + let aligner = setup_contamination_filter("test-data/random_contam.fa", &t); + let rec = fastq::Reader::new(std::fs::File::open("test-data/test.fastq").unwrap()) + .records() + .next() + .unwrap() + .unwrap(); + assert!(is_contamination(&rec.seq(), &aligner)); + } -#[test] -fn test_no_contam() { - let t: usize = 8; - let aligner = setup_contamination_filter("test-data/random_contam.fa", &t); - let rec = fastq::Reader::new(std::fs::File::open("test-data/other-test.fastq").unwrap()) - .records() - .next() - .unwrap() - .unwrap(); - assert!(!is_contamination(&rec.seq(), &aligner)); -} + #[test] + fn test_no_contam() { + let t: usize = 8; + let aligner = setup_contamination_filter("test-data/random_contam.fa", &t); + let rec = fastq::Reader::new(std::fs::File::open("test-data/other-test.fastq").unwrap()) + .records() + .next() + .unwrap() + .unwrap(); + assert!(!is_contamination(&rec.seq(), &aligner)); + } -#[ignore] -#[test] -fn test_filter_with_contam() { - filter( - &mut std::fs::File::open("test-data/test.fastq").unwrap(), - Cli { - minlength: 100, - maxlength: 100000, - minqual: 5.0, - maxqual: 100.0, - trim_approach: Some(TrimApproach::FixedCrop), - cutoff: None, - headcrop: 10, - tailcrop: 10, - threads: 1, - contam: Some("test-data/random_contam.fa".to_owned()), - inverse: false, - input: None, - mingc: Some(0.0), - maxgc: Some(1.0), - }, - ); -} + #[ignore] + #[test] + fn test_filter_with_contam() { + filter( + &mut std::fs::File::open("test-data/test.fastq").unwrap(), + Cli { + minlength: 100, + maxlength: 100000, + minqual: 5.0, + maxqual: 100.0, + trim_approach: Some(TrimApproach::FixedCrop), + cutoff: None, + headcrop: 10, + tailcrop: 10, + threads: 1, + contam: Some("test-data/random_contam.fa".to_owned()), + inverse: false, + input: None, + mingc: Some(0.0), + maxgc: Some(1.0), + }, + ); + } -#[test] -fn test_record_qual_len() { - fastq::Reader::new(std::fs::File::open("test-data/test.fastq").unwrap()) - .records() - .for_each(|record| { - let record = record.unwrap(); - if !record.is_empty() { - let read_len = record.seq().len(); - let quals = record.qual(); - assert_eq!( - read_len, - quals.len(), - "length read doesn't equal length qual" - ); - } - }) -} + #[test] + fn test_record_qual_len() { + fastq::Reader::new(std::fs::File::open("test-data/test.fastq").unwrap()) + .records() + .for_each(|record| { + let record = record.unwrap(); + if !record.is_empty() { + let read_len = record.seq().len(); + let quals = record.qual(); + assert_eq!( + read_len, + quals.len(), + "length read doesn't equal length qual" + ); + } + }) + } -#[test] -fn test_quals() { - let rec = fastq::Reader::new(std::fs::File::open("test-data/test.fastq").unwrap()) - .records() - .next() - .unwrap() - .unwrap(); - let quals = &rec.qual()[0..100] - .iter() - .map(|i| i - 33) - .collect::>(); - assert_eq!( - quals, - &vec![ - 17, 19, 11, 5, 3, 19, 22, 24, 20, 22, 30, 31, 32, 20, 21, 30, 28, 10, 13, 12, 18, 18, - 18, 19, 24, 25, 35, 33, 34, 35, 34, 27, 29, 25, 21, 18, 19, 12, 14, 15, 24, 26, 24, 7, - 12, 17, 17, 19, 17, 8, 14, 15, 13, 15, 9, 3, 4, 23, 23, 29, 23, 10, 29, 30, 31, 27, 25, - 14, 2, 13, 19, 14, 13, 13, 3, 2, 10, 17, 19, 25, 27, 20, 19, 11, 5, 7, 8, 8, 5, 2, 10, - 12, 16, 18, 16, 14, 12, 15, 2, 3 - ], - "quals not as expected!" - ) -} + #[test] + fn test_quals() { + let rec = fastq::Reader::new(std::fs::File::open("test-data/test.fastq").unwrap()) + .records() + .next() + .unwrap() + .unwrap(); + let quals = &rec.qual()[0..100] + .iter() + .map(|i| i - 33) + .collect::>(); + assert_eq!( + quals, + &vec![ + 17, 19, 11, 5, 3, 19, 22, 24, 20, 22, 30, 31, 32, 20, 21, 30, 28, 10, 13, 12, 18, 18, + 18, 19, 24, 25, 35, 33, 34, 35, 34, 27, 29, 25, 21, 18, 19, 12, 14, 15, 24, 26, 24, 7, + 12, 17, 17, 19, 17, 8, 14, 15, 13, 15, 9, 3, 4, 23, 23, 29, 23, 10, 29, 30, 31, 27, 25, + 14, 2, 13, 19, 14, 13, 13, 3, 2, 10, 17, 19, 25, 27, 20, 19, 11, 5, 7, 8, 8, 5, 2, 10, + 12, 16, 18, 16, 14, 12, 15, 2, 3 + ], + "quals not as expected!" + ) + } -#[test] -fn phred_score_to_probability_test() { - let cases: [(u8, f64); 4] = [ - (20, 0.01), // Q20 - (30, 0.001), // Q30 - (15, 0.03162277660168379), // Q15 - (25, 0.0031622776601683794), // Q25 - ]; - - for (phred, prob) in cases { - assert_eq!(phred_score_to_probability(phred), prob); + #[test] + fn phred_score_to_probability_test() { + let cases: [(u8, f64); 4] = [ + (20, 0.01), // Q20 + (30, 0.001), // Q30 + (15, 0.03162277660168379), // Q15 + (25, 0.0031622776601683794), // Q25 + ]; + + for (phred, prob) in cases { + assert_eq!(phred_score_to_probability(phred), prob); + } } } From b96707e472dc8a0a04a7ce7312d0ab5e49602017 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 10 Oct 2025 21:51:16 -0300 Subject: [PATCH 10/11] Add unit tests for `get_valid_segment` function to validate `minlength` before and after trimming --- src/main.rs | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/src/main.rs b/src/main.rs index 7a9a179..6b75157 100644 --- a/src/main.rs +++ b/src/main.rs @@ -492,6 +492,94 @@ fn cal_gc(readseq: &[u8]) -> f64 { mod tests { use super::*; + /// Simple mock trimming strategy that returns predefined segments + struct MockTrimmer { + segments: Vec<(usize, usize)>, + } + + impl MockTrimmer { + fn new(segments: Vec<(usize, usize)>) -> Self { + MockTrimmer { segments } + } + } + + impl TrimStrategy for MockTrimmer { + fn trim(&self, _record: &fastq::Record) -> Vec<(usize, usize)> { + self.segments.clone() + } + } + + fn make_record(seq: &str, qual: &str) -> fastq::Record { + fastq::Record::with_attrs("read1", None, seq.as_bytes(), qual.as_bytes()) + } + + fn default_args() -> Cli { + Cli { + minqual: 0.0, + maxqual: 1000.0, + minlength: 5, + maxlength: usize::MAX, + mingc: None, + maxgc: None, + contam: None, + trim_approach: None, + cutoff: None, + headcrop: 0, + tailcrop: 0, + threads: 1, + input: None, + inverse: false, + } + } + + #[test] + fn test_empty_record_returns_empty_vec() { + let record = fastq::Record::new(); + let args = default_args(); + + let result = get_valid_segment(&record, &args, &None, &None); + assert!(result.is_empty(), "Expected empty result for empty record"); + } + + #[test] + fn test_too_short_record_filtered_before_trimming() { + let record = make_record("ATGC", "IIII"); // length 4 + let mut args = default_args(); + args.minlength = 5; + + let result = get_valid_segment(&record, &args, &None, &None); + assert!(result.is_empty(), "Expected record to be filtered out due to minlength"); + } + + #[test] + fn test_valid_before_and_after_trimming() { + let record = make_record("ATGCGTACGA", "IIIIIIIIII"); + let mut args = default_args(); + args.minlength = 5; + + let trimmer = MockTrimmer::new(vec![(0, 10)]); + let trimmer = Some(Arc::new(trimmer) as Arc); + + let result = get_valid_segment(&record, &args, &None, &trimmer); + assert_eq!(result, vec![(0, 10)], "Record should remain valid after trimming"); + } + + #[test] + fn test_multiple_segments_only_some_valid() { + let record = make_record("ATGCGTACGA", "IIIIIIIIII"); + let mut args = default_args(); + args.minlength = 4; + + // Return three segments, the second one is too short + let trimmer = MockTrimmer::new(vec![(0, 3), (3, 5), (5, 10)]); + let trimmer = Some(Arc::new(trimmer) as Arc); + + let result = get_valid_segment(&record, &args, &None, &trimmer); + + // Only (5,10) has length >=4 + assert_eq!(result, vec![(5, 10)]); + } + #[test] fn test_ave_qual() { // Original test values need to be adjusted by adding 33 to each value From 0a126eb0bf1709a1adb45d47bf2010ac3c17a3bf Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 14 Oct 2025 00:05:49 -0300 Subject: [PATCH 11/11] Rename function `get_valid_segment` to `get_valid_segments` to reflect that it returns multiple segments --- src/main.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6b75157..749e5e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -238,7 +238,7 @@ where total_reads = total_reads.saturating_add(1); - let valid_segments = get_valid_segment(&record, &args, &aligner_option, &trimmer_strategy); + let valid_segments = get_valid_segments(&record, &args, &aligner_option, &trimmer_strategy); valid_segments.iter().enumerate() .map(|(i, (start, end))| WritableRecord::new(&record, *start, *end, valid_segments.len(),i)) .for_each(|writable_record| { @@ -331,7 +331,7 @@ where total_reads_.fetch_add(1, Ordering::Relaxed); - let valid_segments = get_valid_segment(&record, &args, &aligner_option, &trimmer_strategy); + let valid_segments = get_valid_segments(&record, &args, &aligner_option, &trimmer_strategy); let valid_segments = valid_segments.iter().enumerate() .map(|(i, (start, end))| WritableRecord::new(&record, *start, *end, valid_segments.len(),i)).collect(); @@ -361,7 +361,7 @@ where /// # Returns /// - `Vec<(usize, usize)>`: A vector containing the valid segments of the read (start and end indices) /// if the record passes all filters. -fn get_valid_segment(record: &fastq::Record, args: &Cli, aligner_option: &Option>>, trimmer_strategy: &Option>) -> Vec<(usize, usize)> { +fn get_valid_segments(record: &fastq::Record, args: &Cli, aligner_option: &Option>>, trimmer_strategy: &Option>) -> Vec<(usize, usize)> { if record.is_empty() { return vec![]; } @@ -537,7 +537,7 @@ mod tests { let record = fastq::Record::new(); let args = default_args(); - let result = get_valid_segment(&record, &args, &None, &None); + let result = get_valid_segments(&record, &args, &None, &None); assert!(result.is_empty(), "Expected empty result for empty record"); } @@ -547,7 +547,7 @@ mod tests { let mut args = default_args(); args.minlength = 5; - let result = get_valid_segment(&record, &args, &None, &None); + let result = get_valid_segments(&record, &args, &None, &None); assert!(result.is_empty(), "Expected record to be filtered out due to minlength"); } @@ -560,7 +560,7 @@ mod tests { let trimmer = MockTrimmer::new(vec![(0, 10)]); let trimmer = Some(Arc::new(trimmer) as Arc); - let result = get_valid_segment(&record, &args, &None, &trimmer); + let result = get_valid_segments(&record, &args, &None, &trimmer); assert_eq!(result, vec![(0, 10)], "Record should remain valid after trimming"); } @@ -574,7 +574,7 @@ mod tests { let trimmer = MockTrimmer::new(vec![(0, 3), (3, 5), (5, 10)]); let trimmer = Some(Arc::new(trimmer) as Arc); - let result = get_valid_segment(&record, &args, &None, &trimmer); + let result = get_valid_segments(&record, &args, &None, &trimmer); // Only (5,10) has length >=4 assert_eq!(result, vec![(5, 10)]);