Kafka consumer showing numbers in unreadable format

Question

I am trying out Kafka Streams. I read messages from one topic, apply groupByKey, and then count the records in each group. The problem is that the counts show up as unreadable "boxes".

If I run the console consumer, the values show up as empty strings.

This is the WordCount code I wrote:

    package streams;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    import java.util.Arrays;
    import java.util.Properties;

    public class WordCount {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo-2");
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
            properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

            // topology
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> input = builder.stream("temp-in");
            KStream<String, Long> fil = input
                    .flatMapValues(val -> Arrays.asList(val.split(" "))) // making stream of text line to stream of words
                    .selectKey((k, v) -> v) // changing the key
                    .groupByKey().count().toStream(); // getting count after groupBy
            fil.to("temp-out");

            KafkaStreams streams = new KafkaStreams(builder.build(), properties);
            streams.start();
            System.out.println(streams.toString());
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

This is the output I am getting in the consumer; it is on the right side of the image.

[Image: Kafka consumer showing numbers in unreadable format]

I tried casting the long to a long again to see if that would help, but it did not.

I am attaching the consumer code too, in case it helps.

    package tutorial;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class Consumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // Once the consumer starts running it keeps running even after we stop in console
            // We should create new consumer to read from earliest because the previous one had already consumed until certain offset
            // When we run the same consumer in two consoles Kafka detects it and rebalances
            // In this case the consoles split the partitions they consume, forming a consumer group
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-application-1"); // -> consumer id
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // -> From when consumer gets data

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singleton("temp-out"));

            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.println(record.key() + " " + record.value());
                    System.out.println(record.partition() + " " + record.offset());
                }
            }
        }
    }

Any help is appreciated. Thanks in advance.

Answer 1

Score: 2

The message value you're writing with Kafka Streams is a Long, and you're consuming it as a String.

If you make the following changes to your Consumer class, you'll be able to see the count printed correctly to stdout:

    // LongDeserializer needs its own import.
    import org.apache.kafka.common.serialization.LongDeserializer;
    ...
    // Change this from StringDeserializer to LongDeserializer.
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    ...
    // The value you're consuming here is a Long, not a String.
    KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singleton("temp-out"));

    while (true) {
        ConsumerRecords<String, Long> consumerRecords = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, Long> record : consumerRecords) {
            System.out.println(record.key() + " " + record.value());
            System.out.println(record.partition() + " " + record.offset());
        }
    }
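
To see why the counts look like "boxes", it may help to reproduce the mismatch without Kafka at all. The sketch below is only an illustration in plain Java: the class name `LongBytesDemo` and its helper methods are hypothetical, and it assumes the count is written as an 8-byte big-endian long (the layout Kafka's `LongSerializer` uses) and then read back either as UTF-8 text or as a long.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongBytesDemo {

    // Encode a long as 8 big-endian bytes (assumed to match what the Streams app writes).
    public static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Roughly what a String deserializer does: interpret the raw bytes as UTF-8 text.
    public static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Roughly what a Long deserializer does: read the 8 bytes back as a long.
    public static long decodeAsLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] payload = serializeLong(3L);

        // Decoded as text, the payload is a run of non-printable control characters.
        String asText = decodeAsString(payload);
        System.out.println("characters when read as text: " + asText.length());

        // Decoded as a long, the original count comes back.
        System.out.println("value when read as long: " + decodeAsLong(payload));
    }
}
```

For a count of 3, the text version is eight control characters (seven NUL characters followed by U+0003), which a terminal typically renders as boxes or as nothing at all. Reading the same bytes as a long returns 3, which is what switching the consumer to a Long deserializer achieves.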

huangapple
  • Published on May 30, 2020, 21:52:31
  • When reposting, please keep the link to this article: https://go.coder-hub.com/62103409.html