如何确保每条消息都能成功处理?

huangapple go评论126阅读模式
英文:

How to make every message process successfully?

问题

以下是一个具有3个Go协程的服务,用于处理来自Kafka的消息:

如何确保每条消息都能成功处理?


Channel-1和Channel-2是Go中的无缓冲数据通道。通道类似于排队机制。

Goroutine-1从kafka主题中读取消息,在验证消息后,将其消息负载放入Channel-1中。

Goroutine-2从Channel-1中读取并处理负载,然后将处理后的负载放入Channel-2中。

Goroutine-3从Channel-2中读取,并将处理后的负载封装到HTTP数据包中,并通过HTTP请求(使用HTTP客户端)发送到另一个服务。

上述流程中的漏洞:在我们的情况下,处理失败是由于服务之间的网络连接不良或远程服务无法接受来自Go协程3的HTTP请求(HTTP客户端超时)导致的,因此,上述服务会丢失该消息(已从Kafka主题中读取)。


Goroutine-1当前从Kafka订阅消息而不向Kafka发送确认(通知Goroutine-3成功处理了特定消息)

正确性优先于性能。


如何确保每条消息都被成功处理?

英文:

Below is a service with set of 3 Go-routines that process a message from Kafka:

如何确保每条消息都能成功处理?


Channel-1 & Channel-2 are unbuffered data channels in Go. Channel is like a queuing mechanism.

Goroutine-1 reads a message from a kafka topic, throw its message payload on Channel-1, after validation of the message.

Goroutine-2 reads from Channel-1 and processes the payload and throws the processed payload on Channel-2.

Goroutine-3 reads from Channel-2 and encapsulates the processed payload into http packet and perform http requests(using http client) to another service.

Loophole in the above flow: In our case, processing fails either due to bad network connections between services or remote service is not ready to accept http requests from Go-routine3(http client timeout), due to which, above service lose that message(already read from Kafka topic).


Goroutine-1 currently subscribes the message from Kafka without an acknowledgement sent to Kafka(to inform that specific message is processed successfully by Goroutine-3)

Correctness is preferred over performance.


How to ensure that every message is processed successfully?

答案1

得分: 1

为确保正确性,您需要在成功处理完成后提交(确认)消息。
对于处理未成功完成的情况 - 通常情况下,您需要自己实现重试机制。
这应该是针对您的用例的特定实现,但通常情况下,您可以将消息发送回专用的Kafka重试主题(您创建的),添加一个延时,然后再次处理该消息。如果处理失败次数达到一定次数(x次),则将消息发送到DLQ(死信队列)。
您可以在以下链接中了解更多信息:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/

英文:

To ensuring correctness you need to commit (=acknowledge) the message after processing finished successfully.
For the cases when the processing wasn't finished successfully - in general, you need to implement retry mechanism by yourself.
That should be specific to your use-case, but generally you throw the message back to a dedicated Kafka retry topic (that you create), add a sleep and process the message again. if after x times the processing fails - you throw the message to a DLQ (=dead letter queue).
You can read more here:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/

答案2

得分: 1

例如,通过新的Channel-3,从Goroutine-3向Goroutine-1添加一个反馈。Goroutine-1将会阻塞,直到从Channel-3收到确认。

  1. // 在Goroutine-1中
  2. channel1 <- data
  3. select {
  4. case <-channel3:
  5. case <-ctx.Done(): // 或者其他防止死锁的操作
  6. }
  7. ...
  8. // 在Goroutine-3中
  9. data := <-channel2
  10. for {
  11. if err := sendData(data); err == nil {
  12. break
  13. }
  14. }
  15. channel3 <- struct{}{}
英文:

E.g., add a feedback from Goroutine-3 to Goroutine-1 through new Channel-3. Goroutine-1 will block until it get acknowledgement from Channel-3.

  1. // in gorouting 1
  2. channel1 &lt;- data
  3. select {
  4. case &lt;-channel3:
  5. case &lt;-ctx.Done(): // or smth else to prevent deadlock
  6. }
  7. ...
  8. // in gorouting 3
  9. data := &lt;-channel2
  10. for {
  11. if err := sendData(data); err == nil {
  12. break
  13. }
  14. }
  15. channel3&lt;-struct{}{}

huangapple
  • 本文由 发表于 2021年9月9日 18:33:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/69116648.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定