PySpark / Mongodb Dataframe to Nested Collection
Question
I have a Pandas dataframe in the following format. Data has been pre-aggregated.
+---------------------------+----------+---------+-------------+-------------+
|InterfaceName |StartDate |StartHour|DocumentCount|TotalRowCount|
+---------------------------+----------+---------+-------------+-------------+
|Interface_A |2023-04-01|0 |5 |4384 |
|Interface_A |2023-04-01|1 |58 |57168 |
|Interface_B |2023-04-01|1 |1 |136 |
|Interface_C |2023-04-01|1 |1 |131 |
|Interface_A |2023-04-02|0 |58 |57168 |
|Interface_B |2023-04-02|0 |1 |131 |
|Interface_C |2023-04-02|0 |1 |136 |
|Interface_A |2023-04-02|1 |2 |1657 |
|Interface_B |2023-04-02|1 |2 |1539 |
|Interface_C |2023-04-02|1 |2 |1657 |
+---------------------------+----------+---------+-------------+-------------+
Using PySpark, how can I transform the dataframe so that the schema appears as follows, then write to a structured collection in MongoDB?
root
|-- StartDate: date (nullable = true)
|-- StartHour: integer (nullable = true)
| |-- InterfaceSummary: struct (nullable = false)
| | |-- InterfaceName: string (nullable = true)
| | |-- DocumentCount: string (nullable = true)
| | |-- TotalRowCount: string (nullable = true)
Thanks in advance,
Ben.
Answer 1
Score: 1
Please see the implementation below -
(I have created the Spark dataframe directly from the input data you shared. To create the Spark dataframe explicitly from a pandas dataframe you could instead use:
df = spark.createDataFrame(pdf)
where pdf is your pandas dataframe.)
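For completeness, a minimal sketch of that pandas-to-Spark route, assuming an active SparkSession named spark and using a few of the sample rows from the question (the pdf built here is purely illustrative):
import pandas as pd

# Illustrative pandas dataframe built from a few of the sample rows;
# in practice pdf would already hold your pre-aggregated data.
pdf = pd.DataFrame({
    "InterfaceName": ["Interface_A", "Interface_A", "Interface_B"],
    "StartDate": ["2023-04-01", "2023-04-01", "2023-04-01"],
    "StartHour": [0, 1, 1],
    "DocumentCount": [5, 58, 1],
    "TotalRowCount": [4384, 57168, 136],
})

# Convert to a Spark dataframe; column names and types are inferred from pandas.
df = spark.createDataFrame(pdf)
df.printSchema()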
Input Data -
from pyspark.sql.types import *
schema = StructType([
    StructField("InterfaceName", StringType(), True),
    StructField("StartDate", StringType(), True),
    StructField("StartHour", IntegerType(), True),
    StructField("DocumentCount", IntegerType(), True),
    StructField("TotalRowCount", IntegerType(), True)
])

data = [
    ("Interface_A", "2023-04-01", 0, 5, 4384),
    ("Interface_A", "2023-04-01", 1, 58, 57168),
    ("Interface_B", "2023-04-01", 1, 1, 136),
    ("Interface_C", "2023-04-01", 1, 1, 131),
    ("Interface_A", "2023-04-02", 0, 58, 57168),
    ("Interface_B", "2023-04-02", 0, 1, 131),
    ("Interface_C", "2023-04-02", 0, 1, 136),
    ("Interface_A", "2023-04-02", 1, 2, 1657),
    ("Interface_B", "2023-04-02", 1, 2, 1539),
    ("Interface_C", "2023-04-02", 1, 2, 1657)
]
df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)
+-------------+----------+---------+-------------+-------------+
|InterfaceName|StartDate |StartHour|DocumentCount|TotalRowCount|
+-------------+----------+---------+-------------+-------------+
|Interface_A |2023-04-01|0 |5 |4384 |
|Interface_A |2023-04-01|1 |58 |57168 |
|Interface_B |2023-04-01|1 |1 |136 |
|Interface_C |2023-04-01|1 |1 |131 |
|Interface_A |2023-04-02|0 |58 |57168 |
|Interface_B |2023-04-02|0 |1 |131 |
|Interface_C |2023-04-02|0 |1 |136 |
|Interface_A |2023-04-02|1 |2 |1657 |
|Interface_B |2023-04-02|1 |2 |1539 |
|Interface_C |2023-04-02|1 |2 |1657 |
+-------------+----------+---------+-------------+-------------+
Transformed Schema -
from pyspark.sql.functions import *
df1 = df.select(
    col("StartDate").cast("Date"),
    col("StartHour").cast("Integer"),
    struct(
        col("InterfaceName"),
        col("DocumentCount").cast("String").alias("DocumentCount"),
        col("TotalRowCount").cast("String").alias("TotalRowCount")
    ).alias("InterfaceSummary")
)
df1.show(truncate=False)
df1.printSchema()
+----------+---------+------------------------+
|StartDate |StartHour|InterfaceSummary |
+----------+---------+------------------------+
|2023-04-01|0 |{Interface_A, 5, 4384} |
|2023-04-01|1 |{Interface_A, 58, 57168}|
|2023-04-01|1 |{Interface_B, 1, 136} |
|2023-04-01|1 |{Interface_C, 1, 131} |
|2023-04-02|0 |{Interface_A, 58, 57168}|
|2023-04-02|0 |{Interface_B, 1, 131} |
|2023-04-02|0 |{Interface_C, 1, 136} |
|2023-04-02|1 |{Interface_A, 2, 1657} |
|2023-04-02|1 |{Interface_B, 2, 1539} |
|2023-04-02|1 |{Interface_C, 2, 1657} |
+----------+---------+------------------------+
root
|-- StartDate: date (nullable = true)
|-- StartHour: integer (nullable = true)
|-- InterfaceSummary: struct (nullable = false)
| |-- InterfaceName: string (nullable = true)
| |-- DocumentCount: string (nullable = true)
| |-- TotalRowCount: string (nullable = true)
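If the goal is instead one document per StartDate/StartHour with every interface nested inside it (the schema in the question indents InterfaceSummary under StartHour), a sketch of that variant using groupBy and collect_list follows; the column name InterfaceSummaries is my own choice, not something from the question:
from pyspark.sql.functions import col, struct, collect_list

# One row per StartDate/StartHour, each holding an array of per-interface summaries.
df2 = (
    df.groupBy(
        col("StartDate").cast("Date").alias("StartDate"),
        col("StartHour").cast("Integer").alias("StartHour")
    )
    .agg(
        collect_list(
            struct(
                col("InterfaceName"),
                col("DocumentCount").cast("String").alias("DocumentCount"),
                col("TotalRowCount").cast("String").alias("TotalRowCount")
            )
        ).alias("InterfaceSummaries")
    )
)
df2.printSchema()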
Once you have created the transformed dataframe, you can write it to your target MongoDB collection roughly as below -
mongo_uri = "mongodb://<username>:<password>@<host>:<port>/<dbname>.<collectionname>"
database_name = "<dbname>"
collection_name = "<collectionname>"

df1.write.format("mongo") \
    .option("uri", mongo_uri) \
    .option("database", database_name) \
    .option("collection", collection_name) \
    .mode("append") \
    .save()
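The format("mongo") / option("uri", ...) style above matches the MongoDB Spark Connector 3.x. If you are on the newer connector 10.x instead, the write looks roughly like this - treat the option names as an assumption and check them against the connector version you actually have installed:
# Hedged sketch for MongoDB Spark Connector 10.x, where the source name is "mongodb".
df1.write.format("mongodb") \
    .option("connection.uri", mongo_uri) \
    .option("database", database_name) \
    .option("collection", collection_name) \
    .mode("append") \
    .save()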