Don't understand the window output with event time and watermark
I have the following Flink code that prints the contents of each window, based on event time and watermarks. Events may be up to 4 seconds out of order, and the window length is 8 seconds.
```scala
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object EventTimeWindowTest {

  // "yyyy-MM-dd HH:mm:ss" -> epoch milliseconds (JVM default time zone)
  def to_milli(str: String) =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(str).getTime

  // epoch milliseconds -> "yyyy-MM-dd HH:mm:ss"
  def to_char(milli: Long) = {
    val date = if (milli <= 0) new Date(0) else new Date(milli)
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Enable event time (only the default from Flink 1.12 on);
    // this also sets the 200 ms periodic watermark interval
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val data = Seq(
      ("A", "2020-08-30 10:50:15"),
      ("A", "2020-08-30 10:50:11"),
      ("B", "2020-08-30 10:50:14"),
      ("B", "2020-08-30 10:50:09"),
      ("A", "2020-08-30 10:50:21"),
      ("A", "2020-08-30 10:50:10")
    )

    val stream = env.fromCollection(data).setParallelism(1)
      // Watermark = max timestamp seen so far minus 4 seconds
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, String)](Time.seconds(4)) {
        override def extractTimestamp(evt: (String, String)): Long = to_milli(evt._2)
      })
      .keyBy(_._1) // key by "A" / "B"
      .window(TumblingEventTimeWindows.of(Time.of(8, TimeUnit.SECONDS)))
      .apply(new WindowFunction[(String, String), String, String, TimeWindow] {
        // Emit "key(windowStart, windowEnd):ts1,ts2,..."
        override def apply(key: String, window: TimeWindow, windowData: Iterable[(String, String)], out: Collector[String]): Unit = {
          val start = to_char(window.getStart)
          val end = to_char(window.getEnd)
          val sb = new StringBuilder
          sb.append(s"$key($start, $end):")
          windowData.foreach {
            e => sb.append(e._2 + ",")
          }
          out.collect(sb.toString().substring(0, sb.length - 1))
        }
      })

    stream.print()
    env.execute("EventTimeWindowTest")
  }
}
```
With the above code, the output is:
```
A(2020-08-30 10:50:08, 2020-08-30 10:50:16):2020-08-30 10:50:15,2020-08-30 10:50:11,2020-08-30 10:50:10
B(2020-08-30 10:50:08, 2020-08-30 10:50:16):2020-08-30 10:50:14,2020-08-30 10:50:09
A(2020-08-30 10:50:16, 2020-08-30 10:50:24):2020-08-30 10:50:21
```
I don't understand why 2020-08-30 10:50:10 for A is printed out. By the time this event arrives, the previous event ("A", "2020-08-30 10:50:21") has already advanced the watermark to 2020-08-30 10:50:17, so Flink should already have closed the window (2020-08-30 10:50:08, 2020-08-30 10:50:16).
Could someone take a look? Thanks!
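The arithmetic in the question can be checked without Flink. The following is a minimal sketch under stated assumptions: the object name `WatermarkWalkthrough` is hypothetical, timestamps are parsed as UTC so the result does not depend on the JVM time zone, there is a single watermark assigner, and a watermark is emitted right after every event (the assumption the question implicitly makes). It uses the same watermark rule as `BoundedOutOfOrdernessTimestampExtractor` (maximum timestamp seen minus 4 seconds), epoch-aligned 8-second windows, and Flink's rule that a window is late once its last millisecond is at or below the current watermark.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Flink-free sketch (hypothetical object name). Assumes one watermark
// assigner instance and a watermark emitted right after every event.
object WatermarkWalkthrough {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val windowMs = 8000L          // 8-second tumbling windows
  val outOfOrdernessMs = 4000L  // Time.seconds(4) in the question

  // Parse as UTC so the result does not depend on the JVM's time zone.
  def toMilli(s: String): Long =
    LocalDateTime.parse(s, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  def toChar(ms: Long): String =
    LocalDateTime.ofEpochSecond(ms / 1000, 0, ZoneOffset.UTC).format(fmt)

  // Epoch-aligned window start (zero offset, non-negative timestamp).
  def windowStart(ts: Long): Long = ts - ts % windowMs

  // For each event: (key, time, watermark it arrives behind, is it late?)
  def trace(events: Seq[(String, String)]): Seq[(String, String, Option[Long], Boolean)] = {
    var maxTs: Option[Long] = None
    events.map { case (key, time) =>
      val ts = toMilli(time)
      val watermark = maxTs.map(_ - outOfOrdernessMs)
      // Late once the window's last millisecond is <= the current watermark.
      val late = watermark.exists(wm => windowStart(ts) + windowMs - 1 <= wm)
      maxTs = Some(maxTs.fold(ts)(m => math.max(m, ts)))
      (key, time, watermark, late)
    }
  }

  val data = Seq(
    ("A", "2020-08-30 10:50:15"),
    ("A", "2020-08-30 10:50:11"),
    ("B", "2020-08-30 10:50:14"),
    ("B", "2020-08-30 10:50:09"),
    ("A", "2020-08-30 10:50:21"),
    ("A", "2020-08-30 10:50:10")
  )

  def main(args: Array[String]): Unit =
    trace(data).foreach { case (key, time, wm, late) =>
      val wmText = wm.map(toChar).getOrElse("none yet")
      println(s"$key $time  watermark=$wmText  late=$late")
    }
}
```

Under these assumptions only the last event, `A 2020-08-30 10:50:10`, is flagged as late: it arrives behind a watermark of 2020-08-30 10:50:17, which is past its window's last millisecond (10:50:15.999). So the reasoning holds if watermarks follow every event; the open question is when watermarks are actually emitted.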
Although the event A:2020-08-30 10:50:10 could be late, it doesn't have to be. Watermarking is not necessarily deterministic.
In particular, the bounded-out-of-orderness watermark strategy uses a periodic watermark generator, which by default emits a new watermark every 200 ms of wall-clock time. Your job doesn't run that long, so the only watermark the windows see is the one at the end of the job, and it closes all of the windows at once.
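The effect of that single end-of-job watermark can be modelled in plain Scala. In this sketch (the object name `SingleFinalWatermark` is hypothetical, and timestamps are parsed as UTC), no watermark reaches the window operator until the bounded input is exhausted, so no event is ever late and each (key, window) pane fires once at the end with all of its buffered elements. Panes are printed in window-start, then key order; that matches the question's output but is not an ordering Flink guarantees.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Flink-free sketch (hypothetical object name): what the window operator
// ends up with when the only watermark arrives after the input is exhausted.
object SingleFinalWatermark {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val windowMs = 8000L

  def toMilli(s: String): Long =
    LocalDateTime.parse(s, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  def toChar(ms: Long): String =
    LocalDateTime.ofEpochSecond(ms / 1000, 0, ZoneOffset.UTC).format(fmt)

  // No intermediate watermark means nothing is dropped: every event stays
  // buffered in its (key, window) pane until the final watermark fires them all.
  def fire(events: Seq[(String, String)]): Seq[String] =
    events
      .groupBy { case (key, time) =>
        val ts = toMilli(time)
        (key, ts - ts % windowMs) // (key, window start)
      }
      .toSeq
      .sortBy { case ((key, start), _) => (start, key) }
      .map { case ((key, start), pane) =>
        s"$key(${toChar(start)}, ${toChar(start + windowMs)}):${pane.map(_._2).mkString(",")}"
      }

  val data = Seq(
    ("A", "2020-08-30 10:50:15"),
    ("A", "2020-08-30 10:50:11"),
    ("B", "2020-08-30 10:50:14"),
    ("B", "2020-08-30 10:50:09"),
    ("A", "2020-08-30 10:50:21"),
    ("A", "2020-08-30 10:50:10")
  )

  def main(args: Array[String]): Unit = fire(data).foreach(println)
}
```

`fire(data)` returns the same three lines as the question's output, including `2020-08-30 10:50:10` in A's first window, because the watermark of 2020-08-30 10:50:17 was never actually emitted while events were still flowing.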
If you were to use a punctuated watermark generator rather than a periodic one, then you could have watermarks being generated at every conceivable opportunity (every time there's a new maximum timestamp). This would then behave deterministically, but at the expense of increasing the overhead, because the streams would have more watermarks for the operators to process.
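A punctuated variant could look roughly like the sketch below. It assumes Flink 1.11 or later, where `WatermarkStrategy`, `WatermarkGenerator` and `WatermarkOutput` live in `org.apache.flink.api.common.eventtime`; the names `PunctuatedBoundedOutOfOrderness` and `PunctuatedStrategy.forTuples` are hypothetical. The generator applies the same rule as the question's extractor (maximum timestamp minus the bound) but emits from `onEvent` whenever a new maximum is seen, instead of waiting for the periodic timer.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}

// Hypothetical punctuated counterpart of BoundedOutOfOrdernessTimestampExtractor:
// a watermark is emitted from onEvent every time a new maximum timestamp is seen.
class PunctuatedBoundedOutOfOrderness[T](maxOutOfOrdernessMs: Long) extends WatermarkGenerator[T] {
  private var maxTimestamp = Long.MinValue

  override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit =
    if (eventTimestamp > maxTimestamp) {
      maxTimestamp = eventTimestamp
      // Same rule as the question's extractor: max timestamp minus the bound.
      output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrdernessMs))
    }

  // Nothing to do on the periodic timer; all watermarks come from onEvent.
  override def onPeriodicEmit(output: WatermarkOutput): Unit = ()
}

object PunctuatedStrategy {
  // Strategy for the question's (key, "yyyy-MM-dd HH:mm:ss") tuples.
  def forTuples(maxOutOfOrdernessMs: Long, toMilli: String => Long): WatermarkStrategy[(String, String)] =
    WatermarkStrategy
      .forGenerator[(String, String)](new WatermarkGeneratorSupplier[(String, String)] {
        override def createWatermarkGenerator(ctx: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, String)] =
          new PunctuatedBoundedOutOfOrderness[(String, String)](maxOutOfOrdernessMs)
      })
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, String)] {
        override def extractTimestamp(evt: (String, String), recordTimestamp: Long): Long = toMilli(evt._2)
      })
}
```

In the question's job, `.assignTimestampsAndWatermarks(PunctuatedStrategy.forTuples(4000L, to_milli))` would replace the `BoundedOutOfOrdernessTimestampExtractor`. One caveat this sketch assumes away: `.setParallelism(1)` in the question applies only to the source, so the assigner runs at the default parallelism, and each parallel instance tracks its own maximum. Adding `.setParallelism(1)` after the assigner keeps a single watermark sequence like the one the question reasons about.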