Go – wait for next item in a priority queue if empty

huangapple go评论78阅读模式
英文:

Go - wait for next item in a priority queue if empty

问题

我正在尝试实现一个基于优先级的优先队列,用于通过网络套接字发送 JSON 对象。我正在使用 container/heap 包来实现队列。我想到了以下的代码:

for {
    if pq.Len() > 0 {
        item := heap.Pop(&pq).(*Item)
        jsonEncoder.Encode(&item)
    } else {
        time.Sleep(10 * time.Millisecond)
    }
}

除了轮询优先队列以等待新项目之外,是否有更好的方法?

英文:

I am trying to implement a priority queue to send json objects through a network socket based on priority. I am using the container/heap package to implement the queue. I came up with something like this:

for {
	if pq.Len() > 0 {
		item := heap.Pop(&pq).(*Item)
		jsonEncoder.Encode(&item)
	} else {
		time.Sleep(10 * time.Millisecond)
	}
}

Are there better ways to wait for a new item than just polling the priority queue?

答案1

得分: 3

我可能会使用一对排队的goroutine。从PriortyQueue示例中开始,我会构建一个像这样的函数:

func queue(in <-chan *Item, out chan<- *Item) {
	// 创建一个队列
	pq := make(PriorityQueue, 0)
	heap.Init(&pq)

	var currentItem *Item       // 当前处理的项
	var currentIn = in          // 当前输入通道(有时可能为nil)
	var currentOut chan<- *Item // 当前输出通道(一开始为nil,直到有数据)

	defer close(out)

	for {
		select {
		// 从输入通道读取数据
		case item, ok := <-currentIn:
			if !ok {
				// 输入通道已关闭,停止尝试读取
				currentIn = nil
				// 如果没有待写入的数据,结束处理
				if currentItem == nil {
					return
				}
				continue
			}

			// 如果有待写入的数据,将其放回队列
			if currentItem != nil {
				heap.Push(&pq, currentItem)
			}

			// 将新的数据放入队列
			heap.Push(&pq, item)

			// 如果输出通道未开启,开启它
			currentOut = out

			// 获取最优的项,至少有一项,因为我们刚刚放入了一个
			currentItem = heap.Pop(&pq).(*Item)

			// 写入输出通道
		case currentOut <- currentItem:
			// 写入完成,还有其他数据吗?
			if len(pq) > 0 {
				// 保存下一次处理的项
				currentItem = heap.Pop(&pq).(*Item)
			} else {
				// 没有数据可写入,输入通道是否已关闭?
				if currentIn == nil {
					// 输入通道已关闭,结束处理
					return
				}

				// 否则,暂时关闭输出通道
				currentItem = nil
				currentOut = nil
			}
		}
	}
}

以下是使用示例:

func main() {
	// 一些项及其优先级
	items := map[string]int{
		"banana": 3, "apple": 2, "pear": 4,
	}

	in := make(chan *Item, 10) // 大输入缓冲区和无缓冲输出通道可获得最佳排序顺序。
	out := make(chan *Item)    // 但是,对于任何特定的值,系统都会“工作”。

	// 启动排队引擎!
	go queue(in, out)

	// 在另一个goroutine中放入一些数据
	go func() {
		i := 0
		for value, priority := range items {
			in <- &Item{
				value:    value,
				priority: priority,
				index:    i,
			}
			i++
		}
		close(in)
	}()

	// 读取结果
	for item := range out {
		fmt.Printf("%.2d:%s ", item.priority, item.value)
	}
	fmt.Println()
}

请注意,如果运行此示例,每次顺序可能会有所不同。这是正常的,它取决于输入和输出通道的速度。

英文:

I'd probably use a couple a queuing goroutine. Starting with the data structures in the PriorityQueue example, I'd build a function like this:

http://play.golang.org/p/hcNFX8ehBW

func queue(in &lt;-chan *Item, out chan&lt;- *Item) {
// Make us a queue!
pq := make(PriorityQueue, 0)
heap.Init(&amp;pq)
var currentItem *Item       // Our item &quot;in hand&quot;
var currentIn = in          // Current input channel (may be nil sometimes)
var currentOut chan&lt;- *Item // Current output channel (starts nil until we have something)
defer close(out)
for {
select {
// Read from the input
case item, ok := &lt;-currentIn:
if !ok {
// The input has been closed. Don&#39;t keep trying to read it
currentIn = nil
// If there&#39;s nothing pending to write, we&#39;re done
if currentItem == nil {
return
}
continue
}
// Were we holding something to write? Put it back.
if currentItem != nil {
heap.Push(&amp;pq, currentItem)
}
// Put our new thing on the queue
heap.Push(&amp;pq, item)
// Turn on the output queue if it&#39;s not turned on
currentOut = out
// Grab our best item. We know there&#39;s at least one. We just put it there.
currentItem = heap.Pop(&amp;pq).(*Item)
// Write to the output
case currentOut &lt;- currentItem:
// OK, we wrote. Is there anything else?
if len(pq) &gt; 0 {
// Hold onto it for next time
currentItem = heap.Pop(&amp;pq).(*Item)
} else {
// Oh well, nothing to write. Is the input stream done?
if currentIn == nil {
// Then we&#39;re done
return
}
// Otherwise, turn off the output stream for now.
currentItem = nil
currentOut = nil
}
}
}
}

