Beam stateful timer 创建窗口?

huangapple go评论53阅读模式
英文:

Beam stateful timer creates window?

问题

我们有一个处理流水线,它会根据全局窗口中的键元素执行处理(直到某个时点)。

在初始步骤和映射之后,我们有一个有状态的步骤,它会存储键的状态,以便将来具有类似键的事件来丰富它们。

并且我们有一个与此状态关联的定时器,设置为1周,以清除状态,因为此时窗口是全局的,我们不希望状态一直累积。

代码示例:

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("testState") ValueState<String> testState,
      @TimerId("expiry") Timer expiryTimer) {
    // 处理逻辑

    // 如果条件成立 - 我们找到要存储的值 {
      testState.write(testValue);
      expiryTimer.offset(Interval).setRelative();
   // 处理逻辑
    c.output(KV.of(key, event));
  }
  
  @OnTimer("expiry")
  public void onExpiry(
      OnTimerContext context,
      @StateId("testState") ValueState<String> testState) {
    testState.clear();
  }

尽管如此,在处理元素方法内部,我们输出了所有元素,在“on timer”函数中,我们只清除了状态,似乎它创建了一个“窗口”,因为在onExpiry的间隔时间之前我们看不到任何输出(它配置为1周)。

我认为我们应该能够在有状态步骤之后对元素进行分组,而不必等待定时器触发。我是否漏掉了什么?

英文:

We have a pipeline that does processing per kay elements with a global window(until some point).

After initial steps and mapping, we have a stateful step that stores the state of the key for future events with a similar key to enrich them.

And we have a timer associated with this state set for 1 week to clear the state because at this point the window is global and we don't want to accumulate states forever.

Code sample:

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId(&quot;testState&quot;) ValueState&lt;String&gt; testState,
      @TimerId(&quot;expiry&quot;) Timer expiryTimer) {
    // processing logic

    // if statement - we found value to store {
      testState.write(testValue);
      expiryTimer.offset(Interval).setRelative();
   // processing logic
    c.output(KV.of(key, event));
  }
  
  @OnTimer(&quot;expiry&quot;)
  public void onExpiry(
      OnTimerContext context,
      @StateId(&quot;testState&quot;) ValueState&lt;String&gt; testState) {
    testState.clear();
  }

Despite that, we output all elements within the process element method, and in the "on timer" function we only clear the state it seems that it creates a "window" because we can't see any output before the interval for onExpiry passes(it configured for 1 week).

I thought we should be able to group elements after a stateful step without waiting for a timer to trigger. Am I missing something?

答案1

得分: 0

所以问题在于,如果默认使用 PROCESSING_TIME 计时器,它会在输入元素的值上保持一个输出水印。
这是描述此行为的文档 https://beam.apache.org/documentation/programming-guide/#timers
然而,这有点模糊,但源代码的注释帮助解释:

如果输出时间戳未明确设置,那么每个 TimeDomain 的默认输出时间戳为:

  • TimeDomain.EVENT_TIME:新计时器的触发时间。
  • TimeDomain.PROCESSING_TIME:当前元素的时间戳或当前计时器的输出时间戳。
  • TimeDomain.SYNCHRONIZED_PROCESSING_TIME:当前元素的时间戳或当前计时器的输出时间戳。
英文:

So the issue was that if you use PROCESSING_TIME timer by default it holds an output watermark on the value of the input element.
this is the dock that describes this behavior https://beam.apache.org/documentation/programming-guide/#timers
However, it's a bit vague, but the source code comments helped:

If the output timestamp has not been explicitly set then the default output timestamp per TimeDomain is:

  • TimeDomain.EVENT_TIME: the firing time of this new timer.
  • TimeDomain.PROCESSING_TIME: current element's timestamp or current timer's output timestamp.
  • TimeDomain.SYNCHRONIZED_PROCESSING_TIME: current element's timestamp or current timer's output timestamp.

huangapple
  • 本文由 发表于 2023年5月26日 16:15:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/76338925.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定