如何在Apache Flink中创建动态规则?

huangapple go评论86阅读模式
英文:

How can I create dynamic rule in apache-flink?

问题

我正在使用Java和Flink,成功定义了一个静态模式如下:

Pattern<Event, ?> pattern = Pattern.<Event>
    .begin("first")
    .where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getTemperature() > 50;
            }
        }).within(Time.seconds(10L));

在Apache Flink中是否有一种动态方式来创建模式?
我需要根据用户的输入来定义模式。

谢谢。

英文:

I'm using flink with Java and I succeeded in defining a static pattern as follow:

Pattern&lt;Event, ?&gt; pattern = Pattern.&lt;Event&gt;
            begin(&quot;first&quot;)
            .where(
                    new SimpleCondition&lt;Event&gt;() {
                        @Override
                            public boolean filter(Event event) {
                            return event.getTemperature() &gt; 50;
                         }
                    }).within(Time.seconds(10L));

Is there a way in apache-flink to create patterns in a dynamic way?
I need to define the pattern according to user's input.

Thanks

答案1

得分: 1

你可能对“应用逻辑的动态更新”模式感兴趣。

对于您连接到流的规则,可以使用BroadcastStream。

在文章中的示例中,您甚至可以具有动态的聚合定义:

// 流设置
DataStream transactions = [...]
DataStream rulesUpdateStream = [...]

BroadcastStream rulesStream = rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR);

// 处理流程设置
DataStream alerts =
transactions
.connect(rulesStream)
.process(new DynamicKeyFunction())
.keyBy((keyed) -> keyed.getKey())
.connect(rulesStream)
.process(new DynamicAlertFunction())

英文:

You might be interested in the "Dynamic Updates of Application Logic" pattern.

Use BroadcastStream for your rules that you connect to the stream.

With the example in the article you could even have dynamic aggregations definitions:

// Streams setup
DataStream&lt;Transaction&gt; transactions = [...]
DataStream&lt;Rule&gt; rulesUpdateStream = [...]

BroadcastStream&lt;Rule&gt; rulesStream = rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR);

// Processing pipeline setup
 DataStream&lt;Alert&gt; alerts =
     transactions
         .connect(rulesStream)
         .process(new DynamicKeyFunction())
         .keyBy((keyed) -&gt; keyed.getKey())
         .connect(rulesStream)
         .process(new DynamicAlertFunction())

huangapple
  • 本文由 发表于 2020年7月29日 19:30:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/63152612.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定