并行 For 循环

huangapple go评论103阅读模式
英文:

Parallel For-Loop

问题

我想要使用Go协程使for循环并行化。我尝试使用通道,但没有成功。我的主要问题是,我希望在继续之前等待所有迭代完成。这就是为什么简单地在前面加上go不起作用的原因。我尝试使用通道(可能是错误的方式),但这使得我的代码变得更慢。

func createPopulation(populationSize int, individualSize int) []Individual {
    population := make([]Individual, populationSize)

    // 我希望这个循环可以并行工作
    for i := 0; i < len(population); i++ {
        population[i] = createIndividual(individualSize)
    }

    return population
}

func createIndividual(size int) Individual {
    var individual = Individual{make([]bool, size), 0}

    for i := 0; i < len(individual.gene); i++ {
        if rand.Intn(2)%2 == 1 {
            individual.gene[i] = true
        } else {
            individual.gene[i] = false
        }
    }

    return individual
}

type Individual struct {
    gene    []bool
    fitness int
}

希望这次的回答对你有帮助!如果你还有其他问题,请随时提问。

英文:

I want the for-loop to be parallel using go routines. i tried using channels but that didnt work. My main problem is, that i want to wait for all iterations to be finished before continuing. That's why simply writing go before it doesn't work. I tried to using channels (i think the wrong way) but that made my code even slower

func createPopulation(populationSize int, individualSize int) []Individual {
    population := make([]Individual, populationSize)

    //i want this loop to be work parallel
    for i := 0; i &lt; len(population); i++ {
        population[i] = createIndividual(individualSize)
    }

    return population
}

func createIndividual(size int) Individual {
    var individual = Individual{make([]bool, size), 0}

    for i := 0; i &lt; len(individual.gene); i++ {
        if rand.Intn(2)%2 == 1 {
            individual.gene[i] = true
        } else {
            individual.gene[i] = false
        }
    }

    return individual
}

My struct looks like this:

type Individual struct {
    gene []bool
    fitness int
}

答案1

得分: 32

所以基本上,goroutine 不应该返回一个值,而是将其推送到一个通道中。如果你想要等待所有的 goroutine 完成,你可以简单地计数 goroutine 的数量,或者使用一个 WaitGroup。在这个例子中,这可能有点过度,因为大小是已知的,但这是一个好的实践。下面是一个修改后的例子:

package main

import (
	"math/rand"
	"sync"
)

type Individual struct {
	gene    []bool
	fitness int
}

func createPopulation(populationSize int, individualSize int) []Individual {
	// 创建一个容量为 populationSize,但大小为 0 的切片,这样我们就可以避免额外的不必要的分配
	population := make([]Individual, 0, populationSize)

	// 创建一个带缓冲的通道,这样在等待 waitgroup 完成时写入它不会阻塞
	ch := make(chan Individual, populationSize)

	// 创建一个 waitgroup,基本上是在 N 个任务都完成之前阻塞
	wg := sync.WaitGroup{}

	for i := 0; i < populationSize; i++ {
		// 将 waitgroup 加 1,每个 worker 将其减回来
		wg.Add(1)

		// 现在我们生成一个 goroutine
		go createIndividual(individualSize, ch, &wg)
	}

	// 现在我们等待所有人完成 - 再次强调,这不是必须的。
	// 你可以从通道接收 N 次,并使用超时或其他安全机制
	wg.Wait()

	// 我们需要关闭通道,否则下面的循环将会被阻塞
	close(ch)

	// 我们遍历关闭的通道,并从中接收所有的数据
	for individual := range ch {
		population = append(population, individual)
	}

	return population
}

func createIndividual(size int, ch chan Individual, wg *sync.WaitGroup) {
	var individual = Individual{make([]bool, size), 0}

	for i := 0; i < len(individual.gene); i++ {
		if rand.Intn(2)%2 == 1 {
			individual.gene[i] = true
		} else {
			individual.gene[i] = false
		}
	}

	// 将 population 对象推送到通道中
	ch <- individual
	// 让 waitgroup 知道我们已经完成
	wg.Done()
}

