如何使用通道广播消息

huangapple go评论97阅读模式
英文:

How to broadcast message using channel

问题

我是新手,正在尝试创建一个简单的聊天服务器,客户端可以向所有连接的客户端广播消息。

在我的服务器中,我有一个goroutine(无限循环),用于接受连接,所有的连接都通过一个通道接收。

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
    }
}()

然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代通道来向所有连接广播消息。

for c := range ch {
    conn.Write(msg)
}

然而,我无法进行广播,因为(我认为是根据阅读文档得出的结论)在迭代之前需要关闭通道。我不确定何时应该关闭通道,因为我希望不断接受新的连接,关闭通道将不允许我这样做。如果有人可以帮助我,或者提供更好的方法来向所有连接的客户端广播消息,我将不胜感激。

英文:

I am new to go and I am trying to create a simple chat server where clients can broadcast messages to all connected clients.

In my server, I have a goroutine (infinite for loop) that accepts connection and all the connections are received by a channel.

go func() {
	for {
		conn, _ := listener.Accept()
        ch &lt;- conn
		}
}()

Then, I start a handler (goroutine) for every connected client. Inside the handler, I try to broadcast to all connections by iterating through the channel.

for c := range ch {
	conn.Write(msg)
}

However, I cannot broadcast because (I think from reading the docs) the channel needs to be closed before iterating. I am not sure when I should close the channel because I want to continuously accept new connections and closing the channel won't let me do that. If anyone can help me, or provide a better way to broadcast messages to all connected clients, it would be appreciated.

答案1

得分: 73

你正在做的是扇出模式,也就是说,多个终端点监听单个输入源。这种模式的结果是,只有这些监听器中的一个能够在输入源中有消息时接收到消息。唯一的例外是通道的关闭。所有的监听器都会识别到这个关闭操作,从而实现了“广播”。

但是你想要做的是从连接中广播一条消息,所以我们可以这样做:

当监听器数量已知时

让每个工作器监听一个专用的广播通道,并将主通道中的消息分发到每个专用的广播通道。

type worker struct {
    source chan interface{}
    quit   chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // 为了避免阻塞,设置一些缓冲区大小
    go func() {
        for {
            select {
            case msg := <-w.source:
                // 处理消息
            case <-w.quit: // 在最后一节中会解释这个
                return
            }
        }
    }()
}

然后我们可以创建一组工作器:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers {
    worker.Start()
}

然后启动我们的监听器:

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
    }
}()

以及一个分发器:

go func() {
    for {
        msg := <-ch
        for _, worker := range workers {
            worker.source <- msg
        }
    }
}()

当监听器数量未知时

在这种情况下,上面给出的解决方案仍然适用。唯一的区别是,每当你需要一个新的工作器时,你需要创建一个新的工作器,启动它,然后将它推入workers切片中。但是这种方法需要一个线程安全的切片,需要在其周围加锁。以下是其中一种实现方式:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

每当你想要启动一个工作器时:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

然后你的分发器将改变为:

go func() {
    for {
        msg := <-ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

最后:不要留下悬挂的goroutine

一个好的实践是:不要留下悬挂的goroutine。所以当你完成监听时,你需要关闭你启动的所有goroutine。这可以通过worker中的quit通道来实现:

首先,我们需要创建一个全局的quit信号通道:

globalQuit := make(chan struct{})

每当我们创建一个工作器时,我们将globalQuit通道分配给它作为退出信号:

worker.quit = globalQuit

然后当我们想要关闭所有工作器时,我们只需执行:

close(globalQuit)

由于close操作会被所有监听的goroutine识别到(这是你理解的关键点),所有的goroutine都会返回。记得关闭你的分发器例程,但是这部分我留给你自己去完成 如何使用通道广播消息

英文:

What you are doing is a fan out pattern, that is to say, multiple endpoints are listening to a single input source. The result of this pattern is, only one of these listeners will be able to get the message whenever there's a message in the input source. The only exception is a close of channel. This close will be recognized by all of the listeners, and thus a "broadcast".

But what you want to do is broadcasting a message read from connection, so we could do something like this:

##When the number of listeners is known
Let each worker listen to dedicated broadcast channel, and dispatch the message from the main channel to each dedicated broadcast channel.

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := &lt;-w.source
                // do something with msg
            case &lt;-quit: // will explain this in the last section
                return
            }
        }
    }()
}

