How to register a StreamingListener with SparkSession?

Question
I have a Structured Streaming application and I want to register a StreamingListener with it. How do I do that? I only seem to be able to register a StreamingQueryListener, via sparkSession.streams.addListener().

I know how to add a StreamingListener to a StreamingContext, but the problem is getting hold of that StreamingContext in the first place: if I create one from SparkSession.sparkContext, I don't think the StreamingListener will work, because I keep streaming through the SparkSession rather than through the StreamingContext I created.

Here's my StreamingListener class:

import io.prometheus.client.{CollectorRegistry, Gauge}
import io.prometheus.client.exporter.PushGateway
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class MicroBatchStatsListener(val pushGateway: PushGateway, jobName: String) extends StreamingListener {

  private val processingTimeGauge = Gauge.build()
    .name("processingTimeGauge")
    .help("Time it took to process this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  private val schedulingDelayGauge = Gauge.build()
    .name("schedulingDelayGauge")
    .help("Scheduling delay of this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  private val numRecordsGauge = Gauge.build()
    .name("numRecordsGauge")
    .help("Number of records received in this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Delays are reported in milliseconds; convert to seconds, defaulting to 0.
    val processingTime = batchCompleted.batchInfo.processingDelay.map(_ / 1000).getOrElse(0L)
    val schedulingDelay = batchCompleted.batchInfo.schedulingDelay.map(_ / 1000).getOrElse(0L)
    val numRecords = batchCompleted.batchInfo.numRecords

    processingTimeGauge.set(processingTime)
    schedulingDelayGauge.set(schedulingDelay)
    numRecordsGauge.set(numRecords)

    pushGateway.push(CollectorRegistry.defaultRegistry, jobName)
  }
}

If I can't connect it to a SparkSession, how do I do the same with a StreamingQueryListener? I don't understand how to get these metrics from there:

  1. Scheduling delay
  2. Processing time
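To make the mismatch concrete, here is a sketch of the DStreams-style registration (the master URL, gateway address, and job name are placeholders; MicroBatchStatsListener is the class above):

```scala
import io.prometheus.client.exporter.PushGateway
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative setup; the address and names are placeholders.
val spark = SparkSession.builder().appName("listener-demo").master("local[*]").getOrCreate()
val pushGateway = new PushGateway("localhost:9091")

// DStreams API: the listener is registered on a StreamingContext ...
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
ssc.addStreamingListener(new MicroBatchStatsListener(pushGateway, "my-job"))

// ... but queries started via spark.readStream / writeStream never run on this
// context, so the listener above never fires for them. The session itself only
// accepts StreamingQueryListener instances (spark.streams.addListener).
```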

Answer 1

Score: 0

It turns out it is impossible to register a StreamingListener for Structured Streaming, because no StreamingContext is available when you use Structured Streaming. Creating a StreamingContext from SparkSession.sparkContext does not help either: it is a separate context, and listeners registered on it never fire for queries that run through Structured Streaming.

But all of these metrics can actually be computed with a StreamingQueryListener. Here is an example of the progress data it exposes:

23/06/20 17:59:15 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "4005e0e1-de55-4e9e-a01f-2bd3eec3d77e",
  "runId" : "9d847913-72f8-4410-8b5f-0c69b798bc3c",
  "name" : null,
  "timestamp" : "2023-06-20T14:59:15.184Z",
  "batchId" : 3,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 666.6666666666667,
  "processedRowsPerSecond" : 47.16981132075472,
  "durationMs" : {
    "addBatch" : 31,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 4,
    "setOffsetRange" : 0,
    "triggerExecution" : 212,
    "walCommit" : 112
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 2,
    "endOffset" : 3,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 666.6666666666667,
    "processedRowsPerSecond" : 47.16981132075472
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@3d192c1a"
  }
}

We can get the total processing time straight from durationMs: triggerExecution measures the whole trigger, while the other entries break it down (so summing every entry would double-count).

Scheduling delay can also be estimated from durationMs if you know the order of the operations in that map: add up the entries for the steps that run before the batch itself is processed (addBatch) and you get an approximation of the scheduling delay.

The number of input records is also there, as numInputRows.
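Under the assumption that the durationMs keys shown in the log above are present, the two metrics the question asks about can be derived like this (a sketch; the key names vary across Spark versions):

```scala
// Sketch: deriving the two metrics from a durationMs map like the one in the
// progress log above. The key names are taken from that log.
object DurationMetrics {
  // triggerExecution measures the whole trigger, so it already is the
  // total processing time on its own.
  def processingTimeMs(durations: Map[String, Long]): Long =
    durations.getOrElse("triggerExecution", 0L)

  // Everything that ran before the batch was actually processed (addBatch)
  // approximates the scheduling delay.
  def schedulingDelayMs(durations: Map[String, Long]): Long =
    Seq("setOffsetRange", "getEndOffset", "getBatch", "queryPlanning", "walCommit")
      .map(durations.getOrElse(_, 0L))
      .sum
}

// Using the values from the example log:
val d = Map(
  "addBatch" -> 31L, "getBatch" -> 0L, "getEndOffset" -> 0L,
  "queryPlanning" -> 4L, "setOffsetRange" -> 0L,
  "triggerExecution" -> 212L, "walCommit" -> 112L
)
// DurationMetrics.processingTimeMs(d) == 212
// DurationMetrics.schedulingDelayMs(d) == 116
```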

All of this information is available in the StreamingQueryListener.QueryProgressEvent passed to StreamingQueryListener's onQueryProgress method, which is invoked after every microbatch.
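A minimal Structured Streaming counterpart of the DStreams listener above could look like this (a sketch: the Prometheus plumbing is left out for brevity, and the durationMs key names are assumptions taken from the example log):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import scala.collection.JavaConverters._

class MicroBatchProgressListener extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  // Invoked after every microbatch.
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    val durations = p.durationMs.asScala.map { case (k, v) => k -> v.longValue }.toMap

    // triggerExecution is the total wall-clock time of the microbatch.
    val processingTimeMs = durations.getOrElse("triggerExecution", 0L)

    // Steps that ran before the batch itself; key names are Spark-version-dependent.
    val schedulingDelayMs =
      Seq("setOffsetRange", "getEndOffset", "getBatch", "queryPlanning", "walCommit")
        .map(durations.getOrElse(_, 0L)).sum

    println(s"batch=${p.batchId} processingMs=$processingTimeMs " +
      s"schedulingMs=$schedulingDelayMs records=${p.numInputRows}")
  }
}

// Registration happens on the session, not on a StreamingContext:
// spark.streams.addListener(new MicroBatchProgressListener)
```

From here, pushing the three values to a PushGateway works exactly as in the DStreams listener shown in the question.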

UPDATE: I found a similar question and provided an answer for it here: https://stackoverflow.com/questions/58959668/structured-spark-streaming-metrics-retrieval/76536197#76536197

huangapple
  • Posted on 2023-06-19 23:34:08
  • Original link: https://go.coder-hub.com/76508118.html