Causes of increasing CPU load on the machine when Flink runs for more than 12 hours

Question

I have a Flink job with parallelism set to 6 and a few simple transformations. The issue is that once Flink has been running for more than about 12 hours, the load on the machine starts to increase. At first I thought this was caused by the traffic coming into Flink during certain hours of the day, but when the traffic goes down, the load on the machine stays elevated: lower than at the peak, but still higher than before.

Use cases:

DataStream<Event> from_source = rabbitConsumer
                .flatMap(new RabbitMQConsumer())
                .assignTimestampsAndWatermarks(new PeriodicExtractor());
SingleOutputStreamOperator<Event> data_stream = from_source
                    .filter(new NullidsFilterFunction());
KeyedStream<Event, String> keyed_stream = data_stream.keyBy(k -> k.id);

/*one stateful operator*/
data_stream.map(new EventCount(x))
            .keyBy(k -> new Date(k.timestamp.getTime()).toString())
            .window(TumblingEventTimeWindows.of(Time.minutes(30)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*two stateful operator*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*three*/
keyed_stream.filter(new FilterFunction())
            .map(new AvailabilityCheckClass())
            .addSink(new SinkFuncion());

/*four*/
product_view_keyed_stream = data_stream
            .filter(new FilterFunction())
            .map(new EventProdView(x))
            .keyBy(k -> k.id + new Date(k.timestamp.getTime()));
  product_view_keyed_stream.addSink(new SinkFuncion());

/*five stateful operator*/
product_view_keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(30)))
          .process(new MyProcessWindowFunction())
          .addSink(new SinkFuncion());

/*Six stateful operator with 4 ConcurrentHashMap into the state*/
keyed_stream.flatMap(new FlatMapFunction())
            .addSink(new SinkFuncion());

/*seven stateful operator*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*eight stateful operator*/
data_stream.filter(new FilterFunction())
           .keyBy(k -> k.rType.equals("ClickIdent") ? k.avidat : k.email_sha2)
           .flatMap(new FlatMapFunction())
           .addSink(new SinkFuncion());

My question: what could cause the high CPU usage once my Flink job has been running for more than, say, 6 hours?

Insights: heap memory looks fine (no OOM), all checkpoints complete, no events are lost, and JVM CPU consumption looks fine too. The CMS GC young-generation counter always increases (this worries me: it should be normal because it is a counter, but it increases too fast). The job runs as a plain Java application (local execution, not on a cluster with a Flink installation, just java -jar flink.jar; I don't know whether this matters, I'm just sharing the information).
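A cumulative GC counter only becomes meaningful as a rate, and machine load is a separate number from JVM CPU. The sketch below is one possible way to sample both from inside a JVM using the standard java.lang.management beans; the class name GcRateSampler and the 5-second interval are illustrative choices, and it reports on the JVM it runs in, so it would have to be started inside the job's own process (for example from a background thread) to say anything about the Flink job.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Samples JVM-wide GC counters and the OS load average via the platform MXBeans. */
final class GcRateSampler {

    /** Converts two readings of a cumulative counter into events per minute. */
    static double perMinute(long before, long after, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return (after - before) * 60_000.0 / elapsedMillis;
    }

    /** Sums collection counts over all collectors; a collector reporting -1 (undefined) adds 0. */
    static long totalGcCount() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            total += Math.max(0, gc.getCollectionCount());
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long intervalMillis = 5_000; // sampling interval, chosen arbitrarily
        long before = totalGcCount();
        Thread.sleep(intervalMillis);
        long after = totalGcCount();
        // getSystemLoadAverage() returns a negative value where the platform does not provide it.
        double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
        System.out.printf("GC runs/min: %.1f, 1-min load average: %.2f%n",
                perMinute(before, after, intervalMillis), load);
    }
}
```

Comparing the GC rate shortly after startup with the rate after 12 hours would show whether collections are actually becoming more frequent, which a raw counter cannot.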

One hour example (image not preserved)

Thanks a lot!

Answer 1

Score: 1

Since you are using a heap-based state backend (the FsStateBackend keeps its working state on the JVM heap), and the state TTL is configured to 1 (or 3) days, it's to be expected that the state size will grow. How much it will grow is very application-specific; it depends on how your key space grows over time.
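To make the dependence on the key space concrete, here is a toy model in plain Java (not Flink code). It assumes one event per minute, 50 recurring ids, and a 1-day TTL with eager expiry; all of these are made-up simplifications, and real TTL cleanup in Flink is typically lazier. It compares keying by id alone with keying by id plus a timestamp, which resembles the `k.id + new Date(...)` key in the question.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy model of heap-resident keyed state with a time-to-live; not Flink code. */
final class TtlStateModel {
    private final long ttlMillis;
    private final Map<String, Long> lastWrite = new HashMap<>(); // key -> last write time

    TtlStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Records a write for the key, then drops every entry whose TTL has elapsed. */
    void write(String key, long nowMillis) {
        lastWrite.put(key, nowMillis);
        Iterator<Map.Entry<String, Long>> it = lastWrite.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= ttlMillis) {
                it.remove();
            }
        }
    }

    int liveKeys() {
        return lastWrite.size();
    }

    /** Feeds one event per minute for the given number of hours and returns the live key count. */
    static int simulate(int hours, boolean timestampInKey) {
        long minute = 60_000L;
        TtlStateModel state = new TtlStateModel(24 * 60 * minute); // 1-day TTL
        for (long t = 0; t < hours * 60L; t++) {
            String id = "user-" + (t % 50); // 50 recurring ids, a made-up workload
            String key = timestampInKey ? id + "@" + t : id;
            state.write(key, t * minute);
        }
        return state.liveKeys();
    }

    public static void main(String[] args) {
        for (int hours : new int[] {6, 12, 24, 48}) {
            System.out.printf("%2dh  id-only keys: %d  id+timestamp keys: %d%n",
                    hours, simulate(hours, false), simulate(hours, true));
        }
    }
}
```

In this model the id-only key space stays at 50 entries, while the id-plus-timestamp key space keeps growing until the TTL starts removing entries after one day (720 entries at 12 hours, capped at 1440 from 24 hours on).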

Can you put the 603MB checkpoint size in some context? I.e., how much state is that for each distinct key? It sounds like you are surprised, but it's not clear why.

There are many reasons why checkpoints can become slow, but generally this is an indication of either backpressure or some sort of resource contention in reaching the remote filesystem (e.g., S3 rate limits). If you look at the checkpointing statistics in the Flink WebUI, you can look for clues there. Check whether the checkpoint barriers are taking a long time to traverse the execution graph, or whether the asynchronous part of the checkpointing is taking a long time to write the checkpoint to the remote disks. And look for asymmetries: is one instance taking much longer and holding much more state than the others?
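The asymmetry check can be reduced to a single number once the per-subtask figures have been copied out of the WebUI by hand. The sketch below is one possible way to do that; the class name, the max-to-median ratio as a measure, and all the sample figures are assumptions made for illustration, not values from the question.

```java
import java.util.Arrays;

/** Flags checkpoint asymmetry from per-subtask figures copied by hand from the WebUI. */
final class CheckpointAsymmetry {

    /** Ratio of the largest value to the median; values near 1.0 mean balanced subtasks. */
    static double maxToMedian(long[] perSubtask) {
        if (perSubtask.length == 0) {
            throw new IllegalArgumentException("no subtasks");
        }
        long[] sorted = perSubtask.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        if (median <= 0) {
            throw new IllegalArgumentException("median must be positive");
        }
        return sorted[n - 1] / median;
    }

    public static void main(String[] args) {
        // Hypothetical end-to-end durations (ms) and state sizes (bytes) for 6 subtasks.
        long[] durationMillis = {410, 395, 430, 405, 2900, 420};
        long[] stateBytes = {98_000_000L, 101_000_000L, 99_500_000L,
                             100_200_000L, 104_000_000L, 100_300_000L};
        System.out.printf("duration max/median: %.2f%n", maxToMedian(durationMillis));
        System.out.printf("state size max/median: %.2f%n", maxToMedian(stateBytes));
    }
}
```

With these made-up numbers the durations are lopsided while the state sizes are not, which would point toward a slow subtask (backpressure or I/O) more than toward a hot key.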

If you are doing any blocking I/O in a user function, that can cause trouble. Or you may have significant data skew (e.g., a hot key). Or there may be a slow network between the task manager and the distributed filesystem. Or the cluster may be underprovisioned, in which case you may need to increase the parallelism.
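A quick check for a hot key is to take a sample of event keys and see what fraction of events the most frequent key accounts for. The sketch below does this in plain Java; the class name and the sample keys are hypothetical. Because all events for one key go to the same parallel instance, a single key that exceeds 1/6 of the traffic already pushes one of the 6 subtasks above an even share.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Estimates key skew from a sample of event keys. */
final class HotKeyCheck {

    /** Returns the fraction of events carrying the most frequent key (0.0 for an empty sample). */
    static double topKeyShare(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        int max = 0;
        for (String key : keys) {
            // merge() returns the updated count for this key.
            max = Math.max(max, counts.merge(key, 1, Integer::sum));
        }
        return keys.isEmpty() ? 0.0 : (double) max / keys.size();
    }

    public static void main(String[] args) {
        // Hypothetical sample of event ids.
        List<String> sample = Arrays.asList("a", "b", "a", "c", "a", "d", "a", "e");
        int parallelism = 6;
        double share = topKeyShare(sample);
        System.out.printf("top key share: %.2f, even share per subtask: %.2f%n",
                share, 1.0 / parallelism);
    }
}
```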

You may need to increase the checkpoint timeout. If at some point the checkpoint duration becomes truly problematic, you could switch to using the RocksDB state backend, in order to be able to use incremental checkpointing (but whether this will help depends on what's going on). Or you could change the state TTL configuration to purge state more quickly.

huangapple
  • Published on August 1, 2020, 01:07:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63196196.html