Spark Structured Streaming 中的多重聚合

huangapple go评论59阅读模式
英文:

Multiple aggregation in Spark Structured Streaming

问题

我正在使用Spark Structured Streaming构建一个数据流水线,该流水线从Kafka读取数据。

这是我的源代码:

queries = []

plug_df = event_df.withWatermark('timestamp', '10 minutes').groupby(
    f.window(f.col('timestamp'), '5 minutes', '5 minutes'),
    f.col('house_id'),
    f.col('household_id'),
    f.col('plug_id')
).agg(
    f.avg('value').alias('avg_load')
)

house_df = plug_df.groupby(
    f.col('house_id'),
    f.col('window')
).agg(
    f.sum('avg_load').alias('avg_load')
)

queries.append(plug_df.writeStream.format('console').outputMode('update').start())
queries.append(house_df.writeStream.format('console').outputMode('update').start())

for query in queries:
    query.awaitTermination()

spark.stop()

plug_df查询运行正常,但当我启动house_df查询时,它会引发以下异常:

pyspark.errors.exceptions.captured.AnalysisException: 检测到可能由于全局水印而引起的正确性问题模式查询包含一个可以发出比当前水印加上允许的迟到记录延迟更早的行的有状态操作这些行在下游有状态操作中被视为迟到的行”,并且这些行可能会被丢弃请参考编程指南文档以了解更多详情如果您了解正确性问题的潜在风险并且仍然需要运行查询则可以通过将配置`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`设置为false来禁用此检查

因此,我的问题是,我如何在Spark Structured Streaming中执行多个聚合操作?在Spark Streaming中实现这一目标的推荐方法是什么?

英文:

I'm building a data pipeline using Spark Structured Streaming, which reads data from Kafka.

Here is my source code:

queries = []

plug_df = event_df.withWatermark('timestamp', '10 minutes').groupby(
    f.window(f.col('timestamp'), '5 minutes', '5 minutes'),
    f.col('house_id'),
    f.col('household_id'),
    f.col('plug_id')
).agg(
    f.avg('value').alias('avg_load')
)

house_df = plug_df.groupby(
    f.col('house_id'),
    f.col('window')
).agg(
    f.sum('avg_load').alias('avg_load')
)

queries.append(plug_df.writeStream.format('console').outputMode('update').start())
queries.append(house_df.writeStream.format('console').outputMode('update').start())

for query in queries:
    query.awaitTermination()

spark.stop()

The plug_df query works perfectly fine, but when I start the house_df query, it raises the following exception:

pyspark.errors.exceptions.captured.AnalysisException: Detected pattern of possible 'correctness' issue due to global watermark. The query contains a stateful operation that can emit rows older than the current watermark plus the allowed late record delay, which are considered as "late rows" in downstream stateful operations and these rows can be discarded. Please refer to the programming guide documentation for more details. If you understand the potential risk of correctness issues and still need to run the query, you can disable this check by setting the configuration `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.

So my question is, how can I perform multiple aggregations in Spark Structured Streaming? What is the recommended approach to achieve this in Spark Streaming?

答案1

得分: 1

这确实不受支持。
我并不完全处于你的情况下,因为我不使用水印,但我需要执行聚合操作,所以我在我的一边使用了foreachBatch接收器,而不是在你的一边使用console。

伪代码中:

query.writeStream.foreachBatch(func)
// 在foreachBatch内执行聚合
func:

聚合尽可能多的数据

希望对你有帮助。

英文:

This is indeed not supported.
I'm not exactly in your case, as I don't use watermarks, but I have aggregations to do so what I did on my side is using foreachBatch sink instead of console on your side.

In pseudo code:

query.writeStream.foreachBatch(func)
// do aggregation inside foreachBatch
func:
  #aggregate as much as you want

Hope it works for you

huangapple
  • 本文由 发表于 2023年7月3日 15:04:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/76602533.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定