Kafka consumers receive corrupted messages when Kafka restarts

Question

I'm running a Kafka service with a replication factor of 2, and the producers require 1 ack on write.
On the consumer side we're not using consumer groups; every consumer connects to an explicitly configured partition number.

We have observed many times that during a rolling restart of the Kafka cluster, at the moment a Kafka server goes down, the consumers reading the partitions for which that server was the leader receive corrupted messages for a few milliseconds.

I have not identified a pattern in the corruption; it looks like parts of the messages are simply replaced with random values. Even fields that should always contain ASCII characters end up with non-ASCII byte values.
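Until the root cause is known, a consumer can at least detect such messages by validating fields that should always be ASCII before processing them. This is a minimal sketch, assuming the field has already been decoded into a `[]byte`; `isASCII` is a hypothetical helper, not part of kafka-go:

```go
package main

import "fmt"

// isASCII reports whether every byte in b is a 7-bit ASCII value (0x00-0x7F).
// A field that should always be ASCII but fails this check is a strong hint
// that the message payload was corrupted.
func isASCII(b []byte) bool {
	for _, c := range b {
		if c > 0x7F {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isASCII([]byte("order-1234")))  // true
	fmt.Println(isASCII([]byte{'o', 0xC3, 'k'})) // false
}
```

Such a check only flags corruption that happens to land in a constrained field; it cannot prove that a message which passes is intact.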

At all other times, when Kafka isn't being restarted, the consumers never receive corrupted messages; this only happens when Kafka is restarted.

On both the producer and the consumer side we're using the segmentio Kafka client: https://github.com/segmentio/kafka-go

I'm not sure yet whether this is a problem in Kafka or in the segmentio Kafka client.

Answer 1

Score: 0

On the producer side we use segmentio/kafka-go's Writer.WriteMessages() to publish messages to Kafka. This method takes a context, which can cancel the operation asynchronously, and the Kafka messages to publish:

```go
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error
```

To reduce the load on Go's GC, we pool the byte slices in the messages and reuse them once a message has been published. This usually works fine, except when .WriteMessages() is passed a context that gets canceled before the messages have been written successfully.

We pass a context with a 2s timeout into .WriteMessages(). Normally this is no problem, because writing the messages to Kafka never takes anywhere near 2s. The exception is when one of the Kafka servers is shut down and the writer has to switch to the other replica serving the affected partitions. In that situation the context timed out and .WriteMessages() returned an error, so we reused the byte slices of the messages we had passed into it to build the next batch. At that moment, however, the writer still had those messages in its internal queue, and it wrote them later, while or after we were already reusing the slices. That is how the consumers ended up receiving corrupted messages.

I have submitted a PR to the segmentio/kafka-go project to clarify this in a comment:

https://github.com/segmentio/kafka-go/pull/1139

  • Posted by huangapple on May 23, 2023 02:35:20
  • Original link: https://go.coder-hub.com/76309028.html