以上是修改后的例子。

英文:

So basically the goroutine should not return a value but push it down a channel. If you want to wait for all goroutines to finish you can just count to the number of goroutines, or use a WaitGroup. In this example it's an overkill because the size is known, but it's good practice anyway. Here's a modified example:

package main
import (
&quot;math/rand&quot;
&quot;sync&quot;
)
type Individual struct {
gene    []bool
fitness int
}
func createPopulation(populationSize int, individualSize int) []Individual  {
// we create a slice with a capacity of populationSize but 0 size
// so we&#39;ll avoid extra unneeded allocations
population := make([]Individual, 0, populationSize)
// we create a buffered channel so writing to it won&#39;t block while we wait for the waitgroup to finish
ch := make(chan Individual, populationSize)
// we create a waitgroup - basically block until N tasks say they are done
wg := sync.WaitGroup{}
for i := 0; i &lt; populationSize; i++ {
//we add 1 to the wait group - each worker will decrease it back
wg.Add(1)
//now we spawn a goroutine
go createIndividual(individualSize, ch, &amp;wg)
}
// now we wait for everyone to finish - again, not a must.
// you can just receive from the channel N times, and use a timeout or something for safety
wg.Wait()
// we need to close the channel or the following loop will get stuck
close(ch)
// we iterate over the closed channel and receive all data from it
for individual := range ch {
population = append(population, individual)
}
return population
}	
func createIndividual(size int, ch chan Individual, wg *sync.WaitGroup) {
var individual = Individual{make([]bool, size), 0}
for i := 0; i &lt; len(individual.gene); i++ {
if rand.Intn(2)%2 == 1 {
individual.gene[i] = true
} else {
individual.gene[i] = false
}
}
// push the population object down the channel
ch &lt;- individual
// let the wait group know we finished
wg.Done()
}

答案2

得分: 5

针对你的具体问题,你根本不需要使用通道。

然而,除非你的createIndividual函数在执行计算时需要花费一些时间,否则在并行运行时,goroutine之间的上下文切换总是会更慢。

type Individual struct {
    gene    []bool
    fitness int
}

func createPopulation(populationSize int, individualSize int) (population []*Individual) {
    var wg sync.WaitGroup
    population = make([]*Individual, populationSize)

    wg.Add(populationSize)
    for i := 0; i < populationSize; i++ {
        go func(i int) {
            population[i] = createIndividual(individualSize)
            wg.Done()
        }(i)
    }
    wg.Wait()
    return
}

func createIndividual(size int) *Individual {
    individual := &Individual{make([]bool, size), 0}

    for i := 0; i < size; i++ {
        individual.gene[i] = rand.Intn(2)%2 == 1
    }

    return individual
}

func main() {
    numcpu := flag.Int("cpu", runtime.NumCPU(), "")
    flag.Parse()
    runtime.GOMAXPROCS(*numcpu)
    pop := createPopulation(1e2, 21e3)
    fmt.Println(len(pop))
}

输出结果:

┌─ oneofone@Oa [/tmp]
└──➜ go build blah.go; xtime ./blah -cpu 1
100
0.13u 0.00s 0.13r 4556kB ./blah -cpu 1
┌─ oneofone@Oa [/tmp]
└──➜ go build blah.go; xtime ./blah -cpu 4
100
2.10u 0.12s 0.60r 4724kB ./blah -cpu 4

请注意,这是一个代码示例,用于说明在不使用通道的情况下并行运行goroutine的情况。

英文:

For your specific problem you don't need to use channels at all.

However unless your createIndividual spends some time doing calculations, the context switches between the goroutines is always gonna be much slower when run in parallel.

