Update Spark dataframe column names based on Map type key value pairs

Question

I have a Spark dataframe df. I need to update the Spark dataframe's column names based on Map type key value pairs.

 df.show()

   | col1|col2 |col3|
   |  2  |  Ive|1989|
   |Tom  | null|1981|
   |  4  | John|1991|

 Map_value = (col1 -> id, col2 -> name, col3 -> year)

I need help; I am not sure how to proceed.

Expected output:

   | id  | name|year|
   |  2  |  Ive|1989|
   |Tom  | null|1981|
   |  4  | John|1991|

Answer 1

Score: 1

Given:

    case class ColData(col1: String, col2: String, col3: Int)

defined at a top level:

    val sourceSeq = Seq(
      ColData("2", "Ive", 1989),
      ColData("Tom", null, 1981),
      ColData("4", "John", 1991),
    )

    import sparkSession.implicits._

    def mapFields[T](ds: Dataset[T], fieldNameMap: Map[String, String]): DataFrame = {
      // make sure the fields are present - note this is not a free operation
      val fieldNames = ds.schema.fieldNames.toSet
      val newNames = fieldNameMap.filterKeys(fieldNames).map{ 
        case (oldFieldName, newFieldName) => col(oldFieldName).as(newFieldName)
      }.toSeq
      
      ds.select(newNames: _*)
    }

    val newNames = mapFields(sourceSeq.toDS(), Map("col1" -> "id", "col2" -> "name", "col3" -> "year", "not a field" -> "field"))

    newNames.show()

yielding:

+---+----+----+
| id|name|year|
+---+----+----+
|  2| Ive|1989|
|Tom|null|1981|
|  4|John|1991|
+---+----+----+

Note:

The fieldNames check uses ds.schema, which can be very expensive, so prefer using a known field list over .schema.
Using withColumn or withColumnRenamed over lots of fields can severely impact performance, as not all of the projections are actually removed in the generated code; prefer to keep the number of projections low where possible.
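The first note can be sketched concretely: when the field list is known ahead of time, the rename map can be validated against a plain Set instead of calling ds.schema. A minimal Spark-free sketch (the field names and rename map are assumptions taken from the question):

```scala
// Known column names, assumed statically here so no ds.schema call is needed.
val knownFields = Set("col1", "col2", "col3")

// Rename map with one deliberately bogus key.
val fieldNameMap = Map("col1" -> "id", "col2" -> "name", "not a field" -> "field")

// Keep only renames whose source column actually exists.
val validRenames = fieldNameMap.filter { case (oldName, _) => knownFields(oldName) }

println(validRenames.keys.toSeq.sorted.mkString(", "))  // col1, col2
```

The filtered map can then be turned into `col(old).as(new)` expressions and passed to a single select, as in mapFields above.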

Answer 2

Score: 0

You can use withColumnRenamed to rename a column.

So, using pseudo-code, the code would be:

    map_value.foreach((k, v) -> df = df.withColumnRenamed(k, v))

For each key/value pair in your map, rename the dataframe column named by the key to the new name given by the value.
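The pseudo-code above can be written without mutating a var: foldLeft threads the dataframe through each rename. In the sketch below, Frame is a hypothetical stand-in for DataFrame that tracks only column names, so the pattern runs without a Spark session:

```scala
// Hypothetical stand-in for a Spark DataFrame, tracking only column names.
final case class Frame(columns: Seq[String]) {
  def withColumnRenamed(existing: String, newName: String): Frame =
    Frame(columns.map(c => if (c == existing) newName else c))
}

val mapValue = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")

// With a real DataFrame this would read:
//   mapValue.foldLeft(df) { case (d, (k, v)) => d.withColumnRenamed(k, v) }
val renamed = mapValue.foldLeft(Frame(Seq("col1", "col2", "col3"))) {
  case (f, (k, v)) => f.withColumnRenamed(k, v)
}

println(renamed.columns.mkString(", "))  // id, name, year
```

Note that in real Spark each withColumnRenamed adds a projection, so for many columns a single select (as in the other answers) is usually cheaper.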

Answer 3

Score: 0

Another way to solve this:

    import org.apache.spark.sql.functions.col
    val mapValue = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")

    val colsAll = df.columns
    val dfTransform = df.select(colsAll.map(c => col(c).as(mapValue.getOrElse(c, c))): _*)

select is another helpful way to rename columns: in this case you select all the columns, and getOrElse keeps the original name when a column is not in the map.
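The getOrElse fallback is the key detail: columns missing from the map keep their original names. A small Spark-free sketch of just that lookup (the rename map here deliberately omits col2):

```scala
// Rename map deliberately missing an entry for col2.
val mapValue = Map("col1" -> "id", "col3" -> "year")
val colsAll = Seq("col1", "col2", "col3")

// In Spark: df.select(colsAll.map(c => col(c).as(mapValue.getOrElse(c, c))): _*)
val newNames = colsAll.map(c => mapValue.getOrElse(c, c))

println(newNames.mkString(", "))  // id, col2, year
```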

huangapple
  • Posted on 2023-08-04 22:19:24
  • Please keep this link when reposting: https://go.coder-hub.com/76836779.html