Flattening a nested DataFrame


Question

I have a multi-level nested DataFrame, something like the below:

DataFrame[date_time: timestamp, filename: string, label: string, description: string,
feature_set: array<struct<direction:string,tStart:double,tEnd:double,
features:array<struct<field1:string,field2:string,field3:string,field4:string>>>>]

and its values are:

[[datetime.datetime(2022, 8, 24, 7, 51, 54), 'filename1', 'label1', 'description of file 1', [['east', 78.23018987, 79.23010199, [['fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14']]], ['west', 78.23018987, 79.23010199, [['fld_val21', 'fld_val22', 'fld_val23', 'fld_val24']]], ['south', 78.23018987, 79.23010199, [['fld_val31', 'fld_val32', 'fld_val33', 'fld_val34']]]]]


root
 |-- date_time: timestamp (nullable = true)
 |-- filename: string (nullable = true)
 |-- label: string (nullable = true)
 |-- description: string (nullable = true)
 |-- feature_set: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- direction: string (nullable = true)
 |    |    |-- tStart: double (nullable = true)
 |    |    |-- tEnd: double (nullable = true)
 |    |    |-- features: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- field1: string (nullable = true)
 |    |    |    |    |-- field2: string (nullable = true)
 |    |    |    |    |-- field3: string (nullable = true)
 |    |    |    |    |-- field4: string (nullable = true)

I am trying to flatten it so that it looks like the below:

+-------------------+---------+------+---------------------+---------------------+------------------+----------------+---------------------------+---------------------------+---------------------------+---------------------------+
|          date_time| filename| label|          description|feature_set_direction|feature_set_tStart|feature_set_tEnd|feature_set_features_Field1|feature_set_features_Field2|feature_set_features_Field3|feature_set_features_Field4|
+-------------------+---------+------+---------------------+---------------------+------------------+----------------+---------------------------+---------------------------+---------------------------+---------------------------+
|2022-08-24 13:47:47|filename1|label1|description of file 1|                 east|       78.23018987|     79.23010199|                  fld_val11|                  fld_val12|                  fld_Val13|                  fld_Val14|
+-------------------+---------+------+---------------------+---------------------+------------------+----------------+---------------------------+---------------------------+---------------------------+---------------------------+
root
 |-- date_time: timestamp (nullable = true)
 |-- filename: string (nullable = true)
 |-- label: string (nullable = true)
 |-- description: string (nullable = true)
 |-- feature_set: array (nullable = true)
 |-- feature_set_element: struct (containsNull = true)
 |-- feature_set_element_direction: string (nullable = true)
 |-- feature_set_element_tStart: double (nullable = true)
 |-- feature_set_element_tEnd: double (nullable = true)
 |-- feature_set_element_features: array (nullable = true)
 |-- feature_set_element_features_element: struct (containsNull = true)
 |-- feature_set_element_features_element_field1: string (nullable = true)
 |-- feature_set_element_features_element_field2: string (nullable = true)
 |-- feature_set_element_features_element_field3: string (nullable = true)
 |-- feature_set_element_features_element_field4: string (nullable = true)

I tried to flatten it with the below code, but it's throwing an error:

flat_df = df.select("date_time", "filename", "label", "description", "feature_set.*")

> AnalysisException: Can only star expand struct data types. Attribute:
> ArrayBuffer(feature_set).

I also tried a couple of other methods, like val, but was unsuccessful:

val df2 = df.select(col("date_time"),
                    col("filename"),
                    col("label"),
                    col("description"),
                    col("feature_set"))

> SyntaxError: invalid syntax (, line 1) File <command-655369>:1
> val df2 = df.select(col("date_time")

Could anyone please suggest how to proceed?

Thank you.

Answer 1

Score: 2

Since you're working with arrays, you can't simply select the subcolumns: you'll need to use explode first.
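
To see the core pattern in isolation, here is a minimal sketch (assuming the df recreated below): star-expanding the array directly is exactly what raises your exception, while exploding first turns each array element into a plain struct that can be star-expanded.

from pyspark.sql import functions as F

# This is what fails: feature_set is an array<struct<...>>, not a struct,
# so it cannot be star-expanded directly:
#   df.select("feature_set.*")  # AnalysisException: Can only star expand struct data types

# Exploding first produces one row per array element, each holding a plain
# struct, which can then be star-expanded:
df.select(F.explode("feature_set").alias("fs")).select("fs.*")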

This code block is just to recreate your data (you shouldn't have to do this):

import datetime
from pyspark.sql.types import *

schema = StructType([
    StructField("date_time", TimestampType(), True),
    StructField("filename", StringType(), True),
    StructField("label", StringType(), True),
    StructField("description", StringType(), True),
    StructField("feature_set",
        ArrayType(
            StructType([
                StructField("direction", StringType(), True),
                StructField("tStart", DoubleType(), True),
                StructField("tEnd", DoubleType(), True),
                StructField("features",
                    ArrayType(
                        StructType([
                            StructField("field1", StringType(), True),
                            StructField("field2", StringType(), True),
                            StructField("field3", StringType(), True),
                            StructField("field4", StringType(), True),
                        ])
                    ), True),
            ])
        )
    )
])

data = [ ( datetime.datetime(2022, 8, 24, 7, 51, 54), 'filename1', 'label1', 'description of file 1', [ ( 'east', 78.23018987, 79.23010199, [ ( 'fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14') ]), ( 'west', 78.23018987, 79.23010199, [ ( 'fld_val21', 'fld_val22', 'fld_val23', 'fld_val24') ]), ( 'south', 78.23018987, 79.23010199, [ ( 'fld_val31', 'fld_val32', 'fld_val33', 'fld_val34') ]) ]) ]

df = spark.createDataFrame(data, schema)

Now that we have the data, we need to use a combination of select and explode to get the schema we want:

from pyspark.sql import functions as F

flat_df = df \
  .withColumn("feature_set", F.explode("feature_set")) \
  .select("date_time", "filename", "label", "description", "feature_set.*") \
  .withColumn("features", F.explode("features")) \
  .select("date_time", "filename", "label", "description", "direction", "tStart", "tEnd", "features.*")

So, as you can see, we explode each array column and then unwrap it. By doing this, you're telling Spark to create one row in the resulting dataframe for each element in your arrays.
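
One caveat not covered in the original answer: explode drops rows whose array is null or empty. If that can happen in your data, explode_outer is a drop-in replacement that keeps such rows with nulls (a sketch under that assumption):

from pyspark.sql import functions as F

# explode_outer behaves like explode, but when the array is NULL or empty
# it emits a single row with NULLs instead of dropping the row entirely.
flat_df_outer = df \
  .withColumn("feature_set", F.explode_outer("feature_set")) \
  .select("date_time", "filename", "label", "description", "feature_set.*") \
  .withColumn("features", F.explode_outer("features")) \
  .select("date_time", "filename", "label", "description", "direction", "tStart", "tEnd", "features.*")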

Note that the output schema of flat_df won't be exactly the same as what you're asking for (the feature_set array is no longer there, for example), but this makes for a more compact version without any data duplication. The resulting schema and dataframe look like this:

>>> flat_df.printSchema()
root
 |-- date_time: timestamp (nullable = true)
 |-- filename: string (nullable = true)
 |-- label: string (nullable = true)
 |-- description: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- tStart: double (nullable = true)
 |-- tEnd: double (nullable = true)
 |-- field1: string (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: string (nullable = true)
 |-- field4: string (nullable = true)

>>> flat_df.show()
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
|          date_time| filename| label|         description|direction|     tStart|       tEnd|   field1|   field2|   field3|   field4|
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
|2022-08-24 07:51:54|filename1|label1|description of fi...|     east|78.23018987|79.23010199|fld_val11|fld_val12|fld_Val13|fld_Val14|
|2022-08-24 07:51:54|filename1|label1|description of fi...|     west|78.23018987|79.23010199|fld_val21|fld_val22|fld_val23|fld_val24|
|2022-08-24 07:51:54|filename1|label1|description of fi...|    south|78.23018987|79.23010199|fld_val31|fld_val32|fld_val33|fld_val34|
+-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
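
If you do want the exact prefixed column names from your question (feature_set_direction, feature_set_features_Field1, and so on), here is a minimal sketch along the same lines that renames each field with alias; the names are just the ones from your expected output, not anything Spark requires:

from pyspark.sql import functions as F

# Same explode-then-unwrap pattern, but aliasing each field explicitly
# to reproduce the prefixed column names from the question.
flat_df_renamed = (
    df.withColumn("fs", F.explode("feature_set"))
      .withColumn("f", F.explode("fs.features"))
      .select(
          "date_time", "filename", "label", "description",
          F.col("fs.direction").alias("feature_set_direction"),
          F.col("fs.tStart").alias("feature_set_tStart"),
          F.col("fs.tEnd").alias("feature_set_tEnd"),
          F.col("f.field1").alias("feature_set_features_Field1"),
          F.col("f.field2").alias("feature_set_features_Field2"),
          F.col("f.field3").alias("feature_set_features_Field3"),
          F.col("f.field4").alias("feature_set_features_Field4"),
      )
)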
