Can you implement Flink's AggregateFunction with Generic Types?

My goal is to provide an interface for a stream processing module in Flink 1.10. The pipeline contains an AggregateFunction among other operators. All operators have generic types but the problem lies within the AggregateFunction, which cannot determine the output type.

Note: The actual pipeline has a slidingEventTimeWindow assigner and a WindowFunction passed along with the AggregateFunction, but the error can be reproduced much more easily with the code below.

This is a simple test case that reproduces the error:

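    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public void aggregateFunction_genericType() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("0", 1), Tuple2.of("0", 2), Tuple2.of("0", 3));

        ConfigAPI cfg = new ConfigAPI();

        source
                .keyBy(k -> k.f0)
                .countWindow(5, 1)
                .aggregate(new GenericAggregateFunc<>(cfg))
                .print(); // sink assumed; the original snippet is cut off here

        env.execute();
    }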


As you can see, a configuration class is passed as an argument to the custom AggregateFunction. This is what the user would implement.

    public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        public Tuple2<String, Integer> createAcc() {
            return new Tuple2<>("0", 0);
        }

        public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            acc.f1 += in.f1;
            return acc;
        }
    }

The provided interface is:

    public interface BaseConfigAPI<In, Acc> {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
        // other methods to override
    }

The GenericAggregateFunction:

    public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {

        private BaseConfigAPI<In, Acc> cfg;

        GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
            this.cfg = cfg;
        }

        public Acc createAccumulator() {
            return cfg.createAcc();
        }

        public Acc add(In in, Acc acc) {
            return cfg.addAccumulators(in, acc);
        }

        public Acc getResult(Acc acc) {
            return acc;
        }

        public Acc merge(Acc acc, Acc acc1) {
            return null; // merging is not implemented in this reproduction
        }
    }

The output log:

Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem.
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
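
To illustrate what "type erasure problem" means here, the following is a minimal plain-Java sketch with no Flink dependency. `Func3`, `GenericFunc`, and `ErasureDemo` are hypothetical stand-ins that only mirror the shape of `AggregateFunction<IN, ACC, OUT>` and `GenericAggregateFunc<In, Acc>`. It is not Flink's actual type extraction logic, only a demonstration that reflection on a generic class sees the type variable rather than the argument written at the call site.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a three-parameter function interface (not a Flink type).
interface Func3<IN, ACC, OUT> {}

// Generic implementation with the same shape as GenericAggregateFunc<In, Acc>.
class GenericFunc<In, Acc> implements Func3<In, Acc, Acc> {}

class ErasureDemo {
    // Returns the ACC type argument (index 1) as recorded in the class metadata.
    static Type accumulatorTypeOf(Object function) {
        Type iface = function.getClass().getGenericInterfaces()[0];
        return ((ParameterizedType) iface).getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        // The type arguments written here exist only at compile time.
        Object function = new GenericFunc<String, Integer>();
        Type acc = accumulatorTypeOf(function);
        // Prints "Acc is a type variable: true"
        System.out.println(acc + " is a type variable: " + (acc instanceof TypeVariable));
    }
}
```

At runtime the instance carries no record of `Integer`, so anything that inspects the class reflectively finds only the unresolved variable `Acc`.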

Solution 1 (not working):
At first I thought this was the usual case of "return type cannot be determined", so I tried adding

.returns(Types.TUPLE(Types.STRING, Types.INT)) after .aggregate(...), but without success.

Solution 2 (working):
I created a wrapper class with a generic type, named Accumulator<Acc>, which is then passed as the type to
AggregateFunction<In, Accumulator<Acc>, Accumulator<Acc>>, and this seems to work.

This does not look very elegant though and it is not very consistent with the rest of the interface. Is there any other solution to this problem?

Edit: Thanks @deduper for your time and insight, I think I found a solution.

Solution 3 (working): I created a new interface which extends my BaseConfigAPI and the AggregateFunction in the following manner:

    public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}

    public interface BaseConfigAPI extends Serializable {
        // These will be implemented directly from the AggregateFunction interface
        // Acc createAcc();
        // Acc addAccumulators(In in, Acc acc);
        // other methods to override
    }

Now the user only has to implement MergedConfigAPI<In, Acc, Out> and pass it as a parameter to the .aggregate(...) function.

UPDATE: I tested @deduper's 3rd solution against the framework and it didn't work either. It seems that the exception is caused by the Acc type, not the Out type. Taking a closer look at the internals of the .aggregate operator, I realized that there is an overloaded aggregate method that takes two more arguments: a TypeInformation<ACC> accumulatorType and a TypeInformation<R> returnType.

This is how the simplest solution emerged without any code refactoring.

Solution 4 (working):

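    public void aggregateFunction_genericType() throws Exception {
        // ... same setup and keyBy/countWindow calls as in the test case above
                .aggregate(
                        new GenericAggregateFunc<>(cfg),
                        Types.TUPLE(Types.STRING, Types.INT),
                        Types.TUPLE(Types.STRING, Types.INT))
        // ... rest of the pipeline unchanged
    }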

Note: As of Flink 1.10.1, the aggregate methods are annotated with @PublicEvolving.
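
Explicit type information sidesteps the problem because it is an ordinary runtime object, so nothing has to be recovered from the function class. As a rough plain-Java illustration of how a full generic type can be carried at runtime, here is a sketch of the anonymous-subclass "type token" idiom. `TypeToken` and `TypeTokenDemo` are hypothetical names for this example and are not Flink classes; Flink's own `TypeHint` is used in a similar anonymous-subclass style, but this sketch makes no claim about its internals.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical helper: an anonymous subclass fixes T in its class declaration,
// which makes the full generic type available through reflection.
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

class TypeTokenDemo {
    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass of TypeToken<List<String>>.
        Type captured = new TypeToken<List<String>>() {}.getType();
        // Prints "java.util.List<java.lang.String>"
        System.out.println(captured.getTypeName());
    }
}
```

The trailing `{}` is what matters: without it there is no subclass declaration to record the type argument, and the constructor's cast would fail.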


Score: 1






> "Can you implement Flink's AggregateFunction with Generic Types?"

Yes, you can, as you've already done yourself. Your error is a result of how you used it (as in "use-site generics") rather than how you implemented it.

> "...Is there any other solution to this problem?..."

I propose the following three candidate solutions, starting with the simplest...

           .keyBy(k -> k.f0)
           .countWindow(5, 1)
           .aggregate(new GenericAggregateFunc<Tuple2<String, Integer>, Tuple2<String, Integer>>(cfg)) /* filling in the diamond will aid type inference */

The above is the simplest because you wouldn't have to refactor your original GenericAggregateFunc; simply fill in the diamond with the specific type arguments you want to instantiate your generic class with.

There is also another, slightly less simple solution...

    public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg;

        GenericAggregateFunc(BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg) {
            this.cfg = cfg;
        }

        public Tuple2<String, Integer> createAccumulator() {
            return cfg.createAcc();
        }

        public Tuple2<String, Integer> add(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            return cfg.addAccumulators(in, acc);
        }

        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
            return acc;
        }

        public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc, Tuple2<String, Integer> acc1) {
            return null;
        }
    }

Although this one involves a minor refactor, in my opinion it simplifies your entire application more than the first proposed solution.

Flink already handles the "complicated" generic polymorphism for you. All you have to do to plug in to Flink is instantiate its built-in generic AggregateFunction<IN, ACC, OUT> with the specific type arguments you want. In your case, those type arguments are all Tuple2<String, Integer>.

So you're still "using generics" with the second solution, but you're doing so in a much simpler way.
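
A small plain-Java sketch of why fixing the type arguments in the class declaration helps: the bound arguments are stored in the class metadata and stay readable through reflection. `Aggregator`, `SumAggregator`, and `ConcreteBindingDemo` are hypothetical stand-ins (no Flink dependency), with `String` and `Integer` used in place of `Tuple2<String, Integer>` to keep the example self-contained.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

// Hypothetical stand-in with the same shape as AggregateFunction<IN, ACC, OUT>.
interface Aggregator<IN, ACC, OUT> {}

// All three type arguments are fixed in the class declaration itself.
class SumAggregator implements Aggregator<String, Integer, Integer> {}

class ConcreteBindingDemo {
    // Reads the type arguments of the first implemented interface via reflection.
    static Type[] typeArgumentsOf(Object function) {
        Type iface = function.getClass().getGenericInterfaces()[0];
        return ((ParameterizedType) iface).getActualTypeArguments();
    }

    public static void main(String[] args) {
        Type[] typeArgs = typeArgumentsOf(new SumAggregator());
        // Prints "[class java.lang.String, class java.lang.Integer, class java.lang.Integer]"
        System.out.println(Arrays.toString(typeArgs));
    }
}
```

Here every argument resolves to a concrete class, which is the situation a reflection-based type extractor needs.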

Another option, closer to your original implementation but with a couple of minor refactors...

    public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {
        // ... other methods as before
        public Out getResult(Acc acc) {
            return ...;
        }
    }

Also, to force the precondition that the user's config implements an interface that's compatible with your function...

    public interface BaseConfigAPI<In, Acc, Out> { ... }

In my experiment, I confirmed that adding the Out type parameter to BaseConfigAPI as well makes it compatible.

I did have a more complicated alternative solution in mind. But since simpler is almost always better, I'll leave that more complicated solution for somebody else to propose.

