英文:
a spring kafka consumer with multiple brokers
问题
我目前正在使用Spring Kafka实现Kafka监听器。但是我有点困惑,因为我的监听器有时无法分配到分区,因此无法从其中一个代理接收任何消息。
当前情况
我有两个代理,每个代理都有一个主题,其中只包含一个分区(主题名称相同)。
代理1 | 代理2 |
---|---|
主题1 | 主题1 |
一个分区 | 一个分区 |
也许我对设置Kafka监听器以从这两个代理获取消息有错误的想法。不过,以下句子是否正确?
将传递两个代理的主机
和将并发设置为2
足够吗,因为我有两个分区(一个在代理1,另一个在代理2)?
我所做的
创建一个带有注解的监听器
@KafkaListener(topics = "Topic1", containerFactory = "kafkaListenerContainerFactory")
将并发设置为2
factory.setConcurrency(2);
提供代理数组
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ["123.123.123:9092", "211.211.211:9094"]);
设置群组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
使用轮询分区分配策略
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
我的期望
能够从两个代理接收消息,但有时无法从其中一个代理获取任何消息。
英文:
I'm currently implementing kafka listener with spring kafka.
And now I'm little bit because my listener sometimes could not be assigned to partition so could not receive any message from one of the brokers.
current situation
I have two brokers and each broker has one topic which includes only one partition.(topic name is same)
broker1 | broker2 |
---|---|
Topic1 | Topic1 |
one partition | one partition |
Maybe I'm having wrong idea for setting my kafka listener to take messages from both two brokers. However, is following sentence proper guess?
Are Passing two brokers' host to listener
and Setting concurrency to 2
enough config because I have two partitions (one in broker1 and other one in broker2)?
what I did
create a listener with annotation
@KafkaListener(topics = "$Topic1" , containerFactory = "kafkaListenerContainerFactory")
set its concurrency to 2
factory.setConcurrency(2);
give array of brokers
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, [123.123.123:9092, 211.211.211:9094]);
set group
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
set partition assignment strategy with round robin way
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
what I expected
being able to receive messages from both brokers but sometimes could not get any message from one of two brokers
答案1
得分: 0
如果经纪人位于同一集群中,应该可以正常工作。
如果经纪人不属于同一集群,您必须添加两个 @KafkaListener
注释,每个注释使用指向每个集群的容器工厂。或者,您可以在其中一个注释上使用 properties
属性来覆盖 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
属性。
英文:
If the brokers are in the same cluster it should work OK.
If the brokers are not part of the same cluster, you must add two @KafkaListener
annotations, each one using a container factory that points to each cluster. Or, you can use the properties
attribute on one of the annotations to override the ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
property.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论