如何将数组处理分离为goroutine?

huangapple go评论87阅读模式
英文:

How to separate array processing into goroutines?

问题

我有一个包含30,000个字符串的切片。我该如何将这个切片分成10个goroutine来处理?每个goroutine从切片中取出3000个字符串,提取一些数据,并将其放入一个新的切片中。

最终,我将得到10个切片,每个切片中包含3000个处理结果。如何处理这个问题的模式是什么?

我看过这篇文章,但不确定其中哪种模式适用于我的情况。

英文:

I have a slice of 30'000 strings. How do I separate processing this slice into, say, 10 goroutines that would take 3000 strings from the slice, extract some data from them and push into a new slice?

So, in the end, I will have 10 slices with 3000 processed results in each. What's the pattern to handle this problem?

I have had a look at this article, but not sure which of these patterns applies to my case.

答案1

得分: 3

使用通道,从切片中读取元素,使用Fan out来分发负载和传递消息。然后,在goroutine中处理字符串,并在单个goroutine中收集结果(fan in),以避免使用互斥锁。

您可能希望设置最大并发goroutine的数量。

请记住,在写入切片时,切片不是线程安全的。

有用的信息:

https://blog.golang.org/pipelines
https://talks.golang.org/2012/concurrency.slide#1
https://blog.golang.org/advanced-go-concurrency-patterns
https://talks.golang.org/2013/advconc.slide#1

英文:

Using a channel, read the elements from the slice, use a Fan out to distribute load and pass messages. Then, process the strings in goroutines and collect the results back (fan in ) in a single goroutine to avoid mutexes.

You may want to set the number of Max concurrent concurrent goroutines.

Keep in mind that slices are not thread safe when writing to them.

Useful info:

> https://blog.golang.org/pipelines
> https://talks.golang.org/2012/concurrency.slide#1
> https://blog.golang.org/advanced-go-concurrency-patterns
> https://talks.golang.org/2013/advconc.slide#1

答案2

得分: 0

我同意@JimB的观点,为什么要限制goroutine的数量。不过既然这是你的问题,我可能会这样做。如果你真的想让每个goroutine处理3000个项目,那么创建一个二维切片可能更容易。[[3000个项目],[3000个项目],..],然后让每个索引在二维数组中的goroutine进行处理。否则,下面的代码将限制goroutine的数量为10。

方法1

package main

import (
	"crypto/rand"
	"fmt"
	"log"
	"sync"
	"time"
)

var s []string

// 生成一些模拟数据
func init() {
	s = make([]string, 30000)
	n := 5
	for i := 0; i < 30000; i++ {
		b := make([]byte, n)
		if _, err := rand.Read(b); err != nil {
			panic(err)
		}
		s[i] = fmt.Sprintf("%X", b)
	}
}

func main() {
	// 设置工作线程的数量
	ch := make(chan string)
	var mut sync.Mutex
	counter := 0

	// 将goroutine的数量限制为10
	for w := 0; w < 10; w++ {
		go func(ch chan string, mut *sync.Mutex) {
			for {
				// 使用互斥锁获取和更新计数器,以避免竞争条件
				mut.Lock()
				i := counter
				counter++
				mut.Unlock()
				// 跳出循环
				if counter > len(s) {
					return
				}
				// 获取字符串
				myString := s[i]
				// 进行一些操作,然后将其传递到通道中
				ch <- myString

			}
		}(ch, &mut)
	}
	// 添加时间。如果你调整goroutine的数量,你会看到上面的数字如何影响效率
	t := time.Now()
	for i := 0; i < len(s); i++ {
		result := <-ch
		log.Println(time.Since(t), result, i)
	}
}

方法2 init函数创建了一个二维数组,将其分成10个包含3000个元素的数组。如果你按照这种方式解析数据,下面的逻辑需要很少的修改即可工作。

package main

import (
	"crypto/rand"
	"fmt"
	"log"
	"sync"
)

var chunkedSlice [10][3000]string

// 生成一些模拟数据
// 二维数组,每个块中有3000个项目
// 有10个块,每个块有一个goroutine
func init() {
	n := 5
	for i := 0; i < 10; i++ {
		for j := 0; j < 3000; j++ {
			b := make([]byte, n)
			if _, err := rand.Read(b); err != nil {
				panic(err)
			}
			chunkedSlice[i][j] = fmt.Sprintf("%X", b)
		}
	}
}

func main() {
	// 用于发送解析后的数据的通道
	ch := make(chan string)
	var wg sync.WaitGroup

	// 10个块
	for _, chunk := range chunkedSlice {
		wg.Add(1)
		// 如果定义了二维数组,例如[10][3000]string,需要将其作为指针传递,以避免堆栈错误
		go func(ch chan string, wg *sync.WaitGroup, chunk *[3000]string) {
			defer wg.Done()
			for i := 0; i < len(chunk); i++ {
				str := chunk[i]
				// fmt.Println(str)
				// 解析数据(模拟)
				parsedData := str
				// 将解析后的数据发送到通道
				ch <- parsedData
			}
		}(ch, &wg, &chunk)
	}
	// 等待所有的goroutine完成并关闭通道
	go func() {
		wg.Wait()
		close(ch)
	}()

	var counter int // 添加以检查解析的项目数量是否正确
	// 从通道获取数据
	for res := range ch {
		log.Println(res, counter)
		counter++
	}
}
英文:

