英文:
Reading all the available messages from Google Pub Sub using Go
问题
我正在尝试从Google Pub-Sub的主题中获取所有可用的消息。但是在Go语言中,我找不到一个配置可以在Pub-Sub中没有剩余消息时取消接收回调。
我认为一种方法是使用Google Cloud Monitoring API中描述的方法获取Pub-Sub中的消息总数,然后保持已读消息数量的计数,并在计数等于总数时调用取消函数。但是我不确定这是否是正确的方法。
我还尝试使用带有超时的上下文,即在上下文的截止时间之前获取消息,然后取消。
但是这样做不能确保所有消息都已处理。
请提供一个解决方案,确保当Pub-Sub中没有剩余消息时,subscription.Receive函数停止。
英文:
I am trying to get all the available messages from a topic in google pub-sub.
But in go I am not able to find a configuration which can cancel the receive callback once there are no more messages remaining in the Pub-Sub.
One approach I think is to get the total number of messages from Pub-Sub using the Google Cloud Monitoring Api's discribed in this answer <https://stackoverflow.com/questions/35475082/google-pubsub-counting-messages-in-topic> and then keeping a count of the number of messages read and calling cancel if the count is equal to the number, but I am not so sure if this is the right approach to move ahead.
var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
received++
if received == TotalNumberOfMessages {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
I have tried using the context with timeout as well, i.e. fetch until this context deadline is not met, after that cancel.
ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}
But then again that won't give me certainty that all the messages have been processed.
Please suggest a solution that can make sure that subscription.Receive function stops when there are no more messages remaining in the Pub-Sub.
答案1
得分: 2
我已经在我之前的公司实施过这个(可惜我不再拥有代码,它在我之前的公司的git中...)。然而,它是有效的。
原理如下:
msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
msg <- m
})
for {
select {
case res := <-msg:
fmt.Fprintf(w, "收到消息:%q\n", string(res.Data))
res.Ack()
case <-time.After(3 * time.Second):
fmt.Println("超时")
cancel()
}
}
英文:
I already implemented that in my previous company (sadly I no longer have the code, it is in my previous company git...). However it worked.
The principle was the following
msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
msg <- m
})
for {
select {
case res := <-msg:
fmt.Fprintf(w, "Got message: %q\n", string(res.Data))
res.Ack()
case <-time.After(3 * time.Second):
fmt.Println("timeout")
cancel()
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论