Does a state backend need checkpointing?


Question

Below is the code from here that sets up checkpointing:

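import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

// `Queries` (not shown) is a project-specific object holding the SQL statements.
class EnrichmentStream {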
    private val checkpointsDir = "file://${System.getProperty("user.dir")}/checkpoints/"
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }

    fun runStream() {
        val environment = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(Configuration())

        // setParallelism returns the environment (fluent style), so call it explicitly
        environment.setParallelism(3)

        // Checkpoint Configurations
        environment.enableCheckpointing(5000)
        environment.checkpointConfig.minPauseBetweenCheckpoints = 100
        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)

        val stateBackend = EmbeddedRocksDBStateBackend()
        stateBackend.setDbStoragePath(rocksDBStateDir)
        environment.setStateBackend(stateBackend)

        environment.checkpointConfig.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Configure Restart Strategy
        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnvironment = StreamTableEnvironment.create(environment)

        // Run some SQL queries to check the existing Catalogs, Databases and Tables
        tableEnvironment
            .executeSql("SHOW CATALOGS")
            .print()

        tableEnvironment
            .executeSql("SHOW DATABASES")
            .print()

        tableEnvironment
            .executeSql("SHOW TABLES")
            .print()

        tableEnvironment
            .executeSql(Queries.CREATE_SENSORS_TABLE)
            .print()

        tableEnvironment
            .executeSql(Queries.CREATE_READINGS_TABLE)
            .print()

        tableEnvironment
            .executeSql("SHOW TABLES")
            .print()

        tableEnvironment
            .executeSql(Queries.JOIN_SENSOR_READINGS_WITH_INFO_QUERY)
            .print()
    }
}

Does a Flink application that uses a state backend also need checkpointing?


Answer 1

Score: 1

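Yes, you need checkpointing. The state backend only determines where and how the job's working state is kept while it runs; checkpoints are what periodically persist that state so it can be recovered. Without checkpointing, the state will be lost in the event of a failure.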
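To see that the state backend and checkpointing are independent settings, here is a minimal Kotlin sketch, assuming the same Flink 1.15–1.18 DataStream APIs the question uses. `checkpointingEnabled` is a hypothetical helper: it installs a RocksDB state backend and reports whether periodic checkpointing is on, with and without a call to `enableCheckpointing`.

```kotlin
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Hypothetical helper: configures a RocksDB state backend and returns
// whether periodic checkpointing ends up enabled.
fun checkpointingEnabled(withCheckpoints: Boolean): Boolean {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // The state backend only decides where working state lives (here: RocksDB).
    env.setStateBackend(EmbeddedRocksDBStateBackend())

    if (withCheckpoints) {
        // Snapshot the state every 5 seconds, as in the question's code.
        env.enableCheckpointing(5000)
    }
    return env.checkpointConfig.isCheckpointingEnabled
}

fun main() {
    println("state backend only:          ${checkpointingEnabled(false)}")
    println("state backend + checkpoints: ${checkpointingEnabled(true)}")
}
```

With only the state backend set, checkpointing stays disabled: the job still runs, but if it fails and is restarted (for example by the fixed-delay restart strategy in the question), it starts again with empty state.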

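However, rocksDBStateDir and checkpointsDir should be on different file systems; in the question's code, both are subdirectories of user.dir on the same local disk. rocksDBStateDir should be on the fastest available local file system. This is where the working state is kept, and it doesn't matter if this disk is lost when a failure occurs, because the state will be restored from the latest checkpoint. checkpointsDir, on the other hand, should be on a distributed file system (for example HDFS or S3), so that failure recovery is always possible, even if the machine that wrote the checkpoint is gone.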
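A sketch of that split, again assuming the Flink 1.15–1.18 APIs from the question. The paths `/mnt/local-ssd/flink/rocksdb` and `hdfs://namenode:8020/flink/checkpoints` and the helper `configureState` are hypothetical placeholders; an S3 location would work as well, provided the matching Flink file system plugin is installed.

```kotlin
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Hypothetical locations; substitute your own mount point and cluster address.
const val ROCKSDB_LOCAL_DIR = "/mnt/local-ssd/flink/rocksdb"         // fast local disk
const val CHECKPOINT_DIR = "hdfs://namenode:8020/flink/checkpoints"  // durable, shared

// Hypothetical helper applying the placement rule from the answer.
fun configureState(env: StreamExecutionEnvironment) {
    // Working state: RocksDB files on the fastest local disk. Losing this
    // disk is acceptable because state is restored from the last checkpoint.
    val backend = EmbeddedRocksDBStateBackend()
    backend.setDbStoragePath(ROCKSDB_LOCAL_DIR)
    env.setStateBackend(backend)

    // Durable snapshots: checkpoints go to a distributed file system that
    // every TaskManager, and a restarted job, can reach.
    env.enableCheckpointing(5000)
    env.checkpointConfig.setCheckpointStorage(CHECKPOINT_DIR)
}

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    configureState(env)

    val backend = env.stateBackend as EmbeddedRocksDBStateBackend
    val storage = env.checkpointConfig.checkpointStorage as FileSystemCheckpointStorage
    println("RocksDB working dir: ${backend.dbStoragePaths.toList()}")
    println("Checkpoint storage:  ${storage.checkpointPath}")
}
```

Keeping the two locations apart means the hot RocksDB I/O never competes with network storage, while the checkpoints survive the loss of any single machine.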


Posted on huangapple on 2023-08-09 at 01:36:26. Source: https://go.coder-hub.com/76861977.html