How to transform DataFrames to RDDs in Structured Streaming?


Question

I get data from Kafka using PySpark Structured Streaming, and the result is a DataFrame. When I try to convert the DataFrame to an RDD, it fails:

```
Traceback (most recent call last):
  File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
    df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
```

Here is the working version of the code:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()
```

And here is the failing version:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()  # raises AnalysisException on a streaming DataFrame

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()
```

Why can't the DataFrame be converted to an RDD? And what should I do when I want to transform a DataFrame to an RDD in PySpark Structured Streaming?


Answer 1

Score: 3

If your Spark version is 2.4.0 or above, you can use the following alternative to work on each row of your DataFrame rather than on an RDD (`process_row` below is an illustrative placeholder for the customized per-row method the answer refers to):

```python
def process_row(row):
    # Illustrative placeholder: custom logic applied to each row
    # of the DataFrame instead of an RDD map.
    print(row.value.split(" "))

query = df.writeStream.foreach(process_row).outputMode("update").start()
query.awaitTermination()
```
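
If you specifically need RDD operations, `foreachBatch` (also added in Spark 2.4) is worth knowing about: it hands your callback a regular, non-streaming DataFrame for each micro-batch, and `.rdd` works on that. A minimal sketch under that assumption (`process_batch` is an illustrative name, reusing the `df` from the question):

```python
# Sketch: inside foreachBatch the first argument is a plain batch
# DataFrame, so converting it to an RDD is allowed.
def process_batch(batch_df, batch_id):
    rdd = batch_df.rdd.map(lambda row: row.value.split(" "))
    print(batch_id, rdd.take(5))

query = df.writeStream.foreachBatch(process_batch).start()
query.awaitTermination()
```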



Answer 2

Score: 2

This RDD aspect is simply not supported. RDDs are legacy, and Spark Structured Streaming is DataFrame/Dataset based: the same abstraction is used whether you are streaming or running a batch job.
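
In other words, logic that would have been an RDD `map` is expressed with DataFrame operations instead. As a sketch, the word count the question is building toward can stay entirely in the DataFrame API (reusing the streaming `df` from the question):

```python
from pyspark.sql import functions as F

# Split each Kafka message into words and count occurrences per word;
# "complete" output mode prints the full updated counts each trigger.
words = df.select(F.explode(F.split(df["value"], " ")).alias("word"))
counts = words.groupBy("word").count()

query = counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
```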


Answer 3

Score: 1

To perform specific operations on your DataFrame fields, you can use UDFs, or even create your own custom Spark Transformers. But some DataFrame operations, such as converting to an RDD, are not supported.
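
As a sketch of the UDF route (the `split_words` name and the explicit return type are illustrative, not from the answer):

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# A UDF doing the per-value work the RDD map was doing;
# the UDF's return type must be declared explicitly.
split_words = F.udf(lambda v: v.split(" "), ArrayType(StringType()))

df_words = df.withColumn("words", split_words(df["value"]))
```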


Answer 4

Score: 0

Structured Streaming runs on the Spark SQL engine; converting a DataFrame or Dataset to an RDD is not supported.


huangapple — posted 2020-01-06 15:19:11
Original link: https://go.coder-hub.com/59608106.html