diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0cdbc6cd..4fcc2ec8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,6 +35,7 @@ jobs: - "singularity" test_name: - "test" + - "test_full" isMaster: - ${{ github.base_ref == 'main' }} # Exclude conda and singularity on dev @@ -82,4 +83,7 @@ jobs: - name: "Run pipeline with test data ${{ matrix.NXF_VER }} | ${{ matrix.test_name }} | ${{ matrix.profile }}" run: | - nextflow run ${GITHUB_WORKSPACE} -profile ${{ matrix.test_name }},${{ matrix.profile }} --outdir ./results + nextflow run ${GITHUB_WORKSPACE} \ + -profile ${{ matrix.test_name }},${{ matrix.profile }} \ + -stub \ + --outdir ./results diff --git a/assets/test/fetchngs_samplesheet.csv b/assets/test/fetchngs_samplesheet.csv new file mode 100644 index 00000000..a58de406 --- /dev/null +++ b/assets/test/fetchngs_samplesheet.csv @@ -0,0 +1,2 @@ +sample,run_accession,instrument_platform,instrument_model,library_strategy,library_layout,fastq_1,fastq_2 +test,SRR123456,ILLUMINA,HiSeq2500,WGS,PAIRED,assets/test/reads_1.fastq.gz,assets/test/reads_2.fastq.gz diff --git a/assets/test/reads_1.fastq.gz b/assets/test/reads_1.fastq.gz new file mode 100644 index 00000000..e69de29b diff --git a/assets/test/reads_2.fastq.gz b/assets/test/reads_2.fastq.gz new file mode 100644 index 00000000..e69de29b diff --git a/conf/test_full.config b/conf/test_full.config index a86e0050..98af2569 100644 --- a/conf/test_full.config +++ b/conf/test_full.config @@ -14,19 +14,22 @@ params { config_profile_name = 'Full test profile' config_profile_description = 'Full test dataset to check pipeline function' - // Input data for full size test + // Input data for full size tests // Specify the paths to your full test data // Give any required params for the test so that command line flags are not needed - input = "${projectDir}/assets/test_full/full_samplesheet.csv" - // Fasta references - fasta = 
"https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/assembly/release/gfLaeSulp1.1/insdc/GCA_927399515.1.fasta.gz" - accession = "GCA_927399515.1" - taxon = "Laetiporus sulphureus" + input = "${projectDir}/assets/test/fetchngs_samplesheet.csv" + fetchngs_samplesheet = true + align = true + mask = true + + // Fasta references (same as your minimal test, but we exercise BUSCO download) + fasta = "https://tolit.cog.sanger.ac.uk/test-data/Meles_meles/assembly/release/mMelMel3.1_paternal_haplotype/GCA_922984935.2.subset.phiXspike.fasta.gz" + accession = "GCA_922984935.2" + taxon = "Meles meles" // Databases - taxdump = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/new_taxdump/new_taxdump.tar.gz" - busco = "/lustre/scratch123/tol/resources/busco/latest" + taxdump = "https://tolit.cog.sanger.ac.uk/test-data/resources/new_taxdump.tar.gz" blastp = "https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/resources/gfLaeSulp1.1.buscogenes.dmnd.tar.gz" blastx = "https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/resources/gfLaeSulp1.1.buscoregions.dmnd.tar.gz" blastn = "https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/resources/nt_gfLaeSulp1.1.tar.gz" diff --git a/dummy.fa b/dummy.fa new file mode 100644 index 00000000..e69de29b diff --git a/subworkflows/local/input_check.nf b/subworkflows/local/input_check.nf index 32507e01..8bcc5221 100644 --- a/subworkflows/local/input_check.nf +++ b/subworkflows/local/input_check.nf @@ -13,12 +13,12 @@ include { JSONIFY_TAXDUMP } from '../../modules/local/jsonify_taxdump' workflow INPUT_CHECK { take: - samplesheet // channel: /path/to/samplesheet - fasta // channel: [ meta, path(fasta) ] - taxon // channel: val(taxon) - busco_lin // channel: val([busco_lin]) - lineage_tax_ids // channel: /path/to/lineage_tax_ids - databases + samplesheet // channel: /path/to/samplesheet + fasta // channel: [ meta, path(fasta) ] + taxon // channel: val(taxon) + busco_lin // channel: val([busco_lin]) + lineage_tax_ids 
// channel: /path/to/lineage_tax_ids + databases main: ch_versions = Channel.empty() @@ -31,7 +31,7 @@ workflow INPUT_CHECK { ch_dbs_for_untar = databases .branch { db_meta, db_path -> untar: db_path.name.endsWith( ".tar.gz" ) - skip: true + skip: ! db_path.name.endsWith(".tar.gz") } // Untar the databases @@ -66,21 +66,30 @@ workflow INPUT_CHECK { .fromList(samplesheetToList(samplesheet, "assets/schema_fetchngs_input.json")) .map {it[0]} .branch { row -> - paired: row.fastq_2 - // Reformat for CAT_CAT - [[id: row.run_accession, row:row], [row.fastq_1, row.fastq_2]] - not_paired: true + paired: row.fastq_2 != null + not_paired: row.fastq_2 == null } .set { reads_pairedness } - CAT_CAT ( reads_pairedness.paired ) - ch_versions = ch_versions.mix ( CAT_CAT.out.versions.first() ) + reads_pairedness.paired + .map { row -> + def meta = [ id: row.run_accession, row: row ] + def files = [ row.fastq_1, row.fastq_2 ] + [ meta, files ] + } + .set { reads_for_cat_cat } + + CAT_CAT(reads_for_cat_cat) + ch_versions = ch_versions.mix( CAT_CAT.out.versions.first() ) CAT_CAT.out.file_out - | map { meta, file -> meta.row + [fastq_1: file] } - | mix ( reads_pairedness.not_paired ) - | map { create_data_channels_from_fetchngs(it) } - | set { read_files } + .map { meta, file -> + // reconstruct a “row” map with the new fastq_1 path + meta.row + [ fastq_1: file ] + } + .mix( reads_pairedness.not_paired ) + .map { row -> create_data_channels_from_fetchngs(row) } + .set { read_files } } else { Channel @@ -114,7 +123,7 @@ workflow INPUT_CHECK { ch_databases.blastn, lineage_tax_ids, reads.collect(flat: false).ifEmpty([]), - db_paths.collect(flat: false), + db_paths.collect(flat: false) ) ch_versions = ch_versions.mix(GENERATE_CONFIG.out.versions.first()) @@ -123,32 +132,25 @@ workflow INPUT_CHECK { // Parse the CSV file // GENERATE_CONFIG.out.csv - | map { meta, csv -> csv } - | splitCsv(header: ['key', 'value']) - | branch { - taxon_id: it.key == "taxon_id" - return it.value - 
busco_lineage: it.key == "busco_lineage" - return it.value - } - | set { ch_parsed_csv } - + .map { meta, csv -> csv } + .splitCsv(header: ['key','value']) + .branch { rec -> // branch *only* on boolean tests + taxon_id: rec.key == 'taxon_id' + busco_lineage: rec.key == 'busco_lineage' + } + .set { ch_parsed_csv } - // - // Get the taxon ID if we do taxon filtering in blast* searches - // + // 1. taxon_id → single value (apply skip_taxon_filtering if set) ch_parsed_csv.taxon_id - | map { params.skip_taxon_filtering ? '' : it } - | first - | set { ch_taxon_id } - - - // - // Get the BUSCO linages - // + .map { rec -> params.skip_taxon_filtering ? '' : rec.value } + .first() + .set { ch_taxon_id } + // 2. busco_lineage → potentially multiple lineages ch_parsed_csv.busco_lineage - | collect - | set { ch_busco_lineages } + .map { rec -> rec.value } + .collect() + .set { ch_busco_lineages } + // Format pre-computed BUSCOs (if provided) // Parse the BUSCO output directories @@ -161,6 +163,7 @@ workflow INPUT_CHECK { } } + // Remove any invalid lineages from precomputed_busco ch_busco_lineages_list = ch_busco_lineages.flatten() ch_parsed_busco_filtered = ch_parsed_busco @@ -169,54 +172,55 @@ workflow INPUT_CHECK { } ch_parsed_busco_filtered = ch_parsed_busco_filtered.ifEmpty { Channel.value([]) } + // // Get the BUSCO path if set // ch_databases.busco - | map { _, db_path -> db_path } - | ifEmpty( [] ) - | set { ch_busco_db } + .map { _, db_path -> db_path } + .ifEmpty( [] ) + .set { ch_busco_db } - // // Convert the taxdump to a JSON file if there isn't one yet - // ch_databases.taxdump - | filter { meta, db_path -> ! 
db_path.isFile() } - | map { meta, db_path -> [meta, db_path, db_path.listFiles().find { it.getName().endsWith('.json') }] } - | branch { meta, db_path, json_path -> - json: json_path - return [meta, json_path] - dir: true - return [meta, db_path] - } - | set { taxdump_dirs } - - JSONIFY_TAXDUMP( taxdump_dirs.dir ) - ch_versions = ch_versions.mix(JSONIFY_TAXDUMP.out.versions.first()) + .filter { meta, db -> ! db.isFile() } + .map { meta, db -> + def existing = db.listFiles().find { it.name.endsWith('.json') } + [ meta, db, existing ] + } + .branch { meta, db, existing -> + needsJson: existing == null + hasJson : existing != null + } + .set { taxdump_branched } - ch_databases.taxdump - | filter { meta, db_path -> db_path.isFile() } - | mix ( taxdump_dirs.json ) - | mix( JSONIFY_TAXDUMP.out.json ) - | map { _, db_path -> db_path } - | set { ch_taxdump } + taxdump_needsJson = taxdump_branched.needsJson + taxdump_hasJson = taxdump_branched.hasJson + // 1) Generate new JSON for those that need it + JSONIFY_TAXDUMP( taxdump_needsJson.map { meta, db, _ -> [meta, db] } ) + ch_versions = ch_versions.mix( JSONIFY_TAXDUMP.out.versions.first() ) + // 2) Merge the newly‐made JSON with the ones we already had + taxdump_hasJson.map { meta, db, existing -> existing } + .mix( JSONIFY_TAXDUMP.out.json.map { meta, json -> json } ) + .mix( ch_databases.taxdump.filter { meta, db -> db.isFile() }.map { meta, db -> db } ) + .set { ch_taxdump } emit: - reads // channel: [ val(meta), path(datafile) ] - config = GENERATE_CONFIG.out.yaml // channel: [ val(meta), path(yaml) ] - synonyms_tsv = GENERATE_CONFIG.out.synonyms_tsv // channel: [ val(meta), path(tsv) ] - categories_tsv = GENERATE_CONFIG.out.categories_tsv // channel: [ val(meta), path(tsv) ] - taxon_id = ch_taxon_id // channel: val(taxon_id) - busco_lineages = ch_busco_lineages // channel: val([busco_lin]) - blastn = ch_databases.blastn.first() // channel: [ val(meta), path(blastn_db) ] - blastp = ch_databases.blastp.first() // 
channel: [ val(meta), path(blastp_db) ] - blastx = ch_databases.blastx.first() // channel: [ val(meta), path(blastx_db) ] - precomputed_busco = ch_parsed_busco // channel: [ val(meta), path(busco_run_dir) ] - busco_db = ch_busco_db.first() // channel: [ path(busco_db) ] - taxdump = ch_taxdump.first() // channel: [ path(taxdump) ] - versions = ch_versions // channel: [ versions.yml ] + reads // channel: [ val(meta), path(datafile) ] + config = GENERATE_CONFIG.out.yaml // channel: [ val(meta), path(yaml) ] + synonyms_tsv = GENERATE_CONFIG.out.synonyms_tsv // channel: [ val(meta), path(tsv) ] + categories_tsv = GENERATE_CONFIG.out.categories_tsv // channel: [ val(meta), path(tsv) ] + taxon_id = ch_taxon_id // channel: val(taxon_id) + busco_lineages = ch_busco_lineages // channel: val([busco_lin]) + blastn = ch_databases.blastn.first() // channel: [ val(meta), path(blastn_db) ] + blastp = ch_databases.blastp.first() // channel: [ val(meta), path(blastp_db) ] + blastx = ch_databases.blastx.first() // channel: [ val(meta), path(blastx_db) ] + precomputed_busco = ch_parsed_busco // channel: [ val(meta), path(busco_run_dir) ] + busco_db = ch_busco_db.first() // channel: [ path(busco_db) ] + taxdump = ch_taxdump.first() // channel: [ path(taxdump) ] + versions = ch_versions // channel: [ versions.yml ] } // Function to get list of [ meta, datafile ]