Don't understand the window output with event time and watermarks

Question

I have the following Flink code that prints the contents of windows based on event time and watermarks. Events are allowed to be up to 4 seconds out of order, and the window length is 8 seconds.

import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object EventTimeWindowTest {
  // Parses "yyyy-MM-dd HH:mm:ss" (local time zone) into epoch milliseconds.
  def to_milli(str: String) =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(str).getTime

  // Formats epoch milliseconds as "yyyy-MM-dd HH:mm:ss" (non-positive values become the epoch).
  def to_char(milli: Long) = {
    val date = if (milli <= 0) new Date(0) else new Date(milli)
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val data = Seq(
      ("A", "2020-08-30 10:50:15"),
      ("A", "2020-08-30 10:50:11"),
      ("B", "2020-08-30 10:50:14"),
      ("B", "2020-08-30 10:50:09"),
      ("A", "2020-08-30 10:50:21"),
      ("A", "2020-08-30 10:50:10")
    )

    val stream = env.fromCollection(data).setParallelism(1)
      // Watermark = largest timestamp seen so far minus 4 seconds.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, String)](Time.seconds(4)) {
        override def extractTimestamp(evt: (String, String)): Long = to_milli(evt._2)
      })
      .keyBy(_._1)
      // 8-second tumbling event-time windows, e.g. [10:50:08, 10:50:16).
      .window(TumblingEventTimeWindows.of(Time.of(8, TimeUnit.SECONDS)))
      // Emits "key(windowStart, windowEnd):t1,t2,..." for every window that fires.
      .apply(new WindowFunction[(String, String), String, String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, windowData: Iterable[(String, String)], out: Collector[String]): Unit = {
          val start = to_char(window.getStart)
          val end = to_char(window.getEnd)
          val sb = new StringBuilder
          sb.append(s"$key($start, $end):")
          windowData.foreach {
            e => sb.append(e._2 + ",")
          }
          // Drop the trailing comma.
          out.collect(sb.toString().substring(0, sb.length - 1))
        }
      })
    stream.print().setParallelism(1)

    env.execute()
  }
}

With the above code, the output is:

A(2020-08-30 10:50:08, 2020-08-30 10:50:16):2020-08-30 10:50:15,2020-08-30 10:50:11,2020-08-30 10:50:10
B(2020-08-30 10:50:08, 2020-08-30 10:50:16):2020-08-30 10:50:14,2020-08-30 10:50:09
A(2020-08-30 10:50:16, 2020-08-30 10:50:24):2020-08-30 10:50:21

I don't understand why 2020-08-30 10:50:10 for A is printed out.

Before this event arrives, the previous event ("A", "2020-08-30 10:50:21") advances the watermark to 2020-08-30 10:50:17 (10:50:21 minus the 4-second bound), so Flink should already have closed the window (2020-08-30 10:50:08, 2020-08-30 10:50:16).

Could someone take a look? Thanks!

Answer 1

Score: 1

Although the event A:2020-08-30 10:50:10 could be late, it doesn't have to be. Watermarking is not necessarily deterministic.
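To see exactly when the event would be late, here is a minimal Flink-free sketch in Scala of the question's assumption: a watermark of (largest timestamp so far minus 4 seconds) is emitted right after every event, and allowed lateness is zero. The names `PerEventWatermarkModel` and `lateEvents` are hypothetical, and timestamps are parsed as UTC only so that window boundaries do not depend on the machine's time zone. Under this assumption the 10:50:21 event moves the watermark to 10:50:17, past the end of the [10:50:08, 10:50:16) window, so the A event at 10:50:10 is the only late one.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical model (not Flink code): one watermark right after every event.
object PerEventWatermarkModel {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Parsed as UTC only to keep the model independent of the local time zone.
  def toMillis(s: String): Long =
    LocalDateTime.parse(s, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  val windowSize = 8000L     // 8-second tumbling windows
  val outOfOrderness = 4000L // 4 seconds of allowed out-of-orderness

  val data = Seq(
    ("A", "2020-08-30 10:50:15"),
    ("A", "2020-08-30 10:50:11"),
    ("B", "2020-08-30 10:50:14"),
    ("B", "2020-08-30 10:50:09"),
    ("A", "2020-08-30 10:50:21"),
    ("A", "2020-08-30 10:50:10")
  )

  /** Events whose window had already been passed by the watermark when they arrived. */
  def lateEvents(events: Seq[(String, String)]): Seq[(String, String)] = {
    var maxTs = Long.MinValue
    var watermark = Long.MinValue // no watermark before the first event
    val late = Seq.newBuilder[(String, String)]
    for (e <- events) {
      val ts = toMillis(e._2)
      val windowStart = ts - Math.floorMod(ts, windowSize)
      val windowMaxTs = windowStart + windowSize - 1 // last millisecond of the window
      // The window already fired once the watermark reached its last millisecond,
      // so with zero allowed lateness this event is dropped.
      if (windowMaxTs <= watermark) late += e
      // Watermark emitted after the event: max timestamp seen so far minus the bound.
      maxTs = math.max(maxTs, ts)
      watermark = maxTs - outOfOrderness
    }
    late.result()
  }

  def main(args: Array[String]): Unit =
    lateEvents(data).foreach(println) // prints (A,2020-08-30 10:50:10)
}
```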

In particular, the bounded-out-of-orderness watermark strategy uses a periodic watermark generator, which by default emits a new watermark every 200 ms (of wall-clock time). Your job doesn't run that long, so there's just one watermark, at the end of the job, and it closes all of the windows at once.
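A similarly hypothetical sketch of this scenario, assuming the whole bounded input is read before the first 200 ms tick: no watermark reaches the window operator until the source finishes, so no window has fired, nothing is late, and every event is buffered in its window. The final watermark at end of input then fires all three windows together. `EndOfInputWatermarkModel` and `firedWindows` are illustrative names, and UTC parsing is again only for time-zone independence; the grouping it returns matches the output shown in the question.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical model (not Flink code): the periodic generator never ticks during
// this short run, so the only watermark arrives when the bounded input ends.
object EndOfInputWatermarkModel {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Parsed as UTC only to keep the model independent of the local time zone.
  def toMillis(s: String): Long =
    LocalDateTime.parse(s, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  val windowSize = 8000L // 8-second tumbling windows

  val data = Seq(
    ("A", "2020-08-30 10:50:15"),
    ("A", "2020-08-30 10:50:11"),
    ("B", "2020-08-30 10:50:14"),
    ("B", "2020-08-30 10:50:09"),
    ("A", "2020-08-30 10:50:21"),
    ("A", "2020-08-30 10:50:10")
  )

  /** (key, window start) -> event times in arrival order, for every window that fires. */
  def firedWindows(events: Seq[(String, String)]): Map[(String, Long), Seq[String]] = {
    // With no watermark before end of input, no window has fired yet, so no event
    // is late and each one is simply buffered in its (key, window) bucket.
    val buffered = events.groupBy { case (key, time) =>
      val ts = toMillis(time)
      (key, ts - Math.floorMod(ts, windowSize))
    }
    // The end-of-input watermark is past every window's end, so every bucket fires.
    buffered.map { case ((key, start), es) => (key, start) -> es.map(_._2) }
  }

  def main(args: Array[String]): Unit =
    firedWindows(data).foreach(println)
}
```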

If you were to use a punctuated watermark generator rather than a periodic one, then you could have watermarks being generated at every conceivable opportunity (every time there's a new maximum timestamp). This would then behave deterministically, but at the expense of increasing the overhead, because the streams would have more watermarks for the operators to process.
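One way such a punctuated generator might look with Flink's `WatermarkGenerator` interface (Flink 1.11 or later, Scala 2.12) is sketched below. It is an illustration under stated assumptions, not code from the answer: `PunctuatedBoundedOutOfOrderness` and `PunctuatedWatermarks.forEvents` are hypothetical names, and the `- 1` follows the convention of Flink's built-in bounded-out-of-orderness generator. The generator emits a watermark from `onEvent` whenever a new maximum timestamp is seen and does nothing in `onPeriodicEmit`.

```scala
import java.text.SimpleDateFormat

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}

// Hypothetical punctuated generator: emits a watermark from onEvent every time a
// new maximum timestamp arrives, instead of waiting for the periodic timer.
class PunctuatedBoundedOutOfOrderness[T](outOfOrdernessMillis: Long) extends WatermarkGenerator[T] {
  private var maxTimestamp = Long.MinValue

  override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit =
    if (eventTimestamp > maxTimestamp) {
      maxTimestamp = eventTimestamp
      // Same bound as the built-in bounded-out-of-orderness generator:
      // max timestamp - allowed delay - 1 ms.
      output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1))
    }

  // Nothing to do on the periodic timer: every watermark comes from onEvent.
  override def onPeriodicEmit(output: WatermarkOutput): Unit = ()
}

object PunctuatedWatermarks {
  // Hypothetical helper for the question's (key, "yyyy-MM-dd HH:mm:ss") events.
  def forEvents(outOfOrdernessMillis: Long): WatermarkStrategy[(String, String)] =
    WatermarkStrategy
      .forGenerator[(String, String)](new WatermarkGeneratorSupplier[(String, String)] {
        override def createWatermarkGenerator(
            ctx: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, String)] =
          new PunctuatedBoundedOutOfOrderness[(String, String)](outOfOrdernessMillis)
      })
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, String)] {
        // Same parsing as the question's to_milli (local time zone).
        override def extractTimestamp(e: (String, String), recordTimestamp: Long): Long =
          new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(e._2).getTime
      })
}
```

In the question's job this would take the place of the `BoundedOutOfOrdernessTimestampExtractor`, for example `env.fromCollection(data).assignTimestampsAndWatermarks(PunctuatedWatermarks.forEvents(4000L))`. With watermarks emitted this eagerly, the A event at 10:50:10 should arrive after the watermark has passed the end of the (10:50:08, 10:50:16) window and be dropped as late on every run, at the cost of one extra watermark per new maximum timestamp.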

huangapple
  • Published on 2023-02-24 16:33:21
  • When reposting, please keep this link: https://go.coder-hub.com/75554239.html