在不阻塞的情况下从goroutine通道中读取

huangapple go评论75阅读模式
英文:

Reading from a goroutine channel without blocking

问题

我有两个goroutine:主要的worker和一个helperworker会为其提供一些帮助。helper可能会遇到错误,所以我使用一个通道来将错误从helper传递给worker

func helper(c chan<- error) {
    //做一些工作
    c <- err // 在c上发送错误/nil
}

这是如何调用helper()的:

func worker() error {
    //做一些工作
    c := make(chan error, 1)
    go helper(c)
    err := <-c
    return err
}

问题:

  • 语句err := <-c是否会阻塞worker?我认为不会,因为通道是有缓冲的。

  • 如果它是阻塞的,我如何使其非阻塞?我的要求是让worker及其调用者继续进行其余的工作,而不是“等待”值出现在通道上。

谢谢。

英文:

I have two goroutines: the main worker and a helper that it spins off for some help. helper can encounter errors, so I use a channel to communicate errors over from the helper to the worker.

func helper(c chan &lt;- error) (){
    //do some work
    c &lt;- err // send errors/nil on c
}

Here is how helper() is called:

func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    err := &lt;- c
    return err
}

Questions:

  • Is the statement err := &lt;- c blocking worker? I don't think so, since the channel is buffered.

  • If it is blocking, how do I make it non-blocking? My requirement is to have worker and its caller continue with rest of the work, without waiting for the value to appear on the channel.

Thanks.

答案1

得分: 4

你可以轻松验证

func helper(c chan<- error) {
    time.Sleep(5 * time.Second)
    c <- errors.New("")
}

func worker() error {
    fmt.Println("do one")

    c := make(chan error, 1)
    go helper(c)

    err := <-c
    fmt.Println("do two")

    return err
}

func main() {
    worker()
}

问:语句 err := <-c 是否会阻塞 worker?我认为不会,因为通道是有缓冲的。

答: err := <-c 会阻塞 worker。

问:如果它是阻塞的,我如何使其非阻塞?我的要求是让 worker 及其调用者继续执行其余的工作,而不等待通道上的值出现。

答: 如果你不想阻塞,只需移除 err := <-c。如果你需要在最后获取 err,只需将 err := <-c 移到最后。

你不能在不阻塞的情况下读取通道,如果你继续执行而不阻塞,除非你的代码在循环中。

Loop:
    for {
        select {
        case <-c:
            break Loop
        default:
            // default 会在不阻塞的情况下执行
        }
        // 做一些其他操作
    }

你见过 errgroup 或 waitgroup 吗?

它使用原子操作、取消上下文和 sync.Once 来实现。

https://github.com/golang/sync/blob/master/errgroup/errgroup.go

https://github.com/golang/go/blob/master/src/sync/waitgroup.go

或者你可以直接使用它,在任何你想要的地方启动 goroutine,然后等待错误发生。

英文:

You can easily verify

func helper(c chan&lt;- error) {
    time.Sleep(5 * time.Second)
    c &lt;- errors.New(&quot;&quot;) // send errors/nil on c
}

func worker() error {
    fmt.Println(&quot;do one&quot;)

    c := make(chan error, 1)
    go helper(c)

    err := &lt;-c
    fmt.Println(&quot;do two&quot;)

    return err
}

func main() {
    worker()
}

> Q: Is the statement err := <- c blocking worker? I don't think so, since the channel is buffered.

A: err := &lt;- c will block worker.

> Q: If it is blocking, how do I make it non-blocking? My requirement is to have worker and its caller continue with rest of the work, without waiting for the value to appear on the channel.

A: If you don't want blocking, just remove err := &lt;-c. If you need err at the end, just move err := &lt;-c to the end.

You can not read channel without blocking, if you go through without blocking, can can no more exec this code, unless your code is in a loop.

Loop:
    for {
        select {
        case &lt;-c:
            break Loop
        default:
            //default will go through without blocking
        }
        // do something
    }

And have you ever seen errgroup or waitgroup?

It use atomic, cancel context and sync.Once to implement this.

https://github.com/golang/sync/blob/master/errgroup/errgroup.go

https://github.com/golang/go/blob/master/src/sync/waitgroup.go

Or you can just use it, go you func and then wait for error in any place you want.

答案2

得分: 1

在你的代码中,剩下的工作与助手是否遇到错误无关。在完成剩下的工作后,你可以简单地从通道接收。

func worker() error {
	//做一些工作
	c := make(chan error, 1)
	go helper(c)
	//完成剩下的工作
	return <-c
}
英文:

In your code, the rest of the work is independent of whether the helper encountered an error. You can simply receive from the channel after the rest of the work is completed.

func worker() error {
	//do some work
	c := make(chan error, 1)
	go helper(c)
	//do rest of the work
	return &lt;-c
}

答案3

得分: 0

我认为你需要这段代码:

运行这段代码:

package main

import (
	"log"
	"sync"
)

func helper(c chan<- error) {

	for {
		var err error = nil
		// do job

		if err != nil {
			c <- err // send errors/nil on c
			break
		}
	}

}

func worker(c chan error) error {
	log.Println("第一个日志")

	go func() {
		helper(c)
	}()

	count := 1
Loop:
	for {
		select {
		case err := <-c:
			return err
		default:
			log.Println(count, "日志")
			count++
			isFinished := false
			// do your job
			if isFinished {
				break Loop // remove this when you test

			}
		}
	}
	return nil
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		c := make(chan error, 1)
		worker(c)
		wg.Done()
	}()
	wg.Wait()
}

希望对你有帮助!

英文:

I think you need this code..

run this code

package main

import (
	&quot;log&quot;
	&quot;sync&quot;
)

func helper(c chan&lt;- error) {

	for {
		var err error = nil
		// do job

		if err != nil {
			c &lt;- err // send errors/nil on c
			break
		}
	}

}

func worker(c chan error) error {
	log.Println(&quot;first log&quot;)

	go func() {
		helper(c)
	}()

	count := 1
	Loop:
		for {
			select {
			case err := &lt;- c :
				return err
			default:
				log.Println(count, &quot; log&quot;)
				count++
				isFinished := false
				// do your job
				if isFinished {
					break Loop // remove this when you test

				}
			}
		}
	return nil
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		c := make(chan error, 1)
		worker(c)
		wg.Done()
	}()
	wg.Wait()
}

答案4

得分: 0

以非阻塞方式从通道中读取的函数式方法:

func CollectChanOne[T any](ch <-chan T) (T, bool) {
    select {
    case val, stillOpen := <-ch:
        return val, stillOpen
    default:
        var zeroT T
        return zeroT, false
    }
}
func worker() error {
    // 做一些工作
    c := make(chan error, 1)
    go helper(c)
    err, _ := CollectChanOne(c) // 不会阻塞 worker
    return err
}

示例:https://go.dev/play/p/Njwyt32B4oT

注意:此示例还有另一种方法 CollectChanRemaining(),它会读取通道中的所有缓冲元素。

英文:

Functional way of reading from a channel in a non-blocking manner:

func CollectChanOne[T any](ch &lt;-chan T) (T, bool) {
	select {
	case val, stillOpen := &lt;-ch:
		return val, stillOpen
	default:
		var zeroT T
		return zeroT, false
	}
}



func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    err, _ := CollectChanOne(c) // Wont block worker
    return err
}

Example: https://go.dev/play/p/Njwyt32B4oT

N.B. This example also have another method CollectChanRemaining() which reads all buffered elements in the channel.

huangapple
  • 本文由 发表于 2021年11月24日 09:59:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/70089953.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定