Flattening of a Nested DataFrame


Question

I have a multi-level nested dataframe, something like the one below:

    DataFrame[date_time: timestamp, filename: string, label: string, description: string,
    feature_set: array<struct<direction:string,tStart:double,tEnd:double,
    features:array<struct<field1:string,field2:string,field3:string,field4:string>>>>]

and its values are:

    [[datetime.datetime(2022, 8, 24, 7, 51, 54), 'filename1', 'label1', 'description of file 1', [['east', 78.23018987, 79.23010199, [['fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14']]], ['west', 78.23018987, 79.23010199, [['fld_val21', 'fld_val22', 'fld_val23', 'fld_val24']]], ['south', 78.23018987, 79.23010199, [['fld_val31', 'fld_val32', 'fld_val33', 'fld_val34']]]]]]

    root
     |-- date_time: timestamp (nullable = true)
     |-- filename: string (nullable = true)
     |-- label: string (nullable = true)
     |-- description: string (nullable = true)
     |-- feature_set: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- direction: string (nullable = true)
     |    |    |-- tStart: double (nullable = true)
     |    |    |-- tEnd: double (nullable = true)
     |    |    |-- features: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- field1: string (nullable = true)
     |    |    |    |    |-- field2: string (nullable = true)
     |    |    |    |    |-- field3: string (nullable = true)
     |    |    |    |    |-- field4: string (nullable = true)

I am trying to flatten it so that it looks like the below:

    +-------------------+---------+------+---------------------+---------------------+------------------+----------------+---------------------------+---------------------------+---------------------------+---------------------------+
    |          date_time| filename| label|          description|feature_set_direction|feature_set_tStart|feature_set_tEnd|feature_set_features_Field1|feature_set_features_Field2|feature_set_features_Field3|feature_set_features_Field4|
    +-------------------+---------+------+---------------------+---------------------+------------------+----------------+---------------------------+---------------------------+---------------------------+---------------------------+
    |2022-08-24 13:47:47|filename1|label1|description of file 1|                 east|      78.230189787|     79.23010199|                  fld_val11|                  fld_val12|                  fld_Val13|                  fld_Val14|
    +-------------------+---------+------+---------------------+---------------------+------------------+----------------+---------------------------+---------------------------+---------------------------+---------------------------+

    root
     |-- date_time: timestamp (nullable = true)
     |-- filename: string (nullable = true)
     |-- label: string (nullable = true)
     |-- description: string (nullable = true)
     |-- feature_set: array (nullable = true)
     |-- feature_set_element: struct (containsNull = true)
     |-- feature_set_element_direction: string (nullable = true)
     |-- feature_set_element_tStart: double (nullable = true)
     |-- feature_set_element_tEnd: double (nullable = true)
     |-- feature_set_element_features: array (nullable = true)
     |-- feature_set_element_features_element: struct (containsNull = true)
     |-- feature_set_element_features_element_field1: string (nullable = true)
     |-- feature_set_element_features_element_field2: string (nullable = true)
     |-- feature_set_element_features_element_field3: string (nullable = true)
     |-- feature_set_element_features_element_field4: string (nullable = true)

I tried to flatten it with the code below, but it's throwing an error:

    flat_df = df.select("date_time", "filename", "label", "description", "feature_set.*")

> AnalysisException: Can only star expand struct data types. Attribute:
> ArrayBuffer(feature_set).

I also tried a couple of other methods, like val, but was unsuccessful.

    val df2 = df.select(col("date_time"),
        col("filename"),
        col("label"),
        col("description"),
        col("feature_set"))

> SyntaxError: invalid syntax (, line 1) File <command-655369>:1
> val df2 = df.select(col("date_time")

Could anyone please suggest how to proceed?

Thank you.

Answer 1

Score: 2


Since you're working with arrays, you can't simply select the subcolumns: you'll need to use explode first.

This code block is just to recreate your data (you shouldn't have to do this yourself):

    import datetime
    from pyspark.sql.types import *

    schema = StructType([
        StructField("date_time", TimestampType(), True),
        StructField("filename", StringType(), True),
        StructField("label", StringType(), True),
        StructField("description", StringType(), True),
        StructField("feature_set",
            ArrayType(
                StructType([
                    StructField("direction", StringType(), True),
                    StructField("tStart", DoubleType(), True),
                    StructField("tEnd", DoubleType(), True),
                    StructField("features",
                        ArrayType(
                            StructType([
                                StructField("field1", StringType(), True),
                                StructField("field2", StringType(), True),
                                StructField("field3", StringType(), True),
                                StructField("field4", StringType(), True),
                            ])
                        ), True),
                ])
            )
        ),
    ])
    data = [(datetime.datetime(2022, 8, 24, 7, 51, 54), 'filename1', 'label1', 'description of file 1',
             [('east', 78.23018987, 79.23010199, [('fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14')]),
              ('west', 78.23018987, 79.23010199, [('fld_val21', 'fld_val22', 'fld_val23', 'fld_val24')]),
              ('south', 78.23018987, 79.23010199, [('fld_val31', 'fld_val32', 'fld_val33', 'fld_val34')])])]
    df = spark.createDataFrame(data, schema)

Now that we have the data, we need to use a combination of select and explode to get the schema we want:

    from pyspark.sql import functions as F

    flat_df = df \
        .withColumn("feature_set", F.explode("feature_set")) \
        .select("date_time", "filename", "label", "description", "feature_set.*") \
        .withColumn("features", F.explode("features")) \
        .select("date_time", "filename", "label", "description", "direction", "tStart", "tEnd", "features.*")

So, as you can see, we explode each array column and unwrap it afterwards. By doing this, you tell Spark to create one row in the resulting dataframe for each element of your arrays.
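Conceptually, each explode behaves like a loop over the array's elements, so two explodes amount to a nested loop. As a rough plain-Python illustration (not Spark; the row layout is borrowed from the question, trimmed to a few fields):

```python
# Plain-Python sketch of what the two explode/select steps do conceptually:
# each feature_set entry becomes its own row, and within it each features
# entry becomes its own row again (a nested loop).
row = ("filename1", [("east", 78.23018987, 79.23010199,
                      [("fld_val11", "fld_val12", "fld_Val13", "fld_Val14")])])

flat_rows = []
filename, feature_set = row
for direction, t_start, t_end, features in feature_set:   # first explode
    for f1, f2, f3, f4 in features:                       # second explode
        flat_rows.append((filename, direction, t_start, t_end, f1, f2, f3, f4))

print(flat_rows)
# [('filename1', 'east', 78.23018987, 79.23010199, 'fld_val11', 'fld_val12', 'fld_Val13', 'fld_Val14')]
```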

The output schema won't be exactly the same as what you asked for (the feature_set array is no longer there, for example), but this is a more compact representation without any data duplication. The resulting schema and dataframe look like this:

    >>> flat_df.printSchema()
    root
     |-- date_time: timestamp (nullable = true)
     |-- filename: string (nullable = true)
     |-- label: string (nullable = true)
     |-- description: string (nullable = true)
     |-- direction: string (nullable = true)
     |-- tStart: double (nullable = true)
     |-- tEnd: double (nullable = true)
     |-- field1: string (nullable = true)
     |-- field2: string (nullable = true)
     |-- field3: string (nullable = true)
     |-- field4: string (nullable = true)
    >>> flat_df.show()
    +-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
    |          date_time| filename| label|         description|direction|     tStart|       tEnd|   field1|   field2|   field3|   field4|
    +-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
    |2022-08-24 07:51:54|filename1|label1|description of fi...|     east|78.23018987|79.23010199|fld_val11|fld_val12|fld_Val13|fld_Val14|
    |2022-08-24 07:51:54|filename1|label1|description of fi...|     west|78.23018987|79.23010199|fld_val21|fld_val22|fld_val23|fld_val24|
    |2022-08-24 07:51:54|filename1|label1|description of fi...|    south|78.23018987|79.23010199|fld_val31|fld_val32|fld_val33|fld_val34|
    +-------------------+---------+------+--------------------+---------+-----------+-----------+---------+---------+---------+---------+
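If you do want the prefixed column names from the question's desired output (feature_set_direction, feature_set_features_Field1, and so on), you could rename the flattened columns afterwards. A minimal plain-Python sketch of building such a mapping (the prefixes are assumptions mirroring the question's header; note its Field1 vs the schema's field1 casing; in Spark you would then apply it with withColumnRenamed or column aliases):

```python
# Build a {flat_name: prefixed_name} mapping for the exploded columns.
# Plain Python only -- applying it in Spark would be a loop of
# df.withColumnRenamed(old, new) calls, or aliases inside select().
def prefixed(parent, children):
    """Prefix each child column name with its parent's name."""
    return {c: f"{parent}_{c}" for c in children}

renames = {}
renames.update(prefixed("feature_set", ["direction", "tStart", "tEnd"]))
renames.update(prefixed("feature_set_features", ["field1", "field2", "field3", "field4"]))

print(renames["direction"])   # feature_set_direction
print(renames["field1"])      # feature_set_features_field1
```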

huangapple
  • Published on 2023-04-17 12:57:26
  • Please keep this link when reposting: https://go.coder-hub.com/76031827.html