Why is this golang script giving me a deadlock? + a few questions
Question
I got this code from someone on github and I am trying to play around with it to understand concurrency.
package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup

func sad(url string) string {
    fmt.Printf("gonna sleep a bit\n")
    time.Sleep(2 * time.Second)
    return url + " added stuff"
}

func main() {
    sc := bufio.NewScanner(os.Stdin)
    urls := make(chan string)
    results := make(chan string)

    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urls {
                n := sad(url)
                results <- n
            }
        }()
    }

    for sc.Scan() {
        url := sc.Text()
        urls <- url
    }

    for result := range results {
        fmt.Printf("%s arrived\n", result)
    }

    wg.Wait()
    close(urls)
    close(results)
}
I have a few questions:
- Why does this code give me a deadlock?
- How can that for loop exist before any input is taken from the user? Do the goroutines wait until something is passed into the urls channel and then start doing work? I don't get this because it's not sequential. Why would taking input from the user first, putting every input into the urls channel, and only then starting the goroutines be considered wrong?
- Inside the for loop there is another loop iterating over the urls channel. Does each goroutine deal with exactly one line of input, or can one goroutine handle multiple lines at once? How does this work?
- Am I gathering the output correctly here?
Answer 1
Score: 2
Mostly you're doing things correctly, but you have things a little out of order. The for sc.Scan() loop will continue until the Scanner is done, so the for result := range results loop never runs, and thus no goroutine ('main' in this case) is able to receive from results. When running your example, I started the for result := range results loop before for sc.Scan(), and in its own goroutine; otherwise for sc.Scan() would never be reached.
go func() {
    for result := range results {
        fmt.Printf("%s arrived\n", result)
    }
}()

for sc.Scan() {
    url := sc.Text()
    urls <- url
}
Also, because you run wg.Wait() before close(urls), the main goroutine is left blocked waiting for the 20 sad() goroutines to finish. But they can't finish until close(urls) is called. So just close that channel before waiting on the WaitGroup.
close(urls)
wg.Wait()
close(results)
Answer 2
Score: 1
The for loop creates 20 goroutines, all waiting for input from the urls channel. When someone writes into this channel, one of the goroutines will pick it up and work on it. This is a typical worker-pool implementation.

Then the scanner reads input line by line and sends each line to the urls channel, where one of the goroutines picks it up and writes the response to the results channel. At this point, no other goroutine is reading from the results channel, so that send blocks.

As the scanner keeps reading URLs, the other goroutines pick them up and block the same way. So if the scanner reads more than 20 URLs, the program deadlocks, because all the goroutines will be waiting for a reader.

If there are fewer than 20 URLs, the scanner for loop will end and the results will be read. However, that eventually deadlocks as well, because the for result := range results loop only terminates when the channel is closed, and no one ever closes it.
To fix this: first, close the urls channel right after you finish reading. That releases the range loops in all the goroutines. Then put the for loop that reads from the results channel into its own goroutine, so you can call wg.Wait while results are being processed. After wg.Wait, you can close the results channel.

This still does not guarantee that all items in the results channel will be read: the program may terminate before all messages are processed. So use a third channel, closed at the end of the goroutine that reads from results. That is:
done := make(chan struct{})
go func() {
    defer close(done)
    for result := range results {
        fmt.Printf("%s arrived\n", result)
    }
}()

wg.Wait()
close(results)
<-done
Answer 3
Score: 1
I am not super happy with the previous answers, so here is a solution based on the documented behavior in the Go Tour, the Go docs, and the language specification.
package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
    "time"
)

var wg sync.WaitGroup

func sad(url string) string {
    fmt.Printf("gonna sleep a bit\n")
    time.Sleep(2 * time.Millisecond)
    return url + " added stuff"
}

func main() {
    // sc := bufio.NewScanner(os.Stdin)
    sc := bufio.NewScanner(strings.NewReader(strings.Repeat("blah blah\n", 15)))
    urls := make(chan string)
    results := make(chan string)

    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urls {
                n := sad(url)
                results <- n
            }
        }()
    }

    // results is consumed by many goroutines.
    // We must wait for them to finish before closing results,
    // but we don't want to block here, so put that into a goroutine.
    go func() {
        wg.Wait()
        close(results)
    }()

    go func() {
        for sc.Scan() {
            url := sc.Text()
            urls <- url
        }
        close(urls) // done producing into a channel: close it right away.
    }()

    for result := range results {
        fmt.Printf("%s arrived\n", result)
    } // the program will finish when it gets out of this loop.
    // It will get out of this loop because you have made sure the results channel is closed.
}