Calculating cumulative sum over non-unique list elements in PySpark

Question

I have a PySpark dataframe with a column containing lists. The list items might overlap across rows. I need the cumulative sum of unique list elements down through the rows, ordered by the 'orderCol' column. In my application there might be millions of rows and hundreds of items in each list. I can't seem to wrap my brain around how to do this in PySpark so that it scales, and I would be grateful for any ideas, big or small, on how to solve it.

I have posted the input and desired output below to give an idea of what I'm trying to achieve.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

data = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1},
        {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2},
        {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3},
        {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4},
        ]

df = spark.createDataFrame(data)
df.show()

data_out = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1, "cumulative_item_count": 4},
            {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2, "cumulative_item_count": 7},
            {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3, "cumulative_item_count": 9},
            {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4, "cumulative_item_count": 10},
            ]

df_out = spark.createDataFrame(data_out)
df_out.show()
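
For reference, here is a minimal plain-Python sketch of the logic I'm after (single-machine only, just to pin down the expected numbers; the seen set is purely illustrative):

# Running count of distinct items, visiting rows in orderCol order.
seen = set()
for row in sorted(data, key=lambda r: r["orderCol"]):
    seen.update(row["items"])
    print(row["node"], len(seen))  # r1 4, r2 7, r3 9, r4 10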

Answer 1

Score: 2

Try a window function running from unboundedPreceding to currentRow.

Then flatten the nested array.

Finally, use the array_distinct and size functions to count the distinct elements in the flattened array.

Example:

from pyspark.sql import Window
from pyspark.sql.functions import array_distinct, col, collect_list, flatten, lit, size

data = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1},
        {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2},
        {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3},
        {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4},
        ]

# Running frame over every row up to and including the current one, ordered by orderCol.
w = Window.partitionBy(lit(1)).orderBy("orderCol").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Reuses the spark session created in the question.
df = spark.createDataFrame(data).\
  withColumn("temp_col", collect_list(col("items")).over(w)).\
  withColumn("cumulative_item_count", size(array_distinct(flatten(col("temp_col")))))
df.show(20, False)

#+------------+----+--------+--------------------------------------------------------+---------------------+
#|items       |node|orderCol|temp_col                                                |cumulative_item_count|
#+------------+----+--------+--------------------------------------------------------+---------------------+
#|[a, b, c, d]|r1  |1       |[[a, b, c, d]]                                          |4                    |
#|[e, f, g, a]|r2  |2       |[[a, b, c, d], [e, f, g, a]]                            |7                    |
#|[h, i, g, b]|r3  |3       |[[a, b, c, d], [e, f, g, a], [h, i, g, b]]              |9                    |
#|[j, i, f, c]|r4  |4       |[[a, b, c, d], [e, f, g, a], [h, i, g, b], [j, i, f, c]]|10                   |
#+------------+----+--------+--------------------------------------------------------+---------------------+
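
One caveat on scale: partitionBy(lit(1)) moves every row into a single partition, and collect_list carries the full accumulated array on every row, so memory use grows with the number of rows times the list size. A possible alternative sketch (my own, untested at scale; the names w_item, is_new and new_items are made up for illustration) explodes the lists, flags only the first occurrence of each item, and takes a running sum of those flags:

from pyspark.sql import Window
from pyspark.sql.functions import col, explode, row_number, sum as sum_

# One record per (row, item) pair; df is the input dataframe with node, items, orderCol.
exploded = df.select("node", "orderCol", explode("items").alias("item"))

# Flag the first time each item appears, ordered by orderCol.
w_item = Window.partitionBy("item").orderBy("orderCol")
flagged = exploded.withColumn("rn", row_number().over(w_item)).\
  withColumn("is_new", (col("rn") == 1).cast("int"))

# Count the new items contributed by each row, then take a running total down the rows.
per_row = flagged.groupBy("node", "orderCol").agg(sum_("is_new").alias("new_items"))
w_cum = Window.orderBy("orderCol").rowsBetween(Window.unboundedPreceding, Window.currentRow)
per_row.withColumn("cumulative_item_count", sum_("new_items").over(w_cum)).\
  orderBy("orderCol").show()

The final running sum still needs a global ordering (Spark will warn about the single-partition window), but each row now carries only a small integer instead of the entire history of items.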
