Efficient way to replace values of multiple columns based on a dictionary map using pyspark
Question
I need to replace the values of multiple columns (100s to 1000s of columns) of a large parquet file. I am using pyspark.

I have a working implementation using replace that works with a smaller number of columns, but when the number of columns is in the order of 100s, it takes a long time to even generate the Spark plan from what I can see (> 3-4 s per column). So I am looking for a faster implementation.
value_label_map = {"col1": {"val1": "new_val1"}, "col2": {"val2": "new_val2"}}

for k, v in value_label_map.items():
    print(f"replacing {k}")
    columns_to_replace.append(k)
    df = df.replace(to_replace=v, subset=k)
I tried an alternate approach, but I couldn't find a way to access the value of a pyspark Column object in order to look it up in the dict.

Alternate implementation:
def replace_values(col, value_map):
    if value_map:
        return when(col.isin(list(value_map.keys())), value_label_map[col]).otherwise(col)
    else:
        return col

df = spark.read.parquet("some-path")

updated_cols = [replace_values(df[col_name], value_labels.get(col_name)).alias(col_name) for col_name in df_values_renamed.columns]
The problem with this is that I can't look up value_labels using a Column object.
Answer 1
Score: 1
You could try packing everything into one select. Since replace is based on when statements, let's use them directly:
from pyspark.sql import functions as F

def replace_from_dict(col_name, value_map):
    """For each (k, v) item in value_map, replace value k in col_name with value v."""
    res = None
    for k, v in value_map.items():
        # Chain one when() clause per mapping entry.
        if res is None:
            res = F.when(F.col(col_name) == k, F.lit(v))
        else:
            res = res.when(F.col(col_name) == k, F.lit(v))
    # Keep the original value for anything not in the mapping.
    return res.otherwise(F.col(col_name)).alias(col_name)

def replace_or_not(col_name):
    """Generate a column replacement if need be, keeping the column otherwise."""
    if col_name in value_label_map:
        return replace_from_dict(col_name, value_label_map[col_name])
    else:
        return col_name

result = df.select(*[replace_or_not(c) for c in df.columns])
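
For reference, here is a minimal usage sketch of the functions above, assuming a local SparkSession and a small in-memory DataFrame standing in for the parquet file; the sample data and the expected output shown in comments are illustrative assumptions, not from the original post:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Hypothetical sample data standing in for the parquet file.
value_label_map = {"col1": {"val1": "new_val1"}, "col2": {"val2": "new_val2"}}
df = spark.createDataFrame(
    [("val1", "val2", "x"), ("other", "val2", "y")],
    ["col1", "col2", "col3"],
)

result = df.select(*[replace_or_not(c) for c in df.columns])
result.show()
# Expected output:
# +--------+--------+----+
# |    col1|    col2|col3|
# +--------+--------+----+
# |new_val1|new_val2|   x|
# |   other|new_val2|   y|
# +--------+--------+----+

Because every replacement is expressed inside a single select, the optimizer analyzes one projection instead of one new plan node per replace call, which is presumably what makes this approach much faster to plan.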