Flink aggregate state is huge, how to fix?

Question

I am trying to count data in a stream with varying window sizes (the window size is carried in the stream data itself), so I use a custom WindowAssigner and AggregateFunction, but the state is huge (windows range from one hour to 30 days).

In my understanding, the aggregate state only stores the intermediate result.

Is there something wrong?


public class ElementProcessingTime extends WindowAssigner<FactorCalDetail, TimeWindow> {
    @Override public Collection<TimeWindow> assignWindows(FactorCalDetail element, long timestamp, WindowAssignerContext context) {
        long slide = Time.seconds(10).toMilliseconds();
        long size = element.getTime() * 60 * 1000; // window size is carried in the element (minutes -> ms)
        timestamp = context.getCurrentProcessingTime();

        // One overlapping window per slide step, i.e. size / slide windows per element.
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, 0, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    @Override public Trigger<FactorCalDetail, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ElementTimeTrigger.create();
    }

    @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override public boolean isEventTime() {
        return false;
    }
}

public class CountAggregate implements AggregateFunction<FactorCalDetail, AggregateResult, AggregateResult> {

    @Override public AggregateResult createAccumulator() {
        AggregateResult result = new AggregateResult();
        result.setResult(0.0);
        return result;
    }

    @Override public AggregateResult add(FactorCalDetail value, AggregateResult accumulator) {
        accumulator.setKey(value.getGroupKey());
        accumulator.addResult();
        accumulator.setTimeSpan(value.getTimeSpan());
        return accumulator;
    }

    @Override public AggregateResult getResult(AggregateResult accumulator) {
        return accumulator;
    }

    @Override public AggregateResult merge(AggregateResult a, AggregateResult b) {
        if (a.getKey().equals(b.getKey())) {
            a.setResult(a.getResult() + b.getResult());
        }
        return a;
    }
}

env.addSource(source)
    .keyBy(FactorCalDetail::getGroupKey) // key matches the one set in CountAggregate.add()
    .window(new ElementProcessingTime())
    .aggregate(new CountAggregate())
    .addSink(new RedisCustomizeSink(redisProperties));

Answer 1

Score: 1

When you assign custom windows, the state size may quickly get out of hand. That's mainly because each window needs to hold all records that fall within it until the window is aggregated and eventually evicted. In your code, it also seems like you create a huge number of windows per record.
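To get a feel for the scale, here is a back-of-envelope sketch using the figures from the question (a 10-second slide and windows up to 30 days): `assignWindows` emits one `TimeWindow` per slide step within `size`, so every element lands in roughly `size / slide` overlapping windows.

```java
// Rough estimate, assuming the question's parameters (10 s slide, 30-day window).
public class WindowCountEstimate {
    public static void main(String[] args) {
        long slideMs = 10_000L;                   // Time.seconds(10)
        long sizeMs = 30L * 24 * 60 * 60 * 1000;  // a 30-day window
        long windowsPerElement = sizeMs / slideMs;
        System.out.println(windowsPerElement + " windows per element"); // 259200 windows per element
    }
}
```

So with a 30-day window, each key keeps an accumulator alive in about 259,200 concurrent windows, which is why the state explodes even though every single window only stores an intermediate result.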

You didn't specify your use case, but I'm assuming that you actually want to calculate how many events stretch over a given point in time for each key, with a 10 s bin size. If so, then this is not directly a use case for windows.

What you want to do is:

  1. Split your event into smaller events.
  2. Group by key and bin.
  3. Count per bin.

Rough sketch in code:

input.flatMap((FactorCalDetail element, Collector<KeyTime> out) -> {
        ...
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            out.collect(new KeyTime(key, start));
        }
    })
    .keyBy(keyTime -> keyTime)
    .count()

You may apply windows after the keyBy to force certain output properties, such as waiting a few minutes and then outputting everything, ignoring late events.

Note: KeyTime is a simple POJO holding the key and the bin time.
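A minimal sketch of such a POJO (the field names are hypothetical): Flink POJO serialization needs a public no-arg constructor, and value-based equals/hashCode are required so that keyBy groups identical (key, bin) pairs together.

```java
import java.util.Objects;

// Hypothetical KeyTime POJO: the key plus the start of its time bin.
public class KeyTime {
    public String key;
    public long binStart;

    public KeyTime() {} // no-arg constructor required for Flink POJO serialization

    public KeyTime(String key, long binStart) {
        this.key = key;
        this.binStart = binStart;
    }

    // Value semantics so keyBy(keyTime -> keyTime) groups correctly.
    @Override public boolean equals(Object o) {
        if (!(o instanceof KeyTime)) return false;
        KeyTime other = (KeyTime) o;
        return binStart == other.binStart && Objects.equals(key, other.key);
    }

    @Override public int hashCode() {
        return Objects.hash(key, binStart);
    }
}
```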

Edit: after your comment, the solution is actually much simpler.

env.addSource(source)
    .keyBy(element -> new Tuple2<>(element.getKey(), element.getTime()))
    .count()
    .addSink(new RedisCustomizeSink(redisProperties));



Answer 2

Score: 0

You don't say what the source is, and that will have its own state to persist. You also don't say how many unique keys there are. Even a small amount of state per key can grow huge as the number of unique keys increases. If the problem does end up being in the growth of the aggregator state, you might try splitting the windowing logic into a series of two windows: one to aggregate hourly, and a second to aggregate the hourly rollups into your desired timeframe.
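The two-stage rollup idea can be illustrated without Flink (plain Java, hypothetical numbers): stage one counts events per hourly bucket, stage two sums the hourly partials into the larger timeframe, so the second stage only ever sees one partial result per key per hour instead of every raw event.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the two-stage rollup this answer suggests.
public class TwoStageRollup {
    public static void main(String[] args) {
        long[] eventTimesMs = {0, 1_000, 3_600_000, 3_700_000, 7_200_000};

        // Stage 1: hourly counts (what the first, hourly window would produce).
        Map<Long, Long> hourly = new TreeMap<>();
        for (long t : eventTimesMs) {
            hourly.merge(t / 3_600_000, 1L, Long::sum);
        }

        // Stage 2: roll the hourly partials up into one total for the big timeframe.
        long total = hourly.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(hourly + " -> " + total); // {0=2, 1=2, 2=1} -> 5
    }
}
```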


huangapple
  • Posted on 2020-01-06 16:49:24
  • Please keep this link when reposting: https://go.coder-hub.com/59609065.html