Beam stateful timer creates window?
Question
We have a pipeline that processes elements per key in a global window (up to a certain point).
After the initial steps and mapping, a stateful step stores state for each key so that later events with the same key can be enriched with it.
A timer associated with this state is set for one week to clear it, because the window is global at this point and we don't want state to accumulate forever.
Code sample:
@ProcessElement
public void processElement(
    ProcessContext c,
    @StateId("testState") ValueState<String> testState,
    @TimerId("expiry") Timer expiryTimer) {
  // processing logic
  // if (we found a value to store) {
    testState.write(testValue);
    expiryTimer.offset(Interval).setRelative();
  // }
  // processing logic
  c.output(KV.of(key, event));
}

@OnTimer("expiry")
public void onExpiry(
    OnTimerContext context,
    @StateId("testState") ValueState<String> testState) {
  // Only clears the stored state; nothing is output here.
  testState.clear();
}
Although we output every element inside the @ProcessElement method, and the @OnTimer callback only clears the state, the step behaves as if it created a "window": we see no output until the onExpiry interval (configured as one week) has passed.
I thought we should be able to group elements after a stateful step without waiting for a timer to trigger. Am I missing something?
Answer 1
Score: 0
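The issue was that a PROCESSING_TIME timer, by default, holds the step's output watermark at the timestamp of the input element that set it. Because the watermark cannot advance past that hold until the timer fires, anything downstream that waits on the watermark (such as a grouping that fires on event time) produces nothing until the one-week timer has fired.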
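To make the effect concrete, here is a deliberately simplified model in plain Java. The class and method names are hypothetical and are not part of the Beam API; the model only assumes that a step's output watermark is the minimum of its input watermark and every pending timer hold, so one hold at an element's timestamp pins the output watermark there until the timer fires and releases it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of watermark holds (not the Beam API):
// the output watermark may not advance past the earliest pending hold.
class WatermarkHoldModel {
    private final List<Long> holds = new ArrayList<>();

    // Register a hold, e.g. the element timestamp a processing-time timer defaults to.
    void addHold(long timestampMillis) {
        holds.add(timestampMillis);
    }

    // Release a hold, e.g. when the corresponding timer fires.
    // Long.valueOf(...) makes this remove the value, not the element at an index.
    void releaseHold(long timestampMillis) {
        holds.remove(Long.valueOf(timestampMillis));
    }

    // Output watermark = min(input watermark, earliest pending hold).
    long outputWatermark(long inputWatermarkMillis) {
        long result = inputWatermarkMillis;
        for (long hold : holds) {
            result = Math.min(result, hold);
        }
        return result;
    }
}
```

Under this model, the question's pipeline would keep its output watermark at the earliest held element timestamp until the expiry timer fires a week later, which would be consistent with the week-long delay before any downstream output.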
This behavior is described in the Beam programming guide: https://beam.apache.org/documentation/programming-guide/#timers
The guide is somewhat vague on this point, but the comments in the Beam source code make it explicit:
If the output timestamp has not been explicitly set then the default output timestamp per TimeDomain is:
- TimeDomain.EVENT_TIME: the firing time of this new timer.
- TimeDomain.PROCESSING_TIME: current element's timestamp or current timer's output timestamp.
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME: current element's timestamp or current timer's output timestamp.
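The documented defaults can be sketched as a small lookup, again as a hypothetical plain-Java helper rather than Beam code. It shows why the time domain matters here: an event-time timer set one week after the element holds the watermark at its firing time, one week ahead in event time, whereas a processing-time timer holds it at the element's own timestamp.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper mirroring the documented default output timestamps
// for Beam timers; a model for illustration, not a Beam API.
class DefaultTimerOutputTimestamp {
    enum Domain { EVENT_TIME, PROCESSING_TIME, SYNCHRONIZED_PROCESSING_TIME }

    /**
     * @param domain           the timer's time domain
     * @param contextTimestamp the current element's timestamp (or, inside an
     *                         OnTimer callback, the current timer's output timestamp)
     * @param firingTime       the event-time instant at which the new timer fires
     *                         (only used for EVENT_TIME)
     */
    static Instant defaultOutputTimestamp(Domain domain, Instant contextTimestamp, Instant firingTime) {
        switch (domain) {
            case EVENT_TIME:
                // Hold at the timer's firing time.
                return firingTime;
            case PROCESSING_TIME:
            case SYNCHRONIZED_PROCESSING_TIME:
            default:
                // Hold at the element's (or current timer's) timestamp.
                return contextTimestamp;
        }
    }

    public static void main(String[] args) {
        Instant element = Instant.ofEpochMilli(0);
        Instant oneWeekLater = element.plus(Duration.ofDays(7));
        // Processing-time expiry: the hold stays at the element's timestamp.
        System.out.println(defaultOutputTimestamp(Domain.PROCESSING_TIME, element, oneWeekLater));
        // Event-time expiry: the hold sits at the firing time, one week later.
        System.out.println(defaultOutputTimestamp(Domain.EVENT_TIME, element, oneWeekLater));
    }
}
```

In Beam itself, the domain is chosen when declaring the timer, for example `TimerSpecs.timer(TimeDomain.EVENT_TIME)`, and recent SDK versions also provide `Timer.withOutputTimestamp(Instant)` to set the hold explicitly. Whether an event-time expiry is acceptable depends on whether clearing state one week of event time (rather than wall-clock time) after the last update suits the pipeline; the constraints on output timestamps are worth checking in the Javadoc for the Beam version in use.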