Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions singlem/lyrebird.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ def main():
sequences = args.forward,
reverse_read_files = args.reverse,
input_sra_files = args.sra_files,
read_chunk_size = args.read_chunk_size,
read_chunk_number = args.read_chunk_number,
otu_table = args.otu_table,
archive_otu_table = args.archive_otu_table,
sleep_after_mkfifo = args.sleep_after_mkfifo,
Expand Down
4 changes: 1 addition & 3 deletions singlem/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def add_less_common_pipe_arguments(argument_group, extra_args=False):
argument_group.add_argument('--read-chunk-size',
type=int,
metavar='num_reads',
help='Size chunk to process at a time. Requires unwrapped sequence input. If input is FASTA, chunk size is the number of reads. If the input is FASTQ, chunk size is half the specified number of reads. Chunk size must be divisible by 2 to ensure compatibility with FASTQ input. Requires --read-chunk-number.')
help='Number of reads per chunk. Requires unwrapped sequence input. Both FASTA and FASTQ inputs are supported; the chunk size is the number of reads in each chunk. Requires --read-chunk-number.')
argument_group.add_argument('--read-chunk-number',
type=int,
metavar='chunk_number',
Expand Down Expand Up @@ -251,8 +251,6 @@ def validate_pipe_args(args, subparser='pipe'):
raise Exception("Can't use --read-chunk-size with more than one --sra-file")
if args.read_chunk_size and args.genome_fasta_files:
raise Exception("Can't use --read-chunk-size with input genomes currently")
if args.read_chunk_size and args.read_chunk_size % 2 != 0:
raise Exception("--read-chunk-size must be divisible by 2 to ensure compatibility with FASTQ input")

def add_condense_arguments(parser):
input_condense_arguments = parser.add_argument_group("Input arguments (1+ required)")
Expand Down
19 changes: 13 additions & 6 deletions singlem/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ def prepare_zstd_fifos(file_paths, temp_dir, sleep_after_mkfifo=None):
prepared_paths.append(fifo_path)
return prepared_paths, processes

def add_chunking_pipe(read_chunk_size, read_chunk_number):
# Pipe to extract a chunk of reads. We assume fasta format here i.e.
# 2 lines per read
start_offset = (read_chunk_size * (read_chunk_number - 1)) * 2 + 1
head = read_chunk_size * 2
def add_chunking_pipe(read_chunk_size, read_chunk_number, lines_per_read=2):
    """Build a shell pipe fragment that extracts one chunk of reads.

    Chunks are selected with tail/head: skip all lines belonging to the
    preceding (read_chunk_number - 1) chunks, then keep exactly one chunk's
    worth of lines. lines_per_read is 2 for unwrapped FASTA and 4 for FASTQ.
    Returns the fragment (with a leading " | ") to append to a shell command.
    """
    lines_to_skip = (read_chunk_number - 1) * read_chunk_size * lines_per_read
    lines_to_keep = read_chunk_size * lines_per_read
    # tail -n +K starts output at line K (1-based), hence the +1.
    logging.debug(
        f"Chunking pipe: tail -n +{lines_to_skip + 1} | head -n {lines_to_keep} "
        f"(chunk_size={read_chunk_size}, chunk_num={read_chunk_number}, lines_per_read={lines_per_read})")
    return f" | tail -n +{lines_to_skip + 1} | head -n {lines_to_keep}"

def prepare_chunking_fifos(file_paths, temp_dir, read_chunk_size, read_chunk_number, sleep_after_mkfifo=None):
Expand All @@ -104,6 +104,13 @@ def prepare_chunking_fifos(file_paths, temp_dir, read_chunk_size, read_chunk_num
base = os.path.basename(path)
if base.endswith('.gz'):
base = base[:-3]
if base.endswith(('.fq', '.fastq')):
Comment on lines 105 to +107
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Support zstd filenames when inferring chunk input format

prepare_chunking_fifos now strips only .gz before checking FASTA/FASTQ suffixes, so non-.gz compressed names fall through to the new exception path. In SearchPipe.run, chunking is applied after prepare_zstd_fifos, which renames zstd inputs to FIFOs like *.zst.fifo; with --read-chunk-size/--read-chunk-number, these files no longer match the FASTA/FASTQ suffix checks and the pipeline aborts instead of chunking reads. This introduces a regression for chunked zstd input workflows.

Useful? React with 👍 / 👎.

lines_per_read = 4
elif base.endswith(('.fa', '.fasta', '.fna')):
lines_per_read = 2
else:
raise Exception(f"Cannot determine format (FASTA or FASTQ) for chunking from file extension: {path}")
logging.debug(f"Detected {'FASTQ' if lines_per_read == 4 else 'FASTA'} format for {path} ({lines_per_read} lines/read)")
chunking_dir = os.path.join(temp_dir, "chunking")
os.makedirs(chunking_dir, exist_ok=True)
fifo_path = os.path.join(chunking_dir, base)
Expand All @@ -123,7 +130,7 @@ def prepare_chunking_fifos(file_paths, temp_dir, read_chunk_size, read_chunk_num
read_cmd = f"gzip -dc {shlex.quote(path)}"
else:
read_cmd = f"cat {shlex.quote(path)}"
cmd = f"{read_cmd} {add_chunking_pipe(read_chunk_size, read_chunk_number)} > {shlex.quote(fifo_path)}"
cmd = f"{read_cmd} {add_chunking_pipe(read_chunk_size, read_chunk_number, lines_per_read)} > {shlex.quote(fifo_path)}"
logging.debug("Running chunking command: {}".format(cmd))
process = subprocess.Popen(
['bash','-c',cmd],
Expand Down
Loading