How to extract values from a JSON column of a Spark DataFrame in Azure Databricks using Scala
Question
[![Column - rating][1]][1]
[1]: https://i.stack.imgur.com/tWovr.png
Can someone help me resolve the problem below?
I have a Spark DataFrame in which the values of one column arrive in JSON format. The screenshot shows the column for reference.
My requirements are as follows:
- From the 3 objects in each row, I have to select the "average_rating" attribute that is not null. If there is none, I take null. I have to add this value to the column "Average_Rating". I also have to select the "STATUS" of the object whose "average_rating" is not null and add it to a column "average_rating_status".
- From the 3 objects in each row, I have to select the "number_of_recent_voters" attribute that is not null. If there is none, I take null. I have to add this value to the column "number_of_recent_voters". I also have to select the "STATUS" of the object whose "number_of_recent_voters" is not null and add it to a column "number_of_recent_voters_status".
- From the 3 objects in each row, I have to select the "number_of_voters" attribute that is not null. If there is none, I take null. I have to add this value to the column "number_of_voters". I also have to select the "STATUS" of the object whose "number_of_voters" is not null and add it to a column "number_of_voters_status".
I have to write the code in Scala in my Azure Databricks notebook. Can anyone please help with the code?
Thank you
Edit:
```
+------------+----------------+-------------------------+------------------+-----------------------+--------------------------------+-------------------------+
| compliance | average_rating | number_of_recent_voters | number_of_voters | average_rating_status | number_of_recent_voters_status | number_of_voters_status |
+------------+----------------+-------------------------+------------------+-----------------------+--------------------------------+-------------------------+
| true       | 4.7            | 254                     | 254              | PASS                  | PASS                           | PASS                    |
+------------+----------------+-------------------------+------------------+-----------------------+--------------------------------+-------------------------+
```
The output should look like the above.
Answer 1
Score: 1
Code
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
// `spark` is predefined in a Databricks notebook; the implicits enable .toDF
import spark.implicits._

val data = List(
  (true, """[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"FAIL"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]"""),
  (true, """[{"average_rating":2.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":123,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":324,"number_of_voters":null,"status":"PASS"}]""")
).toDF("compliance", "rating")
data.show(false)

// Tag every input row so its exploded objects can be regrouped later
val windowSpec = Window.orderBy("rating")
val data1 = data.withColumn("row_number", row_number().over(windowSpec))

// Parse the JSON array and emit one row per object
val data2 = data1.selectExpr("compliance", "row_number", """inline_outer(from_json(rating, 'ARRAY<STRUCT<average_rating DOUBLE, number_of_recent_voters DOUBLE, number_of_voters DOUBLE, status STRING>>'))""")
data2.show()

// max ignores nulls, so it returns the non-null value of each attribute per row
val data3 = data2.groupBy("row_number").max("average_rating", "number_of_recent_voters", "number_of_voters")
  .withColumnRenamed("max(average_rating)", "average_rating")
  .withColumnRenamed("max(number_of_recent_voters)", "number_of_recent_voters")
  .withColumnRenamed("max(number_of_voters)", "number_of_voters")
data3.show(false)

// Look up the status of the object that supplied each value.
// A left join keeps rows whose attribute is null in every object (status is then null).
val data4 = data3.join(data2, Seq("row_number", "average_rating"), "left").select(data3.col("*"), data2.col("status")).withColumnRenamed("status", "average_rating_status")
display(data4)
val data5 = data4.join(data2, Seq("row_number", "number_of_recent_voters"), "left").select(data4.col("*"), data2.col("status")).withColumnRenamed("status", "number_of_recent_voters_status")
display(data5)
val data6 = data5.join(data2, Seq("row_number", "number_of_voters"), "left").select(data5.col("*"), data2.col("status")).withColumnRenamed("status", "number_of_voters_status")
display(data6)
```
This code adds a `row_number` column to the DataFrame using the `row_number` function over a window ordered by the `rating` field. It parses the `rating` column as an array of structs with `from_json`, then uses `inline_outer` to explode the array into separate rows. It groups the resulting DataFrame by the `row_number` column and computes the maximum of the `average_rating`, `number_of_recent_voters`, and `number_of_voters` columns for each group. It then joins that result with the exploded DataFrame (`data2`) on the `row_number` and `average_rating` columns, selects the `status` column from the exploded DataFrame, and renames it to `average_rating_status`.
The same is done for the `number_of_recent_voters` and `number_of_voters` columns, with `status` renamed to `number_of_recent_voters_status` and `number_of_voters_status`, respectively.
The resulting DataFrame `data6` has the required results.
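As an alternative to the explode, group, and join steps described above, the same selection can be sketched with Spark SQL's higher-order `aggregate` function, which folds over the parsed array inside each row, so no row id or join is needed. This is only a sketch and not the answer author's method: the helper names `firstValue` and `firstStatus` and the intermediate column `objs` are hypothetical, and it assumes Spark 2.4 or later and at most one non-null value per attribute in each row.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// In a Databricks notebook this returns the already running session
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// One sample row, taken from the answer's test data
val data = List(
  (true, """[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"FAIL"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]""")
).toDF("compliance", "rating")

val schema = "ARRAY<STRUCT<average_rating DOUBLE, number_of_recent_voters DOUBLE, number_of_voters DOUBLE, status STRING>>"

// Hypothetical helper: SQL that keeps the first non-null value of `field` in `objs`
def firstValue(field: String): String =
  s"aggregate(objs, CAST(NULL AS DOUBLE), (acc, x) -> coalesce(acc, x.$field))"

// Hypothetical helper: SQL that keeps the status of the first object whose `field` is non-null
def firstStatus(field: String): String =
  s"aggregate(objs, CAST(NULL AS STRING), (acc, x) -> coalesce(acc, CASE WHEN x.$field IS NOT NULL THEN x.status END))"

val fields = Seq("average_rating", "number_of_recent_voters", "number_of_voters")

// Parse the JSON once, then add a value column and a status column per attribute
val parsed = data.withColumn("objs", expr(s"from_json(rating, '$schema')"))
val result = fields
  .foldLeft(parsed) { (df, f) =>
    df.withColumn(f, expr(firstValue(f)))
      .withColumn(s"${f}_status", expr(firstStatus(f)))
  }
  .select("compliance", "average_rating", "number_of_recent_voters", "number_of_voters",
    "average_rating_status", "number_of_recent_voters_status", "number_of_voters_status")

result.show(false)
```

If every object has a null for some attribute, both the value and its status stay null, which matches the "if there is none I take null" requirement. The values come back as doubles (for example `254.0`) because the schema declares them as `DOUBLE`.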
Answer 2
Score: 0
You can use `from_json` with a schema; a sample solution is below.
```scala
scala> val data = List( (true, """[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]""") ).toDF("compliance","rating")
data: org.apache.spark.sql.DataFrame = [compliance: boolean, rating: string]

scala> data.show(false)
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|compliance|rating                                                                                                                                                                                                                                                                                     |
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|true      |[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]|
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

scala> data.printSchema
root
 |-- compliance: boolean (nullable = false)
 |-- rating: string (nullable = true)

scala> val finalDF = data.selectExpr("compliance", """inline_outer(from_json(rating, 'ARRAY<STRUCT<average_rating DOUBLE, number_of_recent_voters DOUBLE, number_of_voters DOUBLE, status STRING>>'))""")
finalDF: org.apache.spark.sql.DataFrame = [compliance: boolean, average_rating: double ... 3 more fields]

scala> finalDF.printSchema
root
 |-- compliance: boolean (nullable = false)
 |-- average_rating: double (nullable = true)
 |-- number_of_recent_voters: double (nullable = true)
 |-- number_of_voters: double (nullable = true)
 |-- status: string (nullable = true)

scala> finalDF.show(false)
+----------+--------------+-----------------------+----------------+------+
|compliance|average_rating|number_of_recent_voters|number_of_voters|status|
+----------+--------------+-----------------------+----------------+------+
|true      |4.7           |null                   |null            |PASS  |
|true      |null          |null                   |254.0           |PASS  |
|true      |null          |254.0                  |null            |PASS  |
+----------+--------------+-----------------------+----------------+------+
```
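The output above still has one row per JSON object, while the question asks for a single row per input record. One possible way to collapse the rows, sketched here under assumptions, is to tag each input row with an id before exploding and then aggregate with `first(..., ignoreNulls = true)`. The column name `row_id` and the value names `exploded`, `attrs`, and `collapsed` are hypothetical, and the sketch assumes each attribute is non-null in at most one object per row (otherwise `first` may pick any of them).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, first, monotonically_increasing_id, when}

// In a Databricks notebook this returns the already running session
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val data = List(
  (true, """[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]""")
).toDF("compliance", "rating")

// Tag each input row, then explode the parsed JSON array into one row per object
val exploded = data
  .withColumn("row_id", monotonically_increasing_id())
  .selectExpr("row_id", "compliance",
    """inline_outer(from_json(rating, 'ARRAY<STRUCT<average_rating DOUBLE, number_of_recent_voters DOUBLE, number_of_voters DOUBLE, status STRING>>'))""")

val attrs = Seq("average_rating", "number_of_recent_voters", "number_of_voters")

// First non-null value of each attribute within a row
val valueCols = attrs.map(a => first(col(a), ignoreNulls = true).as(a))
// Status of the object in which that attribute is non-null
val statusCols = attrs.map { a =>
  first(when(col(a).isNotNull, col("status")), ignoreNulls = true).as(s"${a}_status")
}
val aggs = valueCols ++ statusCols

// Collapse the exploded objects back into one row per input record
val collapsed = exploded
  .groupBy("row_id", "compliance")
  .agg(aggs.head, aggs.tail: _*)
  .drop("row_id")

collapsed.show(false)
```

Grouping by `row_id` rather than by `compliance` alone keeps input rows apart even when they share the same `compliance` value.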