
Nested parallelism with R future

Question


I'm trying to read multiple large CSV files with nested parallelism using the future package.

I have a single machine with 32 cores, and I want to set up nested parallelism (5 by 6): 5 outer processes with 6 cores each. I'm trying to utilize the implicit parallelism of `data.table::fread(.., nThreads = 6)`.

The R package future provides nested parallelism, and I've tried:

```r
library(future)
plan(list(tweak(multisession, workers = 5),
          tweak(multisession, workers = 6)))
```

but the above actually uses only one core for each subprocess:

```r
library(doFuture)  # provides registerDoFuture() and foreach()
plan(list(tweak(multisession, workers = 5), 
          tweak(multisession, workers = 6)))
registerDoFuture()
foreach(i = 1:5) %dopar% {
  availableCores()
}

[[1]]
mc.cores 
       1 

[[2]]
mc.cores 
       1 

[[3]]
mc.cores 
       1 

[[4]]
mc.cores 
       1 

[[5]]
mc.cores 
       1 
```

Is there a way to achieve this?

Answer 1

Score: 1


(Futureverse maintainer here)

> ... but the above actually uses only one core for each subprocess:

I see the misunderstanding here. You want to use `nbrOfWorkers()` (from **future**) here instead of `availableCores()` (from **parallelly**, re-exported as-is from **future**). This will give you what you'd expect:

```r
> foreach(i = 1:5) %dopar% {
  nbrOfWorkers()
}
[[1]]
[1] 6
...
[[5]]
[1] 6
```

The reason `availableCores()` returns one (1) is that the future framework tries to prevent nested parallelization by mistake. It does this by setting options and environment variables that control the number of parallel workers and CPU cores, including `options(mc.cores = 1L)`. This is correctly picked up by `availableCores()`. This prevents, for instance, a package that uses `y <- mclapply(X, FUN)`, `cl <- makeCluster(availableCores())`, or `plan(multisession)` from parallelizing again if it is already running in a parallel worker. In contrast, `nbrOfWorkers()` reflects the number of workers specified by `plan()`. In your case, we have `plan(multisession, workers = 6)` set in the parallel workers, from the second level in `plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))`.
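To see both sides of this bookkeeping at once, here is a minimal sketch, assuming the same 5-by-6 setup as above, that reports all three values from inside each outer worker:

```r
# Minimal sketch: compare what each outer worker reports.
# nbrOfWorkers() reflects the second level of the plan (6),
# while the future framework sets mc.cores to 1 inside each
# worker, which is what availableCores() picks up.
library(doFuture)
registerDoFuture()
plan(list(tweak(multisession, workers = 5),
          tweak(multisession, workers = 6)))

foreach(i = 1:5) %dopar% {
  list(
    nbrOfWorkers   = nbrOfWorkers(),         # 6, from plan()
    mc.cores       = getOption("mc.cores"),  # 1, set by the framework
    availableCores = availableCores()        # 1, picks up mc.cores
  )
}
```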

To convince yourself that you are indeed running in parallel with your setup, you can adopt one of the examples in https://future.futureverse.org/articles/future-3-topologies.html.
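For instance, a rough self-check in the spirit of that vignette, with arbitrary 2-by-3 worker counts: distinct process IDs at both levels show that the nesting is real:

```r
# Rough self-check: collect the PID of each outer worker and of
# the inner futures it spawns; distinct PIDs at both levels
# indicate genuine nested parallelism.
library(future)
plan(list(tweak(multisession, workers = 2),
          tweak(multisession, workers = 3)))

outer <- lapply(1:2, function(i) {
  future({
    inner <- lapply(1:3, function(j) future(Sys.getpid()))
    list(outer_pid  = Sys.getpid(),
         inner_pids = unlist(lapply(inner, value)))
  })
})
str(lapply(outer, value))
```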

Now, parallel threads are not the same as parallel processes (aka parallel workers). You can think of threads as a much lower-level parallelization mechanism. Importantly, the future framework does not constrain the number of threads used in parallel workers, including the number of parallel threads that `data.table` uses. Because of this, you need to explicitly call:

```r
data <- data.table::fread(.., nThreads = 6)
```

or, if you want to adapt to the current settings,

```r
data <- data.table::fread(.., nThreads = nbrOfWorkers())
```

to avoid over-parallelization. Alternatively, you can reconfigure `data.table` as:

```r
## Set the number of parallel threads used by 'data.table'
## (the default is to use all physical CPU cores)
data.table::setDTthreads(nbrOfWorkers())
data <- data.table::fread(..)
```

BTW, in doFuture (>= 1.0.0), you no longer need `registerDoFuture()` if you replace `%dopar%` with `%dofuture%`. So, the gist of reading lots of CSV files in parallel is:

```r
library(doFuture)
plan(list(tweak(multisession, workers = 5), 
          tweak(multisession, workers = 6)))

files <- dir(pattern = "[.]csv$")
res <- foreach(file = files) %dofuture% {
  data.table::setDTthreads(nbrOfWorkers())
  data.table::fread(file)
}
```

With all that said, note that your bottleneck will probably be the file system rather than the CPU. When you parallelize file reading, you might overwhelm the file system and end up slowing down the reading rather than speeding it up. Sometimes it is faster to read two or three files in parallel, but with more it becomes counterproductive. So, you need to benchmark with different numbers of parallel workers.
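As a starting point, a hypothetical benchmark loop could look like this (the worker counts and the file pattern are placeholders to adapt):

```r
# Hypothetical benchmark sketch: time the same parallel read with
# different worker counts to find the sweet spot for your file
# system. nThreads = 1L isolates process-level parallelism.
library(doFuture)
files <- dir(pattern = "[.]csv$")

for (w in c(1, 2, 4, 8)) {
  plan(multisession, workers = w)
  t <- system.time({
    res <- foreach(file = files) %dofuture% {
      data.table::fread(file, nThreads = 1L)
    }
  })
  message(sprintf("workers = %d: %.1f s elapsed", w, t[["elapsed"]]))
}
plan(sequential)  # reset to sequential processing afterwards
```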

Moreover, these days there are R packages that are highly specialized for reading data files into R efficiently. Some of them support reading multiple files efficiently. The **vroom** package is one such example.
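For example, a minimal sketch with vroom, assuming all files share the same column layout:

```r
# Minimal sketch: vroom() accepts a vector of file paths and
# reads them (lazily) into a single data frame, provided the
# files have the same columns.
library(vroom)
files <- dir(pattern = "[.]csv$")
data <- vroom(files)
```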
