从Kafka主题消费XML。

huangapple go评论58阅读模式
英文:

Consume XML from Kafka topic

问题

我需要从Kafka主题中消费XML类型。在阅读了一些资源后,我得出了结论,我可能需要编写自己的"序列化器"。但是我完全不知道

  • 首先,如何编写这样的序列化器(用于值)?以及
  • 如何将此序列化器注入Kafka配置(不仅仅是在spring.deserializer.value.delegate.class中分配类名)?
英文:

I need to consume XML type from Kafka topic. After reading through few resources I came to conclusion I might have to write my own "serializer". However I am totally lost on

  • How to write such serializer (for value) in first place? and
  • How to inject this serializer in Kafka configuration (not just assign class name in spring.deserializer.value.deegate.class)

答案1

得分: 0

With raw Kafka client (org.apache.kafka.kafka-clients) it's pretty simple:

  1. 实现一个 org.apache.kafka.common.serialization.Deserializer 接口。这里 有一个教程。简而言之,您需要实现一个将 byte[] 转换为您喜欢的任何类的对象的方法。在这里进行所有的 XML 处理并返回解析后的 XML。假设实现名称为 XmlDeserializer
  2. KafkaConsumer 构造函数中使用一个3参数签名,并将一个反序列化器传递给它:new KafkaConsumer(props, keyDeserializer, new XmlDeserializer())。使用您需要的 keyDeserializer,有很多预定义的选项,例如 new StringDeserializer()

这就是全部。您将获得一个带有表示解析后的键和值的类的 KafkaConsumer。每次调用 poll() 都会返回这些类的对象。您的 XmlDeserializer 会在幕后自动调用。

如果使用 Spring Kafka,您可能会创建一个 DefaultKafkaConsumerFactory 作为一个 bean。在步骤 2) 中,将 new XmlDeserializer() 作为构造函数参数传递。

此外,您还可以将反序列化器的类名放入 props 中,而不是将 new XmlDeserializer() 传递给构造函数(无论是 KafkaConsumer 还是 DefaultKafkaConsumerFactory):props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, XmlDeserializer.class)

英文:

With raw Kafka client (org.apache.kafka.kafka-clients) it's pretty simple:

  1. Implement an org.apache.kafka.common.serialization.Deserializer interface. Here is a tutorial. In a nutshell, you have to implement single method that turns byte[] into object of any class you like. Do all xml stuff here and return a parsed xml. Lets say the implementation is named XmlDeserializer.
  2. Use KafkaConsumer constructor with a 3-arg signature and pass a deserializer into it: new KafkaConsumer(props, keyDeserializer, new XmlDeserializer()). Use keyDeserializer you need, there are plenty predifined of them, for example, new StringDeserializer()

That's all. You will get a KafkaConsumer typed with classes representing parsed key and value. Every poll() would return objects of that classes. Your XmlDeserializer would be called automatically under the hood.

If using Spring Kafka, you probably create DefaultKafkaConsumerFactory as a bean. In step 2) pass new XmlDeserializer() as a constructor parameter.

Also, instead of passing new XmlDeserializer() to a constructor (either KafkaConsumer or DefaultKafkaConsumerFactory) you could put deserializer's class name into props: props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, XmlDeserializer.class) .

答案2

得分: 0

只需使用它,并使用ByteArrayDeserializerStringDeserializer配置您的消费者。

自定义类唯一要做的事情就是包装您在其他两者中使用的相同函数。

英文:

How do you currently read XML data that isn't a file?

Just use that, and configure your consumer with ByteArrayDeserializer or StringDeserializer.

The only thing a custom class would do is wrap the same functions you'd use with the other two.

huangapple
  • 本文由 发表于 2023年6月13日 06:28:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/76460686.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定