Flink KeyedProcessFunction timer for an idle source
Question
I have a Flink job that listens to Kafka. The consumed messages are saved in a concurrent hash map for a period of time and then need to be sunk to Cassandra.
The operator chain looks roughly like this:
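```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;

// Job wiring (in main); KafkaSource, DecodeMessage, Message and MySink are my own classes.
DataStream<Message> datastream = KafkaSource.createSource();
DataStream<Message> decodedMessage = datastream.flatMap(new DecodeMessage());
decodedMessage.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Message>() {
        @Override
        public long extractAscendingTimestamp(Message m) {
            return m.getTimestamp();
        }
    }).keyBy((KeySelector<Message, Integer>) x -> x.getID())
    .process(new TimerFunction())
    .addSink(new MySink());
// TimerFunction is a separate top-level class.
class TimerFunction extends KeyedProcessFunction<Integer, Message, Message> {
    private ValueState<Message> x;

    @Override
    public void processElement(Message m, Context ctx, Collector<Message> out) throws Exception {
        // some logic to create timestamp for one minute
        ctx.timerService().registerEventTimeTimer(m.getTimestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Message> out) throws Exception {
        // output values on trigger
    }
}
```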
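The one-minute timestamp logic is elided in the question. A minimal, Flink-free sketch of it, assuming the timer should fire at the start of the next full minute (which matches the scenario where messages at 08:01:01 and 08:01:04 both expect 08:02:00), is shown here. It also models timer registration as a set of (key, timestamp) pairs, mirroring Flink's documented behaviour of keeping at most one timer per key and timestamp. The class and method names (`MinuteTimerSketch`, `nextMinuteBoundary`, `register`) are hypothetical.

```java
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, Flink-free sketch: "end of the minute" timer time plus a model of
// how timers are de-duplicated per (key, timestamp).
final class MinuteTimerSketch {
    static final long MINUTE_MS = 60_000L;

    // Assumption: a message fires at the start of the next full minute.
    // A timestamp exactly on a boundary (e.g. 08:02:00.000) maps to the following minute.
    static long nextMinuteBoundary(long eventTsMs) {
        return (eventTsMs / MINUTE_MS + 1) * MINUTE_MS;
    }

    // Registered timers, identified by (key, timestamp).
    private final Set<Map.Entry<Integer, Long>> timers = new HashSet<>();

    // Returns true if this registration created a new timer, false if it was a duplicate.
    boolean register(int key, long timestampMs) {
        return timers.add(new AbstractMap.SimpleImmutableEntry<>(key, timestampMs));
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

In `TimerFunction.processElement` this would correspond to `ctx.timerService().registerEventTimeTimer(MinuteTimerSketch.nextMinuteBoundary(m.getTimestamp()))`. Because X and Y have different keys, the model keeps two timers for 08:02:00, one per key, rather than a single shared one.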
I have some questions about working with event time:
- Each message has a unique ID, a timestamp, and some other attributes. There could be a million unique keys in a minute. Will the keyBy operation affect performance?
- I need to cover the following scenario:
  - Message X with ID 1 arrives at 08:01:01.
  - Message Y with ID 2 arrives at 08:01:04.
  Since I'm using the ID as the key, both messages should have a timer set to fire at 08:02:00. According to the Flink documentation, if timers have the same timestamp, they are triggered only once.
- When the source becomes idle for a few minutes, the timer keeps waiting for the next watermark and never fires. How do I deal with an idle source?
- Is using processing time a better option in this case?
- Also, I'm restricted to Flink v1.8, so I need information that applies to that version.
Thanks in advance.
Answer 1
Score: 2
I don't fully understand your question; there's too much context missing. But I can offer a few points:
(1) keyBy is expensive: it forces serialization/deserialization along with a network shuffle.
(2) Timers are deduplicated if and only if they are for the same timestamp and the same key.
(3) As for the idle source problem, the event time timers will eventually fire when events begin to flow again, as that will advance the watermark(s). If you can't wait, you can use something like https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java, or switch to processing time.
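The general idea of a watermark that trails processing time once the source goes quiet can be modelled without Flink. The sketch below is a hypothetical, Flink-free model (class and method names are made up, and it is not a copy of the linked class): the watermark normally trails the largest event timestamp by a bounded out-of-orderness, but once no event has arrived for `idleTimeoutMs` of processing time, it starts advancing with the wall clock so pending event-time timers can fire. Processing time is passed in as a parameter so the logic can be exercised deterministically. In Flink 1.8 this logic would sit in an `AssignerWithPeriodicWatermarks<Message>`: `onEvent` would be called from `extractTimestamp` (reading `System.currentTimeMillis()`), and `currentWatermark` from `getCurrentWatermark`, wrapped in a `Watermark`. Flink calls `getCurrentWatermark` at the interval configured with `ExecutionConfig.setAutoWatermarkInterval`.

```java
// Hypothetical, Flink-free model of an idle-aware periodic watermark.
final class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs; // how far the watermark trails the newest event
    private final long idleTimeoutMs;       // processing time without events before the source counts as idle
    private long maxEventTs = Long.MIN_VALUE;
    private long lastEventProcTime = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Per record: track the newest event timestamp and when (in processing time) it was seen.
    void onEvent(long eventTsMs, long nowProcTimeMs) {
        maxEventTs = Math.max(maxEventTs, eventTsMs);
        lastEventProcTime = nowProcTimeMs;
    }

    // Periodically: compute the watermark; it never moves backwards.
    long currentWatermark(long nowProcTimeMs) {
        if (lastEventProcTime == Long.MIN_VALUE) {
            return lastEmitted; // nothing seen yet, so no meaningful watermark
        }
        long wm = maxEventTs - maxOutOfOrdernessMs;
        long idleForMs = nowProcTimeMs - lastEventProcTime;
        if (idleForMs > idleTimeoutMs) {
            // Source looks idle: let event time advance together with processing time.
            wm += idleForMs - idleTimeoutMs;
        }
        lastEmitted = Math.max(lastEmitted, wm);
        return lastEmitted;
    }
}
```

With a 1 s out-of-orderness bound and a 5 s idle timeout, an event stamped 08:01:04 and then 65 s of silence pushes the watermark to 08:02:03, past the 08:02:00 timers from the question. The trade-off is that records arriving after such a jump may already be behind the watermark and are then late. Processing time, the other option mentioned above, avoids watermarks entirely, at the cost of results depending on when records arrive rather than on their timestamps.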