Is there a way to do a custom window that is not time-based on a Kafka stream, using PySpark?

Question

I have a Kafka stream sending me heartbeat data for a cyclist on a circuit.
I need to be able to compute the average (AVG) heartbeat for each lap he did. I tried to use the session window, but it only works on time, and in my case the lap time can be different for each lap.

I found that with foreachBatch I can create a Window on any column:

    .foreachBatch(calculate_heartbeat)

and in this function:

    from pyspark.sql.functions import avg
    from pyspark.sql.window import Window

    def calculate_heartbeat(df, batch_id):
        # Average heartbeat per lap, but only over the rows in the current micro-batch
        lap_window = Window.partitionBy("lap")
        df = df.withColumn("avg", avg("heartbeat").over(lap_window))
        df.show(truncate=False)
        df.groupBy("lap").agg(avg("heartbeat")).show()
        return df
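
For reference, this is roughly how I attach the function to the streaming query; the Kafka connection details below are just placeholders, not my real configuration:

    # Sketch only: broker address and topic name are placeholders,
    # and parsing of the Kafka value into lap/heartbeat columns is omitted.
    stream_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "heartbeat") \
        .load()

    query = stream_df.writeStream \
        .foreachBatch(calculate_heartbeat) \
        .start()

    query.awaitTermination()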

But when using foreachBatch I am not able to accumulate the whole data for a lap. Is there a way to do it?

I tried different approaches to create an empty DataFrame and append each batch I receive to it.
I am expecting to accumulate the whole lap in a DataFrame, or I would welcome a better approach to do my calculation or to window by lap.

Answer 1

Score: 0

I'm using Spark 3.4.0 for this test.

I have generated some CSVs in the /content/input directory with the following content, which I understand will be present in your event stream:

    lapId,heartbeat,timestamp
    1,122,2023-05-23 10:01:00
    1,132,2023-05-23 10:02:00
    2,137,2023-05-23 10:03:00
    2,122,2023-05-23 10:04:00
    2,132,2023-05-23 10:05:00
    3,137,2023-05-23 10:06:00
    3,122,2023-05-23 10:07:00
    3,132,2023-05-23 10:08:00
    4,137,2023-05-23 10:09:00
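
If you want to reproduce this, one way to drop that sample into place is simply to write it out before starting the stream; the filename below is arbitrary:

    import os

    # Write the sample rows shown above into the directory the CSV streaming source reads from.
    os.makedirs("/content/input", exist_ok=True)
    sample = (
        "lapId,heartbeat,timestamp\n"
        "1,122,2023-05-23 10:01:00\n"
        "1,132,2023-05-23 10:02:00\n"
        # ... remaining rows as listed above
    )
    with open("/content/input/heartbeats.csv", "w") as f:
        f.write(sample)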

I am using the session window aggregation functionality of Spark Structured Streaming, where in this case it is assumed that an event is reported at least every 5 minutes (otherwise the lap is considered over):

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType
    from pyspark.sql.functions import session_window, avg

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    schema = StructType([
        StructField('lapId', StringType(), True),
        StructField('heartbeat', LongType(), True),
        StructField('timestamp', TimestampType(), True)
    ])

    df = spark.readStream.format("csv").schema(schema).option("header", True).load("/content/input")

    # This is the part that interests you
    avg_heartbeat_rate_per_lap = df \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            session_window(df.timestamp, "5 minutes"),
            df.lapId) \
        .agg(avg("heartbeat"))

    query = avg_heartbeat_rate_per_lap \
        .writeStream \
        .outputMode("complete") \
        .queryName("aggregates") \
        .format("memory") \
        .start()

    spark.sql("select * from aggregates").show(truncate=False)

The results are correct as per the inputs:

    +------------------------------------------+-----+------------------+
    |session_window                            |lapId|avg(heartbeat)    |
    +------------------------------------------+-----+------------------+
    |{2023-05-23 10:06:00, 2023-05-23 10:13:00}|3    |130.33333333333334|
    |{2023-05-23 10:01:00, 2023-05-23 10:07:00}|1    |127.0             |
    |{2023-05-23 10:09:00, 2023-05-23 10:14:00}|4    |137.0             |
    |{2023-05-23 10:03:00, 2023-05-23 10:10:00}|2    |130.33333333333334|
    +------------------------------------------+-----+------------------+
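
Since your real source is Kafka rather than CSV files, the same session-window aggregation should work once the Kafka message value is parsed into the same schema. Here is a rough sketch, where the broker address, topic name, and the assumption of a JSON payload are all placeholders:

    from pyspark.sql.functions import from_json, col

    # Hypothetical Kafka source; bootstrap servers and topic are placeholders.
    kafka_raw = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "heartbeat") \
        .load()

    # Assuming the message value is JSON matching the lapId/heartbeat/timestamp schema above.
    df = kafka_raw \
        .select(from_json(col("value").cast("string"), schema).alias("data")) \
        .select("data.*")

    # The withWatermark/session_window aggregation shown above can then be applied to this df unchanged.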
