Kafka consumer showing numbers in unreadable format
Question
I am trying out Kafka Streams. I read messages from one topic, call groupByKey, and then count the records in each group. The problem is that the counts show up as unreadable "boxes".
If I run the console consumer, the counts show up as empty strings.
This is the WordCount code I wrote:
package streams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;

public class WordCount {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo-2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("temp-in");
        KStream<String, Long> fil = input
                .flatMapValues(val -> Arrays.asList(val.split(" "))) // making stream of text line to stream of words
                .selectKey((k, v) -> v) // changing the key
                .groupByKey().count().toStream(); // getting count after groupBy
        fil.to("temp-out");

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        System.out.println(streams.toString());
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
This is the output I am getting in the consumer (on the right side of the image).
I tried casting the long value to long again to see whether that would help, but it did not.
I am attaching the consumer code too, in case it helps:
package tutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Once the consumer starts running, it keeps running even after we stop it in the console.
        // We should create a new consumer to read from the earliest offset, because the previous one
        // has already consumed up to a certain offset.
        // When we run the same consumer in two consoles, Kafka detects it and rebalances.
        // In that case the consoles split the partitions they consume, forming a consumer group.
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-application-1"); // -> consumer group id
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // -> where the consumer starts reading

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("temp-out"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.key() + " " + record.value());
                System.out.println(record.partition() + " " + record.offset());
            }
        }
    }
}
Any help is appreciated. Thanks in advance.
Answer 1
Score: 2
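The message value you're writing with Kafka Streams is a Long, and you're consuming it as a String. A Long value is serialized as 8 big-endian bytes, and for small counts those bytes decode to control characters when read as text, which is why you see boxes or empty strings.
If you make the following changes to your Consumer class, you'll be able to see the count printed correctly to stdout: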
// Add this import next to the existing StringDeserializer import (still needed for the key).
import org.apache.kafka.common.serialization.LongDeserializer;
...
// Change this from StringDeserializer to LongDeserializer.
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
...
// The value you're consuming here is a Long, not a String.
KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("temp-out"));
while (true) {
    ConsumerRecords<String, Long> consumerRecords = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, Long> record : consumerRecords) {
        System.out.println(record.key() + " " + record.value());
        System.out.println(record.partition() + " " + record.offset());
    }
}
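To see why the mismatch produces unreadable output, here is a minimal sketch that uses only the JDK, not the Kafka client. It assumes the value on the wire is the count written as 8 big-endian bytes, which is how Kafka's LongSerializer encodes a Long. The class name LongAsStringDemo and its helper methods are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongAsStringDemo {
    // Encodes a long as 8 big-endian bytes (assumed to match what LongSerializer writes).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Interprets the bytes as UTF-8 text, which is what a String deserializer does.
    static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Interprets the same bytes as a big-endian long, which is what a Long deserializer does.
    static long decodeAsLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] payload = encodeLong(3L);

        // Print the character codes instead of the raw text, because the text
        // consists of control characters that a terminal cannot display.
        String asText = decodeAsString(payload);
        StringBuilder codes = new StringBuilder();
        for (char c : asText.toCharArray()) {
            codes.append((int) c).append(' ');
        }
        System.out.println("as String (char codes): " + codes.toString().trim());

        // Decoding with the matching type recovers the count.
        System.out.println("as long: " + decodeAsLong(payload));
    }
}
```

For a count of 3, the text interpretation is seven NUL characters followed by the control character with code 3, none of which are printable. Reading the same bytes as a long returns 3.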