# Groupby, Window and rolling average in Spark
# Question

I want to do a groupby and a rolling average on a huge dataset using PySpark. Since I'm not used to PySpark, I struggle to see my mistake here. Why doesn't this work?
```python
import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import avg, col

data = pd.DataFrame({'group': ['A']*5 + ['B']*5,
                     'order': [1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
                     'value': [23, 54, 65, 64, 78, 98, 78, 76, 77, 57]})
spark_df = spark.createDataFrame(data)

window_spec = Window.partitionBy("group").orderBy("order").rowsBetween(-1, 0)

# Calculate the rolling average of "value"
rolling_avg = avg(col("value")).over(window_spec).alias("value_rolling_avg")

# Group by "group" and calculate the rolling average of "value"
spark_df.groupby("group").agg(rolling_avg).show()
```

This fails with:

```
AnalysisException: [COLUMN_NOT_IN_GROUP_BY_CLAUSE] The expression "order" is neither
present in the group by, nor is it an aggregate function. Add to group by or wrap in
first() (or first_value()) if you don't care which value you get.;
```
# Answer 1

**Score**: 1
You are mixing window functions and aggregation functions: a window expression like `avg(col("value")).over(window_spec)` already produces one value per input row, while `groupby(...).agg(...)` expects expressions that reduce each group to a single row. That is why Spark complains about the `order` column referenced by the window spec.
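For contrast, here is a minimal sketch (not part of the original answer) of a plain per-group aggregation, where `avg` acts as an aggregate function with no window spec and each group collapses to one row:

```python
from pyspark.sql.functions import avg

# Aggregate: one output row per group, no window involved
spark_df.groupby("group").agg(avg("value").alias("value_avg")).show()
```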
You can simply select `rolling_avg` to get your result:
```python
spark_df.select(rolling_avg).show()
```

```
+-----------------+
|value_rolling_avg|
+-----------------+
|             23.0|
|             38.5|
|             59.5|
|             64.5|
|             71.0|
|             98.0|
|             88.0|
|             77.0|
|             76.5|
|             67.0|
+-----------------+
```
Or:
```python
spark_df.withColumn("value_rolling_avg", rolling_avg).show()
```

```
+-----+-----+-----+-----------------+
|group|order|value|value_rolling_avg|
+-----+-----+-----+-----------------+
|    A|    1|   23|             23.0|
|    A|    2|   54|             38.5|
|    A|    3|   65|             59.5|
|    A|    4|   64|             64.5|
|    A|    5|   78|             71.0|
|    B|    1|   98|             98.0|
|    B|    2|   78|             88.0|
|    B|    3|   76|             77.0|
|    B|    4|   77|             76.5|
|    B|    5|   57|             67.0|
+-----+-----+-----+-----------------+
```
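If the goal really was to combine a group-by with the rolling average (an assumption about the asker's intent, not stated in the answer), one way is to materialize the window column first and aggregate afterwards; the name `mean_rolling_avg` below is hypothetical:

```python
from pyspark.sql.functions import avg

# Step 1: compute the per-row rolling average via the window spec.
# Step 2: reduce each group to a single summary row.
(spark_df
 .withColumn("value_rolling_avg", rolling_avg)
 .groupby("group")
 .agg(avg("value_rolling_avg").alias("mean_rolling_avg"))
 .show())
```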