将数据从一个goroutine发送到多个其他goroutine

huangapple go评论153阅读模式
英文:

Sending data from one goroutine to multiple other goroutines

问题

在一个项目中,程序通过websocket接收数据。这些数据需要由n个算法进行处理。算法的数量可以动态变化。

我的尝试是创建一种发布/订阅模式,可以动态地启动和取消订阅。结果发现这比预期的要困难一些。

以下是我想出的解决方案(基于https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):

package pubsub

import (
	"context"
	"sync"
	"time"
)

type Pubsub struct {
	sync.RWMutex
	subs   []*Subsciption
	closed bool
}

func New() *Pubsub {
	ps := &Pubsub{}
	ps.subs = []*Subsciption{}
	return ps
}

func (ps *Pubsub) Publish(msg interface{}) {
	ps.RLock()
	defer ps.RUnlock()

	if ps.closed {
		return
	}

	for _, sub := range ps.subs {
        // ISSUE1: 这些goroutine显然无法正确退出...
		go func(ch chan interface{}) {
			ch <- msg
		}(sub.Data)
	}
}

func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
	ps.Lock()
	defer ps.Unlock()

	// 准备通道
	ctx, cancel := context.WithCancel(context.Background())
	sub := &Subsciption{
		Data:   make(chan interface{}, 1),
		cancel: cancel,
		ps:     ps,
	}

	// 准备订阅
	ps.subs = append(ps.subs, sub)
	return ctx, sub, nil
}

func (ps *Pubsub) unsubscribe(s *Subsciption) bool {
	ps.Lock()
	defer ps.Unlock()

	found := false
	index := 0
	for i, sub := range ps.subs {
		if sub == s {
			index = i
			found = true
		}
	}
	if found {
		s.cancel()
		ps.subs[index] = ps.subs[len(ps.subs)-1]
		ps.subs = ps.subs[:len(ps.subs)-1]

		// ISSUE2: 异步关闭通道,延迟一段时间以确保
		// 通过Publish()中的挂起goroutine不会再向通道写入任何内容
		go func(ch chan interface{}) {
			time.Sleep(500 * time.Millisecond)
			close(ch)
		}(s.Data)
	}
	return found
}

func (ps *Pubsub) Close() {
	ps.Lock()
	defer ps.Unlock()

	if !ps.closed {
		ps.closed = true
		for _, sub := range ps.subs {
			sub.cancel()

			// ISSUE2: 异步关闭通道,延迟一段时间以确保
			// 通过Publish()中的挂起goroutine不会再向通道写入任何内容
			go func(ch chan interface{}) {
				time.Sleep(500 * time.Millisecond)
				close(ch)
			}(sub.Data)
		}
	}
}

type Subsciption struct {
	Data   chan interface{}
	cancel func()
	ps     *Pubsub
}

func (s *Subsciption) Unsubscribe() {
	s.ps.unsubscribe(s)
}

如评论中所述,这段代码存在(至少)两个问题:

ISSUE1:

在实现中运行一段时间后,我会遇到一些类似这样的错误:

goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
/home/X/Projects/bm/internal/pubsub/pubsub.go:30
created by bookmaker/internal/pubsub.(*Pubsub).Publish
/home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb

我并不真正理解这个错误,但我觉得在Publish()中创建的goroutine会累积/泄漏。这是正确的吗?我在这里做错了什么?

ISSUE2:

当我通过Unsubscribe()结束订阅时,Publish()尝试向一个已关闭的通道写入数据,导致panic。为了解决这个问题,我创建了一个goroutine来延迟关闭通道。这感觉不是最佳实践,但我找不到一个合适的解决方案。有什么确定性的方法可以解决这个问题吗?

这是一个小的playground供您测试:https://play.golang.org/p/K-L8vLjt7_9

英文:

In a project the program receives data via websocket. This data needs to be processed by n algorithms. The amount of algorithms can change dynamically.

My attempt is to create some pub/sub pattern where subscriptions can be started and canceled on the fly. Turns out that this is a bit more challenging than expected.

Here's what I came up with (which is based on https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):

package pubsub

