如何进行并行的通用Go处理?

huangapple go评论62阅读模式
英文:

How to make parallel generic go treatment?

问题

我有以下函数:

func myrun(entries []WhatEverType) {
    for i := range entries {
        dotreatment(entries[i])
    } 
}

我想要并行调用dotreatment函数,我尝试了以下方法:

func myrunMT(entries []WhatEverType) {
    var wg sync.WaitGroup
    stopped := false
    threads := 5 // 线程数可以作为参数
    com := make(chan WhatEverType, 100) // chan的大小可以作为参数
    wg.Add(threads)
    for i := 0; i < threads; i++ {
        go func() {
            for !stopped || len(com) {
                select {
                    case entry := <-com:
                        dotreatment(entry) // 如果需要,可以加锁
                    case time.After(100*time.Millisecond):
                }
            }
            wg.Done()
        }()
    }
    for _, entry := range entries {
        com <- entry
    }
    stopped = true
    wg.Wait()
}

有没有更好的方法来实现这个功能?特别是我想避免通过chan发送所有的entries,而只使用go routines之间的共享索引。

英文:

I have the following function:

func myrun(entries []WhatEverType) {
    for i := range entries {
        dotreatment(entries[i])
    } 
}

I want to make parallel calls to dotreatment, I tried the following:

    func myrunMT(entries []WhatEverType) {
        var wg sync.WaitGroup
        stopped := false
        threads := 5 //number of threads could be argument
        com := make(chan WhatEverType, 100) //size of chan could be argument
        wg.Add(threads)
        for i := 0; i &lt; threads; i++ {
            go func() {
                for !stopped || len(com) {
                    select {
                        case entry := &lt;-com:
                            dotreatment(entry) //lock if necessary
                        case time.After(100*time.Millisecond):
                    }
                }
                wg.Done()
            }()
        }
        for _, entry := range entries {
            com &lt;- entry
        }
        stopped = true
        wg.Wait()
    }

</code>

Is there any better way to do it? Especially I would like to avoid sending all the entries through a chan and only use a shared index between the go routines.

答案1

得分: 1

首先,你的解决方案存在数据竞争。你从多个goroutine中读取和修改stopped变量。

一个简单的解决方案是将传递的切片的索引范围分割,并让多个goroutine处理不同的索引范围。代码如下:

func process(ms []My) {
    workers := 5
    count := len(ms) / workers
    if count*workers < len(ms) {
        count++
    }

    wg := &sync.WaitGroup{}
    for idx := 0; idx < len(ms); {
        wg.Add(1)
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        ms2 := ms[idx:idx2]
        idx = idx2
        go func() {
            defer wg.Done()
            for i := range ms2 {
                handle(&ms2[i])
            }
        }()
    }
    wg.Wait()
}

func handle(m *My) {}

对于worker goroutine的数量,你可以使用runtime.GOMAXPROCS(),因为如果处理条目不涉及IO操作(或等待goroutine外部的某些东西),则不需要让Go运行时管理超过可以主动运行的goroutine数量:

workers := runtime.GOMAXPROCS(0)

请注意,尽管此解决方案不涉及通过通道发送条目,但如果一个(或一些)goroutine提前完成,CPU利用率可能会在最后降低(当较少的goroutine有工作要做时)。

生产者-消费者模型的优点是所有worker goroutine将平等地工作到最后。但是,通信开销可能不可忽略。哪种方法更好取决于每个条目需要完成的工作量。

改进的版本可以混合两种方法:可以通过通道发送较小的切片,较小的索引范围,例如每次发送100个条目的批次。与第一个解决方案相比,这可以减少空闲时间,并且还可以减少通信开销,因为条目是逐个通过通道发送的,所以发送的值只是总数的百分之一。

以下是这个改进的混合版本的示例实现:

func process(ms []My) {
    workers := runtime.GOMAXPROCS(0)
    // 平均每个worker处理100个任务:
    count := len(ms) / workers / 100
    if count < 1 {
        count = 1
    }

    ch := make(chan []My, workers*2) // 缓冲区大小与worker数量成比例

    wg := &sync.WaitGroup{}

    // 启动worker
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for ms2 := range ch {
                for j := range ms2 {
                    handle(&ms2[j])
                }
            }
        }()
    }

    // 发送任务:
    for idx := 0; idx < len(ms); {
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        ch <- ms[idx:idx2]
        idx = idx2
    }

    // 任务发送完毕,关闭通道:
    close(ch)

    // 等待worker完成所有任务的处理:
    wg.Wait()
}

请注意,没有stopping变量来表示完成。相反,我们在每个goroutine中使用了for range循环,因为它会在通道关闭之前遍历通道的所有值,并且对于并发使用是安全的。一旦通道关闭并且goroutine处理了通道上发送的所有任务,它们就会终止,整个处理算法也会终止(而不是提前终止),这意味着所有任务都将被处理。

英文:

First, your solution has data race. You're reading and modifying the stopped variable from multiple goroutines.

An easy solution could be to divide the index range of the passed slice, and have multiple goroutines process the different index ranges. This is how it could look like:

func process(ms []My) {
	workers := 5
	count := len(ms) / workers
	if count*workers &lt; len(ms) {
		count++
	}

	wg := &amp;sync.WaitGroup{}
	for idx := 0; idx &lt; len(ms); {
		wg.Add(1)
		idx2 := idx + count
		if idx2 &gt; len(ms) {
			idx2 = len(ms)
		}
		ms2 := ms[idx:idx2]
		idx = idx2
		go func() {
			defer wg.Done()
			for i := range ms2 {
				handle(&amp;ms2[i])
			}
		}()
	}
	wg.Wait()
}

