Nextflow only processes one of my paired samples in a subworkflow
Question
I have a workflow consisting of 2 subworkflows.
params.reads = "$projectDir/data/raw/reads/*_{1,2}.fastq.gz"
params.kaiju_db = "$projectDir/data/kaijudb/viruses/kaiju_db_viruses.fmi"
params.kaiju_names = "$projectDir/data/kaijudb/viruses/names.dmp"
params.kaiju_nodes = "$projectDir/data/kaijudb/viruses/nodes.dmp"

workflow subworkflow_A {
    take:
        reads // channel: [ val(sample), [ reads ] ]
    main:
        count_reads(reads)
        trim_reads(reads)
    emit:
        trimmed_reads = trim_reads.out.reads // channel: [ val(sample), [ trimmed_reads ] ]
}

workflow subworkflow_B {
    take:
        reads // channel: [ val(sample), [ reads ] ]
        db    // channel: /path/to/kaiju/db.fmi
        nodes // channel: /path/to/kaiju/nodes/file
        names // channel: /path/to/kaiju/names/file
    main:
        taxonomic_classification(reads, nodes, db)
        kaiju_to_krona(taxonomic_classification.out, nodes, names)
        krona_import_text(kaiju_to_krona.out)
        kaiju_to_table(taxonomic_classification.out, nodes, names)
}

workflow main {
    ch_reads = Channel.fromFilePairs("$params.reads", checkIfExists: true)
    subworkflow_A(ch_reads)
    ch_db = Channel.fromPath("$params.kaiju_db", checkIfExists: true)
    ch_nodes = Channel.fromPath("$params.kaiju_nodes", checkIfExists: true)
    ch_names = Channel.fromPath("$params.kaiju_names", checkIfExists: true)
    ch_trimmed_reads = subworkflow_A.out.trimmed_reads
    subworkflow_B(ch_trimmed_reads, ch_db, ch_nodes, ch_names)
}
The input for `params.reads` is a directory like:
reads/
├── test_sample1_1.fastq.gz
├── test_sample1_2.fastq.gz
├── test_sample2_1.fastq.gz
└── test_sample2_2.fastq.gz
The input for `subworkflow_A`, `ch_reads`, is:
[test_sample1, [~project/data/raw/reads/test_sample1_1.fastq.gz, ~project/data/raw/reads/test_sample1_2.fastq.gz]]
[test_sample2, [~project/data/raw/reads/test_sample2_1.fastq.gz, ~project/data/raw/reads/test_sample2_2.fastq.gz]]
`subworkflow_A` then emits the following channel into `ch_trimmed_reads`:
[test_sample1, [~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R1.fq.gz, ~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R2.fq.gz]]
[test_sample2, [~project/work/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R1.fq.gz, ~project/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R2.fq.gz]]
For some reason, `subworkflow_B` only runs the first sample, `test_sample1`, and not the second sample, `test_sample2`, when I want to run it over both samples.
Answer 1
Score: 1
Note that a value channel is implicitly created by a process when it is invoked with a simple value. This means you can just pass in a plain `file` object. For example:
workflow main {
    ch_reads = Channel.fromFilePairs( params.reads, checkIfExists: true )
    db = file( params.kaiju_db )
    nodes = file( params.kaiju_nodes )
    names = file( params.kaiju_names )
    subworkflow_B( ch_reads, db, nodes, names )
}
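If you would rather keep channel factories such as `Channel.fromPath`, another option is to turn each single-item queue channel into a value channel with the `first()` operator, which returns a value channel. The toy script below is only a sketch of that idea: the `classify` process, the sample tuples, and the database name are made up for illustration, and plain strings stand in for real files.

```nextflow
// Hypothetical stand-in for taxonomic_classification; it only echoes its inputs.
process classify {
    input:
    tuple val(sample), val(reads)
    val db

    output:
    stdout

    script:
    """
    echo "classify $sample against $db"
    """
}

workflow {
    // Queue channel with one item per sample (placeholder read names).
    ch_reads = Channel.of(
        ['test_sample1', ['s1_R1.fq.gz', 's1_R2.fq.gz']],
        ['test_sample2', ['s2_R1.fq.gz', 's2_R2.fq.gz']]
    )

    // first() returns a value channel, so the single database entry
    // can be read once for every sample instead of being consumed.
    ch_db = Channel.of('kaiju_db_viruses.fmi').first()

    classify(ch_reads, ch_db)
    classify.out.view()
}
```

In the workflow from the question, this would correspond to something like `Channel.fromPath(params.kaiju_db, checkIfExists: true).first()`, and likewise for the nodes and names files.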
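When a process takes multiple input channels, what you usually want is one queue channel and one or more value channels. In the question, each `Channel.fromPath` call creates a queue channel holding a single file, so those channels are empty after the first sample and no further tasks are launched. From the documentation: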
> When two or more channels are declared as process inputs, the process
> waits until there is a complete input configuration, i.e. until it
> receives a value from each input channel. When this condition is
> satisfied, the process consumes a value from each channel and launches
> a new task, repeating this logic until one or more channels are empty.
>
> As a result, channel values are consumed sequentially and any empty
> channel will cause the process to wait, even if the other channels
> have values.
>
> A different semantic is applied when using a value channel. This kind
> of channel is created by the Channel.value factory method or
> implicitly when a process is invoked with an argument that is not a
> channel. By definition, a value channel is bound to a single value and
> it can be read an unlimited number of times without consuming its
> content. Therefore, when mixing a value channel with one or more
> (queue) channels, it does not affect the process termination because
> the underlying value is applied repeatedly.
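To see the behaviour described in the quoted documentation, here is a small sketch that switches between the two channel types. The `params.use_value` flag, the `classify` process, and the string values are hypothetical and exist only for this demonstration.

```nextflow
// Hypothetical flag: true uses a value channel, false uses a queue channel.
params.use_value = true

// Toy process that only echoes its inputs.
process classify {
    input:
    val sample
    val db

    output:
    stdout

    script:
    """
    echo "$sample uses $db"
    """
}

workflow {
    ch_samples = Channel.of('test_sample1', 'test_sample2')

    // Channel.value: bound to a single value that can be read any number of times.
    // Channel.of with one item: a queue channel that is empty after one read.
    ch_db = params.use_value
        ? Channel.value('kaiju_db')
        : Channel.of('kaiju_db')

    classify(ch_samples, ch_db)
    classify.out.view()
}
```

With the default setting, one task should be launched per sample. Running the same script with `--use_value false` should launch only a single task, because the one-item queue channel is empty after the first sample, which matches the symptom in the question.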