Efficient way to compute several thousands of averages from time segments of one single TimeSeries DataFrame
Question

Suppose I have a PySpark DataFrame called **values** containing the values of a voltage over time (a time series). The DataFrame contains one billion measurement points (10^9 rows) and is originally stored in a Parquet file. The schema of the DataFrame is:

> values
>  |-- time: float (nullable = true)
>  |-- voltage: float (nullable = true)

**values: 1,000,000,000 rows**
[time, voltage]
[0.000000000, 4.1870174]
[0.001000141, 4.199591]
[0.002001438, 4.2184515]
[0.003001813, 4.237312]
[0.004002469, 4.256172]
.....
[459586.004002469, 459586.256172]

Then I have another PySpark DataFrame called **timeperiods** containing the starts/ends of time periods, with 1,000,000 rows:

> timeperiods
>  |-- start: float (nullable = true)
>  |-- end: float (nullable = true)

**timeperiods: 1,000,000 rows**
[start, end]
[0.000000000, 1.1870174]
[2.001000141, 4.199591]
[5.002001438, 5.2184515]
[6.003001813, 6.237312]
.....
[459585.004002469, 459586.256172]

For each time period in the DataFrame **timeperiods**, I would like to compute the average of the voltage values in **values** that fall between "start" and "end". How would you solve this? In a traditional programming environment I would iterate over each time period, compute the sum of the voltage values between "start" and "end", divide that sum by the number of values between "start" and "end", and finally save the result in a table. How can I do this with Spark?
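For illustration, a minimal sketch of that traditional loop, assuming the data fit in memory as pandas DataFrames with the hypothetical names values_pd and periods_pd, would be something like the following; it works for small data but clearly does not scale to 10^9 rows:

import pandas as pd

# Naive single-machine version: loop over every period and average the matching voltages.
# values_pd has columns [time, voltage]; periods_pd has columns [start, end] (hypothetical names).
results = []
for period in periods_pd.itertuples():
    in_period = values_pd.loc[
        (values_pd["time"] >= period.start) & (values_pd["time"] <= period.end), "voltage"
    ]
    # average = sum of the voltages divided by their count
    avg = in_period.sum() / len(in_period) if len(in_period) else None
    results.append((period.start, period.end, avg))

averages_pd = pd.DataFrame(results, columns=["start", "end", "avg_voltage"])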
(Note: the following section outlines one possible way to approach the problem.)

You can use Spark's distributed computing capabilities to solve this. First, join the two DataFrames so that the time periods and the voltage measurements are available in the same context; then use Spark's aggregation operations to compute the average voltage within each time period.

Here is a potential example snippet demonstrating these steps:
from pyspark.sql import SparkSession

# Create the Spark session
spark = SparkSession.builder.appName("VoltageAverage").getOrCreate()

# Read the values and timeperiods DataFrames
values_df = spark.read.parquet("path_to_values_parquet_file")
timeperiods_df = spark.read.parquet("path_to_timeperiods_parquet_file")

# Join the two DataFrames on the time condition (a range join)
joined_df = timeperiods_df.join(
    values_df,
    (values_df["time"] >= timeperiods_df["start"]) & (values_df["time"] <= timeperiods_df["end"]),
    "inner",
)

# Compute the average voltage within each time period
result_df = joined_df.groupBy("start", "end").agg({"voltage": "avg"})

# Save the result to a table or file
result_df.write.parquet("path_to_output_parquet_file")

# Stop the Spark session
spark.stop()
Note that this is only an example; you will need to adapt it to your actual data and requirements. Replace path_to_values_parquet_file, path_to_timeperiods_parquet_file and path_to_output_parquet_file with your own file paths, and adjust the aggregation to match exactly what you need.
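On a billion measurements joined against a million periods, a plain range join can be very slow, because Spark may execute a non-equi join as a broadcast nested-loop join or a Cartesian product. One common mitigation (a sketch of a general technique, not a built-in Spark feature) is to pre-bucket both sides on a coarse time grid and equi-join on the bucket id before applying the exact range condition. The snippet below assumes a hypothetical bucket width BUCKET, ideally on the order of the longest period length, and reuses values_df and timeperiods_df from the snippet above:

from pyspark.sql import functions as F

BUCKET = 10.0  # hypothetical bucket width in seconds; pick it close to the longest period length

# Tag every measurement with the coarse time bucket it falls into
values_b = values_df.withColumn("bucket", F.floor(F.col("time") / BUCKET))

# Explode each period into the bucket ids it overlaps (few per period if BUCKET is large enough)
periods_b = timeperiods_df.withColumn(
    "bucket",
    F.explode(F.sequence(F.floor(F.col("start") / BUCKET),
                         F.floor(F.col("end") / BUCKET)))
)

# Equi-join on the bucket id first, then apply the exact range condition
result_df = (
    periods_b.join(values_b, on="bucket")
    .where((F.col("time") >= F.col("start")) & (F.col("time") <= F.col("end")))
    .groupBy("start", "end")
    .agg(F.avg("voltage").alias("avg_voltage"),
         F.count("voltage").alias("n_points"))
)

Because each measurement sits in exactly one bucket, it is only compared against the periods that touch that bucket rather than against all 1,000,000 of them.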
Answer 1

Score: 0
As posted in the comments, you can solve this by:

- using a range join on your timestamps
- grouping by one of the times that you're using to "bucket" your calculations (I'm using start in this case)
- aggregating the result of the groupBy with F.mean

I've added some extra values in df1 to make sure some values exist in buckets other than the first one.
from pyspark.sql import functions as F

# Sample voltage measurements (time, voltage)
df1 = spark.createDataFrame(
    [(0.000000000, 4.1870174),
     (0.001000141, 4.199591),
     (0.002001438, 4.2184515),
     (0.003001813, 4.237312),
     (0.004002469, 4.256172),
     (5.1, 10.0),
     (5.11, 14.0),
     (6.1, 20.0),
     (6.11, 0.0)],
    ["time", "voltage"]
)

# Sample time periods (start, end)
df2 = spark.createDataFrame(
    [(0.000000000, 1.1870174),
     (2.001000141, 4.199591),
     (5.002001438, 5.2184515),
     (6.003001813, 6.237312)],
    ["start", "end"]
)

# Range join on time, then average the voltage per period
df2.join(df1, on=[df1.time >= df2.start, df1.time <= df2.end]) \
    .groupBy("start") \
    .agg(F.mean("voltage")) \
    .show()
+-----------+------------------+
| start| avg(voltage)|
+-----------+------------------+
| 0.0|4.2197087799999995|
|5.002001438| 12.0|
|6.003001813| 10.0|
+-----------+------------------+
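If you also need each period's end and the number of measurements that fell into it (the count you would divide by in the hand-rolled approach), one possible variation is to group on both boundary columns and add a count aggregate:

from pyspark.sql import functions as F

# Same range join, but keep both period boundaries and count the samples per period
(df2.join(df1, on=[df1.time >= df2.start, df1.time <= df2.end])
    .groupBy("start", "end")
    .agg(F.mean("voltage").alias("avg_voltage"),
         F.count("voltage").alias("n_samples"))
    .show())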