import (
	&quot;context&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

type Pubsub struct {
	sync.RWMutex
	subs   []*Subsciption
	closed bool
}

func New() *Pubsub {
	ps := &amp;Pubsub{}
	ps.subs = []*Subsciption{}
	return ps
}

func (ps *Pubsub) Publish(msg interface{}) {
	ps.RLock()
	defer ps.RUnlock()

	if ps.closed {
		return
	}

	for _, sub := range ps.subs {
        // ISSUE1: These goroutines apparently do not exit properly... 
		go func(ch chan interface{}) {
			ch &lt;- msg
		}(sub.Data)
	}
}

func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
	ps.Lock()
	defer ps.Unlock()

	// prep channel
	ctx, cancel := context.WithCancel(context.Background())
	sub := &amp;Subsciption{
		Data:   make(chan interface{}, 1),
		cancel: cancel,
		ps:     ps,
	}

	// prep subsciption
	ps.subs = append(ps.subs, sub)
	return ctx, sub, nil
}

func (ps *Pubsub) unsubscribe(s *Subsciption) bool {
	ps.Lock()
	defer ps.Unlock()

	found := false
	index := 0
	for i, sub := range ps.subs {
		if sub == s {
			index = i
			found = true
		}
	}
	if found {
		s.cancel()
		ps.subs[index] = ps.subs[len(ps.subs)-1]
		ps.subs = ps.subs[:len(ps.subs)-1]

		// ISSUE2: close the channel async with a delay to ensure
		// nothing will be written to the channel anymore
		// via a pending goroutine from Publish()
		go func(ch chan interface{}) {
			time.Sleep(500 * time.Millisecond)
			close(ch)
		}(s.Data)
	}
	return found
}

func (ps *Pubsub) Close() {
	ps.Lock()
	defer ps.Unlock()

	if !ps.closed {
		ps.closed = true
		for _, sub := range ps.subs {
			sub.cancel()

			// ISSUE2: close the channel async with a delay to ensure
			// nothing will be written to the channel anymore
			// via a pending goroutine from Publish()
			go func(ch chan interface{}) {
				time.Sleep(500 * time.Millisecond)
				close(ch)
			}(sub.Data)
		}
	}
}

type Subsciption struct {
	Data   chan interface{}
	cancel func()
	ps     *Pubsub
}

func (s *Subsciption) Unsubscribe() {
	s.ps.unsubscribe(s)
}

As mentioned in the comments there are (at least) two issues with this:

ISSUE1:

After a while of running in implementation of this I get a few errors of this kind:

goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
/home/X/Projects/bm/internal/pubsub/pubsub.go:30
created by bookmaker/internal/pubsub.(*Pubsub).Publish
/home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb

Without really understanding this it appears to me that the goroutines created in Publish() do accumulate/leak. Is this correct and what am I doing wrong here?

ISSUE2:

When I end a subscription via Unsubscribe() it occurs that Publish() tried to write to a closed channel and panics. To mitigate this I have created a goroutine to close the channel with a delay. This feel really off-best-practice but I was not able to find a proper solution to this. What would be a deterministic way to do this?

Heres a little playground for you to test with: https://play.golang.org/p/K-L8vLjt7_9

答案1

得分: 1

在深入讨论你的解决方案及其问题之前,让我再次推荐另一种经纪人方法,该方法在这个答案中有介绍:如何使用通道广播消息

现在让我们来看看你的解决方案。


每当启动一个goroutine时,都要考虑它将如何结束,并确保在goroutine不应该在应用程序的整个生命周期内运行时结束。

// 问题1:这些goroutine显然没有正确退出...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)

这个goroutine试图在ch上发送一个值。这可能是一个阻塞操作:如果ch的缓冲区已满,并且没有准备好的接收者在ch上,它将被阻塞。这超出了启动的goroutine的控制范围,也超出了pubsub包的控制范围。在某些情况下,这可能是可以接受的,但这已经给包的用户增加了负担。尽量避免这种情况。尽量创建易于使用且不容易被误用的API。

此外,仅为了在通道上发送一个值而启动一个goroutine是一种资源浪费(goroutine是廉价且轻量级的,但你不应该随意滥用它们)。

你这样做是因为你不想被阻塞。为了避免阻塞,你可以使用带有“合理”高缓冲区的缓冲通道。是的,这并不能解决阻塞问题,只是在“慢”的客户端从通道接收时有所帮助。

为了在不启动goroutine的情况下“真正”避免阻塞,你可以使用非阻塞发送:

select {
case ch <- msg:
default:
// ch的缓冲区已满,我们现在无法发送
}

如果可以在ch上进行发送,就会执行发送操作。如果不能,将立即选择default分支。然后你必须决定接下来该做什么。丢失一条消息是否可以接受?是否可以等待一段时间直到“放弃”?或者是否可以启动一个goroutine来处理这个问题(但那样你又回到了我们试图解决的问题)?或者是否可以被阻塞直到客户端能够从通道接收...

