How can values in a Spark array column be efficiently replaced with values from a Pandas data frame?

Question

I have a Spark data frame that contains a column of arrays with product ids from sold baskets.

import pandas as pd 
import pyspark.sql.types as T
from pyspark.sql import functions as F

df_baskets = spark.createDataFrame(
    [(1, ["546", "689", "946"]), (2, ["546", "799"] )],
    ("case_id","basket")
)

df_baskets.show()

#+-------+---------------+
#|case_id|         basket|
#+-------+---------------+
#|      1|[546, 689, 946]|
#|      2|     [546, 799]|
#+-------+---------------+

I would like to replace the product ids in each array with new ids given in a pandas data frame.


product_data = pd.DataFrame({
  "product_id": ["546", "689", "946", "799"],
  "new_product_id": ["S12", "S74", "S34", "S56"]
  })

product_data

I was able to replace the values by applying a simple Python function to the column that performs a lookup on the pandas data frame.


def get_new_id(product_id: str) -> str:
  try:
    row = product_data[product_data["product_id"] == product_id]
    return row["new_product_id"].item()
  except ValueError:
    return product_id

apply_get = F.udf(lambda basket: [get_new_id(product) for product in basket], T.ArrayType(T.StringType()))

df_baskets = (
  df_baskets
    .withColumn('basket_renamed', apply_get(F.col('basket')))
)

df_baskets.show()

#+-------+---------------+---------------+
#|case_id|         basket| basket_renamed|
#+-------+---------------+---------------+
#|      1|[546, 689, 946]|[S12, S74, S34]|
#|      2|     [546, 799]|     [S12, S56]|
#+-------+---------------+---------------+

However, this approach has proven to be quite slow on data frames containing several tens of millions of cases. Is there a more efficient way to do this replacement (e.g. by using a different data structure than a pandas data frame, or a different method)?

Answer 1

Score: 2

You could explode your original data and join it with product_data (after converting it to a Spark data frame), then collect the results back into arrays per case_id:

(
    df_baskets
    .withColumn("basket", F.explode(F.col("basket")))
    .join(
        spark.createDataFrame(product_data)
        .withColumnRenamed("product_id", "basket")
        .withColumnRenamed("new_product_id", "basket_renamed"),
        on="basket"
    )
    .groupby("case_id")
    .agg(
        F.collect_list(F.col("basket")).alias("basket"),
        F.collect_list(F.col("basket_renamed")).alias("basket_renamed")
    )
).show()

Output:

+-------+---------------+---------------+
|case_id|         basket| basket_renamed|
+-------+---------------+---------------+
|      1|[546, 689, 946]|[S12, S74, S34]|
|      2|     [546, 799]|     [S12, S56]|
+-------+---------------+---------------+
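
A note on the join approach: an inner join drops any product id that has no row in product_data, and collect_list does not guarantee element order after a shuffle. Below is a minimal sketch, not part of the original answer, of a variant that keeps unmatched ids and the original order. It assumes case_id is unique per row, that the product table is small enough to broadcast, and that empty baskets may drop out (posexplode skips them). The function name rename_by_left_join and its df_products argument (product_data converted with spark.createDataFrame) are hypothetical.

```python
def rename_by_left_join(df_baskets, df_products):
    """Sketch: df_baskets has columns (case_id, basket); df_products has
    columns (product_id, new_product_id). Both are Spark DataFrames."""
    from pyspark.sql import functions as F  # lazy import; requires PySpark

    # One row per basket element, remembering its position in the array.
    exploded = df_baskets.select(
        "case_id",
        "basket",
        F.posexplode("basket").alias("pos", "product_id"),
    )

    # Left join keeps elements without a match; broadcast hint is an assumption
    # that the product table is small.
    joined = exploded.join(F.broadcast(df_products), on="product_id", how="left")

    return (
        joined
        # Fall back to the old id when no replacement exists.
        .withColumn("new_id", F.coalesce(F.col("new_product_id"), F.col("product_id")))
        .groupBy("case_id")
        .agg(
            F.first("basket").alias("basket"),
            # Sorting (pos, new_id) structs restores the original element order.
            F.sort_array(F.collect_list(F.struct("pos", "new_id"))).alias("pairs"),
        )
        # Extracting a struct field from an array of structs yields an array.
        .withColumn("basket_renamed", F.col("pairs.new_id"))
        .drop("pairs")
    )
```

Usage might look like rename_by_left_join(df_baskets, spark.createDataFrame(product_data)).show().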

Answer 2

Score: 2

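You could use the RDD API and map.

Convert the pandas data frame rows to a dict of {old: new} values, broadcast it, then use map on the RDD to look up the new_product_id for each element.

Here's an example:

# Convert the pandas data frame to an {old_id: new_id} dict
# (this can be done in other ways as well).
# product_data_df is the pandas frame from the question (called product_data there).
old_new_id_dict = dict(zip(product_data_df['product_id'], product_data_df['new_product_id']))

# {'546': 'S12', '689': 'S74', '946': 'S34', '799': 'S56'}

# Broadcast the dict so it is shipped to each executor once as a read-only copy.
old_new_id_dict_bc = spark.sparkContext.broadcast(old_new_id_dict)

# `map` the values; data_sdf is the Spark frame from the question (called df_baskets there).
# .get(k, k) keeps ids that have no replacement, matching the question's get_new_id.
data_sdf.rdd. \
    map(lambda r: (r.case_id, r.basket, [old_new_id_dict_bc.value.get(k, k) for k in r.basket])). \
    toDF(['case_id', 'basket', 'new_basket']). \
    show()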

# +-------+---------------+---------------+
# |case_id|         basket|     new_basket|
# +-------+---------------+---------------+
# |      1|[546, 689, 946]|[S12, S74, S34]|
# |      2|     [546, 799]|     [S12, S56]|
# +-------+---------------+---------------+
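
If you would rather stay in the DataFrame API than drop down to RDDs, an {old: new} dict like the one above can be turned into a map literal and applied with the higher-order function transform. This is a minimal sketch, assuming Spark 3.1 or later (where pyspark.sql.functions.transform is available) and a mapping small enough to embed in the query plan as literals. The helper names flatten_items and with_renamed_basket are made up for illustration. Ids without an entry are kept via coalesce, which assumes a missing map key yields null; on some versions with ANSI mode enabled it may raise instead, so check that on your cluster.

```python
from itertools import chain


def flatten_items(mapping):
    """Turn {'a': 'x', 'b': 'y'} into ['a', 'x', 'b', 'y'],
    the alternating key/value layout that create_map expects."""
    return list(chain.from_iterable(mapping.items()))


def with_renamed_basket(df, mapping, src="basket", dst="basket_renamed"):
    """Add column `dst` where each element of array column `src`
    is replaced via `mapping` (unknown elements are left unchanged)."""
    from pyspark.sql import functions as F  # lazy import; requires PySpark >= 3.1

    # Map literal built from alternating key/value literals.
    mapping_expr = F.create_map(*[F.lit(v) for v in flatten_items(mapping)])

    # Look up every element; fall back to the element itself when the lookup is null.
    return df.withColumn(
        dst,
        F.transform(F.col(src), lambda x: F.coalesce(mapping_expr[x], x)),
    )
```

Usage might look like with_renamed_basket(df_baskets, old_new_id_dict).show(). Because no Python function runs per row, this avoids the serialization overhead of a Python UDF or RDD map, though for very large mappings the join approach is likely the better fit.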

huangapple
  • Published on April 13, 2023 at 20:10:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76005256.html