type Individual struct {
gene    []bool
fitness int
}
func createPopulation(populationSize int, individualSize int) (population []*Individual) {
var wg sync.WaitGroup
population = make([]*Individual, populationSize)
wg.Add(populationSize)
for i := 0; i &lt; populationSize; i++ {
go func(i int) {
population[i] = createIndividual(individualSize)
wg.Done()
}(i)
}
wg.Wait()
return
}
func createIndividual(size int) *Individual {
individual := &amp;Individual{make([]bool, size), 0}
for i := 0; i &lt; size; i++ {
individual.gene[i] = rand.Intn(2)%2 == 1
}
return individual
}
func main() {
numcpu := flag.Int(&quot;cpu&quot;, runtime.NumCPU(), &quot;&quot;)
flag.Parse()
runtime.GOMAXPROCS(*numcpu)
pop := createPopulation(1e2, 21e3)
fmt.Println(len(pop))
}

Output:

┌─ oneofone@Oa [/tmp]                                                                                                            
└──➜ go build blah.go; xtime ./blah -cpu 1
100
0.13u 0.00s 0.13r 4556kB ./blah -cpu 1
┌─ oneofone@Oa [/tmp]                                                                                                            
└──➜ go build blah.go; xtime ./blah -cpu 4
100
2.10u 0.12s 0.60r 4724kB ./blah -cpu 4

答案3

得分: 2

一种常见的在循环中添加可控并行性的方法是创建一些工作协程,它们会从一个通道中读取任务。runtime.NumCPU 函数可以帮助确定创建多少个工作协程是合理的(确保适当设置 GOMAXPROCS 来充分利用这些 CPU)。然后,你只需将任务写入通道,工作协程会处理它们。

在这种情况下,任务是初始化 population 切片的元素,因此使用一个 *Individual 指针类型的通道可能是合适的。可以像这样创建通道:

ch := make(chan *Individual)
for i := 0; i < nworkers; i++ {
go initIndividuals(individualSize, ch)
}
population := make([]Individual, populationSize)
for i := 0; i < len(population); i++ {
ch <- &population[i]
}
close(ch)

工作协程的代码可能如下所示:

func initIndividuals(size int, ch <-chan *Individual) {
for individual := range ch {
// 或者,如果 createIndividual() 是唯一的调用,也可以直接在这里内联代码
*individual = createIndividual(size)
}
}

由于任务不是预先分配的,因此 createIndividual 的执行时间不重要:每个工作协程只有在上一个任务完成后才会接受新任务,并且在没有任务时退出(因为此时通道已关闭)。

但是,我们如何知道任务何时完成呢?sync.WaitGroup 类型可以帮助解决这个问题。可以修改生成工作协程的代码如下:

ch := make(chan *Individual)
var wg sync.WaitGroup
wg.Add(nworkers)
for i := 0; i < nworkers; i++ {
go initIndividuals(individualSize, ch, &wg)
}

还需要修改 initIndividuals 函数,使其接受额外的参数,并在第一条语句中添加 defer wg.Done()。现在,调用 wg.Wait() 将阻塞,直到所有工作协程完成。然后,可以返回完全构建好的 population 切片。

英文:

One common way to add controlled parallelism to a loop like this is to spawn a number of worker goroutines that will read tasks from a channel. The runtime.NumCPU function may help in deciding how many workers it makes sense to spawn (make sure you set GOMAXPROCS appropriately to take advantage of those CPUs though). You then simply write the jobs to the channel and they will be handled by the workers.

In this case where the job is to initialise elements of the population slice, so using a channel of *Individual pointers might make sense. Something like this:

ch := make(chan *Individual)
for i := 0; i &lt; nworkers; i++ {
go initIndividuals(individualSize, ch)
}
population := make([]Individual, populationSize)
for i := 0; i &lt; len(population); i++ {
ch &lt;- &amp;population[i]
}
close(ch)

The worker goroutine would look something like this:

func initIndividuals(size int, ch &lt;-chan *Individual) {
for individual := range ch {
// Or alternatively inline the createIndividual() code here if it is the only call
*individual = createIndividual(size)
}
}

Since tasks are not portioned out ahead of time, it doesn't matter if createIndividual takes a variable amount of time: each worker will only take on a new task when the last is complete, and will exit when there are no tasks left (since the channel is closed at that point).

