Pyspark Exploding Column on Boolean Conditional
Question
I am struggling with exploding on a conditional in Pyspark.
I have two tables:
joined_data
:
poi_code | cleaned_home_code |
---|---|
1 | [{"12": 4, "34":2, "56":8}] |
2 | [{"12": 8, "56": 10, "78":4}] |
3 | [{"34": 1}] |
and nyc_code:
code | zipcode |
---|---|
12 | 11221 |
56 | 11385 |
78 | 10471 |
I would like to explode joined_data on home_code, but only for values that exist in nyc_code.
Desired result:
poi_code | cleaned_home_code | code |
---|---|---|
1 | [{"12": 4, "34":2, "56":8}] | 12 |
1 | [{"12": 4, "34":2, "56":8}] | 56 |
2 | [{"12": 8, "56": 10, "78":4}] | 12 |
2 | [{"12": 8, "56": 10, "78":4}] | 56 |
2 | [{"12": 8, "56": 10, "78":4}] | 78 |
Can anyone please advise? I have tried exploding and then filtering out non-NYC codes, but the dataset becomes too large to compute in the desired timeframe. This is why I'm wondering whether I can apply the filtering condition as I explode, so that I only explode values that meet the desired criteria. I am a Spark novice, so any help is appreciated.
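For reference, a minimal sketch of how the two sample tables above could be constructed, assuming cleaned_home_code is an array holding a single map<string,int> and code is an integer (both assumptions, since the original schemas are not shown):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical reconstruction of joined_data: one array containing one map per row
joined_data = spark.createDataFrame(
    [
        (1, [{"12": 4, "34": 2, "56": 8}]),
        (2, [{"12": 8, "56": 10, "78": 4}]),
        (3, [{"34": 1}]),
    ],
    "poi_code int, cleaned_home_code array<map<string,int>>",
)

# Hypothetical reconstruction of nyc_code
nyc_code = spark.createDataFrame(
    [(12, 11221), (56, 11385), (78, 10471)],
    "code int, zipcode int",
)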
Answer 1
Score: 1
Try this:
- Import the necessary packages:
from pyspark.sql.functions import explode, map_keys
- Explode the cleaned_home_code column and extract the keys out of it:
joined_data = joined_data.withColumn("exp_cleaned_home_code", explode("cleaned_home_code"))
joined_data = joined_data.withColumn("home_code_key", map_keys("exp_cleaned_home_code"))
joined_data = joined_data.withColumn("home_code_key", explode("home_code_key"))
- Perform an inner join with the nyc_code DataFrame:
joined_nyc = joined_data \
    .join(nyc_code, joined_data.home_code_key == nyc_code.code) \
    .orderBy("poi_code") \
    .drop("home_code_key", "zipcode", "exp_cleaned_home_code")
joined_nyc.show(truncate=False)
Output:
+--------+------------------------------+----+
|poi_code|cleaned_home_code |code|
+--------+------------------------------+----+
|1 |[{56 -> 8, 34 -> 2, 12 -> 4}] |12 |
|1 |[{56 -> 8, 34 -> 2, 12 -> 4}] |56 |
|2 |[{78 -> 4, 56 -> 10, 12 -> 8}]|12 |
|2 |[{78 -> 4, 56 -> 10, 12 -> 8}]|56 |
|2 |[{78 -> 4, 56 -> 10, 12 -> 8}]|78 |
+--------+------------------------------+----+
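One possible follow-up on the performance concern in the question: nyc_code is small, so the join above can be given an explicit broadcast hint to keep it map-side and avoid shuffling the already-exploded joined_data rows. A sketch (Spark will often broadcast a table this small automatically, so the hint is optional):

from pyspark.sql.functions import broadcast

# Same join as above, but with nyc_code marked for broadcast
joined_nyc = joined_data \
    .join(broadcast(nyc_code), joined_data.home_code_key == nyc_code.code) \
    .orderBy("poi_code") \
    .drop("home_code_key", "zipcode", "exp_cleaned_home_code")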
Answer 2
Score: 0
Since Spark 3.1, you can filter your cleaned_home_code column, keeping only the values in a specified list, before exploding it.
However, this list can't be a DataFrame, so you first need to turn nyc_code into a Python list. In your case, since the nyc_code DataFrame is rather small (~6000 records), you can collect it and pass it to the filter function to remove all values not present in nyc_code. Then you can explode the filtered column, as follows:
from pyspark.sql import functions as F
# create list of codes you want to keep
kept_codes = [r[0] for r in nyc_code.select(F.col('code').cast('string')).distinct().collect()]
# explode on filtered map
result = joined_data.withColumn(
    'code',
    F.explode(
        F.filter(
            F.map_keys(F.col('cleaned_home_code')),
            lambda c: c.isin(kept_codes)
        )
    )
)
You then get the following result DataFrame:
+--------+----------------------------+----+
|poi_code|cleaned_home_code |code|
+--------+----------------------------+----+
|1 |{56 -> 8, 34 -> 2, 12 -> 4} |56 |
|1 |{56 -> 8, 34 -> 2, 12 -> 4} |12 |
|2 |{78 -> 4, 56 -> 10, 12 -> 8}|78 |
|2 |{78 -> 4, 56 -> 10, 12 -> 8}|56 |
|2 |{78 -> 4, 56 -> 10, 12 -> 8}|12 |
+--------+----------------------------+----+
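If you are on a Spark version older than 3.1, where the Python F.filter function with a lambda is not available, a possible alternative (a sketch, not part of the answer above) is to intersect the map keys with a literal array of the kept codes and then explode the intersection; array_intersect and map_keys have been available since Spark 2.4:

from pyspark.sql import functions as F

# Build the kept codes as an array of literals (kept_codes is the Python
# list collected from nyc_code above).
kept_array = F.array(*[F.lit(c) for c in kept_codes])

# Keep only the map keys that also appear in kept_array, then explode so
# that each surviving code becomes its own row.
result = joined_data.withColumn(
    "code",
    F.explode(F.array_intersect(F.map_keys(F.col("cleaned_home_code")), kept_array)),
)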