PySpark compute mean of an RDD in a column of a dataframe
Question
<details>
<summary>English:</summary>

I have a DataFrame in which one column holds a list of items per row (an array of structs). This column, "sorted_zipped", was computed with PySpark's "arrays_zip" function from two other columns that I have since dropped. I want to compute the mean of the items, grouped by the first value of each item and averaged over the second value. I am moving from regular Python pandas to PySpark and things are very different. I am learning as fast as I can.

| WeekOfYear | sorted_zipped |
|---|---|
| 13-2023 | `[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}]` |
| 14-2023 | `[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]` |
| 15-2023 | `[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]` |

I want another column in this DataFrame that holds the mean for each key in the list. For example, for the third row of this DataFrame:

```text
[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]
```

The third column, holding the means, should look like this (sorted in descending order of the mean values, rounded to four decimals):

```text
[{chevy, 0.9795}, {bmw, 0.9785}, {buick, 0.978}, {nissan, 0.9647}, {lexus, 0.96}, {vw, 0.956}]
```

To begin with, I learnt that the PySpark equivalent of a dictionary is a Map. I thought I could create a map out of each row of "sorted_zipped", compute the mean for each key, and use that as a UDF. I am not sure whether I am heading in the right direction or just plodding around. Any help is appreciated.

```python
def get_avg_1(x):
    rdd = parallelize(x)
    rdd2 = rdd.flatMap(lambda x: [(k, v) for (k, v) in x.items()]).collect()
    grouped_k = rdd2.groupByKey()
    #print [(k, list(v)) for (k, v) in grouped_k.take(1)]
    # compute avg of the values
    avg_map = grouped_k.mapValues(lambda x: sum(x[1])/len(x[1])).collect()
    return avg_map
```

When I tried to use the function above as a UDF, I hit other problems on Databricks. Because Databricks creates the SparkContext itself, I cannot pass a separate context to the worker nodes. There seems to be some restriction on using sc on worker nodes.

Update: I tried this:

```python
import numpy as np
import json

schema = ArrayType(StructType([
    StructField("GroupedBrands", StringType(), True),
    StructField("GroupedWeights", FloatType(), True)
]))
array_mean = F.udf(lambda x: (x[0], np.mean(x[1]), schema))
mean_df = sdf.withColumn("mean_value", array_mean("sorted_zipped"))
mean_df.show()
```

I get the exception below, which tells me that each row of "sorted_zipped" is of type List:

```text
PythonException: An exception was thrown from a UDF: 'TypeError: cannot perform reduce with flexible type'
```

</details>
# Answer 1
**Score**: 1

You can define your UDF in plain Python. Here is one solution.

Sample input:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType, MapType

spark = (
    SparkSession.builder.master("local[1]")
    .appName("TestApp")
    .getOrCreate()
)

data = [
    ("13-2023", [("bmw", 0.99), ("vw", 0.98), ("chevy", 0.97), ("buick", 0.96)]),
    ("14-2023", [("chevy", 0.98), ("bmw", 0.98), ("bmw", 0.978), ("bmw", 0.976), ("vw", 0.975), ("bmw", 0.975), ("bmw", 0.97), ("buick", 0.967), ("vw", 0.964), ("vw", 0.96), ("nissan", 0.96), ("chevy", 0.952), ("nissan", 0.95), ("nissan", 0.95), ("lexus", 0.95), ("lexus", 0.94), ("lexus", 0.94), ("nissan", 0.935), ("buick", 0.93), ("chevy", 0.928)]),
    ("15-2023", [("chevy", 0.992), ("bmw", 0.987), ("nissan", 0.982), ("bmw", 0.982), ("buick", 0.978), ("lexus", 0.976), ("bmw", 0.975), ("bmw", 0.97), ("chevy", 0.967), ("vw", 0.964), ("lexus", 0.961), ("nissan", 0.96), ("vw", 0.952), ("nissan", 0.952), ("vw", 0.952), ("lexus", 0.943)]),
]

# Each row holds a week label and an array of (Brand, Weight) structs.
schema = StructType([
    StructField("WeekOfYear", StringType(), True),
    StructField("sorted_zipped", ArrayType(
        StructType([
            StructField("Brand", StringType(), True),
            StructField("Weight", FloatType(), True),
        ])
    ), True),
])

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)
```

Output:

| WeekOfYear | sorted_zipped |
|---|---|
| 13-2023 | `[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}]` |
| 14-2023 | `[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]` |
| 15-2023 | `[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]` |

Define your UDF:

```python
def mean(items):
    # Collect the weights of each brand, then average them per brand.
    weights_by_brand = {}
    for item in items:
        weights_by_brand.setdefault(item.Brand, []).append(item.Weight)
    return {brand: sum(ws) / len(ws) for brand, ws in weights_by_brand.items()}

# The UDF returns a map of brand -> mean weight.
mean_udf = udf(mean, MapType(StringType(), FloatType()))
```

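The question asks for a list sorted by mean in descending order, whereas a map carries no ordering. Below is a minimal sketch of a variant helper in plain Python, assuming each item unpacks into a (brand, weight) pair, as Spark `Row` objects do because they are tuples. The name `mean_by_brand_sorted` is hypothetical, and the sample data is the 15-2023 row from the question.

```python
from collections import defaultdict

def mean_by_brand_sorted(items):
    """Return [(brand, mean_weight), ...] ordered by mean, highest first."""
    groups = defaultdict(list)
    for brand, weight in items:  # works for plain tuples and for Spark Rows
        groups[brand].append(weight)
    means = [(brand, sum(ws) / len(ws)) for brand, ws in groups.items()]
    return sorted(means, key=lambda pair: pair[1], reverse=True)

# Row 15-2023 from the question, as plain tuples.
week_15 = [
    ("chevy", 0.992), ("bmw", 0.987), ("nissan", 0.982), ("bmw", 0.982),
    ("buick", 0.978), ("lexus", 0.976), ("bmw", 0.975), ("bmw", 0.97),
    ("chevy", 0.967), ("vw", 0.964), ("lexus", 0.961), ("nissan", 0.96),
    ("vw", 0.952), ("nissan", 0.952), ("vw", 0.952), ("lexus", 0.943),
]

result = mean_by_brand_sorted(week_15)
for brand, avg in result:
    print(brand, round(avg, 4))
```

To use such a helper as a UDF, the return type would presumably need to be an `ArrayType` of a two-field `StructType` (for example a string field and a `DoubleType` field) rather than a `MapType`; the field names are up to you.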
Apply the UDF:

```python
df.withColumn("mean_value", mean_udf(df.sorted_zipped)).drop(df.sorted_zipped).show(10, False)
```

Output:

| WeekOfYear | mean_value |
|---|---|
| 13-2023 | `{chevy -> 0.97, vw -> 0.98, buick -> 0.96, bmw -> 0.99}` |
| 14-2023 | `{chevy -> 0.9533333, vw -> 0.9663333, buick -> 0.94850004, nissan -> 0.94875, lexus -> 0.9433333, bmw -> 0.97580004}` |
| 15-2023 | `{chevy -> 0.9795, vw -> 0.95600003, buick -> 0.978, nissan -> 0.96466666, lexus -> 0.96000004, bmw -> 0.9785}` |

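Values such as `0.95600003` most likely come from `FloatType` being a 32-bit float: the weights are stored in single precision, and the mean is rounded to single precision again when the UDF returns. The sketch below emulates that round trip with the standard `struct` module for the three vw weights of week 15-2023. The helper name `to_float32` is hypothetical, and this is an illustration of the rounding, not of Spark's internals.

```python
import struct

def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float value."""
    return struct.unpack("f", struct.pack("f", x))[0]

weights = [0.964, 0.952, 0.952]             # vw entries of week 15-2023

# Emulate FloatType storage: inputs and the result are both single precision.
stored = [to_float32(w) for w in weights]
mean32 = to_float32(sum(stored) / len(stored))

# The same mean computed in double precision throughout.
mean64 = sum(weights) / len(weights)

print(round(mean32, 8))   # 0.95600003
print(round(mean64, 8))   # 0.956
```

If the extra digits are unwanted, declaring the fields and the UDF return type as `DoubleType` instead of `FloatType` should avoid most of this rounding.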