Causes of increased CPU load on the machine when a Flink job runs for more than 12 hours
Question
I have a Flink job with parallelism set to 6 and a few simple transformations. The issue is that when Flink has been running for more than 12 hours, for example, the load on the machine starts to increase. At first I thought it was because of the traffic into Flink during certain hours of the day, but even when the traffic goes down, the load on the machine stays somewhat elevated: lower than before, but still high.
Use cases:
DataStream<Event> from_source = rabbitConsumer
        .flatMap(new RabbitMQConsumer())
        .assignTimestampsAndWatermarks(new PeriodicExtractor());

SingleOutputStreamOperator<Event> data_stream = from_source
        .filter(new NullidsFilterFunction());

KeyedStream<Event, String> keyed_stream = data_stream.keyBy(k -> k.id);

/* one stateful operator */
data_stream.map(new EventCount(x))
        .keyBy(k -> new Date(k.timestamp.getTime()).toString())
        .window(TumblingEventTimeWindows.of(Time.minutes(30)))
        .process(new MyProcessWindowFunction())
        .addSink(new SinkFuncion());

/* two stateful operators */
keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .process(new MyProcessWindowFunction())
        .addSink(new SinkFuncion());

/* three */
keyed_stream.filter(new FilterFunction())
        .map(new AvailabilityCheckClass())
        .addSink(new SinkFuncion());

/* four */
KeyedStream<Event, String> product_view_keyed_stream = data_stream
        .filter(new FilterFunction())
        .map(new EventProdView(x))
        .keyBy(k -> k.id + new Date(k.timestamp.getTime()));
product_view_keyed_stream.addSink(new SinkFuncion());

/* five stateful operators */
product_view_keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(30)))
        .process(new MyProcessWindowFunction())
        .addSink(new SinkFuncion());

/* six: stateful operator with 4 ConcurrentHashMaps in the state */
keyed_stream.flatMap(new FlatMapFunction())
        .addSink(new SinkFuncion());

/* seven stateful operators */
keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .process(new MyProcessWindowFunction())
        .addSink(new SinkFuncion());

/* eight stateful operators */
data_stream.filter(new FilterFunction())
        .keyBy(k -> k.rType.equals("ClickIdent") ? k.avidat : k.email_sha2)
        .flatMap(new FlatMapFunction())
        .addSink(new SinkFuncion());
My question: what could be the cause of the high CPU usage when my Flink job has been running for more than 6 hours, for example?
Insights: heap memory looks fine (no OOM), all checkpoints complete, no events are lost, and JVM CPU consumption looks fine too. The CMS GC young-generation counter always increases (this worries me; it should be normal since it is a counter, but it increases too fast). This job runs as a simple Java application (local execution, not on a cluster with a Flink installation, just java -jar flink.jar; I don't know if this has anything to do with it, just sharing the information).
Thanks a lot!
Answer 1
Score: 1
Since you are using a heap-based state backend (the FSStateBackend keeps its working state on the JVM heap), and the state TTL is configured to 1 (or 3) days, it's to be expected that the state size will grow. How much it will grow is very application specific; it depends on how your key space grows over time.
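Limiting state growth with a TTL is done per state descriptor via StateTtlConfig. A minimal sketch, assuming a hypothetical ValueState named "count" (the actual state names and types live inside the user functions shown in the question):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Hypothetical descriptor for illustration; use your own state name and type.
ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("count", Long.class);

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))                                   // expire entries a day after the last write
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // TTL resets on writes only
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

desc.enableTimeToLive(ttlConfig);
```

With a heap-based backend, expired entries are dropped lazily (on read, and during full snapshots), so a shorter TTL directly bounds how far the working set can grow.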
Can you put the 603MB checkpoint size in some context? I.e., how much state is that for each distinct key? It sounds like you are surprised, but it's not clear why.
There are many reasons why checkpoints can become slow, but generally this is an indication of either backpressure or some sort of resource contention in reaching the remote filesystem -- e.g., S3 rate limits. If you look at the checkpointing statistics in the Flink WebUI, you can look for clues there. Check whether the checkpoint barriers are taking a long time to traverse the execution graph, or whether the asynchronous part of the checkpointing is taking a long time to write the checkpoint to the remote disks. And look for asymmetries -- is one instance taking much longer and holding much more state than the others?
If you are doing any blocking i/o in a user function, that can cause trouble. Or you may have significant data skew (e.g., a hot key). Or a slow network between the task manager and the distributed filesystem. Or the cluster may be under provisioned -- you may need to increase the parallelism.
You may need to increase the checkpoint timeout. If at some point the checkpoint duration becomes truly problematic, you could switch to using the RocksDB state backend, in order to be able to use incremental checkpointing (but whether this will help depends on what's going on). Or you could change the state TTL configuration to purge state more quickly.
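Both of those knobs are set on the execution environment; a sketch of switching to RocksDB with incremental checkpointing enabled and a longer checkpoint timeout (the checkpoint path is a placeholder, and the constructor can throw IOException):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// second argument = true enables incremental checkpoints,
// so each checkpoint only uploads the RocksDB files that changed
env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true));

env.enableCheckpointing(60_000);                                // checkpoint every 60 s
env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);    // allow up to 10 min per checkpoint
```

Note that RocksDB keeps working state off-heap on local disk, which relieves heap pressure but adds serialization cost on every state access, so measure before and after.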