重试在 Azure 服务总线的 Go 客户端库中处理吗?

huangapple go评论76阅读模式
英文:

Is retry handled from go client library for azure service bus?

问题

我正在参考 https://github.com/Azure/azure-service-bus-go 中的客户端库。我已经能够编写一个简单的客户端,用于监听订阅并读取消息。如果我断开网络连接,几秒钟后,我会看到接收循环退出,并显示错误消息 "context canceled"。我希望客户端库能够执行某种重试机制,并处理任何连接问题或服务器超时等。我们需要自己处理这个问题吗?此外,我们是否可以获得任何异常,以便用于重试机制?如果有示例代码,将不胜感激。

以下是我尝试的示例代码(仅包含关键部分):

err = subscription.Receive(ctx, servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
    fmt.Println(string(message.Data))
    return message.Complete(ctx)
}))

if err != nil {
    fmt.Println("FATAL: ", err)
    return
}
英文:

I'm referencing to the client library in https://github.com/Azure/azure-service-bus-go. I was able to write a simple client which listens to a subscription and read the messages. If I drop the network, after a few seconds I could see the receive loop exiting with the error message "context canceled". I was hoping the client library will do some kind of retry mechanism and handle any connection issues or any server timeouts etc. Do we need to handle this our selves ? Also do we get any exceptions which we could identify and use it for this retrying mechanism ? Any sample code would be highly appreciated.

below is the sample code I tried (only including the vital parts).

err = subscription.Receive(ctx, servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
		fmt.Println(string(message.Data))
		return message.Complete(ctx)
	}))

if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

答案1

得分: 1

很抱歉,旧的库在重试方面做得不好,所以你需要在自己的重试循环中包装代码。

func demoReceiveWithRecovery(ns *servicebus.Namespace) {
	parentCtx := context.TODO()

	for {
		q, err := ns.NewQueue(queueName)

		if err != nil {
			panic(err)
		}

		defer q.Close(parentCtx)

		handler := servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
			log.Printf("Received message")
			return m.Complete(c)
		})

		err = q.Receive(parentCtx, handler)

		// 检查可能的可恢复情况
		if err != nil && errors.Is(err, context.Canceled) {
			// 这个解决方法区分了处理程序取消是因为库断开连接还是父上下文(由您传入)被取消。
			if parentCtx.Err() != nil {
				// 我们的父上下文被取消,导致整个操作被取消
				log.Printf("Cancelled by parent context")
				return
			} else {
				// 由于错误而内部取消,我们可以重新启动队列
				log.Printf("Error occurred, restarting")
				_ = q.Close(parentCtx)
				continue
			}
		} else {
			log.Printf("Other error, closing client and restarting: %s", err.Error())
			_ = q.Close(parentCtx)
		}
	}
}

注意:这个库最近已经被弃用,但是在你发布问题之后。如果你想尝试新的包,可以在这里找到 azservicebus,这里有一个迁移指南 migrationguide.md 可以帮助你简化操作。

新的包在接收方面应该能够正确恢复。我正在处理发送方的一个开放性错误 #16695

如果你有进一步的问题或者有一些功能请求,你可以在 https://github.com/Azure/azure-sdk-for-go 的 github 问题中提交。

英文:

Unfortunately, the older library doesn't do a good job of retrying, so you'll need to wrap the code in your own retry loop.

func demoReceiveWithRecovery(ns *servicebus.Namespace) {
		parentCtx := context.TODO()

	for {
		q, err := ns.NewQueue(queueName)

		if err != nil {
			panic(err)
		}

		defer q.Close(parentCtx)

		handler := servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
			log.Printf("Received message")
			return m.Complete(c)
		})

		err = q.Receive(parentCtx, handler)

		// check for potential recoverable situation
		if err != nil && errors.Is(err, context.Canceled) {
			// This workaround distinguishes between the handler cancelling because the
			// library has disconnected vs the parent context (passed in by you) being cancelled.
			if parentCtx.Err() != nil {
				// our parent context cancelled, which caused the entire operation to be canceled
				log.Printf("Cancelled by parent context")
				return
			} else {
				// cancelled internally due to an error, we can restart the queue
				log.Printf("Error occurred, restarting")
				_ = q.Close(parentCtx)
				continue
			}
		} else {
			log.Printf("Other error, closing client and restarting: %s", err.Error())
			_ = q.Close(parentCtx)
		}
	}
}

NOTE: This library was deprecated recently, but it was deprecated after you posted your question. The new package is here if you want to try it - azservicebus with a migration guide here to make things easier: migrationguide.md

The new package should recover properly in this scenario for receiving. There is an open bug in the new package I'm working on for the sending side to do recovery #16695.

If you want to ask further questions or have some feature requests you can submit those in the github issues for https://github.com/Azure/azure-sdk-for-go.

huangapple
  • 本文由 发表于 2021年7月16日 18:21:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/68407394.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定