How can values in a Spark array column be efficiently replaced with values from a Pandas data frame?

Question


I have a Spark data frame that contains a column of arrays with product ids from sold baskets.

    import pandas as pd
    import pyspark.sql.types as T
    from pyspark.sql import functions as F

    df_baskets = spark.createDataFrame(
        [(1, ["546", "689", "946"]), (2, ["546", "799"])],
        ("case_id", "basket")
    )

    df_baskets.show()
    #+-------+---------------+
    #|case_id|         basket|
    #+-------+---------------+
    #|      1|[546, 689, 946]|
    #|      2|     [546, 799]|
    #+-------+---------------+

I would like to replace the product ids in each array with new ids given in a pandas data frame.

    product_data = pd.DataFrame({
        "product_id": ["546", "689", "946", "799"],
        "new_product_id": ["S12", "S74", "S34", "S56"]
    })

    product_data

I was able to replace the values by applying a simple Python function to the column that performs a lookup in the pandas data frame.

    def get_new_id(product_id: str) -> str:
        try:
            row = product_data[product_data["product_id"] == product_id]
            return row["new_product_id"].item()
        except ValueError:
            return product_id

    apply_get = F.udf(lambda basket: [get_new_id(product) for product in basket], T.ArrayType(T.StringType()))

    df_baskets = (
        df_baskets
        .withColumn('basket_renamed', apply_get(F.col('basket')))
    )

    df_baskets.show()
    #+-------+---------------+---------------+
    #|case_id|         basket| basket_renamed|
    #+-------+---------------+---------------+
    #|      1|[546, 689, 946]|[S12, S74, S34]|
    #|      2|     [546, 799]|     [S12, S56]|
    #+-------+---------------+---------------+

However, this approach has proven to be quite slow on data frames containing several tens of millions of cases. Is there a more efficient way to do this replacement (e.g. by using a different data structure than a pandas data frame, or a different method)?

Answer 1

Score: 2


You could explode your original data and join it with product_data (after converting product_data to a Spark data frame):

    (
        df_baskets
        .withColumn("basket", F.explode(F.col("basket")))
        .join(
            spark.createDataFrame(product_data)
                .withColumnRenamed("product_id", "basket")
                .withColumnRenamed("new_product_id", "basket_renamed"),
            on="basket"
        )
        .groupby("case_id")
        .agg(
            F.collect_list(F.col("basket")).alias("basket"),
            F.collect_list(F.col("basket_renamed")).alias("basket_renamed")
        )
    ).show()

Output:

    +-------+---------------+---------------+
    |case_id|         basket| basket_renamed|
    +-------+---------------+---------------+
    |      1|[546, 689, 946]|[S12, S74, S34]|
    |      2|     [546, 799]|     [S12, S56]|
    +-------+---------------+---------------+
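
A small tuning note on the join above, under the assumption that product_data stays tiny compared to df_baskets: marking the mapping frame as broadcastable lets Spark ship it to every executor instead of shuffling the exploded rows. Spark often does this automatically for small frames (spark.sql.autoBroadcastJoinThreshold), but the hint makes it explicit. A minimal sketch, assuming the same df_baskets and product_data as in the question (mapping_sdf is just an illustrative name):

    # Variant of the join above with an explicit broadcast hint.
    mapping_sdf = (
        spark.createDataFrame(product_data)
        .withColumnRenamed("product_id", "basket")
        .withColumnRenamed("new_product_id", "basket_renamed")
    )

    (
        df_baskets
        .withColumn("basket", F.explode(F.col("basket")))
        .join(F.broadcast(mapping_sdf), on="basket")  # broadcast the small lookup table
        .groupby("case_id")
        .agg(
            F.collect_list(F.col("basket")).alias("basket"),
            F.collect_list(F.col("basket_renamed")).alias("basket_renamed")
        )
    ).show()

Note that collect_list after the explode/join shuffle does not guarantee the original order of ids within a basket, and the inner join silently drops any product id missing from product_data.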

Answer 2

Score: 2


You could use an RDD and map.

Convert the pandas data frame rows to a dict of {old: new} values, then use map on the RDD to fetch the mapped new_product_id.

Here's an example:

    # convert the pandas df to a dict
    # (this can be done in other ways as well)
    old_new_id_dict = {}
    for i in range(len(product_data_df)):
        old_new_id_dict[product_data_df.loc[i, 'product_id']] = product_data_df.loc[i, 'new_product_id']

    # {'546': 'S12', '689': 'S74', '946': 'S34', '799': 'S56'}

    old_new_id_dict_bc = spark.sparkContext.broadcast(old_new_id_dict)

    # `map` the values
    data_sdf.rdd. \
        map(lambda r: (r.case_id, r.basket, [old_new_id_dict_bc.value[k] for k in r.basket])). \
        toDF(['case_id', 'basket', 'new_basket']). \
        show()

    # +-------+---------------+---------------+
    # |case_id|         basket|     new_basket|
    # +-------+---------------+---------------+
    # |      1|[546, 689, 946]|[S12, S74, S34]|
    # |      2|     [546, 799]|     [S12, S56]|
    # +-------+---------------+---------------+
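
One caveat worth flagging: old_new_id_dict_bc.value[k] raises a KeyError if a basket contains a product id that is not in the mapping, whereas the UDF in the question falls back to the original id. A minimal variant using dict.get keeps that fallback, assuming the same data_sdf and broadcast variable as above:

    # same RDD map as above, but unknown ids fall back to themselves
    data_sdf.rdd. \
        map(lambda r: (r.case_id, r.basket,
                       [old_new_id_dict_bc.value.get(k, k) for k in r.basket])). \
        toDF(['case_id', 'basket', 'new_basket']). \
        show()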
