GroupBy Spark Dataframe and manipulate aggregated data as string

Question

The transformation happens in an AWS Glue Spark job. In the example below I group rows by "item_guid" and "item_name" and aggregate the "option" column into a collection set. The collection set is an array, but later I need to map it to a Postgres database, so the array has to be turned into a string. The following code therefore converts the options into a comma-separated string:

array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))

However, the Postgres column for the options has type text[], so the string must be enclosed in curly braces and look like this:

{90000,86000,81000}
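For context, a minimal sketch of what this looks like on the Postgres side (the table name, connection string, and the psycopg2 driver are assumptions for illustration only; the point is that a text[] column accepts a brace-enclosed string literal):

import psycopg2  # assumed client library for this illustration

conn = psycopg2.connect("dbname=mydb")  # hypothetical connection string
with conn, conn.cursor() as cur:
    # Postgres coerces the untyped string literal to the column's text[] type
    cur.execute(
        "INSERT INTO items (item_guid, item_name, option) VALUES (%s, %s, %s)",
        ("1122", "YPIA_PROD", "{90000,86000,81000}"),
    )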

The question: in the last step of the transformation, how can I turn the option value into a brace-enclosed string such as "{90000,86000,81000}"? It seems like a simple trick, but I couldn't come up with an elegant solution.

Code example:

from pyspark.sql.functions import collect_list, collect_set, concat_ws, col

simpleData = [("001","1122","YPIA_PROD",90000),
    ("002","1122","YPIA_PROD",86000),
    ("003","1122","YPIA_PROD",81000),
    ("004","1122","YPIA_ABC",90000),
    ("005","1133","YPIA_PROD",99000),
    ("006","1133","YPIA_PROD",83000),
    ("007","1144","YPIA_PROD",79000),
    ("008","1144","YPIA_PROD",80000),
    ("009","1144","YPIA_ABC",91000)
]

# Build the sample DataFrame
rrd = spark.sparkContext.parallelize(simpleData)
df = rrd.toDF(["id","item_guid","item_name","option"])
df.show()

# Collect the distinct option values per (item_guid, item_name) into an array
grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))

# Join the array elements into a comma-separated string
array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))
grouped_df.show()
array_to_string_df.show()

DF show output:

+---+----------+---------+------+
| id| item_guid|item_name|option|
+---+----------+---------+------+
|001|      1122|YPIA_PROD| 90000|
|002|      1122|YPIA_PROD| 86000|
|003|      1122|YPIA_PROD| 81000|
|004|      1122| YPIA_ABC| 90000|
|005|      1133|YPIA_PROD| 99000|
|006|      1133|YPIA_PROD| 83000|
|007|      1144|YPIA_PROD| 79000|
|008|      1144|YPIA_PROD| 80000|
|009|      1144| YPIA_ABC| 91000|
+---+----------+---------+------+

+----------+---------+---------------------+
| item_guid|item_name|               option|
+----------+---------+---------------------+
|      1133|YPIA_PROD|       [83000, 99000]|
|      1122|YPIA_PROD|[90000, 86000, 81000]|
|      1122| YPIA_ABC|              [90000]|
|      1144|YPIA_PROD|       [79000, 80000]|
|      1144| YPIA_ABC|              [91000]|
+----------+---------+---------------------+

+----------+---------+-----------------+
|item_guid |item_name|           option|
+----------+---------+-----------------+
|      1133|YPIA_PROD|      83000,99000|
|      1122|YPIA_PROD|90000,86000,81000|
|      1122| YPIA_ABC|            90000|
|      1144|YPIA_PROD|      79000,80000|
|      1144| YPIA_ABC|            91000|
+----------+---------+-----------------+

Answer 1

Score: 1

from pyspark.sql.functions import collect_list, collect_set, concat, concat_ws, col, lit

simpleData = [
    ("001", "1122", "YPIA_PROD", 90000),
    ("002", "1122", "YPIA_PROD", 86000),
    ("003", "1122", "YPIA_PROD", 81000),
    ("004", "1122", "YPIA_ABC", 90000),
    ("005", "1133", "YPIA_PROD", 99000),
    ("006", "1133", "YPIA_PROD", 83000),
    ("007", "1144", "YPIA_PROD", 79000),
    ("008", "1144", "YPIA_PROD", 80000),
    ("009", "1144", "YPIA_ABC", 91000)
]

schema = ["id", "item_guid", "item_name", "option"]
df = spark.createDataFrame(data=simpleData, schema=schema)

# Collect the distinct option values per group, join them with commas,
# then wrap the result in literal curly braces for the Postgres text[] column
grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))
array_to_string_df = grouped_df.withColumn("option", concat_ws(",", col("option"))).select(
    col("item_guid"),
    col("item_name"),
    concat(lit("{"), col("option"), lit("}")).alias("option")
)

array_to_string_df.show()

Answer 2

Score: 0

concat('{', substring(aggregate(option, '', (acc, cur) -> concat_ws(',', acc, cur)) from 2), '}')

works using Functions.expr() or the equivalent Column API usage.
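For reference, a minimal sketch of how this expression might be applied from PySpark, building on the question's grouped_df (the explicit cast of cur to string is an added precaution and not part of the original expression):

from pyspark.sql.functions import expr

# Fold the array into one comma-separated string, drop the leading comma
# produced by the empty accumulator, then wrap the result in curly braces
braced_df = grouped_df.withColumn(
    "option",
    expr("concat('{', substring(aggregate(option, '', "
         "(acc, cur) -> concat_ws(',', acc, cast(cur as string))) from 2), '}')")
)
braced_df.show(truncate=False)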

