Parallelize an operation applied to a list (PySpark)

Question

At one point in my program, a function receives a list and performs an operation for each of its items.

For the sake of this example, suppose my program starts off with a list l and counts the rows of some generic PySpark DataFrame df that meet a condition.

from pyspark.sql.functions import col

l = [2, 4, 5]
res = []

for x in l:
    val = df.where((col('id') == x) | (col('id2') == x)).count()
    res.append(val)

Is it possible to have multiple workers calculate val simultaneously? That is, could each worker calculate its own val and append it to res individually?

This post suggests using foreach, but since I'm iterating over a Python list (not an RDD or DataFrame), I cannot use that method.
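
For reference, Spark can run several jobs at once when their actions are submitted from separate driver threads, so the loop can also be parallelized as written with a thread pool. A minimal sketch, assuming the df and l defined above (the helper count_for is not from the original post):

from concurrent.futures import ThreadPoolExecutor

from pyspark.sql.functions import col

def count_for(x):
    # each call triggers an independent Spark job; jobs submitted from
    # separate driver threads are scheduled concurrently by Spark
    return df.where((col('id') == x) | (col('id2') == x)).count()

with ThreadPoolExecutor(max_workers=len(l)) as pool:
    res = list(pool.map(count_for, l))  # results keep the order of l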

Answer 1

Score: 1

You could do away with the loop and use Spark functions to count the occurrences.

Here's an example:

# mock input data
data_sdf.show()

# +---+---+
# | id|id2|
# +---+---+
# |  2|  4|
# |  4|  1|
# |  3|  5|
# |  4|  5|
# |  1|  8|
# +---+---+
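
The snippet below also assumes a SparkSession, the alias func, and the names ls and data_sdf. A minimal setup reconstructed from the mock output above (the exact construction of data_sdf is an assumption, and ls follows the question's l):

from pyspark.sql import SparkSession
from pyspark.sql import functions as func

spark = SparkSession.builder.getOrCreate()

# candidate ids, following the question's l
ls = [2, 4, 5]
# rows matching the mock output above
data_sdf = spark.createDataFrame(
    [(2, 4), (4, 1), (3, 5), (4, 5), (1, 8)],
    ['id', 'id2']
)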

# create array of all ids, and remove ids that are not in that row
data_sdf. \
    withColumn('id_ls', func.array(*[func.lit(k) for k in ls])). \
    withColumn('id_per_row', func.expr('filter(id_ls, x -> (id = x) or (id2 = x))')). \
    filter(func.size('id_per_row') != 0). \
    select('*', func.explode('id_per_row').alias('ids')). \
    groupBy('ids'). \
    count(). \
    show()

# +---+-----+
# |ids|count|
# +---+-----+
# |  4|    3|
# |  2|    1|
# |  5|    2|
# +---+-----+

For reference, this is the intermediate id_per_row column before the filter, explode, and aggregation:

data_sdf. \
    withColumn('id_ls', func.array(*[func.lit(k) for k in ls])). \
    withColumn('id_per_row', func.expr('filter(id_ls, x -> (id = x) or (id2 = x))')). \
    show(truncate=False)

# +---+---+---------+----------+
# |id |id2|id_ls    |id_per_row|
# +---+---+---------+----------+
# |2  |4  |[2, 4, 5]|[2, 4]    |
# |4  |1  |[2, 4, 5]|[4]       |
# |3  |5  |[2, 4, 5]|[5]       |
# |4  |5  |[2, 4, 5]|[4, 5]    |
# |1  |8  |[2, 4, 5]|[]        |
# +---+---+---------+----------+
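
To map the counts back to a plain Python list in the order of the question's l, one could collect the aggregated result. A sketch, assuming the main pipeline above is bound to a (hypothetical) variable counts_sdf instead of ending in show(); ids that match no rows default to 0:

# counts_sdf is the grouped result from the pipeline above (assumption)
counts = {row['ids']: row['count'] for row in counts_sdf.collect()}
res = [counts.get(x, 0) for x in ls]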

Answer 2

Score: 1

IIUC, you can try array_intersect to find the matching rows.

from pyspark.sql import functions as F

l = [2, 4, 5]
# Convert each value to a Column literal
l = [F.lit(x) for x in l]

df = (df.withColumn('intersect', F.array_intersect(F.array('id', 'id2'), F.array(l)))
      .select('id', F.explode('intersect').alias('intersect'))
      .groupby('intersect')
      .count())
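
Note that array_intersect returns the distinct common elements, so a row where id and id2 both equal the same x is still counted once, matching the or condition (|) in the original loop. Run against the mock data_sdf from the first answer, this pipeline would be expected to produce the same tallies (row order may vary):

df.show()

# expected output on the first answer's mock data (an assumption):
# +---------+-----+
# |intersect|count|
# +---------+-----+
# |        2|    1|
# |        4|    3|
# |        5|    2|
# +---------+-----+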
