How to convert bytes to string while processing records from Kafka?

Question

I am using Spark with Kafka. I am running a streaming query that reads from a Kafka topic. My code is:

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.concurrent.TimeoutException;

public class Kafka {
    public static void main(String args[]){
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark-Kafka-Integration")
                .config("spark.master", "local")
                .getOrCreate();

        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "quickstart-events")
                .load();
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        StreamingQuery query = null;
        try {
            query = df.writeStream()
                    .outputMode("append")
                    .format("console")
                    .start();
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        }

    }
}

I am producing new events to the topic from the command line with:

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

However, the output on my console looks like this:

[image: console output]

I have cast the output to string, but the key and value are still not strings. How can I convert them to strings?

Answer 1

Score: 2

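You are missing an assignment. selectExpr returns a new Dataset and leaves df unchanged, so the result of this call is discarded and the query still writes the original binary columns:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

It should be:

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");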
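
To make the difference concrete, here is a small plain-Java sketch (no Spark or Kafka needed) of what the cast amounts to. It assumes the record was produced as UTF-8 text, which is what the console producer sends as far as I know. The class name BytesToStringDemo and the helpers toHex and decode are made up for this illustration; the bracketed hex rendering only approximates how a binary column tends to appear in console output.

```java
import java.nio.charset.StandardCharsets;

public class BytesToStringDemo {

    // Renders raw bytes as bracketed, space-separated hex.
    // Hypothetical helper: it mimics how a binary value looks when printed
    // without being cast to a string.
    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            // Mask to 0..255 so negative bytes still print as two hex digits.
            sb.append(String.format("%02X", bytes[i] & 0xFF));
        }
        return sb.append(']').toString();
    }

    // Decodes the bytes as UTF-8 text. Assumption: the producer wrote UTF-8.
    public static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the value of one Kafka record.
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);

        System.out.println(toHex(value));  // prints [68 65 6C 6C 6F]
        System.out.println(decode(value)); // prints hello
    }
}
```

The first line of output is roughly what you see while the query still writes the original df; the second is what you should see once the Dataset returned by selectExpr is the one you call writeStream() on.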

  • Published on June 13, 2023 at 01:08:57
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76458887.html