How can I filter rows in a column of ArrayType(StringType) against items in another column in a separate dataframe using PySpark?
Question
I have two dataframes. The structure is as below:
df1:
| Column A | Column B | Column C | Column D |
| --- | --- | --- | --- |
| 1 | Tokyo, Singapore | 4 hours | apple |
| 2 | Tokyo, New York, Paris | 1.5 hours | banana |
| 3 | Paris | 2 hours | orange |
where Column B is an array of strings (ArrayType(StringType))
df2:
| Destination |
| --- |
| Paris |
| New York |
where there is only a single value per row
I want to create a new column in df1, which is filtered using df2. If a value in the array is present in df2 then return True, otherwise False.
For example:
| Column A | Column B | Column C | Column D | new column |
| --- | --- | --- | --- | --- |
| 1 | Tokyo, Singapore | 4 hours | apple | False, False |
| 2 | Tokyo, New York, Paris | 1.5 hours | banana | False, True, True |
| 3 | Paris | 2 hours | orange | True |
There is no maximum array length in df1, and df2 has roughly 1,000 rows.
How can I create this boolean column using PySpark?
Thanks!
Most of my errors are along the lines of "Column is not iterable".
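For reference, here is the sample data above as PySpark dataframes (a minimal sketch using the column names from the tables; the answers below assume an equivalent setup):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# df1: Column B is an array of strings (ArrayType(StringType))
df1 = spark.createDataFrame(
    [(1, ['Tokyo', 'Singapore'], '4 hours', 'apple'),
     (2, ['Tokyo', 'New York', 'Paris'], '1.5 hours', 'banana'),
     (3, ['Paris'], '2 hours', 'orange')],
    ['Column A', 'Column B', 'Column C', 'Column D'])

# df2: one destination per row
df2 = spark.createDataFrame([('Paris',), ('New York',)], ['Destination'])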
Answer 1
Score: 0
Use transform, one of Spark's higher-order built-in functions.
- Get the distinct list of destinations from df2 and cross join it onto df1, so every row carries the full list.
- Then use the higher-order function array_contains() to check, for each element of colb, whether it exists in the destination array.
Example:
from pyspark.sql.functions import *

df = spark.createDataFrame([('1', ['Tokyo', 'Singapore']), ('2', ['Tokyo', 'New York', 'Paris']), ('3', ['Paris'])], ['cola', 'colb'])
df1 = spark.createDataFrame([('Paris',), ('New York',)], ['Destination'])
# Collapse the destinations into a single-row dataframe holding one array column
df1 = df1.groupBy(lit("1")).agg(collect_set(col("Destination")).alias("dst")).drop('1')
# Cross join so every row of df carries the full destination array
df2 = df.crossJoin(df1)
# For each element of colb, check membership in the dst array
df2.withColumn("new col", expr("transform(colb, x -> array_contains(dst, x))")).show(truncate=False)
#+----+------------------------+-----------------+-------------------+
#|cola|colb |dst |new col |
#+----+------------------------+-----------------+-------------------+
#|1 |[Tokyo, Singapore] |[Paris, New York]|[false, false] |
#|2 |[Tokyo, New York, Paris]|[Paris, New York]|[false, true, true]|
#|3 |[Paris] |[Paris, New York]|[true] |
#+----+------------------------+-----------------+-------------------+
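A variant of the same idea that avoids the cross join (my sketch, not part of the original answer): since df2 is only ~1,000 rows, the distinct destinations can be collected to the driver and embedded as a literal array column, so no join is needed. Here dest_df is a stand-in name for the original single-column destinations dataframe, before the groupBy/collect_set step above:

# dest_df: hypothetical name for the raw destinations dataframe
dests = [r['Destination'] for r in dest_df.select('Destination').distinct().collect()]
# Attach the constant array to every row, then run the same transform
result = df.withColumn('dst', array(*[lit(d) for d in dests])) \
           .withColumn('new col', expr('transform(colb, x -> array_contains(dst, x))'))
result.show(truncate=False)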
Answer 2
Score: 0
You can use array_contains to join the two dataframes, aggregate the matching Destination values with collect_list, and finally transform Column B to create the boolean array.
** Assuming Column A is an identifier. (If not, please reply in a comment.)
** transform is only available as a Python function in PySpark 3.1+. If you are using a lower version of PySpark, you need the SQL built-in via F.expr('transform(...)').
from pyspark.sql import functions as F

# Left-join each matching destination onto df1, then re-aggregate per Column A,
# collecting the destinations that matched into 'new column'
df = (df1.join(df2, on=F.array_contains(df1['Column B'], df2.Destination), how='left')
      .groupby('Column A')
      .agg(*[F.first(x).alias(x) for x in df1.columns if x != 'Column A'],
           F.collect_list('Destination').alias('new column')))
At this point, df should look like this:
+---------+--------------------+---------+---------+-----------------+
| Column A| Column B| Column C| Column D| new column|
+---------+--------------------+---------+---------+-----------------+
| 1| [Tokyo, Singapore]| 4 hours| apple| []|
| 2|[Tokyo, New York,...|1.5 hours| banana|[Paris, New York]|
| 3| [Paris]| 2 hours| orange| [Paris]|
+---------+--------------------+---------+---------+-----------------+
Then use transform with an array_contains condition to turn Column B into the boolean array.
df = df.withColumn('new column', F.transform('Column B', lambda x: F.array_contains('new column', x)))
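On PySpark below 3.1, the F.expr fallback mentioned above could be written like this (a sketch; note the backticks, which the SQL expression needs because the column names contain spaces):

df = df.withColumn('new column',
                   F.expr('transform(`Column B`, x -> array_contains(`new column`, x))'))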
All together:
df = (df1.join(df2, on=F.array_contains(df1['Column B'], df2.Destination), how='left')
.groupby('Column A')
.agg(*[F.first(x).alias(x) for x in df1.columns if x != 'Column A'],
F.collect_list('Destination').alias('new column'))
.withColumn('new column', F.transform('Column B', lambda x: F.array_contains('new column', x))))