英文:
Multiple goroutines listening selectively on one channel
问题
我看了一下这些链接,但是没有一个能真正帮助我解决这个问题。我有多个goroutine,如果通道中的值是针对特定的goroutine的,它们需要执行某些任务。
在上面的代码中,uuid被一个通道接收,但是没有发生任何事情。为了解决这个问题,我尝试改变逻辑,如果某个uuid的逻辑不在该例程中,就将uuid重新插入通道。我知道这是一个不好的做法,而且也不起作用。
你认为正确的做法是什么?
英文:
I have looked at this, this, this and this but none really help me in this situation.
I have multiple goroutines that need to do some task if the value in the channel is for that particular goroutine.
var uuidChan chan string
func handleEntity(entityUuid string) {
go func() {
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
}
case <-time.After(time.Second * 5):
println("Timeout")
return
}
}
}()
}
func main() {
uuidChan = make(chan (string))
for i := 0; i < 5; i++ {
handleEntity(fmt.Sprintf("%d", i))
}
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
}
https://play.golang.org/p/Pu5MhSP9Qtj
In the above logic, uuid is received by one of the channels and nothing happens. To solve this, I tried changing the logic to reinsert the uuid back into the channel if logic for some uuid is not in that routine. I know its a bad practice and that doesn't work too.
func handleEntity(entityUuid string) {
go func() {
var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
} else {
notMe = append(notMe, uuid)
}
case <-time.After(time.Second * 5):
println("Timeout")
defer func() {
for _, uuid := range notMe {
uuidChan <- uuid
}
}()
return
}
}
}()
}
https://play.golang.org/p/5On-Vd7UzqP
What could be the correct way to do this?
答案1
得分: 4
也许你想要将你的通道映射到正确的goroutine,以便立即发送消息:
package main
import (
"fmt"
"time"
)
func worker(u string, c chan string) {
for {
fmt.Printf("在 %s 中接收到 %s\n", u, <-c)
}
}
func main() {
workers := make(map[string]chan string)
for _, u := range []string{"foo", "bar", "baz"} {
workers[u] = make(chan string)
go worker(u, workers[u])
}
workers["foo"] <- "你好"
workers["bar"] <- "世界"
workers["baz"] <- "!"
fmt.Println()
time.Sleep(time.Second)
}
希望这对你有帮助!
英文:
maybe you want to map your channels to send the message to correct goroutine right away:
package main
import (
"fmt"
"time"
)
func worker(u string, c chan string) {
for {
fmt.Printf("got %s in %s\n", <-c, u)
}
}
func main() {
workers := make(map[string]chan string)
for _, u := range []string{"foo", "bar", "baz"} {
workers[u] = make(chan string)
go worker(u, workers[u])
}
workers["foo"] <- "hello"
workers["bar"] <- "world"
workers["baz"] <- "!"
fmt.Println()
time.Sleep(time.Second)
}
答案2
得分: 4
你有一个带有标签的盒子,所以接收者应该先读取标签,然后决定如何处理它。如果你把标签放在盒子里面,你就是在强迫接收者打开盒子(参见解决方案1)。我建议你提供更好的邮政服务,至少把标签放在盒子外面(参见解决方案3),或者更好地将盒子立即送到正确的地址(参见解决方案2):
有很多解决方案,你的想象力是唯一的限制:
1.
由于你只有一个带有ID数据的通道,用于具有ID的消费者,而且你只能一次从通道中读取数据(假设通道中的数据顺序很重要),你有一个简单的解决方案:使用一个读取goroutine从通道中读取数据,然后应用逻辑来决定如何处理这个数据,例如将其发送到另一个goroutine或运行一个任务。
尝试这个:链接。
-
使用每个goroutine一个通道,尝试这个:链接。
-
使用
label
和sync.Cond
的信号广播:
我们有一个盒子,并使用名为label
的共享变量在盒子顶部添加接收者的地址。
首先使用名为label
的共享资源将盒子的label
设置为所需的ID,然后使用信号广播通知所有监听的goroutine唤醒并检查label
和时间,看看是否有一个被寻址和过期,然后所有的goroutine都返回等待状态,被寻址或过期的goroutine继续读取非缓冲通道或退出。然后使用time.AfterFunc
来信号剩余goroutine的过期,最后使用wg.Wait()
等待它们全部加入。请注意,第一个c.Broadcast()
应该在c.Wait()
之后调用,这意味着在第一次调用c.Broadcast()
之前,goroutine应该正在运行,所以一种方法是简单地使用另一个名为w4w
的sync.WaitGroup
,表示wait for wait
。
尝试这个:链接。
希望对你有帮助!
英文:
You have a box with a label inside it, so a reciever should read the label first then decide what to do with it. If you place the label inside the box - you are forcing the reiever to open the box (see the solution number 1). I would encourage you do a better postal service and place the label at least outside of the box (see the solution number 3)- or better deleiver the box to a correct address at once (see the solution number 2):
There are many solutions to this and you only limited by your imagination:
1.
Since you have only one channel with a data with an ID inside it for a consumer with an ID, and you can only read a data from the channel once (assuming the oreder of data inside the channel is important) - you have a simple sulotion: Use a reading goroutine which reads a data from the channel then apply the logic to decide what to do with this data - e.g. send it to another goroutine or run a task.
Try this:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
uuidChan := make(chan string)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
t := time.NewTimer(5 * time.Second)
defer t.Stop()
for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("Channel closed.")
return
}
// logic:
wg.Add(1)
// Multiple goroutines listening selectively on one channel
go consume(uuid, &wg)
// switch uuid {case 1: go func1(); case 2: go func2()}
case <-t.C:
fmt.Println("Timeout")
return
}
}
}()
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
close(uuidChan) // free up the goroutine
wg.Wait() // wait until all consumers are done
fmt.Println("All done.")
}
// Multiple goroutines listening selectively on one channel
func consume(uuid string, wg *sync.WaitGroup) {
defer wg.Done()
// logic: or decide here based on uuid
fmt.Println("job #:", uuid) // job
}
Output:
job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.
- Using a channel per goroutine, try this:
package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
defer wg.Done()
// for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("closed")
return // free up goroutine on chan closed
}
fmt.Println(uuid)
return // job done
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
return
}
// }
}
func main() {
const max = 5
slice := make([]chan string, max)
var wg sync.WaitGroup
for i := 0; i < max; i++ {
slice[i] = make(chan string, 1)
wg.Add(1)
go handleEntity(slice[i], &wg)
}
for i := 0; i < 4; i++ {
slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel
}
wg.Wait()
fmt.Println("All done.")
}
Output:
3
0
1
2
Timeout
All done.
- Using
label
and signal broadcast ofsync.Cond
:
So we have a box and using shared var namedlabel
we add the address of the reciever on top of the box.
Here using a shared resource namedlabel
first set the boxlabel
to a desired ID then using signal broadcast inform all listenning goroutines to wake up and check thelabel
and time to see if one is addressed and expired or not then all go back to wait state and the addressed or expired one continues to read the unbuffered channel or exit. Then using thetime.AfterFunc
to signal the expiration of the remaining goroutine(s) and finallywg.Wait()
for them all to join. Note that the firstc.Broadcast()
should be called afterc.Wait()
- meaning the goroutines should be running before the first call toc.Broadcast()
, so one way is to simply use anothersync.WaitGroup
namedw4w
short forwait for wait
.
package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(entityUuid string) {
defer wg.Done()
t0 := time.Now()
var expired, addressed bool
w4w.Done()
m.Lock()
for !expired && !addressed {
c.Wait()
addressed = label == entityUuid
expired = time.Since(t0) > d
}
m.Unlock()
fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)
if !expired && addressed {
uuid := <-uuidChan
fmt.Println("matched =", entityUuid, uuid)
}
fmt.Println("done", entityUuid)
}
func main() {
for i := 0; i < 5; i++ {
w4w.Add(1)
wg.Add(1)
go handleEntity(fmt.Sprintf("%d", i))
}
w4w.Wait()
time.AfterFunc(d, func() {
// m.Lock()
// label = "none"
// m.Unlock()
fmt.Println("expired")
c.Broadcast() // expired
})
for i := 0; i < 4; i++ {
m.Lock()
label = fmt.Sprintf("%d", i)
m.Unlock()
c.Broadcast() // notify all
uuidChan <- label
}
fmt.Println("...")
wg.Wait()
fmt.Println("all done")
}
var (
label string
uuidChan = make(chan string)
m sync.Mutex
c = sync.NewCond(&m)
w4w, wg sync.WaitGroup
d = 1 * time.Second
)
Output:
id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论