英文:
How to change channel in for-select loop
问题
我想做一个动态通道池,可以监听数十万个通道,并且所有这些通道都在控制之下。正如我期望的那样,如果有太多的通道在监听,我希望它能够自动升级(goroutine => reflect => selectn)。但是在编写selectN通道监视器的代码时,我被通道替换所阻塞。
我想在运行时替换在select循环中的通道,我已经尝试了几次使其可用,但是事情并不顺利。
func Test_Change(t *testing.T) {
type A struct {
ch chan interface{}
}
a := &A{
ch: make(chan interface{}),
}
go func() {
for {
select {
case v := <-a.ch:
fmt.Println(v)
}
}
}()
newCh := make(chan interface{})
go func() {
for i := 0; i < 200; i++ {
a.ch <- i
}
a.ch = newCh
}()
go func() {
for i := 1000; i < 1010; i++ {
newCh <- i
}
}()
for {
select {}
}
}
它被阻塞了。
func Test_Change(t *testing.T) {
type A struct {
ch chan interface{}
bh chan interface{}
}
a := &A{
ch: make(chan interface{}),
bh: make(chan interface{}),
}
notify := make(chan struct{})
go func() {
for {
select {
case v := <-a.ch:
fmt.Println(v)
case <-notify:
fmt.Println("notify")
}
}
}()
newCh := make(chan interface{})
go func() {
for i := 0; i < 200; i++ {
a.ch <- i
}
a.ch = newCh
notify <- struct{}{}
}()
go func() {
for i := 1000; i < 1010; i++ {
newCh <- i
}
}()
for {
select {}
}
}
这次它起作用了。
英文:
I want to do a dynamic channel pool ,which can listen Hundreds of thousands channels and all of these are under control ,and as i excepted i want it to be auto upgradable if there are too much channels listening (goroutine => reflect => selectn)
But during selectN channel watcher coding , i was blocked by channel replacing
I want to replace chan during runtime which is in for select loop, and I have tried it for some times to make it available, but things not went well.
func Test_Change(t *testing.T) {
type A struct {
ch chan interface{}
}
a := &A{
ch: make(chan interface{}),
}
go func() {
for {
select {
case v := <-a.ch:
fmt.Println(v)
}
}
}()
newCh := make(chan interface{})
go func() {
for i := 0; i < 200; i++ {
a.ch <- i
}
a.ch = newCh
}()
go func() {
for i := 1000; i < 1010; i++ {
newCh <- i
}
}()
for {
select {}
}}
It blocked
func Test_Change(t *testing.T) {
type A struct {
ch chan interface{}
bh chan interface{}
}
a := &A{
ch: make(chan interface{}),
bh: make(chan interface{}),
}
notify := make(chan struct{})
go func() {
for {
select {
case v := <-a.ch:
fmt.Println(v)
case <-notify:
fmt.Println("notify")
}
}
}()
newCh := make(chan interface{})
go func() {
for i := 0; i < 200; i++ {
a.ch <- i
}
a.ch = newCh
notify <- struct{}{}
}()
go func() {
for i := 1000; i < 1010; i++ {
newCh <- i
}
}()
for {
select {}
}}
And it worked
答案1
得分: 2
你已经正确确定,使用常规语法(固定数量的情况)无法在任意大的动态通道上使用select
块,但可以使用reflect
包来实现。
然而,我不确定这是否是实现你目标的最佳方式。如果你确实有成千上万个要监视的通道(例如同时连接的成千上万个远程客户端),那么你可以使用“扇入”模式将所有内容写入到非常少的固定数量的通道中,并在其上进行选择。
将代码修改为如下形式:
for {
select {
case <-sigterm:
cleanup()
os.Exit(1)
case msg := <-clients:
// 处理消息...
}
}
func addClient(client chan Message) {
// 扇入:从client读取所有未来的消息,并将它们写入clients通道。
go func(){
for msg := range client {
clients <- msg
}
}()
}
替换通道变量的值不是线程安全的(可能会出现数据竞争),但是允许多个goroutine同时向同一个通道clients
写入和读取数据是完全可以的。
英文:
You have correctly determined that a select
block on an arbitrarily large dynamic number of chans is not possible with the usual syntax (fixed number of cases), and is possible using the reflect
package.
However, I'm not sure this is the best way to achieve your goal. If you do have thousands of channels to watch (e.g. thousands of remote clients connected at the same time), then you may use a "fan-in" pattern to write everything to a very small fixed number of channels, and select on that.
Instead of
for {
select {
case <-sigterm:
cleanup()
os.Exit(1)
case msg := <-client1:
// process msg...
case msg := <-client2:
// process msg...
// HOW CAN I DYNAMICALLY ADD AND REMOVE A CLIENT HERE?
}
}
Think of something like:
for {
select {
case <-sigterm:
cleanup()
os.Exit(1)
case msg := <-clients:
// process msg...
}
}
func addClient(client chan Message) {
// Fan-in: read all future messages from client, and write them
// to clients.
go func(){
for msg := range client {
clients <- msg
}
}()
}
Replacing the value of a channel variable is not thread-safe (can be a data race), however it is perfectly fine to have several goroutines write to and read from the same channel clients
, concurrently.
答案2
得分: 0
a.ch
上的数据竞争是一个严重的错误和设计缺陷。你可以通过使用go test -race
命令运行测试或者使用go run -race program.go
命令运行程序来检测数据竞争。
在for/select循环中,可以替换在其中使用的通道的值,只要这个操作在一个case的代码块内部正确执行,而不是在另一个并发的goroutine中执行。
以下是一个示例代码,它存在数据竞争(不要这样做)。你可以将其保存在你的工作站上,并尝试使用数据竞争检测器进行检测。
这是一个修复后的示例代码,它没有数据竞争。
英文:
The data race on a.ch
is a serious bug and a design flaw. You can detect data races by running the tests with go test -race
or the program with go run -race program.go
.
It is possible to replace the value of the chans used inside a for/select loop, as long as it's properly done inside the body of a case, not in the code of another concurrent goroutine.
replace := time.After(3 * time.Second)
for {
select {
case v, ok := <-ch1:
// use v...
case v, ok := <-ch2:
// use v...
case <-replace:
ch1 = anotherChannel
}
}
This sample runnable code is racy (don't do it). You can save it on your workstation and try with the data race detector.
This fixed sample code is not racy.
答案3
得分: 0
也许这个方法可能会起作用;
非常非常地广泛测试它,它是可行的,但很难做到完美。
我预计它不会完美,而且缺乏关于该方法的详细文档。
它也不是合并输入通道的版本,它始终一次只消耗一个输入通道,这可能会影响性能。
我唯一能保证的是它是无竞争的。
不过,我将把编写一个带有多个输入通道的版本作为读者的练习。
package main
import (
"fmt"
)
func main() {
m := New()
go m.Run()
input := m.Resize(0)
input <- 5
input <- 4
close(input)
input = m.Resize(10)
input <- 6
input <- 7
close(input)
input = m.Resize(2)
input <- 8
input <- 9
close(input)
m.Close()
fmt.Println()
}
type masterOfThings struct {
notify chan notification
wantClose chan chan bool
}
func New() masterOfThings {
return masterOfThings{
notify: make(chan notification, 1),
wantClose: make(chan chan bool),
}
}
type notification struct {
N int
out chan chan interface{}
}
func (m masterOfThings) Resize(n int) chan<- interface{} {
N := notification{
N: n,
out: make(chan chan interface{}, 1),
}
m.notify <- N
return <-N.out
}
func (m masterOfThings) Close() {
closed := make(chan bool)
m.wantClose <- closed
<-closed
}
func (m masterOfThings) Run() {
var input chan interface{}
inputs := []chan interface{}{}
closers := []chan bool{}
defer func() {
for _, c := range closers {
close(c)
}
}()
var wantClose bool
for {
select {
case m := <-m.wantClose:
closers = append(closers, m)
wantClose = true
if len(inputs) < 1 && input == nil {
return
}
case n, ok := <-input:
if !ok {
input = nil
if len(inputs) > 0 {
input = inputs[0]
copy(inputs, inputs[1:])
inputs = inputs[:len(inputs)-1]
} else if wantClose {
return
}
continue
}
fmt.Println(n)
case n := <-m.notify:
nInput := make(chan interface{}, n.N)
if input == nil {
input = nInput
} else {
inputs = append(inputs, nInput)
}
n.out <- nInput
}
}
}
英文:
Maybe this might work;
test it very very extensively, it is doable, but hard to get it right.
I expect it to be not perfect. And it lacks extensive docs about do's and dont's.
It is also not the version that merges input channel, it always ever consume only one input channel at a time. Which might a problem for performance.
The only guarantee i give is that it is race free.
Though I leave as an exercise to the reader the task to write a fan-in-ed version.
package main
import (
"fmt"
)
func main() {
m := New()
go m.Run()
input := m.Resize(0)
input <- 5
input <- 4
close(input)
input = m.Resize(10)
input <- 6
input <- 7
close(input)
input = m.Resize(2)
input <- 8
input <- 9
close(input)
m.Close()
fmt.Println()
}
type masterOfThings struct {
notify chan notification
wantClose chan chan bool
}
func New() masterOfThings {
return masterOfThings{
notify: make(chan notification, 1),
wantClose: make(chan chan bool),
}
}
type notification struct {
N int
out chan chan interface{}
}
func (m masterOfThings) Resize(n int) chan<- interface{} {
N := notification{
N: n,
out: make(chan chan interface{}, 1),
}
m.notify <- N
return <-N.out
}
func (m masterOfThings) Close() {
closed := make(chan bool)
m.wantClose <- closed
<-closed
}
func (m masterOfThings) Run() {
var input chan interface{}
inputs := []chan interface{}{}
closers := []chan bool{}
defer func() {
for _, c := range closers {
close(c)
}
}()
var wantClose bool
for {
select {
case m := <-m.wantClose:
closers = append(closers, m)
wantClose = true
if len(inputs) < 1 && input == nil {
return
}
case n, ok := <-input:
if !ok {
input = nil
if len(inputs) > 0 {
input = inputs[0]
copy(inputs, inputs[1:])
inputs = inputs[:len(inputs)-1]
} else if wantClose {
return
}
continue
}
fmt.Println(n)
case n := <-m.notify:
nInput := make(chan interface{}, n.N)
if input == nil {
input = nInput
} else {
inputs = append(inputs, nInput)
}
n.out <- nInput
}
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论