默认的kafkaListenerContainerFactory如何工作

huangapple go评论103阅读模式
英文:

How does default kafkaListenerContainerFactory work

问题

我有一个Kafka消费者的Java应用程序。
我从该应用程序中读取两个不同的Kafka主题。

对于主题1,有一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory,以下是代码片段。消息采用avro格式。Pojo1是使用avro模式构建的POJO类。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

为了从主题1中消费消息,我们有以下方法:

@KafkaListener(topics = "topic1")
public boolean readTopic1(ConsumerRecord<String, Pojo1> record) {
    // 逻辑
}

现在这个应用程序也在读取主题2的消息,主题2中的消息也是avro消息,Pojo2是使用相应avro模式构建的POJO类。

为了从主题2中读取,我们有以下方法:

@KafkaListener(topics = "topic2")
public boolean readTopic2(ConsumerRecord<String, Pojo2> record) {
    // 逻辑
}

现在我不明白如何确保从主题2中按预期工作。

kafkaListenerContainerFactory 配置为 Pojo1,那么如何从 Pojo2 的主题2中读取消息呢?
据我所知,只有在缺少名为 "kafkaListenerContainerFactory" 的bean时,Kafka 才会创建默认的容器工厂,但在我的应用程序中,我们已经为主题1构建了一个kafkaListenerContainerFactory。

因此,应该为 Pojo2 创建另一个 KafkaListenerContainerFactory,并在使用 @KafkaListener(topics = "topic2", containerFactory = "factoryForTopic2") 时给它一个引用。

有人能帮助我理解 KafkaListenerContainerFactory 如何用于具有不同消息模式的不同主题吗?

英文:

I have a kafka consumer java application.
I am reading two separate kafka topics from that application.

For topic 1, there is a KafkaListenerContainerFactory with name kafkaListenerContainerFactory, below is the code snippet. The message is in avro format. Pojo1 is the pojo class
built using avro schema.

 @Bean
public ConcurrentKafkaListenerContainerFactory&lt;String, Pojo1&gt; kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory&lt;String, Pojo1&gt; factory = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

For consuming message from topic1, we have below method

@kafkaListener (topics=&quot;topic1&quot;)
public boolean readTopic1(ConsumerRecord&lt;String,Pojo1&gt; record){
// logic
}

Now this application is reading messages form topic2 too which is also having avro messages and Pojo2 is the pojo class built using the respective avro schema.

To read from topic2, we have below method

@kafkaListener (topics=&quot;topic2&quot;)
public boolean readTopic2(ConsumerRecord&lt;String,Pojo2&gt; record){
// logic
}

Now I am not getting how consuming messages from topic2 is working as expected.

kafkaListenerContainerFactory was configured with Pojo1 so how does messages are being read from topic2 for Pojo2.
As far as I know kafka creates default container factory only if there is a missing bean with name "kafkaListenerContainerFactory", but in my application
we already have built one kafkaListenerContainerFactory for topic1.

According there should be another KafkaListenerContainerFactory created for Pojo2 and should be given the reference when using @kafkaListener (topics=&quot;topic2&quot;, containerFactory=&quot;factoryForTopic2&quot;)

Can someone help me understand how KafkaListenerContainerFactory is working for different topics with different message schema's.

答案1

得分: 5

这被称为类型擦除;泛型类型仅在编译时适用。在运行时,只要您的反序列化器创建了正确的类型,它就会起作用。

最好使用ConcurrentKafkaListenerContainerFactory<String, Object>来避免任何混淆;特别是在使用可以创建不同类型的Avro或JSON反序列化器时。

话虽如此,您的bean被称为importDecisionMessageKafkaListenerContainerFactory。对于要使用非标准工厂的监听器,必须在监听器的containerFactory属性中指定其名称;否则将使用kafkaListenerContainerFactory

英文:

It's called type erasure; the generic types are only applicable at compile time. At runtime, as long as your deserializer creates the correct type it will work.

It's probably cleaner to use ConcurrentKafkaListenerContainerFactory&lt;String, Object&gt; to avoid any confusion; especially when using an avro or json deserializer that can create different types.

That said, your bean is called importDecisionMessageKafkaListenerContainerFactory. For listeners to use a non-standard factory, it's name must be specified in the containerFactory property on the listener; otherwise kafkaListenerContainerFactory is used.

答案2

得分: 0

这仅在您的情况下起作用,因为反序列化器能够将 String 反序列化为两个对象。但是想象一下,如果您使用 Avro,仍然有 Pojo1 和 Pojo2。在这种情况下,您将需要两个容器工厂。如果您没有为 Pojo2 提供容器工厂,将会抛出异常 - MessageConversionException: 无法将 String 转换为 Pojo2 以用于 GenericMessage。解决方案是定义 @kafkaListener (topics="topic2", containerFactory="factoryForTopic2")

英文:

This works only in your case because the deserializer is able to deserialize String to both objects. But imagine you use Avro and you still have both Pojo1 and Pojo2. In this case you would need two container factories. If you do not provide one for Pojo2 an exception will be thrown - MessageConversionException: cannot convert String to Pojo2 for GenericMessage. The solution would be to define @kafkaListener (topics=&quot;topic2&quot;, containerFactory=&quot;factoryForTopic2&quot;)

huangapple
  • 本文由 发表于 2020年6月29日 15:17:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/62633041.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定