Spark: Convert a Dataset<Row> to key and value to be added in Kafka
Question
I have Spark connected to a Kafka topic that carries location data.
I want to convert df (the resulting dataframe) into key-value pairs so that I can write it out to another Kafka topic.
Dataset<Row> df = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "cab-location")
        .option("startingOffsets", "earliest").load();

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .map(new MapFunction<Row, Tuple2<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(Row value) throws Exception {
                Gson g = new Gson();
                CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
                return new Tuple2<String, String>(value.getString(0), cabLocationData.getCabName());
            }
        }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
        .map(new MapFunction<Tuple2<String, String>, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, String> value) throws Exception {
                return RowFactory.create(value._1.toString(), value._2);
            }
        }, Encoders.javaSerialization(Row.class));
When I check df.columns(), it shows only one column, value.
Could you help me convert the Dataset<Row> so it has two columns, one for the key and one for the value, so that I can push key-value pairs to another Kafka topic?
Answer 1
Score: 0
Thank you @OneCricketeer for the suggestion.
Below is the code that worked.
As discussed, I added the .withColumn calls to create the key and value columns from the tuple columns _1 and _2.
import com.google.gson.Gson;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Read the incoming location stream from Kafka.
Dataset<Row> df = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "cab-location")
        .option("startingOffsets", "earliest").load();

// Map each record to a (key, cabName) tuple; the tuple encoder
// produces a two-column Dataset with columns named _1 and _2.
Dataset<Tuple2<String, String>> df2 = df
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .map(new MapFunction<Row, Tuple2<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(Row value) throws Exception {
                Gson g = new Gson();
                CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
                return new Tuple2<String, String>(value.getString(0), cabLocationData.getCabName());
            }
        }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

// Expose the tuple columns under the key/value names the Kafka sink reads.
df2
        .withColumn("key", df2.col("_1"))
        .withColumn("value", df2.col("_2"))
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("checkpointLocation", "C:\\Workspace\\Java\\app\\ch")
        .option("topic", "location-output")
        .outputMode("append").start().awaitTermination();