如何关闭一个通道

huangapple go评论78阅读模式
英文:

How to close a channel

问题

我尝试适应这个例子:https://gobyexample.com/worker-pools

但我不知道如何停止通道,因为程序在通道循环结束时不会退出。

你能解释一下如何退出程序吗?

package main

import (
    "github.com/SlyMarbo/rss"
    "bufio"
    "fmt"
    "log"
    "os"
)

func readLines(path string) ([]string, error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var lines []string
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        lines = append(lines, scanner.Text())
    }
    return lines, scanner.Err()
}


func worker(id int, jobs <-chan string, results chan<- string) {
    for url := range jobs {
        fmt.Println("worker", id, "processing job", url)
        feed, err := rss.Fetch(url)
        if err != nil {
            fmt.Println("Error on:", url)
            continue
        }
        borne := 0
        for _, value := range feed.Items {
            if borne < 5 {
                results <- value.Link
                borne = borne +1
            } else {
                continue
            }
        }
    }
}


func main() {
    jobs := make(chan string)
    results := make(chan string)

    for w := 1; w <= 16; w++ {
        go worker(w, jobs, results)
    }


    urls, err := readLines("flux.txt")
    if err != nil { 
        log.Fatalf("readLines: %s", err) 
    }

    for _, url := range urls {
        jobs <- url
    }

    close(jobs)

    // it seems program runs over...
    for msg := range results {
        fmt.Println(msg)
    }
}

flux.txt 是一个类似以下内容的纯文本文件:

英文:

I try to adapt this example:
https://gobyexample.com/worker-pools

But I don't know how to stop the channel because program don't exit at the end of the channel loop.

Can you explain how to exit the program?

package main
import (
&quot;github.com/SlyMarbo/rss&quot;
&quot;bufio&quot;
&quot;fmt&quot;
&quot;log&quot;
&quot;os&quot;
)
func readLines(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
return lines, scanner.Err()
}
func worker(id int, jobs &lt;-chan string, results chan&lt;- string) {
for url := range jobs {
fmt.Println(&quot;worker&quot;, id, &quot;processing job&quot;, url)
feed, err := rss.Fetch(url)
if err != nil {
fmt.Println(&quot;Error on: &quot;, url)
continue
}
borne := 0
for _, value := range feed.Items {
if borne &lt; 5 {
results &lt;- value.Link
borne = borne +1
} else {
continue
}
}
}
}
func main() {
jobs := make(chan string)
results := make(chan string)
for w := 1; w &lt;= 16; w++ {
go worker(w, jobs, results)
}
urls, err := readLines(&quot;flux.txt&quot;)
if err != nil { 
log.Fatalf(&quot;readLines: %s&quot;, err) 
}
for _, url := range urls {
jobs &lt;- url
}
close(jobs)
// it seems program runs over...
for msg := range results {
fmt.Println(msg)
}
}

The flux.txt is a flat text file like :

答案1

得分: 2

问题是,在你提到的示例中,工作池从results通道中读取了9次:

for a := 1; a <= 9; a++ {
    <-results
}

而你的程序则对results进行了范围循环,这在Go语言中具有不同的语义。范围操作符不会停止,直到通道关闭。

for msg := range results {
    fmt.Println(msg)
}

要解决这个问题,你需要关闭results通道。然而,如果你在for循环之前调用close(results),很可能会出现恐慌,因为工作线程可能正在向results写入数据。

为了解决这个问题,你需要添加另一个通道,用于在所有工作线程完成时通知。你可以使用sync.WaitGroup或者:

const (
    workers = 16
)

func main() {
    jobs := make(chan string, 100)
    results := make(chan string, 100)
    var wg sync.WaitGroup

    for w := 0; w < workers; w++ {
        go func() {
            wg.Add(1)
            defer wg.Done()
            worker(w, jobs, results)
        }()
    }

    urls, err := readLines("flux.txt")
    if err != nil {
        log.Fatalf("readLines: %s", err)
    }

    for _, url := range urls {
        jobs <- url
    }

    close(jobs)

    wg.Wait()

    close(results)

    // 程序似乎已经运行完毕...
    for msg := range results {
        fmt.Println(msg)
    }
}

或者使用done通道:

package main