But how do we know when the job has completed? The sync.WaitGroup type can help here. The code to spawn the worker goroutines can be modified like so:

ch := make(chan *Individual)
var wg sync.WaitGroup
wg.Add(nworkers)
for i := 0; i &lt; nworkers; i++ {
go initIndividuals(individualSize, ch, &amp;wg)
}

The initIndividuals function is also modified to take the additional parameter, and add defer wg.Done() as the first statement. Now a call to wg.Wait() will block until all the worker goroutines have completed. You can then return the fully constructed population slice.

答案4

得分: 1

如果您想避免将并发逻辑与业务逻辑混合在一起,我写了这个库https://github.com/shomali11/parallelizer来帮助您解决这个问题。它封装了并发逻辑,因此您不必担心它。

所以在您的示例中:

package main
import (
"github.com/shomali11/parallelizer"
"fmt"
)
func main() {
populationSize := 100
results = make([]*Individual, populationSize)
options := &Options{ Timeout: time.Second }
group := parallelizer.NewGroup(options)
for i := 0; i < populationSize; i++ {
group.Add(func(index int, results *[]*Individual) {
return func () {
...
results[index] = &Individual{...}
}
}(i, &results))
}
err := group.Run()
fmt.Println("Done")
fmt.Println(fmt.Sprintf("Results: %v", results))
fmt.Printf("Error: %v", err) // 如果完成则为nil,如果超时则为err
}
英文:

If you would like to avoid mixing concurrency logic with business logic, I wrote this library https://github.com/shomali11/parallelizer to help you with that. It encapsulates the concurrency logic so you do not have to worry about it.

So in your example:

package main
import (
&quot;github.com/shomali11/parallelizer&quot;
&quot;fmt&quot;
)
func main() {
populationSize := 100
results = make([]*Individual, populationSize)
options := &amp;Options{ Timeout: time.Second }
group := parallelizer.NewGroup(options)
for i := 0; i &lt; populationSize; i++ {
group.Add(func(index int, results *[]*Individual) {
return func () {
...
results[index] = &amp;Individual{...}
}
}(i, &amp;results))
}
err := group.Run()
fmt.Println(&quot;Done&quot;)
fmt.Println(fmt.Sprintf(&quot;Results: %v&quot;, results))
fmt.Printf(&quot;Error: %v&quot;, err) // nil if it completed, err if timed out
}

答案5

得分: 0

由于您事先知道将有多少个个体,我建议不要使用通道,而是直接在goroutine createIndividual中分配population的个体成员。createIndividual的函数签名将如下所示:

func createIndividual(wg *sync.WaitGroup, individual *Individual, size int)

调用代码将如下所示:

population := make([]Individual, populationSize)
wg := &sync.WaitGroup{}
wg.Add(len(population))

for i := 0; i < len(population); i++ {
    go createIndividual(wg, &population[i], individualSize)
}

wg.Wait()

因此,每个goroutine负责一个个体,它将个体分配给population中的相应位置:

func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) {
    defer wg.Done()
    *individual = Individual{make([]bool, size), 0}

    // 分配其他属性给`individual`
}

您可以在此处查看完整的代码示例。

英文:

Since you know beforehand how many individuals you will have, I would refrain from using channels and just assign the individual members of population in the goroutine createIndividual. The signature of createIndividual would then look like this:

func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) 

and the calling code would look like this:

population := make([]Individual, populationSize)
wg := &amp;sync.WaitGroup{}
wg.Add(len(population))
for i := 0; i &lt; len(population); i++ {
go createIndividual(wg, &amp;population[i], individualSize)
}
wg.Wait()

So, each go routine is responsible for exactly one individual, which it assigns to the corresponding slot in population:

func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) {
defer wg.Done()
*individual = Individual{make([]bool, size), 0}
// assign other attributes to `individual`
}

You can see a complete code example on play here.

huangapple
  • 本文由 发表于 2014年6月16日 15:46:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/24238820.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定