Flink statebackend – 并行度如何与 RocksDB 配合工作?

huangapple go评论87阅读模式
英文:

Flink statebackend - how does parallelism work with RocksDB?

问题

在下面的代码中:

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    private String checkpointsDir = "file://checkpoints/";
    private String rocksDBStateDir = "file://state/rocksdb/";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);     
        EnvironmentSettings tableSettings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment
            .create(env, tableSettings);

        tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
        env.getCheckpointConfig().setCheckpointStorage(checkpointsDir);

        stateBackend = EmbeddedRocksDBStateBackend();
        stateBackend.setDbStoragePath(rocksDBStateDir);
        env.setStateBackend(stateBackend);

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}

上述作业的并行度设置为3。env.setParallelism(3)

每个作业的运算符是否访问相同的单个 RocksDB 表(Orders)来读取或写入记录...

还是

并行度(在作业级别)意味着每个作业都有自己的 RocksDB?因为 RocksDB 是每个任务管理器的,无法保证每个作业在相同的任务管理器上运行。

回答: 并行度是指每个作业的任务数量,而不是每个作业是否有自己的 RocksDB。在上述代码中,设置了作业的并行度为3(env.setParallelism(3)),这意味着作业将会以3个并行任务运行。

RocksDB 是用于状态后端的存储引擎,通常与 Flink 的任务管理器关联。每个任务管理器只有一个 RocksDB 实例,它被用于所有作业的状态存储。因此,不同作业之间共享同一个 RocksDB 实例,它不会为每个作业创建单独的 RocksDB。如果多个作业在同一个任务管理器上运行,它们将共享相同的 RocksDB 存储。

所以,答案是每个作业不会有自己的 RocksDB,而是共享任务管理器上的同一个 RocksDB 实例。

英文:

In the below code:

package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
private String checkpointsDir  = &quot;file://checkpoints/&quot;
private String rocksDBStateDir = &quot;file://state/rocksdb/&quot;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.SetParallelism(3);     
EnvironmentSettings tableSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment
.create(env, tableSettings);
tableEnv.executeSql(&quot;CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)&quot;);
env.enableCheckpointing(5000)
env.checkpointConfig.minPauseBetweenCheckpoints = 100
env.checkpointConfig.setCheckpointStorage(checkpointsDir)
stateBackend = EmbeddedRocksDBStateBackend()
stateBackend.setDbStoragePath(rocksDBStateDir)
env.stateBackend = stateBackend
DataStream&lt;Transaction&gt; transactions = env
.addSource(new TransactionSource())
.name(&quot;transactions&quot;);
DataStream&lt;Alert&gt; alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name(&quot;fraud-detector&quot;);
alerts
.addSink(new AlertSink())
.name(&quot;send-alerts&quot;);
env.execute(&quot;Fraud Detection&quot;);
}
}

Above job has parallelism set to 3. env.SetParallelism(3)

Does the operator in each job access the same single RocksDB table(Orders)? to read or write records...

Or

Does parallelism(at job level) mean every job has its own RocksDB? Because RocksDB is per task manager and there is no guarantee if each job runs in same task manager

答案1

得分: 1

RocksDB不是一个网络服务;它是一个嵌入式键/值存储。每个任务管理器中都有单独的RocksDB实例。

英文:

RocksDB is not a networked service; it's an embedded key/value store. There are separate RocksDB instances in each task manager.

huangapple
  • 本文由 发表于 2023年8月10日 11:11:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/76872428.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定