
PySpark: Exploding a Column on a Boolean Conditional

Question

I am struggling with exploding on a conditional in PySpark.

I have two tables:

joined_data:

poi_code  cleaned_home_code
1         [{"12": 4, "34": 2, "56": 8}]
2         [{"12": 8, "56": 10, "78": 4}]
3         [{"34": 1}]

and nyc_code:

code  zipcode
12    11221
56    11385
78    10471

I would like to explode joined_data on cleaned_home_code, but only for keys that exist in nyc_code.

Desired result:

poi_code  cleaned_home_code               code
1         [{"12": 4, "34": 2, "56": 8}]   12
1         [{"12": 4, "34": 2, "56": 8}]   56
2         [{"12": 8, "56": 10, "78": 4}]  12
2         [{"12": 8, "56": 10, "78": 4}]  56
2         [{"12": 8, "56": 10, "78": 4}]  78

Can anyone please advise? I have tried exploding and then filtering out non-NYC codes; the challenge is that my dataset becomes too large to compute in the desired timeframe. This is why I'm wondering if I can apply the filtering condition as I explode, so that I only explode values that meet the desired criteria. I am a Spark novice, so any help is appreciated.
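
For anyone wanting to reproduce this, here is a minimal sketch of how the sample tables above might be constructed, assuming cleaned_home_code is an array holding a single map<string,int> and the code columns are strings (the question does not confirm the exact schema):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# joined_data: cleaned_home_code modeled as an array containing one map<string,int>
joined_data = spark.createDataFrame(
    [
        (1, [{"12": 4, "34": 2, "56": 8}]),
        (2, [{"12": 8, "56": 10, "78": 4}]),
        (3, [{"34": 1}]),
    ],
    "poi_code int, cleaned_home_code array<map<string,int>>",
)

# nyc_code: small lookup table of codes and their zipcodes
nyc_code = spark.createDataFrame(
    [("12", "11221"), ("56", "11385"), ("78", "10471")],
    "code string, zipcode string",
)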

Answer 1

Score: 1

Try this:

1. Import the necessary packages:

from pyspark.sql.functions import explode, map_keys

2. Explode the cleaned_home_code column and extract the keys out of it:

joined_data = joined_data.withColumn("exp_cleaned_home_code", explode("cleaned_home_code"))
joined_data = joined_data.withColumn("home_code_key", map_keys("exp_cleaned_home_code"))
joined_data = joined_data.withColumn("home_code_key", explode("home_code_key"))

3. Perform an inner join with the nyc_code DataFrame, then drop the helper columns:

joined_nyc = joined_data \
    .join(nyc_code, joined_data.home_code_key == nyc_code.code) \
    .orderBy("poi_code") \
    .drop("home_code_key", "zipcode", "exp_cleaned_home_code")

joined_nyc.show(truncate=False)

Output:

+--------+------------------------------+----+
|poi_code|cleaned_home_code             |code|
+--------+------------------------------+----+
|1       |[{56 -> 8, 34 -> 2, 12 -> 4}] |12  |
|1       |[{56 -> 8, 34 -> 2, 12 -> 4}] |56  |
|2       |[{78 -> 4, 56 -> 10, 12 -> 8}]|12  |
|2       |[{78 -> 4, 56 -> 10, 12 -> 8}]|56  |
|2       |[{78 -> 4, 56 -> 10, 12 -> 8}]|78  |
+--------+------------------------------+----+
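
A note on the question's performance concern: this approach still explodes every key before joining. Since nyc_code is small, hinting a broadcast join may keep the much larger exploded DataFrame from being shuffled for the join. A sketch of step 3 with the hint added, assuming the same intermediate columns as above:

from pyspark.sql.functions import broadcast

# Broadcast the small nyc_code lookup so the exploded joined_data
# is not shuffled across the cluster for the join.
joined_nyc = joined_data \
    .join(broadcast(nyc_code), joined_data.home_code_key == nyc_code.code) \
    .orderBy("poi_code") \
    .drop("home_code_key", "zipcode", "exp_cleaned_home_code")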

Answer 2

Score: 0

Since Spark 3.1, you can filter your cleaned_home_code column, keeping only the values in a specified list, before exploding it.

However, this specified list can't be a DataFrame, so you first need to transform your nyc_code into a Python list. In your case, as your nyc_code DataFrame is rather small (~6000 records), you can collect it and pass it to the filter function to remove all values not in nyc_code. Then you can explode the filtered column, as follows:

from pyspark.sql import functions as F

# create list of codes you want to keep
kept_codes = [r[0] for r in nyc_code.select(F.col('code').cast('string')).distinct().collect()]

# explode on filtered map
result = joined_data.withColumn(
  'code',
  F.explode(
    F.filter(
      F.map_keys(F.col('cleaned_home_code')), 
      lambda c: c.isin(kept_codes)
    )
  )
)

You then get the following result DataFrame:

+--------+----------------------------+----+
|poi_code|cleaned_home_code           |code|
+--------+----------------------------+----+
|1       |{56 -> 8, 34 -> 2, 12 -> 4} |56  |
|1       |{56 -> 8, 34 -> 2, 12 -> 4} |12  |
|2       |{78 -> 4, 56 -> 10, 12 -> 8}|78  |
|2       |{78 -> 4, 56 -> 10, 12 -> 8}|56  |
|2       |{78 -> 4, 56 -> 10, 12 -> 8}|12  |
+--------+----------------------------+----+
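
If you are on a Spark version older than 3.1 (where the Python F.filter API was introduced), the same filtering can be expressed through F.expr, since the SQL higher-order filter function has been available since Spark 2.4. A sketch under that assumption, which also assumes the codes contain no quote characters:

from pyspark.sql import functions as F

# Build the IN-list literal from the collected codes
codes_sql = ", ".join("'{}'".format(c) for c in kept_codes)

# SQL higher-order `filter` (Spark 2.4+) in place of the Python F.filter API (3.1+)
result = joined_data.withColumn(
    "code",
    F.explode(F.expr(
        "filter(map_keys(cleaned_home_code), c -> c IN (" + codes_sql + "))"
    ))
)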