PySpark compute mean of an RDD in a column of a dataframe

Question


<details>
<summary>English:</summary>

I have a dataframe where one of the columns has a list of items (rdd). Please note that this column "sorted_zipped" was computed using the "arrays_zip" function in PySpark (on two other columns that I have since dropped). I want to compute the mean of the items based on the second value of each item. I am just moving over from regular Python Pandas to PySpark and things are very different. I am learning as fast as I can.

    +----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |WeekOfYear|sorted_zipped                                                                                                                                                                                                                                                                                         |
    +----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |13-2023   |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}]                                                                                                                                                                                                                                               |
    |14-2023   |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
    |15-2023   |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]                                                      |
    +----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I want another column in this dataframe that will have the mean of each element in the list. For the third row of this dataframe:

    [{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]

 
The third column, holding the means, should look like this (sorted in descending order of the mean values):

    [{chevy, 0.9795}, {lexus, 0.96}, {vw, 0.956}, {bmw, 0.9784}, {buick, 0.978}, {nissan, 0.967}]

To begin with, I learnt that the equivalent of a dictionary in PySpark is a Map. I thought that I could create a map out of each row of "sorted_zipped", compute the mean for each key, and use it as a UDF. Not sure if I am heading in the right direction or just plodding around. Any help is appreciated.

    def get_avg_1(x):
      rdd = parallelize(x)
      rdd2 = rdd.flatMap(lambda x: [(k, v) for (k, v) in x.items()]).collect()
      grouped_k = rdd2.groupByKey()
      #print [(k, list(v)) for (k, v) in grouped_k.take(1)]
    
      # compute avg of the values
      avg_map = grouped_k.mapValues(lambda x: sum(x[1])/len(x[1])).collect()
      return avg_map

As I was trying to use the above UDF, I hit other problems on Databricks. Since Databricks creates a SparkContext by itself, I cannot pass a separate context to the worker nodes. There seems to be some restriction on using sc in worker nodes.

Update: I tried this:

    import numpy as np
    import json
    
    schema = ArrayType(StructType([
      StructField("GroupedBrands", StringType(), True),
      StructField("GroupedWeights", FloatType(), True)
    ]))
    
    array_mean = F.udf(lambda x: (x[0], np.mean(x[1]), schema))
    mean_df = sdf.withColumn("mean_value", array_mean("sorted_zipped"))
    
    mean_df.show()

I get the exception below, which tells me that each row of "sorted_zipped" is of type List.

    PythonException: An exception was thrown from a UDF: 'TypeError: cannot perform reduce with flexible type'

</details>
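
A note on the exception in the update: a Python UDF applied to an array-of-struct column receives the whole array as a Python list of row-like pairs, so `x[0]` and `x[1]` are the first and second pairs rather than the brand and weight columns. Passing one such pair (a string plus a float) to `np.mean` is a likely cause of the "flexible type" error. The sketch below illustrates the indexing in plain Python; `Pair` is a hypothetical stand-in for `pyspark.sql.Row`, and the field names `brand` and `weight` are assumptions, since the real names depend on the columns that were passed to `arrays_zip`.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row so the sketch runs without Spark.
Pair = namedtuple("Pair", ["brand", "weight"])

# What the UDF would receive for the first row of "sorted_zipped": a list of pairs.
x = [Pair("bmw", 0.99), Pair("vw", 0.98), Pair("chevy", 0.97), Pair("buick", 0.96)]

# Indexing the list yields whole pairs, not columns.
first_pair = x[0]   # the (bmw, 0.99) pair
second_pair = x[1]  # the (vw, 0.98) pair: a string and a float mixed together

# To average the weights, pull the numeric field out of every pair first.
weights = [pair.weight for pair in x]
overall_mean = sum(weights) / len(weights)
```

The same extraction step applies per brand: group the weights by the first field, then average each group.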


# Answer 1
**Score**: 1


You can simply use pure Python to define your UDF. Check out this solution.

Sample input:


    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType, MapType

    spark = SparkSession.builder.master("local[1]") \
                        .appName('TestApp') \
                        .getOrCreate()

    data = [
      ("13-2023", [("bmw", 0.99), ("vw", 0.98), ("chevy", 0.97), ("buick", 0.96)]),
      ("14-2023", [("chevy", 0.98), ("bmw", 0.98), ("bmw", 0.978), ("bmw", 0.976), ("vw", 0.975), ("bmw", 0.975), ("bmw", 0.97), ("buick", 0.967), ("vw", 0.964), ("vw", 0.96), ("nissan", 0.96), ("chevy", 0.952), ("nissan", 0.95), ("nissan", 0.95), ("lexus", 0.95), ("lexus", 0.94), ("lexus", 0.94), ("nissan", 0.935), ("buick", 0.93), ("chevy", 0.928)]),
      ("15-2023", [("chevy", 0.992), ("bmw", 0.987), ("nissan", 0.982), ("bmw", 0.982), ("buick", 0.978), ("lexus", 0.976), ("bmw", 0.975), ("bmw", 0.97), ("chevy", 0.967), ("vw", 0.964), ("lexus", 0.961), ("nissan", 0.96), ("vw", 0.952), ("nissan", 0.952), ("vw", 0.952), ("lexus", 0.943)])
    ]

    schema = StructType([
        StructField("WeekOfYear", StringType(), True),
        StructField("sorted_zipped", ArrayType(
          StructType([
            StructField("Brand", StringType(), True),
            StructField("Weight", FloatType(), True)
          ])
        ), True)
      ])

    df = spark.createDataFrame(data=data, schema=schema)
    df.show(truncate=False)

    +----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |WeekOfYear|sorted_zipped                                                                                                                                                                                                                                                                                         |
    +----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |13-2023   |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}]                                                                                                                                                                                                                                               |
    |14-2023   |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
    |15-2023   |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]                                                      |
    +----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Define your UDF:


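    def mean(items):
        # Group the weights by brand; each item is a (Brand, Weight) struct.
        weights_by_brand = {}
        for item in items:
            weights_by_brand.setdefault(item.Brand, []).append(item.Weight)
        # Replace each list of weights with its average.
        return {brand: sum(w) / len(w) for brand, w in weights_by_brand.items()}

    # The UDF returns a map from brand to mean weight.
    mean_udf = udf(mean, MapType(StringType(), FloatType()))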

Apply the UDF:

    df.withColumn("mean_value", mean_udf(df.sorted_zipped)).drop(df.sorted_zipped).show(10, False)

    +----------+--------------------------------------------------------------------------------------------------------------------+
    |WeekOfYear|mean_value                                                                                                          |
    +----------+--------------------------------------------------------------------------------------------------------------------+
    |13-2023   |{chevy -> 0.97, vw -> 0.98, buick -> 0.96, bmw -> 0.99}                                                             |
    |14-2023   |{chevy -> 0.9533333, vw -> 0.9663333, buick -> 0.94850004, nissan -> 0.94875, lexus -> 0.9433333, bmw -> 0.97580004}|
    |15-2023   |{chevy -> 0.9795, vw -> 0.95600003, buick -> 0.978, nissan -> 0.96466666, lexus -> 0.96000004, bmw -> 0.9785}       |
    +----------+--------------------------------------------------------------------------------------------------------------------+
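
The question also asked for the means sorted in descending order, which a map column does not guarantee. A minimal sketch of one way to get that, assuming an array of (brand, mean) structs is an acceptable result type: the helper names `mean_by_brand_sorted` and `make_sorted_mean_udf` and the field name `MeanWeight` are illustrative and not part of the original answer. The averaging logic is plain Python, and `pyspark` is imported only inside the wrapper, so the core function can be checked without a Spark session.

```python
from collections import defaultdict


def mean_by_brand_sorted(items):
    """Return [(brand, mean_weight), ...] ordered by mean weight, highest first."""
    totals = defaultdict(lambda: [0.0, 0])  # brand -> [sum of weights, count]
    for brand, weight in items:  # each item unpacks like a (brand, weight) tuple
        totals[brand][0] += weight
        totals[brand][1] += 1
    means = [(brand, total / count) for brand, (total, count) in totals.items()]
    return sorted(means, key=lambda pair: pair[1], reverse=True)


def make_sorted_mean_udf():
    """Wrap the function above as a Spark UDF (hypothetical helper)."""
    # Imported lazily so the pure-Python function works without pyspark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, FloatType, StringType, StructField, StructType

    result_type = ArrayType(StructType([
        StructField("Brand", StringType(), True),
        StructField("MeanWeight", FloatType(), True),
    ]))
    return udf(mean_by_brand_sorted, result_type)
```

With the `df` from the sample input, this could be applied as `df.withColumn("mean_value", make_sorted_mean_udf()(df.sorted_zipped))`. Returning an array keeps the order fixed, at the cost of losing key lookup on the result.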

huangapple
  • Published on May 10, 2023 at 21:47:54
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76219214.html