Don't understand the window output with event time and watermarks
Question
I have the following Flink code that prints the contents of each window, based on event time and watermarks. Events may be up to 4 seconds out of order, and the window length is 8 seconds.
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object EventTimeWindowTest {
  // Parse "yyyy-MM-dd HH:mm:ss" (JVM default time zone) into epoch milliseconds
  def to_milli(str: String) =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(str).getTime

  // Format epoch milliseconds back to "yyyy-MM-dd HH:mm:ss"
  def to_char(milli: Long) = {
    val date = if (milli <= 0) new Date(0) else new Date(milli)
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val data = Seq(
      ("A", "2020-08-30 10:50:15"),
      ("A", "2020-08-30 10:50:11"),
      ("B", "2020-08-30 10:50:14"),
      ("B", "2020-08-30 10:50:09"),
      ("A", "2020-08-30 10:50:21"),
      ("A", "2020-08-30 10:50:10")
    )

    // The extractor's watermark is the highest timestamp seen so far minus 4 s
    val stream = env.fromCollection(data).setParallelism(1)
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[(String, String)](Time.seconds(4)) {
          override def extractTimestamp(evt: (String, String)): Long = to_milli(evt._2)
        })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.of(8, TimeUnit.SECONDS))) // 8 s tumbling windows
      .apply(new WindowFunction[(String, String), String, String, TimeWindow] {
        override def apply(key: String, window: TimeWindow,
                           windowData: Iterable[(String, String)],
                           out: Collector[String]): Unit = {
          val start = to_char(window.getStart)
          val end = to_char(window.getEnd)
          val sb = new StringBuilder
          sb.append(s"$key($start, $end):")
          windowData.foreach { e => sb.append(e._2 + ",") }
          out.collect(sb.toString().substring(0, sb.length - 1)) // drop trailing comma
        }
      })

    stream.print().setParallelism(1)
    env.execute()
  }
}
With the above code, the output is:
A(2020-08-30 10:50:08, 2020-08-30 10:50:16):2020-08-30 10:50:15,2020-08-30 10:50:11,2020-08-30 10:50:10
B(2020-08-30 10:50:08, 2020-08-30 10:50:16):2020-08-30 10:50:14,2020-08-30 10:50:09
A(2020-08-30 10:50:16, 2020-08-30 10:50:24):2020-08-30 10:50:21
I don't understand why 2020-08-30 10:50:10 for A is printed out. Before this event, the previous event ("A", "2020-08-30 10:50:21") advances the watermark to 2020-08-30 10:50:17 (10:50:21 minus the 4-second bound), hence Flink should already have closed the window (2020-08-30 10:50:08, 2020-08-30 10:50:16) and treated the 10:50:10 event as late.
Could someone take a look? Thanks!
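The arithmetic in the question can be checked with a small plain-Scala sketch (no Flink dependency). It assumes the timestamps are UTC (the original code uses the JVM default time zone), and the object name `WindowArithmetic` is hypothetical. The window-start formula mirrors a zero-offset tumbling assigner for non-negative timestamps, and the firing condition reflects that an event-time window `[start, end)` fires once the watermark reaches its max timestamp, `end - 1`.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper object; timestamps are interpreted as UTC (assumption).
object WindowArithmetic {
  val Fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val WindowSizeMs: Long = 8000L     // 8-second tumbling windows
  val OutOfOrdernessMs: Long = 4000L // 4-second out-of-orderness bound

  def toMillis(s: String): Long =
    LocalDateTime.parse(s, Fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  def toChar(ms: Long): String =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC).format(Fmt)

  // Start and (exclusive) end of the tumbling window containing ts (zero offset, ts >= 0)
  def windowStart(ts: Long): Long = ts - ts % WindowSizeMs
  def windowEnd(ts: Long): Long = windowStart(ts) + WindowSizeMs

  // Watermark derived from the highest timestamp seen so far
  def watermark(maxTs: Long): Long = maxTs - OutOfOrdernessMs

  // A window [start, end) fires once the watermark reaches end - 1
  def fires(windowEnd: Long, wm: Long): Boolean = wm >= windowEnd - 1
}
```

For example, `windowStart(toMillis("2020-08-30 10:50:10"))` formats back to `2020-08-30 10:50:08`, and `watermark(toMillis("2020-08-30 10:50:21"))` to `2020-08-30 10:50:17`, which satisfies `fires` for the window ending at 10:50:16. The arithmetic itself is therefore sound; it assumes a watermark is emitted right after the 10:50:21 event.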
Answer 1
Score: 1
Although the event A:2020-08-30 10:50:10 could be late, it doesn't have to be. Watermarking is not necessarily deterministic.
In particular, the bounded-out-of-orderness watermark strategy uses a periodic watermark generator, which by default emits a new watermark every 200 ms (of wall-clock time). Your job doesn't run that long, so the only watermark that matters is the final one (Long.MAX_VALUE) emitted when the bounded input is exhausted at the end of the job, and it closes all of the windows at once.
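To illustrate why the result depends on wall-clock timing, here is a toy model in plain Scala (not Flink's implementation) of a keyed 8 s tumbling event-time window fed by a periodic generator that ticks every `intervalMs` of wall-clock time. The object name `PeriodicWatermarkSim` and the per-event arrival times are hypothetical; timestamps are epoch milliseconds and allowed lateness is assumed to be zero.

```scala
import scala.collection.mutable

// Toy model, not Flink code: periodic bounded-out-of-orderness watermarks
// feeding keyed tumbling event-time windows with zero allowed lateness.
object PeriodicWatermarkSim {
  val WindowSizeMs: Long = 8000L
  val OutOfOrdernessMs: Long = 4000L

  // events: (key, eventTimeMs, wallClockArrivalMs), processed in list order.
  // Returns the fired windows keyed by (key, windowStart), plus the dropped late events.
  def run(events: Seq[(String, Long, Long)], intervalMs: Long)
      : (Map[(String, Long), Seq[Long]], Seq[(String, Long)]) = {
    val open = mutable.LinkedHashMap.empty[(String, Long), Vector[Long]]
    val fired = mutable.LinkedHashMap.empty[(String, Long), Seq[Long]]
    val dropped = mutable.ArrayBuffer.empty[(String, Long)]
    var maxTs = Long.MinValue + OutOfOrdernessMs // avoids underflow, like the extractor
    var wm = Long.MinValue
    var nextTick = intervalMs

    // Advance the watermark and fire every window whose max timestamp (end - 1) it reached
    def advance(newWm: Long): Unit = if (newWm > wm) {
      wm = newWm
      val ready = open.keys.filter { case (_, start) => start + WindowSizeMs - 1 <= wm }.toList
      ready.foreach(k => fired(k) = open.remove(k).get)
    }

    for ((key, ts, arrival) <- events) {
      // Periodic ticks that happened (in wall-clock time) before this event arrived
      while (nextTick <= arrival) { advance(maxTs - OutOfOrdernessMs); nextTick += intervalMs }
      val start = ts - ts % WindowSizeMs
      if (start + WindowSizeMs - 1 <= wm) dropped += ((key, ts)) // window already fired: late
      else open((key, start)) = open.getOrElse((key, start), Vector.empty[Long]) :+ ts
      maxTs = math.max(maxTs, ts)
    }
    advance(Long.MaxValue) // end of bounded input: final watermark closes every window
    (fired.toMap, dropped.toList)
  }
}
```

With the question's six events all arriving at wall-clock time 0 and `intervalMs = 200`, no tick happens before the end of input, so 10:50:10 lands in A's first window, matching the printed output. Spacing the same events 250 ms apart lets the 10:50:17 watermark fire before the last event, and that event is dropped instead.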
If you were to use a punctuated watermark generator rather than a periodic one, then you could have watermarks being generated at every conceivable opportunity (every time there's a new maximum timestamp). This would then behave deterministically, but at the expense of increasing the overhead, because the streams would have more watermarks for the operators to process.
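A punctuated generator is consulted after every element. In Flink's older API this corresponds to `AssignerWithPunctuatedWatermarks.checkAndGetNextWatermark`; with the newer `WatermarkGenerator` interface, a generator can call `output.emitWatermark` from `onEvent`. The sketch below does not use Flink: `PunctuatedWatermarkSim`, `PunctuatedGenerator`, and `lateFlags` are hypothetical names, timestamps are epoch milliseconds, and allowed lateness is taken as zero. Keys are omitted because, at parallelism 1, the watermark is shared by all keys and lateness depends only on the window end.

```scala
// Toy model, not Flink code: a watermark is emitted after every element, so the
// watermark sequence depends only on the order of events, not on wall-clock time.
object PunctuatedWatermarkSim {
  val WindowSizeMs: Long = 8000L
  val OutOfOrdernessMs: Long = 4000L

  final class PunctuatedGenerator(boundMs: Long) {
    private var maxTs = Long.MinValue + boundMs // avoids underflow
    // Called once per element; returns the watermark to emit right after it
    def onEvent(ts: Long): Long = { maxTs = math.max(maxTs, ts); maxTs - boundMs }
  }

  // For each event, true if its tumbling window had already fired when it arrived
  def lateFlags(timestamps: Seq[Long]): Seq[Boolean] = {
    val gen = new PunctuatedGenerator(OutOfOrdernessMs)
    var wm = Long.MinValue
    timestamps.map { ts =>
      val windowEnd = ts - ts % WindowSizeMs + WindowSizeMs
      val late = windowEnd - 1 <= wm        // window [start, end) already fired
      wm = math.max(wm, gen.onEvent(ts))    // watermark emitted after this element
      late
    }
  }
}
```

Fed the question's timestamps in order (15, 11, 14, 9, 21, 10 seconds past 10:50), only the last event is flagged as late, on every run, because the 10:50:17 watermark always follows the 10:50:21 event directly. The cost, as the answer notes, is one watermark per element for downstream operators to process.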