Flink state backend - how does parallelism work with RocksDB?
Question
In the code below:
package spendreport;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    // Static so that the static main method can read them.
    // Note: a local file URI normally has three slashes (file:///path).
    private static final String checkpointsDir = "file://checkpoints/";
    private static final String rocksDBStateDir = "file://state/rocksdb/";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism for every operator of this job.
        env.setParallelism(3);

        EnvironmentSettings tableSettings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment
            .create(env, tableSettings);
        tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

        // Checkpoint every 5 s, with at least 100 ms between two checkpoints.
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
        env.getCheckpointConfig().setCheckpointStorage(checkpointsDir);

        // Keep working state in embedded RocksDB on local disk.
        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
        stateBackend.setDbStoragePath(rocksDBStateDir);
        env.setStateBackend(stateBackend);

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        // keyBy partitions the stream by account id across the parallel subtasks.
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}
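The job above has its parallelism set to 3 (env.setParallelism(3)).
Does the operator in each job access the same single RocksDB table (Orders) to read or write records?
Or
Does parallelism (at the job level) mean every job has its own RocksDB? RocksDB is per task manager, and there is no guarantee that each job runs in the same task manager.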
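Answer: Parallelism is the number of parallel subtasks each operator runs as; by itself it does not decide whether a job has its own RocksDB. With env.setParallelism(3), the operators of this job run as 3 parallel subtasks.
RocksDB is the local storage engine used by the state backend. It is an embedded library that runs inside the task manager process, not a shared database. With EmbeddedRocksDBStateBackend, each parallel subtask of a stateful operator keeps its keyed state in its own RocksDB instance under the task manager's local storage path, and keyBy determines which subtask owns which keys. Jobs do not share state through RocksDB, even when their tasks run on the same task manager. The Orders table created with executeSql is a Table API table defined by its connector options; it is not a RocksDB table.
So the subtasks do not read and write one common RocksDB table; each works against its own local state.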
Answer 1
Score: 1
RocksDB is not a networked service; it's an embedded key/value store. There are separate RocksDB instances in each task manager.
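To make the "embedded, separate instances" point concrete, here is a toy model in plain Java. It is not Flink code: the class KeyedStateSketch and its methods are made up for illustration, each HashMap stands in for one local RocksDB instance, and the routing is a simplification (as far as I know Flink runs a murmur hash over key.hashCode() before the modulo, while this sketch uses a plain modulo). The value 128 for the maximum parallelism is also an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Flink code: each parallel subtask owns a private local store,
// standing in for an embedded RocksDB instance.
class KeyedStateSketch {
    private final int parallelism;
    private final int maxParallelism;
    // One private map per subtask; nothing is shared between subtasks.
    private final List<Map<Long, Integer>> localStores = new ArrayList<>();

    KeyedStateSketch(int parallelism, int maxParallelism) {
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
        for (int i = 0; i < parallelism; i++) {
            localStores.add(new HashMap<>());
        }
    }

    // Simplified routing: key -> key group -> subtask index.
    // Assumption: plain modulo instead of the murmur hash Flink applies.
    int subtaskFor(long accountId) {
        int keyGroup = Math.floorMod(Long.hashCode(accountId), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts one event for the account, only in the store of the owning subtask.
    void process(long accountId) {
        localStores.get(subtaskFor(accountId)).merge(accountId, 1, Integer::sum);
    }

    // Count as seen by one specific subtask (0 if that subtask does not own the key).
    int countSeenBy(int subtask, long accountId) {
        return localStores.get(subtask).getOrDefault(accountId, 0);
    }

    public static void main(String[] args) {
        KeyedStateSketch job = new KeyedStateSketch(3, 128);
        long[] accounts = {1L, 2L, 1L, 50L, 100L, 1L};
        for (long id : accounts) {
            job.process(id);
        }
        for (int s = 0; s < 3; s++) {
            System.out.println("subtask " + s + " sees account 1 count = " + job.countSeenBy(s, 1L));
        }
    }
}
```

In this model only the subtask that owns account 1 ever sees its counter; the other two have no entry for it. That is the sense in which there is no single shared table: after keyBy, all records for one key reach the same subtask, and that subtask reads and writes only its own local store.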