如何在Flink流处理中的带有过滤器的键控流上添加处理函数?

huangapple go评论78阅读模式
英文:

How to add process function on a keyed stream with filters in flink stream processing?

问题

我有一个数据流,我正在使用一个字段作为键对其进行键入,然后使用Flink状态添加了一个RichFilter来过滤不适合的数据。我想使用一个键入的处理函数来处理这些被过滤掉的数据,因为我想在这个处理函数中利用Flink的valueState。但是由于过滤器的输出不是键入流,所以我不能直接将它与键入的处理函数连接起来,除非我再次使用相同的字段进行键入。

目前,这个方法是有效的,但是否有一种更简单的方法来做到这一点,而不必进行两次键入呢?

DataStream stream = 一些数据流
stream.keyBy(myKeySelector).filter(RichFilterFunction).keyBy(myKeySelector).process(KeyedProcessFunction)
英文:

I've a data stream, which I'm keying by a field, on which, I'm adding a RichFilter to filter out data which's not suitable, while using flink state. I want to process this filtered out data with a keyed process function as I want to make use of the flink valueState in this process function. But since the output of filter isn't a keyed stream, I'm not able to chain it with keyed process function, unless I key it again by the same field.

Right now, this's working, but is there a simpler way to do this, rather than keying it twice?

DataStream stream = some stream data
stream.keyBy(myKeySelector).filter(RichFilterFunction).keyBy(myKeySelector).process(KeyedProcessFunction)

答案1

得分: 3

看起来需要使用 reinterpretAsKeyedStream

英文:

Seems like a job for reinterpretAsKeyedStream.

huangapple
  • 本文由 发表于 2020年7月28日 14:52:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/63128528.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定