Calculating cumulative sum over non-unique list elements in PySpark

Question
I have a PySpark dataframe with a column containing lists. The list items might overlap across rows. I need the cumulative count of unique list elements down through the rows, ordered by the 'orderCol' column. In my application there might be millions of rows and hundreds of items in each list. I can't seem to wrap my brain around how to do this in PySpark so that it scales, and would be grateful for any ideas, big or small, on how to solve it.
I have posted the input and desired output below to give an idea of what I'm trying to achieve.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

# Input
data = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1},
        {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2},
        {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3},
        {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4},
        ]
df = spark.createDataFrame(data)
df.show()

# Desired output
data_out = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1, "cumulative_item_count": 4},
            {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2, "cumulative_item_count": 7},
            {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3, "cumulative_item_count": 9},
            {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4, "cumulative_item_count": 10},
            ]
df_out = spark.createDataFrame(data_out)
df_out.show()
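To make the expected numbers concrete, here is a small plain-Python sanity check (my own sketch, not part of the original post) that reproduces the cumulative_item_count values by accumulating a set of the items seen so far:

# Plain-Python check of the desired output: accumulate distinct items row by row.
seen = set()
for row in sorted(data, key=lambda r: r["orderCol"]):
    seen.update(row["items"])
    print(row["node"], len(seen))   # r1 4, r2 7, r3 9, r4 10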
Answer 1

Score: 2
Try a window function with a frame from unboundedPreceding to currentRow and collect_list to gather the item arrays seen so far. Then flatten the nested array. Finally, use the array_distinct and size functions to count the distinct elements in the array.
Example:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *

data = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1},
        {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2},
        {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3},
        {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4},
        ]

# Running window from the first row up to the current row, ordered by orderCol
w = Window.partitionBy(lit(1)).orderBy("orderCol").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = spark.createDataFrame(data).\
    withColumn("temp_col", collect_list(col("items")).over(w)).\
    withColumn("cumulative_item_count", size(array_distinct(flatten(col("temp_col")))))

df.show(20, False)
#+------------+----+--------+--------------------------------------------------------+---------------------+
#|items       |node|orderCol|temp_col                                                |cumulative_item_count|
#+------------+----+--------+--------------------------------------------------------+---------------------+
#|[a, b, c, d]|r1  |1       |[[a, b, c, d]]                                          |4                    |
#|[e, f, g, a]|r2  |2       |[[a, b, c, d], [e, f, g, a]]                            |7                    |
#|[h, i, g, b]|r3  |3       |[[a, b, c, d], [e, f, g, a], [h, i, g, b]]              |9                    |
#|[j, i, f, c]|r4  |4       |[[a, b, c, d], [e, f, g, a], [h, i, g, b], [j, i, f, c]]|10                   |
#+------------+----+--------+--------------------------------------------------------+---------------------+
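A side note on the design choice (my own observation, not part of the original answer): partitionBy(lit(1)) places every row in a single window partition, and temp_col carries the full running nested array for each row, so the window state grows with the number of preceding rows. If the helper column is not needed downstream, it can at least be dropped from the result, for example:

# Drop the intermediate nested-array column once the count has been computed.
df.drop("temp_col").show(20, False)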
Comments