PySpark / MongoDB DataFrame to Nested Collection

Question

I have a Pandas dataframe in the following format. Data has been pre-aggregated.

+-------------+----------+---------+-------------+-------------+
|InterfaceName|StartDate |StartHour|DocumentCount|TotalRowCount|
+-------------+----------+---------+-------------+-------------+
|Interface_A  |2023-04-01|0        |5            |4384         |
|Interface_A  |2023-04-01|1        |58           |57168        |
|Interface_B  |2023-04-01|1        |1            |136          |
|Interface_C  |2023-04-01|1        |1            |131          |
|Interface_A  |2023-04-02|0        |58           |57168        |
|Interface_B  |2023-04-02|0        |1            |131          |
|Interface_C  |2023-04-02|0        |1            |136          |
|Interface_A  |2023-04-02|1        |2            |1657         |
|Interface_B  |2023-04-02|1        |2            |1539         |
|Interface_C  |2023-04-02|1        |2            |1657         |
+-------------+----------+---------+-------------+-------------+

Using PySpark, how can I transform the dataframe so that the schema appears as follows, then write it to a structured collection in MongoDB?

root
 |-- StartDate: date (nullable = true)
 |-- StartHour: integer (nullable = true)
 |-- InterfaceSummary: struct (nullable = false)
 |    |-- InterfaceName: string (nullable = true)
 |    |-- DocumentCount: string (nullable = true)
 |    |-- TotalRowCount: string (nullable = true)

Thanks in advance,

Ben.

Answer 1

Score: 1

Please find the implementation below.

(I have created the Spark dataframe directly from the input data you've shared. To create a Spark dataframe from a pandas dataframe explicitly, you can use df = spark.createDataFrame(pdf), where pdf is your pandas dataframe.)
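For completeness, here is a minimal, self-contained sketch of that pandas-to-Spark step. The SparkSession setup and the sample pdf contents are illustrative additions, not part of the original answer:

import pandas as pd
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; in spark-shell or Databricks one
# already exists as `spark`.
spark = SparkSession.builder.appName("nested-collection").getOrCreate()

# A small pandas dataframe mirroring the first rows of the question's table.
pdf = pd.DataFrame({
    "InterfaceName": ["Interface_A", "Interface_A", "Interface_B"],
    "StartDate": ["2023-04-01", "2023-04-01", "2023-04-01"],
    "StartHour": [0, 1, 1],
    "DocumentCount": [5, 58, 1],
    "TotalRowCount": [4384, 57168, 136],
})

# Spark infers the schema from the pandas dtypes.
df = spark.createDataFrame(pdf)
df.printSchema()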

Input Data -

from pyspark.sql.types import *

# Explicit schema for the input; StartDate is kept as a string here
# and cast to a date during the transformation below.
schema = StructType([
    StructField("InterfaceName", StringType(), True),
    StructField("StartDate", StringType(), True),
    StructField("StartHour", IntegerType(), True),
    StructField("DocumentCount", IntegerType(), True),
    StructField("TotalRowCount", IntegerType(), True)
])

data = [
    ("Interface_A", "2023-04-01", 0, 5, 4384),
    ("Interface_A", "2023-04-01", 1, 58, 57168),
    ("Interface_B", "2023-04-01", 1, 1, 136),
    ("Interface_C", "2023-04-01", 1, 1, 131),
    ("Interface_A", "2023-04-02", 0, 58, 57168),
    ("Interface_B", "2023-04-02", 0, 1, 131),
    ("Interface_C", "2023-04-02", 0, 1, 136),
    ("Interface_A", "2023-04-02", 1, 2, 1657),
    ("Interface_B", "2023-04-02", 1, 2, 1539),
    ("Interface_C", "2023-04-02", 1, 2, 1657)
]

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)

+-------------+----------+---------+-------------+-------------+
|InterfaceName|StartDate |StartHour|DocumentCount|TotalRowCount|
+-------------+----------+---------+-------------+-------------+
|Interface_A  |2023-04-01|0        |5            |4384         |
|Interface_A  |2023-04-01|1        |58           |57168        |
|Interface_B  |2023-04-01|1        |1            |136          |
|Interface_C  |2023-04-01|1        |1            |131          |
|Interface_A  |2023-04-02|0        |58           |57168        |
|Interface_B  |2023-04-02|0        |1            |131          |
|Interface_C  |2023-04-02|0        |1            |136          |
|Interface_A  |2023-04-02|1        |2            |1657         |
|Interface_B  |2023-04-02|1        |2            |1539         |
|Interface_C  |2023-04-02|1        |2            |1657         |
+-------------+----------+---------+-------------+-------------+

Transformed Schema -

from pyspark.sql.functions import *

# Cast the date/hour columns, then pack the remaining three columns
# into a single InterfaceSummary struct to match the target schema.
df1 = df.select(
                col("StartDate").cast("Date"),
                col("StartHour").cast("Integer"),
                struct(
                  col("InterfaceName"),
                  col("DocumentCount").cast("String").alias("DocumentCount"),
                  col("TotalRowCount").cast("String").alias("TotalRowCount")
                ).alias("InterfaceSummary")
)
df1.show(truncate=False)
df1.printSchema()

+----------+---------+------------------------+
|StartDate |StartHour|InterfaceSummary        |
+----------+---------+------------------------+
|2023-04-01|0        |{Interface_A, 5, 4384}  |
|2023-04-01|1        |{Interface_A, 58, 57168}|
|2023-04-01|1        |{Interface_B, 1, 136}   |
|2023-04-01|1        |{Interface_C, 1, 131}   |
|2023-04-02|0        |{Interface_A, 58, 57168}|
|2023-04-02|0        |{Interface_B, 1, 131}   |
|2023-04-02|0        |{Interface_C, 1, 136}   |
|2023-04-02|1        |{Interface_A, 2, 1657}  |
|2023-04-02|1        |{Interface_B, 2, 1539}  |
|2023-04-02|1        |{Interface_C, 2, 1657}  |
+----------+---------+------------------------+

root
 |-- StartDate: date (nullable = true)
 |-- StartHour: integer (nullable = true)
 |-- InterfaceSummary: struct (nullable = false)
 |    |-- InterfaceName: string (nullable = true)
 |    |-- DocumentCount: string (nullable = true)
 |    |-- TotalRowCount: string (nullable = true)
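One caveat on the date handling above: cast("Date") only parses ISO yyyy-MM-dd strings and silently returns null for anything else. If your StartDate values arrive in another format, to_date with an explicit pattern is the safer variant; the MM/dd/yyyy pattern below is purely an illustrative assumption:

from pyspark.sql.functions import col, to_date

# Hypothetical example: parse strings like "04/01/2023" explicitly
# rather than relying on the implicit cast.
df_dates = df.withColumn("StartDate", to_date(col("StartDate"), "MM/dd/yyyy"))
df_dates.printSchema()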

Once you have created the transformed dataframe, you can write it to your target MongoDB collection as shown below -

mongo_uri = "mongodb://<username>:<password>@<host>:<port>/<dbname>.<collectionname>"
database_name = "<dbname>"
collection_name = "<collectionname>"

# Note: "mongo" is the format name used by the MongoDB Spark Connector 3.x;
# connector 10.x and later registers as "mongodb".
# Write the transformed dataframe (df1), not the flat input df.
df1.write.format("mongo") \
   .option("uri", mongo_uri) \
   .option("database", database_name) \
   .option("collection", collection_name) \
   .mode("append") \
   .save()
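To sanity-check the write, you can read the collection back with the same connector and confirm the nested struct round-tripped (again a sketch, assuming the 3.x "mongo" format and the placeholder connection details above):

# Read the target collection back into a dataframe.
df_check = spark.read.format("mongo") \
    .option("uri", mongo_uri) \
    .option("database", database_name) \
    .option("collection", collection_name) \
    .load()
df_check.printSchema()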
