英文:
How to batch items in golang pipeline stages using channels?
问题
我正在阅读在线的管道教程,并尝试构建一个类似这样的阶段:
- 将传入的事件分批,每批包含10个事件,然后将它们发送到输出通道。
- 如果在5秒内没有收到10个事件,将收到的尽可能多的事件组合起来发送,并关闭输出通道并返回。
然而,我不知道第一个选择语句应该是什么样子的。尝试了多种方法,但无法解决这个问题。非常感谢任何指点!
func BatchEvents(inChan <-chan *Event) <-chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i := 0
for event := range inChan {
select {
case <-time.After(5 * time.Second):
if i < batchSize {
out <- &comboEvent
}
return
default:
if i < batchSize {
comboEvent.data = append(comboEvent.data, event.data)
i++
} else {
out <- &comboEvent
comboEvent = Event{}
i = 0
}
}
}
}()
return out
}
希望对你有帮助!
英文:
I'm reading the pipelines tutorial online and trying to construct a stage that operates like this --
- Batches up incoming events in batches of 10 each before sending them to the out chan
- If we haven't seen 10 events in 5 seconds, combine as many as we received and send them, closing the out chan and returning.
However, I have no idea what would the first select case would look like.Tried multiple things but couldn't get past this.
Any pointers much appreciated!
func BatchEvents(inChan <- chan *Event) <- chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for event := range inChan {
select {
case -WHAT GOES HERE?-:
if i < batchSize {
comboEvent.data = append(comboEvent.data, event.data)
i++;
} else {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i=0;
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
out <- &comboEvent
// stop after
return
}
}
}()
return out
}
答案1
得分: 3
而不是对通道进行范围遍历,你的第一个选择语句应该是从该通道中选择,将整个内容放在一个无限循环内。
func BatchEvents(inChan <-chan *Event) <-chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i := 0
for {
select {
case event, ok := <-inChan:
if !ok {
return
}
comboEvent.data = append(comboEvent.data, event.data)
i++
if i == batchSize {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i = 0
}
case <-time.After(5 * time.Second):
// 如果在5秒内批处理大小未填满,则处理到目前为止的内容
if i > 0 {
out <- &comboEvent
}
// 停止循环
return
}
}
}()
return out
}
英文:
Instead of doing a range over the channel, your first select case should be from that channel, with the whole thing inside an infinite loop.
func BatchEvents(inChan <-chan *Event) <-chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for {
select {
case event, ok := <-inChan:
if !ok {
return
}
comboEvent.data = append(comboEvent.data, event.data)
i++
if i == batchSize {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i = 0
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
if i > 0 {
out <- &comboEvent
}
// stop after
return
}
}
}()
return out
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论