英文:
What are the API calls made to create snapshot while using RocksDB as the state backend in Flink?
问题
我想对Flink调用RocksDB的每个API调用的所花时间进行分析。然而,我无法找到这些函数。
我尝试在IDE中设置了完整的Flink源代码,将我的流处理示例集成到源代码中,启动了调试器,并逐步进入了许多调用中,但都没有成功。
以下是示例代码:
package org.apache.flink.streaming.examples.spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
/**
* 正常的代码。
*/
public class FraudDetectionAvi {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
env.enableCheckpointing(60000);
env.setStateBackend(new RocksDBStateBackend("file:///home/avsrivas/dev/flink/checkpoints", true));
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetectorAvi())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
点击此处查看完整的源代码。
我尝试步入execute
方法,但我无法推断出哪些函数调用了RocksDB以保存状态。
英文:
I want to profile the time taken for each of the API calls that Flink makes to the RocksDB. However, I am unable to find those functions.
I have tried setting up the complete source code of Flink in an IDE, integrated my streaming example in the source code, started the debugger and stepped into many of the calls made, but in vain.
Here is the example:
package org.apache.flink.streaming.examples.spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
/**
* Normal code.
*/
public class FraudDetectionAvi {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
env.enableCheckpointing(60000);
env.setStateBackend(new RocksDBStateBackend("file:///home/avsrivas/dev/flink/checkpoints", true));
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetectorAvi())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
Click here for the complete source code.
I tried stepping into the execute but I could not deduce the functions that make the call to RocksDB for saving the state.
答案1
得分: 1
当RocksDB被用作Flink应用程序的状态后端时,任何分区键状态的工作副本都存储在每个任务管理器中的本地嵌入式RocksDB实例中。定时器也可以存储在那里,或者它们可能在堆上。RocksDB将其状态存储在本地磁盘上;非键控状态始终位于堆上。
当进行快照(即在检查点期间或进行保存点时),存储在RocksDB中的状态会(异步地)复制到快照存储中(应为分布式文件系统)。
在您的应用程序中,例如当您调用flagState.update(true)
时,实际上会在RocksDBValueState.java的这里结束,该位置使用以下代码写入RocksDB:
backend.db.put(columnFamily, writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
serializeValue(value));
稍后在快照过程中会发生的情况取决于您是使用增量检查点还是完整检查点,但您会在https://github.com/kebab-mai-haddi/flink/tree/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot中找到与RocksDB相关的代码。
请注意,快照不会存储在RocksDB中。增量快照是通过镜像SST文件来进行的,而完整快照涉及遍历状态后端中的所有状态,并将结果写出。
有关Flink如何使用RocksDB的更多信息,请搜索Stefan Richter的博客文章和Flink Forward的演讲。
英文:
When RocksDB is used as the state backend for a Flink application, then the working copy of any key-partitioned state is stored in a local, embedded RocksDB instance in each task manager. Timers may kept there as well, or they may be on the heap. RocksDB keeps its state on the local disk; non-keyed state is always on the heap.
When a snapshot is taken (i.e., during checkpointing or when taking a savepoint), the state stored in RocksDB is (asynchronously) copied to the snapshot storage (which should be a distributed filesystem).
In your application, when you call flagState.update(true)
, for example, that ends up here, in RocksDBValueState.java, which uses this code to write to RocksDB:
backend.db.put(columnFamily, writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
serializeValue(value));
What happens later, during snapshotting, depends on whether you use incremental or full checkpointing, but you'll find the RocksDB-specific code in https://github.com/kebab-mai-haddi/flink/tree/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot.
Note that the snapshots are not stored in RocksDB. Incremental snapshots are taken by mirroring the SST files, and full snapshots involve iterating over all of the state in the state backend and writing out the result.
For more on how Flink uses RocksDB, search for blog posts and Flink Forward talks by Stefan Richter.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论