Spring Cloud流反序列化不同的POJOs

huangapple go评论69阅读模式
英文:

Spring Cloud stream deserialize different pojos

问题

以下是您要翻译的内容:

我正在使用 KStream 来消费 Kafka 消息并将其持久化到我的数据库。这些消息属于不同的 POJO,目前我正在使用对象映射器来创建对象,然后将它们持久化到数据库中。我已经阅读到可以使用 JsonDeserializerSerde,但我不确定如何将其用于映射到不同的 POJO。为每个 POJO 都创建一个自定义 serde 是没有意义的。请帮忙,提前感谢。

这是我的代码:

public Consumer<KStream<String, String>> process() {

    return input ->
            input.foreach((key, value) -> {
                ObjectMapper mapper = new ObjectMapper();
                try {

                    if(value.contains("Teacher"))
                    {
                        Teacher teacher= mapper.readValue(value,Teacher.class);
                        teacherRepository.save(teacher);
                    }
                    else if(value.contains("Student"))
                    {
                        Student student= mapper.readValue(value,Student.class);
                        studentRepository.save(student)
                    }
                    else  if(value.contains("Principal"))
                    {
                        Principal principal= mapper.readValue(value,Principal.class);
                        principalRepository.save(Principal);
                    }
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }

            });    }

}
英文:

I'm using Kstream to consume kafka message and persist it to my db. The messages belong to different pojo's and currently I'm using object mapper to create the object and then persist them in the DB. I've read that Jsondeserialzerserde can be used but I'm not sure how can I use it to map to different pojo's. Having a custom serde for each pojo doesn't make sense. Please help . Thanks in advance.

here is my code:

    public Consumer&lt;KStream&lt;String, String&gt;&gt; process() {

        return input -&gt;
                inpu.foreach((key, value) -&gt; {
                    ObjectMapper mapper = new ObjectMapper();
                    try {

                        if(value.contains(&quot;Teacher&quot;))
                        {
                            Teacher teacher= mapper.readValue(value,Teacher.class);
                            teacherRepository.save(teacher);
                        }
                        else if(value.contains(&quot;Student&quot;))
                        {
                            Student student= mapper.readValue(value,Student.class);
                            studentRepository.save(student)
						}
                        else  if(value.contains(&quot;Principal&quot;))
                        {
                            Principal principal= mapper.readValue(value,Principal.class);
                            principalRepository.save(Principal);
                        }
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }

                });    }

}

答案1

得分: 0

Kafka Streams在Spring Cloud Stream中的绑定器并没有直接提供执行您所要求的操作的机制。您的消费者类型标识表明您将其以String形式进行消费,因此绑定器可以在您不显式提供任何Serde的情况下推断出该信息。然而,如果您想要进一步使用jackson将String转换为其他类型,您需要在您的业务逻辑中自行处理,就像您已经在其中处理的那样。如果您只有少量有限的类型,我认为这样做没有问题。

英文:

Kafka Streams binder in Spring Cloud Stream does not directly provide any mechanism for doing what you are asking. Your consumers' type signature says that you are consuming it as a String, so the binder can infer that information without you explicitly provide any Serde. However, if you want to further transform the String into other types using jackson, you need to do it on your own in your business logic as you have it there already. If you only have a few finite types, I don't see a problem for doing so.

huangapple
  • 本文由 发表于 2020年9月21日 16:05:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/63988344.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定