How to process multiple input (yaml/json) from S3 using Nextflow DSL2

Question
I need to process over 1k samples with a Nextflow (DSL2) pipeline on AWS Batch. The current version of the workflow processes a single input per run. I'm looking for workflow syntax (mapping tuples to iterate) to process multiple inputs in parallel. The inputs are in JSON or YAML format, and the path to the input files is unique to each sample.

To preserve the input file paths ("s3://...") I used `.fromPath` in a channel.

Following is my single-sample input config `input.yaml` (`-params-file`):

```yaml
id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz
```

Workflow to run a single-sample input:

```nextflow
process samtools_stats {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/samtools", mode: "copy"

    input:
    path bam

    output:
    path "${id}.stats"

    script:
    """
    samtools stats ${bam} > ${id}.stats
    """
}

process mosdepth_bam {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"

    input:
    path bam
    path bam_idx

    output:
    path "${id}.regions.bed.gz"

    script:
    """
    mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 ${id} ${bam}
    """
}

process mosdepth_cram {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"

    input:
    path bam
    path bam_idx
    path reference
    path reference_idx

    output:
    path "${id}.regions.bed.gz"

    script:
    """
    mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 --fasta ${reference} ${id} ${bam}
    """
}

process bcftools_stats {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/bcftools", mode: "copy"

    input:
    path vcf
    path vcf_idx

    output:
    path "*"

    script:
    """
    bcftools stats -f PASS ${vcf} > ${id}.pass.stats
    """
}

process multiqc {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/multiqc", mode: "copy"

    input:
    path "*"

    output:
    path "multiqc_data/*", emit: multiqc_ch

    script:
    """
    multiqc . --data-format json --enable-npm-plugin
    """
}

process compile_metrics {
    tag "${id}"
    publishDir "${params.publishdir}/${id}", mode: "copy"

    input:
    path multiqc

    output:
    path "${params.id}.metrics.json", emit: compile_metrics_out

    script:
    """
    # parse and calculate all the metrics in the multiqc output to compile
    compile_metrics.py \
        --multiqc_json multiqc_data.json \
        --output_json ${params.id}.metrics.json \
        --biosample_id ${params.id}
    """
}

/*
----------------------------------------------------------------------
    WORKFLOW
----------------------------------------------------------------------
*/

id = params.id
aln_file = file( params.bam )
aln_file_type = aln_file.getExtension()
vcf = channel.fromPath( params.vcf, checkIfExists: true )
vcf_idx = channel.fromPath( params.vcf + ".tbi", checkIfExists: true )

if (aln_file_type == "bam") {
    cbam = channel.fromPath( params.bam, checkIfExists: true )
    cbam_idx = channel.fromPath( params.bam + ".bai", checkIfExists: true )
}
else if (aln_file_type == "cram") {
    cbam = channel.fromPath( params.bam, checkIfExists: true )
    cbam_idx = channel.fromPath( params.bam + ".crai", checkIfExists: true )
}

reference = channel.fromPath( params.reference, checkIfExists: true )
reference_idx = channel.fromPath( params.reference + ".fai", checkIfExists: true )

// main
workflow {
    if (aln_file_type == "bam") {
        samtools_stats( cbam )
        mosdepth_bam( cbam, cbam_idx )
        bcftools_stats( vcf, vcf_idx )
        multiqc( samtools_stats.out.mix( mosdepth_bam.out ).collect() )
        compile_metrics( multiqc.out )
    } else if (aln_file_type == "cram") {
        samtools_stats( cbam )
        mosdepth_cram( cbam, cbam_idx, reference, reference_idx )
        bcftools_stats( vcf, vcf_idx )
        multiqc( samtools_stats.out.mix( mosdepth_cram.out ).collect() )
        compile_metrics( multiqc.out )
    }
}
```

I want to modify the workflow to run on the following multi-sample input in parallel:

```yaml
samples:
  - id: HLA1001
    bam: s3://HLA/data/udf/HLA1001.bam
    vcf: s3://HLA/data/udf/HLA1001.vcf.gz
  - id: NHLA1002
    bam: s3://HLA/data/sdd/HLA1002.bam
    vcf: s3://HLA/data/sdd/HLA1002.vcf.gz
  - id: NHLA1003
    bam: s3://HLA/data/klm/HLA1003.bam
    vcf: s3://HLA/data/klm/HLA1003.vcf.gz
  - id: NHLA2000
    bam: s3://HLA/data/rcb/HLA2000.bam
    vcf: s3://HLA/data/rcb/HLA2000.vcf.gz
```

The expected final output folder structure for the multiple samples:

```
s3://mybucket/results/HLA1001/
    samtools/
    mosdepth/
    bcftools/
    multiqc/
    metrics/HLA1001.metrics.json
s3://mybucket/results/HLA1002/
    samtools/
    mosdepth/
    bcftools/
    multiqc/
    metrics/HLA1002.metrics.json
```

The bam/cram and vcf inputs, as well as the inputs of multiqc and compile_metrics, must all refer to the same sample in every single process.

Appreciate your help! Thanks
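When a process takes several queue channels, elements are paired in whatever order they arrive, so the usual way to guarantee that every process sees files from the same sample is to key each channel element by the sample id and pair channels with `join`. A minimal sketch with hypothetical sample and file names (not taken from the pipeline above):

```nextflow
// Each channel emits tuples whose first element is the sample id.
stats_ch   = Channel.of( ['HLA1001', 'HLA1001.stats'], ['HLA1002', 'HLA1002.stats'] )
regions_ch = Channel.of( ['HLA1001', 'HLA1001.regions.bed.gz'], ['HLA1002', 'HLA1002.regions.bed.gz'] )

// join matches elements on the first tuple element (the sample id),
// so each sample's files stay together regardless of completion order.
stats_ch
    .join( regions_ch )
    .view()  // e.g. [HLA1001, HLA1001.stats, HLA1001.regions.bed.gz]
```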

Following the method answered by @steve:

Contents of `main.nf` (update):

```nextflow
include { compile_metrics } from './modules/compile_metrics'

Channel
    .fromList( params.samples )
    .map { it.biosample_id }
    .set { sample_ids }

compile_metrics( sample_ids, multiqc.out.json_data )
```

Contents of modules/compile_metrics/main.nf:

```nextflow
process compile_metrics {
    tag { sample_ids }

    input:
    val(sample_ids)
    path "multiqc_data.json"

    output:
    tuple val(sample_ids), path("${sample_ids}.metrics.json"), emit: compile_metrics_out

    script:
    """
    compile_metrics.py \\
        --multiqc_json multiqc_data.json \\
        --output_json "${sample_ids}.metrics.json" \\
        --biosample_id "${sample_ids}"
    """
}
```

Update `main.nf`:

```nextflow
include { mosdepth_datamash } from './modules/mosdepth_datamash'

autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )

mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ).collect() )
```

Update `mosdepth_datamash`:

```nextflow
process mosdepth_datamash {
    tag { sample }

    input:
    path autosomes_non_gap_regions
    tuple val(sample), path(regions)

    output:
    tuple val(sample), path("${sample}.mosdepth.csv"), emit: coverage

    script:
    """
    zcat "${sample}.regions.bed.gz" | bedtools intersect -a stdin -b ${autosomes_non_gap_regions} | gzip -9c > "${sample}.regions.autosomes_non_gap_n_bases.bed.gz"
    ...
    """
}
```

Update `main.nf` (fix): use a queue channel instead of `collect`:

```nextflow
mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ) )
```
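The reason the `collect` version misbehaves for multiple samples: `collect()` gathers every emission into a single list and emits it once, so a per-sample process downstream would run only once over all samples' files. A small illustrative sketch (hypothetical sample names, not from the pipeline above):

```nextflow
// A queue channel that emits one keyed tuple per sample:
regions = Channel.of( ['S1', 'S1.regions.bed.gz'], ['S2', 'S2.regions.bed.gz'] )

regions.view()            // two emissions, one per sample -> downstream process runs per sample
regions.collect().view()  // a single emission containing everything -> downstream process runs once
```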

The verifybamid process works with `file()` instead of `channel.fromPath`:

```nextflow
vbi2_ud = file( params.vbi2_ud )
vbi2_bed = file( params.vbi2_bed )
vbi2_mean = file( params.vbi2_mean )
```

How can the channel be modified to stay backward compatible with the previous single-sample input format, which lacks the `samples` key?

```yaml
id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz
```

Input-processing content of `main.nf`:

```nextflow
Channel
    .fromList( params.samples )
    .branch { rec ->
        def aln_file = file( rec.bam )

        bam: aln_file.extension == 'bam'
            def bam_idx = file( "${rec.bam}.bai" )
            return tuple( rec.id, aln_file, bam_idx )

        cram: aln_file.extension == 'cram'
            def cram_idx = file( "${rec.bam}.crai" )
            return tuple( rec.id, aln_file, cram_idx )
    }
    .set { aln_inputs }

Channel
    .fromList( params.samples )
    .map { rec ->
        def vcf_file = file( rec.vcf )
        def vcf_idx = file( "${rec.vcf}.tbi" )

        tuple( rec.id, vcf_file, vcf_idx )
    }
    .set { vcf_inputs }

Channel
    .fromList( params.samples )
    .map { it.biosample_id }
    .set { sample_ids }
```

The updated `main.nf` works well.

Input format A or B:

```yaml
# A)
biosample_id: NA12878-chr14
bam: s3://sample-qc/data/NA12878-chr14.bam

# B)
samples:
  - biosample_id: NA12878-chr14
    bam: s3://sample-qc/data/NA12878-chr14.bam
```

```nextflow
workflow {
    ...
    params.samples = null

    Channel
        .fromList( params.samples )
        .ifEmpty { ['biosample_id': params.biosample_id, 'bam': params.bam] }
        .branch { rec ->
            def aln_file = rec.bam ? file( rec.bam ) : null

            bam: rec.biosample_id && aln_file?.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )
                return tuple( rec.biosample_id, aln_file, bam_idx )

            cram: rec.biosample_id && aln_file?.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )
                return tuple( rec.biosample_id, aln_file, cram_idx )
        }
        .set { aln_inputs }

    Channel
        .fromList( params.samples )
        .ifEmpty { ['biosample_id': params.biosample_id] }
        .map { it.biosample_id }
        .set { sample_ids }

    compile_metrics( sample_ids, multiqc.out.json_data )
    ...
}
```

Trying another way to avoid duplicating the `.ifEmpty` fallback above in every place that parses `params.samples` (two processes here need it):

```yaml
# A)
biosample_id: NA12878-chr14
bam: s3://sample-qc/data/NA12878-chr14.bam

# B)
samples:
  - biosample_id: NA12878-chr14
    bam: s3://sample-qc/data/NA12878-chr14.bam
```

```nextflow
params.samples = ''
// params.samples = null

def get_samples_list() {
    if (params.samples) {
        return params.samples
    }
    else {
        return ['biosample_id': params.biosample_id, 'bam': params.bam]
    }
}

workflow {
    // params.samples = ''
    samples = get_samples_list()
    ...
    Channel
        .fromList( samples )
        .branch { rec ->
            def aln_file = rec.bam ? file( rec.bam ) : null

            bam: rec.biosample_id && aln_file?.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )
                return tuple( rec.biosample_id, aln_file, bam_idx )

            cram: rec.biosample_id && aln_file?.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )
                return tuple( rec.biosample_id, aln_file, cram_idx )
        }
        .set { aln_inputs }

    samtools_stats_bam( aln_inputs.bam, [] )
    samtools_stats_cram( aln_inputs.cram, ref_fasta )

    Channel
        .fromList( params.samples )
        .map { it.biosample_id }
        .set { sample_ids }

    compile_metrics( sample_ids, multiqc.out.json_data )
}
```

ERROR:

```
Workflow execution stopped with the following message:
  Exit status   : null
  Error message : Cannot invoke method branch() on null object
  Error report  : Cannot invoke method branch() on null object

ERROR ~ Cannot invoke method branch() on null object
```
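The error is consistent with `Channel.fromList()` being handed a null (e.g. `params.samples` left as `null` when only the single-sample format is supplied), so the object that `.branch()` is called on never exists. One defensive pattern, sketched here under the assumption that single-sample runs define `params.biosample_id`/`params.bam`, is to normalise the parameters into a non-null list before creating any channels:

```nextflow
// Fall back to a one-element list when params.samples is null or empty,
// so Channel.fromList() always receives a real list.
def samples_list = params.samples ?: [
    ['biosample_id': params.biosample_id, 'bam': params.bam]
]

Channel
    .fromList( samples_list )
    .set { samples }
```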

Calling the samples channel wherever it needs to be reused works well and is a much better approach:

```nextflow
Channel
    .fromList( params.samples )
    .ifEmpty { ['biosample_id': params.biosample_id, 'bam': params.bam] }
    .set { samples }

samples.branch { rec ->
    ...

samples.map { it.biosample_id }
```

How can `input.yaml` be read from an argument `--input_list s3://mybucket/input.yaml` (instead of being passed directly with `-params-file input.yaml`) and still be consumed as a list in the `samples` channel with minimal change? e.g.

```shell
nextflow run main.nf \
    -ansi-log false \
    --input_list s3://mybucket/input.yaml \
    -work-dir 's3://mybucket/work' \
    --publish_dir 's3://mybucket/results' \
    --ref_fasta 's3://mybucket/ref.fa'
```

Current code:

```nextflow
Channel
    // .fromPath( params.input_list )
    .fromList( params.samples )
    .ifEmpty { ['biosample_id': params.biosample_id, 'bam': params.bam] }
    .set { samples }
```

`input.yaml`:

```yaml
samples:
  - id: HLA1001
    bam: s3://HLA/data/udf/HLA1001.bam
  - id: NHLA1002
    bam: s3://HLA/data/sdd/HLA1002.bam
```
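One way to support `--input_list` with minimal change is to parse the YAML yourself before building the channel. This is a sketch, untested here: it assumes the SnakeYAML library available on Nextflow's classpath, the top-level `samples` key shown above, and the `params.input_list` name from the question. `file()` can stage an `s3://` path, so reading `.text` works for remote files too:

```nextflow
import org.yaml.snakeyaml.Yaml

// Build the samples list either from --input_list (a YAML file, possibly on S3)
// or from params.samples when -params-file is used instead.
def load_samples() {
    if( params.input_list ) {
        def parsed = new Yaml().load( file( params.input_list ).text )
        return parsed['samples']
    }
    return params.samples ?: []
}

Channel
    .fromList( load_samples() )
    .ifEmpty { ['biosample_id': params.biosample_id, 'bam': params.bam] }
    .set { samples }
```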

Answer 1 (score: 2)

With [channels](https://www.nextflow.io/docs/latest/channel.html#channels) it is possible to process any number of samples, including just one. Here's one way that uses [modules](https://www.nextflow.io/docs/latest/dsl2.html#modules) to handle both BAM and CRAM inputs. Note that each process below expects an input [`tuple`](https://www.nextflow.io/docs/latest/process.html#input-type-tuple) where the first element is a sample name or key. To greatly assist with merging channels downstream, we should also ensure we output tuples with the same sample name or key. The following is untested on AWS Batch, but it should at least get you started.

Contents of `main.nf`:

```nextflow
include { bcftools_stats } from './modules/bcftools'
include { mosdepth as mosdepth_bam } from './modules/mosdepth'
include { mosdepth as mosdepth_cram } from './modules/mosdepth'
include { multiqc } from './modules/multiqc'
include { samtools_stats as samtools_stats_bam } from './modules/samtools'
include { samtools_stats as samtools_stats_cram } from './modules/samtools'
include { mosdepth_datamash } from './modules/mosdepth_datamash'

workflow {

    ref_fasta = file( params.ref_fasta )
    autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )

    Channel
        .fromList( params.samples )
        .ifEmpty { ['id': params.id, 'bam': params.bam] }
        .branch { rec ->
            def aln_file = rec.bam ? file( rec.bam ) : null

            bam: rec.id && aln_file?.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )
                return tuple( rec.id, aln_file, bam_idx )

            cram: rec.id && aln_file?.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )
                return tuple( rec.id, aln_file, cram_idx )
        }
        .set { aln_inputs }

    Channel
        .fromList( params.samples )
        .ifEmpty { ['id': params.id, 'vcf': params.vcf] }
        .branch { rec ->
            def vcf_file = rec.vcf ? file( rec.vcf ) : null

            output: rec.id && vcf_file
                def vcf_idx = file( "${rec.vcf}.tbi" )
                return tuple( rec.id, vcf_file, vcf_idx )
        }
        .set { vcf_inputs }

    mosdepth_bam( aln_inputs.bam, [] )
    mosdepth_cram( aln_inputs.cram, ref_fasta )

    samtools_stats_bam( aln_inputs.bam, [] )
    samtools_stats_cram( aln_inputs.cram, ref_fasta )

    bcftools_stats( vcf_inputs )

    Channel
        .empty()
        .mix( mosdepth_bam.out.regions )
        .mix( mosdepth_cram.out.regions )
        .set { mosdepth_regions }

    mosdepth_datamash( mosdepth_regions, autosomes_non_gap_regions )

    Channel
        .empty()
        .mix( mosdepth_bam.out.dists )
        .mix( mosdepth_bam.out.summary )
        .mix( mosdepth_cram.out.dists )
        .mix( mosdepth_cram.out.summary )
        .mix( samtools_stats_bam.out )
        .mix( samtools_stats_cram.out )
        .mix( bcftools_stats.out )
        .mix( mosdepth_datamash.out )
        .map { sample, files -> files }
        .collect()
        .set { log_files }

    multiqc( log_files )
}
```

Contents of `modules/samtools/main.nf`:

```nextflow
process samtools_stats {

    tag { sample }

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta

    output:
    tuple val(sample), path("${sample}.stats")

    script:
    def reference = ref_fasta ? /--reference "${ref_fasta}"/ : ''

    """
    samtools stats \\
        ${reference} \\
        "${bam}" \\
        > "${sample}.stats"
    """
}
```

Contents of `modules/mosdepth/main.nf`:

```nextflow
process mosdepth {

    tag { sample }

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta

    output:
    tuple val(sample), path("*.regions.bed.gz"), emit: regions
    tuple val(sample), path("*.dist.txt"), emit: dists
    tuple val(sample), path("*.summary.txt"), emit: summary

    script:
    def fasta = ref_fasta ? /--fasta "${ref_fasta}"/ : ''

    """
    mosdepth \\
        --no-per-base \\
        --by 1000 \\
        --mapq 20 \\
        --threads ${task.cpus} \\
        ${fasta} \\
        "${sample}" \\
        "${bam}"
    """
}
```

Contents of `modules/bcftools/main.nf`:

```nextflow
process bcftools_stats {

    tag { sample }

    input:
    tuple val(sample), path(vcf), path(tbi)

    output:
    tuple val(sample), path("${sample}.pass.stats")

    """
    bcftools stats \\
        -f PASS \\
        "${vcf}" \\
        > "${sample}.pass.stats"
    """
}
```

Contents of `modules/multiqc/main.nf`:

```nextflow
process multiqc {

    input:
    path 'data/*'

    output:
    path "multiqc_report.html", emit: report
    path "multiqc_data", emit: data

    """
    multiqc \\
        --data-format json \\
        .
    """
}
```

Contents of `modules/compile_metrics/main.nf`:

```nextflow
process compile_metrics {

    tag { sample_id }

    input:
    val sample_id
    path multiqc_json

    output:
    tuple val(sample_id), path("${sample_id}.metrics.json")

    """
    compile_metrics.py \\
        --multiqc_json "${multiqc_json}" \\
        --output_json "${sample_id}.metrics.json" \\
        --biosample_id "${sample_id}"
    """
}
```

Contents of `./modules/mosdepth_datamash/main.nf`:

```nextflow
process mosdepth_datamash {

    tag { sample_id }

    input:
    tuple val(sample_id), path(regions_bed)
    path autosomes_non_gap_regions

    output:
    tuple val(sample_id), path("${sample_id}.mosdepth.csv")

    """
    zcat -f "${regions_bed}" |
        bedtools intersect -a stdin -b "${autosomes_non_gap_regions}" |
        gzip -9 > "${sample_id}.regions.autosomes_non_gap_n_bases.bed.gz"

    # do something
    touch "${sample_id}.mosdepth.csv"
    """
}
```

Contents of `nextflow.config`:

```nextflow
plugins {
    id 'nf-amazon'
}

params {
    ref_fasta = null
    autosomes_non_gap_regions = null

    samples = null
    id = null
    bam = null
    vcf = null

    publish_dir = './results'
    publish_mode = 'copy'
}

process {
    executor = 'awsbatch'
    queue = 'test-queue'

    errorStrategy = 'retry'
    maxRetries = 3

    withName: 'samtools_stats' {
        publishDir = [
            path: "${params.publish_dir}/samtools",
            mode: params.publish_mode,
        ]
    }
    withName: 'bcftools_stats' {
        publishDir = [
            path: "${params.publish_dir}/bcftools",
            mode: params.publish_mode,
        ]
    }
    withName: 'mosdepth' {
        cpus = 4
        publishDir = [
            path: "${params.publish_dir}/mosdepth",
            mode: params.publish_mode,
        ]
    }
    withName: 'multiqc' {
        publishDir = [
            path: "${params.publish_dir}/multiqc",
            mode: params.publish_mode,
        ]
    }
}

aws {
    region = 'us-east-1'

    batch {
        cliPath = '/home/ec2-user/miniconda/bin/aws'
    }
}
```

And run using something like:

```shell
nextflow run main.nf \
    -ansi-log false \
    -params-file input.yaml \
    -work-dir 's3://mybucket/work' \
    --publish_dir 's3://mybucket/results' \
    --ref_fasta 's3://mybucket/ref.fa'
```


huangapple · Published 2023-05-10 19:16:30 · Original link: https://go.coder-hub.com/76217754.html