Go PubSub without mutexes?
Question
I am adding a notification system to a website backend. Each page visit subscribes the user to the data displayed on that page, and the user is notified when that data changes. For example, if someone is viewing a page of news articles and a new article is posted, I want to notify the user so that the new articles can be fetched via JavaScript or by reloading the page, either manually or automatically.
To make this happen I will use Go channels in a pub/sub manner. For example, there will be a "news" channel. When a new article is created, this channel receives the article's ID. When a user opens a page and subscribes to the "news" channel (probably via WebSocket), the user is added to that channel's list of subscribers to be notified.
Something like:
type Channel struct {
	ingres <-chan int // news article id
	subs   []chan<- int
	mx     sync.Mutex
}
Each of these will have a goroutine that distributes whatever arrives on ingres to the subs list.
The problem I see, which may be premature optimization, is that there will be many channels and many subscribers coming and going, so the mutexes will be held often and for a long time. For example, if 10,000 online users are subscribed to the news channel, the goroutine has to send 10,000 notifications while the subs slice is locked, so new subscribers must wait for the mutex to be released. Multiply this by 100 channels and I think we have a problem.
So I am looking for a way to add and remove subscribers without blocking other subscribers from being added or removed, or at least a way for everyone to receive notifications in an acceptable time.
The "waiting for all subs to receive" problem can be solved by starting a goroutine with a timeout for each subscriber: once the ID is received, 10,000 goroutines are created and the mutex can be unlocked right away. Still, this can add up across multiple channels.
Answer 1
Score: 0
Based on the linked comments, I have come up with this code:
package notif

import (
	"context"
	"sync"
	"time"
	"unsafe"
)

type Client struct {
	recv   chan interface{}
	ch     *Channel
	o      sync.Once
	ctx    context.Context
	cancel context.CancelFunc
}

// Listen returns the receive channel; it is nil if this client is write-only.
func (c *Client) Listen() <-chan interface{} {
	return c.recv
}

// Close asks the channel goroutine to unsubscribe this client.
func (c *Client) Close() {
	select {
	case <-c.ctx.Done():
	case c.ch.unsubscribe <- c:
	}
}

func (c *Client) Done() <-chan struct{} {
	return c.ctx.Done()
}

// doClose is called only from the channel goroutine.
func (c *Client) doClose() {
	c.o.Do(func() {
		c.cancel()
		if c.recv != nil {
			close(c.recv)
		}
	})
}
func (c *Client) send(msg interface{}) {
	// write-only clients will not handle any messages
	if c.recv == nil {
		return
	}
	t := time.NewTimer(c.ch.sc)
	defer t.Stop() // release the timer when the send finishes early
	select {
	case <-c.ctx.Done():
	case c.recv <- msg:
	case <-t.C:
		// timed out (slow consumer), close the connection
		c.Close()
	}
}
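// Broadcast sends payload to every other client on the channel.
// It returns false if this client is already closed.
func (c *Client) Broadcast(payload interface{}) bool {
	select {
	case <-c.ctx.Done():
		return false
	default:
		c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}
		return true
	}
}

// envelope wraps a message with its sender so the sender can be skipped.
type envelope struct {
	Message interface{}
	Sender  uintptr
}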
// leech is called synchronously from the channel's goroutine, which keeps
// leeched messages in order. If it may block, it should start its own
// goroutine internally.
func NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {
	return &Channel{
		name:        name,
		ingres:      make(chan interface{}, 1000),
		subscribe:   make(chan *Client, 1000),
		unsubscribe: make(chan *Client, 1000),
		aud:         make(map[*Client]struct{}, 1000),
		ctx:         ctx,
		sc:          slowConsumer,
		empty:       emptyCh,
		leech:       leech,
	}
}

type Channel struct {
	name        string
	ingres      chan interface{}
	subscribe   chan *Client
	unsubscribe chan *Client
	aud         map[*Client]struct{} // accessed only by the Start goroutine
	ctx         context.Context
	sc          time.Duration
	empty       chan string
	leech       func(interface{})
}

func (ch *Channel) Id() string {
	return ch.name
}

// A subscription is read-write by default. Passing writeOnly=true switches it
// to write-only mode, in which case the client will not be disconnected for
// being a slow reader.
func (ch *Channel) Subscribe(writeOnly ...bool) *Client {
	c := &Client{
		ch: ch,
	}
	if len(writeOnly) == 0 || !writeOnly[0] {
		c.recv = make(chan interface{})
	}
	c.ctx, c.cancel = context.WithCancel(ch.ctx)
	ch.subscribe <- c
	return c
}

func (ch *Channel) Broadcast() chan<- interface{} {
	return ch.ingres
}
// Start runs the channel loop; it returns once the context is cancelled.
func (ch *Channel) Start() {
	for {
		select {
		case <-ch.ctx.Done():
			for cl := range ch.aud {
				delete(ch.aud, cl)
				cl.doClose()
			}
			return
		case cl := <-ch.subscribe:
			ch.aud[cl] = struct{}{}
		case cl := <-ch.unsubscribe:
			delete(ch.aud, cl)
			cl.doClose()
			if len(ch.aud) == 0 {
				ch.signalEmpty()
			}
		case msg := <-ch.ingres:
			// unwrap messages sent via Client.Broadcast so the sender can be skipped
			e, ok := msg.(*envelope)
			if ok {
				msg = e.Message
			}
			for cl := range ch.aud {
				if !ok || uintptr(unsafe.Pointer(cl)) != e.Sender {
					// send msg, not e.Message: e is nil when the message was not wrapped
					go cl.send(msg)
				}
			}
			if ch.leech != nil {
				ch.leech(msg)
			}
		}
	}
}
// signalEmpty tells the broker, without blocking, that no clients are left.
func (ch *Channel) signalEmpty() {
	if ch.empty == nil {
		return
	}
	select {
	case ch.empty <- ch.name:
	default:
	}
}

type subscribeRequest struct {
	name string
	recv chan *Client
	wo   bool
}

type broadcastRequest struct {
	name string
	recv chan *Channel
}

type brokeredChannel struct {
	ch     *Channel
	cancel context.CancelFunc
}

type brokerLeech interface {
	Match(string) func(interface{})
}

func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {
	return &Broker{
		chans:     make(map[string]*brokeredChannel, 100),
		subscribe: make(chan *subscribeRequest, 10),
		broadcast: make(chan *broadcastRequest, 10),
		ctx:       ctx,
		sc:        slowConsumer,
		empty:     make(chan string, 10),
		leech:     leech,
	}
}

type Broker struct {
	chans     map[string]*brokeredChannel // accessed only by the Start goroutine
	subscribe chan *subscribeRequest
	broadcast chan *broadcastRequest
	ctx       context.Context
	sc        time.Duration
	empty     chan string
	leech     brokerLeech
}
// Start runs the broker loop; it returns once the context is cancelled.
func (b *Broker) Start() {
	for {
		select {
		case <-b.ctx.Done():
			return
		case req := <-b.subscribe:
			ch, ok := b.chans[req.name]
			if !ok {
				// first request for this name: create and start the channel
				ctx, cancel := context.WithCancel(b.ctx)
				var l func(interface{})
				if b.leech != nil {
					l = b.leech.Match(req.name)
				}
				ch = &brokeredChannel{
					ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),
					cancel: cancel,
				}
				b.chans[req.name] = ch
				go ch.ch.Start()
			}
			req.recv <- ch.ch.Subscribe(req.wo)
		case req := <-b.broadcast:
			ch, ok := b.chans[req.name]
			if !ok {
				ctx, cancel := context.WithCancel(b.ctx)
				var l func(interface{})
				if b.leech != nil {
					l = b.leech.Match(req.name)
				}
				ch = &brokeredChannel{
					ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),
					cancel: cancel,
				}
				b.chans[req.name] = ch
				go ch.ch.Start()
			}
			req.recv <- ch.ch
		case name := <-b.empty:
			// the channel reported that it has no clients left
			if ch, ok := b.chans[name]; ok {
				ch.cancel()
				delete(b.chans, name)
			}
		}
	}
}
// A subscription is read-write by default. Passing writeOnly=true switches it
// to write-only mode, in which case the client will not be disconnected for
// being a slow reader.
func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {
	req := &subscribeRequest{
		name: name,
		recv: make(chan *Client),
		wo:   len(writeOnly) > 0 && writeOnly[0],
	}
	b.subscribe <- req
	c := <-req.recv
	close(req.recv)
	return c
}

// Broadcast returns the send side of the named channel, creating it if needed.
func (b *Broker) Broadcast(name string) chan<- interface{} {
	req := &broadcastRequest{name: name, recv: make(chan *Channel)}
	b.broadcast <- req
	ch := <-req.recv
	close(req.recv)
	return ch.ingres
}
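The core idea in the code above is that a single goroutine owns the subscriber map, and everything else talks to it through channels, so no mutex is needed. Reduced to a minimal, self-contained sketch it might look like the following. This is not the code above: Hub and its methods are hypothetical names, messages are plain ints, and slow consumers are dropped as soon as their buffer is full instead of after a timeout.

```go
package main

import "fmt"

// Hub is a hypothetical, stripped-down version of the Channel type above.
type Hub struct {
	subscribe   chan chan int
	unsubscribe chan chan int
	publish     chan int
	done        chan struct{}
}

func NewHub() *Hub {
	h := &Hub{
		subscribe:   make(chan chan int),
		unsubscribe: make(chan chan int),
		publish:     make(chan int),
		done:        make(chan struct{}),
	}
	go h.run()
	return h
}

// run is the only goroutine that touches subs, so no mutex is required.
func (h *Hub) run() {
	subs := make(map[chan int]struct{})
	for {
		select {
		case ch := <-h.subscribe:
			subs[ch] = struct{}{}
		case ch := <-h.unsubscribe:
			if _, ok := subs[ch]; ok {
				delete(subs, ch)
				close(ch)
			}
		case id := <-h.publish:
			for ch := range subs {
				select {
				case ch <- id:
				default:
					// buffer full: drop the slow subscriber
					delete(subs, ch)
					close(ch)
				}
			}
		case <-h.done:
			for ch := range subs {
				close(ch)
			}
			return
		}
	}
}

// Subscribe registers a new subscriber whose channel buffers buf messages.
func (h *Hub) Subscribe(buf int) chan int {
	ch := make(chan int, buf)
	h.subscribe <- ch
	return ch
}

func (h *Hub) Unsubscribe(ch chan int) { h.unsubscribe <- ch }
func (h *Hub) Publish(id int)          { h.publish <- id }

// Stop ends the loop; the Hub must not be used afterwards.
func (h *Hub) Stop() { close(h.done) }

func main() {
	h := NewHub()
	a := h.Subscribe(1)
	b := h.Subscribe(1)
	h.Publish(1)
	fmt.Println(<-a, <-b) // both subscribers receive 1
	h.Unsubscribe(b)
	h.Publish(2)
	fmt.Println(<-a) // only a is still subscribed
	_, open := <-b
	fmt.Println(open) // b was closed by the hub
	h.Stop()
}
```

Because the loop handles one request at a time, a subscribe or unsubscribe waits only for the current fan-out pass, and with non-blocking sends that pass never waits on a slow reader.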