Difference between `KafkaConsumer.poll(timeoutMs)` and `KafkaConsumer.poll(Duration.ZERO)`

Question

I am reading a Kafka topic from the beginning. To use `seekToBeginning()`, I first need to make a dummy call to `poll()`. Here is a snippet of my code:

    // Subscribe
    consumer.subscribe(Collections.singleton(TOPIC_NAME));
    // Seek to beginning
    // consumer.poll(Duration.ZERO);
    consumer.poll(0);
    consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));

Using `consumer.poll(0)` works fine. Using `consumer.poll(Duration.ZERO)` instead results in the following exception:

    [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group2-1, groupId=group2] Seeking to EARLIEST offset of partition test-lc-1-0
    Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-lc-1-0
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:615)
        at java.base/java.util.Collections$SingletonSet.forEach(Collections.java:4797)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:613)
        at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1659)
        at com.ahmed.ConsumeProtobuf.main(ConsumeProtobuf.java:49)

I looked at the implementation of both APIs. In the end, both end up calling the same method with 0 as the argument. Any idea why `poll(Duration.ZERO)` would fail?

Thank you,
Ahmed.

Answer 1

Score: 2

The correct way to seek when starting a consumer is to use a ConsumerRebalanceListener.

For example, something like:

<!-- language: lang-java -->

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
        consumer.subscribe(Collections.singleton(TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called once partitions are actually assigned, so the seek
                // cannot fail with "No current assignment".
                consumer.seekToBeginning(partitions);
            }
        });
        while (true) {
            consumer.poll(Duration.ofSeconds(1L));
            ...
        }
    }
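
As for why the two overloads differ: the deprecation note on `poll(long)` in the Kafka Javadoc points to `poll(Duration)` as the variant that does not block beyond the timeout while awaiting partition assignment. So `poll(0)` could keep going until the group join finished, whereas `poll(Duration.ZERO)` returns immediately, most likely before any partition has been assigned. The sketch below is a toy model of that difference in plain Java. It uses no Kafka classes, and every name in it (`ToyConsumer`, `pollLegacy`, the 300 ms join cost) is hypothetical and chosen only for illustration.

```java
import java.time.Duration;

/** Toy model only. This is NOT the Kafka client; all names are hypothetical. */
final class PollTimeoutSketch {

    /** Simulated consumer whose group join needs a fixed amount of (simulated) time. */
    static final class ToyConsumer {
        private final String partition;
        private long joinWorkLeftMs;   // simulated join time still outstanding
        private boolean assigned;

        ToyConsumer(String partition, long joinWorkMs) {
            this.partition = partition;
            this.joinWorkLeftMs = joinWorkMs;
            this.assigned = joinWorkMs == 0;
        }

        /** Models the deprecated poll(long): finishes the join even if that exceeds the timeout. */
        void pollLegacy(long timeoutMs) {
            joinWorkLeftMs = 0;
            assigned = true;
        }

        /** Models poll(Duration): join work is charged against the timeout and may stay unfinished. */
        void poll(Duration timeout) {
            long done = Math.min(timeout.toMillis(), joinWorkLeftMs);
            joinWorkLeftMs -= done;
            if (joinWorkLeftMs == 0) {
                assigned = true;
            }
        }

        boolean isAssigned() {
            return assigned;
        }

        /** Mirrors the failure mode in the question: seeking requires an assignment. */
        void seekToBeginning() {
            if (!assigned) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
        }
    }

    public static void main(String[] args) {
        ToyConsumer legacy = new ToyConsumer("test-lc-1-0", 300);
        legacy.pollLegacy(0);       // returns only after the simulated join completes
        legacy.seekToBeginning();   // succeeds

        ToyConsumer bounded = new ToyConsumer("test-lc-1-0", 300);
        bounded.poll(Duration.ZERO);    // zero budget: the join makes no progress
        try {
            bounded.seekToBeginning();
        } catch (IllegalStateException e) {
            System.out.println("bounded poll: " + e.getMessage());
        }

        // Polling with a real budget until an assignment exists makes the seek safe.
        while (!bounded.isAssigned()) {
            bounded.poll(Duration.ofMillis(100));
        }
        bounded.seekToBeginning();
    }
}
```

In real code the listener shown in the answer is still the cleaner choice, because it also re-seeks after later rebalances; the loop at the end of the sketch only mirrors the "dummy poll" idea from the question.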

huangapple
  • Posted on August 22, 2020, 06:55:17