How can I make sure every message is processed successfully?
Question
Below is a service with a set of three goroutines that process messages from Kafka:
Channel-1 and Channel-2 are unbuffered Go channels. They act as hand-off points between the goroutines: a send blocks until the next goroutine receives the value.
Goroutine-1 reads a message from a Kafka topic, validates it, and sends its payload on Channel-1.
Goroutine-2 reads from Channel-1, processes the payload, and sends the processed payload on Channel-2.
Goroutine-3 reads from Channel-2, wraps the processed payload in an HTTP request, and sends it to another service using an HTTP client.
Loophole in the above flow: in our case, processing fails either because of a bad network connection between the services or because the remote service is not ready to accept HTTP requests from Goroutine-3 (HTTP client timeout). As a result, the service loses that message, even though it has already been read from the Kafka topic.
Goroutine-1 currently consumes messages from Kafka without sending an acknowledgement back to Kafka (to signal that a specific message was processed successfully by Goroutine-3).
Correctness is preferred over performance.
How to ensure that every message is processed successfully?
Answer 1
Score: 1
To ensure correctness, you need to commit (i.e. acknowledge) the message only after processing has finished successfully.
For the cases where processing does not finish successfully, you generally need to implement a retry mechanism yourself.
The details depend on your use case, but generally you publish the message to a dedicated Kafka retry topic (one that you create), wait for a delay, and process the message again. If processing still fails after x attempts, you publish the message to a DLQ (dead letter queue).
You can read more here:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/
Answer 2
Score: 1
For example, add feedback from Goroutine-3 to Goroutine-1 through a new Channel-3. Goroutine-1 blocks until it receives an acknowledgement on Channel-3.
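// in Goroutine-1
channel1 <- data
select {
case <-channel3:
	// delivered: now it is safe to commit the message to Kafka
case <-ctx.Done(): // or something else to prevent a deadlock
}
...
// in Goroutine-3
data := <-channel2
for {
	if err := sendData(data); err == nil {
		break
	}
	time.Sleep(time.Second) // back off instead of retrying in a tight loop
}
channel3 <- struct{}{}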