英文:
How to add process function on a keyed stream with filters in flink stream processing?
问题
我有一个数据流,我正在使用一个字段作为键对其进行键入,然后使用Flink状态添加了一个RichFilter来过滤不适合的数据。我想使用一个键入的处理函数来处理这些被过滤掉的数据,因为我想在这个处理函数中利用Flink的valueState。但是由于过滤器的输出不是键入流,所以我不能直接将它与键入的处理函数连接起来,除非我再次使用相同的字段进行键入。
目前,这个方法是有效的,但是否有一种更简单的方法来做到这一点,而不必进行两次键入呢?
DataStream stream = 一些数据流
stream.keyBy(myKeySelector).filter(RichFilterFunction).keyBy(myKeySelector).process(KeyedProcessFunction)
英文:
I've a data stream, which I'm keying by a field, on which, I'm adding a RichFilter to filter out data which's not suitable, while using flink state. I want to process this filtered out data with a keyed process function as I want to make use of the flink valueState in this process function. But since the output of filter isn't a keyed stream, I'm not able to chain it with keyed process function, unless I key it again by the same field.
Right now, this's working, but is there a simpler way to do this, rather than keying it twice?
DataStream stream = some stream data
stream.keyBy(myKeySelector).filter(RichFilterFunction).keyBy(myKeySelector).process(KeyedProcessFunction)
答案1
得分: 3
看起来需要使用 reinterpretAsKeyedStream。
英文:
Seems like a job for reinterpretAsKeyedStream.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论