Go PubSub without mutexes?

Question

I am implementing a notification system in a website backend. Each page visit will subscribe the user to the data displayed on that page, and the user will be notified when that data changes. For example, if someone is viewing a page of news articles and a new article is posted, I want to notify the user so that they can fetch the new articles via JavaScript or reload the page, either manually or automatically.

To do this I will use Go channels in a pub/sub manner. For example, there will be a "news" channel. When a new article is created, this channel will receive the article's ID. When a user opens a page and subscribes to the "news" channel (probably via WebSocket), they are added to that channel's list of subscribers to be notified.

Something like:

type Channel struct {
  ingres <-chan int // news article ID
  subs   []chan<- int
  mx     sync.Mutex
}

Each of these will have a goroutine that distributes whatever arrives on ingres to the subs list.

Now the problem I see, which may be premature optimization, is that there will be many channels and many subscribers coming and going, which means a lot of contention on the mutexes. For example, if 10,000 online users are subscribed to the news channel, the goroutine will have to send 10,000 notifications while the subs slice is locked, so new subscribers will have to wait for the mutex to unlock. Multiply this by 100 channels and I think we have a problem.

So I am looking for a way to add and remove subscribers without blocking other subscribers from being added or removed, or at least a way for everyone to receive notifications in an acceptable time.

The "waiting for all subs to receive" problem can be solved by starting a goroutine with a timeout for each subscriber: once the ID is received, 10,000 goroutines are created and the mutex can be unlocked right away. Still, this can add up across multiple channels.
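The goroutine-per-subscriber idea with a timeout could look roughly like the sketch below. The helper names notify and fanOut are hypothetical. fanOut waits for all sends only so that the example can report how many subscribers timed out; a real dispatcher would likely start the goroutines and return immediately.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// notify tries to deliver id to one subscriber and gives up after timeout.
func notify(sub chan<- int, id int, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer if the send wins
	select {
	case sub <- id:
		return true
	case <-timer.C:
		return false
	}
}

// fanOut starts one goroutine per subscriber and returns the number of
// subscribers that did not accept the id within the timeout.
func fanOut(subs []chan int, id int, timeout time.Duration) int {
	var wg sync.WaitGroup
	var timedOut int64
	for _, s := range subs {
		wg.Add(1)
		go func(s chan int) {
			defer wg.Done()
			if !notify(s, id, timeout) {
				atomic.AddInt64(&timedOut, 1)
			}
		}(s)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&timedOut))
}

func main() {
	fast := make(chan int, 1) // has buffer space, accepts immediately
	stuck := make(chan int)   // unbuffered with no reader, always times out
	slow := fanOut([]chan int{fast, stuck}, 42, 50*time.Millisecond)
	fmt.Println(slow, <-fast)
}
```

Each pending send costs one goroutine and one timer until it completes or times out, which is the "adds up across multiple channels" cost mentioned above.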

Answer 1

Score: 0

Based on the linked comments, I came up with this code:

package notif

import (
	"context"
	"sync"
	"time"
	"unsafe"
)

type Client struct {
	recv   chan interface{}
	ch     *Channel
	o      sync.Once
	ctx    context.Context
	cancel context.CancelFunc
}

// Listen returns the receive channel; it is nil if this client is write-only.
func (c *Client) Listen() <-chan interface{} {
	return c.recv
}

func (c *Client) Close() {
	select {
	case <-c.ctx.Done():
	case c.ch.unsubscribe <- c:
	}
}

func (c *Client) Done() <-chan struct{} {
	return c.ctx.Done()
}

func (c *Client) doClose() {
	c.o.Do(func() {
		c.cancel()
		if c.recv != nil {
			close(c.recv)
		}
	})
}

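func (c *Client) send(msg interface{}) {
	// write-only clients do not handle any messages
	if c.recv == nil {
		return
	}
	t := time.NewTimer(c.ch.sc)
	defer t.Stop() // release the timer if the send completes first
	select {
	case <-c.ctx.Done():
	case c.recv <- msg:
	case <-t.C:
		// timed out (slow consumer): close the connection
		c.Close()
	}
}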

func (c *Client) Broadcast(payload interface{}) bool {
	select {
	case <-c.ctx.Done():
		return false
	default:
		c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}
		return true
	}
}

type envelope struct {
	Message interface{}
	Sender  uintptr
}

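// leech is called synchronously from the channel's loop and blocks it, so it
// should start its own goroutine internally if it must not block.
// It is called synchronously to preserve the order of leeched messages.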
func NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {
	return &Channel{
		name:        name,
		ingres:      make(chan interface{}, 1000),
		subscribe:   make(chan *Client, 1000),
		unsubscribe: make(chan *Client, 1000),
		aud:         make(map[*Client]struct{}, 1000),
		ctx:         ctx,
		sc:          slowConsumer,
		empty:       emptyCh,
		leech:       leech,
	}
}

type Channel struct {
	name        string
	ingres      chan interface{}
	subscribe   chan *Client
	unsubscribe chan *Client
	aud         map[*Client]struct{}
	ctx         context.Context
	sc          time.Duration
	empty       chan string
	leech       func(interface{})
}

func (ch *Channel) Id() string {
	return ch.name
}

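// Subscriptions are read-write by default. Passing writeOnly=true switches to write-only mode,
// in which case the client will not be disconnected for being a slow reader.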
func (ch *Channel) Subscribe(writeOnly ...bool) *Client {
	c := &Client{
		ch: ch,
	}
	if len(writeOnly) == 0 || writeOnly[0] == false {
		c.recv = make(chan interface{})
	}
	c.ctx, c.cancel = context.WithCancel(ch.ctx)
	ch.subscribe <- c
	return c
}

func (ch *Channel) Broadcast() chan<- interface{} {
	return ch.ingres
}

// Start runs the channel loop; it returns once the context is cancelled.
func (ch *Channel) Start() {
	for {
		select {
		case <-ch.ctx.Done():
			for cl := range ch.aud {
				delete(ch.aud, cl)
				cl.doClose()
			}
			return
		case cl := <-ch.subscribe:
			ch.aud[cl] = struct{}{}

		case cl := <-ch.unsubscribe:
			delete(ch.aud, cl)
			cl.doClose()
			if len(ch.aud) == 0 {
				ch.signalEmpty()
			}

		case msg := <-ch.ingres:
			e, ok := msg.(*envelope)
			if ok {
				msg = e.Message
			}
			for cl := range ch.aud {
				// skip the sender; e is nil when ok is false, so send msg, not e.Message
				if ok == false || uintptr(unsafe.Pointer(cl)) != e.Sender {
					go cl.send(msg)
				}
			}
			if ch.leech != nil {
				ch.leech(msg)
			}
		}
	}
}

func (ch *Channel) signalEmpty() {
	if ch.empty == nil {
		return
	}

	select {
	case ch.empty <- ch.name:
	default:
	}
}

type subscribeRequest struct {
	name string
	recv chan *Client
	wo   bool
}

type broadcastRequest struct {
	name string
	recv chan *Channel
}

type brokeredChannel struct {
	ch     *Channel
	cancel context.CancelFunc
}

type brokerLeech interface {
	Match(string) func(interface{})
}

func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {
	return &Broker{
		chans:     make(map[string]*brokeredChannel, 100),
		subscribe: make(chan *subscribeRequest, 10),
		broadcast: make(chan *broadcastRequest, 10),
		ctx:       ctx,
		sc:        slowConsumer,
		empty:     make(chan string, 10),
		leech:     leech,
	}
}

type Broker struct {
	chans     map[string]*brokeredChannel
	subscribe chan *subscribeRequest
	broadcast chan *broadcastRequest
	ctx       context.Context
	sc        time.Duration
	empty     chan string
	leech     brokerLeech
}

// Start runs the broker loop; it returns once the context is cancelled.
func (b *Broker) Start() {
	for {
		select {
		case <-b.ctx.Done():
			return
		case req := <-b.subscribe:
			ch, ok := b.chans[req.name]
			if ok == false {
				ctx, cancel := context.WithCancel(b.ctx)
				var l func(interface{})
				if b.leech != nil {
					l = b.leech.Match(req.name)
				}
				ch = &brokeredChannel{
					ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),
					cancel: cancel,
				}
				b.chans[req.name] = ch
				go ch.ch.Start()
			}
			req.recv <- ch.ch.Subscribe(req.wo)

		case req := <-b.broadcast:
			ch, ok := b.chans[req.name]
			if ok == false {
				ctx, cancel := context.WithCancel(b.ctx)
				var l func(interface{})
				if b.leech != nil {
					l = b.leech.Match(req.name)
				}
				ch = &brokeredChannel{
					ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),
					cancel: cancel,
				}
				b.chans[req.name] = ch
				go ch.ch.Start()
			}
			req.recv <- ch.ch

		case name := <-b.empty:
			if ch, ok := b.chans[name]; ok {
				ch.cancel()
				delete(b.chans, name)
			}
		}
	}
}

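// Subscriptions are read-write by default. Passing writeOnly=true switches to write-only mode,
// in which case the client will not be disconnected for being a slow reader.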
func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {
	req := &subscribeRequest{
		name: name,
		recv: make(chan *Client),
		wo:   len(writeOnly) > 0 && writeOnly[0] == true,
	}
	b.subscribe <- req
	c := <-req.recv
	close(req.recv)
	return c
}

func (b *Broker) Broadcast(name string) chan<- interface{} {
	req := &broadcastRequest{name: name, recv: make(chan *Channel)}
	b.broadcast <- req
	ch := <-req.recv
	close(req.recv)
	return ch.ingres
}
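The core idea in the code above is that a single goroutine owns the subscriber map, and every change to it arrives over a channel, so no mutex is needed. The following stripped-down sketch illustrates only that pattern. It is not the answer's API: the hub type and its methods are hypothetical, it carries plain int IDs, and it drops notifications for subscribers whose buffer is full instead of disconnecting them after a timeout.

```go
package main

import "fmt"

// hub is a hypothetical, minimal version of the owner-goroutine pattern.
type hub struct {
	subscribe   chan chan int
	unsubscribe chan chan int
	publish     chan int
	count       chan chan int // request/reply for the subscriber count
}

func newHub() *hub {
	h := &hub{
		subscribe:   make(chan chan int),
		unsubscribe: make(chan chan int),
		publish:     make(chan int),
		count:       make(chan chan int),
	}
	go h.run()
	return h
}

// run is the only goroutine that touches subs, so no lock is required.
// This sketch has no shutdown path; run loops until the program exits.
func (h *hub) run() {
	subs := make(map[chan int]struct{})
	for {
		select {
		case s := <-h.subscribe:
			subs[s] = struct{}{}
		case s := <-h.unsubscribe:
			if _, ok := subs[s]; ok {
				delete(subs, s)
				close(s) // safe: only this goroutine ever sends on s
			}
		case id := <-h.publish:
			for s := range subs {
				select {
				case s <- id:
				default: // buffer full: drop for this slow subscriber
				}
			}
		case reply := <-h.count:
			reply <- len(subs)
		}
	}
}

func (h *hub) Subscribe() chan int {
	s := make(chan int, 8)
	h.subscribe <- s
	return s
}

func (h *hub) Unsubscribe(s chan int) { h.unsubscribe <- s }

func (h *hub) Publish(id int) { h.publish <- id }

func (h *hub) Count() int {
	reply := make(chan int)
	h.count <- reply
	return <-reply
}

func main() {
	h := newHub()
	s := h.Subscribe()
	h.Publish(7)
	fmt.Println(<-s, h.Count())
	h.Unsubscribe(s)
	fmt.Println(h.Count())
}
```

Because the request channels are unbuffered, each call returns only after the owner goroutine has taken the request, and requests are handled one at a time in the order the loop receives them. Subscribing still waits while a publish is being fanned out, but the non-blocking sends keep that window short.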

Posted by huangapple on 2021-12-30 07:36:02
Please keep this link when reposting: https://go.coder-hub.com/70526099.html