Here's an example of using it:

func main() {
// Some items and their priorities.
items := map[string]int{
&quot;banana&quot;: 3, &quot;apple&quot;: 2, &quot;pear&quot;: 4,
}
in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
out := make(chan *Item)    // But the system will &quot;work&quot; for any particular values
// Start the queuing engine!
go queue(in, out)
// Stick some stuff on in another goroutine
go func() {
i := 0
for value, priority := range items {
in &lt;- &amp;Item{
value:    value,
priority: priority,
index:    i,
}
i++
}
close(in)
}()
// Read the results
for item := range out {
fmt.Printf(&quot;%.2d:%s &quot;, item.priority, item.value)
}
fmt.Println()
}

Note that if you run this example, the order will be a little different every time. That's of course expected. It depends on exactly how fast the input and output channels run.

答案2

得分: 2

一种方法是使用sync.Cond

> Cond实现了条件变量,是goroutine等待或宣布事件发生的会合点。

以下是该包中的一个示例(用于消费者)的修改版本:

c.L.Lock()
for heap.Len() == 0 {
c.Wait() // 等待直到被推送的例程发出信号
}
item := heap.Pop(&amp;pq).(*Item)
c.L.Unlock()
// 处理item

生产者可以简单地执行以下操作:

c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()

(将它们封装在函数中并使用defer可能是个好主意。)

这是一个线程安全(天真的)堆的示例,其中pop方法会等待直到有可用的项:

package main
import (
&quot;fmt&quot;
&quot;sort&quot;
&quot;sync&quot;
&quot;time&quot;
&quot;math/rand&quot;
)
type Heap struct {
b []int
c *sync.Cond
}
func NewHeap() *Heap {
return &amp;Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop(等待直到有可用项)
func (h *Heap) Pop() int {
h.c.L.Lock()
defer h.c.L.Unlock()
for len(h.b) == 0 {
h.c.Wait()
}
// 这里肯定有东西
x := h.b[len(h.b)-1]
h.b = h.b[:len(h.b)-1]
return x
}
func (h *Heap) Push(x int) {
defer h.c.Signal() // 唤醒一个弹出者
h.c.L.Lock()
defer h.c.L.Unlock()
// 添加并排序以保持优先级(实际上并不是堆的工作方式)
h.b = append(h.b, x)
sort.Ints(h.b)
}
func main() {
heap := NewHeap()
go func() {
for range time.Tick(time.Second) {
for n := 0; n &lt; 3; n++ {
x := rand.Intn(100)
fmt.Println(&quot;push:&quot;, x)
heap.Push(x)
}
}
}()
for {
item := heap.Pop()
fmt.Println(&quot;pop: &quot;, item)
}
}

(请注意,由于for range time.Tick循环的原因,此示例在playground中无法运行。请在本地运行。)

英文:

One way would be to use sync.Cond:

> Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.

An example from the package could be amended as follows (for the consumer):

c.L.Lock()
for heap.Len() == 0 {
c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&amp;pq).(*Item)
c.L.Unlock()
// Do stuff with the item

And producer could simply do:

c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()

(Wrapping these in functions and using defers might be a good idea.)

Here is an example of thread-safe (naive) heap which pop method waits until item is available:

package main
import (
&quot;fmt&quot;
&quot;sort&quot;
&quot;sync&quot;
&quot;time&quot;
&quot;math/rand&quot;
)
type Heap struct {
b []int
c *sync.Cond
}
func NewHeap() *Heap {
return &amp;Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop (waits until anything available)
func (h *Heap) Pop() int {
h.c.L.Lock()
defer h.c.L.Unlock()
for len(h.b) == 0 {
h.c.Wait()
}
// There is definitely something in there
x := h.b[len(h.b)-1]
h.b = h.b[:len(h.b)-1]
return x
}
func (h *Heap) Push(x int) {
defer h.c.Signal() // will wake up a popper
h.c.L.Lock()
defer h.c.L.Unlock()
// Add and sort to maintain priority (not really how the heap works)
h.b = append(h.b, x)
sort.Ints(h.b)
}
func main() {
heap := NewHeap()
go func() {
for range time.Tick(time.Second) {
for n := 0; n &lt; 3; n++ {
x := rand.Intn(100)
fmt.Println(&quot;push:&quot;, x)
heap.Push(x)
}
}
}()
for {
item := heap.Pop()
fmt.Println(&quot;pop: &quot;, item)
}
}

(Note this is not working in playground because of the for range time.Tick loop. Run it locally.)

huangapple
  • 本文由 发表于 2015年6月26日 04:19:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/31060023.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定