Update Spark dataframe column names based on Map type key value pairs
Question
I have a Spark dataframe df. I need to update the Spark dataframe's column names based on Map type key-value pairs.
df.show()
| col1| col2|col3|
|    2|  Ive|1989|
|  Tom| null|1981|
|    4| John|1991|
Map_value = (col1 -> id, col2 -> name, col3 -> year)
Need help. I am not sure how to proceed.
Expected output:
|  id| name|year|
|   2|  Ive|1989|
| Tom| null|1981|
|   4| John|1991|
Answer 1
Score: 1
Given:
case class ColData(col1: String, col2: String, col3: Int)
defined at a top level:
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col
import sparkSession.implicits._

val sourceSeq = Seq(
  ColData("2", "Ive", 1989),
  ColData("Tom", null, 1981),
  ColData("4", "John", 1991)
)

def mapFields[T](ds: Dataset[T], fieldNameMap: Map[String, String]): DataFrame = {
  // make sure the fields are present - note this is not a free operation
  val fieldNames = ds.schema.fieldNames.toSet
  // build a renamed column only for map keys that are actual fields
  val newNames = fieldNameMap.filterKeys(fieldNames).map {
    case (oldFieldName, newFieldName) => col(oldFieldName).as(newFieldName)
  }.toSeq
  ds.select(newNames: _*)
}

val newNames = mapFields(sourceSeq.toDS(), Map("col1" -> "id", "col2" -> "name", "col3" -> "year", "not a field" -> "field"))
newNames.show()
yielding:
+---+----+----+
| id|name|year|
+---+----+----+
| 2| Ive|1989|
|Tom|null|1981|
| 4|John|1991|
+---+----+----+
Note:
The fieldNames check uses ds.schema, which can be very expensive, so prefer to use known fields instead of .schema.
Using withColumn or withColumnRenamed over lots of fields can severely impact performance, as not all of the projections are actually removed in the generated code; prefer to keep the number of projections low where possible.
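To make that note concrete, here is a minimal sketch (assuming the question's df and rename map are in scope) that contrasts the two plan shapes; explain(true) prints the query plans, so the extra Project nodes introduced by chained renames can be inspected directly:

import org.apache.spark.sql.functions.col

val renameMap = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")

// A single projection covering every rename.
val viaSelect = df.select(df.columns.map(c => col(c).as(renameMap.getOrElse(c, c))): _*)

// One Project node per rename, left for the optimizer to collapse.
val viaRenamed = renameMap.foldLeft(df) { case (acc, (from, to)) =>
  acc.withColumnRenamed(from, to)
}

viaSelect.explain(true)
viaRenamed.explain(true)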
Answer 2
Score: 0
You can use withColumnRenamed
to rename a column.
So using pseudo-code, the code should be:
map_value.foreach((k, v) -> df = df.withColumnRenamed(k, v))
For each key/value pair in your map, rename the dataframe column named by the key to the new name given by the value.
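Note that the reassignment in that pseudo-code will not compile against an immutable df in Scala; a minimal runnable sketch of the same idea (assuming the question's df and rename map) folds over the map instead:

val mapValue = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")

// withColumnRenamed is a no-op when the old name is not an existing column,
// so extra entries in the map are harmless.
val renamed = mapValue.foldLeft(df) { case (acc, (oldName, newName)) =>
  acc.withColumnRenamed(oldName, newName)
}
renamed.show()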
Answer 3
Score: 0
This is another way to solve it:
import org.apache.spark.sql.functions.col

val mapValue = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")
val colsAll = df.columns
val dfTransform = df.select(colsAll.map(c => col(c).as(mapValue.getOrElse(c, c))): _*)

select is another helpful way to rename columns: in this case you select every column, and getOrElse falls back to the original name when a column does not appear in the map.
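Against the question's df this should print the expected output:

dfTransform.show()
+---+----+----+
| id|name|year|
+---+----+----+
|  2| Ive|1989|
|Tom|null|1981|
|  4|John|1991|
+---+----+----+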