Iterate over inputs and store outputs using a fixed number of goroutines

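Question

I'm doing something that must be a common pattern, but I can't see how it should be handled.

In this contrived example, I have a function that counts the letters in a string. I want to run it on every element of a slice and store the results in a map, so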

[]string = {"one", "two", "three"}

yields

map[string]int = {"one":3, "two":3, "three":5}

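I'm using the guard pattern to ensure that at most cores goroutines (the constant below) are running at any one time. I figure it must be ideal to set the number of concurrent goroutines to the number of virtual processors on the system?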

const cores int = 2

var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}

type result struct {
    name string
    res  int
}

func count_letters(word string, cGuard chan struct{}, cResults chan result) {
    time.Sleep(1 * time.Second)
    fmt.Println(word)
    <-cGuard
    cResults <- result{word, len(word)}
}

func main() {
    cGuard := make(chan struct{}, cores)
    cResults := make(chan result, cores)

    mResults := map[string]int{}

    for _, name := range words {
        cGuard <- struct{}{}
        // Need to populate mResults with the output from cResults
        go count_letters(name, cGuard, cResults)
    }
    fmt.Scanln()
}

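This works, but I'm not sure how to get the result structs out of the cResults channel to populate the mResults map inline.

I could set the buffer size of cResults to len(words), wait until the for loop has finished, and pull them all out afterwards, but that seems very inelegant, and it could be a problem if words is very long.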

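Answer 1

Score: 4

For this specific use case, a worker pool pattern would be a better fit.

In your example you start a separate goroutine for each word. Go can handle this, but it is not very efficient: the runtime has to spin up a new goroutine and tear down the old one for every word, all while keeping track of them.

With a worker pool we start exactly as many goroutines as we want and hand the workers tasks via a channel. This cuts out a lot of overhead, because the workers are always the same goroutines. The results are also collected via a channel, and a WaitGroup makes sure we don't terminate before all workers are done.

This is the worker pool version of your example: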

package main

import (
	"fmt"
	"sync"
	"time"
)

// 2 for testing, in the real world runtime.NumCPU() would be used
const cores int = 2

var words = []string{"hello", "there", "this", "is", "a", "list", "of", "words"}

type result struct {
	name string
	res  int
}

func count_letters(wg *sync.WaitGroup, cWords chan string, cResults chan result) {
	// Tell the waitgroup we are done once we return
	defer wg.Done()

	// Read from cWords until it is closed, at which point we return
	for word := range cWords {
		time.Sleep(1 * time.Second)
		cResults <- result{word, len(word)}
	}
}

func main() {
	cWords := make(chan string)
	cResults := make(chan result)

	// This waitgroup will later be used to wait for all workers to be done
	var wg sync.WaitGroup
	for i := 0; i < cores; i++ {
		// Add 1 before starting the goroutine
		wg.Add(1)
		go count_letters(&wg, cWords, cResults)
	}

	// Collect the results via a goroutine, since we need to submit tasks and collect results at the same time
	mResults := map[string]int{}
	done := make(chan struct{})
	go func() {
		// Signal main once every result has been written to the map
		defer close(done)
		for result := range cResults {
			mResults[result.name] = result.res
		}
	}()

	// Insert all words into the cWords chan
	for _, word := range words {
		cWords <- word
	}

	// After all words have been inserted, close the channel, this will cause the workers to exit
	// once all words have been read from the channel
	close(cWords)
	// Wait for all workers to be done
	wg.Wait()
	// Close the results chan, this will terminate our collection goroutine
	close(cResults)
	// Wait for the collector to finish, otherwise reading mResults below could race with its last write
	<-done

	// Use the results
	fmt.Println(mResults)
}
huangapple
  • Posted on January 22, 2022 at 21:00:01
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70813065.html