Different results for N>1 goroutines (on N>1 CPUs). Why?

Question

I have a test program that gives different results when it runs more than one goroutine on more than one CPU (goroutines = CPUs). The "test" is about synchronising goroutines using channels, and the program itself counts occurrences of chars in strings. It produces consistent results on one CPU / one goroutine.

See the code example on the playground (note: run it on a local machine to execute on multiple cores, and watch the resulting numbers vary): http://play.golang.org/p/PT5jeCKgBv

Code summary: The program counts occurrences of 4 different chars (A, T, G, C) in (DNA) strings.

Problem: The result (number of occurrences of chars) varies when executed on multiple CPUs (goroutines). Why?

Description:

  1. A goroutine (SpawnWork) sends work to the Workers as strings. It sets up artificial string input data (hardcoded strings are copied n times).
  2. One Worker goroutine is created per CPU.
  3. Each Worker checks every char in a string, counts the A's and T's and sends the sum into one channel, and sends the G and C count into another channel.
  4. SpawnWork closes the work-string channel to control the Workers (which consume strings using range, which ends when the input channel is closed by SpawnWork).
  5. When a Worker has consumed its range (of strings) it sends a quit signal on the quit channel (quit <- true). These "pulses" occur once per CPU (CPU count = goroutine count).
  6. The main (select) loop quits when it has received CPU-count quit signals.
  7. The main func prints a summary of the occurrences of chars (A/T's, G/C's).

Simplified code:

1. "Worker" (goroutines) counting chars in lines:

func Worker(inCh chan *[]byte, resA chan<- *int, resB chan<- *int, quit chan bool) {
    //for p_ch := range inCh {
    for {
        p_ch, ok := <-inCh // similar to range
        if ok {
            ch := *p_ch
            for i := 0; i < len(ch); i++ {
                if ch[i] == 'A' || ch[i] == 'T' {        // Count A:s and T:s
                    at++
                } else if ch[i] == 'G' || ch[i] == 'C' { // Count G:s and C:s
                    gc++
                }
            }
            resA <- &at  // Send line results on separate channels
            resB <- &gc  // Send line results on separate channels
        } else {
            quit <- true // Indicate that we're all done
            break
        }
    }
}

2. Spawn work (strings) to workers:

func SpawnWork(inStr chan<- *[]byte, quit chan bool) {
    // Artificial input data
    StringData :=
        "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
        "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
        "... etc\n" +
    // ...
    for scanner.Scan() {
        s := scanner.Bytes()
        if len(s) == 0 || s[0] == '>' {
            continue
        } else {
            i++
            inStr <- &s
        }
    }
    close(inStr) // Indicate (to Workers) that there's no more strings coming.
}

3. Main routine:

func main() {
    // Count Cpus, and count down in final select clause
    CpuCnt := runtime.NumCPU()
    runtime.GOMAXPROCS(CpuCnt)
    // Make channels
    resChA := make(chan *int)
    resChB := make(chan *int)
    quit := make(chan bool)
    inStr := make(chan *[]byte)

    // Set up Workers ( n = Cpu )
    for i := 0; i < CpuCnt; i++ {
        go Worker(inStr, resChA, resChB, quit)
    }
    // Send lines to Workers
    go SpawnWork(inStr, quit)

    // Count the number of "A","T" & "G","C" per line
    // (comes in here as ints per row, on separate channels (at and gt))
    for {
        select {
        case tmp_at := <-resChA:
            tmp_gc := <-resChB // Ch A and B go in pairs anyway
            A += *tmp_at       // sum of A's and T's
            B += *tmp_gc       // sum of G's and C's
        case <-quit:
            // Each goroutine sends "quit" signals when it's done. Since
            // the number of goroutines equals the Cpu counter, we count
            // down each time a goroutine tells us it's done (quit at 0):
            CpuCnt--
            if CpuCnt == 0 { // When all goroutines are done then we're done.
                goto out
            }
        }
    }
out:
    // Print report to screen
}

Why does this code count consistently only when executed on a single CPU/goroutine? That is, the channels don't seem to sync, or the main loop quits forcefully before all goroutines are done? Scratching head.

(Again: See/run the full code at the playground: http://play.golang.org/p/PT5jeCKgBv )

// Rolf Lampa

Answer 1

Score: 3

Here is a working version which consistently produces the same results no matter how many CPUs are used.

Here is what I did:

  • remove passing of *int - very racy to pass in a channel!
  • remove passing of *[]byte - pointless as slices are reference types anyway
  • copy the slice before putting it in the channel - the slice points to the same memory causing a race
  • fix initialisation of at and gc in Worker - they were in the wrong place - this was the major cause of the difference in results
  • use sync.WaitGroup for synchronisation and channel close()
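
To make the first and fourth points concrete, here is a minimal sketch (not the original program) of what happens when a counter is declared once, never reset, and sent by address. It assumes a buffered channel and a single goroutine so that the outcome is deterministic; the function names sendPointers and sendValues are made up for this illustration.

```go
package main

import "fmt"

// isAT reports whether b is one of the bases counted in the "at" tally.
func isAT(b byte) bool { return b == 'A' || b == 'T' }

// sendPointers mimics the original Worker: one counter is accumulated
// across all lines and its address is sent after each line.
func sendPointers(lines []string) []int {
	ch := make(chan *int, len(lines)) // buffered, so no goroutines are needed here
	at := 0                           // declared once, never reset
	for _, line := range lines {
		for i := 0; i < len(line); i++ {
			if isAT(line[i]) {
				at++
			}
		}
		ch <- &at // every message points at the same variable
	}
	close(ch)

	var got []int
	for p := range ch {
		got = append(got, *p) // read late: sees the final value each time
	}
	return got
}

// sendValues mimics the fixed Worker: a fresh counter per line, sent by value.
func sendValues(lines []string) []int {
	ch := make(chan int, len(lines))
	for _, line := range lines {
		at := 0 // reset for every line
		for i := 0; i < len(line); i++ {
			if isAT(line[i]) {
				at++
			}
		}
		ch <- at // the receiver gets its own copy
	}
	close(ch)

	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	lines := []string{"ATG", "TTTTC"}
	fmt.Println(sendPointers(lines)) // [6 6]
	fmt.Println(sendValues(lines))   // [2 4]
}
```

With several workers running in parallel, the value seen through the pointer depends on how far the sending worker has progressed by the time main dereferences it, which is one way the totals can differ from run to run.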

I used the -race parameter of go build to find and fix the data races.
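
One of the races mentioned in the list concerns the slice returned by scanner.Bytes(): its underlying array may be overwritten by a later call to Scan, so a slice handed to another goroutine should own its memory. The sketch below shows only the copying idiom, under the assumption that lines are collected into a slice rather than sent on a channel; collectLines is a hypothetical helper, not part of the program.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// collectLines returns an independent copy of every line in input.
func collectLines(input string) [][]byte {
	scanner := bufio.NewScanner(strings.NewReader(input))
	var lines [][]byte
	for scanner.Scan() {
		s := scanner.Bytes() // may alias the scanner's internal buffer

		// append onto a nil slice allocates a fresh backing array,
		// so lineCopy stays valid after the next call to Scan.
		lineCopy := append([]byte(nil), s...)
		lines = append(lines, lineCopy)
	}
	return lines
}

func main() {
	for _, line := range collectLines("ATGC\nNNTA\n") {
		fmt.Println(string(line))
	}
}
```

The same append([]byte(nil), s...) expression is what the full program uses before sending each line to the workers.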

package main

import (
	"bufio"
	"fmt"
	"runtime"
	"strings"
	"sync"
)

func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("Worker started...")
	for ch := range inCh {
		// Reset the counters for every line
		at := 0
		gc := 0
		for i := 0; i < len(ch); i++ {
			if ch[i] == 'A' || ch[i] == 'T' {
				at++
			} else if ch[i] == 'G' || ch[i] == 'C' {
				gc++
			}
		}
		resA <- at
		resB <- gc
	}
}

func SpawnWork(inStr chan<- []byte) {
	fmt.Println("Spawning work:")
	// An artificial input source.
	StringData :=
		"NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
			"NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
			"CTTCCCAATTGGATTAGACTATTAACATTTCAGAAAGGATGTAAGAAAGGACTAGAGAGA\n" +
			"TATACTTAATGTTTTTAGTTTTTTAAACTTTACAAACTTAATACTGTCATTCTGTTGTTC\n" +
			"AGTTAACATCCCTGAATCCTAAATTTCTTCAGATTCTAAAACAAAAAGTTCCAGATGATT\n" +
			"TTATATTACACTATTTACTTAATGGTACTTAAATCCTCATTNNNNNNNNCAGTACGGTTG\n" +
			"TTAAATANNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
			"NNNNNNNCTTCAGAAATAAGTATACTGCAATCTGATTCCGGGAAATATTTAGGTTCATAA\n"
	// Expand data n times
	tmp := StringData
	for n := 0; n < 1000; n++ {
		StringData = StringData + tmp
	}
	scanner := bufio.NewScanner(strings.NewReader(StringData))
	scanner.Split(bufio.ScanLines)

	var i int
	for scanner.Scan() {
		s := scanner.Bytes()
		if len(s) == 0 || s[0] == '>' {
			continue
		} else {
			i++
			// Copy the line: the scanner may reuse the memory behind s
			s_copy := append([]byte(nil), s...)
			inStr <- s_copy
		}
	}
	close(inStr)
}

func main() {
	CpuCnt := runtime.NumCPU() // Number of workers to start
	CpuOut := CpuCnt           // Save for print report
	runtime.GOMAXPROCS(CpuCnt)
	fmt.Printf("Processors: %d\n", CpuCnt)

	resChA := make(chan int)
	resChB := make(chan int)
	inStr := make(chan []byte)

	fmt.Println("Spawning workers:")
	var wg sync.WaitGroup
	for i := 0; i < CpuCnt; i++ {
		wg.Add(1)
		go Worker(inStr, resChA, resChB, &wg)
	}
	fmt.Println("Spawning work:")
	go func() {
		SpawnWork(inStr)
		// Close the result channels only after every Worker has returned
		wg.Wait()
		close(resChA)
		close(resChB)
	}()

	A := 0
	B := 0
	LineCnt := 0
	for tmp_at := range resChA {
		tmp_gc := <-resChB // These go together anyway
		A += tmp_at
		B += tmp_gc
		LineCnt++
	}

	if !(A+B > 0) {
		fmt.Println("No A/B was found!")
	} else {
		ABFraction := float32(B) / float32(A+B)
		fmt.Println("\n----------------------------")
		fmt.Printf("Cpu's  : %d\n", CpuOut)
		fmt.Printf("Lines  : %d\n", LineCnt)
		fmt.Printf("A+B    : %d\n", A+B)
		fmt.Printf("A      : %d\n", A)
		fmt.Printf("B      : %d\n", B)
		fmt.Printf("AB frac: %v\n", ABFraction*100)
		fmt.Println("----------------------------")
	}
}
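
The synchronisation in main above (workers range over an input channel, a helper goroutine waits on the sync.WaitGroup and then closes the result channel, and the consumer ranges over the results) can be reduced to a smaller sketch. As a variation that is not in the answer, this version assumes a single result channel carrying a struct, so the two tallies for a line cannot be separated; the names counts, countLine and total are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// counts is a hypothetical result type holding both tallies for one line.
type counts struct{ at, gc int }

// countLine tallies A/T and G/C bytes in a single line.
func countLine(line []byte) counts {
	var c counts
	for _, b := range line {
		switch b {
		case 'A', 'T':
			c.at++
		case 'G', 'C':
			c.gc++
		}
	}
	return c
}

// total fans lines out to nWorkers goroutines (nWorkers must be at least 1)
// and sums the per-line results.
func total(lines [][]byte, nWorkers int) counts {
	in := make(chan []byte)
	out := make(chan counts)

	var wg sync.WaitGroup
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range in { // ends when in is closed
				out <- countLine(line)
			}
		}()
	}

	go func() {
		for _, line := range lines {
			in <- line
		}
		close(in)  // no more work
		wg.Wait()  // every worker has sent its last result
		close(out) // safe: nobody can send on out any more
	}()

	var sum counts
	for c := range out { // ends when out is closed
		sum.at += c.at
		sum.gc += c.gc
	}
	return sum
}

func main() {
	lines := [][]byte{[]byte("NTGAGAAATA"), []byte("CTTCCCAATT")}
	fmt.Println(total(lines, 4))
}
```

Because the consumer stops only when the result channel is closed, and the channel is closed only after wg.Wait() returns, the loop cannot exit while a worker still has a result to deliver. The totals are therefore the same for any worker count of one or more.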

huangapple
  • Posted on June 14, 2013 at 07:12:05
  • When reposting, please keep the link to this article: https://go.coder-hub.com/17098722.html