Flink custom trigger not triggering onProcessingTime
Question
I have a custom trigger that is supposed to fire when either maxElements elements have been processed or timeoutMs has elapsed. The maxElements part works fine and fires as expected; however, the timeout never fires.
In the Flink main job, I have also tried env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); however, this has not helped.
Can someone guide me in the right direction? I am lost and unsure how to proceed. Here is my implementation:
public class ElementCountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {
    private final long maxElements;
    private final long timeoutMs;
    private long elementCount = 0;
    private long lastTimestamp = Long.MIN_VALUE;

    public ElementCountOrTimeoutTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = timestamp;
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("On processing time called");
        System.out.println("Time: " + time);
        System.out.println("Last timestamp: " + lastTimestamp);
        System.out.println("Timeout: " + timeoutMs);
        if (time >= lastTimestamp + timeoutMs) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = 0;
        lastTimestamp = Long.MIN_VALUE;
    }

    public static <W extends Window> ElementCountOrTimeoutTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeoutTrigger<>(maxElements, timeoutMs);
    }
}
Edit:
I have now incorporated a timer; however, the delete-timer calls do not seem to be respected:
public class ElementCountOrTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private final long maxElements;
    private final long timeoutMs;
    private int elementCount = Integer.MIN_VALUE;
    private long lastTimestamp = Long.MIN_VALUE;
    private long lastTimerExpire = Long.MIN_VALUE;

    public ElementCountOrTimeTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = ctx.getCurrentProcessingTime();
        if (lastTimerExpire > lastTimestamp) {
            ctx.deleteProcessingTimeTimer(lastTimerExpire);
            System.out.println("Removed the timer due to new element for time: " + lastTimerExpire);
        }
        lastTimerExpire = lastTimestamp + timeoutMs;
        ctx.registerProcessingTimeTimer(lastTimerExpire);
        System.out.println("Registered timer until " + lastTimerExpire);
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("Processing time called for time: " + time + " and last timer expire: " + lastTimerExpire);
        System.out.println("Firing and purging");
        elementCount = Integer.MIN_VALUE;
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = Integer.MIN_VALUE;
        ctx.deleteProcessingTimeTimer(lastTimerExpire);
    }

    public static <W extends Window> ElementCountOrTimeTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeTrigger<>(maxElements, timeoutMs);
    }
}
The log it produces:
Registered timer until 1683405995456
Removed the timer due to new element for time: 1683405995456
Registered timer until 1683405997053
Removed the timer due to new element for time: 1683405997053
Registered timer until 1683405999054
Removed the timer due to new element for time: 1683405999054
Registered timer until 1683406001054
Removed the timer due to new element for time: 1683406001054
Registered timer until 1683406003054
Removed the timer due to new element for time: 1683406003054
Registered timer until 1683406005055
Processing time called for time: 1683405995456 and last timer expire: 1683406005055
Firing and purging
Processing time called for time: 1683405997053 and last timer expire: 1683406005055
Firing and purging
Removed the timer due to new element for time: 1683406005055
Registered timer until 1683406007055
Removed the timer due to new element for time: 1683406007055
Registered timer until 1683406009055
Processing time called for time: 1683405999054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406001054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406003054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406005055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406007055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406009055 and last timer expire: 1683406009055
Firing and purging
Answer 1
Score: 0
You haven't registered a processing-time timer, so onProcessingTime will never be called.
You can model the timeout part of your custom trigger on the built-in ProcessingTimeTrigger, which you'll find here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
See also this answer, which provides a complete example of something very similar: https://stackoverflow.com/a/49895802/2000823
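On the follow-up edit: a likely explanation for the timers that still fire, assuming the stream is keyed or the elements fall into different windows, is scoping. Flink scopes a trigger's timers to the current key and window, whereas the lastTimerExpire field lives on a single trigger instance shared by every key and window handled by that operator instance. As a result, deleteProcessingTimeTimer(lastTimerExpire) can target a timestamp that was registered under another key and silently do nothing. The plain-Java sketch below is a simplified model of that behaviour, not Flink code: ScopedTimers, sharedField and perKeyState are hypothetical names, and windows are left out for brevity. In a real trigger the per-window values would normally be kept in state obtained through ctx.getPartitionedState(...) rather than in member fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Simplified stand-in for a timer service whose timers are scoped per key.
// Illustrative model only; this is not Flink's implementation.
class ScopedTimers {
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();

    void register(String key, long time) {
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(time);
    }

    // Removes a timer only if it was registered under the same key; otherwise a no-op.
    void delete(String key, long time) {
        TreeSet<Long> timers = timersByKey.get(key);
        if (timers != null) {
            timers.remove(time);
        }
    }

    // Total number of timers still registered across all keys.
    int pending() {
        int total = 0;
        for (TreeSet<Long> timers : timersByKey.values()) {
            total += timers.size();
        }
        return total;
    }
}

class TimerScopeDemo {
    // Mirrors the question's second trigger: one field shared by every key.
    static int sharedField(String[] keys, long timeoutMs) {
        ScopedTimers timers = new ScopedTimers();
        long lastTimerExpire = Long.MIN_VALUE;
        long now = 0;
        for (String key : keys) {
            if (lastTimerExpire > now) {
                // Misses whenever the timer was registered under a different key.
                timers.delete(key, lastTimerExpire);
            }
            lastTimerExpire = now + timeoutMs;
            timers.register(key, lastTimerExpire);
            now += 2_000; // one element every two seconds
        }
        return timers.pending();
    }

    // Remembers the last timer per key, the way scoped state would.
    static int perKeyState(String[] keys, long timeoutMs) {
        ScopedTimers timers = new ScopedTimers();
        Map<String, Long> lastTimerByKey = new HashMap<>();
        long now = 0;
        for (String key : keys) {
            Long previous = lastTimerByKey.get(key);
            if (previous != null) {
                timers.delete(key, previous);
            }
            long expire = now + timeoutMs;
            timers.register(key, expire);
            lastTimerByKey.put(key, expire);
            now += 2_000;
        }
        return timers.pending();
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "b", "a"};
        System.out.println("shared field:  " + sharedField(keys, 10_000));
        System.out.println("per-key state: " + perKeyState(keys, 10_000));
    }
}
```

With the keys a, b, a, b, a the shared-field variant leaves five timers pending because no delete ever matches, while the per-key variant leaves exactly one timer per key. If every element shares a single key, both variants behave the same, which is why the problem only shows up on keyed or multi-window input.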