如何在Golang管道阶段中使用通道批处理项目?

huangapple go评论80阅读模式
英文:

How to batch items in golang pipeline stages using channels?

问题

我正在阅读在线的管道教程,并尝试构建一个类似这样的阶段:

  1. 将传入的事件分批,每批包含10个事件,然后将它们发送到输出通道。
  2. 如果在5秒内没有收到10个事件,将收到的尽可能多的事件组合起来发送,并关闭输出通道并返回。

然而,我不知道第一个选择语句应该是什么样子的。尝试了多种方法,但无法解决这个问题。非常感谢任何指点!

func BatchEvents(inChan <-chan *Event) <-chan *Event {
    batchSize := 10
    comboEvent := Event{}
    go func() {
        defer close(out)
        i := 0
        for event := range inChan {
            select {
            case <-time.After(5 * time.Second):
                if i < batchSize {
                    out <- &comboEvent
                }
                return
            default:
                if i < batchSize {
                    comboEvent.data = append(comboEvent.data, event.data)
                    i++
                } else {
                    out <- &comboEvent
                    comboEvent = Event{}
                    i = 0
                }
            }
        }
    }()
    return out
}

希望对你有帮助!

英文:

I'm reading the pipelines tutorial online and trying to construct a stage that operates like this --

  1. Batches up incoming events in batches of 10 each before sending them to the out chan
  2. If we haven't seen 10 events in 5 seconds, combine as many as we received and send them, closing the out chan and returning.

However, I have no idea what would the first select case would look like.Tried multiple things but couldn't get past this.
Any pointers much appreciated!

func BatchEvents(inChan &lt;- chan *Event) &lt;- chan *Event {
	batchSize := 10
	comboEvent := Event{}
	go func() {
		defer close(out)
		i = 0
		for event := range inChan {
			select {
			case -WHAT GOES HERE?-:
				if i &lt; batchSize {
					comboEvent.data = append(comboEvent.data, event.data)
					i++;
				} else {
					out &lt;- &amp;comboEvent
					// reset for next batch
					comboEvent = Event{}
					i=0;
				}
			case &lt;-time.After(5 * time.Second):
				// process whatever we have seen so far if the batch size isn&#39;t filled in 5 secs
				out &lt;- &amp;comboEvent
				// stop after
				return
			}
		}
	}()
	return out
}

答案1

得分: 3

而不是对通道进行范围遍历,你的第一个选择语句应该是从该通道中选择,将整个内容放在一个无限循环内。

func BatchEvents(inChan <-chan *Event) <-chan *Event {
    batchSize := 10
    comboEvent := Event{}
    go func() {
        defer close(out)
        i := 0
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    return
                }
                comboEvent.data = append(comboEvent.data, event.data)
                i++
                if i == batchSize {
                    out <- &comboEvent
                    // reset for next batch
                    comboEvent = Event{}
                    i = 0
                }
            case <-time.After(5 * time.Second):
                // 如果在5秒内批处理大小未填满,则处理到目前为止的内容
                if i > 0 {
                    out <- &comboEvent
                }
                // 停止循环
                return
            }
        }
    }()
    return out
}
英文:

Instead of doing a range over the channel, your first select case should be from that channel, with the whole thing inside an infinite loop.

func BatchEvents(inChan &lt;-chan *Event) &lt;-chan *Event {
	batchSize := 10
	comboEvent := Event{}
	go func() {
		defer close(out)
		i = 0
		for {
			select {
			case event, ok := &lt;-inChan:
				if !ok {
					return
				}
				comboEvent.data = append(comboEvent.data, event.data)
				i++
				if i == batchSize {
					out &lt;- &amp;comboEvent
					// reset for next batch
					comboEvent = Event{}
					i = 0
				}
			case &lt;-time.After(5 * time.Second):
				// process whatever we have seen so far if the batch size isn&#39;t filled in 5 secs
				if i &gt; 0 {
					out &lt;- &amp;comboEvent
				}
				// stop after
				return
			}
		}
	}()
	return out
}

huangapple
  • 本文由 发表于 2017年8月25日 07:41:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/45872441.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定