Can you implement Flink's AggregateFunction with Generic Types?

Question

My goal is to provide an interface for a stream processing module in Flink 1.10. The pipeline contains an AggregateFunction among other operators. All operators have generic types, but the problem lies in the AggregateFunction, for which Flink cannot determine the output type.

Note: The actual pipeline has a slidingEventTimeWindow assigner and a WindowFunction passed along with the AggregateFunction, but the error can be reproduced much more easily with the code below.

This is a simple test case that reproduces the error:

    @Test
    public void aggregateFunction_genericType() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("0", 1), Tuple2.of("0", 2), Tuple2.of("0", 3));

        ConfigAPI cfg = new ConfigAPI();

        source
                .keyBy(k -> k.f0)
                .countWindow(5, 1)
                .aggregate(new GenericAggregateFunc<>(cfg))
                .print();

        env.execute();
    }

As you can see, a configuration class is passed as an argument to the custom AggregateFunction. This is the part the user would implement.

    public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> createAcc() {
            return new Tuple2<>("0", 0);
        }

        @Override
        public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            acc.f1 += in.f1;
            return acc;
        }
    }

The provided interface is:

    public interface BaseConfigAPI<In, Acc> {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
        // other methods to override
    }

The GenericAggregateFunction:

    public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {

        private BaseConfigAPI<In, Acc> cfg;
        GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
            this.cfg = cfg;
        }
        @Override
        public Acc createAccumulator() {
            return cfg.createAcc();
        }
        @Override
        public Acc add(In in, Acc acc) {
            return cfg.addAccumulators(in, acc);
        }
        @Override
        public Acc getResult(Acc acc) {
            return acc;
        }
        @Override
        public Acc merge(Acc acc, Acc acc1) {
            return null;
        }
    }

The output log:

org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem.
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
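
The "type erasure problem" mentioned in the message can be seen without Flink at all. The following is a minimal sketch in plain Java; `Agg`, `GenericAgg`, `ConcreteAgg`, and `describe` are made-up names for illustration and are not Flink classes. It only shows what reflection can recover from a class at runtime, which is presumably the kind of information a type extractor has to work with.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

class ErasureDemo {

    // Hypothetical stand-in for a three-parameter function interface (not Flink's).
    interface Agg<In, Acc, Out> {}

    // Generic implementation: In and Acc remain type variables in the class metadata.
    static class GenericAgg<In, Acc> implements Agg<In, Acc, Acc> {}

    // Concrete implementation: the actual type arguments are recorded in the class metadata.
    static class ConcreteAgg implements Agg<String, Integer, Integer> {}

    // Returns the type arguments of the first generic interface declared by the object's class.
    static String describe(Object o) {
        Type iface = o.getClass().getGenericInterfaces()[0];
        Type[] typeArgs = ((ParameterizedType) iface).getActualTypeArguments();
        return Arrays.toString(typeArgs);
    }

    public static void main(String[] args) {
        // The explicit <String, Integer> is erased at runtime; only the variable names remain.
        System.out.println(describe(new GenericAgg<String, Integer>()));
        // prints "[In, Acc, Acc]"

        System.out.println(describe(new ConcreteAgg()));
        // prints "[class java.lang.String, class java.lang.Integer, class java.lang.Integer]"
    }
}
```

In this picture, GenericAggregateFunc<In, Acc> plays the role of GenericAgg: In can be matched against the element type of the input stream, but nothing in the class metadata says what Acc is, which matches the wording of the exception.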

Solution 1 (not working):
At first I thought this was the usual case of "return type cannot be determined", so I tried adding .returns(Types.TUPLE(Types.STRING, Types.INT)) after .aggregate(...), but without success.

Solution 2 (working):
I created a wrapper class with a generic type, named Accumulator<Acc>, which is then used as the accumulator and output type in AggregateFunction<In, Accumulator<Acc>, Accumulator<Acc>>. This seems to work.

This does not look very elegant though and it is not very consistent with the rest of the interface. Is there any other solution to this problem?

Edit: Thanks @deduper for your time and insight, I think I found a solution.

Solution 3 (working): I created a new interface which extends my BaseConfigAPI and the AggregateFunction in the following manner:

    public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}

    public interface BaseConfigAPI extends Serializable {
        // These will be implemented directly from the AggregateFunction interface
        // Acc createAcc();
        // Acc addAccumulators(In in, Acc acc);

        // other methods to override
    }

Now the user only has to implement MergedConfigAPI<In, Acc, Out> and pass it as a parameter to the .aggregate(...) function.

UPDATE: I tested @deduper's 3rd solution against the framework and it didn't work either. It seems the exception is caused by the Acc type and not the Out type. Taking a closer look at the internals of the .aggregate operator, I realized that there is an overloaded aggregate method that takes two more arguments: a TypeInformation<ACC> accumulatorType and a TypeInformation<R> returnType.

This led to the simplest solution, which requires no code refactoring.

Solution 4 (working):

    @Test
    public void aggregateFunction_genericType() throws Exception {
        ...

                .aggregate(
                        new GenericAggregateFunc<>(cfg),
                        Types.TUPLE(Types.STRING, Types.INT),
                        Types.TUPLE(Types.STRING, Types.INT))
        ...
    }

Note: As of Flink 1.10.1 the aggregate methods are annotated with @PublicEvolving.
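
Solution 4 works because the type information is handed over explicitly instead of being extracted from the function class. For types that are themselves generic, a common plain-Java way to carry a full generic type to runtime is the "super type token" idiom, where an anonymous subclass records its type argument. The sketch below illustrates that idiom only; `TypeToken` here is a hypothetical helper written for this example, not a Flink class. Flink's documentation shows a similar-looking usage with its own TypeHint class, as in TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}), which could be an alternative to Types.TUPLE(...) in the call above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

class TypeTokenDemo {

    // Hypothetical helper: an anonymous subclass records T in its generic superclass.
    abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                // Happens when the token is created as a raw type, e.g. new TypeToken() {}.
                throw new IllegalStateException("TypeToken must be created with a type argument");
            }
            this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass, so the type argument survives erasure.
        Type captured = new TypeToken<Map<String, List<Integer>>>() {}.getType();
        System.out.println(captured.getTypeName());
    }
}
```

The design point is the same as in Solution 4: the caller, who knows the concrete types, states them once at the call site, and the generic function class itself stays untouched.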

Answer 1

Score: 1

> "Can you implement Flink's AggregateFunction with Generic Types?"

Yes, you can, as you have already done yourself. Your error is a result of how you used it (as in "use-site generics") rather than how you implemented it.

> "...Is there any other solution to this problem?..."

I propose the following three candidate solutions, starting with the simplest...

    ...
    source
           .keyBy(k -> k.f0)
           .countWindow(5, 1)
           .aggregate(new GenericAggregateFunc<Tuple2<String, Integer>, Tuple2<String, Integer>>(cfg)) /* filling in the diamond will aid type inference */
           .print();
    ...

The above is the simplest because you wouldn't have to refactor your original GenericAggregateFunc; simply fill in the diamond with the specific type arguments you want to instantiate your generic class with.

There is also another, slightly less simple solution...

    public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg;
        GenericAggregateFunc(BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg) {
            this.cfg = cfg;
        }
        @Override
        public Tuple2<String, Integer> createAccumulator() {
            return cfg.createAcc();
        }
        @Override
        public Tuple2<String, Integer> add(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            return cfg.addAccumulators(in, acc);
        }
        @Override
        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
            return acc;
        }
        @Override
        public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc, Tuple2<String, Integer> acc1) {
            return null;
        }
    }

Although this one involves a minor refactor, in my opinion it simplifies your entire application more than the first proposed solution.

Flink already handles the "complicated" generic polymorphism for you. All you have to do to plug into Flink is instantiate its built-in generic AggregateFunction<IN, ACC, OUT> with the specific type arguments you want. In your case, those type arguments are all Tuple2<String, Integer>.

So you're still "using Generics" with the second solution, but you're doing so in a much simpler way.

Another option is closer to your original implementation, but needs a couple of minor refactors...

    public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {

        ...
        @Override
        public Out getResult(Acc acc) {
            return ...;
        }
        ...
    }

Also, to force the precondition that the user's config implements an interface that's compatible with your function...

    public interface BaseConfigAPI<In, Acc, Out> { ... }

In my experiment, I confirmed that also adding the Out type parameter to BaseConfigAPI makes it compatible.
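
To make the three-parameter variant concrete, here is a Flink-free sketch of how the pieces could fit together. Several things in it are assumptions made for illustration: `SimpleAggregateFunction` is a local stand-in that mirrors the four method names of Flink's AggregateFunction, the `toResult` and `mergeAccumulators` methods on BaseConfigAPI are hypothetical additions that do not appear in the original post, and `SumConfig` and `run` exist only to exercise the code. It shows the delegation pattern only; it says nothing about Flink's type extraction, for which explicit type information may still be needed, as the question's update describes.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

class GenericAggregateSketch {

    // Local stand-in mirroring the method names of Flink's AggregateFunction (not the real interface).
    interface SimpleAggregateFunction<IN, ACC, OUT> extends Serializable {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // User-facing config with a separate Out parameter.
    // mergeAccumulators and toResult are hypothetical methods added for this sketch.
    interface BaseConfigAPI<In, Acc, Out> extends Serializable {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
        Acc mergeAccumulators(Acc a, Acc b);
        Out toResult(Acc acc);
    }

    // Generic function that delegates every step to the user's config.
    static class GenericAggregateFunc<In, Acc, Out> implements SimpleAggregateFunction<In, Acc, Out> {
        private final BaseConfigAPI<In, Acc, Out> cfg;

        GenericAggregateFunc(BaseConfigAPI<In, Acc, Out> cfg) {
            this.cfg = cfg;
        }

        @Override public Acc createAccumulator() { return cfg.createAcc(); }
        @Override public Acc add(In in, Acc acc) { return cfg.addAccumulators(in, acc); }
        @Override public Out getResult(Acc acc) { return cfg.toResult(acc); }
        @Override public Acc merge(Acc a, Acc b) { return cfg.mergeAccumulators(a, b); }
    }

    // Example config: sums integers and reports the total as a String.
    static class SumConfig implements BaseConfigAPI<Integer, Integer, String> {
        @Override public Integer createAcc() { return 0; }
        @Override public Integer addAccumulators(Integer in, Integer acc) { return acc + in; }
        @Override public Integer mergeAccumulators(Integer a, Integer b) { return a + b; }
        @Override public String toResult(Integer acc) { return "sum=" + acc; }
    }

    // Feeds the elements one by one, roughly the way a window operator would.
    static <In, Acc, Out> Out run(SimpleAggregateFunction<In, Acc, Out> fn, List<In> input) {
        Acc acc = fn.createAccumulator();
        for (In value : input) {
            acc = fn.add(value, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        String result = run(new GenericAggregateFunc<>(new SumConfig()), Arrays.asList(1, 2, 3));
        System.out.println(result); // prints "sum=6"
    }
}
```

Delegating merge to the config, instead of returning null as in the original class, is a choice made in this sketch so that the function also behaves sensibly if accumulators ever need to be combined.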

I did have a more complicated alternative solution in mind. But since simpler is almost always better, I'll leave that more complicated solution for somebody else to propose.

Posted by huangapple on August 13, 2020 at 00:11:37
When reposting, please keep the link to this article: https://go.coder-hub.com/63380582.html