Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
- "singularity"
test_name:
- "test"
- "test_full"
isMaster:
- ${{ github.base_ref == 'main' }}
# Exclude conda and singularity on dev
Expand Down Expand Up @@ -82,4 +83,7 @@ jobs:

- name: "Run pipeline with test data ${{ matrix.NXF_VER }} | ${{ matrix.test_name }} | ${{ matrix.profile }}"
run: |
nextflow run ${GITHUB_WORKSPACE} -profile ${{ matrix.test_name }},${{ matrix.profile }} --outdir ./results
nextflow run ${GITHUB_WORKSPACE} \
-profile ${{ matrix.test_name }},${{ matrix.profile }} \
-stub \
--outdir ./results
2 changes: 2 additions & 0 deletions assets/test/fetchngs_samplesheet.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
sample,run_accession,instrument_platform,instrument_model,library_strategy,library_layout,fastq_1,fastq_2
test,SRR123456,ILLUMINA,HiSeq2500,WGS,PAIRED,assets/test/reads_1.fastq.gz,assets/test/reads_2.fastq.gz
Empty file added assets/test/reads_1.fastq.gz
Empty file.
Empty file added assets/test/reads_2.fastq.gz
Empty file.
19 changes: 11 additions & 8 deletions conf/test_full.config
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ params {
config_profile_name = 'Full test profile'
config_profile_description = 'Full test dataset to check pipeline function'

// Input data for full size test
// Input data for full size tests
// Specify the paths to your full test data
// Give any required params for the test so that command line flags are not needed
input = "${projectDir}/assets/test_full/full_samplesheet.csv"

// Fasta references
fasta = "https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/assembly/release/gfLaeSulp1.1/insdc/GCA_927399515.1.fasta.gz"
accession = "GCA_927399515.1"
taxon = "Laetiporus sulphureus"
input = "${projectDir}/assets/test/fetchngs_samplesheet.csv"
fetchngs_samplesheet = true
align = true
mask = true

// Fasta references (same as the minimal test, but this profile also exercises the BUSCO download)
fasta = "https://tolit.cog.sanger.ac.uk/test-data/Meles_meles/assembly/release/mMelMel3.1_paternal_haplotype/GCA_922984935.2.subset.phiXspike.fasta.gz"
accession = "GCA_922984935.2"
taxon = "Meles meles"

// Databases
taxdump = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/new_taxdump/new_taxdump.tar.gz"
busco = "/lustre/scratch123/tol/resources/busco/latest"
taxdump = "https://tolit.cog.sanger.ac.uk/test-data/resources/new_taxdump.tar.gz"
blastp = "https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/resources/gfLaeSulp1.1.buscogenes.dmnd.tar.gz"
blastx = "https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/resources/gfLaeSulp1.1.buscoregions.dmnd.tar.gz"
blastn = "https://tolit.cog.sanger.ac.uk/test-data/Laetiporus_sulphureus/resources/nt_gfLaeSulp1.1.tar.gz"
Expand Down
Empty file added dummy.fa
Empty file.
158 changes: 81 additions & 77 deletions subworkflows/local/input_check.nf
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ include { JSONIFY_TAXDUMP } from '../../modules/local/jsonify_taxdump'

workflow INPUT_CHECK {
take:
samplesheet // channel: /path/to/samplesheet
fasta // channel: [ meta, path(fasta) ]
taxon // channel: val(taxon)
busco_lin // channel: val([busco_lin])
lineage_tax_ids // channel: /path/to/lineage_tax_ids
databases
samplesheet // channel: /path/to/samplesheet
fasta // channel: [ meta, path(fasta) ]
taxon // channel: val(taxon)
busco_lin // channel: val([busco_lin])
lineage_tax_ids // channel: /path/to/lineage_tax_ids
databases

main:
ch_versions = Channel.empty()
Expand All @@ -31,7 +31,7 @@ workflow INPUT_CHECK {
ch_dbs_for_untar = databases
.branch { db_meta, db_path ->
untar: db_path.name.endsWith( ".tar.gz" )
skip: true
skip: ! db_path.name.endsWith(".tar.gz")
}

// Untar the databases
Expand Down Expand Up @@ -66,21 +66,30 @@ workflow INPUT_CHECK {
.fromList(samplesheetToList(samplesheet, "assets/schema_fetchngs_input.json"))
.map {it[0]}
.branch { row ->
paired: row.fastq_2
// Reformat for CAT_CAT
[[id: row.run_accession, row:row], [row.fastq_1, row.fastq_2]]
not_paired: true
paired: row.fastq_2 != null
not_paired: row.fastq_2 == null
}
.set { reads_pairedness }

CAT_CAT ( reads_pairedness.paired )
ch_versions = ch_versions.mix ( CAT_CAT.out.versions.first() )
reads_pairedness.paired
.map { row ->
def meta = [ id: row.run_accession, row: row ]
def files = [ row.fastq_1, row.fastq_2 ]
[ meta, files ]
}
.set { reads_for_cat_cat }

CAT_CAT(reads_for_cat_cat)
ch_versions = ch_versions.mix( CAT_CAT.out.versions.first() )

CAT_CAT.out.file_out
| map { meta, file -> meta.row + [fastq_1: file] }
| mix ( reads_pairedness.not_paired )
| map { create_data_channels_from_fetchngs(it) }
| set { read_files }
.map { meta, file ->
// Reconstruct a "row" map with the new fastq_1 path
meta.row + [ fastq_1: file ]
}
.mix( reads_pairedness.not_paired )
.map { row -> create_data_channels_from_fetchngs(row) }
.set { read_files }

} else {
Channel
Expand Down Expand Up @@ -114,7 +123,7 @@ workflow INPUT_CHECK {
ch_databases.blastn,
lineage_tax_ids,
reads.collect(flat: false).ifEmpty([]),
db_paths.collect(flat: false),
db_paths.collect(flat: false)
)
ch_versions = ch_versions.mix(GENERATE_CONFIG.out.versions.first())

Expand All @@ -123,32 +132,25 @@ workflow INPUT_CHECK {
// Parse the CSV file
//
GENERATE_CONFIG.out.csv
| map { meta, csv -> csv }
| splitCsv(header: ['key', 'value'])
| branch {
taxon_id: it.key == "taxon_id"
return it.value
busco_lineage: it.key == "busco_lineage"
return it.value
}
| set { ch_parsed_csv }

.map { meta, csv -> csv }
.splitCsv(header: ['key','value'])
.branch { rec -> // branch *only* on boolean tests
taxon_id: rec.key == 'taxon_id'
busco_lineage: rec.key == 'busco_lineage'
}
.set { ch_parsed_csv }

//
// Get the taxon ID if we do taxon filtering in blast* searches
//
// 1. taxon_id → single value (apply skip_taxon_filtering if set)
ch_parsed_csv.taxon_id
| map { params.skip_taxon_filtering ? '' : it }
| first
| set { ch_taxon_id }


//
// Get the BUSCO lineages
//
.map { rec -> params.skip_taxon_filtering ? '' : rec.value }
.first()
.set { ch_taxon_id }
// 2. busco_lineage → potentially multiple lineages
ch_parsed_csv.busco_lineage
| collect
| set { ch_busco_lineages }
.map { rec -> rec.value }
.collect()
.set { ch_busco_lineages }


// Format pre-computed BUSCOs (if provided)
// Parse the BUSCO output directories
Expand All @@ -161,6 +163,7 @@ workflow INPUT_CHECK {
}
}


// Remove any invalid lineages from precomputed_busco
ch_busco_lineages_list = ch_busco_lineages.flatten()
ch_parsed_busco_filtered = ch_parsed_busco
Expand All @@ -169,54 +172,55 @@ workflow INPUT_CHECK {
}
ch_parsed_busco_filtered = ch_parsed_busco_filtered.ifEmpty { Channel.value([]) }


//
// Get the BUSCO path if set
//
ch_databases.busco
| map { _, db_path -> db_path }
| ifEmpty( [] )
| set { ch_busco_db }
.map { _, db_path -> db_path }
.ifEmpty { Channel.empty() }
.set { ch_busco_db }


//
// Convert the taxdump to a JSON file if there isn't one yet
//
ch_databases.taxdump
| filter { meta, db_path -> ! db_path.isFile() }
| map { meta, db_path -> [meta, db_path, db_path.listFiles().find { it.getName().endsWith('.json') }] }
| branch { meta, db_path, json_path ->
json: json_path
return [meta, json_path]
dir: true
return [meta, db_path]
}
| set { taxdump_dirs }

JSONIFY_TAXDUMP( taxdump_dirs.dir )
ch_versions = ch_versions.mix(JSONIFY_TAXDUMP.out.versions.first())
.filter { meta, db -> ! db.isFile() }
.map { meta, db ->
def existing = db.listFiles().find { it.name.endsWith('.json') }
[ meta, db, existing ]
}
.branch { meta, db, existing ->
needsJson: existing == null
hasJson : existing != null
}
.set { taxdump_branched }

ch_databases.taxdump
| filter { meta, db_path -> db_path.isFile() }
| mix ( taxdump_dirs.json )
| mix( JSONIFY_TAXDUMP.out.json )
| map { _, db_path -> db_path }
| set { ch_taxdump }
taxdump_needsJson = taxdump_branched.needsJson
taxdump_hasJson = taxdump_branched.hasJson

// 1) Generate new JSON for those that need it
JSONIFY_TAXDUMP( taxdump_needsJson.map { meta, db, _ -> [meta, db] } )
ch_versions = ch_versions.mix( JSONIFY_TAXDUMP.out.versions.first() )
// 2) Merge the newly-made JSON with the ones we already had
taxdump_hasJson.map { meta, db, existing -> existing }
.mix( JSONIFY_TAXDUMP.out.json.map { meta, json -> json } )
.mix( ch_databases.taxdump.filter { meta, db -> db.isFile() }.map { meta, db -> db } )
.set { ch_taxdump }

emit:
reads // channel: [ val(meta), path(datafile) ]
config = GENERATE_CONFIG.out.yaml // channel: [ val(meta), path(yaml) ]
synonyms_tsv = GENERATE_CONFIG.out.synonyms_tsv // channel: [ val(meta), path(tsv) ]
categories_tsv = GENERATE_CONFIG.out.categories_tsv // channel: [ val(meta), path(tsv) ]
taxon_id = ch_taxon_id // channel: val(taxon_id)
busco_lineages = ch_busco_lineages // channel: val([busco_lin])
blastn = ch_databases.blastn.first() // channel: [ val(meta), path(blastn_db) ]
blastp = ch_databases.blastp.first() // channel: [ val(meta), path(blastp_db) ]
blastx = ch_databases.blastx.first() // channel: [ val(meta), path(blastx_db) ]
precomputed_busco = ch_parsed_busco // channel: [ val(meta), path(busco_run_dir) ]
busco_db = ch_busco_db.first() // channel: [ path(busco_db) ]
taxdump = ch_taxdump.first() // channel: [ path(taxdump) ]
versions = ch_versions // channel: [ versions.yml ]
reads // channel: [ val(meta), path(datafile) ]
config = GENERATE_CONFIG.out.yaml // channel: [ val(meta), path(yaml) ]
synonyms_tsv = GENERATE_CONFIG.out.synonyms_tsv // channel: [ val(meta), path(tsv) ]
categories_tsv = GENERATE_CONFIG.out.categories_tsv // channel: [ val(meta), path(tsv) ]
taxon_id = ch_taxon_id // channel: val(taxon_id)
busco_lineages = ch_busco_lineages // channel: val([busco_lin])
blastn = ch_databases.blastn.first() // channel: [ val(meta), path(blastn_db) ]
blastp = ch_databases.blastp.first() // channel: [ val(meta), path(blastp_db) ]
blastx = ch_databases.blastx.first() // channel: [ val(meta), path(blastx_db) ]
precomputed_busco = ch_parsed_busco // channel: [ val(meta), path(busco_run_dir) ]
busco_db = ch_busco_db.first() // channel: [ path(busco_db) ]
taxdump = ch_taxdump.first() // channel: [ path(taxdump) ]
versions = ch_versions // channel: [ versions.yml ]
}

// Function to get list of [ meta, datafile ]
Expand Down
Loading