And then we could have a bunch of workers:

workers := []*worker{&amp;worker{}, &amp;worker{}}
for _, worker := range workers { worker.Start() }

Then start our listener:

go func() {
	for {
		conn, _ := listener.Accept()
        ch &lt;- conn
		}
}()

And a dispatcher:

go func() {
    for {
        msg := &lt;- ch
        for _, worker := workers {
            worker.source &lt;- msg
        }
    }
}()

##When the number of listeners is not known
In this case, the solution given above still works. The only difference is, whenever you need a new worker, you need to create a new worker, start it up, and then push it into workers slice. But this method requires a thread-safe slice, which need a lock around it. One of the implementation may look like as follows:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

Whenever you want to start a worker:

w := &amp;worker{}
w.Start()
threadSafeSlice.Push(w)

And your dispatcher will be changed to:

go func() {
    for {
        msg := &lt;- ch
        threadSafeSlice.Iter(func(w *worker) { w.source &lt;- msg })
    }
}()

##Last words: never leave a dangling goroutine
One of the good practices is: never leave a dangling goroutine. So when you finished listening, you need to close all of the goroutines you fired. This will be done via quit channel in worker:

First we need to create a global quit signalling channel:

globalQuit := make(chan struct{})

And whenever we create a worker, we assign the globalQuit channel to it as its quit signal:

worker.quit = globalQuit

Then when we want to shutdown all workers, we simply do:

close(globalQuit)

Since close will be recognized by all listening goroutines (this is the point you understood), all goroutines will be returned. Remember to close your dispatcher routine as well, but I will leave it to you 如何使用通道广播消息

答案2

得分: 70

一种更优雅的解决方案是使用"broker",客户端可以订阅和取消订阅消息。

为了优雅地处理订阅和取消订阅,我们可以利用通道来实现,因此,接收和分发消息的代理的主循环可以使用单个select语句来处理所有这些,并且同步是由解决方案的性质提供的。

另一个技巧是将订阅者存储在一个映射中,将通道映射到用于向它们分发消息的通道。因此,将通道用作映射中的键,然后添加和删除客户端就非常简单。这是可能的,因为通道值是可比较的,并且它们的比较非常高效,因为通道值只是指向通道描述符的简单指针。

废话不多说,这是一个简单的代理实现:

type Broker[T any] struct {
	stopCh    chan struct{}
	publishCh chan T
	subCh     chan chan T
	unsubCh   chan chan T
}

func NewBroker[T any]() *Broker[T] {
	return &Broker[T]{
		stopCh:    make(chan struct{}),
		publishCh: make(chan T, 1),
		subCh:     make(chan chan T, 1),
		unsubCh:   make(chan chan T, 1),
	}
}

func (b *Broker[T]) Start() {
	subs := map[chan T]struct{}{}
	for {
		select {
		case <-b.stopCh:
			return
		case msgCh := <-b.subCh:
			subs[msgCh] = struct{}{}
		case msgCh := <-b.unsubCh:
			delete(subs, msgCh)
		case msg := <-b.publishCh:
			for msgCh := range subs {
				// msgCh is buffered, use non-blocking send to protect the broker:
				select {
				case msgCh <- msg:
				default:
				}
			}
		}
	}
}

func (b *Broker[T]) Stop() {
	close(b.stopCh)
}

func (b *Broker[T]) Subscribe() chan T {
	msgCh := make(chan T, 5)
	b.subCh <- msgCh
	return msgCh
}

func (b *Broker[T]) Unsubscribe(msgCh chan T) {
	b.unsubCh <- msgCh
}

func (b *Broker[T]) Publish(msg T) {
	b.publishCh <- msg
}

以下是使用示例:

