英文:
Flink Task Manager timeout
问题
以下是翻译好的内容:
我的程序在处理越来越多的记录时变得非常缓慢。我最初认为这是由于程序过度消耗内存,因为我的程序对字符串的使用较多(我正在使用Java 11,因此应尽可能使用紧凑字符串),所以我增加了JVM堆大小:
-Xms2048m
-Xmx6144m
我还增加了任务管理器的内存以及超时时间,flink-conf.yaml
:
jobmanager.heap.size: 6144m
heartbeat.timeout: 5000000
然而,这些都没有帮助解决问题。程序仍然在大约处理了350万条记录后变得非常缓慢,只剩下大约50万条记录。当程序接近350万标记时,它变得非常缓慢,直到最终超时,总执行时间约为11分钟。
我在VisualVm中检查了内存消耗,但内存消耗从未超过约700MB。我的flink流水线如下所示:
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);
environment.setParallelism(1);
DataStream<Tuple> stream = environment.addSource(new TPCHQuery3Source(filePaths, relations));
stream.process(new TPCHQuery3Process(relations)).addSink(new FDSSink());
environment.execute("FlinkDataService");
大部分工作都在process函数中完成,我正在实现数据库连接算法,并且列以字符串形式存储,具体来说,我正在实现TPCH基准测试的查询3,如果您希望可以在此处查看 https://examples.citusdata.com/tpch_queries.html。
超时错误如下所示:
java.util.concurrent.TimeoutException: 任务管理器ID为<id>的心跳超时。
我也曾遇到过以下错误:
Exception in thread "pool-1-thread-1" java.lang.OutOfMemoryError: Java堆空间
此外,我的VisualVM监视在事情变得非常缓慢时捕获了截屏:
这是我的源函数运行循环:
while (run) {
readers.forEach(reader -> {
try {
String line = reader.readLine();
if (line != null) {
Tuple tuple = lineToTuple(line, counter.get() % filePaths.size());
if (tuple != null && isValidTuple(tuple)) {
sourceContext.collect(tuple);
}
} else {
closedReaders.add(reader);
if (closedReaders.size() == filePaths.size()) {
System.out.println("所有文件已经被流式传输");
cancel();
}
}
counter.getAndIncrement();
} catch (IOException e) {
e.printStackTrace();
}
});
}
基本上,我从我需要的3个文件中读取一行,基于文件的顺序,构造一个元组对象,这是我称为元组的自定义类,表示表中的一行,并在它满足特定日期条件时发出该元组。
我还建议JVM在第100万、150万、200万和250万条记录时进行垃圾回收,像这样:
System.gc()
有关如何优化此过程的任何想法吗?
英文:
My program gets very slow as more and more records are processed. I initially thought it is due to excessive memory consumption as my program is String intensive (I am using Java 11 so compact strings should be used whenever possible) so I increased the JVM Heap:
-Xms2048m
-Xmx6144m
I also increased the task manager's memory as well as timeout, flink-conf.yaml
:
jobmanager.heap.size: 6144m
heartbeat.timeout: 5000000
However, none of this helped with the issue. The Program still gets very slow at about the same point which is after processing roughly 3.5 million records, only about 0.5 million more to go. As the program approaches the 3.5 million mark it gets very very slow until it eventually times out, total execution time is about 11 minutes.
I checked the memory consumption in VisualVm, but the memory consumption never goes more than about 700MB.My flink pipeline looks as follows:
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);
environment.setParallelism(1);
DataStream<Tuple> stream = environment.addSource(new TPCHQuery3Source(filePaths, relations));
stream.process(new TPCHQuery3Process(relations)).addSink(new FDSSink());
environment.execute("FlinkDataService");
Where the bulk of the work is done in the process function, I am implementing data base join algorithms and the columns are stored as Strings, specifically I am implementing query 3 of the TPCH benchmark, check here if you wish https://examples.citusdata.com/tpch_queries.html.
The timeout error is this:
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id <id> timed out.
Once I got this error as well:
Exception in thread "pool-1-thread-1" java.lang.OutOfMemoryError: Java heap space
Also, my VisualVM monitoring, screenshot is captured at the point where things get very slow:
Here is the run loop of my source function:
while (run) {
readers.forEach(reader -> {
try {
String line = reader.readLine();
if (line != null) {
Tuple tuple = lineToTuple(line, counter.get() % filePaths.size());
if (tuple != null && isValidTuple(tuple)) {
sourceContext.collect(tuple);
}
} else {
closedReaders.add(reader);
if (closedReaders.size() == filePaths.size()) {
System.out.println("ALL FILES HAVE BEEN STREAMED");
cancel();
}
}
counter.getAndIncrement();
} catch (IOException e) {
e.printStackTrace();
}
});
}
I basically read a line of each of the 3 files I need, based on the order of the files, I construct a tuple object which is my custom class called tuple representing a row in a table, and emit that tuple if it is valid i.e. fullfils certain conditions on the date.
I am also suggesting the JVM to do garbage collection at the 1 millionth, 1.5millionth, 2 millionth and 2.5 millionth record like this:
System.gc()
Any thoughts on how I can optimize this?
答案1
得分: 1
intern()
方法拯救了我。在将字符串存储到映射中之前,我对每个字符串进行了内部化处理,这起到了奇妙的作用。
英文:
String intern()
saved me. I did intern on every string before storing it in my maps and that worked like a charm.
答案2
得分: 1
以下是我在我的独立链接集群上更改的属性,以计算TPC-H查询03。
jobmanager.memory.process.size: 1600m
heartbeat.timeout: 100000
taskmanager.memory.process.size: 8g # 默认值:1728m
我将此查询实现为仅流式传输Order表,将其他表保留为状态。另外,我将其计算为无窗口查询,我认为这更有意义且更快。
public class TPCHQuery03 {
private final String topic = "topic-tpch-query-03";
public TPCHQuery03() {
this(PARAMETER_OUTPUT_LOG, "127.0.0.1", false, false, -1);
}
// ... (后续代码被省略)
}
UDF(用户定义函数)如下:OrderSource、OrderKeyedByCustomerProcessFunction、ShippingPriorityKeyedProcessFunction和SumShippingPriorityItem。由于状态不会被更新,我在这里使用了com.google.common.collect.ImmutableList
。我只保留了状态中必要的列,例如ImmutableList<Tuple2<Long, Double>> lineItemList
。
英文:
these are the properties that I changed on my link stand-alone cluster to compute the TPC-H query 03.
jobmanager.memory.process.size: 1600m
heartbeat.timeout: 100000
taskmanager.memory.process.size: 8g # defaul: 1728m
I implemented this query to stream only the Order table and I kept the other tables as a state. Also I am computing as a windowless query, which I think it makes more sense and it is faster.
public class TPCHQuery03 {
private final String topic = "topic-tpch-query-03";
public TPCHQuery03() {
this(PARAMETER_OUTPUT_LOG, "127.0.0.1", false, false, -1);
}
public TPCHQuery03(String output, String ipAddressSink, boolean disableOperatorChaining, boolean pinningPolicy, long maxCount) {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
if (disableOperatorChaining) {
env.disableOperatorChaining();
}
DataStream<Order> orders = env
.addSource(new OrdersSource(maxCount)).name(OrdersSource.class.getSimpleName()).uid(OrdersSource.class.getSimpleName());
// Filter market segment "AUTOMOBILE"
// customers = customers.filter(new CustomerFilter());
// Filter all Orders with o_orderdate < 12.03.1995
DataStream<Order> ordersFiltered = orders
.filter(new OrderDateFilter("1995-03-12")).name(OrderDateFilter.class.getSimpleName()).uid(OrderDateFilter.class.getSimpleName());
// Join customers with orders and package them into a ShippingPriorityItem
DataStream<ShippingPriorityItem> customerWithOrders = ordersFiltered
.keyBy(new OrderKeySelector())
.process(new OrderKeyedByCustomerProcessFunction(pinningPolicy)).name(OrderKeyedByCustomerProcessFunction.class.getSimpleName()).uid(OrderKeyedByCustomerProcessFunction.class.getSimpleName());
// Join the last join result with Lineitems
DataStream<ShippingPriorityItem> result = customerWithOrders
.keyBy(new ShippingPriorityOrderKeySelector())
.process(new ShippingPriorityKeyedProcessFunction(pinningPolicy)).name(ShippingPriorityKeyedProcessFunction.class.getSimpleName()).uid(ShippingPriorityKeyedProcessFunction.class.getSimpleName());
// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
DataStream<ShippingPriorityItem> resultSum = result
.keyBy(new ShippingPriority3KeySelector())
.reduce(new SumShippingPriorityItem(pinningPolicy)).name(SumShippingPriorityItem.class.getSimpleName()).uid(SumShippingPriorityItem.class.getSimpleName());
// emit result
if (output.equalsIgnoreCase(PARAMETER_OUTPUT_MQTT)) {
resultSum
.map(new ShippingPriorityItemMap(pinningPolicy)).name(ShippingPriorityItemMap.class.getSimpleName()).uid(ShippingPriorityItemMap.class.getSimpleName())
.addSink(new MqttStringPublisher(ipAddressSink, topic, pinningPolicy)).name(OPERATOR_SINK).uid(OPERATOR_SINK);
} else if (output.equalsIgnoreCase(PARAMETER_OUTPUT_LOG)) {
resultSum.print().name(OPERATOR_SINK).uid(OPERATOR_SINK);
} else if (output.equalsIgnoreCase(PARAMETER_OUTPUT_FILE)) {
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(PATH_OUTPUT_FILE), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024).build())
.build();
resultSum
.map(new ShippingPriorityItemMap(pinningPolicy)).name(ShippingPriorityItemMap.class.getSimpleName()).uid(ShippingPriorityItemMap.class.getSimpleName())
.addSink(sink).name(OPERATOR_SINK).uid(OPERATOR_SINK);
} else {
System.out.println("discarding output");
}
System.out.println("Stream job: " + TPCHQuery03.class.getSimpleName());
System.out.println("Execution plan >>>\n" + env.getExecutionPlan());
env.execute(TPCHQuery03.class.getSimpleName());
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
new TPCHQuery03();
}
}
The UDFs are here: OrderSource, OrderKeyedByCustomerProcessFunction, ShippingPriorityKeyedProcessFunction, and SumShippingPriorityItem. I am using the com.google.common.collect.ImmutableList
since the state will not be updated. Also I am keeping only the necessary columns on the state, such as ImmutableList<Tuple2<Long, Double>> lineItemList
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论