Nextflow config file for cpus setting

Question

I am confused about the Nextflow config cpus setting: does this number refer to the actual number of CPU cores, or to the number of threads?

process {

    executor = 'local'

    cpus = 24
    memory = 128.GB
    time = 6.h

    withName: CENTRIFUGE {
        cpus = 32
        memory = 384.GB
        time = 3.h
    }
}

Does this setting mean that, no matter how many parallel jobs are running, the system will never exceed it? Or do I have to set the degree of parallelization myself for a resource-exhausting process?

def barcodes = (1..20).collect { String.format("barcode%02d", it) }
params.infq = barcodes.collect { "../fastq_pass/$it/*.fastq.gz" }
params.outdir = "Analysis"
params.hostref = "/mnt/genomic.fna"
params.targetref = "/mnt/NTM.fasta"
params.db = "/mnt/db"

process CAT {
    debug true
    publishDir "${params.outdir}/orifq", mode: 'copy'

    input:
    tuple val(bc), path(fq)

    output:
    path "${bc}.fastq.gz"

    """
    cat ${fq} > ${bc}.fastq.gz
    """
}

process ALL_STATS {
    debug true
    publishDir "${params.outdir}/stats", mode: 'copy'

    input:
    path orifq

    output:
    path "all_stats.txt"

    """
    seqkit stat ${orifq} -T > all_stats.txt
    """
}

process NanoLyse {
    debug true
    publishDir "${params.outdir}/outfq", mode: 'copy'

    input:
    path orifq

    output:
    path "${orifq.getSimpleName()}_reads_without_host.fastq.gz"

    """
    gunzip -c ${orifq} | NanoLyse --reference ${params.hostref} | gzip > ${orifq.getSimpleName()}_reads_without_host.fastq.gz
    """
}

process CENTRIFUGE {
    debug true
    publishDir "${params.outdir}/centrifuge", mode: 'copy'

    input:
    path filterfq
    val db

    output:
    path "${filterfq.getSimpleName()}_cenout.csv"
    path "${filterfq.getSimpleName()}_report.tsv"
    path "${filterfq.getSimpleName()}_cenout.kraken"

    """
    centrifuge -x ${db} -U ${filterfq} -S ${filterfq.getSimpleName()}_cenout.csv --report-file ${filterfq.getSimpleName()}_report.tsv
    centrifuge-kreport -x ${db} ${filterfq.getSimpleName()}_cenout.csv > ${filterfq.getSimpleName()}_cenout.kraken
    """
}

process NO_HOST_STATS {
    debug true
    publishDir "${params.outdir}/stats", mode: 'copy'

    input:
    path filterfq

    output:
    path "no_host_stats.txt"

    """
    seqkit stat ${filterfq} -T > no_host_stats.txt
    """
}

process MAPPING {
    debug true
    publishDir "${params.outdir}/bam", mode: 'copy'

    input:
    path filterfq

    output:
    path ("${filterfq.getSimpleName()}_no_host.bam"), emit: bam
    path ("${filterfq.getSimpleName()}_no_host.bam.bai")

    """
    minimap2 -ax map-ont ${params.targetref} ${filterfq} | samtools view -b | samtools sort > ${filterfq.getSimpleName()}_no_host.bam
    samtools index ${filterfq.getSimpleName()}_no_host.bam
    """
}

process EXTRACT_MAPPED {
    debug true
    publishDir "${params.outdir}/outfq", mode: 'copy'

    input:
    path bam

    output:
    path "${bam.getBaseName()}_mapped.fastq.gz"

    """
    samtools fastq -F 0X904 ${bam} | gzip > ${bam.getBaseName()}_mapped.fastq.gz
    """
}

process MAPPED_STATS {
    debug true
    publishDir "${params.outdir}/stats", mode: 'copy'

    input:
    path mappedfq

    output:
    path "mapped_stats.txt"

    """
    seqkit stat ${mappedfq} -T > mapped_stats.txt
    """
}


process STATS_SUMMARY {
    debug true
    publishDir "${params.outdir}/stats", mode: 'copy'

    input:
    path all_stats
    path no_host_stats
    path mapped_stats

    output:
    path "Summary.csv"

    """
    python ${projectDir}/summ.py ${all_stats} ${no_host_stats} ${mapped_stats}
    """
}

workflow {
    Channel
    .fromPath(params.infq, checkIfExists: true)
    .map { it -> [it.name.split("_")[2], it] }
    .groupTuple()
    .set{infq_ch}

    CAT(infq_ch) \
    |collect \
    |ALL_STATS

    NanoLyse(CAT.out) \
    |collect \
    |NO_HOST_STATS

    CENTRIFUGE(NanoLyse.out, params.db)

    MAPPING(NanoLyse.out)

    EXTRACT_MAPPED(MAPPING.out.bam) \
    |collect \
    |MAPPED_STATS

    STATS_SUMMARY(ALL_STATS.out, NO_HOST_STATS.out, MAPPED_STATS.out)
}

Answer 1

Score: 1

The cpus directive just lets you set the number of CPUs in your resource request. It is then up to your job scheduler to fulfill this request. Note that this is just a request and not all resource managers will impose CPU or memory limits. This means that it is ultimately your responsibility to ensure that your jobs do not use more (or much less) than what you ask for. Using more than what you ask for, for example, can potentially over-subscribe the node(s) that your jobs land on. To use the number of CPUs defined using the cpus directive, we can use the task.cpus implicit variable in our script block, for example:

params.seqs = '/path/to/seqs/*.fa'

process blastp {

    input:
    path input_sequence

    """
    blastp \\
        -num_threads ${task.cpus} \\
        -query ${input_sequence}
    """
}

workflow {

    seqs = Channel.fromPath( params.seqs )

    blastp( seqs )
}

And override any defaults using one or more process selectors in your nextflow.config:

process {

    executor = 'local'

    cpus = 2
    memory = 128.GB
    time = 6.h

    withName: blastp {
        cpus = 8
        memory = 12.GB
        time = 1.h
    }
}

If I were to run the above workflow on my local workstation, which has an octa-core Xeon processor, Nextflow would run only one blastp job at a time, since each task requests all eight cores. I believe an exception is raised if you try to ask for more CPUs or memory than your system has installed.
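
Note that reserving CPUs with the cpus directive does not by itself make a tool use them; the command line must also be told. Applied to the CENTRIFUGE process from the question, here is a minimal sketch, assuming centrifuge's -p/--threads option and the val db input from the question's process:

process CENTRIFUGE {

    input:
    path filterfq
    val db

    output:
    path "${filterfq.getSimpleName()}_cenout.csv"

    """
    # task.cpus resolves to 32 here, via the withName: CENTRIFUGE selector
    centrifuge -p ${task.cpus} -x ${db} -U ${filterfq} -S ${filterfq.getSimpleName()}_cenout.csv
    """
}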
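
As for the second part of the question: cpus constrains each task, not how many tasks run concurrently. To cap concurrency directly, Nextflow provides the maxForks process directive, and for the local executor the total capacity can be set in the executor scope. A minimal nextflow.config sketch, with illustrative values:

process {

    withName: CENTRIFUGE {
        maxForks = 1    // never run more than one CENTRIFUGE task at a time
    }
}

executor {
    cpus   = 24       // total CPUs the local executor may allocate across all tasks
    memory = 128.GB   // total memory available to the local executor
}

With the local executor, Nextflow already queues tasks whose combined cpus requests would exceed the available cores, so maxForks is mostly useful for throttling I/O- or memory-heavy processes.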