import (
    "bufio"
    "fmt"
    "github.com/SlyMarbo/rss"
    "log"
    "os"
)

func readLines(path string) ([]string, error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var lines []string
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        lines = append(lines, scanner.Text())
    }
    return lines, scanner.Err()
}

func worker(id int, jobs <-chan string, results chan<- string, done chan struct{}) {
    for url := range jobs {
        fmt.Println("worker", id, "processing job", url)
        feed, err := rss.Fetch(url)
        if err != nil {
            fmt.Println("Error on:", url)
            continue
        }
        borne := 0
        for _, value := range feed.Items {
            if borne < 5 {
                results <- value.Link
                borne = borne + 1
            } else {
                continue
            }
        }
    }
    close(done)
}

const (
    workers = 16
)

func main() {
    jobs := make(chan string, 100)
    results := make(chan string, 100)
    dones := make([]chan struct{}, workers)

    for w := 0; w < workers; w++ {
        dones[w] = make(chan struct{})
        go worker(w, jobs, results, dones[w])
    }

    urls, err := readLines("flux.txt")
    if err != nil {
        log.Fatalf("readLines: %s", err)
    }

    for _, url := range urls {
        jobs <- url
    }


    close(jobs)

    for _, done := range dones {
        <-done
    }

    close(results)

    // 程序似乎已经运行完毕...
    for msg := range results {
        fmt.Println(msg)
    }
}
英文:

The problem is that, in the example you are referring to, the worker pool reads from results 9 times:

<!-- lang=go -->

for a := 1; a &lt;= 9; a++ {
&lt;-results
}

Your program, on the other hand, does a range loop over the results which has a different semantics in go. The range operator does not stop until the channel is closed.

<!-- lang=go -->

for msg := range results {
fmt.Println(msg)
}

To fix your problem you'd need to close the results channel. However, if you just call close(results) before the for loop, you most probably will
get a panic, because the workers might be writing on results.

To fix this problem, you need to add another channel to be notified when all the workers are done. You can do this either using a sync.WaitGroup or :

<!-- lang=go -->

const (
workers = 16
)
func main() {
jobs := make(chan string, 100)
results := make(chan string, 100)
var wg sync.WaitGroup
for w := 0; w &lt; workers; w++ {
go func() {
wg.Add(1)
defer wg.Done()
worker(w, jobs, results)
}()
}
urls, err := readLines(&quot;flux.txt&quot;)
if err != nil {
log.Fatalf(&quot;readLines: %s&quot;, err)
}
for _, url := range urls {
jobs &lt;- url
}
close(jobs)
wg.Wait()
close(results)
// it seems program runs over...
for msg := range results {
fmt.Println(msg)
}
}

Or a done channel:

<!-- lang=go -->

package main
import (
&quot;bufio&quot;
&quot;fmt&quot;
&quot;github.com/SlyMarbo/rss&quot;
&quot;log&quot;
&quot;os&quot;
)
func readLines(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
return lines, scanner.Err()
}
func worker(id int, jobs &lt;-chan string, results chan&lt;- string, done chan struct{}) {
for url := range jobs {
fmt.Println(&quot;worker&quot;, id, &quot;processing job&quot;, url)
feed, err := rss.Fetch(url)
if err != nil {
fmt.Println(&quot;Error on: &quot;, url)
continue
}
borne := 0
for _, value := range feed.Items {
if borne &lt; 5 {
results &lt;- value.Link
borne = borne + 1
} else {
continue
}
}
}
close(done)
}
const (
workers = 16
)
func main() {
jobs := make(chan string, 100)
results := make(chan string, 100)
dones := make([]chan struct{}, workers)
for w := 0; w &lt; workers; w++ {
dones[w] = make(chan struct{})
go worker(w, jobs, results, dones[w])
}
urls, err := readLines(&quot;flux.txt&quot;)
if err != nil {
log.Fatalf(&quot;readLines: %s&quot;, err)
}
for _, url := range urls {
jobs &lt;- url
}
close(jobs)
for _, done := range dones {
&lt;-done
}
close(results)
// it seems program runs over...
for msg := range results {
fmt.Println(msg)
}
}

huangapple
  • 本文由 发表于 2015年8月7日 22:31:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/31880252.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定