How to implement a Flink CountTriggerWithTimeout with dynamic timeouts for each incoming element
Question
I'm quite new to Flink stream processing. Here is my requirement:
Alert the user when 2 or more elements were received in the last 20 seconds. If fewer than 2 elements were received in 20 seconds, don't alert; just restart the count and the timer.
The count and interval vary for each element.
Here's my code:
dataStream
    .keyBy("id")
    .window(EventTimeSessionWindows.withDynamicGap((event) -> event.getThresholdInterval()))
    .trigger(new CountTriggerWithTimeout<TimeWindow>())
Trigger code:
public class CountTriggerWithTimeout<W extends TimeWindow> extends Trigger<SystemEvent, W> {
    private ReducingStateDescriptor<Long> countState =
        new ReducingStateDescriptor<Long>("count", new Sum(), LongSerializer.INSTANCE);
    private ReducingStateDescriptor<Long> processedState =
        new ReducingStateDescriptor<Long>("processed", new Sum(), LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(SystemEvent element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countState);
        ReducingState<Long> processed = ctx.getPartitionedState(processedState);
        count.add(1L);
        processed.add(0L);
        if (count.get() >= element.getThresholdCount() && processed.get() == 0) {
            processed.add(1L);
            return TriggerResult.FIRE_AND_PURGE;
        }
        if (timestamp >= window.getEnd()) {
            return TriggerResult.PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countState).clear();
        ctx.getPartitionedState(processedState).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    class Sum implements ReduceFunction<java.lang.Long> {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
Earlier, when I was using
dataStream
    .timeWindow(Time.seconds(1))
    .trigger(new CountTriggerWithTimeout<TimeWindow>())
everything was working perfectly fine. Since there is a requirement to read the window time from the element, I started using EventTimeSessionWindows and added the canMerge() function to the trigger. Since then, nothing works: clear() is never invoked, nor are onProcessingTime() and onEventTime(). I also see that timestamp is always set to the same value, irrespective of when the element was received.
My requirement is to "fire & purge" when count >= threshold within event.getThresholdInterval(). If count < threshold within event.getThresholdInterval(), then purge, i.e. invoke clear() to clear the count and state and restart. Is there a way to achieve this with timeWindow instead of EventTimeSessionWindows?
Please help me fix this issue.
Thanks...
Answer 1
Score: 1
Why don't you use a simple tumbling window of 20 seconds and count the elements in it:
source
    .keyBy("id")
    .timeWindow(Time.seconds(20))
    .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
        @Override
        public void process(Tuple key, ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>.Context ctx,
                Iterable<Tuple2<String, Integer>> in, Collector<String> out) throws Exception {
            // Lists is Guava's com.google.common.collect.Lists; it copies the window contents to count them.
            if (Lists.newArrayList(in).size() >= 2) {
                out.collect("Two or more elements between "
                    + Instant.ofEpochMilli(ctx.window().getStart())
                    + " and " + Instant.ofEpochMilli(ctx.window().getEnd()));
            }
        }
    })
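Note that a fixed 20-second tumbling window does not by itself cover the "count and interval vary for each element" part of the question. As a rough illustration of that per-key logic, here is a minimal plain-Java sketch with no Flink dependency. The class name CountWithTimeoutSketch, the KeyState holder and the onElement signature are hypothetical, and it assumes that timestamps arrive in order per key and that the interval of the element that starts a count defines the deadline. In a real job this state machine would probably live in a KeyedProcessFunction using keyed state and event-time timers, but that wiring is not shown here.

import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch (hypothetical names, no Flink) of count-with-timeout per key. */
class CountWithTimeoutSketch {

    /** Per-key state: elements counted so far and when the current interval ends. */
    static final class KeyState {
        long count;
        long deadline; // event-time millis, exclusive
    }

    private final Map<String, KeyState> states = new HashMap<>();

    /**
     * Feeds one element and returns true if an alert should be emitted.
     * thresholdCount and intervalMillis are read from the element itself.
     */
    boolean onElement(String key, long timestamp, long thresholdCount, long intervalMillis) {
        KeyState s = states.get(key);
        // No state yet, or the previous interval expired: restart count and timer.
        if (s == null || timestamp >= s.deadline) {
            s = new KeyState();
            s.deadline = timestamp + intervalMillis;
            states.put(key, s);
        }
        s.count++;
        if (s.count >= thresholdCount) {
            states.remove(key); // the "fire & purge" case
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountWithTimeoutSketch sketch = new CountWithTimeoutSketch();
        System.out.println(sketch.onElement("a", 0L, 2, 20_000L));      // first element, no alert
        System.out.println(sketch.onElement("a", 5_000L, 2, 20_000L));  // second within 20 s, alert
        System.out.println(sketch.onElement("a", 30_000L, 2, 20_000L)); // fresh count starts
        System.out.println(sketch.onElement("a", 60_000L, 2, 20_000L)); // previous interval expired, restart
    }
}

Unlike a tumbling window, the interval here starts at the first element of each count rather than at a fixed 20-second boundary, so two elements 5 seconds apart are always counted together.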