diff --git a/aws-params.yaml b/aws-params.yaml
index ba0ae3e..5c2aeea 100644
--- a/aws-params.yaml
+++ b/aws-params.yaml
@@ -1,8 +1,9 @@
-readsdir: "s3://vkc-nextflow/rawfastq/"
+readsdir: "s3://vkc-nextflow/scratch/"
 outdir: "s3://vkc-nextflow/output/"
 human_genome: "s3://biobakery-databases/kneaddata_databases/"
-metaphlan_db: "s3://biobakery-databases/metaphlan_databases/"
+metaphlan_db: "s3://biobakery-databases/metaphlan_v4_databases/"
 humann_bowtie_db: "s3://biobakery-databases/humann_databases/chocophlan"
 humann_protein_db: "s3://biobakery-databases/humann_databases/uniref"
 humann_utility_db: "s3://biobakery-databases/humann_databases/utility_mapping"
-filepattern: "*_L00{1,2,3,4}_R{1,2}_001.fastq.gz"
\ No newline at end of file
+filepattern: "*_{1,2}.fastq.gz"
+sra_list: "s3://vkc-nextflow/sra_accessions.txt"
\ No newline at end of file
diff --git a/main.nf b/main.nf
index e99896d..32fdf79 100755
--- a/main.nf
+++ b/main.nf
@@ -14,14 +14,8 @@ workflow {
 
     human_genome = params.human_genome
     metaphlan_db = params.metaphlan_db
-    humann_bowtie_db = params.humann_bowtie_db
-    humann_protein_db = params.humann_protein_db
-    humann_utility_db = params.humann_utility_db
 
     knead_out = kneaddata(read_pairs_ch, human_genome)
-    metaphlan_out = metaphlan(knead_out[0], knead_out[1], metaphlan_db)
+    metaphlan_out = metaphlan(knead_out[0], metaphlan_db)
     metaphlan_bzip = metaphlan_bzip(metaphlan_out[0], metaphlan_out[4])
-    humann_out = humann(metaphlan_out[0], metaphlan_out[1], metaphlan_out[2], humann_bowtie_db, humann_protein_db)
-    regroup_out = humann_regroup(humann_out[0], humann_out[1], humann_utility_db)
-    humann_rename(regroup_out, humann_utility_db)
 }
