How can I filter rows in a column of ArrayType(StringType) against items in another column in a separate dataframe using PySpark?


Question


I have two dataframes. The structure is as below:

df1:

| Column A | Column B               | Column C  | Column D |
| -------- | ---------------------- | --------- | -------- |
| 1        | Tokyo, Singapore       | 4 hours   | apple    |
| 2        | Tokyo, New York, Paris | 1.5 hours | banana   |
| 3        | Paris                  | 2 hours   | orange   |

where Column B is an array of strings

df2:

| Destination |
| ----------- |
| Paris       |
| New York    |

where there is only a single value per row

I want to create a new column in df1, which is filtered using df2. If a value in the array is present in df2, return True; otherwise return False.

For example:

| Column A | Column B               | Column C  | Column D | new column         |
| -------- | ---------------------- | --------- | -------- | ------------------ |
| 1        | Tokyo, Singapore       | 4 hours   | apple    | False, False       |
| 2        | Tokyo, New York, Paris | 1.5 hours | banana   | False, True, True  |
| 3        | Paris                  | 2 hours   | orange   | True               |

There is no maximum array length in df1, and df2 has about 1,000 rows.

How can I create this boolean column using PySpark?

Thanks!

Most of my errors are along the lines of "column is not iterable".

Answer 1

Score: 0


Use transform, one of Spark's higher-order built-in functions.

  • Get the distinct list of destinations from df2 and cross join it onto df1, so every row carries the full list.
  • Then, inside transform(), use array_contains() to check whether each element of colb exists in the destination array.

Example:

```python
from pyspark.sql.functions import *

df = spark.createDataFrame(
    [('1', ['Tokyo', 'Singapore']), ('2', ['Tokyo', 'New York', 'Paris']), ('3', ['Paris'])],
    ['cola', 'colb'])
df1 = spark.createDataFrame([('Paris',), ('New York',)], ['Destination'])

# Collapse all distinct destinations into a single array column.
df1 = df1.groupBy(lit("1")).agg(collect_set(col("Destination")).alias("dst")).drop('1')

# Cross join so every row of df carries the full destination array.
df2 = df.crossJoin(df1)

df2.withColumn("new col", expr("transform(colb, x -> array_contains(dst, x))")).show(truncate=False)
# +----+------------------------+-----------------+-------------------+
# |cola|colb                    |dst              |new col            |
# +----+------------------------+-----------------+-------------------+
# |1   |[Tokyo, Singapore]      |[Paris, New York]|[false, false]     |
# |2   |[Tokyo, New York, Paris]|[Paris, New York]|[false, true, true]|
# |3   |[Paris]                 |[Paris, New York]|[true]             |
# +----+------------------------+-----------------+-------------------+
```
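On PySpark 3.1+, the same idea can also be written with the Python-native higher-order function API instead of a SQL expression string. A minimal sketch, assuming the same df and aggregated df1 (with its dst column) as above:

```python
from pyspark.sql import functions as F

# Lambda form of transform(); equivalent to the expr() version above.
result = (df.crossJoin(df1)
            .withColumn("new col", F.transform("colb", lambda x: F.array_contains("dst", x))))
result.show(truncate=False)
```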

Answer 2

Score: 0


You can use array_contains to join the two dataframes, aggregate the matching Destination values with collect_list, and finally transform Column B to create the boolean array.

** Assuming Column A is an identifier. (If not, please reply in a comment.)
** transform is only available in PySpark 3.1+. If you are using a lower version of PySpark, you need to use the SQL built-in via F.expr('transform ...').

```python
from pyspark.sql import functions as F

# Left join on "array contains destination", then regroup to one row per identifier,
# collecting the destinations that matched.
df = (df1.join(df2, on=F.array_contains(df1['Column B'], df2.Destination), how='left')
      .groupby('Column A')
      .agg(*[F.first(x).alias(x) for x in df1.columns if x != 'Column A'],
           F.collect_list('Destination').alias('new column')))
```

At this point, df should look like this:

```
+---------+--------------------+---------+---------+-----------------+
| Column A|            Column B| Column C| Column D|       new column|
+---------+--------------------+---------+---------+-----------------+
|        1|  [Tokyo, Singapore]|  4 hours|    apple|               []|
|        2|[Tokyo, New York,...|1.5 hours|   banana|[Paris, New York]|
|        3|             [Paris]|  2 hours|   orange|          [Paris]|
+---------+--------------------+---------+---------+-----------------+
```

Then use transform with an array_contains condition to turn Column B into the boolean array.

```python
df = df.withColumn('new column', F.transform('Column B', lambda x: F.array_contains('new column', x)))
```

All together:

```python
df = (df1.join(df2, on=F.array_contains(df1['Column B'], df2.Destination), how='left')
      .groupby('Column A')
      .agg(*[F.first(x).alias(x) for x in df1.columns if x != 'Column A'],
           F.collect_list('Destination').alias('new column'))
      .withColumn('new column', F.transform('Column B', lambda x: F.array_contains('new column', x))))
```
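On PySpark versions before 3.1, where the lambda form of F.transform is unavailable, the last step can be written as a SQL expression string instead. A minimal sketch, assuming the aggregated df from the first step (backticks are needed because the column names contain spaces):

```python
# Fallback for PySpark < 3.1: same transform, expressed as Spark SQL.
df = df.withColumn(
    'new column',
    F.expr('transform(`Column B`, x -> array_contains(`new column`, x))'))
```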
