英文:
How to close a channel
问题
我尝试适应这个例子:https://gobyexample.com/worker-pools
但我不知道如何停止通道,因为程序在通道循环结束时不会退出。
你能解释一下如何退出程序吗?
package main
import (
"github.com/SlyMarbo/rss"
"bufio"
"fmt"
"log"
"os"
)
// readLines loads the file at path and returns its contents split into
// lines, in file order and without line terminators.
//
// It returns a nil slice and the error from os.Open when the file cannot be
// opened. Otherwise it returns every line scanned together with the scanner's
// error, which is nil when the whole file was read.
func readLines(path string) ([]string, error) {
	src, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer src.Close()

	var collected []string
	for sc := bufio.NewScanner(src); ; {
		if !sc.Scan() {
			// Scan reports false both at end of input and on failure;
			// Err tells the two apart (nil at a clean end of input).
			return collected, sc.Err()
		}
		collected = append(collected, sc.Text())
	}
}
// worker drains jobs until the channel is closed. Each value received is
// treated as a feed URL: it is fetched with rss.Fetch and the Link of at most
// the first five items is sent on results, in feed order.
//
// id is only used to label the progress line printed for every job. When a
// fetch fails, a message naming the URL is printed and the worker moves on to
// the next job. worker never closes results.
func worker(id int, jobs <-chan string, results chan<- string) {
	const maxLinks = 5 // links forwarded per feed

	for url := range jobs {
		fmt.Println("worker", id, "processing job", url)

		feed, err := rss.Fetch(url)
		if err != nil {
			fmt.Println("Error on:", url)
			continue
		}

		for i, item := range feed.Items {
			if i >= maxLinks {
				break
			}
			results <- item.Link
		}
	}
}
// main starts 16 workers, sends every line of flux.txt to them as a job, and
// then prints whatever the workers send back on results.
//
// NOTE(review): as written this program cannot finish normally; see the notes
// on the two loops below.
func main() {
jobs := make(chan string)
results := make(chan string)
for w := 1; w <= 16; w++ {
go worker(w, jobs, results)
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
// NOTE(review): nothing receives from results until this loop has finished.
// Both channels are unbuffered, so a worker that has a link to send blocks
// and stops taking jobs; once all 16 workers are blocked this send blocks too.
for _, url := range urls {
jobs <- url
}
close(jobs)
// it seems program runs over...
// NOTE(review): results is never closed anywhere in this program, so this
// range cannot end even after every worker has returned.
for msg := range results {
fmt.Println(msg)
}
}
flux.txt 是一个类似以下内容的纯文本文件:
英文:
I am trying to adapt this example:
https://gobyexample.com/worker-pools
But I don't know how to close the channel, because the program doesn't exit at the end of the channel loop.
Can you explain how to exit the program?
package main
import (
"github.com/SlyMarbo/rss"
"bufio"
"fmt"
"log"
"os"
)
// readLines loads the file at path and returns its contents split into
// lines, in file order and without line terminators.
//
// It returns a nil slice and the error from os.Open when the file cannot be
// opened. Otherwise it returns every line scanned together with the scanner's
// error, which is nil when the whole file was read.
func readLines(path string) ([]string, error) {
	src, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer src.Close()

	var collected []string
	for sc := bufio.NewScanner(src); ; {
		if !sc.Scan() {
			// Scan reports false both at end of input and on failure;
			// Err tells the two apart (nil at a clean end of input).
			return collected, sc.Err()
		}
		collected = append(collected, sc.Text())
	}
}
// worker drains jobs until the channel is closed. Each value received is
// treated as a feed URL: it is fetched with rss.Fetch and the Link of at most
// the first five items is sent on results, in feed order.
//
// id is only used to label the progress line printed for every job. When a
// fetch fails, a message naming the URL is printed and the worker moves on to
// the next job. worker never closes results.
func worker(id int, jobs <-chan string, results chan<- string) {
	const maxLinks = 5 // links forwarded per feed

	for url := range jobs {
		fmt.Println("worker", id, "processing job", url)

		feed, err := rss.Fetch(url)
		if err != nil {
			fmt.Println("Error on: ", url)
			continue
		}

		for i, item := range feed.Items {
			if i >= maxLinks {
				break
			}
			results <- item.Link
		}
	}
}
// main starts 16 workers, sends every line of flux.txt to them as a job, and
// then prints whatever the workers send back on results.
//
// NOTE(review): as written this program cannot finish normally; see the notes
// on the two loops below.
func main() {
jobs := make(chan string)
results := make(chan string)
for w := 1; w <= 16; w++ {
go worker(w, jobs, results)
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
// NOTE(review): nothing receives from results until this loop has finished.
// Both channels are unbuffered, so a worker that has a link to send blocks
// and stops taking jobs; once all 16 workers are blocked this send blocks too.
for _, url := range urls {
jobs <- url
}
close(jobs)
// it seems program runs over...
// NOTE(review): results is never closed anywhere in this program, so this
// range cannot end even after every worker has returned.
for msg := range results {
fmt.Println(msg)
}
}
flux.txt is a plain text file like this:
答案1
得分: 2
问题是,在你提到的示例中,工作池从results通道中读取了9次:
// Receive from results a fixed nine times, discarding each value; the loop
// ends on its own count and does not depend on results being closed.
for a := 1; a <= 9; a++ {
<-results
}
而你的程序则对results进行了范围循环,这在Go语言中具有不同的语义。范围操作符不会停止,直到通道关闭。
// Print every value received from results; a range over a channel keeps
// receiving until the channel is closed.
for msg := range results {
fmt.Println(msg)
}
要解决这个问题,你需要关闭results通道。然而,如果你在for循环之前调用close(results),很可能会出现恐慌,因为工作线程可能正在向results写入数据。
为了解决这个问题,你需要一种在所有工作线程完成时得到通知的方式,然后再关闭results通道。你可以使用sync.WaitGroup:
// workers is the number of goroutines that consume jobs concurrently.
const workers = 16

// main reads one feed URL per line from flux.txt, fans the URLs out to a
// fixed pool of workers, and prints every link the workers report.
//
// Shutdown happens in order: the feeder closes jobs once every URL has been
// handed out, each worker returns when jobs is drained, and results is closed
// only after the WaitGroup confirms the last worker has returned. That final
// close is what ends the printing loop and lets the program exit.
func main() {
	jobs := make(chan string, 100)
	results := make(chan string, 100)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		// Add must happen before the goroutine is started; calling it from
		// inside the goroutine races with wg.Wait, which could return early.
		wg.Add(1)
		// Pass w as an argument so each worker keeps its own id instead of
		// sharing the loop variable with the other closures.
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(w)
	}

	urls, err := readLines("flux.txt")
	if err != nil {
		log.Fatalf("readLines: %s", err)
	}

	// Feed jobs from a separate goroutine so main is free to drain results.
	// If main did both in sequence, full buffers would leave the workers
	// blocked on results and main blocked on jobs, with nobody able to move.
	go func() {
		for _, url := range urls {
			jobs <- url
		}
		close(jobs)
	}()

	// Close results only once no worker can send on it any more. This runs
	// concurrently with the printing loop below rather than before it, so
	// the workers are never stuck on a results buffer that nobody reads.
	go func() {
		wg.Wait()
		close(results)
	}()

	for msg := range results {
		fmt.Println(msg)
	}
}
或者使用done通道:
package main
import (
"bufio"
"fmt"
"github.com/SlyMarbo/rss"
"log"
"os"
)
// readLines loads the file at path and returns its contents split into
// lines, in file order and without line terminators.
//
// It returns a nil slice and the error from os.Open when the file cannot be
// opened. Otherwise it returns every line scanned together with the scanner's
// error, which is nil when the whole file was read.
func readLines(path string) ([]string, error) {
	src, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer src.Close()

	var collected []string
	for sc := bufio.NewScanner(src); ; {
		if !sc.Scan() {
			// Scan reports false both at end of input and on failure;
			// Err tells the two apart (nil at a clean end of input).
			return collected, sc.Err()
		}
		collected = append(collected, sc.Text())
	}
}
// worker drains jobs until the channel is closed. Each value received is
// treated as a feed URL: it is fetched with rss.Fetch and the Link of at most
// the first five items is sent on results, in feed order.
//
// id is only used to label the progress line printed for every job. When a
// fetch fails, a message naming the URL is printed and the worker moves on to
// the next job. Once jobs is exhausted the worker closes done to signal that
// it will send nothing further; it never closes results.
func worker(id int, jobs <-chan string, results chan<- string, done chan struct{}) {
	const maxLinks = 5 // links forwarded per feed

	for url := range jobs {
		fmt.Println("worker", id, "processing job", url)

		feed, err := rss.Fetch(url)
		if err != nil {
			fmt.Println("Error on:", url)
			continue
		}

		for i, item := range feed.Items {
			if i >= maxLinks {
				break
			}
			results <- item.Link
		}
	}

	close(done)
}
// workers is the number of goroutines that consume jobs concurrently.
const workers = 16

// main reads one feed URL per line from flux.txt, fans the URLs out to a
// fixed pool of workers, and prints every link the workers report.
//
// Every worker is given its own done channel, which it closes when it
// returns. results is closed only after all of those channels have been
// closed, and that close is what ends the printing loop.
func main() {
	jobs := make(chan string, 100)
	results := make(chan string, 100)

	dones := make([]chan struct{}, workers)
	for w := 0; w < workers; w++ {
		dones[w] = make(chan struct{})
		go worker(w, jobs, results, dones[w])
	}

	urls, err := readLines("flux.txt")
	if err != nil {
		log.Fatalf("readLines: %s", err)
	}

	// Feed jobs from a separate goroutine so main is free to drain results.
	// If main did both in sequence, full buffers would leave the workers
	// blocked on results and main blocked on jobs, with nobody able to move.
	go func() {
		for _, url := range urls {
			jobs <- url
		}
		close(jobs)
	}()

	// Wait for the workers concurrently with the printing loop below, not
	// before it: a worker blocked on a full results buffer never reaches the
	// point where it closes done, so waiting first could block forever.
	go func() {
		for _, done := range dones {
			<-done
		}
		close(results)
	}()

	for msg := range results {
		fmt.Println(msg)
	}
}
英文:
The problem is that, in the example you are referring to, the worker pool reads from results 9 times:
<!-- lang=go -->
// Receive from results a fixed nine times, discarding each value; the loop
// ends on its own count and does not depend on results being closed.
for a := 1; a <= 9; a++ {
<-results
}
Your program, on the other hand, does a range loop over results, which has different semantics in Go: a range over a channel does not stop until the channel is closed.
<!-- lang=go -->
// Print every value received from results; a range over a channel keeps
// receiving until the channel is closed.
for msg := range results {
fmt.Println(msg)
}
To fix your problem you need to close the results channel. However, if you just call close(results) before the for loop, you will most probably
get a panic, because the workers might still be writing to results.
To fix this, you need a way to be notified when all the workers are done, so that results is closed only after the last write. You can do this either with a sync.WaitGroup:
<!-- lang=go -->
// workers is the number of goroutines that consume jobs concurrently.
const workers = 16

// main reads one feed URL per line from flux.txt, fans the URLs out to a
// fixed pool of workers, and prints every link the workers report.
//
// Shutdown happens in order: the feeder closes jobs once every URL has been
// handed out, each worker returns when jobs is drained, and results is closed
// only after the WaitGroup confirms the last worker has returned. That final
// close is what ends the printing loop and lets the program exit.
func main() {
	jobs := make(chan string, 100)
	results := make(chan string, 100)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		// Add must happen before the goroutine is started; calling it from
		// inside the goroutine races with wg.Wait, which could return early.
		wg.Add(1)
		// Pass w as an argument so each worker keeps its own id instead of
		// sharing the loop variable with the other closures.
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(w)
	}

	urls, err := readLines("flux.txt")
	if err != nil {
		log.Fatalf("readLines: %s", err)
	}

	// Feed jobs from a separate goroutine so main is free to drain results.
	// If main did both in sequence, full buffers would leave the workers
	// blocked on results and main blocked on jobs, with nobody able to move.
	go func() {
		for _, url := range urls {
			jobs <- url
		}
		close(jobs)
	}()

	// Close results only once no worker can send on it any more. This runs
	// concurrently with the printing loop below rather than before it, so
	// the workers are never stuck on a results buffer that nobody reads.
	go func() {
		wg.Wait()
		close(results)
	}()

	for msg := range results {
		fmt.Println(msg)
	}
}
Or a done channel:
<!-- lang=go -->
package main
import (
"bufio"
"fmt"
"github.com/SlyMarbo/rss"
"log"
"os"
)
// readLines loads the file at path and returns its contents split into
// lines, in file order and without line terminators.
//
// It returns a nil slice and the error from os.Open when the file cannot be
// opened. Otherwise it returns every line scanned together with the scanner's
// error, which is nil when the whole file was read.
func readLines(path string) ([]string, error) {
	src, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer src.Close()

	var collected []string
	for sc := bufio.NewScanner(src); ; {
		if !sc.Scan() {
			// Scan reports false both at end of input and on failure;
			// Err tells the two apart (nil at a clean end of input).
			return collected, sc.Err()
		}
		collected = append(collected, sc.Text())
	}
}
// worker drains jobs until the channel is closed. Each value received is
// treated as a feed URL: it is fetched with rss.Fetch and the Link of at most
// the first five items is sent on results, in feed order.
//
// id is only used to label the progress line printed for every job. When a
// fetch fails, a message naming the URL is printed and the worker moves on to
// the next job. Once jobs is exhausted the worker closes done to signal that
// it will send nothing further; it never closes results.
func worker(id int, jobs <-chan string, results chan<- string, done chan struct{}) {
	const maxLinks = 5 // links forwarded per feed

	for url := range jobs {
		fmt.Println("worker", id, "processing job", url)

		feed, err := rss.Fetch(url)
		if err != nil {
			fmt.Println("Error on: ", url)
			continue
		}

		for i, item := range feed.Items {
			if i >= maxLinks {
				break
			}
			results <- item.Link
		}
	}

	close(done)
}
// workers is the number of goroutines that consume jobs concurrently.
const workers = 16

// main reads one feed URL per line from flux.txt, fans the URLs out to a
// fixed pool of workers, and prints every link the workers report.
//
// Every worker is given its own done channel, which it closes when it
// returns. results is closed only after all of those channels have been
// closed, and that close is what ends the printing loop.
func main() {
	jobs := make(chan string, 100)
	results := make(chan string, 100)

	dones := make([]chan struct{}, workers)
	for w := 0; w < workers; w++ {
		dones[w] = make(chan struct{})
		go worker(w, jobs, results, dones[w])
	}

	urls, err := readLines("flux.txt")
	if err != nil {
		log.Fatalf("readLines: %s", err)
	}

	// Feed jobs from a separate goroutine so main is free to drain results.
	// If main did both in sequence, full buffers would leave the workers
	// blocked on results and main blocked on jobs, with nobody able to move.
	go func() {
		for _, url := range urls {
			jobs <- url
		}
		close(jobs)
	}()

	// Wait for the workers concurrently with the printing loop below, not
	// before it: a worker blocked on a full results buffer never reaches the
	// point where it closes done, so waiting first could block forever.
	go func() {
		for _, done := range dones {
			<-done
		}
		close(results)
	}()

	for msg := range results {
		fmt.Println(msg)
	}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论