Channel closing before all items are processed by other goroutines

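Question

Is the deferred close of the initial data channel causing this code to end early?

The output is:

adding function
adding function
generate a
generate b
running a
running b
generate c <- last item generated, so the deferred close closes this channel
running a
running b
running c <- why is 'c' only run once?
received data a <- why is only 'a' received? 'b' and 'c' are never received
done

In the code snippet there is an initial "data" channel that takes in strings and passes them to other functions, but not all of the items sent into the "data" channel are processed by the two function calls. Closing the "data" channel shouldn't prevent items still in the channel from being read, should it?

Link: https://go.dev/play/p/2TRUpBlWONd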

package main

import (
	"fmt"
)

func run(data chan string, fn func(x string) (string, error)) (chan string, chan error) {
	outCh := make(chan string)
	errCh := make(chan error)

	go func() {
		defer close(outCh)

		for d := range data {
			fmt.Println("running", d)
			out, err := fn(d)
			if err != nil {
				errCh <- err
				continue
			}
			outCh <- out
		}
	}()
	return outCh, errCh
}

func generate(data chan string) {
	// Close the data channel to signal other goroutines that
	// we are done sending data into the channel.
	defer close(data)
	for _, x := range []string{"a", "b", "c"} {
		fmt.Println("generate", x)
		data <- x
	}
}

func main() {
	// Number of functions to run
	N := 2

	// Channel to stream data into the functions
	data := make(chan string)
	// Channel to catch errors from the functions
	errCh := make(chan error)

	// Create a slice of functions that take a string and
	// return a string value and an error value.
	functions := []func(x string) (string, error){}

	// Generate the data to stream into the data channel.
	// This data feeds the functions.
	go generate(data)

	// Generate N number of functions to process stuff from the data channel.
	for i := 0; i < N; i++ {
		fmt.Println("adding function")
		functions = append(functions, func(x string) (string, error) {
			return fmt.Sprintf(x), nil
		})
	}

	// run each of the functions on the items in the data channel
	// and return the data or an error
	for i := 0; i < N; i++ {
		data, errCh = run(data, functions[i])
	}

	select {
	case x := <-data:
		fmt.Println("received data", x)
	case err := <-errCh:
		fmt.Println("received error", err)
	}

	fmt.Println("done")

}

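Answer 1

Score: 2

In the last step, you receive only one message from the channel, and then main returns. You need to put the select in a loop that runs until the channel is closed:

for {
	select {
	case x, ok := <-data:
		if !ok {
			return // Done: data is closed and fully drained
		}
		fmt.Println("received data", x)
	case err := <-errCh:
		fmt.Println("received error", err)
	}
}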
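
To see the loop in context, here is a minimal sketch of the question's program with the receive loop applied. The `run` and `generate` functions follow the question (with the debug prints removed); `drain` and `pipeline` are hypothetical helpers introduced only for this illustration, and the stage function simply returns its input. Because the two `run` stages are chained, each item passes through both of them, which is likely why "running a" and "running b" appear twice in the original output.

```go
package main

import "fmt"

// run is the stage function from the question, minus the debug print:
// it applies fn to each item read from data until data is closed.
func run(data chan string, fn func(x string) (string, error)) (chan string, chan error) {
	outCh := make(chan string)
	errCh := make(chan error)
	go func() {
		// Closing outCh lets the next stage (or main) know this stage is done.
		defer close(outCh)
		for d := range data {
			out, err := fn(d)
			if err != nil {
				errCh <- err
				continue
			}
			outCh <- out
		}
	}()
	return outCh, errCh
}

// generate sends three items and then closes data, as in the question.
func generate(data chan string) {
	defer close(data)
	for _, x := range []string{"a", "b", "c"} {
		data <- x
	}
}

// drain is a hypothetical helper wrapping the answer's loop: it keeps
// receiving until data is closed, then returns everything it collected.
func drain(data chan string, errCh chan error) (items []string, errs []error) {
	for {
		select {
		case x, ok := <-data:
			if !ok {
				return items, errs // data is closed and fully drained
			}
			items = append(items, x)
		case err := <-errCh:
			errs = append(errs, err)
		}
	}
}

// pipeline is a hypothetical helper that chains n stages, as main does
// in the question, and drains the final output channel.
func pipeline(n int) ([]string, []error) {
	data := make(chan string)
	var errCh chan error
	go generate(data)
	for i := 0; i < n; i++ {
		// Assumption: an identity function stands in for the real work.
		data, errCh = run(data, func(x string) (string, error) { return x, nil })
	}
	return drain(data, errCh)
}

func main() {
	items, errs := pipeline(2)
	fmt.Println("received data", items) // prints "received data [a b c]"
	fmt.Println("received errors", errs)
	fmt.Println("done")
}
```

One caveat of this design, carried over from the question: only the error channel of the last stage is kept, so an error sent by an earlier stage would have no receiver and that stage would block. With functions that never fail, as here, this does not come up.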

huangapple
  • Posted on March 17, 2023, 10:28:22
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75763275.html