选择一个合理的高缓冲区,如果遇到仍然满的情况,可能可以阻塞直到客户端能够前进并从消息中接收。如果不能,那么你的整个应用程序可能处于一个不可接受的状态,可以“挂起”或“崩溃”。

// 问题2:异步延迟关闭通道,以确保
// 不会再向通道写入任何内容
// 通过来自Publish()的挂起goroutine
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)

关闭通道是向接收者发送信号,表示不会再向通道发送更多的值。因此,关闭通道应该是发送者的工作(和责任)。启动一个goroutine来关闭通道,你将这个工作和责任交给了另一个“实体”(一个goroutine),这个实体将不会与发送者同步。这很容易导致恐慌(在关闭的通道上发送是运行时恐慌,有关其他公理,请参见https://stackoverflow.com/questions/39015602/how-does-a-non-initialized-channel-behave/39016004#39016004)。不要这样做。

是的,这是必要的,因为你启动了goroutine来发送。如果你不这样做,那么你可以在原地关闭,而不启动一个goroutine,因为发送者和关闭者将是同一个实体:Pubsub本身,其发送和关闭操作受到互斥锁的保护。因此,解决第一个问题自然地解决了第二个问题。

通常,如果一个通道有多个发送者,那么关闭通道必须进行协调。必须有一个单一的实体(通常不是任何一个发送者),等待所有发送者完成,实际上使用sync.WaitGroup,然后该单一实体可以安全地关闭通道。参见https://stackoverflow.com/questions/34283255/closing-channel-of-unknown-length/34283635#34283635。

英文:

Before diving into your solution and its issues, let me recommend again another Broker approach presented in this answer: How to broadcast message using channel

Now on to your solution.


Whenever you launch a goroutine, always think of how it will end and make sure it does if the goroutine is not ought to run for the lifetime of your app.

// ISSUE1: These goroutines apparently do not exit properly... 
go func(ch chan interface{}) {
ch &lt;- msg
}(sub.Data)

This goroutine tries to send a value on ch. This may be a blocking operation: it will block if ch's buffer is full and there is no ready receiver on ch. This is out of the control of the launched goroutine, and also out of the control of the pubsub package. This may be fine in some cases, but this already places a burden on the users of the package. Try to avoid these. Try to create APIs that are easy to use and hard to misuse.

Also, launching a goroutine just to send a value on a channel is a waste of resources (goroutines are cheap and light, but you shouldn't spam them whenever you can).

You do it because you don't want to get blocked. To avoid blocking, you may use a buffered channel with a "reasonable" high buffer. Yes, this doesn't solve the blocking issue, in only helps with "slow" clients receiving from the channel.

To "truly" avoid blocking without launching a goroutine, you may use non-blocking send:

select {
case ch &lt;- msg:
default:
// ch&#39;s buffer is full, we cannot deliver now
}

If send on ch can proceed, it will happen. If not, the default branch is chosen immediately. You have to decide what to do then. Is it acceptable to "lose" a message? Is it acceptable to wait for some time until "giving up"? Or is it acceptable to launch a goroutine to do this (but then you'll be back at what we're trying to fix here)? Or is it acceptable to get blocked until the client can receive from the channel...

Choosing a reasonable high buffer, if you encounter a situation when it still gets full, it may be acceptable to block until the client can advance and receive from the message. If it can't, then your whole app might be in an unacceptable state, and it might be acceptable to "hang" or "crash".

// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)

Closing a channel is a signal to the receiver(s) that no more values will be sent on the channel. So always it should be the sender's job (and responsibility) to close the channel. Launching a goroutine to close the channel, you "hand" that job and responsibility to another "entity" (a goroutine) that will not be synchronized to the sender. This may easily end up in a panic (sending on a closed channel is a runtime panic, for other axioms see https://stackoverflow.com/questions/39015602/how-does-a-non-initialized-channel-behave/39016004#39016004). Don't do that.

Yes, this was necessary because you launched goroutines to send. If you don't do that, then you may close "in-place", without launching a goroutine, because then the sender and closer will be the same entity: the Pubsub itself, whose sending and closing operations are protected by a mutex. So solving the first issue solves the second naturally.

In general if there are multiple senders for a channel, then closing the channel must be coordinated. There must be a single entity (often not any of the senders) that waits for all senders to finish, practically using a sync.WaitGroup, and then that single entity can close the channel, safely. See https://stackoverflow.com/questions/34283255/closing-channel-of-unknown-length/34283635#34283635.

huangapple
  • 本文由 发表于 2021年5月27日 15:35:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/67717657.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定