英文:
"fan in" - one "fan out" behavior
问题
假设我们有三种实现"fan in"行为的方法:
// MakeChannel returns an unbuffered channel fed by a background goroutine
// that sends the integers 0, 1, ..., tries-1 in order and then closes the
// channel. When tries is zero or negative the channel is closed without any
// value being sent.
func MakeChannel(tries int) chan int {
	stream := make(chan int)
	produce := func() {
		// Closing tells consumers that no further values will arrive.
		defer close(stream)
		next := 0
		for next < tries {
			stream <- next
			next++
		}
	}
	go produce()
	return stream
}
// MergeByReflection fans in any number of int channels into one unbuffered
// output channel by building a receive case per input and waiting on all of
// them with reflect.Select. The output channel is closed once every input
// has been closed.
func MergeByReflection(channels ...chan int) chan int {
	merged := make(chan int)
	branches := make([]reflect.SelectCase, 0, len(channels))
	for _, src := range channels {
		branches = append(branches, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(src),
		})
	}
	go func() {
		defer close(merged)
		for open := len(branches); open > 0; {
			idx, val, ok := reflect.Select(branches)
			if ok {
				merged <- int(val.Int())
				continue
			}
			// The input was closed: a zero Value makes reflect.Select
			// ignore this case from now on.
			branches[idx].Chan = reflect.Value{}
			open--
		}
	}()
	return merged
}
// MergeByCode fans in the given int channels into one unbuffered output
// channel using a hand-written select statement.
//
// Every value received from an input is forwarded to the returned channel,
// which is closed once all inputs have been closed. Nothing is forwarded for
// the close notification of an input. The slice passed by the caller is not
// modified.
//
// The select statement has five receive cases. With fewer than five inputs
// the spare cases are disabled; with more than five, the surplus inputs are
// combined by a nested MergeByCode call that occupies the fifth case.
func MergeByCode(channels ...chan int) chan int {
	const width = 5

	// Work on a private fixed-size copy. Unused slots stay nil, and a
	// receive from a nil channel blocks forever, so select never picks them.
	var in [width]chan int
	open := len(channels)
	if open > width {
		copy(in[:width-1], channels)
		in[width-1] = MergeByCode(channels[width-1:]...)
		open = width
	} else {
		copy(in[:], channels)
	}

	out := make(chan int)
	go func() {
		defer close(out)
		// relay handles one receive result from slot idx: it forwards a real
		// value, or retires the slot when the input has been closed.
		relay := func(idx, v int, ok bool) {
			if ok {
				out <- v
				return
			}
			in[idx] = nil
			open--
		}
		for open > 0 {
			select {
			case v, ok := <-in[0]:
				relay(0, v, ok)
			case v, ok := <-in[1]:
				relay(1, v, ok)
			case v, ok := <-in[2]:
				relay(2, v, ok)
			case v, ok := <-in[3]:
				relay(3, v, ok)
			case v, ok := <-in[4]:
				relay(4, v, ok)
			}
		}
	}()
	return out
}
// MergeByGoRoutines fans in any number of int channels into one unbuffered
// output channel by starting one forwarding goroutine per input. A further
// goroutine closes the output once every forwarder has finished, i.e. once
// every input has been closed and drained.
func MergeByGoRoutines(channels ...chan int) chan int {
	var group sync.WaitGroup
	out := make(chan int)
	// Register all forwarders before starting any of them. A forwarder whose
	// input is already closed could otherwise call Done before Add and panic
	// with a negative WaitGroup counter.
	group.Add(len(channels))
	for _, ch := range channels {
		go func(ch chan int) {
			defer group.Done()
			for i := range ch {
				out <- i
			}
		}(ch)
	}
	go func() {
		group.Wait()
		close(out)
	}()
	return out
}
// MergeFn is the signature shared by the fan-in implementations: it takes any
// number of int channels and returns a single int channel.
type MergeFn func(...chan int) chan int
// main times each merge strategy in turn. For every strategy it creates a
// fresh set of producer channels, drains the merged channel while summing
// the received values, then prints the elapsed time followed by the sum.
func main() {
	const (
		producerCount = 5
		valuesEach    = 1000000
	)
	inputs := make([]chan int, producerCount)
	strategies := []MergeFn{MergeByReflection, MergeByCode, MergeByGoRoutines}
	for _, merge := range strategies {
		total := 0
		started := time.Now()
		// Producers are recreated per strategy because the previous run
		// drained and closed them.
		for idx := range inputs {
			inputs[idx] = MakeChannel(valuesEach)
		}
		merged := merge(inputs...)
		for v := range merged {
			total += v
		}
		fmt.Println(time.Since(started))
		fmt.Println(total)
	}
}
结果是(在1个CPU上,我使用了runtime.GOMAXPROCS(1)):
19.869秒(MergeByReflection)
2499997500000
8.483秒(MergeByCode)
2499997500000
4.977秒(MergeByGoRoutines)
2499997500000
结果是(在2个CPU上,我使用了runtime.GOMAXPROCS(2)):
44.94秒
2499997500000
10.853秒
2499997500000
3.728秒
2499997500000
- 我理解为什么MergeByReflection最慢,但MergeByCode和MergeByGoRoutines之间的区别是什么?
- 当我们增加CPU数量时,为什么"select"子句(直接在MergeByCode中使用,间接通过reflect.Select在MergeByReflection中使用)变得更慢?
英文:
Say, we have three methods to implement "fan in" behavior
// MakeChannel returns an unbuffered channel fed by a background goroutine
// that sends the integers 0, 1, ..., tries-1 in order and then closes the
// channel. When tries is zero or negative the channel is closed without any
// value being sent.
func MakeChannel(tries int) chan int {
	stream := make(chan int)
	produce := func() {
		// Closing tells consumers that no further values will arrive.
		defer close(stream)
		next := 0
		for next < tries {
			stream <- next
			next++
		}
	}
	go produce()
	return stream
}
// MergeByReflection fans in any number of int channels into one unbuffered
// output channel by building a receive case per input and waiting on all of
// them with reflect.Select. The output channel is closed once every input
// has been closed.
func MergeByReflection(channels ...chan int) chan int {
	merged := make(chan int)
	branches := make([]reflect.SelectCase, 0, len(channels))
	for _, src := range channels {
		branches = append(branches, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(src),
		})
	}
	go func() {
		defer close(merged)
		for open := len(branches); open > 0; {
			idx, val, ok := reflect.Select(branches)
			if ok {
				merged <- int(val.Int())
				continue
			}
			// The input was closed: a zero Value makes reflect.Select
			// ignore this case from now on.
			branches[idx].Chan = reflect.Value{}
			open--
		}
	}()
	return merged
}
// MergeByCode fans in the given int channels into one unbuffered output
// channel using a hand-written select statement.
//
// Every value received from an input is forwarded to the returned channel,
// which is closed once all inputs have been closed. Nothing is forwarded for
// the close notification of an input. The slice passed by the caller is not
// modified.
//
// The select statement has five receive cases. With fewer than five inputs
// the spare cases are disabled; with more than five, the surplus inputs are
// combined by a nested MergeByCode call that occupies the fifth case.
func MergeByCode(channels ...chan int) chan int {
	const width = 5

	// Work on a private fixed-size copy. Unused slots stay nil, and a
	// receive from a nil channel blocks forever, so select never picks them.
	var in [width]chan int
	open := len(channels)
	if open > width {
		copy(in[:width-1], channels)
		in[width-1] = MergeByCode(channels[width-1:]...)
		open = width
	} else {
		copy(in[:], channels)
	}

	out := make(chan int)
	go func() {
		defer close(out)
		// relay handles one receive result from slot idx: it forwards a real
		// value, or retires the slot when the input has been closed.
		relay := func(idx, v int, ok bool) {
			if ok {
				out <- v
				return
			}
			in[idx] = nil
			open--
		}
		for open > 0 {
			select {
			case v, ok := <-in[0]:
				relay(0, v, ok)
			case v, ok := <-in[1]:
				relay(1, v, ok)
			case v, ok := <-in[2]:
				relay(2, v, ok)
			case v, ok := <-in[3]:
				relay(3, v, ok)
			case v, ok := <-in[4]:
				relay(4, v, ok)
			}
		}
	}()
	return out
}
// MergeByGoRoutines fans in any number of int channels into one unbuffered
// output channel by starting one forwarding goroutine per input. A further
// goroutine closes the output once every forwarder has finished, i.e. once
// every input has been closed and drained.
func MergeByGoRoutines(channels ...chan int) chan int {
	var group sync.WaitGroup
	out := make(chan int)
	// Register all forwarders before starting any of them. A forwarder whose
	// input is already closed could otherwise call Done before Add and panic
	// with a negative WaitGroup counter.
	group.Add(len(channels))
	for _, ch := range channels {
		go func(ch chan int) {
			defer group.Done()
			for i := range ch {
				out <- i
			}
		}(ch)
	}
	go func() {
		group.Wait()
		close(out)
	}()
	return out
}
// MergeFn is the signature shared by the fan-in implementations: it takes any
// number of int channels and returns a single int channel.
type MergeFn func(...chan int) chan int
// main times each merge strategy in turn. For every strategy it creates a
// fresh set of producer channels, drains the merged channel while summing
// the received values, then prints the elapsed time followed by the sum.
func main() {
	const (
		producerCount = 5
		valuesEach    = 1000000
	)
	inputs := make([]chan int, producerCount)
	strategies := []MergeFn{MergeByReflection, MergeByCode, MergeByGoRoutines}
	for _, merge := range strategies {
		total := 0
		started := time.Now()
		// Producers are recreated per strategy because the previous run
		// drained and closed them.
		for idx := range inputs {
			inputs[idx] = MakeChannel(valuesEach)
		}
		merged := merge(inputs...)
		for v := range merged {
			total += v
		}
		fmt.Println(time.Since(started))
		fmt.Println(total)
	}
}
Results are (at 1 CPU, I have used runtime.GOMAXPROCS(1)):
19.869s (MergeByReflection)
2499997500000
8.483s (MergeByCode)
2499997500000
4.977s (MergeByGoRoutines)
2499997500000
Results are (at 2 CPU, I have used runtime.GOMAXPROCS(2)):
44.94s
2499997500000
10.853s
2499997500000
3.728s
2499997500000
- I understand why MergeByReflection is the slowest, but what explains the difference between MergeByCode and MergeByGoRoutines?
- And when we increase the number of CPUs, why does the "select" clause (used directly in MergeByCode and indirectly, via reflect.Select, in MergeByReflection) become slower?
答案1
得分: 3
这里有一个初步的备注。你的示例中的通道都是无缓冲的,这意味着它们在放入或获取时可能会阻塞。
在这个示例中,除了通道管理之外几乎没有其他处理。因此,性能主要受到同步原语的影响。实际上,这段代码中很少有可以并行化的部分。
在MergeByReflection和MergeByCode函数中,使用select来监听多个输入通道,但没有做任何处理来考虑输出通道(因此输出通道可能会阻塞,而某个输入通道可能有可用的事件)。
在MergeByGoRoutines函数中,这种情况是不会发生的:当输出通道阻塞时,它不会阻止另一个goroutine读取另一个输入通道。因此,运行时有更好的机会并行化goroutine,并且输入通道上的争用较少。
MergeByReflection代码是最慢的,因为它有反射的开销,并且几乎没有什么可以并行化的部分。
MergeByGoRoutines函数是最快的,因为它减少了争用(需要较少的同步),并且输出争用对输入性能的影响较小。因此,在多个核心上运行时可以获得一些改进(与其他两种方法相反)。
在MergeByReflection和MergeByCode中有很多同步活动,因此在多个核心上运行会对性能产生负面影响。不过,如果使用缓冲通道,可能会有不同的性能表现。
英文:
Here is a preliminary remark. The channels in your examples are all unbuffered, meaning they will likely block at put or get time.
In this example, there is almost no processing except channel management. The performance is therefore dominated by synchronization primitives. Actually, there is very little of this code that can be parallelized.
In the MergeByReflection and MergeByCode functions, select is used to listen to multiple input channels, but nothing is done to take into account the output channel (which may therefore block, while some event could be available on one of the input channels).
In the MergeByGoRoutines function, this situation cannot happen: when the output channel blocks, it does not prevent another input channel from being read by another goroutine. There are therefore better opportunities for the runtime to parallelize the goroutines, and less contention on the input channels.
The MergeByReflection code is the slowest because it has the overhead of reflection, and almost nothing can be parallelized.
The MergeByGoRoutines function is the fastest because it reduces the contention (less synchronization is needed), and because output contention has a lesser impact on the input performance. It can therefore benefit from a small improvement when running on multiple cores (contrary to the two other methods).
There is so much synchronization activity with MergeByReflection and MergeByCode, that running on multiple cores negatively impacts the performance. You could have different performance by using buffered channels though.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论