nextflow - splitCsv - each element - error: If you need to reuse the same component


For a nextflow pipeline I'd like to read in a CSV file with five columns:

    sample1,path/normal_R1.fastq,path/normal_R2.fastq,path/tumor_R1.fastq,path/tumor_R2.fastq
    sample2,path/normal_R1.fastq,path/normal_R2.fastq,path/tumor_R1.fastq,path/tumor_R2.fastq

I read in the file and create a LinkedHashMap. For each element I'd like to run a few processes. The processes have been working fine without the CSV iteration, as they were fed by a channel of tumor files and a channel of normal files.

When I edit the code to use a CSV, I get the following error:

> Process 'FASTP' has been already used -- If you need to reuse the same
> component, include it with a different name or include it in a
> different workflow context

Below is the code:

    include { FASTP } from './fastp_process.nf'
    include { bwa_index } from './index_process.nf'
    include { align_bwa_mem } from './bwamem_process_already_index.nf'
    include { gatk_markduplicates } from './gatk_markduplicates_process.nf'
    include { setupnmdtags } from './setupnmdtags_process.nf'
    include { recalibrate_bam } from './recalibratebam_process.nf'
    include { applybqsr } from './applybqsr_process.nf'
    include { mutect2 } from './mutect2_process.nf'
    include { lancet } from './lancet_process.nf'
    include { manta } from './manta_process.nf'
    include { strelka } from './strelka_process.nf'
    include { gatk_merge_vcfs } from './gatk_merge_vcfs.nf'

    workflow {
        def csvFile = file("input_nextflow_files.csv")
        def csvLines = csvFile.text.readLines()
        def sampleMap = csvLines.collectEntries { line ->
            def lineCols = line.split(',')
            if (lineCols.size() >= 5) {
                def sampleName = lineCols[0]
                def normalR1 = file(lineCols[1])
                def normalR2 = file(lineCols[2])
                def tumorR1 = file(lineCols[3])
                def tumorR2 = file(lineCols[4])
                [(sampleName): [tuple(normalR1, normalR2), tuple(tumorR1, tumorR2)]]
            } else {
                return [:]
            }
        }
        sampleMap.each { sampleName, pairList ->
            def normalPair = pairList[0]
            def tumorPair = pairList[1]
            FASTP(tumorPair, normalPair, sampleName)
            align_bwa_mem(FASTP.out.reads_tumor, FASTP.out.reads_normal) // index already created
        }
    }

I believe it is related to the inputs of the FASTP process below:

    process FASTP {
        maxForks 3
        debug true

        input:
        path(reads_tumor)   // val outdir // doesn't work with path(outdir) // we pass multiple reads - for tumor and normal
        path(reads_normal)  // val outdir // doesn't work with path(outdir)
        val(sample_name)

        output:
        tuple val(sample_name), path("${sample_id_tumor}_trim_{1,2}.fq.gz"), emit: reads_tumor
        path("${sample_id_tumor}.fastp.json"), emit: json_tumor
        path("${sample_id_tumor}.fastp.html"), emit: html_tumor
        tuple val(sample_id_normal), path("${sample_id_normal}_trim_{1,2}.fq.gz"), emit: reads_normal
        path("${sample_id_normal}.fastp.json"), emit: json_normal
        path("${sample_id_normal}.fastp.html"), emit: html_normal

        script:
        def (r1_normal, r2_normal) = reads_normal
        def (r1_tumor, r2_tumor) = reads_tumor
        """
        ml fastp
        fastp --in1 "${r1_normal}" --in2 "${r2_normal}" -q 20 -u 20 -l 40 --detect_adapter_for_pe --out1 "${sample_id_normal}_trim_1.fq.gz" --out2 "${sample_id_normal}_trim_2.fq.gz" --json "${sample_id_normal}.fastp.json" --html "${sample_id_normal}.fastp.html" --thread 12
        fastp --in1 "${r1_tumor}" --in2 "${r2_tumor}" -q 20 -u 20 -l 40 --detect_adapter_for_pe --out1 "${sample_id_tumor}_trim_1.fq.gz" --out2 "${sample_id_tumor}_trim_2.fq.gz" --json "${sample_id_tumor}.fastp.json" --html "${sample_id_tumor}.fastp.html" --thread 12
        echo "Exiting fastp"
        """
    }

I do not know how to fix this error. I checked multiple times that I'm not including the FASTP process more than once, and that looks fine. I also tried removing the include and the FASTP call, but that didn't help either. So I cannot figure out what's going on.

Answer 1

Score: 1

When you iterate through your sample map using each, you are effectively trying to re-use the FASTP and align_bwa_mem processes on each iteration. Nextflow just complains that if they (i.e. the processes) need to be re-used, they'll need to be included under a different name (i.e. using a module alias) or in a different workflow context (i.e. using a sub-workflow). A better way to achieve what you want is to use channels and the splitCsv operator, for example:

    params.samples_csv = 'input_nextflow_files.csv'

    include { FASTP } from './fastp_process.nf'

    workflow {
        def header = ['sampleName', 'normalR1', 'normalR2', 'tumorR1', 'tumorR2']

        Channel
            .fromPath( params.samples_csv )
            .splitCsv( header: header )
            .multiMap { row ->
                def tumor_reads = tuple( file(row.tumorR1), file(row.tumorR2) )
                def normal_reads = tuple( file(row.normalR1), file(row.normalR2) )

                tumor:
                tuple( row.sampleName, tumor_reads )

                normal:
                tuple( row.sampleName, normal_reads )
            }
            .set { samples }

        FASTP( samples.tumor.mix( samples.normal ) )

        ...
    }
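To see the shape of the data this produces, the row-to-channel transformation can be sketched outside Nextflow. The plain-Python sketch below is illustrative only (it is not the Nextflow API): it mimics what splitCsv(header: header) followed by multiMap emits on the tumor and normal outputs for each CSV row, and what mixing the two outputs hands to FASTP.

```python
import csv
import io

# Header applied to each CSV row, mirroring the Nextflow example above.
HEADER = ['sampleName', 'normalR1', 'normalR2', 'tumorR1', 'tumorR2']

def split_samples(csv_text):
    """Mimic splitCsv(header: HEADER) + multiMap: emit one
    (sampleName, (R1, R2)) tuple per row on each of two 'channels'."""
    tumor, normal = [], []
    for row in csv.DictReader(io.StringIO(csv_text), fieldnames=HEADER):
        tumor.append((row['sampleName'], (row['tumorR1'], row['tumorR2'])))
        normal.append((row['sampleName'], (row['normalR1'], row['normalR2'])))
    return tumor, normal

csv_text = (
    "sample1,path/normal_R1.fastq,path/normal_R2.fastq,path/tumor_R1.fastq,path/tumor_R2.fastq\n"
    "sample2,path/normal_R1.fastq,path/normal_R2.fastq,path/tumor_R1.fastq,path/tumor_R2.fastq\n"
)
tumor, normal = split_samples(csv_text)

# samples.tumor.mix(samples.normal) corresponds to one combined queue;
# FASTP then consumes each (sampleName, reads) item independently.
mixed = tumor + normal
```

The point of this shape is that FASTP receives one sample's read pair per task, so Nextflow parallelises over the channel instead of the workflow calling the process repeatedly.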

Or if you wanted more flexibility, another way would be to import FASTP using a module alias:

    params.samples_csv = 'input_nextflow_files.csv'

    include { FASTP as FASTP_TUMOR } from './fastp_process.nf'
    include { FASTP as FASTP_NORMAL } from './fastp_process.nf'

    workflow {
        ...
        FASTP_TUMOR( samples.tumor )
        FASTP_NORMAL( samples.normal )
        ...
    }

Contents of ./fastp_process.nf:

    process FASTP {

        tag { sample_id }

        input:
        tuple val(sample_id), path(reads, stageAs: 'reads/*')

        output:
        tuple val(sample_id), path("${sample_id}_trim_{1,2}.fq.gz"), emit: reads
        path "${sample_id}.fastp.json", emit: json
        path "${sample_id}.fastp.html", emit: html

        script:
        def (r1, r2) = reads

        """
        fastp \\
            --in1 "${r1}" \\
            --in2 "${r2}" \\
            -q 20 \\
            -u 20 \\
            -l 40 \\
            --detect_adapter_for_pe \\
            --out1 "${sample_id}_trim_1.fq.gz" \\
            --out2 "${sample_id}_trim_2.fq.gz" \\
            --json "${sample_id}.fastp.json" \\
            --html "${sample_id}.fastp.html" \\
            --thread ${task.cpus}
        """
    }
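Because the process now keys everything off a single sample_id, every output filename is derived from it. This Python sketch (illustrative only; the hypothetical fastp_command helper is not part of the pipeline) shows the command line the script block renders for one (sample_id, [r1, r2]) tuple:

```python
def fastp_command(sample_id, r1, r2, cpus=4):
    """Render the fastp invocation the FASTP script block above produces
    for a single sample; all output names derive from sample_id."""
    return (
        f'fastp --in1 "{r1}" --in2 "{r2}" -q 20 -u 20 -l 40 '
        f'--detect_adapter_for_pe '
        f'--out1 "{sample_id}_trim_1.fq.gz" --out2 "{sample_id}_trim_2.fq.gz" '
        f'--json "{sample_id}.fastp.json" --html "{sample_id}.fastp.html" '
        f'--thread {cpus}'
    )

# One tumor item from the mixed channel renders to one self-contained command.
cmd = fastp_command('sample1', 'reads/tumor_R1.fastq', 'reads/tumor_R2.fastq')
```

Deriving every name from sample_id is what lets the same process handle tumor and normal read pairs interchangeably, instead of hard-coding separate tumor/normal inputs and outputs as the original FASTP did.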

huangapple
  • Published on 2023-08-04 06:27:06
  • Please retain this link when reposting: https://go.coder-hub.com/76831937.html