Nextflow only processes one of my paired samples in a subworkflow

Question

I have a workflow consisting of 2 subworkflows.

params.reads = "$projectDir/data/raw/reads/*_{1,2}.fastq.gz"
params.kaiju_db = "$projectDir/data/kaijudb/viruses/kaiju_db_viruses.fmi"
params.kaiju_names = "$projectDir/data/kaijudb/viruses/names.dmp"
params.kaiju_nodes = "$projectDir/data/kaijudb/viruses/nodes.dmp"

workflow subworkflow_A {

  take:
    reads                                      // channel: [ val(sample), [ reads ] ]

  main:
    count_reads(reads)
    trim_reads(reads)

  emit:
    trimmed_reads = trim_reads.out.reads    // channel: [ val(sample), [ trimmed_reads ] ]
}

workflow subworkflow_B {

  take:
    reads                                      // channel: [ val(sample), [ reads ] ]
    db            // channel: /path/to/kaiju/db.fmi
    nodes         // channel: /path/to/kaiju/nodes/file
    names         // channel: /path/to/kaiju/names/file

  main:
    taxonomic_classification(reads, nodes, db)
    kaiju_to_krona(taxonomic_classification.out, nodes, names)
    krona_import_text(kaiju_to_krona.out)
    kaiju_to_table(taxonomic_classification.out, nodes, names)
}

workflow main {
    ch_reads = Channel.fromFilePairs("$params.reads", checkIfExists:true)
    subworkflow_A(ch_reads)

    ch_db = Channel.fromPath("$params.kaiju_db", checkIfExists: true)
    ch_nodes = Channel.fromPath("$params.kaiju_nodes", checkIfExists: true)
    ch_names = Channel.fromPath("$params.kaiju_names", checkIfExists: true)
    ch_trimmed_reads = subworkflow_A.out.trimmed_reads
    subworkflow_B(ch_trimmed_reads, ch_db, ch_nodes, ch_names)
}

The input for params.reads is a directory like this:

reads/
├── test_sample1_1.fastq.gz
├── test_sample1_2.fastq.gz
├── test_sample2_1.fastq.gz
└── test_sample2_2.fastq.gz

The input for subworkflow_A, ch_reads, is:

[test_sample1, [~project/data/raw/reads/test_sample1_1.fastq.gz, ~project/data/raw/reads/test_sample1_2.fastq.gz]]
[test_sample2, [~project/data/raw/reads/test_sample2_1.fastq.gz, ~project/data/raw/reads/test_sample2_2.fastq.gz]]

subworkflow_A then emits the following channel into ch_trimmed_reads:

[test_sample1, [~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R1.fq.gz, ~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R2.fq.gz]]
[test_sample2, [~project/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R1.fq.gz, ~project/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R2.fq.gz]]

For some reason, subworkflow_B only runs the first sample, test_sample1, and not the second sample, test_sample2. I want it to run over both samples.

Answer 1

Score: 1

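Your ch_db, ch_nodes and ch_names channels are queue channels that each hold a single file, so they are exhausted after the first task and only one sample gets processed. Note that a value channel is implicitly created by a process when it is invoked with a simple value. This means you can just pass in a plain file object instead. For example: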

workflow main {
    ch_reads = Channel.fromFilePairs( params.reads, checkIfExists:true )

    db = file( params.kaiju_db )
    nodes = file( params.kaiju_nodes )
    names = file( params.kaiju_names )

    subworkflow_B( ch_reads, db, nodes, names )
}
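
If you still want the reads to go through subworkflow_A first, the same idea should carry over. The following is only a sketch, assuming the params and both subworkflows from the question are defined in the same script and that subworkflow_A emits trimmed_reads. The unnamed entry workflow and the checkIfExists option on file() are my choices here, not something taken from the question:

```nextflow
// Sketch only: relies on params.* and subworkflow_A / subworkflow_B from the question.
workflow {
    ch_reads = Channel.fromFilePairs( params.reads, checkIfExists: true )
    subworkflow_A( ch_reads )

    // Plain file objects: each one is treated as a value channel when it
    // reaches a process, so it can be reused for every sample.
    db    = file( params.kaiju_db,    checkIfExists: true )
    nodes = file( params.kaiju_nodes, checkIfExists: true )
    names = file( params.kaiju_names, checkIfExists: true )

    // Queue channel of trimmed read pairs plus three reusable values.
    subworkflow_B( subworkflow_A.out.trimmed_reads, db, nodes, names )
}
```

If you would rather keep Channel.fromPath, appending .first() to each of the three database channels should have the same effect, since the first operator returns a value channel.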

Most of the time, what you want is one queue channel and one or more value channels when your process requires multiple input channels:

> When two or more channels are declared as process inputs, the process
> waits until there is a complete input configuration, i.e. until it
> receives a value from each input channel. When this condition is
> satisfied, the process consumes a value from each channel and launches
> a new task, repeating this logic until one or more channels are empty.
>
> As a result, channel values are consumed sequentially and any empty
> channel will cause the process to wait, even if the other channels
> have values.
>
> A different semantic is applied when using a value channel. This kind
> of channel is created by the Channel.value factory method or
> implicitly when a process is invoked with an argument that is not a
> channel. By definition, a value channel is bound to a single value and
> it can be read an unlimited number of times without consuming its
> content. Therefore, when mixing a value channel with one or more
> (queue) channels, it does not affect the process termination because
> the underlying value is applied repeatedly.
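
To see the behaviour described in the quote in isolation, here is a small toy script. It is not part of the original pipeline: the process name pair_up and the string 'db.fmi' are made up for illustration.

```nextflow
// Hypothetical toy process: pairs each sample name with a database name.
process pair_up {
    input:
    val sample
    val db

    output:
    stdout

    script:
    """
    echo "${sample} uses ${db}"
    """
}

workflow {
    samples = Channel.of( 'test_sample1', 'test_sample2' )

    // Queue channel holding one item: it is empty after the first task.
    db_queue = Channel.of( 'db.fmi' )

    // Value channel: can be read any number of times.
    db_value = Channel.value( 'db.fmi' )

    // With db_value one task is launched per sample.
    // Swapping in db_queue launches a single task, as in the question.
    pair_up( samples, db_value ).view()
}
```

Running it once with db_value and once with db_queue should make the difference in the number of launched tasks visible in the Nextflow log.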

huangapple
  • Posted on 2023-02-10 12:18:43
  • Please keep this link when reposting: https://go.coder-hub.com/75406907.html