英文:
Go, How do I pull X messages from a channel at a time
问题
我有一个带有传入消息的通道,并且有一个Go协程在等待它。
我处理这些消息并将它们发送到另一个服务器。
如果准备好了,我想一次处理100条消息,
或者在5秒钟后处理其中的所有消息,然后再次等待。
在Go语言中,你可以这样做:
package main
import (
"fmt"
"time"
)
func main() {
messageChannel := make(chan string)
go processMessages(messageChannel)
// 模拟发送消息到通道
for i := 1; i <= 1000; i++ {
messageChannel <- fmt.Sprintf("Message %d", i)
}
// 关闭通道,以便告知协程没有更多的消息
close(messageChannel)
// 等待协程完成处理
time.Sleep(time.Second)
fmt.Println("All messages processed")
}
func processMessages(messageChannel chan string) {
var messages []string
timer := time.NewTimer(5 * time.Second)
for {
select {
case message, ok := <-messageChannel:
if !ok {
// 通道已关闭,处理剩余的消息并退出
processBatch(messages)
return
}
messages = append(messages, message)
if len(messages) == 100 {
// 达到100条消息,处理批量消息
processBatch(messages)
messages = nil
}
case <-timer.C:
// 定时器触发,处理当前的消息批次
processBatch(messages)
messages = nil
timer.Reset(5 * time.Second)
}
}
}
func processBatch(messages []string) {
fmt.Println("Processing batch of", len(messages), "messages")
// 在这里处理消息,发送到另一个服务器
}
这个例子中,我们创建了一个带有字符串类型的通道messageChannel
,并在processMessages
函数中处理消息。
在main
函数中,我们模拟发送1000条消息到通道,并在发送完毕后关闭通道。
processMessages
函数中的select
语句用于监听通道的消息和定时器的触发事件。
如果收到消息,我们将消息添加到messages
切片中,并在达到100条消息时处理批量消息。
如果通道已关闭,我们处理剩余的消息并退出。
如果定时器触发,我们也会处理当前的消息批次。
请根据你的实际需求修改processBatch
函数,将消息发送到另一个服务器。
希望对你有帮助!
英文:
I have a channel with incoming messages and a go routine that waits on it
I process these messages and send them to a different server
I would like to either process 100 messages at a time if they are ready,
or after say 5 seconds process what ever is in there and go wait again
How do I do that in Go
答案1
得分: 14
你使用的从消息通道读取的例程应该定义一个缓存,用于存储传入的消息。这些缓存的消息会在缓存达到100条消息或经过5秒后,批量发送到远程服务器。你可以使用定时器通道和Go的select
语句来确定哪个事件先发生。
以下示例可以在Go playground上运行:
package main
import (
"fmt"
"math/rand"
"time"
)
type Message int
const (
CacheLimit = 100
CacheTimeout = 5 * time.Second
)
func main() {
input := make(chan Message, CacheLimit)
go poll(input)
generate(input)
}
// poll检查传入的消息并在内部进行缓存,直到达到最大数量或超时。
func poll(input <-chan Message) {
cache := make([]Message, 0, CacheLimit)
tick := time.NewTicker(CacheTimeout)
for {
select {
// 检查是否有新消息。
// 如果有,存储它并检查缓存是否超过大小限制。
case m := <-input:
cache = append(cache, m)
if len(cache) < CacheLimit {
break
}
// 重置超时计时器。
// 否则会发送太多消息。
tick.Stop()
// 发送缓存的消息并重置缓存。
send(cache)
cache = cache[:0]
// 重新创建计时器,以保持超时触发的一致性。
tick = time.NewTicker(CacheTimeout)
// 如果达到超时时间,无论缓存大小如何,都发送当前消息缓存。
case <-tick.C:
send(cache)
cache = cache[:0]
}
}
}
// send将缓存的消息发送到远程服务器。
func send(cache []Message) {
if len(cache) == 0 {
return // 这里没有要处理的内容。
}
fmt.Printf("%d 条消息待处理\n", len(cache))
}
// generate创建一些随机消息并将它们推送到给定的通道中。
//
// 不是解决方案的一部分。这只是模拟你用于创建消息的方式,通过在随机时间间隔创建新消息。
func generate(input chan<- Message) {
for {
select {
case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
input <- Message(rand.Int())
}
}
}
英文:
The routine you use to read from the message channel should define a cache in which incoming messages are stored. These cached messages are then sent to the remote server in bulk either when the cache reaches 100 messages, or 5 seconds have passed. You use a timer channel and Go's select
statement to determine which one occurs first.
The following example can be run on the Go playground
package main
import (
"fmt"
"math/rand"
"time"
)
type Message int
const (
CacheLimit = 100
CacheTimeout = 5 * time.Second
)
func main() {
input := make(chan Message, CacheLimit)
go poll(input)
generate(input)
}
// poll checks for incoming messages and caches them internally
// until either a maximum amount is reached, or a timeout occurs.
func poll(input <-chan Message) {
cache := make([]Message, 0, CacheLimit)
tick := time.NewTicker(CacheTimeout)
for {
select {
// Check if a new messages is available.
// If so, store it and check if the cache
// has exceeded its size limit.
case m := <-input:
cache = append(cache, m)
if len(cache) < CacheLimit {
break
}
// Reset the timeout ticker.
// Otherwise we will get too many sends.
tick.Stop()
// Send the cached messages and reset the cache.
send(cache)
cache = cache[:0]
// Recreate the ticker, so the timeout trigger
// remains consistent.
tick = time.NewTicker(CacheTimeout)
// If the timeout is reached, send the
// current message cache, regardless of
// its size.
case <-tick.C:
send(cache)
cache = cache[:0]
}
}
}
// send sends cached messages to a remote server.
func send(cache []Message) {
if len(cache) == 0 {
return // Nothing to do here.
}
fmt.Printf("%d message(s) pending\n", len(cache))
}
// generate creates some random messages and pushes them into the given channel.
//
// Not part of the solution. This just simulates whatever you use to create
// the messages by creating a new message at random time intervals.
func generate(input chan<- Message) {
for {
select {
case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
input <- Message(rand.Int())
}
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论