Flink KeyedProcessFunction timers with an idle source

Question

I have a Flink job that consumes from Kafka. The consumed messages are held in a concurrent hash map for a period of time and then need to be written to Cassandra.

The operator chain looks something like this:

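import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;

DataStream<Message> datastream = KafkaSource.createSource();
DataStream<Message> decodedMessage = datastream.flatMap(new DecodeMessage());

decodedMessage
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Message>() {
        @Override
        public long extractAscendingTimestamp(Message m) {
            return m.getTimestamp();
        }
    })
    .keyBy((KeySelector<Message, Integer>) x -> x.getID())
    .process(new TimerFunction())
    .addSink(new MySink());

class TimerFunction extends KeyedProcessFunction<Integer, Message, Message> {

    // state handle; it has to be obtained in open() via getRuntimeContext().getState(...)
    private ValueState<Message> x;

    @Override
    public void processElement(Message value, Context context, Collector<Message> out) throws Exception {
        // some logic to create a timestamp one minute out
        context.timerService().registerEventTimeTimer(x.value().getTimestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Message> out) throws Exception {
        // output values on trigger
    }
}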


I have some doubts while working with event time:

  • Each Message has a unique ID, a timestamp, and some other attributes. There could be a million unique keys in a minute. Will the keyBy operation affect performance?

I need to cover the following scenario:

  • Message X with ID 1 arrives at 8 hrs 1 min 1 sec

  • Message Y with ID 2 arrives at 8 hrs 1 min 4 sec

    Since I'm using the ID as the key, both of these messages should have a timer set to trigger at 8 hrs 2 min 0 sec.
    As per the Flink documentation, if the timestamps of timers are the same, the timer is triggered just once.
    I'm facing a problem: when the source becomes idle for a few minutes, the timer keeps waiting for the next watermark and
    never triggers. How do I deal with an idle source?

  • Is using processing time a better option in this case?

  • Also, I am restricted to Flink v1.8, so I need information that applies to that version.

Thanks in advance

Answer 1

Score: 2

I don't fully understand your question; there's too much context missing. But I can offer a few points:

(1) keyBy is expensive: it forces serialization/deserialization along with a network shuffle.

(2) Timers are deduplicated if and only if they are for the same timestamp and the same key.

(3) As for the idle source problem, the event time timers will eventually fire when events begin to flow again, as that will advance the watermark(s). If you can't wait, you can use something like https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java, or switch to processing time.
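
To make point (2) concrete for the scenario in the question, here is a small plain-Java model with no Flink dependency. It is only a sketch: `ToyTimerService` and `nextMinute` are hypothetical names invented for illustration, and the set of (key, timestamp) pairs merely mimics the deduplication rule described above, not Flink's actual timer implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TimerDedupSketch {
    static final long MINUTE_MS = 60_000L;

    /** Rounds an event timestamp up to the start of the next full minute. */
    static long nextMinute(long timestampMs) {
        return timestampMs - (timestampMs % MINUTE_MS) + MINUTE_MS;
    }

    /** Hypothetical stand-in for a timer service: one entry per (key, timestamp) pair. */
    static final class ToyTimerService {
        private final Set<Map.Entry<Integer, Long>> timers = new HashSet<>();

        void register(int key, long timestampMs) {
            // Adding an identical (key, timestamp) pair a second time is a no-op.
            timers.add(new SimpleEntry<Integer, Long>(key, timestampMs));
        }

        int pendingTimers() {
            return timers.size();
        }
    }

    public static void main(String[] args) {
        long base = 8 * 3_600_000L;   // 08:00:00 as milliseconds since midnight
        long x = base + 61_000L;      // message X, ID 1, 08:01:01
        long y = base + 64_000L;      // message Y, ID 2, 08:01:04

        ToyTimerService service = new ToyTimerService();
        service.register(1, nextMinute(x));
        service.register(2, nextMinute(y));
        service.register(1, nextMinute(x + 10_000L)); // same key, same minute

        System.out.println("same target time: " + (nextMinute(x) == nextMinute(y)));
        System.out.println("pending timers: " + service.pendingTimers());
    }
}
```

Under this model, X and Y share the same target time of 08:02:00 but keep separate timers because their keys differ; only the repeated registration for ID 1 collapses into an existing timer.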

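For point (3), the general idea behind a "processing-time trailing" watermark can be sketched as follows. This is a simplified model written for this answer, not a copy of the linked class: the class name, the `idleTimeoutMs` parameter, and the exact advance formula are assumptions. In Flink 1.8 the same logic would live in an implementation of `AssignerWithPeriodicWatermarks`, in its `extractTimestamp` and `getCurrentWatermark` methods; here the wall-clock time is passed in explicitly so the sketch runs on its own.

```java
public class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;

    private long maxTimestampMs = Long.MIN_VALUE;  // highest event timestamp seen so far
    private long lastElementWallClockMs = 0L;      // wall-clock time of the last element
    private long lastEmittedWatermarkMs = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Called once per element; nowMs would be System.currentTimeMillis() in a real job. */
    long extractTimestamp(long eventTimestampMs, long nowMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        lastElementWallClockMs = nowMs;
        return eventTimestampMs;
    }

    /** Called periodically; returns a watermark that never moves backwards. */
    long currentWatermark(long nowMs) {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, so nothing to advance from
        }
        long candidate = maxTimestampMs - maxOutOfOrdernessMs;
        long idleMs = nowMs - lastElementWallClockMs;
        if (idleMs > idleTimeoutMs) {
            // Source looks idle: let the watermark trail the wall clock.
            candidate += idleMs - idleTimeoutMs;
        }
        lastEmittedWatermarkMs = Math.max(lastEmittedWatermarkMs, candidate);
        return lastEmittedWatermarkMs;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch wm = new IdleAwareWatermarkSketch(0L, 5_000L);
        wm.extractTimestamp(28_861_000L, 1_000L);  // 08:01:01
        wm.extractTimestamp(28_864_000L, 4_000L);  // 08:01:04, then the source goes quiet

        long timer = 28_920_000L;                  // event-time timer for 08:02:00
        System.out.println("fires right away: " + (wm.currentWatermark(4_000L) >= timer));
        System.out.println("fires after idling: " + (wm.currentWatermark(65_000L) >= timer));
    }
}
```

The trade-off is that once the watermark has been pushed forward by the wall clock, any element that shows up later with an older event timestamp is late. If that is not acceptable, registering processing-time timers instead, as the answer suggests, avoids depending on watermarks at all.
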
huangapple
  • Posted on August 26, 2020 at 02:18:54
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63584919.html