Create a map using the key and value of a Kafka ConsumerRecord

Question

I am using KafkaConsumer to read messages from a list of Kafka topics, with a custom Deserializer. Below is the method from the Deserializer:

public Object deserialize(String s, byte[] bytes) {
    if (!isEmpty(bytes)) {
        try {
            return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {
            });
        } catch (Exception e) {
            LOGGER.error("Error while deserializing bytes[] {} to json object ", new String(bytes), e);
        }
    }

    return null;
}

No error happens here. Below is a screenshot of the ConsumerRecords that I get:
(screenshot of the ConsumerRecords; image not available)

The value field of each ConsumerRecord holds the data as a Map.
So my requirements are as follows:

  • Use the messages from a topic to create a CSV file.
  • Because there are multiple topics, create multiple files, one from the messages of each topic.

Probable solution:

  • Separate the messages by topic. Each topic can have a list of messages.
  • So I need a Map<String, List<Map<String, Value>>>. Here the key (String) is the topic, and the value is
    the list of value fields taken from the ConsumerRecords shown in the screenshot.

Questions:

  1. How can I get the value field from a ConsumerRecord returned as a Map? So far this is the best I could do:

    List<Object> customerMigrationKafkaMessages =
        consumerRecords.stream()
            .flatMap(c -> StreamSupport.stream(c.spliterator(), false).map(ConsumerRecord::value))
            .collect(Collectors.toList());

  2. However, I need to iterate over the ConsumerRecords in a way that lets me build a Map whose key is the topic name and whose value is the List of Maps from the step above.

  3. Why do I need the value from step 1 as a HashMap? First, its key set would serve as the header of the CSV that I have to create. Second, its values would become the rows in the CSV file.
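
One way the grouping in questions 2 and 3 might look is sketched below. To keep the example runnable with only the JDK, `TopicMessage` is a hypothetical stand-in for `ConsumerRecord<String, Map<String, Object>>` that models just `topic()` and `value()`; with a real `ConsumerRecords` object you would iterate it directly and call the same two accessors. The class name `TopicCsv` and the quoting rule in `escape` are also assumptions made for illustration, not part of the original question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for ConsumerRecord<String, Map<String, Object>>:
// only the topic() and value() accessors are modelled.
final class TopicMessage {
    private final String topic;
    private final Map<String, Object> value;

    TopicMessage(String topic, Map<String, Object> value) {
        this.topic = topic;
        this.value = value;
    }

    String topic() { return topic; }
    Map<String, Object> value() { return value; }
}

final class TopicCsv {

    // Groups message values by topic name, preserving arrival order.
    static Map<String, List<Map<String, Object>>> groupByTopic(Iterable<TopicMessage> records) {
        Map<String, List<Map<String, Object>>> byTopic = new LinkedHashMap<>();
        for (TopicMessage r : records) {
            if (r.value() == null) {
                continue; // the deserializer in the question returns null on failure
            }
            byTopic.computeIfAbsent(r.topic(), t -> new ArrayList<>()).add(r.value());
        }
        return byTopic;
    }

    // Renders one topic's messages as CSV lines: the header is the union of
    // all keys, followed by one row per message (missing keys become empty cells).
    static List<String> toCsvLines(List<Map<String, Object>> messages) {
        Set<String> header = new LinkedHashSet<>();
        for (Map<String, Object> m : messages) {
            header.addAll(m.keySet());
        }
        List<String> lines = new ArrayList<>();
        lines.add(header.stream().map(TopicCsv::escape).collect(Collectors.joining(",")));
        for (Map<String, Object> m : messages) {
            lines.add(header.stream()
                    .map(k -> {
                        Object v = m.get(k);
                        return escape(v == null ? "" : v.toString());
                    })
                    .collect(Collectors.joining(",")));
        }
        return lines;
    }

    // Quotes a field if it contains a comma, a double quote, or a line break.
    static String escape(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n") || field.contains("\r")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }
}
```

Calling `TopicCsv.toCsvLines(grouped.get("TopicA"))` then gives the lines for one topic's file. Taking the header from the union of keys is a design choice for messages whose fields vary; if every message in a topic has the same keys, the key set of the first message would be enough.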

Please help me with some ideas, I am not able to code this.
Thanks!

Edit 1:
Adding my comments to the answer from @OneCricketer

  1. https://stackoverflow.com/questions/76437635/create-a-map-using-key-value-of-consumerrecord-kafka#:~:text=already%20has%20a-,JsonDeserializer,-class%20that%20returns - I have tried using this:

    kafka.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

     But I keep getting this exception: Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

  2. https://stackoverflow.com/questions/76437635/create-a-map-using-key-value-of-consumerrecord-kafka/76440122#76440122:~:text=structure%20of%20your-,JSON,-in%20order%20to : Yes, I know the structure. But, as I mentioned, there are multiple topics, so I would end up creating multiple POJOs (which I did plan earlier), only to convert those POJOs back to strings before writing them to the file. I would also have to create a Map before deserialization happens, so that I can use the right class type for each topic. Example:

    Map<String, Class<?>> myMap = new HashMap<>();
    myMap.put("TopicA", MyClassA.class);
    myMap.put("TopicB", MyClassB.class);

    public Object deserialize(String topic, String s, byte[] bytes) {
      if (!isEmpty(bytes)) {
        try {
          // Look up the target class registered for this topic.
          return objectMapper.readValue(bytes, myMap.get(topic));
        } catch (Exception e) {
          LOGGER.error("Error while deserializing bytes[] {} to json object ", new String(bytes), e);
        }
      }
      return null;
    }
  3. https://stackoverflow.com/questions/76437635/create-a-map-using-key-value-of-consumerrecord-kafka/76440122#76440122:~:text=YourCsvClass.class)-,Open,-a%20file%20handle - I might have misunderstood this. I can have a list of messages for each topic, and all of those messages should go into one file, not one file per message.
    So, if I have 4 topics processed, I need to have 4 files created.
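
A note on point 2: in Kafka's `Deserializer` interface the method is `deserialize(String topic, byte[] data)`, so the first parameter (named `s` in the original method) already carries the topic name and no extra argument is needed. The sketch below mirrors that shape using only the JDK. `TopicAwareDecoder` and its `register` method are hypothetical names, and each registered function stands in for a call such as `objectMapper.readValue(bytes, MyClassA.class)` wrapped to handle its checked exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: same (topic, bytes) shape as Kafka's
// Deserializer.deserialize, without depending on kafka-clients or Jackson.
final class TopicAwareDecoder {
    private final Map<String, Function<byte[], Object>> decodersByTopic = new HashMap<>();

    // Associates a topic name with the function that decodes its payloads.
    void register(String topic, Function<byte[], Object> decoder) {
        decodersByTopic.put(topic, decoder);
    }

    // Picks the decoder by topic; empty payloads map to null, as in the question.
    Object deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        Function<byte[], Object> decoder = decodersByTopic.get(topic);
        if (decoder == null) {
            throw new IllegalArgumentException("No decoder registered for topic " + topic);
        }
        return decoder.apply(bytes);
    }
}
```

Failing loudly on an unregistered topic is one possible choice; returning null and logging, as the original method does for parse errors, would be another.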

Answer 1

Score: 1

> I am using a custom Deserializer

Kafka already has a JsonDeserializer class that returns JsonNode, and can be iterated the same as a Map.
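
As far as I can tell, the class that returns `JsonNode` is `org.apache.kafka.connect.json.JsonDeserializer` from the `connect-json` artifact. Spring Kafka's `org.springframework.kafka.support.serializer.JsonDeserializer` is a different class that needs a target type, taken either from record headers or from the `spring.json.value.default.type` property. A minimal sketch of both configurations follows; the class name `JsonConsumerConfig`, the broker address, and the group id are placeholders, and the `java.util.Map` default type is an assumption that should yield Map values but is untested here.

```java
import java.util.Properties;

// Hypothetical helper that only assembles consumer properties; it does not
// create a consumer, so it runs without Kafka on the classpath.
final class JsonConsumerConfig {

    // Variant 1: Kafka Connect's JsonDeserializer, which produces JsonNode values.
    static Properties withConnectJson() {
        Properties p = base();
        p.put("value.deserializer", "org.apache.kafka.connect.json.JsonDeserializer");
        return p;
    }

    // Variant 2: Spring Kafka's JsonDeserializer with an explicit default type,
    // ignoring type headers so that the default type is always used.
    static Properties withSpringJson() {
        Properties p = base();
        p.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
        p.put("spring.json.value.default.type", "java.util.Map");
        p.put("spring.json.use.type.headers", "false");
        return p;
    }

    private static Properties base() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // placeholder address
        p.put("group.id", "csv-export");              // placeholder group id
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }
}
```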


> create a csv file

I'd recommend you use json lines.

But otherwise, you need to consider that multiple consumer threads or separate servers may be running, so you'd end up with many files (unless you guarantee that you have a single-partition topic).

In any case, I assume you already know the structure of your JSON in order to actually write CSV headers?

So,

  1. Create a POJO
  2. Add the Jackson CSV dataformat dependency
  3. Rewrite your Deserializer to return a POJO rather than a Map (replace TypeReference with YourCsvClass.class)
  4. Open a file handle before your consumer starts, or keep a Map<String, File> that stores one open file per key. Flush each file handle when a consumer batch is complete, and close each file handle on JVM shutdown (or after the consumer batch)
  5. Append message.value() to that file within the consumer loop. You do not need Streams here, nor does the Consumer iterator need to become a Map
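
Steps 4 and 5 could be sketched roughly as follows, using only the JDK. `TopicFileWriters` is a hypothetical helper, not part of Kafka or Jackson. It keeps a `BufferedWriter` per topic instead of a `File`, names each file `<topic>.csv`, and expects the caller to pass an already formatted line; all three are assumptions for illustration.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: one open writer per topic, flushed per batch.
final class TopicFileWriters implements AutoCloseable {
    private final Path directory;
    private final Map<String, BufferedWriter> writers = new HashMap<>();

    TopicFileWriters(Path directory) {
        this.directory = directory;
    }

    // Appends one already formatted line (CSV row or JSON line) to <topic>.csv,
    // opening the file in append mode the first time the topic is seen.
    void append(String topic, String line) throws IOException {
        BufferedWriter w = writers.get(topic);
        if (w == null) {
            w = Files.newBufferedWriter(directory.resolve(topic + ".csv"), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            writers.put(topic, w);
        }
        w.write(line);
        w.newLine();
    }

    // Call after each consumer batch so that buffered rows reach disk.
    void flushAll() throws IOException {
        for (BufferedWriter w : writers.values()) {
            w.flush();
        }
    }

    // Closes every writer; intended for shutdown or try-with-resources.
    @Override
    public void close() throws IOException {
        for (BufferedWriter w : writers.values()) {
            w.close();
        }
        writers.clear();
    }
}
```

In a consumer loop this would be used by calling `append(record.topic(), row)` for each record returned by `consumer.poll(...)`, then `flushAll()` once the batch has been processed. Writing the CSV header once per file is left out of the sketch.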

I'll note that Kafka isn't really related to your problem, other than that you shouldn't try to create one file per record. Refer to https://www.baeldung.com/java-converting-json-to-csv

It is also worth pointing out that the Kafka Connect HDFS/S3 Sink can already do this for you, and will properly handle multi-partition topics (though the filename cannot be changed to use the record key).

huangapple
  • Posted on June 9, 2023, 14:09:26
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76437635.html