func handle(m *My) {}

For the number of worker goroutines you could use runtime.GOMAXPROCS(), as if processing the entries does not involve IO operations (or waiting for something outside of the goroutine), no need to have the Go runtime manage more goroutines than those that can run actively:

workers := runtime.GOMAXPROCS(0)

Note that although this solution does not involve sending entries through a channel, if one (some) goroutine finishes earlier, CPU utilization might drop at the end (when less goroutines have work to do).

The advantage of the producer-consumer model is that all worker goroutines will work equally till the end. But yes, the communication overhead might not be negligible. Whether one is better than the other depends on the amount of work that needs to be done on each entry.

An improved version could mix the 2: you could send smaller slices, smaller index ranges over a channel, e.g. batches of 100 entries. This could reduce the idle time compared to the first solution, and could also reduce the communication overhead as entries are sent over the channel individually, so the values sent is only one hundredth of the total number.

This is an example implementation of this improved, mixed version:

func process(ms []My) {
	workers := runtime.GOMAXPROCS(0)
	// 100 jobs per worker average:
	count := len(ms) / workers / 100
	if count &lt; 1 {
		count = 1
	}

	ch := make(chan []My, workers*2) // Buffer size scales with # of workers

	wg := &amp;sync.WaitGroup{}

	// Start workers
	wg.Add(workers)
	for i := 0; i &lt; workers; i++ {
		go func() {
			defer wg.Done()
			for ms2 := range ch {
				for j := range ms2 {
					handle(&amp;ms2[j])
				}
			}
		}()
	}

	// Send jobs:
	for idx := 0; idx &lt; len(ms); {
		idx2 := idx + count
		if idx2 &gt; len(ms) {
			idx2 = len(ms)
		}
		ch &lt;- ms[idx:idx2]
		idx = idx2
	}

	// Jobs sent, close channel:
	close(ch)

	// Wait workers to finish processing all jobs:
	wg.Wait()
}

Note that there is no stopping variable to signal completion. Instead we used a for range on a channel in each goroutine, as this ranges over the channel until the channel is closed, and it's safe for concurrent use. Once the channel is closed and the goroutines processed all jobs sent on the channel, they terminate, and so does the overall processing algorithm (and not earlier – meaning all jobs will be processed).

答案2

得分: 0

我不会混合使用通道和同步原语。在Go语言中,仅使用通道是惯用的做法。请记住,Go协程不是线程,它们更轻量级且开销较低。启动一百万个协程并不是什么大问题。如果结果的顺序不重要,我会这样做:

func parallelRun(input []WhateverInputType) []WhateverOutputType {
    out := make(chan WhateverOutputType, len(input))
    for _, item := range input {
        go func(i WhateverInputType) {
            out <- process(i)
        }(item)
    }

    res := make([]WhateverOutputType, len(input))
    for i := 0; i < len(input); i++ {
        res[i] = <-out
    }

    return res
}

func process(input WhateverInputType) WhateverOutputType {
    time.Sleep(50 * time.Millisecond)
    return WhateverOutputType{}
}

假设process函数的执行时间比收集结果的时间长,我甚至会使用一个阻塞通道out := make(chan WhateverOutputType)。请注意,将数组作为参数传递并不理想(会进行复制),但我尽量保持了你原始代码的精神。

英文:

I would not mix channels and synchronization primitives. Use of channels exclusively is idiomatic Go. Bear in mind that Go routines are not threads, there are much lighter with low overhead. Launching one million of them is not a big deal.<br>If the order of the result does not matter, I would do something like this:

func parallelRun(input []WhateverInputType) []WhateverOutputType {
	out := make(chan WhateverOutputType, len(input))
	for _, item := range input {
		go func(i WhateverInputType) {
			out &lt;- process(i)
		}(item)
	}

	res := make([]WhateverOutputType, len(input))
	for i := 0; i &lt; len(input); i++ {
		res[i] = &lt;-out
	}

	return res
}

func process(input WhateverInputType) WhateverOutputType {
	time.Sleep(50 * time.Millisecond)
	return WhateverOutputType{}
}

Assuming ‘process’ takes much longer than collecting the result, I would even use a blocking channel out := make(chan WhateverOutputType)<br>Please note that passing arrays as parameters is not ideal (there are copied) but I tried to keep the spirit of your original code.

答案3

得分: 0

在使用共享索引而没有复制数据的情况下,经过搜索后,我得到了以下结果:

func myrunMT(entries []WhatEverType) int {
    lastone := int32(len(entries)-1)
    current := int32(0)
    var wg sync.WaitGroup
    threads := 5
    // 启动线程
    wg.Add(threads)
    for i := 0; i < threads; i++ {
        go func() {
            for {
                idx := atomic.AddInt32(&current, 1)-1
                if atomic.LoadInt32(&current) > atomic.LoadInt32(&lastone) {
                    break
                } 
                dotreatment(entries[idx])
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

希望对你有帮助!

英文:

After searching I get the following with no copy of data using a shared index:

func myrunMT(entries []WhatEverType) int {
    lastone := int32(len(entries)-1)
    current := int32(0)
    var wg sync.WaitGroup
    threads := 5
    //start threads
    wg.Add(threads)
    for i := 0; i &lt; threads; i++ {
        go func() {
            for {
                idx := atomic.AddInt32(&amp;current, 1)-1
                if Loadint32(&amp;current) &gt; Loadint32(&amp;lastone) {
                    break
                } 
                dotreatment(entries[idx])
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

huangapple
  • 本文由 发表于 2017年1月23日 17:24:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/41802835.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定