不产生相同的输出并发Go工作池

huangapple go评论86阅读模式
英文:

Does not produce the same output Concurrency Go worker pool

问题

我正在编写一个程序,使用通道和工作池模式并发地从文本文件中逐字读取单词,并计算其出现次数。

程序的工作流程如下:

  1. 读取文本文件(readText 函数)
  2. readText 函数将每个单词发送到 word 通道
  3. 每个 goroutine 执行 countWord 函数,用于计算单词在一个映射中的出现次数
  4. 每个 goroutine 返回一个映射,工作函数将结构体的 Result 值传递给 resultC 通道
  5. Test 函数根据从 resultC 通道接收到的结果值创建一个映射
  6. 打印从第 5 步创建的映射

程序可以正常工作,但是当我尝试像下面这样添加 fmt.Println(0) 来查看进程时,程序会在没有显示/计数所有单词的情况下终止:

func computeTotal() {
	i := 0
	for e := range resultC {
		total[e.word] += e.count
		i += 1
		fmt.Println(i)
	}
}

如果我取消注释 computeTotal 函数中的 fmt.println() 语句,程序会正确显示结果,输出如下所示:

all goroutines finished
map[a:83 about:4 above:2 absolute:1 accepted:1 across:1 affection:1 after:1 again:5  wonder:2 wood:5 wooded:1 woody:1 work:1 worked:2 world:4 would:11 wrapped:1 wrong:1 yellow:2 yielded:1 yielding:1 counts continues ......]
total words:  856
Time taken for reading the book 5.9924ms

这是我 readText 的实现:

// 确保在适当的时机关闭 words
func readText() {
	file, err := os.Open(FILENAME)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	scanner := bufio.NewScanner(file)
	scanner.Split(bufio.ScanWords)

	for scanner.Scan() {
		word := strings.ToLower(scanner.Text())
		words <- strings.Trim(word, ".,:;")
	}
	close(words)
}

这是我使用工作池实现的 countWord:

// 调用 countWord 函数
func workerPool() {
	var wg sync.WaitGroup
	for i := 1; i <= NUMOFWORKER; i++ {
		wg.Add(1)
		go worker(&wg)
	}
	wg.Wait()
	fmt.Println("all goroutines finished")
	close(resultC)
}

func worker(wg *sync.WaitGroup) {
	var tempMap = make(map[string]int)
	for w := range words {
		resultC <- countWord(w, tempMap) // 返回 Result 值
	}
	wg.Done()
}

// 创建一个映射,记录每个单词的出现次数
func countWord(word string, tempMap map[string]int) Result {
	_, ok := tempMap[word]
	if ok {
		tempMap[word]++
		return Result{word, tempMap[word] + 1}
	}
	return Result{word, 1}
}

最后,这是主函数:

const FILENAME = "cat.txt"
const BUFFERSIZE = 3000
const NUMOFWORKER = 5

var words = make(chan string, BUFFERSIZE) // 任务
var resultC = make(chan Result, BUFFERSIZE)

var total = map[string]int{}

type Result struct {
	word  string
	count int
}

func main() {
	startTime := time.Now()
	go readText()
	go computeTotal()
	workerPool() // 阻塞
	fmt.Println(total)
	endTime := time.Now()
	timeTaken := endTime.Sub(startTime)
	fmt.Println("total words: ", len(total))
	fmt.Println("Time taken for reading the book", timeTaken)
}

我一直在寻找程序为什么不能产生一致的结果,但是我还没有找到答案。我该如何修改程序以产生相同的结果?

英文:

I am writing a program that concurrently reads word by word from a text file to compute the occurrences using channels and worker pool pattern

The program works in the following flow:

  1. Read a text file (readText function)
  2. readText function sends each word to the word channel
  3. Each goroutine executes countWord function that counts word in a map
  4. Each goroutine returns a map and the worker function passes the Result value of struct to the resultC channel
  5. Test function creates a map based on the result values coming from the resultC channel
  6. Print the map created from step 5

The program works, but when I try to put fmt.Println(0) to see the process as shown below

func computeTotal() {
	i := 0
	for e := range resultC {
		total[e.word] += e.count
		i += 1
		fmt.Println(i)
	}
}

