Flink自定义触发器未在处理时间上触发。

huangapple go评论53阅读模式
英文:

Flink custom trigger not triggering onProcessingTime

问题

I have a custom trigger currently implemented that is supposed to trigger when there has been maxElements processed or timeoutMs has elapsed. Currently, the maxElements portion of the trigger works fine and triggers as expected, however, the timeout has never been triggered.

In the Flink main job, I have also tried env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);, however, this has not been helpful.

Can someone guide me in the right direction? I am lost and unsure where to proceed. Here is my implementation:

public class ElementCountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxElements;
    private final long timeoutMs;
    private long elementCount = 0;
    private long lastTimestamp = Long.MIN_VALUE;

    public ElementCountOrTimeoutTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = timestamp;
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("On processing time called");
        System.out.println("Time: " + time);
        System.out.println("Last timestamp: " + lastTimestamp);
        System.out.println("Timeout: " + timeoutMs);
        if (time >= lastTimestamp + timeoutMs) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = 0;
        lastTimestamp = Long.MIN_VALUE;
    }

    public static <W extends Window> ElementCountOrTimeoutTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeoutTrigger<>(maxElements, timeoutMs);
    }
}

Edit:

Incorporated a timer, however, it is not respecting the delete timer calls:

public class ElementCountOrTimeTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxElements;
    private final long timeoutMs;
    private int elementCount = Integer.MIN_VALUE;
    private long lastTimestamp = Long.MIN_VALUE;
    private long lastTimerExpire = Long.MIN_VALUE;

    public ElementCountOrTimeTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = ctx.getCurrentProcessingTime();
        if (lastTimerExpire > lastTimestamp) {
            ctx.deleteProcessingTimeTimer(lastTimerExpire);
            System.out.println("Removed the timer due to new element for time: " + lastTimerExpire);
        }
        lastTimerExpire = lastTimestamp + timeoutMs;
        ctx.registerProcessingTimeTimer(lastTimerExpire);
        System.out.println("Registered timer until " + lastTimerExpire);
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("Processing time called for time: " + time + " and last timer expire: " + lastTimerExpire);
        System.out.println("Firing and purging");
        elementCount = Integer.MIN_VALUE;
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = Integer.MIN_VALUE;
        ctx.deleteProcessingTimeTimer(lastTimerExpire);
    }

    public static <W extends Window> ElementCountOrTimeTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeTrigger<>(maxElements, timeoutMs);
    }
}

The log it produces:

Registered timer until 1683405995456
Removed the timer due to new element for time: 1683405995456
Registered timer until 1683405997053
Removed the timer due to new element for time: 1683405997053
Registered timer until 1683405999054
Removed the timer due to new element for time: 1683405999054
Registered timer until 1683406001054
Removed the timer due to new element for time: 1683406001054
Registered timer until 1683406003054
Removed the timer due to new element for time: 1683406003054
Registered timer until 1683406005055
Processing time called for time: 1683405995456 and last timer expire: 1683406005055
Firing and purging
Processing time called for time: 1683405997053 and last timer expire: 1683406005055
Firing and purging
Removed the timer due to new element for time: 1683406005055
Registered timer until 1683406007055
Removed the timer due to new element for time: 1683406007055
Registered timer until 1683406009055
Processing time called for time: 1683405999054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406001054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406003054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406005055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406007055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406009055 and last timer expire: 1683406009055
Firing and purging
英文:

I have a custom trigger currently implemented that is supposed to trigger when there has been maxElements processed or timeoutMs has elapsed. Currently, the maxElements portion of the trigger works fine and triggers as expected, however the timeout has never been triggered.

In the Flink main job, I have also tried env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);, however this has not been helpful.

Can someone guide me in the right direction? I am lost and unsure where to proceed. Here is my implementation:

public class ElementCountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxElements;
    private final long timeoutMs;
    private long elementCount = 0;
    private long lastTimestamp = Long.MIN_VALUE;

    public ElementCountOrTimeoutTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = timestamp;
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("On processing time called");
        System.out.println("Time: " + time);
        System.out.println("Last timestamp: " + lastTimestamp);
        System.out.println("Timeout: " + timeoutMs);
        if (time >= lastTimestamp + timeoutMs) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = 0;
        lastTimestamp = Long.MIN_VALUE;
    }

    public static <W extends Window> ElementCountOrTimeoutTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeoutTrigger<>(maxElements, timeoutMs);
    }
}

Edit:

Incorporated a timer, however it is not respecting the delete timer calls:

public class ElementCountOrTimeTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxElements;
    private final long timeoutMs;
    private int elementCount = Integer.MIN_VALUE;
    private long lastTimestamp = Long.MIN_VALUE;
    private long lastTimerExpire = Long.MIN_VALUE;

    public ElementCountOrTimeTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = ctx.getCurrentProcessingTime();
        if (lastTimerExpire > lastTimestamp) {
            ctx.deleteProcessingTimeTimer(lastTimerExpire);
            System.out.println("Removed the timer due to new element for time: " + lastTimerExpire);
        }
        lastTimerExpire = lastTimestamp + timeoutMs;
        ctx.registerProcessingTimeTimer(lastTimerExpire);
        System.out.println("Registered timer until " + lastTimerExpire);
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("Processing time called for time: " + time + " and last timer expire: " + lastTimerExpire);
        System.out.println("Firing and purging");
        elementCount = Integer.MIN_VALUE;
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = Integer.MIN_VALUE;
        ctx.deleteProcessingTimeTimer(lastTimerExpire);
    }

    public static <W extends Window> ElementCountOrTimeTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeTrigger<>(maxElements, timeoutMs);
    }
}

The log it produces:

Registered timer until 1683405995456
Removed the timer due to new element for time: 1683405995456
Registered timer until 1683405997053
Removed the timer due to new element for time: 1683405997053
Registered timer until 1683405999054
Removed the timer due to new element for time: 1683405999054
Registered timer until 1683406001054
Removed the timer due to new element for time: 1683406001054
Registered timer until 1683406003054
Removed the timer due to new element for time: 1683406003054
Registered timer until 1683406005055
Processing time called for time: 1683405995456 and last timer expire: 1683406005055
Firing and purging
Processing time called for time: 1683405997053 and last timer expire: 1683406005055
Firing and purging
Removed the timer due to new element for time: 1683406005055
Registered timer until 1683406007055
Removed the timer due to new element for time: 1683406007055
Registered timer until 1683406009055
Processing time called for time: 1683405999054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406001054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406003054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406005055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406007055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406009055 and last timer expire: 1683406009055
Firing and purging

答案1

得分: 0

你尚未注册处理时间计时器,因此 onProcessingTime 将永远不会被调用。

您可以在内置的 ProcessingTimeTrigger 上建模您自定义触发器的超时部分,您可以在这里找到它:https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

还可以参考这个答案,它提供了一个非常相似的完整示例:https://stackoverflow.com/a/49895802/2000823

英文:

You haven't registered a processing time timer, so onProcessingTime will never be called.

You can model the timeout part of your custom trigger on the built-in ProcessingTimeTrigger, which you'll find here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

See also this answer, which provides a complete example of something very similar: https://stackoverflow.com/a/49895802/2000823

huangapple
  • 本文由 发表于 2023年5月6日 16:19:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/76187860.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定