PySpark compute mean of an RDD in a column of a dataframe
# Question
I have a dataframe where one of the columns has a list of items (rdd). Please note that this column "sorted_zipped" was computed using the "arrays_zip" function in PySpark (on two other columns that I have since dropped). I want to compute the mean of the items based on the second value of each item. I am just moving over from regular Python Pandas to PySpark and things are very different. I am learning as fast as I can.
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|WeekOfYear|sorted_zipped |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|13-2023 |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}] |
|14-2023 |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
|15-2023 |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}] |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
I want another column in this dataframe that holds, for each brand, the mean of its values in the list. For example, for the third row of this dataframe:
[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]
The third, mean column should look like this (sorted in descending order of the mean values):
[{chevy, 0.9795}, {lexus, 0.96}, {vw, 0.956}, {bmw, 0.9784}, {buick, 0.978}, {nissan, 0.967}]
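For example, the chevy entry in that expected output is just the average of the two chevy weights in the third row; a quick plain-Python check:

```python
# chevy appears twice in the third row, with weights 0.992 and 0.967
chevy_weights = [0.992, 0.967]
print(sum(chevy_weights) / len(chevy_weights))  # 0.9795 (up to float rounding)
```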
To begin with, I learnt that an equivalent of dictionaries is a Map in PySpark. I thought that I could create a map out of each row of "sorted_zipped", compute the mean for each key, etc., and use that as a UDF. Not sure if I am heading in the right direction or just plodding around. Any help is appreciated.
```python
def get_avg_1(x):
    # x is assumed to be one row's mapping of brand -> weight
    rdd = sc.parallelize(x)  # needs the driver's SparkContext
    rdd2 = rdd.flatMap(lambda x: [(k, v) for (k, v) in x.items()])
    grouped_k = rdd2.groupByKey()
    # print([(k, list(v)) for (k, v) in grouped_k.take(1)])
    # compute avg of the values
    avg_map = grouped_k.mapValues(lambda v: sum(v) / len(v)).collect()
    return avg_map
```
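For reference, the per-key averaging this function attempts can be written in plain Python with no RDD calls at all; a minimal sketch, assuming the input is a list of (brand, weight) pairs (the helper name here is illustrative, not from the original post):

```python
from collections import defaultdict

def get_avg_plain(pairs):
    # group the weights by brand, then average each group
    grouped = defaultdict(list)
    for brand, weight in pairs:
        grouped[brand].append(weight)
    return {brand: sum(ws) / len(ws) for brand, ws in grouped.items()}

# get_avg_plain([("chevy", 0.992), ("chevy", 0.967)])  # {'chevy': 0.9795}
```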
As I was trying to use the above UDF, I hit other problems on Databricks. As Databricks creates a SparkContext by itself, I cannot pass a separate context into the worker nodes. There seems to be some sort of restriction on the usage of `sc` in worker nodes.
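That restriction is not specific to Databricks: a SparkContext lives only on the driver, while UDFs run on the executors, so RDD operations cannot be launched from inside a UDF. A hypothetical minimal illustration of the failing pattern (assuming the usual `spark` session is in scope; this is not the original code):

```python
from pyspark.sql import functions as F

# Hypothetical illustration only: a UDF that tries to use the driver-side
# SparkContext. Spark rejects this when the function is shipped to the
# executors, which is the kind of restriction described above.
bad_udf = F.udf(lambda xs: spark.sparkContext.parallelize(xs).count())
# df.withColumn("n", bad_udf("sorted_zipped"))  # fails at runtime
```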
Update: I tried the following:
```python
import numpy as np
import json

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, FloatType

schema = ArrayType(StructType([
    StructField("GroupedBrands", StringType(), True),
    StructField("GroupedWeights", FloatType(), True)
]))

array_mean = F.udf(lambda x: (x[0], np.mean(x[1]), schema))
mean_df = sdf.withColumn("mean_value", array_mean("sorted_zipped"))
mean_df.show()
```
I get the below exception... telling me that each row of "sorted_zipped" is of type List.
PythonException: An exception was thrown from a UDF: 'TypeError: cannot perform reduce with flexible type'
# Answer 1
**Score**: 1
You can simply use pure Python to define your UDF; check out this solution.

Sample input:
```python
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType, MapType

spark = SparkSession.builder.master("local[1]") \
    .appName('TestApp') \
    .getOrCreate()

data = [
    ("13-2023", [("bmw", 0.99), ("vw", 0.98), ("chevy", 0.97), ("buick", 0.96)]),
    ("14-2023", [("chevy", 0.98), ("bmw", 0.98), ("bmw", 0.978), ("bmw", 0.976), ("vw", 0.975), ("bmw", 0.975), ("bmw", 0.97), ("buick", 0.967), ("vw", 0.964), ("vw", 0.96), ("nissan", 0.96), ("chevy", 0.952), ("nissan", 0.95), ("nissan", 0.95), ("lexus", 0.95), ("lexus", 0.94), ("lexus", 0.94), ("nissan", 0.935), ("buick", 0.93), ("chevy", 0.928)]),
    ("15-2023", [("chevy", 0.992), ("bmw", 0.987), ("nissan", 0.982), ("bmw", 0.982), ("buick", 0.978), ("lexus", 0.976), ("bmw", 0.975), ("bmw", 0.97), ("chevy", 0.967), ("vw", 0.964), ("lexus", 0.961), ("nissan", 0.96), ("vw", 0.952), ("nissan", 0.952), ("vw", 0.952), ("lexus", 0.943)])
]

schema = StructType([
    StructField("WeekOfYear", StringType(), True),
    StructField("sorted_zipped", ArrayType(
        StructType([
            StructField("Brand", StringType(), True),
            StructField("Weight", FloatType(), True)
        ])
    ), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)
```
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|WeekOfYear|sorted_zipped |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|13-2023 |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}] |
|14-2023 |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
|15-2023 |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}] |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Define your UDF:
```python
def mean(list):
    # collect every weight seen for each brand, then average per brand
    mydict = {}
    for i in list:
        if i.Brand in mydict: mydict[i.Brand].append(i.Weight)
        else: mydict[i.Brand] = [i.Weight]
    for k, v in mydict.items(): mydict[k] = sum(v) / len(v)
    return mydict

mean_udf = udf(lambda z: mean(z), MapType(StringType(), FloatType()))
```
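Since the helper is plain Python, it can be sanity-checked locally before wiring it into Spark; a quick check (not part of the original answer) using the two chevy weights from the question's third row:

```python
from pyspark.sql import Row

# Local check outside of Spark: the UDF body sees an array of structs,
# which behave like Row objects with .Brand and .Weight attributes.
sample = [Row(Brand="chevy", Weight=0.992), Row(Brand="chevy", Weight=0.967)]
print(mean(sample))  # {'chevy': 0.9795} (up to float rounding)
```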
Apply the UDF:
```python
df.withColumn("mean_value", mean_udf(df.sorted_zipped)).drop(df.sorted_zipped).show(10, False)
```
+----------+--------------------------------------------------------------------------------------------------------------------+
|WeekOfYear|mean_value |
+----------+--------------------------------------------------------------------------------------------------------------------+
|13-2023 |{chevy -> 0.97, vw -> 0.98, buick -> 0.96, bmw -> 0.99} |
|14-2023 |{chevy -> 0.9533333, vw -> 0.9663333, buick -> 0.94850004, nissan -> 0.94875, lexus -> 0.9433333, bmw -> 0.97580004}|
|15-2023 |{chevy -> 0.9795, vw -> 0.95600003, buick -> 0.978, nissan -> 0.96466666, lexus -> 0.96000004, bmw -> 0.9785} |
+----------+--------------------------------------------------------------------------------------------------------------------+
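For completeness, the same result can be obtained without a Python UDF by exploding the array and using Spark's built-in aggregations; a sketch under the same schema as above (the intermediate column names `item` and `mean_weight` are illustrative, not from the original answer):

```python
from pyspark.sql import functions as F

# UDF-free alternative: explode the array, average the weights per
# (week, brand), then rebuild a brand -> mean map for each week.
mean_df = (
    df.withColumn("item", F.explode("sorted_zipped"))
      .groupBy("WeekOfYear", F.col("item.Brand").alias("Brand"))
      .agg(F.avg("item.Weight").alias("mean_weight"))
      .groupBy("WeekOfYear")
      .agg(F.map_from_entries(
          F.collect_list(F.struct("Brand", "mean_weight"))
      ).alias("mean_value"))
)
mean_df.show(truncate=False)
```

Whether this or the UDF reads better is largely a matter of taste; the built-in functions avoid the Python serialization overhead of a UDF on large data.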