如何限制Kafka消费者拉取的数据量(以字节为单位)?

huangapple go评论101阅读模式
英文:

How to limit kafka consumer poll result in bytes?

问题

我的设置有三个Kafka代理和十几个主题,每个主题有15个分区。
Java应用程序启动15个线程,每个消费者订阅所有主题。假设每个消费者被分配了三个不同主题的分区。主题具有String记录,每个记录都小于1KB。消费者配置是:

      "key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
      "value.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
      "bootstrap.servers" = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
      "group.id" = "my-group"
      "enable.auto.commit" = "false"
      "fetch.max.bytes" = "60000"
      "max.poll.records" = "10000000"
      "auto.offset.reset" = "earliest"

我已将 fetch.max.bytes 设置为60000字节,以限制整个 consumer.poll 结果,从而在下一步将数据发送到每次调用限制为64KB的API。但实际上,consumer.poll 返回的字节数要多得多,例如150KB。并且API调用失败。

在配置和应用程序设计中我漏掉了什么?如何严格限制 consumer.poll 结果的字节数?

文档 中提到:

> 请注意,消费者并行执行多个提取。

这是什么意思?我应该限制并行性吗?

英文:

My setup has three Kafka brokers and a dozen of topics with 15 partitions each.
Java application starts 15 thread in which every consumer subscribes all topics. Let's say every consumer is assigned with three partitions of different topics. Topics has String records, each of them is less than 1Kb. Consumer config is:

  "key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
  "value.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
  "bootstrap.servers" = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
  "group.id" = "my-group"
  "enable.auto.commit" = "false"
  "fetch.max.bytes" = "60000"
  "max.poll.records" = "10000000"
  "auto.offset.reset" = "earliest"

I've set fetch.max.bytes to 60000 bytes to limit the whole consumer.poll result so as to send data on the next step to an API that has limited with 64Kb per call. But in practice consumer.poll returns much more bytes, for instance 150Kb. And API call fails.

What I've missed in the config and in the app design? How to strictly limit the result of consumer.poll in bytes?

The documentation says:

> Note that the consumer performs multiple fetches in parallel.

What does it mean? Should I limit a parallelism?

答案1

得分: 1

问题实际上与此备注有关:

请注意,消费者会同时执行多个抓取操作。

KafkaConsumer 创建 Fetcher 并将 fetch.max.bytes 传递给其构造函数。Fetcher 使用 此值来向节点 发送 并行请求。

因此,应考虑经纪人的数量,以限制 poll 结果的字节数。在我拥有三个经纪人的集群中,fetch.max.bytes 应设置为 20000,以使 poll 结果永远不会超过 60000 字节。

英文:

The problem is really connected with this note:

> Note that the consumer performs multiple fetches in parallel.

KafkaConsumer creates Fetcher and passes fetch.max.bytes to its constructor. Fetcher uses this value to send parallel requests to nodes.

Number of brokers thus should be taken into account to limit the poll result in bytes. In my cluster of three brokers fetch.max.bytes should be equal to 20000 so as to poll result will never exceed 60000 bytes.

答案2

得分: 0

使用fetch.max.bytes时需要注意的一点是记录以批次发送到客户端,如果经纪人必须发送的第一批记录超过此大小,将发送该批记录并忽略限制。这确保了消费者可以继续取得进展。

我的建议是,由于您对下游 API 有如此严格的限制,请考虑将 max.poll.records 减小到大约 50,这样最多每次将轮询 50 条记录,由于您的一条记录大小为 1KB,因此轮询请求的大小将约为 50KB。如果存在某些记录可能为 2KB 或 5KB 大小的可能性,请进一步减小 max.poll.records 的大小(明显的缺点是现在您的消费者将进行更多的往返,导致吞吐量减少)。

英文:

One thing that you have to note when using fetch.max.bytes is that records are sent to the client in batches, and if the first record-batch that the broker has to send exceeds this size, the batch will be sent and the limit will be ignored. This guarantees that the consumer can continue making progress.

My suggestion is, since you have such tight limits of downstreams API, consider decreasing max.poll.records to around 50, so that at max you will poll 50 records and since your 1 record size is 1KB, the size of your poll request will be around 50KB. If there is a chance that some records can be 2KB or 5KB in size, consider decreasing size of your max.poll.records further (with an obvious downside that now your consumer will be doing more round-trips resulting in decrease in throughput).

huangapple
  • 本文由 发表于 2023年3月3日 18:25:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/75625867.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定