Add selected columns from complex exploding dataframe to another dataframe in pyspark

Question

As sample data I have:

    {
      "GeneralInformation":{
        "ID":"00001",
        "WebLinksInfo":{
          "LastUpdated":"2019-10-27",
          "WebSite":{
            "Type":"Home Page",
            "text":"https://www.aaaa.com/"
          }
        },
        "TextInfo":{
          "Text":[
            {
              "Type":"Business",
              "updated_at":"2018-09-14",
              "unused_field":"en-US",
              "Description":"Lorem ipsum dolor sit amet, consectetur adipiscing elit, laborum."
            },
            {
              "Type":"Financial",
              "updated_at":"2022-08-26",
              "unused_field":"en-US",
              "Description":"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."
            }
          ]
        },
        "Advisors":{
          "Auditor":{
            "Code":"AAA",
            "Name":"Aristotle"
          }
        }
      }
    }

I have a dataframe with this schema:

    root
     |-- GeneralInformation: struct (nullable = true)
     |    |-- Advisors: struct (nullable = true)
     |    |    |-- Auditor: struct (nullable = true)
     |    |    |    |-- Code: string (nullable = true)
     |    |    |    |-- Name: string (nullable = true)
     |    |-- ID: string (nullable = true)
     |    |-- TextInfo: struct (nullable = true)
     |    |    |-- Text: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- Description: string (nullable = true)
     |    |    |    |    |-- updated_at: string (nullable = true)
     |    |    |    |    |-- Type: string (nullable = true)
     |    |    |    |    |-- unused_field: string (nullable = true)
     |    |-- WebLinksInfo: struct (nullable = true)
     |    |    |-- LastUpdated: string (nullable = true)
     |    |    |-- WebSite: struct (nullable = true)
     |    |    |    |-- text: string (nullable = true)
     |    |    |    |-- Type: string (nullable = true)

I am only interested in extracting ID, TextInfo.Text.Description, and updated_at.

Right now I extract the ID like this:

    from pyspark.sql.functions import col

    df = spark.read.json('my_file_path')
    new_df = df.select(col('GeneralInformation.ID'))
    # Note: join() with no join condition produces a cross join (Cartesian product)
    new_df = new_df.join(df.select(col('GeneralInformation.TextInfo.Text')))

I end up with the schema below, but I want just ID, Description, and Updated_at at the root level, and only one record per ID no matter how many descriptions exist in the Text array. The record should be selected by the Type value: if Type is 'Business', take that entry's Description and updated_at for the ID.

    root
     |-- ID: string (nullable = true)
     |-- Text: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- Description: string (nullable = true)
     |    |    |-- updated_at: string (nullable = true)
     |    |    |-- Type: string (nullable = true)
     |    |    |-- unused_field: string (nullable = true)

Expected output:

    {
      "ID": "00001",
      "Description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, laborum.",
      "Updated_at": "2018-09-14"
    }

For example, if I use:

    from pyspark.sql import functions as F
    from pyspark.sql.functions import lit

    df = df.withColumn('description', F.when(
        df['Text'][0]['Type'] == 'Business', lit(df['Text'][0]['Description'])))

This gets the description out as a new column on the dataframe, which I can keep while dropping TextInfo, but it does not guarantee that the other elements of the Text array are checked.
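
To make that concrete, here is a minimal sketch (hypothetical, simplified data rather than the original schema) of how a hard-coded index misses a matching entry that is not first in the array:

    from pyspark.sql import functions as F

    # Hypothetical rows: in the second row 'Business' sits at index 1,
    # so the index-0 condition is false and the result is null.
    demo = spark.createDataFrame(
        [(['Business', 'Financial'],), (['Financial', 'Business'],)],
        ['Types'],
    )
    demo.select(
        F.when(F.col('Types')[0] == 'Business', 'found').alias('hit')
    ).show()
    # First row -> 'found'; second row -> null, even though 'Business' is at index 1.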

Answer 1

Score: 1

Try accessing the array by index.

Example:

    from pyspark.sql.functions import *

    js_str = """{
      "GeneralInformation":{
        "ID":"00001",
        "WebLinksInfo":{
          "LastUpdated":"2019-10-27",
          "WebSite":{
            "Type":"Home Page",
            "text":"https://www.aaaa.com/"
          }
        },
        "TextInfo":{
          "Text":[
            {
              "Type":"Business",
              "updated_at":"2018-09-14",
              "unused_field":"en-US",
              "Description":"Lorem ipsum dolor sit amet, consectetur adipiscing elit, laborum."
            },
            {
              "Type":"Financial",
              "updated_at":"2022-08-26",
              "unused_field":"en-US",
              "Description":"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."
            }
          ]
        },
        "Advisors":{
          "Auditor":{
            "Code":"AAA",
            "Name":"Aristotle"
          }
        }
      }
    }"""

    df = spark.read.json(sc.parallelize([js_str]), multiLine=True)

    # [0] takes the first element of the Text array
    df.select(col('GeneralInformation.ID'),
              col('GeneralInformation.TextInfo.Text.Description')[0].alias("Description"),
              col('GeneralInformation.TextInfo.Text.updated_at')[0].alias("updated_at")).\
        show(10, False)

    #+-----+-----------------------------------------------------------------+----------+
    #|ID   |Description                                                      |updated_at|
    #+-----+-----------------------------------------------------------------+----------+
    #|00001|Lorem ipsum dolor sit amet, consectetur adipiscing elit, laborum.|2018-09-14|
    #+-----+-----------------------------------------------------------------+----------+
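
Note that indexing with [0] assumes the 'Business' entry always comes first in the array. If that ordering is not guaranteed, a sketch along these lines (assuming Spark 3.1+, where pyspark.sql.functions.filter works on array columns) selects the element by its Type instead:

    from pyspark.sql import functions as F

    # Keep only the Text entries whose Type is 'Business', then take the
    # first match; [0] yields null when no element matches.
    business = F.filter(
        F.col('GeneralInformation.TextInfo.Text'),
        lambda x: x['Type'] == 'Business',
    )[0]

    df.select(
        F.col('GeneralInformation.ID').alias('ID'),
        business['Description'].alias('Description'),
        business['updated_at'].alias('Updated_at'),
    ).show(truncate=False)

Using the F namespace also avoids shadowing Python's built-in filter, which the star import above can otherwise do.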
