Kafka的偏移量即使在未达到确认(Ack)时也会增加。

huangapple go评论76阅读模式
英文:

Kafka offset increments even without reaching Ack

问题

我有一个消费者它消费一条消息执行一些繁重的任务然后进行确认操作

@KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory = "ContainerFactory")
public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {

  try {
    // 繁重的任务
    ack.acknowledge();
  } catch (Exception e) {
    log("Kafka 消费者错误");
  }
}

现在如果出现异常它应该进入 catch并且不应进行确认操作如果确认操作未完成它应该回到队列中并被重新处理但实际情况并非如此偏移量会更新并且会选择下一条消息
我理解消费者有一个轮询大小使其可以一次选择多条消息但即使一条消息没有被确认它也应该重新处理它而不是忽略它并更新偏移量

以下是 Kafka 消费者配置

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
英文:

I have a consumer which consumes a message, does some heavy job and then acks.

    @KafkaListener(topics = &quot;${kafka.topic}&quot;, groupId = &quot;group&quot;, containerFactory =&quot;ContainerFactory&quot;)
  public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {

try {
  //Heavy Job
  ack.acknowledge();
} catch (Exception e) {
  log(&quot;Error in Kafka Consumer);
    }
  }

Now if there is an exception, it should go to the catch block and Acknowledgement should not happen and if acknowledgement did not happen it should go back to the queue and be processed again. But it is not happening. The offset updates and the next message is picked.
I understand that there is a poll size of the consumer which enables it to pick more than one message at a time. But even if one message is not acked it should reprocess it instead of ignoring it and updating the offset.

Here is the Kafka Consumer Config

`Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

答案1

得分: 2

这是底层 KafkaConsumer 的预期行为。

在内部,KafkaConsumer 使用了在 JavaDocs 中描述的 poll API,如下所示:

> “在每次轮询中,消费者将尝试使用上次消费的偏移量作为起始偏移量并按顺序获取数据。上次消费的偏移量可以通过 seek(TopicPartition, long) 手动设置,或者自动设置为订阅的分区的最后提交偏移量。”

这意味着它不会检查最后一次“提交”的偏移量,而是检查最后一次“消费”的偏移量,然后按顺序获取数据。只有在重新启动作业时,它才会继续从该消费者组的最后提交偏移量读取,或者如果您使用基于 auto_offset_reset 配置的新消费者组。

为了解决您的问题,在 catch 块中,我看到以下几种解决方案可供您应用:

  • 不仅记录“Kafka 消费者中的错误”,而是使作业关闭。修复代码,然后重新启动应用程序。
  • 使用导致异常的偏移量编号,通过 seek API 将您的消费者重新定位到相同的偏移量。有关 seek 方法的详细信息可以在 此处 找到。
英文:

This is the expected behaviour of the underlying KafkaConsumer.

Under the covers, the KafkaConsumer uses the poll API which is described in the JavaDocs as:

> "On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions."

This means, it does not check for the last committed offsets but rather for the last consumed offsets and then fetches the data sequentially. Only when re-starting your job it will continue reading from the last committed offset for that consumer group or if you use a new consumer group based on the auto_offset_reset configuration.

To solve your problem, I see the following solutions that you can apply in the catch block:

  • Instead of just logging the "Error in Kafka Consumer" make your job shut down. Fix the code and re-start your application
  • Use the offset number (that caused the Exception) to re-position your consumer to the same offset again using the seek API. Details on seek method can be found here

huangapple
  • 本文由 发表于 2020年10月21日 18:44:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/64461894.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定