How to convert JSON object as a value in a column in SPARK AZURE-DATABRICKS using SCALA as per requirement

Question:

Can someone help me resolve the problem below?

I have a Spark DataFrame in which one of the columns' values comes in JSON format. Please see the screenshot below for reference:

![rating](https://i.stack.imgur.com/tWovr.png)

My requirement is as follows:

  1. I have to select the "average_rating" attribute that is not null out of the 3 objects in a single row (or null if there is none) and add this value to a column "Average_Rating". I also have to select the "STATUS" from the object where "average_rating" is not null and add that value to a column "average_rating_status".
  2. I have to select the "number_of_recent_voters" attribute that is not null out of the 3 objects in a single row (or null if there is none) and add this value to a column "number_of_recent_voters". I also have to select the "STATUS" from the object where "number_of_recent_voters" is not null and add that value to a column "number_of_recent_voters_status".
  3. I have to select the "number_of_voters" attribute that is not null out of the 3 objects in a single row (or null if there is none) and add this value to a column "number_of_voters". I also have to select the "STATUS" from the object where "number_of_voters" is not null and add that value to a column "number_of_voters_status".
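The rule in the three items above is essentially a per-attribute coalesce across the three objects, carrying the matching "STATUS" along. A minimal plain-Scala sketch of just that selection logic (the `Obj` case class and `pick` helper are hypothetical names, and this is not the Databricks solution itself, which needs to work on a DataFrame):

```scala
// Hypothetical model of one row's three JSON objects (field names follow the question).
case class Obj(averageRating: Option[Double],
               numberOfRecentVoters: Option[Int],
               numberOfVoters: Option[Int],
               status: String)

val objs = Seq(
  Obj(Some(4.7), None, None, "PASS"),
  Obj(None, None, Some(254), "PASS"),
  Obj(None, Some(254), None, "PASS")
)

// For one attribute, take it from the first object where it is non-null
// (else None), together with that object's status.
def pick[A](objs: Seq[Obj])(f: Obj => Option[A]): (Option[A], Option[String]) =
  objs.collectFirst { case o if f(o).isDefined => (f(o), Some(o.status)) }
      .getOrElse((None, None))

val (avg, avgStatus)       = pick(objs)(_.averageRating)        // (Some(4.7), Some("PASS"))
val (recent, recentStatus) = pick(objs)(_.numberOfRecentVoters) // (Some(254), Some("PASS"))
val (voters, votersStatus) = pick(objs)(_.numberOfVoters)       // (Some(254), Some("PASS"))
```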

I have to write the code in Scala in my Azure Databricks notebook. Can anyone please help with the code?

Thank you

Edit:

```
+------------+----------------+-------------------------+------------------+-----------------------+--------------------------------+-------------------------+
| compliance | average_rating | number_of_recent_voters | number_of_voters | average_rating_status | number_of_recent_voters_status | number_of_voters_status |
+------------+----------------+-------------------------+------------------+-----------------------+--------------------------------+-------------------------+
| true       | 4.7            | 254                     | 254              | PASS                  | PASS                           | PASS                    |
+------------+----------------+-------------------------+------------------+-----------------------+--------------------------------+-------------------------+
```

The output should look like the above.

Answer 1

Score: 1


Code

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Sample data: one JSON-array string per row.
val data = List(
  (true, """[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"FAIL"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]"""),
  (true, """[{"average_rating":2.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":123,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":324,"number_of_voters":null,"status":"PASS"}]""")
).toDF("compliance", "rating")
data.show(false)

// Tag each source row with a row_number so exploded rows can be grouped back later.
val windowSpec = Window.orderBy("rating")
val data1 = data.withColumn("row_number", row_number.over(windowSpec))

// Parse the JSON array with an explicit schema and explode it into one row per object.
val data2 = data1.selectExpr("compliance", "row_number",
  """inline_outer(from_json(rating, 'ARRAY<STRUCT<average_rating DOUBLE, number_of_recent_voters DOUBLE, number_of_voters DOUBLE, status STRING>>'))""")
data2.show()

// Per source row, max(...) keeps the single non-null value of each attribute.
val data3 = data2.groupBy("row_number").max("average_rating", "number_of_recent_voters", "number_of_voters")
  .withColumnRenamed("max(average_rating)", "average_rating")
  .withColumnRenamed("max(number_of_recent_voters)", "number_of_recent_voters")
  .withColumnRenamed("max(number_of_voters)", "number_of_voters")
data3.show(false)

// Join back to the exploded rows to recover the status of the object each value came from.
val data4 = data3.join(data2, Seq("row_number", "average_rating"), "inner")
  .select(data3.col("*"), data2.col("status")).withColumnRenamed("status", "average_rating_status")
display(data4)

val data5 = data4.join(data2, Seq("row_number", "number_of_recent_voters"), "inner")
  .select(data4.col("*"), data2.col("status")).withColumnRenamed("status", "number_of_recent_voters_status")
display(data5)

val data6 = data5.join(data2, Seq("row_number", "number_of_voters"), "inner")
  .select(data5.col("*"), data2.col("status")).withColumnRenamed("status", "number_of_voters_status")
display(data6)
```

This code adds a `row_number` column to the dataframe using the `row_number` function and a window specification defined on the `rating` field. It uses the `from_json` function to parse the `rating` column as an array of structs, then uses the `inline_outer` function to explode the array into separate rows. It groups the exploded dataframe by the `row_number` column and calculates the maximum value of the `average_rating`, `number_of_recent_voters`, and `number_of_voters` columns for each group. It then joins the result back to the exploded dataframe on the `row_number` and `average_rating` columns, selects the `status` column, and renames it to `average_rating_status`.
The same is done for the `number_of_recent_voters` and `number_of_voters` columns, renaming the `status` columns to `number_of_recent_voters_status` and `number_of_voters_status`, respectively.
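The groupBy-max plus join sequence effectively picks, within each group of exploded rows, the non-null value together with the status of the object it came from. A plain-collections analogue of that per-group step (the `pickMax` helper is a hypothetical name, and this runs on sample tuples, not on Spark):

```scala
// One group of exploded rows, as (attribute value, status) pairs.
// At most one value per group is defined, but the logic also handles ties via max.
def pickMax(group: Seq[(Option[Double], String)]): Option[(Double, String)] = {
  val defined = group.collect { case (Some(v), s) => (v, s) }
  if (defined.isEmpty) None else Some(defined.maxBy(_._1))
}

val best = pickMax(Seq((Some(4.7), "PASS"), (None, "FAIL"), (None, "PASS")))
// best == Some((4.7, "PASS")); a group with no defined value yields None
```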

The resulting dataframe data6 has the required results.

Answer 2

Score: 0

You can use `from_json` with a schema; below is a sample solution.

```scala
scala> val data = List( (true, """[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]""" ) ).toDF("compliance","rating")
data: org.apache.spark.sql.DataFrame = [compliance: boolean, rating: string]

scala> data.show(false)
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|compliance|rating                                                                                                                                                                                                                                                                                     |
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|true      |[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]|
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

scala> data.printSchema
root
 |-- compliance: boolean (nullable = false)
 |-- rating: string (nullable = true)

scala> val finalDF = data.selectExpr("compliance", """inline_outer(from_json(rating, 'ARRAY<STRUCT<average_rating DOUBLE, number_of_recent_voters DOUBLE, number_of_voters DOUBLE, status STRING>>'))""")
finalDF: org.apache.spark.sql.DataFrame = [compliance: boolean, average_rating: double ... 3 more fields]

scala> finalDF.printSchema
root
 |-- compliance: boolean (nullable = false)
 |-- average_rating: double (nullable = true)
 |-- number_of_recent_voters: double (nullable = true)
 |-- number_of_voters: double (nullable = true)
 |-- status: string (nullable = true)

scala> finalDF.show(false)
+----------+--------------+-----------------------+----------------+------+
|compliance|average_rating|number_of_recent_voters|number_of_voters|status|
+----------+--------------+-----------------------+----------------+------+
|true      |4.7           |null                   |null            |PASS  |
|true      |null          |null                   |254.0           |PASS  |
|true      |null          |254.0                  |null            |PASS  |
+----------+--------------+-----------------------+----------------+------+
```

huangapple
  • Published on 2023-08-09 11:47:28
  • Please keep this link when reposting: https://go.coder-hub.com/76864443-2.html