How to implement a Flink countTriggerWithTimeout with dynamic timeouts for each incoming element

Question


I'm quite new to Flink stream processing. Here is my requirement:
Alert the user when 2 or more elements were received in the last 20 seconds. If fewer than 2 elements were received in 20 seconds, don't alert; just restart the counting and the timer.
The count and interval vary for each element.

Here's my code:

```java
dataStream
    .keyBy("id")
    .window(EventTimeSessionWindows.withDynamicGap((event) -> event.getThresholdInterval()))
    .trigger(new CountTriggerWithTimeout<TimeWindow>())
```

Trigger code:
```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// SystemEvent is the application's own event type.
public class CountTriggerWithTimeout<W extends TimeWindow> extends Trigger<SystemEvent, W> {

    private ReducingStateDescriptor<Long> countState =
            new ReducingStateDescriptor<Long>("count", new Sum(), LongSerializer.INSTANCE);
    private ReducingStateDescriptor<Long> processedState =
            new ReducingStateDescriptor<Long>("processed", new Sum(), LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(SystemEvent element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countState);
        ReducingState<Long> processed = ctx.getPartitionedState(processedState);
        count.add(1L);
        processed.add(0L);
        if (count.get() >= element.getThresholdCount() && processed.get() == 0) {
            processed.add(1L);
            return TriggerResult.FIRE_AND_PURGE;
        }
        if (timestamp >= window.getEnd()) {
            return TriggerResult.PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countState).clear();
        ctx.getPartitionedState(processedState).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    class Sum implements ReduceFunction<java.lang.Long> {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
```

Earlier when I was using

```java
dataStream
    .timeWindow(Time.seconds(1))
    .trigger(new CountTriggerWithTimeout<TimeWindow>())
```

everything was working perfectly fine. Because the window time has to be read from each element, I switched to EventTimeSessionWindows and added a canMerge() function to the trigger. Since then, nothing works: clear() is never invoked, and neither are onProcessingTime() and onEventTime(). I also see that timestamp is always set to the same value, irrespective of when the element was received.
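To illustrate the session-window behavior, the following plain-Java model approximates how `EventTimeSessionWindows.withDynamicGap` assigns and merges windows. It has no Flink dependency, and `SessionWindowModel` and its methods are hypothetical names. Each element gets its own window `[timestamp, timestamp + gap)`, and overlapping windows merge into one session that keeps growing while elements keep arriving within the gap. The overlap test is modeled on Flink's `TimeWindow.intersects`. One consequence is that an element's timestamp always lies before `window.getEnd()` of the window it belongs to. The `timestamp >= window.getEnd()` branch in `onElement` therefore never returns `PURGE`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Plain-Java model (no Flink) of event-time session windows with a per-element gap.
 * Hypothetical names; the overlap test is modeled on Flink's TimeWindow.intersects.
 */
public class SessionWindowModel {

    /** A window [start, end), like Flink's TimeWindow. */
    public static final class Window {
        public final long start;
        public final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }
    }

    // Current sessions; they never intersect one another.
    private final List<Window> windows = new ArrayList<>();

    /** Assigns [timestamp, timestamp + gap) and merges it with every session it touches. */
    public Window add(long timestampMs, long gapMs) {
        Window fresh = new Window(timestampMs, timestampMs + gapMs);
        long start = fresh.start;
        long end = fresh.end;
        Iterator<Window> it = windows.iterator();
        while (it.hasNext()) {
            Window w = it.next();
            if (fresh.intersects(w)) {
                // Overlapping sessions collapse into one covering window.
                start = Math.min(start, w.start);
                end = Math.max(end, w.end);
                it.remove();
            }
        }
        Window merged = new Window(start, end);
        windows.add(merged);
        return merged;
    }

    /** Number of separate sessions currently tracked. */
    public int size() {
        return windows.size();
    }
}
```

It may also be worth checking how `canMerge()` interacts with `onMerge()`. Flink's `Trigger` base class expects a trigger that returns `true` from `canMerge()` to override `onMerge()` as well. In recent Flink versions, the default implementation throws `UnsupportedOperationException`, and the trigger above does not override it.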

My requirement is to "fire & purge" when count >= threshold within event.getThresholdInterval(). If count < threshold within event.getThresholdInterval(), then purge, i.e., invoke clear() to reset the count and state, and restart. Is there a way to achieve this with timeWindow instead of EventTimeSessionWindows?
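To make the requested semantics concrete, here is a minimal plain-Java model with no Flink dependency. It rests on two assumptions that the question does not state. First, the element that opens an interval decides that interval's length and threshold. Second, each key's elements arrive in timestamp order. `CountWithTimeoutModel`, `Event` and `onElement` are hypothetical names, with `Event` standing in for `SystemEvent`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model (no Flink) of the requested per-key behavior: alert once
 * thresholdCount elements arrive before the current interval ends; otherwise
 * discard the count and restart. All names are hypothetical.
 */
public class CountWithTimeoutModel {

    /** Hypothetical stand-in for the question's SystemEvent. */
    public static final class Event {
        final String id;
        final long timestampMs;
        final long thresholdCount;
        final long thresholdIntervalMs;

        public Event(String id, long timestampMs, long thresholdCount, long thresholdIntervalMs) {
            this.id = id;
            this.timestampMs = timestampMs;
            this.thresholdCount = thresholdCount;
            this.thresholdIntervalMs = thresholdIntervalMs;
        }
    }

    /** Roughly what a Flink trigger or process function would keep in keyed state. */
    private static final class KeyState {
        long intervalEnd;
        long thresholdCount;
        long count;
    }

    private final Map<String, KeyState> stateByKey = new HashMap<>();

    /** Returns true when this element should raise an alert ("fire & purge"). */
    public boolean onElement(Event e) {
        KeyState s = stateByKey.get(e.id);
        if (s != null && e.timestampMs >= s.intervalEnd) {
            // Interval elapsed with too few elements: purge and start over.
            stateByKey.remove(e.id);
            s = null;
        }
        if (s == null) {
            // Assumption: the element that opens an interval sets its length and threshold.
            s = new KeyState();
            s.intervalEnd = e.timestampMs + e.thresholdIntervalMs;
            s.thresholdCount = e.thresholdCount;
            stateByKey.put(e.id, s);
        }
        s.count++;
        if (s.count >= s.thresholdCount) {
            stateByKey.remove(e.id); // fire and purge
            return true;
        }
        return false;
    }
}
```

In this sketch, an expired interval is discarded only when the next element for the same key arrives. In a Flink trigger, one way to release that state eagerly would be to register a timer for the interval end and purge in `onEventTime`. For example, the timer could be registered with `TriggerContext.registerEventTimeTimer`.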

Please help me fix this issue.

Thanks...

Answer 1

Score: 1


Why don't you use a simple tumbling window of 20 seconds and count the elements in it:

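```java
import java.time.Instant;

import com.google.common.collect.Lists;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// source: a DataStream<Tuple2<String, Integer>> whose first field (f0) holds the id
source
    .keyBy("f0")
    .timeWindow(Time.seconds(20))
    .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
        @Override
        public void process(Tuple key, Context ctx,
                Iterable<Tuple2<String, Integer>> in, Collector<String> out) throws Exception {
            // Report windows that received two or more elements for this key
            if (Lists.newArrayList(in).size() >= 2) {
                out.collect("Two or more elements between "
                        + Instant.ofEpochMilli(ctx.window().getStart())
                        + " and " + Instant.ofEpochMilli(ctx.window().getEnd()));
            }
        }
    });
```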
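To see how this groups elements, here is a plain-Java model with no Flink dependency; `TumblingCountModel` and its methods are hypothetical names. With zero offset, Flink aligns tumbling windows to multiples of the window size since the epoch. The model uses the same arithmetic as `TimeWindow.getWindowStartWithOffset`, then counts elements per window the way the `process()` function above does.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model (no Flink) of epoch-aligned tumbling windows and a
 * per-window element count. Hypothetical names.
 */
public class TumblingCountModel {

    /** Start of the tumbling window containing timestampMs (offset 0). */
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs + sizeMs) % sizeMs;
    }

    /** True if any single tumbling window receives at least `threshold` elements. */
    public static boolean anyWindowReaches(long[] timestampsMs, long sizeMs, int threshold) {
        Map<Long, Integer> counts = new HashMap<>();
        for (long ts : timestampsMs) {
            // Increment the count of the window this element falls into.
            int c = counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
            if (c >= threshold) {
                return true;
            }
        }
        return false;
    }
}
```

Under this alignment, two elements only 2 seconds apart can still fall into adjacent windows (for example, at 19 s and 21 s) and would not be reported. The window size is also fixed rather than read from each element. Whether that trade-off is acceptable depends on how strictly "in the last 20 seconds" has to hold.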

huangapple
  • This article was posted on August 11, 2020 at 19:27:22
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63357206.html