一个使用多个代理的Spring Kafka消费者

huangapple go评论95阅读模式
英文:

a spring kafka consumer with multiple brokers

问题

我目前正在使用Spring Kafka实现Kafka监听器。但是我有点困惑,因为我的监听器有时无法分配到分区,因此无法从其中一个代理接收任何消息。

当前情况

我有两个代理,每个代理都有一个主题,其中只包含一个分区(主题名称相同)。

代理1 代理2
主题1 主题1
一个分区 一个分区

也许我对设置Kafka监听器以从这两个代理获取消息有错误的想法。不过,以下句子是否正确?

传递两个代理的主机将并发设置为2足够吗,因为我有两个分区(一个在代理1,另一个在代理2)?

我所做的

创建一个带有注解的监听器

@KafkaListener(topics = "Topic1", containerFactory = "kafkaListenerContainerFactory")

将并发设置为2

factory.setConcurrency(2);

提供代理数组

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ["123.123.123:9092", "211.211.211:9094"]);

设置群组

props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");

使用轮询分区分配策略

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

我的期望

能够从两个代理接收消息,但有时无法从其中一个代理获取任何消息。

英文:

I'm currently implementing kafka listener with spring kafka.
And now I'm little bit because my listener sometimes could not be assigned to partition so could not receive any message from one of the brokers.

current situation

I have two brokers and each broker has one topic which includes only one partition.(topic name is same)

broker1 broker2
Topic1 Topic1
one partition one partition

Maybe I'm having wrong idea for setting my kafka listener to take messages from both two brokers. However, is following sentence proper guess?

Are Passing two brokers' host to listener and Setting concurrency to 2 enough config because I have two partitions (one in broker1 and other one in broker2)?

what I did

create a listener with annotation

@KafkaListener(topics = "$Topic1" , containerFactory = "kafkaListenerContainerFactory")

set its concurrency to 2

factory.setConcurrency(2);

give array of brokers

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, [123.123.123:9092, 211.211.211:9094]);

set group

props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");

set partition assignment strategy with round robin way

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

what I expected

being able to receive messages from both brokers but sometimes could not get any message from one of two brokers

答案1

得分: 0

如果经纪人位于同一集群中,应该可以正常工作。

如果经纪人不属于同一集群,您必须添加两个 @KafkaListener 注释,每个注释使用指向每个集群的容器工厂。或者,您可以在其中一个注释上使用 properties 属性来覆盖 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG 属性。

英文:

If the brokers are in the same cluster it should work OK.

If the brokers are not part of the same cluster, you must add two @KafkaListener annotations, each one using a container factory that points to each cluster. Or, you can use the properties attribute on one of the annotations to override the ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG property.

huangapple
  • 本文由 发表于 2023年3月9日 22:47:49
  • 转载请务必保留本文链接:https://go.coder-hub.com/75686187.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定