Parallelize an operation applied to a list (PySpark)
Question
At one point in my program, a function receives a list and repeats an operation using each of its items. For the sake of this example, suppose my program starts off with a list l and counts the rows of some generic PySpark DataFrame df that meet a condition.
from pyspark.sql.functions import col

l = [2, 4, 5]
res = []
for x in l:
    val = df.where((col('id') == x) | (col('id2') == x)).count()
    res.append(val)
Is it possible to have multiple workers calculate val simultaneously? That is, could each worker calculate its own val and append it to res individually?
This post suggests using foreach, but since I'm iterating over a list (not an RDD or DataFrame), I cannot use that method.
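One commonly used workaround is to keep the list on the driver and submit the count() actions from a small thread pool: a single SparkSession can schedule several jobs at once, so the counts run on the cluster concurrently while each Python thread simply waits for its job to finish. A minimal sketch, assuming df and l are defined as above:

from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.functions import col

def count_matches(x):
    # each call triggers its own Spark job; the thread only blocks on the action
    return df.where((col('id') == x) | (col('id2') == x)).count()

# map() preserves the order of l, so res lines up with the input list
with ThreadPoolExecutor(max_workers=len(l)) as pool:
    res = list(pool.map(count_matches, l))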
Answer 1
Score: 1
You could do away with the loop and use Spark functions to count the occurrences. Here's an example:
from pyspark.sql import functions as func

# the list of ids to look up (the question's l)
ls = [2, 4, 5]

# mock input data, recreated here from the output shown below
# (assumes an active SparkSession named spark)
data_sdf = spark.createDataFrame(
    [(2, 4), (4, 1), (3, 5), (4, 5), (1, 8)],
    ['id', 'id2']
)
data_sdf.show()
# +---+---+
# | id|id2|
# +---+---+
# |  2|  4|
# |  4|  1|
# |  3|  5|
# |  4|  5|
# |  1|  8|
# +---+---+
# create array of all ids, and remove ids that are not in that row
data_sdf. \
    withColumn('id_ls', func.array(*[func.lit(k) for k in ls])). \
    withColumn('id_per_row', func.expr('filter(id_ls, x -> (id = x) or (id2 = x))')). \
    filter(func.size('id_per_row') != 0). \
    select('*', func.explode('id_per_row').alias('ids')). \
    groupBy('ids'). \
    count(). \
    show()
# +---+-----+
# |ids|count|
# +---+-----+
# |  4|    3|
# |  2|    1|
# |  5|    2|
# +---+-----+
# for reference, the intermediate columns before the explode and aggregation
data_sdf. \
    withColumn('id_ls', func.array(*[func.lit(k) for k in ls])). \
    withColumn('id_per_row', func.expr('filter(id_ls, x -> (id = x) or (id2 = x))')). \
    show(truncate=False)
# +---+---+---------+----------+
# |id |id2|id_ls    |id_per_row|
# +---+---+---------+----------+
# |2  |4  |[2, 4, 5]|[2, 4]    |
# |4  |1  |[2, 4, 5]|[4]       |
# |3  |5  |[2, 4, 5]|[5]       |
# |4  |5  |[2, 4, 5]|[4, 5]    |
# |1  |8  |[2, 4, 5]|[]        |
# +---+---+---------+----------+
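If you need the counts back in a plain Python list in the same order as the original l, one option (a sketch, not part of the answer above) is to collect the small aggregated result to the driver and map it back; counts_sdf below is assumed to be the chained expression above ending at count() instead of show():

# collect the (small) grouped result to the driver
count_map = {row['ids']: row['count'] for row in counts_sdf.collect()}

# rebuild res in the order of ls; ids with no matching rows were dropped by the
# filter/groupBy, so fall back to 0 for them
res = [count_map.get(x, 0) for x in ls]
# res == [1, 3, 2] for the mock data above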
Answer 2
Score: 1
IIUC, you can try array_intersect to find the matching rows.
from pyspark.sql import functions as F

l = [2, 4, 5]
# Convert to Col type
l = [F.lit(x) for x in l]

# keep only the values of id/id2 that appear in l, then count each one
df = (df.withColumn('intersect', F.array_intersect(F.array('id', 'id2'), F.array(l)))
        .select('id', F.explode('intersect').alias('intersect'))
        .groupby('intersect')
        .count())
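Note that explode drops rows whose intersect array is empty, so values from l that match no rows simply will not appear in the grouped output (explode_outer keeps them as nulls if that matters). As with the first answer, the aggregated result is small, so it can be collected back to the driver and mapped into a list in the order of l in the same way as sketched above.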