diff --git a/metaphlan_only.nf b/metaphlan_only.nf
new file mode 100644
index 0000000..e4f02fc
--- /dev/null
+++ b/metaphlan_only.nf
@@ -0,0 +1,21 @@
+#!/usr/bin/env nextflow
+nextflow.enable.dsl=2
+
+include { metaphlan; metaphlan_bzip } from './processes/metaphlan.nf'
+
+workflow {
+
+    // Group input files under params.readsdir that match either the configured
+    // file pattern or *_kneaddata.fastq.gz; size:-1 lets a group hold any
+    // number of files instead of the fromFilePairs default of two.
+    read_pairs_ch = Channel
+        .fromFilePairs(
+            [ "$params.readsdir/$params.filepattern",
+              "$params.readsdir/*_kneaddata.fastq.gz" ],
+            size:-1)
+
+    metaphlan_db = params.metaphlan_db
+
+    metaphlan_out = metaphlan(read_pairs_ch, metaphlan_db)
+    metaphlan_bzip = metaphlan_bzip(metaphlan_out[0], metaphlan_out[4])
+}
diff --git a/nextflow.config b/nextflow.config
index 0097e86..25c1d71 100644
--- a/nextflow.config
+++ b/nextflow.config
@@ -46,6 +46,15 @@ profiles {
 
         process {
             executor = 'awsbatch'
+
+            withName: prefetch_and_split {
+                maxForks = 1
+                memory = '8.G'
+                time = '2.h'
+                cpus = 2
+                container = 'public.ecr.aws/j5i5h1i5/bzip2sra:mamba-v1.0'
+                queue = 'Nextflow-IOPS'
+            }
 
             withName: kneaddata {
                 maxForks = 4
@@ -59,10 +68,10 @@ profiles {
 
             withName: metaphlan {
                 maxForks = 4
-                memory = '8.G'
+                memory = '32.G'
                 time = '12.h'
                 cpus = 8
-                container = 'public.ecr.aws/j5i5h1i5/metaphlan-nodb:mamba-v3.1'
+                container = 'public.ecr.aws/j5i5h1i5/metaphlan-nodb:mamba-v4.1'
                 queue = 'Nextflow-metaphlan'
             }
 
diff --git a/processes/metaphlan.nf b/processes/metaphlan.nf
index e081e44..47ba0b7 100644
--- a/processes/metaphlan.nf
+++ b/processes/metaphlan.nf
@@ -3,9 +3,8 @@ process metaphlan {
     publishDir "$params.outdir/metaphlan", pattern: "{*.tsv}"
 
     input:
-    tuple val(sample), path(kneads)
-    path unmatched
-    path metaphlan_db
+        tuple val(sample), path(kneads)
+        path metaphlan_db
 
     output:
     val sample , emit: sample
@@ -15,13 +14,9 @@ process metaphlan {
     path "${sample}.sam"
 
     script:
-    def forward = kneads[0]
-    def reverse = kneads[1]
-    def unf = unmatched[0]
-    def unr = unmatched[1]
-    
+
     """
-    cat $forward $reverse $unf $unr > ${sample}_grouped.fastq.gz
+    cat $kneads > ${sample}_grouped.fastq.gz
 
     metaphlan ${sample}_grouped.fastq.gz ${sample}_profile.tsv \
     --bowtie2out ${sample}_bowtie2.tsv \
@@ -49,4 +44,4 @@ process metaphlan {
     """
     bzip2 -v $sam
     """
-}
\ No newline at end of file
+}
diff --git a/processes/prefetch_and_split.nf b/processes/prefetch_and_split.nf
new file mode 100644
index 0000000..6a4bd06
--- /dev/null
+++ b/processes/prefetch_and_split.nf
@@ -0,0 +1,29 @@
+// Downloads one SRA accession with prefetch, converts it to FASTQ with
+// fasterq-dump, gzips the _1/_2 read files and publishes them to params.readsdir.
+process prefetch_and_split {
+    tag "$sra_id"
+    // Directives must be declared before the input/output blocks, so
+    // publishDir lives here rather than after `output:`.
+    publishDir "${params.readsdir}", mode: 'copy'
+
+    input:
+    val sra_id
+
+    output:
+    tuple val(sra_id), path("${sra_id}_*.fastq.gz")
+
+    script:
+    // NOTE(review): the gzip calls assume fasterq-dump produced both _1 and _2
+    // files (paired-end accessions) - confirm for the accession list in use.
+    """
+    echo "Prefetching $sra_id to S3"
+    prefetch $sra_id -v -v
+
+    echo "Splitting $sra_id into FASTQ files directly on S3"
+    fasterq-dump $sra_id --split-files --threads ${task.cpus} -v -v
+
+    echo "Compressing FASTQ files"
+    gzip ${sra_id}_1.fastq
+    gzip ${sra_id}_2.fastq
+    """
+}
diff --git a/tubular.nf b/tubular.nf
new file mode 100644
index 0000000..631af1a
--- /dev/null
+++ b/tubular.nf
@@ -0,0 +1,30 @@
+#!/usr/bin/env nextflow
+
+nextflow.enable.dsl=2
+
+include { prefetch_and_split } from './processes/prefetch_and_split.nf'
+include { kneaddata } from './processes/kneaddata.nf'
+include { metaphlan; metaphlan_bzip } from './processes/metaphlan.nf'
+include { humann; humann_regroup; humann_rename } from './processes/humann.nf'
+
+workflow {
+
+    // Step 1: Read the SRA accession list and create a channel of accession IDs.
+    // Lines that are empty after trimming are dropped so that no task is
+    // launched with an empty accession.
+    sra_ch = Channel.fromPath(params.sra_list)
+        .splitText()
+        .map { it.trim() }
+        .filter { it }
+
+    // Step 2: Prefetch and split FASTQ files
+    fastq_pairs_ch = prefetch_and_split(sra_ch)
+
+    human_genome = params.human_genome
+    metaphlan_db = params.metaphlan_db
+
+    // Step 3: kneaddata on the downloaded reads, metaphlan on its first output, then metaphlan_bzip
+    knead_out = kneaddata(fastq_pairs_ch, human_genome)
+    metaphlan_out = metaphlan(knead_out[0], metaphlan_db)
+    metaphlan_bzip = metaphlan_bzip(metaphlan_out[0], metaphlan_out[4])
+}