Kafka消费者显示的数字以不可读的格式呈现

huangapple go评论78阅读模式
英文:

Kafka consumer showing numbers in unreadable format

问题

以下是您提供的内容的翻译:

我正在尝试使用Kafka流处理。我正在从一个主题中读取消息,然后进行groupByKey操作,然后统计分组的数量。但问题是消息计数显示为难以阅读的“方框”。

如果我运行控制台消费者,这些消息会显示为空字符串。

这是我编写的WordCount代码:

package streams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;

public class WordCount {
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo-2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("temp-in");
        KStream<String, Long> fil = input.flatMapValues(val -> Arrays.asList(val.split(" "))) // making stream of text line to stream of words
                .selectKey((k, v) -> v) // changing the key
                .groupByKey().count().toStream(); // getting count after groupBy

        fil.to("temp-out");

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

        System.out.println(streams.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

这是我在消费者中得到的输出。它在图像的右侧。

Kafka消费者显示的数字以不可读的格式呈现

我尝试过将长整型再次转换为长整型,以查看是否有效。但它不起作用。

如果有帮助,我还附上了消费者代码。

package tutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 一旦消费者开始运行,即使我们在控制台中停止,它也会继续运行。
        // 我们应该创建新的消费者以从最早开始读取,因为先前的消费者已经消费到了某个偏移量
        // 当我们在两个控制台中运行相同的消费者时,Kafka会检测到并重新平衡
        // 在这种情况下,控制台分割它们消耗的分区,形成一个消费者组
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-application-1"); // -&gt; 消费者ID
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // -&gt; 消费者获取数据的起点

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("temp-out"));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record: consumerRecords) {
                System.out.println(record.key() + " " + record.value());
                System.out.println(record.partition() + " " + record.offset());
            }
        }
    }
}

非常感谢您的帮助。提前感谢。

英文:

I am trying out the kafka streaming. I am reading messages from one topic and doing groupByKey and then doing the count of groups. But the problem is that the messages count is coming as unreadable "boxes".

If I run the console consumer these are coming as empty strings

This is the WordCount code I wrote

package streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
import java.util.Properties;
public class WordCount {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;localhost:9092&quot;);
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, &quot;streams-demo-2&quot;);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
// topology
StreamsBuilder builder = new StreamsBuilder();
KStream&lt;String, String&gt; input = builder.stream(&quot;temp-in&quot;);
KStream&lt;String, Long&gt; fil = input.flatMapValues(val -&gt; Arrays.asList(val.split(&quot; &quot;))) // making stream of text line to stream of words
.selectKey((k, v) -&gt; v) // changing the key
.groupByKey().count().toStream(); // getting count after groupBy
fil.to(&quot;temp-out&quot;);
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
System.out.println(streams.toString());
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}

This is the output I am getting in the consumer. It is there on the right side in image

Kafka消费者显示的数字以不可读的格式呈现

I had tried casting the long to long again to see if it works. But it's not working

I am attaching the consumer code too if it helps.

package tutorial;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;localhost:9092&quot;);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Once the consumer starts running it keeps running even after we stop in console
// We should create new consumer to read from earliest because the previous one had already consumed until certain offset
// when we run the same consumer in two consoles kafka detects it and re balances
// In this case the consoles split the partitions they consume forming a consumer group
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;consumer-application-1&quot;); // -&gt; consumer id
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;); // -&gt; From when consumer gets data
KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(properties);
consumer.subscribe(Collections.singleton(&quot;temp-out&quot;));
while (true) {
ConsumerRecords&lt;String, String&gt; consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord&lt;String, String&gt; record: consumerRecords) {
System.out.println(record.key() + &quot; &quot; + record.value());
System.out.println(record.partition() + &quot; &quot; + record.offset());
}
}
}
}

Any help is appreciated. Thanks in advance.

答案1

得分: 2

以下是您要翻译的内容:

Kafka Streams 中正在编写的消息值是 Long 类型,而您正在以 String 类型进行消费。

如果您对您的 Consumer 类进行以下更改,您将能够正确地在标准输出中看到计数:

        // 将此从 StringDeserializer 更改为 LongDeserializer。
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        
        ...

        // 您在此处消费的值是 Long 类型,而不是 String 类型。
        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("temp-out"));

        while (true) {
            ConsumerRecords<String, Long> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, Long> record : consumerRecords) {
                System.out.println(record.key() + " " + record.value());
                System.out.println(record.partition() + " " + record.offset());
            }
        }
英文:

The message value you're writing with Kafka Streams is a Long, and you're consuming it as a String.

If you make the following changes to your Consumer class, you'll be able to see the count printed correctly to stdout:

        // Change this from StringDeserializer to LongDeserializer.
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
...
// The value you&#39;re consuming here is a Long, not a String.
KafkaConsumer&lt;String, Long&gt; consumer = new KafkaConsumer&lt;&gt;(properties);
consumer.subscribe(Collections.singleton(&quot;temp-out&quot;));
while (true) {
ConsumerRecords&lt;String, Long&gt; consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord&lt;String, Long&gt; record : consumerRecords) {
System.out.println(record.key() + &quot; &quot; + record.value());
System.out.println(record.partition() + &quot; &quot; + record.offset());
}
}

huangapple
  • 本文由 发表于 2020年5月30日 21:52:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/62103409.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定