Priority in Go select statement workaround

Question

I wish to have a goroutine listening on two channels, blocked when both channels are drained. However, if both channels contain data, I want one to be drained before the other is handled.

In the working example below, I want all of out to be drained before exit is handled. I use a select statement, which doesn't have any priority order. How might I get around the problem, so that all 10 out values are handled before the exit?

package main

import "fmt"

func sender(out chan int, exit chan bool) {
	for i := 1; i <= 10; i++ {
		out <- i
	}
	exit <- true
}

func main() {
	out := make(chan int, 10)
	exit := make(chan bool)

	go sender(out, exit)

L:
	for {
		select {
		case i := <-out:
			fmt.Printf("Value: %d\n", i)
		case <-exit:
			fmt.Println("Exiting")
			break L
		}
	}
	fmt.Println("Did we get all 10? Most likely not")
}
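Why the loop above usually stops early: the Go language specification states that when several cases of a select can proceed, one of them is chosen by uniform pseudo-random selection, so the order in which the cases are written gives no priority. A small sketch to observe this (the helper countChoices is hypothetical, written only for illustration) keeps two channels ready at the same time and counts which case wins:

```go
package main

import "fmt"

// countChoices keeps both receive cases ready on every iteration and
// counts how often select picks each one.
func countChoices(n int) (fromA, fromB int) {
	a := make(chan int, n)
	b := make(chan int, n)
	for i := 0; i < n; i++ {
		a <- i
		b <- i
	}
	// n receives in total and each channel holds n values, so neither
	// channel can run dry inside this loop: both cases are always ready.
	for i := 0; i < n; i++ {
		select {
		case <-a:
			fromA++
		case <-b:
			fromB++
		}
	}
	return fromA, fromB
}

func main() {
	a, b := countChoices(1000)
	// Roughly an even split is expected; the exact numbers vary per run.
	fmt.Println("picked a:", a, "picked b:", b)
}
```

Each run should show both counts well above zero, with the exact split varying between runs.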

Answer 1

Score: 43

The language supports this natively and no workaround is required. It's very simple: the quit channel should only be visible to the producer. On quit, the producer closes the data channel. The consumer ranges over that channel, and a range loop over a channel ends only when the channel is both closed and empty, so the consumer quits only after every buffered value has been processed.

Here is an example to illustrate:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

var (
	produced  = 0
	processed = 0
)

func produceEndlessly(out chan int, quit chan bool) {
	defer close(out)
	for {
		select {
		case <-quit:
			fmt.Println("RECV QUIT")
			return
		default:
			out <- rand.Int()
			time.Sleep(time.Duration(rand.Int63n(5e6)))
			produced++
		}
	}
}

func quitRandomly(quit chan bool) {
	d := time.Duration(rand.Int63n(5e9))
	fmt.Println("SLEEP", d)
	time.Sleep(d)
	fmt.Println("SEND QUIT")
	quit <- true
}

func main() {
	vals, quit := make(chan int, 10), make(chan bool)
	go produceEndlessly(vals, quit)
	go quitRandomly(quit)
	for x := range vals {
		fmt.Println(x)
		processed++
		time.Sleep(time.Duration(rand.Int63n(5e8)))
	}
	fmt.Println("Produced:", produced)
	fmt.Println("Processed:", processed)
}
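A deterministic, minimal sketch of the same idea, without random sleeps. It assumes a chan struct{} that is closed (rather than a bool sent once) as the quit signal, and it moves the send into the select so the producer can notice quit even while the buffer is full. The names produce and run are hypothetical:

```go
package main

import "fmt"

// produce sends increasing integers on out until quit is closed, then
// closes out. Only the producer ever looks at quit.
func produce(out chan<- int, quit <-chan struct{}, produced *int) {
	defer close(out)
	for i := 0; ; i++ {
		select {
		case <-quit:
			return
		case out <- i:
			*produced++
		}
	}
}

// run asks the producer to stop after stopAfter values have been
// processed, then keeps ranging until out is closed and drained.
func run(stopAfter int) (produced, processed int) {
	out := make(chan int, 10)
	quit := make(chan struct{})
	go produce(out, quit, &produced)
	for range out {
		processed++
		if processed == stopAfter {
			close(quit)
		}
	}
	// The producer's last write to produced happens before close(out),
	// which happens before the range loop ends, so this read is race-free.
	return produced, processed
}

func main() {
	p, q := run(25)
	fmt.Println("Produced:", p, "Processed:", q)
}
```

Every value the producer manages to send is eventually received by the range loop, so the two counters always end up equal. How far past stopAfter they go depends on scheduling and the buffer size.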

Answer 2

Score: 34

package main

import "fmt"

func sender(out chan int, exit chan bool) {
	for i := 1; i <= 10; i++ {
		out <- i
	}
	exit <- true
}

func main() {
	out := make(chan int, 10)
	exit := make(chan bool)

	go sender(out, exit)

	for {
		select {
		case i := <-out:
			fmt.Printf("Value: %d\n", i)
			continue
		default:
		}
		select {
		case i := <-out:
			fmt.Printf("Value: %d\n", i)
			continue
		case <-exit:
			fmt.Println("Exiting")
		}
		break
	}
	fmt.Println("Did we get all 10? I think so!")
}

The default case of the first select makes it non-blocking: it takes a value from out if one is ready, without looking at the exit channel, and otherwise does not wait. If the out channel is empty, control immediately drops to the second select, which is blocking: it waits for data on either channel. If an exit comes, it is handled and the loop is allowed to exit. If data comes, the loop goes back up to the top and back into drain mode.
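The non-blocking first stage is simply a select with a default case. Pulled out into a small helper (tryRecv is a hypothetical name, used only for this sketch), the behavior looks like this:

```go
package main

import "fmt"

// tryRecv returns a value from ch if one is ready right now. The default
// case makes the select return immediately instead of blocking.
// Note: a closed channel also counts as ready and yields the zero value.
func tryRecv(ch <-chan int) (v int, ok bool) {
	select {
	case v = <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	_, ok := tryRecv(ch)
	fmt.Println("empty:", ok) // empty: false
	ch <- 42
	v, ok := tryRecv(ch)
	fmt.Println("ready:", v, ok) // ready: 42 true
}
```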

Answer 3

Score: 7

Another approach:

package main

import "fmt"

func sender(c chan int) chan int {
	go func() {
		for i := 1; i <= 15; i++ {
			c <- i
		}
		close(c)
	}()
	return c
}

func main() {
	for i := range sender(make(chan int, 10)) {
		fmt.Printf("Value: %d\n", i)
	}
	fmt.Println("Did we get all 15? Surely yes")
}

$ go run main.go
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Did we get all 15? Surely yes
$ 

Answer 4

Score: 6

Here's a general idiom that solves the select priority problem.

Yes, it's not pretty, to say the least, but it does exactly what is needed, with no pitfalls and no hidden limitations.

Here's a short code example; an explanation follows.

package main

import (
	"fmt"
	"time"
)

func sender(out chan int, exit chan bool) {
	for i := 1; i <= 10; i++ {
		out <- i
	}

	time.Sleep(2000 * time.Millisecond)
	out <- 11
	exit <- true
}

func main() {
	out := make(chan int, 20)
	exit := make(chan bool)

	go sender(out, exit)

	time.Sleep(500 * time.Millisecond)

L:
	for {
		select {
		case i := <-out:
			fmt.Printf("Value: %d\n", i)
		default:
			select {
			case i := <-out:
				fmt.Printf("Value: %d\n", i)
			case <-exit:
				select {
				case i := <-out:
					fmt.Printf("Value: %d\n", i)
				default:
					fmt.Println("Exiting")
					break L
				}
			}
		}
	}
	fmt.Println("Did we get all 10? Yes.")
	fmt.Println("Did we get 11? DEFINITELY YES")
}

And here's how it works: the main() from above, annotated:

func main() {
	out := make(chan int, 20)
	exit := make(chan bool)
	go sender(out, exit)
	time.Sleep(500 * time.Millisecond)
L:
	for {
		select {
		// Stage 1: we get here when entering the next loop iteration,
		// and check whether out has something to be read.
		// This select is used to handle buffered data in a loop.
		case i := <-out:
			fmt.Printf("Value: %d\n", i)
		default:
			// Otherwise we fall back in here.
			select {
			// Stage 2: this select is used to block when there's no data
			// in either channel.
			case i := <-out:
				// If out has something to read, we unblock, and then go
				// round the loop again.
				fmt.Printf("Value: %d\n", i)
			case <-exit:
				select {
				// Stage 3: this select is used to explicitly prioritize one
				// channel over the other, in case we woke up (unblocked) on
				// the low-priority case.
				//
				// NOTE: this will prioritize the high-priority channel even
				// if its value came second, in quick succession after the
				// first one.
				case i := <-out:
					fmt.Printf("Value: %d\n", i)
				default:
					fmt.Println("Exiting")
					break L
				}
			}
		}
	}

	fmt.Println("Did we get all 10? Yes.")
	fmt.Println("Did we get 11? DEFINITELY YES")
}
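One case worth checking before relying on this idiom in general: if exit is a one-shot send (exit <- true) and stage 3 takes a value from out, the exit signal has already been consumed. Once out drains, stage 2 can then block forever. Closing exit instead of sending on it keeps that case permanently ready. Below is a minimal sketch under that assumption. The function name receiveAll and the chan struct{} exit type are illustrative and do not come from the answer:

```go
package main

import "fmt"

// receiveAll applies the three-stage idiom, but expects exit to be closed
// (not sent to once), so the exit signal can never be consumed and lost.
func receiveAll(out <-chan int, exit <-chan struct{}) []int {
	var got []int
	for {
		// Stage 1: take a value from out if one is ready, without blocking.
		select {
		case i := <-out:
			got = append(got, i)
			continue
		default:
		}
		// Stage 2: block until either channel is ready.
		select {
		case i := <-out:
			got = append(got, i)
		case <-exit:
			// Stage 3: exit was chosen; re-check out before leaving.
			select {
			case i := <-out:
				got = append(got, i)
				// exit is closed, so it is still ready on the next pass.
			default:
				return got
			}
		}
	}
}

func main() {
	out := make(chan int, 20)
	exit := make(chan struct{})
	go func() {
		for i := 1; i <= 11; i++ {
			out <- i
		}
		close(exit) // every send happens before exit is signalled
	}()
	fmt.Println(receiveAll(out, exit))
}
```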

NOTE: Before playing tricks with prioritization, MAKE SURE YOU ARE SOLVING THE RIGHT PROBLEM.

Chances are, it can be solved differently.

Still, having a prioritized select in Go would have been a great thing. Just a dream...

NOTE: There is a quite similar answer on this thread, https://stackoverflow.com/a/45854345/11729048, but it nests only two selects, not three as I did. What's the difference? My approach is more efficient, whereas there we explicitly expect to handle random choices on each loop iteration.

However, if the high-priority channel isn't buffered, and/or you expect only sporadic single events on it rather than bulk data, then the simpler two-stage idiom (as in that answer) will suffice:

L:
for {
	select {
	case i := <-out:
		fmt.Printf("Value: %d\n", i)
	case <-exit:
		select {
		case i := <-out:
			fmt.Printf("Value: %d\n", i)
		default:
			fmt.Println("Exiting")
			break L
		}
	}
}

It is basically stages 2 and 3 of the idiom above, with stage 1 removed.

And once again: in something like 90% of the cases where you think you need to prioritize channel cases in a select, you really don't.

And here's a one-liner that can be wrapped in a macro:

for {
	select { case a1 := <-ch_p1: p1_action(a1); default: select { case a1 := <-ch_p1: p1_action(a1); case a2 := <-ch_p2: select { case a1 := <-ch_p1: p1_action(a1); p2_action(a2); default: p2_action(a2) } } }
}
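Go has no macros, so one alternative is to package the idiom as a function that takes the two actions as callbacks. The sketch below is a variant under stated assumptions, not the answer's own code. The name prioritized is hypothetical, the channels carry int, a closed channel is switched off by setting it to nil (a nil channel is never ready in a select), and in stage 3 the value already received from low is still handled after the high-priority one, so it is never lost:

```go
package main

import "fmt"

// prioritized prefers high over low and returns once both are closed.
func prioritized(high, low <-chan int, onHigh, onLow func(int)) {
	for high != nil || low != nil {
		// Stage 1: non-blocking check of the high-priority channel.
		select {
		case v, ok := <-high:
			if !ok {
				high = nil
			} else {
				onHigh(v)
			}
			continue
		default:
		}
		// Stage 2: block until either channel is ready.
		select {
		case v, ok := <-high:
			if !ok {
				high = nil
				continue
			}
			onHigh(v)
		case w, ok := <-low:
			if !ok {
				low = nil
				continue
			}
			// Stage 3: give high one more chance before handling w.
			select {
			case v, ok := <-high:
				if ok {
					onHigh(v)
				} else {
					high = nil
				}
			default:
			}
			onLow(w)
		}
	}
}

func main() {
	high := make(chan int, 3)
	low := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		high <- i
		low <- 10 * i
	}
	close(high)
	close(low)
	var order []int
	record := func(v int) { order = append(order, v) }
	prioritized(high, low, record, record)
	fmt.Println(order) // [1 2 3 10 20 30]
}
```

Because both channels are pre-filled and closed before the loop starts, stage 1 drains high completely before low is ever looked at, which makes the printed order deterministic.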

And what if you want to prioritize more than two cases?

Then you have two options. The first is to build a tree using intermediate goroutines, so that each fork is exactly binary (the idiom above).

The second option is to make the priority fork wider than binary.

Here's an example of three priorities:

for {
	select {
	case a1 := <-ch_p1:
		p1_action(a1)
	default:
		select {
		case a2 := <-ch_p2:
			p2_action(a2)
		default:
			select { // block here, on this select
			case a1 := <-ch_p1:
				p1_action(a1)
			case a2 := <-ch_p2:
				select {
				case a1 := <-ch_p1:
					p1_action(a1)
					p2_action(a2) // a2 was already received; don't drop it
				default:
					p2_action(a2)
				}
			case a3 := <-ch_p3:
				select {
				case a1 := <-ch_p1:
					p1_action(a1)
					p3_action(a3) // a3 was already received; don't drop it
				case a2 := <-ch_p2:
					p2_action(a2)
					p3_action(a3)
				default:
					p3_action(a3)
				}
			}
		}
	}
}

That is, the whole structure is conceptually split into three stages, just like the original (binary) one.

Once again: chances are, you can design your system so that you can avoid this mess.

P.S. A rhetorical question: why doesn't Go have this built into the language?

Answer 5

Score: 2

Here's another option.

Consumer Code:

go func() {
	stop := false
	for {
		select {
		case item := <-r.queue:
			doWork(item)
		case <-r.stopping:
			stop = true
		}
		// This break sits outside the select, so it ends the for loop.
		if stop && len(r.queue) == 0 {
			break
		}
	}
}()

Answer 6

Score: 1

I have created one rather simple workaround. It does what I want, but if anyone else has a better solution, please let me know:

exiting := false
for !exiting || len(out) > 0 {
	select {
	case i := <-out:
		fmt.Printf("Value: %d\n", i)
	case <-exit:
		exiting = true
		fmt.Println("Exiting")
	}
}

Instead of exiting as soon as the exit signal is received, I set a flag and exit only once I've made sure nothing is left in chan out.
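Below, the workaround is wrapped into a runnable program with the question's sender. The name collectWithFlag is illustrative. The len(out) check is only a safe "nothing left" test under two conditions: this loop must be the only receiver on out, and the sender must finish all its sends before signalling exit. With several consumers, another goroutine could take the last value between the len check and the select, and this loop would then block.

```go
package main

import "fmt"

func sender(out chan int, exit chan bool) {
	for i := 1; i <= 10; i++ {
		out <- i
	}
	exit <- true
}

// collectWithFlag uses the flag-and-len workaround and returns the values
// it received instead of printing them.
func collectWithFlag() []int {
	out := make(chan int, 10)
	exit := make(chan bool)
	go sender(out, exit)
	var got []int
	exiting := false
	for !exiting || len(out) > 0 {
		select {
		case i := <-out:
			got = append(got, i)
		case <-exit:
			exiting = true // keep looping until out is empty
		}
	}
	return got
}

func main() {
	fmt.Println(collectWithFlag())
}
```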

Answer 7

Score: 1

I think Sonia's answer is incorrect. This is my solution; it is a little bit complicated.

package main

import "fmt"

func sender(out chan int, exit chan bool) {
	for i := 1; i <= 10; i++ {
		out <- i
	}
	exit <- true
}

func main() {
	out := make(chan int, 10)
	exit := make(chan bool)

	go sender(out, exit)

L:
	for {
		select {
		case i := <-out:
			fmt.Printf("Value: %d\n", i)
		case <-exit:
			// exit was received: drain whatever is still buffered in out,
			// then leave both loops via break L.
			for {
				select {
				case i := <-out:
					fmt.Printf("Value: %d\n", i)
				default:
					fmt.Println("Exiting")
					break L
				}
			}
		}
	}
	fmt.Println("Did we get all 10? Yes!")
}

Answer 8

Score: 0

In my case, I really wanted to prioritise data from one channel over another, and not just have an out-of-band exit signal. For the benefit of anyone else with the same issue I think this approach works without the potential race condition:

OUTER:
for channelA != nil || channelB != nil {

	select {

	case typeA, ok := <-channelA:
		if !ok {
			channelA = nil
			continue OUTER
		}
		doSomething(typeA)

	case typeB, ok := <-channelB:
		if !ok {
			channelB = nil
			continue OUTER
		}

		// Looped non-blocking nested select here checks that channelA
		// really is drained before we deal with the data from channelB
	NESTED:
		for {
			select {
			case typeA, ok := <-channelA:
				if !ok {
					channelA = nil
					continue NESTED
				}
				doSomething(typeA)

			default:
				// We are free to process the typeB data now
				doSomethingElse(typeB)
				break NESTED
			}
		}
	}

}
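The loop above can be run end to end once it is filled in. In this sketch, doSomething and doSomethingElse are hypothetical handlers passed in as callbacks, the channels carry int, and drainWithPriority is an illustrative name. Both channels are buffered, pre-filled and closed before the loop starts, so the expected order is deterministic: every channelA value is handled before any channelB value, regardless of which case the outer select picks first.

```go
package main

import "fmt"

func drainWithPriority(channelA, channelB <-chan int, doSomething, doSomethingElse func(int)) {
OUTER:
	for channelA != nil || channelB != nil {
		select {
		case typeA, ok := <-channelA:
			if !ok {
				channelA = nil // a nil channel is never ready again
				continue OUTER
			}
			doSomething(typeA)
		case typeB, ok := <-channelB:
			if !ok {
				channelB = nil
				continue OUTER
			}
			// Drain everything currently in channelA before handling typeB.
		NESTED:
			for {
				select {
				case typeA, ok := <-channelA:
					if !ok {
						channelA = nil
						continue NESTED
					}
					doSomething(typeA)
				default:
					doSomethingElse(typeB)
					break NESTED
				}
			}
		}
	}
}

func main() {
	a := make(chan int, 3)
	b := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		a <- i
		b <- 10 * i
	}
	close(a)
	close(b)
	var order []int
	record := func(v int) { order = append(order, v) }
	drainWithPriority(a, b, record, record)
	fmt.Println(order) // [1 2 3 10 20 30]
}
```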

Answer 9

Score: 0

Is there any specific reason for using a buffered channel make(chan int, 10)?

You need to use an unbuffered channel instead of the buffered one you are using.

Just remove the 10: it should be just make(chan int).

This way, execution in the sender function can only proceed to the exit <- true statement after the last value sent on out has been dequeued by the i := <-out statement. Until that receive has executed, the goroutine cannot reach exit <- true.
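Here is a minimal check of this reasoning. It reuses the question's sender unchanged and collects the values into a slice instead of printing them. The collect helper is illustrative and not part of the answer, and the check assumes main is the only goroutine receiving from out.

```go
package main

import "fmt"

func sender(out chan int, exit chan bool) {
	for i := 1; i <= 10; i++ {
		out <- i // with an unbuffered out, this returns only after a receive
	}
	exit <- true
}

// collect runs the question's loop with an unbuffered out channel.
func collect() []int {
	out := make(chan int) // unbuffered: no 10
	exit := make(chan bool)
	go sender(out, exit)
	var got []int
	for {
		select {
		case i := <-out:
			got = append(got, i)
		case <-exit:
			// sender reached exit <- true, so all 10 sends were received.
			return got
		}
	}
}

func main() {
	got := collect()
	fmt.Println(len(got), got)
}
```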

huangapple
  • Posted on June 20, 2012 18:15:00
  • When reposting, please keep the link to this article: https://go.coder-hub.com/11117382.html