func main() {
	// 创建并启动代理:
	b := NewBroker[string]()
	go b.Start()

	// 创建并订阅3个客户端:
	clientFunc := func(id int) {
		msgCh := b.Subscribe()
		for {
			fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
		}
	}
	for i := 0; i < 3; i++ {
		go clientFunc(i)
	}

	// 开始发布消息:
	go func() {
		for msgId := 0; ; msgId++ {
			b.Publish(fmt.Sprintf("msg#%d", msgId))
			time.Sleep(300 * time.Millisecond)
		}
	}()

	time.Sleep(time.Second)
}

上述代码的输出结果如下(可以在Go Playground上尝试):

Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3

改进:

你可以考虑以下改进。这些改进可能有用,具体取决于你如何使用代理。

Broker.Unsubscribe()可以关闭消息通道,表示不会再向其发送更多消息:

func (b *Broker[T]) Unsubscribe(msgCh chan T) {
	b.unsubCh <- msgCh
	close(msgCh)
}

这将允许客户端通过range循环遍历消息通道,例如:

msgCh := b.Subscribe()
for msg := range msgCh {
	fmt.Printf("Client %d got message: %v\n", id, msg)
}

然后,如果有人像这样取消订阅msgCh

b.Unsubscribe(msgCh)

上述range循环将在处理调用Unsubscribe()之前发送的所有消息后终止。

如果你希望客户端依赖于消息通道被关闭,并且代理的生命周期比应用程序的生命周期更短,那么你还可以在Start()方法中在代理停止时关闭所有已订阅的客户端,如下所示:

case <-b.stopCh:
	for msgCh := range subs {
		close(msgCh)
	}
	return
英文:

A more elegant solution is a "broker", where clients may subscribe and unsubscribe to messages.

To also handle subscribing and unsubscribing elegantly, we may utilize channels for this, so the main loop of the broker which receives and distributes the messages can incorporate all these using a single select statement, and synchronization is given from the solution's nature.

Another trick is to store the subscribers in a map, mapping from the channel we use to distribute messages to them. So use the channel as the key in the map, and then adding and removing the clients is "dead" simple. This is made possible because channel values are comparable, and their comparison is very efficient as channel values are simple pointers to channel descriptors.

Without further ado, here's a simple broker implementation:

type Broker[T any] struct {
stopCh    chan struct{}
publishCh chan T
subCh     chan chan T
unsubCh   chan chan T
}
func NewBroker[T any]() *Broker[T] {
return &amp;Broker[T]{
stopCh:    make(chan struct{}),
publishCh: make(chan T, 1),
subCh:     make(chan chan T, 1),
unsubCh:   make(chan chan T, 1),
}
}
func (b *Broker[T]) Start() {
subs := map[chan T]struct{}{}
for {
select {
case &lt;-b.stopCh:
return
case msgCh := &lt;-b.subCh:
subs[msgCh] = struct{}{}
case msgCh := &lt;-b.unsubCh:
delete(subs, msgCh)
case msg := &lt;-b.publishCh:
for msgCh := range subs {
// msgCh is buffered, use non-blocking send to protect the broker:
select {
case msgCh &lt;- msg:
default:
}
}
}
}
}
func (b *Broker[T]) Stop() {
close(b.stopCh)
}
func (b *Broker[T]) Subscribe() chan T {
msgCh := make(chan T, 5)
b.subCh &lt;- msgCh
return msgCh
}
func (b *Broker[T]) Unsubscribe(msgCh chan T) {
b.unsubCh &lt;- msgCh
}
func (b *Broker[T]) Publish(msg T) {
b.publishCh &lt;- msg
}

Example using it:

