Does a Flink state backend need checkpointing?
Question
Below is the code from here that configures checkpointing and a RocksDB state backend:
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

class EnrichmentStream {
    // Where completed checkpoints are written
    private val checkpointsDir = "file://${System.getProperty("user.dir")}/checkpoints/"
    // Where RocksDB keeps its working files while the job runs
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }

    fun runStream() {
        val environment = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(Configuration())
        environment.parallelism = 3

        // Checkpoint Configurations
        environment.enableCheckpointing(5000)
        environment.checkpointConfig.minPauseBetweenCheckpoints = 100
        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)
        val stateBackend = EmbeddedRocksDBStateBackend()
        stateBackend.setDbStoragePath(rocksDBStateDir)
        environment.stateBackend = stateBackend
        environment.checkpointConfig.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Configure Restart Strategy
        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnvironment = StreamTableEnvironment.create(environment)

        // Run some SQL queries to check the existing Catalogs, Databases and Tables
        tableEnvironment.executeSql("SHOW CATALOGS").print()
        tableEnvironment.executeSql("SHOW DATABASES").print()
        tableEnvironment.executeSql("SHOW TABLES").print()

        // Queries (not shown in the question) holds the DDL and SQL strings
        tableEnvironment.executeSql(Queries.CREATE_SENSORS_TABLE).print()
        tableEnvironment.executeSql(Queries.CREATE_READINGS_TABLE).print()
        tableEnvironment.executeSql("SHOW TABLES").print()
        tableEnvironment.executeSql(Queries.JOIN_SENSOR_READINGS_WITH_INFO_QUERY).print()
    }
}
Does a Flink application that uses a state backend still need checkpointing?
Answer 1
Score: 1
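Yes, you need checkpointing. The state backend only determines where the working state lives while the job is running; without checkpoints, that state is never copied anywhere durable, so it will be lost in the event of a failure.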
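To make that concrete, here is a toy model in plain Kotlin (no Flink dependency) of the failure-and-restart cycle. It is not Flink's API: ToyJob, DurableStore and runScenario are hypothetical names invented for illustration. The in-memory map stands in for the RocksDB working directory, and DurableStore stands in for checkpoint storage.

```kotlin
// Toy model of checkpoint-based recovery. This is NOT Flink's API:
// ToyJob, DurableStore and runScenario are hypothetical names.

// Survives failures; plays the role of checkpoint storage (checkpointsDir).
class DurableStore {
    var latest: Map<String, Int>? = null
}

// Working state plays the role of the local RocksDB directory: lost on failure.
class ToyJob(private val store: DurableStore, private val checkpointing: Boolean) {
    private var working = mutableMapOf<String, Int>()

    // Count one event per key, like a simple keyed-state update.
    fun process(key: String) {
        working[key] = (working[key] ?: 0) + 1
    }

    // Copy the working state to durable storage, but only if checkpointing is on.
    fun checkpoint() {
        if (checkpointing) store.latest = working.toMap()
    }

    // Simulate a failure: the local working state disappears.
    fun fail() {
        working = mutableMapOf()
    }

    // Restart: restore from the latest checkpoint, or start empty if there is none.
    fun restore() {
        working = store.latest?.toMutableMap() ?: mutableMapOf()
    }

    fun count(key: String): Int = working[key] ?: 0
}

fun runScenario(checkpointing: Boolean): Int {
    val job = ToyJob(DurableStore(), checkpointing)
    repeat(3) { job.process("sensor-1") }
    job.checkpoint()          // snapshot taken after 3 events
    job.process("sensor-1")   // 4th event is not covered by the snapshot
    job.fail()
    job.restore()
    return job.count("sensor-1")
}

fun main() {
    println("with checkpointing:    ${runScenario(true)}")   // 3
    println("without checkpointing: ${runScenario(false)}")  // 0
}
```

The checkpointed run recovers a count of 3, while the run without checkpoints restarts from nothing. In a real Flink job reading from a replayable source such as Kafka, the fourth event would also be re-read after recovery, because source positions are part of the checkpoint; the toy model leaves that out.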
However, the rocksDBStateDir and checkpointsDir should be in different file systems. The rocksDBStateDir should be in the fastest available local file system. This is where the working state is kept, and it doesn't matter if this disk is lost when failures occur, since the state will be restored from the latest checkpoint. On the other hand, checkpointsDir should be in a distributed file system, to ensure that failure recovery is always possible.
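Applied to the question's code, that advice amounts to pointing checkpoint storage at a shared, durable URI and the RocksDB storage path at a node-local disk. The sketch below assumes placeholder locations: the HDFS URI and the /mnt/local-ssd path are hypothetical and must be replaced with paths from your own cluster. Writing checkpoints to hdfs:// also assumes Hadoop file-system support is available to Flink (for S3, the corresponding Flink file-system plugin).

```kotlin
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Hypothetical placeholder: durable storage reachable from the JobManager
// and every TaskManager (HDFS, S3, ...), so recovery works after a node loss.
const val CHECKPOINT_URI = "hdfs://namenode:8020/flink/checkpoints"

// Hypothetical placeholder: fast, node-local disk for RocksDB's working files.
// Losing it is acceptable because state is restored from the latest checkpoint.
const val ROCKSDB_LOCAL_DIR = "/mnt/local-ssd/flink/rocksdb"

fun configureStateAndCheckpoints(env: StreamExecutionEnvironment) {
    // Take a checkpoint every 5 seconds, as in the question's code.
    env.enableCheckpointing(5000)
    env.checkpointConfig.setCheckpointStorage(CHECKPOINT_URI)

    // Keep RocksDB's working state on the local disk.
    val backend = EmbeddedRocksDBStateBackend()
    backend.setDbStoragePath(ROCKSDB_LOCAL_DIR)
    env.setStateBackend(backend)
}
```

For a purely local experiment like the one in the question, file:// paths for both are fine; the split only starts to matter once the job runs on more than one machine.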