The program terminates without showing/counting all the words

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 all goroutines finished 16 17 18 map[but:1 cat&#39;s:1 crouched:1 fur:1 he:2 imperturbable:1 it:1 pointed:1 sat:1 snow:1 stiffly:1 the:1 was:2 with:1] total words: 27 38 ... 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 Time taken for reading the book 5.8145ms

The program shows results correctly if I uncomment the fmt.println() in the compute Total function statement here and the output is as shown below

all goroutines finished
map[a:83 about:4 above:2 absolute:1 accepted:1 across:1 affection:1 after:1 again:5  wonder:2 wood:5 wooded:1 woody:1 work:1 worked:2 world:4 would:11 wrapped:1 wrong:1 yellow:2 yielded:1 yielding:1 counts continues ......]
total words:  856
Time taken for reading the book 5.9924ms

here is my implementation of readtext

//ensure close words at the right timing
func readText() {

	file, err := os.Open(FILENAME)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	scanner := bufio.NewScanner(file)
	scanner.Split(bufio.ScanWords)

	for scanner.Scan() {
		word := strings.ToLower(scanner.Text())
		words &lt;- strings.Trim(word, &quot;.,:;&quot;)

	}
	//time.Sleep(1 * time.Second)
	close(words)
}

here is my count word implementation using worker pool

//call countWord func,
func workerPool() {
	var wg sync.WaitGroup
	for i := 1; i &lt;= NUMOFWORKER; i++ {
		wg.Add(1)
		go worker(&amp;wg)
	}
	wg.Wait()
	fmt.Println(&quot;all goroutines finished&quot;)
	close(resultC)
}

func worker(wg *sync.WaitGroup) {
	var tempMap = make(map[string]int)
	for w := range words {
		resultC &lt;- countWord(w, tempMap) //retuns Result value
	}
	wg.Done()

}

//creates a map each word
func countWord(word string, tempMap map[string]int) Result {
	_, ok := tempMap[word]
	if ok {
		tempMap[word]++
		return Result{word, tempMap[word] + 1}

	}
	return Result{word, 1}

}

Finally, this is the main function


const FILENAME = &quot;cat.txt&quot;
const BUFFERSIZE = 3000
const NUMOFWORKER = 5

var words = make(chan string, BUFFERSIZE) //job
var resultC = make(chan Result, BUFFERSIZE)

var total = map[string]int{}

type Result struct {
	word  string
	count int
}

func main() {
	startTime := time.Now()
	go readText()
	go computeTotal()
	workerPool() //blocking
	fmt.Println(total)
	endTime := time.Now()
	timeTaken := endTime.Sub(startTime)
	fmt.Println(&quot;total words: &quot;, len(total))
	fmt.Println(&quot;Time taken for reading the book&quot;, timeTaken)
}

I have been looking for why the program does not show consistent results but I could not figure it out yet. How can I make a change to the program so that it produces the same outcome?

答案1

得分: 2

countWord函数总是返回一个计数为1的结果。

这是一个增加计数的函数版本:

func countWord(word string, tempMap map[string]int) Result {
    count := tempMap[word] + 1
    tempMap[word] = count
    return Result{word, count}
}

但是请稍等!computeTotal函数假设结果的计数为1。鉴于问题中的工作人员总是发送Result{word, 1},就像computeTotal所期望的那样,我们可以通过直接从readText发送Result{word, 1}来排除工作人员的影响。以下是代码:

func computeTotal() {
    i := 0
    for e := range resultC {
        total[e.word] += e.count
        i += 1
        fmt.Println(i)
    }
}

func readText() {
    file, err := os.Open(FILENAME)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    scanner := bufio.NewScanner(file)
    scanner.Split(bufio.ScanWords)

    for scanner.Scan() {
        word := strings.ToLower(scanner.Text())
        resultC <- Result{strings.Trim(word, ".,:;"), 1}
    }
    close(resultC)
}

func main() {
    ...
    go readText()
    computeTotal()
    fmt.Println(total)
    ...
}

在playground上运行

通道操作的开销可能抵消了将computeTotalreadText在单独的goroutine中运行的任何好处。以下是将代码合并为单个goroutine的代码:

func main() {
    file, err := os.Open(FILENAME)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    scanner := bufio.NewScanner(file)
    scanner.Split(bufio.ScanWords)

    var total = map[string]int{}
    for scanner.Scan() {
        word := strings.ToLower(strings.Trim(scanner.Text(), ".,:;"))
        total[word]++
    }
    fmt.Println(total)
}

[在playground上运行](https://play.golang.org/p/bItqxMTF8dO)。

问题中的`countWord`函数让我想到你的目标是计算每个工作人员的单词数并将结果合并为总数以下是实现这一目标的代码

```go
func computeTotal() {
    for i := 1; i <= NUMOFWORKER; i++ {
        m := <-resultC
        for word, count := range m {
            total[word] += count
        }
    }
}

func workerPool() {
    for i := 1; i <= NUMOFWORKER; i++ {
        go worker()
    }
}

func worker() {
    var tempMap = make(map[string]int)
    for w := range words {
        tempMap[w]++
    }
    resultC <- tempMap
}

...
var resultC = make(chan map[string]int)
...

func main() {
    ...
    go readText()
    workerPool()
    computeTotal()
    ...
}

[在playground上运行](https://play.golang.org/p/mHhh6Tm97ol)。
英文:

The countWord function always returns a result with count == 1.

Here's a version of the function that increments the count:

func countWord(word string, tempMap map[string]int) Result {
count := tempMap[word] + 1
tempMap[word] = count
return Result{word, count}
}

But hold that thought! The computeTotal function assumes that the result count is 1. Given that the workers in the question always send Result{word, 1} as computeTotal expects, we can cut the workers out of the picture by sending Result{word, 1} directly from readText. Here's the code:

func computeTotal() {
i := 0
for e := range resultC {
total[e.word] += e.count
i += 1
fmt.Println(i)
}
}
func readText() {
file, err := os.Open(FILENAME)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanWords)
for scanner.Scan() {
word := strings.ToLower(scanner.Text())
resultC &lt;- Result{strings.Trim(word, &quot;.,:;&quot;), 1}
}
close(resultC)
}
main() {
...
go readText()
computeTotal()
fmt.Println(total)
...
}

Run it on the playground.

The overhead of the channel operations probably negates any benefit of running computeTotal and readText in separate goroutines. Here's the code combined into a single goroutine:

func main() {
file, err := os.Open(FILENAME)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanWords)
var total = map[string]int{}
for scanner.Scan() {
word := strings.ToLower(strings.Trim(scanner.Text(), &quot;.,:;&quot;))
total[word]++
}
fmt.Println(total)
}

Run it on the playground.

The countWord function in the question makes me think your goal was to count words in each worker and merge the result for a total. Here's the code for that:

func computeTotal() {
for i := 1; i &lt;= NUMOFWORKER; i++ {
m := &lt;-resultC
for word, count := range m {
total[word] += count
}
}
}
func workerPool() {
for i := 1; i &lt;= NUMOFWORKER; i++ {
go worker()
}
}
func worker() {
var tempMap = make(map[string]int)
for w := range words {
tempMap[w]++
}
resultC &lt;- tempMap
}
...
var resultC = make(chan map[string]int)
...
func main() {
...
go readText()
workerPool()
computeTotal()
...
}

Run it on the playground.

答案2

得分: 1

你需要按照以下方式重写你的computeTotal函数:

func computeTotal(done chan struct{}) {
    defer close(done)
    i := 0
    for e := range resultC {
        totalMu.Lock()
        total[e.word] += e.count
        totalMu.Unlock()
        i += 1
        fmt.Println(i)
    }
}

func main() {
    computeTotalDone := make(chan struct{})
    go computeTotal(computeTotalDone)
    // ...
    workerPool() // 阻塞
    <-computeTotalDone
    totalMu.Lock()
    fmt.Println(total)
    totalMu.Unlock()
}

添加fmt.Println导致结果无效的原因是你的实现存在竞态条件。由于在main函数中打印总结果fmt.Println(total)computeTotal函数并行运行,无法保证computeTotal在调用fmt.Println(total)之前处理完所有消息。在没有fmt.Println的情况下,computeTotal函数在你的计算机上运行得足够快,可以产生正确的结果。

提出的解决方案确保在调用fmt.Println(total)之前,computeTotal函数完成执行。为了保证对total的访问安全,使用了互斥锁totalMu

英文:

You have to rewrite your computeTotal function in a following way:

func computeTotal(done chan struct{}) {
defer close(done)
i := 0
for e := range resultC {
total[e.word] += e.count
i += 1
fmt.Println(i)
}
}
func main() {
computeTotalDone := make(chan struct{})
go computeTotal(computeTotalDone)
...
workerPool() //blocking
&lt;-computeTotalDone
fmt.Println(total)
}

The reason why adding fmt.Println leads to invalid result is that your implementation has a race condition. As printing total result in main function fmt.Println(total) and computeTotal function runs in parallel there is no guarantee that computeTotal handles all messages before fmt.Println(total) is invoked. Without fmt.Println the computeTotal function is fast enough on your computer to produce correct results.

The proposed solution ensures that computeTotal completes before fmt.Println(total) is invoked.

huangapple
  • 本文由 发表于 2021年10月9日 12:42:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/69503808.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定