英文:
Is retry handled from go client library for azure service bus?
问题
我正在参考 https://github.com/Azure/azure-service-bus-go 中的客户端库。我已经能够编写一个简单的客户端,用于监听订阅并读取消息。如果我断开网络连接,几秒钟后,我会看到接收循环退出,并显示错误消息 "context canceled"。我希望客户端库能够执行某种重试机制,并处理任何连接问题或服务器超时等。我们需要自己处理这个问题吗?此外,我们是否可以获得任何异常,以便用于重试机制?如果有示例代码,将不胜感激。
以下是我尝试的示例代码(仅包含关键部分):
err = subscription.Receive(ctx, servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(string(message.Data))
return message.Complete(ctx)
}))
if err != nil {
fmt.Println("FATAL: ", err)
return
}
英文:
I'm referencing to the client library in https://github.com/Azure/azure-service-bus-go. I was able to write a simple client which listens to a subscription and read the messages. If I drop the network, after a few seconds I could see the receive loop exiting with the error message "context canceled". I was hoping the client library will do some kind of retry mechanism and handle any connection issues or any server timeouts etc. Do we need to handle this our selves ? Also do we get any exceptions which we could identify and use it for this retrying mechanism ? Any sample code would be highly appreciated.
below is the sample code I tried (only including the vital parts).
err = subscription.Receive(ctx, servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(string(message.Data))
return message.Complete(ctx)
}))
if err != nil {
fmt.Println("FATAL: ", err)
return
}
答案1
得分: 1
很抱歉,旧的库在重试方面做得不好,所以你需要在自己的重试循环中包装代码。
func demoReceiveWithRecovery(ns *servicebus.Namespace) {
parentCtx := context.TODO()
for {
q, err := ns.NewQueue(queueName)
if err != nil {
panic(err)
}
defer q.Close(parentCtx)
handler := servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
log.Printf("Received message")
return m.Complete(c)
})
err = q.Receive(parentCtx, handler)
// 检查可能的可恢复情况
if err != nil && errors.Is(err, context.Canceled) {
// 这个解决方法区分了处理程序取消是因为库断开连接还是父上下文(由您传入)被取消。
if parentCtx.Err() != nil {
// 我们的父上下文被取消,导致整个操作被取消
log.Printf("Cancelled by parent context")
return
} else {
// 由于错误而内部取消,我们可以重新启动队列
log.Printf("Error occurred, restarting")
_ = q.Close(parentCtx)
continue
}
} else {
log.Printf("Other error, closing client and restarting: %s", err.Error())
_ = q.Close(parentCtx)
}
}
}
注意:这个库最近已经被弃用,但是在你发布问题之后。如果你想尝试新的包,可以在这里找到 azservicebus,这里有一个迁移指南 migrationguide.md 可以帮助你简化操作。
新的包在接收方面应该能够正确恢复。我正在处理发送方的一个开放性错误 #16695。
如果你有进一步的问题或者有一些功能请求,你可以在 https://github.com/Azure/azure-sdk-for-go 的 github 问题中提交。
英文:
Unfortunately, the older library doesn't do a good job of retrying, so you'll need to wrap the code in your own retry loop.
func demoReceiveWithRecovery(ns *servicebus.Namespace) {
parentCtx := context.TODO()
for {
q, err := ns.NewQueue(queueName)
if err != nil {
panic(err)
}
defer q.Close(parentCtx)
handler := servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
log.Printf("Received message")
return m.Complete(c)
})
err = q.Receive(parentCtx, handler)
// check for potential recoverable situation
if err != nil && errors.Is(err, context.Canceled) {
// This workaround distinguishes between the handler cancelling because the
// library has disconnected vs the parent context (passed in by you) being cancelled.
if parentCtx.Err() != nil {
// our parent context cancelled, which caused the entire operation to be canceled
log.Printf("Cancelled by parent context")
return
} else {
// cancelled internally due to an error, we can restart the queue
log.Printf("Error occurred, restarting")
_ = q.Close(parentCtx)
continue
}
} else {
log.Printf("Other error, closing client and restarting: %s", err.Error())
_ = q.Close(parentCtx)
}
}
}
NOTE: This library was deprecated recently, but it was deprecated after you posted your question. The new package is here if you want to try it - azservicebus with a migration guide here to make things easier: migrationguide.md
The new package should recover properly in this scenario for receiving. There is an open bug in the new package I'm working on for the sending side to do recovery #16695.
If you want to ask further questions or have some feature requests you can submit those in the github issues for https://github.com/Azure/azure-sdk-for-go.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论