Explode a Spark column containing a list of dicts in string format

Question

How can I convert this streaming dataframe in PySpark,

+--------------------+------+----------------------------------------------+
|           timestamp|offset|stringdecode(value, UTF-8)                    |
+--------------------+------+----------------------------------------------+
|2023-03-03 17:21:...|    10| "[{"num":55,"cor":32},{"num":14,"cor":54}]"  |
|2023-03-03 17:35:...|    11| "[{"num":55,"cor":98},{"num":32,"cor":77}]"  |
+--------------------+------+----------------------------------------------+

into this

+--------------------+------+---+---+
|           timestamp|offset|num|cor|
+--------------------+------+---+---+
|2023-03-03 17:21:...|    10| 55| 32|
|2023-03-03 17:21:...|    10| 14| 54|
|2023-03-03 17:35:...|    11| 55| 98|
|2023-03-03 17:35:...|    11| 32| 77|
+--------------------+------+---+---+

Stack Overflow is asking me to add text to post my question, but I don't see any need for this, hence this paragraph.

Answer 1

Score: 1

Just use from_json and expand the column

This would work:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType

# schema of each element of the JSON array
sch = ArrayType(StructType([
    StructField("num", IntegerType()),
    StructField("cor", IntegerType())
]))

# parse the JSON string into an array of structs, explode one row per struct,
# then flatten the struct fields alongside the remaining columns
df1.withColumn("asArray", F.from_json("dict", sch))\
    .withColumn("asStruct", F.explode("asArray"))\
    .select(*[col for col in df1.schema.names if col != "dict"], "asStruct.*")\
    .show()

Input:

+-------------------+------+-----------------------------------------+
|timestamp          |offset|dict                                     |
+-------------------+------+-----------------------------------------+
|2023-03-03 00:00:00|10    |[{"num":55,"cor":32},{"num":14,"cor":54}]|
+-------------------+------+-----------------------------------------+

Schema:

root
 |-- timestamp: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- dict: string (nullable = true)

Output:

+-------------------+------+---+---+
|          timestamp|offset|num|cor|
+-------------------+------+---+---+
|2023-03-03 00:00:00|    10| 55| 32|
|2023-03-03 00:00:00|    10| 14| 54|
+-------------------+------+---+---+

Let me know if you face any issue.
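
The dataframe in the question is a streaming one and its JSON column is named stringdecode(value, UTF-8), so show() is not available and the column is easiest to handle if it is given a plain alias first. Below is a minimal sketch of the same from_json/explode transform wired into a streaming query; the Kafka source options, the topic name, and the json_str alias are assumptions, not part of the original question or answer.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType

spark = SparkSession.builder.appName("explode-json-array").getOrCreate()

# schema of each element of the JSON array, as in the answer above
sch = ArrayType(StructType([
    StructField("num", IntegerType()),
    StructField("cor", IntegerType())
]))

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # assumed broker address
    .option("subscribe", "mytopic")                        # assumed topic name
    .load()
    # decode the Kafka value bytes into the JSON string column from the question
    .select("timestamp", "offset",
            F.decode(F.col("value"), "UTF-8").alias("json_str"))
    # parse the JSON array, explode it, and flatten the struct fields
    .withColumn("asStruct", F.explode(F.from_json("json_str", sch)))
    .select("timestamp", "offset", "asStruct.*"))

# a streaming dataframe cannot be shown with show(); write to the console sink instead
query = stream.writeStream.format("console").outputMode("append").start()
query.awaitTermination()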
