How to convert bytes to string while processing records from Kafka?
Question
I am using Spark with Kafka. I am running a streaming query that reads from a Kafka topic. My code is:

    package org.example;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import java.util.concurrent.TimeoutException;

    public class Kafka {
        public static void main(String args[]){
            SparkSession spark = SparkSession
                    .builder()
                    .appName("Spark-Kafka-Integration")
                    .config("spark.master", "local")
                    .getOrCreate();

            Dataset<Row> df = spark
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "quickstart-events")
                    .load();

            df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

            StreamingQuery query = null;
            try {
                query = df.writeStream()
                        .outputMode("append")
                        .format("console")
                        .start();
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }

            try {
                query.awaitTermination();
            } catch (StreamingQueryException e) {
                throw new RuntimeException(e);
            }
        }
    }

I am entering new events into the topic from the command line with this command:

    bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

However, in the console output the key and value are still not shown as strings, even though I cast them to string. How can I convert them to strings?
Answer 1
Score: 2
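You are missing an assignment. selectExpr does not modify df; it returns a new Dataset, and this statement discards it:

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

As a result, the query is still written from the original df, whose key and value columns are binary. It should be:

    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");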
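
The same pitfall can be reproduced without Spark. The sketch below is plain Java and only an analogy: it assumes an immutable String stands in for the Dataset, and the class and method names (BytesToStringSketch, decodeUtf8, withoutAssignment, withAssignment) are hypothetical, not part of Spark or Kafka. It also shows that decoding a byte array as UTF-8 is roughly what the CAST to STRING does for each binary key or value.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java analogy only; nothing here uses the Spark or Kafka APIs.
class BytesToStringSketch {

    // Decodes a raw payload as UTF-8 text, roughly what
    // CAST(value AS STRING) does for each binary value.
    static String decodeUtf8(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Mirrors the bug: the transformed value is computed and thrown away.
    static String withoutAssignment(String s) {
        s.toUpperCase(); // result discarded, s is unchanged
        return s;
    }

    // Mirrors the fix: keep the value returned by the transformation.
    static String withAssignment(String s) {
        s = s.toUpperCase();
        return s;
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(raw));          // prints "hello"
        System.out.println(withoutAssignment("key")); // prints "key"
        System.out.println(withAssignment("key"));    // prints "KEY"
    }
}
```

In the Spark program the equivalent of withAssignment is to reassign df, or to chain writeStream() directly onto the result of selectExpr so that the cast columns are the ones sent to the console sink.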