How to implement a Flink CountTriggerWithTimeout with dynamic timeouts for each incoming element

Question

I'm quite new to Flink stream processing. Here is my requirement:
Alert the user when 2 or more elements were received in the last 20 seconds. If fewer than 2 elements were received in 20 seconds, don't alert; just restart the count and the timer.
The count and the interval vary for each element.

Here's my code:

dataStream
.keyBy("id")
.window(EventTimeSessionWindows.withDynamicGap((event) -> event.getThresholdInterval()))
.trigger(new CountTriggerWithTimeout<TimeWindow>())

Trigger code:
public class CountTriggerWithTimeout<W extends TimeWindow> extends Trigger<SystemEvent, W> {

  private ReducingStateDescriptor<Long> countState =
      new ReducingStateDescriptor<Long>("count", new Sum(), LongSerializer.INSTANCE);
  private ReducingStateDescriptor<Long> processedState =
      new ReducingStateDescriptor<Long>("processed", new Sum(), LongSerializer.INSTANCE);

  @Override
  public TriggerResult onElement(SystemEvent element, long timestamp, W window, TriggerContext ctx)
      throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(countState);
    ReducingState<Long> processed = ctx.getPartitionedState(processedState);
    count.add(1L);
    processed.add(0L);
    if (count.get() >= element.getThresholdCount() && processed.get() == 0) {
      processed.add(1L);
      return TriggerResult.FIRE_AND_PURGE;
    }
    if (timestamp >= window.getEnd()) {
      return TriggerResult.PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(W window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(countState).clear();
    ctx.getPartitionedState(processedState).clear();
  }
  
  @Override
  public boolean canMerge() {
    return true;
  }

  class Sum implements ReduceFunction<java.lang.Long> {
    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
      return value1 + value2;
    }
  }
}

Earlier, when I was using

dataStream
.timeWindow(Time.seconds(1))
.trigger(new CountTriggerWithTimeout<TimeWindow>())

everything worked fine. Because the window time has to be read from the element, I switched to EventTimeSessionWindows and added canMerge() to the trigger. Since then, nothing works: clear() is never invoked, and neither are onProcessingTime() or onEventTime(). I also see that the timestamp is always set to the same value, regardless of when the element was received.

My requirement is to fire and purge when count >= threshold within event.getThresholdInterval(). If count < threshold within event.getThresholdInterval(), then purge, i.e. invoke clear() to reset the count and state and start over. Is there a way to achieve this with timeWindow instead of EventTimeSessionWindows?

Please help me fix this issue.

Thanks...
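
For reference, the requirement described above can be modeled without any Flink API. The sketch below is plain Java with made-up names (CountWithTimeout, onElement, deadline); it assumes the interval starts at the first element after a reset and takes its length from that element's own timeout. In a Flink job the same logic would probably live in a KeyedProcessFunction with keyed state and a registered timer, but that mapping is an assumption and is not shown here.

```java
/**
 * Plain-Java model of "fire when N elements arrive within T ms, otherwise reset".
 * All names are hypothetical; this is not Flink code.
 */
final class CountWithTimeout {
    private long count = 0;      // elements seen in the current interval
    private long deadline = -1;  // timestamp at which the interval expires; -1 means none

    /** Returns true when the alert should fire for this element. */
    boolean onElement(long timestamp, long thresholdCount, long thresholdIntervalMs) {
        // The interval expired before the threshold was reached: start over.
        if (deadline >= 0 && timestamp >= deadline) {
            reset();
        }
        // The first element of an interval fixes the deadline from its own timeout.
        if (count == 0) {
            deadline = timestamp + thresholdIntervalMs;
        }
        count++;
        if (count >= thresholdCount) {
            reset();  // fire and purge
            return true;
        }
        return false;
    }

    private void reset() {
        count = 0;
        deadline = -1;
    }

    public static void main(String[] args) {
        CountWithTimeout c = new CountWithTimeout();
        System.out.println(c.onElement(0L, 2, 20_000L));       // false: first element
        System.out.println(c.onElement(5_000L, 2, 20_000L));   // true: second within 20 s
        System.out.println(c.onElement(30_000L, 2, 20_000L));  // false: new interval
        System.out.println(c.onElement(55_000L, 2, 20_000L));  // false: previous interval expired
    }
}
```

Because the deadline is checked lazily when the next element arrives, this model needs no timer at all; a timer only matters if stale state has to be cleaned up for keys that never receive another element.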

Answer 1

Score: 1

Why not use a simple tumbling window of 20 seconds and count the elements in it?

source
    .keyBy("id")
    .timeWindow(Time.seconds(20))
    .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
        @Override
        public void process(Tuple key, ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>.Context ctx,
                Iterable<Tuple2<String, Integer>> in, Collector<String> out) throws Exception {

            // Materialize the window contents to count them; alert on two or more.
            if (Lists.newArrayList(in).size() >= 2) {
                out.collect("Two or more elements between "
                        + Instant.ofEpochMilli(ctx.window().getStart())
                        + " and " + Instant.ofEpochMilli(ctx.window().getEnd()));
            }
        }
    })
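
One caveat with this approach: tumbling windows are aligned to the epoch rather than to the first element, so two elements less than 20 seconds apart can still land in different windows and never be counted together. A minimal sketch of the arithmetic, assuming a zero window offset and non-negative timestamps (the class and method names are made up for illustration):

```java
/** Illustrates how a timestamp maps to a tumbling window; hypothetical helper, not Flink code. */
final class TumblingWindowBoundaries {
    /** Start of the tumbling window containing a non-negative timestamp (zero offset assumed). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 20_000L;
        // Two elements only 4 s apart...
        System.out.println(windowStart(18_000L, size)); // 0
        System.out.println(windowStart(22_000L, size)); // 20000
        // ...fall into different windows, so neither window sees 2 elements.
    }
}
```

Whether this matters depends on how strictly "within the last 20 seconds" has to be interpreted; the fixed window size also does not cover the per-element interval mentioned in the question.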

huangapple
  • Posted on August 11, 2020, 19:27:22
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63357206.html