将Dataset<Row>转换为要添加到Kafka的键和值。

huangapple go评论77阅读模式
英文:

Spark: Convert a Dataset<Row> to key and value to be added in Kafka

问题

我正在尝试将Spark连接到具有位置数据的Kafka主题。
我想将df(数据框结果)转换为键值对,以便我可以将其输出到另一个Kafka主题。

Dataset<Row> df = spark.readStream().format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "cab-location")
    .option("startingOffsets", "earliest").load();
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .map(new MapFunction<Row, Tuple2<String,String>>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, String> call(Row value) throws Exception {
            Gson g = new Gson();  
            CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
            return new Tuple2<String, String>(value.getString(0), cabLocationData.getCabName());
        }
    }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
    .map(new MapFunction<Tuple2<String,String>, Row>(){

        private static final long serialVersionUID = 1L;

        @Override
        public Row call(Tuple2<String, String> value) throws Exception {
            return RowFactory.create(value._1.toString(), value._2);
        }
    }, Encoders.javaSerialization(Row.class));

当我检查df.columns()时,它只显示一个名为value的列。

请问您需要帮助将Dataset<Row> 转换为具有两列(一列用于键,一列用于值)的数据集,以便我可以将其推送为键值对到另一个Kafka主题吗?

英文:

I am trying Spark connected to a Kafka topic which has Location Data.
I want to convert the df (the dataframe result) to key value pairs so that i can output it to another Kafka topic

Dataset&lt;Row&gt; df = spark.readStream().format(&quot;kafka&quot;)
			.option(&quot;kafka.bootstrap.servers&quot;, &quot;kafka:9092&quot;)
			.option(&quot;subscribe&quot;, &quot;cab-location&quot;)
			.option(&quot;startingOffsets&quot;, &quot;earliest&quot;).load();
	df = df.selectExpr(&quot;CAST(key AS STRING)&quot;, &quot;CAST(value AS STRING)&quot;)
	.map(new MapFunction&lt;Row, Tuple2&lt;String,String&gt;&gt;() {

		private static final long serialVersionUID = 1L;

		@Override
		public Tuple2&lt;String, String&gt; call(Row value) throws Exception {
			Gson g = new Gson();  
			CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
			return new Tuple2&lt;String, String&gt;(value.getString(0), cabLocationData.getCabName());
		}
	}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
	.map(new MapFunction&lt;Tuple2&lt;String,String&gt;, Row&gt;(){

		private static final long serialVersionUID = 1L;

		@Override
		public Row call(Tuple2&lt;String, String&gt; value) throws Exception {
			return RowFactory.create(value._1.toString(), value._2);
		}
	}, Encoders.javaSerialization(Row.class));

When I check df.columns() it shows only 1 column which is value.

Could you help to convert the Dataset<Row> to have 2 columns 1 for key and 1 for value so I can push it as key value pair to another Kafka topic

答案1

得分: 0

以下是代码的翻译部分:

感谢 @OneCricketeer 的建议。
以下是有效的代码。

如讨论所述,添加了 .withColumn 来从元组列 _1 和 _2 添加列键和值。

Dataset<Row> df = spark.readStream().format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "cab-location")
    .option("startingOffsets", "earliest").load();

Dataset<Tuple2<String, String>> df2 = df
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .map(new MapFunction<Row, Tuple2<String, String>>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, String> call(Row value) throws Exception {
            Gson g = new Gson();  
            CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
            return new Tuple2<String, String>(value.getString(0), cabLocationData.getCabName());
        }
    }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

df2
    .withColumn("key", df2.col("_1"))
    .withColumn("value", df2.col("_2"))
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("checkpointLocation", "C:\\Workspace\\Java\\app\\ch")
    .option("topic", "location-output")
    .outputMode("append").start().awaitTermination();
英文:

Thank you @OneCricketeer with the suggestion.
Below is the code which worked

As discussed added the .withColumn to add column key and value from tuple columns _1 and _2

  Dataset&lt;Row&gt; df = spark.readStream().format(&quot;kafka&quot;)
.option(&quot;kafka.bootstrap.servers&quot;, &quot;kafka:9092&quot;)
.option(&quot;subscribe&quot;, &quot;cab-location&quot;)
.option(&quot;startingOffsets&quot;, &quot;earliest&quot;).load();
Dataset&lt;Tuple2&lt;String, String&gt;&gt; df2 = df
.selectExpr(&quot;CAST(key AS STRING)&quot;, &quot;CAST(value AS STRING)&quot;)
.map(new MapFunction&lt;Row, Tuple2&lt;String,String&gt;&gt;() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2&lt;String, String&gt; call(Row value) throws Exception {
Gson g = new Gson();  
CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
return new Tuple2&lt;String, String&gt;(value.getString(0), cabLocationData.getCabName());
}
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
df2
.withColumn(&quot;key&quot;, df2.col(&quot;_1&quot;))
.withColumn(&quot;value&quot;, df2.col(&quot;_2&quot;))
.writeStream()
.format(&quot;kafka&quot;)
.option(&quot;kafka.bootstrap.servers&quot;, &quot;kafka:9092&quot;)
.option(&quot;checkpointLocation&quot;, &quot;C:\\Workspace\\Java\\app\\ch&quot;)
.option(&quot;topic&quot;, &quot;location-output&quot;)
.outputMode(&quot;append&quot;).start().awaitTermination();

huangapple
  • 本文由 发表于 2023年5月26日 01:42:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/76334970.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定