Spark SQL - How to concatenate string rows after grouping by a particular column
Question
I am writing an application in Spark using Java. I ran into a problem where I have to concatenate strings from different rows after grouping the rows by a particular column. Any help is appreciated! Thanks.
Input dataset
Expected output dataset
Answer 1
Score: 2
Use collect_list when you group by, then use the concat_ws function to build a single string from the list.
df.show(false)
+--------------------------------------+------+---------------+---------------+----------------+-------+
|Errors |userid|associationtype|associationrank|associationvalue|sparkId|
+--------------------------------------+------+---------------+---------------+----------------+-------+
|Primary Key Constraint Violated |3 |Brand5 |error |Lee |4 |
|Incorrect datatype in associationrank |3 |Brand5 |error |Lee |4 |
+--------------------------------------+------+---------------+---------------+----------------+-------+
// assumes: import org.apache.spark.sql.functions.{collect_list, concat_ws, col}
df.groupBy("userid", "associationtype", "associationrank", "associationvalue", "sparkId")
.agg(collect_list("Errors").as("Errors"))
.withColumn("Errors", concat_ws(", ", col("Errors")))
.show(false)
+------+---------------+---------------+----------------+-------+-----------------------------------------------------------------------+
|userid|associationtype|associationrank|associationvalue|sparkId|Errors |
+------+---------------+---------------+----------------+-------+-----------------------------------------------------------------------+
|3 |Brand5 |error |Lee |4 |Primary Key Constraint Violated, Incorrect datatype in associationrank|
+------+---------------+---------------+----------------+-------+-----------------------------------------------------------------------+
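Since the question is about the Java API, the same collect_list + concat_ws semantics can be sketched with plain Java collections (a hypothetical stand-in for the DataFrame, not the Spark API itself): group rows by the key columns, gather each group's error strings, and join them with a separator.

```java
import java.util.*;
import java.util.stream.*;

public class GroupConcatSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the DataFrame rows: sparkId -> error message
        List<Map.Entry<Integer, String>> rows = List.of(
            Map.entry(4, "Primary Key Constraint Violated"),
            Map.entry(4, "Incorrect datatype in associationrank"));

        // groupBy(...).agg(collect_list(...)) followed by concat_ws(", ", ...)
        // in plain-collections terms: group by key, then join the values.
        Map<Integer, String> merged = rows.stream()
            .collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.mapping(Map.Entry::getValue, Collectors.joining(", "))));

        System.out.println(merged.get(4));
        // -> Primary Key Constraint Violated, Incorrect datatype in associationrank
    }
}
```

In actual Spark Java code the grouping key would be all the non-error columns, exactly as in the Scala snippet above.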
Answer 2
Score: 1
Check the code below.
// assumes: import org.apache.spark.sql.functions.{collect_set, concat_ws, first, struct}
scala> sdf
.groupBy("sparkid")
.agg(collect_set($"errors").as("error_list"),first(struct($"*")).as("data"))
.select($"data.*",concat_ws(",",$"error_list").as("errors_new"))
.show(false)
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
|errors |userid|associationtype|associationrank|associationvalue|sparkid|errors_new |
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
|Incorrect datatype in associationrank|8 |brand3 |dd |LeeNew |7 |Incorrect datatype in associationrank |
|Incorrect datatype in associationrank|4 |brand4 |null |Lee |3 |Incorrect datatype in associationrank |
|Incorrect datatype in associationrank|1 |brand1 |iuy |Lee |0 |Incorrect datatype in associationrank |
|Primary Key Constraint Violated |2 |brand1 |something |Lee |5 |Primary Key Constraint Violated,Incorrect datatype in associationrank|
|Primary Key Constraint Violated |2 |brand2 |22 |Lee |1 |Primary Key Constraint Violated |
|Primary Key Constraint Violated |3 |brand5 |error |Lee |4 |Primary Key Constraint Violated,Incorrect datatype in associationrank|
|Primary Key Constraint Violated |3 |brand3 |40 |LeeNew |2 |Primary Key Constraint Violated |
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
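This answer differs from the first in two ways: collect_set deduplicates the error messages, and first(struct($"*")) keeps one representative row's other columns per group. Those semantics can be mimicked with plain Java collections (a hypothetical sketch; note that Spark's collect_set and first give no ordering guarantee, while this sketch keeps encounter order):

```java
import java.util.*;

public class CollectSetSketch {
    record Row(int sparkId, int userid, String error) {}

    public static void main(String[] args) {
        // Hypothetical rows: all share sparkId 4, with one duplicate error
        List<Row> rows = List.of(
            new Row(4, 3, "Primary Key Constraint Violated"),
            new Row(4, 3, "Primary Key Constraint Violated"),
            new Row(4, 3, "Incorrect datatype in associationrank"));

        // first(struct($"*")): keep the first full row seen per sparkId
        Map<Integer, Row> firstRow = new LinkedHashMap<>();
        // collect_set($"errors"): deduplicate the error messages per sparkId
        Map<Integer, LinkedHashSet<String>> errorSets = new LinkedHashMap<>();
        for (Row r : rows) {
            firstRow.putIfAbsent(r.sparkId(), r);
            errorSets.computeIfAbsent(r.sparkId(), k -> new LinkedHashSet<>())
                     .add(r.error());
        }

        // concat_ws(",", $"error_list"): join the deduplicated set
        String errorsNew = String.join(",", errorSets.get(4));
        System.out.println(errorsNew);
        // -> Primary Key Constraint Violated,Incorrect datatype in associationrank
    }
}
```

Grouping only by sparkid (rather than by every column) is what lets this version merge rows even when the other columns disagree; the first-seen row wins for those columns.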