KafkaConsumer.commitAsync()在比先前的偏移量更低的情况下的行为

huangapple go评论90阅读模式
英文:

KafkaConsumer.commitAsync() behavior with a lower offset than previous

问题

当主题的偏移值以比先前调用的偏移值更小的值给出时,KafkaConsumer.commitAsync方法将如何处理呢?

英文:

How will kafka deal with a call to

KafkaConsumer.commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

when offset value for a topic is given as a lesser value than a previous invocation?

答案1

得分: 3

这将简单地将分区的偏移量设置为您指定的值,因此下一次您将从committedOffset+1处消费您的消息。
commitAsync()的Javadoc说明:
> 提交的偏移量应该是您的应用程序将要消费的下一条消息,即lastProcessedMessageOffset + 1。

英文:

It will simply set the offset of the partition to the value you specified,so next time you will consume you message from commitedOffset+1.
The javadoc of commitAsync() says:
> The committed offset should be the next message your application will consume,i.e. lastProcessedMessageOffset + 1.

答案2

得分: 3

我很好奇,于是进行了测试以观察其行为。如文档所述,@haoyuwang 在他的回答中写得是正确的(+1)。

其背后的原因相当简单。消费者组的已提交偏移存储在 Kafka 内部主题 __consumer_offsets 中。该主题是“紧凑型(compact)”的,这意味着它旨在为给定的键提供最新的值。在您的情况下,键是消费者组、主题和分区的组合,而值是偏移量。

如果您现在这样做:

  • 提交偏移量 10,然后由于异步过程
  • 提交偏移量 5

偏移量 5 将成为 __consumer_offsets 主题中的最新值。这意味着您的消费者将从该主题分区读取的下一个偏移量是 6,而不是 11。

如何重现

您可以通过在正常提交后(同步地)提交较早的偏移量来简单地重现并测试它,如下所示:

consumer.commitSync();
consumer.commitSync(commitFirstMessage);

其中 commitFirstMessage 被定义为:

TopicPartition zeroTopicPartition = new TopicPartition(topic, 0);
OffsetAndMetadata zeroOffset = new OffsetAndMetadata(0L);

Map<TopicPartition, OffsetAndMetadata> commitFirstMessage = new HashMap<>();
commitFirstMessage.put(zeroTopicPartition, zeroOffset);

编辑:

如何通过 commitAsync 避免提交较低的偏移量

在书籍 Kafka - The Definitive Guide 中,建议避免由于 commitAsync 的重试调用而提交较低的偏移量:

> 重试异步提交:为了异步重试的提交顺序正确,一个简单的模式是使用单调递增的序列号。每次提交时增加序列号,并将序列号在提交时添加到 commitAsync 回调中。在准备发送重试时,检查回调获得的提交序列号是否等于实例变量;如果相等,则没有更新的提交,可以安全地重试。如果实例序列号较高,则不要重试,因为已经发送了更新的提交。

一个实现可能如下所示(未经实际测试!):

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // 定义主题
  val topic = "myOutputTopic"

  // 设置属性
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [设置更多属性...]

  // 创建 KafkaConsumer 并订阅
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // 初始化全局计数器
  val atomicLong = new AtomicLong(0)

  // 消费消息
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // 对记录执行某些操作
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }

  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // 保持此回调实例的位置
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // 仅在没有其他提交增加全局计数器的情况下重试
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}
英文:

I was curious and tested it to see the behavior. As written in the docs, it is correct what @haoyuwang wrote in his answer (+1).

The reason behind it is quite simple. The committed offsets of a consumer group are stored in Kafka within the internal topic __consumer_offsets. This topic is compact which means it is meant to provide the latest value for a given key. In your case the key is a combination of the Consumer Group, Topic and partition whereas your value is the offset.

If you now

  • commit offset 10 and due to asynchronous process later
  • commit offset 5

offset 5 will be the latest value in the __consumer_offsets topic. That means the next offset your consumer will read from that topic partition is offset 6 and not offset 11.

How to reproduce

You could simply reproduce it and test it by (synchronously) commit an earlier offset after your regular commit, like this:

consumer.commitSync();
consumer.commitSync(commitFirstMessage);

where commitFirstMessage is defined as

TopicPartition zeroTopicPartition = new TopicPartition(topic, 0);
OffsetAndMetadata zeroOffset = new OffsetAndMetadata(0L);

Map&lt;TopicPartition, OffsetAndMetadata&gt; commitFirstMessage = new HashMap&lt;&gt;();
commitFirstMessage.put(zeroTopicPartition, zeroOffset);

EDIT:

How to avoid committing lower offsets with commitAsync

In the book Kafka - The Definitive Guide there is a recommendation to avoid commit lower offsets because of a retrying call of commitAsync:

> Retrying Async Commits: A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the
sequence number at the time of the commit to the commitAsync callback. When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don’t retry because a newer commit was already sent.

An implementation could look like this (not actually tested!):

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = &quot;myOutputTopic&quot;

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, &quot;AsyncCommitter5&quot;)
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;localhost:9092&quot;)
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data &lt;- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException =&gt; ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}

huangapple
  • 本文由 发表于 2020年9月4日 19:13:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/63740084.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定