Go concurrency worker pool does not produce the same output
Question
I am writing a program that reads a text file word by word and counts the occurrences of each word concurrently, using channels and the worker pool pattern.
The program works in the following flow:
1. Read a text file (`readText` function).
2. The `readText` function sends each word to the `words` channel.
3. Each goroutine executes the `countWord` function, which counts words in a map.
4. Each goroutine returns a map, and the worker function passes the `Result` struct value to the `resultC` channel.
5. The Test function creates a map based on the result values coming from the `resultC` channel.
6. Print the map created in step 5.
The program works, but when I add `fmt.Println(i)` to watch the progress, as shown below:
    func computeTotal() {
        i := 0
        for e := range resultC {
            total[e.word] += e.count
            i += 1
            fmt.Println(i)
        }
    }
the program terminates without showing or counting all the words:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 all goroutines finished 16 17 18 map[but:1 cat's:1 crouched:1 fur:1 he:2 imperturbable:1 it:1 pointed:1 sat:1 snow:1 stiffly:1 the:1 was:2 with:1] total words: 27 38 ... 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 Time taken for reading the book 5.8145ms
The program shows the results correctly if I comment out the `fmt.Println()` statement in the `computeTotal` function. The output is then as shown below:
all goroutines finished
map[a:83 about:4 above:2 absolute:1 accepted:1 across:1 affection:1 after:1 again:5 wonder:2 wood:5 wooded:1 woody:1 work:1 worked:2 world:4 would:11 wrapped:1 wrong:1 yellow:2 yielded:1 yielding:1 counts continues ......]
total words: 856
Time taken for reading the book 5.9924ms
Here is my implementation of `readText`:
    // ensure words is closed at the right time
    func readText() {
        file, err := os.Open(FILENAME)
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
        scanner := bufio.NewScanner(file)
        scanner.Split(bufio.ScanWords)
        for scanner.Scan() {
            word := strings.ToLower(scanner.Text())
            words <- strings.Trim(word, ".,:;")
        }
        //time.Sleep(1 * time.Second)
        close(words)
    }
Here is my word-counting implementation using a worker pool:
    // calls the countWord func
    func workerPool() {
        var wg sync.WaitGroup
        for i := 1; i <= NUMOFWORKER; i++ {
            wg.Add(1)
            go worker(&wg)
        }
        wg.Wait()
        fmt.Println("all goroutines finished")
        close(resultC)
    }

    func worker(wg *sync.WaitGroup) {
        var tempMap = make(map[string]int)
        for w := range words {
            resultC <- countWord(w, tempMap) // returns a Result value
        }
        wg.Done()
    }

    // creates a map entry for each word
    func countWord(word string, tempMap map[string]int) Result {
        _, ok := tempMap[word]
        if ok {
            tempMap[word]++
            return Result{word, tempMap[word] + 1}
        }
        return Result{word, 1}
    }
Finally, this is the main function:
    const FILENAME = "cat.txt"
    const BUFFERSIZE = 3000
    const NUMOFWORKER = 5

    var words = make(chan string, BUFFERSIZE) // job
    var resultC = make(chan Result, BUFFERSIZE)
    var total = map[string]int{}

    type Result struct {
        word  string
        count int
    }

    func main() {
        startTime := time.Now()
        go readText()
        go computeTotal()
        workerPool() // blocking
        fmt.Println(total)
        endTime := time.Now()
        timeTaken := endTime.Sub(startTime)
        fmt.Println("total words: ", len(total))
        fmt.Println("Time taken for reading the book", timeTaken)
    }
I have been trying to work out why the program does not produce consistent results, but I have not figured it out yet. How can I change the program so that it always produces the same output?
Answer 1
Score: 2
The `countWord` function always returns a result with count == 1: nothing ever inserts a word into `tempMap`, so the `ok` branch is never taken.
Here's a version of the function that increments the count:
    func countWord(word string, tempMap map[string]int) Result {
        count := tempMap[word] + 1
        tempMap[word] = count
        return Result{word, count}
    }
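To see the difference concretely, here is a small self-contained sketch that calls both versions three times with the same word. The names `countWordOriginal` and `countWordFixed` are made up for this comparison; the function bodies are the ones from the question and from this answer.

```go
package main

import "fmt"

// Result mirrors the struct from the question.
type Result struct {
	word  string
	count int
}

// countWordOriginal is the question's version: nothing ever inserts a word
// into tempMap, so the ok branch is never taken and the count is always 1.
func countWordOriginal(word string, tempMap map[string]int) Result {
	_, ok := tempMap[word]
	if ok {
		tempMap[word]++
		return Result{word, tempMap[word] + 1}
	}
	return Result{word, 1}
}

// countWordFixed is the version from this answer: it stores the new count
// back into the map before returning it.
func countWordFixed(word string, tempMap map[string]int) Result {
	count := tempMap[word] + 1
	tempMap[word] = count
	return Result{word, count}
}

func main() {
	original := map[string]int{}
	fixed := map[string]int{}
	for i := 0; i < 3; i++ {
		// Prints "1 1", then "1 2", then "1 3".
		fmt.Println(countWordOriginal("cat", original).count, countWordFixed("cat", fixed).count)
	}
}
```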
But hold that thought! The `computeTotal` function assumes that the result count is 1 (it adds `e.count` to the total for every result it receives). Given that the workers in the question always send `Result{word, 1}`, as `computeTotal` expects, we can cut the workers out of the picture by sending `Result{word, 1}` directly from `readText`. Here's the code:
    func computeTotal() {
        i := 0
        for e := range resultC {
            total[e.word] += e.count
            i += 1
            fmt.Println(i)
        }
    }

    func readText() {
        file, err := os.Open(FILENAME)
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
        scanner := bufio.NewScanner(file)
        scanner.Split(bufio.ScanWords)
        for scanner.Scan() {
            word := strings.ToLower(scanner.Text())
            resultC <- Result{strings.Trim(word, ".,:;"), 1}
        }
        close(resultC)
    }

    func main() {
        ...
        go readText()
        computeTotal()
        fmt.Println(total)
        ...
    }
The overhead of the channel operations probably negates any benefit of running `computeTotal` and `readText` in separate goroutines. Here's the code combined into a single goroutine:
    func main() {
        file, err := os.Open(FILENAME)
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
        scanner := bufio.NewScanner(file)
        scanner.Split(bufio.ScanWords)
        var total = map[string]int{}
        for scanner.Scan() {
            word := strings.ToLower(strings.Trim(scanner.Text(), ".,:;"))
            total[word]++
        }
        fmt.Println(total)
    }

[Run it on the playground](https://play.golang.org/p/bItqxMTF8dO).
The `countWord` function in the question makes me think your goal was to count words in each worker and merge the results into a total. Here's the code for that:
    func computeTotal() {
        for i := 1; i <= NUMOFWORKER; i++ {
            m := <-resultC
            for word, count := range m {
                total[word] += count
            }
        }
    }

    func workerPool() {
        for i := 1; i <= NUMOFWORKER; i++ {
            go worker()
        }
    }

    func worker() {
        var tempMap = make(map[string]int)
        for w := range words {
            tempMap[w]++
        }
        resultC <- tempMap
    }
    ...
    var resultC = make(chan map[string]int)
    ...
    func main() {
        ...
        go readText()
        workerPool()
        computeTotal()
        ...
    }

[Run it on the playground](https://play.golang.org/p/mHhh6Tm97ol).
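For reference, the per-worker counting approach can be put together into one runnable program. This is only a sketch under a few assumptions: it reads from an in-memory string rather than `cat.txt`, wraps everything in a hypothetical `countWords` helper, and uses a `sync.WaitGroup` plus a buffered results channel instead of receiving exactly `NUMOFWORKER` values.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
)

const numWorkers = 5

// countWords fans words out to numWorkers goroutines, each counting into
// its own map, then merges the per-worker maps into one total.
func countWords(text string) map[string]int {
	words := make(chan string, 64)
	// Buffered so every worker can deliver its map without blocking.
	results := make(chan map[string]int, numWorkers)

	// Producer: split the text into words and close the channel when done.
	go func() {
		defer close(words)
		scanner := bufio.NewScanner(strings.NewReader(text))
		scanner.Split(bufio.ScanWords)
		for scanner.Scan() {
			words <- strings.ToLower(strings.Trim(scanner.Text(), ".,:;"))
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			local := make(map[string]int) // private to this worker
			for w := range words {
				local[w]++
			}
			results <- local
		}()
	}
	wg.Wait()
	close(results)

	// Merge in the calling goroutine, after all workers have finished.
	total := make(map[string]int)
	for m := range results {
		for word, count := range m {
			total[word] += count
		}
	}
	return total
}

func main() {
	total := countWords("The cat sat. The cat was imperturbable; the snow fell.")
	fmt.Println(total["the"], total["cat"], total["snow"], len(total)) // prints "3 2 1 7"
}
```

Because each worker writes only to its own map and the merge happens after `wg.Wait()`, no map is ever accessed by two goroutines at once, so the result is the same on every run.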
Answer 2
Score: 1
You have to rewrite your `computeTotal` function in the following way:
    func computeTotal(done chan struct{}) {
        defer close(done)
        i := 0
        for e := range resultC {
            total[e.word] += e.count
            i += 1
            fmt.Println(i)
        }
    }

    func main() {
        computeTotalDone := make(chan struct{})
        go computeTotal(computeTotalDone)
        ...
        workerPool() // blocking
        <-computeTotalDone
        fmt.Println(total)
    }
Adding `fmt.Println` leads to invalid results because your implementation has a race condition. The `fmt.Println(total)` call in `main` and the `computeTotal` function run in parallel, so there is no guarantee that `computeTotal` has handled all messages before `fmt.Println(total)` is invoked. Without `fmt.Println`, the `computeTotal` function is fast enough on your computer to produce correct results.
The proposed solution ensures that `computeTotal` completes before `fmt.Println(total)` is invoked.
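The done-channel pattern can be tried in isolation with the following sketch. It is a simplified stand-in for the program in the question, not the original code: the `tally` helper, the in-memory word slice, and the pool of three workers are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// tally sends every word through a small worker pool and returns the totals.
// Waiting on the done channel is what makes reading total safe.
func tally(input []string) map[string]int {
	words := make(chan string, len(input))
	results := make(chan string, len(input))
	total := make(map[string]int)
	done := make(chan struct{})

	// Consumer: the only goroutine that writes to total.
	go func() {
		defer close(done)
		for w := range results {
			total[w]++
		}
	}()

	// Workers: forward each word from words to results.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for w := range words {
				results <- w
			}
		}()
	}

	for _, w := range input {
		words <- w
	}
	close(words)

	wg.Wait()      // all workers have sent their results
	close(results) // lets the consumer's range loop end
	<-done         // consumer has drained results; total is now safe to read
	return total
}

func main() {
	total := tally([]string{"the", "cat", "the", "snow", "the"})
	fmt.Println(total["the"], total["cat"], total["snow"]) // prints "3 1 1"
}
```

Removing the `<-done` line reintroduces the bug from the question: `tally` could return while the consumer is still writing to `total`. Running the program with Go's race detector (`go run -race`) is a convenient way to catch this kind of unsynchronized map access.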