How can values in a Spark array column be efficiently replaced with values from a Pandas data frame?
Question
I have a Spark data frame that contains a column of arrays with product ids from sold baskets.
import pandas as pd
import pyspark.sql.types as T
from pyspark.sql import functions as F
df_baskets = spark.createDataFrame(
    [(1, ["546", "689", "946"]), (2, ["546", "799"])],
    ("case_id", "basket")
)
df_baskets.show()
#+-------+---------------+
#|case_id|         basket|
#+-------+---------------+
#|      1|[546, 689, 946]|
#|      2|     [546, 799]|
#+-------+---------------+
I would like to replace the product ids in each array with new ids given in a pandas data frame.
product_data = pd.DataFrame({
    "product_id": ["546", "689", "946", "799"],
    "new_product_id": ["S12", "S74", "S34", "S56"]
})
product_data
I was able to replace the values by applying a simple Python function to the column that performs a lookup on the pandas data frame.
def get_new_id(product_id: str) -> str:
    try:
        row = product_data[product_data["product_id"] == product_id]
        return row["new_product_id"].item()
    except ValueError:
        return product_id
apply_get = F.udf(lambda basket: [get_new_id(product) for product in basket], T.ArrayType(T.StringType()))
df_baskets = (
    df_baskets
    .withColumn('basket_renamed', apply_get(F.col('basket')))
)
df_baskets.show()
#+-------+---------------+---------------+
#|case_id|         basket| basket_renamed|
#+-------+---------------+---------------+
#|      1|[546, 689, 946]|[S12, S74, S34]|
#|      2|     [546, 799]|     [S12, S56]|
#+-------+---------------+---------------+
However, this approach has proven to be quite slow on data frames containing several tens of millions of cases. Is there a more efficient way to do this replacement (e.g. by using a different data structure than a pandas data frame, or a different method)?
Answer 1
Score: 2
You could explode your original data and join it on product_data (after converting it to a Spark data frame):
(
    df_baskets
    .withColumn("basket", F.explode(F.col("basket")))
    .join(
        spark.createDataFrame(product_data)
        .withColumnRenamed("product_id", "basket")
        .withColumnRenamed("new_product_id", "basket_renamed"),
        on="basket"
    )
    .groupby("case_id")
    .agg(
        F.collect_list(F.col("basket")).alias("basket"),
        F.collect_list(F.col("basket_renamed")).alias("basket_renamed")
    )
).show()
Output:
+-------+---------------+---------------+
|case_id|         basket| basket_renamed|
+-------+---------------+---------------+
|      1|[546, 689, 946]|[S12, S74, S34]|
|      2|     [546, 799]|     [S12, S56]|
+-------+---------------+---------------+
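Two caveats worth noting with this join approach: the inner join silently drops any product id that has no match in product_data (the original UDF kept it), and collect_list after a shuffle does not guarantee that the rebuilt arrays preserve the original element order. Below is a minimal sketch of a variant that keeps both, assuming the same df_baskets and product_data as in the question; the column names pos, product and new_product are introduced here purely for illustration.

mapping_sdf = (
    spark.createDataFrame(product_data)
    .withColumnRenamed("product_id", "product")
    .withColumnRenamed("new_product_id", "new_product")
)

(
    df_baskets
    # posexplode keeps each product's position inside its basket
    .select("case_id", F.posexplode("basket").alias("pos", "product"))
    # a left join keeps ids that have no mapping
    .join(mapping_sdf, on="product", how="left")
    # fall back to the old id when there is no new one (mirrors the UDF's except branch)
    .withColumn("new_product", F.coalesce("new_product", "product"))
    .groupby("case_id")
    # sorting the collected structs by pos restores the original array order
    .agg(F.sort_array(F.collect_list(F.struct("pos", "product", "new_product"))).alias("tmp"))
    .select(
        "case_id",
        F.col("tmp.product").alias("basket"),
        F.col("tmp.new_product").alias("basket_renamed"),
    )
).show()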
Answer 2
Score: 2
You could use RDD and map. Convert the pandas dataframe rows to a dict of {old: new} values, then use map on the RDD to fetch the mapped new_product_id. Here's an example:
# convert pandas df to dict
# can be done in other ways as well
old_new_id_dict = {}
for i in range(len(product_data_df)):
    old_new_id_dict[product_data_df.loc[i, 'product_id']] = product_data_df.loc[i, 'new_product_id']
# {'546': 'S12', '689': 'S74', '946': 'S34', '799': 'S56'}
old_new_id_dict_bc = spark.sparkContext.broadcast(old_new_id_dict)
# `map` the values
data_sdf.rdd. \
    map(lambda r: (r.case_id, r.basket, [old_new_id_dict_bc.value[k] for k in r.basket])). \
    toDF(['case_id', 'basket', 'new_basket']). \
    show()
# +-------+---------------+---------------+
# |case_id|         basket|     new_basket|
# +-------+---------------+---------------+
# |      1|[546, 689, 946]|[S12, S74, S34]|
# |      2|     [546, 799]|     [S12, S56]|
# +-------+---------------+---------------+
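One small follow-up on this example: indexing the broadcast dict with old_new_id_dict_bc.value[k] raises a KeyError for any product id missing from the mapping, whereas the UDF in the question falls back to the old id. Using dict.get with the key itself as the default keeps that behaviour, and the same broadcast dict can also be applied through a regular UDF so the result stays a DataFrame. A minimal sketch, reusing the names from this answer; the slowness in the question likely comes from the per-element pandas lookup rather than from the UDF mechanism itself, so a plain dict lookup is already far cheaper.

# same broadcast dict, applied through a UDF; .get(k, k) keeps unmapped ids unchanged
map_ids = F.udf(
    lambda basket: [old_new_id_dict_bc.value.get(k, k) for k in basket],
    T.ArrayType(T.StringType())
)

data_sdf.withColumn('new_basket', map_ids(F.col('basket'))).show()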
Comments