Multiple aggregations in Spark Structured Streaming
Question
I'm building a data pipeline using Spark Structured Streaming, which reads data from Kafka.
Here is my source code:
from pyspark.sql import functions as f  # import implied by the use of `f` below

queries = []
plug_df = event_df.withWatermark('timestamp', '10 minutes').groupby(
    f.window(f.col('timestamp'), '5 minutes', '5 minutes'),
    f.col('house_id'),
    f.col('household_id'),
    f.col('plug_id')
).agg(
    f.avg('value').alias('avg_load')
)
house_df = plug_df.groupby(
    f.col('house_id'),
    f.col('window')
).agg(
    f.sum('avg_load').alias('avg_load')
)
queries.append(plug_df.writeStream.format('console').outputMode('update').start())
queries.append(house_df.writeStream.format('console').outputMode('update').start())
for query in queries:
    query.awaitTermination()
spark.stop()
The plug_df query works perfectly fine, but when I start the house_df query, it raises the following exception:
pyspark.errors.exceptions.captured.AnalysisException: Detected pattern of possible 'correctness' issue due to global watermark. The query contains a stateful operation that can emit rows older than the current watermark plus the allowed late record delay, which are considered as "late rows" in downstream stateful operations and these rows can be discarded. Please refer to the programming guide documentation for more details. If you understand the potential risk of correctness issues and still need to run the query, you can disable this check by setting the configuration `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.
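(For reference, a minimal sketch of setting the configuration named in the message, assuming the `spark` session from the code above; as the message itself warns, disabling the check risks silently dropping late rows downstream:)

spark.conf.set(
    'spark.sql.streaming.statefulOperator.checkCorrectness.enabled',
    'false'
)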
So my question is: how can I perform multiple aggregations in Spark Structured Streaming, and what is the recommended way to achieve this?
Answer 1
Score: 1
This is indeed not supported.
My case is not exactly yours, since I don't use watermarks, but I also needed further aggregations, so on my side I used the foreachBatch sink instead of the console sink you are using.
In pseudocode:

def func(batch_df, batch_id):
    # aggregate the micro-batch as much as you want
    ...

query.writeStream.foreachBatch(func).start()
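To make that concrete under the question's setup, here is a minimal sketch (reusing plug_df from the question; write_house_load is a name chosen here for illustration). Inside foreachBatch each micro-batch is a static DataFrame, so the second aggregation is allowed; note that with outputMode('update') each per-batch sum only covers the rows emitted in that micro-batch.

from pyspark.sql import functions as f

def write_house_load(batch_df, batch_id):
    # The micro-batch is a plain (non-streaming) DataFrame here,
    # so a second aggregation is permitted.
    house_batch = batch_df.groupBy('house_id', 'window').agg(
        f.sum('avg_load').alias('avg_load')
    )
    house_batch.show(truncate=False)  # or write to any batch sink

query = (plug_df.writeStream
         .outputMode('update')
         .foreachBatch(write_house_load)
         .start())
query.awaitTermination()

Hope it works for you.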