英文:
How to close a channel
问题
我尝试适应这个例子:https://gobyexample.com/worker-pools
但我不知道如何停止通道,因为程序在通道循环结束时不会退出。
你能解释一下如何退出程序吗?
package main
import (
"github.com/SlyMarbo/rss"
"bufio"
"fmt"
"log"
"os"
)
func readLines(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
return lines, scanner.Err()
}
func worker(id int, jobs <-chan string, results chan<- string) {
for url := range jobs {
fmt.Println("worker", id, "processing job", url)
feed, err := rss.Fetch(url)
if err != nil {
fmt.Println("Error on:", url)
continue
}
borne := 0
for _, value := range feed.Items {
if borne < 5 {
results <- value.Link
borne = borne +1
} else {
continue
}
}
}
}
func main() {
jobs := make(chan string)
results := make(chan string)
for w := 1; w <= 16; w++ {
go worker(w, jobs, results)
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
for _, url := range urls {
jobs <- url
}
close(jobs)
// it seems program runs over...
for msg := range results {
fmt.Println(msg)
}
}
flux.txt 是一个类似以下内容的纯文本文件:
英文:
I try to adapt this example:
https://gobyexample.com/worker-pools
But I don't know how to stop the channel because program don't exit at the end of the channel loop.
Can you explain how to exit the program?
package main
import (
"github.com/SlyMarbo/rss"
"bufio"
"fmt"
"log"
"os"
)
func readLines(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
return lines, scanner.Err()
}
func worker(id int, jobs <-chan string, results chan<- string) {
for url := range jobs {
fmt.Println("worker", id, "processing job", url)
feed, err := rss.Fetch(url)
if err != nil {
fmt.Println("Error on: ", url)
continue
}
borne := 0
for _, value := range feed.Items {
if borne < 5 {
results <- value.Link
borne = borne +1
} else {
continue
}
}
}
}
func main() {
jobs := make(chan string)
results := make(chan string)
for w := 1; w <= 16; w++ {
go worker(w, jobs, results)
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
for _, url := range urls {
jobs <- url
}
close(jobs)
// it seems program runs over...
for msg := range results {
fmt.Println(msg)
}
}
The flux.txt is a flat text file like :
答案1
得分: 2
问题是,在你提到的示例中,工作池从results
通道中读取了9次:
for a := 1; a <= 9; a++ {
<-results
}
而你的程序则对results
进行了范围循环,这在Go语言中具有不同的语义。范围操作符不会停止,直到通道关闭。
for msg := range results {
fmt.Println(msg)
}
要解决这个问题,你需要关闭results
通道。然而,如果你在for
循环之前调用close(results)
,很可能会出现恐慌,因为工作线程可能正在向results
写入数据。
为了解决这个问题,你需要添加另一个通道,用于在所有工作线程完成时通知。你可以使用sync.WaitGroup
或者:
const (
workers = 16
)
func main() {
jobs := make(chan string, 100)
results := make(chan string, 100)
var wg sync.WaitGroup
for w := 0; w < workers; w++ {
go func() {
wg.Add(1)
defer wg.Done()
worker(w, jobs, results)
}()
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
for _, url := range urls {
jobs <- url
}
close(jobs)
wg.Wait()
close(results)
// 程序似乎已经运行完毕...
for msg := range results {
fmt.Println(msg)
}
}
或者使用done
通道:
package main
import (
"bufio"
"fmt"
"github.com/SlyMarbo/rss"
"log"
"os"
)
func readLines(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
return lines, scanner.Err()
}
func worker(id int, jobs <-chan string, results chan<- string, done chan struct{}) {
for url := range jobs {
fmt.Println("worker", id, "processing job", url)
feed, err := rss.Fetch(url)
if err != nil {
fmt.Println("Error on:", url)
continue
}
borne := 0
for _, value := range feed.Items {
if borne < 5 {
results <- value.Link
borne = borne + 1
} else {
continue
}
}
}
close(done)
}
const (
workers = 16
)
func main() {
jobs := make(chan string, 100)
results := make(chan string, 100)
dones := make([]chan struct{}, workers)
for w := 0; w < workers; w++ {
dones[w] = make(chan struct{})
go worker(w, jobs, results, dones[w])
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
for _, url := range urls {
jobs <- url
}
close(jobs)
for _, done := range dones {
<-done
}
close(results)
// 程序似乎已经运行完毕...
for msg := range results {
fmt.Println(msg)
}
}
英文:
The problem is that, in the example you are referring to, the worker pool reads from results
9 times:
<!-- lang=go -->
for a := 1; a <= 9; a++ {
<-results
}
Your program, on the other hand, does a range loop over the results
which has a different semantics in go. The range operator does not stop until the channel is closed.
<!-- lang=go -->
for msg := range results {
fmt.Println(msg)
}
To fix your problem you'd need to close the results
channel. However, if you just call close(results)
before the for loop, you most probably will
get a panic, because the workers might be writing on results
.
To fix this problem, you need to add another channel to be notified when all the workers are done. You can do this either using a sync.WaitGroup
or :
<!-- lang=go -->
const (
workers = 16
)
func main() {
jobs := make(chan string, 100)
results := make(chan string, 100)
var wg sync.WaitGroup
for w := 0; w < workers; w++ {
go func() {
wg.Add(1)
defer wg.Done()
worker(w, jobs, results)
}()
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
for _, url := range urls {
jobs <- url
}
close(jobs)
wg.Wait()
close(results)
// it seems program runs over...
for msg := range results {
fmt.Println(msg)
}
}
Or a done
channel:
<!-- lang=go -->
package main
import (
"bufio"
"fmt"
"github.com/SlyMarbo/rss"
"log"
"os"
)
func readLines(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
return lines, scanner.Err()
}
func worker(id int, jobs <-chan string, results chan<- string, done chan struct{}) {
for url := range jobs {
fmt.Println("worker", id, "processing job", url)
feed, err := rss.Fetch(url)
if err != nil {
fmt.Println("Error on: ", url)
continue
}
borne := 0
for _, value := range feed.Items {
if borne < 5 {
results <- value.Link
borne = borne + 1
} else {
continue
}
}
}
close(done)
}
const (
workers = 16
)
func main() {
jobs := make(chan string, 100)
results := make(chan string, 100)
dones := make([]chan struct{}, workers)
for w := 0; w < workers; w++ {
dones[w] = make(chan struct{})
go worker(w, jobs, results, dones[w])
}
urls, err := readLines("flux.txt")
if err != nil {
log.Fatalf("readLines: %s", err)
}
for _, url := range urls {
jobs <- url
}
close(jobs)
for _, done := range dones {
<-done
}
close(results)
// it seems program runs over...
for msg := range results {
fmt.Println(msg)
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论