How to register a StreamingListener with SparkSession?
Question
I have a structured streaming application and I want to register a `StreamingListener` with it. How do I do that? I only seem to be able to register a `StreamingQueryListener`, via `sparkSession.streams.addListener()`.

I know how to add a `StreamingListener` to a `StreamingContext`, but the problem seems to be getting a `StreamingContext` in the first place: if I create one from `SparkSession.sparkContext`, I don't think the `StreamingListener` will work, because I would keep streaming through the `SparkSession` rather than through the `StreamingContext` I created.

Here's my `StreamingListener` class:
```scala
import io.prometheus.client.{CollectorRegistry, Gauge}
import io.prometheus.client.exporter.PushGateway
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class MicroBatchStatsListener(val pushGateway: PushGateway, jobName: String) extends StreamingListener {

  private val processingTimeGauge = Gauge.build()
    .name("processingTimeGauge")
    .help("Time it took to process this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  private val schedulingDelayGauge = Gauge.build()
    .name("schedulingDelayGauge")
    .help("Scheduling delay of this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  private val numRecordsGauge = Gauge.build()
    .name("numRecordsGauge")
    .help("Number of records received in this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Delays are reported in milliseconds; convert to seconds.
    val processingTime = batchCompleted.batchInfo.processingDelay.getOrElse(0L) / 1000
    val schedulingDelay = batchCompleted.batchInfo.schedulingDelay.getOrElse(0L) / 1000
    val numRecords = batchCompleted.batchInfo.numRecords

    processingTimeGauge.set(processingTime)
    schedulingDelayGauge.set(schedulingDelay)
    numRecordsGauge.set(numRecords)

    pushGateway.push(CollectorRegistry.defaultRegistry, jobName)
  }
}
```
If I can't connect it to the `SparkSession`, how do I do the same with a `StreamingQueryListener`? I don't understand how to get these metrics from there:

- Scheduling delay
- Processing time
Answer 1
Score: 0
Turns out it is impossible to register a `StreamingListener` for Structured Streaming, because no `StreamingContext` is available when you use Structured Streaming. Creating a `StreamingContext` yourself does not work in this case either: it produces a separate `StreamingContext`, and listeners registered with it never fire, since the actual streaming runs through Structured Streaming.

All of these metrics can, however, be computed with a `StreamingQueryListener`. Here is an example of the metrics it exposes:
```
23/06/20 17:59:15 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "4005e0e1-de55-4e9e-a01f-2bd3eec3d77e",
  "runId" : "9d847913-72f8-4410-8b5f-0c69b798bc3c",
  "name" : null,
  "timestamp" : "2023-06-20T14:59:15.184Z",
  "batchId" : 3,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 666.6666666666667,
  "processedRowsPerSecond" : 47.16981132075472,
  "durationMs" : {
    "addBatch" : 31,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 4,
    "setOffsetRange" : 0,
    "triggerExecution" : 212,
    "walCommit" : 112
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 2,
    "endOffset" : 3,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 666.6666666666667,
    "processedRowsPerSecond" : 47.16981132075472
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@3d192c1a"
  }
}
```
We can, for example, add up all the values from `durationMs` to get the total processing time.

Scheduling delay can also be computed from `durationMs` if you know the order of the operations mentioned in that map: add up all the operations that happen before `triggerExecution`, and you get your scheduling delay. A sketch of this arithmetic follows below.
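For example, here is a minimal sketch of that arithmetic. The helper names are mine, and the exact `durationMs` keys come from the sample output above; they can differ across Spark versions, so adjust the filter accordingly:

```scala
import scala.collection.JavaConverters._ // use scala.jdk.CollectionConverters._ on Scala 2.13

import org.apache.spark.sql.streaming.StreamingQueryProgress

// Total processing time: sum every phase reported in durationMs.
def totalProcessingMs(progress: StreamingQueryProgress): Long =
  progress.durationMs.asScala.values.map(_.longValue).sum

// Rough scheduling delay: sum the phases other than the trigger
// execution itself. Which keys count as "before triggerExecution"
// depends on your Spark version.
def schedulingDelayMs(progress: StreamingQueryProgress): Long =
  progress.durationMs.asScala
    .collect { case (key, ms) if key != "triggerExecution" => ms.longValue }
    .sum
```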
The number of input records is also there.

All of this information is available in the `StreamingQueryListener.QueryProgressEvent` passed to `StreamingQueryListener`'s `onQueryProgress` method, which is triggered after every micro-batch.
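Putting this together, here is a hedged sketch of how the Prometheus listener from the question could be ported to a `StreamingQueryListener`. The class name `MicroBatchProgressListener` is mine; the gauges mirror the original class, and the metric arithmetic follows the approach described above:

```scala
import scala.collection.JavaConverters._ // use scala.jdk.CollectionConverters._ on Scala 2.13

import io.prometheus.client.{CollectorRegistry, Gauge}
import io.prometheus.client.exporter.PushGateway
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class MicroBatchProgressListener(pushGateway: PushGateway, jobName: String) extends StreamingQueryListener {

  private val processingTimeGauge = Gauge.build()
    .name("processingTimeGauge")
    .help("Time it took to process this microbatch, in seconds")
    .register(CollectorRegistry.defaultRegistry)

  private val numRecordsGauge = Gauge.build()
    .name("numRecordsGauge")
    .help("Number of records received in this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  // Called after every completed micro-batch.
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    // Sum all phases in durationMs, as suggested above. (In recent Spark
    // versions "triggerExecution" alone already covers the whole batch.)
    val processingSeconds =
      progress.durationMs.asScala.values.map(_.longValue).sum / 1000.0
    processingTimeGauge.set(processingSeconds)
    numRecordsGauge.set(progress.numInputRows.toDouble)
    pushGateway.push(CollectorRegistry.defaultRegistry, jobName)
  }
}
```

Unlike a `StreamingListener`, this registers directly on the session, e.g. `sparkSession.streams.addListener(new MicroBatchProgressListener(pushGateway, "my-job"))`.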
UPDATE: I found a similar question and provided an answer for it here: https://stackoverflow.com/questions/58959668/structured-spark-streaming-metrics-retrieval/76536197#76536197