explode spark column containing list of dict in str format
Question
How to convert this streaming dataframe in pyspark,
+--------------------+------+----------------------------------------------+
| timestamp|offset|stringdecode(value, UTF-8) |
+--------------------+------+----------------------------------------------+
|2023-03-03 17:21:...| 10| "[{"num":55,"cor":32},{"num":14,"cor":54}]" |
+--------------------+------+----------------------------------------------+
|2023-03-03 17:35:...| 11| "[{"num":55,"cor":98},{"num":32,"cor":77}]" |
+--------------------+------+----------------------------------------------+
into this
+--------------------+------+---+---+
| timestamp|offset|num|cor|
+--------------------+------+---+---+
|2023-03-03 17:21:...| 10| 55| 32|
+--------------------+------+---+---+
|2023-03-03 17:21:...| 10| 14| 54|
+--------------------+------+---+---+
|2023-03-03 17:35:...| 11| 55| 98|
+--------------------+------+---+---+
|2023-03-03 17:35:...| 11| 32| 77|
+--------------------+------+---+---+
Stack Overflow is asking me to add text to post my question, but I don't see any need for it, hence this paragraph.
Answer 1
Score: 1
Just use from_json and expand the column
This would work:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType

# Schema of the JSON string column: an array of objects with num and cor fields
sch = ArrayType(StructType([
    StructField("num", IntegerType()),
    StructField("cor", IntegerType())
]))

# Parse the JSON string, explode the array into one row per element,
# then flatten the struct fields into top-level columns
df1.withColumn("asArray", F.from_json("dict", sch)) \
   .withColumn("asStruct", F.explode("asArray")) \
   .select(*[col for col in df1.schema.names if col != "dict"], "asStruct.*") \
   .show()
Input:
+-------------------+------+-----------------------------------------+
|timestamp |offset|dict |
+-------------------+------+-----------------------------------------+
|2023-03-03 00:00:00|10 |[{"num":55,"cor":32},{"num":14,"cor":54}]|
+-------------------+------+-----------------------------------------+
Schema:
root
|-- timestamp: string (nullable = true)
|-- offset: string (nullable = true)
|-- dict: string (nullable = true)
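If you want to reproduce this input locally, a dataframe matching the schema above can be built like this (a minimal sketch; spark is assumed to be an existing SparkSession, and all three columns are plain strings):
# Minimal sketch to recreate the sample input for local testing;
# "spark" is assumed to be an existing SparkSession.
df1 = spark.createDataFrame(
    [("2023-03-03 00:00:00", "10", '[{"num":55,"cor":32},{"num":14,"cor":54}]')],
    ["timestamp", "offset", "dict"]
)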
Output:
+-------------------+------+---+---+
| timestamp|offset|num|cor|
+-------------------+------+---+---+
|2023-03-03 00:00:00| 10| 55| 32|
|2023-03-03 00:00:00| 10| 14| 54|
+-------------------+------+---+---+
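One caveat for the streaming dataframe from the question: show() only works on batch dataframes. The from_json/explode part is identical on a streaming dataframe, but the result has to be written out with writeStream. A minimal sketch, where streaming_df is a placeholder for your stream with the JSON column renamed to dict, and the console sink is used only for inspection:
# Same transformation applied to the streaming dataframe (sketch);
# the console sink is only for inspecting the result while developing.
exploded = (streaming_df
    .withColumn("asArray", F.from_json("dict", sch))
    .withColumn("asStruct", F.explode("asArray"))
    .select("timestamp", "offset", "asStruct.*"))

query = exploded.writeStream.format("console").outputMode("append").start()
query.awaitTermination()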
Let me know if you face any issues.
Comments