Flattening a nested DataFrame

Question
I have a multi-level nested DataFrame, something like the one below:
DataFrame[date_time: timestamp, filename: string, label: string, description: string,
feature_set: array<struct<direction:string,tStart:double,tEnd:double,
features:array<struct<field1:string,field2:string,field3:string,field4:string>>>>]
and its values are:
[[datetime.datetime(2022, 8, 24, 7, 51, 54), 'filename1', 'label1', 'description of file 1', [['east', 78.23018987, 79.23010199, [['fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14']]], ['west', 78.23018987, 79.23010199, [['fld_val21', 'fld_val22', 'fld_val23', 'fld_val24']]], ['south', 78.23018987, 79.23010199, [['fld_val31', 'fld_val32', 'fld_val33', 'fld_val34']]]]]
The schema, as printed by df.printSchema(), is:
root
|-- date_time: timestamp (nullable = true)
|-- filename: string (nullable = true)
|-- label: string (nullable = true)
|-- description: string (nullable = true)
|-- feature_set: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- direction: string (nullable = true)
| | |-- tStart: double (nullable = true)
| | |-- tEnd: double (nullable = true)
| | |-- features: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- field1: string (nullable = true)
| | | | |-- field2: string (nullable = true)
| | | | |-- field3: string (nullable = true)
| | | | |-- field4: string (nullable = true)
I am trying to flatten it so that it looks like the table below:
| date_time | filename | label | description | feature_set_direction | feature_set_tStart | feature_set_tEnd | feature_set_features_field1 | feature_set_features_field2 | feature_set_features_field3 | feature_set_features_field4 |
|---|---|---|---|---|---|---|---|---|---|---|
| 2022-08-24 13:47:47 | filename1 | label1 | description of file 1 | east | 78.23018987 | 79.23010199 | fld_val11 | fld_val12 | fld_Val13 | fld_Val14 |

with a flattened schema like this:
root
|-- date_time: timestamp (nullable = true)
|-- filename: string (nullable = true)
|-- label: string (nullable = true)
|-- description: string (nullable = true)
|-- feature_set: array (nullable = true)
|-- feature_set_element: struct (containsNull = true)
|-- feature_set_element_direction: string (nullable = true)
|-- feature_set_element_tStart: double (nullable = true)
|-- feature_set_element_tEnd: double (nullable = true)
|-- feature_set_element_features: array (nullable = true)
|-- feature_set_element_features_element: struct (containsNull = true)
|-- feature_set_element_features_element_field1: string (nullable = true)
|-- feature_set_element_features_element_field2: string (nullable = true)
|-- feature_set_element_features_element_field3: string (nullable = true)
|-- feature_set_element_features_element_field4: string (nullable = true)
I tried to flatten it with the code below, but it throws an error:
flat_df = df.select("date_time", "filename", "label", "description", "feature_set.*")
> AnalysisException: Can only star expand struct data types. Attribute:
> ArrayBuffer(feature_set)
I also tried a couple of other methods, like a Scala-style val assignment, but without success:
val df2 = df.select(col("date_time"),
col("filename"),
col("label"),
col("description"),
col("feature_set"))
> SyntaxError: invalid syntax (, line 1) File <command-655369>:1
> val df2 = df.select(col("date_time")
Could anyone please suggest how to proceed?
Thank you.
Answer 1 (Score: 2)
Since you're working with arrays, you can't simply select the subcolumns: you'll need to use explode first.
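As a quick illustration on a toy DataFrame first (the names toy, items, name, and score below are made up for illustration, and spark is assumed to be an active SparkSession):

from pyspark.sql import functions as F

# A made-up one-column array-of-struct example.
toy = spark.createDataFrame(
    [(1, [("a", 2.0), ("b", 3.0)])],
    "id INT, items ARRAY<STRUCT<name: STRING, score: DOUBLE>>",
)

# toy.select("items.*") would fail with the same
# "Can only star expand struct data types" error.
toy.withColumn("items", F.explode("items")).select("id", "items.*").show()
# +---+----+-----+
# | id|name|score|
# +---+----+-----+
# |  1|   a|  2.0|
# |  1|   b|  3.0|
# +---+----+-----+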
This code block is just to recreate your data (you shouldn't have to do this):
import datetime
from pyspark.sql.types import *
schema = StructType([
    StructField("date_time", TimestampType(), True),
    StructField("filename", StringType(), True),
    StructField("label", StringType(), True),
    StructField("description", StringType(), True),
    StructField("feature_set",
        ArrayType(
            StructType([
                StructField("direction", StringType(), True),
                StructField("tStart", DoubleType(), True),
                StructField("tEnd", DoubleType(), True),
                StructField("features",
                    ArrayType(
                        StructType([
                            StructField("field1", StringType(), True),
                            StructField("field2", StringType(), True),
                            StructField("field3", StringType(), True),
                            StructField("field4", StringType(), True),
                        ])
                    ), True),
            ])
        )
    )
])
data = [(
    datetime.datetime(2022, 8, 24, 7, 51, 54),
    'filename1',
    'label1',
    'description of file 1',
    [
        ('east', 78.23018987, 79.23010199, [('fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14')]),
        ('west', 78.23018987, 79.23010199, [('fld_val21', 'fld_val22', 'fld_val23', 'fld_val24')]),
        ('south', 78.23018987, 79.23010199, [('fld_val31', 'fld_val32', 'fld_val33', 'fld_val34')]),
    ],
)]
df = spark.createDataFrame(data, schema)
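Optionally, as a sanity check, the recreated DataFrame should reproduce the schema and row from your question:

# Optional: verify the recreated data matches the question.
df.printSchema()
df.show(truncate=False)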
Now that we have the data, we need a combination of select and explode to get the schema we want:
from pyspark.sql import functions as F

flat_df = (
    df
    # explode: one output row per element of the feature_set array
    .withColumn("feature_set", F.explode("feature_set"))
    # star-expand the exploded struct into top-level columns
    .select("date_time", "filename", "label", "description", "feature_set.*")
    # repeat for the nested features array
    .withColumn("features", F.explode("features"))
    .select("date_time", "filename", "label", "description", "direction", "tStart", "tEnd", "features.*")
)
So, as you can see, we explode each array column and then unwrap it with a star expansion. This tells Spark to create one row in the resulting dataframe for each element of each array.
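One caveat: plain explode drops rows whose array is null or empty. If such rows exist in your data and must be kept, explode_outer is the null-preserving variant; a minimal sketch of the same pipeline under that assumption:

from pyspark.sql import functions as F

# explode_outer keeps rows with null/empty arrays (the struct fields become null).
safe_df = (
    df
    .withColumn("feature_set", F.explode_outer("feature_set"))
    .select("date_time", "filename", "label", "description", "feature_set.*")
    .withColumn("features", F.explode_outer("features"))
    .select("date_time", "filename", "label", "description", "direction", "tStart", "tEnd", "features.*")
)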
The output schema won't be exactly what you asked for (the feature_set array is gone, for example), but this is a more compact version without any data duplication. The resulting schema and dataframe look like this:
>>> flat_df.printSchema()
root
|-- date_time: timestamp (nullable = true)
|-- filename: string (nullable = true)
|-- label: string (nullable = true)
|-- description: string (nullable = true)
|-- direction: string (nullable = true)
|-- tStart: double (nullable = true)
|-- tEnd: double (nullable = true)
|-- field1: string (nullable = true)
|-- field2: string (nullable = true)
|-- field3: string (nullable = true)
|-- field4: string (nullable = true)
>>> flat_df.show()
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
| date_time| filename| label| description|direction| tStart| tEnd| field1| field2| field3| field4|
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
|2022-08-24 07:51:54|filename1|label1|description of fi...| east|78.23018987|79.23010199|fld_val11|fld_val12|fld_Val13|fld_Val14|
|2022-08-24 07:51:54|filename1|label1|description of fi...| west|78.23018987|79.23010199|fld_val21|fld_val22|fld_val23|fld_val24|
|2022-08-24 07:51:54|filename1|label1|description of fi...| south|78.23018987|79.23010199|fld_val31|fld_val32|fld_val33|fld_val34|
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
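If you really do want the prefixed column names from your question (feature_set_direction, feature_set_features_field1, and so on), you can alias each field explicitly instead of star-expanding. A minimal sketch (fs and f are just intermediate column names I made up):

from pyspark.sql import functions as F

renamed_df = (
    df
    .withColumn("fs", F.explode("feature_set"))
    .withColumn("f", F.explode("fs.features"))
    .select(
        "date_time", "filename", "label", "description",
        F.col("fs.direction").alias("feature_set_direction"),
        F.col("fs.tStart").alias("feature_set_tStart"),
        F.col("fs.tEnd").alias("feature_set_tEnd"),
        F.col("f.field1").alias("feature_set_features_field1"),
        F.col("f.field2").alias("feature_set_features_field2"),
        F.col("f.field3").alias("feature_set_features_field3"),
        F.col("f.field4").alias("feature_set_features_field4"),
    )
)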