
PySpark compute mean of an RDD in a column of a dataframe


I have a dataframe where one of the columns holds a list of items (an array of structs). Please note that this column, "sorted_zipped", was computed with the "arrays_zip" function in PySpark (from two other columns that I have since dropped). I want to compute, for each brand in the list, the mean of the second value of its items. I am just moving over from regular Python pandas to PySpark, and things are very different. I am learning as fast as I can.

    |WeekOfYear|sorted_zipped                                                                                                                                                                                                                                                                                         |
    |13-2023   |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}]                                                                                                                                                                                                                                               |
    |14-2023   |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
    |15-2023   |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]                                                      |

I want another column in this dataframe that holds the mean weight of each brand in the list. For example, for the third row of this dataframe:

    [{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]

The new mean column should look like this (sorted in descending order of the mean values):

    [{chevy, 0.9795}, {bmw, 0.9785}, {buick, 0.978}, {nissan, 0.9647}, {lexus, 0.96}, {vw, 0.956}]
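Each entry is simply the average of that brand's weights within the row. Writing $n_b$ for the number of items with brand $b$ and $w_i$ for their weights, the values for row 15-2023 work out as follows (three brands shown):

$$
\bar{w}_b = \frac{1}{n_b} \sum_{i \,:\, \text{brand}_i = b} w_i
$$

$$
\bar{w}_{\text{chevy}} = \frac{0.992 + 0.967}{2} = 0.9795, \qquad
\bar{w}_{\text{bmw}} = \frac{0.987 + 0.982 + 0.975 + 0.970}{4} = 0.9785, \qquad
\bar{w}_{\text{nissan}} = \frac{0.982 + 0.960 + 0.952}{3} \approx 0.9647
$$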

To begin with, I learned that the PySpark equivalent of a Python dictionary is a map (MapType). I thought I could build a map from each row of "sorted_zipped", compute the mean for each key, and wrap that in a UDF. I'm not sure whether I am heading in the right direction or just plodding around. Any help is appreciated.

    def get_avg_1(x):
      # x is the array of (brand, weight) structs from one row
      rdd = sc.parallelize(x)  # sc exists only on the driver, so this cannot run inside a UDF
      grouped_k = rdd.map(lambda item: (item[0], item[1])).groupByKey()
      # compute avg of the values
      avg_map = grouped_k.mapValues(lambda vals: sum(vals) / len(vals)).collect()
      return avg_map

When I tried to use the function above as a UDF, I ran into other problems on Databricks. Databricks creates the SparkContext itself, and I cannot pass a context into the worker nodes: the SparkContext (sc) can only be used on the driver, not inside code that runs on the workers, such as a UDF.
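A UDF should not need a SparkContext at all: for an array-of-structs column, Spark passes each row's array to the Python function as an ordinary list of `Row` objects, so the grouping can be done in plain Python. The sketch below is one way to do it, assuming each struct holds (brand, weight) in that order; `brand_means` and the `Item` stand-in for `pyspark.sql.Row` are hypothetical names used only for illustration.

```python
from collections import defaultdict, namedtuple


def brand_means(items):
    """Average the weights per brand for the array of one row.

    `items` is the list Spark hands to a UDF for an array<struct<...>> column.
    Each element is a Row, which is a tuple subclass, so it can be unpacked
    positionally whatever the struct field names are.
    """
    if items is None:  # a null array arrives as None
        return None
    weights = defaultdict(list)
    for brand, weight in items:
        weights[brand].append(weight)
    return {brand: sum(ws) / len(ws) for brand, ws in weights.items()}


# Hypothetical stand-in for pyspark.sql.Row so the function can be tried without Spark.
Item = namedtuple("Item", ["Brand", "Weight"])

row_13 = [Item("bmw", 0.99), Item("vw", 0.98), Item("chevy", 0.97), Item("buick", 0.96)]
print(brand_means(row_13))  # {'bmw': 0.99, 'vw': 0.98, 'chevy': 0.97, 'buick': 0.96}
```

Wrapped with something like `udf(brand_means, MapType(StringType(), DoubleType()))`, it would return one map per row, which matches the dictionary idea described earlier.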

Update: I tried this:

    import numpy as np
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StructType, StructField, StringType, FloatType

    schema = ArrayType(StructType([
      StructField("GroupedBrands", StringType(), True),
      StructField("GroupedWeights", FloatType(), True)
    ]))
    # x is the whole array for one row, so x[1] is its second struct, e.g. (vw, 0.98),
    # not a list of weights; np.mean on that mix of text and number is most likely what fails
    array_mean = F.udf(lambda x: (x[0], np.mean(x[1])), schema)
    mean_df = sdf.withColumn("mean_value", array_mean("sorted_zipped"))

I get the exception below; as far as I can tell, each row of "sorted_zipped" reaches the UDF as a Python list.

    PythonException: An exception was thrown from a UDF: 'TypeError: cannot perform reduce with flexible type'
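The `TypeError` most likely comes from `x[1]`. Inside the lambda, `x` is the whole array for one row, so `x[1]` is its second struct, for example `("vw", 0.98)`, rather than a list of weights. NumPy turns a mix of a string and a float into an array of strings (a "flexible" dtype), and averaging that array fails in the NumPy version used here; the exact error wording can differ between NumPy versions. The sketch below reproduces the coercion with a hypothetical `Item` namedtuple standing in for `pyspark.sql.Row`, and shows that extracting the numeric field first gives a proper mean.

```python
from collections import namedtuple

import numpy as np

# Hypothetical stand-in for the Row objects Spark passes into the UDF;
# field names follow the schema in the question.
Item = namedtuple("Item", ["GroupedBrands", "GroupedWeights"])
row = [Item("bmw", 0.99), Item("vw", 0.98), Item("chevy", 0.97)]

# What the lambda hands to np.mean: a single struct, not a list of weights.
second = row[1]
as_array = np.asarray(second)
print(as_array.dtype)  # a Unicode string dtype: "vw" and 0.98 were both coerced to text

# Averaging needs the numeric field of every struct instead.
weights = [weight for _, weight in row]
print(np.mean(weights))
```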


# Answer 1
**Score**: 1


You can simply define your UDF in plain Python. Check out this solution.

Sample input:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType, MapType

    spark = SparkSession.builder.master("local[1]") \
                        .appName("TestApp") \
                        .getOrCreate()

    data = [
      ("13-2023", [("bmw", 0.99), ("vw", 0.98), ("chevy", 0.97), ("buick", 0.96)]),
      ("14-2023", [("chevy", 0.98), ("bmw", 0.98), ("bmw", 0.978), ("bmw", 0.976), ("vw", 0.975), ("bmw", 0.975), ("bmw", 0.97), ("buick", 0.967), ("vw", 0.964), ("vw", 0.96), ("nissan", 0.96), ("chevy", 0.952), ("nissan", 0.95), ("nissan", 0.95), ("lexus", 0.95), ("lexus", 0.94), ("lexus", 0.94), ("nissan", 0.935), ("buick", 0.93), ("chevy", 0.928)]),
      ("15-2023", [("chevy", 0.992), ("bmw", 0.987), ("nissan", 0.982), ("bmw", 0.982), ("buick", 0.978), ("lexus", 0.976), ("bmw", 0.975), ("bmw", 0.97), ("chevy", 0.967), ("vw", 0.964), ("lexus", 0.961), ("nissan", 0.96), ("vw", 0.952), ("nissan", 0.952), ("vw", 0.952), ("lexus", 0.943)])
    ]

    schema = StructType([
        StructField("WeekOfYear", StringType(), True),
        StructField("sorted_zipped", ArrayType(
          StructType([
            StructField("Brand", StringType(), True),
            StructField("Weight", FloatType(), True)
          ])
        ), True)
    ])
    df = spark.createDataFrame(data=data, schema=schema)
    df.show(truncate=False)

    |WeekOfYear|sorted_zipped                                                                                                                                                                                                                                                                                         |
    |13-2023   |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}]                                                                                                                                                                                                                                               |
    |14-2023   |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
    |15-2023   |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]                                                      |

Define your UDF:

    def mean(items):
      # group the weights of one row by brand, then replace each list by its average
      mydict = {}
      for i in items:
        if i.Brand in mydict: mydict[i.Brand].append(i.Weight)
        else: mydict[i.Brand] = [i.Weight]
      for k, v in mydict.items(): mydict[k] = sum(v) / len(v)
      return mydict

    mean_udf = udf(mean, MapType(StringType(), FloatType()))

Apply the UDF:

    df.withColumn("mean_value", mean_udf(df.sorted_zipped)).drop(df.sorted_zipped).show(10, False)

    |WeekOfYear|mean_value                                                                                                          |
    |13-2023   |{chevy -> 0.97, vw -> 0.98, buick -> 0.96, bmw -> 0.99}                                                             |
    |14-2023   |{chevy -> 0.9533333, vw -> 0.9663333, buick -> 0.94850004, nissan -> 0.94875, lexus -> 0.9433333, bmw -> 0.97580004}|
    |15-2023   |{chevy -> 0.9795, vw -> 0.95600003, buick -> 0.978, nissan -> 0.96466666, lexus -> 0.96000004, bmw -> 0.9785}       |
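A map column does not guarantee any particular key order, while the question asks for the means in descending order. One way to get that, sketched here under the assumption that a sorted array of structs is acceptable in place of a map, is to have the UDF return a list of (brand, mean) tuples sorted by mean. `brand_means_sorted` and the `Item` stand-in for `pyspark.sql.Row` are hypothetical names.

```python
from collections import defaultdict, namedtuple


def brand_means_sorted(items):
    """Average the weights per brand for one row and return (brand, mean)
    pairs ordered from the highest mean to the lowest.

    A list of tuples maps onto an array<struct<...>> return type, which,
    unlike a map, keeps its element order.
    """
    if items is None:  # a null array arrives as None
        return None
    weights = defaultdict(list)
    for brand, weight in items:
        weights[brand].append(weight)
    means = [(brand, sum(ws) / len(ws)) for brand, ws in weights.items()]
    return sorted(means, key=lambda pair: pair[1], reverse=True)


# Hypothetical stand-in for pyspark.sql.Row, used only to try the function.
Item = namedtuple("Item", ["Brand", "Weight"])

row_15 = [Item("chevy", 0.992), Item("bmw", 0.987), Item("nissan", 0.982), Item("bmw", 0.982),
          Item("buick", 0.978), Item("lexus", 0.976), Item("bmw", 0.975), Item("bmw", 0.97),
          Item("chevy", 0.967), Item("vw", 0.964), Item("lexus", 0.961), Item("nissan", 0.96),
          Item("vw", 0.952), Item("nissan", 0.952), Item("vw", 0.952), Item("lexus", 0.943)]

for brand, avg in brand_means_sorted(row_15):
    print(f"{brand}: {avg:.4f}")
```

To use it on the DataFrame, it would be wrapped with a matching return type, for example `udf(brand_means_sorted, ArrayType(StructType([StructField("Brand", StringType()), StructField("Mean", DoubleType())])))`, where the field names `Brand` and `Mean` are illustrative. Separately, trailing digits such as those in 0.95600003 come from `FloatType` being a 32-bit type; declaring both the input weight field and the returned mean as `DoubleType` keeps double precision.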

  • Published on May 10, 2023, 21:47:54
