Difference between KafkaConsumer.poll(timeoutMs) and KafkaConsumer.poll(Duration.ZERO)
Question
I am reading a Kafka topic from the beginning. To use `seekToBeginning()`, I first need to make a dummy call to `poll()`. Here is a snippet of my code:
```java
// Subscribe
consumer.subscribe(Collections.singleton(TOPIC_NAME));
// Seek to beginning
// consumer.poll(Duration.ZERO);
consumer.poll(0);
consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));
```
Using `consumer.poll(0)` works fine. When I use `consumer.poll(Duration.ZERO)` instead, I get the following exception:
```
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group2-1, groupId=group2] Seeking to EARLIEST offset of partition test-lc-1-0
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-lc-1-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:615)
	at java.base/java.util.Collections$SingletonSet.forEach(Collections.java:4797)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:613)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1659)
	at com.ahmed.ConsumeProtobuf.main(ConsumeProtobuf.java:49)
```
I looked at the implementation of both APIs. In the end, both of them call the same method with 0 as the argument. Any idea why `poll(Duration.ZERO)` fails?
Thank you,
Ahmed.
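For what it's worth, the two overloads are not quite equivalent. As far as I can tell from the 2.x `KafkaConsumer` source, both delegate to a private `poll(Timer, boolean includeMetadataInTimeout)`, but the deprecated `poll(long)` passes `false`, which makes it wait without a time limit for the group join and partition assignment before the zero timeout applies. `poll(Duration)`, added in Kafka 2.0 by KIP-266, passes `true`, so with `Duration.ZERO` it usually returns before any partition has been assigned, and `seekToBeginning` then throws. If you want to keep the explicit seek without the deprecated overload, one minimal sketch (assuming kafka-clients 2.0+ on the classpath) is to poll with a short, bounded timeout until `assignment()` is non-empty. The class `SeekAfterAssignment`, the helper `seekToBeginningOnceAssigned`, the 100 ms poll interval and the `maxWait` bound are hypothetical choices, not part of kafka-clients.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class SeekAfterAssignment {

    private SeekAfterAssignment() {}

    /**
     * Hypothetical helper (not a kafka-clients API): keeps calling poll(Duration)
     * with a short timeout until the group coordinator has assigned partitions,
     * then rewinds every assigned partition to its earliest offset.
     */
    public static <K, V> Set<TopicPartition> seekToBeginningOnceAssigned(
            Consumer<K, V> consumer, Duration maxWait) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        while (consumer.assignment().isEmpty()) {
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("No partitions assigned within " + maxWait);
            }
            // Each bounded poll drives the group join forward. Any records it returns
            // are dropped on purpose: the seek below rewinds the position anyway.
            consumer.poll(Duration.ofMillis(100));
        }
        // Copy the assignment so the caller gets a stable snapshot.
        Set<TopicPartition> assigned = new HashSet<>(consumer.assignment());
        consumer.seekToBeginning(assigned);
        return assigned;
    }
}
```

It would be called right after `consumer.subscribe(...)`, in place of the `poll(0)` plus `seekToBeginning` pair. Taking the `Consumer` interface rather than `KafkaConsumer` also lets it run against `MockConsumer` in tests.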
Answer 1
Score: 2
The correct way to seek when starting a consumer is to use a `ConsumerRebalanceListener`.
For example, something like:
```java
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
    consumer.subscribe(Collections.singleton(TOPIC_NAME), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions);
        }
    });
    while (true) {
        consumer.poll(Duration.ofSeconds(1L));
        ...
    }
}
```
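One thing to keep in mind with this listener: `onPartitionsAssigned` may be invoked again on later rebalances, not only at startup, so partitions that move back to this consumer would be rewound and the topic re-read. If the intent is to rewind each partition only once, a sketch along these lines could remember which partitions were already rewound. `RewindOnceListener` and `rewoundPartitions()` are hypothetical names; the class only relies on `ConsumerRebalanceListener` and `Consumer.seekToBeginning`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical variant of the listener above: rewinds each partition only the
 * first time this consumer is assigned it, so later rebalances resume from the
 * committed offsets instead of re-reading the whole topic.
 */
public final class RewindOnceListener implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;
    // Partitions already rewound by this process (kept in memory only).
    private final Set<TopicPartition> rewound = new HashSet<>();

    public RewindOnceListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do here for this sketch.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Set<TopicPartition> fresh = new HashSet<>(partitions);
        fresh.removeAll(rewound); // skip partitions that were already rewound once
        if (!fresh.isEmpty()) {
            consumer.seekToBeginning(fresh);
            rewound.addAll(fresh);
        }
    }

    /** Read-only snapshot of the partitions rewound so far. */
    public Set<TopicPartition> rewoundPartitions() {
        return Set.copyOf(rewound);
    }
}
```

It would be registered the same way: `consumer.subscribe(Collections.singleton(TOPIC_NAME), new RewindOnceListener(consumer));`. Because the set lives in memory, a restarted process would rewind again.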