
Pyspark Exploding Column on Boolean Conditional

Question


I am struggling with exploding on a conditional in Pyspark.

I have two tables:

joined_data:

poi_code  cleaned_home_code
1         [{"12": 4, "34": 2, "56": 8}]
2         [{"12": 8, "56": 10, "78": 4}]
3         [{"34": 1}]

and nyc_code:

code  zipcode
12    11221
56    11385
78    10471

I would like to explode joined_data on cleaned_home_code, but only for values that exist in nyc_code.

Desired result:

poi_code  cleaned_home_code               code
1         [{"12": 4, "34": 2, "56": 8}]   12
1         [{"12": 4, "34": 2, "56": 8}]   56
2         [{"12": 8, "56": 10, "78": 4}]  12
2         [{"12": 8, "56": 10, "78": 4}]  56
2         [{"12": 8, "56": 10, "78": 4}]  78

Can anyone please advise? I have tried exploding and then filtering out the non-NYC codes; the challenge is that my dataset becomes too large to compute in the desired timeframe. This is why I'm wondering if I can apply the filtering condition as I explode, so that I only explode values that meet the desired criteria. I am a Spark novice, so any help is appreciated.
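For anyone who wants to reproduce this, here is a minimal sketch that builds the two sample tables above. It assumes cleaned_home_code is an array holding a single map<string,int>, which is how the sample rows read; if your real column is a plain map (as the second answer's output suggests), drop the array wrapper:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample joined_data: cleaned_home_code as array<map<string,int>>
joined_data = spark.createDataFrame(
    [
        (1, [{"12": 4, "34": 2, "56": 8}]),
        (2, [{"12": 8, "56": 10, "78": 4}]),
        (3, [{"34": 1}]),
    ],
    "poi_code int, cleaned_home_code array<map<string,int>>",
)

# Sample nyc_code lookup table
nyc_code = spark.createDataFrame(
    [("12", 11221), ("56", 11385), ("78", 10471)],
    "code string, zipcode int",
)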

Answer 1

Score: 1


Try this:

Importing the necessary packages:

from pyspark.sql.functions import explode, map_keys

Explode the cleaned_home_code column and extract the keys out of it:

joined_data = joined_data.withColumn("exp_cleaned_home_code", explode("cleaned_home_code"))
joined_data = joined_data.withColumn("home_code_key", map_keys("exp_cleaned_home_code"))
joined_data = joined_data.withColumn("home_code_key", explode("home_code_key"))

Perform an inner join with the nyc_code DataFrame:

joined_nyc = joined_data \
    .join(nyc_code, joined_data.home_code_key == nyc_code.code) \
    .orderBy("poi_code") \
    .drop("home_code_key", "zipcode", "exp_cleaned_home_code")
joined_nyc.show(truncate=False)

Output:

+--------+------------------------------+----+
|poi_code|cleaned_home_code             |code|
+--------+------------------------------+----+
|1       |[{56 -> 8, 34 -> 2, 12 -> 4}] |12  |
|1       |[{56 -> 8, 34 -> 2, 12 -> 4}] |56  |
|2       |[{78 -> 4, 56 -> 10, 12 -> 8}]|12  |
|2       |[{78 -> 4, 56 -> 10, 12 -> 8}]|56  |
|2       |[{78 -> 4, 56 -> 10, 12 -> 8}]|78  |
+--------+------------------------------+----+

Note that poi_code 3 disappears: its only key, 34, has no match in nyc_code, so the inner join drops it.
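A side note on performance: nyc_code is tiny compared to the exploded intermediate table, so hinting Spark to broadcast it keeps the join from shuffling the exploded rows. A minimal sketch, reusing the joined_data with home_code_key built above:

from pyspark.sql.functions import broadcast

# Broadcast the small lookup table; the large exploded side stays in place
joined_nyc = joined_data \
    .join(broadcast(nyc_code), joined_data.home_code_key == nyc_code.code) \
    .orderBy("poi_code") \
    .drop("home_code_key", "zipcode", "exp_cleaned_home_code")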

Answer 2

Score: 0


Since Spark 3.1, you can filter your cleaned_home_code column, keeping only the values in a specified list, before exploding it.

However, this list can't be a DataFrame, so you first need to transform your nyc_code into a Python list. In your case, as the nyc_code DataFrame is rather small (~6000 records), you can collect it and pass the resulting list to the filter function to remove all values not in nyc_code. Then you can explode the filtered column, as follows:

from pyspark.sql import functions as F

# create the list of codes you want to keep
kept_codes = [r[0] for r in nyc_code.select(F.col('code').cast('string')).distinct().collect()]

# explode on the filtered map
result = joined_data.withColumn(
    'code',
    F.explode(
        F.filter(
            F.map_keys(F.col('cleaned_home_code')),
            lambda c: c.isin(kept_codes)
        )
    )
)

You then get the following result DataFrame:

+--------+----------------------------+----+
|poi_code|cleaned_home_code           |code|
+--------+----------------------------+----+
|1       |{56 -> 8, 34 -> 2, 12 -> 4} |56  |
|1       |{56 -> 8, 34 -> 2, 12 -> 4} |12  |
|2       |{78 -> 4, 56 -> 10, 12 -> 8}|78  |
|2       |{78 -> 4, 56 -> 10, 12 -> 8}|56  |
|2       |{78 -> 4, 56 -> 10, 12 -> 8}|12  |
+--------+----------------------------+----+