Efficient way to replace values of multiple columns based on a dictionary map using pyspark

Question

I need to replace values of multiple columns (100s-1000s of columns) of a large parquet file. I am using pyspark.

I have a working implementation using replace that works with a smaller number of columns, but when the number of columns is on the order of hundreds it takes a long time even to generate the Spark plan, from what I can see (> 3-4 s per column). So I am looking for a faster implementation.

```python
value_label_map = {"col1": {"val1": "new_val1"}, "col2": {"val2": "new_val2"}}
for k, v in value_label_map.items():
    print(f"replacing {k}")
    columns_to_replace.append(k)
    df = df.replace(to_replace=v, subset=k)
```
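
As a point of reference, the per-row semantics of this loop can be sketched in plain Python (no Spark required); `replace_row` and the sample row below are illustrative names, not part of the original code:

```python
value_label_map = {"col1": {"val1": "new_val1"}, "col2": {"val2": "new_val2"}}

def replace_row(row, value_label_map):
    """Apply each column's mapping to that column's value; values (and
    columns) without a mapping are left unchanged."""
    return {col: value_label_map.get(col, {}).get(val, val)
            for col, val in row.items()}

print(replace_row({"col1": "val1", "col2": "other"}, value_label_map))
# {'col1': 'new_val1', 'col2': 'other'}
```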

I tried an alternate approach, but I couldn't find a way to access the value of a pyspark Column object in order to look up the dict.

Alternate implementation:

```python
from pyspark.sql.functions import when

def replace_values(col, value_map):
    if value_map:
        return when(col.isin(list(value_map.keys())), value_label_map[col]).otherwise(col)
    else:
        return col

df = spark.read.parquet("some-path")
updated_cols = [replace_values(df[col_name], value_labels.get(col_name)).alias(col_name)
                for col_name in df_values_renamed.columns]
```

The problem with this is that I can't look up value_labels using a column object.
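
One common workaround for exactly this problem is to build the lookup as a Spark map literal with `F.create_map`, so the per-value lookup happens inside the expression instead of in Python. The sketch below is hedged: only the key/value flattening is executed here, and the pyspark usage is shown in comments, assuming the same per-column `value_map` shape as above.

```python
from itertools import chain

def map_literal_args(value_map):
    """Flatten {'val1': 'new_val1'} into ['val1', 'new_val1', ...],
    the (key, value, key, value, ...) order F.create_map expects."""
    return list(chain.from_iterable(value_map.items()))

print(map_literal_args({"val1": "new_val1"}))
# ['val1', 'new_val1']

# Sketch of the pyspark side (not executed here):
# from pyspark.sql import functions as F
# mapping_expr = F.create_map(*[F.lit(x) for x in map_literal_args(value_map)])
# F.coalesce(mapping_expr[F.col(col_name)], F.col(col_name)).alias(col_name)
```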


Answer 1 (score: 1)

You could try packing everything into one select. Since replace is based on when statements, let's use them directly:

```python
from pyspark.sql import functions as F

def replace_from_dict(col_name, dict):
    """For each (k, v) item in dict, replace value k in col_name with value v."""
    res = None
    for k, v in dict.items():
        if res is None:
            res = F.when(F.col(col_name) == k, F.lit(v))
        else:
            res = res.when(F.col(col_name) == k, F.lit(v))
    return res.otherwise(F.col(col_name)).alias(col_name)

def replace_or_not(col_name):
    """Generate a column replacement if needed, keeping the column otherwise."""
    if col_name in value_label_map:
        return replace_from_dict(col_name, value_label_map[col_name])
    else:
        return col_name

result = df.select(*[replace_or_not(c) for c in df.columns])
```
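
To see what the chained when built by replace_from_dict evaluates to per value, here is a pure-Python analogue (`replace_one` is an illustrative name, not part of the answer): the first matching key wins, and the otherwise branch keeps the original value.

```python
def replace_one(value, mapping):
    """Pure-Python analogue of the chained F.when(...): return the mapped
    value for the first matching key, else the original value."""
    for k, v in mapping.items():
        if value == k:
            return v
    return value

print(replace_one("val1", {"val1": "new_val1"}))   # new_val1
print(replace_one("other", {"val1": "new_val1"}))  # other
```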

huangapple
  • Posted on 2023-03-31 23:10:43
  • Please keep this link when republishing: https://go.coder-hub.com/75900088.html