I agree with @JimB in that why limit the go routines. However since that's your inquiry I'd maybe so something like this.. If you really want to have each gorountine do 3000 items, then it might be easier to create a 2d slice. [[3000 items],[3000 items],..] then have 1 go routine process per index in that 2d array. Otherwise below just limits the gorountines to 10.
METHOD 1
package main

import (
&quot;crypto/rand&quot;
&quot;fmt&quot;
&quot;log&quot;
&quot;sync&quot;
&quot;time&quot;
)
var s []string
// genetate some mock data
func init() {
s = make([]string, 30000)
n := 5
for i := 0; i &lt; 30000; i++ {
b := make([]byte, n)
if _, err := rand.Read(b); err != nil {
panic(err)
}
s[i] = fmt.Sprintf(&quot;%X&quot;, b)
}
}
func main() {
// set the number of workers
ch := make(chan string)
var mut sync.Mutex
counter := 0
// limit the number of gorountines to 10
for w := 0; w &lt; 10; w++ {
go func(ch chan string, mut *sync.Mutex) {
for {
// get and update counter using mux to stop race condtions
mut.Lock()
i := counter
counter++
mut.Unlock()
// break the loop
if counter &gt; len(s) {
return
}
// get string
myString := s[i]
// to some work then pass to channel
ch &lt;- myString
}
}(ch, &amp;mut)
}
// adding time.  If you play wiht the number of gorountines youll see how changing the number above can efficiency 
t := time.Now()
for i := 0; i &lt; len(s); i++ {
result := &lt;-ch
log.Println(time.Since(t), result, i)
}
}

METHOD2 the init function is creating a 2d array chunked out into 10 arrays each containing 3000 elements.. if you parse your data that way the logic below that needs very little modification to work

package main
import (
&quot;crypto/rand&quot;
&quot;fmt&quot;
&quot;log&quot;
&quot;sync&quot;
)
var chunkedSlice [10][3000]string
// genetate some mock data
// 2d array, each chunk has 3000 items in it
// there are 10 chunks, 1 go rountine per chunk
func init() {
n := 5
for i := 0; i &lt; 10; i++ {
for j := 0; j &lt; 3000; j++ {
b := make([]byte, n)
if _, err := rand.Read(b); err != nil {
panic(err)
}
chunkedSlice[i][j] = fmt.Sprintf(&quot;%X&quot;, b)
}
}
}
func main() {
// channel to send parsed data to
ch := make(chan string)
var wg sync.WaitGroup
// 10 chunks
for _, chunk := range chunkedSlice {
wg.Add(1)
// if defining the 2d array e.g [10][3000]string, you need to pass it as a pointer to avoid stack error
go func(ch chan string, wg *sync.WaitGroup, chunk *[3000]string) {
defer wg.Done()
for i := 0; i &lt; len(chunk); i++ {
str := chunk[i]
// fmt.Println(str)
// parse the data (emulating)
parsedData := str
// send parsed data to the channel
ch &lt;- parsedData
}
}(ch, &amp;wg, &amp;chunk)
}
// wait for all the routines to finish and close the channel
go func() {
wg.Wait()
close(ch)
}()
var counter int // adding to check that the right number of items was parsed
// get the data from the channel
for res := range ch {
log.Println(res, counter)
counter++
}
}

答案3

得分: 0

我开发了一个名为parapipe的库来解决这个问题。只需将你的切片分成10个部分,并将它们发送到Pipeline中,它将并发地处理它们:

import "github.com/nazar256/parapipe"
//...
var longStringSlice []string
// ...
pipeline := parapipe.NewPipeline(10).
	Pipe(func(msg interface{}) interface{} {
	slicePart := msg.([]string)
	// 在这里进行处理以得到结果
	return result
})
// 切割切片并流式传输部分
chopSize := int(math.Ceil(float64(len(longStringSlice)) / 10))
for i:=0;i<10;i++ {
	firstIdx := i * chopSize
	lastIdx := (i+1) * chopSize
	if lastIdx > len(longStringSlice) {
		lastIdx = len(longStringSlice)
	}
	pipeline.In() <- longStringSlice[firstIdx:lastIdx]
}
英文:

I have developed a library parapipe to solve such task. Just slice your slice into 10 parts and send them to Pipeline, which will process them concurrently:

import &quot;github.com/nazar256/parapipe&quot;
//...
var longStringSlice []string
// ...
pipeline := parapipe.NewPipeline(10).
Pipe(func(msg interface{}) interface{} {
slicePart := msg.([]string)
// process here to result
return result
})
// chop the slice and stream parts
chopSize := int(math.Ceil(float64(len(longStringSlice)) / 10))
for i:=0;i&lt;10;i++ {
firstIdx := i * chopSize
lastIdx := (i+1) * chopSize
if lastIdx &gt; len(longStringSlice) {
lastIdx = len(longStringSlice)
}
pipeline.In() &lt;- longStringSlice[firstIdx:lastIdx]
}

答案4

得分: 0

在不同的goroutine中处理不同的内存地址没有问题。如果你的读写操作不会覆盖块,那么你可以简单地使用massif中的范围进行操作。
https://play.golang.org/p/MU5njoNmIf7

英文:

There is no problem handling different memory addresses in different goroutines. If your read and write actions do not overshadow the chunks, that you can simply work with ranges in massif.
https://play.golang.org/p/MU5njoNmIf7

huangapple
  • 本文由 发表于 2017年3月31日 22:50:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/43143632.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定