func main() {
// Create and start a broker:
b := NewBroker[string]()
go b.Start()
// Create and subscribe 3 clients:
clientFunc := func(id int) {
msgCh := b.Subscribe()
for {
fmt.Printf(&quot;Client %d got message: %v\n&quot;, id, &lt;-msgCh)
}
}
for i := 0; i &lt; 3; i++ {
go clientFunc(i)
}
// Start publishing messages:
go func() {
for msgId := 0; ; msgId++ {
b.Publish(fmt.Sprintf(&quot;msg#%d&quot;, msgId))
time.Sleep(300 * time.Millisecond)
}
}()
time.Sleep(time.Second)
}

Output of the above will be (try it on the Go Playground):

Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3

Improvements

You may consider the following improvements. These may or may not be useful depending on how / to what you use the broker.

Broker.Unsubscribe() may close the message channel, signalling that no more messages will be sent on it:

func (b *Broker[T]) Unsubscribe(msgCh chan T) {
b.unsubCh &lt;- msgCh
close(msgCh)
}

This would allow clients to range over the message channel, like this:

msgCh := b.Subscribe()
for msg := range msgCh {
fmt.Printf(&quot;Client %d got message: %v\n&quot;, id, msg)
}

Then if someone unsubscribes this msgCh like this:

b.Unsubscribe(msgCh)

The above range loop will terminate after processing all messages that were sent before the call to Unsubscribe().

If you want your clients to rely on the message channel being closed, and the broker's lifetime is narrower than your app's lifetime, then you could also close all subscribed clients when the broker is stopped, in the Start() method like this:

case &lt;-b.stopCh:
for msgCh := range subs {
close(msgCh)
}
return

答案3

得分: 5

在你的情况下,将广播发送到通道的切片,并使用sync.Mutex来管理通道的添加和删除可能是最简单的方法。

以下是你可以在Go中执行“广播”的方法:

  • 你可以使用sync.Cond来广播共享状态的更改。这种方式在设置完成后不会有任何分配,但无法添加超时功能或与另一个通道一起使用。
  • 你可以通过关闭旧通道并创建新通道和sync.Mutex来广播共享状态的更改。这种方式每次状态更改都会有一个分配,但可以添加超时功能并与另一个通道一起使用。
  • 你可以广播到函数回调的切片,并使用sync.Mutex来管理它们。调用者可以处理通道相关的事务。这种方式每个调用者都会有多个分配,并且可以与另一个通道一起使用。
  • 你可以广播到通道的切片,并使用sync.Mutex来管理它们。这种方式每个调用者都会有多个分配,并且可以与另一个通道一起使用。
  • 你可以广播到sync.WaitGroup的切片,并使用sync.Mutex来管理它们。
英文:

Broadcast to a slice of channel and use sync.Mutex to manage channel add and remove may be the easiest way in your case.

Here is what you can do to broadcast in golang:

  • You can broadcast a share status change with sync.Cond. This way do not have any alloc once setup, but you can not add timeout functional or work with another channel.
  • You can broadcast a share status change with a close old channel and create new channel and sync.Mutex. This way have one alloc per status change, but you can add timeout functional and work with another channel.
  • You can broadcast to a slice of function callback and use sync.Mutex to manage them. The caller can do channel stuff. This way have more than one alloc per caller, and work with another channel.
  • You can broadcast to a slice of channel and use sync.Mutex to manage them. This way have more than one alloc per caller, and work with another channel.
  • You can broadcast to a slice of sync.WaitGroup and use sync.Mutex to manage them.

答案4

得分: 4

这是一个晚回答,但我认为它可能会满足一些好奇的读者。

在并发编程中,Go 语言的通道被广泛使用。

Go 社区坚持遵循以下原则:

不要通过共享内存来通信,而应通过通信来共享内存。

我对此持完全中立的态度,我认为在广播方面应该考虑除了明确定义的“通道”之外的其他选项。

这是我的观点:sync 包中的 Cond(条件变量)经常被忽视。在同样的上下文中,按照 Bronze man 的建议实现广播器是值得注意的。

我对 icza 的建议感到高兴,他建议使用通道并通过通道广播消息。我遵循相同的方法,并使用 sync 的条件变量:

// Broadcaster 是封装广播的结构体
type Broadcaster struct {
	cond        *sync.Cond
	subscribers map[interface{}]func(interface{})
	message     interface{}
	running     bool
}

这是我们整个广播概念所依赖的主要结构体。

接下来,我为这个结构体定义了一些行为。简而言之,应该能够添加、删除订阅者,并且整个过程应该是可撤销的。

// SetupBroadcaster 提供广播器对象以供后续消息使用
func SetupBroadcaster() *Broadcaster {

	return &Broadcaster{
		cond:        sync.NewCond(&sync.RWMutex{}),
		subscribers: map[interface{}]func(interface{}){},
	}
}

// Subscribe 允许其他人参与广播事件!
func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {

	b.subscribers[id] = f
}

// Unsubscribe 停止接收广播
func (b *Broadcaster) Unsubscribe(id interface{}) {
	b.cond.L.Lock()
	delete(b.subscribers, id)
	b.cond.L.Unlock()
}

// Publish 发布消息
func (b *Broadcaster) Publish(message interface{}) {
	go func() {
		b.cond.L.Lock()

		b.message = message
		b.cond.Broadcast()
		b.cond.L.Unlock()
	}()
}

// Start 启动主要的广播事件
func (b *Broadcaster) Start() {
	b.running = true
	for b.running {
		b.cond.L.Lock()
		b.cond.Wait()
		go func() {
			for _, f := range b.subscribers {
				f(b.message) // 发布消息
			}
		}()
		b.cond.L.Unlock()
	}

}

// Stop 停止广播事件
func (b *Broadcaster) Stop() {
	b.running = false
}

接下来,我可以很容易地使用它:

messageToaster := func(message interface{}) {
	fmt.Printf("[New Message]: %v\n", message)
}
unwillingReceiver := func(message interface{}) {
	fmt.Println("Do not disturb!")
}
broadcaster := SetupBroadcaster()
broadcaster.Subscribe(1, messageToaster)
broadcaster.Subscribe(2, messageToaster)
broadcaster.Subscribe(3, unwillingReceiver)

go broadcaster.Start()

broadcaster.Publish("Hello!")

time.Sleep(time.Second)
broadcaster.Unsubscribe(3)
broadcaster.Publish("Goodbye!")

它应该以任意顺序打印出类似以下的内容:

[New Message]: Hello!
Do not disturb!
[New Message]: Hello!
[New Message]: Goodbye!
[New Message]: Goodbye!

Go Playground 上查看示例。

英文:

This is a late answer but I think it may appease some curious readers.

Go channels are widely welcomed to be used when it comes to concurrency.

Go community is rigid to follow this saying:

> Do not communicate by sharing memory; instead, share memory by communicating.

I am completely neutral toward this and I think other options rather than well-defined channels should be considered when it comes to broadcasting.

Here is my take: Cond from sync packages are widely overlooked. Implementing braodcaster as suggested by Bronze man in very same context worths noting.

I was delighted witch icza suggestion to use channels and broadcast messages over them. I follow the same methods and use sync's conditional variable:

// Broadcaster is the struct which encompasses broadcasting
type Broadcaster struct {
	cond        *sync.Cond
	subscribers map[interface{}]func(interface{})
	message     interface{}
	running     bool
}

this is the main struct that our whole broadcasting concept relies on.

Below, I define some behaviours for this struct. In a nutshell, subscribers should be able to be added, removed and whole the process should be revokable.

    // SetupBroadcaster gives the broadcaster object to be used further in messaging
    func SetupBroadcaster() *Broadcaster {
    
    	return &amp;Broadcaster{
    		cond:        sync.NewCond(&amp;sync.RWMutex{}),
    		subscribers: map[interface{}]func(interface{}){},
    	}
    }
    
    // Subscribe let others enroll in broadcast event!
    func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {
    
    	b.subscribers[id] = f
    }
    
    // Unsubscribe stop receiving broadcasting
    func (b *Broadcaster) Unsubscribe(id interface{}) {
    	b.cond.L.Lock()
    	delete(b.subscribers, id)
    	b.cond.L.Unlock()
    }
    
    // Publish publishes the message
    func (b *Broadcaster) Publish(message interface{}) {
    	go func() {
    		b.cond.L.Lock()
    
    		b.message = message
    		b.cond.Broadcast()
    		b.cond.L.Unlock()
    	}()
    }
    
    // Start the main broadcasting event
    func (b *Broadcaster) Start() {
    	b.running = true
    	for b.running {
    		b.cond.L.Lock()
    		b.cond.Wait()
    		go func() {
    			for _, f := range b.subscribers {
    				f(b.message) // publishes the message
    			}
    		}()
    		b.cond.L.Unlock()
    	}
    
    }
    
    // Stop broadcasting event
    func (b *Broadcaster) Stop() {
    	b.running = false
    }

Next, I can use it quite easily:

	messageToaster := func(message interface{}) {
		fmt.Printf(&quot;[New Message]: %v\n&quot;, message)
	}
	unwillingReceiver := func(message interface{}) {
		fmt.Println(&quot;Do not disturb!&quot;)
	}
	broadcaster := SetupBroadcaster()
	broadcaster.Subscribe(1, messageToaster)
	broadcaster.Subscribe(2, messageToaster)
	broadcaster.Subscribe(3, unwillingReceiver)

	go broadcaster.Start()

	broadcaster.Publish(&quot;Hello!&quot;)

	time.Sleep(time.Second)
	broadcaster.Unsubscribe(3)
	broadcaster.Publish(&quot;Goodbye!&quot;)

It should print something like this in any order:

[New Message]: Hello!
Do not disturb!
[New Message]: Hello!
[New Message]: Goodbye!
[New Message]: Goodbye!

See this on go playground

答案5

得分: 3

另一个简单的示例:

type Broadcaster struct {
	mu      sync.Mutex
	clients map[int64]chan struct{}
}

func NewBroadcaster() *Broadcaster {
	return &Broadcaster{
		clients: make(map[int64]chan struct{}),
	}
}

func (b *Broadcaster) Subscribe(id int64) (<-chan struct{}, error) {
	defer b.mu.Unlock()
	b.mu.Lock()
	s := make(chan struct{}, 1)

	if _, ok := b.clients[id]; ok {
		return nil, fmt.Errorf("signal %d already exist", id)
	}

	b.clients[id] = s

	return b.clients[id], nil
}

func (b *Broadcaster) Unsubscribe(id int64) {
	defer b.mu.Unlock()
	b.mu.Lock()
	if _, ok := b.clients[id]; ok {
		close(b.clients[id])
	}

	delete(b.clients, id)
}

func (b *Broadcaster) broadcast() {
	defer b.mu.Unlock()
	b.mu.Lock()
	for k := range b.clients {
		if len(b.clients[k]) == 0 {
			b.clients[k] <- struct{}{}
		}
	}
}

type testClient struct {
	name     string
	signal   <-chan struct{}
	signalID int64
	brd      *Broadcaster
}

func (c *testClient) doWork() {
	i := 0
	for range c.signal {
		fmt.Println(c.name, "do work", i)
		if i > 2 {
			c.brd.Unsubscribe(c.signalID)
			fmt.Println(c.name, "unsubscribed")
		}
		i++
	}
	fmt.Println(c.name, "done")
}

func main() {
	var err error
	brd := NewBroadcaster()

	clients := make([]*testClient, 0)

	for i := 0; i < 3; i++ {
		c := &testClient{
			name:     fmt.Sprint("client:", i),
			signalID: time.Now().UnixNano() + int64(i), // +int64(i) for play.golang.org
			brd:      brd,
		}
		c.signal, err = brd.Subscribe(c.signalID)
		if err != nil {
			log.Fatal(err)
		}

		clients = append(clients, c)
	}

	for i := 0; i < len(clients); i++ {
		go clients[i].doWork()
	}

	for i := 0; i < 6; i++ {
		brd.broadcast()
		time.Sleep(time.Second)
	}
}

输出结果:

client:0 do work 0
client:2 do work 0
client:1 do work 0
client:2 do work 1
client:0 do work 1
client:1 do work 1
client:2 do work 2
client:0 do work 2
client:1 do work 2
client:2 do work 3
client:2 unsubscribed
client:2 done
client:0 do work 3
client:0 unsubscribed
client:0 done
client:1 do work 3
client:1 unsubscribed
client:1 done
英文:

another one simple example:
https://play.golang.org

    
type Broadcaster struct {
mu      sync.Mutex
clients map[int64]chan struct{}
}
func NewBroadcaster() *Broadcaster {
return &amp;Broadcaster{
clients: make(map[int64]chan struct{}),
}
}
func (b *Broadcaster) Subscribe(id int64) (&lt;-chan struct{}, error) {
defer b.mu.Unlock()
b.mu.Lock()
s := make(chan struct{}, 1)
if _, ok := b.clients[id]; ok {
return nil, fmt.Errorf(&quot;signal %d already exist&quot;, id)
}
b.clients[id] = s
return b.clients[id], nil
}
func (b *Broadcaster) Unsubscribe(id int64) {
defer b.mu.Unlock()
b.mu.Lock()
if _, ok := b.clients[id]; ok {
close(b.clients[id])
}
delete(b.clients, id)
}
func (b *Broadcaster) broadcast() {
defer b.mu.Unlock()
b.mu.Lock()
for k := range b.clients {
if len(b.clients[k]) == 0 {
b.clients[k] &lt;- struct{}{}
}
}
}
type testClient struct {
name     string
signal   &lt;-chan struct{}
signalID int64
brd      *Broadcaster
}
func (c *testClient) doWork() {
i := 0
for range c.signal {
fmt.Println(c.name, &quot;do work&quot;, i)
if i &gt; 2 {
c.brd.Unsubscribe(c.signalID)
fmt.Println(c.name, &quot;unsubscribed&quot;)
}
i++
}
fmt.Println(c.name, &quot;done&quot;)
}
func main() {
var err error
brd := NewBroadcaster()
clients := make([]*testClient, 0)
for i := 0; i &lt; 3; i++ {
c := &amp;testClient{
name:     fmt.Sprint(&quot;client:&quot;, i),
signalID: time.Now().UnixNano()+int64(i), // +int64(i) for play.golang.org
brd:      brd,
}
c.signal, err = brd.Subscribe(c.signalID)
if err != nil {
log.Fatal(err)
}
clients = append(clients, c)
}
for i := 0; i &lt; len(clients); i++ {
go clients[i].doWork()
}
for i := 0; i &lt; 6; i++ {
brd.broadcast()
time.Sleep(time.Second)
}
}

output:

client:0 do work 0
client:2 do work 0
client:1 do work 0
client:2 do work 1
client:0 do work 1
client:1 do work 1
client:2 do work 2
client:0 do work 2
client:1 do work 2
client:2 do work 3
client:2 unsubscribed
client:2 done
client:0 do work 3
client:0 unsubscribed
client:0 done
client:1 do work 3
client:1 unsubscribed
client:1 done

答案6

得分: 1

因为Go通道遵循通信顺序进程(CSP)模式,通道是一种点对点的通信实体。每个交换中都涉及一个写入者和一个读取者。

然而,每个通道的端点可以在多个goroutine之间共享。这样做是安全的,没有危险的竞争条件。

因此,可以有多个写入者共享写入端。和/或者可以有多个读取者共享读取端。我在另一个答案中对此进行了更详细的说明,其中包括示例。

如果你真的需要广播,不能直接这样做,但是实现一个中间的goroutine来将值复制到一组输出通道中并不难。

英文:

Because Go channels follow the Communicating Sequential Processes (CSP) pattern, channels are a point-to-point communication entity. There is always one writer and one reader involved in each exchange.

However, each channel end can be shared amongst multiple goroutines. This is safe to do - there is no dangerous race condition.

So there can be multiple writers sharing the writing end. And/or there can be multiple readers sharing the reading end. I wrote more on this in a different answer, which includes examples.

If you really need a broadcast, you cannot do this directly, but it is not hard to implement an intermediate goroutine that copies a value out to each of a group of output channels.

答案7

得分: 0

这个问题的规范(也是惯用的Go方式)是使用通道切片,正如Nevetsicza上面推荐的那样。

你应该明确地不要使用回调函数切片。在某些语言中,通常通过传递回调函数来注册观察者,但在这些情况下,你必须在调用回调函数时包装一定量的防御性代码来保护发送者,并且最好通过一个中间的消息传输层将消息生成器(经典观察者模式中的“Subject”)与观察者隔离开来。当你跨越进程边界时,通常会使用发布-订阅模式(JMS代理、gnats、MQ等),但如果主题和观察者都在同一个进程内部,你应该遵循相同的模式(大多数语言都有可用的实现,所以你不需要自己编写)。

不使用回调函数的原因包括:

  1. 除非你构建自己的消息传输层,否则你的主题不再是天真的(它不知道观察者的性质或基数),也不再是无关的(它不关心观察者对消息做什么,只关心消息对任何感兴趣的方面是否可用);
  2. 如果你想要真正的广播,那么你需要假设接收顺序无关紧要 - 理想情况下,每个人都可以同时看到消息,即使在实践中,发送也是迭代的,即使使用通道。但是发送给接收者_n+1_绝对不应该依赖于接收者_n_的接收确认。这不是广播,而是串行赋值。我之所以说赋值,是因为如果你要求一个回调函数,那么在执行回调函数时,你正在强制(即使只是最小程度地)接收者采取一些行为。你基本上将发送者转变为一个协调者,这是一种具有不同用例集的非常不同的模式。
  3. 在没有防御性边界的情况下(例如,在单独的goroutine中使用超时上下文包装每个回调函数调用),你容易受到接收者阻塞的影响 - 这与广播背道而驰。接收(以及可选地,基于广播消息采取任何操作)必须完全异步于原始发送

在Go中使用回调函数提供伪广播是可行的吗?当然可以,但你必须投入大量额外的复杂性来保持代码的整洁性 - 为什么要这样做,当Go提供了一种简单而相当强大的方法呢?上面提到的基于通道的广播示例是很好的示例,几乎每次都应该这样做。

唯一绝对应该使用回调函数的特殊情况是当你不是无关的 - 你确实关心基于发送的消息,接收者采取一些动作(通常是由合同指定的动作)。例如,“我即将卸载这个文件系统,所以请刷新和关闭你的文件句柄,在你完成后让我知道。”(我知道这是一个相当老式的例子,但这是我首先想到的例子。)

英文:

The canonical (and idiomatic go) way to do this is via a slice of channels, as recommended above by Nevets and icza.

You should specifically not use a slice of callbacks. In some languages, you do typically register observers by passing a callback, but in those cases, you have to wrap their invocation in a fair amount of defensive code to protect the sender, and ideally you should have the generator of the message (the "Subject" in classic Observer pattern discussion) segregated from the observers by an intermediate message transport layer. This is where you typically use a pub-sub mesh (JMS brokers, gnats, MQ, whatever) when you're crossing process boundaries, but you should adhere to the same pattern if both subject and observers are internal to the same process (and most languages have available implementations of such mechanisms, so you shouldn't need to roll your own).

The reasons not to use callbacks include:

  1. Unless you build in your own message transport layer, your subject is no longer both naive (it doesn't know the nature or cardinality of the observers) and disinterested (it doesn't care what they do with the message, only that it is made available to any interested parties);
  2. If you want true broadcasting, then you need to act as if the order of receipt does not matter - ideally, everyone can see the message at the same time, even though in practice sending is iterative, even when using channels. But sending to recipient n+1 should absolutely not depend on confirmation of receipt by recipient n. That isn't broadcasting, it's serialized assignment. I say assignment because, if you are asking for a callback, then in executing the callback, you are enforcing (even if only minimally) some behavior to be taken by the recipient. You've basically turned your sender into an orchestrator, which is a very different sort of pattern with a different set of use cases.
  3. Absent a defensive boundary (wrapping each callback invocation in a separate goroutine with a timeout context, e.g.), you are vulnerable to being blocked by a recipient - this is antithetical to broadcasting. Receipt (and optionally, taking any action at all based on) a broadcast message must be entirely asynchronous with respect to the original sending.

Is it doable to provide pseudo-broadcasting by using callbacks in go? Sure, but you have to invest in so much additional complexity to keep things clean - and why would you do that when go provides an easy and rather robust way to do it? The examples of channel-driven broadcasting above are good ones and how you should do it pretty much every time.

The specific exception when you absolutely should use callbacks is when you are not disinterested - you really do care that, on the basis of the sent message, the recipients take some action (and usually something specified by contract). For example, "I am about to unmount this filesystem, so flush and close your filehandles, let me know once you're done." (I know that's a pretty old-fashioned example, but it's the first one that comes to mind.)

huangapple
  • 本文由 发表于 2016年4月5日 12:35:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/36417199.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定