英文:
Go - wait for next item in a priority queue if empty
问题
我正在尝试实现一个基于优先级的优先队列,用于通过网络套接字发送 JSON 对象。我正在使用 container/heap
包来实现队列。我想到了以下的代码:
for {
if pq.Len() > 0 {
item := heap.Pop(&pq).(*Item)
jsonEncoder.Encode(&item)
} else {
time.Sleep(10 * time.Millisecond)
}
}
除了轮询优先队列以等待新项目之外,是否有更好的方法?
英文:
I am trying to implement a priority queue to send json objects through a network socket based on priority. I am using the container/heap
package to implement the queue. I came up with something like this:
for {
if pq.Len() > 0 {
item := heap.Pop(&pq).(*Item)
jsonEncoder.Encode(&item)
} else {
time.Sleep(10 * time.Millisecond)
}
}
Are there better ways to wait for a new item than just polling the priority queue?
答案1
得分: 3
我可能会使用一对排队的goroutine。从PriortyQueue示例中开始,我会构建一个像这样的函数:
func queue(in <-chan *Item, out chan<- *Item) {
// 创建一个队列
pq := make(PriorityQueue, 0)
heap.Init(&pq)
var currentItem *Item // 当前处理的项
var currentIn = in // 当前输入通道(有时可能为nil)
var currentOut chan<- *Item // 当前输出通道(一开始为nil,直到有数据)
defer close(out)
for {
select {
// 从输入通道读取数据
case item, ok := <-currentIn:
if !ok {
// 输入通道已关闭,停止尝试读取
currentIn = nil
// 如果没有待写入的数据,结束处理
if currentItem == nil {
return
}
continue
}
// 如果有待写入的数据,将其放回队列
if currentItem != nil {
heap.Push(&pq, currentItem)
}
// 将新的数据放入队列
heap.Push(&pq, item)
// 如果输出通道未开启,开启它
currentOut = out
// 获取最优的项,至少有一项,因为我们刚刚放入了一个
currentItem = heap.Pop(&pq).(*Item)
// 写入输出通道
case currentOut <- currentItem:
// 写入完成,还有其他数据吗?
if len(pq) > 0 {
// 保存下一次处理的项
currentItem = heap.Pop(&pq).(*Item)
} else {
// 没有数据可写入,输入通道是否已关闭?
if currentIn == nil {
// 输入通道已关闭,结束处理
return
}
// 否则,暂时关闭输出通道
currentItem = nil
currentOut = nil
}
}
}
}
以下是使用示例:
func main() {
// 一些项及其优先级
items := map[string]int{
"banana": 3, "apple": 2, "pear": 4,
}
in := make(chan *Item, 10) // 大输入缓冲区和无缓冲输出通道可获得最佳排序顺序。
out := make(chan *Item) // 但是,对于任何特定的值,系统都会“工作”。
// 启动排队引擎!
go queue(in, out)
// 在另一个goroutine中放入一些数据
go func() {
i := 0
for value, priority := range items {
in <- &Item{
value: value,
priority: priority,
index: i,
}
i++
}
close(in)
}()
// 读取结果
for item := range out {
fmt.Printf("%.2d:%s ", item.priority, item.value)
}
fmt.Println()
}
请注意,如果运行此示例,每次顺序可能会有所不同。这是正常的,它取决于输入和输出通道的速度。
英文:
I'd probably use a couple a queuing goroutine. Starting with the data structures in the PriorityQueue example, I'd build a function like this:
http://play.golang.org/p/hcNFX8ehBW
func queue(in <-chan *Item, out chan<- *Item) {
// Make us a queue!
pq := make(PriorityQueue, 0)
heap.Init(&pq)
var currentItem *Item // Our item "in hand"
var currentIn = in // Current input channel (may be nil sometimes)
var currentOut chan<- *Item // Current output channel (starts nil until we have something)
defer close(out)
for {
select {
// Read from the input
case item, ok := <-currentIn:
if !ok {
// The input has been closed. Don't keep trying to read it
currentIn = nil
// If there's nothing pending to write, we're done
if currentItem == nil {
return
}
continue
}
// Were we holding something to write? Put it back.
if currentItem != nil {
heap.Push(&pq, currentItem)
}
// Put our new thing on the queue
heap.Push(&pq, item)
// Turn on the output queue if it's not turned on
currentOut = out
// Grab our best item. We know there's at least one. We just put it there.
currentItem = heap.Pop(&pq).(*Item)
// Write to the output
case currentOut <- currentItem:
// OK, we wrote. Is there anything else?
if len(pq) > 0 {
// Hold onto it for next time
currentItem = heap.Pop(&pq).(*Item)
} else {
// Oh well, nothing to write. Is the input stream done?
if currentIn == nil {
// Then we're done
return
}
// Otherwise, turn off the output stream for now.
currentItem = nil
currentOut = nil
}
}
}
}
Here's an example of using it:
func main() {
// Some items and their priorities.
items := map[string]int{
"banana": 3, "apple": 2, "pear": 4,
}
in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
out := make(chan *Item) // But the system will "work" for any particular values
// Start the queuing engine!
go queue(in, out)
// Stick some stuff on in another goroutine
go func() {
i := 0
for value, priority := range items {
in <- &Item{
value: value,
priority: priority,
index: i,
}
i++
}
close(in)
}()
// Read the results
for item := range out {
fmt.Printf("%.2d:%s ", item.priority, item.value)
}
fmt.Println()
}
Note that if you run this example, the order will be a little different every time. That's of course expected. It depends on exactly how fast the input and output channels run.
答案2
得分: 2
一种方法是使用sync.Cond
:
> Cond实现了条件变量,是goroutine等待或宣布事件发生的会合点。
以下是该包中的一个示例(用于消费者)的修改版本:
c.L.Lock()
for heap.Len() == 0 {
c.Wait() // 等待直到被推送的例程发出信号
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// 处理item
生产者可以简单地执行以下操作:
c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()
(将它们封装在函数中并使用defer可能是个好主意。)
这是一个线程安全(天真的)堆的示例,其中pop方法会等待直到有可用的项:
package main
import (
"fmt"
"sort"
"sync"
"time"
"math/rand"
)
type Heap struct {
b []int
c *sync.Cond
}
func NewHeap() *Heap {
return &Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop(等待直到有可用项)
func (h *Heap) Pop() int {
h.c.L.Lock()
defer h.c.L.Unlock()
for len(h.b) == 0 {
h.c.Wait()
}
// 这里肯定有东西
x := h.b[len(h.b)-1]
h.b = h.b[:len(h.b)-1]
return x
}
func (h *Heap) Push(x int) {
defer h.c.Signal() // 唤醒一个弹出者
h.c.L.Lock()
defer h.c.L.Unlock()
// 添加并排序以保持优先级(实际上并不是堆的工作方式)
h.b = append(h.b, x)
sort.Ints(h.b)
}
func main() {
heap := NewHeap()
go func() {
for range time.Tick(time.Second) {
for n := 0; n < 3; n++ {
x := rand.Intn(100)
fmt.Println("push:", x)
heap.Push(x)
}
}
}()
for {
item := heap.Pop()
fmt.Println("pop: ", item)
}
}
(请注意,由于for range time.Tick
循环的原因,此示例在playground中无法运行。请在本地运行。)
英文:
One way would be to use sync.Cond
:
> Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.
An example from the package could be amended as follows (for the consumer):
c.L.Lock()
for heap.Len() == 0 {
c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item
And producer could simply do:
c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()
(Wrapping these in functions and using defers might be a good idea.)
Here is an example of thread-safe (naive) heap which pop method waits until item is available:
package main
import (
"fmt"
"sort"
"sync"
"time"
"math/rand"
)
type Heap struct {
b []int
c *sync.Cond
}
func NewHeap() *Heap {
return &Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop (waits until anything available)
func (h *Heap) Pop() int {
h.c.L.Lock()
defer h.c.L.Unlock()
for len(h.b) == 0 {
h.c.Wait()
}
// There is definitely something in there
x := h.b[len(h.b)-1]
h.b = h.b[:len(h.b)-1]
return x
}
func (h *Heap) Push(x int) {
defer h.c.Signal() // will wake up a popper
h.c.L.Lock()
defer h.c.L.Unlock()
// Add and sort to maintain priority (not really how the heap works)
h.b = append(h.b, x)
sort.Ints(h.b)
}
func main() {
heap := NewHeap()
go func() {
for range time.Tick(time.Second) {
for n := 0; n < 3; n++ {
x := rand.Intn(100)
fmt.Println("push:", x)
heap.Push(x)
}
}
}()
for {
item := heap.Pop()
fmt.Println("pop: ", item)
}
}
(Note this is not working in playground because of the for range time.Tick
loop. Run it locally.)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论