How can I filter rows in a column of ArrayType(StringType) against items in a column of a separate dataframe using PySpark?

Question

I have two dataframes. The structure is as below:

df1:

| Column A | Column B               | Column C  | Column D |
| -------- | ---------------------- | --------- | -------- |
| 1        | Tokyo, Singapore       | 4 hours   | apple    |
| 2        | Tokyo, New York, Paris | 1.5 hours | banana   |
| 3        | Paris                  | 2 hours   | orange   |

where Column B is an array of strings

df2:

| Destination |
| ----------- |
| Paris       |
| New York    |

where there is only a single value per row

I want to create a new column in df1, which is filtered using df2: if a value in the array is present in df2 then return True, otherwise False.

For example:

| Column A | Column B               | Column C  | Column D | new column        |
| -------- | ---------------------- | --------- | -------- | ----------------- |
| 1        | Tokyo, Singapore       | 4 hours   | apple    | False, False      |
| 2        | Tokyo, New York, Paris | 1.5 hours | banana   | False, True, True |
| 3        | Paris                  | 2 hours   | orange   | True              |

There is no maximum array length in df1, and df2 has roughly 1000 rows.

How can I create this boolean column using PySpark?

Thanks!

Most of my attempts fail with errors such as "Column is not iterable".
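
For reference, a minimal reproducible version of the two dataframes might be created like this (a sketch; the SparkSession setup and the literal values are assumptions based on the tables above):

from pyspark.sql import SparkSession

# Assumed setup: an active SparkSession named `spark`
spark = SparkSession.builder.getOrCreate()

# df1: Column B is an ArrayType(StringType) column
df1 = spark.createDataFrame(
    [(1, ['Tokyo', 'Singapore'], '4 hours', 'apple'),
     (2, ['Tokyo', 'New York', 'Paris'], '1.5 hours', 'banana'),
     (3, ['Paris'], '2 hours', 'orange')],
    ['Column A', 'Column B', 'Column C', 'Column D'])

# df2: one destination per row
df2 = spark.createDataFrame([('Paris',), ('New York',)], ['Destination'])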

Answer 1

Score: 0


Use transform, one of Spark's higher-order built-in functions.

  • Get the distinct list of destinations from df2 and do a cross join to attach this list to df1.
  • Then use the higher-order function transform() with array_contains() to check, for each element in colb, whether it exists in the destination array.

Example:

from pyspark.sql.functions import *

# Sample data matching the question
df = spark.createDataFrame([('1', ['Tokyo', 'Singapore']), ('2', ['Tokyo', 'New York', 'Paris']), ('3', ['Paris'])], ['cola', 'colb'])

df1 = spark.createDataFrame([('Paris',), ('New York',)], ['Destination'])

# Collapse all destinations into a single array column
df1 = df1.groupBy(lit("1")).agg(collect_set(col("Destination")).alias("dst")).drop('1')

# Attach that array to every row of the main dataframe
df2 = df.crossJoin(df1)

# For each element of colb, check whether it appears in dst
df2.withColumn("new col", expr("transform(colb, x -> array_contains(dst, x))")).show(truncate=False)
#+----+------------------------+-----------------+-------------------+
#|cola|colb                    |dst              |new col            |
#+----+------------------------+-----------------+-------------------+
#|1   |[Tokyo, Singapore]      |[Paris, New York]|[false, false]     |
#|2   |[Tokyo, New York, Paris]|[Paris, New York]|[false, true, true]|
#|3   |[Paris]                 |[Paris, New York]|[true]             |
#+----+------------------------+-----------------+-------------------+
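
As a variant of the above (not part of the original answer): since the destination list is small (roughly 1000 rows), you could also collect it to the driver and embed it as an array literal instead of doing a cross join. A minimal sketch, where dest_df, destinations and result are assumed names:

from pyspark.sql.functions import array, lit, expr

# dest_df is the single-column Destination dataframe (df2 in the question)
dest_df = spark.createDataFrame([('Paris',), ('New York',)], ['Destination'])

# Collect the distinct destinations to the driver (a small list)
destinations = [row['Destination'] for row in dest_df.distinct().collect()]

# Embed them as an array literal and apply the same transform/array_contains check
result = (df
          .withColumn('dst', array(*[lit(d) for d in destinations]))
          .withColumn('new col', expr('transform(colb, x -> array_contains(dst, x))'))
          .drop('dst'))
result.show(truncate=False)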

Answer 2

Score: 0


You can use array_contains to join the two dataframes, aggregate the matching Destination values with collect_list, and finally transform Column B to create the boolean array.

** Assuming Column A is an identifier. (If not, please reply in a comment.)

** transform is only available as a DataFrame function in PySpark 3.1+. If you are using a lower version of PySpark, you need to use the SQL built-in via F.expr('transform ...').

from pyspark.sql import functions as F

# Left-join on "destination appears in Column B", then re-group by Column A,
# collecting the matched destinations into an array
df = (df1.join(df2, on=F.array_contains(df1['Column B'], df2.Destination), how='left')
      .groupby('Column A')
      .agg(*[F.first(x).alias(x) for x in df1.columns if x != 'Column A'],
           F.collect_list('Destination').alias('new column')))

At this point, the df should look like this.

+---------+--------------------+---------+---------+-----------------+
| Column A|            Column B| Column C| Column D|       new column|
+---------+--------------------+---------+---------+-----------------+
|        1|  [Tokyo, Singapore]|  4 hours|    apple|               []|
|        2|[Tokyo, New York,...|1.5 hours|   banana|[Paris, New York]|
|        3|             [Paris]|  2 hours|   orange|          [Paris]|
+---------+--------------------+---------+---------+-----------------+

Then use transform with an array_contains condition to turn Column B into the boolean array.

df = df.withColumn('new column', F.transform('Column B', lambda x: F.array_contains('new column', x)))

All together.

df = (df1.join(df2, on=F.array_contains(df1['Column B'], df2.Destination), how='left')
      .groupby('Column A')
      .agg(*[F.first(x).alias(x) for x in df1.columns if x != 'Column A'],
           F.collect_list('Destination').alias('new column'))
      .withColumn('new column', F.transform('Column B', lambda x: F.array_contains('new column', x))))
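
On PySpark versions below 3.1, where F.transform is not available as a DataFrame function, the final step can be written with F.expr instead, as the note above mentions. A sketch of that fallback (note the backticks around column names that contain spaces):

df = df.withColumn(
    'new column',
    F.expr('transform(`Column B`, x -> array_contains(`new column`, x))'))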
