线程数量与子任务

huangapple go评论62阅读模式
英文:

Amount of Threads with subtasks

问题

线程池中的最佳线程数是与具体情况相关的,尽管有一个经验法则,它说 #线程数 = #CPU + 1。
然而,当涉及跨其他线程并等待(即在 thread.join() 成功之前被阻塞)这些“子线程”的线程时,情况会如何呢?

假设我有一段需要执行任务列表(2)的代码,其中包含子任务(2),而子任务又包含子子任务(3),依此类推。总任务数为 223 = 12,但将创建 18 个线程(因为一个线程将“生成”更多的子任务(线程),生成更多线程的线程将被阻塞,直到所有任务完成为止。请参见下面的伪代码。

我假设对于具有 N 个核心的 CPU,一个经验法则是,如果最高活动线程数(12)等于 #CPU + 1,那么一切都可以并行化。这个理解正确吗?

伪代码

outputOfTask = []
for subtask in SubTaskList
   outputOfTask --> append(subtask.doCompute())
// 等待直到所有输出完成。

在 subtask.java 中:
例如,每个子任务都实现了相同的接口,但可能是不同的。

   outputOfSubtask = []
    for task in subsubTaskList
        // 根据子任务类型执行一些操作
        outputOfSubtask -> append( task.doCompute())
    return outputOfSubtask

在 subsubtask.java 中:

 outputOfSubsubtask = []
    for task in subsubsubtask
        // 根据子子任务类型执行一些操作
        outputOfSubsubtask -> append( task.doCompute())
    return outputOfSubsubtask

编辑:
以下是 Java 代码的示例。我在原始问题中使用了这段代码来检查活动线程的数量,但我认为伪代码更加清晰。请注意:我使用了 Eclipse Collections,这引入了 asParallel 函数,允许代码的缩写表示。

@Test
public void testasParallelthreads() {
	// // ExecutorService executor = Executors.newWorkStealingPool();
	ExecutorService executor = Executors.newCachedThreadPool();
	
	MutableList<Double> myMainTask = Lists.mutable.with(1.0, 2.0);
	MutableList<Double> mySubTask = Lists.mutable.with(1.0, 2.0);
	MutableList<Double> mySubSubTask = Lists.mutable.with(1.0, 2.0);
	MutableList<Double> mySubSubSubTask = Lists.mutable.with(1.0, 2.0, 2.0);
	
	MutableList<Double> a = myMainTask.asParallel(executor, 1)
			.flatCollect(task -> mySubTask.asParallel(executor,1)
			.flatCollect(subTask -> mySubSubTask.asParallel(executor, 1)
			.flatCollect(subsubTask   -> mySubSubSubTask.asParallel(executor, 1)
			.flatCollect(subsubTask -> dummyFunction(task, subTask, subsubTask, subsubTask,executor))
			.toList()).toList()).toList()).toList();		

	System.out.println("pool size: " + ((ThreadPoolExecutor) executor).getPoolSize());
	executor.shutdownNow();
}

private MutableList<Double> dummyFunction(double a, double b, double c, double d, ExecutorService ex) {
	System.out.println("ThreadId: " + Thread.currentThread().getId());
	System.out.println("Active threads size: " + ((ThreadPoolExecutor) ex).getActiveCount());
	return Lists.mutable.with(a,b,c,d);
}
英文:

An optimum of threads in a pool is something that is case specific, though there is a rule of thumb which says #threads = #CPU +1.
However, how does this work with threads spanning other threads and waiting (i.e. blocked until thread.join() is successful) for these 'subthreads'?

Assume that I have code that requires the execution of list of tasks (2), which has subtasks(2), which has subsubtasks(3) and so on. The total number of tasks is 223 = 12, though 18 threads will be created (because a threads will 'spawn' more subtasks (threads), where the thread spawning more threads will be blocked untill all is over. See below for pseudo code.

I am assuming that for a CPU with N cores there is a rule of thumb that everything can be parallelized if the highest number of active threads (12) is #CPU + 1. Is this correct?

PseudoCode

outputOfTask = []
for subtask in SubTaskList
   outputOfTask --&gt; append(subtask.doCompute())
// wait untill all output is finished.

in subtask.java:
Each subtask, for example, implements the same interface, but can be different.

   outputOfSubtask = []
    for task in subsubTaskList
        // do some magic depending on the type of subtask
        outputOfSubtask -&gt; append( task.doCompute())
    return outputOfSubtask

in subsubtask.java:

 outputOfSubsubtask = []
    for task in subsubsubtask
        // do some magic depending on the type of subsubtask
        outputOfSubsubtask -&gt; append( task.doCompute())
    return outputOfSubsubtask

EDIT:
Dummy code Java code. I used this in my original question to check how many threads were active, but I assume that the pseudocode is more clear. Please note: I used the Eclipse Collection, this introduces the asParallel function which allows for a shorter notation of the code.

@Test
public void testasParallelthreads() {
	// // ExecutorService executor = Executors.newWorkStealingPool();
	ExecutorService executor = Executors.newCachedThreadPool();
	
	MutableList&lt;Double&gt; myMainTask = Lists.mutable.with(1.0, 2.0);
	MutableList&lt;Double&gt; mySubTask = Lists.mutable.with(1.0, 2.0);
	MutableList&lt;Double&gt; mySubSubTask = Lists.mutable.with(1.0, 2.0);
	MutableList&lt;Double&gt; mySubSubSubTask = Lists.mutable.with(1.0, 2.0, 2.0);
	
	MutableList&lt;Double&gt; a = myMainTask.asParallel(executor, 1)
			.flatCollect(task -&gt; mySubTask.asParallel(executor,1)
			.flatCollect(subTask -&gt; mySubSubTask.asParallel(executor, 1)
			.flatCollect(subsubTask   -&gt; mySubSubSubTask.asParallel(executor, 1)
			.flatCollect(subsubTask -&gt; dummyFunction(task, subTask, subsubTask, subsubTask,executor))
			.toList()).toList()).toList()).toList();		

	System.out.println(&quot;pool size: &quot; + ((ThreadPoolExecutor) executor).getPoolSize());
	executor.shutdownNow();
}

private MutableList&lt;Double&gt; dummyFunction(double a, double b, double c, double d, ExecutorService ex) {
	System.out.println(&quot;ThreadId: &quot; + Thread.currentThread().getId());
	System.out.println(&quot;Active threads size: &quot; + ((ThreadPoolExecutor) ex).getActiveCount());
	return Lists.mutable.with(a,b,c,d);
}

答案1

得分: 0

>然而,这在涵盖其他线程并等待(即在 thread.join() 成功之前被阻塞)这些“子线程”的情况下,是如何工作的呢?

线程将被阻塞,是否安排另一个线程取决于操作系统/虚拟机是否可能这样做。如果您有一个单线程池执行器并且从其中一个任务调用了 join,另一个任务甚至不会开始。使用更多线程的执行器,阻塞任务将阻塞单个线程,操作系统/虚拟机可以自由地调度其他线程。

>这些被阻塞的线程不应该消耗 CPU 时间,因为它们被阻塞了。因此,我推测对于具有 N 个内核的 CPU 来说,有一个经验法则,即如果活动线程的最高数量(24)为 #CPU + 1,那么所有工作都可以并行处理。这个推测正确吗?

活动线程可以处于阻塞状态。我认为您在这里混淆了术语,“#CPU”是指核心数,包括物理核心数和虚拟核心数。如果您有 N 个物理内核,那么您可以并行运行 N 个 CPU 密集型任务。当您有其他类型的阻塞或非常短暂的任务时,您可以有更多的并行任务。

英文:

>However, how does this work with threads spanning other threads and waiting (i.e. blocked until thread.join() is successful) for these 'subthreads'?

Threads will block, and it is up to the os/jvm to schedule another one if possible. If you have a single thread pool executor and call join from one of your tasks, the other task won't even get started. With executors that use more threads, then the blocking task will block a single thread and the os/jvm is free to scheduled other threads.

>These blocked threads should not consume CPU time, because they are blocked. So I am assuming that for a CPU with N cores there is a rule of thumb that everything can be parallelized if the highest number of active threads (24) is #CPU + 1. Is this correct?

Active threads can be blocking. I think you're mixing terms here, #CPU, the number of cores, and the number of virtual cores. If you have N physical cores, then you can run N cpu bound tasks in parallel. When you have other types of blocking or very short lived tasks, then you can have more parallel tasks.

答案2

得分: 0

我假设对于一个有N个核心的CPU,有一个经验法则,即如果最高活动线程数(12)为 #CPU + 1,那么一切都可以并行处理。这个说法正确吗?

这个主题非常难以概括。即使有了实际的代码,应用程序的性能也很难确定。即使你能够得出一个估计值,实际性能在不同运行之间可能会有很大的变化,特别是考虑到线程之间的相互交互。唯一能够使用 #CPU + 1 这个数字的情况是,当提交到线程池的作业是独立且完全受限于CPU的。

我建议在模拟负载下尝试多种不同的线程池大小值,以找到适合你的应用程序的最优值。检查整体吞吐量数字或系统负载统计数据应该能够为你提供所需的反馈。

英文:

> I am assuming that for a CPU with N cores there is a rule of thumb that everything can be parallelized if the highest number of active threads (12) is #CPU + 1. Is this correct?

This topic is extremely hard to generalize about. Even with the actual code, the performance of your application is going to be very difficult to determine. Even if you could come up an estimation, the actual performance may vary wildly between runs – especially considering that the threads are interacting with each other. The only time we can take the #CPU + 1 number is if the jobs that are submitted into the thread-pool are independent and completely CPU bound.

I'd recommend trying a number of different thread-pool size values under simulated load to find the optimal values for your application. Examining the overall throughput numbers or system load stats should give you the feedback you need.

huangapple
  • 本文由 发表于 2020年4月3日 19:27:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/61010827.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定