多个 goroutine 在一个通道上进行有选择地监听。

huangapple go评论73阅读模式
英文:

Multiple goroutines listening selectively on one channel

问题

我看了一下这些链接,但是没有一个能真正帮助我解决这个问题。我有多个goroutine,如果通道中的值是针对特定的goroutine的,它们需要执行某些任务。

在上面的代码中,uuid被一个通道接收,但是没有发生任何事情。为了解决这个问题,我尝试改变逻辑,如果某个uuid的逻辑不在该例程中,就将uuid重新插入通道。我知道这是一个不好的做法,而且也不起作用。

你认为正确的做法是什么?

英文:

I have looked at this, this, this and this but none really help me in this situation.
I have multiple goroutines that need to do some task if the value in the channel is for that particular goroutine.

var uuidChan chan string

func handleEntity(entityUuid string) {
	go func() {
		for {
			select {
			case uuid := <-uuidChan:
				if uuid == entityUuid {
					// logic
					println(uuid)
					return
				}
			case <-time.After(time.Second * 5):
				println("Timeout")
				return
			}
		}
	}()
}

func main() {
	uuidChan = make(chan (string))
	for i := 0; i < 5; i++ {
		handleEntity(fmt.Sprintf("%d", i))
	}
	for i := 0; i < 4; i++ {
		uuidChan <- fmt.Sprintf("%d", i)
	}
}

https://play.golang.org/p/Pu5MhSP9Qtj

In the above logic, uuid is received by one of the channels and nothing happens. To solve this, I tried changing the logic to reinsert the uuid back into the channel if logic for some uuid is not in that routine. I know its a bad practice and that doesn't work too.

func handleEntity(entityUuid string) {
	go func() {
		var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.
		for {
			select {
			case uuid := <-uuidChan:
				if uuid == entityUuid {
					// logic
					println(uuid)
					return
				} else {
					notMe = append(notMe, uuid)
				}
			case <-time.After(time.Second * 5):
				println("Timeout")
				defer func() {
					for _, uuid := range notMe {
						uuidChan <- uuid
					}
				}()
				return
			}
		}
	}()
}

https://play.golang.org/p/5On-Vd7UzqP

What could be the correct way to do this?

答案1

得分: 4

也许你想要将你的通道映射到正确的goroutine,以便立即发送消息:

package main

import (
	"fmt"
	"time"
)

func worker(u string, c chan string) {
	for {
		fmt.Printf("在 %s 中接收到 %s\n", u, <-c)
	}
}

func main() {
	workers := make(map[string]chan string)

	for _, u := range []string{"foo", "bar", "baz"} {
		workers[u] = make(chan string)
		go worker(u, workers[u])
	}

	workers["foo"] <- "你好"
	workers["bar"] <- "世界"
	workers["baz"] <- "!"

	fmt.Println()

	time.Sleep(time.Second)
}

希望这对你有帮助!

英文:

maybe you want to map your channels to send the message to correct goroutine right away:

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

func worker(u string, c chan string) {
	for {
		fmt.Printf(&quot;got %s in %s\n&quot;, &lt;-c, u)
	}
}

func main() {
	workers := make(map[string]chan string)

	for _, u := range []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;} {
		workers[u] = make(chan string)
		go worker(u, workers[u])
	}

	workers[&quot;foo&quot;] &lt;- &quot;hello&quot;
	workers[&quot;bar&quot;] &lt;- &quot;world&quot;
	workers[&quot;baz&quot;] &lt;- &quot;!&quot;

	fmt.Println()

	time.Sleep(time.Second)
}

答案2

得分: 4

你有一个带有标签的盒子,所以接收者应该先读取标签,然后决定如何处理它。如果你把标签放在盒子里面,你就是在强迫接收者打开盒子(参见解决方案1)。我建议你提供更好的邮政服务,至少把标签放在盒子外面(参见解决方案3),或者更好地将盒子立即送到正确的地址(参见解决方案2):

有很多解决方案,你的想象力是唯一的限制:
1.
由于你只有一个带有ID数据的通道,用于具有ID的消费者,而且你只能一次从通道中读取数据(假设通道中的数据顺序很重要),你有一个简单的解决方案:使用一个读取goroutine从通道中读取数据,然后应用逻辑来决定如何处理这个数据,例如将其发送到另一个goroutine或运行一个任务。
尝试这个:链接

  1. 使用每个goroutine一个通道,尝试这个:链接

  2. 使用labelsync.Cond的信号广播:
    我们有一个盒子,并使用名为label的共享变量在盒子顶部添加接收者的地址。
    首先使用名为label的共享资源将盒子的label设置为所需的ID,然后使用信号广播通知所有监听的goroutine唤醒并检查label和时间,看看是否有一个被寻址和过期,然后所有的goroutine都返回等待状态,被寻址或过期的goroutine继续读取非缓冲通道或退出。然后使用time.AfterFunc来信号剩余goroutine的过期,最后使用wg.Wait()等待它们全部加入。请注意,第一个c.Broadcast()应该在c.Wait()之后调用,这意味着在第一次调用c.Broadcast()之前,goroutine应该正在运行,所以一种方法是简单地使用另一个名为w4wsync.WaitGroup,表示wait for wait
    尝试这个:链接

希望对你有帮助!

英文:

You have a box with a label inside it, so a reciever should read the label first then decide what to do with it. If you place the label inside the box - you are forcing the reiever to open the box (see the solution number 1). I would encourage you do a better postal service and place the label at least outside of the box (see the solution number 3)- or better deleiver the box to a correct address at once (see the solution number 2):

There are many solutions to this and you only limited by your imagination:
1.
Since you have only one channel with a data with an ID inside it for a consumer with an ID, and you can only read a data from the channel once (assuming the oreder of data inside the channel is important) - you have a simple sulotion: Use a reading goroutine which reads a data from the channel then apply the logic to decide what to do with this data - e.g. send it to another goroutine or run a task.
Try this:

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

func main() {
	uuidChan := make(chan string)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		t := time.NewTimer(5 * time.Second)
		defer t.Stop()
		for {
			select {
			case uuid, ok := &lt;-uuidChan:
				if !ok {
					fmt.Println(&quot;Channel closed.&quot;)
					return
				}
// logic:
				wg.Add(1)
				// Multiple goroutines listening selectively on one channel
				go consume(uuid, &amp;wg)
				// switch uuid {case 1: go func1(); case 2: go func2()}

			case &lt;-t.C:
				fmt.Println(&quot;Timeout&quot;)
				return
			}
		}
	}()

	for i := 0; i &lt; 4; i++ {
		uuidChan &lt;- fmt.Sprintf(&quot;%d&quot;, i)
	}
	close(uuidChan) // free up the goroutine

	wg.Wait() // wait until all consumers are done
	fmt.Println(&quot;All done.&quot;)
}

// Multiple goroutines listening selectively on one channel
func consume(uuid string, wg *sync.WaitGroup) {
	defer wg.Done()
// logic: or decide here based on uuid
	fmt.Println(&quot;job #:&quot;, uuid) // job
}

Output:

job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.

  1. Using a channel per goroutine, try this:
package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	// for {
	select {
	case uuid, ok := &lt;-uuidChan:
		if !ok {
			fmt.Println(&quot;closed&quot;)
			return // free up goroutine on chan closed
		}
		fmt.Println(uuid)
		return // job done

	case &lt;-time.After(1 * time.Second):
		fmt.Println(&quot;Timeout&quot;)
		return
	}
	// }
}

func main() {
	const max = 5
	slice := make([]chan string, max)
	var wg sync.WaitGroup

	for i := 0; i &lt; max; i++ {
		slice[i] = make(chan string, 1)

		wg.Add(1)
		go handleEntity(slice[i], &amp;wg)
	}

	for i := 0; i &lt; 4; i++ {
		slice[i] &lt;- fmt.Sprintf(&quot;%d&quot;, i) // send to the numbered channel
	}

	wg.Wait()
	fmt.Println(&quot;All done.&quot;)
}

Output:

3
0
1
2
Timeout
All done.

  1. Using label and signal broadcast of sync.Cond:
    So we have a box and using shared var named label we add the address of the reciever on top of the box.
    Here using a shared resource named label first set the box label to a desired ID then using signal broadcast inform all listenning goroutines to wake up and check the label and time to see if one is addressed and expired or not then all go back to wait state and the addressed or expired one continues to read the unbuffered channel or exit. Then using the time.AfterFunc to signal the expiration of the remaining goroutine(s) and finally wg.Wait() for them all to join. Note that the first c.Broadcast() should be called after c.Wait() - meaning the goroutines should be running before the first call to c.Broadcast(), so one way is to simply use another sync.WaitGroup named w4w short for wait for wait.
package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

func handleEntity(entityUuid string) {
	defer wg.Done()
	t0 := time.Now()
	var expired, addressed bool

	w4w.Done()
	m.Lock()
	for !expired &amp;&amp; !addressed {
		c.Wait()
		addressed = label == entityUuid
		expired = time.Since(t0) &gt; d
	}
	m.Unlock()

	fmt.Println(&quot;id =&quot;, entityUuid, &quot;addressed =&quot;, addressed, &quot;expired =&quot;, expired)
	if !expired &amp;&amp; addressed {
		uuid := &lt;-uuidChan
		fmt.Println(&quot;matched =&quot;, entityUuid, uuid)
	}
	fmt.Println(&quot;done&quot;, entityUuid)
}

func main() {
	for i := 0; i &lt; 5; i++ {
		w4w.Add(1)
		wg.Add(1)
		go handleEntity(fmt.Sprintf(&quot;%d&quot;, i))
	}
	w4w.Wait()

	time.AfterFunc(d, func() {
		// m.Lock()
		// label = &quot;none&quot;
		// m.Unlock()
		fmt.Println(&quot;expired&quot;)
		c.Broadcast() // expired
	})

	for i := 0; i &lt; 4; i++ {
		m.Lock()
		label = fmt.Sprintf(&quot;%d&quot;, i)
		m.Unlock()
		c.Broadcast() // notify all
		uuidChan &lt;- label
	}

	fmt.Println(&quot;...&quot;)
	wg.Wait()
	fmt.Println(&quot;all done&quot;)
}

var (
	label    string
	uuidChan = make(chan string)
	m        sync.Mutex
	c        = sync.NewCond(&amp;m)
	w4w, wg  sync.WaitGroup
	d        = 1 * time.Second
)

Output:

id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done

huangapple
  • 本文由 发表于 2021年7月15日 13:34:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/68388460.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定