PySpark / MongoDB DataFrame to Nested Collection

Question

I have a Pandas DataFrame in the following format; the data has been pre-aggregated.

    +---------------------------+----------+---------+-------------+-------------+
    |InterfaceName              |StartDate |StartHour|DocumentCount|TotalRowCount|
    +---------------------------+----------+---------+-------------+-------------+
    |Interface_A                |2023-04-01|0        |5            |4384         |
    |Interface_A                |2023-04-01|1        |58           |57168        |
    |Interface_B                |2023-04-01|1        |1            |136          |
    |Interface_C                |2023-04-01|1        |1            |131          |
    |Interface_A                |2023-04-02|0        |58           |57168        |
    |Interface_B                |2023-04-02|0        |1            |131          |
    |Interface_C                |2023-04-02|0        |1            |136          |
    |Interface_A                |2023-04-02|1        |2            |1657         |
    |Interface_B                |2023-04-02|1        |2            |1539         |
    |Interface_C                |2023-04-02|1        |2            |1657         |
    +---------------------------+----------+---------+-------------+-------------+

Using PySpark, how can I transform the DataFrame so that the schema appears as follows, and then write it to a structured collection in MongoDB?

    root
     |-- StartDate: date (nullable = true)
     |-- StartHour: integer (nullable = true)
     |-- InterfaceSummary: struct (nullable = false)
     |    |-- InterfaceName: string (nullable = true)
     |    |-- DocumentCount: string (nullable = true)
     |    |-- TotalRowCount: string (nullable = true)

Thanks in advance,

Ben.

Answer 1

Score: 1

See the implementation below. (I have created the Spark DataFrame directly from the input data you shared. To create the Spark DataFrame explicitly from a Pandas DataFrame, you could use df = spark.createDataFrame(pdf), where pdf is your Pandas DataFrame.)
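
A minimal sketch of that Pandas-to-Spark conversion, assuming a basic SparkSession; the app name and the sample pdf rows below are illustrative, not part of the original answer:

    import pandas as pd
    from pyspark.sql import SparkSession

    # Assumed setup: a plain SparkSession (the app name is arbitrary).
    spark = SparkSession.builder.appName("InterfaceSummary").getOrCreate()

    # pdf stands in for your pre-aggregated Pandas DataFrame (sample rows only).
    pdf = pd.DataFrame({
        "InterfaceName": ["Interface_A", "Interface_A", "Interface_B"],
        "StartDate":     ["2023-04-01", "2023-04-01", "2023-04-01"],
        "StartHour":     [0, 1, 1],
        "DocumentCount": [5, 58, 1],
        "TotalRowCount": [4384, 57168, 136],
    })

    # Spark infers the column types from the Pandas dtypes.
    df = spark.createDataFrame(pdf)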

Input Data -

    from pyspark.sql.types import *

    schema = StructType([
        StructField("InterfaceName", StringType(), True),
        StructField("StartDate", StringType(), True),
        StructField("StartHour", IntegerType(), True),
        StructField("DocumentCount", IntegerType(), True),
        StructField("TotalRowCount", IntegerType(), True)
    ])

    data = [
        ("Interface_A", "2023-04-01", 0, 5, 4384),
        ("Interface_A", "2023-04-01", 1, 58, 57168),
        ("Interface_B", "2023-04-01", 1, 1, 136),
        ("Interface_C", "2023-04-01", 1, 1, 131),
        ("Interface_A", "2023-04-02", 0, 58, 57168),
        ("Interface_B", "2023-04-02", 0, 1, 131),
        ("Interface_C", "2023-04-02", 0, 1, 136),
        ("Interface_A", "2023-04-02", 1, 2, 1657),
        ("Interface_B", "2023-04-02", 1, 2, 1539),
        ("Interface_C", "2023-04-02", 1, 2, 1657)
    ]

    df = spark.createDataFrame(data, schema=schema)
    df.show(truncate=False)

    +-------------+----------+---------+-------------+-------------+
    |InterfaceName|StartDate |StartHour|DocumentCount|TotalRowCount|
    +-------------+----------+---------+-------------+-------------+
    |Interface_A  |2023-04-01|0        |5            |4384         |
    |Interface_A  |2023-04-01|1        |58           |57168        |
    |Interface_B  |2023-04-01|1        |1            |136          |
    |Interface_C  |2023-04-01|1        |1            |131          |
    |Interface_A  |2023-04-02|0        |58           |57168        |
    |Interface_B  |2023-04-02|0        |1            |131          |
    |Interface_C  |2023-04-02|0        |1            |136          |
    |Interface_A  |2023-04-02|1        |2            |1657         |
    |Interface_B  |2023-04-02|1        |2            |1539         |
    |Interface_C  |2023-04-02|1        |2            |1657         |
    +-------------+----------+---------+-------------+-------------+

Transformed Schema -

    from pyspark.sql.functions import *

    df1 = df.select(
        col("StartDate").cast("Date"),
        col("StartHour").cast("Integer"),
        struct(
            col("InterfaceName"),
            col("DocumentCount").cast("String").alias("DocumentCount"),
            col("TotalRowCount").cast("String").alias("TotalRowCount")
        ).alias("InterfaceSummary")
    )

    df1.show(truncate=False)
    df1.printSchema()

    +----------+---------+------------------------+
    |StartDate |StartHour|InterfaceSummary        |
    +----------+---------+------------------------+
    |2023-04-01|0        |{Interface_A, 5, 4384}  |
    |2023-04-01|1        |{Interface_A, 58, 57168}|
    |2023-04-01|1        |{Interface_B, 1, 136}   |
    |2023-04-01|1        |{Interface_C, 1, 131}   |
    |2023-04-02|0        |{Interface_A, 58, 57168}|
    |2023-04-02|0        |{Interface_B, 1, 131}   |
    |2023-04-02|0        |{Interface_C, 1, 136}   |
    |2023-04-02|1        |{Interface_A, 2, 1657}  |
    |2023-04-02|1        |{Interface_B, 2, 1539}  |
    |2023-04-02|1        |{Interface_C, 2, 1657}  |
    +----------+---------+------------------------+

    root
     |-- StartDate: date (nullable = true)
     |-- StartHour: integer (nullable = true)
     |-- InterfaceSummary: struct (nullable = false)
     |    |-- InterfaceName: string (nullable = true)
     |    |-- DocumentCount: string (nullable = true)
     |    |-- TotalRowCount: string (nullable = true)
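
The above keeps one row, and hence one MongoDB document, per interface and hour. If you instead want a single document per (StartDate, StartHour) with every interface nested inside it, here is a sketch using groupBy with collect_list; the names df2 and InterfaceSummaries are illustrative, not from the original answer:

    from pyspark.sql.functions import collect_list

    # Hypothetical variation: collapse the per-interface rows so each
    # (StartDate, StartHour) pair carries an array of summary structs.
    df2 = (
        df1.groupBy("StartDate", "StartHour")
           .agg(collect_list("InterfaceSummary").alias("InterfaceSummaries"))
    )

    # df2 now has one row per (StartDate, StartHour), with
    # InterfaceSummaries: array<struct<InterfaceName, DocumentCount, TotalRowCount>>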

Once you have created the transformed DataFrame, you can write it to your target MongoDB collection roughly as below -

    mongo_uri = "mongodb://<username>:<password>@<host>:<port>/<dbname>.<collectionname>"
    database_name = "<dbname>"
    collection_name = "<collectionname>"

    # Write the transformed dataframe (df1), not the original df.
    df1.write.format("mongo") \
        .option("uri", mongo_uri) \
        .option("database", database_name) \
        .option("collection", collection_name) \
        .mode("append") \
        .save()
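
Note that the "mongo" format name belongs to the MongoDB Spark Connector 3.x. On the newer connector (10.x and later) the format name and option keys differ; a sketch, assuming that connector is on your Spark classpath:

    # MongoDB Spark Connector 10.x variant (assumption: the 10.x connector
    # jar is available to the Spark session).
    df1.write.format("mongodb") \
        .option("connection.uri", mongo_uri) \
        .option("database", database_name) \
        .option("collection", collection_name) \
        .mode("append") \
        .save()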
