Pyspark - How to get count of a particular element in an array without exploding?


Question

Input dataframe:

| Name | action |
|------|--------|
| a | [walk,run,sit,walk,run,sit] |
| b | [walk,sit,run,walk,sleep,wait] |

Calculate the count of `walk` and `run` in each `action` array without exploding the array, producing the output dataframe below:

| Name | action | walk_count | run_count |
|------|--------|------------|-----------|
| a | [walk,run,sit,walk,run,sit] | 2 | 2 |
| b | [walk,sit,run,walk,sleep,wait] | 2 | 1 |

Answer 1

Score: 2

Try the higher-order `filter` function in PySpark.

  • `size(filter(action, n -> n == 'walk'))`: takes each item of the array, keeps only the items that match `'walk'`, then returns the size of the resulting array.
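To make the semantics of that expression concrete, here is a plain-Python model of `size(filter(action, n -> predicate(n)))`. It is not PySpark code: the list comprehension stands in for Spark's `filter` with the lambda `n -> n == 'walk'`, and `len` stands in for `size`. The helper name `filter_then_size` is hypothetical.

```python
from typing import Callable, List


def filter_then_size(action: List[str], predicate: Callable[[str], bool]) -> int:
    """Plain-Python model of Spark SQL's size(filter(action, n -> predicate(n)))."""
    # filter: keep only the elements for which the lambda returns true
    kept = [n for n in action if predicate(n)]
    # size: number of elements left in the filtered array
    return len(kept)


action = ["walk", "run", "sit", "walk", "run", "sit"]
print(filter_then_size(action, lambda n: n == "walk"))
print(filter_then_size(action, lambda n: n == "run"))
```

Both calls print `2` for this array; in Spark the same work happens per row inside the expression, so no extra rows are created.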

Example:

from pyspark.sql.functions import expr

df = spark.createDataFrame([('a', ['walk', 'run', 'sit', 'walk', 'run', 'sit'])], ['name', 'action'])  # `spark`: an active SparkSession
(df.withColumn("walk_count", expr("size(filter(action, n -> n == 'walk'))"))  # count 'walk' elements
   .withColumn("run_count", expr("size(filter(action, n -> n == 'run'))"))    # count 'run' elements
   .show(10, False))

#+----+--------------------------------+----------+---------+
#|name|action                          |walk_count|run_count|
#+----+--------------------------------+----------+---------+
#|a   |[walk, run, sit, walk, run, sit]|2         |2        |
#+----+--------------------------------+----------+---------+

Answer 2

Score: 1

You can use the aggregate function as follows:

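from pyspark.sql import functions as f

agg_cols = ['walk', 'run']  # actions to count; df has the name/action columns from the question

for col in agg_cols:
    df = df.withColumn(f'{col}_count', f.expr(f"aggregate(action, 0, (acc, x) -> if(x = '{col}', acc + 1, acc))"))

df.show(truncate=False)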

+----+--------------------------------+----------+---------+
|name|action                          |walk_count|run_count|
+----+--------------------------------+----------+---------+
|a   |[walk, run, sit, walk, run, sit]|2         |2        |
+----+--------------------------------+----------+---------+
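`aggregate` behaves like a left fold: it starts from the initial value `0` and applies `(acc, x) -> if(x = 'walk', acc + 1, acc)` to each element in order. The sketch below models that fold in plain Python with `functools.reduce`; it is not PySpark code, and the helper name `aggregate_count` is hypothetical.

```python
from functools import reduce
from typing import List


def aggregate_count(action: List[str], target: str) -> int:
    """Plain-Python model of aggregate(action, 0, (acc, x) -> if(x = target, acc + 1, acc))."""
    # reduce(merge, array, initial) folds left to right, starting from 0
    return reduce(lambda acc, x: acc + 1 if x == target else acc, action, 0)


action = ["walk", "run", "sit", "walk", "run", "sit"]
for target in ["walk", "run"]:
    print(f"{target}_count", aggregate_count(action, target))
```

One difference from `size(filter(...))` is that the fold only carries a running integer instead of building a filtered array; both variants still work row by row without exploding.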

huangapple
  • Posted on July 7, 2023, 01:34:44
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76631280.html