Flink coGroup outer join fails with high backpressure
Question
I have two streams in Flink: `stream1` receives about 70,000 records per second, and `stream2` may or may not have data.
    // Ingest the High Frequency Analog stream
    SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
        environment
            .addSource(createHFAConsumer())
            .name("hfa source");

    SingleOutputStreamOperator<EVWindow> stream2 =
        environment
            .addSource(createHFDConsumer())
            .name("hfd source");

    DataStream<Message> pStream =
        stream1
            .coGroup(stream2)
            .where(obj -> obj.getid())
            .equalTo(ev -> ev.getid())
            .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
            .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
            .apply(new CalculateCoGroupFunction());
This works perfectly fine when both streams have data, but when `stream2` has no data the job fails with very high backpressure. CPU utilization also spikes by 200%.
How do I handle an outer join in such a scenario?
Answer 1
Score: 1
I believe the problem is that the lack of watermarks from the idle stream is holding back the overall watermark. Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks. This can then lead to problems like the one you are experiencing.
You have a couple of options:
- Set the watermark for `stream2` to be `Watermark.MAX_WATERMARK`, thereby giving `stream1` complete control of watermarking.
- Somehow detect that `stream2` is idle, and artificially advance the watermark despite the lack of events. Here is an example.
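The example linked in the original answer is not reproduced here. As a separate illustration of the "minimum of the incoming watermarks" rule and of both options, the following is a toy model in plain Java. It is not Flink code: the class `WatermarkCombiner`, its `Input` type and the `idle` flag are names invented for this sketch, and returning `Long.MIN_VALUE` when every input is idle is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of how a multi-input operator combines watermarks. */
class WatermarkCombiner {

    /** One input channel: its latest watermark and whether it is flagged idle. */
    static final class Input {
        long watermark = Long.MIN_VALUE; // no watermark received yet
        boolean idle = false;
    }

    private final List<Input> inputs = new ArrayList<>();

    Input addInput() {
        Input in = new Input();
        inputs.add(in);
        return in;
    }

    /** Minimum watermark over all inputs that are not idle. */
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Input in : inputs) {
            if (in.idle) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, in.watermark);
        }
        // Simplification of this sketch: with no active input, report "no progress".
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        WatermarkCombiner combiner = new WatermarkCombiner();
        Input stream1 = combiner.addInput();
        Input stream2 = combiner.addInput();

        // stream1 has progressed to the end of a one-minute window; stream2 is silent.
        stream1.watermark = 60_000L;
        System.out.println("stream2 silent:        " + combiner.combined());

        // Option 2: stream2 is detected as idle, so it no longer counts.
        stream2.idle = true;
        System.out.println("stream2 marked idle:   " + combiner.combined());

        // Option 1: stream2 always reports the maximum watermark.
        stream2.idle = false;
        stream2.watermark = Long.MAX_VALUE;
        System.out.println("stream2 MAX watermark: " + combiner.combined());
    }
}
```

In the first case the combined watermark never reaches the window end, so the window cannot fire and the records from `stream1` keep accumulating. In recent Flink versions (1.11 and later), `WatermarkStrategy.withIdleness(Duration)` is the built-in way to flag an input as idle; whether it fits here depends on the Flink version and source connector in use.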
Answer 2
Score: 1
Thanks to David Anderson for the pointers.
Root cause:
The main issue arose when I tried to create a tumbling window around my stream.
As per the Flink documentation:
> In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness.
Since there was no incoming data for `stream2`, the window never materialized. As David pointed out:
> Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks.
This means Flink was buffering data from `stream1` while waiting for `stream2`, which eventually resulted in high backpressure and finally an OOM.
Solution:
I created an external script that sends dummy heartbeat messages to the Kafka stream `stream2` at the desired interval, and added logic in my application to ignore these messages during computation.
This forced both `stream2` and `stream1` to advance their watermarks, so the window could complete and be removed.
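The "ignore these messages" logic can be sketched in plain Java as follows. This is only an illustration: the `Event` class and its `heartbeat` flag are hypothetical (the original answer does not show how heartbeats are marked), and in a real job the same two ideas would live in the timestamp assigner and the window function.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Sketch of heartbeat handling: heartbeats move time forward but are not computed on. */
class HeartbeatFilter {

    /** Hypothetical record shape for stream2; the heartbeat flag is an assumption. */
    static final class Event {
        final String id;
        final long timestamp;
        final boolean heartbeat;

        Event(String id, long timestamp, boolean heartbeat) {
            this.id = id;
            this.timestamp = timestamp;
            this.heartbeat = heartbeat;
        }
    }

    /** Latest timestamp seen, heartbeats included: this is what lets event time advance. */
    static long maxTimestamp(List<Event> events) {
        return events.stream()
                .mapToLong(e -> e.timestamp)
                .max()
                .orElse(Long.MIN_VALUE);
    }

    /** Events that take part in the computation: heartbeats are dropped. */
    static List<Event> withoutHeartbeats(List<Event> events) {
        return events.stream()
                .filter(e -> !e.heartbeat)
                .collect(Collectors.toList());
    }
}
```

The point of keeping the two methods separate is that a heartbeat must still contribute its timestamp (otherwise it would not advance the watermark), while the business logic only ever sees real events.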
Answer 3
Score: 1
As was discussed before:
> Whenever multiple streams are connected, the resulting watermark is
> the minimum of the incoming watermarks
and
> which means flink was buffering data from stream1 while waiting for
> stream2 and would eventually result in High Backpressure and finally a
> OOM.
This applies to the `coGroup()` method of the `DataStream<T>` class, which returns `CoGroupedStreams<T, T2>`.
To avoid this behavior, we can use the `union(DataStream<T>... streams)` method, which returns a plain `DataStream<T>`. Timestamps and watermarks can then be assigned once, on the merged stream, so they advance as in a regular stream.
The only problem left to solve is that both streams need a common schema (class). We can use a wrapper class with two fields:
    public class Aggregator {
        private FlatHighFrequencyAnalog flatHighFrequencyAnalog;
        private EVWindow evWindow;

        public Aggregator(FlatHighFrequencyAnalog flatHighFrequencyAnalog) {
            this.flatHighFrequencyAnalog = flatHighFrequencyAnalog;
        }

        public Aggregator(EVWindow evWindow) {
            this.evWindow = evWindow;
        }

        public FlatHighFrequencyAnalog getFlatHighFrequencyAnalog() {
            return flatHighFrequencyAnalog;
        }

        public EVWindow getEVWindow() {
            return evWindow;
        }
    }
Also, a more generic way is to use the `Either<L, R>` class from `org.apache.flink.types`.
Let's summarize what we'll have in the end:
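    // EVWindow is the Left type and FlatHighFrequencyAnalog the Right type,
    // so the HFA source must be wrapped with Right and the HFD source with Left.
    // returns(...) supplies the generic type that Java erases from the lambda.
    SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream1 =
        environment
            .addSource(createHFAConsumer())
            .map(hfa -> Either.<EVWindow, FlatHighFrequencyAnalog>Right(hfa))
            .returns(new TypeHint<Either<EVWindow, FlatHighFrequencyAnalog>>() {});

    SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream2 =
        environment
            .addSource(createHFDConsumer())
            .map(hfd -> Either.<EVWindow, FlatHighFrequencyAnalog>Left(hfd))
            .returns(new TypeHint<Either<EVWindow, FlatHighFrequencyAnalog>>() {});

    DataStream<Message> pStream =
        stream1
            .union(stream2)
            // Watermarks are assigned once, after the union, on the merged stream.
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Either<EVWindow, FlatHighFrequencyAnalog>>forBoundedOutOfOrderness(
                        Duration.ofSeconds(MAX_OUT_OF_ORDERNESS))
                    .withTimestampAssigner((input, timestamp) ->
                        input.isLeft() ? input.left().getTimeStamp() : input.right().getTimeStamp()))
            .keyBy(value -> value.isLeft() ? value.left().getId() : value.right().getId())
            .window(TumblingEventTimeWindows.of(Time.minutes(MINUTES)))
            // Placeholder: pass a concrete ProcessWindowFunction subclass here.
            .process(new ProcessWindowFunction());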
To get the two collections inside the process function:
    // Streams is Guava's com.google.common.collect.Streams;
    // elements is the Iterable passed to the window function.
    List<EVWindow> evWindows =
        Streams.stream(elements)
            .filter(Either::isLeft)
            .map(Either::left)
            .collect(Collectors.toList());

    List<FlatHighFrequencyAnalog> highFrequencyAnalogs =
        Streams.stream(elements)
            .filter(Either::isRight)
            .map(Either::right)
            .collect(Collectors.toList());
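Since the original question asks for outer-join behavior, the two per-window lists still have to be combined so that records are emitted even when one side is empty. The following is a minimal plain-Java sketch of that step under stated assumptions: `WindowOuterJoin`, `leftOuterJoin` and the `combine` callback are hypothetical names, and passing `null` for a missing match is just one possible convention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Sketch of left-outer-join semantics over the two lists collected for one window. */
class WindowOuterJoin {

    /**
     * Pairs every primary element with every optional element.
     * If the optional side is empty, each primary element is still emitted once,
     * combined with null.
     */
    static <P, O, R> List<R> leftOuterJoin(
            List<P> primary, List<O> optional, BiFunction<P, O, R> combine) {
        List<R> out = new ArrayList<>();
        for (P p : primary) {
            if (optional.isEmpty()) {
                out.add(combine.apply(p, null)); // no match: emit with a null partner
            } else {
                for (O o : optional) {
                    out.add(combine.apply(p, o));
                }
            }
        }
        return out;
    }
}
```

In the setup above, `highFrequencyAnalogs` would play the role of the primary side and `evWindows` the optional side, so a window with no `EVWindow` records still produces output for every analog record.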