How can we load a non delimited text file using spark scala and save it as a CSV file where column lengths are given dynamically?

huangapple go评论45阅读模式
英文:

How can we load a non delimited text file using spark scala and save it as a CSV file where column lengths are given dynamically?

问题

以下是要翻译的部分:

"如果我们有三列作为模式,名称,地址和年龄,文件中的一行有92个字符,其中前50个字符是名称,接下来的40个字符是地址,最后2个字符是年龄,而这些列的长度可能会变化并会动态给出,那么如何读取文件,将其分隔并保存为文本文件?"

"完全无法理解这个想法"

英文:

For ex - if we have three columns Name , address and age as schema and a line in file has 92 chars where first 50 are name, next 40 are address and last 2 chars is age, and if these column lengths might vary and will be given dynamically, how to read a file and make it delimited and save it as text file?

Could not get the idea at all

答案1

得分: 1

以下是翻译好的部分:

读取文件

我假设你有一个名为input.txt的文件,内容如下:

1 John 100
2 Jack 200
3 Jonah300
10JJ   400

我还假设这些是列名:

// 列名, 长度
val columns = Vector(("id", 2), ("name", 5), ("value", 3))

你应该找到每个列的起始位置:

case class ColumnInfo(name: String, length: Int, position: Int)

val columnInfos = columns.tail.foldLeft(Vector(ColumnInfo(columns.head._1, columns.head._2, 1))) { (acc, current) =>
  acc :+ ColumnInfo(current._1, current._2, acc.last.position + acc.last.length)
}

这将是结果:

Vector(ColumnInfo(id,2,1), ColumnInfo(name,5,3), ColumnInfo(value,3,8))

现在,你可以使用以下代码读取和解析这个文件:

val sparkCols = columnInfos map { columnInfo =>
  trim(substring(col("value"), columnInfo.position, columnInfo.length)) as columnInfo.name
}

val df = spark.read
  .text("input.txt")
  .select(sparkCols: _*)

df.show()

这将是结果:

+---+-----+-----+
| id| name|value|
+---+-----+-----+
|  1| John|  100|
|  2| Jack|  200|
|  3|Jonah|  300|
| 10|   JJ|  400|
+---+-----+-----+

保存文件

你可以使用以下代码保存文件:

df.repartition(1).write.option("header", true).csv("output.csv")

这将是结果:

id,name,value
1,John,100
2,Jack,200
3,Jonah,300
10,JJ,400
英文:

Your question has two parts.

Read file

I assume you have a file named input.txt like this:

1 John 100
2 Jack 200
3 Jonah300
10JJ   400

I also assumed that these are the columns:

// Column Name, Length
val columns = Vector(("id", 2), ("name", 5), ("value", 3))

You should find starting position for each column:

case class ColumnInfo(name: String, length: Int, position: Int)

val columnInfos = columns.tail.foldLeft(Vector(ColumnInfo(columns.head._1, columns.head._2, 1))) { (acc, current) =>
  acc :+ ColumnInfo(current._1, current._2, acc.last.position + acc.last.length)
}

This will be the result:

Vector(ColumnInfo(id,2,1), ColumnInfo(name,5,3), ColumnInfo(value,3,8))

Now, you can read and parse this file using this code:

val sparkCols = columnInfos map { columnInfo =>
  trim(substring(col("value"), columnInfo.position, columnInfo.length)) as columnInfo.name
}

val df = spark.read
  .text("input.txt")
  .select(sparkCols: _*)

df.show()

This will be the result:

+---+-----+-----+
| id| name|value|
+---+-----+-----+
|  1| John|  100|
|  2| Jack|  200|
|  3|Jonah|  300|
| 10|   JJ|  400|
+---+-----+-----+

Save file

You can save file using this code:

df.repartition(1).write.option("header", true).csv("output.csv")

This will be the result:

id,name,value
1,John,100
2,Jack,200
3,Jonah,300
10,JJ,400

huangapple
  • 本文由 发表于 2023年2月14日 19:51:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/75447433.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定