英文:
The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
问题
在 Dataproc 集群上提交 Flink 作业时出现以下错误。请找出代码基础和错误。我正在使用 Flink 1.9.3 版本。
程序以以下异常结束:
org.apache.flink.client.program.ProgramInvocationException: 无法检索执行结果。(JobID: f064ceaa5b318fdad9a77b2723b9ee64)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
...
...
...(省略中间部分)
...
...
... 4 more
我执行的代码片段:
public class ReadFromPubsub
{
public static void main(String args[]) throws Exception
{
System.out.println("Flink Pubsub Code Read 1");
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("Flink Pubsub Code Read 2");
DeserializationSchema<String> deserializer = new SimpleStringSchema();
System.out.println("Flink Pubsub Code Read 3");
SourceFunction<String> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(deserializer)
.withProjectName("vz-it-np-gudv-dev-vzntdo-0")
.withSubscriptionName("subscription1")
.build();
System.out.println("Flink Pubsub Code Read 4");
streamExecEnv.addSource(pubsubSource).print();
streamExecEnv.enableCheckpointing(10);
System.out.println("Flink Pubsub Code Read 5");
streamExecEnv.execute();
}
}
我可以看到代码执行过程中的所有打印语句。在最后一个打印语句后,我遇到了错误。
需要解决所有的异常。
英文:
While submitting the flink job on the dataproc cluster getting the below error. Please find the code base and the error. I am using the flink 1.9.3 version.
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: f064ceaa5b318fdad9a77b2723b9ee64)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
at org.flink.ReadFromPubsub.main(ReadFromPubsub.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:376)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:303)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
... 10 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the RocksDB state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:44)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:154)
at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:219)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:299)
... 20 more
End of exception on server side>]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 4 more
Code snippet I executed
public class ReadFromPubsub
{
public static void main(String args[]) throws Exception
{
System.out.println("Flink Pubsub Code Read 1");
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("Flink Pubsub Code Read 2");
DeserializationSchema<String> deserializer = new SimpleStringSchema();
System.out.println("Flink Pubsub Code Read 3");
SourceFunction<String> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(deserializer)
.withProjectName("vz-it-np-gudv-dev-vzntdo-0")
.withSubscriptionName("subscription1")
.build();
System.out.println("Flink Pubsub Code Read 4");
streamExecEnv.addSource(pubsubSource).print();
streamExecEnv.enableCheckpointing(10);
System.out.println("Flink Pubsub Code Read 5");
streamExecEnv.execute();
}
}
I can see all the print statements during the execution of the code. After the last print statement I am getting the error.
All the exceptions to be resolved
答案1
得分: 0
一般来说,在 Flink 配置文件 flink-conf.yaml
中提供适当的保存点(savepoint)/检查点(checkpoint)目录,具体细节可在文档中找到。如果您尚未进行设置,可以通过以下方式进行设置:
通过 flink-conf.yaml 文件(首选)
state.checkpoints.dir: "file://example/checkpoints"
通过编程方式(在作业内部):
Configuration configuration = new Configuration();
conf.setString("state.checkpoints.dir", "file://example/checkpoints");
StreamExecutionEnvironment streamExecEnv =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
通过 CLI:
flink run -Dstate.checkpoints.dir="/example/checkpoints" your-job.jar
此外,如果您实际上并不希望执行检查点操作,您可以在作业中删除以下配置:
streamExecEnv.enableCheckpointing(10);
英文:
Generally, you would supply the appropriate savepoint/checkpointing directory within your Flink configuration within flink-conf.yaml
as detailed in the docs. If you aren't currently setting it, you can do so in the following ways:
Via flink-conf.yaml (Preferred)
state.checkpoints.dir: "file://example/checkpoints"
Programmatically (within the job):
Configuration configuration = new Configuration();
conf.setString("state.checkpoints.dir", "file://example/checkpoints");
StreamExecutionEnvironment streamExecEnv =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
Via the CLI:
flink run -Dstate.checkpoints.dir="/example/checkpoints" your-job.jar
Additionally, if you didn't actually want to perform checkpointing, you could likely remove the following configuration within your job:
streamExecEnv.enableCheckpointing(10);
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论