KafkaConsumer.commitAsync() behavior with a lower offset than previous
Question
How will Kafka deal with a call to KafkaConsumer.commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) when the offset value for a topic partition is given as a lower value than in a previous invocation?
Answer 1
Score: 3
It will simply set the committed offset of the partition to the value you specified, so the consumer will continue consuming from that committed offset afterwards.
The Javadoc of commitAsync() says:
> The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
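As a minimal sketch of that rule (untested; the class and method names are just for illustration, and the consumer is assumed to be already configured and subscribed), committing record.offset() + 1 via the explicit-offsets overload of commitAsync could look like this:
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitPlusOne {
    // poll once and commit each record's offset + 1, i.e. the next message to consume
    static void pollAndCommit(KafkaConsumer<String, String> consumer) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            // ... process the record here ...
            Map<TopicPartition, OffsetAndMetadata> toCommit = Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)); // lastProcessedMessageOffset + 1
            consumer.commitAsync(toCommit, (offsets, exception) -> {
                if (exception != null) exception.printStackTrace(); // commit failed
            });
        }
    }
}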
Answer 2
Score: 3
I was curious and tested it to see the behavior. As written in the docs, what @haoyuwang wrote in his answer is correct (+1).
The reason behind it is quite simple. The committed offsets of a consumer group are stored in Kafka within the internal topic __consumer_offsets. This topic is compacted, which means it is meant to provide the latest value for a given key. In your case the key is the combination of consumer group, topic, and partition, whereas the value is the offset.
If you now
- commit offset 10, and later, due to an asynchronous process,
- commit offset 5
then offset 5 will be the latest value in the __consumer_offsets topic. That means the next offset your consumer will read from that topic partition is offset 5, not offset 10.
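To see this last-write-wins behavior from the client side, here is a minimal sketch (untested; the topic name, partition, and class/method names are assumptions, and consumer.committed(Set) requires kafka-clients 2.4 or newer):
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitLowerOffset {
    // assumes a consumer that is already configured with the group under test
    static void commitTenThenFive(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("myTopic", 0); // assumed topic and partition
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(10L)));
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(5L)));
        // the broker returns the last value written for the group/topic/partition key
        OffsetAndMetadata committed = consumer.committed(Collections.singleton(tp)).get(tp);
        System.out.println(committed.offset()); // prints 5, not 10
    }
}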
How to reproduce
You can simply reproduce and test this by (synchronously) committing an earlier offset after your regular commit, like this:
consumer.commitSync();
consumer.commitSync(commitFirstMessage);
where commitFirstMessage is defined as:
TopicPartition zeroTopicPartition = new TopicPartition(topic, 0);
OffsetAndMetadata zeroOffset = new OffsetAndMetadata(0L);
Map<TopicPartition, OffsetAndMetadata> commitFirstMessage = new HashMap<>();
commitFirstMessage.put(zeroTopicPartition, zeroOffset);
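If you restart a consumer from the same group after these two commits, it will consume partition 0 from the very first message again, because the committed offset for that partition is now 0.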
EDIT:
How to avoid committing lower offsets with commitAsync
The book Kafka - The Definitive Guide recommends avoiding committing lower offsets caused by a retrying call of commitAsync:
> Retrying Async Commits: A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.
An implementation could look like this (not actually tested!):
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter for the commit sequence numbers
  val atomicLong = new AtomicLong(0)

  // consume messages
  try {
    while (true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala
      if (records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }
    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }

  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // sequence number taken when this commit (and its callback) was created
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retry only if no newer commit has incremented the global counter since
      if (exception != null) {
        if (position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }
}