Kafka message poll increase attempt

Question

Current settings of the Kafka consumer:

 max_poll_records: 2000
 max_poll_interval_ms: 40000
 fetch_min_bytes: 104857600
 fetch_max_wait_ms: 10000
 fetch_max_bytes: 104857600
 request_timeout_ms: 300000
 max_partition_fetch_bytes: 104857600
 heartbeat_interval_ms: 60000
 session_timeout_ms: 180000

Current setting on the Kafka broker:

fetch.max.bytes: 104857600

With these settings I am able to poll 2000 messages at once, roughly 84 MB in total (the size of one poll).
However, the second poll returns fewer messages than the first, even though sufficient lag exists on that consumer.

I am pushing to only one partition, so polling also happens from a single partition.

As seen in the screenshot of the code, the polled message counts follow a pattern:
1st poll: 2000 messages
2nd poll: 484 messages
3rd poll: 2000 messages
4th poll: 484 messages
5th poll: 2000 messages
6th poll: 484 messages
7th poll: 548 messages
and so on
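
A minimal sketch of a consumer and poll loop that would produce counts like the ones above, assuming the kafka-python client (the underscore-style option names suggest it); the topic name, group id, and broker address are placeholders, not taken from the question:

    from kafka import KafkaConsumer

    # Consumer built with the settings listed above (kafka-python style).
    consumer = KafkaConsumer(
        "my-topic",                          # placeholder topic
        bootstrap_servers="localhost:9092",  # placeholder broker
        group_id="my-group",                 # placeholder group id
        max_poll_records=2000,
        max_poll_interval_ms=40000,
        fetch_min_bytes=104857600,
        fetch_max_wait_ms=10000,
        fetch_max_bytes=104857600,
        request_timeout_ms=300000,
        max_partition_fetch_bytes=104857600,
        heartbeat_interval_ms=60000,
        session_timeout_ms=180000,
    )

    poll_no = 0
    while True:
        # poll() returns a dict of TopicPartition -> list of ConsumerRecord
        records = consumer.poll(timeout_ms=10000)
        poll_no += 1
        total = sum(len(msgs) for msgs in records.values())
        print(f"poll {poll_no}: {total} messages")  # shows the 2000 / 484 / ... pattern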

Can anyone help me figure out why this is happening?

I tried searching for a solution on Google but didn't find much.

Answer 1

Score: 1

The Kafka consumer maintains an internal queue that is refilled by fetches from the broker on one side and drained by your polls on the other. After you poll 2000 records the queue is almost empty, and the consumer starts fetching the missing records. Since you set fetch_min_bytes to 100 MB:

  1. the fetch request takes a noticeable amount of time to complete, probably longer than it takes you to process the polled messages.
  2. the consumer may also not issue a new fetch while records are still available in the internal queue, because a fetch with such a high minimum size would overflow the queue. So it takes another poll to drain the remaining records from the internal queue before messages are fetched from the broker.
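
A back-of-envelope check (assuming roughly uniform message sizes, which the question's numbers suggest) is consistent with this: with fetch_min_bytes = fetch_max_bytes = 104857600, each fetch carries on the order of 100 MB, i.e. roughly 2400-2500 messages at the ~42-44 KB per message implied by "about 84 MB for 2000 messages". The first poll takes max_poll_records = 2000 of them, and the next poll drains the remaining few hundred from the internal queue, matching the observed 2000 / 484 pattern.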

So, try decreasing fetch_min_bytes dramatically (by a factor of ten, for example), or simply remove the property, since its default is 1.
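
In kafka-python terms (same client assumption as the sketch in the question; topic and broker are placeholders), the change amounts to dropping the fetch_min_bytes setting, so the broker answers a fetch as soon as data is available instead of waiting for ~100 MB (or for fetch_max_wait_ms) to accumulate:

    from kafka import KafkaConsumer

    # Suggested change: fetch_min_bytes is left at its default of 1.
    consumer = KafkaConsumer(
        "my-topic",                          # placeholder topic
        bootstrap_servers="localhost:9092",  # placeholder broker
        max_poll_records=2000,
        fetch_max_bytes=104857600,
        max_partition_fetch_bytes=104857600,
        # fetch_min_bytes=104857600 removed (default is 1)
    )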

Answer 2

Score: 0

It is limited by max.partition.fetch.bytes.

Kafka Documentation
> MAX.PARTITION.FETCH.BYTES

> This property controls the maximum number of bytes the server will return per partition. The default is 1 MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the consumer. So if a topic has 20 partitions, and you have 5 consumers, each consumer will need to have 4 MB of memory available for ConsumerRecords. In practice, you will want to allocate more memory as each consumer will need to handle more partitions if other consumers in the group fail. max.partition.fetch.bytes must be larger than the largest message a broker will accept (determined by the max.message.size property in the broker configuration), or the broker may have messages that the consumer will be unable to consume, in which case the consumer will hang trying to read them. Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process data. As you recall, the consumer must call poll() frequently enough to avoid session timeout and subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occurs, the two options are either to lower max.partition.fetch.bytes or to increase the session timeout.
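
If one follows this answer instead, the two knobs it ends with map to these kafka-python options (same client assumption as above; the values are illustrative, not recommendations from the answer):

    from kafka import KafkaConsumer

    # Illustrative trade-off from the quoted text: either lower the per-partition
    # fetch cap, or give the consumer enough session time for its processing.
    consumer = KafkaConsumer(
        "my-topic",                          # placeholder topic
        bootstrap_servers="localhost:9092",  # placeholder broker
        max_partition_fetch_bytes=10485760,  # option 1: lower the per-partition fetch cap (10 MB here)
        session_timeout_ms=180000,           # option 2: keep/raise the session timeout as needed
    )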
