GroupBy Spark DataFrame and manipulate aggregated data as string
Question
The transformation happens in an AWS Glue Spark job. In the example below I group rows by "item_guid" and "item_name" and aggregate the "option" column into a collection set. The collection set is an array, but later I need to map it to a Postgres database, so that array has to be turned into a string. The following code converts the options into a comma-separated string:
array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))
However, the Postgres column that holds the options has type text[], so the string must be enclosed in curly braces and look like this:
{90000,86000,81000}
The question: in the final step of the transformation, how can I turn the option value into a string enclosed in curly braces, like "{90000,86000,81000}"? It seems like a simple trick, but I couldn't come up with an elegant solution for it.
Code example:
from pyspark.sql.functions import collect_list, collect_set, concat_ws, col
simpleData = [("001","1122","YPIA_PROD",90000),
("002","1122","YPIA_PROD",86000),
("003","1122","YPIA_PROD",81000),
("004","1122","YPIA_ABC",90000),
("005","1133","YPIA_PROD",99000),
("006","1133","YPIA_PROD",83000),
("007","1144","YPIA_PROD",79000),
("008","1144","YPIA_PROD",80000),
("009","1144","YPIA_ABC",91000)
]
rrd = spark.sparkContext.parallelize(simpleData)
df = rrd.toDF(["id","item_guid","item_name","option"])
df.show()
grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))
array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))
grouped_df.show()
array_to_string_df.show()
DF show output:
+---+----------+---------+------+
| id| item_guid|item_name|option|
+---+----------+---------+------+
|001|      1122|YPIA_PROD| 90000|
|002|      1122|YPIA_PROD| 86000|
|003|      1122|YPIA_PROD| 81000|
|004|      1122| YPIA_ABC| 90000|
|005|      1133|YPIA_PROD| 99000|
|006|      1133|YPIA_PROD| 83000|
|007|      1144|YPIA_PROD| 79000|
|008|      1144|YPIA_PROD| 80000|
|009|      1144| YPIA_ABC| 91000|
+---+----------+---------+------+
+----------+---------+--------------------+
| item_guid|item_name|              option|
+----------+---------+--------------------+
|      1133|YPIA_PROD|      [83000, 99000]|
|      1122|YPIA_PROD|[90000, 86000, 81...|
|      1122| YPIA_ABC|             [90000]|
|      1144|YPIA_PROD|      [79000, 80000]|
|      1144| YPIA_ABC|             [91000]|
+----------+---------+--------------------+
+----------+---------+-----------------+
| item_guid|item_name|           option|
+----------+---------+-----------------+
|      1133|YPIA_PROD|      83000,99000|
|      1122|YPIA_PROD|90000,86000,81000|
|      1122| YPIA_ABC|            90000|
|      1144|YPIA_PROD|      79000,80000|
|      1144| YPIA_ABC|            91000|
+----------+---------+-----------------+
Answer 1
Score: 1
from pyspark.sql.functions import collect_set, concat, concat_ws, col, lit

simpleData = [
    ("001", "1122", "YPIA_PROD", 90000),
    ("002", "1122", "YPIA_PROD", 86000),
    ("003", "1122", "YPIA_PROD", 81000),
    ("004", "1122", "YPIA_ABC", 90000),
    ("005", "1133", "YPIA_PROD", 99000),
    ("006", "1133", "YPIA_PROD", 83000),
    ("007", "1144", "YPIA_PROD", 79000),
    ("008", "1144", "YPIA_PROD", 80000),
    ("009", "1144", "YPIA_ABC", 91000)
]
schema = ["id", "item_guid", "item_name", "option"]
df = spark.createDataFrame(data=simpleData, schema=schema)

# Collect the options for each (item_guid, item_name) pair into a set.
grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))

# Join the array with commas, then wrap the result in curly braces so it matches
# the Postgres text[] literal format, e.g. {90000,86000,81000}.
array_to_string_df = grouped_df.withColumn("option", concat_ws(",", col("option"))).select(
    col("item_guid"),
    col("item_name"),
    concat(lit("{"), col("option"), lit("}")).alias("option")
)
array_to_string_df.show()
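With the sample data above, the final show() should print something like this (row and element order may vary, since collect_set does not guarantee ordering):
+----------+---------+-------------------+
| item_guid|item_name|             option|
+----------+---------+-------------------+
|      1133|YPIA_PROD|      {83000,99000}|
|      1122|YPIA_PROD|{90000,86000,81000}|
|      1122| YPIA_ABC|            {90000}|
|      1144|YPIA_PROD|      {79000,80000}|
|      1144| YPIA_ABC|            {91000}|
+----------+---------+-------------------+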
Answer 2
Score: 0
concat('{', substring(aggregate(option, '', (acc, cur) -> concat_ws(',', acc, cur)) from 2), '}')
works using Functions.expr() or the equivalent Column API usage.
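For completeness, here is a minimal PySpark sketch of that approach, applied to grouped_df from the question (it assumes Spark implicitly casts the integer option elements to strings inside the lambda):
from pyspark.sql.functions import expr

# aggregate() folds the array into ",90000,86000,...", substring(... from 2) drops
# the leading comma, and concat() wraps the result in curly braces.
array_to_string_df = grouped_df.withColumn(
    "option",
    expr("concat('{', substring(aggregate(option, '', (acc, cur) -> concat_ws(',', acc, cur)) from 2), '}')")
)
array_to_string_df.show(truncate=False)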